Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
parser = argparse.ArgumentParser(description="Spawn tasks for bugbug data pipeline")
parser.add_argument("data_pipeline_json")
args = parser.parse_args()
decision_task_id = os.environ.get("TASK_ID")
options = get_taskcluster_options()
add_self = False
if decision_task_id:
add_self = True
task_group_id = decision_task_id
else:
task_group_id = taskcluster.utils.slugId()
keys = {"taskGroupId": task_group_id}
id_mapping = {}
# First pass, do the template rendering and dependencies resolution
tasks = []
with open(args.data_pipeline_json) as pipeline_file:
raw_tasks = yaml.safe_load(pipeline_file.read())
version = os.getenv("TAG", "latest")
context = {"version": version}
rendered = jsone.render(raw_tasks, context)
for task in rendered["tasks"]:
# We need to generate new unique task ids for taskcluster to be happy
:param repo_name: push revision
:type repo_name: str
:param revision: push revision
:type revision: str
:return: TC tasks
:rtype: dict
"""
tasks = []
build_builders = {}
# We need to determine what upstream jobs need to be triggered besides the
# builders already on our list
for builder in builders:
if is_upstream(builder):
properties = {'upload_to_task_id': slugId()}
# Bug 1274483 - Android multi-locale nightly builds need to upload to two different
# tasks, thus, it fails when we tell it to upload to the same task twice.
builder_details = get_buildername_metadata(builder)
if builder_details['platform_name'].startswith('android') and \
builder_details['nightly'] is True and \
'l10n' not in builder:
properties = {}
task = _create_task(
buildername=builder,
repo_name=repo_name,
revision=revision,
# task_graph_id=task_graph_id,
properties=properties,
)
raise exceptions.TaskclusterFailure('Scope must be string')
# Credentials can only be valid for 31 days. I hope that
# this is validated on the server somehow...
if expiry - start > datetime.timedelta(days=31):
raise exceptions.TaskclusterFailure('Only 31 days allowed')
# We multiply times by 1000 because the auth service is JS and as a result
# uses milliseconds instead of seconds
cert = dict(
version=1,
scopes=scopes,
start=calendar.timegm(start.utctimetuple()) * 1000,
expiry=calendar.timegm(expiry.utctimetuple()) * 1000,
seed=utils.slugId().encode('ascii') + utils.slugId().encode('ascii'),
)
# if this is a named temporary credential, include the issuer in the certificate
if name:
cert['issuer'] = utils.toStr(clientId)
sig = ['version:' + utils.toStr(cert['version'])]
if name:
sig.extend([
'clientId:' + utils.toStr(name),
'issuer:' + utils.toStr(clientId),
])
sig.extend([
'seed:' + utils.toStr(cert['seed']),
'start:' + utils.toStr(cert['start']),
'expiry:' + utils.toStr(cert['expiry']),
# First pass, do the template rendering and dependencies resolution
tasks = []
with open(args.data_pipeline_json) as pipeline_file:
raw_tasks = yaml.safe_load(pipeline_file.read())
version = os.getenv("TAG", "latest")
context = {"version": version}
rendered = jsone.render(raw_tasks, context)
for task in rendered["tasks"]:
# We need to generate new unique task ids for taskcluster to be happy
# but need to identify dependencies across tasks. So we create a
# mapping between an internal ID and the generate ID
task_id = taskcluster.utils.slugId()
task_internal_id = task.pop("ID")
if task_internal_id in id_mapping:
raise ValueError(f"Conflicting IDs {task_internal_id}")
id_mapping[task_internal_id] = task_id
for key, value in keys.items():
task[key] = value
task_payload = task["payload"]
if "env" in task_payload and task_payload["env"]:
task_payload["env"]["TAG"] = version
else:
task_payload["env"] = {
def create_task(**kwargs):
""" Create a TC task.
NOTE: This code needs to be tested for normal TC tasks to determine
if the default values would also work for non BBB tasks.
"""
task_id = kwargs.get('taskId', slugId())
task_definition = {
'taskId': task_id,
# Do not retry the task if it fails to run successfully
'reruns': kwargs.get('reruns', 0),
'task': {
'workerType': kwargs['workerType'], # mandatory
'provisionerId': kwargs['provisionerId'], # mandatory
'created': kwargs.get('created', fromNow('0d')),
'deadline': kwargs.get('deadline', fromNow('1d')),
'expires': kwargs.get('deadline', fromNow('1d')),
'payload': kwargs.get('payload', {}),
'metadata': kwargs['metadata'], # mandatory
'schedulerId': kwargs.get('schedulerId', 'task-graph-scheduler'),
'tags': kwargs.get('tags', {}),
'extra': kwargs.get('extra', {}),
:rtype: dict
"""
if not type(required_task_ids) == list:
raise MozciError("required_task_ids must be a list")
tasks = []
if type(builders_graph) != dict:
raise MozciError("The buildbot graph should be a dictionary")
# Let's iterate through the root builders in this graph
for builder, dependent_graph in builders_graph.iteritems():
# Due to bug 1221091 this will be used to know to which task
# the artifacts will be uploaded to
upload_to_task_id = slugId()
properties = {'upload_to_task_id': upload_to_task_id}
builder_details = get_buildername_metadata(builder)
# Bug 1274483 - Android multi-locale nightly builds need to upload to two different tasks,
# thus, it fails when we tell it to upload to the same task twice.
if builder_details['platform_name'].startswith('android') and \
builder_details['nightly'] is True and \
'l10n' not in builder:
properties = {}
task = _create_task(
buildername=builder,
repo_name=repo_name,
revision=revision,
metadata=metadata,
task_graph_id=task_graph_id,