Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def log_errors(out_path, err_path):
    """Scan a captured output file for failure markers and log what is found.

    Every line of ``out_path`` that contains ``"[fail ]"`` or ``"[fatal]"`` is
    logged at error level. When at least one such line exists, the last ten
    lines of ``out_path`` are logged for context, followed by the complete
    contents of ``err_path``.

    The function is best-effort: an ``IOError`` while reading either file is
    logged (with traceback) and swallowed, never propagated to the caller.

    :param out_path: path of the captured standard-output file to scan.
    :param err_path: path of the captured error-output file to dump.
    """
    error_filter = ("[fail ]", "[fatal]")
    try:
        errors = 0
        # Bounded deque: only the most recent ten lines are kept for context.
        tail_lines = deque(maxlen=10)
        # errors="replace" keeps undecodable bytes from aborting the scan.
        with open(out_path, "r", errors="replace") as lines:
            for line in lines:
                stripped_line = line.rstrip()
                tail_lines.append(stripped_line)
                if any(marker in stripped_line for marker in error_filter):
                    LOG.error("{}: {}".format(out_path, stripped_line))
                    errors += 1
        if errors:
            # Format eagerly, like every other message in this function, so the
            # text is correct whether LOG uses %-style or {}-style argument
            # interpolation.
            LOG.info(
                "{} errors found, printing end of output for context:".format(errors)
            )
            for line in tail_lines:
                LOG.info(line)
            try:
                # Same decoding policy as out_path: a stray byte must not turn
                # into an unhandled UnicodeDecodeError while reporting errors.
                with open(err_path, "r", errors="replace") as lines:
                    LOG.error("contents of {}:".format(err_path))
                    LOG.error(lines.read())
            except IOError:
                LOG.exception("Could not read err output {}".format(err_path))
    except IOError:
        LOG.exception("Could not check output {} for errors".format(out_path))
# Fragment: the enclosing function header and the first branch of this
# if/elif chain are outside the visible source.
# NOTE(review): presumably `package_list_file_valid` is a status code describing
# the package file (2 = path is a directory, 0 = usable file) - confirm against
# the code that computes it.
elif package_list_file_valid == 2:
logger.error(
"Package file is a directory, please remove `.gitget.yaml` and run `gitget setup`"
)
exit(1)
elif package_list_file_valid == 0:
logger.debug("Package file found")
# try loading the file
logger.debug("Attempting to load file")
try:
with open(package_list_filepath) as file:
package_list = yaml.safe_load(file)
except Exception as ex:
# Any failure while opening or parsing the file is logged and ends the
# process with exit status 1.
logger.error("Could not load package list due to the following error:")
logger.error(ex)
exit(1)
logger.debug("Package list loaded")
# if the list is NONE, set to an empty dictionary to prevent iteration errors
logger.debug("Checking if package list is None")
if package_list is None:
package_list = {}
logger.debug("Package list has no content, set to empty dict")
# Hand the loaded mapping (or the empty-dict replacement for None) to the caller.
return package_list
def create_group(name, domain):
    """Create a group through the ``google`` client and exit with the outcome.

    Logs the attempt, calls ``google.create_group(name, domain)`` and then
    terminates the process: exit status 0 when the call reports success,
    exit status 1 otherwise. The second element returned by the client is
    included in the log message in both cases.

    :param name: group name; also used as the local part of the address.
    :param domain: domain part of the group address.
    :raises SystemExit: always, via ``sys.exit``.
    """
    logger.info(f"Creating group named: {name} - {name}@{domain}")
    created, details = google.create_group(name, domain)

    # Failure first, so the success path reads straight through.
    if not created:
        logger.error(f"Failed to create group {name!r}:\n{details}")
        sys.exit(1)

    logger.info(f"Created group {name!r}:\n{details}")
    sys.exit(0)
# Fragment: the enclosing function header is outside the visible source.
# `raw` is a dot-separated path string that is rewritten segment by segment.
# A leading "content" or "json" prefix is replaced with "body".
if raw.startswith("content"):
raw = f"body{raw[len('content'):]}"
elif raw.startswith("json"):
raw = f"body{raw[len('json'):]}"
# Rewritten segments, joined back together with "." at the end.
raw_list = []
for item in raw.split("."):
if "-" in item:
# add quotes for field with separator
# e.g. headers.Content-Type => headers."Content-Type"
# Existing surrounding quotes are stripped first so they are not doubled.
item = item.strip('"')
raw_list.append(f'"{item}"')
elif item.isdigit():
# convert lst.0.name to lst[0].name
# An index needs a preceding segment to attach to; a path that starts with
# a number is reported and the process exits with status 1.
if len(raw_list) == 0:
logger.error(f"Invalid jmespath: {raw}")
sys.exit(1)
last_item = raw_list.pop()
item = f"{last_item}[{item}]"
raw_list.append(item)
else:
raw_list.append(item)
return ".".join(raw_list)
# Fragment: the enclosing function header and the start of this if/elif chain
# are outside the visible source; the first line is the tail of an earlier branch.
exit(1)
# NOTE(review): presumably `package_list_file_valid` is a status code describing
# the package file (2 = path is a directory, 0 = usable file) - confirm against
# the code that computes it.
elif package_list_file_valid == 2:
logger.error(
"Package file is a directory, please remove `.gitget.yaml` and run `gitget setup`"
)
exit(1)
elif package_list_file_valid == 0:
logger.debug("Package file found")
# try loading the file
logger.debug("Attempting to load file")
try:
with open(package_list_filepath) as file:
package_list = yaml.safe_load(file)
except Exception as ex:
# Any failure while opening or parsing the file is logged and ends the
# process with exit status 1.
logger.error("Could not load package list due to the following error:")
logger.error(ex)
exit(1)
logger.debug("Package list loaded")
# if the list is NONE, set to an empty dictionary to prevent iteration errors
logger.debug("Checking if package list is None")
if package_list is None:
package_list = {}
logger.debug("Package list has no content, set to empty dict")
# Hand the loaded mapping (or the empty-dict replacement for None) to the caller.
return package_list
def main() -> None:
    """Command-line entry point: parse arguments, set up logging, run the CLI.

    Replaces the logging handler registered under id 0 with a stderr handler
    at the level given by ``arguments.logging_level``, then delegates to
    ``cli_main``. Every exception raised by ``cli_main`` is logged here and
    not re-raised, so this function always returns ``None``.
    """
    parser = get_argument_parser()
    arguments = parser.parse_args()

    # `remove` is the current loguru name for dropping a handler; the previous
    # `stop` spelling is a deprecated alias. Using `remove` matches the
    # `add` call on the next line.
    logger.remove(0)
    logger.add(sys.stderr, level=arguments.logging_level)

    # noinspection PyBroadException
    try:
        cli_main(arguments)
    except KeyboardInterrupt:
        # User interruption is an expected way to stop, not an error.
        logger.info("Aborted")
    except ConfigError as error:
        logger.error("Invalid configuration: {}", error)
    except MissingDependencyError as error:
        logger.error("Missing dependency: {}", error)
    except Exception:
        # Top-level boundary: log the traceback rather than crash with it.
        logger.exception("Unexpected error occurred:")
# Fragment: the enclosing function and any surrounding loop are outside the
# visible source. NOTE(review): this looks like one attempt of a wait/retry
# routine - confirm against the full function.
# Any result other than the literal False counts as success and is returned as-is.
if result is not False:
return result
# Optional hook invoked after an unsuccessful attempt.
if fail_callback:
fail_callback()
now = time.time()
# No start time recorded yet: remember when waiting began.
if not wait_start:
wait_start = now
else:
# Elapsed waiting time in seconds, rounded to one decimal for the message.
waiting = now - wait_start
msg = wait_message.format(f"for {round(waiting, 1)} seconds")
# A timeout of None disables the timeout check entirely.
if timeout is not None and waiting >= timeout:
logger.error(msg)
if timeout_callback:
# A class is instantiated and raised; any other callable is simply called.
if inspect.isclass(timeout_callback):
raise timeout_callback()
else:
timeout_callback()
else:
logger.debug(msg)
# Optional hook called when provided.
if wait_fn:
wait_fn()
# Fragment: the enclosing method and the loop that sets check_item, check_value,
# expect_value, assert_func, assert_method, validate_msg and validator_dict are
# outside the visible source.
# The comparison signals a mismatch by raising AssertionError.
try:
assert_func(check_value, expect_value)
validate_msg += "\t==> pass"
logger.info(validate_msg)
validator_dict["check_result"] = "pass"
except AssertionError:
# Remember the failure, extend the message with the compared values and their
# type names, and collect it for the exception raised below.
validate_pass = False
validator_dict["check_result"] = "fail"
validate_msg += "\t==> fail"
validate_msg += f"\n" \
f"check_item: {check_item}\n" \
f"check_value: {check_value}({type(check_value).__name__})\n" \
f"assert_method: {assert_method}\n" \
f"expect_value: {expect_value}({type(expect_value).__name__})"
logger.error(validate_msg)
failures.append(validate_msg)
# Record this validator's outcome under the "validate_extractor" key.
self.validation_results["validate_extractor"].append(validator_dict)
# Raise one ValidationFailure carrying every collected failure message,
# separated by newlines.
if not validate_pass:
failures_string = "\n".join([failure for failure in failures])
raise ValidationFailure(failures_string)
async def check_webhook():
    """Check that the bot's registered webhook URL matches the configured one.

    Fetches the webhook info from the bot and compares its URL with
    ``config.WEBHOOK_URL``.

    :return: ``(True, message)`` including the pending update count when the
        URL is set and equal to the configured value; otherwise
        ``(False, message)`` after logging the URL that was found.
    """
    # Imported inside the function, as in the original code.
    from app.misc import bot

    info = await bot.get_webhook_info()
    registered_url = info.url

    # An empty/missing URL and a mismatching URL are both reported as invalid.
    if not registered_url or registered_url != config.WEBHOOK_URL:
        logger.error("Configured wrong webhook URL {webhook}", webhook=registered_url)
        return False, "Configured invalid webhook URL"

    return True, f"Webhook configured. Pending updates count {info.pending_update_count}"
# Fragment: the enclosing function, the loop that binds `key`, and the `try`
# matching the second `except` below are outside the visible source.
logger.debug(data.keys())
# Collect the value stored under every key equal to the search term.
if key == search:
try:
search_ret.append(data[key])
except (KeyError, ValueError, IndexError, TypeError, AttributeError):
# Lookup failures for a single key are deliberately ignored.
pass
# Use the collected matches when there are any, otherwise the default.
if search_ret:
val = search_ret
else:
val = default
except (KeyError, ValueError, IndexError, TypeError, AttributeError):
# Any of these errors in the (not visible) try body falls back to the default.
val = default
# NOTE(review): the error-level logging of '8', `data` and 'contains', and the
# replacement of `val` with the literal 'contains!', look like leftover debugging
# code - confirm this branch is intentional.
if s2:
logger.error('8')
logger.error(data)
if s2 in data:
logger.error('contains')
val = 'contains!'
# With `checknone` set, a result of None or one equal to the default is
# treated as "not found" and raised as ValueError.
if checknone:
if val is None or val == default:
raise ValueError('value not found for search path: "%s"' % path)
return val