Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if self.resolution != key.resolution:
return
datasets = datadict[self.resolution]
for dataset in datasets:
subdata = self.sd.select(dataset)
band_names = subdata.attributes()["band_names"].split(",")
# get the relative indices of the desired channel
try:
index = band_names.index(key.name)
except ValueError:
continue
uncertainty = self.sd.select(dataset + "_Uncert_Indexes")
array = xr.DataArray(da.from_array(subdata[index, :, :], chunks=CHUNK_SIZE),
dims=['y', 'x']).astype(np.float32)
valid_range = subdata.attributes()['valid_range']
array = array.where(array >= np.float32(valid_range[0]))
array = array.where(array <= np.float32(valid_range[1]))
array = array.where(uncertainty[index, :, :] < 15)
if dataset.endswith('Emissive'):
projectable = calibrate_tb(array, subdata.attributes(), index, key.name)
else:
projectable = calibrate_refl(array, subdata.attributes(), index)
projectable.attrs = info
# if ((platform_name == 'Aqua' and key.name in ["6", "27", "36"]) or
# (platform_name == 'Terra' and key.name in ["29"])):
# height, width = projectable.shape
# row_indices = projectable.mask.sum(1) == width
def read(self, filename):
self.h5 = h5py.File(filename, 'r')
missing = -9999.
self.Lat = da.from_array(self.h5[u'DATA/Latitude'], chunks=CHUNK_SIZE) / 10000.
self.Lon = da.from_array(self.h5[u'DATA/Longitude'], chunks=CHUNK_SIZE) / 10000.
self.selected = (self.Lon > missing)
self.file_content = {}
for key in self.h5['DATA'].keys():
self.file_content[key] = da.from_array(self.h5[u'DATA/' + key], chunks=CHUNK_SIZE)
for key in self.h5[u'HEADER'].keys():
self.file_content[key] = self.h5[u'HEADER/' + key][:]
# Cloud Mask on pixel
mask = 2**0 + 2**1 + 2**2
lst = self.file_content[u'CloudMask'] & mask
lst = lst / 2**0
self.file_content[u"cma"] = lst
# Cloud Mask confidence
mask = 2**5 + 2**6
lst = self.file_content[u'CloudMask'] & mask
lst = lst / 2**5
self.file_content[u"cma_conf"] = lst
# Cloud Mask Quality
tb13_4 = None
for dataset in optional_datasets:
wavelengths = dataset.attrs.get('wavelength', [100., 0, 0])
if (dataset.attrs.get('units') == 'K' and
wavelengths[0] <= 13.4 <= wavelengths[2]):
tb13_4 = dataset
elif ("standard_name" in dataset.attrs and
dataset.attrs["standard_name"] == "solar_zenith_angle"):
sun_zenith = dataset
# Check if the sun-zenith angle was provided:
if sun_zenith is None:
if sun_zenith_angle is None:
raise ImportError("No module named pyorbital.astronomy")
lons, lats = _nir.attrs["area"].get_lonlats(chunks=CHUNK_SIZE)
sun_zenith = sun_zenith_angle(_nir.attrs['start_time'], lons, lats)
return self._refl3x.reflectance_from_tbs(sun_zenith, _nir, _tb11, tb_ir_co2=tb13_4)
def read_band_blocks(self, blocksize=CHUNK_SIZE):
"""Read the band in native blocks."""
# For sentinel 1 data, the block are 1 line, and dask seems to choke on that.
band = self.filehandle
shape = band.shape
token = tokenize(blocksize, band)
name = 'read_band-' + token
dskx = dict()
if len(band.block_shapes) != 1:
raise NotImplementedError('Bands with multiple shapes not supported.')
else:
chunks = band.block_shapes[0]
def do_read(the_band, the_window, the_lock):
with the_lock:
return the_band.read(1, None, window=the_window)
def get_dataset(self, dataset_id, ds_info):
"""Read a GRIB message into an xarray DataArray."""
msg = self._get_message(ds_info)
ds_info = self.get_metadata(msg, ds_info)
fill = msg['missingValue']
data = msg.values.astype(np.float32)
if isinstance(data, np.ma.MaskedArray):
data = data.filled(np.nan)
data = da.from_array(data, chunks=CHUNK_SIZE)
else:
data[data == fill] = np.nan
data = da.from_array(data, chunks=CHUNK_SIZE)
return xr.DataArray(data, attrs=ds_info, dims=('y', 'x'))
def interpolate_xarray(xpoints, ypoints, values, shape, kind='cubic',
blocksize=CHUNK_SIZE):
"""Interpolate, generating a dask array."""
vchunks = range(0, shape[0], blocksize)
hchunks = range(0, shape[1], blocksize)
token = tokenize(blocksize, xpoints, ypoints, values, kind, shape)
name = 'interpolate-' + token
from scipy.interpolate import interp2d
interpolator = interp2d(xpoints, ypoints, values, kind=kind)
dskx = {(name, i, j): (interpolate_slice,
slice(vcs, min(vcs + blocksize, shape[0])),
slice(hcs, min(hcs + blocksize, shape[1])),
interpolator)
for i, vcs in enumerate(vchunks)
for j, hcs in enumerate(hchunks)
def read_dataset(fid, key):
"""Read dataset"""
dsid = DSET_NAMES[key.name]
dset = fid["/PWLR/" + dsid]
if dset.ndim == 3:
dims = ['y', 'x', 'level']
else:
dims = ['y', 'x']
data = xr.DataArray(da.from_array(dset.value, chunks=CHUNK_SIZE),
name=key.name, dims=dims).astype(np.float32)
data = xr.where(data > 1e30, np.nan, data)
dset_attrs = dict(dset.attrs)
data.attrs.update(dset_attrs)
return data
def __init__(self, filename, filename_info, filetype_info,
calib_mode='PYSPECTRAL', allow_conditional_pixels=False):
"""Open the NetCDF file with xarray and prepare the Dataset for reading."""
super(AMIL1bNetCDF, self).__init__(filename, filename_info, filetype_info)
self.nc = xr.open_dataset(self.filename,
decode_cf=True,
mask_and_scale=False,
chunks={'dim_image_x': CHUNK_SIZE, 'dim_image_y': CHUNK_SIZE})
self.nc = self.nc.rename({'dim_image_x': 'x', 'dim_image_y': 'y'})
platform_shortname = self.nc.attrs['satellite_name']
self.platform_name = PLATFORM_NAMES.get(platform_shortname)
self.sensor = 'ami'
self.allow_conditional_pixels = allow_conditional_pixels
calib_mode_choices = ('FILE', 'PYSPECTRAL')
if calib_mode.upper() not in calib_mode_choices:
raise ValueError('Invalid calibration mode: {}. Choose one of {}'.format(
calib_mode, calib_mode_choices))
self.calib_mode = calib_mode.upper()
def __getitem__(self, key):
"""Get item for given key."""
val = self.file_content[key]
if isinstance(val, h5py.Dataset):
# these datasets are closed and inaccessible when the file is closed, need to reopen
dset = h5py.File(self.filename, 'r')[key]
dset_data = da.from_array(dset, chunks=CHUNK_SIZE)
if dset.ndim == 2:
return xr.DataArray(dset_data, dims=['y', 'x'], attrs=dset.attrs)
return xr.DataArray(dset_data, attrs=dset.attrs)
return val
try:
output_data = dnb_product.copy_array(filename=filename, read_only=False)
# use SatPy to perform the calculations
from satpy.composites.viirs import NCCZinke
from satpy import CHUNK_SIZE
import xarray as xr
import dask.array as da
dnb_data = xr.DataArray(da.from_array(dnb_data, chunks=CHUNK_SIZE), attrs={
# 'units': "W m-2 sr-1",
'units': "W cm-2 sr-1",
'calibration': 'radiance',
'wavelength': (0.500, 0.700, 0.900),
})
sza_data = xr.DataArray(da.from_array(sza_data, chunks=CHUNK_SIZE))
lza_data = xr.DataArray(da.from_array(sza_data, chunks=CHUNK_SIZE))
sza_data = xr.DataArray(da.from_array(sza_data, chunks=CHUNK_SIZE))
moon_illum_fraction = xr.DataArray(da.from_array(moon_illum_fraction, chunks=CHUNK_SIZE))
compositor = NCCZinke(product_name,
prequisites=[dnb_product_name,
sza_product_name,
lza_product_name,
'moon_illumination_fraction'])
hncc_ds = compositor([dnb_data, sza_data, lza_data, moon_illum_fraction])
output_data[:] = hncc_ds.data.compute()
one_swath = self.create_secondary_swath_object(product_name, swath_definition, filename,
dnb_product["data_type"], products_created)
except (RuntimeError, ValueError, KeyError, OSError):
if os.path.isfile(filename):
os.remove(filename)
raise
return one_swath