Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
KNX device objects are created and the MQTT server connection is established.
Then the XKNX Daemon will be started.
Then everything else happens in the device_updated-function above as it is triggered when we receive data.
"""
global mqttc
# Connect to KNX/IP device and listen if a switch was updated via KNX bus.
xknx = XKNX(device_updated_cb=device_updated_cb)
# The KNX addresses to monitor are defined below, but is normally placed in an external
# file that is loaded in on start.
# Generic Types not specifically supported by XKNX
el_meter_reading_active_energy = Sensor(
xknx, 'EL-T-O_MeterReading_ActiveEnergy', group_address_state='5/6/11', value_type="DPT-13")
xknx.devices.add(el_meter_reading_active_energy)
el_meter_reading_reactive_energy = Sensor(
xknx, 'EL-T-O_MeterReading_ReactiveEnergy', group_address_state='5/6/16', value_type="DPT-13")
xknx.devices.add(el_meter_reading_reactive_energy)
# Active Power
el_total_active_power = Sensor(
xknx, 'EL-T-O_TotalActivePower', group_address_state='5/6/24', value_type="power")
xknx.devices.add(el_total_active_power)
el_active_power_l1 = Sensor(
xknx, 'EL-T-O_ActivePower_L1', group_address_state='5/6/25', value_type="power")
xknx.devices.add(el_active_power_l1)
# ...
# Reactive Power
# ...
# Current
el_current_l1 = Sensor(
xknx, 'EL-T-O_Current_L1', group_address_state='5/6/45', value_type="electric_current")
xknx.devices.add(el_current_l1)
# ...
# Voltage
el_voltage_l1 = Sensor(
xknx, 'EL-T-O_Voltage_L1-N', group_address_state='5/6/48', value_type="electric_potential")
xknx.devices.add(el_voltage_l1)
# ...
# Frequency
el_frequency = Sensor(
xknx, 'EL-T-O_Frequency', group_address_state='5/6/53', value_type="frequency")
xknx.devices.add(el_frequency)
mqttc.connect(BROKER_ADDRESS, 8883, 60)
mqttc.loop_start()
# Wait until Ctrl-C was pressed
await xknx.start(daemon_mode=True)
await xknx.stop()
await mqttc.loop_stop()
await mqttc.disconnect()
# ...
# Reactive Power
el_total_reactive_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/28', value_type="power")
xknx.devices.add(el_total_reactive_power)
el_reactive_power_l1 = Sensor(
xknx, 'EL-T-O_ReactivePower_L1', group_address_state='5/6/29', value_type="power")
xknx.devices.add(el_reactive_power_l1)
# ...
# Apparent Power
el_total_apparent_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/32', value_type="power")
xknx.devices.add(el_total_apparent_power)
el_apparent_power_l1 = Sensor(
xknx, 'EL-T-O_ApparentPower_L1', group_address_state='5/6/33', value_type="power")
xknx.devices.add(el_apparent_power_l1)
# ...
# Current
el_current_l1 = Sensor(
xknx, 'EL-T-O_Current_L1', group_address_state='5/6/45', value_type="electric_current")
xknx.devices.add(el_current_l1)
# ...
# Voltage
el_voltage_l1 = Sensor(
xknx, 'EL-T-O_Voltage_L1-N', group_address_state='5/6/48', value_type="electric_potential")
xknx.devices.add(el_voltage_l1)
# ...
el_total_apparent_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/32', value_type="power")
xknx.devices.add(el_total_apparent_power)
el_apparent_power_l1 = Sensor(
xknx, 'EL-T-O_ApparentPower_L1', group_address_state='5/6/33', value_type="power")
xknx.devices.add(el_apparent_power_l1)
# ...
# Current
el_current_l1 = Sensor(
xknx, 'EL-T-O_Current_L1', group_address_state='5/6/45', value_type="electric_current")
xknx.devices.add(el_current_l1)
# ...
# Voltage
el_voltage_l1 = Sensor(
xknx, 'EL-T-O_Voltage_L1-N', group_address_state='5/6/48', value_type="electric_potential")
xknx.devices.add(el_voltage_l1)
# ...
# Frequency
el_frequency = Sensor(
xknx, 'EL-T-O_Frequency', group_address_state='5/6/53', value_type="frequency")
xknx.devices.add(el_frequency)
mqttc.connect(BROKER_ADDRESS, 8883, 60)
mqttc.loop_start()
# Wait until Ctrl-C was pressed
await xknx.start(daemon_mode=True)
await xknx.stop()
xknx.devices.add(el_meter_reading_reactive_energy)
# Active Power
el_total_active_power = Sensor(
xknx, 'EL-T-O_TotalActivePower', group_address_state='5/6/24', value_type="power")
xknx.devices.add(el_total_active_power)
el_active_power_l1 = Sensor(
xknx, 'EL-T-O_ActivePower_L1', group_address_state='5/6/25', value_type="power")
xknx.devices.add(el_active_power_l1)
# ...
# Reactive Power
el_total_reactive_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/28', value_type="power")
xknx.devices.add(el_total_reactive_power)
el_reactive_power_l1 = Sensor(
xknx, 'EL-T-O_ReactivePower_L1', group_address_state='5/6/29', value_type="power")
xknx.devices.add(el_reactive_power_l1)
# ...
# Apparent Power
el_total_apparent_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/32', value_type="power")
xknx.devices.add(el_total_apparent_power)
el_apparent_power_l1 = Sensor(
xknx, 'EL-T-O_ApparentPower_L1', group_address_state='5/6/33', value_type="power")
xknx.devices.add(el_apparent_power_l1)
# ...
# Current
el_current_l1 = Sensor(
xknx, 'EL-T-O_Current_L1', group_address_state='5/6/45', value_type="electric_current")
el_reactive_power_l1 = Sensor(
xknx, 'EL-T-O_ReactivePower_L1', group_address_state='5/6/29', value_type="power")
xknx.devices.add(el_reactive_power_l1)
# ...
# Apparent Power
el_total_apparent_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/32', value_type="power")
xknx.devices.add(el_total_apparent_power)
el_apparent_power_l1 = Sensor(
xknx, 'EL-T-O_ApparentPower_L1', group_address_state='5/6/33', value_type="power")
xknx.devices.add(el_apparent_power_l1)
# ...
# Current
el_current_l1 = Sensor(
xknx, 'EL-T-O_Current_L1', group_address_state='5/6/45', value_type="electric_current")
xknx.devices.add(el_current_l1)
# ...
# Voltage
el_voltage_l1 = Sensor(
xknx, 'EL-T-O_Voltage_L1-N', group_address_state='5/6/48', value_type="electric_potential")
xknx.devices.add(el_voltage_l1)
# ...
# Frequency
el_frequency = Sensor(
xknx, 'EL-T-O_Frequency', group_address_state='5/6/53', value_type="frequency")
xknx.devices.add(el_frequency)
mqttc.connect(BROKER_ADDRESS, 8883, 60)
async def main():
"""Connect to KNX/IP device and read the value of a temperature and a motion sensor."""
xknx = XKNX()
await xknx.start()
sensor1 = BinarySensor(
xknx,
'DiningRoom.Motion.Sensor',
group_address_state='6/0/2',
device_class='motion'
)
await sensor1.sync()
print(sensor1)
sensor2 = Sensor(
xknx,
'DiningRoom.Temperature.Sensor',
group_address_state='6/2/1',
value_type='temperature'
)
await sensor2.sync()
print(sensor2)
await xknx.stop()
def async_add_entities_config(hass, config, async_add_entities):
"""Set up sensor for KNX platform configured within platform."""
sensor = XknxSensor(
hass.data[DATA_XKNX].xknx,
name=config[CONF_NAME],
group_address_state=config[CONF_STATE_ADDRESS],
sync_state=config[CONF_SYNC_STATE],
value_type=config[CONF_TYPE],
)
hass.data[DATA_XKNX].xknx.devices.add(sensor)
async_add_entities([KNXSensor(sensor)])
# The KNX addresses to monitor are defined below, but is normally placed in an external
# file that is loaded in on start.
# Generic Types not specifically supported by XKNX
el_meter_reading_active_energy = Sensor(
xknx, 'EL-T-O_MeterReading_ActiveEnergy', group_address_state='5/6/11', value_type="DPT-13")
xknx.devices.add(el_meter_reading_active_energy)
el_meter_reading_reactive_energy = Sensor(
xknx, 'EL-T-O_MeterReading_ReactiveEnergy', group_address_state='5/6/16', value_type="DPT-13")
xknx.devices.add(el_meter_reading_reactive_energy)
# Active Power
el_total_active_power = Sensor(
xknx, 'EL-T-O_TotalActivePower', group_address_state='5/6/24', value_type="power")
xknx.devices.add(el_total_active_power)
el_active_power_l1 = Sensor(
xknx, 'EL-T-O_ActivePower_L1', group_address_state='5/6/25', value_type="power")
xknx.devices.add(el_active_power_l1)
# ...
# Reactive Power
el_total_reactive_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/28', value_type="power")
xknx.devices.add(el_total_reactive_power)
el_reactive_power_l1 = Sensor(
xknx, 'EL-T-O_ReactivePower_L1', group_address_state='5/6/29', value_type="power")
xknx.devices.add(el_reactive_power_l1)
# ...
# Apparent Power
el_total_apparent_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/32', value_type="power")
Then the XKNX Daemon will be started.
Then everything else happens in the device_updated-function above as it is triggered when we receive data.
"""
global mqttc
# Connect to KNX/IP device and listen if a switch was updated via KNX bus.
xknx = XKNX(device_updated_cb=device_updated_cb)
# The KNX addresses to monitor are defined below, but is normally placed in an external
# file that is loaded in on start.
# Generic Types not specifically supported by XKNX
el_meter_reading_active_energy = Sensor(
xknx, 'EL-T-O_MeterReading_ActiveEnergy', group_address_state='5/6/11', value_type="DPT-13")
xknx.devices.add(el_meter_reading_active_energy)
el_meter_reading_reactive_energy = Sensor(
xknx, 'EL-T-O_MeterReading_ReactiveEnergy', group_address_state='5/6/16', value_type="DPT-13")
xknx.devices.add(el_meter_reading_reactive_energy)
# Active Power
el_total_active_power = Sensor(
xknx, 'EL-T-O_TotalActivePower', group_address_state='5/6/24', value_type="power")
xknx.devices.add(el_total_active_power)
el_active_power_l1 = Sensor(
xknx, 'EL-T-O_ActivePower_L1', group_address_state='5/6/25', value_type="power")
xknx.devices.add(el_active_power_l1)
# ...
# Reactive Power
el_total_reactive_power = Sensor(
xknx, 'EL-T-O_TotalReactivePower', group_address_state='5/6/28', value_type="power")
xknx.devices.add(el_total_reactive_power)