Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _avoid_invalidating_lineage(config, lineage, original_server):
"Do not renew a valid cert with one from a staging server!"
# Some lineages may have begun with --staging, but then had production certs
# added to them
latest_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, open(lineage.cert).read())
# all our test certs are from happy hacker fake CA, though maybe one day
# we should test more methodically
now_valid = "fake" not in repr(latest_cert.get_issuer()).lower()
if util.is_staging(config.server):
if not util.is_staging(original_server) or now_valid:
if not config.break_my_certs:
names = ", ".join(lineage.names())
raise errors.Error(
"You've asked to renew/replace a seemingly valid certificate with "
"a test certificate (domains: {0}). We will not do that "
"unless you use the --break-my-certs flag!".format(names))
def get_certnames(config, verb, allow_multiple=False, custom_prompt=None):
"""Get certname from flag, interactively, or error out.
"""
certname = config.certname
if certname:
certnames = [certname]
else:
disp = zope.component.getUtility(interfaces.IDisplay)
filenames = storage.renewal_conf_files(config)
choices = [storage.lineagename_for_filename(name) for name in filenames]
if not choices:
raise errors.Error("No existing certificates found.")
if allow_multiple:
if not custom_prompt:
prompt = "Which certificate(s) would you like to {0}?".format(verb)
else:
prompt = custom_prompt
code, certnames = disp.checklist(
prompt, choices, cli_flag="--cert-name", force_interactive=True)
if code != display_util.OK:
raise errors.Error("User ended interaction.")
else:
if not custom_prompt:
prompt = "Which certificate would you like to {0}?".format(verb)
else:
prompt = custom_prompt
code, index = disp.menu(
:param str domain: Domain of the challb
:returns: Appropriate AnnotatedChallenge
:rtype: :class:`certbot.achallenges.AnnotatedChallenge`
"""
chall = challb.chall
logger.info("%s challenge for %s", chall.typ, domain)
if isinstance(chall, challenges.KeyAuthorizationChallenge):
return achallenges.KeyAuthorizationAnnotatedChallenge(
challb=challb, domain=domain, account_key=account_key)
elif isinstance(chall, challenges.DNS):
return achallenges.DNS(challb=challb, domain=domain)
else:
raise errors.Error(
"Received unsupported challenge of type: {0}".format(chall.typ))
return parsed_args
self.defaults = dict((key, copy.deepcopy(self.parser.get_default(key)))
for key in vars(parsed_args))
# Do any post-parsing homework here
if self.verb == "renew":
if parsed_args.force_interactive:
raise errors.Error(
"{0} cannot be used with renew".format(
constants.FORCE_INTERACTIVE_FLAG))
parsed_args.noninteractive_mode = True
if parsed_args.force_interactive and parsed_args.noninteractive_mode:
raise errors.Error(
"Flag for non-interactive mode and {0} conflict".format(
constants.FORCE_INTERACTIVE_FLAG))
if parsed_args.staging or parsed_args.dry_run:
self.set_test_server(parsed_args)
if parsed_args.csr:
self.handle_csr(parsed_args)
if parsed_args.must_staple:
parsed_args.staple = True
if parsed_args.validate_hooks:
hooks.validate_hooks(parsed_args)
if parsed_args.allow_subset_of_names:
).format(lineage.configfile.filename, br=os.linesep)
if config.verb == "run":
keep_opt = "Attempt to reinstall this existing certificate"
elif config.verb == "certonly":
keep_opt = "Keep the existing certificate for now"
choices = [keep_opt,
"Renew & replace the cert (limit ~5 per 7 days)"]
display = zope.component.getUtility(interfaces.IDisplay)
response = display.menu(question, choices,
default=0, force_interactive=True)
if response[0] == display_util.CANCEL:
# TODO: Add notification related to command-line options for
# skipping the menu for this case.
raise errors.Error(
"Operation canceled. You may re-run the client.")
elif response[1] == 0:
return "reinstall", lineage
elif response[1] == 1:
return "renew", lineage
raise AssertionError('This is impossible')
cli_flag="--expand",
force_interactive=True):
return "renew", cert
else:
reporter_util = zope.component.getUtility(interfaces.IReporter)
reporter_util.add_message(
"To obtain a new certificate that contains these names without "
"replacing your existing certificate for {0}, you must use the "
"--duplicate option.{br}{br}"
"For example:{br}{br}{1} --duplicate {2}".format(
existing,
sys.argv[0], " ".join(sys.argv[1:]),
br=os.linesep
),
reporter_util.HIGH_PRIORITY)
raise errors.Error(USER_CANCELLED)
:param .JWK account_key: Authorized Account Key
:param str domain: Domain of the challb
:returns: Appropriate AnnotatedChallenge
:rtype: :class:`certbot.achallenges.AnnotatedChallenge`
"""
chall = challb.chall
logger.info("%s challenge for %s", chall.typ, domain)
if isinstance(chall, challenges.KeyAuthorizationChallenge):
return achallenges.KeyAuthorizationAnnotatedChallenge(
challb=challb, domain=domain, account_key=account_key)
elif isinstance(chall, challenges.DNS):
return achallenges.DNS(challb=challb, domain=domain)
raise errors.Error(
"Received unsupported challenge of type: {0}".format(chall.typ))
:raises acme.errors.Error: In case of any protocol problems.
:returns: Newly registered and saved account, as well as protocol
API handle (should be used in `Client` initialization).
:rtype: `tuple` of `.Account` and `acme.client.Client`
"""
# Log non-standard actions, potentially wrong API calls
if account_storage.find_all():
logger.info("There are already existing accounts for %s", config.server)
if config.email is None:
if not config.register_unsafely_without_email:
msg = ("No email was provided and "
"--register-unsafely-without-email was not present.")
logger.warning(msg)
raise errors.Error(msg)
if not config.dry_run:
logger.info("Registering without email!")
# If --dry-run is used, and there is no staging account, create one with no email.
if config.dry_run:
config.email = None
# Each new registration shall use a fresh new key
rsa_key = generate_private_key(
public_exponent=65537,
key_size=config.rsa_key_size,
backend=default_backend())
key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa_key))
acme = acme_from_config_key(config, key)
# TODO: add phone?
regr = perform_registration(acme, config, tos_cb)
try:
with open(renewable_cert.chain_path) as chain_file: # type: IO[str]
chain = chain_file.read()
with open(renewable_cert.cert_path) as cert_file: # type: IO[str]
cert = cert_file.read()
with open(renewable_cert.fullchain_path) as fullchain_file: # type: IO[str]
fullchain = fullchain_file.read()
if (cert + chain) != fullchain:
error_str = "fullchain does not match cert + chain for {0}!"
error_str = error_str.format(renewable_cert.lineagename)
raise errors.Error(error_str)
except IOError as e:
error_str = "reading one of cert, chain, or fullchain has failed: {0}".format(e)
logger.exception(error_str)
raise errors.Error(error_str)
except errors.Error as e:
raise e