Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Create a random domains.
d = random_domain()
# Configure the chall srv to SERVFAIL all queries for that domain.
challSrv.add_servfail_response(d)
# Expect a DNS problem with a detail that matches a regex
expectedProbType = "dns"
expectedProbRegex = re.compile(r"DNS problem: SERVFAIL looking up (A|AAAA|TXT|CAA) for {0}".format(d))
# Try and issue for the domain with the given challenge type.
failed = False
try:
chisel2.auth_and_issue([d], client=client, chall_type=chalType)
except acme_errors.ValidationError as e:
# Mark that the auth_and_issue failed
failed = True
# Extract the failed challenge from each failed authorization
for authzr in e.failed_authzrs:
c = None
if chalType == "http-01":
c = chisel2.get_chall(authzr, challenges.HTTP01)
elif chalType == "dns-01":
c = chisel2.get_chall(authzr, challenges.DNS01)
elif chalType == "tls-alpn-01":
c = chisel2.get_chall(authzr, challenges.TLSALPN01)
else:
raise(Exception("Invalid challenge type requested: {0}".format(challType)))
# The failed challenge's error should match expected
error = c.error
def certificate(self, cert, name, alt_host=None, port=443):
"""Verifies the certificate presented at name is cert"""
if alt_host is None:
host = socket.gethostbyname(name)
elif isinstance(alt_host, six.binary_type):
host = alt_host
else:
host = alt_host.encode()
name = name if isinstance(name, six.binary_type) else name.encode()
try:
presented_cert = crypto_util.probe_sni(name, host, port)
except acme_errors.Error as error:
logger.exception(str(error))
return False
return presented_cert.digest("sha256") == cert.digest("sha256")
def _poll_validations_and_fetch_crt(ctx, csr, authzrs):
try:
logger.info('validating requests')
crt, updated_authzrs = ctx.obj.acme.poll_and_request_issuance(csr, authzrs)
except errors.PollError as e:
if e.exhausted:
logger.error('validation timed out for the following domains: %s', ', '.join(authzr.body.identifier for
authzr in e.exhausted))
invalid_domains = [(e_authzr.body.identifier.value, _get_http_challenge(ctx, e_authzr).error.detail)
for e_authzr in e.updated.values() if e_authzr.body.status == messages.STATUS_INVALID]
if invalid_domains:
logger.error('validation invalid for the following domains:')
for invalid_domain in invalid_domains:
logger.error('%s: %s' % invalid_domain)
ctx.exit(1)
return crt
:return: The response, unmodified.
"""
nonce = response.headers.getRawHeaders(
REPLAY_NONCE_HEADER, [None])[0]
with LOG_JWS_ADD_NONCE(raw_nonce=nonce) as action:
if nonce is None:
raise errors.MissingNonce(response)
else:
try:
decoded_nonce = Header._fields['nonce'].decode(
nonce.decode('ascii')
)
action.add_success_fields(nonce=decoded_nonce)
except DeserializationError as error:
raise errors.BadNonce(nonce, error)
self._nonces.add(decoded_nonce)
return response
def issue(client, authzs, cert_output=None):
"""Given a list of authzs that are being processed by the server,
wait for them to be ready, then request issuance of a cert with a random
key for the given domains.
If cert_output is provided, write the cert as a PEM file to that path."""
csr = make_csr([authz.body.identifier.value for authz in authzs])
cert_resource = None
try:
cert_resource, _ = client.poll_and_request_issuance(jose.ComparableX509(csr), authzs)
except acme_errors.PollError as error:
# If we get a PollError, pick the first failed authz and turn it into a more
# useful ValidationError that contains details we can look for in tests.
for authz in error.updated:
updated_authz = json.loads(urllib2.urlopen(authz.uri).read())
domain = authz.body.identifier.value,
for c in updated_authz['challenges']:
if 'error' in c:
err = c['error']
raise ValidationError(domain, err['type'], err['detail'])
# If none of the authz's had an error, just re-raise.
raise
if cert_output is not None:
pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
cert_resource.body)
with open(cert_output, 'w') as f:
f.write(pem)
def _add_nonce(self, response):
if self.REPLAY_NONCE_HEADER in response.headers:
nonce = response.headers[self.REPLAY_NONCE_HEADER]
try:
decoded_nonce = jws.Header._fields['nonce'].decode(nonce)
except jose.DeserializationError as error:
raise errors.BadNonce(nonce, error)
logger.debug('Storing nonce: %s', nonce)
self._nonces.add(decoded_nonce)
else:
raise errors.MissingNonce(response)
in subsequent orders.
:param messages.OrderResource orderr: must have authorizations filled in
:returns: tuple of list of successfully deactivated authorizations, and
list of unsuccessfully deactivated authorizations.
:rtype: tuple
"""
to_deactivate = [authzr for authzr in orderr.authorizations
if authzr.body.status == messages.STATUS_VALID]
deactivated = []
failed = []
for authzr in to_deactivate:
try:
authzr = self.acme.deactivate_authorization(authzr)
deactivated.append(authzr)
except acme_errors.Error as e:
failed.append(authzr)
logger.debug('Failed to deactivate authorization %s: %s', authzr.uri, e)
return (deactivated, failed)
def _add_nonce(self, response):
"""
Store a nonce from a response we received.
:param twisted.web.iweb.IResponse response: The HTTP response.
:return: The response, unmodified.
"""
nonce = response.headers.getRawHeaders(
REPLAY_NONCE_HEADER, [None])[0]
with LOG_JWS_ADD_NONCE(raw_nonce=nonce) as action:
if nonce is None:
raise errors.MissingNonce(response)
else:
try:
decoded_nonce = Header._fields['nonce'].decode(
nonce.decode('ascii')
)
action.add_success_fields(nonce=decoded_nonce)
except DeserializationError as error:
raise errors.BadNonce(nonce, error)
self._nonces.add(decoded_nonce)
return response
def finalize_order(client, order):
"""Finalize the specified order and return the order resource."""
try:
finalized_order = client.poll_and_finalize(order)
except acme_errors.PollError as error:
if error.timeout:
logger.error(
'Timed out while waiting for CA to verify '
'challenge(s) for the following authorizations: %s',
', '.join(authzr.uri for _, authzr in error.exhausted)
)
raise Error('Challenge validation has failed, see error log.')
except acme_errors.ValidationError as error:
logger.error("CA marked some of the authorizations as invalid, "
"which likely means it could not access "
"http://example.com/.well-known/acme-challenge/X. "
"Did you set correct path in -d example.com:path "
"or --default_root? Are all your domains accessible "
"from the internet? Please check your domains' DNS "
"entries, your host's network/firewall setup and "
"your webserver config. If a domain's DNS entry has "
"both A and AAAA fields set up, some CAs such as "
"Let's Encrypt will perform the challenge validation "
"over IPv6. If your DNS provider does not answer "
"correctly to CAA records request, Let's Encrypt "
"won't issue a certificate for your domain (see "
"https://letsencrypt.org/docs/caa/). Failing "
"authorizations: %s",
', '.join(authzr.uri for authzr in error.failed_authzrs))