Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@switchconfig(platform='knl7210') # To trigger nested parallelism
def test_nested_nthreads():
grid = Grid(shape=(96, 96, 96))
f = TimeFunction(name='f', grid=grid)
op = Operator(Eq(f.forward, f + 1.), dle=('advanced', {'openmp': True}))
op.apply(time=10, autotune=True)
assert op._state['autotuning'][0]['runs'] == 6
assert op._state['autotuning'][0]['tpr'] == options['squeezer'] + 1
assert len(op._state['autotuning'][0]['tuned']) == 3
assert 'nthreads' in op._state['autotuning'][0]['tuned']
# No tuning for the nested level
assert 'nthreads_nested' not in op._state['autotuning'][0]['tuned']
def test_directly_indexed_expression(self, exprs):
"""
Test that equations using integer indices are inserted in the right
loop nest, at the right loop nest depth.
"""
grid = Grid(shape=(4, 4, 4))
x, y, z = grid.dimensions # noqa
ti0 = Function(name='ti0', grid=grid, space_order=0) # noqa
t0 = Scalar(name='t0') # noqa
eqs = [eval(exprs[0]), eval(exprs[1])]
op = Operator(eqs, opt='noop')
trees = retrieve_iteration_tree(op)
assert len(trees) == 2
assert trees[0][-1].nodes[0].exprs[0].expr.rhs == eqs[0].rhs
assert trees[1][-1].nodes[0].exprs[0].expr.rhs == eqs[1].rhs
3 2 1 0
"""
grid = Grid(shape=(4, 4, 4))
x, y, z = grid.dimensions
t = grid.stepping_dim
p = SparseTimeFunction(name='points', grid=grid, nt=1, npoint=4)
u = TimeFunction(name='yu4D', grid=grid, space_order=0)
for i in range(4):
for j in range(4):
for k in range(4):
u.data[0, i, j, k] = k
ind = lambda i: p[0, i]
eqs = [Eq(p[0, 0], 3.), Eq(p[0, 1], 2.),
Eq(p[0, 2], 1.), Eq(p[0, 3], 0.),
Eq(u[t + 1, ind(x), ind(y), ind(z)], u[t, x, y, z])]
op = Operator(eqs, subs=grid.spacing_map)
op(yu4D=u, time=0)
assert 'run_solution' not in str(op)
assert all(np.all(u.data[1, :, :, i] == 3 - i) for i in range(4))
def test_inject_from_field(shape, coords, result, npoints=19):
"""Test point injection from a second field along a line
through the middle of the grid.
"""
a = unit_box(shape=shape)
a.data[:] = 0.
b = Function(name='b', grid=a.grid)
b.data[:] = 1.
p = points(a.grid, ranges=coords, npoints=npoints)
expr = p.inject(field=a, expr=b)
Operator(expr)(a=a, b=b)
indices = [slice(4, 6, 1) for _ in coords]
indices[0] = slice(1, -1, 1)
assert np.allclose(a.data[indices], result, rtol=1.e-5)
def test_heap_1D(self):
i, j = dimensions('i j')
a = Array(name='a', dimensions=(i,))
b = Array(name='b', dimensions=(i,))
f = Function(name='f', shape=(3,), dimensions=(j,))
op = Operator([Eq(a[i], a[i] + b[i] + 5.),
Eq(f[j], a[j])])
assert op.body[0].body[0].is_PointerCast
assert str(op.body[0].body[0]) == ('float (*restrict f) __attribute__ '
'((aligned (64))) = (float (*)) f_vec->data;')
assert op.body[1].is_List
assert str(op.body[1].header[0]) == 'float (*a);'
assert str(op.body[1].header[1]) == ('posix_memalign((void**)&a, 64, '
'sizeof(float[i_size]));')
assert str(op.body[1].footer[0]) == ''
assert str(op.body[1].footer[1]) == 'free(a);'
def test_forward(a):
a.data[0, :] = 1.
Operator(Eq(a.forward, a + 1.))()
for i in range(a.shape[0]):
assert np.allclose(a.data[i, :], 1. + i, rtol=1.e-12)
def test_fixed_halo_w_ofs(self, space_order):
"""
Compute an N-point stencil sum, where N is the number of points sorrounding
an inner (i.e., non-border) grid point.
For example (in 2D view):
1 1 1 ... 1 1
1 4 4 ... 4 1
1 4 4 ... 4 1
1 4 4 ... 4 1
1 1 1 ... 1 1
"""
grid = Grid(shape=(17, 17, 17))
v = TimeFunction(name='yv4D', grid=grid, space_order=space_order)
v.data_with_halo[:] = 1.
op = Operator(Eq(v.forward, v.laplace + 6.0*v), subs=grid.spacing_map)
op(yv4D=v, time=0)
assert 'run_solution' in str(op)
# Chech that the domain size has actually been written to
assert np.all(v.data[1] == 6.)
# Check that the halo planes are untouched
assert all(np.all(v.data_with_halo[1, i, :, :] == 1)
for i in range(v._size_halo.left[1]))
assert all(np.all(v.data_with_halo[1, :, i, :] == 1)
for i in range(v._size_halo.left[2]))
assert all(np.all(v.data_with_halo[1, :, :, i] == 1)
for i in range(v._size_halo.left[3]))
@pytest.mark.parametrize('expr, result', [
('Eq(a[1, j, l], a[0, j - 1 , l] + 1.)', np.zeros((5, 6)) + 3.),
])
def test_indexed_open_loops(self, expr, result):
"""Test point-wise arithmetic with stencil offsets and open loop
boundaries in indexed expression format"""
i, j, l = dimify('i j l')
a = Function(name='a', dimensions=(i, j, l), shape=(3, 5, 6)).indexed
fa = a.function
fa.data[0, :, :] = 2.
eqn = eval(expr)
Operator(eqn)(a=fa)
assert np.allclose(fa.data[1, 1:-1, 1:-1], result[1:-1, 1:-1], rtol=1e-12)
@pytest.mark.parametrize('expr, result', [
('Eq(a[1, j, l], a[0, j - 1 , l] + 1.)', np.zeros((5, 6)) + 3.),
('Eq(a[1, j, l], a[0, j , l - 1] + 1.)', np.zeros((5, 6)) + 3.),
('Eq(a[1, j, l], a[0, j - 1, l - 1] + 1.)', np.zeros((5, 6)) + 3.),
('Eq(a[1, j, l], a[0, j + 1, l + 1] + 1.)', np.zeros((5, 6)) + 3.),
])
def test_indexed_buffered(self, expr, result):
"""Test point-wise arithmetic with stencil offsets across a single
functions with buffering dimension in indexed expression format"""
i, j, l = dimify('i j l')
a = symbol(name='a', dimensions=(i, j, l), value=2., shape=(3, 5, 6),
mode='indexed').base
fa = a.function
eqn = eval(expr)
Operator(eqn)(a=fa)
assert np.allclose(fa.data[1, 1:-1, 1:-1], result[1:-1, 1:-1], rtol=1e-12)
s = model.grid.stepping_dim.spacing
eqn = iso_stencil(v, time_order, m, s, damp, forward=False)
if time_order == 2:
gradient_update = Eq(grad, grad - u.dt2 * v)
else:
gradient_update = Eq(grad, grad - (u.dt2 +
s**2 / 12.0 * u.laplace2(m**(-2))) * v)
# Add expression for receiver injection
receivers = rec.inject(field=v.backward, expr=rec * dt**2 / m,
offset=model.nbpml)
# Substitute spacing terms to reduce flops
return Operator(eqn + receivers + [gradient_update], subs=model.spacing_map,
time_axis=Backward, name='Gradient', **kwargs)