Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_comparable_csr(*names):
    """Load a certificate request and wrap it in ``jose.ComparableX509``.

    Every positional argument is forwarded unchanged to ``load_csr``.
    """
    csr = load_csr(*names)
    return jose.ComparableX509(csr)
def load_comparable_cert(*names):
    """Load a certificate and wrap it in ``jose.ComparableX509``.

    Every positional argument is forwarded unchanged to ``load_cert``.
    """
    cert = load_cert(*names)
    return jose.ComparableX509(cert)
def issue(client, authzs, cert_output=None):
    """Wait for the given authzs to be ready, then request a certificate.

    Given a list of authzs that are being processed by the server, wait for
    them to be ready, then request issuance of a cert with a random key for
    the given domains.

    :param client: object providing ``poll_and_request_issuance``.
    :param authzs: authorization resources; the identifier value of each
        one is passed to ``make_csr`` to build the CSR.
    :param cert_output: optional path. If provided, the cert is written
        there as a PEM file.

    :raises ValidationError: when polling fails and one of the updated
        authzs has a challenge containing an ``error`` entry. It is built
        from the domain string plus the error's ``type`` and ``detail``.
    :raises acme_errors.PollError: re-raised unchanged when polling fails
        but no challenge carries an error.
    """
    csr = make_csr([authz.body.identifier.value for authz in authzs])
    cert_resource = None
    try:
        cert_resource, _ = client.poll_and_request_issuance(
            jose.ComparableX509(csr), authzs)
    except acme_errors.PollError as error:
        # If we get a PollError, pick the first failed authz and turn it
        # into a more useful ValidationError that contains details we can
        # look for in tests.
        for authz in error.updated:
            updated_authz = json.loads(urllib2.urlopen(authz.uri).read())
            # Keep this a plain string: a trailing comma here would turn
            # the domain into a one-element tuple.
            domain = authz.body.identifier.value
            for c in updated_authz['challenges']:
                if 'error' in c:
                    err = c['error']
                    raise ValidationError(domain, err['type'], err['detail'])
        # If none of the authz's had an error, just re-raise.
        raise
    if cert_output is not None:
        pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                              cert_resource.body)
        with open(cert_output, 'w') as f:
            f.write(pem)
"""
if self.auth_handler is None:
msg = ("Unable to obtain certificate because authenticator is "
"not set.")
logger.warning(msg)
raise errors.Error(msg)
if self.account.regr is None:
raise errors.Error("Please register with the ACME server first.")
logger.debug("CSR: %s, domains: %s", csr, domains)
if authzr is None:
authzr = self.auth_handler.get_authorizations(domains)
certr = self.acme.request_issuance(
jose.ComparableX509(
OpenSSL.crypto.load_certificate_request(typ, csr.data)),
authzr)
return certr, self.acme.fetch_chain(certr)
:rtype: tuple
"""
if self.auth_handler is None:
msg = ("Unable to obtain certificate because authenticator is "
"not set.")
logger.warning(msg)
raise errors.Error(msg)
if self.account.regr is None:
raise errors.Error("Please register with the ACME server first.")
logger.debug("CSR: %s, domains: %s", csr, domains)
authzr = self.auth_handler.get_authorizations(domains)
certr = self.acme.request_issuance(
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, csr.data)),
authzr)
return certr, self.acme.fetch_chain(certr)
def _get_cert(self, uri):
    """Fetch the certificate located at ``uri``.

    The request goes through ``self.net.get`` with ``self.DER_CONTENT_TYPE``
    used both as the ``Accept`` header and as the ``content_type`` argument.
    The response content is loaded as an ASN.1 certificate.

    :param str uri: URI of certificate

    :returns: tuple of the form
        (response, :class:`acme.jose.ComparableX509`)
    :rtype: tuple

    """
    # TODO: make it a param
    der_type = self.DER_CONTENT_TYPE
    response = self.net.get(
        uri, headers={'Accept': der_type}, content_type=der_type)
    x509 = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_ASN1, response.content)
    return response, jose.ComparableX509(x509)
def _revoke(rawkey, rawcert):
    """Revoke the PEM certificate ``rawcert`` through an ACME client.

    The client is created for ``ConfigNamespace(None).server`` and keyed
    with the PEM private key ``rawkey``, loaded without a password.
    """
    namespace = ConfigNamespace(None)
    server = namespace.server
    private_key = serialization.load_pem_private_key(
        rawkey, password=None, backend=default_backend())
    acme = acme_client.Client(server, key=JWKRSA(key=private_key))

    x509 = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_PEM, rawcert)
    acme.revoke(jose.ComparableX509(x509))
def _revoke_cert(self, cert, version):
    """Revoke one version of ``cert`` and record it in ``cpath_validity``.

    Nothing is done when ``self.cpath_validity`` already holds a truthy
    entry for ``cert.version("cert", version)``. Otherwise the certificate
    is revoked through the client returned by
    ``self._get_acme_client_for_revoc`` and the entry is set to
    ``REV_LABEL``.

    :raises errors.Error: if the ACME client raises
        ``acme_errors.ClientError`` during revocation.
    """
    already_revoked = self.cpath_validity[cert.version("cert", version)]
    if already_revoked:
        logger.debug("Certificate is already revoked.")
        return

    revoker = self._get_acme_client_for_revoc(cert, version)
    try:
        revoker.revoke(jose.ComparableX509(cert.pyopenssl(version)))
    except acme_errors.ClientError:
        logger.error(
            "Unable to revoke certificate at %s",
            cert.version("cert", version))
        raise errors.Error("Failed revocation")

    # Reached only on success: the handler above always raises.
    self.cpath_validity[cert.version("cert", version)] = REV_LABEL
def _dump_cert(cert):
    """Serialize ``cert`` with ``OpenSSL.crypto.dump_certificate``.

    A ``jose.ComparableX509`` instance is unwrapped to its ``wrapped``
    object first. ``filetype`` is not a parameter; it is a free variable
    resolved from the enclosing or module scope.
    """
    if not isinstance(cert, jose.ComparableX509):
        return OpenSSL.crypto.dump_certificate(filetype, cert)
    # pylint: disable=protected-access
    unwrapped = cert.wrapped
    return OpenSSL.crypto.dump_certificate(filetype, unwrapped)
def _get_cert(self, uri):
    """Fetch the certificate located at ``uri``.

    The request goes through ``self.net.get`` with the module-level
    ``DER_CONTENT_TYPE`` used both as the ``Accept`` header and as the
    ``content_type`` argument. The response content is loaded as an ASN.1
    certificate.

    :param str uri: URI of certificate

    :returns: tuple of the form
        (response, :class:`acme.jose.ComparableX509`)
    :rtype: tuple

    """
    # TODO: make it a param
    der_type = DER_CONTENT_TYPE
    response = self.net.get(
        uri, headers={'Accept': der_type}, content_type=der_type)
    x509 = OpenSSL.crypto.load_certificate(
        OpenSSL.crypto.FILETYPE_ASN1, response.content)
    return response, jose.ComparableX509(x509)