Source code for gameanalysis.utils

import functools
import inspect
import itertools
import math
import operator
import random
import string
import warnings
from collections import abc

import numpy as np
import scipy.special as sps


_TINY = np.finfo(float).tiny
# This is the maximum integer that can be exactly represented as a float
_MAX_INT_FLOAT = 2 ** (np.finfo(float).nmant - 1)
_SIMPLEX_BIG = 1 / np.finfo(float).resolution
# XXX A lot of these are candidates for cython


[docs]def prod(collection): """Product of all elements in the collection""" return functools.reduce(operator.mul, collection)
[docs]def comb(n, k): """Return n choose k This function works on arrays, and will properly return a python integer object if the number is too large to be stored in a 64 bit integer. """ res = np.rint(sps.comb(n, k, False)) if np.all(res < _MAX_INT_FLOAT): return res.astype(int) elif isinstance(n, abc.Iterable) or isinstance(k, abc.Iterable): broad = np.broadcast(np.asarray(n), np.asarray(k)) res = np.empty(broad.shape, dtype=object) res.flat = [sps.comb(n_, k_, True) for n_, k_ in broad] return res else: return sps.comb(n, k, True)
[docs]def comb_inv(cmb, k): """Return the inverse of `comb` Given a number of combinations, and the size of subset we're choosing, compute the integer lower bound, i.e. return n* such that `comb(n*, k) <= cmb < comb(n* + 1, k)`. """ n = np.empty(np.broadcast(cmb, k).shape, int) na = n.view() na.shape = (n.size,) cmba = np.broadcast_to(cmb, n.size) ka = np.broadcast_to(k, n.size) step = ka.copy() mask = step > 0 na[~mask] = 0 na[mask] = np.ceil((ka[mask] / np.e * cmba[mask] ** (1 / ka[mask])).astype(float)) # If we didn't approximate the lower bound, then there are at most k values # to check. This does a poor mans binary search with some wasted effort, # however for small k, it's negligible, and we see performance improvements # over linear search in general. while np.any(mask): valid = comb(na[mask] + step[mask], ka[mask]) <= cmba[mask] inc = mask.copy() inc[mask] = valid na[inc] += step[inc] red = mask.copy() red[mask] = ~valid step[red] //= 2 mask = step > 0 if n.ndim == 0: return n.item() else: return n
[docs]def game_size(players, strategies): """Number of profiles in a symmetric game with players and strategies""" return comb(np.asarray(players) + strategies - 1, players)
[docs]def game_size_inv(size, players): """Inverse of game_size Given a game size and a number of players, return a lower bound on the number of strategies s* such that game_size(players, s*) <= size < game_size(players, s* + 1)`. """ return comb_inv(size, players) - players + 1
[docs]def only(iterable): """Return the only element of an iterable Throws a value error if the iterable doesn't contain only one element """ try: it = iter(iterable) value = next(it) try: next(it) except StopIteration: return value raise ValueError('Iterable had more than one element') except TypeError: raise ValueError('Input was not iterable') except StopIteration: raise ValueError('Input was empty')
[docs]def repeat(iterable, reps): """Repeat each element of iterable reps times""" return itertools.chain.from_iterable( itertools.repeat(e, r) for e, r in zip(iterable, reps))
[docs]def one_line(string, line_width=80): """If string s is longer than line width, cut it off and append "..." """ string = string.replace('\n', ' ') if len(string) > line_width: return "{}...{}".format(string[:3 * line_width // 4], string[-line_width // 4 + 3:]) return string
[docs]def acomb(n, k, repetition=False): """Compute an array of all n choose k options The result will be an array shape (m, n) where m is n choose k optionally with repetitions.""" if repetition: return _acombr(n, k) else: return _acomb(n, k)
def _acombr(n, k): """Combinations with repetitions""" # This uses dynamic programming to compute everything num = sps.comb(n, k, repetition=True, exact=True) grid = np.zeros((num, n), dtype=int) memoized = {} # This recursion breaks if asking for numbers that are too large (stack # overflow), but the order to fill n and k is predictable, it may be better # to to use a for loop. def fill_region(n, k, region): if n == 1: region[0, 0] = k return elif k == 0: region.fill(0) return if (n, k) in memoized: np.copyto(region, memoized[n, k]) return memoized[n, k] = region o = 0 for ki in range(k, -1, -1): n_ = n - 1 k_ = k - ki m = sps.comb(n_, k_, repetition=True, exact=True) region[o:o + m, 0] = ki fill_region(n_, k_, region[o:o + m, 1:]) o += m fill_region(n, k, grid) return grid def _acomb(n, k): """Combinations""" if k == 0: return np.zeros((1, n), bool) # This uses dynamic programming to compute everything num = sps.comb(n, k, exact=True) grid = np.empty((num, n), dtype=bool) memoized = {} # This recursion breaks if asking for numbers that are too large (stack # overflow), but the order to fill n and k is predictable, it may be better # to to use a for loop. def fill_region(n, k, region): if n <= k: region.fill(True) return elif k == 1: region.fill(False) np.fill_diagonal(region, True) return if (n, k) in memoized: np.copyto(region, memoized[n, k]) return memoized[n, k] = region trues = sps.comb(n - 1, k - 1, exact=True) region[:trues, 0] = True fill_region(n - 1, k - 1, region[:trues, 1:]) region[trues:, 0] = False fill_region(n - 1, k, region[trues:, 1:]) fill_region(n, k, grid) return grid
[docs]def acartesian2(*arrays): """Array cartesian product in 2d Produces a new ndarray that has the cartesian product of every row in the input arrays. The number of columns is the sum of the number of columns in each input. The number of rows is the product of the number of rows in each input. Arguments --------- *arrays : [ndarray (xi, s)] """ rows = prod(a.shape[0] for a in arrays) columns = sum(a.shape[1] for a in arrays) dtype = arrays[0].dtype # should always have at least one role assert all(a.dtype == dtype for a in arrays), \ "all arrays must have the same dtype" result = np.zeros((rows, columns), dtype) pre_row = 1 post_row = rows pre_column = 0 for array in arrays: length, width = array.shape post_row //= length post_column = pre_column + width view = result[:, pre_column:post_column] view.shape = (pre_row, -1, post_row, width) np.copyto(view, array[:, None]) pre_row *= length pre_column = post_column return result
[docs]def simplex_project(array): """Return the projection onto the simplex""" array = np.asarray(array, float) assert not np.isnan(array).any(), \ "can't project nan onto simplex: {}".format(array) # This fails for really large values, so we normalize the array so the # largest element has absolute value at most _SIMPLEX_BIG array = np.clip(array, -_SIMPLEX_BIG, _SIMPLEX_BIG) size = array.shape[-1] sort = -np.sort(-array, -1) rho = (1 - sort.cumsum(-1)) / np.arange(1, size + 1) inds = size - 1 - np.argmax((rho + sort > 0)[..., ::-1], -1) rho.shape = (-1, size) lam = rho[np.arange(rho.shape[0]), inds.flat] lam.shape = array.shape[:-1] + (1,) return np.maximum(array + lam, 0)
[docs]def multinomial_mode(p, n): """Compute the mode of n samples from multinomial distribution p. Notes ----- Algorithm from [3]_, notation follows [4]_. .. [3] Finucan 1964. The mode of a multinomial distribution. .. [4] Gall 2003. Determination of the modes of a Multinomial distribution. """ p = np.asarray(p, float) mask = p > 0 result = np.zeros(p.size, int) p = p[mask] f = p * (n + p.size / 2) k = f.astype(int) f -= k n0 = k.sum() if n0 < n: q = (1 - f) / (k + 1) for _ in range(n0, n): a = q.argmin() k[a] += 1 f[a] -= 1 q[a] = (1 - f[a]) / (k[a] + 1) elif n0 > n: with np.errstate(divide='ignore'): q = f / k for _ in range(n, n0): a = q.argmin() k[a] -= 1 f[a] += 1 q[a] = f[a] / k[a] result[mask] = k return result
[docs]def geometric_histogram(n, p): """Return the histogram of n draws from a geometric distribution This function computes values from the same distribution as `np.bincount(np.random.geometric(p, n) - 1)` but does so more efficiently. """ assert n > 0, "must take at least one sample" assert 0 < p <= 1, "must use a valid probability in (0, 1]" results = [] # This is a rough upper bound on the expectation of the extreme value of n # geometrics with probability p inc = math.ceil((np.log(n) + 1) * (1 / p - .5)) + 1 while n > 0: res = np.random.multinomial(n, p * (1 - p) ** np.arange(inc)) results.append(res[:-1]) n = res[-1] # Remove trailing zeros last = results.pop() results.append(last[:np.flatnonzero(last)[-1] + 1]) return np.concatenate(results)
[docs]def axis_to_elem(array, axis=-1): """Converts an axis of an array into a unique element In general, this returns a copy of the array, unless the data is contiguous. This usually requires that the last axis is the one being merged. Parameters ---------- array : ndarray The array to convert an axis to a view. axis : int, optional The axis to convert into a single element. Defaults to the last axis. """ array = np.asarray(array) # ascontiguousarray will make a copy of necessary axis_at_end = np.ascontiguousarray(np.rollaxis(array, axis, array.ndim)) new_shape = axis_at_end.shape elems = axis_at_end.view(np.dtype((np.void, array.itemsize * new_shape[-1]))) elems.shape = new_shape[:-1] return elems
[docs]def elem_to_axis(array, dtype, axis=-1): """Converts and array of axis elements back to an axis""" dtype = np.dtype(dtype) new_shape = array.shape + (array.itemsize // dtype.itemsize,) return np.rollaxis(array.view(dtype).reshape(new_shape), -1, axis)
[docs]def unique_axis(array, axis=-1, **kwargs): """Find unique axis elements Parameters ---------- array : ndarray The array to find unique axis elements of axis : int, optional The axis to find unique elements of. Defaults to the last axis. **kwargs : flags The flags to pass to numpys unique function Returns ------- uniques : ndarray The unique axes as rows of a two dimensional array. *args : Any other results of the unique functions due to flags """ elems = axis_to_elem(array, axis) results = np.unique(elems, **kwargs) if isinstance(results, tuple): return ((elem_to_axis(results[0], array.dtype, axis),) + results[1:]) else: return elem_to_axis(results, array.dtype, axis)
[docs]class hash_array(object): def __init__(self, array): self.array = np.asarray(array) self.array.setflags(write=False) self._hash = hash(self.array.tobytes()) def __hash__(self): return self._hash def __eq__(self, other): return (self._hash == other._hash and np.all(self.array == other.array))
[docs]def iunique(iterable): """Return an iterable of unique items ordered by first occurrence""" seen = set() for item in iterable: if item not in seen: seen.add(item) yield item
[docs]def random_strings(min_length, max_length=None, digits=string.ascii_lowercase): """Return a random string Parameters ---------- min_length : int The minimum length string to return. max_length : int, optional The maximum length string to return. If None or unspecified, this is the same as min_length. num : int, optional The number of strings to return. If None or unspecified this returns a single string, otherwise it returns a generator with length `num`. digits : str, optional The optional digits to select from. """ if max_length is None: max_length = min_length assert min_length <= max_length, \ "max_length can't be less than min_length" while True: length = random.randint(min_length, max_length) yield ''.join(random.choice(digits) for _ in range(length))
[docs]def prefix_strings(prefix, num): """Returns a list of prefixed integer strings""" padding = int(math.log10(max(num - 1, 1))) + 1 return ('{}{:0{:d}d}'.format(prefix, i, padding) for i in range(num))
[docs]def is_sorted(iterable, *, key=None, reverse=False, strict=False): """Returns true if iterable is sorted Parameters ---------- iterable : iterable The iterable to check for sorted-ness. key : x -> y, optional Apply mapping function key to iterable prior to checking. This can be done before calling, but this ensures identical calls as sorted. reverse : bool, optional `key` and `reverse` function as they for `sorted`""" if key is not None: iterable = map(key, iterable) ai, bi = itertools.tee(iterable) next(bi, None) # Don't throw error if empty if strict and reverse: return all(a > b for a, b in zip(ai, bi)) elif reverse: return all(a >= b for a, b in zip(ai, bi)) elif strict: return all(a < b for a, b in zip(ai, bi)) else: return all(a <= b for a, b in zip(ai, bi))
[docs]def memoize(member_function): """Memoize computation of single object functions""" assert len(inspect.signature(member_function).parameters) == 1, \ "Can only memoize single object functions" @functools.wraps(member_function) def new_member_function(obj): name = '__{}_{}'.format( member_function.__name__, obj.__class__.__name__) if not hasattr(obj, name): setattr(obj, name, member_function(obj)) return getattr(obj, name) return new_member_function
[docs]def deprecated(func): """Mark a function as deprecated""" @functools.wraps(func) def wrapped(*args, **kwargs): warnings.warn("Call to deprecated function {}.".format(func.__name__), category=DeprecationWarning, stacklevel=2) return func(*args, **kwargs) return wrapped