Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Create the device config and the mocked services used by the tests.

    ``service_mock`` is a DMAP service without credentials,
    ``service_mock2`` is an MRP service and ``airplay_mock`` is an
    AirPlay service.
    """
    self.config = conf.AppleTV(ADDRESS_1, NAME)

    # Keyword arguments to MagicMock are set as attributes on the mock.
    self.service_mock = MagicMock(
        protocol=const.Protocol.DMAP,
        port=PORT_1,
        identifier=IDENTIFIER_1,
        credentials=None,
    )
    self.service_mock2 = MagicMock(
        protocol=const.Protocol.MRP,
        port=PORT_2,
        identifier=IDENTIFIER_2,
    )
    # The AirPlay mock shares PORT_1 with the DMAP mock.
    self.airplay_mock = MagicMock(
        port=PORT_1,
        protocol=Protocol.AirPlay,
    )
def test_protocol_str(self):
    """Check the text convert.protocol_str returns for each protocol."""
    expectations = (
        ('MRP', Protocol.MRP),
        ('DMAP', Protocol.DMAP),
        ('AirPlay', Protocol.AirPlay),
    )
    for expected, protocol in expectations:
        self.assertEqual(expected, convert.protocol_str(protocol))
def test_add_airplay_service(self):
    """Add the AirPlay mock to the config and read it back by protocol."""
    self.config.add_service(self.airplay_mock)

    service = self.config.get_service(Protocol.AirPlay)
    self.assertEqual(service.protocol, Protocol.AirPlay)
    self.assertEqual(service.port, PORT_1)
async def test_play_url_not_authenticated_error(self):
    """Expect AuthenticationError from play_url when credentials are cleared."""
    airplay_service = self.conf.get_service(Protocol.AirPlay)
    airplay_service.credentials = None
    self.usecase.airplay_require_authentication()

    # The whole call, including argument evaluation, stays inside the
    # assertRaises block.
    with self.assertRaises(exceptions.AuthenticationError):
        await self.atv.airplay.play_url(AirPlay_STREAM, port=self.server.port)
async def test_connect_invalid_protocol(self):
    """Expect UnsupportedProtocolError when connecting with AirPlay as protocol."""
    device = AppleTV('1.2.3.4', 'Apple TV')
    airplay = AirPlayService('airplay_id')
    device.add_service(airplay)

    with self.assertRaises(exceptions.UnsupportedProtocolError):
        await pyatv.connect(device, self.loop, protocol=Protocol.AirPlay)
async def test_scan_no_home_sharing(self):
    """Scan with DEVICE_SERVICE_1 stubbed and verify the single device found."""
    zeroconf_stub.stub(pyatv, DEVICE_SERVICE_1)

    found = await pyatv.scan(self.loop, timeout=0)
    self.assertEqual(len(found), 1)

    # Exactly one result is expected, so inspect it directly.
    device = found[0]
    self.assertEqual(device.name, 'Apple TV 3')
    self.assertEqual(device.address, ipaddress.ip_address(IP_3))
    self.assertEqual(device.get_service(Protocol.DMAP).port, 3689)
    self.assertEqual(device.get_service(Protocol.AirPlay).port, 7000)
async def do_pairing(self):
    """Run the AirPlay pairing flow and check its state along the way.

    The pairing handler is stored on ``self.pairing``. After ``finish()``
    the credentials on ``self.service`` must equal DEVICE_CREDENTIALS.
    """
    self.usecase.airplay_require_authentication()

    pairing = await pyatv.pair(self.conf, Protocol.AirPlay, self.loop)
    self.pairing = pairing
    self.assertTrue(pairing.device_provides_pin)

    await pairing.begin()
    pairing.pin(DEVICE_PIN)
    # Entering the PIN alone must not mark the handler as paired.
    self.assertFalse(pairing.has_paired)

    await pairing.finish()
    self.assertTrue(pairing.has_paired)
    self.assertEqual(self.service.credentials, DEVICE_CREDENTIALS)
def __call__(self, parser, namespace, values, option_string=None):
    """Match protocol string and save correct version.

    Stores the ``const.Protocol`` member matching ``values`` on
    ``namespace`` under ``self.dest``. Accepted strings are "mrp", "dmap"
    and "airplay", compared exactly.

    Raises:
        argparse.ArgumentTypeError: If ``values`` is not an accepted string.
    """
    # NOTE(review): argparse appears to turn ArgumentTypeError into a usage
    # message only for ``type=`` callables; raised from an Action it may
    # surface as a traceback. Confirm whether ArgumentError fits better.
    if values == "mrp":
        protocol = const.Protocol.MRP
    elif values == "dmap":
        protocol = const.Protocol.DMAP
    elif values == 'airplay':
        protocol = const.Protocol.AirPlay
    else:
        raise argparse.ArgumentTypeError(
            'Valid protocols are: mrp, dmap, airplay')

    setattr(namespace, self.dest, protocol)
_LOGGER = logging.getLogger(__name__)

# Schema for PIN input: CONF_PIN is a required int field, default None.
INPUT_PIN_SCHEMA = vol.Schema(
    {
        vol.Required(CONF_PIN, default=None): int,
    }
)

DEFAULT_START_OFF = False

# Protocols listed as MRP, DMAP, then AirPlay.
# NOTE(review): presumably the order of preference when picking a
# protocol; confirm against the code that reads this list.
PROTOCOL_PRIORITY = [
    const.Protocol.MRP,
    const.Protocol.DMAP,
    const.Protocol.AirPlay,
]

# NOTE(review): timeouts are presumably in seconds; confirm at the call sites.
SCAN_TIMEOUT = 5
UNICAST_SCAN_TIMEOUT = 30

# Mapping between config entry format and pyatv
CREDENTIAL_MAPPING = dict(
    (
        (CONF_CREDENTIALS_MRP, const.Protocol.MRP.value),
        (CONF_CREDENTIALS_DMAP, const.Protocol.DMAP.value),
        (CONF_CREDENTIALS_AIRPLAY, const.Protocol.AirPlay.value),
    )
)
async def device_scan(identifier, loop, cache=None):
"""Scan for a specific device using identifier as filter."""
def _filter_device(dev):
    """Return True when ``dev`` matches the ``identifier`` being scanned for.

    ``identifier`` comes from the enclosing scope. A value of None matches
    every device. Otherwise the device matches when ``identifier`` equals
    its address (as a string), its name, or the identifier of any of its
    services.
    """
    if identifier is None:
        return True
    if identifier == str(dev.address):
        return True
    if identifier == dev.name:
        return True
    # A generator (not a list) lets any() stop at the first matching service.
    return any(service.identifier == identifier for service in dev.services)
def _host_filter():