Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_server_side_throttle(pair_factory, skip_sleep, times, users,
                                    throttle_direction, data_direction,
                                    throttle_level):
    """Run ``users`` concurrent transfers against a throttled server.

    The throttle object found at
    ``pair.server.<throttle_level>.<throttle_direction>`` gets a limit of
    ``SIZE / times``.  Every client then performs ``data_direction``
    (looked up as a client method) on its own ``SIZE``-byte file, and the
    total recorded by ``skip_sleep`` is compared with the expectation.
    """
    # NOTE(review): this snippet arrived without indentation; the final
    # check is placed after the ``async with`` block -- confirm upstream.
    async with pair_factory() as pair:
        names = [f"foo{index}" for index in range(users)]
        for filename in names:
            await pair.make_server_files(filename, size=SIZE)

        level = getattr(pair.server, throttle_level)
        throttle = getattr(level, throttle_direction)
        throttle.limit = SIZE / times

        clients = []
        for filename in names:
            client = aioftp.Client(path_io_factory=aioftp.MemoryPathIO)
            # Seed the client's in-memory file system with a file of the
            # same name so either transfer direction has a source.
            async with client.path_io.open(Path(filename), "wb") as fout:
                await fout.write(b"-" * SIZE)
            await client.connect(pair.server.server_host,
                                 pair.server.server_port)
            await client.login()
            clients.append(client)

        transfers = [
            getattr(client, data_direction)(filename)
            for client, filename in zip(clients, names)
        ]
        await asyncio.gather(*transfers)
        await asyncio.gather(*(client.quit() for client in clients))

    # Only these (throttle side, transfer kind) pairs are expected to sleep.
    throttled = {("read", "upload"), ("write", "download")}
    if (throttle_direction, data_direction) in throttled:
        expected = times
        if throttle_level == "throttle":  # global
            expected *= users
    else:
        expected = 0
    assert skip_sleep.is_close(expected)
from aioftp.server import MemoryUserManager
# The server is configured with ``idle_timeout`` of 1; the client stays
# silent for 2 seconds and then tries to log in, which is expected to
# fail with ConnectionResetError.
# NOTE(review): ``asyncio.sleep(..., loop=...)`` is only accepted by
# Python < 3.10 -- this test targets the older explicit-loop API.
@nose.tools.raises(ConnectionResetError)
@aioftp_setup(
    server_args=(
        [(aioftp.User(base_path="tests/foo", home_path="/"),)],
        {"idle_timeout": 1},
    )
)
@with_connection
async def test_idle_timeout(loop, client, server):
    # Stay idle for longer than the configured timeout before talking.
    await asyncio.sleep(2, loop=loop)
    await client.login()
class SlowMemoryPathIO(aioftp.MemoryPathIO):
    """Memory path IO whose ``mkdir`` and ``_open`` only sleep for 10 s.

    Both overrides keep the ``universal_exception`` / ``with_timeout``
    decorator stack and do nothing except wait on ``self.loop``.
    Presumably used to provoke path-IO timeouts; the tests that use this
    class are outside this view.
    """

    @aioftp.pathio.universal_exception
    @aioftp.with_timeout
    async def mkdir(self, path, parents=False, exist_ok=False):
        # No directory is created: just stall.
        await asyncio.sleep(10, loop=self.loop)

    @aioftp.pathio.universal_exception
    @aioftp.with_timeout
    async def _open(self, path, mode):
        # No file is opened: just stall.
        await asyncio.sleep(10, loop=self.loop)
@aioftp_setup(
server_args=(
if path.is_dir():
sh.rm("-rdf", str(path))
cov = pathlib.Path(".coverage")
if cov.exists():
cov.unlink()
run_tests = sh.Command("nosetests")
for el in dir(aioftp):
item = getattr(aioftp, el)
if inspect.isclass(item) and issubclass(item, aioftp.AbstractPathIO) and \
item not in (aioftp.AbstractPathIO, aioftp.MemoryPathIO):
print(str.format("Testing {}", el))
new_env = os.environ.copy()
new_env["AIOFTP_TESTS"] = el
p = run_tests(
str.format("--exclude={}", __file__),
"--stop",
"--no-byte-compile",
"--with-coverage",
"--cover-package=aioftp",
"--logging-format='%(asctime)s %(message)s'",
"--logging-datefmt='[%H:%M:%S]:'",
"--logging-level=INFO",
_env=new_env,
_err=lambda line: print(str.strip(line)),
_out=lambda line: print(str.strip(line)),
assert await pair.server_paths_exists("bar") is False
assert await pair.server_paths_exists("baz")
@pytest.mark.asyncio
async def test_rename_non_empty_directory(pair_factory):
    """Rename server directory ``bar`` (holding ``foo.txt``) to ``hurr/baz``.

    After the rename the file must be reachable at ``hurr/baz/foo.txt``
    and the old ``bar`` path must be reported as absent.
    """
    async with pair_factory() as pair:
        await pair.make_server_files("bar/foo.txt")
        source_present = await pair.server_paths_exists("bar/foo.txt", "bar")
        assert source_present

        # The destination's parent has to exist before renaming into it.
        await pair.client.make_directory("hurr")
        await pair.client.rename("bar", "hurr/baz")

        moved_present = await pair.server_paths_exists("hurr/baz/foo.txt")
        assert moved_present
        old_present = await pair.server_paths_exists("bar")
        assert old_present is False
class FakeErrorPathIO(aioftp.MemoryPathIO):
    """Memory path IO whose ``list`` returns a lister that always raises."""

    def list(self, path):
        """Return an async lister that raises on its first ``__anext__``.

        ``path`` is ignored; the lister is built with this object's
        ``timeout``.
        """

        class Lister(aioftp.AbstractAsyncLister):

            @aioftp.pathio.universal_exception
            async def __anext__(self):
                # Fail unconditionally with a plain Exception.
                raise Exception("KERNEL PANIC")

        failing_lister = Lister(timeout=self.timeout)
        return failing_lister
@pytest.mark.asyncio
async def test_exception_in_list(pair_factory, Server,
expect_codes_in_exception):
s = Server(path_io_factory=FakeErrorPathIO)
async with pair_factory(None, s) as pair:
def test_memory_path_io_repr():
    # repr() of a MemoryPathIO must equal the repr of its ``fs`` list,
    # here a single directory node named "/".
    from aioftp.pathio import Node

    root = Node("dir", "/", 1, 1, content=[])
    # The loop is a placeholder string; this test never awaits anything.
    pio = aioftp.MemoryPathIO(loop="loop")
    pio.fs = [root]

    expected = "[Node(type='dir', name='/', ctime=1, mtime=1, content=[])]"
    nose.tools.eq_(repr(pio), expected)
parser.add_argument("--family", choices=("ipv4", "ipv6", "auto"),
default="auto",
help="Socket family [default: %(default)s]")
args = parser.parse_args()
print(f"aioftp v{aioftp.__version__}")
if not args.quiet:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(name)s] %(message)s",
datefmt="[%H:%M:%S]:",
)
if args.memory:
user = aioftp.User(args.login, args.password, base_path="/")
path_io_factory = aioftp.MemoryPathIO
else:
if args.home:
user = aioftp.User(args.login, args.password, base_path=args.home)
else:
user = aioftp.User(args.login, args.password)
path_io_factory = aioftp.PathIO
family = {
"ipv4": socket.AF_INET,
"ipv6": socket.AF_INET6,
"auto": socket.AF_UNSPEC,
}[args.family]
server = aioftp.Server([user], path_io_factory=path_io_factory)
loop = asyncio.get_event_loop()
loop.run_until_complete(server.start(args.host, args.port, family=family))
try: