Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def to_knx(cls, value):
"""Serialize to KNX/IP raw data."""
if value == HVACOperationMode.AUTO:
return (0,)
if value == HVACOperationMode.COMFORT:
return (1,)
if value == HVACOperationMode.STANDBY:
return (2,)
if value == HVACOperationMode.NIGHT:
return (3,)
if value == HVACOperationMode.FROST_PROTECTION:
return (4,)
raise ConversionError("Could not parse HVACOperationMode", value=value)
class DPTControllerStatus(DPTBase):
"""
Abstraction for KNX HVAC Controller status.
Non-standardised DP type (in accordance with KNX AN 097/07 rev 3).
Help needed:
The values of this type were retrieved by reverse engineering. Any
notes on the correct implementation of this type are highly appreciated.
"""
@classmethod
def from_knx(cls, raw):
"""Parse/deserialize from KNX/IP raw data."""
cls.test_bytesarray(raw, 1)
if raw[0] & 8 > 0:
return HVACOperationMode.FROST_PROTECTION
"""
# pylint: disable=too-few-public-methods
@staticmethod
def test_bytesarray(raw, length):
"""Test if array of raw bytes has the correct length and values of correct type."""
if not isinstance(raw, (tuple, list)) \
or len(raw) != length \
or any(not isinstance(byte, int) for byte in raw) \
or any(byte < 0 for byte in raw) \
or any(byte > 255 for byte in raw):
raise ConversionError("Invalid raw bytes", raw=raw)
class DPTBinary(DPTBase):
"""The DPTBinary is a base class for all datatypes encoded directly into the first Byte of the payload (mostly integer)."""
# pylint: disable=too-few-public-methods
# APCI (application layer control information)
APCI_BITMASK = 0x3F
APCI_MAX_VALUE = APCI_BITMASK
def __init__(self, value):
"""Initialize DPTBinary class."""
if not isinstance(value, int):
raise TypeError()
if value > DPTBinary.APCI_BITMASK:
raise ConversionError("Cant init DPTBinary", value=value)
self.value = value
"""Implementation of 3.17 Datapoint Types String."""
from xknx.exceptions import ConversionError
from .dpt import DPTBase
class DPTString(DPTBase):
"""
Abstraction for KNX 14 Octet ASCII String.
DPT 16.000
"""
payload_length = 14
unit = ""
@classmethod
def from_knx(cls, raw):
"""Parse/deserialize from KNX/IP raw data."""
cls.test_bytesarray(raw, cls.payload_length)
value = str()
for byte in raw:
if byte != 0x00:
"""
Implementation of KNX 4 byte Float-values.
They correspond to the the following KDN DPT 14 class.
"""
import struct
from xknx.exceptions import ConversionError
from .dpt import DPTBase
class DPT4ByteFloat(DPTBase):
"""
Abstraction for KNX 4 Octet Floating Point Numbers, with a maximum usable range as specified in IEEE 754.
The largest positive finite float literal is 3.40282347e+38f.
The smallest positive finite non-zero literal of type float is 1.40239846e-45f.
The negative minimum finite float literal is -3.40282347e+38f.
No value range are defined for DPTs 14.000-079.
DPT 14.***
"""
unit = ""
payload_length = 4
@classmethod
def from_knx(cls, raw):
raise TypeError()
if value > DPTBinary.APCI_BITMASK:
raise ConversionError("Cant init DPTBinary", value=value)
self.value = value
def __eq__(self, other):
"""Equal operator."""
return DPTComparator.compare(self, other)
def __str__(self):
"""Return object as readable string."""
return ''.format(self.value)
class DPTArray(DPTBase):
"""The DPTArray is a base class for all datatypes appended to the KNX telegram."""
# pylint: disable=too-few-public-methods
def __init__(self, value):
"""Initialize DPTArray class."""
if isinstance(value, int):
self.value = (value,)
elif isinstance(value, (list, bytes)):
self.value = tuple(value,)
elif isinstance(value, tuple):
self.value = value
else:
raise TypeError()
def __eq__(self, other):
"""Equal operator."""
"""Implementation of scaled KNX DPT_1_Ucount Values."""
from xknx.exceptions import ConversionError
from .dpt import DPTBase
class DPTScaling(DPTBase):
"""
Abstraction for KNX 1 Octet Percent.
DPT 5.001
"""
value_min = 0
value_max = 100
resolution = 100/255
unit = "%"
payload_length = 1
@classmethod
def from_knx(cls, raw):
"""Parse/deserialize from KNX/IP raw data."""
cls.test_bytesarray(raw, 1)
"""
Implementation of Basic KNX 2-Byte Signed Values.
They correspond the following KNX DPTs:
8.*** 2-byte/octet signed (2's complement), i.e. percentV16, delta time
"""
import struct
from xknx.exceptions import ConversionError
from .dpt import DPTBase
class DPT2ByteSigned(DPTBase):
"""
Abstraction for KNX 2 Byte signed values.
DPT 8.***
"""
value_min = -32768
value_max = 32767
unit = ""
resolution = 1
payload_length = 2
_struct_format = ">h"
@classmethod
def from_knx(cls, raw):
"""Implementation of Basic KNX Time."""
import time
from xknx.exceptions import ConversionError
from .dpt import DPTBase, DPTWeekday
class DPTTime(DPTBase):
"""
Abstraction for KNX 3 Octet Time.
DPT 10.001
"""
@classmethod
def from_knx(cls, raw):
"""Parse/deserialize from KNX/IP raw data."""
cls.test_bytesarray(raw, 3)
weekday = (raw[0] & 0xE0) >> 5
hours = raw[0] & 0x1F
minutes = raw[1] & 0x3F
seconds = raw[2] & 0x3F
"""Implementation of different KNX DPT HVAC Operation modes."""
from xknx.exceptions import ConversionError, CouldNotParseKNXIP
from .dpt import DPTBase
from .dpt_hvac_mode import HVACOperationMode
class DPTHVACContrMode(DPTBase):
"""
Abstraction for KNX HVAC controller mode.
DPT 20.105
"""
SUPPORTED_MODES = {
0: HVACOperationMode.AUTO,
1: HVACOperationMode.HEAT,
2: HVACOperationMode.MORNING_WARMUP,
3: HVACOperationMode.COOL,
4: HVACOperationMode.NIGHT_PURGE,
5: HVACOperationMode.PRECOOL,
6: HVACOperationMode.OFF,
7: HVACOperationMode.TEST,
8: HVACOperationMode.EMERGENCY_HEAT,
"""Implementation of Basic KNX 1-Byte signed integer values."""
from xknx.exceptions import ConversionError
from .dpt import DPTBase
class DPTSignedRelativeValue(DPTBase):
"""
Abstraction for KNX 1 Byte "1-octet Signed Relative Value".
DPT 6.***
"""
value_min = -128
value_max = 127
unit = ""
payload_length = 1
@classmethod
def from_knx(cls, raw):
"""Parse/deserialize from KNX/IP raw data."""
cls.test_bytesarray(raw, 1)
if raw[0] > cls.value_max: