Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_em_algorithm_multi_dimensional_observations(self):
training_matrix = np.ones((5,10,2))
training_matrix[1,1,1] = np.nan
kf = simdkalman.KalmanFilter(
state_transition = np.eye(2),
process_noise = 0.1,
observation_model = np.array([[1,1], [0,1]]),
observation_noise = 0.1)
r = kf.em(training_matrix, n_iter=5, verbose=False)
self.assertSequenceEqual(r.process_noise.shape, (5,2,2))
A0 = r.process_noise[0,...]
self.assertMatrixEqual(A0, A0.T, epsilon=1e-8)
self.assertTrue(min(np.linalg.eig(A0)[0]) > 0)
B = r.observation_noise
self.assertSequenceEqual(B.shape, (5,2,2))
self.assertTrue(min(np.linalg.eig(B[0,...])[0]) > 0)
def test_one_dimensional(self):
training_matrix = range(10)
kf = simdkalman.KalmanFilter(
state_transition = np.eye(2),
process_noise = 0.1,
observation_model = np.array([[1,1]]),
observation_noise = 0.1)
r = kf.compute(
training_matrix,
n_test = 4,
initial_value = [0,0],
initial_covariance = 1.0,
smoothed = True,
gains = True,
log_likelihood = True)
self.assertSequenceEqual(r.predicted.observations.mean.shape, (4,))
self.assertSequenceEqual(r.smoothed.observations.mean.shape, (10,))
def test_em_algorithm(self):
training_matrix = np.ones((5,10))
kf = simdkalman.KalmanFilter(
state_transition = np.eye(2),
process_noise = 0.1,
observation_model = np.array([[1,1]]),
observation_noise = 0.1)
r = kf.em(training_matrix, n_iter=5, verbose=False)
self.assertSequenceEqual(r.process_noise.shape, (5,2,2))
A0 = r.process_noise[0,...]
self.assertMatrixEqual(A0, A0.T)
self.assertTrue(min(np.linalg.eig(A0)[0]) > 0)
B = r.observation_noise
self.assertSequenceEqual(B.shape, (5,1,1))
self.assertTrue(min(list(B)) > 0)
def test_smooth_helper_kalman_filter_2_states(self):
training_matrix = np.ones((5,10))
training_matrix[1,1] = np.nan
kf = simdkalman.KalmanFilter(
state_transition = np.eye(2),
process_noise = 0.1,
observation_model = np.array([[1,1]]),
observation_noise = 0.1)
r = kf.smooth(training_matrix)
self.assertSequenceEqual(r.observations.mean.shape, training_matrix.shape)
self.assertSequenceEqual(r.states.mean.shape, (5,10,2))
self.assertSequenceEqual(r.states.cov.shape, (5,10,2,2))
def test_train_and_predict_vectorized_kalman_filter_ema(self):
training_matrix = np.ones((5,10))
kf = simdkalman.KalmanFilter(
state_transition = 1,
process_noise = 0.1,
observation_model = 1,
observation_noise = 0.1)
r = kf.compute(
training_matrix,
n_test = 4,
initial_covariance = 1.0)
self.assertSequenceEqual(r.predicted.observations.mean.shape, (5,4))
self.assertSequenceEqual(r.smoothed.observations.mean.shape, training_matrix.shape)
self.assertSequenceEqual(r.predicted.states.mean.shape, (5,4,1))
self.assertSequenceEqual(r.smoothed.states.mean.shape, (5,10,1))
self.assertSequenceEqual(r.predicted.states.cov.shape, (5,4,1,1))
self.assertSequenceEqual(r.smoothed.states.cov.shape, (5,10,1,1))
def test_em_algorithm_3_dimensional_observations(self):
training_matrix = np.ones((5,10,3))
training_matrix[1,1,1] = np.nan
kf = simdkalman.KalmanFilter(
state_transition = np.eye(2),
process_noise = 0.1,
observation_model = np.array([[1,1], [0,1], [1,0]]),
observation_noise = 0.1)
r = kf.em(training_matrix, n_iter=5, verbose=False)
self.assertSequenceEqual(r.process_noise.shape, (5,2,2))
A0 = r.process_noise[0,...]
self.assertMatrixEqual(A0, A0.T, epsilon=1e-8)
self.assertTrue(min(np.linalg.eig(A0)[0]) > 0)
B = r.observation_noise
self.assertSequenceEqual(B.shape, (5,3,3))
self.assertTrue(min(np.linalg.eig(B[0,...])[0]) > 0)
def test_predict_helper_ema(self):
training_matrix = np.ones((5,10))
kf = simdkalman.KalmanFilter(
state_transition = 1,
process_noise = 0.1,
observation_model = 1,
observation_noise = 0.1)
r = kf.predict(training_matrix, n_test = 4)
self.assertSequenceEqual(r.observations.mean.shape, (5,4))
self.assertSequenceEqual(r.states.mean.shape, (5,4,1))
self.assertSequenceEqual(r.states.cov.shape, (5,4,1,1))
def test_vectorized(self):
mean = np.zeros((3,2,1))
mean[0,...] = np.array([[[1],[10]]]) # state 1
mean[1,...] = np.array([[[2],[20]]]) # state 2
mean[2,...] = np.array([[[3],[30]]]) # state 3
stack_mats = lambda arr: np.vstack([a[np.newaxis,...] for a in arr])
covariance = stack_mats([np.eye(2)]*3)
state_transition = stack_mats([np.eye(2)]*3)
process_noise = stack_mats([np.eye(2)]*3)*0.1
m1, P1 = primitives.predict(
mean,
covariance,
state_transition,
process_noise)
self.assertMatrixEqual(m1, mean)
self.assertSequenceEqual(P1.shape, (3,2,2))
observation_model = stack_mats([np.ones((1,2))]*3)
observation_noise = stack_mats([np.eye(1)*0.1]*3)
measurement = np.array([[[2]], [[np.nan]], [[33]]])
m, P = primitives.update_with_nan_check(
m1,
P1,
def test_predict(self):
mean = np.array([[1],[2],[3]])
covariance = np.eye(3)*2
state_transition = np.eye(3)*0.5
process_noise = np.eye(3)*0.1
m1, P1 = primitives.predict(mean, covariance, state_transition, process_noise)
self.assertSequenceEqual(m1.shape, (3,1))
self.assertSequenceEqual(P1.shape, (3,3))
# should be diagonal
self.assertTrue(np.linalg.norm(P1 - np.diag(np.diag(P1))) < 1e-6)
def test_one_dimensional(self):
mean = np.array([[1]])
covariance = np.array([[1]])
state_transition = np.array([[2]])
process_noise = np.array([[0.1]])
m1, P1 = primitives.predict(
mean,
covariance,
state_transition,
process_noise)
self.assertMatrixEqual(m1, mean*2, epsilon=1e-6)
self.assertSequenceEqual(P1.shape, (1,1))
observation_model = np.array([[1]])
observation_noise = np.array([[0.2]])
measurement = np.array([[1]])
m, P = primitives.update_with_nan_check(
m1,
P1,