# From the MySQL driver's create() method: set up the database and tables.

# total primary key length has an upper limit in MySQL
key_size = self._MAX_PRIMARY_KEY_LENGTH // len(keys)
key_type = f'VARCHAR({key_size})'

# note: with pymysql < 1.0, the connection's context manager yields a cursor,
# which is what makes con.execute work here
with pymysql.connect(host=self._db_args.host, user=self._db_args.user,
                     password=self._db_args.password, port=self._db_args.port,
                     read_timeout=self.DB_CONNECTION_TIMEOUT,
                     write_timeout=self.DB_CONNECTION_TIMEOUT,
                     binary_prefix=True, charset='utf8mb4') as con:
    con.execute(f'CREATE DATABASE {self._db_args.db}')

with self._connect(check=False):
    cursor = self._cursor

    # store the current driver version so later connections can detect mismatches
    cursor.execute(f'CREATE TABLE terracotta (version VARCHAR(255)) '
                   f'CHARACTER SET {self._CHARSET}')
    cursor.execute('INSERT INTO terracotta VALUES (%s)', [str(__version__)])

    cursor.execute(f'CREATE TABLE key_names (key_name {key_type}, '
                   f'description VARCHAR(8000)) CHARACTER SET {self._CHARSET}')
    key_rows = [(key, key_descriptions[key]) for key in keys]
    cursor.executemany('INSERT INTO key_names VALUES (%s, %s)', key_rows)

    key_string = ', '.join([f'{key} {key_type}' for key in keys])
    cursor.execute(f'CREATE TABLE datasets ({key_string}, filepath VARCHAR(8000), '
                   f'PRIMARY KEY({", ".join(keys)})) CHARACTER SET {self._CHARSET}')

    column_string = ', '.join(f'{col} {col_type}' for col, col_type
                              in self._METADATA_COLUMNS)
    cursor.execute(f'CREATE TABLE metadata ({key_string}, {column_string}, '
                   f'PRIMARY KEY ({", ".join(keys)})) CHARACTER SET {self._CHARSET}')
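
# A minimal, self-contained sketch of the key-size arithmetic above. The
# concrete limit is an assumption for illustration: InnoDB guarantees at
# least 767 index bytes, and utf8mb4 uses up to 4 bytes per character, so
# the character budget is split evenly across all key columns.
_MAX_PRIMARY_KEY_LENGTH = 767 // 4  # hypothetical value, in characters

def key_column_type(keys):
    key_size = _MAX_PRIMARY_KEY_LENGTH // len(keys)
    return f'VARCHAR({key_size})'

assert key_column_type(('zone', 'date', 'band')) == 'VARCHAR(63)'
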
# From RasterDriver.compute_metadata: gather bounds and statistics for a raster.
with rasterio.open(raster_path) as src:
    if src.nodata is None and not cls._has_alpha_band(src):
        warnings.warn(
            f'Raster file {raster_path} does not have a valid nodata value, '
            'and does not contain an alpha band. No data will be masked.'
        )

    bounds = warp.transform_bounds(
        src.crs, 'epsg:4326', *src.bounds, densify_pts=21
    )

    # iterate over chunks for rasters above the pixel-count threshold
    if use_chunks is None and max_shape is None:
        use_chunks = src.width * src.height > RasterDriver._LARGE_RASTER_THRESHOLD

    if use_chunks:
        logger.debug(
            f'Computing metadata for file {raster_path} using more than '
            f'{RasterDriver._LARGE_RASTER_THRESHOLD // 10**6}M pixels, iterating '
            'over chunks'
        )

    if use_chunks and not has_crick:
        warnings.warn(
            'Processing a large raster file, but crick failed to import. '
            'Reading whole file into memory instead.', exceptions.PerformanceWarning
        )
        use_chunks = False

    if use_chunks:
        raster_stats = RasterDriver._compute_image_stats_chunked(src)
    else:
        raster_stats = RasterDriver._compute_image_stats(src, max_shape)

if raster_stats is None:
    raise ValueError(f'Raster file {raster_path} does not contain any valid data')

row_data.update(raster_stats)
row_data['bounds'] = bounds
row_data['metadata'] = extra_metadata
return row_data
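
# A hedged usage sketch of the metadata computation above. The file names are
# hypothetical; compute_metadata is available on driver instances (the tests
# below monkeypatch it), and leaving use_chunks as None lets the driver decide
# based on the raster's pixel count.
from terracotta import drivers

db = drivers.get_driver('tc.sqlite', provider='sqlite')
metadata = db.compute_metadata('raster.tif', use_chunks=None)
# metadata now contains bounds, image statistics, and any extra metadata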

# Driver test suite. driver_path, provider, and raster_file are pytest
# fixtures supplied by the test suite, and DRIVER_CLASSES maps provider
# names to the expected driver class names.
import pytest


def test_auto_detect(driver_path, provider):
    from terracotta import drivers
    db = drivers.get_driver(driver_path)
    assert db.__class__.__name__ == DRIVER_CLASSES[provider]
    assert drivers.get_driver(driver_path, provider=provider) is db
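
# A small usage sketch of the caching behavior that test_auto_detect checks.
# The file name 'tc.sqlite' is a hypothetical example; the provider is
# inferred from the path when not given, and equivalent paths return the
# same cached driver instance.
from terracotta import drivers

db1 = drivers.get_driver('tc.sqlite')                     # provider inferred
db2 = drivers.get_driver('tc.sqlite', provider='sqlite')  # provider explicit
assert db1 is db2
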

def test_connect_before_create(driver_path, provider):
    from terracotta import drivers, exceptions
    db = drivers.get_driver(driver_path, provider=provider)

    # connecting to a database that was never created must fail
    with pytest.raises(exceptions.InvalidDatabaseError):
        with db.connect():
            pass


def test_invalid_group_insertion(monkeypatch, driver_path, provider, raster_file):
    from terracotta import drivers
    db = drivers.get_driver(driver_path, provider=provider)
    keys = ('keyname',)

    db.create(keys)

    def throw(*args, **kwargs):
        raise NotImplementedError()

    with monkeypatch.context() as m:
        # make metadata computation raise so we can tell when it runs
        m.setattr(db, 'compute_metadata', throw)

        with db.connect():
            # skip_metadata=True must not trigger metadata computation
            db.insert(['bar'], str(raster_file), skip_metadata=True)

            with pytest.raises(NotImplementedError):
                db.insert(['foo'], str(raster_file), skip_metadata=False)


def test_path_override(driver_path, provider, raster_file):
    from terracotta import drivers
    db = drivers.get_driver(driver_path, provider=provider)
    keys = ('some', 'keynames')
    key_value = ('some', 'value')
    bogus_path = 'foo'

    db.create(keys)
    db.insert(key_value, str(raster_file), override_path=bogus_path)
    assert db.get_datasets()[key_value] == bogus_path

    with pytest.raises(IOError) as exc:
        # overridden path doesn't exist
        db.get_raster_tile(key_value)
    assert bogus_path in str(exc.value)


def test_version_conflict(tmpdir, raster_file, monkeypatch):
    from terracotta import drivers, exceptions
    dbfile = tmpdir.join('test.sqlite')
    db = drivers.get_driver(str(dbfile), provider='sqlite')
    keys = ('some', 'keys')

    db.create(keys)
    db.insert(['some', 'value'], str(raster_file))

    # works
    with db.connect():
        pass

    with monkeypatch.context() as m:
        fake_version = '0.0.0'
        m.setattr('terracotta.drivers.sqlite.__version__', fake_version)
        # reset the driver's cached version check (assumed private flag) so
        # the next connect re-validates against the faked version
        db._version_checked = False

        # a checked connection must now raise a version conflict
        with pytest.raises(exceptions.InvalidDatabaseError):
            with db.connect():
                pass

        # works, since the version check is skipped
        with db.connect(check=False):
            pass


def test_get_driver_invalid():
    from terracotta import drivers
    with pytest.raises(ValueError) as exc:
        drivers.get_driver('', provider='foo')

    assert 'Unknown database provider' in str(exc.value)