def test_zips_multiple_files(self):
    fp = self.fp
    lines = self.lines
    data = TextDataset([fp.name, fp.name], mode='zip')
    for x, y in zip(data, lines):
        self.assertTupleEqual(x, (y, y))
    for j, y in enumerate(lines):
        self.assertTupleEqual(data[j], (y, y))
    self.assertEqual(len(data), len(lines))
    self.assertEqual(data._length, len(lines))
    self.assertIsInstance(data._dataset, lineflow.core.ZipDataset)
    self.assertIsInstance(data.map(lambda x: x)._dataset, TextDataset)
def test_text(self):
    fp = self.fp
    lines = self.lines
    data = TextDataset(fp.name)
    self.assertEqual(data._length, None)
    for x, y in zip(data, lines):
        self.assertEqual(x, y)
    for i, y in enumerate(lines):
        self.assertEqual(data[i], y)
    self.assertEqual(len(data), len(lines))
    self.assertEqual(data._length, len(lines))
    # check that the length is cached
    self.assertEqual(len(data), len(lines))
    self.assertIsInstance(data._dataset, easyfile.TextFile)
    data = data.map(str.split)
def test_raises_value_error_with_invalid_mode(self):
    with self.assertRaises(ValueError):
        TextDataset([self.fp.name, self.fp.name], mode='invalid_mode')
def test_concats_multiple_files(self):
    fp = self.fp
    lines = self.lines
    data = TextDataset([fp.name, fp.name], mode='concat')
    for x, y in zip(data, lines + lines):
        self.assertEqual(x, y)
    for j, y in enumerate(lines + lines):
        self.assertEqual(data[j], y)
    self.assertEqual(len(data), len(lines) * 2)
    self.assertEqual(data._length, len(lines) * 2)
    self.assertEqual(data[len(data) - 1], lines[-1])
    self.assertIsInstance(data._dataset, lineflow.core.ConcatDataset)
    self.assertIsInstance(data.map(lambda x: x)._dataset, TextDataset)
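
The four test methods above belong to a unittest.TestCase and assume a fixture providing self.fp (a text file on disk, accessed via fp.name) and self.lines (its expected lines). Below is a minimal sketch of that scaffolding built around a temporary file; the class name, sample lines, and tempfile handling are assumptions for illustration, not the project's actual fixture.

import os
import tempfile
import unittest

import easyfile
import lineflow
from lineflow import TextDataset


class TextDatasetTestCase(unittest.TestCase):
    def setUp(self):
        # A throwaway text file whose lines the assertions compare against.
        self.lines = ['line 0', 'line 1', 'line 2']
        fp = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
        fp.write('\n'.join(self.lines))
        fp.close()
        self.fp = fp

    def tearDown(self):
        os.remove(self.fp.name)

Indented under such a class, the methods above can be run with python -m unittest.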
if __name__ == '__main__':
    nlp = spacy.load('en_core_web_sm',
                     disable=['vectors', 'textcat', 'tagger', 'ner'])
    # training data
    if not osp.exists(PREPROCESSED[0]):
        ds_train = TextDataset('./train-v1.1.jsonl').map(json.loads)
        ds_train = ds_train.map(preprocess(nlp)).save(PREPROCESSED[0])
    else:
        ds_train = TextDataset.load(PREPROCESSED[0])
    # dev data
    if not osp.exists(PREPROCESSED[1]):
        ds_dev = TextDataset('./dev-v1.1.jsonl').map(json.loads)
        ds_dev = ds_dev.map(preprocess(nlp)).save(PREPROCESSED[1])
    else:
        ds_dev = TextDataset.load(PREPROCESSED[1])
    # peek at the first item
    print(ds_train.first())
    print(ds_dev.first())
    # random access is supported
    print(ds_train[100])
    print(ds_dev[100])
    token_to_index, words = build_vocab(Concat()(ds_train, ds_dev))
    # postprocess the training data
    if not osp.exists(POSTPROCESSED[0]):
        ds_train = ds_train \
            .map(postprocess_train(token_to_index)) \
            .save(POSTPROCESSED[0])
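
The __main__ block above is an excerpt from a SQuAD preprocessing script and refers to several names defined elsewhere in that script (PREPROCESSED, POSTPROCESSED, preprocess, build_vocab, Concat, postprocess_train). The following is a rough sketch of what the first few might look like; the imports, cache paths, record fields, and the Concat helper are illustrative assumptions, not the example's actual definitions.

import json
import os.path as osp
from collections import Counter

import spacy
from lineflow import TextDataset

# Illustrative cache locations for the intermediate datasets.
PREPROCESSED = ('train.preprocessed', 'dev.preprocessed')
POSTPROCESSED = ('train.postprocessed', 'dev.postprocessed')


def preprocess(nlp):
    # Tokenize each record with spaCy; assumes each JSON line carries
    # 'question', 'context', and 'id' fields (answer spans omitted here).
    def f(x):
        return {
            'question': [token.text for token in nlp(x['question'])],
            'context': [token.text for token in nlp(x['context'])],
            'id': x['id'],
        }
    return f


class Concat:
    # Chains several datasets into one iterable for vocabulary building.
    def __call__(self, *datasets):
        for dataset in datasets:
            yield from dataset


def build_vocab(examples):
    # Count every question/context token and assign each one an integer index.
    counter = Counter()
    for x in examples:
        counter.update(x['question'])
        counter.update(x['context'])
    words = ['<pad>', '<unk>'] + [token for token, _ in counter.most_common()]
    token_to_index = {token: index for index, token in enumerate(words)}
    return token_to_index, words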
def postprocess_dev(token_to_index):
    def f(x):
        question = [token_to_index[token] for token in x['question']]
        context = [token_to_index[token] for token in x['context']]
        return question, context, x['context'], x['id']
    return f
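
postprocess_train, which the __main__ block applies to the training split, is not part of this excerpt. By analogy with postprocess_dev it would map question and context tokens to vocabulary indices; the answer-span fields used below ('answer_start', 'answer_end') are pure assumptions for illustration.

def postprocess_train(token_to_index):
    def f(x):
        question = [token_to_index[token] for token in x['question']]
        context = [token_to_index[token] for token in x['context']]
        # Assumed: preprocessing has already attached token-level answer spans.
        return question, context, x['answer_start'], x['answer_end']
    return f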