Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def on_flow_init(self, flow):
    """Set up this classifier's per-flow nDPI state when a flow is created.

    Runs ``NFStreamClassifier.on_flow_init`` first, then fills
    ``flow.classifiers[self.name]`` with a zeroed ``NDPIFlowStruct``, a
    fresh ``NDPIProtocol``, pointers to two new ``NDPIIdStruct`` instances,
    empty application/category names and zeroed detection flags.
    """
    NFStreamClassifier.on_flow_init(self, flow)
    state = flow.classifiers[self.name]

    ndpi_flow = NDPIFlowStruct()
    state['ndpi_flow'] = ndpi_flow
    # Explicitly zero the structure's memory once it has been stored.
    memset(byref(ndpi_flow), 0, sizeof(NDPIFlowStruct))

    state['detected_protocol'] = NDPIProtocol()
    state['detection_completed'] = 0
    state['src_id'] = pointer(NDPIIdStruct())
    state['dst_id'] = pointer(NDPIIdStruct())
    state['application_name'] = ''
    state['category_name'] = ''
    state['guessed'] = 0
('pplive_stage2', c_uint8, 2),
('pplive_stage3', c_uint8, 2),
('starcraft_udp_stage', c_uint8, 3),
('ovpn_session_id', c_uint8 * 8),
('ovpn_counter', c_uint8),
('tinc_state', c_uint8),
('TincCacheEntry', TincCacheEntry),
('csgo_strid', c_uint8 * 18),
('csgo_state', c_uint8),
('csgo_s2', c_uint8),
('csgo_id2', c_uint32),
('kxun_counter', c_uint16),
('iqiyi_counter', c_uint16),
('packet', NDPIPacketStruct),
('flow', POINTER(NDPIFlowStruct)),
('src', POINTER(NDPIIdStruct)),
('dst', POINTER(NDPIIdStruct))
]
# ----------------------------------------------- nDPI APIs ------------------------------------------------------------
""" ndpi_detection_giveup: Function to be called before we give up with detection for a given flow.
This function reduces the NDPI_UNKNOWN_PROTOCOL detection. """
# ctypes prototype: four positional argument types, one per line, followed by
# the type the call result is converted to.
ndpi.ndpi_detection_giveup.argtypes = [
    POINTER(NDPIDetectionModuleStruct),
    POINTER(NDPIFlowStruct),
    c_uint8,
    POINTER(c_uint8),
]
ndpi.ndpi_detection_giveup.restype = NDPIProtocol
""" ndpi_detection_process_packet: Processes one packet and returns the ID of the detected protocol.
This is the MAIN PACKET PROCESSING FUNCTION. """
ndpi.ndpi_detection_process_packet.restype = NDPIProtocol
ndpi.ndpi_detection_process_packet.argtypes = [POINTER(NDPIDetectionModuleStruct),