Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Featurize every capture in `files`: each session bin of a file contributes
# one feature vector to X and one integer label index to y.
# NOTE(review): this snippet's indentation was lost in extraction; the nesting
# below is reconstructed from the data flow -- confirm against the original.
for filename in files:
    count += 1
    # Extract the label from the filename
    name, label = get_true_label(filename, label_assignments)
    if label not in assigned_labels:
        assigned_labels.append(label)
    # The label is guaranteed to be in assigned_labels here, and the list is
    # only ever appended to, so its index is the same for every session bin
    # of this file: look it up once instead of once per bin.
    label_index = assigned_labels.index(label)

    logger.info('Reading {0} ({1} bytes) as {2} ({3}/{4})'.format(
        name, os.path.getsize(filename), label, count, len(files)))

    # Bin the sessions with the specified time window
    binned_sessions = sessionizer(
        filename,
        duration=duration
    )
    # Get the capture source from the binned sessions
    capture_source = get_source(binned_sessions)

    # For each of the session bins, compute the full feature vectors
    for session_dict in binned_sessions:
        # extract_features returns a 4-tuple; only the first item is used.
        features, _, _, _ = extract_features(
            session_dict,
            capture_source=capture_source
        )
        # Store the feature vector and the labels
        X.append(features)
        y.append(label_index)

# Update the labels to reflect the new assignments: labels seen in the data
# come first (in order of first appearance), followed by any entries of
# `labels` that were never seen.
# NOTE(review): placed after the loop because it depends only on the final
# assigned_labels; if the original computed it inside the loop, the value
# after a non-empty loop is the same -- confirm.
new_labels = assigned_labels + [
    known_label for known_label in labels
    if known_label not in assigned_labels
]
Args:
pcap_path: path to the packet capture to process into features
max_port: Maximum port to get features on (default to reading config)
Returns:
feature_vector: Vector containing the featurized representation
of the input pcap.
'''
address_type = 'MAC'
capture_ip_source = capture_source
# If the capture source isn't specified, default to the most used address
if capture_source is None:
capture_source = get_source(session_dict, address_type=address_type)
capture_ip_source = get_source(session_dict, address_type='IP')
# Initialize some counter variables
num_sport_init = [0]*max_port
num_dport_init = [0]*max_port
num_sport_rec = [0]*max_port
num_dport_rec = [0]*max_port
num_sessions_init = 0
num_external_init = 0
num_tcp_sess_init = 0
num_udp_sess_init = 0
num_icmp_sess_init = 0
num_sessions_rec = 0
num_external_rec = 0
)
preds = model.classify_representation(new_rep)
if label is not None:
preds = [(p[0], 0) for p in preds if p[0] != label]
preds.append((label, 1))
model_outputs[timestamp] = {
'classification': list(preds),
'representation': list(rep),
'mean representation': list(new_rep)
}
prev_rep, prev_time = new_rep, time
# Clean the sessions and merge them into a single session dict
# NOTE(review): this snippet's indentation was lost in extraction; the nesting
# below is reconstructed from the data flow -- confirm against the original.
session_rep_pairs = []
source = get_source(sessions, address_type='IP')
for session_dict in sessions:
    for key, value in session_dict.items():
        session_info = featurize_session(key, value, source=source)
        # presumably value[0][0] is the datetime of the session's first
        # packet; it must expose .timestamp() -- verify against the caller
        first_time = value[0][0].timestamp()

        # Keep the last entry of `timestamps` whose epoch time is strictly
        # earlier than the session's first time.
        # NOTE(review): the local name `time` shadows the stdlib module of
        # the same name if it is imported; it is left unchanged here because
        # code outside this snippet may read it.
        prior_time = None
        for timestamp in timestamps:
            time = timestamp.timestamp()
            if first_time > time:
                prior_time = timestamp
        # No entry precedes the session: fall back to the first timestamp.
        # `is None` is the identity test; `== None` can be hijacked by a
        # custom __eq__ on the compared object.
        if prior_time is None:
            prior_time = timestamps[0]
pair = {
'model outputs': model_outputs[prior_time],
'session info': session_info,
Args:
pcap_path: path to the packet capture to process into features
max_port: Maximum port to get features on (default to reading config)
Returns:
feature_vector: Vector containing the featurized representation
of the input pcap.
'''
address_type = 'MAC'
capture_ip_source = capture_source
# If the capture source isn't specified, default to the most used address
if capture_source is None:
capture_source = get_source(session_dict, address_type=address_type)
capture_ip_source = get_source(session_dict, address_type='IP')
# Initialize some counter variables
num_sport_init = [0]*max_port
num_dport_init = [0]*max_port
num_sport_rec = [0]*max_port
num_dport_rec = [0]*max_port
num_sessions_init = 0
num_external_init = 0
num_tcp_sess_init = 0
num_udp_sess_init = 0
num_icmp_sess_init = 0
num_sessions_rec = 0
num_external_rec = 0
num_tcp_sess_rec = 0