Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
passpath = os.path.expanduser(os.path.join(maindir, "metadata/pass/"))
passmeta = drf.DigitalMetadataReader(passpath)
pdict = passmeta.read_latest()
pdict1 = list(pdict.values())[0]
rtime = (pdict1["rise_time"] - ut0) * e2p
tsave = list(pdict.keys())[0]
Dop_bw = pdict1["doppler_bandwidth"]
t = sp.arange(0, (Dop_bw.shape[0] + 1) * 10, 10.0) + rtime
t = t.astype(float)
obsLoc = ephem.Observer()
obsLoc.lat = sdict1["latitude"]
obsLoc.long = sdict1["longitude"]
satObj = ephem.readtle(idict1["name"], idict1["tle1"][1:-1], idict1["tle2"][1:-1])
tephem = (t - rtime) * ephem.second + pdict1["rise_time"]
sublat = sp.zeros_like(tephem)
sublon = sp.zeros_like(tephem)
for i, itime in enumerate(tephem):
obsLoc.date = itime
satObj.compute(obsLoc)
sublat[i] = sp.rad2deg(satObj.sublat)
sublon[i] = sp.rad2deg(satObj.sublong)
# XXX Extend t vector because for the most part the velocities at the edge
# are not changing much so to avoid having the interpolation extrapolate.
# If extrapolation used then error messages that the time was off
t[-1] = t[-1] + 600
t[-2] = t[-2] + 500
t[0] = t[0] - 240
observer_elevation=0,
tle_file=None,
):
if tle_file is not None:
with open(tle_file) as f:
tle = f.read().strip().split("\n")
elif number is not None:
number = ALIASES.get(number, number)
tle = get(
"http://www.celestrak.com/satcat/tle.php?CATNR={}".format(number)
).text.strip().split("\n")
if tle == ["No TLE found"]:
raise ValueError("Unable to find TLE for {}".format(number))
else:
raise ValueError("No SATCAT number or TLE file provided")
self._satellite = ephem.readtle(*tle)
self.argument_of_periapsis = float(self._satellite._ap.norm)
self.eccentricity = self._satellite._e
self.epoch = self._satellite._epoch.datetime()
self.inclination = float(self._satellite._inc.norm)
self.mean_anomaly_at_epoch = float(self._satellite._M.norm)
self.mean_motion_revs_per_day = self._satellite._n
self.name = tle[0].strip()
self.observer_elevation = observer_elevation
self.observer_latitude = observer_latitude
self.observer_longitude = observer_longitude
self.orbital_period = timedelta(days=1) / self.mean_motion_revs_per_day
self.mean_motion = 2 * pi / self.orbital_period.total_seconds()
self.apoapsis_latitude = degrees(asin(
sin(self.argument_of_periapsis + pi) * sin(self.inclination)
))
self.periapsis_latitude = degrees(asin(
def get_iss_location():
"""Compute the lat/lon directly under the ISS (RBAR) at a moment in time
"""
# Load the TLE from redis
tle = json.loads(redis.get("iss-tle").decode())
iss = ephem.readtle(str(tle[0]), str(tle[1]), str(tle[2]))
# Compute "now" with pyephem
now = datetime.datetime.utcnow()
iss.compute(now)
lon = degrees(iss.sublong)
lat = degrees(iss.sublat)
data = {
"message": "success",
"iss_position": {
"latitude": "%0.4f" % lat,
"longitude": "%0.4f" % lon,
},
"timestamp": timegm(now.timetuple()),
}
redis.set('iss-now', json.dumps(data))
def open_tle(tle_path, centre_datetime):
"""Function to open specified TLE file
"""
try:
fd = open(tle_path, 'r')
tle_text = fd.readlines()
logger.info('TLE file %s opened', tle_path)
log_multiline(logger.debug, tle_text, 'TLE FILE CONTENTS', '\t')
if self.TAG == 'LS5':
tle1, tle2 = tle_text[7:9]
elif self.TAG == 'LS7':
tle1, tle2 = tle_text[1:3]
sat_obj = ephem.readtle(self.NAME, tle1, tle2)
# Cache TLE filename for specified date
self._tle_path_dict[centre_datetime.date()] = tle_path
return sat_obj
finally:
fd.close()
def fromtle(url, body):
if url not in TLES:
f = urlopen(url)
TLES[url] = chunk(f.readlines(), 3)
for tle in TLES[url]:
if tle[0].strip().lower() == body.lower():
return ephem.readtle(*tle)
else:
raise KeyError('%s not found in %s' % (body, url))
# To force decoding the upper frequency uncomment this line
# sat_center_frequency = frequencies[1]
# PyEphem observer
obs = ephem.Observer()
obs.lat, obs.lon = '{}'.format(lat), '{}'.format(lon)
obs.elevation = alt # Technically is the altitude of observer
obs.date = datetime.utcfromtimestamp(timestamp)
# Normalize samples
samples /= np.median(np.abs(samples))
# Get the TLE information from the .mat file
sat_line0, sat_line1, sat_line2 = [str(xx) for xx in data['tles'][0]]
sat = ephem.readtle(sat_line0, sat_line1, sat_line2)
sat.compute(obs)
# Use the TLE info that was in the .mat file to calculate doppler shift
# of the satellite's transmission
relative_vel = sat.range_velocity
doppler = c/(c+relative_vel) * sat_center_frequency - sat_center_frequency
# Mix the samples to baseband (compensating for doppler shift)
# There will be a residual carrier error because of the RLTSDR frequency offset
freq_shift = center_freq - sat_center_frequency - doppler
mixed_down_samples, _ = complex_mix(samples,
freq_shift,
sample_rate)
filter_freq = 10e3
baud_rate = 4800.0
with open(TLE_offline_path, 'w') as f:
for line in tle_file:
f.write(line)
for i in range (0, len(tle_file)):
line = tle_file[i]
if line[:11] == satellite:
print('TLE data: \n')
print(tle_file[i])
print(tle_file[i + 1])
print(tle_file[i + 2])
line_1 = tle_file[i]
line_2 = tle_file[i + 1]
line_3 = tle_file[i + 2]
return ephem.readtle(line_1, line_2, line_3)
plt.plot(f, full_pxx)
plt.title("Periodogram of recording\nRed dots are orbcomm channels")
plt.xlabel("Frequency (Hz)")
plt.ylabel("Magnitude (dB)")
low_point = np.min(full_pxx)
for freq in frequencies:
plt.plot(freq/1e6, low_point, 'ro')
# Plot one of the channels as baseband
plt.subplot(222)
sat_center_frequency = frequencies[0]
# Use the TLE info that was in the .mat file to calculate doppler shift
# of the satellite's transmission
sat_line0, sat_line1, sat_line2 = [str(xx) for xx in data['tles'][0]]
sat = ephem.readtle(sat_line0, sat_line1, sat_line2)
sat.compute(obs)
relative_vel = sat.range_velocity
doppler = c/(c+relative_vel) * sat_center_frequency - sat_center_frequency
freq_shift = center_freq - sat_center_frequency - doppler
filter_freq = 10e3
mixed_down_samples = complex_mix(samples, freq_shift, sample_rate)
filtered_samples = butter_lowpass_filter(mixed_down_samples, filter_freq, sample_rate, order=5)
f, pxx = scisig.welch(filtered_samples, fs=sample_rate, nperseg=nperseg, scaling='density')
f = (np.roll(f, len(f)/2))
pxx = np.roll(pxx, len(pxx)/2)
pxx = 10*np.log10(pxx)
plt.plot(f, pxx)
plt.xlim([-10e3, 10e3])
plt.ylim([np.min(full_pxx)-1, np.max(pxx)+1])
plt.title("Spectrum of one channel at baseband")