Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testReadlines(self):
l = MyLog(self.basedir, "chunks1")
l.addHeader("HEADER\n") # should be ignored
l.addStdout("Some text\n")
l.addStdout("Some More Text\nAnd Some More\n")
l.addStderr("Some Stderr\n")
l.addStdout("Last line\n")
l.finish()
alllines = list(l.readlines())
self.failUnlessEqual(len(alllines), 4)
self.failUnlessEqual(alllines[0], "Some text\n")
self.failUnlessEqual(alllines[2], "And Some More\n")
self.failUnlessEqual(alllines[3], "Last line\n")
stderr = list(l.readlines(interfaces.LOG_CHANNEL_STDERR))
self.failUnlessEqual(len(stderr), 1)
self.failUnlessEqual(stderr[0], "Some Stderr\n")
lines = l.readlines()
if False: # TODO: l.readlines() is not yet an iterator
# verify that it really is an iterator
line0 = lines.next()
self.failUnlessEqual(line0, "Some text\n")
line1 = lines.next()
del line1
line2 = lines.next()
self.failUnlessEqual(line2, "And Some More\n")
self.slaves = {}
if self.basedir is None:
self.basedir = self.mktemp()
basedir = self.basedir
os.makedirs(basedir)
self.master = master.BuildMaster(basedir, **kwargs)
spec = dbspec.DBSpec.from_url("sqlite:///state.sqlite", basedir=basedir)
sm = schema.DBSchemaManager(spec, basedir)
sm.upgrade(quiet=True)
self.master.loadDatabase(spec)
self.master.readConfig = True
self.master.startService()
self.status = self.master.getStatus()
self.control = interfaces.IControl(self.master)
def logChunk(self, build, step, log, channel, text):
if channel == interfaces.LOG_CHANNEL_STDOUT:
self.outReceived(text)
elif channel == interfaces.LOG_CHANNEL_STDERR:
self.errReceived(text)
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Portions Copyright Buildbot Team Members
from zope.interface import implementer
from buildbot import interfaces
from buildbot.util import service
@implementer(interfaces.IMachine)
class Machine(service.BuildbotService):
def checkConfig(self, name, **kwargs):
super().checkConfig(**kwargs)
self.name = name
self.workers = []
def reconfigService(self, name, **kwargs):
super().reconfigService(**kwargs)
assert self.name == name
def registerWorker(self, worker):
assert worker.machine_name == self.name
self.workers.append(worker)
def unregisterWorker(self, worker):
def __init__(self, name, builderNames, properties=None,
codebases=DEFAULT_CODEBASES):
super(BaseScheduler, self).__init__(name=name)
ok = True
if interfaces.IRenderable.providedBy(builderNames):
pass
elif isinstance(builderNames, (list, tuple)):
for b in builderNames:
if not isinstance(b, str) and \
not interfaces.IRenderable.providedBy(b):
ok = False
else:
ok = False
if not ok:
config.error(
"The builderNames argument to a scheduler must be a list "
"of Builder names or an IRenderable object that will render"
"to a list of builder names.")
self.builderNames = builderNames
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., 51
# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# Copyright Buildbot Team Members
from twisted.internet import defer
from zope.interface import implementer
from buildbot import interfaces
from buildbot.data import resultspec
from buildbot.status.buildrequest import BuildRequestStatus
@implementer(interfaces.IBuildSetStatus)
class BuildSetStatus:
def __init__(self, bsdict, status):
self.id = bsdict['bsid']
self.bsdict = bsdict
self.status = status
self.master = status.master
# methods for our clients
def getReason(self):
return self.bsdict['reason']
def getResults(self):
return self.bsdict['results']
data = ''
ss = build.getSourceStamp()
if ss:
data += 'CHANGES:\n'
data += '\n\n'.join([c.asText() for c in ss.changes])
data += '\n\n'
else:
data += 'NO SOURCE STAMP (CHANGES UNAVAILABLE)'
data += '\n\n'
return data
class InformativeMailNotifier(mail.MailNotifier):
"""MailNotifier subclass which provides additional information about the
build failure inside the email."""
implements(interfaces.IEmailSender)
compare_attrs = (mail.MailNotifier.compare_attrs +
["num_lines", "only_failure_logs"])
# Remove messageFormatter from the compare_attrs, that would lead to
# recursion, and is checked by the class test.
compare_attrs.remove("messageFormatter")
def __init__(self,
num_lines = 10, only_failure_logs = True,
*attrs, **kwargs):
mail.MailNotifier.__init__(self,
messageFormatter=self.informative_formatter,
*attrs, **kwargs)
self.num_lines = num_lines
self.only_failure_logs = only_failure_logs
span.stderr {
font-family: "Courier New", courier, monotype;
color: red;
}
span.header {
font-family: "Courier New", courier, monotype;
color: blue;
}
"""
class ChunkConsumer:
if implements:
implements(interfaces.IStatusLogConsumer)
else:
__implements__ = interfaces.IStatusLogConsumer,
def __init__(self, original, textlog):
self.original = original
self.textlog = textlog
def registerProducer(self, producer, streaming):
self.producer = producer
self.original.registerProducer(producer, streaming)
def unregisterProducer(self):
self.original.unregisterProducer()
def writeChunk(self, chunk):
formatted = self.textlog.content([chunk])
try:
self.original.write(formatted)
except pb.DeadReferenceError:
self.producing.stopProducing()
def finish(self):
bc = self.getControl(which)
who = None # TODO: if we can authenticate that a particular User
# asked for this, use User Name instead of None so they'll
# be informed of the results.
# TODO: or, monitor this build and announce the results through the
# 'reply' argument.
r = "forced: by IRC user <%s>: %s" % (user, reason)
# TODO: maybe give certain users the ability to request builds of
# certain branches
s = SourceStamp(branch=branch, revision=revision)
req = BuildRequest(r, s, which)
try:
bc.requestBuildSoon(req)
except interfaces.NoSlaveError:
self.reply(reply,
"sorry, I can't force a build: all slaves are offline")
return
ireq = IrcBuildRequest(self, reply)
req.subscribe(ireq.started)
def logChunk(self, build, step, log, channel, text):
if channel == interfaces.LOG_CHANNEL_STDOUT:
self.outReceived(text)
elif channel == interfaces.LOG_CHANNEL_STDERR:
self.errReceived(text)