start_time = time.time()
c.good_news("Targets:")

# Find targets in user input or file
for arg in user_args.user_targets:
    user_stdin_target = fetch_emails(arg, user_args)
    if user_stdin_target:
        targets.extend(user_stdin_target)
    elif os.path.isfile(arg):
        c.info_news("Reading from file " + arg)
        targets.extend(get_emails_from_file(arg, user_args))
    else:
        c.bad_news("No targets found in user input")
        exit(1)

c.info_news("Removing duplicates")
targets = list(set(targets))
# Launch
breached_targets = target_factory(targets, user_args)

# These checks are not done inside the factory, as the factory iterates over
# each target individually. The following functions instead check all targets,
# line by line, against local breach sources.
if user_args.bc_path:
    breached_targets = breachcomp_check(breached_targets, user_args.bc_path)

local_found = None
# Handle cleartext search
if user_args.local_breach_src:
    for arg in user_args.local_breach_src:
        res = find_files(arg)
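# --- Illustrative sketch (not part of the original source) -----------------
# fetch_emails() is called above but not defined in this excerpt. A minimal
# regex-based version could look like the stub below; the name suffix and the
# unused user_args parameter are assumptions based on the call site. It reuses
# the same email regex that worker_url() applies further down.
import re

def fetch_emails_sketch(line, user_args=None):
    """Return the email addresses found in `line`, or None."""
    found = re.findall(r"[\w\.-]+@[\w\.-]+", line)
    return found if found else None

# e.g. fetch_emails_sketch("mail admin@example.com") -> ["admin@example.com"]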
def breachcomp_check(targets, breachcomp_path):
    # Based on https://gist.github.com/scottlinux/9a3b11257ac575e4f71de811322ce6b3
    try:
        import subprocess

        # Ensure the BreachCompilation lookup script is executable
        query_bin = os.path.join(breachcomp_path, "query.sh")
        st = os.stat(query_bin)
        os.chmod(query_bin, st.st_mode | stat.S_IEXEC)
        for t in targets:
            c.info_news(f"Looking up {t.email} in BreachCompilation")
            procfd = subprocess.run([query_bin, t.email], stdout=subprocess.PIPE)
            try:
                output = procfd.stdout.decode("cp437")
            except Exception:
                c.bad_news(f"Could not decode bytes for {t.email} results")
                print(procfd.stdout)
                continue
            if len(output) != 0:
                for line in output.split("\n"):
                    if ":" in line:
                        t.pwned += 1
                        t.data.append(("BC_PASS", line.split(":")[1]))
                        c.good_news(
                            "Found BreachCompilation entry {line}".format(line=line)
                        )
        return targets
    except Exception as e:
        c.bad_news("Breach compilation check failed")
        print(e)
        return targets
# The enclosing signature is not shown in this excerpt; it is inferred here
# from the call sites above and the use of `to_parse` and `pattern` below.
def find_files(to_parse, pattern=""):
    allfiles = []
    if "*" in to_parse:
        glob_result = glob.glob(to_parse)
        for g in glob_result:
            allfiles.append(g)
            c.info_news("Using file {}".format(g))
    if os.path.isfile(to_parse):
        if pattern in to_parse:
            c.info_news("Using file {}".format(to_parse))
            allfiles.append(to_parse)
    elif os.path.isdir(to_parse):
        for root, _, filenames in os.walk(to_parse):
            for filename in filenames:
                if pattern in filename:
                    c.info_news("Using file {}".format(os.path.join(root, filename)))
                    allfiles.append(os.path.join(root, filename))
    return allfiles
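# --- Illustrative usage sketch (not part of the original source) -----------
# find_files() accepts a glob pattern, a single file, or a directory to walk
# recursively; all paths below are placeholders.
# find_files("/leaks/*.gz")             -> files matched by the glob
# find_files("/leaks/dump.txt")         -> that single file, if it exists
# find_files("/leaks", pattern=".txt")  -> matching files found by os.walk
for example in ("/tmp/leaks/*.txt", "/tmp/leaks"):
    print(example, "->", find_files(example))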
def get_urls_from_file(targets_file):
    """
    For each line in the file, check for URLs using fetch_urls() (todo).
    Returns a list of URLs.
    """
    email_obj_list = []
    c.info_news("Parsing urls from \t" + targets_file)
    try:
        # Iterate the file through a context manager so the handle is closed
        with open(targets_file) as target_fd:
            for line in target_fd:
                e = fetch_urls(line)
                if e is None:
                    continue
                email_obj_list.extend(e)
        return email_obj_list
    except Exception as ex:
        c.bad_news("Problems occurred while trying to get URLs from file")
        print(ex)
        return email_obj_list
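# --- Illustrative sketch (not part of the original source) -----------------
# fetch_urls() is referenced above but flagged "(todo)" in the docstring. A
# minimal regex-based version could look like this; the exact behavior of the
# real helper is an assumption.
import re

def fetch_urls_sketch(line):
    """Return the http(s) URLs found in `line`, or None."""
    urls = re.findall(r"https?://[^\s\"'<>]+", line)
    return urls if urls else None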
                # Excerpt from the local-search gzip worker: the function
                # header and the enclosing loop over lines (which sets `cnt`,
                # `line` and `t`) are not shown here.
                try:
                    # The cp437 decode step is elided in this excerpt;
                    # `decoded` holds the decoded form of `line`.
                    c.good_news(
                        f"Found occurrence [{filepath}] Line {cnt}: {decoded}"
                    )
                except Exception:
                    c.bad_news(
                        f"Got a decoding error line {cnt} - file: {filepath}"
                    )
                    c.good_news(
                        f"Found occurrence [{filepath}] Line {cnt}: {line}"
                    )
                found_list.append(
                    local_breach_target(t, filepath, cnt, str(line))
                )
        return found_list
    except Exception as e:
        c.bad_news("Something went wrong with gzip worker")
        print(e)
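# --- Illustrative sketch (not part of the original source) -----------------
# The worker above scans breach files line by line for target strings. A
# self-contained version of that core loop, assuming gzip-compressed input,
# could look like this; the path and target values are placeholders.
import gzip

def gzip_search_sketch(gzip_path, target):
    """Return (line_number, decoded_line) pairs that contain `target`."""
    hits = []
    with gzip.open(gzip_path, "rb") as fp:
        for cnt, raw in enumerate(fp):
            if target.encode() in raw:
                # cp437 maps every byte value, so this decode does not raise
                hits.append((cnt, raw.decode("cp437").rstrip()))
    return hits

# e.g. gzip_search_sketch("/tmp/leaks/dump.gz", "user@example.com")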
            # Excerpt from the HIBP paste lookup: `response` is the HTTP
            # response from the paste endpoint and `data` its parsed JSON
            # body. The request itself is not shown; the 200 branch and the
            # loop over `data` are restored here for readability.
            if response.status_code == 200:
                for d in data:
                    self.pwned += 1
                    if "Pastebin" in d["Source"]:
                        self.data.append(
                            ("HIBP_PASTE", "https://pastebin.com/" + d["Id"])
                        )
                    else:
                        self.data.append(("HIBP_PASTE", d["Id"]))
                c.good_news(
                    "Found {num} pastes for {target} using HIBP".format(
                        num=len(data), target=self.target
                    )
                )
            elif response.status_code == 404:
                c.info_news(
                    "No pastes found for {} using HIBP PASTE".format(self.target)
                )
            else:
                c.bad_news(
                    "HIBP PASTE: got API response code {code} for {target}".format(
                        code=response.status_code, target=self.target
                    )
                )
        except Exception as ex:
            c.bad_news("HIBP PASTE error: " + self.target)
            print(ex)
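# --- Illustrative sketch (not part of the original source) -----------------
# The excerpt above handles the response; the request itself could look like
# this. HIBP's v3 API serves pastes at /api/v3/pasteaccount/{account} and
# requires an API key header; the key and user-agent values are placeholders.
import requests

def hibp_paste_request_sketch(account, api_key):
    url = "https://haveibeenpwned.com/api/v3/pasteaccount/" + account
    headers = {"hibp-api-key": api_key, "user-agent": "h8mail-sketch"}
    # 200 -> JSON list of pastes, 404 -> no pastes for this account
    return requests.get(url, headers=headers)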
def info_news(news):
    """
    Print an informational message in grey text
    """
    print(
        colors.bold
        + colors.fg.lightblue
        + "[~] "
        + colors.reset
        + colors.fg.lightgrey
        + news.strip()
        + colors.reset
    )
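# --- Illustrative sketch (not part of the original source) -----------------
# info_news() relies on a `colors` helper exposing ANSI escape attributes. A
# minimal stand-in with just the attributes used above could look like this;
# the real helper likely defines many more codes.
class colors_sketch:
    bold = "\033[1m"
    reset = "\033[0m"

    class fg:
        lightblue = "\033[94m"
        lightgrey = "\033[37m"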
def worker_url(url):
    """
    Fetches the URL with a custom User-Agent
    Returns any email addresses found in the response body
    """
    headers_ua = {"User-Agent": "h8mail/5.0 (X11; Linux i586; rv:31.0)"}
    try:
        c.info_news("Worker fetching " + url)
        # Send the custom UA as a request header, not as a query parameter
        r = requests.get(url, headers=headers_ua, allow_redirects=False)
        c.info_news("Worker fetch done (status code {})".format(r.status_code))
        e = re.findall(r"[\w\.-]+@[\w\.-]+", r.text)
        if e:
            print(", ".join(e), c.reset)
            return e
        return None
    except Exception as ex:
        c.bad_news("URL fetch worker error: " + url)
        print(ex)
        return None
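# --- Illustrative usage sketch (not part of the original source) -----------
# worker_url() suits a thread pool when checking many URLs; the list below
# holds placeholder addresses.
from concurrent.futures import ThreadPoolExecutor

urls = ["https://example.com/a", "https://example.com/b"]
with ThreadPoolExecutor(max_workers=4) as pool:
    for url, emails in zip(urls, pool.map(worker_url, urls)):
        if emails:
            print(url, "->", emails)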