funs = {
    FVAL: self.obj.get_fval,
    RES: self.obj.get_res,
    SRES: self.obj.get_sres,
    CHI2: lambda x: res_to_chi2(self.obj.get_res(x)),
SCHI2: lambda x: sres_to_schi2(*self.obj(
x, (0, 1,),
pypesto.objective.constants.MODE_RES
))
}
for var, fun in funs.items():
if var != FVAL and not getattr(self.history_options,
f'trace_record_{var}'):
continue
for it in range(5):
x_full = xfull(start.history.get_x_trace(it))
val = getattr(start.history, f'get_{var}_trace')(it)
if np.all(np.isnan(val)):
continue
if var in [FVAL, CHI2]:
assert np.isclose(
val, fun(x_full),
), var
elif var in [RES]:
# note that we can expect slight deviations here since
# this res is computed without sensitivities while the
# result here may be computed with sensitivities
# activated. If this fails too often, increase atol/rtol
assert np.allclose(
val, fun(x_full),
rtol=1e-3, atol=1e-4
), var
elif var in [SRES]:
assert np.allclose(
val, fun(x_full)[:, self.problem.x_free_indices],
), var
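# For reference, the CHI2/SCHI2 conversions used in the lambdas above can
# be sketched as below, assuming the usual least-squares conventions
# (chi2 = r^T r, schi2 = 2 r^T dr/dx). This is an illustrative stand-in,
# not pypesto's implementation of res_to_chi2/sres_to_schi2.
import numpy as np

def res_to_chi2_sketch(res: np.ndarray) -> float:
    # chi2 as the sum of squared residuals: chi2 = r^T r
    return float(np.dot(res, res))

def sres_to_schi2_sketch(res: np.ndarray, sres: np.ndarray) -> np.ndarray:
    # chain rule: d(r^T r)/dx = 2 * r^T * dr/dx
    return 2 * res.dot(sres)

# quick self-check on a toy residual vector and sensitivity matrix
r = np.array([1.0, -2.0])
s = np.array([[1.0, 0.0], [0.0, 1.0]])
assert res_to_chi2_sketch(r) == 5.0
assert np.allclose(sres_to_schi2_sketch(r, s), [2.0, -4.0])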
"""Aggregate the provided results into a single ResultDict.

Parameters
----------
rvals:
results to aggregate
"""
# rvals are guaranteed to be consistent as _check_sensi_orders checks
# whether each objective can be called with the respective
# sensi_orders/mode
# sum over fval/grad/hess
result = {
key: sum(rval[key] for rval in rvals)
for key in [FVAL, CHI2, SCHI2, GRAD, HESS, HESSP]
if rvals[0].get(key, None) is not None
}
# extract rdatas and flatten
result[RDATAS] = []
for rval in rvals:
if RDATAS in rval:
result[RDATAS].extend(rval[RDATAS])
# initialize res and sres
if RES in rvals[0]:
res = np.asarray(rvals[0][RES])
else:
res = None
if SRES in rvals[0]:
    sres = np.asarray(rvals[0][SRES])
else:
    sres = None
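# The aggregation in a nutshell (a hypothetical toy sketch, using the
# plain string keys that the FVAL/GRAD/RES constants stand for): summable
# keys are added across objectives, residual blocks are concatenated.
import numpy as np

rvals_toy = [
    {'fval': 1.0, 'grad': np.array([0.5, 0.5]), 'res': np.array([1.0, 2.0])},
    {'fval': 2.0, 'grad': np.array([1.0, -1.0]), 'res': np.array([3.0])},
]
agg = {key: sum(rval[key] for rval in rvals_toy) for key in ['fval', 'grad']}
agg['res'] = np.hstack([rval['res'] for rval in rvals_toy])
assert agg['fval'] == 3.0
assert np.allclose(agg['grad'], [1.5, -0.5])
assert np.allclose(agg['res'], [1.0, 2.0, 3.0])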
def get_fval_trace(
    self, ix: Union[int, Sequence[int], None]
) -> Union[Sequence[float], float]:
    """Return the function value trace at the iteration index/indices `ix`."""
    return list(self._trace[(FVAL, np.nan)].values[ix])
sy = [rdata['sy'] for rdata in rdatas]
snllh = self.inner_solver.calculate_gradients(
    self.inner_problem, x_inner_opt, sim, sy,
    parameter_mapping, x_ids, amici_model, snllh,
)
return {FVAL: nllh,
GRAD: snllh,
HESS: s2nllh,
RES: res,
SRES: sres,
RDATAS: rdatas
}
def call_unprocessed(
self,
x: np.ndarray,
sensi_orders: Tuple[int, ...],
mode: str
) -> ResultDict:
"""Evaluate the requested sensitivities of the negative log-density."""
res = {}
for order in sensi_orders:
if order == 0:
res[FVAL] = self.neg_log_density(x)
elif order == 1:
res[GRAD] = self.gradient_neg_log_density(x)
elif order == 2:
res[HESS] = self.hessian_neg_log_density(x)
else:
raise ValueError(f'Invalid sensi order {order}.')
return res
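# A minimal concrete instance of the pattern above (hypothetical, not
# pypesto's API): a standard-normal negative log-density, for which
# fval = 0.5 * x^T x (+ const), grad = x, and hess = I.
import numpy as np

def call_unprocessed_normal(x, sensi_orders, mode):
    res = {}
    for order in sensi_orders:
        if order == 0:
            res['fval'] = 0.5 * float(np.dot(x, x))
        elif order == 1:
            res['grad'] = x
        elif order == 2:
            res['hess'] = np.eye(len(x))
        else:
            raise ValueError(f'Invalid sensi order {order}.')
    return res

out = call_unprocessed_normal(np.array([1.0, 2.0]), (0, 1), 'mode_fun')
assert out['fval'] == 2.5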
chi2 += rdata['chi2']
res = np.hstack([res, rdata['res']]) \
if res.size else rdata['res']
if sensi_order > 0:
opt_sres = sim_sres_to_opt_sres(
x_ids,
par_sim_ids,
condition_map_sim_var,
rdata['sres'],
coefficient=1.0
)
sres = np.vstack([sres, opt_sres]) \
if sres.size else opt_sres
ret = {
FVAL: nllh,
CHI2: chi2,
GRAD: snllh,
HESS: s2nllh,
RES: res,
SRES: sres,
RDATAS: rdatas
}
return {
key: val
for key, val in ret.items()
if val is not None
}
def output_to_dict(
sensi_orders: Tuple[int, ...], mode: str, output_tuple: Tuple
) -> Dict:
"""
Convert output tuple to dict.
"""
output_dict = {}
index = 0
if not isinstance(output_tuple, tuple):
output_tuple = (output_tuple,)
if mode == MODE_FUN:
if 0 in sensi_orders:
output_dict[FVAL] = output_tuple[index]
index += 1
if 1 in sensi_orders:
output_dict[GRAD] = output_tuple[index]
index += 1
if 2 in sensi_orders:
output_dict[HESS] = output_tuple[index]
elif mode == MODE_RES:
if 0 in sensi_orders:
output_dict[RES] = output_tuple[index]
index += 1
if 1 in sensi_orders:
output_dict[SRES] = output_tuple[index]
return output_dict
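# Usage sketch for output_to_dict, assuming the usual pypesto string
# constants (FVAL == 'fval', GRAD == 'grad', MODE_FUN == 'mode_fun'):
# a (fval, grad) tuple for sensi_orders (0, 1) becomes a labeled dict.
import numpy as np

out = output_to_dict(
    sensi_orders=(0, 1),
    mode='mode_fun',  # MODE_FUN
    output_tuple=(3.5, np.array([1.0, 2.0])),
)
assert out['fval'] == 3.5
assert np.allclose(out['grad'], [1.0, 2.0])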
def get_error_output(
    amici_model: 'amici.Model',
    edatas: Sequence['amici.ExpData'],
    rdatas: Sequence['amici.ReturnData'],
    dim: int):
"""Default output upon error.
Returns values indicative of an error: NaN entries in all vectors and a
function value, i.e. the nllh, of `np.inf`.
"""
# total number of timepoints: take each edata's own count, falling back
# to the model's timepoints when an edata does not define its own
if not amici_model.nt():
nt = sum([data.nt() for data in edatas])
else:
nt = sum([data.nt() if data.nt() else amici_model.nt()
for data in edatas])
n_res = nt * amici_model.nytrue
return {
FVAL: np.inf,
GRAD: np.nan * np.ones(dim),
HESS: np.nan * np.ones([dim, dim]),
RES: np.nan * np.ones(n_res),
SRES: np.nan * np.ones([n_res, dim]),
RDATAS: rdatas
}
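# Shape check for the error fallback (hypothetical dim/n_res values):
# the optimizer receives an infinite objective and NaN derivatives, so
# the failed point is rejected without crashing the outer loop.
import numpy as np

dim, n_res = 2, 3
err = {
    'fval': np.inf,
    'grad': np.nan * np.ones(dim),
    'sres': np.nan * np.ones([n_res, dim]),
}
assert np.isinf(err['fval'])
assert err['sres'].shape == (n_res, dim)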
WLS = np.nan
print('Inner optimization did not succeed')
else:
# WLS = np.sum([optimal_surrogate[i]['fun'] for i in range(len(optimal_surrogate))])
# WLS = np.sqrt(WLS)
WLS = compute_WLS(optimal_surrogate, self.problem, edatas, rdatas)
print(f'cost function: {WLS}')
# TODO: gradient computation
#if sensi_order > 0:
# snllh = compute_snllh(edatas, rdatas, optimal_scalings, obj.x_ids, obj.mapping_par_opt_to_par_sim, obj.dim)
# TODO compute FIM or HESS
# TODO RES, SRES should also be possible, right?
return {
FVAL: WLS,
GRAD: snllh,
HESS: s2nllh,
RES: res,
SRES: sres,
RDATAS: rdatas
}
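# Hypothetical sketch of the WLS aggregation hinted at by the
# commented-out lines above: sum the inner problems' optimal values
# (their 'fun' entries) and take the square root. The real compute_WLS
# takes (optimal_surrogate, problem, edatas, rdatas) and may differ.
import numpy as np

def compute_wls_from_surrogates(optimal_surrogate):
    wls = np.sum([entry['fun'] for entry in optimal_surrogate])
    return np.sqrt(wls)

assert compute_wls_from_surrogates([{'fun': 9.0}, {'fun': 16.0}]) == 5.0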
# update steady state
if self.guess_steadystate and \
self.steadystate_guesses['fval'] < np.inf:
for data_ix in range(len(self.edatas)):
self.apply_steadystate_guess(data_ix, x_dct)
ret = self.calculator(
x_dct=x_dct, sensi_order=sensi_order, mode=mode,
amici_model=self.amici_model, amici_solver=self.amici_solver,
edatas=self.edatas, n_threads=self.n_threads,
x_ids=self.x_ids, parameter_mapping=self.parameter_mapping,
fim_for_hess=self.fim_for_hess,
)
nllh = ret[FVAL]
rdatas = ret[RDATAS]
# check whether we should update data for preequilibration guesses
if self.guess_steadystate and \
nllh <= self.steadystate_guesses['fval'] and \
nllh < np.inf:
self.steadystate_guesses['fval'] = nllh
for data_ix, rdata in enumerate(rdatas):
self.store_steadystate_guess(data_ix, x_dct, rdata)
return ret
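# Minimal illustration of the steady-state guess policy above
# (hypothetical cache layout): the cached guess is only refreshed when
# the new objective value improves on the best finite value seen so far.
import numpy as np

steadystate_guesses = {'fval': np.inf, 'data': {}}

def maybe_update_guess(nllh, x_dct, rdatas):
    if nllh <= steadystate_guesses['fval'] and nllh < np.inf:
        steadystate_guesses['fval'] = nllh
        steadystate_guesses['data'] = {'x': x_dct, 'rdatas': rdatas}

maybe_update_guess(1.5, {'k1': 0.1}, rdatas=[])
assert steadystate_guesses['fval'] == 1.5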