Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class DataInterDS(dpkt.Packet):
    """Header made of four 6-byte address fields and a 16-bit ``frag_seq``.

    Field order on the wire: ``dst``, ``src``, ``da``, ``frag_seq``, ``sa``.
    NOTE(review): presumably the 802.11 four-address (inter-DS) data
    header -- confirm against the enclosing module.
    """

    # Defaults for the '6s' fields are bytes literals: struct's 's' format
    # only accepts bytes on Python 3, so a str default would make packing a
    # default-constructed header fail. On Python 2, b'...' is plain str, so
    # nothing changes there.
    __hdr__ = (
        ('dst', '6s', b'\x00' * 6),
        ('src', '6s', b'\x00' * 6),
        ('da', '6s', b'\x00' * 6),
        ('frag_seq', 'H', 0),
        ('sa', '6s', b'\x00' * 6),
    )
class QoS_Data(dpkt.Packet):
    """Header consisting of a single 16-bit ``control`` field (default 0)."""

    __hdr__ = (('control', 'H', 0),)
class IE(dpkt.Packet):
    """Element with a one-byte ``id``, a one-byte ``len`` and an ``info`` body."""

    __hdr__ = (('id', 'B', 0), ('len', 'B', 0))

    def unpack(self, buf):
        """Parse the header from *buf* and keep the following bytes as ``info``.

        ``info`` is the slice of *buf* that starts after the two header
        bytes and spans ``len`` bytes; slicing means it is simply shorter
        when *buf* does not hold that many bytes.
        """
        dpkt.Packet.unpack(self, buf)
        info_start = 2  # bytes taken by the id and len fields
        info_end = self.len + info_start
        self.info = buf[info_start:info_end]
# FH: header declaring one-byte `id` and `len`, a 16-bit `tu`, and the
# one-byte `hopset`, `hoppattern` and `hopindex` fields (all default 0).
# NOTE(review): the __hdr__ tuple below is never closed in this view -- the
# rest of the class appears to be missing; restore it before relying on FH.
class FH(dpkt.Packet):
__hdr__ = (
('id', 'B', 0),
('len', 'B', 0),
('tu', 'H', 0),
('hopset', 'B', 0),
('hoppattern', 'B', 0),
('hopindex', 'B', 0)
# Channel type
#
# Shift amounts for the channel-type flags. Trailing comments show the
# single-bit mask each shift corresponds to (1 << shift).
# NOTE(review): presumably bit positions within the radiotap channel flags
# word -- confirm against the code that consumes them.
_CHAN_TYPE_SIZE = 4
_CHANNEL_TYPE_SHIFT = 4  # 0x0010
_CCK_SHIFT = 5  # 0x0020
_OFDM_SHIFT = 6  # 0x0040
_TWO_GHZ_SHIFT = 7  # 0x0080
_FIVE_GHZ_SHIFT = 8  # 0x0100
_PASSIVE_SHIFT = 9  # 0x0200
_DYN_CCK_OFDM_SHIFT = 10  # 0x0400
_GFSK_SHIFT = 11  # 0x0800
_GSM_SHIFT = 12  # 0x1000
_STATIC_TURBO_SHIFT = 13  # 0x2000
_HALF_RATE_SHIFT = 14  # 0x4000
_QUARTER_RATE_SHIFT = 15  # 0x8000
class Radiotap(dpkt.Packet):
    """Radiotap header: ``version``, ``pad``, ``length`` and ``present_flags``.

    The ``_get_*_present`` / ``_set_*_present`` helpers read and write
    individual flags inside ``present_flags`` using the module-level
    ``_<NAME>_MASK`` / ``_<NAME>_SHIFT`` constants.

    Setters first clear the flag's bits and then store ``val`` shifted into
    place and limited to the mask. Assigning 0 therefore really clears the
    flag, and an out-of-range ``val`` cannot leak into neighbouring flags.
    """

    __hdr__ = (
        ('version', 'B', 0),
        ('pad', 'B', 0),
        ('length', 'H', 0),
        ('present_flags', 'I', 0),
    )

    def _get_tsft_present(self):
        """Return the TSFT flag extracted from ``present_flags``."""
        return (self.present_flags & _TSFT_MASK) >> _TSFT_SHIFT

    def _set_tsft_present(self, val):
        """Store ``val`` as the TSFT flag, replacing its previous value."""
        cleared = self.present_flags & ~_TSFT_MASK
        self.present_flags = cleared | ((val << _TSFT_SHIFT) & _TSFT_MASK)

    def _get_flags_present(self):
        """Return the FLAGS flag extracted from ``present_flags``."""
        return (self.present_flags & _FLAGS_MASK) >> _FLAGS_SHIFT

    def _set_flags_present(self, val):
        """Store ``val`` as the FLAGS flag, replacing its previous value."""
        cleared = self.present_flags & ~_FLAGS_MASK
        self.present_flags = cleared | ((val << _FLAGS_SHIFT) & _FLAGS_MASK)

    def _get_rate_present(self):
        """Return the RATE flag extracted from ``present_flags``."""
        return (self.present_flags & _RATE_MASK) >> _RATE_SHIFT

    def _set_rate_present(self, val):
        """Store ``val`` as the RATE flag, replacing its previous value."""
        cleared = self.present_flags & ~_RATE_MASK
        self.present_flags = cleared | ((val << _RATE_SHIFT) & _RATE_MASK)

    def _get_channel_present(self):
        """Return the CHANNEL flag extracted from ``present_flags``."""
        return (self.present_flags & _CHANNEL_MASK) >> _CHANNEL_SHIFT

    def _set_channel_present(self, val):
        """Store ``val`` as the CHANNEL flag, replacing its previous value."""
        cleared = self.present_flags & ~_CHANNEL_MASK
        self.present_flags = cleared | ((val << _CHANNEL_SHIFT) & _CHANNEL_MASK)

    def _get_fhss_present(self):
        """Return the FHSS flag extracted from ``present_flags``."""
        return (self.present_flags & _FHSS_MASK) >> _FHSS_SHIFT
def pack_hdr(self):
    """Return the packed header bytes.

    When ``self.p`` is larger than 0xff the header is just ``p`` packed as
    a big-endian unsigned 16-bit value; otherwise packing is delegated to
    ``dpkt.Packet.pack_hdr``.

    Raises:
        dpkt.PackError: if ``struct`` rejects the values being packed.
    """
    try:
        proto = self.p
        if proto <= 0xff:
            return dpkt.Packet.pack_hdr(self)
        return struct.pack('>H', proto)
    except struct.error as err:
        raise dpkt.PackError(str(err))
# Shift amounts for channel-type flags; trailing comments show the
# single-bit mask each shift corresponds to (1 << shift).
_TWO_GHZ_SHIFT = 7  # 0x0080
_FIVE_GHZ_SHIFT = 8  # 0x0100
_PASSIVE_SHIFT = 9  # 0x0200
_DYN_CCK_OFDM_SHIFT = 10  # 0x0400
_GFSK_SHIFT = 11  # 0x0800
_GSM_SHIFT = 12  # 0x1000
_STATIC_TURBO_SHIFT = 13  # 0x2000
_HALF_RATE_SHIFT = 14  # 0x4000
_QUARTER_RATE_SHIFT = 15  # 0x8000

# Flags offsets and masks
_FCS_SHIFT = 4
_FCS_MASK = 1 << _FCS_SHIFT  # 0x10
class Radiotap(dpkt.Packet):
    """Radiotap capture header.

    Attributes:
        __hdr__: Fixed header layout as (name, struct code, default)
            triples, in order: ``version`` ('B'), ``pad`` ('B'),
            ``length`` ('H') and ``present_flags`` ('I'); every field
            defaults to 0.
    """

    __hdr__ = (
        ('version', 'B', 0),
        ('pad', 'B', 0),
        ('length', 'H', 0),
        ('present_flags', 'I', 0),
    )
def __str__(self):
    """Serialise the packet, filling in a missing payload checksum first.

    When ``p`` is 6, 17 or 58 and the payload's ``sum`` is falsy, a checksum
    is computed over a pseudo-header (``src``, ``dst``, ``p`` and the
    payload length) followed by the payload, and stored in
    ``self.data.sum``. An ``AttributeError`` raised while computing or
    storing it is ignored on purpose. The result is the packed header, the
    extension headers and the payload concatenated.
    """
    # fix https://code.google.com/p/dpkt/issues/detail?id=59
    needs_cksum = self.p in (6, 17, 58) and not self.data.sum
    if needs_cksum:
        # XXX - set TCP, UDP, and ICMPv6 checksums
        payload = str(self.data)
        pseudo_hdr = dpkt.struct.pack(
            '>16s16sxBH', self.src, self.dst, self.p, len(payload))
        acc = dpkt.in_cksum_add(0, pseudo_hdr)
        acc = dpkt.in_cksum_add(acc, payload)
        try:
            self.data.sum = dpkt.in_cksum_done(acc)
        except AttributeError:
            pass
    return self.pack_hdr() + self.headers_str() + str(self.data)
# NOTE(review): fragment -- the enclosing function header and the initial
# values of new_type, is_isl and tags_buf are not visible in this view, the
# `elif isinstance(self.data, dpkt.Packet)` below has no visible matching
# `if`, and the trailing `else:` has no body here. Code left untouched.
#
# Pick the outgoing type value and per-tag types from the one or two entries
# in self.vlan_tags; more than two tags is rejected with dpkt.PackError.
t1 = self.vlan_tags[0]
if len(self.vlan_tags) == 1:
if isinstance(t1, VLANtag8021Q):
if new_type not in _ETH_TYPES_QINQ: # preserve the type if already set
new_type = ETH_TYPE_8021Q
elif isinstance(t1, VLANtagISL):
t1.type = 0 # 0 means Ethernet
is_isl = True
elif len(self.vlan_tags) == 2:
t2 = self.vlan_tags[1]
if isinstance(t1, VLANtag8021Q) and isinstance(t2, VLANtag8021Q):
t1.type = ETH_TYPE_8021Q
if new_type not in _ETH_TYPES_QINQ:
new_type = ETH_TYPE_8021AD
else:
raise dpkt.PackError('maximum is 2 VLAN tags per Ethernet frame')
# concatenate the packed header of every tag, in list order
tags_buf = b''.join(tag.pack_hdr() for tag in self.vlan_tags)
# initial type is based on next layer, pointed by self.data;
# try to find an ETH_TYPE matching the data class
elif isinstance(self.data, dpkt.Packet):
new_type = self._typesw_rev.get(self.data.__class__, new_type)
# if self.data is LLC then this is IEEE 802.3 Ethernet and self.type
# then actually encodes the length of data
if isinstance(self.data, llc.LLC):
new_type = len(self.data)
# swap the last two bytes of the base header for new_type, packed as a
# big-endian unsigned 16-bit value
hdr_buf = dpkt.Packet.pack_hdr(self)[:-2] + struct.pack('>H', new_type)
if not is_isl:
return hdr_buf + tags_buf
else:
class FLAP(dpkt.Packet):
    """FLAP frame header: marker byte ``ast``, ``type``, ``seq`` and ``len``."""

    __hdr__ = (
        ('ast', 'B', 0x2a),  # '*'
        ('type', 'B', 0),
        ('seq', 'H', 0),
        ('len', 'H', 0),
    )

    def unpack(self, buf):
        """Parse *buf* and validate the marker byte and the data length.

        Raises:
            dpkt.UnpackError: if ``ast`` is not 0x2a.
            dpkt.NeedData: if ``self.data`` holds fewer than ``len`` bytes.
        """
        dpkt.Packet.unpack(self, buf)
        if self.ast != 0x2a:
            raise dpkt.UnpackError('invalid FLAP header')
        available = len(self.data)
        if available < self.len:
            raise dpkt.NeedData('%d left, %d needed' % (available, self.len))
class SNAC(dpkt.Packet):
    """SNAC header: 16-bit ``family``, ``subtype`` and ``flags``, 32-bit ``reqid``."""

    __hdr__ = (
        ('family', 'H', 0),
        ('subtype', 'H', 0),
        ('flags', 'H', 0),
        ('reqid', 'I', 0),
    )
def tlv(buf):
    """Check that *buf* starts with a complete type/length/value record.

    The first four bytes are read as two big-endian unsigned 16-bit
    integers (type, length); the value is the ``length`` bytes after them.

    Raises:
        dpkt.UnpackError: if the 4-byte header cannot be unpacked.
        dpkt.NeedData: if fewer than ``length`` value bytes are present.

    NOTE(review): nothing is returned -- the parsed fields are discarded,
    so the tail of this function may be missing; confirm with the caller.
    """
    header_len = 4
    try:
        tag, length = struct.unpack('>HH', buf[:header_len])
    except struct.error:
        raise dpkt.UnpackError
    value = buf[header_len:header_len + length]
    if length > len(value):
        raise dpkt.NeedData
def __bytes__(self):
    """Serialise the packet, filling in ``sum`` first when it is falsy.

    The checksum is computed with ``dpkt.in_cksum`` over the packet as
    serialised while ``sum`` is still unset, then the packet is serialised
    again so the stored checksum is included.
    """
    serialize = dpkt.Packet.__bytes__
    if self.sum:
        return serialize(self)
    self.sum = dpkt.in_cksum(serialize(self))
    return serialize(self)
# Photuris failure codes.
ICMP_PHOTURIS_AUTH_FAILED = 1  # authentication failed
ICMP_PHOTURIS_DECOMPRESS_FAILED = 2  # decompression failed
ICMP_PHOTURIS_DECRYPT_FAILED = 3  # decryption failed
ICMP_PHOTURIS_NEED_AUTHN = 4  # no authentication
ICMP_PHOTURIS_NEED_AUTHZ = 5  # no authorization

ICMP_TYPE_MAX = 40
class ICMP(dpkt.Packet):
__hdr__ = (
('type', 'B', 8),
('code', 'B', 0),
('sum', 'H', 0)
)
class Echo(dpkt.Packet):
    """Message body with 16-bit ``id`` and ``seq`` fields (both default 0)."""

    __hdr__ = (
        ('id', 'H', 0),
        ('seq', 'H', 0),
    )
class Quote(dpkt.Packet):
    """Message body that embeds an IP packet after a 32-bit ``pad`` field."""

    __hdr__ = (
        ('pad', 'I', 0),
    )

    def unpack(self, buf):
        """Parse the header, then decode the remaining data as ``ip.IP``.

        The decoded packet is stored as both ``self.data`` and ``self.ip``.
        """
        dpkt.Packet.unpack(self, buf)
        quoted = ip.IP(self.data)
        self.data = quoted
        self.ip = quoted
class Unreach(Quote):
    """Quote variant whose header is a 16-bit ``pad`` and a 16-bit ``mtu``."""

    __hdr__ = (
        ('pad', 'H', 0),
        ('mtu', 'H', 0),
    )
class Quench(Quote):
    """Quote variant that adds nothing to the inherited layout or parsing."""
class Redirect(Quote):
    """Quote variant whose header is a single 32-bit ``gw`` field."""

    __hdr__ = (
        ('gw', 'I', 0),
    )
class ParamProbe(Quote):
    """Quote variant with a one-byte ``ptr`` followed by ``pad1`` and ``pad2``."""

    __hdr__ = (
        ('ptr', 'B', 0),
        ('pad1', 'B', 0),
        ('pad2', 'H', 0),
    )
class TimeExceed(Quote):
    """Quote variant that adds nothing to the inherited layout or parsing."""
_typesw = { 0:Echo, 3:Unreach, 4:Quench, 5:Redirect, 8:Echo,