# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_grid_search_multivariate_normal():
    """Grid-search the ``scaling`` parameter of a multivariate normal kernel."""
    transition = MultivariateNormalTransition()
    grid = GridSearchCV(transition,
                        {"scaling": np.logspace(-5, 1.5, 5)},
                        n_jobs=1)
    df, w = data(20)
    grid.fit(df, w)
def test_one_with_one_without_parameters(population_strategy:
                                         PopulationStrategy):
    """Strategy copes with a mix of parameter-free and parametrized kernels."""
    n = 10

    # Kernel fitted on an empty parameter frame (model has no parameters).
    df_no_params = pd.DataFrame(index=list(range(n)))
    uniform_weights = np.ones(n) / n
    kernel_no_params = MultivariateNormalTransition()
    kernel_no_params.fit(df_no_params, uniform_weights)

    # Kernel fitted on a one-parameter frame.
    df_params = pd.DataFrame([{"s": np.random.rand()} for _ in range(n)])
    kernel_params = MultivariateNormalTransition()
    kernel_params.fit(df_params, np.ones(n) / n)

    kernels = [kernel_no_params, kernel_params]
    population_strategy.update(kernels, np.array([.7, .3]), t=0)
    assert population_strategy(t=0) > 0
def test_transitions_not_modified(population_strategy: PopulationStrategy):
    """The adaptation step must not alter the fitted transition kernels.

    Kernel densities at fixed test points are compared before and after
    ``population_strategy.update``.
    """
    n = 10
    kernels = []
    test_points = pd.DataFrame([{"s": np.random.rand()} for _ in range(n)])
    for _ in range(2):
        df = pd.DataFrame([{"s": np.random.rand()} for _ in range(n)])
        w = np.ones(n) / n
        kernel = MultivariateNormalTransition()
        kernel.fit(df, w)
        kernels.append(kernel)

    test_weights = [k.pdf(test_points) for k in kernels]
    # Model probabilities should form a distribution. The original passed
    # [.7, .2], which does not sum to 1 — use [.7, .3] as the sibling
    # tests in this file do.
    population_strategy.update(kernels, np.array([.7, .3]))
    after_adaptation_weights = [k.pdf(test_points) for k in kernels]

    same = all((k1 == k2).all()
               for k1, k2 in zip(test_weights, after_adaptation_weights))
    err_msg = ("Population strategy {}"
               " modified the transitions".format(population_strategy))
    assert same, err_msg
def test_no_parameters(population_strategy: PopulationStrategy):
    """Strategy handles kernels fitted on frames without any parameters."""
    n = 10
    empty_df = pd.DataFrame(index=list(range(n)))
    uniform_weights = np.ones(n) / n

    kernels = []
    for _ in range(2):
        k = MultivariateNormalTransition()
        k.fit(empty_df, uniform_weights)
        kernels.append(k)

    population_strategy.update(kernels, np.array([.7, .3]), t=0)
    assert population_strategy(t=0) > 0
def test_gaussian_multiple_populations_adpative_population_size(db_path, sampler):
    """Set up a Gaussian ABC run with an adaptive population size.

    NOTE(review): this copy never calls ``abc.run`` and a later function of
    the same (misspelled) name appears further down the file, shadowing this
    one at import time — confirm whether this definition is dead code.
    """
    sigma_x = 1
    sigma_y = .5
    y_observed = 2

    def model(args):
        return {"y": st.norm(args['x'], sigma_y).rvs()}

    models = list(map(SimpleModel, [model]))
    model_prior = RV("randint", 0, 1)
    nr_populations = 4
    population_size = AdaptivePopulationStrategy(600, nr_populations)
    priors = [Distribution(x=RV("norm", 0, sigma_x))]
    perturbation_kernels = [MultivariateNormalTransition()]
    abc = ABCSMC(models, model_prior,
                 ModelPerturbationKernel(1, probability_to_stay=1),
                 priors, perturbation_kernels,
                 PercentileDistanceFunction(measures_to_use=["y"]),
                 MedianEpsilon(.2),
                 population_size,
                 sampler=sampler)
    abc.set_data({"y": y_observed}, 0, {}, {'db_path': db_path})
    minimum_epsilon = -1
    abc.do_not_stop_when_only_single_model_alive()
def test_two_competing_gaussians_single_population(db_path, sampler):
    """Run one ABC-SMC population with two competing Gaussian models."""
    sigma_x = .5
    sigma_y = .5
    y_observed = 1

    def model(args):
        return {"y": st.norm(args['x'], sigma_y).rvs()}

    # Two structurally identical models whose priors differ in their mean.
    models = list(map(SimpleModel, [model, model]))
    model_prior = RV("randint", 0, 2)
    population_size = ConstantPopulationStrategy(500, 1)
    mu_x_1, mu_x_2 = 0, 1
    priors = [Distribution(x=RV("norm", mu_x_1, sigma_x)),
              Distribution(x=RV("norm", mu_x_2, sigma_x))]
    perturbation_kernels = [MultivariateNormalTransition() for _ in range(2)]
    abc = ABCSMC(models, model_prior,
                 ModelPerturbationKernel(2, probability_to_stay=.7),
                 priors, perturbation_kernels,
                 PercentileDistanceFunction(measures_to_use=["y"]),
                 MedianEpsilon(.02),
                 population_size,
                 sampler=sampler)
    abc.set_data({"y": y_observed}, 0, {}, {'db_path': db_path})
    minimum_epsilon = -1
    nr_populations = 1  # kept from the original; not otherwise used here
    abc.do_not_stop_when_only_single_model_alive()
    history = abc.run(minimum_epsilon)
    # NOTE(review): the result is fetched but never asserted on — the
    # assertions may have been lost with the file's formatting; confirm.
    mp = history.get_model_probabilities(history.max_t)
def test_redis_catch_error():
    """The Redis-based sampler tolerates models that occasionally raise."""

    def model(pars):
        # Roughly 10% of simulations fail on purpose.
        if np.random.uniform() < 0.1:
            raise ValueError("error")
        return {'s0': pars['p0'] + 0.2 * np.random.uniform()}

    def distance(s0, s1):
        return abs(s0['s0'] - s1['s0'])

    prior = Distribution(p0=RV("uniform", 0, 10))
    sampler = RedisEvalParallelSamplerServerStarter(
        batch_size=3, workers=1, processes_per_worker=1, port=8775)
    abc = ABCSMC(model, prior, distance, sampler=sampler, population_size=10)

    db_file = "sqlite:///" + os.path.join(tempfile.gettempdir(), "test.db")
    abc.new(db_file, {'s0': 2.8})
    abc.run(minimum_epsilon=.1, max_nr_populations=3)
    sampler.cleanup()
# NOTE(review): the statements below sit at module level yet reference
# names (``sampler``, ``db_path``) that are not defined at module scope —
# this looks like the body of a test function whose ``def`` line was lost
# when the file's formatting was mangled. Confirm against the upstream
# source and restore the enclosing function.
sigma = .5


def model(args):
    return {"y": st.norm(args['x'], sigma).rvs()}


# We define two models, but they are identical so far.
models = list(map(SimpleModel, [model, model]))
# The prior over the model classes is uniform.
model_prior = RV("randint", 0, 2)
# However, our models' priors are not the same: their means differ.
mu_x_1, mu_x_2 = 0, 1
parameter_given_model_prior_distribution = [
    Distribution(x=RV("norm", mu_x_1, sigma)),
    Distribution(x=RV("norm", mu_x_2, sigma))]
# Particles are perturbed in a Gaussian fashion.
parameter_perturbation_kernels = [
    MultivariateNormalTransition() for _ in range(2)]
# We plug the whole ABC setup together.
nr_populations = 3
population_size = AdaptivePopulationStrategy(400, 3, mean_cv=0.05)
abc = ABCSMC(models, model_prior,
             ModelPerturbationKernel(2, probability_to_stay=.7),
             parameter_given_model_prior_distribution,
             parameter_perturbation_kernels,
             PercentileDistanceFunction(measures_to_use=["y"]),
             MedianEpsilon(.2),
             population_size,
             sampler=sampler)
# Finally we add meta data such as model names and define where to store
# the results.
options = {'db_path': db_path}
# y_observed is the important piece here: our actual observation.
def test_gaussian_single_population(db_path, sampler):
    """Single-population Gaussian ABC run; retrieves the posterior over x."""
    sigma_prior = 1
    sigma_ground_truth = 1
    observed_data = 1

    def model(args):
        return {"y": st.norm(args['x'], sigma_ground_truth).rvs()}

    models = list(map(SimpleModel, [model]))
    model_prior = RV("randint", 0, 1)
    nr_populations = 1
    population_size = ConstantPopulationStrategy(600, nr_populations)
    priors = [Distribution(x=RV("norm", 0, sigma_prior))]
    perturbation_kernels = [MultivariateNormalTransition()]
    abc = ABCSMC(models, model_prior,
                 ModelPerturbationKernel(1, probability_to_stay=1),
                 priors, perturbation_kernels,
                 PercentileDistanceFunction(measures_to_use=["y"]),
                 MedianEpsilon(.1),
                 population_size,
                 sampler=sampler)
    abc.set_data({"y": observed_data}, 0, {}, {'db_path': db_path})
    minimum_epsilon = -1
    abc.do_not_stop_when_only_single_model_alive()
    history = abc.run(minimum_epsilon)
    posterior_x, posterior_weight = history.get_results_distribution(0, "x")
def test_gaussian_multiple_populations_adpative_population_size(db_path, sampler):
    """Gaussian ABC run over four populations with adaptive population size.

    NOTE(review): an earlier function in this file carries the exact same
    (misspelled) name; this later definition is the one pytest collects.
    """
    sigma_x = 1
    sigma_y = .5
    y_observed = 2

    def model(args):
        return {"y": st.norm(args['x'], sigma_y).rvs()}

    models = list(map(SimpleModel, [model]))
    model_prior = RV("randint", 0, 1)
    nr_populations = 4
    population_size = AdaptivePopulationStrategy(600, nr_populations)
    priors = [Distribution(x=RV("norm", 0, sigma_x))]
    perturbation_kernels = [MultivariateNormalTransition()]
    abc = ABCSMC(models, model_prior,
                 ModelPerturbationKernel(1, probability_to_stay=1),
                 priors, perturbation_kernels,
                 PercentileDistanceFunction(measures_to_use=["y"]),
                 MedianEpsilon(.2),
                 population_size,
                 sampler=sampler)
    abc.set_data({"y": y_observed}, 0, {}, {'db_path': db_path})
    minimum_epsilon = -1
    abc.do_not_stop_when_only_single_model_alive()
    history = abc.run(minimum_epsilon)
    posterior_x, posterior_weight = history.get_results_distribution(0, "x")
    sort_indices = sp.argsort(posterior_x)