Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_str_port(self):
    """
    A ``listen`` entry whose port part is ``abc`` must be rejected.

    Expects :class:`ConfigException` to be raised while the config object
    is constructed and loaded.
    """
    bad_port_config = create_config(("listen=127.0.0.1:abc",))
    # Construction stays inside the context manager so an exception raised
    # by either the constructor or load() satisfies the expectation.
    with pytest.raises(ConfigException):
        Config(bad_port_config).load()
raise ConfigException(
f"'{value}' is not a valid delay. Delay is "
"in seconds and must be below 60."
)
if value.count("-") == 1:
start, end = value.split("-")
start, end = start.strip(), end.strip()
if (
start.isdigit()
and end.isdigit()
and int(start) < 60
and int(end) < 60
and int(end) > int(start)
):
return {flag: (start, end)}
raise ConfigException(
f"'{value}' is not a valid delay value. It must be "
"either a single value or a range i.e. 5-10 and "
:param argparse.Namespace args: Parsed arguments.
:raises SystemExit: Exit code :py:obj:`os.EX_OK` when config is valid
or :py:obj:`os.EX_USAGE` when config is invalid.
.. note::
Problems with the configuration will be written to the console using
the :py:mod:`logging` module.
"""
logger = logging.getLogger("blackhole.config.test")
logger.setLevel(logging.INFO)
try:
conf = Config(args.config_file).load().test()
conf.args = args
except ConfigException:
logger.critical("Config error")
raise SystemExit(os.EX_USAGE)
logger.info(f"blackhole: {args.config_file} syntax is OK.")
logger.info(f"blackhole: {args.config_file} test was successful.")
warn_options(conf)
raise SystemExit(os.EX_OK)
.. note::
Spaces, single and double quotes will be stripped. Lines beginning
with # will be ignored. In-line # comments will be stripped out and
ignored.
i.e.
# listen = :1025, :::1025 ->
listen = :25, :::25 # IPv4 & IPv6 -> listen = :25, :::25
"""
if self.config_file is None:
return self
if not os.access(self.config_file, os.R_OK):
msg = "Config file does not exist or is not readable."
raise ConfigException(msg)
with open(self.config_file, "r") as _conf_file:
for line in _conf_file.readlines():
line = line.strip()
if line.startswith("#"):
continue
if line.strip() == "":
continue
if "#" in line:
line = line.split("#")[0]
if line.count("=") >= 1:
key, value = line.split("=", 1)
key, value = key.strip(), value.strip()
self.validate_option(key)
value = value.replace('"', "").replace("'", "")
setattr(self, key, value)
else:
"""
if len(self.listen) == 0 and len(self.tls_listen) == 0:
return
listen, tls_listen = [], []
for llisten in self.listen:
addr, port, family, flags = llisten
listen.append((addr, port, family))
for llisten in self.tls_listen:
addr, port, family, flags = llisten
tls_listen.append((addr, port, family))
if set(listen).intersection(tls_listen):
msg = (
"Cannot have multiple listeners on the same address and "
"port."
)
raise ConfigException(msg)
msg = f"You do not have permission to use port {port}"
raise ConfigException(msg)
sock = socket.socket(family, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except (AttributeError, OSError):
pass
if family == socket.AF_INET6:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
try:
sock.bind((address, port))
except OSError as err:
errmsg = err.strerror
msg = f"Could not use port {port}, {errmsg}"
raise ConfigException(msg)
finally:
sock.close()
del sock
def test_ipv6_support(self):
    """
    Check configured listeners against the platform's IPv6 capability.

    Only entries of ``self.listen`` whose address contains a colon are
    inspected; all others are passed over.

    :raises ConfigException: When an inspected entry has the family
                             ``socket.AF_UNSPEC`` and ``socket.has_ipv6``
                             is false.
    """
    for host, _port, addr_family, _flags in self.listen:
        # No colon in the address: this entry is not examined further.
        if ":" not in host:
            continue
        if not socket.has_ipv6 and addr_family == socket.AF_UNSPEC:
            raise ConfigException(
                "An IPv6 listener is configured but IPv6 is not "
                "available on this platform."
            )
self, lambda a: not (inspect.isroutine(a))
)
attrs = [
a[0][1:]
for a in attributes
if not (a[0].startswith("__") and a[0].endswith("__"))
and a[0].startswith("_")
]
if key not in attrs:
_attrs = "', '".join(attrs[:-1])
valid_attrs = f"'{_attrs}' and '{attrs[-1]}'"
msg = (
f"Invalid configuration option '{key}'.\n\nValid options "
f"are: {valid_attrs}"
)
raise ConfigException(msg)
def test_dynamic_switch(self):
    """
    Check that ``_dynamic_switch`` holds an acceptable value.

    ``None`` is accepted without further checks, as is any value that is a
    member of ``(True, False)``.

    :raises ConfigException: When the value is neither ``None`` nor a
                             member of ``(True, False)``.
    """
    switch = self._dynamic_switch
    # Membership (not identity) is used, exactly as a tuple ``in`` test does.
    if switch is None or switch in (True, False):
        return
    raise ConfigException("Allowed dynamic_switch values are true and false.")
:raises ConfigException: When the delay is not a number or is above
the maximum allowed value of 60.
.. note::
Delay must be lower than the timeout.
"""
if self.delay and self.delay >= self.timeout:
msg = "Delay must be lower than timeout."
raise ConfigException(msg)
if self.delay and self.delay > 60:
msg = (
"Delay must be 60 seconds or less for security (denial of "
"service)."
)
raise ConfigException(msg)