Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@PathPermissions(PathPermissions.writable)
async def rnto(self, connection, rest):
    """Rename the previously selected path to the one named by *rest*.

    The source path is read from ``connection.rename_from``; the target
    is the real path that ``self.get_paths`` resolves for *rest*. After
    the rename, a "250" reply with an empty message is sent.

    Returns:
        True once the reply has been issued.
    """
    target_path, _ = self.get_paths(connection, rest)
    source_path = connection.rename_from
    # The pending source is dropped before the rename is attempted, so it
    # is cleared from the connection even when the rename itself raises.
    del connection.rename_from
    await connection.path_io.rename(source_path, target_path)
    connection.response("250", "")
    return True
@PathPermissions(PathPermissions.writable)
async def rmd(self, connection, rest):
    """Remove the directory named by *rest*.

    The real path is resolved through ``self.get_paths`` and handed to
    ``connection.path_io.rmdir``; a "250" reply with an empty message
    follows.

    Returns:
        True once the reply has been issued.
    """
    directory, _ = self.get_paths(connection, rest)
    await connection.path_io.rmdir(directory)
    connection.response("250", "")
    return True
@PathPermissions(PathPermissions.readable)
async def mlst(self, connection, rest):
    """Reply with the facts line for the single path named by *rest*.

    The line is produced by ``self.build_mlsx_string`` for the resolved
    real path and sent in a "250" reply, framed by the literal entries
    "start" and "end".

    Returns:
        True once the reply has been issued.
    """
    target, _ = self.get_paths(connection, rest)
    facts_line = await self.build_mlsx_string(connection, target)
    reply_lines = ["start", facts_line, "end"]
    # NOTE(review): the third positional argument presumably marks the
    # payload as a multi-line/list reply - confirm against
    # connection.response.
    connection.response("250", reply_lines, True)
    return True
@PathPermissions(PathPermissions.writable)
async def dele(self, connection, rest):
    """Delete the file named by *rest*.

    The real path is resolved through ``self.get_paths`` and handed to
    ``connection.path_io.unlink``; a "250" reply with an empty message
    follows.

    Returns:
        True once the reply has been issued.
    """
    victim, _ = self.get_paths(connection, rest)
    await connection.path_io.unlink(victim)
    connection.response("250", "")
    return True
@PathPermissions(PathPermissions.readable)
async def retr(self, connection, rest):
@ConnectionConditions(
ConnectionConditions.data_connection_made,
wait=True,
fail_code="425",
fail_info="Can't open data connection")
@worker
async def retr_worker(self, connection, rest):
stream = connection.data_connection
del connection.data_connection
file_in = connection.path_io.open(real_path, mode="rb")
async with file_in, stream:
if connection.restart_offset:
await file_in.seek(connection.restart_offset)
async for data in file_in.iter_by_block(connection.block_size):