Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_reader_incomplete_error(self):
s = BytesIO(b"foobar")
s = tcp.Reader(s)
with pytest.raises(exceptions.TcpReadIncomplete):
s.safe_read(10)
def test_load_file(tmpdir):
s = serverplayback.ServerPlayback()
with taddons.context(s):
fpath = str(tmpdir.join("flows"))
tdump(fpath, [tflow.tflow(resp=True)])
s.load_file(fpath)
assert s.flowmap
with pytest.raises(exceptions.CommandError):
s.load_file("/nonexistent")
def test_configure():
up = upstream_auth.UpstreamAuth()
with taddons.context() as tctx:
tctx.configure(up, upstream_auth="test:test")
assert up.auth == b"Basic" + b" " + base64.b64encode(b"test:test")
tctx.configure(up, upstream_auth="test:")
assert up.auth == b"Basic" + b" " + base64.b64encode(b"test:")
tctx.configure(up, upstream_auth=None)
assert not up.auth
with pytest.raises(exceptions.OptionsError):
tctx.configure(up, upstream_auth="")
with pytest.raises(exceptions.OptionsError):
tctx.configure(up, upstream_auth=":")
with pytest.raises(exceptions.OptionsError):
tctx.configure(up, upstream_auth=":test")
r, _, _ = select.select([self.rfile], [], [], 0.05)
except OSError: # pragma: no cover
return # this is not reliably triggered due to its nature, so we exclude it from coverage.
delta = time.time() - starttime
if not r and self.timeout and delta > self.timeout:
return
try:
self.terminate.get_nowait()
return
except queue.Empty:
pass
for rfile in r:
with self.logger.ctx() as log:
try:
frm = websockets.Frame.from_file(self.rfile)
except exceptions.TcpDisconnect:
return
self.frames_queue.put(frm)
log("<< %s" % repr(frm.header))
if self.ws_read_limit is not None:
self.ws_read_limit -= 1
starttime = time.time()
def add(view):
# TODO: auto-select a different name (append an integer?)
for i in views:
if i.name == view.name:
raise exceptions.ContentViewException("Duplicate view: " + view.name)
# TODO: the UI should auto-prompt for a replacement shortcut
for prompt in view_prompts:
if prompt[1] == view.prompt[1]:
raise exceptions.ContentViewException("Duplicate view shortcut: " + view.prompt[1])
views.append(view)
for ct in view.content_types:
l = content_types_map.setdefault(ct, [])
l.append(view)
view_prompts.append(view.prompt)
def send(self, msg, force=False):
if self.state not in {"start", "taken"}:
raise exceptions.ControlException(
"Reply is {}, but expected it to be start or taken.".format(self.state)
)
if self.has_message and not force:
raise exceptions.ControlException("There is already a reply message.")
self.value = msg
# SSL_read() will yield SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE.
if (time.time() - start) < self.o.gettimeout():
time.sleep(0.1)
continue
else:
raise exceptions.TcpTimeout()
except socket.timeout:
raise exceptions.TcpTimeout()
except socket.error as e:
raise exceptions.TcpDisconnect(str(e))
except SSL.SysCallError as e:
if e.args == (-1, 'Unexpected EOF'):
break
raise exceptions.TlsException(str(e))
except SSL.Error as e:
raise exceptions.TlsException(str(e))
self.first_byte_timestamp = self.first_byte_timestamp or time.time()
if not data:
break
result += data
if length != -1:
length -= len(data)
self.add_log(result)
return result
# In all modes, we directly connect to the server displayed
if self.options.mode.startswith("upstream:"):
server_address = server_spec.parse_with_mode(self.options.mode)[1].address
server = connections.ServerConnection(server_address)
server.connect()
if r.scheme == "https":
connect_request = http.make_connect_request((r.data.host, r.port))
server.wfile.write(http1.assemble_request(connect_request))
server.wfile.flush()
resp = http1.read_response(
server.rfile,
connect_request,
body_size_limit=bsl
)
if resp.status_code != 200:
raise exceptions.ReplayException(
"Upstream server refuses CONNECT request"
)
server.establish_tls(
sni=f.server_conn.sni,
**tls.client_arguments_from_options(self.options)
)
r.first_line_format = "relative"
else:
r.first_line_format = "absolute"
else:
server_address = (r.host, r.port)
server = connections.ServerConnection(server_address)
server.connect()
if r.scheme == "https":
server.establish_tls(
sni=f.server_conn.sni,
def handle_client_connection(self, request, client_address):
h = PathodHandler(
request,
client_address,
self,
self.logfp,
self.settings,
self.http2_framedump,
)
try:
h.handle()
h.finish()
except exceptions.TcpDisconnect: # pragma: no cover
log.write_raw(self.logfp, "Disconnect")
self.add_log(
dict(
type="error",
msg="Disconnect"
)
)
return
except exceptions.TcpTimeout:
log.write_raw(self.logfp, "Timeout")
self.add_log(
dict(
type="timeout",
)
)
return
def killable(self):
return (
self.reply and
self.reply.state in {"start", "taken"} and
self.reply.value != exceptions.Kill
)