(('DS1', 'x'), ('DS3', 'x')): {
'n_obs': np.array([1000], dtype=np.int32),
'tau': np.array([np.nan], dtype=np.float32),
'gpi': np.array([4], dtype=np.int32),
'RMSD': np.array([0.], dtype=np.float32),
'lon': np.array([4.]),
'p_tau': np.array([np.nan], dtype=np.float32),
'BIAS': np.array([0.], dtype=np.float32),
'p_rho': np.array([0.], dtype=np.float32),
'rho': np.array([1.], dtype=np.float32),
'lat': np.array([4.]),
'R': np.array([1.], dtype=np.float32),
'p_R': np.array([0.], dtype=np.float32)}}
tempdir = tempfile.mkdtemp()
netcdf_results_manager(tst_results, tempdir)
assert sorted(os.listdir(tempdir)) == sorted(['DS1.x_with_DS3.x.nc',
'DS1.x_with_DS3.y.nc',
'DS1.x_with_DS2.y.nc'])
# check a few variables in the file
with netCDF4.Dataset(os.path.join(tempdir, 'DS1.x_with_DS3.x.nc')) as ds:
assert ds.variables['lon'][:] == np.array([4])
assert ds.variables['n_obs'][:] == np.array([1000])
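# Sketch of the naming convention: each key tuple of the results dictionary is
# joined into '<dataset>.<column>_with_<dataset>.<column>.nc'. The helper below
# is hypothetical and for illustration only, not part of pytesmo's API.
def result_fname(key):
    return '_with_'.join('.'.join(ds_col) for ds_col in key) + '.nc'

assert result_fname((('DS1', 'x'), ('DS3', 'x'))) == 'DS1.x_with_DS3.x.nc'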
read_ts_names = {'ASCAT': 'read', 'ISMN': 'read_ts'}
period = [datetime(2007, 1, 1), datetime(2014, 12, 31)]
datasets = DataManager(datasets, 'ISMN', period, read_ts_names=read_ts_names)
process = Validation(
datasets, 'ISMN',
temporal_ref='ASCAT',
scaling='lin_cdf_match',
scaling_ref='ASCAT',
metrics_calculators={
(2, 2): metrics_calculators.BasicMetrics(other_name='k1', metadata_template=metadata_dict_template).calc_metrics},
period=period)
for job in jobs:
results = process.calc(*job)
netcdf_results_manager(results, save_path)
results_fname = os.path.join(
save_path, 'ASCAT.sm_with_ISMN.soil moisture.nc')
vars_should = [u'n_obs', u'tau', u'gpi', u'RMSD', u'lon', u'p_tau',
u'BIAS', u'p_rho', u'rho', u'lat', u'R', u'p_R', u'time',
u'idx', u'_row_size']
for key, value in metadata_dict_template.items():
vars_should.append(key)
n_obs_should = [384, 357, 482, 141, 251, 1927, 1887, 1652]
rho_should = np.array([0.70022893, 0.53934574,
0.69356072, 0.84189808,
0.74206454, 0.30299741,
0.53143877, 0.62204134], dtype=np.float32)
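# A minimal sketch of how the expected values above could be checked against
# the written file, assuming the layout produced by netcdf_results_manager
# as shown earlier:
with netCDF4.Dataset(results_fname) as ds:
    assert sorted(ds.variables.keys()) == sorted(vars_should)
    np.testing.assert_allclose(sorted(ds.variables['n_obs'][:]),
                               sorted(n_obs_should))
    np.testing.assert_allclose(sorted(ds.variables['rho'][:]),
                               sorted(rho_should), rtol=1e-6)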
(('DS1', 'x'), ('DS2', 'y'), ('DS3', 'y')): {
'n_obs': np.array([1000], dtype=np.int32),
'tau': np.array([np.nan], dtype=np.float32),
'gpi': np.array([4], dtype=np.int32),
'RMSD': np.array([0.], dtype=np.float32),
'lon': np.array([4.]),
'p_tau': np.array([np.nan], dtype=np.float32),
'BIAS': np.array([0.], dtype=np.float32),
'p_rho': np.array([0.], dtype=np.float32),
'rho': np.array([1.], dtype=np.float32),
'lat': np.array([4.]),
'R': np.array([1.], dtype=np.float32),
'p_R': np.array([0.], dtype=np.float32)}}
tempdir = tempfile.mkdtemp()
netcdf_results_manager(tst_results, tempdir)
assert sorted(os.listdir(tempdir)) == sorted(['DS1.x_with_DS2.y_with_DS3.x.nc',
'DS1.x_with_DS2.y_with_DS3.y.nc'])
# check a few variables in the file
with netCDF4.Dataset(os.path.join(tempdir, 'DS1.x_with_DS2.y_with_DS3.x.nc')) as ds:
assert ds.variables['lon'][:] == np.array([4])
assert ds.variables['n_obs'][:] == np.array([1000])
datasets = DataManager(datasets, 'ISMN', period, read_ts_names=read_ts_names)
process = Validation(
datasets, 'ISMN',
temporal_ref='ASCAT',
scaling='lin_cdf_match',
scaling_ref='ASCAT',
metrics_calculators={
(2, 2): metrics_calculators.RollingMetrics(other_name='k1',
metadata_template=metadata_dict_template).calc_metrics},
period=period)
for job in jobs:
results = process.calc(*job)
netcdf_results_manager(results, save_path, ts_vars=[
'R', 'p_R', 'RMSD'])
results_fname = os.path.join(
save_path, 'ASCAT.sm_with_ISMN.soil moisture.nc')
vars_should = [u'gpi', u'lon', u'lat', u'R', u'p_R', u'time',
u'idx', u'_row_size']
for key, value in metadata_dict_template.items():
vars_should.append(key)
network_should = np.array(['MAQU', 'MAQU', 'SCAN', 'SCAN', 'SCAN',
'SOILSCAPE', 'SOILSCAPE', 'SOILSCAPE'], dtype='U256')
reader = PointDataResults(results_fname, read_only=True)
df = reader.read_loc(None)
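# A minimal sanity check on the metadata read back from the file; this assumes
# the metadata template used above contains a 'network' field, as the expected
# values suggest.
assert np.all(np.sort(df['network'].values) == np.sort(network_should))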
period = [datetime(2007, 1, 1), datetime(2014, 12, 31)]
datasets = DataManager(datasets, 'ISMN', period, read_ts_names=read_ts_names)
process = Validation(
datasets, 'ISMN',
temporal_ref='ASCAT',
scaling='lin_cdf_match',
scaling_ref='ASCAT',
metrics_calculators={
(2, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics},
period=period)
for job in jobs:
results = process.calc(*job)
netcdf_results_manager(results, save_path)
results_fname = os.path.join(
save_path, 'ASCAT.sm_with_ISMN.soil moisture.nc')
vars_should = [u'n_obs', u'tau', u'gpi', u'RMSD', u'lon', u'p_tau',
u'BIAS', u'p_rho', u'rho', u'lat', u'R', u'p_R', u'time',
u'idx', u'_row_size']
n_obs_should = [384, 357, 482, 141, 251, 1927, 1887, 1652]
rho_should = np.array([0.70022893, 0.53934574,
0.69356072, 0.84189808,
0.74206454, 0.30299741,
0.53143877, 0.62204134], dtype=np.float32)
rmsd_should = np.array([7.72966719, 11.58347607,
                        14.57700157, 13.06224251,
                        12.90389824, 14.24668026],
                       dtype=np.float32)  # remaining expected values are cut off in this excerpt
results = {(('a1', 'x1'), ('b1', 'y1')): {
    # 'time' entries reconstructed to match the per-location 'tvar' lengths;
    # only the first date range is checked by the asserts below
    'time': np.array([pd.date_range('2000-01-01', '2000-01-03', freq='D'),
                      pd.date_range('2000-05-01', '2000-05-05', freq='D')]),
    'tvar': np.array([np.array([1, 2, 3]),
                      np.array([1, 2, 3, 4, 5])]),
    'lvar': np.array([99, 100]),
    'lon': np.array([1., 2.]),
    'lat': np.array([1., 2.])},
(('a2', 'x2'), ('b2', 'y2')):
{'time': np.array([
pd.date_range('2003-01-01', '2003-01-02', freq='D')]),
'tvar': np.array([
np.array([1, 2])]),
'lvar': np.array([99]),
'lon': np.array([1.]),
'lat': np.array([1.])}}
tempdir = tempfile.mkdtemp()
netcdf_results_manager(results=results, save_path=tempdir, ts_vars=['tvar'],
                       attr={'tvar': {'long_name': 'Time var'},
                             'lvar': {'long_name': 'Loc var'}})
ds = PointDataResults(os.path.join(tempdir, 'a1.x1_with_b1.y1.nc'), read_only=True)
ts = ds.read_ts(0)
assert ts.loc['2000-01-02', 'tvar'] == 2
df = ds.read_loc(None)
assert np.all(df.loc[0, :] == ds.read_loc(0))
assert df.loc[1, 'lvar'] == 100
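# Sketch: the per-location time series written via ts_vars can be read back
# one location at a time.
for loc_id in df.index:
    print(loc_id, ds.read_ts(loc_id)['tvar'].values)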
jobs = None
try:
    jobs = dv['jobs'][0]
except parallel.CompositeError:
    print("Variable 'jobs' is not defined!")
save_path = None
try:
save_path = dv['save_path'][0]
except parallel.CompositeError:
print("Variable 'save_path' is not defined!")
if (jobs is not None) and (save_path is not None):
    to_write = len(jobs)
with lview.temp_flags(retries=2):
amr = lview.map_async(func, jobs)
results = zip(amr, jobs)
for result, job in results:
netcdf_results_manager(result, save_path)
to_write -= 1
print('job = ' + str(job), 'remaining jobs = ' + str(to_write))
c[:].clear()
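# The names used above (c, dv, lview) are not defined in this excerpt; a
# minimal setup sketch, assuming a running ipyparallel cluster, consistent
# with the parallel.CompositeError handling above:
import ipyparallel as parallel

c = parallel.Client()            # connect to the running cluster
dv = c[:]                        # direct view, used to look up 'jobs' and 'save_path'
lview = c.load_balanced_view()   # load-balanced view providing map_async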
# { (3, 2): metric_calc,
#   (3, 3): triple_collocation}
# ```
#
# Create the variable ***save_path***, which is a string representing the path where the results will be saved.
# **DO NOT CHANGE** the name ***save_path***, because it will be looked up during the parallel processing!
# In[9]:
save_path = output_folder
import pprint
for job in jobs:
results = process.calc(*job)
pprint.pprint(results)
netcdf_results_manager(results, save_path)
# The validation is then performed by looping over all the defined jobs and storing the results.
# You can see that the results are a dictionary where the key is a tuple defining the exact combination of datasets
# and columns that were used for the calculation of the metrics. The metrics themselves are a dictionary of
# `metric-name: numpy.ndarray`, which also includes information about the gpi, lon and lat. Since all the
# information contained in the job is passed to the metric calculator, it can be stored in the results.
#
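# For illustration only, a results dictionary for a single point could look like
# this (the key and all values here are made up, not output of the run above):
#
#     {(('ASCAT', 'sm'), ('ISMN', 'soil moisture')):
#         {'gpi': np.array([42], dtype=np.int32),
#          'lon': np.array([16.37]), 'lat': np.array([48.21]),
#          'n_obs': np.array([1000], dtype=np.int32),
#          'rho': np.array([0.7], dtype=np.float32)}}
#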
# Storing the results to disk is currently supported by the `netcdf_results_manager`, which creates a netCDF
# file for each dataset combination and stores each metric as a variable. We can inspect the stored netCDF file,
# which is named after the dictionary key:
# In[10]:
import netCDF4
# In[10]:
#save_path = tempfile.mkdtemp()
save_path = output_folder
# In[22]:
import pprint
for job in jobs:
results = process.calc(*job)
pprint.pprint(results)
netcdf_results_manager(results, save_path)
# In[23]:
import netCDF4
results_fname = os.path.join(save_path, 'ASCAT.sm_with_ISMN.soil moisture.nc')
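# Sketch: the stored file can then be inspected the same way as above.
with netCDF4.Dataset(results_fname) as ds:
    print(list(ds.variables.keys()))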