Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
period_c = np.sqrt(c_params[0]**3/mtot)
period_b = np.sqrt(b_params[0]**3/mtot)
epochs = np.linspace(0, period_c*365.25, 100) + tau_ref_epoch # the full period of c, MJD
ra_model, dec_model, vz_model = kepler.calc_orbit(epochs, b_params[0], b_params[1], b_params[2], b_params[3], b_params[4], b_params[5], plx, mtot, tau_ref_epoch=tau_ref_epoch)
# generate some fake measurements just to feed into system.py to test bookkeeping
# just make a 1 planet fit for now
t = table.Table([epochs, np.ones(epochs.shape, dtype=int), ra_model, np.zeros(ra_model.shape), dec_model, np.zeros(dec_model.shape)],
names=["epoch", "object" ,"raoff", "raoff_err","decoff","decoff_err"])
filename = os.path.join(orbitize.DATADIR, "multiplanet_fake_1planettest.csv")
t.write(filename, overwrite=True)
# create the orbitize system and generate model predictions using the standard 1 body model for b, and the 2 body model for b and c
astrom_dat = read_input.read_file(filename)
sys_1body = system.System(1, astrom_dat, m0, plx, tau_ref_epoch=tau_ref_epoch, fit_secondary_mass=True)
sys_2body = system.System(2, astrom_dat, m0, plx, tau_ref_epoch=tau_ref_epoch, fit_secondary_mass=True)
# model predictions for the 1 body case
# we had put one measurements of planet b in the data table, so compute_model only does planet b measurements
params = np.append(b_params, [plx, mass_b, m0])
radec_1body, _ = sys_1body.compute_model(params)
ra_1body = radec_1body[:, 0]
dec_1body = radec_1body[:, 1]
# model predictions for the 2 body case
# still only generates predictions of b's location, but including the perturbation for c
params = np.append(b_params, np.append(c_params, [plx, mass_b, mass_c, m0]))
radec_2body, _ = sys_2body.compute_model(params)
ra_2body = radec_2body[:, 0]
def test_custom_likelihood():
"""
Tests the inclusion of a custom likelihood function in the code
"""
# use the test_csv dir
testdir = orbitize.DATADIR
input_file = os.path.join(testdir, 'GJ504.csv')
data_table = read_input.read_file(input_file)
# Manually set 'object' column of data table
data_table['object'] = 1
# construct the system
orbit = system.System(1, data_table, 1, 0.01)
# construct custom likelihood function
def my_likelihood(params):
return -5
# construct sampler
n_walkers = 100
mcmc1 = sampler.MCMC(orbit, 0, n_walkers, num_threads=1)
mcmc2 = sampler.MCMC(orbit, 0, n_walkers, num_threads=1, custom_lnlike=my_likelihood)
param = np.array([2, 0.5, 0.5, 0.5, 0.5, 0.5, 1, 0.01])
def test_add_and_clear_results():
num_secondary_bodies=1
testdir = os.path.dirname(os.path.abspath(__file__))
input_file = os.path.join(testdir, 'test_val.csv')
data_table=read_input.read_file(input_file)
system_mass=1.0
plx=10.0
mass_err=0.1
plx_err=1.0
# Initialize System object
test_system = system.System(
num_secondary_bodies, data_table, system_mass,
plx, mass_err=mass_err, plx_err=plx_err
)
# Initialize dummy results.Results object
test_results = results.Results()
# Add one result object
test_system.add_results(test_results)
assert len(test_system.results)==1
# Adds second result object
test_system.add_results(test_results)
def test_convert_data_table_radec2seppa():
num_secondary_bodies=1
testdir = os.path.dirname(os.path.abspath(__file__))
input_file = os.path.join(testdir, 'test_val.csv')
data_table=read_input.read_file(input_file)
system_mass=1.0
plx=10.0
mass_err=0.1
plx_err=1.0
# Initialize System object
test_system = system.System(
num_secondary_bodies, data_table, system_mass,
plx, mass_err=mass_err, plx_err=plx_err
)
test_system.convert_data_table_radec2seppa()
def test_ensemble_mcmc_runs(num_threads=1):
"""
Tests the EnsembleMCMC sampler by making sure it even runs
"""
# use the test_csv dir
testdir = os.path.dirname(os.path.abspath(__file__))
input_file = os.path.join(testdir, 'test_val.csv')
data_table = read_input.read_file(input_file)
# Manually set 'object' column of data table
data_table['object'] = 1
# construct the system
orbit = system.System(1, data_table, 1, 0.01)
# construct sampler
n_walkers=100
mcmc = sampler.MCMC(orbit, 0, n_walkers, num_threads=num_threads)
# run it a little (tests 0 burn-in steps)
mcmc.run_sampler(100)
# run it a little more
mcmc.run_sampler(1000, burn_steps=1)
# run it a little more (tests adding to results object)
# perturb b due to c
ra_model_c += mass_b/m0 * ra_model_b_orig
dec_model_c += mass_b/m0 * dec_model_b_orig
# generate some fake measurements to fit to. Make it with b first
t = table.Table([epochs, np.ones(epochs.shape, dtype=int), ra_model_b, 0.00001 * np.ones(epochs.shape, dtype=int), dec_model_b, 0.00001 * np.ones(epochs.shape, dtype=int)],
names=["epoch", "object" ,"raoff", "raoff_err","decoff","decoff_err"])
# add c
for eps, ra, dec in zip(epochs, ra_model_c, dec_model_c):
t.add_row([eps, 2, ra, 0.000001, dec, 0.000001])
filename = os.path.join(orbitize.DATADIR, "multiplanet_fake_2planettest.csv")
t.write(filename, overwrite=True)
# create the orbitize system and generate model predictions using the standard 1 body model for b, and the 2 body model for b and c
astrom_dat = read_input.read_file(filename)
sys = system.System(2, astrom_dat, m0, plx, tau_ref_epoch=tau_ref_epoch, fit_secondary_mass=True)
# fix most of the orbital parameters to make the dimensionality a bit smaller
sys.sys_priors[sys.param_idx['ecc1']] = b_params[1]
sys.sys_priors[sys.param_idx['inc1']] = b_params[2]
sys.sys_priors[sys.param_idx['aop1']] = b_params[3]
sys.sys_priors[sys.param_idx['pan1']] = b_params[4]
sys.sys_priors[sys.param_idx['ecc2']] = c_params[1]
sys.sys_priors[sys.param_idx['inc2']] = c_params[2]
sys.sys_priors[sys.param_idx['aop2']] = c_params[3]
sys.sys_priors[sys.param_idx['pan2']] = c_params[4]
sys.sys_priors[sys.param_idx['m1']] = priors.LogUniformPrior(mass_b*0.01, mass_b*100)
sys.sys_priors[sys.param_idx['m2']] = priors.LogUniformPrior(mass_c*0.01, mass_c*100)
def __init__(self, input_data, sampler_str,
num_secondary_bodies, system_mass, plx,
mass_err=0, plx_err=0, lnlike='chi2_lnlike',
system_kwargs=None, mcmc_kwargs=None):
# Read in data
# Try to interpret input as a filename first
try:
data_table = orbitize.read_input.read_file(input_data)
except:
try:
# Check if input might be an orbitize style astropy.table.Table
if 'quant_type' in input_data.columns:
data_table = input_data.copy()
except:
raise Exception('Invalid value of input_data for Driver')
if system_kwargs is None:
system_kwargs = {}
# Initialize System object which stores data & sets priors
self.system = orbitize.system.System(
num_secondary_bodies, data_table, system_mass,
plx, mass_err=mass_err, plx_err=plx_err, **system_kwargs
)
num_secondary_bodies=1
system_mass=1.75 # Msol
plx=51.44 #mas
mass_err=0.05 # Msol
plx_err=0.12 #mas
# Sampler parameters
likelihood_func_name='chi2_lnlike'
n_temps=20
n_walkers=1000
n_threads=mp.cpu_count()
total_orbits=10000 # n_walkers x num_steps_per_walker
burn_steps=1
# Read in data
data_table = read_input.read_formatted_file(datafile)
# convert to mas
data_table['quant1'] *= 1000
data_table['quant1_err'] *= 1000
data_table['quant2'] *= 1000
data_table['quant2_err'] *= 1000
data_table['epoch'] -= 50000
# Initialize System object which stores data & sets priors
bP_system = system.System(
num_secondary_bodies, data_table, system_mass,
plx, mass_err=mass_err, plx_err=plx_err
)
# We could overwrite any priors we want to here.
# Using defaults for now.