"""
It takes an octal string and return a string
:octal_str: octal str like "110 145 154"
"""
str_converted = []
octal_seq = text.split(" ")
if len(octal_seq) == 1:
# Concatted octal must be formed of octal triplets
if len(text) % 3 != 0:
return None
octal_seq = [text[i:i+3] for i in range(0, len(text), 3)]
logger.trace(f"Trying chunked octal {octal_seq}")
try:
for octal_char in octal_seq:
if len(octal_char) > 3:
logger.trace(f"Octal subseq too long")
n = int(octal_char, 8)
if n < 0: # n cannot be greater than 255, as we checked that with the earlier length check
logger.trace(f"Non octal char {octal_char}")
return None
str_converted.append(n)
return bytes(str_converted)
# Catch bad octal chars
except ValueError:
return None
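
# A minimal usage sketch for the decoder above; the wrapper name
# `decode_octal` is the one reconstructed here, so treat it as an
# assumption rather than the original API.
assert decode_octal("110 145 154 154 157") == b"Hello"  # space-separated octal
assert decode_octal("110145154154157") == b"Hello"      # concatenated triplets
assert decode_octal("119") is None                      # 9 is not an octal digit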
import importlib
import os

from loguru import logger

basedir = os.path.dirname(__file__)
for root, dirs, files in os.walk(basedir):
    py_path = '.'.join(os.path.split(os.path.relpath(root, basedir))).strip('.')
    for f in files:
        modname, ext = os.path.splitext(f)
        if modname.startswith('__') or ext != '.py':
            continue
        if py_path:
            module_path = f'veros.core.{py_path}.{modname}'
        else:
            module_path = f'veros.core.{modname}'
        logger.trace('importing {}', module_path)
        try:
            importlib.import_module(module_path)
        except ImportError:
            pass
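
# logger.trace emits at level 5, below loguru's default DEBUG sink, so the
# import messages above stay invisible unless a TRACE-level sink is added.
# A minimal sketch using only documented loguru calls:
import sys
from loguru import logger

logger.remove()                        # drop the default stderr sink
logger.add(sys.stderr, level="TRACE")  # re-add it with TRACE visibility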
def get_resource(self, res_name: str, t: Optional[Type] = None) -> Any:
    logger.trace(f"Loading resource {res_name} of type {t}")
    # FIXME: Actually returns obj of type `t`, but python is bad
    loader, name = split_resource_name(res_name)
    if t is None:
        return self(_fwd.registry.get_named(loader, ResourceLoader))(name)
    else:
        return self(_fwd.registry.get_named(loader, ResourceLoader[t]))(name)
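
# `split_resource_name` is not shown in this snippet. A hypothetical sketch,
# assuming resource names take a "loader::name" form; the separator and the
# signature are assumptions, not confirmed by the code above:
from typing import Tuple

def split_resource_name(res_name: str) -> Tuple[str, str]:
    loader, _, name = res_name.partition("::")
    return loader, name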
from loguru import logger

# (Enclosing signature is a reconstruction; the original snippet begins mid-function.)
def apply_valve_setting(valve, setting, kwargs):
    # the valve itself was given
    if setting in valve.mapping:
        logger.trace(f"{setting} in {repr(valve)}'s mapping.")
        kwargs["setting"] = valve.mapping[setting]
    # the valve's name was given
    # in this case, we get the mapped valve with that name
    # we don't have to worry about duplicate names since that's checked later
    elif setting in [c.name for c in valve.mapping]:
        logger.trace(f"{setting} matches a component name in {repr(valve)}'s mapping.")
        mapped_component = [c for c in valve.mapping if c.name == setting]
        kwargs["setting"] = valve.mapping[mapped_component[0]]
    # the user gave the actual port mapping number
    elif isinstance(setting, int) and setting in valve.mapping.values():
        logger.trace(f"User supplied manual setting for {valve}")
    else:
        raise ValueError(f"Invalid setting {setting} for {repr(valve)}.")
    return kwargs
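
# A self-contained illustration of the three accepted forms of `setting`,
# using a hypothetical single-port valve; the class and field names here are
# assumptions made for the sketch:
class _Valve:
    def __init__(self):
        self.name = "V1"
        self.mapping = {self: 1}

    def __repr__(self):
        return f"<Valve {self.name}>"

valve = _Valve()
assert apply_valve_setting(valve, valve, {}) == {"setting": 1}  # valve object
assert apply_valve_setting(valve, "V1", {}) == {"setting": 1}   # valve name
assert apply_valve_setting(valve, 1, {}) == {}                  # raw port number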
def base58_ripple(self, text: str, alphabet: str):
    logger.trace("Attempting Base58 ripple alphabet")
    return self._dispatch(base58.b58decode, text, "base58_ripple", alphabet=alphabet)
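
# The `base58` package ships the Ripple alphabet, so a direct (non-dispatched)
# equivalent of the call above looks like this; round-trip shown for clarity:
import base58

encoded = base58.b58encode(b"hello", alphabet=base58.RIPPLE_ALPHABET)
assert base58.b58decode(encoded, alphabet=base58.RIPPLE_ALPHABET) == b"hello"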
def evaluate(self, node: Node) -> Tuple[bool, Union[List[SearchLevel], List[Node]]]:
    # logger.debug(f"Evaluating {node}")
    res = node.cracker.attemptCrack(node.parents[-1].result.value)
    # Detect if we succeeded, and if deduplication is needed
    logger.trace(f"Got {len(res)} results")
    ret = []
    for i in res:
        success, expanded = self.expand(
            node.parents
            + [SearchLevel(name=type(node.cracker).__name__.lower(), result=i)]
        )
        if success:
            return True, expanded
        ret.extend(expanded)
    return False, ret
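
# For context: `expand` (not shown) is assumed to return (True, full_chain) as
# soon as any descendant result verifies, else (False, new_search_nodes).
# A hypothetical minimal SearchLevel matching the attribute access above:
from typing import Any, NamedTuple

class SearchLevel(NamedTuple):
    name: str
    result: Any  # the code above reads `.value` off this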
if args.use_hash:
    logger.log('NORMAL', "Comparing hashes")
    missing_songs = []
    existing_songs = []
    google_client_ids = {song.get('clientId', '') for song in get_google_songs(mc)}
    for song in local_songs:
        if generate_client_id(song) not in google_client_ids:
            missing_songs.append(song)
        else:
            existing_songs.append(song)
    logger.info("Found {} songs that already exist by audio hash", len(existing_songs))
    if logger._core.min_level <= 5:
        for song in natsorted(existing_songs):
            logger.trace(song)
if args.use_metadata:
    if args.use_hash:
        local_songs = missing_songs
    if local_songs:
        logger.log('NORMAL', "Comparing metadata")
        google_songs = get_google_songs(mm, filters=args.filters)
        missing_songs = natsorted(
            gm_utils.find_missing_items(
                local_songs,
                google_songs,
                fields=['artist', 'album', 'title', 'tracknumber'],
                normalize_values=True
            )
        )
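
# 'NORMAL' is not a built-in loguru level, so the logger.log('NORMAL', ...)
# calls above presuppose a registration like this one; the severity number 25
# (between INFO and WARNING) is an assumption for the sketch:
from loguru import logger

logger.level('NORMAL', no=25)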
def _load_auth_token(self):
    try:
        token = methods.get_setting_value('auth_token')
        logger.trace(f"Loaded auth_token: {token}")
        if not token:
            return {}
        try:
            return json.loads(token)
        # A malformed cached token is an expected failure; treat it as absent
        except json.JSONDecodeError:
            return {}
    except Exception:
        logger.exception("Exception loading auth_token from cache: ")
        return {}
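
# A hypothetical counterpart for writing the cache back, assuming `methods`
# exposes a symmetric setter (the name and signature are assumptions):
def _save_auth_token(self, token: dict):
    methods.set_setting_value('auth_token', json.dumps(token))
    logger.trace(f"Saved auth_token: {token}")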
try:
    mw_path.mkdir()
except FileExistsError:
    pass
log_file = mw_path / Path(self.experiment_id + ".log.jsonl")
# automatically configure a logger to persist the logs
self._file_logger_id = logger.add(
    log_file,
    level=verbosity.upper()
    if log_file_verbosity is None
    else log_file_verbosity.upper(),
    compression=log_file_compression,
    serialize=True,
    enqueue=True,
)
logger.trace(f"File logger ID is {self._file_logger_id}")
# for typing's sake
assert isinstance(log_file, (str, os.PathLike))
# determine the log file's path (compression appends its own suffix, e.g. ".log.jsonl.gz")
if log_file_compression is not None:
    self._log_file = Path(log_file)
    self._log_file = self._log_file.with_suffix(
        self._log_file.suffix + "." + log_file_compression
    )
else:
    self._log_file = Path(log_file)
if data_file:
    # automatically log to the mw directory
    if data_file is True:
def __enter__(self):
    logger.trace(f"Entering context for {self}")
    return self
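
# A plausible matching __exit__ for the context manager above, assuming the
# file sink added earlier should be detached on exit; the cleanup choice is
# an assumption, but logger.remove(sink_id) is the documented loguru call:
def __exit__(self, exc_type, exc_value, traceback):
    logger.trace(f"Exiting context for {self}")
    logger.remove(self._file_logger_id)
    return False  # don't suppress exceptions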