Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
PathConditions.path_must_be_dir)
@PathPermissions(PathPermissions.writable)
async def rmd(self, connection, rest):
    """Remove the directory that *rest* resolves to.

    ``self.get_paths`` maps the argument to a (real, virtual) pair; only
    the real path is used. After ``connection.path_io.rmdir`` completes,
    a ``"250"`` response with empty text is issued.

    Returns:
        Always ``True``.
    """
    resolved = self.get_paths(connection, rest)
    target, _ = resolved  # the virtual path is not needed here
    path_io = connection.path_io
    await path_io.rmdir(target)
    connection.response("250", "")
    return True
@PathConditions(PathConditions.path_must_not_exists)
@PathPermissions(PathPermissions.writable)
async def rnto(self, connection, rest):
    """Rename the path held in ``connection.rename_from`` to *rest*.

    The destination comes from the real path returned by
    ``self.get_paths``. The pending source is read from the connection
    and the attribute is deleted before the rename is attempted. A
    ``"250"`` response with empty text is issued once
    ``connection.path_io.rename`` completes.

    Returns:
        Always ``True``.
    """
    destination, _ = self.get_paths(connection, rest)
    # Take the pending source off the connection: read it, then drop it.
    source = connection.rename_from
    del connection.rename_from
    path_io = connection.path_io
    await path_io.rename(source, destination)
    connection.response("250", "")
    return True
PathConditions.path_must_exists,
PathConditions.path_must_be_file)
@PathPermissions(PathPermissions.writable)
async def dele(self, connection, rest):
    """Delete the file that *rest* resolves to.

    The real path returned by ``self.get_paths`` is passed to
    ``connection.path_io.unlink``; afterwards a ``"250"`` response with
    empty text is issued.

    Returns:
        Always ``True``.
    """
    resolved = self.get_paths(connection, rest)
    victim, _ = resolved  # the virtual path is not needed here
    path_io = connection.path_io
    await path_io.unlink(victim)
    connection.response("250", "")
    return True
@PathConditions(
PathConditions.path_must_exists,
PathConditions.path_must_be_file)
@PathPermissions(PathPermissions.readable)
async def retr(self, connection, rest):
@ConnectionConditions(
ConnectionConditions.data_connection_made,
wait=True,
fail_code="425",
fail_info="Can't open data connection")
@worker
async def retr_worker(self, connection, rest):
stream = connection.data_connection
del connection.data_connection
file_in = connection.path_io.open(real_path, mode="rb")
async with file_in, stream:
@PathConditions(
PathConditions.path_must_exists,
PathConditions.path_must_be_dir)
@PathPermissions(PathPermissions.readable)
async def cwd(self, connection, rest):
    """Make the path that *rest* resolves to the connection's current directory.

    Only the virtual path returned by ``self.get_paths`` is used: it is
    stored in ``connection.current_directory``. A ``"250"`` response with
    empty text is then issued.

    Returns:
        Always ``True``.
    """
    resolved = self.get_paths(connection, rest)
    _, new_directory = resolved  # the real path is not needed here
    connection.current_directory = new_directory
    connection.response("250", "")
    return True
PathConditions.path_must_be_file)
@PathPermissions(PathPermissions.writable)
async def dele(self, connection, rest):
    """Delete the file that *rest* resolves to.

    The real path returned by ``self.get_paths`` is passed to
    ``connection.path_io.unlink``; afterwards a ``"250"`` response with
    empty text is issued.

    Returns:
        Always ``True``.
    """
    resolved = self.get_paths(connection, rest)
    victim, _ = resolved  # the virtual path is not needed here
    path_io = connection.path_io
    await path_io.unlink(victim)
    connection.response("250", "")
    return True
@PathConditions(PathConditions.path_must_not_exists)
@PathPermissions(PathPermissions.writable)
async def mkd(self, connection, rest):
    """Create the directory that *rest* resolves to.

    The real path returned by ``self.get_paths`` is passed to
    ``connection.path_io.mkdir`` with ``parents=True``. A ``"257"``
    response with empty text is then issued.

    Returns:
        Always ``True``.
    """
    resolved = self.get_paths(connection, rest)
    new_dir, _ = resolved  # the virtual path is not needed here
    path_io = connection.path_io
    await path_io.mkdir(new_dir, parents=True)
    connection.response("257", "")
    return True
@PathConditions(PathConditions.path_must_exists)
@PathPermissions(PathPermissions.readable)
async def list(self, connection, rest):
@ConnectionConditions(
ConnectionConditions.data_connection_made,
wait=True,
fail_code="425",
fail_info="Can't open data connection")
@worker
async def list_worker(self, connection, rest):
stream = connection.data_connection
del connection.data_connection
async with stream:
async for path in connection.path_io.list(real_path):
s = await self.build_list_string(connection, path)
b = (s + END_OF_LINE).encode(encoding=self.encoding)