Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create_transformation(self):
    """Build the model-specific input transformation.

    The returned ``InstanceSplitter`` randomly selects training samples
    from all series, using ``ExpectedNumInstanceSampler(num_instances=1)``
    as its training sampler.

    Returns
    -------
    InstanceSplitter
        Configured with ``past_length=self.context_length`` and
        ``future_length=self.prediction_length``.
    """
    # Standard dataset field names the splitter reads from / writes to.
    field_kwargs = {
        "target_field": FieldName.TARGET,
        "is_pad_field": FieldName.IS_PAD,
        "start_field": FieldName.START,
        "forecast_start_field": FieldName.FORECAST_START,
    }
    sampler = ExpectedNumInstanceSampler(num_instances=1)
    return InstanceSplitter(
        train_sampler=sampler,
        past_length=self.context_length,
        future_length=self.prediction_length,
        **field_kwargs,
    )
def test_BucketInstanceSampler():
N = 6
train_length = 2
pred_length = 1
ds = make_dataset(N, train_length)
dataset_stats = calculate_dataset_statistics(ds)
t = transform.Chain(
trans=[
transform.InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=transform.BucketInstanceSampler(
dataset_stats.scale_histogram
),
past_length=train_length,
future_length=pred_length,
pick_incomplete=True,
)
]
)
assert_serializable(t)
scale_hist = ScaleHistogram()
repetition = 200
for i in range(repetition):
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME],
),
SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
TargetDimIndicator(
field_name="target_dimension_indicator",
target_field=FieldName.TARGET,
),
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.history_length,
future_length=self.prediction_length,
time_series_fields=[
FieldName.FEAT_TIME,
FieldName.OBSERVED_VALUES,
],
pick_incomplete=self.pick_incomplete,
),
use_marginal_transformation(self.use_marginal_transformation),
]
output_field=FieldName.FEAT_AGE,
pred_length=self.prediction_length,
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE],
),
SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.context_length,
future_length=pred_length,
output_NTC=False,
time_series_fields=[
FieldName.FEAT_TIME,
FieldName.OBSERVED_VALUES,
],
),
QuantizeScaled(
bin_edges=bin_edges.tolist(),
future_target="future_target",
past_target="past_target",
),
log_scale=True,
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if self.use_feat_dynamic_real
else []
),
),
InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.history_length,
future_length=self.prediction_length,
time_series_fields=[
FieldName.FEAT_TIME,
FieldName.OBSERVED_VALUES,
],
dtype=self.dtype,
),
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if self.use_feat_dynamic_real
else []
),
),
InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.history_length,
future_length=self.prediction_length,
time_series_fields=[
FieldName.FEAT_TIME,
FieldName.OBSERVED_VALUES,
],
def __init__(
    self,
    past_interval_length: float,
    future_interval_length: float,
    train_sampler: ContinuousTimePointSampler,
    target_field: str = FieldName.TARGET,
    start_field: str = FieldName.START,
    end_field: str = "end",
    forecast_start_field: str = FieldName.FORECAST_START,
) -> None:
    """Record the splitter configuration on the instance.

    Parameters
    ----------
    past_interval_length
        Length of the past interval; stored without validation.
    future_interval_length
        Length of the prediction interval; must be greater than 0.
    train_sampler
        Sampler object, stored as ``self.train_sampler``.
    target_field, start_field, end_field, forecast_start_field
        Field names, each stored under the attribute of the same name.

    Raises
    ------
    AssertionError
        If ``future_interval_length`` is not greater than 0.
    """
    assert future_interval_length > 0, (
        "Prediction interval must have length greater than 0."
    )

    # Interval lengths.
    self.past_interval_length = past_interval_length
    self.future_interval_length = future_interval_length

    # Sampler supplied by the caller.
    self.train_sampler = train_sampler

    # Names of the data-entry fields.
    (
        self.target_field,
        self.start_field,
        self.end_field,
        self.forecast_start_field,
    ) = (target_field, start_field, end_field, forecast_start_field)
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=self.time_features,
pred_length=self.prediction_length,
),
SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
CanonicalInstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=TestSplitSampler(),
time_series_fields=[FieldName.FEAT_TIME],
instance_length=self.context_length,
use_prediction_features=True,
prediction_length=self.prediction_length,
),
def __init__(
    self,
    past_interval_length: float,
    future_interval_length: float,
    train_sampler: ContinuousTimePointSampler,
    target_field: str = FieldName.TARGET,
    start_field: str = FieldName.START,
    end_field: str = "end",
    forecast_start_field: str = FieldName.FORECAST_START,
) -> None:
    """Record the splitter configuration on the instance.

    Parameters
    ----------
    past_interval_length
        Length of the past interval; stored without validation.
    future_interval_length
        Length of the prediction interval; must be greater than 0.
    train_sampler
        Sampler object, stored as ``self.train_sampler``.
    target_field, start_field, end_field, forecast_start_field
        Field names, each stored under the attribute of the same name.

    Raises
    ------
    AssertionError
        If ``future_interval_length`` is not greater than 0.
    """
    assert future_interval_length > 0, (
        "Prediction interval must have length greater than 0."
    )

    # Interval lengths.
    self.past_interval_length = past_interval_length
    self.future_interval_length = future_interval_length

    # Sampler supplied by the caller.
    self.train_sampler = train_sampler

    # Names of the data-entry fields.
    (
        self.target_field,
        self.start_field,
        self.end_field,
        self.forecast_start_field,
    ) = (target_field, start_field, end_field, forecast_start_field)
),
transform.VstackFeatures(
output_field=FieldName.FEAT_DYNAMIC_REAL,
input_fields=[FieldName.FEAT_TIME],
),
transform.SetFieldIfNotPresent(
field=FieldName.FEAT_STATIC_CAT, value=[0.0]
),
transform.AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT, expected_ndim=1
),
transform.InstanceSplitter(
target_field=FieldName.TARGET,
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
train_sampler=ExpectedNumInstanceSampler(num_instances=1),
past_length=self.context_length,
future_length=self.prediction_length,
time_series_fields=[FieldName.FEAT_DYNAMIC_REAL],
),