Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Check if the layer is operating in batch mode (X and A have rank 3)
mode = ops.autodetect_mode(A, X)
self.reduce_loss = mode in (ops.modes['M'], ops.modes['B'])
# Get normalized adjacency
if K.is_sparse(A):
I_ = tf.sparse.eye(N, dtype=A.dtype)
A_ = tf.sparse.add(A, I_)
else:
I_ = tf.eye(N, dtype=A.dtype)
A_ = A + I_
fltr = ops.normalize_A(A_)
# Node embeddings
Z = K.dot(X, self.kernel_emb)
Z = ops.filter_dot(fltr, Z)
if self.activation is not None:
Z = self.activation(Z)
# Compute cluster assignment matrix
S = K.dot(X, self.kernel_pool)
S = ops.filter_dot(fltr, S)
S = activations.softmax(S, axis=-1) # softmax applied row-wise
# Link prediction loss
S_gram = ops.matmul_A_BT(S, S)
if K.is_sparse(A):
LP_loss = tf.sparse.add(A, -S_gram) # A/tf.norm(A) - S_gram/tf.norm(S_gram)
else:
LP_loss = A - S_gram
LP_loss = tf.norm(LP_loss, axis=(-1, -2))
if self.reduce_loss:
I_ = tf.sparse.eye(N, dtype=A.dtype)
A_ = tf.sparse.add(A, I_)
else:
I_ = tf.eye(N, dtype=A.dtype)
A_ = A + I_
fltr = ops.normalize_A(A_)
# Node embeddings
Z = K.dot(X, self.kernel_emb)
Z = ops.filter_dot(fltr, Z)
if self.activation is not None:
Z = self.activation(Z)
# Compute cluster assignment matrix
S = K.dot(X, self.kernel_pool)
S = ops.filter_dot(fltr, S)
S = activations.softmax(S, axis=-1) # softmax applied row-wise
# Link prediction loss
S_gram = ops.matmul_A_BT(S, S)
if K.is_sparse(A):
LP_loss = tf.sparse.add(A, -S_gram) # A/tf.norm(A) - S_gram/tf.norm(S_gram)
else:
LP_loss = A - S_gram
LP_loss = tf.norm(LP_loss, axis=(-1, -2))
if self.reduce_loss:
LP_loss = K.mean(LP_loss)
self.add_loss(LP_loss)
# Entropy loss
entr = tf.negative(tf.reduce_sum(tf.multiply(S, K.log(S + K.epsilon())), axis=-1))
entr_loss = K.mean(entr, axis=-1)
bias_constraint=bias_constraint)
else:
# When using shared weights, use the pre-computed ones.
if recurrent_t is None:
raise ValueError('recurrent_k and recurrent_t must be set together.')
if recurrent_t == 0:
kernel_1, kernel_2, bias = self.kernels_in[recurrent_k]
else:
kernel_1, kernel_2, bias = self.kernels_hid[recurrent_k]
features = x[0]
features_skip = x[1]
fltr = x[2]
# Convolution
output = K.dot(features, kernel_1)
output = filter_dot(fltr, output)
# Skip connection
skip = K.dot(features_skip, kernel_2)
skip = Dropout(self.dropout_rate)(skip)
output += skip
if use_bias:
output = K.bias_add(output, bias)
if activation is not None:
output = activations.get(activation)(output)
return output
attn_coef = attn_for_self + attn_for_neighs_T
attn_coef = LeakyReLU(alpha=0.2)(attn_coef)
# Mask values before activation (Vaswani et al., 2017)
mask = -10e9 * (1.0 - A)
attn_coef += mask
# Apply softmax to get attention coefficients
attn_coef = K.softmax(attn_coef)
output_attn.append(attn_coef)
# Apply dropout to attention coefficients
attn_coef_drop = Dropout(self.dropout_rate)(attn_coef)
# Convolution
features = filter_dot(attn_coef_drop, features)
if self.use_bias:
features = K.bias_add(features, self.biases[head])
# Add output of attention head to final output
outputs.append(features)
# Aggregate the heads' output according to the reduction method
if self.concat_heads:
output = K.concatenate(outputs)
else:
output = K.mean(K.stack(outputs), axis=0)
output = self.activation(output)
if self.return_attn_coef:
return output, output_attn
def call(self, inputs):
features = inputs[0]
fltr = inputs[1]
# Convolution
output = K.dot(features, self.kernel)
output = filter_dot(fltr, output)
if self.use_bias:
output = K.bias_add(output, self.bias)
if self.activation is not None:
output = self.activation(output)
return output
def call(self, inputs):
features = inputs[0]
fltr_list = inputs[1:]
# Convolution
supports = list()
for fltr in fltr_list:
s = filter_dot(fltr, features)
supports.append(s)
supports = K.concatenate(supports, axis=-1)
output = K.dot(supports, self.kernel)
if self.use_bias:
output = K.bias_add(output, self.bias)
if self.activation is not None:
output = self.activation(output)
return output
features = Dropout(self.dropout_rate)(features)
features = K.dot(features, self.kernels_mlp[i])
if self.use_bias:
features += self.biases_mlp[i]
if self.mlp_activation is not None:
features = self.mlp_activation(features)
# Compute MLP output
mlp_out = K.dot(features, self.kernel_out)
if self.use_bias:
mlp_out += self.bias_out
# Propagation
Z = mlp_out
for k in range(self.propagations):
Z = (1 - self.alpha) * filter_dot(fltr, Z) + self.alpha * mlp_out
if self.activation is not None:
output = self.activation(Z)
else:
output = Z
return output