How to use the bellows.zigbee package in bellows

To help you get started, we’ve selected a few bellows examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github zigpy / bellows / bellows / zigbee / endpoint.py View on Github external
def add_input_cluster(self, cluster_id):
        """Return the input (server) cluster for ``cluster_id``, creating it if needed.

        A cluster already present in ``in_clusters`` is handed back as-is.
        Otherwise a new one is built with ``Cluster.from_id``, stored in
        ``in_clusters``, registered in ``_cluster_attr`` under its
        ``ep_attribute`` when it has one, and given a
        ``ClusterPersistingListener`` before being returned.
        """
        registered = self.in_clusters
        if cluster_id in registered:
            return registered[cluster_id]

        new_cluster = bellows.zigbee.zcl.Cluster.from_id(self, cluster_id)
        self.in_clusters[cluster_id] = new_cluster

        # Only clusters that carry an ``ep_attribute`` get an attribute entry.
        unset = object()
        attr_name = getattr(new_cluster, 'ep_attribute', unset)
        if attr_name is not unset:
            self._cluster_attr[attr_name] = new_cluster

        persister = bellows.zigbee.appdb.ClusterPersistingListener(
            self._device.application._dblistener,
            new_cluster,
        )
        new_cluster.add_listener(persister)

        return new_cluster
github zigpy / bellows / bellows / zigbee / appdb.py View on Github external
def load(self):
        """Rebuild devices, endpoints and clusters on the application from stored rows.

        Each ``self._scan(name)`` call is iterated as a sequence of tuples.
        NOTE(review): presumably these are the rows of the named database
        table -- ``_scan`` is not visible here, confirm.
        """
        LOGGER.debug("Loading application state from %s", self._database_file)
        # Devices are added first; the later loops look them up by ``ieee``.
        for (ieee, nwk, status) in self._scan("devices"):
            dev = self._application.add_device(ieee, nwk)
            dev.status = bellows.zigbee.device.Status(status)

        for (ieee, epid, profile_id, device_type, status) in self._scan("endpoints"):
            dev = self._application.get_device(ieee)
            ep = dev.add_endpoint(epid)
            ep.profile_id = profile_id
            try:
                # Profile 260 (0x0104) maps to the ZHA DeviceType enum and
                # 49246 (0xC05E) to the ZLL one; other profiles keep the raw value.
                if profile_id == 260:
                    device_type = bellows.zigbee.profiles.zha.DeviceType(device_type)
                elif profile_id == 49246:
                    device_type = bellows.zigbee.profiles.zll.DeviceType(device_type)
            except ValueError:
                # Value not accepted by the enum: fall through and store the
                # stored value unchanged.
                pass
            ep.device_type = device_type
            ep.status = bellows.zigbee.endpoint.Status(status)

        # Input clusters: the endpoint must already exist in ``dev.endpoints``.
        for (ieee, endpoint_id, cluster) in self._scan("clusters"):
            dev = self._application.get_device(ieee)
            ep = dev.endpoints[endpoint_id]
            ep.add_input_cluster(cluster)

        # Output clusters, attached the same way.
        for (ieee, endpoint_id, cluster) in self._scan("output_clusters"):
            dev = self._application.get_device(ieee)
            ep = dev.endpoints[endpoint_id]
            ep.add_output_cluster(cluster)

        for (ieee, endpoint_id, cluster, attrid, value) in self._scan("attributes"):
github zigpy / bellows / bellows / cli / application.py View on Github external
def commands(ctx):
    """Echo every entry of the selected input cluster's ``commands``.

    The database file, node, endpoint and cluster are read from ``ctx.obj``.
    Nothing is printed when ``util.get_in_cluster`` yields no cluster.
    """
    options = ctx.obj
    database = options["database_file"]
    node = options["node"]
    endpoint_id = options["endpoint"]
    cluster_id = options["cluster"]

    app = bellows.zigbee.application.ControllerApplication(
        bellows.ezsp.EZSP(), database
    )

    # Only the cluster is needed; the device and endpoint are discarded.
    _dev, _endpoint, cluster = util.get_in_cluster(
        app, node, endpoint_id, cluster_id
    )
    if cluster is not None:
        for command in cluster.commands:
            click.echo(command)
github home-assistant / home-assistant / homeassistant / components / zha / core / registries.py View on Github external
def establish_device_mappings():
    """Establish mappings between ZCL objects and HA ZHA objects.

    These cannot be module level, as importing bellows must be done in a
    function.
    """
    # Each RADIO_TYPES entry is keyed by the RadioType member's name and holds
    # the radio API class, the matching ControllerApplication class, and a
    # short description string.

    # EZSP radios, provided by bellows.
    RADIO_TYPES[RadioType.ezsp.name] = {
        ZHA_GW_RADIO: bellows.ezsp.EZSP,
        CONTROLLER: bellows.zigbee.application.ControllerApplication,
        ZHA_GW_RADIO_DESCRIPTION: "EZSP",
    }

    # deCONZ radios, provided by zigpy_deconz.
    RADIO_TYPES[RadioType.deconz.name] = {
        ZHA_GW_RADIO: zigpy_deconz.api.Deconz,
        CONTROLLER: zigpy_deconz.zigbee.application.ControllerApplication,
        ZHA_GW_RADIO_DESCRIPTION: "Deconz",
    }

    # XBee radios, provided by zigpy_xbee.
    RADIO_TYPES[RadioType.xbee.name] = {
        ZHA_GW_RADIO: zigpy_xbee.api.XBee,
        CONTROLLER: zigpy_xbee.zigbee.application.ControllerApplication,
        ZHA_GW_RADIO_DESCRIPTION: "XBee",
    }

    RADIO_TYPES[RadioType.zigate.name] = {
github zigpy / bellows / bellows / zigbee / appdb.py View on Github external
def load(self):
        LOGGER.debug("Loading application state from %s", self._database_file)
        for (ieee, nwk, status) in self._scan("devices"):
            dev = self._application.add_device(ieee, nwk)
            dev.status = bellows.zigbee.device.Status(status)

        for (ieee, epid, profile_id, device_type, status) in self._scan("endpoints"):
            dev = self._application.get_device(ieee)
            ep = dev.add_endpoint(epid)
            ep.profile_id = profile_id
            try:
                if profile_id == 260:
                    device_type = bellows.zigbee.profiles.zha.DeviceType(device_type)
                elif profile_id == 49246:
                    device_type = bellows.zigbee.profiles.zll.DeviceType(device_type)
            except ValueError:
                pass
            ep.device_type = device_type
            ep.status = bellows.zigbee.endpoint.Status(status)

        for (ieee, endpoint_id, cluster) in self._scan("clusters"):