How to use the netflow.ipfix.IPFIXHeader.size attribute in netflow

To help you get started, we’ve selected a few netflow examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

GitHub: bitkeks / python-netflow-v9-softflowd / netflow / ipfix.py (view on GitHub, external link)
def __init__(self, data: bytes, templates: Dict[int, list]):
        self.header = IPFIXHeader(data[:IPFIXHeader.size])
        self.sets = []
        self._contains_new_templates = False
        self._flows = []
        self._templates = templates

        offset = IPFIXHeader.size
        while offset < self.header.length:
            try:
                new_set = IPFIXSet(data[offset:], templates)
            except IPFIXTemplateNotRecognized:
                raise
            if new_set.is_template:
                self._contains_new_templates = True
                self._templates.update(new_set.templates)
                for template_id, template_fields in self._templates.items():
                    if template_fields is None:
GitHub: bitkeks / python-netflow-v9-softflowd / netflow / ipfix.py (view on GitHub, external link)
def __init__(self, data: bytes, templates: Dict[int, list]):
        """Parse an IPFIX export packet into its header and sets.

        Args:
            data: Raw packet bytes. The first ``IPFIXHeader.size`` bytes are
                parsed as the header; the remainder is parsed set by set.
            templates: Mapping of template ID to template field list. The
                dict is stored by reference and updated in place: templates
                found in the packet are merged in, and entries whose field
                list is ``None`` (template withdrawals) are removed.

        Raises:
            IPFIXTemplateNotRecognized: Propagated unchanged from ``IPFIXSet``.
        """
        self.header = IPFIXHeader(data[:IPFIXHeader.size])
        self.sets = []
        self._contains_new_templates = False
        self._flows = []
        self._templates = templates

        offset = IPFIXHeader.size
        # NOTE(review): nothing in this excerpt advances `offset`; confirm the
        # loop body continues past the shown lines, otherwise it never ends.
        while offset < self.header.length:
            # IPFIXTemplateNotRecognized raised here propagates to the caller.
            new_set = IPFIXSet(data[offset:], templates)
            if new_set.is_template:
                self._contains_new_templates = True
                self._templates.update(new_set.templates)
                # Template withdrawal: collect the IDs first, because deleting
                # from a dict while iterating over it raises RuntimeError.
                withdrawn_ids = [
                    template_id
                    for template_id, template_fields in self._templates.items()
                    if template_fields is None
                ]
                for template_id in withdrawn_ids:
                    del self._templates[template_id]
            elif new_set.is_data:
                self._flows += new_set.records

            self.sets.append(new_set)