Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
db.insert({'some': 'a', 'keynames': 'third_value'}, str(raster_file))
data = db.get_datasets()
assert len(data) == 3
data = db.get_datasets(where=dict(some='some'))
assert len(data) == 2
data = db.get_datasets(where=dict(some='some', keynames='value'))
assert list(data.keys()) == [('some', 'value')]
assert data[('some', 'value')] == str(raster_file)
data = db.get_datasets(where=dict(some='unknown'))
assert data == {}
with pytest.raises(exceptions.InvalidKeyError) as exc:
db.get_datasets(where=dict(unknown='foo'))
assert 'unrecognized keys' in str(exc.value)
from terracotta import drivers, exceptions
db = drivers.get_driver(driver_path, provider=provider)
keys = ('keyname',)
db.create(keys)
with pytest.raises(exceptions.InvalidKeyError) as exc:
db.get_metadata(['a', 'b'])
assert 'wrong number of keys' in str(exc.value)
with pytest.raises(exceptions.InvalidKeyError) as exc:
db.insert(['a', 'b'], '')
assert 'wrong number of keys' in str(exc.value)
with pytest.raises(exceptions.InvalidKeyError) as exc:
db.delete(['a', 'b'])
assert 'wrong number of keys' in str(exc.value)
def test_creation_invalid_description(driver_path, provider):
    """Check that describing an undeclared key is rejected on creation.

    ``create`` is called with key descriptions that name a key which is not
    among the keys being created; the call is expected to raise
    ``InvalidKeyError`` with a message containing 'contains unknown keys'.
    """
    from terracotta import drivers, exceptions

    driver = drivers.get_driver(driver_path, provider=provider)
    key_names = ('some', 'keynames')
    # 'unknown_key' is deliberately absent from key_names
    bogus_descriptions = {'unknown_key': 'blah'}

    with pytest.raises(exceptions.InvalidKeyError) as excinfo:
        driver.create(key_names, key_descriptions=bogus_descriptions)

    assert 'contains unknown keys' in str(excinfo.value)
conn = self._connection
if limit is not None:
# explicitly cast to int to prevent SQL injection
page_fragment = f'LIMIT {int(limit)} OFFSET {int(page) * int(limit)}'
else:
page_fragment = ''
# sort by keys to ensure deterministic results
order_fragment = f'ORDER BY {", ".join(self.key_names)}'
if where is None:
rows = conn.execute(f'SELECT * FROM datasets {order_fragment} {page_fragment}')
else:
if not all(key in self.key_names for key in where.keys()):
raise exceptions.InvalidKeyError('Encountered unrecognized keys in '
'where clause')
where_fragment = ' AND '.join([f'{key}=?' for key in where.keys()])
rows = conn.execute(
f'SELECT * FROM datasets WHERE {where_fragment} {order_fragment} {page_fragment}',
list(where.values())
)
def keytuple(row: Dict[str, Any]) -> Tuple[str, ...]:
return tuple(row[key] for key in self.key_names)
return {keytuple(row): row['filepath'] for row in rows}
def delete(self, keys: Union[Sequence[str], Mapping[str, str]]) -> None:
    """Delete the dataset identified by ``keys`` together with its metadata.

    Rows matching the key values are removed from both the ``datasets`` and
    the ``metadata`` table.

    Args:
        keys: Key values, either as a sequence or as a mapping; normalised to
            a sequence through ``_key_dict_to_sequence``.

    Raises:
        exceptions.InvalidKeyError: If the number of given keys differs from
            the number of key names.
        exceptions.DatasetNotFoundError: If ``get_datasets`` finds no dataset
            for the given keys.
    """
    connection = self._connection

    if not len(keys) == len(self.key_names):
        raise exceptions.InvalidKeyError(
            f'Got wrong number of keys (available keys: {self.key_names})'
        )

    key_values = self._key_dict_to_sequence(keys)

    # Check existence up front so a missing dataset is reported as an error
    lookup = dict(zip(self.key_names, key_values))
    matches = self.get_datasets(lookup)
    if not matches:
        raise exceptions.DatasetNotFoundError(f'No dataset found with keys {key_values}')

    # Column names are interpolated; the key values are bound as parameters
    conditions = ' AND '.join(f'{name}=?' for name in self.key_names)
    connection.execute(f'DELETE FROM datasets WHERE {conditions}', key_values)
    connection.execute(f'DELETE FROM metadata WHERE {conditions}', key_values)
cursor = self._cursor
if limit is not None:
# explicitly cast to int to prevent SQL injection
page_fragment = f'LIMIT {int(limit)} OFFSET {int(page) * int(limit)}'
else:
page_fragment = ''
# sort by keys to ensure deterministic results
order_fragment = f'ORDER BY {", ".join(self.key_names)}'
if where is None:
cursor.execute(f'SELECT * FROM datasets {order_fragment} {page_fragment}')
else:
if not all(key in self.key_names for key in where.keys()):
raise exceptions.InvalidKeyError('Encountered unrecognized keys in '
'where clause')
where_fragment = ' AND '.join([f'{key}=%s' for key in where.keys()])
cursor.execute(
f'SELECT * FROM datasets WHERE {where_fragment} {order_fragment} {page_fragment}',
list(where.values())
)
def keytuple(row: Dict[str, Any]) -> Tuple[str, ...]:
return tuple(row[key] for key in self.key_names)
datasets = {}
for row in cursor:
row = cast(Dict[str, Any], row)
datasets[keytuple(row)] = row['filepath']
return datasets
def insert(self,
keys: Union[Sequence[str], Mapping[str, str]],
filepath: str, *,
metadata: Mapping[str, Any] = None,
skip_metadata: bool = False,
override_path: str = None) -> None:
conn = self._connection
if len(keys) != len(self.key_names):
raise exceptions.InvalidKeyError(
f'Got wrong number of keys (available keys: {self.key_names})'
)
if override_path is None:
override_path = filepath
keys = self._key_dict_to_sequence(keys)
template_string = ', '.join(['?'] * (len(keys) + 1))
conn.execute(f'INSERT OR REPLACE INTO datasets VALUES ({template_string})',
[*keys, override_path])
if metadata is None and not skip_metadata:
metadata = self.compute_metadata(filepath)
if metadata is not None:
encoded_data = self._encode_data(metadata)
def _key_dict_to_sequence(self, keys: Union[Mapping[str, Any], Sequence[Any]]) -> List[Any]:
"""Convert {key_name: key_value} to [key_value] with the correct key order."""
try:
keys_as_mapping = cast(Mapping[str, Any], keys)
return [keys_as_mapping[key] for key in self.key_names]
except TypeError: # not a mapping
return list(keys)
except KeyError as exc:
raise exceptions.InvalidKeyError('Encountered unknown key') from exc