1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

# This file is part of Buildbot.  Buildbot is free software: you can 

# redistribute it and/or modify it under the terms of the GNU General Public 

# License as published by the Free Software Foundation, version 2. 

# 

# This program is distributed in the hope that it will be useful, but WITHOUT 

# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 

# details. 

# 

# You should have received a copy of the GNU General Public License along with 

# this program; if not, write to the Free Software Foundation, Inc., 51 

# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

# 

# Copyright Buildbot Team Members 

 

""" 

A wrapper around `sqlalchemy.create_engine` that handles all of the 

special cases that Buildbot needs.  Those include: 

 

- pool_recycle for MySQL 

- %(basedir) substitution 

- optimal thread pool size calculation 

 

""" 

 

import os 

import sqlalchemy as sa 

from twisted.python import log 

from sqlalchemy.engine import strategies, url 

from sqlalchemy.pool import NullPool 

from buildbot.util import sautils 

 

# from http://www.mail-archive.com/sqlalchemy@googlegroups.com/msg15079.html 

class ReconnectingListener(object): 

    def __init__(self): 

        self.retried = False 

 

class BuildbotEngineStrategy(strategies.ThreadLocalEngineStrategy): 

    # A subclass of the ThreadLocalEngineStrategy that can effectively interact 

    # with Buildbot. 

    #  

    # This adjusts the passed-in parameters to ensure that we get the behaviors 

    # Buildbot wants from particular drivers, and wraps the outgoing Engine 

    # object so that its methods run in threads and return deferreds. 

 

    name = 'buildbot' 

 

    def special_case_sqlite(self, u, kwargs): 

        """For sqlite, percent-substitute %(basedir)s and use a full 

        path to the basedir.  If using a memory database, force the 

        pool size to be 1.""" 

        max_conns = None 

 

        # when given a database path, stick the basedir in there 

        if u.database: 

 

            # Use NullPool instead of the sqlalchemy-0.6.8-default 

            # SingletonThreadpool for sqlite to suppress the error in 

            # http://groups.google.com/group/sqlalchemy/msg/f8482e4721a89589, 

            # which also explains that NullPool is the new default in 

            # sqlalchemy 0.7 for non-memory SQLite databases. 

            kwargs.setdefault('poolclass', NullPool) 

 

            u.database = u.database % dict(basedir = kwargs['basedir']) 

            if not os.path.isabs(u.database[0]): 

                u.database = os.path.join(kwargs['basedir'], u.database) 

 

        # in-memory databases need exactly one connection 

        if not u.database: 

            kwargs['pool_size'] = 1 

            max_conns = 1 

 

        # allow serializing access to the db 

        if 'serialize_access' in u.query: 

            u.query.pop('serialize_access') 

            max_conns = 1 

 

        return u, kwargs, max_conns 

 

    def set_up_sqlite_engine(self, u, engine): 

        """Special setup for sqlite engines""" 

        # try to enable WAL logging 

        if u.database: 

            def connect_listener(connection, record): 

                connection.execute("pragma checkpoint_fullfsync = off") 

 

            if sautils.sa_version() < (0,7,0): 

                class CheckpointFullfsyncDisabler(object): 

                    pass 

                disabler = CheckpointFullfsyncDisabler() 

                disabler.connect = connect_listener 

                engine.pool.add_listener(disabler) 

            else: 

                sa.event.listen(engine.pool, 'connect', connect_listener) 

 

            log.msg("setting database journal mode to 'wal'") 

            try: 

                engine.execute("pragma journal_mode = wal") 

            except: 

                log.msg("failed to set journal mode - database may fail") 

 

    def special_case_mysql(self, u, kwargs): 

        """For mysql, take max_idle out of the query arguments, and 

        use its value for pool_recycle.  Also, force use_unicode and 

        charset to be True and 'utf8', failing if they were set to 

        anything else.""" 

 

        kwargs['pool_recycle'] = int(u.query.pop('max_idle', 3600)) 

 

        # default to the InnoDB storage engine 

        storage_engine = u.query.pop('storage_engine', 'MyISAM') 

        kwargs['connect_args'] = { 

            'init_command' : 'SET storage_engine=%s' % storage_engine, 

        } 

 

        if 'use_unicode' in u.query: 

            if u.query['use_unicode'] != "True": 

                raise TypeError("Buildbot requires use_unicode=True " + 

                                 "(and adds it automatically)") 

        else: 

            u.query['use_unicode'] = True 

 

        if 'charset' in u.query: 

            if u.query['charset'] != "utf8": 

                raise TypeError("Buildbot requires charset=utf8 " + 

                                 "(and adds it automatically)") 

        else: 

            u.query['charset'] = 'utf8' 

 

        return u, kwargs, None 

 

    def set_up_mysql_engine(self, u, engine): 

        """Special setup for mysql engines""" 

        # add the reconnecting PoolListener that will detect a 

        # disconnected connection and automatically start a new 

        # one.  This provides a measure of additional safety over 

        # the pool_recycle parameter, and is useful when e.g., the 

        # mysql server goes away 

        def checkout_listener(dbapi_con, con_record, con_proxy): 

            try: 

                cursor = dbapi_con.cursor() 

                cursor.execute("SELECT 1") 

            except dbapi_con.OperationalError, ex: 

                if ex.args[0] in (2006, 2013, 2014, 2045, 2055): 

                    # sqlalchemy will re-create the connection 

                    raise sa.exc.DisconnectionError() 

                raise 

 

        # older versions of sqlalchemy require the listener to be specified 

        # in the kwargs, in a class instance 

        if sautils.sa_version() < (0,7,0): 

            class ReconnectingListener(object): 

                pass 

            rcl = ReconnectingListener() 

            rcl.checkout = checkout_listener 

            engine.pool.add_listener(rcl) 

        else: 

            sa.event.listen(engine.pool, 'checkout', checkout_listener) 

 

    def create(self, name_or_url, **kwargs): 

        if 'basedir' not in kwargs: 

            raise TypeError('no basedir supplied to create_engine') 

 

        max_conns = None 

 

        # apply special cases 

        u = url.make_url(name_or_url) 

        if u.drivername.startswith('sqlite'): 

            u, kwargs, max_conns = self.special_case_sqlite(u, kwargs) 

        elif u.drivername.startswith('mysql'): 

            u, kwargs, max_conns = self.special_case_mysql(u, kwargs) 

 

        # remove the basedir as it may confuse sqlalchemy 

        basedir = kwargs.pop('basedir') 

 

        # calculate the maximum number of connections from the pool parameters, 

        # if it hasn't already been specified 

        if max_conns is None: 

            max_conns = kwargs.get('pool_size', 5) + kwargs.get('max_overflow', 10) 

 

        engine = strategies.ThreadLocalEngineStrategy.create(self, 

                                            u, **kwargs) 

 

        # annotate the engine with the optimal thread pool size; this is used 

        # by DBConnector to configure the surrounding thread pool 

        engine.optimal_thread_pool_size = max_conns 

 

        # keep the basedir 

        engine.buildbot_basedir = basedir 

 

        if u.drivername.startswith('sqlite'): 

            self.set_up_sqlite_engine(u, engine) 

        elif u.drivername.startswith('mysql'): 

            self.set_up_mysql_engine(u, engine) 

 

        return engine 

 

BuildbotEngineStrategy() 

 

# this module is really imported for the side-effects, but pyflakes will like 

# us to use something from the module -- so offer a copy of create_engine, 

# which explicitly adds the strategy argument 

def create_engine(*args, **kwargs): 

    kwargs['strategy'] = 'buildbot' 

 

    return sa.create_engine(*args, **kwargs)