def from_bytes(self, bytes_data):
    """Deserialize the DocBin's annotations from a bytestring.

    bytes_data (bytes): The data to load from.
    RETURNS (DocBin): The loaded DocBin.

    DOCS: https://spacy.io/api/docbin#from_bytes
    """
    msg = srsly.msgpack_loads(zlib.decompress(bytes_data))
    self.attrs = msg["attrs"]
    self.strings = set(msg["strings"])
    lengths = numpy.frombuffer(msg["lengths"], dtype="int32")
    flat_spaces = numpy.frombuffer(msg["spaces"], dtype=bool)
    flat_tokens = numpy.frombuffer(msg["tokens"], dtype="uint64")
    shape = (flat_tokens.size // len(self.attrs), len(self.attrs))
    flat_tokens = flat_tokens.reshape(shape)
    flat_spaces = flat_spaces.reshape((flat_spaces.size, 1))
    self.tokens = NumpyOps().unflatten(flat_tokens, lengths)
    self.spaces = NumpyOps().unflatten(flat_spaces, lengths)
    self.cats = msg["cats"]
    if self.store_user_data and "user_data" in msg:
        self.user_data = list(msg["user_data"])
    for tokens in self.tokens:
        assert len(tokens.shape) == 2, tokens.shape  # this should never happen
    return self

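# A minimal usage sketch (not part of the original listing) showing how the
# from_bytes method above pairs with DocBin.to_bytes for a round trip. It
# assumes spaCy's public DocBin API; names like nlp and doc_bin are illustrative.
import spacy
from spacy.tokens import DocBin

nlp = spacy.blank("en")
doc_bin = DocBin(attrs=["ORTH", "TAG"], store_user_data=True)
doc_bin.add(nlp("Hello world"))
bytes_data = doc_bin.to_bytes()             # zlib-compressed msgpack payload
restored = DocBin().from_bytes(bytes_data)  # exercises the method above
docs = list(restored.get_docs(nlp.vocab))
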
def test_entity_ruler_existing_bytes_old_format_safe(patterns, en_vocab):
    nlp = Language(vocab=en_vocab)
    ruler = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    bytes_old_style = srsly.msgpack_dumps(ruler.patterns)
    new_ruler = EntityRuler(nlp)
    new_ruler = new_ruler.from_bytes(bytes_old_style)
    assert len(new_ruler) == len(ruler)
    for pattern in ruler.patterns:
        assert pattern in new_ruler.patterns
    assert new_ruler.overwrite is not ruler.overwrite

def test_roundtrip_docs_to_json():
    text = "I flew to Silicon Valley via London."
    cats = {"TRAVEL": 1.0, "BAKING": 0.0}
    nlp = English()
    doc = nlp(text)
    doc.cats = cats
    doc[0].is_sent_start = True
    for i in range(1, len(doc)):
        doc[i].is_sent_start = False
    with make_tempdir() as tmpdir:
        json_file = tmpdir / "roundtrip.json"
        srsly.write_json(json_file, [docs_to_json(doc)])
        goldcorpus = GoldCorpus(str(json_file), str(json_file))
        reloaded_doc, goldparse = next(goldcorpus.train_docs(nlp))
        assert len(doc) == goldcorpus.count_train()
        assert text == reloaded_doc.text
        assert "TRAVEL" in goldparse.cats
        assert "BAKING" in goldparse.cats
        assert cats["TRAVEL"] == goldparse.cats["TRAVEL"]
        assert cats["BAKING"] == goldparse.cats["BAKING"]

def test_entity_ruler_from_disk_old_format_safe(patterns, en_vocab):
    nlp = Language(vocab=en_vocab)
    ruler = EntityRuler(nlp, patterns=patterns, overwrite_ents=True)
    with make_tempdir() as tmpdir:
        out_file = tmpdir / "entity_ruler"
        srsly.write_jsonl(out_file.with_suffix(".jsonl"), ruler.patterns)
        new_ruler = EntityRuler(nlp).from_disk(out_file)
        for pattern in ruler.patterns:
            assert pattern in new_ruler.patterns
        assert len(new_ruler) == len(ruler)
        assert new_ruler.overwrite is not ruler.overwrite

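# A small sketch (an assumption, not from the original listing) of the read
# side of the JSONL round trip used in the test above: srsly.read_jsonl
# yields one parsed dict per line written by srsly.write_jsonl.
import srsly

patterns = [{"label": "ORG", "pattern": "Apple"}, {"label": "GPE", "pattern": "London"}]
srsly.write_jsonl("patterns.jsonl", patterns)
assert list(srsly.read_jsonl("patterns.jsonl")) == patterns
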
def test_pickle_string_store(text1, text2):
    stringstore = StringStore()
    store1 = stringstore[text1]
    store2 = stringstore[text2]
    data = srsly.pickle_dumps(stringstore, protocol=-1)
    unpickled = srsly.pickle_loads(data)
    assert unpickled[text1] == store1
    assert unpickled[text2] == store2
    assert len(stringstore) == len(unpickled)

def test_pickle_vocab(text1, text2):
    vocab = Vocab(lex_attr_getters={int(NORM): lambda string: string[:-1]})
    vocab.set_vector("dog", numpy.ones((5,), dtype="f"))
    lex1 = vocab[text1]
    lex2 = vocab[text2]
    assert lex1.norm_ == text1[:-1]
    assert lex2.norm_ == text2[:-1]
    data = srsly.pickle_dumps(vocab)
    unpickled = srsly.pickle_loads(data)
    assert unpickled[text1].orth == lex1.orth
    assert unpickled[text2].orth == lex2.orth
    assert unpickled[text1].norm == lex1.norm
    assert unpickled[text2].norm == lex2.norm
    assert unpickled[text1].norm != unpickled[text2].norm
    assert unpickled.vectors is not None
    assert list(vocab["dog"].vector) == [1.0, 1.0, 1.0, 1.0, 1.0]

def from_bytes(bytes_data, setters, exclude):
    msg = srsly.msgpack_loads(bytes_data)
    for key, setter in setters.items():
        # Split to support file names like meta.json
        if key.split(".")[0] not in exclude and key in msg:
            setter(msg[key])
    return msg

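# Hedged sketch of the matching serialize side: the from_bytes helper above is
# normally paired with a to_bytes helper that calls getter functions and
# msgpack-encodes the result. The exact signature is an assumption inferred
# from from_bytes, not copied from the original listing.
def to_bytes(getters, exclude):
    msg = {}
    for key, getter in getters.items():
        # Split to support file names like meta.json
        if key.split(".")[0] not in exclude:
            msg[key] = getter()
    return srsly.msgpack_dumps(msg)
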
def main(path):
    reddit = Reddit(path)
    for comment in reddit:
        print(srsly.json_dumps(comment))
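
# Hypothetical entry point (the original listing ends at main): one way to wire
# the script up with the standard library's argparse so the Reddit comments are
# streamed to stdout as JSON lines. The argument name "path" is illustrative.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Print Reddit comments as JSON lines")
    parser.add_argument("path", help="Path to the Reddit comments archive")
    args = parser.parse_args()
    main(args.path)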