Van Der Merwe, R., Doucet, A., De Freitas, N., & Wan, E. (2000). The Unscented Particle Filter. Advances In Neural Information Processing Systems, 13. Retrieved from http://papers.nips.cc/paper/1818-the-unscented-particle-filter.pdf

%matplotlib inline

import numpy as np
import matplotlib.pyplot as plt
import scipy.stats
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/matplotlib/__init__.py:872: UserWarning: axes.color_cycle is deprecated and replaced with axes.prop_cycle; please use the latter.
  warnings.warn(self.msg_depr % (key, alt_key))

True model

def transition_func(x, t, omega=4*np.e-2, phi=0.5):
    return 1 + np.sin(omega * np.pi * t) + phi * x

def transition(x, t, omega=4*np.e-2, phi=0.5):
    vt = scipy.stats.gamma.rvs(3, scale=1/2)
    return transition_func(x, t, omega=omega, phi=phi) + vt

def transition_density(xnew, xold, t, omega=4*np.e-2, phi=0.5):
    xest = transition_func(xold, t, omega=omega, phi=phi)
    return scipy.stats.gamma.logpdf(xnew - xest, 3, scale=1/2)

def observation_func(x, t, phi1=0.2, phi2=0.5):
    if t <= 30:
        return phi1 * x ** 2
    else:
        return phi2 * x - 2
    
def observation(x, t, phi1=0.2, phi2=0.5):
    nt = scipy.stats.norm.rvs(loc=0, scale=np.sqrt(1e-5))
    return observation_func(x, t, phi1=phi1, phi2=phi2) + nt
    
def observation_density(y, x, t, phi1=0.2, phi2=0.5):
    yest = observation_func(x, t, phi1=phi1, phi2=phi2)
    return scipy.stats.norm.logpdf(y - yest, loc=0, scale=np.sqrt(1e-5))
Xorig = np.empty(60)
Xorig[0] = 1
Yorig = np.empty(60)
Yorig[0] = observation(Xorig[0], 0)

for t in range(1, 60):
    Xorig[t] = transition(Xorig[t-1], t + 1)
    Yorig[t] = observation(Xorig[t], t + 1)

plt.plot(np.arange(60) + 1, Xorig, '-')
plt.xlabel('Time')
plt.ylabel('X')
<matplotlib.text.Text at 0x107c71278>

png

Regular particle filter

def particle_filter(Y, N=200):
    # initialization
    X = np.empty((len(Y) + 1, N))
    w = np.ones((len(Y) + 1, N)) / N
    for i in range(N):
        X[0, i] = np.random.randn() + 1
        
    for t in range(len(Y)):
        # propagate particles and calculate unnormalized weights
        Xtilde = np.empty(N)
        wtilde = np.empty(N)
        for i in range(N):
            Xtilde[i] = transition(X[t, i], t + 1)
            wtilde[i] = observation_density(Y[t], Xtilde[i], t + 1)
            
        # normalize weights
        w[t + 1] = np.exp(wtilde - scipy.misc.logsumexp(wtilde))
        w[t + 1] /= np.sum(w[t + 1])
        
        # resample particles
        X[t + 1] = Xtilde[np.random.choice(np.arange(N), size=N, p=w[t + 1])]
        
        # MCMC step
        for i in range(N):
            u = np.random.rand()
            x = transition(X[t, i], t)
            p_x = np.exp(observation_density(Y[t], x, t + 1))
            p_xtilde = np.exp(observation_density(Y[t], X[t + 1, i], t + 1))
            if u <= min(1, p_x / p_xtilde):
                X[t + 1, i] = x
                
    return X, w
particles = particle_filter(Yorig)
Xest = particles[0][1:].mean(axis=1)
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:29: RuntimeWarning: invalid value encountered in double_scalars
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:29: RuntimeWarning: divide by zero encountered in double_scalars
plt.plot(np.arange(60) + 1, Xorig, 'ko')
plt.plot(np.arange(60) + 1, Xest, 'r-')
plt.xlabel('Time')
plt.ylabel('E[X]')
<matplotlib.text.Text at 0x107d2ce48>

png

Unscented particle filter

def calc_sigma_weights(n=1, alpha=1, kappa=2, beta=0):
    lam = alpha ** 2 * (n + kappa) - n
    Wm = np.empty(2 * n + 1)
    Wc = np.empty(2 * n + 1)
    Wm[0] = lam / (n + lam)
    Wc[0] = (lam / (n + lam)) + (1 - alpha ** 2 + beta)
    for i in range(1, 2 * n + 1):
        Wm[i] = 1 / (2 * (n + lam))
        Wc[i] = 1 / (2 * (n + lam))
    return Wm, Wc
def calc_sigma_points(X, P, n=1, alpha=1, kappa=2):
    lam = alpha ** 2 * (n + kappa) - n
    sigma = np.zeros(2 * n + 1)
    sigma[0] = X.copy()
    for i in range(1, n + 1):
        sigma[i] = X + np.sqrt((n + lam) * P)
        sigma[i + n] = X - np.sqrt((n + lam) * P)
    return sigma    
def unscented_update(Xm_old, P_old, Y_obs, t, n=1):
    # calculate sigma points and weights
    Wm, Wc = calc_sigma_weights()
    sigma = calc_sigma_points(Xm_old, P_old, n=n)

    # propagate sigma points
    X = np.zeros(2 * n + 1)
    Y = np.zeros(2 * n + 1)
    for i in range(2 * n + 1):
        X[i] = transition(sigma[i], t)
        Y[i] = observation(X[i], t)

    # compute conditional t|t-1 distributions
    Xm_cond = np.sum(Wm * X)
    P_cond = np.sum(Wc * (X - Xm_cond) ** 2)
    Ym_cond = np.sum(Wm * Y)

    # incorporate new observation
    Pyy = np.sum(Wc * (Y - Ym_cond) ** 2)
    Pxy = np.sum(Wc * (Y - Ym_cond) * (X - Xm_cond))
    K = Pxy / Pyy
    Xm_new = Xm_cond + K * (Y_obs - Ym_cond)
    P_new = P_cond - (K * Pyy * K)
    
    assert P_new >= 0

    return Xm_new, P_new
def propagate(Xm, P, N):
    Xtilde = np.zeros(N)
    for i in range(N):
        Xtilde[i] = np.random.normal(Xm[i], np.sqrt(P[i]))
    return Xtilde
def calc_weights(Xtilde, X, Y, Xm, P, t, N):
    # calculate unnormalized weights
    wtilde = np.zeros(N)
    for i in range(N):
        p_Xtilde = scipy.stats.norm.logpdf(Xtilde[i], loc=Xm[i], scale=np.sqrt(P[i]))
        p_obs = observation_density(Y, Xtilde[i], t)
        p_trans = transition_density(Xtilde[i], X[i], t)
        wtilde[i] = p_obs + p_trans - p_Xtilde

    # normalize weights
    ix = ~(np.isnan(wtilde) | np.isinf(wtilde))
    w = np.zeros(N)
    w[ix] = np.exp(wtilde[ix] - scipy.misc.logsumexp(wtilde[ix]))
    w /= np.sum(w)
    
    return w
def unscented_particle_filter(Y, N=200, alpha=1, beta=0, kappa=2):
    # PF initialization
    Y = np.concatenate([[0], Y])
    X = np.zeros((len(Y), N))
    w = np.ones((len(Y), N)) / N
    for i in range(N):
        X[0, i] = np.random.randn() + 1

    # UKF initialization
    Xm = np.zeros((len(Y), N))
    Xm[0, :] = X[0].copy()
    P = np.zeros((len(Y), N))
    P[0, :] = 1

    for t in range(1, len(Y)):
        for i in range(N):
            # perform kalman update
            Xm[t, i], P[t, i] = unscented_update(Xm[t - 1, i], P[t - 1, i], Y[t], t)

        # propagate particles and calculate weights
        Xtilde = propagate(Xm[t], P[t], N)
        wtilde = calc_weights(Xtilde, X[t - 1], Y[t - 1], Xm[t], P[t], t, N)

        # resample particles
        idx = np.random.choice(np.arange(N), size=N, p=wtilde)
        X[t] = Xtilde[idx]
        w[t] = wtilde[idx]
        Xm[t] = Xm[t, idx]
        P[t] = P[t, idx]

        # MCMC step
        for i in range(N):
            u = np.random.rand()
            x = transition(X[t - 1, i], t)
            p_x = np.exp(observation_density(Y[t], x, t))
            p_xtilde = np.exp(observation_density(Y[t], X[t, i], t))
            if u <= min(1, p_x / p_xtilde):
                X[t, i] = x
                Xm[t, i] = x
                P[t, i] = 1

    return X, w, Xm, P
unscented_particles = unscented_particle_filter(Yorig)
unscented_Xest = unscented_particles[0][1:].mean(axis=1)
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:37: RuntimeWarning: invalid value encountered in double_scalars
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:37: RuntimeWarning: divide by zero encountered in double_scalars
fig, ax = plt.subplots()
ax.plot(np.arange(60) + 1, Xorig, 'ko', label='True x')
ax.plot(np.arange(60) + 1, Xest, 'r-', label='PF')
ax.plot(np.arange(60) + 1, unscented_Xest, 'b-', label='UPF')
ax.set_xlabel('Time')
ax.set_ylabel('E[X]')
ax.legend(loc='best')
<matplotlib.legend.Legend at 0x107fd5ac8>

png