Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
os.makedirs(str(study_folder / 'ground_truth'))
os.makedirs(str(study_folder / 'sortings'))
os.makedirs(str(study_folder / 'sortings/run_log' ))
for rec_name, (recording, sorting_gt) in gt_dict.items():
# write recording as binary format + json + prb
raw_filename = study_folder / 'raw_files' / (rec_name + '.dat')
prb_filename = study_folder / 'raw_files' / (rec_name + '.prb')
json_filename = study_folder / 'raw_files' / (rec_name + '.json')
num_chan = recording.get_num_channels()
chunksize = 2 ** 24 // num_chan
sr = recording.get_sampling_frequency()
se.write_binary_dat_format(recording, raw_filename, time_axis=0, dtype='float32', chunksize=chunksize)
se.save_probe_file(recording, prb_filename, format='spyking_circus')
with open(json_filename, 'w', encoding='utf8') as f:
info = dict(sample_rate=sr, num_chan=num_chan, dtype='float32', frames_first=True)
json.dump(info, f, indent=4)
# write recording sorting_gt as with npz format
se.NpzSortingExtractor.write_sorting(sorting_gt, study_folder / 'ground_truth' / (rec_name + '.npz'))
# make an index of recording names
with open(study_folder / 'names.txt', mode='w', encoding='utf8') as f:
for rec_name in gt_dict:
f.write(rec_name + '\n')
# source file
if isinstance(recording, se.BinDatRecordingExtractor) and recording._frame_first and\
recording._timeseries.offset==0:
# no need to copy
raw_filename = str(recording._datfile)
raw_filename = raw_filename.replace('.dat', '')
dtype = recording._timeseries.dtype.str
nb_chan = len(recording._channels)
else:
# save binary file (chunk by hcunk) into a new file
raw_filename = output_folder / 'recording'
n_chan = recording.get_num_channels()
chunksize = 2**24// n_chan
dtype='int16'
se.write_binary_dat_format(recording, raw_filename, time_axis=0, dtype=dtype, chunksize=chunksize)
if p['detect_sign'] < 0:
detect_sign = 'negative'
elif p['detect_sign'] > 0:
detect_sign = 'positive'
else:
detect_sign = 'both'
# set up klusta config file
with (source_dir / 'config_default.prm').open('r') as f:
klusta_config = f.readlines()
# Note: should use format with dict approach here
klusta_config = ''.join(klusta_config).format(raw_filename,
p['probe_file'], float(recording.get_sampling_frequency()),
recording.get_num_channels(), "'{}'".format(dtype),
def _setup_recording(self, recording, output_folder):
source_dir = Path(__file__).parent
p = self.params
if not check_if_installed(KilosortSorter.kilosort_path, KilosortSorter.npy_matlab_path):
raise Exception(KilosortSorter.installation_mesg)
# save binary file
file_name = 'recording'
se.write_binary_dat_format(recording, output_folder / file_name, dtype='int16')
# set up kilosort config files and run kilosort on data
with (source_dir / 'kilosort_master.txt').open('r') as f:
kilosort_master = f.readlines()
with (source_dir / 'kilosort_config.txt').open('r') as f:
kilosort_config = f.readlines()
with (source_dir / 'kilosort_channelmap.txt').open('r') as f:
kilosort_channelmap = f.readlines()
nchan = recording.get_num_channels()
dat_file = (output_folder / (file_name + '.dat')).absolute()
kilo_thresh = p['detect_threshold']
Nfilt = (nchan // 32) * 32 * 8
if Nfilt == 0:
Nfilt = nchan * 8
nsamples = 128 * 1024 + 64
# source file
if isinstance(recording, se.BinDatRecordingExtractor) and recording._frame_first:
# no need to copy
raw_filename = recording._datfile
dtype = recording._timeseries.dtype.str
nb_chan = len(recording._channels)
offset = recording._timeseries.offset
else:
if self.debug:
print('Local copy of recording')
# save binary file (chunk by hcunk) into a new file
raw_filename = output_folder / 'raw_signals.raw'
n_chan = recording.get_num_channels()
chunksize = 2**24// n_chan
se.write_binary_dat_format(recording, raw_filename, time_axis=0, dtype='float32', chunksize=chunksize)
dtype='float32'
offset = 0
# initialize source and probe file
tdc_dataio = tdc.DataIO(dirname=str(output_folder))
nb_chan = recording.get_num_channels()
tdc_dataio.set_data_source(type='RawData', filenames=[str(raw_filename)],
dtype=dtype, sample_rate=recording.get_sampling_frequency(),
total_channel=nb_chan, offset=offset)
tdc_dataio.set_probe_file(str(probe_file))
if self.debug:
print(tdc_dataio)
The maximum number of wavefomrs to extract (default is np.inf)
compute_property_from_recording: bool
If True and 'grouping_property' is given, the property of each unit is assigned as the corresponding propery of
the recording extractor channel on which the average waveform is the largest
verbose: bool
If True output is verbose
'''
import spiketoolkit as st
if not isinstance(recording, se.RecordingExtractor) or not isinstance(sorting, se.SortingExtractor):
raise AttributeError()
output_folder = Path(output_folder).absolute()
if not output_folder.is_dir():
output_folder.mkdir()
# save dat file
se.write_binary_dat_format(recording, output_folder / 'recording.dat', dtype='int16')
# write params.py
with (output_folder / 'params.py').open('w') as f:
f.write("dat_path =" + "'" + str(output_folder / 'recording.dat') + "'" + '\n')
f.write('n_channels_dat = ' + str(recording.get_num_channels()) + '\n')
f.write("dtype = 'int16'\n")
f.write('offset = 0\n')
f.write('sample_rate = ' + str(recording.get_sampling_frequency()) + '\n')
f.write('hp_filtered = False')
# pc_features.npy - [nSpikes, nFeaturesPerChannel, nPCFeatures] single
if nPC > recording.get_num_channels():
nPC = recording.get_num_channels()
print("Changed number of PC to number of channels: ", nPC)
pc_scores = compute_pca_scores(recording, sorting, n_comp=nPC, by_electrode=True,
start_frame=start_frame, end_frame=end_frame, max_num_waveforms=max_num_waveforms,
def _setup_recording(self, recording, output_folder):
source_dir = Path(__file__).parent
p = self.params
if not check_if_installed(Kilosort2Sorter.kilosort2_path, Kilosort2Sorter.npy_matlab_path):
raise Exception(Kilosort2Sorter.installation_mesg)
# save binary file
if p['file_name'] is None:
self.file_name = Path('recording')
elif p['file_name'].suffix == '.dat':
self.file_name = p['file_name'].stem
p['file_name'] = self.file_name
se.write_binary_dat_format(recording, output_folder / self.file_name, dtype='int16')
# set up kilosort2 config files and run kilosort2 on data
with (source_dir / 'kilosort2_master.txt').open('r') as f:
kilosort2_master = f.readlines()
with (source_dir / 'kilosort2_config.txt').open('r') as f:
kilosort2_config = f.readlines()
with (source_dir / 'kilosort2_channelmap.txt').open('r') as f:
kilosort2_channelmap = f.readlines()
nchan = recording.get_num_channels()
dat_file = (output_folder / (self.file_name.name + '.dat')).absolute()
kilo_thresh = p['detect_threshold']
sample_rate = recording.get_sampling_frequency()
if not Kilosort2Sorter.installed: