Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
setattr(store, 'value', self.storage.vars[store_name + '_value'])
store.initialize()
store_idx = int(len(self.storage.dimensions['cvcache']))
self.cv_list[cv] = (store, store_idx)
self.storage.vars['cvcache'][store_idx] = store
# use the cache and function of the CV to fill the store when it is made
if not allow_incomplete:
indices = self.vars['uuid'][:]
for pos, idx in enumerate(indices):
proxy = LoaderProxy(self.storage.snapshots, idx)
value = cv._cache_dict._get(proxy)
if value is None:
# not in cache so compute it if possible
if cv._eval_dict:
value = cv._eval_dict([proxy])[0]
else:
value = None
if value is not None:
store.vars['value'][pos] = value
store.cache[pos] = value
cv.set_cache_store(store)
return store, store_idx
def _add_empty_to_cache(self, idx, sample_idxs, pmc_idx):
if idx not in self.cache:
obj = SampleSet(
samples=[self.storage.samples[sample_idx.tolist()] for sample_idx in sample_idxs],
movepath=LoaderProxy(self.storage.pathmovechanges, int(pmc_idx))
)
self.index[obj] = idx
self.cache[idx] = obj
def __getitem__(self, items):
if type(items) is LoaderProxy:
return self._post[[items]][0]
if hasattr(items, '__iter__'):
try:
_ = len(items)
except AttributeError:
# no length means unbound iterator and we cannot handle these
raise AttributeError(
'Iterators that do not have __len__ implemented are not '
'supported. You can wrap your iterator in list() if you '
'know that it will finish.')
try:
return self._post[items.as_proxies()]
except AttributeError:
# turn possible iterators into a list if possible
return self._post[list(items)]
def _get(self, idx, from_reversed=False):
if from_reversed:
obj = self.cache[idx ^ 1]
return AbstractSnapshot(
is_reversed=not obj.is_reversed,
topology=obj.topology,
reversed_copy=LoaderProxy(self, idx ^ 1)
)
else:
momentum_reversed = self.vars['momentum_reversed'][idx]
topology = self.storage.topology
return AbstractSnapshot(
is_reversed=momentum_reversed,
topology=topology,
reversed_copy=LoaderProxy(self, idx ^ 1)
)
def _add_empty_to_cache(self, idx, trajectory_idx, replica_idx, bias,
ensemble_idx, parent_idx, details_idx, mover_idx):
storage = self.storage
obj = Sample(
trajectory=storage.trajectories[int(trajectory_idx)],
replica=int(replica_idx),
bias=float(bias),
ensemble=storage.ensembles[int(ensemble_idx)],
mover=storage.pathmovers[int(mover_idx)],
parent=LoaderProxy(storage.samples, int(parent_idx)),
details=LoaderProxy(storage.details, int(details_idx))
)
self.index[obj] = idx
self.cache[idx] = obj
return obj
velocities=obj.velocities,
is_reversed=not obj.is_reversed,
topology=obj.topology,
reversed_copy=LoaderProxy(self, idx ^ 1)
)
else:
coordinates = self.vars['coordinates'][idx]
velocities = self.vars['velocities'][idx]
momentum_reversed = self.vars['momentum_reversed'][idx]
return ToySnapshot(
coordinates=coordinates,
velocities=velocities,
is_reversed=momentum_reversed,
topology=self.storage.topology,
reversed_copy=LoaderProxy(self, idx ^ 1)
)
def _get(self, idx, from_reversed=False):
if from_reversed:
obj = self.cache[idx ^ 1]
return Snapshot(
configuration=obj.configuration,
momentum=obj.momentum,
is_reversed=not obj.is_reversed,
topology=obj.topology,
reversed_copy=LoaderProxy(self, idx ^ 1)
)
else:
configuration = self.vars['configuration'][idx]
momentum = self.vars['momentum'][idx]
momentum_reversed = self.vars['momentum_reversed'][idx]
topology = self.storage.topology
return Snapshot(
configuration=configuration,
momentum=momentum,
is_reversed=momentum_reversed,
topology=topology,
reversed_copy=LoaderProxy(self, idx ^ 1)
)
def _add_empty_to_cache(self, idx, trajectory_idx, replica_idx, bias,
ensemble_idx, parent_idx, details_idx, mover_idx):
storage = self.storage
obj = Sample(
trajectory=storage.trajectories[int(trajectory_idx)],
replica=int(replica_idx),
bias=float(bias),
ensemble=storage.ensembles[int(ensemble_idx)],
mover=storage.pathmovers[int(mover_idx)],
parent=LoaderProxy(storage.samples, int(parent_idx)),
details=LoaderProxy(storage.details, int(details_idx))
)
self.index[obj] = idx
self.cache[idx] = obj
return obj
the snapshot to be saved
idx : int or None
if idx is not None the index will be used for saving in the storage.
This might overwrite already existing trajectories!
Notes
-----
This also saves all contained frames in the snapshot if not done yet.
A single Snapshot object can only be saved once!
"""
self._put(idx, snapshot)
reversed = snapshot._reversed
snapshot._reversed = LoaderProxy(self, idx ^ 1)
reversed._reversed = LoaderProxy(self, idx)
# mark reversed as stored
self.index[reversed] = idx ^ 1
def all(self):
return Trajectory([LoaderProxy(self, idx) for idx in range(len(self))])