Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return self._log_p
def get_variables(self):
""" Get the theano variables associated with this model.
"""
return {'A' : self.A}
def sample(self, acc):
N = self.model['N']
A = np.random.rand(N, N) < self.rho
A = A.astype(np.int)
return {'A' : A}
class StochasticBlockGraphModel(Component):
def __init__(self, model, latent):
""" Initialize the stochastic block model for the adjacency matrix
"""
self.model = model
self.latent = latent
self.prms = model['network']['graph']
self.N = model['N']
# Get the number of latent types (R) and the latent type vector (Y)
self.type_name = self.prms['types']
self.R = self.latent[self.type_name].R
self.Y = self.latent[self.type_name].Y
# A RxR matrix of connection probabilities per pair of clusters
self.B = T.dmatrix('B')
nT,Ns = data["S"].shape
assert Ns == self.N, "ERROR: Spike train must be (TxN) " \
"dimensional where N=%d" % self.N
fS = convolve_with_basis(data["S"], ibasis)
# Flatten this manually to be safe
# (there's surely a way to do this with numpy)
(nT,Nc,B) = fS.shape
assert Nc == self.N, "ERROR: Convolution with spike train " \
"resulted in incorrect shape: %s" % str(fS.shape)
data['fS'] = fS
def set_data(self, data):
self.ir.set_value(data['fS'])
class DirichletImpulses(Component):
""" Normalized impulse response functions using a Dirichlet prior.
Here we separate out the parameters of each impulse response
so that they may be sampled separately.
"""
def __init__(self, model):
self.model = model
self.imp_model = model['impulse']
# Number of presynaptic neurons
self.N = model['N']
# Get parameters of the prior
self.alpha = self.imp_model['alpha']
# Create a basis for the impulse responses response
self.basis = create_basis(self.imp_model['basis'])
def set_hyperparameters(self, model):
""" Set the hyperparameters of the model
"""
# TODO Fix me
self.mu.set_value(model['mu'])
self.sigma.set_value(model['sigma'])
def sample(self, acc):
""" Sample from the prior
"""
# TODO Use size
v = self.mu.get_value() + self.sigma.get_value() * np.random.randn(self.D)
return {str(self.value): v}
class GroupLasso(Component):
""" Wrapper for a scalar random variable with a Normal distribution
"""
def __init__(self, model, name='gaussian'):
self.prms = model
self.lam = theano.shared(name='lam', value=self.prms['lam'])
self.mu = theano.shared(name='mu', value=self.prms['mu'])
self.sigma = theano.shared(name='sigma', value=self.prms['sigma'])
def log_p(self, value):
""" Compute log prob of the given value under this prior
Value should be NxB where N is the number of groups and
B is the number of parameters per group.
"""
return -1.0*self.lam * T.sum(T.sqrt(T.sum(((value-self.mu)/self.sigma)**2, axis=1)))
type = model['bkgd']['type'].lower()
if type == 'no_stimulus' or \
type == 'none' or \
type == 'nostimulus':
bkgd = NoStimulus(model)
elif type == 'basis':
bkgd = BasisStimulus(model)
elif type == 'spatiotemporal':
bkgd = SpatiotemporalStimulus(model)
elif type == 'sharedtuningcurve':
bkgd = SharedTuningCurveStimulus(model, glm, latent)
else:
raise Exception("Unrecognized backgound model: %s" % type)
return bkgd
class NoStimulus(Component):
""" No stimulus dependence. Constant biases are handled by the
bias component.
"""
def __init__(self, model):
""" No stimulus.
"""
# Log probability
self.log_p = T.constant(0.0)
# Due a theano quirk, I_stim cannot directly be a constant
self.stim = T.constant(0.0)
# Expose outputs to the Glm class
self.I_stim = T.dot(T.constant(1.0), self.stim)
class BasisStimulus(Component):
lp += -np.Inf * T.lt(value, self.min)
lp += -np.Inf * T.gt(value, self.max)
lp += self.p[value]
return lp
def get_variables(self):
return {}
def sample(self, acc, size=(1,)):
""" Sample from the prior
"""
v = np.random.choice(self.p, size=size)
return v
class JointCategorical(Component):
"""
Wrapper for a discrete random variable from a product distribution of
two categorical distributions.
"""
def __init__(self, model):
self.prms = model
self.p0 = theano.shared(name='p0', value=self.prms['p0'])
self.p1 = theano.shared(name='p1', value=self.prms['p1'])
self.min0 = self.prms['min0']
self.max0 = self.prms['max0']
self.min1 = self.prms['min1']
self.max1 = self.prms['max1']
def ravel_index(self, value):
return (self.max1-self.min1+1)*(value[0]-self.min0) + (value[1]-self.min1)
return JointCategorical(model, **kwargs)
elif typ == 'spherical_gaussian':
return SphericalGaussian(model, **kwargs)
elif typ == 'group_lasso' or \
typ == 'grouplasso':
return GroupLasso(model, **kwargs)
elif typ == 'dpp':
return DeterminenalPointProcess(model, **kwargs)
elif typ == 'dirichlet':
return Dirichlet(model)
else:
raise Exception("Unrecognized prior type: %s" % typ)
class Categorical(Component):
""" Wrapper for a discrete random variable from a Categorical distribution
"""
def __init__(self, model):
self.prms = model
self.p = theano.shared(name='p', value=self.prms['p'])
self.min = self.prms['min']
self.max = self.prms['max']
def ravel_index(self,value):
return value - self.min
def log_p(self, value):
""" Compute log prob of the given value under this prior
"""
lp = T.constant(0.)
lp += -np.Inf * T.lt(value, self.min)
B = np.random.beta(self.b0, self.b1, (self.R, self.R))
# We need a sample of Y in order to evaluate pA!
Y = acc['latent'][self.type_name]['Y']
pA = self.pA.eval({self.B: B})
A = np.random.rand(N, N) < pA
A = A.astype(np.int8)
return {str(self.A): A,
str(self.B): B}
def get_state(self):
return {str(self.A): self.A,
str(self.B): self.B}
class LatentDistanceGraphModel(Component):
def __init__(self, model, latent):
""" Initialize the stochastic block model for the adjacency matrix
"""
self.model = model
self.prms = model['network']['graph']
self.N = model['N']
self.N_dims = self.prms['N_dims']
# Get the latent location
self.location = latent[self.prms['locations']]
self.Lm = self.location.Lm
# self.location_prior = create_prior(self.prms['location_prior'])
#
# # Latent distance model has NxR matrix of locations L
# self.L = T.dvector('L')
# self.Lm = T.reshape(self.L, (self.N, self.N_dims))
def set_hyperparameters(self, model):
""" Set the hyperparameters of the model
"""
self.mu.set_value(model['mu'])
self.sigma.set_value(model['sigma'])
def sample(self, acc, size=(1,)):
""" Sample from the prior
"""
v = self.mu.get_value() + self.sigma.get_value() * np.random.randn(*size)
return v
class SphericalGaussian(Component):
""" Wrapper for a vector random variable with a spherical distribution
"""
def __init__(self, model, name='spherical_gaussian', D=1):
self.prms = model
self.D = D
self.value = T.dvector(name=name)
self.mu = theano.shared(name='mu', value=self.prms['mu'])
self.sigma = theano.shared(name='sigma', value=self.prms['sigma'])
self.log_p = -0.5/self.sigma**2 *T.sum((self.value-self.mu)**2)
def get_variables(self):
return {str(self.value): self.value}
def set_hyperparameters(self, model):
""" Set the hyperparameters of the model
"""
graph = LatentDistanceGraphModel(model, latent)
else:
raise Exception("Unrecognized graph model: %s" % type)
return graph
def create_kayak_graph_component(model, latent):
type = model['network']['graph']['type'].lower()
if type == 'complete':
graph = KayakCompleteGraphModel(model)
elif type == 'erdos_renyi' or \
type == 'erdosrenyi':
graph = KayakErdosRenyiGraphModel(model)
return graph
class _GraphModelBase(Component):
@property
def A(self):
raise NotImplementedError()
def get_state(self):
return {'A': self.A}
class TheanoCompleteGraphModel(_GraphModelBase):
def __init__(self, model):
""" Initialize the filtered stim model
"""
self.model = model
N = model['N']
# Define complete adjacency matrix
self._A = T.ones((N, N))
""" Initialize the component with the parameters from the given model,
the vector of symbolic variables, vars, and the offset into that vector, offset.
"""
self._log_p = T.constant(0.)
@property
def log_p(self):
return self._log_p
def nlin(self, x):
return T.log(1.0+T.exp(x))
def f_nlin(self, x):
return np.log(1.0 + np.exp(x))
class KayakExpLinearNonlinearity(Component):
""" Exponential nonlinearity (\lambda=e^x) for x<0,
Linear (\lambda=1+x) for x>0.
This is nice because it satisfies a Lipschitz bound of 1.
"""
def __init__(self, model):
""" Initialize the component with the parameters from the given model,
the vector of symbolic variables, vars, and the offset into that vector, offset.
"""
self._log_p = kyk.Constant(0.)
@property
def log_p(self):
return self._log_p
def nlin(self, x):