Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def register(self, new_reg=None):
self._registered = True
if new_reg is None:
new_reg = messages.NewRegistration()
self.regr = messages.RegistrationResource(
body=messages.Registration(
contact=new_reg.contact,
agreement=new_reg.agreement))
return succeed(self.regr)
def _regr_from_response(cls, response, uri=None, new_authzr_uri=None,
terms_of_service=None):
terms_of_service = (
response.links['terms-of-service']['url']
if 'terms-of-service' in response.links else terms_of_service)
if new_authzr_uri is None:
try:
new_authzr_uri = response.links['next']['url']
except KeyError:
raise errors.ClientError('"next" link missing')
return messages.RegistrationResource(
body=messages.Registration.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_authzr_uri=new_authzr_uri,
terms_of_service=terms_of_service)
def _regr_from_response(cls, response, uri=None, new_authzr_uri=None,
terms_of_service=None):
if 'terms-of-service' in response.links:
terms_of_service = response.links['terms-of-service']['url']
if 'next' in response.links:
new_authzr_uri = response.links['next']['url']
if new_authzr_uri is None:
raise errors.ClientError('"next" link missing')
return messages.RegistrationResource(
body=messages.Registration.from_json(response.json()),
uri=response.headers.get('Location', uri),
new_authzr_uri=new_authzr_uri,
terms_of_service=terms_of_service)
account_dir_path = self._account_dir_path(account.id)
util.make_or_verify_dir(account_dir_path, 0o700, self.config.strict_permissions)
try:
with open(self._regr_path(account_dir_path), "w") as regr_file:
regr = account.regr
# If we have a value for new-authz, save it for forwards
# compatibility with older versions of Certbot. If we don't
# have a value for new-authz, this is an ACMEv2 directory where
# an older version of Certbot won't work anyway.
if hasattr(acme.directory, "new-authz"):
regr = RegistrationResourceWithNewAuthzrURI(
new_authzr_uri=acme.directory.new_authz,
body={},
uri=regr.uri)
else:
regr = messages.RegistrationResource(
body={},
uri=regr.uri)
regr_file.write(regr.json_dumps())
if not regr_only:
with util.safe_open(self._key_path(account_dir_path),
"w", chmod=0o400) as key_file:
key_file.write(account.key.json_dumps())
with open(self._metadata_path(
account_dir_path), "w") as metadata_file:
metadata_file.write(account.meta.json_dumps())
except IOError as error:
raise errors.AccountStorageError(error)
lambda body:
messages.RegistrationResource(
body=messages.Registration.from_json(body),
uri=self._maybe_location(response, uri=uri),
new_authzr_uri=new_authzr_uri,
terms_of_service=terms_of_service))
)
if args.email is None:
logger.warning('--email was not provided; ACME CA will have no '
'way of contacting you.')
new_reg = messages.NewRegistration.from_data(email=args.email)
if "terms_of_service" in client.directory.meta:
logger.info("By using simp_le, you implicitly agree "
"to the CA's terms of service: %s",
client.directory.meta.terms_of_service)
new_reg = new_reg.update(terms_of_service_agreed=True)
try:
client.new_account(new_reg)
except acme_errors.ConflictError as error:
logger.debug('Client already registered: %s', error.location)
existing_reg = messages.RegistrationResource(uri=error.location)
existing_reg = client.query_registration(existing_reg)
client.net.account = existing_reg
return client
def find_all(self):
return list(six.itervalues(self.accounts))
def save(self, account, client):
if account.id in self.accounts:
logger.debug("Overwriting account: %s", account.id)
self.accounts[account.id] = account
def load(self, account_id):
try:
return self.accounts[account_id]
except KeyError:
raise errors.AccountNotFound(account_id)
class RegistrationResourceWithNewAuthzrURI(messages.RegistrationResource):
"""A backwards-compatible RegistrationResource with a new-authz URI.
Hack: Certbot versions pre-0.11.1 expect to load
new_authzr_uri as part of the account. Because people
sometimes switch between old and new versions, we will
continue to write out this field for some time so older
clients don't crash in that scenario.
"""
new_authzr_uri = jose.Field('new_authzr_uri')
class AccountFileStorage(interfaces.AccountStorage):
"""Accounts file storage.
:ivar .IConfig config: Client configuration
"""
def _deactivate_account(self) -> None:
"""Deactivate account."""
if not self.path_registration_info.exists():
return
_LOGGER.info("Load exists ACME registration")
regr = messages.RegistrationResource.json_loads(
self.path_registration_info.read_text()
)
try:
self._acme_client.deactivate_registration(regr)
except errors.Error as err:
_LOGGER.error("Can't deactivate account: %s", err)
raise AcmeClientError()
self.path_registration_info.unlink()
self.path_account_key.unlink()