    # NOTE: fragment of a dataset iterator; the preceding ``if`` branch, which
    # collects user-requested comment files into ``filepaths``, is omitted here.
    for filepath in filepaths:
        if not filepath.is_file():
            raise OSError(
                "requested comments file {} not found;\n"
                "has the dataset been downloaded yet?".format(filepath)
            )
else:
    filepaths = self.filepaths
    if not filepaths:
        raise OSError(
            "no comments files found in {} directory;\n"
            "has the dataset been downloaded yet?".format(self.data_dir)
        )
for filepath in filepaths:
    for line in tio.read_json(filepath, mode="rb", lines=True):
        line["created_utc"] = self._convert_timestamp(line.get("created_utc", ""))
        line["retrieved_on"] = self._convert_timestamp(line.get("retrieved_on", ""))
        line["body"] = self._clean_content(line["body"])
        yield line
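# The fragment above relies on two private helpers that are not shown. Below is
# a minimal, illustrative sketch of what they might do, assuming Unix-epoch
# "created_utc"/"retrieved_on" values and plain-text comment bodies; they are
# written as standalone functions here and are not the library's actual code.
from datetime import datetime, timezone

def _convert_timestamp(timestamp):
    # parse epoch seconds (possibly given as a string) into an ISO-8601 string
    try:
        return datetime.fromtimestamp(int(timestamp), tz=timezone.utc).isoformat()
    except (TypeError, ValueError):
        return ""

def _clean_content(content):
    # drop placeholder comment bodies and collapse runs of whitespace
    if content in ("[deleted]", "[removed]"):
        return ""
    return " ".join(content.split())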
def __iter__(self):
    if not self._filepath.is_file():
        raise OSError(
            "dataset file {} not found;\n"
            "has the dataset been downloaded yet?".format(self._filepath)
        )
    for record in tio.read_json(self._filepath, mode="rt", lines=True):
        yield record
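# A minimal, self-contained illustration of the same pattern using only the
# standard library: check that the backing file exists, then stream records
# from a JSON-lines file. The class and file names here are illustrative.
import json
import pathlib

class JsonLinesDataset:
    def __init__(self, filepath):
        self._filepath = pathlib.Path(filepath)

    def __iter__(self):
        if not self._filepath.is_file():
            raise OSError(
                "dataset file {} not found;\n"
                "has the dataset been downloaded yet?".format(self._filepath)
            )
        with self._filepath.open(mode="rt") as f:
            for line in f:
                yield json.loads(line)

# e.g. records = list(JsonLinesDataset("data/records.jsonl"))  # hypothetical path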
def _get_relation_data(self, relation, is_symmetric=False):
    if not self.filepath:
        raise OSError(
            "resource file {} not found;\n"
            "has the data been downloaded yet?".format(self._filepath)
        )
    rel_fname = "{}.json.gz".format(_split_uri(relation)[1].lower())
    rel_fpath = self.data_dir.joinpath(rel_fname)
    if rel_fpath.is_file():
        LOGGER.debug("loading data for '%s' relation from %s", relation, rel_fpath)
        return next(
            tio.read_json(rel_fpath, mode="rt", encoding="utf-8", lines=False)
        )
    else:
        rel_data = collections.defaultdict(
            lambda: collections.defaultdict(
                lambda: collections.defaultdict(set)
            )
        )
        LOGGER.info(
            "preparing data for '%s' relation; this may take a while...", relation)
        rows = tio.read_csv(self.filepath, delimiter="\t", quoting=1)
        with tqdm() as pbar:
            for row in rows:
                pbar.update(1)
                _, rel_type, start_uri, end_uri, _ = row
                if rel_type < relation:
                    continue
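# The row loop above is truncated. A hedged sketch of how each remaining row
# might be folded into the nested rel_data mapping (language -> term -> sense
# -> set of related terms). parse_concept_uri is a hypothetical helper for
# splitting a ConceptNet URI such as "/c/en/dog/n" into its parts; it is not
# part of the original snippet.
def parse_concept_uri(uri):
    parts = uri.split("/")
    lang, term = parts[2], parts[3].replace("_", " ")
    sense = parts[4] if len(parts) > 4 else None
    return lang, term, sense

# inside the row loop, after the rel_type check:
#     start_lang, start_term, start_sense = parse_concept_uri(start_uri)
#     end_lang, end_term, end_sense = parse_concept_uri(end_uri)
#     if start_lang == end_lang and start_term != end_term:
#         rel_data[start_lang][start_term][start_sense].add(end_term)
#         if is_symmetric:
#             rel_data[end_lang][end_term][end_sense].add(start_term)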
from cytoolz import itertoolz  # "toolz" works interchangeably here

import textacy.io
import textacy.utils


def load_twitter_data(dirpath, langs, min_len=25):
    """
    Args:
        dirpath (str)
        langs (Set[str])
        min_len (int): minimum number of alphanumeric chars in a tweet's text

    Returns:
        List[Tuple[str, str]]
    """
    dirpath = textacy.utils.to_path(dirpath).resolve()
    raw_tweets = textacy.io.read_json(
        dirpath.joinpath("tweets.jsonl"), mode="rt", lines=True)
    tweets = []
    for tweet in raw_tweets:
        # remove any URLs from the tweet text entirely
        for url in tweet.get("urls", []):
            for item in url.values():
                tweet["text"] = tweet["text"].replace(item, "")
        tweets.append(tweet)
    ds = [
        (tweet["text"], tweet["lang"])
        for tweet in tweets
        if tweet["lang"] in langs
        and itertoolz.count(char for char in tweet["text"] if char.isalnum()) >= min_len
    ]
    return ds
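# Hypothetical usage of load_twitter_data: the directory path and language
# codes below are illustrative stand-ins, not values from the original snippet.
if __name__ == "__main__":
    data = load_twitter_data("data/twitter", langs={"en", "es", "fr"}, min_len=25)
    texts, labels = zip(*data) if data else ((), ())
    print("kept {} tweets in langs: {}".format(len(texts), sorted(set(labels))))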
def __iter__(self):
    if not self.filepath:
        raise OSError(
            "{} database dump file {} not found; "
            "has the dataset been downloaded yet?".format(
                self.project, self.filepath)
        )
    is_bad_category = is_bad_category_funcs.get(self.project, {}).get(self.lang)
    bad_wl_starts = _bad_wiki_link_starts.get(self.project, {}).get(self.lang, tuple())
    lines = tio.read_json(self.filepath, mode="rb", lines=True)
    for index, source in itertoolz.partition(2, lines):
        if source.get("namespace") != self.namespace:
            continue
        # split opening text from main body text, if available
        opening_text = source.get("opening_text")
        text = source.get("text")
        if opening_text and text and text.startswith(opening_text):
            text = opening_text + "\n\n" + text[len(opening_text):].strip()
        # do minimal cleaning of categories and wiki links, if available
        if is_bad_category:
            categories = tuple(
                cat for cat in source.get("category", [])
                if not is_bad_category(cat)
            )
        else:
            categories = tuple(source.get("category", []))
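# The category cleaning above depends on per-project, per-language lookup
# tables defined elsewhere in the module. A hedged sketch of the shapes those
# lookups appear to have, with illustrative example values for English
# Wikipedia (the actual entries in the library may differ):
is_bad_category_funcs = {
    "wikipedia": {
        "en": lambda cat: cat.startswith(("Articles ", "All articles ", "Pages ")),
    },
}
_bad_wiki_link_starts = {
    "wikipedia": {
        "en": ("Category:", "File:", "Template:"),
    },
}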