Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def from_knx(self, raw):
    """Parse/deserialize a device information DIB from KNX/IP raw data.

    Fills this object from fixed byte offsets of ``raw``: KNX medium,
    programming mode, individual address, project and installation number,
    serial number, multicast address, MAC address and device name.

    Returns the number of bytes consumed (``DIBDeviceInformation.LENGTH``).

    Raises ``CouldNotParseKNXIP`` if ``raw`` is shorter than the fixed DIB
    length, if the length byte does not equal that length, or if the type
    code is not ``DIBTypeCode.DEVICE_INFO``.
    """
    expected_length = DIBDeviceInformation.LENGTH
    if len(raw) < expected_length:
        raise CouldNotParseKNXIP("wrong connection header length")
    if raw[0] != expected_length:
        raise CouldNotParseKNXIP("wrong connection header length")
    if DIBTypeCode(raw[1]) != DIBTypeCode.DEVICE_INFO:
        raise CouldNotParseKNXIP("DIB is no device info")

    self.knx_medium = KNXMedium(raw[2])
    # Only the last (least significant) bit of device_status carries the
    # programming mode. Mask it so that the unused bits can never switch
    # the flag on by accident.
    self.programming_mode = bool(raw[3] & 0x01)
    self.individual_address = PhysicalAddress((raw[4], raw[5]))

    # Bytes 6-7 form one 16 bit value: the upper 12 bits are the project
    # number, the lower 4 bits the installation number.
    installation_project_identifier = raw[6] * 256 + raw[7]
    self.project_number = installation_project_identifier >> 4
    self.installation_number = installation_project_identifier & 15

    self.serial_number = ":".join(
        "{:02x}".format(octet) for octet in raw[8:14])
    self.multicast_address = ".".join(
        str(octet) for octet in raw[14:18])
    self.mac_address = ":".join(
        "{:02x}".format(octet) for octet in raw[18:24])
    # The name occupies a fixed 30 byte field padded with NUL bytes.
    self.name = "".join(map(chr, raw[24:54])).rstrip('\0')
    return expected_length
def cri_from_knx(cri):
    """Parse CRI (Connection Request Information).

    Stores the request type and the flags byte on the ``self`` of the
    enclosing scope and returns the number of bytes consumed (4).

    Raises ``CouldNotParseKNXIP`` if ``cri`` holds fewer than
    ``ConnectRequest.CRI_LENGTH`` bytes or if its length byte does not
    equal ``ConnectRequest.CRI_LENGTH``.
    """
    # Validate the buffer size before touching cri[0]: an empty buffer must
    # be reported as a parse error, not surface as an IndexError.
    if len(cri) < ConnectRequest.CRI_LENGTH:
        raise CouldNotParseKNXIP("CRI data has wrong length")
    if cri[0] != ConnectRequest.CRI_LENGTH:
        raise CouldNotParseKNXIP("CRI has wrong length")
    self.request_type = ConnectRequestType(cri[1])
    self.flags = cri[2]
    return 4
def from_knx(cls, raw):
    """Decode an HVAC operation mode from KNX/IP raw data.

    ``raw`` is first validated with ``cls.test_bytesarray(raw, 1)``. The
    bits of the first byte are then tested from value 8 down to value 1 and
    the mode belonging to the first set bit is returned.

    Raises ``CouldNotParseKNXIP`` if none of the four lowest bits is set.
    """
    cls.test_bytesarray(raw, 1)
    flags = raw[0]
    # Ordered from highest to lowest bit: the first match wins.
    modes_by_bit = (
        (8, HVACOperationMode.FROST_PROTECTION),
        (4, HVACOperationMode.NIGHT),
        (2, HVACOperationMode.STANDBY),
        (1, HVACOperationMode.COMFORT),
    )
    for bit, mode in modes_by_bit:
        if flags & bit:
            return mode
    raise CouldNotParseKNXIP("Could not parse HVACOperationMode")
def crd_from_knx(crd):
    """Parse CRD (Connection Response Data Block).

    Stores the request type and the 16 bit identifier (big endian, bytes 2
    and 3) on the ``self`` of the enclosing scope and returns the number of
    bytes consumed (4).

    Raises ``CouldNotParseKNXIP`` if ``crd`` holds fewer than
    ``ConnectResponse.CRD_LENGTH`` bytes or if its length byte does not
    equal ``ConnectResponse.CRD_LENGTH``.
    """
    # Validate the buffer size before touching crd[0]: an empty buffer must
    # be reported as a parse error, not surface as an IndexError.
    if len(crd) < ConnectResponse.CRD_LENGTH:
        raise CouldNotParseKNXIP("CRD data has wrong length")
    if crd[0] != ConnectResponse.CRD_LENGTH:
        raise CouldNotParseKNXIP("CRD has wrong length")
    self.request_type = ConnectRequestType(crd[1])
    self.identifier = crd[2] * 256 + crd[3]
    return 4
def info_from_knx(info):
    """Parse the two info bytes: communication channel id and status code.

    Both values are stored on the ``self`` of the enclosing scope. Returns
    the number of bytes consumed (2) and raises ``CouldNotParseKNXIP`` if
    fewer than two bytes are available.
    """
    consumed = 2
    if len(info) >= consumed:
        channel_id, raw_status = info[0], info[1]
        self.communication_channel_id = channel_id
        self.status_code = ErrorCode(raw_status)
        return consumed
    raise CouldNotParseKNXIP("Disconnect info has wrong length")
pos = info_from_knx(raw)
def info_from_knx(info):
    """Parse the two info bytes: communication channel id and status code.

    Both values are stored on the ``self`` of the enclosing scope. Returns
    the number of bytes consumed (2) and raises ``CouldNotParseKNXIP`` if
    fewer than two bytes are available.
    """
    consumed = 2
    if len(info) >= consumed:
        channel_id, raw_status = info[0], info[1]
        self.communication_channel_id = channel_id
        self.status_code = ErrorCode(raw_status)
        return consumed
    raise CouldNotParseKNXIP("info has wrong length")
pos = info_from_knx(raw)
def from_knx(self, raw):
    """Parse/deserialize a supported service families DIB from raw data.

    Byte 0 is the declared DIB length and byte 1 the type code. The bytes
    after this 2 byte header are read as (family, version) pairs and
    appended to ``self.families``.

    Returns the declared length taken from byte 0.

    Raises ``CouldNotParseKNXIP`` if ``raw`` is shorter than the header, if
    the declared length is smaller than the header or larger than ``raw``,
    or if the type code is not ``DIBTypeCode.SUPP_SVC_FAMILIES``.
    """
    header_size = 2
    if len(raw) < header_size:
        raise CouldNotParseKNXIP("DIB header too small")
    length = raw[0]
    # A declared length below the header size cannot describe this DIB and
    # would make the method report fewer bytes consumed than it has read.
    if length < header_size or len(raw) < length:
        raise CouldNotParseKNXIP("DIB wrong size")
    if DIBTypeCode(raw[1]) != DIBTypeCode.SUPP_SVC_FAMILIES:
        raise CouldNotParseKNXIP("DIB is no supported service families")
    # Walk the complete 2 byte pairs; a trailing odd byte is ignored.
    for pos in range(header_size, length - 1, 2):
        name = DIBServiceFamily(raw[pos])
        version = raw[pos + 1]
        self.families.append(DIBSuppSVCFamilies.Family(name, version))
    return length
self.mpdu_len = cemi[8 + addil]
# TPCI (transport layer control information) -> First 14 bit
# APCI (application layer control information) -> Last 10 bit
tpci_apci = cemi[9 + addil] * 256 + cemi[10 + addil]
try:
self.cmd = APCICommand(tpci_apci & 0xFFC0)
except ValueError:
raise CouldNotParseKNXIP(
"APCI not supported: {0:#012b}".format(tpci_apci & 0xFFC0))
apdu = cemi[10 + addil:]
if len(apdu) != self.mpdu_len:
raise CouldNotParseKNXIP(
"APDU LEN should be {} but is {}".format(
self.mpdu_len, len(apdu)))
if len(apdu) == 1:
apci = tpci_apci & DPTBinary.APCI_BITMASK
self.payload = DPTBinary(apci)
else:
self.payload = DPTArray(cemi[11 + addil:])
return 10 + addil + len(apdu)
def header_from_knx(header):
    """Parse the connection header bytes.

    Stores the communication channel id and the sequence counter on the
    ``self`` of the enclosing scope and returns the number of bytes
    consumed (4).

    Raises ``CouldNotParseKNXIP`` if ``header`` holds fewer than
    ``TunnellingRequest.HEADER_LENGTH`` bytes or if its length byte does
    not equal ``TunnellingRequest.HEADER_LENGTH``.
    """
    # Validate the buffer size before touching header[0]: an empty buffer
    # must be reported as a parse error, not surface as an IndexError.
    if len(header) < TunnellingRequest.HEADER_LENGTH:
        raise CouldNotParseKNXIP("connection header wrong length")
    if header[0] != TunnellingRequest.HEADER_LENGTH:
        raise CouldNotParseKNXIP("connection header wrong length")
    self.communication_channel_id = header[1]
    self.sequence_counter = header[2]
    return 4
pos = header_from_knx(raw)
def from_knx(self, raw):
    """Parse/deserialize a supported service families DIB from raw data.

    Byte 0 is the declared DIB length and byte 1 the type code. The bytes
    after this 2 byte header are read as (family, version) pairs and
    appended to ``self.families``.

    Returns the declared length taken from byte 0.

    Raises ``CouldNotParseKNXIP`` if ``raw`` is shorter than the header, if
    the declared length is smaller than the header or larger than ``raw``,
    or if the type code is not ``DIBTypeCode.SUPP_SVC_FAMILIES``.
    """
    header_size = 2
    if len(raw) < header_size:
        raise CouldNotParseKNXIP("DIB header too small")
    length = raw[0]
    # A declared length below the header size cannot describe this DIB and
    # would make the method report fewer bytes consumed than it has read.
    if length < header_size or len(raw) < length:
        raise CouldNotParseKNXIP("DIB wrong size")
    if DIBTypeCode(raw[1]) != DIBTypeCode.SUPP_SVC_FAMILIES:
        raise CouldNotParseKNXIP("DIB is no supported service families")
    # Walk the complete 2 byte pairs; a trailing odd byte is ignored.
    for pos in range(header_size, length - 1, 2):
        name = DIBServiceFamily(raw[pos])
        version = raw[pos + 1]
        self.families.append(DIBSuppSVCFamilies.Family(name, version))
    return length