1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

# This file is part of Buildbot.  Buildbot is free software: you can 

# redistribute it and/or modify it under the terms of the GNU General Public 

# License as published by the Free Software Foundation, version 2. 

# 

# This program is distributed in the hope that it will be useful, but WITHOUT 

# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 

# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more 

# details. 

# 

# You should have received a copy of the GNU General Public License along with 

# this program; if not, write to the Free Software Foundation, Inc., 51 

# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 

# 

# Copyright Buildbot Team Members 

 

from twisted.internet import reactor 

 

from buildbot.changes import base 

from buildbot.util import json 

from buildbot import util 

from twisted.python import log 

from twisted.internet import defer 

from twisted.internet.protocol import ProcessProtocol 

 

class GerritChangeSource(base.ChangeSource): 

    """This source will maintain a connection to gerrit ssh server 

    that will provide us gerrit events in json format.""" 

 

    compare_attrs = ["gerritserver", "gerritport"] 

 

    STREAM_GOOD_CONNECTION_TIME = 120 

    "(seconds) connections longer than this are considered good, and reset the backoff timer" 

 

    STREAM_BACKOFF_MIN = 0.5 

    "(seconds) minimum, but nonzero, time to wait before retrying a failed connection" 

 

    STREAM_BACKOFF_EXPONENT = 1.5 

    "multiplier used to increase the backoff from MIN to MAX on repeated failures" 

 

    STREAM_BACKOFF_MAX = 60 

    "(seconds) maximum time to wait before retrying a failed connection" 

 

    def __init__(self, gerritserver, username, gerritport=29418, identity_file=None): 

        """ 

        @type  gerritserver: string 

        @param gerritserver: the dns or ip that host the gerrit ssh server, 

 

        @type  gerritport: int 

        @param gerritport: the port of the gerrit ssh server, 

 

        @type  username: string 

        @param username: the username to use to connect to gerrit, 

 

        @type  identity_file: string 

        @param identity_file: identity file to for authentication (optional). 

 

        """ 

        # TODO: delete API comment when documented 

 

        self.gerritserver = gerritserver 

        self.gerritport = gerritport 

        self.username = username 

        self.identity_file = identity_file 

        self.process = None 

        self.wantProcess = False 

        self.streamProcessTimeout = self.STREAM_BACKOFF_MIN 

 

    class LocalPP(ProcessProtocol): 

        def __init__(self, change_source): 

            self.change_source = change_source 

            self.data = "" 

 

        @defer.inlineCallbacks 

        def outReceived(self, data): 

            """Do line buffering.""" 

            self.data += data 

            lines = self.data.split("\n") 

            self.data = lines.pop(-1) # last line is either empty or incomplete 

            for line in lines: 

                log.msg("gerrit: %s" % (line,)) 

                yield self.change_source.lineReceived(line) 

 

        def errReceived(self, data): 

            log.msg("gerrit stderr: %s" % (data,)) 

 

        def processEnded(self, status_object): 

            self.change_source.streamProcessStopped() 

 

    def lineReceived(self, line): 

        try: 

            event = json.loads(line.decode('utf-8')) 

        except ValueError: 

            log.msg("bad json line: %s" % (line,)) 

            return defer.succeed(None) 

 

        if not(type(event) == type({}) and "type" in event): 

            log.msg("no type in event %s" % (line,)) 

            return defer.succeed(None) 

        func = getattr(self, "eventReceived_"+event["type"].replace("-","_"), None) 

        if func == None: 

            log.msg("unsupported event %s" % (event["type"],)) 

            return defer.succeed(None) 

 

        # flatten the event dictionary, for easy access with WithProperties 

        def flatten(event, base, d): 

            for k, v in d.items(): 

                if type(v) == dict: 

                    flatten(event, base + "." + k, v) 

                else: # already there 

                    event[base + "." + k] = v 

 

        properties = {} 

        flatten(properties, "event", event) 

        return func(properties,event) 

    def addChange(self, chdict): 

        d = self.master.addChange(**chdict) 

        # eat failures.. 

        d.addErrback(log.err, 'error adding change from GerritChangeSource') 

        return d 

    def eventReceived_patchset_created(self, properties, event): 

        change = event["change"] 

        return self.addChange(dict( 

                author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]), 

                project=change["project"], 

                repository="ssh://%s@%s:%s/%s" % ( 

                    self.username, self.gerritserver, self.gerritport, change["project"]), 

                branch=change["branch"]+"/"+change["number"], 

                revision=event["patchSet"]["revision"], 

                revlink=change["url"], 

                comments=change["subject"], 

                files=["unknown"], 

                category=event["type"], 

                properties=properties)) 

    def eventReceived_ref_updated(self, properties, event): 

        ref = event["refUpdate"] 

        author = "gerrit" 

 

        if "submitter" in event: 

            author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"]) 

 

        return self.addChange(dict( 

                author=author, 

                project=ref["project"], 

                repository="ssh://%s@%s:%s/%s" % ( 

                    self.username, self.gerritserver, self.gerritport, ref["project"]), 

                branch=ref["refName"], 

                revision=ref["newRev"], 

                comments="Gerrit: patchset(s) merged.", 

                files=["unknown"], 

                category=event["type"], 

                properties=properties)) 

 

    def streamProcessStopped(self): 

        self.process = None 

 

        # if the service is stopped, don't try to restart the process 

        if not self.wantProcess: 

            log.msg("service is not running; not reconnecting") 

            return 

 

        now = util.now() 

        if now - self.lastStreamProcessStart < self.STREAM_GOOD_CONNECTION_TIME: 

            # bad startup; start the stream process again after a timeout, and then 

            # increase the timeout 

            log.msg("'gerrit stream-events' failed; restarting after %ds" % round(self.streamProcessTimeout)) 

            reactor.callLater(self.streamProcessTimeout, self.startStreamProcess) 

            self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT 

            if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX: 

                self.streamProcessTimeout = self.STREAM_BACKOFF_MAX 

        else: 

            # good startup, but lost connection; restart immediately, and set the timeout 

            # to its minimum 

            self.startStreamProcess() 

            self.streamProcessTimeout = self.STREAM_BACKOFF_MIN 

 

    def startStreamProcess(self): 

        log.msg("starting 'gerrit stream-events'") 

        self.lastStreamProcessStart = util.now() 

        args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)] 

        if self.identity_file is not None: 

          args = args + [ '-i', self.identity_file ] 

        self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", 

          [ "ssh" ] + args + [ "gerrit", "stream-events" ]) 

 

    def startService(self): 

        self.wantProcess = True 

        self.startStreamProcess() 

 

    def stopService(self): 

        self.wantProcess = False 

        if self.process: 

            self.process.signalProcess("KILL") 

        # TODO: if this occurs while the process is restarting, some exceptions may 

        # be logged, although things will settle down normally 

        return base.ChangeSource.stopService(self) 

 

    def describe(self): 

        status = "" 

        if not self.process: 

            status = "[NOT CONNECTED - check log]" 

        str = ('GerritChangeSource watching the remote Gerrit repository %s@%s %s' % 

                            (self.username, self.gerritserver, status)) 

        return str