Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _authzr_from_response(self, response, identifier,
                          uri=None, new_cert_uri=None):
    """Build an authorization resource from a server response.

    :param response: Server response; its ``links``, ``headers`` and
        ``json()`` are read.
    :param identifier: Identifier the parsed authorization body is
        expected to carry.
    :param uri: Fallback resource URI, used only when the response has
        no ``Location`` header.
    :param new_cert_uri: Value for the resource's ``new_cert_uri``; when
        `None` it is read from the response's ``next`` link.

    :returns: `messages.AuthorizationResource` built from the response.

    :raises errors.ClientError: if ``new_cert_uri`` is `None` and the
        ``next`` link (or its ``url`` entry) is absent.
    :raises errors.UnexpectedUpdate: if the parsed body's identifier
        differs from ``identifier``.
    """
    # pylint: disable=no-self-use
    if new_cert_uri is None:
        try:
            next_link = response.links['next']
            new_cert_uri = next_link['url']
        except KeyError:
            raise errors.ClientError('"next" link missing')

    # Parse the body before reading the header, as the arguments were
    # originally evaluated in that order.
    authorization = messages.Authorization.from_json(response.json())
    location = response.headers.get('Location', uri)
    authzr = messages.AuthorizationResource(
        body=authorization,
        uri=location,
        new_cert_uri=new_cert_uri)

    # The server must answer for the identifier that was asked about.
    if authzr.body.identifier != identifier:
        raise errors.UnexpectedUpdate(authzr)
    return authzr
# Fragment of a certificate-revocation routine. The enclosing ``def`` line is
# not part of this excerpt and the original indentation has been lost.
# Chooses how the ACME client is built: from the key loaded via
# ``config.key_path`` when that is set, otherwise from the account key.
# NOTE(review): index 0 of ``config.cert_path`` / ``config.key_path`` is what
# gets logged and verified while index 1 is what gets loaded -- presumably
# (path, contents) pairs; confirm against the config definition.
if config.key_path is not None: # revocation by cert key
logger.debug("Revoking %s using cert key %s",
config.cert_path[0], config.key_path[0])
# Presumably raises when the certificate and key do not belong together -- TODO confirm.
crypto_util.verify_cert_matches_priv_key(config.cert_path[0], config.key_path[0])
key = jose.JWK.load(config.key_path[1])
acme = client.acme_from_config_key(config, key)
else: # revocation by account key
logger.debug("Revoking %s using Account Key", config.cert_path[0])
# Only the first element of the returned pair (the account) is used.
acc, _ = _determine_account(config)
acme = client.acme_from_config_key(config, acc.key, acc.regr)
# Only the first element of the loader's result is kept as the certificate.
cert = crypto_util.pyopenssl_load_certificate(config.cert_path[1])[0]
logger.debug("Reason code for revocation: %s", config.reason)
try:
acme.revoke(jose.ComparableX509(cert), config.reason)
# Reached only when revoke() did not raise.
_delete_if_appropriate(config)
except acme_errors.ClientError as e:
# A client error is handed back to the caller as text instead of propagating.
return str(e)
# Success path: report the revocation and return None.
display_ops.success_revocation(config.cert_path[0])
return None
def _parse_regr_response(self, response, uri=None, new_authzr_uri=None,
                         terms_of_service=None):
    """
    Parse a registration response from the server.

    Link headers found in the response override the ``new_authzr_uri``
    and ``terms_of_service`` arguments; the arguments act as fallbacks.

    :param response: Server response; ``response.json()`` must return an
        object offering ``addCallback`` (presumably a Twisted Deferred
        -- confirm against the HTTP client in use).
    :param uri: Fallback URI handed to ``self._maybe_location``.
    :param new_authzr_uri: Fallback for a missing ``next`` link.
    :param terms_of_service: Fallback for a missing ``terms-of-service``
        link.

    :returns: Result of ``addCallback``; the callback produces a
        `messages.RegistrationResource` from the decoded body.

    :raises errors.ClientError: if no ``next`` link is present and no
        ``new_authzr_uri`` was supplied.
    """
    header_links = _parse_header_links(response)
    if u'terms-of-service' in header_links:
        terms_of_service = header_links[u'terms-of-service'][u'url']
    if u'next' in header_links:
        new_authzr_uri = header_links[u'next'][u'url']
    if new_authzr_uri is None:
        raise errors.ClientError('"next" link missing')

    def _to_resource(body):
        # Runs once the JSON body is available.
        return messages.RegistrationResource(
            body=messages.Registration.from_json(body),
            uri=self._maybe_location(response, uri=uri),
            new_authzr_uri=new_authzr_uri,
            terms_of_service=terms_of_service)

    pending_body = response.json()
    return pending_body.addCallback(_to_resource)
# Tail of a response-checking routine. The enclosing ``def`` and the opening
# ``if`` lines are outside this excerpt, and indentation has been lost.
# NOTE(review): ``jobj``, ``response_ct``, ``content_type`` and ``cls`` are
# bound outside the visible lines.
try:
# Convert the decoded JSON into a messages.Error and raise it to the caller.
raise messages.Error.from_json(jobj)
except jose.DeserializationError as error:
# Couldn't deserialize JSON object
raise errors.ClientError((response, error))
else:
# response is not JSON object
raise errors.ClientError(response)
else:
# Presumably the branch for a successful response -- the matching ``if`` is not visible here.
# A decodable body with an unexpected Content-Type is only logged, not rejected.
if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE:
logger.debug(
'Ignoring wrong Content-Type (%r) for JSON decodable '
'response', response_ct)
# JSON was expected by the caller but the body did not decode: fail.
if content_type == cls.JSON_CONTENT_TYPE and jobj is None:
raise errors.ClientError(
'Unexpected response Content-Type: {0}'.format(response_ct))
return response
def _regr_from_response(cls, response, uri=None, new_authzr_uri=None,
                        terms_of_service=None):
    """Build a registration resource from a server response.

    Links found on the response override the ``new_authzr_uri`` and
    ``terms_of_service`` arguments; the arguments act as fallbacks.

    :param response: Server response; its ``links``, ``headers`` and
        ``json()`` are read.
    :param uri: Fallback resource URI, used only when the response has
        no ``Location`` header.
    :param new_authzr_uri: Fallback for a missing ``next`` link.
    :param terms_of_service: Fallback for a missing
        ``terms-of-service`` link.

    :returns: `messages.RegistrationResource` built from the response.

    :raises errors.ClientError: if no ``next`` link is present and no
        ``new_authzr_uri`` was supplied.
    """
    links = response.links
    if 'terms-of-service' in links:
        terms_of_service = links['terms-of-service']['url']
    if 'next' in links:
        new_authzr_uri = links['next']['url']
    if new_authzr_uri is None:
        raise errors.ClientError('"next" link missing')

    registration = messages.Registration.from_json(response.json())
    location = response.headers.get('Location', uri)
    return messages.RegistrationResource(
        body=registration,
        uri=location,
        new_authzr_uri=new_authzr_uri,
        terms_of_service=terms_of_service)
# Fragment of a response-checking routine. The enclosing ``def`` line is not
# part of this excerpt and the original indentation has been lost.
# NOTE(review): as shown, ``jobj`` is set to None and tested straight away, so
# the ``jobj is not None`` branches could never run; the lines that decode
# the body into ``jobj`` are presumably missing from this excerpt -- confirm.
# ``response_ct``, ``content_type`` and ``cls`` are bound outside the visible lines.
jobj = None
if not response.ok:
if jobj is not None:
# A wrong Content-Type on an error body is only logged; the error is still raised.
if response_ct != cls.JSON_ERROR_CONTENT_TYPE:
logger.debug(
'Ignoring wrong Content-Type (%r) for JSON Error',
response_ct)
try:
# Convert the decoded JSON into a messages.Error and raise it to the caller.
raise messages.Error.from_json(jobj)
except jose.DeserializationError as error:
# Couldn't deserialize JSON object
raise errors.ClientError((response, error))
else:
# response is not JSON object
raise errors.ClientError(response)
else:
# A decodable body with an unexpected Content-Type is only logged, not rejected.
if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE:
logger.debug(
'Ignoring wrong Content-Type (%r) for JSON decodable '
'response', response_ct)
# JSON was expected by the caller but the body did not decode: fail.
if content_type == cls.JSON_CONTENT_TYPE and jobj is None:
raise errors.ClientError(
'Unexpected response Content-Type: {0}'.format(response_ct))
return response
# Tail of a response-checking routine. The enclosing ``def`` and the opening
# ``if`` lines are outside this excerpt, and indentation has been lost.
# NOTE(review): ``jobj``, ``response_ct``, ``content_type`` and ``cls`` are
# bound outside the visible lines.
try:
# Convert the decoded JSON into a messages.Error and raise it to the caller.
raise messages.Error.from_json(jobj)
except jose.DeserializationError as error:
# Couldn't deserialize JSON object
raise errors.ClientError((response, error))
else:
# response is not JSON object
raise errors.ClientError(response)
else:
# Presumably the branch for a successful response -- the matching ``if`` is not visible here.
# A decodable body with an unexpected Content-Type is only logged, not rejected.
if jobj is not None and response_ct != cls.JSON_CONTENT_TYPE:
logger.debug(
'Ignoring wrong Content-Type (%r) for JSON decodable '
'response', response_ct)
# JSON was expected by the caller but the body did not decode: fail.
if content_type == cls.JSON_CONTENT_TYPE and jobj is None:
raise errors.ClientError(
'Unexpected response Content-Type: {0}'.format(response_ct))
return response
def check_cert(self, certr):
    """Check for new cert.

    Fetches the certificate at ``certr.uri`` and returns ``certr``
    updated with it, provided the server reports the same location.

    :param certr: Certificate Resource
    :type certr: `.CertificateResource`

    :returns: Updated Certificate Resource.
    :rtype: `.CertificateResource`

    :raises errors.ClientError: if the response carries no ``Location``
        header.
    :raises errors.UnexpectedUpdate: if the ``Location`` header differs
        from ``certr.uri``.
    """
    # TODO: acme-spec 5.1 table action should be renamed to
    # "refresh cert", and this method integrated with self.refresh
    response, cert = self._get_cert(certr.uri)

    headers = response.headers
    if 'Location' not in headers:
        raise errors.ClientError('Location header missing')

    location = headers['Location']
    if location != certr.uri:
        raise errors.UnexpectedUpdate(response.text)

    return certr.update(body=cert)
# Fragment of a certificate-revocation routine. The enclosing ``def`` line is
# not part of this excerpt and the original indentation has been lost.
# Chooses how the ACME client is built: from the key loaded via
# ``config.key_path`` when that is set, otherwise from the account key.
# NOTE(review): index 0 of ``config.cert_path`` / ``config.key_path`` is what
# gets logged and verified while index 1 is what gets loaded -- presumably
# (path, contents) pairs; confirm against the config definition.
if config.key_path is not None: # revocation by cert key
logger.debug("Revoking %s using cert key %s",
config.cert_path[0], config.key_path[0])
# Presumably raises when the certificate and key do not belong together -- TODO confirm.
crypto_util.verify_cert_matches_priv_key(config.cert_path[0], config.key_path[0])
key = jose.JWK.load(config.key_path[1])
acme = client.acme_from_config_key(config, key)
else: # revocation by account key
logger.debug("Revoking %s using Account Key", config.cert_path[0])
# Only the first element of the returned pair (the account) is used.
acc, _ = _determine_account(config)
acme = client.acme_from_config_key(config, acc.key, acc.regr)
# Only the first element of the loader's result is kept as the certificate.
cert = crypto_util.pyopenssl_load_certificate(config.cert_path[1])[0]
logger.debug("Reason code for revocation: %s", config.reason)
try:
acme.revoke(jose.ComparableX509(cert), config.reason)
# Reached only when revoke() did not raise.
_delete_if_appropriate(config)
except acme_errors.ClientError as e:
# A client error is handed back to the caller as text instead of propagating.
return str(e)
# Success path: report the revocation and return None.
display_ops.success_revocation(config.cert_path[0])
return None