"""
import pandas as pds
import numpy as np
import pysat

# pysat required parameters
platform = 'pysat'
name = 'testing'
# dictionary of data 'tags' and corresponding description
tags = {'': 'Regular testing data set'}
# dictionary of satellite IDs, list of corresponding tags
sat_ids = {'': ['']}
test_dates = {'': {'': pysat.datetime(2009, 1, 1)}}

meta = pysat.Meta()
meta['uts'] = {'units': 's', 'long_name': 'Universal Time', 'custom': False}
meta['mlt'] = {'units': 'hours', 'long_name': 'Magnetic Local Time'}
meta['slt'] = {'units': 'hours', 'long_name': 'Solar Local Time'}


def init(self):
    self.new_thing = True


def load(fnames, tag=None, sat_id=None, sim_multi_file_right=False,
         sim_multi_file_left=False, root_date=None):
    # create an artificial satellite data set
    parts = fnames[0].split('/')
    yr = int('20' + parts[-1][0:2])
    month = int(parts[-3])
    day = int(parts[-2])
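
# A minimal usage sketch (not part of the module above), assuming the
# pysat 2.x API, where this test instrument ships with the package and
# Instrument.load takes a year and day of year.
import pysat

test_inst = pysat.Instrument(platform='pysat', name='testing')
test_inst.load(2009, 1)          # load the listed test date, 1 Jan 2009
print(test_inst['mlt'].head())   # magnetic local time, per the meta above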
"""
import pandas as pds
import numpy as np
import pysat
# pysat required parameters
platform = 'pysat'
name = 'testing'
# dictionary of data 'tags' and corresponding description
tags = {'': 'Regular testing data set'}
# dictionary of satellite IDs, list of corresponding tags
sat_ids = {'': ['']}
test_dates = {'': {'': pysat.datetime(2009, 1, 1)}}
meta = pysat.Meta()
meta['uts'] = {'units': 's', 'long_name': 'Universal Time', 'custom': False}
meta['mlt'] = {'units': 'hours', 'long_name': 'Magnetic Local Time'}
meta['slt'] = {'units': 'hours', 'long_name': 'Solar Local Time'}
def init(self):
self.new_thing = True
def load(fnames, tag=None, sat_id=None, sim_multi_file_right=False,
sim_multi_file_left=False, root_date=None):
# create an artifical satellite data set
parts = fnames[0].split('/')
yr = int('20' + parts[-1][0:2])
month = int(parts[-3])
day = int(parts[-2])
import pysat

# pysat required parameters
platform = 'pysat'
name = 'testadd1'
# dictionary of data 'tags' and corresponding description
tags = {'': 'Ascending Integers from 0 testing data set',
        'negative': 'Descending Integers from 0 testing data set',
        'plus10': 'Ascending Integers from 10 testing data set',
        'five': 'All 5s testing data set'}
# dictionary of satellite IDs, list of corresponding tags
sat_ids = {'': ['', 'negative', 'plus10', 'five']}
test_dates = {'': {'': pysat.datetime(2009, 1, 1),
                   'negative': pysat.datetime(2009, 1, 1),
                   'plus10': pysat.datetime(2009, 1, 1),
                   'five': pysat.datetime(2009, 1, 1)}}

meta = pysat.Meta()
meta['uts'] = {'units': 's', 'long_name': 'Universal Time', 'custom': False}
meta['mlt'] = {'units': 'hours', 'long_name': 'Magnetic Local Time'}
meta['slt'] = {'units': 'hours', 'long_name': 'Solar Local Time'}


def init(self):
    self.new_thing = True


def load(fnames, tag=None, sat_id=None, sim_multi_file_right=False,
         sim_multi_file_left=False, root_date=None):
""" Loads the test files
Parameters
----------
fnames : (list)
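
# A minimal usage sketch: pysat 2.x can take a support module directly via
# the inst_module keyword. The module name pysat_testadd1 is hypothetical,
# standing in for a file containing the code above.
import pysat
import pysat_testadd1  # hypothetical import of the module defined above

add_inst = pysat.Instrument(inst_module=pysat_testadd1, tag='plus10')
add_inst.load(2009, 1)  # integers ascending from 10 for 1 Jan 2009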
    # f.close()
    start = pds.datetime(yr[0], mo[0], day[0], ut[0])
    stop = pds.datetime(yr[-1], mo[-1], day[-1], ut[-1])
    dates = pds.date_range(start, stop, freq='H')
    new_data = pds.DataFrame(dst, index=dates, columns=['dst'])
    # pull out specific day
    new_date = pysat.datetime.strptime(filename[-10:], '%Y-%m-%d')
    idx, = np.where((new_data.index >= new_date) &
                    (new_data.index < new_date + pds.DateOffset(days=1)))
    new_data = new_data.iloc[idx, :]
    # add specific day to all data loaded for filenames
    data = pds.concat([data, new_data], sort=True, axis=0)
    return data, pysat.Meta()
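
# A self-contained sketch of the day-slicing idiom used above: keep only
# the rows of a time-indexed DataFrame that fall on a single UT day. All
# names here are illustrative and not taken from the snippet above.
import numpy as np
import pandas as pds

frame = pds.DataFrame({'dst': range(48)},
                      index=pds.date_range('2009-01-01', periods=48,
                                           freq='H'))
day = pds.Timestamp('2009-01-01')
idx, = np.where((frame.index >= day) &
                (frame.index < day + pds.DateOffset(days=1)))
one_day = frame.iloc[idx, :]  # the 24 hourly rows for 1 Jan 2009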
if num != 0:
    # call separate load_files routine, segmented for possible
    # multiprocessor load, not included and only benefits about 20%
    output = pysat.DataFrame(load_files(cosmicFiles, tag=tag,
                                        sat_id=sat_id,
                                        altitude_bin=altitude_bin))
    utsec = output.hour * 3600. + output.minute * 60. + output.second
    output.index = \
        pysat.utils.time.create_datetime_index(year=output.year,
                                               month=output.month,
                                               day=output.day,
                                               uts=utsec)
    # make sure UTS strictly increasing
    output.sort_index(inplace=True)
    # use the first available file to pick out meta information
    profile_meta = pysat.Meta()
    meta = pysat.Meta()
    ind = 0
    repeat = True
    while repeat:
        try:
            data = netcdf_file(cosmicFiles[ind], mode='r', mmap=False)
            keys = data.variables.keys()
            for key in keys:
                profile_meta[key] = {'units': data.variables[key].units,
                                     'long_name':
                                     data.variables[key].long_name}
            # ncattrsList = data.ncattrs()
            ncattrsList = data._attributes.keys()
            for d in ncattrsList:
                meta[d] = {'units': '', 'long_name': d}
            repeat = False
        except RuntimeError:
            # file was empty, try the next one by incrementing ind
            ind += 1
    meta['profiles'] = profile_meta
    return output, meta
else:
    # no data
    return pysat.DataFrame(None), pysat.Meta()

                             '(ram direction) of',
                             'satellite']),
    'iv_xOy_Ox_angle': ' '.join(['Angle between',
                                 'projection of the ion',
                                 'velocity on the x-y',
                                 'plane and satellite',
                                 'x axis']),
    'satellite_potential': 'Satellite potential'}}
if name not in long_inst.keys():
    print('Warning, no long-form names available for {:s}'.format(name))
    long_inst[name] = {nn: nn for nn in meta_dict['data names']}
# Initialise the meta data
meta = pysat.Meta()
for cc in meta_dict['data names']:
    # Determine the long instrument name
    if cc in long_inst[name].keys():
        ll = long_inst[name][cc]
    else:
        ll = long_name[cc]
    # Assign the data units, long names, acknowledgements, and references
    meta[cc] = {'units': meta_dict['data units'][cc], 'long_name': ll}
# Set the remaining metadata
meta_dict['acknowledgements'] = ackn[name]
meta_dict['reference'] = refs[name]
mkeys = list(meta_dict.keys())
mkeys.pop(mkeys.index('data names'))
mkeys.pop(mkeys.index('data units'))
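
# A minimal sketch of the pattern above: build a pysat.Meta from parallel
# name/unit collections. The meta_dict contents and variable names are
# illustrative stand-ins, not taken from a specific instrument.
import pysat

meta_dict = {'data names': ['ni', 'ti'],
             'data units': {'ni': 'cm^-3', 'ti': 'K'}}
meta = pysat.Meta()
for cc in meta_dict['data names']:
    meta[cc] = {'units': meta_dict['data units'][cc], 'long_name': cc}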
file_format = file_format.upper()
saved_mdata = None
running_idx = 0
running_store = []
two_d_keys = []
two_d_dims = []
three_d_keys = []
three_d_dims = []
for fname in fnames:
    with netCDF4.Dataset(fname, mode='r', format=file_format) as data:
        # build up dictionary with all global ncattrs
        # and add those attributes to a pysat meta object
        ncattrsList = data.ncattrs()
        mdata = pysat.Meta(units_label=units_label, name_label=name_label,
                           notes_label=notes_label, desc_label=desc_label,
                           plot_label=plot_label, axis_label=axis_label,
                           scale_label=scale_label,
                           min_label=min_label, max_label=max_label,
                           fill_label=fill_label)
        for d in ncattrsList:
            if hasattr(mdata, d):
                mdata.__setattr__(d + '_', data.getncattr(d))
            else:
                mdata.__setattr__(d, data.getncattr(d))
        # load up all of the variables in the netCDF
        loadedVars = {}
        for key in data.variables.keys():
            # load up metadata. From here group unique
            # dimensions and act accordingly, 1D, 2D, 3D
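
# Hedged usage note: the loop above resembles the body of pysat 2.x's
# bundled netCDF loader; a caller would normally go through
# pysat.utils.load_netcdf4 rather than reimplementing it. The filename
# below is a placeholder.
import pysat

data, mdata = pysat.utils.load_netcdf4('sample.nc')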
    data = pds.read_csv(fnames[0][0:-11], index_col=0, parse_dates=True)
    idx, = np.where((data.index >= date) &
                    (data.index < date + pds.DateOffset(days=1)))
    result = data.iloc[idx, :]
elif tag == 'all':
    result = pds.read_csv(fnames[0], index_col=0, parse_dates=True)
elif tag == 'daily' or tag == 'prelim':
    result = pds.read_csv(fnames[0], index_col=0, parse_dates=True)
elif tag == 'forecast':
    # load forecast data
    result = pds.read_csv(fnames[0], index_col=0, parse_dates=True)
elif tag == '45day':
    # load 45-day forecast data
    result = pds.read_csv(fnames[0], index_col=0, parse_dates=True)

meta = pysat.Meta()
meta['f107'] = {meta.units_label: 'SFU',
                meta.name_label: 'F10.7 cm solar index',
                meta.desc_label:
                'F10.7 cm radio flux in Solar Flux Units (SFU)'}

if tag == '45day':
    meta['ap'] = {meta.name_label: 'Daily Ap index',
                  meta.desc_label: 'Daily average of 3-h ap indices'}
elif tag == 'daily' or tag == 'prelim':
    meta['ssn'] = {meta.name_label: 'Sunspot Number',
                   meta.desc_label: 'SESC Sunspot Number',
                   meta.fill_label: -999}
    meta['ss_area'] = {meta.name_label: 'Sunspot Area',
                       meta.desc_label:
                       'Sunspot Area in 10$^{-6}$ Hemisphere',
                       meta.fill_label: -999}
    meta['new_reg'] = {meta.name_label: 'New Regions',