Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _legacy_sign_md5_sha1(self, M):
M = bytes_encode(M)
k = self._modulusLen // 8
EM = _legacy_pkcs1_v1_5_encode_md5_sha1(M, k)
if EM is None:
warning("Key._rsassa_pkcs1_v1_5_sign(): unable to encode")
return None
m = pkcs_os2ip(EM)
n = self._modulus
if isinstance(m, int) and six.PY2:
m = long(m) # noqa: F821
if (six.PY2 and not isinstance(m, long)) or m > n - 1: # noqa: F821
warning("Key._rsaep() expects a long between 0 and n-1")
return None
privExp = self.key.private_numbers().d
s = pow(m, privExp, n)
return pkcs_i2osp(s, k)
def create_packet(self, src_if, dst_if, do_dot1=True):
packet_sizes = [64, 512, 1518, 9018]
dst_host = random.choice(self.hosts_by_pg_idx[dst_if.sw_if_index])
src_host = random.choice(self.hosts_by_pg_idx[src_if.sw_if_index])
pkt_info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(pkt_info)
p = (Ether(dst=dst_host.mac, src=src_host.mac) /
IP(src=src_host.ip4, dst=dst_host.ip4) /
UDP(sport=1234, dport=1234) /
Raw(payload))
pkt_info.data = p.copy()
if do_dot1 and hasattr(src_if, 'sub_if'):
p = src_if.sub_if.add_dot1_layer(p)
size = random.choice(packet_sizes)
self.extend_packet(p, size)
return p
#
# set the directed broadcast on pg0 first, then config IP4 addresses
# for pg1 directed broadcast is always disabled
self.vapi.sw_interface_set_ip_directed_broadcast(
self.pg0.sw_if_index, 1)
p0 = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IP(src="1.1.1.1",
dst=self.pg0._local_ip4_bcast) /
UDP(sport=1234, dport=1234) /
Raw(b'\xa5' * 2000))
p1 = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IP(src="1.1.1.1",
dst=self.pg1._local_ip4_bcast) /
UDP(sport=1234, dport=1234) /
Raw(b'\xa5' * 2000))
self.pg0.config_ip4()
self.pg0.resolve_arp()
self.pg1.config_ip4()
self.pg1.resolve_arp()
#
# test packet is L2 broadcast
#
rx = self.send_and_expect(self.pg1, p0 * NUM_PKTS, self.pg0)
self.assertTrue(rx[0][Ether].dst, "ff:ff:ff:ff:ff:ff")
self.send_and_assert_no_replies(self.pg0, p1 * NUM_PKTS,
def modify_packet(self, src_if, packet_size, pkt):
"""Add load, set destination IP and extend packet to required packet
size for defined interface.
:param VppInterface src_if: Interface to create packet for.
:param int packet_size: Required packet size.
:param Scapy pkt: Packet to be modified.
"""
dst_if_idx = int(packet_size / 10 % 2)
dst_if = self.flows[src_if][dst_if_idx]
info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
p = pkt/Raw(payload)
p[IP].dst = dst_if.remote_ip4
info.data = p.copy()
if isinstance(src_if, VppSubInterface):
p = src_if.add_dot1_layer(p)
self.extend_packet(p, packet_size)
return p
nh_table_id=1)])
route_to_src = VppIpRoute(self, "1.1.1.2", 32,
[VppRoutePath("0.0.0.0",
0xffffffff,
nh_table_id=2,
is_source_lookup=1)])
route_to_dst.add_vpp_config()
route_to_src.add_vpp_config()
#
# packets to these destination are dropped, since they'll
# hit the respective default routes in the second table
#
p_dst = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IP(src="5.5.5.5", dst="1.1.1.1") /
TCP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
p_src = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IP(src="2.2.2.2", dst="1.1.1.2") /
TCP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
pkts_dst = p_dst * 257
pkts_src = p_src * 257
self.send_and_assert_no_replies(self.pg0, pkts_dst,
"IP in dst table")
self.send_and_assert_no_replies(self.pg0, pkts_src,
"IP in src table")
#
def send_frames(payload_len, dump_size, dump_filename):
""" Send frames function
send the frames.
"""
for int_iterator in range(dump_size):
l2_header = Ether(src="00:11:22:33:44:55", dst="55:44:33:22:11")
l3_header = IP(src="192.168.1.1", dst="192.168.1.2", id=int_iterator)
payload = Raw(make_payload(
PAYLOAD_SEED,
payload_len.rand_payload_len(),
len(l3_header)))
pkt = l2_header / l3_header / payload
sendp(pkt, iface=dump_filename, verbose=False)
if int_iterator % 100 == 0:
print "... " + str(int_iterator) + " packets sent..."
ARP(op="who-has",
hwsrc=self.pg0.remote_mac,
pdst=self.pg1.local_ip4,
psrc=self.pg1.remote_ip4))
self.send_and_assert_no_replies(self.pg0, p,
"ARP req diff sub-net")
self.assertFalse(find_nbr(self,
self.pg0.sw_if_index,
self.pg1.remote_ip4))
#
# 2 - don't respond to ARP request from an address not within the
# interface's sub-net
# 2b - to a proxied address
# 2c - not within a different interface's sub-net
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
ARP(op="who-has",
hwsrc=self.pg0.remote_mac,
psrc="10.10.10.3",
pdst=self.pg0.local_ip4))
self.send_and_assert_no_replies(self.pg0, p,
"ARP req for non-local source")
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg2.remote_mac) /
ARP(op="who-has",
hwsrc=self.pg2.remote_mac,
psrc="10.10.10.3",
pdst=self.pg0.local_ip4))
self.send_and_assert_no_replies(
self.pg0, p,
"ARP req for non-local source - unnum")
p = (Ether(dst="ff:ff:ff:ff:ff:ff", src=self.pg0.remote_mac) /
ARP(op="who-has",
def create_tunnel_stream_4o4(self, src_if,
tunnel_src, tunnel_dst,
src_ip, dst_ip):
pkts = []
for i in range(0, 257):
info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=tunnel_src, dst=tunnel_dst) /
GRE() /
IP(src=src_ip, dst=dst_ip) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
pkts.append(p)
return pkts
def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
""" Compare input and output packet after passing End.AD with L2
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
"""
# get IPv4 header of rx'ed packet
rx_eth = rx_pkt.getlayer(Ether)
tx_ip = tx_pkt.getlayer(IPv6)
# we can't just get the 2nd Ether layer
# get the Raw content and dissect it as Ether
tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
# verify if rx'ed packet has no SRH
self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
# the whole rx_eth pkt should be equal to tx_eth1
self.assertEqual(rx_eth, tx_eth1)
self.logger.debug("packet verification: SUCCESS")
# epg is not learnt, becasue the EPG is unknwon
self.assertEqual(len(self.vapi.gbp_endpoint_dump()), 1)
#
# Learn new EPs from IP packets
#
for ii, l in enumerate(learnt):
# a packet with an sclass from a knwon EPG
# arriving on an unknown TEP
p = (Ether(src=self.pg2.remote_mac,
dst=self.pg2.local_mac) /
IP(src=self.pg2.remote_hosts[1].ip4,
dst=self.pg2.local_ip4) /
UDP(sport=1234, dport=48879) /
VXLAN(vni=99, gpid=112, flags=0x88) /
Ether(src=l['mac'], dst=ep.mac) /
IP(src=l['ip'], dst=ep.ip4.address) /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
rx = self.send_and_expect(self.pg2, [p], self.pg0)
# the new TEP
tep1_sw_if_index = find_vxlan_gbp_tunnel(
self,
self.pg2.local_ip4,
self.pg2.remote_hosts[1].ip4,
99)
self.assertNotEqual(INDEX_INVALID, tep1_sw_if_index)
#
# the EP is learnt via the learnt TEP