Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_str():
    """Check the string form of a deserialized ``EzspStatus`` value.

    Deserializes a single zero byte and asserts that ``str()`` of the first
    element of the result is ``"EzspStatus.SUCCESS"``.
    """
    parsed = t.EzspStatus.deserialize(b"\0")
    status = parsed[0]
    rendered = str(status)
    assert rendered == "EzspStatus.SUCCESS"
0x8003: ('Power_Desc_rsp', (STATUS, NWKI, ('PowerDescriptor', PowerDescriptor))),
0x8004: ('Simple_Desc_rsp', (STATUS, NWKI, ('SimpleDescriptor', SizePrefixedSimpleDescriptor))),
0x8005: ('Active_EP_rsp', (STATUS, NWKI, ('ActiveEPList', t.LVList(t.uint8_t)))),
0x8006: ('Match_Desc_rsp', (STATUS, NWKI, ('MatchList', t.LVList(t.uint8_t)))),
# 0x8010: ('Complex_Desc_rsp', (STATUS, NWKI, ('Length', t.uint8_t), ('ComplexDescriptor', ComplexDescriptor))),
0x8011: ('User_Desc_rsp', (STATUS, NWKI, ('Length', t.uint8_t), ('UserDescriptor', t.fixed_list(16, t.uint8_t)))),
0x8012: ('Discovery_Cache_rsp', (STATUS, )),
0x8014: ('User_Desc_conf', (STATUS, NWKI)),
0x8015: ('System_Server_Discovery_rsp', (STATUS, ('ServerMask', t.uint16_t))),
0x8016: ('Discovery_Store_rsp', (STATUS, )),
0x8017: ('Node_Desc_store_rsp', (STATUS, )),
0x8018: ('Power_Desc_store_rsp', (STATUS, IEEE, ('PowerDescriptor', PowerDescriptor))),
0x8019: ('Active_EP_store_rsp', (STATUS, )),
0x801a: ('Simple_Desc_store_rsp', (STATUS, )),
0x801b: ('Remove_node_cache_rsp', (STATUS, )),
0x801c: ('Find_node_cache_rsp', (('CacheNWKAddr', t.EmberEUI64), NWK, IEEE)),
0x801d: ('Extended_Simple_Desc_rsp', (STATUS, NWK, ('Endpoint', t.uint8_t), ('AppInputClusterCount', t.uint8_t), ('AppOutputClusterCount', t.uint8_t), ('StartIndex', t.uint8_t), ('AppClusterList', t.List(t.uint16_t)))),
0x801e: ('Extended_Active_EP_rsp', (STATUS, NWKI, ('ActiveEPCount', t.uint8_t), ('StartIndex', t.uint8_t), ('ActiveEPList', t.List(t.uint8_t)))),
# Bind Management Server Services Responses
0x8020: ('End_Device_Bind_rsp', (STATUS, )),
0x8021: ('Bind_rsp', (STATUS, )),
0x8022: ('Unbind_rsp', (STATUS, )),
# ... TODO optional stuff ...
# Network Management Server Services Responses
# ... TODO optional stuff ...
0x8034: ('Mgmt_Leave_rsp', (STATUS, )),
0x8036: ('Mgmt_Permit_Joining_rsp', (STATUS, )),
# ... TODO optional stuff ...
}
# Rewrite to (name, param_names, param_types)
0x0017: ('max_pin_len', t.uint8_t),
0x0018: ('min_pin_len', t.uint8_t),
0x0019: ('max_rfid_len', t.uint8_t),
0x001a: ('min_rfid_len', t.uint8_t),
0x0020: ('enable__logging', t.Bool),
0x0021: ('language', t.LVBytes),
0x0022: ('led_settings', t.uint8_t),
0x0023: ('auto_relock_time', t.uint32_t),
0x0024: ('sound_volume', t.uint8_t),
0x0025: ('operating_mode', t.uint32_t),
0x0026: ('lock_type', t.uint16_t), # bitmap16
0x0027: ('default_configuration_register', t.uint16_t), # bitmap16
0x0028: ('enable_local_programming', t.Bool),
0x0029: ('enable_one_touch_locking', t.Bool),
0x002a: ('enable_inside_status_led', t.Bool),
0x002b: ('enable_privacy_mode_button', t.Bool),
0x0030: ('wrong_code_entry_limit', t.uint8_t),
0x0031: ('user_code_temporary_disable_time', t.uint8_t),
0x0032: ('send_pin_ota', t.Bool),
0x0033: ('require_pin_for_rf_operation', t.Bool),
0x0034: ('zigbee_security_level', t.uint8_t),
0x0040: ('alarm_mask', t.uint16_t), # bitmap16
0x0041: ('keypad_operation_event_mask', t.uint16_t), # bitmap16
0x0042: ('rf_operation_event_mask', t.uint16_t), # bitmap16
0x0043: ('manual_operation_event_mask', t.uint16_t), # bitmap16
0x0044: ('rfid_operation_event_mask', t.uint16_t), # bitmap16
0x0045: ('keypad_programming_event_mask', t.uint16_t), # bitmap16
0x0046: ('rf_programming_event_mask', t.uint16_t), # bitmap16
0x0047: ('rfid_programming_event_mask', t.uint16_t), # bitmap16
}
server_commands = {
0x0000: ('lock_door', (), False),
import bellows.types as t
from bellows.zigbee.zcl import Cluster
class ApplianceIdentification(Cluster):
    """Cluster definition for 'Appliance Identification' (cluster id 0x0b00).

    Only an attribute table is declared here; both command tables are empty.
    """

    cluster_id = 0x0b00
    name = 'Appliance Identification'
    ep_attribute = 'appliance_id'

    # Maps a numeric attribute id to a (attribute name, value type) pair.
    attributes = {
        0x0000: ('basic_identification', t.uint56_t),

        # Company / brand identification
        0x0010: ('company_name', t.LVBytes),
        0x0011: ('company_id', t.uint16_t),
        0x0012: ('brand_name', t.LVBytes),
        0x0013: ('brand_id', t.uint16_t),

        # Product identification
        0x0014: ('model', t.LVBytes),
        0x0015: ('part_number', t.LVBytes),
        0x0016: ('product_revision', t.LVBytes),
        0x0017: ('software_revision', t.LVBytes),
        0x0018: ('product_type_name', t.LVBytes),
        0x0019: ('product_type_id', t.uint16_t),
        0x001a: ('ceced_specification_version', t.uint8_t),
    }

    # No commands are declared for either direction.
    server_commands = {}
    client_commands = {}
from datetime import datetime
import bellows.types as t
from bellows.zigbee.zcl import Cluster
from bellows.zigbee.zcl import foundation
class Basic(Cluster):
    """Basic cluster (cluster id 0x0000).

    Carries attributes that describe a device, hold user-supplied device
    information such as its location, and allow the device to be enabled.
    """

    cluster_id = 0x0000
    ep_attribute = 'basic'

    # Maps a numeric attribute id to a (attribute name, value type) pair.
    attributes = {
        # Basic Device Information
        0x0000: ('zcl_version', t.uint8_t),
        0x0001: ('app_version', t.uint8_t),
        0x0002: ('stack_version', t.uint8_t),
        0x0003: ('hw_version', t.uint8_t),
        0x0004: ('manufacturer', t.LVBytes),
        0x0005: ('model', t.LVBytes),
        0x0006: ('date_code', t.LVBytes),
        0x0007: ('power_source', t.uint8_t),  # enum8
        0x0008: ('app_profile_version', t.uint8_t),  # enum8

        # Basic Device Settings
        0x0010: ('location_desc', t.LVBytes),
        0x0011: ('physical_env', t.uint8_t),  # enum8
        0x0012: ('device_enabled', t.Bool),
        0x0013: ('alarm_mask', t.uint8_t),  # bitmap8
        0x0014: ('disable_local_config', t.uint8_t),  # bitmap8

        0x4000: ('sw_build_id', t.LVBytes),
    }
}
server_commands = {}
client_commands = {}
class UserInterface(Cluster):
    """Thermostat User Interface Configuration cluster (cluster id 0x0204).

    Used to configure the user interface of a thermostat; that interface may
    be located remotely from the thermostat itself.
    """

    cluster_id = 0x0204
    name = 'Thermostat User Interface Configuration'
    ep_attribute = 'thermostat_ui'

    # Maps a numeric attribute id to a (attribute name, value type) pair.
    attributes = {
        0x0000: ('temp_display_mode', t.uint8_t),  # enum8
        0x0001: ('keypad_lockout', t.uint8_t),  # enum8
        0x0002: ('programming_visibility', t.uint8_t),  # enum8
    }

    # No commands are declared for either direction.
    server_commands = {}
    client_commands = {}
0x1b: ('Bitmap', t.uint32_t, Discrete),
0x1c: ('Bitmap', t.uint40_t, Discrete),
0x1d: ('Bitmap', t.uint48_t, Discrete),
0x1e: ('Bitmap', t.uint56_t, Discrete),
0x1f: ('Bitmap', t.uint64_t, Discrete),
0x20: ('Unsigned Integer', t.uint8_t, Analog),
0x21: ('Unsigned Integer', t.uint16_t, Analog),
0x22: ('Unsigned Integer', t.uint24_t, Analog),
0x23: ('Unsigned Integer', t.uint32_t, Analog),
0x24: ('Unsigned Integer', t.uint40_t, Analog),
0x25: ('Unsigned Integer', t.uint48_t, Analog),
0x26: ('Unsigned Integer', t.uint56_t, Analog),
0x27: ('Unsigned Integer', t.uint64_t, Analog),
0x28: ('Signed Integer', t.int8s, Analog),
0x29: ('Signed Integer', t.int16s, Analog),
0x2a: ('Signed Integer', t.int24s, Analog),
0x2b: ('Signed Integer', t.int32s, Analog),
0x2c: ('Signed Integer', t.int40s, Analog),
0x2d: ('Signed Integer', t.int48s, Analog),
0x2e: ('Signed Integer', t.int56s, Analog),
0x2f: ('Signed Integer', t.int64s, Analog),
0x30: ('Enumeration', t.uint8_t, Discrete),
0x31: ('Enumeration', t.uint16_t, Discrete),
# 0x38: ('Floating point', t.Half, Analog),
0x39: ('Floating point', t.Single, Analog),
0x3a: ('Floating point', t.Double, Analog),
0x41: ('Octet string', t.LVBytes, Discrete),
0x42: ('Character string', t.LVBytes, Discrete),
# 0x43: ('Long octet string', ),
# 0x44: ('Long character string', ),
# 0x48: ('Array', ),
# 0x4c: ('Structure', ),
0x0023: ('event_enable', t.uint8_t), # bitmap8
0x0024: ('event_state', t.uint8_t), # enum8
0x0025: ('fault_values', t.uint16_t),
0x0048: ('notify_type', t.uint8_t), # enum8
0x0071: ('time_delay', t.uint8_t),
# 0x0082: ('event_time_stamps', TODO.array), # Array[3] of (16-bit unsigned integer, time of day, or structure of (date, time of day))
}
server_commands = {}
client_commands = {}
class MultistateOutputRegular(Cluster):
    """Cluster definition with id 0x0610, exposed as
    ``bacnet_regular_multistate_output``.

    Only an attribute table is declared here; both command tables are empty.
    """

    cluster_id = 0x0610
    ep_attribute = 'bacnet_regular_multistate_output'

    # Maps a numeric attribute id to a (attribute name, value type) pair.
    attributes = {
        0x001f: ('device_type', t.LVBytes),
        0x0028: ('feed_back_value', t.uint8_t),  # enum8
        0x004b: ('object_id', t.fixed_list(4, t.uint8_t)),
        0x004d: ('object_name', t.LVBytes),
        0x004f: ('object_type', t.uint16_t),  # enum16
        0x00a8: ('profile_name', t.LVBytes),
    }

    # No commands are declared for either direction.
    server_commands = {}
    client_commands = {}
class MultistateOutputExtended(Cluster):
cluster_id = 0x0611
ep_attribute = 'bacnet_extended_multistate_output'
attributes = {
0x0000: ('acked_transitions', t.uint8_t), # bitmap8
0x0011: ('notification_class', t.uint16_t),
async def subscribe(self, group_id) -> t.EmberStatus:
if group_id in self._multicast:
LOGGER.debug("%s is already subscribed", t.EmberMulticastId(group_id))
return t.EmberStatus.SUCCESS
try:
idx = self._available.pop()
except KeyError:
LOGGER.error("No more available slots MulticastId subscription")
return t.EmberStatus.INDEX_OUT_OF_RANGE
entry = t.EmberMulticastTableEntry()
entry.endpoint = t.uint8_t(1)
entry.multicastId = t.EmberMulticastId(group_id)
entry.networkIndex = t.uint8_t(0)
status = await self._ezsp.setMulticastTableEntry(idx, entry)
if status[0] != t.EmberStatus.SUCCESS:
LOGGER.warning(
"""Security and Safety Functional Domain"""
import bellows.types as t
from bellows.zigbee.zcl import Cluster
class IasZone(Cluster):
cluster_id = 0x0500
name = 'IAS Zone'
ep_attribute = 'ias_zone'
attributes = {
# Zone Information
0x0000: ('zone_state', t.uint8_t), # enum8
0x0001: ('zone_type', t.uint16_t), # enum16
0x0002: ('zone_status', t.uint16_t), # bitmap16
# Zone Settings
0x0010: ('cie_addr', t.EmberEUI64),
0x0011: ('zone_id', t.uint8_t),
0x0012: ('num_zone_sensitivity_levels_supported', t.uint8_t),
0x0013: ('current_zone_sensitivity_level', t.uint8_t),
}
server_commands = {
0x0000: ('enroll_response', (t.uint8_t, t.uint8_t), True),
0x0001: ('init_normal_op_mode', (), False),
0x0002: ('init_test_mode', (), False),
}
client_commands = {
0x0000: ('status_change_notification', (t.uint16_t, t.uint8_t, t.uint8_t, t.uint16_t), False),
0x0001: ('enroll', (), False),