# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_xray_tracing(caplog):
    """Traced calls inside an X-Ray segment log no errors; outside one they must."""
    from terracotta import update_settings
    import terracotta.profile

    update_settings(XRAY_PROFILE=True)
    try:
        @terracotta.profile.trace('dummy')
        def func_to_trace():
            time.sleep(0.1)

        # tracing inside an open segment should succeed silently
        with XRaySegment():
            func_to_trace()

        # the context-manager form of trace() should work the same way
        with XRaySegment():
            with terracotta.profile.trace('dummy2'):
                time.sleep(0.1)

        assert all(rec.levelname != 'ERROR' for rec in caplog.records)

        # sanity check, recording without starting a segment should fail
        func_to_trace()
        saw_segment_error = any(
            rec.levelname == 'ERROR'
            and 'cannot find the current segment' in rec.message
            for rec in caplog.records
        )
        assert saw_segment_error
    finally:
        # restore global settings even if an assertion above failed
        update_settings(XRAY_PROFILE=False)
@terracotta.profile.trace('dummy')
def func_to_trace():
    """Sleep briefly so the traced section has a measurable duration."""
    time.sleep(0.1)
@trace('get_datasets')
@requires_connection
@convert_exceptions('Could not retrieve datasets')
def get_datasets(self, where: Mapping[str, str] = None,
page: int = 0, limit: int = None) -> Dict[Tuple[str, ...], str]:
# NOTE(review): fragment is truncated in this chunk — the body ends mid-branch
# at ``if where is None:`` below; the query execution is not visible here.
# Builds the paging and ordering fragments of a SQL SELECT over the datasets.
conn = self._connection
# pagination is optional: with no limit, no LIMIT/OFFSET clause is emitted at all
if limit is not None:
# explicitly cast to int to prevent SQL injection
page_fragment = f'LIMIT {int(limit)} OFFSET {int(page) * int(limit)}'
else:
page_fragment = ''
# sort by keys to ensure deterministic results
# (key names are interpolated directly; presumably they come from a trusted,
# schema-defined source — verify self.key_names cannot carry user input)
order_fragment = f'ORDER BY {", ".join(self.key_names)}'
if where is None:
# NOTE(review): fragment is truncated in this chunk — the docstring below is
# cut off mid-sentence and the function body is not visible, so no statements
# can be documented here without guessing.
@trace('compute_metadata')
def compute_metadata(cls, raster_path: str, *, # type: ignore[override] # noqa: F821
extra_metadata: Any = None,
use_chunks: bool = None,
max_shape: Sequence[int] = None) -> Dict[str, Any]:
"""Read given raster file and compute metadata from it.
This handles most of the heavy lifting during raster ingestion. The returned metadata can
be passed directly to :meth:`insert`.
Arguments:
raster_path: Path to GDAL-readable raster file
extra_metadata: Any additional metadata to attach to the dataset. Will be
JSON-serialized and returned verbatim by :meth:`get_metadata`.
use_chunks: Whether to process the image in chunks (slower, but uses less memory).
If not given, use chunks for large images only.
# NOTE(review): fragment is truncated in this chunk — only the signature,
# deferred imports, and one variable annotation are visible; the body that
# actually reads the raster continues past this view.
@trace('get_raster_tile')
def _get_raster_tile(cls, path: str, *,
reprojection_method: str,
resampling_method: str,
tile_bounds: Tuple[float, float, float, float] = None,
tile_size: Tuple[int, int] = (256, 256),
preserve_values: bool = False) -> np.ma.MaskedArray:
"""Load a raster dataset from a file through rasterio.
Heavily inspired by mapbox/rio-tiler
"""
# rasterio/affine are imported lazily, presumably to keep module import cheap
import rasterio
from rasterio import transform, windows, warp
from rasterio.vrt import WarpedVRT
from affine import Affine
# declared ahead of assignment; the assignment is not visible in this chunk
dst_bounds: Tuple[float, float, float, float]
# NOTE(review): fragment is truncated in this chunk — the body ends inside the
# RGB branch; the actual PNG encoding is not visible here.
@trace('array_to_png')
def array_to_png(img_data: Array,
colormap: Union[str, Palette, None] = None) -> BinaryIO:
"""Encode an 8bit array as PNG"""
from terracotta.cmaps import get_cmap
# declared ahead of assignment; assigned later (not visible in this chunk)
transparency: Union[Tuple[int, int, int], int, bytes]
settings = get_settings()
# PNG compression level is taken from global settings, not hard-coded
compress_level = settings.PNG_COMPRESS_LEVEL
if img_data.ndim == 3: # encode RGB image
# RGB input must be exactly three bands, and a palette makes no sense for it
if img_data.shape[-1] != 3:
raise ValueError('3D input arrays must have three bands')
if colormap is not None:
raise ValueError('Colormap argument cannot be given for multi-band data')
@trace()
def colormaps() -> List[str]:
    """Return the names of all supported colormaps, sorted alphabetically."""
    from terracotta.cmaps import AVAILABLE_CMAPS
    # sorted() already returns a new list, so wrapping it in list() was redundant
    return sorted(AVAILABLE_CMAPS)
# NOTE(review): headerless fragment — this is the interior of a raster-tile
# reading function whose ``def`` line is not visible in this chunk; names like
# num_pad_pixels, es, cls, src, reproject_enum and vrt_transform are defined
# above this view.
# remove padding in output
out_window = windows.Window(
col_off=num_pad_pixels, row_off=num_pad_pixels, width=dst_width, height=dst_height
)
# construct VRT
# registered on an ExitStack (es), so the VRT is closed when the stack unwinds
vrt = es.enter_context(
WarpedVRT(
src, crs=cls._TARGET_CRS, resampling=reproject_enum,
transform=vrt_transform, width=vrt_width, height=vrt_height,
add_alpha=not cls._has_alpha_band(src)
)
)
# read data
with warnings.catch_warnings(), trace('read_from_vrt'):
warnings.filterwarnings('ignore', message='invalid value encountered.*')
tile_data = vrt.read(
1, resampling=resampling_enum, window=out_window, out_shape=tile_size
)
# assemble alpha mask
# the alpha/mask band is the last band of the VRT (index == vrt.count)
mask_idx = vrt.count
mask = vrt.read(mask_idx, window=out_window, out_shape=tile_size) == 0
# additionally mask out explicit nodata values when the source declares one
if src.nodata is not None:
mask |= tile_data == src.nodata
return np.ma.masked_array(tile_data, mask=mask)
@trace('get_metadata')
@requires_connection
@convert_exceptions('Could not retrieve metadata')
def get_metadata(self, keys: Union[Sequence[str], Mapping[str, str]]) -> Dict[str, Any]:
"""Fetch the metadata row matching the given dataset keys.

NOTE(review): truncated in this chunk — the body ends at ``if not row:``;
the not-found handling and return statement are not visible here.
"""
# accept either a mapping or an ordered sequence of key values
keys = tuple(self._key_dict_to_sequence(keys))
if len(keys) != len(self.key_names):
raise exceptions.InvalidKeyError(
f'Got wrong number of keys (available keys: {self.key_names})'
)
conn = self._connection
# key values are bound via ``?`` placeholders (safe); only the column names
# are interpolated, which come from self.key_names
where_string = ' AND '.join([f'{key}=?' for key in self.key_names])
row = conn.execute(f'SELECT * FROM metadata WHERE {where_string}', keys).fetchone()
if not row: # support lazy loading
@trace('datasets_handler')
def datasets(some_keys: Mapping[str, str] = None,
page: int = 0, limit: int = 500) -> 'List[OrderedDict[str, str]]':
    """List all available key combinations"""
    settings = get_settings()
    driver = get_driver(settings.DRIVER_PATH, provider=settings.DRIVER_PROVIDER)

    # query matching datasets while the driver connection is open
    with driver.connect():
        matched = driver.get_datasets(where=some_keys, page=page, limit=limit)
        names = driver.key_names

    # pair each key combination with the driver's key names, preserving order
    combinations = []
    for combo in matched.keys():
        combinations.append(OrderedDict(zip(names, combo)))
    return combinations