"""An action graph game with additive function nodes"""
import itertools
import numpy as np
import scipy.stats as spt
from gameanalysis import rsgame
from gameanalysis import utils
[docs]class AgfnGame(rsgame.CompleteGame):
"""Action graph with function nodes game
Action node utilities have additive structure. Function nodes are
contribution-independent. Graph is bipartite so that function nodes have
in-edges only from action nodes and vise versa.
Parameters
----------
role_names : (str,)
The name of each role.
strat_names : ((str,),)
The name of each strategy per role.
num_role_players : ndarray
The number of players for each role.
function_names : (str,)
The name of each function. Must be sorted, unique, and non-empty.
action_weights : ndarray
Each entry specifies the incoming weight in the action graph for the
action node (column). Must have shape (num_functions, num_strats). The
action weights for a particular function can't be all zero, otherwise
that function should not exist.
function_inputs : ndarray
Each entry specifies whether the action node (row) is an input to the
function node (col). Must have shape (num_strats, num_functions).
function_table : ndarray
Value of arbitrary functions for a number of players activating the
function. This can either have shape (num_functions, num_players + 1)
or (num_functions,) + tuple(num_role_players + 1). The former treats
different roles ass simply different strategy sets, the later treats
each nodes inputs as distinct, and so each function maps from the
number of inputs from each role.
"""
def __init__(self, role_names, strat_names, function_names,
num_role_players, action_weights, function_inputs,
function_table, offsets):
super().__init__(role_names, strat_names, num_role_players)
self.function_names = function_names
self.num_functions = len(function_names)
self.action_weights = action_weights
self.action_weights.setflags(write=False)
self.function_inputs = function_inputs
self.function_inputs.setflags(write=False)
self.function_table = function_table
self.function_table.setflags(write=False)
self.offsets = offsets
self.offsets.setflags(write=False)
# Pre-compute derivative info
self._dinputs = np.zeros(
(self.num_strats, self.num_functions, self.num_roles), bool)
self._dinputs[np.arange(self.num_strats), :, self.role_indices] = (
self.function_inputs)
self._dinputs.setflags(write=False)
self._function_index = {f: i for i, f in enumerate(function_names)}
# Compute other bookmarking stuff
self._basis = np.insert(
np.cumprod(self.num_role_players[:0:-1] + 1)[::-1],
self.num_roles - 1, 1)
self._func_offset = (np.arange(self.num_functions) *
np.prod(self.num_role_players + 1))
[docs] def function_index(self, func_name):
"""Get the index of a function by name"""
return self._function_index[func_name]
[docs] @utils.memoize
def min_strat_payoffs(self):
"""Returns a lower bound on the payoffs."""
node_table = self.function_table.reshape((self.num_functions, -1))
minima = node_table.min(1, keepdims=True)
maxima = node_table.max(1, keepdims=True)
eff_min = np.where(self.action_weights > 0, minima, maxima)
mins = np.einsum(
'ij,ij->j', eff_min, self.action_weights) + self.offsets
mins.setflags(write=False)
return mins.view()
[docs] @utils.memoize
def max_strat_payoffs(self):
"""Returns an upper bound on the payoffs."""
node_table = self.function_table.reshape((self.num_functions, -1))
minima = node_table.min(1, keepdims=True)
maxima = node_table.max(1, keepdims=True)
eff_max = np.where(self.action_weights > 0, maxima, minima)
maxs = np.einsum(
'ij,ij->j', eff_max, self.action_weights) + self.offsets
maxs.setflags(write=False)
return maxs.view()
[docs] def get_payoffs(self, profile):
"""Returns an array of profile payoffs."""
profile = np.asarray(profile, int)
function_inputs = np.add.reduceat(
profile[..., None, :] * self.function_inputs.T,
self.role_starts, -1)
inds = function_inputs.dot(self._basis) + self._func_offset
function_outputs = self.function_table.ravel()[inds]
payoffs = function_outputs.dot(self.action_weights) + self.offsets
payoffs[profile == 0] = 0
return payoffs
# TODO override get_dev_payoffs to be more efficient, i.e. only compute the
# dev payoff.
[docs] def deviation_payoffs(self, mix, *, jacobian=False):
"""Get the deviation payoffs"""
mix = np.asarray(mix, float)
role_node_probs = np.minimum(
np.add.reduceat(mix[:, None] * self.function_inputs,
self.role_starts), 1)[..., None]
table_probs = np.ones(
(self.num_roles, self.num_functions) +
tuple(self.num_role_players + 1),
float)
for i, (num_play, probs, zp) in enumerate(zip(
self.num_role_players, role_node_probs, self.zero_prob)):
role_probs = spt.binom.pmf(
np.arange(num_play + 1), num_play, probs)
dev_role_probs = spt.binom.pmf(
np.arange(num_play + 1), num_play - 1, probs)
new_shape = [self.num_functions] + [1] * self.num_roles
new_shape[i + 1] = num_play + 1
role_probs.shape = new_shape
dev_role_probs.shape = new_shape
table_probs[:i] *= role_probs
table_probs[i] *= dev_role_probs
table_probs[i + 1:] *= role_probs
dev_probs = table_probs.repeat(self.num_role_strats, 0)
for role, (rinps, rdev_probs) in enumerate(zip(
np.split(self.function_inputs, self.role_starts[1:], 0),
np.split(dev_probs, self.role_starts[1:], 0))):
rdev_probs[rinps] = np.roll(rdev_probs[rinps], 1, role + 1)
dev_vals = np.reshape(dev_probs * self.function_table,
(self.num_strats, self.num_functions, -1))
devs = (np.einsum('ijk,ji->i', dev_vals, self.action_weights) +
self.offsets)
if not jacobian:
return devs
deriv = np.empty((self.num_roles, self.num_roles, self.num_functions) +
tuple(self.num_role_players + 1), float)
for i, (num_play, probs, zp) in enumerate(zip(
self.num_role_players, role_node_probs, self.zero_prob)):
configs = np.arange(num_play + 1)
der = configs / (probs + zp) - configs[::-1] / (1 - probs + zp)
dev_der = np.insert(configs[:-1] / (probs + zp) - configs[-2::-1] /
(1 - probs + zp), num_play, 0, 1)
new_shape = [self.num_functions] + [1] * self.num_roles
new_shape[i + 1] = num_play + 1
der.shape = new_shape
dev_der.shape = new_shape
deriv[:i, i] = der
deriv[i, i] = dev_der
deriv[i + 1:, i] = der
dev_deriv = np.rollaxis(deriv, 2, 1).repeat(self.num_role_strats, 0)
for role, (rinps, rdev_deriv) in enumerate(zip(
np.split(self.function_inputs, self.role_starts[1:], 0),
np.split(dev_deriv, self.role_starts[1:], 0))):
rdev_deriv[rinps] = np.roll(rdev_deriv[rinps], 1, role + 2)
dev_values = dev_probs[:, :, None] * \
dev_deriv * self.function_table[:, None]
dev_values.shape = (self.num_strats,
self.num_functions, self.num_roles, -1)
jac = np.einsum('iklm,jkl,ki->ij', dev_values, self._dinputs,
self.action_weights)
return devs, jac
[docs] def normalize(self):
"""Return a normalized AgfnGame"""
scale = self.max_role_payoffs() - self.min_role_payoffs()
scale[np.isclose(scale, 0)] = 1
scale = scale.repeat(self.num_role_strats)
offsets = (self.offsets - self.min_role_payoffs().repeat(
self.num_role_strats)) / scale
return AgfnGame(
self.role_names, self.strat_names, self.function_names,
self.num_role_players, self.action_weights / scale,
self.function_inputs, self.function_table, offsets)
[docs] def restrict(self, rest):
rest = np.asarray(rest, bool)
base = rsgame.emptygame_copy(self).restrict(rest)
action_weights = self.action_weights[:, rest]
func_mask = np.any(~np.isclose(action_weights, 0), 1)
func_names = tuple(
n for n, m in zip(self.function_names, func_mask) if m)
return AgfnGame(
base.role_names, base.strat_names, func_names,
base.num_role_players, action_weights[func_mask],
self.function_inputs[:, func_mask][rest],
self.function_table[func_mask], self.offsets[rest])
[docs] def to_json(self):
res = super().to_json()
res['function_inputs'] = {
func: self.restriction_to_json(finp) for func, finp
in zip(self.function_names, self.function_inputs.T)}
res['action_weights'] = {
func: self.payoff_to_json(ws) for func, ws
in zip(self.function_names, self.action_weights)}
# XXX This will fail if a role has the name "value", do we care?
res['function_tables'] = {
name: [dict(zip(self.role_names, (c.item() for c in counts)),
value=val)
for val, *counts in zip(
tab.ravel(), *np.indices(tab.shape).reshape(
self.num_roles, -1))
if val != 0]
for name, tab in zip(self.function_names, self.function_table)}
if not np.allclose(self.offsets, 0):
res['offsets'] = self.payoff_to_json(self.offsets)
res['type'] = 'aggfn.2'
return res
def __repr__(self):
return '{old}, {nfuncs:d})'.format(
old=super().__repr__()[:-1],
nfuncs=self.num_functions)
def __eq__(self, other):
if not (super().__eq__(other) and
self.function_names == other.function_names and
self.function_table.shape == other.function_table.shape and
np.allclose(self.offsets, other.offsets)):
return False
selfp = np.lexsort(
self.function_table.reshape((self.num_functions, -1)).T)
otherp = np.lexsort(
other.function_table.reshape((other.num_functions, -1)).T)
return (np.all(self.function_inputs[:, selfp]
== other.function_inputs[:, otherp]) and
np.allclose(self.action_weights[selfp],
other.action_weights[otherp]) and
np.allclose(self.function_table[selfp],
other.function_table[otherp]))
@utils.memoize
def __hash__(self):
return hash((super().__hash__(), self.num_functions))
[docs]def aggfn(num_role_players, num_role_strats, action_weights, function_inputs,
function_table, offsets=None):
"""Create an Aggfn with default names
Parameters
----------
num_role_players : ndarray
The number of players per role.
num_role_strats : ndarray
The number of strategies per role.
action_weights : ndarray, float
The action weights.
function_inputs : ndarray, bool
The input mask for each function.
function_table : ndarray, float
The function value relative to number of incoming edges.
offsets : ndarray, float, optional
A constant offset for each strategies payoff. Constant functions are
not allowed in the function table as they are clutter, instead,
constant functions can be specified here.
"""
return aggfn_replace(rsgame.emptygame(num_role_players, num_role_strats),
action_weights, function_inputs, function_table,
offsets)
[docs]def aggfn_names(role_names, num_role_players, strat_names, function_names,
action_weights, function_inputs, function_table, offsets=None):
"""Create an Aggfn with specified names
Parameters
----------
role_names : [str]
The name of each role.
num_role_players : ndarray
The number of players for each role.
strat_names : [[str]]
The name of each strategy for each role.
function_names : [str]
The name of each function.
action_weights : ndarray
The mapping of each function to the strategy weight for a player.
function_inpits : ndarray
The mask indicating which strategies are inputs to which function.
offsets : ndarray, float, optional
A constant offset for each strategies payoff. Constant functions are
not allowed in the function table as they are clutter, instead,
constant functions can be specified here.
"""
return aggfn_names_replace(
rsgame.emptygame_names(role_names, num_role_players, strat_names),
function_names, action_weights, function_inputs, function_table,
offsets)
# TODO Make aggfn_copy method that will clone the aggfn game if it is one,
# else, it will regress on profiles to compute one.
[docs]def aggfn_replace(copy_game, action_weights, function_inputs, function_table,
offsets=None):
"""Replace an existing game with default function names
Parameters
----------
copy_game : RsGame
The game to take game structure from.
action_weights : ndarray-like
The weights of each function to player payoffs.
function_inputs : ndarray-like
The mask of each strategy to function.
function_table : ndarray-like
The lookup table of number of incoming edges to function value.
offsets : ndarray, float, optional
A constant offset for each strategies payoff. Constant functions are
not allowed in the function table as they are clutter, instead,
constant functions can be specified here.
"""
if hasattr(copy_game, 'function_names'):
function_names = copy_game.function_names
else:
function_names = tuple(utils.prefix_strings('f', len(action_weights)))
return aggfn_names_replace(copy_game, function_names, action_weights,
function_inputs, function_table, offsets)
[docs]def aggfn_names_replace(copy_game, function_names, action_weights,
function_inputs, function_table, offsets=None):
"""Replace an existing game with an Aggfn
Parameters
----------
copy_game : RsGame
The game to take game structure from.
function_names : [str]
The name of each function.
action_weights : ndarray-like
The weights of each function to player payoffs.
function_inputs : ndarray-like
The mask of each strategy to function.
function_table : ndarray-like
The lookup table of number of incoming edges to function value.
offsets : ndarray, float, optional
A constant offset for each strategies payoff. Constant functions are
not allowed in the function table as they are clutter, instead,
constant functions can be specified here.
"""
if offsets is None:
offsets = np.zeros(copy_game.num_strats)
function_names = tuple(function_names)
action_weights = np.asarray(action_weights, float)
function_inputs = np.asarray(function_inputs, bool)
function_table = np.asarray(function_table, float)
offsets = np.asarray(offsets, float)
num_funcs = len(function_names)
assert function_names, \
"must have at least one function"
assert (action_weights.shape == (num_funcs, copy_game.num_strats)), \
"action_weights must have shape (num_functions, num_strats)"
assert (function_inputs.shape == (copy_game.num_strats, num_funcs)), \
"function_inputs must have shape (num_strats, num_functions)"
assert not function_inputs.all(0).any(), \
"can't have a function with input from every strategy"
assert function_inputs.any(0).all(), \
"every function must take input from at least one strategy"
assert (function_table.shape == (num_funcs,) +
tuple(copy_game.num_role_players + 1)), \
"function_table must have shape (num_functions, ... num_role_players + 1)" # noqa
assert not np.isclose(
function_table.reshape((num_funcs, -1))[:, 0, None],
function_table.reshape((num_funcs, -1))).all(1).any(), \
"a function can't be constant (all identical values)"
assert not np.isclose(action_weights, 0).all(1).any(), \
"a function can't have actions weights of all zero"
assert all(isinstance(f, str) for f in function_names), \
"all function names must be strs"
assert utils.is_sorted(function_names, strict=True), \
"function_names must be sorted"
return AgfnGame(
copy_game.role_names, copy_game.strat_names, function_names,
copy_game.num_role_players, action_weights, function_inputs,
function_table, offsets)
[docs]def aggfn_funcs(num_role_players, num_role_strats, action_weights,
function_inputs, functions, offsets=None):
"""Construct and Aggfn with functions
This is generally less efficient than just constructing the function table
using vectorized operations or an existing function table.
Parameters
----------
num_role_players : ndarray
The number of players per role.
num_role_strats : ndarray
The number of strategies per role.
action_weights : ndarray, float
The action weights.
function_inputs : ndarray, bool
The input mask for each function.
functions : [(nr1, nr2, ...) -> float]
List of functions that maps the player per role activations to a single
value. The number of ordered arguments will be inferred from each
function.
"""
assert functions, "must have at least one function"
num_functions = len(functions)
base = rsgame.emptygame(num_role_players, num_role_strats)
function_table = np.empty(
(num_functions,) + tuple(base.num_role_players + 1),
float)
for func, tab in zip(functions, function_table):
for p in itertools.product(*map(range, base.num_role_players + 1)):
tab[p] = func(*p)
return aggfn_replace(base, action_weights, function_inputs, function_table)
[docs]def aggfn_json(json):
"""Read an Aggfn from json
Json versions of the game will generally have 'type': 'aggfn...' in them,
but as long as the proper fields exist, this will succeed."""
base = rsgame.emptygame_json(json)
function_names = sorted(json['function_tables'])
findex = {f: i for i, f in enumerate(function_names)}
num_functions = len(function_names)
version = json.get('type', '.1').split('.', 1)[1]
function_inputs = np.empty((base.num_strats, num_functions), bool)
action_weights = np.empty((num_functions, base.num_strats))
function_table = np.empty((num_functions,) +
tuple(base.num_role_players + 1))
offsets = np.empty(base.num_strats)
for func, inps in json['function_inputs'].items():
base.restriction_from_json(inps, function_inputs[:, findex[func]],
verify=False)
base.payoff_from_json(json.get('offsets', {}), offsets)
if version == '1':
action_weights.fill(0)
for role, strats in json['action_weights'].items():
for strat, funcs in strats.items():
rsi = base.role_strat_index(role, strat)
for func, val in funcs.items():
action_weights[findex[func], rsi] = val
for func, jtable in json['function_tables'].items():
atable = np.asarray(jtable, float)
if base.num_roles > 1 and atable.ndim == 1:
# Convert old sum format to role format
tab = function_table[findex[func]]
inds = np.indices(tab.shape)
tab[tuple(inds)] = atable[inds.sum(0)]
else:
function_table[findex[func]] = atable
# Find constant functions and remove them. Constant functions either
# have identical function values for every possible input, or are never
# activated or always activated.
flat_funcs = function_table.reshape(num_functions, -1)
vals = flat_funcs[:, 0].copy()
consts = np.isclose(vals[:, None], flat_funcs).all(1)
consts |= ~function_inputs.any(0)
always = function_inputs.all(0)
vals[always] = flat_funcs[always, -1]
consts |= always
offsets += vals[consts].dot(action_weights[consts])
function_inputs = function_inputs[:, ~consts]
action_weights = action_weights[~consts]
function_table = function_table[~consts]
flat_funcs = flat_funcs[~consts]
function_names = [n for n, c in zip(function_names, consts) if not c]
elif version == '2':
for func, weights in json['action_weights'].items():
base.payoff_from_json(weights, action_weights[findex[func]])
function_table.fill(0)
for func, jtable in json['function_tables'].items():
table = function_table[findex[func]]
for elem in jtable:
copy = elem.copy()
value = copy.pop('value')
table[tuple(int(i) for i in base.role_from_json(copy))] = value
else:
assert False, "unknown version \"{}\"".format(version)
return aggfn_names_replace(
base, function_names, action_weights, function_inputs, function_table,
offsets)