Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def mk_stream(h2_stream, metadata):
stream = Stream(h2_stream, '/svc/Method', Cardinality.UNARY_UNARY,
DummyRequest, DummyReply, codec=ProtoCodec(),
status_details_codec=None,
dispatch=_DispatchServerEvents())
stream.metadata = metadata
return stream
async def test_connection_error():
class BrokenChannel:
_calls_started = 0
def __connect__(self):
raise IOError('Intentionally broken connection')
stream = Stream(BrokenChannel(), '/foo/bar', MultiDict(),
Cardinality.UNARY_UNARY, DummyRequest, DummyReply,
codec=ProtoCodec(), status_details_codec=None,
dispatch=_DispatchChannelEvents())
with pytest.raises(IOError) as err:
async with stream:
await stream.send_request()
err.match('Intentionally broken connection')
async def test_deadline(
loop, caplog, handler, level, msg, exc_type, exc_text
):
caplog.set_level(logging.INFO)
stream = H2StreamStub()
headers = [
(':method', 'POST'),
(':path', '/package.Service/Method'),
('te', 'trailers'),
('content-type', 'application/grpc'),
('grpc-timeout', '10m'),
]
methods = {'/package.Service/Method': Handler(
handler,
Cardinality.UNARY_UNARY,
DummyRequest,
DummyReply,
)}
task = loop.create_task(
call_handler(methods, stream, headers)
)
await asyncio.wait_for(task, 0.1)
assert stream.__events__ == [
SendHeaders(headers=[
(':status', '200'),
('content-type', 'application/grpc+proto'),
('grpc-status', '4'), # DEADLINE_EXCEEDED
], end_stream=True),
Reset(ErrorCodes.NO_ERROR),
]
def __init__(self, *, client_conn=None,
send_type=None, recv_type=None,
path='/foo/bar', codec=ProtoCodec(),
cardinality=Cardinality.UNARY_UNARY,
connect_time=None,
timeout=None, deadline=None, metadata=None):
self.client_conn = client_conn or ClientConn()
channel = client.Channel(port=-1, codec=codec)
self.client_stream = channel.request(
path, cardinality, send_type, recv_type,
timeout=timeout, deadline=deadline, metadata=metadata,
)
self.client_stream._channel = ChannelStub(self.client_conn.client_proto,
connect_time=connect_time)
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
'/grpc.health.v1.Health/Check': grpclib.const.Handler(
self.Check,
grpclib.const.Cardinality.UNARY_UNARY,
grpclib.health.v1.health_pb2.HealthCheckRequest,
grpclib.health.v1.health_pb2.HealthCheckResponse,
),
'/grpc.health.v1.Health/Watch': grpclib.const.Handler(
self.Watch,
grpclib.const.Cardinality.UNARY_STREAM,
grpclib.health.v1.health_pb2.HealthCheckRequest,
grpclib.health.v1.health_pb2.HealthCheckResponse,
),
return {
'/ClientMaster.AngelCleintMaster/RegisterWorker': grpclib.const.Handler(
self.RegisterWorker,
grpclib.const.Cardinality.UNARY_UNARY,
client_master_pb2.RegisterWorkerReq,
client_master_pb2.RegisterWorkerResp,
),
'/ClientMaster.AngelCleintMaster/RegisterTask': grpclib.const.Handler(
self.RegisterTask,
grpclib.const.Cardinality.UNARY_UNARY,
client_master_pb2.RegisterTaskReq,
client_master_pb2.RegisterTaskResp,
),
'/ClientMaster.AngelCleintMaster/SetAngelLocation': grpclib.const.Handler(
self.SetAngelLocation,
grpclib.const.Cardinality.UNARY_UNARY,
client_master_pb2.SetAngelLocationReq,
common_pb2.VoidResp,
),
'/ClientMaster.AngelCleintMaster/GetAngelLocation': grpclib.const.Handler(
self.GetAngelLocation,
grpclib.const.Cardinality.UNARY_UNARY,
common_pb2.VoidReq,
client_master_pb2.GetAngelLocationResp,
),
'/ClientMaster.AngelCleintMaster/HeartBeat': grpclib.const.Handler(
self.HeartBeat,
grpclib.const.Cardinality.UNARY_UNARY,
client_master_pb2.HeartBeatReq,
client_master_pb2.HeartBeatResp,
),
'/ClientMaster.AngelCleintMaster/Clock': grpclib.const.Handler(
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
'/remote.Remoting/Connect': grpclib.const.Handler(
self.Connect,
grpclib.const.Cardinality.UNARY_UNARY,
protoactor.remote.protos_remote_pb2.ConnectRequest,
protoactor.remote.protos_remote_pb2.ConnectResponse,
),
'/remote.Remoting/Receive': grpclib.const.Handler(
self.Receive,
grpclib.const.Cardinality.STREAM_STREAM,
protoactor.remote.protos_remote_pb2.MessageBatch,
protoactor.remote.protos_remote_pb2.Unit,
),
def __mapping__(self):
return {
'/DotaOptimizer/Rollout': grpclib.const.Handler(
self.Rollout,
grpclib.const.Cardinality.UNARY_UNARY,
protos.DotaOptimizer_pb2.RolloutData,
protos.DotaOptimizer_pb2.Empty2,
),
'/DotaOptimizer/GetWeights': grpclib.const.Handler(
self.GetWeights,
grpclib.const.Cardinality.UNARY_UNARY,
protos.DotaOptimizer_pb2.WeightQuery,
protos.DotaOptimizer_pb2.Weights,
),
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
'/helloworld.Greeter/UnaryUnaryGreeting': grpclib.const.Handler(
self.UnaryUnaryGreeting,
grpclib.const.Cardinality.UNARY_UNARY,
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
),
'/helloworld.Greeter/UnaryStreamGreeting': grpclib.const.Handler(
self.UnaryStreamGreeting,
grpclib.const.Cardinality.UNARY_STREAM,
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
),
'/helloworld.Greeter/StreamUnaryGreeting': grpclib.const.Handler(
self.StreamUnaryGreeting,
grpclib.const.Cardinality.STREAM_UNARY,
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
),
'/helloworld.Greeter/StreamStreamGreeting': grpclib.const.Handler(