Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_log_rank_returns_None_if_equal_arrays():
T = np.random.exponential(5, size=200)
result = stats.logrank_test(T, T)
assert result.p_value > 0.05
C = np.random.binomial(1, 0.8, size=200)
result = stats.logrank_test(T, T, C, C)
assert result.p_value > 0.05
def test_equal_intensity_with_negative_data():
data1 = np.random.normal(0, size=(2000, 1))
data1 -= data1.mean()
data1 /= data1.std()
data2 = np.random.normal(0, size=(2000, 1))
data2 -= data2.mean()
data2 /= data2.std()
result = stats.logrank_test(data1, data2)
assert result.p_value > 0.05
def test_unequal_intensity_event_observed():
data1 = np.random.exponential(5, size=(2000, 1))
data2 = np.random.exponential(1, size=(2000, 1))
eventA = np.random.binomial(1, 0.5, size=(2000, 1))
eventB = np.random.binomial(1, 0.5, size=(2000, 1))
result = stats.logrank_test(data1, data2, event_observed_A=eventA, event_observed_B=eventB)
assert result.p_value < 0.05
def test_unequal_intensity_with_random_data():
data1 = np.random.exponential(5, size=(2000, 1))
data2 = np.random.exponential(1, size=(2000, 1))
test_result = stats.logrank_test(data1, data2)
assert test_result.p_value < 0.05
def test_logrank_test_output_against_R_2():
# from https://stat.ethz.ch/education/semesters/ss2011/seminar/contents/presentation_2.pdf
control_T = [1, 1, 2, 2, 3, 4, 4, 5, 5, 8, 8, 8, 8, 11, 11, 12, 12, 15, 17, 22, 23]
control_E = np.ones_like(control_T)
treatment_T = [6, 6, 6, 7, 10, 13, 16, 22, 23, 6, 9, 10, 11, 17, 19, 20, 25, 32, 32, 34, 25]
treatment_E = [1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
result = stats.logrank_test(control_T, treatment_T, event_observed_A=control_E, event_observed_B=treatment_E)
expected_p_value = 4.17e-05
assert abs(result.p_value - expected_p_value) < 0.0001
assert abs(result.test_statistic - 16.8) < 0.1
def cox_log_rank(hazardsdata, labels, survtime_all):
median = np.median(hazardsdata)
hazards_dichotomize = np.zeros([len(hazardsdata)], dtype=int)
hazards_dichotomize[hazardsdata > median] = 1
idx = hazards_dichotomize == 0
T1 = survtime_all[idx]
T2 = survtime_all[~idx]
E1 = labels[idx]
E2 = labels[~idx]
results = logrank_test(T1, T2, event_observed_A=E1, event_observed_B=E2)
pvalue_pred = results.p_value
return(pvalue_pred)
x_feature = x.reset_index(drop=True).iloc[:, feature]
score_opt = 0
split_val_opt = None
lhs_idxs = None
rhs_idxs = None
for split_val in x_feature.sort_values(ascending=True, kind="quicksort").unique():
feature1 = list(x_feature[x_feature <= split_val].index)
feature2 = list(x_feature[x_feature > split_val].index)
if len(feature1) < min_leaf or len(feature2) < min_leaf:
continue
durations_a = y.iloc[feature1, 0]
event_observed_a = y.iloc[feature1, 1]
durations_b = y.iloc[feature2, 0]
event_observed_b = y.iloc[feature2, 1]
results = logrank_test(durations_A=durations_a, durations_B=durations_b,
event_observed_A=event_observed_a, event_observed_B=event_observed_b)
score = results.test_statistic
if score > score_opt:
score_opt = round(score, 3)
split_val_opt = round(split_val, 3)
lhs_idxs = feature1
rhs_idxs = feature2
return score_opt, split_val_opt, lhs_idxs, rhs_idxs
ax.set_title(title)
elif print_as_title:
ax.set_title(' | '.join(grp_desc))
else:
[print(desc) for desc in grp_desc]
# axis labels
if xlabel:
ax.set_xlabel(xlabel)
if ylabel:
ax.set_ylabel(ylabel)
## summarize analytical version of results
## again using same groups as are plotted
if len(grp_names) == 2:
# use log-rank test for 2 groups
results = logrank_test(grp_survival_data[grp_names[0]],
grp_survival_data[grp_names[1]],
event_observed_A=grp_event_data[grp_names[0]],
event_observed_B=grp_event_data[grp_names[1]])
elif len(grp_names) == 1:
# no analytical result for 1 or 0 groups
results = NullSurvivalResults()
else:
# cox PH fitter for >2 groups
cf = CoxPHFitter()
cox_df = patsy.dmatrix('+'.join([condition_col, survival_col,
censor_col]),
df, return_type='dataframe')
del cox_df['Intercept']
results = cf.fit(cox_df, survival_col, event_col=censor_col)
results.print_summary()
# add metadata to results object so caller can print them
censor_col,
survival_col,
threshold=None):
if threshold is not None:
if threshold == "median":
threshold = df[condition_col].median()
condition = df[condition_col] > threshold
else:
condition = df[condition_col]
df_with_condition = df[condition]
df_no_condition = df[~condition]
survival_no_condition = df_no_condition[survival_col]
survival_with_condition = df_with_condition[survival_col]
event_no_condition = (df_no_condition[censor_col].astype(bool))
event_with_condition = (df_with_condition[censor_col].astype(bool))
return logrank_test(survival_no_condition,
survival_with_condition,
event_observed_A=event_no_condition,
event_observed_B=event_with_condition)