Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _send(self, message):
data = message.SerializeToString()
_LOGGER.info('<<(DECRYPTED): %s', message)
if self.chacha:
data = self.chacha.encrypt(data)
log_binary(_LOGGER, '<<(ENCRYPTED)', Message=message)
length = variant.write_variant(len(data))
self.transport.write(length + data)
def step3(self):
"""Last authentication step."""
self._check_initialized()
# TODO: verify: self.client_session_key same as self.session.key_b64()?
session_key = binascii.unhexlify(self.client_session_key)
aes_key = hash_sha512('Pair-Setup-AES-Key', session_key)[0:16]
tmp = bytearray(hash_sha512('Pair-Setup-AES-IV', session_key)[0:16])
tmp[-1] = tmp[-1] + 1 # Last byte must be increased by 1
aes_iv = bytes(tmp)
log_binary(_LOGGER, 'Pair-Setup-AES', Key=aes_key, IV=aes_iv)
epk, tag = aes_encrypt(modes.GCM, aes_key, aes_iv, self._auth_public)
log_binary(_LOGGER, 'Pair-Setup EPK+Tag', EPK=epk, Tag=tag)
return epk, tag
def step4(self, encrypted_data):
"""Last pairing step."""
chacha = chacha20.Chacha20Cipher(self._session_key, self._session_key)
decrypted_tlv_bytes = chacha.decrypt(
encrypted_data, nounce='PS-Msg06'.encode())
if not decrypted_tlv_bytes:
raise Exception('data decrypt failed') # TODO: new exception
decrypted_tlv = tlv8.read_tlv(decrypted_tlv_bytes)
_LOGGER.debug('PS-Msg06: %s', decrypted_tlv)
atv_identifier = decrypted_tlv[tlv8.TLV_IDENTIFIER]
atv_signature = decrypted_tlv[tlv8.TLV_SIGNATURE]
atv_pub_key = decrypted_tlv[tlv8.TLV_PUBLIC_KEY]
log_binary(_LOGGER,
'Device',
Identifier=atv_identifier,
Signature=atv_signature,
Public=atv_pub_key)
# TODO: verify signature here
return Credentials(atv_pub_key, self._signing_key.to_seed(),
atv_identifier, self.pairing_id)
self._session_key = hkdf_expand(
'Pair-Setup-Encrypt-Salt',
'Pair-Setup-Encrypt-Info',
binascii.unhexlify(self._client_session_key))
device_info = ios_device_x + self.pairing_id + self._auth_public
device_signature = self._signing_key.sign(device_info)
tlv = tlv8.write_tlv({tlv8.TLV_IDENTIFIER: self.pairing_id,
tlv8.TLV_PUBLIC_KEY: self._auth_public,
tlv8.TLV_SIGNATURE: device_signature})
chacha = chacha20.Chacha20Cipher(self._session_key, self._session_key)
encrypted_data = chacha.encrypt(tlv, nounce='PS-Msg05'.encode())
log_binary(_LOGGER, 'Data', Encrypted=encrypted_data)
return encrypted_data
def _send_raw(self, raw):
parsed = protobuf.ProtocolMessage()
parsed.ParseFromString(raw)
log_binary(_LOGGER, 'ATV->APP', Raw=raw)
_LOGGER.info('ATV->APP Parsed: %s', parsed)
if self.chacha:
raw = self.chacha.encrypt(raw)
log_binary(_LOGGER, 'ATV->APP', Encrypted=raw)
length = variant.write_variant(len(raw))
try:
self.transport.write(length + raw)
except Exception: # pylint: disable=broad-except
_LOGGER.exception('Failed to send to app')
def send(self, message):
"""Send message to device."""
serialized = message.SerializeToString()
log_binary(_LOGGER, '>> Send', Data=serialized)
if self._chacha:
serialized = self._chacha.encrypt(serialized)
log_binary(_LOGGER, '>> Send', Encrypted=serialized)
data = write_variant(len(serialized)) + serialized
self._transport.write(data)
_LOGGER.debug('>> Send: Protobuf=%s', message)
async def verify_credentials(self):
"""Verify credentials with device."""
_, public_key = self.srp.initialize()
msg = messages.crypto_pairing({
tlv8.TLV_SEQ_NO: b'\x01',
tlv8.TLV_PUBLIC_KEY: public_key})
resp = await self.protocol.send_and_receive(
msg, generate_identifier=False)
resp = _get_pairing_data(resp)
session_pub_key = resp[tlv8.TLV_PUBLIC_KEY]
encrypted = resp[tlv8.TLV_ENCRYPTED_DATA]
log_binary(_LOGGER,
'Device',
Public=self.credentials.ltpk,
Encrypted=encrypted)
encrypted_data = self.srp.verify1(
self.credentials, session_pub_key, encrypted)
msg = messages.crypto_pairing({
tlv8.TLV_SEQ_NO: b'\x03',
tlv8.TLV_ENCRYPTED_DATA: encrypted_data})
await self.protocol.send_and_receive(
msg, generate_identifier=False)
# TODO: check status code
self._output_key, self._input_key = self.srp.verify2()
def initialize(self, seed=None):
"""Initialize handler operation.
This method will generate new encryption keys and must be called prior
to doing authentication or verification.
"""
self.seed = seed or os.urandom(32) # Generate new seed if not provided
signing_key = SigningKey(self.seed)
verifying_key = signing_key.get_verifying_key()
self._auth_private = signing_key.to_seed()
self._auth_public = verifying_key.to_bytes()
log_binary(_LOGGER,
'Authentication keys',
Private=self._auth_private,
Public=self._auth_public)
msg = messages.crypto_pairing({
tlv8.TLV_SEQ_NO: b'\x02',
tlv8.TLV_PUBLIC_KEY: server_pub_key,
tlv8.TLV_ENCRYPTED_DATA: encrypted
})
self.output_key = hkdf_expand('MediaRemote-Salt',
'MediaRemote-Write-Encryption-Key',
self._shared)
self.input_key = hkdf_expand('MediaRemote-Salt',
'MediaRemote-Read-Encryption-Key',
self._shared)
log_binary(_LOGGER,
'Keys',
Output=self.output_key,
Input=self.input_key)
else:
msg = messages.crypto_pairing({
tlv8.TLV_SALT: binascii.unhexlify(self.salt),
tlv8.TLV_PUBLIC_KEY: binascii.unhexlify(self._session.public),
tlv8.TLV_SEQ_NO: b'\x02'
})
self._send(msg)