# TreeExplainer.__init__: remap the deprecated feature_dependence option (GitHub issue #882)
dep_val = deprecated_options["feature_dependence"]
if dep_val == "independent" and feature_perturbation == "interventional":
    warnings.warn("feature_dependence = \"independent\" has been renamed to "
                  "feature_perturbation = \"interventional\"! See GitHub issue #882.")
elif feature_perturbation != "interventional":
    warnings.warn("The feature_dependence option has been renamed to feature_perturbation, "
                  "you can't supply both options! See GitHub issue #882.")
if dep_val == "tree_path_dependent" and feature_perturbation == "interventional":
    raise Exception("The feature_dependence option has been renamed to feature_perturbation! "
                    "Please update the option name before calling TreeExplainer. See GitHub issue #882.")
if feature_perturbation == "independent":
    raise Exception("feature_perturbation = \"independent\" is not a valid option value, please use "
                    "feature_perturbation = \"interventional\" instead. See GitHub issue #882.")
if safe_isinstance(data, "pandas.core.frame.DataFrame"):
self.data = data.values
elif isinstance(data, DenseData):
self.data = data.data
else:
self.data = data
if self.data is None:
feature_perturbation = "tree_path_dependent"
warnings.warn("Setting feature_perturbation = \"tree_path_dependent\" because no background data was given.")
elif feature_perturbation == "interventional" and self.data.shape[0] > 1000:
warnings.warn("Passing "+str(self.data.shape[0]) + " background samples may lead to slow runtimes. Consider "
"using shap.sample(data, 100) to create a smaller background data set.")
self.data_missing = None if self.data is None else np.isnan(self.data)
self.model_output = model_output
self.feature_perturbation = feature_perturbation
self.expected_value = None
self.model = TreeEnsemble(model, self.data, self.data_missing)
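# --- Usage sketch (not library source): how the options above surface to callers.
# Assumes shap and scikit-learn are installed; all names below are illustrative.
import numpy as np
import shap
from sklearn.ensemble import RandomForestRegressor

X = np.random.randn(200, 5)
y = X[:, 0] + 0.1 * np.random.randn(200)
rf = RandomForestRegressor(n_estimators=10).fit(X, y)

# with background data the "interventional" algorithm is used; shap.sample keeps the
# background small enough to avoid the slow-runtime warning above
explainer = shap.TreeExplainer(rf, data=shap.sample(X, 100),
                               feature_perturbation="interventional")

# with no background data the constructor falls back to "tree_path_dependent" (and warns)
explainer_pd = shap.TreeExplainer(rf)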
elif safe_isinstance(model, "xgboost.core.Booster"):
import xgboost
self.original_model = model
self.model_type = "xgboost"
xgb_loader = XGBTreeModelLoader(self.original_model)
self.trees = xgb_loader.get_trees(data=data, data_missing=data_missing)
self.base_offset = xgb_loader.base_score
less_than_or_equal = False
self.objective = objective_name_map.get(xgb_loader.name_obj, None)
self.tree_output = tree_output_name_map.get(xgb_loader.name_obj, None)
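# --- Usage sketch (not library source): a raw xgboost.core.Booster is accepted
# directly; the loader reads its trees, base_score, and objective. Assumes xgboost
# is installed; names are illustrative.
import numpy as np
import xgboost
import shap

X = np.random.randn(200, 5)
y = X[:, 0] + 0.1 * np.random.randn(200)
booster = xgboost.train({"objective": "reg:squarederror"},
                        xgboost.DMatrix(X, label=y), num_boost_round=10)
explainer = shap.TreeExplainer(booster)
shap_values = explainer.shap_values(X)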
elif safe_isinstance(model, "xgboost.sklearn.XGBClassifier"):
import xgboost
# Tree.__init__: load a single decision tree into dense arrays
def __init__(self, tree, normalize=False, scaling=1.0, data=None, data_missing=None):
assert_import("cext")
if safe_isinstance(tree, "sklearn.tree._tree.Tree"):
self.children_left = tree.children_left.astype(np.int32)
self.children_right = tree.children_right.astype(np.int32)
self.children_default = self.children_left # missing values not supported in sklearn
self.features = tree.feature.astype(np.int32)
self.thresholds = tree.threshold.astype(np.float64)
self.values = tree.value.reshape(tree.value.shape[0], tree.value.shape[1] * tree.value.shape[2])
if normalize:
self.values = (self.values.T / self.values.sum(1)).T
self.values = self.values * scaling
self.node_sample_weight = tree.weighted_n_node_samples.astype(np.float64)
elif type(tree) is dict and 'features' in tree:
self.children_left = tree["children_left"].astype(np.int32)
self.children_right = tree["children_right"].astype(np.int32)
self.children_default = tree["children_default"].astype(np.int32)
self.features = tree["features"].astype(np.int32)
# shap_interaction_values: fast exact path for XGBoost models
if not isinstance(X, xgboost.core.DMatrix):
    X = xgboost.DMatrix(X)
if tree_limit == -1:
    tree_limit = 0  # for xgboost's predict, 0 means "use all trees"
phi = self.model.original_model.predict(X, ntree_limit=tree_limit, pred_interactions=True)
# note we pull off the last column and keep it as our expected_value
if len(phi.shape) == 4:
self.expected_value = [phi[0, i, -1, -1] for i in range(phi.shape[1])]
return [phi[:, i, :-1, :-1] for i in range(phi.shape[1])]
else:
self.expected_value = phi[0, -1, -1]
return phi[:, :-1, :-1]
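# --- Usage sketch (not library source): the fast path above is reached through
# shap_interaction_values on an XGBoost-backed explainer (explainer and X as in the
# earlier Booster sketch); the last row/column holding the expected value is stripped.
shap_inter = explainer.shap_interaction_values(X)
assert shap_inter.shape == (X.shape[0], X.shape[1], X.shape[1])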
# convert dataframes
if safe_isinstance(X, "pandas.core.series.Series"):
X = X.values
elif safe_isinstance(X, "pandas.core.frame.DataFrame"):
X = X.values
flat_output = False
if len(X.shape) == 1:
flat_output = True
X = X.reshape(1, X.shape[0])
if X.dtype != self.model.input_dtype:
X = X.astype(self.model.input_dtype)
X_missing = np.isnan(X, dtype=bool)  # np.bool was removed in NumPy 1.24; the builtin bool works everywhere
assert isinstance(X, np.ndarray), "Unknown instance type: " + str(type(X))
assert len(X.shape) == 2, "Passed input data matrix X must have 1 or 2 dimensions!"
if tree_limit < 0 or tree_limit > self.model.values.shape[0]:
tree_limit = self.model.values.shape[0]
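# --- Usage sketch (not library source): because of the reshaping above, a single row
# and a one-row matrix are interchangeable inputs (explainer and X as in the earlier
# Booster sketch).
import numpy as np

row = X[0]                                   # shape (n_features,)
sv_flat = explainer.shap_values(row)         # flat_output path
sv_2d = explainer.shap_values(row.reshape(1, -1))
assert np.allclose(sv_flat, sv_2d[0])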
elif safe_isinstance(model, "lightgbm.sklearn.LGBMRegressor"):
assert_import("lightgbm")
self.model_type = "lightgbm"
self.original_model = model.booster_
tree_info = self.original_model.dump_model()["tree_info"]
try:
self.trees = [Tree(e, data=data, data_missing=data_missing) for e in tree_info]
except Exception:
    self.trees = None # we get here because the cext can't handle categorical splits yet
self.objective = objective_name_map.get(model.objective, None)
self.tree_output = tree_output_name_map.get(model.objective, None)
if model.objective is None:
self.objective = "squared_error"
self.tree_output = "raw_value"
elif safe_isinstance(model, "lightgbm.sklearn.LGBMRanker"):
assert_import("lightgbm")
self.model_type = "lightgbm"
self.original_model = model.booster_
tree_info = self.original_model.dump_model()["tree_info"]
try:
self.trees = [Tree(e, data=data, data_missing=data_missing) for e in tree_info]
except Exception:
    self.trees = None # we get here because the cext can't handle categorical splits yet
# Note: for ranker, leaving tree_output and objective as None as they
# are not implemented in native code yet
elif safe_isinstance(model, "lightgbm.sklearn.LGBMClassifier"):
assert_import("lightgbm")
self.model_type = "lightgbm"
self.original_model = model.booster_
tree_info = self.original_model.dump_model()["tree_info"]
try:
    self.trees = [Tree(e, data=data, data_missing=data_missing) for e in tree_info]
except Exception:
    self.trees = None # we get here because the cext can't handle categorical splits yet
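# --- Usage sketch (not library source): the sklearn-style LightGBM wrappers above are
# unwrapped to their Booster via `model.booster_`. Assumes lightgbm is installed.
import numpy as np
import lightgbm
import shap

X = np.random.randn(200, 5)
y = X[:, 0] + 0.1 * np.random.randn(200)
lgbm = lightgbm.LGBMRegressor(n_estimators=10).fit(X, y)
explainer_lgb = shap.TreeExplainer(lgbm)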
assert_import("catboost")
self.model_type = "catboost"
self.original_model = model
self.input_dtype = np.float32
try:
cb_loader = CatBoostTreeModelLoader(model)
self.trees = cb_loader.get_trees(data=data, data_missing=data_missing)
except Exception:
    self.trees = None # we get here because the cext can't handle categorical splits yet
self.tree_output = "log_odds"
self.objective = "binary_crossentropy"
elif safe_isinstance(model, "catboost.core.CatBoost"):
assert_import("catboost")
self.model_type = "catboost"
self.original_model = model
elif safe_isinstance(model, "imblearn.ensemble._forest.BalancedRandomForestClassifier"):
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, normalize=True, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "probability"
else:
raise Exception("Model type not yet supported by TreeExplainer: " + str(type(model)))
# build a dense numpy version of all the tree objects
if self.trees is not None and self.trees:
max_nodes = np.max([len(t.values) for t in self.trees])
assert len(np.unique([t.values.shape[1] for t in self.trees])) == 1, "All trees in the ensemble must have the same output dimension!"
ntrees = len(self.trees)
self.n_outputs = self.trees[0].values.shape[1]
# important to be -1 in unused sections!! This way we can tell which entries are valid.
self.children_left = -np.ones((ntrees, max_nodes), dtype=np.int32)
self.children_right = -np.ones((ntrees, max_nodes), dtype=np.int32)
self.children_default = -np.ones((ntrees, max_nodes), dtype=np.int32)
self.features = -np.ones((ntrees, max_nodes), dtype=np.int32)
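# --- Illustration (not library code): two trees of different sizes packed into one
# (ntrees, max_nodes) matrix, with -1 filling the unused tail slots as described above.
import numpy as np

t0_left = np.array([1, -1, -1], dtype=np.int32)          # 3-node tree
t1_left = np.array([1, -1, 3, -1, -1], dtype=np.int32)   # 5-node tree
max_nodes = max(len(t0_left), len(t1_left))
children_left = -np.ones((2, max_nodes), dtype=np.int32)
children_left[0, :len(t0_left)] = t0_left
children_left[1, :len(t1_left)] = t1_left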
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, normalize=True, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "probability"
elif safe_isinstance(model, ["sklearn.ensemble.GradientBoostingRegressor", "sklearn.ensemble.gradient_boosting.GradientBoostingRegressor"]):
self.input_dtype = np.float32
# currently we only support the mean and quantile estimators
if safe_isinstance(model.init_, ["sklearn.ensemble.MeanEstimator", "sklearn.ensemble.gradient_boosting.MeanEstimator"]):
self.base_offset = model.init_.mean
elif safe_isinstance(model.init_, ["sklearn.ensemble.QuantileEstimator", "sklearn.ensemble.gradient_boosting.QuantileEstimator"]):
self.base_offset = model.init_.quantile
elif safe_isinstance(model.init_, "sklearn.dummy.DummyRegressor"):
self.base_offset = model.init_.constant_[0]
else:
assert False, "Unsupported init model type: " + str(type(model.init_))
self.trees = [Tree(e.tree_, scaling=model.learning_rate, data=data, data_missing=data_missing) for e in model.estimators_[:,0]]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.ensemble.GradientBoostingClassifier", "sklearn.ensemble.gradient_boosting.GradientBoostingClassifier"]):
self.input_dtype = np.float32
# TODO: deal with estimators for each class
if model.estimators_.shape[1] > 1:
assert False, "GradientBoostingClassifier is only supported for binary classification right now!"
# currently we only support the log odds estimator
if safe_isinstance(model.init_, ["sklearn.ensemble.LogOddsEstimator", "sklearn.ensemble.gradient_boosting.LogOddsEstimator"]):
    self.base_offset = model.init_.prior
    self.tree_output = "log_odds"
else:
    assert False, "Unsupported init model type: " + str(type(model.init_))
self.trees = [Tree(e.tree_, scaling=model.learning_rate, data=data, data_missing=data_missing) for e in model.estimators_[:,0]]
self.objective = objective_name_map.get(model.criterion, None)
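# --- Usage sketch (not library source): sklearn gradient boosting is loaded with the
# learning rate folded into each tree's scaling. Assumes scikit-learn is installed.
import numpy as np
import shap
from sklearn.ensemble import GradientBoostingRegressor

X = np.random.randn(200, 5)
y = X[:, 0] + 0.1 * np.random.randn(200)
gbr = GradientBoostingRegressor(n_estimators=10, learning_rate=0.1).fit(X, y)
explainer_gbr = shap.TreeExplainer(gbr)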
elif safe_isinstance(model, "xgboost.sklearn.XGBClassifier"):
import xgboost
self.input_dtype = np.float32
self.model_type = "xgboost"
self.original_model = model.get_booster()
xgb_loader = XGBTreeModelLoader(self.original_model)
self.trees = xgb_loader.get_trees(data=data, data_missing=data_missing)
self.base_offset = xgb_loader.base_score
less_than_or_equal = False
self.objective = objective_name_map.get(xgb_loader.name_obj, None)
self.tree_output = tree_output_name_map.get(xgb_loader.name_obj, None)
self.tree_limit = getattr(model, "best_ntree_limit", None)
elif safe_isinstance(model, "xgboost.sklearn.XGBRegressor"):
import xgboost
self.original_model = model.get_booster()
self.model_type = "xgboost"
xgb_loader = XGBTreeModelLoader(self.original_model)
self.trees = xgb_loader.get_trees(data=data, data_missing=data_missing)
self.base_offset = xgb_loader.base_score
less_than_or_equal = False
self.objective = objective_name_map.get(xgb_loader.name_obj, None)
self.tree_output = tree_output_name_map.get(xgb_loader.name_obj, None)
self.tree_limit = getattr(model, "best_ntree_limit", None)
elif safe_isinstance(model, "xgboost.sklearn.XGBRanker"):
import xgboost
self.original_model = model.get_booster()
self.model_type = "xgboost"
xgb_loader = XGBTreeModelLoader(self.original_model)
self.trees = xgb_loader.get_trees(data=data, data_missing=data_missing)
self.base_offset = xgb_loader.base_score
less_than_or_equal = False
# Note: for ranker, leaving tree_output and objective as None as they
# are not implemented in native code yet
self.tree_limit = getattr(model, "best_ntree_limit", None)
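# --- Usage sketch (not library source): the sklearn-style XGBoost wrappers above are
# unwrapped via get_booster(), and best_ntree_limit (set by early stopping) caps the
# trees explained. Assumes xgboost is installed.
import numpy as np
import xgboost
import shap

X = np.random.randn(200, 5)
y = X[:, 0] + 0.1 * np.random.randn(200)
xgb_reg = xgboost.XGBRegressor(n_estimators=10).fit(X, y)
explainer_xgb = shap.TreeExplainer(xgb_reg)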
self.model_type = "pyspark"
# model._java_obj.getImpurity() can be gini, entropy or variance.
self.objective = objective_name_map.get(model._java_obj.getImpurity(), None)
if "Classification" in str(type(model)):
normalize = True
self.tree_output = "probability"
else:
normalize = False
self.tree_output = "raw_value"
# Spark Random forest, create 1 weighted (avg) tree per sub-model
if safe_isinstance(model, "pyspark.ml.classification.RandomForestClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.RandomForestRegressionModel"):
sum_weight = sum(model.treeWeights) # output is average of trees
self.trees = [Tree(tree, normalize=normalize, scaling=model.treeWeights[i]/sum_weight) for i, tree in enumerate(model.trees)]
# Spark GBT, create 1 weighted (learning rate) tree per sub-model
elif safe_isinstance(model, "pyspark.ml.classification.GBTClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.GBTRegressionModel"):
self.objective = "squared_error" # GBT subtree use the variance
self.tree_output = "raw_value"
self.trees = [Tree(tree, normalize=False, scaling=model.treeWeights[i]) for i, tree in enumerate(model.trees)]
# Spark Basic model (single tree)
elif safe_isinstance(model, "pyspark.ml.classification.DecisionTreeClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.DecisionTreeRegressionModel"):
self.trees = [Tree(model, normalize=normalize, scaling=1)]
else:
assert False, "Unsupported Spark model type: " + str(type(model))
elif safe_isinstance(model, "xgboost.core.Booster"):
import xgboost
self.original_model = model
self.model_type = "xgboost"
xgb_loader = XGBTreeModelLoader(self.original_model)
self.trees = xgb_loader.get_trees(data=data, data_missing=data_missing)
elif "pyspark.ml" in str(type(model)):
assert_import("pyspark")
self.original_model = model
self.model_type = "pyspark"
# model._java_obj.getImpurity() can be gini, entropy or variance.
self.objective = objective_name_map.get(model._java_obj.getImpurity(), None)
if "Classification" in str(type(model)):
normalize = True
self.tree_output = "probability"
else:
normalize = False
self.tree_output = "raw_value"
# Spark Random forest, create 1 weighted (avg) tree per sub-model
if safe_isinstance(model, "pyspark.ml.classification.RandomForestClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.RandomForestRegressionModel"):
sum_weight = sum(model.treeWeights) # output is average of trees
self.trees = [Tree(tree, normalize=normalize, scaling=model.treeWeights[i]/sum_weight) for i, tree in enumerate(model.trees)]
# Spark GBT, create 1 weighted (learning rate) tree per sub-model
elif safe_isinstance(model, "pyspark.ml.classification.GBTClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.GBTRegressionModel"):
self.objective = "squared_error" # GBT subtree use the variance
self.tree_output = "raw_value"
self.trees = [Tree(tree, normalize=False, scaling=model.treeWeights[i]) for i, tree in enumerate(model.trees)]
# Spark Basic model (single tree)
elif safe_isinstance(model, "pyspark.ml.classification.DecisionTreeClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.DecisionTreeRegressionModel"):
self.trees = [Tree(model, normalize=normalize, scaling=1)]
else:
assert False, "Unsupported Spark model type: " + str(type(model))
elif safe_isinstance(model, "xgboost.core.Booster"):