Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
epochs = np.linspace(0, period_c*365.25, 100) + tau_ref_epoch # the full period of c, MJD
ra_model, dec_model, vz_model = kepler.calc_orbit(epochs, b_params[0], b_params[1], b_params[2], b_params[3], b_params[4], b_params[5], plx, mtot, tau_ref_epoch=tau_ref_epoch)
# generate some fake measurements just to feed into system.py to test bookkeeping
# just make a 1 planet fit for now
t = table.Table([epochs, np.ones(epochs.shape, dtype=int), ra_model, np.zeros(ra_model.shape), dec_model, np.zeros(dec_model.shape)],
names=["epoch", "object" ,"raoff", "raoff_err","decoff","decoff_err"])
filename = os.path.join(orbitize.DATADIR, "multiplanet_fake_1planettest.csv")
t.write(filename, overwrite=True)
# create the orbitize system and generate model predictions using the standard 1 body model for b, and the 2 body model for b and c
astrom_dat = read_input.read_file(filename)
sys_1body = system.System(1, astrom_dat, m0, plx, tau_ref_epoch=tau_ref_epoch, fit_secondary_mass=True)
sys_2body = system.System(2, astrom_dat, m0, plx, tau_ref_epoch=tau_ref_epoch, fit_secondary_mass=True)
# model predictions for the 1 body case
# we had put one measurements of planet b in the data table, so compute_model only does planet b measurements
params = np.append(b_params, [plx, mass_b, m0])
radec_1body, _ = sys_1body.compute_model(params)
ra_1body = radec_1body[:, 0]
dec_1body = radec_1body[:, 1]
# model predictions for the 2 body case
# still only generates predictions of b's location, but including the perturbation for c
params = np.append(b_params, np.append(c_params, [plx, mass_b, mass_c, m0]))
radec_2body, _ = sys_2body.compute_model(params)
ra_2body = radec_2body[:, 0]
dec_2body = radec_2body[:, 1]
def test_scale_and_rotate():
# perform scale-and-rotate
myDriver = orbitize.driver.Driver(input_file, 'OFTI',
1, 1.22, 56.95, mass_err=0.08, plx_err=0.26)
s = myDriver.sampler
samples = s.prepare_samples(100)
sma, ecc, inc, argp, lan, tau, plx, mtot = [samp for samp in samples]
ra, dec, vc = orbitize.kepler.calc_orbit(s.epochs, sma, ecc, inc, argp, lan, tau, plx, mtot)
sep, pa = orbitize.system.radec2seppa(ra, dec)
sep_sar, pa_sar = np.median(sep[s.epoch_idx]), np.median(pa[s.epoch_idx])
# test to make sure sep and pa scaled to scale-and-rotate epoch
sar_epoch = s.system.data_table[s.epoch_idx]
assert sep_sar == pytest.approx(sar_epoch['quant1'], abs=sar_epoch['quant1_err'])
assert pa_sar == pytest.approx(sar_epoch['quant2'], abs=sar_epoch['quant2_err'])
# test scale-and-rotate for orbits run all the way through OFTI
s.run_sampler(100)
# test orbit plot generation
s.results.plot_orbits(start_mjd=s.epochs[0])
samples = s.results.post
sma = samples[:, 0]
ecc = samples[:, 1]
)
s = myDriver.sampler
samples = s.prepare_samples(10)
if myDriver.system.fit_secondary_mass:
sma, ecc, inc, argp, lan, tau, plx, gamma, sigma, m1, m0 = [samp for samp in samples]
ra, dec, vc = orbitize.kepler.calc_orbit(
s.epochs, sma, ecc, inc, argp, lan, tau, plx, mtot=m1 + m0,
mass_for_Kamp=m0)
v_star = vc*-(m1/m0)
else:
sma, ecc, inc, argp, lan, tau, plx, mtot = [samp for samp in samples]
ra, dec, vc = orbitize.kepler.calc_orbit(s.epochs, sma, ecc, inc, argp, lan, tau, plx, mtot)
sep, pa = orbitize.system.radec2seppa(ra, dec)
sep_sar, pa_sar = np.median(sep[s.epoch_idx]), np.median(pa[s.epoch_idx])
rv_sar = np.median(v_star[s.epoch_rv_idx[1]])
# test to make sure sep and pa scaled to scale-and-rotate epoch
sar_epoch = s.system.data_table[np.where(
s.system.data_table['quant_type'] == 'seppa')][s.epoch_idx]
rv_sar_epoch = s.system.data_table[np.where(
s.system.data_table['quant_type'] == 'rv')][s.epoch_rv_idx[1]]
pdb.set_trace()
# assert sep_sar == pytest.approx(sar_epoch['quant1'], abs=sar_epoch['quant1_err']) # issue here
#assert pa_sar == pytest.approx(sar_epoch['quant2'], abs=sar_epoch['quant2_err'])
#print('SEP/PA assert tests completed')
# test orbit plot generation
s.results.plot_orbits(start_mjd=s.epochs[0])
samples = s.results.post
sma = samples[:, 0]
ecc = samples[:, 1]
inc = samples[:, 2]
argp = samples[:, 3]
lan = samples[:, 4]
tau = samples[:, 5]
plx = samples[:, 6]
mtot = samples[:, 7]
ra, dec, vc = orbitize.kepler.calc_orbit(s.epochs, sma, ecc, inc, argp, lan, tau, plx, mtot)
sep, pa = orbitize.system.radec2seppa(ra, dec)
sep_sar, pa_sar = np.median(sep[s.epoch_idx]), np.median(pa[s.epoch_idx])
# test to make sure sep and pa scaled to scale-and-rotate epoch
assert sep_sar == pytest.approx(sar_epoch['quant1'], abs=sar_epoch['quant1_err'])
assert pa_sar == pytest.approx(sar_epoch['quant2'], abs=sar_epoch['quant2_err'])
def test_ensemble_mcmc_runs(num_threads=1):
"""
Tests the EnsembleMCMC sampler by making sure it even runs
"""
# use the test_csv dir
testdir = os.path.dirname(os.path.abspath(__file__))
input_file = os.path.join(testdir, 'test_val.csv')
data_table = read_input.read_file(input_file)
# Manually set 'object' column of data table
data_table['object'] = 1
# construct the system
orbit = system.System(1, data_table, 1, 0.01)
# construct sampler
n_walkers=100
mcmc = sampler.MCMC(orbit, 0, n_walkers, num_threads=num_threads)
# run it a little (tests 0 burn-in steps)
mcmc.run_sampler(100)
# run it a little more
mcmc.run_sampler(1000, burn_steps=1)
# run it a little more (tests adding to results object)
mcmc.run_sampler(1000, burn_steps=1)
def test_radec2seppa():
ras = np.array([-1, -1, 1, 1])
decs = np.array([-1, 1, -1, 1])
pas_expected = np.array([225., 315., 135., 45.])
pas_expected_180mod = np.array([225., 315., 495., 405.])
seps_expected = np.ones(4)*np.sqrt(2)
sep_nomod, pa_nomod = system.radec2seppa(ras, decs)
sep_180mod, pa_180mod = system.radec2seppa(ras, decs, mod180=True)
assert sep_nomod == pytest.approx(seps_expected, abs=1e-3)
assert sep_180mod == pytest.approx(seps_expected, abs=1e-3)
assert pa_nomod == pytest.approx(pas_expected, abs=1e-3)
assert pa_180mod == pytest.approx(pas_expected_180mod, abs=1e-3)
def test_compute_model():
"""
Test basic functionality of ``System.compute_model()``
"""
input_file = os.path.join(orbitize.DATADIR, 'test_val.csv')
data_table = read_input.read_file(input_file)
data_table['object'] = 1
testSystem_parsing = system.System(
1, data_table, 10., 10.
)
params_arr = np.array([[1., 0.5], [0., 0.], [0., 0.], [0., 0.], [
0., 0.], [245000., 245000.], [10, 10], [10, 10]])
model, jitter = testSystem_parsing.compute_model(params_arr)
assert model.shape == (4, 2, 2)
params_arr = np.array([1., 0., 0., 0., 0., 245000., 10, 10])
model, jitter = testSystem_parsing.compute_model(params_arr)
assert model.shape == (4, 2)
def test_add_and_clear_results():
num_secondary_bodies=1
testdir = os.path.dirname(os.path.abspath(__file__))
input_file = os.path.join(testdir, 'test_val.csv')
data_table=read_input.read_file(input_file)
system_mass=1.0
plx=10.0
mass_err=0.1
plx_err=1.0
# Initialize System object
test_system = system.System(
num_secondary_bodies, data_table, system_mass,
plx, mass_err=mass_err, plx_err=plx_err
)
# Initialize dummy results.Results object
test_results = results.Results()
# Add one result object
test_system.add_results(test_results)
assert len(test_system.results)==1
# Adds second result object
test_system.add_results(test_results)
assert len(test_system.results)==2
# Clears result objects
test_system.clear_results()
assert len(test_system.results)==0
# Add one more result object
test_system.add_results(test_results)
else:
mtot = samples[-1,:]
m1 = None
period_prescale = np.sqrt(
4*np.pi**2*(sma*u.AU)**3/(consts.G*(mtot*u.Msun))
)
period_prescale = period_prescale.to(u.day).value
meananno = self.epochs[self.epoch_idx]/period_prescale - tau
# compute sep/PA of generated orbits
ra, dec, vc = orbitize.kepler.calc_orbit(
self.epochs[self.epoch_idx], sma, ecc, inc, argp, lan, tau, plx, mtot,
mass_for_Kamp=m1
)
sep, pa = orbitize.system.radec2seppa(ra, dec) # sep[mas], PA[deg]
# generate Gaussian offsets from observational uncertainties
sep_offset = np.random.normal(
0, self.sep_err[self.epoch_idx], size=num_samples
)
pa_offset = np.random.normal(
0, self.pa_err[self.epoch_idx], size=num_samples
)
# calculate correction factors
sma_corr = (sep_offset + self.sep_observed[self.epoch_idx])/sep
lan_corr = (pa_offset + self.pa_observed[self.epoch_idx] - pa)
# perform scale-and-rotate
sma *= sma_corr # [AU]
lan += np.radians(lan_corr) # [rad]
vz_star[i, :] = vzoff0*-(m1[orb_ind]/m0[orb_ind]) + gamma[orb_ind]
else:
raoff0, deoff0, _ = kepler.calc_orbit(
epochs_seppa[i, :], sma[orb_ind], ecc[orb_ind], inc[orb_ind], aop[orb_ind], pan[orb_ind],
tau[orb_ind], plx[orb_ind], mtot[orb_ind], tau_ref_epoch=self.tau_ref_epoch,
)
raoff[i, :] = raoff0
deoff[i, :] = deoff0
yr_epochs = Time(epochs_seppa[i, :], format='mjd').decimalyear
plot_epochs = np.where(yr_epochs <= sep_pa_end_year)[0]
yr_epochs = yr_epochs[plot_epochs]
seps, pas = orbitize.system.radec2seppa(raoff[i, :], deoff[i, :], mod180=mod180)
plt.sca(ax1)
plt.plot(yr_epochs, seps, color=sep_pa_color)
plt.sca(ax2)
plt.plot(yr_epochs, pas, color=sep_pa_color)
if rv_time_series:
plt.sca(ax3)
plt.plot(yr_epochs, vz_star[i, :], color=sep_pa_color)
ax3.locator_params(axis='x', nbins=6)
ax3.locator_params(axis='y', nbins=6)
plt.tight_layout()
ax1.locator_params(axis='x', nbins=6)
ax1.locator_params(axis='y', nbins=6)