# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from forte.data.multi_pack import MultiPack
from forte.data.readers import MultiPackTerminalReader
from forte.pipeline import Pipeline
from forte.processors.ir import (
ElasticSearchQueryCreator, ElasticSearchProcessor, BertRerankingProcessor)
from ft.onto.base_ontology import Sentence
# NOTE(review): this script fragment looks whitespace-mangled and truncated —
# `os`, `yaml`, and `Config` are used but not imported in the visible code,
# and the final `for` loop at the bottom has no body. Confirm against the
# original example script before running.
if __name__ == "__main__":
# Load the YAML pipeline configuration that sits next to this script.
config_file = os.path.join(os.path.dirname(__file__), 'config.yml')
config = yaml.safe_load(open(config_file, "r"))
config = Config(config, default_hparams=None)
# Resolve the data directory relative to this file's location.
data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
config.data.relative_path)
# Build a MultiPack pipeline: read queries from the terminal, create an
# Elasticsearch query, retrieve passages, then re-rank them with BERT.
nlp: Pipeline[MultiPack] = Pipeline()
nlp.set_reader(reader=MultiPackTerminalReader(), config=config.reader)
# Indexing and Re-ranking
nlp.add(ElasticSearchQueryCreator(), config=config.query_creator)
nlp.add(ElasticSearchProcessor(), config=config.indexer)
nlp.add(BertRerankingProcessor(), config=config.reranker)
nlp.initialize()
# One pack key per retrieved passage; the retrieval size comes from config.
passage_keys = [f"passage_{i}" for i in range(config.query_creator.size)]
num_passages = len(passage_keys)
print(f"Retrieved {num_passages} passages.")
m_pack: MultiPack
for m_pack in nlp.process_dataset():
for p, passage in enumerate(passage_keys):
def create_pipeline(config_path: str) -> Pipeline[MultiPack]:
    """Build a multi-pack pipeline from a YAML configuration file.

    Args:
        config_path: Path to the pipeline configuration file.

    Returns:
        The pipeline initialized from ``config_path``.
    """
    pipeline: Pipeline[MultiPack] = Pipeline[MultiPack]()
    pipeline.init_from_config_path(config_path)
    print("\nFinished loading\n")
    return pipeline
# NOTE(review): this fragment is out of order — the first line is a dangling
# keyword argument from a truncated `parser.add_argument(...)` call,
# `parser.parse_args()` runs before the `--input_file` argument is added, and
# `parser`/`yaml`/`Config`/`EvalReader`/`MSMarcoEvaluator` are not defined in
# the visible code. Restore ordering from the original script before use.
help="Config YAML filepath")
args = parser.parse_args()
# loading config
config = yaml.safe_load(open(args.config_file, "r"))
config = Config(config, default_hparams=None)
# reading query input file
parser.add_argument("--input_file",
default="./data/collectionandqueries/query_doc_id.tsv",
help="Input query filepath")
input_file = config.evaluator.input_file
# initializing pipeline with processors
nlp: Pipeline = Pipeline[MultiPack]()
eval_reader = EvalReader()
nlp.set_reader(reader=eval_reader, config=config.reader)
nlp.add(ElasticSearchQueryCreator(), config=config.query_creator)
nlp.add(ElasticSearchProcessor(), config=config.indexer)
nlp.add(BertRerankingProcessor(), config=config.reranker)
nlp.add(MSMarcoEvaluator(), config=config.evaluator)
nlp.initialize()
# Process every query from the input file; report progress every 1000.
for idx, m_pack in enumerate(nlp.process_dataset(input_file)):
if (idx + 1) % 1000 == 0:
print(f"Processed {idx + 1} examples")
# Collect the final scores from the pipeline's evaluator.
scores = nlp.evaluate()
print(scores)
def new_pack(self, pack_name: Optional[str] = None) -> DataPack:
    """Create a fresh :class:`DataPack` via this reader's pack manager.

    Args:
        pack_name: Optional name to assign to the new pack.

    Returns:
        The newly constructed :class:`DataPack`.
    """
    pack = DataPack(self._pack_manager, pack_name)
    return pack
def set_text(self, pack: DataPack, text: str):
    r"""Record ``text`` on ``pack``, applying any configured replacement.

    The reader's ``text_replace_operation`` is forwarded to
    :meth:`DataPack.set_text` so the pre-processing step happens inside
    the pack itself.

    Args:
        pack: The :class:`DataPack` that receives the text.
        text: The original text to be recorded in this dataset.
    """
    replace_op = self.text_replace_operation
    pack.set_text(text, replace_func=replace_op)
class MultiPackReader(BaseReader[MultiPack], ABC):
    r"""Abstract base for data readers that yield :class:`MultiPack`
    instances. All multi-pack readers should inherit from this class.
    """

    @property
    def pack_type(self):
        # The kind of pack this reader produces.
        return MultiPack

    def new_pack(self, pack_name: Optional[str] = None) -> MultiPack:
        """Create a fresh :class:`MultiPack` via the reader's pack manager."""
        pack = MultiPack(self._pack_manager, pack_name)
        return pack
def new_pack(self, pack_name: Optional[str] = None) -> MultiPack:
    """Create a new multi pack through the current pack manager.

    Args:
        pack_name: Name to use for the pack; when omitted the pack
            name stays unset.

    Returns:
        The freshly created :class:`MultiPack`.
    """
    multi_pack = MultiPack(self._pack_manager, pack_name)
    return multi_pack
def new_pack(self, pack_name: Optional[str] = None) -> DataPack:
    """Create a new data pack through the current pack manager.

    Args:
        pack_name: Name to use for the pack; when omitted the pack
            name stays unset.

    Returns:
        The freshly created :class:`DataPack`.
    """
    data_pack = DataPack(self._pack_manager, pack_name)
    return data_pack
class MultiPackProcessor(BaseProcessor[MultiPack], ABC):
    r"""Base class for processors that operate on one :class:`MultiPack`
    at a time.
    """

    def _process(self, input_pack: MultiPack):
        # Concrete subclasses must supply the actual processing logic.
        raise NotImplementedError
# NOTE(review): this block is corrupted — the docstring opened below is never
# closed, and the trailing `input_pack, self.context_type, entry_type )` lines
# look like arguments spliced in from an unrelated call. Restore this method
# from the original source before use; only a leading note is added here.
def new_pack(self, pack_name: Optional[str] = None) -> MultiPack:
"""
Create a new multi pack using the current pack manager.
Args:
pack_name (str, Optional): The name to be used for the pack. If not
set, the pack name will remained unset.
Returns:
input_pack,
self.context_type,
entry_type
)
def new_pack(self, pack_name: Optional[str] = None) -> DataPack:
    """Build a fresh :class:`DataPack` owned by the current pack manager."""
    created = DataPack(self._pack_manager, pack_name)
    return created
class FixedSizeBatchProcessor(BatchProcessor, ABC):
    """Batch processor that groups data packs with a fixed-size batcher."""

    @staticmethod
    def define_batcher() -> ProcessingBatcher:
        # Fixed-size batching is the strategy for this processor family.
        return FixedSizeDataPackBatcher()
class MultiPackBatchProcessor(BaseBatchProcessor[MultiPack], ABC):
    r"""Generic batch processor specialized to :class:`MultiPack` input.

    Implemented batch processors of this kind consume multi packs.
    """

    def __init__(self):
        super().__init__()
        # Name of the member pack (inside the multi pack) that this
        # processor reads from; concrete subclasses are expected to set it.
        self.input_pack_name = None
        # TODO multi pack batcher need to be further studied.

    def prepare_coverage_index(self, input_pack: MultiPack):
        """Build a coverage index for every required entry type, if missing."""
        for entry_type in self.input_info.keys():
            member = input_pack.packs[self.input_pack_name]
            covered = member.index.coverage_index(
                self.context_type, entry_type)
            if covered is None:
                member.index.build_coverage_index(
                    member, self.context_type, entry_type)

    def new_pack(self, pack_name: Optional[str] = None) -> MultiPack:
        """Create a fresh :class:`MultiPack` via the current pack manager."""
        return MultiPack(self._pack_manager, pack_name)
def cast(self, pack: DataPack) -> MultiPack:
    """Auto-box a single :class:`DataPack` into a :class:`MultiPack`.

    Args:
        pack: The data pack to be boxed.

    Returns:
        A multi pack containing ``pack`` under the configured pack name.
    """
    boxed = MultiPack(self._pack_manager)
    boxed.add_pack_(pack, self.configs.pack_name)
    return boxed
# NOTE(review): this function appears to continue past the end of the visible
# chunk; only the visible portion is annotated, code left byte-identical.
def main(config: Config):
# Build the chat query pipeline and grab its shared resource store so
# conversation state can carry across turns.
query_pipeline = setup(config)
resource = query_pipeline.resource
m_pack: MultiPack
for m_pack in query_pipeline.process_dataset():
# update resource to be used in the next conversation
query_pack = m_pack.get_pack(config.translator.in_pack_name)
# Append the user's query pack to the running history (create the
# list on the first turn).
if resource.get("user_utterance"):
resource.get("user_utterance").append(query_pack)
else:
resource.update(user_utterance=[query_pack])
response_pack = m_pack.get_pack(config.back_translator.in_pack_name)
# Same bookkeeping for the bot's response pack.
if resource.get("bot_utterance"):
resource.get("bot_utterance").append(response_pack)
else:
resource.update(bot_utterance=[response_pack])
english_pack = m_pack.get_pack("pack")