Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def gen_ping(self, opts):
for i in xrange(opts.count):
icmp = dpkt.icmp.ICMP(
type=8, data=dpkt.icmp.ICMP.Echo(id=random.randint(0, 0xffff),
seq=i, data=opts.payload))
yield str(icmp)
icmp_data.append('ICMP traceroute')
elif icmp_type is dpkt.icmp.ICMP_DATACONVERR:
icmp_data.append('ICMP data conversion error')
elif icmp_type is dpkt.icmp.ICMP_MOBILE_REDIRECT:
icmp_data.append('ICMP mobile host redirect')
elif icmp_type is dpkt.icmp.ICMP_IP6_WHEREAREYOU:
icmp_data.append('ICMP IPv6 where-are-yo')
elif icmp_type is dpkt.icmp.ICMP_IP6_IAMHERE:
icmp_data.append('ICMP IPv6 i-am-here')
elif icmp_type is dpkt.icmp.ICMP_MOBILE_REG:
icmp_data.append('ICMP mobile registration req')
elif icmp_type is dpkt.icmp.ICMP_MOBILE_REGREPLY:
icmp_data.append('ICMP mobile registration reply')
elif icmp_type is dpkt.icmp.ICMP_DNS:
icmp_data.append('ICMP domain name request')
elif icmp_type is dpkt.icmp.ICMP_DNSREPLY:
icmp_data.append('ICMP domain name reply')
elif icmp_type is dpkt.icmp.ICMP_PHOTURIS:
icmp_data.append('ICMP Photuris')
if icmp_code is dpkt.icmp.ICMP_PHOTURIS_UNKNOWN_INDEX:
icmp_data.append(': unknown sec index')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_AUTH_FAILED:
icmp_data.append(': auth failed')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_DECOMPRESS_FAILED:
icmp_data.append(': decompress failed')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_DECRYPT_FAILED:
icmp_data.append(': decrypt failed')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_NEED_AUTHN:
icmp_data.append(': no authentication')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_NEED_AUTHZ:
icmp_data.append(': no authorization')
elif icmp_type is dpkt.icmp.ICMP_TYPE_MAX:
icmp_data.append('ICMP Router solicitation')
elif icmp_type is dpkt.icmp.ICMP_TIMEXCEED:
icmp_data.append('ICMP time exceeded, code:')
if icmp_code is dpkt.icmp.ICMP_TIMEXCEED_INTRANS:
icmp_data.append(' ttl==0 in transit')
elif icmp_code is dpkt.icmp.ICMP_TIMEXCEED_REASS:
icmp_data.append('ttl==0 in reass')
elif icmp_type is dpkt.icmp.ICMP_PARAMPROB:
icmp_data.append('ICMP ip header bad')
if icmp_code is dpkt.icmp.ICMP_PARAMPROB_ERRATPTR:
icmp_data.append(':req. opt. absent')
elif icmp_code is dpkt.icmp.ICMP_PARAMPROB_OPTABSENT:
icmp_data.append(': req. opt. absent')
elif icmp_code is dpkt.icmp.ICMP_PARAMPROB_LENGTH:
icmp_data.append(': length')
elif icmp_type is dpkt.icmp.ICMP_TSTAMP:
icmp_data.append('ICMP timestamp request')
elif icmp_type is dpkt.icmp.ICMP_TSTAMPREPLY:
icmp_data.append('ICMP timestamp reply')
elif icmp_type is dpkt.icmp.ICMP_INFO:
icmp_data.append('ICMP information request')
elif icmp_type is dpkt.icmp.ICMP_INFOREPLY:
icmp_data.append('ICMP information reply')
elif icmp_type is dpkt.icmp.ICMP_MASK:
icmp_data.append('ICMP address mask request')
elif icmp_type is dpkt.icmp.ICMP_MASKREPLY:
icmp_data.append('ICMP address mask reply')
elif icmp_type is dpkt.icmp.ICMP_TRACEROUTE:
icmp_data.append('ICMP traceroute')
elif icmp_type is dpkt.icmp.ICMP_DATACONVERR:
icmp_data.append('ICMP data conversion error')
elif icmp_type is dpkt.icmp.ICMP_MOBILE_REDIRECT:
def init_pcap():
if protocols: # already init'ed
return
global dpkt, dnslib
import dpkt
import dnslib
load_consts(protocols['ethernet'], dpkt.ethernet, 'ETH_TYPE_')
load_consts(protocols['ip'], dpkt.ip, 'IP_PROTO_')
load_consts(_flags['ip_tos'], dpkt.ip, 'IP_TOS_')
load_consts(protocols['icmp'], dpkt.icmp, 'ICMP_')
load_consts(_flags['tcp'], dpkt.tcp, 'TH_')
load_oui(url_oui)
load_iana(url_iana)
def createICMP(self, msg):
echo = dpkt.icmp.ICMP.Echo()
echo.id = random.randint(0, 0xffff)
echo.seq = random.randint(0, 0xffff)
echo.data = msg
icmp = dpkt.icmp.ICMP()
icmp.type = dpkt.icmp.ICMP_ECHO
icmp.data = echo
return str(icmp)
return
if pkt.p == dpkt.ip.IP_PROTO_TCP:
ret = self.tcp.handle(pkt)
payload.set_verdict(ret[1])
self.nodata_count += ret[0]
elif pkt.p == dpkt.ip.IP_PROTO_UDP:
ret = self.udp.handle(pkt)
payload.set_verdict(ret[1])
self.nodata_count += ret[0]
elif pkt.p == dpkt.ip.IP_PROTO_ICMP:
frame = pkt.data
if frame.type == dpkt.icmp.ICMP_ECHO:
log.info("ICMP ECHO %s" % inet_ntoa(pkt.dst))
return
elif frame.type == dpkt.icmp.ICMP_ECHOREPLY:
log.info("ICMP REPLY from %s" % inet_ntoa(pkt.src))
return
else:
return
else:
log.warning("unsupported protocol %s recieved (ignored)" % pkt.p)
return
icmp_data.append(': src host isolated')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_NET_PROHIB:
icmp_data.append(': for crypto devs')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_HOST_PROHIB:
icmp_data.append(': for cypto devs')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_TOSNET:
icmp_data.append(': bad tos for net')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_TOSHOST:
icmp_data.append(': bad tos for host')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_FILTER_PROHIB:
icmp_data.append(': prohibited access')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_HOST_PRECEDENCE:
icmp_data.append(': precedence error')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_PRECEDENCE_CUTOFF:
icmp_data.append(': precedence cutoff')
elif icmp_type is dpkt.icmp.ICMP_SRCQUENCH:
icmp_data.append('ICMP source quench')
elif icmp_type is dpkt.icmp.ICMP_REDIRECT:
icmp_data.append('ICMP Redirect')
if icmp_code is dpkt.icmp.ICMP_REDIRECT_NET:
icmp_data.append(' for network')
elif icmp_code is dpkt.icmp.ICMP_REDIRECT_HOST:
icmp_data.append(' for host')
elif icmp_code is dpkt.icmp.ICMP_REDIRECT_TOSNET:
icmp_data.append(' for tos and net')
elif icmp_code is dpkt.icmp.ICMP_REDIRECT_TOSHOST:
icmp_data.append(' for tos and host')
elif icmp_type is dpkt.icmp.ICMP_ALTHOSTADDR:
icmp_data.append('ICMP alternate host address')
elif icmp_type is dpkt.icmp.ICMP_ECHO:
icmp_data.append('ICMP echo')
elif icmp_type is dpkt.icmp.ICMP_RTRADVERT:
icmp_data.append('ICMP address mask reply')
elif icmp_type is dpkt.icmp.ICMP_TRACEROUTE:
icmp_data.append('ICMP traceroute')
elif icmp_type is dpkt.icmp.ICMP_DATACONVERR:
icmp_data.append('ICMP data conversion error')
elif icmp_type is dpkt.icmp.ICMP_MOBILE_REDIRECT:
icmp_data.append('ICMP mobile host redirect')
elif icmp_type is dpkt.icmp.ICMP_IP6_WHEREAREYOU:
icmp_data.append('ICMP IPv6 where-are-yo')
elif icmp_type is dpkt.icmp.ICMP_IP6_IAMHERE:
icmp_data.append('ICMP IPv6 i-am-here')
elif icmp_type is dpkt.icmp.ICMP_MOBILE_REG:
icmp_data.append('ICMP mobile registration req')
elif icmp_type is dpkt.icmp.ICMP_MOBILE_REGREPLY:
icmp_data.append('ICMP mobile registration reply')
elif icmp_type is dpkt.icmp.ICMP_DNS:
icmp_data.append('ICMP domain name request')
elif icmp_type is dpkt.icmp.ICMP_DNSREPLY:
icmp_data.append('ICMP domain name reply')
elif icmp_type is dpkt.icmp.ICMP_PHOTURIS:
icmp_data.append('ICMP Photuris')
if icmp_code is dpkt.icmp.ICMP_PHOTURIS_UNKNOWN_INDEX:
icmp_data.append(': unknown sec index')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_AUTH_FAILED:
icmp_data.append(': auth failed')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_DECOMPRESS_FAILED:
icmp_data.append(': decompress failed')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_DECRYPT_FAILED:
icmp_data.append(': decrypt failed')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_NEED_AUTHN:
icmp_data.append(': no authentication')
elif icmp_code is dpkt.icmp.ICMP_PHOTURIS_NEED_AUTHZ:
def test_eth_mpls_stacked(): # Eth - MPLS - MPLS - IP - ICMP
from . import ip
from . import icmp
s = (b'\x00\x30\x96\xe6\xfc\x39\x00\x30\x96\x05\x28\x38\x88\x47\x00\x01\x20\xff\x00\x01\x01\xff'
b'\x45\x00\x00\x64\x00\x50\x00\x00\xff\x01\xa7\x06\x0a\x1f\x00\x01\x0a\x22\x00\x01\x08\x00'
b'\xbd\x11\x0f\x65\x12\xa0\x00\x00\x00\x00\x00\x53\x9e\xe0' + b'\xab\xcd' * 32)
eth = Ethernet(s)
assert len(eth.mpls_labels) == 2
assert eth.mpls_labels[0].val == 18
assert eth.mpls_labels[1].val == 16
assert eth.labels == [(18, 0, 255), (16, 0, 255)]
assert isinstance(eth.data, ip.IP)
assert isinstance(eth.data.data, icmp.ICMP)
# construction
assert str(eth) == str(s), 'pack 1'
assert str(eth) == str(s), 'pack 2'
assert len(eth) == len(s)
# construction with kwargs
eth2 = Ethernet(src=eth.src, dst=eth.dst, mpls_labels=eth.mpls_labels, data=eth.data)
assert str(eth2) == str(s)
# construction w/o labels
del eth.labels, eth.mpls_labels
assert str(eth) == str(s[:12] + b'\x08\x00' + s[22:])
icmp_data.append(': precedence error')
elif icmp_code is dpkt.icmp.ICMP_UNREACH_PRECEDENCE_CUTOFF:
icmp_data.append(': precedence cutoff')
elif icmp_type is dpkt.icmp.ICMP_SRCQUENCH:
icmp_data.append('ICMP source quench')
elif icmp_type is dpkt.icmp.ICMP_REDIRECT:
icmp_data.append('ICMP Redirect')
if icmp_code is dpkt.icmp.ICMP_REDIRECT_NET:
icmp_data.append(' for network')
elif icmp_code is dpkt.icmp.ICMP_REDIRECT_HOST:
icmp_data.append(' for host')
elif icmp_code is dpkt.icmp.ICMP_REDIRECT_TOSNET:
icmp_data.append(' for tos and net')
elif icmp_code is dpkt.icmp.ICMP_REDIRECT_TOSHOST:
icmp_data.append(' for tos and host')
elif icmp_type is dpkt.icmp.ICMP_ALTHOSTADDR:
icmp_data.append('ICMP alternate host address')
elif icmp_type is dpkt.icmp.ICMP_ECHO:
icmp_data.append('ICMP echo')
elif icmp_type is dpkt.icmp.ICMP_RTRADVERT:
icmp_data.append('ICMP Route advertisement')
if icmp_code is dpkt.icmp.ICMP_RTRADVERT_NORMAL:
icmp_data.append(': normal')
elif icmp_code is dpkt.icmp.ICMP_RTRADVERT_NOROUTE_COMMON:
icmp_data.append(': selective routing')
elif icmp_type is dpkt.icmp.ICMP_RTRSOLICIT:
icmp_data.append('ICMP Router solicitation')
elif icmp_type is dpkt.icmp.ICMP_TIMEXCEED:
icmp_data.append('ICMP time exceeded, code:')
if icmp_code is dpkt.icmp.ICMP_TIMEXCEED_INTRANS:
icmp_data.append(' ttl==0 in transit')
elif icmp_code is dpkt.icmp.ICMP_TIMEXCEED_REASS: