Demo: The unscented particle filter
Van Der Merwe, R., Doucet, A., De Freitas, N., & Wan, E. (2000). The Unscented Particle Filter. Advances In Neural Information Processing Systems, 13. Retrieved from http://papers.nips.cc/paper/1818-the-unscented-particle-filter.pdf
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/matplotlib/__init__.py:872: UserWarning: axes.color_cycle is deprecated and replaced with axes.prop_cycle; please use the latter.
warnings.warn(self.msg_depr % (key, alt_key))
True model
def transition_func(x, t, omega=4*np.e-2, phi=0.5):
return 1 + np.sin(omega * np.pi * t) + phi * x
def transition(x, t, omega=4*np.e-2, phi=0.5):
vt = scipy.stats.gamma.rvs(3, scale=1/2)
return transition_func(x, t, omega=omega, phi=phi) + vt
def transition_density(xnew, xold, t, omega=4*np.e-2, phi=0.5):
xest = transition_func(xold, t, omega=omega, phi=phi)
return scipy.stats.gamma.logpdf(xnew - xest, 3, scale=1/2)
def observation_func(x, t, phi1=0.2, phi2=0.5):
if t <= 30:
return phi1 * x ** 2
else:
return phi2 * x - 2
def observation(x, t, phi1=0.2, phi2=0.5):
nt = scipy.stats.norm.rvs(loc=0, scale=np.sqrt(1e-5))
return observation_func(x, t, phi1=phi1, phi2=phi2) + nt
def observation_density(y, x, t, phi1=0.2, phi2=0.5):
yest = observation_func(x, t, phi1=phi1, phi2=phi2)
return scipy.stats.norm.logpdf(y - yest, loc=0, scale=np.sqrt(1e-5))
Xorig = np.empty(60)
Xorig[0] = 1
Yorig = np.empty(60)
Yorig[0] = observation(Xorig[0], 0)
for t in range(1, 60):
Xorig[t] = transition(Xorig[t-1], t + 1)
Yorig[t] = observation(Xorig[t], t + 1)
plt.plot(np.arange(60) + 1, Xorig, '-')
plt.xlabel('Time')
plt.ylabel('X')
<matplotlib.text.Text at 0x107c71278>
Regular particle filter
def particle_filter(Y, N=200):
# initialization
X = np.empty((len(Y) + 1, N))
w = np.ones((len(Y) + 1, N)) / N
for i in range(N):
X[0, i] = np.random.randn() + 1
for t in range(len(Y)):
# propagate particles and calculate unnormalized weights
Xtilde = np.empty(N)
wtilde = np.empty(N)
for i in range(N):
Xtilde[i] = transition(X[t, i], t + 1)
wtilde[i] = observation_density(Y[t], Xtilde[i], t + 1)
# normalize weights
w[t + 1] = np.exp(wtilde - scipy.misc.logsumexp(wtilde))
w[t + 1] /= np.sum(w[t + 1])
# resample particles
X[t + 1] = Xtilde[np.random.choice(np.arange(N), size=N, p=w[t + 1])]
# MCMC step
for i in range(N):
u = np.random.rand()
x = transition(X[t, i], t)
p_x = np.exp(observation_density(Y[t], x, t + 1))
p_xtilde = np.exp(observation_density(Y[t], X[t + 1, i], t + 1))
if u <= min(1, p_x / p_xtilde):
X[t + 1, i] = x
return X, w
particles = particle_filter(Yorig)
Xest = particles[0][1:].mean(axis=1)
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:29: RuntimeWarning: invalid value encountered in double_scalars
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:29: RuntimeWarning: divide by zero encountered in double_scalars
plt.plot(np.arange(60) + 1, Xorig, 'ko')
plt.plot(np.arange(60) + 1, Xest, 'r-')
plt.xlabel('Time')
plt.ylabel('E[X]')
<matplotlib.text.Text at 0x107d2ce48>
Unscented particle filter
def calc_sigma_weights(n=1, alpha=1, kappa=2, beta=0):
lam = alpha ** 2 * (n + kappa) - n
Wm = np.empty(2 * n + 1)
Wc = np.empty(2 * n + 1)
Wm[0] = lam / (n + lam)
Wc[0] = (lam / (n + lam)) + (1 - alpha ** 2 + beta)
for i in range(1, 2 * n + 1):
Wm[i] = 1 / (2 * (n + lam))
Wc[i] = 1 / (2 * (n + lam))
return Wm, Wc
def calc_sigma_points(X, P, n=1, alpha=1, kappa=2):
lam = alpha ** 2 * (n + kappa) - n
sigma = np.zeros(2 * n + 1)
sigma[0] = X.copy()
for i in range(1, n + 1):
sigma[i] = X + np.sqrt((n + lam) * P)
sigma[i + n] = X - np.sqrt((n + lam) * P)
return sigma
def unscented_update(Xm_old, P_old, Y_obs, t, n=1):
# calculate sigma points and weights
Wm, Wc = calc_sigma_weights()
sigma = calc_sigma_points(Xm_old, P_old, n=n)
# propagate sigma points
X = np.zeros(2 * n + 1)
Y = np.zeros(2 * n + 1)
for i in range(2 * n + 1):
X[i] = transition(sigma[i], t)
Y[i] = observation(X[i], t)
# compute conditional t|t-1 distributions
Xm_cond = np.sum(Wm * X)
P_cond = np.sum(Wc * (X - Xm_cond) ** 2)
Ym_cond = np.sum(Wm * Y)
# incorporate new observation
Pyy = np.sum(Wc * (Y - Ym_cond) ** 2)
Pxy = np.sum(Wc * (Y - Ym_cond) * (X - Xm_cond))
K = Pxy / Pyy
Xm_new = Xm_cond + K * (Y_obs - Ym_cond)
P_new = P_cond - (K * Pyy * K)
assert P_new >= 0
return Xm_new, P_new
def propagate(Xm, P, N):
Xtilde = np.zeros(N)
for i in range(N):
Xtilde[i] = np.random.normal(Xm[i], np.sqrt(P[i]))
return Xtilde
def calc_weights(Xtilde, X, Y, Xm, P, t, N):
# calculate unnormalized weights
wtilde = np.zeros(N)
for i in range(N):
p_Xtilde = scipy.stats.norm.logpdf(Xtilde[i], loc=Xm[i], scale=np.sqrt(P[i]))
p_obs = observation_density(Y, Xtilde[i], t)
p_trans = transition_density(Xtilde[i], X[i], t)
wtilde[i] = p_obs + p_trans - p_Xtilde
# normalize weights
ix = ~(np.isnan(wtilde) | np.isinf(wtilde))
w = np.zeros(N)
w[ix] = np.exp(wtilde[ix] - scipy.misc.logsumexp(wtilde[ix]))
w /= np.sum(w)
return w
def unscented_particle_filter(Y, N=200, alpha=1, beta=0, kappa=2):
# PF initialization
Y = np.concatenate([[0], Y])
X = np.zeros((len(Y), N))
w = np.ones((len(Y), N)) / N
for i in range(N):
X[0, i] = np.random.randn() + 1
# UKF initialization
Xm = np.zeros((len(Y), N))
Xm[0, :] = X[0].copy()
P = np.zeros((len(Y), N))
P[0, :] = 1
for t in range(1, len(Y)):
for i in range(N):
# perform kalman update
Xm[t, i], P[t, i] = unscented_update(Xm[t - 1, i], P[t - 1, i], Y[t], t)
# propagate particles and calculate weights
Xtilde = propagate(Xm[t], P[t], N)
wtilde = calc_weights(Xtilde, X[t - 1], Y[t - 1], Xm[t], P[t], t, N)
# resample particles
idx = np.random.choice(np.arange(N), size=N, p=wtilde)
X[t] = Xtilde[idx]
w[t] = wtilde[idx]
Xm[t] = Xm[t, idx]
P[t] = P[t, idx]
# MCMC step
for i in range(N):
u = np.random.rand()
x = transition(X[t - 1, i], t)
p_x = np.exp(observation_density(Y[t], x, t))
p_xtilde = np.exp(observation_density(Y[t], X[t, i], t))
if u <= min(1, p_x / p_xtilde):
X[t, i] = x
Xm[t, i] = x
P[t, i] = 1
return X, w, Xm, P
unscented_particles = unscented_particle_filter(Yorig)
unscented_Xest = unscented_particles[0][1:].mean(axis=1)
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:37: RuntimeWarning: invalid value encountered in double_scalars
/Users/jhamrick/miniconda3/lib/python3.5/site-packages/ipykernel/__main__.py:37: RuntimeWarning: divide by zero encountered in double_scalars
fig, ax = plt.subplots()
ax.plot(np.arange(60) + 1, Xorig, 'ko', label='True x')
ax.plot(np.arange(60) + 1, Xest, 'r-', label='PF')
ax.plot(np.arange(60) + 1, unscented_Xest, 'b-', label='UPF')
ax.set_xlabel('Time')
ax.set_ylabel('E[X]')
ax.legend(loc='best')
<matplotlib.legend.Legend at 0x107fd5ac8>