Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def worker(fname):
_client = aioftp.Client(loop=loop, ssl=client.ssl)
await _client.connect("127.0.0.1", PORT)
await _client.login()
await _client.download(
"tests/foo/foo.txt",
str.format("tests/foo/{}", fname),
write_into=True
)
await _client.quit()
async def worker(fname):
_client = aioftp.Client(loop=loop, ssl=client.ssl)
await _client.connect("127.0.0.1", PORT)
await _client.login()
await _client.upload(
"tests/foo/foo.txt",
str.format("tests/foo/{}", fname),
write_into=True
)
await _client.quit()
def test_user_manager_timeout():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
server = aioftp.Server(SlowUserManager(None, timeout=1, loop=loop),
loop=loop)
client = aioftp.Client(loop=loop)
async def coro():
try:
await server.start(None, 8888)
await client.connect("127.0.0.1", 8888)
await client.login()
finally:
await server.close()
loop.run_until_complete(coro())
def test_parse_list_line_failed():
with pytest.raises(ValueError):
aioftp.Client(encoding="utf-8").parse_list_line(b"what a hell?!")
def run_in_loop(s_args, s_kwargs, c_args, c_kwargs, s_ssl=None, c_ssl=None):
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(message)s",
datefmt="[%H:%M:%S]:",
)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
server = aioftp.Server(*s_args, loop=loop, ssl=s_ssl, **s_kwargs)
client = aioftp.Client(*c_args, loop=loop, ssl=c_ssl, **c_kwargs)
try:
loop.run_until_complete(f(loop, client, server))
finally:
if hasattr(server, "server"):
loop.run_until_complete(server.close())
if hasattr(client, "writer"):
client.close()
loop.close()
async def ftp_fetch(host, path, outfile, digest_func):
client = aioftp.Client()
await client.connect(host)
await client.login("anonymous", "drobbins@funtoo.org")
fd = open(outfile, 'wb')
hash = digest_func()
if not await client.exists(path):
return ("ftp_missing", None)
stream = await client.download_stream(path)
async for block in stream.iter_by_block(chunk_size):
sys.stdout.write(".")
sys.stdout.flush()
fd.write(block)
hash.update(block)
await stream.finish()
cur_digest = hash.hexdigest()
await client.quit()
fd.close()