Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def mock_set(*args):
    """Async stub that returns a one-element status list.

    Positional arguments are accepted and ignored. ``success`` is a free
    variable looked up in the surrounding scope each time the stub runs:
    truthy yields ``SUCCESS``, falsy yields ``ERR_FATAL``.
    """
    outcome = t.EmberStatus.SUCCESS if success else t.EmberStatus.ERR_FATAL
    return [outcome]
mc = coordinator.application._multicast
mc.subscribe.side_effect = asyncio.coroutine(
mock.MagicMock(return_value=t.EmberStatus.SUCCESS)
)
grp_id = 0x2345
assert grp_id not in coordinator.endpoints[1].member_of
ret = await coordinator.endpoints[1].add_to_group(grp_id)
assert ret == t.EmberStatus.SUCCESS
assert mc.subscribe.call_count == 1
assert mc.subscribe.call_args[0][0] == grp_id
assert grp_id in coordinator.endpoints[1].member_of
mc.reset_mock()
ret = await coordinator.endpoints[1].add_to_group(grp_id)
assert ret == t.EmberStatus.SUCCESS
assert mc.subscribe.call_count == 0
async def test_initialize_fail_configured_size(multicast):
    """``_initialize`` must give up when ``getConfigurationValue`` fails.

    With the configuration query returning ``(ERR_FATAL, 16)``, no multicast
    table entry may be read and ``_available`` must stay empty.
    """
    ezsp = multicast._ezsp
    ezsp.getConfigurationValue.return_value = (t.EmberStatus.ERR_FATAL, 16)

    await multicast._initialize()

    assert ezsp.getMulticastTableEntry.call_count == 0
    assert len(multicast._available) == 0
async def leave(ctx):
    """Leave the ZigBee network"""
    # The docstring above is kept verbatim: it may be surfaced to users as
    # command help text.
    s = await util.setup(ctx.obj["device"], ctx.obj["baudrate"])
    try:
        v = await util.network_init(s)
        if v[0] == t.EmberStatus.NOT_JOINED:
            click.echo("Not joined, not leaving")
        else:
            v = await s.leaveNetwork()
            util.check(v[0], "Failure leaving network: %s" % (v[0],))
    finally:
        # Close the connection even when network_init(), leaveNetwork() or
        # util.check() raises; previously the close was skipped on any error.
        s.close()
async def _list_command(self, name, item_frames, completion_frame, spos, *args):
    """Run a command, returning result callbacks as a list.

    Issues ``name`` through ``self._command`` and, while waiting, collects
    the response of every callback frame whose name is in ``item_frames``.
    Collection ends when a frame named ``completion_frame`` arrives.

    :param name: command name forwarded to ``self._command``
    :param item_frames: container of frame names whose responses are kept
    :param completion_frame: frame name that signals the end of the list
    :param spos: index of the status field within the completion response
    :param args: extra positional arguments forwarded to the command
    :returns: the collected item responses, in arrival order
    :raises Exception: if the first element of the command result, or the
        status at ``spos`` of the completion response, is not ``SUCCESS``
    """
    fut = asyncio.Future()
    results = []

    def cb(frame_name, response):
        if frame_name in item_frames:
            results.append(response)
        elif frame_name == completion_frame and not fut.done():
            # Guard against a repeated completion frame: set_result() on an
            # already finished future raises InvalidStateError.
            fut.set_result(response)

    cbid = self.add_callback(cb)
    try:
        v = await self._command(name, *args)
        if v[0] != t.EmberStatus.SUCCESS:
            raise Exception(v)
        v = await fut
        if v[spos] != t.EmberStatus.SUCCESS:
            raise Exception(v)
    finally:
        # Always unregister, whether the command succeeded or raised.
        self.remove_callback(cbid)

    return results
async def formNetwork(self, parameters):  # noqa: N802
    """Form a network and wait for the first stack status callback.

    :param parameters: network parameters forwarded to the ``formNetwork``
        command
    :returns: the response of the first ``stackStatusHandler`` frame, whose
        first element is ``NETWORK_UP``
    :raises Exception: if the command result does not start with ``SUCCESS``
        or the reported stack status is not ``NETWORK_UP``
    """
    fut = asyncio.Future()

    def cb(frame_name, response):
        # Only the first stack status resolves the future; calling
        # set_result() on a finished future raises InvalidStateError.
        if frame_name == "stackStatusHandler" and not fut.done():
            fut.set_result(response)

    # Keep the callback id so the callback can be unregistered afterwards,
    # the same way _list_command does; previously it stayed registered
    # forever and fired on every later stack status frame.
    cbid = self.add_callback(cb)
    try:
        v = await self._command("formNetwork", parameters)
        if v[0] != t.EmberStatus.SUCCESS:
            raise Exception("Failure forming network: %s" % (v,))

        v = await fut
        if v[0] != t.EmberStatus.NETWORK_UP:
            raise Exception("Failure forming network: %s" % (v,))
    finally:
        self.remove_callback(cbid)

    return v
def ezsp_callback_handler(self, frame_name, args):
    """Route a received callback frame to the matching handler method.

    Every frame is logged at debug level. Frames whose name matches none of
    the cases below are otherwise ignored.
    """
    LOGGER.debug("Received %s frame with %s", frame_name, args)

    if frame_name == "incomingMessageHandler":
        self._handle_frame(*args)
        return

    if frame_name == "messageSentHandler":
        # args[4] carries the delivery status of the sent message.
        failed = args[4] != t.EmberStatus.SUCCESS
        if failed:
            self._handle_frame_failure(*args)
        else:
            self._handle_frame_sent(*args)
        return

    if frame_name == "trustCenterJoinHandler":
        # Only "device left" updates are acted upon here.
        if args[2] == t.EmberDeviceUpdate.DEVICE_LEFT:
            self.handle_leave(args[0], args[1])
        return

    if frame_name == "incomingRouteRecordHandler":
        self.handle_route_record(*args)
        return

    if frame_name == "incomingRouteErrorHandler":
        self.handle_route_error(*args)
        return

    if frame_name == "_reset_controller_application":
        self._handle_reset_request(*args)
async def add_to_group(self, grp_id: int, name: str = None) -> t.EmberStatus:
    """Make this endpoint a member of group ``grp_id``.

    Returns ``SUCCESS`` immediately when ``grp_id`` is already in
    ``member_of``. Otherwise subscribes through the application's multicast
    object and, only if that succeeds, registers the group (with the optional
    ``name``) and adds this endpoint to it. The subscribe status is returned
    either way.
    """
    if grp_id in self.member_of:
        return t.EmberStatus.SUCCESS

    application = self.device.application
    result = await application.multicast.subscribe(grp_id)
    if result != t.EmberStatus.SUCCESS:
        self.debug("Couldn't subscribe to 0x%04x group", grp_id)
    else:
        application.groups.add_group(grp_id, name).add_member(self)
    return result
async def unsubscribe(self, group_id) -> t.EmberStatus:
    """Release the multicast table entry tracked for ``group_id``.

    The tracked entry's endpoint is set to 0 and written back with
    ``setMulticastTableEntry``. On success the entry is dropped from
    ``_multicast`` and its index is returned to ``_available``.

    :param group_id: key of the entry in ``self._multicast``
    :returns: ``INDEX_OUT_OF_RANGE`` if no entry is tracked for ``group_id``,
        otherwise the status reported by ``setMulticastTableEntry``
    """
    try:
        entry, idx = self._multicast[group_id]
    except KeyError:
        LOGGER.error(
            "Couldn't find MulticastTableEntry for %s multicast_id", group_id
        )
        return t.EmberStatus.INDEX_OUT_OF_RANGE

    entry.endpoint = t.uint8_t(0)
    status = await self._ezsp.setMulticastTableEntry(idx, entry)
    if status[0] != t.EmberStatus.SUCCESS:
        LOGGER.warning(
            "Set MulticastTableEntry #%s for %s multicast id: %s",
            idx,
            entry.multicastId,
            status,
        )
        return status[0]

    # The write succeeded: forget the entry and make its slot reusable.
    self._multicast.pop(group_id)
    self._available.add(idx)
    LOGGER.debug(
        "Set MulticastTableEntry #%s for %s multicast id: %s",
        idx,
        entry.multicastId,
        status,
    )
    # Report the status on the success path too; falling off the end returned
    # None despite the declared EmberStatus return type.
    return status[0]
ezsp = await util.setup(ctx.obj["device"], ctx.obj["baudrate"], configure=False)
version, plat, micro, phy = await ezsp.getStandaloneBootloaderVersionPlatMicroPhy()
if version == 0xFFFF:
click.echo("No boot loader installed")
ezsp.close()
return
click.echo(
(
f"bootloader version: 0x{version:04x}, nodePlat: 0x{plat:02x}, "
f"nodeMicro: 0x{micro:02x}, nodePhy: 0x{phy:02x}"
)
)
res = await ezsp.launchStandaloneBootloader(0x00)
if res[0] != t.EmberStatus.SUCCESS:
click.echo(f"Couldn't launch bootloader: {res[0]}")
else:
click.echo("bootloader launched successfully")
ezsp.close()