# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this region is a corrupted merge — indentation is flattened
# to column 0 and the first three lines are the orphaned tail of a
# FeatureSet(...) call whose opening lines are missing. Confirm against the
# original test file.
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
# "processing_time" feature set: one STRING feature, keyed on entity_id,
# with max_age=100s; registered with Feast via client.apply().
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
# "additional_columns" feature set: same shape, feature_value4.
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
# "historical" feature set: same shape, feature_value5.
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
# "feature_set_1": single STRING feature keyed on entity_id.
# NOTE(review): the original statement repeated the ``features=`` keyword
# (feature_value6, then a stray feature_value5 line spliced in from the
# "historical" feature set above) — that is a SyntaxError in Python
# ("keyword argument repeated"). The duplicate line has been dropped.
fs1 = FeatureSet(
    "feature_set_1",
    features=[Feature("feature_value6", ValueType.STRING)],
    entities=[Entity("entity_id", ValueType.INT64)],
    max_age=Duration(seconds=100),
)
# NOTE(review): re-applying historical_fs here is redundant (it was applied
# right after its definition) and looks like part of the same merge
# artifact; kept as-is pending confirmation against the original file.
client.apply(historical_fs)
# fs1/fs2: two feature sets with different entities (entity_id vs
# other_entity_id) and different feature dtypes (STRING vs INT64);
# both are registered below.
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
fs2 = FeatureSet(
"feature_set_2",
features=[Feature("other_feature_value7", ValueType.INT64)],
entities=[Entity("other_entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(fs1)
client.apply(fs2)
# NOTE(review): second copy of the same corrupted region — the first three
# lines are again the orphaned tail of a FeatureSet(...) call (here ending
# with client.apply(file_fs1)); confirm against the original test file.
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(file_fs1)
# "gcs_feature_set": one STRING feature keyed on entity_id.
gcs_fs1 = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
# "processing_time": same shape, feature_value3.
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
# "additional_columns": same shape, feature_value4.
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
# "historical": same shape, feature_value5.
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
# NOTE(review): merge artifact — this re-applies add_cols_fs instead of the
# historical_fs just defined; the definition is then duplicated and applied
# correctly below.
client.apply(add_cols_fs)
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
# NOTE(review): duplicate of the fs1/fs2 block earlier in this chunk —
# almost certainly the same merge artifact; kept byte-for-byte.
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
fs2 = FeatureSet(
"feature_set_2",
features=[Feature("other_feature_value7", ValueType.INT64)],
entities=[Entity("other_entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(fs1)
client.apply(fs2)
# NOTE(review): truncated copy of the integration test — the body has lost
# its indentation and the FeatureSet(...) call opened for proc_time_fs at
# the end is cut off mid-argument-list, so this def is not syntactically
# valid as-is; confirm against the original test file.
def test_apply_all_featuresets(client):
# Registers a series of small feature sets under PROJECT_NAME.
client.set_project(PROJECT_NAME)
file_fs1 = FeatureSet(
"file_feature_set",
features=[Feature("feature_value1", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(file_fs1)
gcs_fs1 = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
# NOTE(review): second copy of test_apply_all_featuresets, also with
# flattened indentation; partway down it is spliced with a different test
# (the "all_types" feature-set test) — artifacts are flagged inline.
def test_apply_all_featuresets(client):
client.set_project(PROJECT_NAME)
file_fs1 = FeatureSet(
"file_feature_set",
features=[Feature("feature_value1", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(file_fs1)
gcs_fs1 = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
# NOTE(review): merge artifact — this call is named add_cols_fs /
# "additional_columns", but from the entities= line onward its arguments
# belong to the "all_types" feature set: the ``features=`` keyword is
# repeated (a SyntaxError, "keyword argument repeated"), the entity
# switches to user_id, and max_age jumps to 3600s.
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity(name="user_id", dtype=ValueType.INT64)],
features=[
Feature(name="float_feature", dtype=ValueType.FLOAT),
Feature(name="int64_feature", dtype=ValueType.INT64),
Feature(name="int32_feature", dtype=ValueType.INT32),
Feature(name="string_feature", dtype=ValueType.STRING),
Feature(name="bytes_feature", dtype=ValueType.BYTES),
Feature(name="bool_feature", dtype=ValueType.BOOL),
Feature(name="double_feature", dtype=ValueType.DOUBLE),
Feature(name="double_list_feature", dtype=ValueType.DOUBLE_LIST),
Feature(name="float_list_feature", dtype=ValueType.FLOAT_LIST),
Feature(name="int64_list_feature", dtype=ValueType.INT64_LIST),
Feature(name="int32_list_feature", dtype=ValueType.INT32_LIST),
Feature(name="string_list_feature",
dtype=ValueType.STRING_LIST),
Feature(name="bytes_list_feature", dtype=ValueType.BYTES_LIST),
],
max_age=Duration(seconds=3600),
)
# Register feature set
# NOTE(review): all_types_fs_expected is never defined in this chunk — as
# written this raises NameError; it presumably should be the FeatureSet
# constructed above.
client.apply(all_types_fs_expected)
# Feast Core needs some time to fully commit the FeatureSet applied
# when there is no existing job yet for the Featureset
time.sleep(15)
all_types_fs_actual = client.get_feature_set(name="all_types")
assert all_types_fs_actual == all_types_fs_expected
# NOTE(review): dangling ``if`` — its body was lost in the merge, which is
# a SyntaxError as written.
if all_types_fs_actual is None:
def features(self) -> List[Feature]:
    """
    Return every Feature field registered on this feature set.
    """
    feature_fields = []
    for field in self._fields.values():
        if isinstance(field, Feature):
            feature_fields.append(field)
    return feature_fields
def features(self, features: List[Feature]):
    """
    Sets the active features within this feature set.

    All existing Feature fields are removed and replaced by the given
    list; non-Feature fields (e.g. entities) are left untouched.

    Args:
        features: List of feature objects. ``None`` removes the current
            features without adding new ones. (Previously a ``None``
            argument crashed with TypeError when the validation loop tried
            to iterate it, making the ``features is not None`` guard below
            unreachable.)

    Raises:
        Exception: If any element of ``features`` is not a Feature.
    """
    # Validate before mutating so no fields are deleted on bad input.
    if features is not None:
        for feature in features:
            if not isinstance(feature, Feature):
                raise Exception("object type is not a Feature: " + str(type(feature)))
    # Drop every existing Feature field; other field kinds are kept.
    for key in list(self._fields.keys()):
        if isinstance(self._fields[key], Feature):
            del self._fields[key]
    if features is not None:
        self._add_fields(features)
# NOTE(review): mid-function fragment — the enclosing ``def`` is not in this
# chunk. It appears to reconcile columns of a pandas DataFrame ``df`` with
# this feature set's fields; names like replace_existing_features,
# provided_fields and rows_to_sample are parameters defined outside this
# view — confirm against the full method.
# Only overwrite conflicting fields if replacement is allowed
if column in new_fields:
if (
isinstance(self._fields[column], Feature)
and not replace_existing_features
):
continue
if (
isinstance(self._fields[column], Entity)
and not replace_existing_entities
):
continue
# Store this field as a feature
# dtype is inferred from a sample of the column's values.
new_fields[column] = Feature(
name=column,
dtype=_infer_pd_column_type(column, df[column], rows_to_sample),
)
output_log += f"{type(new_fields[column]).__name__} {new_fields[column].name} ({new_fields[column].dtype}) added from dataframe.\n"
# Discard unused fields from feature set
if discard_unused_fields:
# A field is "unused" when it is neither a dataframe column nor one of the
# explicitly provided fields; removals are collected first, then applied,
# to avoid mutating the dict while iterating it.
keys_to_remove = []
for key in new_fields.keys():
if not (key in df.columns or key in provided_fields.keys()):
output_log += f"{type(new_fields[key]).__name__} {new_fields[key].name} ({new_fields[key].dtype}) removed because it is unused.\n"
keys_to_remove.append(key)
for key in keys_to_remove:
del new_fields[key]