# Source code for Policies.CD_UCB

# -*- coding: utf-8 -*-
r""" The CD-UCB generic policy policies for non-stationary bandits.

- Reference: [["A Change-Detection based Framework for Piecewise-stationary Multi-Armed Bandit Problem". F. Liu, J. Lee and N. Shroff. arXiv preprint arXiv:1711.03539, 2017]](https://arxiv.org/pdf/1711.03539)
- It runs on top of a simple policy, e.g., :class:`UCB`, and :class:`UCBLCB_IndexPolicy` is a wrapper:

>>> policy = UCBLCB_IndexPolicy(nbArms, UCB)
>>> # use policy as usual, with policy.startGame(), r = policy.choice(), policy.getReward(arm, r)

- It uses an additional :math:`\mathcal{O}(\tau_\max)` memory for a game of maximum stationary length :math:`\tau_\max`.

.. warning:: It can only work on basic index policies based on empirical averages (and an exploration bias), like :class:`UCB`, and cannot work on any Bayesian policy (for which we would have to remember all previous observations in order to reset the history with a small history)!
"""
from __future__ import division, print_function  # Python 2 compatibility

__author__ = "Lilian Besson"
__version__ = "0.9"

import numpy as np
from math import log, sqrt, isinf

# Import helpers: relative imports when used as a package, absolute when run as a script.
try:
    from .with_proba import with_proba
    from .BaseWrapperPolicy import BaseWrapperPolicy
    from .UCB import UCB as DefaultPolicy
except ImportError:
    from with_proba import with_proba
    from BaseWrapperPolicy import BaseWrapperPolicy
    from UCB import UCB as DefaultPolicy

#: Whether to be verbose when doing the change detection algorithm.
VERBOSE = False

#: Default probability of random exploration :math:`\alpha`.
PROBA_RANDOM_EXPLORATION = 0.1

#: Should we reset one arm empirical average or all? Default is True (per-arm restart), it's usually more efficient!
#: (Set to False to reset the memory of ALL arms on a detected change.)
PER_ARM_RESTART = True

#: Should we fully restart the algorithm or simply reset one arm empirical average? Default is False, it's usually more efficient!
FULL_RESTART_WHEN_REFRESH = False

#: Precision of the test. For CUSUM/PHT, :math:`\varepsilon` is the drift correction threshold (see algorithm).
EPSILON = 0.05

#: Default value of :math:`\lambda`.
LAMBDA = 1

#: Hypothesis on the speed of changes: between two change points, there is at least :math:`M * K` time steps, where K is the number of arms, and M is this constant.
MIN_NUMBER_OF_OBSERVATION_BETWEEN_CHANGE_POINT = 50

#: XXX Be lazy and try to detect changes only every X steps, where X is small like 10 for instance.
#: It is a simple but efficient way to speed up CD tests, see https://github.com/SMPyBandits/SMPyBandits/issues/173
#: Use 1 to disable this feature; 10 should speed up the test by about x10.
LAZY_DETECT_CHANGE_ONLY_X_STEPS = 10
# --- The very generic class

class CD_IndexPolicy(BaseWrapperPolicy):
    r""" The CD-UCB generic policy for non-stationary bandits, from [["A Change-Detection based Framework for Piecewise-stationary Multi-Armed Bandit Problem". F. Liu, J. Lee and N. Shroff. arXiv preprint arXiv:1711.03539, 2017]](https://arxiv.org/pdf/1711.03539).

    It wraps a stationary index policy (e.g., :class:`UCB`) and restarts (part of) its memory
    whenever the change-detection test :meth:`detect_change` fires on the played arm.
    """

    def __init__(self, nbArms,
            full_restart_when_refresh=FULL_RESTART_WHEN_REFRESH,
            per_arm_restart=PER_ARM_RESTART,
            epsilon=EPSILON,
            proba_random_exploration=None,
            lazy_detect_change_only_x_steps=LAZY_DETECT_CHANGE_ONLY_X_STEPS,
            *args, **kwargs
        ):
        super(CD_IndexPolicy, self).__init__(nbArms, *args, **kwargs)
        # New parameters
        self.epsilon = epsilon  #: Parameter :math:`\varepsilon` for the test.
        self.lazy_detect_change_only_x_steps = lazy_detect_change_only_x_steps  #: Be lazy and try to detect changes only every X steps, where X is small like 10 for instance.
        if proba_random_exploration is not None:
            alpha = max(0, min(1, proba_random_exploration))  # crop to [0, 1]
            self.proba_random_exploration = alpha  #: What they call :math:`\alpha` in their paper: the probability of uniform exploration at each time.
        # NOTE(review): if proba_random_exploration is None, the attribute is NOT set here;
        # presumably a base class or subclass sets it (see UCBLCB_IndexPolicy) -- confirm before relying on it.
        self._full_restart_when_refresh = full_restart_when_refresh  # Should we fully restart the algorithm or simply reset one arm empirical average?
        self._per_arm_restart = per_arm_restart  # Should we reset one arm empirical average or all?
        # Internal memory
        self.all_rewards = [[] for _ in range(self.nbArms)]  #: Keep in memory all the rewards obtained since the last restart on that arm.
        self.last_pulls = np.zeros(nbArms, dtype=int)  #: Number of pulls of each arm since its last restart (initialized at 0).
        self.last_restart_times = np.zeros(nbArms, dtype=int)  #: Keep in memory the times of last restarts (for each arm).
        self.number_of_restart = 0  #: Keep in memory the number of restarts.

    def __str__(self):
        return r"CD-{}($\varepsilon={:.3g}$, $\gamma={:.3g}$, {}{})".format(self._policy.__name__, self.epsilon, self.proba_random_exploration, "" if self._per_arm_restart else "Global", ", lazy detect {}".format(self.lazy_detect_change_only_x_steps) if self.lazy_detect_change_only_x_steps != LAZY_DETECT_CHANGE_ONLY_X_STEPS else "")

    def choice(self):
        r""" With a probability :math:`\alpha`, play uniformly at random, otherwise, pass the call to :meth:`choice` of the underlying policy."""
        if with_proba(self.proba_random_exploration):
            # FIXED: was np.random.randint(0, self.nbArms - 1), which never plays the
            # last arm since the high bound of randint is exclusive.
            return np.random.randint(self.nbArms)
        return self.policy.choice()

    def choiceWithRank(self, rank=1):
        r""" With a probability :math:`\alpha`, play uniformly at random, otherwise, pass the call to :meth:`choiceWithRank` of the underlying policy."""
        if with_proba(self.proba_random_exploration):
            # FIXED: same exclusive-high-bound bug as in choice().
            return np.random.randint(self.nbArms)
        # FIXED: was forwarding rank=1 instead of the requested rank.
        return self.policy.choiceWithRank(rank=rank)

    def getReward(self, arm, reward):
        r""" Give a reward: increase t, pulls, and update cumulated sum of rewards and update small history (sliding window) for that arm (normalized in [0, 1]).

        - Reset the whole empirical average if the change detection algorithm says so, with method :meth:`detect_change`, for this arm at this current time step.

        .. warning:: This is computationally costly, so an easy way to speed up this step is to use :attr:`lazy_detect_change_only_x_steps` :math:`= \mathrm{Step_t}` for a small value (e.g., 10), so not test for all :math:`t\in\mathbb{N}^*` but only :math:`s\in\mathbb{N}^*, s \% \mathrm{Step_t} = 0` (e.g., one out of every 10 steps).

        .. warning:: If the :meth:`detect_change` method also returns an estimate of the position of the change-point, :math:`\hat{\tau}`, then it is used to reset the memory of the changing arm and keep the observations from :math:`\hat{\tau}+1`.
        """
        super(CD_IndexPolicy, self).getReward(arm, reward)
        # Normalize the reward to [0, 1]
        reward = (reward - self.lower) / self.amplitude
        # We have seen this arm one more time
        self.last_pulls[arm] += 1
        # Store it in place for the empirical average of that arm
        self.all_rewards[arm].append(reward)

        # Be lazy: only run the (costly) CD test one pull out of every X
        if (self.last_pulls[arm] % self.lazy_detect_change_only_x_steps) != 0:
            return
        has_detected_and_maybe_position = self.detect_change(arm)
        # detect_change may return a bare bool, or a (bool, position) pair
        if isinstance(has_detected_and_maybe_position, bool):
            has_detected, position = has_detected_and_maybe_position, None
        else:
            has_detected, position = has_detected_and_maybe_position
        if not has_detected:
            return
        self.number_of_restart += 1
        self._restart_local_memory(arm, reward, position)
        self._restart_underlying_policy(arm)
        # we update the total number of samples available to the underlying policy
        # self.policy.t = sum(self.last_pulls)  # DONE for the generic CD UCB policy, nothing to do here to change self.policy.t
        # XXX See CUSUM_UCB which forces self.policy.t = sum(last_pulls)
        # XXX See GLR_UCB which forces self.policy.t_for_each_arm = t - self.last_restart_times (different t for each arm)!

    def _restart_local_memory(self, arm, reward, position):
        """ Reset this wrapper's own memory after a detected change on ``arm``.

        - If ``position is None``, keep only the latest (normalized) ``reward`` for that arm;
        - otherwise (localization trick), keep the observations after the estimated change point ``position``.
        """
        if not self._per_arm_restart:
            # reset current memory for ALL the arms
            for other_arm in range(self.nbArms):
                self.last_restart_times[other_arm] = self.t
                self.last_pulls[other_arm] = 0
                self.all_rewards[other_arm] = []
        if position is None:
            # reset current memory for THIS arm, keeping only the latest observation
            self.last_restart_times[arm] = self.t
            self.last_pulls[arm] = 1
            self.all_rewards[arm] = [reward]
        else:
            # localization trick: keep observations position..end, plus the latest reward
            # NOTE(review): ``reward`` was already appended above, so with per-arm restart the
            # slice already ends with it and ``+ [reward]`` stores it twice -- preserved as in
            # the original, but confirm this duplication is intended.
            self.all_rewards[arm] = self.all_rewards[arm][position:] + [reward]
            self.last_pulls[arm] = len(self.all_rewards[arm])
            self.last_restart_times[arm] = self.t - len(self.all_rewards[arm]) + 1

    def _restart_underlying_policy(self, arm):
        """ Restart the wrapped policy after a detected change on ``arm``: either fully restart it, or reset its empirical averages from this wrapper's (already reset) memory."""
        # Fully restart the algorithm ?!
        if self._full_restart_when_refresh:
            self.startGame(createNewPolicy=True)
        # Or simply reset one of the empirical averages?
        else:
            if not self._per_arm_restart:
                # or reset current memory for ALL the arms
                for other_arm in range(self.nbArms):
                    self.policy.rewards[other_arm] = 0
                    self.policy.pulls[other_arm] = 0
            # reset current memory for THIS arm
            self.policy.rewards[arm] = np.sum(self.all_rewards[arm])
            self.policy.pulls[arm] = len(self.all_rewards[arm])

    def detect_change(self, arm, verbose=VERBOSE):
        """ Try to detect a change in the current arm.

        .. warning:: This is not implemented for the generic CD algorithm, it has to be implemented by a child of the class :class:`CD_IndexPolicy`.
        """
        raise NotImplementedError

# --- Different change detection algorithms

class SlidingWindowRestart_IndexPolicy(CD_IndexPolicy):
    r""" A more generic implementation is the :class:`Policies.SlidingWindowRestart` class.

    .. warning:: I have no idea if what I wrote is correct or not!
    """

    def detect_change(self, arm, verbose=VERBOSE):
        """ Detect a change by comparing the long-run empirical mean of the arm with the mean of its recent (sliding-window) history.

        .. warning:: This one is simply using a sliding-window of fixed size = 100. A more generic implementation is the :class:`Policies.SlidingWindowRestart` class.

        .. note:: NOTE(review): this relies on ``self.M``, ``self.rewards``, ``self.pulls`` and ``self.last_rewards``, none of which are defined in this class or in :class:`CD_IndexPolicy` -- presumably provided by a cooperating mixin or subclass; confirm before use.
        """
        tau = self.M * self.nbArms
        if self.last_pulls[arm] >= tau and self.pulls[arm] >= tau:
            # Compute the empirical average for that arm
            empirical_average = self.rewards[arm] / self.pulls[arm]
            # And the small empirical average for that arm
            small_empirical_average = np.mean(self.last_rewards[arm])
            # Change detected when the two means differ by more than epsilon
            if np.abs(empirical_average - small_empirical_average) >= self.epsilon:
                return True, None
        return False, None

# --- UCB-CDP based on LCB/UCB Mukherjee & Maillard's paper

#: XXX Be lazy and try to detect changes for :math:`s` taking steps of size ``steps_s``.
#: It is a simple but efficient way to speed up GLR tests, see https://github.com/SMPyBandits/SMPyBandits/issues/173
#: Use 1 to disable this feature; 10 should speed up the test by about x10 (even 2 already halves the work).
LAZY_TRY_VALUE_S_ONLY_X_STEPS = 10

#: Default value of ``use_localization`` for policies. All the experiments I tried showed that the localization always helps improving learning, so the default value is set to True.
USE_LOCALIZATION = True

class UCBLCB_IndexPolicy(CD_IndexPolicy):
    r""" The UCBLCB-UCB generic policy for non-stationary bandits, from [[Improved Changepoint Detection for Piecewise i.i.d Bandits, by S. Mukherjee  & O.-A. Maillard, preprint 2018](https://subhojyoti.github.io/pdf/aistats_2019.pdf)].

    .. warning:: This is still experimental! See https://github.com/SMPyBandits/SMPyBandits/issues/177
    """

    def __init__(self, nbArms,
            delta=None, delta0=1.0,
            lazy_try_value_s_only_x_steps=LAZY_TRY_VALUE_S_ONLY_X_STEPS,
            use_localization=USE_LOCALIZATION,
            *args, **kwargs
        ):
        # This policy always restarts globally (per_arm_restart=False)
        super(UCBLCB_IndexPolicy, self).__init__(nbArms, per_arm_restart=False, *args, **kwargs)
        # New parameters
        self.proba_random_exploration = 0  #: What they call :math:`\alpha` in their paper: the probability of uniform exploration at each time.
        self.lazy_try_value_s_only_x_steps = lazy_try_value_s_only_x_steps  #: Be lazy and try to detect changes for :math:`s` taking steps of size ``steps_s``.
        self.use_localization = use_localization  #: experiment to use localization of the break-point, ie, restart memory of arm by keeping observations s+1...n instead of just the last one
        self._delta = delta    # fixed confidence level, or None to use delta0/t
        self._delta0 = delta0  # numerator of the time-dependent confidence level

    def __str__(self):
        args = "{}{}{}".format(
            "lazy detect {}, ".format(self.lazy_detect_change_only_x_steps) if self.lazy_detect_change_only_x_steps != LAZY_DETECT_CHANGE_ONLY_X_STEPS else "",
            "lazy s {}, ".format(self.lazy_try_value_s_only_x_steps) if self.lazy_try_value_s_only_x_steps != LAZY_TRY_VALUE_S_ONLY_X_STEPS else "",
            r"$\delta_0={:.3g}$, ".format(self._delta0) if self._delta0 != 1 else "",
        )
        if args.endswith(', '): args = args[:-2]
        args = "({})".format(args) if args else ""
        return r"{}-CDP{}".format(self._policy.__name__, args)

    def delta(self, t):
        r""" Use :math:`\delta = \delta_0` if it was given as an argument to the policy, or :math:`\frac{\delta_0}{t}` as the confidence level of UCB/LCB test (default is :math:`\delta_0=1`).

        .. warning:: It is unclear (in the article) whether :math:`t` is the time since the last restart or the total time?
        """
        if self._delta is not None:
            return self._delta
        else:
            return self._delta0 / t

    def detect_change(self, arm, verbose=VERBOSE):
        r""" Detect a change in the current arm, using the two-sided UCB-LCB algorithm [Mukherjee & Maillard, 2018].

        - Let :math:`\hat{\mu}_{i,t:t'}` the empirical mean of rewards obtained for arm i from time :math:`t` to :math:`t'`, and :math:`N_{i,t:t'}` the number of samples.
        - Let :math:`S_{i,t:t'} = \sqrt{\frac{\log(4 t^2 / \delta)}{2 N_{i,t:t'}}}` the length of the confidence interval.

        - When we have data starting at :math:`t_0=0` (since last restart) and up-to current time :math:`t`, for each *arm* i,
            - For each intermediate time steps :math:`t' \in [t_0, t)`,
                - Compute :math:`LCB_{\text{before}} = \hat{\mu}_{i,t_0:t'} - S_{i,t_0:t'}`,
                - Compute :math:`UCB_{\text{before}} = \hat{\mu}_{i,t_0:t'} + S_{i,t_0:t'}`,
                - Compute :math:`LCB_{\text{after}} = \hat{\mu}_{i,t'+1:t} - S_{i,t'+1:t}`,
                - Compute :math:`UCB_{\text{after}} = \hat{\mu}_{i,t'+1:t} + S_{i,t'+1:t}`,
                - If :math:`UCB_{\text{before}} < LCB_{\text{after}}` or :math:`UCB_{\text{after}} < LCB_{\text{before}}`, then restart.
        """
        # Note: the test scans ALL the arms, not only the one just played
        for armId in range(self.nbArms):
            data_y = self.all_rewards[armId]
            t0 = 0
            t = len(data_y) - 1
            if t <= 2:
                continue  # not enough data on this arm yet
            mean_all = np.mean(data_y[t0 : t+1])
            # Incrementally maintained means of data[t0..s] and data[s+1..t]
            mean_before = 0.0
            mean_after = mean_all
            # The confidence-width constant only depends on t: hoisted out of the inner loop
            ucb_lcb_cst = sqrt(log(4 * t / self.delta(t)) / 2.0)
            for s in range(t0, t):
                y = data_y[s]
                mean_before = (s * mean_before + y) / (s + 1)
                mean_after = ((t + 1 - s + t0) * mean_after - y) / (t - s + t0)
                # Be lazy: only test one value of s out of every X
                if s % self.lazy_try_value_s_only_x_steps != 0:
                    continue

                S_before = ucb_lcb_cst / sqrt(s + 1)
                S_after  = ucb_lcb_cst / sqrt(t - s + t0)
                ucb_after  = mean_after  + S_after
                lcb_before = mean_before - S_before

                if ucb_after < lcb_before:
                    if verbose: print("  - For arm = {}, t0 = {}, s = {}, t = {}, the mean before mu(t0,s) = {:.3g} and the mean after mu(s+1,t) = {:.3g} and the S_before = {:.3g} and S_after = {:.3g}, so UCB_after = {:.3g} < LCB_before = {:.3g}...".format(armId, t0, s, t, mean_before, mean_after, S_before, S_after, ucb_after, lcb_before))
                    return True, (t0 + s) if self.use_localization else None

                ucb_before = mean_before + S_before
                lcb_after  = mean_after  - S_after

                if ucb_before < lcb_after:
                    if verbose: print("  - For arm = {}, t0 = {}, s = {}, t = {}, the mean before mu(t0,s) = {:.3g} and the mean after mu(s+1,t) = {:.3g} and the S_before = {:.3g} and S_after = {:.3g}, so UCB_before = {:.3g} < LCB_after = {:.3g}...".format(armId, t0, s, t, mean_before, mean_after, S_before, S_after, ucb_before, lcb_after))
                    return True, (t0 + s) if self.use_localization else None
        return False, None