Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_request_bytes_iterable(self):
# Assert that we send the first set of body bytes with the request packet.
body_bytes = [b"Hello, ", b"world!"]
body_size = 13
request = Request(
method=b"POST",
target="post",
body=body_bytes,
headers={"Content-Length": body_size},
)
request.add_host("httpbin.org", port=80, scheme="http")
state_machine = h11.Connection(our_role=h11.CLIENT)
iterable = _request_bytes_iterable(request, state_machine)
first_packet, second_packet = next(iterable), next(iterable)
assert request.method in first_packet
assert body_bytes[0] in first_packet
assert body_bytes[1] in second_packet
with pytest.raises(StopIteration):
next(iterable)
def __init__(self, *, framework: ASGIFramework = echo_framework) -> None:
self.client_stream, server_stream = trio.testing.memory_stream_pair()
server_stream.socket = MockSocket()
self.client = h11.Connection(h11.CLIENT)
self.server = H11Server(framework, Config(), server_stream)
def _make_handshake(
request_headers: Headers,
accept_headers: Optional[Headers] = None,
subprotocol: Optional[str] = None,
extensions: Optional[List[Extension]] = None,
) -> Tuple[h11.InformationalResponse, bytes]:
client = h11.Connection(h11.CLIENT)
server = WSConnection(SERVER)
nonce = generate_nonce()
server.receive_data(
client.send(
h11.Request(
method="GET",
target="/",
headers=[
(b"Host", b"localhost"),
(b"Connection", b"Keep-Alive, Upgrade"),
(b"Upgrade", b"WebSocket"),
(b"Sec-WebSocket-Version", b"13"),
(b"Sec-WebSocket-Key", nonce),
]
+ request_headers,
)
def __init__(
self,
app: 'Quart',
loop: asyncio.AbstractEventLoop,
transport: asyncio.BaseTransport,
logger: Optional[Logger],
access_log_format: str,
timeout: int,
) -> None:
super().__init__(app, loop, transport, logger, access_log_format, timeout)
self.connection = h11.Connection(h11.SERVER)
def forward(request, hostname, port, target, tls=None):
hconn = h11.Connection(our_role=h11.CLIENT)
if tls is None:
tls = (port == 443)
headers = _forward_headers(request.raw_headers, request.http_version,
also_exclude=['Host'])
# RFC 7230 recommends that ``Host`` be the first header.
headers.insert(0, ('Host', _generate_host_header(hostname, port, tls)))
headers.append(('Connection', 'close'))
sock = socket.create_connection((hostname, port))
try:
if tls:
# We intentionally ignore server certificates. In this context,
# they are more likely to be a nuisance than a boon.
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sock = ssl_context.wrap_socket(sock, server_hostname=hostname)
def _tunnel(self, conn):
"""
This method establishes a CONNECT tunnel shortly after connection.
"""
# Basic sanity check that _tunnel is only called at appropriate times.
assert self._state_machine.our_state is h11.IDLE
tunnel_request = _build_tunnel_request(
self._tunnel_host, self._tunnel_port, self._tunnel_headers
)
tunnel_state_machine = h11.Connection(our_role=h11.CLIENT)
bytes_to_send = _request_to_bytes(tunnel_request, tunnel_state_machine)
bytes_to_send += tunnel_state_machine.send(h11.EndOfMessage())
# First, register the socket with the selector. We want to look for
# readability *and* writability, because if the socket suddenly becomes
# readable we need to stop our upload immediately. Then, send the
# request.
# Because this method is called before we have fully set the connection
# up, we need to briefly register the socket with the connection.
conn.setblocking(0)
self._sock = conn
self._selector.register(
self._sock, selectors.EVENT_READ | selectors.EVENT_WRITE
)
self._send_unless_readable(tunnel_state_machine, bytes_to_send)
def __init__(
self,
origin: typing.Union[str, Origin],
ssl: SSLConfig = DEFAULT_SSL_CONFIG,
timeout: TimeoutConfig = DEFAULT_TIMEOUT_CONFIG,
on_release: typing.Callable = None,
):
self.origin = Origin(origin) if isinstance(origin, str) else origin
self.ssl = ssl
self.timeout = timeout
self.on_release = on_release
self._reader = None
self._writer = None
self._h11_state = h11.Connection(our_role=h11.CLIENT)
"""
Opens a connection to the server.
"""
sock_args = dict(
host=self.host,
port=self.port,
)
if self.ssl:
sock_args.update(dict(
ssl=self.ssl,
server_hostname=self.host
))
self.sock = await curio.open_connection(**sock_args)
self.state = h11.Connection(our_role=h11.CLIENT)
logger.debug('Opened %r', self)