Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_blank_header_creation(self):
    """Check the header bytes (checksum not filled in) built from packet data.

    Builds one Echo Request and one Echo Reply with different identifiers and
    compares the result of ``_header()`` against the expected raw bytes.
    """
    failure_note = 'Blank header creation failed (without checksum)'

    # Echo Request case.
    echo_request = icmp.ICMP(icmp.Types.EchoRequest, payload='random text goes here', identifier=16)
    request_header = echo_request._header()
    self.assertEqual(request_header, b'\x08\x00\x00\x00\x10\x00\x01\x00', failure_note)

    # Echo Reply case.
    echo_reply = icmp.ICMP(icmp.Types.EchoReply, payload='foo', identifier=11)
    reply_header = echo_reply._header()
    self.assertEqual(reply_header, b'\x00\x00\x00\x00\x0b\x00\x01\x00', failure_note)
def test_pack(self):
    """Verify that an ICMP object packs into the expected packet bytes.

    Each case builds a message with a given type, payload and identifier and
    compares its ``packet`` attribute against the expected raw bytes.
    """
    # One message for every case: this test only exercises packing, so the
    # failure text must not mention unpacking.
    failure_note = "Fail to pack ICMP structure to packet"

    self.assertEqual(
        icmp.ICMP(icmp.Types.EchoReply, payload='banana', identifier=19700).packet,
        b'\x00\x00\xcb\x8e\xf4L\x01\x00banana',
        failure_note,
    )
    self.assertEqual(
        icmp.ICMP(icmp.Types.EchoReply, payload='random text goes here', identifier=12436).packet,
        b'\x00\x00h\xd1\x940\x01\x00random text goes here',
        failure_note,
    )
    self.assertEqual(
        icmp.ICMP(icmp.Types.EchoRequest, payload='random text goes here', identifier=18676).packet,
        b'\x08\x00\x00\xb9\xf4H\x01\x00random text goes here',
        failure_note,
    )
def send_ping(self, packet_id, sequence_number, payload):
    """Send a single ICMP Echo Request through this object's socket.

    :param packet_id: The identifier to put in the packet
    :type packet_id: int
    :param sequence_number: The sequence number to put in the packet
    :type sequence_number: int
    :param payload: The payload carried by the ICMP message
    :type payload: bytes
    """
    transmit = self.socket.send
    echo_request = icmp.ICMP(
        icmp.Types.EchoRequest,
        payload=payload,
        identifier=packet_id,
        sequence_number=sequence_number,
    )
    # The packed form of the message is what goes on the wire.
    transmit(echo_request.packet)
:param packet_id: The ID of the packet to listen for, the same for request and response
:type packet_id: int
:param timeout: How long to listen for the specified packet, in seconds
:type timeout: float
:return: The response to the request with the specified packet_id
:rtype: Response"""
time_left = timeout
response = icmp.ICMP()
while time_left > 0:
# Keep listening until a packet arrives
raw_packet, source_socket, time_left = self.socket.receive(time_left)
# If we actually received something
if raw_packet != b'':
response.unpack(raw_packet)
# Ensure we have not unpacked the packet we sent (RHEL will also listen to outgoing packets)
if response.id == packet_id and response.message_type != icmp.Types.EchoRequest.type_id:
return Response(Message('', response, source_socket[0]), timeout-time_left)
return Response(None, timeout)