Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return
line = line.split('#')[0].strip()
if line != '':
try:
tilecoord = parse_tilecoord(line)
except ValueError as e: # pragma: no cover
logger.error("A tile '{}' is not in the format 'z/x/y' or z/x/y:+n/+n\n{}".format(
line, repr(e), exc_info=True)
)
continue
for dimensions in self.all_dimensions:
metadata = {'layer': self.layer}
for k, v in dimensions.items():
metadata["dimension_" + k] = v
yield Tile(tilecoord, metadata=metadata)
def __call__(self, tile):
if len(tile.data) != self.size or \
sha1(tile.data).hexdigest() != self.sha1code:
return tile
else:
if self.store is not None:
if tile.tilecoord.n != 1:
for tilecoord in tile.tilecoord:
self.store.delete_one(Tile(tilecoord, metadata=tile.metadata))
else:
self.store.delete_one(tile)
logger.info("The tile {} {} is dropped".format(tile.tilecoord, tile.formated_metadata))
if hasattr(tile, 'metatile'):
tile.metatile.elapsed_togenerate -= 1
if tile.metatile.elapsed_togenerate == 0 and self.queue_store is not None:
self.queue_store.delete_one(tile.metatile) # pragma: no cover
elif self.queue_store is not None: # pragma: no cover
self.queue_store.delete_one(tile)
if self.count:
self.count()
return None
def popleft(self):
sqs_message = self.queue.read(self.visibility_timeout)
if sqs_message is None:
return self.on_empty()
z = sqs_message.get('z')
x = sqs_message.get('x')
y = sqs_message.get('y')
metadata = sqs_message.get('metadata', {})
return Tile(TileCoord(z, x, y), sqs_message=sqs_message, **metadata)
def list(self):
for zipinfo in self.zipfile.infolist():
try:
yield Tile(self.layout.tilecoord(zipinfo.filename), zipinfo=zipinfo)
except ValueError:
pass
def list(self):
top = getattr(self.tilelayout, "prefix", ".")
for dirpath, _, filenames in os.walk(top):
for filename in filenames:
path = os.path.join(dirpath, filename)
tilecoord = self.tilelayout.tilecoord(path)
if tilecoord:
yield Tile(tilecoord, path=path)
continue
if metatile.data is None:
yield Tile(
tilecoord,
metadata=metatile.metadata,
error="Metatile data is None",
metatile=metatile,
)
continue
x = self.border + (tilecoord.x - metatile.tilecoord.x) * self.tile_size
y = self.border + (tilecoord.y - metatile.tilecoord.y) * self.tile_size
image = metaimage.crop((x, y, x + self.tile_size, y + self.tile_size))
string_io = BytesIO()
image.save(string_io, FORMAT_BY_CONTENT_TYPE[self.format])
yield Tile(
tilecoord,
data=string_io.getvalue(),
content_type=self.format,
metadata=metatile.metadata,
metatile=metatile,
)
def _tilestream(tilecoords, default_metadata, all_dimensions):
for tilecoord in tilecoords:
for dimensions in all_dimensions:
metadata = {}
if default_metadata is not None:
metadata.update(default_metadata)
for k, v in dimensions.items():
metadata["dimension_" + k] = v
yield Tile(tilecoord, metadata=metadata)
def list(self):
# FIXME warn that this consumes lines
filename_re = re.compile(self.tile_layout.pattern)
for line in self.lines:
match = filename_re.search(line)
if match:
yield Tile(self.tile_layout.tilecoord(match.group()), line=line)
def list(self):
return (Tile(tilecoord) for tilecoord in self.tiles)
def get(tiles):
for metatile in tiles:
for tilecoord in metatile.tilecoord:
yield Tile(tilecoord)
gene.tilestream = MetaTileSplitter().get(gene.tilestream)