Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if isinstance(layer, DocBasicLayer):
family = layer[Info.FAMILY]
# one layer that represents all the layers in this family
family_rep = self[self._families[family][0]]
# convert family subcategory to displayable name
# if isinstance(family[2], UUID):
# # RGB Recipes, this needs more thinking
# display_family = family_rep[Info.SHORT_NAME]
# elif not isinstance(family[2], str):
# display_family = "{:.02f} µm".format(family[2])
# else:
# display_family = family[2]
family_name_components = family.split(':')
display_family = family_rep[Info.SHORT_NAME] + ' ' + ' '.join(reversed(family_name_components))
# display_family = str(family)
# NOTE: For RGBs the SHORT_NAME will update as the RGB changes
return {
Info.VALID_RANGE: family_rep[Info.VALID_RANGE],
Info.UNIT_CONVERSION: family_rep[Info.UNIT_CONVERSION],
Info.SHORT_NAME: family_rep[Info.SHORT_NAME],
Info.UNITS: family_rep[Info.UNITS],
Info.KIND: family_rep[Info.KIND],
Info.DISPLAY_FAMILY: display_family,
}
# another has a different missing time step
time_master = max(namespace_siblings.values(), key=lambda v: len(v))
for idx in range(len(time_master)):
t = self[time_master[idx]][Info.SCHED_TIME]
channel_siblings = [(self[u][Info.SHORT_NAME], u) for u in self.channel_siblings(time_master[idx])[0]]
temp_namespace = {}
for sn, u in channel_siblings:
if sn not in short_name_to_ns_name:
continue
# set each ns variable to this UUID
for ns_name in short_name_to_ns_name[sn]:
temp_namespace[ns_name] = u
if len(temp_namespace) != len(namespace):
LOG.info("Missing some layers to create algebraic layer at {:%Y-%m-%d %H:%M:%S}".format(t))
continue
LOG.info("Creating algebraic layer '{}' for time {:%Y-%m-%d %H:%M:%S}".format(info.get(Info.SHORT_NAME),
self[time_master[idx]].get(
Info.SCHED_TIME)))
uuid, layer_info, data = self._workspace.create_algebraic_composite(operations, temp_namespace, info.copy())
self._layer_with_uuid[uuid] = dataset = DocBasicLayer(self, layer_info)
presentation, reordered_indices = self._insert_layer_with_info(dataset, insert_before=insert_before)
if Info.UNIT_CONVERSION not in dataset:
dataset[Info.UNIT_CONVERSION] = units_conversion(dataset)
if Info.FAMILY not in dataset:
dataset[Info.FAMILY] = self.family_for_product_or_layer(dataset)
self._add_layer_family(dataset)
self.didAddCompositeLayer.emit(reordered_indices, dataset.uuid, presentation)
info.setdefault(Info.UNITS, '1')
max_meta = max(md_list, key=lambda x: x[Info.SHAPE])
for k in (Info.PROJ, Info.ORIGIN_X, Info.ORIGIN_Y, Info.CELL_WIDTH, Info.CELL_HEIGHT, Info.SHAPE):
info[k] = max_meta[k]
info[Info.VALID_RANGE] = (np.nanmin(composite_array), np.nanmax(composite_array))
info[Info.CLIM] = (np.nanmin(composite_array), np.nanmax(composite_array))
info[Info.OBS_TIME] = min([x[Info.OBS_TIME] for x in md_list])
info[Info.SCHED_TIME] = min([x[Info.SCHED_TIME] for x in md_list])
# get the overall observation time
info[Info.OBS_DURATION] = max([
x[Info.OBS_TIME] + x.get(Info.OBS_DURATION, timedelta(seconds=0)) for x in md_list]) - info[Info.OBS_TIME]
# generate family and category names
info[Info.FAMILY] = family = self._merge_famcat_strings(md_list, Info.FAMILY, suffix=info.get(Info.SHORT_NAME))
info[Info.CATEGORY] = category = self._merge_famcat_strings(md_list, Info.CATEGORY)
info[Info.SERIAL] = serial = self._merge_famcat_strings(md_list, Info.SERIAL)
LOG.debug("algebraic product will be {}::{}::{}".format(family, category, serial))
return info
LOG.debug('DEPRECATED: setting origin_x on resource')
@property
def origin_y(self):
    """Return ``origin_y`` of the last entry in ``self.content``.

    Returns ``None`` when ``self.content`` is empty or when its last
    entry is falsy.
    """
    if len(self.content):
        last_content = self.content[-1]
    else:
        last_content = None
    if not last_content:
        return None
    return last_content.origin_y


@origin_y.setter
def origin_y(self, value):
    """Deprecated setter: logs a debug message and discards ``value``."""
    LOG.debug('DEPRECATED: setting origin_y on resource')
def can_be_activated_without_importing(self):
    """Return True when ``self.content`` holds at least one entry."""
    entry_count = len(self.content)
    return entry_count > 0
# Lookup table from ``Info`` metadata keys to plain string field names.
# NOTE(review): presumably these strings are attribute/column names on the
# enclosing class (e.g. 'origin_y' matches a property seen nearby) -- confirm
# against the class definition before relying on it.
INFO_TO_FIELD = {
Info.SHORT_NAME: 'name',
Info.UUID: 'uuid',
Info.PROJ: 'proj4',
Info.OBS_TIME: 'obs_time',
Info.OBS_DURATION: 'obs_duration',
Info.CELL_WIDTH: 'cell_width',
Info.CELL_HEIGHT: 'cell_height',
Info.ORIGIN_X: 'origin_x',
Info.ORIGIN_Y: 'origin_y',
Info.FAMILY: 'family',
Info.CATEGORY: 'category',
Info.SERIAL: 'serial'
}
def touch(self, when=None):
    """Stamp this object and everything in ``self.resource`` as accessed.

    Args:
        when: Timestamp to record. When falsy (including the default
            ``None``), ``datetime.utcnow()`` is used instead.

    The same timestamp is stored in ``self.atime`` and passed to the
    ``touch`` method of each item in ``self.resource``.
    """
    stamp = when or datetime.utcnow()
    self.atime = stamp
    # Plain loop: the calls are made purely for their side effects.
    for item in self.resource:
        item.touch(stamp)
Returns: dict of overall metadata (same as `info`)
"""
if not all(x[Info.PROJ] == md_list[0][Info.PROJ] for x in md_list[1:]):
raise ValueError("Algebraic inputs must all be the same projection")
uuid = uuidgen()
info[Info.UUID] = uuid
for k in (Info.PLATFORM, Info.INSTRUMENT, Info.SCENE):
if md_list[0].get(k) is None:
continue
if all(x.get(k) == md_list[0].get(k) for x in md_list[1:]):
info.setdefault(k, md_list[0][k])
info.setdefault(Info.KIND, Kind.COMPOSITE)
info.setdefault(Info.SHORT_NAME, '')
info.setdefault(Info.DATASET_NAME, info[Info.SHORT_NAME])
info.setdefault(Info.UNITS, '1')
max_meta = max(md_list, key=lambda x: x[Info.SHAPE])
for k in (Info.PROJ, Info.ORIGIN_X, Info.ORIGIN_Y, Info.CELL_WIDTH, Info.CELL_HEIGHT, Info.SHAPE):
info[k] = max_meta[k]
info[Info.VALID_RANGE] = (np.nanmin(composite_array), np.nanmax(composite_array))
info[Info.CLIM] = (np.nanmin(composite_array), np.nanmax(composite_array))
info[Info.OBS_TIME] = min([x[Info.OBS_TIME] for x in md_list])
info[Info.SCHED_TIME] = min([x[Info.SCHED_TIME] for x in md_list])
# get the overall observation time
info[Info.OBS_DURATION] = max([
x[Info.OBS_TIME] + x.get(Info.OBS_DURATION, timedelta(seconds=0)) for x in md_list]) - info[Info.OBS_TIME]
# generate family and category names
layer_str = "N/A"
else:
info = self.document[uuid]
unit_info = info[Info.UNIT_CONVERSION]
data_point = unit_info[1](data_point)
data_str = unit_info[2](data_point, numeric=False)
if info.get(Info.CENTRAL_WAVELENGTH):
wl = info[Info.CENTRAL_WAVELENGTH]
if wl < 4.1:
wl_str = "{:0.02f} µm".format(wl)
else:
wl_str = "{:0.01f} µm".format(wl)
layer_str = "{}, {}".format(info[Info.SHORT_NAME],
wl_str)
else:
layer_str = info[Info.SHORT_NAME]
else:
data_str = "N/A"
layer_str = "N/A"
self.ui.cursorProbeLayer.setText(layer_str)
self.ui.cursorProbeText.setText("{} ({})".format(data_str, probe_loc))
def __init__(self, doc, uuid, parent=None):
    """Build the colormap dialog for the layer identified by ``uuid``.

    Args:
        doc: Document object; queried for the layer's valid range,
            presentation and info mapping.
        uuid: Identifier of the layer whose colormap is being edited.
        parent: Optional parent passed to the base class initializer.

    The presentation's colormap, color limits and gamma are stored both
    as "initial" and as "current" values, and the min/max edit fields and
    gamma spin box are pre-filled from them.
    """
    super(ChangeColormapDialog, self).__init__(parent)
    self.ui = Ui_changeColormapDialog()
    self.ui.setupUi(self)

    self.doc, self.uuid = doc, uuid
    self._slider_steps = 100
    valid_range = self.doc.valid_range_for_uuid(self.uuid)
    self.valid_min, self.valid_max = valid_range

    presentation = self.doc.prez_for_uuid(self.uuid)
    layer_info = self.doc[self.uuid]
    unit_conversion = layer_info[Info.UNIT_CONVERSION]

    title_suffix = self.doc[self.uuid][Info.SHORT_NAME]
    self.setWindowTitle(str(self.windowTitle()) + ": " + title_suffix)

    # "current" values start out equal to the "initial" ones.
    self._initial_cmap = presentation.colormap
    self._current_cmap = self._initial_cmap
    self._initial_clims = presentation.climits
    self._current_clims = self._initial_clims
    self._initial_gamma = presentation.gamma
    self._current_gamma = self._initial_gamma

    self._validator = QtGui.QDoubleValidator()

    def _limit_text(raw_limit):
        # Element 1 of the unit conversion is called on the raw limit
        # before it is formatted for the edit field.
        return '{:0.03f}'.format(unit_conversion[1](raw_limit))

    self._init_cmap_combo()

    self._init_vmin_slider()
    self.ui.vmin_edit.setValidator(self._validator)
    self.ui.vmin_edit.setText(_limit_text(self._initial_clims[0]))

    self._init_vmax_slider()
    self.ui.vmax_edit.setValidator(self._validator)
    self.ui.vmax_edit.setText(_limit_text(self._initial_clims[1]))

    self.ui.gammaSpinBox.setValue(self._initial_gamma)
def _get_code_block(self, shared_info: defaultdict, layer_uuid: UUID):
    """Record the algebraic code block of a layer in ``shared_info``.

    The document is asked for the layer's algebraic namespace and code
    block. When a code block exists, it is prefixed with one
    ``# name = short name`` comment line per namespace entry (an empty
    short name is used for entries the document cannot look up).

    Args:
        shared_info: Mapping updated in place under the ``'codeblock'`` key.
        layer_uuid: Identifier of the layer to describe.

    If ``shared_info`` already holds a different ``'codeblock'`` value, the
    entry is reset to the empty string; otherwise it is set to the text
    built for this layer.
    """
    namespace, source = self.document.get_algebraic_namespace(layer_uuid)

    if not source:
        rendered = ''
    else:
        header_lines = []
        for var_name, dep_uuid in namespace.items():
            try:
                dep_short_name = self.document[dep_uuid].get(Info.SHORT_NAME, '')
            except KeyError:
                LOG.debug("Layer '{}' not found in document".format(dep_uuid))
                dep_short_name = ''
            header_lines.append("# {} = {}".format(var_name, dep_short_name))
        rendered = "\n".join(header_lines) + '\n\n' + source

    # A previously stored, different value means the layers disagree.
    if 'codeblock' in shared_info and shared_info['codeblock'] != rendered:
        shared_info['codeblock'] = ''
    else:
        shared_info['codeblock'] = rendered
def collect_info(self, info):
    """Derive supplementary metadata from ``info`` without modifying it.

    Intended to be called once to "fill in" metadata that is not
    originally known about an opened file.

    Args:
        info: Mapping of existing metadata used as the starting point.

    Returns:
        A new dict that always contains ``Info.SHORT_NAME`` and
        ``Info.STANDARD_NAME``; ``Info.LONG_NAME`` is added only when
        ``info`` lacks it, and ``Info.UNITS`` only when ``info`` gives the
        units as ``'K'`` or ``'Kelvin'`` (stored as ``'kelvin'``).
    """
    # FIXME: Don't use pure DATASET_NAME since resolution should not be part of the SHORT_NAME
    #        And/or don't use SHORT_NAME for grouping
    if Info.SHORT_NAME in info:
        short_name = info[Info.SHORT_NAME]
    else:
        short_name = info.get(Info.DATASET_NAME, '???')

    collected = {Info.SHORT_NAME: short_name}

    if Info.LONG_NAME not in info:
        # Falls back to whichever short name was chosen above.
        collected[Info.LONG_NAME] = short_name

    collected[Info.STANDARD_NAME] = info.get(Info.STANDARD_NAME, 'unknown')

    if info.get(Info.UNITS) in ('K', 'Kelvin'):
        collected[Info.UNITS] = 'kelvin'

    return collected