Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_log_prior_uniform():
post = np.array([[3.0, 2.0], [3.1, 1.0], [3.6, 1.5]])
prior = np.array([3.5, 0.0])
priorlow = np.array([0.0, 0.0])
priorup = np.array([0.0, 0.0])
pstep = np.array([1.0, 1.0])
log_prior = ms.log_prior(post, prior, priorlow, priorup, pstep)
np.testing.assert_equal(log_prior, np.array([0.0, 0.0, 0.0]))
def test_log_prior_single_sample():
params = np.array([3.0, 2.0])
prior = np.array([3.5, 0.0])
priorlow = np.array([0.1, 0.0])
priorup = np.array([0.1, 0.0])
pstep = np.array([1.0, 1.0])
log_prior = ms.log_prior(params, prior, priorlow, priorup, pstep)
np.testing.assert_allclose(log_prior, -12.5)
def test_log_prior_fixed_params():
post = np.array([[3.0, 2.0], [3.1, 1.0], [3.6, 1.5]])
prior = np.array([3.5, 0.0, 0.0])
priorlow = np.array([0.1, 0.0, 0.0])
priorup = np.array([0.1, 0.0, 0.0])
pstep = np.array([1.0, 0.0, 1.0])
log_prior = ms.log_prior(post, prior, priorlow, priorup, pstep)
np.testing.assert_allclose(log_prior, np.array([-12.5, -8.0, -0.5]))
def test_log_prior_gaussian():
post = np.array([[3.0, 2.0], [3.1, 1.0], [3.6, 1.5]])
prior = np.array([3.5, 0.0])
priorlow = np.array([0.1, 0.0])
priorup = np.array([0.1, 0.0])
pstep = np.array([1.0, 1.0])
log_prior = ms.log_prior(post, prior, priorlow, priorup, pstep)
np.testing.assert_allclose(log_prior, np.array([-12.5, -8.0, -0.5]))
elif leastsq == 'trf':
lsfit = so.least_squares(residuals, fitparams,
bounds=(pmin[ifree], pmax[ifree]), args=args,
ftol=3e-16, xtol=3e-16, gtol=3e-16, method='trf')
params[ifree] = lsfit["x"]
resid = lsfit["fun"]
# Update shared parameters:
for s in ishare:
params[s] = params[-int(pstep[s])-1]
# Compute best-fit model:
best_model = func(params, *indparams)
# Calculate chi-squared for best-fitting values:
best_log_post = -0.5*np.sum(resid**2.0)
log_prior = ms.log_prior(params[ifree], prior, priorlow, priorup, pstep)
best_chisq = -2*(best_log_post - log_prior)
return {
'bestp':params,
'best_log_post':best_log_post,
'best_chisq':best_chisq,
'best_model':best_model,
'optimizer_res':lsfit,
}
# Evaluate model for best fitting parameters:
fitpars = np.asarray(params)
fitpars[ifree] = np.copy(bestp[ifree])
for s in ishare:
fitpars[s] = fitpars[-int(pstep[s])-1]
best_model = chains[0].eval_model(fitpars)
# Remove pre-MCMC and post-MCMC alocated samples:
zvalid = zchain>=0
Z = Z[zvalid]
zchain = zchain[zvalid]
log_post = log_post[zvalid]
log_prior = ms.log_prior(Z, prior, priorlow, priorup, pstep)
chisq = -2*(log_post - log_prior)
best_log_prior = ms.log_prior(bestp[ifree], prior, priorlow, priorup, pstep)
best_chisq = -2*(best_log_post.value - best_log_prior)
# And remove burn-in samples:
posterior, _, zmask = mu.burn(Z=Z, zchain=zchain, burnin=zburn)
# Number of evaluated and kept samples:
nsample = len(Z)*thinning
nzsample = len(posterior)
# Print out Summary:
log.msg('\nMCMC Summary:'
'\n-------------')
fmt = len(str(nsample))
log.msg("Number of evaluated samples: {:{}d}".
format(nsample, fmt), indent=2)
log.msg("Number of parallel chains: {:{}d}".
format(nchains, fmt), indent=2)
for chain in chains: # Make sure to terminate the subprocesses
chain.terminate()
# Evaluate model for best fitting parameters:
fitpars = np.asarray(params)
fitpars[ifree] = np.copy(bestp[ifree])
for s in ishare:
fitpars[s] = fitpars[-int(pstep[s])-1]
best_model = chains[0].eval_model(fitpars)
# Remove pre-MCMC and post-MCMC alocated samples:
zvalid = zchain>=0
Z = Z[zvalid]
zchain = zchain[zvalid]
log_post = log_post[zvalid]
log_prior = ms.log_prior(Z, prior, priorlow, priorup, pstep)
chisq = -2*(log_post - log_prior)
best_log_prior = ms.log_prior(bestp[ifree], prior, priorlow, priorup, pstep)
best_chisq = -2*(best_log_post.value - best_log_prior)
# And remove burn-in samples:
posterior, _, zmask = mu.burn(Z=Z, zchain=zchain, burnin=zburn)
# Number of evaluated and kept samples:
nsample = len(Z)*thinning
nzsample = len(posterior)
# Print out Summary:
log.msg('\nMCMC Summary:'
'\n-------------')
fmt = len(str(nsample))
log.msg("Number of evaluated samples: {:{}d}".
format(nsample, fmt), indent=2)
log.msg('Running dynesty dynamic nested-samping run:\n')
sampler = dynesty.DynamicNestedSampler(loglike, prior_transform, nfree,
**dyn_args)
sampler.run_nested(**kwargs)
weights = np.exp(sampler.results.logwt - sampler.results.logz[-1])
isample = resample_equal(weights)
posterior = sampler.results.samples[isample]
chisq = -2.0*sampler.results.logl[isample]
# Contribution to chi-square from non-uniform priors:
if skip_logp:
# TBD: Compute from prior_transform()
log_prior = 0.0
else:
log_prior = ms.log_prior(posterior, prior, priorlow, priorup, pstep)
log_post = -0.5*chisq + log_prior
# Best-fit statistics from sample:
ibest = np.argmin(log_post)
bestp = np.copy(params)
bestp[ifree] = posterior[ibest]
for s in ishare:
bestp[s] = bestp[-int(pstep[s])-1]
best_model = func(bestp, *indparams)
best_chisq = chisq[ibest]
best_log_post = log_post[ibest]
# Print out Summary:
log.msg("\nNested Sampling Summary:"
"\n------------------------")