Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_disconnect_request(self):
    """Connect the UDP client to the gateway and send a DISCONNECT_REQUEST.

    This is a generator function: the connect step is delegated to with
    ``yield from``. Afterwards a frame is built from the socket's local
    address/port and this object's communication channel id, normalized,
    and handed to the UDP client for sending.
    """
    gateway = (self.gateway_ip, self.gateway_port)
    yield from self.udpclient.connect(self.own_ip, gateway, multicast=False)

    # The local endpoint is read only after the connect step has finished.
    host, port = self.udpclient.getsockname()

    frame = KNXIPFrame()
    frame.init(KNXIPServiceType.DISCONNECT_REQUEST)
    request = frame.body
    request.communication_channel_id = self.communication_channel_id
    request.control_endpoint = HPAI(ip_addr=host, port=port)
    frame.normalize()

    self.udpclient.send(frame)
def create_knxipframe(self):
    """Create KNX/IP Frame object to be sent to device.

    The frame is initialised as a DISCONNECT_REQUEST. Its body receives
    this object's communication channel id and a control endpoint built
    from the UDP client's current socket name. The frame is returned
    without being normalized or sent.
    """
    host, port = self.udpclient.getsockname()

    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.DISCONNECT_REQUEST)

    request = frame.body
    request.communication_channel_id = self.communication_channel_id
    request.control_endpoint = HPAI(ip_addr=host, port=port)
    return frame
async def send_telegram(self, telegram):
    """Send Telegram to routing connected device.

    Wraps ``telegram`` in a ROUTING_INDICATION frame, stamps the body's
    ``src_addr`` and ``sender`` with ``self.xknx.own_address``, normalizes
    the frame and awaits ``self.send_knxipframe``.
    """
    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.ROUTING_INDICATION)

    indication = frame.body
    indication.src_addr = self.xknx.own_address
    indication.telegram = telegram
    # Assigned after the telegram, in the same order as before.
    indication.sender = self.xknx.own_address

    frame.normalize()
    await self.send_knxipframe(frame)
def create_knxipframe(self):
    """Create KNX/IP Frame object to be sent to device.

    The frame is initialised as a CONNECT_REQUEST whose request type is
    ``ConnectRequestType.TUNNEL_CONNECTION``. The frame is returned
    without being normalized or sent.
    """
    host, port = self.udp_client.getsockname()

    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.CONNECT_REQUEST)

    request = frame.body
    request.request_type = ConnectRequestType.TUNNEL_CONNECTION
    # Control and data endpoints both point at the same UDP socket; each
    # gets its own HPAI instance built from the same address and port.
    request.control_endpoint = HPAI(ip_addr=host, port=port)
    request.data_endpoint = HPAI(ip_addr=host, port=port)
    return frame
def send_ack(self, communication_channel_id, sequence_counter):
    """Send tunnelling ACK after tunnelling request received.

    Builds a TUNNELLING_ACK frame carrying the given communication
    channel id and sequence counter, normalizes it and passes it to
    ``self.udp_client.send``.
    """
    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.TUNNELLING_ACK)

    ack = frame.body
    ack.communication_channel_id = communication_channel_id
    ack.sequence_counter = sequence_counter

    frame.normalize()
    self.udp_client.send(frame)
def create_knxipframe(self):
    """Create KNX/IP Frame object to be sent to device.

    The frame is initialised as a CONNECTIONSTATE_REQUEST. Its body
    receives this object's communication channel id and a control
    endpoint built from the UDP client's current socket name. The frame
    is returned without being normalized or sent.
    """
    host, port = self.udpclient.getsockname()

    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.CONNECTIONSTATE_REQUEST)

    request = frame.body
    request.communication_channel_id = self.communication_channel_id
    request.control_endpoint = HPAI(ip_addr=host, port=port)
    return frame
async def _search_interface(self, interface, ip_addr):
    """Send a search request on a specific interface.

    Creates a multicast ``UDPClient`` for ``(ip_addr, 0, interface)``,
    registers ``self._response_rec_callback`` for SEARCH_RESPONSE frames,
    connects it and records it in ``self._udp_clients``. A SEARCH_REQUEST
    frame whose discovery endpoint is the client's socket name is then
    normalized and sent through that client.
    """
    self.xknx.logger.debug("Searching on %s / %s", interface, ip_addr)

    interface_addr = (ip_addr, 0, interface)
    mcast_addr = (DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT)
    client = UDPClient(self.xknx, interface_addr, mcast_addr, multicast=True)
    client.register_callback(
        self._response_rec_callback,
        [KNXIPServiceType.SEARCH_RESPONSE],
    )
    await client.connect()
    # The client is tracked only once connect() has completed.
    self._udp_clients.append(client)

    host, port = client.getsockname()
    frame = KNXIPFrame(self.xknx)
    frame.init(KNXIPServiceType.SEARCH_REQUEST)
    frame.body.discovery_endpoint = HPAI(ip_addr=host, port=port)
    frame.normalize()
    client.send(frame)
def __init__(self, xknx, own_ip, gateway_ip, gateway_port,
             communication_channel_id, timeout_in_seconds=1):
    """Store the connection parameters and prepare the UDP client.

    A fresh ``UDPClient`` is created and ``self.response_rec_callback``
    is registered on it for DISCONNECT_RESPONSE frames. The timeout
    callback and handle start out as ``None`` and ``success`` as False.
    """
    # Parameters supplied by the caller.
    self.xknx = xknx
    self.own_ip = own_ip
    self.gateway_ip = gateway_ip
    self.gateway_port = gateway_port
    self.communication_channel_id = communication_channel_id

    # Result state. The attribute name keeps its existing spelling (sic)
    # because it is part of this object's interface.
    self.response_recieved_or_timeout = asyncio.Event()
    self.success = False

    # Client that will receive the DISCONNECT_RESPONSE.
    client = UDPClient(xknx)
    self.udpclient = client
    client.register_callback(
        self.response_rec_callback,
        [KNXIPServiceType.DISCONNECT_RESPONSE],
    )

    # Timeout bookkeeping.
    self.timeout_in_seconds = timeout_in_seconds
    self.timeout_callback = None
    self.timeout_handle = None
def __init__(self, xknx, telegram_received_callback, local_ip, bind_to_multicast_addr):
    """Initialize Routing class.

    Stores the given arguments and creates a multicast ``UDPClient`` for
    ``(local_ip, 0)`` and ``(DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT)``.
    ``self.response_rec_callback`` is registered on that client for
    ROUTING_INDICATION frames.
    """
    self.xknx = xknx
    self.telegram_received_callback = telegram_received_callback
    self.local_ip = local_ip

    mcast_addr = (DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT)
    client = UDPClient(
        xknx,
        (local_ip, 0),
        mcast_addr,
        multicast=True,
        bind_to_multicast_addr=bind_to_multicast_addr,
    )
    self.udpclient = client
    client.register_callback(
        self.response_rec_callback,
        [KNXIPServiceType.ROUTING_INDICATION],
    )