Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_version(self):
"""Return version of Apache Server.
Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7))
:returns: version
:rtype: tuple
:raises .PluginError: if unable to find Apache version
"""
try:
stdout, _ = le_util.run_script(
constants.os_constant("version_cmd"))
except errors.SubprocessError:
raise errors.PluginError(
"Unable to run %s -v" %
constants.os_constant("version_cmd"))
regex = re.compile(r"Apache/([0-9\.]*)", re.IGNORECASE)
matches = regex.findall(stdout)
if len(matches) != 1:
raise errors.PluginError("Unable to find Apache version")
return tuple([int(i) for i in matches[0].split(".")])
def _handle_http_error(self, e, domain_name):
hint = None
if str(e).startswith('400 Client Error:'):
hint = 'Are your API key and Secret key values correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
:returns: version
:rtype: tuple
:raises .PluginError:
Unable to find Nginx version or version is unsupported
"""
try:
proc = subprocess.Popen(
[self.conf('ctl'), "-c", self.nginx_conf, "-V"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
text = proc.communicate()[1] # nginx prints output to stderr
except (OSError, ValueError) as error:
logger.debug(error, exc_info=True)
raise errors.PluginError(
"Unable to run %s -V" % self.conf('ctl'))
version_regex = re.compile(r"nginx/([0-9\.]*)", re.IGNORECASE)
version_matches = version_regex.findall(text)
sni_regex = re.compile(r"TLS SNI support enabled", re.IGNORECASE)
sni_matches = sni_regex.findall(text)
ssl_regex = re.compile(r" --with-http_ssl_module")
ssl_matches = ssl_regex.findall(text)
if not version_matches:
raise errors.PluginError("Unable to find Nginx version")
if not ssl_matches:
raise errors.PluginError(
"Nginx build is missing SSL module (--with-http_ssl_module).")
self.logger.error('Error getting DNS records using the Transip API: %s', error)
raise errors.PluginError('Error finding DNS entries using the Transip API: {0}'
.format(domain))
if dns_entries:
break
self.logger.warning('Error getting DNS records using the Transip API: retry in {} seconds'.format(
backoff
))
time.sleep(backoff)
backoff = backoff * 2
# If there are still no entries then the Transip API returned the wrong data
if not dns_entries:
error = 'Error finding DNS entries using the Transip API: Empty record set for {}'.format(domain)
self.logger.error(error)
raise errors.PluginError(error)
return dns_entries
def _find_domain_record_id(self, domain, rr = '', typ = '', value = ''):
records = self._request('DescribeDomainRecords', {
'DomainName': domain,
'RRKeyWord': rr,
'TypeKeyWord': typ,
'ValueKeyWord': value,
})
for record in records['DomainRecords']['Record']:
if record['RR'] == rr:
return record['RecordId']
raise errors.PluginError('Unexpected error determining record identifier for {0}: {1}'
.format(rr, 'record not found'))
def enhance(self, domain, enhancement, options=None):
"""Raises an exception for request for unsupported enhancement.
:raises .PluginError: this is always raised as no enhancements
are currently supported
"""
# pylint: disable=unused-argument
raise errors.PluginError(
"Unsupported enhancement: {0}".format(enhancement))
path = {"cert_path": self.parser.find_dir("SSLCertificateFile",
None, vhost.path),
"cert_key": self.parser.find_dir("SSLCertificateKeyFile",
None, vhost.path)}
# Only include if a certificate chain is specified
if chain_path is not None:
path["chain_path"] = self.parser.find_dir(
"SSLCertificateChainFile", None, vhost.path)
# Handle errors when certificate/key directives cannot be found
if not path["cert_path"]:
logger.warning(
"Cannot find an SSLCertificateFile directive in %s. "
"VirtualHost was not modified", vhost.path)
raise errors.PluginError(
"Unable to find an SSLCertificateFile directive")
elif not path["cert_key"]:
logger.warning(
"Cannot find an SSLCertificateKeyFile directive for "
"certificate in %s. VirtualHost was not modified", vhost.path)
raise errors.PluginError(
"Unable to find an SSLCertificateKeyFile directive for "
"certificate")
logger.info("Deploying Certificate to VirtualHost %s", vhost.filep)
if self.version < (2, 4, 8) or (chain_path and not fullchain_path):
# install SSLCertificateFile, SSLCertificateKeyFile,
# and SSLCertificateChainFile directives
set_cert_path = cert_path
self.parser.aug.set(path["cert_path"][-1], cert_path)
self.conf("path"))
# Change the permissions to be writable (GH #1389)
# Umask is used instead of chmod to ensure the client can also
# run as non-root (GH #1795)
old_umask = os.umask(0o022)
try:
# This is coupled with the "umask" call above because
# os.makedirs's "mode" parameter may not always work:
# https://stackoverflow.com/questions/5231901/permission-problems-when-creating-a-dir-with-os-makedirs-python
os.makedirs(self.full_roots[achall.domain], 0o0755)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise errors.PluginError(
"Couldn't create root for {0} http-01 "
"challenge responses: {1}", achall.domain, exception)
finally:
os.umask(old_umask)
"""
Performs the actual challenge resolving.
:param achalls:
:return:
"""
# pylint: disable=missing-docstring
self._get_ip_logging_permission()
mapping = {"http-01": self._perform_http01_challenge,
"dns-01": self._perform_dns01_challenge
}
responses = []
# TODO: group achalls by the same socket.gethostbyname(_ex)
# and prompt only once per server (one "echo -n" per domain)
if self._is_classic_handler_mode() and self._call_handler("pre-perform") is None:
raise errors.PluginError("Error in calling the handler to do the pre-perform (challenge) stage")
for achall in achalls:
responses.append(mapping[achall.typ](achall))
if self._is_classic_handler_mode() and self._call_handler("post-perform") is None:
raise errors.PluginError("Error in calling the handler to do the post-perform (challenge) stage")
return responses
Searches through VirtualHosts and tries to match the id in a comment
:param str id_str: Id string for matching
:returns: The matched VirtualHost or None
:rtype: :class:`~certbot_apache.obj.VirtualHost` or None
:raises .errors.PluginError: If no VirtualHost is found
"""
for vh in self.vhosts:
if self._find_vhost_id(vh) == id_str:
return vh
msg = "No VirtualHost with ID {} was found.".format(id_str)
logger.warning(msg)
raise errors.PluginError(msg)