Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
server_args=([(aioftp.User(maximum_connections=4),)], {}))
@expect_codes_in_exception("530")
@with_connection
async def test_multiply_connections_limited_error(loop, client, server):
clients = [aioftp.Client(loop=loop, ssl=client.ssl) for _ in range(5)]
for client in clients:
await client.connect("127.0.0.1", PORT)
await client.login()
for client in clients:
await client.quit()
[[aioftp.User(read_speed_limit_per_connection=200 * 1024)]],
{},
))
@with_connection
@with_tmp_dir("foo")
async def test_server_user_per_connection_read_throttle_multi_users(loop,
client,
server, *,
tmp_dir):
async def worker(fname):
_client = aioftp.Client(loop=loop, ssl=client.ssl)
await _client.connect("127.0.0.1", PORT)
await _client.login()
await _client.upload(
"tests/foo/foo.txt",
[(aioftp.User(base_path="tests/foo", home_path="/"),)],
{}))
@with_connection
@expect_codes_in_exception("503")
@with_tmp_dir("foo")
async def test_wait_pasv_timeout_fail_long(loop, client, server, *, tmp_dir):
f = tmp_dir / "foo.txt"
b = b"foobar"
await client.login()
await client.command("STOR " + f.name)
await asyncio.sleep(2, loop=loop)
reader, writer = await client.get_passive_connection("I")
with contextlib.closing(writer) as writer:
def test_reprs_works():
repr(aioftp.Throttle())
repr(aioftp.Permission())
repr(aioftp.User())
def test_user_not_absolute_home():
with pytest.raises(aioftp.errors.PathIsNotAbsolute):
aioftp.User(home_path="foo")
[[aioftp.User(write_speed_limit_per_connection=200 * 1024)]],
{},
))
@with_connection
@with_tmp_dir("foo")
async def test_server_user_per_connection_write_throttle_multi_users(loop,
client,
server, *,
tmp_dir):
async def worker(fname):
_client = aioftp.Client(loop=loop, ssl=client.ssl)
await _client.connect("127.0.0.1", PORT)
await _client.login()
await _client.download(
"tests/foo/foo.txt",
[[aioftp.User(read_speed_limit=200 * 1024)]],
{},
))
@with_connection
@with_tmp_dir("foo")
async def test_server_user_global_read_throttle_multi_users(loop, client,
server, *,
tmp_dir):
async def worker(fname):
_client = aioftp.Client(loop=loop, ssl=client.ssl)
await _client.connect("127.0.0.1", PORT)
await _client.login()
await _client.upload(
"tests/foo/foo.txt",
str.format("tests/foo/{}", fname),
[(aioftp.User(base_path="tests/foo", home_path="/"),)],
{}))
@with_connection
@expect_codes_in_exception("425")
@with_tmp_dir("foo")
async def test_wait_pasv_timeout_ok_but_too_long(loop, client, server, *,
tmp_dir):
f = tmp_dir / "foo.txt"
b = b"foobar"
await client.login()
await client.command("TYPE I", "200")
code, info = await client.command("PASV", "227")
ip, port = client.parse_pasv_response(info[-1])
await client.command("STOR " + f.name)
args = parser.parse_args()
print(f"aioftp v{aioftp.__version__}")
if not args.quiet:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(message)s",
datefmt="[%H:%M:%S]:",
)
if args.memory:
user = aioftp.User(args.login, args.password, base_path="/")
path_io_factory = aioftp.MemoryPathIO
else:
if args.home:
user = aioftp.User(args.login, args.password, base_path=args.home)
else:
user = aioftp.User(args.login, args.password)
path_io_factory = aioftp.PathIO
family = {
"ipv4": socket.AF_INET,
"ipv6": socket.AF_INET6,
"auto": socket.AF_UNSPEC,
}[args.family]
server = aioftp.Server([user], path_io_factory=path_io_factory)
loop = asyncio.get_event_loop()
loop.run_until_complete(server.start(args.host, args.port, family=family))
try:
loop.run_forever()
except KeyboardInterrupt:
loop.run_until_complete(server.close())