Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init(self, service_type_ident):
    """Store the service type in the header and create the matching body object."""
    self.header.service_type_ident = service_type_ident
    # NOTE(review): no fallback branch is visible in this excerpt, so a service
    # type that is not listed below leaves self.body unchanged here.
    if service_type_ident == KNXIPServiceType.ROUTING_INDICATION:
        self.body = CEMIFrame(self.xknx)
    elif service_type_ident == KNXIPServiceType.CONNECT_REQUEST:
        self.body = ConnectRequest(self.xknx)
    elif service_type_ident == KNXIPServiceType.CONNECT_RESPONSE:
        self.body = ConnectResponse(self.xknx)
    elif service_type_ident == KNXIPServiceType.TUNNELLING_REQUEST:
        self.body = TunnellingRequest(self.xknx)
    elif service_type_ident == KNXIPServiceType.TUNNELLING_ACK:
        self.body = TunnellingAck(self.xknx)
    elif service_type_ident == KNXIPServiceType.SEARCH_REQUEST:
        self.body = SearchRequest(self.xknx)
With a Connect Response the receiving party acknowledges the valid processing of the request.
"""
from xknx.exceptions import CouldNotParseKNXIP
from .body import KNXIPBody
from .error_code import ErrorCode
from .hpai import HPAI
from .knxip_enum import ConnectRequestType, KNXIPServiceType
class ConnectResponse(KNXIPBody):
"""Representation of a KNX Connect Response."""
# pylint: disable=too-many-instance-attributes
service_type = KNXIPServiceType.CONNECT_RESPONSE
CRD_LENGTH = 4
def __init__(self, xknx):
    """Create a ConnectResponse with default field values."""
    super().__init__(xknx)
    # Status starts as ErrorCode.E_NO_ERROR on channel 0.
    self.status_code = ErrorCode.E_NO_ERROR
    self.communication_channel = 0
    # A fresh HPAI object is used as the control endpoint.
    self.control_endpoint = HPAI()
    # Request type and identifier stay unset until assigned elsewhere.
    self.request_type = None
    self.identifier = None
def calculated_length(self):
"""Get length of KNX/IP body."""
return 2 + HPAI.LENGTH + \
Tunnelling requests are used to transmit a KNX telegram within an existing KNX tunnel connection.
With a Tunnelling ACK the receiving party acknowledges the valid processing of the request.
"""
from xknx.exceptions import CouldNotParseKNXIP
from .body import KNXIPBody
from .error_code import ErrorCode
from .knxip_enum import KNXIPServiceType
class TunnellingAck(KNXIPBody):
"""Representation of a KNX Tunnelling ACK."""
# pylint: disable=too-many-instance-attributes
service_type = KNXIPServiceType.TUNNELLING_ACK
BODY_LENGTH = 4
def __init__(self, xknx):
    """Create a TunnellingAck with default field values."""
    super().__init__(xknx)
    # Defaults: status E_NO_ERROR, sequence counter 0, channel id 1.
    self.status_code = ErrorCode.E_NO_ERROR
    self.sequence_counter = 0
    self.communication_channel_id = 1
def calculated_length(self):
    """Return the KNX/IP body length, taken from TunnellingAck.BODY_LENGTH."""
    body_length = TunnellingAck.BODY_LENGTH
    return body_length
def from_knx(self, raw):
"""Parse/deserialize from KNX/IP raw data."""
Disconnect requests are used to disconnect a tunnel from a KNX/IP device.
"""
from xknx.exceptions import CouldNotParseKNXIP
from .body import KNXIPBody
from .hpai import HPAI
from .knxip_enum import KNXIPServiceType
class DisconnectRequest(KNXIPBody):
"""Representation of a KNX Disconnect Request."""
# pylint: disable=too-many-instance-attributes
service_type = KNXIPServiceType.DISCONNECT_REQUEST
def __init__(self, xknx):
    """Create a DisconnectRequest with default field values."""
    super().__init__(xknx)
    # A fresh HPAI serves as the control endpoint; the channel id defaults to 1.
    self.control_endpoint = HPAI()
    self.communication_channel_id = 1
def calculated_length(self):
    """Return the KNX/IP body length: 2 plus HPAI.LENGTH."""
    fixed_part = 2
    return fixed_part + HPAI.LENGTH
def from_knx(self, raw):
"""Parse/deserialize from KNX/IP raw data."""
def info_from_knx(info):
"""Parse info bytes."""
Connectionstate requests are used to determine if a tunnel connection is still active and valid.
"""
from xknx.exceptions import CouldNotParseKNXIP
from .body import KNXIPBody
from .hpai import HPAI
from .knxip_enum import KNXIPServiceType
class ConnectionStateRequest(KNXIPBody):
"""Representation of a KNX Connection State Request."""
# pylint: disable=too-many-instance-attributes
service_type = KNXIPServiceType.CONNECTIONSTATE_REQUEST
def __init__(self, xknx):
    """Create a ConnectionStateRequest with default field values."""
    super().__init__(xknx)
    # Control endpoint starts as a fresh HPAI; the channel id defaults to 1.
    self.control_endpoint = HPAI()
    self.communication_channel_id = 1
def calculated_length(self):
    """Return the KNX/IP body length: 2 plus HPAI.LENGTH."""
    leading_bytes = 2
    return leading_bytes + HPAI.LENGTH
def from_knx(self, raw):
"""Parse/deserialize from KNX/IP raw data."""
def info_from_knx(info):
"""Parse info bytes."""
if len(info) < 2:
Disconnect requests are used to disconnect a tunnel from a KNX/IP device.
With a Disconnect Response the receiving party acknowledges the valid processing of the request.
"""
from xknx.exceptions import CouldNotParseKNXIP
from .body import KNXIPBody
from .error_code import ErrorCode
from .knxip_enum import KNXIPServiceType
class DisconnectResponse(KNXIPBody):
"""Representation of a KNX Disconnect Response."""
# pylint: disable=too-many-instance-attributes
service_type = KNXIPServiceType.DISCONNECT_RESPONSE
def __init__(self, xknx):
    """Create a DisconnectResponse with default field values."""
    super().__init__(xknx)
    # Defaults: status E_NO_ERROR on channel id 1.
    self.status_code = ErrorCode.E_NO_ERROR
    self.communication_channel_id = 1
def calculated_length(self):
    """Return the fixed KNX/IP body length of 2."""
    body_length = 2
    return body_length
def from_knx(self, raw):
"""Parse/deserialize from KNX/IP raw data."""
def info_from_knx(info):
"""Parse info bytes."""
Connectionstate requests are used to determine if a tunnel connection is still active and valid.
With a connectionstate response the receiving party acknowledges the valid processing of the request.
"""
from xknx.exceptions import CouldNotParseKNXIP
from .body import KNXIPBody
from .error_code import ErrorCode
from .knxip_enum import KNXIPServiceType
class ConnectionStateResponse(KNXIPBody):
"""Representation of a KNX Connection State Response."""
# pylint: disable=too-many-instance-attributes
service_type = KNXIPServiceType.CONNECTIONSTATE_RESPONSE
def __init__(self, xknx):
    """Create a ConnectionStateResponse with default field values."""
    super().__init__(xknx)
    # Defaults: status E_NO_ERROR on channel id 1.
    self.status_code = ErrorCode.E_NO_ERROR
    self.communication_channel_id = 1
def calculated_length(self):
    """Return the fixed KNX/IP body length of 2."""
    fixed_length = 2
    return fixed_length
def from_knx(self, raw):
"""Parse/deserialize from KNX/IP raw data."""
def info_from_knx(info):
"""Parse info bytes."""
if len(info) < 2:
KNXIPServiceType.TUNNELLING_ACK:
self.body = TunnellingAck(self.xknx)
elif service_type_ident == \
KNXIPServiceType.SEARCH_REQUEST:
self.body = SearchRequest(self.xknx)
elif service_type_ident == \
KNXIPServiceType.SEARCH_RESPONSE:
self.body = SearchResponse(self.xknx)
elif service_type_ident == \
KNXIPServiceType.DISCONNECT_REQUEST:
self.body = DisconnectRequest(self.xknx)
elif service_type_ident == \
KNXIPServiceType.DISCONNECT_RESPONSE:
self.body = DisconnectResponse(self.xknx)
elif service_type_ident == \
KNXIPServiceType.CONNECTIONSTATE_REQUEST:
self.body = ConnectionStateRequest(self.xknx)
elif service_type_ident == \
KNXIPServiceType.CONNECTIONSTATE_RESPONSE:
self.body = ConnectionStateResponse(self.xknx)
else:
raise TypeError(self.header.service_type_ident)
def __init__(self, xknx):
    """Set up a KNXIPHeader with its default field values."""
    self.xknx = xknx
    # Placeholder only; the real total length is assigned later.
    self.total_length = 0
    self.b4_reserve = 0
    # Header length and protocol version come from the class-level constants.
    self.header_length = KNXIPHeader.HEADERLENGTH
    self.protocol_version = KNXIPHeader.PROTOCOLVERSION
    # The service type starts as ROUTING_INDICATION.
    self.service_type_ident = KNXIPServiceType.ROUTING_INDICATION
def handle_frame(self, knxipframe):
    """Dispatch a received KNX/IP frame according to its service type.

    Routing indications are passed to handle_frame_routing_indication;
    a frame with any other service type is only reported on stdout.
    """
    if knxipframe.header.service_type_ident == \
            KNXIPServiceType.ROUTING_INDICATION:
        self.handle_frame_routing_indication(knxipframe)
    else:
        # Spelling of the diagnostic corrected (was "IMPLEMENETED").
        print("SERVICE TYPE NOT IMPLEMENTED: ", knxipframe)