Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_frame_handler_zdo_annce(app, aps, ieee):
aps.destinationEndpoint = 0
app.handle_join = mock.MagicMock()
nwk = t.uint16_t(0xAA55)
data = b"\x18" + nwk.serialize() + ieee.serialize()
_frame_handler(app, aps, ieee, 0, cluster=0x0013, data=data)
assert app.handle_message.call_count == 1
assert app.handle_message.call_args[0][5] is data
assert app.handle_join.call_count == 1
assert app.handle_join.call_args[0][0] == nwk
assert app.handle_join.call_args[0][1] == ieee
return 1
network = networks[0]
pan_id = network.panId
extended_pan_id = network.extendedPanId
channel = network.channel
click.echo(
"Found network %s %s on channel %s" % (pan_id, extended_pan_id, channel)
)
if pan_id is None:
pan_id = t.uint16_t(0)
else:
pan_id = t.uint16_t(pan_id)
if isinstance(extended_pan_id, str):
extended_pan_id = util.parse_epan(extended_pan_id)
if extended_pan_id is None:
extended_pan_id = t.fixed_list(8, t.uint8_t)([t.uint8_t(0)] * 8)
v = await util.network_init(s)
if v[0] == t.EmberStatus.SUCCESS:
LOGGER.debug("Network was up, leaving...")
v = await s.leaveNetwork()
util.check(v[0], "Failure leaving network: %s" % (v[0],))
await asyncio.sleep(1) # TODO
initial_security_state = zutil.zha_security()
v = await s.setInitialSecurityState(initial_security_state)
util.check(v[0], "Setting security state failed: %s" % (v[0],))
0x001d: ('Extended_Simple_Desc_req', (NWKI, ('EndPoint', t.uint8_t), ('StartIndex', t.uint8_t))),
0x001e: ('Extended_Active_EP_req', (NWKI, ('StartIndex', t.uint8_t))),
# Bind Management Server Services Responses
0x0020: ('End_Device_Bind_req', (('BindingTarget', t.uint16_t), ('SrcAddress', t.EmberEUI64), ('SrcEndpoint', t.uint8_t), ('ProfileID', t.uint8_t), ('InClusterList', t.LVList(t.uint8_t)), ('OutClusterList', t.LVList(t.uint8_t)))),
0x0021: ('Bind_req', (('SrcAddress', t.EmberEUI64), ('SrcEndpoint', t.uint8_t), ('ClusterID', t.uint16_t), ('DstAddress', MultiAddress))),
0x0022: ('Unind_req', (('SrcAddress', t.EmberEUI64), ('SrcEndpoint', t.uint8_t), ('ClusterID', t.uint16_t), ('DstAddress', MultiAddress))),
# Network Management Server Services Requests
# ... TODO optional stuff ...
0x0034: ('Mgmt_Leave_req', (('DeviceAddress', t.EmberEUI64), ('Options', t.uint8_t))), # bitmap8
0x0036: ('Mgmt_Permit_Joining_req', (('PermitDuration', t.uint8_t), ('TC_Significant', t.Bool))),
# ... TODO optional stuff ...
# Responses
# Device and Service Discovery Server Responses
0x8000: ('NWK_addr_rsp', (STATUS, IEEE, NWK, ('NumAssocDev', t.uint8_t), ('StartIndex', t.uint8_t), ('NWKAddressAssocDevList', t.List(t.uint16_t)))),
0x8001: ('IEEE_addr_rsp', (STATUS, IEEE, NWK, ('NumAssocDev', t.uint8_t), ('StartIndex', t.uint8_t), ('NWKAddrAssocDevList', t.List(t.uint16_t)))),
0x8002: ('Node_Desc_rsp', (STATUS, NWKI, ('NodeDescriptor', NodeDescriptor))),
0x8003: ('Power_Desc_rsp', (STATUS, NWKI, ('PowerDescriptor', PowerDescriptor))),
0x8004: ('Simple_Desc_rsp', (STATUS, NWKI, ('SimpleDescriptor', SizePrefixedSimpleDescriptor))),
0x8005: ('Active_EP_rsp', (STATUS, NWKI, ('ActiveEPList', t.LVList(t.uint8_t)))),
0x8006: ('Match_Desc_rsp', (STATUS, NWKI, ('MatchList', t.LVList(t.uint8_t)))),
# 0x8010: ('Complex_Desc_rsp', (STATUS, NWKI, ('Length', t.uint8_t), ('ComplexDescriptor', ComplexDescriptor))),
0x8011: ('User_Desc_rsp', (STATUS, NWKI, ('Length', t.uint8_t), ('UserDescriptor', t.fixed_list(16, t.uint8_t)))),
0x8012: ('Discovery_Cache_rsp', (STATUS, )),
0x8014: ('User_Desc_conf', (STATUS, NWKI)),
0x8015: ('System_Server_Discovery_rsp', (STATUS, ('ServerMask', t.uint16_t))),
0x8016: ('Discovery_Store_rsp', (STATUS, )),
0x8017: ('Node_Desc_store_rsp', (STATUS, )),
0x8018: ('Power_Desc_store_rsp', (STATUS, IEEE, ('PowerDescriptor', PowerDescriptor))),
0x8019: ('Active_EP_store_rsp', (STATUS, )),
0x801a: ('Simple_Desc_store_rsp', (STATUS, )),
0x801b: ('Remove_node_cache_rsp', (STATUS, )),
0x41: ('Octet string', t.LVBytes, Discrete),
0x42: ('Character string', t.LVBytes, Discrete),
# 0x43: ('Long octet string', ),
# 0x44: ('Long character string', ),
# 0x48: ('Array', ),
# 0x4c: ('Structure', ),
# 0x50: ('Set', ),
# 0x51: ('Bag', ),
0xe0: ('Time of day', t.uint32_t, Analog),
0xe1: ('Date', t.uint32_t, Analog),
0xe2: ('UTCTime', t.uint32_t, Analog),
0xe8: ('Cluster ID', t.uint16_t, Discrete),
0xe9: ('Attribute ID', t.uint16_t, Discrete),
0xea: ('BACNet OID', t.uint32_t, Discrete),
0xf0: ('IEEE address', t.EmberEUI64, Discrete),
0xf1: ('128-bit security key', t.fixed_list(16, t.uint16_t), Discrete),
0xff: ('Unknown', None, None),
}
DATA_TYPE_IDX = {
t: tidx
for tidx, (tname, t, ad) in DATA_TYPES.items()
if ad is Analog
}
DATA_TYPE_IDX[t.uint32_t] = 0x23
DATA_TYPE_IDX[t.EmberEUI64] = 0xf0
DATA_TYPE_IDX[t.Bool] = 0x10
class TypeValue():
def serialize(self):
return self.type.to_bytes(1, 'little') + self.value.serialize()
import bellows.types as t
COMMANDS = {
# 4. Configuration frames
"version": (0x00, (t.uint8_t,), (t.uint8_t, t.uint8_t, t.uint16_t)),
"getConfigurationValue": (0x52, (t.EzspConfigId,), (t.EzspStatus, t.uint16_t)),
"setConfigurationValue": (0x53, (t.EzspConfigId, t.uint16_t), (t.EzspStatus,)),
"addEndpoint": (
0x02,
(
t.uint8_t,
t.uint16_t,
t.uint16_t,
t.uint8_t,
t.uint8_t,
t.uint8_t,
t.List(t.uint16_t),
t.List(t.uint16_t),
),
(t.EzspStatus,),
),
def serialize(self):
r = t.uint16_t(self.attrid).serialize()
r += t.uint8_t(self.status).serialize()
if self.status == 0:
r += self.value.serialize()
return r
('status', t.uint8_t),
('direction', t.uint8_t),
('attrid', t.uint16_t),
]
class ReadReportingConfigRecord(t.EzspStruct):
_fields = [
('direction', t.uint8_t),
('attrid', t.uint16_t),
]
class DiscoverAttributesResponseRecord(t.EzspStruct):
_fields = [
('attrid', t.uint16_t),
('datatype', t.uint8_t),
]
COMMANDS = {
# id: (name, params, is_response)
0x00: ('Read attributes', (t.List(t.uint16_t), ), False),
0x01: ('Read attributes response', (t.List(ReadAttributeRecord), ), True),
0x02: ('Write attributes', (t.List(Attribute), ), False),
0x03: ('Write attributes undivided', (t.List(Attribute), ), False),
0x04: ('Write attributes response', (t.List(WriteAttributesStatusRecord), ), True),
0x05: ('Write attributes no response', (t.List(Attribute), ), False),
0x06: ('Configure reporting', (t.List(AttributeReportingConfig), ), False),
0x07: ('Configure reporting response', (t.List(ConfigureReportingResponseRecord), ), True),
0x08: ('Read reporting configuration', (t.List(ReadReportingConfigRecord), ), False),
0x09: ('Read reporting configuration response', (t.List(AttributeReportingConfig), ), True),
of 7 or greater is treated as infinite
:returns: return a tuple of a status and an error_message. Original requestor
has more context to provide a more meaningful error message
"""
if not self.is_controller_running:
raise ControllerError("ApplicationController is not running")
aps_frame = t.EmberApsFrame()
aps_frame.profileId = t.uint16_t(profile)
aps_frame.clusterId = t.uint16_t(cluster)
aps_frame.sourceEndpoint = t.uint8_t(src_ep)
aps_frame.destinationEndpoint = t.uint8_t(src_ep)
aps_frame.options = t.EmberApsOption(
t.EmberApsOption.APS_OPTION_ENABLE_ROUTE_DISCOVERY
)
aps_frame.groupId = t.uint16_t(group_id)
aps_frame.sequence = t.uint8_t(sequence)
message_tag = self.get_sequence()
with self._pending.new(message_tag) as req:
async with self._in_flight_msg:
async with self._req_lock:
res = await self._ezsp.sendMulticast(
aps_frame, hops, non_member_radius, message_tag, data
)
if res[0] != t.EmberStatus.SUCCESS:
return res[0], "EZSP sendMulticast failure: %s" % (res[0],)
res = await asyncio.wait_for(req.result, APS_ACK_TIMEOUT)
return res
def read_attributes_raw(self, attributes):
schema = foundation.COMMANDS[0x00][1]
attributes = [t.uint16_t(a) for a in attributes]
v = yield from self.request(True, 0x00, schema, attributes)
return v