# NOTE(review): injected scanner-advert banner, not part of the program; commented out so it is no longer a bare-text syntax error.
# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this chunk is the interior of a model-dispatch if/elif chain
# (presumably TreeEnsemble.__init__ in SHAP's tree explainer). The enclosing
# function header is outside this view and all leading indentation has been
# stripped by whatever extracted this text, so block nesting below must be
# reconstructed before this file can run — TODO confirm against upstream.
if type(model) is dict and "trees" in model:
# This allows a dictionary to be passed that represents the model.
# This dictionary has several numeric parameters and also a list of trees
# where each tree is a dictionary describing that tree.
# Each key is optional; only the ones present override the defaults set
# earlier in the (unseen) constructor.
if "internal_dtype" in model:
self.internal_dtype = model["internal_dtype"]
if "input_dtype" in model:
self.input_dtype = model["input_dtype"]
if "objective" in model:
self.objective = model["objective"]
if "tree_output" in model:
self.tree_output = model["tree_output"]
if "base_offset" in model:
self.base_offset = model["base_offset"]
self.trees = [Tree(t, data=data, data_missing=data_missing) for t in model["trees"]]
elif type(model) is list and type(model[0]) == Tree: # old-style direct-load format
self.trees = model
# sklearn RandomForestRegressor: one Tree per estimator, scaled so the
# ensemble output is the average of the trees.
elif safe_isinstance(model, ["sklearn.ensemble.RandomForestRegressor", "sklearn.ensemble.forest.RandomForestRegressor"]):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.ensemble.IsolationForest", "sklearn.ensemble.iforest.IsolationForest"]):
# NOTE(review): this branch sets `self.dtype`, while every sibling branch
# sets `self.input_dtype` — presumably a typo; verify against upstream.
# It also never sets `self.objective`, unlike the other branches.
self.dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [IsoTree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.tree_output = "raw_value"
# NOTE(review): DEFECT — this skopt RandomForestRegressor branch sets only
# `objective` and `tree_output` and never assigns `self.trees`,
# `self.internal_dtype`, `self.input_dtype` or `scaling`. Compare the
# skopt ExtraTreesRegressor branch below (L43-L50 region), which has the
# full body; lines were probably lost when this file was spliced together.
elif safe_isinstance(model, "skopt.learning.forest.RandomForestRegressor"):
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
# sklearn / skopt ExtraTreesRegressor: identical handling to the random
# forest regressor above — averaged raw-value trees.
elif safe_isinstance(model, ["sklearn.ensemble.ExtraTreesRegressor", "sklearn.ensemble.forest.ExtraTreesRegressor"]):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, "skopt.learning.forest.ExtraTreesRegressor"):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
# Single sklearn decision trees: one Tree, no scaling. Classifiers are
# normalized so leaf values become class probabilities.
elif safe_isinstance(model, ["sklearn.tree.DecisionTreeRegressor", "sklearn.tree.tree.DecisionTreeRegressor"]):
self.internal_dtype = model.tree_.value.dtype.type
self.input_dtype = np.float32
self.trees = [Tree(model.tree_, data=data, data_missing=data_missing)]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.tree.DecisionTreeClassifier", "sklearn.tree.tree.DecisionTreeClassifier"]):
self.internal_dtype = model.tree_.value.dtype.type
self.input_dtype = np.float32
self.trees = [Tree(model.tree_, normalize=True, data=data, data_missing=data_missing)]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "probability"
# NOTE(review): CORRUPTION — this RandomForestClassifier branch has only its
# header and assert; its real body is missing, and what follows (next lines)
# is a near-verbatim duplicate of the whole dispatch chain from the top of
# this chunk, spliced in by whatever produced this file.
elif safe_isinstance(model, ["sklearn.ensemble.RandomForestClassifier", "sklearn.ensemble.forest.RandomForestClassifier"]):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
# NOTE(review): DUPLICATE — everything from here down to the skopt
# RandomForestRegressor assert is a second copy of the dict/list/sklearn
# dispatch chain already seen above (same branches, same bodies, including
# the same `self.dtype` inconsistency in the IsolationForest branch). It
# cannot legally appear nested inside the RandomForestClassifier branch;
# this is an extraction/merge artifact, not intentional code.
if type(model) is dict and "trees" in model:
# This allows a dictionary to be passed that represents the model.
# This dictionary has several numeric parameters and also a list of trees
# where each tree is a dictionary describing that tree.
if "internal_dtype" in model:
self.internal_dtype = model["internal_dtype"]
if "input_dtype" in model:
self.input_dtype = model["input_dtype"]
if "objective" in model:
self.objective = model["objective"]
if "tree_output" in model:
self.tree_output = model["tree_output"]
if "base_offset" in model:
self.base_offset = model["base_offset"]
self.trees = [Tree(t, data=data, data_missing=data_missing) for t in model["trees"]]
elif type(model) is list and type(model[0]) == Tree: # old-style direct-load format
self.trees = model
elif safe_isinstance(model, ["sklearn.ensemble.RandomForestRegressor", "sklearn.ensemble.forest.RandomForestRegressor"]):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.ensemble.IsolationForest", "sklearn.ensemble.iforest.IsolationForest"]):
# NOTE(review): `self.dtype` here too — same presumed typo for
# `self.input_dtype` as in the first copy of this branch.
self.dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [IsoTree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.tree_output = "raw_value"
elif safe_isinstance(model, "skopt.learning.forest.RandomForestRegressor"):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
# NOTE(review): FOREIGN FRAGMENT — the following lines come from a different
# function entirely (some model-conversion helper that builds a list of
# shap Tree objects). They reference names that do not exist in this scope
# (`trees`, `n_tree_per_iter`, `self.n_trees`), and the `return shap_trees`
# would exit the enclosing constructor. The four statements after the
# `return` are unreachable orphans from the skopt branch body. This whole
# span should be removed / restored from upstream — TODO confirm.
# print(children_left)
# trees[-1]['children_left']=np.copy(children_left)
# trees[-1]['children_right']=np.copy(children_right)
# trees[-1]['children_default']=np.copy(children_default)
# trees[-1]['feature']=np.copy(features)
# trees[-1]['threshold']=np.copy(thresholds)
# trees[-1]['value']=np.copy(values).reshape(-1,1)
# trees[-1]['node_sample_weight']=np.copy(node_sample_weight)
trees[-1]['value']=trees[-1]['value'].reshape(-1,1)
# print(len(trees))
import shap.explainers.tree as shap_tree
shap_trees = []
for k in range(self.n_trees * n_tree_per_iter):
shap_trees.append(shap_tree.Tree(trees[k]))
return shap_trees
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
# NOTE(review): DUPLICATE — second copy of the skopt ExtraTreesRegressor,
# DecisionTree and RandomForestClassifier branches. Unlike the truncated
# first copy, this RandomForestClassifier branch has its full body
# (normalized probability trees averaged over estimators).
elif safe_isinstance(model, "skopt.learning.forest.ExtraTreesRegressor"):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.tree.DecisionTreeRegressor", "sklearn.tree.tree.DecisionTreeRegressor"]):
self.internal_dtype = model.tree_.value.dtype.type
self.input_dtype = np.float32
self.trees = [Tree(model.tree_, data=data, data_missing=data_missing)]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.tree.DecisionTreeClassifier", "sklearn.tree.tree.DecisionTreeClassifier"]):
self.internal_dtype = model.tree_.value.dtype.type
self.input_dtype = np.float32
self.trees = [Tree(model.tree_, normalize=True, data=data, data_missing=data_missing)]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "probability"
elif safe_isinstance(model, ["sklearn.ensemble.RandomForestClassifier", "sklearn.ensemble.forest.RandomForestClassifier"]):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, normalize=True, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "probability"
# NOTE(review): CORRUPTION — this LightGBM Booster handling has lost its
# `elif safe_isinstance(model, "lightgbm.basic.Booster")` header and the
# lines assigning `self.original_model` / `self.model_type`, which it reads
# here. Also: the bare `except:` silently swallows every error, not just the
# categorical-split case the comment describes — worth narrowing upstream.
tree_info = self.original_model.dump_model()["tree_info"]
try:
self.trees = [Tree(e, data=data, data_missing=data_missing) for e in tree_info]
except:
self.trees = None # we get here because the cext can't handle categorical splits yet
self.objective = objective_name_map.get(model.params.get("objective", "regression"), None)
self.tree_output = tree_output_name_map.get(model.params.get("objective", "regression"), None)
# LGBMRegressor: wrap the underlying Booster; objective defaults to
# squared_error when the sklearn wrapper left it as None.
elif safe_isinstance(model, "lightgbm.sklearn.LGBMRegressor"):
assert_import("lightgbm")
self.model_type = "lightgbm"
self.original_model = model.booster_
tree_info = self.original_model.dump_model()["tree_info"]
try:
self.trees = [Tree(e, data=data, data_missing=data_missing) for e in tree_info]
except:
# NOTE(review): bare except — swallows all errors, not only the
# categorical-split failure described below.
self.trees = None # we get here because the cext can't handle categorical splits yet
self.objective = objective_name_map.get(model.objective, None)
self.tree_output = tree_output_name_map.get(model.objective, None)
if model.objective is None:
self.objective = "squared_error"
self.tree_output = "raw_value"
# NOTE(review): TRUNCATED — this LGBMRanker branch parses the trees but,
# unlike the sibling LightGBM branches, never sets `self.objective` or
# `self.tree_output` before the text jumps into unrelated pyspark handling;
# its tail was presumably lost in the splice.
elif safe_isinstance(model, "lightgbm.sklearn.LGBMRanker"):
assert_import("lightgbm")
self.model_type = "lightgbm"
self.original_model = model.booster_
tree_info = self.original_model.dump_model()["tree_info"]
try:
self.trees = [Tree(e, data=data, data_missing=data_missing) for e in tree_info]
except:
# NOTE(review): bare except — same over-broad handling as above.
self.trees = None # we get here because the cext can't handle categorical splits yet
# NOTE(review): CORRUPTION — this PySpark handling has lost its enclosing
# `elif` header (the branch that matches pyspark model types); it cannot
# follow the LGBMRanker `except` block directly as written here.
assert_import("pyspark")
self.original_model = model
self.model_type = "pyspark"
# model._java_obj.getImpurity() can be gini, entropy or variance.
self.objective = objective_name_map.get(model._java_obj.getImpurity(), None)
# Classification models get probability-normalized trees; regressors raw values.
if "Classification" in str(type(model)):
normalize = True
self.tree_output = "probability"
else:
normalize = False
self.tree_output = "raw_value"
# Spark Random forest, create 1 weighted (avg) tree per sub-model
if safe_isinstance(model, "pyspark.ml.classification.RandomForestClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.RandomForestRegressionModel"):
sum_weight = sum(model.treeWeights) # output is average of trees
self.trees = [Tree(tree, normalize=normalize, scaling=model.treeWeights[i]/sum_weight) for i, tree in enumerate(model.trees)]
# Spark GBT, create 1 weighted (learning rate) tree per sub-model
elif safe_isinstance(model, "pyspark.ml.classification.GBTClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.GBTRegressionModel"):
self.objective = "squared_error" # GBT subtree use the variance
self.tree_output = "raw_value"
self.trees = [Tree(tree, normalize=False, scaling=model.treeWeights[i]) for i, tree in enumerate(model.trees)]
# Spark Basic model (single tree)
elif safe_isinstance(model, "pyspark.ml.classification.DecisionTreeClassificationModel") \
or safe_isinstance(model, "pyspark.ml.regression.DecisionTreeRegressionModel"):
self.trees = [Tree(model, normalize=normalize, scaling=1)]
else:
# NOTE(review): assert for input validation is stripped under -O;
# upstream should raise a proper exception here instead.
assert False, "Unsupported Spark model type: " + str(type(model))
# NOTE(review): TRUNCATED — the xgboost Booster branch only records the
# model and type; the tree-parsing body that must follow is missing.
elif safe_isinstance(model, "xgboost.core.Booster"):
import xgboost
self.original_model = model
self.model_type = "xgboost"
# NOTE(review): FOREIGN FRAGMENT — the `else:` body below belongs to a
# different function (it looks like CatBoost oblivious-tree unraveling:
# it reads `elem`, `borders`, `split_features_index`, `leaf_values`,
# `children_left`, etc., none of which exist in this scope, and ends with
# `return trees`, which would exit the enclosing constructor). It must be
# removed / restored from upstream — TODO confirm.
else:
split_feature_index = elem.get('ctr_target_border_idx')
borders.append(elem['border'])
split_features_index.append(split_feature_index)
split_features_index_unraveled = []
# Oblivious trees apparently repeat each level's split 2**level times when
# unraveled into a binary tree layout — presumed, verify against CatBoost docs.
for counter, feature_index in enumerate(split_features_index[::-1]):
split_features_index_unraveled += [feature_index] * (2 ** counter)
split_features_index_unraveled += [0] * len(leaf_values)
borders_unraveled = []
for counter, border in enumerate(borders[::-1]):
borders_unraveled += [border] * (2 ** counter)
borders_unraveled += [0] * len(leaf_values)
trees.append(Tree({"children_left": np.array(children_left),
"children_right": np.array(children_right),
"children_default": np.array(children_default),
"feature": np.array(split_features_index_unraveled),
"threshold": np.array(borders_unraveled),
"value": np.array(leaf_values_unraveled).reshape((-1,1)),
"node_sample_weight": np.array(leaf_weights_unraveled),
}, data=data, data_missing=data_missing))
return trees
# NOTE(review): DANGLING — a third copy of the sklearn branch bodies starts
# here mid-branch (the statements before the first `elif` have no header),
# and the final ExtraTreesClassifier branch runs past the end of this chunk,
# so its tail (objective / tree_output assignments) is not visible here.
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.tree.DecisionTreeRegressor", "sklearn.tree.tree.DecisionTreeRegressor"]):
self.internal_dtype = model.tree_.value.dtype.type
self.input_dtype = np.float32
self.trees = [Tree(model.tree_, data=data, data_missing=data_missing)]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "raw_value"
elif safe_isinstance(model, ["sklearn.tree.DecisionTreeClassifier", "sklearn.tree.tree.DecisionTreeClassifier"]):
self.internal_dtype = model.tree_.value.dtype.type
self.input_dtype = np.float32
self.trees = [Tree(model.tree_, normalize=True, data=data, data_missing=data_missing)]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "probability"
elif safe_isinstance(model, ["sklearn.ensemble.RandomForestClassifier", "sklearn.ensemble.forest.RandomForestClassifier"]):
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, normalize=True, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]
self.objective = objective_name_map.get(model.criterion, None)
self.tree_output = "probability"
elif safe_isinstance(model, ["sklearn.ensemble.ExtraTreesClassifier", "sklearn.ensemble.forest.ExtraTreesClassifier"]): # TODO: add unit test for this case
assert hasattr(model, "estimators_"), "Model has no `estimators_`! Have you called `model.fit`?"
self.internal_dtype = model.estimators_[0].tree_.value.dtype.type
self.input_dtype = np.float32
scaling = 1.0 / len(model.estimators_) # output is average of trees
self.trees = [Tree(e.tree_, normalize=True, scaling=scaling, data=data, data_missing=data_missing) for e in model.estimators_]