Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
rev=rev,
tree=tree,
product=artifact_job_class.product,
job=job,
)
self.log(logging.INFO, 'artifact',
{'namespace': namespace},
'Searching Taskcluster index with namespace: {namespace}')
try:
taskId = find_task_id(namespace)
except KeyError:
# Not all revisions correspond to pushes that produce the job we
# care about; and even those that do may not have completed yet.
raise ValueError('Task for {namespace} does not exist (yet)!'.format(namespace=namespace))
return taskId, list_artifacts(taskId)
def install_from_task(self, taskId, distdir):
    """Try to install from each candidate artifact of the task ``taskId``.

    The task's artifacts are listed, narrowed down with
    ``self._artifact_job.find_candidate_artifacts``, and turned into
    download URLs. Each URL is then passed, in order, to
    ``self.install_from_url`` together with ``distdir``.

    Returns 1 as soon as an ``install_from_url`` call returns a truthy
    value (remaining URLs are not tried), otherwise 0.

    Raises ValueError if the task yields no candidate artifacts.
    """
    task_artifacts = list_artifacts(taskId)

    # We can easily extract the task ID from the URL. We can't easily
    # extract the build ID; we use the .ini files embedded in the
    # downloaded artifact for this.
    candidate_urls = [
        get_artifact_url(taskId, name)
        for name in self._artifact_job.find_candidate_artifacts(task_artifacts)
    ]

    if not candidate_urls:
        raise ValueError('Task {taskId} existed, but no artifacts found!'.format(taskId=taskId))

    # any() short-circuits, so URLs after the first truthy result are
    # never handed to install_from_url.
    hit = any(self.install_from_url(candidate, distdir) for candidate in candidate_urls)
    return 1 if hit else 0