Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _thd_parse_volumes(self, volumes):
volume_list = []
for volume_string in (volumes or []):
try:
_, volume = volume_string.split(":", 1)
except ValueError:
config.error("Invalid volume definition for docker "
"%s. Skipping..." % volume_string)
continue
if volume.endswith(':ro') or volume.endswith(':rw'):
volume = volume[:-3]
volume_list.append(volume)
return volume_list, volumes
def __init__(self, name, password, connection, hd_image, base_image=None, xml=None,
             **kwargs):
    """Initialise the worker and remember its connection and image settings.

    ``name``, ``password`` and any extra keyword arguments are forwarded
    to the parent initialiser.  A configuration error is reported when the
    ``libvirt`` module is not available.  The result of
    ``self._find_existing_instance()`` is kept in
    ``self._find_existing_deferred``.
    """
    super().__init__(name, password, **kwargs)

    if not libvirt:
        config.error(
            "The python module 'libvirt' is needed to use a LibVirtWorker")

    # Runtime state: nothing is attached or ready yet.
    self.domain = None
    self.ready = False

    # Fixed behaviour flags.
    self.cheap_copy = True
    self.graceful_shutdown = False

    # Caller-supplied settings (note: ``hd_image`` is stored as ``image``).
    self.connection = connection
    self.image = hd_image
    self.base_image = base_image
    self.xml = xml

    # Kept last so every attribute above is already set when it runs.
    self._find_existing_deferred = self._find_existing_instance()
# NOTE(review): fragment - the enclosing definition (and the origin of
# `codebases`) is not visible here.
# Validate `codebases`, normalise it, then store it on the instance.
# Attribute names a single codebase entry is allowed to use.
known_keys = set(['branch', 'repository', 'revision'])
if codebases is None:
config.error("Codebases cannot be None")
elif isinstance(codebases, list):
# A list is converted into a dict mapping each item to an empty dict.
codebases = dict((codebase, {}) for codebase in codebases)
elif not isinstance(codebases, dict):
config.error(
"Codebases must be a dict of dicts, or list of strings")
else:
# A dict was given: every value must be a dict that uses only known keys.
for codebase, attrs in codebases.items():
if not isinstance(attrs, dict):
config.error("Codebases must be a dict of dicts")
else:
# Any key outside `known_keys` is reported by name.
unk = set(attrs) - known_keys
if unk:
config.error(
"Unknown codebase keys %s for codebase %s"
% (', '.join(unk), codebase))
self.codebases = codebases
# internal variables
self._change_consumer = None
self._enable_consumer = None
self._change_consumption_lock = defer.DeferredLock()
self.enabled = True
# NOTE(review): fragment - the call below is the body of a check whose
# condition is not visible here.
config.error("extraHeaders must be a dictionary")
# Keep the supplied options on the instance.
self.extraHeaders = extraHeaders
self.addPatch = addPatch
self.useTls = useTls
self.smtpUser = smtpUser
self.smtpPassword = smtpPassword
self.smtpPort = smtpPort
self.buildSetSummary = buildSetSummary
# These start out unset/empty at construction time.
self.buildSetSubscription = None
# The `previousBuildGetter` argument is stored under a different name.
self.getPreviousBuild = previousBuildGetter
self.watched = []
self.master_status = None
# you should either limit on builders or categories, not both
# NOTE(review): `!= None` compares by equality; `is not None` is the usual
# idiom for this check.
if self.builders != None and self.categories != None:
config.error(
"Please specify only builders or categories to include - " +
"not both.")
# Any truthy `customMesg` is rejected with a pointer to its replacement.
if customMesg:
config.error(
"customMesg is deprecated; use messageFormatter instead")
# NOTE(review): fragment - the enclosing definition is not visible here.
# Store the supplied settings on the instance.
self.encoding = encoding
self.gitbin = gitbin
# The work directory name is `name` with a '-work' suffix appended.
self.workdir = name+'-work'
self.usetimestamps = usetimestamps
self.category = category
self.project = project
# Counter and collections start out empty.
self.changeCount = 0
self.lastRev = {}
self.lastRevs = {}
self.pull_requests = []
# Header dict built by prefixing `token` with 'token '.
self.auth_header = {'Authorization': 'token ' + token}
# Passing any `fetch_refspec` other than None is reported as an error.
if fetch_refspec is not None:
config.error("GitPoller: fetch_refspec is no longer supported. "
"Instead, only the given branches are downloaded.")
# NOTE(review): fragment - the enclosing signature is not visible here.
# When no name is given, build one from the P4 port and base.
if name is None:
name = "P4Source:%s:%s" % (p4port, p4base)
super().__init__(name=name,
pollInterval=pollInterval,
pollAtLaunch=pollAtLaunch)
# A missing project is treated as the empty string.
if project is None:
project = ''
# Ticket authentication is only accepted together with a password.
if use_tickets and not p4passwd:
config.error(
"You need to provide a P4 password to use ticket authentication")
# Both hooks must be callables.
if not callable(revlink):
config.error(
"You need to provide a valid callable for revlink")
if not callable(resolvewho):
config.error(
"You need to provide a valid callable for resolvewho")
# Store the supplied settings on the instance.
# NOTE(review): `revlink` and `resolvewho` are validated above but not
# stored in the lines visible here.
self.p4port = p4port
self.p4user = p4user
self.p4passwd = p4passwd
self.p4base = p4base
self.p4bin = p4bin
self.split_file = split_file
self.encoding = encoding
# `project` is passed through util.bytes2unicode before being stored.
self.project = util.bytes2unicode(project)
self.use_tickets = use_tickets
self.ticket_login_interval = ticket_login_interval
def __init__(self, *args, **kwargs):
    """Pull the ``tags`` and ``platform`` keywords off before base setup.

    ``tags`` is optional and stored as a new list; ``platform`` is required
    (a missing key raises ``KeyError``) and must be a ``Platform`` instance,
    otherwise a configuration error is reported.  All remaining arguments
    are forwarded to the parent initialiser.
    """
    raw_tags = kwargs.pop('tags', [])
    self.tags = list(raw_tags)

    self.platform = kwargs.pop('platform')
    platform_ok = isinstance(self.platform, Platform)
    if not platform_ok:
        config.error('`platform` must be an instance of Platform')

    super().__init__(*args, **kwargs)
def __init__(self, logfiles=None, lazylogfiles=False, log_eval_func=None,
             *args, **kwargs):
    """Initialise the step and record its log-handling options.

    Args:
        logfiles: optional dict merged on top of a copy of the class-level
            ``logfiles`` attribute.  ``None`` (the default) means "no
            extra log files".
        lazylogfiles: stored unchanged on the instance.
        log_eval_func: optional callable, stored unchanged; a truthy
            non-callable value is reported as a configuration error.
        *args, **kwargs: forwarded to ``BuildStep.__init__``.
    """
    BuildStep.__init__(self, *args, **kwargs)

    # A ``{}`` literal in the signature would be a single dict object shared
    # by every call; a None sentinel avoids that while behaving the same for
    # callers that omit the argument.
    if logfiles is None:
        logfiles = {}

    if logfiles and not isinstance(logfiles, dict):
        config.error(
            "the ShellCommand 'logfiles' parameter must be a dictionary")

    # merge a class-level 'logfiles' attribute with one passed in as an
    # argument (copy first so the class-level dict is never modified)
    self.logfiles = self.logfiles.copy()
    self.logfiles.update(logfiles)
    self.lazylogfiles = lazylogfiles

    if log_eval_func and not callable(log_eval_func):
        config.error(
            "the 'log_eval_func' paramater must be a callable")
    self.log_eval_func = log_eval_func

    self.addLogObserver('stdio', OutputProgressObserver("output"))
unimportantSchedulerNames=None, **kwargs):
# NOTE(review): fragment - the line above is the tail of a signature whose
# beginning is not visible here.
# Missing scheduler name lists are treated as empty lists.
if schedulerNames is None:
schedulerNames = []
if unimportantSchedulerNames is None:
unimportantSchedulerNames = []
# At least one scheduler name is required.
if not schedulerNames:
config.error(
"You must specify a scheduler to trigger")
# The next three checks reject each pairwise combination of: explicit source
# stamp(s), an explicit `updateSourceStamp`, and `alwaysUseLatest`.
if (sourceStamp or sourceStamps) and (updateSourceStamp is not None):
config.error(
"You can't specify both sourceStamps and updateSourceStamp")
if (sourceStamp or sourceStamps) and alwaysUseLatest:
config.error(
"You can't specify both sourceStamps and alwaysUseLatest")
if alwaysUseLatest and (updateSourceStamp is not None):
config.error(
"You can't specify both alwaysUseLatest and updateSourceStamp"
)
# Every "unimportant" name must also appear in `schedulerNames`.
if not set(schedulerNames).issuperset(set(unimportantSchedulerNames)):
config.error(
"unimportantSchedulerNames must be a subset of schedulerNames"
)
self.schedulerNames = schedulerNames
self.unimportantSchedulerNames = unimportantSchedulerNames
# NOTE(review): when a non-empty `sourceStamps` list is passed, `or` returns
# that same list, so the append below modifies the caller's object.
self.sourceStamps = sourceStamps or []
if sourceStamp:
self.sourceStamps.append(sourceStamp)
# An explicit `updateSourceStamp` wins; otherwise it defaults to True only
# when neither `alwaysUseLatest` nor any source stamp was supplied.
if updateSourceStamp is not None:
self.updateSourceStamp = updateSourceStamp
else:
self.updateSourceStamp = not (alwaysUseLatest or self.sourceStamps)
def checkConfig(self, baseURL, headers, report_on, dont_report_on,
**kwargs):
if not isinstance(baseURL, str):
config.error('`baseURL` must be an instrance of str')
if not isinstance(headers, dict):
config.error('`headers` must be an instrance of dict')
# validating report on events sets
args = [('report_on', report_on),
('dont_report_on', dont_report_on)]
for name, value in args:
if value is None:
continue
elif not isinstance(value, collections.abc.Set):
config.error(f'`{name}` argument must be an instanse of set')
elif not value.issubset(_statuses):
invalids = value - _statuses
config.error(f'`{name}` contains invalid elements: {invalids}')
if report_on and dont_report_on:
config.error('Ambiguously both `report_on` and `dont_report_on` '
'are defined, please pass either `report_on` or '