How to use the bellows.ezsp.EZSP class in bellows

To help you get started, we’ve selected a few bellows examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github zigpy / bellows / tests / test_application.py View on Github external
def _test_startup(app, nwk_type, ieee, auto_form=False, init=0, ezsp_version=4):
    """Wire a mocked EZSP class onto which application startup can be run.

    Visible behaviour: builds ``ezsp_mock`` (a ``MagicMock`` spec'd on
    ``bellows.ezsp.EZSP``) and replaces the methods of its return value with
    canned coroutines.

    NOTE(review): this excerpt appears to stop before the function ends --
    ``auto_form`` is not used in the visible part and ``ezsp_mock`` is never
    consumed here; confirm against the full file.
    """

    # Generic command stub: every call answers ``[0, nwk_type]``.
    async def mockezsp(*args, **kwargs):
        return [0, nwk_type]

    # ``networkInit`` stub: answers with the caller-supplied ``init`` value.
    async def mockinit(*args, **kwargs):
        return [init]

    app._in_flight_msg = None
    ezsp_mock = mock.MagicMock(spec_set=bellows.ezsp.EZSP)
    # ``ezsp_version`` is installed as a PropertyMock on the return value's
    # type, so reading it yields the requested version.
    type(ezsp_mock.return_value).ezsp_version = mock.PropertyMock(
        return_value=ezsp_version
    )
    ezsp_mock.return_value.connect = CoroutineMock()
    ezsp_mock.return_value.setConcentrator = CoroutineMock()
    # The commands below all share the generic ``[0, nwk_type]`` stub.
    ezsp_mock.return_value._command = mockezsp
    ezsp_mock.return_value.addEndpoint = mockezsp
    ezsp_mock.return_value.setConfigurationValue = mockezsp
    ezsp_mock.return_value.networkInit = mockinit
    ezsp_mock.return_value.getNetworkParameters = mockezsp
    ezsp_mock.return_value.setPolicy = mockezsp
    ezsp_mock.return_value.getNodeId = mockezsp
    # getEui64 resolves to a one-element list holding the supplied ``ieee``.
    ezsp_mock.return_value.getEui64.side_effect = CoroutineMock(return_value=[ieee])
    ezsp_mock.return_value.leaveNetwork = mockezsp
    # form_network is replaced on the application object itself.
    app.form_network = CoroutineMock()
    ezsp_mock.return_value.reset.side_effect = CoroutineMock()
github zigpy / bellows / tests / test_application.py View on Github external
async def test_probe_success(mock_connect, mock_reset):
    """Probe the device twice and expect success both times.

    Each probe must return ``True``, call and await ``mock_connect`` exactly
    once, call ``mock_reset`` exactly once, and close the connection object
    that ``mock_connect`` returned.
    """

    async def probe_and_check():
        # One probe followed by the full set of expectations on the mocks.
        res = await ezsp.EZSP.probe(APP_CONFIG[config.CONF_DEVICE])
        assert res is True
        assert mock_connect.call_count == 1
        assert mock_connect.await_count == 1
        assert mock_reset.call_count == 1
        assert mock_connect.return_value.close.call_count == 1

    await probe_and_check()

    # Clear the recorded calls so the second probe is counted from zero.
    # A single reset per mock is sufficient; the original reset
    # ``mock_connect`` twice.
    mock_connect.reset_mock()
    mock_reset.reset_mock()

    await probe_and_check()
github zigpy / bellows / tests / test_ezsp.py View on Github external
@mock.patch.object(ezsp.EZSP, "reset", new_callable=CoroutineMock)
@mock.patch.object(uart, "connect")
@pytest.mark.parametrize(
    "exception",
    (
        asyncio.TimeoutError,
        serial.SerialException,
        EzspError,
    ),
)
async def test_probe_fail(mock_connect, mock_reset, exception):
    """Probing reports ``False`` when resetting the device raises.

    Parametrized over the exception types that ``reset`` is made to raise.
    """

    # Start from clean call counters, then arm ``reset`` to fail.
    mock_connect.reset_mock()
    mock_reset.reset_mock()
    mock_reset.side_effect = exception

    probed = await ezsp.EZSP.probe(DEVICE_CONFIG)

    assert probed is False
    # The connection is still opened once, awaited once and closed once.
    assert mock_connect.call_count == 1
    assert mock_connect.await_count == 1
    assert mock_reset.call_count == 1
    assert mock_connect.return_value.close.call_count == 1
github zigpy / bellows / tests / test_application.py View on Github external
async def test_probe_success(mock_connect, mock_reset):
    """Probe the device twice and expect success both times.

    Each probe must return ``True``, call and await ``mock_connect`` exactly
    once, call ``mock_reset`` exactly once, and close the connection object
    that ``mock_connect`` returned.
    """

    async def probe_and_check():
        # One probe followed by the full set of expectations on the mocks.
        res = await ezsp.EZSP.probe(APP_CONFIG[config.CONF_DEVICE])
        assert res is True
        assert mock_connect.call_count == 1
        assert mock_connect.await_count == 1
        assert mock_reset.call_count == 1
        assert mock_connect.return_value.close.call_count == 1

    await probe_and_check()

    # Clear the recorded calls so the second probe is counted from zero.
    # A single reset per mock is sufficient; the original reset
    # ``mock_connect`` twice.
    mock_connect.reset_mock()
    mock_reset.reset_mock()

    await probe_and_check()
github zigpy / bellows / tests / test_ezsp.py View on Github external
def ezsp_f():
    """Return an ``EZSP`` built from ``DEVICE_CONFIG`` with a mocked gateway.

    The ``_gw`` attribute is replaced by a ``MagicMock`` spec'd on
    ``uart.Gateway``.
    """
    instance = ezsp.EZSP(DEVICE_CONFIG)
    gateway_stub = mock.MagicMock(spec_set=uart.Gateway)
    instance._gw = gateway_stub
    return instance
github zigpy / bellows / bellows / zigbee / application.py View on Github external
async def startup(self, auto_form=False):
        """Perform a complete application startup.

        Creates the EZSP instance from the device section of ``self.config``,
        initializes it, then checks that the network is up and that this node
        is the coordinator.

        ``auto_form``: when true, a failed network init leads to
        ``self.form_network()`` instead of an exception.

        Raises ``Exception`` when the network cannot be initialized, or is not
        configured as coordinator, and ``auto_form`` is false.

        NOTE(review): this excerpt ends inside the coordinator check; the
        remainder of the method is not visible here.
        """
        ezsp = bellows.ezsp.EZSP(self.config[zigpy.config.CONF_DEVICE])
        self._ezsp = ezsp
        await self.initialize()

        await self.set_source_routing()
        # First element of the reply is the status of the network init.
        v = await ezsp.networkInit()
        if v[0] != t.EmberStatus.SUCCESS:
            if not auto_form:
                raise Exception("Could not initialize network")
            await self.form_network()

        # Reply layout as used below: v[0] is the status, v[1] the node type.
        v = await ezsp.getNetworkParameters()
        assert v[0] == t.EmberStatus.SUCCESS  # TODO: Better check
        if v[1] != t.EmberNodeType.COORDINATOR:
            if not auto_form:
                raise Exception("Network not configured as coordinator")
github zigpy / bellows / bellows / cli / util.py View on Github external
async def setup(dev, baudrate, cbh=None, configure=True):
    """Connect to an EZSP device and reset it.

    ``dev`` and ``baudrate`` are stored in the device configuration under
    ``CONF_DEVICE_PATH`` and ``CONF_DEVICE_BAUDRATE``. ``cbh``, when truthy,
    is registered through ``add_callback``.

    Raises ``click.Abort`` when connecting fails (the error is logged first).

    NOTE(review): this excerpt ends before ``configure`` and the ``cfg``
    helper are used; the rest of the function is not visible here.
    """
    device_config = {
        config.CONF_DEVICE_PATH: dev,
        config.CONF_DEVICE_BAUDRATE: baudrate,
    }
    s = bellows.ezsp.EZSP(device_config)
    if cbh:
        s.add_callback(cbh)
    try:
        await s.connect()
    except Exception as e:
        # Any connection failure is logged and turned into a CLI abort.
        LOGGER.error(e)
        raise click.Abort()
    LOGGER.debug("Connected. Resetting.")
    await s.reset()
    await s.version()

    # Helper: set one configuration value and pass the returned status
    # (v[0]) to ``check`` together with a descriptive message.
    async def cfg(config_id, value):
        v = await s.setConfigurationValue(config_id, value)
        check(v[0], "Setting config %s to %s: %s" % (config_id, value, v[0]))

    c = t.EzspConfigId
github zigpy / bellows / bellows / cli / application.py View on Github external
def commands(ctx):
    """Echo every command of the cluster selected through ``ctx.obj``.

    Reads ``database_file``, ``node``, ``endpoint`` and ``cluster`` from
    ``ctx.obj``, resolves the cluster with ``util.get_in_cluster`` and prints
    each entry of its ``commands``. Prints nothing when no cluster is found.
    """
    db_path = ctx.obj["database_file"]
    node_ref = ctx.obj["node"]
    ep_id = ctx.obj["endpoint"]
    cl_id = ctx.obj["cluster"]

    radio = bellows.ezsp.EZSP()
    controller = bellows.zigbee.application.ControllerApplication(radio, db_path)

    # Only the cluster is needed here; device and endpoint are discarded.
    _device, _endpoint, cluster = util.get_in_cluster(
        controller, node_ref, ep_id, cl_id
    )
    if cluster is not None:
        for command in cluster.commands:
            click.echo(command)