Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_valid(self, ip):
    """Check that a well-formed IP address is accepted.

    The test passes as long as the ``validate_ip`` call completes
    without raising.
    """
    validate_ip(ip)
def test_invalid(self, ip):
    """Check that a malformed IP address is rejected with ``ValueError``.

    Besides the exception type, the exact error message is compared
    against the ``repr`` of the offending value.
    """
    with pytest.raises(ValueError) as raised:
        validate_ip(ip)

    # Checked after the ``with`` block: inside it, this line would be
    # skipped as soon as ``validate_ip`` raised.
    expected_message = "Invalid IP address: {!r}".format(ip)
    assert str(raised.value) == expected_message
:type input_file: click.File | None
:param ip_address: IP addresses passed via the ip address argument
:type query: tuple(str, ...)
"""
if input_file is None and not sys.stdin.isatty():
input_file = click.open_file("-")
if input_file is None and not ip_address:
click.echo(context.get_help())
context.exit(-1)
ip_addresses = []
if input_file is not None:
lines = [line.strip() for line in input_file]
ip_addresses.extend([line for line in lines if validate_ip(line, strict=False)])
ip_addresses.extend(list(ip_address))
if not ip_addresses:
output = [
context.command.get_usage(context),
(
"Error: at least one valid IP address must be passed either as an "
"argument (IP_ADDRESS) or through the -i/--input_file option."
),
]
click.echo("\n\n".join(output))
context.exit(-1)
return ip_addresses
def ip_addresses_parameter(_context, _parameter, values):
    """Validate the IP addresses passed from the command line.

    Every value is checked with ``validate_ip``; the first one that is
    rejected aborts the check.

    :param _context: Unused here (presumably the click context handed to
        a parameter callback).
    :param _parameter: Unused here (presumably the click parameter being
        processed).
    :param values: IP address values
    :type values: list
    :return: The input values, unchanged, when all of them are valid.
    :raises click.BadParameter: when any IP address value is invalid

    """
    for candidate in values:
        try:
            validate_ip(candidate)
        except ValueError:
            # Convert the validation failure into the error type click
            # uses for bad parameters; the message is the offending value.
            raise click.BadParameter(candidate)

    return values
def ip(self, ip_address):
    """Look up the context associated with a single IP address.

    The address is validated first. When ``self.use_cache`` is set, a
    previously stored response is reused and a fresh one is stored in
    ``self.IP_CONTEXT_CACHE``; otherwise the request is always made.

    :param ip_address: IP address to use in the look-up.
    :type ip_address: str
    :return: Context for the IP address.
    :rtype: dict

    """
    LOGGER.debug("Getting context for %s...", ip_address, ip_address=ip_address)
    validate_ip(ip_address)
    endpoint = self.EP_NOISE_CONTEXT.format(ip_address=ip_address)

    if self.use_cache:
        context_cache = self.IP_CONTEXT_CACHE
        if ip_address in self.IP_CONTEXT_CACHE:
            response = context_cache[ip_address]
        else:
            # Only hit the API on a cache miss, then remember the result.
            response = context_cache.setdefault(
                ip_address, self._request(endpoint)
            )
    else:
        response = self._request(endpoint)

    # Make sure the caller can always tell which address the result is for.
    if "ip" not in response:
        response["ip"] = ip_address

    return response
"""Get activity associated with one or more IP addresses.
:param ip_addresses: One or more IP addresses to use in the look-up.
:type ip_addresses: str | list
:return: Bulk status information for IP addresses.
:rtype: dict
"""
if isinstance(ip_addresses, str):
ip_addresses = [ip_addresses]
LOGGER.debug("Getting noise status...", ip_addresses=ip_addresses)
ip_addresses = [
ip_address
for ip_address in ip_addresses
if validate_ip(ip_address, strict=False)
]
if self.use_cache:
cache = self.IP_QUICK_CHECK_CACHE
# Keep the same ordering as in the input
ordered_results = OrderedDict(
(ip_address, cache.get(ip_address)) for ip_address in ip_addresses
)
api_ip_addresses = [
ip_address
for ip_address, result in ordered_results.items()
if result is None
]
if api_ip_addresses:
api_results = []
chunks = more_itertools.chunked(
def interesting(self, ip_address):
    """Report an IP address as "interesting".

    The address is validated, then a POST request is sent to the
    endpoint built from ``self.EP_INTERESTING``.

    :param ip_address: IP address to report as "interesting".
    :type ip_address: str
    :return: Whatever ``self._request`` returns for the POST.

    """
    LOGGER.debug(
        "Reporting interesting IP: %s...", ip_address, ip_address=ip_address
    )
    validate_ip(ip_address)

    report_endpoint = self.EP_INTERESTING.format(ip_address=ip_address)
    return self._request(report_endpoint, method="post")
:return: Bulk status information for IP addresses.
:rtype: dict
"""
if isinstance(ip_addresses, str):
ip_addresses = [ip_addresses]
LOGGER.msg(
"Getting noise status for {}...".format(ip_addresses),
ip_addresses=ip_addresses,
level="info",
)
ip_addresses = [
ip_address
for ip_address in ip_addresses
if validate_ip(ip_address, strict=False)
]
if self.use_cache:
cache = self.IP_QUICK_CHECK_CACHE
# Keep the same ordering as in the input
ordered_results = OrderedDict(
(ip_address, cache.get(ip_address)) for ip_address in ip_addresses
)
api_ip_addresses = [
ip_address
for ip_address, result in ordered_results.items()
if result is None
]
if api_ip_addresses:
api_results = []
if len(api_ip_addresses) == 1:
def ip(self, ip_address):
    """Look up the context associated with a single IP address.

    The address is validated first. When ``self.use_cache`` is set, a
    previously stored response is reused and a fresh one is stored in
    ``self.IP_CONTEXT_CACHE``; otherwise the request is always made.

    :param ip_address: IP address to use in the look-up.
    :type ip_address: str
    :return: Context for the IP address.
    :rtype: dict

    """
    log_message = "Getting context for {}...".format(ip_address)
    LOGGER.msg(log_message, ip_address=ip_address, level="debug")

    validate_ip(ip_address)
    endpoint = self.EP_NOISE_CONTEXT.format(ip_address=ip_address)

    if self.use_cache:
        context_cache = self.IP_CONTEXT_CACHE
        if ip_address in self.IP_CONTEXT_CACHE:
            response = context_cache[ip_address]
        else:
            # Only hit the API on a cache miss, then remember the result.
            response = context_cache.setdefault(
                ip_address, self._request(endpoint)
            )
    else:
        response = self._request(endpoint)

    # Make sure the caller can always tell which address the result is for.
    if "ip" not in response:
        response["ip"] = ip_address

    return response