Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def ipv6_connectivity_test(self, peer):
    """Send one ICMPv6 echo probe towards ``peer`` via the Teredo server.

    The attempt counter stored in ``peer['try_time']`` is incremented for
    each probe that is actually sent.  Once the incremented counter would
    exceed 3, the peer is removed from ``self.untrusted_peer_list`` and no
    probe is sent.  Nothing is sent (and the counter is left untouched)
    while ``self.qualified`` is false.

    :param peer: mapping with at least the keys ``'try_time'``, ``'id'``
        and ``'nonce'``.
    :returns: ``None`` in every case.
    """
    attempt = peer['try_time'] + 1

    if attempt > 3:
        # Retry budget exhausted: stop tracking this peer.
        self.untrusted_peer_list.remove(peer)
        if LOGGER.isEnabledFor(logging.DEBUG):
            LOGGER.debug('ipv6_connectivity_test fail for %s' % hexlify(peer['id']))
        return

    if not self.qualified:
        return

    peer['try_time'] = attempt

    # Start from the canned echo packet and fill in the per-peer fields.
    probe = dpkt.ip6.IP6(blank_echo_packet)
    echo_layer = probe.icmp6
    echo_layer.echo.id = peer['nonce']
    # NOTE(review): checksum is cleared, presumably so that dpkt recomputes
    # it when the packet is serialised -- confirm against dpkt.ip6.
    echo_layer.sum = 0
    probe.src = self.teredo_ip
    probe.dst = peer['id']

    self.teredo_sock.sendto(str(probe), (self.server_ip, teredo_port))
def unpack(self, buf):
    """Unpack ``buf`` and parse the remaining payload as an IPv6 packet.

    The header is decoded by ``dpkt.Packet.unpack``; whatever it leaves in
    ``self.data`` is then wrapped in ``ip6.IP6`` and exposed both as
    ``self.data`` and as ``self.ip6``.
    """
    dpkt.Packet.unpack(self, buf)
    inner = ip6.IP6(self.data)
    self.data = inner
    self.ip6 = inner
def unpack(self, buf):
    """Unpack ``buf`` and decode the payload according to ``self.family``.

    * ``2`` (or ``0x02000000``, i.e. 2 with its four bytes reversed, which
      is rewritten to ``2``) -> payload parsed as ``ip.IP``
    * ``24``, ``28`` or ``30`` -> payload parsed as ``ip6.IP6``
    * any other value above 1500 -> payload parsed as
      ``ethernet.Ethernet``

    For every other family value the payload is left as raw data.
    """
    dpkt.Packet.unpack(self, buf)

    family = self.family
    if family == 0x02000000:
        # Byte-swapped form of 2: normalise before dispatching.
        family = 2
        self.family = family

    if family == 2:
        self.data = ip.IP(self.data)
        return
    if family in (24, 28, 30):
        self.data = ip6.IP6(self.data)
        return
    if family > 1500:
        # NOTE(review): presumably a length/ethertype-like value rather
        # than an address family -- confirm against the format spec.
        self.data = ethernet.Ethernet(self.data)
# NOTE(review): this is a fragment of a generator body; the enclosing
# definition, the call that produced `rc`, and the original indentation are
# outside this excerpt.
# The preceding call must have either finished (0) or be pending.
assert rc == 0 or rc == win32file.WSA_IO_PENDING
# Suspend until the driver of this generator sends in the received byte count.
bytes_recvd = yield
# Keep only the part of the buffer that was actually filled.
p = buf[:bytes_recvd]
p = netmsg_to_local(p)
p = unpack_header(p)
# Continue only when unpack_header produced a truthy payload.
if p:
if verbose:
logger.debug('tunnel send: ')
# The high nibble of the first byte selects the parser: 4 -> IP, 6 -> IP6.
if (ord(p[0])&0xf0) == 0x40:
logger.debug(pprint.pformat(IP(p)))
elif (ord(p[0])&0xf0)==0x60:
logger.debug(pprint.pformat(IP6(p)))
else:
logger.warning('Unknown layer 3 protocol')
# Write the payload to `handle` using the transmit OVERLAPPED structure.
win32file.WriteFile(handle, p, self.overlapped_tx)
# Hand control back to the driver of this generator.
yield
def test_ip6_extension_headers():
    """Check that extension headers can be attached to a parsed IP6 packet.

    A packet is parsed, four more extension headers are stored under keys
    0, 44, 51 and 60, and ``extension_hdrs`` must then hold five entries.
    """
    raw_packet = (
        b'\x60\x00\x00\x00\x00\x3c\x2b\x40\x20\x48\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\xde\xca\x20\x47\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02'
        b'\x00\x00\x00\x00\x20\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x20\x22'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00\x50\x00\x00\x00\x00'
        b'\x00\x00\x00\x00\x50\x02\x20\x00\x91\x7f\x00\x00'
    )
    packet = IP6(raw_packet)

    hop_opts_raw = (
        b'\x3b\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00'
    )
    fragment_raw = b'\x06\xee\xff\xfb\x00\x00\xff\xff'
    auth_raw = b'\x3b\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
    dst_opts_raw = (
        b'\x3b\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00'
        b'\x00\x00\x00\x00\x00\x00\x00\x00'
    )

    # (key in extension_hdrs, header class, raw header bytes), in the order
    # in which the headers are built and attached.
    additions = (
        (0, IP6HopOptsHeader, hop_opts_raw),
        (44, IP6FragmentHeader, fragment_raw),
        (51, IP6AHHeader, auth_raw),
        (60, IP6DstOptsHeader, dst_opts_raw),
    )
    for key, header_cls, header_bytes in additions:
        packet.extension_hdrs[key] = header_cls(header_bytes)

    assert len(packet.extension_hdrs) == 5
def test_ipg():
    """Parse a fixed IPv6 packet and check it serialises back unchanged.

    The version, ``fc`` and ``flow`` fields are verified first; the payload
    checksum is then zeroed and the re-serialised packet must equal the
    original bytes.
    """
    wire = (
        b'\x60\x00\x00\x00\x00\x28\x06\x40\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11\x24\xff\xfe\x8c'
        b'\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80\x72\xcd\xca\x00\x16'
        b'\x04\x84\x46\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\x09\x00\x00\x02\x04\x05\xa0\x01\x03'
        b'\x03\x00\x01\x01\x08\x0a\x7d\x18\x35\x3f\x00\x00\x00\x00'
    )
    packet = IP6(wire)

    # Basic header properties.
    assert packet.v == 6
    assert packet.fc == 0
    assert packet.flow == 0

    # Round trip: clear the payload checksum, then serialise again.
    packet.data.sum = 0
    round_tripped = bytes(packet)
    assert wire == round_tripped
def _packet_get_key(self, pkt):
"""
Returns a populated hashable "struct" (a PacketKey instance) for
the given packet.

The two endpoints are placed in a fixed order (see is_lt below), so the
same key is produced whichever endpoint sent the packet.  Returns None
when the packet contains neither an IP nor an IP6 layer.
"""
# True when endpoint 1 sorts before endpoint 2: addresses are compared
# first, ports break the tie when the addresses are equal.
def is_lt(addr1, addr2, port1=0, port2=0):
return addr1 < addr2 or (addr1 == addr2 and port1 < port2)
# Pick the network-layer addresses; IP is checked before IP6.
if IP in pkt:
saddr = pkt[IP].src
daddr = pkt[IP].dst
elif IP6 in pkt:
saddr = pkt[IP6].src
daddr = pkt[IP6].dst
else:
return None
# TCP: emit the key with the "smaller" endpoint first.
if TCP in pkt:
if is_lt(saddr, daddr, pkt[TCP].sport, pkt[TCP].dport):
return PacketKey(saddr, daddr, dpkt.ip.IP_PROTO_TCP,
pkt[TCP].sport, pkt[TCP].dport)
return PacketKey(daddr, saddr, dpkt.ip.IP_PROTO_TCP,
pkt[TCP].dport, pkt[TCP].sport)
# UDP: same ordering rule as for TCP.
if UDP in pkt:
if is_lt(saddr, daddr, pkt[UDP].sport, pkt[UDP].dport):
return PacketKey(saddr, daddr, dpkt.ip.IP_PROTO_UDP,
pkt[UDP].sport, pkt[UDP].dport)
# NOTE(review): the excerpt appears to end here -- no return is visible for
# the UDP case where is_lt(...) is false, nor for packets that are neither
# TCP nor UDP.  Confirm against the full source.
def parse_nfqueue_ipv6_packet(bytez):
    """Parse raw packet bytes as IPv6.

    :param bytez: raw bytes of the packet, starting at the IPv6 header.
    :returns: ``(hdr, hdr.p)`` -- the parsed ``dpkt.ip6.IP6`` object and its
        ``p`` attribute -- or ``(None, None)`` when the header-length check
        fails.
    """
    # Parse the function argument; the previous code referenced an
    # undefined name (`raw`) and never used `bytez`.
    hdr = dpkt.ip6.IP6(bytez)
    # NOTE(review): `hl` is an IPv4 header field; confirm that dpkt's IP6
    # class actually exposes it, otherwise this check raises AttributeError.
    if hdr.hl < 5:
        return None, None
    return hdr, hdr.p
# NOTE(review): fragment of a per-packet loop body inside a test method; the
# enclosing definition, the original indentation and the final `else:` body
# are outside this excerpt.
pkt_time=0.0; # time from last pkt
# The first packet has no predecessor, so its delta stays 0.0.
if last_time == None:
pkt_time = 0.0;
else:
pkt_time=dtime-last_time;
# Remember this packet's timestamp for the next delta computation.
last_time = dtime
eth = dpkt.ethernet.Ethernet(buf)
# l3 ends up holding the Ethernet payload if it is IPv4 or IPv6, else None.
l3 = None;
next = eth.data;
if isinstance(next, dpkt.ip.IP):
l3 = next;
if isinstance(next, dpkt.ip6.IP6):
l3 = next;
if not l3:
self.fail('Packet #%s in pcap is not IPv4 or IPv6!' % index)
# first packet
# Its source is recorded as c_ip and its destination as s_ip.
if self.c_ip is None:
self.c_ip = l3.src
self.s_ip = l3.dst
direction = "c"
else:
# Later packets: "c" when sent from c_ip to s_ip, "s" for the reverse.
if self.c_ip == l3.src and self.s_ip == l3.dst:
direction = "c"
elif self.s_ip == l3.src and self.c_ip == l3.dst:
direction = "s"
else:
def handle_qualify(self, indicate_pkt, ipv6_pkt):
    """Build the local Teredo address from a server qualification reply.

    :param indicate_pkt: indication data, decoded with
        ``self.unpack_indication`` into the obfuscated port and IP.
    :param ipv6_pkt: raw bytes of the encapsulated IPv6 packet, which must
        be an ICMPv6 message of type 134 (Router Advertisement).
    :returns: ``(teredo_ip, obfuscated_port, obfuscated_ip)`` where
        ``teredo_ip`` is the 16-byte address converted with ``str``.
    :raises Exception: if ``indicate_pkt`` is empty/missing or the IPv6
        packet is not an ICMPv6 type-134 message.

    Side effect: ``self.last_time_with_server`` is set to the current time.
    """
    if not indicate_pkt:
        raise Exception('no indication packet')
    obfuscated_port, obfuscated_ip = self.unpack_indication(indicate_pkt)

    ipv6_pkt = dpkt.ip6.IP6(ipv6_pkt)
    if not hasattr(ipv6_pkt, 'icmp6') or ipv6_pkt.icmp6.type != 134:
        raise Exception('not a Router Advertisement packet')

    # Take 16 bytes at offset 72 of the serialised packet as the address
    # template.  struct.unpack doubles as a length check (exactly 16 bytes).
    # NOTE(review): offset 72 presumably addresses the prefix of the first
    # Prefix Information option in the advertisement -- confirm.
    teredo_ip = bytearray(struct.unpack('!16s', str(ipv6_pkt)[72:72 + 16])[0])

    # Random 16-bit flags value.  The parentheses matter: `1<<16-1` parses
    # as `1 << 15`, which capped the range at 32768 instead of 65535.
    rnd = random.randint(0, (1 << 16) - 1)
    flag = bytearray(struct.pack('!H', rnd))
    # Keep only bits 2-5 of the first flag byte; the other bits are zeroed.
    flag[0] = flag[0] & 0x3c

    teredo_ip[8:10] = flag
    teredo_ip[10:12] = obfuscated_port
    teredo_ip[12:16] = obfuscated_ip
    LOGGER.info('qualify succeed, teredo_ip:%s obfuscated_port:%s obfuscated_ip:%s'
                % (hexlify(teredo_ip), hexlify(obfuscated_port), hexlify(obfuscated_ip)))
    self.last_time_with_server = time.time()
    return str(teredo_ip), obfuscated_port, obfuscated_ip