Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_constuctor():
    """Check that constructed IP packets serialize as expected and round-trip.

    One packet is built with the default length field and one with an
    explicit ``len=0``; both must serialize to the same reference bytes and
    must survive being re-parsed from their own serialization.
    """
    payload = b"Hello world!"
    expected = b'E\x00\x00 \x00\x00\x00\x00@\x00z\xdf\x00\x00\x00\x00\x00\x00\x00\x00Hello world!'

    default_len = IP(data=payload)
    zero_len = IP(data=payload, len=0)

    # Re-parse each packet from its serialized form.
    default_len_copy = IP(bytes(default_len))
    zero_len_copy = IP(bytes(zero_len))

    assert bytes(default_len) == bytes(default_len_copy)
    assert bytes(default_len) == expected
    assert bytes(zero_len) == bytes(zero_len_copy)
    assert bytes(zero_len) == expected
def Setup(self):
    """Configure the "Connection to open port" test case.

    Sets ``self.name`` and ``self.expect`` and queues two TCP-in-IP packets
    on ``self.packets``: a SYN followed by an ACK, both sent from
    192.0.2.254 port 555 to 192.18.0.10 port 80.
    """
    self.name = "Connection to open port"
    self.expect = "gen.output.2"

    src = dnet.ip_aton("192.0.2.254")
    dst = dnet.ip_aton("192.18.0.10")

    # Packet 1: SYN
    payload = TCP(sport=555, dport=80, seq=10000, flags=dnet.TH_SYN)
    ip = IP(src=src,
            dst=dst,
            id=5624,
            p=dnet.IP_PROTO_TCP)
    ip.data = payload
    # Grow the length field by the size of the attached payload.
    ip.len += len(ip.data)
    self.packets.append(ip)

    # Packet 2: ACK
    payload = TCP(sport=555, dport=80,
                  seq=10001, ack=194595108,
                  flags=dnet.TH_ACK)
    ip = IP(src=src,
            dst=dst,
            id=5625, p=dnet.IP_PROTO_TCP)
    ip.data = payload
    ip.len += len(ip.data)
    # Previously this packet was built but never queued, so it was silently
    # dropped; queue it the same way as packet 1.
    self.packets.append(ip)
def post_process(self):
    """
    Turn the recorded timestamps and read buffer into result attributes.

    Sets ``is_same_data``, ``time_ping``, ``id``, ``seq`` and ``icmp`` on
    the instance. When nothing was read, ``is_same_data`` is False and the
    other four are None.
    """
    # Difference of the two timestamps scaled by 1000
    # (presumably seconds -> milliseconds; confirm against the caller).
    elapsed = (self.time_close - self.time_connect) * 1000.

    if len(self.buffer_read) == 0:
        # Nothing to decode: publish "no result" values.
        self.is_same_data = False
        self.time_ping = self.id = self.seq = self.icmp = None
        return

    # Decode the buffer as an IP packet and pull the ICMP echo fields.
    reply = dpkt.ip.IP(self.buffer_read)
    self.is_same_data = (self.data_echo_send == reply.icmp.echo.data)
    self.time_ping = elapsed
    echo = reply.icmp.echo
    self.id = echo.id
    self.seq = echo.seq
    self.icmp = reply.icmp
def test_opt():
    """Check that a 60-byte reference packet re-serializes unchanged.

    The reference bytes are parsed, the checksum field is reset to 0
    (presumably so it is recomputed on serialization -- confirm against the
    IP class), and the packed result must equal the input.
    """
    raw = b'\x4f\x00\x00\x3c\xae\x08\x00\x00\x40\x06\x18\x10\xc0\xa8\x0a\x26\xc0\xa8\x0a\x01\x07\x27\x08\x01\x02\x03\x04\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    parsed = IP(raw)
    parsed.sum = 0
    assert bytes(parsed) == raw
def gather_statistics(cap, serverAddr):
    """Walk a capture and filter down to port-443 TCP payloads from the server.

    @param cap: iterable of ``(timestamp, raw_frame)`` pairs; each frame is
                decoded as Ethernet
    @param serverAddr: host name or address of the server; only packets
                       whose IP source equals its resolved address are kept

    Frames are skipped unless they carry IP, then TCP, use port 443 on
    either side, originate from the server and have a non-empty TCP payload.

    NOTE(review): only the filtering part of this function is visible here;
    the accumulators below are initialised for processing that follows it.
    """
    counters = defaultdict(int)
    known_extensions = set()
    count_extensions = set()
    pkt_count = 0

    # Resolved lazily, and only once: the lookup is loop-invariant, may block
    # on DNS, and re-resolving per packet could yield different answers in
    # the middle of a capture. An empty capture still triggers no lookup.
    server_ip = None

    for ts, buf in cap:
        pkt_count += 1
        eth = dpkt.ethernet.Ethernet(buf)
        if not isinstance(eth.data, dpkt.ip.IP):
            continue
        ip = eth.data
        if not isinstance(ip.data, dpkt.tcp.TCP):
            continue
        # TODO: consider doing TCP streams, so multi-packet things can be parsed right... "meh"
        tcp = ip.data
        if tcp.dport != 443 and tcp.sport != 443:
            continue
        if server_ip is None:
            server_ip = socket.gethostbyname(serverAddr)
        if server_ip != socket.inet_ntoa(ip.src):
            continue
        if len(tcp.data) <= 0:
            continue
        # we only care about handshakes for now...
while True:
rc, bytes_recvd = win32file.WSARecv(sock.fileno(), buf, self.overlapped_rx)
assert rc == 0 or rc == win32file.WSA_IO_PENDING
bytes_recvd = yield
p = buf[:bytes_recvd]
p = netmsg_to_local(p)
p = unpack_header(p)
if p:
if verbose:
logger.debug('tunnel send: ')
if (ord(p[0])&0xf0) == 0x40:
logger.debug(pprint.pformat(IP(p)))
elif (ord(p[0])&0xf0)==0x60:
logger.debug(pprint.pformat(IP6(p)))
else:
logger.warning('Unknown layer 3 protocol')
win32file.WriteFile(handle, p, self.overlapped_tx)
yield
def iplayer_from_raw(raw, linktype=1):
    """Converts a raw packet to a dpkt packet regardless of link type.
    @param raw: raw packet
    @param linktype: integer describing link type as expected by dpkt
    @return: payload of the decoded Ethernet frame for linktype 1, or the
             packet decoded directly as IP for linktype 101
    @raise CuckooProcessingError: for any other linktype
    """
    # Reject unsupported link types before touching the packet data.
    if linktype not in (1, 101):
        raise CuckooProcessingError("unknown PCAP linktype")

    if linktype == 101:  # raw
        return dpkt.ip.IP(raw)

    # ethernet
    return dpkt.ethernet.Ethernet(raw).data
def parse_pcap(filename):
streams = dict() # Connections with current buffer
with open(filename, "rb") as f:
pcap = dpkt.pcap.Reader(f)
for ts, buf in pcap:
eth = dpkt.ethernet.Ethernet(buf)
if eth.type != dpkt.ethernet.ETH_TYPE_IP:
continue
ip = eth.data
if not isinstance(ip, dpkt.ip.IP):
try:
ip = dpkt.ip.IP(ip)
except:
continue
if ip.p != dpkt.ip.IP_PROTO_TCP:
continue
tcp = ip.data
if not isinstance(tcp, dpkt.tcp.TCP):
try:
tcp = dpkt.tcp.TCP(tcp)
except:
continue
tupl = (ip.src, ip.dst, tcp.sport, tcp.dport)
if tupl in streams:
def add_fix_checksum_inst(self, linked_ipv4_obj, offset_to_obj=14, name=None):
    """Register a checksum instruction tied to an IPv4 layer object.

    @param linked_ipv4_obj: layer object; must be a ``dpkt.ip.IP`` instance
    @param offset_to_obj: offset passed to the checksum instruction
                          (default 14)
    @param name: instruction name; defaults to ``checksum_<offset_to_obj>``
    @raise ValueError: if ``linked_ipv4_obj`` is not a ``dpkt.ip.IP``
    """
    # Only IPv4 layer objects are accepted.
    if not isinstance(linked_ipv4_obj, dpkt.ip.IP):
        raise ValueError("The provided layer object is not of IPv4.")

    # A default name is derived from the offset, so a later call with the
    # same offset overrides the previous checksum instruction (intended).
    inst_name = name or "checksum_{off}".format(off=offset_to_obj)

    checksum_inst = self.CTRexVMChecksumInst(inst_name, offset_to_obj)
    stored = self.InstStore('checksum', checksum_inst)

    # Keyed at the end of the IP header (20 bytes long).
    self._inst_by_offset[offset_to_obj + 20] = stored
    self._off_inst_by_name[inst_name] = stored
def liveFlowCaptureDpkt(targetDevice,flows,packetLimit=5):
#start the capture
sniffer = pcap.pcap(name=targetDevice, promisc=True, immediate=True, timeout_ms=50)
count=0
for ts,raw_pkt in sniffer:
count = count+1
#convert raw bytes to an ethernet object
eth = dpkt.ethernet.Ethernet(raw_pkt)
ip = eth.data
#create the keys for IP UDP/TCP flows
if isinstance(ip, dpkt.ip.IP):
if isinstance(ip.data, dpkt.tcp.TCP):
key = ('TCP', frozenset( ((ip_to_string(ip.src), ip.data.sport),(ip_to_string(ip.dst), ip.data.dport)) ))
elif isinstance(ip.data, dpkt.udp.UDP):
#hard-coded packet limit for UDP to allow for DNS
packetLimit = 2
key = ('UDP', frozenset( ((ip_to_string(ip.src), ip.data.sport),(ip_to_string(ip.dst), ip.data.dport)) ))
else:
continue
#create an entry in the flow dict if one is absent. init with the unprocessed mark
if key not in flows:
#print("New flow detected:\n",key)
flows[key]=[False]
#print(flows[key])
if (len(flows[key]) < packetLimit+1):
#append packets to the flow entry