Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
# set estimation parameters
self._lag = lag
self._nstates = nstates
# if no initial MSM is given, estimate it now
if msm_init is None:
msm_estimator = _MSMEstimator(self._dtrajs, reversible=self._reversible, sparse=self._msm_sparse,
connectivity=self._msm_connectivity, dt=self._dt, **self._kwargs)
msm_init = msm_estimator.estimate(lag=lag)
else:
assert isinstance(msm_init, _EstimatedMSM), 'msm_init must be of type EstimatedMSM'
# check input
assert _types.is_int(nstates) and nstates > 1 and nstates <= msm_init.nstates, \
'nstates must be an int in [2,msmobj.nstates]'
timescale_ratios = msm_init.timescales()[:-1] / msm_init.timescales()[1:]
if timescale_ratios[nstates-2] < 2.0:
self._logger.warn('Requested coarse-grained model with ' + str(nstates) + ' metastable states. ' +
'The ratio of relaxation timescales between ' + str(nstates) + ' and ' + str(nstates+1) +
' states is only ' + str(timescale_ratios[nstates-2]) + ' while we recomment at ' +
'least 2. It is possible that the resulting HMM is inaccurate. Handle with caution.')
# set things from MSM
self._nstates_obs_full = msm_init.nstates_full
if self._observe_active:
nstates_obs = msm_init.nstates
observable_set = msm_init.active_set
dtrajs_obs = msm_init.discrete_trajectories_active
else:
nstates_obs = msm_init.nstates_full
def _column(arr, indexes):
    """Slice out one "column" of a 2-D or 3-D array along its first axis.

    Parameters
    ----------
    arr : array-like exposing ``ndim`` and tuple indexing
        Matrix (``ndim == 2``) or order-3 tensor (``ndim == 3``).
    indexes : int or pair of ints
        For a matrix, a single int ``j`` selecting ``arr[:, j]``.
        For an order-3 tensor, a length-2 sequence ``(j, k)`` selecting
        ``arr[:, j, k]``.

    Returns
    -------
    The selected slice, running over the first axis of ``arr``.

    Raises
    ------
    NotImplementedError
        If the combination of array rank and ``indexes`` is neither of the
        two supported cases above.
    """
    rank = arr.ndim

    # Matrix case: a single integer picks the column.
    if rank == 2 and types.is_int(indexes):
        return arr[:, indexes]

    # Order-3 tensor case: a pair of indexes fixes the two trailing axes.
    if rank == 3 and len(indexes) == 2:
        second_axis = indexes[0]
        third_axis = indexes[1]
        return arr[:, second_axis, third_axis]

    raise NotImplementedError('Only supporting arrays of dimension 2 and 3 as yet.')
def fixed_seed(self, val):
    """Store the random seed derived from ``val`` in ``self._fixed_seed``.

    NOTE(review): this reads like a property setter whose decorator lies
    outside the visible source -- confirm against the enclosing class.

    Parameters
    ----------
    val : bool, None or int
        * truthy bool          -> the seed is set to 42
        * ``False`` or ``None`` -> a seed is drawn with ``random.randint``
          from ``[0, 2**32-1]``
        * int in ``[0, 2**32-1]`` -> used as the seed unchanged
        * int outside that range -> a warning is logged and ``False`` is
          assigned to ``self.fixed_seed``

    Raises
    ------
    ValueError
        If ``val`` is neither a bool, ``None`` nor an integer.
    """
    from pyemma.util import types

    largest_seed = 2**32-1

    if val is None or isinstance(val, bool):
        self._fixed_seed = 42 if val else random.randint(0, largest_seed)
    elif types.is_int(val):
        if 0 <= val <= largest_seed:
            self._fixed_seed = val
        else:
            self.logger.warning("seed has to be positive (or smaller than 2**32-1)."
                                " Seed will be chosen randomly.")
            # Hand over to the bool branch by assigning through the public name.
            self.fixed_seed = False
    else:
        raise ValueError("fixed seed has to be bool or integer")

    self.logger.debug("seed = %i", self._fixed_seed)
def listify_if_int(inp):
    """Return ``[inp]`` when ``_is_int(inp)`` holds, otherwise ``inp`` itself."""
    return [inp] if _is_int(inp) else inp
Parameters
----------
p0 : ndarray(n,)
Initial distribution. Vector of size of the active set.
k : int
Number of time steps
Returns
----------
pk : ndarray(n,)
Distribution after k steps. Vector of size of the active set.
"""
p0 = _types.ensure_ndarray(p0, ndim=1, size=self.nstates, kind='numeric')
assert _types.is_int(k) and k >= 0, 'k must be a non-negative integer'
if k == 0: # simply return p0 normalized
return p0 / p0.sum()
if self.is_sparse: # sparse: we don't have a full eigenvalue set, so just propagate
pk = _np.array(p0)
for i in range(k):
pk = _np.dot(pk.T, self.transition_matrix)
else: # dense: employ eigenvalue decomposition
self._ensure_eigendecomposition(self.nstates)
from pyemma.util.linalg import mdot
pk = mdot(p0.T,
self.eigenvectors_right(),
_np.diag(_np.power(self.eigenvalues(), k)),
self.eigenvectors_left()).real
# normalize to 1.0 and return
Returns
-------
ngl_wdg : Input ngl_wdg with the representations added
"""
if color_list in [None, [None]]:
color_list = ['blue']*len(atom_idxs)
elif isinstance(color_list, list) and len(color_list)0 and len(iidxs)==2:
ngl_wdg.add_distance(atom_pair=[[ii for ii in iidxs]], # yes it has to be this way for now
color=color,
#label_color='black',
label_size=0,
component=cc)
# TODO add line thickness as **kwarg
elif _np.ndim(iidxs) > 0 and len(iidxs) in [3,4]:
ngl_wdg.add_spacefill(selection=iidxs, radius=radius, color=color, component=cc)
else:
print("Cannot represent features involving more than 5 atoms per single feature")
return ngl_wdg
# set model and estimator
# copy the test model, since the estimation of cktest modifies the model.
from copy import deepcopy
self.test_model = deepcopy(test_model)
self.test_estimator = test_estimator
# set mlags
try:
maxlength = np.max([len(dtraj) for dtraj in test_estimator.discrete_trajectories_full])
except AttributeError:
maxlength = np.max(test_estimator.trajectory_lengths())
maxmlag = int(math.floor(maxlength / test_estimator.lag))
if mlags is None:
mlags = maxmlag
if types.is_int(mlags):
mlags = np.arange(mlags)
mlags = types.ensure_ndarray(mlags, ndim=1, kind='i')
if np.any(mlags > maxmlag):
mlags = mlags[np.where(mlags <= maxmlag)]
self.logger.warning('Changed mlags as some mlags exceeded maximum trajectory length.')
if np.any(mlags < 0):
mlags = mlags[np.where(mlags >= 0)]
self.logger.warning('Changed mlags as some mlags were negative.')
self.mlags = mlags
# set conf and error handling
self.conf = conf
self.has_errors = issubclass(self.test_model.__class__, SampledModel)
if self.has_errors:
self.test_model.set_model_params(conf=conf)
self.err_est = err_est