Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'''
# Read the capture into a feature array
X = []
timestamps = []
binned_sessions = sessionizer(
filepath, duration=self.duration, threshold_time=self.threshold_time)
self.sessions = binned_sessions
if len(binned_sessions) is 0:
return None, None, None, None, None
for session_dict in binned_sessions:
if session_dict is not None and len(session_dict) > 0:
if source_ip is None:
feature_list, source_ip, other_ips, capture_source_ip = extract_features(
session_dict
)
else:
feature_list, _, other_ips, capture_source_ip = extract_features(
session_dict,
capture_source=source_ip
)
X.append(feature_list)
last_packet = list(session_dict.items())[-1]
timestamps.append(last_packet[1][0][0])
if len(X) == 0:
return None, None, None, None, None
full_features = np.stack(X)
# NOTE(review): this span is a fragment; the enclosing function (and any
# loop over `files`) is not visible here. `label`, `name`, `filename`,
# `count`, `files`, `duration`, `labels`, `X`, `y` and `assigned_labels`
# are expected to come from that enclosing scope -- confirm.
# Register a label the first time it is seen, so that its position in
# assigned_labels can be stored in y further down.
if label not in assigned_labels:
assigned_labels.append(label)
# Log the capture's name, its size on disk, its label and the progress
# counter (count out of len(files)).
logger.info('Reading {0} ({1} bytes) as {2} ({3}/{4})'.format(
name, os.path.getsize(filename), label, count, len(files)))
# Bin the sessions with the specified time window
binned_sessions = sessionizer(
filename,
duration=duration
)
# Get the capture source from the binned sessions
capture_source = get_source(binned_sessions)
# For each of the session bins, compute the full feature vectors
for session_dict in binned_sessions:
# Only the first of the four returned values is kept.
features, _, _, _ = extract_features(
session_dict,
capture_source=capture_source
)
# Store the feature vector and the labels
# (the label is stored as its index in assigned_labels)
X.append(features)
y.append(assigned_labels.index(label))
# Update the labels to reflect the new assignments:
# assigned labels first, then any entries of `labels` not yet assigned.
new_labels = assigned_labels + \
[l for l in labels if l not in assigned_labels]
# Return the stacked features, stacked label indices and the label list.
try:
return np.stack(X), np.stack(y), new_labels
# On failure the error is only logged; no return statement is visible in
# this handler.
except Exception as e: # pragma: no cover
logger.error('Failed because {0}'.format(str(e)))
timestamps = []
binned_sessions = sessionizer(
filepath, duration=self.duration, threshold_time=self.threshold_time)
self.sessions = binned_sessions
if len(binned_sessions) is 0:
return None, None, None, None, None
for session_dict in binned_sessions:
if session_dict is not None and len(session_dict) > 0:
if source_ip is None:
feature_list, source_ip, other_ips, capture_source_ip = extract_features(
session_dict
)
else:
feature_list, _, other_ips, capture_source_ip = extract_features(
session_dict,
capture_source=source_ip
)
X.append(feature_list)
last_packet = list(session_dict.items())[-1]
timestamps.append(last_packet[1][0][0])
if len(X) == 0:
return None, None, None, None, None
full_features = np.stack(X)
# Mean normalize the features
try:
full_features -= np.expand_dims(self.means, 0)
full_features /= np.expand_dims(self.stds, 0)