import os

import yaml
from termcolor import colored
from forte.common.configuration import Config
from forte.data.multi_pack import MultiPack
from forte.data.readers import MultiPackTerminalReader
from forte.pipeline import Pipeline
from forte.processors.ir import (
    ElasticSearchQueryCreator, ElasticSearchProcessor, BertRerankingProcessor)
from ft.onto.base_ontology import Sentence
if __name__ == "__main__":
config_file = os.path.join(os.path.dirname(__file__), 'config.yml')
config = yaml.safe_load(open(config_file, "r"))
config = Config(config, default_hparams=None)
data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
config.data.relative_path)
nlp: Pipeline[MultiPack] = Pipeline()
nlp.set_reader(reader=MultiPackTerminalReader(), config=config.reader)
# Indexing and Re-ranking
nlp.add(ElasticSearchQueryCreator(), config=config.query_creator)
nlp.add(ElasticSearchProcessor(), config=config.indexer)
nlp.add(BertRerankingProcessor(), config=config.reranker)
nlp.initialize()
passage_keys = [f"passage_{i}" for i in range(config.query_creator.size)]
num_passages = len(passage_keys)
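
    # The serving loop that follows is omitted above; a minimal sketch of it,
    # relying on the `passage_{i}` pack-naming convention set by the query
    # creator config (the 200-character preview is illustrative):
    for m_pack in nlp.process_dataset():
        for pack_name in passage_keys:
            pack = m_pack.get_pack(pack_name)
            print(colored(pack_name + ":", "green"), pack.text[:200], "\n")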

# Fragment from Forte's PipelineComponent: per-component state set up at
# construction time.
def __init__(self):
    self._process_manager: ProcessManager = None
    self._pack_manager: PackManager = None
    self.resources: Optional[Resources] = None
    self.configs: Config = Config({}, {})

@classmethod
def make_configs(cls, configs):
    # Merge user configs over the class defaults; a dict, a Config, or a
    # YAML file named by the reserved `config_path` key are all accepted.
    merged_configs: Dict = {}
    if configs is not None:
        if isinstance(configs, Config):
            configs = configs.todict()
        if "config_path" in configs and configs["config_path"] is not None:
            filebased_configs = yaml.safe_load(
                open(configs.pop("config_path")))
        else:
            filebased_configs = {}
        merged_configs.update(filebased_configs)
        merged_configs.update(configs)
    try:
        final_configs = Config(merged_configs, cls.default_configs())
    except ValueError as e:
        raise ProcessorConfigError(
            f'Configuration error for the processor '
            f'{get_full_module_name(cls)}.') from e
    return final_configs
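
# Usage sketch for make_configs (`MyProcessor` is a hypothetical subclass
# whose default_configs() defines a `size` key); merge order is class
# defaults, then the YAML file named by `config_path`, then the explicit dict:
configs = MyProcessor.make_configs({"size": 5})
assert configs.size == 5  # explicit value overrides the class default
configs = MyProcessor.make_configs({"config_path": "my_configs.yml"})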

import argparse
from pathlib import Path

import yaml
from forte.common.configuration import Config


def _get_default_config():
    return {"relative_path": "./data/collectionandqueries"}


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    # The data path can be read from the config file ...
    parser.add_argument("--config_file", default="./config.yml",
                        help="Config YAML filepath")
    # Parse only the flags known so far; `--path` is registered below once
    # its default is known from the config file.
    args, _ = parser.parse_known_args()
    data_config = yaml.safe_load(open(args.config_file, "r"))["data"]
    config = Config(data_config, default_hparams=_get_default_config())

    # ... but a path passed on the command line takes priority.
    default_data_path = config.relative_path
    parser.add_argument("--path", default=default_data_path,
                        help="Path to where data will be saved")
    args = parser.parse_args()

    resource_path = Path(args.path)
    # Create the path if it doesn't exist.
    resource_path.mkdir(parents=True, exist_ok=True)

    # Download data.
    url = "https://msmarco.blob.core.windows.net/msmarcoranking/" \
          "collectionandqueries.tar.gz"

import argparse

import yaml
from forte.common.configuration import Config
from forte.data.multi_pack import MultiPack
from forte.pipeline import Pipeline
from forte.processors.ir import (
    ElasticSearchQueryCreator, ElasticSearchProcessor, BertRerankingProcessor)

# EvalReader and MSMarcoEvaluator are defined alongside this example script
# in the Forte repository.

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config_file", default="./config.yml",
                        help="Config YAML filepath")
    parser.add_argument("--input_file",
                        default="./data/collectionandqueries/query_doc_id.tsv",
                        help="Input query filepath")
    args = parser.parse_args()

    # Loading config.
    config = yaml.safe_load(open(args.config_file, "r"))
    config = Config(config, default_hparams=None)

    # Reading the query input file.
    input_file = config.evaluator.input_file

    # Initializing the pipeline with processors.
    nlp: Pipeline[MultiPack] = Pipeline()
    eval_reader = EvalReader()
    nlp.set_reader(reader=eval_reader, config=config.reader)
    nlp.add(ElasticSearchQueryCreator(), config=config.query_creator)
    nlp.add(ElasticSearchProcessor(), config=config.indexer)
    nlp.add(BertRerankingProcessor(), config=config.reranker)
    nlp.add(MSMarcoEvaluator(), config=config.evaluator)
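
    # The run itself is truncated above; a hedged sketch of what follows,
    # assuming Pipeline.evaluate() yields (index, result) pairs from each
    # registered evaluator, as in the Forte release these examples track:
    nlp.initialize()
    for _ in nlp.process_dataset(input_file):
        pass  # MSMarcoEvaluator accumulates scores as packs flow past
    for i, score in nlp.evaluate():
        print(f"Evaluator {i}: {score}")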

def __init__(self, resource: Optional[Resources] = None):
    self._reader: BaseReader
    self._reader_config: Optional[Config]
    self._components: List[PipelineComponent] = []
    self._selectors: List[Selector] = []

    self._processors_index: Dict = {'': -1}
    self._configs: List[Optional[Config]] = []

    # This manager controls global pack access information.
    self._pack_manager: PackManager = PackManager()

    # Initialized in `initialize` because the number of processors is not
    # known until then.
    self._proc_mgr: ProcessManager = None  # type: ignore

    self.evaluator_indices: List[int] = []
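
# A simplified sketch (not the actual Forte implementation) of how `add`
# keeps the lists above in lockstep; DummySelector is Forte's pass-through
# selector, used when none is supplied:
def add(self, component, config=None, selector=None):
    self._processors_index[component.name] = len(self._components)
    self._components.append(component)
    self._configs.append(config)
    self._selectors.append(
        selector if selector is not None else DummySelector())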

# Import paths follow the Forte release these examples target.
import yaml
from termcolor import colored
from forte.common.configuration import Config
from forte.data.data_pack import DataPack
from forte.data.readers import PlainTextReader
from forte.pipeline import Pipeline
from forte.processors import CoNLLNERPredictor, SRLPredictor
from forte.processors.nltk_processors import (
    NLTKPOSTagger, NLTKSentenceSegmenter, NLTKWordTokenizer)
from ft.onto.base_ontology import Sentence


def main(dataset_dir: str):
    config = yaml.safe_load(open("config.yml", "r"))
    config = Config(config, default_hparams=None)

    pl = Pipeline[DataPack]()
    pl.set_reader(PlainTextReader())
    pl.add(NLTKSentenceSegmenter())
    pl.add(NLTKWordTokenizer())
    pl.add(NLTKPOSTagger())
    pl.add(CoNLLNERPredictor(), config=config.NER)
    pl.add(SRLPredictor(), config=config.SRL)
    pl.initialize()

    for pack in pl.process_dataset(dataset_dir):
        print(colored("Document", 'red'), pack.meta.pack_name)
        for sentence in pack.get(Sentence):
            sent_text = sentence.text
            print(colored("Sentence:", 'red'), sent_text, "\n")

# Fragment from the fine-tuned BERT model setup: locate the fine-tuned
# checkpoint and merge its hyperparameters with the class defaults.
self.cache_dir = os.path.join(os.path.dirname(__file__), rel_dir)
if self.pretrained_model_name is None or self.cache_dir is None:
    raise ValueError("Pre-trained model name and directory should "
                     "be defined in the fine-tuned BERT model.")
self.pretrained_model_dir = os.path.join(self.cache_dir,
                                         self.pretrained_model_name)

pretrained_model_hparams = self._transform_config(
    self.pretrained_model_name, self.pretrained_model_dir)

super_params = self.default_hparams()
if 'prefix' not in super_params:
    super_params["prefix"] = '_encoder.encoder.'
self._hparams = Config(pretrained_model_hparams, super_params)
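
# The layering this relies on, explicit hyperparameters winning over
# defaults, in isolation (a toy illustration; the key values are made up):
from forte.common.configuration import Config

defaults = {"prefix": "_encoder.encoder.", "hidden_size": 768}
overrides = {"hidden_size": 1024}
merged = Config(overrides, defaults)
assert merged.hidden_size == 1024            # explicit value wins
assert merged.prefix == "_encoder.encoder."  # default survives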

import argparse
import logging
import os

import yaml
from forte.common.configuration import Config
from forte.data.data_pack import DataPack
from forte.data.readers import MSMarcoPassageReader
from forte.pipeline import Pipeline
from forte.processors.ir import ElasticSearchIndexProcessor

logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--config_file", default="./config.yml",
                        help="Config YAML filepath")
    args = parser.parse_args()

    config = yaml.safe_load(open(args.config_file, "r"))
    config = Config(config, default_hparams=None)

    nlp: Pipeline[DataPack] = Pipeline()
    nlp.set_reader(MSMarcoPassageReader())
    nlp.add(ElasticSearchIndexProcessor(), config=config.create_index)
    nlp.initialize()

    data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             config.data.relative_path)

    # Log progress every 10,000 packs.
    for idx, pack in enumerate(nlp.process_dataset(data_path)):
        if (idx + 1) % 10000 == 0:
            print(f"Indexed {idx + 1} packs")