Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class Registration(ResourceBody):
"""Registration Resource Body.
:ivar josepy.jwk.JWK key: Public key.
:ivar tuple contact: Contact information following ACME spec,
`tuple` of `unicode`.
:ivar unicode agreement:
"""
# on new-reg key server ignores 'key' and populates it based on
# JWS.signature.combined.jwk
key = jose.Field('key', omitempty=True, decoder=jose.JWK.from_json)
contact = jose.Field('contact', omitempty=True, default=())
agreement = jose.Field('agreement', omitempty=True)
status = jose.Field('status', omitempty=True)
terms_of_service_agreed = jose.Field('termsOfServiceAgreed', omitempty=True)
only_return_existing = jose.Field('onlyReturnExisting', omitempty=True)
external_account_binding = jose.Field('externalAccountBinding', omitempty=True)
phone_prefix = 'tel:'
email_prefix = 'mailto:'
@classmethod
def from_data(cls, phone=None, email=None, external_account_binding=None, **kwargs):
"""Create registration resource from contact details."""
details = list(kwargs.pop('contact', ()))
if phone is not None:
details.append(cls.phone_prefix + phone)
if email is not None:
details.extend([cls.email_prefix + mail for mail in email.split(',')])
kwargs['contact'] = tuple(details)
return tuple(Identifier.from_json(identifier) for identifier in value)
class OrderResource(ResourceWithURI):
    """Resource wrapper around an ACME order.

    :ivar acme.messages.Order body: The wrapped order; decoded with
        ``Order.from_json``.
    :ivar str csr_pem: The CSR this Order will be finalized with.
    :ivar authorizations: Fully-fetched AuthorizationResource objects.
    :vartype authorizations: `list` of `acme.messages.AuthorizationResource`
    :ivar str fullchain_pem: The fetched contents of the certificate URL
        produced once the order was finalized, if it's present.

    """
    # NOTE: declaration order is kept as-is; field collection may depend on it.
    body = jose.Field(
        'body',
        decoder=Order.from_json,
    )
    csr_pem = jose.Field(
        'csr_pem',
        omitempty=True,
    )
    authorizations = jose.Field(
        'authorizations',
    )
    fullchain_pem = jose.Field(
        'fullchain_pem',
        omitempty=True,
    )
@Directory.register
class NewOrder(Order):
    """New order.

    An :class:`Order` registered with :class:`Directory`; its resource
    type is ``new-order``.

    """
    resource_type = 'new-order'
class IdentifierType(_Constant):
    """ACME identifier type."""
    # Own registry for this constant family (kept separate from the one
    # on the base class).
    POSSIBLE_NAMES = {}  # type: dict


# The 'dns' identifier type; called IdentifierDNS in Boulder.
IDENTIFIER_FQDN = IdentifierType('dns')
class Identifier(jose.JSONObjectWithFields):
    """ACME identifier.

    :ivar IdentifierType typ: Kind of identifier; serialized under the
        JSON key ``type`` and decoded with ``IdentifierType.from_json``.
    :ivar unicode value: The identifier itself, serialized as ``value``.

    """
    typ = jose.Field(
        'type',
        decoder=IdentifierType.from_json,
    )
    value = jose.Field(
        'value',
    )
class Directory(jose.JSONDeSerializable):
"""Directory."""
_REGISTERED_TYPES = {} # type: dict
class Meta(jose.JSONObjectWithFields):
"""Directory Meta."""
_terms_of_service = jose.Field('terms-of-service', omitempty=True)
_terms_of_service_v2 = jose.Field('termsOfService', omitempty=True)
website = jose.Field('website', omitempty=True)
caa_identities = jose.Field('caaIdentities', omitempty=True)
external_account_required = jose.Field('externalAccountRequired', omitempty=True)
def nonce(value):  # pylint: disable=missing-docstring,no-self-argument
    # Decode the base64 (JOSE flavour) nonce; a decoding failure is
    # re-raised with a message that makes clear the nonce was at fault.
    try:
        decoded = jose.decode_b64jose(value)
    except jose.DeserializationError as error:
        # TODO: custom error
        raise jose.DeserializationError("Invalid nonce: {0}".format(error))
    return decoded
class Signature(jose.Signature):
    """ACME-specific Signature.

    Uses the ACME-specific :class:`Header` so that the custom header
    fields are available on the ``header`` field.

    """
    __slots__ = jose.Signature._orig_slots  # pylint: disable=no-member

    # TODO: decoder/encoder should accept cls? Otherwise, subclassing
    # JSONObjectWithFields is tricky...
    header_cls = Header
    header = jose.Field(
        'header',
        omitempty=True,
        default=header_cls(),
        decoder=header_cls.from_json,
    )

    # TODO: decoder should check that nonce is in the protected header
class JWS(jose.JWS):
"""ACME-specific JWS. Includes nonce, url, and kid in protected header."""
signature_cls = Signature
__slots__ = jose.JWS._orig_slots # pylint: disable=no-member
@classmethod
# pylint: disable=arguments-differ
def sign(cls, payload, key, alg, nonce, url=None, kid=None):
# Per ACME spec, jwk and kid are mutually exclusive, so only include a
# jwk field if kid is not provided.
return (ERROR_PREFIX in err.typ) or (OLD_ERROR_PREFIX in err.typ)
return False
@six.python_2_unicode_compatible
class Error(jose.JSONObjectWithFields, errors.Error):
    """ACME error.

    https://tools.ietf.org/html/draft-ietf-appsawg-http-problem-00

    :ivar unicode typ: Serialized as ``type``; defaults to ``about:blank``.
    :ivar unicode title:
    :ivar unicode detail:

    """
    typ = jose.Field(
        'type',
        omitempty=True,
        default='about:blank',
    )
    title = jose.Field('title', omitempty=True)
    detail = jose.Field('detail', omitempty=True)

    @classmethod
    def with_code(cls, code, **kwargs):
        """Create an Error instance with an ACME Error code.

        :param unicode code: An ACME error code, like 'dnssec'.
        :param kwargs: Keyword arguments passed through to ``Error``.

        :returns: An instance whose ``typ`` is ``ERROR_PREFIX + code``.
        :raises ValueError: If ``code`` is not one of ``ERROR_CODES``.

        """
        if code in ERROR_CODES:
            return cls(typ=ERROR_PREFIX + code, **kwargs)
        raise ValueError("The supplied code: %s is not a known ACME error"
                         " code" % code)
"""Order Resource Body.
:ivar list of .Identifier: List of identifiers for the certificate.
:ivar acme.messages.Status status:
:ivar list of str authorizations: URLs of authorizations.
:ivar str certificate: URL to download certificate as a fullchain PEM.
:ivar str finalize: URL to POST to to request issuance once all
authorizations have "valid" status.
:ivar datetime.datetime expires: When the order expires.
:ivar .Error error: Any error that occurred during finalization, if applicable.
"""
identifiers = jose.Field('identifiers', omitempty=True)
status = jose.Field('status', decoder=Status.from_json,
omitempty=True)
authorizations = jose.Field('authorizations', omitempty=True)
certificate = jose.Field('certificate', omitempty=True)
finalize = jose.Field('finalize', omitempty=True)
expires = fields.RFC3339Field('expires', omitempty=True)
error = jose.Field('error', omitempty=True, decoder=Error.from_json)
@identifiers.decoder
def identifiers(value):  # pylint: disable=missing-docstring,no-self-argument
    # Decode each JSON entry into an Identifier; the result is an
    # immutable tuple rather than a list.
    return tuple(map(Identifier.from_json, value))
class OrderResource(ResourceWithURI):
"""Order Resource.
:ivar acme.messages.Order body:
:ivar str csr_pem: The CSR this Order will be finalized with.
:ivar list of acme.messages.AuthorizationResource authorizations:
Fully-fetched AuthorizationResource objects.
:ivar str fullchain_pem: The fetched contents of the certificate URL
def load(self, account_id):
    """Return the account stored under ``account_id``.

    :param account_id: Key to look up in ``self.accounts``.

    :returns: The value stored in ``self.accounts`` for that key.
    :raises errors.AccountNotFound: If the lookup raises ``KeyError``.

    """
    try:
        account = self.accounts[account_id]
    except KeyError:
        raise errors.AccountNotFound(account_id)
    else:
        return account
class RegistrationResourceWithNewAuthzrURI(messages.RegistrationResource):
    """A backwards-compatible RegistrationResource with a new-authz URI.

    Hack: Certbot versions pre-0.11.1 expect to load
    ``new_authzr_uri`` as part of the account. Because people
    sometimes switch between old and new versions, this field
    continues to be written out for some time so that older
    clients don't crash in that scenario.

    """
    # Redeclared without ``omitempty``, unlike the ``new_authzr_uri`` field on
    # the RegistrationResource class declared in this file.
    new_authzr_uri = jose.Field(
        'new_authzr_uri',
    )
class AccountFileStorage(interfaces.AccountStorage):
"""Accounts file storage.
:ivar .IConfig config: Client configuration
"""
def __init__(self, config):
self.config = config
util.make_or_verify_dir(config.accounts_dir, 0o700, self.config.strict_permissions)
def _account_dir_path(self, account_id):
return self._account_dir_path_for_server_path(account_id, self.config.server_path)
def _account_dir_path_for_server_path(self, account_id, server_path):
accounts_dir = self.config.accounts_dir_for_server_path(server_path)
:ivar IdentifierType typ:
:ivar unicode value:
"""
typ = jose.Field('type', decoder=IdentifierType.from_json)
value = jose.Field('value')
class Directory(jose.JSONDeSerializable):
"""Directory."""
_REGISTERED_TYPES = {} # type: dict
class Meta(jose.JSONObjectWithFields):
"""Directory Meta."""
_terms_of_service = jose.Field('terms-of-service', omitempty=True)
_terms_of_service_v2 = jose.Field('termsOfService', omitempty=True)
website = jose.Field('website', omitempty=True)
caa_identities = jose.Field('caaIdentities', omitempty=True)
external_account_required = jose.Field('externalAccountRequired', omitempty=True)
def __init__(self, **kwargs):
kwargs = dict((self._internal_name(k), v) for k, v in kwargs.items())
super(Directory.Meta, self).__init__(**kwargs)
@property
def terms_of_service(self):
"""URL for the CA TOS"""
return self._terms_of_service or self._terms_of_service_v2
def __iter__(self):
# When iterating over fields, use the external name 'terms_of_service' instead of
"""Update registration."""
resource_type = 'reg'
resource = fields.Resource(resource_type)
class RegistrationResource(ResourceWithURI):
    """Registration Resource.

    :ivar acme.messages.Registration body: The registration; decoded
        with ``Registration.from_json``.
    :ivar unicode new_authzr_uri: Deprecated. Do not use.
    :ivar unicode terms_of_service: URL for the CA TOS.

    """
    body = jose.Field(
        'body',
        decoder=Registration.from_json,
    )
    new_authzr_uri = jose.Field(
        'new_authzr_uri',
        omitempty=True,
    )
    terms_of_service = jose.Field(
        'terms_of_service',
        omitempty=True,
    )
class ChallengeBody(ResourceBody):
"""Challenge Resource Body.
.. todo::
Confusingly, this has a similar name to `.challenges.Challenge`,
as well as `.achallenges.AnnotatedChallenge`. Please use names
such as ``challb`` to distinguish instances of this class from
``achall``.
:ivar acme.challenges.Challenge: Wrapped challenge.
Conveniently, all challenge fields are proxied, i.e. you can
call ``challb.x`` to get ``challb.chall.x`` contents.
:ivar acme.messages.Status status:
:ivar datetime.datetime validated:
"""ACME-specific JWS.
The JWS implementation in josepy only implements the base JOSE standard. In
order to support the new header fields defined in ACME, this module defines some
ACME-specific classes that layer on top of josepy.
"""
import josepy as jose
class Header(jose.Header):
    """ACME-specific JOSE Header.

    Adds the ``nonce``, ``kid`` and ``url`` fields on top of the base
    JOSE header.

    """
    nonce = jose.Field(
        'nonce',
        omitempty=True,
        encoder=jose.encode_b64jose,
    )
    kid = jose.Field('kid', omitempty=True)
    url = jose.Field('url', omitempty=True)

    @nonce.decoder
    def nonce(value):  # pylint: disable=missing-docstring,no-self-argument
        # Counterpart of the encoder above: decode the base64 (JOSE
        # flavour) value, re-raising failures with a nonce-specific message.
        try:
            decoded = jose.decode_b64jose(value)
        except jose.DeserializationError as error:
            # TODO: custom error
            raise jose.DeserializationError("Invalid nonce: {0}".format(error))
        return decoded
class Signature(jose.Signature):
"""ACME-specific Signature. Uses ACME-specific Header for customer fields."""
__slots__ = jose.Signature._orig_slots # pylint: disable=no-member