X: optional (n × d_x) matrix
    Features for each sample
W: optional (n × d_w) matrix
    Controls for each sample
Z: optional (n × d_z) matrix
    Instruments for each sample
Returns
-------
self
"""
if X is None:
X = np.empty((shape(Y)[0], 0))
if W is None:
W = np.empty((shape(Y)[0], 0))
assert shape(Y)[0] == shape(T)[0] == shape(X)[0] == shape(W)[0] == shape(Z)[0]
# store number of columns of W so that we can create a correctly shaped zero array in effect and marginal effect
self._d_w = shape(W)[1]
# store number of columns of T so that we can pass scalars to effect
self._d_t = shape(T)[1]
# two stage approximation
# first, get basis expansions of T, X, and Z
ft_X = self._x_featurizer.fit_transform(X)
ft_Z = self._z_featurizer.fit_transform(Z)
ft_T = self._t_featurizer.fit_transform(T)
# regress T expansion on X,Z expansions concatenated with W
features = _add_ones(np.hstack([W, cross_product(ft_X, ft_Z)]))
self._model_T.fit(features, ft_T)
# predict ft_T from interacted ft_X, ft_Z
ft_T_hat = self._model_T.predict(features)
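# Second stage (a sketch of how this step would typically continue; the
# `_model_Y` attribute is assumed by analogy with `_model_T` above):
# regress Y on the predicted treatment features interacted with ft_X, e.g.
#   self._model_Y.fit(_add_ones(np.hstack([W, cross_product(ft_X, ft_T_hat)])), Y)

# below, `splitter` is assumed to come from sklearn's check_cv: when an integer
# n_splits was passed, check_cv returns a fresh, un-shuffled KFold/StratifiedKFold,
# so shuffle and random_state are set explicitly for reproducible splits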
if splitter != self._n_splits and isinstance(splitter, (KFold, StratifiedKFold)):
splitter.shuffle = True
splitter.random_state = self._random_state
all_vars = [var if np.ndim(var) == 2 else var.reshape(-1, 1) for var in [Z, W, X] if var is not None]
if all_vars:
all_vars = np.hstack(all_vars)
folds = splitter.split(all_vars, T)
else:
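# sklearn splitters require a features argument even when there is nothing
# to split on, so pass a dummy column of ones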
folds = splitter.split(np.ones((T.shape[0], 1)), T)
if self._discrete_treatment:
T = self._label_encoder.fit_transform(T.ravel())
# drop first column since all columns sum to one
T = self._one_hot_encoder.fit_transform(reshape(T, (-1, 1)))[:, 1:]
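# Why dropping the first column is safe (an illustrative sketch): the columns
# of a full one-hot encoding sum to one, so any single column equals one minus
# the sum of the others and is absorbed by an intercept. For labels [1, 0, 2, 0]:
#   onehot = (labels.reshape(-1, 1) == np.arange(3)).astype(float)
#   onehot.sum(axis=1)   # -> array([1., 1., 1., 1.])
#   onehot[:, 1:]        # the encoding actually used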
self._d_t = shape(T)[1:]
self.transformer = FunctionTransformer(
func=(lambda T:
self._one_hot_encoder.transform(
reshape(self._label_encoder.transform(T.ravel()), (-1, 1)))[:, 1:]),
validate=False)
nuisances, fitted_models, fitted_inds = _crossfit(self._model_nuisance, folds,
Y, T, X=X, W=W, Z=Z, sample_weight=sample_weight)
self._models_nuisance = fitted_models
return nuisances, fitted_inds
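# A minimal sketch of the cross-fitting pattern behind _crossfit (illustrative;
# crossfit_predict and its signature are ours, not econml's API): each fold's
# nuisance model is trained on the complement of the fold and predicts only on
# the held-out fold, so no sample's nuisance estimate comes from a model that
# saw that sample during training.
import numpy as np
from sklearn.base import clone
from sklearn.model_selection import KFold

def crossfit_predict(model, X, y, n_splits=2, random_state=0):
    y_hat = np.empty(y.shape[0])
    for train_idx, test_idx in KFold(n_splits, shuffle=True, random_state=random_state).split(X):
        # fit on the complement of the fold, predict on the fold itself
        fold_model = clone(model).fit(X[train_idx], y[train_idx])
        y_hat[test_idx] = fold_model.predict(X[test_idx])
    return y_hat  # out-of-fold (cross-fitted) predictions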
def fit(self, X, T_res, Y_res, sample_weight=None, sample_var=None):
# Track training dimensions to see if Y or T is a vector instead of a 2-dimensional array
self._d_t = shape(T_res)[1:]
self._d_y = shape(Y_res)[1:]
if not self._use_weight_trick:
fts = self._combine(X, T_res)
if sample_weight is not None:
if sample_var is not None:
self._model.fit(fts,
Y_res, sample_weight=sample_weight, sample_var=sample_var)
else:
self._model.fit(fts,
Y_res, sample_weight=sample_weight)
else:
self._model.fit(fts, Y_res)
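# The nested branching above exists only to omit optional arguments; an
# equivalent idiom (a sketch, not the original code) builds the kwargs
# and drops the Nones:
#   kwargs = dict(sample_weight=sample_weight, sample_var=sample_var)
#   self._model.fit(fts, Y_res, **{k: v for k, v in kwargs.items() if v is not None})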
self._intercept = None
intercept = self._model.predict(np.zeros_like(fts[0:1]))
if (np.count_nonzero(intercept) > 0):
    # the model learned a nonzero intercept; record it so it can be accounted for separately
    self._intercept = intercept
# we rely on the fact that M(X beta) = (M X) beta, but M(X beta + c) is not the same
# as (M X) beta + c, so the learned coef and intercept will be wrong
intercept = self.penalized_model.predict(np.zeros_like(X2[0:1]))
if not np.allclose(intercept, 0):
raise AttributeError("The penalized model has a non-zero intercept; to fit an intercept "
"you should instead either set fit_intercept to True when initializing the "
"SelectiveRegression instance (for an unpenalized intercept) or "
"explicitly add a column of ones to the data being fit and include that "
"column in the penalized indices.")
# now regress X1 on y - X2 * beta2 to learn beta1
self._model_X1 = LinearRegression(fit_intercept=self._fit_intercept)
self._model_X1.fit(X1, y - self.penalized_model.predict(X2), sample_weight=sample_weight)
# set coef_ and intercept_ attributes
self.coef_ = np.empty(shape(y)[1:] + shape(X)[1:])
self.coef_[..., self._penalized_inds] = self.penalized_model.coef_
self.coef_[..., self._unpenalized_inds] = self._model_X1.coef_
# Note that the penalized model should *not* have an intercept
self.intercept_ = self._model_X1.intercept_
return self
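# A minimal numerical check (our illustration, not from the original source) of
# the identity the two-step regression above relies on: with M the annihilator
# of the unpenalized columns X1, M(X2 @ beta) == (M @ X2) @ beta, while adding
# an intercept c breaks the equality.
import numpy as np

rng = np.random.default_rng(0)
X1 = rng.normal(size=(50, 2))              # unpenalized columns
X2 = rng.normal(size=(50, 3))              # penalized columns
beta = rng.normal(size=3)

M = np.eye(50) - X1 @ np.linalg.pinv(X1)   # annihilator of col(X1)
assert np.allclose(M @ (X2 @ beta), (M @ X2) @ beta)                    # linear part commutes with M
assert not np.allclose(M @ (X2 @ beta + 1.0), (M @ X2) @ beta + 1.0)    # an intercept does not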
def _add_ones(arr):
"""Add a column of ones to the front of an array."""
return np.hstack([np.ones((shape(arr)[0], 1)), arr])
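# Example (illustrative): _add_ones(np.array([[2.], [3.]]))
# -> array([[1., 2.],
#           [1., 3.]])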
n = shape(X)[0]
ncols = shape(X)[1]
columns = []
for indices in product(*[range(ncols) for i in range(self._shift)]):
if self._joint:
columns.append(cross_product(*[self._column_feats(X[:, i], indices.count(i))
for i in range(shape(X)[1])]))
else:
indices = set(indices)
if self._shift == 0: # return features for all columns:
columns.append(np.hstack([self._column_feats(X[:, i], self._shift) for i in range(shape(X)[1])]))
# columns are featurized independently; partial derivatives are only non-zero
# when taken with respect to the same column each time
elif len(indices) == 1:
index = list(indices)[0]
feats = self._column_feats(X[:, index], self._shift)
columns.append(np.hstack([feats if i == index else np.zeros(shape(feats))
for i in range(shape(X)[1])]))
else:
columns.append(np.zeros((n, (self._degree + 1) * ncols)))
return reshape(np.hstack(columns), (n,) + (ncols,) * self._shift + (-1,))
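# Output shape sketch (our note, for the non-joint case): with n samples, ncols
# input columns, derivative order `shift`, and polynomial degree `degree`, the
# result has shape (n,) + (ncols,) * shift + ((degree + 1) * ncols,);
# e.g. n=5, ncols=2, shift=1, degree=3 yields shape (5, 2, 8).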
def _check_input_dims(self, Y, T, X=None, W=None, Z=None, sample_weight=None, sample_var=None):
assert shape(Y)[0] == shape(T)[0], "Dimension mismatch!"
for arr in [X, W, Z, sample_weight, sample_var]:
    assert (arr is None) or (arr.shape[0] == shape(Y)[0]), "Dimension mismatch!"
self._d_x = X.shape[1:] if X is not None else None
self._d_w = W.shape[1:] if W is not None else None
self._d_z = Z.shape[1:] if Z is not None else None
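# Usage sketch (illustrative): with Y of shape (n, 2) and T of shape (n,), any
# of X, W, Z, sample_weight, sample_var that are passed must also have n rows;
# e.g. X of shape (n, 3) records self._d_x == (3,), while omitting W and Z
# leaves self._d_w and self._d_z as None.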