# Extraction artifact (vendor banner, not part of the original module):
# "Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately."
# NOTE(review): fragment — begins mid-function (no enclosing `def` visible)
# and is truncated at the trailing `if`; original indentation was flattened.
# Parses lines of a daily solar-index file into a date list and parallel
# value lists keyed by column name.
dates = list()
# Expected data columns, in file order, following the three date columns.
val_keys = ['f107', 'ssn', 'ss_area', 'new_reg', 'smf', 'goes_bgd_flux',
'c_flare', 'm_flare', 'x_flare', 'o1_flare', 'o2_flare',
'o3_flare']
# Subsets of val_keys that are absent from some file vintages (see below).
optical_keys = ['o1_flare', 'o2_flare', 'o3_flare']
xray_keys = ['c_flare', 'm_flare', 'x_flare']
values = {kk: list() for kk in val_keys}
# Cycle through each line in this file
for line in data_lines:
# Split the line on whitespace
split_line = line.split()
# Format the date; files after 1996 use "YYYY MM DD", older ones "DD Mon YY"
dfmt = "%Y %m %d" if year > 1996 else "%d %b %y"
dates.append(pysat.datetime.strptime(" ".join(split_line[0:3]), dfmt))
# Format the data values
# j tracks the position within split_line; it only advances when a value
# is actually consumed from the line (missing columns get fill values).
j = 0
for i, kk in enumerate(val_keys):
if year == 1994 and kk == 'new_reg':
# New regions only in files after 1994
val = -999
elif((year == 1994 and kk in xray_keys) or
(not optical and kk in optical_keys)):
# X-ray flares in files after 1994, optical flares come later
val = -1
else:
# Value present in the file: offset by 3 to skip the date columns.
val = split_line[j + 3]
j += 1
# NOTE(review): the body of this condition lies beyond the visible span.
if kk != 'goes_bgd_flux':
# NOTE(review): fragment — val1/val2/val3, times, data_path, and date are
# defined outside the visible span. Builds a one-column F10.7 forecast
# DataFrame and writes it to a date-stamped CSV file.
data = pds.DataFrame([val1, val2, val3], index=times, columns=['f107'])
# write out as a file named f107_forecast_YYYY-MM-DD.txt
data.to_csv(os.path.join(data_path, 'f107_forecast_' +
date.strftime('%Y-%m-%d') + '.txt'),
header=True)
# NOTE(review): fragment of a download routine — the matching `if` for this
# `elif` and the surrounding function are outside the visible span.
elif tag == '45day':
import requests
print('This routine can only download the current forecast, not ' +
'archived forecasts')
# download webpage
furl = 'https://services.swpc.noaa.gov/text/45-day-ap-forecast.txt'
r = requests.get(furl)
# parse text to get the date the prediction was generated, e.g.
# ":Issued: 2019 Jan 01 0000 UTC"
date_str = r.text.split(':Issued: ')[-1].split(' UTC')[0]
date = pysat.datetime.strptime(date_str, '%Y %b %d %H%M')
# get to the forecast data
raw_data = r.text.split('45-DAY AP FORECAST')[-1]
# grab AP part (everything before the F10.7 section header)
raw_ap = raw_data.split('45-DAY F10.7 CM FLUX FORECAST')[0]
# clean up: drop the header line and the trailing line
raw_ap = raw_ap.split('\n')[1:-1]
# f107 section follows the same header
raw_f107 = raw_data.split('45-DAY F10.7 CM FLUX FORECAST')[-1]
# clean up: drop the header line and the last four footer lines
raw_f107 = raw_f107.split('\n')[1:-4]
# parse the AP data
ap_times, ap = parse_45day_block(raw_ap)
# parse the F10.7 data
f107_times, f107 = parse_45day_block(raw_f107)
lsplit[isd], lsplit[ist],
lsplit[iex]])
if fline.find("Selected parameters:") >= 0:
pflag = True
if fline.count("=") == line_len - 1 and line_len > 2:
hflag = False
else:
# Load the desired data
lsplit = [ll for ll in re.split(r'[\t\n]+', fline)
if len(ll) > 0]
if dflag:
dflag = False # Unset the date flag
dstring = " ".join(lsplit[:6])
dtime = pysat.datetime.strptime(dstring,
"%Y %m %d %H %M %S")
snum = int(lsplit[6]) # Set the number of stations
# Load the times
if tag == "indices":
date_list.append(dtime)
else:
date_list.extend([dtime for i in range(snum)])
elif len(lsplit) == ndata['indices']:
if tag is not '':
if lsplit[0] not in ddict.keys():
ddict[lsplit[0]] = list()
if tag == "indices":
ddict[lsplit[0]].append(int(lsplit[1]))
else:
# NOTE(review): fragment of a download routine — `date`, `data_path`, and
# the branch structure around the trailing `elif` lie outside the visible
# span. Fetches one month of NOAA radio-flux (F10.7) data from LASP LISIRD
# as JSON and writes it to a monthly CSV file.
# Build the LISIRD query URL; %3E / %3C are URL-encoded '>' and '<', so the
# query selects date <= time < (date + 1 month).
dstr = 'http://lasp.colorado.edu/lisird/latis/'
dstr += 'noaa_radio_flux.json?time%3E='
dstr += date.strftime('%Y-%m-%d')
dstr += 'T00:00:00.000Z&time%3C='
dstr += (date + pds.DateOffset(months=1) -
pds.DateOffset(days=1)).strftime('%Y-%m-%d')
dstr += 'T00:00:00.000Z'
# data returned as json
r = requests.get(dstr)
# process
raw_dict = json.loads(r.text)['noaa_radio_flux']
data = pds.DataFrame.from_dict(raw_dict['samples'])
if data.empty:
warnings.warn("no data for {:}".format(date), UserWarning)
else:
# Service returns times as "YYYY MM DD" strings — parse to datetimes.
times = [pysat.datetime.strptime(time, '%Y %m %d')
for time in data.pop('time')]
data.index = times
# replace fill with NaNs (-99999.0 is the service's fill value)
idx, = np.where(data['f107'] == -99999.0)
data.iloc[idx, :] = np.nan
# create file named f107_monthly_YYYY-MM.txt
data.to_csv(os.path.join(data_path, 'f107_monthly_' +
date.strftime('%Y-%m') + '.txt'),
header=True)
elif tag == 'all':
# download from LASP, by year
import requests
import json
# download webpage
# NOTE(review): fragment of a Dst load routine — `line`, `idx`, the yr/mo/
# day/ut/dst arrays, `filename`, and `data` are defined outside the visible
# span. Each file line holds 24 hourly Dst values; fixed-column slices
# extract the date and the 4-character hourly values.
day[idx:idx + 24] = int(line[8:10])
ut[idx:idx + 24] = np.arange(24)
# Columns 20 to -4 hold the 24 hourly values, 4 characters each.
temp = line.strip()[20:-4]
temp2 = [temp[4 * i:4 * (i + 1)] for i in np.arange(24)]
dst[idx:idx + 24] = temp2
idx += 24
# f.close()
# NOTE(review): pds.datetime was deprecated and removed in modern pandas;
# confirm the pinned pandas version before upgrading.
start = pds.datetime(yr[0], mo[0], day[0], ut[0])
stop = pds.datetime(yr[-1], mo[-1], day[-1], ut[-1])
dates = pds.date_range(start, stop, freq='H')
new_data = pds.DataFrame(dst, index=dates, columns=['dst'])
# pull out specific day: filenames end in 'YYYY-MM-DD'
new_date = pysat.datetime.strptime(filename[-10:], '%Y-%m-%d')
idx, = np.where((new_data.index >= new_date) &
(new_data.index < new_date+pds.DateOffset(days=1)))
new_data = new_data.iloc[idx, :]
# add specific day to all data loaded for filenames
data = pds.concat([data, new_data], sort=True, axis=0)
return data, pysat.Meta()
# NOTE(review): fragment of a Kp load routine — `tag`, `fnames`, and
# `parse_date` are defined outside the visible span, and the fragment is
# truncated after the Series creation.
meta = pysat.Meta()
if tag == '':
# Kp data stored monthly, need to return data daily
# the daily date is attached to filename
# parse off the last date, load month of data, downselect to desired
# day
data = pds.DataFrame()
# set up fixed width format for these files: three 2-char date columns,
# eight 3-char Kp columns, three sum columns, and a remainder column.
colspec = [(0, 2), (2, 4), (4, 6), (7, 10), (10, 13), (13, 16),
(16, 19), (19, 23), (23, 26), (26, 29), (29, 32), (32, 50)]
for filename in fnames:
# the daily date is attached to filename
# parse off the last date, load month of data, downselect to the
# desired day
fname = filename[0:-11]
date = pysat.datetime.strptime(filename[-10:], '%Y-%m-%d')
temp = pds.read_fwf(fname, colspecs=colspec, skipfooter=4,
header=None, parse_dates=[[0, 1, 2]],
date_parser=parse_date, index_col='0_1_2')
# keep only rows falling on the requested day
idx, = np.where((temp.index >= date) &
(temp.index < date + pds.DateOffset(days=1)))
temp = temp.iloc[idx, :]
data = pds.concat([data, temp], axis=0)
# drop last column as it has data I don't care about
data = data.iloc[:, 0:-1]
# each column increments UT by three hours
# produce a single data series that has Kp value monotonically
# increasing in time with appropriate datetime indices
# NOTE(review): pds.Series() with no data/dtype warns on modern pandas;
# confirm the pinned pandas version.
s = pds.Series()
def parse_smag_date(dd):
    """Convert a SuperMAG date string to a datetime object.

    Parameters
    ----------
    dd : str
        Timestamp formatted as 'YYYY-MM-DD HH:MM:SS'.

    Returns
    -------
    datetime
        Parsed timestamp.
    """
    smag_fmt = "%Y-%m-%d %H:%M:%S"
    return pysat.datetime.strptime(dd, smag_fmt)
# NOTE(review): fragment of a load routine — `fnames`, `tag` flags such as
# `fake_daily_files_from_monthly`, and `flatten_twod` are parameters of an
# enclosing `def` that lies outside the visible span.
import pysatCDF
if len(fnames) <= 0:
# No files requested: return an empty frame and no metadata.
return pysat.DataFrame(None), None
else:
# going to use pysatCDF to load the CDF and format
# data and metadata for pysat using some assumptions.
# Depending upon your needs the resulting pandas DataFrame may
# need modification
# currently only loads one file, which handles more situations via
# pysat than you may initially think
if fake_daily_files_from_monthly:
# parse out date from filename (last 10 chars are 'YYYY-MM-DD')
fname = fnames[0][0:-11]
date = pysat.datetime.strptime(fnames[0][-10:], '%Y-%m-%d')
with pysatCDF.CDF(fname) as cdf:
# convert data to pysat format
data, meta = cdf.to_pysat(flatten_twod=flatten_twod)
# select one day of data from the monthly file; the microsecond
# offset keeps the next day's first sample out of the slice.
data = data.loc[date:date+pds.DateOffset(days=1)
- pds.DateOffset(microseconds=1), :]
return data, meta
else:
# basic data return
with pysatCDF.CDF(fnames[0]) as cdf:
return cdf.to_pysat(flatten_twod=flatten_twod)