Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def h8mail(user_args):
"""
Handles most user arg logic. Creates a list() of targets from user input.
Starts the target object factory loop; starts local searches after factory if in user inputs
Prints results, saves to csv if in user inputs
"""
if not user_args.user_targets:
c.bad_news("Missing Target")
exit(1)
targets = []
start_time = time.time()
c.good_news("Targets:")
# Find targets in user input or file
for arg in user_args.user_targets:
user_stdin_target = fetch_emails(arg, user_args)
if user_stdin_target:
targets.extend(user_stdin_target)
elif os.path.isfile(arg):
c.info_news("Reading from file " + arg)
targets.extend(get_emails_from_file(arg, user_args))
else:
c.bad_news("No targets found in user input")
exit(1)
c.bad_news("Missing Target")
exit(1)
targets = []
start_time = time.time()
c.good_news("Targets:")
# Find targets in user input or file
for arg in user_args.user_targets:
user_stdin_target = fetch_emails(arg, user_args)
if user_stdin_target:
targets.extend(user_stdin_target)
elif os.path.isfile(arg):
c.info_news("Reading from file " + arg)
targets.extend(get_emails_from_file(arg, user_args))
else:
c.bad_news("No targets found in user input")
exit(1)
c.info_news("Removing duplicates")
targets = list(set(targets))
# Launch
breached_targets = target_factory(targets, user_args)
# These are not done inside the factory as the factory iterates over each target individually
# The following functions perform line by line checks of all targets per line
if user_args.bc_path:
breached_targets = breachcomp_check(breached_targets, user_args.bc_path)
local_found = None
# Handle cleartext search
"Found {num} entries for {target} using LeakLookup (public)".format(
num=len(response["message"]), target=self.target
)
)
for result in response["message"]:
self.pwned += 1
self.data.append(("LEAKLOOKUP_PUB", result))
if "false" in response["error"] and len(response["message"]) == 0:
c.info_news(
"No breaches found for {} using Leak-lookup (public)".format(
self.target
)
)
except Exception as ex:
c.bad_news(
"Leak-lookup error with {target} (public)".format(target=self.target)
)
print(ex)
)
else:
self.data.append(("HIBP_PASTE", d["Id"]))
c.good_news(
"Found {num} pastes for {target} using HIBP".format(
num=len(data), target=self.target
)
)
elif response.status_code == 404:
c.info_news(
"No pastes found for {} using HIBP PASTE".format(self.target)
)
else:
c.bad_news(
"HIBP PASTE: got API response code {code} for {target}".format(
code=response.status_code, target=self.target
)
)
except Exception as ex:
c.bad_news("HIBP PASTE error: " + self.target)
print(ex)
return
for result in response["Data"]:
if "Username" in result:
self.data.append(("WLI_USERNAME", result["Username"]))
if "Email" in result and self.not_exists(result["Email"]):
self.data.append(("WLI_RELATED", result["Email"].strip()))
if "Password" in result:
self.data.append(("WLI_PASSWORD", result["Password"]))
self.pwned += 1
if "Hash" in result:
self.data.append(("WLI_HASH", result["Hash"]))
self.pwned += 1
if "Database" in result and self.not_exists(result["Database"]):
self.data.append(("WLI_SOURCE", result["Database"]))
except Exception as ex:
c.bad_news(
"WeLeakInfo error with {target} (private)".format(target=self.target)
)
print(ex)
)
self.headers.update({"Authorization": "Bearer " + api_key})
req = self.make_request(url, timeout=30)
self.headers.popitem()
response = req.json()
if req.status_code != 200:
c.bad_news(f"Got WLI API response code {req.status_code} (public)")
return
else:
c.good_news(
"Found {num} entries for {target} using WeLeakInfo (public)".format(
num=response["Total"], target=self.target
)
)
if response["Success"] is False:
c.bad_news(response["Message"])
return
self.data.append(("WLI_PUB_TOTAL", response["Total"]))
if response["Total"] == 0:
return
for name, data in response["Data"].items():
self.data.append(("WLI_PUB_SRC", name + " (" + str(data) + ")"))
except Exception as ex:
c.bad_news(
"WeLeakInfo error with {target} (public)".format(target=self.target)
)
print(ex)
)
)
if "never" in data["details"]["last_seen"]:
return
self.data.append(("EMAILREP_LASTSN", data["details"]["last_seen"]))
if len(data["details"]["profiles"]) != 0:
for profile in data["details"]["profiles"]:
self.data.append(("EMAILREP_SOCIAL", profile))
c.good_news("Found social profils")
elif response.status_code == 404:
c.info_news(
"No data found for {} using emailrep.io".format(self.target)
)
else:
c.bad_news(
"emailrep.io: got API response code {code} for {target}".format(
code=response.status_code, target=self.target
)
)
except Exception as ex:
c.bad_news("emailrep.io error: " + self.target)
print(ex)
for d in data: # Returned type is a dict of Name : Service
for _, ser in d.items():
self.data.append(("HIBP", ser))
self.pwned += 1
c.good_news(
"Found {num} breaches for {target} using HIBP".format(
num=len(self.data) - 1, target=self.target
)
)
self.get_hibp_pastes()
elif response.status_code == 404:
c.info_news("No breaches found for {} using HIBP".format(self.target))
else:
c.bad_news(
"HIBP: got API response code {code} for {target}".format(
code=response.status_code, target=self.target
)
)
except Exception as ex:
c.bad_news("HIBP error: " + self.target)
print(ex)
for profile in data["details"]["profiles"]:
self.data.append(("EMAILREP_SOCIAL", profile))
c.good_news("Found social profils")
elif response.status_code == 404:
c.info_news(
"No data found for {} using emailrep.io".format(self.target)
)
else:
c.bad_news(
"emailrep.io: got API response code {code} for {target}".format(
code=response.status_code, target=self.target
)
)
except Exception as ex:
c.bad_news("emailrep.io error: " + self.target)
print(ex)
"""
Outputs CSV from target object list.
Dumps the target.data object variable into CSV file.
"""
with open(dest_csv, "w", newline="") as csvfile:
try:
writer = csv.writer(csvfile)
writer.writerow(["Target", "Type", "Data"])
c.good_news("Writing to CSV")
for t in target_obj_list:
for i in range(len(t.data)):
if len(t.data[i]) == 2: # Contains data header + body
writer.writerow([t.target, t.data[i][0], t.data[i][1]])
except Exception as ex:
c.bad_news("Error writing to csv")
print(ex)