Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _check_symlinks(self):
"""Raises an exception if a symlink doesn't exist"""
for kind in ALL_FOUR:
link = getattr(self, kind)
if not os.path.islink(link):
raise errors.CertStorageError(
"expected {0} to be a symlink".format(link))
target = get_link_target(link)
if not os.path.exists(target):
raise errors.CertStorageError("target {0} of symlink {1} does "
"not exist".format(target, link))
"""The filename that corresponds to the specified version and kind.
.. warning:: The specified version may not exist in this
lineage. There is no guarantee that the file path returned
by this method actually exists.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:param int version: the desired version
:returns: The path to the specified version of the specified member.
:rtype: str
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
where = os.path.dirname(self.current_target(kind))
return os.path.join(where, "{0}{1}.pem".format(kind, version))
def lineage_for_certname(cli_config, certname):
"""Find a lineage object with name certname."""
configs_dir = cli_config.renewal_configs_dir
# Verify the directory is there
util.make_or_verify_dir(configs_dir, mode=0o755)
try:
renewal_file = storage.renewal_file_for_certname(cli_config, certname)
except errors.CertStorageError:
return None
try:
return storage.RenewableCert(renewal_file, cli_config)
except (errors.CertStorageError, IOError):
logger.debug("Renewal conf file %s is broken.", renewal_file)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
def current_version(self, kind):
"""Returns numerical version of the specified item.
For example, if kind is "chain" and the current chain link
points to a file named "chain7.pem", returns the integer 7.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: the current version of the specified member.
:rtype: int
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
target = self.current_target(kind)
if target is None or not os.path.exists(target):
logger.debug("Current-version target for %s "
"does not exist at %s.", kind, target)
target = ""
matches = pattern.match(os.path.basename(target))
if matches:
return int(matches.groups()[0])
logger.debug("No matches for target %s.", kind)
return None
def current_version(self, kind):
"""Returns numerical version of the specified item.
For example, if kind is "chain" and the current chain link
points to a file named "chain7.pem", returns the integer 7.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: the current version of the specified member.
:rtype: int
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
target = self.current_target(kind)
if target is None or not os.path.exists(target):
logger.debug("Current-version target for %s "
"does not exist at %s.", kind, target)
target = ""
matches = pattern.match(os.path.basename(target))
if matches:
return int(matches.groups()[0])
else:
logger.debug("No matches for target %s.", kind)
return None
"""What are the subject names of this certificate?
(If no version is specified, use the current version.)
:param int version: the desired version number
:returns: the subject names
:rtype: `list` of `str`
:raises .CertStorageError: if could not find cert file.
"""
if version is None:
target = self.current_target("cert")
else:
target = self.version("cert", version)
if target is None:
raise errors.CertStorageError("could not find cert file")
with open(target) as f:
return crypto_util.get_names_from_cert(f.read())
def renewal_file_for_certname(config, certname):
"""Return /path/to/certname.conf in the renewal conf directory"""
path = os.path.join(config.renewal_configs_dir, "{0}.conf".format(certname))
if not os.path.exists(path):
raise errors.CertStorageError("No certificate found with name {0} (expected "
"{1}).".format(certname, path))
return path
def _update_link_to(self, kind, version):
"""Make the specified item point at the specified version.
(Note that this method doesn't verify that the specified version
exists.)
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:param int version: the desired version
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
link = getattr(self, kind)
filename = "{0}{1}.pem".format(kind, version)
# Relative rather than absolute target directory
target_directory = os.path.dirname(os.readlink(link))
# TODO: it could be safer to make the link first under a temporary
# filename, then unlink the old link, then rename the new link
# to the old link; this ensures that this process is able to
# create symlinks.
# TODO: we might also want to check consistency of related links
# for the other corresponding items
os.unlink(link)
os.symlink(os.path.join(target_directory, filename), link)
# self.configuration should be used to read parameters that
# may have been chosen based on default values from the
# systemwide renewal configuration; self.configfile should be
# used to make and save changes.
try:
self.configfile = configobj.ConfigObj(config_filename)
except configobj.ConfigObjError:
raise errors.CertStorageError(
"error parsing {0}".format(config_filename))
# TODO: Do we actually use anything from defaults and do we want to
# read further defaults from the systemwide renewal configuration
# file at this stage?
self.configuration = config_with_defaults(self.configfile)
if not all(x in self.configuration for x in ALL_FOUR):
raise errors.CertStorageError(
"renewal config file {0} is missing a required "
"file reference".format(self.configfile))
self.cert = self.configuration["cert"]
self.privkey = self.configuration["privkey"]
self.chain = self.configuration["chain"]
self.fullchain = self.configuration["fullchain"]
self.live_dir = os.path.dirname(self.cert)
self._fix_symlinks()
self._check_symlinks()
def lineagename_for_filename(config_filename):
"""Returns the lineagename for a configuration filename.
"""
if not config_filename.endswith(".conf"):
raise errors.CertStorageError(
"renewal config file name must end in .conf")
return os.path.basename(config_filename[:-len(".conf")])