Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
str_arr = method_name.split(".", maxsplit=1)
identifier_name = str_arr[0]
estimator_name = str_arr[1]
identified_estimand.set_identifier_method(identifier_name)
if estimator_name.startswith("econml"):
causal_estimator_class =causal_estimators.get_class_object("econml_cate_estimator")
if method_params is None:
method_params = {}
method_params["_econml_methodname"] = estimator_name
else:
causal_estimator_class = causal_estimators.get_class_object(estimator_name + "_estimator")
# Check if estimator's target estimand is identified
if identified_estimand.estimands[identifier_name] is None:
self.logger.warning("No valid identified estimand for using instrumental variables method")
estimate = CausalEstimate(None, None, None)
else:
causal_estimator = causal_estimator_class(
self._data,
identified_estimand,
self._treatment, self._outcome, #names of treatment and outcome
control_value = control_value,
treatment_value = treatment_value,
test_significance=test_significance,
evaluate_effect_strength=evaluate_effect_strength,
confidence_intervals = confidence_intervals,
target_units = target_units,
effect_modifiers = effect_modifiers,
params=method_params
)
estimate = causal_estimator.estimate_effect()
# Store parameters inside estimate object for refutation methods
iv_est = num / deno
else:
# Obtain estimate by 2SLS estimator: Cov(y,z) / Cov(x,z)
num_yz = np.cov(self._outcome, instrument)[0, 1]
deno_xz = np.cov(self._treatment[self._treatment_name[0]], instrument)[0, 1]
iv_est = num_yz / deno_xz
else:
# More than 1 instrument. Use 2sls.
est_treatment = self._treatment.astype(np.float32)
est_outcome = self._outcome.astype(np.float32)
ivmodel = IV2SLS(est_outcome, est_treatment,
self._estimating_instruments)
reg_results = ivmodel.fit()
print(reg_results.summary())
iv_est = sum(reg_results.params) # the effect is the same for any treatment value (assume treatment goes from 0 to 1)
estimate = CausalEstimate(estimate=iv_est,
target_estimand=self._target_estimand,
realized_estimand_expr=self.symbolic_estimator)
return estimate
"""
if method_name is None:
pass
else:
str_arr = method_name.split(".", maxsplit=1)
print(str_arr)
identifier_name = str_arr[0]
estimator_name = str_arr[1]
identified_estimand.set_identifier_method(identifier_name)
causal_estimator_class = causal_estimators.get_class_object(estimator_name + "_estimator")
# Check if estimator's target estimand is identified
if identified_estimand.estimands[identifier_name] is None:
self.logger.warning("No valid identified estimand for using instrumental variables method")
estimate = CausalEstimate(None, None, None)
else:
causal_estimator = causal_estimator_class(
self._data,
identified_estimand,
self._treatment, self._outcome,
test_significance=False,
params=method_params
)
try:
estimate = causal_estimator.do(x)
except NotImplementedError:
self.logger.error('Do Operation not implemented or not supported for this estimator.')
raise NotImplementedError
return estimate
def estimate_effect_naive(self):
#TODO Only works for binary treatment
df_withtreatment = self._data.loc[self._data[self._treatment_name] == 1]
df_notreatment = self._data.loc[self._data[self._treatment_name]== 0]
est = np.mean(df_withtreatment[self._outcome_name]) - np.mean(df_notreatment[self._outcome_name])
return CausalEstimate(est, None, None)
n_target_units = X_test.shape[0]
# Changing shape to a list for a singleton value
if type(self._control_value) is not list:
self._control_value = [self._control_value]
if type(self._treatment_value) is not list:
self._treatment_value = [self._treatment_value]
T0_test = np.repeat([self._control_value], n_target_units, axis=0)
T1_test = np.repeat([self._treatment_value], n_target_units, axis=0)
est = self.estimator.effect(X_test, T0 = T0_test, T1 = T1_test)
ate = np.mean(est)
est_interval = None
if self._confidence_intervals:
est_interval = self.estimator.effect_interval(X_test, T0 = T0_test, T1 = T1_test)
estimate = CausalEstimate(estimate=ate,
target_estimand=self._target_estimand,
realized_estimand_expr=self.symbolic_estimator,
cate_estimates=est,
effect_intervals=est_interval,
_estimator_object = self.estimator)
return estimate
control_outcome = control.iloc[i][self._outcome_name].item()
treated_outcome = treated.iloc[indices[i]][self._outcome_name].item()
atc += treated_outcome - control_outcome
atc /= numcontrolunits
if self._target_units == "att":
est = att
elif self._target_units == "atc":
est = atc
elif self._target_units == "ate":
est = (att*numtreatedunits + atc*numcontrolunits)/(numtreatedunits+numcontrolunits)
else:
raise ValueError("Target units string value not supported")
estimate = CausalEstimate(estimate=est,
target_estimand=self._target_estimand,
realized_estimand_expr=self.symbolic_estimator,
propensity_scores=self._data["propensity_score"])
return estimate
total_treatment_population = weighted_outcomes[treatment_sum_name].sum()
total_control_population = weighted_outcomes[control_sum_name].sum()
total_population = total_treatment_population + total_control_population
if self._target_units=="att":
est = (weighted_outcomes['effect'] * weighted_outcomes[treatment_sum_name]).sum() / total_treatment_population
elif self._target_units=="atc":
est = (weighted_outcomes['effect'] * weighted_outcomes[control_sum_name]).sum() / total_control_population
elif self._target_units == "ate":
est = (weighted_outcomes['effect'] * (weighted_outcomes[control_sum_name]+weighted_outcomes[treatment_sum_name])).sum() / total_population
else:
raise ValueError("Target units string value not supported")
# TODO - how can we add additional information into the returned estimate?
# such as how much clipping was done, or per-strata info for debugging?
estimate = CausalEstimate(estimate=est,
target_estimand=self._target_estimand,
realized_estimand_expr=self.symbolic_estimator,
propensity_scores = self._data["propensity_score"])
return estimate
# Calculating the effect
self._data['d_y'] = (
self._data[weighting_scheme_name] *
self._data[self._treatment_name[0]] *
self._data[self._outcome_name]
)
self._data['dbar_y'] = (
self._data[weighting_scheme_name] *
(1 - self._data[self._treatment_name[0]]) *
self._data[self._outcome_name]
)
est = self._data['d_y'].sum() - self._data['dbar_y'].sum()
# TODO - how can we add additional information into the returned estimate?
estimate = CausalEstimate(estimate=est,
target_estimand=self._target_estimand,
realized_estimand_expr=self.symbolic_estimator,
propensity_scores = self._data["ps"])
return estimate