Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
prel_theta = np.zeros(n_samples)
res_t = np.zeros(n_samples)
res_y = np.zeros(n_samples)
delta = np.zeros(n_samples)
splits = self._get_split_enum(y, T, X, Z)
for idx, (train, test) in enumerate(splits):
# Estimate preliminary theta in cross fitting manner
prel_theta[test] = self.prel_model_effect[idx].fit(
y[train], T[train], X[train], Z[train]).effect(X[test]).flatten()
# Estimate p(X) = E[T | X] in cross fitting manner
self.model_T_XZ[idx].fit(hstack([X[train], Z[train].reshape(-1, 1)]), T[train])
Z_one = np.ones((Z[test].shape[0], 1))
Z_zero = np.zeros((Z[test].shape[0], 1))
pr_t_test_one = self.model_T_XZ[idx].predict(hstack([X[test], Z_one]))
pr_t_test_zero = self.model_T_XZ[idx].predict(hstack([X[test], Z_zero]))
delta[test] = (pr_t_test_one - pr_t_test_zero) / 2
pr_t_test = (pr_t_test_one + pr_t_test_zero) / 2
res_t[test] = T[test] - pr_t_test
# Estimate residual Y_res = Y - q(X) = Y - E[Y | X] in cross fitting manner
res_y[test] = y[test] - \
self.model_Y_X[idx].fit(X[train], y[train]).predict(X[test])
return prel_theta, res_t, res_y, 2*Z-1 , delta
def _nuisance_estimates(self, y, T, X, Z):
n_samples = y.shape[0]
prel_theta = np.zeros(n_samples)
res_t = np.zeros(n_samples)
res_y = np.zeros(n_samples)
delta = np.zeros(n_samples)
splits = self._get_split_enum(y, T, X, Z)
for idx, (train, test) in enumerate(splits):
# Estimate preliminary theta in cross fitting manner
prel_theta[test] = self.prel_model_effect[idx].fit(
y[train], T[train], X[train], Z[train]).effect(X[test]).flatten()
# Estimate p(X) = E[T | X] in cross fitting manner
self.model_T_XZ[idx].fit(hstack([X[train], Z[train].reshape(-1, 1)]), T[train])
Z_one = np.ones((Z[test].shape[0], 1))
Z_zero = np.zeros((Z[test].shape[0], 1))
pr_t_test_one = self.model_T_XZ[idx].predict(hstack([X[test], Z_one]))
pr_t_test_zero = self.model_T_XZ[idx].predict(hstack([X[test], Z_zero]))
delta[test] = (pr_t_test_one - pr_t_test_zero) / 2
pr_t_test = (pr_t_test_one + pr_t_test_zero) / 2
res_t[test] = T[test] - pr_t_test
# Estimate residual Y_res = Y - q(X) = Y - E[Y | X] in cross fitting manner
res_y[test] = y[test] - \
self.model_Y_X[idx].fit(X[train], y[train]).predict(X[test])
return prel_theta, res_t, res_y, 2*Z-1 , delta
def _combine(self, X, W, n_samples, fitting=True):
if X is None:
# if both X and W are None, just return a column of ones
return (W if W is not None else np.ones((n_samples, 1)))
XW = hstack([X, W]) if W is not None else X
if self._is_Y and self._linear_first_stages:
if self._featurizer is None:
F = X
else:
F = self._featurizer.fit_transform(X) if fitting else self._featurizer.transform(X)
return cross_product(XW, hstack([np.ones((shape(XW)[0], 1)), F]))
else:
return XW
def _combine(self, X, W, n_samples, fitting=True):
if X is None:
# if both X and W are None, just return a column of ones
return (W if W is not None else np.ones((n_samples, 1)))
XW = hstack([X, W]) if W is not None else X
if self._is_Y and self._linear_first_stages:
if self._featurizer is None:
F = X
else:
F = self._featurizer.fit_transform(X) if fitting else self._featurizer.transform(X)
return cross_product(XW, hstack([np.ones((shape(XW)[0], 1)), F]))
else:
return XW
# Transformer that prepends a column of ones (one per input row) to the
# matrix it is given, i.e. maps F to [1, F].
add_intercept = FunctionTransformer(
    lambda feats: hstack([np.ones((feats.shape[0], 1)), feats]),
    validate=True,
)