Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if not pkt.is_extra_auth_data():
raise OperationalError(
"caching sha2: Unknown packet "
"for fast auth: {0}".format(pkt._data[:1])
)
# magic numbers:
# 2 - request public key
# 3 - fast auth succeeded
# 4 - need full auth
pkt.advance(1)
n = pkt.read_uint8()
if n == 3:
logger.debug("caching sha2: succeeded by fast path.")
pkt = await self._read_packet()
pkt.check_error() # pkt must be OK packet
return pkt
if n != 4:
raise OperationalError("caching sha2: Unknown "
"result for fast auth: {0}".format(n))
logger.debug("caching sha2: Trying full auth...")
if self._ssl_context:
logger.debug("caching sha2: Sending plain "
"password via secure connection")
self.write_packet(self._password.encode('latin1') + b'\0')
pkt = await self._read_packet()
pkt.check_error()
pkt.check_error()
return pkt
if not self.server_public_key:
self.write_packet(b'\x02')
pkt = await self._read_packet() # Request public key
pkt.check_error()
if not pkt.is_extra_auth_data():
raise OperationalError(
"caching sha2: Unknown packet "
"for public key: {0}".format(pkt._data[:1])
)
self.server_public_key = pkt._data[1:]
logger.debug(self.server_public_key.decode('ascii'))
data = _auth.sha2_rsa_encrypt(
self._password.encode('latin1'), self.salt,
self.server_public_key
)
self.write_packet(data)
pkt = await self._read_packet()
pkt.check_error()
n = pkt.read_uint8()
if n == 3:
logger.debug("caching sha2: succeeded by fast path.")
pkt = await self._read_packet()
pkt.check_error() # pkt must be OK packet
return pkt
if n != 4:
raise OperationalError("caching sha2: Unknown "
"result for fast auth: {0}".format(n))
logger.debug("caching sha2: Trying full auth...")
if self._ssl_context:
logger.debug("caching sha2: Sending plain "
"password via secure connection")
self.write_packet(self._password.encode('latin1') + b'\0')
pkt = await self._read_packet()
pkt.check_error()
return pkt
if not self.server_public_key:
self.write_packet(b'\x02')
pkt = await self._read_packet() # Request public key
pkt.check_error()
if not pkt.is_extra_auth_data():
raise OperationalError(
"caching sha2: Unknown packet "
"for public key: {0}".format(pkt._data[:1])
)
async def sha256_password_auth(self, pkt):
if self._ssl_context:
logger.debug("sha256: Sending plain password")
data = self._password.encode('latin1') + b'\0'
self.write_packet(data)
pkt = await self._read_packet()
pkt.check_error()
return pkt
if pkt.is_auth_switch_request():
self.salt = pkt.read_all()
if not self.server_public_key and self._password:
# Request server public key
logger.debug("sha256: Requesting server public key")
self.write_packet(b'\1')
pkt = await self._read_packet()
pkt.check_error()
if pkt.is_extra_auth_data():
self.server_public_key = pkt._data[1:]
# Log the server's RSA public key. The key is passed as a lazy %-style
# argument, so the message needs a "%s" placeholder; without one the
# logging module fails to format the record once DEBUG is enabled.
logger.debug(
    "Received public key:\n%s",
    self.server_public_key.decode('ascii')
)
if self._password:
if not self.server_public_key:
raise OperationalError("Couldn't receive server's public key")
data = _auth.sha2_rsa_encrypt(
async def sha256_password_auth(self, pkt):
if self._ssl_context:
logger.debug("sha256: Sending plain password")
data = self._password.encode('latin1') + b'\0'
self.write_packet(data)
pkt = await self._read_packet()
pkt.check_error()
return pkt
if pkt.is_auth_switch_request():
self.salt = pkt.read_all()
if not self.server_public_key and self._password:
# Request server public key
logger.debug("sha256: Requesting server public key")
self.write_packet(b'\1')
pkt = await self._read_packet()
pkt.check_error()
if pkt.is_extra_auth_data():
# 4 - need full auth
pkt.advance(1)
n = pkt.read_uint8()
if n == 3:
logger.debug("caching sha2: succeeded by fast path.")
pkt = await self._read_packet()
pkt.check_error() # pkt must be OK packet
return pkt
if n != 4:
raise OperationalError("caching sha2: Unknown "
"result for fast auth: {0}".format(n))
logger.debug("caching sha2: Trying full auth...")
if self._ssl_context:
logger.debug("caching sha2: Sending plain "
"password via secure connection")
self.write_packet(self._password.encode('latin1') + b'\0')
pkt = await self._read_packet()
pkt.check_error()
return pkt
if not self.server_public_key:
self.write_packet(b'\x02')
pkt = await self._read_packet() # Request public key
pkt.check_error()
if not pkt.is_extra_auth_data():
raise OperationalError(