Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""Perform the parent process side of the TCP listener task.
This should only be called by :meth:`start_listener`. We will
wait up to delay_amount seconds to hear from the child process
via a signal.
:param int port: Which TCP port to bind.
:param float delay_amount: How long in seconds to wait for the
subprocess to notify us whether it succeeded.
:returns: ``True`` or ``False`` according to whether we were notified
that the child process succeeded or failed in binding the port.
:rtype: bool
"""
display = zope.component.getUtility(interfaces.IDisplay)
start_time = time.time()
while time.time() < start_time + delay_amount:
if self.subproc_state == "ready":
return True
elif self.subproc_state == "inuse":
display.notification(
"Could not bind TCP port {0} because it is already in "
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(port))
return False
elif self.subproc_state == "cantbind":
display.notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
def choose_plugin(prepared, question):
"""Allow the user to choose their plugin.
:param list prepared: List of `~.PluginEntryPoint`.
:param str question: Question to be presented to the user.
:returns: Plugin entry point chosen by the user.
:rtype: `~.PluginEntryPoint`
"""
opts = [plugin_ep.description_with_name +
(" [Misconfigured]" if plugin_ep.misconfigured else "")
for plugin_ep in prepared]
while True:
code, index = util(interfaces.IDisplay).menu(
question, opts, help_label="More Info")
if code == display_util.OK:
plugin_ep = prepared[index]
if plugin_ep.misconfigured:
util(interfaces.IDisplay).notification(
"The selected plugin encountered an error while parsing "
"your server configuration and cannot be used. The error "
"was:\n\n{0}".format(plugin_ep.prepare()),
height=display_util.HEIGHT, pause=False)
else:
return plugin_ep
elif code == display_util.HELP:
if prepared[index].misconfigured:
msg = "Reported Error: %s" % prepared[index].prepare()
else:
def perform(self, achalls): # pylint: disable=missing-docstring
if any(util.already_listening(port) for port in self._necessary_ports):
raise errors.MisconfigurationError(
"At least one of the (possibly) required ports is "
"already taken.")
try:
return self.perform2(achalls)
except errors.StandaloneBindError as error:
display = zope.component.getUtility(interfaces.IDisplay)
if error.socket_error.errno == socket.errno.EACCES:
display.notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
"aren't running this program as "
"root).".format(error.port))
elif error.socket_error.errno == socket.errno.EADDRINUSE:
display.notification(
"Could not bind TCP port {0} because it is already in "
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(error.port))
else:
raise # XXX: How to handle unknown errors in binding?
def confirm_revocation(self, cert, version=None):
"""Confirm revocation screen.
:param storage.RenewableCert cert: Renewable certificate object
:returns: True if user would like to revoke, False otherwise
:rtype: bool
"""
if version is None:
info = self._more_info_lineage(cert)
return zope.component.getUtility(interfaces.IDisplay).yesno(
"Are you sure you would like to revoke all of valid certificates "
"in this lineage?{br}{info}".format(br=os.linesep, info=info))
else:
return zope.component.getUtility(interfaces.IDisplay).yesno(
"Are you sure you would like to revoke the following "
"certificate:{br}{info}{br}"
"This action cannot be reversed!".format(
br=os.linesep, info=self._more_info_cert(cert, version)))
This should only be called by :meth:`start_listener`. We will
wait up to delay_amount seconds to hear from the child process
via a signal.
:param int port: Which TCP port to bind.
:param float delay_amount: How long in seconds to wait for the
subprocess to notify us whether it succeeded.
:returns: ``True`` or ``False`` according to whether we were notified
that the child process succeeded or failed in binding the port.
:rtype: bool
"""
display = zope.component.getUtility(interfaces.IDisplay)
start_time = time.time()
while time.time() < start_time + delay_amount:
if self.subproc_state == "ready":
return True
elif self.subproc_state == "inuse":
display.notification(
"Could not bind TCP port {0} because it is already in "
"use by another process on this system (such as a web "
"server). Please stop the program in question and then "
"try again.".format(port))
return False
elif self.subproc_state == "cantbind":
display.notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
skip it
:param bool invalid: true if the user just typed something, but it wasn't
a valid-looking email
:returns: Email or ``None`` if cancelled by user.
:rtype: str
"""
msg = "Enter email address (used for urgent notices and lost key recovery)"
if invalid:
msg = "There seem to be problems with that address. " + msg
if more:
msg += ('\n\nIf you really want to skip this, you can run the client with '
'--register-unsafely-without-email but make sure you backup your '
'account key from /etc/letsencrypt/accounts\n\n')
code, email = zope.component.getUtility(interfaces.IDisplay).input(msg)
if code == display_util.OK:
if le_util.safe_email(email):
return email
else:
# TODO catch the server's ACME invalid email address error, and
# make a similar call when that happens
return get_email(more=True, invalid=(email != ""))
else:
return None
def more_info_cert(cert):
"""Displays more info about the cert.
:param dict cert: cert dict used throughout revoker.py
"""
util(interfaces.IDisplay).notification(
"Certificate Information:{0}{1}".format(
os.linesep, cert.pretty_print()),
height=display_util.HEIGHT)
:returns: tuple of the form (code, selection) where
code is a display exit code
selection is the user's int selection
:rtype: tuple
"""
list_choices = [
"%s | %s | %s" % (
str(cert.get_cn().ljust(display_util.WIDTH - 39)),
cert.get_not_before().strftime("%m-%d-%y"),
"Installed" if cert.installed and cert.installed != ["Unknown"]
else "") for cert in certs
]
code, tag = util(interfaces.IDisplay).menu(
"Which certificates would you like to revoke?",
list_choices, help_label="More Info", ok_label="Revoke",
cancel_label="Exit")
return code, tag
# Displayer
if args.text_mode:
displayer = display_util.FileDisplay(sys.stdout)
else:
displayer = display_util.NcursesDisplay()
zope.component.provideUtility(displayer)
# Reporter
report = reporter.Reporter()
zope.component.provideUtility(report)
atexit.register(report.atexit_print_messages)
# TODO: remove developer EULA prompt for the launch
if not config.eula:
eula = pkg_resources.resource_string("letsencrypt", "EULA")
if not zope.component.getUtility(interfaces.IDisplay).yesno(
eula, "Agree", "Cancel"):
raise errors.Error("Must agree to TOS")
if not os.geteuid() == 0:
logger.warning(
"Root (sudo) is required to run most of letsencrypt functionality.")
# check must be done after arg parsing as --help should work
# w/o root; on the other hand, e.g. "letsencrypt run
# --authenticator dns" or "letsencrypt plugins" does not
# require root as well
#return (
# "{0}Root is required to run letsencrypt. Please use sudo.{0}"
# .format(os.linesep))
return args.func(args, config, plugins)