Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_size_prefixed_simple_descriptor():
    """Round-trip a SizePrefixedSimpleDescriptor and check its size byte."""
    descriptor = types.SizePrefixedSimpleDescriptor()
    descriptor.endpoint = t.uint8_t(1)
    descriptor.profile = t.uint16_t(2)
    descriptor.device_type = t.uint16_t(3)
    descriptor.device_version = t.uint8_t(4)

    in_list_type = t.LVList(t.uint16_t)
    descriptor.input_clusters = in_list_type([t.uint16_t(5), t.uint16_t(6)])
    out_list_type = t.LVList(t.uint16_t)
    descriptor.output_clusters = out_list_type([t.uint16_t(7), t.uint16_t(8)])

    encoded = descriptor.serialize()
    # The leading byte must count every byte that follows it.
    assert encoded[0] == len(encoded) - 1

    decoded, _rest = types.SizePrefixedSimpleDescriptor.deserialize(encoded)
    assert descriptor.input_clusters == decoded.input_clusters
    assert descriptor.output_clusters == decoded.output_clusters
def test_lvlist():
    """Decode a length-prefixed uint8 list, then re-encode the decoded items."""
    uint8_list = t.LVList(t.uint8_t)
    items, leftover = uint8_list.deserialize(b"\x0412345")

    # The 0x04 prefix consumes four items, so the trailing "5" is left over.
    assert leftover == b"5"
    assert items == [ord(ch) for ch in "1234"]
    assert t.LVList(t.uint8_t).serialize(items) == b"\x041234"
def test_size_prefixed_simple_descriptor():
    """Check the size prefix and cluster lists survive a serialize round trip."""
    original = types.SizePrefixedSimpleDescriptor()
    original.endpoint = t.uint8_t(1)
    original.profile = t.uint16_t(2)
    original.device_type = t.uint16_t(3)
    original.device_version = t.uint8_t(4)
    original.input_clusters = t.LVList(t.uint16_t)(
        [t.uint16_t(5), t.uint16_t(6)]
    )
    original.output_clusters = t.LVList(t.uint16_t)(
        [t.uint16_t(7), t.uint16_t(8)]
    )

    wire = original.serialize()
    size_byte = wire[0]
    # The size byte excludes itself, hence the "- 1".
    assert size_byte == len(wire) - 1

    restored, _remaining = types.SizePrefixedSimpleDescriptor.deserialize(wire)
    assert original.input_clusters == restored.input_clusters
    assert original.output_clusters == restored.output_clusters
]
class PowerProfile(t.EzspStruct):
    """Struct holding one power profile record.

    The record has four fields, declared in order: a profile id, an energy
    phase id, a remote-control flag and a state value.
    """

    _fields = [
        ("power_profile_id", t.uint8_t),
        ("energy_phase_id", t.uint8_t),
        ("power_profile_remote_control", t.Bool),
        ("power_profile_state", t.uint8_t),
    ]
# Server command table: command id -> (name, argument types, flag).
# NOTE(review): the trailing boolean is True only for entries named
# "*_response", so it presumably marks replies -- confirm against the
# code that consumes this table.
server_commands = {
    0x0000: ("power_profile_request", (t.uint8_t,), False),
    0x0001: ("power_profile_state_request", (), False),
    0x0002: (
        "get_power_profile_price_response",
        (t.uint8_t, t.uint16_t, t.uint32_t, t.uint8_t),
        True,
    ),
    0x0003: (
        "get_overall_schedule_price_response",
        (t.uint16_t, t.uint32_t, t.uint8_t),
        True,
    ),
    0x0004: (
        "energy_phases_schedule_notification",
        (t.uint8_t, t.LVList(ScheduleRecord)),
        False,
    ),
    0x0005: (
        "energy_phases_schedule_response",
        (t.uint8_t, t.LVList(ScheduleRecord)),
        True,
    ),
    0x0006: ("power_profile_schedule_constraints_request", (t.uint8_t,), False),
    0x0007: ("energy_phases_schedule_state_request", (t.uint8_t,), False),
    0x0008: (
        "get_power_profile_price_extended_response",
        (t.uint8_t, t.uint16_t, t.uint32_t, t.uint8_t),
        True,
    ),
}
client_commands = {
0x0000: ('power_profile_notification', (t.uint8_t, t.uint8_t, t.LVList(PowerProfilePhase)), False),
0x0001: ('power_profile_response', (t.uint8_t, t.uint8_t, t.LVList(PowerProfilePhase)), True),
0x0002: ('power_profile_state_response', (t.LVList(PowerProfile), ), True),
0x0003: ('get_power_profile_price', (t.uint8_t, ), False),
0x0004: ('power_profile_state_notification', (t.LVList(PowerProfile), ), False),
0x0005: ('get_overall_schedule_price', (), False),
0x0006: ('energy_phases_schedule_request', (), False),
0x0007: ('energy_phases_schedule_state_response', (t.uint8_t, t.uint8_t), True),
0x0008: ('energy_phases_schedule_state_notification', (t.uint8_t, t.uint8_t), False),
0x0009: ('power_profile_schedule_constraints_notification', (t.uint8_t, t.uint16_t, t.uint16_t), False),
t.int8s,
t.EmberNodeId,
t.uint8_t,
t.uint8_t,
t.LVBytes,
),
),
"incomingRouteRecordHandler": (
0x59,
(),
(t.EmberNodeId, t.EmberEUI64, t.uint8_t, t.int8s, t.LVList(t.EmberNodeId)),
),
"changeSourceRouteHandler": (0xC4, (), (t.EmberNodeId, t.EmberNodeId, t.Bool)),
"setSourceRoute": (
0x5A,
(t.EmberNodeId, t.LVList(t.EmberNodeId)),
(t.EmberStatus,),
),
"incomingManyToOneRouteRequestHandler": (
0x7D,
(),
(t.EmberNodeId, t.EmberEUI64, t.uint8_t),
),
"incomingRouteErrorHandler": (0x80, (), (t.EmberStatus, t.EmberNodeId)),
"addressTableEntryIsActive": (0x5B, (t.uint8_t,), (t.Bool,)),
"setAddressTableRemoteEui64": (0x5C, (t.uint8_t, t.EmberEUI64), (t.EmberStatus,)),
"setAddressTableRemoteNodeId": (0x5D, (t.uint8_t, t.EmberNodeId), ()),
"getAddressTableRemoteEui64": (0x5E, (t.uint8_t,), (t.EmberEUI64,)),
"getAddressTableRemoteNodeId": (0x5F, (t.uint8_t,), (t.EmberNodeId,)),
"setExtendedTimeout": (0x7E, (t.EmberEUI64, t.Bool), ()),
"getExtendedTimeout": (0x7F, (t.EmberEUI64,), (t.Bool,)),
"replaceAddressTableEntry": (
0x0000: ('NWK_addr_req', (IEEE, ('RequestType', t.uint8_t), ('StartIndex', t.uint8_t))),
0x0001: ('IEEE_addr_req', (NWKI, ('RequestType', t.uint8_t), ('StartIndex', t.uint8_t))),
0x0002: ('Node_Desc_req', (NWKI, )),
0x0003: ('Power_Desc_req', (NWKI, )),
0x0004: ('Simple_Desc_req', (NWKI, ('EndPoint', t.uint8_t))),
0x0005: ('Active_EP_req', (NWKI, )),
0x0006: ('Match_Desc_req', (NWKI, ('ProfileID', t.uint16_t), ('InClusterList', t.LVList(t.uint16_t)), ('OutClusterList', t.LVList(t.uint16_t)))),
# 0x0010: ('Complex_Desc_req', (NWKI, )),
0x0011: ('User_Desc_req', (NWKI, )),
0x0012: ('Discovery_Cache_req', (NWK, IEEE)),
0x0013: ('Device_annce', (NWK, IEEE, ('Capability', t.uint8_t))),
0x0014: ('User_Desc_set', (NWKI, ('UserDescriptor', t.fixed_list(16, t.uint8_t)))), # Really a string
0x0015: ('System_Server_Discovery_req', (('ServerMask', t.uint16_t), )),
0x0016: ('Discovery_store_req', (NWK, IEEE, ('NodeDescSize', t.uint8_t), ('PowerDescSize', t.uint8_t), ('ActiveEPSize', t.uint8_t), ('SimpleDescSizeList', t.LVList(t.uint8_t)))),
0x0017: ('Node_Desc_store_req', (NWK, IEEE, ('NodeDescriptor', NodeDescriptor))),
0x0019: ('Active_EP_store_req', (NWK, IEEE, ('ActiveEPList', t.LVList(t.uint8_t)))),
0x001a: ('Simple_Desc_store_req', (NWK, IEEE, ('SimpleDescriptor', SizePrefixedSimpleDescriptor))),
0x001b: ('Remove_node_cache_req', (NWK, IEEE)),
0x001c: ('Find_node_cache_req', (NWK, IEEE)),
0x001d: ('Extended_Simple_Desc_req', (NWKI, ('EndPoint', t.uint8_t), ('StartIndex', t.uint8_t))),
0x001e: ('Extended_Active_EP_req', (NWKI, ('StartIndex', t.uint8_t))),
# Bind Management Server Services Requests
0x0020: ('End_Device_Bind_req', (('BindingTarget', t.uint16_t), ('SrcAddress', t.EmberEUI64), ('SrcEndpoint', t.uint8_t), ('ProfileID', t.uint8_t), ('InClusterList', t.LVList(t.uint8_t)), ('OutClusterList', t.LVList(t.uint8_t)))),
0x0021: ('Bind_req', (('SrcAddress', t.EmberEUI64), ('SrcEndpoint', t.uint8_t), ('ClusterID', t.uint16_t), ('DstAddress', MultiAddress))),
0x0022: ('Unind_req', (('SrcAddress', t.EmberEUI64), ('SrcEndpoint', t.uint8_t), ('ClusterID', t.uint16_t), ('DstAddress', MultiAddress))),
# Network Management Server Services Requests
# ... TODO optional stuff ...
0x0034: ('Mgmt_Leave_req', (('DeviceAddress', t.EmberEUI64), ('Options', t.uint8_t))), # bitmap8
0x0036: ('Mgmt_Permit_Joining_req', (('PermitDuration', t.uint8_t), ('TC_Significant', t.Bool))),
# ... TODO optional stuff ...
# Responses
0x0000: ('power_profile_request', (t.uint8_t, ), False),
0x0001: ('power_profile_state_request', (), False),
0x0002: ('get_power_profile_price_response', (t.uint8_t, t.uint16_t, t.uint32_t, t.uint8_t), True),
0x0003: ('get_overall_schedule_price_response', (t.uint16_t, t.uint32_t, t.uint8_t), True),
0x0004: ('energy_phases_schedule_notification', (t.uint8_t, t.LVList(ScheduleRecord)), False),
0x0005: ('energy_phases_schedule_response', (t.uint8_t, t.LVList(ScheduleRecord)), True),
0x0006: ('power_profile_schedule_constraints_request', (t.uint8_t, ), False),
0x0007: ('energy_phases_schedule_state_request', (t.uint8_t, ), False),
0x0008: ('get_power_profile_price_extended_response', (t.uint8_t, t.uint16_t, t.uint32_t, t.uint8_t), True),
}
# Client command table: command id -> (name, argument types, flag).
# NOTE(review): the trailing boolean is True only for entries named
# "*_response", so it presumably marks replies -- confirm against the
# code that consumes this table.
client_commands = {
    0x0000: (
        "power_profile_notification",
        (t.uint8_t, t.uint8_t, t.LVList(PowerProfilePhase)),
        False,
    ),
    0x0001: (
        "power_profile_response",
        (t.uint8_t, t.uint8_t, t.LVList(PowerProfilePhase)),
        True,
    ),
    0x0002: ("power_profile_state_response", (t.LVList(PowerProfile),), True),
    0x0003: ("get_power_profile_price", (t.uint8_t,), False),
    0x0004: (
        "power_profile_state_notification",
        (t.LVList(PowerProfile),),
        False,
    ),
    0x0005: ("get_overall_schedule_price", (), False),
    0x0006: ("energy_phases_schedule_request", (), False),
    0x0007: (
        "energy_phases_schedule_state_response",
        (t.uint8_t, t.uint8_t),
        True,
    ),
    0x0008: (
        "energy_phases_schedule_state_notification",
        (t.uint8_t, t.uint8_t),
        False,
    ),
    0x0009: (
        "power_profile_schedule_constraints_notification",
        (t.uint8_t, t.uint16_t, t.uint16_t),
        False,
    ),
    0x000A: (
        "power_profile_schedule_constraints_response",
        (t.uint8_t, t.uint16_t, t.uint16_t),
        True,
    ),
    0x000B: (
        "get_power_profile_price_extended",
        (t.uint8_t, t.uint8_t, t.uint16_t),
        False,
    ),
}
class ApplianceControl(Cluster):
    """Cluster 0x001b; declares no attributes and no commands."""

    cluster_id = 0x001B
    ep_attribute = "appliance_control"

    # All three tables are intentionally empty in this definition.
    attributes = {}
    server_commands = {}
    client_commands = {}
0x0000: ('identify_query_response', (t.uint16_t, ), True),
}
class Groups(Cluster):
    """Attributes and commands for configuring and manipulating groups."""

    cluster_id = 0x0004
    ep_attribute = "groups"

    # Attribute id -> (name, type).
    attributes = {
        0x0000: ("name_support", t.uint8_t),  # bitmap8
    }

    # Command id -> (name, argument types, flag).
    # NOTE(review): the flag is True only for the "*_response" entries, so
    # it presumably marks replies -- confirm against the table's consumer.
    server_commands = {
        0x0000: ("add", (t.uint16_t, t.LVBytes), False),
        0x0001: ("view", (t.uint16_t,), False),
        0x0002: ("get_membership", (t.LVList(t.uint16_t),), False),
        0x0003: ("remove", (t.uint16_t,), False),
        0x0004: ("remove_all", (), False),
        0x0005: ("add_if_identifying", (t.uint16_t, t.LVBytes), False),
    }
    client_commands = {
        0x0000: ("add_response", (t.uint8_t, t.uint16_t), True),
        0x0001: ("view_response", (t.uint8_t, t.uint16_t, t.LVBytes), True),
        0x0002: (
            "get_membership_response",
            (t.uint8_t, t.LVList(t.uint16_t)),
            True,
        ),
        0x0003: ("remove_response", (t.uint8_t, t.uint16_t), True),
    }
class Scenes(Cluster):
"""Attributes and commands for scene configuration and
manipulation."""
cluster_id = 0x0005
class PowerDescriptor(t.EzspStruct):
    """Two-byte power descriptor; each byte packs two 4-bit values.

    Both fields are declared as ``t.uint8_t``. The second element of each
    ``_fields`` entry must be a type, as in the other struct definitions in
    this file; the bare integer ``1`` used previously is not one.
    """

    _fields = [
        # Current power mode (4 bits), available power sources (4 bits)
        ("byte_1", t.uint8_t),
        # Current power source (4 bits), current power source level (4 bits)
        ("byte_2", t.uint8_t),
    ]
class SimpleDescriptor(t.EzspStruct):
    """Struct describing one endpoint.

    Fields, in declaration order: endpoint number, profile, device type,
    device version, then length-prefixed lists of 16-bit input and output
    cluster ids.
    """

    _fields = [
        ("endpoint", t.uint8_t),
        ("profile", t.uint16_t),
        ("device_type", t.uint16_t),
        ("device_version", t.uint8_t),
        ("input_clusters", t.LVList(t.uint16_t)),
        ("output_clusters", t.LVList(t.uint16_t)),
    ]
class SizePrefixedSimpleDescriptor(SimpleDescriptor):
    """SimpleDescriptor whose serialized form starts with a one-byte size."""

    def serialize(self):
        """Return the serialized descriptor preceded by its length byte."""
        payload = super().serialize()
        # A single size byte: to_bytes raises OverflowError if the payload
        # is longer than 255 bytes.
        return len(payload).to_bytes(1, 'little') + payload

    @classmethod
    def deserialize(cls, data):
        """Parse a size-prefixed descriptor from *data*.

        Returns a ``(descriptor, remaining_data)`` pair. The descriptor is
        ``None`` when *data* is empty or its size byte is zero.
        """
        # Empty input has no size byte to inspect; treat it like a zero
        # size instead of raising IndexError on data[0].
        if not data or data[0] == 0:
            return None, data[1:]
        return SimpleDescriptor.deserialize(data[1:])
class NodeDescriptor(t.EzspStruct):