Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# generates 2 ** N - 1 timeseries with constant increasing values
n = 2 ** N - 1
targets = np.arange(n * train_length).reshape((n, train_length))
return ListDataset(
[
{"start": "2012-01-01", "target": targets[i, :]}
for i in range(n)
],
freq="D",
)
ds = make_dataset(1, 20)
trans = transform.Chain(
trans=[
transform.AddAgeFeature(
target_field=FieldName.TARGET,
output_field="age",
pred_length=10,
),
ForkingSequenceSplitter(
train_sampler=TSplitSampler(),
time_series_fields=["age"],
enc_len=5,
dec_len=3,
),
]
)
out = trans(iter(ds), is_train=True)
def test_BucketInstanceSampler():
N = 6
train_length = 2
pred_length = 1
ds = make_dataset(N, train_length)
dataset_stats = calculate_dataset_statistics(ds)
t = transform.Chain(
trans=[
transform.InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=transform.BucketInstanceSampler(
dataset_stats.scale_histogram
),
past_length=train_length,
future_length=pred_length,
pick_incomplete=True,
)
]
)
def test_Transformation():
train_length = 100
ds = gluonts.dataset.common.ListDataset(
[{"start": "2012-01-01", "target": [0.2] * train_length}], freq="1D"
)
pred_length = 10
t = transform.Chain(
trans=[
transform.AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field="time_feat",
time_features=[
time_feature.DayOfWeek(),
time_feature.DayOfMonth(),
time_feature.MonthOfYear(),
],
pred_length=pred_length,
),
transform.AddAgeFeature(
target_field=FieldName.TARGET,
output_field="age",
pred_length=pred_length,
def create_transformation(
self, bin_edges: np.ndarray, pred_length: int
) -> transform.Transformation:
return Chain(
[
AsNumpyArray(field=FieldName.TARGET, expected_ndim=1),
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(self.freq),
pred_length=self.prediction_length,
),
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
def create_transformation(self) -> Transformation:
remove_field_names = [FieldName.FEAT_DYNAMIC_CAT]
if not self.use_feat_static_real:
remove_field_names.append(FieldName.FEAT_STATIC_REAL)
if not self.use_feat_dynamic_real:
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
return Chain(
[RemoveFields(field_names=remove_field_names)]
+ (
[SetField(output_field=FieldName.FEAT_STATIC_CAT, value=[0.0])]
if not self.use_feat_static_cat
else []
)
+ (
[
SetField(
output_field=FieldName.FEAT_STATIC_REAL, value=[0.0]
)
]
if not self.use_feat_static_real
else []
)
+ [
def create_transformation(self) -> Transformation:
return Chain(
[
InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.context_length,
future_length=self.prediction_length,
time_series_fields=[], # [FieldName.FEAT_DYNAMIC_REAL]
)
def create_transformation(self) -> Transformation:
remove_field_names = [
FieldName.FEAT_DYNAMIC_CAT,
FieldName.FEAT_STATIC_REAL,
]
if not self.use_feat_dynamic_real:
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
return Chain(
[RemoveFields(field_names=remove_field_names)]
+ (
[SetField(output_field=FieldName.FEAT_STATIC_CAT, value=[0.0])]
if not self.use_feat_static_cat
else []
)
+ [
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
AsNumpyArray(field=FieldName.TARGET, expected_ndim=1),
# gives target the (1, T) layout
ExpandDimArray(field=FieldName.TARGET, axis=0),
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
# Unnormalized seasonal features
def create_transformation(self) -> transform.Transformation:
return transform.Chain(
trans=[
transform.AsNumpyArray(
field=FieldName.TARGET, expected_ndim=1
),
transform.AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(self.freq),
pred_length=self.prediction_length,
),
transform.VstackFeatures(
output_field=FieldName.FEAT_DYNAMIC_REAL,
input_fields=[FieldName.FEAT_TIME],
),
transform.SetFieldIfNotPresent(