Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_send_trailing_metadata_twice(stream):
async with stream:
await stream.send_trailing_metadata(status=Status.UNKNOWN)
with pytest.raises(ProtocolError) as err:
await stream.send_trailing_metadata(status=Status.UNKNOWN)
err.match('Trailing metadata was already sent')
to_server_transport = TransportStub(server_h2c)
client_conn = Connection(client_h2c, to_server_transport, config=config)
client_proc = EventsProcessor(DummyHandler(), client_conn)
client_stream = client_conn.create_stream()
await client_stream.send_request(create_headers(), _processor=client_proc)
await client_stream.send_data(b'x' * data_size)
assert client_h2c.outbound_flow_control_window == initial_window - data_size
to_server_transport.process(server_proc)
# server_handler.stream.recv_data(data_size) intentionally not called
await server_handler.stream.send_headers([ # trailers-only error
(':status', '200'),
('content-type', 'application/grpc+proto'),
('grpc-status', str(Status.UNKNOWN.value)),
], end_stream=True)
to_client_transport.process(client_proc)
assert client_h2c.outbound_flow_control_window == initial_window - data_size
server_handler.release_stream() # should ack received data
assert client_h2c.outbound_flow_control_window == initial_window
server_conn = Connection(server_h2c, to_client_transport, config=config)
server_proc = EventsProcessor(server_handler, server_conn)
to_server_transport = TransportStub(server_h2c)
client_conn = Connection(client_h2c, to_server_transport, config=config)
client_proc = EventsProcessor(DummyHandler(), client_conn)
client_stream = client_conn.create_stream()
await client_stream.send_request(create_headers(), _processor=client_proc)
to_server_transport.process(server_proc)
# server_handler.stream.recv_data(data_size) intentionally not called
await server_handler.stream.send_headers([ # trailers-only error
(':status', '200'),
('content-type', 'application/grpc+proto'),
('grpc-status', str(Status.UNKNOWN.value)),
], end_stream=True)
to_client_transport.process(client_proc)
server_handler.release_stream()
assert client_h2c.outbound_flow_control_window == initial_window
await client_stream.send_data(b'x' * data_size)
assert client_h2c.outbound_flow_control_window == 1
to_server_transport.process(server_proc)
# client-side flow control window will increase to initial value eventually
assert client_h2c.outbound_flow_control_window > 1
async def test_error_after_send_message(stream, stub):
async with stream:
await stream.send_message(DummyReply(value='pong'))
raise Exception()
assert stub.__events__ == [
SendHeaders(
[(':status', '200'),
('content-type', 'application/grpc+proto')],
end_stream=False,
),
SendData(
encode_message(DummyReply(value='pong')),
end_stream=False,
),
SendHeaders(
[('grpc-status', str(Status.UNKNOWN.value)),
('grpc-message', 'Internal Server Error')],
end_stream=True,
),
Reset(ErrorCodes.NO_ERROR),
]
cs.client_conn.server_h2c.send_data(
stream_id,
grpc_encode(DummyReply(value='pong'), DummyReply),
)
cs.client_conn.server_h2c.send_headers(stream_id, [
('foo', 'bar'),
], end_stream=True)
cs.client_conn.server_flush()
await stream.recv_initial_metadata()
await stream.recv_message()
try:
await stream.recv_trailing_metadata()
except GRPCError as exc:
assert exc
assert exc.status == Status.UNKNOWN
assert exc.message == 'Missing grpc-status header'
raise ErrorDetected()
def _raise_for_grpc_status(self, headers_map: Dict[str, str]) -> None:
grpc_status = headers_map.get('grpc-status')
if grpc_status is None:
raise GRPCError(Status.UNKNOWN, 'Missing grpc-status header')
try:
status = Status(int(grpc_status))
except ValueError:
raise GRPCError(Status.UNKNOWN, ('Invalid grpc-status: {!r}'
.format(grpc_status)))
else:
if status is not Status.OK:
message = headers_map.get('grpc-message')
if message is not None:
message = decode_grpc_message(message)
details = None
if self._status_details_codec is not None:
details_bin = headers_map.get(_STATUS_DETAILS_KEY)
if details_bin is not None:
details = self._status_details_codec.decode(
status, message,
decode_bin_value(details_bin.encode('ascii'))
)
raise GRPCError(status, message, details)
def _raise_for_grpc_status(self, headers_map: Dict[str, str]) -> None:
grpc_status = headers_map.get('grpc-status')
if grpc_status is None:
raise GRPCError(Status.UNKNOWN, 'Missing grpc-status header')
try:
status = Status(int(grpc_status))
except ValueError:
raise GRPCError(Status.UNKNOWN, ('Invalid grpc-status: {!r}'
.format(grpc_status)))
else:
if status is not Status.OK:
message = headers_map.get('grpc-message')
if message is not None:
message = decode_grpc_message(message)
details = None
if self._status_details_codec is not None:
details_bin = headers_map.get(_STATUS_DETAILS_KEY)
if details_bin is not None:
details = self._status_details_codec.decode(
status, message,
def _raise_for_status(self, headers_map: Dict[str, str]) -> None:
status = headers_map[':status']
if status is not None and status != _H2_OK:
grpc_status = _H2_TO_GRPC_STATUS_MAP.get(status, Status.UNKNOWN)
raise GRPCError(grpc_status,
'Received :status = {!r}'.format(status))
headers: _Headers,
codec: CodecBase,
status_details_codec: Optional[StatusDetailsCodecBase],
dispatch: _DispatchServerEvents,
release_stream: Callable[[], Any],
) -> None:
try:
headers_map = dict(headers)
if headers_map[':method'] != 'POST':
await _abort(_stream, 405)
return
content_type = headers_map.get('content-type')
if content_type is None:
await _abort(_stream, 415, Status.UNKNOWN,
'Missing content-type header')
return
base_content_type, _, sub_type = content_type.partition('+')
sub_type = sub_type or ProtoCodec.__content_subtype__
if (
base_content_type != GRPC_CONTENT_TYPE
or sub_type != codec.__content_subtype__
):
await _abort(_stream, 415, Status.UNKNOWN,
'Unacceptable content-type header')
return
if headers_map.get('te') != 'trailers':
await _abort(_stream, 400, Status.UNKNOWN,
'Required "te: trailers" header is missing')
status_details = exc_val.details
elif isinstance(exc_val, Exception):
status = Status.UNKNOWN
status_message = 'Internal Server Error'
status_details = None
else:
# propagate exception
return None
elif (
# There is a possibility of a ``ProtocolError`` in the
# ``send_trailing_metadata`` method, so we are checking for such
# errors here
not self._cardinality.server_streaming
and not self._send_message_done
):
status = Status.UNKNOWN
status_message = 'Internal Server Error'
status_details = None
protocol_error = ('Unary response with OK status requires '
'a single message to be sent: {!r}'
.format(self._method_name))
else:
status = Status.OK
status_message = None
status_details = None
try:
await self.send_trailing_metadata(status=status,
status_message=status_message,
status_details=status_details)
except h2.exceptions.StreamClosedError:
pass