Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _decode_lldp(host, interface, instance, wallclock, pktstart, pktend):
"""Decode LLDP packet into a JSON discovery packet"""
# print('DECODING LLDP PACKET!<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<', file=stderr)
thisportinfo = pyConfigContext(
init={"ConnectsToHost": host, "ConnectsToInterface": interface}
)
switchinfo = pyConfigContext(init={"ports": pyConfigContext()})
metadata = pyConfigContext(
init={
"discovertype": "__LinkDiscovery",
"description": "Link Level Switch Discovery (lldp)",
"source": "_decode_lldp()",
"host": host,
"instance": instance,
"localtime": str(wallclock),
"data": switchinfo,
}
)
capnames = [
None,
CMAconsts.ROLE_repeater,
CMAconsts.ROLE_bridge,
CMAconsts.ROLE_AccessPoint,
CMAconsts.ROLE_router,
def _decode_cdp(host, interface, instance, wallclock, pktstart, pktend):
"Decode CDP packet into a JSON discovery packet"
thisportinfo = pyConfigContext(
init={"ConnectsToHost": host, "ConnectsToInterface": interface}
)
switchinfo = pyConfigContext(init={"ports": pyConfigContext()})
metadata = pyConfigContext(
init={
"discovertype": "__LinkDiscovery",
"description": "Link Level Switch Discovery (cdp)",
"source": "_decode_cdp()",
"host": host,
"instance": instance,
"localtime": str(wallclock),
"data": switchinfo,
}
)
sourcemacptr = pySwitchDiscovery._byteNaddr(cast(pktstart, cClass.guint8), 6)
if not sourcemacptr:
def _decode_cdp(host, interface, instance, wallclock, pktstart, pktend):
"Decode CDP packet into a JSON discovery packet"
thisportinfo = pyConfigContext(
init={"ConnectsToHost": host, "ConnectsToInterface": interface}
)
switchinfo = pyConfigContext(init={"ports": pyConfigContext()})
metadata = pyConfigContext(
init={
"discovertype": "__LinkDiscovery",
"description": "Link Level Switch Discovery (cdp)",
"source": "_decode_cdp()",
"host": host,
"instance": instance,
"localtime": str(wallclock),
"data": switchinfo,
}
)
sourcemacptr = pySwitchDiscovery._byteNaddr(cast(pktstart, cClass.guint8), 6)
if not sourcemacptr:
return metadata
Cmacaddr = netaddr_mac48_new(sourcemacptr)
sourcemac = pyNetAddr(None, Cstruct=Cmacaddr)
return self._Cstruct[0].u.intvalue != 0
elif vtype == CFG_INT64:
return int(self._Cstruct[0].u.intvalue)
elif vtype == CFG_STRING:
strval = self._Cstruct[0].u.strvalue
value = strval.data.decode("utf8") if isinstance(strval, String) else strval
assert isinstance(value, str)
return value
# return u_string_at(self._Cstruct[0].u.strvalue)
return str(self._Cstruct[0].u.strvalue)
elif vtype == CFG_FLOAT:
return float(self._Cstruct[0].u.floatvalue)
elif vtype == CFG_CFGCTX:
# We're creating a new reference to the pre-existing NetAddr
CCref(self._Cstruct[0].u.cfgctxvalue)
return pyConfigContext(Cstruct=self._Cstruct[0].u.cfgctxvalue)
elif vtype == CFG_NETADDR:
net = pyNetAddr(None, Cstruct=self._Cstruct[0].u.addrvalue)
# We're creating a new reference to the pre-existing NetAddr
CCref(net._Cstruct)
return net
elif vtype == CFG_FRAME:
# Cstruct2Frame calls CCref() - so we don't need to
return pyFrame.Cstruct2Frame(self._Cstruct[0].u.framevalue)
elif vtype == CFG_ARRAY:
# An Array is a linked list (GSList) of ConfigValue objects...
ret = []
this = self._Cstruct[0].u.arrayvalue
while this:
dataptr = cast(this[0].data, struct__GSList._fields_[0][1])
dataptr = cast(dataptr, cClass.ConfigValue)
CCref(dataptr)
print(f"INDEX IS {idx}", file=stderr)
value = array[idx] # possible IndexError or TypeError
assert not isinstance(value, bytes)
if suffix is None:
assert not isinstance(value, bytes)
return value
except (TypeError, IndexError, ValueError):
return alternative
return value.deepget(suffix, alternative)
print(f"prefix {prefix} IN self {self}")
prefixvalue = self[prefix]
assert not isinstance(prefixvalue, bytes)
if suffix is None:
return prefixvalue
if not isinstance(prefixvalue, pyConfigContext):
return alternative
gotten = prefixvalue.deepget(suffix, alternative)
assert not isinstance(gotten, bytes)
return gotten
elif tlvtype == LLDP_TLV_MGMT_ADDR: #########################################
addrlen = pySwitchDiscovery._byte0(tlvptr)
byte1addr = pySwitchDiscovery._byte1addr(tlvptr)
value = pySwitchDiscovery._decode_netaddr(byte1addr, addrlen)
elif tlvtype == LLDP_TLV_SYS_CAPS: #########################################
byte0 = pySwitchDiscovery._byte0(tlvptr)
byte1 = pySwitchDiscovery._byteN(tlvptr, 1)
byte2 = pySwitchDiscovery._byteN(tlvptr, 2)
byte3 = pySwitchDiscovery._byteN(tlvptr, 3)
caps0 = byte0 << 8 | byte1
caps1 = byte2 << 8 | byte3
# The values we assign here have many possibilities
# pylint: disable=R0204
value = pyConfigContext()
mask = 2
for j in range(1, 7):
if caps0 & mask:
value[capnames[j]] = (caps1 & mask) != 0
mask <<= 1
elif tlvtype == LLDP_TLV_ORG_SPECIFIC: ######################################
pySwitchDiscovery._decode_lldp_org_specific(
switchinfo, thisportinfo, tlvptr, tlvlen, pktend
)
if value is not None:
if tlvtype == LLDP_TLV_PID:
switchinfo["ports"][value] = thisportinfo
thisportinfo["PortId"] = value
numericpart = value
def _decode_lldp(host, interface, instance, wallclock, pktstart, pktend):
"""Decode LLDP packet into a JSON discovery packet"""
# print('DECODING LLDP PACKET!<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<', file=stderr)
thisportinfo = pyConfigContext(
init={"ConnectsToHost": host, "ConnectsToInterface": interface}
)
switchinfo = pyConfigContext(init={"ports": pyConfigContext()})
metadata = pyConfigContext(
init={
"discovertype": "__LinkDiscovery",
"description": "Link Level Switch Discovery (lldp)",
"source": "_decode_lldp()",
"host": host,
"instance": instance,
"localtime": str(wallclock),
"data": switchinfo,
}
)
capnames = [
None,