Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_1d(self):
    """Compare RBF and RationalQuadratic Gram matrices on 1-D inputs.

    Both kernels are built with the same lengthscale and variance; the
    RationalQuadratic kernel additionally gets ``alpha=1e8``. The test
    passes when the two Gram matrices agree within the default tolerance
    of ``np.testing.assert_allclose``.
    """
    with self.test_context() as session:
        # Hyperparameters shared by both kernels.
        shared_hypers = dict(lengthscales=1.4, variance=2.3)
        rbf_kernel = gpflow.kernels.RBF(1, **shared_hypers)
        rq_kernel = gpflow.kernels.RationalQuadratic(
            1, alpha=1e8, **shared_hypers)

        # Fixed seed so the six input points are reproducible.
        rng = np.random.RandomState(1)
        X = tf.placeholder(gpflow.settings.float_type)
        inputs = rng.randn(6, 1).astype(gpflow.settings.float_type)

        kernels_under_test = (rbf_kernel, rq_kernel)
        for kernel in kernels_under_test:
            kernel.compile()

        # Build and evaluate each Gram matrix in turn (RBF first, then RQ).
        gram_matrices = []
        for kernel in kernels_under_test:
            gram_matrices.append(
                session.run(kernel.K(X), feed_dict={X: inputs}))

        rbf_gram, rq_gram = gram_matrices
        np.testing.assert_allclose(rbf_gram, rq_gram)
variance=1.0, bias_variance=0., active_dims=None):
gpflow.kernels.Kern.__init__(self, input_dim, active_dims)
if base_kernel is None:
# corresponds to using "Radford" scaled input to hidden weights
base_kernel = gpflow.kernels.Linear(
input_dim=input_dim, variance=variance/input_dim
) + gpflow.kernels.Constant( input_dim = input_dim, variance = bias_variance )
self.num_steps = num_steps
self.base_kernel = base_kernel
self.variance = gpflow.params.Parameter(
variance, gpflow.transforms.positive, dtype=settings.float_type)
self.bias_variance = gpflow.params.Parameter(
bias_variance, gpflow.transforms.positive, dtype=settings.float_type)
def test_naming(self):
    """Check that ParamList members are path-named by their list index.

    Each parameter placed in a ``gpflow.ParamList`` is expected to have a
    ``pathname`` equal to the list's ``name`` followed by ``/<index>``.
    """
    with self.test_context():
        scalar_param = gpflow.Param(1.2)
        vector_param = gpflow.Param(
            np.array([3.4, 5.6], settings.float_type))
        param_list = gpflow.ParamList([scalar_param, vector_param])

        expected_suffixes = ('/0', '/1')
        members = (scalar_param, vector_param)
        for member, suffix in zip(members, expected_suffixes):
            assert member.pathname == param_list.name + suffix
def student_t(x, mean, scale, df):
    """Evaluate the log-density of a Student-t distribution.

    :param x: point(s) at which to evaluate the log-density
    :param mean: location parameter
    :param scale: scale parameter (enters via ``(x - mean) / scale``)
    :param df: degrees of freedom; cast to ``settings.float_type``
    :return: tensor holding the log-density, i.e. the normalising constant
        minus ``0.5 * (df + 1) * log(1 + ((x - mean) / scale)**2 / df)``
    """
    df = tf.cast(df, settings.float_type)

    # Normalising constant:
    #   lgamma((df + 1) / 2) - lgamma(df / 2)
    #   - 0.5 * (log(scale^2) + log(df) + log(pi))
    lgamma_difference = tf.lgamma((df + 1.) * 0.5) - tf.lgamma(df * 0.5)
    log_terms = tf.log(tf.square(scale)) + tf.log(df) + np.log(np.pi)
    log_normaliser = tf.cast(
        lgamma_difference - 0.5 * log_terms, settings.float_type)

    # Data-dependent part of the log-density.
    squared_residual = tf.square((x - mean) / scale)
    log_kernel = tf.log(1. + (1. / df) * squared_residual)
    return log_normaliser - 0.5 * (df + 1.) * log_kernel
def _compute_cache(self):
    """Build the Cholesky factor and the whitened residual for the model.

    Forms ``K = kern.K(X) + likelihood.variance * I``, takes its Cholesky
    factor ``L`` and solves the triangular system ``L V = Y - mean(X)``.

    :return: tuple ``(L, V)`` of the Cholesky factor and the solve result
    """
    kernel_matrix = self.kern.K(self.X)
    # Identity sized by the leading dimension of X, scaled by the
    # likelihood variance.
    noise_term = tf.eye(
        tf.shape(self.X)[0],
        dtype=settings.float_type) * self.likelihood.variance
    noisy_kernel = kernel_matrix + noise_term

    chol_factor = tf.cholesky(noisy_kernel, name='gp_cholesky')
    residual = self.Y - self.mean_function(self.X)
    solved = tf.matrix_triangular_solve(
        chol_factor, residual, name='gp_alpha')
    return chol_factor, solved
This method computes the variational lower bound on the likelihood,
which is:
E_{q(F)} [ \log p(Y|F) ] - KL[ q(F) || p(F)]
with
q(\\mathbf f) = N(\\mathbf f \\,|\\, \\boldsymbol \\mu, \\boldsymbol \\Sigma)
"""
# Get prior KL.
KL = gauss_kl(self.q_mu, self.q_sqrt)
# Get conditionals
K = self.kern.K(self.X) + tf.eye(self.num_data, dtype=settings.float_type) * \
settings.numerics.jitter_level
L = tf.cholesky(K)
fmean = tf.matmul(L, self.q_mu) + self.mean_function(self.X) # NN,ND->ND
q_sqrt_dnn = tf.matrix_band_part(self.q_sqrt, -1, 0) # D x N x N
L_tiled = tf.tile(tf.expand_dims(L, 0), tf.stack([self.num_latent, 1, 1]))
LTA = tf.matmul(L_tiled, q_sqrt_dnn) # D x N x N
fvar = tf.reduce_sum(tf.square(LTA), 2)
fvar = tf.transpose(fvar)
# Get variational expectations.
var_exp = self.likelihood.variational_expectations(fmean, fvar, self.Y)
def log_jacobian_tensor(self, x):
    """Return a zero tensor of shape ``(1,)`` as the log-Jacobian.

    :param x: input tensor; not used by this implementation
    :return: ``tf.zeros`` tensor of shape ``(1,)`` and dtype
        ``settings.float_type``
    """
    zero_log_jacobian = tf.zeros(shape=(1,), dtype=settings.float_type)
    return zero_log_jacobian
import tensorflow as tf
from .. import kernels
from .. import settings
from ..decors import params_as_tensors, autoflow
from ..dispatch import dispatch
from ..features import InducingPoints, InducingFeature
from ..features import Kuu, Kuf
from ..kernels import Kernel, Combination
from ..params import Parameter
from .features import SeparateIndependentMof, SharedIndependentMof, MixedKernelSharedMof
float_type = settings.float_type
# TODO: MultiOutputKernels have a different method signature for K and Kdiag (they take full_cov_output).
# This needs better documentation, especially as the default there is *True*, not False as for full_cov.
# TODO: What are the dimensions of MultiOutputKernel.K, Kuu(), Kuf()?
# Do we need MultiOutputInducingPoints as a special case,
# or can we write it in terms of the regular InducingPoints?
class Mok(Kernel):
"""
Multi Output Kernel class.
Subclasses of Mok should implement K which returns:
- N x P x N x P if full_cov_output = True
- N x N x P if full_cov_output = False
diag_symm = True
slices = list((slice(j, j+n_max), slice(i, i+n_max))
for j in range(0, N, n_max)
for i in range(j, N2, n_max))
else:
diag_symm = False
slices = list((slice(j, j+n_max), slice(i, i+n_max))
for j in range(0, N, n_max)
for i in range(0, N2, n_max))
# Make the required kernel ops and placeholders for each GPU
K_ops = []
for i in range(n_gpus):
with tf.device("gpu:{}".format(i)):
X_ph = tf.placeholder(settings.float_type, [None, X.shape[1]], "X_ph")
X2_ph = tf.placeholder(settings.float_type, X_ph.shape, "X2_ph")
K_cross = kern.K(X_ph, X2_ph)
if diag_symm:
K_symm = kern.K(X_ph, None)
else:
K_symm = None
K_ops.append((X_ph, X2_ph, K_cross, K_symm))
# Execute on all GPUs concurrently
out = np.zeros((N, N2), dtype=settings.float_type)
for j in tqdm.trange(0, len(slices), n_gpus):
feed_dict = {}
ops = []
for (X_ph, X2_ph, K_cross, K_symm), (j_s, i_s) in (
zip(K_ops, slices[j:j+n_gpus])):
if j_s == i_s and diag_symm:
feed_dict[X_ph] = X[j_s]
inner_dtype = self.dtype
if dtype is not None and inner_dtype != dtype:
msg = 'Overriding parameter\'s type "{0}" with "{1}" is not possible.'
raise ValueError(msg.format(inner_dtype, dtype))
elif isinstance(value, np.ndarray) and inner_dtype != value.dtype:
msg = 'The value has different data type "{0}". Parameter type is "{1}".'
raise ValueError(msg.format(value.dtype, inner_dtype))
cast = False
dtype = inner_dtype
if misc.is_number(value):
value_type = np.result_type(value).type
num_type = misc.normalize_num_type(value_type)
dtype = num_type if dtype is None else dtype
value = np.array(value, dtype=dtype)
elif misc.is_list(value):
dtype = settings.float_type if dtype is None else dtype
value = np.array(value, dtype=dtype)
elif cast:
value = value.astype(dtype)
if shape is not None and self.fixed_shape and is_built and shape != value.shape:
msg = 'Value has different shape. Parameter shape {0}, value shape {1}.'
raise ValueError(msg.format(shape, value.shape))
return value