Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
References
----------
Blanes, Sergio, Fernando Casas, and J. M. Sanz-Serna. "Numerical
Integrators for the Hybrid Monte Carlo Method." SIAM Journal on
Scientific Computing 36, no. 4 (January 2014): A1556-80.
doi:10.1137/130932740.
Mannseth, Janne, Tore Selland Kleppe, and Hans J. Skaug. "On the
Application of Higher Order Symplectic Integrators in
Hamiltonian Monte Carlo." arXiv:1608.07048 [Stat],
August 25, 2016. http://arxiv.org/abs/1608.07048.
"""
epsilon = tt.scalar('epsilon')
epsilon.tag.test_value = 1.
a = floatX((3 - np.sqrt(3)) / 6)
p_ae = p + a * epsilon * q_grad
q_e2 = q + epsilon / 2 * H.pot.velocity(p_ae)
p_1ae = p_ae + (1 - 2 * a) * epsilon * H.dlogp(q_e2)
q_e = q_e2 + epsilon / 2 * H.pot.velocity(p_1ae)
grad_e = H.dlogp(q_e)
p_e = p_1ae + a * epsilon * grad_e
v_e = H.pot.velocity(p_e)
new_energy = energy(H, q_e, p_e)
f = theano.function(inputs=[q, p, q_grad, epsilon],
outputs=[q_e, p_e, v_e, grad_e, new_energy],
**theano_kwargs)
f.trust_input = True
return f
def random(self):
"""Draw random value from QuadPotential."""
n = floatX(normal(size=self.size))
n /= self.d_sqrt
n = self.factor.solve_Lt(n)
n = self.factor.apply_Pt(n)
return n
def __init__(self, nu=None, sigma=None, b=None, sd=None, *args, **kwargs):
super().__init__(*args, **kwargs)
if sd is not None:
sigma = sd
nu, b, sigma = self.get_nu_b(nu, b, sigma)
self.nu = nu = tt.as_tensor_variable(floatX(nu))
self.sigma = self.sd = sigma = tt.as_tensor_variable(floatX(sigma))
self.b = b = tt.as_tensor_variable(floatX(b))
self.mean = sigma * np.sqrt(np.pi / 2) * tt.exp((-nu**2 / (2 * sigma**2)) / 2) * ((1 - (-nu**2 / (2 * sigma**2)))
* tt.i0(-(-nu**2 / (2 * sigma**2)) / 2) - (-nu**2 / (2 * sigma**2)) * tt.i1(-(-nu**2 / (2 * sigma**2)) / 2))
self.variance = 2 * sigma**2 + nu**2 - (np.pi * sigma**2 / 2) * (tt.exp((-nu**2 / (2 * sigma**2)) / 2) * ((1 - (-nu**2 / (
2 * sigma**2))) * tt.i0(-(-nu**2 / (2 * sigma**2)) / 2) - (-nu**2 / (2 * sigma**2)) * tt.i1(-(-nu**2 / (2 * sigma**2)) / 2)))**2
tau = 1.
else:
tau = sd**-2.
else:
if sd is not None:
raise ValueError("Can't pass both tau and sd")
else:
sd = tau**-.5
# cast tau and sd to float in a way that works for both np.arrays
# and pure python
tau = 1. * tau
sd = 1. * sd
return (floatX(tau), floatX(sd))
def _init_uw_global_shared(start, global_RVs):
global_order = pm.ArrayOrdering([v for v in global_RVs])
start = {v.name: start[v.name] for v in global_RVs}
bij = pm.DictToArrayBijection(global_order, start)
u_start = bij.map(start)
w_start = np.zeros_like(u_start)
uw_start = floatX(np.concatenate([u_start, w_start]))
uw_global_shared = theano.shared(uw_start, 'uw_global_shared')
return uw_global_shared, bij
):
"""
Metropolis kernel
"""
deltas = np.squeeze(proposal(n_steps) * scaling)
for n_step in range(n_steps):
delta = deltas[n_step]
if any_discrete:
if all_discrete:
delta = np.round(delta, 0).astype("int64")
q_old = q_old.astype("int64")
q_new = (q_old + delta).astype("int64")
else:
delta[discrete] = np.round(delta[discrete], 0)
q_new = floatX(q_old + delta)
else:
q_new = floatX(q_old + delta)
ll = likelihood_logp(q_new)
new_tempered_logp = prior_logp(q_new) + ll * beta
q_old, accept = metrop_select(new_tempered_logp - old_tempered_logp, q_new, q_old)
if accept:
accepted += 1
old_tempered_logp = new_tempered_logp
return q_old, accepted
def forward_val(self, x, point=None):
# 2017-06-19
# the `self.a-0.` below is important for the testval to propagates
# For an explanation see pull/2328#issuecomment-309303811
a, b = draw_values([self.a - 0.0, self.b - 0.0], point=point)
return floatX(np.log(x - a) - np.log(b - x))
def __init__(self, approx, kernel=rbf, use_histogram=True, temperature=1):
self.approx = approx
self.temperature = floatX(temperature)
self._kernel_f = kernel
self.use_histogram = use_histogram
def __init__(self, psi, mu, alpha, *args, **kwargs):
super().__init__(*args, **kwargs)
self.mu = mu = tt.as_tensor_variable(floatX(mu))
self.alpha = alpha = tt.as_tensor_variable(floatX(alpha))
self.psi = psi = tt.as_tensor_variable(floatX(psi))
self.nb = NegativeBinomial.dist(mu, alpha)
self.mode = self.nb.mode
def _calc_elbo(vars, model, n_mcsamples, random_seed):
"""Calculate approximate ELBO.
"""
theano.config.compute_test_value = 'ignore'
shared = pm.make_shared_replacements(vars, model)
factors = [var.logpt for var in model.basic_RVs] + model.potentials
logpt = tt.add(*map(tt.sum, factors))
[logp], inarray = pm.join_nonshared_inputs([logpt], vars, shared)
uw = tt.vector('uw')
uw.tag.test_value = floatX(np.concatenate([inarray.tag.test_value,
inarray.tag.test_value]))
elbo = _elbo_t(logp, uw, inarray, n_mcsamples, random_seed)
return elbo, shared