import numpy as np
import logging
import matplotlib.pyplot as plt
import scipy.stats
from copy import copy, deepcopy
from gp import GP, GaussianKernel, PeriodicKernel
from . import bq_c
from . import linalg_c as la
from . import util
logger = logging.getLogger("bayesian_quadrature")
DTYPE = np.dtype('float64')
MIN = np.log(np.exp2(np.float64(np.finfo(np.float64).minexp + 4)))
MAX = np.log(np.exp2(np.float64(np.finfo(np.float64).maxexp - 4)))
class BQ(object):
r"""
Estimate an integral of the following form using Bayesian
Quadrature with a Gaussian Process prior:
.. math::
Z = \int \ell(x)\mathcal{N}(x\ |\ \mu, \sigma^2)\ \mathrm{d}x
See :meth:`~bayesian_quadrature.bq.BQ.load_options` for details on
allowable options.
Parameters
----------
x : numpy.ndarray
size :math:`s` array of sample locations
l : numpy.ndarray
size :math:`s` array of sample observations
options : dict
Options dictionary
Notes
-----
This algorithm is an updated version of the one described in
[OD12]_. The overall idea is:
1. Estimate :math:`\log\ell` using a GP.
2. Estimate :math:`\bar{\ell}=\exp(\log\ell)` using a second GP.
3. Integrate exactly under :math:`\bar{\ell}`.
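Examples
--------
A minimal usage sketch (illustrative values; assumes a Gaussian
kernel, whose parameters are an output scale ``h`` and a lengthscale
``w``):

>>> x = np.linspace(-2, 2, 5)
>>> l = scipy.stats.norm.pdf(x, 0, 1)
>>> bq = BQ(x, l, kernel=GaussianKernel, n_candidate=10,
...         candidate_thresh=0.5, x_mean=0.0, x_var=1.0,
...         optim_method='L-BFGS-B')
>>> bq.init(params_tl=(2.0, 1.0, 0.0), params_l=(0.2, 1.0, 0.0))
>>> Z = bq.Z_mean()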
"""
##################################################################
# Initialization #
##################################################################
def __init__(self, x, l, **options):
"""Initialize the Bayesian Quadrature object."""
#: Vector of observed locations
self.x_s = np.array(x, dtype=DTYPE)
#: Vector of observed values
self.l_s = np.array(l, dtype=DTYPE)
if (self.l_s <= 0).any():
raise ValueError("l_s contains zero or negative values")
if self.x_s.ndim > 1:
raise ValueError("invalid number of dimensions for x")
if self.l_s.ndim > 1:
raise ValueError("invalid number of dimensions for l")
if self.x_s.shape != self.l_s.shape:
raise ValueError("shape mismatch for x and l")
#: Vector of log-transformed observed values
self.tl_s = np.log(self.l_s)
#: Number of observations
self.ns = self.x_s.shape[0]
self.load_options(**options)
self.initialized = False
self.gp_log_l = None #: Gaussian process over log(l)
self.gp_l = None #: Gaussian process over exp(log(l))
self.x_c = None #: Vector of candidate locations
self.l_c = None #: Vector of candidate values
self.nc = None #: Number of candidate points
self.x_sc = None #: Vector of observed plus candidate locations
self.l_sc = None #: Vector of observed plus candidate values
self.nsc = None #: Number of observations plus candidates
self._approx_x = None
self._approx_px = None
def load_options(self, kernel, n_candidate, candidate_thresh, x_mean, x_var, optim_method):
r"""
Load options.
Parameters
----------
kernel : Kernel
The type of kernel to use. Note that if the kernel is not
Gaussian, slow approximate (rather than analytic)
solutions will be used.
n_candidate : int
The (maximum) number of candidate points.
candidate_thresh : float
Minimum allowed space between candidates.
x_mean : float
Prior mean, :math:`\mu`.
x_var : float
Prior variance, :math:`\sigma^2`.
optim_method : string
Method to use for parameter optimization (e.g., 'L-BFGS-B'
or 'Powell')
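Examples
--------
Options are normally passed through the :class:`BQ` constructor, but
can be (re)loaded directly (illustrative values):

>>> bq.load_options(kernel=GaussianKernel, n_candidate=10,
...                 candidate_thresh=0.5, x_mean=0.0, x_var=1.0,
...                 optim_method='L-BFGS-B')
>>> bq.options['use_approx']
False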
"""
# store the options dictionary for future use
self.options = {
'kernel': kernel,
'n_candidate': int(n_candidate),
'candidate_thresh': float(candidate_thresh),
'x_mean': np.array([x_mean], dtype=DTYPE, order='F'),
'x_cov': np.array([[x_var]], dtype=DTYPE, order='F'),
'use_approx': not (kernel is GaussianKernel),
'wrapped': kernel is PeriodicKernel,
'optim_method': optim_method
}
if self.options['use_approx']:
logger.debug("Using approximate solutions for non-Gaussian kernel")
def init(self, params_tl, params_l):
r"""Initialize the GPs.
Parameters
----------
params_tl : np.ndarray
initial parameters for GP over :math:`\log\ell`
params_l : np.ndarray
initial parameters for GP over :math:`\exp(\log\ell)`
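Examples
--------
Assuming a Gaussian kernel, each parameter vector is ``(h, w, s)``,
where ``s`` is the observation noise (illustrative values):

>>> bq.init(params_tl=(2.0, 1.0, 0.0), params_l=(0.2, 1.0, 0.0))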
"""
kernel = self.options['kernel']
# create the gaussian process over log(l)
self.gp_log_l = GP(
kernel(*params_tl[:-1]),
self.x_s, self.tl_s,
s=params_tl[-1])
# TODO: improve matrix conditioning for log(l)
self.gp_log_l.jitter = np.zeros(self.ns, dtype=DTYPE)
# pick candidate points
self._choose_candidates()
# create the gaussian process over exp(log(l))
self.gp_l = GP(
kernel(*params_l[:-1]),
self.x_sc, self.l_sc,
s=params_l[-1])
# TODO: improve matrix conditioning for exp(log(l))
self.gp_l.jitter = np.zeros(self.nsc, dtype=DTYPE)
# make the vector of locations for approximations
self._approx_x = self._make_approx_x()
self._approx_px = self._make_approx_px()
self.initialized = True
##################################################################
# Mean and variance of l #
##################################################################
def l_mean(self, x):
r"""
Mean of the final approximation to :math:`\ell`.
Parameters
----------
x : numpy.ndarray
:math:`m` array of new sample locations.
Returns
-------
mean : numpy.ndarray
:math:`m` array of predictive means
Notes
-----
This is just the mean of the GP over :math:`\exp(\log\ell)`, i.e.:
.. math::
\mathbb{E}[\bar{\ell}(\mathbf{x})] = \mathbb{E}_{\mathrm{GP}(\exp(\log\ell))}(\mathbf{x})
"""
return self.gp_l.mean(x)
def l_var(self, x):
r"""
Marginal variance of the final approximation to :math:`\ell`.
Parameters
----------
x : numpy.ndarray
:math:`m` array of new sample locations.
Returns
-------
var : numpy.ndarray
:math:`m` array of predictive variances
Notes
-----
This is just the diagonal of the covariance of the GP over
:math:`\log\ell` multiplied by the squared mean of the GP over
:math:`\exp(\log\ell)`, i.e.:
.. math::
\mathbb{V}[\bar{\ell}(\mathbf{x})] = \mathbb{V}_{\mathrm{GP}(\log\ell)}(\mathbf{x})\mathbb{E}_{\mathrm{GP}(\exp(\log\ell))}(\mathbf{x})^2
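Examples
--------
The identity above can be checked directly against the two GPs
(sketch; assumes an initialized :class:`BQ` object ``bq``):

>>> x = np.linspace(-1, 1, 50)
>>> v = np.diag(bq.gp_log_l.cov(x)) * bq.gp_l.mean(x) ** 2
>>> np.allclose(bq.l_var(x), np.clip(v, 0, None))
True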
"""
v_log_l = np.diag(self.gp_log_l.cov(x)).copy()
m_l = self.gp_l.mean(x)
l_var = v_log_l * m_l ** 2
l_var[l_var < 0] = 0
return l_var
##################################################################
# Mean of Z #
##################################################################
def Z_mean(self):
r"""
Computes the mean of :math:`Z`, which is defined as:
.. math ::
\mathbb{E}[Z]=\int \bar{\ell}(x)p(x)\ \mathrm{d}x
Returns
-------
mean : float
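Examples
--------
A sanity check against trapezoidal quadrature on the internal
approximation grid (sketch; agreement is only approximate, since the
grid truncates the domain):

>>> x = bq._approx_x
>>> Z_grid = np.trapz(bq.l_mean(x) * bq._approx_px, x)
>>> Z = bq.Z_mean()  # should be close to Z_grid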
"""
if self.options['use_approx']:
return self._approx_Z_mean()
else:
return self._exact_Z_mean()
def _approx_Z_mean(self, xo=None):
if xo is None:
xo = self._approx_x
p_xo = self._approx_px
else:
p_xo = self._make_approx_px(xo)
approx = bq_c.approx_Z_mean(
np.array(xo[None], order='F'), p_xo, self.l_mean(xo))
return approx
def _exact_Z_mean(self):
r"""
Equivalent to:
.. math::
\begin{align*}
\mathbb{E}[Z]&\approx \int\bar{\ell}(x)\mathcal{N}(x\ |\ \mu, \sigma^2)\ \mathrm{d}x \\
&= \left(\int K_{\exp(\log\ell)}(x, \mathbf{x}_c)\mathcal{N}(x\ |\ \mu, \sigma^2)\ \mathrm{d}x\right)K_{\exp(\log\ell)}(\mathbf{x}_c, \mathbf{x}_c)^{-1}\ell(\mathbf{x}_c)
\end{align*}
"""
x_sc = np.array(self.x_sc[None], order='F')
alpha_l = self.gp_l.inv_Kxx_y
h_s, w_s = self.gp_l.K.params
w_s = np.array([w_s], order='F')
m_Z = bq_c.Z_mean(
x_sc, alpha_l, h_s, w_s,
self.options['x_mean'],
self.options['x_cov'])
return m_Z
##################################################################
# Variance of Z #
##################################################################
def Z_var(self):
r"""
Computes the variance of :math:`Z`, which is defined as:
.. math::
\mathbb{V}(Z)\approx \int\int \mathrm{Cov}_{\log\ell}(x, x^\prime)\bar{\ell}(x)\bar{\ell}(x^\prime)p(x)p(x^\prime)\ \mathrm{d}x\ \mathrm{d}x^\prime
Returns
-------
var : float
"""
if self.options['use_approx']:
return self._approx_Z_var()
else:
return self._exact_Z_var()
def _approx_Z_var(self, xo=None):
if xo is None:
xo = self._approx_x
p_xo = self._approx_px
else:
p_xo = self._make_approx_px(xo)
approx = bq_c.approx_Z_var(
np.array(xo[None], order='F'), p_xo,
np.array(self.l_mean(xo), order='F'),
np.array(self.gp_log_l.cov(xo), order='F'))
return approx
def _exact_Z_var(self):
# values for the GPs over l(x) and log(l(x))
x_s = np.array(self.x_s[None], order='F')
x_sc = np.array(self.x_sc[None], order='F')
alpha_l = self.gp_l.inv_Kxx_y
L_tl = np.array(self.gp_log_l.Lxx, order='F')
h_l, w_l = self.gp_l.K.params
w_l = np.array([w_l])
h_tl, w_tl = self.gp_log_l.K.params
w_tl = np.array([w_tl])
V_Z = bq_c.Z_var(
x_s, x_sc, alpha_l, L_tl,
h_l, w_l, h_tl, w_tl,
self.options['x_mean'],
self.options['x_cov'])
return V_Z
##################################################################
# Expected variance of Z #
##################################################################
def expected_Z_var(self, x_a):
r"""
Computes the expected variance of :math:`Z` given a new
observation :math:`x_a`. This is defined as:
.. math ::
\mathbb{E}[V(Z)\ |\ \ell_s, \ell_a] = \mathbb{E}[Z\ |\ \ell_s]^2 + V(Z\ |\ \ell_s) - \int \mathbb{E}[Z\ |\ \ell_s, \ell_a]^2 \mathcal{N}(\ell_a\ |\ \hat{m}_a, \hat{C}_a)\ \mathrm{d}\ell_a
Parameters
----------
x_a : numpy.ndarray
vector of points for which to (independently) compute the
expected variance
Returns
-------
out : numpy.ndarray
expected variance for each point in `x_a`
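Examples
--------
The identity above, written in terms of
:meth:`expected_squared_mean`, is exactly what this method computes:

>>> x_a = np.array([0.5])
>>> ev = bq.Z_mean() ** 2 + bq.Z_var() - bq.expected_squared_mean(x_a)
>>> np.allclose(ev, bq.expected_Z_var(x_a))
True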
"""
mean_second_moment = self.Z_mean() ** 2 + self.Z_var()
expected_squared_mean = self.expected_squared_mean(x_a)
expected_var = mean_second_moment - expected_squared_mean
return expected_var
def expected_squared_mean(self, x_a):
r"""
Computes the expected squared mean of :math:`Z` given a new
observation :math:`x_a`. This is defined as:
.. math ::
\mathbb{E}[\mathbb{E}[Z]^2 |\ \ell_s] = \int \mathbb{E}[Z\ |\ \ell_s, \ell_a]^2 \mathcal{N}(\ell_a\ |\ \hat{m}_a, \hat{C}_a)\ \mathrm{d}\ell_a
Parameters
----------
x_a : numpy.ndarray
vector of points for which to (independently) compute the
expected squared mean
Returns
-------
out : numpy.ndarray
expected squared mean for each point in `x_a`
"""
esm = np.empty(x_a.shape[0])
for i in range(x_a.shape[0]):
esm[i] = self._esm_and_em(x_a[[i]])[0]
return esm
def expected_mean(self, x_a):
r"""
Computes the expected mean of :math:`Z` given a new
observation :math:`x_a`.
Parameters
----------
x_a : numpy.ndarray
vector of points for which to (independently) compute the
expected mean
Returns
-------
out : numpy.ndarray
expected mean for each point in `x_a`
"""
em = np.empty(x_a.shape[0])
for i in range(x_a.shape[0]):
em[i] = self._esm_and_em(x_a[[i]])[1]
return em
def expected_squared_mean_and_mean(self, x_a):
r"""
Computes the expected squared mean and expected mean of
:math:`Z` given a new observation :math:`x_a`.
Parameters
----------
x_a : numpy.ndarray
vector of points for which to (independently) compute the
expected mean
Returns
-------
out : numpy.ndarray
expected squared mean and expected mean for each point in `x_a`
"""
em = np.empty((x_a.shape[0], 2))
for i in range(x_a.shape[0]):
em[i] = self._esm_and_em(x_a[[i]])
return em
def _esm_and_em(self, x_a):
"""Computes the expected square mean for a single point `x_a`."""
# check for invalid inputs
if x_a is None or np.isnan(x_a) or np.isinf(x_a):
raise ValueError("invalid value for x_a: %s", x_a)
# don't do the heavy computation if the point is close to one
# we already have
if np.isclose(x_a, self.x_s, atol=1e-4).any():
em = self.Z_mean()
esm = em ** 2
return esm, em
# include new x_a
x_sca = np.concatenate([self.x_sc, x_a])
# compute K_l(x_sca, x_sca)
K_l = self.gp_l.Kxoxo(x_sca)
jitter = np.zeros(self.nsc + 1)
# add noise to the candidate points closest to x_a, since they
# are likely to change
close = np.abs(self.x_c - x_a) < self.options['candidate_thresh']
if close.any():
idx = np.array(np.nonzero(close)[0]) + self.ns
bq_c.improve_covariance_conditioning(K_l, jitter, idx)
# also add noise to the new point
bq_c.improve_covariance_conditioning(K_l, jitter, np.array([self.nsc]))
L = np.empty(K_l.shape, order='F')
try:
la.cho_factor(np.array(K_l, order='F'), L)
except np.linalg.LinAlgError:
# if the matrix is singular, it's because either x_a is
# close to a point we already have, or the kernel produces
# similar values for all points (e.g., there is a very
# large variance). In both cases, our expectation should
# be that the mean won't change much, so just return the
# mean we currently have.
em = self.Z_mean()
esm = em ** 2
return esm, em
# compute expected transformed mean
tm_a = np.array(self.gp_log_l.mean(x_a))
# compute expected transformed covariance
tC_a = np.array(self.gp_log_l.cov(x_a), order='F')
if self.options['use_approx']:
xo = self._approx_x
p_xo = self._approx_px
Kxxo = np.array(self.gp_l.K(x_sca, xo), order='F')
esm, em = bq_c.approx_expected_squared_mean_and_mean(
self.l_sc, L, tm_a, tC_a,
np.array(xo[None], order='F'), p_xo, Kxxo)
else:
esm, em = bq_c.expected_squared_mean_and_mean(
self.l_sc, L, tm_a, tC_a,
np.array(x_sca[None], order='F'),
self.gp_l.K.h, np.array([self.gp_l.K.w]),
self.options['x_mean'],
np.array(self.options['x_cov'], order='F'))
if np.isnan(esm) or esm < 0:
raise RuntimeError(
"invalid expected squared mean for x_a=%s: %s" % (
x_a, esm))
if np.isnan(em):
raise RuntimeError(
"invalid expected mean for x_a=%s: %s" % (x_a, em))
if np.isinf(esm):
logger.warn("expected squared mean for x_a=%s is infinity!", x_a)
if np.isinf(em):
logger.warn("expected mean for x_a=%s is infinity!", x_a)
return esm, em
##################################################################
# Hyperparameter optimization/marginalization #
##################################################################
def _make_llh_params(self, params):
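"""Return a function that maps a stacked vector of hyperparameter
values to the joint log likelihood of the two GPs. The first half of
the vector holds the values (named by `params`) for the GP over
log(l), the second half those for the GP over exp(log(l)); invalid
values yield -inf."""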
nparam = len(params)
def f(x):
if x is None or np.isnan(x).any():
return -np.inf
try:
self._set_gp_log_l_params(dict(zip(params, x[:nparam])))
self._set_gp_l_params(dict(zip(params, x[nparam:])))
except (ValueError, np.linalg.LinAlgError):
return -np.inf
try:
llh = self.gp_log_l.log_lh + self.gp_l.log_lh
except (ValueError, np.linalg.LinAlgError):
return -np.inf
return llh
return f
def fit_hypers(self, params):
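r"""
Optimize the hyperparameters named in `params` for both GPs by
maximizing their joint log likelihood, using the method given by the
``optim_method`` option.

Parameters
----------
params : list of strings
List of parameter names to be optimized

Raises
------
RuntimeError
If no good parameters could be found
"""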
p0_tl = [self.gp_log_l.get_param(p) for p in params]
p0_l = [self.gp_l.get_param(p) for p in params]
p0 = np.array(p0_tl + p0_l)
f = self._make_llh_params(params)
p0 = util.find_good_parameters(f, p0, self.options['optim_method'])
if p0 is None:
raise RuntimeError("couldn't find good parameters")
def sample_hypers(self, params, n=1, nburn=10):
r"""
Use slice sampling to sample new hyperparameters for the two
GPs. Note that this will probably cause
:math:`\bar{\ell}(\mathbf{x}_{sc})` to change.
Parameters
----------
params : list of strings
List of parameter names to be sampled
n : int
Number of samples to take
nburn : int
Number of burn-in samples
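Returns
-------
hypers : tuple of numpy.ndarray
Sampled values for the GP over :math:`\log\ell` and the GP over
:math:`\exp(\log\ell)`, respectively, one column per parameter

Examples
--------
Sampling the kernel parameters (sketch; assumes the kernel exposes
parameters named ``'h'`` and ``'w'``):

>>> hypers_tl, hypers_l = bq.sample_hypers(['h', 'w'], n=5, nburn=10)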
"""
# TODO: should the window be a parameter that is set by the
# user? is there a way to choose a sane window size
# automatically?
nparam = len(params)
window = 2 * nparam
p0_tl = [self.gp_log_l.get_param(p) for p in params]
p0_l = [self.gp_l.get_param(p) for p in params]
p0 = np.array(p0_tl + p0_l)
f = self._make_llh_params(params)
if f(p0) < MIN:
pn = util.find_good_parameters(f, p0, self.options['optim_method'])
if pn is None:
raise RuntimeError("couldn't find good starting parameters")
p0 = pn
hypers = util.slice_sample(f, nburn+n, window, p0, nburn=nburn, freq=1)
return hypers[:, :nparam], hypers[:, nparam:]
##################################################################
# Active sampling #
##################################################################
def marginalize(self, funs, n, params):
r"""
Compute the approximate marginal functions `funs` by
approximately marginalizing over the GP hyperparameters.
Parameters
----------
funs : list
List of functions for which to compute the marginal.
n : int
Number of samples to take when doing the approximate
marginalization
params : list or tuple
List of parameters to marginalize over
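Returns
-------
values : list of numpy.ndarray
One array per function in `funs`, holding that function's value
under each of the `n` sampled sets of hyperparameters

Examples
--------
Marginalizing the mean and variance of :math:`Z` over the kernel
parameters (sketch; the parameter names are assumptions):

>>> m, V = bq.marginalize([bq.Z_mean, bq.Z_var], 20, ['h', 'w'])
>>> m.shape
(20,)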
"""
# cache state
state = deepcopy(self.__getstate__())
# allocate space for the function values
values = []
for fun in funs:
value = fun()
try:
m = value.shape
except AttributeError:
values.append(np.empty(n))
else:
values.append(np.empty((n,) + m))
# do all the sampling at once, because it is faster
hypers_tl, hypers_l = self.sample_hypers(params, n=n, nburn=1)
# compute values for the functions based on the parameters we
# just sampled
for i in range(n):
params_tl = dict(zip(params, hypers_tl[i]))
params_l = dict(zip(params, hypers_l[i]))
self._set_gp_log_l_params(params_tl)
self._set_gp_l_params(params_l)
for j, fun in enumerate(funs):
try:
values[j][i] = fun()
except:
logger.error(
"error with parameters %s and %s", params_tl, params_l)
raise
# restore state
self.__setstate__(state)
return values
def choose_next(self, x_a, n, params, plot=False):
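r"""
Choose the next observation location from the candidates `x_a` by
approximately minimizing the negative expected squared mean of
:math:`Z`, marginalizing over the GP hyperparameters. Ties within
numerical tolerance are broken at random.

Parameters
----------
x_a : numpy.ndarray
Vector of candidate locations
n : int
Number of hyperparameter samples for the marginalization
params : list or tuple
List of parameters to marginalize over
plot : bool
Whether to plot the final approximation and the expected loss

Returns
-------
best : float
The chosen location from `x_a`
"""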
f = lambda: -self.expected_squared_mean(x_a)
values = self.marginalize([f], n, params)
loss = values[0].mean(axis=0)
best = np.min(loss)
close = np.nonzero(np.isclose(loss, best))[0]
choice = np.random.choice(close)
best = x_a[choice]
if plot:
fig, (ax1, ax2) = plt.subplots(1, 2, sharex=True)
self.plot_l(ax1, xmin=x_a.min(), xmax=x_a.max())
util.vlines(ax1, best, color='g', linestyle='--', lw=2)
ax2.plot(x_a, loss, 'k-', lw=2)
util.vlines(ax2, best, color='g', linestyle='--', lw=2)
ax2.set_title("Negative expected sq. mean")
fig.set_figwidth(10)
fig.set_figheight(3.5)
return best
def add_observation(self, x_a, l_a):
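r"""
Add a new observation of :math:`\ell(x_a)`. If `x_a` lies within
``candidate_thresh`` of an existing observation, the existing
location and value are averaged with the new ones rather than
appending a new point. The GPs are then reinitialized with their
current parameters.

Parameters
----------
x_a : float
Location of the new observation
l_a : float
Value of the new observation
"""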
diffs = np.abs(x_a - self.x_s)
if diffs.min() < self.options['candidate_thresh']:
c = diffs.argmin()
logger.debug(
"x_a=%s is close to x_s=%s, averaging them",
x_a, self.x_s[c])
self.x_s[c] = (self.x_s[c] + x_a) / 2.
self.l_s[c] = (self.l_s[c] + l_a) / 2.
self.tl_s[c] = np.log(float(self.l_s[c]))
else:
self.x_s = np.append(self.x_s, float(x_a))
self.l_s = np.append(self.l_s, float(l_a))
self.tl_s = np.append(self.tl_s, np.log(float(l_a)))
self.ns += 1
# reinitialize the bq object
self.init(self.gp_log_l.params, self.gp_l.params)
##################################################################
# Plotting methods #
##################################################################
def plot_gp_log_l(self, ax, f_l=None, xmin=None, xmax=None):
x = self._make_approx_x(xmin=xmin, xmax=xmax, n=1000)
if f_l is not None:
l = np.log(f_l(x))
ax.plot(x, l, 'k-', lw=2)
self.gp_log_l.plot(ax, xlim=[x.min(), x.max()], color='r')
ax.plot(
self.x_c, np.log(self.l_c),
'bs', markersize=4,
label="$m_{\log\ell}(x_c)$")
ax.set_title(r"GP over $\log\ell$")
util.set_scientific(ax, -5, 4)
def plot_gp_l(self, ax, f_l=None, xmin=None, xmax=None):
x = self._make_approx_x(xmin=xmin, xmax=xmax, n=1000)
if f_l is not None:
l = f_l(x)
ax.plot(x, l, 'k-', lw=2)
self.gp_l.plot(ax, xlim=[x.min(), x.max()], color='r')
ax.plot(
self.x_c, self.l_c,
'bs', markersize=4,
label="$\exp(m_{\log\ell}(x_c))$")
ax.set_title(r"GP over $\exp(\log\ell)$")
util.set_scientific(ax, -5, 4)
def plot_l(self, ax, f_l=None, xmin=None, xmax=None, legend=True):
x = self._make_approx_x(xmin=xmin, xmax=xmax, n=1000)
if f_l is not None:
l = f_l(x)
ax.plot(x, l, 'k-', lw=2, label=r"$\ell(x)$")
l_mean = self.l_mean(x)
l_std = np.sqrt(self.l_var(x))
lower = l_mean - l_std
upper = l_mean + l_std
ax.fill_between(x, lower, upper, color='r', alpha=0.2)
ax.plot(
x, l_mean,
'r-', lw=2,
label="final approx")
ax.plot(
self.x_s, self.l_s,
'ro', markersize=5,
label="$\ell(x_s)$")
ax.plot(
self.x_c, self.l_c,
'bs', markersize=4,
label="$\exp(m_{\log\ell}(x_c))$")
ax.set_title("Final Approximation")
ax.set_xlim(x.min(), x.max())
util.set_scientific(ax, -5, 4)
if legend:
ax.legend(loc=0, fontsize=10)
def plot_expected_squared_mean(self, ax, xmin=None, xmax=None):
x_a = self._make_approx_x(xmin=xmin, xmax=xmax, n=1000)
exp_sq_m = self.expected_squared_mean(x_a)
# plot the expected squared mean
ax.plot(x_a, exp_sq_m,
label=r"$E[\mathrm{m}(Z)^2]$",
color='k', lw=2)
ax.set_xlim(x_a.min(), x_a.max())
# plot a line for the current squared mean
util.hlines(
ax, self.Z_mean() ** 2,
color="#00FF00", lw=2,
label=r"$\mathrm{m}(Z)^2$")
# plot lines where there are observations
util.vlines(ax, self.x_sc, color='k', linestyle='--', alpha=0.5)
util.set_scientific(ax, -5, 4)
ax.legend(loc=0, fontsize=10)
ax.set_title(r"Expected squared mean of $Z$")
def plot_expected_variance(self, ax, xmin=None, xmax=None):
x_a = self._make_approx_x(xmin=xmin, xmax=xmax, n=1000)
exp_Z_var = self.expected_Z_var(x_a)
# plot the expected variance
ax.plot(x_a, exp_Z_var,
label=r"$E[\mathrm{Var}(Z)]$",
color='k', lw=2)
ax.set_xlim(x_a.min(), x_a.max())
# plot a line for the current variance
util.hlines(
ax, self.Z_var(),
color="#00FF00", lw=2,
label=r"$\mathrm{Var}(Z)$")
# plot lines where there are observations
util.vlines(ax, self.x_sc, color='k', linestyle='--', alpha=0.5)
util.set_scientific(ax, -5, 4)
ax.legend(loc=0, fontsize=10)
ax.set_title(r"Expected variance of $Z$")
def plot(self, f_l=None, xmin=None, xmax=None):
fig, axes = plt.subplots(1, 3)
self.plot_gp_log_l(axes[0], f_l=f_l, xmin=xmin, xmax=xmax)
self.plot_gp_l(axes[1], f_l=f_l, xmin=xmin, xmax=xmax)
self.plot_l(axes[2], f_l=f_l, xmin=xmin, xmax=xmax)
ymins, ymaxs = zip(*[ax.get_ylim() for ax in axes[1:]])
ymin = min(ymins)
ymax = max(ymaxs)
for ax in axes[1:]:
ax.set_ylim(ymin, ymax)
fig.set_figwidth(14)
fig.set_figheight(3.5)
return fig, axes
##################################################################
# Saving and restoring #
##################################################################
def __getstate__(self):
state = {}
state['x_s'] = self.x_s
state['l_s'] = self.l_s
state['tl_s'] = self.tl_s
state['options'] = self.options
state['initialized'] = self.initialized
if self.initialized:
state['gp_log_l'] = self.gp_log_l
state['gp_log_l_jitter'] = self.gp_log_l.jitter
state['gp_l'] = self.gp_l
state['gp_l_jitter'] = self.gp_l.jitter
state['_approx_x'] = self._approx_x
state['_approx_px'] = self._approx_px
return state
def __setstate__(self, state):
self.x_s = state['x_s']
self.l_s = state['l_s']
self.tl_s = state['tl_s']
self.ns = self.x_s.shape[0]
self.options = state['options']
self.initialized = state['initialized']
if self.initialized:
self.gp_log_l = state['gp_log_l']
self.gp_log_l.jitter = state['gp_log_l_jitter']
self.gp_l = state['gp_l']
self.gp_l.jitter = state['gp_l_jitter']
self.x_sc = self.gp_l._x
self.l_sc = self.gp_l._y
self.nsc = self.x_sc.shape[0]
self.x_c = self.x_sc[self.ns:]
self.l_c = self.l_sc[self.ns:]
self.nc = self.nsc - self.ns
self._approx_x = state['_approx_x']
self._approx_px = state['_approx_px']
else:
self.gp_log_l = None
self.gp_l = None
self.x_c = None
self.l_c = None
self.nc = None
self.x_sc = None
self.l_sc = None
self.nsc = None
self._approx_x = None
self._approx_px = None
##################################################################
# Copying #
##################################################################
def __copy__(self):
state = self.__getstate__()
cls = type(self)
bq = cls.__new__(cls)
bq.__setstate__(state)
return bq
def __deepcopy__(self, memo):
state = deepcopy(self.__getstate__(), memo)
cls = type(self)
bq = cls.__new__(cls)
bq.__setstate__(state)
return bq
def copy(self, deep=True):
if deep:
out = deepcopy(self)
else:
out = copy(self)
return out
##################################################################
# Helper methods #
##################################################################
def _set_gp_log_l_params(self, params):
# set the parameter values
for p, v in params.items():
self.gp_log_l.set_param(p, v)
# TODO: improve matrix conditioning for log(l)
self.gp_log_l.jitter.fill(0)
# update values of candidate points
m = self.gp_log_l.mean(self.x_c)
V = np.diag(self.gp_log_l.cov(self.x_c)).copy()
V[V < 0] = 0
tl_c = m + 2 * np.sqrt(V)
if (tl_c > MAX).any():
raise np.linalg.LinAlgError("GP mean is too large")
self.l_c = np.exp(m)
self.l_sc = np.array(np.concatenate([self.l_s, self.l_c]))
# update the locations and values for exp(log(l))
self.gp_l.x = self.x_sc
self.gp_l.y = self.l_sc
# TODO: improve matrix conditioning for exp(log(l))
self.gp_l.jitter.fill(0)
def _set_gp_l_params(self, params):
# set the parameter values
for p, v in params.items():
self.gp_l.set_param(p, v)
# TODO: improve matrix conditioning for exp(log(l))
self.gp_l.jitter.fill(0)
def _choose_candidates(self):
logger.debug("Choosing candidate points")
if self.options['wrapped']:
xmin = -np.pi * self.gp_log_l.K.p
xmax = np.pi * self.gp_log_l.K.p
else:
xmin = self.x_s.min() - self.gp_log_l.K.w
xmax = self.x_s.max() + self.gp_log_l.K.w
# compute the candidate points
xc = np.random.uniform(xmin, xmax, self.options['n_candidate'])
# make sure they don't overlap with points we already have
bq_c.filter_candidates(xc, self.x_s, self.options['candidate_thresh'])
# save the locations and compute the values
self.x_c = np.sort(xc[~np.isnan(xc)])
self.l_c = np.exp(self.gp_log_l.mean(self.x_c))
self.nc = self.x_c.shape[0]
# concatenate with the observations we already have
self.x_sc = np.array(np.concatenate([self.x_s, self.x_c]))
self.l_sc = np.array(np.concatenate([self.l_s, self.l_c]))
self.nsc = self.ns + self.nc
def _make_approx_x(self, xmin=None, xmax=None, n=1000):
if xmin is None:
if self.options['wrapped']:
xmin = -np.pi * self.gp_log_l.K.p
else:
xmin = self.x_sc.min() - self.gp_log_l.K.w
if xmax is None:
if self.options['wrapped']:
xmax = np.pi * self.gp_log_l.K.p
else:
xmax = self.x_sc.max() + self.gp_log_l.K.w
return np.linspace(xmin, xmax, n)
def _make_approx_px(self, x=None):
if x is None:
x = self._approx_x
p = np.empty(x.size, order='F')
if self.options['wrapped']:
bq_c.p_x_vonmises(
p, x,
float(self.options['x_mean']),
1. / float(self.options['x_cov']))
else:
bq_c.p_x_gaussian(
p, np.array(x[None], order='F'),
self.options['x_mean'],
self.options['x_cov'])
return p