Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init_and_add_samples():
    """
    Tests object creation and add_samples() with some simulated posterior samples.

    Two batches of simulated draws (with random likelihoods) are added one
    after the other, then the number of stored samples is compared against
    the total number of draws.
    """
    # Create object
    results_obj = results.Results(
        sampler_name='testing', tau_ref_epoch=50000,
        labels=std_labels
    )
    # Simulate sample draws batch by batch, assign random likelihoods,
    # and add each batch to the same results object
    batch_sizes = (1000, 2000)
    for n_orbit_draws in batch_sizes:
        sim_post = simulate_orbit_sampling(n_orbit_draws)
        sim_lnlike = np.random.uniform(size=n_orbit_draws)
        results_obj.add_samples(sim_post, sim_lnlike, labels=std_labels)
    # Check shape of results.post: one row (and one lnlike value) per draw
    expected_length = sum(batch_sizes)
    assert results_obj.post.shape[0] == expected_length
    assert results_obj.lnlike.shape == (expected_length,)
def test_save_and_load_results(results_to_test, has_lnlike=True):
    """
    Tests saving and reloading of a results object.

    The object is saved to a file, loaded into a blank results object, and
    compared with the original. The same file is then loaded a second time
    with ``append=True`` and the doubled sample count is checked. The file
    is removed afterwards, whether or not the checks pass.

    Args:
        results_to_test: results object to save. It is modified in place
            (``lnlike`` set to None) when ``has_lnlike`` is False.
        has_lnlike (bool): allows for tests with and without lnlike values
            (e.g. OFTI doesn't output lnlike)
    """
    import os  # local import so the cleanup below does not rely on file-level imports

    results_to_save = results_to_test
    if not has_lnlike:  # manipulate object to remove lnlike (as in OFTI)
        results_to_save.lnlike = None
    save_filename = 'test_results.h5'
    try:
        # Save to file
        results_to_save.save_results(save_filename)
        # Create new blank results object and load from file
        loaded_results = results.Results()
        loaded_results.load_results(save_filename, append=False)
        # Check if loaded results equal saved results
        assert results_to_save.sampler_name == loaded_results.sampler_name
        assert np.array_equal(results_to_save.post, loaded_results.post)
        if has_lnlike:
            assert np.array_equal(results_to_save.lnlike, loaded_results.lnlike)
        # Try to load the saved results again, this time appending
        loaded_results.load_results(save_filename, append=True)
        # Now check that the loaded results object has the expected size
        original_length = results_to_save.post.shape[0]
        expected_length = original_length * 2
        assert loaded_results.post.shape == (expected_length, 8)
        assert loaded_results.labels.tolist() == std_labels
        if has_lnlike:
            assert loaded_results.lnlike.shape == (expected_length,)
    finally:
        # Clean up the save file so it does not leak into later runs; the
        # existence check avoids masking an error raised before the save
        if os.path.exists(save_filename):
            os.remove(save_filename)
def test_add_and_clear_results():
    """
    Checks that results objects can be added to a System, cleared, and added
    again, by following the length of ``System.results`` after each step.
    """
    # Read the input data file that sits next to this test module
    here = os.path.dirname(os.path.abspath(__file__))
    csv_path = os.path.join(here, 'test_val.csv')
    observations = read_input.read_file(csv_path)

    # Initialize System object
    n_secondary = 1
    total_mass, parallax = 1.0, 10.0
    total_mass_err, parallax_err = 0.1, 1.0
    test_system = system.System(
        n_secondary, observations, total_mass,
        parallax, mass_err=total_mass_err, plx_err=parallax_err
    )

    # Initialize dummy results.Results object
    dummy_results = results.Results()

    # Add the same result object twice; the list grows by one each time
    for expected_count in (1, 2):
        test_system.add_results(dummy_results)
        assert len(test_system.results) == expected_count

    # Clears result objects
    test_system.clear_results()
    assert len(test_system.results) == 0

    # Add one more result object
    test_system.add_results(dummy_results)
    assert len(test_system.results) == 1
# NOTE(review): these statements use `self` and `body_num`, so they belong to
# a method whose `def` line is not part of this excerpt.
# Presumably converts the data table from RA/Dec to separation/PA for
# `body_num` (inferred from the method name only) — confirm against System.
self.system.convert_data_table_radec2seppa(body_num=body_num)
# Take copies of the measurement columns 'quant1'/'quant2' and their errors;
# these are of type astropy.table.column
self.sep_observed = self.system.data_table[:]['quant1'].copy()
self.pa_observed = self.system.data_table[:]['quant2'].copy()
self.sep_err = self.system.data_table[:]['quant1_err'].copy()
self.pa_err = self.system.data_table[:]['quant2_err'].copy()
# Epochs are stored relative to the system's tau_ref_epoch, not as absolute values.
### this is OK, ONLY IF we are only using self.epochs for computing RA/Dec from Keplerian elements
self.epochs = np.array(self.system.data_table['epoch']) - self.system.tau_ref_epoch
# choose scale-and-rotate epoch: index of the smallest entry in sep_err
self.epoch_idx = np.argmin(self.sep_err) # epoch with smallest error
# create an empty results object (post and lnlike are None), named after
# this sampler's class and carrying the system's tau_ref_epoch
self.results = orbitize.results.Results(
sampler_name = self.__class__.__name__,
post = None,
lnlike = None,
tau_ref_epoch=self.system.tau_ref_epoch
)
# NOTE(review): `n_steps`, `trim`, `burn`, `chn`, `lnlikes`, `keep_start`,
# `keep_end` and `n_params` are defined before this excerpt, in a method
# whose `def` line is not visible here.
# Number of steps left once `trim` and `burn` are subtracted from `n_steps`
n_chopped_steps = n_steps - trim - burn
# Update arrays in `sampler`: chain, lnlikes, lnlikes_alltemps (if PT), post
# Keep only indices keep_start:keep_end along the second axis of both arrays
chopped_chain = chn[:, keep_start:keep_end, :]
chopped_lnlikes = lnlikes[:, keep_start:keep_end]
# Update current position if trimmed from edge
# NOTE(review): indentation was lost in this excerpt; the assignment below
# presumably forms the body of this `if` — confirm against the original file.
if trim > 0:
self.curr_pos = chopped_chain[:, -1, :]
# Flatten likelihoods and samples: the first two axes are merged into one of
# length num_walkers*n_chopped_steps
# (assumes the arrays are laid out as (walkers, steps[, params]) — TODO confirm)
flat_chopped_chain = chopped_chain.reshape(self.num_walkers*n_chopped_steps, n_params)
flat_chopped_lnlikes = chopped_lnlikes.reshape(self.num_walkers*n_chopped_steps)
# Update results object associated with this sampler: a new Results object
# built from the flattened arrays replaces the previous self.results
self.results = orbitize.results.Results(
sampler_name=self.__class__.__name__,
post=flat_chopped_chain,
lnlike=flat_chopped_lnlikes,
tau_ref_epoch=self.system.tau_ref_epoch,
labels=self.system.labels,
num_secondary_bodies=self.system.num_secondary_bodies
)
# Print a confirmation
print('Chains successfully chopped. Results object updated.')
def __init__(self, system, num_temps=20, num_walkers=1000, num_threads=1, like='chi2_lnlike', custom_lnlike=None):
    """
    Store the sampler settings and attach an empty results object.

    Args:
        system: system object; forwarded to the base-class initializer and
            read for ``tau_ref_epoch`` and ``num_secondary_bodies``.
        num_temps (int): number of temperatures; any value not greater
            than 1 is stored as 1 and turns ``use_pt`` off.
        num_walkers (int): stored as ``self.num_walkers``.
        num_threads (int): stored as ``self.num_threads``.
        like: forwarded to the base-class initializer.
        custom_lnlike: forwarded to the base-class initializer.
    """
    super(MCMC, self).__init__(system, like=like, custom_lnlike=custom_lnlike)

    self.num_temps = num_temps
    self.num_walkers = num_walkers
    self.num_threads = num_threads

    # create an empty results object (no samples or likelihoods yet)
    empty_results_kwargs = dict(
        sampler_name=self.__class__.__name__,
        post=None,
        lnlike=None,
        tau_ref_epoch=system.tau_ref_epoch,
        num_secondary_bodies=system.num_secondary_bodies,
    )
    self.results = orbitize.results.Results(**empty_results_kwargs)

    # use_pt is on only for more than one temperature; otherwise the
    # temperature count is pinned to exactly 1
    self.use_pt = True if self.num_temps > 1 else False
    if not self.use_pt:
        self.num_temps = 1

    # get priors from the system class. need to remove and record fixed priors
    self.priors, self.fixed_params = [], []
def __init__(self, system, num_temps=20, num_walkers=1000, num_threads=1, like='chi2_lnlike', custom_lnlike=None):
super(MCMC, self).__init__(system, like=like, custom_lnlike=custom_lnlike)
self.num_temps = num_temps
self.num_walkers = num_walkers
self.num_threads = num_threads
# create an empty results object
self.results = orbitize.results.Results(
sampler_name = self.__class__.__name__,
post = None,
lnlike = None,
tau_ref_epoch=system.tau_ref_epoch
)
if self.num_temps > 1:
self.use_pt = True
else:
self.use_pt = False
self.num_temps = 1
# get priors from the system class. need to remove and record fixed priors
self.priors = []
self.fixed_params = []
for i, prior in enumerate(system.sys_priors):