Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def save_netcdf(self, filename, force_overwrite=True):
"""Save trajectory in AMBER NetCDF format
Parameters
----------
filename : str
filesystem path in which to save the trajectory
force_overwrite : bool, default=True
Overwrite anything that exists at filename, if it's already there
"""
self._check_valid_unitcell()
with NetCDFTrajectoryFile(filename, 'w', force_overwrite=force_overwrite) as f:
f.write(coordinates=in_units_of(self._xyz, Trajectory._distance_unit, NetCDFTrajectoryFile.distance_unit),
time=self.time,
cell_lengths=in_units_of(self.unitcell_lengths, Trajectory._distance_unit, f.distance_unit),
cell_angles=self.unitcell_angles)
# Assume that its a rectilinear box
cell_angles = 90.0 * np.ones_like(cell_lengths)
elif isinstance(f, (LAMMPSTrajectoryFile, DCDTrajectoryFile)):
cell_lengths, cell_angles = res[1:]
elif len(res) == 4 or isinstance(f, (HDF5TrajectoryFile, DTRTrajectoryFile, NetCDFTrajectoryFile)):
cell_lengths, cell_angles = res[2:4]
elif len(res) == 3:
# this tng format.
box = res[2]
else:
assert len(res) == 1, "len:{l}, type={t}".format(l=len(res), t=f)
#raise NotImplementedError("format read function not handled..." + str(f))
in_units_of(box, f.distance_unit, Trajectory._distance_unit, inplace=True)
if cell_lengths is not None:
in_units_of(cell_lengths, f.distance_unit, Trajectory._distance_unit, inplace=True)
return TrajData(xyz, cell_lengths, cell_angles, box)
if 'xyz' in out_fields and 'xyz' in data:
data['xyz'] = in_units_of(data['xyz'], in_units, out_units, inplace=True)
if 'box' in out_fields:
if 'box' in data:
data['box'] = in_units_of(data['box'], in_units, out_units, inplace=True)
elif 'cell_angles' in data and 'cell_lengths' in data:
a, b, c = data['cell_lengths'].T
alpha, beta, gamma = data['cell_angles'].T
data['box'] = np.dstack(md.utils.unitcell.lengths_and_angles_to_box_vectors(a, b, c, alpha, beta, gamma))
data['box'] = in_units_of(data['box'], in_units, out_units, inplace=True)
del data['cell_lengths']
del data['cell_angles']
if 'cell_lengths' in out_fields:
if 'cell_lengths' in data:
data['cell_lengths'] = in_units_of(data['cell_lengths'], in_units, out_units, inplace=True)
elif 'box' in data:
a, b, c, alpha, beta, gamma = md.utils.unitcell.box_vectors_to_lengths_and_angles(data['box'][:, 0], data['box'][:, 1], data['box'][:, 2])
data['cell_lengths'] = np.vstack((a, b, c)).T
data['cell_angles'] = np.vstack((alpha, beta, gamma)).T
data['cell_lengths'] = in_units_of(data['cell_lengths'], in_units, out_units, inplace=True)
del data['box']
ignored_keys = ["'%s'" % s for s in set(data) - set(out_fields)]
formated_fields = ', '.join("'%s'" % o for o in out_fields)
if len(ignored_keys) > 0:
warn('%s data from input file(s) will be discarded. '
'output format only supports fields: %s' % (', '.join(ignored_keys),
formated_fields))
warn.active = False
return data
The angles (\alpha, \beta, \gamma) defining the unit cell for
each frame.
Notes
-----
If the input arrays are of dimension deficient by one, for example
if the coordinates array is two dimensional, the time is a single
scalar or cell_lengths and cell_angles are a 1d array of length three,
that is okay. You'll simply be saving a single frame.
"""
self._validate_open()
if self._mode not in ['w', 'ws', 'a', 'as']:
raise IOError('The file was opened in mode=%s. Writing is not allowed.' % self._mode)
coordinates = in_units_of(coordinates, 'angstroms')
time = in_units_of(time, 'picoseconds')
cell_lengths = in_units_of(cell_lengths, 'angstroms')
cell_angles = in_units_of(cell_angles, 'degrees')
# typecheck all of the input arguments rigorously
coordinates = ensure_type(coordinates, np.float32, 3, 'coordinates', length=None,
can_be_none=False, shape=(None, None, 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True)
n_frames, n_atoms = coordinates.shape[0], coordinates.shape[1]
time = ensure_type(time, np.float32, 1, 'time', length=n_frames,
can_be_none=True, warn_on_cast=False, add_newaxis_on_deficient_ndim=True)
cell_lengths = ensure_type(cell_lengths, np.float64, 2, 'cell_lengths', length=n_frames,
can_be_none=True, shape=(n_frames, 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True)
cell_angles = ensure_type(cell_angles, np.float64, 2, 'cell_angles', length=n_frames,
can_be_none=True, shape=(n_frames, 3), warn_on_cast=False, add_newaxis_on_deficient_ndim=True)
# are we dealing with a periodic system?
for cluster in nbr_list:
cluster_water_coords = water_coordinates[cluster]
if len(cluster) > cutoff:
near_flag = 0
waters_offset = [(water_id_frame_list[wat][0] + self.start_frame,
((water_id_frame_list[wat][1] - start_point) * self.water_sites)
+ self.wat_oxygen_atom_ids[0]) for wat in cluster]
com = np.zeros(3)
masses = np.ones(cluster_water_coords.shape[0])
masses /= masses.sum()
com[:] = water_coordinates[cluster].T.dot(masses)
cluster_center = com[:]
# Raise flag if the current cluster center is within 1.2 A of existing cluster center
for other, coord in enumerate(final_cluster_coords[:-1]):
dist = np.linalg.norm(md.utils.in_units_of(cluster_center, "nanometers", "angstroms") - coord)
if dist < 1.20:
near_flag += 1
# Only add cluster center if it is at a safe distance from others
if near_flag == 0:
final_cluster_coords.append(md.utils.in_units_of(cluster_center, "nanometers", "angstroms"))
site_waters.append(waters_offset)
cluster_index += 1
# otherwise store data for each user defined cluster
else:
# For each cluster, set cluster center equal to geometric center of all waters in the cluster
final_cluster_coords = md.utils.in_units_of(init_cluster_coords, "nanometers", "angstroms")
site_waters = []
cluster_index = 1
for cluster in nbr_list:
waters_offset = [(water_id_frame_list[wat][0] + self.start_frame,
((water_id_frame_list[wat][1] - start_point) * self.water_sites)
@function_timer
def generate_clusters(self, ligand_file):
# Obtain binding site solute atoms using ligand atom coordinates
ligand = md.load_pdb(ligand_file)
ligand_coords = md.utils.in_units_of(ligand.xyz[0, :, :], "nanometers", "angstroms", inplace=True)
first_frame = md.load_frame(self.trajectory, 0, top=self.topology_file)
solute_pos = md.utils.in_units_of(first_frame.xyz[0, self.non_water_atom_ids, :], "nanometers", "angstroms")
search_space = NeighborSearch(solute_pos, 5.0)
near_indices = search_space.query_nbrs_multiple_points(ligand_coords)
binding_site_atom_indices = [self.non_water_atom_ids[nbr_index] for nbr_index in near_indices]
# Obtain water molecules solvating the binding site
stride = 10
print "Reading in trajectory for clustering."
trj = md.load(self.trajectory, top=self.topology)
trj_short = trj[self.start_frame:self.start_frame + trj.n_frames:stride]
print "Obtaining a superconfiguration of all water molecules found in the binding site throught the trajectory."
binding_site_waters = md.compute_neighbors(trj_short, 0.50, binding_site_atom_indices, haystack_indices=self.wat_oxygen_atom_ids)
# generate a list of all waters with their frame ids
water_id_frame_list = [(i, nbr) for i in range(len(binding_site_waters)) for nbr in binding_site_waters[i]]
# Set up clustering loop
print "Performing clustering on the superconfiguration."
cutoff = trj_short.n_frames * 2 * 0.1401
if np.ceil(cutoff) - cutoff <= 0.5:
_check_mode(self.mode, ('w', 'a'))
# these must be either both present or both absent. since
# we're going to throw an error if one is present w/o the other,
# lets do it now.
if cell_lengths is None and cell_angles is not None:
raise ValueError('cell_lengths were given, but no cell_angles')
if cell_lengths is not None and cell_angles is None:
raise ValueError('cell_angles were given, but no cell_lengths')
# if the input arrays are simtk.unit.Quantities, convert them
# into md units. Note that this acts as a no-op if the user doesn't
# have simtk.unit installed (e.g. they didn't install OpenMM)
coordinates = in_units_of(coordinates, None, 'nanometers')
time = in_units_of(time, None, 'picoseconds')
cell_lengths = in_units_of(cell_lengths, None, 'nanometers')
cell_angles = in_units_of(cell_angles, None, 'degrees')
velocities = in_units_of(velocities, None, 'nanometers/picosecond')
kineticEnergy = in_units_of(kineticEnergy, None, 'kilojoules_per_mole')
potentialEnergy = in_units_of(potentialEnergy, None, 'kilojoules_per_mole')
temperature = in_units_of(temperature, None, 'kelvin')
alchemicalLambda = in_units_of(alchemicalLambda, None, 'dimensionless')
# do typechecking and shapechecking on the arrays
# this ensure_type method has a lot of options, but basically it lets
# us validate most aspects of the array. Also, we can upconvert
# on defficent ndim, which means that if the user sends in a single
# frame of data (i.e. coordinates is shape=(n_atoms, 3)), we can
# realize that. obviously the default mode is that they want to
# write multiple frames at a time, so the coordinate shape is
# (n_frames, n_atoms, 3)
coordinates = ensure_type(coordinates, dtype=np.float32, ndim=3,
force_overwrite : bool, default=True
Overwrite anything that exists at filename, if it's already there
Notes
-----
Amber restart files can only store a single frame. If only one frame
exists, "filename" will be written. Otherwise, "filename.#" will be
written, where # is a zero-padded number from 1 to the total number of
frames in the trajectory
"""
self._check_valid_unitcell()
if self.n_frames == 1:
with AmberRestartFile(filename, 'w', force_overwrite=force_overwrite) as f:
coordinates = in_units_of(self._xyz, Trajectory._distance_unit,
AmberRestartFile.distance_unit)
lengths = in_units_of(self.unitcell_lengths, Trajectory._distance_unit,
AmberRestartFile.distance_unit)
f.write(coordinates=coordinates, time=self.time[0],
cell_lengths=lengths, cell_angles=self.unitcell_angles)
else:
fmt = '%s.%%0%dd' % (filename, len(str(self.n_frames)))
for i in xrange(self.n_frames):
with AmberRestartFile(fmt % (i+1), 'w', force_overwrite=force_overwrite) as f:
coordinates = in_units_of(self._xyz, Trajectory._distance_unit,
AmberRestartFile.distance_unit)
lengths = in_units_of(self.unitcell_lengths, Trajectory._distance_unit,
AmberRestartFile.distance_unit)
f.write(coordinates=coordinates[i], time=self.time[0],
cell_lengths=lengths[i], cell_angles=self.unitcell_angles[i])
def save_hdf5(self, filename, force_overwrite=True):
"""Save trajectory to MDTraj HDF5 format
Parameters
----------
filename : str
filesystem path in which to save the trajectory
force_overwrite : bool, default=True
Overwrite anything that exists at filename, if its already there
"""
with HDF5TrajectoryFile(filename, 'w', force_overwrite=force_overwrite) as f:
f.write(coordinates=in_units_of(self.xyz, Trajectory._distance_unit, f.distance_unit),
time=self.time,
cell_lengths=in_units_of(self.unitcell_lengths, Trajectory._distance_unit, f.distance_unit),
cell_angles=self.unitcell_angles)
f.topology = self.topology
Parameters
----------
filename : str
filesystem path in which to save the trajectory
force_overwrite : bool, default=True
Overwrite anything that exists at filename, if its already there
"""
self._check_valid_unitcell()
if self._have_unitcell:
if not np.all(self.unitcell_angles == 90):
raise ValueError('Only rectilinear boxes can be saved to mdcrd files. '
'Your angles are {}'.format(self.unitcell_angles))
with MDCRDTrajectoryFile(filename, mode='w', force_overwrite=force_overwrite) as f:
f.write(xyz=in_units_of(self.xyz, Trajectory._distance_unit, f.distance_unit),
cell_lengths=in_units_of(self.unitcell_lengths, Trajectory._distance_unit, f.distance_unit))