"""Module for games with potentially sparse payoff data"""
import itertools
from collections import abc
import numpy as np
import numpy.random as rand
import scipy.special as sps
from gameanalysis import rsgame
from gameanalysis import utils
[docs]class Game(rsgame.RsGame):
"""Role-symmetric data game representation
This representation uses a sparse mapping from profiles to payoffs for role
symmetric games. This allows it to capture arbitrary games, as well as
games that are generated from data. Payoffs for specific players in a
profile can be nan to indicate they are missing. The profiles will not be
listed in `num_complete_profiles` or counted as `in` the game, but their
data can be accessed via `get_payoffs`, and they will be used for
calculating deviation payoffs if possible.
Parameters
----------
role_names : (str,)
The name of each role.
strat_names : ((str,),)
The name of each strategy for each role.
num_role_players : ndarray
The number of players per role.
profiles : ndarray, (num_payoffs, num_strats)
The profiles for the game. These must be unique, and all valid for the
game.
payoffs : ndarray, (num_payoffs, num_strats)
The payoffs for the game. This must contain zeros for profile, strategy
pairs that are not played (i.e. zero). All valid payoffs for a profile
can't be nan, the profile should be omitted instead.
"""
def __init__(self, role_names, strat_names, num_role_players, profiles,
payoffs):
super().__init__(role_names, strat_names, num_role_players)
self._profiles = profiles
self._profiles.setflags(write=False)
self._payoffs = payoffs
self._payoffs.setflags(write=False)
self._num_profiles = profiles.shape[0]
# compute log dev reps
player_factorial = np.sum(sps.gammaln(profiles + 1), 1)
totals = (np.sum(sps.gammaln(self.num_role_players + 1)) -
player_factorial)
with np.errstate(divide='ignore'):
self._dev_reps = (
totals[:, None] + np.log(profiles) -
np.log(self.num_role_players).repeat(self.num_role_strats))
self._dev_reps.setflags(write=False)
# Add profile lookup
self._profile_map = dict(zip(map(utils.hash_array, profiles),
payoffs))
if np.isnan(payoffs).any():
self._complete_profiles = frozenset(
prof for prof, pay in self._profile_map.items()
if not np.isnan(pay).any())
else: # Don't need to store duplicate lookup object
self._complete_profiles = self._profile_map
self._num_complete_profiles = len(self._complete_profiles)
@property
def num_profiles(self):
return self._num_profiles
@property
def num_complete_profiles(self):
return self._num_complete_profiles
[docs] def profiles(self):
return self._profiles.view()
[docs] def payoffs(self):
return self._payoffs.view()
[docs] @utils.memoize
def min_strat_payoffs(self):
"""Returns the minimum payoff for each role"""
if not self.num_profiles:
pays = np.full(self.num_strats, np.nan)
else:
pays = np.fmin.reduce(np.where(
self._profiles > 0, self._payoffs, np.nan), 0)
pays.setflags(write=False)
return pays
[docs] @utils.memoize
def max_strat_payoffs(self):
"""Returns the maximum payoff for each role"""
if not self.num_profiles:
pays = np.full(self.num_strats, np.nan)
else:
pays = np.fmax.reduce(np.where(
self._profiles > 0, self._payoffs, np.nan), 0)
pays.setflags(write=False)
return pays
[docs] def get_payoffs(self, profiles):
"""Returns an array of profile payoffs
If profile is not in game, an array of nans is returned where profile
has support."""
profiles = np.asarray(profiles, int)
assert self.is_profile(profiles).all()
prof_view = profiles.reshape((-1, self.num_strats))
payoffs = np.empty(prof_view.shape, float)
for prof, pay in zip(prof_view, payoffs):
hashed = utils.hash_array(prof)
if hashed not in self._profile_map:
pay[prof == 0] = 0
pay[prof > 0] = np.nan
else:
np.copyto(pay, self._profile_map[hashed])
return payoffs.reshape(profiles.shape)
[docs] def deviation_payoffs(self, mix, *, jacobian=False,
ignore_incomplete=False):
"""Computes the expected value of deviating
More specifically, this is the expected payoff of playing each pure
strategy played against all opponents playing mix.
Parameters
----------
mix : ndarray
The mix all other players are using
jacobian : bool
If true, the second returned argument will be the jacobian of the
deviation payoffs with respect to the mixture. The first axis is
the deviating strategy, the second axis is the strategy in the mix
the derivative is taken with respect to. For this to be calculated
correctly, the game must be complete. Thus if the game is not
complete, this will be all nan.
ignore_incomplete : bool, optional
If True, a "best estimate" will be returned for incomplete data.
This means that instead of marking a payoff where all deviations
aren't known as nan, the probability will be renormalized by the
mass that is known, creating a biased estimate based of the data
that is present.
"""
mix = np.asarray(mix, float)
nan_mask = np.empty_like(mix, dtype=bool)
# Fill out mask where we don't have data
if ignore_incomplete or self.is_complete():
nan_mask.fill(False)
elif self.is_empty():
nan_mask.fill(True)
else:
# These calculations are approximate, but for games we can do
# anything with, the size is bounded, and so numeric methods are
# actually exact.
support = mix > 0
strats = np.add.reduceat(support, self.role_starts)
devs = self._profiles[:, ~support]
num_supp = utils.game_size(self.num_role_players, strats).prod()
dev_players = self.num_role_players - \
np.eye(self.num_roles, dtype=int)
role_num_dev = utils.game_size(dev_players, strats).prod(1)
num_dev = role_num_dev.repeat(self.num_role_strats)[~support]
nan_mask[support] = np.all(devs == 0, 1).sum() < num_supp
nan_mask[~support] = devs[devs.sum(1) == 1].sum(0) < num_dev
# Compute values
if not nan_mask.all():
# zero_prob effectively makes 0^0=1 and 0/0=0.
zmix = mix + self.zero_prob.repeat(self.num_role_strats)
log_mix = np.log(zmix)
prof_prob = self._profiles.dot(log_mix)[:, None]
with np.errstate(under='ignore'):
# Ignore underflow caused when profile probability is not
# representable in floating point.
probs = np.exp(prof_prob + self._dev_reps - log_mix)
if ignore_incomplete:
# mask out nans
mask = np.isnan(self._payoffs)
payoffs = np.where(mask, 0, self._payoffs)
probs[mask] = 0
else:
payoffs = self._payoffs
# Mask out nans
zp = self.zero_prob.dot(self.num_role_players)
devs = np.einsum(
'ij,ij->j', probs, np.where(probs > zp, payoffs, 0))
devs[nan_mask] = np.nan
else:
devs = np.full(self.num_strats, np.nan)
if ignore_incomplete:
tprobs = probs.sum(0)
tsupp = tprobs > 0
devs[tsupp] /= tprobs[tsupp]
devs[~tsupp] = np.nan
if not jacobian:
return devs
if ignore_incomplete or self.is_complete():
dev_profs = (self._profiles[:, None] -
np.eye(self.num_strats, dtype=int))
dev_jac = np.einsum(
'ij,ij,ijk->jk', probs, payoffs, dev_profs) / zmix
if ignore_incomplete:
dev_jac -= (np.einsum('ij,ijk->jk', probs, dev_profs) *
devs[:, None] / zmix)
dev_jac[tsupp] /= tprobs[tsupp, None]
dev_jac[~tsupp] = np.nan
else:
dev_jac = np.full((self.num_strats,) * 2, np.nan)
return devs, dev_jac
[docs] def normalize(self):
"""Return a normalized game"""
scale = self.max_role_payoffs() - self.min_role_payoffs()
scale[np.isclose(scale, 0)] = 1
offset = np.repeat(self.min_role_payoffs(), self.num_role_strats)
payoffs = (self._payoffs - offset) / scale.repeat(self.num_role_strats)
payoffs[0 == self._profiles] = 0
return game_replace(self, self._profiles, payoffs)
[docs] def restrict(self, rest):
"""Remove possible strategies from consideration"""
rest = np.asarray(rest, bool)
base = rsgame.emptygame_copy(self).restrict(rest)
prof_mask = ~np.any(self._profiles * ~rest, 1)
profiles = self._profiles[prof_mask][:, rest]
payoffs = self._payoffs[prof_mask][:, rest]
return Game(base.role_names, base.strat_names, base.num_role_players,
profiles, payoffs)
def __contains__(self, profile):
"""Returns true if all data for that profile exists"""
return (utils.hash_array(np.asarray(profile, int))
in self._complete_profiles)
[docs] def profile_from_json(self, prof, dest=None, *, verify=True):
"""Read a profile from json
A profile is an assignment from role-strategy pairs to counts. This
method reads from several formats as specified in parameters.
Parameters
----------
prof : json
A description of a profile in a number of formats. The correct
format will be auto detected and used. The most common are {role:
{strat: count}}, {role: [(strat, count, payoff)]},
{symmetry_groups: [{role: role, strategy: strategy, count:
count}]}.
dest : ndarray, optional
If supplied, ``dest`` will be written to instead of allocating a
new array.
"""
if dest is None:
dest = np.empty(self.num_strats, int)
else:
assert dest.dtype.kind == 'i'
assert dest.shape == (self.num_strats,)
dest.fill(0)
try:
# To parse as format that contains both data types
self.profpay_from_json(prof, dest_prof=dest, verify=False)
except ValueError:
# Only remaining format is straight dictionary
super().profile_from_json(prof, dest=dest, verify=False)
assert not verify or self.is_profile(dest), \
"\"{}\" is not a valid profile".format(prof)
return dest
[docs] def profile_to_assignment(self, prof):
return {
role: list(itertools.chain.from_iterable(
itertools.repeat(strat, val.item()) for strat, val
in zip(strats, counts)))
for counts, role, strats
in zip(np.split(prof, self.role_starts[1:]),
self.role_names, self.strat_names)
if np.any(counts > 0)}
[docs] def payoff_from_json(self, pays, dest=None, *, verify=True):
"""Read a set of payoffs from json
Parameters
----------
pays : json
A description of a set of payoffs in a number of formats
dest : ndarray, optional
If supplied, ``dest`` will be written to instead of allocating a
new array.
"""
if dest is None:
dest = np.empty(self.num_strats, float)
else:
assert dest.dtype.kind == 'f'
assert dest.shape == (self.num_strats,)
dest.fill(0)
try:
# To parse as format that contains both data types
self.profpay_from_json(pays, dest_pays=dest, verify=verify)
except ValueError:
# Only remaining format is straight dictionary
super().payoff_from_json(pays, dest=dest)
return dest
[docs] def profpay_from_json(self, prof, dest_prof=None, dest_pays=None, *,
verify=True):
"""Read json as a profile and a payoff"""
if dest_prof is None:
dest_prof = np.empty(self.num_strats, int)
if dest_pays is None:
dest_pays = np.empty(self.num_strats, float)
dest_prof.fill(0)
dest_pays.fill(0)
# observations but no data
if not prof.get('observations', True):
for symgrp in prof['symmetry_groups']:
_, role, strat, count, __ = _unpack_symgrp(**symgrp)
index = self.role_strat_index(role, strat)
dest_prof[index] = count
dest_pays[index] = np.nan
# summary format
elif 'observations' not in prof and 'symmetry_groups' in prof:
for symgrp in prof['symmetry_groups']:
_, role, strat, count, pay = _unpack_symgrp(**symgrp)
index = self.role_strat_index(role, strat)
dest_prof[index] = count
dest_pays[index] = pay
# observations format
elif ('observations' in prof
and 'symmetry_groups' in prof['observations'][0]):
ids = {}
for symgrp in prof['symmetry_groups']:
i, role, strat, count, _ = _unpack_symgrp(**symgrp)
index = self.role_strat_index(role, strat)
ids[i] = index
dest_prof[index] = count
counts = np.zeros(self.num_strats, int)
for j, obs in enumerate(prof['observations']):
for symgrp in obs['symmetry_groups']:
i, pay = _unpack_obs(**symgrp)
k = ids[i]
counts[k] += 1
dest_pays[k] += (pay - dest_pays[k]) / counts[k]
# full format
elif 'observations' in prof:
ids = {}
for symgrp in prof['symmetry_groups']:
i, role, strat, count, _ = _unpack_symgrp(**symgrp)
index = self.role_strat_index(role, strat)
ids[i] = index
dest_prof[index] = count
counts = np.zeros(self.num_strats, int)
for j, obs in enumerate(prof['observations']):
for player in obs['players']:
i, pay = _unpack_player(**player)
k = ids[i]
counts[k] += 1
dest_pays[k] += (pay - dest_pays[k]) / counts[k]
# observation from simulation
elif 'players' in prof:
for player in prof['players']:
role, strat, pay = _unpack_obs_player(**player)
ind = self.role_strat_index(role, strat)
dest_prof[ind] += 1
dest_pays[ind] += (pay - dest_pays[ind]) / dest_prof[ind]
# dict payoff
elif all(not isinstance(v, abc.Mapping) for v in prof.values()):
for role, strats in prof.items():
for strat, count, pays in strats:
index = self.role_strat_index(role, strat)
dest_prof[index] = count
dest_pays[index] = _mean(pays)
# error
else:
raise ValueError("unknown format")
assert not verify or self.is_profile(dest_prof), \
"\"{}\" does not define a valid profile".format(prof)
return dest_prof, dest_pays
[docs] def profpay_to_json(self, payoffs, prof):
"""Format a profile and payoffs as json"""
return {role: [(strat, int(count), float(pay)) for strat, count, pay
in zip(strats, counts, pays) if count > 0]
for role, strats, counts, pays
in zip(self.role_names, self.strat_names,
np.split(prof, self.role_starts[1:]),
np.split(payoffs, self.role_starts[1:]))}
@utils.memoize
def __hash__(self):
return hash((super().__hash__(), self.num_profiles,
self.num_complete_profiles))
def __eq__(self, other):
return (super().__eq__(other) and
# Identical profiles
self.num_profiles == other.num_profiles and
self.num_complete_profiles == other.num_complete_profiles and
# Identical payoffs
not np.setxor1d(utils.axis_to_elem(self._profiles),
utils.axis_to_elem(other._profiles)).size and
# Identical payoffs
all(np.allclose(pay, other.get_payoffs(prof), equal_nan=True)
for prof, pay in zip(self._profiles, self._payoffs)))
[docs] def to_json(self):
"""Fromat a Game as json"""
res = super().to_json()
res['profiles'] = [self.profpay_to_json(pay, prof) for prof, pay
in zip(self._profiles, self._payoffs)]
res['type'] = 'game.1'
return res
def __repr__(self):
return '{old}, {data:d} / {total:d})'.format(
old=super().__repr__()[:-1],
data=self.num_profiles,
total=self.num_all_profiles)
def __str__(self):
"""Fromat basegame as a printable string"""
return '{}\npayoff data for {:d} out of {:d} profiles'.format(
super().__str__(), self.num_profiles, self.num_all_profiles)
[docs]def game(num_role_players, num_role_strats, profiles, payoffs):
"""Create a game with default names
Parameters
----------
num_role_players : ndarray-like, int,
The number of players per role.
num_role_strats : ndarray-like, int,
The number of strategies per role.
profiles : ndarray-like, int
The profiles for the game, with shape (num_profiles, num_strats).
payoffs : ndarray-like, float
The payoffs for the game, with shape (num_profiles, num_strats).
"""
return game_replace(rsgame.emptygame(num_role_players, num_role_strats),
profiles, payoffs)
[docs]def game_names(role_names, num_role_players, strat_names, profiles, payoffs):
"""Create a game with specified names
Parameters
----------
role_names : [str]
The name for each role.
num_role_players : ndarray-like, int,
The number of players per role.
strat_names : [[str]]
The name for each strategy per role.
profiles : ndarray-like, int
The profiles for the game, with shape (num_profiles, num_strats).
payoffs : ndarray-like, float
The payoffs for the game, with shape (num_profiles, num_strats).
"""
return game_replace(
rsgame.emptygame_names(role_names, num_role_players, strat_names),
profiles, payoffs)
[docs]def game_json(json):
"""Read a Game from json
This takes a game in any valid payoff format (i.e. output by this or by
EGTA Online), and converts it into a Game. If several payoff exist, the
mean is taken. This means that loading a game using this method, and
loading it as a sample game produce different results, as the sample game
will truncate extra payoffs for an individual profile, while this will take
the minimum. Note, that there is no legitimate way to get a game with that
structure, but it is possible to write the json.
"""
base = game_copy(rsgame.emptygame_json(json))
profiles = json.get('profiles', ())
if not profiles:
return base
num_profs = len(profiles)
profs = np.empty((num_profs, base.num_strats), int)
pays = np.empty((num_profs, base.num_strats), float)
for profj, prof, pay in zip(profiles, profs, pays):
base.profpay_from_json(profj, prof, pay)
return game_replace(base, profs, pays)
[docs]def game_copy(copy_game):
"""Copy structure and payoffs from an existing game
Parameters
----------
copy_game : RsGame
Game to copy data from. This will create a copy with the games profiles
and payoffs.
"""
return Game(copy_game.role_names, copy_game.strat_names,
copy_game.num_role_players, copy_game.profiles(),
copy_game.payoffs())
[docs]def game_replace(copy_game, profiles, payoffs):
"""Copy structure from an existing game with new data
Parameters
----------
copy_game : Game
Game to copy structure out of. Structure includes role names, strategy
names, and the number of players.
profiles : ndarray-like, int
The profiles for the game, with shape (num_profiles, num_strats).
payoffs : ndarray-like, float
The payoffs for the game, with shape (num_profiles, num_strats).
"""
profiles = np.asarray(profiles, int)
payoffs = np.asarray(payoffs, float)
assert profiles.shape == payoffs.shape, \
"profiles and payoffs must be the same shape {} {}".format(
profiles.shape, payoffs.shape)
assert profiles.shape[1:] == (copy_game.num_strats,), \
"profiles must have proper end shape : expected {} but was {}" \
.format((copy_game.num_strats,), profiles.shape[1:])
assert np.all(profiles >= 0), "profiles was negative"
assert np.all(
np.add.reduceat(profiles, copy_game.role_starts, 1) ==
copy_game.num_role_players), \
"not all profiles equaled player total"
assert not np.any((payoffs != 0) & (profiles == 0)), \
"there were nonzero payoffs for strategies without players"
assert not np.all(np.isnan(payoffs) | (profiles == 0), 1).any(), \
"a profile can't have entirely nan payoffs"
assert profiles.shape[0] == np.unique(utils.axis_to_elem(profiles)).size, \
"there can't be any duplicate profiles"
return Game(copy_game.role_names, copy_game.strat_names,
copy_game.num_role_players, profiles, payoffs)
[docs]class SampleGame(Game):
"""A Role Symmetric Game that has multiple samples per profile
This behaves the same as a normal Game object, except that it has methods
for accessing several payoffs per profile. It also has a `resample` method
which returns a Game with bootstrapped payoffs instead of mean payoffs,
allowing for easy bootstrapping.
Parameters
----------
role_names : (str,)
The name of each role.
strat_names : ((str,),)
The name of each strategy for each role.
num_role_players : ndarray, int
The number of players per role.
profiles : ndarray
The profiles for the game.
sample_payoffs : (ndarray,)
The sample payoffs for the game. Each element of the tuple is a set of
payoff samples grouped by number of samples and parallel with profiles.
The dimension of each element should be (num_payoffs, num_samples,
num_strats), where num_payoffs is the number of samples for that number
of observations. The number of samples for each element of the tuple
must be distinct, and an element with zero samples is disallowed, it
should be omitted instead. All requirements for valid payoffs also
apply.
"""
def __init__(self, role_names, strat_names, num_role_players, profiles,
sample_payoffs):
super().__init__(
role_names, strat_names, num_role_players, profiles,
np.concatenate([s.mean(1) for s in sample_payoffs])
if sample_payoffs else np.empty((0, profiles.shape[1])))
self._sample_payoffs = sample_payoffs
for spay in self._sample_payoffs:
spay.setflags(write=False)
self.num_sample_profs = np.fromiter( # pragma: no branch
(x.shape[0] for x in sample_payoffs),
int, len(sample_payoffs))
self.num_sample_profs.setflags(write=False)
self.sample_starts = np.insert(
self.num_sample_profs[:-1].cumsum(), 0, 0)
self.sample_starts.setflags(write=False)
self.num_samples = np.fromiter( # pragma: no branch
(v.shape[1] for v in sample_payoffs),
int, len(sample_payoffs))
self.num_samples.setflags(write=False)
self._sample_profile_map = None
[docs] @utils.memoize
def min_strat_payoffs(self):
"""Returns the minimum payoff for each role"""
mins = np.full(self.num_strats, np.nan)
for profs, spays in zip(
np.split(self._profiles, self.sample_starts[1:]),
self._sample_payoffs):
sample_mins = np.fmin.reduce(
np.where(profs[:, None] > 0, spays, np.nan), (0, 1))
np.fmin(mins, sample_mins, mins)
mins.setflags(write=False)
return mins
[docs] @utils.memoize
def max_strat_payoffs(self):
"""Returns the maximum payoff for each role"""
maxs = np.full(self.num_strats, np.nan)
for profs, spays in zip(
np.split(self._profiles, self.sample_starts[1:]),
self._sample_payoffs):
sample_maxs = np.fmax.reduce(
np.where(profs[:, None] > 0, spays, np.nan), (0, 1))
np.fmax(maxs, sample_maxs, maxs)
maxs.setflags(write=False)
return maxs
[docs] def sample_payoffs(self):
"""Get the underlying sample payoffs"""
return self._sample_payoffs
[docs] def resample(self, num_resamples=None, *, independent_profile=False,
independent_role=False, independent_strategy=False):
"""Fetch a game with bootstrap sampled payoffs
Arguments
---------
num_resamples : int
The number of resamples to take for each realized payoff. By
default this is equal to the number of observations for that
profile, yielding proper bootstrap sampling.
independent_profile : bool
If true, sample each profile independently. In general, only
profiles with a different number of observations will be resampled
independently.
independent_role : bool
If true, sample each role independently. Within a profile, the
payoffs for each role will be drawn independently.
independent_strategy : bool
IF true, sample each strategy independently. Within a profile, the
payoffs for each strategy will be drawn independently. This
supersceeds `independent_role`.
Notes
-----
Each of the `independent_` arguments will increase the time to do a
resample, but may produce better results as it will remove correlations
between payoffs.
"""
dim2 = (self.num_strats if independent_strategy
else self.num_roles if independent_role
else 1)
payoffs = np.empty_like(self._payoffs)
for obs, pays in zip(self._sample_payoffs,
np.split(payoffs, self.sample_starts[1:])):
obs = np.rollaxis(obs, 1, 3)
num_samples = obs.shape[2]
num_obs_resamples = (num_samples if num_resamples is None
else num_resamples)
dim1 = obs.shape[0] if independent_profile else 1
sample = rand.multinomial(num_obs_resamples,
np.ones(num_samples) / num_samples,
(dim1, dim2))
if independent_role and not independent_strategy:
sample = sample.repeat(self.num_role_strats, 1)
np.copyto(pays, np.mean(obs * sample, 2) *
(num_samples / num_obs_resamples))
return Game(self.role_names, self.strat_names, self.num_role_players,
self._profiles, payoffs)
[docs] def get_sample_payoffs(self, profile):
"""Get sample payoffs associated with a profile
This returns an array of shape (num_observations, num_role_strats). If
profile has no data, num_observations will be 0."""
if self._sample_profile_map is None:
self._sample_profile_map = dict(zip(
map(utils.hash_array, self._profiles),
itertools.chain.from_iterable(self._sample_payoffs)))
profile = np.asarray(profile, int)
assert self.is_profile(profile)
hashed = utils.hash_array(profile)
if hashed not in self._sample_profile_map:
return np.empty((0, self.num_strats), float)
else:
return self._sample_profile_map[hashed]
[docs] def flat_profiles(self):
"""Profiles in parallel with flat_payoffs"""
return self._profiles.repeat(
self.num_samples.repeat(self.num_sample_profs), 0)
[docs] def flat_payoffs(self):
"""All sample payoffs linearly concatenated together"""
return np.concatenate([
pay.reshape((-1, self.num_strats))
for pay in self._sample_payoffs])
[docs] def normalize(self):
"""Return a normalized SampleGame"""
scale = self.max_role_payoffs() - self.min_role_payoffs()
scale[np.isclose(scale, 0)] = 1
scale = scale.repeat(self.num_role_strats)
offset = self.min_role_payoffs().repeat(self.num_role_strats)
spayoffs = tuple((pays - offset) / scale
for pays in self._sample_payoffs)
for profs, spays in zip(
np.split(self._profiles, self.sample_starts[1:]), spayoffs):
spays *= 0 < profs[:, None]
return samplegame_replace(self, self._profiles, spayoffs)
[docs] def restrict(self, rest):
"""Remove possible strategies from consideration"""
rest = np.asarray(rest, bool)
base = rsgame.emptygame_copy(self).restrict(rest)
prof_mask = ~np.any(self._profiles * ~rest, 1)
profiles = self._profiles[prof_mask][:, rest]
sample_payoffs = tuple(
pays[pmask][..., rest] for pays, pmask
in zip(self._sample_payoffs,
np.split(prof_mask, self.sample_starts[1:]))
if pmask.any())
return SampleGame(base.role_names, base.strat_names,
base.num_role_players, profiles, sample_payoffs)
[docs] def samplepay_from_json(self, prof, dest=None):
"""Read a set of payoff samples
Parameters
----------
prof : json
A description of a set of profiles and their payoffs. There are
several formats that are acceptable, they're all output by egta.
dest : ndarray, options
If supplied, ``dest`` will be written to instead of allocting a new
array. This may be hard to use as you need to know how many
observations are in the json.
"""
try:
# samplepay format with profile too
_, dest = self.profsamplepay_from_json(prof, dest_samplepay=dest)
except ValueError:
# Must be {role: {strat: [pay]}}
num = max(max(len(p) if isinstance(p, abc.Iterable) else 1
for p in pays.values())
for pays in prof.values())
if dest is None:
dest = np.empty((num, self.num_strats), float)
else:
assert dest.dtype.kind == 'f'
assert dest.shape == (num, self.num_strats), \
"dest_samplepay not large enough for observations"
dest.fill(0)
for role, strats in prof.items():
for strat, pay in strats.items():
dest[:, self.role_strat_index(role, strat)] = pay
return dest
[docs] def samplepay_to_json(self, samplepay):
"""Format sample payoffs as json"""
# In a really weird degenerate case, if all payoffs are 0, we'll write
# out an empty dictionary, which loses information about the number of
# samples. In that case we arbitrarily write out the first strategy
# with zero payoffs.
samplepay = np.asarray(samplepay, float)
if np.all(samplepay == 0):
return {self.role_names[0]: {
self.strat_names[0][0]: [0] * samplepay.shape[0]}}
else:
return {role: {strat: pay.tolist() for strat, pay
in zip(strats, pays)
if np.any(pay != 0)}
for role, strats, pays
in zip(self.role_names, self.strat_names,
np.split(samplepay.T, self.role_starts[1:]))
if np.any(pays != 0)}
[docs] def profsamplepay_from_json(self, prof, dest_prof=None,
dest_samplepay=None):
"""Convert json into a profile and an observation"""
if dest_prof is None:
dest_prof = np.empty(self.num_strats, int)
dest_prof.fill(0)
def get_pay(num):
dest = dest_samplepay
if dest is None:
dest = np.empty((num, self.num_strats), float)
else:
assert dest.shape[0] >= num, \
"dest_samplepay not large enough for observations"
dest.fill(0)
return dest
# summary format
if 'observations' not in prof and 'symmetry_groups' in prof:
dest = get_pay(1)
for symgrp in prof['symmetry_groups']:
_, role, strat, count, pay = _unpack_symgrp(**symgrp)
index = self.role_strat_index(role, strat)
dest_prof[index] = count
dest[0, index] = pay
# observations format
elif ('observations' in prof
and 'symmetry_groups' in prof['observations'][0]):
dest = get_pay(len(prof['observations']))
ids = {}
for symgrp in prof['symmetry_groups']:
i, role, strat, count, _ = _unpack_symgrp(**symgrp)
index = self.role_strat_index(role, strat)
ids[i] = index
dest_prof[index] = count
for j, obs in enumerate(prof['observations']):
for symgrp in obs['symmetry_groups']:
i, pay = _unpack_obs(**symgrp)
dest[j, ids[i]] = pay
# full format
elif 'observations' in prof:
dest = get_pay(len(prof['observations']))
ids = {}
for symgrp in prof['symmetry_groups']:
i, role, strat, count, _ = _unpack_symgrp(**symgrp)
index = self.role_strat_index(role, strat)
ids[i] = index
dest_prof[index] = count
counts = np.empty(self.num_strats, int)
for j, obs in enumerate(prof['observations']):
counts.fill(0)
for player in obs['players']:
i, pay = _unpack_player(**player)
k = ids[i]
counts[k] += 1
dest[j, k] += (pay - dest[j, k]) / counts[k]
assert np.all(counts == dest_prof), \
"full format didn't have payoffs for the correct number of players" # noqa
# profile payoff
elif all(not isinstance(v, abc.Mapping) for v in prof.values()):
num = max(max(len(p) if isinstance(p, abc.Iterable) else 1
for _, __, p in sg)
for sg in prof.values())
dest = get_pay(num)
for role, strats in prof.items():
for strat, count, pays in strats:
index = self.role_strat_index(role, strat)
dest_prof[index] = count
dest[:, index] = pays
# unrecognized
else:
raise ValueError("unrecognized format")
return dest_prof, dest
[docs] def profsamplepay_to_json(self, samplepay, prof):
"""Convery profile and observations to prof obs output"""
return {role: [(strat, int(count), list(map(float, pay)))
for strat, count, pay
in zip(strats, counts, pays.T) if count > 0]
for role, strats, counts, pays
in zip(self.role_names, self.strat_names,
np.split(prof, self.role_starts[1:]),
np.split(samplepay, self.role_starts[1:], 1))}
@utils.memoize
def __hash__(self):
return hash((super().__hash__(), tuple(sorted(self.num_samples))))
def __eq__(self, other):
return (
super().__eq__(other) and
# Identical sample payoffs
all(_sample_payoffs_equal(pay, other.get_sample_payoffs(prof))
for prof, pay in zip(
self._profiles,
itertools.chain.from_iterable(self._sample_payoffs))))
[docs] def to_json(self):
"""Fromat a SampleGame as json"""
res = super().to_json()
res['profiles'] = [
self.profsamplepay_to_json(pay, prof) for prof, pay
in zip(self._profiles,
itertools.chain.from_iterable(self._sample_payoffs))]
res['type'] = 'samplegame.1'
return res
def __repr__(self):
samples = self.num_samples
if samples.size == 0:
sample_str = '0'
elif samples.size == 1:
sample_str = str(samples[0])
else:
sample_str = '{:d} - {:d}'.format(samples.min(), samples.max())
return '{}, {})'.format(super().__repr__()[:-1], sample_str)
def __str__(self):
samples = self.num_sample_profs.dot(self.num_samples)
if self.num_samples.size == 0:
sampstr = 'no observations'
elif self.num_samples.size == 1:
samps = self.num_samples[0]
sampstr = '{:d} observation{} per profile'.format(
samps, '' if samps == 1 else 's')
else:
sampstr = '{:d} to {:d} observations per profile'.format(
self.num_samples.min(), self.num_samples.max())
return '{}\n{} payoff sample{}\n{}'.format(
super().__str__(), 'no' if samples == 0 else samples,
'' if samples == 1 else 's', sampstr)
def _sample_payoffs_equal(p1, p2):
"""Returns true if two sample payoffs are almost equal"""
# Pathological payoffs will make this fail, e.g. small perturbations to
# almost equal payoffs that invert their order, but we're testing for
# equality, so that's not really an issue, as strict permutations will
# still be valid.
return (p1.shape[0] == p2.shape[0] and
np.allclose(p1[np.lexsort(p1.T)], p2[np.lexsort(p2.T)],
equal_nan=True))
[docs]def samplegame(num_role_players, num_role_strats, profiles,
sample_payoffs):
"""Create a SampleGame with default names
Parameters
----------
num_role_players : ndarray-like, int
The number of players per role.
num_role_strats : ndarray-like, int
The number of strategies per role.
profiles : ndarray-like, int
The profiles for the game, with shape (num_profiles, num_strats).
sample_payoffs : [ndarray-like, float]
The sample payoffs for the game.
"""
return samplegame_replace(
rsgame.emptygame(num_role_players, num_role_strats),
profiles, sample_payoffs)
[docs]def samplegame_flat(num_role_players, num_role_strats, profiles, payoffs):
"""Create a SampleGame with default names and flat profiles
Parameters
----------
num_role_players : ndarray-like, int
The number of players per role.
num_role_strats : ndarray-like, int
The number of strategies per role.
profiles : ndarray-like, int
The profiles for the game, potentially with duplicates, with shape
(num_sample_profiles, num_strats).
payoffs : ndarray-like, float
The sample payoffs for the game, in parallel with the profiles they're
samples from, with shape (num_sample_profiles, num_strats).
"""
return samplegame_replace_flat(
rsgame.emptygame(num_role_players, num_role_strats), profiles, payoffs)
[docs]def samplegame_names(role_names, num_role_players, strat_names, profiles,
sample_payoffs):
"""Create a SampleGame with specified names
Parameters
----------
role_names : [str]
The name of each role.
num_role_players : ndarray
The number of players for each role.
strat_names : [[str]]
The name of each strategy.
profiles : ndarray
The profiles for the game.
sample_payoffs : [ndarray]
The sample payoffs for the game."""
return samplegame_replace(
rsgame.emptygame_names(role_names, num_role_players, strat_names),
profiles, sample_payoffs)
[docs]def samplegame_names_flat(role_names, num_role_players, strat_names, profiles,
payoffs):
"""Create a SampleGame with specified names and flat payoffs
Parameters
----------
role_names : [str]
The name of each role.
num_role_players : ndarray
The number of players for each role.
strat_names : [[str]]
The name of each strategy.
profiles : ndarray-like, int
The profiles for the game, potentially with duplicates,
(num_sample_profiles, num_strats).
payoffs : ndarray-like, float
The sample payoffs for the game, in parallel with the profiles they're
samples from, (num_sample_profiles, num_strats).
"""
return samplegame_replace_flat(
rsgame.emptygame_names(role_names, num_role_players, strat_names),
profiles, payoffs)
[docs]def samplegame_json(json):
"""Read a SampleGame from json
This will read any valid payoff game as a sample game. Invalid games will
produce an empty sample game."""
base = samplegame_copy(rsgame.emptygame_json(json))
profiles = json.get('profiles', ())
if not profiles:
return base
sample_map = {}
for profile in profiles:
prof, spay = base.profsamplepay_from_json(profile)
num_samps = spay.shape[0]
profls, payls = sample_map.setdefault(num_samps, ([], []))
profls.append(prof[None])
payls.append(spay[None])
values = [v for _, v in sorted(sample_map.items())]
profiles = np.concatenate(list(itertools.chain.from_iterable(
prof for prof, _ in values)))
sample_payoffs = tuple(np.concatenate(spay) for _, spay in values)
return samplegame_replace(base, profiles, sample_payoffs)
[docs]def samplegame_copy(copy_game):
"""Copy a SampleGame from another game
If game defined sample_payoffs, this will be created with those, otherwise
it will create a game with one sample per payoff.
Parameters
----------
copy_game : RsGame
Game to copy data from.
"""
if hasattr(copy_game, 'sample_payoffs'):
sample_payoffs = copy_game.sample_payoffs()
elif not copy_game.is_empty():
sample_payoffs = (copy_game.payoffs()[:, None],)
else:
sample_payoffs = ()
return SampleGame(copy_game.role_names, copy_game.strat_names,
copy_game.num_role_players, copy_game.profiles(),
sample_payoffs)
[docs]def samplegame_replace_flat(copy_game, profiles, payoffs):
"""Replace sample payoff data for an existing game
Parameters
----------
copy_game : BaseGame, optional
Game to copy information out of.
profiles : ndarray-like, int
The profiles for the game, potentially with duplicates, with shape
(num_sample_profiles, num_strats).
payoffs : ndarray-like, float
The sample payoffs for the game, in parallel with the profiles they're
samples from, with shape (num_sample_profiles, num_strats).
"""
profiles = np.asarray(profiles, int)
payoffs = np.asarray(payoffs, float)
_, ind, inv, counts = utils.unique_axis(
profiles, return_index=True, return_inverse=True, return_counts=True)
countso = counts.argsort()
countsoi = np.empty(counts.size, int)
countsoi[countso] = np.arange(counts.size)
cinv = countsoi[inv]
cinvo = cinv.argsort()
cinvs = cinv[cinvo]
payo = (np.insert(np.cumsum(1 - np.diff(cinvs)), 0, 0) + cinvs)[cinvo]
num_samps, ccounts = np.unique(counts[countso], return_counts=True)
splits = (num_samps * ccounts)[:-1].cumsum()
profs = profiles[ind[countso]]
pays = [pay.reshape((n, c, -1)) for pay, n, c
in zip(np.split(payoffs[payo], splits), ccounts, num_samps)]
return samplegame_replace(copy_game, profs, pays)
[docs]def samplegame_replace(copy_game, profiles, sample_payoffs):
"""Replace sample payoff data for an existing game
Parameters
----------
copy_game : BaseGame, optional
Game to copy information out of.
profiles : ndarray-like, int
The profiles for the game, with shape (num_profiles, num_strats).
sample_payoffs : [ndarray-like, float]
The sample payoffs for the game.
"""
profiles = np.asarray(profiles, int)
sample_payoffs = tuple(np.asarray(sp) for sp in sample_payoffs)
assert profiles.shape[1:] == (copy_game.num_strats,), \
"profiles must have proper end shape : expected {} but was {}" \
.format((copy_game.num_strats,), profiles.shape[1:])
assert np.all(profiles >= 0), "profiles were negative"
assert np.all(
np.add.reduceat(profiles, copy_game.role_starts, 1) ==
copy_game.num_role_players), \
"not all profiles equaled player total"
assert profiles.shape[0] == np.unique(utils.axis_to_elem(profiles)).size, \
"there can't be any duplicate profiles"
assert profiles.shape[0] == sum(sp.shape[0] for sp in sample_payoffs), \
"profiles and sample_payoffs must have the same number of 'profiles'"
assert all(sp.shape[2] == copy_game.num_strats for sp in sample_payoffs), \
"all sample payoffs must have the appropriate number of strategies"
assert not any(pays.size == 0 for pays in sample_payoffs), \
"sample_payoffs can't be empty"
assert len({s.shape[1] for s in sample_payoffs}) == len(sample_payoffs), \
"Each set of observations must have a unique number or be merged"
for profs, spays in zip(np.split(profiles, list(itertools.accumulate(
sp.shape[0] for sp in sample_payoffs[:-1]))), sample_payoffs):
assert not np.any((spays != 0) & (profs == 0)[:, None]), \
"some sample payoffs were nonzero for invalid payoffs"
assert not np.all(np.isnan(spays) | (profs == 0)[:, None], 2).any(), \
"an observation can't have entirely nan payoffs"
assert np.all(np.isnan(spays).all(1) | ~np.isnan(spays).any()), \
"for a given strategy, all payoffs must be nan or non"
return SampleGame(copy_game.role_names, copy_game.strat_names,
copy_game.num_role_players, profiles, sample_payoffs)
# ---------
# Utilities
# ---------
def _mean(vals):
if isinstance(vals, abc.Iterable):
count = 0
mean = 0
for v in vals:
count += 1
mean += (v - mean) / count
return mean if count > 0 else float('nan')
else:
return vals
def _unpack_symgrp(role, strategy, count, payoff=None, id=None, **_):
return id, role, strategy, count, payoff
def _unpack_obs(id, payoff, **_):
return id, payoff
def _unpack_player(sid, p, **_):
return sid, p
def _unpack_obs_player(role, strategy, payoff, **_):
return role, strategy, payoff