Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_print_summary_with_decimals(self, rossi, cph):
    """Check that ``print_summary`` output changes with the ``decimals`` argument.

    Each ``print_summary`` call is captured in its own buffer. Reading one
    shared, ever-growing buffer twice would make the second capture a
    superset of the first, so the inequality below would hold regardless of
    what ``decimals`` does.

    Parameters
    ----------
    rossi : fixture supplying the data passed to ``fit``.
    cph : fixture; shadowed by a freshly constructed ``CoxPHFitter`` below.
    """
    import sys

    saved_stdout = sys.stdout
    try:
        # stdout is redirected for the fit as well as for both summaries.
        sys.stdout = out_dec_1 = StringIO()

        cph = CoxPHFitter()
        cph.fit(rossi, duration_col="week", event_col="arrest", batch_mode=True)
        # Fixed timestamp — presumably so the printed header is identical
        # between the two calls; TODO confirm.
        cph._time_fit_was_called = "2018-10-23 02:40:45 UTC"

        cph.print_summary(decimals=1)
        output_dec_1 = out_dec_1.getvalue().strip().split()

        # Fresh buffer: the second capture must contain only the decimals=3 text.
        sys.stdout = out_dec_3 = StringIO()
        cph.print_summary(decimals=3)
        output_dec_3 = out_dec_3.getvalue().strip().split()

        assert output_dec_1 != output_dec_3
    finally:
        sys.stdout = saved_stdout
        # Refit with batch_mode=False on the way out (kept from the original;
        # its purpose is not visible in this excerpt — TODO confirm).
        cph.fit(rossi, duration_col="week", event_col="arrest", batch_mode=False)
def test_proportional_hazard_test_with_log_transform():
    """Pin ``proportional_hazard_test`` results under ``time_transform="log"``.

    Fits a ``CoxPHFitter`` on the regression dataset and compares selected
    entries of the test summary with fixed reference numbers (rtol=1e-3).
    """
    regression_df = load_regression_dataset()
    fitter = CoxPHFitter()
    fitter.fit(regression_df, "T", "E")

    summary = stats.proportional_hazard_test(fitter, regression_df, time_transform="log").summary

    # (row label, summary column, reference value)
    reference_values = (
        ("var1", "test_statistic", 2.227627),
        ("var2", "test_statistic", 0.714427),
        ("var3", "test_statistic", 1.466321),
        ("var3", "p", 0.225927),
    )
    for covariate, column, reference in reference_values:
        npt.assert_allclose(summary.loc[covariate][column], reference, rtol=1e-3)
r$var
r$naive.var
residuals(r, type='dfbeta')
"""
# Five-row data set: two covariates ("var1", "var2"), durations "T", and
# "var3", which is passed as the weights column to the fit below.
df = pd.DataFrame(
{
"var1": [0.209325, 0.693919, 0.443804, 0.065636, 0.386294],
"var2": [0.184677, 0.071893, 1.364646, 0.098375, 1.663092],
"T": [1, 2, 3, 4, 5],
"var3": [2, 2, 2, 1, 2],
}
)
# Event column: set to 1 for every row.
df["E"] = 1
cph = CoxPHFitter()
# Fit with robust=True and row weights read from "var3".
cph.fit(df, "T", "E", robust=True, weights_col="var3", show_progress=True)
# Reference coefficients — presumably from the R session quoted in the
# docstring above; TODO confirm.
expected = pd.Series({"var1": 1.431, "var2": -1.277})
assert_series_equal(cph.hazards_.T["coef"], expected, check_less_precise=2, check_names=False)
# Reference 2x2 variance matrix, compared to one decimal place only.
expected_cov = np.array([[3.5439245, -0.3549099], [-0.3549099, 0.4499553]])
npt.assert_array_almost_equal(
cph.variance_matrix_, expected_cov, decimal=1
) # not as precise because matrix inversion will accumulate estimation errors.
# Reference standard errors; `expected` is reused for the second Series.
expected = pd.Series({"var1": 2.094, "var2": 0.452})
assert_series_equal(cph.summary["se(coef)"], expected, check_less_precise=2, check_names=False)
def test_output_with_strata_against_R(self, rossi):
    """
    Compare stratified Cox coefficients with reference values from R.

    R code used for the reference:

    rossi <- read.csv('.../lifelines/datasets/rossi.csv')
    r = coxph(formula = Surv(week, arrest) ~ fin + age + strata(race,
                paro, mar, wexp) + prio, data = rossi)
    """
    strata_columns = ["race", "paro", "mar", "wexp"]
    r_coefficients = np.array([[-0.3355, -0.0590, 0.1002]])

    fitter = CoxPHFitter()
    fitter.fit(
        rossi,
        duration_col="week",
        event_col="arrest",
        strata=strata_columns,
        show_progress=True,
    )

    # Agreement is required to four decimal places.
    npt.assert_array_almost_equal(fitter.hazards_.values, r_coefficients, decimal=4)
def test_p_value_against_Survival_Analysis_by_John_Klein_and_Melvin_Moeschberger(self):
    """Compare Cox p-values on the larynx data with published values.

    Reference: table 8.1 in Survival Analysis by John P. Klein and
    Melvin L. Moeschberger, Second Edition.
    """
    larynx = load_larynx()

    fitter = CoxPHFitter()
    fitter.fit(larynx, duration_col="time", event_col="death")

    # p-values from the table, compared to two decimal places
    table_p_values = np.array([0.1847, 0.7644, 0.0730, 0.00])
    npt.assert_array_almost_equal(fitter._compute_p_values(), table_p_values, decimal=2)
def test_weights_can_be_floats(self):
    """Fitting ``KaplanMeierFitter`` with float weights must emit ``StatisticalWarning``.

    ``pytest.warns`` is the whole check: the test fails if the fit raises or
    if no ``StatisticalWarning`` is emitted inside the ``with`` block. The
    warning recorder and the fitted object are not inspected, so they are
    not bound to names.
    """
    n = 100
    T = np.random.binomial(40, 0.5, n)
    E = np.random.binomial(1, 0.9, n)
    # Drawn outside the ``with`` block so that only the fit itself runs
    # under the warning check; values are floats in [0, 1).
    weights = np.random.random(n)

    with pytest.warns(StatisticalWarning):
        KaplanMeierFitter().fit(T, E, weights=weights)
def test_kaplan_meier_vs_lifelines(n, p_cens):
    """Check ``utils.kaplan_meier`` against lifelines' ``KaplanMeierFitter``.

    Parameters
    ----------
    n : number of samples to draw.
    p_cens : events are drawn from a Bernoulli with success probability
        ``1 - p_cens``.
    """
    from lifelines import KaplanMeierFitter

    # Seeded, so the draws are reproducible; keep the draw order unchanged.
    np.random.seed(0)
    durations = np.random.uniform(0, 100, n)
    events = np.random.binomial(1, 1 - p_cens, n).astype('float')

    local_estimate = utils.kaplan_meier(durations, events)
    lifelines_fit = KaplanMeierFitter().fit(durations, events)
    lifelines_estimate = lifelines_fit.survival_function_['KM_estimate']

    assert local_estimate.shape == lifelines_estimate.shape

    largest_gap = (local_estimate - lifelines_estimate).abs().max()
    assert largest_gap < 1e-14

    assert (local_estimate.index == lifelines_estimate.index).all()
def test_shifting_durations_doesnt_affect_survival_function_values(self):
    """Survival-function values must not change when all durations are shifted.

    A positive shift (+100) is compared against the full baseline; the two
    negative shifts (-50, -200) are compared against the baseline without
    its first row.
    """
    lifetimes = np.random.exponential(10, size=100)
    kmf = KaplanMeierFitter()
    baseline = kmf.fit(lifetimes).survival_function_.values

    shifted_up = lifetimes + 100
    npt.assert_allclose(baseline, kmf.fit(shifted_up).survival_function_.values)

    for offset in (50, 200):
        shifted_down = lifetimes - offset
        npt.assert_allclose(baseline[1:], kmf.fit(shifted_down).survival_function_.values)
def test_sort_doesnt_affect_kmf(self, sample_lifetimes):
    """Fitting on sorted durations must give the same survival function as unsorted."""
    durations, _unused = sample_lifetimes
    kmf = KaplanMeierFitter()

    # Same fitter instance is refit; the unsorted fit runs first.
    from_unsorted = kmf.fit(durations).survival_function_
    from_sorted = kmf.fit(sorted(durations)).survival_function_

    assert_frame_equal(from_unsorted, from_sorted)
# NOTE(review): `df` and `Ct` are defined before this excerpt.
# "c" is 1 only where t_cens <= Ct and t_cens < t, otherwise 0.
df["c"] = (np.where(df["t_cens"] <= Ct, 1, 0)) * (np.where(df["t_cens"] < df["t"], 1, 0))
# "y" is 1 only where t, t_cens and Ct are all greater than t_enter.
df["y"] = (
(np.where(df["t"] > df["t_enter"], 1, 0))
* (np.where(df["t_cens"] > df["t_enter"], 1, 0))
* (np.where(Ct > df["t_enter"], 1, 0))
)
dfo = df.loc[df["y"] == 1].copy() # "observed data"
# Fitting KM to full data
km1 = KaplanMeierFitter()
km1.fit(df["t_out"], event_observed=df["d"])
# Results frame indexed by the full-data fit's timeline; the other
# estimates are aligned to this index when assigned.
rf = pd.DataFrame(index=km1.survival_function_.index)
rf["KM_true"] = km1.survival_function_
# Fitting KM to "observed" data
km2 = KaplanMeierFitter()
km2.fit(dfo["t_out"], entry=dfo["t_enter"], event_observed=dfo["d"])
rf["KM_lifelines_latest"] = km2.survival_function_
# Version of KM where late entries occur after
# (computed by hand from km2's event table: entrants at a time point are
# removed from the at-risk count before the cumulative product)
rf["KM_lateenterafter"] = np.cumprod(
1 - (km2.event_table.observed / (km2.event_table.at_risk - km2.event_table.entrance))
)
# drop the first NA from comparison
rf = rf.dropna()
# All three estimates must agree pairwise; rtol=10e-2 is 0.1 (10% relative tolerance).
npt.assert_allclose(rf["KM_true"].values, rf["KM_lateenterafter"].values, rtol=10e-2)
npt.assert_allclose(rf["KM_lifelines_latest"].values, rf["KM_lateenterafter"].values, rtol=10e-2)
npt.assert_allclose(rf["KM_lifelines_latest"].values, rf["KM_true"].values, rtol=10e-2)