Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
from pyodesys.results import Result
if isinstance(xyp, Result):
xyp = xyp.odesys.to_arrays(xyp.xout, xyp.yout, xyp.params, reshape=False)
if varied is None:
varied = xyp[0]
if xyp[1].shape[-2] != varied.size:
raise ValueError("Size mismatch between varied and yout")
if substance_keys is None:
substance_keys = rsys.substances.keys()
if axes is None:
_fig, axes = plt.subplots(len(substance_keys))
rates = rate_exprs_cb(*xyp)
if unit_registry is not None:
time_unit = get_derived_unit(unit_registry, 'time')
conc_unit = get_derived_unit(unit_registry, 'concentration')
rates = to_unitless(rates*conc_unit/time_unit, u.molar/u.second)
eqk1, eqk2, eqs = _combine_rxns_to_eq(rsys) if combine_equilibria else ([], [], [])
for sk, ax in zip(substance_keys, axes):
data, tot = _dominant_reaction_effects(sk, rsys, rates, linthreshy, eqk1, eqk2, eqs)
factor = 1/xyp[1][:, rsys.as_substance_index(sk)] if relative else 1
if total:
ax.plot(varied, factor*tot, c='k', label='Total', linewidth=2, ls=':')
for y, rxn in sorted(data, key=lambda args: args[0][-1], reverse=True):
ax.plot(varied, factor*y,
label=r'$\mathrm{%s}$' % rxn.latex(rsys.substances))
if rsys.substances[sk].latex_name is None:
ttl = rsys.substances[sk].name
def _get_derived_unit(reg, key):
try:
return get_derived_unit(reg, key)
except KeyError:
return get_derived_unit(reg, '_'.join(key.split('_')[:-1]))
def _mk_dedim(unit_registry):
unit_time = get_derived_unit(unit_registry, 'time')
unit_conc = get_derived_unit(unit_registry, 'concentration')
def dedim_tcp(t, c, p, param_unit=lambda k, v: default_unit_in_registry(v, unit_registry)):
_t = to_unitless(t, unit_time)
_c = to_unitless(c, unit_conc)
_p, pu = {}, {}
for k, v in p.items():
pu[k] = param_unit(k, v)
_p[k] = to_unitless(v, pu[k])
return (_t, _c, _p), dict(unit_time=unit_time, unit_conc=unit_conc, param_units=pu)
return locals()
return '\\ensuremath{' + s + '}'
else:
return s
lines = []
for ri, rxn in enumerate(rsys.rxns):
rxn_ref = rxn.ref
if isinstance(rxn.param, RadiolyticBase):
if unit_registry is not None:
kunit = get_derived_unit(unit_registry, 'radiolytic_yield')
k = k_fmt % to_unitless(rxn.param.args[0], kunit)
k_unit_str = (kunit.dimensionality.latex.strip('$') if tex
else kunit.dimensionality)
else:
if unit_registry is not None:
kunit = (get_derived_unit(unit_registry,
'concentration')**(1-rxn.order()) /
get_derived_unit(unit_registry, 'time'))
try:
k = k_fmt % to_unitless(rxn.param, kunit)
k_unit_str = (kunit.dimensionality.latex.strip('$') if tex
else kunit.dimensionality)
except Exception:
k, k_unit_str = rxn.param.equation_as_string(k_fmt, tex)
else:
k_unit_str = '-'
if isinstance(k_fmt, str):
k = k_fmt % rxn.param
else:
k = k_fmt(rxn.param)
latex_kw = dict(with_param=False, with_name=False)
if tex:
if s.startswith('doi:'):
return _doi(s[4:])
return s
def _wrap(s):
if tex:
return '\\ensuremath{' + s + '}'
else:
return s
lines = []
for ri, rxn in enumerate(rsys.rxns):
rxn_ref = rxn.ref
if isinstance(rxn.param, RadiolyticBase):
if unit_registry is not None:
kunit = get_derived_unit(unit_registry, 'radiolytic_yield')
k = k_fmt % to_unitless(rxn.param.args[0], kunit)
k_unit_str = (kunit.dimensionality.latex.strip('$') if tex
else kunit.dimensionality)
else:
if unit_registry is not None:
kunit = (get_derived_unit(unit_registry,
'concentration')**(1-rxn.order()) /
get_derived_unit(unit_registry, 'time'))
try:
k = k_fmt % to_unitless(rxn.param, kunit)
k_unit_str = (kunit.dimensionality.latex.strip('$') if tex
else kunit.dimensionality)
except Exception:
k, k_unit_str = rxn.param.equation_as_string(k_fmt, tex)
else:
k_unit_str = '-'
return s
lines = []
for ri, rxn in enumerate(rsys.rxns):
rxn_ref = rxn.ref
if isinstance(rxn.param, RadiolyticBase):
if unit_registry is not None:
kunit = get_derived_unit(unit_registry, 'radiolytic_yield')
k = k_fmt % to_unitless(rxn.param.args[0], kunit)
k_unit_str = (kunit.dimensionality.latex.strip('$') if tex
else kunit.dimensionality)
else:
if unit_registry is not None:
kunit = (get_derived_unit(unit_registry,
'concentration')**(1-rxn.order()) /
get_derived_unit(unit_registry, 'time'))
try:
k = k_fmt % to_unitless(rxn.param, kunit)
k_unit_str = (kunit.dimensionality.latex.strip('$') if tex
else kunit.dimensionality)
except Exception:
k, k_unit_str = rxn.param.equation_as_string(k_fmt, tex)
else:
k_unit_str = '-'
if isinstance(k_fmt, str):
k = k_fmt % rxn.param
else:
k = k_fmt(rxn.param)
latex_kw = dict(with_param=False, with_name=False)
if tex:
latex_kw['substances'] = rsys.substances
latex_kw['Reaction_around_arrow'] = ('}}' + coldelim + '\\ensuremath{{',
def _mk_dedim(unit_registry):
unit_time = get_derived_unit(unit_registry, 'time')
unit_conc = get_derived_unit(unit_registry, 'concentration')
def dedim_tcp(t, c, p, param_unit=lambda k, v: default_unit_in_registry(v, unit_registry)):
_t = to_unitless(t, unit_time)
_c = to_unitless(c, unit_conc)
_p, pu = {}, {}
for k, v in p.items():
pu[k] = param_unit(k, v)
_p[k] = to_unitless(v, pu[k])
return (_t, _c, _p), dict(unit_time=unit_time, unit_conc=unit_conc, param_units=pu)
return locals()
param_names_for_odesys = all_pk_with_unique
if unit_registry is None:
p_units = None
else:
# We need to make rsys_params unitless and create
# a pre- & post-processor for SymbolicSys
pk_units = [_get_derived_unit(unit_registry, k) for k in all_pk]
p_units = pk_units if include_params else (pk_units + [unique_units[k] for k in unique])
new_r_exprs = []
for ratex in r_exprs:
_pu, _new_ratex = ratex.dedimensionalisation(unit_registry)
new_r_exprs.append(_new_ratex)
r_exprs = new_r_exprs
time_unit = get_derived_unit(unit_registry, 'time')
conc_unit = get_derived_unit(unit_registry, 'concentration')
def post_processor(x, y, p):
time = x*time_unit
if output_time_unit is not None:
time = rescale(time, output_time_unit)
conc = y*conc_unit
if output_conc_unit is not None:
conc = rescale(conc, output_conc_unit)
return time, conc, np.array([elem*p_unit for elem, p_unit in zip(p.T, p_units)], dtype=object).T
kwargs['to_arrays_callbacks'] = (
lambda x: to_unitless(x, time_unit),
lambda y: to_unitless(y, conc_unit),
lambda p: np.array([to_unitless(px, p_unit) for px, p_unit in zip(
p.T if hasattr(p, 'T') else p, p_units)]).T
def of_quantity(cls, quantity_name, *args, **kwargs):
instance = cls(get_derived_unit(SI_base_registry, quantity_name), *args, **kwargs)
instance.quantity_name = quantity_name
return instance