Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Returns
-------
:class:`.TrajectorySegmentContainer`
transitions from `stateA` to `stateB` within `trajectory`
"""
# we define the transitions ensemble just in case the transition is,
# e.g., fixed path length TPS. We want flexible path length ensemble
transition_ensemble = paths.SequentialEnsemble([
paths.AllInXEnsemble(stateA) & paths.LengthEnsemble(1),
paths.OptionalEnsemble( # optional to allow instantaneous hops
paths.AllOutXEnsemble(stateA) & paths.AllOutXEnsemble(stateB)
),
paths.AllInXEnsemble(stateB) & paths.LengthEnsemble(1)
])
segments = [seg[1:-1] for seg in transition_ensemble.split(trajectory)]
return TrajectorySegmentContainer(segments, self.dt)
out_segments = self.get_lifetime_segments(
trajectory=trajectory,
from_vol=~interface,
to_vol=state,
forbidden=other,
padding=[None, -1]
)
out_container = TrajectorySegmentContainer(out_segments, self.dt)
in_segments = self.get_lifetime_segments(
trajectory=trajectory,
from_vol=state,
to_vol=~interface,
forbidden=other,
padding=[None, -1]
)
in_container = TrajectorySegmentContainer(in_segments, self.dt)
return {'in': in_container, 'out': out_container}
def reset_analysis(self):
    """Discard every stored segment and start from empty containers.

    Rebinds ``continuous_segments``, ``lifetime_segments``,
    ``transition_segments`` and ``flux_segments`` on this object to new
    dictionaries whose values are empty
    :class:`.TrajectorySegmentContainer` objects created with ``self.dt``.
    """
    state_a, state_b, dt = self.stateA, self.stateB, self.dt
    states = (state_a, state_b)

    def empty():
        # a fresh container per slot, so no two entries share an object
        return TrajectorySegmentContainer(segments=[], dt=dt)

    # per-state containers, keyed by the state itself
    self.continuous_segments = {state: empty() for state in states}
    self.lifetime_segments = {state: empty() for state in states}
    # one container per ordered (from, to) pair of states
    self.transition_segments = {
        (state_a, state_b): empty(),
        (state_b, state_a): empty(),
    }
    # per-state flux containers, split into 'in' and 'out' parts
    self.flux_segments = {
        state: {'in': empty(), 'out': empty()} for state in states
    }
def _analyze_flux_single_traj(self, trajectory, state, interface):
    """Collect the 'in' and 'out' flux segments of a single trajectory.

    Parameters
    ----------
    trajectory
        trajectory handed to ``self.get_lifetime_segments``
    state
        the state the flux is analyzed for; the other one of
        ``self.stateA`` / ``self.stateB`` is passed as ``forbidden``
    interface
        volume whose complement (``~interface``) is the second endpoint
        of the segment search

    Returns
    -------
    dict
        ``{'in': ..., 'out': ...}``, each value a
        :class:`.TrajectorySegmentContainer` built with ``self.dt``.
        'out' comes from the search from ``~interface`` to ``state``;
        'in' comes from the search from ``state`` to ``~interface``.
    """
    both_states = set([self.stateA, self.stateB])
    # whichever of the two states is not the one being analyzed
    other = list(both_states - set([state]))[0]

    def collect(from_vol, to_vol):
        # one lifetime-segment search, wrapped into a container
        found = self.get_lifetime_segments(
            trajectory=trajectory,
            from_vol=from_vol,
            to_vol=to_vol,
            forbidden=other,
            padding=[None, -1]
        )
        return TrajectorySegmentContainer(found, self.dt)

    # 'out' is gathered before 'in', same order as the two searches ran
    out_container = collect(~interface, state)
    in_container = collect(state, ~interface)
    return {'in': in_container, 'out': out_container}
def __add__(self, other):
    """Concatenate the segments of two containers into a new container.

    Parameters
    ----------
    other
        object providing ``dt`` and ``_segments``; its ``dt`` must not
        compare unequal to ``self.dt``

    Returns
    -------
    :class:`.TrajectorySegmentContainer`
        new container holding ``self._segments + other._segments`` and
        using ``self.dt``

    Raises
    ------
    RuntimeError
        if ``self.dt != other.dt``
    """
    time_steps_differ = self.dt != other.dt
    if time_steps_differ:
        message = "Different time steps in TrajectorySegmentContainers."
        raise RuntimeError(message)
    combined = self._segments + other._segments
    return TrajectorySegmentContainer(combined, self.dt)