past_is_pad = np.array([[0] * len(target)]).transpose()
past_observed_target = np.array(
    [[1] * len(target), [1] * len(target)]
).transpose()

ds = gluonts.dataset.common.ListDataset(
    # Mimic output from InstanceSplitter
    data_iter=[
        {
            "start": "2012-01-01",
            "target": multi_dim_target,
            "past_target": multi_dim_target,
            "future_target": multi_dim_target,
            "past_is_pad": past_is_pad,
            f"past_{FieldName.OBSERVED_VALUES}": past_observed_target,
        }
    ],
    freq="1D",
    one_dim_target=False,
)
return ds
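
# A minimal sketch of building and iterating a multivariate ListDataset like
# the one above, outside the test fixture. The target values and frequency
# below are invented for illustration; one_dim_target=False is what allows a
# 2-dimensional (dimension x time) target.
import numpy as np
from gluonts.dataset.common import ListDataset

multi_dim_target = np.array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
multivariate_ds = ListDataset(
    [{"start": "2012-01-01", "target": multi_dim_target}],
    freq="1D",
    one_dim_target=False,
)
for entry in multivariate_ds:
    print(entry["start"], entry["target"].shape)  # target shape: (2, 4)
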
def create_transformation(self) -> Transformation:
    return Chain(
        [
            InstanceSplitter(
                target_field=FieldName.TARGET,
                is_pad_field=FieldName.IS_PAD,
                start_field=FieldName.START,
                forecast_start_field=FieldName.FORECAST_START,
                train_sampler=ExpectedNumInstanceSampler(num_instances=1),
                past_length=self.context_length,
                future_length=self.prediction_length,
                time_series_fields=[],  # [FieldName.FEAT_DYNAMIC_REAL]
            )
        ]
    )
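
# A minimal sketch of applying a Chain like the one returned above to a
# dataset. It follows the older gluonts transform API used in this snippet
# (train_sampler=...); the dataset contents and the 14/7 context/prediction
# lengths are invented for illustration.
from gluonts.dataset.common import ListDataset
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
)

train_ds = ListDataset(
    [{"start": "2020-01-01", "target": list(range(50))}], freq="1D"
)
transformation = Chain(
    [
        InstanceSplitter(
            target_field=FieldName.TARGET,
            is_pad_field=FieldName.IS_PAD,
            start_field=FieldName.START,
            forecast_start_field=FieldName.FORECAST_START,
            train_sampler=ExpectedNumInstanceSampler(num_instances=1),
            past_length=14,
            future_length=7,
        )
    ]
)
# a Transformation is callable over an iterable of data entries; with
# is_train=True the sampler draws (on average) one training window per series
for instance in transformation(iter(train_ds), is_train=True):
    print(instance["past_target"].shape, instance["future_target"].shape)
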
assert num_feat_static_real is not None
assert observed_feat_static_real is not None
assert_data_error(
    num_feat_static_real == len(feat_static_real),
    "Not all feat_static_real vectors have the same length {} != {}.",
    num_feat_static_real,
    len(feat_static_real),
)
for i, c in enumerate(feat_static_real):
    observed_feat_static_real[i].add(c)

# FEAT_DYNAMIC_CAT
feat_dynamic_cat = (
    ts[FieldName.FEAT_DYNAMIC_CAT]
    if FieldName.FEAT_DYNAMIC_CAT in ts
    else None
)
if feat_dynamic_cat is None:
    # feat_dynamic_cat not found; check that this is the first ts we
    # encounter, or that no earlier ts had feat_dynamic_cat either
    assert_data_error(
        num_feat_dynamic_cat is None or num_feat_dynamic_cat == 0,
        "feat_dynamic_cat was found for some instances but not others.",
    )
    num_feat_dynamic_cat = 0
else:
    if num_feat_dynamic_cat is None:
        # first num_feat_dynamic_cat found
        num_feat_dynamic_cat = feat_dynamic_cat.shape[0]
    else:
        # assumed check: every ts should have the same number of
        # feat_dynamic_cat features as the entries seen before
        # (mirrors the feat_static_real check above)
        assert_data_error(
            num_feat_dynamic_cat == feat_dynamic_cat.shape[0],
            "Not all feat_dynamic_cat have the same number of features "
            "{} != {}.",
            num_feat_dynamic_cat,
            feat_dynamic_cat.shape[0],
        )
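
# A small sketch of what this validation amounts to in practice: computing
# dataset statistics over entries where feat_dynamic_cat is present for one
# entry but missing for another fails with a GluonTSDataError. The entries
# below are invented for illustration.
from gluonts.dataset.common import ListDataset
from gluonts.dataset.stats import calculate_dataset_statistics

mixed_ds = ListDataset(
    [
        {
            "start": "2021-01-01",
            "target": [1.0, 2.0, 3.0],
            "feat_dynamic_cat": [[0, 1, 0]],
        },
        {"start": "2021-01-01", "target": [4.0, 5.0, 6.0]},
    ],
    freq="D",
)
try:
    calculate_dataset_statistics(mixed_ds)
except Exception as err:  # gluonts raises a GluonTSDataError here
    print(err)
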
def trim_ts_item_front(x: DataEntry, length: int) -> DataEntry:
    """Trim a time series item to a training range by removing the first
    `length` time points from the target and the dynamic features."""
    assert length <= len(x[FieldName.TARGET])

    y = dict(
        item_id=x[FieldName.ITEM_ID],
        start=x[FieldName.START] + length * x[FieldName.START].freq,
        target=x[FieldName.TARGET][length:],
    )
    if FieldName.FEAT_DYNAMIC_CAT in x:
        y[FieldName.FEAT_DYNAMIC_CAT] = x[FieldName.FEAT_DYNAMIC_CAT][
            :, length:
        ]
    if FieldName.FEAT_DYNAMIC_REAL in x:
        y[FieldName.FEAT_DYNAMIC_REAL] = x[FieldName.FEAT_DYNAMIC_REAL][
            :, length:
        ]
    return y
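
# A minimal usage sketch for trim_ts_item_front. The entry below (item_id,
# start, target, feat_dynamic_real) is invented for illustration; the start
# field must expose a .freq, e.g. a pandas Period, for the shift above to work.
import numpy as np
import pandas as pd
from gluonts.dataset.field_names import FieldName

entry = {
    FieldName.ITEM_ID: "item_0",
    FieldName.START: pd.Period("2021-01-01", freq="D"),
    FieldName.TARGET: np.arange(10.0),
    FieldName.FEAT_DYNAMIC_REAL: np.ones((2, 10)),
}
trimmed = trim_ts_item_front(entry, length=3)
assert len(trimmed["target"]) == 7                           # first 3 points dropped
assert trimmed[FieldName.FEAT_DYNAMIC_REAL].shape == (2, 7)  # features trimmed too
assert trimmed["start"] == pd.Period("2021-01-04", freq="D")
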
def to_ts(data: DataEntry) -> pd.Series:
    return pd.Series(
        data[FieldName.TARGET],
        index=pd.date_range(
            start=data[FieldName.START],
            periods=len(data[FieldName.TARGET]),
            freq=data[FieldName.START].freq,
        ),
    )
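
# A small usage sketch for to_ts. The snippet reads data[FieldName.START].freq,
# i.e. it assumes the freq-aware pandas Timestamp that gluonts used with
# pandas < 2.0; the entry below is invented for illustration under that
# assumption.
import numpy as np
import pandas as pd
from gluonts.dataset.field_names import FieldName

entry = {
    FieldName.START: pd.Timestamp("2021-01-01", freq="D"),  # freq-aware Timestamp
    FieldName.TARGET: np.array([1.0, 2.0, 3.0, 4.0]),
}
series = to_ts(entry)
print(series.index[0], series.index[-1])  # 2021-01-01 ... 2021-01-04
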
def __init__(
    self,
    past_interval_length: float,
    future_interval_length: float,
    train_sampler: ContinuousTimePointSampler,
    target_field: str = FieldName.TARGET,
    start_field: str = FieldName.START,
    end_field: str = "end",
    forecast_start_field: str = FieldName.FORECAST_START,
) -> None:
    assert (
        future_interval_length > 0
    ), "Prediction interval must have length greater than 0."

    self.train_sampler = train_sampler
    self.past_interval_length = past_interval_length
    self.future_interval_length = future_interval_length
    self.target_field = target_field
    self.start_field = start_field
    self.end_field = end_field
    self.forecast_start_field = forecast_start_field
remove_field_names = [
    FieldName.FEAT_STATIC_REAL,
]
if not self.use_feat_dynamic_real:
    remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)

return Chain(
    [RemoveFields(field_names=remove_field_names)]
    + (
        [SetField(output_field=FieldName.FEAT_STATIC_CAT, value=[0.0])]
        if not self.use_feat_static_cat
        else []
    )
    + [
        AsNumpyArray(field=FieldName.FEAT_STATIC_CAT, expected_ndim=1),
        AsNumpyArray(
            field=FieldName.TARGET,
            # in the following line, we add 1 for the time dimension
            expected_ndim=1 + len(self.distr_output.event_shape),
        ),
        AddObservedValuesIndicator(
            target_field=FieldName.TARGET,
            output_field=FieldName.OBSERVED_VALUES,
        ),
        AddTimeFeatures(
            start_field=FieldName.START,
            target_field=FieldName.TARGET,
            output_field=FieldName.FEAT_TIME,
            time_features=self.time_features,
            pred_length=self.prediction_length,
        ),
        AddAgeFeature(
            target_field=FieldName.TARGET,
            # remaining arguments assumed from the standard gluonts
            # AddAgeFeature call
            output_field=FieldName.FEAT_AGE,
            pred_length=self.prediction_length,
        ),
    ]
)
# the function name and the parameters before num_missing are inferred from
# how they are used in the body below
def write_csv_row(
    time_series,
    freq: str,
    csv_file,
    is_missing: bool,
    num_missing: int,
) -> None:
    csv_writer = csv.writer(csv_file)
    # map weekday index (MON == 0, ..., SUN == 6) to the name used for
    # anchoring weekly frequencies on the series' start weekday
    week_dict = {
        0: "MON",
        1: "TUE",
        2: "WED",
        3: "THU",
        4: "FRI",
        5: "SAT",
        6: "SUN",
    }
    for i in range(len(time_series)):
        data = time_series[i]
        timestamp = pd.Timestamp(data[FieldName.START])
        freq_week_start = freq
        if freq_week_start == "W":
            freq_week_start = f"W-{week_dict[timestamp.weekday()]}"
        timestamp = pd.Timestamp(data[FieldName.START], freq=freq_week_start)
        item_id = int(data[FieldName.ITEM_ID])
        for j, target in enumerate(data[FieldName.TARGET]):
            # convention: there are no missing values before the start date
            if is_missing and j != 0 and j % num_missing == 0:
                timestamp += 1
                continue  # skip every num_missing-th entry to simulate a missing value
            else:
                timestamp_row = timestamp
                if freq in ["W", "D", "M"]:
                    timestamp_row = timestamp.date()
                row = [item_id, timestamp_row, target]
                # Check if related time series is present
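
# A quick illustration of the weekly-frequency anchoring used above: for a
# series starting on a Tuesday the derived freq becomes "W-TUE", so a
# date_range built from it lands on the same weekday. The dates are arbitrary
# example values.
import pandas as pd

start = pd.Timestamp("2021-01-05")  # a Tuesday, weekday() == 1
week_dict = {0: "MON", 1: "TUE", 2: "WED", 3: "THU", 4: "FRI", 5: "SAT", 6: "SUN"}
anchored_freq = f"W-{week_dict[start.weekday()]}"
print(anchored_freq)                                    # W-TUE
print(pd.date_range(start, periods=3, freq=anchored_freq))
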
def create_transformation(self) -> Transformation:
    remove_field_names = [FieldName.FEAT_DYNAMIC_CAT]
    if not self.use_feat_static_real:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if not self.use_feat_dynamic_real:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)

    return Chain(
        [RemoveFields(field_names=remove_field_names)]
        + (
            [SetField(output_field=FieldName.FEAT_STATIC_CAT, value=[0.0])]
            if not self.use_feat_static_cat
            else []
        )
        + (
            [
                SetField(
                    output_field=FieldName.FEAT_STATIC_REAL, value=[0.0]
                )
            ]
            if not self.use_feat_static_real
            else []
        )
        # in the full estimator, further transformations (array conversion,
        # observed-value indicators, time/age features, instance splitting)
        # are appended to this Chain
    )