Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ip_addr_to_bytes(ip_addr):
"""Serialize ip address to byte array."""
if not isinstance(ip_addr, str):
raise ConversionError("ip_addr is not a string")
for i in ip_addr.split("."):
yield int(i)
data = []
def from_knx(cls, raw):
"""Parse/deserialize from KNX/IP raw data."""
cls.test_bytesarray(raw, 1)
value = round((raw[0]/256)*100)
if not cls._test_boundaries(value):
raise ConversionError("Cant parse DPTScaling", value=value, raw=raw)
return value
def to_knx(cls, value):
"""Serialize to KNX/IP raw data."""
if not isinstance(value, (int)):
raise ConversionError("Cant serialize DPTSceneNumber", value=value)
if not cls._test_boundaries(value):
raise ConversionError("Cant serialize DPTSceneNumber", value=value)
return (value,)
def to_knx(cls, value):
"""Serialize to KNX/IP raw data."""
try:
knx_value = int(value)
if not cls._test_boundaries(knx_value):
raise ValueError
if knx_value < 0:
knx_value += 0x100
return (knx_value & 0xff,)
except ValueError:
raise ConversionError("Cant serialize %s" % cls.__name__, value=value)
def to_knx(cls, value):
"""Serialize to KNX/IP raw data."""
if value == HVACOperationMode.AUTO:
raise ConversionError("Cant serialize DPTControllerStatus", value=value)
if value == HVACOperationMode.COMFORT:
return (0x21,)
if value == HVACOperationMode.STANDBY:
return (0x22,)
if value == HVACOperationMode.NIGHT:
return (0x24,)
if value == HVACOperationMode.FROST_PROTECTION:
return (0x28,)
raise ConversionError("Could not parse DPTControllerStatus", value=value)
def to_knx(cls, value):
"""Serialize to KNX/IP raw data."""
try:
knx_value = int(value) - 1
if not cls._test_boundaries(knx_value + 1):
raise ValueError
return (knx_value,)
except ValueError:
raise ConversionError("Cant serialize %s" % cls.__name__, value=value)
def __init__(self,
xknx,
group_address=None,
group_address_state=None,
sync_state=True,
value_type='time',
device_name=None,
feature_name="DateTime",
after_update_cb=None):
"""Initialize RemoteValueSensor class."""
# pylint: disable=too-many-arguments
try:
self.dpt_class = DateTimeType[value_type.upper()].value
except KeyError:
raise ConversionError("invalid datetime value type",
value_type=value_type, device_name=device_name, feature_name=feature_name)
super().__init__(xknx,
group_address,
group_address_state,
sync_state=sync_state,
device_name=device_name,
feature_name=feature_name,
after_update_cb=after_update_cb)
def from_knx(cls, raw):
"""Parse/deserialize from KNX/IP raw data."""
cls.test_bytesarray(raw, 3)
day = raw[0] & 0x1F
month = raw[1] & 0x0F
year = raw[2] & 0x7F
if not DPTDate._test_range(day, month, year):
raise ConversionError("Cant parse DPTDate", raw=raw)
if year >= 90:
year += 1900
else:
year += 2000
return {
'day': day,
'month': month,
'year': year
}
def to_knx(cls, values):
"""Serialize to KNX/IP raw data from dict with elements weekday,hours,minutes,seconds."""
if not isinstance(values, dict):
raise ConversionError("Cant serialize DPTTime", values=values)
weekday = values.get('weekday', DPTWeekday.NONE).value
hours = values.get('hours', 0)
minutes = values.get('minutes', 0)
seconds = values.get('seconds', 0)
if not DPTTime._test_range(weekday, hours, minutes, seconds):
raise ConversionError("Cant serialize DPTTime", values=values)
return weekday << 5 | hours, minutes, seconds
def _knx_year(year):
if 2000 <= year < 2090:
return year-2000
if 1990 <= year < 2000:
return year-1900
raise ConversionError("Cant serialize DPTDate", year=year)