Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_response_headers_2(example):
    """Check that both expected ``set-cookie`` pairs are in the response headers.

    ``example`` is a fixture whose ``send`` accepts h11 events; here its
    result is unpacked into two items and only the first (the response)
    is inspected.
    """
    request = h11.Request(
        method='GET',
        target='/',
        headers=[('Host', 'example')],
    )
    resp, _ = example.send(request, h11.EndOfMessage())

    expected_cookies = (
        (b'set-cookie', b'sessionid=123456'),
        (b'set-cookie', b'__adtrack=abcdef'),
    )
    for cookie_header in expected_cookies:
        assert cookie_header in resp.headers
def test_forwarding_requests_2(example):
    """Check that the request is answered with a 5xx status code.

    ``example`` is a fixture whose ``send`` accepts h11 events; here its
    result is unpacked into three items and only the first (the response)
    is inspected.
    """
    request = h11.Request(
        method='GET',
        target='/',
        headers=[('Host', 'example')],
    )
    resp, _, _ = example.send(request, h11.EndOfMessage())

    # ``develop1.example`` is unreachable, so a server-error status is
    # the expected outcome.
    status = resp.status_code
    assert 500 <= status <= 599
async def __anext__(self):
    """
    Produce the next chunk of response body bytes.

    Awaits the next h11 event via ``_read_until_event``. A ``Data`` event
    is returned as ``bytes``; an ``EndOfMessage`` event resets this object
    and ends the iteration with ``StopAsyncIteration``.
    """
    next_event = await _read_until_event(
        self._state_machine, self._sock, self.read_timeout
    )

    if isinstance(next_event, h11.Data):
        return bytes(next_event.data)

    if isinstance(next_event, h11.EndOfMessage):
        self._reset()
        raise StopAsyncIteration

    # Any other event type is not expected while reading a body.
    raise RuntimeError("Unexpected h11 event {}".format(next_event))
)
self._state = ConnectionState.CLOSED
elif isinstance(event, h11.Response):
self._state = ConnectionState.REJECTING
self._events.append(
RejectConnection(
headers=event.headers,
status_code=event.status_code,
has_body=True,
)
)
elif isinstance(event, h11.Data):
self._events.append(
RejectData(data=event.data, body_finished=False)
)
elif isinstance(event, h11.EndOfMessage):
self._events.append(RejectData(data=b"", body_finished=True))
self._state = ConnectionState.CLOSED
else:
if isinstance(event, h11.Request):
self._events.append(self._process_connection_request(event))
async def send_echo_response(wrapper, request):
    """Read the request body and reply with a JSON echo of the request.

    The JSON document contains the request's method, target, headers and
    body (all decoded as ASCII). It is serialised with sorted keys and a
    4-space indent, encoded as UTF-8, and handed to
    ``send_simple_response`` with status 200.

    Args:
        wrapper: Connection wrapper providing ``info()`` for logging and an
            awaitable ``next_event()`` that yields h11 events.
        request: The h11 request event being echoed; its ``method``,
            ``target`` and ``headers`` are bytes.

    Raises:
        RuntimeError: If the request method is neither GET nor POST.
        UnicodeDecodeError: If any echoed field or body chunk is not ASCII.
    """
    wrapper.info("Preparing echo response")
    if request.method not in {b"GET", b"POST"}:
        # Laziness: we should send a proper 405 Method Not Allowed with the
        # appropriate Accept: header, but we don't.
        raise RuntimeError("unsupported method")
    response_json = {
        "method": request.method.decode("ascii"),
        "target": request.target.decode("ascii"),
        "headers": [(name.decode("ascii"), value.decode("ascii"))
                    for (name, value) in request.headers],
    }
    # Collect decoded chunks and join once at the end: repeatedly doing
    # ``response_json["body"] += ...`` copies the accumulated string on every
    # chunk, which is quadratic in the body size.
    body_parts = []
    while True:
        event = await wrapper.next_event()
        if type(event) is h11.EndOfMessage:
            break
        assert type(event) is h11.Data
        # Decode each chunk as it arrives so non-ASCII input fails as early
        # as it did before.
        body_parts.append(event.data.decode("ascii"))
    response_json["body"] = "".join(body_parts)
    response_body_unicode = json.dumps(response_json,
                                       sort_keys=True,
                                       indent=4,
                                       separators=(",", ": "))
    response_body_bytes = response_body_unicode.encode("utf-8")
    await send_simple_response(wrapper,
                               200,
                               "application/json; charset=utf-8",
                               response_body_bytes)
def _send_body(self):
    """Emit the response body (if any) and then finish the message.

    A truthy ``self._response.body`` is passed to ``self.chunk``. After
    that the response's raw headers are passed to ``self._log_headers``
    and, once encoded with ``_encode_headers``, attached to the h11
    ``EndOfMessage`` event sent through the handler.
    """
    if self._response.body:
        self.chunk(self._response.body)

    self._log_headers(self._response.raw_headers)

    encoded_headers = _encode_headers(self._response.raw_headers)
    end_of_message = h11.EndOfMessage(headers=encoded_headers)
    self._handler.send_event(end_of_message)
def handle_no_connect(self, event):
    """Answer with a 400 Bad Request carrying ``event.reason`` and close.

    The response head, a single data chunk holding the UTF-8 encoded
    reason, and the end-of-message marker are serialised through
    ``self.conn`` and written to the transport in one call. The transport
    is then closed.
    """
    response_head = h11.Response(
        status_code=400,
        headers=[
            (b"content-type", b"text/plain; charset=utf-8"),
            (b"connection", b"close"),
        ],
        reason="Bad Request",
    )
    head_bytes = self.conn.send(response_head)

    reason_chunk = h11.Data(data=event.reason.encode("utf-8"))
    body_bytes = self.conn.send(reason_chunk)

    end_bytes = self.conn.send(h11.EndOfMessage())

    self.transport.write(head_bytes + body_bytes + end_bytes)
    self.transport.close()
def send_response(self, response):
    """Send ``response`` as h11 events and close the stream if required.

    The response head is always sent. A data event follows only when
    ``response.body`` is truthy. The end-of-message event is skipped when
    our side has switched protocols. Finally, the I/O stream is closed if
    our state is ``h11.MUST_CLOSE``.
    """
    logger.debug(
        "sent response to {0}: {1}".format(self.conn_type, response)
    )

    head_event = h11.Response(
        status_code=int(response.code),
        reason=response.reason,
        headers=response.headers,
    )
    self.send(head_event)

    if response.body:
        body_event = h11.Data(data=response.body)
        self.send(body_event)

    switched = self.our_state == h11.SWITCHED_PROTOCOL
    if not switched:
        self.send(h11.EndOfMessage())

    # Re-read the state: sending the events above may have changed it.
    if self.our_state is h11.MUST_CLOSE:
        self.io_stream.close()
def _request_bytes_iterable(request, state_machine):
    """
    Generate the serialised bytes of ``request`` piece by piece.

    Yields the result of ``state_machine.send`` for the request head, then
    for one ``h11.Data`` event per chunk from ``_make_body_iterable``, and
    finally for the end-of-message event.
    """
    head_event = h11.Request(
        method=request.method,
        target=request.target,
        headers=_stringify_headers(request.headers.items()),
    )
    head_bytes = state_machine.send(head_event)
    yield head_bytes

    body_chunks = _make_body_iterable(request.body)
    for piece in body_chunks:
        data_event = h11.Data(data=piece)
        yield state_machine.send(data_event)

    end_bytes = state_machine.send(h11.EndOfMessage())
    yield end_bytes