Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
client_worker_pb2.TensorLike,
common_pb2.VoidResp,
),
'/ClientMaster.ClientWorker/Update': grpclib.const.Handler(
self.Update,
grpclib.const.Cardinality.UNARY_UNARY,
client_worker_pb2.TensorLike,
common_pb2.VoidResp,
),
'/ClientMaster.ClientWorker/Sync': grpclib.const.Handler(
self.Sync,
grpclib.const.Cardinality.UNARY_UNARY,
client_worker_pb2.SyncRequest,
common_pb2.VoidResp,
),
'/ClientMaster.ClientWorker/CompleteTask': grpclib.const.Handler(
self.CompleteTask,
grpclib.const.Cardinality.UNARY_UNARY,
common_pb2.CompleteTaskReq,
common_pb2.VoidResp,
),
# Build the route table for the helloworld.Greeter service: each key is a
# full RPC path and each value is a grpclib.const.Handler made from the bound
# method on self, the call cardinality, and two protobuf message classes
# (presumably request type first, then reply type -- confirm against grpclib).
# NOTE(review): this excerpt has lost its indentation, and the dict opened by
# `return {` is not closed before the next definition begins -- restore both
# from the original generated file before use.
def __mapping__(self):
return {
# One request message, one reply message.
'/helloworld.Greeter/UnaryUnaryGreeting': grpclib.const.Handler(
self.UnaryUnaryGreeting,
grpclib.const.Cardinality.UNARY_UNARY,
helloworld.helloworld_pb2.HelloRequest,
helloworld.helloworld_pb2.HelloReply,
),
# One request message, a stream of replies.
'/helloworld.Greeter/UnaryStreamGreeting': grpclib.const.Handler(
self.UnaryStreamGreeting,
grpclib.const.Cardinality.UNARY_STREAM,
helloworld.helloworld_pb2.HelloRequest,
helloworld.helloworld_pb2.HelloReply,
),
# A stream of requests, one reply message.
'/helloworld.Greeter/StreamUnaryGreeting': grpclib.const.Handler(
self.StreamUnaryGreeting,
grpclib.const.Cardinality.STREAM_UNARY,
helloworld.helloworld_pb2.HelloRequest,
helloworld.helloworld_pb2.HelloReply,
),
# Streams in both directions.
'/helloworld.Greeter/StreamStreamGreeting': grpclib.const.Handler(
self.StreamStreamGreeting,
grpclib.const.Cardinality.STREAM_STREAM,
helloworld.helloworld_pb2.HelloRequest,
helloworld.helloworld_pb2.HelloReply,
),
# Build the route table for the DotaOptimizer service: each key is a full RPC
# path and each value is a grpclib.const.Handler made from the bound method on
# self, the call cardinality, and two protobuf message classes (presumably
# request type first, then reply type -- confirm against grpclib).
# Both routes shown here are registered as UNARY_UNARY.
# NOTE(review): this excerpt has lost its indentation, and the dict opened by
# `return {` is not closed before the next definition begins -- restore both
# from the original generated file before use.
def __mapping__(self):
return {
'/DotaOptimizer/Rollout': grpclib.const.Handler(
self.Rollout,
grpclib.const.Cardinality.UNARY_UNARY,
protos.DotaOptimizer_pb2.RolloutData,
protos.DotaOptimizer_pb2.Empty2,
),
'/DotaOptimizer/GetWeights': grpclib.const.Handler(
self.GetWeights,
grpclib.const.Cardinality.UNARY_UNARY,
protos.DotaOptimizer_pb2.WeightQuery,
protos.DotaOptimizer_pb2.Weights,
),
# Build the route table for the helloworld.Greeter service, using the message
# classes from streaming.helloworld_pb2. The annotated return type is a dict
# from RPC path strings to grpclib.const.Handler values; each Handler is made
# from the bound method on self, the call cardinality, and two protobuf message
# classes (presumably request type first, then reply type -- confirm against
# grpclib).
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the final Handler(...) call -- neither that call nor the dict opened by
# `return {` is closed before the next definition begins. Restore the missing
# tail from the original generated file before use.
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
# One request message, one reply message.
'/helloworld.Greeter/UnaryUnaryGreeting': grpclib.const.Handler(
self.UnaryUnaryGreeting,
grpclib.const.Cardinality.UNARY_UNARY,
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
),
# One request message, a stream of replies.
'/helloworld.Greeter/UnaryStreamGreeting': grpclib.const.Handler(
self.UnaryStreamGreeting,
grpclib.const.Cardinality.UNARY_STREAM,
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
),
# A stream of requests, one reply message.
'/helloworld.Greeter/StreamUnaryGreeting': grpclib.const.Handler(
self.StreamUnaryGreeting,
grpclib.const.Cardinality.STREAM_UNARY,
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
),
# Streams in both directions (this entry is the one that is cut short).
'/helloworld.Greeter/StreamStreamGreeting': grpclib.const.Handler(
self.StreamStreamGreeting,
grpclib.const.Cardinality.STREAM_STREAM,
streaming.helloworld_pb2.HelloRequest,
streaming.helloworld_pb2.HelloReply,
# Build the route table for the grpc.health.v1.Health service. The annotated
# return type is a dict from RPC path strings to grpclib.const.Handler values;
# each Handler is made from the bound method on self, the call cardinality, and
# two protobuf message classes (presumably request type first, then reply type
# -- confirm against grpclib).
# NOTE(review): this excerpt has lost its indentation, and the dict opened by
# `return {` is not closed before the next definition begins -- restore both
# from the original generated file before use.
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
# Check is registered as UNARY_UNARY: one request, one response.
'/grpc.health.v1.Health/Check': grpclib.const.Handler(
self.Check,
grpclib.const.Cardinality.UNARY_UNARY,
grpclib.health.v1.health_pb2.HealthCheckRequest,
grpclib.health.v1.health_pb2.HealthCheckResponse,
),
# Watch is registered as UNARY_STREAM: one request, a stream of responses.
'/grpc.health.v1.Health/Watch': grpclib.const.Handler(
self.Watch,
grpclib.const.Cardinality.UNARY_STREAM,
grpclib.health.v1.health_pb2.HealthCheckRequest,
grpclib.health.v1.health_pb2.HealthCheckResponse,
),
# Build the route table for the admin.Admin service. The annotated return type
# is a dict from RPC path strings to grpclib.const.Handler values. The only
# route visible here is Append, registered as UNARY_UNARY with the
# AppendRequest / AppendResponse message classes from pymap.admin.grpc.admin_pb2.
# NOTE(review): this excerpt has lost its indentation, and the dict opened by
# `return {` is not closed before the next definition begins -- further routes
# may have been cut off; restore from the original generated file before use.
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
return {
'/admin.Admin/Append': grpclib.const.Handler(
self.Append,
grpclib.const.Cardinality.UNARY_UNARY,
pymap.admin.grpc.admin_pb2.AppendRequest,
pymap.admin.grpc.admin_pb2.AppendResponse,
),
def __mapping__(self):
return {
'/DotaOptimizer/Rollout': grpclib.const.Handler(
self.Rollout,
grpclib.const.Cardinality.UNARY_UNARY,
protos.DotaOptimizer_pb2.RolloutData,
protos.DotaOptimizer_pb2.Empty2,
),
'/DotaOptimizer/GetWeights': grpclib.const.Handler(
self.GetWeights,
grpclib.const.Cardinality.UNARY_UNARY,
protos.DotaOptimizer_pb2.WeightQuery,
protos.DotaOptimizer_pb2.Weights,
),