Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _predict_random_if_unfit(self, X, output_score):
warnings.warn("Model object has not been fit to data, predictions will be random.")
X = _check_X_input(X)
pred = self._name_arms(np.random.randint(self.nchoices, size = X.shape[0]))
if not output_score:
return pred
else:
return {"choice" : pred, "score" : (1.0 / self.nchoices) * np.ones(size = X.shape[0], dtype = "float64")}
def decision_function(self, X):
"""
Decision function before sigmoid transformation for new observations
Parameters
----------
X : array(n_samples, n_features)
Input data on which to predict.
Returns
-------
pred : array(n_samples, )
Raw prediction for each observation
"""
X = _check_X_input(X)
if self.fit_intercept:
return X.dot(self.w[:self.w.shape[0] - 1]) + self.w[-1]
else:
return X.dot(self.w)
def _predict(self, X, exploit = False):
X = _check_X_input(X)
if X.shape[0] == 0:
return np.array([])
if exploit:
return self._oracles.predict(X)
# fixed threshold, anything below is always random
if (self.decay == 1) or (self.decay is None):
pred, pred_max = self._calc_preds(X)
# variable threshold that needs to be updated
else:
remainder_window = self.window_size - self.window_cnt
# case 1: number of predictions to make would still fit within current window
----
While in theory, making predictions from this algorithm should be faster than from others,
the implementation here uses a Python loop for each observation, which is slow compared to
NumPy array lookups, so the predictions will be slower to calculate than those from other algorithms.
Parameters
----------
X : array (n_samples, n_features)
New observations for which to choose an action.
Returns
-------
pred : array (n_samples,)
Actions chosen by this technique.
"""
X = _check_X_input(X)
pred = np.zeros(X.shape[0])
shape_single = list(X.shape)
shape_single[0] = 1
Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(delayed(self._predict)(pred, i, shape_single, X) for i in range(X.shape[0]))
return np.array(pred).astype('int64')
Note
----
For details on how this is calculated, see the documentation of the
RegressionOneVsRest and WeightedAllPairs classes in the costsensitive package.
Parameters
----------
X : array (n_samples, n_features)
New observations for which to evaluate actions.
Returns
-------
pred : array (n_samples, n_choices)
Score assigned to each arm for each observation (see Note).
"""
X = _check_X_input(X)
return self.oracle.decision_function(X)
def predict(self, X):
"""
Predict best arm for new data.
Parameters
----------
X : array (n_samples, n_features)
New observations for which to choose an action.
Returns
-------
pred : array (n_samples,)
Actions chosen by this technique.
"""
X = _check_X_input(X)
return self.oracle.predict(X)
def decision_function(self, X, output_score=False, apply_sigmoid_score=True):
"""
Get the scores for each arm following this policy's action-choosing criteria.
Parameters
----------
X : array (n_samples, n_features)
Data for which to obtain decision function scores for each arm.
Returns
-------
scores : array (n_samples, n_choices)
Scores following this policy for each arm.
"""
X = _check_X_input(X)
if not self.is_fitted:
raise ValueError("Object has not been fit to data.")
return self._oracles.predict_proba(X)
def _predict(self, X, exploit = False):
X = _check_X_input(X)
if X.shape[0] == 0:
return np.array([])
if exploit:
return self._oracles.predict(X)
if self.explore_cnt < self.explore_rounds:
self.explore_cnt += X.shape[0]
# case 1: all predictions are within allowance
if self.explore_cnt <= self.explore_rounds:
return np.random.randint(self.nchoices, size = X.shape[0])
# case 2: some predictions are within allowance, others are not
else:
def _check_inputs(self, X, y, sample_weight):
X = _check_X_input(X)
y = _check_1d_inp(y)
assert X.shape[0] == y.shape[0]
if sample_weight is None:
sample_weight = np.ones(X.shape[0])
assert sample_weight.shape[0] == X.shape[0]
sample_weight /= sample_weight.sum()
return X, y, sample_weight
def decision_function_std(self, X):
"""
Get the predicted "probabilities" from each arm from the classifier that predicts it,
standardized to sum up to 1 (note that these are no longer probabilities).
Parameters
----------
X : array (n_samples, n_features)
Data for which to obtain decision function scores for each arm.
Returns
-------
scores : array (n_samples, n_choices)
Scores following this policy for each arm.
"""
X = _check_X_input(X)
if not self.is_fitted:
raise ValueError("Object has not been fit to data.")
return self._oracles.predict_proba(X)