Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_cache_location(self, collection: Any) -> str:
r"""Gets the path to the cache file for a collection.
Args:
collection: information to compute cache key.
Returns (Path): file path to the cache file for a Pack.
"""
# pylint: disable=assignment-from-none
file_path = self._cache_key_function(collection)
if file_path is None:
raise ProcessExecutionException(
"Cache key is None. You probably set `from_cache` to true but "
"fail to implement the _cache_key_function")
return os.path.join(str(self._cache_directory), file_path)
try:
if isinstance(processor, Caster):
# Replacing the job pack with the casted version.
unprocessed_job.alter_pack(processor.cast(pack))
elif isinstance(processor, BaseProcessor):
processor.process(pack)
elif isinstance(processor, Evaluator):
processor.consume_next(
pack, self._predict_to_gold[unprocessed_job.id]
)
# After the component action, make sure the entry is
# added into the index.
pack.add_all_remaining_entries()
except ValueError as e:
raise ProcessExecutionException(
f'Exception occurred when running '
f'{processor.name}') from e
# Then, based on component type, handle the queue.
if isinstance(processor, BaseBatchProcessor):
index = unprocessed_queue_indices[current_queue_index]
# check status of all the jobs up to "index"
for i, job_i in enumerate(
itertools.islice(current_queue, 0,
index + 1)):
if job_i.status == ProcessJobStatus.PROCESSED:
processed_queue_indices[
current_queue_index] = i
def parse_pack(self, collection: Any) -> Iterator[PackType]:
r"""Calls :meth:`_parse_pack` to create packs from the collection.
This internally setup the component meta data. Users should implement
the :meth:`_parse_pack` method.
"""
if collection is None:
raise ProcessExecutionException(
"Got None collection, cannot parse as data pack.")
for p in self._parse_pack(collection):
p.add_all_remaining_entries(self.name)
yield p
def _parse_pack(self, data_source: str) -> Iterator[DataPack]:
if data_source is None:
raise ProcessExecutionException(
"Data source is None, cannot deserialize.")
# pack: DataPack = DataPack.deserialize(data_source)
pack: DataPack = deserialize(self._pack_manager, data_source)
if pack is None:
raise ProcessExecutionException(
f"Cannot recover pack from the following data source: \n"
f"{data_source}")
yield pack
def _parse_pack(self, data_source: str) -> Iterator[DataPack]:
if data_source is None:
raise ProcessExecutionException(
"Data source is None, cannot deserialize.")
# pack: DataPack = DataPack.deserialize(data_source)
pack: DataPack = deserialize(self._pack_manager, data_source)
if pack is None:
raise ProcessExecutionException(
f"Cannot recover pack from the following data source: \n"
f"{data_source}")
yield pack