Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'kwargs': {'limit': 500},
'use_lut': False,
'grids_compatible': True},
'masking2': {
'class': mds2,
'columns': ['x'],
'args': [],
'kwargs': {'limit': 750},
'use_lut': False,
'grids_compatible': True}
}
process = Validation(
datasets, 'DS1',
temporal_matcher=temporal_matchers.BasicTemporalMatching(
window=1 / 24.0).combinatory_matcher,
scaling='lin_cdf_match',
metrics_calculators={
(3, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics},
masking_datasets=mds)
gpi_info = (1, 1, 1)
ref_df = datasets['DS1']['class'].read(1)
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=DeprecationWarning) # read_ts is hard coded when using mask_data
new_ref_df = process.mask_dataset(ref_df, gpi_info)
assert len(new_ref_df) == 250
nptest.assert_allclose(new_ref_df.x.values, np.arange(750, 1000))
jobs = process.get_processing_jobs()
for job in jobs:
'lon': np.array([4.]),
'p_tau': np.array([np.nan], dtype=np.float32),
'BIAS': np.array([0.], dtype=np.float32),
'p_rho': np.array([0.], dtype=np.float32),
'rho': np.array([1.], dtype=np.float32),
'lat': np.array([4.]),
'R': np.array([1.], dtype=np.float32),
'p_R': np.array([0.], dtype=np.float32)}}
datasets = setup_TestDatasets()
dm = DataManager(datasets, 'DS1', read_ts_names={d: 'read' for d in ['DS1', 'DS2', 'DS3']})
process = Validation(
dm, 'DS1',
temporal_matcher=temporal_matchers.BasicTemporalMatching(
window=1 / 24.0).combinatory_matcher,
scaling='lin_cdf_match',
metrics_calculators={
(2, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics})
jobs = process.get_processing_jobs()
for job in jobs:
results = process.calc(*job)
assert sorted(list(results)) == sorted(list(tst_results))
'RMSD': np.array([0.], dtype=np.float32),
'lon': np.array([4.]),
'p_tau': np.array([np.nan], dtype=np.float32),
'BIAS': np.array([0.], dtype=np.float32),
'p_rho': np.array([0.], dtype=np.float32),
'rho': np.array([1.], dtype=np.float32),
'lat': np.array([4.]),
'R': np.array([1.], dtype=np.float32),
'p_R': np.array([0.], dtype=np.float32)}}
datasets = setup_TestDatasets()
dm = DataManager(datasets, 'DS1', read_ts_names={d: 'read' for d in ['DS1', 'DS2', 'DS3']})
process = Validation(
dm, 'DS1',
temporal_matcher=temporal_matchers.BasicTemporalMatching(
window=1 / 24.0).combinatory_matcher,
scaling='lin_cdf_match',
metrics_calculators={
(3, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics})
jobs = process.get_processing_jobs()
for job in jobs:
results = process.calc(*job)
assert sorted(list(results)) == sorted(list(tst_results))
def test_validation_n2_k2_temporal_matching_no_matches():
    """Validation over two datasets with no overlapping timestamps.

    Temporal matching finds nothing to pair, so ``calc`` must produce an
    empty result set for every processing job.
    """
    # Nothing overlaps, therefore nothing should be returned.
    expected = {}

    datasets = setup_two_without_overlap()
    data_manager = DataManager(
        datasets, 'DS1',
        read_ts_names=dict.fromkeys(['DS1', 'DS2', 'DS3'], 'read'))

    validator = Validation(
        data_manager, 'DS1',
        temporal_matcher=temporal_matchers.BasicTemporalMatching(
            window=1 / 24.0).combinatory_matcher,
        scaling='lin_cdf_match',
        metrics_calculators={
            (2, 2): metrics_calculators.BasicMetrics(
                other_name='k1').calc_metrics})

    for job in validator.get_processing_jobs():
        results = validator.calc(*job)
        # Iterating a dict yields its keys, so sorted key lists must agree.
        assert sorted(results) == sorted(expected)
'gpi': np.array([4], dtype=np.int32),
'RMSD': np.array([0.], dtype=np.float32),
'lon': np.array([4.]),
'p_tau': np.array([np.nan], dtype=np.float32),
'BIAS': np.array([0.], dtype=np.float32),
'p_rho': np.array([0.], dtype=np.float32),
'rho': np.array([1.], dtype=np.float32),
'lat': np.array([4.]),
'R': np.array([1.], dtype=np.float32),
'p_R': np.array([0.], dtype=np.float32)}}
datasets = setup_TestDatasets()
dm = DataManager(datasets, 'DS1', read_ts_names={d: 'read' for d in ['DS1', 'DS2', 'DS3']})
process = Validation(dm, 'DS1',
temporal_matcher=temporal_matchers.BasicTemporalMatching(
window=1 / 24.0).combinatory_matcher,
scaling='lin_cdf_match',
metrics_calculators={
(2, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics})
jobs = process.get_processing_jobs()
for job in jobs:
results = process.calc(*job)
assert sorted(list(results)) == sorted(list(tst_results))
'RMSD': np.array([0.], dtype=np.float32),
'lon': np.array([4.]),
'p_tau': np.array([np.nan], dtype=np.float32),
'BIAS': np.array([0.], dtype=np.float32),
'p_rho': np.array([0.], dtype=np.float32),
'rho': np.array([1.], dtype=np.float32),
'lat': np.array([4.]),
'R': np.array([1.], dtype=np.float32),
'p_R': np.array([0.], dtype=np.float32)}}
datasets = setup_three_with_two_overlapping()
dm = DataManager(datasets, 'DS1', read_ts_names={d: 'read' for d in ['DS1', 'DS2', 'DS3']})
process = Validation(
dm, 'DS1',
temporal_matcher=temporal_matchers.BasicTemporalMatching(
window=1 / 24.0).combinatory_matcher,
scaling='lin_cdf_match',
metrics_calculators={
(2, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics})
jobs = process.get_processing_jobs()
for job in jobs:
results = process.calc(*job)
assert sorted(list(results)) == sorted(list(tst_results))
'args': [],
'kwargs': {'limit': 500},
'use_lut': False,
'grids_compatible': True},
'masking2': {
'class': mds2,
'columns': ['x'],
'args': [],
'kwargs': {'limit': 1000},
'use_lut': False,
'grids_compatible': True}
}
process = Validation(
datasets, 'DS1',
temporal_matcher=temporal_matchers.BasicTemporalMatching(
window=1 / 24.0).combinatory_matcher,
scaling='lin_cdf_match',
metrics_calculators={
(3, 2): metrics_calculators.BasicMetrics(other_name='k1').calc_metrics},
masking_datasets=mds)
gpi_info = (1, 1, 1)
ref_df = datasets['DS1']['class'].read(1)
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=DeprecationWarning)
new_ref_df = process.mask_dataset(ref_df, gpi_info)
assert len(new_ref_df) == 0
nptest.assert_allclose(new_ref_df.x.values, np.arange(1000, 1000))
jobs = process.get_processing_jobs()
for job in jobs:
with warnings.catch_warnings():
def __init__(self, datasets, spatial_ref, metrics_calculators,
temporal_matcher=None, temporal_window=1 / 24.0,
temporal_ref=None,
masking_datasets=None,
period=None,
scaling='lin_cdf_match', scaling_ref=None):
if type(datasets) is DataManager:
self.data_manager = datasets
else:
self.data_manager = DataManager(datasets, spatial_ref, period)
self.temp_matching = temporal_matcher
if self.temp_matching is None:
self.temp_matching = temporal_matchers.BasicTemporalMatching(
window=temporal_window).combinatory_matcher
self.temporal_ref = temporal_ref
if self.temporal_ref is None:
self.temporal_ref = self.data_manager.reference_name
self.metrics_c = metrics_calculators
self.masking_dm = None
if masking_datasets is not None:
# add temporal reference dataset to the masking datasets since it
# is necessary for temporally matching the masking datasets to the
# common time stamps. Use _reference here to make a clash with the
# names of the masking datasets unlikely
masking_datasets.update(
{'_reference': datasets[self.temporal_ref]})