Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ip_addr_to_bytes(ip_addr):
    """Yield each dot-separated part of ip_addr converted with int().

    This is a generator: nothing is checked or converted until it is
    iterated. On iteration, ConversionError is raised if ip_addr is not
    a str; otherwise the parts are produced one at a time, left to right.
    """
    if isinstance(ip_addr, str):
        # Lazily convert each component, in order.
        yield from map(int, ip_addr.split("."))
    else:
        raise ConversionError("ip_addr is not a string")
data = []
def init(self, service_type_ident):
    """Store service_type_ident in the header and create the matching body.

    The body object is instantiated with self.xknx. Only the service
    types listed below are handled here; for any other value self.body
    is left untouched.
    """
    self.header.service_type_ident = service_type_ident

    # Ordered (service type, body class) pairs; the first equal entry wins.
    body_classes = (
        (KNXIPServiceType.ROUTING_INDICATION, CEMIFrame),
        (KNXIPServiceType.CONNECT_REQUEST, ConnectRequest),
        (KNXIPServiceType.CONNECT_RESPONSE, ConnectResponse),
        (KNXIPServiceType.TUNNELLING_REQUEST, TunnellingRequest),
        (KNXIPServiceType.TUNNELLING_ACK, TunnellingAck),
        (KNXIPServiceType.SEARCH_REQUEST, SearchRequest),
    )
    for known_ident, body_class in body_classes:
        if service_type_ident == known_ident:
            self.body = body_class(self.xknx)
            break
def send_disconnect_request(self):
    """Connect the UDP client to the gateway and send a DISCONNECT_REQUEST.

    Generator-based coroutine: it delegates to self.udpclient.connect()
    with ``yield from``, so it must be driven by a caller that supports
    that protocol.
    """
    gateway = (self.gateway_ip, self.gateway_port)
    yield from self.udpclient.connect(self.own_ip, gateway, multicast=False)

    host, port = self.udpclient.getsockname()

    # NOTE(review): other call sites in this file pass self.xknx to
    # KNXIPFrame - confirm the no-argument form is intended here.
    frame = KNXIPFrame()
    frame.init(KNXIPServiceType.DISCONNECT_REQUEST)

    request = frame.body
    request.communication_channel_id = self.communication_channel_id
    # The control endpoint is the local address of the UDP socket.
    request.control_endpoint = HPAI(ip_addr=host, port=port)

    frame.normalize()
    self.udpclient.send(frame)
def create_knxipframe(self):
    """Build and return a DISCONNECT_REQUEST KNX/IP frame.

    The frame carries this object's communication_channel_id and a
    control endpoint set to the local address of self.udpclient.
    """
    host, port = self.udpclient.getsockname()

    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.DISCONNECT_REQUEST)

    request = frame.body
    request.communication_channel_id = self.communication_channel_id
    request.control_endpoint = HPAI(ip_addr=host, port=port)
    return frame
async def send_telegram(self, telegram):
    """Wrap telegram in a ROUTING_INDICATION frame and send it.

    Both src_addr and sender of the frame body are set to
    self.xknx.own_address before the frame is normalized and handed to
    self.send_knxipframe().
    """
    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.ROUTING_INDICATION)

    payload = frame.body
    payload.src_addr = self.xknx.own_address
    payload.telegram = telegram
    payload.sender = self.xknx.own_address

    frame.normalize()
    await self.send_knxipframe(frame)
def create_knxipframe(self):
    """Build and return a CONNECT_REQUEST KNX/IP frame for a tunnel connection.

    Control and data endpoints both describe the local address of
    self.udp_client.
    """
    host, port = self.udp_client.getsockname()

    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.CONNECT_REQUEST)

    request = frame.body
    request.request_type = ConnectRequestType.TUNNEL_CONNECTION
    # Same UDP connection for both endpoints, but each one gets its own
    # HPAI instance.
    request.control_endpoint = HPAI(ip_addr=host, port=port)
    request.data_endpoint = HPAI(ip_addr=host, port=port)
    return frame
def send_ack(self, communication_channel_id, sequence_counter):
    """Send a TUNNELLING_ACK frame for the given channel and sequence counter.

    The frame is built with self.xknx, normalized, and sent through
    self.udp_client.
    """
    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.TUNNELLING_ACK)

    ack = frame.body
    ack.communication_channel_id = communication_channel_id
    ack.sequence_counter = sequence_counter

    frame.normalize()
    self.udp_client.send(frame)
def create_knxipframe(self):
    """Build and return a CONNECTIONSTATE_REQUEST KNX/IP frame.

    The frame carries this object's communication_channel_id and a
    control endpoint set to the local address of self.udpclient.
    """
    host, port = self.udpclient.getsockname()

    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.CONNECTIONSTATE_REQUEST)

    request = frame.body
    request.communication_channel_id = self.communication_channel_id
    request.control_endpoint = HPAI(ip_addr=host, port=port)
    return frame
async def _search_interface(self, interface, ip_addr):
    """Send a SEARCH_REQUEST from one interface and register for responses.

    Creates a UDPClient bound to (ip_addr, 0, interface) that targets
    (DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT) with multicast=True, connects
    it, and appends it to self._udp_clients. SEARCH_RESPONSE frames are
    routed to self._response_rec_callback.
    """
    self.xknx.logger.debug("Searching on %s / %s", interface, ip_addr)

    local_bind = (ip_addr, 0, interface)
    remote_target = (DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT)
    client = UDPClient(self.xknx, local_bind, remote_target, multicast=True)

    # Register the callback before connecting.
    client.register_callback(
        self._response_rec_callback,
        [KNXIPServiceType.SEARCH_RESPONSE],
    )
    await client.connect()
    self._udp_clients.append(client)

    host, port = client.getsockname()

    request = KNXIPFrame(self.xknx)
    request.init(KNXIPServiceType.SEARCH_REQUEST)
    # The discovery endpoint is the local address of the connected client.
    request.body.discovery_endpoint = HPAI(ip_addr=host, port=port)
    request.normalize()
    client.send(request)
def send_disconnect_request(self):
    """Connect the UDP client to the gateway and send a DISCONNECT_REQUEST.

    Generator-based coroutine: it delegates to self.udpclient.connect()
    with ``yield from``, so it must be driven by a caller that supports
    that protocol.
    """
    gateway = (self.gateway_ip, self.gateway_port)
    yield from self.udpclient.connect(self.own_ip, gateway, multicast=False)

    host, port = self.udpclient.getsockname()

    # NOTE(review): other call sites in this file pass self.xknx to
    # KNXIPFrame - confirm the no-argument form is intended here.
    frame = KNXIPFrame()
    frame.init(KNXIPServiceType.DISCONNECT_REQUEST)

    request = frame.body
    request.communication_channel_id = self.communication_channel_id
    # The control endpoint is the local address of the UDP socket.
    request.control_endpoint = HPAI(ip_addr=host, port=port)

    frame.normalize()
    self.udpclient.send(frame)