# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
on = self.o == 1
if adjust_t_ and np.any(on):
if not perform_update:
alpha, beta, gamma, scaling, t_ = self.get_vars()
t, tau, o = self.get_time_assignment()
loss = self.get_loss()
alt_t_ = t[on].max()
if 0 < alt_t_ < t_:
# alt_u0_, alt_s0_ = mRNA(alt_t_, 0, 0, alpha, beta, gamma)
alt_t_ += np.max(t) / len(t) * np.sum(t == t_)
# np.sum((self.u / self.scaling >= alt_u0_) | (self.s >= alt_s0_))
_time = self.get_time_assignment(alpha, beta, gamma, scaling, alt_t_)
alt_t, alt_tau, alt_o = _time
alt_loss = self.get_loss(alt_t, alt_t_, alpha, beta, gamma, scaling)
ut_cur = unspliced(t_, 0, alpha, beta)
ut_alt = unspliced(alt_t_, 0, alpha, beta)
min_loss = np.min([loss, loss_prev])
if alt_loss * 0.99 <= min_loss or ut_cur * 0.99 < ut_alt:
t, tau, o, t_, loss = alt_t, alt_tau, alt_o, alt_t_, alt_loss
perform_update = True
if False:
steady_states = t == t_
if perform_update and np.any(steady_states):
t_ += t.max() / len(t) * np.sum(steady_states)
_time = self.get_time_assignment(alpha, beta, gamma, scaling, t_)
t, tau, o = _time
loss = self.get_loss(t, t_, alpha, beta, gamma, scaling)
if perform_update:
alpha, beta, gamma, scaling, t_ = get_vars(adata[:, basis], key=key)
if "fit_u0" in adata.var.keys():
u0_offset, s0_offset = adata.var["fit_u0"][idx], adata.var["fit_s0"][idx]
else:
u0_offset, s0_offset = 0, 0
if t is None or isinstance(t, bool) or len(t) < adata.n_obs:
t = (
adata.obs[f"{key}_t"].values
if key == "true"
else adata.layers[f"{key}_t"][:, idx]
)
if extrapolate:
u0_ = unspliced(t_, 0, alpha, beta)
tmax = np.max(t) if True else tau_inv(u0_ * 1e-4, u0=u0_, alpha=0, beta=beta)
t = np.concatenate(
[np.linspace(0, t_, num=500), np.linspace(t_, tmax, num=500)]
)
tau, alpha, u0, s0 = vectorize(np.sort(t) if sort else t, t_, alpha, beta, gamma)
ut, st = mRNA(tau, u0, s0, alpha, beta, gamma)
ut, st = ut * scaling + u0_offset, st + s0_offset
return alpha, ut, st
if adjust_t_ and np.any(on):
if not perform_update:
alpha, beta, gamma, scaling, t_ = self.get_vars()
t, tau, o = self.get_time_assignment()
loss = self.get_loss()
alt_t_ = t[on].max()
if 0 < alt_t_ < t_:
# alt_u0_, alt_s0_ = mRNA(alt_t_, 0, 0, alpha, beta, gamma)
alt_t_ += np.max(t) / len(t) * np.sum(t == t_)
# np.sum((self.u / self.scaling >= alt_u0_) | (self.s >= alt_s0_))
_time = self.get_time_assignment(alpha, beta, gamma, scaling, alt_t_)
alt_t, alt_tau, alt_o = _time
alt_loss = self.get_loss(alt_t, alt_t_, alpha, beta, gamma, scaling)
ut_cur = unspliced(t_, 0, alpha, beta)
ut_alt = unspliced(alt_t_, 0, alpha, beta)
min_loss = np.min([loss, loss_prev])
if alt_loss * 0.99 <= min_loss or ut_cur * 0.99 < ut_alt:
t, tau, o, t_, loss = alt_t, alt_tau, alt_o, alt_t_, alt_loss
perform_update = True
if False:
steady_states = t == t_
if perform_update and np.any(steady_states):
t_ += t.max() / len(t) * np.sum(steady_states)
_time = self.get_time_assignment(alpha, beta, gamma, scaling, t_)
t, tau, o = _time
loss = self.get_loss(t, t_, alpha, beta, gamma, scaling)
if perform_update:
if scaling is not None:
def get_dynamics(adata, key="fit", extrapolate=False, sorted=False, t=None):
    """Evaluate the fitted splicing dynamics stored in ``adata``.

    Reads the rate parameters (alpha, beta, gamma), scaling, and switching
    time ``t_`` via ``get_vars`` and returns the model-predicted unspliced
    and spliced levels along a time axis.

    Parameters: ``key`` selects which fit to read (``"true"`` reads times
    from ``adata.obs``); ``extrapolate`` builds a dense synthetic time grid
    beyond the observations instead of using stored times; ``sorted`` sorts
    the time axis before evaluation; ``t`` optionally supplies explicit
    times (``None``/``True`` falls back to the stored ones).

    Returns a tuple ``(alpha, ut, st)``.
    """
    alpha, beta, gamma, scaling, t_ = get_vars(adata, key=key)

    if extrapolate:
        # unspliced level reached at the switching time
        u0_ = unspliced(t_, 0, alpha, beta)
        # horizon: switching time plus the time for u to decay to ~1e-4 u0_
        horizon = t_ + tau_inv(u0_ * 1e-4, u0=u0_, alpha=0, beta=beta)
        induction = np.linspace(0, t_, num=500)
        repression = t_ + np.linspace(0, horizon, num=500)
        t = np.concatenate([induction, repression])
    elif t is None or t is True:
        # fall back to the stored per-cell times for this fit
        if key == "true":
            t = adata.obs[f"{key}_t"].values
        else:
            t = adata.layers[f"{key}_t"]

    time_axis = np.sort(t) if sorted else t
    tau, alpha, u0, s0 = vectorize(time_axis, t_, alpha, beta, gamma)
    ut, st = mRNA(tau, u0, s0, alpha, beta, gamma)
    return alpha, ut, st