def true_effect(x, t):
    return cross_product(
        np.hstack([np.ones((x.shape[0], 1)), x[:, :d_x]]), t) @ true_coef.T

y = true_effect(X, T) + X[:, [0] * p] + \
    np.random.normal(size=(X.shape[0], p))  # noise term assumed; the continuation is truncated in the original snippet
# coefficient-level diagnostics: standard errors, squared error, CI coverage and length
cov['coef_stderr'] = est.model_final.coef_stderr_.flatten()
cov['coef_sqerror'] = (est_coef - true_coef)**2
cov['coef_cov'] = ((true_coef >= est_coef_lb) & (true_coef <= est_coef_ub))
cov['coef_length'] = est_coef_ub - est_coef_lb

# effect-level diagnostics on the test set
effect_interval = est.effect_interval(X_test, T0=np.zeros((X_test.shape[0], d_t)),
                                      T1=np.ones((X_test.shape[0], d_t)), alpha=alpha)
true_eff = true_effect(X_test, np.ones((X_test.shape[0], d_t))).reshape(effect_interval[0].shape)
est_effect = est.effect(X_test, T0=np.zeros((X_test.shape[0], d_t)), T1=np.ones((X_test.shape[0], d_t)))
cov['x_test'] = np.repeat(X_test, d_y, axis=0)
cov['effect'] = est_effect.flatten()
cov['effect_lower'] = effect_interval[0].flatten()
cov['effect_upper'] = effect_interval[1].flatten()
cov['true_effect'] = true_eff.flatten()
cov['effect_sqerror'] = ((est_effect - true_eff)**2).flatten()
cov['effect_stderr'] = est.model_final.prediction_stderr(
    cross_product(add_constant(X_test), np.ones((X_test.shape[0], d_t)))).flatten()
cov['effect_cov'] = ((true_eff >= effect_interval[0]) & (true_eff <= effect_interval[1])).flatten()
cov['effect_length'] = (effect_interval[1] - effect_interval[0]).flatten()
return cov
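# --- Illustrative note, not from econml -----------------------------------------
# cross_product (from econml.utilities) is used throughout these snippets to form
# all per-row products of the columns of its arguments. A minimal numpy sketch of
# the same idea; econml's exact column ordering may differ:
import numpy as np

A = np.array([[1., 2.], [3., 4.]])
B = np.array([[10., 20.], [30., 40.]])
rowwise = np.einsum('ij,ik->ijk', A, B).reshape(A.shape[0], -1)
print(rowwise)
# [[ 10.  20.  20.  40.]
#  [ 90. 120. 120. 160.]]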
if W is None:
    W = np.empty((shape(Y)[0], 0))
assert shape(Y)[0] == shape(T)[0] == shape(X)[0] == shape(W)[0] == shape(Z)[0]

# store number of columns of W so that we can create a correctly shaped zero array in effect and marginal effect
self._d_w = shape(W)[1]
# store number of columns of T so that we can pass scalars to effect
self._d_t = shape(T)[1]

# two-stage approximation
# first, get basis expansions of T, X, and Z
ft_X = self._x_featurizer.fit_transform(X)
ft_Z = self._z_featurizer.fit_transform(Z)
ft_T = self._t_featurizer.fit_transform(T)
# first stage: regress the T expansion on the interacted X and Z expansions, concatenated with W
features = _add_ones(np.hstack([W, cross_product(ft_X, ft_Z)]))
self._model_T.fit(features, ft_T)
# predict ft_T from the interacted ft_X, ft_Z
ft_T_hat = self._model_T.predict(features)
# second stage: regress Y on the predicted T expansion interacted with ft_X, concatenated with W
self._model_Y.fit(_add_ones(np.hstack([W, cross_product(ft_T_hat, ft_X)])), Y)
return self
if ndim(T0) == 0:
    T0 = np.full((1 if X is None else shape(X)[0], self._d_t), T0)
if ndim(T1) == 0:
    T1 = np.full((1 if X is None else shape(X)[0], self._d_t), T1)
if X is None:
    X = np.empty((shape(T0)[0], 0))
assert shape(T0) == shape(T1)
assert shape(T0)[0] == shape(X)[0]

W = np.zeros((shape(T0)[0], self._d_w))  # can be set arbitrarily since the values cancel in the difference below
ft_X = self._x_featurizer.fit_transform(X)
ft_T0 = self._t_featurizer.fit_transform(T0)
ft_T1 = self._t_featurizer.fit_transform(T1)
# predicted outcomes under the baseline (T0) and target (T1) treatments; the effect is their difference
Y0 = self._model_Y.predict(_add_ones(np.hstack([W, cross_product(ft_T0, ft_X)])))
Y1 = self._model_Y.predict(_add_ones(np.hstack([W, cross_product(ft_T1, ft_X)])))
return Y1 - Y0
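# --- Illustrative sketch, not econml's API --------------------------------------
# A self-contained numpy/sklearn version of the two-stage procedure that the fit
# and effect snippets above implement. Here row_kron stands in for cross_product,
# add_const for _add_ones, and plain LinearRegression for self._model_T and
# self._model_Y; the data-generating process and every name below are assumptions
# made for illustration only.
import numpy as np
from sklearn.linear_model import LinearRegression


def row_kron(A, B):
    # all per-row products of columns of A and B (row-wise Kronecker product)
    return np.einsum('ij,ik->ijk', A, B).reshape(A.shape[0], -1)


def add_const(A):
    return np.hstack([np.ones((A.shape[0], 1)), A])


rng = np.random.default_rng(0)
n = 2000
Z = rng.normal(size=(n, 1))                      # instrument
X = rng.normal(size=(n, 1))                      # heterogeneity features
W = rng.normal(size=(n, 1))                      # controls
T = Z + 0.5 * rng.normal(size=(n, 1))            # treatment driven by the instrument
Y = (1.0 + X) * T + W + rng.normal(size=(n, 1))  # true heterogeneous effect is 1 + X

ft_X, ft_Z, ft_T = add_const(X), add_const(Z), T  # basis expansions (identity for T)

# first stage: regress the T expansion on interactions of ft_X and ft_Z plus W
stage1_features = np.hstack([W, row_kron(ft_X, ft_Z)])
model_T = LinearRegression().fit(stage1_features, ft_T)
ft_T_hat = model_T.predict(stage1_features)

# second stage: regress Y on the predicted T expansion interacted with ft_X plus W
model_Y = LinearRegression().fit(np.hstack([W, row_kron(ft_T_hat, ft_X)]), Y)

# effect of moving from T0 = 0 to T1 = 1: difference of second-stage predictions,
# with W set to zero since its contribution cancels in the difference
X_eval = np.linspace(-2, 2, 5).reshape(-1, 1)
ft_Xe = add_const(X_eval)
W0 = np.zeros((X_eval.shape[0], 1))
T0 = np.zeros((X_eval.shape[0], 1))
T1 = np.ones((X_eval.shape[0], 1))
Y0 = model_Y.predict(np.hstack([W0, row_kron(T0, ft_Xe)]))
Y1 = model_Y.predict(np.hstack([W0, row_kron(T1, ft_Xe)]))
print((Y1 - Y0).ravel())  # approximately 1 + X_eval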
def parameter_estimator_func(Y, T, X,
                             nuisance_estimates,
                             sample_weight=None):
    """Calculate the parameter of interest for points given by (Y, T) and corresponding nuisance estimates."""
    # Compute residuals from the nuisance (first-stage) predictions
    Y_hat, T_hat = nuisance_estimates
    Y_res, T_res = reshape_Y_T(Y - Y_hat, T - T_hat)
    X_aug = PolynomialFeatures(degree=1, include_bias=True).fit_transform(X)
    XT_res = cross_product(T_res, X_aug)
    # Compute the coefficient by (weighted) OLS on the residuals
    if sample_weight is not None:
        weighted_XT_res = sample_weight.reshape(-1, 1) * XT_res
    else:
        weighted_XT_res = XT_res / XT_res.shape[0]
    # ell_2 regularization, leaving the first d_t columns unpenalized
    diagonal = np.ones(XT_res.shape[1])
    diagonal[:T_res.shape[1]] = 0
    reg = lambda_reg * np.diag(diagonal)
    # Ridge regression estimate via the regularized normal equations
    param_estimate = np.linalg.lstsq(np.matmul(weighted_XT_res.T, XT_res) + reg,
                                     np.matmul(weighted_XT_res.T, Y_res.reshape(-1, 1)),
                                     rcond=None)[0].flatten()
    # Flattened parameter vector with one entry per (treatment, feature) pair, shape (d_t * (d_x + 1),)
    return param_estimate
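# --- Illustrative check, not from the original -----------------------------------
# With sample_weight=None the design is scaled by 1/n, so the lstsq call above
# solves the (optionally ridge-regularized) normal equations; with lambda_reg = 0
# it coincides with plain OLS on the residualized design. A quick numerical check
# of that identity on toy data (all values below are assumptions for illustration):
import numpy as np

rng = np.random.default_rng(1)
n, d = 200, 4
XT_res = rng.normal(size=(n, d))
Y_res = XT_res @ np.array([1.0, -2.0, 0.5, 0.0]) + 0.1 * rng.normal(size=n)

lambda_reg = 0.0
diagonal = np.ones(d)
diagonal[:1] = 0  # leave the first (treatment) column unpenalized, as above
reg = lambda_reg * np.diag(diagonal)

weighted = XT_res / n
theta_reg = np.linalg.lstsq(weighted.T @ XT_res + reg,
                            weighted.T @ Y_res.reshape(-1, 1), rcond=None)[0].ravel()
theta_ols = np.linalg.lstsq(XT_res, Y_res, rcond=None)[0]
print(np.allclose(theta_reg, theta_ols))  # True when lambda_reg == 0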
def _combine(self, X, T, fitting=True):
    if X is not None:
        if self._featurizer is not None:
            F = self._featurizer.fit_transform(X) if fitting else self._featurizer.transform(X)
        else:
            F = X
    else:
        if not self._fit_cate_intercept:
            if self._use_weight_trick:
                raise AttributeError("Cannot use this method with X=None. Consider "
                                     "using the LinearDMLCateEstimator.")
            else:
                raise AttributeError("Cannot have X=None and also not allow for a CATE intercept!")
        F = np.ones((T.shape[0], 1))
    return cross_product(F, T)
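# --- Illustrative sketch, not econml's API --------------------------------------
# _combine builds the final-stage design F(X) crossed with T, so a linear model on
# those columns learns one coefficient per (feature, treatment) pair and the CATE
# at x is F(x) @ Theta. A tiny check on toy residualized data; row_kron stands in
# for cross_product (whose column ordering in econml may differ) and everything
# else here is an assumption for illustration:
import numpy as np
from sklearn.linear_model import LinearRegression


def row_kron(A, B):
    return np.einsum('ij,ik->ijk', A, B).reshape(A.shape[0], -1)


rng = np.random.default_rng(2)
n = 1000
X = rng.normal(size=(n, 1))
T_res = rng.normal(size=(n, 1))                                  # residualized treatment
Y_res = (1.0 + 2.0 * X) * T_res + 0.1 * rng.normal(size=(n, 1))  # true CATE is 1 + 2x

F = np.hstack([np.ones((n, 1)), X])                              # featurized X with a CATE intercept
final = LinearRegression(fit_intercept=False).fit(row_kron(F, T_res), Y_res)
print(final.coef_.ravel())                                       # approximately [1, 2]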