Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Return dict of HTTP headers parsed from a file object."""
d = OrderedDict()
while 1:
# The following logic covers two kinds of loop exit criteria.
# 1) If the header is valid, when we reached the end of the header,
# f.readline() would return with '\r\n', then after strip(),
# we can break the loop.
# 2) If this is a weird header, which do not ends with '\r\n',
# f.readline() would return with '', then after strip(),
# we still get an empty string, also break the loop.
line = f.readline().strip().decode("ascii", "ignore")
if not line:
break
l = line.split(':', 1)
if len(l[0].split()) != 1:
raise dpkt.UnpackError('invalid header: %r' % line)
k = l[0].lower()
v = len(l) != 1 and l[1].lstrip() or ''
if k in d:
if not type(d[k]) is list:
d[k] = [d[k]]
d[k].append(v)
else:
d[k] = v
return d
def parse_body(f, headers):
"""Return HTTP body parsed from a file object, given HTTP header dict."""
if headers.get('transfer-encoding', '').lower() == 'chunked':
l = []
found_end = False
while 1:
try:
sz = f.readline().split(None, 1)[0]
except IndexError:
raise dpkt.UnpackError('missing chunk size')
n = int(sz, 16)
if n == 0:
found_end = True
buf = f.read(n)
if f.readline().strip():
break
if n and len(buf) == n:
l.append(buf)
else:
break
if not found_end:
raise dpkt.NeedData('premature end of chunked body')
body = b''.join(l)
elif 'content-length' in headers:
n = int(headers['content-length'])
body = f.read(n)
off += 1
break
elif (n & 0xc0) == 0xc0:
ptr = struct.unpack('>H', buf[off:off + 2])[0] & 0x3fff
if ptr >= start_off:
raise dpkt.UnpackError('Invalid label compression pointer')
off += 2
if not saved_off:
saved_off = off
start_off = off = ptr
elif (n & 0xc0) == 0x00:
off += 1
name.append(buf[off:off + n])
name_length += n + 1
if name_length > 255:
raise dpkt.UnpackError('name longer than 255 bytes')
off += n
else:
raise dpkt.UnpackError('Invalid label length %02x' % n)
if not saved_off:
saved_off = off
return codecs.decode(b'.'.join(name), 'utf-8'), saved_off
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
n = self.len - 4
if n > len(self.data):
raise dpkt.NeedData('not enough data')
self.msg, self.data = self.data[:n], self.data[n:]
try:
p = self._msgsw[self.msgid](self.msg)
setattr(self, p.__class__.__name__.lower(), p)
except (KeyError, dpkt.UnpackError):
pass
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
try:
self.data = self._cmdsw[self.cmd](self.data)
setattr(self, self.data.__class__.__name__.lower(), self.data)
except (KeyError, struct.error, dpkt.UnpackError):
pass
Arguments:
buf -- optional packet buffer to unpack
Optional keyword arguments correspond to members to set
(matching fields in self.__hdr__, or 'data').
"""
self.data = b''
if args:
try:
self.unpack(args[0])
except struct.error:
if len(args[0]) < self.__hdr_len__:
raise NeedData('got %d, %d needed at least' % (len(args[0]), self.__hdr_len__))
raise UnpackError('invalid %s: %r' %
(self.__class__.__name__, args[0]))
else:
for k in self.__hdr_fields__:
setattr(self, k, copy.copy(self.__hdr_defaults__[k]))
for k, v in iteritems(kwargs):
setattr(self, k, v)
if hasattr(self, '__hdr_fmt__'):
self._pack_hdr = partial(struct.pack, self.__hdr_fmt__)
ip = ether.data
# is it a TCP/IP packet?
if ip.p != dpkt.ip.IP_PROTO_TCP:
return
# only traffic, from/to the camera
if not ( socket.inet_ntoa(ip.src) == camera_ip or socket.inet_ntoa(ip.dst) == camera_ip):
return
# check for HTTP traffic
try:
http_rq = dpkt.http.Request(ip.tcp.data)
print "\nURL-Req:", urllib.unquote(http_rq.uri)
self.remember_me(http_rq.uri)
except dpkt.dpkt.UnpackError:
pass
# check for "low/level" traffic
# is the tcp data larger than 12 bytes?
# 4 bytes length information, 4 bytes "FOSC", 4 bytes data len
if len(ip.tcp.data)<12:
return
# unpack those 8 bytes
cmd, magic, datalen = FoscDecoder.unpack("
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
try:
self.data = self._typesw[self.ethtype](self.data)
setattr(self, self.data.__class__.__name__.lower(), self.data)
except (KeyError, dpkt.UnpackError):
pass
def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
try:
if self.code == 0:
self.data = self.ppp = ppp.PPP(self.data)
except dpkt.UnpackError:
pass
def decode_dns_qd_name(pcap_path):
qd_name_list = []
five_tuple = []
conn = {}
fp = open(pcap_path)
pc = dpkt.pcap.Reader(fp)
unknown_opcode_counter = 0
for (src, sport, dst, dport, data) in _udp_iterator(pc):
if dport == 53:
key = (src, sport, dst, dport)
# UDP/53 is a DNS query
try:
dns = dpkt.dns.DNS(data)
conn[key] = [dns.qd[0].name, truncate_dns(hexify(data))]
except (dpkt.dpkt.UnpackError, IndexError):
unknown_opcode_counter += 1
# An unknown opcode maybe malicious traffic
# print unknown_opcode_counter
key = (src, sport, dst, dport, unknown_opcode_counter)
# print 'UNKNOWN_DNS_DATA:', hexify(data)
conn[key] = ['UNKNOWN_DNS', truncate_dns(hexify(data))]
# qd_name_list.append(dns.qd[0].name)
# five_tuple.append((src, sport, dst, dport))
# print truncate_dns(hexify(data))
# print "Query for", repr(dns.qd[0].name)
fp.close()
return conn