Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
dataset.attrs['platform_name'] = self.platform_name
dataset.attrs['sensor'] = self.sensor
dataset.attrs.update(key.to_dict())
return dataset
def getbitmask(self, wqsf, items=None):
    """Build a combined mask from the quality flags stored in *wqsf*.

    Args:
        wqsf: Flag data; it is wrapped in ``BitFlags`` so that individual
            flags can be looked up by name.
        items: Names of the flags to combine. When ``None``, the default
            list of flag names below is used.

    Returns:
        The element-wise logical OR of the masks of all requested flags.
    """
    default_items = ["INVALID", "SNOW_ICE", "INLAND_WATER", "SUSPECT",
                     "AC_FAIL", "CLOUD", "HISOLZEN", "OCNN_FAIL",
                     "CLOUD_MARGIN", "CLOUD_AMBIGUOUS", "LOWRW", "LAND"]
    flag_names = default_items if items is None else items
    flags = BitFlags(wqsf)
    masks = [flags[name] for name in flag_names]
    return reduce(np.logical_or, masks)
class NCOLCIAngles(BaseFileHandler):
    """Handle the files holding the OLCI angles."""

    # Maps each angle dataset name to a short code (presumably the name of
    # the corresponding variable in the file -- confirm against the reader).
    datasets = {
        'satellite_azimuth_angle': 'OAA',
        'satellite_zenith_angle': 'OZA',
        'solar_azimuth_angle': 'SAA',
        'solar_zenith_angle': 'SZA',
    }

    def __init__(self, filename, filename_info, filetype_info,
                 engine=None):
        """Set up the handler.

        The dataset handle ``nc`` starts out as ``None``. ``engine`` is
        accepted but not referenced by the statements in this initializer.
        """
        super(NCOLCIAngles, self).__init__(
            filename, filename_info, filetype_info)
        self.nc = None
        # TODO: get metadata from the manifest file (xfdumanifest.xml)
        mission_id = filename_info['mission_id']
        self.platform_name = PLATFORM_NAMES[mission_id]
        self.sensor = 'olci'
def __getitem__(self, item):
    """Return the boolean mask of the flag named *item*.

    The bit position of *item* is looked up in ``self.meaning`` and that
    bit is extracted from ``self._value``.

    Returns:
        A boolean array. When ``self._value`` is an ``xr.DataArray`` the
        result is wrapped in a ``DataArray`` carrying the same coords,
        attrs and dims; otherwise the plain array is returned.
    """
    pos = self.meaning[item]
    data = self._value
    is_dataarray = isinstance(data, xr.DataArray)
    if is_dataarray:
        # Work on the underlying array, re-wrap the result afterwards.
        data = data.data
    # The builtin ``bool`` is used instead of ``np.bool``: the latter was
    # only an alias of the builtin and no longer exists in recent NumPy.
    res = ((data >> pos) % 2).astype(bool)
    if is_dataarray:
        res = xr.DataArray(res, coords=self._value.coords,
                           attrs=self._value.attrs,
                           dims=self._value.dims)
    return res
class NCOLCIBase(BaseFileHandler):
    """Base file handler of the OLCI reader."""

    def __init__(self, filename, filename_info, filetype_info,
                 engine=None):
        """Open the file with xarray and rename its dimensions to x/y.

        ``engine`` is passed on to ``xr.open_dataset``.
        """
        super(NCOLCIBase, self).__init__(
            filename, filename_info, filetype_info)
        chunking = {'columns': CHUNK_SIZE, 'rows': CHUNK_SIZE}
        dataset = xr.open_dataset(self.filename,
                                  decode_cf=True,
                                  mask_and_scale=True,
                                  engine=engine,
                                  chunks=chunking)
        # 'columns'/'rows' become 'x'/'y' on the stored dataset.
        self.nc = dataset.rename({'columns': 'x', 'rows': 'y'})
import os
import numpy as np
import xarray as xr
from pyresample import geometry
from satpy.readers.file_handlers import BaseFileHandler
from satpy import CHUNK_SIZE
# NetCDF can't be read from several threads at once. Work around that by
# opening the data as one whole chunk and only splitting it up right before
# any calculations are done. The chunk size can be overridden through the
# PYTROLL_LOAD_CHUNK_SIZE environment variable; it defaults to -1.
LOAD_CHUNK_SIZE = int(os.environ.get('PYTROLL_LOAD_CHUNK_SIZE', -1))
logger = logging.getLogger(__name__)
class SCMIFileHandler(BaseFileHandler):
    """File handler for one SCMI NetCDF4 file."""

    def __init__(self, filename, filename_info, filetype_info):
        """Open the file with xarray and read the basic file properties."""
        super(SCMIFileHandler, self).__init__(
            filename, filename_info, filetype_info)
        # No engine is given, so xarray's default netcdf4 engine is used.
        load_chunks = {'x': LOAD_CHUNK_SIZE, 'y': LOAD_CHUNK_SIZE}
        self.nc = xr.open_dataset(self.filename,
                                  decode_cf=True,
                                  mask_and_scale=False,
                                  chunks=load_chunks)
        self.platform_name = self.nc.attrs['satellite_id']
        self.sensor = self._get_sensor()
        # Image size, taken from the dataset dimensions.
        dims = self.nc.dims
        self.nlines = dims['y']
        self.ncols = dims['x']
        self.coords = {}
return int(r.text)
except ValueError:
try:
return float(r.text)
except ValueError:
return r.text
for x in r.findall("./*"):
if x.tag in d and not isinstance(d[x.tag], list):
d[x.tag] = [d[x.tag]]
d[x.tag].append(dictify(x, False))
else:
d[x.tag] = dictify(x, False)
return d
class SAFEXML(BaseFileHandler):
"""XML file reader for the SAFE format."""
def __init__(self, filename, filename_info, filetype_info,
             header_file=None):
    """Parse the XML file and keep the info taken from the file name.

    Args:
        filename: Path of the XML file; it is parsed with ``ET.parse``.
        filename_info: Must provide ``start_time``, ``end_time`` and
            ``polarization``.
        filetype_info: Passed on to the base class.
        header_file: Optional object whose ``get_metadata()`` result is
            stored as ``hdr``. Without it ``hdr`` is an empty dict.
    """
    super(SAFEXML, self).__init__(filename, filename_info, filetype_info)

    self._start_time = filename_info['start_time']
    self._end_time = filename_info['end_time']
    self._polarization = filename_info['polarization']

    self.root = ET.parse(self.filename)
    self.hdr = {} if header_file is None else header_file.get_metadata()
def get_metadata(self):
"""Convert the xml metadata to dict."""
def __init__(self, filename, filename_info, filetype_info):
    """Open the file with ``SD`` and collect its metadata.

    The ``CoreMetadata.0``, ``StructMetadata.0`` and ``ArchiveMetadata.0``
    file attributes are each run through ``read_mda`` and merged, in that
    order, into ``self.metadata``.

    Raises:
        ValueError: If ``SD`` fails to open the file with an ``HDF4Error``.
    """
    BaseFileHandler.__init__(self, filename, filename_info, filetype_info)
    try:
        self.sd = SD(self.filename)
    except HDF4Error as err:
        error_message = "Could not load data from file {}: {}".format(self.filename, err)
        raise ValueError(error_message)

    # Read metadata. The file attributes are fetched once and reused for
    # all three metadata sections instead of being queried per section.
    file_attrs = self.sd.attributes()
    self.metadata = self.read_mda(file_attrs['CoreMetadata.0'])
    for section in ('StructMetadata.0', 'ArchiveMetadata.0'):
        self.metadata.update(self.read_mda(file_attrs[section]))
import pyproj
from pyresample import geometry
from pyspectral.blackbody import blackbody_wn_rad2temp as rad2temp
from satpy.readers.file_handlers import BaseFileHandler
from satpy import CHUNK_SIZE
logger = logging.getLogger(__name__)
# Full platform names, keyed by the short satellite name that AMIL1bNetCDF
# reads from the file's ``satellite_name`` attribute.
PLATFORM_NAMES = {
'GK-2A': 'GEO-KOMPSAT-2A',
'GK-2B': 'GEO-KOMPSAT-2B',
}
class AMIL1bNetCDF(BaseFileHandler):
    """Base file handler for AMI L1B NetCDF4 data."""

    def __init__(self, filename, filename_info, filetype_info,
                 calib_mode='PYSPECTRAL', allow_conditional_pixels=False):
        """Open the NetCDF file with xarray and set the basic attributes.

        The image dimensions are renamed to ``x``/``y`` and the platform
        name is looked up from the file's ``satellite_name`` attribute
        (``None`` when the short name is not in ``PLATFORM_NAMES``).
        """
        super(AMIL1bNetCDF, self).__init__(filename, filename_info, filetype_info)
        chunking = {'dim_image_x': CHUNK_SIZE, 'dim_image_y': CHUNK_SIZE}
        dataset = xr.open_dataset(self.filename,
                                  decode_cf=True,
                                  mask_and_scale=False,
                                  chunks=chunking)
        self.nc = dataset.rename({'dim_image_x': 'x', 'dim_image_y': 'y'})

        short_name = self.nc.attrs['satellite_name']
        self.platform_name = PLATFORM_NAMES.get(short_name)
        self.sensor = 'ami'
        self.allow_conditional_pixels = allow_conditional_pixels
"""
import logging
import h5py
import numpy as np
from xarray import DataArray
import dask.array as da
from satpy.readers.file_handlers import BaseFileHandler
from satpy import CHUNK_SIZE
logger = logging.getLogger(__name__)
class MAIAFileHandler(BaseFileHandler):
    """File handler for MAIA files."""

    def __init__(self, filename, filename_info, filetype_info):
        """Fix up the end time from the file name info and read the file.

        ``filename_info['end_time']`` is given the date of
        ``filename_info['start_time']``. If that puts it before the start
        time, it is moved one day forward. ``filename_info`` is updated in
        place (``self.finfo`` refers to the same dict).
        """
        # Imported here so this block does not rely on module-level imports.
        from datetime import timedelta

        super(MAIAFileHandler, self).__init__(
            filename, filename_info, filetype_info)
        self.finfo = filename_info
        start_time = self.finfo['start_time']
        # set the day date part for end_time from the file name
        end_time = self.finfo['end_time'].replace(
            year=start_time.year,
            month=start_time.month,
            day=start_time.day)
        if end_time < start_time:
            # The end time falls on the following day. Adding a timedelta
            # rolls over month and year boundaries, whereas
            # ``replace(day=day + 1)`` raises ValueError on the last day
            # of a month.
            end_time = end_time + timedelta(days=1)
        self.finfo['end_time'] = end_time
        self.selected = None
        self.read(self.filename)
from pyresample import geometry
from datetime import datetime
from satpy import DatasetID, CHUNK_SIZE
from satpy.readers.file_handlers import BaseFileHandler
import pygrib
LOG = logging.getLogger(__name__)
# Unit-string translation table ('none' -> '1'). Presumably maps unit names
# found in the input files to their CF equivalents -- confirm against the
# code that reads this dict.
CF_UNITS = {
'none': '1',
}
class GRIBFileHandler(BaseFileHandler):
def __init__(self, filename, filename_info, filetype_info):
super(GRIBFileHandler, self).__init__(filename, filename_info, filetype_info)
self._msg_datasets = {}
self._start_time = None
self._end_time = None
try:
with pygrib.open(self.filename) as grib_file:
first_msg = grib_file.message(1)
last_msg = grib_file.message(grib_file.messages)
start_time = self._convert_datetime(
first_msg, 'validityDate', 'validityTime')
end_time = self._convert_datetime(
last_msg, 'validityDate', 'validityTime')
self._start_time = start_time
from pyresample import geometry
from datetime import datetime, timedelta
from satpy import CHUNK_SIZE
from satpy.readers.file_handlers import BaseFileHandler
import pygrib
LOG = logging.getLogger(__name__)
# Unit-string translation table ('none' -> '1'). Presumably maps unit names
# found in the input files to their CF equivalents -- confirm against the
# code that reads this dict.
CF_UNITS = {
'none': '1',
}
class HSAFFileHandler(BaseFileHandler):
def __init__(self, filename, filename_info, filetype_info):
super(HSAFFileHandler, self).__init__(filename,
filename_info,
filetype_info)
self._msg_datasets = {}
self._start_time = None
self._end_time = None
try:
with pygrib.open(self.filename) as grib_file:
first_msg = grib_file.message(1)
analysis_time = self._get_datetime(first_msg)
self._analysis_time = analysis_time
self.metadata = self.get_metadata(first_msg)
(7307, 13852): NORTH_HEMIS_EAST,
(2267, 13852): SOUTH_HEMIS_EAST,
(5419, 13244): NORTH_HEMIS_WEST,
(4251, 11044): SOUTH_HEMIS_WEST
} # (nlines, ncols)
# Duration of one scan, as a timedelta, for each scanning sector constant.
SCAN_DURATION = {
FULL_DISC: timedelta(minutes=26),
NORTH_HEMIS_WEST: timedelta(minutes=10, seconds=5),
SOUTH_HEMIS_WEST: timedelta(minutes=6, seconds=54),
NORTH_HEMIS_EAST: timedelta(minutes=14, seconds=15),
SOUTH_HEMIS_EAST: timedelta(minutes=4, seconds=49)
} # Source: [SCHED-W], [SCHED-E]
class GOESNCBaseFileHandler(BaseFileHandler):
"""File handler for GOES Imager data in netCDF format"""
def __init__(self, filename, filename_info, filetype_info, geo_data=None):
"""Initialize the reader."""
super(GOESNCBaseFileHandler, self).__init__(filename, filename_info,
filetype_info)
self.nc = xr.open_dataset(self.filename,
decode_cf=True,
mask_and_scale=False,
chunks={'xc': CHUNK_SIZE, 'yc': CHUNK_SIZE})
self.sensor = 'goes_imager'
self.nlines = self.nc.dims['yc']
self.ncols = self.nc.dims['xc']
self.platform_name = self._get_platform_name(
self.nc.attrs['Satellite Sensor'])
self.platform_shortname = self.platform_name.replace('-', '').lower()
self.gvar_channel = int(self.nc['bands'].values)