Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Fix the value of the current objective to a fraction of is maximum.
Returns
-------
FluxDistributionResult
Contains the result of the linear solver.
References
----------
.. [1] Lewis, N. E., Hixson, K. K., Conrad, T. M., Lerman, J. A., Charusanti, P., Polpitiya, A. D., …
Palsson, B. Ø. (2010). Omic data from evolved E. coli are consistent with computed optimal growth from
genome-scale models. Molecular Systems Biology, 6, 390. doi:10.1038/msb.2010.47
"""
solution = cobrapy_pfba(model, objective=objective, fraction_of_optimum=fraction_of_optimum, reactions=reactions)
return FluxDistributionResult.from_solution(solution)
name=constraint_id)
def update_lower_constraint(model, constraint, reaction, variable, flux_value, epsilon):
w_l = flux_value - delta * abs(flux_value) - epsilon
constraint._set_coefficients_low_level({variable: reaction.lower_bound - w_l})
constraint.lb = w_l
cache.add_constraint("c_%s_lower" % rid, create_lower_constraint, update_lower_constraint,
reaction, cache.variables["y_%s" % rid], flux_value, epsilon)
model.objective = model.solver.interface.Objective(add([mul([One, var]) for var in cache.variables.values()]),
direction='min')
try:
solution = model.optimize(raise_error=True)
if reactions is not None:
result = FluxDistributionResult({r: solution.get_primal_by_id(r) for r in reactions}, solution.f)
else:
result = FluxDistributionResult.from_solution(solution)
return result
except OptimizationError as e:
logger.error("room could not determine an optimal solution for objective %s" % model.objective)
raise e
except Exception as e:
cache.rollback()
raise e
finally:
if volatile:
cache.reset()
def __init__(self, fluxes, objective_value, *args, **kwargs):
super(FluxDistributionResult, self).__init__(*args, **kwargs)
self._fluxes = fluxes
self._objective_value = objective_value
import six
from numpy import nan, zeros
from pandas import DataFrame
from cameo.core.result import Result
from cameo.flux_analysis.simulation import FluxDistributionResult
from cobra import Reaction
def _compare_flux_distributions(flux_dist1, flux_dist2, self_key="A", other_key="B"):
assert isinstance(flux_dist1, FluxDistributionResult)
assert isinstance(flux_dist2, FluxDistributionResult)
return FluxDistributionDiff(flux_dist1, flux_dist2, self_key, other_key)
FluxDistributionResult.__sub__ = lambda self, other: _compare_flux_distributions(self, other)
class FluxBasedFluxDistribution(FluxDistributionResult):
def __init__(self, fluxes, objective_value, c13_flux_distribution, *args, **kwargs):
super(FluxBasedFluxDistribution, self).__init__(fluxes, objective_value, *args, **kwargs)
self._c13_fluxes = c13_flux_distribution
@property
def data_frame(self):
index = list(self.fluxes.keys())
data = zeros((len(self._fluxes.keys()), 3))
data[:, 0] = [self._fluxes[r] for r in index]
data[:, 1] = [self._c13_fluxes.get(r, [nan, nan])[0] for r in index]
data[:, 2] = [self._c13_fluxes.get(r, [nan, nan])[1] for r in index]
return DataFrame(data, index=index, columns=["fluxes", "c13_lower_limit", "c13_upper_limit"])
----------
model: cobra.Model
objective: a valid objective - see SolverBaseModel.objective (optional)
Returns
-------
FluxDistributionResult
Contains the result of the linear solver.
"""
with model:
if objective is not None:
model.objective = objective
solution = model.optimize(raise_error=True)
if reactions is not None:
result = FluxDistributionResult({r: solution[r] for r in reactions}, solution.objective_value)
else:
result = FluxDistributionResult.from_solution(solution)
return result
class FluxBasedFluxDistribution(FluxDistributionResult):
def __init__(self, fluxes, objective_value, c13_flux_distribution, *args, **kwargs):
super(FluxBasedFluxDistribution, self).__init__(fluxes, objective_value, *args, **kwargs)
self._c13_fluxes = c13_flux_distribution
@property
def data_frame(self):
index = list(self.fluxes.keys())
data = zeros((len(self._fluxes.keys()), 3))
data[:, 0] = [self._fluxes[r] for r in index]
data[:, 1] = [self._c13_fluxes.get(r, [nan, nan])[0] for r in index]
data[:, 2] = [self._c13_fluxes.get(r, [nan, nan])[1] for r in index]
return DataFrame(data, index=index, columns=["fluxes", "c13_lower_limit", "c13_upper_limit"])
class ExpressionBasedResult(FluxDistributionResult):
def __init__(self, fluxes, objective_value, expression, *args, **kwargs):
super(ExpressionBasedResult, self).__init__(fluxes, objective_value, *args, **kwargs)
self.expression = expression
class GimmeResult(ExpressionBasedResult):
def __init__(self, fluxes, objective_value, fba_fluxes, reaction_expression, cutoff, *args, **kwargs):
super(GimmeResult, self).__init__(fluxes, objective_value, reaction_expression, *args, **kwargs)
self._fba_fluxes = fba_fluxes
self.cutoff = cutoff
@property
def data_frame(self):
index = list(self.fluxes.keys())
data = zeros((len(self._fluxes.keys()), 4))
data[:, 0] = [self._fluxes[r] for r in index]
return constraint
cache.add_constraint("c_%s_lb" % rid, create_lower_constraint, update_lower_constraint,
cache.variables[neg_var_id], reaction, flux_value)
def create_objective(model, variables):
return model.solver.interface.Objective(add([mul((One, var)) for var in variables]),
direction="min",
sloppy=False)
cache.add_objective(create_objective, None, cache.variables.values())
try:
solution = model.optimize(raise_error=True)
if reactions is not None:
result = FluxDistributionResult({r: solution.get_primal_by_id(r) for r in reactions}, solution.f)
else:
result = FluxDistributionResult.from_solution(solution)
return result
except OptimizationError as e:
raise e
except Exception as e:
cache.rollback()
raise e
finally:
if volatile:
cache.reset()
def _compare_flux_distributions(flux_dist1, flux_dist2, self_key="A", other_key="B"):
assert isinstance(flux_dist1, FluxDistributionResult)
assert isinstance(flux_dist2, FluxDistributionResult)
return FluxDistributionDiff(flux_dist1, flux_dist2, self_key, other_key)
def __check_valid_reference(reference):
if not isinstance(reference, (dict, SolutionBase, FluxDistributionResult)):
raise ValueError('%s is not a valid reference flux distribution.'
'Needs to be either a dict or Solution or FluxDistributionResult.')