Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _send(data, path, mime_type, cache):
    """Store ``data`` under ``path`` in the given cache.

    For a cache whose ``type`` is ``'s3'`` the payload is uploaded with
    ``put_object`` to ``cache['bucket']`` under the key built from
    ``cache['folder']`` and ``path``. For every other cache type the payload
    is written to the local file ``<folder>/<path>``; missing parent
    directories are created first.

    Args:
        data: Payload to store. On the filesystem branch a ``str`` is encoded
            as UTF-8 before being written; other values are written as-is.
        path: Location of the object relative to the cache folder.
        mime_type: Sent as ``ContentType`` on the S3 branch; unused on the
            filesystem branch.
        cache: Cache configuration mapping. Reads ``'type'`` and ``'folder'``,
            plus ``'bucket'`` and the optional ``'host'`` on the S3 branch.
    """
    if cache['type'] == 's3':  # pragma: no cover
        client = tilecloud.store.s3.get_client(cache.get('host'))
        key_name = os.path.join('{folder!s}'.format(**cache), path)
        bucket = cache['bucket']
        client.put_object(
            ACL='public-read', Body=data, Key=key_name, Bucket=bucket,
            ContentEncoding='utf-8', ContentType=mime_type,
        )
    else:
        if isinstance(data, str):
            data = data.encode('utf-8')
        # A missing/None folder means "relative to the current directory".
        folder = cache['folder'] or ''
        filename = os.path.join(folder, path)
        directory = os.path.dirname(filename)
        # ``directory`` is '' when the file lands directly in the current
        # directory; os.makedirs('') would raise, so skip it in that case.
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(filename, 'wb') as f:
            f.write(data)
def _get(path, cache):
    """Read the object stored under ``path`` in the given cache.

    Args:
        path: Location of the object relative to the cache folder.
        cache: Cache configuration mapping. Reads ``'type'`` and ``'folder'``,
            plus ``'bucket'`` and the optional ``'host'`` on the S3 branch.

    Returns:
        The raw content of the object. On the filesystem branch ``None`` is
        returned when ``<folder>/<path>`` is not an existing regular file.
    """
    if cache['type'] == 's3':  # pragma: no cover
        client = tilecloud.store.s3.get_client(cache.get('host'))
        key_name = os.path.join('{folder!s}'.format(**cache), path)
        bucket = cache['bucket']
        response = client.get_object(Bucket=bucket, Key=key_name)
        body = response['Body']
        # Always release the streaming body, even if read() fails.
        try:
            return body.read()
        finally:
            body.close()
    else:
        # Treat a missing/None folder as the current directory so that
        # whatever was written with that configuration can be read back.
        p = os.path.join(cache['folder'] or '', path)
        if not os.path.isfile(p):  # pragma: no cover
            return None
        with open(p, 'rb') as file:
            return file.read()
def _get(self, path, **kwargs):
    """Return what ``self._read`` yields for ``path`` inside the cache folder.

    The key is the cache's ``folder`` entry joined with ``path``. If the
    first read raises, the S3 client is rebuilt from the cache's ``host``
    entry and the read is attempted one more time; a failure of that second
    attempt propagates to the caller. Extra keyword arguments are accepted
    and ignored.
    """
    prefix = '{folder}'.format(**self.cache)
    s3_key = os.path.join(prefix, path)
    try:
        result = self._read(s3_key)
    except Exception:
        # Replace the client with a fresh one, then retry exactly once.
        self.s3_client = tilecloud.store.s3.get_client(self.cache.get('host'))
        result = self._read(s3_key)
    return result
# NOTE(review): these statements look like part of an initializer body whose
# ``def`` line (and original indentation) is not visible here — confirm
# against the full file.
# Bind the ``_read`` function to this instance so it can be called as
# ``self._read(...)``.
self._read = types.MethodType(_read, self)
global tilegeneration
# Expiry setting taken from the ``server`` section of the configuration.
self.expires_hours = tilegeneration.config['server']['expires']
# Allowed static file extensions; falls back to the list below when the
# ``server`` section has no ``static_allow_extension`` entry.
self.static_allow_extension = tilegeneration.config['server'].get(
'static_allow_extension', ['jpeg', 'png', 'xml', 'js', 'html', 'css']
)
# Select the cache named by ``server.cache``; when that entry is absent the
# name comes from ``generation.default_cache``.
self.cache = tilegeneration.caches[
tilegeneration.config['server'].get(
'cache', tilegeneration.config['generation']['default_cache']
)
]
# S3-backed cache: create the client from the cache's optional ``host`` entry
# and keep the bucket name in a local variable.
if self.cache['type'] == 's3': # pragma: no cover
self.s3_client = tilecloud.store.s3.get_client(self.cache.get('host'))
bucket = self.cache['bucket']
def _read(self, key_name):
    """Fetch ``key_name`` from the bucket and return ``(content, content_type)``.

    The object is requested through ``self.s3_client``; ``bucket`` comes from
    the enclosing scope. The response body is closed whether or not reading
    it succeeds. ``content_type`` is ``None`` when the response carries no
    ``'ContentType'`` entry.
    """
    s3_object = self.s3_client.get_object(Bucket=bucket, Key=key_name)
    stream = s3_object['Body']
    try:
        content = stream.read()
        content_type = s3_object.get('ContentType')
        return content, content_type
    finally:
        # Release the body stream on both the success and the error path.
        stream.close()
def _get(self, path, **kwargs):
key_name = os.path.join('{folder}'.format(**self.cache), path)
try:
return self._read(key_name)
except Exception:
self.s3_client = tilecloud.store.s3.get_client(self.cache.get('host'))