Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
try:
with pygrib.open(self.filename) as grib_file:
first_msg = grib_file.message(1)
last_msg = grib_file.message(grib_file.messages)
start_time = self._convert_datetime(
first_msg, 'validityDate', 'validityTime')
end_time = self._convert_datetime(
last_msg, 'validityDate', 'validityTime')
self._start_time = start_time
self._end_time = end_time
if 'keys' not in filetype_info:
self._analyze_messages(grib_file)
self._idx = None
else:
self._create_dataset_ids(filetype_info['keys'])
self._idx = pygrib.index(self.filename,
*filetype_info['keys'].keys())
except (RuntimeError, KeyError):
raise IOError("Unknown GRIB file format: {}".format(self.filename))
ts = ts.replace(minute=0)
fn = None
for offset in range(0, 24, 4):
ts2 = ts - datetime.timedelta(hours=offset)
testfn = ts2.strftime(
(
"/mesonet/ARCHIVE/data/%Y/%m/%d/model/ffg/"
"5kmffg_%Y%m%d%H.grib2"
)
)
if os.path.isfile(testfn):
fn = testfn
break
if fn is None:
raise NoDataFound("No valid grib data found!")
grbs = pygrib.index(fn, "stepRange")
grb = grbs.select(stepRange="0-%s" % (hour,))[0]
lats, lons = grb.latlons()
data = (
masked_array(grb.values, data_units=units("mm"))
.to(units("inch"))
.m
)
plot.pcolormesh(lons, lats, data, bins, cmap=cmap)
if ilabel:
plot.drawcounties()
df = pd.DataFrame()
return plot.fig, df
#assign dimensions
times[:] = self.ndate
levels[:] = self.levs
latitudes[:] = self.lats
longitudes[:] = self.lons
#make actual variables
variables = []
for var in self.nams:
# isolate from [u'Geopotential']
variables.append(ncd_root.createVariable(var,'f4',
('time','level',
'lat','lon',)))
#read file, get levels and times
grbindx = pg.index(self.file_grib,'name','level','dataDate',
'dataTime','step')
levs = np.array(self.levs)
for l in self.levs:
for d in self.jday:
nd = nc.date2num(d, units = "seconds since 1970-1-1",
calendar = 'standard')
var_n = 0
for var in self.nams:
#distinguish forecast data to deal with accumulated fields
if var in self.accumulated:
vpre = 0 #initial for subtraction
for s in self.step:
sel = grbindx.select(name = var, level = l,
dataDate = int(d.strftime("%Y%m%d")),
dataTime = d.hour * 100, step = s)
#assign dimensions
times[:] = self.ndate
levels[:] = self.levs
latitudes[:] = self.lats
longitudes[:] = self.lons
#make actual variables
variables = []
for var in self.nams:
# isolate from [u'Geopotential']
variables.append(ncd_root.createVariable(var,'f4',
('time','level',
'lat','lon',)))
#read file, get levels and times
grbindx = pg.index(self.file_grib,'name','level','dataDate',
'dataTime','step')
levs = np.array(self.levs)
for l in self.levs:
for d in self.jday:
nd = nc.date2num(d, units = "seconds since 1970-1-1",
calendar = 'standard')
var_n = 0
for var in self.nams:
#distinguish forecast data to deal with accumulated fields
if var in self.accumulated:
vpre = 0 #initial for subtraction
for s in self.step:
sel = grbindx.select(name = var, level = l,
dataDate = int(d.strftime("%Y%m%d")),
dataTime = d.hour * 100, step = s)
#assign dimensions
times[:] = self.ndate
levels[:] = self.levs
latitudes[:] = self.lats
longitudes[:] = self.lons
#make actual variables
variables = []
for var in self.nams:
# isolate from [u'Geopotential']
variables.append(ncd_root.createVariable(var,'f4',
('time','level',
'lat','lon',)))
#read file, get levels and times
grbindx = pg.index(self.file_grib,'name','level','dataDate',
'dataTime','step')
levs = np.array(self.levs)
for l in self.levs:
for d in self.jday:
nd = nc.date2num(d, units = "seconds since 1970-1-1",
calendar = 'standard')
var_n = 0
for var in self.nams:
#distinguish forecast data to deal with accumulated fields
if var in self.accumulated:
vpre = 0 #initial for subtraction
for s in self.step:
sel = grbindx.select(name = var, level = l,
dataDate = int(d.strftime("%Y%m%d")),
dataTime = d.hour * 100, step = s)