Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Examples
--------
Given the expression
sqrt(cos(a[x, y]))
We should get
t0 = cos(a[x,y])
t1 = sqrt(t0)
out = t1 # pseudocode
"""
grid = Grid(shape=(3, 3))
x, y = grid.dimensions # noqa
u = TimeFunction(name='u', grid=grid)
g = Function(name='g', grid=grid)
op = Operator(Eq(u.forward, u + sin(cos(g)) + sin(cos(g[x+1, y+1]))))
# We expect two temporary Arrays: `r1 = cos(g)` and `r2 = sqrt(r1)`
arrays = [i for i in FindSymbols().visit(op) if i.is_Array and i._mem_local]
assert len(arrays) == 2
assert all(i._mem_heap and not i._mem_external for i in arrays)
def test_const_change(self):
"""
Test that Constand.data can be set as required.
"""
n = 5
t = Constant(name='t', dtype=np.int32)
grid = Grid(shape=(2, 2))
x, y = grid.dimensions
f = TimeFunction(name='f', grid=grid, save=n+1)
f.data[:] = 0
eq = Eq(f.dt-1)
stencil = Eq(f.forward, solve(eq, f.forward))
op = Operator([stencil])
op.apply(time_m=0, time_M=n-1, dt=1)
check = Function(name='check', grid=grid)
eq_test = Eq(check, f[t, x, y])
op_test = Operator([eq_test])
for j in range(0, n+1):
t.data = j # Ensure constant is being updated correctly
op_test.apply(t=t)
assert(np.amax(check.data[:], axis=None) == j)
assert(np.amin(check.data[:], axis=None) == j)
Test conditional dimension for the spatial ones.
This test saves u every two grid points :
u2[x, y] = u[2*x, 2*y]
"""
nt = 19
grid = Grid(shape=(12, 12))
time = grid.time_dim
u = TimeFunction(name='u', grid=grid, save=nt)
assert(grid.time_dim in u.indices)
# Creates subsampled spatial dimensions and accordine grid
dims = tuple([ConditionalDimension(d.name+'sub', parent=d, factor=2)
for d in u.grid.dimensions])
grid2 = Grid((6, 6), dimensions=dims, time_dimension=time)
u2 = TimeFunction(name='u2', grid=grid2, save=nt)
assert(time in u2.indices)
eqns = [Eq(u.forward, u + 1.), Eq(u2, u)]
op = Operator(eqns)
op.apply(time_M=nt-2)
# Verify that u2[x,y]= u[2*x, 2*y]
assert np.allclose(u.data[:-1, 0:-1:2, 0:-1:2], u2.data[:-1, :, :])
def test_op_new_dist(self):
"""
Test that an operator made with one distributor produces correct results
when executed with a different distributor.
"""
grid = Grid(shape=(10, 10), comm=MPI.COMM_SELF)
grid2 = Grid(shape=(10, 10), comm=MPI.COMM_WORLD)
u = TimeFunction(name='u', grid=grid, space_order=2)
u2 = TimeFunction(name='u2', grid=grid2, space_order=2)
x, y = np.ix_(np.linspace(-1, 1, grid.shape[0]),
np.linspace(-1, 1, grid.shape[1]))
dx = x - 0.5
dy = y
u.data[0, :, :] = 1.0 * ((dx*dx + dy*dy) < 0.125)
u2.data[0, :, :] = 1.0 * ((dx*dx + dy*dy) < 0.125)
# Create some operator that requires MPI communication
eqn = Eq(u.forward, u + 0.000001 * u.laplace)
op = Operator(eqn)
op.apply(u=u, time_M=10)
op.apply(u=u2, time_M=10)
assert abs(norm(u) - norm(u2)) < 1.e-3
def test_interior_w_stencil(self):
grid = Grid(shape=(10,))
x = grid.dimensions[0]
t = grid.stepping_dim
u = TimeFunction(name='u', grid=grid)
op = Operator(Eq(u.forward, u[t, x-1] + u[t, x+1] + 1, subdomain=grid.interior))
op.apply(time=1)
glb_pos_map = u.grid.distributor.glb_pos_map
if LEFT in glb_pos_map[x]:
assert np.all(u.data_ro_domain[0, 1] == 2.)
assert np.all(u.data_ro_domain[0, 2:] == 3.)
else:
assert np.all(u.data_ro_domain[0, -2] == 2.)
assert np.all(u.data_ro_domain[0, :-2] == 3.)
def test_scheduling_sparse_functions(self):
"""Tests loop scheduling in presence of sparse functions."""
grid = Grid((10, 10))
time = grid.time_dim
u1 = TimeFunction(name="u1", grid=grid, save=10, time_order=2)
u2 = TimeFunction(name="u2", grid=grid, time_order=2)
sf1 = SparseTimeFunction(name='sf1', grid=grid, npoint=1, nt=10)
sf2 = SparseTimeFunction(name='sf2', grid=grid, npoint=1, nt=10)
# Deliberately inject into u1, rather than u1.forward, to create a WAR w/ eqn3
eqn1 = Eq(u1.forward, u1 + 2.0 - u1.backward)
eqn2 = sf1.inject(u1, expr=sf1)
eqn3 = Eq(u2.forward, u2 + 2*u2.backward - u1.dt2)
eqn4 = sf2.interpolate(u2)
# Note: DLE disabled only because with OpenMP otherwise there might be more
# `trees` than 4
op = Operator([eqn1] + eqn2 + [eqn3] + eqn4, dle=('noop', {'openmp': False}))
trees = retrieve_iteration_tree(op)
assert len(trees) == 4
# Time loop not shared due to the WAR
assert trees[0][0].dim is time and trees[0][0] is trees[1][0] # this IS shared
def test_discarding_runs():
grid = Grid(shape=(64, 64, 64))
f = TimeFunction(name='f', grid=grid)
op = Operator(Eq(f.forward, f + 1.),
opt=('advanced', {'openmp': True, 'par-collapse-ncores': 1}))
op.apply(time=100, nthreads=4, autotune='aggressive')
assert op._state['autotuning'][0]['runs'] == 18
assert op._state['autotuning'][0]['tpr'] == options['squeezer'] + 1
assert len(op._state['autotuning'][0]['tuned']) == 3
assert op._state['autotuning'][0]['tuned']['nthreads'] == 4
# With 1 < 4 threads, the AT eventually tries many more combinations
op.apply(time=100, nthreads=1, autotune='aggressive')
assert op._state['autotuning'][1]['runs'] == 25
assert op._state['autotuning'][1]['tpr'] == options['squeezer'] + 1
assert len(op._state['autotuning'][1]['tuned']) == 3
def test_basic(self):
nt = 19
grid = Grid(shape=(11, 11))
time = grid.time_dim
u = TimeFunction(name='u', grid=grid)
assert(grid.stepping_dim in u.indices)
u2 = TimeFunction(name='u2', grid=grid, save=nt)
assert(time in u2.indices)
factor = 4
time_subsampled = ConditionalDimension('t_sub', parent=time, factor=factor)
usave = TimeFunction(name='usave', grid=grid, save=(nt+factor-1)//factor,
time_dim=time_subsampled)
assert(time_subsampled in usave.indices)
eqns = [Eq(u.forward, u + 1.), Eq(u2.forward, u2 + 1.), Eq(usave, u)]
op = Operator(eqns)
op.apply(t_M=nt-2)
assert np.all(np.allclose(u.data[(nt-1) % 3], nt-1))
assert np.all([np.allclose(u2.data[i], i) for i in range(nt)])
wOverQ = Function(name='wOverQ', grid=vel.grid, space_order=space_order)
b.data[:] = 1.0
f.data[:] = 0.84
vel.data[:] = 1.5
eps.data[:] = 0.2
eta.data[:] = 0.4
wOverQ.data[:] = 1.0
t0 = 0.0
t1 = 250.0
dt = 1.0
time_axis = TimeAxis(start=t0, stop=t1, step=dt)
p0 = TimeFunction(name='p0', grid=grid, time_order=2, space_order=space_order)
m0 = TimeFunction(name='m0', grid=grid, time_order=2, space_order=space_order)
t, x, y, z = p0.dimensions
src_coords = np.empty((1, len(shape)), dtype=dtype)
src_coords[0, :] = [d * (s-1)//2 for d, s in zip(spacing, shape)]
src = RickerSource(name='src', grid=vel.grid, f0=fpeak, npoint=1, time_range=time_axis)
src.coordinates.data[:] = src_coords[:]
src_term = src.inject(field=p0.forward, expr=src * t.spacing**2 * vel**2 / b)
def g1(field):
return field.dx(x0=x+x.spacing/2)
def g2(field):
return field.dy(x0=y+y.spacing/2)
src_term = src.inject(field=save_src[src.dimensions[0], source_id], expr=src)
op1 = Operator([src_term])
op1.apply()
u2 = TimeFunction(name="u2", grid=model.grid, space_order=so)
sp_zi = Dimension(name='sp_zi')
source_mask_f = TimeFunction(name='source_mask_f', grid=model.grid, time_order=0,
dtype=np.int32)
source_mask_f.data[0, :, :, :] = source_mask.data[:, :, :]
sp_source_mask = TimeFunction(name='sp_source_mask', shape=(list(sparse_shape)),
dimensions=(x, y, sp_zi), dtype=np.int32)
# Now holds IDs
sp_source_mask.data[inds[0], inds[1], :] = tuple(inds[2][:len(np.unique(inds[2]))])
assert(np.count_nonzero(sp_source_mask.data) == len(nzinds[0]))
assert(len(sp_source_mask.dimensions) == 3)
t = model.grid.stepping_dim
zind = Scalar(name='zind', dtype=np.int32)
eq0 = Eq(sp_zi.symbolic_max, nnz_sp_source_mask[x, y], implicit_dims=(time, x, y))
eq1 = Eq(zind, sp_source_mask[x, y, sp_zi], implicit_dims=(time, x, y, sp_zi))