Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_weights_can_be_floats(self):
    """Fitting with non-integer weights must run and raise a StatisticalWarning."""
    sample_size = 100
    durations = np.random.binomial(40, 0.5, sample_size)
    observed = np.random.binomial(1, 0.9, sample_size)
    # pytest.warns fails the test by itself if no StatisticalWarning is emitted.
    with pytest.warns(StatisticalWarning):
        KaplanMeierFitter().fit(durations, observed, weights=np.random.random(sample_size))
    assert True
def test_kaplan_meier_vs_lifelines(n, p_cens):
    """Compare ``utils.kaplan_meier`` with lifelines' estimate on seeded random data.

    ``n`` is the number of samples drawn; ``p_cens`` is the probability that a
    sample is censored (events are drawn with probability ``1 - p_cens``).
    """
    from lifelines import KaplanMeierFitter

    np.random.seed(0)
    durations = np.random.uniform(0, 100, n)
    events = np.random.binomial(1, 1 - p_cens, n).astype('float')

    ours = utils.kaplan_meier(durations, events)
    fitted = KaplanMeierFitter().fit(durations, events)
    reference = fitted.survival_function_['KM_estimate']

    # Same length, same time index, and values equal up to 1e-14.
    assert ours.shape == reference.shape
    assert (ours - reference).abs().max() < 1e-14
    assert (ours.index == reference.index).all()
def test_shifting_durations_doesnt_affect_survival_function_values(self):
    """Adding a constant offset to every duration must not change the KM values."""
    T = np.random.exponential(10, size=100)
    kmf = KaplanMeierFitter()
    expected = kmf.fit(T).survival_function_.values

    # Positive shift: the full curve is compared.
    shifted_up = kmf.fit(T + 100).survival_function_.values
    npt.assert_allclose(expected, shifted_up)

    # Negative shifts: the first row of the reference is skipped
    # (presumably the time-zero row is absent from these fits; TODO confirm).
    for offset in (50, 200):
        shifted_down = kmf.fit(T - offset).survival_function_.values
        npt.assert_allclose(expected[1:], shifted_down)
def test_sort_doesnt_affect_kmf(self, sample_lifetimes):
    """The fitted survival function must be identical for sorted and unsorted input."""
    durations, _ = sample_lifetimes
    kmf = KaplanMeierFitter()
    unsorted_sf = kmf.fit(durations).survival_function_
    sorted_sf = kmf.fit(sorted(durations)).survival_function_
    assert_frame_equal(unsorted_sf, sorted_sf)
# c is 1 for rows whose censoring time is at or before Ct and strictly earlier than t, else 0.
df["c"] = (np.where(df["t_cens"] <= Ct, 1, 0)) * (np.where(df["t_cens"] < df["t"], 1, 0))
# y is 1 only when t, t_cens and Ct are all strictly greater than t_enter, else 0.
df["y"] = (
(np.where(df["t"] > df["t_enter"], 1, 0))
* (np.where(df["t_cens"] > df["t_enter"], 1, 0))
* (np.where(Ct > df["t_enter"], 1, 0))
)
dfo = df.loc[df["y"] == 1].copy() # "observed data"
# Fitting KM to full data
km1 = KaplanMeierFitter()
km1.fit(df["t_out"], event_observed=df["d"])
# rf collects the curves to compare, indexed by the full-data fit's timeline.
rf = pd.DataFrame(index=km1.survival_function_.index)
rf["KM_true"] = km1.survival_function_
# Fitting KM to "observed" data
km2 = KaplanMeierFitter()
km2.fit(dfo["t_out"], entry=dfo["t_enter"], event_observed=dfo["d"])
rf["KM_lifelines_latest"] = km2.survival_function_
# Version of KM where late entries occur after
# (computed by hand from km2's event table: each row's entrants are removed
# from the at-risk count before the cumulative product is taken)
rf["KM_lateenterafter"] = np.cumprod(
1 - (km2.event_table.observed / (km2.event_table.at_risk - km2.event_table.entrance))
)
# drop the first NA from comparison
rf = rf.dropna()
# NOTE: rtol=10e-2 equals 0.1, i.e. the three curves must agree within 10% relative error.
npt.assert_allclose(rf["KM_true"].values, rf["KM_lateenterafter"].values, rtol=10e-2)
npt.assert_allclose(rf["KM_lifelines_latest"].values, rf["KM_lateenterafter"].values, rtol=10e-2)
npt.assert_allclose(rf["KM_lifelines_latest"].values, rf["KM_true"].values, rtol=10e-2)
def get_kmf_fit(qs):
    """Return a KaplanMeierFitter fitted to the rows of ``qs``.

    Durations are read from the ``days_since_complaint`` field and the
    event-observed flags from ``is_closed``, both via ``qs.values_list``.
    """
    fitter = KaplanMeierFitter()
    fitter.fit(
        qs.values_list('days_since_complaint', flat=True),
        event_observed=qs.values_list('is_closed', flat=True),
    )
    return fitter
def fit_plot(T1, T2, E1, E2, title, unit_of_time, label1, label2):
    """Fit two Kaplan-Meier curves and draw them on a shared axes.

    Parameters:
        T1, T2: durations for the first and second group.
        E1, E2: event-observed indicators for the first and second group.
        title: title set on the axes.
        unit_of_time: when truthy, used to label the x axis as
            ``timeline (<unit_of_time>)``.
        label1, label2: curve labels passed to the fitters.

    Returns:
        The tuple ``(kmf1, kmf2)`` of fitted KaplanMeierFitter objects.
    """
    kmf1 = KaplanMeierFitter()
    kmf2 = KaplanMeierFitter()
    ax = kmf1.fit(T1, E1, label=label1, alpha=0.05).plot(show_censors=True)
    ax = kmf2.fit(T2, E2, label=label2, alpha=0.05).plot(ax=ax, show_censors=True)
    ax.set_title(title)
    if unit_of_time:
        plt.xlabel(f'timeline ({unit_of_time})')
    lifelines.plotting.add_at_risk_counts(kmf1, kmf2, ax=ax, labels=None)

    # The canvas-level get/set_window_title wrappers were deprecated in
    # Matplotlib 3.4 and removed in 3.6; the figure manager owns the window
    # title. A figure without a manager has no window, so it is left alone.
    manager = getattr(ax.figure.canvas, 'manager', None)
    if manager is not None:
        figname = manager.get_window_title()
        manager.set_window_title(f'Party {mpc.pid} - {figname}')
    return kmf1, kmf2
# "km": fit a weighted Kaplan-Meier curve to (t, c, w) and return one minus the
# survival estimate looked up at each value of t.
"km": lambda t, c, w: 1 - KaplanMeierFitter().fit(t, c, weights=w).survival_function_.loc[t, "KM_estimate"],
}
if event_observed:
# find observation boolean value for every duration
# (right-merge on 'id' keeps every row of sub_df; a duration counts as observed
# when its merged 'value_x' is not null)
E = event_observed[0].merge(sub_df, how='right', on='id')
E = [not x for x in pd.isnull(E['value_x'])]
assert len(E) == len(T)
# Fit the requested estimator and collect its point estimate and confidence
# bounds as plain lists.
# NOTE(review): the '..._0.95' column labels are hard-coded; presumably they
# match the fitters' default confidence level. Verify if alpha is ever changed.
if estimator == 'NelsonAalen':
fitter = NelsonAalenFitter()
fitter.fit(durations=T, event_observed=E)
estimate = fitter.cumulative_hazard_[
'NA_estimate'].tolist()
ci_lower = fitter.confidence_interval_[
'NA_estimate_lower_0.95'].tolist()
ci_upper = fitter.confidence_interval_[
'NA_estimate_upper_0.95'].tolist()
elif estimator == 'KaplanMeier':
fitter = KaplanMeierFitter()
fitter.fit(durations=T, event_observed=E)
# noinspection PyUnresolvedReferences
estimate = fitter.survival_function_[
'KM_estimate'].tolist()
ci_lower = fitter.confidence_interval_[
'KM_estimate_lower_0.95'].tolist()
ci_upper = fitter.confidence_interval_[
'KM_estimate_upper_0.95'].tolist()
else:
# NOTE(review): logger.exception attaches the currently handled exception's
# traceback, but no except block is visible here; logger.error may be what
# was intended. Confirm against the enclosing code.
error = 'Unknown estimator: {}'.format(estimator)
logger.exception(error)
raise ValueError(error)
timeline = fitter.timeline.tolist()
# Create the per-category dict the first time a category is seen.
if not stats.get(category):
stats[category] = {}
stats[category][subset] = {
"""
"""
# Imported here, inside the body, rather than at module level.
from lifelines import KaplanMeierFitter
# Fall back to the current matplotlib axes and to the model's own timeline.
if ax is None:
ax = plt.gca()
if timeline is None:
timeline = model.timeline
COL_EMP = "empirical CDF"
# Fit the empirical (non-parametric) curve with the method matching the model's
# censoring type; interval censoring is rejected.
# NOTE(review): if none of the three checks below is true, empirical_kmf is
# never bound and the plot call further down raises NameError. Confirm the
# checks are exhaustive.
if CensoringType.is_left_censoring(model):
empirical_kmf = KaplanMeierFitter().fit_left_censoring(
model.durations, model.event_observed, label=COL_EMP, timeline=timeline
)
elif CensoringType.is_right_censoring(model):
empirical_kmf = KaplanMeierFitter().fit_right_censoring(
model.durations, model.event_observed, label=COL_EMP, timeline=timeline
)
elif CensoringType.is_interval_censoring(model):
raise NotImplementedError("lifelines does not have a non-parametric interval model yet.")
# Draw the empirical cumulative density, then overlay the fitted distribution's
# CDF on the same axes; the same plot_kwargs are forwarded to both calls.
empirical_kmf.plot_cumulative_density(ax=ax, **plot_kwargs)
dist = get_distribution_name_of_lifelines_model(model)
dist_object = create_scipy_stats_model_from_lifelines_model(model)
ax.plot(timeline, dist_object.cdf(timeline), label="fitted %s" % dist, **plot_kwargs)
ax.legend()
return ax