# This file is part of Buildbot.  Buildbot is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
"""What's the plan?
The LogFile has just one FD, used for both reading and writing. Each time you add an entry, fd.seek to the end and then write.
Each reader (i.e. Producer) keeps track of its own offset. The reader starts by seeking to the start of the logfile, and reading forwards. Between hunks of the file it yields chunks, so it must remember its offset before yielding and re-seek back to that offset before reading more data. When its read() returns EOF, it is finished with the first phase of the reading (everything that's already been written to disk).
After EOF, the remaining data is entirely in the current entries list. These entries are all of the same channel, so we can do one "".join and obtain a single chunk to be sent to the listener. But since that involves a yield, and more data might arrive after we give up control, we have to subscribe them before yielding. We can't subscribe them any earlier, otherwise they'd get data out of order.
We're using a generator in the first place so that the listener can throttle us, which means they're pulling. But the subscription means we're pushing. Really we're a Producer. In the first phase we can be either a PullProducer or a PushProducer. In the second phase we're only a PushProducer.
So the client gives a LogFileConsumer to LogFile.subscribeConsumer. This Consumer must have registerProducer(), unregisterProducer(), and writeChunk(), and is just like a regular twisted.internet.interfaces.IConsumer, except that writeChunk() takes chunks (tuples of (channel, text)) instead of the normal write(), which takes just text. The LogFileConsumer is allowed to call stopProducing, pauseProducing, and resumeProducing on the producer instance it is given. """
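# The consumer contract described in the docstring above can be sketched
# without Twisted. This is a minimal illustration, not the module's own code:
# CollectingConsumer and GeneratorProducer are hypothetical names, and the
# producer models only the first (pull-style) phase, where the consumer can
# throttle delivery by pausing after each chunk.

```python
class CollectingConsumer(object):
    """Hypothetical consumer: records every (channel, text) chunk it receives."""

    def __init__(self):
        self.producer = None
        self.chunks = []

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None

    def writeChunk(self, chunk):
        self.chunks.append(chunk)
        # Throttle: ask the producer to pause after every chunk.
        if self.producer is not None:
            self.producer.pauseProducing()


class GeneratorProducer(object):
    """Hypothetical stand-in for the generator-driven first phase only."""

    def __init__(self, chunks, consumer):
        self.chunkGenerator = iter(chunks)
        self.consumer = consumer
        self.paused = True
        consumer.registerProducer(self, True)

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        if self.chunkGenerator is None:
            return  # already exhausted
        self.paused = False
        try:
            # Keep delivering until the consumer pauses us or we run out.
            while not self.paused:
                self.consumer.writeChunk(next(self.chunkGenerator))
        except StopIteration:
            self.chunkGenerator = None
            self.consumer.unregisterProducer()
```

# Each call to resumeProducing() delivers exactly one chunk here, because the
# consumer pauses the producer from inside writeChunk().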
# now subscribe them to receive new entries
        # then give them the not-yet-merged data
        channel = self.logfile.runEntries[0][0]
        text = "".join([c[1] for c in self.logfile.runEntries])
        yield (channel, text)
        # now we've caught up to the present. Anything further will come from
        # the logfile subscription. We add the callback *after* yielding the
        # data from runEntries, because the logfile might have finished
        # during the yield.
        # TODO: should we still call consumer.finish? probably not.
        self.paused = True
        self.consumer = None
        self.done()
self.paused = True
        # Twisted-1.3.0 has a bug which causes hangs when resumeProducing
        # calls transport.write (there is a recursive loop, fixed in 2.0 in
        # t.i.abstract.FileDescriptor.doWrite by setting the producerPaused
        # flag *before* calling resumeProducing). To work around this, we
        # just put off the real resumeProducing for a moment. This probably
        # has a performance hit, but I'm going to assume that the log files
        # are not retrieved frequently enough for it to be an issue.
eventually(self._resumeProducing)
        self.paused = False
        if not self.chunkGenerator:
            return
        try:
            while not self.paused:
                chunk = next(self.chunkGenerator)
                self.consumer.writeChunk(chunk)
                # we exit this when the consumer says to stop, or we run out
                # of chunks
        except StopIteration:
            # if the generator finished, it will have done releaseFile
            self.chunkGenerator = None
        # now everything goes through the subscription, and they don't get to
        # pause anymore
if self.consumer: self.consumer.writeChunk((channel, chunk))
""" A LogFile keeps all of its contents on disk, in a non-pickle format to which new entries can easily be appended. The file on disk has a name like 12-log-compile-output, under the Builder's directory. The actual filename is generated (before the LogFile is created) by L{BuildStatus.generateLogfileName}.
@ivar length: length of the data in the logfile (sum of chunk sizes; not the length of the on-disk encoding) """
# No max size by default # Don't keep a tail buffer by default
""" @type parent: L{BuildStepStatus} @param parent: the Step that this log is a part of @type name: string @param name: the name of this log, typically 'output' @type logfilename: string @param logfilename: the Builder-relative pathname for the saved entries """ # the buildmaster was probably stopped abruptly, before the # BuilderStatus could be saved, so BuilderStatus.nextBuildNumber # is out of date, and we're overlapping with earlier builds now. # Warn about it, but then overwrite the old pickle file log.msg("Warning: Overwriting old serialized Build at %s" % fn) os.makedirs(dirname)
""" Get the base (uncompressed) filename for this log file.
@returns: filename """
""" Return true if this logfile's contents are available. For a newly created logfile, this is always true, but for a L{LogFile} instance that has been persisted, the logfiles themselves may have been deleted, in which case this method will return False.
@returns: boolean """ os.path.exists(self.getFilename() + '.gz') or \ os.path.exists(self.getFilename())
""" Get this logfile's name
@returns: string """
""" Get the L{BuildStepStatus} instance containing this logfile
@returns: L{BuildStepStatus} instance """
""" Return true if this logfile is finished (that is, if it will not receive any additional data)
@returns: boolean """
""" Return a Deferred that will fire when this logfile is finished, or will fire immediately if the logfile is already finished. """ d = defer.succeed(self) else:
""" Get an open file object for this log. The file may also be in use for writing, so it should not be closed by the caller, and the caller should not rely on its file position remaining constant between asynchronous code segments.
@returns: file object """ # this is the filehandle we're using to write to the log, so # don't close it! # otherwise they get their own read-only handle # try a compressed log first pass pass
# this produces one ginormous string
        # generate chunks for everything that was logged at the time we were
        # first called, so remember how long the file was when we started.
        # Don't read beyond that point. The current contents of
        # self.runEntries will follow.
        # this returns an iterator, which means arbitrary things could happen
        # while we're yielding. This will faithfully deliver the log as it
        # existed when it was started, and not return anything after that
        # point. To use this in subscribe(catchup=True) without missing any
        # data, you must ensure that nothing will be added to the log during
        # yield() calls.
else: offset = 0 remaining = None
(self.runEntries[0][0] in channels)): "".join([c[1] for c in self.runEntries]))
# freeze the state of the LogFile by passing a lot of parameters into # a generator channels, onlyText)
channels, onlyText): else: data = f.read(self.BUFFERSIZE)
else: yield (channel, text) else: data = f.read(self.BUFFERSIZE)
else: yield leftover
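# The snapshot-and-re-seek reading described in the comments above can be
# sketched as follows. This is an illustration under stated assumptions, not
# the module's implementation: read_snapshot is a hypothetical helper, it
# yields raw hunks of the file rather than decoded (channel, text) chunks, and
# BUFFERSIZE is set very small so that the example produces several hunks.

```python
import io

BUFFERSIZE = 4  # tiny on purpose, so the example yields several hunks


def read_snapshot(f, buffersize=BUFFERSIZE):
    """Yield the data that was in f when the generator was first advanced."""
    f.seek(0, io.SEEK_END)
    remaining = f.tell()  # length at start time; never read past this point
    offset = 0
    while remaining > 0:
        # The handle is shared with the writer, so its position may have
        # moved while we were suspended at the yield: re-seek every time.
        f.seek(offset)
        data = f.read(min(buffersize, remaining))
        if not data:
            break
        offset = f.tell()
        remaining -= len(data)
        yield data
```

# Data appended while the generator is suspended is not delivered, which is
# why a caller that wants a gap-free catch-up has to handle later entries
# separately.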
"""Return an iterator that produces newline-terminated lines, excluding header chunks."""
        if self.finished:
            return
        self.watchers.append(receiver)
        if catchup:
            for channel, text in self.getChunks():
                # TODO: add logChunks(), to send over everything at once?
                receiver.logChunk(self.step.build, self.step, self,
                                  channel, text)
if receiver in self.watchers: self.watchers.remove(receiver)
        p = LogFileProducer(self, consumer)
        p.resumeProducing()
# interface used by the build steps to add things to the log
# merge all .runEntries (which are all of the same type) into a # single chunk for .entries
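# A minimal sketch of the merge step named in the comment above. RunBuffer is
# a hypothetical class, and keeping merged chunks in an in-memory list is an
# assumption made for illustration. Merging whenever the channel changes is
# also an assumption; it is one way to keep the invariant that all pending
# runEntries share a channel.

```python
class RunBuffer(object):
    """Hypothetical holder for pending same-channel entries."""

    def __init__(self):
        self.entries = []     # merged (channel, text) chunks
        self.runEntries = []  # pending chunks, all of one channel

    def add(self, channel, text):
        # A channel change ends the current run, so merge it first.
        if self.runEntries and self.runEntries[0][0] != channel:
            self.merge()
        self.runEntries.append((channel, text))

    def merge(self):
        if not self.runEntries:
            return
        channel = self.runEntries[0][0]
        text = "".join([c[1] for c in self.runEntries])
        self.entries.append((channel, text))
        self.runEntries = []
```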
""" Add an entry to the logfile. The C{channel} is one of L{STDOUT}, L{STDERR}, or L{HEADER}. The C{text} is the text to add to the logfile, which can be a unicode string or a bytestring which is presumed to be encoded with utf-8.
This method cannot be called after the logfile is finished.
@param channel: channel to add a chunk for @param text: chunk of text @param _no_watchers: private """
# notify watchers first, before the chunk gets munged, so that they get # a complete picture of the actual log output # TODO: is this right, or should the watchers get a picture of the chunks?
# Truncate the log if it's more than logMaxSize bytes # Add a message about what's going on and truncate this # chunk if necessary self._merge() "has been truncated\n" % logMaxSize)
# and track the tail of the text # Update the tail buffer # Drop some stuff off the beginning of the buffer
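# One way to keep a bounded tail buffer, as the comments above describe, is to
# append each chunk and then drop whole chunks from the front until the total
# length fits. TailBuffer and max_size are hypothetical names; dropping whole
# chunks (rather than trimming a chunk partway) is an assumption, and it means
# the buffer can hold somewhat less than max_size characters.

```python
from collections import deque


class TailBuffer(object):
    """Hypothetical bounded buffer of the most recent (channel, text) chunks."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.chunks = deque()
        self.length = 0  # sum of len(text) over the buffered chunks

    def append(self, channel, text):
        self.chunks.append((channel, text))
        self.length += len(text)
        # Drop some stuff off the beginning of the buffer.
        while self.length > self.max_size and self.chunks:
            _, dropped = self.chunks.popleft()
            self.length -= len(dropped)
```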
# we only add to .runEntries here. _merge() is responsible for adding # merged chunks to .entries
""" Shortcut to add stdout text to the logfile
@param text: text to add to the logfile """
""" Shortcut to add stderr text to the logfile
@param text: text to add to the logfile """
""" Shortcut to add header text to the logfile
@param text: text to add to the logfile """
""" Finish the logfile, flushing any buffers and preventing any further writes to the log. """
        # we don't do an explicit close, because there might be readers
        # sharing the filehandle. As soon as they stop reading, the
        # filehandle will be released and automatically closed.
# bail out if there's no compression support else:
        else:
            if runtime.platformType == 'win32':
                # Windows cannot rename a file on top of an existing one, so
                # fall back to delete-first. There are ways this can fail and
                # lose the builder's history, so we avoid using it in the
                # general (non-Windows) case
                if os.path.exists(filename):
                    os.unlink(filename)
        log.msg("failed to compress %s" % self.getFilename())
        if os.path.exists(compressed):
            _tryremove(compressed, 1, 5)
        failure.trap()  # reraise the failure
# persistence stuff
# self.step must be filled in by our parent
        self.step = parent
        self.name = name
        self.filename = logfilename
        self.html = html
return self.name # set in BuildStepStatus.addLog return self.step
return True return defer.succeed(self)
return True return self.html # looks kinda like text return self.html return [(STDERR, self.html)]
pass pass
pass
        d = self.__dict__.copy()
        del d['step']
        return d
    """Try to remove a file; if that fails, try again after timeout seconds.
    Each retry multiplies the timeout by 4, and at most retries further
    attempts are made.
    """
    except OSError:
        if retries > 0:
            reactor.callLater(timeout, _tryremove, filename, timeout * 4,
                              retries - 1)
        else:
            log.msg("giving up on removing %s after over %d seconds" %
                    (filename, timeout))
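# The retry-with-growing-timeout pattern used by _tryremove can be tried out
# without a running reactor by passing the scheduler in as a parameter. This
# is a sketch under that assumption: try_remove, call_later and log are
# hypothetical names, and call_later is expected to accept
# (delay, function, *args) in the same way reactor.callLater does.

```python
import os


def try_remove(filename, timeout, retries, call_later, log=lambda msg: None):
    """Remove filename, retrying with a 4x longer delay after each failure."""
    try:
        os.unlink(filename)
    except OSError:
        if retries > 0:
            # Schedule another attempt with a longer delay and one fewer retry.
            call_later(timeout, try_remove, filename, timeout * 4,
                       retries - 1, call_later, log)
        else:
            log("giving up on removing %s after over %d seconds"
                % (filename, timeout))
```

# With a starting timeout of 1 and 5 retries, the scheduled delays would be
# 1, 4, 16, 64 and 256 seconds before the function gives up.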