Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_all_types_register_feature_set_success(client):
all_types_fs_expected = FeatureSet(
name="all_types",
entities=[Entity(name="user_id", dtype=ValueType.INT64)],
features=[
Feature(name="float_feature", dtype=ValueType.FLOAT),
Feature(name="int64_feature", dtype=ValueType.INT64),
Feature(name="int32_feature", dtype=ValueType.INT32),
Feature(name="string_feature", dtype=ValueType.STRING),
Feature(name="bytes_feature", dtype=ValueType.BYTES),
Feature(name="bool_feature", dtype=ValueType.BOOL),
Feature(name="double_feature", dtype=ValueType.DOUBLE),
Feature(name="double_list_feature", dtype=ValueType.DOUBLE_LIST),
Feature(name="float_list_feature", dtype=ValueType.FLOAT_LIST),
Feature(name="int64_list_feature", dtype=ValueType.INT64_LIST),
Feature(name="int32_list_feature", dtype=ValueType.INT32_LIST),
Feature(name="string_list_feature",
dtype=ValueType.STRING_LIST),
# Build the "historical" feature set: a single STRING feature keyed by an
# INT64 entity, with max_age given as a 100-second Duration, then register it.
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
# "feature_set_1" reuses the entity name "entity_id" with a STRING feature.
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
# "feature_set_2" uses a different entity name ("other_entity_id") and an
# INT64 feature.
fs2 = FeatureSet(
"feature_set_2",
features=[Feature("other_feature_value7", ValueType.INT64)],
entities=[Entity("other_entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
# Both feature sets are constructed first and then registered one at a time.
client.apply(fs1)
client.apply(fs2)
"file_feature_set",
features=[Feature("feature_value1", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
# Register the feature set bound to file_fs1 (its construction starts above
# this excerpt).
client.apply(file_fs1)
# "gcs_feature_set": one STRING feature keyed by an INT64 entity.
gcs_fs1 = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
# "processing_time": same shape, with feature "feature_value3".
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
# "additional_columns": same shape, with feature "feature_value4".
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
# NOTE(review): the variable below is called historical_fs but the feature set
# it builds is named "gcs_feature_set", and the following call applies gcs_fs1
# rather than this new object. This looks like two excerpts spliced together;
# confirm against the original test file before relying on it.
historical_fs = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
# From here the sequence repeats the "processing_time" and
# "additional_columns" registrations seen above, rebinding the same names.
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
# Rebind historical_fs to a feature set that is actually named "historical"
# and register it.
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
fs1 = FeatureSet(
def test_basic_register_feature_set_success(client):
# Load feature set from file
cust_trans_fs_expected = FeatureSet.from_yaml("basic/cust_trans_fs.yaml")
client.set_project(PROJECT_NAME)
# Register feature set
client.apply(cust_trans_fs_expected)
cust_trans_fs_actual = client.get_feature_set(name="customer_transactions")
assert cust_trans_fs_actual == cust_trans_fs_expected
if cust_trans_fs_actual is None:
raise Exception(
"Client cannot retrieve 'customer_transactions' FeatureSet "
"after registration. Either Feast Core does not save the "
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
# Register the feature set bound to add_cols_fs (its construction starts above
# this excerpt).
client.apply(add_cols_fs)
# "historical": one STRING feature keyed by an INT64 entity, max_age given as
# a 100-second Duration.
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
# "feature_set_1" reuses the entity name "entity_id" with a STRING feature.
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
# "feature_set_2" uses a different entity name ("other_entity_id") and an
# INT64 feature.
fs2 = FeatureSet(
"feature_set_2",
features=[Feature("other_feature_value7", ValueType.INT64)],
entities=[Entity("other_entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
# Both feature sets are constructed first and then registered one at a time.
client.apply(fs1)
client.apply(fs2)
def apply(self, feature_sets: Union[List[FeatureSet], FeatureSet]):
    """
    Register one or more feature sets through ``self._apply_feature_set``.

    A value that is not a ``list`` is treated as a single item. Items are
    handled in order, so anything preceding an invalid item has already been
    passed to ``_apply_feature_set`` by the time the error is raised.

    NOTE(review): the previous docstring described registration with Feast
    Core as idempotent; that property depends on ``_apply_feature_set``,
    which is not visible here.

    Args:
        feature_sets: A single feature set, or a list of feature sets, to
            register.

    Raises:
        ValueError: If an item is not a ``FeatureSet`` instance.
    """
    pending = feature_sets if isinstance(feature_sets, list) else [feature_sets]

    for feature_set in pending:
        # Guard clause: reject anything that is not a FeatureSet.
        if not isinstance(feature_set, FeatureSet):
            raise ValueError(
                f"Could not determine feature set type to apply {feature_set}"
            )
        self._apply_feature_set(feature_set)
def get_resource(kind):
    """
    Map a resource kind string to its class.

    Args:
        kind: Resource kind identifier; only ``"feature_set"`` is recognised.

    Returns:
        The ``FeatureSet`` class when ``kind`` is ``"feature_set"``.

    Raises:
        ValueError: For any other ``kind``; the offending value is the sole
            exception argument.
    """
    # Guard clause: bail out on anything other than the one supported kind.
    if kind != "feature_set":
        raise ValueError(kind)
    return FeatureSet
def feature_set_create(name):
    """
    Create a feature set
    """
    # The docstring text is left as-is: if this function is exposed as a CLI
    # command (decorators are not visible here), it may double as help text.

    # Look up the Core endpoint first; judging by its name, the config helper
    # is expected to fail when "core_url" is not configured.
    core_url = feast_config.get_config_property_or_fail("core_url")

    # Build a client for that endpoint and register a feature set carrying
    # only the given name.
    Client(core_url=core_url).apply(FeatureSet(name=name))