Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
RecordingExtractor.__init__(self)
phy_folder = Path(folder_path)
self.params = read_python(str(phy_folder / 'params.py'))
datfile = [x for x in phy_folder.iterdir() if x.suffix == '.dat' or x.suffix == '.bin']
if (phy_folder / 'channel_map_si.npy').is_file():
channel_map = list(np.squeeze(np.load(phy_folder / 'channel_map_si.npy')))
assert len(channel_map) == self.params['n_channels_dat']
elif (phy_folder / 'channel_map.npy').is_file():
channel_map = list(np.squeeze(np.load(phy_folder / 'channel_map.npy')))
assert len(channel_map) == self.params['n_channels_dat']
else:
channel_map = list(range(self.params['n_channels_dat']))
BinDatRecordingExtractor.__init__(self, datfile[0], sampling_frequency=float(self.params['sample_rate']),
dtype=self.params['dtype'], numchan=self.params['n_channels_dat'],
recording_channels=list(channel_map))
if (phy_folder / 'channel_groups.npy').is_file():
channel_groups = np.load(phy_folder / 'channel_groups.npy')
assert len(channel_groups) == self.get_num_channels()
for (ch, cg) in zip(self.get_channel_ids(), channel_groups):
self.set_channel_property(ch, 'group', cg)
if (phy_folder / 'channel_positions.npy').is_file():
channel_locations = np.load(phy_folder / 'channel_positions.npy')
assert len(channel_locations) == self.get_num_channels()
for (ch, loc) in zip(self.get_channel_ids(), channel_locations):
self.set_channel_property(ch, 'location', loc)
self._kwargs = {'folder_path': str(Path(folder_path).absolute())}
def __init__(self, folder_path):
    """Open the raw binary recording found in a klusta output folder.

    The folder is searched for one ``.prm`` configuration file and one
    ``.dat`` binary file. The sample rate, channel count and dtype are read
    from the ``traces`` entry of the configuration and forwarded to
    ``BinDatRecordingExtractor.__init__``.

    Parameters
    ----------
    folder_path : str or Path
        Folder holding the klusta ``.prm`` and ``.dat`` files.

    Raises
    ------
    AssertionError
        If ``HAVE_KLSX`` is false, or if the folder does not contain both a
        ``.prm`` file and a ``.dat`` file.
    """
    assert HAVE_KLSX, "To use the KlustaSortingExtractor install h5py: \n\n pip install h5py\n\n"
    klustafolder = Path(folder_path).absolute()
    # Take the first match of each kind, defaulting to None: a missing file
    # must reach the "Not a valid klusta folder" check below rather than
    # surface as a bare IndexError from indexing an empty list.
    config_file = next((f for f in klustafolder.iterdir() if f.suffix == '.prm'), None)
    dat_file = next((f for f in klustafolder.iterdir() if f.suffix == '.dat'), None)
    assert config_file is not None and dat_file is not None, "Not a valid klusta folder"
    assert config_file.is_file() and dat_file.is_file(), "Not a valid klusta folder"
    config = read_python(str(config_file))
    sampling_frequency = config['traces']['sample_rate']
    n_channels = config['traces']['n_channels']
    dtype = config['traces']['dtype']
    BinDatRecordingExtractor.__init__(self, file_path=dat_file, sampling_frequency=sampling_frequency,
                                      numchan=n_channels, dtype=dtype)
    # Stored with the absolute folder path so the kwargs do not depend on
    # the current working directory.
    self._kwargs = {'folder_path': str(Path(folder_path).absolute())}
def __init__(self, folder_path):
    """Open the raw binary recording found in a klusta output folder.

    Looks for one ``.prm`` configuration file and one ``.dat`` binary file
    inside ``folder_path``, reads ``sample_rate``, ``n_channels`` and
    ``dtype`` from the configuration's ``traces`` entry, and hands them to
    ``BinDatRecordingExtractor.__init__``.

    Parameters
    ----------
    folder_path : str or Path
        Folder holding the klusta ``.prm`` and ``.dat`` files.

    Raises
    ------
    AssertionError
        If ``HAVE_KLSX`` is false (message taken from
        ``self.installation_mesg``), or if the folder does not contain both
        a ``.prm`` file and a ``.dat`` file.
    """
    assert HAVE_KLSX, self.installation_mesg
    klustafolder = Path(folder_path).absolute()
    # Default to None when nothing matches so that an incomplete folder is
    # reported by the "Not a valid klusta folder" assertion instead of an
    # IndexError raised while indexing an empty list.
    config_file = next((f for f in klustafolder.iterdir() if f.suffix == '.prm'), None)
    dat_file = next((f for f in klustafolder.iterdir() if f.suffix == '.dat'), None)
    assert config_file is not None and dat_file is not None, "Not a valid klusta folder"
    assert config_file.is_file() and dat_file.is_file(), "Not a valid klusta folder"
    config = read_python(str(config_file))
    sampling_frequency = config['traces']['sample_rate']
    n_channels = config['traces']['n_channels']
    dtype = config['traces']['dtype']
    BinDatRecordingExtractor.__init__(self, file_path=dat_file, sampling_frequency=sampling_frequency,
                                      numchan=n_channels, dtype=dtype)
    # Absolute path keeps the stored kwargs independent of the working
    # directory at the time they are read back.
    self._kwargs = {'folder_path': str(Path(folder_path).absolute())}
# Fragment of an initialiser for a SHYBRID-backed recording; the enclosing
# `def` line is not part of this excerpt and the original indentation is lost.
# Reads the 'data' section of the SHYBRID parameters for `file_path`. The keys
# used below are 'probe', 'order', 'fs' and 'dtype'.
params = sbio.get_params(file_path)['data']
# create a shybrid probe object
probe = sbprb.Probe(params['probe'])
nb_channels = probe.total_nb_channels
# translate the byte ordering
# TODO still ambiguous, shybrid should assume time_axis=1, since spike interface makes an assumption on the byte ordering
byte_order = params['order']
# NOTE(review): only 'C' and 'F' are handled here. For any other value
# `time_axis` is never assigned in this excerpt and the call below would fail
# on the unbound name - confirm no other order can occur.
if byte_order == 'C':
time_axis = 1
elif byte_order == 'F':
time_axis = 0
# piggyback on binary data recording extractor
# Positional arguments are: file path, params['fs'], channel count taken from
# the probe, and params['dtype'].
BinDatRecordingExtractor.__init__(self,
file_path,
params['fs'],
nb_channels,
params['dtype'],
time_axis=time_axis)
# Replaces whatever kwargs the base initialiser stored with the absolute file path only.
self._kwargs = {'file_path': str(Path(file_path).absolute())}
# NOTE(review): this rebinds the local name `self` only; it has an effect on
# the instance just if load_probe_file modifies its argument in place - confirm.
self = load_probe_file(self, params['probe'])
# Fragment of an initialiser that writes `recording` to a binary file and then
# re-opens that file through BinDatRecordingExtractor. The enclosing `def` line
# is not part of this excerpt and the original indentation is lost;
# `recording`, `save_path`, `chunk_size` and `tmp_folder` are presumably its
# parameters - confirm against the full source.
if save_path is None:
# No destination given: mark the file as temporary and take a generated
# '.dat' file name inside `tmp_folder`. Only the name is kept, not the
# file object returned by NamedTemporaryFile.
self._is_tmp = True
self._tmp_file = tempfile.NamedTemporaryFile(suffix=".dat", dir=tmp_folder).name
else:
save_path = Path(save_path)
# Any suffix other than '.dat' or '.bin' is replaced by '.dat'.
if save_path.suffix != '.dat' and save_path.suffix != '.bin':
save_path = save_path.with_suffix('.dat')
# Create the destination directory when it does not exist yet.
if not save_path.parent.is_dir():
os.makedirs(save_path.parent)
self._is_tmp = False
self._tmp_file = save_path
self._dtype = recording.get_dtype()
# Dump the recording to the chosen file using the recording's own dtype.
recording.write_to_binary_dat_format(save_path=self._tmp_file, dtype=self._dtype, chunk_size=chunk_size)
# keep track of filter status when dumping
# NOTE(review): `self._recording` is not assigned anywhere in this excerpt;
# presumably it is set before this point - confirm.
self.is_filtered = self._recording.is_filtered
# Re-open the dumped file with the channel count, channel ids and sampling
# frequency taken from `recording`.
BinDatRecordingExtractor.__init__(self, self._tmp_file, numchan=recording.get_num_channels(),
recording_channels=recording.get_channel_ids(),
sampling_frequency=recording.get_sampling_frequency(),
dtype=self._dtype, is_filtered=self.is_filtered)
# keep BinDatRecording kwargs
# Copied here because `self._kwargs` is overwritten a few lines below.
self._bindat_kwargs = deepcopy(self._kwargs)
self.set_tmp_folder(tmp_folder)
self.copy_channel_properties(recording)
# Final kwargs describe this object in terms of the source recording and chunk size.
self._kwargs = {'recording': recording, 'chunk_size': chunk_size}
def move_to(self, save_path):
    """Move the backing binary file to ``save_path`` and re-open it there.

    A suffix other than ``.dat`` or ``.bin`` is replaced by ``.dat``, and the
    destination directory is created when missing. After the move the
    instance is no longer marked as temporary, the stored ``file_path``
    kwargs point at the new absolute location, and
    ``BinDatRecordingExtractor.__init__`` is run again on the moved file.

    Parameters
    ----------
    save_path : str or Path
        Destination of the binary file.
    """
    destination = Path(save_path)
    if destination.suffix not in ('.dat', '.bin'):
        destination = destination.with_suffix('.dat')
    destination_dir = destination.parent
    if not destination_dir.is_dir():
        os.makedirs(destination_dir)

    shutil.move(self._tmp_file, str(destination))
    self._tmp_file = str(destination)

    # Both kwargs dictionaries record the same absolute location.
    absolute_location = str(Path(self._tmp_file).absolute())
    self._kwargs['file_path'] = absolute_location
    self._bindat_kwargs['file_path'] = absolute_location
    self._is_tmp = False

    # Read the tmp folder before re-initialising so it can be restored after.
    previous_tmp_folder = self.get_tmp_folder()

    # re-initialize with new file
    source = self._recording
    BinDatRecordingExtractor.__init__(
        self,
        self._tmp_file,
        numchan=source.get_num_channels(),
        recording_channels=source.get_channel_ids(),
        sampling_frequency=source.get_sampling_frequency(),
        dtype=self._dtype,
        is_filtered=self.is_filtered,
    )
    self.set_tmp_folder(previous_tmp_folder)
    self.copy_channel_properties(self._recording)