elif split.original_log is not None and (not Split.objects.filter(
        type=SplitTypes.SPLIT_DOUBLE.value,
        original_log=split.original_log,
        test_size=split.test_size,
        splitting_method=split.splitting_method
).exists() or split.splitting_method == SplitOrderingMethods.SPLIT_RANDOM.value):
    # Split the single original log; cache the result as a double split unless the ordering is random
    training_log, test_log = _split_single_log(split)
    additional_columns = get_additional_columns(get_log(split.original_log))
    if split.splitting_method != SplitOrderingMethods.SPLIT_RANDOM.value:
        _ = Split.objects.get_or_create(
            type=SplitTypes.SPLIT_DOUBLE.value,
            original_log=split.original_log,
            test_size=split.test_size,
            splitting_method=split.splitting_method,
            train_log=create_log(EventLog(training_log), '0-' + str(100 - int(split.test_size * 100)) + '.xes'),
            test_log=create_log(EventLog(test_log), str(100 - int(split.test_size * 100)) + '-100.xes'),
            additional_columns=split.additional_columns
        )[0]
    logger.info("\t\tLoaded single log from {}".format(split.original_log.path))
else:
    # A train/test pair already exists: load both logs from disk
    # Have to use sklearn to convert some internal data types
    training_log = get_log(split.train_log)
    additional_columns = get_additional_columns(training_log)
    if split.additional_columns is None:
        split.additional_columns = split.train_log.name + split.test_log.name + '_ac.xes'
        split.save()
    training_log, train_log_to_append = train_test_split(training_log, test_size=0, shuffle=False)
    test_log, test_log_to_append = train_test_split(get_log(split.test_log), test_size=0, shuffle=False)
    logger.info("\t\tLoaded double logs from {} and {}.".format(split.train_log.path, split.test_log.path))
if len(training_log) == 0:
    raise TypeError("Training log is empty. Create a new Split with better parameters")
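# Illustration only (not part of the project code): how the percentage-based cache-file
# names above follow from `split.test_size`. For test_size = 0.2 the cut point is
# 100 - int(0.2 * 100) = 80, giving '0-80.xes' and '80-100.xes'.
def _percentage_split_names(test_size: float) -> tuple:
    cut = 100 - int(test_size * 100)
    return '0-' + str(cut) + '.xes', str(cut) + '-100.xes'

assert _percentage_split_names(0.2) == ('0-80.xes', '80-100.xes')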
def replay_prediction_calculate(job: Job, log) -> (dict, dict):
    """Calculates the prediction for a log coming from the replayer

    :param job: job configuration
    :param log: log model
    :return: runtime results
    """
    additional_columns = get_additional_columns(log)
    data_df, _ = train_test_split(log, test_size=0, shuffle=False)
    data_df, _ = encode_label_logs(data_df, EventLog(), job, additional_columns)
    results = MODEL[job.predictive_model.predictive_model][ModelActions.PREDICT.value](job, data_df)
    logger.info("End {} job {}, {}. Results {}".format('runtime', job.predictive_model.predictive_model, get_run(job), results))
    results_dict = dict(zip(data_df['trace_id'], list(map(int, results))))
    events_for_trace = dict()
    data_encoder_decoder(job, data_df, EventLog())
    return results_dict, events_for_trace
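# Illustration only: the trace_id -> prediction mapping that replay_prediction_calculate
# builds with dict(zip(...)), shown here with hypothetical values.
predictions = [0.0, 1.0, 1.0]
trace_ids = ['trace_1', 'trace_2', 'trace_3']
results_dict = dict(zip(trace_ids, list(map(int, predictions))))
# results_dict == {'trace_1': 0, 'trace_2': 1, 'trace_3': 1}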
search_for_already_existing_split = Split.objects.filter(
    type=SplitTypes.SPLIT_DOUBLE.value,
    original_log=job.split.original_log,
    test_size=job.split.test_size,
    splitting_method=job.split.splitting_method
)
if len(search_for_already_existing_split) >= 1:
    # An equivalent double split is already stored: reuse it and retry
    job.split = search_for_already_existing_split[0]
    job.split.save()
    job.save()
    return get_encoded_logs(job, use_cache=use_cache)
else:
    # Otherwise duplicate the current split row and attach the freshly created logs
    job.split = duplicate_orm_row(Split.objects.filter(pk=job.split.pk)[0])
    job.split.type = SplitTypes.SPLIT_DOUBLE.value
    train_name = 'SPLITTED_' + job.split.original_log.name.split('.')[0] + '_0-' + str(int(100 - (job.split.test_size * 100)))
    job.split.train_log = create_log(
        EventLog(training_log),
        train_name + '.xes'
    )
    test_name = 'SPLITTED_' + job.split.original_log.name.split('.')[0] + '_' + str(int(100 - (job.split.test_size * 100))) + '-100'
    job.split.test_log = create_log(
        EventLog(test_log),
        test_name + '.xes'
    )
    job.split.additional_columns = str(train_name + test_name)  # TODO: find better naming policy
    job.split.save()
put_loaded_logs(job.split, training_log, test_log, additional_columns)
training_df, test_df = encode_label_logs(
    training_log,
    test_log,
    job,
    additional_columns=additional_columns)
put_labelled_logs(job, training_df, test_df)
else:
    # Without the cache: split the log and encode the training/test DataFrames directly
    training_log, test_log, additional_columns = get_train_test_log(job.split)
    training_df, test_df = encode_label_logs(training_log, test_log, job, additional_columns=additional_columns)
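# Sketch of what a helper like `duplicate_orm_row` above typically does in Django
# (an assumption; the project's own helper may differ): clearing the primary key
# makes the next save() insert a brand-new row with the same field values.
def duplicate_orm_row_sketch(instance):
    instance.pk = None
    instance.save()
    return instance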
def declare_encoding(log, labelling, encoding, additional_columns, cols=None):  # TODO JONAS
    filter_t = True
    print("Filter_t", filter_t)
    templates = template_sizes.keys()
    constraint_threshold = 0.1
    candidate_threshold = 0.1

    # Apply the prefix: keep only the first `encoding.prefix_length` events of each trace
    log = [Trace(trace[:encoding.prefix_length], attributes=trace.attributes) for trace in log]

    # Read into a suitable data structure
    transformed_log = xes_to_positional(log)
    labels = {trace.attributes['concept:name']: trace.attributes['label'] for trace in log}

    # Extract the unique activities from the log
    events_set = {event_label for tid in transformed_log for event_label in transformed_log[tid]}

    # Brute-force all possible candidates
    if cols is None:
        candidates = [(event,) for event in events_set] + [(e1, e2) for e1 in events_set for e2 in events_set if e1 != e2]
    else:
        candidates = list({
            make_tuple(c.split(':')[1]) if len(c.split(':')) > 1 else c
            for c in cols
            if c not in ['label', 'trace_id']
        })
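# Worked example (illustration only) of the brute-force candidate generation above:
# with two activities the unary and ordered binary candidates are
events_set = {'A', 'B'}
candidates = [(event,) for event in events_set] + [(e1, e2) for e1 in events_set for e2 in events_set if e1 != e2]
# e.g. [('A',), ('B',), ('A', 'B'), ('B', 'A')] (the order of the unary part depends on set iteration)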
def replay_prediction(replay_job: Job, training_initial_job: Job, trace_id) -> list:
    """Creates a set with the timestamps of the events, then creates a list of requests
    simulating the log as time passes

    :param trace_id: id of the trace to replay
    :param replay_job: job configuration
    :param training_initial_job: job configuration of the original training job
    :return: list of requests
    """
    split = replay_job.split
    log = get_log(split.train_log)
    requests_list = list()
    eventlog = EventLog()
    trace = log[int(trace_id)]
    for key in log.attributes.keys():
        eventlog.attributes[key] = log.attributes[key]
    for index in range(len(trace)):
        # Build a prefix of the trace containing its first `index` events
        new_trace = Trace(trace[0:index])
        for key in trace.attributes:
            new_trace.attributes[key] = trace.attributes[key]
        eventlog.append(new_trace)
    replay_job.case_id = trace_id
    replay_job.event_number = len(trace)
    replay_job.save()
    try:
        logger.error("Sending request for replay_prediction task.")
        r = requests.post(
            url="http://127.0.0.1:8000/runtime/replay_prediction/",
            data=export_log_as_string(eventlog),
            params={'jobId': replay_job.id, 'training_job': training_initial_job.id},
            headers={'Content-Type': 'text/plain', 'charset': 'UTF-8'}
        )
        requests_list.append(str(r))
    except Exception as e:
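# Illustration only: the prefix loop above uses range(len(trace)), so a trace with
# three (hypothetical) events produces the prefixes of length 0, 1 and 2; the full
# trace itself is never appended.
events = ['register', 'check', 'decide']
prefixes = [events[0:index] for index in range(len(events))]
# prefixes == [[], ['register'], ['register', 'check']]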
def create_log(log, name: str, folder='cache/log_cache/', import_in_cache=True):
    logger.info('\tCreating new file (' + name + ') in memory')
    if import_in_cache:
        name = create_unique_name(name)
    path = folder + name
    if import_in_cache:
        if isinstance(log, EventLog):
            export_log[pathlib.Path(name).suffixes[0]](log, path)
        else:
            default_storage.save(path, ContentFile(log.read()))
            log = import_log[pathlib.Path(name).suffixes[0]](path)
    else:  # TODO: this might be risky
        if not isinstance(log, EventLog):
            log = import_log[pathlib.Path(name).suffixes[0]](path)
    properties = create_properties(log)
    return Log.objects.create(name=name, path=path, properties=properties)
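# Illustration only: export_log / import_log above are mappings keyed by the file
# suffix, so the name passed to create_log must carry a recognised extension
# (the exact keys of those mappings are an assumption here).
import pathlib
suffix = pathlib.Path('financial_log.xes').suffixes[0]
# suffix == '.xes', the key used to pick the matching exporter/importer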