How to use the cameo.util.partition function in cameo

To help you get started, we’ve selected a few cameo examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github biosustain / cameo / tests / test_util.py View on Github external
iterables = [
            [1, 2, 3, 4, 5, 6, 7, 8, 9],
            {5, 3, 8, 3, 8, 5, 8, 0, 10, 11, 15},
            range(29)
        ]
        for fixture in iterables:
            test_output = partition(fixture, chunks)
            assert len(fixture) == sum(map(len, test_output))
            assert len(test_output) == chunks
            assert list(fixture) == list(chain(*test_output))
            for out_chunk in test_output:
                assert set(out_chunk).issubset(set(fixture))

        bad_input = 5
        with pytest.raises(TypeError):
            partition(bad_input, chunks)
github biosustain / cameo / cameo / flux_analysis / analysis.py View on Github external
-------
    pandas.DataFrame
        Pandas DataFrame containing the results of the flux variability analysis.

    """
    if view is None:
        view = config.default_view
    if reactions is None:
        reactions = model.reactions
    with model:
        if fraction_of_optimum > 0.:
            fix_objective_as_constraint(model, fraction=fraction_of_optimum)
        if pfba_factor is not None:
            # don't add the objective-constraint again so fraction_of_optimum=0
            fix_pfba_as_constraint(model, multiplier=pfba_factor, fraction_of_optimum=0)
        reaction_chunks = (chunk for chunk in partition(reactions, len(view)))
        if remove_cycles:
            func_obj = _FvaFunctionObject(model, _cycle_free_fva)
        else:
            func_obj = _FvaFunctionObject(model, _flux_variability_analysis)
        chunky_results = view.map(func_obj, reaction_chunks)
        solution = pandas.concat(chunky_results)

    return FluxVariabilityResult(solution)
github biosustain / cameo / cameo / basics.py View on Github external
def flux_variability_analysis(model, reactions=None, view=None):
    """Run flux-variability analysis, spreading the reactions over a view.

    Parameters
    ----------
    model
        Model passed to ``FvaFunctionObject``.
    reactions : optional
        Reactions to analyse. Defaults to ``model.reactions``.
    view : optional
        Object supporting ``len()`` and ``map()``. Defaults to
        ``config.default_view``, looked up when the function is called.

    Returns
    -------
    dict
        The result dictionaries of all chunks merged into one.

    """
    if view is None:
        # Resolve the default at call time: a default written in the signature
        # is evaluated once at definition time and would ignore any later
        # change to ``config.default_view``.
        view = config.default_view
    if reactions is None:
        reactions = model.reactions
    reaction_chunks = (chunk for chunk in partition(reactions, len(view)))

    func_obj = FvaFunctionObject(model)
    chunky_results = view.map(func_obj, reaction_chunks)
    solution = {}
    for dictionary in chunky_results:
        solution.update(dictionary)
    return solution
github biosustain / cameo / cameo / visualization / plotting / with_plotly.py View on Github external
def _make_grid(grid):
        """Combine the plots of *grid* into one subplot figure.

        The figure gets ``grid.n_rows`` rows and
        ``math.ceil(len(grid.plots) / grid.n_rows)`` columns. ``grid.plots`` is
        partitioned into ``grid.n_rows`` groups; the traces of the j-th plot in
        the i-th group are appended to cell (i + 1, j + 1).
        """
        n_rows = grid.n_rows
        n_cols = math.ceil(len(grid.plots) / n_rows)
        titles = [p.layout['title'] for p in grid.plots]

        figure = tools.make_subplots(rows=n_rows, cols=n_cols, subplot_titles=titles)
        figure['layout']['width'] = grid.width
        figure['layout']['height'] = grid.height

        # Cell indices passed to append_trace are 1-based, hence start=1.
        for row, row_plots in enumerate(partition(grid.plots, n_rows), start=1):
            for col, cell in enumerate(row_plots, start=1):
                for trace in cell.data:
                    figure.append_trace(trace, row, col)

        return figure
github biosustain / cameo / cameo / visualization / plotting.py View on Github external
def _plot_bokeh_grid(self):
        """Show ``self.plots`` arranged in a ``GridPlot``; do nothing if empty."""
        if len(self.plots) == 0:
            return
        children = partition(self.plots, self.nrows)
        layout = GridPlot(children=children, title=self.title)
        plotting.show(layout)
github biosustain / cameo / cameo / strain_design / heuristic / __init__.py View on Github external
def _evaluator(self, candidates, args):
        """Evaluate *candidates* chunk-wise on ``args['view']``.

        The candidates are partitioned into ``len(view)`` chunks, each chunk is
        mapped through a ``KnockoutEvaluator``, and the resulting lists are
        concatenated into a single list that is returned.
        """
        view = args.get('view')
        chunked_candidates = (chunk for chunk in partition(candidates, len(view)))
        evaluate = KnockoutEvaluator(self.model, self._decoder, self.objective_function, self.simulation_method)
        per_chunk = view.map(evaluate, chunked_candidates)
        return reduce(list.__add__, per_chunk)
github biosustain / cameo / cameo / strain_design / heuristic / evolutionary / optimization.py View on Github external
def _simplify_solutions(self, solutions):
        """Simplify *solutions* chunk-wise on ``self._view``.

        Parameters
        ----------
        solutions
            Solutions to simplify; partitioned into ``len(self._view)`` chunks.

        Returns
        -------
        list
            The per-chunk result lists concatenated in order.

        Raises
        ------
        KeyboardInterrupt
            Re-raised after ``self._view`` has been shut down.

        """
        simplification = SolutionSimplification(self._evaluator)
        chunks = (chunk for chunk in partition(solutions, len(self._view)))
        try:
            chunked_results = self._view.map(simplification, chunks)
        except KeyboardInterrupt:
            # Shut down the same view that ran the map. The rest of this method
            # only uses ``self._view``, so the handler does too; an
            # AttributeError raised here would hide the interrupt.
            self._view.shutdown()
            raise

        return reduce(list.__add__, chunked_results)
github biosustain / cameo / cameo / visualization / plotting / with_bokeh.py View on Github external
def _make_grid(grid):
        """Build a ``gridplot`` from the plots of *grid*, named by its title."""
        rows = partition(grid.plots, grid.n_rows)
        return gridplot(children=rows, name=grid.title)
github biosustain / cameo / cameo / flux_analysis / analysis.py View on Github external
#     pass

            model.objective = objective

        if source is not None:
            source_reaction = get_reaction_for(model, source)
        else:
            source_reaction = get_c_source_reaction(model)

        variable_reactions = model.reactions.get_by_any(variables)
        variables_min_max = flux_variability_analysis(model, reactions=variable_reactions, view=SequentialView())
        grid = [numpy.linspace(lower_bound, upper_bound, points, endpoint=True) for
                reaction_id, lower_bound, upper_bound in
                variables_min_max.data_frame.loc[variable_ids].itertuples()]
        grid_generator = itertools.product(*grid)
        chunks_of_points = partition(list(grid_generator), len(view))
        evaluator = _PhenotypicPhasePlaneChunkEvaluator(model, variable_reactions, source_reaction)
        chunk_results = view.map(evaluator, chunks_of_points)
        envelope = reduce(list.__add__, chunk_results)

    nice_variable_ids = [_nice_id(reaction) for reaction in variable_reactions]
    variable_reactions_ids = [reaction.id for reaction in variable_reactions]
    phase_plane = pandas.DataFrame(envelope,
                                   columns=(variable_reactions_ids +
                                            ['objective_lower_bound',
                                             'objective_upper_bound',
                                             'c_yield_lower_bound',
                                             'c_yield_upper_bound',
                                             'mass_yield_lower_bound',
                                             'mass_yield_upper_bound']))

    if objective is None: