Returns:
np.array: imputed dataset.
"""
# check if fitted, then impute from the fitted bayesian logistic model
check_is_fitted(self, "statistics_")
model = self.statistics_["param"]["model"]
labels = self.statistics_["param"]["labels"]
# add a Deterministic node for each missing value
# sampling then pulls from the posterior predictive distribution
# of each missing data point, i.e. a distribution for EACH missing value
with model:
p_pred = pm.Deterministic(
"p_pred", pm.invlogit(model["alpha"] + model["beta"].dot(X.T))
)
tr = pm.sample(
self.sample,
tune=self.tune,
init=self.init,
**self.sample_kwargs
)
self.trace_ = tr
# decide how to impute. Use mean of posterior predictive or random draw
# not supported yet, but eventually consider using the MAP
if not self.fill_value or self.fill_value == "mean":
imp = tr["p_pred"].mean(0)
elif self.fill_value == "random":
imp = np.apply_along_axis(np.random.choice, 0, tr["p_pred"])
else:
err = f"{self.fill_value} must be 'mean' or 'random'."
raise ValueError(err)
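# A plausible continuation (an assumption, not the original implementation):
# threshold the posterior predictive probabilities at 0.5 to get 0/1 class
# membership, then map each class back to its observed label. Assumes `labels`
# holds the two class labels in positional order.
imp_classes = (imp > 0.5).astype(int)
imp = np.array([labels[c] for c in imp_classes])
return imp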
def on_fit(self, X, y):
self.X = shared(X)
self.model_definition(model=self.model, X=self.X, y=y)
with self.model:
self.trace = pm.sample()
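# A minimal usage sketch (an assumption, not the library's API): because X is a
# theano shared variable, new predictors can be swapped in after fitting and the
# stored trace reused for posterior predictive draws (pymc3 >= 3.8).
# `imputer` and `X_new` are hypothetical names.
def posterior_predictive_for(imputer, X_new):
    imputer.X.set_value(X_new)    # push new data into the shared variable
    with imputer.model:           # reuse the model built in on_fit
        return pm.sample_posterior_predictive(imputer.trace)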
def _imp_bayes_logistic_reg(X, col_name, x, lm, imp_ix, fv, verbose,
sample=1000, tune=1000, init="auto", thresh=0.5):
"""Private method to perform bayesian logistic imputation."""
progress = _pymc3_logger(verbose)
model, labels = lm
# add a Deterministic node for each missing value
# sampling then pulls from the posterior predictive distribution
# of each of the missing data points, i.e. a distribution for EACH missing value
with model:
p_pred = pm.Deterministic(
"p_pred", pm.invlogit(model["alpha"] + model["beta"].dot(x.T))
)
tr = pm.sample(
draws=sample, tune=tune, init=init, progressbar=progress
)
# decide how to impute. Use mean of posterior predictive or random draw
# not supported yet, but eventually consider using the MAP
if not fv or fv == "mean":
fills = tr["p_pred"].mean(0)
elif fv == "random":
fills = np.apply_along_axis(np.random.choice, 0, tr["p_pred"])
else:
err = f"{fv} not accepted reducer. Choose `mean` or `random`."
raise ValueError(err)
# convert probabilities to class membership
# then map class membership to corresponding label
fill_thresh = np.vectorize(lambda f: 1 if f > thresh else 0)
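# A plausible continuation (assumed, not the original code): apply the threshold,
# then translate 0/1 class membership back into the column's observed labels.
fills = fill_thresh(fills)
label_dict = {i: lbl for i, lbl in enumerate(labels)}
fills = np.array([label_dict[f] for f in fills])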
Args:
X (pd.DataFrame): predictors to determine imputed values.
Returns:
np.array: imputed dataset.
"""
# check if fitted then predict with least squares
check_is_fitted(self, "statistics_")
model = self.statistics_["param"]["model"]
df = self.statistics_["param"]["y_obs"]
df = df.reset_index(drop=True)
# generate posterior distribution for alpha, beta coefficients
with model:
tr = pm.sample(
self.sample,
tune=self.tune,
init=self.init,
**self.sample_kwargs
)
self.trace_ = tr
# sample random alpha from alpha posterior distribution
# get the mean and covariance of the multivariate betas
# betas assumed multivariate normal by linear reg rules
# sample beta w/ cov structure to create realistic variability
alpha_bayes = np.random.choice(tr["alpha"])
beta_means = tr["beta"].mean(0)
beta_cov = np.cov(tr["beta"].T)
beta_bayes = np.array(multivariate_normal(beta_means, beta_cov).rvs())
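# A plausible next step (assumed, not the original code): combine the sampled
# intercept and coefficients with the predictors X to produce the imputed values.
imp = alpha_bayes + beta_bayes.dot(X.T)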
import numpy as np
import arviz as az
import pymc3 as pm
az.style.use('arviz-darkgrid')
# Data of the Eight Schools Model
J = 8
y = np.array([28., 8., -3., 7., -1., 1., 18., 12.])
sigma = np.array([15., 10., 16., 11., 9., 11., 10., 18.])
with pm.Model() as centered_eight:
mu = pm.Normal('mu', mu=0, sd=5)
tau = pm.HalfCauchy('tau', beta=5)
theta = pm.Normal('theta', mu=mu, sd=tau, shape=J)
obs = pm.Normal('obs', mu=theta, sd=sigma, observed=y)
centered_eight_trace = pm.sample()
az.plot_parallel(centered_eight_trace, var_names=['theta', 'tau', 'mu'])
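# A possible follow-up (not part of the original snippet): ArviZ can also
# summarize the same trace to check R-hat and effective sample size.
print(az.summary(centered_eight_trace, var_names=['mu', 'tau']))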
## Specify the model in PyMC
with pm.Model() as model:
# define the HyperPriors
muG = pm.Normal('muG', mu=2.3, tau=0.1)
tauG = pm.Gamma('tauG', 1, .5)
m = pm.Gamma('m', 1, .25)
d = pm.Gamma('d', 1, .5)
sG = m**2 / d**2
rG = m / d**2
# define the priors
tau = pm.Gamma('tau', sG, rG, shape=n_subj)
mu = pm.Normal('mu', mu=muG, tau=tauG, shape=n_subj)
# define the likelihood
y = pm.Normal('y', mu=mu[subj-1], tau=tau[subj-1], observed=y)
# Generate a MCMC chain
trace = pm.sample(2000)
# EXAMINE THE RESULTS
## Print summary for each trace
#pm.summary(trace)
## Check for mixing and autocorrelation
#pm.autocorrplot(trace, varnames=['mu', 'tau'])
## Plot KDE and sampled values for each parameter.
#pm.traceplot(trace)
## Extract chains
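# For example (a sketch using the trace above): pull the sampled values for a
# parameter out of the MultiTrace, discarding the warm-up draws.
mu_samples = trace.get_values('mu', burn=500)
tau_samples = trace.get_values('tau', burn=500)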
def run(n=2000):
if n == "short":
n = 50
import matplotlib.pyplot as plt
with model:
start = find_MAP(fmin=opt.fmin_powell)
trace = sample(n, Slice(), start=start)
plt.plot(x, y, 'x')
glm.plot_posterior_predictive(trace)
# plt.show()
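# A sketch of the setup run() assumes (names and data are assumptions): a simple
# GLM defined with pymc3's formula interface over simulated data.
import numpy as np
import scipy.optimize as opt
import pymc3 as pm
from pymc3 import glm, find_MAP, sample, Slice
x = np.linspace(0, 1, 100)
y = 1 + 2 * x + np.random.normal(scale=0.5, size=100)
with pm.Model() as model:
    glm.GLM.from_formula('y ~ x', dict(x=x, y=y))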
# allocate appropriate Poisson rates to years before and after the switchpoint location
idx = arange(years)
rate = rate_(switchpoint, early_mean, late_mean)
# Data likelihood
disasters = pm.Poisson('disasters', rate, observed=disasters_data)
# Use slice sampler for means
step1 = pm.Slice([early_mean, late_mean])
# Use Metropolis for switchpoint, since it accommodates discrete variables
step2 = pm.Metropolis([switchpoint])
# Initial values for stochastic nodes
start = {'early_mean': 2., 'late_mean': 3.}
tr = pm.sample(1000, tune=500, start=start, step=[step1, step2], cores=2)
pm.traceplot(tr)
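# A possible follow-up (not in the original example): print a posterior summary
# for the switchpoint and the two disaster rates.
print(pm.summary(tr))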