Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
-------
filepath : `str`
The name of the file saved.
"""
parse = urllib.parse.urlparse(url)
try:
async with aioftp.ClientSession(parse.hostname, **kwargs) as client:
if parse.username and parse.password:
await client.login(parse.username, parse.password)
# This has to be done before we start streaming the file:
total_size = await get_ftp_size(client, parse.path)
async with client.download_stream(parse.path) as stream:
filepath, skip = get_filepath(filepath_partial(None, url), overwrite)
if skip:
return str(filepath)
if callable(file_pb):
file_pb = file_pb(position=token.n, unit='B', unit_scale=True,
desc=filepath.name, leave=False, total=total_size)
else:
file_pb = None
downloaded_chunks_queue = asyncio.Queue()
download_workers = []
writer = self.loop.create_task(
self._write_worker(downloaded_chunks_queue, file_pb, filepath))
download_workers.append(
self.loop.create_task(self._ftp_download_worker(stream, downloaded_chunks_queue))
)