Source code for Policies.Posterior.DiscountedBeta

# -*- coding: utf-8 -*-
r""" Manipulate posteriors of Bernoulli/Beta experiments, for discounted Bayesian policies (:class:`Policies.DiscountedBayesianIndexPolicy`).
"""
from __future__ import division, print_function  # Python 2 compatibility

__author__ = "Lilian Besson"
__version__ = "0.9"

# Local imports
try:
    from .Beta import Beta, bernoulliBinarization

    from .with_proba import with_proba
except (ImportError, SystemError):
    from Beta import Beta, bernoulliBinarization

    from with_proba import with_proba

try:
    from numpy.random import beta as betavariate  # Faster! Yes!
except ImportError:
    from random import betavariate
from scipy.special import btdtri


# --- Constants

#: Default value for the discount factor :math:`\gamma\in(0,1]`, used by :class:`DiscountedBeta` when no ``gamma`` is given.
#: ``0.95`` is empirically a reasonable value for short-term non-stationary experiments.
GAMMA = 0.95


# --- Class

class DiscountedBeta(Beta):
    r""" Manipulate posteriors of Bernoulli/Beta experiments, for discounted Bayesian policies (:class:`Policies.DiscountedBayesianIndexPolicy`).

    - It keeps :math:`\tilde{S}(t)` and :math:`\tilde{F}(t)` the *discounted* counts of successes and failures (S and F).
    - The counts are stored in ``self.N`` as ``[F, S]``: index ``0`` holds the failures, index ``1`` the successes.
    - The prior parameters ``a`` and ``b`` are stored separately (``self._a``, ``self._b``) and are only added to the counts in :meth:`sample` and :meth:`quantile`.
    """

    def __init__(self, gamma=GAMMA, a=1, b=1):
        r""" Create a Beta posterior :math:`\mathrm{Beta}(\alpha, \beta)` with no observation, i.e., :math:`\alpha = 1` and :math:`\beta = 1` by default.

        :param gamma: discount factor, has to be in :math:`(0, 1]`.
        :param a: prior parameter added to the discounted count of successes, has to be ``>= 0``.
        :param b: prior parameter added to the discounted count of failures, has to be ``>= 0``.
        :raises AssertionError: if one of the three parameters is out of range.
        """
        assert a >= 0, "Error: parameter 'a' for Beta posterior has to be >= 0."  # DEBUG
        self._a = a
        assert b >= 0, "Error: parameter 'b' for Beta posterior has to be >= 0."  # DEBUG
        self._b = b
        self.N = [0, 0]  #: Discounted counts ``[F, S]`` of failures and successes (the prior is *not* included).
        assert 0 < gamma <= 1, "Error: for a DiscountedBayesianIndexPolicy policy, the discount factor has to be in (0,1], but it was {}.".format(gamma)  # DEBUG
        if gamma == 1:
            print("Warning: gamma = 1 is stupid, just use a regular Beta posterior!")  # DEBUG
        self.gamma = gamma  #: Discount factor :math:`\gamma\in(0,1]`.

    def __str__(self):
        """Short text representation; it displays the discounted counts only, without the prior."""
        return r"DiscountedBeta(\alpha={:.3g}, \beta={:.3g})".format(self.N[1], self.N[0])

    def reset(self, a=None, b=None):
        """Reset both discounted counts to 0, as when creating a new default DiscountedBeta.

        The prior (``self._a``, ``self._b``) is left untouched. The arguments ``a`` and ``b`` are accepted but ignored.
        """
        # NOTE(review): a and b are presumably kept for signature compatibility with Beta.reset -- confirm.
        self.N = [0, 0]

    def sample(self):
        """Get a random sample from the DiscountedBeta posterior (using :func:`numpy.random.beta`, or :func:`random.betavariate` if numpy is not available).

        - Used only by :class:`Thompson` Sampling and :class:`AdBandits` so far.
        """
        return betavariate(self._a + self.N[1], self._b + self.N[0])

    def quantile(self, p):
        """Return the p quantile of the DiscountedBeta posterior (using :func:`scipy.special.btdtri`).

        - Used only by :class:`BayesUCB` and :class:`AdBandits` so far.
        """
        # Bug: do not call btdtri with (0.5,0.5,0.5) in scipy version < 0.9 (old)
        return btdtri(self._a + self.N[1], self._b + self.N[0], p)

    def forget(self, obs):
        r""" Forget the last observation, and undiscount the count of observations.

        This inverts :meth:`update`:

        .. math::

            \tilde{S}(t) &= (\tilde{S}(t+1) - r(t)) / \gamma,\\
            \tilde{F}(t) &= (\tilde{F}(t+1) - (1 - r(t))) / \gamma.

        Both counts are clamped at 0, like in :meth:`discount` and :meth:`undiscount`, so a call that
        does not match a previous :meth:`update` cannot leave a negative count behind.
        """
        # obs can be a float in [0, 1] and not just in {0, 1}: it is binarized first, as the result is used as an index in self.N
        binaryObs = bernoulliBinarization(obs)
        otherObs = 1 - binaryObs
        self.N[binaryObs] = max(0, (self.N[binaryObs] - 1) / self.gamma)
        self.N[otherObs] = max(0, self.N[otherObs] / self.gamma)

    def update(self, obs):
        r""" Add an observation, and discount the previous observations.

        - If obs is 1, update :math:`\alpha` the count of positive observations,
        - If it is 0, update :math:`\beta` the count of negative observations.
        - But instead of using the plain counts :math:`\tilde{S}(t) = S(t)` and :math:`\tilde{F}(t) = F(t)`, they are updated at each time step using the discount factor :math:`\gamma`:

        .. math::

            \tilde{S}(t+1) &= \gamma \tilde{S}(t) + r(t),\\
            \tilde{F}(t+1) &= \gamma \tilde{F}(t) + (1 - r(t)).

        .. note:: ``obs`` is first passed to :func:`bernoulliBinarization`, and the result is used as the binary reward :math:`r(t)`.
        """
        # obs can be a float in [0, 1] and not just in {0, 1}: it is binarized first, as the result is used as an index in self.N
        binaryObs = bernoulliBinarization(obs)
        otherObs = 1 - binaryObs
        self.N[binaryObs] = self.gamma * self.N[binaryObs] + 1
        self.N[otherObs] = self.gamma * self.N[otherObs]

    def discount(self):
        r""" Simply discount the old observation, when no observation is given at this time.

        .. math::

            \tilde{S}(t+1) &= \gamma \tilde{S}(t),\\
            \tilde{F}(t+1) &= \gamma \tilde{F}(t).
        """
        self.N[0] = max(0, self.gamma * self.N[0])
        self.N[1] = max(0, self.gamma * self.N[1])

    def undiscount(self):
        r""" Simply cancel the discount on the old observation, when no observation is given at this time.

        .. math::

            \tilde{S}(t+1) &= \frac{1}{\gamma} \tilde{S}(t),\\
            \tilde{F}(t+1) &= \frac{1}{\gamma} \tilde{F}(t).
        """
        self.N[0] = max(0, self.N[0] / self.gamma)
        self.N[1] = max(0, self.N[1] / self.gamma)