Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pandas as pd
import openpathsampling as paths
from openpathsampling.netcdfplus import StorableNamedObject
from functools import reduce # not built-in for py3
logger = logging.getLogger(__name__)
def index_to_string(index):
n_underscore = index // 26
letter_value = index % 26
mystr = "_"*n_underscore + chr(65 + letter_value)
return mystr
class TransitionNetwork(StorableNamedObject):
"""
Subclasses of TransitionNetwork are the main way to set up calculations
Attributes
----------
sampling_ensembles
all_ensembles
sampling_transitions
"""
def __init__(self):
super(TransitionNetwork, self).__init__()
#self.transitions = {}
#self.special_ensembles = {}
@property
def sampling_ensembles(self):
def test_in_shooting_move(self):
import glob
for testfile in glob.glob("test*out") + glob.glob("test*inp"):
os.remove(testfile)
ens10 = paths.LengthEnsemble(10)
init_traj = self.fast_engine.generate_forward(self.template, ens10)
assert_equal(ens10(init_traj), True)
init_conds = paths.SampleSet([
paths.Sample(replica=0, ensemble=ens10, trajectory=init_traj)
])
shooter = paths.OneWayShootingMover(ensemble=ens10,
selector=paths.UniformSelector(),
engine=self.fast_engine)
prev_sample_set = init_conds
default_traj = [[[0.0]], [[1.0]], [[2.0]], [[3.0]], [[4.0]],
[[5.0]], [[6.0]], [[7.0]], [[8.0]], [[9.0]]]
assert_items_equal(init_conds[0].trajectory.xyz, default_traj)
for step in range(10):
assert_equal(len(prev_sample_set), 1)
change = shooter.move(prev_sample_set)
new_sample_set = prev_sample_set.apply_samples(change.results)
assert_items_equal(new_sample_set[0].trajectory.xyz,
default_traj)
prev_traj = prev_sample_set[0].trajectory
def setUp(self):
slow_options = {
'n_frames_max' : 10000,
'engine_sleep' : 100,
'name_prefix' : "test"
}
fast_options = {
'n_frames_max' : 10000,
'engine_sleep' : 0,
'name_prefix' : "test"
}
self.template = peng.toy.Snapshot(coordinates=np.array([[0.0]]),
velocities=np.array([[1.0]]))
self.slow_engine = ExternalEngine(slow_options, self.template)
self.fast_engine = ExternalEngine(fast_options, self.template)
self.ensemble = paths.LengthEnsemble(5)
def netcdfplus_init(store):
kinetic_store = KineticContainerStore()
kinetic_store.set_caching(WeakLRUCache(10000))
name = store.prefix + 'kinetics'
# tell the KineticContainerStore to base its dimensions on names
# prefixed with the store name to make sure each snapshot store has
# its own kinetics store
kinetic_store.set_dimension_prefix_store(store)
store.storage.create_store(name, kinetic_store, False)
store.create_variable(
'kinetics', 'lazyobj.' + name,
description="the snapshot index (0..n_momentum-1) 'frame' of "
"snapshot '{idx}'.")
store.create_variable(
'is_reversed', 'bool',
def netcdfplus_init(store):
static_store = StaticContainerStore()
static_store.set_caching(WeakLRUCache(10000))
name = store.prefix + 'statics'
static_store.set_dimension_prefix_store(store)
store.storage.create_store(name, static_store, False)
store.create_variable(
'statics',
'lazyobj.' + name,
description="the snapshot index (0..n_configuration-1) of "
"snapshot '{idx}'.")
We do this by using a special sequential ensemble for the sequence.
This path ensemble is particularly complex because we want to be sure that
the path we generate is in the ensemble we desire: this means that we can't
use PartOutXEnsemble as we typically do with TIS paths.
"""
snapshot = engine.storage.snapshots.load(0)
first_traj_ensemble = SequentialEnsemble([
AllOutXEnsemble(stateA) | LengthEnsemble(0),
AllInXEnsemble(stateA),
(AllOutXEnsemble(stateA) & AllInXEnsemble(interface0)) | LengthEnsemble(0),
AllInXEnsemble(interface0) | LengthEnsemble(0),
AllOutXEnsemble(interface0),
AllOutXEnsemble(stateA) | LengthEnsemble(0),
AllInXEnsemble(stateA) & LengthEnsemble(1)
])
interface0_ensemble = interface_set[0]
print "start path generation (should not take more than a few minutes)"
total_path = engine.generate(snapshot,
[first_traj_ensemble.can_append])
print "path generation complete"
print
print "Total trajectory length: ", len(total_path)
segments = interface0_ensemble.split(total_path)
print "Traj in first_traj_ensemble? (should be)",
print first_traj_ensemble(total_path)
print "Traj in TIS ensemble? (probably not)",
print interface0_ensemble(total_path)
print "Number of segments in TIS ensemble: ", len(segments)
if len(segments):
We do this by using a special sequential ensemble for the sequence.
This path ensemble is particularly complex because we want to be sure that
the path we generate is in the ensemble we desire: this means that we can't
use PartOutXEnsemble as we typically do with TIS paths.
"""
snapshot = engine.storage.snapshots.load(0)
first_traj_ensemble = SequentialEnsemble([
AllOutXEnsemble(stateA) | LengthEnsemble(0),
AllInXEnsemble(stateA),
(AllOutXEnsemble(stateA) & AllInXEnsemble(interface0)) | LengthEnsemble(0),
AllInXEnsemble(interface0) | LengthEnsemble(0),
AllOutXEnsemble(interface0),
AllOutXEnsemble(stateA) | LengthEnsemble(0),
AllInXEnsemble(stateA) & LengthEnsemble(1)
])
interface0_ensemble = interface_set[0]
print "start path generation (should not take more than a few minutes)"
total_path = engine.generate(snapshot,
[first_traj_ensemble.can_append])
print "path generation complete"
print
print "Total trajectory length: ", len(total_path)
segments = interface0_ensemble.split(total_path)
print "Traj in first_traj_ensemble? (should be)",
print first_traj_ensemble(total_path)
print "Traj in TIS ensemble? (probably not)",
print interface0_ensemble(total_path)
print "Number of segments in TIS ensemble: ", len(segments)
if len(segments):
interface_set = ef.TISEnsembleSet(stateA, stateA | stateB, volume_set)
for no, interface in enumerate(interface_set):
# Give each interface a name
interface.name = 'Interface '+str(no)
# And save all of these
engine.storage.ensembles.save()
mover_set = mf.OneWayShootingSet(UniformSelector(), interface_set)
snapshot = engine.storage.snapshots.load(0)
first_traj_ensemble = SequentialEnsemble([
AllOutXEnsemble(stateA) | LengthEnsemble(0),
AllInXEnsemble(stateA),
(AllOutXEnsemble(stateA) & AllInXEnsemble(interface0)) | LengthEnsemble(0),
AllInXEnsemble(interface0) | LengthEnsemble(0),
AllOutXEnsemble(interface0),
AllOutXEnsemble(stateA) | LengthEnsemble(0),
AllInXEnsemble(stateA) & LengthEnsemble(1)
])
interface0_ensemble = interface_set[0]
print "start path generation (should not take more than a few minutes)"
total_path = engine.generate(snapshot,
[first_traj_ensemble.can_append])
print "path generation complete"
print
print "Total trajectory length: ", len(total_path)
segments = interface0_ensemble.split(total_path)
print "Traj in first_traj_ensemble? (should be)",
print first_traj_ensemble(total_path)
print "Traj in TIS ensemble? (probably not)",
the final frame (`padding=[0, -1]`) so that it doesn't include
the frame in `to_vol`.
Returns
-------
list of :class:`.Trajectory`
the frames from (and including) each first entry from `to_vol`
into `from_vol` until (and including) the next entry into
`to_vol`, with no frames in `forbidden`, and with frames removed
from the ends according to `padding`
"""
if forbidden is None:
forbidden = paths.EmptyVolume()
ensemble_BAB = paths.SequentialEnsemble([
paths.LengthEnsemble(1) & paths.AllInXEnsemble(to_vol),
paths.AllOutXEnsemble(to_vol) & paths.PartInXEnsemble(from_vol),
paths.LengthEnsemble(1) & paths.AllInXEnsemble(to_vol)
]) & paths.AllOutXEnsemble(forbidden)
ensemble_AB = paths.SequentialEnsemble([
paths.LengthEnsemble(1) & paths.AllInXEnsemble(from_vol),
paths.OptionalEnsemble(paths.AllOutXEnsemble(to_vol)),
paths.LengthEnsemble(1) & paths.AllInXEnsemble(to_vol)
])
BAB_split = ensemble_BAB.split(trajectory)
AB_split = [ensemble_AB.split(part)[0] for part in BAB_split]
return [subtraj[padding[0]:padding[1]] for subtraj in AB_split]
Returns
-------
list of :class:`.Trajectory`
the frames from (and including) each first entry from `to_vol`
into `from_vol` until (and including) the next entry into
`to_vol`, with no frames in `forbidden`, and with frames removed
from the ends according to `padding`
"""
if forbidden is None:
forbidden = paths.EmptyVolume()
ensemble_BAB = paths.SequentialEnsemble([
paths.LengthEnsemble(1) & paths.AllInXEnsemble(to_vol),
paths.AllOutXEnsemble(to_vol) & paths.PartInXEnsemble(from_vol),
paths.LengthEnsemble(1) & paths.AllInXEnsemble(to_vol)
]) & paths.AllOutXEnsemble(forbidden)
ensemble_AB = paths.SequentialEnsemble([
paths.LengthEnsemble(1) & paths.AllInXEnsemble(from_vol),
paths.OptionalEnsemble(paths.AllOutXEnsemble(to_vol)),
paths.LengthEnsemble(1) & paths.AllInXEnsemble(to_vol)
])
BAB_split = ensemble_BAB.split(trajectory)
AB_split = [ensemble_AB.split(part)[0] for part in BAB_split]
return [subtraj[padding[0]:padding[1]] for subtraj in AB_split]