Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
1: 'Enabled'}),
BitEnumField("willretainflag", 0, 1, {0: 'Disabled',
1: 'Enabled'}),
BitEnumField("willQOSflag", 0, 2, QOS_LEVEL),
BitEnumField("willflag", 0, 1, {0: 'Disabled',
1: 'Enabled'}),
BitEnumField("cleansess", 0, 1, {0: 'Disabled',
1: 'Enabled'}),
BitEnumField("reserved", 0, 1, {0: 'Disabled',
1: 'Enabled'}),
ShortField("klive", 0),
FieldLenField("clientIdlen", None, length_of="clientId"),
StrLenField("clientId", "",
length_from=lambda pkt: pkt.clientIdlen),
# Payload with optional fields depending on the flags
ConditionalField(FieldLenField("wtoplen", None, length_of="willtopic"),
lambda pkt: pkt.willflag == 1),
ConditionalField(StrLenField("willtopic", "",
length_from=lambda pkt: pkt.wtoplen),
lambda pkt: pkt.willflag == 1),
ConditionalField(FieldLenField("wmsglen", None, length_of="willmsg"),
lambda pkt: pkt.willflag == 1),
ConditionalField(StrLenField("willmsg", "",
length_from=lambda pkt: pkt.wmsglen),
lambda pkt: pkt.willflag == 1),
ConditionalField(FieldLenField("userlen", None, length_of="username"),
lambda pkt: pkt.usernameflag == 1),
ConditionalField(StrLenField("username", "",
length_from=lambda pkt: pkt.userlen),
lambda pkt: pkt.usernameflag == 1),
ConditionalField(FieldLenField("passlen", None, length_of="password"),
lambda pkt: pkt.passwordflag == 1),
val = 0
fields_desc = [FieldLenField("klen", None, length_of="k", fmt="B"),
StrLenField("k", "", length_from=lambda pkt: pkt.klen)]
def guess_payload_class(self, p):
    """Return Padding as the payload class, regardless of the bytes in *p*."""
    payload_cls = Padding
    return payload_cls
class ECPentanomialBasis(Packet):
    """EC pentanomial basis: three strings k1, k2 and k3.

    Each string is preceded by its own one-byte length field.
    """
    name = "EC Pentanomial Basis"
    # Basis type code; 1 is the key used for this class in _tls_ec_basis_cls.
    val = 1
    fields_desc = [
        FieldLenField("k1len", None, length_of="k1", fmt="B"),
        StrLenField("k1", "", length_from=lambda p: p.k1len),
        FieldLenField("k2len", None, length_of="k2", fmt="B"),
        StrLenField("k2", "", length_from=lambda p: p.k2len),
        FieldLenField("k3len", None, length_of="k3", fmt="B"),
        StrLenField("k3", "", length_from=lambda p: p.k3len),
    ]

    def guess_payload_class(self, p):
        """Return Padding as the payload class, whatever bytes follow."""
        return Padding
# Basis type code -> packet class describing that basis. The keys are the
# same values as each class's `val` attribute.
_tls_ec_basis_cls = {0: ECTrinomialBasis, 1: ECPentanomialBasis}
class _ECBasisTypeField(ByteEnumField):
    """Byte-sized enum field that also records a `basis_type_of` value.

    NOTE(review): `basis_type_of` is presumably the name of the field whose
    basis type this field encodes -- confirm against the callers.
    """
    __slots__ = ["basis_type_of"]

    def __init__(self, name, default, enum, basis_type_of, remain=0):
        """Store `basis_type_of`, then initialise as an EnumField.

        `remain` is accepted for signature compatibility but is not used
        by this initializer.
        """
        self.basis_type_of = basis_type_of
        # EnumField is initialised directly, with the "B" format made explicit.
        EnumField.__init__(self, name, default, enum, "B")
return self.sprintf("ARP is at %hwsrc% says %psrc%")
return self.sprintf("ARP %op% %psrc% > %pdst%")
def l2_register_l3_arp(l2, l3):
    """Return the result of getmacbyip() for the `pdst` of layer *l3*.

    *l2* is accepted because of the callback signature but is not used.
    """
    target_ip = l3.pdst
    return getmacbyip(target_ip)
# Register l2_register_l3_arp with conf.neighbor for the (Ether, ARP) pair.
conf.neighbor.register_l3(Ether, ARP, l2_register_l3_arp)
class GRErouting(Packet):
    """GRE source route entry: address family, SRE offset, and a routing
    information string prefixed by its one-byte length (SRE_len).
    """
    name = "GRE routing information"
    fields_desc = [
        ShortField("address_family", 0),
        ByteField("SRE_offset", 0),
        # SRE_len is computed on build from the length of routing_info.
        FieldLenField("SRE_len", None, length_of="routing_info", fmt="B"),
        # The string length must come from a callable. Passing the name
        # "SRE_len" positionally never supplies a usable length_from, so
        # dissection could not bound the string.
        StrLenField("routing_info", "",
                    length_from=lambda pkt: pkt.SRE_len),
    ]
class GRE(Packet):
name = "GRE"
fields_desc = [BitField("chksum_present", 0, 1),
BitField("routing_present", 0, 1),
BitField("key_present", 0, 1),
BitField("seqnum_present", 0, 1),
BitField("strict_route_source", 0, 1),
BitField("recursion_control", 0, 3),
BitField("flags", 0, 5),
BitField("version", 0, 3),
XShortEnumField("proto", 0x0000, ETHER_TYPES),
ConditionalField(XShortField("chksum", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1), # noqa: E501
0x0103: 'Host-Uniq',
0x0104: 'AC-Cookie',
0x0105: 'Vendor-Specific',
0x0106: 'Credits',
0x0107: 'Metrics',
0x0108: 'Sequence Number',
0x0109: 'Credit Scale Factor',
0x0110: 'Relay-Session-Id',
0x0120: 'PPP-Max-Payload',
0x0201: 'Service-Name-Error',
0x0202: 'AC-System-Error',
0x0203: 'Generic-Error'}
fields_desc = [
ShortEnumField('tag_type', None, tag_list),
FieldLenField('tag_len', None, length_of='tag_value', fmt='H'),
StrLenField('tag_value', '', length_from=lambda pkt:pkt.tag_len)
]
def extract_padding(self, s):
    """Return ``('', s)``: an empty first element and *s* unchanged as the
    second.
    """
    empty = ''
    return empty, s
class PPPoED_Tags(Packet):
    """Packet whose single field, `tag_list`, is a list of PPPoETag packets."""
    name = "PPPoE Tag List"
    fields_desc = [
        PacketListField('tag_list', None, PPPoETag),
    ]
_PPP_PROTOCOLS = {
0x0001: "Padding Protocol",
0x0003: "ROHC small-CID [RFC3095]",
0x0005: "ROHC large-CID [RFC3095]",
class TLSCertificateRequest(_TLSHandshake):
    """TLS handshake Certificate Request message (msgtype 13).

    Layout: message type, 3-byte message length, length-prefixed certificate
    types, length-prefixed signature/hash algorithms, and length-prefixed
    certificate authorities.
    """
    name = "TLS Handshake - Certificate Request"
    fields_desc = [
        ByteEnumField("msgtype", 13, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        # Certificate types: one-byte length, then one-byte items.
        FieldLenField("ctypeslen", None, fmt="B", length_of="ctypes"),
        _CertTypesField(
            "ctypes", [1, 64],
            _tls_client_certificate_types,
            itemfmt="!B",
            length_from=lambda pkt: pkt.ctypeslen
        ),
        # Signature and hash algorithms.
        SigAndHashAlgsLenField("sig_algs_len", None, length_of="sig_algs"),
        SigAndHashAlgsField(
            "sig_algs", [0x0403, 0x0401, 0x0201],
            EnumField("hash_sig", None, _tls_hash_sig),
            length_from=lambda pkt: pkt.sig_algs_len
        ),
        # Certificate authorities: two-byte length prefix.
        FieldLenField("certauthlen", None, fmt="!H", length_of="certauth"),
        _CertAuthoritiesField(
            "certauth", [],
            length_from=lambda pkt: pkt.certauthlen
        ),
    ]
class TLS13CertificateRequest(_TLSHandshake):
name = "TLS 1.3 Handshake - Certificate Request"
fields_desc = [ByteEnumField("msgtype", 13, _tls_handshake_type),
ThreeBytesField("msglen", None),
FieldLenField("cert_req_ctxt_len", None, fmt="B",
length_of="cert_req_ctxt"),
StrLenField("cert_req_ctxt", "",
length_from=lambda pkt: pkt.cert_req_ctxt_len),
_ExtensionsLenField("extlen", None, length_of="ext"),
_ExtensionsField("ext", None,
length_from=lambda pkt: pkt.msglen -
clsname = "CDPAddrRecordIPv4"
elif proto == _cdp_addrrecord_proto_ipv6:
clsname = "CDPAddrRecordIPv6"
else:
clsname = "CDPAddrRecord"
cls = globals()[clsname]
return cls(p, **kargs)
class CDPMsgAddr(CDPMsgGeneric):
    """CDP "Addresses" TLV: type, length, address count, address records."""
    name = "Addresses"
    fields_desc = [
        XShortEnumField("type", 0x0002, _cdp_tlv_types),
        ShortField("len", None),
        # naddr is the NUMBER of records (it is read back with count_from
        # below), so it must be derived with count_of, not length_of.
        FieldLenField("naddr", None, count_of="addr", fmt="!I"),
        PacketListField("addr", [], _CDPGuessAddrRecord,
                        count_from=lambda x: x.naddr),
    ]

    def post_build(self, pkt, pay):
        """Fill in the TLV length when the user left `len` unset.

        :param pkt: the built bytes of this layer
        :param pay: the built bytes of the payload
        :returns: this layer (with `len` patched if it was None) + payload
        """
        if self.len is None:
            # Use the real built size instead of assuming 9 bytes per record,
            # which only holds for one particular record size.
            pkt = pkt[:2] + struct.pack("!H", len(pkt)) + pkt[4:]
        return pkt + pay
class CDPMsgPortID(CDPMsgGeneric):
    """CDP "Port ID" TLV: type, length, and the interface name string."""
    name = "Port ID"
    fields_desc = [
        XShortEnumField("type", 0x0003, _cdp_tlv_types),
        # Dissection reads the string as `len - 4` bytes, so the built
        # length has to add those 4 bytes back to round-trip correctly.
        FieldLenField("len", None, length_of="iface", fmt="!H",
                      adjust=lambda pkt, x: x + 4),
        StrLenField("iface", "Port 1", length_from=lambda x: x.len - 4),
    ]
# Elliptic Curve Diffie-Hellman
# Numeric code -> name for the EC curve types.
_tls_ec_curve_types = {1: "explicit_prime",
2: "explicit_char2",
3: "named_curve"}
# Numeric code -> name for the EC basis types.
_tls_ec_basis_types = {0: "ec_basis_trinomial", 1: "ec_basis_pentanomial"}
class ECCurvePkt(Packet):
    """Elliptic curve described by two strings, `a` and `b`.

    Each string is preceded by its own one-byte length field.
    """
    name = "Elliptic Curve"
    fields_desc = [
        FieldLenField("alen", None, length_of="a", fmt="B"),
        StrLenField("a", "", length_from=lambda p: p.alen),
        FieldLenField("blen", None, length_of="b", fmt="B"),
        StrLenField("b", "", length_from=lambda p: p.blen),
    ]
# Char2 Curves
class ECTrinomialBasis(Packet):
    """EC trinomial basis: a single string `k` with a one-byte length prefix."""
    name = "EC Trinomial Basis"
    # Basis type code; 0 is named "ec_basis_trinomial" in _tls_ec_basis_types.
    val = 0
    fields_desc = [
        FieldLenField("klen", None, length_of="k", fmt="B"),
        StrLenField("k", "", length_from=lambda p: p.klen),
    ]

    def guess_payload_class(self, p):
        """Return Padding as the payload class, whatever bytes follow."""
        return Padding
class ECPentanomialBasis(Packet):
return "", s
class OSPFv3_Link_LSA(OSPF_BaseLSA):
    """OSPFv3 Link LSA (type 0x0008).

    Carries the LSA header fields, router priority, options, the link-local
    address and a counted list of OSPFv3_Prefix_Item entries.
    """
    name = "OSPFv3 Link LSA"
    fields_desc = [
        # LSA header
        ShortField("age", 1),
        ShortEnumField("type", 0x0008, _OSPFv3_LStypes),
        IPField("id", "0.0.0.0"),
        IPField("adrouter", "1.1.1.1"),
        XIntField("seq", 0x80000001),
        XShortField("chksum", None),
        ShortField("len", None),
        # Link LSA body
        ByteField("prio", 1),
        OSPFv3OptionsField(),
        IP6Field("lladdr", "fe80::"),
        # 32-bit count of the entries in prefixlist
        FieldLenField("prefixes", None, count_of="prefixlist", fmt="I"),
        PacketListField("prefixlist", None, OSPFv3_Prefix_Item,
                        count_from=lambda pkt: pkt.prefixes),
    ]
class OSPFv3_Intra_Area_Prefix_LSA(OSPF_BaseLSA):
name = "OSPFv3 Intra Area Prefix LSA"
fields_desc = [ShortField("age", 1),
ShortEnumField("type", 0x2009, _OSPFv3_LStypes),
IPField("id", "0.0.0.0"),
IPField("adrouter", "1.1.1.1"),
XIntField("seq", 0x80000001),
XShortField("chksum", None),
ShortField("len", None),
FieldLenField("prefixes", None, count_of="prefixlist", fmt="H"), # noqa: E501
ShortEnumField("reflstype", 0, _OSPFv3_LStypes),
IPField("reflsid", "0.0.0.0"),
ConditionalField(FieldLenField("wtoplen", None, length_of="willtopic"),
lambda pkt: pkt.willflag == 1),
ConditionalField(StrLenField("willtopic", "",
length_from=lambda pkt: pkt.wtoplen),
lambda pkt: pkt.willflag == 1),
ConditionalField(FieldLenField("wmsglen", None, length_of="willmsg"),
lambda pkt: pkt.willflag == 1),
ConditionalField(StrLenField("willmsg", "",
length_from=lambda pkt: pkt.wmsglen),
lambda pkt: pkt.willflag == 1),
ConditionalField(FieldLenField("userlen", None, length_of="username"),
lambda pkt: pkt.usernameflag == 1),
ConditionalField(StrLenField("username", "",
length_from=lambda pkt: pkt.userlen),
lambda pkt: pkt.usernameflag == 1),
ConditionalField(FieldLenField("passlen", None, length_of="password"),
lambda pkt: pkt.passwordflag == 1),
ConditionalField(StrLenField("password", "",
length_from=lambda pkt: pkt.passlen),
lambda pkt: pkt.passwordflag == 1),
]
# Integer return code -> human-readable description.
# NOTE(review): presumably the MQTT CONNACK return codes -- confirm.
RETURN_CODE = {
    0: "Connection Accepted",
    1: "Unacceptable protocol version",
    2: "Identifier rejected",
    3: "Server unavailable",
    4: "Bad username/password",
    5: "Not authorized",
}
class SigAndHashAlgsLenField(FieldLenField):
    """Length field used in TLS_Ext_SignatureAlgorithms and
    TLSCertificateRequest.

    getfield and addfield are FieldLenField's own methods wrapped by
    phantom_decorate; `phantom_value` is the value associated with that
    wrapping (0 for this length field).
    """
    phantom_value = 0
    addfield = phantom_decorate(FieldLenField.addfield, False)
    getfield = phantom_decorate(FieldLenField.getfield, True)
class SigAndHashAlgsField(FieldListField):
    """List field used in TLS_Ext_SignatureAlgorithms and
    TLSCertificateRequest.

    getfield and addfield are FieldListField's own methods wrapped by
    phantom_decorate; `phantom_value` is the value associated with that
    wrapping (an empty list for this list field).
    """
    phantom_value = []
    addfield = phantom_decorate(FieldListField.addfield, False)
    getfield = phantom_decorate(FieldListField.getfield, True)
class SigLenField(FieldLenField):
    """Length field that is absent on the wire for SSLv2, which uses
    implicit lengths.

    The field is skipped whenever the session's tls_version is set and is
    below 0x0300.
    """

    def getfield(self, pkt, s):
        """Dissect the length, or return ``(s, None)`` untouched for SSLv2."""
        version = pkt.tls_session.tls_version
        implicit_length = bool(version) and version < 0x0300
        if not implicit_length:
            return super(SigLenField, self).getfield(pkt, s)
        return s, None

    def addfield(self, pkt, s, val):
        """Append the length to *s*; with SSLv2 no sig_len is ever added."""
        version = pkt.tls_session.tls_version
        implicit_length = bool(version) and version < 0x0300
        if not implicit_length:
            return super(SigLenField, self).addfield(pkt, s, val)
        return s