Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def app(monkeypatch):
ezsp = mock.MagicMock()
ezsp.setConcentrator = asynctest.CoroutineMock(return_value=[0])
ezsp.setSourceRoute = asynctest.CoroutineMock(return_value=[0])
type(ezsp).is_ezsp_running = mock.PropertyMock(return_value=True)
config = bellows.zigbee.application.ControllerApplication.SCHEMA(APP_CONFIG)
ctrl = bellows.zigbee.application.ControllerApplication(config)
ctrl._ezsp = ezsp
monkeypatch.setattr(bellows.zigbee.application, "APS_ACK_TIMEOUT", 0.1)
ctrl._ctrl_event.set()
ctrl._in_flight_msg = asyncio.Semaphore()
ctrl.handle_message = mock.MagicMock()
return ctrl
def make_app(database_file):
ezsp = mock.MagicMock()
return ControllerApplication(ezsp, database_file)
def test_src_rtg_config(config, result):
"""Test src routing configuration parameter."""
app_cfg = bellows.zigbee.application.ControllerApplication.SCHEMA(APP_CONFIG)
ctrl = bellows.zigbee.application.ControllerApplication(app_cfg)
assert ctrl.use_source_routing is False
app_cfg = bellows.zigbee.application.ControllerApplication.SCHEMA(
{**APP_CONFIG, **config}
)
ctrl = bellows.zigbee.application.ControllerApplication(config=app_cfg)
assert ctrl.use_source_routing is result
def app(monkeypatch):
ezsp = mock.MagicMock()
ezsp.setConcentrator = asynctest.CoroutineMock(return_value=[0])
ezsp.setSourceRoute = asynctest.CoroutineMock(return_value=[0])
type(ezsp).is_ezsp_running = mock.PropertyMock(return_value=True)
config = bellows.zigbee.application.ControllerApplication.SCHEMA(APP_CONFIG)
ctrl = bellows.zigbee.application.ControllerApplication(config)
ctrl._ezsp = ezsp
monkeypatch.setattr(bellows.zigbee.application, "APS_ACK_TIMEOUT", 0.1)
ctrl._ctrl_event.set()
ctrl._in_flight_msg = asyncio.Semaphore()
ctrl.handle_message = mock.MagicMock()
return ctrl
def print_clusters(title, clusters):
clusters = sorted(list(clusters.items()))
if clusters:
click.echo(" %s:" % (title,))
for cluster_id, cluster in clusters:
click.echo(" %s (%s)" % (cluster.name, cluster_id))
loop = asyncio.get_event_loop()
config = {
zigpy.config.CONF_DATABASE: database,
bellows.config.CONF_DEVICE: {bellows.config.CONF_DEVICE_PATH: "/dev/null"},
}
config = bellows.config.CONFIG_SCHEMA(config)
app = loop.run_until_complete(
bellows.zigbee.application.ControllerApplication.new(config, start_radio=False)
)
for ieee, dev in app.devices.items():
click.echo("Device:")
click.echo(" NWK: 0x%04x" % (dev.nwk,))
click.echo(" IEEE: %s" % (ieee,))
click.echo(" Endpoints:")
for epid, ep in dev.endpoints.items():
if epid == 0:
continue
if ep.status == zigpy.endpoint.Status.NEW:
click.echo(" %s: Uninitialized")
else:
click.echo(
" %s: profile=0x%02x, device_type=%s"
% (epid, ep.profile_id, ep.device_type)
)
async def setup_application(app_config, startup=True):
app_config = bellows.zigbee.application.ControllerApplication.SCHEMA(app_config)
app = await bellows.zigbee.application.ControllerApplication.new(
app_config, start_radio=startup
)
return app
async def setup_application(app_config, startup=True):
app_config = bellows.zigbee.application.ControllerApplication.SCHEMA(app_config)
app = await bellows.zigbee.application.ControllerApplication.new(
app_config, start_radio=startup
)
return app