Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for data_port in data_ports:
self.available_data_ports.put_nowait((0, data_port))
else:
self.available_data_ports = None
if isinstance(users, AbstractUserManager):
self.user_manager = users
else:
self.user_manager = MemoryUserManager(users)
self.available_connections = AvailableConnections(maximum_connections)
self.throttle = StreamThrottle.from_limits(
read_speed_limit,
write_speed_limit,
)
self.throttle_per_connection = StreamThrottle.from_limits(
read_speed_limit_per_connection,
write_speed_limit_per_connection,
)
self.throttle_per_user = {}
self.encoding = encoding
self.ssl = ssl
self.path_io_factory = pathio.PathIONursery(path_io_factory)
self.path_timeout = path_timeout
if data_ports is not None:
self.available_data_ports = asyncio.PriorityQueue()
for data_port in data_ports:
self.available_data_ports.put_nowait((0, data_port))
else:
self.available_data_ports = None
if isinstance(users, AbstractUserManager):
self.user_manager = users
else:
self.user_manager = MemoryUserManager(users)
self.available_connections = AvailableConnections(maximum_connections)
self.throttle = StreamThrottle.from_limits(
read_speed_limit,
write_speed_limit,
)
self.throttle_per_connection = StreamThrottle.from_limits(
read_speed_limit_per_connection,
write_speed_limit_per_connection,
)
self.throttle_per_user = {}
self.encoding = encoding
self.ssl = ssl
else:
message = f"Unknown response {state}"
raise NotImplementedError(message)
if connection.future.user.done():
connection.current_directory = connection.user.home_path
if connection.user not in self.throttle_per_user:
throttle = StreamThrottle.from_limits(
connection.user.read_speed_limit,
connection.user.write_speed_limit,
)
self.throttle_per_user[connection.user] = throttle
connection.command_connection.throttles.update(
user_global=self.throttle_per_user[connection.user],
user_per_connection=StreamThrottle.from_limits(
connection.user.read_speed_limit_per_connection,
connection.user.write_speed_limit_per_connection,
)
)
connection.response(code, info)
return True
code = "230"
connection.logged = True
connection.user = user
elif state == AbstractUserManager.GetUserResponse.PASSWORD_REQUIRED:
code = "331"
connection.user = user
elif state == AbstractUserManager.GetUserResponse.ERROR:
code = "530"
else:
message = f"Unknown response {state}"
raise NotImplementedError(message)
if connection.future.user.done():
connection.current_directory = connection.user.home_path
if connection.user not in self.throttle_per_user:
throttle = StreamThrottle.from_limits(
connection.user.read_speed_limit,
connection.user.write_speed_limit,
)
self.throttle_per_user[connection.user] = throttle
connection.command_connection.throttles.update(
user_global=self.throttle_per_user[connection.user],
user_per_connection=StreamThrottle.from_limits(
connection.user.read_speed_limit_per_connection,
connection.user.write_speed_limit_per_connection,
)
)
connection.response(code, info)
return True