# `f` is a mock injected by a patch decorator that the snippet omits (see
# the harness sketch below); cached_download should propagate the OSError
# raised while creating the cache directory.
def test_fails_to_make_directory(self, f):
    f.side_effect = OSError()
    with self.assertRaises(OSError):
        download.cached_download('https://example.com')

# A cache hit: when the target file already exists, cached_download returns
# its path without downloading; the cache key is the MD5 digest of the URL.
def test_cache_exists(self, f):
    f.return_value = True
    url = 'https://example.com'
    path = download.cached_download(url)
    self.assertEqual(
        path,
        f'{self.temp_dir}/_dl_cache/{hashlib.md5(url.encode("utf-8")).hexdigest()}')

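# Both tests above receive `f` from a patch decorator that the snippets
# omit. A minimal harness sketch, assuming unittest.mock and a Chainer-style
# set_cache_root setter; the patch targets and the import path of the module
# under test are assumptions, not confirmed API:
import tempfile
import unittest
from unittest import mock

import download  # module under test; import path assumed


class CachedDownloadTest(unittest.TestCase):
    def setUp(self):
        # Point the cache root at a throwaway directory so the
        # f'{self.temp_dir}/_dl_cache/...' assertion can hold.
        self.temp_dir = tempfile.mkdtemp()
        download.set_cache_root(self.temp_dir)  # assumed setter

    @mock.patch('os.makedirs')  # assumed patch target
    def test_fails_to_make_directory(self, f):
        ...  # body as above

    @mock.patch('os.path.exists')  # assumed patch target
    def test_cache_exists(self, f):
        ...  # body as above
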
# Creator for the aclImdb (IMDB reviews) archive. `url` and `root` are
# closure variables supplied by the enclosing dataset loader; requires
# io, os, pickle, and tarfile.
def creator(path):
    archive_path = download.cached_download(url)
    with tarfile.open(archive_path, 'r') as archive:
        print(f'Extracting to {root}...')
        archive.extractall(root)
    extracted_path = os.path.join(root, 'aclImdb')
    dataset = {}
    for split in ('train', 'test'):
        pos_path = os.path.join(extracted_path, split, 'pos')
        neg_path = os.path.join(extracted_path, split, 'neg')
        # Collect the .txt review files from both polarity directories.
        dataset[split] = [x.path for x in os.scandir(pos_path)
                          if x.is_file() and x.name.endswith('.txt')] + \
                         [x.path for x in os.scandir(neg_path)
                          if x.is_file() and x.name.endswith('.txt')]
    with io.open(path, 'wb') as f:
        pickle.dump(dataset, f)
    return dataset

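# Every creator here follows one contract: build a dict keyed by split name,
# pickle it to `path`, and return it. A minimal sketch of the consuming side,
# assuming a Chainer-style cache_or_load_file(path, creator, loader) helper
# in the same download module; that helper and the get_imdb entry point are
# assumptions, not confirmed API:
import io
import os
import pickle

import download  # import path assumed


def loader(path):
    # Counterpart to creator: read back the pickled split -> data dict.
    with io.open(path, 'rb') as f:
        return pickle.load(f)


def get_imdb(url, root):  # hypothetical entry point wiring up the closure
    def creator(path):
        ...  # body as above, closing over `url` and `root`

    pkl_path = os.path.join(root, 'imdb.pkl')
    # Runs creator once, then reuses the pickle on later calls.
    return download.cache_or_load_file(pkl_path, creator, loader)
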
# Creator for a summarization-style corpus with parallel source/target files
# per split; the dev split is stored on disk as `val`. `url` and `root` are
# closure variables; easyfile.TextFile wraps each file for lazy line access.
def creator(path):
    archive_path = download.cached_download(url)
    target_path = os.path.join(root, 'raw')
    with tarfile.open(archive_path, 'r') as archive:
        print(f'Extracting to {target_path}')
        archive.extractall(target_path)
    dataset = {}
    for split in ('train', 'dev', 'test'):
        src_path = f'{split if split != "dev" else "val"}.txt.src'
        tgt_path = f'{split if split != "dev" else "val"}.txt.tgt.tagged'
        dataset[split] = (
            easyfile.TextFile(os.path.join(target_path, src_path)),
            easyfile.TextFile(os.path.join(target_path, tgt_path))
        )
    with io.open(path, 'wb') as f:
        pickle.dump(dataset, f)
    return dataset

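# Unlike the list-based creators in this file, each split above stores two
# easyfile.TextFile handles, which index lines lazily from disk instead of
# loading whole files into memory. A small sketch with placeholder paths
# (len() and integer indexing are easyfile behaviors assumed here):
import easyfile

src = easyfile.TextFile('raw/train.txt.src')          # placeholder path
tgt = easyfile.TextFile('raw/train.txt.tgt.tagged')   # placeholder path

print(len(src))        # line count without reading the file eagerly
print(src[0], tgt[0])  # random access to the first source/target pair
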
def creator(path):
    dataset = {}
    fieldnames = ('quality', 'id1', 'id2', 'string1', 'string2')
    for split in ('train', 'test'):
        data_path = download.cached_download(url.format(split))
        with io.open(data_path, 'r', encoding='utf-8') as f:
            f.readline()  # skip header
            reader = csv.DictReader(f, delimiter='\t', fieldnames=fieldnames)
            dataset[split] = [dict(row) for row in reader]
    with io.open(path, 'wb') as f:
        pickle.dump(dataset, f)
    return dataset

# Creator for a WikiText-style zip archive: reads each split's token file
# straight out of the archive without extracting to disk. `name` is the
# archive's top-level directory, supplied by the enclosing loader.
def list_creator(path):
    archive_path = download.cached_download(url)
    with zipfile.ZipFile(archive_path, 'r') as archive:
        dataset = {}
        path2key = {f'{name}/wiki.train.tokens': 'train',
                    f'{name}/wiki.valid.tokens': 'dev',
                    f'{name}/wiki.test.tokens': 'test'}
        for p, key in path2key.items():
            print(f'Extracting {p}...')
            with archive.open(p) as f:
                lines = [line.decode('utf-8').rstrip(os.linesep) for line in f]
            dataset[key] = lines
    with io.open(path, 'wb') as f:
        pickle.dump(dataset, f)
    return dataset

def creator(path):
    dataset = {}
    for split in ('train', 'dev', 'test'):
        en_path = download.cached_download(en_url.format(split))
        ja_path = download.cached_download(ja_url.format(split))
        with io.open(en_path, 'rt') as en, io.open(ja_path, 'rt') as ja:
            dataset[split] = [(x.rstrip(os.linesep), y.rstrip(os.linesep))
                              for x, y in zip(en, ja)]
    with io.open(path, 'wb') as f:
        pickle.dump(dataset, f)
    return dataset

# Creator for a multiple-choice QA corpus in JSON Lines format; the test
# split ships without answer keys, so answer_key is left empty there.
def creator(path):
    train_path = download.cached_download(train_url)
    dev_path = download.cached_download(dev_url)
    test_path = download.cached_download(test_url)
    dataset = {}
    for split in ("train", "dev", "test"):
        data_path = {"train": train_path, "dev": dev_path, "test": test_path}[split]
        with io.open(data_path, "rt", encoding="utf-8") as f:
            data = [json.loads(line) for line in f]
        temp = []
        for x in data:
            answer_key = x["answerKey"] if split != "test" else ""
            options = {choice["label"]: choice["text"] for choice in x["question"]["choices"]}
            stem = x["question"]["stem"]
            temp.append({
                "id": x["id"],
                "answer_key": answer_key,
                "options": options,
                "stem": stem,
            })
        dataset[split] = temp
    with io.open(path, "wb") as f:
        pickle.dump(dataset, f)
    return dataset
