Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def radial_range(self):
    """Return the radial range of this dataset.

    The range coordinate is derived once from the ``bin_count``,
    ``range_samples`` and ``range_step`` attributes of the wrapped
    dataset and cached in ``self._radial_range``; later calls return the
    cached object unchanged.

    Returns
    -------
    DataArray
        ``float32`` values along ``dim_1``, starting at half a bin width
        and spaced one bin width apart.

    Raises
    ------
    KeyError
        If one of the required dataset attributes is missing.
    """
    if self._radial_range is None:
        ngates = self._obj.attrs['bin_count']
        range_samples = self._obj.attrs['range_samples']
        range_step = self._obj.attrs['range_step']
        # Width of a single range bin.
        bin_range = range_step * range_samples
        first_gate = bin_range / 2.
        range_data = np.arange(first_gate, bin_range * ngates,
                               bin_range,
                               dtype='float32')
        # Build the attributes on a private copy: ``range_attrs`` is shared
        # state defined outside this method, and writing the per-dataset
        # first-gate value into it would leak into every other consumer.
        attrs = dict(range_attrs)
        attrs['meters_to_center_of_first_gate'] = first_gate
        self._radial_range = xr.DataArray(range_data, dims=['dim_1'],
                                          attrs=attrs)
    return self._radial_range
def n_samples(self):
    """Return the dataset's ``pulse`` attribute wrapped as a DataArray.

    The result is cached in ``self._n_samples``. When the wrapped dataset
    has no ``pulse`` attribute nothing is cached and ``None`` is returned.

    Returns
    -------
    DataArray or None
        The ``pulse`` values along ``dim_0``, or ``None`` if unavailable.
    """
    if self._n_samples is None:
        # Guard only the attribute lookup, so that a KeyError raised while
        # constructing the DataArray is not silently discarded.
        try:
            pulse = self._obj.attrs['pulse']
        except KeyError:
            # Best effort: a missing 'pulse' attribute leaves the cache empty.
            pass
        else:
            self._n_samples = xr.DataArray(pulse, dims=['dim_0'])
    return self._n_samples
('platform_type', 'fixed'),
('instrument_type', 'radar'),
('primary_axis', 'axis_z'),
('time_coverage_start', '1970-01-01T00:00:00Z'),
('time_coverage_end', '1970-01-01T00:00:00Z'),
('latitude', np.nan),
('longitude', np.nan),
('altitude', np.nan),
('altitude_agl', np.nan),
('sweep_group_name', (['sweep'], [np.nan])),
('sweep_fixed_angle', (['sweep'], [np.nan])),
('frequency', np.nan),
('status_xml', 'None')])
@xr.register_dataset_accessor('gamic')
class GamicAccessor(object):
"""Dataset Accessor for handling GAMIC HDF5 data files
"""
def __init__(self, xarray_obj):
    """Attach the accessor to ``xarray_obj`` and clear its cached values.

    Parameters
    ----------
    xarray_obj : object
        The object this accessor wraps; kept as ``self._obj``.
    """
    self._obj = xarray_obj
    # Every cache slot starts out as None (presumably filled on first
    # access by the matching property -- confirm against the full class).
    self._radial_range = self._azimuth_range = None
    self._elevation_range = self._time_range = None
    self._sitecoords = self._polcoords = None
    self._projection = self._time = None
@property
def radial_range(self):
ds = xr.decode_cf(ds, decode_times=decode_times,
decode_coords=decode_coords,
mask_and_scale=mask_and_scale)
# determine if same sweep
try:
index = self.sweep_angles.index(fixed_angle)
except ValueError:
nidx = len(self._sweeps) + 1
swp_grp_name = f'sweep_{nidx}'
self._sweeps[swp_grp_name] = ds
self.sweep_names.append(swp_grp_name)
self.sweep_angles.append(fixed_angle)
else:
dictkey = self.sweep_names[index]
self._sweeps[dictkey] = xr.merge([self._sweeps[dictkey], ds])
# coordinates wrap-up
vars = collections.OrderedDict()
coords = collections.OrderedDict()
if 'cf' in standard or georef:
coords['longitude'] = rt_grps['where'].attrs['lon']
coords['latitude'] = rt_grps['where'].attrs['lat']
coords['altitude'] = rt_grps['where'].attrs['height']
if 'cf' in standard or georef:
sweep_mode = _get_odim_sweep_mode(nch, ds_grps)
coords['sweep_mode'] = sweep_mode
if 'cf' in standard or decode_coords or georef:
coords.update(_get_odim_coordinates(nch, ds_grps))
# georeference needs coordinate variables
if georef:
geods = xr.Dataset(vars, coords)
geods = xarray.georeference_dataset(geods)
coords.update(geods.coords)
# time coordinate
if 'cf' in standard or decode_times:
timevals = _get_odim_timevalues(nch, ds_grps)
if decode_times:
coords['time'] = ([dim0], timevals, time_attrs)
else:
coords['time'] = ([dim0], timevals)
# assign global sweep attributes
fixed_angle = _get_odim_fixed_angle(nch, ds_grps)
if 'cf' in standard:
vars.update({'sweep_number': i,
'sweep_mode': sweep_mode,
'sweep_mode': sweep_mode,
'follow_mode': 'none',
'prt_mode': 'fixed',
'fixed_angle': fixed_angle})
if 'cf-full' in standard:
full_vars = _get_odim_full_vars(nch, ds_grps)
vars.update(full_vars)
# assign variables and coordinates
ds = ds.assign(vars)
ds = ds.assign_coords(**coords)
ds = ds.rename_dims({'dim_0': dim0, 'dim_1': 'range'})
# decode dataset if requested
if decode_times or decode_coords or mask_and_scale:
ds = xr.decode_cf(ds, decode_times=decode_times,
decode_coords=decode_coords,
mask_and_scale=mask_and_scale)
# determine if same sweep
try:
index = self.sweep_angles.index(fixed_angle)
except ValueError:
nidx = len(self._sweeps) + 1
swp_grp_name = f'sweep_{nidx}'
self._sweeps[swp_grp_name] = ds
self.sweep_names.append(swp_grp_name)
self.sweep_angles.append(fixed_angle)
else:
dictkey = self.sweep_names[index]
self._sweeps[dictkey] = xr.merge([self._sweeps[dictkey], ds])
Parameters
----------
nch : handle
netcdf4-file handle
grp : str
group to access
Returns
-------
nch : handle
xarray Dataset handle
"""
if grp is not None:
nch = nch.groups.get(grp, False)
if nch:
nch = xr.open_dataset(xr.backends.NetCDF4DataStore(nch), **kwargs)
return nch
self._elevation_range = elevation
return self._elevation_range
@property
def time_range(self):
    """Return the time range of this dataset.

    On first access the ``timestamp`` variable of the wrapped dataset is
    divided by ``1e6``, wrapped in a DataArray carrying ``units`` and
    ``standard_name`` attributes, and cached in ``self._time_range``.
    Subsequent accesses return the cached object.
    """
    if self._time_range is not None:
        return self._time_range

    seconds = self._obj['timestamp'] / 1e6
    self._time_range = xr.DataArray(
        seconds,
        attrs={'units': 'seconds since 1970-01-01T00:00:00Z',
               'standard_name': 'time'})
    return self._time_range
@xr.register_dataset_accessor('odim')
class OdimAccessor(object):
"""Dataset Accessor for handling ODIM_H5 data files
"""
def __init__(self, xarray_obj):
    """Attach the accessor to ``xarray_obj`` and clear its cached values.

    Parameters
    ----------
    xarray_obj : object
        The object this accessor wraps; kept as ``self._obj``.
    """
    self._obj = xarray_obj
    # Every cache slot starts out as None (presumably filled on first
    # access by the matching property -- confirm against the full class).
    self._radial_range = self._azimuth_range = None
    self._elevation_range = None
    self._time_range = self._time_range2 = None
    self._prt = self._n_samples = None
@property
def radial_range(self):
"""Return the radial range of this dataset."""