Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'''
Reads a pcap specified by the file path and returns an array of the
computed model inputs
Args:
filepath: Path to pcap to compute features for
Returns:
features: Numpy 2D array containing features for each time bin
timestamp: datetime of the last observed packet
'''
# Read the capture into a feature array
X = []
timestamps = []
binned_sessions = sessionizer(
filepath, duration=self.duration, threshold_time=self.threshold_time)
self.sessions = binned_sessions
if len(binned_sessions) is 0:
return None, None, None, None, None
for session_dict in binned_sessions:
if session_dict is not None and len(session_dict) > 0:
if source_ip is None:
feature_list, source_ip, other_ips, capture_source_ip = extract_features(
session_dict
)
else:
feature_list, _, other_ips, capture_source_ip = extract_features(
session_dict,
capture_source=source_ip
# Collect feature vectors (X) and integer labels (y) from every pcap that
# get_pcap_paths returns for data_dir.
# NOTE(review): indentation is not preserved in this listing, so the loop
# nesting described in the comments below is inferred; confirm against the
# original file.
files = get_pcap_paths(data_dir)
# Go through all the files in the directory
logger.info('Found {0} pcap files to read.'.format(len(files)))
# 1-based position of the current file; only used in the progress message
count = 0
for filename in files:
count += 1
# Extract the label from the filename
name, label = get_true_label(filename, label_assignments)
# Labels are encoded in first-seen order: a label's integer id is its
# index in assigned_labels
if label not in assigned_labels:
assigned_labels.append(label)
# Progress message: name, on-disk size in bytes, label, and position
logger.info('Reading {0} ({1} bytes) as {2} ({3}/{4})'.format(
name, os.path.getsize(filename), label, count, len(files)))
# Bin the sessions with the specified time window
binned_sessions = sessionizer(
filename,
duration=duration
)
# Get the capture source from the binned sessions
capture_source = get_source(binned_sessions)
# For each of the session bins, compute the full feature vectors
# NOTE(review): session_dict is passed on without a None/empty check;
# confirm extract_features tolerates such bins.
for session_dict in binned_sessions:
# Only the first element of the returned 4-tuple is kept
features, _, _, _ = extract_features(
session_dict,
capture_source=capture_source
)
# Store the feature vector and the labels
X.append(features)
y.append(assigned_labels.index(label))