Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_scheduling(self):
"""
Affine iterations -> #pragma omp ... schedule(dynamic,1) ...
Non-affine iterations -> #pragma omp ... schedule(dynamic,chunk_size) ...
"""
grid = Grid(shape=(11, 11))
u = TimeFunction(name='u', grid=grid, time_order=2, save=5, space_order=0)
sf1 = SparseTimeFunction(name='s', grid=grid, npoint=1, nt=5)
eqns = [Eq(u.forward, u + 1)]
eqns += sf1.interpolate(u)
op = Operator(eqns, dle='openmp')
iterations = FindNodes(Iteration).visit(op)
assert len(iterations) == 4
assert iterations[1].is_Affine
assert 'schedule(dynamic,1)' in iterations[1].pragmas[0].value
assert not iterations[3].is_Affine
assert 'schedule(dynamic,chunk_size)' in iterations[3].pragmas[0].value
@pytest.mark.parametrize('dse', ['noop', 'basic', 'advanced', 'aggressive'])
@pytest.mark.parametrize('dle', ['noop', 'advanced', 'speculative'])
def test_time_dependent_split(dse, dle):
grid = Grid(shape=(10, 10))
u = TimeFunction(name='u', grid=grid, time_order=2, space_order=2, save=3)
v = TimeFunction(name='v', grid=grid, time_order=2, space_order=0, save=3)
# The second equation needs a full loop over x/y for u then
# a full one over x.y for v
eq = [Eq(u.forward, 2 + grid.time_dim),
Eq(v.forward, u.forward.dx + u.forward.dy + 1)]
op = Operator(eq, dse=dse, dle=dle)
trees = retrieve_iteration_tree(op)
assert len(trees) == 2
op()
assert np.allclose(u.data[2, :, :], 3.0)
assert np.allclose(v.data[1, 1:-1, 1:-1], 1.0)
def test_sparse_function_hash(FunctionType):
"""Test that different Functions have different hash value."""
grid0 = Grid(shape=(3, 3))
u0 = FunctionType(name='u', grid=grid0, npoint=1, nt=10)
grid1 = Grid(shape=(4, 4))
u1 = FunctionType(name='u', grid=grid1, npoint=1, nt=10)
assert u0 is not u1
assert hash(u0) != hash(u1)
# Now with the same grid
u2 = FunctionType(name='u', grid=grid0, npoint=1, nt=10)
assert u0 is not u2
assert hash(u0) != hash(u2)
def test_uncontracted_shape(self):
"""
Like `test_contracted_shape`, but the potential contraction is
now along the innermost Dimension, which causes falling back to
3D Arrays.
"""
grid = Grid(shape=(3, 3, 3))
x, y, z = grid.dimensions # noqa
t = grid.stepping_dim
f = Function(name='f', grid=grid)
f.data_with_halo[:] = 1.
u = TimeFunction(name='u', grid=grid, space_order=3)
u.data_with_halo[:] = 0.5
# Leads to 3D aliases
eqn = Eq(u.forward, ((u[t, x, y, z] + u[t, x+1, y+1, z])*3*f +
(u[t, x+2, y+2, z] + u[t, x+3, y+3, z])*3*f + 1))
op0 = Operator(eqn, opt=('noop', {'openmp': True}))
op1 = Operator(eqn, opt=('advanced', {'openmp': True, 'cire-mincost-sops': 1}))
x0_blk_size = op1.parameters[2]
y0_blk_size = op1.parameters[3]
def test_full_shape(self):
"""
Check the shape of the Array used to store an aliasing expression.
The shape is impacted by loop blocking, which reduces the required
write-to space.
"""
grid = Grid(shape=(3, 3, 3))
x, y, z = grid.dimensions # noqa
t = grid.stepping_dim
f = Function(name='f', grid=grid)
f.data_with_halo[:] = 1.
u = TimeFunction(name='u', grid=grid, space_order=3)
u.data_with_halo[:] = 0.5
# Leads to 3D aliases
eqn = Eq(u.forward, ((u[t, x, y, z] + u[t, x+1, y+1, z+1])*3*f +
(u[t, x+2, y+2, z+2] + u[t, x+3, y+3, z+3])*3*f + 1))
op0 = Operator(eqn, opt=('noop', {'openmp': True}))
op1 = Operator(eqn, opt=('advanced', {'openmp': True, 'cire-mincost-sops': 1}))
x0_blk_size = op1.parameters[2]
y0_blk_size = op1.parameters[3]
def test_is_param(ndim):
"""
Test that only parameter are evaluated at the variable anf Function and FD indices
stay unchanged
"""
grid = Grid(tuple([10]*ndim))
dims = list(powerset(grid.dimensions))[1:]
var = Function(name="f", grid=grid, staggered=NODE)
for d in dims:
f = Function(name="f", grid=grid, staggered=d)
f2 = Function(name="f2", grid=grid, staggered=d, parameter=True)
# Not a parameter stay untouched (or FD would be destroyed by _eval_at)
assert f._eval_at(var).evaluate == f
# Parameter, automatic averaging
avg = f2
for dd in d:
avg = .5 * (avg + avg.subs({dd: dd - dd.spacing}))
assert f2._eval_at(var).evaluate == avg
@patch("devito.passes.clusters.aliases.MIN_COST_ALIAS", 1)
@patch("devito.passes.iet.openmp.Ompizer.NESTED", 0)
@patch("devito.passes.iet.openmp.Ompizer.COLLAPSE_NCORES", 10000)
def test_multiple_subnests(self):
grid = Grid(shape=(3, 3, 3))
x, y, z = grid.dimensions
t = grid.stepping_dim
f = Function(name='f', grid=grid)
u = TimeFunction(name='u', grid=grid)
eqn = Eq(u.forward, ((u[t, x, y, z] + u[t, x+1, y+1, z+1])*3*f +
(u[t, x+2, y+2, z+2] + u[t, x+3, y+3, z+3])*3*f + 1))
op = Operator(eqn, dse='aggressive', dle=('advanced', {'openmp': True}))
trees = retrieve_iteration_tree(op._func_table['bf0'].root)
assert len(trees) == 2
assert trees[0][0] is trees[1][0]
assert trees[0][0].pragmas[0].value ==\
'omp for collapse(1) schedule(dynamic,1)'
def test_sparsefunction_inject(self):
"""
Test injection of a SparseFunction into a Function
"""
grid = Grid(shape=(11, 11))
u = Function(name='u', grid=grid, space_order=0)
sf1 = SparseFunction(name='s', grid=grid, npoint=1)
op = Operator(sf1.inject(u, expr=sf1))
assert sf1.data.shape == (1, )
sf1.coordinates.data[0, :] = (0.6, 0.6)
sf1.data[0] = 5.0
u.data[:] = 0.0
op.apply()
# This should be exactly on a point, all others 0
assert u.data[6, 6] == pytest.approx(5.0)
assert np.sum(u.data) == pytest.approx(5.0)
@pytest.mark.parallel(mode=4)
def test_niche_slicing(self):
grid0 = Grid(shape=(8, 8))
x0, y0 = grid0.dimensions
glb_pos_map0 = grid0.distributor.glb_pos_map
f = Function(name='f', grid=grid0, space_order=0, dtype=np.int32)
dat = np.arange(64, dtype=np.int32)
a = dat.reshape(grid0.shape)
f.data[:] = a
grid1 = Grid(shape=(12, 12))
x1, y1 = grid1.dimensions
glb_pos_map1 = grid1.distributor.glb_pos_map
h = Function(name='h', grid=grid1, space_order=0, dtype=np.int32)
grid2 = Grid(shape=(4, 4, 4))
t = Function(name='t', grid=grid2, space_order=0)
b = dat.reshape(grid2.shape)
t.data[:] = b
@switchconfig(log_level='WARNING')
def test_segmented_fibonacci():
"""
Test for segmented operator execution of a one-sided second order
function (fibonacci). The corresponding set of stencil offsets in
the time domain is (2, 0)
"""
# Reference Fibonacci solution from:
# https://stackoverflow.com/questions/4935957/fibonacci-numbers-with-an-one-liner-in-python-3
fib = lambda n: reduce(lambda x, n: [x[1], x[0] + x[1]], range(n), [0, 1])[0]
grid = Grid(shape=(5, 5))
x, y = grid.dimensions
t = grid.stepping_dim
f = TimeFunction(name='f', grid=grid, time_order=2)
fi = f.indexed
op = Operator(Eq(fi[t, x, y], fi[t-1, x, y] + fi[t-2, x, y]))
# Reference solution with a single invocation, up to timestep=12 (included)
# =========================================================================
# Developer note: the i-th Fibonacci number resides at logical index i-1
f_ref = TimeFunction(name='f', grid=grid, time_order=2)
f_ref.data[:] = 1.
op(f=f_ref, time=12)
assert (f_ref.data[11] == fib(12)).all()
assert (f_ref.data[12] == fib(13)).all()
# Now run with 2 invocations of 5 timesteps each