Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def emission_coeff(self, seasonal_indicators: Tensor) -> Tensor:
    """
    One-hot encode the seasonal indicators along a new latent axis.

    Parameters
    ----------
    seasonal_indicators : Tensor
        Indices to encode (presumably integer season indices -- confirm
        against the caller).

    Returns
    -------
    Tensor
        One-hot encoding of `seasonal_indicators` whose depth is
        `self.latent_dim()`.
    """
    F = getF(seasonal_indicators)
    one_hot = F.one_hot(seasonal_indicators, depth=self.latent_dim())
    return one_hot
def s(alpha: Tensor) -> Tensor:
    """
    Draw Gamma samples parameterised by `alpha` and normalise them.

    Each sample is divided by the sum of the samples over the last axis,
    so the returned values add up to one along that axis (this looks like
    the usual Gamma-based way of drawing from a Dirichlet -- confirm with
    the enclosing class).

    Parameters
    ----------
    alpha : Tensor
        `alpha` argument forwarded to `F.sample_gamma`; `beta` is fixed to
        a tensor of ones with the same shape.

    Returns
    -------
    Tensor
        The normalised samples, same shape as the Gamma draws.
    """
    F = getF(alpha)
    unit_beta = F.ones_like(alpha)
    # NOTE: `dtype` is not a parameter; it is read from the enclosing scope.
    gamma_draws = F.sample_gamma(alpha=alpha, beta=unit_beta, dtype=dtype)
    # keepdims=True keeps the reduced axis so the division broadcasts.
    normaliser = F.sum(gamma_draws, axis=-1, keepdims=True)
    return F.broadcast_div(gamma_draws, normaliser)
The distribution is obtained by unrolling the network with the true
target; this is also the distribution that is being minimized during
training. This can be used in anomaly detection, see for instance
examples/anomaly_detection.py.
Input arguments are the same as for the hybrid_forward method.
Returns
-------
Distribution
a distribution object whose mean has shape:
(batch_size, context_length + prediction_length).
"""
# unroll the decoder in "training mode"
# i.e. by providing future data as well
F = getF(feat_static_cat)
rnn_outputs, _, scale, _ = self.unroll_encoder(
F=F,
feat_static_cat=feat_static_cat,
feat_static_real=feat_static_real,
past_time_feat=past_time_feat,
past_target=past_target,
past_observed_values=past_observed_values,
future_time_feat=future_time_feat,
future_target=future_target,
)
distr_args = self.proj_distr_args(rnn_outputs)
return self.distr_output.distribution(distr_args, scale=scale)
F=None,
) -> None:
"""
Parameters
----------
amplitude : Tensor
Periodic kernel amplitude hyper-parameter of shape (batch_size, 1, 1).
length_scale : Tensor
Periodic kernel length scale hyper-parameter of shape (batch_size, 1, 1).
frequency : Tensor
Periodic kernel hyper-parameter of shape (batch_size, 1, 1).
F : ModuleType
A module that can either refer to the Symbol API or the NDArray
API in MXNet.
"""
self.F = F if F else getF(amplitude)
self.amplitude = amplitude
self.length_scale = length_scale
self.frequency = frequency
def s(mu: Tensor, D: Tensor, W: Tensor) -> Tensor:
F = getF(mu)
samples_D = F.sample_normal(
mu=F.zeros_like(mu), sigma=F.ones_like(mu), dtype=dtype
)
cov_D = D.sqrt() * samples_D
# dummy only use to get the shape (..., rank, 1)
dummy_tensor = F.linalg_gemm2(
W, mu.expand_dims(axis=-1), transpose_a=True
).squeeze(axis=-1)
samples_W = F.sample_normal(
mu=F.zeros_like(dummy_tensor),
sigma=F.ones_like(dummy_tensor),
dtype=dtype,
)
prediction_length is None or prediction_length > 0
), "The value of `prediction_length` should be > 0"
assert (
context_length is None or context_length > 0
), "The value of `context_length` should be > 0"
assert (
num_samples is None or num_samples > 0
), "The value of `num_samples` should be > 0"
self.sigma = sigma
self.kernel = kernel
self.prediction_length = prediction_length
self.context_length = (
context_length if context_length is not None else prediction_length
)
self.num_samples = num_samples
self.F = F if F else getF(sigma)
self.ctx = ctx
self.float_type = float_type
self.jitter_method = jitter_method
self.max_iter_jitter = max_iter_jitter
self.neg_tol = neg_tol
self.diag_weight = diag_weight
self.increase_jitter = increase_jitter
self.sample_noise = sample_noise
def __init__(
    self, dim: int, rank: int, mu: Tensor, D: Tensor, W: Tensor
) -> None:
    """
    Store the distribution parameters on the instance.

    Parameters
    ----------
    dim : int
        Stored as `self.dim` (presumably the event dimension -- confirm).
    rank : int
        Stored as `self.rank` (presumably the rank of `W` -- confirm).
    mu : Tensor
        Stored as `self.mu`; also used to select the API module `self.F`.
    D : Tensor
        Stored as `self.D`.
    W : Tensor
        Stored as `self.W`.
    """
    self.dim, self.rank = dim, rank
    self.mu, self.D, self.W = mu, D, W
    # Resolve the MXNet API module (Symbol or NDArray) from `mu`.
    self.F = getF(mu)
    # `Cov` starts out unset.
    self.Cov = None
def crps(self, y: Tensor) -> Tensor:
    """
    Compute the CRPS of `y` under this transformed distribution.

    Only pure scaling transformations are supported: every entry of
    `self.transforms` must be an `AffineTransformation` with a `scale`
    and no `loc`. The observation is mapped back through the inverse of
    each transformation (last applied first), the base distribution's
    CRPS is evaluated at that point, and the result is multiplied by the
    product of all scales.

    Parameters
    ----------
    y : Tensor
        Observations in the transformed (output) space.

    Returns
    -------
    Tensor
        CRPS values: the base distribution's CRPS multiplied by the
        combined scale. With no transformations the base CRPS is
        returned unchanged.

    Raises
    ------
    AssertionError
        If a transformation is not an `AffineTransformation`, or it is
        not a pure scaling (missing `scale` or non-None `loc`).
    """
    # TODO: use event_shape
    F = getF(y)

    # Start from the observation itself so that an empty transform list is
    # handled, and so that every inverse is applied to the output of the
    # previous one rather than to the original `y`.
    x = y
    scale = None

    for t in self.transforms[::-1]:
        assert isinstance(
            t, AffineTransformation
        ), "Not an AffineTransformation"
        assert (
            t.scale is not None and t.loc is None
        ), "Not a scaling transformation"

        x = t.f_inv(x)
        # Accumulate the overall scaling factor across all transforms.
        scale = t.scale if scale is None else F.broadcast_mul(scale, t.scale)

    # (..., 1)
    p = self.base_distribution.crps(x)

    if scale is None:
        # No transformation was applied: nothing to rescale.
        return p
    return F.broadcast_mul(p, scale)
def emission_coeff(
    self, seasonal_indicators: Tensor  # (batch_size, time_length)
) -> Tensor:
    """
    Return an all-ones emission coefficient broadcast to the target shape.

    The values of `seasonal_indicators` are not used; only its shape
    matters, since it is the template the ones tensor is broadcast to.

    Parameters
    ----------
    seasonal_indicators : Tensor
        Tensor whose leading dimensions determine the output shape.

    Returns
    -------
    Tensor
        Tensor of ones, intended to have shape
        (batch_size, seq_length, obs_dim, latent_dim).
    """
    F = getF(seasonal_indicators)

    ones = F.ones(shape=(1, 1, 1, self.latent_dim()))

    # Take the first entry along the last axis and drop that axis; only the
    # resulting shape is of interest.
    first_entry = seasonal_indicators.slice_axis(axis=-1, begin=0, end=1)
    shape_source = F.zeros_like(first_entry.squeeze(axis=-1))

    # get the right shape: (batch_size, seq_length, obs_dim, latent_dim)
    template = _broadcast_param(
        shape_source,
        axes=[2, 3],
        sizes=[1, self.latent_dim()],
    )

    return ones.broadcast_like(template)
def get_issm_coeff(
self, seasonal_indicators: Tensor # (batch_size, time_length)
) -> Tuple[Tensor, Tensor, Tensor]:
F = getF(seasonal_indicators)
emission_coeff_ls, transition_coeff_ls, innovation_coeff_ls = zip(
self.nonseasonal_issm.get_issm_coeff(seasonal_indicators),
*[
issm.get_issm_coeff(
seasonal_indicators.slice_axis(
axis=-1, begin=ix, end=ix + 1
)
)
for ix, issm in enumerate(self.seasonal_issms)
],
)
# stack emission and innovation coefficients
emission_coeff = F.concat(*emission_coeff_ls, dim=-1)
innovation_coeff = F.concat(*innovation_coeff_ls, dim=-1)