Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# limitations under the License.
"""
Defines the Evaluator interface and related functions.
"""
from abc import abstractmethod
from typing import Any
from forte.data.base_pack import PackType
from forte.pipeline_component import PipelineComponent
__all__ = [
"Evaluator",
]
class Evaluator(PipelineComponent[PackType]):
    r"""Base class for evaluators.

    Subclasses are expected to override :meth:`consume_next`.
    """

    @abstractmethod
    def consume_next(self, pred_pack: PackType, ref_pack: PackType):
        r"""Compute evaluation results from a prediction pack and its
        reference pack.

        Args:
            pred_pack: The prediction datapack, which should hold the
                results predicted by the system.
            ref_pack: The reference datapack, which should hold the
                reference to score against.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        # Keeps the contract explicit even when the abstract-method check
        # is not enforced by the class hierarchy.
        raise NotImplementedError
from abc import ABC
from typing import Optional
from forte.data.base_pack import PackType
from forte.data.data_pack import DataPack
from forte.data.multi_pack import MultiPack
from forte.processors.base.base_processor import BaseProcessor
__all__ = [
"BasePackProcessor",
"PackProcessor",
"MultiPackProcessor"
]
class BasePackProcessor(BaseProcessor[PackType], ABC):
    r"""The base class of processors that process one pack at a time,
    sequentially.

    If you are looking for batching (which might happen across packs),
    refer to :class:`BaseBatchProcessor`.
    """
class PackProcessor(BaseProcessor[DataPack], ABC):
r"""The base class of processors that process one :class:`DataPack` each
time.
"""
def _process(self, input_pack: DataPack):
    r"""Process one :class:`DataPack`; this base version is a placeholder.

    Args:
        input_pack: The pack handed to the processor.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
def new_pack(self, pack_name: Optional[str] = None) -> DataPack:
def __init__(self,
             pipeline: "Pipeline",
             data_iter: Iterator[PackType],
             ):
    """Store the pipeline, the pack iterator and the bookkeeping state.

    Args:
        pipeline: The owning pipeline; its ``_proc_mgr`` attribute is kept
            as this object's process manager.
        data_iter: The iterator of packs to draw from.
    """
    # The flag starts cleared; presumably it is set once ``data_iter``
    # runs out -- confirm against the consuming code.
    self.__data_exhausted = False
    self.__data_iter: Iterator[PackType] = data_iter
    self.__pipeline = pipeline
    # Kept last: this is the only statement that touches the argument.
    self.__process_manager: ProcessManager = pipeline._proc_mgr
from forte.data.multi_pack import MultiPack
from forte.data.ontology.top import Annotation
from forte.data.types import DataRequest
from forte.process_manager import ProcessJobStatus
from forte.processors.base.base_processor import BaseProcessor
__all__ = [
"BaseBatchProcessor",
"BatchProcessor",
"MultiPackBatchProcessor",
"FixedSizeBatchProcessor",
"FixedSizeMultiPackBatchProcessor"
]
class BaseBatchProcessor(BaseProcessor[PackType], ABC):
r"""The base class of processors that process data in batch. This processor
enables easy data batching by analyzing the context and data objects. The
context defines the scope of analysis of a particular task. For example, in
dependency parsing, the context is normally a sentence, in entity
coreference, the context is normally a document. The processor will create
data batches relative to the context.
"""
def __init__(self):
    """Initialise the base processor, then populate the batching state.

    The context type, the input request and the batcher are each obtained
    from a method called on this instance, in that order.
    """
    super().__init__()

    # The call order below is preserved on purpose: the methods are
    # defined outside this block and may depend on one another.
    self.context_type: Type[Annotation] = self._define_context()
    self.input_info: DataRequest = self._define_input_info()
    self.batcher: ProcessingBatcher = self.define_batcher()

    self.use_coverage_index = False
def initialize(self, resources: Resources, configs: Optional[Config]):
"""
from typing import Generic, Optional, Union, Dict, Any
import yaml
from forte.common import ProcessorConfigError
from forte.common.configuration import Config
from forte.common.resources import Resources
from forte.data.base_pack import PackType, BasePack
from forte.data.ontology.core import Entry
from forte.pack_manager import PackManager
from forte.process_manager import ProcessManager
from forte.utils import get_full_module_name
class PipelineComponent(Generic[PackType]):
def __init__(self):
    """Create a component with no managers or resources attached."""
    # Nothing is attached at construction time; the managers are set
    # later through ``assign_manager``.
    self.resources: Optional[Resources] = None
    self._pack_manager: PackManager = None
    self._process_manager: ProcessManager = None

    # Configuration starts as a ``Config`` built from two empty dicts.
    self.configs: Config = Config({}, {})
def assign_manager(self,
                   process_manager: ProcessManager,
                   pack_manager: PackManager):
    """Attach the process manager and the pack manager to this component.

    Args:
        process_manager: Stored as ``self._process_manager``.
        pack_manager: Stored as ``self._pack_manager``.
    """
    self._process_manager, self._pack_manager = (
        process_manager, pack_manager)
def initialize(self, resources: Resources, configs: Config):
r"""The pipeline will call the initialize method at the start of a
processing. The processor and reader will be initialized with
``configs``, and register global resources into ``resource``. The
f'{desc_str}'
f'Automatically generated ontology {ontology_name}. '
f'Do not change manually.'
)
def class_name(clazz):
    """Return the dotted ``<module>.<name>`` path of ``clazz``.

    Args:
        clazz: Any object exposing ``__module__`` and ``__name__``.

    Returns:
        The two attributes joined by a single dot.
    """
    module, name = clazz.__module__, clazz.__name__
    return '.'.join([module, name])
# Dotted names of the classes listed in ``top.SinglePackEntries`` and
# ``top.MultiPackEntries`` respectively.
SINGLE_PACK_CLASSES = [class_name(clazz) for clazz in top.SinglePackEntries]
MULTI_PACK_CLASSES = [class_name(clazz) for clazz in top.MultiPackEntries]

major_version, minor_version = utils.get_python_version()
# Compare (major, minor) as a tuple: checking the two numbers separately
# would wrongly send a later major release with a small minor number
# (e.g. 4.0) down the "older than 3.7" branch.
if (major_version, minor_version) >= (3, 7):
    PACK_TYPE_CLASS_NAME = class_name(PackType)
else:
    # bug in python < 3.7
    # returns => typing.TypeVar('').__module__ == 'typing' (wrong)
    # instead of => typing.TypeVar('').__module__ == 'forte.data.base_pack'
    PACK_TYPE_CLASS_NAME = 'forte.data.base_pack.PackType'
def hardcoded_pack_map(clazz):
    """Return the dotted name of the pack class associated with ``clazz``.

    Args:
        clazz: The value looked up in ``SINGLE_PACK_CLASSES`` and
            ``MULTI_PACK_CLASSES`` (both hold dotted class names).

    Returns:
        The dotted name of ``DataPack`` when ``clazz`` is a single-pack
        entry, of ``MultiPack`` when it is a multi-pack entry, and
        ``PACK_TYPE_CLASS_NAME`` otherwise.
    """
    if clazz in SINGLE_PACK_CLASSES:
        return class_name(DataPack)
    if clazz in MULTI_PACK_CLASSES:
        return class_name(MultiPack)
    # When not found, return the default.
    return PACK_TYPE_CLASS_NAME
from forte.data.base_pack import PackType
from forte.data.data_pack import DataPack
from forte.data.multi_pack import MultiPack
from forte.data.types import DataRequest
from forte.data.data_utils_io import merge_batches, batch_instances
from forte.data.ontology.top import Annotation
from forte.data.ontology.core import Entry
__all__ = [
"ProcessingBatcher",
"FixedSizeDataPackBatcher",
"FixedSizeMultiPackProcessingBatcher",
]
class ProcessingBatcher(Generic[PackType]):
r"""This defines the basis interface of the Batcher used in
:class:`~forte.processors.batch_processor.BatchProcessor`. This Batcher
only batches data sequentially. It receives new packs dynamically and caches
the current packs so that the processors can pack prediction results into
the data packs.
Args:
cross_pack (bool, optional): whether to allow batches to span across
data packs when there is not enough data at the end.
"""
def __init__(self, cross_pack: bool = True):
    """Start the batcher with an empty batch and no cached packs.

    Args:
        cross_pack: Whether batches may go across data packs.
            NOTE(review): this argument is not stored in the lines
            visible here -- confirm against the full constructor.
    """
    # Packs cached for the batch being assembled.
    self.data_pack_pool: List[PackType] = []
    self.current_batch_sources: List[int] = []
    # The batch under construction.
    self.current_batch: Dict = {}
from forte.data.data_pack import DataPack
from forte.data.multi_pack import MultiPack
from forte.data.types import ReplaceOperationsType
from forte.pipeline_component import PipelineComponent
from forte.utils.utils import get_full_module_name
__all__ = [
"BaseReader",
"PackReader",
'MultiPackReader',
]
logger = logging.getLogger(__name__)
class BaseReader(PipelineComponent[PackType], ABC):
r"""The basic data reader class. To be inherited by all data readers.
Args:
from_cache (bool, optional): Decide whether to read from cache
if cache file exists. By default (``False``), the reader will
only read from the original file and use the cache file path
for caching, it will not read from the ``cache_directory``.
If ``True``, the reader will try to read a datapack from the
caching file.
cache_directory (str, optional): The base directory to place the
path of the caching files. Each collection is contained in one
cached file, under this directory. The cached location for each
collection is computed by :meth:`_cache_key_function`. Note:
A collection is the data returned by :meth:`_collect`.
append_to_cache (bool, optional): Decide whether to append to the
cache file if it already exists. By default (``False``), we
import itertools
from abc import abstractmethod, ABC
from typing import Any, Dict, Optional
from forte.data.base_pack import PackType
from forte.data.selector import DummySelector
from forte.pipeline_component import PipelineComponent
from forte.process_manager import ProcessJobStatus
__all__ = [
"BaseProcessor",
]
class BaseProcessor(PipelineComponent[PackType], ABC):
r"""Base class inherited by all kinds of processors such as trainer,
predictor and evaluator.
"""
def __init__(self):
    """Initialise the parent component and install a ``DummySelector``
    as this processor's ``selector``.
    """
    super().__init__()
    self.selector = DummySelector()
@abstractmethod
def new_pack(self, pack_name: Optional[str] = None) -> PackType:
"""
Create a new pack using the current pack manager.
Args:
pack_name (str, Optional): The name to be used for the pack. If not
set, the pack name will remain unset.