Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_comparable_csr(*names):
"""Load ComparableX509 certificate request."""
return jose.ComparableX509(load_csr(*names))
cleanup = chisel2.do_http_challenges(client, order.authorizations)
try:
order = client.poll_and_finalize(order)
finally:
cleanup()
# Create a new client with the JWK as the cert private key
jwk = josepy.JWKRSA(key=key)
net = acme_client.ClientNetwork(key, user_agent="Boulder integration tester")
directory = Directory.from_json(net.get(chisel2.DIRECTORY_V2).json())
new_client = acme_client.ClientV2(directory, net)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, order.fullchain_pem)
reset_akamai_purges()
client.revoke(josepy.ComparableX509(cert), 0)
cert_file = tempfile.NamedTemporaryFile(
dir=tempdir, suffix='.test_revoke_by_privkey.pem',
mode='w+', delete=False)
cert_file.write(OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert).decode())
cert_file.close()
verify_ocsp(cert_file.name, "/tmp/intermediate-cert-rsa-a.pem", "http://localhost:4002", "revoked")
verify_akamai_purge()
for name, auth in six.iteritems(authorizations):
challb = supported_challb(auth)
response, validation = challb.response_and_validation(client.net.key)
save_validation(roots[name], challb, validation)
client.answer_challenge(challb, response)
try:
order = finalize_order(client, order)
pems = list(split_pems(order.fullchain_pem))
persist_data(args, existing_data, new_data=IOPlugin.Data(
account_key=client.net.key,
account_reg=client.net.account,
key=key,
cert=jose.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pems[0])),
chain=[
jose.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pem))
for pem in pems[1:]
],
))
except Error as error:
persist_data(args, existing_data, new_data=IOPlugin.Data(
account_key=client.net.key,
account_reg=client.net.account,
key=None,
cert=None,
chain=None,
))
raise error
raise errors.Error("Error! Exactly one of --cert-path or --cert-name must be specified!")
if config.key_path is not None: # revocation by cert key
logger.debug("Revoking %s using cert key %s",
config.cert_path[0], config.key_path[0])
crypto_util.verify_cert_matches_priv_key(config.cert_path[0], config.key_path[0])
key = jose.JWK.load(config.key_path[1])
acme = client.acme_from_config_key(config, key)
else: # revocation by account key
logger.debug("Revoking %s using Account Key", config.cert_path[0])
acc, _ = _determine_account(config)
acme = client.acme_from_config_key(config, acc.key, acc.regr)
cert = crypto_util.pyopenssl_load_certificate(config.cert_path[1])[0]
logger.debug("Reason code for revocation: %s", config.reason)
try:
acme.revoke(jose.ComparableX509(cert), config.reason)
_delete_if_appropriate(config)
except acme_errors.ClientError as e:
return str(e)
display_ops.success_revocation(config.cert_path[0])
return None
save_validation(roots[name], challb, validation)
client.answer_challenge(challb, response)
try:
order = finalize_order(client, order)
pems = list(split_pems(order.fullchain_pem))
persist_data(args, existing_data, new_data=IOPlugin.Data(
account_key=client.net.key,
account_reg=client.net.account,
key=key,
cert=jose.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pems[0])),
chain=[
jose.ComparableX509(OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pem))
for pem in pems[1:]
],
))
except Error as error:
persist_data(args, existing_data, new_data=IOPlugin.Data(
account_key=client.net.key,
account_reg=client.net.account,
key=None,
cert=None,
chain=None,
))
raise error
finally:
for name, auth in six.iteritems(authorizations):
challb = supported_challb(auth)
if service_cert_id == id:
verrors.add(
'certificate_delete.id',
f'Selected certificate is being used by {text} service, please select another one'
)
verrors.check()
certificate = self.middleware.call_sync('certificate._get_instance', id)
if certificate.get('acme'):
client, key = self.get_acme_client_and_key(certificate['acme']['directory'], True)
try:
client.revoke(
jose.ComparableX509(
crypto.load_certificate(crypto.FILETYPE_PEM, certificate['certificate'])
),
0
)
except (errors.ClientError, messages.Error) as e:
if not force:
raise CallError(f'Failed to revoke certificate: {e}')
response = self.middleware.call_sync(
'datastore.delete',
self._config.datastore,
id
)
self.middleware.call_sync('service.start', 'ssl')
def load_cert(self, data):
"""Load certificate."""
try:
cert = OpenSSL.crypto.load_certificate(self.typ, data)
except OpenSSL.crypto.Error:
raise Error("simp_le couldn't load a certificate from {0}; the "
"file might be empty or corrupt.".format(self.path))
return jose.ComparableX509(cert)
def __init__(self, *args, **kwargs):
super(PluginIOTestMixin, self).__init__(*args, **kwargs)
raw_key = gen_pkey(1024)
self.all_data = IOPlugin.Data(
account_key=jose.JWKRSA(key=rsa.generate_private_key(
public_exponent=65537, key_size=1024,
backend=default_backend(),
)),
account_reg=messages.NewRegistration.from_data(),
key=ComparablePKey(raw_key),
cert=jose.ComparableX509(crypto_util.gen_ss_cert(raw_key, ['a'])),
chain=[
jose.ComparableX509(crypto_util.gen_ss_cert(raw_key, ['b'])),
jose.ComparableX509(crypto_util.gen_ss_cert(raw_key, ['c'])),
],
)
self.key_data = IOPlugin.EMPTY_DATA._replace(key=self.all_data.key)