Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class DbTxAttenuation(dpkt.Packet):
__hdr__ = (
('db', 'H', 0),
)
class DbAntennaNoise(dpkt.Packet):
__hdr__ = (
('db', 'B', 0),
)
class DbAntennaSignal(dpkt.Packet):
__hdr__ = (
('db', 'B', 0),
)
class DbmTxPower(dpkt.Packet):
__hdr__ = (
('dbm', 'B', 0),
)
if __name__ == '__main__':
import unittest
class RadiotapTestCase(unittest.TestCase):
def test_Radiotap(self):
s = '\x00\x00\x00\x18\x6e\x48\x00\x00\x00\x02\x6c\x09\xa0\x00\xa8\x81\x02\x00\x00\x00\x00\x00\x00\x00'
rad = Radiotap(s)
self.failUnless(rad.version == 0)
self.failUnless(rad.present_flags == 0x6e480000)
self.failUnless(rad.tsft_present == 0)
self.failUnless(rad.flags_present == 1)
self.failUnless(rad.rate_present == 1)
seq=99, data='echo request by xxxx'))
i = dpkt.ip.IP(data = echo)
i.p = dpkt.ip.IP_PROTO_ICMP
i.src = socket.inet_aton(src)
i.dst = socket.inet_aton(dst)
i.len = len(i)
return i
ETH_P_IP = 0x0800
ETH_P_IPV6 = 0x86DD
MINIVTUN_MSG_KEEPALIVE = 0
MINIVTUN_MSG_IPDATA = 1
MINIVTUN_MSG_DISCONNECT = 2
class Msg(dpkt.Packet):
__hdr__ = (
('opcode', 'B', MINIVTUN_MSG_IPDATA),
('rsv', '3s', '\x00' * 3),
('passwd_md5sum', '16s', '\x00' * 16)
)
class IPData(dpkt.Packet):
__hdr__ = (
('proto', 'H', ETH_P_IP),
('ip_dlen', 'H', 0)
)
class KeepAlive(dpkt.Packet):
__hdr__ = (
('loc_tun_in', '4s', '\x00' * 4),
ICMP_PHOTURIS_UNKNOWN_INDEX = 0 # unknown sec index
ICMP_PHOTURIS_AUTH_FAILED = 1 # auth failed
ICMP_PHOTURIS_DECOMPRESS_FAILED = 2 # decompress failed
ICMP_PHOTURIS_DECRYPT_FAILED = 3 # decrypt failed
ICMP_PHOTURIS_NEED_AUTHN = 4 # no authentication
ICMP_PHOTURIS_NEED_AUTHZ = 5 # no authorization
ICMP_TYPE_MAX = 40
class ICMP(Packet):
"""Internet Control Message Protocol."""
__hdr__ = (
('type', 'B', 8),
('code', 'B', 0),
('sum', 'H', 0)
)
class Echo(Packet):
__hdr__ = (('id', 'H', 0), ('seq', 'H', 0))
class Quote(Packet):
__hdr__ = (('pad', 'I', 0),)
def unpack(self, buf):
Packet.unpack(self, buf)
self.data = self.ip = ip.IP(self.data)
class Unreach(Quote):
__hdr__ = (('pad', 'H', 0), ('mtu', 'H', 0))
class Quench(Quote):
pass
class Redirect(Quote):
__hdr__ = (('gw', 'I', 0),)
class ParamProbe(Quote):
__hdr__ = (('ptr', 'B', 0), ('pad1', 'B', 0), ('pad2', 'H', 0))
class TimeExceed(Quote):
pass
# $Id: netflow.py,v 1.1.1.1 2005/10/29 18:20:48 provos Exp $
from dpkt import Packet
from struct import unpack as _st_unpack
from itertools import izip as _it_izip
class NetflowBase(Packet):
"""Base class for Cisco Netflow packets."""
__hdr__ = (
('version', 'H', 1),
('count', 'H', 0),
('sys_uptime', 'I', 0),
('unix_sec', 'I', 0),
('unix_nsec', 'I', 0)
)
def __len__(self):
return self.__hdr_len__ + (len(self.data[0]) * self.count)
def __str__(self):
# for now, don't try to enforce any size limits
self.count = len(self.data)
# -*- coding: utf-8 -*-
import struct
import dpkt
from . import stp
from . import ethernet
class LLC(dpkt.Packet):
_typesw = {}
def _unpack_data(self, buf):
if self.type == ethernet.ETH_TYPE_8021Q:
self.tag, self.type = struct.unpack('>HH', buf[:4])
buf = buf[4:]
elif self.type == ethernet.ETH_TYPE_MPLS or self.type == ethernet.ETH_TYPE_MPLS_MCAST:
# XXX - skip labels
for i in range(24):
if struct.unpack('>I', buf[i:i + 4])[0] & 0x0100: # MPLS_STACK_BOTTOM
break
self.type = ethernet.ETH_TYPE_IP
buf = buf[(i + 1) * 4:]
try:
self.data = self._typesw[self.type](buf)
setattr(self, self.data.__class__.__name__.lower(), self.data)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
if self.type > 1500:
# Ethernet II
self._unpack_data(self.data)
elif self.dst.startswith('\x01\x00\x0c\x00\x00') or \
self.dst.startswith('\x03\x00\x0c\x00\x00'):
# Cisco ISL
self.vlan = struct.unpack('>H', self.data[6:8])[0]
self.unpack(self.data[12:])
elif self.data.startswith('\xff\xff'):
# Novell "raw" 802.3
self.type = ETH_TYPE_IPX
self.data = self.ipx = self._typesw[ETH_TYPE_IPX](self.data[2:])
else:
# 802.2 LLC
self.dsap, self.ssap, self.ctl = struct.unpack('BBB', self.data[:3])
if self.data.startswith('\xaa\xaa'):
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
# Wait, might there be more than one message of self.type?
embedded_type = HANDSHAKE_TYPES.get(self.type, None)
if embedded_type is None:
raise SSL3Exception('Unknown or invalid handshake type %d' %
self.type)
# only take the right number of bytes
self.data = self.data[:self.length]
if len(self.data) != self.length:
raise dpkt.NeedData
# get class out of embedded_type tuple
self.data = embedded_type[1](self.data)
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
#print "\nv_fl=%x, win_len=%d, validation=%d, seq_num=%d" % (self.v_fl, self.win_len, self.validation, self.seq_num)
self.len = len(buf)
self.chunks = []
self.version = (self.v_fl >> 12) & 0xF
#print "version=%d" % self.version
self.flags = [ MessagePacket._flags[x] for x in MessagePacket._flags.keys() if (self.v_fl & 0xFFF) & x ]
#print "flags=%s" % ', '.join(self.flags)
self.payload_offset = MessagePacket.HEADER_LENGTH
if self.v_fl & 0x010: # if we have delivery prereq
self.dp = DeliveryPrerequisites(buf, MessagePacket.HEADER_LENGTH)
self.payload_offset = self.payload_offset + self.dp.getLength()
#print "payload_offset=%s" % self.payload_offset
self.unpack_chunks(buf)
"""
As far as TLSRecord is concerned, AppData is just an opaque blob.
"""
pass
class TLSAlert(dpkt.Packet):
__hdr__ = (
('level', 'B', 1),
('description', 'B', 0),
)
class TLSHelloRequest(dpkt.Packet):
__hdr__ = tuple()
class TLSClientHello(dpkt.Packet):
__hdr__ = (
('version', 'H', 0x0301),
('random', '32s', '\x00' * 32),
) # the rest is variable-length and has to be done manually
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
# now session, cipher suites, extensions are in self.data
self.session_id, pointer = parse_variable_array(self.data, 1)
# print 'pointer',pointer
# handle ciphersuites
ciphersuites, parsed = parse_variable_array(self.data[pointer:], 2)
PCAP_VERSION_MINOR = 4
class PcapFileHdr(dpkt.Packet):
"""pcap file header."""
__hdr__ = (
('magic', 'I', TCPDUMP_MAGIC),
('v_major', 'H', PCAP_VERSION_MAJOR),
('v_minor', 'H', PCAP_VERSION_MINOR),
('thiszone', 'I', 0),
('sigfigs', 'I', 0),
('snaplen', 'I', 1500),
('linktype', 'I', 1),
)
__byte_order__ = '@'
class PcapPktHdr(dpkt.Packet):
"""pcap packet header."""
__hdr__ = (
('tv_sec', 'I', 0),
('tv_usec', 'I', 0),
('caplen', 'I', 0),
('len', 'I', 0),
)
__byte_order__ = '@'
class PcapDumper(object):
def __init__(self, filename, snaplen=1500, linktype=1):
self.f = open(filename, 'w')
fh = PcapFileHdr(snaplen=snaplen, linktype=linktype)
self.f.write(str(fh))
def append(self, pkt, ts=None):