Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
model_path=self.args.trained_model)
## Check whether operation is evaluation, train, or test
## Evaluation returns predictions that are useful for the deployment
## of networkml in an operational environment.
if self.args.operation == 'eval':
self.load_model()
if (self.args.algorithm == 'onelayer' or self.args.algorithm == 'randomforest'):
base_alg = create_base_alg()
base_alg.eval(self.args.algorithm)
## SOS refers to statistical outlier selection model
elif self.args.algorithm == 'sos':
from networkml.algorithms.sos.eval_SoSModel import eval_pcap
eval_pcap(self.args.path, self.conf_labels, self.time_const)
## Train entails training a new model on specific packet captures
elif self.args.operation == 'train':
## Check for model type specified
## onelayer refers to a one-layer neural network
if self.args.algorithm == 'onelayer':
m = MLPClassifier(
(self.state_size),
alpha=0.1,
activation='relu',
max_iter=1000
)
base_alg = create_base_alg()
base_alg.train(self.args.path, self.args.save, m, self.args.algorithm)
source_address=source_mac
)
clean_sessions.append(cleaned_sessions)
if source_mac is None:
source_mac = inferred_mac
## Make simple decisions based on vector differences and update
## times
timestamp = timestamps[0].timestamp()
labels, confs = zip(*preds)
abnormality = 0.0
if self.has_avx():
from networkml.algorithms.sos.eval_SoSModel import eval_pcap
try:
abnormality = eval_pcap(
str(fi), self.conf_labels, self.time_const, label=labels[0],
rnn_size=self.rnn_size, model_path=self.model_path, model_type=algorithm)
except ValueError:
self.logger.warning("Can't run abnormality detection because not a big enough sample size")
else:
self.logger.warning(
"Can't run abnormality detection because this CPU doesn't support AVX")
prev_s = self.common.get_address_info(
source_mac,
timestamp
)
decision = self.common.basic_decision(
pcap_key,
source_mac,
prev_s,
inferred_mac = None
for session_dict in sessions:
cleaned_sessions, inferred_mac = \
clean_session_dict(
session_dict,
source_address=source_mac
)
clean_sessions.append(cleaned_sessions)
if source_mac is None:
source_mac = inferred_mac
# Make simple decisions based on vector differences and update times
timestamp = timestamps[0].timestamp()
labels, confs = zip(*preds)
abnormality = eval_pcap(
str(fi), self.conf_labels, self.time_const, label=labels[0],
rnn_size=self.rnn_size, model_path=self.model_path, model_type='randomforest')
prev_s = self.common.get_address_info(
source_mac,
timestamp
)
decision = self.common.basic_decision(
key,
source_mac,
prev_s,
timestamp,
labels,
confs,
abnormality
)
decision[source_mac]['source_ip'] = capture_ip_source