Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_initializer(self):
def evaluation_function(x):
return 1
evaluator = EvaluatorWrapper(config.default_view, evaluation_function)
assert hasattr(evaluator, '__call__')
assert hasattr(evaluator, 'view')
assert hasattr(evaluator, 'evaluator')
assert evaluator.view == config.default_view
assert evaluator.evaluator == evaluation_function
def test_api():
mock_host = Host('core',
models=['e_coli_core'],
biomass=['BIOMASS_Ecoli_core_w_GAM'],
carbon_sources=['EX_glc__D_e'])
api.design.debug = True
pathways = api.design.predict_pathways(product=UNIVERSALMODEL.metabolites.ser__L_c, hosts=[mock_host],
database=UNIVERSALMODEL, aerobic=True)
optimization_reports = api.design.optimize_strains(pathways, config.default_view, aerobic=True)
pickle.loads(pickle.dumps(optimization_reports))
assert len(optimization_reports) > 0
def test_initializer(self):
def evaluation_function(x):
return 1
evaluator = EvaluatorWrapper(config.default_view, evaluation_function)
assert hasattr(evaluator, '__call__')
assert hasattr(evaluator, 'view')
assert hasattr(evaluator, 'evaluator')
assert evaluator.view == config.default_view
assert evaluator.evaluator == evaluation_function
def flux_variability_analysis(model, reactions=None, view=config.default_view):
"""Flux-variability analysis."""
if reactions is None:
reactions = model.reactions
reaction_chunks = (chunk for chunk in partition(reactions, len(view)))
func_obj = FvaFunctionObject(model)
chunky_results = view.map(func_obj, reaction_chunks)
# chunky_results = view.map(lambda reactions, model=model:_flux_variability_analysis(model, reactions), reaction_chunks)
solution = {}
for dictionary in chunky_results:
solution.update(dictionary)
return solution
"has zero flux in the reference state. Please choose another "
"one.".format(self.normalize_ranges_by)
)
with TimeMachine() as tm:
# Make sure that the design_space_model is initialized to its original state later
for variable in self.variables:
reaction = self.design_space_model.reactions.get_by_id(variable)
tm(do=int, undo=partial(setattr, reaction, 'lower_bound', reaction.lower_bound))
tm(do=int, undo=partial(setattr, reaction, 'upper_bound', reaction.upper_bound))
target_reaction = self.design_space_model.reactions.get_by_id(self.objective)
tm(do=int, undo=partial(setattr, target_reaction, 'lower_bound', target_reaction.lower_bound))
tm(do=int, undo=partial(setattr, target_reaction, 'upper_bound', target_reaction.upper_bound))
if view is None:
view = config.default_view
else:
view = view
self._init_search_grid(surface_only=surface_only, improvements_only=improvements_only)
func_obj = _DifferentialFvaEvaluator(
self.design_space_model,
self.variables,
self.objective,
self.included_reactions
)
if progress:
progress = ProgressBar(len(self.grid))
results = list(progress(view.imap(func_obj, self.grid.iterrows())))
else:
results = list(view.map(func_obj, self.grid.iterrows()))
def run(self, view=config.default_view, maximize=False, **kwargs):
res = self._heuristic_method.evolve(generator=self._generator,
maximize=maximize,
representation=self.candidates,
bounder=zero_one_bounder,
evaluator=self._evaluator,
binary=self.binary,
view=view,
**kwargs)
return res
def __call__(self, product='L-glutamate',
hosts=HOSTS,
database=None,
aerobic=True,
view=config.default_view):
"""The works.
The following workflow will be followed to determine suitable
metabolic engineering strategies for a desired product:
- Determine production pathways for desired product and host organisms.
Try a list of default hosts if no hosts are specified.
- Determine maximum theoretical yields and production envelopes for
all routes.
- Determine if production routes can be coupled to growth.
- Determine over-expression, down-regulation, and KO targets.
Parameters
----------
product : str or Metabolite
The desired product.
If not None, fix the total sum of reaction fluxes to its minimum times a factor. Expected to be within [
1;inf]. Higher factors increase flux variability to a certain point since the bound for the objective is
still fixed.
remove_cycles : bool
If true, apply the CycleFreeFlux algorithm to remove loops from each simulated flux distribution.
view: cameo.parallel.SequentialView or cameo.parallel.MultiprocessingView or ipython.cluster.DirectView
A parallelization view.
Returns
-------
pandas.DataFrame
Pandas DataFrame containing the results of the flux variability analysis.
"""
if view is None:
view = config.default_view
if reactions is None:
reactions = model.reactions
with model:
if fraction_of_optimum > 0.:
fix_objective_as_constraint(model, fraction=fraction_of_optimum)
if pfba_factor is not None:
# don't add the objective-constraint again so fraction_of_optimum=0
fix_pfba_as_constraint(model, multiplier=pfba_factor, fraction_of_optimum=0)
reaction_chunks = (chunk for chunk in partition(reactions, len(view)))
if remove_cycles:
func_obj = _FvaFunctionObject(model, _cycle_free_fva)
else:
func_obj = _FvaFunctionObject(model, _flux_variability_analysis)
chunky_results = view.map(func_obj, reaction_chunks)
solution = pandas.concat(chunky_results)
def run(self, view=config.default_view, **kwargs):
return self.heuristic_method.evolve(
generator=self._generator,
maximize=True,
view=view,
evaluator=self._evaluator,
**kwargs)
def run(self, view=config.default_view, **run_kwargs):
run_kwargs['view'] = parallel.SequentialView()
runner = MultiprocessRunner(self._island_class, self._init_kwargs(), self.migrator, run_kwargs)
clients = [[o.clients[i] for o in self.observers] for i in xrange(len(view))]
results = view.map(runner, clients)
return results