"""Base format classes."""
# InputData
tp = TilePyramid("geodetic")
tmp = base.InputData(dict(pyramid=tp, pixelbuffer=0))
assert tmp.pyramid
assert tmp.pixelbuffer == 0
assert tmp.crs
with pytest.raises(NotImplementedError):
tmp.open(None)
with pytest.raises(NotImplementedError):
tmp.bbox()
with pytest.raises(NotImplementedError):
tmp.exists()
# InputTile
tmp = base.InputTile(None)
with pytest.raises(NotImplementedError):
tmp.read()
with pytest.raises(NotImplementedError):
tmp.is_empty()
# OutputDataWriter
tmp = base.OutputDataWriter(dict(pixelbuffer=0, grid="geodetic", metatiling=1))
assert tmp.pyramid
assert tmp.pixelbuffer == 0
assert tmp.crs
with pytest.raises(NotImplementedError):
tmp.read(None)
with pytest.raises(NotImplementedError):
tmp.write(None, None)
with pytest.raises(NotImplementedError):
tmp.is_valid_with_config(None)
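# A minimal sketch of a concrete input driver, assuming the base classes above
# are meant to be subclassed and that only open(), bbox() and exists() need to
# be overridden; the class name and return values are illustrative, not part of
# the library.
from shapely.geometry import box

class DummyInputData(base.InputData):
    def open(self, tile, **kwargs):
        return base.InputTile(tile)

    def bbox(self, out_crs=None):
        return box(-180, -90, 180, 90)

    def exists(self):
        return True

dummy = DummyInputData(dict(pyramid=tp, pixelbuffer=0))
assert dummy.exists()
assert dummy.bbox().bounds == (-180.0, -90.0, 180.0, 90.0)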
def _prepare_array_for_png(self, data):
# NOTE: snippet starts mid-function; the single-band branch is reconstructed
# here as an assumption (grey bands plus alpha derived from the nodata mask)
if len(data) == 1:
alpha = np.where(
data[0].data == self.output_params["nodata"], 0, 255
).astype("uint8", copy=False)
rgba = np.stack((data[0], data[0], data[0], alpha))
elif len(data) == 2:
rgba = np.stack((data[0], data[0], data[0], data[1]))
elif len(data) == 3:
rgba = np.stack((
data[0], data[1], data[2], np.where(
data[0].data == self.output_params["nodata"], 0, 255
).astype("uint8", copy=False)
))
elif len(data) == 4:
rgba = np.array(data).astype("uint8", copy=False)
else:
raise TypeError("invalid number of bands: %s" % len(data))
return rgba
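# Standalone illustration of the band-count handling above, using plain NumPy
# and a hypothetical nodata value of 0 (the driver reads it from
# self.output_params["nodata"]).
import numpy as np

nodata = 0
band = np.array([[0, 128], [255, 64]], dtype="uint8")

# one band: repeat it for R, G and B and derive alpha from the nodata mask
alpha = np.where(band == nodata, 0, 255).astype("uint8", copy=False)
rgba = np.stack((band, band, band, alpha))
print(rgba.shape)  # (4, 2, 2): bands first
print(rgba[3])     # alpha is 0 where the band equals nodata, 255 elsewhere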
class OutputDataWriter(base.OutputDataWriter, OutputDataReader):
METADATA = METADATA
def write(self, process_tile, data):
"""
Write data from one or more process tiles.
Parameters
----------
process_tile : ``BufferedTile``
must be member of process ``TilePyramid``
"""
rgba = self._prepare_array_for_png(data)
data = ma.masked_where(rgba == self.output_params["nodata"], rgba)
if data.mask.all():  # pragma: no cover
logger.debug("data empty, nothing to write")
return
# tail of a separate, truncated bbox() snippet; the condition below is a
# plausible reconstruction (reproject only if input and output CRSes differ)
if inp_crs != out_crs:
return reproject_geometry(out_bbox, src_crs=inp_crs, dst_crs=out_crs)
else:
return out_bbox
def exists(self):
"""
Check if data or file even exists.
Returns
-------
file exists : bool
"""
return os.path.isfile(self.path) # pragma: no cover
class InputTile(base.InputTile):
"""
Target Tile representation of input data.
Parameters
----------
tile : ``Tile``
kwargs : keyword arguments
driver specific parameters
Attributes
----------
tile : ``Tile``
raster_file : ``InputData``
parent InputData object
resampling : string
resampling method passed on to rasterio
"""
# from a separate snippet: list existing (tile, path) pairs within the given bounds and zoom
return [
(_tile, _path)
for _tile, _path in [
(
t,
"%s.%s" % (
os.path.join(*([basepath, str(t.zoom), str(t.row), str(t.col)])), ext
)
)
for t in pyramid.tiles_from_bounds(bounds, zoom)
]
if path_exists(_path)
]
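# A small sketch of the path layout implied above
# (<basepath>/<zoom>/<row>/<col>.<ext>), using tilematrix directly; the helper
# name, the example bounds (left, bottom, right, top) and the "output"
# basepath are made up for illustration.
import os
from tilematrix import TilePyramid

def candidate_tile_paths(basepath, bounds, zoom, ext="tif"):
    pyramid = TilePyramid("geodetic")
    return [
        "%s.%s" % (os.path.join(basepath, str(t.zoom), str(t.row), str(t.col)), ext)
        for t in pyramid.tiles_from_bounds(bounds, zoom)
    ]

print(candidate_tile_paths("output", (0.0, 40.0, 10.0, 50.0), 3))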
class InputTile(base.InputTile):
"""
Target Tile representation of input data.
Parameters
----------
tile : ``Tile``
kwargs : keyword arguments
driver specific parameters
Attributes
----------
tile : ``Tile``
"""
def __init__(self, tile, **kwargs):
"""Initialize."""
# tail of a separate bbox() snippet; signature and docstring head reconstructed
def bbox(self, out_crs=None):
"""
Return data bounding box.
out_crs : ``rasterio.crs.CRS``
rasterio CRS object (default: CRS of process pyramid)
Returns
-------
bounding box : geometry
Shapely geometry object
"""
out_crs = self.pyramid.crs if out_crs is None else out_crs
with fiona.open(self.path) as inp:
inp_crs = CRS(inp.crs)
bbox = box(*inp.bounds)
# TODO find a way to get a good segmentize value in bbox source CRS
return reproject_geometry(bbox, src_crs=inp_crs, dst_crs=out_crs)
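# Quick illustration of the reprojection helper used above; the import path
# mapchete.io.vector is assumed and the coordinates are arbitrary.
from shapely.geometry import box
from rasterio.crs import CRS
from mapchete.io.vector import reproject_geometry

bbox_wgs84 = box(8.0, 47.0, 9.0, 48.0)
bbox_mercator = reproject_geometry(
    bbox_wgs84, src_crs=CRS.from_epsg(4326), dst_crs=CRS.from_epsg(3857)
)
print(bbox_mercator.bounds)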
class InputTile(base.InputTile):
"""
Target Tile representation of input data.
Parameters
----------
tile : ``Tile``
kwargs : keyword arguments
driver specific parameters
Attributes
----------
tile : ``Tile``
vector_file : string
path to input vector file
"""
"""
return list(data), "application/json"
def open(self, tile, process):
"""
Open process output as input for other process.
Parameters
----------
tile : ``Tile``
process : ``MapcheteProcess``
"""
return InputTile(tile, process)
class OutputDataWriter(base.TileDirectoryOutputWriter, OutputDataReader):
METADATA = METADATA
def write(self, process_tile, data):
"""
Write data from process tiles into GeoJSON file(s).
Parameters
----------
process_tile : ``BufferedTile``
must be member of process ``TilePyramid``
"""
if data is None or len(data) == 0:
return
if not isinstance(data, (list, types.GeneratorType)): # pragma: no cover
raise TypeError("output data must be a list or generator of features")  # message reconstructed
# from a separate snippet: assemble the destination GeoTIFF profile, handling
# the deprecated 'compression' parameter
try:
if "compression" in self.output_params:
warnings.warn(
DeprecationWarning("use 'compress' instead of 'compression'")
)
dst_metadata.update(compress=self.output_params["compression"])
else:
dst_metadata.update(compress=self.output_params["compress"])
dst_metadata.update(predictor=self.output_params["predictor"])
except KeyError:
pass
return dst_metadata
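# Quick usage sketch of the compression handling above, detached from the
# class; the helper name and dictionary contents are illustrative.
import warnings

def apply_compress(dst_metadata, output_params):
    # deprecated spelling still accepted, but emits a DeprecationWarning
    if "compression" in output_params:
        warnings.warn(DeprecationWarning("use 'compress' instead of 'compression'"))
        dst_metadata.update(compress=output_params["compression"])
    elif "compress" in output_params:
        dst_metadata.update(compress=output_params["compress"])
    return dst_metadata

print(apply_compress({}, {"compression": "deflate"}))  # warns, {'compress': 'deflate'}
print(apply_compress({}, {"compress": "lzw"}))         # {'compress': 'lzw'}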
class GTiffTileDirectoryOutputWriter(
GTiffTileDirectoryOutputReader, base.TileDirectoryOutputWriter
):
def write(self, process_tile, data):
"""
Write data from process tiles into GeoTIFF file(s).
Parameters
----------
process_tile : ``BufferedTile``
must be member of process ``TilePyramid``
data : ``np.ndarray``
"""
if (
isinstance(data, tuple) and
len(data) == 2 and
isinstance(data[1], dict)
):
data, tags = data  # unpack (array, tags dict); branch bodies reconstructed as an assumption
else:
tags = {}
# Convert from process_tile to output_tiles (loop below comes from the vector/GeoJSON writer snippet)
for tile in self.pyramid.intersecting(process_tile):
out_path = self.get_path(tile)
self.prepare_path(tile)
out_tile = BufferedTile(tile, self.pixelbuffer)
write_vector_window(
in_data=data,
out_schema=self.output_params["schema"],
out_tile=out_tile,
out_path=out_path,
bucket_resource=bucket_resource
)
class InputTile(base.InputTile):
"""
Target Tile representation of input data.
Parameters
----------
tile : ``Tile``
process : ``MapcheteProcess``
Attributes
----------
tile : ``Tile``
process : ``MapcheteProcess``
"""
def __init__(self, tile, process):
"""Initialize."""
empty array with correct data type for raster data or empty list
for vector data
"""
return ma.masked_values(np.zeros(process_tile.shape), 0)
def _prepare_array(self, data):
# invert the greyscale band (255 - value): dark input values become opaque in the alpha channel
data = prepare_array(-(data - 255), dtype="uint8", masked=False, nodata=0)[0]
zeros = np.zeros(data.shape)
# legacy 4-band output (RGB zeros + alpha) vs. 2-band grey + alpha output
if self.old_band_num:
data = np.stack([zeros, zeros, zeros, data])
else:
data = np.stack([zeros, data])
return prepare_array(data, dtype="uint8", masked=True, nodata=255)
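# Standalone NumPy illustration of the stacking above: the inverted shade ends
# up in the alpha channel, on top of black (zero) colour bands. Values are
# arbitrary.
import numpy as np

shade = np.array([[0, 128], [255, 32]], dtype="uint8")
inverted = (255 - shade).astype("uint8")           # dark input -> opaque alpha
zeros = np.zeros(shade.shape, dtype="uint8")

grey_alpha = np.stack([zeros, inverted])            # 2-band: grey + alpha
rgba = np.stack([zeros, zeros, zeros, inverted])    # legacy 4-band variant
print(grey_alpha.shape, rgba.shape)  # (2, 2, 2) (4, 2, 2)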
class OutputDataWriter(base.OutputDataWriter, OutputDataReader):
METADATA = METADATA
def write(self, process_tile, data):
"""
Write data from process tiles into PNG file(s).
Parameters
----------
process_tile : ``BufferedTile``
must be member of process ``TilePyramid``
"""
data = self._prepare_array(data)
if data.mask.all(): # pragma: no cover
logger.debug("data empty, nothing to write")
return
# reconstructed loop header: write one file per output tile intersecting the process tile
for tile in self.pyramid.intersecting(process_tile):
out_path = self.get_path(tile)
self.prepare_path(tile)
out_tile = BufferedTile(tile, self.pixelbuffer)
write_raster_window(
in_tile=process_tile,
in_data=data,
out_profile=self.profile(out_tile),
out_tile=out_tile,
out_path=out_path,
tags=tags,
bucket_resource=bucket_resource
)
class GTiffSingleFileOutputWriter(
GTiffOutputReaderFunctions, base.SingleFileOutputWriter
):
write_in_parent_process = True
def __init__(self, output_params, **kwargs):
"""Initialize."""
logger.debug("output is single file")
self.rio_file = None
super().__init__(output_params, **kwargs)
self._set_attributes(output_params)
if len(self.output_params["delimiters"]["zoom"]) != 1:
raise ValueError("single file output only works with one zoom level")
self.zoom = output_params["delimiters"]["zoom"][0]
if "overviews" in output_params:
self.overviews = True
self.overviews_resampling = output_params.get(
"overviews_resampling", "nearest")  # default resampling value assumed
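# Hypothetical output_params illustrating the constraints checked above:
# exactly one zoom level under "delimiters", plus optional overview settings.
# Keys shown are only those referenced in this snippet.
example_params = {
    "delimiters": {"zoom": [8]},
    "overviews": True,
    "overviews_resampling": "nearest",
}
assert len(example_params["delimiters"]["zoom"]) == 1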