# Source code for Policies.CORRAL

```
# -*- coding: utf-8 -*-
r""" The CORRAL aggregation bandit algorithm, similar to Exp4 but not exactly equivalent.
The algorithm is a master A, managing several "slave" algorithms, :math:`A_1, ..., A_N`.
- At every step, one slave algorithm is selected, by a random selection from a trust distribution on :math:`[1,...,N]`.
- Then its decision is listened to and played by the master algorithm, and a feedback reward is received.
- The reward is reweighted by the trust of the listened algorithm, and given back to it.
- The other slaves, whose decision was not even asked, receive a zero reward, or no reward at all.
- The trust probabilities are first uniform, :math:`P_i = 1/N`, and then at every step, after receiving the feedback for *one* arm k (the reward), the trust :math:`P_i` in each slave :math:`A_i` is updated according to the reward received.
- The details about how to increase or decrease the probabilities are specified in the reference article.
.. note:: Reference: [["Corralling a Band of Bandit Algorithms", by A. Agarwal, H. Luo, B. Neyshabur, R.E. Schapire, 01.2017](https://arxiv.org/abs/1612.06246v2)].
"""
from __future__ import division, print_function # Python 2 compatibility
__author__ = "Lilian Besson"
__version__ = "0.6"
import numpy as np
import numpy.random as rn
from scipy.optimize import minimize_scalar
try:
from .BasePolicy import BasePolicy
except ImportError:
from BasePolicy import BasePolicy
# --- Renormalize function
def renormalize_reward(reward, lower=0., amplitude=1., trust=1., unbiased=True, mintrust=None):
    r"""Rescale a raw reward from `[lower, lower + amplitude]`, optionally importance-weighted.

    - If `unbiased` is `False`, return `(reward - lower) / amplitude`.
    - If `unbiased` is `True` and `mintrust` is `None`, additionally divide by `trust`.
    - If `unbiased` is `True` and `mintrust` is given, divide by `trust / mintrust` instead of `trust`.

    .. warning:: If `mintrust` is unknown, the unbiased estimator CANNOT be projected back to a bounded interval.
    """
    shifted = reward - lower
    # Biased estimator: plain affine rescaling, the trust is ignored.
    if not unbiased:
        return shifted / amplitude
    # Unbiased estimator without a known lower bound on the trust.
    if mintrust is None:
        return shifted / (amplitude * trust)
    # Unbiased estimator, rescaled by the smallest possible trust.
    return shifted / (amplitude * (trust / mintrust))
def unnormalize_reward(reward, lower=0., amplitude=1.):
    r"""Map a normalized reward back onto `[lower, lower + amplitude]`.

    Computes `lower + reward * amplitude`.
    """
    stretched = reward * amplitude
    return lower + stretched
# --- Log-Barrier-OMD
def log_Barrier_OMB(trusts, losses, rates):
    r""" A step of the *log-barrier Online Mirror Descent*, updating the trusts:

    - Find :math:`\lambda \in [\min_i l_{t,i}, \max_i l_{t,i}]` such that :math:`\sum_i \frac{1}{1/p_{t,i} + \eta_{t,i}(l_{t,i} - \lambda)} = 1`.
    - Return :math:`\mathbf{p}_{t+1,i}` such that :math:`\frac{1}{p_{t+1,i}} = \frac{1}{p_{t,i}} + \eta_{t,i}(l_{t,i} - \lambda)`.

    - Note: uses :func:`scipy.optimize.minimize_scalar` for the optimization.
    - Reference: [Learning in games: Robustness of fast convergence, by D.Foster, Z.Li, T.Lykouris, K.Sridharan, and E.Tardos, NIPS 2016].

    :param trusts: current trust of each child, :math:`p_{t,i}` (any array-like of floats).
    :param losses: loss charged to each child at this step, :math:`l_{t,i}` (array-like, same length).
    :param rates: learning rate of each child, :math:`\eta_{t,i}` (array-like, same length).
    :return: a new array of trusts, renormalized to sum to 1. The inputs are not modified.
    :raises AssertionError: if one of the internal sanity checks on the interval or on the resulting vector fails.
    """
    # Accept lists/tuples as well as arrays: the arithmetic below needs elementwise operations.
    trusts = np.asarray(trusts, dtype=float)
    losses = np.asarray(losses, dtype=float)
    rates = np.asarray(rates, dtype=float)
    # Loop-invariant: computed once instead of at every evaluation of the objective.
    inverse_trusts = 1. / trusts

    # The lower end of the search interval is clipped at 0.
    min_loss = max(0, np.min(losses))
    max_loss = np.max(losses)

    def objective(a_loss):
        """Squared gap between the sum of candidate trusts and 1, for the shift `a_loss`."""
        lhs = np.sum(1. / (inverse_trusts + rates * (losses - a_loss)))
        return (lhs - 1.) ** 2

    assert min_loss <= max_loss, "Error: the interval [min_loss, max_loss] = [{:.3g}, {:.3g}] is not a valid constraint...".format(min_loss, max_loss)  # DEBUG
    if min_loss == max_loss:
        # Degenerate interval: the only admissible shift is the bound itself, no need to call the optimizer.
        best_loss = min_loss
    else:
        result = minimize_scalar(objective, bounds=(min_loss, max_loss), method='bounded')
        best_loss = result.x
    assert min_loss <= best_loss <= max_loss, "Error: the loss 'lambda={:.3g}' was supposed to be found in [min_loss, max_loss] = [{:.3g}, {:.3g}]...".format(best_loss, min_loss, max_loss)  # DEBUG

    new_trusts = 1. / (inverse_trusts + rates * (losses - best_loss))
    # The optimizer only finds the shift approximately, so renormalize explicitly.
    new_trusts /= np.sum(new_trusts)
    assert np.isclose(np.sum(new_trusts), 1), "Error: the new trusts vector = {} was supposed to sum to 1 but does not...".format(list(new_trusts))  # DEBUG

    if not np.all(new_trusts >= 0):
        print("Warning: the new trusts vector = {} was supposed to be a valid probability >= 0, but it is not... Let's cheat!".format(list(new_trusts)))  # DEBUG)
        # Fallback: rescale so the most negative entry becomes -1, shift by +1 so it becomes 0, renormalize.
        x = np.min(new_trusts)
        assert x < 0
        new_trusts /= np.abs(x)
        new_trusts += 1
        assert np.isclose(np.min(new_trusts), 0)
        assert np.all(new_trusts >= 0)
        new_trusts /= np.sum(new_trusts)

    assert np.all(new_trusts >= 0), "Error: the new trusts vector = {} was supposed to be a valid probability >= 0, but it is not...".format(list(new_trusts))  # DEBUG
    assert np.all(new_trusts <= 1), "Error: the new trusts vector = {} was supposed to be a valid probability <= 1, but it is not...".format(list(new_trusts))  # DEBUG
    return new_trusts
# --- Parameters for the CORRAL algorithm
# Default values for the parameters
#: Default value of the ``unbiased`` flag of :class:`CORRAL`.
#: When true, the loss charged to a child is divided by the probability with which that child was sampled
#: (importance-weighted, unbiased estimator); when false, the plain loss :math:`1 - r_t` is used.
#: It seemed to work better with unbiased estimators.
UNBIASED = True
#: Default value of the ``broadcast_all`` flag of :class:`CORRAL`.
#: When false, only the child that was listened to is asked for a decision and given the reward;
#: when true, every child is asked for a decision and given the reward, and every child that voted for the played arm is charged the loss.
BROADCAST_ALL = False
# --- CORRAL algorithm
class CORRAL(BasePolicy):
    """ The CORRAL aggregation bandit algorithm, similar to Exp4 but not exactly equivalent.

    The master samples one child policy from the smoothed trust distribution ``bar_trusts``, plays the arm
    chosen by that child, and after each reward updates the trusts with :func:`log_Barrier_OMB`.
    """

    def __init__(self, nbArms, children=None,
                 horizon=None, rate=None,
                 unbiased=UNBIASED, broadcast_all=BROADCAST_ALL, prior='uniform',
                 lower=0., amplitude=1.
                 ):
        r""" Create the master policy and its children.

        :param nbArms: number of arms.
        :param children: sequence of children; each item is either a dict with keys ``'archtype'`` and ``'params'``, a class to instantiate as ``child(nbArms, lower=..., amplitude=...)``, or an already created policy. Required.
        :param horizon: horizon :math:`T`, used for :math:`\gamma = 1/T` and :math:`\beta = \exp(1/\log(T))`. Required.
        :param rate: initial learning rate for every child; defaults to :math:`\sqrt{N / T}`.
        :param unbiased: if true, losses are divided by the sampling probability of the child.
        :param broadcast_all: if true, every child is queried and rewarded at every step.
        :param prior: ``'uniform'`` / ``None`` for uniform initial trusts, or an array-like of length N.
        :param lower: lower value of the rewards.
        :param amplitude: amplitude of the rewards.
        :raises TypeError: if ``children`` or ``horizon`` is ``None``.
        """
        # NOTE(review): the parent constructor is not called here; confirm nothing from BasePolicy.__init__ is needed.
        if children is None:
            raise TypeError("CORRAL needs a non-None 'children' argument (a sequence of policies, classes or dicts).")
        if horizon is None:
            raise TypeError("CORRAL needs a non-None 'horizon' argument.")
        # Attributes
        self.nbArms = nbArms  #: Number of arms.
        self.lower = lower  #: Lower values for rewards.
        self.amplitude = amplitude  #: Larger values for rewards.
        self.unbiased = unbiased  #: Flag, see above.
        self.broadcast_all = broadcast_all  #: Flag, see above.

        self.gamma = 1. / horizon  #: Constant :math:`\gamma = 1 / T`.
        assert self.gamma < 1, "Error: parameter 'gamma' for a CORRAL player was expected to be < 1, but = {:.3g}...".format(self.gamma)  # DEBUG
        self.beta = np.exp(1. / np.log(horizon))  #: Constant :math:`\beta = \exp(1 / \log(T))`.
        assert self.beta > 1, "Error: parameter 'beta' for a CORRAL player was expected to be > 1, but = {:.3g}...".format(self.beta)  # DEBUG

        self._default_parameters = True
        self.nbChildren = nbChildren = len(children)  #: Number N of slave algorithms.
        if rate is None:
            # Use the default horizon-dependent rate value
            rate = np.sqrt(nbChildren / horizon)
        else:
            self._default_parameters = False
        assert rate > 0, "Error: parameter 'rate' for a CORRAL player was expected to be > 0, but = {:.3g}...".format(rate)  # DEBUG
        # Force a float array: with an integer 'rate' the later multiplications by beta would be truncated.
        self.rates = np.full(nbChildren, rate, dtype=float)  #: Value of the learning rate (will be **increasing** in time).

        # Internal object memory
        self.children = []  #: List of slave algorithms.
        for i, child in enumerate(children):
            if isinstance(child, dict):
                print(" Creating this child player from a dictionnary 'children[{}]' = {} ...".format(i, child))  # DEBUG
                localparams = {'lower': lower, 'amplitude': amplitude}
                localparams.update(child['params'])
                self.children.append(child['archtype'](nbArms, **localparams))
            elif isinstance(child, type):
                print(" Using this not-yet created player 'children[{}]' = {} ...".format(i, child))  # DEBUG
                self.children.append(child(nbArms, lower=lower, amplitude=amplitude))  # Create it here!
            else:
                print(" Using this already created player 'children[{}]' = {} ...".format(i, child))  # DEBUG
                self.children.append(child)

        # Initialize the arrays
        # Assume uniform prior if not given or if = 'uniform'
        self.trusts = np.full(nbChildren, 1. / nbChildren)  #: Initial trusts in the slaves. Default to uniform, but a prior can also be given.
        # Only compare to 'uniform' when prior is a string: comparing an array to a string is elementwise/ambiguous.
        wants_uniform = prior is None or (isinstance(prior, str) and prior == 'uniform')
        if not wants_uniform:
            assert len(prior) == nbChildren, "Error: the 'prior' argument given to CORRAL has to be an array of the good size ({}).".format(nbChildren)  # DEBUG
            # Own float copy, so that lists are accepted and the caller's object is never aliased.
            self.trusts = np.array(prior, dtype=float)
        self.bar_trusts = np.copy(self.trusts)  #: Initial bar trusts in the slaves. Default to uniform, but a prior can also be given.

        # Internal vectorial memory
        self.choices = np.full(self.nbChildren, -10000, dtype=int)  #: Keep track of the last choices of each slave, to know whom to charge the loss when broadcast_all is true.
        self.last_choice = None  #: Remember the index of the last child trusted for a decision.
        self.losses = np.zeros(nbChildren)  #: For the log-barrier OMD step, a vector of losses has to be given. Faster to keep it as an attribute instead of reallocating it every time.
        self.rhos = self.bar_trusts / 2  #: I use the inverses of the :math:`\rho_{t,i}` from the Algorithm in the reference article. Simpler to understand, less numerical errors.

    def __str__(self):
        """ Nicely print the name of the algorithm with its relevant parameters."""
        is_unbiased = "" if self.unbiased else ", biased"
        is_broadcast_all = "broadcast to all" if self.broadcast_all else "broadcast to one"
        if self._default_parameters:
            return r"CORRAL($N={}${}, {})".format(self.nbChildren, is_unbiased, is_broadcast_all)
        # Non-default rate: also show beta, rho and eta (as lists when they differ between children).
        if len(set(self.rhos)) > 1 or len(set(self.rates)) > 1:
            return r"CORRAL($N={}${}, {}, $\gamma=1/T$, $\beta={:.3g}$, $\rho={}$, $\eta={}$)".format(self.nbChildren, is_unbiased, is_broadcast_all, self.beta, list(self.rhos), list(self.rates))
        return r"CORRAL($N={}${}, {}, $\gamma=1/T$, $\beta={:.3g}$, $\rho={:.2g}$, $\eta={:.2g}$)".format(self.nbChildren, is_unbiased, is_broadcast_all, self.beta, self.rhos[0], self.rates[0])

    def __setattr__(self, name, value):
        r"""Trick method, to update the :math:`\gamma` and :math:`\beta` parameters of the CORRAL algorithm if the horizon T changes.

        - This is here just to eventually allow :class:`Policies.DoublingTrickWrapper` to be used with a CORRAL player.

        .. warning:: Not tested yet!
        """
        if name in ('horizon', '_horizon'):
            # NOTE(review): the horizon value itself is not stored, only gamma and beta are refreshed;
            # the learning rates are not recomputed either. Confirm this is intended.
            horizon = float(value)
            self.gamma = 1. / horizon  #: Constant :math:`\gamma = 1 / T`.
            self.beta = np.exp(1. / np.log(horizon))  #: Constant :math:`\beta = \exp(1 / \log(T))`.
        else:
            object.__setattr__(self, name, value)

    # --- Start the game

    def startGame(self):
        """ Start the game for each child."""
        for child in self.children:
            child.startGame()

    # --- Get a reward

    def getReward(self, arm, reward):
        """ Give reward for each child, and then update the trust probabilities.

        :param arm: index of the arm that was played.
        :param reward: raw reward observed, expected in ``[lower, lower + amplitude]``.
        :raises AssertionError: if the normalized reward falls outside ``[0, 1]`` or the updated trusts do not sum to 1.
        """
        reward = float(reward)
        new_reward = renormalize_reward(reward, lower=self.lower, amplitude=self.amplitude, unbiased=False)

        # 1. First, forward the raw reward to the children
        if self.broadcast_all:
            for child in self.children:
                child.getReward(arm, reward)
        else:
            # Only the child that was listened to learns from this reward.
            self.children[self.last_choice].getReward(arm, reward)

        # 2. Then reinitialize the array of losses, and charge the loss to the relevant child(ren)
        self.losses[:] = 0
        assert 0 <= new_reward <= 1, "Error: the normalized reward {:.3g} was NOT in [0, 1] ...".format(new_reward)  # DEBUG
        # Either a boolean mask (every child that voted for this arm) or the single index of the trusted child.
        charged = (self.choices == arm) if self.broadcast_all else self.last_choice
        self.losses[charged] = (1 - new_reward)
        if self.unbiased:
            # Importance weighting by the probability used to sample the child.
            self.losses[charged] /= self.bar_trusts[charged]

        # 3. Compute the new trust proba, with a log-barrier Online-Mirror-Descent step
        trusts = log_Barrier_OMB(self.trusts, self.losses, self.rates)
        # 4. Renormalize self.trusts to make it a proba dist
        self.trusts = trusts / np.sum(trusts)
        # Add uniform mixing of proportion gamma
        bar_trusts = (1 - self.gamma) * self.trusts + (self.gamma / self.nbChildren)
        self.bar_trusts = bar_trusts / np.sum(bar_trusts)

        # 5. For every child whose sampling trust fell below its threshold: lower the threshold and increase its rate
        below = self.bar_trusts < self.rhos
        self.rhos[below] = self.bar_trusts[below] / 2.
        self.rates[below] = self.rates[below] * self.beta

        assert np.isclose(np.sum(self.bar_trusts), 1), "Error: 'bar_trusts' do not sum to 1 but to {:.3g} instead...".format(np.sum(self.bar_trusts))  # DEBUG
        assert np.isclose(np.sum(self.trusts), 1), "Error: 'trusts' do not sum to 1 but to {:.3g} instead...".format(np.sum(self.trusts))  # DEBUG

    # --- Choice of arm methods

    def _ask_children(self, method_name, **kwargs):
        """ Sample the child to trust, query `method_name(**kwargs)` on the children, return the trusted child's answer.

        With ``broadcast_all`` every child is queried (and its answer remembered in ``self.choices``),
        otherwise only the sampled child is.
        """
        # 1. first decide who to listen to
        self.last_choice = rn.choice(self.nbChildren, p=self.bar_trusts)
        # 2. then listen to him (and to the others, if broadcasting)
        if self.broadcast_all:
            for i, child in enumerate(self.children):
                self.choices[i] = getattr(child, method_name)(**kwargs)
        else:
            trusted_child = self.children[self.last_choice]
            self.choices[self.last_choice] = getattr(trusted_child, method_name)(**kwargs)
        return self.choices[self.last_choice]

    def choice(self):
        """ Trust one of the slave and listen to his `choice`."""
        return self._ask_children('choice')

    def choiceWithRank(self, rank=1):
        """ Trust one of the slave and listen to his `choiceWithRank`."""
        return self._ask_children('choiceWithRank', rank=rank)

    def choiceFromSubSet(self, availableArms='all'):
        """ Trust one of the slave and listen to his `choiceFromSubSet`."""
        return self._ask_children('choiceFromSubSet', availableArms=availableArms)

    def choiceMultiple(self, nb=1):
        """ Trust one of the slave and listen to his `choiceMultiple`."""
        # NOTE(review): self.choices holds a single integer per child, so an answer made of several arms
        # cannot be stored in it; confirm the intended behaviour for nb > 1.
        return self._ask_children('choiceMultiple', nb=nb)

    def choiceIMP(self, nb=1, startWithChoiceMultiple=True):
        """ Trust one of the slave and listen to his `choiceIMP`."""
        # Forward both arguments (assumes the children's choiceIMP has the same signature as this one).
        return self._ask_children('choiceIMP', nb=nb, startWithChoiceMultiple=startWithChoiceMultiple)

    def estimatedOrder(self):
        r""" Trust one of the slave and listen to his `estimatedOrder`.

        - Return the estimate order of the arms, as a permutation on :math:`[0,...,K-1]` that would order the arms by increasing means.
        """
        # NOTE(review): this re-samples and overwrites self.last_choice, which getReward() also reads;
        # confirm it is never called between a choice and its reward.
        # 1. first decide who to listen to
        self.last_choice = rn.choice(self.nbChildren, p=self.bar_trusts)
        # 2. then listen to him
        return self.children[self.last_choice].estimatedOrder()

    def estimatedBestArms(self, M=1):
        """ Return a (non-necessarily sorted) list of the indexes of the M-best arms. Identify the set M-best."""
        assert 1 <= M <= self.nbArms, "Error: the parameter 'M' has to be between 1 and K = {}, but it was {} ...".format(self.nbArms, M)  # DEBUG
        order = self.estimatedOrder()
        # The order is by increasing means, so the M best arms are the last M entries.
        return order[-M:]
```