Source code for Policies.UCBimproved

# -*- coding: utf-8 -*-
""" The UCB-Improved policy for bounded bandits, with knowing the horizon, as an example of successive elimination algorithm.

- Reference: [[Auer et al, 2010](https://link.springer.com/content/pdf/10.1007/s10998-010-3055-6.pdf)].
"""

__author__ = "Lilian Besson"
__version__ = "0.9"

from copy import copy
from numpy import sqrt, log
import numpy as np
np.seterr(divide='ignore')  # XXX dangerous in general, controlled here!

try:
    from .SuccessiveElimination import SuccessiveElimination
except ImportError:
    from SuccessiveElimination import SuccessiveElimination

#: Default value for parameter :math:`\alpha`.
ALPHA = 0.5


# --- Utility functions

[docs]def n_m(horizon, delta_m): r""" Function :math:`\lceil \frac{2 \log(T \Delta_m^2)}{\Delta_m^2} \rceil`.""" return int(np.ceil((2 * np.log(horizon * delta_m**2)) / delta_m**2))
# --- The class
[docs]class UCBimproved(SuccessiveElimination): """ The UCB-Improved policy for bounded bandits, with knowing the horizon, as an example of successive elimination algorithm. - Reference: [[Auer et al, 2010](https://link.springer.com/content/pdf/10.1007/s10998-010-3055-6.pdf)]. """
[docs] def __init__(self, nbArms, horizon=None, alpha=ALPHA, lower=0., amplitude=1.): super(UCBimproved, self).__init__(nbArms, lower=lower, amplitude=amplitude) assert horizon is not None, "Error: UCBimproved requires to know the horizon T." # DEBUG self.horizon = int(horizon) #: Parameter :math:`T` = known horizon of the experiment. self.alpha = alpha #: Parameter alpha #: Set of active arms self.activeArms = list(np.arange(self.nbArms)) #: Current estimate of the gap :math:`\Delta_0` self.estimate_delta = 1 #: Keep in memory the :math:`n_m` quantity, using :func:`n_m` self.max_nb_of_exploration = 1 #: Current round m self.current_m = 0 #: Bound :math:`m = \lfloor \frac{1}{2} \log_2(\frac{T}{e}) \rfloor` self.max_m = int(np.floor(0.5 * np.log2(horizon / np.exp(1)))) #: Also keep in memory when the arm was kicked out of the ``activeArms`` sets, so fake index can be given, if we ask to order the arms for instance. self.when_did_it_leave = np.zeros(self.nbArms, dtype=int) + float('-inf')
[docs] def __str__(self): return r"UCB-Improved($T={}$, $\alpha={:.3g}$)".format(self.horizon, self.alpha)
[docs] def update_activeArms(self): """ Update the set ``activeArms`` of active arms.""" # first compute UCB and LCB assert np.all(self.pulls >= 1), "Error: UCBimproved.update_activeArms() should not be called with min(pulls) = {} but >= 1...".format(np.min(self.pulls)) # DEBUG means = self.rewards / self.pulls exploration_bias = np.sqrt(self.alpha * (np.log(self.horizon * self.estimate_delta**2)) / self.max_nb_of_exploration) LCB = means - exploration_bias UCB = means + exploration_bias max_LCB = np.max(LCB) # now it's time to update activeArms, m, and estimate_delta new_active_arms = [ arm for arm in self.activeArms if UCB[arm] >= max_LCB ] # maybe we eliminated some arms, so we need to keep in memory when the left if len(new_active_arms) < len(self.activeArms): for arm in self.activeArms: if arm not in new_active_arms: # self.when_did_it_leave[arm] = self.current_m # XXX use current_m ? self.when_did_it_leave[arm] = self.t self.activeArms = new_active_arms # then update the rest self.estimate_delta /= 2.0 self.max_nb_of_exploration = n_m(self.horizon, self.estimate_delta) # and finally m self.current_m += 1 assert self.current_m <= self.max_m, "Error: the current m in UCBimproved policy, = {}, should always be smaller than max m = {}...".format(self.current_m, self.max_m) # DEBUG
[docs] def choice(self, recursive=False): r""" In policy based on successive elimination, choosing an arm is the same as choosing an arm from the set of active arms (``self.activeArms``) with method ``choiceFromSubSet``. """ if len(self.activeArms) == 1: return self.activeArms[0] else: arms_not_explored_enough = [ arm for arm in self.activeArms if self.pulls[arm] < self.max_nb_of_exploration ] if len(arms_not_explored_enough) == 0: self.update_activeArms() if not recursive: return self.choice(recursive=True) else: print("Warning: something is wrong for UCBimproved.choice(), in a recursive call it could find any arm not explored enough. Please DEBUG me!") # DEBUG return np.random.choice(self.nbArms) # here we have some arms that are not explored enough return np.random.choice(arms_not_explored_enough)
# --- boring methods
[docs] def computeIndex(self, arm): """ Nothing to do, just copy from ``when_did_it_leave``.""" self.index[arm] = self.when_did_it_leave[arm]