Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fit(self, X, p, treatment, y, verbose=True):
"""Fit the treatment effect and outcome models of the R learner.

The visible part of this method validates the inputs, records the non-control
treatment groups, normalises ``p`` into a dict keyed by treatment group and
prepares one deep copy of ``self.model_tau`` per group.

NOTE(review): this excerpt ends right after ``if verbose:``; the statements that
actually fit the models are not visible here.

Args:
X (np.matrix or np.array or pd.DataFrame): a feature matrix
p (np.ndarray or pd.Series or dict): an array of propensity scores of float (0,1) in the single-treatment
case; or, a dictionary of treatment groups that map to propensity vectors of float (0,1)
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
verbose (bool, optional): checked once the per-group containers are set up; the code it guards is
not visible in this excerpt (presumably progress logging -- TODO confirm)
"""
# Pass the inputs through the project helper (presumably pandas -> numpy; verify against the helper).
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
# Treatment groups are the distinct labels other than the control label.
# np.unique already returns its result sorted, so the explicit sort() keeps the same order.
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
# p is checked in the form the caller supplied it, before it is normalised below.
check_p_conditions(p, self.t_groups)
# Normalise p to a dict {treatment group: propensity vector}. A bare array/Series is
# attributed to the first treatment group.
if isinstance(p, (np.ndarray, pd.Series)):
treatment_name = self.t_groups[0]
p = {treatment_name: convert_pd_to_np(p)}
elif isinstance(p, dict):
p = {treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items()}
# Map each treatment group to its position in self.t_groups.
self._classes = {group: i for i, group in enumerate(self.t_groups)}
# Each group gets its own independent copy of the template model self.model_tau.
self.models_tau = {group: deepcopy(self.model_tau) for group in self.t_groups}
# Reset on every call; they are filled by code outside this excerpt.
self.vars_c = {}
self.vars_t = {}
if verbose:
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
return_ci (bool): whether to return confidence intervals
n_bootstraps (int): number of bootstrap iterations
bootstrap_size (int): number of samples per bootstrap
return_components (bool, optional): whether to return outcome for treatment and control separately
verbose (bool): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects. Output dim: [n_samples, n_treatment].
If return_ci, returns CATE [n_samples, n_treatment], LB [n_samples, n_treatment],
UB [n_samples, n_treatment]
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
self.fit(X, treatment, y)
te = self.predict(X, treatment, y, return_components=return_components)
if not return_ci:
return te
else:
t_groups_global = self.t_groups
_classes_global = self._classes
models_c_global = deepcopy(self.models_c)
models_t_global = deepcopy(self.models_t)
te_bootstraps = np.zeros(shape=(X.shape[0], self.t_groups.shape[0], n_bootstraps))
logger.info('Bootstrap Confidence Intervals')
for i in tqdm(range(n_bootstraps)):
te_b = self.bootstrap(X, treatment, y, size=bootstrap_size)
te_bootstraps[:, :, i] = te_b
def fit(self, X, treatment, y):
"""Fit the inference model.

The visible part of this method validates the treatment vector, records the
non-control treatment groups, creates per-group deep copies of the four template
models (``model_mu_c``, ``model_mu_t``, ``model_tau_c``, ``model_tau_t``) and
starts a per-group loop that restricts the data to one treatment group plus control.

NOTE(review): this excerpt ends inside the per-group loop; the model-fitting
statements are not visible here.

Args:
X (np.matrix or np.array or pd.DataFrame): a feature matrix
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
"""
# Pass the inputs through the project helper (presumably pandas -> numpy; verify against the helper).
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
# Treatment groups are the distinct labels other than the control label.
# np.unique already returns its result sorted, so the explicit sort() keeps the same order.
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
# Map each treatment group to its position in self.t_groups.
self._classes = {group: i for i, group in enumerate(self.t_groups)}
# Every group gets independent copies of the four template models.
self.models_mu_c = {group: deepcopy(self.model_mu_c) for group in self.t_groups}
self.models_mu_t = {group: deepcopy(self.model_mu_t) for group in self.t_groups}
self.models_tau_c = {group: deepcopy(self.model_tau_c) for group in self.t_groups}
self.models_tau_t = {group: deepcopy(self.model_tau_t) for group in self.t_groups}
# Reset on every call; they are filled by code outside this excerpt.
self.vars_c = {}
self.vars_t = {}
for group in self.t_groups:
# Keep only the rows that belong to this treatment group or to the control group.
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
X_filt = X[mask]
y_filt = y[mask]
def predict(self, X, treatment=None, y=None, return_components=False, verbose=True):
"""Predict treatment effects.

For every treatment group the group's single model is evaluated twice on ``X``
with an extra leading column: once with that column set to 0 and once with it
set to 1.

NOTE(review): this excerpt ends inside the verbose branch; how the two sets of
predictions are combined and returned is not visible here.

Args:
X (np.matrix or np.array or pd.DataFrame): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
return_components (bool, optional): whether to return outcome for treatment and control separately
verbose (bool, optional): whether to output progress logs
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
# treatment and y may still be None here; how the helper treats None is not visible -- verify.
X, treatment, y = convert_pd_to_np(X, treatment, y)
# Per-group predictions with the leading column at 0 (yhat_cs) and at 1 (yhat_ts).
yhat_cs = {}
yhat_ts = {}
for group in self.t_groups:
model = self.models[group]
# set the treatment column to zero (the control group)
# The indicator is prepended, so it occupies column 0 of X_new.
X_new = np.hstack((np.zeros((X.shape[0], 1)), X))
yhat_cs[group] = model.predict(X_new)
# set the treatment column to one (the treatment group)
# X_new is modified in place; the feature columns are reused unchanged.
X_new[:, 0] = 1
yhat_ts[group] = model.predict(X_new)
# This branch runs only when both y and treatment are supplied and verbose is truthy.
if (y is not None) and (treatment is not None) and verbose:
# Rows that belong to this treatment group or to the control group.
mask = (treatment == group) | (treatment == self.control_name)
def predict(self, X, treatment=None, y=None, return_components=False, verbose=True):
"""Predict treatment effects.

For every treatment group, the group's control model and treatment model are
both evaluated on the full ``X``.

NOTE(review): this excerpt ends inside the verbose branch; how the predictions
are combined and returned is not visible here.

Args:
X (np.matrix or np.array or pd.DataFrame): a feature matrix
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
return_components (bool, optional): whether to return outcome for treatment and control separately
verbose (bool, optional): together with non-None ``y`` and ``treatment`` it enables the per-group
branch at the end of the loop (presumably progress/metric logging -- TODO confirm)
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
# treatment and y may still be None here; how the helper treats None is not visible -- verify.
X, treatment, y = convert_pd_to_np(X, treatment, y)
# Per-group predictions of the control models (yhat_cs) and the treatment models (yhat_ts).
yhat_cs = {}
yhat_ts = {}
for group in self.t_groups:
model_c = self.models_c[group]
model_t = self.models_t[group]
yhat_cs[group] = model_c.predict(X)
yhat_ts[group] = model_t.predict(X)
# This branch runs only when both y and treatment are supplied and verbose is truthy.
if (y is not None) and (treatment is not None) and verbose:
# Restrict to the rows that belong to this treatment group or to the control group.
mask = (treatment == group) | (treatment == self.control_name)
treatment_filt = treatment[mask]
y_filt = y[mask]
# w is 1 for rows in this treatment group and 0 for control rows.
w = (treatment_filt == group).astype(int)
# Float buffer shaped like y_filt; it is filled by code outside this excerpt.
yhat = np.zeros_like(y_filt, dtype=float)
X (np.matrix or np.array or pd.Dataframe): a feature matrix
p (np.ndarray or pd.Series or dict): an array of propensity scores of float (0,1) in the single-treatment
case; or, a dictionary of treatment groups that map to propensity vectors of float (0,1)
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_p_conditions(p, self.t_groups)
if isinstance(p, (np.ndarray, pd.Series)):
treatment_name = self.t_groups[0]
p = {treatment_name: convert_pd_to_np(p)}
elif isinstance(p, dict):
p = {treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items()}
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
dhat_cs = {}
dhat_ts = {}
for i, group in enumerate(self.t_groups):
model_tau_c = self.models_tau_c[group]
model_tau_t = self.models_tau_t[group]
dhat_cs[group] = model_tau_c.predict(X)
dhat_ts[group] = model_tau_t.predict(X)
_te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape(-1, 1)
te[:, i] = np.ravel(_te)
if (y is not None) and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)
"""Estimate the Average Treatment Effect (ATE).
Args:
X (np.matrix or np.array or pd.Dataframe): a feature matrix
p (np.ndarray or pd.Series or dict): an array of propensity scores of float (0,1) in the single-treatment
case; or, a dictionary of treatment groups that map to propensity vectors of float (0,1)
treatment (np.array or pd.Series): a treatment vector
y (np.array or pd.Series): an outcome vector
segment (np.array, optional): An optional segment vector of int. If given, the ATE and its CI will be
estimated for each segment.
return_ci (bool, optional): Whether to return confidence intervals
Returns:
(tuple): The ATE and its confidence interval (LB, UB) for each treatment, t and segment, s
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_treatment_vector(treatment, self.control_name)
self.t_groups = np.unique(treatment[treatment != self.control_name])
self.t_groups.sort()
check_p_conditions(p, self.t_groups)
if isinstance(p, (np.ndarray, pd.Series)):
treatment_name = self.t_groups[0]
p = {treatment_name: convert_pd_to_np(p)}
elif isinstance(p, dict):
p = {treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items()}
ate = []
ate_lb = []
ate_ub = []
for i, group in enumerate(self.t_groups):
p (np.ndarray or pd.Series or dict): an array of propensity scores of float (0,1) in the single-treatment
case; or, a dictionary of treatment groups that map to propensity vectors of float (0,1)
treatment (np.array or pd.Series, optional): a treatment vector
y (np.array or pd.Series, optional): an outcome vector
return_components (bool, optional): whether to return outcome for treatment and control separately
Returns:
(numpy.ndarray): Predictions of treatment effects.
"""
X, treatment, y = convert_pd_to_np(X, treatment, y)
check_p_conditions(p, self.t_groups)
if isinstance(p, (np.ndarray, pd.Series)):
treatment_name = self.t_groups[0]
p = {treatment_name: convert_pd_to_np(p)}
elif isinstance(p, dict):
p = {treatment_name: convert_pd_to_np(_p) for treatment_name, _p in p.items()}
te = np.zeros((X.shape[0], self.t_groups.shape[0]))
dhat_cs = {}
dhat_ts = {}
for i, group in enumerate(self.t_groups):
model_tau_c = self.models_tau_c[group]
model_tau_t = self.models_tau_t[group]
dhat_cs[group] = model_tau_c.predict(X)
dhat_ts[group] = model_tau_t.predict(X)
_te = (p[group] * dhat_cs[group] + (1 - p[group]) * dhat_ts[group]).reshape(-1, 1)
te[:, i] = np.ravel(_te)
if (y is not None) and (treatment is not None) and verbose:
mask = (treatment == group) | (treatment == self.control_name)