Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Create the device config and the three service mocks for the tests.

    ``service_mock`` is configured as a DMAP service (with credentials set
    to None), ``service_mock2`` as an MRP service and ``airplay_mock`` as
    an AirPlay service.
    """
    self.config = conf.AppleTV(ADDRESS_1, NAME)

    # Attributes are handed to the MagicMock constructor, which sets them
    # on the mock exactly like individual attribute assignments would.
    self.service_mock = MagicMock(
        protocol=const.Protocol.DMAP,
        port=PORT_1,
        identifier=IDENTIFIER_1,
        credentials=None,
    )
    self.service_mock2 = MagicMock(
        protocol=const.Protocol.MRP,
        port=PORT_2,
        identifier=IDENTIFIER_2,
    )
    self.airplay_mock = MagicMock(
        port=PORT_1,
        protocol=Protocol.AirPlay,
    )
async def test_scan_mrp(self):
    """Scan with the MRP protocol filter and verify the devices found.

    Three services are stubbed (``MRP_SERVICE_1``, ``MRP_SERVICE_2`` and
    ``DEVICE_SERVICE_1``); the scan is expected to return two devices,
    each with the expected name and an MRP service.
    """
    zeroconf_stub.stub(
        pyatv, MRP_SERVICE_1, MRP_SERVICE_2, DEVICE_SERVICE_1)

    atvs = await pyatv.scan(
        self.loop, timeout=0, protocol=const.Protocol.MRP)
    self.assertEqual(len(atvs), 2)

    # The stub/scan/assert sequence used to appear twice, verbatim; one
    # pass covers the same checks, so the duplicate has been dropped.
    expected_devices = (
        (IP_4, 'Apple TV 4'),
        (IP_5, 'Apple TV 5'),
    )
    for address, expected_name in expected_devices:
        device = _get_atv(atvs, address)
        self.assertIsNotNone(device)
        self.assertEqual(device.name, expected_name)
        self.assertIsNotNone(device.get_service(const.Protocol.MRP))
# Pairing entry point. Visible flow: require a protocol, scan for the device,
# inject the user supplied credentials for AirPlay/DMAP/MRP, collect the
# DMAP-only options, start pairing through pair() and then run
# self._perform_pairing() on the result.
# NOTE(review): this listing has lost its indentation, the statements after
# the first `try:` repeat the preceding ones verbatim, and the body of the
# final `except` is not visible here -- confirm all three against the real
# file before changing any code.
async def pair(self):
"""Pair pyatv as a remote control with an Apple TV."""
# No protocol given: log an error and return 1
if self.args.protocol is None:
logging.error('No protocol specified')
return 1
apple_tv = await _scan_for_device(
self.args, self.args.scan_timeout, self.loop)
# A falsy scan result ends the method with return value 2
if not apple_tv:
return 2
# Extra keyword arguments forwarded to pair() further down
options = {}
# Inject user provided credentials
apple_tv.set_credentials(
const.Protocol.AirPlay, self.args.airplay_credentials)
apple_tv.set_credentials(
const.Protocol.DMAP, self.args.dmap_credentials)
apple_tv.set_credentials(
const.Protocol.MRP, self.args.mrp_credentials)
# Protocol specific options
# Only DMAP receives pairing_guid and remote_name
if self.args.protocol == const.Protocol.DMAP:
options.update({
'pairing_guid': self.args.pairing_guid,
'remote_name': self.args.remote_name,
})
pairing = await pair(
apple_tv, self.args.protocol, self.loop, **options)
# NOTE(review): everything from this `try:` up to the next `try:` repeats the
# statements above unchanged -- presumably a copy/extraction artifact; verify.
try:
if self.args.protocol is None:
logging.error('No protocol specified')
return 1
apple_tv = await _scan_for_device(
self.args, self.args.scan_timeout, self.loop)
if not apple_tv:
return 2
options = {}
# Inject user provided credentials
apple_tv.set_credentials(
const.Protocol.AirPlay, self.args.airplay_credentials)
apple_tv.set_credentials(
const.Protocol.DMAP, self.args.dmap_credentials)
apple_tv.set_credentials(
const.Protocol.MRP, self.args.mrp_credentials)
# Protocol specific options
if self.args.protocol == const.Protocol.DMAP:
options.update({
'pairing_guid': self.args.pairing_guid,
'remote_name': self.args.remote_name,
})
pairing = await pair(
apple_tv, self.args.protocol, self.loop, **options)
try:
await self._perform_pairing(pairing)
# Broad catch is deliberate (linter warnings are silenced on this line).
# NOTE(review): the handler body is missing from this listing.
except Exception: # pylint: disable=broad-except # noqa
def __call__(self, parser, namespace, values, option_string=None):
    """Translate a protocol name into its constant and store it.

    The constant is written to ``namespace`` under ``self.dest``. Accepted
    names are "mrp", "dmap" and "airplay"; any other value raises
    ``argparse.ArgumentTypeError``.
    """
    known_protocols = (
        ("mrp", const.Protocol.MRP),
        ("dmap", const.Protocol.DMAP),
        ('airplay', const.Protocol.AirPlay),
    )
    for label, protocol in known_protocols:
        if values == label:
            setattr(namespace, self.dest, protocol)
            return

    # Nothing matched: reject the value
    raise argparse.ArgumentTypeError(
        'Valid protocols are: mrp, dmap, airplay')
def media_type(self):
    """Return the type of media currently playing, e.g. video or music."""
    # Without a 'caps' entry in the play status the type is unknown
    if not dmap.first(self.playstatus, 'cmst', 'caps'):
        return const.MEDIA_TYPE_UNKNOWN

    # An explicit media kind ('cmmk') takes precedence when present
    kind = dmap.first(self.playstatus, 'cmst', 'cmmk')
    if kind is not None:
        return convert.media_kind(kind)

    # Fallback: artist or album metadata is taken to mean music (it is not
    # present for video); anything else is treated as video
    looks_like_music = self.artist or self.album
    return (
        const.MEDIA_TYPE_MUSIC if looks_like_music
        else const.MEDIA_TYPE_VIDEO
    )
async def async_step_import(self, info):
    """Handle a device imported from the configuration file.

    Sets the unique id from the configured identifier, re-keys the
    credentials through ``CREDENTIAL_MAPPING`` and hands the protocol,
    name, credentials and address to ``_async_get_entry``.
    """
    identifier = info.get(CONF_IDENTIFIER)
    await self.async_set_unique_id(identifier)
    _LOGGER.debug("Importing device with identifier %s", self.unique_id)

    # Re-key every credential entry by its CREDENTIAL_MAPPING counterpart
    mapped_credentials = {}
    for protocol_key, credential in info.get(CONF_CREDENTIALS).items():
        mapped_credentials[CREDENTIAL_MAPPING[protocol_key]] = credential

    return await self._async_get_entry(
        const.Protocol[info.get(CONF_PROTOCOL)],
        info.get(CONF_NAME),
        mapped_credentials,
        info.get(CONF_ADDRESS),
        is_import=True,
    )
# Maps the device's power and play state onto the STATE_* constants:
# STATE_OFF when self._power.turned_on is falsy, otherwise a value derived
# from self._playing.play_state.
# NOTE(review): indentation is missing in this listing, so it cannot be seen
# whether the final `return STATE_STANDBY` sits inside `if self._playing:`
# (which would leave an implicit None when nothing is playing) or at function
# level -- confirm against the real file.
def state(self):
"""Return the state of the device."""
if not self._power.turned_on:
return STATE_OFF
if self._playing:
# pyatv constants are imported at the point of use, not at module level
from pyatv import const
state = self._playing.play_state
# Idle, no media and loading are all reported as idle
if state in (const.PLAY_STATE_IDLE, const.PLAY_STATE_NO_MEDIA,
const.PLAY_STATE_LOADING):
return STATE_IDLE
if state == const.PLAY_STATE_PLAYING:
return STATE_PLAYING
if state in (const.PLAY_STATE_PAUSED,
const.PLAY_STATE_FAST_FORWARD,
const.PLAY_STATE_FAST_BACKWARD):
# Catch fast forward/backward here so "play" is default action
return STATE_PAUSED
# Reached when none of the play states above matched
return STATE_STANDBY # Bad or unknown state?
# Maps the device's power and play state onto the STATE_* constants:
# STATE_OFF when self._power.turned_on is falsy, otherwise a value derived
# from self._playing.play_state using the atv_const play-state constants.
# NOTE(review): indentation is missing in this listing, so it cannot be seen
# whether the final `return STATE_STANDBY` sits inside `if self._playing:`
# (which would leave an implicit None when nothing is playing) or at function
# level -- confirm against the real file.
def state(self):
"""Return the state of the device."""
if not self._power.turned_on:
return STATE_OFF
if self._playing:
state = self._playing.play_state
# Idle, no media and loading are all reported as idle
if state in (
atv_const.PLAY_STATE_IDLE,
atv_const.PLAY_STATE_NO_MEDIA,
atv_const.PLAY_STATE_LOADING,
):
return STATE_IDLE
if state == atv_const.PLAY_STATE_PLAYING:
return STATE_PLAYING
# Stopped is grouped with paused and fast forward/backward here
if state in (
atv_const.PLAY_STATE_PAUSED,
atv_const.PLAY_STATE_FAST_FORWARD,
atv_const.PLAY_STATE_FAST_BACKWARD,
atv_const.PLAY_STATE_STOPPED,
):
# Catch fast forward/backward here so "play" is default action
return STATE_PAUSED
# Reached when none of the play states above matched
return STATE_STANDBY # Bad or unknown state?