def main(params, nb_cpu, nb_gpu, use_gpu, file_name, benchmark, sim_same_elec):
    """
    Useful tool to create synthetic datasets for benchmarking.

    Arguments
    ---------
    benchmark : {'fitting', 'clustering', 'synchrony', 'pca-validation', 'smart-search', 'drifts'}
    """

    if sim_same_elec is None:
        sim_same_elec = 0.8

    logger = init_logging(params.logfile)
    logger = logging.getLogger('circus.benchmarking')
    numpy.random.seed(265)

    file_name = os.path.abspath(file_name)
    data_path = os.path.dirname(file_name)
    data_suff, ext = os.path.splitext(os.path.basename(file_name))
    file_out, ext = os.path.splitext(file_name)

    if ext == '':
        ext = '.dat'
        file_name += ext

    if ext != '.dat':
        if comm.rank == 0:
            print_and_log(['Benchmarking produces raw files: select a .dat extension'], 'error', logger)
        sys.exit(0)
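
# A minimal usage sketch for the benchmarking entry point above (hypothetical paths,
# assuming an MPI launch where `comm` is already initialised and that `CircusParser`
# has been imported from circus.shared.parser):
#
#     params = CircusParser('/path/to/recording.params')
#     main(params, nb_cpu=4, nb_gpu=0, use_gpu=False,
#          file_name='synthetic_data.dat', benchmark='fitting', sim_same_elec=None)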

def main(params, nb_cpu, nb_gpu, use_gpu, extension):

    assert comm.rank == 0

    input_extension = extension

    logger = init_logging(params.logfile)
    logger = logging.getLogger('circus.deconverting')

    # Retrieve parameters.
    input_path = params.get('data', 'file_out_suff') + input_extension + '.GUI'
    output_path = params.get('data', 'file_out_suff')
    output_extension = '-deconverted'
    clusters_path = output_path + '.clusters{}.hdf5'.format(output_extension)
    templates_path = output_path + '.templates{}.hdf5'.format(output_extension)
    result_path = output_path + '.result{}.hdf5'.format(output_extension)

    # Check if input path exists.
    if not os.path.isdir(input_path):
        print_and_log([
            "Can't find directory: {}".format(input_path),
            "You must first export your results into the phy format (use the converting method)."
        ], 'error', logger)
        sys.exit(0)
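
# Note on the paths built above: the phy-format export is a '<file_out_suff><ext>.GUI'
# directory, and the deconverted results are written next to it as
# '<file_out_suff>.clusters-deconverted.hdf5', '.templates-deconverted.hdf5' and
# '.result-deconverted.hdf5'.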

parser.add_argument('-t', '--trig', help='Search for all .trig files', action='store_true')

if len(argv) == 0:
    parser.print_help()
    sys.exit()

args = parser.parse_args(argv)

folders_file = os.path.abspath(args.folders)
output = os.path.abspath(args.output)
extension = args.extension
filename, ext = os.path.splitext(os.path.basename(folders_file))

logger = init_logging(filename + '.log')
logger = logging.getLogger(__name__)

if not os.path.exists(folders_file):
    print_and_log(['The folder file %s does not exist!' % folders_file], 'error', logger)
    sys.exit(0)

try:
    folders = []
    with open(folders_file, 'r') as myfile:
        lines = myfile.readlines()
    for line in lines:
        folders += [os.path.abspath(line.strip())]
except Exception:
    print_and_log(['Check the syntax of the folder file'], 'error', logger)
    sys.exit(0)
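
# The folders file parsed above is expected to hold one data folder per line, e.g.
# (hypothetical contents):
#
#     /data/experiment/session_01
#     /data/experiment/session_02
#
# Each line is stripped of whitespace and resolved to an absolute path.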

parser.add_argument('datafile', help='data file')
parser.add_argument('-e', '--extension', help='extension to consider for visualization', default='')

if len(argv) == 0:
    parser.print_help()
    sys.exit()

args = parser.parse_args(argv)

filename = os.path.abspath(args.datafile)
extension = args.extension
params = CircusParser(filename)

if os.path.exists(params.logfile):
    os.remove(params.logfile)

logger = init_logging(params.logfile)
logger = logging.getLogger(__name__)

data_file = params.get_data_file()
data_dtype = data_file.data_dtype
gain = data_file.gain
t_start = data_file.t_start
file_format = data_file.description

if file_format not in supported_by_matlab:
    print_and_log(["File format %s is not supported by MATLAB. Waveforms disabled" % file_format], 'info', logger)

if numpy.iterable(gain):
    print_and_log(['Multiple gains are not supported, using a default value of 1'], 'info', logger)
    gain = 1

file_out_suff = params.get('data', 'file_out_suff')
if 'data_offset' in data_file.params:  # dict.has_key() was removed in Python 3

parser.add_argument('-e', '--extension', help='extension to consider for visualization', default='')

if len(argv) == 0:
    parser.print_help()
    sys.exit()

args = parser.parse_args(argv)

filename = os.path.abspath(args.datafile)
extension = args.extension
if extension != '':
    extension = '-' + extension

params = CircusParser(filename)

if os.path.exists(params.logfile):
    os.remove(params.logfile)

logger = init_logging(params.logfile)
logger = logging.getLogger(__name__)

file_out_suff = params.get('data', 'file_out_suff')

if params.get('data', 'stream_mode') in ['None', 'none']:
    print_and_log(['No streams in the datafile!'], 'error', logger)
    sys.exit(1)

data_file = params.get_data_file()
result = circus.shared.files.get_results(params, extension=extension)

times = []
for source in data_file._sources:
    times += [[source.t_start, source.t_stop]]

sub_results = slice_result(result, times)
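
# `times` holds one [t_start, t_stop] pair per stream source, so `slice_result`
# splits the global results back into per-stream chunks. A hypothetical sketch of
# the per-unit slicing, assuming `result['spiketimes']` maps template names to
# arrays of spike sample indices:
#
#     chunk = {key: spikes[(spikes >= t_start) & (spikes < t_stop)]
#              for key, spikes in result['spiketimes'].items()}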
for count, result in enumerate(sub_results):

def main(params, nb_cpu, nb_gpu, use_gpu):

    #################################################################
    # params = detect_memory(params)
    logger = init_logging(params.logfile)
    SHARED_MEMORY = get_shared_memory_flag(params)
    logger = logging.getLogger('circus.fitting')
    data_file = params.data_file
    N_e = params.getint('data', 'N_e')
    N_total = params.nb_channels
    N_t = params.getint('detection', 'N_t')
    template_shift = params.getint('detection', 'template_shift')
    file_out = params.get('data', 'file_out')
    file_out_suff = params.get('data', 'file_out_suff')
    sign_peaks = params.get('detection', 'peaks')
    matched_filter = params.getboolean('detection', 'matched-filter')
    spike_thresh = params.getfloat('detection', 'spike_thresh')
    spike_width = params.getfloat('detection', 'spike_width')
    dist_peaks = params.getint('detection', 'dist_peaks')
    do_temporal_whitening = params.getboolean('whitening', 'temporal')
    do_spatial_whitening = params.getboolean('whitening', 'spatial')
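
# For reference (SpyKING CIRCUS conventions, to the best of our reading): N_e is the
# number of channels kept for sorting, N_total the total channel count in the file,
# and N_t the temporal width of a template in time steps; `template_shift` is the
# half-width, typically (N_t - 1) // 2, used to center waveforms on a peak.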

def main(params, nb_cpu, nb_gpu, use_gpu):

    logger = init_logging(params.logfile)
    logger = logging.getLogger('circus.gathering')
    io.collect_data(nb_cpu, params, erase=False)
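
# The gathering step reduces to this single call: `io.collect_data` merges the
# per-process partial results (one set per MPI rank) into the final result file;
# `erase=False` presumably keeps the intermediate per-rank files on disk.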

args = parser.parse_args(argv)

# if args.window is None:
#     window_file = None
# else:
#     window_file = os.path.abspath(args.window)

filename = os.path.abspath(args.datafile)
params = CircusParser(filename)
dead_in_ms = params.getboolean('triggers', 'dead_in_ms')
trig_in_ms = params.getboolean('triggers', 'trig_in_ms')

if os.path.exists(params.logfile):
    os.remove(params.logfile)

logger = init_logging(params.logfile)
logger = logging.getLogger(__name__)

if params.get('data', 'stream_mode') == 'multi-files':
    data_file = params.get_data_file(source=True, has_been_created=False)
    all_times_dead = numpy.zeros((0, 2), dtype=numpy.int64)
    all_times_trig = numpy.zeros((0, 2), dtype=numpy.int64)

    for f in data_file._sources:
        name, ext = os.path.splitext(f.file_name)
        dead_file = f.file_name.replace(ext, '.dead')
        trig_file = f.file_name.replace(ext, '.trig')
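        # Convention used here: each source 'name<ext>' may ship side files
        # 'name.dead' and 'name.trig' listing dead / trigger periods, read either
        # in milliseconds or in time steps depending on `dead_in_ms` / `trig_in_ms`.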
        if os.path.exists(dead_file):
            print_and_log(['Found file %s' % dead_file], 'default', logger)
            times = get_dead_times(dead_file, data_file.sampling_rate, dead_in_ms)
            if times.max() > f.duration or times.min() < 0:

def main(params, nb_cpu, nb_gpu, use_gpu):

    numpy.random.seed(520)
    parallel_hdf5 = get_parallel_hdf5_flag(params)
    logger = init_logging(params.logfile)
    logger = logging.getLogger('circus.clustering')

    #################################################################
    data_file = params.data_file
    N_e = params.getint('data', 'N_e')
    N_total = params.nb_channels
    N_t = params.getint('detection', 'N_t')
    dist_peaks = params.getint('detection', 'dist_peaks')
    template_shift = params.getint('detection', 'template_shift')
    file_out_suff = params.get('data', 'file_out_suff')
    sign_peaks = params.get('detection', 'peaks')
    alignment = params.getboolean('detection', 'alignment')
    isolation = params.getboolean('detection', 'isolation')
    over_factor = float(params.getint('detection', 'oversampling_factor'))
    matched_filter = params.getboolean('detection', 'matched-filter')
    spike_thresh = params.getfloat('detection', 'spike_thresh')
    spike_width = params.getfloat('detection', 'spike_width')
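
# `alignment` enables subsample realignment of detected waveforms during clustering,
# and `over_factor` is the oversampling ratio used for that interpolation; it is
# stored as an int in the configuration, hence the explicit cast to float above so
# it can enter floating-point resampling arithmetic directly.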

def main(params, nb_cpu, nb_gpu, use_gpu):

    logger = init_logging(params.logfile)
    logger = logging.getLogger('circus.filtering')

    #################################################################
    do_filter = params.getboolean('filtering', 'filter')
    filter_done = check_if_done(params, 'filter_done', logger)
    artefacts_done = check_if_done(params, 'artefacts_done', logger)
    median_done = check_if_done(params, 'median_done', logger)
    ground_done = check_if_done(params, 'ground_done', logger)
    clean_artefact = params.getboolean('triggers', 'clean_artefact')
    remove_median = params.getboolean('filtering', 'remove_median')
    common_ground = params.getint('filtering', 'common_ground')
    remove_ground = common_ground >= 0
    nodes, edges = get_nodes_and_edges(params)
    #################################################################
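
    # Each processing step (filtering, artefact cleaning, median and ground
    # subtraction) is guarded by its '*_done' flag so a rerun does not transform
    # the data twice; `remove_ground` is derived from `common_ground`, where any
    # channel index >= 0 selects a reference channel to subtract.
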
    def filter_file(data_file_in, data_file_out, do_filtering, do_remove_median, do_remove_ground):
        """Apply the requested filtering steps to one data file."""