Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_returns_true_on_valid_ipv4_address(address):
assert ipv4(address)
assert not ipv6(address)
def validate_indicator(self, indicator_type, indicator_value):
if indicator_type == 'IPv4':
return validators.ipv4(indicator_value)
elif indicator_type == 'hostname' or indicator_type == 'domain':
return validators.domain(indicator_value)
elif indicator_type == 'email':
return validators.email(indicator_value)
elif indicator_type == 'URL':
return validators.url(indicator_value)
elif indicator_type.startswith('FileHash'):
return re.match(r'(?=(?:.{32}|.{40}|.{64})$)[a-fA-F\d]', indicator_value)
def RunEasy():
"""It receives information from the user."""
# server_ip_or_domain
while True:
cprint("Please type in the server ip or domain address.",
'white', 'on_red', attrs=['bold'])
server_name_or_ip = str(input('server ip or domain = '))
if server_name_or_ip == "":
cprint("Please do not leave blank, try again...",
'white', 'on_red', attrs=['bold'])
continue
else:
if(domain(server_name_or_ip) or ipv4(server_name_or_ip)):
break
else:
cprint("Please enter a valid address...",
'white', 'on_red', attrs=['bold'])
continue
# Static Url
while True:
cprint("Write your STATIC_URL (Django Settings.py)",
'white', 'on_red', attrs=['bold'])
static_url = str(input('STATIC_URL = '))
if static_url == "":
cprint("Please do not leave blank, try again...",
'white', 'on_red', attrs=['bold'])
continue
def asn_lookup(ip_addr):
"""Resolve the ASN of an IP address.
Params:
- ip_addr: (type: string) IP address.
Returns:
- result: (type: string) ASN number and organisation.
"""
if validators.ipv4(ip_addr):
try:
if ip_addr == '255.255.255.255':
return 'AS0000 Unknown'
with geoip2.database.Reader(BASECONFIG.asn_db) as reader:
response = reader.asn(ip_addr)
if response is not None:
asn_number = response.autonomous_system_number
asn_org = response.autonomous_system_organization
return 'AS{0} {1}'.format(asn_number, asn_org)
except Exception as e:
LOGGING.warning(
'Failed to perform ASN lookup for address {0}: {1}'.format(
ip_addr, str(e)))
"""
if not argument or len(argument.strip()) == 0:
return None
elif argument[0] is '#':
return None
elif validators.url(argument):
return "URL"
elif validators.md5(argument):
return "MD5"
elif validators.sha1(argument):
return "SHA1"
elif validators.sha256(argument):
return "SHA256"
elif validators.sha512(argument):
return "SHA512"
elif validators.ipv4(argument):
return "IPv4"
elif validators.ipv6(argument):
return "IPv6"
elif validators.domain(argument):
return "domain"
else:
return None
def is_valid_ip(ip):
if any(s in ip for s in IP_BLACKLIST):
return False
return validators.ipv4(ip)
def process_host(provided_ioc):
"""Pivot off an IP or domain and return data as an dictonary."""
if validators.ipv4(provided_ioc):
ioc_type = "ip"
else:
ioc_type = "domain"
ioc_dicts = []
resp = requests.get(api.format(ioc_type, ioc_type, provided_ioc),
headers={"User-Agent": useragent})
if resp.status_code == 200 and "permalink" in resp.json().keys() and \
provided_ioc in resp.json()["permalink"]:
for key in resp.json().keys():
if key == "votes" or key == "permalink" or key == "response_code":
pass
elif key == "resolutions":
for res in resp.json()[key]:
res = lower_keys(res)
def process_iocs(results):
"""Return data formatted for Splunk from CyberCrime Tracker."""
if results != None:
provided_iocs = [y for x in results for y in x.values()]
else:
provided_iocs = sys.argv[1:]
session = commons.create_session()
splunk_table = []
for provided_ioc in set(provided_iocs):
provided_ioc = commons.deobfuscate_string(provided_ioc)
if validators.domain(provided_ioc) or validators.ipv4(provided_ioc):
cct_dicts = query_cct(provided_ioc, session)
else:
splunk_table.append({"invalid": provided_ioc})
continue
for cct_dict in cct_dicts:
splunk_table.append(cct_dict)
session.close()
return splunk_table
def read_iocs(cb, file=sys.stdin):
iocs = defaultdict(list)
report_id = hashlib.md5()
report_id.update(str(time.time()).encode("utf-8"))
for idx, line in enumerate(sys.stdin):
line = line.rstrip("\r\n")
report_id.update(line.encode("utf-8"))
if validators.md5(line):
iocs["md5"].append(line)
elif validators.sha256(line):
eprint("line {}: sha256 provided but not yet supported by backend".format(idx + 1))
iocs["sha256"].append(line)
elif validators.ipv4(line):
iocs["ipv4"].append(line)
elif validators.ipv6(line):
iocs["ipv6"].append(line)
elif validators.domain(line):
iocs["dns"].append(line)
else:
if cb.validate_query(line):
query_ioc = {"search_query": line}
iocs["query"].append(query_ioc)
else:
eprint("line {}: invalid query".format(idx + 1))
return (report_id.hexdigest(), dict(iocs))
#