def __init__(self, data_dir=constants.DEFAULT_DATA_DIR.joinpath(NAME)):
    super().__init__(NAME, meta=META)
    self.data_dir = utils.to_path(data_dir).resolve()
    self._date_range = None
def __init__(
    self,
    data_dir=constants.DEFAULT_DATA_DIR.joinpath(NAME),
    version="5.7.0",
):
    super().__init__(NAME, meta=META)
    self.version = version
    self.data_dir = utils.to_path(data_dir).resolve().joinpath(self.version)
    self._filename = "conceptnet-assertions-{}.csv.gz".format(self.version)
    self._filepath = self.data_dir.joinpath(self._filename)
    self._antonyms = None
    self._hyponyms = None
    self._meronyms = None
    self._synonyms = None
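# A minimal usage sketch for the resource __init__ above. The class name
# "ConceptNet" is inferred from the assertions filename, and the on-disk layout
# follows from the attributes set above; treat both as assumptions.
cn = ConceptNet(data_dir="/path/to/conceptnet", version="5.7.0")
print(cn._filepath)
# -> /path/to/conceptnet/5.7.0/conceptnet-assertions-5.7.0.csv.gz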
def load_tatoeba_data(dirpath, iso_lang_map, min_len=25):
    """
    Args:
        dirpath (str or :class:`pathlib.Path`)
        iso_lang_map (Dict[str, str])
        min_len (int): minimum number of alphanumeric chars in a text

    Returns:
        List[Tuple[str, str]]
    """
    dirpath = textacy.utils.to_path(dirpath).resolve()
    rows = textacy.io.read_csv(
        dirpath.joinpath("sentences.csv"),
        fieldnames=["sent_id", "iso-639-3", "text"],
        delimiter="\t",
        quoting=1,
    )
    langs = set(iso_lang_map.keys())
    ds = [
        (row["text"], iso_lang_map[row["iso-639-3"]])
        for row in rows
        if row["iso-639-3"] in langs
        and itertoolz.count(char for char in row["text"] if char.isalnum()) >= min_len
    ]
    return ds
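# Illustrative call (path and mapping are examples only): iso_lang_map is the
# ISO 639-3 -> 639-1 mapping produced by load_iso_639_data() further down.
tatoeba_data = load_tatoeba_data(
    "/path/to/tatoeba", {"eng": "en", "deu": "de", "fra": "fr"}, min_len=25)
# -> list of (text, lang) pairs such as [("...", "en"), ("...", "de"), ...]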
download_wili_data(wili_dirpath, force=args.force)
wili_data = load_wili_data(wili_dirpath, iso_639_data, min_len=args.min_len)
summarize_dataset(wili_data)
download_udhr_data(udhr_dirpath, force=args.force)
udhr_data = load_udhr_data(
    udhr_dirpath, set(iso_639_data.values()), min_len=args.min_len)
summarize_dataset(udhr_data)
download_dslcc_data(dslcc_dirpath, force=args.force)
dslcc_data = load_dslcc_data(
    dslcc_dirpath, set(iso_639_data.values()), min_len=args.min_len)
summarize_dataset(dslcc_data)
# HACK HACK HACK
leipzig_dirpath = textacy.utils.to_path(
    "/Users/burtondewilde/Desktop/datasets/language_identification/leipzig-corpora"
).resolve()
if leipzig_dirpath.is_dir():
    leipzig_data = load_leipzig_data(leipzig_dirpath, iso_639_data, min_len=args.min_len)
    summarize_dataset(leipzig_data)
else:
    logging.warning("leipzig data hack unavailable, sorry")
    leipzig_data = []
# aggregate and sample datasets
datasets = (
    udhr_data +
    wili_data +
    get_random_sample(tatoeba_data, 420000, stratify=True, random_state=42) +
    get_random_sample(leipzig_data, 480000, stratify=True, random_state=42) +
    get_random_sample(twitter_data, len(twitter_data), stratify=True, random_state=42) +
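# The get_random_sample() helper used above isn't shown in this snippet. A
# minimal sketch of a stratified sampler over (text, lang) pairs might look
# like the following; this is an assumption, not the script's actual helper.
import sklearn.model_selection

def get_random_sample(seq, n, stratify=True, random_state=None):
    """Randomly sample up to ``n`` items, optionally stratified by language label."""
    seq = list(seq)
    if len(seq) <= n:
        return seq
    labels = [lang for _, lang in seq] if stratify else None
    sample, _ = sklearn.model_selection.train_test_split(
        seq, train_size=n, stratify=labels, random_state=random_state)
    return sample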
def __init__(self, data_dir=constants.DEFAULT_DATA_DIR.joinpath(NAME)):
    super().__init__(NAME, meta=META)
    self.data_dir = utils.to_path(data_dir).resolve()
    self._texts_dirpath = self.data_dir.joinpath("udhr_txt")
    self._index_filepath = self._texts_dirpath.joinpath("index.xml")
    self._index = None
    self.langs = None
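# A minimal usage sketch for the dataset __init__ above; the class name "UDHR"
# is an assumption based on the "udhr_txt"/"index.xml" layout.
ds = UDHR(data_dir="/path/to/udhr")
print(ds._index_filepath)
# -> /path/to/udhr/udhr_txt/index.xml; _index and langs stay None here,
#    presumably to be filled in lazily elsewhere in the class.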
def load_twitter_data(dirpath, langs, min_len=25):
    """
    Args:
        dirpath (str)
        langs (Set[str])
        min_len (int): minimum number of alphanumeric chars in a text

    Returns:
        List[Tuple[str, str]]
    """
    dirpath = textacy.utils.to_path(dirpath).resolve()
    raw_tweets = textacy.io.read_json(
        dirpath.joinpath("tweets.jsonl"), mode="rt", lines=True)
    tweets = []
    for tweet in raw_tweets:
        # totally remove any URLs from tweet text
        for url in tweet.get("urls", []):
            for item in url.values():
                tweet["text"] = tweet["text"].replace(item, "")
        tweets.append(tweet)
    ds = [
        (tweet["text"], tweet["lang"])
        for tweet in tweets
        if tweet["lang"] in langs
        and itertoolz.count(char for char in tweet["text"] if char.isalnum()) >= min_len
    ]
    return ds
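# Illustrative call (path is an example only): langs is the set of ISO 639-1
# codes kept for training, e.g. the values of the iso_639_data mapping.
twitter_data = load_twitter_data(
    "/path/to/twitter", set(iso_639_data.values()), min_len=25)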
        quoting=1,
    )
)
logging.info("loaded %s tweet ids from disk", len(tweet_lang_ids))
# parse status ids
status_ids = set()
for row in tweet_lang_ids:
try:
status_ids.add(int(row["status_id"]))
# there are a small handful of bad status ids, shrug
except ValueError:
pass
logging.info("... of which %s had valid, unique ids", len(status_ids))
status_ids = list(status_ids)
# instantiate twitter api client
with textacy.utils.to_path(creds_fpath).resolve().open(mode="rt") as f:
creds = yaml.safe_load(f.read())
api = twitter.Api(sleep_on_rate_limit=True, **creds)
# get tweets data in chunks
chunk_size = 100
pbar = tqdm.tqdm(total=len(status_ids), unit="tweets")
tweets = []
try:
for chunk_ids in itertoolz.partition_all(chunk_size, status_ids):
chunk_tweets = api.GetStatuses(chunk_ids, trim_user=True, include_entities=True, map=False)
tweets.extend(chunk_tweets)
pbar.update(len(chunk_ids))
except Exception:
logging.exception("encountered an error while downloading tweets")
finally:
pbar.close()
tweets = [tweet.AsDict() for tweet in tweets]
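# One plausible next step, as a sketch rather than the script's own code:
# persist the tweet dicts so load_twitter_data() above can read them back.
# This assumes textacy.io.write_json accepts lines=True the way read_json does,
# and output_dirpath is assumed to be the directory load_twitter_data() reads.
textacy.io.write_json(
    tweets, output_dirpath.joinpath("tweets.jsonl"), mode="wt", lines=True)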
def load_iso_639_data(dirpath, exclude=None):
    """
    Args:
        dirpath (str or :class:`pathlib.Path`)
        exclude (Set[str])

    Returns:
        Dict[str, str]
    """
    dirpath = textacy.utils.to_path(dirpath).resolve()
    rows = textacy.io.read_csv(
        dirpath.joinpath("iso-639-3.tsv").resolve(),
        delimiter="\t",
        fieldnames=["Id", "Part2B", "Part2T", "Part1", "Scope", "Language_Type", "Ref_Name", "Comment"],
        quoting=1,
    )
    lang_map = {
        row["Id"]: row["Part1"]
        for row in rows
        if row.get("Part1") and
        (exclude is None or row["Part1"] not in exclude)
    }
    return lang_map
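# Illustrative call (path and exclusions are examples only): maps ISO 639-3 ids
# to their ISO 639-1 equivalents, skipping any codes listed in exclude.
iso_639_data = load_iso_639_data("/path/to/iso-639", exclude={"eo", "la"})
# -> e.g. {"eng": "en", "deu": "de", "fra": "fr", ...}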
for text, lang in ds_test:
    pbar.update(1)
    true.append(lang)
    try:
        lang = model.predict([text])[0]
        preds.append(lang)
    except Exception:
        exceptions.update([lang])
        preds.append("un")
print("# exceptions :", len(exceptions))
if len(exceptions):
    print(exceptions.most_common())
classification_report = sklearn.metrics.classification_report(true, preds)
print(classification_report)
if filepath:
    filepath = textacy.utils.to_path(filepath).resolve()
    with filepath.open(mode="wt", encoding="utf-8") as f:
        f.write(classification_report)
return true, preds
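# Downstream use of the returned (true, preds) pair, as a sketch; the function
# enclosing the evaluation loop above isn't shown, so evaluate_model() here is
# a hypothetical name for it.
import sklearn.metrics
true, preds = evaluate_model(model, ds_test, filepath=None)
print("accuracy:", sklearn.metrics.accuracy_score(true, preds))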