Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_aps():
    """Check that ``Endpoint.get_aps`` builds a frame from the endpoint's state.

    An endpoint with id 55 and profile id 99 is asked for an APS frame for
    cluster 255; the frame must carry that profile and cluster, and use the
    endpoint id as both source and destination endpoint.
    """
    endpoint_id = 55
    profile_id = 99
    cluster_id = 255

    application = mock.MagicMock()
    eui64 = t.EmberEUI64(map(t.uint8_t, range(8)))
    parent_device = device.Device(application, eui64, 0xFFFF)

    target = endpoint.Endpoint(parent_device, endpoint_id)
    target.status = endpoint.Status.ZDO_INIT
    target.profile_id = profile_id

    frame = target.get_aps(cluster_id)

    assert frame.profileId == profile_id
    assert frame.clusterId == cluster_id
    assert frame.sourceEndpoint == endpoint_id
    assert frame.destinationEndpoint == endpoint_id
def parse_epan(epan):
    """Parse a user specified extended PAN ID.

    ``epan`` is split on ":" and every piece is read as a base-16
    ``t.uint8_t``. The resulting values are wrapped in a
    ``t.fixed_list(8, t.uint8_t)`` instance, which is returned.
    """
    octets = []
    for piece in epan.split(":"):
        octets.append(t.uint8_t(piece, 16))

    epan_type = t.fixed_list(8, t.uint8_t)
    return epan_type(octets)
:param timeout: how long to wait for transmission ACK
:param broadcast_address: broadcast address.
:returns: return a tuple of a status and an error_message. Original requestor
has more context to provide a more meaningful error message
"""
if not self.is_controller_running:
raise ControllerError("ApplicationController is not running")
aps_frame = t.EmberApsFrame()
aps_frame.profileId = t.uint16_t(profile)
aps_frame.clusterId = t.uint16_t(cluster)
aps_frame.sourceEndpoint = t.uint8_t(src_ep)
aps_frame.destinationEndpoint = t.uint8_t(dst_ep)
aps_frame.options = t.EmberApsOption(t.EmberApsOption.APS_OPTION_NONE)
aps_frame.groupId = t.uint16_t(grpid)
aps_frame.sequence = t.uint8_t(sequence)
message_tag = self.get_sequence()
with self._pending.new(message_tag) as req:
async with self._in_flight_msg:
async with self._req_lock:
res = await self._ezsp.sendBroadcast(
broadcast_address, aps_frame, radius, message_tag, data
)
if res[0] != t.EmberStatus.SUCCESS:
return res[0], "broadcast send failure"
# Wait for messageSentHandler message
res = await asyncio.wait_for(req.result, timeout=APS_ACK_TIMEOUT)
return res
def cb(fut, frame_name, response):
    """Resolve ``fut`` with the first ``stackStatusHandler`` response.

    :param fut: future to resolve; must provide ``done()`` and ``set_result()``
    :param frame_name: name of the received frame
    :param response: payload stored as the future's result

    Frames with any other name are ignored.
    """
    if frame_name != "stackStatusHandler":
        return
    # A future can only be resolved once. Another status frame may arrive
    # before the callback is removed; without this guard set_result() would
    # raise InvalidStateError on the already-resolved future.
    if not fut.done():
        fut.set_result(response)
s = await util.setup(ctx.obj["device"], ctx.obj["baudrate"])
channel = None
if len(channels) != 1:
if pan_id or extended_pan_id:
raise click.BadOptionUsage(
"Specify exactly one channel to join a specific network"
)
else:
channel = t.uint8_t(channels[0])
if not (pan_id or extended_pan_id):
scan_type = t.EzspNetworkScanType.ACTIVE_SCAN
channel_mask = util.channel_mask(channels)
click.echo(
"PAN not provided, scanning channels %s..."
% (" ".join(map(str, channels)),)
)
v = await s.startScan(scan_type, channel_mask, 3)
networks = [n[0] for n in v if n[0].allowingJoin]
if len(networks) == 0:
click.echo("No joinable networks found")
return 1
if len(networks) > 1:
click.echo("Multiple joinable networks found. Refusing to pick.")
pass
class Discrete:
    """Marker class used in ``DATA_TYPES`` entries to tag a type as discrete."""
DATA_TYPES = {
0x00: ('No data', None, None),
0x08: ('General', t.fixed_list(1, t.uint8_t), Discrete),
0x09: ('General', t.fixed_list(2, t.uint8_t), Discrete),
0x0a: ('General', t.fixed_list(3, t.uint8_t), Discrete),
0x0b: ('General', t.fixed_list(4, t.uint8_t), Discrete),
0x0c: ('General', t.fixed_list(5, t.uint8_t), Discrete),
0x0d: ('General', t.fixed_list(6, t.uint8_t), Discrete),
0x0e: ('General', t.fixed_list(7, t.uint8_t), Discrete),
0x0f: ('General', t.fixed_list(8, t.uint8_t), Discrete),
0x10: ('Boolean', t.Bool, Discrete),
0x18: ('Bitmap', t.uint8_t, Discrete),
0x19: ('Bitmap', t.uint16_t, Discrete),
0x1a: ('Bitmap', t.uint24_t, Discrete),
0x1b: ('Bitmap', t.uint32_t, Discrete),
0x1c: ('Bitmap', t.uint40_t, Discrete),
0x1d: ('Bitmap', t.uint48_t, Discrete),
0x1e: ('Bitmap', t.uint56_t, Discrete),
0x1f: ('Bitmap', t.uint64_t, Discrete),
0x20: ('Unsigned Integer', t.uint8_t, Analog),
0x21: ('Unsigned Integer', t.uint16_t, Analog),
0x22: ('Unsigned Integer', t.uint24_t, Analog),
0x23: ('Unsigned Integer', t.uint32_t, Analog),
0x24: ('Unsigned Integer', t.uint40_t, Analog),
0x25: ('Unsigned Integer', t.uint48_t, Analog),
if v[0] == t.EmberStatus.SUCCESS:
LOGGER.debug("Network was up, leaving...")
v = await s.leaveNetwork()
util.check(v[0], "Failure leaving network: %s" % (v[0],))
await asyncio.sleep(1) # TODO
initial_security_state = zutil.zha_security()
v = await s.setInitialSecurityState(initial_security_state)
util.check(v[0], "Setting security state failed: %s" % (v[0],))
parameters = t.EmberNetworkParameters()
parameters.extendedPanId = extended_pan_id
parameters.panId = pan_id
parameters.radioTxPower = t.uint8_t(8)
parameters.radioChannel = t.uint8_t(channel)
parameters.joinMethod = t.EmberJoinMethod.USE_MAC_ASSOCIATION
parameters.nwkManagerId = t.EmberNodeId(0)
parameters.nwkUpdateId = t.uint8_t(0)
parameters.channels = t.uint32_t(0)
click.echo(parameters)
fut = asyncio.Future()
cbid = s.add_callback(functools.partial(cb, fut))
v = await s.joinNetwork(t.EmberNodeType.END_DEVICE, parameters)
util.check(v[0], "Joining network failed: %s" % (v[0],))
v = await fut
click.echo(v)
s.remove_callback(cbid)
s.close()
empty_key_data = t.EmberKeyData()
empty_key_data.contents = t.fixed_list(16, t.uint8_t)([t.uint8_t(0)] * 16)
zha_key = t.EmberKeyData()
zha_key.contents = t.fixed_list(16, t.uint8_t)(
[t.uint8_t(c) for c in b"ZigBeeAlliance09"]
)
isc = t.EmberInitialSecurityState()
isc.bitmask = t.uint16_t(
t.EmberInitialSecurityBitmask.HAVE_PRECONFIGURED_KEY
| t.EmberInitialSecurityBitmask.REQUIRE_ENCRYPTED_KEY
)
isc.preconfiguredKey = zha_key
isc.networkKey = empty_key_data
isc.networkKeySequenceNumber = t.uint8_t(0)
isc.preconfiguredTrustCenterEui64 = t.EmberEUI64([t.uint8_t(0)] * 8)
if controller:
isc.bitmask |= (
t.EmberInitialSecurityBitmask.TRUST_CENTER_GLOBAL_LINK_KEY
| t.EmberInitialSecurityBitmask.HAVE_NETWORK_KEY
)
isc.bitmask = t.uint16_t(isc.bitmask)
random_key = t.fixed_list(16, t.uint8_t)([t.uint8_t(x) for x in os.urandom(16)])
isc.networkKey = random_key
return isc
def zha_security(controller=False):
empty_key_data = t.EmberKeyData()
empty_key_data.contents = t.fixed_list(16, t.uint8_t)([t.uint8_t(0)] * 16)
zha_key = t.EmberKeyData()
zha_key.contents = t.fixed_list(16, t.uint8_t)(
[t.uint8_t(c) for c in b"ZigBeeAlliance09"]
)
isc = t.EmberInitialSecurityState()
isc.bitmask = t.uint16_t(
t.EmberInitialSecurityBitmask.HAVE_PRECONFIGURED_KEY
| t.EmberInitialSecurityBitmask.REQUIRE_ENCRYPTED_KEY
)
isc.preconfiguredKey = zha_key
isc.networkKey = empty_key_data
isc.networkKeySequenceNumber = t.uint8_t(0)
isc.preconfiguredTrustCenterEui64 = t.EmberEUI64([t.uint8_t(0)] * 8)
if controller:
isc.bitmask |= (
t.EmberInitialSecurityBitmask.TRUST_CENTER_GLOBAL_LINK_KEY
| t.EmberInitialSecurityBitmask.HAVE_NETWORK_KEY
def deserialize(cls, data):
    """Build an instance of ``cls`` from the front of ``data``.

    A ``t.Bool`` direction flag and a ``t.uint16_t`` attribute id are always
    read. When the direction flag is truthy only a ``t.uint16_t`` timeout
    follows. Otherwise a ``t.uint8_t`` data type id and two ``t.uint16_t``
    intervals (min, max) are read, plus a reportable change when
    ``DATA_TYPES`` marks that data type as ``Analog``.

    :returns: tuple of the populated instance and the unconsumed data
    """
    record = cls()
    record.direction, data = t.Bool.deserialize(data)
    record.attrid, data = t.uint16_t.deserialize(data)

    if record.direction:
        # The sender asks for reports to be delivered to it.
        record.timeout, data = t.uint16_t.deserialize(data)
        return record, data

    # The sender announces what it is going to report.
    record.datatype, data = t.uint8_t.deserialize(data)
    record.min_interval, data = t.uint16_t.deserialize(data)
    record.max_interval, data = t.uint16_t.deserialize(data)

    type_info = DATA_TYPES[record.datatype]
    # Only analog types carry a reportable-change field.
    if type_info[2] is Analog:
        value_type = type_info[1]
        record.reportable_change, data = value_type.deserialize(data)

    return record, data
async def subscribe(self, group_id) -> t.EmberStatus:
if group_id in self._multicast:
LOGGER.debug("%s is already subscribed", t.EmberMulticastId(group_id))
return t.EmberStatus.SUCCESS
try:
idx = self._available.pop()
except KeyError:
LOGGER.error("No more available slots MulticastId subscription")
return t.EmberStatus.INDEX_OUT_OF_RANGE
entry = t.EmberMulticastTableEntry()
entry.endpoint = t.uint8_t(1)
entry.multicastId = t.EmberMulticastId(group_id)
entry.networkIndex = t.uint8_t(0)
status = await self._ezsp.setMulticastTableEntry(idx, entry)
if status[0] != t.EmberStatus.SUCCESS:
LOGGER.warning(
"Set MulticastTableEntry #%s for %s multicast id: %s",
idx,
entry.multicastId,
status,
)
self._available.add(idx)
return status[0]
self._multicast[entry.multicastId] = (entry, idx)
LOGGER.debug(
"Set MulticastTableEntry #%s for %s multicast id: %s",
idx,
entry.multicastId,