Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Prepare the per-test fixture.

    Creates a new ``Interpreter``, keeps a direct reference to its symbol
    table, then calls the ``set_stdout`` and ``set_stderr`` helpers.
    """
    interp = Interpreter()
    self.interp = interp
    self.symtable = interp.symtable
    self.set_stdout()
    self.set_stderr()
def _get_existing_symbols():
    """Return, as a set, the entries yielded by iterating a fresh
    ``asteval.Interpreter``'s symbol table."""
    return set(asteval.Interpreter().symtable)
def get_interpreter(self) -> Interpreter:
""" Use the activity key to determine which symbols are added
to the formula interpreter.

Looks up an ``ActivityParameter`` using ``self.key[0]`` as the database
and ``self.key[1]`` as the code, updates the symbol table of a new
``Interpreter`` with the results of the ``static()`` calls below, and
returns that interpreter.

NOTE(review): indentation was lost in this listing, so it is not clear
whether the ProjectParameter/DatabaseParameter updates belong to the
``else`` branch or run unconditionally -- confirm against the original.
"""
interpreter = Interpreter()
# get_or_none: a falsy result takes the else branch below.
act = ActivityParameter.get_or_none(database=self.key[0], code=self.key[1])
if act:
# An activity parameter exists: load the symbols of its group.
interpreter.symtable.update(ActivityParameter.static(act.group, full=True))
else:
# Nothing found for this key; only a message is printed here.
print("No parameter found for {}, creating one on formula save".format(self.key))
interpreter.symtable.update(ProjectParameter.static())
interpreter.symtable.update(DatabaseParameter.static(self.key[0]))
return interpreter
def evaluate_parameter_sets(self):
"""
This takes the parameter sets of the model instance and evaluates any formulas using the parameter values to create a
fixed, full set of parameters for each parameter set in the model

NOTE(review): this listing appears to be truncated -- ``evaluated_parameter_sets``
is created but never filled or returned in the visible lines, and the original
indentation has been lost. Confirm against the full source.
"""
evaluated_parameter_sets = OrderedDict()
for ps_name, ps in self.parameter_sets.items():
#print(ps_name)
# Values resolved so far for this parameter set.
this_ps = {}
# A new interpreter for each parameter set.
aeval = Interpreter()
# Merge the three parameter dicts; on duplicate keys the later update wins.
all_params = dict(self.params)
all_params.update(dict(self.normalised_params))
all_params.update(dict(self.production_params))
# Keys are visited in the order given by self.order.
for key in self.order:
if key in self.global_params:
# Global parameter: write its 'default' into the set when the set has no value for it.
if key not in ps.keys():
ps[key] = self.global_params[key]['default']
# Make the value available both to later formulas and in this set's results.
aeval.symtable[key] = this_ps[key] = ps[key]
elif all_params[key].get('function'):
# Parameter with a truthy 'function' entry: evaluate it with the interpreter.
value = aeval(all_params[key]['function'])
aeval.symtable[key] = this_ps[key] = value
def recalculate(dataset):
"""Recalculate parameterized relationships within a dataset.
Modifies values in place.
Creates a ``TolerantParameterSet`` from the named parameters extracted from the dataset and evaluates it. The evaluated named parameters are then stored in the symbol table of an ``Interpreter``, which is used to evaluate all other formulas in the dataset.
Formulas that divide by zero are evaluated to zero.
Entries with neither a ``formula`` nor a ``variable`` raise ``ValueError``.
Returns the modified dataset (NOTE(review): the return statement is not visible in this excerpt, and the original indentation has been lost -- confirm against the full source)."""
interpreter = Interpreter()
parameter_set = TolerantParameterSet(extract_named_parameters(dataset))
# Copy every evaluated named parameter into the interpreter's symbol table.
for key, value in parameter_set.evaluate().items():
interpreter.symtable[key] = value
for exc in iterate_all_parameters(dataset):
if 'formula' in exc:
try:
exc['amount'] = interpreter(exc['formula'])
except ZeroDivisionError:
# Division by zero is deliberately mapped to an amount of zero.
exc['amount'] = 0
elif 'variable' in exc:
# A bare variable reference takes its value from the symbol table.
exc['amount'] = interpreter.symtable[exc['variable']]
else:
raise ValueError
# https://github.com/OcelotProject/Ocelot/issues/111
raise NotImplementedError("No file access in WITCH")
# ADDING A NEW WITCH API FUNCTION?
# This is the place to start. usersyms is what is exposed to WITCH programmers. Be mindful
# of what you expose here: it should only be functions that act as closures over sensitive
# objects.
#
# Not all of these are intended to be used directly; stuff like add_hears_handler is called
# from a convenience macro, (hears).
#
# Decide how you want to expose stuff to the user: can it just be a regular function defined
# here or is it a helper for a macro? If you are providing something that takes code from
# the user to be used as a callback, you're going to want to wrap it in a macro. If it's a
# simple function like (random-number), just define it as a function here.
self.script_engine = script_engine
self.interpreter = asteval.Interpreter(
use_numpy=False,
max_time=100000.0, # there's a bug with this and setting it arbitrarily high avoids it
usersyms=dict(
open=witch_open,
split_args=split_args,
random_number=random_number,
add_provides_handler=add_provides_handler,
add_hears_handler=add_hears_handler,
add_sees_handler=add_sees_handler,
set_data=set_data,
get_data=get_data,
says=says,
does=does,
set_permissions=set_permissions,
add_docstring=add_docstring,
witch_tell_sender=tell_sender,
def calcastevalformula(formula, x=None):
    """Evaluate a formula string with asteval and return the result.

    ``formula`` is an expression such as ``"9**8 + sin(19)"``. It may refer
    to a variable named ``x``; supply its value through the ``x`` argument,
    e.g. ``calcastevalformula("x**2 + 5", x=18)``.

    ``x`` is bound as a symbol in the interpreter instead of being spliced
    into the formula text. Textual substitution rewrote every letter "x",
    including those inside names such as ``exp`` or ``max``, and lost the
    sign of negative values under operators like ``**`` (``x**2`` with
    ``x=-3`` became ``-3**2``).

    A non-empty string ``x`` is evaluated as an expression first, so callers
    that passed the value as text keep working. ``None`` and the empty
    string leave ``x`` unbound.

    Returns whatever the asteval interpreter returns for ``formula``.
    """
    from asteval import Interpreter

    aeval = Interpreter()
    if isinstance(x, str):
        # Text used to be inserted as expression source; evaluate it so the
        # resulting value can be bound like any other.
        x = aeval(x) if x else None
    if x is not None:
        aeval.symtable['x'] = x
    return aeval(formula)
def _eval_context(self, vars):
    """Build an ``asteval.Interpreter`` (numpy disabled, ``writer=None``)
    whose symbol table is updated with ``vars``.

    ``__last_iteration`` is always set in the symbol table: it takes the
    value from ``vars`` when present and ``False`` otherwise.
    """
    context = asteval.Interpreter(use_numpy=False, writer=None)
    symbols = context.symtable
    symbols.update(vars)
    symbols['__last_iteration'] = vars.get("__last_iteration", False)
    return context
if type(dev_score) == list:
dev_scores.extend(dev_score)
else:
dev_scores.append(dev_score)
self.dev_loss_tracker.set_dev_score(dev_scores[0])
for dev_score in dev_scores[1:]:
self.dev_loss_tracker.add_aux_score(dev_score)
self.dev_loss_tracker.report()
# Control the learning schedule
if control_learning_schedule:
# Check if this is the best
is_best = False
if self.dev_combinator is not None:
x = [y.value() for y in dev_scores]
aevala = Interpreter(symtable={'x': x})
my_score = aevala(self.dev_combinator)
logger.info(' combined dev scores according to {}: {}'.format(self.dev_combinator, my_score))
if self.training_state.best_dev_score is None or my_score > self.training_state.best_dev_score:
self.training_state.best_dev_score = my_score
is_best = True
elif dev_scores[0].better_than(self.training_state.best_dev_score):
self.training_state.best_dev_score = dev_scores[0]
is_best = True
# If this is the best, write the model out
if is_best:
self.training_state.cur_attempt = 0
needs_saving = True
logger.info(f" best dev score, writing out model")
else:
needs_saving = False
# otherwise: learning rate decay / early stopping