# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def psd_fft(varr):
    """Compute the power spectral density of ``varr`` along ``frame``.

    Applies a real-input FFT over the ``frame`` dimension and returns
    ``|F|**2 / T``, indexed by a normalized frequency coordinate in
    cycles per frame (0 to 0.5).

    Parameters
    ----------
    varr : xr.DataArray
        Input array with a ``frame`` dimension.

    Returns
    -------
    xr.DataArray
        PSD with ``frame`` replaced by a ``freq`` dimension of length
        ``T // 2 + 1``.
    """
    _T = len(varr.coords['frame'])
    ns = _T // 2 + 1
    # rfft frequency grid: the last bin is the Nyquist frequency 0.5 for
    # even T, and (T - 1) / (2 * T) for odd T.
    if _T % 2 == 0:
        freq_crd = np.linspace(0, 0.5, ns)
    else:
        freq_crd = np.linspace(0, 0.5 * (_T - 1) / _T, ns)
    print("computing psd of input")
    varr_fft = xr.apply_ufunc(
        fftw.rfft,
        varr.chunk(dict(frame=-1)),
        input_core_dims=[['frame']],
        output_core_dims=[['freq']],
        dask='allowed',
        output_sizes=dict(freq=ns),
        # np.complex_ was removed in NumPy 2.0; use the explicit dtype.
        output_dtypes=[np.complex128])
    varr_fft = varr_fft.assign_coords(freq=freq_crd)
    varr_psd = 1 / _T * np.abs(varr_fft)**2
    return varr_psd
def interpolate(ds, dim='time'):
    """Fill NaNs along ``dim`` by 1-d linear interpolation.

    Each 1-d series along ``dim`` has its NaN entries replaced via
    ``np.interp`` against the non-NaN entries; all-NaN series are
    returned unchanged.

    Parameters
    ----------
    ds : xr.Dataset or xr.DataArray
        Dask-backed input; all variables must share one dtype.
    dim : str
        Dimension to interpolate along (default ``'time'``).

    Returns
    -------
    Same type as ``ds`` with NaNs filled along ``dim``.
    """
    def _interpolate1d(y):
        # Work on a copy: np.apply_along_axis hands us views of the
        # underlying block, and filling NaNs in place would silently
        # mutate the caller's data.
        y = np.asarray(y).copy()
        nan = np.isnan(y)
        if nan.all():
            return y
        idx = lambda z: z.nonzero()[0]
        y[nan] = np.interp(idx(nan), idx(~nan), y[~nan])
        return y

    def _interpolate(a):
        # Apply the 1-d filler along the last axis of every dask block;
        # dim is a core dim, so it is moved to the last axis by apply_ufunc.
        return a.map_blocks(
            partial(np.apply_along_axis, _interpolate1d, -1), dtype=a.dtype)

    data_vars = ds.data_vars.values() if isinstance(ds, xr.Dataset) else (ds,)
    dtypes = {da.dtype for da in data_vars}
    assert len(dtypes) == 1, "interpolate only supports datasets with homogeneous dtype"
    return xr.apply_ufunc(_interpolate, ds,
                          input_core_dims=[[dim]],
                          output_core_dims=[[dim]],
                          output_dtypes=[dtypes.pop()],
                          output_sizes={dim: len(ds.indexes[dim])},
                          dask='allowed',
                          keep_attrs=True)
def unit_merge(A, C, add_list=None, thres_corr=0.9):
print("computing spatial overlap")
A_bl = ((A > 0).astype(np.float32)
.chunk(dict(unit_id='auto', height=-1, width=-1)))
A_ovlp = xr.apply_ufunc(
da.array.tensordot,
A_bl,
A_bl.rename(unit_id='unit_id_cp'),
input_core_dims=[['unit_id', 'height', 'width'],
['height', 'width', 'unit_id_cp']],
output_core_dims=[['unit_id', 'unit_id_cp']],
dask='allowed',
kwargs=dict(axes=([1, 2], [0, 1])),
output_dtypes=[A_bl.dtype])
A_ovlp = A_ovlp.persist()
print("computing temporal correlation")
uid_idx = C.coords['unit_id'].values
corr = xr.apply_ufunc(
np.corrcoef,
C.compute(),
input_core_dims=[['unit_id', 'frame']],
"""
# Convert coord points to indexes of x.coords[dim]
idx = xarray.DataArray(
x.coords[xdim].searchsorted(c),
dims=c.dims, coords=c.coords)
# searchsorted(NaN) returns 0; replace it with -1.
# isnull('') returns False. We could have asked for None, however
# searchsorted will refuse to compare strings and None's
if c.dtype.kind == 'U':
idx = idx.where(c != '', -1)
else:
idx = idx.where(~c.isnull(), -1)
dtype = x.dtypes if isinstance(x, xarray.Dataset) else x.dtype
return xarray.apply_ufunc(
kernel, x, idx,
input_core_dims=[[xdim], [cdim]],
output_core_dims=[[]],
dask='parallelized',
output_dtypes=[dtype])
def rel_cent(im):
    """Return the center of mass of ``im``, normalized by its shape.

    NaN pixels are treated as zero mass; an all-NaN image yields
    ``[nan, nan]``.
    """
    nan_mask = np.isnan(im)
    if nan_mask.all():
        return np.full(2, np.nan)
    clean = np.nan_to_num(im) if nan_mask.any() else im
    return np.array(center_of_mass(clean)) / clean.shape
gu_rel_cent = da.gufunc(
rel_cent,
signature='(h,w)->(d)',
output_dtypes=float,
output_sizes=dict(d=2),
vectorize=True
)
cents = (xr.apply_ufunc(
gu_rel_cent, A.chunk(dict(height=-1, width=-1)),
input_core_dims=[['height', 'width']],
output_core_dims=[['dim']],
dask='allowed')
.assign_coords(dim=['height', 'width']))
if verbose:
print("computing centroids")
with ProgressBar():
cents=cents.compute()
cents_df = (cents.rename('cents').to_series().dropna()
.unstack('dim').rename_axis(None, axis='columns')
.reset_index())
h_rg = (A.coords['height'].min().values, A.coords['height'].max().values)
w_rg = (A.coords['width'].min().values, A.coords['width'].max().values)
cents_df['height'] = cents_df['height'] * (h_rg[1] - h_rg[0]) + h_rg[0]
cents_df['width'] = cents_df['width'] * (w_rg[1] - w_rg[0]) + w_rg[0]
output_core_dims=[['spatial', 'spatial_cp']],
dask='allowed',
output_sizes=dict(spatial_cp=len(sp_idxs)))
sd_id = np.ravel_multi_index(
(ih, iw),
(sur.sizes['height'], sur.sizes['width']))
corr = (corr.isel(spatial_cp=sd_id)
.squeeze().unstack('spatial'))
mask = corr > thres_corr
mask_lb = xr.apply_ufunc(da_label, mask, dask='allowed')
sd_lb = mask_lb.isel(height=ih, width=iw)
mask = (mask_lb == sd_lb)
sur = sur.where(mask, 0)
corr = corr.where(mask, 0)
corr_norm = corr / corr.sum()
C = xr.apply_ufunc(
da.tensordot, sur, corr_norm,
input_core_dims=[['frame', 'height', 'width'], ['height', 'width']],
output_core_dims=[['frame']],
kwargs=dict(axes=[(1, 2), (0, 1)]),
dask='allowed')
return corr, C
def remove_trend(obj):
    """Remove a per-series linear trend fitted against the ``time`` axis.

    Parameters
    ----------
    obj : xr.DataArray or xr.Dataset
        Input with a ``time`` coordinate.

    Returns
    -------
    (detrended, trend_ts)
        ``trend_ts`` is the fitted trend (slope times numeric time) and
        ``detrended`` is ``obj - trend_ts``.
    """
    # np.float was removed in NumPy 1.24; use the explicit float64 dtype.
    time_nums = xr.DataArray(obj['time'].values.astype(np.float64),
                             dims='time',
                             coords={'time': obj['time']},
                             name='time_nums')
    # NOTE(review): assumes _calc_slope(x, y) returns the scalar slope of
    # y against x for each series — confirm against its definition.
    trend = xr.apply_ufunc(_calc_slope, time_nums, obj,
                           vectorize=True,
                           input_core_dims=[['time'], ['time']],
                           output_core_dims=[[]],
                           output_dtypes=[np.float64],
                           dask='parallelized')
    # Trend is slope-only (no intercept), matching the original behavior.
    trend_ts = (time_nums * trend).transpose(*obj.dims)
    detrended = obj - trend_ts
    return detrended, trend_ts
def remove_brightspot(varr, thres=3):
    """Suppress bright-spot pixels in each frame of ``varr``.

    Builds a 4-neighbor averaging kernel (unit diamond with the center
    zeroed, each neighbor weighted 1/4) and delegates per-frame cleanup
    to ``remove_brightspot_perframe``.

    Parameters
    ----------
    varr : xr.DataArray
        Video with ``height``, ``width`` (and typically ``frame``) dims.
    thres : int, optional
        Threshold forwarded to the per-frame routine.

    Returns
    -------
    xr.DataArray
        Cleaned video, renamed with a ``_clean`` suffix.
    """
    kernel = ski.morphology.diamond(1)
    kernel[1, 1] = 0
    kernel = kernel / 4
    cleaned = xr.apply_ufunc(
        remove_brightspot_perframe,
        varr.chunk(dict(height=-1, width=-1)),
        input_core_dims=[['height', 'width']],
        output_core_dims=[['height', 'width']],
        vectorize=True,
        dask='parallelized',
        kwargs=dict(k_mean=kernel, thres=thres),
        output_dtypes=[varr.dtype])
    return cleaned.rename(varr.name + '_clean')
raise ValueError("template {} not understood".format(on))
onfm = xr.apply_ufunc(
truncate,
onfm,
input_core_dims=[['height', 'width']],
output_core_dims=[['height', 'width']],
kwargs=dict(w=max_shift)).chunk()
src_fft = xr.apply_ufunc(
darr.fft.fft2,
onfm,
input_core_dims=[['height', 'width']],
output_core_dims=[['height_pad', 'width_pad']],
dask='allowed',
kwargs=dict(s=pad_s)).persist()
print("estimating shifts")
res = xr.apply_ufunc(
shift_fft,
src_fft,
varr,
input_core_dims=[['height_pad', 'width_pad'], ['height', 'width']],
output_core_dims=[['variable']],
vectorize=True,
dask='parallelized',
output_dtypes=[float],
output_sizes=dict(variable=3))
res = res.assign_coords(variable=['height', 'width', 'corr'])
return res
def gmm_refine(varr, seeds, q=(0.1, 99.9), n_components=2, valid_components=1, mean_mask=True):
print("selecting seeds")
varr_sub = varr.sel(
spatial=[tuple(hw) for hw in seeds[['height', 'width']].values])
print("computing peak-valley values")
varr_valley = xr.apply_ufunc(
np.percentile,
varr_sub.chunk(dict(frame=-1)),
input_core_dims=[['frame']],
kwargs=dict(q=q[0], axis=-1),
dask='parallelized',
output_dtypes=[varr_sub.dtype])
varr_peak = xr.apply_ufunc(
np.percentile,
varr_sub.chunk(dict(frame=-1)),
input_core_dims=[['frame']],
kwargs=dict(q=q[1], axis=-1),
dask='parallelized',
output_dtypes=[varr_sub.dtype])
varr_pv = varr_peak - varr_valley
varr_pv = varr_pv.compute()
print("fitting GMM models")