Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Each element's link must point within the cert lineage's
# directory within the official archive directory
if not os.path.samefile(os.path.dirname(target), self.archive_dir):
logger.debug("Element's link does not point within the "
"cert lineage's directory within the "
"official archive directory. Link: %s, "
"target directory: %s, "
"archive directory: %s. If you've specified "
"the archive directory in the renewal configuration "
"file, you may need to update links by running "
"certbot update_symlinks.",
link, os.path.dirname(target), self.archive_dir)
return False
# The link must point to a file that exists
if not os.path.exists(target):
logger.debug("Link %s points to file %s that does not exist.",
link, target)
return False
# The link must point to a file that follows the archive
# naming convention
pattern = re.compile(r"^{0}([0-9]+)\.pem$".format(kind))
if not pattern.match(os.path.basename(target)):
logger.debug("%s does not follow the archive naming "
"convention.", target)
return False
# It is NOT required that the link's target be a regular
# file (it may itself be a symlink). But we should probably
# do a recursive check that ultimately the target does
# exist?
of :func:`~add_to_checkpoint` and :func:`~register_file_creation`
:param str title: Title describing checkpoint
:raises certbot.errors.ReverterError: when the
checkpoint is not able to be finalized.
"""
# Check to make sure an "in progress" directory exists
if not os.path.isdir(self.config.in_progress_dir):
return
changes_since_path = os.path.join(self.config.in_progress_dir, "CHANGES_SINCE")
changes_since_tmp_path = os.path.join(self.config.in_progress_dir, "CHANGES_SINCE.tmp")
if not os.path.exists(changes_since_path):
logger.info("Rollback checkpoint is empty (no changes made?)")
with open(changes_since_path, 'w') as f:
f.write("No changes\n")
# Add title to self.config.in_progress_dir CHANGES_SINCE
try:
with open(changes_since_tmp_path, "w") as changes_tmp:
changes_tmp.write("-- %s --\n" % title)
with open(changes_since_path, "r") as changes_orig:
changes_tmp.write(changes_orig.read())
# Move self.config.in_progress_dir to Backups directory
shutil.move(changes_since_tmp_path, changes_since_path)
except (IOError, OSError):
logger.error("Unable to finalize checkpoint - adding title")
logger.debug("Exception was:\n%s", traceback.format_exc())
def _get_ssl_vhost_path(self, non_ssl_vh_fp):
""" Get a file path for SSL vhost, uses user defined path as priority,
but if the value is invalid or not defined, will fall back to non-ssl
vhost filepath.
:param str non_ssl_vh_fp: Filepath of non-SSL vhost
:returns: Filepath for SSL vhost
:rtype: str
"""
if self.conf("vhost-root") and os.path.exists(self.conf("vhost-root")):
fp = os.path.join(filesystem.realpath(self.option("vhost_root")),
os.path.basename(non_ssl_vh_fp))
else:
# Use non-ssl filepath
fp = filesystem.realpath(non_ssl_vh_fp)
if fp.endswith(".conf"):
return fp[:-(len(".conf"))] + self.option("le_vhost_ext")
return fp + self.option("le_vhost_ext")
def _copy_create_ssl_vhost_skeleton(self, vhost, ssl_fp):
"""Copies over existing Vhost with IfModule mod_ssl.c> skeleton.
:param obj.VirtualHost vhost: Original VirtualHost object
:param str ssl_fp: Full path where the new ssl_vhost will reside.
A new file is created on the filesystem.
"""
# First register the creation so that it is properly removed if
# configuration is rolled back
if os.path.exists(ssl_fp):
notes = "Appended new VirtualHost directive to file %s" % ssl_fp
files = set()
files.add(ssl_fp)
self.reverter.add_to_checkpoint(files, notes)
else:
self.reverter.register_file_creation(False, ssl_fp)
sift = False
try:
orig_contents = self._get_vhost_block(vhost)
ssl_vh_contents, sift = self._sift_rewrite_rules(orig_contents)
with open(ssl_fp, "a") as new_file:
new_file.write("\n")
new_file.write("\n".join(ssl_vh_contents))
# The content does not include the closing tag, so add it
def renewal_file_for_certname(config, certname):
"""Return /path/to/certname.conf in the renewal conf directory"""
path = os.path.join(config.renewal_configs_dir, "{0}.conf".format(certname))
if not os.path.exists(path):
raise errors.CertStorageError("No certificate found with name {0} (expected "
"{1}).".format(certname, path))
return path
def validate_file(filename):
"""Ensure that the specified file exists."""
if not os.path.exists(filename):
raise errors.PluginError('File not found: {0}'.format(filename))
if os.path.isdir(filename):
raise errors.PluginError('Path is a directory: {0}'.format(filename))
def _get_ssl_vhost_path(self, non_ssl_vh_fp):
""" Get a file path for SSL vhost, uses user defined path as priority,
but if the value is invalid or not defined, will fall back to non-ssl
vhost filepath.
:param str non_ssl_vh_fp: Filepath of non-SSL vhost
:returns: Filepath for SSL vhost
:rtype: str
"""
if self.conf("vhost-root") and os.path.exists(self.conf("vhost-root")):
fp = os.path.join(filesystem.realpath(self.option("vhost_root")),
os.path.basename(non_ssl_vh_fp))
else:
# Use non-ssl filepath
fp = filesystem.realpath(non_ssl_vh_fp)
if fp.endswith(".conf"):
return fp[:-(len(".conf"))] + self.option("le_vhost_ext")
return fp + self.option("le_vhost_ext")
def rename_renewal_config(prev_name, new_name, cli_config):
"""Renames cli_config.certname's config to cli_config.new_certname.
:param .NamespaceConfig cli_config: parsed command line
arguments
"""
prev_filename = renewal_filename_for_lineagename(cli_config, prev_name)
new_filename = renewal_filename_for_lineagename(cli_config, new_name)
if os.path.exists(new_filename):
raise errors.ConfigurationError("The new certificate name "
"is already in use.")
try:
filesystem.replace(prev_filename, new_filename)
except OSError:
raise errors.ConfigurationError("Please specify a valid filename "
"for the new certificate name.")
"""Fixes symlinks in the event of an incomplete version update.
If there is no problem with the current symlinks, this function
has no effect.
"""
previous_symlinks = self._previous_symlinks()
if all(os.path.exists(link[1]) for link in previous_symlinks):
for kind, previous_link in previous_symlinks:
current_link = getattr(self, kind)
if os.path.lexists(current_link):
os.unlink(current_link)
os.symlink(os.readlink(previous_link), current_link)
for _, link in previous_symlinks:
if os.path.exists(link):
os.unlink(link)