Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
""" It schedules a TaskCluster graph and returns its id.
:param task_graph: It is a TaskCluster graph as defined in here:
http://docs.taskcluster.net/scheduler/api-docs/#createTaskGraph
:type task_graph: json
:param task_graph_id: TC graph id to which this task belongs to
:type task_graph_id: str
:param dry_run: It does not schedule the graph
:type dry_run: bool
:returns: task graph id.
:rtype: int
"""
if not task_graph_id:
task_graph_id = taskcluster_client.slugId()
scheduler = taskcluster_client.Scheduler()
LOG.info("Outputting the graph (graph id: %s):" % task_graph_id)
# We print to stdout instead of using the standard logging with dates and info levels
# XXX: Use a different formatter for other tools to work better with this code
print(json.dumps(task_graph, indent=4))
if dry_run:
LOG.info("DRY-RUN: We have not scheduled the graph.")
else:
if not credentials_available():
return None
try:
# https://github.com/taskcluster/taskcluster-client.py#create-new-task-graph
result = scheduler.createTaskGraph(task_graph_id, task_graph)
LOG.info("See the graph in %s%s" % (TC_TASK_GRAPH_INSPECTOR, task_graph_id))
return result
def get_task_graph_status(task_graph_id):
    """Return the ``state`` reported for a task graph.

    Queries the scheduler client for the status of ``task_graph_id`` and
    returns the value stored under ``['status']['state']`` in the response.
    """
    status_response = taskcluster_client.Scheduler().status(task_graph_id)
    graph_status = status_response['status']
    return graph_status['state']
def get_task_graph_status(task_graph_id):
    """Look up a task graph's status and return its ``state`` field.

    The scheduler client's ``status`` call is made for ``task_graph_id``;
    the result is indexed with ``['status']['state']``.
    """
    tc_scheduler = taskcluster_client.Scheduler()
    reply = tc_scheduler.status(task_graph_id)
    state = reply['status']['state']
    return state
def extend_task_graph(task_graph_id, task_graph, dry_run=False):
    """Extend an existing task graph with the tasks in ``task_graph``.

    From: http://docs.taskcluster.net/scheduler/api-docs/#extendTaskGraph

    Safety: it is only safe to call this API end-point while the task-graph
    being modified is still running. If the task-graph is finished or blocked,
    this method will leave the task-graph in this state. Hence, it is only
    truly safe to call this API end-point from within a task in the task-graph
    being modified.

    :param task_graph_id: id of the task graph to extend
    :param task_graph: graph definition (a dict). Its 'metadata' and 'scopes'
        entries, when present, are left out of the request. The dict passed
        in is NOT modified.
    :param dry_run: when True, only log a message; the scheduler's
        extendTaskGraph is not called.
    :type dry_run: bool
    :returns: whatever ``scheduler.extendTaskGraph`` returns (documented
        upstream as a Task-Graph Status Response), or None on a dry run.
    """
    # XXX: handle the case when the task-graph is not running
    scheduler = taskcluster_client.Scheduler()
    if dry_run:
        LOG.info("DRY-RUN: We have not extended the graph.")
        return None

    LOG.debug("When extending a graph we don't need metadata and scopes.")
    # Send a filtered shallow copy instead of deleting keys from the caller's
    # dict: deleting in place made a second call with the same graph (or a
    # graph lacking these keys) fail with KeyError.
    extension = dict(
        (key, value) for key, value in task_graph.items()
        if key not in ('metadata', 'scopes')
    )
    # Printed to stdout (not logged) so the JSON is emitted without log prefixes.
    print(json.dumps(extension, indent=4))
    return scheduler.extendTaskGraph(task_graph_id, extension)
""" It schedules a TaskCluster graph and returns its id.
:param task_graph: It is a TaskCluster graph as defined in here:
http://docs.taskcluster.net/scheduler/api-docs/#createTaskGraph
:type task_graph: json
:param task_graph_id: TC graph id to which this task belongs to
:type task_graph_id: str
:param dry_run: It does not schedule the graph
:type dry_run: bool
:returns: task graph id.
:rtype: int
"""
if not task_graph_id:
task_graph_id = taskcluster_client.slugId()
scheduler = taskcluster_client.Scheduler()
LOG.info("Outputting the graph (graph id: %s):" % task_graph_id)
# We print to stdout instead of using the standard logging with dates and info levels
# XXX: Use a different formatter for other tools to work better with this code
print(json.dumps(task_graph, indent=4))
if dry_run:
LOG.info("DRY-RUN: We have not scheduled the graph.")
else:
if not credentials_available():
return None
try:
# https://github.com/taskcluster/taskcluster-client.py#create-new-task-graph
result = scheduler.createTaskGraph(task_graph_id, task_graph)
LOG.info("See the graph in %s%s" % (TC_TASK_GRAPH_INSPECTOR, task_graph_id))
return result
def extend_task_graph(task_graph_id, task_graph, dry_run=False):
    """Add the tasks described by ``task_graph`` to an existing task graph.

    From: http://docs.taskcluster.net/scheduler/api-docs/#extendTaskGraph

    Safety: it is only safe to call this API end-point while the task-graph
    being modified is still running. If the task-graph is finished or blocked,
    this method will leave the task-graph in this state. Hence, it is only
    truly safe to call this API end-point from within a task in the task-graph
    being modified.

    :param task_graph_id: id of the task graph to extend
    :param task_graph: graph definition (a dict). Any 'metadata' and 'scopes'
        entries are omitted from the request; the caller's dict is left
        untouched.
    :param dry_run: when True, only log a message and skip the
        extendTaskGraph call.
    :type dry_run: bool
    :returns: the result of ``scheduler.extendTaskGraph`` (documented
        upstream as a Task-Graph Status Response), or None on a dry run.
    """
    # XXX: handle the case when the task-graph is not running
    scheduler = taskcluster_client.Scheduler()
    if dry_run:
        LOG.info("DRY-RUN: We have not extended the graph.")
        return None

    LOG.debug("When extending a graph we don't need metadata and scopes.")
    # Work on a shallow copy so the caller's graph keeps its keys; popping
    # with a default also tolerates graphs that never had them, where the
    # previous in-place ``del`` raised KeyError.
    trimmed_graph = dict(task_graph)
    trimmed_graph.pop('metadata', None)
    trimmed_graph.pop('scopes', None)
    # Printed to stdout (not logged) so the JSON is emitted without log prefixes.
    print(json.dumps(trimmed_graph, indent=4))
    return scheduler.extendTaskGraph(task_graph_id, trimmed_graph)