Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_disconnect_request(self):
yield from self.udpclient.connect(
self.own_ip,
(self.gateway_ip, self.gateway_port),
multicast=False)
(local_addr, local_port) = self.udpclient.getsockname()
knxipframe = KNXIPFrame()
knxipframe.init(KNXIPServiceType.DISCONNECT_REQUEST)
knxipframe.body.communication_channel_id = \
self.communication_channel_id
knxipframe.body.control_endpoint = HPAI(
ip_addr=local_addr, port=local_port)
knxipframe.normalize()
self.udpclient.send(knxipframe)
def send_ack(self, communication_channel_id, sequence_counter):
"""Send tunnelling ACK after tunnelling request received."""
ack_knxipframe = KNXIPFrame(self.xknx)
ack_knxipframe.init(KNXIPServiceType.TUNNELLING_ACK)
ack_knxipframe.body.communication_channel_id = communication_channel_id
ack_knxipframe.body.sequence_counter = sequence_counter
ack_knxipframe.normalize()
self.udp_client.send(ack_knxipframe)
def create_knxipframe(self):
"""Create KNX/IP Frame object to be sent to device."""
knxipframe = KNXIPFrame(self.xknx)
knxipframe.init(KNXIPServiceType.TUNNELLING_REQUEST)
knxipframe.body.communication_channel_id = self.communication_channel_id
knxipframe.body.cemi.telegram = self.telegram
knxipframe.body.cemi.src_addr = self.src_address
knxipframe.body.sequence_counter = self.sequence_counter
return knxipframe
def create_knxipframe(self):
"""Create KNX/IP Frame object to be sent to device."""
(local_addr, local_port) = self.udpclient.getsockname()
knxipframe = KNXIPFrame(self.xknx)
knxipframe.init(KNXIPServiceType.CONNECTIONSTATE_REQUEST)
knxipframe.body.communication_channel_id = \
self.communication_channel_id
knxipframe.body.control_endpoint = HPAI(
ip_addr=local_addr, port=local_port)
return knxipframe
def create_knxipframe(self):
"""Create KNX/IP Frame object to be sent to device."""
(local_addr, local_port) = self.udpclient.getsockname()
knxipframe = KNXIPFrame(self.xknx)
knxipframe.init(KNXIPServiceType.DISCONNECT_REQUEST)
knxipframe.body.communication_channel_id = \
self.communication_channel_id
knxipframe.body.control_endpoint = HPAI(
ip_addr=local_addr, port=local_port)
return knxipframe
"""Send a search request on a specific interface."""
self.xknx.logger.debug("Searching on %s / %s", interface, ip_addr)
udp_client = UDPClient(self.xknx,
(ip_addr, 0, interface),
(DEFAULT_MCAST_GRP, DEFAULT_MCAST_PORT),
multicast=True)
udp_client.register_callback(
self._response_rec_callback, [KNXIPServiceType.SEARCH_RESPONSE])
await udp_client.connect()
self._udp_clients.append(udp_client)
(local_addr, local_port) = udp_client.getsockname()
knx_ip_frame = KNXIPFrame(self.xknx)
knx_ip_frame.init(KNXIPServiceType.SEARCH_REQUEST)
knx_ip_frame.body.discovery_endpoint = \
HPAI(ip_addr=local_addr, port=local_port)
knx_ip_frame.normalize()
udp_client.send(knx_ip_frame)
def data_received_callback(self, raw):
if raw:
try:
knxipframe = KNXIPFrame()
knxipframe.from_knx(raw)
self.handle_knxipframe(knxipframe)
except CouldNotParseKNXIP as couldnotparseknxip:
print(couldnotparseknxip)
def data_received_callback(self, raw):
"""Parse and process KNXIP frame. Callback for having received an UDP packet."""
if raw:
try:
knxipframe = KNXIPFrame(self.xknx)
knxipframe.from_knx(raw)
self.xknx.knx_logger.debug("Received: %s", knxipframe)
self.handle_knxipframe(knxipframe)
except CouldNotParseKNXIP as couldnotparseknxip:
self.xknx.logger.exception(couldnotparseknxip)