Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
elif option in ('-c', '--list-commands'):
assert len(arguments) == 1, \
"Please provide the name of a guest as the 1st and only positional argument!"
actions.append(functools.partial(context.print_commands, arguments[0]))
elif option in ('-e', '--execute'):
assert len(arguments) == 1, \
"Please provide the name of a guest as the 1st and only positional argument!"
actions.append(functools.partial(context.execute_command, arguments[0], value))
elif option in ('-t', '--timeout'):
context.timeout = int(value)
elif option in ('-d', '--daemon'):
actions.append(HostDaemon)
elif option in ('-v', '--verbose'):
coloredlogs.increase_verbosity()
elif option in ('-q', '--quiet'):
coloredlogs.decrease_verbosity()
elif option in ('-h', '--help'):
usage(__doc__)
sys.exit(0)
if not actions:
usage(__doc__)
sys.exit(0)
except Exception:
warning("Failed to parse command line arguments!")
sys.exit(1)
# Execute the requested action(s).
try:
for action in actions:
action()
except GuestDiscoveryError as e:
# Don't spam the logs with tracebacks when the libvirt daemon is down.
logger.error("%s", e)
])
# Map every parsed (option, value) pair onto a local setting, or apply a
# logging-verbosity change immediately.
for option, value in options:
if option in ('-l', '--list-commands'):
list_commands = True
elif option in ('-e', '--execute'):
execute_command = value
elif option in ('-d', '--daemon'):
start_daemon = True
elif option in ('-t', '--timeout'):
# int() raises ValueError for non-numeric input.
# NOTE(review): presumably that is caught by the `except Exception`
# handler further down - the matching `try:` is outside this excerpt.
timeout = int(value)
elif option in ('-c', '--character-device'):
character_device = value
elif option in ('-v', '--verbose'):
coloredlogs.increase_verbosity()
elif option in ('-q', '--quiet'):
coloredlogs.decrease_verbosity()
elif option in ('-h', '--help'):
usage(__doc__)
sys.exit(0)
# None of the three actions was requested: print the usage message and
# exit with status 0.
if not (list_commands or execute_command or start_daemon):
usage(__doc__)
sys.exit(0)
# The handler prints a fixed message (the caught exception's own text is
# not included) and exits with status 1. sys.exit() raises SystemExit,
# which does not derive from Exception, so the exits above pass through
# this handler untouched.
except Exception:
warning("Error: Failed to parse command line arguments!")
sys.exit(1)
# Start the guest daemon.
try:
# Without an explicit character device, look one up by channel name; the
# channel name that is used depends on whether the daemon is being started.
if not character_device:
channel_name = HOST_TO_GUEST_CHANNEL_NAME if start_daemon else GUEST_TO_HOST_CHANNEL_NAME
character_device = find_character_device(channel_name)
if start_daemon:
agent = GuestAgent(character_device=character_device, retry=False)
])
# Map every parsed (option, value) pair onto a local setting, or apply a
# logging-verbosity change immediately.
for option, value in options:
if option in ('-e', '--exclusive'):
exclusive = True
# The three duration-like options below are all converted with
# parse_timespan() (defined outside this excerpt).
elif option in ('-T', '--lock-timeout'):
lock_timeout = parse_timespan(value)
elif option in ('-l', '--lock-file'):
lock_name = value
elif option in ('-t', '--timeout'):
command_timeout = parse_timespan(value)
elif option in ('-f', '--fudge-factor'):
fudge_factor = parse_timespan(value)
elif option in ('-v', '--verbose'):
coloredlogs.increase_verbosity()
elif option in ('-q', '--quiet'):
coloredlogs.decrease_verbosity()
elif option in ('-h', '--help'):
usage(__doc__)
sys.exit(0)
else:
# NOTE(review): assert statements are removed when Python runs with -O,
# so under -O an unhandled option would be ignored silently here.
assert False, "Unhandled option!"
# Make sure the operator provided a program to execute.
if not arguments:
usage(__doc__)
sys.exit(0)
# Make sure the program actually exists.
# The first positional argument is taken as the program; os.path.isfile()
# checks it as given before any search is attempted.
program_name = arguments[0]
if not os.path.isfile(program_name):
# Only search the $PATH if the given program name
# doesn't already include one or more path segments.
if program_name == os.path.basename(program_name):
matching_programs = which(program_name)
kw['io_scheduling_class'] = value
# Most branches below store their setting in the `kw` dictionary;
# NOTE(review): presumably it is later passed on as keyword arguments -
# the consumer is outside this excerpt.
elif option in ('-c', '--config'):
kw['config_file'] = parse_path(value)
elif option in ('-u', '--use-sudo'):
use_sudo = True
elif option in ('-n', '--dry-run'):
logger.info("Performing a dry run (because of %s option) ..", option)
kw['dry_run'] = True
elif option in ('-C', '--removal-command'):
# shlex.split() tokenizes the option value with shell-like quoting rules,
# so the removal command is stored as a list of arguments.
removal_command = shlex.split(value)
logger.info("Using custom removal command: %s", removal_command)
kw['removal_command'] = removal_command
elif option in ('-v', '--verbose'):
coloredlogs.increase_verbosity()
elif option in ('-q', '--quiet'):
coloredlogs.decrease_verbosity()
elif option in ('-h', '--help'):
usage(__doc__)
return
else:
# NOTE(review): assert statements are removed when Python runs with -O,
# so under -O an unhandled option would be ignored silently here.
assert False, "Unhandled option! (programming error)"
# rotation_scheme is filled in outside this excerpt; here it is only logged
# when it is truthy.
if rotation_scheme:
logger.verbose("Rotation scheme defined on command line: %s", rotation_scheme)
if arguments:
# Rotation of the locations given on the command line.
location_source = 'command line arguments'
# Every positional argument is converted with coerce_location(), and the
# use_sudo flag is forwarded as its `sudo` keyword.
selected_locations.extend(coerce_location(value, sudo=use_sudo) for value in arguments)
else:
# Rotation of all configured locations.
location_source = 'configuration file'
selected_locations.extend(
location for location, rotation_scheme, options
# This will not return.
# (os.execvp() replaces the current process with `pip`, located via $PATH;
# control only comes back if the exec itself fails, which raises OSError.)
os.execvp('pip', ['pip'] + arguments)
else:
# Drop every literal 'install' argument before the list is handed to
# install_from_arguments() below.
arguments = [arg for arg in arguments if arg != 'install']
config = Config()
# Initialize logging output.
coloredlogs.install(
fmt=config.log_format,
level=config.log_verbosity,
)
# Adjust verbosity based on -v, -q, --verbose, --quiet options.
# The loop walks a copy of the list (list(arguments)); match_option() is
# defined outside this excerpt.
for argument in list(arguments):
if match_option(argument, '-v', '--verbose'):
coloredlogs.increase_verbosity()
elif match_option(argument, '-q', '--quiet'):
coloredlogs.decrease_verbosity()
# Perform the requested action(s).
try:
accelerator = PipAccelerator(config)
accelerator.install_from_arguments(arguments)
except NothingToDoError as e:
# Don't print a traceback for this (it's not very user friendly) and
# exit with status zero to stay compatible with pip. For more details
# please refer to https://github.com/paylogic/pip-accel/issues/47.
logger.warning("%s", e)
sys.exit(0)
# Any other exception is logged together with its traceback
# (logger.exception) and turned into exit status 1.
except Exception:
logger.exception("Caught unhandled exception!")
sys.exit(1)
elif option in ("-l", "--list"):
action = list_matching_entries
elif option in ("-n", "--no-clipboard"):
show_opts["use_clipboard"] = False
elif option in ("-p", "--password-store"):
# setdefault() creates the "stores" list on first use, so each occurrence
# of this option adds one more PasswordStore.
stores = program_opts.setdefault("stores", [])
stores.append(PasswordStore(directory=value))
elif option in ("-f", "--filter"):
show_opts["filters"].append(value)
elif option in ("-x", "--exclude"):
# NOTE(review): unlike "stores" above there is no setdefault() here, so
# "exclude_list" (like "filters" in show_opts) must already exist -
# presumably initialised before this excerpt; confirm.
program_opts["exclude_list"].append(value)
elif option in ("-v", "--verbose"):
# Besides adjusting the log level, keep a running count in `verbosity`;
# it decides the "quiet" flag further down.
coloredlogs.increase_verbosity()
verbosity += 1
elif option in ("-q", "--quiet"):
coloredlogs.decrease_verbosity()
verbosity -= 1
elif option in ("-h", "--help"):
usage(__doc__)
return
else:
raise Exception("Unhandled option! (programming error)")
# Show the usage message and return when there are no positional arguments
# and the selected action is not list_matching_entries.
if not (arguments or action == list_matching_entries):
usage(__doc__)
return
# Report the exception's message via warning() and exit with status 1.
except Exception as e:
warning("Error: %s", e)
sys.exit(1)
# Execute the requested action.
try:
# "quiet" is enabled once -q was given more often than -v.
show_opts["quiet"] = verbosity < 0
# show_opts is only used as `kw` when the action is show_matching_entry;
# every other action gets an empty dictionary.
kw = show_opts if action == show_matching_entry else {}
ubuntu_mode = True
elif option in ('-x', '--exclude'):
# The ignore_mirror() call is inserted at the front of the `actions` list.
# NOTE(review): presumably so that exclusions take effect before any other
# queued action runs - the code that executes `actions` is not shown.
actions.insert(0, functools.partial(updater.ignore_mirror, value))
elif option in ('-m', '--max'):
# int() raises ValueError for non-numeric input.
limit = int(value)
elif option in ('-v', '--verbose'):
coloredlogs.increase_verbosity()
elif option in ('-V', '--version'):
# Prints updater_version plus the major and minor version of the running
# Python interpreter, then returns without doing anything else.
output("Version: %s on Python %i.%i", updater_version, sys.version_info[0], sys.version_info[1])
return
elif option in ('-C', '--codename'):
codename = value
elif option in ('-R', '--create-chroot'):
chroot_path = value
elif option in ('-q', '--quiet'):
coloredlogs.decrease_verbosity()
elif option in ('-h', '--help'):
usage(__doc__)
return
else:
# NOTE(review): assert statements are removed when Python runs with -O,
# so under -O an unhandled option would be ignored silently here.
assert False, "Unhandled option!"
# A codename without a chroot path is rejected.
# NOTE(review): this check is also an assert, so it too disappears under -O.
if codename and not chroot_path:
assert False, "--codename must be used with valid -R to specify chroot path"
# Chroot creation is queued as an action (not run immediately), with the
# codename passed along as a keyword argument.
if chroot_path:
actions.append(functools.partial(updater.create_chroot, chroot_path, codename=codename))
# Nothing was queued: print the usage message and return.
if not actions:
usage(__doc__)
return
# Propagate options to the Python API.
# (url_char_len is assigned outside this excerpt.)
updater.max_mirrors = limit
updater.url_char_len = url_char_len
updater.ubuntu_mode = ubuntu_mode
elif option in ('-n', '--dry-run'):
logger.info("Performing a dry run (because of %s option) ..", option)
program_opts['dry_run'] = True
elif option in ('-f', '--force'):
program_opts['force'] = True
elif option in ('-x', '--exclude'):
program_opts.setdefault('exclude_list', [])
program_opts['exclude_list'].append(value)
elif option == '--multi-fs':
program_opts['multi_fs'] = True
elif option == '--disable-notifications':
program_opts['notifications_enabled'] = False
elif option in ('-v', '--verbose'):
coloredlogs.increase_verbosity()
elif option in ('-q', '--quiet'):
coloredlogs.decrease_verbosity()
elif option in ('-h', '--help'):
usage(__doc__)
return
else:
raise Exception("Unhandled option! (programming error)")
if len(arguments) > 2:
msg = "Expected one or two positional arguments! (got %i)"
raise Exception(msg % len(arguments))
if len(arguments) == 2:
# Get the source from the first of two arguments.
program_opts['source'] = arguments.pop(0)
if arguments:
# Get the destination from the second (or only) argument.
dest_opts['expression'] = arguments[0]
program_opts['destination'] = Destination(**dest_opts)
elif not os.environ.get('RSYNC_MODULE_PATH'):