Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
option_requires(
option='--some-option',
requires_all=['--set_option'],
scope=locals()
)
option_requires(
option='--some-option',
requires_any=[
'--set_option',
'--unset-option'],
scope=locals()
)
with pytest.raises(UsageError):
option_requires(
option='--some-option',
requires_all=[
'--set-option',
'--unset-option'],
scope=locals()
)
with pytest.raises(UsageError):
option_requires(
option='--some-option',
requires_any=[
'--unset-option'],
scope=locals()
)
conditional_value='magic',
requires_any=[
'--unset-option'],
scope=locals()
)
some_option = ''
option_requires(
option='--some-option',
conditional_value='',
requires_any=[
'--unset-option'],
scope=locals()
)
with pytest.raises(UsageError):
some_option = 'magic'
option_requires(
option='--some-option',
conditional_value='magic',
requires_any=[
'--unset-option'],
scope=locals()
)
def get_latest_commit(github_repository: str):
    """
    Look up the most recent commit of a GitHub-hosted repository.

    The repository URL is split into its domain and path; the path is then
    used to query the GitHub commits API, and the `sha` of the first commit
    in the response is returned.

    Raises `UsageError` if the URL's domain is not exactly `github.com`.
    Any failure while fetching or decoding the API response (including an
    empty commit list) is re-raised as a plain `Exception` chained to the
    original error.
    """
    url_parts = urllib.parse.urlparse(github_repository)
    repo_domain = url_parts.netloc
    # Leading/trailing slashes are dropped so the path slots into the API URL.
    repo_path = url_parts.path.strip('/')

    if repo_domain != 'github.com':
        raise UsageError(
            "Error: Getting the latest commit is only supported "
            "for repositories hosted on GitHub. "
            "Provided repository domain was: {d}".format(d=repo_domain))

    commits_url = "https://api.github.com/repos/{rp}/commits".format(rp=repo_path)

    try:
        with urllib.request.urlopen(commits_url) as response:
            payload = response.read().decode('utf-8')
            commits = json.loads(payload)
            # The first entry of the listing is taken as the latest commit.
            return commits[0]['sha']
    except Exception as e:
        raise Exception(
            "Could not get latest commit for repository: {r}"
            .format(r=repo_path)) from e
# See: http://click.pocoo.org/6/api/#click.Context
cli(obj={})
except botocore.exceptions.NoCredentialsError:
raise Error(
"Flintrock could not find your AWS credentials. "
"You can fix this is by providing your credentials "
"via environment variables or by creating a shared "
"credentials file.\n"
"For more information see:\n"
" * https://boto3.readthedocs.io/en/latest/guide/configuration.html#environment-variables\n"
" * https://boto3.readthedocs.io/en/latest/guide/configuration.html#shared-credentials-file"
)
except NothingToDo as e:
print(e)
return 0
except UsageError as e:
print(e, file=sys.stderr)
return 2
except Error as e:
print(e, file=sys.stderr)
return 1
if required_name not in scope or not scope[required_name]:
raise UsageError(
"Error: Missing option \"{missing_option}\" is required by "
"\"{option}{space}{conditional_value}\"."
.format(
missing_option=required_option,
option=option,
space=' ' if conditional_value is not None else '',
conditional_value=conditional_value if conditional_value is not None else ''))
if requires_any:
for required_option in requires_any:
required_name = option_name_to_variable_name(required_option)
if required_name in scope and scope[required_name] is not None:
break
else:
raise UsageError(
"Error: \"{option}{space}{conditional_value}\" requires at least "
"one of the following options to be set: {at_least}"
.format(
option=option,
space=' ' if conditional_value is not None else '',
conditional_value=conditional_value if conditional_value is not None else '',
at_least=', '.join(['"' + ra + '"' for ra in requires_any])))
This function looks for values by converting the option names to their
corresponding variable names (e.g. --option-a becomes option_a) and looking them
up in the provided scope.
"""
mutually_exclusive_names = [option_name_to_variable_name(o) for o in options]
used_options = set()
for name, value in scope.items():
if name in mutually_exclusive_names and scope[name]: # is not None:
used_options.add(name)
if len(used_options) > 1:
bad_option1 = used_options.pop()
bad_option2 = used_options.pop()
raise UsageError(
"Error: \"{option1}\" and \"{option2}\" are mutually exclusive.\n"
" {option1}: {value1}\n"
" {option2}: {value2}"
.format(
option1=variable_name_to_option_name(bad_option1),
value1=scope[bad_option1],
option2=variable_name_to_option_name(bad_option2),
value2=scope[bad_option2]))
class NothingToDo(Exception):
    """
    Signals that there was no work to perform.

    The entry-point handler in this file prints the message to stdout and
    returns exit status 0 when it catches this exception.
    """
class UsageError(Exception):
    """
    Raised when the user supplied invalid or inconsistent input.

    The entry-point handler in this file prints the message to stderr and
    returns exit status 2 when it catches this exception.
    """
class UnsupportedProviderError(UsageError):
    """
    Usage error for a provider name that is not supported.

    The offending name is embedded in the exception message and also kept
    on the `provider` attribute.
    """

    def __init__(self, provider: str):
        message = "This provider is not supported: {p}".format(p=provider)
        super().__init__(message)
        self.provider = provider
class Error(Exception):
    """
    Base class for errors that are not usage errors.

    The entry-point handler in this file prints the message to stderr and
    returns exit status 1 when it catches this exception.
    """
class ClusterNotFound(Error):
    """
    `Error` subclass for a cluster that could not be found.

    NOTE(review): meaning taken from the class name; the raise sites are
    not visible in this section.
    """
class ClusterAlreadyExists(Error):
    """
    `Error` subclass for a cluster that already exists.

    NOTE(review): meaning taken from the class name; the raise sites are
    not visible in this section.
    """