Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:return: IPv4 packet.
:rtype: bytes
"""
ip_header = b"\x45" # Version | Header Length
ip_header += b"\x00" # "Differentiated Services Field"
ip_header += struct.pack(">H", IP_HEADER_LEN + len(payload)) # Length
ip_header += b"\x00\x01" # ID Field
ip_header += b"\x40\x00" # Flags, Fragment Offset
ip_header += b"\x40" # Time to live
ip_header += protocol
ip_header += b"\x00\x00" # Header checksum (fill in zeros in order to compute checksum)
ip_header += src_ip
ip_header += dst_ip
checksum = struct.pack(">H", helpers.ipv4_checksum(ip_header))
ip_header = ip_header[:10] + checksum + ip_header[12:]
return ip_header + payload
def test_udp_broadcast_client(self):
"""
Given: A SocketConnection 'udp' object with udp_broadcast set, and a UDP server.
When: Calling SocketConnection.open(), .send(), .recv(), and .close()
Then: send() returns length of payload.
and: Sent and received data is as expected.
"""
try:
broadcast_addr = six.next(get_local_non_loopback_ipv4_addresses_info())["broadcast"]
except StopIteration:
assert False, TEST_ERR_NO_NON_LOOPBACK_IPV4
data_to_send = helpers.str_to_bytes(
'"Never drink because you need it, for this is rational drinking, and the '
"way to death and hell. But drink because you do not need it, for this is "
'irrational drinking, and the ancient health of the world."'
)
# Given
server = MiniTestServer(proto="udp", host="")
server.bind()
t = threading.Thread(target=server.serve_once)
t.daemon = True
t.start()
# noinspection PyDeprecation
uut = SocketConnection(
host=broadcast_addr,
def css_class(self):
    """Return the ``"css_class"`` entry stored in ``helpers.test_step_info`` for ``self.type``."""
    step_info = helpers.test_step_info[self.type]
    return step_info["css_class"]
def log_fail(self, description="", indent_size=INDENT_SIZE):
    """
    Record a failure and flag it for the display.

    A one-line summary is appended to ``self._fail_storage`` and a formatted
    log message (``format_type="curses"``) is appended to ``self._log_storage``;
    afterwards ``self._event_crash`` and ``self._event_log`` are set to True.

    :param description: Failure text. It is stripped of surrounding whitespace
        for the summary line, but handed to the log formatter unmodified.
    :param indent_size: Indent unit; the index in the summary line is
        right-padded with spaces to ``4 * indent_size + 1`` characters.
    """
    # TODO: Why do some fail messages have a trailing whitespace?
    index_text = str(self._total_index)
    # A non-positive pad count simply yields an empty string.
    padding = " " * (4 * indent_size + 1 - len(index_text))
    summary = "#" + index_text + padding + description.strip()
    # NOTE(review): meaning of the trailing 1 is not visible here -- confirm against the consumer.
    self._fail_storage.append([summary, 1])

    log_entry = helpers.format_log_msg(msg_type="fail", description=description, format_type="curses")
    self._log_storage.append(log_entry)

    self._event_crash = True
    self._event_log = True
if isinstance(self._algorithm, six.string_types):
if self._algorithm == "crc32":
check = struct.pack(self._endian + "L", (zlib.crc32(data) & 0xFFFFFFFF))
elif self._algorithm == "crc32c":
check = struct.pack(self._endian + "L", crc32c.crc32(data))
elif self._algorithm == "adler32":
check = struct.pack(self._endian + "L", (zlib.adler32(data) & 0xFFFFFFFF))
elif self._algorithm == "ipv4":
check = struct.pack(self._endian + "H", helpers.ipv4_checksum(data))
elif self._algorithm == "udp":
return struct.pack(
self._endian + "H", helpers.udp_checksum(msg=data, src_addr=ipv4_src, dst_addr=ipv4_dst)
)
elif self._algorithm == "md5":
digest = hashlib.md5(data).digest()
# TODO: is this right?
if self._endian == ">":
(a, b, c, d) = struct.unpack("LLLL", a, b, c, d)
check = digest
elif self._algorithm == "sha1":
digest = hashlib.sha1(data).digest()
# TODO: is this right?
:rtype: bytes
:return: Checksum.
"""
if isinstance(self._algorithm, six.string_types):
if self._algorithm == "crc32":
check = struct.pack(self._endian + "L", (zlib.crc32(data) & 0xFFFFFFFF))
elif self._algorithm == "crc32c":
check = struct.pack(self._endian + "L", crc32c.crc32(data))
elif self._algorithm == "adler32":
check = struct.pack(self._endian + "L", (zlib.adler32(data) & 0xFFFFFFFF))
elif self._algorithm == "ipv4":
check = struct.pack(self._endian + "H", helpers.ipv4_checksum(data))
elif self._algorithm == "udp":
return struct.pack(
self._endian + "H", helpers.udp_checksum(msg=data, src_addr=ipv4_src, dst_addr=ipv4_dst)
)
elif self._algorithm == "md5":
digest = hashlib.md5(data).digest()
# TODO: is this right?
if self._endian == ">":
(a, b, c, d) = struct.unpack("LLLL", a, b, c, d)
check = digest
def render(self):
    """
    Render the stored value and remember the result.

    The output of ``self._render(self._value)`` is saved on ``self._rendered``
    before being passed through ``helpers.str_to_bytes``.

    Returns:
        bytes: Rendered value
    """
    self._rendered = rendered = self._render(self._value)
    return helpers.str_to_bytes(rendered)
def _render(self, value):
    """
    Format ``value`` with ``self.render_int`` and convert the result via ``helpers.str_to_bytes``.

    The instance's ``format``, ``width``, ``endian`` and ``signed`` attributes
    are forwarded as ``output_format``, ``bit_width``, ``endian`` and ``signed``.
    """
    to_int_repr = self.render_int
    encoded = to_int_repr(
        value,
        output_format=self.format,
        bit_width=self.width,
        endian=self.endian,
        signed=self.signed,
    )
    return helpers.str_to_bytes(encoded)