Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
broadcast_data = DPTDateTime.current_datetime_as_knx()
await self.send(
self.group_address,
DPTArray(broadcast_data),
response=response)
elif self.broadcast_type == DateTimeBroadcastType.DATE:
broadcast_data = DPTDate.current_date_as_knx()
await self.send(
self.group_address,
DPTArray(broadcast_data),
response=response)
elif self.broadcast_type == DateTimeBroadcastType.TIME:
broadcast_data = DPTTime.current_time_as_knx()
await self.send(
self.group_address,
DPTArray(broadcast_data),
response=response)
def payload_valid(self, payload):
"""Test if telegram payload may be parsed."""
return (isinstance(payload, DPTArray)
and len(payload.value) == 1)
self.cmd = APCICommand(tpci_apci & 0xFFC0)
except ValueError:
raise CouldNotParseKNXIP(
"APCI not supported: {0:#012b}".format(tpci_apci & 0xFFC0))
apdu = cemi[10 + addil:]
if len(apdu) != self.mpdu_len:
raise CouldNotParseKNXIP(
"APDU LEN should be {} but is {}".format(
self.mpdu_len, len(apdu)))
if len(apdu) == 1:
apci = tpci_apci & DPTBinary.APCI_BITMASK
self.payload = DPTBinary(apci)
else:
self.payload = DPTArray(cemi[11 + addil:])
return 10 + addil + len(apdu)
def calculated_length(self):
"""Get length of KNX/IP body."""
if self.payload is None:
return 11
if isinstance(self.payload, DPTBinary):
return 11
if isinstance(self.payload, DPTArray):
return 11 + len(self.payload.value)
raise TypeError()
def payload_valid(self, payload):
"""Test if telegram payload may be parsed."""
return (isinstance(payload, DPTArray)
and len(payload.value) == 2)
def to_knx(self, value):
"""Convert value to payload."""
if not isinstance(value, (list, tuple)):
raise ConversionError("Cant serialize RemoteValueColorRGB (wrong type)",
value=value, type=type(value))
if len(value) != 3:
raise ConversionError("Cant serialize DPT 232.600 (wrong length)",
value=value, type=type(value))
if any(not isinstance(color, int) for color in value) \
or any(color < 0 for color in value) \
or any(color > 255 for color in value):
raise ConversionError("Cant serialize DPT 232.600 (wrong bytes)", value=value)
return DPTArray(list(value))
def payload_valid(self, payload):
"""Test if telegram payload may be parsed."""
return (isinstance(payload, DPTArray)
and len(payload.value) == 3)
async def set_operation_mode(self, operation_mode):
"""Set the operation mode of a thermostat. Send new operation_mode to BUS and update internal state."""
if not self.supports_operation_mode:
raise DeviceIllegalValue("operation mode not supported", operation_mode)
if self.group_address_operation_mode is not None:
await self.send(
self.group_address_operation_mode,
DPTArray(DPTHVACMode.to_knx(operation_mode)))
if self.group_address_operation_mode_protection is not None:
protection_mode = operation_mode == HVACOperationMode.FROST_PROTECTION
await self.send(
self.group_address_operation_mode_protection,
DPTBinary(protection_mode))
if self.group_address_operation_mode_night is not None:
night_mode = operation_mode == HVACOperationMode.NIGHT
await self.send(
self.group_address_operation_mode_night,
DPTBinary(night_mode))
if self.group_address_operation_mode_comfort is not None:
comfort_mode = operation_mode == HVACOperationMode.COMFORT
await self.send(
self.group_address_operation_mode_comfort,
DPTBinary(comfort_mode))
if self.group_address_controller_status is not None:
await self.send(
self.group_address_operation_mode_night,
DPTBinary(night_mode))
if self.group_address_operation_mode_comfort is not None:
comfort_mode = operation_mode == HVACOperationMode.COMFORT
await self.send(
self.group_address_operation_mode_comfort,
DPTBinary(comfort_mode))
if self.group_address_controller_status is not None:
await self.send(
self.group_address_controller_status,
DPTArray(DPTControllerStatus.to_knx(operation_mode)))
if self.group_address_controller_mode is not None:
await self.send(
self.group_address_controller_mode,
DPTArray(DPTHVACContrMode.to_knx(operation_mode)))
await self._set_internal_operation_mode(operation_mode)
def payload_valid(self, payload):
"""Test if telegram payload may be parsed."""
return (isinstance(payload, DPTArray)
and len(payload.value) == self.dpt_class.payload_length)