Source code for Policies.CD_UCB

# -*- coding: utf-8 -*-
r""" The CD-UCB generic policy policies for non-stationary bandits.

- Reference: [["A Change-Detection based Framework for Piecewise-stationary Multi-Armed Bandit Problem". F. Liu, J. Lee and N. Shroff. arXiv preprint arXiv:1711.03539, 2017]](https://arxiv.org/pdf/1711.03539)
- It runs on top of a simple policy, e.g., :class:`UCB`, and :class:`UCBLCB_IndexPolicy` is a wrapper:

    >>> policy = UCBLCB_IndexPolicy(nbArms, UCB)
    >>> # use policy as usual, with policy.startGame(), r = policy.choice(), policy.getReward(arm, r)

- It uses an additional :math:`\mathcal{O}(\tau_\max)` memory for a game of maximum stationary length :math:`\tau_\max`.

.. warning:: It can only work on basic index policies based on empirical averages (and an exploration bias), like :class:`UCB`, and cannot work on any Bayesian policy (for which we would have to store all previous observations in order to restart from a reduced history)!
"""
from __future__ import division, print_function  # Python 2 compatibility

__author__ = "Lilian Besson"
__version__ = "0.9"

import numpy as np
from math import log, sqrt, isinf

try:
    from .with_proba import with_proba
    from .BaseWrapperPolicy import BaseWrapperPolicy
    from .UCB import UCB as DefaultPolicy
except ImportError:
    from with_proba import with_proba
    from BaseWrapperPolicy import BaseWrapperPolicy
    from UCB import UCB as DefaultPolicy


#: Whether to be verbose when doing the change detection algorithm.
VERBOSE = False

#: Default probability of random exploration :math:`\alpha`.
PROBA_RANDOM_EXPLORATION = 0.1

#: Should we reset one arm empirical average or all? Default is ``True``, it's usually more efficient!
PER_ARM_RESTART = False
PER_ARM_RESTART = True

#: Should we fully restart the algorithm or simply reset one arm empirical average? Default is ``False``, it's usually more efficient!
FULL_RESTART_WHEN_REFRESH = True
FULL_RESTART_WHEN_REFRESH = False

#: Precision of the test. For CUSUM/PHT, :math:`\varepsilon` is the drift correction threshold (see algorithm).
EPSILON = 0.05

#: Default value of :math:`\lambda`.
LAMBDA = 1

#: Hypothesis on the speed of changes: between two change points, there are at least :math:`M \times K` time steps, where :math:`K` is the number of arms, and :math:`M` is this constant.
MIN_NUMBER_OF_OBSERVATION_BETWEEN_CHANGE_POINT = 50

#: XXX Be lazy and try to detect changes only every X steps, where X is small, like 10 for instance.
#: It is a simple but efficient way to speed up CD tests, see https://github.com/SMPyBandits/SMPyBandits/issues/173
#: Default value is 1, to not use this feature, and 10 should speed up the test by x10.
LAZY_DETECT_CHANGE_ONLY_X_STEPS = 1
LAZY_DETECT_CHANGE_ONLY_X_STEPS = 10
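
# NOTE: the small helper below is an illustrative sketch, *not* part of the
# original module: it only mirrors the gating performed in
# :meth:`CD_IndexPolicy.getReward`, where the (costly) change-detection test is
# run only when the number of pulls of the arm since its last restart is a
# multiple of ``LAZY_DETECT_CHANGE_ONLY_X_STEPS``.
def _example_should_try_to_detect(last_pulls_of_arm, steps=LAZY_DETECT_CHANGE_ONLY_X_STEPS):
    """ Hypothetical helper, for illustration only: with ``steps=10``, only one reward out of every 10 triggers a change-detection test. """
    return (last_pulls_of_arm % steps) == 0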


# --- The very generic class

class CD_IndexPolicy(BaseWrapperPolicy):
    r""" The CD-UCB generic policy for non-stationary bandits, from [["A Change-Detection based Framework for Piecewise-stationary Multi-Armed Bandit Problem". F. Liu, J. Lee and N. Shroff. arXiv preprint arXiv:1711.03539, 2017]](https://arxiv.org/pdf/1711.03539).
    """

    def __init__(self, nbArms,
            full_restart_when_refresh=FULL_RESTART_WHEN_REFRESH,
            per_arm_restart=PER_ARM_RESTART,
            epsilon=EPSILON,
            proba_random_exploration=None,
            lazy_detect_change_only_x_steps=LAZY_DETECT_CHANGE_ONLY_X_STEPS,
            *args, **kwargs
        ):
        super(CD_IndexPolicy, self).__init__(nbArms, *args, **kwargs)
        # New parameters
        self.epsilon = epsilon  #: Parameter :math:`\varepsilon` for the test.
        self.lazy_detect_change_only_x_steps = lazy_detect_change_only_x_steps  #: Be lazy and try to detect changes only every X steps, where X is small, like 10 for instance.
        if proba_random_exploration is not None:
            alpha = max(0, min(1, proba_random_exploration))  # crop to [0, 1]
            self.proba_random_exploration = alpha  #: What they call :math:`\alpha` in their paper: the probability of uniform exploration at each time.
        self._full_restart_when_refresh = full_restart_when_refresh  # Should we fully restart the algorithm or simply reset one arm empirical average?
        self._per_arm_restart = per_arm_restart  # Should we reset one arm empirical average or all?
        # Internal memory
        self.all_rewards = [[] for _ in range(self.nbArms)]  #: Keep in memory all the rewards obtained since the last restart on that arm.
        self.last_pulls = np.zeros(nbArms, dtype=int)  #: Keep in memory the number of pulls of each arm since the last restart.
        self.last_restart_times = np.zeros(nbArms, dtype=int)  #: Keep in memory the times of the last restarts (for each arm).
        self.number_of_restart = 0  #: Keep in memory the total number of restarts.

    def __str__(self):
        return r"CD-{}($\varepsilon={:.3g}$, $\gamma={:.3g}$, {}{})".format(
            self._policy.__name__,
            self.epsilon,
            self.proba_random_exploration,
            "" if self._per_arm_restart else "Global",
            ", lazy detect {}".format(self.lazy_detect_change_only_x_steps) if self.lazy_detect_change_only_x_steps != LAZY_DETECT_CHANGE_ONLY_X_STEPS else "",
        )

    def choice(self):
        r""" With a probability :math:`\alpha`, play uniformly at random, otherwise, pass the call to :meth:`choice` of the underlying policy."""
        if with_proba(self.proba_random_exploration):
            return np.random.randint(self.nbArms)  # uniform exploration over all the arms
        return self.policy.choice()

    def choiceWithRank(self, rank=1):
        r""" With a probability :math:`\alpha`, play uniformly at random, otherwise, pass the call to :meth:`choiceWithRank` of the underlying policy."""
        if with_proba(self.proba_random_exploration):
            return np.random.randint(self.nbArms)  # uniform exploration over all the arms
        return self.policy.choiceWithRank(rank=rank)

    def getReward(self, arm, reward):
        r""" Give a reward: increase t, pulls, and update the cumulated sum of rewards and the small history (sliding window) for that arm (normalized in [0, 1]).

        - Reset the whole empirical average if the change detection algorithm says so, with method :meth:`detect_change`, for this arm at this current time step.

        .. warning:: This is computationally costly, so an easy way to speed up this step is to use :attr:`lazy_detect_change_only_x_steps` :math:`= \mathrm{Step}_t` for a small value (e.g., 10), so the test is not run for all :math:`t\in\mathbb{N}^*` but only for :math:`s\in\mathbb{N}^*` with :math:`s \mod \mathrm{Step}_t = 0` (e.g., one out of every 10 steps).

        .. warning:: If the :meth:`detect_change` method also returns an estimate of the position of the change-point, :math:`\hat{\tau}`, then it is used to reset the memory of the changing arm and keep only the observations from :math:`\hat{\tau}+1` onwards.
        """
        super(CD_IndexPolicy, self).getReward(arm, reward)
        # Get reward
        reward = (reward - self.lower) / self.amplitude
        # We have seen it one more time
        self.last_pulls[arm] += 1
        # Store it in place for the empirical average of that arm
        self.all_rewards[arm].append(reward)

        should_you_try_to_detect = (self.last_pulls[arm] % self.lazy_detect_change_only_x_steps) == 0
        if not should_you_try_to_detect:
            return
        has_detected_and_maybe_position = self.detect_change(arm)
        if isinstance(has_detected_and_maybe_position, bool):
            has_detected, position = has_detected_and_maybe_position, None
        else:
            has_detected, position = has_detected_and_maybe_position
        if not has_detected:
            return

        self.number_of_restart += 1
        if position is None:
            # print("For a player {} a change was detected at time {} for arm {}, after {} pulls of that arm (giving mean reward = {:.3g}). Last restart on that arm was at tau = {}".format(self, self.t, arm, self.last_pulls[arm], np.sum(self.all_rewards[arm]) / self.last_pulls[arm], self.last_restart_times[arm]))  # DEBUG
            if not self._per_arm_restart:
                # or reset current memory for ALL THE arms
                for other_arm in range(self.nbArms):
                    self.last_restart_times[other_arm] = self.t
                    self.last_pulls[other_arm] = 0
                    self.all_rewards[other_arm] = []
            # reset current memory for THIS arm
            self.last_restart_times[arm] = self.t
            self.last_pulls[arm] = 1
            self.all_rewards[arm] = [reward]
            # Fully restart the algorithm ?!
            if self._full_restart_when_refresh:
                self.startGame(createNewPolicy=True)
            # Or simply reset one of the empirical averages?
            else:
                if not self._per_arm_restart:
                    # or reset current memory for ALL THE arms
                    for other_arm in range(self.nbArms):
                        self.policy.rewards[other_arm] = 0
                        self.policy.pulls[other_arm] = 0
                # reset current memory for THIS arm
                self.policy.rewards[arm] = np.sum(self.all_rewards[arm])
                self.policy.pulls[arm] = len(self.all_rewards[arm])
        # XXX second case, for policies implementing the localization trick
        else:
            # print("For a player {} a change was detected at time {} for arm {}, after {} pulls of that arm (giving mean reward = {:.3g}). Last restart on that arm was at tau = {}".format(self, self.t, arm, self.last_pulls[arm], np.sum(self.all_rewards[arm]) / self.last_pulls[arm], self.last_restart_times[arm]))  # DEBUG
            if not self._per_arm_restart:
                # or reset current memory for ALL THE arms
                for other_arm in range(self.nbArms):
                    self.last_restart_times[other_arm] = self.t
                    self.last_pulls[other_arm] = 0
                    self.all_rewards[other_arm] = []
            # reset current memory for THIS arm, keeping only the observations after the estimated change-point
            self.all_rewards[arm] = self.all_rewards[arm][position:] + [reward]
            self.last_pulls[arm] = len(self.all_rewards[arm])
            self.last_restart_times[arm] = self.t - len(self.all_rewards[arm]) + 1
            # Fully restart the algorithm ?!
            if self._full_restart_when_refresh:
                self.startGame(createNewPolicy=True)
            # Or simply reset one of the empirical averages?
            else:
                if not self._per_arm_restart:
                    # or reset current memory for ALL THE arms
                    for other_arm in range(self.nbArms):
                        self.policy.rewards[other_arm] = 0
                        self.policy.pulls[other_arm] = 0
                # reset current memory for THIS arm
                self.policy.rewards[arm] = np.sum(self.all_rewards[arm])
                self.policy.pulls[arm] = len(self.all_rewards[arm])

        # we update the total number of samples available to the underlying policy
        # self.policy.t = sum(self.last_pulls)  # DONE for the generic CD UCB policy, nothing to do here to change self.policy.t
        # XXX See CUSUM_UCB which forces self.policy.t = sum(last_pulls)
        # XXX See GLR_UCB which forces self.policy.t_for_each_arm = t - self.last_restart_times (different t for each arm)!

    def detect_change(self, arm, verbose=VERBOSE):
        """ Try to detect a change in the current arm.

        .. warning:: This is not implemented for the generic CD algorithm, it has to be implemented by a child class of :class:`CD_IndexPolicy`.
        """
        raise NotImplementedError
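
# NOTE: the class below is an illustrative sketch, *not* part of the original
# module: it only shows the contract expected from a child class of
# :class:`CD_IndexPolicy`, namely a :meth:`detect_change` method returning
# either a boolean or a pair ``(has_detected, estimated_change_point_or_None)``.
# The mean-splitting test used here is purely hypothetical; it reuses the
# memory :attr:`all_rewards` and the threshold :attr:`epsilon` defined above.
class _ExampleMeanSplit_IndexPolicy(CD_IndexPolicy):
    """ Hypothetical child class: signal a change on an arm when the empirical means of the first and second halves of its memory differ by at least epsilon. """

    def detect_change(self, arm, verbose=VERBOSE):
        data = self.all_rewards[arm]
        if len(data) < 10:
            # not enough observations since the last restart to split the memory
            return False, None
        half = len(data) // 2
        gap = abs(np.mean(data[:half]) - np.mean(data[half:]))
        if verbose and gap >= self.epsilon:
            print("Example detector: gap {:.3g} >= epsilon {:.3g} on arm {}.".format(gap, self.epsilon, arm))
        return bool(gap >= self.epsilon), None
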
# --- Different change detection algorithms
class SlidingWindowRestart_IndexPolicy(CD_IndexPolicy):
    r""" A more generic implementation is the :class:`Policies.SlidingWindowRestart` class.

    .. warning:: I have no idea if what I wrote is correct or not!
    """

    def detect_change(self, arm, verbose=VERBOSE):
        r""" Try to detect a change in the current arm.

        .. warning:: This one simply uses a sliding window of fixed size :math:`\tau = M K` (e.g., 100 for :math:`M = 50` and :math:`K = 2` arms). A more generic implementation is the :class:`Policies.SlidingWindowRestart` class.
        """
        tau = self.M * self.nbArms
        if self.last_pulls[arm] >= tau and self.pulls[arm] >= tau:
            # Compute the empirical average for that arm
            empirical_average = self.rewards[arm] / self.pulls[arm]
            # And the small empirical average for that arm
            small_empirical_average = np.mean(self.last_rewards[arm])
            if np.abs(empirical_average - small_empirical_average) >= self.epsilon:
                return True, None
        return False, None

# --- UCB-CDP, based on the LCB/UCB test from Mukherjee & Maillard's paper

#: XXX Be lazy and try to detect changes only for :math:`s` taking steps of size ``steps_s``. Default is to have ``steps_s=1``, but using ``steps_s=2`` should already speed up the test by 2.
#: It is a simple but efficient way to speed up these change-detection tests, see https://github.com/SMPyBandits/SMPyBandits/issues/173
#: Default value is 1, to not use this feature, and 10 should speed up the test by x10.
LAZY_TRY_VALUE_S_ONLY_X_STEPS = 1
LAZY_TRY_VALUE_S_ONLY_X_STEPS = 10

#: Default value of ``use_localization`` for policies. All the experiments I tried showed that the localization always helps improve learning, so the default value is set to ``True``.
USE_LOCALIZATION = False
USE_LOCALIZATION = True
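
# NOTE: the helper below is an illustrative sketch, *not* part of the original
# module: it only mirrors the "localization" restart performed in
# :meth:`CD_IndexPolicy.getReward` when the detector returns an estimated
# change-point ``position``: the arm's memory keeps the observations recorded
# after the estimated break-point instead of being wiped entirely.
def _example_localized_restart(all_rewards_of_arm, position, new_reward):
    """ Hypothetical helper: e.g., ``_example_localized_restart([0.9, 0.8, 0.9, 0.1, 0.2], 3, 0.1)`` returns ``[0.1, 0.2, 0.1]``. """
    return all_rewards_of_arm[position:] + [new_reward]
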
class UCBLCB_IndexPolicy(CD_IndexPolicy):
    r""" The UCBLCB-UCB generic policy for non-stationary bandits, from [[Improved Changepoint Detection for Piecewise i.i.d Bandits, by S. Mukherjee & O.-A. Maillard, preprint 2018](https://subhojyoti.github.io/pdf/aistats_2019.pdf)].

    .. warning:: This is still experimental! See https://github.com/SMPyBandits/SMPyBandits/issues/177
    """

    def __init__(self, nbArms,
            delta=None, delta0=1.0,
            lazy_try_value_s_only_x_steps=LAZY_TRY_VALUE_S_ONLY_X_STEPS,
            use_localization=USE_LOCALIZATION,
            *args, **kwargs
        ):
        super(UCBLCB_IndexPolicy, self).__init__(nbArms, per_arm_restart=False, *args, **kwargs)
        # New parameters
        self.proba_random_exploration = 0  #: What they call :math:`\alpha` in their paper: the probability of uniform exploration at each time.
        self.lazy_try_value_s_only_x_steps = lazy_try_value_s_only_x_steps  #: Be lazy and try to detect changes for :math:`s` taking steps of size ``steps_s``.
        self.use_localization = use_localization  #: Experiment to use localization of the break-point, i.e., restart the memory of the arm by keeping the observations :math:`s+1, \dots, n` instead of just the last one.
        self._delta = delta
        self._delta0 = delta0

    def __str__(self):
        args = "{}{}{}".format(
            "lazy detect {}, ".format(self.lazy_detect_change_only_x_steps) if self.lazy_detect_change_only_x_steps != LAZY_DETECT_CHANGE_ONLY_X_STEPS else "",
            "lazy s {}, ".format(self.lazy_try_value_s_only_x_steps) if self.lazy_try_value_s_only_x_steps != LAZY_TRY_VALUE_S_ONLY_X_STEPS else "",
            r"$\delta_0={:.3g}$, ".format(self._delta0) if self._delta0 != 1 else "",
        )
        if args.endswith(', '):
            args = args[:-2]
        args = "({})".format(args) if args else ""
        return r"{}-CDP{}".format(self._policy.__name__, args)

    def delta(self, t):
        r""" Use :math:`\delta = \delta_0` if it was given as an argument to the policy, or :math:`\frac{\delta_0}{t}` as the confidence level of the UCB/LCB test (default is :math:`\delta_0 = 1`).

        .. warning:: It is unclear (in the article) whether :math:`t` is the time since the last restart or the total time.
        """
        if self._delta is not None:
            return self._delta
        else:
            return self._delta0 / t

    def detect_change(self, arm, verbose=VERBOSE):
        r""" Detect a change in the current arm, using the two-sided UCB-LCB algorithm [Mukherjee & Maillard, 2018].

        - Let :math:`\hat{\mu}_{i,t:t'}` be the empirical mean of the rewards obtained for arm :math:`i` from time :math:`t` to :math:`t'`, and :math:`N_{i,t:t'}` the number of samples.
        - Let :math:`S_{i,t:t'} = \sqrt{\frac{\log(4 t^2 / \delta)}{2 N_{i,t:t'}}}` be the length of the confidence interval.
        - When we have data starting at :math:`t_0 = 0` (since the last restart) and up to the current time :math:`t`, for each *arm* :math:`i`,

            - For each intermediate time step :math:`t' \in [t_0, t)`,

                - Compute :math:`LCB_{\text{before}} = \hat{\mu}_{i,t_0:t'} - S_{i,t_0:t'}`,
                - Compute :math:`UCB_{\text{before}} = \hat{\mu}_{i,t_0:t'} + S_{i,t_0:t'}`,
                - Compute :math:`LCB_{\text{after}} = \hat{\mu}_{i,t'+1:t} - S_{i,t'+1:t}`,
                - Compute :math:`UCB_{\text{after}} = \hat{\mu}_{i,t'+1:t} + S_{i,t'+1:t}`,
                - If :math:`UCB_{\text{before}} < LCB_{\text{after}}` or :math:`UCB_{\text{after}} < LCB_{\text{before}}`, then restart.
        """
        for armId in range(self.nbArms):
            data_y = self.all_rewards[armId]
            t0 = 0
            t = len(data_y) - 1
            if t <= 2:
                continue
            mean_all = np.mean(data_y[t0 : t+1])
            mean_before = 0.0
            mean_after = mean_all
            for s in range(t0, t):
                y = data_y[s]
                mean_before = (s * mean_before + y) / (s + 1)
                mean_after = ((t + 1 - s + t0) * mean_after - y) / (t - s + t0)
                if s % self.lazy_try_value_s_only_x_steps != 0:
                    continue
                ucb_lcb_cst = sqrt(log(4 * t / self.delta(t)) / 2.0)
                S_before = ucb_lcb_cst / sqrt(s + 1)
                S_after = ucb_lcb_cst / sqrt(t - s + t0)
                ucb_after = mean_after + S_after
                lcb_before = mean_before - S_before
                if ucb_after < lcb_before:
                    if verbose: print(" - For arm = {}, t0 = {}, s = {}, t = {}, the mean before mu(t0,s) = {:.3g} and the mean after mu(s+1,t) = {:.3g} and the S_before = {:.3g} and S_after = {:.3g}, so UCB_after = {:.3g} < LCB_before = {:.3g}...".format(armId, t0, s, t, mean_before, mean_after, S_before, S_after, ucb_after, lcb_before))
                    return True, t0 + s if self.use_localization else None
                ucb_before = mean_before + S_before
                lcb_after = mean_after - S_after
                if ucb_before < lcb_after:
                    if verbose: print(" - For arm = {}, t0 = {}, s = {}, t = {}, the mean before mu(t0,s) = {:.3g} and the mean after mu(s+1,t) = {:.3g} and the S_before = {:.3g} and S_after = {:.3g}, so UCB_before = {:.3g} < LCB_after = {:.3g}...".format(armId, t0, s, t, mean_before, mean_after, S_before, S_after, ucb_before, lcb_after))
                    return True, t0 + s if self.use_localization else None
        return False, None
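
# --- Example / debugging

# NOTE: the demo below is an illustrative sketch, *not* part of the original
# module. It follows the simulation loop described in the module docstring
# (startGame / choice / getReward), wrapping the default :class:`UCB` index
# policy, and uses hypothetical piecewise-stationary Bernoulli arms whose
# means are swapped at t = 500.
if __name__ == "__main__":
    np.random.seed(0)  # for reproducibility of this sketch only
    nbArms, horizon, change_point = 3, 1000, 500
    means_before, means_after = [0.1, 0.5, 0.9], [0.9, 0.5, 0.1]
    policy = UCBLCB_IndexPolicy(nbArms)  # wraps the default underlying policy (assumed to be UCB)
    policy.startGame()
    for t in range(horizon):
        means = means_before if t < change_point else means_after
        arm = policy.choice()
        reward = float(np.random.rand() < means[arm])  # Bernoulli reward
        policy.getReward(arm, reward)
    print("After {} steps, the change detector triggered {} restart(s).".format(horizon, policy.number_of_restart))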