Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_handle_message(ep):
    """Check that ``ep.handle_message`` forwards to input cluster 0.

    The cluster's own ``handle_message`` is replaced with a mock so the
    test can verify it is invoked exactly once with the same arguments
    that were handed to the endpoint.
    """
    in_cluster = ep.add_input_cluster(0)
    handler = mock.MagicMock()
    in_cluster.handle_message = handler

    # Frame addressed to cluster id 0, i.e. the cluster registered above.
    frame = t.EmberApsFrame()
    frame.clusterId = 0

    ep.handle_message(False, frame, 0, 1, [])

    in_cluster.handle_message.assert_called_once_with(False, frame, 0, 1, [])
def test_handle_message_output(ep):
    """Check that ``ep.handle_message`` forwards to output cluster 0.

    Mirrors the input-cluster case: the cluster's ``handle_message`` is
    mocked and must be called exactly once with the endpoint's arguments.
    """
    out_cluster = ep.add_output_cluster(0)
    handler = mock.MagicMock()
    out_cluster.handle_message = handler

    # Frame addressed to cluster id 0, i.e. the cluster registered above.
    frame = t.EmberApsFrame()
    frame.clusterId = 0

    ep.handle_message(False, frame, 0, 1, [])

    out_cluster.handle_message.assert_called_once_with(False, frame, 0, 1, [])
def test_handle_announce(zdo_f):
    """Check that handling command 0x0013 fires exactly one listener event.

    The device is first removed from its application's ``devices`` mapping,
    then a message carrying the device's IEEE and NWK addresses is handled.
    NOTE(review): 0x0013 is presumably the ZDO device-announce command,
    going by the test name -- confirm against the ZDO definitions.
    """
    device = zdo_f._device
    zdo_f.listener_event = mock.MagicMock()

    # Drop the device so the application no longer knows about it.
    device._application.devices.pop(device.ieee)

    frame = t.EmberApsFrame()
    payload = [0, device.ieee, device.nwk]
    zdo_f.handle_message(False, frame, 111, 0x0013, payload)

    assert zdo_f.listener_event.call_count == 1
def aps():
    """Return a newly constructed ``t.EmberApsFrame`` with no fields set."""
    frame = t.EmberApsFrame()
    return frame
def _handle_match_desc(zdo_f, profile):
    """Feed command 0x0006 with ``profile`` to ``zdo_f`` and expect one reply.

    ``zdo_f.reply`` is replaced with a mock; the helper asserts it was
    called exactly once after the message is handled.
    NOTE(review): 0x0006 is presumably the ZDO match-descriptor request,
    going by the helper name -- confirm against the ZDO definitions.

    :param zdo_f: ZDO object under test
    :param profile: profile id placed in the request payload
    """
    zdo_f.reply = mock.MagicMock()

    frame = t.EmberApsFrame()
    # Payload: no address, the requested profile, and empty cluster lists.
    request_args = [None, profile, [], []]
    zdo_f.handle_message(False, frame, 123, 0x0006, request_args)

    assert zdo_f.reply.call_count == 1
:param device: destination device
:param profile: Zigbee Profile ID to use for outgoing message
:param cluster: cluster id where the message is being sent
:param src_ep: source endpoint id
:param dst_ep: destination endpoint id
:param sequence: transaction sequence number of the message
:param data: Zigbee message payload
:param expect_reply: True if this is essentially a request
:param use_ieee: use EUI64 for destination addressing
:returns: return a tuple of a status and an error_message. Original requestor
has more context to provide a more meaningful error message
"""
if not self.is_controller_running:
raise ControllerError("ApplicationController is not running")
aps_frame = t.EmberApsFrame()
aps_frame.profileId = t.uint16_t(profile)
aps_frame.clusterId = t.uint16_t(cluster)
aps_frame.sourceEndpoint = t.uint8_t(src_ep)
aps_frame.destinationEndpoint = t.uint8_t(dst_ep)
aps_frame.options = self._tx_options
aps_frame.groupId = t.uint16_t(0)
aps_frame.sequence = t.uint8_t(sequence)
message_tag = self.get_sequence()
if use_ieee:
LOGGER.warning(
("EUI64 addressing is not currently supported, " "reverting to NWK")
)
with self._pending.new(message_tag) as req:
async with self._in_flight_msg:
delays = [0.5, 1.0, 1.5]
:param cluster: cluster id where the message is being sent
:param src_ep: source endpoint id
:param sequence: transaction sequence number of the message
:param data: Zigbee message payload
:param hops: the message will be delivered to all nodes within this number of
hops of the sender. A value of zero is converted to MAX_HOPS
:param non_member_radius: the number of hops that the message will be forwarded
by devices that are not members of the group. A value
of 7 or greater is treated as infinite
:returns: return a tuple of a status and an error_message. Original requestor
has more context to provide a more meaningful error message
"""
if not self.is_controller_running:
raise ControllerError("ApplicationController is not running")
aps_frame = t.EmberApsFrame()
aps_frame.profileId = t.uint16_t(profile)
aps_frame.clusterId = t.uint16_t(cluster)
aps_frame.sourceEndpoint = t.uint8_t(src_ep)
aps_frame.destinationEndpoint = t.uint8_t(src_ep)
aps_frame.options = t.EmberApsOption(
t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY
)
aps_frame.groupId = t.uint16_t(group_id)
aps_frame.sequence = t.uint8_t(sequence)
message_tag = self.get_sequence()
with self._pending.new(message_tag) as req:
async with self._in_flight_msg:
async with self._req_lock:
res = await self._ezsp.sendMulticast(
aps_frame, hops, non_member_radius, message_tag, data