|
# This file is part of Buildbot. Buildbot is free software: you can # redistribute it and/or modify it under the terms of the GNU General Public # License as published by the Free Software Foundation, version 2. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., 51 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. # # Copyright Buildbot Team Members
"""This source will maintain a connection to gerrit ssh server that will provide us gerrit events in json format."""
"(seconds) connections longer than this are considered good, and reset the backoff timer"
"(seconds) minimum, but nonzero, time to wait before retrying a failed connection"
"multiplier used to increase the backoff from MIN to MAX on repeated failures"
"(seconds) maximum time to wait before retrying a failed connection"
""" @type gerritserver: string @param gerritserver: the dns or ip that host the gerrit ssh server,
@type gerritport: int @param gerritport: the port of the gerrit ssh server,
@type username: string @param username: the username to use to connect to gerrit,
@type identity_file: string @param identity_file: identity file to for authentication (optional).
""" # TODO: delete API comment when documented
self.change_source = change_source self.data = ""
def outReceived(self, data): """Do line buffering.""" self.data += data lines = self.data.split("\n") self.data = lines.pop(-1) # last line is either empty or incomplete for line in lines: log.msg("gerrit: %s" % (line,)) yield self.change_source.lineReceived(line)
log.msg("gerrit stderr: %s" % (data,))
self.change_source.streamProcessStopped()
except ValueError: log.msg("bad json line: %s" % (line,)) return defer.succeed(None)
log.msg("no type in event %s" % (line,)) return defer.succeed(None) log.msg("unsupported event %s" % (event["type"],)) return defer.succeed(None)
# flatten the event dictionary, for easy access with WithProperties else: # already there
# eat failures.. author="%s <%s>" % (change["owner"]["name"], change["owner"]["email"]), project=change["project"], repository="ssh://%s@%s:%s/%s" % ( self.username, self.gerritserver, self.gerritport, change["project"]), branch=change["branch"]+"/"+change["number"], revision=event["patchSet"]["revision"], revlink=change["url"], comments=change["subject"], files=["unknown"], category=event["type"], properties=properties)) ref = event["refUpdate"] author = "gerrit"
if "submitter" in event: author="%s <%s>" % (event["submitter"]["name"], event["submitter"]["email"])
return self.addChange(dict( author=author, project=ref["project"], repository="ssh://%s@%s:%s/%s" % ( self.username, self.gerritserver, self.gerritport, ref["project"]), branch=ref["refName"], revision=ref["newRev"], comments="Gerrit: patchset(s) merged.", files=["unknown"], category=event["type"], properties=properties))
self.process = None
# if the service is stopped, don't try to restart the process if not self.wantProcess: log.msg("service is not running; not reconnecting") return
now = util.now() if now - self.lastStreamProcessStart < self.STREAM_GOOD_CONNECTION_TIME: # bad startup; start the stream process again after a timeout, and then # increase the timeout log.msg("'gerrit stream-events' failed; restarting after %ds" % round(self.streamProcessTimeout)) reactor.callLater(self.streamProcessTimeout, self.startStreamProcess) self.streamProcessTimeout *= self.STREAM_BACKOFF_EXPONENT if self.streamProcessTimeout > self.STREAM_BACKOFF_MAX: self.streamProcessTimeout = self.STREAM_BACKOFF_MAX else: # good startup, but lost connection; restart immediately, and set the timeout # to its minimum self.startStreamProcess() self.streamProcessTimeout = self.STREAM_BACKOFF_MIN
log.msg("starting 'gerrit stream-events'") self.lastStreamProcessStart = util.now() args = [ self.username+"@"+self.gerritserver,"-p", str(self.gerritport)] if self.identity_file is not None: args = args + [ '-i', self.identity_file ] self.process = reactor.spawnProcess(self.LocalPP(self), "ssh", [ "ssh" ] + args + [ "gerrit", "stream-events" ])
self.wantProcess = True self.startStreamProcess()
self.wantProcess = False if self.process: self.process.signalProcess("KILL") # TODO: if this occurs while the process is restarting, some exceptions may # be logged, although things will settle down normally return base.ChangeSource.stopService(self)
(self.username, self.gerritserver, status))
|