Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def do_http_challenges(client, authzs):
cleanup_tokens = []
challs = [get_chall(a, challenges.HTTP01) for a in authzs]
for chall_body in challs:
# Determine the token and key auth for the challenge
token = chall_body.chall.encode("token")
resp = chall_body.response(client.net.key)
keyauth = resp.key_authorization
# Add the HTTP-01 challenge response for this token/key auth to the
# challtestsrv
challSrv.add_http01_response(token, keyauth)
cleanup_tokens.append(token)
# Then proceed initiating the challenges with the ACME server
client.answer_challenge(chall_body, chall_body.response(client.net.key))
def cleanup():
def rand_http_chall(client):
d = random_domain()
csr_pem = chisel2.make_csr([d])
order = client.new_order(csr_pem)
authzs = order.authorizations
for a in authzs:
for c in a.body.challenges:
if isinstance(c.chall, challenges.HTTP01):
return d, c.chall
raise(Exception("No HTTP-01 challenge found for random domain authz"))
def perform2(self, achalls):
"""Perform achallenges without IDisplay interaction."""
responses = []
for achall in achalls:
if isinstance(achall.chall, challenges.HTTP01):
server = self.servers.run(
self.config.http01_port, challenges.HTTP01)
response, validation = achall.response_and_validation()
self.http_01_resources.add(
acme_standalone.HTTP01RequestHandler.HTTP01Resource(
chall=achall.chall, response=response,
validation=validation))
else: # tls-sni-01
server = self.servers.run(
self.config.tls_sni_01_port, challenges.TLSSNI01)
response, (cert, _) = achall.response_and_validation(
cert_key=self.key)
self.certs[response.z_domain] = (self.key, cert)
self.served[server].add(achall)
responses.append(response)
def _get_validation_path(self, achall):
return os.sep + os.path.join(challenges.HTTP01.URI_ROOT_PATH, achall.chall.encode("token"))
def new_authorization(self, authz, client, key, domain):
for combination in authz.combinations:
if len(combination) == 1:
challenger = authz.challenges[combination[0]]
challenge = challenger.chall
if isinstance(challenge, acme.challenges.HTTP01):
# store (and deliver) needed response for challenge
content = challenge.validation(key)
event = Event()
self.responses.setdefault(domain, {})
self.responses[domain][challenge.path] = (content, event)
# answer challenges / give ACME server go to check challenge
resp = challenge.response(key)
client.answer_challenge(challenger, resp)
# we can wait until this challenge is first requested ...
raise exceptions.AuthorizationNotYetRequested(event)
else:
return False
def _perform_achall_with_script(self, achall):
env = dict(CERTBOT_DOMAIN=achall.domain,
CERTBOT_VALIDATION=achall.validation(achall.account_key))
if isinstance(achall.chall, challenges.HTTP01):
env['CERTBOT_TOKEN'] = achall.chall.encode('token')
else:
os.environ.pop('CERTBOT_TOKEN', None)
os.environ.update(env)
_, out = self._execute_hook('auth-hook')
env['CERTBOT_AUTH_OUTPUT'] = out.strip()
self.env[achall] = env
def perform2(self, achalls):
"""Perform achallenges without IDisplay interaction."""
responses = []
for achall in achalls:
if isinstance(achall.chall, challenges.HTTP01):
server = self.servers.run(
self.config.http01_port, challenges.HTTP01)
response, validation = achall.response_and_validation()
self.http_01_resources.add(
acme_standalone.HTTP01RequestHandler.HTTP01Resource(
chall=achall.chall, response=response,
validation=validation))
else: # tls-sni-01
server = self.servers.run(
self.config.tls_sni_01_port, challenges.TLSSNI01)
response, (cert, _) = achall.response_and_validation(
cert_key=self.key)
self.certs[response.z_domain] = (self.key, cert)
self.served[server].add(achall)
responses.append(response)
def get_chall_pref(self, domain): # pragma: no cover
# pylint: disable=missing-docstring,no-self-use,unused-argument
return [challenges.HTTP01]
def perform2(self, achalls):
"""Perform achallenges without IDisplay interaction."""
responses = []
for achall in achalls:
if isinstance(achall.chall, challenges.HTTP01):
server = self.servers.run(
self.config.http01_port, challenges.HTTP01)
response, validation = achall.response_and_validation()
self.http_01_resources.add(
acme_standalone.HTTP01RequestHandler.HTTP01Resource(
chall=achall.chall, response=response,
validation=validation))
else: # tls-sni-01
server = self.servers.run(
self.config.tls_sni_01_port, challenges.TLSSNI01)
response, (cert, _) = achall.response_and_validation(
cert_key=self.key)
self.certs[response.z_domain] = (self.key, cert)
self.served[server].add(achall)
responses.append(response)
return responses
def _necessary_ports(self):
necessary_ports = set()
if challenges.HTTP01 in self.supported_challenges:
necessary_ports.add(self.config.http01_port)
if challenges.TLSSNI01 in self.supported_challenges:
necessary_ports.add(self.config.tls_sni_01_port)
return necessary_ports