Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, data: bytes, templates: Dict[int, list]):
self.header = IPFIXHeader(data[:IPFIXHeader.size])
self.sets = []
self._contains_new_templates = False
self._flows = []
self._templates = templates
offset = IPFIXHeader.size
while offset < self.header.length:
try:
new_set = IPFIXSet(data[offset:], templates)
except IPFIXTemplateNotRecognized:
raise
if new_set.is_template:
self._contains_new_templates = True
self._templates.update(new_set.templates)
for template_id, template_fields in self._templates.items():
if template_fields is None:
# Template withdrawal
del self._templates[template_id]
elif new_set.is_data:
self._flows += new_set.records
self.sets.append(new_set)
offset += new_set.get_length()
# Here all data should be processed and offset set to the length
if offset != self.header.length: