Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_flux_variability_parallel(self, core_model):
    """Check FVA results computed through a ``MultiprocessingView(2)``.

    Two analyses share the same view:

    * a plain FVA (``remove_cycles=False``) whose result is compared with
      ``REFERENCE_FVA_SOLUTION_ECOLI_CORE``;
    * an FVA with ``fraction_of_optimum=1`` and ``pfba_factor=1`` whose summed
      absolute bounds are checked against 518.422.

    The model objective must be the same before and after the analyses.
    """
    original_objective = core_model.objective
    mp_view = MultiprocessingView(2)
    try:
        fva_solution = flux_variability_analysis(core_model, fraction_of_optimum=0.999999419892,
                                                 remove_cycles=False, view=mp_view)
        pfba_fva = flux_variability_analysis(core_model, fraction_of_optimum=1, pfba_factor=1,
                                             view=mp_view).data_frame
    finally:
        # Release the view even when one of the analyses raises, so a failing
        # run does not leave it open.
        mp_view.shutdown()
    assert_data_frames_equal(fva_solution, REFERENCE_FVA_SOLUTION_ECOLI_CORE)
    assert original_objective == core_model.objective
    # NOTE(review): these checks are one-sided -- they only fail when a sum
    # exceeds 518.423. Confirm whether a lower limit is intended as well.
    assert sum(abs(pfba_fva.lower_bound)) - 518.422 < .001
    assert sum(abs(pfba_fva.upper_bound)) - 518.422 < .001
def test_flux_variability_parallel(self, core_model):
    """Check FVA results computed through a ``MultiprocessingView(2)``.

    A plain FVA (``remove_cycles=False``) is compared with
    ``REFERENCE_FVA_SOLUTION_ECOLI_CORE``; a second FVA run with
    ``fraction_of_optimum=1`` and ``pfba_factor=1`` has its summed absolute
    bounds checked against 518.422. The model objective must be unchanged
    once both analyses have run.
    """
    original_objective = core_model.objective
    mp_view = MultiprocessingView(2)
    try:
        fva_solution = flux_variability_analysis(core_model, fraction_of_optimum=0.999999419892,
                                                 remove_cycles=False, view=mp_view)
        pfba_fva = flux_variability_analysis(core_model, fraction_of_optimum=1, pfba_factor=1,
                                             view=mp_view).data_frame
    finally:
        # Always shut the view down, including when an analysis raises.
        mp_view.shutdown()
    assert_data_frames_equal(fva_solution, REFERENCE_FVA_SOLUTION_ECOLI_CORE)
    assert original_objective == core_model.objective
    # NOTE(review): one-sided comparisons -- any sum below 518.423 passes.
    # Confirm that no lower limit is expected here.
    assert sum(abs(pfba_fva.lower_bound)) - 518.422 < .001
    assert sum(abs(pfba_fva.upper_bound)) - 518.422 < .001
def test_flux_variability_parallel_remove_cycles(self, core_model):
    """Check FVA with ``remove_cycles=True`` run through a ``MultiprocessingView``.

    The reference solution is expected to report an upper bound above 666 for
    ``FRD7``, while the cycle-free result must report zero (to 7 decimals).
    All other bounds are compared with the reference within 1e-4, skipping
    reference lower bounds at or below -666 and reference upper bounds at or
    above 666. The model objective must be unchanged afterwards.
    """
    original_objective = core_model.objective
    mp_view = MultiprocessingView()
    try:
        fva_solution = flux_variability_analysis(core_model, fraction_of_optimum=0.999999419892,
                                                 remove_cycles=True, view=mp_view)
    finally:
        # The view was previously created inline and never shut down; release
        # it explicitly, as the other parallel FVA tests do.
        mp_view.shutdown()
    assert REFERENCE_FVA_SOLUTION_ECOLI_CORE['upper_bound']['FRD7'] > 666.
    assert round(abs(fva_solution['upper_bound']['FRD7'] - 0.), 7) == 0
    for key in fva_solution.data_frame.index:
        reference_lower = REFERENCE_FVA_SOLUTION_ECOLI_CORE['lower_bound'][key]
        if reference_lower > -666:
            assert abs(fva_solution['lower_bound'][key] - reference_lower) < 0.0001
        reference_upper = REFERENCE_FVA_SOLUTION_ECOLI_CORE['upper_bound'][key]
        if reference_upper < 666:
            assert abs(fva_solution['upper_bound'][key] - reference_upper) < 0.0001
    assert original_objective == core_model.objective
def test_flux_variability_parallel_remove_cycles(self):
    """Check FVA with ``remove_cycles=True`` run through a ``MultiprocessingView``.

    Every bound of ``self.model`` is compared with
    ``REFERENCE_FVA_SOLUTION_ECOLI_CORE`` within a delta of 1e-6, skipping
    reference bounds whose absolute value is 999993 or larger.
    """
    mp_view = MultiprocessingView()
    try:
        fva_solution = flux_variability_analysis(self.model, remove_cycles=True, view=mp_view)
    finally:
        # The view was previously created inline and never shut down.
        mp_view.shutdown()
    for key in fva_solution.index:
        reference_lower = REFERENCE_FVA_SOLUTION_ECOLI_CORE['lower_bound'][key]
        if abs(reference_lower) < 999993:
            self.assertAlmostEqual(fva_solution['lower_bound'][key], reference_lower, delta=0.000001)
        reference_upper = REFERENCE_FVA_SOLUTION_ECOLI_CORE['upper_bound'][key]
        if abs(reference_upper) < 999993:
            self.assertAlmostEqual(fva_solution['upper_bound'][key], reference_upper, delta=0.000001)
# NOTE(review): the statements below look like the tail of a test whose `def`
# line is not visible here; `core_model` and `original_objective` are not
# assigned anywhere in the visible lines -- confirm against the full file.
# The FRD7 upper bound must be zero when rounded to 7 decimals.
assert round(abs(fva_solution['upper_bound']['FRD7'] - 0.), 7) == 0
# Compare each bound with the reference within 1e-4; reference lower bounds at
# or below -666 and reference upper bounds at or above 666 are skipped.
for key in fva_solution.data_frame.index:
if REFERENCE_FVA_SOLUTION_ECOLI_CORE['lower_bound'][key] > -666:
assert abs(
fva_solution['lower_bound'][key] - REFERENCE_FVA_SOLUTION_ECOLI_CORE['lower_bound'][key]) < 0.0001
if REFERENCE_FVA_SOLUTION_ECOLI_CORE['upper_bound'][key] < 666:
assert abs(
fva_solution['upper_bound'][key] - REFERENCE_FVA_SOLUTION_ECOLI_CORE['upper_bound'][key]) < 0.0001
# Add a reaction "minus_PGI" (lower bound -1000) whose metabolite coefficients
# are the negation of PGI's.
cycle_reac = Reaction("minus_PGI") # Create fake cycle
cycle_reac.lower_bound = -1000
core_model.add_reaction(cycle_reac)
cycle_reac.add_metabolites({met: -c for met, c in core_model.reactions.PGI.metabolites.items()})
# Without cycle removal the PGI upper bound is expected to be exactly 1000.
fva_solution = flux_variability_analysis(core_model, remove_cycles=False, reactions=["PGI"])
assert fva_solution.data_frame.loc["PGI", "upper_bound"] == 1000
# With cycle removal the PGI upper bound is expected to fall below 666.
fva_solution = flux_variability_analysis(core_model, remove_cycles=True, reactions=["PGI"])
assert fva_solution.data_frame.loc["PGI", "upper_bound"] < 666
# The analyses must leave the model objective unchanged.
assert original_objective == core_model.objective
def test_flux_variability_parallel(self):
    """Check that FVA run through a ``MultiprocessingView`` matches the reference.

    FVA is run on ``self.model`` with ``remove_cycles=False`` and the result
    is compared with ``REFERENCE_FVA_SOLUTION_ECOLI_CORE``.
    """
    mp_view = MultiprocessingView()
    try:
        fva_solution = flux_variability_analysis(self.model, remove_cycles=False, view=mp_view)
    finally:
        # Shut the view down even if the analysis raises.
        mp_view.shutdown()
    assert_dataframes_equal(fva_solution, REFERENCE_FVA_SOLUTION_ECOLI_CORE)
def _process_knockouts(self):
    """Fill ``self._processed_knockouts`` with one row per knockout set.

    For every entry of ``self._knockouts`` the listed reactions are knocked
    out inside a ``with self._model:`` block and FVA is run for the target at
    ``fraction_of_optimum=0.99``. The resulting row holds the knockouts, their
    count, the stored production and biomass values, and the FVA lower and
    upper bounds of the target. If an ``OptimizationError`` is raised, the row
    is filled with NaN instead.
    """
    progress = ProgressBar(maxval=len(self._knockouts), widgets=["Processing solutions: ", Bar(), Percentage()])
    self._processed_knockouts = DataFrame(columns=["reactions", "size", self._target,
                                                   "biomass", "fva_min", "fva_max"])
    for i, knockouts in progress(enumerate(self._knockouts)):
        try:
            # presumably the model context reverts the knockouts on exit -- TODO confirm
            with self._model:
                # Plain loop: knock_out() is called for its side effect only,
                # so there is no reason to build a throwaway list.
                for ko in knockouts:
                    self._model.reactions.get_by_id(ko).knock_out()
                fva = flux_variability_analysis(self._model, fraction_of_optimum=0.99, reactions=[self.target])
            self._processed_knockouts.loc[i] = [knockouts, len(knockouts), self.production[i], self.biomass[i],
                                                fva.lower_bound(self.target), fva.upper_bound(self.target)]
        except OptimizationError:
            # Keep the row index aligned with the knockout list by storing an
            # all-NaN row for knockout sets that cannot be evaluated.
            self._processed_knockouts.loc[i] = [numpy.nan for _ in self._processed_knockouts.columns]
def _remove_blocked_reactions(self):
    """Remove every reaction whose FVA bounds both round to zero.

    FVA is run on ``self._model`` with ``fraction_of_optimum=0``; a reaction
    counts as blocked when its lower and upper bounds, rounded to
    ``config.ndecimals`` decimals, are both equal to zero.
    """
    fva_frame = flux_variability_analysis(self._model, fraction_of_optimum=0).data_frame
    # FIXME: Iterate over the index only (reaction identifiers).
    blocked = []
    for reaction_id, bounds in fva_frame.iterrows():
        lower = round(bounds["lower_bound"], config.ndecimals)
        upper = round(bounds["upper_bound"], config.ndecimals)
        if lower == upper == 0:
            blocked.append(self._model.reactions.get_by_id(reaction_id))
    self._model.remove_reactions(blocked)
def __call__(self, point):
    """Run sequential FVA for the bounds stored in ``point[1]``.

    ``point[1]`` is passed to ``self._set_bounds``; FVA is then run on
    ``self.model`` for ``self.included_reactions`` without cycle removal.
    Bounds whose absolute value is below ``non_zero_flux_threshold`` are
    replaced by 0.

    Returns
    -------
    tuple
        ``(point[1], fva_result)``, where ``fva_result`` is the FVA data frame
        after the thresholding.
    """
    def _snap_to_zero(value):
        # Values inside the threshold band are treated as exact zeros.
        if abs(value) < non_zero_flux_threshold:
            return 0
        return value

    self._set_bounds(point[1])
    analysis = flux_variability_analysis(self.model, reactions=self.included_reactions,
                                         remove_cycles=False, view=SequentialView())
    fva_result = analysis.data_frame
    for column in ('lower_bound', 'upper_bound'):
        fva_result[column] = fva_result[column].apply(_snap_to_zero)
    return point[1], fva_result