from pyemma._base.serialization.serialization import SerializableMixIn, Modifications
from pyemma._base.estimator import Estimator, estimate_param_scan, param_grid
from pyemma._base.model import SampledModel
from pyemma._base.parallel import NJobsMixIn
from pyemma._base.progress import ProgressReporterMixin
from pyemma.util.statistics import confidence_interval
from pyemma.util import types
__author__ = 'noe'
def _serial_fix_lagged_model_validatior_version_1(state):
state['has_errors'] = issubclass(state['test_model'].__class__, SampledModel)
return state
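# Hedged illustration (not part of the original source): applying the fix above
# to a restored state dict. `_StubSampledModel` is a hypothetical stand-in for
# a stored test_model; the flag records whether error estimates are available.
class _StubSampledModel(SampledModel):
    def __init__(self):  # bypass the real constructor for the illustration
        pass

state = {'test_model': _StubSampledModel()}
state = _serial_fix_lagged_model_validatior_version_1(state)
assert state['has_errors'] is True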
class LaggedModelValidator(Estimator, ProgressReporterMixin, SerializableMixIn, NJobsMixIn):
r""" Validates a model estimated at lag time tau by testing its predictions
for longer lag times
Parameters
----------
test_model : Model
Model to be tested
test_estimator : Estimator
Parametrized Estimator that has produced the model
mlags : int or int-array, default=10
Multiples of the lag time at which to test the model, e.g. range(10).
A single int is expanded to a range, i.e. mlags=10 maps to
mlags=range(10). The setting None chooses mlags automatically,
based on the longest available trajectory.
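# A minimal illustration of the mlags convention described above (not part of
# the original source): an integer is expanded to a range of lag-time
# multiples, so a model estimated at lag tau is tested at tau * k for each k.
mlags = 10
multiples = list(range(mlags))  # [0, 1, ..., 9]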
import numpy as np
from pyemma._base.serialization.serialization import SerializableMixIn
from pyemma._ext.variational.solvers.direct import eig_corr
from pyemma._ext.variational.util import ZeroRankError
from pyemma.coordinates.estimation.covariance import LaggedCovariance
from pyemma.coordinates.transform._tica_base import TICABase, TICAModelBase
from pyemma.util.annotators import fix_docs
import warnings
__all__ = ['TICA']
@fix_docs
class TICA(TICABase, SerializableMixIn):
r""" Time-lagged independent component analysis (TICA)"""
__serialize_version = 0
def __init__(self, lag, dim=-1, var_cutoff=0.95, kinetic_map=True, commute_map=False, epsilon=1e-6,
stride=1, skip=0, reversible=True, weights=None, ncov_max=float('inf')):
r""" Time-lagged independent component analysis (TICA) [1]_, [2]_, [3]_.
Parameters
----------
lag : int
lag time
dim : int, optional, default -1
Maximum number of significant independent components to use to reduce dimension of input data. -1 means
all numerically available dimensions (see epsilon) will be used unless reduced by var_cutoff.
Setting dim to a positive value is exclusive with var_cutoff.
var_cutoff : float in the range [0,1], optional, default 0.95
Determines the number of output dimensions by including dimensions
until their cumulative kinetic variance exceeds var_cutoff.
Setting var_cutoff smaller than 1.0 is exclusive with dim.
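# Hedged usage sketch (not part of the original fragment): estimating TICA
# through the public pyemma.coordinates API, which wraps the class above.
# `X` is an assumed example feature trajectory.
import numpy as np
import pyemma.coordinates as coor

X = np.random.randn(1000, 5)        # hypothetical (T, n) feature trajectory
tica = coor.tica(X, lag=10, dim=2)  # estimate at lag 10, keep 2 ICs
Y = tica.get_output()[0]            # projected coordinates, shape (1000, 2)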
from decorator import decorator
from pyemma._base.model import Model
from pyemma._base.serialization.serialization import SerializableMixIn
from pyemma.coordinates.data._base.transformer import StreamingEstimationTransformer
from pyemma.util.annotators import fix_docs
__all__ = ['PCA']
__author__ = 'noe'
@decorator
def _lazy_estimation(func, *args, **kw):
assert isinstance(args[0], PCA)
obj = args[0]
if not obj._estimated:
obj._diagonalize()
return func(*args, **kw)
class PCAModel(Model, SerializableMixIn):
__serialize_version = 0
def set_model_params(self, mean, eigenvectors):
self.mean = mean
self.eigenvectors = eigenvectors
@fix_docs
class PCA(StreamingEstimationTransformer, SerializableMixIn):
r""" Principal component analysis."""
__serialize_version = 0
def __init__(self, dim=-1, var_cutoff=0.95, mean=None, stride=1, skip=0):
r""" Principal component analysis.
Given a sequence of multivariate data :math:`X_t`, computes the mean-free
covariance matrix of the data and projects onto its dominant eigenvectors
(the principal components).
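# Hedged usage sketch (not part of the original fragment): PCA through the
# public pyemma.coordinates API, which wraps the class above.
import numpy as np
import pyemma.coordinates as coor

X = np.random.randn(1000, 5)  # hypothetical (T, n) data
pca = coor.pca(X, dim=2)      # keep the two dominant components
Y = pca.get_output()[0]       # projected data, shape (1000, 2)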
if score_method == 'VAMP1':
res = np.linalg.norm(ABC, ord='nuc')
elif score_method == 'VAMP2':
res = np.linalg.norm(ABC, ord='fro')**2
elif score_method == 'VAMPE':
Sk = np.diag(self.singular_values[0:self.dimension()])
res = np.trace(2.0 * mdot(Vk, Sk, Uk.T, test_model.C0t) - mdot(Vk, Sk, Uk.T, test_model.C00, Uk, Sk, Vk.T, test_model.Ctt))
else:
raise ValueError('"score_method" should be one of VAMP1, VAMP2 or VAMPE')
# add the contribution (+1) of the constant singular functions to the result
assert np.isfinite(res)
return res + 1
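# Hedged numeric illustration (not part of the original source) of the two
# norms used above: VAMP-1 is the nuclear norm (sum of singular values),
# VAMP-2 the squared Frobenius norm (sum of squared singular values).
import numpy as np
ABC = np.diag([0.9, 0.5])
vamp1 = np.linalg.norm(ABC, ord='nuc')     # 0.9 + 0.5 = 1.4
vamp2 = np.linalg.norm(ABC, ord='fro')**2  # 0.81 + 0.25 = 1.06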
@fix_docs
class VAMP(StreamingEstimationTransformer, SerializableMixIn):
r"""Variational approach for Markov processes (VAMP)"""
__serialize_version = 0
__serialize_fields = []
def describe(self):
return "[VAMP, lag = %i; max. output dim. = %s]" % (self._lag, str(self.dim))
def __init__(self, lag, dim=None, scaling=None, right=False, epsilon=1e-6,
stride=1, skip=0, ncov_max=float('inf')):
r""" Variational approach for Markov processes (VAMP) [1]_.
Parameters
----------
lag : int
lag time
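# Hedged usage sketch (not part of the original fragment): VAMP through the
# public pyemma.coordinates API, which wraps the class above, scored with
# the VAMP2 method shown earlier.
import numpy as np
import pyemma.coordinates as coor

X = np.random.randn(1000, 5)        # hypothetical (T, n) feature trajectory
vamp = coor.vamp(X, lag=10, dim=2)  # estimate singular functions at lag 10
score = vamp.score(score_method='VAMP2')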
def add_serializable(self, name, obj, overwrite=False, save_streaming_chain=False):
# create new group with given name and serialize the object in it.
from pyemma._base.serialization.serialization import SerializableMixIn
assert isinstance(obj, SerializableMixIn)
# save data producer chain?
old_flag = getattr(obj, '_save_data_producer', None)
if old_flag is not None:
obj._save_data_producer = save_streaming_chain
assert obj._save_data_producer == save_streaming_chain
try:
self._set_group(name, overwrite)
# store attributes
self._save_attributes(obj)
# additionally we store whether the pipeline has been saved.
self.save_streaming_chain = save_streaming_chain
# now encode the object (this will write all numpy arrays to current group).
self._pickle_and_attach_object(obj)
finally:
# restore the previous flag, so the object is left unchanged
if old_flag is not None:
obj._save_data_producer = old_flag
def _set_state_from_serializeable_fields_and_state(self, state, klass):
""" set only fields from state, which are present in klass.__serialize_fields """
if _debug:
logger.debug("restoring state for class %s", klass)
# handle field renames, deletion, transformations etc.
SerializableMixIn.__interpolate(state, klass)
for field in SerializableMixIn._get_serialize_fields(klass):
if field in state:
# ensure we can set attributes. Log culprits.
try:
setattr(self, field, state.get(field))
except AttributeError:
logger.debug('field: %s', field, exc_info=True)
else:
if _debug:
logger.debug("skipped %s, because it is not contained in state", field)
def _get_version(cls, require=True):
name = cls.__name__
if name.startswith('_'):
name = name[1:]
# resolve the name-mangled attribute: class Foo stores __serialize_version
# as _Foo__serialize_version
attr = '_%s__serialize_version' % name
version = getattr(cls, attr, None)
if require:
if issubclass(cls, SerializableMixIn):
if version is None:
raise ClassVersionException('{} does not have the private field __serialize_version'.format(cls))
if not isinstance(version, int):
raise ClassVersionException('{} does not have an integer __serialize_version'.format(cls))
return version
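# Hedged illustration (not part of the original source): how the name-mangled
# version field looks from outside a class. `MyEstimator` is hypothetical.
class MyEstimator:
    __serialize_version = 2  # stored as _MyEstimator__serialize_version

assert getattr(MyEstimator, '_MyEstimator__serialize_version') == 2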
import numpy as np
from pyemma._base.serialization.serialization import SerializableMixIn
from pyemma._base.model import Model
from pyemma._base.parallel import NJobsMixIn
from pyemma._ext.sklearn.base import ClusterMixin
from pyemma.coordinates.data._base.transformer import StreamingEstimationTransformer
from pyemma.util.annotators import fix_docs, aliased, alias
from pyemma.util.discrete_trajectories import index_states, sample_indexes_by_state
from pyemma.util.files import mkdir_p
@fix_docs
@aliased
class AbstractClustering(StreamingEstimationTransformer, Model, ClusterMixin, NJobsMixIn, SerializableMixIn):
"""
provides a common interface for cluster algorithms.
Parameters
----------
metric: str, default='euclidean'
metric to pass to c extension
n_jobs: int or None, default=None
How much threads to use during assignment
If None, all available CPUs will be used.
"""
def __init__(self, metric='euclidean', n_jobs=None):
super(AbstractClustering, self).__init__()
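# Hedged usage sketch (not part of the original fragment): a concrete
# subclass of AbstractClustering driven through the public API.
import numpy as np
import pyemma.coordinates as coor

X = np.random.randn(1000, 2)                     # hypothetical data
kmeans = coor.cluster_kmeans(X, k=10, n_jobs=1)  # assign points to 10 centers
dtraj = kmeans.dtrajs[0]                         # discrete trajectory of cluster labels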
import numpy as np
from pyemma._base.serialization.serialization import SerializableMixIn
from pyemma.coordinates.data._base.datasource import DataSourceIterator, DataSource
__author__ = 'marscher'
class SourcesMerger(DataSource, SerializableMixIn):
""" Combines multiple data sources to stream from.
Note that you are responsible for joining only matching (meaningful) data sets. If, for instance,
one trajectory is shorter than another, the longer one will be truncated during iteration.
Parameters
----------
sources : list, tuple
list of DataSources (Readers, StreamingTransformers etc.) to combine for streaming access.
chunk: int or None
chunk size to use for underlying iterators.
"""
__serialize_version = 0
def __init__(self, sources, chunk=None):
super(SourcesMerger, self).__init__(chunksize=chunk)
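# Hedged usage sketch (not part of the original fragment): merging two readers.
# The file names are hypothetical; both sources must have matching lengths.
import pyemma.coordinates as coor

reader_a = coor.source('traj_a.npy')
reader_b = coor.source('traj_b.npy')
merged = SourcesMerger([reader_a, reader_b], chunk=1000)
data = merged.get_output()  # features of both sources, joined per frame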
"""
Derive from SerializableMixIn to make your class serializable. If you need to patch an old version of your class,
you need the Modifications class.
"""
from .serialization import SerializableMixIn, Modifications
__all__ = ['SerializableMixIn', 'Modifications', 'load', 'list_models']
def load(filename, model_name='default'):
from .serialization import SerializableMixIn
return SerializableMixIn.load(file_name=filename, model_name=model_name)
load.__doc__ = SerializableMixIn.load.__doc__
def list_models(filename):
""" Lists all models in given filename.
Parameters
----------
filename: str
path to the file in which the models have been stored.
Returns
-------
obj: dict
A mapping by name and a comprehensive description like this:
{model_name: {'repr': 'string representation', 'created': 'human readable date', ...}}
"""