            new_class = SharedTable
        elif isinstance(data, pd.DataFrame):
            new_class = SharedPandasFrame
        elif isinstance(data, (tuple, list)):
            new_class = SharedArray
        elif isinstance(data, (np.ndarray, np.matrix)):
            new_class = SharedCArray
        else:
            raise RuntimeError('Your data `%s` is not understood.' % key)
        shared_data = new_class(result.f_translate_key(key), result, trajectory=trajectory)
        shared_data._request_data('make_shared')
        result[key] = shared_data
    return result
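

# A minimal, self-contained sketch of the isinstance dispatch used above: plain
# containers are mapped to the class that will hold them in shared storage.
# The function name `select_shared_class` is hypothetical and only illustrates
# the branching; the real code instantiates the selected class directly.
import numpy as np
import pandas as pd

def select_shared_class(data):
    if isinstance(data, pd.DataFrame):
        return 'SharedPandasFrame'
    elif isinstance(data, (tuple, list)):
        return 'SharedArray'
    elif isinstance(data, (np.ndarray, np.matrix)):
        return 'SharedCArray'
    else:
        raise RuntimeError('Data of type `%s` is not understood.' % type(data).__name__)

# select_shared_class(pd.DataFrame())  -> 'SharedPandasFrame'
# select_shared_class([1, 2, 3])       -> 'SharedArray'
# select_shared_class(np.zeros(3))     -> 'SharedCArray'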
class SharedData(HasLogger):

    FLAG = None

    def __init__(self, name=None, parent=None, trajectory=None, add_to_parent=False):
        self._set_logger()
        self.name = name
        self.parent = parent
        self.traj = trajectory
        if add_to_parent:
            self.parent[name] = self

    def _check_state(self):
        if self.traj is None:
            raise TypeError('Please pass the trajectory to the shared data via '
                            '`shared_data.traj = trajectory`, otherwise you cannot '
                            'access the data.')
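

# Usage sketch (the workflow is inferred from the constructor and guard above;
# the variable names are hypothetical): a SharedData instance created without
# a trajectory must be given one before any data access, otherwise
# `_check_state` raises the TypeError shown.
#
# shared = SharedData(name='my_data', trajectory=None)
# shared.traj = trajectory   # required before accessing the data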
successful parameter exploration
requires parallel processing (see :class:`~pypet.brian.network.NetworkManager`).
.. _`BRIAN network`: http://briansimulator.org/docs/reference-network.html
"""
__author__ = 'Robert Meyer'
from brian import Network, clear, reinit
from brian.units import second
from pypet.pypetlogging import HasLogger
class NetworkComponent(HasLogger):
    """Abstract class to define a component of a BRIAN network.

    Can be subclassed to define the construction of NeuronGroups_ or
    Connections_, for instance.

    .. _NeuronGroups: http://briansimulator.org/docs/reference-models-and-groups.html

    .. _Connections: http://briansimulator.org/docs/reference-connections.html

    """
    def add_parameters(self, traj):
        """Adds parameters to `traj`.

        Called by the :class:`~pypet.brian.network.NetworkManager` to
        define and add parameters to the trajectory container.
        """
        pass
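

# Illustrative subclass (a sketch, not part of the original source): a component
# that contributes its own parameters via pypet's standard `f_add_parameter` API.
# The class name, parameter names, and values are made up for this example.
class ExampleNeuronComponent(NetworkComponent):

    def add_parameters(self, traj):
        traj.f_add_parameter('neurons.N', 100, comment='Number of neurons')
        traj.f_add_parameter('neurons.tau', 10.0, comment='Membrane time constant in ms')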
        trajectory_name = kwargs['trajectory_name']
        if trajectory_name not in self.references:
            self.references[trajectory_name] = []
        self.references[trajectory_name].append((msg, cp.copy(stuff_to_store), args, kwargs))
    def load(self, *args, **kwargs):
        """Not implemented"""
        raise NotImplementedError('Reference wrapping does not support loading. If you want to '
                                  'load data in a multiprocessing environment, use the Lock '
                                  'wrapping.')

    def free_references(self):
        self.references = {}
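

# Self-contained mini-version of the buffering pattern above (illustrative,
# not the original classes): store requests are appended per trajectory name
# and only hit the real storage service when flushed later.
import copy as cp

class _ReferenceBuffer(object):

    def __init__(self):
        self.references = {}

    def store(self, msg, stuff_to_store, *args, **kwargs):
        trajectory_name = kwargs['trajectory_name']
        if trajectory_name not in self.references:
            self.references[trajectory_name] = []
        self.references[trajectory_name].append((msg, cp.copy(stuff_to_store), args, kwargs))

    def free_references(self):
        self.references = {}

_buffer = _ReferenceBuffer()
_buffer.store('STORE', {'x': 1}, trajectory_name='traj_1')  # buffered, no disk I/O
assert 'traj_1' in _buffer.references
_buffer.free_references()                                   # drop all buffered requests
assert _buffer.references == {}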
class ReferenceStore(HasLogger):
    """Class that can store references"""

    def __init__(self, storage_service, gc_interval=None):
        self._storage_service = storage_service
        self.gc_interval = gc_interval
        self.operation_counter = 0
        self._set_logger()

    def _check_and_collect_garbage(self):
        if self.gc_interval and self.operation_counter % self.gc_interval == 0:
            collected = gc.collect()
            self._logger.debug('Garbage Collection: Found %d unreachable items.' % collected)
        self.operation_counter += 1

    def store_references(self, references):
        """Stores references to disk and may collect garbage."""
        for trajectory_name in references:
            # Finally, get all results from the result queue once more and finalize the queue
            self._get_results_from_queue(result_queue, results, n, total_runs)
            result_queue.close()
            result_queue.join_thread()
            del result_queue
        finally:
            # Finalize the wrapper
            if self._multiproc_wrapper is not None:
                self._multiproc_wrapper.f_finalize()
                self._multiproc_wrapper = None
        return expanded_by_postproc
class MultiprocContext(HasLogger):
    """A lightweight environment that allows the usage of multiprocessing.

    Can be used if you don't want a full-blown :class:`~pypet.environment.Environment`
    to enable multiprocessing, or if you want to implement your own custom multiprocessing.

    This wrapper tool takes a trajectory container and makes sure that its storage
    service is multiprocessing safe. It supports the ``'LOCK'`` as well as the ``'QUEUE'`` mode.
    In case of the latter, an extra queue process is created if desired.
    This process handles all storage requests and writes data to the HDF5 file.

    Note that in case of ``'QUEUE'`` wrapping, data can only be stored, not loaded, because
    the queue is read in only one direction.

    :param trajectory:

        The trajectory whose storage service should be wrapped

    """
        self.start(test_connection=False)
        while True:
            response = self._req_rep(QueuingServerMessageListener.SPACE)
            if response == QueuingServerMessageListener.SPACE_AVAILABLE:
                self._req_rep((QueuingServerMessageListener.DATA, data))
                break
            else:
                time.sleep(0.01)

    def _send_request(self, request):
        return self._socket.send_pyobj(request)
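

# Minimal pyzmq REQ/REP round trip showing the request style used by
# `_send_request` above (a self-contained sketch; the URL is a placeholder
# and the call blocks until a cooperating server replies):
import zmq

def ping(url='tcp://127.0.0.1:7777'):
    context = zmq.Context.instance()
    socket = context.socket(zmq.REQ)
    socket.connect(url)
    socket.send_pyobj('PING')       # send a pickled Python object
    response = socket.recv_pyobj()  # wait for the pickled reply
    socket.close()
    return response                 # a cooperating server answers 'PONG'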
class ForkDetector(HasLogger):

    def _detect_fork(self):
        """Detects if the lock client was forked.

        Forking is detected by comparing the PID of the current
        process with the stored PID.
        """
        if self._pid is None:
            self._pid = os.getpid()
        if self._context is not None:
            current_pid = os.getpid()
            if current_pid != self._pid:
                self._logger.debug('Fork detected: My pid `%s` != os pid `%s`. '
                                   'Restarting connection.' % (str(self._pid), str(current_pid)))
                self._context = None
                self._pid = current_pid
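

# Self-contained demonstration of PID-based fork detection (POSIX only,
# independent of the class above): after `os.fork` the child's PID differs
# from the stored parent PID, which is exactly the condition `_detect_fork`
# reacts to by resetting its connection context.
import os

parent_pid = os.getpid()
child = os.fork()
if child == 0:
    assert os.getpid() != parent_pid  # the child sees a new PID
    os._exit(0)
os.waitpid(child, 0)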
    def f_set_single(self, name, data):
        """Sets a single annotation."""
        self._dict[name] = data

    def f_ann_to_str(self):
        """Returns all annotations lexicographically sorted as a concatenated string."""
        resstr = ''
        for key in sorted(self._dict.keys()):
            resstr += '%s=%s; ' % (key, str(self._dict[key]))
        return resstr[:-2]

    def __str__(self):
        return self.f_ann_to_str()
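

# Quick check of the formatting contract above (a self-contained re-creation):
# annotations come out lexicographically sorted and joined by '; '.
_annotations = {'b': 2, 'a': 1}
print('; '.join('%s=%s' % (k, _annotations[k]) for k in sorted(_annotations)))  # a=1; b=2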
class WithAnnotations(HasLogger):

    __slots__ = ('_annotations',)

    def __init__(self):
        self._annotations = Annotations()  # The annotation object that handles annotations

    @property
    def v_annotations(self):
        """Annotation feature of a trajectory node.

        Store some short additional information about your nodes here.
        If you use the standard HDF5 storage service, annotations are stored as HDF5 node
        attributes_.

        .. _attributes: http://pytables.github.io/usersguide/libref/declarative_classes.html#the-attributeset-class

        """
        return self._annotations
    def __init__(self, url='tcp://127.0.0.1:7777', lock_name=LockerServer.DEFAULT_LOCK):
        super(ForkAwareLockerClient, self).__init__(url, lock_name)
        self._pid = None

    def __getstate__(self):
        result_dict = super(ForkAwareLockerClient, self).__getstate__()
        result_dict['_pid'] = None
        return result_dict

    def start(self, test_connection=True):
        """Checks for forking and starts/restarts if desired"""
        self._detect_fork()
        super(ForkAwareLockerClient, self).start(test_connection)
class QueueStorageServiceSender(MultiprocWrapper, HasLogger):
    """For multiprocessing with :const:`~pypet.pypetconstants.WRAP_MODE_QUEUE`, replaces the
    original storage service.

    All storage requests are sent over a queue to the process running the
    :class:`~pypet.storageservice.QueueStorageServiceWriter`.

    Does not support loading of data!

    """

    def __init__(self, storage_queue=None):
        self.queue = storage_queue
        self.pickle_queue = True
        self._set_logger()
    def __getstate__(self):
        # Assumed body (the original was truncated here): drop the queue from
        # the pickled state unless pickling it is explicitly requested.
        result_dict = self.__dict__.copy()
        if not self.pickle_queue:
            result_dict['queue'] = None
        return result_dict

    def store(self, *args, **kwargs):
        """Puts data to store on the queue.

        Note that the queue will no longer be pickled if the sender is pickled.
        """
        self._put_on_pipe(('STORE', args, kwargs))

    def send_done(self):
        """Signals the writer that it can stop listening to the queue"""
        self._put_on_pipe(('DONE', [], {}))
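

# Sketch of the consumer side of the ('STORE', args, kwargs) / ('DONE', [], {})
# protocol used above (illustrative; the real writer lives in
# pypet.storageservice and does more bookkeeping):
def _writer_loop(queue, storage_service):
    while True:
        msg, args, kwargs = queue.get()
        if msg == 'DONE':
            break  # the sender called send_done()
        elif msg == 'STORE':
            storage_service.store(*args, **kwargs)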
class StorageServiceDataHandler(HasLogger):
    """Class that can store data via a storage service; needs to be sub-classed to receive data"""

    def __init__(self, storage_service, gc_interval=None):
        self._storage_service = storage_service
        self._trajectory_name = ''
        self.gc_interval = gc_interval
        self.operation_counter = 0
        self._set_logger()

    def __repr__(self):
        return '<%s wrapping Storage Service %s>' % (self.__class__.__name__,
                                                     repr(self._storage_service))

    def _open_file(self):
        self._storage_service.store(pypetconstants.OPEN_FILE, None,
                                    trajectory_name=self._trajectory_name)

        However, the storage service may choose to keep the store open and signal
        this via this property.

        """
        return False
    @property
    def multiproc_safe(self):
        """This wrapper guarantees multiprocessing safety"""
        return True

    def store(self, *args, **kwargs):
        raise NotImplementedError('Implement this!')
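

# Minimal subclass sketch (hypothetical, not from the original source) showing
# the intended extension point: forward `store` to the wrapped service.
#
# class DirectDataHandler(StorageServiceDataHandler):
#
#     def store(self, *args, **kwargs):
#         return self._storage_service.store(*args, **kwargs)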
class ZMQServer(HasLogger):
    """Generic zmq server"""

    PING = 'PING'      # For connection testing
    PONG = 'PONG'      # For connection testing
    DONE = 'DONE'      # Signals stopping of the server
    CLOSED = 'CLOSED'  # Signals closing of the server

    def __init__(self, url="tcp://127.0.0.1:7777"):
        self._url = url  # Server url
        self._set_logger()
        self._context = None
        self._socket = None

    def _start(self):
        self._logger.info('Starting Server at `%s`' % self._url)
        self._context = zmq.Context()
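

# Self-contained sketch of the PING/PONG handshake implied by the constants
# above (an assumption of how such a generic server might serve requests,
# not the original loop):
import zmq

def run_ping_server(url='tcp://127.0.0.1:7777'):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(url)
    while True:
        request = socket.recv_pyobj()
        if request == 'PING':
            socket.send_pyobj('PONG')    # connection test
        elif request == 'DONE':
            socket.send_pyobj('CLOSED')  # acknowledge shutdown and stop
            break
    socket.close()
    context.term()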
            # 7. Call `remove_from_network` for all analyser components
            for analyser in analyser_list:
                analyser.remove_from_network(traj, network, current_subrun, subrun_list,
                                             network_dict)

            # 8. Call `remove_from_network` for all normal components
            for component in component_list:
                component.remove_from_network(traj, network, current_subrun, subrun_list,
                                              network_dict)

            subrun_number += 1
class NetworkManager(HasLogger):
    """Manages a BRIAN2 network experiment and creates the network.

    An experiment consists of

    :param network_runner:

        A :class:`~pypet.brian2.network.NetworkRunner`.

        A special component that handles the execution of several subruns.
        A NetworkRunner can be subclassed to implement the
        :func:`~pypet.brian2.network.NetworkComponent.add_parameters` method to add
        :class:`~pypet.brian2.parameter.Brian2Parameter` instances defining the
        order and duration of subruns.

    :param component_list:

        List of :class:`~pypet.brian2.network.NetworkComponent` instances to create
        and manage individual parts of a network.
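
# Construction sketch based on the parameters documented above (the component
# class names are hypothetical examples, not part of the library):
#
# manager = NetworkManager(network_runner=NetworkRunner(),
#                          component_list=[ExampleNeuronComponent(),
#                                          ExampleSynapseComponent()])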