# if `file_path` is a URL, redirect to the cache
file_path = cached_path(file_path)
with open(file_path, "r") as data_file:
    logger.info("Reading instances from lines in file at: %s", file_path)
    for line in data_file:
        line = line.strip("\n")
        # skip blank lines
        if not line:
            continue
        tokens_and_tags = [pair.rsplit(self._word_tag_delimiter, 1)
                           for pair in line.split(self._token_delimiter)]
        tokens = [Token(token) for token, tag in tokens_and_tags]
        tags = [tag for token, tag in tokens_and_tags]
        yield self.text_to_instance(tokens, tags)
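# Standalone sketch (not from the reader above) of the line-parsing step: each
# non-blank line is split into word/tag pairs. The "###" and space delimiters
# are illustrative stand-ins for self._word_tag_delimiter / self._token_delimiter.
line = "The###DT dog###NN barked###VBD"
tokens_and_tags = [pair.rsplit("###", 1) for pair in line.split(" ")]
words = [word for word, tag in tokens_and_tags]   # ['The', 'dog', 'barked']
tags = [tag for word, tag in tokens_and_tags]     # ['DT', 'NN', 'VBD']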
def split_token_by_delimiter(token: Token, delimiter: str) -> List[Token]:
    split_tokens = []
    char_offset = token.idx
    for sub_str in token.text.split(delimiter):
        if sub_str:
            split_tokens.append(Token(text=sub_str, idx=char_offset))
            char_offset += len(sub_str)
        split_tokens.append(Token(text=delimiter, idx=char_offset))
        char_offset += len(delimiter)
    if split_tokens:
        split_tokens.pop(-1)
        char_offset -= len(delimiter)
        return split_tokens
    else:
        return [token]
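# Illustrative call to split_token_by_delimiter, assuming AllenNLP's Token
# (surface text plus character offset `idx`); the delimiter is kept as its
# own token and the offsets stay aligned with the original text.
from allennlp.data.tokenizers import Token

parts = split_token_by_delimiter(Token(text="well-known", idx=10), "-")
print([(t.text, t.idx) for t in parts])
# expected: [('well', 10), ('-', 14), ('known', 15)]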
def _read(self, file_path: str):
    # if `file_path` is a URL, redirect to the cache
    file_path = cached_path(file_path)
    ontonotes_reader = Ontonotes()
    logger.info("Reading Fine-Grained NER instances from dataset files at: %s", file_path)
    if self._domain_identifier is not None:
        logger.info(
            "Filtering to only include file paths containing the %s domain",
            self._domain_identifier,
        )
    for sentence in self._ontonotes_subset(
        ontonotes_reader, file_path, self._domain_identifier
    ):
        tokens = [Token(_normalize_word(t)) for t in sentence.words]
        yield self.text_to_instance(tokens, sentence.named_entities)
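# Standalone sketch of the domain filtering logged above: only Ontonotes conll
# files whose path contains the domain identifier are kept. The paths and the
# "bc" domain below are purely illustrative.
file_paths = ["data/bc/cctv/0001.gold_conll", "data/nw/wsj/0002.gold_conll"]
domain_identifier = "bc"
kept = [path for path in file_paths if f"/{domain_identifier}/" in path]
# kept: ['data/bc/cctv/0001.gold_conll']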
def tokens_to_lm_instance(tokens: List[Token],
                          token_indexers: Dict[str, TokenIndexer]):
    tokens = list(tokens)  # shallow copy
    tokens.insert(0, Token(START_SYMBOL))
    tokens.append(Token(END_SYMBOL))
    input_field = TextField(tokens[:-1], token_indexers)
    output_field = TextField(tokens[1:], token_indexers)
    return Instance({'input_tokens': input_field,
                     'output_tokens': output_field})
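# Standalone sketch of the shift performed by tokens_to_lm_instance: after the
# boundary symbols are added, the model reads tokens[:-1] and predicts tokens[1:].
# "@start@" / "@end@" are the literal values of AllenNLP's START_SYMBOL / END_SYMBOL.
toks = ["@start@", "the", "cat", "sat", "@end@"]
input_tokens, output_tokens = toks[:-1], toks[1:]
# input_tokens:  ['@start@', 'the', 'cat', 'sat']
# output_tokens: ['the', 'cat', 'sat', '@end@']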
if targets and len(targets) != len(mask_positions):
    raise ValueError(f"Found {len(mask_positions)} mask tokens and {len(targets)} targets")
mask_position_field = ListField([IndexField(i, input_field) for i in mask_positions])
fields: Dict[str, Field] = {"tokens": input_field, "mask_positions": mask_position_field}
# TODO(mattg): there's a problem if the targets get split into multiple word pieces...
# (maksym-del): if we index a word that was not split into wordpieces with
# PretrainedTransformerTokenizer we will get an OOV token ID...
# Until this is handled, let's use the first wordpiece id for each token, since tokens
# should contain text_ids to be indexed with PretrainedTokenIndexer. It also requires a
# hack to avoid adding special tokens...
if targets is not None:
    # target_field = TextField([Token(target) for target in targets], self._token_indexers)
    first_wordpieces = [self._targets_tokenizer.tokenize(target)[0] for target in targets]
    target_tokens = []
    for wordpiece, target in zip(first_wordpieces, targets):
        target_tokens.append(
            Token(text=target, text_id=wordpiece.text_id, type_id=wordpiece.type_id)
        )
    fields["target_ids"] = TextField(target_tokens, self._token_indexers)
return Instance(fields)
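# Standalone sketch of the mask/target check above: exactly one target string is
# expected per mask position, otherwise the ValueError is raised. The sentence and
# the "[MASK]" marker here are purely illustrative.
sentence = ["The", "[MASK]", "sat", "on", "the", "[MASK]", "."]
mask_positions = [i for i, tok in enumerate(sentence) if tok == "[MASK]"]   # [1, 5]
targets = ["cat", "mat"]
assert len(targets) == len(mask_positions)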
    The words in the sentence to be encoded.
upos_tags : ``List[str]``, required.
    The universal dependencies POS tags for each word.
dependencies : ``List[Tuple[str, int]]``, optional (default = None)
    A list of (head tag, head index) tuples. Indices are 1 indexed,
    meaning an index of 0 corresponds to that word being the root of
    the dependency tree.

Returns
-------
An instance containing words, upos tags, dependency head tags and head
indices as fields. The language identifier is stored in the metadata.
"""
fields: Dict[str, Field] = {}
tokens = TextField([Token(w) for w in words], self._token_indexers)
fields["words"] = tokens
fields["pos_tags"] = SequenceLabelField(upos_tags, tokens, label_namespace="pos")
if dependencies is not None:
    # We don't want to expand the label namespace with an additional dummy token, so we'll
    # always give the 'ROOT_HEAD' token a label of 'root'.
    fields["head_tags"] = SequenceLabelField(
        [x[0] for x in dependencies], tokens, label_namespace="head_tags"
    )
    fields["head_indices"] = SequenceLabelField(
        [int(x[1]) for x in dependencies], tokens, label_namespace="head_index_tags"
    )
fields["metadata"] = MetadataField({"words": words, "pos": upos_tags, "lang": lang})
return Instance(fields)
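# Illustrative inputs for the dependency fields above: head indices are 1-indexed
# and 0 marks the root, as the docstring describes. The sentence and its analysis
# are made up for the example; these lists would be passed to text_to_instance
# together with the language identifier.
words = ["The", "dog", "barks"]
upos_tags = ["DET", "NOUN", "VERB"]
dependencies = [("det", 2), ("nsubj", 3), ("root", 0)]   # (head tag, head index) per word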
def text_to_instance(self,  # type: ignore
                     words: List[str],
                     lemmas: List[str] = None,
                     lemma_rules: List[str] = None,
                     upos_tags: List[str] = None,
                     xpos_tags: List[str] = None,
                     feats: List[str] = None,
                     dependencies: List[Tuple[str, int]] = None,
                     ids: List[str] = None,
                     multiword_ids: List[str] = None,
                     multiword_forms: List[str] = None) -> Instance:
    fields: Dict[str, Field] = {}
    tokens = TextField([Token(w) for w in words], self._token_indexers)
    fields["tokens"] = tokens
    names = ["upos", "xpos", "feats", "lemmas"]
    all_tags = [upos_tags, xpos_tags, feats, lemma_rules]
    for name, field in zip(names, all_tags):
        if field:
            fields[name] = SequenceLabelField(field, tokens, label_namespace=name)
    if dependencies is not None:
        # We don't want to expand the label namespace with an additional dummy token, so we'll
        # always give the 'ROOT_HEAD' token a label of 'root'.
        fields["head_tags"] = SequenceLabelField([x[0] for x in dependencies],
                                                 tokens,
                                                 label_namespace="head_tags")
        fields["head_indices"] = SequenceLabelField([int(x[1]) for x in dependencies],
                                                    tokens,
                                                    label_namespace="head_index_tags")
    with respect to the document text.
span_labels : ``SequenceLabelField``, optional
    The id of the cluster which each possible span belongs to, or -1 if it does
    not belong to a cluster. As these labels have variable length (it depends on
    how many spans we are considering), we represent this as a ``SequenceLabelField``
    with respect to the ``spans`` ``ListField``.
"""
flattened_sentences = [
    self._normalize_word(word) for sentence in sentences for word in sentence
]
metadata: Dict[str, Any] = {"original_text": flattened_sentences}
if gold_clusters is not None:
    metadata["clusters"] = gold_clusters
text_field = TextField([Token(word) for word in flattened_sentences], self._token_indexers)
cluster_dict = {}
if gold_clusters is not None:
    for cluster_id, cluster in enumerate(gold_clusters):
        for mention in cluster:
            cluster_dict[tuple(mention)] = cluster_id
spans: List[Field] = []
span_labels: Optional[List[int]] = [] if gold_clusters is not None else None
sentence_offset = 0
for sentence in sentences:
    for start, end in enumerate_spans(
        sentence, offset=sentence_offset, max_span_width=self._max_span_width
    ):
        if span_labels is not None:
            if (start, end) in cluster_dict:
                span_labels.append(cluster_dict[(start, end)])
            else:
                span_labels.append(-1)
def prepare_text(text, max_tokens):
    text = text.lower() if self._lowercase else text
    tokens = self._tokenizer.tokenize(text)[:max_tokens]
    tokens.insert(0, Token(START_SYMBOL))
    tokens.append(Token(END_SYMBOL))
    return tokens
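# Standalone sketch of prepare_text's effect, using a plain whitespace split as a
# stand-in for self._tokenizer and assuming lowercasing is enabled; "@start@" and
# "@end@" are the literal START_SYMBOL / END_SYMBOL values.
text, max_tokens = "A Long Document Title", 2
tokens = text.lower().split()[:max_tokens]
tokens = ["@start@"] + tokens + ["@end@"]
# tokens: ['@start@', 'a', 'long', '@end@']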
span_labels: Optional[List[int]] = [] if gold_clusters is not None else None
sentence_offset = 0
normal = []
for sentence in sentences:
    # enumerate the spans.
    for start, end in enumerate_spans(sentence,
                                      offset=sentence_offset,
                                      max_span_width=self._max_span_width):
        if span_labels is not None:
            if (start, end) in cluster_dict:
                span_labels.append(cluster_dict[(start, end)])
            else:
                span_labels.append(-1)
        # align the spans to the BERT tokenization
        normal.append((start, end))
        span_field = TextField([Token("[CLS]")] + [Token(word) for word in flattened_sentences] + [Token("[SEP]")],
                               self._token_indexers)
        # span field for Span, which needs to be a flattened sentence.
        spans.append(SpanField(start, end, span_field))
    sentence_offset += len(sentence)
span_field = ListField(spans)
metadata_field = MetadataField(metadata)
fields: Dict[str, Field] = {"text": text_field,
                            "spans": span_field,
                            "metadata": metadata_field}
if span_labels is not None:
    fields["span_labels"] = SequenceLabelField(span_labels, span_field)
return Instance(fields)
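# Standalone sketch of the span labelling above: every candidate span up to
# max_span_width is labelled with its gold cluster id, or -1 if it is not a
# gold mention. enumerate_spans_sketch is a simplified stand-in for allennlp's
# enumerate_spans (inclusive end indices).
def enumerate_spans_sketch(sentence, offset=0, max_span_width=3):
    for start in range(len(sentence)):
        for end in range(start, min(start + max_span_width, len(sentence))):
            yield start + offset, end + offset

sentence = ["The", "dog", "barks"]
cluster_dict = {(0, 1): 0}   # gold mention "The dog" -> cluster 0
span_labels = [cluster_dict.get(span, -1) for span in enumerate_spans_sketch(sentence)]
# span_labels: [-1, 0, -1, -1, -1, -1]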