Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
await client_stream.send_request(create_headers(), _processor=client_proc)
initial_window = server_h2c.local_settings.initial_window_size
assert (client_h2c.local_flow_control_window(client_stream.id)
== initial_window)
# data should be bigger than window size
data = b'0' * (initial_window + 1)
size = len(data)
# sending less than a full message
await client_stream.send_data(data[:initial_window - 1])
# let the server process its events
server_processor = EventsProcessor(DummyHandler(), server_conn)
to_server_transport.process(server_processor)
# checking window size was decreased
assert client_h2c.local_flow_control_window(client_stream.id) == 1
# simulate that the server is waiting for the full size of a message and
# should acknowledge that size as soon as it has been received
server_stream, = server_processor.streams.values()
recv_task = loop.create_task(server_stream.recv_data(size))
await asyncio.wait([recv_task], timeout=.01)
assert server_stream.buffer._acked_size == initial_window - 1
# check that server acknowledged received partial data
assert client_h2c.local_flow_control_window(client_stream.id) > 1
# sending remaining data and recv_task should finish
def __mapping__(self):
return {
'/ping.PingService/UnaryUnary': grpclib.const.Handler(
self.UnaryUnary,
grpclib.const.Cardinality.UNARY_UNARY,
None,
None,
),
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
'/dummy.DummyService/UnaryUnary': grpclib.const.Handler(
self.UnaryUnary,
grpclib.const.Cardinality.UNARY_UNARY,
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
),
'/dummy.DummyService/UnaryStream': grpclib.const.Handler(
self.UnaryStream,
grpclib.const.Cardinality.UNARY_STREAM,
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
),
'/dummy.DummyService/StreamUnary': grpclib.const.Handler(
self.StreamUnary,
grpclib.const.Cardinality.STREAM_UNARY,
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
),
'/dummy.DummyService/StreamStream': grpclib.const.Handler(
self.StreamStream,
grpclib.const.Cardinality.STREAM_STREAM,
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
),
def mk_stream(h2_stream, metadata):
    """Create a ``Stream`` for ``/svc/Method`` and attach ``metadata`` to it.

    The stream wraps ``h2_stream`` with ``Cardinality.UNARY_UNARY``, is given
    ``DummyRequest`` and ``DummyReply`` positionally, a fresh ``ProtoCodec``,
    no status-details codec and a fresh ``_DispatchServerEvents`` dispatcher.
    ``metadata`` is assigned to the ``metadata`` attribute of the new stream
    before it is returned.
    """
    new_stream = Stream(
        h2_stream,
        '/svc/Method',
        Cardinality.UNARY_UNARY,
        DummyRequest,
        DummyReply,
        codec=ProtoCodec(),
        status_details_codec=None,
        dispatch=_DispatchServerEvents(),
    )
    new_stream.metadata = metadata
    return new_stream
def __init__(self, channel):
    """Expose ``UnaryUnary`` as a ``UnaryUnaryMethod`` bound to ``channel``.

    The method path is ``/ping.PingService/UnaryUnary``; the two trailing
    positional arguments are passed as ``None``.
    """
    method_path = '/ping.PingService/UnaryUnary'
    self.UnaryUnary = UnaryUnaryMethod(channel, method_path, None, None)
def __init__(self, channel: grpclib.client.Channel) -> None:
self.Plaster = grpclib.client.UnaryUnaryMethod(
channel,
'/Bombed/Plaster',
bombed_pb2.SavoysRequest,
bombed_pb2.SavoysReply,
)
self.Benzine = grpclib.client.UnaryStreamMethod(
channel,
'/Bombed/Benzine',
bombed_pb2.SavoysRequest,
bombed_pb2.GoowyChunk,
)
self.Anginal = grpclib.client.StreamUnaryMethod(
channel,
'/Bombed/Anginal',
bombed_pb2.UnyoungChunk,
bombed_pb2.SavoysReply,
async def StreamUnary(self, stream):
    """Always fail with ``GRPCError(Status.UNIMPLEMENTED)``.

    ``stream`` is accepted but not used.
    """
    unimplemented = GRPCError(Status.UNIMPLEMENTED)
    raise unimplemented
async def StreamUnary(self, stream):
    """Raise ``GRPCError`` carrying ``Status.UNIMPLEMENTED`` on every call.

    The ``stream`` argument is ignored.
    """
    status = Status.UNIMPLEMENTED
    raise GRPCError(status)
async def UnaryUnary(self, stream):
    """Unconditionally raise ``GRPCError(Status.UNIMPLEMENTED)``.

    ``stream`` is not read or written.
    """
    not_implemented = GRPCError(Status.UNIMPLEMENTED)
    raise not_implemented
async def test_error_before_send_initial_metadata(stream, stub):
    """Check the events recorded when the handler fails before sending metadata.

    A bare ``Exception`` is raised inside ``async with stream``. The assertion
    below is only reachable if the stream's context manager suppresses that
    exception. The events recorded on ``stub`` must then be a single
    ``SendHeaders`` with ``end_stream=True`` carrying an UNKNOWN grpc-status,
    followed by a ``Reset`` with ``ErrorCodes.NO_ERROR``.
    """
    async with stream:
        raise Exception()

    expected_headers = [
        (':status', '200'),
        ('content-type', 'application/grpc+proto'),
        ('grpc-status', str(Status.UNKNOWN.value)),
        ('grpc-message', 'Internal Server Error'),
    ]
    expected_events = [
        SendHeaders(expected_headers, end_stream=True),
        Reset(ErrorCodes.NO_ERROR),
    ]
    assert stub.__events__ == expected_events