Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'class': ds2,
'columns': ['sm'],
'args': [],
'kwargs': {},
'grids_compatible': True
},
'DS3': {
'class': ds3,
'columns': ['sm', 'sm2'],
'args': [],
'kwargs': {},
'grids_compatible': True
}
}
dm = DataManager(datasets, 'DS1')
return dm
'ISMN': {
'class': ismn_reader,
'columns': ['soil moisture'],
},
'ASCAT': {
'class': ascat_reader,
'columns': ['sm'],
'kwargs': {'mask_frozen_prob': 80,
'mask_snow_prob': 80,
'mask_ssf': True},
}}
read_ts_names = {'ASCAT': 'read', 'ISMN': 'read_ts'}
period = [datetime(2007, 1, 1), datetime(2014, 12, 31)]
datasets = DataManager(datasets, 'ISMN', period, read_ts_names=read_ts_names)
process = Validation(
datasets, 'ISMN',
temporal_ref='ASCAT',
scaling='lin_cdf_match',
scaling_ref='ASCAT',
metrics_calculators={
(2, 2): metrics_calculators.BasicMetrics(other_name='k1', metadata_template=metadata_dict_template).calc_metrics},
period=period)
for job in jobs:
results = process.calc(*job)
netcdf_results_manager(results, save_path)
results_fname = os.path.join(
save_path, 'ASCAT.sm_with_ISMN.soil moisture.nc')
(('DS1', 'x'), ('DS3', 'x')): {
'n_obs': np.array([1000], dtype=np.int32),
'tau': np.array([np.nan], dtype=np.float32),
'gpi': np.array([4], dtype=np.int32),
'RMSD': np.array([0.], dtype=np.float32),
'lon': np.array([4.]),
'p_tau': np.array([np.nan], dtype=np.float32),
'BIAS': np.array([0.], dtype=np.float32),
'p_rho': np.array([0.], dtype=np.float32),
'rho': np.array([1.], dtype=np.float32),
'lat': np.array([4.]),
'R': np.array([1.], dtype=np.float32),
'p_R': np.array([0.], dtype=np.float32)}}
datasets = setup_TestDatasets()
dm = DataManager(datasets, 'DS1', read_ts_names={d: 'read' for d in ['DS1', 'DS2', 'DS3']})
process = Validation(
dm, 'DS1',
temporal_matcher=temporal_matchers.BasicTemporalMatching(
window=1 / 24.0).combinatory_matcher,
scaling='lin_cdf_match',
metrics_calculators={
(3, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics})
jobs = process.get_processing_jobs()
for job in jobs:
results = process.calc(*job)
assert sorted(list(results)) == sorted(list(tst_results))
'ISMN': {
'class': ismn_reader,
'columns': ['soil moisture']
},
'ASCAT': {
'class': ascat_reader,
'columns': ['sm'],
'kwargs': {'mask_frozen_prob': 80,
'mask_snow_prob': 80,
'mask_ssf': True}
}}
read_ts_names = {'ASCAT': 'read', 'ISMN': 'read_ts'}
period = [datetime(2007, 1, 1), datetime(2014, 12, 31)]
datasets = DataManager(datasets, 'ISMN', period, read_ts_names=read_ts_names)
process = Validation(
datasets, 'ISMN',
temporal_ref='ASCAT',
scaling='lin_cdf_match',
scaling_ref='ASCAT',
metrics_calculators={
(2, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics},
period=period)
for job in jobs:
results = process.calc(*job)
netcdf_results_manager(results, save_path)
results_fname = os.path.join(
save_path, 'ASCAT.sm_with_ISMN.soil moisture.nc')
def test_validation_n2_k2_temporal_matching_no_matches():
    """Two datasets with no temporal overlap must produce an empty result set.

    Uses the (2, 2) BasicMetrics calculator; since the inputs never match in
    time, every job is expected to yield no metrics at all.
    """
    expected = {}

    data_sources = setup_two_without_overlap()
    manager = DataManager(
        data_sources, 'DS1',
        read_ts_names={name: 'read' for name in ('DS1', 'DS2', 'DS3')})

    validator = Validation(
        manager, 'DS1',
        temporal_matcher=temporal_matchers.BasicTemporalMatching(
            window=1 / 24.0).combinatory_matcher,
        scaling='lin_cdf_match',
        metrics_calculators={
            (2, 2): metrics_calculators.BasicMetrics(
                other_name='k1').calc_metrics})

    for job in validator.get_processing_jobs():
        job_results = validator.calc(*job)
        # No overlap -> nothing matched -> result keys equal the empty set.
        assert sorted(list(job_results)) == sorted(list(expected))
'ISMN': {
'class': ismn_reader,
'columns': ['soil moisture']
},
'ASCAT': {
'class': ascat_reader,
'columns': ['sm'],
'kwargs': {'mask_frozen_prob': 80,
'mask_snow_prob': 80,
'mask_ssf': True}
}}
read_ts_names = {'ASCAT': 'read', 'ISMN': 'read_ts'}
period = [datetime(2007, 1, 1), datetime(2014, 12, 31)]
datasets = DataManager(datasets, 'ISMN', period, read_ts_names=read_ts_names)
process = Validation(
datasets, 'ISMN',
temporal_ref='ASCAT',
scaling='lin_cdf_match',
scaling_ref='ASCAT',
metrics_calculators={
(2, 2): metrics_calculators.RollingMetrics(other_name='k1',
metadata_template=metadata_dict_template).calc_metrics},
period=period)
for job in jobs:
results = process.calc(*job)
netcdf_results_manager(results, save_path, ts_vars=[
'R', 'p_R', 'RMSD'])
def test_DataManager_get_data():
    """DataManager.get_data returns one entry per configured dataset."""
    dataset_config = setup_TestDatasets()
    manager = DataManager(
        dataset_config, 'DS1',
        read_ts_names={name: 'read' for name in ('DS1', 'DS2', 'DS3')})
    fetched = manager.get_data(1, 1, 1)
    # All three test datasets should be present in the returned mapping.
    assert sorted(list(fetched)) == ['DS1', 'DS2', 'DS3']
def __init__(self, datasets, spatial_ref, metrics_calculators,
temporal_matcher=None, temporal_window=1 / 24.0,
temporal_ref=None,
masking_datasets=None,
period=None,
scaling='lin_cdf_match', scaling_ref=None):
if type(datasets) is DataManager:
self.data_manager = datasets
else:
self.data_manager = DataManager(datasets, spatial_ref, period)
self.temp_matching = temporal_matcher
if self.temp_matching is None:
self.temp_matching = temporal_matchers.BasicTemporalMatching(
window=temporal_window).combinatory_matcher
self.temporal_ref = temporal_ref
if self.temporal_ref is None:
self.temporal_ref = self.data_manager.reference_name
self.metrics_c = metrics_calculators
self.masking_dm = None
if masking_datasets is not None:
# add temporal reference dataset to the masking datasets since it
# is necessary for temporally matching the masking datasets to the