Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
download_workers = []
writer = self.loop.create_task(
self._write_worker(downloaded_chunks_queue, file_pb, filepath))
download_workers.append(
self.loop.create_task(self._ftp_download_worker(stream, downloaded_chunks_queue))
)
await asyncio.gather(*download_workers)
await downloaded_chunks_queue.join()
writer.cancel()
return str(filepath)
except Exception as e:
raise FailedDownload(filepath_partial, url, e)
else:
download_workers.append(
self.loop.create_task(self._http_download_worker(
session, url, chunksize, None, timeout, downloaded_chunk_queue, **kwargs
))
)
# run all the download workers
await asyncio.gather(*download_workers)
# join() waits till all the items in the queue have been processed
await downloaded_chunk_queue.join()
writer.cancel()
return str(filepath)
except Exception as e:
raise FailedDownload(filepath_partial, url, e)
def __str__(self):
    """
    Build the human-readable form of this object.

    The text starts with the parent class's ``__repr__`` output. When
    ``self.errors`` is non-empty, an ``Errors:`` section follows with one
    entry per recorded error:

    * ``FailedDownload`` entries show the URL and the response summary
      returned by ``self._get_nice_resp_repr``, each ending in a newline.
    * Any other entry is shown as its ``repr`` wrapped in parentheses.

    Returns
    -------
    `str`
        The assembled description.
    """
    # Gather the fragments in order and join them once at the end.
    pieces = [super().__repr__()]
    if self.errors:
        pieces.append('\nErrors:\n')
        for error in self.errors:
            if not isinstance(error, FailedDownload):
                # NOTE(review): unlike the FailedDownload entries, this one
                # gets no trailing newline, so consecutive entries of this
                # kind run together on one line -- confirm that is intended.
                pieces.append("({})".format(repr(error)))
                continue
            resp = self._get_nice_resp_repr(error.exception)
            pieces.append(f"(url={error.url}, response={resp})\n")
    return "".join(pieces)
"""
timeouts = timeouts or {"total": os.environ.get("PARFIVE_TOTAL_TIMEOUT", 5 * 60),
"sock_read": os.environ.get("PARFIVE_SOCK_READ_TIMEOUT", 90)}
try:
future = self.run_until_complete(self._run_download(timeouts))
finally:
self.loop.stop()
dlresults = future.result()
results = Results()
# Iterate through the results and store any failed download errors in
# the errors list of the results object.
for res in dlresults:
if isinstance(res, FailedDownload):
results.add_error(res.filepath_partial, res.url, res.exception)
elif isinstance(res, Exception):
raise res
else:
results.append(res)
return results
filepath : `str`
The name of the file saved.
"""
timeout = aiohttp.ClientTimeout(**timeouts)
try:
scheme = urllib.parse.urlparse(url).scheme
if 'HTTP_PROXY' in os.environ and scheme == 'http':
kwargs['proxy'] = os.environ['HTTP_PROXY']
elif 'HTTPS_PROXY' in os.environ and scheme == 'https':
kwargs['proxy'] = os.environ['HTTPS_PROXY']
async with session.get(url, timeout=timeout, **kwargs) as resp:
if resp.status != 200:
raise FailedDownload(filepath_partial, url, resp)
else:
filepath, skip = get_filepath(filepath_partial(resp, url), overwrite)
if skip:
return str(filepath)
if callable(file_pb):
file_pb = file_pb(position=token.n, unit='B', unit_scale=True,
desc=filepath.name, leave=False,
total=get_http_size(resp))
else:
file_pb = None
# This queue will contain the downloaded chunks and their offsets
# as tuples: (offset, chunk)
downloaded_chunk_queue = asyncio.Queue()
download_workers = []