def process_one(self, *args, **kwargs) -> PackType:
    r"""Process one single data pack. This is done by only reading and
    processing the first pack in the reader.

    Args:
        kwargs: the information needed to load the data. For example, if
            :attr:`_reader` is :class:`StringReader`, this should contain a
            single piece of text in the form of a string variable. If
            :attr:`_reader` is a file reader, this can point to the file
            path.
    """
    if not self.initialized:
        raise ProcessFlowException(
            "Please call initialize before running the pipeline")

    first_pack = []
    for p in self._reader.iter(*args, **kwargs):
        first_pack.append(p)
        break

    if len(first_pack) == 1:
        results = list(self._process_packs(iter(first_pack)))
        return results[0]
    else:
        raise ValueError("Input data source contains no packs.")
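For context, a minimal usage sketch of process_one (hedged: the import paths below assume the Forte library and may differ across versions). With no components added, the pipeline simply yields the pack produced by the reader, as the empty-component branch further down shows.

# Usage sketch only; the import paths are assumptions about the Forte library.
from forte.pipeline import Pipeline
from forte.data.readers import StringReader

pipeline = Pipeline()
pipeline.set_reader(StringReader())
pipeline.initialize()

# process_one reads and processes only the first pack produced by the reader.
pack = pipeline.process_one("A single piece of input text.")
print(pack.text)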
# (Diagram for the earlier step truncated; its last row showed J1 as
#  UNPROCESSED in B2's queue.)
#
# After the 5th step (iteration),
#
#            batch_size = 2                               batch_size = 2
#  Reader -> B1 (BatchProcessor) -> P1 (PackProcessor) -> B2 (BatchProcessor)
#
#          |___________|        |_______________|        |_______________|
#          |___________|        |_______________|        |_______________|
#          |___________|        |_______________|        |_______________|  --> Yield J1.pack and J2.pack
#          |___________|        |_______________|        |_______________|
#          |___________|        |_______________|        |_______________|
if not self.initialized:
    raise ProcessFlowException(
        "Please call initialize before running the pipeline")

buffer = ProcessBuffer(self, data_iter)

if len(self.components) == 0:
    yield from data_iter
    # Write `return` here instead of using if..else to reduce indent.
    return

while not self._proc_mgr.exhausted():
    # The job has to be the first UNPROCESSED element in the buffer;
    # its status at this point is UNPROCESSED.
    unprocessed_job: ProcessJob = next(buffer)

    processor_index = self._proc_mgr.current_processor_index
    processor = self.components[processor_index]
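To make the dispatch pattern above easier to follow in isolation, here is a toy sketch (entirely hypothetical, not Forte's actual scheduler; batching, selectors, and job states are omitted): jobs wait in a buffer, each iteration advances one job by one component, and a pack is yielded as soon as the last component has processed it.

# Toy sketch of the dispatch idea only; all names here are made up.
from collections import deque
from typing import Callable, Iterable, Iterator, List


def toy_process(components: List[Callable[[str], str]],
                packs: Iterable[str]) -> Iterator[str]:
    # Each buffer entry is (pack, index of the next component to run).
    buffer = deque((pack, 0) for pack in packs)
    while buffer:                                  # stand-in for "not exhausted()"
        pack, component_index = buffer.popleft()   # next unprocessed job
        pack = components[component_index](pack)   # run the current component
        if component_index + 1 == len(components):
            yield pack                             # processed by every component
        else:
            buffer.append((pack, component_index + 1))


stages = [str.strip, str.upper]
print(list(toy_process(stages, ["  hello ", " world  "])))
# Output: ['HELLO', 'WORLD']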
we will have memory issues.

Args:
    pack_id: The pack id that points to the pack to be de-referenced.
"""
if pack_id not in self.pack_references:
    # This can happen when the instance is reset by the pipeline.
    return

if self.pack_references[pack_id] < 0:
    # I am not sure whether the reference count can ever be reduced too
    # far, but we put a check here just in case.
    raise ProcessFlowException(
        f"Pack reference count for pack [{pack_id}] is only "
        f"{self.pack_references[pack_id]}, which is invalid.")

# Reduce the reference count.
self.pack_references[pack_id] -= 1

# If the reference count reaches 0, then we can remove the pack from
# the pool and allow Python to garbage collect it.
if self.pack_references[pack_id] == 0:
    self.pack_pool.pop(pack_id)
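The counting scheme is easier to see in a self-contained sketch (the class and method names below are hypothetical, not Forte's API): a pack stays in the pool while its reference count is positive and is released once the count drops to zero.

# Self-contained illustration of the reference-counting idea; the names
# here are hypothetical.
from typing import Any, Dict


class ToyPackPool:
    def __init__(self):
        self.pack_pool: Dict[int, Any] = {}
        self.pack_references: Dict[int, int] = {}

    def reference_pack(self, pack_id: int, pack: Any):
        # Register the pack, or bump its count if it is already pooled.
        self.pack_pool.setdefault(pack_id, pack)
        self.pack_references[pack_id] = self.pack_references.get(pack_id, 0) + 1

    def dereference_pack(self, pack_id: int):
        # Drop one reference; release the pack once nobody holds it.
        self.pack_references[pack_id] -= 1
        if self.pack_references[pack_id] == 0:
            self.pack_pool.pop(pack_id)


pool = ToyPackPool()
pool.reference_pack(1, "pack-1")
pool.reference_pack(1, "pack-1")   # two consumers hold pack 1
pool.dereference_pack(1)           # still held by one consumer
pool.dereference_pack(1)           # count hits 0: released from the pool
assert 1 not in pool.pack_pool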
def add(self, component: PipelineComponent,
        config: Optional[Union[Config, Dict[str, Any]]] = None,
        selector: Optional[Selector] = None):
    r"""Add a component to the pipeline, together with an optional
    configuration and an optional selector that decides which packs
    the component will see.
    """
    self._processors_index[component.name] = len(self.components)

    if isinstance(component, BaseReader):
        raise ProcessFlowException(
            "Readers need to be set via set_reader()")

    if isinstance(component, Evaluator):
        # This will ask the job to keep a copy of the gold standard.
        self.evaluator_indices.append(len(self.components))

    component.assign_manager(self._proc_mgr, self._pack_manager)
    self._components.append(component)
    self.processor_configs.append(component.make_configs(config))

    if selector is None:
        self._selectors.append(DummySelector())
    else:
        self._selectors.append(selector)
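A brief usage sketch of add (hedged: DemoProcessor and its config key are invented for illustration, and the import paths assume the Forte library). A plain dict is accepted for config and normalized by make_configs; when selector is None, a DummySelector is appended, which simply hands every incoming pack to the component.

# Usage sketch only; DemoProcessor and its config key are hypothetical.
from forte.data.data_pack import DataPack
from forte.data.readers import StringReader
from forte.pipeline import Pipeline
from forte.processors.base import PackProcessor


class DemoProcessor(PackProcessor):
    @classmethod
    def default_configs(cls):
        # Assumes default_configs() returns a plain dict, as in recent
        # Forte versions.
        configs = super().default_configs()
        configs.update({"greeting": "hello"})  # made-up config key
        return configs

    def _process(self, input_pack: DataPack):
        # A real processor would add annotations to `input_pack` here.
        pass


pipeline = Pipeline()
pipeline.set_reader(StringReader())
pipeline.add(DemoProcessor(), config={"greeting": "hi"})
pipeline.initialize()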