context_length = (
int(context_length_frac * history_length)
if context_length_frac is not None
else None
)
predictor = NPTSPredictor(
prediction_length=pred_length,
context_length=context_length,
freq=freq,
kernel_type=KernelType.exponential,
feature_scale=feature_scale,
use_seasonal_model=use_seasonal_model,
)
dataset = ListDataset(
[{"start": str(train_ts.index[0]), "target": train_ts.values}],
freq=freq,
)
# validate that the predictor works with targets with NaNs
_test_nans_in_target(predictor, dataset)
forecast = next(predictor.predict(dataset, num_samples=2000))
train_targets = (
train_ts.values
if context_length is None
else train_ts.values[-min(history_length, context_length) :]
)
targets_outside_context = (
    None
    if context_length is None
    # completion sketch: the source snippet is truncated here; assumed to be
    # the complement of the context window selected above
    else train_ts.values[: -min(history_length, context_length)]
)
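# The NPTS forecast above is sample-based, so a cheap sanity check on its
# shape is (a sketch: one row per sample path, one column per forecast step):
assert forecast.samples.shape == (2000, pred_length)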
def test_feat_dynamic_real_bad_size():
params = dict(freq="1D", prediction_length=3, prophet_params={})
dataset = ListDataset(
data_iter=[
{
"start": "2017-01-01",
"target": np.array([1.0, 2.0, 3.0, 4.0]),
"feat_dynamic_real": np.array(
[
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
]
),
}
],
freq=params["freq"],
)
with pytest.raises(AssertionError) as excinfo:
    # completion sketch: predicting with 6 feature values where
    # len(target) + prediction_length = 7 are required should trip the
    # predictor's size assertion
    predictor = ProphetPredictor(**params)
    list(predictor.predict(dataset))
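# For contrast, a correctly sized dataset carries
# len(target) + prediction_length = 4 + 3 = 7 feature values per row
# (a sketch; this one should pass the predictor's size assertion):
good_dataset = ListDataset(
    data_iter=[
        {
            "start": "2017-01-01",
            "target": np.array([1.0, 2.0, 3.0, 4.0]),
            "feat_dynamic_real": np.array(
                [
                    [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
                    [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
                ]
            ),
        }
    ],
    freq=params["freq"],
)
# list(ProphetPredictor(**params).predict(good_dataset))  # needs prophet installed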
import numpy as np

import gluonts.dataset.common
from gluonts.dataset.field_names import FieldName

target = np.array(
    [
        # ... (leading values truncated in the source snippet)
        90,
        100,
    ]
).tolist()
np.random.shuffle(target)
multi_dim_target = np.array([target, target]).transpose()
past_is_pad = np.array([[0] * len(target)]).transpose()
past_observed_target = np.array(
[[1] * len(target), [1] * len(target)]
).transpose()
ds = gluonts.dataset.common.ListDataset(
# Mimic output from InstanceSplitter
data_iter=[
{
"start": "2012-01-01",
"target": multi_dim_target,
"past_target": multi_dim_target,
"future_target": multi_dim_target,
"past_is_pad": past_is_pad,
f"past_{FieldName.OBSERVED_VALUES}": past_observed_target,
}
],
freq="1D",
one_dim_target=False,
)
return ds
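# To inspect the processed entry, iterate the dataset (a sketch; obtain `ds`
# by calling the function above):
# entry = next(iter(ds))
# # with one_dim_target=False the target stays 2-D
# print(entry["start"], entry["target"].shape)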
split_dataset = np.split(
grouped_data[FieldName.TARGET], self.num_test_dates
)
all_entries = list()
for dataset_at_test_date in split_dataset:
grouped_data = dict()
grouped_data[FieldName.TARGET] = np.array(
list(dataset_at_test_date), dtype=np.float32
)
grouped_data = self._restrict_max_dimensionality(grouped_data)
grouped_data[FieldName.START] = self.first_timestamp
grouped_data[FieldName.FEAT_STATIC_CAT] = [0]
all_entries.append(grouped_data)
return ListDataset(
all_entries, freq=self.frequency, one_dim_target=False
)
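# Each entry built above covers one test date, so the grouped test dataset
# has exactly num_test_dates elements (a sketch; `grouper` and `dataset`
# are hypothetical names):
# test_ds = grouper._prepare_test_data(dataset)
# assert len(list(test_ds)) == grouper.num_test_dates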
def generate(self) -> TrainDatasets:
return TrainDatasets(
metadata=self.metadata,
train=ListDataset(self.train, self.freq),
test=ListDataset(self.test, self.freq),
)
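# Typical consumption of the generated splits (a sketch; `gen` is an
# instance of the surrounding generator class):
# datasets = gen.generate()
# train_entry = next(iter(datasets.train))
# test_entry = next(iter(datasets.test))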
def constant_dataset() -> Tuple[DatasetInfo, Dataset, Dataset]:
metadata = MetaData(
freq="1H",
feat_static_cat=[
CategoricalFeatureInfo(
name="feat_static_cat_000", cardinality="10"
)
],
feat_static_real=[BasicFeatureInfo(name="feat_static_real_000")],
)
start_date = "2000-01-01 00:00:00"
train_ds = ListDataset(
data_iter=[
{
FieldName.ITEM_ID: str(i),
FieldName.START: start_date,
FieldName.TARGET: [float(i)] * 24,
FieldName.FEAT_STATIC_CAT: [i],
FieldName.FEAT_STATIC_REAL: [float(i)],
}
for i in range(10)
],
freq=metadata.freq,
)
test_ds = ListDataset(
    data_iter=[
        {
            # completion sketch: the source snippet is truncated here; the
            # test entries are assumed to mirror the train entries above
            FieldName.ITEM_ID: str(i),
            FieldName.START: start_date,
            FieldName.TARGET: [float(i)] * 24,
            FieldName.FEAT_STATIC_CAT: [i],
            FieldName.FEAT_STATIC_REAL: [float(i)],
        }
        for i in range(10)
    ],
    freq=metadata.freq,
)
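# Sanity check of the constant pattern: series i is constant at float(i)
# (a sketch):
# first = next(iter(train_ds))
# assert len(first["target"]) == 24 and first["target"][0] == 0.0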
full_length_data = take_as_list(data_it, self.num_timeseries)
test_data = [
RecipeDataset.trim_ts_item_front(
x, self.trim_length_fun(x, train_length=self.max_train_length)
)
for x in full_length_data
]
train_data = [
RecipeDataset.trim_ts_item_end(x, self.prediction_length)
for x in test_data
]
return TrainDatasets(
metadata=metadata,
train=ListDataset(train_data, metadata.freq),
test=ListDataset(test_data, metadata.freq),
)
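# Invariant behind the two splits above: every train series is its test
# counterpart with the last prediction_length points trimmed off (sketch):
# for tr, te in zip(train_data, test_data):
#     assert len(tr["target"]) == len(te["target"]) - self.prediction_length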
def _prepare_train_data(self, dataset: Dataset) -> ListDataset:
logging.info("group training time-series to datasets")
grouped_data = self._transform_target(self._align_data_entry, dataset)
grouped_data = self._restrict_max_dimensionality(grouped_data)
grouped_data[FieldName.START] = self.first_timestamp
grouped_data[FieldName.FEAT_STATIC_CAT] = [0]
return ListDataset(
[grouped_data], freq=self.frequency, one_dim_target=False
)
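# The grouper stacks every input series into a single entry with a 2-D
# target of shape (num_series, num_timesteps); a sketch of the invariant
# (`grouper` and `dataset` are hypothetical names):
# train_ds = grouper._prepare_train_data(dataset)
# entry = next(iter(train_ds))
# assert entry["target"].ndim == 2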
def invocations() -> Response:
request_data = request.data.decode("utf8").strip()
instances = list(map(json.loads, request_data.splitlines()))
predictions = []
# take this as the initial start time: the first forecast is produced when
# the predict generator is first advanced, i.e. before the loop body runs
start = time.time()
forecast_iter = predictor.predict(
ListDataset(instances, predictor.freq),
num_samples=configuration.num_samples,
)
for forecast in forecast_iter:
end = time.time()
prediction = forecast.as_json_dict(configuration)
if DEBUG:
prediction["debug"] = {"timing": end - start}
predictions.append(prediction)
start = time.time()
lines = list(map(json.dumps, map(jsonify_floats, predictions)))
return Response("\n".join(lines), mimetype="application/jsonlines")
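# Client-side usage: the endpoint consumes newline-delimited JSON instances
# and returns one JSON prediction per line (a sketch; URL and payload are
# assumptions, not taken from the server code above):
if __name__ == "__main__":
    import json

    import requests

    instances = [{"start": "2020-01-01 00:00:00", "target": [1.0, 2.0, 3.0]}]
    payload = "\n".join(json.dumps(x) for x in instances)
    resp = requests.post("http://localhost:8080/invocations", data=payload)
    predictions = [json.loads(line) for line in resp.text.splitlines()]
    print(predictions)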