import dgl
import dgl.function as fn
import mxnet as mx
import numpy as np
import torch
from dgl import DGLGraph
from dgl.data import load_data

def get_nodeflow(g, node_ids, num_layers):
    batch_size = len(node_ids)
    # Expand to all nodes so the NodeFlow covers the full neighborhood.
    expand_factor = g.number_of_nodes()
    sampler = dgl.contrib.sampling.NeighborSampler(g, batch_size,
                                                   expand_factor=expand_factor,
                                                   num_hops=num_layers,
                                                   seed_nodes=node_ids)
    return next(iter(sampler))
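# A minimal usage sketch of ``get_nodeflow`` (illustrative only; it assumes
# the legacy ``dgl.contrib.sampling`` API from DGL 0.x, and the toy graph
# below is made up for the example):
g_demo = dgl.DGLGraph()
g_demo.add_nodes(10)
g_demo.add_edges(list(range(1, 10)), [0] * 9)   # a small star graph
g_demo.readonly()                               # NeighborSampler requires a read-only graph
nf_demo = get_nodeflow(g_demo, [0], num_layers=2)
print(nf_demo.num_layers, nf_demo.num_blocks)   # 3 layers, 2 blocks for a 2-hop sample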
# .. math::
#
#    \hat{z}_v^{(l+1)} = \frac{\vert \mathcal{N}(v) \vert }{\vert \hat{\mathcal{N}}^{(l)}(v) \vert} \sum_{u \in \hat{\mathcal{N}}^{(l)}(v)} \tilde{A}_{uv} ( \hat{h}_u^{(l)} - \bar{h}_u^{(l)} ) + \sum_{u \in \mathcal{N}(v)} \tilde{A}_{uv} \bar{h}_u^{(l)} \\
#    \hat{h}_v^{(l+1)} = \sigma ( \hat{z}_v^{(l+1)} W^{(l)} )
#
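# The first term is a rescaled correction computed on the small sampled
# neighborhood; the second aggregates the precomputed history over the full
# neighborhood. A toy Python sketch of this estimator (the names
# ``A_tilde``, ``h_hat``, ``h_bar``, ``nbrs``, and ``sampled_nbrs`` are
# illustrative, not from the original code):
def cv_estimate(A_tilde, h_hat, h_bar, nbrs, sampled_nbrs, v):
    # Rescaled sum of (current - history) deltas over the sampled neighbors.
    scale = len(nbrs) / len(sampled_nbrs)
    delta = scale * sum(A_tilde[u, v] * (h_hat[u] - h_bar[u]) for u in sampled_nbrs)
    # Full-neighborhood aggregation of the stale history activations.
    hist = sum(A_tilde[u, v] * h_bar[u] for u in nbrs)
    return delta + hist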
# This method can also be *conceptually* implemented in DGL as shown
# here.
#
have_large_memory = False
# The control-variate sampling code below needs to run on a large-memory
# machine for the Reddit graph.
if have_large_memory:
    g.ndata['h_0'] = features
    for i in range(L):
        g.ndata['h_{}'.format(i + 1)] = mx.nd.zeros((features.shape[0], n_hidden))
# With control-variate sampling, you only need to sample two neighbors to train the GCN.
for nf in dgl.contrib.sampling.NeighborSampler(g, batch_size, expand_factor=2,
                                               neighbor_type='in', num_hops=L,
                                               seed_nodes=train_nid):
    for i in range(nf.num_blocks):
        # Aggregate the history on the original graph.
        g.pull(nf.layer_parent_nid(i + 1),
               fn.copy_src(src='h_{}'.format(i), out='m'),
               lambda node: {'agg_h_{}'.format(i): node.mailbox['m'].mean(axis=1)})
    nf.copy_from_parent()
    h = nf.layers[0].data['features']
    for i in range(nf.num_blocks):
        prev_h = nf.layers[i].data['h_{}'.format(i)]
        # Compute delta_h, the difference between the current activation and the history.
        nf.layers[i].data['delta_h'] = h - prev_h
        # Refresh the old history.
        nf.layers[i].data['h_{}'.format(i)] = h.detach()
        # Aggregate the delta_h.
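        # (A sketch of this aggregation step, not from the original code: it
        # uses ``NodeFlow.block_compute``, and the output field name
        # ``delta_h_sum`` is illustrative.)
        nf.block_compute(i,
                         fn.copy_src(src='delta_h', out='m'),
                         fn.sum(msg='m', out='delta_h_sum'))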
data = load_data(args)
ctx = mx.cpu()
if args.self_loop and not args.dataset.startswith('reddit'):
    data.graph.add_edges_from([(i, i) for i in range(len(data.graph))])
train_nid = mx.nd.array(np.nonzero(data.train_mask)[0]).astype(np.int64).as_in_context(ctx)
test_nid = mx.nd.array(np.nonzero(data.test_mask)[0]).astype(np.int64).as_in_context(ctx)
# Create the GCN model; the graph must be read-only for the sampler.
g = DGLGraph(data.graph, readonly=True)
while True:
    idx = 0
    for nf in dgl.contrib.sampling.NeighborSampler(g, args.batch_size,
                                                   args.num_neighbors,
                                                   neighbor_type='in',
                                                   shuffle=is_shuffle,
                                                   num_workers=32,
                                                   num_hops=number_hops,
                                                   add_self_loop=self_loop,
                                                   seed_nodes=train_nid):
        print("send train nodeflow: %d" % idx)
        sender.send(nf, 0)
        idx += 1
    sender.signal(0)
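# The loop above streams sampled NodeFlows to a trainer process. A hedged
# sketch of the receiving side, assuming the legacy
# ``dgl.contrib.sampling.SamplerReceiver`` API (the address, sender count,
# and ``train_on`` helper are illustrative, not from the original code):
receiver = dgl.contrib.sampling.SamplerReceiver(g, '127.0.0.1:50051', num_sender=1)
for nf in receiver:
    # Each received NodeFlow drives one training step.
    train_on(nf)  # ``train_on`` is a placeholder for the training step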
# forward
pred = model(nf)
batch_nids = nf.layer_parent_nid(-1).to(device=pred.device, dtype=torch.long)
batch_labels = labels[batch_nids]
loss = loss_fcn(pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()

# Copy the trained parameters into the inference model.
for infer_param, param in zip(infer_model.parameters(), model.parameters()):
    infer_param.data.copy_(param.data)

num_acc = 0.
for nf in dgl.contrib.sampling.NeighborSampler(g, args.test_batch_size,
                                               g.number_of_nodes(),
                                               neighbor_type='in',
                                               num_workers=32,
                                               num_hops=args.n_layers + 1,
                                               seed_nodes=test_nid):
    nf.copy_from_parent()
    infer_model.eval()
    with torch.no_grad():
        pred = infer_model(nf)
        batch_nids = nf.layer_parent_nid(-1).to(device=pred.device, dtype=torch.long)
        batch_labels = labels[batch_nids]
        num_acc += (pred.argmax(dim=1) == batch_labels).sum().cpu().item()
print("Test Accuracy {:.4f}".format(num_acc / n_test_samples))
#
# In fact, ``block_compute`` is one of the APIs that comes with
# ``NodeFlow``, which provides the flexibility to experiment with new ideas.
# The computation flow underlying the DAG can be executed in one sweep, by
# calling ``prop_flow``.
#
# ``prop_flow`` accepts a list of UDFs. The code below defines node-update UDFs
# for each layer and computes a simplified version of GCN with neighbor sampling.
#
apply_node_funcs = [
    lambda node: {'h': layers[0](node)['activation']},
    lambda node: {'h': layers[1](node)['activation']},
]
for nf in dgl.contrib.sampling.NeighborSampler(g, batch_size, num_neighbors,
                                               neighbor_type='in', num_hops=L,
                                               seed_nodes=train_nid):
    nf.copy_from_parent()
    nf.layers[0].data['h'] = nf.layers[0].data['features']
    nf.prop_flow(fn.copy_src(src='h', out='m'),
                 fn.sum(msg='m', out='h'), apply_node_funcs)
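# Conceptually, the ``nf.prop_flow`` call above is equivalent to sweeping the
# blocks one at a time with ``block_compute`` (a sketch of the idea on a
# freshly copied NodeFlow, not the library internals):
for i in range(nf.num_blocks):
    nf.block_compute(i,
                     fn.copy_src(src='h', out='m'),
                     fn.sum(msg='m', out='h'),
                     apply_node_funcs[i])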