Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
avail_fp = nonssl_vhost.filep
ssl_fp = self._get_ssl_vhost_path(avail_fp)
self._copy_create_ssl_vhost_skeleton(avail_fp, ssl_fp)
# Reload augeas to take into account the new vhost
self.aug.load()
#TODO: add line to write vhost name
# Get Vhost augeas path for new vhost
vh_p = self.aug.match("/files%s//* [label()=~regexp('%s')]" %
(ssl_fp, parser.case_i("VirtualHost")))
if len(vh_p) != 1:
logger.error("Error: should only be one vhost in %s", avail_fp)
raise errors.PluginError("Currently, we only support "
"configurations with one vhost per file")
else:
# This simplifies the process
vh_p = vh_p[0]
# Update Addresses
self._update_ssl_vhosts_addrs(vh_p)
# Add directives
self._add_dummy_ssl_directives(vh_p)
self.save()
# Log actions and create save notes
logger.info("Created an SSL vhost at %s", ssl_fp)
self.save_notes += "Created ssl vhost at %s\n" % ssl_fp
self.save()
def test_enhancements(plugin, domains):
"""Tests supported enhancements returning True if successful"""
supported = plugin.supported_enhancements()
if "redirect" not in supported:
logger.error("The plugin and this program support no common "
"enhancements")
return False
for domain in domains:
try:
plugin.enhance(domain, "redirect")
except le_errors.PluginError as error:
# Don't immediately fail because a redirect may already be enabled
logger.warning("Plugin failed to enable redirect for %s:", domain)
logger.warning("%s", error)
except le_errors.Error as error:
logger.error("An error occurred while enabling redirect for %s:",
domain)
logger.exception(error)
if not _save_and_restart(plugin, "enhanced"):
return False
success = True
for domain in domains:
verify = functools.partial(validator.Validator().redirect, "localhost",
plugin.http_port, headers={"Host": domain})
if not _try_until_true(verify):
def test_rollback(plugin, config, backup):
    """Exercise the plugin's checkpoint rollback and check the result.

    Asks ``plugin`` to roll back 1337 checkpoints, then compares the
    ``config`` directory against ``backup`` via ``_dirs_are_unequal``.

    :returns: ``True`` when the rollback raised no ``le_errors.Error`` and
        the two directories are not reported as unequal, ``False`` otherwise.
    """
    try:
        plugin.rollback_checkpoints(1337)
    except le_errors.Error as error:
        logger.error("Plugin raised an exception during rollback:")
        logger.exception(error)
        return False

    # A single flag drives both the log message and the return value.
    rolled_back = not _dirs_are_unequal(config, backup)
    if rolled_back:
        logger.info("Rollback succeeded")
    else:
        logger.error("Rollback failed for config `%s`", config)
    return rolled_back
def create_le_config(parent_dir):
    """Create the LE directories under parent_dir and build the config.

    Starts from a deep copy of ``constants.CLI_DEFAULTS`` and points
    ``config_dir``, ``work_dir`` and ``logs_dir`` at subdirectories of
    ``<parent_dir>/letsencrypt``, creating each of them on disk.

    :returns: the resulting settings wrapped in an ``argparse.Namespace``
    """
    le_root = os.path.join(parent_dir, "letsencrypt")
    settings = copy.deepcopy(constants.CLI_DEFAULTS)

    # (config key, directory name below the letsencrypt root)
    layout = (
        ("config_dir", "config"),
        ("work_dir", "work"),
        ("logs_dir", "logs_dir"),
    )
    for key, leaf in layout:
        settings[key] = os.path.join(le_root, leaf)

    # makedirs also creates the letsencrypt root itself; the remaining two
    # directories are then direct children of an existing parent.
    os.makedirs(settings["config_dir"])
    os.mkdir(settings["work_dir"])
    os.mkdir(settings["logs_dir"])

    return argparse.Namespace(**settings)  # pylint: disable=star-args
def create_le_config(parent_dir):
    """Create the LE directories under parent_dir and build the config.

    Starts from a deep copy of ``constants.CLI_DEFAULTS``, turns
    ``strict_permissions`` off and points ``config_dir``, ``work_dir`` and
    ``logs_dir`` at subdirectories of ``<parent_dir>/letsencrypt``,
    creating each of them on disk.

    :returns: the resulting settings wrapped in an ``argparse.Namespace``
    """
    settings = copy.deepcopy(constants.CLI_DEFAULTS)
    settings["strict_permissions"] = False

    le_root = os.path.join(parent_dir, "letsencrypt")
    config_dir = os.path.join(le_root, "config")
    work_dir = os.path.join(le_root, "work")
    logs_dir = os.path.join(le_root, "logs_dir")

    settings["config_dir"] = config_dir
    settings["work_dir"] = work_dir
    settings["logs_dir"] = logs_dir

    # makedirs also creates the letsencrypt root itself; the remaining two
    # directories are then direct children of an existing parent.
    os.makedirs(config_dir)
    os.mkdir(work_dir)
    os.mkdir(logs_dir)

    return argparse.Namespace(**settings)  # pylint: disable=star-args
raise PluginError('User did not supply a DirectAdmin server url.')
parsed_url = urlsplit(self.conf('server'))
if self.conf('username') is not None:
username = self.conf('username')
elif parsed_url.username is not None:
username = parsed_url.username
else:
raise PluginError('User did not supply a DirectAdmin username')
if self.conf('login-key') is not None:
loginkey = self.conf('login-key')
elif parsed_url.password is not None:
loginkey = parsed_url.password
else:
raise PluginError('User did not supply a DirectAdmin login key')
self.da_api_client = directadmin.Api(
https=(False if parsed_url.scheme == 'http' else True),
hostname=(parsed_url.hostname if parsed_url.hostname else 'localhost'),
port=(parsed_url.port if parsed_url.port else 2222),
username=username,
password=loginkey)
:raises .PluginError: if unable to find Apache version
"""
try:
stdout, _ = le_util.run_script(
constants.os_constant("version_cmd"))
except errors.SubprocessError:
raise errors.PluginError(
"Unable to run %s -v" %
constants.os_constant("version_cmd"))
regex = re.compile(r"Apache/([0-9\.]*)", re.IGNORECASE)
matches = regex.findall(stdout)
if len(matches) != 1:
raise errors.PluginError("Unable to find Apache version")
return tuple([int(i) for i in matches[0].split(".")])
def prepare_da_client(self):
    """Build the DirectAdmin Web API client and store it on the instance.

    The server URL is read from the ``server`` option. The username and
    login key are taken from the ``username`` / ``login-key`` options when
    set, and otherwise from the credentials embedded in the server URL.
    The client is assigned to ``self.da_api_client``.

    :raises PluginError: if the server URL, the username or the login key
        was not supplied
    """
    server = self.conf('server')
    if server is None:
        # TODO: check if there is a local server at https://localhost:2222 (with non-ssl fallback?)
        raise PluginError('User did not supply a DirectAdmin server url.')
    url = urlsplit(server)

    # An explicit option wins over whatever is embedded in the URL.
    username = self.conf('username')
    if username is None:
        username = url.username
    if username is None:
        raise PluginError('User did not supply a DirectAdmin username')

    loginkey = self.conf('login-key')
    if loginkey is None:
        loginkey = url.password
    if loginkey is None:
        raise PluginError('User did not supply a DirectAdmin login key')

    self.da_api_client = directadmin.Api(
        # Every scheme except plain 'http' is treated as HTTPS.
        https=url.scheme != 'http',
        hostname=url.hostname or 'localhost',
        port=url.port or 2222,
        username=username,
        password=loginkey)
def get_version(self):
    """Return version of Apache Server.

    Runs the OS-specific ``version_cmd`` and extracts the version number
    that follows ``Apache/`` in its standard output.

    Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7))

    :returns: version
    :rtype: tuple

    :raises .PluginError: if the version command cannot be run, or if
        exactly one well-formed Apache version cannot be found in its
        output

    """
    version_cmd = constants.os_constant("version_cmd")
    try:
        stdout, _ = le_util.run_script(version_cmd)
    except errors.SubprocessError:
        raise errors.PluginError("Unable to run %s -v" % version_cmd)

    regex = re.compile(r"Apache/([0-9\.]*)", re.IGNORECASE)
    matches = regex.findall(stdout)

    if len(matches) != 1:
        raise errors.PluginError("Unable to find Apache version")

    try:
        return tuple(int(part) for part in matches[0].split("."))
    except ValueError:
        # The pattern also accepts an empty capture or stray dots (e.g.
        # "Apache/" or "Apache/2.4."), which leaves an empty component
        # that int() rejects. Report it as the documented PluginError
        # instead of leaking a ValueError to callers.
        raise errors.PluginError("Unable to find Apache version")
def config_test(self):  # pylint: disable=no-self-use
    """Run the Apache configuration test command.

    Executes the OS-specific ``conftest_cmd`` through
    ``le_util.run_script``.

    :raises .errors.MisconfigurationError: If running the command raises
        ``errors.SubprocessError``; the new error carries that error's
        message

    """
    try:
        conftest_cmd = constants.os_constant("conftest_cmd")
        le_util.run_script(conftest_cmd)
    except errors.SubprocessError as error:
        message = str(error)
        raise errors.MisconfigurationError(message)