Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
parse_dates=["event_timestamp"],
)
for i, row in feature_values.iterrows():
feature_row = FeatureRow()
feature_row.entityKey = row["id"]
feature_row.entityName = entity_name
timestamp = Timestamp()
timestamp.FromJsonString(row["event_timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ"))
feature_row.eventTimestamp.CopyFrom(timestamp)
for info in feature_infos:
feature = Feature()
feature.id = info["id"]
feature_value = Value()
feature_name = info["name"]
if info["dtype"] is "Int64":
feature_value.int64Val = row[feature_name]
elif info["dtype"] is "Int32":
feature_value.int32Val = row[feature_name]
elif info["dtype"] is np.float64:
feature_value.doubleVal = row[feature_name]
else:
raise RuntimeError(
f"Unsupported dtype: {info['dtype']}\n"
"Supported valueType: INT32, INT64, FLOAT, DOUBLE\n"
"Please update your feature specs in testdata/feature_specs folder"
)
feature.value.CopyFrom(feature_value)
feature_row.features.extend([feature])
def test_all_types_register_feature_set_success(client):
all_types_fs_expected = FeatureSet(
name="all_types",
entities=[Entity(name="user_id", dtype=ValueType.INT64)],
features=[
Feature(name="float_feature", dtype=ValueType.FLOAT),
Feature(name="int64_feature", dtype=ValueType.INT64),
Feature(name="int32_feature", dtype=ValueType.INT32),
Feature(name="string_feature", dtype=ValueType.STRING),
Feature(name="bytes_feature", dtype=ValueType.BYTES),
Feature(name="bool_feature", dtype=ValueType.BOOL),
Feature(name="double_feature", dtype=ValueType.DOUBLE),
Feature(name="double_list_feature", dtype=ValueType.DOUBLE_LIST),
Feature(name="float_list_feature", dtype=ValueType.FLOAT_LIST),
Feature(name="int64_list_feature", dtype=ValueType.INT64_LIST),
Feature(name="int32_list_feature", dtype=ValueType.INT32_LIST),
Feature(name="string_list_feature",
dtype=ValueType.STRING_LIST),
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
fs2 = FeatureSet(
"feature_set_2",
features=[Feature("other_feature_value7", ValueType.INT64)],
entities=[Entity("other_entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(fs1)
client.apply(fs2)
"file_feature_set",
features=[Feature("feature_value1", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(file_fs1)
gcs_fs1 = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
historical_fs = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
fs1 = FeatureSet(
def test_basic_register_feature_set_success(client):
# Load feature set from file
cust_trans_fs_expected = FeatureSet.from_yaml("basic/cust_trans_fs.yaml")
client.set_project(PROJECT_NAME)
# Register feature set
client.apply(cust_trans_fs_expected)
cust_trans_fs_actual = client.get_feature_set(name="customer_transactions")
assert cust_trans_fs_actual == cust_trans_fs_expected
if cust_trans_fs_actual is None:
raise Exception(
"Client cannot retrieve 'customer_transactions' FeatureSet "
"after registration. Either Feast Core does not save the "
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
fs2 = FeatureSet(
"feature_set_2",
features=[Feature("other_feature_value7", ValueType.INT64)],
entities=[Entity("other_entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(fs1)
client.apply(fs2)
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
features=[Feature("feature_value5", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(historical_fs)
fs1 = FeatureSet(
"feature_set_1",
features=[Feature("feature_value6", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
fs2 = FeatureSet(
"feature_set_2",
features=[Feature("other_feature_value7", ValueType.INT64)],
entities=[Entity("other_entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(fs1)
client.apply(fs2)
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(file_fs1)
gcs_fs1 = FeatureSet(
"gcs_feature_set",
features=[Feature("feature_value2", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(gcs_fs1)
proc_time_fs = FeatureSet(
"processing_time",
features=[Feature("feature_value3", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(proc_time_fs)
add_cols_fs = FeatureSet(
"additional_columns",
features=[Feature("feature_value4", ValueType.STRING)],
entities=[Entity("entity_id", ValueType.INT64)],
max_age=Duration(seconds=100),
)
client.apply(add_cols_fs)
historical_fs = FeatureSet(
"historical",
features=[Feature("feature_value5", ValueType.STRING)],