Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load(l):
ctx.log.info("This is some informative text.")
ctx.log.warn("This is a warning.")
ctx.log.error("This is an error.")
def response(flow: http.HTTPFlow) -> None:
routers = utils.readFile(ROUTER_FILE)
url = flow.request.url
if routers is not None:
for patternURL, yamlfilename in routers.items():
if re.match(patternURL, url) is not None:
yamlfile = DATA_DIR + str(yamlfilename) + '.yaml'
ctx.log.info('>>> FOUND "' + url + '" to replace strings from "' + yamlfile + '"')
data = utils.readFile(yamlfile)
ctx.log.info(data)
if data is not None:
for old, new in data.items():
flow.response.content = flow.response.content.replace(bytes(old.encode('utf8')), bytes(new.encode('utf8')))
def configure(self, updated):
"""
.replacements is a list of tuples (fpat, rex, s):
fpatt: a string specifying a filter pattern.
rex: a regular expression, as string.
s: the replacement string
"""
if "replacements" in updated:
lst = []
for rep in ctx.options.replacements:
fpatt, rex, s = parse_hook(rep)
flt = flowfilter.parse(fpatt)
if not flt:
raise exceptions.OptionsError(
"Invalid filter pattern: %s" % fpatt
)
try:
# We should ideally escape here before trying to compile
re.compile(rex)
except re.error as e:
raise exceptions.OptionsError(
"Invalid regular expression: %s - %s" % (rex, str(e))
)
if s.startswith("@") and not os.path.isfile(s[1:]):
raise exceptions.OptionsError(
def create_client_proxy_ssl_conn(self, tls_start: tls.StartHookData) -> None:
tls_method, tls_options = net_tls.VERSION_CHOICES[ctx.options.ssl_version_client]
cert, key, chain_file = self.get_cert(tls_start.context)
if ctx.options.add_upstream_certs_to_client_chain:
raise NotImplementedError()
else:
extra_chain_certs = None
ssl_ctx = net_tls.create_server_context(
cert=cert,
key=key,
method=tls_method,
options=tls_options,
cipher_list=ctx.options.ciphers_client or DEFAULT_CLIENT_CIPHERS,
dhparams=self.certstore.dhparams,
chain_file=chain_file,
alpn_select_callback=alpn_select_callback,
extra_chain_certs=extra_chain_certs,
)
tls_start.ssl_conn = SSL.Connection(ssl_ctx)
tls_start.ssl_conn.set_app_data({
"server_alpn": tls_start.context.server.alpn
})
tls_start.ssl_conn.set_accept_state()
def request(flow: http.HTTPFlow) -> None:
matches = utils.readFile(CONFIG_FILE)
url = flow.request.url
if matches is not None:
for patternURL, dumpFolder in matches.items():
if not os.path.exists(dumpFolder):
os.makedirs(dumpFolder)
if re.match(patternURL, url) is not None:
dumpFile = dumpFolder + '/' + str(int(round(time.time() * 1000)))
ctx.log.info('>>> Dump ' + url + ' to ' + dumpFile)
ctx.master.commands.call("export.file", 'curl', flow, dumpFile)
def request(self, flow: mitmproxy.http.HTTPFlow):
# 过滤非企查查接口地址
if flow.request.host != "appv3.qichacha.net":
return
ctx.log.info(f"sign is: {flow.request.query.get('sign')},时间戳 is: {flow.request.query.get('timestamp')}")
def configure(self, updated):
# We're already streaming - stop the previous stream and restart
if "save_stream_filter" in updated:
if ctx.options.save_stream_filter:
self.filt = flowfilter.parse(ctx.options.save_stream_filter)
if not self.filt:
raise exceptions.OptionsError(
"Invalid filter specification: %s" % ctx.options.save_stream_filter
)
else:
self.filt = None
if "save_stream_file" in updated or "save_stream_filter" in updated:
if self.stream:
self.done()
if ctx.options.save_stream_file:
self.start_stream_to_path(ctx.options.save_stream_file, self.filt)
def _echo_message(self, message, flow):
_, lines, error = contentviews.get_message_content_view(
ctx.options.dumper_default_contentview,
message,
flow
)
if error:
ctx.log.debug(error)
if ctx.options.flow_detail == 3:
lines_to_echo = itertools.islice(lines, 70)
else:
lines_to_echo = lines
styles = dict(
highlight=dict(bold=True),
offset=dict(fg="blue"),
header=dict(fg="green", bold=True),
text=dict(fg="green")
)
content = u"\r\n".join(
u"".join(colorful(line, styles)) for line in lines_to_echo
)
def layout_cycle(self) -> None:
"""
Cycle through the console layout options.
"""
opts = self.layout_options()
off = self.layout_options().index(ctx.options.console_layout)
ctx.options.update(
console_layout = opts[(off + 1) % len(opts)]
)
def done(self):
if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE:
# vacuum history so that it doesn't grow indefinitely.
history_str = "\n".join(self.history[-self.VACUUM_SIZE / 2:]) + "\n"
self.history_file.write_text(history_str)