Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def craft_response_of_type(response_type):
    """Build a canned executor.Response around an ICMP packet of the given type.

    The packet is created with ``icmp.ICMP(response_type)`` and wrapped in an
    ``executor.Message`` whose first field is the empty string and whose last
    field is ``'127.0.0.1'``. The resulting response carries a fixed second
    argument of ``0.1``.

    :param response_type: Type of response
    :type response_type: Union[icmp.Type, tuple]
    :return: The crafted response
    :rtype: executor.Response
    """
    # Build the pieces inside-out so each layer of the response is named.
    packet = icmp.ICMP(response_type)
    message = executor.Message('', packet, '127.0.0.1')
    return executor.Response(message, 0.1)
:type packet_id: int
:param timeout: How long to listen for the specified packet, in seconds
:type timeout: float
:return: The response to the request with the specified packet_id
:rtype: Response"""
time_left = timeout
response = icmp.ICMP()
while time_left > 0:
# Keep listening until a packet arrives
raw_packet, source_socket, time_left = self.socket.receive(time_left)
# If we actually received something
if raw_packet != b'':
response.unpack(raw_packet)
# Ensure we have not unpacked the packet we sent (RHEL will also listen to outgoing packets)
if response.id == packet_id and response.message_type != icmp.Types.EchoRequest.type_id:
return Response(Message('', response, source_socket[0]), timeout-time_left)
return Response(None, timeout)