Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_unknown_protocols_without_timeouts(self):
    """Compare per-application totals of every capture with the stored ground truth.

    Every file returned by ``get_files_list("tests/pcap/")`` is streamed with
    idle and active timeouts of 31556952 (the "without timeouts" case this
    test is named after).  Flows whose application name is ``'Unknown'`` are
    ignored; the remaining flows are summed per application name into
    ``bytes``, ``flows`` and ``pkts``.  The aggregate must equal the entry
    of ``build_ground_truth_dict("tests/result/")`` keyed by the capture's
    file name.

    Raises:
        KeyError: if a capture has no entry in the ground-truth dictionary.
    """
    files = get_files_list("tests/pcap/")
    ground_truth_ndpi = build_ground_truth_dict("tests/result/")
    print("\n----------------------------------------------------------------------")
    print(".Testing on {} applications:".format(len(files)))
    ok_files = []
    ko_files = []
    for test_file in files:
        streamer_test = NFStreamer(source=test_file, idle_timeout=31556952, active_timeout=31556952)
        test_case_name = test_file.split('/')[-1]
        result = {}
        for flow in streamer_test:
            if flow.application_name == 'Unknown':
                continue
            # Start each application at zero so the first flow is counted
            # exactly like every later one.
            totals = result.setdefault(flow.application_name, {'bytes': 0, 'flows': 0, 'pkts': 0})
            totals['bytes'] += flow.bidirectional_raw_bytes
            totals['flows'] += 1
            totals['pkts'] += flow.bidirectional_packets
        if result == ground_truth_ndpi[test_case_name]:
            ok_files.append(test_case_name)
            print("{}\t: \033[94mOK\033[0m".format(test_case_name.ljust(60, ' ')))
        else:
            ko_files.append(test_case_name)
            print("{}\t: \033[31mKO\033[0m".format(test_case_name.ljust(60, ' ')))
    # Without this check the mismatches are only collected and the test can
    # never fail.
    self.assertEqual(ko_files, [], "captures not matching ground truth: {}".format(ko_files))
def test_flow_metadata_extraction(self):
    """Check the metadata fields of the first flow matching port 52066 in facebook.pcap.

    The capture is streamed with a BPF filter on source or destination port
    52066, and the first exported flow is checked for its ``client_info``,
    ``server_info``, ``j3a_client`` and ``j3a_server`` values.
    """
    print("\n----------------------------------------------------------------------")
    streamer_test = NFStreamer(source='tests/pcap/facebook.pcap', bpf_filter="src port 52066 or dst port 52066")
    flows = list(streamer_test)
    del streamer_test
    # Report an assertion failure, not an IndexError, if the filter matched nothing.
    self.assertGreater(len(flows), 0)
    first_flow = flows[0]
    # Adjacent literals keep the expected value independent of source
    # indentation; a backslash continuation inside the string would not.
    expected_server_info = (
        '*.facebook.com,*.facebook.net,*.fb.com,*.fbcdn.net,*.fbsbx.com,'
        '*.m.facebook.com,*.messenger.com,*.xx.fbcdn.net,*.xy.fbcdn.net,*.xz.fbcdn.net,facebook.com,fb.com,'
        'messenger.com'
    )
    self.assertEqual(first_flow.client_info, 'facebook.com')
    self.assertEqual(first_flow.server_info, expected_server_info)
    self.assertEqual(first_flow.j3a_client, 'bfcc1a3891601edb4f137ab7ab25b840')
    self.assertEqual(first_flow.j3a_server, '2d1eb5817ece335c24904f516ad5da12')
    print("{}\t: \033[94mOK\033[0m".format(".Testing metadata extraction".ljust(60, ' ')))
def test_custom_expiration(self):
    """Check that a plugin setting ``expiration_id`` to -1 yields two flows.

    The capture is filtered on port 52066 and streamed with a plugin that
    marks the entry once it has seen 10 bidirectional packets.  The first
    exported flow must carry ``expiration_id == -1`` and exactly two flows
    must be exported.
    """
    class custom_expire(NFPlugin):
        # When the entry reaches 10 bidirectional packets, set its
        # expiration_id to -1 and its custom_expire attribute to True.
        def on_update(self, obs, entry):
            if entry.bidirectional_packets != 10:
                return
            entry.expiration_id = -1
            entry.custom_expire = True

    print("\n----------------------------------------------------------------------")
    port_filter = "src port 52066 or dst port 52066"
    streamer_test = NFStreamer(
        source='tests/pcap/facebook.pcap',
        plugins=[custom_expire(volatile=True)],
        bpf_filter=port_filter,
    )
    rs = list(streamer_test)
    self.assertEqual(rs[0].expiration_id, -1)
    self.assertEqual(len(rs), 2)
    del streamer_test
    print("{}\t: \033[94mOK\033[0m".format(".Testing custom expiration".ljust(60, ' ')))
def test_to_pandas_anonymized(self):
    """Check the shape of the DataFrame built from ethereum.pcap with IP anonymization.

    The capture is streamed with idle and active timeouts of 31556952 and
    converted with ``to_pandas(ip_anonymization=True)``; the result must
    have 74 rows and 37 columns.
    """
    print("\n----------------------------------------------------------------------")
    streamer = NFStreamer(
        source='tests/pcap/ethereum.pcap',
        idle_timeout=31556952,
        active_timeout=31556952,
    )
    df = streamer.to_pandas(ip_anonymization=True)
    row_count = df.shape[0]
    self.assertEqual(row_count, 74)
    column_count = df.shape[1]
    self.assertEqual(column_count, 37)
    print("{}\t: \033[94mOK\033[0m".format(".Testing to Pandas ip_anonymization=True".ljust(60, ' ')))
def test_expiration_management(self):
    """Check that facebook.pcap yields 60 flows when ``active_timeout`` is 0."""
    print("\n----------------------------------------------------------------------")
    streamer_test = NFStreamer(source='tests/pcap/facebook.pcap', active_timeout=0)
    # Drain the streamer in one go; only the number of exported flows matters.
    flows = list(streamer_test)
    flow_count = len(flows)
    self.assertEqual(flow_count, 60)
    print("{}\t: \033[94mOK\033[0m".format(".Testing Streamer expiration management".ljust(60, ' ')))
def test_bpf_filter(self):
    """Check that the port-52066 BPF filter on facebook.pcap yields a single flow.

    Every exported flow is printed as-is, as a named tuple and as JSON, and
    must have ``src_port == 52066``.  Exactly one flow must be exported.
    """
    print("\n----------------------------------------------------------------------")
    port_filter = "src port 52066 or dst port 52066"
    streamer_test = NFStreamer(
        source='tests/pcap/facebook.pcap',
        statistics=True,
        bpf_filter=port_filter,
    )
    # Stays 0 when the streamer exports nothing, so the final check still runs.
    count = 0
    for count, flow in enumerate(streamer_test, start=1):
        print(flow)
        print(flow.to_namedtuple())
        print(flow.to_json())
        self.assertEqual(flow.src_port, 52066)
    self.assertEqual(count, 1)
    del streamer_test
    print("{}\t: \033[94mOK\033[0m".format(".Testing BPF filtering".ljust(60, ' ')))
def _init_streamer(source, online_mode=False):
    """Build the ``nfstream.NFStreamer`` used to read ``source``.

    The streamer is created with statistics disabled, guessing enabled, a
    non-volatile ``raw_packets_matrix`` plugin, an idle timeout of 60 and an
    active timeout of 10 in online mode or 10e5 otherwise.

    Args:
        source: value forwarded to ``NFStreamer(source=...)``.
        online_mode: selects the short (10) instead of the long (10e5)
            active timeout.

    Returns:
        The configured ``nfstream.NFStreamer`` instance.
    """
    # since we decide and set routing policy upon first occurrence of a flow we don't care about its re-export
    if online_mode:
        active_timeout = 10
    else:
        active_timeout = 10e5
    # NOTE(review): the mode-dependent value drives active_timeout while
    # idle_timeout is fixed at 60 -- confirm this split is intended.
    return nfstream.NFStreamer(
        source=source,
        statistics=False,
        idle_timeout=60,
        active_timeout=active_timeout,
        plugins=[raw_packets_matrix(volatile=False)],
        enable_guess=True,
    )
nfstream is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License
as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
nfstream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with nfstream.
If not, see <https://www.gnu.org/licenses/>.
"""
from nfstream import NFStreamer
import sys
# Source handed to NFStreamer, taken from the first command-line argument.
path = sys.argv[1]
flow_streamer = NFStreamer(source=path)
# Maps application name -> total bidirectional packets seen so far.
result = {}
try:
    for flow in flow_streamer:
        print(flow)
        app_name = flow.application_name
        result[app_name] = result.get(app_name, 0) + flow.bidirectional_packets
    print("Summary (Application Name: Packets):")
    print(result)
except KeyboardInterrupt:
    # Ctrl+C: still report what was accumulated before stopping.
    print("Summary (Application Name: Packets):")
    print(result)
    print("Terminated.")
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with nfstream.
If not, see <https://www.gnu.org/licenses/>.
"""
from nfstream import NFStreamer
import sys
import datetime
# Source handed to NFStreamer, taken from the first command-line argument;
# the CSV is written next to it with ".csv" appended to the same string.
path = sys.argv[1]
output_file_name = path + ".csv"
print("nfstream processing started. Use Ctrl+C to interrupt and save.")
start = datetime.datetime.now()
streamer = NFStreamer(source=path, statistics=True)
# The value returned by to_csv is reported below as the number of flows.
total_flows = streamer.to_csv(path=output_file_name)
end = datetime.datetime.now()
elapsed = end - start
print("\nnfstream processed {} flows and saved them in file: {}".format(total_flows, output_file_name))
print("Processing time: {}".format(elapsed))