Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _test(apply_func):
g = generate_graph()
f = g.ndata['f']
# an out place run to get result
g.pull(nodes,
fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f'), apply_func)
result = g.ndata['f']
# inplace deg bucket
v1 = F.clone(f)
g.ndata['f'] = v1
g.pull(nodes, message_func, reduce_func, apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert F.allclose(r1, result)
# check inplace
assert F.allclose(v1, r1)
# inplace v2v spmv
v1 = F.clone(f)
g.ndata['f'] = v1
g.pull(nodes, fn.copy_src(src='f', out='m'),
# Excerpt: checks on the edge data held by each block of `nf`.
# For every block, the number of stored edge fields must equal the number of
# names requested in edge_embed_names[i], each requested name must be present,
# and its values must equal g's edge data looked up through
# nf.block_parent_eid(i).
# NOTE(review): indentation was lost in this excerpt; the inner `for key`
# loop is presumably nested inside the `for i` loop — confirm.
for i in range(nf.num_blocks):
assert len(edge_embed_names[i]) == len(nf.blocks[i].data.keys())
for key in edge_embed_names[i]:
assert key in nf.blocks[i].data.keys()
assert_array_equal(F.asnumpy(nf.blocks[i].data[key]),
F.asnumpy(g.edges[nf.block_parent_eid(i)].data[key]))
# Excerpt: compute on a flow layer by layer, then write the results back to g.
# Only layer 0 requests a field from g ('h0', a clone of g's 'h'); the other
# two layers request nothing.
nf = create_mini_batch(g, num_layers)
g.ndata['h0'] = F.clone(g.ndata['h'])
node_embed_names = [['h0'], [], []]
nf.copy_from_parent(node_embed_names=node_embed_names,
edge_embed_names=None)
# Block i reads 'h<i>' and writes 'h<i+1>' as (sum of messages) + 1. The same
# sum-plus-one update is run on g, and layer i+1 of the flow is compared with
# g's 'h' at the ids returned by nf.layer_parent_nid(i + 1).
# NOTE(review): indentation was lost; the three statements below are
# presumably all inside this loop — confirm.
for i in range(num_layers):
nf.block_compute(i, fn.copy_src(src='h%d' % i, out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h%d' % (i+1): nodes.data['t'] + 1})
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h': nodes.data['t'] + 1})
assert_allclose(F.asnumpy(nf.layers[i + 1].data['h%d' % (i+1)]),
F.asnumpy(
g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
# Write 'h0', 'h1', 'h2' from the three layers back to g, then check that each
# layer's field equals g's field of the same name at that layer's parent ids.
nf.copy_to_parent(node_embed_names=[['h0'], ['h1'], ['h2']])
for i in range(num_layers + 1):
assert_array_equal(F.asnumpy(nf.layers[i].data['h%d' % i]),
F.asnumpy(g.nodes[nf.layer_parent_nid(i)].data['h%d' % i]))
nf = create_mini_batch(g, num_layers)
g.ndata['h0'] = F.clone(g.ndata['h'])
g.ndata['h1'] = F.clone(g.ndata['h'])
g.ndata['h2'] = F.clone(g.ndata['h'])
node_embed_names = [['h0'], ['h1'], ['h2']]
nf.copy_from_parent(node_embed_names=node_embed_names,
# Excerpt: register the add-one apply function with `l` as the second
# argument, run block `l` with the registered functions, run the same
# sum-plus-one update on g, and compare layer i + 1 of the flow with g's 'h'.
# NOTE(review): `l` and `i` are bound outside this excerpt — presumably by an
# enclosing loop over layers; confirm against the full file.
nf.register_apply_node_func(
lambda nodes: {'h': nodes.data['t'] + 1}, l)
nf.block_compute(l)
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h': nodes.data['t'] + 1})
assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
F.asnumpy(
g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
# test the case that we register UDFs in all blocks.
# A fresh flow is created and populated from g; 'h' is reset from 'h1' both on
# g and on layer 0 of the flow so the two computations start from the same
# values.
nf = create_node_flow(g, num_layers)
nf.copy_from_parent()
g.ndata['h'] = g.ndata['h1']
nf.layers[0].data['h'] = nf.layers[0].data['h1']
# Message, reduce and apply functions are registered without a block id.
nf.register_message_func(fn.copy_src(src='h', out='m'))
nf.register_reduce_func(fn.sum(msg='m', out='t'))
nf.register_apply_node_func(lambda nodes: {'h': nodes.data['t'] + 1})
# Each block is addressed either by its non-negative index i or by the
# negative index -num_layers + i, depending on use_negative_block_id.
# NOTE(review): indentation was lost; the statements below are presumably all
# inside this loop — confirm.
for i in range(num_layers):
l = -num_layers + i if use_negative_block_id else i
nf.block_compute(l)
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h': nodes.data['t'] + 1})
assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
F.asnumpy(
g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
def check_prop_flows(create_node_flow):
    """Check that ``prop_flow`` on a node flow agrees with full-graph updates.

    ``create_node_flow`` is called as ``create_node_flow(g, num_layers)`` and
    must return the flow under test. The parent graph is updated
    ``num_layers`` times with a sum-plus-one step; the flow is then propagated
    through all of its layers in one call, and the flow's last layer is
    compared against the parent graph's values for the same nodes.
    """

    def add_one(nodes):
        # Apply step shared by both computations: aggregated value plus one.
        return {'h': nodes.data['t'] + 1}

    num_layers = 2
    g = generate_rand_graph(100)
    g.ndata['h'] = g.ndata['h1']
    nf2 = create_node_flow(g, num_layers)
    nf2.copy_from_parent()

    # Test the computation on a layer at a time.
    for _ in range(num_layers):
        g.update_all(fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='t'),
                     add_one)

    # Test the computation on all layers.
    nf2.prop_flow(fn.copy_src(src='h', out='m'),
                  fn.sum(msg='m', out='t'),
                  add_one)

    flow_result = F.asnumpy(nf2.layers[-1].data['h'])
    graph_result = F.asnumpy(g.nodes[nf2.layer_parent_nid(-1)].data['h'])
    assert_allclose(flow_result, graph_result, rtol=1e-4, atol=1e-4)
# Excerpt: select the CUDA device given by args.gpu and move every input
# tensor onto it.
torch.cuda.set_device(args.gpu)
features = features.cuda()
labels = labels.cuda()
train_mask = train_mask.cuda()
val_mask = val_mask.cuda()
test_mask = test_mask.cuda()
norm = norm.cuda()
# Attach the features and the normalisation tensor to the graph as node data.
g.ndata['features'] = features
num_neighbors = args.num_neighbors
n_layers = args.n_layers
g.ndata['norm'] = norm
# Preprocessing pass: sum the 'features' copied from source nodes into
# 'preprocess', then multiply the result by each node's 'norm'.
g.update_all(fn.copy_src(src='features', out='m'),
fn.sum(msg='m', out='preprocess'),
lambda node : {'preprocess': node.data['preprocess'] * node.data['norm']})
# Allocate one zero-filled node buffer 'h_<i>' per layer, on the same device
# as the features.
for i in range(n_layers):
g.ndata['h_{}'.format(i)] = torch.zeros(features.shape[0], args.n_hidden).to(device=features.device)
# NOTE(review): indentation was lost; this assignment presumably follows the
# loop and replaces the last buffer with one of width 2*n_hidden — confirm.
g.ndata['h_{}'.format(n_layers-1)] = torch.zeros(features.shape[0], 2*args.n_hidden).to(device=features.device)
model = GCNSampling(in_feats,
args.n_hidden,
n_classes,
n_layers,
F.relu,
args.dropout)
# Excerpt: each bond is recorded as two directed edges (begin -> end and
# end -> begin) that share the same feature value.
# NOTE(review): `bond`, `begin_idx` and `end_idx` are bound outside this
# excerpt — presumably by an enclosing loop over bonds; confirm.
features = bond_features(bond)
bond_src.append(begin_idx)
bond_dst.append(end_idx)
bond_x.append(features)
# set up the reverse direction
bond_src.append(end_idx)
bond_dst.append(begin_idx)
bond_x.append(features)
# Add all collected edges to the graph in a single call.
graph.add_edges(bond_src, bond_dst)
# NOTE(review): two edges are appended per bond above, but the counter grows
# by n_bonds — confirm that n_bonds is the intended increment.
n_edges += n_bonds
# The conditional expression applies only to the third tuple element: when no
# bond features were collected, torch.zeros(0) is returned in place of the
# stacked bond features.
return graph, torch.stack(atom_x), \
torch.stack(bond_x) if len(bond_x) > 0 else torch.zeros(0)
# Built-in message function: copies the source's 'msg' field into the message
# field 'msg'.
mpn_loopy_bp_msg = DGLF.copy_src(src='msg', out='msg')
# Built-in reduce function: sums the 'msg' messages into 'accum_msg'.
mpn_loopy_bp_reduce = DGLF.sum(msg='msg', out='accum_msg')
class LoopyBPUpdate(nn.Module):
    """Node-update module that refreshes the ``msg`` field.

    The new ``msg`` is ``relu(msg_input + W_h(accum_msg))``, where ``W_h`` is
    a bias-free linear layer mapping ``hidden_size`` to ``hidden_size``.
    """

    def __init__(self, hidden_size):
        """Store ``hidden_size`` and build the bias-free square projection."""
        super().__init__()
        self.hidden_size = hidden_size
        self.W_h = nn.Linear(hidden_size, hidden_size, bias=False)

    def forward(self, nodes):
        """Return ``{'msg': ...}`` computed from ``nodes.data``.

        ``nodes.data`` must provide the ``'msg_input'`` and ``'accum_msg'``
        entries.
        """
        fields = nodes.data
        base = fields['msg_input']
        # Project the accumulated messages before adding them to the input.
        projected = self.W_h(fields['accum_msg'])
        return {'msg': F.relu(base + projected)}
def pagerank_builtin(g):
    """Run one PageRank step on ``g`` in place using built-in functions.

    Reads the node fields ``'pv'`` and ``'deg'``, writes ``'m_sum'`` and
    overwrites ``'pv'`` with ``(1 - DAMP) / N + DAMP * m_sum``.
    """
    # Divide each node's score by its 'deg' entry before it is sent out.
    g.ndata['pv'] = g.ndata['pv'] / g.ndata['deg']

    send_score = fn.copy_src(src='pv', out='m')
    sum_scores = fn.sum(msg='m', out='m_sum')
    g.update_all(message_func=send_score, reduce_func=sum_scores)

    # Constant term added to every node, then the damped sum of messages.
    base_score = (1 - DAMP) / N
    g.ndata['pv'] = base_score + DAMP * g.ndata['m_sum']
# Excerpt: select the CUDA device given by args.gpu and move every input
# tensor onto it.
torch.cuda.set_device(args.gpu)
features = features.cuda()
labels = labels.cuda()
train_mask = train_mask.cuda()
val_mask = val_mask.cuda()
test_mask = test_mask.cuda()
norm = norm.cuda()
# Attach the features and the normalisation tensor to the graph as node data.
g.ndata['features'] = features
num_neighbors = args.num_neighbors
n_layers = args.n_layers
g.ndata['norm'] = norm
# Preprocessing pass: sum the 'features' copied from source nodes into
# 'preprocess', then multiply the result by each node's 'norm'.
g.update_all(fn.copy_src(src='features', out='m'),
fn.sum(msg='m', out='preprocess'),
lambda node : {'preprocess': node.data['preprocess'] * node.data['norm']})
# Allocate one zero-filled node buffer 'h_<i>' per layer, on the same device
# as the features.
for i in range(n_layers):
g.ndata['h_{}'.format(i)] = torch.zeros(features.shape[0], args.n_hidden).to(device=features.device)
# NOTE(review): indentation was lost; this assignment presumably follows the
# loop and replaces the last buffer with one of width 2*n_hidden — confirm.
g.ndata['h_{}'.format(n_layers-1)] = torch.zeros(features.shape[0], 2*args.n_hidden).to(device=features.device)
model = GCNSampling(in_feats,
args.n_hidden,
n_classes,
n_layers,
F.relu,
args.dropout)
# Excerpt: forward pass with a control-variate estimator.
# For each block, the stored history 'h_<i>' is averaged over neighbours on
# the original graph g, while only the difference between the current
# activation and that history (delta_h) is averaged on the sampled flow nf.
# The two averages are added and passed through the layer.
# NOTE(review): `g`, `nf` and `layer` are defined outside this excerpt.
for i in range(nf.num_blocks):
    # aggregate history on the original graph
    g.pull(nf.layer_parent_nid(i + 1),
           fn.copy_src(src='h_{}'.format(i), out='m'),
           lambda node: {'agg_h_{}'.format(i): node.mailbox['m'].mean(axis=1)})
nf.copy_from_parent()
h = nf.layers[0].data['features']
for i in range(nf.num_blocks):
    prev_h = nf.layers[i].data['h_{}'.format(i)]
    # compute delta_h, the difference of the current activation and the history
    nf.layers[i].data['delta_h'] = h - prev_h
    # refresh the old history
    nf.layers[i].data['h_{}'.format(i)] = h.detach()
    # aggregate the delta_h
    # The reduce function receives the incoming messages in node.mailbox
    # (not node.data), matching the history aggregation in the first loop.
    nf.block_compute(i,
                     fn.copy_src(src='delta_h', out='m'),
                     lambda node: {'delta_h': node.mailbox['m'].mean(axis=1)})
    delta_h = nf.layers[i + 1].data['delta_h']
    agg_h = nf.layers[i + 1].data['agg_h_{}'.format(i)]
    # control variate estimator
    nf.layers[i + 1].data['h'] = delta_h + agg_h
    nf.apply_layer(i + 1, lambda node: {'h': layer(node.data['h'])})
    h = nf.layers[i + 1].data['h']
# update history
nf.copy_to_parent()
##############################################################################
# The full example is available as both MXNet code and PyTorch code.
#