Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_features(self, service):
    """Return a GeoDataFrame of raster tiles available for *service*.

    Queries ulmo's NED raster-availability endpoint over the whole globe
    and normalizes the resulting columns for downstream consumers.
    """
    layer_code = self._layers()[service]
    features = util.to_geodataframe(
        ned.get_raster_availability(layer_code, (-180, -90, 180, 90))
    )
    # Nothing available for this layer: hand the empty frame back unchanged.
    if features.empty:
        return features

    features['parameters'] = 'elevation'
    features['file_format'] = 'raster-gdal'
    # Last path segment of the download URL is the local filename.
    features['filename'] = features['download url'].apply(
        lambda url: url.rsplit('/', 1)[-1])

    def _reserved(url):
        # Per-row metadata needed later to fetch and unpack the tile.
        return {'download_url': url,
                'file_format': 'raster-gdal',
                'extract_from_zip': '.DEM'}

    features['reserved'] = features['download url'].apply(_reserved)
    return features.rename(columns={
        'name': 'display_name',
        'download url': 'download_url',
        'format': 'extract_from_zip',
    })
def search_catalog(self, **kwargs):
    """Query global NED raster availability and return a catalog GeoDataFrame.

    *kwargs* are accepted for interface compatibility but ignored.
    """
    catalog = util.to_geodataframe(
        ned.get_raster_availability(self._description, (-180, -90, 180, 90))
    )
    # Nothing available: return the empty frame as-is.
    if catalog.empty:
        return catalog

    catalog['parameters'] = 'elevation'
    # Last path segment of the download URL is the local filename.
    catalog['filename'] = catalog['download url'].apply(
        lambda url: url.rsplit('/', 1)[-1])

    def _reserved(row):
        # Per-row metadata needed later to fetch and unpack the tile.
        return {'download_url': row['download url'],
                'filename': row['filename'],
                'file_format': 'raster-gdal',
                'extract_from_zip': '.img',
                }

    catalog['reserved'] = catalog.apply(_reserved, axis=1)
    # These columns are now folded into 'reserved'; drop the originals.
    catalog.drop(labels=['filename', 'download url', 'format'], axis=1, inplace=True)
    return catalog.rename(columns={'name': 'display_name'})
# NOTE(review): fragment — the enclosing function header is not visible in
# this chunk, and the block is truncated after `if df.empty:`. `p` presumably
# carries the download options (parameter/start/end/period) — verify at the
# definition site.
parameter = p.parameter
start = p.start
end = p.end
period = p.period
if dataset is None:
    dataset = 'station-' + catalog_id
# An explicit start/end range takes precedence over a relative period.
if start and end:
    period = None
# Inverted map: human-readable parameter name -> 'code[:statistic]'.
pmap = self.parameter_map(invert=True)
# Pad with None so parameters without a statistic part unpack cleanly.
parameter_code, statistic_code = (pmap[parameter].split(':') + [None])[:2]
data = nwis.get_site_data(catalog_id,
                          parameter_code=parameter_code,
                          statistic_code=statistic_code,
                          start=start, end=end, period=period,
                          service=self.service_name)
# dict contains only one key since only one parameter/statistic was
# downloaded, this would need to be changed if multiple
# parameter/stat were downloaded together
if not data:
    raise ValueError('No Data Available')
data = list(data.values())[0]
# convert to dataframe and cleanup bad data
df = pd.DataFrame(data['values'])
if df.empty:
def data(self):
    """Fetch GHCN-daily values for this catalog entry as a DataFrame.

    Raises ValueError when the station or the requested date window has
    no observations for the configured parameter code.
    """
    raw = ghcn_daily.get_data(self.catalog_entry,
                              elements=self.parameter_code,
                              as_dataframe=True)  # [parameter_code]
    if not raw or raw[self.parameter_code].empty:
        raise ValueError('No Data Available')
    # Restrict to the configured date window.
    frame = raw[self.parameter_code][self.start_string:self.end_string]
    if frame.empty:
        raise ValueError('No Data Available')
    frame.rename(columns={'value': self.parameter}, inplace=True)
    return frame
def _climo_wind(config, dates=None):
    """
    Fetch climatological wind data using the ulmo package's NCDC archives.

    :param config: configuration dict; must contain 'verbose'
    :param dates: list of datetime objects; defaults to every date in the
        station record
    :return: dict: dictionary mapping each date to its wind value
    """
    import ulmo

    if config['verbose']:
        print('_climo_wind: fetching data from NCDC (may take a while)...')
    element = 'WSF2'
    station_frame = ulmo.ncdc.ghcn_daily.get_data(
        get_ghcn_stid(config), as_dataframe=True, elements=[element])[element]
    if dates is None:
        dates = list(station_frame.index.to_timestamp().to_pydatetime())
    # Scale the stored value by /10 and 1.94384 — presumably tenths of m/s
    # converted to knots; confirm against the GHCN element definition.
    return {date: {'wind': station_frame.loc[date]['value'] / 10. * 1.94384}
            for date in dates}
def _download_file(self, path, url, tile_fmt, filename, check_modified=False):
    """Download *url* into *path*, extracting from a zip archive when needed.

    :param tile_fmt: suffix to extract from the downloaded zip; '' means the
        URL points directly at the tile file itself.
    :param check_modified: re-download only if the remote file changed.
    :return: local path of the (possibly extracted) tile
    """
    zip_dir = os.path.join(path, 'zip')
    os.makedirs(path, exist_ok=True)
    os.makedirs(zip_dir, exist_ok=True)
    tile_path = os.path.join(path, filename)
    util.logger.info('... downloading %s' % url)

    if tile_fmt == '':
        # Direct download — no archive to unpack.
        ulmo.util.download_if_new(url, tile_path, check_modified=check_modified)
        return tile_path

    zip_path = os.path.join(zip_dir, filename)
    ulmo.util.download_if_new(url, zip_path, check_modified=check_modified)
    util.logger.info('... ... zipfile saved at %s' % zip_path)
    return ulmo.util.extract_from_zip(zip_path, tile_path, tile_fmt)
def _download_file(self, path, url, tile_fmt, filename, check_modified=False):
    """Fetch *url* into *path*; unzip and extract the tile when *tile_fmt* is set.

    :param tile_fmt: suffix to extract from the downloaded zip; an empty
        string means the URL is the tile file itself.
    :param check_modified: skip the download unless the remote file changed.
    :return: local path of the resulting tile file
    """
    for directory in (path, os.path.join(path, 'zip')):
        os.makedirs(directory, exist_ok=True)
    tile_path = os.path.join(path, filename)
    util.logger.info('... downloading %s' % url)

    if tile_fmt != '':
        # Archive download: save the zip, then pull the tile out of it.
        zip_path = os.path.join(path, 'zip', filename)
        ulmo.util.download_if_new(url, zip_path, check_modified=check_modified)
        util.logger.info('... ... zipfile saved at %s' % zip_path)
        tile_path = ulmo.util.extract_from_zip(zip_path, tile_path, tile_fmt)
    else:
        ulmo.util.download_if_new(url, tile_path, check_modified=check_modified)
    return tile_path
def _search_catalog(self, **kwargs):
    """Return all GHCN-daily stations as a DataFrame.

    *kwargs* are accepted for interface compatibility but ignored.
    """
    return ghcn_daily.get_stations(as_dataframe=True)
# NOTE(review): fragment — the enclosing method header is not visible in this
# chunk (it reads self.cache_file, self.metadata, and self.product_key).
if locations is not None:
    try:
        with open(self.cache_file) as f:
            metadata = json.load(f)
    # NOTE(review): bare except silently falls back to a fresh fetch on ANY
    # failure (including KeyboardInterrupt); narrowing the exception types
    # would be safer.
    except:
        metadata = self.get_locations()
    # Keep only the cached features whose id was explicitly requested.
    selected = [feature for feature in metadata['features'] if feature['id'] in locations]
    return FeatureCollection(selected)
if bounding_box is None:
    # Fall back to the product's first advertised bounding box.
    bounding_box = self.metadata['bounding_boxes'][0]
bbox = [float(p) for p in bounding_box]
locations = eros.get_raster_availability(self.product_key, bbox)
if os.path.exists(self.cache_file):
    # NOTE(review): open() without a context manager leaks the file handle
    # until garbage collection.
    existing = json.load(open(self.cache_file))
    locations = util.append_features(existing, locations)
with open(self.cache_file, 'w') as f:
    # NOTE(review): bare `dump` — presumably `from json import dump` at the
    # top of the file; verify, since the read path above uses json.load.
    dump(locations, f)
return locations
def _nwis_parameters(site, service):
    """Map a USGS *site* id to the parameter codes it reports for *service*."""
    site_data = nwis.get_site_data(site, service=service)
    return {site: list(site_data.keys())}