# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_recv_packets(packets, delay=0.0001) -> (list, float, float):
    """Start a listener, emit the given packets, then drain everything received.

    Returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending)
    """
    receiver = ThreadedNetFlowListener(*CONNECTION)
    time_started = time.time()
    emit_packets(packets, delay=delay)
    time.sleep(0.5)  # Allow packets to be sent and received
    time_stopped = time.time()
    receiver.start()
    collected = []
    draining = True
    while draining:
        try:
            collected.append(receiver.get(timeout=0.5))
        except queue.Empty:
            # Queue stayed empty for the timeout window — nothing left to drain.
            draining = False
    receiver.stop()
    receiver.join()
    return collected, time_started, time_stopped
# NOTE(review): this second definition shadows the two-argument send_recv_packets
# defined earlier in the file; after import, only this variant is visible.
# The body appears TRUNCATED here: the `try:` opened inside the while loop has
# no matching `except`, and the function never returns — confirm against the
# upstream source before relying on this copy.
def send_recv_packets(packets, delay=0.0001, store_packets=-1) -> (list, float, float):
"""Starts a listener, send packets, receives packets
returns a tuple: ([(ts, export), ...], time_started_sending, time_stopped_sending)
"""
listener = ThreadedNetFlowListener(*CONNECTION)
tstart = time.time()
emit_packets(packets, delay=delay)
time.sleep(0.5)  # Allow packets to be sent and recieved
tend = time.time()
listener.start()
pkts = []
# to_pad tracks how many packets were dropped/folded when store_packets caps
# the kept list — presumably so callers can reconstruct counts; verify usage.
to_pad = 0
while True:
try:
packet = listener.get(timeout=0.5)
# store_packets == -1 means "keep everything"; a positive value caps storage.
if -1 == store_packets or store_packets > 0:
# Case where a programm yields from the queue and stores all packets.
pkts.append(packet)
if store_packets != -1 and len(pkts) > store_packets:
to_pad += len(pkts)  # Hack for testing
def get_export_packets(host: str, port: int) -> ParsedPacket:
    """A threaded generator yielding parsed export packets until the caller
    closes the generator (the listener is always shut down on exit).
    """
    source = ThreadedNetFlowListener(host, port)
    source.start()
    try:
        while True:
            yield source.get()
    finally:
        # Runs on GeneratorExit / any error: shut the listener thread down cleanly.
        source.stop()
        source.join()
# NOTE(review): headerless FRAGMENT — this is the interior of the per-field
# template loop from IPFIXDataRecord.__init__; the enclosing `for` statement,
# and the definitions of field_type, field, field_type_id, field_length,
# unpacker, offset and data, are not visible in this copy.
if not field_type and type(field) is not TemplateFieldEnterprise:
# This should break, since the exporter seems to use a field identifier
# which is not standardized by IANA.
raise NotImplementedError("Field type with ID {} is not implemented".format(field_type_id))
datatype = field_type.type  # type: str
discovered_fields.append((field_type.name, field_type_id))
# Catch fields which are meant to be raw bytes and skip the rest
if IPFIXDataTypes.is_bytes(datatype):
unpacker += "{}s".format(field_length)
continue
# Go into int, uint, float types
issigned = IPFIXDataTypes.is_signed(datatype)
isfloat = IPFIXDataTypes.is_float(datatype)
assert not (all([issigned, isfloat]))  # signed int and float are exclusive
# Map the wire length to the matching struct format code (b/h/i/q signed,
# B/H/I/Q unsigned, f/d float). Any other length is unsupported.
if field_length == 1:
unpacker += "b" if issigned else "B"
elif field_length == 2:
unpacker += "h" if issigned else "H"
elif field_length == 4:
unpacker += "i" if issigned else "f" if isfloat else "I"
elif field_length == 8:
unpacker += "q" if issigned else "d" if isfloat else "Q"
else:
raise IPFIXTemplateError("Template field_length {} not handled in unpacker".format(field_length))
# Finally, unpack the data byte stream according to format defined in iteration above
pack = struct.unpack(unpacker, data[0:offset])
# NOTE(review): headerless FRAGMENT (a duplicated copy of the per-field
# template-loop interior from IPFIXDataRecord.__init__). The enclosing `for`
# header and the definitions of field, field_type_id, field_length, unpacker,
# offset, data and discovered_fields are outside this span.
field_type = IPFIXFieldTypes.by_id(field_type_id)  # type: Optional[FieldType]
if not field_type and type(field) is not TemplateFieldEnterprise:
# This should break, since the exporter seems to use a field identifier
# which is not standardized by IANA.
raise NotImplementedError("Field type with ID {} is not implemented".format(field_type_id))
datatype = field_type.type  # type: str
discovered_fields.append((field_type.name, field_type_id))
# Catch fields which are meant to be raw bytes and skip the rest
if IPFIXDataTypes.is_bytes(datatype):
unpacker += "{}s".format(field_length)
continue
# Go into int, uint, float types
issigned = IPFIXDataTypes.is_signed(datatype)
isfloat = IPFIXDataTypes.is_float(datatype)
assert not (all([issigned, isfloat]))  # signed int and float are exclusive
# Choose the struct format code from the field's wire length and signedness.
if field_length == 1:
unpacker += "b" if issigned else "B"
elif field_length == 2:
unpacker += "h" if issigned else "H"
elif field_length == 4:
unpacker += "i" if issigned else "f" if isfloat else "I"
elif field_length == 8:
unpacker += "q" if issigned else "d" if isfloat else "Q"
else:
raise IPFIXTemplateError("Template field_length {} not handled in unpacker".format(field_length))
# Finally, unpack the data byte stream according to format defined in iteration above
pack = struct.unpack(unpacker, data[0:offset])
# NOTE(review): headerless FRAGMENT (another duplicated copy of the per-field
# template-loop interior from IPFIXDataRecord.__init__). It is also TRUNCATED:
# the if/elif chain on field_length ends without its `else` clause here.
offset += field_length
# Here, reduced-size encoding of fields blocks the usage of IPFIXFieldTypes.get_type_unpack.
# See comment in IPFIXFieldTypes.get_type_unpack for more information.
field_type = IPFIXFieldTypes.by_id(field_type_id)  # type: Optional[FieldType]
if not field_type and type(field) is not TemplateFieldEnterprise:
# This should break, since the exporter seems to use a field identifier
# which is not standardized by IANA.
raise NotImplementedError("Field type with ID {} is not implemented".format(field_type_id))
datatype = field_type.type  # type: str
discovered_fields.append((field_type.name, field_type_id))
# Catch fields which are meant to be raw bytes and skip the rest
if IPFIXDataTypes.is_bytes(datatype):
unpacker += "{}s".format(field_length)
continue
# Go into int, uint, float types
issigned = IPFIXDataTypes.is_signed(datatype)
isfloat = IPFIXDataTypes.is_float(datatype)
assert not (all([issigned, isfloat]))  # signed int and float are exclusive
# Struct format code by wire length: b/h/i/q signed, B/H/I/Q unsigned, f/d float.
if field_length == 1:
unpacker += "b" if issigned else "B"
elif field_length == 2:
unpacker += "h" if issigned else "H"
elif field_length == 4:
unpacker += "i" if issigned else "f" if isfloat else "I"
elif field_length == 8:
unpacker += "q" if issigned else "d" if isfloat else "Q"
def get_type_unpack(cls, key: Union[int, str]) -> Optional[DataType]:
    """Map a field type, given by numeric ID or by name, to its struct data type.

    BLOCKED: due to Reduced-Size Encoding, fields may be exported with a smaller length than defined in
    the standard. Because of this mismatch, the parser in `IPFIXDataRecord.__init__` cannot use this method.

    :param key: field type ID (int) or field type name (str)
    :return: the matching DataType, or None if the key is of another type or unknown
    """
    item = None
    # isinstance() is the idiomatic type check (PEP 8); note it also accepts
    # bool for the int branch, which type(...) == int did not — bools are not
    # valid field IDs, so this does not affect supported inputs.
    if isinstance(key, int):
        item = cls.by_id(key)
    elif isinstance(key, str):
        item = cls.by_name(key)
    if not item:
        return None
    return IPFIXDataTypes.by_name(item.type)
# NOTE(review): headerless FRAGMENT — the opening of IPFIXDataRecord.__init__
# (the `def` line and enclosing class are not visible), and TRUNCATED after the
# isfloat assignment: the field_length dispatch and final struct.unpack are cut off.
self.fields = set()
offset = 0
# "!" selects network (big-endian) byte order for the struct format string.
unpacker = "!"
discovered_fields = []
# Iterate through all fields of this template and build the unpack format string
# See https://www.iana.org/assignments/ipfix/ipfix.xhtml
for index, field in enumerate(template):
field_type_id = field.id
field_length = field.length
offset += field_length
# Here, reduced-size encoding of fields blocks the usage of IPFIXFieldTypes.get_type_unpack.
# See comment in IPFIXFieldTypes.get_type_unpack for more information.
field_type = IPFIXFieldTypes.by_id(field_type_id)  # type: Optional[FieldType]
if not field_type and type(field) is not TemplateFieldEnterprise:
# This should break, since the exporter seems to use a field identifier
# which is not standardized by IANA.
raise NotImplementedError("Field type with ID {} is not implemented".format(field_type_id))
datatype = field_type.type  # type: str
discovered_fields.append((field_type.name, field_type_id))
# Catch fields which are meant to be raw bytes and skip the rest
if IPFIXDataTypes.is_bytes(datatype):
unpacker += "{}s".format(field_length)
continue
# Go into int, uint, float types
issigned = IPFIXDataTypes.is_signed(datatype)
isfloat = IPFIXDataTypes.is_float(datatype)
def data(self):
    """Return this record's fields as a dict keyed by resolved field name."""
    named = {}
    for type_id, value in self.fields:
        # by_id(...)[1] resolves the numeric type ID to its field name.
        named[IPFIXFieldTypes.by_id(type_id)[1]] = value
    return named
# NOTE(review): method FRAGMENT — the enclosing class header is not visible,
# and the body is TRUNCATED at the final `if` (its suite is cut off).
def __init__(self, data: bytes, templates: Dict[int, list]):
"""Parse an IPFIX message: header first, then sets until header.length is consumed."""
self.header = IPFIXHeader(data[:IPFIXHeader.size])
self.sets = []
self._contains_new_templates = False
self._flows = []
self._templates = templates
offset = IPFIXHeader.size
while offset < self.header.length:
try:
new_set = IPFIXSet(data[offset:], templates)
except IPFIXTemplateNotRecognized:
# NOTE(review): bare `raise` here is a no-op re-raise — presumably a
# placeholder for deferred-template handling; confirm against upstream.
raise
if new_set.is_template:
self._contains_new_templates = True
self._templates.update(new_set.templates)
for template_id, template_fields in self._templates.items():
if template_fields is None: