# One template cache per protocol family. The same dict object is handed to
# parse_packet() on every call; per the original author's note it is updated
# in place (in V9ExportPacket) when a packet carries templates.
# NOTE(review): this excerpt keeps a single cache for every packet taken from
# the queue. Template IDs are only unique per exporter/observation domain, so
# if more than one exporter feeds this queue, confirm the cache is scoped by
# source somewhere outside this excerpt.
templates = {"netflow": {}, "ipfix": {}}

# Packets that referenced a template unknown at parse time, kept for a retry.
# NOTE(review): nothing visible here caps len(to_retry), and the age check
# below runs only at parse time, not while a packet waits in the list. If the
# queue is fed from the network, a sender whose data sets never match a
# template could keep this list growing; confirm the part of the loop that is
# cut off drains or bounds it.
to_retry = []

while not self._shutdown.is_set():
    # Wait at most 0.5s for a packet so the shutdown flag is re-checked
    # regularly without spinning the CPU.
    try:
        pkt = self.input.get(block=True, timeout=0.5)  # type: RawPacket
    except queue.Empty:
        continue

    try:
        export = parse_packet(pkt.data, templates)
    except UnknownExportVersion as err:
        # Unsupported export version: log it and move on to the next packet.
        logger.error("%s, ignoring the packet", err)
        continue
    except (V9TemplateNotRecognized, IPFIXTemplateNotRecognized):
        # TODO: differentiate between v9 and IPFIX, use separate to_retry lists
        expired = time.time() - pkt.ts > PACKET_TIMEOUT
        if not expired:
            # Still fresh: park it until a matching template shows up.
            to_retry.append(pkt)
            logger.debug("Failed to decode a v9/IPFIX ExportPacket - will "
                         "re-attempt when a new template is discovered")
        else:
            logger.warning("Dropping an old and undecodable v9/IPFIX ExportPacket")
        continue

    # Header version 10 is reported as IPFIX (by length); every other version
    # is reported with its flow count.
    hdr = export.header
    is_ipfix = hdr.version == 10
    if not is_ipfix:
        logger.debug("Processed a v%d ExportPacket with %d flows.",
                     hdr.version, hdr.count)
    else:
        logger.debug("Processed an IPFIX ExportPacket with length %d.", hdr.length)

    # If any new templates were discovered, dump the unprocessable
    # (the listing is cut off here; the rest of the loop body is not shown)
def __init__(self, data, templates):
pack = struct.unpack('!HH', data[:4])
self.template_id = pack[0] # flowset_id is reference to a template_id
self.length = pack[1]
self.flows = []
offset = 4
if self.template_id not in templates:
raise V9TemplateNotRecognized
template = templates[self.template_id]
# As the field lengths are variable V9 has padding to next 32 Bit
padding_size = 4 - (self.length % 4) # 4 Byte
while offset <= (self.length - padding_size):
new_record = V9DataRecord()
for field in template.fields:
flen = field.field_length
fkey = V9_FIELD_TYPES[field.field_type]
# The length of the value byte slice is defined in the template
dataslice = data[offset:offset + flen]