Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def post(self, url, data=None, headers=None):
    """Send a POST request to the node and return the decoded reply.

    Args:
        url: Path handed to ``self.url()`` to build the request URL.
        data: Optional request body forwarded to ``requests.post``.
        headers: Optional mapping of HTTP headers. ``None`` (the default)
            means "no extra headers".

    Returns:
        The parsed JSON value when the response's Content-Type contains
        ``application/json``; otherwise the raw response object.

    Raises:
        NodeResponseError: On HTTP 401, or when the JSON body is an object
            carrying an ``error`` key.
        NodeServerError: On any status other than 200/403, or when the
            body cannot be decoded as JSON.
        NodeConnectionError: On a timeout or connection failure.
    """
    # Avoid a shared mutable default: build a fresh dict per call.
    headers = {} if headers is None else headers
    try:
        res = requests.post(self.url(url), data=data, headers=headers, timeout=self.timeout)
        if res.status_code == 401:
            raise NodeResponseError("Unauthorized. Do you need to set a token?")
        # 403 is deliberately let through.
        # NOTE(review): presumably so a JSON 'error' body can be surfaced
        # below - confirm against the node API.
        if res.status_code not in (200, 403):
            raise NodeServerError(res.status_code)

        if "application/json" in res.headers.get('Content-Type', ''):
            result = res.json()
            # Only a JSON object can carry an 'error' key; lists, strings
            # and numbers are returned as-is instead of raising TypeError.
            if isinstance(result, dict) and 'error' in result:
                raise NodeResponseError(result['error'])
            return result
        return res
    except json.decoder.JSONDecodeError as e:
        raise NodeServerError(str(e)) from e
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
        raise NodeConnectionError(str(e)) from e
# will be processed at the next tick
if self.status == status_codes.QUEUED:
logger.info("Processing node {} went offline, reassigning {}...".format(self.processing_node, self))
self.uuid = ''
self.processing_node = None
self.status = None
self.save()
elif self.status == status_codes.RUNNING:
# Task was running and processing node went offline
# It could have crashed due to low memory
# or perhaps it went offline due to network errors.
# We can't easily differentiate between the two, so we need
# to notify the user because if it crashed due to low memory
# the user might need to take action (or be stuck in an infinite loop)
raise NodeServerError("Processing node went offline. This could be due to insufficient memory or a network error.")
if self.processing_node:
# Need to process some images (UUID not yet set and task doesn't have pending actions)?
if not self.uuid and self.pending_action is None and self.status is None:
logger.info("Processing... {}".format(self))
images = [image.path() for image in self.imageupload_set.all()]
# Track upload progress, but limit the number of DB updates
# to every 2 seconds (and always record the 100% progress)
last_update = 0
def callback(progress):
nonlocal last_update
time_has_elapsed = time.time() - last_update >= 2
if time_has_elapsed:
def handle_task_new_response(self, result):
    """Interpret the reply to a task-creation request.

    Args:
        result: The decoded reply. Only a dict is considered valid.

    Returns:
        ``Task(self, result['uuid'])`` when the reply is a dict that
        contains a ``uuid`` key.

    Raises:
        NodeResponseError: When the reply is a dict without ``uuid`` but
            with an ``error`` key; the error value is the message.
        NodeServerError: For every other reply (non-dict, or a dict with
            neither key).
    """
    if not isinstance(result, dict):
        raise NodeServerError('Invalid response: ' + str(result))

    # 'uuid' takes precedence over 'error' when both are present.
    if 'uuid' in result:
        return Task(self, result['uuid'])
    if 'error' in result:
        raise NodeResponseError(result['error'])

    raise NodeServerError('Invalid response: ' + str(result))
res = requests.post(self.url(url), data=data, headers=headers, timeout=self.timeout)
if res.status_code == 401:
raise NodeResponseError("Unauthorized. Do you need to set a token?")
elif res.status_code != 200 and res.status_code != 403:
raise NodeServerError(res.status_code)
if "Content-Type" in res.headers and "application/json" in res.headers['Content-Type']:
result = res.json()
if 'error' in result:
raise NodeResponseError(result['error'])
return result
else:
return res
except json.decoder.JSONDecodeError as e:
raise NodeServerError(str(e))
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
raise NodeConnectionError(str(e))
with open(zip_path, 'wb') as fd:
for chunk in download_stream.iter_content(4096):
downloaded += len(chunk)
if time.time() - last_update >= 2:
# Update progress
if total_length is not None:
Task.objects.filter(pk=self.id).update(running_progress=(float(downloaded) / total_length) * 0.9)
self.check_if_canceled()
last_update = time.time()
fd.write(chunk)
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, ReadTimeoutError) as e:
raise NodeServerError(e)
self.refresh_from_db()
try:
self.extract_assets_and_complete()
except zipfile.BadZipFile:
raise NodeServerError("Invalid zip file")
images_json = self.assets_path("images.json")
if os.path.exists(images_json):
try:
with open(images_json) as f:
images = json.load(f)
self.images_count = len(images)
except:
logger.warning("Cannot read images count from imported task {}".format(self))
nonlocal last_update
time_has_elapsed = time.time() - last_update >= 2
if time_has_elapsed:
testWatch.manual_log_call("Task.process.callback")
self.check_if_canceled()
Task.objects.filter(pk=self.id).update(upload_progress=float(progress) / 100.0)
last_update = time.time()
# This takes a while
try:
uuid = self.processing_node.process_new_task(images, self.name, self.options, callback)
except NodeConnectionError as e:
# If we can't create a task because the node is offline
# We want to fail instead of trying again
raise NodeServerError('Connection error: ' + str(e))
# Refresh task object before committing change
self.refresh_from_db()
self.upload_progress = 1.0
self.uuid = uuid
self.save()
# TODO: log process has started processing
if self.pending_action is not None:
if self.pending_action == pending_actions.CANCEL:
# Do we need to cancel the task on the processing node?
logger.info("Canceling {}".format(self))
if self.processing_node and self.uuid:
# Attempt to cancel the task on the processing node
# We don't care if this fails (we tried)
def get(self, url, query=None, **kwargs):
    """Send a GET request to the node and return the decoded reply.

    Args:
        url: Path handed to ``self.url()`` to build the request URL.
        query: Optional mapping of query parameters, also handed to
            ``self.url()``. ``None`` (the default) means "no parameters".
        **kwargs: Extra keyword arguments forwarded to ``requests.get``.

    Returns:
        The parsed JSON value when the response's Content-Type contains
        ``application/json``; otherwise the raw response object.

    Raises:
        NodeResponseError: On HTTP 401, or when the JSON body is an object
            carrying an ``error`` key.
        NodeServerError: On any status other than 200/403/206, or when the
            body cannot be decoded as JSON.
        NodeConnectionError: On a timeout or connection failure.
    """
    # Avoid a shared mutable default: build a fresh dict per call.
    query = {} if query is None else query
    try:
        res = requests.get(self.url(url, query), timeout=self.timeout, **kwargs)
        if res.status_code == 401:
            raise NodeResponseError("Unauthorized. Do you need to set a token?")
        # 403 is deliberately let through.
        # NOTE(review): presumably so a JSON 'error' body can be surfaced
        # below - confirm against the node API.
        if res.status_code not in (200, 403, 206):
            raise NodeServerError("Unexpected status code: %s" % res.status_code)

        if "application/json" in res.headers.get('Content-Type', ''):
            result = res.json()
            # Only a JSON object can carry an 'error' key; lists, strings
            # and numbers are returned as-is instead of raising TypeError.
            if isinstance(result, dict) and 'error' in result:
                raise NodeResponseError(result['error'])
            return result
        return res
    except json.decoder.JSONDecodeError as e:
        raise NodeServerError(str(e)) from e
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
        raise NodeConnectionError(str(e)) from e
# to process this the next tick
self.uuid = ''
# We also remove the "rerun-from" parameter if it's set
self.options = list(filter(lambda d: d['name'] != 'rerun-from', self.options))
self.upload_progress = 0
self.console_output = ""
self.processing_time = -1
self.status = None
self.last_error = None
self.pending_action = None
self.running_progress = 0
self.save()
else:
raise NodeServerError("Cannot restart a task that has no processing node")
elif self.pending_action == pending_actions.REMOVE:
logger.info("Removing {}".format(self))
if self.processing_node and self.uuid:
# Attempt to delete the resources on the processing node
# We don't care if this fails, as resources on processing nodes
# Are expected to be purged on their own after a set amount of time anyway
try:
self.processing_node.remove_task(self.uuid)
except OdmError:
pass
# What's more important is that we delete our task properly here
self.delete()
# Stop right here!
# Rename to all.zip
all_zip_path = self.assets_path("all.zip")
os.rename(zip_path, all_zip_path)
logger.info("Extracting all.zip for {}".format(self))
try:
self.extract_assets_and_complete()
extracted = True
except zipfile.BadZipFile:
if retry_num < 4:
logger.warning("{} seems corrupted. Retrying...".format(all_zip_path))
retry_num += 1
os.remove(all_zip_path)
else:
raise NodeServerError("Invalid zip file")
else:
# FAILED, CANCELED
self.save()
else:
# Still waiting...
self.save()
except (NodeServerError, NodeResponseError) as e:
self.set_failure(str(e))
except NodeConnectionError as e:
logger.warning("{} connection/timeout error: {}. We'll try reprocessing at the next tick.".format(self, str(e)))
except TaskInterruptedException as e:
# Task was interrupted during image resize / upload
logger.warning("{} interrupted".format(self, str(e)))