Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_membership_functions(filename):
    """Load membership function parameters from a gzipped wradlib-data file.

    The first two lines of the file are each parsed as ``<label>: <int>``
    and give the number of classes and the number of observables, in that
    order. The rest of the file is read as a numeric table (skipping ten
    further header lines) and folded into a 4-D array.

    Parameters
    ----------
    filename : filename
        Filename of wradlib-data file

    Returns
    -------
    msf : :class:`numpy:numpy.ndarray`
        Array of membership functions with shape (hm-classes, observables,
        indep-ranges, 5)
    """
    gzip = util.import_optional('gzip')

    def _header_count(raw_line):
        # header lines look like b"<label>: <integer>"
        return int(raw_line.decode().split(':')[1].strip())

    with gzip.open(filename, 'rb') as fobj:
        nclass = _header_count(fobj.readline())
        nobs = _header_count(fobj.readline())
        # UserWarnings emitted while parsing the table are silenced
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', category=UserWarning)
            table = np.genfromtxt(fobj, skip_header=10, autostrip=True,
                                  invalid_raise=False)

    # first fold: one slab of rows per observable
    rows_per_obs = int(table.shape[0] / nobs)
    per_obs = table.reshape((nobs, rows_per_obs, table.shape[1]))

    # second fold: split every observable slab into one chunk per class
    rows_per_class = int(per_obs.shape[1] / nclass)
    per_obs_class = per_obs.reshape((per_obs.shape[0], nclass,
                                     rows_per_class, per_obs.shape[2]))

    # put the class axis in front of the observable axis
    return per_obs_class.swapaxes(0, 1)
The algorithm is based on the paper of :cite:`Wang2009`.
Parameters
----------
phidp : :class:`numpy:numpy.ndarray`
array of shape (...,nr) with nr being the number of range bins
rho : :class:`numpy:numpy.ndarray`
array of same shape as ``phidp``
width : int
Width of the analysis window
copy : bool
Leaves original ``phidp`` array unchanged if set to True
(default: False)
"""
# Check whether fast Fortran implementation is available
speedup = util.import_optional("wradlib.speedup")
shape = phidp.shape
assert rho.shape == shape, "rho and phidp must have the same shape."
phidp = phidp.reshape((-1, shape[-1]))
if copy:
phidp = phidp.copy()
rho = rho.reshape((-1, shape[-1]))
gradphi = util.gradient_from_smoothed(phidp)
beams, rs = phidp.shape
# Compute the standard deviation within windows of 9 range bins
stdarr = np.zeros(phidp.shape, dtype=np.float32)
for r in range(rs - 9):
stdarr[..., r] = np.std(phidp[..., r:r + 9], -1)
def get_radolan_filehandle(fname):
    """Opens radolan file and returns file handle

    The file is first opened as gzip and probed by reading a single byte.
    If that raises :class:`IOError` the gzip handle is closed and the file
    is reopened as a plain binary file and probed the same way. The
    returned handle is rewound to the start.

    Parameters
    ----------
    fname : string
        filename

    Returns
    -------
    f : object
        filehandle, opened in binary read mode and positioned at offset 0

    Raises
    ------
    IOError
        If the file can be read neither as gzip nor as a plain file.
    """
    # gzip is part of the standard library, a plain import is sufficient
    import gzip

    f = None
    try:
        f = gzip.open(fname, 'rb')
        # reading one byte forces the gzip header check
        f.read(1)
    except IOError:
        # not gzip (or not openable): release the gzip handle explicitly
        # instead of leaving it to the garbage collector
        if f is not None:
            f.close()
        f = open(fname, 'rb')
        try:
            f.read(1)
        except Exception:
            # do not leak the plain handle if it is unreadable either
            f.close()
            raise

    # rewind file
    f.seek(0, 0)
    return f
def get_rb_blob_data(datastring, blobid):
""" Read BLOB data from datastring and return it
Parameters
----------
datastring : string
Blob Description String
blobid : int
Number of requested blob
Returns
-------
data : string
Content of blob
"""
xmltodict = util.import_optional('xmltodict')
start = 0
search_string = '', start)
xmlstring = datastring[start:end + 1]
# cheat the xml parser by making xml well-known
xmldict = xmltodict.parse(xmlstring.decode() + '')
cmpr = get_rb_blob_attribute(xmldict, 'compression')
size = int(get_rb_blob_attribute(xmldict, 'size'))
data = datastring[end + 2:end + 2 + size] # read blob data to string
# decompress if necessary
Parameters
----------
datastring : string
Blob Description String
blobid : int
Number of requested blob
Returns
-------
data : string
Content of blob
"""
xmltodict = util.import_optional('xmltodict')
start = 0
searchString = '', start)
xmlstring = datastring[start:end + 1]
# cheat the xml parser by making xml well-known
xmldict = xmltodict.parse(xmlstring.decode() + '')
cmpr = get_RB_blob_attribute(xmldict, 'compression')
size = int(get_RB_blob_attribute(xmldict, 'size'))
data = datastring[end + 2:end + 2 + size] # read blob data to string
# decompress if necessary
def decompress(data):
    """Inflate a zlib-compressed data string.

    Parameters
    ----------
    data : string
        (from xml) data string containing compressed data.

    Returns
    -------
    inflated : string
        result of ``zlib.decompress`` applied to ``data``
    """
    zlib_mod = util.import_optional('zlib')
    inflated = zlib_mod.decompress(data)
    return inflated
def decompress(data):
    """Inflate a zlib-compressed data string.

    Parameters
    ----------
    data : string
        (from xml) data string containing compressed data.

    Returns
    -------
    inflated : string
        result of ``zlib.decompress`` applied to ``data``
    """
    zlib_mod = util.import_optional('zlib')
    inflated = zlib_mod.decompress(data)
    return inflated
proj : osr spatial reference object
GDAL OSR Spatial Reference Object describing projection
Returns
-------
output : :class:`numpy:numpy.ndarray`
(num volume bins, 3)
Examples
--------
See :ref:`/notebooks/workflow/recipe2.ipynb`.
"""
# make sure that elevs is an array
elevs = np.array([elevs]).ravel()
# create polar grid
el, az, r = util.meshgrid_n(elevs, azimuths, ranges)
# get projected coordinates
coords = georef.spherical_to_proj(r, az, el, sitecoords, proj=proj)
coords = coords.reshape(-1, 3)
return coords
python datetime.object.
Parameters
----------
name : string
representing a DWD product name
tz : timezone object
(see pytz package or datetime module for explanation)
in case the timezone of the data is not UTC
opt : currently unused
Returns
-------
time : timezone-aware datetime.datetime object
"""
return _getTimestampFromFilename(name).replace(tzinfo=util.UTC())
def _get_valid_pairs(self, obs, raw):
    """INTERNAL: Helper method to identify valid obs-raw pairs

    Validates the input shapes, samples ``raw`` at the observation
    locations and determines the indices at which both the observations
    and the sampled raw values pass ``util._idvalid`` with
    ``minval=self.minval``.

    Returns
    -------
    rawatobs
        raw (radar) values at the gage locations, as returned by
        ``self.get_raw_at_obs``
    ix
        indices that are valid in both ``obs`` and ``rawatobs``
    """
    # input shapes have to be consistent before anything else happens
    self._check_shape(obs, raw)

    # radar values sampled at the gage locations
    rawatobs = self.get_raw_at_obs(raw, obs)

    # valid indices of each series, then keep only the common ones
    valid_obs = util._idvalid(obs, minval=self.minval)
    valid_raw = util._idvalid(rawatobs, minval=self.minval)
    common = np.intersect1d(valid_obs, valid_raw)

    return rawatobs, common