Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_listenable():
    """Check that ``listener_event`` reaches every registered listener.

    The same mock is registered twice and is expected to be called twice.
    A second mock whose ``event`` raises is expected to be called once.
    """
    listen = Listenable()

    # Register the same listener twice; the assertion below expects one
    # call per registration.
    listener = mock.MagicMock()
    listen.add_listener(listener)
    listen.add_listener(listener)

    # This listener raises from its ``event`` handler.
    broken_listener = mock.MagicMock()
    broken_listener.event.side_effect = Exception()
    listen.add_listener(broken_listener)

    # Not wrapped in try/except: the test only passes if the exception
    # raised by ``broken_listener`` does not propagate out of this call.
    listen.listener_event('event')

    assert listener.event.call_count == 2
    assert broken_listener.event.call_count == 1
class Logger(util.LocalLogMixin):
    """Test double: a ``LocalLogMixin`` subclass whose ``log`` is a mock."""

    # Class-level mock; presumably the hook the mixin's logging helpers
    # call -- confirm against ``util.LocalLogMixin``.
    log = mock.MagicMock()
def test_log():
    """Call each logging helper on ``Logger`` once.

    There are no assertions; the test passes as long as none of the four
    calls raises.
    """
    log = Logger()
    log.debug("Test debug")
    log.info("Test info")
    log.warn("Test warn")
    log.error("Test error")
def test_zha_security_end_device():
    """Call ``util.zha_security`` with ``controller=False``.

    There are no assertions; the test passes as long as the call does not
    raise.
    """
    util.zha_security(controller=False)
def test_zha_security_controller():
def _test_retry(exception, retry_exceptions, n):
    """Run ``util.retry`` on a coroutine that raises on its first ``n`` calls.

    Args:
        exception: Called with no arguments to build the error raised by
            each failing call (e.g. an exception class).
        retry_exceptions: Passed through unchanged to ``util.retry``.
        n: Number of initial calls that raise.

    Returns:
        The number of times the coroutine had been called when
        ``util.retry`` finished.

    Anything ``util.retry`` lets escape propagates out of
    ``run_until_complete`` to the caller.
    """
    counter = 0

    # NOTE(review): ``asyncio.coroutine`` was removed in Python 3.11. It is
    # kept here because how ``util.retry`` consumes ``count`` is not visible.
    @asyncio.coroutine
    def count():
        nonlocal counter
        counter += 1
        if counter <= n:
            exc = exception()
            # Tag the exception with the attempt number that produced it.
            exc._counter = counter
            raise exc

    loop = asyncio.get_event_loop()
    loop.run_until_complete(util.retry(count, retry_exceptions))
    return counter
import asyncio
from unittest import mock
import pytest
from bellows.zigbee import util
class Listenable(util.ListenableMixin):
    """Smallest possible ``ListenableMixin`` host, used by the tests."""

    def __init__(self):
        """Start with an empty ``_listeners`` mapping."""
        self._listeners = {}
def test_listenable():
    """Check that ``listener_event`` reaches every registered listener.

    The same mock is registered twice and is expected to be called twice.
    A second mock whose ``event`` raises is expected to be called once.
    """
    listen = Listenable()

    # Register the same listener twice; the assertion below expects one
    # call per registration.
    listener = mock.MagicMock()
    listen.add_listener(listener)
    listen.add_listener(listener)

    # This listener raises from its ``event`` handler.
    broken_listener = mock.MagicMock()
    broken_listener.event.side_effect = Exception()
    listen.add_listener(broken_listener)

    # Not wrapped in try/except: the test only passes if the exception
    # raised by ``broken_listener`` does not propagate out of this call.
    listen.listener_event('event')

    assert listener.event.call_count == 2
    # The broken listener was set up above, so verify it was invoked too.
    assert broken_listener.event.call_count == 1
async def form_network(self):
    """Form a new network from the ``CONF_NWK`` section of ``self.config``.

    Steps, in order:

    1. Read the PAN ID and extended PAN ID from the config, substituting
       a random 16-bit PAN ID and an all-zero extended PAN ID when the
       configured value is ``None``.
    2. Send the initial security state built by ``zha_security`` to the
       EZSP layer.
    3. Fill an ``EmberNetworkParameters`` struct and call ``formNetwork``.
    4. Set ``VALUE_STACK_TOKEN_WRITING`` to 1.

    Raises:
        AssertionError: if ``setInitialSecurityState`` does not report
            ``EmberStatus.SUCCESS``.
    """
    nwk = self.config[zigpy.config.CONF_NWK]

    pan_id = nwk[zigpy.config.CONF_NWK_PAN_ID]
    if pan_id is None:
        # No PAN ID configured: draw two random bytes from the OS.
        pan_id = int.from_bytes(os.urandom(2), byteorder="little")

    extended_pan_id = nwk[zigpy.config.CONF_NWK_EXTENDED_PAN_ID]
    if extended_pan_id is None:
        # No extended PAN ID configured: use eight zero bytes.
        extended_pan_id = t.EmberEUI64([t.uint8_t(0)] * 8)

    initial_security_state = bellows.zigbee.util.zha_security(nwk, controller=True)
    v = await self._ezsp.setInitialSecurityState(initial_security_state)
    assert v[0] == t.EmberStatus.SUCCESS  # TODO: Better check

    parameters = t.EmberNetworkParameters()
    parameters.panId = t.EmberPanId(pan_id)
    parameters.extendedPanId = extended_pan_id
    # Transmit power is hard-coded here rather than read from the config.
    parameters.radioTxPower = t.uint8_t(8)
    parameters.radioChannel = t.uint8_t(nwk[zigpy.config.CONF_NWK_CHANNEL])
    parameters.joinMethod = t.EmberJoinMethod.USE_MAC_ASSOCIATION
    parameters.nwkManagerId = t.EmberNodeId(0)
    parameters.nwkUpdateId = t.uint8_t(nwk[zigpy.config.CONF_NWK_UPDATE_ID])
    parameters.channels = nwk[zigpy.config.CONF_NWK_CHANNELS]

    await self._ezsp.formNetwork(parameters)
    await self._ezsp.setValue(t.EzspValueId.VALUE_STACK_TOKEN_WRITING, 1)
is_reply = bool(cluster_id & 0x8000)
try:
cluster_details = types.CLUSTERS[cluster_id]
except KeyError:
LOGGER.warning("Unknown ZDO cluster 0x%02x", cluster_id)
return tsn, cluster_id, is_reply, data
args, data = t.deserialize(data, cluster_details[2])
if data != b'':
# TODO: Seems sane to check, but what should we do?
LOGGER.warning("Data remains after deserializing ZDO frame")
return tsn, cluster_id, is_reply, args
class ZDO(util.LocalLogMixin, util.ListenableMixin):
    """The ZDO endpoint of a device"""

    def __init__(self, device):
        """Attach this ZDO endpoint to ``device`` with no listeners yet."""
        self._device = device
        self._listeners = {}

    def _serialize(self, command, *args):
        """Build the APS frame and payload bytes for a ZDO command.

        Args:
            command: ZDO command id; used both as the APS cluster and as
                the key into ``types.CLUSTERS``.
            *args: Command arguments, serialized with the schema stored at
                index 2 of the ``types.CLUSTERS`` entry.

        Returns:
            ``(aps, data)``: the APS frame obtained from the device, and
            the payload bytes.
        """
        aps = self._device.get_aps(profile=0, cluster=command, endpoint=0)
        # The payload starts with the APS sequence number as a single byte.
        data = aps.sequence.to_bytes(1, 'little')
        schema = types.CLUSTERS[command][2]
        data += t.serialize(args, schema)
        return aps, data

    @util.retryable_request
    def request(self, command, *args):
        """Serialize a ZDO command and send it through the device.

        Returns whatever ``self._device.request`` returns. The method is
        wrapped by ``util.retryable_request``, whose behaviour is defined
        in ``util``.
        """
        aps, data = self._serialize(command, *args)
        return self._device.request(aps, data)
cls._attridx = {}
for attrid, (attrname, datatype) in cls.attributes.items():
cls._attridx[attrname] = attrid
if hasattr(cls, 'server_commands'):
cls._server_command_idx = {}
for command_id, details in cls.server_commands.items():
command_name, schema, is_reply = details
cls._server_command_idx[command_name] = command_id
if hasattr(cls, 'client_commands'):
cls._client_command_idx = {}
for command_id, details in cls.client_commands.items():
command_name, schema, is_reply = details
cls._client_command_idx[command_name] = command_id
class Cluster(util.ListenableMixin, util.LocalLogMixin, metaclass=Registry):
"""A cluster on an endpoint"""
_registry = {}
_registry_range = {}
_server_command_idx = {}
_client_command_idx = {}
def __init__(self, endpoint):
self._endpoint = endpoint
self._attr_cache = {}
self._listeners = {}
@classmethod
def from_id(cls, endpoint, cluster_id):
if cluster_id in cls._registry:
return cls._registry[cluster_id](endpoint)
else:
@util.retryable_request
def request(self, command, *args):
    """Serialize ``command`` with ``args`` and send it through the device.

    Returns whatever ``self._device.request`` returns. The method is wrapped
    by ``util.retryable_request``, whose behaviour is defined in ``util``.
    """
    aps, data = self._serialize(command, *args)
    return self._device.request(aps, data)
import bellows.zigbee.profiles
import bellows.zigbee.util as zutil
import bellows.zigbee.zcl
LOGGER = logging.getLogger(__name__)
class Status(enum.IntEnum):
    """The status of an Endpoint"""

    # No initialization is done
    NEW = 0
    # Endpoint information (device type, clusters, etc) init done
    ZDO_INIT = 1
class Endpoint(zutil.LocalLogMixin, zutil.ListenableMixin):
    """An endpoint on a device on the network"""

    def __init__(self, device, endpoint_id):
        """Create an uninitialized endpoint.

        Args:
            device: The owning device object.
            endpoint_id: This endpoint's id on that device.
        """
        self._device = device
        self._endpoint_id = endpoint_id
        self.in_clusters = {}
        self.out_clusters = {}
        self._cluster_attr = {}
        self.status = Status.NEW
        self._listeners = {}

    @asyncio.coroutine
    def initialize(self):
        """Begin endpoint discovery unless it has already been done.

        Returns immediately when ``status`` is ``Status.ZDO_INIT``;
        otherwise logs that discovery is starting.
        """
        if self.status == Status.ZDO_INIT:
            return

        self.info("Discovering endpoint information")
import bellows.zigbee.util
import bellows.zigbee.zdo
LOGGER = logging.getLogger(__name__)
class Status(enum.IntEnum):
    """The status of a Device"""

    # No initialization done
    NEW = 0
    # ZDO endpoint discovery done
    ZDO_INIT = 1
class Device(bellows.zigbee.util.LocalLogMixin):
"""A device on the network"""
def __init__(self, application, ieee, nwk):
self._application = application
self._ieee = ieee
self._nwk = nwk
self.zdo = bellows.zigbee.zdo.ZDO(self)
self.endpoints = {0: self.zdo}
self.lqi = None
self.rssi = None
self.status = Status.NEW
@asyncio.coroutine
def initialize(self):
if self.status == Status.NEW:
self.info("Discovering endpoints")