"out_file": {
"type": "string",
"default": "output.csv"}}
output_filename = params["out_file"]["default"]
output_columns = ["u"]
encoder = uq.encoders.GenericEncoder(
    template_fname='tests/sc/sc.template',
    delimiter='$',
    target_filename='sc_in.json')
decoder = uq.decoders.SimpleCSV(target_filename=output_filename,
                                output_columns=output_columns,
                                header=0)
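# GenericEncoder substitutes delimiter-prefixed parameter names in the
# template with each run's sampled values. A hypothetical tests/sc/sc.template
# for the inputs above could look like
#
#   {"Pe": "$Pe", "f": "$f", "out_file": "$out_file"}
#
# which, for one run, would be written out as sc_in.json with the placeholders
# replaced by the sampled numbers and the output filename.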
collater = uq.collate.AggregateSamples(average=False)
vary = {
    "Pe": cp.Uniform(100.0, 200.0),
    "f": cp.Normal(1.0, 0.1)
}
sampler = uq.sampling.SCSampler(vary=vary, polynomial_order=1)
actions = uq.actions.ExecuteLocal("tests/sc/sc_model.py sc_in.json")
stats = uq.analysis.SCAnalysis(sampler=sampler, qoi_cols=output_columns)
# `campaign` is a helper defined elsewhere in this test module that wires
# these elements together and runs the full sample-execute-collate-analyse cycle.
campaign(tmpdir, 'sc', 'sc', params, encoder, decoder, sampler,
         collater, actions, stats, vary, 0, 1)
# Add the cannonsim app
my_campaign.add_app(name="cannonsim",
                    params=params,
                    encoder=encoder,
                    decoder=decoder)
# Set the active app to be cannonsim (this is redundant when only one app
# has been added)
my_campaign.set_app("cannonsim")
# Create a collation element for this campaign
collater = uq.collate.AggregateSamples(average=False)
my_campaign.set_collater(collater)
print("Serialized collation:", collater.serialize())
# Make a random sampler
vary = {
    "angle": cp.Uniform(0.0, 1.0),
    "height": cp.Uniform(2.0, 10.0),
    "velocity": cp.Normal(10.0, 1.0),
    "mass": cp.Uniform(1.0, 5.0)
}
sampler1 = uq.sampling.RandomSampler(vary=vary)
print("Serialized sampler:", sampler1.serialize())
# Set the campaign to use this sampler
my_campaign.set_sampler(sampler1)
# Draw 5 samples
my_campaign.draw_samples(num_samples=5)
# Print the list of runs now in the campaign db
print("List of runs added:")
pprint(my_campaign.list_runs())
print("---")
def test_get_error(self):
    parameter_list = [["gbar_Na", 120, cp.Uniform(110, 130)],
                      ["gbar_K", 36, cp.Normal(36, 1)],
                      ["gbar_L", 0.3, cp.Chi(1, 1, 0.3)]]
    self.parameters = Parameters(parameter_list)
    with self.assertRaises(AttributeError):
        self.parameters.get("not_a_parameter")
decoder = uq.decoders.SimpleCSV(target_filename='output.csv',
                                output_columns=['Dist', 'lastvx', 'lastvy'],
                                header=0)
# Create a collation element for this campaign
collater = uq.collate.AggregateSamples(average=False)
actions = uq.actions.ExecuteLocal("/home/hpc/pn69ju/di73kuj2/cannonsim/bin/cannonsim in.cannon output.csv")
campaign.add_app(name='cannonsim',
                 params=params,
                 encoder=encoder,
                 decoder=decoder,
                 collater=collater)
stats = uq.analysis.BasicStats(qoi_cols=['Dist', 'lastvx', 'lastvy'])
# Make a random sampler
vary = {
    "angle": cp.Uniform(0.0, 1.0),
    "height": cp.Uniform(2.0, 10.0),
    "velocity": cp.Normal(10.0, 1.0),
    "mass": cp.Uniform(1.0, 5.0)
}
sampler = uq.sampling.RandomSampler(vary=vary)
campaign.set_sampler(sampler)
campaign.draw_samples(num_samples=56, replicas=1)
cluster = SLURMCluster(job_extra=['--cluster=mpp2'], queue='mpp2_batch',
                       cores=56, processes=56, memory='64 GB')
print(cluster.job_script())
cluster.scale(4)
client = Client(cluster)
campaign.populate_runs_dir()
campaign.apply_for_each_run_dir(actions, client)
campaign.collate()
campaign.apply_analysis(stats)
print(campaign.get_last_analysis())
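# For testing the same workflow without a SLURM allocation, a Dask
# LocalCluster can stand in for SLURMCluster. This sketch assumes
# dask.distributed is installed and reuses the campaign, actions and stats
# defined above; the worker counts are arbitrary.
from dask.distributed import Client, LocalCluster

local_cluster = LocalCluster(n_workers=4, threads_per_worker=1)
client = Client(local_cluster)
campaign.populate_runs_dir()
campaign.apply_for_each_run_dir(actions, client)
campaign.collate()
campaign.apply_analysis(stats)
print(campaign.get_last_analysis())
client.close()
local_cluster.close()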
def test_iter(self):
    parameter_list = [["gbar_Na", 120, cp.Uniform(110, 130)],
                      ["gbar_K", 36, cp.Normal(36, 1)],
                      ["gbar_L", 0.3, cp.Chi(1, 1, 0.3)]]
    parameters = Parameters(parameter_list)
    result = [parameter for parameter in parameters]
    self.assertEqual(len(result), 3)
    self.assertIsInstance(result[0], Parameter)
    self.assertIsInstance(result[1], Parameter)
    self.assertIsInstance(result[2], Parameter)
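# Outside a test, iterating a Parameters collection yields Parameter objects
# directly; this sketch assumes each Parameter exposes a `name` attribute in
# addition to the `value` attribute used elsewhere in these examples.
parameters = Parameters([["gbar_Na", 120, cp.Uniform(110, 130)],
                         ["gbar_K", 36, cp.Normal(36, 1)],
                         ["gbar_L", 0.3, cp.Chi(1, 1, 0.3)]])
for parameter in parameters:
    print(parameter.name, parameter.value)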
def test_dist(dim=4, samples=100):
    # dim and samples are given defaults here so the test runs standalone.
    # Build a chain of dependent normals: each new Normal takes the previous
    # distribution as its mean (a chaospy stochastic dependency).
    dist = [cp.Normal()]
    for d in range(dim - 1):
        dist.append(cp.Normal(dist[-1]))
    dist = cp.J(*dist)
    # Round-trip samples through the forward and inverse transformations.
    out = dist.sample(samples)
    out = dist.fwd(out)
    out = dist.inv(out)
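# A quick numerical check of the same round trip: fwd followed by inv should
# reproduce the original samples up to floating-point error. numpy is assumed
# to be available.
import numpy as np

base = cp.Normal()
dist = cp.J(base, cp.Normal(base))
x = dist.sample(100)
assert np.allclose(dist.inv(dist.fwd(x)), x)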
def test_init_list_only_chaospy(self):
    parameter_list = [["gbar_Na", cp.Uniform(110, 130)],
                      ["gbar_K", cp.Normal(36, 1)],
                      ["gbar_L", cp.Chi(1, 1, 0.3)]]
    parameters = Parameters(parameter_list)
    self.assertIsInstance(parameters, Parameters)
    self.assertIsInstance(parameters["gbar_Na"], Parameter)
    self.assertIsInstance(parameters["gbar_K"], Parameter)
    self.assertIsInstance(parameters["gbar_L"], Parameter)
    self.assertIsNone(parameters["gbar_Na"].value)
    self.assertIsNone(parameters["gbar_K"].value)
    self.assertIsNone(parameters["gbar_L"].value)
    self.assertIsInstance(parameters["gbar_Na"].distribution, cp.Dist)
    self.assertIsInstance(parameters["gbar_K"].distribution, cp.Dist)
    self.assertIsInstance(parameters["gbar_L"].distribution, cp.Dist)
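# The stored chaospy distributions can also be used directly, for example to
# draw a few candidate values for one parameter (10 is an arbitrary count):
parameter_list = [["gbar_Na", cp.Uniform(110, 130)],
                  ["gbar_K", cp.Normal(36, 1)],
                  ["gbar_L", cp.Chi(1, 1, 0.3)]]
parameters = Parameters(parameter_list)
print(parameters["gbar_Na"].distribution.sample(10))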
# Add the cannonsim app
my_campaign.add_app(name="cannonsim",
                    params=params,
                    encoder=encoder,
                    decoder=decoder,
                    collater=collater)
# Set the active app to be cannonsim (this is redundant when only one app
# has been added)
my_campaign.set_app("cannonsim")
# Make a random sampler
vary = {
    "angle": cp.Uniform(0.0, 1.0),
    "height": cp.DiscreteUniform(0, 100),
    "velocity": cp.Normal(10.0, 1.0),
    "mass": cp.Uniform(1.0, 5.0)
}
sampler1 = uq.sampling.RandomSampler(vary=vary)
print("Serialized sampler:", sampler1.serialize())
# Set the campaign to use this sampler
my_campaign.set_sampler(sampler1)
# Draw 5 samples
my_campaign.draw_samples(num_samples=5)
# Print the list of runs now in the campaign db
print("List of runs added:")
pprint(my_campaign.list_runs())
print("---")
See also
--------
uncertainpy.Data
uncertainpy.Parameters
"""
uncertain_parameters = self.convert_uncertain_parameters(uncertain_parameters)
distribution = self.create_distribution(uncertain_parameters=uncertain_parameters)
# Create the multivariate standard normal distribution used for the
# Rosenblatt transformation
dist_R = []
for parameter in uncertain_parameters:
    dist_R.append(cp.Normal())
dist_R = cp.J(*dist_R)
P = cp.orth_ttr(polynomial_order, dist_R)
if quadrature_order is None:
    quadrature_order = polynomial_order + 2
nodes_R, weights_R = cp.generate_quadrature(quadrature_order,
                                             dist_R,
                                             rule="J",
                                             sparse=True)
# Map the standard-normal quadrature nodes to the problem distribution via
# the inverse Rosenblatt transformation
nodes = distribution.inv(dist_R.fwd(nodes_R))
# weights = weights_R*distribution.pdf(nodes)/dist_R.pdf(nodes_R)
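# Sketch of how such nodes and weights are typically used to finish the
# pseudo-spectral step: evaluate the model at each mapped node and fit the
# expansion with chaospy. `run_model` is a hypothetical stand-in for the real
# model, not part of the original code.
def run_model(node):
    # dummy model for illustration only; replace with the real model call
    return sum(node)

evaluations = [run_model(node) for node in nodes.T]
U_hat = cp.fit_quadrature(P, nodes_R, weights_R, evaluations)
mean = cp.E(U_hat, dist_R)
variance = cp.Var(U_hat, dist_R)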
def samples(x, y):
    # First distribution: vary the mean
    mu1 = (y - 50.)**2 / 500.
    sig1 = 0.2
    dist1 = cp.Normal(mu1, sig1)
    # Second distribution: vary the standard deviation
    mu2 = 2.5
    sig2 = 0.1 + 0.01 * y
    dist2 = cp.Normal(mu2, sig2)
    # Probability densities
    p1 = dist1.pdf(x)
    p2 = dist2.pdf(x)
    # Cumulative distributions, scaled by the width of the x range
    dx = x[-1] - x[0]
    c1 = dx * dist1.cdf(x)
    c2 = dx * dist2.cdf(x)
    return p1, p2, c1, c2
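# Example call, assuming x is a numpy grid and y a scalar control parameter
# (the values below are arbitrary):
import numpy as np

x = np.linspace(0.0, 5.0, 200)
p1, p2, c1, c2 = samples(x, y=50.0)
print(p1.shape, c2[-1])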