Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def available_rgb_components(self):
non_rgb_classes = [DocBasicLayer, DocCompositeLayer]
valid_ranges = {}
for layer in self.layers_where(is_valid=True, in_type_set=non_rgb_classes):
sname = layer.get(Info.CENTRAL_WAVELENGTH, layer[Info.DATASET_NAME])
valid_ranges.setdefault(sname, layer[Info.VALID_RANGE])
return valid_ranges
try:
data_point = self.workspace.get_content_point(uuid, xy_pos)
except ValueError:
LOG.debug("Could not get data value", exc_info=True)
data_point = None
if data_point is None:
data_str = "N/A"
layer_str = "N/A"
else:
info = self.document[uuid]
unit_info = info[Info.UNIT_CONVERSION]
data_point = unit_info[1](data_point)
data_str = unit_info[2](data_point, numeric=False)
if info.get(Info.CENTRAL_WAVELENGTH):
wl = info[Info.CENTRAL_WAVELENGTH]
if wl < 4.1:
wl_str = "{:0.02f} µm".format(wl)
else:
wl_str = "{:0.01f} µm".format(wl)
layer_str = "{}, {}".format(info[Info.SHORT_NAME],
wl_str)
else:
layer_str = info[Info.SHORT_NAME]
else:
data_str = "N/A"
layer_str = "N/A"
self.ui.cursorProbeLayer.setText(layer_str)
self.ui.cursorProbeText.setText("{} ({})".format(data_str, probe_loc))
elif state and uuid is not None:
try:
data_point = self.workspace.get_content_point(uuid, xy_pos)
except ValueError:
LOG.debug("Could not get data value", exc_info=True)
data_point = None
if data_point is None:
data_str = "N/A"
layer_str = "N/A"
else:
info = self.document[uuid]
unit_info = info[Info.UNIT_CONVERSION]
data_point = unit_info[1](data_point)
data_str = unit_info[2](data_point, numeric=False)
if info.get(Info.CENTRAL_WAVELENGTH):
wl = info[Info.CENTRAL_WAVELENGTH]
if wl < 4.1:
wl_str = "{:0.02f} µm".format(wl)
else:
wl_str = "{:0.01f} µm".format(wl)
layer_str = "{}, {}".format(info[Info.SHORT_NAME],
wl_str)
else:
layer_str = info[Info.SHORT_NAME]
else:
data_str = "N/A"
layer_str = "N/A"
self.ui.cursorProbeLayer.setText(layer_str)
self.ui.cursorProbeText.setText("{} ({})".format(data_str, probe_loc))
def central_wavelength(self):
gb = self._get_if_not_none_func(item=Info.CENTRAL_WAVELENGTH)
return gb(self.r), gb(self.g), gb(self.b)
if fam:
return fam[0]
uuid_or_layer = self[uuid_or_layer]
if Info.FAMILY in uuid_or_layer:
LOG.debug('using pre-existing family {}'.format(uuid_or_layer[Info.FAMILY]))
return uuid_or_layer[Info.FAMILY]
# kind:pointofreference:measurement:wavelength
kind = uuid_or_layer[Info.KIND]
refpoint = 'unknown' # FUTURE: geo/leo
measurement = uuid_or_layer.get(Info.STANDARD_NAME)
if uuid_or_layer.get('recipe'):
# RGB
subcat = uuid_or_layer['recipe'].name
elif uuid_or_layer.get(Info.CENTRAL_WAVELENGTH):
# basic band
subcat = uuid_or_layer[Info.CENTRAL_WAVELENGTH]
else:
# higher level product or algebraic layer
subcat = uuid_or_layer[Info.DATASET_NAME]
return "{}:{}:{}:{}".format(kind.name, refpoint, measurement, subcat)
this_prez = None
for prez_tuple in presentation_info:
if prez_tuple.uuid == layer_uuid:
this_prez = prez_tuple
break
# compare our various values
self._update_if_different(shared_info, layer_info, Info.DISPLAY_NAME)
self._update_if_different(shared_info, layer_info, Info.DISPLAY_TIME)
self._update_if_different(shared_info, layer_info, Info.INSTRUMENT)
self._update_if_different(shared_info, this_prez, attr='colormap')
self._get_shared_color_limits(shared_info, layer_info, this_prez)
self._get_code_block(shared_info, layer_uuid)
# wavelength
wl = layer_info.get(Info.CENTRAL_WAVELENGTH)
fmt = "{:0.2f} µm"
if isinstance(wl, (tuple, list)):
wl = [fmt.format(x) if x is not None else '---' for x in wl]
wl = "(" + ", ".join(wl) + ")"
else:
wl = fmt.format(wl) if wl is not None else '---'
if Info.CENTRAL_WAVELENGTH not in shared_info:
shared_info[Info.CENTRAL_WAVELENGTH] = wl
else:
shared_info[Info.CENTRAL_WAVELENGTH] = "---" if shared_info[Info.CENTRAL_WAVELENGTH] != wl else wl
# set the various text displays
self._display_shared_info(shared_info)
for ds in self.scn:
start_time = ds.attrs['start_time']
ds.attrs[Info.OBS_TIME] = start_time
ds.attrs[Info.SCHED_TIME] = start_time
duration = ds.attrs.get('end_time', start_time) - start_time
if duration.total_seconds() <= 0:
duration = timedelta(minutes=60)
ds.attrs[Info.OBS_DURATION] = duration
# Handle GRIB platform/instrument
ds.attrs[Info.KIND] = Kind.IMAGE if self.reader != 'grib' else \
Kind.CONTOUR
self._get_platform_instrument(ds.attrs)
ds.attrs.setdefault(Info.STANDARD_NAME, ds.attrs.get('standard_name'))
if 'wavelength' in ds.attrs:
ds.attrs.setdefault(Info.CENTRAL_WAVELENGTH,
ds.attrs['wavelength'][1])
# Resolve anything else needed by SIFT
id_str = ":".join(str(v) for v in DatasetID.from_dict(ds.attrs))
ds.attrs[Info.DATASET_NAME] = id_str
model_time = ds.attrs.get('model_time')
if model_time is not None:
ds.attrs[Info.DATASET_NAME] += " " + model_time.isoformat()
ds.attrs[Info.SHORT_NAME] = ds.attrs['name']
if ds.attrs.get('level') is not None:
ds.attrs[Info.SHORT_NAME] = "{} @ {}hPa".format(
ds.attrs['name'], ds.attrs['level'])
ds.attrs[Info.SHAPE] = ds.shape
ds.attrs[Info.UNITS] = ds.attrs.get('units')
if ds.attrs[Info.UNITS] == 'unknown':
LOG.warning("Layer units are unknown, using '1'")
# Generate FAMILY and CATEGORY
if 'model_time' in ds.attrs:
model_time = ds.attrs['model_time'].isoformat()
else:
model_time = None
ds.attrs[Info.SCENE] = ds.attrs.get('scene_id')
if ds.attrs[Info.SCENE] is None:
# compute a "good enough" hash for this Scene
area = ds.attrs['area']
extents = area.area_extent
# round extents to nearest 100 meters
extents = tuple(int(np.round(x / 100.0) * 100.0) for x in extents)
proj_str = area.proj4_string
ds.attrs[Info.SCENE] = "{}-{}".format(str(extents), proj_str)
if ds.attrs.get(Info.CENTRAL_WAVELENGTH) is None:
cw = ""
else:
cw = ":{:5.2f}µm".format(ds.attrs[Info.CENTRAL_WAVELENGTH])
ds.attrs[Info.FAMILY] = '{}:{}:{}{}'.format(
ds.attrs[Info.KIND].name, ds.attrs[Info.STANDARD_NAME],
ds.attrs[Info.SHORT_NAME], cw)
ds.attrs[Info.CATEGORY] = 'SatPy:{}:{}:{}'.format(
ds.attrs[Info.PLATFORM].name, ds.attrs[Info.INSTRUMENT].name,
ds.attrs[Info.SCENE]) # system:platform:instrument:target
# TODO: Include level or something else in addition to time?
start_str = ds.attrs['start_time'].isoformat()
ds.attrs[Info.SERIAL] = start_str if model_time is None else model_time + ":" + start_str
ds.attrs.setdefault('reader', self.reader)
return self.scn
recalculate origin and dimension information based on new upstream
:return:
"""
# FUTURE: resolve dictionary-style into attribute-style uses
dep_info = [self.r, self.g, self.b]
display_time = self._default_display_time()
short_name = self._default_short_name()
name = self._default_display_name(short_name=short_name, display_time=display_time)
ds_info = {
Info.DATASET_NAME: short_name,
Info.SHORT_NAME: short_name,
Info.DISPLAY_NAME: name,
Info.DISPLAY_TIME: display_time,
Info.SCHED_TIME: self.sched_time,
Info.BAND: self.band,
Info.CENTRAL_WAVELENGTH: self.central_wavelength,
Info.INSTRUMENT: self.instrument,
Info.PLATFORM: self.platform,
Info.SCENE: self.scene,
Info.UNIT_CONVERSION: self._get_units_conversion(),
Info.UNITS: None,
Info.VALID_RANGE: [d[Info.VALID_RANGE] if d else (None, None) for d in dep_info],
}
if self.r is None and self.g is None and self.b is None:
ds_info.update({
Info.ORIGIN_X: None,
Info.ORIGIN_Y: None,
Info.CELL_WIDTH: None,
Info.CELL_HEIGHT: None,
Info.PROJ: None,
Info.CLIM: ((None, None), (None, None), (None, None)),
self._update_if_different(shared_info, this_prez, attr='colormap')
self._get_shared_color_limits(shared_info, layer_info, this_prez)
self._get_code_block(shared_info, layer_uuid)
# wavelength
wl = layer_info.get(Info.CENTRAL_WAVELENGTH)
fmt = "{:0.2f} µm"
if isinstance(wl, (tuple, list)):
wl = [fmt.format(x) if x is not None else '---' for x in wl]
wl = "(" + ", ".join(wl) + ")"
else:
wl = fmt.format(wl) if wl is not None else '---'
if Info.CENTRAL_WAVELENGTH not in shared_info:
shared_info[Info.CENTRAL_WAVELENGTH] = wl
else:
shared_info[Info.CENTRAL_WAVELENGTH] = "---" if shared_info[Info.CENTRAL_WAVELENGTH] != wl else wl
# set the various text displays
self._display_shared_info(shared_info)