Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
:ivar JWK jwk: JSON Web Key
:ivar tuple cert_fingerprints: `tuple` of `unicode`
:ivar tuple certs: Sequence of :class:`acme.jose.ComparableX509`
certificates.
:ivar tuple subject_key_identifiers: `tuple` of `unicode`
:ivar tuple issuers: `tuple` of `unicode`
:ivar tuple authorized_for: `tuple` of `unicode`
"""
jwk = jose.Field("jwk", decoder=jose.JWK.from_json)
cert_fingerprints = jose.Field(
"certFingerprints", omitempty=True, default=())
certs = jose.Field("certs", omitempty=True, default=())
subject_key_identifiers = jose.Field(
"subjectKeyIdentifiers", omitempty=True, default=())
serial_numbers = jose.Field("serialNumbers", omitempty=True, default=())
issuers = jose.Field("issuers", omitempty=True, default=())
authorized_for = jose.Field("authorizedFor", omitempty=True, default=())
@certs.encoder
def certs(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(jose.encode_cert(cert) for cert in value)
@certs.decoder
def certs(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(jose.decode_cert(cert) for cert in value)
alg = jose.Field("alg", decoder=jose.JWASignature.from_json)
nonce = jose.Field(
"nonce", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE))
hints = jose.Field("hints", decoder=Hints.from_json)
hints = jose.Field("hints", decoder=Hints.from_json)
@ChallengeResponse.register
class ProofOfPossessionResponse(ChallengeResponse):
"""ACME "proofOfPossession" challenge response.
:ivar bytes nonce: Random data, **not** base64-encoded.
:ivar acme.other.Signature signature: Sugnature of this message.
"""
typ = "proofOfPossession"
NONCE_SIZE = ProofOfPossession.NONCE_SIZE
nonce = jose.Field(
"nonce", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE))
signature = jose.Field("signature", decoder=other.Signature.from_json)
def verify(self):
"""Verify the challenge."""
# self.signature is not Field | pylint: disable=no-member
return self.signature.verify(self.nonce)
@Challenge.register # pylint: disable=too-many-ancestors
class DNS(_TokenDVChallenge):
"""ACME "dns" challenge."""
typ = "dns"
LABEL = "_acme-challenge"
@classmethod
def from_json(cls, jobj):
return cls(jobj)
class _TokenDVChallenge(DVChallenge):
"""DV Challenge with token.
:ivar bytes token:
"""
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
"""Minimum size of the :attr:`token` in bytes."""
# TODO: acme-spec doesn't specify token as base64-encoded value
token = jose.Field(
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
# XXX: rename to ~token_good_for_url
@property
def good_token(self): # XXX: @token.decoder
"""Is `token` good?
.. todo:: acme-spec wants "It MUST NOT contain any non-ASCII
characters", but it should also warrant that it doesn't
contain ".." or "/"...
"""
# TODO: check that path combined with uri does not go above
# URI_ROOT_PATH!
return b'..' not in self.token and b'/' not in self.token
@Directory.register
class NewAuthorization(Authorization):
"""New authorization."""
resource_type = 'new-authz'
resource = fields.Resource(resource_type)
class AuthorizationResource(ResourceWithURI):
"""Authorization Resource.
:ivar acme.messages.Authorization body:
:ivar unicode new_cert_uri: URI found in the 'next' ``Link`` header
"""
body = jose.Field('body', decoder=Authorization.from_json)
new_cert_uri = jose.Field('new_cert_uri')
@Directory.register
class CertificateRequest(jose.JSONObjectWithFields):
"""ACME new-cert request.
:ivar acme.jose.util.ComparableX509 csr:
`OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
"""
resource_type = 'new-cert'
resource = fields.Resource(resource_type)
csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
class CertificateResource(ResourceWithURI):
return self.response(account_key).gen_cert(key=kwargs.get('cert_key'))
@Challenge.register
class RecoveryContact(ContinuityChallenge):
"""ACME "recoveryContact" challenge.
:ivar unicode activation_url:
:ivar unicode success_url:
:ivar unicode contact:
"""
typ = "recoveryContact"
activation_url = jose.Field("activationURL", omitempty=True)
success_url = jose.Field("successURL", omitempty=True)
contact = jose.Field("contact", omitempty=True)
@ChallengeResponse.register
class RecoveryContactResponse(ChallengeResponse):
"""ACME "recoveryContact" challenge response.
:ivar unicode token:
"""
typ = "recoveryContact"
token = jose.Field("token", omitempty=True)
@Challenge.register
class ProofOfPossession(ContinuityChallenge):
@Directory.register
class NewAuthorization(Authorization):
"""New authorization."""
resource_type = 'new-authz'
resource = fields.Resource(resource_type)
class AuthorizationResource(ResourceWithURI):
"""Authorization Resource.
:ivar acme.messages.Authorization body:
:ivar unicode new_cert_uri: URI found in the 'next' ``Link`` header
"""
body = jose.Field('body', decoder=Authorization.from_json)
new_cert_uri = jose.Field('new_cert_uri')
@Directory.register
class CertificateRequest(jose.JSONObjectWithFields):
"""ACME new-cert request.
:ivar acme.jose.util.ComparableX509 csr:
`OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
"""
resource_type = 'new-cert'
resource = fields.Resource(resource_type)
csr = jose.Field('csr', decoder=jose.decode_csr, encoder=jose.encode_csr)
# pylint: disable=function-redefined
return self.body.uri # pylint: disable=no-member
class Authorization(ResourceBody):
"""Authorization Resource Body.
:ivar acme.messages.Identifier identifier:
:ivar list challenges: `list` of `.ChallengeBody`
:ivar tuple combinations: Challenge combinations (`tuple` of `tuple`
of `int`, as opposed to `list` of `list` from the spec).
:ivar acme.messages.Status status:
:ivar datetime.datetime expires:
"""
identifier = jose.Field('identifier', decoder=Identifier.from_json)
challenges = jose.Field('challenges', omitempty=True)
combinations = jose.Field('combinations', omitempty=True)
status = jose.Field('status', omitempty=True, decoder=Status.from_json)
# TODO: 'expires' is allowed for Authorization Resources in
# general, but for Key Authorization '[t]he "expires" field MUST
# be absent'... then acme-spec gives example with 'expires'
# present... That's confusing!
expires = fields.RFC3339Field('expires', omitempty=True)
@challenges.decoder
def challenges(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(ChallengeBody.from_json(chall) for chall in value)
@property
def resolved_combinations(self):
class Signature(jose.JSONObjectWithFields):
"""ACME signature.
:ivar .JWASignature alg: Signature algorithm.
:ivar bytes sig: Signature.
:ivar bytes nonce: Nonce.
:ivar .JWK jwk: JWK.
"""
NONCE_SIZE = 16
"""Minimum size of nonce in bytes."""
alg = jose.Field('alg', decoder=jose.JWASignature.from_json)
sig = jose.Field('sig', encoder=jose.encode_b64jose,
decoder=jose.decode_b64jose)
nonce = jose.Field(
'nonce', encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=NONCE_SIZE, minimum=True))
jwk = jose.Field('jwk', decoder=jose.JWK.from_json)
@classmethod
def from_msg(cls, msg, key, nonce=None, nonce_size=None, alg=jose.RS256):
"""Create signature with nonce prepended to the message.
:param bytes msg: Message to be signed.
:param key: Key used for signing.
:type key: `cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey`
(optionally wrapped in `.ComparableRSAKey`).
``achall``.
:ivar acme.challenges.Challenge: Wrapped challenge.
Conveniently, all challenge fields are proxied, i.e. you can
call ``challb.x`` to get ``challb.chall.x`` contents.
:ivar acme.messages.Status status:
:ivar datetime.datetime validated:
:ivar messages.Error error:
"""
__slots__ = ('chall',)
uri = jose.Field('uri')
status = jose.Field('status', decoder=Status.from_json,
omitempty=True, default=STATUS_PENDING)
validated = fields.RFC3339Field('validated', omitempty=True)
error = jose.Field('error', decoder=Error.from_json,
omitempty=True, default=None)
def to_partial_json(self):
jobj = super(ChallengeBody, self).to_partial_json()
jobj.update(self.chall.to_partial_json())
return jobj
@classmethod
def fields_from_json(cls, jobj):
jobj_fields = super(ChallengeBody, cls).fields_from_json(jobj)
jobj_fields['chall'] = challenges.Challenge.from_json(jobj)
return jobj_fields
def __getattr__(self, name):
return getattr(self.chall, name)