Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create(tmp_path):
    """Verify that a newly created GSD file reports the hoomd schema."""
    target = tmp_path / "test_create.gsd"
    with gsd.hoomd.open(name=target, mode='wb') as trajectory:
        # The underlying file handle carries the schema metadata.
        assert trajectory.file.schema == 'hoomd'
        assert trajectory.file.schema_version >= (1, 0)
def create_frame(i):
    """Return a new snapshot whose configuration step is ``i + 1``."""
    frame = gsd.hoomd.Snapshot()
    frame.configuration.step = i + 1
    return frame
def test_gsd_gsd(self):
    """Run the shared analyses on the LJ_GSD trajectory opened via gsd.hoomd."""
    import gsd.hoomd

    trajectory_cm = gsd.hoomd.open(LJ_GSD, 'rb')
    with trajectory_cm as traj:
        self.run_analyses(traj)
def test_slicing_and_iteration(tmp_path, open_mode):
    """Check ``len()`` on a hoomd trajectory, on a slice of it, and on iterators."""
    gsd_path = tmp_path / "test_slicing.gsd"
    first_ten = slice(None, 10)

    # Write 20 frames produced by create_frame(0) .. create_frame(19).
    with gsd.hoomd.open(name=gsd_path, mode=open_mode.write) as writer:
        writer.extend(map(create_frame, range(20)))

    with gsd.hoomd.open(name=gsd_path, mode=open_mode.read) as hf:
        # len() of the full trajectory and of a sliced trajectory.
        assert len(hf) == 20
        assert len(hf[first_ten]) == 10

        # len() of an explicitly requested iterator must match its source.
        assert len(iter(hf)) == len(hf)
        assert len(iter(hf[first_ten])) == len(hf[first_ten])

        # Test iteration with implicit iterator.
        # All iterations are run twice to check for issues
        # with iterator exhaustion.
        # NOTE(review): the iteration checks announced above are not
        # present in this view of the file.
def write_frame(file, frame, position, orientation):
    """Append a snapshot for ``frame`` to ``file``.

    The snapshot's step is ``frame * 10`` and its particle count is
    ``position.shape[0]``. Element ``[0][0]`` of both ``position`` and
    ``orientation`` is overwritten with ``frame`` before they are stored,
    so the caller's arrays are modified in place.
    """
    snapshot = gsd.hoomd.Snapshot()
    particles = snapshot.particles
    particles.N = position.shape[0]
    snapshot.configuration.step = frame * 10

    # Stamp the frame index into the first element of each array.
    position[0][0] = frame
    orientation[0][0] = frame

    particles.position = position
    particles.orientation = orientation
    file.append(snapshot)
def read_xml(name):
    """Parse a HOOMD XML file and locate its ``configuration`` element.

    Args:
        name: Path of the XML file to read.

    Raises:
        RuntimeError: If the document does not contain exactly one
            ``hoomd_xml`` element, or that element does not contain
            exactly one ``configuration`` element.

    NOTE(review): the rest of this function (including any return
    statement) is not visible in this view; local names are kept unchanged
    so that a continuation can keep using them.
    """
    snap = gsd.hoomd.Snapshot()

    # parse the XML file; the context manager guarantees the handle is
    # closed even when parsing raises (the bare open() call leaked it)
    with open(name, 'rb') as xml_file:
        dom = xml.dom.minidom.parse(xml_file)

    hoomd_xml = dom.getElementsByTagName('hoomd_xml')
    if len(hoomd_xml) != 1:
        raise RuntimeError("hoomd_xml tag not found in xml file")
    hoomd_xml = hoomd_xml[0]

    configuration = hoomd_xml.getElementsByTagName('configuration')
    if len(configuration) != 1:
        raise RuntimeError("configuration tag not found in xml file")
    configuration = configuration[0]

    # determine the number of dimensions
# Benchmark fragment: sizes the workload, writes 'test.gsd' (optionally once
# as a warm-up, then once timed) and records the write throughput in
# timings['write'].
# NOTE(review): the enclosing function header is not visible and indentation
# is lost in this view, so which statements belong to the `if` / `with`
# bodies cannot be determined here.

# Random float32 array with one row of 4 values per particle.
orientation = numpy.random.random((N, 4)).astype('float32')
nframes = compute_nframes(N, size)
actual_size = compute_actual_size(N, nframes)
# Scale the frame count by the ratio of the requested read size to the
# actual file size.
# NOTE(review): bmark_read_size is read here before it is assigned in this
# view — presumably defined earlier (e.g. as a parameter); confirm.
nframes_read = int(nframes * bmark_read_size / actual_size)
bmark_read_size = compute_actual_size(N, nframes_read)
# Never plan to read more frames than will be written.
if nframes_read > nframes:
nframes_read = nframes
bmark_read_size = actual_size
# first, write the file and time how long it takes
print("Writing file: ", file=sys.stderr, flush=True)
# if the file size is small, write it once to warm up the disk
if size < 64 * 1024**3:
# NOTE(review): the object returned by this open() call is neither kept nor
# closed here, and the next statement reopens the same path in 'wb' mode —
# confirm whether this call is needed.
gsd.hoomd.open(mode='wb', name='test.gsd')
with gsd.hoomd.open(name='test.gsd', mode='wb') as hf:
write_file(hf, nframes, N, position, orientation)
# write it again and time this one
# NOTE(review): same unclosed open() pattern as in the warm-up write.
gsd.hoomd.open(mode='wb', name='test.gsd')
with gsd.hoomd.open(name='test.gsd', mode='wb') as hf:
start = time.time()
write_file(hf, nframes, N, position, orientation)
# ensure that all writes to disk are completed and drop file system cache
# (both commands run through sudo and finish before `end` is taken, so their
# run time is part of the measured interval)
call(['sudo', '/bin/sync'])
call(['sudo', '/sbin/sysctl', 'vm.drop_caches=3'], stdout=PIPE)
end = time.time()
# Throughput: actual_size scaled by 1024**2, per elapsed second
# (MiB/s if actual_size is in bytes — TODO confirm the unit).
timings['write'] = actual_size / 1024**2 / (end - start)
# NOTE(review): the statements from here down to the second
# timings['write'] assignment repeat the preceding sequence verbatim —
# this looks like an extraction/duplication artifact; confirm before relying
# on it.
bmark_read_size = compute_actual_size(N, nframes_read)
if nframes_read > nframes:
nframes_read = nframes
bmark_read_size = actual_size
# first, write the file and time how long it takes
print("Writing file: ", file=sys.stderr, flush=True)
# if the file size is small, write it once to warm up the disk
if size < 64 * 1024**3:
gsd.hoomd.open(mode='wb', name='test.gsd')
with gsd.hoomd.open(name='test.gsd', mode='wb') as hf:
write_file(hf, nframes, N, position, orientation)
# write it again and time this one
gsd.hoomd.open(mode='wb', name='test.gsd')
with gsd.hoomd.open(name='test.gsd', mode='wb') as hf:
start = time.time()
write_file(hf, nframes, N, position, orientation)
# ensure that all writes to disk are completed and drop file system cache
call(['sudo', '/bin/sync'])
call(['sudo', '/sbin/sysctl', 'vm.drop_caches=3'], stdout=PIPE)
end = time.time()
timings['write'] = actual_size / 1024**2 / (end - start)
# time how long it takes to open the file
print("Opening file... ", file=sys.stderr, flush=True, end='')
# Start of the timed interval for the open that follows.
start = time.time()
with gsd.hoomd.open(name='test.gsd', mode='rb') as hf: