def test_read_write_sparse_csr_compressed(self, tmpdir):
expected = sp.csr_matrix(
(
np.array([1, 2, 3, 4, 5, 6]),
(np.array([0, 0, 1, 2, 2, 2]), np.array([0, 2, 2, 0, 1, 2])),
),
shape=(3, 3),
)
filepath = str(tmpdir.join("test_read_write_sparse_matrix_csr_compressed.npz"))
io.write_sparse_matrix(expected, filepath, compressed=True)
observed = io.read_sparse_matrix(filepath, kind="csr")
assert abs(observed - expected).nnz == 0
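# A standalone sketch of the same round trip outside pytest, assuming
# textacy.io exposes write_sparse_matrix()/read_sparse_matrix() with the
# (filepath, compressed=...) / (filepath, kind=...) signatures used above.
import numpy as np
import scipy.sparse as sp
import textacy.io as tio

mat = sp.csr_matrix(
    (np.array([1, 2, 3]), (np.array([0, 1, 2]), np.array([0, 1, 2]))),
    shape=(3, 3),
)
tio.write_sparse_matrix(mat, "/tmp/example_matrix.npz", compressed=True)
roundtripped = tio.read_sparse_matrix("/tmp/example_matrix.npz", kind="csr")
assert abs(roundtripped - mat).nnz == 0  # no differing nonzero entries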
def test_unpack_archive(tmpdir):
data = "Here's some text data to pack and unpack."
fpath_txt = str(tmpdir.join("test_unpack_archive.txt"))
with tio.open_sesame(fpath_txt, mode="wt") as f:
f.write(data)
fpath_zip = str(tmpdir.join("test_unpack_archive.zip"))
with zipfile.ZipFile(fpath_zip, "w") as f:
f.write(fpath_txt)
tio.unpack_archive(fpath_zip, extract_dir=tmpdir)
fpath_tar = str(tmpdir.join("test_unpack_archive.tar"))
with tarfile.TarFile(fpath_tar, "w") as f:
f.add(fpath_txt)
tio.unpack_archive(fpath_tar, extract_dir=tmpdir)
tio.unpack_archive(fpath_txt, extract_dir=tmpdir)
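# The final .txt call above appears to check that a non-archive file passes
# through unpack_archive without raising. A hedged standalone sketch of the
# same three cases, assuming only the unpack_archive(filepath, extract_dir=...)
# signature used in the test:
import tarfile
import zipfile
import textacy.io as tio

with open("/tmp/example.txt", "wt") as f:
    f.write("some text to pack and unpack")
with zipfile.ZipFile("/tmp/example.zip", "w") as zf:
    zf.write("/tmp/example.txt", arcname="example.txt")  # store under basename, not full path
with tarfile.TarFile("/tmp/example.tar", "w") as tf:
    tf.add("/tmp/example.txt", arcname="example.txt")
for path in ("/tmp/example.zip", "/tmp/example.tar", "/tmp/example.txt"):
    tio.unpack_archive(path, extract_dir="/tmp")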
def test_read_write_bytes_lines(self, tmpdir, spacy_doc):
expected = [{"idx": i, "sent": sent.text} for i, sent in enumerate(spacy_doc.sents)]
for ext in (".json", ".json.gz", ".json.bz2", ".json.xz"):
filepath = str(tmpdir.join("test_read_write_json_lines_bytes" + ext))
if compat.PY2 is True:
if ext == ".json.xz":
with pytest.raises(ValueError):
io.open_sesame(
filepath, mode="wb", encoding="utf-8", make_dirs=True
)
else:
io.write_json(expected, filepath, mode="wb", make_dirs=True, lines=True)
observed = list(io.read_json(filepath, mode="rb", lines=True))
assert observed == expected
else:
with pytest.raises(TypeError):
io.write_json(
expected,
filepath,
mode="wb",
encoding=None,
make_dirs=True,
lines=True,
)
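# The Python 3 branch above only asserts that binary mode raises TypeError;
# for completeness, a sketch of the ordinary text-mode round trip, reusing
# the write_json()/read_json() lines=True signatures from the PY2 branch.
import textacy.io as tio

records = [{"idx": 0, "sent": "First sentence."}, {"idx": 1, "sent": "Second sentence."}]
tio.write_json(records, "/tmp/example.json.gz", mode="wt", make_dirs=True, lines=True)
assert list(tio.read_json("/tmp/example.json.gz", mode="rt", lines=True)) == records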
then just iterate over all ISO-639-1 language codes.
"""
dirpath = textacy.utils.to_path(dirpath).resolve()
url_fnames = [
(
"https://raw.githubusercontent.com/mitjat/langid_eval/master/uniformly_sampled.tsv",
"uniformly_sampled.tsv",
),
(
"https://raw.githubusercontent.com/mitjat/langid_eval/master/recall_oriented.tsv",
"recall_oriented.tsv",
)
]
# download tweet ids first
for url, fname in url_fnames:
textacy.io.download_file(url, filename=fname, dirpath=dirpath, force=force)
# download full tweets data next
tweets_fpath = dirpath.joinpath("tweets.jsonl")
if tweets_fpath.is_file() and force is False:
logging.info("tweets data already downloaded to %s", tweets_fpath)
return
# load twitter ids data from disk
tweet_lang_ids = []
for fname in ["uniformly_sampled.tsv", "recall_oriented.tsv"]:
tweet_lang_ids.extend(
textacy.io.read_csv(
dirpath.joinpath(fname),
delimiter="\t",
fieldnames=["lang", "status_id"],
quoting=1,
)
)
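# The chunked download loop in the next snippet iterates over ``status_ids``
# and an ``api`` client (GetStatuses/AsDict suggest python-twitter); a hedged
# sketch of how the ids might be pulled from the rows read above, assuming
# read_csv() with ``fieldnames`` yields dict rows keyed by those names
# (if it yields plain tuples instead, unpack (lang, status_id) directly).
status_ids = [row["status_id"] for row in tweet_lang_ids]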
"""
Download resource data as a zip archive file, then save it to disk
and extract its contents under the ``data_dir`` directory.
Args:
force (bool): If True, download the resource, even if it already
exists on disk under ``data_dir``.
"""
filepath = tio.download_file(
DOWNLOAD_URL,
filename=None,
dirpath=self.data_dir,
force=force,
)
if filepath:
tio.unpack_archive(filepath, extract_dir=None)
# get tweets data in chunks
chunk_size = 100
pbar = tqdm.tqdm(total=len(status_ids), unit="tweets")
tweets = []
try:
for chunk_ids in itertoolz.partition_all(chunk_size, status_ids):
chunk_tweets = api.GetStatuses(chunk_ids, trim_user=True, include_entities=True, map=False)
tweets.extend(chunk_tweets)
pbar.update(len(chunk_ids))
except Exception:
logging.exception("encountered an error while downloading tweets")
finally:
pbar.close()
tweets = [tweet.AsDict() for tweet in tweets]
logging.info("downloaded data for %s tweets", len(tweets))
textacy.io.write_json(tweets, tweets_fpath, mode="wt", lines=True)
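# The file written above with write_json(..., lines=True) can be streamed
# back the same way; a small sketch reusing the read_json(..., lines=True)
# pattern that appears earlier in this file.
reloaded = list(textacy.io.read_json(tweets_fpath, mode="rt", lines=True))
logging.info("reloaded data for %s tweets", len(reloaded))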
Yields:
Tuple[str, str, str]: Page id, title, content with wikimedia markup.
"""
if not self.filepath:
raise OSError(
"database dump file {} not found; "
"has the dataset been downloaded yet?".format(self.filepath)
)
if compat.PY2: # Python 2 can't open bzip in text mode :(
events = (b"end",)
f = tio.open_sesame(self.filepath, mode="rb")
else:
events = ("end",)
f = tio.open_sesame(self.filepath, mode="rt", encoding="UTF-8")
# TODO: figure out if we can/should clear out the tree's root element
# in case all the empty references to children eat up too much memory
with f:
elems = (elem for _, elem in iterparse(f, events=events))
elem = next(elems)
match = re.match("^{(.*?)}", elem.tag)
namespace = match.group(1) if match else ""
if not namespace.startswith("http://www.mediawiki.org/xml/export-"):
raise ValueError(
"'{}' not a valid MediaWiki dump namespace".format(namespace)
)
page_tag = "{{{}}}page".format(namespace)
ns_path = "./{{{}}}ns".format(namespace)
page_id_path = "./{{{}}}id".format(namespace)
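# Re: the TODO above about memory — a common pattern with iterparse() is to
# clear each element once it has been processed, so the in-memory tree never
# holds the whole dump; a sketch of that idea (one option, not necessarily
# what textacy itself does):
from xml.etree.ElementTree import iterparse

def iter_page_elems(f, page_tag):
    for _, elem in iterparse(f, events=("end",)):
        if elem.tag == page_tag:
            yield elem  # caller extracts page id / title / text here
            elem.clear()  # drop the processed <page>'s children to free memory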
def download(self, *, force=False):
"""
Download the data as a Python version-specific compressed json file and
save it to disk under the ``data_dir`` directory.
Args:
force (bool): If True, download the dataset, even if it already
exists on disk under ``data_dir``.
"""
release_tag = "capitol_words_py3_v{data_version}".format(data_version=1.0)
url = urllib.parse.urljoin(DOWNLOAD_ROOT, release_tag + "/" + self._filename)
tio.download_file(
url,
filename=self._filename,
dirpath=self.data_dir,
force=force,
)
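# A hedged usage sketch for the download() method above: the release tag
# suggests this is textacy's CapitolWords dataset, which (like other textacy
# datasets) exposes download() and a records() iterator; treat the exact
# class name and record interface here as assumptions.
import textacy.datasets

cw = textacy.datasets.CapitolWords()
cw.download(force=False)  # skips the download if the file is already on disk
first_record = next(cw.records(limit=1))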
"""
Download the data as a compressed tar archive file, then save it to disk and
extract its contents under the ``data_dir`` directory.
Args:
force (bool): If True, always download the dataset even if
it already exists.
"""
filepath = tio.download_file(
DOWNLOAD_URL,
filename="aclImdb.tar.gz",
dirpath=self.data_dir,
force=force,
)
if filepath:
tio.unpack_archive(filepath, extract_dir=None)
self._check_data()
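# The download-then-unpack pattern above (and in the zip-archive snippet
# earlier) pulled out as a small standalone helper; a sketch that assumes
# only the tio.download_file() and tio.unpack_archive() calls already used
# in this file, plus the inference from the ``if filepath:`` guards that
# download_file() returns a falsy value when nothing new was downloaded.
import textacy.io as tio

def fetch_and_extract(url, dirpath, filename=None, force=False):
    filepath = tio.download_file(url, filename=filename, dirpath=dirpath, force=force)
    if filepath:
        tio.unpack_archive(filepath, extract_dir=None)
    return filepath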