Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from gluonts.distribution import Uniform
# Shared fixtures: one forecast object per forecast class, keyed by class name.

# Quantile levels 0.01, 0.02, ..., 0.99 (99 values).
QUANTILES = np.arange(1, 100) / 100
# Values 0.00, 0.01, ..., 1.00 arranged as a (101, 1) array
# (presumably 101 sample paths of a single time step -- confirm against SampleForecast).
SAMPLES = np.arange(101).reshape(101, 1) / 100
# Every fixture forecast starts at 2017-01-01 12:00 with a daily frequency.
START_DATE = pd.Timestamp(2017, 1, 1, 12)
FREQ = "1D"
FORECASTS = {
# The forecast values are the quantile levels themselves, as a (99, 1) column;
# the keys are the string forms of those same levels.
"QuantileForecast": QuantileForecast(
forecast_arrays=QUANTILES.reshape(-1, 1),
start_date=START_DATE,
forecast_keys=np.array(QUANTILES, str),
freq=FREQ,
),
# Sample-based forecast built from the evenly spaced SAMPLES grid.
"SampleForecast": SampleForecast(
samples=SAMPLES, start_date=START_DATE, freq=FREQ
),
# Distribution-based forecast: Uniform with low = zeros(1) and high = ones(1).
"DistributionForecast": DistributionForecast(
distribution=Uniform(low=mx.nd.zeros(1), high=mx.nd.ones(1)),
start_date=START_DATE,
freq=FREQ,
),
}
@pytest.mark.parametrize("name", FORECASTS.keys())
def test_Forecast(name):
forecast = FORECASTS[name]
def percentile(value):
    """Format a quantile level as a percentile label, e.g. ``0.05 -> "p05"``.

    The level is multiplied by 100 and rounded to the nearest integer, then
    zero-padded to at least two digits and prefixed with ``p``.
    """
    whole_percent = int(round(value * 100))
    return "p{:02d}".format(whole_percent)
def fcst_iterator(fcst, start_dates, freq):
    """
    Lazily wrap raw sample paths into ``SampleForecast`` objects.

    :param fcst: list of numpy arrays with the sample paths
    :param start_dates: sequence indexed in step with ``fcst``; entry ``k``
        is used as the start date of the ``k``-th forecast
    :param freq: frequency passed unchanged to every forecast
    :return: generator yielding one ``SampleForecast`` per entry of ``fcst``
    """
    for position, sample_paths in enumerate(fcst):
        yield SampleForecast(
            samples=sample_paths,
            start_date=start_dates[position],
            freq=freq,
        )
def predict_item(self, item: DataEntry) -> SampleForecast:
    """Draw Gaussian sample paths around the mean of the observed target.

    The mean and standard deviation are computed with NaNs ignored, over
    the last ``self.context_length`` target values when that attribute is
    set and over the whole target otherwise. Standard-normal noise of shape
    ``self.shape`` is then scaled by the standard deviation and shifted by
    the mean.

    :param item: data entry providing at least ``"target"`` and ``"start"``
    :return: a ``SampleForecast`` whose start date is ``item["start"]``
        advanced by the full target length via ``frequency_add``
    """
    history = item["target"]
    window = (
        history
        if self.context_length is None
        else history[-self.context_length :]
    )

    # NaN-aware statistics so that missing observations do not poison them.
    loc = np.nanmean(window)
    scale = np.nanstd(window)
    noise = np.random.standard_normal(self.shape)

    return SampleForecast(
        samples=scale * noise + loc,
        # Offset by the full series length, not just the context window.
        start_date=frequency_add(item["start"], len(history)),
        freq=self.freq,
        item_id=item.get(FieldName.ITEM_ID),
    )
def predict_item(self, item: DataEntry) -> Forecast:
    """Forecast by repeating the last ``prediction_length`` target values.

    The tail of the target is given a leading axis and broadcast to
    ``(self.num_samples, self.prediction_length)``, so every sample path is
    the same copy of the most recent observations.

    :param item: data entry providing at least ``"target"`` and ``"start"``
    :return: a ``SampleForecast`` holding the replicated tail
    """
    horizon = self.prediction_length
    tail = item["target"][-horizon:]

    # broadcast_to returns a read-only view; no per-sample copy is made.
    replicated = np.broadcast_to(
        np.expand_dims(tail, 0),
        (self.num_samples, horizon),
    )

    # NOTE(review): the forecast is stamped with the start of the *input*
    # series rather than a date after its end -- confirm this is intended.
    return SampleForecast(
        samples=replicated,
        start_date=item["start"],
        freq=self.freq,
        item_id=item.get(FieldName.ITEM_ID),
    )
forecast_start = (
pd.Timestamp(data["start"], freq=self.freq)
+ data["target"].shape[0]
)
samples = np.array(forecast_dict["samples"])
expected_shape = (params["num_samples"], self.prediction_length)
assert (
samples.shape == expected_shape
), f"Expected shape {expected_shape} but found {samples.shape}"
info = (
{"console_output": "\n".join(console_output)}
if save_info
else None
)
yield SampleForecast(
samples, forecast_start, forecast_start.freqstr, info=info
)
def predict(
    self, dataset: Dataset, num_samples: int = 100, **kwargs
) -> Iterator[SampleForecast]:
    """Yield one sample-based forecast per entry of ``dataset``.

    A copy of ``self.prophet_params`` is made with ``uncertainty_samples``
    overridden by ``num_samples``; the stored parameters are not modified.
    Each entry is converted with ``_make_prophet_data_entry`` and passed to
    ``_run_prophet`` together with those parameters.

    :param dataset: iterable of data entries to forecast
    :param num_samples: value used for the ``uncertainty_samples`` parameter
    :param kwargs: accepted but not used by this method
    :return: generator of ``SampleForecast`` objects, in dataset order
    """
    run_params = {
        **self.prophet_params,
        "uncertainty_samples": num_samples,
    }

    for raw_entry in dataset:
        prophet_entry = self._make_prophet_data_entry(raw_entry)
        yield SampleForecast(
            samples=self._run_prophet(prophet_entry, run_params),
            start_date=prophet_entry.forecast_start,
            freq=self.freq,
        )
len_ts >= 1
), "all time series should have at least one data point"
if len_ts >= self.season_length:
indices = [
len_ts - self.season_length + k % self.season_length
for k in range(self.prediction_length)
]
samples = target[indices].reshape((1, self.prediction_length))
else:
samples = np.full(
shape=(1, self.prediction_length), fill_value=target.mean()
)
forecast_time = start_time + len_ts * start_time.freq
return SampleForecast(samples, forecast_time, start_time.freqstr)