Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
d0 = Dimension(name='d')
d1 = Dimension(name='d')
assert d0 is d1
s0 = Scalar(name='s0')
s1 = Scalar(name='s1')
d2 = Dimension(name='d', spacing=s0)
d3 = Dimension(name='d', spacing=s1)
assert d2 is not d3
d4 = Dimension(name='d', spacing=s1)
assert d3 is d4
d5 = Dimension(name='d', spacing=Constant(name='s1'))
assert d2 is not d5
def test_override_composite_data(self):
grid = Grid(shape=(10, 10))
original_coords = (1., 1.)
new_coords = (2., 2.)
p_dim = Dimension(name='p_src')
u = TimeFunction(name='u', grid=grid, time_order=2, space_order=2)
time = u.indices[0]
src1 = SparseTimeFunction(name='src1', grid=grid, dimensions=[time, p_dim],
npoint=1, nt=10, coordinates=original_coords)
src2 = SparseTimeFunction(name='src1', grid=grid, dimensions=[time, p_dim],
npoint=1, nt=10, coordinates=new_coords)
op = Operator(src1.inject(u, src1))
# Move the source from the location where the setup put it so we can test
# whether the override picks up the original coordinates or the changed ones
args = op.arguments(src1=src2, t=0)
arg_name = src1.name + "_coords"
assert(np.array_equal(args[arg_name], np.asarray((new_coords,))))
def test_indexed_w_indirections(self):
"""Test point-wise arithmetic with indirectly indexed Functions."""
grid = Grid(shape=(10, 10))
x, y = grid.dimensions
p_poke = Dimension('p_src')
d = Dimension('d')
npoke = 1
u = Function(name='u', grid=grid, space_order=0)
coordinates = Function(name='coordinates', dimensions=(p_poke, d),
shape=(npoke, grid.dim), space_order=0, dtype=np.int32)
coordinates.data[0, 0] = 4
coordinates.data[0, 1] = 3
poke_eq = Eq(u[coordinates[p_poke, 0], coordinates[p_poke, 1]], 1.0)
op = Operator(poke_eq)
op.apply()
ix, iy = np.where(u.data == 1.)
assert len(ix) == len(iy) == 1
assert ix[0] == 4 and iy[0] == 3
(['Eq(v.forward, v[t+1, x, y-1]+v[t, x, y]+v[t, x, y-1])'],
[], ['x']),
(['Eq(v.forward, v+1)', 'Inc(u, v)'],
[], ['x', 'y'])
])
def test_iteration_parallelism_2d(self, exprs, atomic, parallel):
"""Tests detection of PARALLEL_* properties."""
grid = Grid(shape=(10, 10))
time = grid.time_dim # noqa
t = grid.stepping_dim # noqa
x, y = grid.dimensions # noqa
p = Dimension(name='p')
d = Dimension(name='d')
rx = Dimension(name='rx')
ry = Dimension(name='ry')
u = Function(name='u', grid=grid) # noqa
v = TimeFunction(name='v', grid=grid, save=None) # noqa
cx = Function(name='coeff_x', dimensions=(p, rx), shape=(1, 2)) # noqa
cy = Function(name='coeff_y', dimensions=(p, ry), shape=(1, 2)) # noqa
gp = Function(name='gridpoints', dimensions=(p, d), shape=(1, 2)) # noqa
src = Function(name='src', dimensions=(p,), shape=(1,)) # noqa
rcv = Function(name='rcv', dimensions=(time, p), shape=(100, 1), space_order=0) # noqa
# List comprehension would need explicit locals/globals mappings to eval
for i, e in enumerate(list(exprs)):
exprs[i] = eval(e)
op = Operator(exprs, dle='openmp')
def test_argument_from_index_constant(self):
nx, ny = 30, 30
grid = Grid(shape=(nx, ny))
x, y = grid.dimensions
arbdim = Dimension('arb')
u = TimeFunction(name='u', grid=grid, save=None, time_order=2, space_order=0)
snap = Function(name='snap', dimensions=(arbdim, x, y), shape=(5, nx, ny),
space_order=0)
save_t = Constant(name='save_t', dtype=np.int32)
save_slot = Constant(name='save_slot', dtype=np.int32)
expr = Eq(snap.subs(arbdim, save_slot), u.subs(grid.stepping_dim, save_t))
op = Operator(expr)
u.data[:] = 0.0
snap.data[:] = 0.0
u.data[0, 10, 10] = 1.0
op.apply(save_t=0, save_slot=1)
assert snap.data[1, 10, 10] == 1.0
def test_indexed_w_indirections(self):
"""Test point-wise arithmetic with indirectly indexed Functions."""
grid = Grid(shape=(10, 10))
x, y = grid.dimensions
p_poke = Dimension('p_src')
d = Dimension('d')
npoke = 1
u = Function(name='u', grid=grid, space_order=0)
coordinates = Function(name='coordinates', dimensions=(p_poke, d),
shape=(npoke, grid.dim), space_order=0, dtype=np.int32)
coordinates.data[0, 0] = 4
coordinates.data[0, 1] = 3
poke_eq = Eq(u[coordinates[p_poke, 0], coordinates[p_poke, 1]], 1.0)
op = Operator(poke_eq)
op.apply()
ix, iy = np.where(u.data == 1.)
assert len(ix) == len(iy) == 1
# sp_source_id.data[inds[0],inds[1],:] = inds[2][:maxz]
id_dim = Dimension(name='id_dim')
save_src = TimeFunction(name='save_src', shape=(src.shape[0],
nzinds[1].shape[0]), dimensions=(src.dimensions[0], id_dim))
save_src_term = src.inject(field=save_src[src.dimensions[0], source_id], expr=src * dt**2 / model.m)
op1 = Operator([save_src_term])
op1.apply(time=time_range.num-1)
usol = TimeFunction(name="usol", grid=model.grid, space_order=so, time_order=2)
sp_zi = Dimension(name='sp_zi')
# import pdb; pdb.set_trace()
sp_source_mask = Function(name='sp_source_mask', shape=(list(sparse_shape)), dimensions=(x, y, sp_zi), space_order=0, dtype=np.int32)
# Now holds IDs
sp_source_mask.data[inds[0], inds[1], :] = tuple(inds[2][:len(np.unique(inds[2]))])
assert(np.count_nonzero(sp_source_mask.data) == len(nzinds[0]))
assert(len(sp_source_mask.dimensions) == 3)
t = model.grid.stepping_dim
zind = Scalar(name='zind', dtype=np.int32)
eq0 = Eq(sp_zi.symbolic_max, nnz_sp_source_mask[x, y] - 1, implicit_dims=(time, x, y))
def test_implicit_dims(self):
"""
Test ConditionalDimension as an implicit dimension for an equation.
"""
# This test makes an Operator that should create a vector of increasing
# integers, but stop incrementing when a certain stop value is reached
shape = (50,)
stop_value = 20
time = Dimension(name='time')
f = TimeFunction(name='f', shape=shape, dimensions=[time])
# The condition to stop incrementing
cond = ConditionalDimension(name='cond',
parent=time, condition=f[time] < stop_value)
eqs = [Eq(f.forward, f), Eq(f.forward, f.forward + 1, implicit_dims=[cond])]
op = Operator(eqs)
op.apply(time_M=shape[0] - 2)
# Make the same calculation in python to assert the result
F = np.zeros(shape[0])
for i in range(shape[0]):
F[i] = i if i < stop_value else stop_value
assert np.all(f.data == F)
def test_iteration_parallelism_2d(self, exprs, atomic, parallel):
"""Tests detection of PARALLEL_* properties."""
grid = Grid(shape=(10, 10))
time = grid.time_dim # noqa
t = grid.stepping_dim # noqa
x, y = grid.dimensions # noqa
p = Dimension(name='p')
d = Dimension(name='d')
rx = Dimension(name='rx')
ry = Dimension(name='ry')
u = Function(name='u', grid=grid) # noqa
v = TimeFunction(name='v', grid=grid, save=None) # noqa
cx = Function(name='coeff_x', dimensions=(p, rx), shape=(1, 2)) # noqa
cy = Function(name='coeff_y', dimensions=(p, ry), shape=(1, 2)) # noqa
gp = Function(name='gridpoints', dimensions=(p, d), shape=(1, 2)) # noqa
src = Function(name='src', dimensions=(p,), shape=(1,)) # noqa
rcv = Function(name='rcv', dimensions=(time, p), shape=(100, 1), space_order=0) # noqa
# List comprehension would need explicit locals/globals mappings to eval
for i, e in enumerate(list(exprs)):
exprs[i] = eval(e)
def test_partial_offloading(self):
"""
Check that Function objects not using any SpaceDimension
are computed in Devito-land, rather than via YASK.
"""
shape = (4, 4, 4)
grid = Grid(shape=shape)
dx = Dimension(name='dx')
dy = Dimension(name='dy')
dz = Dimension(name='dz')
x, y, z = grid.dimensions
u = TimeFunction(name='yu4D', grid=grid, space_order=0)
f = Function(name='f', dimensions=(dx, dy, dz), shape=shape)
u.data_with_halo[:] = 0.
f.data[:] = 0.
eqns = [Eq(u.forward, u + 1.),
Eq(f, u[1, dx, dy, dz] + 1.)]
op = Operator(eqns)
op(time=0)
assert np.all(u.data[0] == 0.)
assert np.all(u.data[1] == 1.)
assert np.all(f.data == 2.)