    def predict(self, X, exploit = False, output_score = False):
        """
        Selects actions according to this policy for new data.

        Parameters
        ----------
        X : array (n_samples, n_features)
            New observations for which to choose an action according to this policy.
        exploit : bool
            Whether to make a prediction according to the policy, or to just choose the
            arm with the highest expected reward according to current models.
        output_score : bool
            Whether to output the score that this method predicted, in case it is desired to use
            it with this package's offpolicy and evaluation modules.

        Returns
        -------
        pred : array (n_samples,) or dict("choice" : array(n_samples,), "score" : array(n_samples,))
            Actions chosen by the policy. If passing output_score=True, it will be a dictionary
            with the chosen arm and the score that the arm got following this policy with the classifiers used.
        """
        if not self.is_fitted:
            return _BasePolicy._predict_random_if_unfit(self, X, output_score)
        X = _check_X_input(X)
        pred = np.zeros((X.shape[0], self.nchoices))
        Parallel(n_jobs=self.njobs, verbose=0, require="sharedmem")(
            delayed(self._predict)(choice, pred, exploit, X) for choice in range(self.nchoices))
        if output_score:
            score_max = np.max(pred, axis=1)
        pred = _BasePolicy._name_arms(self, np.argmax(pred, axis=1))
        if not output_score:
            return pred
        else:
            return {"choice" : pred, "score" : score_max}
        if not self.is_fitted:
            return self._predict_random_if_unfit(X, False)
        X = _check_X_input(X)
        pred = self._oracles.decision_function(X)
        if not exploit:
            change_greedy = np.random.random(size=X.shape[0]) <= self.explore_prob
            if change_greedy.sum() > 0:
                pred[change_greedy, :] = self._crit_active(X[change_greedy, :], pred[change_greedy, :], gradient_calc)
        if self.decay is not None:
            self.explore_prob *= self.decay ** X.shape[0]
        return self._name_arms(np.argmax(pred, axis=1))
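# Hedged numeric sketch (illustration only): the multiplicative decay above
# shrinks the exploration probability once per observation, so a batch of
# X.shape[0] rows applies decay ** batch_size in a single update.
def decayed_explore_prob(explore_prob, decay, batch_size):
    # e.g. decayed_explore_prob(0.2, 0.9997, 1000) ~= 0.2 * 0.7408 ~= 0.148
    return explore_prob * decay ** batch_size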
class SoftmaxExplorer(_BasePolicy):
    """
    SoftMax Explorer

    Selects an action according to probabilities determined by a softmax transformation
    on the scores from the decision function that predicts each class.

    Note
    ----
    Will apply an inverse sigmoid transformation to the probabilities that come from the base algorithm
    before applying the softmax function.

    Parameters
    ----------
    base_algorithm : obj
        Base binary classifier which will be fit to each arm's data.
            Actions chosen by the policy. If passing output_score=True, it will be a dictionary
            with the chosen arm and the score that the arm got following this policy with the classifiers used.
        """
        if not self.is_fitted:
            return self._predict_random_if_unfit(X, output_score)
        scores = self.decision_function(X)
        pred = self._name_arms(np.argmax(scores, axis=1))
        if not output_score:
            return pred
        else:
            score_max = np.max(scores, axis=1).reshape((-1, 1))
            return {"choice" : pred, "score" : score_max}
class EpsilonGreedy(_BasePolicy):
    """
    Epsilon Greedy

    Takes a random action with probability p, or the action with highest
    estimated reward with probability 1-p.

    Parameters
    ----------
    base_algorithm : obj
        Base binary classifier which will be fit to each arm's data.
        Will look for, in this order:
            1) A 'predict_proba' method with outputs (n_samples, 2), values in [0,1], rows summing to 1
            2) A 'decision_function' method with unbounded outputs (n_samples,) to which it will apply a sigmoid function.
            3) A 'predict' method with outputs (n_samples,) with values in [0,1].
        Can also pass a list with a different (or already-fit) classifier for each arm.
    nchoices : int or list-like
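# Hedged sketch of the epsilon-greedy rule described above (illustration only,
# not the library's code): with probability p pick a uniformly random arm,
# otherwise pick the arm with the highest estimated reward.
import numpy as np

def epsilon_greedy_choice(estimated_rewards, p=0.1, rng=np.random.default_rng()):
    # estimated_rewards: array (n_arms,) of current reward estimates
    if rng.random() <= p:
        return rng.integers(len(estimated_rewards))   # explore
    return int(np.argmax(estimated_rewards))          # exploit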
        X = _check_X_input(X)
        if not self.is_fitted:
            raise ValueError("Object has not been fit to data.")
        return self._oracles.decision_function(X)

    def _predict_random_if_unfit(self, X, output_score):
        warnings.warn("Model object has not been fit to data, predictions will be random.")
        X = _check_X_input(X)
        pred = self._name_arms(np.random.randint(self.nchoices, size=X.shape[0]))
        if not output_score:
            return pred
        else:
            return {"choice" : pred, "score" : (1.0 / self.nchoices) * np.ones(X.shape[0], dtype="float64")}
class _BasePolicyWithExploit(_BasePolicy):
    def _add_bootstrapped_inputs(self, base_algorithm, batch_sample_method, nsamples, njobs_samples, percentile):
        assert (batch_sample_method == 'gamma') or (batch_sample_method == 'poisson')
        assert isinstance(nsamples, int)
        assert nsamples >= 2
        self.batch_sample_method = batch_sample_method
        self.nsamples = nsamples
        self.njobs_samples = _check_njobs(njobs_samples)
        if "predict_proba" in dir(base_algorithm):
            self.base_algorithm = _BootstrappedClassifier_w_predict_proba(
                base_algorithm, self.nsamples, percentile,
                self.batch_train, self.batch_sample_method, njobs=self.njobs_samples
            )
        elif "decision_function" in dir(base_algorithm):
            self.base_algorithm = _BootstrappedClassifier_w_decision_function(
                base_algorithm, self.nsamples, percentile,
                self.batch_train, self.batch_sample_method, njobs=self.njobs_samples
            )
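# Hedged sketch of the batch_sample_method idea (a minimal illustration of the
# online-bootstrap technique, not the library's internal classes): instead of
# resampling a batch with replacement, each bootstrap copy of the classifier
# weights every observation with a Poisson(1) count or a Gamma(1,1) draw.
import numpy as np

def bootstrap_sample_weights(n_obs, method="gamma", rng=np.random.default_rng()):
    # one weight per observation, to be passed as sample_weight when
    # fitting a single bootstrap resample of the base classifier
    if method == "gamma":
        return rng.gamma(shape=1.0, scale=1.0, size=n_obs)  # continuous weights
    elif method == "poisson":
        return rng.poisson(lam=1.0, size=n_obs)             # integer counts
    else:
        raise ValueError("method must be 'gamma' or 'poisson'")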
        if np.any(set_greedy):
            self._choose_greedy(set_greedy, X, pred, pred_proba)
        return pred, pred_max

    def _choose_greedy(self, set_greedy, X, pred, pred_all):
        if self.active_choice is None:
            pred[set_greedy] = np.random.randint(self.nchoices, size=set_greedy.sum())
        else:
            pred[set_greedy] = np.argmax(
                self._crit_active(
                    X[set_greedy],
                    pred_all[set_greedy],
                    self.active_choice),
                axis=1)
class ExploreFirst(_BasePolicy):
    """
    Explore First, a.k.a. Explore-Then-Exploit

    Selects random actions for the first N predictions, after which it selects
    only the best arm according to its estimates.

    Parameters
    ----------
    base_algorithm : obj
        Base binary classifier which will be fit to each arm's data.
        Will look for, in this order:
            1) A 'predict_proba' method with outputs (n_samples, 2), values in [0,1], rows summing to 1
            2) A 'decision_function' method with unbounded outputs (n_samples,) to which it will apply a sigmoid function.
            3) A 'predict' method with outputs (n_samples,) with values in [0,1].
        Can also pass a list with a different (or already-fit) classifier for each arm.
    nchoices : int or list-like
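# Hedged sketch of the explore-then-exploit rule described above (illustration
# only): keep a counter of predictions served; act randomly until it passes a
# threshold N, then always act greedily.
import numpy as np

class ExploreFirstSketch:
    def __init__(self, n_arms, explore_rounds, rng=np.random.default_rng()):
        self.n_arms, self.explore_rounds, self.rng = n_arms, explore_rounds, rng
        self.rounds_seen = 0

    def choose(self, estimated_rewards):
        self.rounds_seen += 1
        if self.rounds_seen <= self.explore_rounds:
            return self.rng.integers(self.n_arms)       # still exploring
        return int(np.argmax(estimated_rewards))        # exploit thereafter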
    References
    ----------
    .. [1] Cortes, David. "Adapting multi-armed bandits policies to contextual bandits scenarios."
           arXiv preprint arXiv:1811.04383 (2018).
    .. [2] Chapelle, Olivier, and Lihong Li. "An empirical evaluation of Thompson sampling."
           Advances in Neural Information Processing Systems. 2011.
    """
    def __init__(self, base_algorithm, nchoices, nsamples=10, beta_prior='auto', smoothing=None,
                 batch_train=False, assume_unique_reward=False, batch_sample_method='gamma',
                 njobs_arms=1, njobs_samples=-1):
        self._add_common_params(base_algorithm, beta_prior, smoothing, njobs_arms, nchoices,
                                batch_train, assume_unique_reward, assign_algo=False)
        self._add_bootstrapped_inputs(base_algorithm, batch_sample_method, nsamples, njobs_samples, None)
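# Hedged sketch of bootstrapped Thompson sampling as in the references above
# (a minimal illustration, not the library's implementation): each arm keeps
# several bootstrap resamples of its reward model; at prediction time one
# resample is drawn at random per arm, and the arm with the highest sampled
# estimate is played.
import numpy as np

def bootstrapped_ts_choice(per_arm_samples, rng=np.random.default_rng()):
    # per_arm_samples: list of length n_arms, each an array of reward
    # estimates produced by that arm's bootstrap resamples
    sampled = [arm[rng.integers(len(arm))] for arm in per_arm_samples]
    return int(np.argmax(sampled))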
class SeparateClassifiers(_BasePolicy):
    """
    Separate Classifiers per arm

    Fits one classifier per arm using only the data on which that arm was chosen.
    Predicts as One-Vs-Rest.

    Parameters
    ----------
    base_algorithm : obj
        Base binary classifier which will be fit to each arm's data.
        Will look for, in this order:
            1) A 'predict_proba' method with outputs (n_samples, 2), values in [0,1], rows summing to 1
            2) A 'decision_function' method with unbounded outputs (n_samples,) to which it will apply a sigmoid function.
            3) A 'predict' method with outputs (n_samples,) with values in [0,1].
        Can also pass a list with a different (or already-fit) classifier for each arm.
    nchoices : int or list-like
        if not exploit:
            ix_change_rnd = (np.random.random(size=X.shape[0]) <= self.explore_prob)
            pred[ix_change_rnd] = np.random.randint(self.nchoices, size=ix_change_rnd.sum())
        pred = self._name_arms(pred)
        if self.decay is not None:
            self.explore_prob *= self.decay ** X.shape[0]
        if not output_score:
            return pred
        else:
            score_max = np.max(scores, axis=1).reshape((-1, 1))
            if not exploit:
                score_max[ix_change_rnd] = 1 / self.nchoices
            return {"choice" : pred, "score" : score_max}
class _ActivePolicy(_BasePolicy):
    def _crit_active(self, X, pred, grad_crit):
        for choice in range(self.nchoices):
            if self._oracles.should_calculate_grad(choice) or self._force_fit:
                grad_norms = self._get_grad_norms(self._oracles.algos[choice], X, pred[:, choice])
            else:
                grad_norms = self._rand_grad_norms(X,
                    self._oracles.get_n_pos(choice), self._oracles.get_n_neg(choice))

            if grad_crit == 'min':
                pred[:, choice] = grad_norms.min(axis=1)
            elif grad_crit == 'max':
                pred[:, choice] = grad_norms.max(axis=1)
            elif grad_crit == 'weighted':
                pred[:, choice] = (pred[:, choice].reshape((-1, 1)) * grad_norms).sum(axis=1)
            else:
                raise ValueError("grad_crit must be one of 'min', 'max', or 'weighted'")
        return pred
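# Hedged sketch of the active-learning criterion above (a minimal illustration
# under assumed shapes, not the library's gradient formulas): for each arm,
# compute the gradient norm an observation would induce if its label turned
# out to be 0 or 1, then collapse the two hypothetical norms with the chosen
# criterion ('min', 'max', or probability-'weighted').
import numpy as np

def collapse_grad_norms(grad_norms, proba, grad_crit="weighted"):
    # grad_norms: array (n_samples, 2) with hypothetical norms for labels 0 and 1
    # proba: array (n_samples,) with the arm's predicted probability of label 1
    if grad_crit == "min":
        return grad_norms.min(axis=1)
    elif grad_crit == "max":
        return grad_norms.max(axis=1)
    elif grad_crit == "weighted":
        weights = np.column_stack([1.0 - proba, proba])  # P(y=0), P(y=1)
        return (weights * grad_norms).sum(axis=1)
    else:
        raise ValueError("unknown grad_crit")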