# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def log_feature_row_messages(bootstrap_servers, topic):
    """Consume a Kafka topic and print each decoded FeatureRow message.

    Blocks indefinitely while iterating the consumer; intended for manual
    inspection/debugging of a feature-row stream.

    Args:
        bootstrap_servers: Kafka bootstrap server address(es).
        topic: Name of the topic to consume from.
    """
    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap_servers)
    try:
        for record in consumer:
            # Each record value is a serialized FeatureRow protobuf.
            feature_row = FeatureRow()
            feature_row.ParseFromString(record.value)
            print(feature_row)
    finally:
        # Fix: the original leaked the consumer's network resources when
        # iteration was interrupted (e.g. KeyboardInterrupt); close it always.
        consumer.close()
def produce_feature_rows(
    entity_name, feature_infos, feature_values_filepath, bootstrap_servers, topic
):
    """Read feature values from a CSV and build FeatureRow messages for Kafka.

    NOTE(review): this block appears truncated in this view — the inner loop
    constructs `feature` / `feature_value` objects but the code that attaches
    them to `feature_row` and sends via `producer` to `topic` is not visible.

    Args:
        entity_name: Name of the entity the feature rows belong to.
        feature_infos: List of dicts describing each feature; the visible code
            reads keys "id", "name", and "dtype" (a dtype string).
        feature_values_filepath: Path to a headerless CSV whose first two
            columns are "id" and "event_timestamp", followed by one column
            per entry of `feature_infos`.
        bootstrap_servers: Kafka bootstrap server address(es).
        topic: Kafka topic to produce to (presumably used past the cut).
    """
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    feature_values = pd.read_csv(
        feature_values_filepath,
        # Column order: id, event_timestamp, then one column per feature.
        names=["id", "event_timestamp"] + [f["name"] for f in feature_infos],
        dtype=dict(
            [("id", np.string_)] + [(f["name"], f["dtype"]) for f in feature_infos]
        ),
        parse_dates=["event_timestamp"],
    )
    for i, row in feature_values.iterrows():
        feature_row = FeatureRow()
        feature_row.entityKey = row["id"]
        feature_row.entityName = entity_name
        # Convert the pandas timestamp to a protobuf Timestamp via its
        # ISO-8601 string form; the "Z" suffix asserts UTC — TODO confirm the
        # CSV timestamps really are UTC.
        timestamp = Timestamp()
        timestamp.FromJsonString(row["event_timestamp"].strftime("%Y-%m-%dT%H:%M:%SZ"))
        feature_row.eventTimestamp.CopyFrom(timestamp)
        for info in feature_infos:
            feature = Feature()
            feature.id = info["id"]
            feature_value = Value()
            feature_name = info["name"]
            # NOTE(review): `is` tests object identity, not string equality —
            # these comparisons should be `==`; they only work if the dtype
            # strings happen to be interned. Flagged, not changed here.
            if info["dtype"] is "Int64":
                feature_value.int64Val = row[feature_name]
            elif info["dtype"] is "Int32":
                feature_value.int32Val = row[feature_name]
# NOTE(review): fragment — the enclosing function definition and the opening
# of the dict comprehension (presumably `proto_columns = { ... `) lie outside
# this view; the two lines below are the tail of that comprehension, keyed by
# field name over `fs.fields`. Code is left byte-identical.
for field_name, field in fs.fields.items()
}
# Feature set reference string in "<project>/<name>:<version>" form.
feature_set = f"{fs.project}/{fs.name}:{fs.version}"
# List to store result
feature_rows = []
# Loop optimization declaration(s): bind attribute lookups to locals once so
# the hot per-row loop uses fast local-name lookups.
field = FieldProto.Field
proto_items = proto_columns.items()
append = feature_rows.append
# Iterate through the rows of the (Arrow-style) table; `table.num_rows` and
# `datetime_col` are defined above this fragment — not visible here.
for row_idx in range(table.num_rows):
    feature_row = FeatureRow(
        event_timestamp=datetime_col[row_idx], feature_set=feature_set
    )
    # Loop optimization declaration
    ext = feature_row.fields.extend
    # Insert field from each column
    for k, v in proto_items:
        ext([field(name=k, value=v[row_idx])])
    # Append FeatureRow in byte string form
    append(feature_row.SerializeToString())
return feature_rows
def convert_dict_to_proto_values(
    row: dict, df_datetime_dtype: pd.DataFrame.dtypes, feature_set
) -> FeatureRowProto.FeatureRow:
    """
    Encode a dictionary describing a feature row into a FeatureRows object.
    Args:
        row: Dictionary describing a feature row.
        df_datetime_dtype: Pandas dtype of datetime column.
        feature_set: Feature set describing feature row.
    Returns:
        FeatureRow
    """
    # Feature set reference is "<project>/<name>:<version>".
    feature_row = FeatureRowProto.FeatureRow(
        event_timestamp=_pd_datetime_to_timestamp_proto(
            df_datetime_dtype, row[DATETIME_COLUMN]
        ),
        feature_set=feature_set.project
        + "/"
        + feature_set.name
        + ":"
        + str(feature_set.version),
    )
    # One proto Field per feature-set field, converting the raw Python value
    # with the field's declared dtype.
    for field_name, field in feature_set.fields.items():
        # NOTE(review): truncated in this view — the extend(...) call below is
        # cut off before its closing brackets and the function's return.
        feature_row.fields.extend(
            [
                FieldProto.Field(
                    name=field.name,
                    value=_python_value_to_proto_value(field.dtype, row[field.name]),
def convert_series_to_proto_values(row: pd.Series):
    """
    Converts a Pandas Series to a Feast FeatureRow
    Args:
        row: pd.Series The row that should be converted
    Returns:
        Feast FeatureRow

    NOTE(review): reads `dataframe` and `feature_set` from an enclosing scope
    that is not visible in this view (closure over an outer function's locals).
    The block is also truncated — the extend(...) call below is cut off before
    its closing brackets and the function's return.
    """
    # Feature set reference here is "<name>:<version>" (no project prefix,
    # unlike convert_dict_to_proto_values elsewhere in this file).
    feature_row = FeatureRowProto.FeatureRow(
        event_timestamp=_pd_datetime_to_timestamp_proto(
            dataframe[DATETIME_COLUMN].dtype, row[DATETIME_COLUMN]
        ),
        feature_set=feature_set.name + ":" + str(feature_set.version),
    )
    # One proto Field per feature-set field, converting the raw value with the
    # field's declared dtype.
    for field_name, field in feature_set.fields.items():
        feature_row.fields.extend(
            [
                FieldProto.Field(
                    name=field.name,
                    value=_python_value_to_proto_value(
                        field.dtype, row[field.name]
                    ),
                )
            ]