# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_AddTimeFeatures(start, target, is_train: bool):
    """Check that ``AddTimeFeatures`` writes a (num_features, T) matrix.

    Two time features are requested, so the first axis of the output is 2.
    In training mode the matrix covers only ``len(target)`` steps; in
    prediction mode it is extended by ``pred_length`` future steps.
    """
    pred_length = 13
    t = transform.AddTimeFeatures(
        start_field=FieldName.START,
        target_field=FieldName.TARGET,
        output_field="myout",
        pred_length=pred_length,
        time_features=[time_feature.DayOfWeek(), time_feature.DayOfMonth()],
    )
    # The transformation must survive a serialization round trip.
    assert_serializable(t)
    data = {"start": start, "target": target}
    res = t.map_transform(data, is_train=is_train)
    mat = res["myout"]
    expected_length = len(target) + (0 if is_train else pred_length)
    # Two features requested above -> leading dimension is 2.
    assert mat.shape == (2, expected_length)
    # NOTE(review): the visible text ends here — presumably assertions
    # validating feature values against ``tmp_idx`` followed in the
    # original file; confirm against upstream before relying on this test.
    tmp_idx = pd.date_range(
        start=start, freq=start.freq, periods=expected_length
    )
def create_transformation(self):
    """Return the model-specific input transformation.

    An ``InstanceSplitter`` randomly selects training windows from each
    series: ``past_length`` context steps plus ``future_length`` steps
    to predict, with one instance per series in expectation.
    """
    return InstanceSplitter(
        target_field=FieldName.TARGET,
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        train_sampler=ExpectedNumInstanceSampler(num_instances=1),
        past_length=self.context_length,
        future_length=self.prediction_length,
    )
# NOTE(review): orphaned fragment — these are keyword arguments and
# transformation-chain entries whose enclosing call (apparently a
# ``Chain([...])`` inside another ``create_transformation``) was lost
# when this file was assembled. Not valid Python on its own; recover
# the surrounding definition from the original source before editing.
pred_length=self.prediction_length,
),
transform.VstackFeatures(
output_field=FieldName.FEAT_DYNAMIC_REAL,
input_fields=[FieldName.FEAT_TIME],
),
transform.SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
transform.AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT, expected_ndim=1
),
transform.InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.context_length,
future_length=self.prediction_length,
time_series_fields=[FieldName.FEAT_DYNAMIC_REAL],
),
def create_transformation(
    self, bin_edges: np.ndarray, pred_length: int
) -> transform.Transformation:
    """Build the input pipeline for the quantized (binned) model.

    The chain: coerce the target to a 1-D array, mark observed values,
    add calendar and age features, stack them into FEAT_TIME, default
    the static categorical feature, split out training instances, and
    finally quantize targets against ``bin_edges``.

    Parameters
    ----------
    bin_edges
        Bin boundaries used by ``QuantizeScaled``; converted to a plain
        list because the transformation is serialized.
    pred_length
        Length of the future window produced by the instance splitter.
    """
    # NOTE(review): the original text contained a duplicated scrape-span
    # inside ``SetFieldIfNotPresent`` and was missing its closing
    # brackets; reconstructed here as a single well-formed Chain.
    return Chain(
        [
            AsNumpyArray(field=FieldName.TARGET, expected_ndim=1),
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(self.freq),
                pred_length=self.prediction_length,
            ),
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=self.prediction_length,
            ),
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE],
            ),
            SetFieldIfNotPresent(
                field=FieldName.FEAT_STATIC_CAT, value=[0.0]
            ),
            AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
            InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                train_sampler=ExpectedNumInstanceSampler(num_instances=1),
                past_length=self.context_length,
                future_length=pred_length,
                output_NTC=False,
                time_series_fields=[
                    FieldName.FEAT_TIME,
                    FieldName.OBSERVED_VALUES,
                ],
            ),
            QuantizeScaled(
                bin_edges=bin_edges.tolist(),
                future_target="future_target",
                past_target="past_target",
            ),
        ]
    )
def __init__(
    self,
    past_interval_length: float,
    future_interval_length: float,
    train_sampler: ContinuousTimePointSampler,
    target_field: str = FieldName.TARGET,
    start_field: str = FieldName.START,
    end_field: str = "end",
    forecast_start_field: str = FieldName.FORECAST_START,
) -> None:
    """Configure a continuous-time instance splitter.

    Parameters
    ----------
    past_interval_length
        Length of the conditioning (past) interval.
    future_interval_length
        Length of the prediction interval; must be strictly positive.
    train_sampler
        Sampler that draws split points during training.
    target_field, start_field, end_field, forecast_start_field
        Names of the entry fields this splitter reads and writes.

    Raises
    ------
    AssertionError
        If ``future_interval_length`` is not strictly positive.
    """
    # NOTE(review): past_interval_length is not validated here —
    # presumably a non-positive past interval is tolerated upstream;
    # confirm before adding a check.
    assert (
        future_interval_length > 0
    ), "Prediction interval must have length greater than 0."

    self.train_sampler = train_sampler
    self.past_interval_length = past_interval_length
    self.future_interval_length = future_interval_length
    self.target_field = target_field
    self.start_field = start_field
    self.end_field = end_field
    self.forecast_start_field = forecast_start_field
# NOTE(review): orphaned fragment — these chain entries start
# mid-list; the enclosing ``Chain([...])`` / ``create_transformation``
# definition is not present in this file. Not valid Python on its own;
# recover the surrounding definition from the original source.
AsNumpyArray(field=FieldName.TARGET, expected_ndim=1),
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(self.freq),
pred_length=self.prediction_length,
),
SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
transform.InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=TestSplitSampler(),
time_series_fields=[FieldName.FEAT_TIME],
past_length=self.context_length,
future_length=self.prediction_length,
),
# NOTE(review): truncated definition — the ``CanonicalInstanceSplitter``
# call below is cut off mid-argument-list and the closing brackets of the
# ``Chain([...])`` are missing from the visible text. Left byte-identical;
# complete it from the original source rather than guessing the remaining
# arguments here.
def create_transformation(self) -> Transformation:
return Chain(
[
AsNumpyArray(field=FieldName.TARGET, expected_ndim=1),
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=self.time_features,
pred_length=self.prediction_length,
),
SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
CanonicalInstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=TestSplitSampler(),
time_series_fields=[FieldName.FEAT_TIME],
# NOTE(review): truncated definition — the ``InstanceSplitter`` call
# below is cut off mid-argument-list and the ``Chain(trans=[...])``
# closing brackets are missing from the visible text. Left byte-identical;
# complete it from the original source rather than guessing the remaining
# arguments here.
def create_transformation(self) -> Transformation:
return Chain(
trans=[
AsNumpyArray(field=FieldName.TARGET, expected_ndim=1),
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(self.freq),
pred_length=self.prediction_length,
),
SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=TestSplitSampler(),
time_series_fields=[FieldName.FEAT_TIME],
# Map pandas ``Timestamp.weekday()`` (0 = Monday) to the anchor suffix
# used by weekly pandas frequency strings (e.g. "W-MON").
week_dict = {
0: "MON",
1: "TUE",
2: "WED",
3: "THU",
4: "FRI",
5: "SAT",
6: "SUN",
}
# NOTE(review): fragment of a CSV-export helper — ``time_series``,
# ``freq``, ``is_missing``, ``num_missing`` and ``csv_writer`` come from
# an enclosing scope that is not visible here, and the original
# indentation has been stripped. Left byte-identical.
for i in range(len(time_series)):
data = time_series[i]
timestamp = pd.Timestamp(data[FieldName.START])
# Weekly frequencies must be anchored on the series' actual start day,
# otherwise date arithmetic drifts to the default W-SUN anchor.
freq_week_start = freq
if freq_week_start == "W":
freq_week_start = f"W-{week_dict[timestamp.weekday()]}"
timestamp = pd.Timestamp(data[FieldName.START], freq=freq_week_start)
item_id = int(data[FieldName.ITEM_ID])
for j, target in enumerate(data[FieldName.TARGET]):
# Using convention that there are no missing values before the start date
if is_missing and j != 0 and j % num_missing == 0:
# Advance one period but emit no row — simulates a missing
# observation every ``num_missing`` steps. (``timestamp += 1``
# relies on freq-aware Timestamp arithmetic, which pandas has
# deprecated — TODO confirm the pandas version pinned here.)
timestamp += 1
continue
else:
timestamp_row = timestamp
# For these frequencies only the date part is written to the CSV.
if freq in ["W", "D", "M"]:
timestamp_row = timestamp.date()
row = [item_id, timestamp_row, target]
# Check if related time series is present
if FieldName.FEAT_DYNAMIC_REAL in data.keys():
for feat_dynamic_real in data[FieldName.FEAT_DYNAMIC_REAL]:
row.append(feat_dynamic_real[j])
csv_writer.writerow(row)