Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
fn.sum(msg='m', out=fld), apply_func)
g2.send_and_recv((u, v), message_func, reduce_func, apply_func)
# NOTE(review): this excerpt starts in the middle of a test function and its
# indentation was lost; the first line above closes a call opened earlier.
# Read back the field written on each graph and require the two results to
# agree within tolerance.
g1_res = g1.ndata[fld]
g2_res = g2.ndata[fld]
assert np.allclose(g1_res.asnumpy(), g2_res.asnumpy(), rtol=1e-05, atol=1e-05)
# Back-propagate from both results and compare the gradients accumulated on
# the two input arrays.
g1_res.backward()
g2_res.backward()
assert np.allclose(g1_data.grad.asnumpy(), g2_data.grad.asnumpy(), rtol=1e-05, atol=1e-05)
# send and recv with edge weights
# Keep the current node data so it can be put back before each re-run.
g1_data = g1.ndata[fld]
g1.send_and_recv((u, v), fn.src_mul_edge(src=fld, edge='e1', out='m'),
fn.sum(msg='m', out=fld), apply_func)
v2 = g1.ndata[fld]
# Put the saved data back under `fld`, then repeat with edge field 'e2'.
g1.set_n_repr({fld : g1_data})
g1.send_and_recv((u, v), fn.src_mul_edge(src=fld, edge='e2', out='m'),
fn.sum(msg='m', out=fld), apply_func)
v3 = g1.ndata[fld]
# The 'e1' and 'e2' runs are expected to give the same result.
assert np.allclose(v2.asnumpy(), v3.asnumpy(), rtol=1e-05, atol=1e-05)
g1.set_n_repr({fld : g1_data})
g2_data = g2.ndata[fld]
# Enable gradient tracking on both inputs before the recorded run.
g1_data.attach_grad()
g2_data.attach_grad()
# NOTE(review): the statements following this `with` presumably form its
# body; the original nesting cannot be recovered from this excerpt.
with mx.autograd.record():
g1.send_and_recv((u, v), fn.src_mul_edge(src=fld, edge='e2', out='m'),
fn.sum(msg='m', out=fld), apply_func)
g2.send_and_recv((u, v), message_func_edge, reduce_func, apply_func)
g1_res = g1.ndata[fld]
g2_res = g2.ndata[fld]
# Builtin (g1) and user-defined (g2) edge-weighted runs must match.
assert np.allclose(g1_res.asnumpy(), g2_res.asnumpy(), rtol=1e-05, atol=1e-05)
g1_res.backward()
# NOTE(review): excerpt from the body of a test function; the enclosing `def`
# and the definitions of `fld`, `message_func`, `reduce_func` and
# `apply_func` are outside this view.
g = simple_graph()
# update all
# Save the node data, run the builtin copy_src/sum pipeline, then put the
# saved data back and run the user-defined functions for comparison.
v1 = g.ndata[fld]
g.update_all(fn.copy_src(src=fld, out='m'), fn.sum(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.update_all(message_func, reduce_func, apply_func)
v3 = g.ndata[fld]
assert np.allclose(v2.asnumpy(), v3.asnumpy(), rtol=1e-05, atol=1e-05)
# update all with edge weights
# Three runs from the same saved data: builtin with edge field 'e1' (v2),
# builtin with edge field 'e2' (v3), and the user-defined edge message (v4).
v1 = g.ndata[fld]
g.update_all(fn.src_mul_edge(src=fld, edge='e1', out='m'),
fn.sum(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.update_all(fn.src_mul_edge(src=fld, edge='e2', out='m'),
fn.sum(msg='m', out=fld), apply_func)
# NOTE(review): only v3 is squeezed; v2 and v4 are compared as stored.
v3 = g.ndata[fld].squeeze()
g.set_n_repr({fld : v1})
g.update_all(message_func_edge, reduce_func, apply_func)
v4 = g.ndata[fld]
assert np.allclose(v2.asnumpy(), v3.asnumpy(), rtol=1e-05, atol=1e-05)
assert np.allclose(v3.asnumpy(), v4.asnumpy(), rtol=1e-05, atol=1e-05)
# test 1d node features
# NOTE(review): excerpt from the body of a test function; `message_func`,
# `message_func_edge` and `reduce_func` are defined outside this view.
g = generate_graph()
# Start from zero-filled 1-D node fields 'v1' and 'v2' of length 10.
g.set_n_repr({'v1' : F.zeros((10,)), 'v2' : F.zeros((10,))})
fld = 'f2'
# Reference result produced by the user-defined message/reduce pair.
g.update_all(message_func, reduce_func)
v1 = g.ndata['v1']
# 1 message, 2 reduces
g.update_all(fn.copy_src(src=fld, out='m'), [fn.sum(msg='m', out='v2'), fn.sum(msg='m', out='v3')])
v2 = g.ndata['v2']
v3 = g.ndata['v3']
# Both builtin reducer outputs must match the reference.
assert F.allclose(v1, v2)
assert F.allclose(v1, v3)
# update all with edge weights, 2 message, 3 reduces
# Messages 'm1' (edge field 'e1') and 'm2' (edge field 'e2') are summed into
# 'v1', 'v2' and 'v3'; no apply function is passed.
g.update_all([fn.src_mul_edge(src=fld, edge='e1', out='m1'), fn.src_mul_edge(src=fld, edge='e2', out='m2')],
[fn.sum(msg='m1', out='v1'), fn.sum(msg='m2', out='v2'), fn.sum(msg='m1', out='v3')],
None)
v1 = g.ndata['v1']
v2 = g.ndata['v2']
v3 = g.ndata['v3']
assert F.allclose(v1, v2)
assert F.allclose(v1, v3)
# run UDF with single message and reduce
g.update_all(message_func_edge, reduce_func, None)
v2 = g.ndata['v2']
# The user-defined run must reproduce the builtin 'v1' result.
assert F.allclose(v1, v2)
# NOTE(review): the `return` below is the tail of a function whose `def` line
# is outside this excerpt (presumably the reducer); it sums the 'm' mailbox
# along axis 1 and stores the result under `fld`.
return {fld : F.sum(nodes.mailbox['m'], 1)}
# Node apply function: doubles the value stored under `fld`.
def apply_func(nodes):
return {fld : 2 * nodes.data[fld]}
g = generate_graph()
# update all
# Builtin copy_src/sum (v2) versus the user-defined functions (v3), both
# started from the same saved node data.
v1 = g.ndata[fld]
g.update_all(fn.copy_src(src=fld, out='m'), fn.sum(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.update_all(message_func, reduce_func, apply_func)
v3 = g.ndata[fld]
assert F.allclose(v2, v3)
# update all with edge weights
# Builtin src_mul_edge with edge field 'e1' (v2) versus the user-defined edge
# message function (v4).
v1 = g.ndata[fld]
g.update_all(fn.src_mul_edge(src=fld, edge='e1', out='m'),
fn.sum(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.update_all(message_func_edge, reduce_func, apply_func)
v4 = g.ndata[fld]
assert F.allclose(v2, v4)
# test 1d node features
# NOTE(review): the `return` below is the tail of a function whose `def` line
# is outside this excerpt (presumably the reducer); it takes the maximum of
# the 'm' mailbox along axis 1 and stores it under `fld`.
return {fld : mx.nd.max(nodes.mailbox['m'], axis=1)}
# Node apply function: doubles the value stored under `fld`.
def apply_func(nodes):
return {fld : 2 * nodes.data[fld]}
g = simple_graph()
# update all
# Builtin copy_src/max (v2) versus the user-defined functions (v3), both
# started from the same saved node data.
v1 = g.ndata[fld]
g.update_all(fn.copy_src(src=fld, out='m'), fn.max(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.update_all(message_func, reduce_func, apply_func)
v3 = g.ndata[fld]
assert np.allclose(v2.asnumpy(), v3.asnumpy(), rtol=1e-05, atol=1e-05)
# update all with edge weights
# Three runs from the same saved data: builtin with edge field 'e1' (v2),
# builtin with edge field 'e2' (v3), and the user-defined edge message (v4).
v1 = g.ndata[fld]
g.update_all(fn.src_mul_edge(src=fld, edge='e1', out='m'),
fn.max(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.update_all(fn.src_mul_edge(src=fld, edge='e2', out='m'),
fn.max(msg='m', out=fld), apply_func)
# NOTE(review): only v3 is squeezed; v2 and v4 are compared as stored.
v3 = g.ndata[fld].squeeze()
g.set_n_repr({fld : v1})
g.update_all(message_func_edge, reduce_func, apply_func)
v4 = g.ndata[fld]
assert np.allclose(v2.asnumpy(), v3.asnumpy(), rtol=1e-05, atol=1e-05)
assert np.allclose(v3.asnumpy(), v4.asnumpy(), rtol=1e-05, atol=1e-05)
# test 1d node features
# NOTE(review): excerpt from inside a test function; `fld`, `u`, `v`,
# `message_func`, `message_func_edge` and `reduce_func` are defined outside
# this view.
# Node apply function: doubles the value stored under `fld`.
def apply_func(nodes):
return {fld : 2 * nodes.data[fld]}
g = generate_graph()
# send and recv
# Builtin copy_src/sum over the edges (u, v) gives v2; the user-defined
# functions, run from the same saved node data, give v3.
v1 = g.ndata[fld]
g.send_and_recv((u, v), fn.copy_src(src=fld, out='m'),
fn.sum(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.send_and_recv((u, v), message_func, reduce_func, apply_func)
v3 = g.ndata[fld]
assert F.allclose(v2, v3)
# send and recv with edge weights
# Builtin src_mul_edge with edge field 'e1' (v2) versus the user-defined edge
# message function (v4).
v1 = g.ndata[fld]
g.send_and_recv((u, v), fn.src_mul_edge(src=fld, edge='e1', out='m'),
fn.sum(msg='m', out=fld), apply_func)
v2 = g.ndata[fld]
g.set_n_repr({fld : v1})
g.send_and_recv((u, v), message_func_edge, reduce_func, apply_func)
v4 = g.ndata[fld]
assert F.allclose(v2, v4)
# test 1d node features
# Checks block-wise computation on a node flow against whole-graph
# `update_all`, using edge-weighted messages and then `u_mul_v` messages.
# `create_node_flow` is called as `create_node_flow(g, num_layers)`.
# NOTE(review): indentation was lost in this excerpt and the function appears
# to be cut off after the last visible line; loop nesting is not recoverable
# from here.
def check_flow_compute2(create_node_flow):
num_layers = 2
g = generate_rand_graph(100)
# Edge field 'h' is all ones with 10 columns per edge.
g.edata['h'] = F.ones((g.number_of_edges(), 10))
nf = create_node_flow(g, num_layers)
nf.copy_from_parent()
# Seed field 'h' from 'h1' on both the parent graph and the first layer.
g.ndata['h'] = g.ndata['h1']
nf.layers[0].data['h'] = nf.layers[0].data['h1']
for i in range(num_layers):
# SrcMulEdgeMessageFunction writes its reduced result to 'h1'; the builtin
# fn.src_mul_edge writes to 'h'; the parent graph runs the same builtin.
nf.block_compute(i, SrcMulEdgeMessageFunction(
'h', 'h', 't'), fn.sum('t', 'h1'))
nf.block_compute(i, fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
g.update_all(fn.src_mul_edge('h', 'h', 'h'), fn.sum('h', 'h'))
# Layer i + 1: the 'h1' result must match the 'h' result.
assert_allclose(F.asnumpy(nf.layers[i + 1].data['h1']),
F.asnumpy(nf.layers[i + 1].data['h']),
rtol=1e-4, atol=1e-4)
# Layer i + 1 must also match the parent-graph nodes selected by
# nf.layer_parent_nid(i + 1).
assert_allclose(F.asnumpy(nf.layers[i + 1].data['h']),
F.asnumpy(
g.nodes[nf.layer_parent_nid(i + 1)].data['h']),
rtol=1e-4, atol=1e-4)
# Second scenario: a new node flow, with 'h' seeded from 'h1' on every layer.
nf = create_node_flow(g, num_layers)
g.ndata['h'] = g.ndata['h1']
nf.copy_from_parent()
for i in range(nf.num_layers):
nf.layers[i].data['h'] = nf.layers[i].data['h1']
for i in range(num_layers):
# u_mul_v messages summed into field 's' on both the flow and the graph.
nf.block_compute(i, fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
g.update_all(fn.u_mul_v('h', 'h', 't'), fn.sum('t', 's'))
# NOTE(review): excerpt from inside a test function; `g`, `nodes`, `o1`, `o2`
# and `_afunc` are defined outside this view. Each check pops the output
# field from g.ndata so the next pull starts without it.
assert F.allclose(o1, g.ndata.pop('o1'))
# v2v fallback to e2v
g.pull(nodes, fn.src_mul_edge(src='h', edge='w2', out='m2'),
fn.sum(msg='m2', out='o2'),
_afunc)
assert F.allclose(o2, g.ndata.pop('o2'))
# multi builtins, both v2v spmv
# Both messages use edge field 'w1', so both outputs are compared with o1.
g.pull(nodes,
[fn.src_mul_edge(src='h', edge='w1', out='m1'), fn.src_mul_edge(src='h', edge='w1', out='m2')],
[fn.sum(msg='m1', out='o1'), fn.sum(msg='m2', out='o2')],
_afunc)
assert F.allclose(o1, g.ndata.pop('o1'))
assert F.allclose(o1, g.ndata.pop('o2'))
# multi builtins, one v2v spmv, one fallback to e2v
# Message 'm1' uses edge field 'w1' and 'm2' uses 'w2'; outputs are compared
# with o1 and o2 respectively.
g.pull(nodes,
[fn.src_mul_edge(src='h', edge='w1', out='m1'), fn.src_mul_edge(src='h', edge='w2', out='m2')],
[fn.sum(msg='m1', out='o1'), fn.sum(msg='m2', out='o2')],
_afunc)
assert F.allclose(o1, g.ndata.pop('o1'))
assert F.allclose(o2, g.ndata.pop('o2'))
# test#1: non-0deg nodes
def forward(self, x):
    """Run one multi-head attention pass over ``self.g``.

    ``x`` is multiplied by ``self.weight`` and split into ``self.heads``
    heads. Per-head left/right attention terms are computed with
    ``self.att_l`` / ``self.att_r`` and stored on the nodes together with
    the projected features. Edge attention is then computed and
    soft-maxed, and messages (source ``'x'`` times edge ``'a'``) are summed
    per node.

    Returns the aggregated ``'x'`` divided by the node field ``'z'``
    (presumably written by ``self.edge_softmax`` -- confirm), viewed as
    ``(-1, self.heads * self.out_channels)``.
    """
    projected = torch.matmul(x, self.weight)
    x = projected.reshape((projected.size(0), self.heads, -1))  # NxHxD'

    # bmm wants the head axis first; transpose back afterwards.
    per_head = x.transpose(0, 1)  # HxNxD'
    left_term = torch.bmm(per_head, self.att_l).transpose(0, 1)  # NxHx1
    right_term = torch.bmm(per_head, self.att_r).transpose(0, 1)  # NxHx1

    node_fields = {'x': x, 'a1': left_term, 'a2': right_term}
    self.g.ndata.update(node_fields)

    self.g.apply_edges(self.edge_attention)
    self.edge_softmax()

    message = fn.src_mul_edge('x', 'a', 'x')
    reducer = fn.sum('x', 'x')
    self.g.update_all(message, reducer)

    normalized = self.g.ndata['x'] / self.g.ndata['z']  # NxHxD'
    return normalized.view(-1, self.heads * self.out_channels)
def forward(self, inputs):
    """Compute attention-weighted node features, with optional residual.

    ``inputs`` goes through ``self.feat_drop`` and ``self.fc`` and is split
    into ``self.num_heads`` heads. Attention terms from ``self.attn_l`` /
    ``self.attn_r`` are stored on the nodes, edge attention is computed
    and soft-maxed, and messages (source ``'ft'`` times edge ``'a_drop'``)
    are summed into ``'ft'``.

    When ``self.residual`` is truthy, a residual term is added: the output
    of ``self.res_fc`` reshaped per head if ``self.res_fc`` is set,
    otherwise the dropped-out input with a singleton head axis inserted.
    """
    # prepare
    h = self.feat_drop(inputs)  # NxD
    num_rows = h.shape[0]
    ft = self.fc(h).reshape((num_rows, self.num_heads, -1))  # NxHxD'
    a1 = torch.sum(ft * self.attn_l, dim=-1, keepdim=True)  # N x H x 1
    a2 = torch.sum(ft * self.attn_r, dim=-1, keepdim=True)  # N x H x 1
    self.g.ndata.update({'ft': ft, 'a1': a1, 'a2': a2})

    # 1. compute edge attention
    self.g.apply_edges(self.edge_attention)

    # 2. compute softmax
    self.edge_softmax()

    # 3. aggregate node features scaled by the dropped, unnormalized
    #    attention values.
    weighted_message = fn.src_mul_edge('ft', 'a_drop', 'ft')
    self.g.update_all(weighted_message, fn.sum('ft', 'ft'))
    aggregated = self.g.ndata['ft']

    # 4. residual
    if not self.residual:
        return aggregated
    if self.res_fc is None:
        shortcut = torch.unsqueeze(h, 1)  # Nx1xD'
    else:
        shortcut = self.res_fc(h).reshape((h.shape[0], self.num_heads, -1))  # NxHxD'
    return shortcut + aggregated