How to use the grpclib.reflection.v1.reflection_pb2.ServerReflectionResponse function in grpclib

To help you get started, we’ve selected a few grpclib examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github vmagamedov / grpclib / tests / test_reflection.py View on Github external
file_containing_symbol=(
                DESCRIPTOR.message_types_by_name['DummyRequest'].full_name
            ),
        ),
        ServerReflectionRequest(
            file_containing_symbol='unknown.Symbol',
        ),
    ])

    proto_bytes, = r1.file_descriptor_response.file_descriptor_proto
    dummy_proto = FileDescriptorProto()
    dummy_proto.ParseFromString(proto_bytes)
    assert dummy_proto.name == DESCRIPTOR.name
    assert dummy_proto.package == DESCRIPTOR.package

    assert r2 == ServerReflectionResponse(
        error_response=ErrorResponse(
            error_code=5,
            error_message='not found',
        ),
github vmagamedov / grpclib / grpclib / reflection / service.py View on Github external
def _all_extension_numbers_of_type_response(
        self,
        type_name: str,
    ) -> ServerReflectionResponse:
        try:
            message = self._pool.FindMessageTypeByName(type_name)
            extensions = self._pool.FindAllExtensions(message)
        except KeyError:
            return self._not_found_response()
        else:
            return ServerReflectionResponse(
                all_extension_numbers_response=ExtensionNumberResponse(
                    base_type_name=message.full_name,
                    extension_number=[ext.number for ext in extensions],
                )
github vmagamedov / grpclib / grpclib / reflection / service.py View on Github external
def _not_found_response(self) -> ServerReflectionResponse:
        return ServerReflectionResponse(
            error_response=ErrorResponse(
                error_code=Status.NOT_FOUND.value,
                error_message='not found',
            ),
github vmagamedov / grpclib / grpclib / reflection / v1 / reflection_grpc.py View on Github external
def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
        return {
            '/grpc.reflection.v1.ServerReflection/ServerReflectionInfo': grpclib.const.Handler(
                self.ServerReflectionInfo,
                grpclib.const.Cardinality.STREAM_STREAM,
                grpclib.reflection.v1.reflection_pb2.ServerReflectionRequest,
                grpclib.reflection.v1.reflection_pb2.ServerReflectionResponse,
            ),
github vmagamedov / grpclib / grpclib / reflection / service.py View on Github external
def _file_descriptor_response(
        self,
        file_descriptor: FileDescriptor,
    ) -> ServerReflectionResponse:
        proto = FileDescriptorProto()
        file_descriptor.CopyToProto(proto)  # type: ignore
        return ServerReflectionResponse(
            file_descriptor_response=FileDescriptorResponse(
                file_descriptor_proto=[proto.SerializeToString()],
            ),
github vmagamedov / grpclib / grpclib / reflection / service.py View on Github external
response = self._file_containing_symbol_response(
                    request.file_containing_symbol,
                )
            elif request.HasField('file_containing_extension'):
                response = self._file_containing_extension_response(
                    request.file_containing_extension.containing_type,
                    request.file_containing_extension.extension_number,
                )
            elif request.HasField('all_extension_numbers_of_type'):
                response = self._all_extension_numbers_of_type_response(
                    request.all_extension_numbers_of_type,
                )
            elif request.HasField('list_services'):
                response = self._list_services_response()
            else:
                response = ServerReflectionResponse(
                    error_response=ErrorResponse(
                        error_code=Status.INVALID_ARGUMENT.value,
                        error_message='invalid argument',
                    )
                )
            await stream.send_message(response)
github vmagamedov / grpclib / grpclib / reflection / service.py View on Github external
def _list_services_response(self) -> ServerReflectionResponse:
        return ServerReflectionResponse(
            list_services_response=ListServiceResponse(
                service=[ServiceResponse(name=service_name)
                         for service_name in self._service_names],
            )