Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, ctx):
""" Create a RealTrafficCollector object.
Subscribe itself to RAW_REQUEST, RAW_RESPONSE and RESET.
"""
self._ctx = ctx
self._current_key = None
self._request_buffer = ""
self._response_buffer = ""
self._request_time = None
self._response_time = None
self._state = collector.COLLECT_STATE.FINISH_COLLECT
self._ctx.broker.subscribe(broker.TOPICS.RAW_REQUEST,
self.collect_raw_request)
self._ctx.broker.subscribe(broker.TOPICS.RAW_RESPONSE,
self.collect_raw_response)
self._ctx.broker.subscribe(broker.TOPICS.RESET, self._reset)
""" Create a LogCollector object.
Initialize SwitchCollector with delimiter log.
Subscribe itself to RAW_LOG and RESET.
"""
self._ctx = ctx
self._unique_id_pattern = re.compile(
r'\[unique_id "([^\]]+)"\]'
)
super(LogCollector, self).__init__(
self._ctx.delimiter.get_delimiter_log(),
self._ctx.delimiter.get_delimiter_log(),
save_entire_line = True)
self._reset()
self._ctx.broker.subscribe(broker.TOPICS.RAW_LOG, self)
self._ctx.broker.subscribe(broker.TOPICS.RESET, self._reset)
def __del__(self):
self._ctx.broker.unsubscribe(broker.TOPICS.PYWB_OUTPUT, self)
self._ctx.broker.unsubscribe(broker.TOPICS.RESET, self._reset)
def _start_experiment(self, destination):
if not os.path.exists(self._conf.pkt_path):
self._gen_requests()
if not os.path.exists(self._conf.pkt_path):
self._ctx.broker.publish(
broker.TOPICS.WARNING,
" %s is not existed " % (self._conf.pkt_path, ))
return
self._ctx.broker.publish(broker.TOPICS.SQL_COMMAND,
sql.SQL_CLEAN_RAW_DATA)
self._ctx.broker.publish(broker.TOPICS.RESET)
def collect_pywb_ouput(line):
self._ctx.broker.publish(broker.TOPICS.PYWB_OUTPUT, line)
ret = pywb.execute([
"-F", self._conf.pkt_path, "-v", "4", destination, "-n", "1", "-c",
"1", "-r", "-s",
str(self._conf.timeout), "-o", "/dev/null"],
customized_filters=[collect_pywb_ouput])
return ret