Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_custom_expiration(self):
    """Check that a plugin can force a custom expiration id on a flow.

    A volatile plugin sets ``expiration_id`` to -1 (and a ``custom_expire``
    attribute) on the flow entry once ``bidirectional_packets`` equals 10.
    The test expects the first yielded flow to carry that id and exactly
    two flows to be yielded for the filtered capture.
    """

    # NOTE(review): the class name is kept unchanged; it presumably doubles
    # as the plugin's attribute name on the flow entry -- confirm.
    class custom_expire(NFPlugin):
        def on_update(self, obs, entry):
            # Nothing to do until the flow reaches its tenth packet.
            if entry.bidirectional_packets != 10:
                return
            entry.expiration_id = -1
            entry.custom_expire = True

    print("\n----------------------------------------------------------------------")
    streamer_test = NFStreamer(
        source='tests/pcap/facebook.pcap',
        plugins=[custom_expire(volatile=True)],
        bpf_filter="src port 52066 or dst port 52066",
    )
    # Drain the streamer so every produced flow can be inspected.
    rs = list(streamer_test)

    self.assertEqual(rs[0].expiration_id, -1)
    self.assertEqual(len(rs), 2)

    del streamer_test
    label = ".Testing custom expiration".ljust(60, ' ')
    print("{}\t: \033[94mOK\033[0m".format(label))
def test_user_plugins(self):
    """Check that a user-defined plugin can attach a feature to a flow.

    The plugin copies ``obs.ip_size`` into ``entry.feat_1`` when the flow's
    ``bidirectional_packets`` equals 4. The test expects a single flow for
    the filtered capture, with ``feat_1`` equal to 248.
    """

    # NOTE(review): the class name is kept unchanged; it presumably doubles
    # as the plugin's attribute name on the flow entry -- confirm.
    class feat_1(NFPlugin):
        def on_update(self, obs, entry):
            # Only the observation seen at the fourth packet is recorded.
            if entry.bidirectional_packets != 4:
                return
            entry.feat_1 = obs.ip_size

    print("\n----------------------------------------------------------------------")
    streamer_test = NFStreamer(
        source='tests/pcap/facebook.pcap',
        plugins=[feat_1()],
        bpf_filter="src port 52066 or dst port 52066",
    )
    # Drain the streamer so every produced flow can be inspected.
    rs = list(streamer_test)

    self.assertEqual(rs[0].feat_1, 248)
    self.assertEqual(len(rs), 1)

    del streamer_test
    label = ".Testing adding user plugins".ljust(60, ' ')
    print("{}\t: \033[94mOK\033[0m".format(label))
import argparse
import logging
import dpkt
import pandas as pd
import nfstream
import numpy as np
import settings
from feature_processing import calc_raw_features, calc_flow_features, RMI
# Module-level logger, obtained under the fixed name 'flow_parser'.
logger = logging.getLogger('flow_parser')
class raw_packets_matrix(nfstream.NFPlugin):
    """NFStream plugin that stores raw per-packet features in a matrix.

    Each packet observation is written into one row of a 2-D array whose
    columns are addressed through the ``RMI`` index constants.
    """

    @staticmethod
    def _fill_flow_stats(obs, raw_feature_matrix, counter=0):
        """Write the features of one packet into row ``counter``.

        Args:
            obs: Packet observation. Its ``time``, ``ip_size``,
                ``payload_size``, ``tcpflags``, ``protocol``, ``version``
                and ``direction`` attributes are read; ``ip_packet`` is
                read only when ``protocol == 6`` and ``version == 4``.
            raw_feature_matrix: Array indexed as ``[row, column]``; it is
                modified in place.
            counter: Index of the row to fill. Defaults to 0.

        Returns:
            The same ``raw_feature_matrix`` object that was passed in.
        """
        raw_feature_matrix[counter, RMI.TIMESTAMP] = obs.time
        raw_feature_matrix[counter, RMI.IP_LEN] = obs.ip_size
        raw_feature_matrix[counter, RMI.TRANSP_PAYLOAD] = obs.payload_size

        # The flag values are concatenated into a digit string and parsed
        # as a base-2 integer.
        flag_bits = ''.join(map(str, obs.tcpflags))
        raw_feature_matrix[counter, RMI.TCP_FLAGS] = int(flag_bits, 2)

        # Protocol 6 with IP version 4 (presumably TCP over IPv4): re-parse
        # the IP packet with dpkt to read the window field of its payload.
        window_available = obs.protocol == 6 and obs.version == 4
        if window_available:
            ip_layer = dpkt.ip.IP(obs.ip_packet)
            raw_feature_matrix[counter, RMI.TCP_WINDOW] = ip_layer.data.win

        raw_feature_matrix[counter, RMI.IP_PROTO] = obs.protocol

        # Direction 0 is recorded as 1 in the IS_CLIENT column, any other
        # direction as 0.
        if obs.direction == 0:
            client_flag = 1
        else:
            client_flag = 0
        raw_feature_matrix[counter, RMI.IS_CLIENT] = client_flag

        return raw_feature_matrix

    def on_init(self, obs):
        """Allocate a zero matrix and fill its first row from ``obs``.

        The matrix has ``settings.PACKET_LIMIT_PER_FLOW`` rows and 7
        columns (presumably one per ``RMI`` index -- confirm against
        ``feature_processing.RMI``).

        Returns:
            The newly created matrix with row 0 filled in.
        """
        matrix_shape = (settings.PACKET_LIMIT_PER_FLOW, 7)
        return self._fill_flow_stats(obs, np.zeros(matrix_shape))