Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@requires_connection
@convert_exceptions('Could not retrieve datasets')
def get_datasets(self, where: Mapping[str, str] = None,
page: int = 0, limit: int = None) -> Dict[Tuple[str, ...], str]:
conn = self._connection
if limit is not None:
# explicitly cast to int to prevent SQL injection
page_fragment = f'LIMIT {int(limit)} OFFSET {int(page) * int(limit)}'
else:
page_fragment = ''
# sort by keys to ensure deterministic results
order_fragment = f'ORDER BY {", ".join(self.key_names)}'
if where is None:
rows = conn.execute(f'SELECT * FROM datasets {order_fragment} {page_fragment}')
@requires_connection
@convert_exceptions('Could not retrieve datasets')
def get_datasets(self, where: Mapping[str, str] = None,
page: int = 0, limit: int = None) -> Dict[Tuple[str, ...], str]:
cursor = self._cursor
if limit is not None:
# explicitly cast to int to prevent SQL injection
page_fragment = f'LIMIT {int(limit)} OFFSET {int(page) * int(limit)}'
else:
page_fragment = ''
# sort by keys to ensure deterministic results
order_fragment = f'ORDER BY {", ".join(self.key_names)}'
if where is None:
cursor.execute(f'SELECT * FROM datasets {order_fragment} {page_fragment}')
@requires_connection
@convert_exceptions('Could not write to database')
def delete(self, keys: Union[Sequence[str], Mapping[str, str]]) -> None:
conn = self._connection
if len(keys) != len(self.key_names):
raise exceptions.InvalidKeyError(
f'Got wrong number of keys (available keys: {self.key_names})'
)
keys = self._key_dict_to_sequence(keys)
key_dict = dict(zip(self.key_names, keys))
if not self.get_datasets(key_dict):
raise exceptions.DatasetNotFoundError(f'No dataset found with keys {keys}')
where_string = ' AND '.join([f'{key}=?' for key in self.key_names])
@requires_connection
@convert_exceptions(_ERROR_ON_CONNECT)
def _get_db_version(self) -> str:
"""Terracotta version used to create the database"""
conn = self._connection
db_row = conn.execute('SELECT version from terracotta').fetchone()
return db_row['version']
@requires_connection
@convert_exceptions('Could not write to database')
def insert(self,
keys: Union[Sequence[str], Mapping[str, str]],
filepath: str, *,
metadata: Mapping[str, Any] = None,
skip_metadata: bool = False,
override_path: str = None) -> None:
cursor = self._cursor
if len(keys) != len(self.key_names):
raise exceptions.InvalidKeyError(
f'Got wrong number of keys (available keys: {self.key_names})'
)
if override_path is None:
override_path = filepath
@requires_connection
@convert_exceptions('Could not retrieve metadata')
def get_metadata(self, keys: Union[Sequence[str], Mapping[str, str]]) -> Dict[str, Any]:
keys = tuple(self._key_dict_to_sequence(keys))
if len(keys) != len(self.key_names):
raise exceptions.InvalidKeyError(
f'Got wrong number of keys (available keys: {self.key_names})'
)
conn = self._connection
where_string = ' AND '.join([f'{key}=?' for key in self.key_names])
row = conn.execute(f'SELECT * FROM metadata WHERE {where_string}', keys).fetchone()
if not row: # support lazy loading
filepath = self.get_datasets(dict(zip(self.key_names, keys)), page=0, limit=1)
@requires_connection
@convert_exceptions('Could not write to database')
def insert(self,
keys: Union[Sequence[str], Mapping[str, str]],
filepath: str, *,
metadata: Mapping[str, Any] = None,
skip_metadata: bool = False,
override_path: str = None) -> None:
conn = self._connection
if len(keys) != len(self.key_names):
raise exceptions.InvalidKeyError(
f'Got wrong number of keys (available keys: {self.key_names})'
)
if override_path is None:
override_path = filepath
@requires_connection
@convert_exceptions('Could not retrieve keys from database')
def get_keys(self) -> OrderedDict:
conn = self._connection
key_rows = conn.execute('SELECT * FROM keys')
out: OrderedDict = OrderedDict()
for row in key_rows:
out[row['key']] = row['description']
return out
@requires_connection
def get_raster_tile(self,
keys: Union[Sequence[str], Mapping[str, str]], *,
tile_bounds: Sequence[float] = None,
tile_size: Sequence[int] = None,
preserve_values: bool = False,
asynchronous: bool = False) -> Any:
# This wrapper handles cache interaction and asynchronous tile retrieval.
# The real work is done in _get_raster_tile.
future: Future[np.ma.MaskedArray]
result: np.ma.MaskedArray
settings = get_settings()
key_tuple = tuple(self._key_dict_to_sequence(keys))
datasets = self.get_datasets(dict(zip(self.key_names, key_tuple)))
assert len(datasets) == 1
@requires_connection
@convert_exceptions('Could not retrieve keys from database')
def _get_keys(self) -> OrderedDict:
out: OrderedDict = OrderedDict()
cursor = self._cursor
cursor.execute('SELECT * FROM key_names')
key_rows = cursor.fetchall() or ()
for row in key_rows:
out[row['key_name']] = row['description']
return out