Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def current_version(self, kind):
"""Returns numerical version of the specified item.
For example, if kind is "chain" and the current chain link
points to a file named "chain7.pem", returns the integer 7.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: the current version of the specified member.
:rtype: int
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
target = self.current_target(kind)
if target is None or not os.path.exists(target):
logger.debug("Current-version target for %s "
"does not exist at %s.", kind, target)
target = ""
matches = pattern.match(os.path.basename(target))
if matches:
return int(matches.groups()[0])
else:
logger.debug("No matches for target %s.", kind)
return None
def current_target(self, kind):
"""Returns full path to which the specified item currently points.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:returns: The path to the current version of the specified
member.
:rtype: str or None
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
link = getattr(self, kind)
if not os.path.exists(link):
logger.debug("Expected symlink %s for %s does not exist.",
link, kind)
return None
target = os.readlink(link)
if not os.path.isabs(target):
target = os.path.join(os.path.dirname(link), target)
return os.path.abspath(target)
# self.configuration should be used to read parameters that
# may have been chosen based on default values from the
# systemwide renewal configuration; self.configfile should be
# used to make and save changes.
try:
self.configfile = configobj.ConfigObj(config_filename)
except configobj.ConfigObjError:
raise errors.CertStorageError(
"error parsing {0}".format(config_filename))
# TODO: Do we actually use anything from defaults and do we want to
# read further defaults from the systemwide renewal configuration
# file at this stage?
self.configuration = config_with_defaults(self.configfile)
if not all(x in self.configuration for x in ALL_FOUR):
raise errors.CertStorageError(
"renewal config file {0} is missing a required "
"file reference".format(self.configfile))
self.cert = self.configuration["cert"]
self.privkey = self.configuration["privkey"]
self.chain = self.configuration["chain"]
self.fullchain = self.configuration["fullchain"]
self._fix_symlinks()
"""What are the subject names of this certificate?
(If no version is specified, use the current version.)
:param int version: the desired version number
:returns: the subject names
:rtype: `list` of `str`
:raises .CertStorageError: if could not find cert file.
"""
if version is None:
target = self.current_target("cert")
else:
target = self.version("cert", version)
if target is None:
raise errors.CertStorageError("could not find cert file")
with open(target) as f:
return crypto_util.get_sans_from_cert(f.read())
"""The filename that corresponds to the specified version and kind.
.. warning:: The specified version may not exist in this
lineage. There is no guarantee that the file path returned
by this method actually exists.
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:param int version: the desired version
:returns: The path to the specified version of the specified member.
:rtype: str
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
where = os.path.dirname(self.current_target(kind))
return os.path.join(where, "{0}{1}.pem".format(kind, version))
# This attempts to read the renewer config file and augment or replace
# the renewer defaults with any options contained in that file. If
# renewer_config_file is undefined or if the file is nonexistent or
# empty, this .merge() will have no effect.
config.merge(configobj.ConfigObj(cli_config.renewer_config_file))
# Examine the configuration and find the new lineage's name
for i in (cli_config.renewal_configs_dir, cli_config.archive_dir,
cli_config.live_dir):
if not os.path.exists(i):
os.makedirs(i, 0700)
logger.debug("Creating directory %s.", i)
config_file, config_filename = le_util.unique_lineage_name(
cli_config.renewal_configs_dir, lineagename)
if not config_filename.endswith(".conf"):
raise errors.CertStorageError(
"renewal config file name must end in .conf")
# Determine where on disk everything will go
# lineagename will now potentially be modified based on which
# renewal configuration file could actually be created
lineagename = os.path.basename(config_filename)[:-len(".conf")]
archive = os.path.join(cli_config.archive_dir, lineagename)
live_dir = os.path.join(cli_config.live_dir, lineagename)
if os.path.exists(archive):
raise errors.CertStorageError(
"archive directory exists for " + lineagename)
if os.path.exists(live_dir):
raise errors.CertStorageError(
"live directory exists for " + lineagename)
os.mkdir(archive)
os.mkdir(live_dir)
le_util.make_or_verify_dir(cli_config.work_dir,
constants.CONFIG_DIRS_MODE, uid)
for renewal_file in os.listdir(cli_config.renewal_configs_dir):
print "Processing", renewal_file
try:
# TODO: Before trying to initialize the RenewableCert object,
# we could check here whether the combination of the config
# and the rc_config together disables all autorenewal and
# autodeployment applicable to this cert. In that case, we
# can simply continue and don't need to instantiate a
# RenewableCert object for this cert at all, which could
# dramatically improve performance for large deployments
# where autorenewal is widely turned off.
cert = storage.RenewableCert(renewal_file, cli_config)
except errors.CertStorageError:
# This indicates an invalid renewal configuration file, such
# as one missing a required parameter (in the future, perhaps
# also one that is internally inconsistent or is missing a
# required parameter). As a TODO, maybe we should warn the
# user about the existence of an invalid or corrupt renewal
# config rather than simply ignoring it.
continue
if cert.should_autorenew():
# Note: not cert.current_version() because the basis for
# the renewal is the latest version, even if it hasn't been
# deployed yet!
old_version = cert.latest_common_version()
renew(cert, old_version)
notify.notify("Autorenewed a cert!!!", "root", "It worked!")
# TODO: explain what happened
if cert.should_autodeploy():
def _update_link_to(self, kind, version):
"""Make the specified item point at the specified version.
(Note that this method doesn't verify that the specified version
exists.)
:param str kind: the lineage member item ("cert", "privkey",
"chain", or "fullchain")
:param int version: the desired version
"""
if kind not in ALL_FOUR:
raise errors.CertStorageError("unknown kind of item")
link = getattr(self, kind)
filename = "{0}{1}.pem".format(kind, version)
# Relative rather than absolute target directory
target_directory = os.path.dirname(os.readlink(link))
# TODO: it could be safer to make the link first under a temporary
# filename, then unlink the old link, then rename the new link
# to the old link; this ensures that this process is able to
# create symlinks.
# TODO: we might also want to check consistency of related links
# for the other corresponding items
os.unlink(link)
os.symlink(os.path.join(target_directory, filename), link)
cli_config.renewal_configs_dir, lineagename)
if not config_filename.endswith(".conf"):
raise errors.CertStorageError(
"renewal config file name must end in .conf")
# Determine where on disk everything will go
# lineagename will now potentially be modified based on which
# renewal configuration file could actually be created
lineagename = os.path.basename(config_filename)[:-len(".conf")]
archive = os.path.join(cli_config.archive_dir, lineagename)
live_dir = os.path.join(cli_config.live_dir, lineagename)
if os.path.exists(archive):
raise errors.CertStorageError(
"archive directory exists for " + lineagename)
if os.path.exists(live_dir):
raise errors.CertStorageError(
"live directory exists for " + lineagename)
os.mkdir(archive)
os.mkdir(live_dir)
logger.debug("Archive directory %s and live "
"directory %s created.", archive, live_dir)
relative_archive = os.path.join("..", "..", "archive", lineagename)
# Put the data into the appropriate files on disk
target = dict([(kind, os.path.join(live_dir, kind + ".pem"))
for kind in ALL_FOUR])
for kind in ALL_FOUR:
os.symlink(os.path.join(relative_archive, kind + "1.pem"),
target[kind])
with open(target["cert"], "w") as f:
logger.debug("Writing certificate to %s.", target["cert"])
f.write(cert)