Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self,
             xknx,
             group_address=None,
             group_address_state=None,
             sync_state=True,
             device_name=None,
             after_update_cb=None):
    """Initialize RemoteValue class.

    ``group_address`` and ``group_address_state`` given as ``str`` or
    ``int`` are wrapped in ``GroupAddress``; any other value is stored
    unchanged. A missing ``device_name`` is stored as ``"Unknown"`` and
    ``payload`` starts out as ``None``.
    """
    # pylint: disable=too-many-arguments
    self.xknx = xknx

    # Raw address notations that still need wrapping.
    raw_address_types = (str, int)
    if isinstance(group_address, raw_address_types):
        group_address = GroupAddress(group_address)
    if isinstance(group_address_state, raw_address_types):
        group_address_state = GroupAddress(group_address_state)

    self.group_address = group_address
    self.group_address_state = group_address_state
    self.sync_state = sync_state
    if device_name is None:
        self.device_name = "Unknown"
    else:
        self.device_name = device_name
    self.after_update_cb = after_update_cb
    self.payload = None
async def main():
    """Connect and read value from KNX bus.

    Starts an XKNX instance, reads the group addresses ``2/0/8`` and
    ``2/1/8`` one after the other, prints every telegram that is not
    ``None`` and stops the instance again.
    """
    xknx = XKNX()
    await xknx.start()
    try:
        for address in ('2/0/8', '2/1/8'):
            value_reader = ValueReader(xknx, GroupAddress(address))
            telegram = await value_reader.read()
            if telegram is not None:
                print(telegram)
    finally:
        # Stop the started instance even when a read raises.
        await xknx.stop()
def from_knx_data_link_layer(self, cemi):
    """Parse L_DATA_IND, CEMIMessageCode.L_Data_REQ, CEMIMessageCode.L_DATA_CON.

    Reads the flags, source address, destination address, MPDU length and
    APCI command out of ``cemi`` and stores them on the instance
    (``flags``, ``src_addr``, ``dst_addr``, ``mpdu_len``, ``cmd``).

    Raises:
        CouldNotParseKNXIP: if ``cemi`` is too short for the fields read
            here (taking the additional info length into account), or if
            the APCI bits do not map to a known ``APCICommand``.
    """
    if len(cemi) < 11:
        raise CouldNotParseKNXIP("CEMI too small")

    # AddIL (Additional Info Length), as specified within
    # KNX Chapter 3.6.3/4.1.4.3 "Additional information."
    # Additional information is not yet parsed.
    addil = cemi[1]

    # Every index below is shifted by addil, so the minimum frame length
    # grows with it. Without this check a short frame would surface as a
    # bare IndexError instead of CouldNotParseKNXIP.
    if len(cemi) < 11 + addil:
        raise CouldNotParseKNXIP("CEMI too small")

    self.flags = cemi[2 + addil] * 256 + cemi[3 + addil]

    self.src_addr = PhysicalAddress((cemi[4 + addil], cemi[5 + addil]))

    # The flags decide whether the destination is a group or a physical address.
    if self.flags & CEMIFlags.DESTINATION_GROUP_ADDRESS:
        self.dst_addr = GroupAddress((cemi[6 + addil], cemi[7 + addil]),
                                     levels=self.xknx.address_format)
    else:
        self.dst_addr = PhysicalAddress((cemi[6 + addil], cemi[7 + addil]))

    self.mpdu_len = cemi[8 + addil]

    # TPCI (transport layer control information) and APCI (application
    # layer control information) share these two octets; the low six
    # bits are masked off (0xFFC0) before the command lookup.
    tpci_apci = cemi[9 + addil] * 256 + cemi[10 + addil]

    try:
        self.cmd = APCICommand(tpci_apci & 0xFFC0)
    except ValueError as err:
        # Chain the original error so the failed lookup stays visible.
        raise CouldNotParseKNXIP(
            "APCI not supported: {0:#012b}".format(tpci_apci & 0xFFC0)) from err
def __init__(self,
             xknx,
             group_address=None,
             group_address_state=None,
             sync_state=True,
             device_name=None,
             after_update_cb=None):
    """Initialize RemoteValue class.

    Addresses passed as ``str`` or ``int`` are converted to
    ``GroupAddress``; other values are kept as given. ``device_name``
    falls back to ``"Unknown"`` when it is ``None``, and ``payload`` is
    initialised to ``None``.
    """
    # pylint: disable=too-many-arguments
    self.xknx = xknx

    convertible = (str, int)
    if isinstance(group_address, convertible):
        group_address = GroupAddress(group_address)
    if isinstance(group_address_state, convertible):
        group_address_state = GroupAddress(group_address_state)
    self.group_address = group_address
    self.group_address_state = group_address_state

    self.sync_state = sync_state
    if device_name is None:
        device_name = "Unknown"
    self.device_name = device_name
    self.after_update_cb = after_update_cb
    self.payload = None
def __init__(self,
             xknx,
             name,
             broadcast_type=DateTimeBroadcastType.TIME,
             group_address=None,
             device_updated_cb=None):
    """Initialize DateTime class.

    Passes ``xknx``, ``name`` and ``device_updated_cb`` on to the parent
    initializer, stores ``broadcast_type`` and stores ``group_address``,
    wrapping it in ``GroupAddress`` when given as ``str`` or ``int``.
    """
    super().__init__(xknx, name, device_updated_cb)
    self.broadcast_type = broadcast_type
    self.group_address = (
        GroupAddress(group_address)
        if isinstance(group_address, (str, int))
        else group_address
    )
async def service_send_to_knx_bus(self, call):
    """Service for sending an arbitrary KNX message to the KNX bus.

    Reads payload and address from ``call.data``, builds a telegram from
    them and puts it on the ``xknx.telegrams`` queue.
    """
    raw_payload = call.data.get(SERVICE_XKNX_ATTR_PAYLOAD)
    raw_address = call.data.get(SERVICE_XKNX_ATTR_ADDRESS)

    # An int payload is wrapped as DPTBinary, anything else as DPTArray.
    payload_type = DPTBinary if isinstance(raw_payload, int) else DPTArray
    payload = payload_type(raw_payload)
    address = GroupAddress(raw_address)

    message = Telegram()
    message.payload = payload
    message.group_address = address
    await self.xknx.telegrams.put(message)
group_address_setpoint_shift_state=None,
setpoint_shift_step=DEFAULT_SETPOINT_SHIFT_STEP,
setpoint_shift_max=DEFAULT_SETPOINT_SHIFT_MAX,
setpoint_shift_min=DEFAULT_SETPOINT_SHIFT_MIN,
group_address_on_off=None,
group_address_on_off_state=None,
on_off_invert=False,
min_temp=None,
max_temp=None,
mode=None,
device_updated_cb=None):
"""Initialize Climate class."""
# pylint: disable=too-many-arguments, too-many-locals, too-many-branches, too-many-statements
super().__init__(xknx, name, device_updated_cb)
if isinstance(group_address_on_off, (str, int)):
group_address_on_off = GroupAddress(group_address_on_off)
if isinstance(group_address_on_off_state, (str, int)):
group_address_on_off_state = GroupAddress(group_address_on_off_state)
self.group_address_on_off = group_address_on_off
self.group_address_on_off_state = group_address_on_off_state
self.min_temp = min_temp
self.max_temp = max_temp
self.temperature = RemoteValueTemp(
xknx,
group_address_state=group_address_temperature,
device_name=self.name,
after_update_cb=self.after_update)
self.target_temperature = RemoteValueTemp(
def readable(self):
    """Evaluate if remote value should be read from bus.

    Gives back the falsy ``sync_state`` itself when syncing is off;
    otherwise tells whether ``group_address_state`` is a ``GroupAddress``.
    """
    sync_state = self.sync_state
    if not sync_state:
        return sync_state
    return isinstance(self.group_address_state, GroupAddress)