Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_str_handles_different_types_of_stored_operations(self):
tm = TimeMachine()
def normal_function():
pass
partial_function = partial(str, 1)
tm(do=normal_function, undo=partial_function)
assert tm.__str__().split('\n')[2:-1] == ["undo: " + str(str) + " (1,) {}", 'redo: normal_function']
def _simulate(self, reactions, method, model):
tm = TimeMachine()
for reaction in reactions:
tm(do=partial(setattr, reaction, 'lower_bound', 0),
undo=partial(setattr, reaction, 'lower_bound', reaction.lower_bound))
tm(do=partial(setattr, reaction, 'upper_bound', 0),
undo=partial(setattr, reaction, 'upper_bound', reaction.upper_bound))
try:
solution = method(model)
except Exception as e:
logger.exception(e)
tm.reset()
return solution
def cycle_free_flux(model, fluxes, fix=[]):
tm = TimeMachine()
exchange_reactions = model.exchanges
exchange_ids = [exchange.id for exchange in exchange_reactions]
internal_reactions = [reaction for reaction in model.reactions if reaction.id not in exchange_ids]
for exchange in exchange_reactions:
exchange_flux = fluxes[exchange.id]
tm(do=partial(setattr, exchange, 'lower_bound', exchange_flux),
undo=partial(setattr, exchange, 'lower_bound', exchange.lower_bound))
tm(do=partial(setattr, exchange, 'upper_bound', exchange_flux),
undo=partial(setattr, exchange, 'upper_bound', exchange.upper_bound))
obj_terms = list()
for internal_reaction in internal_reactions:
internal_flux = fluxes[internal_reaction.id]
if internal_flux >= 0:
obj_terms.append(Mul._from_args([S.One, internal_reaction.variable]))
tm(do=partial(setattr, internal_reaction, 'lower_bound', 0),
undo=partial(setattr, internal_reaction, 'lower_bound', internal_reaction.lower_bound))
def expression_sensitivity_analysis(method, model, expression_profile, metrics, condition_exchanges=None,
condition_knockouts=None, growth_rates=None, growth_rates_std=None,
growth_reaction=None, **kwargs):
assert isinstance(model, Model)
assert isinstance(expression_profile, ExpressionProfile)
if condition_exchanges is None:
condition_exchanges = {}
if condition_knockouts is None:
condition_knockouts = {}
data_frames = [DataFrame(columns=["x", "y", "condition"]) for _ in metrics]
for condition in expression_profile.conditions:
with TimeMachine() as tm:
for exchange_condition, ex in six.iteritems(condition_exchanges):
if exchange_condition == condition:
tm(do=p(setattr, ex, "lower_bound", -10), undo=p(setattr, ex, "lower_bound", ex.lower_bound))
else:
tm(do=p(setattr, ex, "lower_bound", 0), undo=p(setattr, ex, "lower_bound", ex.lower_bound))
for knockout in condition_knockouts.get(condition, []):
model.reactions.get_by_id(knockout).knock_out(tm)
if growth_reaction is not None:
growth_rate = growth_rates[condition]
growth_std = growth_rates_std[condition]
if isinstance(growth_reaction, str):
growth_reaction = model.reactions.get_by_id(growth_reaction)
tm(do=p(setattr, growth_reaction, "lower_bound", growth_rate-growth_std),
def essential_genes(self, threshold=1e-6):
essential = []
time_machine = TimeMachine()
try:
solution = self.solve()
except SolveError as e:
print 'Cannot determine essential genes for unoptimal model.'
raise e
genes_to_check = set()
for reaction_id, flux in solution.x_dict.iteritems():
if abs(flux) > 0:
genes_to_check.update(self.reactions.get_by_id(reaction_id).genes)
for gene in genes_to_check:
reactions = find_gene_knockout_reactions(self, [gene])
for reaction in reactions:
time_machine(do=partial(setattr, reaction, 'lower_bound', 0),
undo=partial(setattr, reaction, 'lower_bound', reaction.lower_bound))
time_machine(do=partial(setattr, reaction, 'upper_bound', 0),
undo=partial(setattr, reaction, 'upper_bound', reaction.upper_bound))
def _simulate(self, reactions, method, model):
tm = TimeMachine()
for reaction in reactions:
tm(do=partial(setattr, reaction, 'lower_bound', 0),
undo=partial(setattr, reaction, 'lower_bound', reaction.lower_bound))
tm(do=partial(setattr, reaction, 'upper_bound', 0),
undo=partial(setattr, reaction, 'upper_bound', reaction.upper_bound))
try:
solution = method(model)
except Exception, e:
logger.exception(e)
tm.reset()
return solution
def __call__(self, population):
time_machine = TimeMachine()
return [self.evaluate_individual(i, time_machine) for i in population]
def __init__(self):
super(TimeMachine, self).__init__()
self.history = OrderedDict()
# the model is defined differently or some other normalizing
# reaction is chosen, we use the absolute value.
norm = abs(
self.reference_flux_ranges.at[self.normalize_ranges_by,
"lower_bound"]
)
if norm > non_zero_flux_threshold:
normalized_reference_intervals = reference_intervals / norm
else:
raise ValueError(
"The reaction that you have chosen for normalization '{}' "
"has zero flux in the reference state. Please choose another "
"one.".format(self.normalize_ranges_by)
)
with TimeMachine() as tm:
# Make sure that the design_space_model is initialized to its original state later
for variable in self.variables:
reaction = self.design_space_model.reactions.get_by_id(variable)
tm(do=int, undo=partial(setattr, reaction, 'lower_bound', reaction.lower_bound))
tm(do=int, undo=partial(setattr, reaction, 'upper_bound', reaction.upper_bound))
target_reaction = self.design_space_model.reactions.get_by_id(self.objective)
tm(do=int, undo=partial(setattr, target_reaction, 'lower_bound', target_reaction.lower_bound))
tm(do=int, undo=partial(setattr, target_reaction, 'upper_bound', target_reaction.upper_bound))
if view is None:
view = config.default_view
else:
view = view
self._init_search_grid(surface_only=surface_only, improvements_only=improvements_only)