Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _create_dataset_ids(self, keys):
from itertools import product
ordered_keys = [k for k in keys.keys() if 'id_key' in keys[k]]
for id_vals in product(*[keys[k]['values'] for k in ordered_keys]):
id_keys = [keys[k]['id_key'] for k in ordered_keys]
msg_info = dict(zip(ordered_keys, id_vals))
ds_info = dict(zip(id_keys, id_vals))
msg_id = DatasetID(**ds_info)
ds_info = msg_id.to_dict()
ds_info.update(msg_info)
ds_info['file_type'] = self.filetype_info['file_type']
self._msg_datasets[msg_id] = ds_info
tmp_scene[ds_id].attrs["area"] = this_grid_definition.to_satpy_area()
if isinstance(v, set):
tmp_scene.attrs["sensor"].update(v["sensor"])
else:
tmp_scene.attrs["sensor"].add(v["sensor"])
# Overwrite the wishlist that will include the above assigned datasets
tmp_scene.wishlist = f.wishlist.copy()
comps, mods = tmp_scene.cpl.load_compositors(tmp_scene.attrs["sensor"])
tmp_scene.dep_tree.compositors = comps
tmp_scene.dep_tree.modifiers = mods
tmp_scene.dep_tree.find_dependencies(tmp_scene.wishlist.copy())
tmp_scene.generate_composites()
tmp_scene.unload()
# Add any new Datasets to our P2G Scene if SatPy created them
for ds in tmp_scene:
ds_id = DatasetID.from_dict(ds.attrs)
if ds_id.name not in gridded_scene:
LOG.debug("Adding Dataset from SatPy Commpositing: %s", ds_id)
gridded_scene[ds_id.name] = dataarray_to_gridded_product(ds, this_grid_definition)
# Remove any Products from P2G Scene that SatPy decided it didn't need anymore
for k, v in list(gridded_scene.items()):
if v['name'] not in tmp_scene:
LOG.debug("Removing Dataset that is no longer used: %s", k)
del gridded_scene[k]
del tmp_scene, v
if isinstance(gridded_scene, Scene):
LOG.debug("Converting satpy Scene to P2G Gridded Scene")
# Convert it to P2G Gridded Scene
gridded_scene = convert_satpy_to_p2g_gridded(f, gridded_scene)
# Writer
new_info["grid_data"] = new_info["grid_data"].replace(v[0], rgb_name)
new_info["product_name"] = rgb_name
data = np.memmap(new_info["grid_data"], dtype=new_info["data_type"],
mode="w+", shape=(3, new_info["grid_definition"]["height"], new_info["grid_definition"]["width"]))
data[0] = r.get_data_array()[:]
data[1] = g.get_data_array()[:]
data[2] = b.get_data_array()[:]
gridded_scene[rgb_name] = new_info
del data, new_info
# Create composites that satpy couldn't complete until after remapping
composite_names = f.missing_datasets
if composite_names:
tmp_scene = Scene()
for k, v in gridded_scene.items():
ds_id = DatasetID.from_dict(v)
dask_arr = da.from_array(v.get_data_array(), chunks=CHUNK_SIZE)
tmp_scene[ds_id] = DataArray(dask_arr, attrs=v)
tmp_scene[ds_id].attrs["area"] = this_grid_definition.to_satpy_area()
if isinstance(v, set):
tmp_scene.attrs["sensor"].update(v["sensor"])
else:
tmp_scene.attrs["sensor"].add(v["sensor"])
# Overwrite the wishlist that will include the above assigned datasets
tmp_scene.wishlist = f.wishlist.copy()
comps, mods = tmp_scene.cpl.load_compositors(tmp_scene.attrs["sensor"])
tmp_scene.dep_tree.compositors = comps
tmp_scene.dep_tree.modifiers = mods
tmp_scene.dep_tree.find_dependencies(tmp_scene.wishlist.copy())
tmp_scene.generate_composites()
tmp_scene.unload()
# Add any new Datasets to our P2G Scene if SatPy created them
def collect_selected_ids(self):
selected_ids = []
for item_idx in range(self.ui.selectIDTable.rowCount()):
id_items = OrderedDict((key, self.ui.selectIDTable.item(item_idx, id_idx))
for id_idx, key in enumerate(self.config['id_components']))
if id_items['name'].checkState():
id_dict = {key: id_item.data(QtCore.Qt.UserRole)
for key, id_item in id_items.items() if id_item is not None}
id_dict['modifiers'] = None
selected_ids.append(DatasetID(**id_dict))
return selected_ids
def _find_compositor(self, dataset_key, calibration=None,
polarization=None, resolution=None):
"""Find the compositor object for the given dataset_key."""
# NOTE: This function can not find a modifier that performs
# one or more modifications if it has modifiers see if we can find
# the unmodified version first
src_node = None
if isinstance(dataset_key, DatasetID) and dataset_key.modifiers:
new_prereq = DatasetID(
*dataset_key[:-1] + (dataset_key.modifiers[:-1],))
src_node, u = self._find_dependencies(new_prereq, calibration, polarization, resolution)
if u:
return None, u
try:
compositor = self.get_compositor(dataset_key)
except KeyError:
raise KeyError("Can't find anything called {}".format(
str(dataset_key)))
if resolution:
compositor.attrs['resolution'] = resolution
if calibration:
compositor.attrs['calibration'] = calibration
if polarization:
# All bits carry information, we update bit_start consequently
bit_start = np.arange(16, dtype=np.uint16).reshape((4, 4))
bit_start = np.tile(bit_start, (shape[0], shape[1]))
# Compute the final bit mask
dataset = bits_strip(bit_start, bit_count, byte_dataset)
# Apply quality assurance filter
if 'quality_assurance' in dataset_info:
quality_assurance_required = self._parse_resolution_info(
dataset_info['quality_assurance'], dataset_id.resolution
)
if quality_assurance_required is True:
# Get quality assurance dataset recursively
from satpy import DatasetID
quality_assurance_dataset_id = DatasetID(
name='quality_assurance', resolution=1000
)
quality_assurance_dataset_info = {
'name': 'quality_assurance',
'resolution': [1000],
'byte_dimension': 2,
'byte': [0],
'bit_start': 0,
'bit_count': 1,
'file_key': 'Quality_Assurance'
}
quality_assurance = self.get_dataset(
quality_assurance_dataset_id, quality_assurance_dataset_info
)
# Duplicate quality assurance dataset to create relevant filter
duplication_factor = [int(dataset_dim / quality_assurance_dim)
def _find_compositor(self, dataset_key, calibration=None,
polarization=None, resolution=None):
"""Find the compositor object for the given dataset_key."""
# NOTE: This function can not find a modifier that performs
# one or more modifications if it has modifiers see if we can find
# the unmodified version first
src_node = None
if isinstance(dataset_key, DatasetID) and dataset_key.modifiers:
new_prereq = DatasetID(
*dataset_key[:-1] + (dataset_key.modifiers[:-1],))
src_node, u = self._find_dependencies(new_prereq, calibration, polarization, resolution)
if u:
return None, u
try:
compositor = self.get_compositor(dataset_key)
except KeyError:
raise KeyError("Can't find anything called {}".format(
str(dataset_key)))
if resolution:
compositor.attrs['resolution'] = resolution
if calibration:
compositor.attrs['calibration'] = calibration
if polarization:
compositor.attrs['polarization'] = polarization
def filter_daytime(self, scene):
if self.fraction_day_scene is None:
self._calc_percent_day(scene)
# make a copy of the scene list so we can edit it later
for ds in list(scene):
if ds.attrs['standard_name'] in ('toa_bidirectional_reflectance',) and \
self.fraction_day_scene <= self.day_fraction:
ds_id = DatasetID.from_dict(ds.attrs)
LOG.info("Will not create product '%s' because there is less than %f%% of day data",
ds.attrs['name'], self.day_fraction * 100.)
del scene[ds_id]
def _analyze_messages(self, grib_file):
grib_file.seek(0)
for idx, msg in enumerate(grib_file):
msg_id = DatasetID(name=msg['shortName'],
level=msg['level'])
ds_info = {
'message': idx + 1,
'name': msg['shortName'],
'level': msg['level'],
'file_type': self.filetype_info['file_type'],
}
self._msg_datasets[msg_id] = ds_info