Source code for gameanalysis.aggfn

import inspect
import itertools

import numpy as np
import scipy.stats as spt

from gameanalysis import gameio
from gameanalysis import rsgame
from gameanalysis import utils


[docs]class AgfnGame(rsgame.BaseGame): """Action graph with function nodes game Action node utilities have additive structure. Function nodes are contribution-independent. Graph is bipartite so that function nodes have in-edges only from action nodes and vise versa. Parameters ---------- num_players : ndarray or int num_strategies : ndarray or int action_weights : ndarray, float Each entry specifies the incoming weight in the action graph for the action node (column). Must have shape (num_functions, num_role_strats). function_inputs : ndarray, bool Each entry specifies whether the action node (row) is an input to the function node (col). Must have shape (num_role_strats, num_functions). function_table : ndarray, float Value of arbitrary functions for a number of players activating the function. This can either have shape (num_functions, num_all_players + 1) or (num_functions,) + tuple(num_players + 1). The former treats different roles ass simply different strategy sets, the later treats each nodes inputs as distinct, and so each function maps from the number of inputs from each role. """ def __init__(self, num_players, num_strategies, action_weights, function_inputs, function_table): super().__init__(num_players, num_strategies) self._action_weights = np.asarray(action_weights, float) self._function_inputs = np.asarray(function_inputs, bool) self._function_table = np.asarray(function_table, float) self.num_functions = self._function_table.shape[0] # Verify proper formatting of data assert (self._action_weights.shape == (self.num_functions, self.num_role_strats)) assert (self._function_inputs.shape == (self.num_role_strats, self.num_functions)) assert self._function_table.shape[0] == self.num_functions assert (self._function_table.shape[1] == self.num_all_players + 1) \ or (self._function_table.shape[1:] == tuple(self.num_players + 1)) assert self._function_inputs.any( 0).all(), "not every function get input" assert np.any(self._action_weights != 0, 1).all(), "not every function used" assert np.any(self._action_weights != 0, 0).all( ), "strategy doesn't get payoff" # Select appropriate functions for data format if not self.is_symmetric() and len(self._function_table.shape) == 2: self.deviation_payoffs = self._deviation_payoffs_sum self.get_payoffs = self._get_payoffs_sum # Compute minimum and maximum payoffs node_table = self._function_table.reshape((self.num_functions, -1)) minima = node_table.min(1, keepdims=True).repeat( self.num_role_strats, 1) minima[self._action_weights <= 0] = 0 maxima = node_table.max(1, keepdims=True).repeat( self.num_role_strats, 1) maxima[self._action_weights >= 0] = 0 self._min_payoffs = self.role_reduce( np.sum((minima + maxima) * self._action_weights, 0), ufunc=np.minimum) minima = node_table.min(1, keepdims=True).repeat( self.num_role_strats, 1) minima[self._action_weights >= 0] = 0 maxima = node_table.max(1, keepdims=True).repeat( self.num_role_strats, 1) maxima[self._action_weights <= 0] = 0 self._max_payoffs = self.role_reduce( np.sum((minima + maxima) * self._action_weights, 0), ufunc=np.maximum) # Pre-compute derivative info self._dinputs = np.zeros( (self.num_role_strats, self.num_functions, self.num_roles), bool) self._dinputs[np.arange(self.num_role_strats), :, self.role_indices] = self._function_inputs # Set data to read only self._action_weights.setflags(write=False) self._function_inputs.setflags(write=False) self._function_table.setflags(write=False) self._dinputs.setflags(write=False) self._min_payoffs.setflags(write=False) self._max_payoffs.setflags(write=False)
[docs] def is_complete(self): return True
[docs] def min_payoffs(self): """Returns a lower bound on the payoffs.""" return self._min_payoffs
[docs] def max_payoffs(self): """Returns an upper bound on the payoffs.""" return self._max_payoffs
[docs] def deviation_payoffs(self, mix, assume_complete=True, jacobian=False): """Get the deviation payoffs""" role_node_probs = np.minimum(self.role_reduce( mix[:, None] * self._function_inputs, 0), 1)[..., None] table_probs = np.ones( (self.num_roles, self.num_functions) + tuple(self.num_players + 1), float) for i, (num_play, probs, zp) in enumerate(zip( self.num_players, role_node_probs, self.zero_prob)): role_probs = spt.binom.pmf( np.arange(num_play + 1), num_play, probs) dev_role_probs = spt.binom.pmf( np.arange(num_play + 1), num_play - 1, probs) new_shape = [self.num_functions] + [1] * self.num_roles new_shape[i + 1] = num_play + 1 role_probs.shape = new_shape dev_role_probs.shape = new_shape table_probs[:i] *= role_probs table_probs[i] *= dev_role_probs table_probs[i + 1:] *= role_probs dev_probs = self.role_repeat(table_probs, 0) for role, (rinps, rdev_probs) in enumerate(zip( self.role_split(self._function_inputs, 0), self.role_split(dev_probs, 0))): rdev_probs[rinps] = np.roll(rdev_probs[rinps], 1, role + 1) dev_vals = np.reshape(dev_probs * self._function_table, (self.num_role_strats, self.num_functions, -1)) devs = np.sum(np.sum(dev_vals, -1) * self._action_weights.T, -1) if not jacobian: return devs deriv = np.empty((self.num_roles, self.num_roles, self.num_functions) + tuple(self.num_players + 1), float) for i, (num_play, probs, zp) in enumerate(zip( self.num_players, role_node_probs, self.zero_prob)): configs = np.arange(num_play + 1) der = configs / (probs + zp) - configs[::-1] / (1 - probs + zp) dev_der = np.insert(configs[:-1] / (probs + zp) - configs[-2::-1] / (1 - probs + zp), num_play, 0, 1) new_shape = [self.num_functions] + [1] * self.num_roles new_shape[i + 1] = num_play + 1 der.shape = new_shape dev_der.shape = new_shape deriv[:i, i] = der deriv[i, i] = dev_der deriv[i + 1:, i] = der dev_deriv = self.role_repeat(np.rollaxis(deriv, 2, 1), 0) for role, (rinps, rdev_deriv) in enumerate(zip( self.role_split(self._function_inputs, 0), self.role_split(dev_deriv, 0))): rdev_deriv[rinps] = np.roll(rdev_deriv[rinps], 1, role + 2) dev_values = dev_probs[:, :, None] * \ dev_deriv * self._function_table[:, None] dev_values.shape = (self.num_role_strats, self.num_functions, self.num_roles, -1) jac = np.sum( np.sum(dev_values.sum(-1)[:, None] * self._dinputs, -1) * self._action_weights.T[:, None], -1) jac -= self.role_repeat(self.role_reduce(jac) / self.num_strategies) return devs, jac
def _deviation_payoffs_sum(self, mix, assume_complete=True, jacobian=False): role_node_probs = np.minimum(self.role_reduce( mix[:, None] * self._function_inputs, 0), 1)[..., None] role_fft = np.ones((self.num_roles, self.num_functions, self.num_all_players + 1), np.complex128) if jacobian: drole_fft = np.ones( (self.num_roles, self.num_roles, self.num_functions, self.num_all_players + 1), np.complex128) for i, (num_play, probs, zp) in enumerate(zip( self.num_players, role_node_probs, self.zero_prob)): configs = np.arange(num_play + 1) role_probs = spt.binom.pmf(configs, num_play, probs) dev_role_probs = spt.binom.pmf(configs[:-1], num_play - 1, probs) fft = np.fft.fft(role_probs, self.num_all_players + 1) dev_fft = np.fft.fft(dev_role_probs, self.num_all_players + 1) role_fft[:i] *= fft role_fft[i] *= dev_fft role_fft[i + 1:] *= fft if not jacobian: continue der = configs / (probs + zp) - configs[::-1] / (1 - probs + zp) dev_der = (configs[:-1] / (probs + zp) - configs[-2::-1] / (1 - probs + zp)) dfft = np.fft.fft(der * role_probs, self.num_all_players + 1) ddev_fft = np.fft.fft(dev_der * dev_role_probs, self.num_all_players + 1) drole_fft[:i, :i] *= fft drole_fft[i, :i] *= dev_fft drole_fft[i + 1:, :i] *= fft drole_fft[:i, i] *= dfft drole_fft[i, i] *= ddev_fft drole_fft[i + 1:, i] *= dfft drole_fft[:i, i + 1:] *= fft drole_fft[i, i + 1:] *= dev_fft drole_fft[i + 1:, i + 1:] *= fft dev_probs = self.role_repeat(utils.simplex_project( np.fft.ifft(role_fft).real), 0) dev_probs[self._function_inputs] = np.roll( dev_probs[self._function_inputs], 1, 1) devs = np.sum(np.sum(dev_probs * self._function_table, -1) * self._action_weights.T, -1) if not jacobian: return devs dev_deriv = self.role_repeat(np.rollaxis( np.fft.ifft(drole_fft).real, 2, 1), 0) dev_deriv[self._function_inputs] = np.roll( dev_deriv[self._function_inputs], 1, 2) dev_values = dev_deriv * self._function_table[:, None] jac = np.sum( np.sum(dev_values.sum(-1)[:, None] * self._dinputs, -1) * self._action_weights.T[:, None], -1) jac -= self.role_repeat(self.role_reduce(jac) / self.num_strategies) return devs, jac
[docs] def get_payoffs(self, profile): """Returns an array of profile payoffs.""" function_inputs = self.role_reduce( profile[:, None] * self._function_inputs, 0) inds = (np.arange(self.num_functions),) + tuple(function_inputs) function_outputs = self._function_table[inds] payoffs = function_outputs.dot(self._action_weights) payoffs[profile == 0] = 0 return payoffs
def _get_payoffs_sum(self, profile): function_inputs = profile.dot(self._function_inputs) function_outputs = self._function_table[np.arange(self.num_functions), function_inputs] payoffs = function_outputs.dot(self._action_weights) payoffs[profile == 0] = 0 return payoffs
[docs] def to_rsgame(self): """Builds an rsgame.Game object that represents the same game.""" profiles = self.all_profiles() payoffs = np.empty(profiles.shape, float) for prof, pay in zip(profiles, payoffs): np.copyto(pay, self.get_payoffs(prof)) return rsgame.game(self.num_players, self.num_strategies, profiles, payoffs)
def __repr__(self): return '{old}, {nfuncs:d})'.format( old=super().__repr__()[:-1], nfuncs=self.num_functions) def __eq__(self, other): return (type(self) is type(other) and np.all(self.num_strategies == other.num_strategies) and np.all(self.num_players == other.num_players) and np.all(self._function_inputs == other._function_inputs) and np.allclose(self._action_weights, other._action_weights) and self._function_table.shape == other._function_table.shape and np.allclose(self._function_table, other._function_table))
[docs]def aggfn(num_players, num_strategies, action_weights, function_inputs, function_table): """Static constructor for AgfnGame Parameters ---------- num_players : ndarray or int num_strategies : ndarray or int action_weights : ndarray, float Each entry specifies the incoming weight in the action graph for the action node (column). Must have shape (num_functions, num_role_strats). function_inputs : ndarray, bool Each entry specifies whether the action node (row) is an input to the function node (col). Must have shape (num_role_strats, num_functions). function_table : ndarray, float Value of arbitrary functions for a number of players activating the function. This can either have shape (num_functions, num_all_players + 1) or (num_functions,) + tuple(num_players + 1). The former treats different roles ass simply different strategy sets, the later treats each nodes inputs as distinct, and so each function maps from the number of inputs from each role. """ return AgfnGame(num_players, num_strategies, action_weights, function_inputs, function_table)
[docs]def aggfn_copy(copy_game, action_weights, function_inputs, function_table): """Static constructor for AgfnGame Parameters ---------- copy_game : BaseGame Copies players and strategies from base game. action_weights : ndarray, float Each entry specifies the incoming weight in the action graph for the action node (column). Must have shape (num_functions, num_role_strats). function_inputs : ndarray, bool Each entry specifies whether the action node (row) is an input to the function node (col). Must have shape (num_role_strats, num_functions). function_table : ndarray, float Value of arbitrary functions for a number of players activating the function. This can either have shape (num_functions, num_all_players + 1) or (num_functions,) + tuple(num_players + 1). The former treats different roles ass simply different strategy sets, the later treats each nodes inputs as distinct, and so each function maps from the number of inputs from each role. """ return aggfn(copy_game.num_players, copy_game.num_strategies, action_weights, function_inputs, function_table)
# TODO Add from_rsgame that works via least squares fit. This isn't exactly # trivial as you likely want sparsity in action_weights and function_inputs. # Doing so probably undoes any nice optimization things.
[docs]def aggfn_funcs(num_players, num_strategies, action_weights, function_inputs, functions): """Static constructor for AgfnGame with functions This is generally less efficient than just constructing the function table using vectorized operations. Parameters ---------- num_players : ndarray or int num_strategies : ndarray or int action_weights : ndarray, float Each entry specifies the incoming weight in the action graph for the action node (column). Must have shape (num_functions, num_role_strats). function_inputs : ndarray, bool Each entry specifies whether the action node (row) is an input to the function node (col). Must have shape (num_role_strats, num_functions). functions : [f(n, ...) -> float] List of functions that either map total player activations or player per role activations to a single value. """ assert functions, "must have at least one function" base = rsgame.basegame(num_players, num_strategies) num_functions = len(functions) num_params = _num_args_safe(functions[0]) assert all(num_params == _num_args_safe(f) for f in functions) if num_params == 1: # sum format function_table = np.empty( (num_functions, base.num_all_players + 1), float) for func, tab in zip(functions, function_table): for p in range(base.num_all_players + 1): tab[p] = func(p) else: # role format assert num_params == base.num_roles function_table = np.empty( (num_functions,) + tuple(base.num_players + 1), float) for func, tab in zip(functions, function_table): for p in itertools.product(*map(range, base.num_players + 1)): tab[p] = func(*p) return aggfn(num_players, num_strategies, action_weights, function_inputs, function_table)
[docs]def aggfn_funcs_copy(copy_game, action_weights, function_inputs, functions): """Static constructor for AgfnGame with functions This is generally less efficient than just constructing the function table using vectorized operations. Parameters ---------- copy_game : BaseGame The base game to take player and strategy counts from. action_weights : ndarray, float Each entry specifies the incoming weight in the action graph for the action node (column). Must have shape (num_functions, num_role_strats). function_inputs : ndarray, bool Each entry specifies whether the action node (row) is an input to the function node (col). Must have shape (num_role_strats, num_functions). functions : [f(n, ...) -> float] List of functions that either map total player activations or player per role activations to a single value. """ return aggfn_funcs(copy_game.num_players, copy_game.num_strategies, action_weights, function_inputs, functions)
[docs]class AgfnGameSerializer(gameio.GameSerializer): """A serializer for agfn games Parameters ---------- role_names : [str] Names of each role. strat_names : [[str]] Names of each strategy for each role. function_names : [str] Names of each function in order. """ def __init__(self, role_names, strat_names, function_names): super().__init__(role_names, strat_names) self.function_names = tuple(function_names) self._function_index = {f: i for i, f in enumerate(self.function_names)} self.num_functions = len(self.function_names)
[docs] def function_index(self, func_name): return self._function_index[func_name]
[docs] def from_agfngame_json(self, game): base = self.from_basegame_json(game) function_inputs = np.zeros( (self.num_role_strats, self.num_functions), bool) for func, roles in game['function_inputs'].items(): fi = self.function_index(func) for role, strats in roles.items(): for strat in strats: function_inputs[self.role_strat_index( role, strat), fi] = True action_weights = np.zeros( (self.num_functions, self.num_role_strats), float) for role, strats in game['action_weights'].items(): for strat, funcs in strats.items(): rsi = self.role_strat_index(role, strat) for func, val in funcs.items(): action_weights[self.function_index(func), rsi] = val function_list = [None] * self.num_functions for func, table in game['function_tables'].items(): function_list[self.function_index(func)] = np.asarray(table, float) return aggfn_copy(base, action_weights, function_inputs, np.asarray(function_list, float))
[docs] def to_agfngame_json(self, game): assert isinstance(game, AgfnGame) res = self.to_basegame_json(game) res['function_names'] = self.function_names finputs = {} for func, finp in zip(self.function_names, game._function_inputs.T): finputs[func] = { role: [s for s, inp in zip(strats, rinp) if inp] for role, strats, rinp in zip(self.role_names, self.strat_names, self.role_split(finp)) if rinp.any()} res['function_inputs'] = finputs act_weights = {} for role, strats, role_acts in zip( self.role_names, self.strat_names, self.role_split(game._action_weights)): if not np.allclose(role_acts, 0): act_weights[role] = { strat: {f: w for f, w in zip(self.function_names, strat_acts) if not np.isclose(w, 0)} for strat, strat_acts in zip(strats, role_acts.T) if not np.allclose(strat_acts, 0)} res['action_weights'] = act_weights res['function_tables'] = dict(zip( self.function_names, (tab.tolist() for tab in game._function_table))) return res
def __repr__(self): return '{}, {})'.format(super().__repr__(), self.function_names) def __eq__(self, other): return (type(self) is type(other) and self.role_names == other.role_names and self.strat_names == other.strat_names and self.function_names == other.function_names)
[docs]def aggfnserializer(role_names, strat_names, function_names): """Static constructor for AgfnGameSerializer Parameters ---------- role_names : [str] strat_names : [[str]] function_names : [str] """ return AgfnGameSerializer(role_names, strat_names, function_names)
[docs]def aggfnserializer_json(json): """Static constructor for AgfnGameSerializer Takes a game that would be loaded from json and determines field names. Parameters ---------- json : json A json format of a base AgfnGame. One standard output is the one output by to_agfngame_json. {strategies: {<role>: [<strat>]}, function_names: [<func>]} """ serial = gameio.gameserializer_json(json) function_names = json['function_names'] return AgfnGameSerializer(serial.role_names, serial.strat_names, function_names)
[docs]def read_agfngame(json): serial = aggfnserializer_json(json) return serial.from_agfngame_json(json), serial
def _num_args(func): return sum(1 for p in inspect.signature(func).parameters.values() if p.default is p.empty) def _num_args_safe(func): try: return _num_args(func) except AttributeError: # pragma: no cover return _num_args(func.__call__)