Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_bbbike_subregion_downloads_index(subregion, update=False, verbose=True):
"""
Fetch (or load a cached copy of) the BBBike downloads index for a subregion.

:param subregion: [str] name (possibly approximate) of a BBBike subregion
:param update: [bool] whether to refresh the cached downloads index
:param verbose: [bool] whether to print progress messages
:return: downloads index for the fuzzily matched subregion
"""
# NOTE(review): `update=False` is hard-coded here, so the caller's `update`
# flag never refreshes the subregion *index* itself — presumably this
# should be `update=update`; confirm intent.
bbbike_subregion_index = get_bbbike_subregion_index(update=False)
# Fuzzy-match the requested name against the known subregion names and
# take the best-scoring candidate.
subregion_name = fuzzywuzzy.process.extractOne(subregion, bbbike_subregion_index.Name)[0]
if verbose:
if subregion != subregion_name:
print("\"{}\" is not found. \n".format(subregion))
print("Trying to get downloads index for \"{}\" ... ".format(subregion_name), end="")
# Path of the cached pickle holding this subregion's URL index.
path_to_file = cd_dat_bbbike(subregion_name, subregion_name + "-url-index.pickle")
if os.path.isfile(path_to_file) and not update:
# Cache hit: load the previously saved downloads index.
subregion_downloads_index = load_pickle(path_to_file)
if verbose:
print("Done.")
else:
try:
url = 'https://download.bbbike.org/osm/bbbike/{}/'.format(subregion_name)
source = urllib.request.urlopen(url)
# NOTE(review): fragment truncated here — the matching `except` clause
# and the parse/save logic are not visible in this chunk.
def _question_loop(question, choices=None, default=None, fuzz_percentage=75):
    # type: (str, Opt[List[str]], Opt[str], Opt[int]) -> str
    """Prompt the user until an acceptable answer is given.

    Repeatedly asks ``question`` via ``_input``. An empty answer returns
    ``default`` when one is provided. When ``choices`` is given, the answer
    is fuzzy-matched against them and the best candidate is returned once
    ``_answer_is_valid`` accepts it at ``fuzz_percentage``; otherwise the
    prompt repeats. Without ``choices``, any non-empty answer is returned
    as typed.
    """
    while True:
        answer = _input(question)
        # BUG FIX: the original used `answer is ""` — an identity check on a
        # string literal, which is implementation-dependent (and raises a
        # SyntaxWarning on Python >= 3.8). Equality is what is meant here.
        if answer == "" and default:
            return default
        if choices:
            fuzzed = fuzzywuzzy.process.extractOne(answer, choices)
            if _answer_is_valid(fuzzed, fuzz_percentage):
                return fuzzed[0]
            # invalid choice: fall through and re-prompt
        else:
            return answer
# Fragment (truncated): fuzzy matching of radial-velocity column names —
# the enclosing function's signature is not visible in this chunk.
if fuzzy:
# fuzzywuzzy is an optional dependency; fail early with an install hint.
try:
from fuzzywuzzy import process
except ImportError:
raise ImportError("Fuzzy column name matching requires "
"`fuzzywuzzy`. Install with pip install "
"fuzzywuzzy.")
# FUTURETODO: could make this customizable too...
score_thresh = 90  # minimum fuzzy score (0-100) to accept a column match
matches = []
scores = []
# Best fuzzy match (and its score) for each valid RV name against the
# lower-cased column names. `_valid_rv_names` and `lwr_cols` come from
# the surrounding (unseen) scope.
for name in _valid_rv_names:
match, score = process.extractOne(name, lwr_cols)
matches.append(match)
scores.append(score)
scores = np.array(scores)
matches = np.array(matches)
# error if the best match is below threshold
if scores.max() < score_thresh:
raise RuntimeError("Failed to parse radial velocity data from "
"input table: No column names looked "
"good with fuzzy name matching.")
# check for multiple bests:
if np.sum(scores == scores.max()) > 1:
raise RuntimeError("Failed to parse radial velocity data from "
"input table: Multiple column names looked "
"good with fuzzy matching {}."
# NOTE(review): truncated here — the `.format(...)` argument and the
# closing parenthesis of this RuntimeError are missing from this chunk.
# Fragment (truncated): maps scraped route endpoints onto Stop records via
# fuzzy matching — the enclosing loop/function header is not in this chunk.
# stopMapping[obj.from_stop_txt] = from_to[0].name
# obj.to_stop = from_to[1]
# if not stopMapping.has_key(obj.to_stop_txt):
# stopMapping[obj.to_stop_txt] = from_to[1].name
# else: #Else we do fuzzy string matching against all possible values for stopname got from RouteDetails
stopnames = []
stopcodes = []
# Skip routes with no RouteDetail rows; the `continue` implies this code
# sits inside a loop whose header is outside this chunk.
if RouteDetail.objects.filter(route=routeObj).count() == 0:
routeDoesNotExistErrors.append({'routeDetailDoesNotExist': routeObj.code})
continue
# Collect candidate stop names/codes to fuzzy-match against.
for r in RouteDetail.objects.filter(route=routeObj):
stopnames.append(r.stop.name)
stopcodes.append(r.stop.code)
from_fuzz = fuzzprocess.extractOne(thisRoute['from'], stopnames)
to_fuzz = fuzzprocess.extractOne(thisRoute['to'], stopnames)
#pdb.set_trace()
# Resolve the fuzzily matched names back to Stop rows restricted to this
# route's stop codes; [0] takes the first match.
obj.from_stop = Stop.objects.filter(name=from_fuzz[0]).filter(code__in=stopcodes)[0]
obj.to_stop = Stop.objects.filter(name=to_fuzz[0]).filter(code__in=stopcodes)[0]
obj.save()
#pdb.set_trace()
# print thisRoute['rows'].keys()
for schedule in thisRoute['rows'].keys(): #loop through each schedule per UniqueRoute and save it
rows = thisRoute['rows'][schedule]
# NOTE(review): `row` is referenced here before the `for row in rows` loop
# below ever assigns it — this lookup likely belongs inside that loop.
try:
depot = Depot.objects.get(code=row[6])
except:
# NOTE(review): bare except swallows every error, not just
# Depot.DoesNotExist — worth narrowing.
depot = None #FIXME!! Catch depot errors based on findings
#pdb.set_trace()
# NOTE(review): fragment truncated — the body of this loop is not visible.
for row in rows:
"""
# NOTE(review): the delimiter above is a stray docstring boundary — this
# fragment begins mid-function and its `def` line is not in this chunk.
# Known interest categories that resource entries are matched against.
interests = ["Javascript", "Ruby", "Java", "Python", "C#", "C", "Swift",
".NET", "HTML / CSS", "Mobile / IOS", "Full-Stack Developer",
"Data Science", "Back-End Developer", "Front-End Developer",
"Cyber Security", "I.T / SysAdmin", "Web Designer",
"Web Developer", "Mobile / Android"
]
# Only these two fields of the item are considered for matching.
keys = ['category', 'language']
some_items = [single_dict_item[key] for key in keys]
# Track the best fuzzy match seen across the inspected keys.
match_percent = 0
dict_key = None
interest = None
resource_val = None
for single_key in keys:
if single_dict_item[single_key]:
# Best interest match (and its 0-100 score) for this key's value.
matched_string, percent = process.extractOne(single_dict_item[single_key], interests)
if percent > match_percent:
match_percent = percent
dict_key = single_key
interest = matched_string
resource_val = single_dict_item[single_key]
# MatchGroup is defined elsewhere; it bundles the winning match details.
return MatchGroup(match_percent, dict_key, interest, resource_val, some_items)
def fuzzy_match(x, choices, scorer, cutoff):
    """Return the best fuzzy match for ``x`` among ``choices``.

    Thin wrapper around ``process.extractOne``: ``scorer`` selects the
    similarity function, and candidates scoring below ``cutoff`` are
    rejected.
    """
    best = process.extractOne(x, choices=choices, scorer=scorer, score_cutoff=cutoff)
    return best
query (str): target string
choices (list): items to fuzzy match against
lowest_match_score (float): a score below this value is considered a fail
Returns:
(str): the closest match
(float): score between 0 and 1
"""
# NOTE(review): the lines above are the tail of a docstring whose `def`
# line is outside this chunk; the fragment below is the method body.
# Fail fast on queries that are already known not to match.
if query in self.failed_fuzzy_matches:
raise KeyError(f"Fuzzy match failed previously: {query}")
try:
# EAFP: reuse a previously successful match when one is cached.
match, score = self.successful_fuzzy_matches[query]
except KeyError:
# attempt a new fuzzy match
match, score = fuzzy_proc.extractOne(query=query,
choices=choices,
scorer=self.combo_fuzz)
if score < lowest_match_score:
if self.store_history:
self.failed_fuzzy_matches.add(query)
raise KeyError(f"Failed to fuzzy match: {query}")
# Cache the successful match for future look-ups (when enabled).
if self.store_history:
self.successful_fuzzy_matches.update({query: (match, score)})
return match, score
# Fragment (truncated): groups transcript sentences into per-speaker
# segments — the enclosing method's signature is not in this chunk.
previous = None
current = None
segment_index = 0
segment_sents = []
segments_ = []
individual_speakers = self.speaker_ids.keys()
# A trailing '' sentinel forces the final segment to be flushed.
for sent in chain(sents, ('',)):
# Exact (substring) speaker detection first.
speaker = next(
filter(lambda person: person in sent, chain(
individual_speakers, GENERIC_SPEAKERS)), None)
if speaker is not None:
current = speaker
logger.debug(
'Found speaker: {}, previous speaker {}'.format(current, previous))
else:
# Fall back to fuzzy-matching the whole sentence.
# NOTE(review): a chain() generator is passed as the choices here; if
# extractOne does not materialize it, results may differ from the
# exact-match branch — confirm against the library version in use.
speaker, score = process.extractOne(sent, chain(
individual_speakers, GENERIC_SPEAKERS))
if score > APPROX_MATCH_THRESHOLD:
current = speaker
logger.debug(
'Found speaker: {} (approx. score {}/100), previous speaker: {}'.format(
current, score, previous))
# Speaker changed (or sentinel reached): flush the accumulated segment.
if previous != current or sent == '':
if segment_sents:
segment_index += 1
segment = {
'id': '{}-{}'.format(self.id, segment_index),
'speaker': previous,
'text': ' '.join(segment_sents),
'bioguide_id': None,
}
# NOTE(review): fragment truncated here — the rest of the flush logic
# is not visible in this chunk.
if segment['speaker'] in self.speaker_ids:
# Fragment (truncated): queues tracks from a Google Play Music genre
# station — the enclosing method (and its `try:` statement) is not visible.
genre_name = genre['name']
genre_id = genre['id']
station_id = self.__gmusic.create_station(genre_name, \
None, None, None, genre_id)
num_tracks = MAX_TRACKS
tracks = self.__gmusic.get_station_tracks(station_id, num_tracks)
tracks_added = self.__enqueue_tracks(tracks)
logging.info("Added %d tracks from %s to queue", tracks_added, genre_name)
if not tracks_added:
# No tracks for this genre: remove it from the candidates and
# fuzzy-pick another genre name from what remains.
print_wrn("[Google Play Music] '{0}' No tracks found. "\
"Trying something else." \
.format(to_ascii(genre_name)))
del choices[genre_name]
choice_names.remove(genre_name)
choice_name = process.extractOne(arg, choice_names)[0]
genre = choices[choice_name]
print_wrn("[Google Play Music] Playing '{0}'." \
.format(to_ascii(genre['name'])))
self.__update_play_queue_order()
# NOTE(review): these handlers belong to a `try:` that lies outside this
# chunk's visible lines.
except KeyError:
raise KeyError("Genre not found : {0}".format(arg))
except CallFailure:
raise RuntimeError("Operation requires an Unlimited subscription.")