Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def device_provides_pin(self):
"""Return True if remote device presents PIN code, else False."""
return self.service.protocol in [Protocol.MRP, Protocol.AirPlay]
async def test_scan_mrp(self):
zeroconf_stub.stub(
pyatv, MRP_SERVICE_1, MRP_SERVICE_2, DEVICE_SERVICE_1)
atvs = await pyatv.scan(
self.loop, timeout=0, protocol=const.Protocol.MRP)
self.assertEqual(len(atvs), 2)
dev1 = _get_atv(atvs, IP_4)
self.assertIsNotNone(dev1)
self.assertEqual(dev1.name, 'Apple TV 4')
self.assertIsNotNone(dev1.get_service(const.Protocol.MRP))
dev2 = _get_atv(atvs, IP_5)
self.assertIsNotNone(dev2)
self.assertEqual(dev2.name, 'Apple TV 5')
self.assertIsNotNone(dev2.get_service(const.Protocol.MRP))
def setUp(self):
self.config = conf.AppleTV(ADDRESS_1, NAME)
self.service_mock = MagicMock()
self.service_mock.protocol = const.Protocol.DMAP
self.service_mock.port = PORT_1
self.service_mock.identifier = IDENTIFIER_1
self.service_mock.credentials = None
self.service_mock2 = MagicMock()
self.service_mock2.protocol = const.Protocol.MRP
self.service_mock2.port = PORT_2
self.service_mock2.identifier = IDENTIFIER_2
self.airplay_mock = MagicMock()
self.airplay_mock.port = PORT_1
self.airplay_mock.protocol = Protocol.AirPlay
def __init__(self, identifier, port, credentials=None):
"""Initialize a new MrpService."""
super().__init__(identifier, Protocol.MRP, port)
self.credentials = credentials
def __init__(self, config, session, loop):
"""Initialize a new MrpPairingHandler."""
super().__init__(session, config.get_service(Protocol.MRP))
self.connection = MrpConnection(
config.address, self.service.port, loop)
self.srp = SRPAuthHandler()
self.protocol = MrpProtocol(
loop, self.connection, self.srp, self.service)
self.pairing_procedure = MrpPairingProcedure(
self.protocol, self.srp)
self.pin_code = None
def __init__(self, loop, session, config, airplay):
"""Initialize a new Apple TV."""
super().__init__()
self._session = session
self._mrp_service = config.get_service(Protocol.MRP)
self._connection = MrpConnection(
config.address, self._mrp_service.port, loop, atv=self)
self._srp = SRPAuthHandler()
self._protocol = MrpProtocol(
loop, self._connection, self._srp, self._mrp_service)
self._psm = PlayerStateManager(self._protocol, loop)
self._mrp_remote = MrpRemoteControl(loop, self._protocol)
self._mrp_metadata = MrpMetadata(
self._protocol, self._psm, config.identifier)
self._mrp_push_updater = MrpPushUpdater(
loop, self._mrp_metadata, self._psm)
self._airplay = airplay
def is_valid_credentials(credentials):
"""Verify that credentials are valid for establishing a connection."""
return (
credentials.get(const.Protocol.MRP.value) is not None
or credentials.get(const.Protocol.DMAP.value) is not None
)
async def pair_with_device(loop):
"""Make it possible to pair with device."""
atvs = await scan(loop, timeout=5, protocol=Protocol.MRP)
if not atvs:
print('No device found', file=sys.stderr)
return
pairing = await pair(atvs[0], Protocol.MRP, loop)
await pairing.begin()
pin = int(input("Enter PIN: "))
pairing.pin(pin)
await pairing.finish()
# Give some feedback about the process
if pairing.has_paired:
print('Paired with device!')
print('Credentials:', pairing.service.credentials)
else:
print('Did not pair with device!')
await pairing.close()
async def _autodiscover_device(args, loop):
apple_tv = await _scan_for_device(
args, args.scan_timeout, loop, protocol=args.protocol)
if not apple_tv:
return None
def _set_credentials(protocol, field):
service = apple_tv.get_service(protocol)
if service:
value = service.credentials or getattr(args, field)
setattr(args, field, value)
_set_credentials(Protocol.DMAP, 'dmap_credentials')
_set_credentials(Protocol.MRP, 'mrp_credentials')
_set_credentials(Protocol.AirPlay, 'airplay_credentials')
logging.info('Auto-discovered %s at %s', args.name, args.address)
return apple_tv
def __call__(self, parser, namespace, values, option_string=None):
"""Match protocol string and save correct version."""
if values == "mrp":
setattr(namespace, self.dest, const.Protocol.MRP)
elif values == "dmap":
setattr(namespace, self.dest, const.Protocol.DMAP)
elif values == 'airplay':
setattr(namespace, self.dest, const.Protocol.AirPlay)
else:
raise argparse.ArgumentTypeError(
'Valid protocols are: mrp, dmap, airplay')