Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
)
self.UnaryStream = grpclib.client.UnaryStreamMethod(
channel,
'/dummy.DummyService/UnaryStream',
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
)
self.StreamUnary = grpclib.client.StreamUnaryMethod(
channel,
'/dummy.DummyService/StreamUnary',
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
)
self.StreamStream = grpclib.client.StreamStreamMethod(
channel,
'/dummy.DummyService/StreamStream',
dummy_pb2.DummyRequest,
dummy_pb2.DummyReply,
)
def __init__(self, channel: grpclib.client.Channel) -> None:
self.ServerReflectionInfo = grpclib.client.StreamStreamMethod(
channel,
'/grpc.reflection.v1.ServerReflection/ServerReflectionInfo',
grpclib.reflection.v1.reflection_pb2.ServerReflectionRequest,
grpclib.reflection.v1.reflection_pb2.ServerReflectionResponse,
)
def __init__(self, channel: grpclib.client.Channel) -> None:
self.ServerReflectionInfo = grpclib.client.StreamStreamMethod(
channel,
'/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo',
grpclib.reflection.v1alpha.reflection_pb2.ServerReflectionRequest,
grpclib.reflection.v1alpha.reflection_pb2.ServerReflectionResponse,
)
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
)
self.UnaryStreamGreeting = grpclib.client.UnaryStreamMethod(
channel,
'/helloworld.Greeter/UnaryStreamGreeting',
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
)
self.StreamUnaryGreeting = grpclib.client.StreamUnaryMethod(
channel,
'/helloworld.Greeter/StreamUnaryGreeting',
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
)
self.StreamStreamGreeting = grpclib.client.StreamStreamMethod(
channel,
'/helloworld.Greeter/StreamStreamGreeting',
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
)
buf.add('')
buf.add('def __init__(self, channel: {}.{}) -> None:'
.format(client.__name__, client.Channel.__name__))
with buf.indent():
for method in service.methods:
name, cardinality, request_type, reply_type = method
full_name = '/{}/{}'.format(service_name, name)
method_cls: type
if cardinality is const.Cardinality.UNARY_UNARY:
method_cls = client.UnaryUnaryMethod
elif cardinality is const.Cardinality.UNARY_STREAM:
method_cls = client.UnaryStreamMethod
elif cardinality is const.Cardinality.STREAM_UNARY:
method_cls = client.StreamUnaryMethod
elif cardinality is const.Cardinality.STREAM_STREAM:
method_cls = client.StreamStreamMethod
else:
raise TypeError(cardinality)
method_cls = cast(type, method_cls) # FIXME: redundant
buf.add('self.{} = {}.{}('.format(name, client.__name__,
method_cls.__name__))
with buf.indent():
buf.add('channel,')
buf.add('{!r},'.format(full_name))
buf.add('{},', request_type)
buf.add('{},', reply_type)
buf.add(')')
return buf.content()
def __init__(self, channel: grpclib.client.Channel) -> None:
self.Connect = grpclib.client.UnaryUnaryMethod(
channel,
'/remote.Remoting/Connect',
protoactor.remote.protos_remote_pb2.ConnectRequest,
protoactor.remote.protos_remote_pb2.ConnectResponse,
)
self.Receive = grpclib.client.StreamStreamMethod(
channel,
'/remote.Remoting/Receive',
protoactor.remote.protos_remote_pb2.MessageBatch,
protoactor.remote.protos_remote_pb2.Unit,
)