Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def add_CoreServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'GetFeastCoreVersion': grpc.unary_unary_rpc_method_handler(
servicer.GetFeastCoreVersion,
request_deserializer=feast_dot_core_dot_CoreService__pb2.GetFeastCoreVersionRequest.FromString,
response_serializer=feast_dot_core_dot_CoreService__pb2.GetFeastCoreVersionResponse.SerializeToString,
),
'GetFeatureSet': grpc.unary_unary_rpc_method_handler(
servicer.GetFeatureSet,
request_deserializer=feast_dot_core_dot_CoreService__pb2.GetFeatureSetRequest.FromString,
response_serializer=feast_dot_core_dot_CoreService__pb2.GetFeatureSetResponse.SerializeToString,
),
'ListFeatureSets': grpc.unary_unary_rpc_method_handler(
servicer.ListFeatureSets,
request_deserializer=feast_dot_core_dot_CoreService__pb2.ListFeatureSetsRequest.FromString,
response_serializer=feast_dot_core_dot_CoreService__pb2.ListFeatureSetsResponse.SerializeToString,
),
'ListStores': grpc.unary_unary_rpc_method_handler(
servicer.ListStores,
request_deserializer=feast_dot_core_dot_CoreService__pb2.ListStoresRequest.FromString,
response_serializer=feast_dot_core_dot_CoreService__pb2.ListStoresResponse.SerializeToString,
),
'ApplyFeatureSet': grpc.unary_unary_rpc_method_handler(
servicer.ApplyFeatureSet,
request_deserializer=feast_dot_core_dot_CoreService__pb2.ApplyFeatureSetRequest.FromString,
response_serializer=feast_dot_core_dot_CoreService__pb2.ApplyFeatureSetResponse.SerializeToString,
),
'UpdateStore': grpc.unary_unary_rpc_method_handler(
servicer.UpdateStore,
request_deserializer=feast_dot_core_dot_CoreService__pb2.UpdateStoreRequest.FromString,
response_serializer=feast_dot_core_dot_CoreService__pb2.UpdateStoreResponse.SerializeToString,
else:
project = "*"
if name is None:
name = "*"
if version is None:
version = "*"
filter = ListFeatureSetsRequest.Filter(
project=project, feature_set_name=name, feature_set_version=version
)
# Get latest feature sets from Feast Core
feature_set_protos = self._core_service_stub.ListFeatureSets(
ListFeatureSetsRequest(filter=filter)
) # type: ListFeatureSetsResponse
# Extract feature sets and return
feature_sets = []
for feature_set_proto in feature_set_protos.feature_sets:
feature_set = FeatureSet.from_proto(feature_set_proto)
feature_set._client = self
feature_sets.append(feature_set)
return feature_sets
"""
self._connect_core()
if project is None:
if self.project is not None:
project = self.project
else:
project = "*"
if name is None:
name = "*"
if version is None:
version = "*"
filter = ListFeatureSetsRequest.Filter(
project=project, feature_set_name=name, feature_set_version=version
)
# Get latest feature sets from Feast Core
feature_set_protos = self._core_service_stub.ListFeatureSets(
ListFeatureSetsRequest(filter=filter)
) # type: ListFeatureSetsResponse
# Extract feature sets and return
feature_sets = []
for feature_set_proto in feature_set_protos.feature_sets:
feature_set = FeatureSet.from_proto(feature_set_proto)
feature_set._client = self
feature_sets.append(feature_set)
return feature_sets
Args:
channel: A grpc.Channel.
"""
self.GetFeastCoreVersion = channel.unary_unary(
'/feast.core.CoreService/GetFeastCoreVersion',
request_serializer=feast_dot_core_dot_CoreService__pb2.GetFeastCoreVersionRequest.SerializeToString,
response_deserializer=feast_dot_core_dot_CoreService__pb2.GetFeastCoreVersionResponse.FromString,
)
self.GetFeatureSet = channel.unary_unary(
'/feast.core.CoreService/GetFeatureSet',
request_serializer=feast_dot_core_dot_CoreService__pb2.GetFeatureSetRequest.SerializeToString,
response_deserializer=feast_dot_core_dot_CoreService__pb2.GetFeatureSetResponse.FromString,
)
self.ListFeatureSets = channel.unary_unary(
'/feast.core.CoreService/ListFeatureSets',
request_serializer=feast_dot_core_dot_CoreService__pb2.ListFeatureSetsRequest.SerializeToString,
response_deserializer=feast_dot_core_dot_CoreService__pb2.ListFeatureSetsResponse.FromString,
)
self.ListStores = channel.unary_unary(
'/feast.core.CoreService/ListStores',
request_serializer=feast_dot_core_dot_CoreService__pb2.ListStoresRequest.SerializeToString,
response_deserializer=feast_dot_core_dot_CoreService__pb2.ListStoresResponse.FromString,
)
self.ApplyFeatureSet = channel.unary_unary(
'/feast.core.CoreService/ApplyFeatureSet',
request_serializer=feast_dot_core_dot_CoreService__pb2.ApplyFeatureSetRequest.SerializeToString,
response_deserializer=feast_dot_core_dot_CoreService__pb2.ApplyFeatureSetResponse.FromString,
)
self.UpdateStore = channel.unary_unary(
'/feast.core.CoreService/UpdateStore',
request_serializer=feast_dot_core_dot_CoreService__pb2.UpdateStoreRequest.SerializeToString,
response_deserializer=feast_dot_core_dot_CoreService__pb2.UpdateStoreResponse.FromString,