# -*- coding: utf-8 -*-
""" The UCB-V policy for bounded bandits, with a variance correction term.
Reference: [Audibert, Munos, & Szepesvári - Theoret. Comput. Sci., 2009].
"""
from __future__ import division, print_function # Python 2 compatibility
__author__ = "Olivier Cappé, Aurélien Garivier, Lilian Besson"
__version__ = "0.5"
from math import sqrt, log
import numpy as np
np.seterr(divide='ignore') # XXX dangerous in general, controlled here!
try:
from .UCB import UCB
except ImportError:
from UCB import UCB
[docs]class UCBV(UCB):
""" The UCB-V policy for bounded bandits, with a variance correction term.
Reference: [Audibert, Munos, & Szepesvári - Theoret. Comput. Sci., 2009].
"""
[docs] def __str__(self):
return "UCB-V"
[docs] def __init__(self, nbArms, lower=0., amplitude=1.):
super(UCBV, self).__init__(nbArms, lower=lower, amplitude=amplitude)
self.rewardsSquared = np.zeros(self.nbArms) #: Keep track of squared of rewards, to compute an empirical variance
[docs] def startGame(self):
super(UCBV, self).startGame()
self.rewardsSquared.fill(0)
[docs] def getReward(self, arm, reward):
"""Give a reward: increase t, pulls, and update cumulated sum of rewards and of rewards squared for that arm (normalized in [0, 1])."""
super(UCBV, self).getReward(arm, reward)
self.rewardsSquared[arm] += ((reward - self.lower) / self.amplitude) ** 2
[docs] def computeIndex(self, arm):
r""" Compute the current index, at time t and after :math:`N_k(t)` pulls of arm k:
.. math::
\hat{\mu}_k(t) &= \frac{X_k(t)}{N_k(t)}, \\
V_k(t) &= \frac{Z_k(t)}{N_k(t)} - \hat{\mu}_k(t)^2, \\
I_k(t) &= \hat{\mu}_k(t) + \sqrt{\frac{2 \log(t) V_k(t)}{N_k(t)}} + 3 (b - a) \frac{\log(t)}{N_k(t)}.
Where rewards are in :math:`[a, b]`, and :math:`V_k(t)` is an estimator of the variance of rewards,
obtained from :math:`X_k(t) = \sum_{\sigma=1}^{t} 1(A(\sigma) = k) r_k(\sigma)` is the sum of rewards from arm k,
and :math:`Z_k(t) = \sum_{\sigma=1}^{t} 1(A(\sigma) = k) r_k(\sigma)^2` is the sum of rewards *squared*.
"""
if self.pulls[arm] < 1:
return float('+inf')
else:
mean = self.rewards[arm] / self.pulls[arm] # Mean estimate
variance = (self.rewardsSquared[arm] / self.pulls[arm]) - mean ** 2 # Variance estimate
return mean + sqrt(2.0 * log(self.t) * variance / self.pulls[arm]) + 3.0 * self.amplitude * log(self.t) / self.pulls[arm]
[docs] def computeAllIndex(self):
""" Compute the current indexes for all arms, in a vectorized manner."""
means = self.rewards / self.pulls # Mean estimate
variances = (self.rewardsSquared / self.pulls) - means ** 2 # Variance estimate
indexes = means + np.sqrt(2.0 * np.log(self.t) * variances / self.pulls) + 3.0 * self.amplitude * np.log(self.t) / self.pulls
indexes[self.pulls < 1] = float('+inf')
self.index[:] = indexes