Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
snap0 = gsd.hoomd.Snapshot()
snap0.state['hpmc/sphere/radius'] = [2.0]
snap0.state['hpmc/sphere/orientable'] = [1]
snap1 = gsd.hoomd.Snapshot()
snap1.state['hpmc/convex_polyhedron/N'] = [3]
snap1.state['hpmc/convex_polyhedron/vertices'] = [[-1, -1, -1], [0, 1, 1],
[1, 0, 0]]
with gsd.hoomd.open(name=tmp_path / "test_state.gsd",
mode=open_mode.write) as hf:
hf.extend([snap0, snap1])
with gsd.hoomd.open(name=tmp_path / "test_state.gsd",
mode=open_mode.read) as hf:
assert len(hf) == 2
s = hf.read_frame(0)
numpy.testing.assert_array_equal(s.state['hpmc/sphere/radius'],
snap0.state['hpmc/sphere/radius'])
numpy.testing.assert_array_equal(s.state['hpmc/sphere/orientable'],
snap0.state['hpmc/sphere/orientable'])
s = hf.read_frame(1)
numpy.testing.assert_array_equal(
s.state['hpmc/convex_polyhedron/N'],
snap1.state['hpmc/convex_polyhedron/N'])
numpy.testing.assert_array_equal(
s.state['hpmc/convex_polyhedron/vertices'],
def test_view_slicing_and_iteration(tmp_path, open_mode):
    """Test that trajectories can be sliced.

    Writes 40 frames, takes every second frame as a view, and checks that
    ``len()`` agrees for the view, for slices of the view, and for
    iterators created from them.
    """
    with gsd.hoomd.open(name=tmp_path / "test_slicing.gsd",
                        mode=open_mode.write) as hf:
        hf.extend((create_frame(i) for i in range(40)))

    with gsd.hoomd.open(name=tmp_path / "test_slicing.gsd",
                        mode=open_mode.read) as hf:
        view = hf[::2]

        # Test len()-function on trajectory and sliced view.
        assert len(view) == 20
        assert len(view[:10]) == 10
        assert len(view[::2]) == 10

        # Test len()-function with explicit iterator.
        assert len(iter(view)) == len(view)
        assert len(iter(view[:10])) == len(view[:10])

        # Test iteration with implicit iterator.
        # All iterations are run twice to check for issues
        # with iterator exhaustion.
        for _ in range(2):
            assert len(list(view)) == len(view)
def test_defaults(tmp_path, open_mode):
"""Test that the property defaults are properly set."""
snap = gsd.hoomd.Snapshot()
snap.particles.N = 2
snap.bonds.N = 3
snap.angles.N = 4
snap.dihedrals.N = 5
snap.impropers.N = 6
snap.constraints.N = 4
snap.pairs.N = 7
with gsd.hoomd.open(name=tmp_path / "test_defaults.gsd",
mode=open_mode.write) as hf:
hf.append(snap)
with gsd.hoomd.open(name=tmp_path / "test_defaults.gsd",
mode=open_mode.read) as hf:
s = hf.read_frame(0)
assert s.configuration.step == 0
assert s.configuration.dimensions == 3
numpy.testing.assert_array_equal(
s.configuration.box,
numpy.array([1, 1, 1, 0, 0, 0], dtype=numpy.float32))
assert s.particles.N == 2
assert s.particles.types == ['A']
assert s.particles.type_shapes == [{}]
numpy.testing.assert_array_equal(
def test_pickle(tmp_path, open_mode):
    """Test that hoomd trajectory objects can be pickled.

    Pickling the trajectory opened with ``open_mode.write`` is expected to
    raise ``pickle.PickleError``. The trajectory opened with
    ``open_mode.read`` is pickled, restored, and must report 20 frames.
    """
    gsd_path = tmp_path / "test_pickling.gsd"

    with gsd.hoomd.open(name=gsd_path, mode=open_mode.write) as writer:
        writer.extend((create_frame(index) for index in range(20)))
        with pytest.raises(pickle.PickleError):
            pickle.dumps(writer)

    with gsd.hoomd.open(name=gsd_path, mode=open_mode.read) as reader:
        payload = pickle.dumps(reader)
        # The restored object is used as a context manager, like the
        # trajectory it was pickled from.
        with pickle.loads(payload) as restored:
            assert len(restored) == 20
def test_append(tmp_path, open_mode):
    """Test that gsd files can be appended to.

    Appends the same 10-particle snapshot five times, with its
    configuration step set to 1 through 5, then checks that the file
    holds five frames.
    """
    gsd_path = tmp_path / "test_append.gsd"

    frame = gsd.hoomd.Snapshot()
    frame.particles.N = 10

    with gsd.hoomd.open(name=gsd_path, mode=open_mode.write) as writer:
        for step in range(1, 6):
            frame.configuration.step = step
            writer.append(frame)

    with gsd.hoomd.open(name=gsd_path, mode=open_mode.read) as reader:
        assert len(reader) == 5
def test_extend(tmp_path, open_mode):
    """Test that the extend method works.

    Writes five frames produced by ``create_frame`` with a single
    ``extend`` call, then checks that the file holds five frames.
    """
    # The frames come from create_frame; no separate Snapshot is needed.
    with gsd.hoomd.open(name=tmp_path / "test_extend.gsd",
                        mode=open_mode.write) as hf:
        hf.extend((create_frame(i) for i in range(5)))

    with gsd.hoomd.open(name=tmp_path / "test_extend.gsd",
                        mode=open_mode.read) as hf:
        assert len(hf) == 5
snap0.log['particles/net_force'] = [[1, 2, 3], [4, 5, 6]]
snap0.log['particles/pair_lj_energy'] = [0, -5, -8, -3]
snap0.log['value/potential_energy'] = [10]
snap0.log['value/pressure'] = [-3]
snap1 = gsd.hoomd.Snapshot()
snap1.log['particles/pair_lj_energy'] = [1, 2, -4, -10]
snap1.log['value/pressure'] = [5]
with gsd.hoomd.open(name=tmp_path / "test_log.gsd",
mode=open_mode.write) as hf:
hf.extend([snap0, snap1])
with gsd.hoomd.open(name=tmp_path / "test_log.gsd",
mode=open_mode.read) as hf:
assert len(hf) == 2
s = hf.read_frame(0)
numpy.testing.assert_array_equal(s.log['particles/net_force'],
snap0.log['particles/net_force'])
numpy.testing.assert_array_equal(s.log['particles/pair_lj_energy'],
snap0.log['particles/pair_lj_energy'])
numpy.testing.assert_array_equal(s.log['value/potential_energy'],
snap0.log['value/potential_energy'])
numpy.testing.assert_array_equal(s.log['value/pressure'],
snap0.log['value/pressure'])
s = hf.read_frame(1)
# unspecified entries pull from frame 0
def main_read(args):
    """Open a GSD file and prepare the namespace for an interactive session.

    Reads ``args.schema``, ``args.file`` and ``args.mode``. With the
    ``'hoomd'`` schema the file is opened through ``hoomd_open`` and both
    the trajectory and its underlying file handle are exposed; otherwise
    the file is opened through ``fl.open`` and only the handle is exposed.

    Raises:
        ValueError: for a non-hoomd schema when ``args.mode`` is not one
            of ``'rb'``, ``'rb+'`` or ``'ab'``.
    """
    # Names made available to the session, keyed by module name.
    local_ns = {
        module_name: sys.modules[module_name]
        for module_name in ('gsd', 'gsd.hoomd', 'gsd.fl')
    }
    attributes = {}

    if args.schema != 'hoomd':
        if args.mode not in ('rb', 'rb+', 'ab'):
            raise ValueError("Unsupported schema for creating a file.")
        handle = fl.open(args.file, args.mode)
        local_ns['handle'] = handle
    else:
        traj = hoomd_open(args.file, mode=args.mode)
        handle = traj.file
        local_ns['handle'] = handle
        local_ns['traj'] = traj
        attributes["Number of frames"] = len(traj)

    # One "key: value" line per collected attribute.
    attribute_lines = [
        "{}: {}".format(key, val) for key, val in attributes.items()
    ]
    extras = "\n".join(attribute_lines)
with gsd.hoomd.open(name='test.gsd', mode='wb') as hf:
start = time.time()
write_file(hf, nframes, N, position, orientation)
# ensure that all writes to disk are completed and drop file system cache
call(['sudo', '/bin/sync'])
call(['sudo', '/sbin/sysctl', 'vm.drop_caches=3'], stdout=PIPE)
end = time.time()
timings['write'] = actual_size / 1024**2 / (end - start)
# time how long it takes to open the file
print("Opening file... ", file=sys.stderr, flush=True, end='')
start = time.time()
with gsd.hoomd.open(name='test.gsd', mode='rb') as hf:
end = time.time()
print(end - start, "s", file=sys.stderr, flush=True)
timings['open_time'] = (end - start)
# Read the file sequentially and measure the time taken
print("Sequential read file:", file=sys.stderr, flush=True)
start = time.time()
read_sequential_file(hf, nframes, nframes_read, N, position,
orientation)
end = time.time()
timings['seq_read'] = bmark_read_size / 1024**2 / (end - start)
# drop the file system cache