Skip to content
238 changes: 161 additions & 77 deletions quantecon/markov/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,11 @@
from __future__ import division
import numpy as np
from fractions import gcd
import sys
from .gth_solve import gth_solve
from ..graph_tools import DiGraph

# -Check if Numba is Available- #
from ..util import searchsorted, numba_installed, jit
from ..util import searchsorted, check_random_state, numba_installed, jit


class MarkovChain(object):
Expand Down Expand Up @@ -140,11 +139,6 @@ class MarkovChain(object):
List of numpy arrays containing the cyclic classes. Defined only
when the Markov chain is irreducible.

Methods
-------
simulate : Simulates the markov chain for a given initial state or
distribution.

"""

def __init__(self, P):
Expand All @@ -167,9 +161,10 @@ def __init__(self, P):
self.n = self.P.shape[0]

# To analyze the structure of P as a directed graph
self.digraph = DiGraph(self.P)
self._digraph = None

self._stationary_dists = None
self._cdfs = None

def __repr__(self):
msg = "Markov chain with transition matrix \nP = \n{0}"
Expand All @@ -183,6 +178,12 @@ def __repr__(self):
def __str__(self):
return str(self.__repr__)

@property
def digraph(self):
if self._digraph is None:
self._digraph = DiGraph(self.P)
return self._digraph

@property
def is_irreducible(self):
return self.digraph.is_strongly_connected
Expand Down Expand Up @@ -256,9 +257,124 @@ def stationary_distributions(self):
self._compute_stationary()
return self._stationary_dists

def simulate(self, init=0, sample_size=1000):
X = mc_sample_path(self.P, init, sample_size)
return X
@property
def cdfs(self):
if self._cdfs is None:
# See issue #137#issuecomment-96128186
cdfs = np.empty((self.n, self.n), order='C')
np.cumsum(self.P, axis=-1, out=cdfs)
self._cdfs = cdfs
return self._cdfs

def simulate(self, ts_length, init=None, num_reps=None, random_state=None):
"""
Simulate time series of state transitions.

Parameters
----------
ts_length : scalar(int)
Length of each simulation.

init : scalar(int) or array_like(int, ndim=1),
optional(default=None)
Initial state(s). If None, the initial state is randomly
drawn.

num_reps : scalar(int), optional(default=None)
Number of repetitions of simulation.

random_state : scalar(int) or np.random.RandomState,
optional(default=None)
Random seed (integer) or np.random.RandomState instance to
set the initial state of the random number generator for
reproducibility. If None, a randomly initialized RandomState
is used.

Returns
-------
X : ndarray(int, ndim=1 or 2)
Array containing the sample path(s), of shape (ts_length,)
if init is a scalar (integer) or None and num_reps is None;
of shape (k, ts_length) otherwise, where k = len(init) if
(init, num_reps) = (array, None), k = num_reps if (init,
num_reps) = (int or None, int), and k = len(init)*num_reps
if (init, num_reps) = (array, int).

"""
random_state = check_random_state(random_state)
dim = 1 # Dimension of the returned array: 1 or 2

try:
k = len(init) # init is an array
dim = 2
init_states = np.asarray(init, dtype=int)
if num_reps is not None:
k *= num_reps
init_states = np.tile(init_states, num_reps)
except TypeError: # init is a scalar(int) or None
k = 1
if num_reps is not None:
dim = 2
k = num_reps
if init is None:
init_states = random_state.randint(self.n, size=k)
elif isinstance(init, int):
init_states = np.ones(k, dtype=int) * init
else:
raise ValueError(
'init must be int, array_like of ints, or None'
)

# === set up array to store output === #
X = np.empty((k, ts_length), dtype=int)

# Random values, uniformly sampled from [0, 1)
random_values = random_state.random_sample(size=(k, ts_length-1))

# Generate sample paths and store in X
_generate_sample_paths(self.cdfs, init_states, random_values, out=X)

if dim == 1:
return X[0]
else:
return X


def _generate_sample_paths(P_cdfs, init_states, random_values, out):
"""
Generate num_reps sample paths of length ts_length, where num_reps =
out.shape[0] and ts_length = out.shape[1].

Parameters
----------
P_cdfs : ndarray(float, ndim=2)
Array containing as rows the CDFs of the state transition.

init_states : array_like(int, ndim=1)
Array containing the initial states. Its length must be equal to
num_reps.

random_values : ndarray(float, ndim=2)
Array containing random values from [0, 1). Its shape must be
equal to (num_reps, ts_length-1)

out : ndarray(int, ndim=2)
Array to store the sample paths.

Notes
-----
This routine is jit-complied if the module Numba is vailable.

"""
num_reps, ts_length = out.shape

for i in range(num_reps):
out[i, 0] = init_states[i]
for t in range(ts_length-1):
out[i, t+1] = searchsorted(P_cdfs[out[i, t]], random_values[i, t])

if numba_installed:
_generate_sample_paths = jit(nopython=True)(_generate_sample_paths)


def mc_compute_stationary(P):
Expand All @@ -276,78 +392,46 @@ def mc_compute_stationary(P):
return MarkovChain(P).stationary_distributions


def mc_sample_path(P, init=0, sample_size=1000):
"""
See Section: DocStrings below
def mc_sample_path(P, init=0, sample_size=1000, random_state=None):
"""
n = len(P)
Generates one sample path from the Markov chain represented by
(n x n) transition matrix P on state space S = {{0,...,n-1}}.

# CDFs, one for each row of P
cdfs = np.empty((n, n), order='C') # see issue #137#issuecomment-96128186
np.cumsum(P, axis=-1, out=cdfs)

# Random values, uniformly sampled from [0, 1)
u = np.random.random(size=sample_size)

# === set up array to store output === #
X = np.empty(sample_size, dtype=int)
if isinstance(init, int):
X[0] = init
else:
cdf0 = np.cumsum(init)
X[0] = searchsorted(cdf0, u[0])

# === generate the sample path === #
for t in range(sample_size-1):
X[t+1] = searchsorted(cdfs[X[t]], u[t+1])

return X

if numba_installed:
mc_sample_path = jit(mc_sample_path)


# ------------ #
# -DocStrings- #
# ------------ #

# -mc_sample_path() function and MarkovChain.simulate() method- #
_sample_path_docstr = \
"""
Generates one sample path from the Markov chain represented by (n x n)
transition matrix P on state space S = {{0,...,n-1}}.

Parameters
----------
{p_arg}
init : array_like(float ndim=1) or scalar(int), optional(default=0)
If init is an array_like, then it is treated as the initial
distribution across states. If init is a scalar, then it treated as
the deterministic initial state.
Parameters
----------
P : array_like(float, ndim=2)
A Markov transition matrix.

sample_size : scalar(int), optional(default=1000)
The length of the sample path.
init : array_like(float ndim=1) or scalar(int), optional(default=0)
If init is an array_like, then it is treated as the initial
distribution across states. If init is a scalar, then it
treated as the deterministic initial state.

Returns
-------
X : array_like(int, ndim=1)
The simulation of states.
sample_size : scalar(int), optional(default=1000)
The length of the sample path.

"""
random_state : scalar(int) or np.random.RandomState,
optional(default=None)
Random seed (integer) or np.random.RandomState instance to set
the initial state of the random number generator for
reproducibility. If None, a randomly initialized RandomState is
used.

# -Functions- #
Returns
-------
X : array_like(int, ndim=1)
The simulation of states.

# -mc_sample_path- #
mc_sample_path.__doc__ = _sample_path_docstr.format(p_arg="""
P : array_like(float, ndim=2)
A Markov transition matrix.
""")
"""
random_state = check_random_state(random_state)

# -Methods- #
if isinstance(init, int):
X_0 = init
else:
cdf0 = np.cumsum(init)
u_0 = random_state.random_sample()
X_0 = searchsorted(cdf0, u_0)

# -Markovchain.simulate()- #
if sys.version_info[0] == 3:
MarkovChain.simulate.__doc__ = _sample_path_docstr.format(p_arg="")
elif sys.version_info[0] == 2:
MarkovChain.simulate.__func__.__doc__ = \
_sample_path_docstr.format(p_arg="")
mc = MarkovChain(P)
return mc.simulate(ts_length=sample_size, init=X_0,
random_state=random_state)
Loading