Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
# Process packets from the queue
try:
templates = {"netflow": {}, "ipfix": {}}
to_retry = []
while not self._shutdown.is_set():
try:
# 0.5s delay to limit CPU usage while waiting for new packets
pkt = self.input.get(block=True, timeout=0.5) # type: RawPacket
except queue.Empty:
continue
try:
# templates is passed as reference, updated in V9ExportPacket
export = parse_packet(pkt.data, templates)
except UnknownExportVersion as e:
logger.error("%s, ignoring the packet", e)
continue
except (V9TemplateNotRecognized, IPFIXTemplateNotRecognized):
# TODO: differentiate between v9 and IPFIX, use separate to_retry lists
if time.time() - pkt.ts > PACKET_TIMEOUT:
logger.warning("Dropping an old and undecodable v9/IPFIX ExportPacket")
else:
to_retry.append(pkt)
logger.debug("Failed to decode a v9/IPFIX ExportPacket - will "
"re-attempt when a new template is discovered")
continue
if export.header.version == 10:
logger.debug("Processed an IPFIX ExportPacket with length %d.", export.header.length)
else:
logger.debug("Processed a v%d ExportPacket with %d flows.",
dec = data.decode()
data = bytes.fromhex(dec)
except UnicodeDecodeError:
# use data as given, assuming hex-formatted bytes
pass
version = get_export_version(data)
if version == 1:
return V1ExportPacket(data)
elif version == 5:
return V5ExportPacket(data)
elif version == 9:
return V9ExportPacket(data, templates["netflow"])
elif version == 10:
return IPFIXExportPacket(data, templates["ipfix"])
raise UnknownExportVersion(data, version)