Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, recording, chunk_size=10000, cache_chunks=False):
    """Wrap ``recording`` and set up the state used for chunk-wise filtering.

    Unsigned integer recordings are wrapped in a ``TransformRecording`` that
    applies an offset of ``-2**(bits - 1)`` and uses the signed dtype of the
    same width; this happens before any filtering is applied.

    Parameters
    ----------
    recording: RecordingExtractor
        The recording to wrap.
    chunk_size: int
        Stored as ``self._chunk_size``.
    cache_chunks: bool
        If True, a ``FilteredChunkCache`` is created; otherwise the cache
        attribute is set to None.

    Raises
    ------
    ValueError
        If ``recording`` is not a RecordingExtractor.
    """
    if not isinstance(recording, RecordingExtractor):
        raise ValueError("'recording' must be a RecordingExtractor")
    self._recording = recording
    self._chunk_size = chunk_size
    self._cache_chunks = cache_chunks
    if cache_chunks:
        self._filtered_cache_chunks = FilteredChunkCache()
    else:
        self._filtered_cache_chunks = None
    self._traces = None
    se.RecordingExtractor.__init__(self)
    dtype = str(self._recording.get_dtype())
    if 'uint' in dtype:
        if 'numpy' in dtype:
            # A dtype given as a class stringifies as "<class 'numpy.uint16'>";
            # strip the wrapper so the bit width can be parsed below.
            dtype = dtype.replace("<class '", "").replace("'>", "")
            # drop 'numpy'
            dtype = dtype.split('.')[1]
        # 'uint16' -> 'int16'
        dtype_signed = dtype[1:]
        # The digits following 'int' are the bit width.
        exp_idx = dtype.find('int') + 3
        exp = int(dtype[exp_idx:])
        # Offset by half of the unsigned range, as a negative value.
        offset = - 2**(exp - 1)
        self._recording = TransformRecording(self._recording, offset=offset, dtype=dtype_signed)
        print(f"dtype converted from {dtype} to {dtype_signed} before filtering")
        self._dtype = dtype_signed
    else:
        self._dtype = dtype
    self.copy_channel_properties(recording)
# Cached spike data; starts out unset.
self._spike_clusters_amps = None
self._spike_times_pca = None
self._spike_times_amps = None
# Dictionary of cached metrics
self.metrics = {}
self.verbose = verbose
# Register every epoch on the sorting. Each epoch is indexed as
# (name, start, end); start and end are multiplied by the sampling
# frequency to obtain frame indices.
for epoch in self._epochs:
    self._sorting.add_epoch(
        epoch_name=epoch[0],
        start_frame=epoch[1] * self._sampling_frequency,
        end_frame=epoch[2] * self._sampling_frequency,
    )
if recording is not None:
    # The message names the argument that is actually being checked.
    assert isinstance(
        recording, RecordingExtractor
    ), "'recording' must be a RecordingExtractor object"
    self.set_recording(
        recording,
        apply_filter=apply_filter,
        freq_min=freq_min,
        freq_max=freq_max,
    )
else:
    self._recording = None
def __init__(self, recording, resample_rate):
    """Wrap ``recording`` together with the requested ``resample_rate``.

    The dtype, channel properties and ``is_filtered`` flag are taken from
    the wrapped recording, and the constructor arguments are stored in
    ``self._kwargs`` with the recording in its serialized-dict form.
    """
    assert HAVE_RR, "To use the ResampleRecording, install scipy: \n\n pip install scipy\n\n"
    self._recording, self._resample_rate = recording, resample_rate
    RecordingExtractor.__init__(self)

    # Mirror the parent's metadata on this extractor.
    self._dtype = recording.get_dtype()
    self.copy_channel_properties(recording)
    self.is_filtered = self._recording.is_filtered

    serialized_parent = recording.make_serialized_dict()
    self._kwargs = dict(recording=serialized_parent, resample_rate=resample_rate)
def __init__(self, file_path):
    """Open the NIX file at ``file_path`` read-only and load its properties.

    The absolute form of ``file_path`` is stored in ``self._kwargs``.
    """
    assert HAVE_NIXIO, self.installation_mesg
    RecordingExtractor.__init__(self)

    read_only = nix.FileMode.ReadOnly
    self._file = nix.File.open(file_path, read_only)
    self._load_properties()

    absolute_path = Path(file_path).absolute()
    self._kwargs = dict(file_path=str(absolute_path))
from spikeextractors import RecordingExtractor, SortingExtractor
from spikeextractors.extraction_tools import check_get_traces_args
from pathlib import Path
try:
import pyintan
HAVE_INTAN = True
except ImportError:
HAVE_INTAN = False
class IntanRecordingExtractor(RecordingExtractor):
    """Recording extractor for Intan '.rhd' / '.rhs' files, read through pyintan."""

    extractor_name = 'IntanRecording'
    has_default_locations = False
    is_writable = False
    mode = 'file'
    # Whether pyintan could be imported, evaluated once at class level.
    installed = HAVE_INTAN
    # Message shown when pyintan is missing.
    installation_mesg = "To use the Intan extractor, install pyintan: \n\n pip install pyintan\n\n"

    def __init__(self, file_path, verbose=False):
        """Open ``file_path`` with ``pyintan.File``.

        Parameters
        ----------
        file_path: str or Path
            Path to the recording; its suffix must be '.rhd' or '.rhs'.
        verbose: bool
            Passed through to ``pyintan.File``.
        """
        assert HAVE_INTAN, self.installation_mesg
        RecordingExtractor.__init__(self)

        suffix = Path(file_path).suffix
        assert suffix in ('.rhs', '.rhd'), "Only '.rhd' and '.rhs' files are supported"

        self._recording_file = file_path
        self._recording = pyintan.File(file_path, verbose)

        absolute_path = str(Path(file_path).absolute())
        self._kwargs = dict(file_path=absolute_path, verbose=verbose)
def __init__(self, folder_path):
RecordingExtractor.__init__(self)
phy_folder = Path(folder_path)
self.params = read_python(str(phy_folder / 'params.py'))
datfile = [x for x in phy_folder.iterdir() if x.suffix == '.dat' or x.suffix == '.bin']
if (phy_folder / 'channel_map_si.npy').is_file():
channel_map = list(np.squeeze(np.load(phy_folder / 'channel_map_si.npy')))
assert len(channel_map) == self.params['n_channels_dat']
elif (phy_folder / 'channel_map.npy').is_file():
channel_map = list(np.squeeze(np.load(phy_folder / 'channel_map.npy')))
assert len(channel_map) == self.params['n_channels_dat']
else:
channel_map = list(range(self.params['n_channels_dat']))
BinDatRecordingExtractor.__init__(self, datfile[0], sampling_frequency=float(self.params['sample_rate']),
dtype=self.params['dtype'], numchan=self.params['n_channels_dat'],