Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
[14.0, 15.0, 16.0],
[15.0, 16.0, 17.0],
[16.0, 17.0, 18.0],
[17.0, 18.0, 19.0],
]
)
assert (
np.linalg.norm(future_target - transformed_data["future_target"])
< 1e-5
), "the forking sequence target should be computed correctly."
trans_oob = transform.Chain(
trans=[
transform.AddAgeFeature(
target_field=FieldName.TARGET,
output_field="age",
pred_length=10,
),
ForkingSequenceSplitter(
train_sampler=TSplitSampler(),
time_series_fields=["age"],
enc_len=20,
dec_len=20,
),
]
)
transformed_data_oob = next(iter(trans_oob(iter(ds), is_train=True)))
assert (
np.sum(transformed_data_oob["future_target"]) - np.sum(np.arange(20))
def create_transformation(self):
    """Return the model-specific input transformation.

    The transformation is an ``InstanceSplitter`` that randomly selects
    training samples from all series.  Its past window is
    ``self.context_length`` long and its future window is
    ``self.prediction_length`` long.
    """
    # One expected training instance per series.
    sampler = ExpectedNumInstanceSampler(num_instances=1)

    splitter = InstanceSplitter(
        target_field=FieldName.TARGET,
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        train_sampler=sampler,
        past_length=self.context_length,
        future_length=self.prediction_length,
    )
    return splitter
def use_marginal_transformation(
marginal_transformation: bool
) -> Transformation:
if marginal_transformation:
return CDFtoGaussianTransform(
target_field=FieldName.TARGET,
observed_values_field=FieldName.OBSERVED_VALUES,
max_context_length=self.conditioning_length,
target_dim=self.target_dim,
)
else:
return RenameFields(
{
f"past_{FieldName.TARGET}": f"past_{FieldName.TARGET}_cdf",
f"future_{FieldName.TARGET}": f"future_{FieldName.TARGET}_cdf",
}
def trim_ts_item_front(x: DataEntry, length: int) -> DataEntry:
    """Drop the first ``length`` time points from a data entry.

    A new dictionary is built containing the item id, the start advanced
    by ``length * start.freq``, and the target with its first ``length``
    values removed.  The dynamic categorical and dynamic real features,
    when present in ``x``, are sliced from ``length`` onwards along their
    second axis.  No other fields of ``x`` are carried over.

    An assertion fails if ``length`` is larger than the target length.
    """
    assert length <= len(x[FieldName.TARGET])

    item_id = x[FieldName.ITEM_ID]
    start = x[FieldName.START]
    trimmed = {
        "item_id": item_id,
        "start": start + length * start.freq,
        "target": x[FieldName.TARGET][length:],
    }

    # Dynamic features are indexed (feature, time): trim the time axis only.
    for field in (FieldName.FEAT_DYNAMIC_CAT, FieldName.FEAT_DYNAMIC_REAL):
        if field in x:
            trimmed[field] = x[field][:, length:]

    return trimmed
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=self.prediction_length,
log_scale=True,
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if self.use_feat_dynamic_real
else []
),
),
CanonicalInstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=TestSplitSampler(),
time_series_fields=[
FieldName.FEAT_TIME,
SEASON_INDICATORS_FIELD,
FieldName.OBSERVED_VALUES,
],
allow_target_padding=True,
instance_length=self.past_length,
use_prediction_features=True,
prediction_length=self.prediction_length,
),
else []
)
+ [
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
AsNumpyArray(
field=FieldName.TARGET,
# in the following line, we add 1 for the time dimension
expected_ndim=1 + len(self.distr_output.event_shape),
),
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=self.time_features,
pred_length=self.prediction_length,
),
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=self.prediction_length,
log_scale=True,
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if self.use_feat_dynamic_real
def create_transformation(
self, bin_edges: np.ndarray, pred_length: int
) -> transform.Transformation:
return Chain(
[
AsNumpyArray(field=FieldName.TARGET, expected_ndim=1),
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(self.freq),
pred_length=self.prediction_length,
),
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=self.prediction_length,
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
expected_ndim=1,
dtype=self.dtype,
),
AsNumpyArray(
field=FieldName.FEAT_STATIC_REAL,
expected_ndim=1,
dtype=self.dtype,
),
AsNumpyArray(
field=FieldName.TARGET,
# in the following line, we add 1 for the time dimension
expected_ndim=1 + len(self.distr_output.event_shape),
dtype=self.dtype,
),
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
dtype=self.dtype,
),
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=self.time_features,
pred_length=self.prediction_length,
),
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=self.prediction_length,
log_scale=True,
dtype=self.dtype,
def __init__(
    self,
    past_interval_length: float,
    future_interval_length: float,
    train_sampler: ContinuousTimePointSampler,
    target_field: str = FieldName.TARGET,
    start_field: str = FieldName.START,
    end_field: str = "end",
    forecast_start_field: str = FieldName.FORECAST_START,
) -> None:
    """Store the splitter configuration on the instance.

    ``future_interval_length`` must be strictly positive; otherwise an
    assertion fails before any attribute is set.  Every argument is
    stored unchanged under an attribute of the same name.
    """
    assert future_interval_length > 0, (
        "Prediction interval must have length greater than 0."
    )

    # Interval lengths and the sampler.
    self.past_interval_length = past_interval_length
    self.future_interval_length = future_interval_length
    self.train_sampler = train_sampler

    # Names of the data-entry fields.
    self.target_field = target_field
    self.start_field = start_field
    self.end_field = end_field
    self.forecast_start_field = forecast_start_field
pred_length=self.prediction_length,
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME],
),
SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
TargetDimIndicator(
field_name="target_dimension_indicator",
target_field=FieldName.TARGET,
),
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.history_length,
future_length=self.prediction_length,
time_series_fields=[
FieldName.FEAT_TIME,
FieldName.OBSERVED_VALUES,
],
pick_incomplete=self.pick_incomplete,
),
use_marginal_transformation(self.use_marginal_transformation),
]