Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
`name_to_codes` dictionary.
Returns
-------
cnt_y: 2d-array
Timeseries of one-hot-labels, time x classes.
trial_bounds: list of (int,int)
List of (trial_start, trial_stop) tuples.
"""
assert np.array_equal(
list(name_to_start_codes.keys()), list(name_to_stop_codes.keys())
)
events = np.asarray(events)
ival_in_samples = ms_to_samples(np.array(epoch_ival_ms), fs)
start_offset = np.int32(np.round(ival_in_samples[0]))
# we will use ceil but exclusive...
stop_offset = np.int32(np.ceil(ival_in_samples[1]))
start_code_to_name_and_y = _to_mrk_code_to_name_and_y(name_to_start_codes)
# Ensure all stop marker codes are iterables
for name in name_to_stop_codes:
codes = name_to_stop_codes[name]
if not hasattr(codes, "__len__"):
name_to_stop_codes[name] = [codes]
all_stop_codes = np.concatenate(list(name_to_stop_codes.values())).astype(
np.int64
)
class_to_n_trials = Counter()
n_classes = len(name_to_start_codes)
cnt_y = np.zeros((n_samples, n_classes), dtype=np.int64)
max_break_length_ms: number, optional
Maximum length in milliseconds a break can have to be included.
break_start_offset_ms: number, optional
What offset from trial end to start of the break in ms.
break_stop_offset_ms: number, optional
Offset in ms from the start of the next trial to the end of the preceding break.
Returns
-------
events: 2d-array
Events with break start and stop markers.
"""
min_samples = (
None
if min_break_length_ms is None
else ms_to_samples(min_break_length_ms, fs)
)
max_samples = (
None
if max_break_length_ms is None
else ms_to_samples(max_break_length_ms, fs)
)
orig_events = events
break_starts, break_stops = _extract_break_start_stop_ms(
events, name_to_start_codes, name_to_stop_codes
)
break_durations = break_stops - break_starts
valid_mask = np.array([True] * len(break_starts))
if min_samples is not None:
valid_mask[break_durations < min_samples] = False
if max_samples is not None:
Offset in ms from the start of the next trial to the end of the preceding break.
Returns
-------
events: 2d-array
Events with break start and stop markers.
"""
min_samples = (
None
if min_break_length_ms is None
else ms_to_samples(min_break_length_ms, fs)
)
max_samples = (
None
if max_break_length_ms is None
else ms_to_samples(max_break_length_ms, fs)
)
orig_events = events
break_starts, break_stops = _extract_break_start_stop_ms(
events, name_to_start_codes, name_to_stop_codes
)
break_durations = break_stops - break_starts
valid_mask = np.array([True] * len(break_starts))
if min_samples is not None:
valid_mask[break_durations < min_samples] = False
if max_samples is not None:
valid_mask[break_durations > max_samples] = False
if sum(valid_mask) == 0:
return deepcopy(events)
break_starts = break_starts[valid_mask]
break_stops = break_stops[valid_mask]
def _create_cnt_y_and_trial_bounds_from_start_and_ival(
n_samples, events, fs, name_to_start_codes, epoch_ival_ms
):
ival_in_samples = ms_to_samples(np.array(epoch_ival_ms), fs)
start_offset = np.int32(np.round(ival_in_samples[0]))
# we will use ceil but exclusive...
stop_offset = np.int32(np.ceil(ival_in_samples[1]))
mrk_code_to_name_and_y = _to_mrk_code_to_name_and_y(name_to_start_codes)
class_to_n_trials = Counter()
n_classes = len(name_to_start_codes)
cnt_y = np.zeros((n_samples, n_classes), dtype=np.int64)
i_start_stops = []
for i_sample, mrk_code in zip(events[:, 0], events[:, 1]):
start_sample = int(i_sample) + start_offset
stop_sample = int(i_sample) + stop_offset
if mrk_code in mrk_code_to_name_and_y:
if start_sample < 0:
log.warning(
"Ignore trial with marker code {:d}, would start at "
"sample {:d}".format(mrk_code, start_sample)
break_starts, break_stops = _extract_break_start_stop_ms(
events, name_to_start_codes, name_to_stop_codes
)
break_durations = break_stops - break_starts
valid_mask = np.array([True] * len(break_starts))
if min_samples is not None:
valid_mask[break_durations < min_samples] = False
if max_samples is not None:
valid_mask[break_durations > max_samples] = False
if sum(valid_mask) == 0:
return deepcopy(events)
break_starts = break_starts[valid_mask]
break_stops = break_stops[valid_mask]
if break_start_offset_ms is not None:
break_starts += int(round(ms_to_samples(break_start_offset_ms, fs)))
if break_stop_offset_ms is not None:
break_stops += int(round(ms_to_samples(break_stop_offset_ms, fs)))
break_events = np.zeros((len(break_starts) * 2, 2))
break_events[0::2, 0] = break_starts
break_events[1::2, 0] = break_stops
break_events[0::2, 1] = break_start_code
break_events[1::2, 1] = break_stop_code
new_events = np.concatenate((orig_events, break_events))
# sort events
sort_order = np.argsort(new_events[:, 0], kind="mergesort")
new_events = new_events[sort_order]
return new_events