Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def open_test_run(db_filename, port=constants.DEFAULT_WEB_UI_PORT, address="localhost"):
s = SessionInfo(db_filename=db_filename)
w = WebApp(session_info=s, web_port=port, web_addr=address)
w.server_init()
if len(self._post_test_case_methods) > 0:
try:
for f in self._post_test_case_methods:
self._fuzz_data_logger.open_test_step('Post- test case callback: "{0}"'.format(f.__name__))
f(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self, sock=target)
except exception.BoofuzzTargetConnectionReset:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_RESET_FAIL)
except exception.BoofuzzTargetConnectionAborted as e:
self._fuzz_data_logger.log_info(
constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
)
except exception.BoofuzzTargetConnectionFailedError:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_FAILED)
except Exception:
self._fuzz_data_logger.log_error(
constants.ERR_CALLBACK_FUNC.format(func_name="post_send") + traceback.format_exc()
)
finally:
self._fuzz_data_logger.open_test_step("Cleaning up connections from callbacks")
def _post_send(self, target):
if len(self._post_test_case_methods) > 0:
try:
for f in self._post_test_case_methods:
self._fuzz_data_logger.open_test_step('Post- test case callback: "{0}"'.format(f.__name__))
f(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self, sock=target)
except exception.BoofuzzTargetConnectionReset:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_RESET_FAIL)
except exception.BoofuzzTargetConnectionAborted as e:
self._fuzz_data_logger.log_info(
constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
)
except exception.BoofuzzTargetConnectionFailedError:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_FAILED)
except Exception:
self._fuzz_data_logger.log_error(
constants.ERR_CALLBACK_FUNC.format(func_name="post_send") + traceback.format_exc()
)
finally:
self._fuzz_data_logger.open_test_step("Cleaning up connections from callbacks")
self._keep_web_open = keep_web_open
self.console_gui = console_gui
self._crash_threshold_node = crash_threshold_request
self._crash_threshold_element = crash_threshold_element
self.restart_sleep_time = restart_sleep_time
if fuzz_data_logger is not None:
raise exception.BoofuzzError("Session fuzz_data_logger is deprecated. Use fuzz_loggers instead!")
if fuzz_loggers is None:
fuzz_loggers = []
if self.console_gui and os.name != "nt":
fuzz_loggers.append(fuzz_logger_curses.FuzzLoggerCurses(web_port=self.web_port))
self._keep_web_open = False
if len(fuzz_loggers) == 0:
fuzz_loggers = [fuzz_logger_text.FuzzLoggerText()]
helpers.mkdir_safe(os.path.join(constants.RESULTS_DIR))
self._run_id = datetime.datetime.utcnow().replace(microsecond=0).isoformat().replace(":", "-")
self._db_filename = os.path.join(constants.RESULTS_DIR, "run-{0}.db".format(self._run_id))
self._db_logger = fuzz_logger_db.FuzzLoggerDb(
db_filename=self._db_filename, num_log_cases=fuzz_db_keep_only_n_pass_cases
)
self._crash_filename = "boofuzz-crash-bin-{0}".format(self._run_id)
self._fuzz_data_logger = fuzz_logger.FuzzLogger(fuzz_loggers=[self._db_logger] + fuzz_loggers)
self._check_data_received_each_request = check_data_received_each_request
self._receive_data_after_each_request = receive_data_after_each_request
self._receive_data_after_fuzz = receive_data_after_fuzz
self._skip_current_node_after_current_test_case = False
self._skip_current_element_after_current_test_case = False
if self.web_port is not None:
self.last_recv = self.targets[0].recv()
if self._check_data_received_each_request:
self._fuzz_data_logger.log_check("Verify some data was received from the target.")
if not self.last_recv:
# Assume a crash?
self._fuzz_data_logger.log_fail("Nothing received from target.")
else:
self._fuzz_data_logger.log_pass("Some data received from target.")
except exception.BoofuzzTargetConnectionReset:
if self._check_data_received_each_request:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_RESET)
else:
self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
except exception.BoofuzzTargetConnectionAborted as e:
msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
if self._check_data_received_each_request:
self._fuzz_data_logger.log_fail(msg)
else:
self._fuzz_data_logger.log_info(msg)
def _open_connection_keep_trying(self, target):
""" Open connection and if it fails, keep retrying.
Args:
target (Target): Target to open.
"""
if not self._reuse_target_connection:
out_of_available_sockets_count = 0
while True:
try:
target.open()
break # break if no exception
except exception.BoofuzzTargetConnectionFailedError:
self._fuzz_data_logger.log_info(constants.WARN_CONN_FAILED_TERMINAL)
self._restart_target(target)
except exception.BoofuzzOutOfAvailableSockets:
out_of_available_sockets_count += 1
if out_of_available_sockets_count == 50:
raise exception.BoofuzzError("There are no available sockets. Ending fuzzing.")
self._fuzz_data_logger.log_info("There are no available sockets. Waiting for another 5 seconds.")
time.sleep(5)
def _post_send(self, target):
if len(self._post_test_case_methods) > 0:
try:
for f in self._post_test_case_methods:
self._fuzz_data_logger.open_test_step('Post- test case callback: "{0}"'.format(f.__name__))
f(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self, sock=target)
except exception.BoofuzzTargetConnectionReset:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_RESET_FAIL)
except exception.BoofuzzTargetConnectionAborted as e:
self._fuzz_data_logger.log_info(
constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
)
except exception.BoofuzzTargetConnectionFailedError:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_FAILED)
except Exception:
self._fuzz_data_logger.log_error(
constants.ERR_CALLBACK_FUNC.format(func_name="post_send") + traceback.format_exc()
)
finally:
self._fuzz_data_logger.open_test_step("Cleaning up connections from callbacks")
def build_webapp_thread(self, port=constants.DEFAULT_WEB_UI_PORT):
app.session = self
http_server = HTTPServer(WSGIContainer(app))
while True:
try:
http_server.listen(port)
except socket.error as exc:
# Only handle "Address already in use"
if exc.errno != errno.EADDRINUSE:
raise
port += 1
else:
self._fuzz_data_logger.log_info("Web interface can be found at http://localhost:%d" % port)
break
flask_thread = threading.Thread(target=IOLoop.instance().start)
flask_thread.daemon = True
return flask_thread
if len(self._post_test_case_methods) > 0:
try:
for f in self._post_test_case_methods:
self._fuzz_data_logger.open_test_step('Post- test case callback: "{0}"'.format(f.__name__))
f(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self, sock=target)
except exception.BoofuzzTargetConnectionReset:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_RESET_FAIL)
except exception.BoofuzzTargetConnectionAborted as e:
self._fuzz_data_logger.log_info(
constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
)
except exception.BoofuzzTargetConnectionFailedError:
self._fuzz_data_logger.log_fail(constants.ERR_CONN_FAILED)
except Exception:
self._fuzz_data_logger.log_error(
constants.ERR_CALLBACK_FUNC.format(func_name="post_send") + traceback.format_exc()
)
finally:
self._fuzz_data_logger.open_test_step("Cleaning up connections from callbacks")
default=constants.DEFAULT_WEB_UI_PORT,
)
@click.option(
"--ui-addr",
help="Address on which to serve the web interface (default localhost). Set to empty "
"string to serve on all interfaces.",
type=str,
default="localhost",
)
@click.argument("filename")
def open_file(debug, filename, ui_port, ui_addr):
if debug:
logging.basicConfig(level=logging.DEBUG)
sessions.open_test_run(db_filename=filename, port=ui_port, address=ui_addr)
print("Serving web page at http://{0}:{1}. Hit Ctrl+C to quit.".format(ui_addr, ui_port))