# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _read_talxfm(subject, subjects_dir, mode=None, verbose=None):
    """Compute MNI transform from FreeSurfer talairach.xfm file.

    Adapted from freesurfer m-files. Altered to deal with Norig
    and Torig correctly.

    Parameters
    ----------
    subject : str
        FreeSurfer subject name.
    subjects_dir : str
        Path to the FreeSurfer subjects directory.
    mode : None
        Deprecated; do not set (a warning is emitted otherwise).
    verbose : bool | str | int | None
        Verbosity control (handled by decorator elsewhere in the package).

    Returns
    -------
    mri_mni_t : Transform
        The surface-RAS ('mri') to MNI ('mni_tal') transform.
    """
    if mode is not None:
        warn('mode is deprecated and will be removed in 0.21, do not set it')
    # Setup the RAS to MNI transform
    ras_mni_t = read_ras_mni_t(subject, subjects_dir)
    # We want to get from Freesurfer surface RAS ('mri') to MNI ('mni_tal').
    # This file only gives us RAS (non-zero origin) ('ras') to MNI ('mni_tal').
    # So we need to get the ras->mri transform from the MRI headers.
    # To do this, we get Norig and Torig
    # (i.e. vox_ras_t and vox_mri_t, respectively).
    # Prefer orig.mgz, falling back to T1.mgz when it is absent.
    path = op.join(subjects_dir, subject, 'mri', 'orig.mgz')
    if not op.isfile(path):
        path = op.join(subjects_dir, subject, 'mri', 'T1.mgz')
        if not op.isfile(path):
            raise IOError('mri not found: %s' % path)
    _, _, mri_ras_t, _, _ = _read_mri_info(path, units='mm')
    mri_mni_t = combine_transforms(mri_ras_t, ras_mni_t, 'mri', 'mni_tal')
    # BUG FIX: the computed transform was assigned but never returned,
    # leaving the function without a useful result for callers.
    return mri_mni_t
from ..cov import whiten_evoked
# NOTE(review): fragment — this is the interior of a plotting helper;
# `evoked` and `color` are parameters defined above this chunk.
# Normalize `evoked` to a list so the logic below is uniform.
if not isinstance(evoked, (tuple, list)):
    evoked = [evoked]
if isinstance(color, (tuple, list)):
    # An explicit color per evoked object: lengths must match.
    if len(color) != len(evoked):
        raise ValueError('Lists of evoked objects and colors'
                         ' must have the same length')
elif color is None:
    # BUG FIX: `_get_color_list` is a function; the original concatenated
    # the function object itself (['w'] + _get_color_list), which raises
    # TypeError at runtime. It must be called.
    colors = ['w'] + _get_color_list()
    stop = (slice(len(evoked)) if len(evoked) < len(colors)
            else slice(len(colors)))
    color = cycle(colors[stop])
    if len(evoked) > len(colors):
        warn('More evoked objects than colors available. You should pass '
             'a list of unique colors.')
else:
    # A single color: repeat it for every evoked object.
    color = cycle([color])
# NOTE(review): fragment — continues the plotting helper above; `noise_cov`,
# `scalings` and `merge_grads` are parameters defined outside this chunk.
noise_cov = _check_cov(noise_cov, evoked[0].info)
if noise_cov is not None:
    # Whiten each evoked object with the supplied noise covariance.
    evoked = [whiten_evoked(e, noise_cov) for e in evoked]
else:
    # Copy so later in-place operations do not mutate the caller's data.
    evoked = [e.copy() for e in evoked]
info = evoked[0].info
ch_names = evoked[0].ch_names
scalings = _handle_default('scalings', scalings)
# All evoked objects must share the same channel names/order.
if not all(e.ch_names == ch_names for e in evoked):
    raise ValueError('All evoked.picks must be the same')
ch_names = _clean_names(ch_names)
# NOTE(review): the suite of this branch is truncated — it lies beyond
# this chunk.
if merge_grads:
def __init__(self, info, data, times, nave, aspect_kind,
             first=None, last=None, comment='',
             verbose=None):  # noqa: D102
    # Store measurement info and averaging bookkeeping.
    self.info = info
    self.nave = nave
    self._aspect_kind = aspect_kind
    # Map the numeric aspect kind to a human-readable string.
    self.kind = _aspect_rev.get(aspect_kind, 'unknown')
    self.comment = comment
    self.times = times
    if first is not None or last is not None:
        # BUG FIX: warn() takes (message, category); the original passed
        # the category first and the string as the category, so the
        # deprecation notice could never be emitted correctly.
        warn('first and last are deprecated, do not pass them',
             DeprecationWarning)
    self.data = data
    self.verbose = verbose
    self.preload = True
    # Refresh sample-index attributes derived from `times`
    # (helper defined elsewhere on the class).
    self._update_first_last()
# NOTE(review): fragment — interior of a model-order search; `data`, `mode`,
# `method_params`, `cv`, `n_jobs` and `stop_early` are defined above this
# chunk. Scans candidate component counts in steps of 5.
iter_n_components = np.arange(5, data.shape[1], 5)
from sklearn.decomposition import PCA, FactorAnalysis
if mode == 'factor_analysis':
    est = FactorAnalysis
else:
    # only the two modes are supported here
    assert mode == 'pca'
    est = PCA
est = est(**method_params)
est.n_components = 1
# One score slot per candidate rank; NaN marks "not evaluated".
scores = np.empty_like(iter_n_components, dtype=np.float64)
scores.fill(np.nan)
# make sure we don't empty the thing if it's a generator
max_n = max(list(deepcopy(iter_n_components)))
if max_n > data.shape[1]:
    warn('You are trying to estimate %i components on matrix '
         'with %i features.' % (max_n, data.shape[1]))
for ii, n in enumerate(iter_n_components):
    est.n_components = n
    try:  # this may fail depending on rank and split
        score = _cross_val(data=data, est=est, cv=cv, n_jobs=n_jobs)
    except ValueError:
        score = np.inf
    # A non-finite or positive score is treated as a failed fit: stop.
    if np.isinf(score) or score > 0:
        logger.info('... infinite values encountered. stopping estimation')
        break
    logger.info('... rank: %i - loglik: %0.3f' % (n, score))
    if score != -np.inf:
        scores[ii] = score
    # NOTE(review): early-stopping branch is truncated — its suite lies
    # beyond this chunk (presumably a `break` after 3 decreasing scores).
    if (ii >= 3 and np.all(np.diff(scores[ii - 3:ii]) < 0) and stop_early):
# NOTE(review): fragment — `times`, `pos`, `inuse`, `msg`, `head` and
# `amplitude` are defined above this chunk. The error report / raise below
# was presumably guarded by a condition outside this view — TODO confirm.
for (t, pos) in zip(times[np.logical_not(inuse)],
                    pos[np.logical_not(inuse)]):
    # One report line per offending dipole (time in ms, position in mm).
    msg += ' t={:.0f} ms, pos=({:.0f}, {:.0f}, {:.0f}) mm\n'.\
        format(t * 1000., pos[0] * 1000.,
               pos[1] * 1000., pos[2] * 1000.)
msg += len(head) * '#'
logger.error(msg)
raise ValueError('One or more dipoles outside the inner skull.')
# multiple dipoles (rr and nn) per time instant allowed
# uneven sampling in time returns list
timepoints = np.unique(times)
if len(timepoints) > 1:
    tdiff = np.diff(timepoints)
    if not np.allclose(tdiff, tdiff[0]):
        warn('Unique time points of dipoles unevenly spaced: returned '
             'stc will be a list, one for each time point.')
        # Sentinel value signalling uneven spacing to the caller.
        tstep = -1.0
    else:
        tstep = tdiff[0]
elif len(timepoints) == 1:
    tstep = 0.001
# Build the data matrix, essentially a block-diagonal with
# n_rows: number of dipoles in total (dipole.amplitudes)
# n_cols: number of unique time points in dipole.times
# amplitude with identical value of times go together in one col (others=0)
data = np.zeros((len(amplitude), len(timepoints)))  # (n_d, n_t)
row = 0
for tpind, tp in enumerate(timepoints):
    amp = amplitude[np.in1d(times, tp)]
    # NOTE(review): `row` is never advanced within this chunk — the
    # `row += len(amp)` step presumably follows beyond this view; verify.
    data[row:row + len(amp), tpind] = amp
def _find_unique_events(events):
"""Uniquify events (ie remove duplicated rows."""
e = np.ascontiguousarray(events).view(
np.dtype((np.void, events.dtype.itemsize * events.shape[1])))
_, idx = np.unique(e, return_index=True)
n_dupes = len(events) - len(idx)
if n_dupes > 0:
warn("Some events are duplicated in your different stim channels."
" %d events were ignored during deduplication." % n_dupes)
return events[idx]
# NOTE(review): fragment — the matching `if` header (presumably
# `if ch_name is None:`) lies above this chunk; `inst` and `ch_name` are
# defined outside this view.
    ecg_idx = pick_types(inst.info, meg=False, eeg=False, stim=False,
                         eog=False, ecg=True, emg=False, ref_meg=False,
                         exclude='bads')
else:
    # An explicit channel name was given: it must exist.
    if ch_name not in inst.ch_names:
        raise ValueError('%s not in channel list (%s)' %
                         (ch_name, inst.ch_names))
    ecg_idx = pick_channels(inst.ch_names, include=[ch_name])
# No ECG channel found: signal with None rather than raising.
if len(ecg_idx) == 0:
    return None
    # raise RuntimeError('No ECG channel found. Please specify ch_name '
    #                    'parameter e.g. MEG 1531')
if len(ecg_idx) > 1:
    warn('More than one ECG channel found. Using only %s.'
         % inst.ch_names[ecg_idx[0]])
return ecg_idx[0]
# NOTE(review): fragment — interior of a method (uses `self`) that wraps
# averaged `data` into an EvokedArray; `data`, `comment`, `n_events`,
# `kind` and `picks` are defined above this chunk.
info = deepcopy(info)
evoked = EvokedArray(data, info, tmin=self.times[0], comment=comment,
                     nave=n_events, kind=kind, verbose=self.verbose)
# XXX: above constructor doesn't recreate the times object precisely
evoked.times = self.times.copy()
# pick channels
picks = _picks_to_idx(self.info, picks, 'data_or_ica', ())
ch_names = [evoked.ch_names[p] for p in picks]
evoked.pick_channels(ch_names)
if len(evoked.info['ch_names']) == 0:
    raise ValueError('No data channel found when averaging.')
if evoked.nave < 1:
    warn('evoked object is empty (based on less than 1 epoch)')
return evoked
# NOTE(review): fragment — the opening branch header (text formats where
# `ext` and `file_str` are set) lies above this chunk; `fname`, `unit` and
# `comments` are parameters defined outside this view.
    # Regex for a (possibly signed, possibly scientific-notation) number.
    value_pattern = r"\-?\d+\.?\d*e?\-?\d*"
    coord_pattern = r"({0})\s+({0})\s+({0})\s*$".format(value_pattern)
    if ext == '.hsp':
        # .hsp coordinates are anchored to the start of a line.
        coord_pattern = '^' + coord_pattern
    points_str = [m.groups() for m in re.finditer(coord_pattern, file_str,
                                                  re.MULTILINE)]
    dig_points = np.array(points_str, dtype=float)
elif ext == '.mat':  # like FastScan II
    from scipy.io import loadmat
    dig_points = loadmat(fname)['Points'].T
else:
    # Generic whitespace-delimited text fallback.
    dig_points = np.loadtxt(fname, comments=comments, ndmin=2)
    if unit == 'auto':
        unit = 'mm'
    if dig_points.shape[1] > 3:
        warn('Found %d columns instead of 3, using first 3 for XYZ '
             'coordinates' % (dig_points.shape[1],))
        dig_points = dig_points[:, :3]
if dig_points.shape[-1] != 3:
    raise ValueError(
        'Data must be of shape (n, 3) instead of %s' % (dig_points.shape,))
# Convert to meters according to the (possibly auto-detected) unit.
if unit == 'mm':
    dig_points /= 1000.
elif unit == 'cm':
    dig_points /= 100.
return dig_points
def _fit_dipole_fixed(min_dist_to_inner_skull, B_orig, t, guess_rrs,
                      guess_data, fwd_data, whitener,
                      fmin_cobyla, ori, rank):
    """Fit data for one time instant using a fixed dipole position.

    Parameters mirror the position-fitting variant so the two can be used
    interchangeably; ``min_dist_to_inner_skull``, ``fwd_data``,
    ``fmin_cobyla`` and ``rank`` are unused here but kept for a uniform
    signature.

    Returns
    -------
    tuple
        ``(rd_final, amp, ori, gof, conf, khi2, nfree, residual_noproj)``.
    """
    B = np.dot(whitener, B_orig)
    B2 = np.dot(B, B)
    if B2 == 0:
        warn('Zero field found for time %s' % t)
        # BUG FIX: the original returned only 5 items here while the
        # success path returns 8, which breaks uniform unpacking by the
        # caller. Return the same 8-element structure.
        return (np.zeros(3), 0., np.zeros(3), 0., None, None, None,
                np.zeros_like(B_orig))
    # Compute the dipole moment
    Q, gof, residual_noproj = _fit_Q(guess_data, whitener, B, B2, B_orig,
                                     rd=None, ori=ori)[:3]
    if ori is None:
        # Free orientation: amplitude is |Q|, orientation its unit vector
        # (guarding against division by zero for a null moment).
        amp = np.sqrt(np.dot(Q, Q))
        norm = 1. if amp == 0. else amp
        ori = Q / norm
    else:
        # Fixed orientation: signed amplitude along the given direction.
        amp = np.dot(Q, ori)
    rd_final = guess_rrs[0]
    # This will be slow, and we don't use it anyway, so omit it for now:
    # conf = _fit_confidence(rd_final, Q, ori, whitener, fwd_data)
    conf = khi2 = nfree = None
    # No corresponding 'logger' message here because it should go *very* fast
    return rd_final, amp, ori, gof, conf, khi2, nfree, residual_noproj