Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def abs_path(self, path):
    """Resolve ``path`` against the root directory and normalize it.

    A relative path is joined onto ``self.root``; a path that is already
    absolute is not re-anchored. In both cases the result is passed
    through ``os.path.normpath``.

    :param str path: The path to resolve
    :returns: The normalized path (absolute as long as ``self.root`` is)
    :rtype: str

    """
    # ``self.root`` is only consulted when ``path`` is relative.
    anchored = path if os.path.isabs(path) else os.path.join(self.root, path)
    return os.path.normpath(anchored)
def acquire(self):
    # type: () -> None
    """Acquire the lock.

    Repeats until a file descriptor has been stored in ``self._fd``:
    the file at ``self._path`` is opened (created if missing, write-only,
    mode ``0o600``), a lock attempt is made on it, and the descriptor is
    kept only when ``self._lock_success`` confirms it. A descriptor that
    was not kept is closed before the next attempt, or before an exception
    raised by the lock attempt propagates.

    """
    while True:
        if self._fd is not None:
            # A descriptor is already held; nothing more to do.
            return

        # Open (or create) the lock file.
        candidate = filesystem.open(self._path, os.O_CREAT | os.O_WRONLY, 0o600)
        try:
            self._try_lock(candidate)
            if self._lock_success(candidate):
                self._fd = candidate
        finally:
            # Release the descriptor unless it became the held one.
            if self._fd is None:
                os.close(candidate)
:param client: Client object
:type client: client.Client
:returns: `cert_path` and `fullchain_path` as absolute paths to the actual files
:rtype: `tuple` of `str`
"""
csr, _ = config.actual_csr
cert, chain = le_client.obtain_certificate_from_csr(csr)
if config.dry_run:
logger.debug(
"Dry run: skipping saving certificate to %s", config.cert_path)
return None, None
cert_path, _, fullchain_path = le_client.save_certificate(
cert, chain, os.path.normpath(config.cert_path),
os.path.normpath(config.chain_path), os.path.normpath(config.fullchain_path))
return cert_path, fullchain_path
def determine_user_agent(config):
    """
    Work out the User-Agent string for the client.

    An explicitly configured ``config.user_agent`` is returned untouched.
    Otherwise the string is assembled from the Certbot version, the CLI
    command, OS information, the chosen authenticator and installer
    plugins, the verb, the flags reported by ``ua_flags`` and the Python
    version (the plugin choice wasn't knowable at construction time).
    ``config`` itself is not modified.

    :returns: the client's User-Agent string
    :rtype: `str`

    """
    # WARNING: To ensure changes are in line with Certbot's privacy
    # policy, talk to a core Certbot team member before making any
    # changes here.
    if config.user_agent is not None:
        return config.user_agent

    if os.environ.get("CERTBOT_DOCS") == "1":
        # Fixed placeholders instead of values taken from this machine
        # (presumably for documentation builds -- confirm with the caller).
        cli_command = "certbot(-auto)"
        os_info = "OS_NAME OS_VERSION"
        python_version = "major.minor.patchlevel"
    else:
        cli_command = cli.cli_command
        os_info = util.get_os_info_ua()
        python_version = platform.python_version()

    template = ("CertbotACMEClient/{0} ({1}; {2}{8}) Authenticator/{3} Installer/{4} "
                "({5}; flags: {6}) Py/{7}")
    return template.format(
        certbot.__version__,
        cli_command,
        os_info,
        config.authenticator,
        config.installer,
        config.verb,
        ua_flags(config),
        python_version,
        # Slot {8}: optional free-form comment, appended after the OS info.
        ("; " + config.user_agent_comment) if config.user_agent_comment else "",
    )
def _check_path_actions(self, filepath):
    """Decide what to do with a new augeas path.

    Looks up the directory part of ``filepath`` in ``self.parser_paths``
    and returns a pair of flags: whether the new path should be added,
    and whether the existing, narrower entries should be removed.

    * Directory not present in ``self.parser_paths``: add, nothing to remove.
    * Directory present and already matched by ``"*"``: do not add.
    * New file part is ``"*"`` (directory present): remove the old entries.

    :param str filepath: filepath to check the actions for
    :returns: ``(use_new, remove_old)``
    :rtype: `tuple` of `bool`

    """
    directory, pattern = os.path.split(filepath)
    try:
        known_patterns = self.parser_paths[directory]
        verdict = ("*" not in known_patterns, pattern == "*")
    except KeyError:
        # Nothing registered for this directory yet.
        verdict = (True, False)
    return verdict
def compare_file_modes(mode1, mode2):
    """Return true if the two modes can be considered as equals for this platform.

    Only the permission bits (``stat.S_IMODE``) of each mode take part in
    the comparison. When ``os.name`` is ``'nt'`` the check is narrowed
    further to the user read and write bits.

    :param int mode1: first file mode
    :param int mode2: second file mode
    :returns: whether the modes are equivalent on this platform
    :rtype: bool

    """
    perms1, perms2 = stat.S_IMODE(mode1), stat.S_IMODE(mode2)
    if os.name == 'nt':
        # Windows specific: most of mode bits are ignored on Windows.
        # Only check user R/W rights.
        user_rw = stat.S_IREAD | stat.S_IWRITE
        return perms1 & user_rw == perms2 & user_rw
    # Everywhere else: all permission bits must match.
    return perms1 == perms2
possible_next_link = True
while possible_next_link:
possible_next_link = False
if server_path in reused_servers:
next_server_path = reused_servers[server_path]
next_dir_path = link_func(next_server_path)
if os.path.islink(next_dir_path) and os.readlink(next_dir_path) == dir_path:
possible_next_link = True
server_path = next_server_path
dir_path = next_dir_path
# if there's not a next one up to delete, then delete me
# and whatever I link to
while os.path.islink(dir_path):
target = os.readlink(dir_path)
os.unlink(dir_path)
dir_path = target
return dir_path
"""
if config.dry_run:
_report_successful_dry_run(config)
return
assert cert_path and fullchain_path, "No certificates saved to report."
expiry = crypto_util.notAfter(cert_path).date()
reporter_util = zope.component.getUtility(interfaces.IReporter)
# Print the path to fullchain.pem because that's what modern webservers
# (Nginx and Apache2.4) will want.
verbswitch = ' with the "certonly" option' if config.verb == "run" else ""
privkey_statement = 'Your key file has been saved at:{br}{0}{br}'.format(
key_path, br=os.linesep) if key_path else ""
# XXX Perhaps one day we could detect the presence of known old webservers
# and say something more informative here.
msg = ('Congratulations! Your certificate and chain have been saved at:{br}'
'{0}{br}{1}'
'Your cert will expire on {2}. To obtain a new or tweaked version of this '
'certificate in the future, simply run {3} again{4}. '
'To non-interactively renew *all* of your certificates, run "{3} renew"'
.format(fullchain_path, privkey_statement, expiry, cli.cli_command, verbswitch,
br=os.linesep))
reporter_util.add_message(msg, reporter_util.MEDIUM_PRIORITY)
:raises errors.ConfigurationError: if cert name and domains mismatch
"""
if config.renew_with_new_domains:
return
added, removed = _get_added_removed(new_domains, old_domains)
msg = ("You are updating certificate {0} to include new domain(s): {1}{br}{br}"
"You are also removing previously included domain(s): {2}{br}{br}"
"Did you intend to make this change?".format(
certname,
_format_list("+", added),
_format_list("-", removed),
br=os.linesep))
obj = zope.component.getUtility(interfaces.IDisplay)
if not obj.yesno(msg, "Update cert", "Cancel", default=True):
raise errors.ConfigurationError("Specified mismatched cert name and domains.")
:param list prepared: List of `~.PluginEntryPoint`.
:param str question: Question to be presented to the user.
:returns: Plugin entry point chosen by the user.
:rtype: `~.PluginEntryPoint`
"""
opts = [plugin_ep.description_with_name +
(" [Misconfigured]" if plugin_ep.misconfigured else "")
for plugin_ep in prepared]
names = set(plugin_ep.name for plugin_ep in prepared)
while True:
disp = z_util(interfaces.IDisplay)
if "CERTBOT_AUTO" in os.environ and names == set(("apache", "nginx")):
# The possibility of being offered exactly apache and nginx here
# is new interactivity brought by https://github.com/certbot/certbot/issues/4079,
# so set apache as a default for those kinds of non-interactive use
# (the user will get a warning to set --non-interactive or --force-interactive)
apache_idx = [n for n, p in enumerate(prepared) if p.name == "apache"][0]
code, index = disp.menu(question, opts, default=apache_idx)
else:
code, index = disp.menu(question, opts, force_interactive=True)
if code == display_util.OK:
plugin_ep = prepared[index]
if plugin_ep.misconfigured:
z_util(interfaces.IDisplay).notification(
"The selected plugin encountered an error while parsing "
"your server configuration and cannot be used. The error "
"was:\n\n{0}".format(plugin_ep.prepare()), pause=False)