Source code for pyapprox.expdesign.bayesian_oed

import os
import math
import numpy as np
from scipy.spatial.distance import cdist
from pyapprox.util.pya_numba import njit
from functools import partial
from multiprocessing import Pool, RawArray
from abc import ABC, abstractmethod
from scipy import stats
import time
import itertools

from pyapprox.util.sys_utilities import trace_error_with_msg
from pyapprox.util.utilities import (
    get_tensor_product_quadrature_rule, split_indices)
from pyapprox.variables.risk import (
    conditional_value_at_risk, conditional_value_at_risk_vectorized,
    entropic_risk_measure)
from pyapprox.variables.transforms import AffineTransform
from pyapprox.variables.joint import IndependentMarginalsVariable
from pyapprox.surrogates.interp.tensorprod import (
    get_tensor_product_piecewise_polynomial_quadrature_rule)
from pyapprox.surrogates.polychaos.gpc import (
    get_univariate_quadrature_rules_from_variable)
from pyapprox.surrogates.interp.barycentric_interpolation import (
    compute_barycentric_weights_1d,
    multivariate_barycentric_lagrange_interpolation)
from pyapprox.surrogates.integrate import integrate


def gaussian_loglike_fun_broadcast(
        obs, pred_obs, noise_std, active_indices=None):
    """
    Compute the log-likelihood values from a set of real and predicted
    observations

    Parameters
    ----------
    obs : np.ndarray (nsamples, nobs)
        The real observations repeated for each sample

    pred_obs : np.ndarray (nsamples, nobs)
        The observations predicted by the model for a set of samples

    noise_std : float or np.ndarray (nobs, 1)
        The standard deviation of Gaussian noise added to each observation

    active_indices : np.ndarray (nobs, 1)
        The subset of indices of the observations used to compute the
        likelihood

    Returns
    -------
    llike : np.ndarray (nsamples, 1)
        The log-likelihood values

    Notes
    -----
    This function can handle 1D, 2D and 3D arrays but is slower than the
    specialized implementations below due to broadcasting when computing
    obs-pred_obs.
    """
    if (isinstance(noise_std, np.ndarray) and
            noise_std.shape[0] != obs.shape[-1]):
        raise ValueError("noise_std must be provided for each observation")

    if not isinstance(noise_std, np.ndarray):
        noise_std = np.ones((obs.shape[-1], 1), dtype=float)*noise_std

    if active_indices is None:
        # Avoid a copy if possible: fancy indexing, e.g. array[:, I] where
        # I is an index array, makes a copy, which is slow.
        tmp = 1/(2*noise_std[:, 0]**2)
        llike = 0.5*np.sum(np.log(tmp/np.pi))
        llike += np.sum(-(obs-pred_obs)**2*tmp, axis=-1)
    else:
        tmp = 1/(2*noise_std[active_indices, 0]**2)
        llike = 0.5*np.sum(np.log(tmp/np.pi))
        llike += np.sum(
            -(obs[..., active_indices]-pred_obs[..., active_indices])**2*tmp,
            axis=-1)
    if llike.ndim == 1:
        llike = llike[:, None]
    return llike


@njit(cache=True)
def sq_dists_numba_3d(XX, YY, a, b, active_indices):
    """
    Compute the scaled l2-norm distance between two sets of samples
    E.g. for one point


    a[ii]*(XX[ii]-YY[ii])+b


    Parameters
    ----------
    XX : np.ndarray (LL, 1, NN)
        The first set of samples

    YY : np.ndarray (LL, MM, NN)
        The second set of samples. The 3D arrays are useful when computing
        squared distances for multiple sets of samples

    a : float or np.ndarray (NN, 1)
        scalar multiplying l2 distance

    b : float
        scalar added to l2 distance

    Returns
    -------
    ss : np.ndarray (LL, MM)
        The scaled distances
    """
    Yshape = YY.shape
    ss = np.empty(Yshape[:2])
    nactive_indices = active_indices.shape[0]
    for ii in range(Yshape[0]):
        for jj in range(Yshape[1]):
            ss[ii, jj] = 0.0
            for kk in range(nactive_indices):
                ss[ii, jj] += a[active_indices[kk]]*(
                    XX[ii, active_indices[kk]] -
                    YY[ii, jj, active_indices[kk]])**2
            ss[ii, jj] = ss[ii, jj]+b
    return ss


def sq_dists_3d(XX, YY, a=1, b=0, active_indices=None):
    assert XX.shape[0] == YY.shape[0]
    if np.isscalar(a):
        # a must be indexable by the entries of active_indices
        a = np.full(YY.shape[2], float(a))
    assert a.shape[0] == XX.shape[1]
    assert a.shape[0] == YY.shape[2]
    if active_indices is None:
        active_indices = np.arange(YY.shape[2])

    try:
        from pyapprox.cython.utilities import sq_dists_3d_pyx
        return sq_dists_3d_pyx(XX, YY, active_indices, a, b)
    except (ImportError, ModuleNotFoundError) as e:
        msg = 'sq_dists_3d extension failed'
        trace_error_with_msg(msg, e)

    return sq_dists_numba_3d(XX, YY, a, b, active_indices)
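

# Illustrative sketch added during editing (not part of the original
# module): verifies the weighted squared-distance semantics of sq_dists_3d
# against a direct loop on small synthetic arrays.
def _demo_sq_dists_3d():
    LL, MM, NN = 3, 4, 5
    XX = np.random.normal(0., 1., (LL, NN))
    YY = np.random.normal(0., 1., (LL, MM, NN))
    a, b = np.random.uniform(1., 2., NN), 0.5
    ss = sq_dists_3d(XX, YY, a, b)
    ss_ref = np.array(
        [[np.sum(a*(XX[ii]-YY[ii, jj])**2)+b for jj in range(MM)]
         for ii in range(LL)])
    assert np.allclose(ss, ss_ref)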


def gaussian_loglike_fun_economial_3D(
        obs, pred_obs, noise_std, active_indices=None):
    if pred_obs.ndim != 3:
        raise ValueError("pred_obs must be 3D")
    # cdist has a lot of overhead and cannot be used with active_indices
    # sq_dists = sq_dists_cdist_3d(obs, pred_obs)

    if not isinstance(noise_std, np.ndarray):
        noise_std = np.ones((pred_obs.shape[-1], 1), dtype=float)*noise_std

    tmp1 = -1/(2*noise_std[:, 0]**2)
    if active_indices is None:
        tmp2 = 0.5*np.sum(np.log(-tmp1/np.pi))
    else:
        tmp2 = 0.5*np.sum(np.log(-tmp1[active_indices]/np.pi))
    llike = sq_dists_3d(obs, pred_obs, tmp1, tmp2, active_indices)
    if llike.ndim == 1:
        llike = llike[:, None]
    return llike


@njit(cache=True)
def sq_dists_numba_3d_XX_prereduced(XX, YY, a, b, active_indices):
    """
    Compute the scaled l2-norm distance between two sets of samples
    E.g. for one point


    a[ii]*(XX[ii]-YY[ii])+b


    Parameters
    ----------
    XX : np.ndarray (LL, 1, NN)
        The first set of samples

    YY : np.ndarray (LL, MM, NN)
        The second set of samples. The 3D arrays are useful when computing
        squared distances for multiple sets of samples

    a : float or np.ndarray (NN, 1)
        scalar multiplying l2 distance

    b : float
        scalar added to l2 distance

    Returns
    -------
    ss : np.ndarray (LL, MM)
        The scaled distances
    """
    Yshape = YY.shape
    ss = np.empty(Yshape[:2])
    nactive_indices = active_indices.shape[0]
    for ii in range(Yshape[0]):
        for jj in range(Yshape[1]):
            ss[ii, jj] = 0.0
            for kk in range(nactive_indices):
                ss[ii, jj] += a[active_indices[kk]]*(
                    XX[ii, kk] - YY[ii, jj, active_indices[kk]])**2
            ss[ii, jj] = ss[ii, jj]+b
    return ss


def sq_dists_3d_prereduced(XX, YY, a=1, b=0, active_indices=None):
    assert XX.shape[0] == YY.shape[0]
    if active_indices is None:
        active_indices = np.arange(YY.shape[2])
    if np.isscalar(a):
        # a must be indexable by the entries of active_indices
        a = np.full(YY.shape[2], float(a))
    assert a.shape[0] == YY.shape[2]
    assert XX.shape[1] == active_indices.shape[0]

    try:
        from pyapprox.cython.utilities import sq_dists_3d_prereduced_pyx
        return sq_dists_3d_prereduced_pyx(XX, YY, active_indices, a, b)
    except (ImportError, ModuleNotFoundError) as e:
        msg = 'sq_dists_3d_prereduced extension failed'
        trace_error_with_msg(msg, e)

    return sq_dists_numba_3d_XX_prereduced(XX, YY, a, b, active_indices)


def gaussian_loglike_fun_3d_prereduced(
        obs, pred_obs, noise_std, active_indices):
    if not isinstance(noise_std, np.ndarray):
        noise_std = np.ones((pred_obs.shape[-1], 1), dtype=float)*noise_std

    tmp1 = -1/(2*noise_std[:, 0]**2)
    tmp2 = 0.5*np.sum(np.log(-tmp1[active_indices]/np.pi))
    llike = sq_dists_numba_3d_XX_prereduced(
        obs, pred_obs, tmp1, tmp2, active_indices)
    if llike.ndim == 1:
        llike = llike[:, None]
    return llike


def compute_weighted_sqeuclidian_distance(obs, pred_obs, noise_std,
                                          active_indices):
    if obs.ndim != 2 or pred_obs.ndim != 2:
        raise ValueError("obs and pred_obs must be 2D arrays")
    if not isinstance(noise_std, np.ndarray) or noise_std.ndim != 2:
        msg = "noise_std must be a 2d np.ndarray with one column"
        raise ValueError(msg)

    if active_indices is None:
        weights = 1/(np.sqrt(2)*noise_std[:, 0])
        # Avoid a copy if possible: fancy indexing with an index array
        # makes a copy, which is slow.
        weighted_obs = obs*weights
        weighted_pred_obs = pred_obs*weights
        sq_dists = cdist(weighted_obs, weighted_pred_obs, "sqeuclidean")
        return sq_dists

    weights = 1/(np.sqrt(2)*noise_std[active_indices, 0])
    weighted_obs = obs[:, active_indices]*weights
    weighted_pred_obs = pred_obs[:, active_indices]*weights
    sq_dists = cdist(weighted_obs, weighted_pred_obs, "sqeuclidean")
    return sq_dists


def gaussian_loglike_fun_economial_2D(
        obs, pred_obs, noise_std, active_indices=None):
    if not isinstance(noise_std, np.ndarray):
        noise_std = np.ones((obs.shape[-1], 1), dtype=float)*noise_std
    sq_dists = compute_weighted_sqeuclidian_distance(
        obs, pred_obs, noise_std, active_indices)
    if active_indices is None:
        llike = (0.5*np.sum(np.log(1/(2*np.pi*noise_std[:, 0]**2))) -
                 sq_dists[0, :])
    else:
        llike = (0.5*np.sum(
            np.log(1/(2*np.pi*noise_std[active_indices, 0]**2))) -
                 sq_dists[0, :])

    if llike.ndim == 1:
        llike = llike[:, None]
    return llike


def gaussian_loglike_fun(obs, pred_obs, noise_std, active_indices=None):
    assert pred_obs.shape[-1] == obs.shape[-1]
    if pred_obs.ndim == 3 and obs.ndim == 2 and obs.shape[0] != 1:
        return gaussian_loglike_fun_economial_3D(
            obs, pred_obs, noise_std, active_indices)
    elif obs.ndim == 2 and pred_obs.ndim == 2 and obs.shape != pred_obs.shape:
        return gaussian_loglike_fun_economial_2D(
            obs, pred_obs, noise_std, active_indices)
    else:
        return gaussian_loglike_fun_broadcast(
            obs, pred_obs, noise_std, active_indices)
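

# Illustrative sketch added during editing (not part of the original
# module): gaussian_loglike_fun should agree with summing independent
# scipy.stats.norm log-pdfs over the observations. All data is synthetic.
def _demo_gaussian_loglike_fun():
    nsamples, nobs = 4, 3
    noise_std = np.full((nobs, 1), 0.3)
    obs = np.random.normal(0., 1., (1, nobs))
    pred_obs = np.random.normal(0., 1., (nsamples, nobs))
    llike = gaussian_loglike_fun(obs, pred_obs, noise_std)
    llike_ref = stats.norm(pred_obs, noise_std[:, 0]).logpdf(
        obs).sum(axis=1)[:, None]
    assert np.allclose(llike, llike_ref)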


@njit(cache=True)
def _evidences(inner_log_likelihood_vals, in_weights):
    MM, NN = inner_log_likelihood_vals.shape
    evidences = np.empty((MM, 1))
    for mm in range(MM):
        evidences[mm, 0] = 0.0
        for nn in range(NN):
            evidences[mm, 0] += math.exp(
                inner_log_likelihood_vals[mm, nn])*in_weights[mm, nn]
    return evidences
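

# Illustrative sketch added during editing (not part of the original
# module): _evidences computes, for each outer-loop sample m, the
# quadrature approximation evidence_m = sum_n exp(loglike[m, n])*w[m, n].
def _demo_evidences():
    MM, NN = 3, 5
    loglike = np.random.normal(-1., 0.1, (MM, NN))
    weights = np.full((MM, NN), 1/NN)
    evidences = _evidences(loglike, weights)
    assert np.allclose(
        evidences, np.sum(np.exp(loglike)*weights, axis=1)[:, None])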


def _loglike_fun_from_noiseless_obs(
        noiseless_obs, pred_obs, noise_realizations, noise_std,
        active_indices):
    nactive_indices = active_indices.shape[0]
    obs = noiseless_obs[:, active_indices].copy()
    obs += noise_realizations[:, :nactive_indices]
    if pred_obs.ndim == 3:
        return gaussian_loglike_fun_3d_prereduced(
            obs, pred_obs, noise_std, active_indices)
    return gaussian_loglike_fun(
        obs, pred_obs[:, active_indices], noise_std[active_indices])


@njit(cache=True)
def _compute_evidences_repeated_in_samples(
        out_obs, in_pred_obs, in_weights, active_indices, noise_std):
    nout_samples = out_obs.shape[0]
    nin_samples = in_pred_obs.shape[0]
    nactive_indices = active_indices.shape[0]
    const1 = -1/(2*noise_std[:, 0]**2)
    evidences = np.empty((nout_samples, 1))
    const2 = 0.5*np.sum(np.log(-const1[active_indices]/np.pi))
    for ii in range(nout_samples):
        evidences[ii, 0] = 0.0
        for jj in range(nin_samples):
            loglike_val = const2
            for kk in range(nactive_indices):
                loglike_val += const1[active_indices[kk]]*(
                    out_obs[ii, kk] -
                    in_pred_obs[jj, active_indices[kk]])**2
            evidences[ii, 0] += math.exp(loglike_val)*in_weights[jj, 0]
    return evidences


def _compute_evidences(
        out_pred_obs, in_pred_obs, in_weights,
        out_weights, active_indices, noise_samples, noise_std):
    # Warning: the out_obs array passed to the inner-loop evidence
    # computation has already been reduced to the active indices but
    # in_pred_obs has not. The cost of copying (from fancy indexing) is
    # too expensive for the latter (and larger) array.

    outer_log_likelihood_vals = _loglike_fun_from_noiseless_obs(
        out_pred_obs, out_pred_obs, noise_samples,
        noise_std, active_indices)

    out_obs = out_pred_obs[:, active_indices].copy()
    out_obs += noise_samples[:, :active_indices.shape[0]]
    evidences = _compute_evidences_repeated_in_samples(
        out_obs, in_pred_obs, in_weights, active_indices, noise_std)

    return outer_log_likelihood_vals, evidences


def _compute_expected_kl_utility_monte_carlo(
        out_pred_obs, in_pred_obs, in_weights, out_weights,
        active_indices, noise_samples, noise_std, data_risk_fun, return_all):
    r"""
    Compute the expected Kullback–Leibler (KL) divergence.

    Parameters
    ----------
    log_likelihood_fun : callable
        Function with signature

        `log_likelihood_fun(obs, pred_obs) -> np.ndarray (nsamples, 1)`

        That returns the log likelihood for a set of observations and
        predictions.
        obs : np.ndarray(nsamples, nobs+nnew_obs)
        pred_obs : np.ndarray(nsamples, nobs+nnew_obs)

    out_pred_obs : np.ndarray (nout_samples, ncandidates)
        The noiseless values of out_obs with noise removed

    in_pred_obs : np.ndarray (nout_samples*nin_samples, ncandidates)
        The noiseless values of obs_fun at all sets of innerloop samples
        used for each outerloop iteration. The values are stacked such
        that np.vstack((inner_loop1_vals, inner_loop2_vals, ...))

    in_weights  : np.ndarray (nout_samples, nin_samples)
        The quadrature weights associated with each in_pred_obs set
        used to compute the inner integral.

    out_weights  : np.ndarray (nout_samples, 1)
        The quadrature weights associated with each out_pred_obs set
        used to compute the outer integral.

    collected_design_indices : np.ndarray (nobs)
        The indices into the qoi vector associated with the
        collected observations

    new_design_indices : np.ndarray (nnew_obs)
        The indices into the qoi vector associated with new design locations
        under consideration

    Returns
    -------
    utility : float
        The expected utility
    """

    outer_log_likelihood_vals, evidences = _compute_evidences(
         out_pred_obs, in_pred_obs, in_weights, out_weights,
         active_indices, noise_samples, noise_std)

    divergences = outer_log_likelihood_vals - np.log(evidences)
    utility_val = data_risk_fun(divergences, out_weights)
    # utility_val = np.sum((outer_log_likelihood_vals - np.log(evidences)) *
    #                      out_weights)
    if not return_all:
        return {"utility_val": utility_val}
    result = {"utility_val": utility_val, "evidences": evidences,
              "divergences": divergences}
    return result
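

# Note added during editing: the function above implements the nested
# quadrature estimator of the expected information gain,
#
#     U = sum_i w_i^out [ log L(y_i | theta_i^out)
#                         - log sum_j w_j^in L(y_i | theta_j^in) ],
#
# where y_i = f(theta_i^out) + eps_i is a synthetic observation restricted
# to the active indices and the inner sum approximates the evidence
# p(y_i). The outer weighted sum is recovered when data_risk_fun is
# oed_data_expectation.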


def _precompute_out_quadrature_rule(
        nvars, generate_prior_noise_samples, nout_samples):
    r"""
    Parameters
    ----------
    generate_outer_prior_samples : callable
        Function with the signature

        `generate_outer_prior_samples(nsamples) -> np.ndarray(nvars, nsamples)`

        That returns a set of random samples randomly drawn from the prior
        These samples are used to draw condition samples of the obsevations
        which are used to take an expectation with respect to the data space
        in the outer loop

    nout_samples : integer
        The number of Monte Carlo samples used to compute the outer integral
        over all possible observations

    Returns
    -------
    out_quad_data : tuple(outerquad_x, outerquad_w)
        Tuple containing the points and weights of the outer quadrature rule
        with respect to the joint density of the prior and noise.
        outerquad_x is np.ndarray(nprior_vars, nquad)
        outerquad_w is np.ndarray(nquad, 1)

    out_noise_samples : np.ndarray (max_ncollected_obs, nquad)
    """
    out_samples, out_weights = \
        generate_prior_noise_samples(nout_samples)
    out_prior_samples = out_samples[:nvars]
    out_noise_samples = out_samples[nvars:]
    out_prior_quad_data = (
        out_prior_samples, out_weights)
    return (out_prior_quad_data, out_noise_samples)


def _precompute_expected_kl_utility_data(out_quad_data, in_quad_data, obs_fun):
    r"""
    Parameters
    ----------
    out_quad_data : tuple(outerquad_x, outerquad_w)
        Tuple containing the points and weights of the outer quadrature rule
        with respect to the prior used for outerloop calculations.
        outerquad_x is np.ndarray(nprior_vars, nquad)
        outerquad_w is np.ndarray(nquad, 1)

    in_quad_data : tuple(innerquad_x, innerquad_w)
        Tuple containing the points and weights of the inner quadrature
        rule with respect to the prior used for innerloop calculations.
        innerquad_x is np.ndarray(nprior_vars, nquad)
        innerquad_w is np.ndarray(nquad, 1)

    obs_fun : callable
        Function with the signature

        `obs_fun(samples) -> np.ndarray(nsamples, nqoi)`

        That returns noiseless evaluations of the forward model.

    Returns
    -------
    out_pred_obs : np.ndarray (nout_samples, ncandidates)
        The noiseless observations predicted at the outer-loop samples

    in_pred_obs : np.ndarray (nin_samples, ncandidates)
        The noiseless values of obs_fun at the innerloop samples
    """
    out_prior_samples, out_weights = out_quad_data
    assert out_weights.ndim == 2
    print(f"Running {out_prior_samples.shape[1]} outer model evaluations")
    t0 = time.time()
    out_pred_obs = obs_fun(out_prior_samples)
    print("Predicted observation bounds",
          out_pred_obs.max(), out_pred_obs.min())
    print("Evaluations took", time.time()-t0)

    if out_pred_obs.shape[0] != out_prior_samples.shape[1]:
        msg = "obs_fun is not returning an array with the correct shape."
        msg += f" nrows is {out_pred_obs.shape[0]}. Should be "
        msg += f"{out_prior_samples.shape[1]}"
        raise ValueError(msg)

    print(f"Running {in_quad_data[0].shape[1]} inner model evaluations")
    t0 = time.time()
    in_pred_obs = obs_fun(in_quad_data[0])
    print("Evaluations took", time.time()-t0)

    return out_pred_obs, in_pred_obs


def _precompute_expected_deviation_data(
        out_quad_data, in_quad_data, obs_fun, qoi_fun):
    out_pred_obs, in_pred_obs = _precompute_expected_kl_utility_data(
        out_quad_data, in_quad_data, obs_fun)
    in_samples = in_quad_data[0]
    print(f"Running {in_samples.shape[1]} qoi model evaluations")
    t0 = time.time()
    in_pred_qois = qoi_fun(in_samples)
    print("Evaluations took", time.time()-t0)
    if in_pred_qois.shape[0] != in_samples.shape[1]:
        msg = "qoi_fun is not returning an array with the correct shape"
        raise ValueError(msg)
    return out_pred_obs, in_pred_obs, in_pred_qois


def _evidences_and_weights(inner_log_likelihood_vals, in_weights):
    inner_likelihood_vals = np.exp(inner_log_likelihood_vals)
    evidences = np.einsum(
        "ij,ij->i", inner_likelihood_vals, in_weights)[:, None]
    weights = inner_likelihood_vals*in_weights/evidences
    return evidences, weights


# @njit(cache=True)
# def _evidences_and_weights(inner_log_likelihood_vals, in_weights):
#     MM, NN = inner_log_likelihood_vals.shape
#     evidences = np.empty((MM, 1))
#     weights = in_weights.copy()
#     for mm in range(MM):
#         evidences[mm, 0] = 0.0
#         for nn in range(NN):
#             likelihood_val = math.exp(inner_log_likelihood_vals[mm, nn])
#             weights[mm, nn] *= likelihood_val
#             evidences[mm, 0] += weights[mm, nn]
#     return evidences, weights/evidences


def _compute_negative_expected_deviation_monte_carlo(
        out_pred_obs, in_pred_obs, in_weights, out_weights,
        in_pred_qois, deviation_fun, pred_risk_fun, data_risk_fun,
        noise_samples, noise_std, active_indices, return_all):
    nout_samples = out_pred_obs.shape[0]

    out_obs = out_pred_obs[:, active_indices].copy()
    # print(out_obs.shape, active_indices.shape, noise_samples.shape)
    out_obs += noise_samples[:, :active_indices.shape[0]]
    deviations, evidences = deviation_fun(
        out_obs, in_pred_obs, in_weights, active_indices, noise_std,
        in_pred_qois)
    # deviations.shape = (nout_quad, nqois)
    # evidences.shape = (nout_quad, 1)

    # expectation taken with respect to observations
    # assume always want deviation here, but this can be changed
    # expected_obs_deviations = np.sum(deviations*out_weights, axis=0)
    # use einsum because it does not create intermediate arrays
    # expected_obs_deviations = np.einsum(
    #    "ij,i->j", deviations, out_weights[:, 0])
    expected_obs_deviations = data_risk_fun(deviations, out_weights)

    disutility_val = pred_risk_fun(expected_obs_deviations)

    utility_val = -disutility_val
    if not return_all:
        return {'utility_val': utility_val}
    result = {
        'utility_val': utility_val, 'evidences': evidences,
        'deviations': deviations,
        'expected_deviations': expected_obs_deviations}
    return result
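

# Note added during editing: with the default choices
# data_risk_fun=oed_data_expectation and
# pred_risk_fun=oed_prediction_average, the utility above reduces to
#
#     U = -(1/nqois) * sum_q sum_i w_i^out * D_q(y_i),
#
# where D_q(y_i) is the posterior deviation of QoI q given the synthetic
# observation y_i.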


def update_observations(design_candidates, collected_design_indices,
                        new_design_indices, obs_process,
                        collected_obs):
    """
    Update the real collected data with observations at the newly selected
    candidate locations.

    Parameters
    ----------
    design_candidates : np.ndarray (nvars, nsamples)
        The location of all design sample candidates

    collected_design_indices : np.ndarray (nobs)
        The indices into the qoi vector associated with the
        collected observations

    new_design_indices : np.ndarray (nnew_design_samples)
        The indices into the design_candidates array of the new selected
        design samples

    obs_process : callable
        The true data generation model with the signature

        `obs_process(design_indices) -> np.ndarray (1, ndesign_indices)`

        where design_samples is np.ndarray (nvars, ndesign_indices)

    collected_obs : np.ndarray (1, nobs)
        The observations at the previously selected design samples

    Returns
    -------
    updated_collected_obs : np.ndarray (1, nobs+nnew_design_samples)
        The updated collected observations with the new observations
        appended to the previous observations

    updated_collected_design_indices : np.ndarray (nobs+nnew_design_samples)
        The updated indices associated with all the collected observations
    """
    new_obs = obs_process(new_design_indices)

    if collected_obs is None:
        return new_obs, new_design_indices

    updated_collected_obs = np.hstack((collected_obs, new_obs))
    updated_collected_design_indices = np.hstack(
        (collected_design_indices, new_design_indices)).astype(int)
    return updated_collected_obs, updated_collected_design_indices
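

# Illustrative usage sketch added during editing (not part of the original
# module). The obs_process below is a hypothetical stand-in for a real
# data-collection process.
def _demo_update_observations():
    design_candidates = np.linspace(0., 1., 5)[None, :]

    def obs_process(design_indices):
        # hypothetical noisy measurements at the requested candidates
        return np.random.normal(0., 1., (1, design_indices.shape[0]))

    collected_obs, collected_indices = update_observations(
        design_candidates, None, np.array([2]), obs_process, None)
    collected_obs, collected_indices = update_observations(
        design_candidates, collected_indices, np.array([4]), obs_process,
        collected_obs)
    assert collected_obs.shape == (1, 2)
    assert np.array_equal(collected_indices, [2, 4])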


def d_optimal_utility(Amat, noise_std):
    """
    Compute the D-optimality criterion for a linear model f(x) = Amat.dot(x)

    Assume noise covariance R = sigma^2 I and prior covariance I.
    The posterior covariance is then

    sigma^2 inv(A^T A + R)

    References
    ----------
    Alen Alexanderian and Arvind K. Saibaba
    Efficient D-Optimal Design of Experiments for
    Infinite-Dimensional Bayesian Linear Inverse Problems
    SIAM Journal on Scientific Computing 2018 40:5, A2956-A2985
    https://doi.org/10.1137/17M115712X
    Theorem 1
    """
    nvars = Amat.shape[1]
    hess_misfit = Amat.T.dot(Amat)/noise_std**2
    ident = np.eye(nvars)
    return 0.5*np.linalg.slogdet(hess_misfit+ident)[1]
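

# Illustrative sketch added during editing (not part of the original
# module): for a linear model with unit prior covariance the criterion is
# 0.5*logdet(A.T A / sigma**2 + I), which can be checked directly.
def _demo_d_optimal_utility():
    Amat = np.random.normal(0., 1., (10, 3))
    noise_std = 0.5
    val = d_optimal_utility(Amat, noise_std)
    ref = 0.5*np.log(np.linalg.det(
        Amat.T.dot(Amat)/noise_std**2 + np.eye(3)))
    assert np.allclose(val, ref)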


def _define_design_data(collected_design_indices,
                        new_design_indices, ndesign_candidates,
                        ndata_per_candidate, noise_std):
    # Unlike open-loop design (closed-loop batch design) we do not update
    # the inner- and outer-loop weights, but rather just compute the
    # likelihood for all collected and new design indices. If we want to
    # update the weights then we must have a different set of weights for
    # each iteration of the inner loop, computed using the associated
    # outerloop data.
    if collected_design_indices is not None:
        design_indices = np.hstack(
            (collected_design_indices, new_design_indices))
    else:
        # assume the observations at the collected_design_indices are
        # already incorporated into the inner and outer loop weights
        design_indices = np.asarray(new_design_indices)

    active_indices = np.hstack([idx*ndata_per_candidate + np.arange(
        ndata_per_candidate) for idx in design_indices])
    # Assume the same noise_std for each data point associated with a
    # design candidate. Consider adding ndata_per_candidate to
    # self.__init__ and defining noise_std with the correct length there.
    noise_std = np.hstack(
        [[noise_std[idx, 0]]*ndata_per_candidate
         for idx in range(ndesign_candidates)])[:, None]
    return active_indices, noise_std


def oed_data_expectation(deviations, weights):
    """
    Compute the expected deviation for each outer loop sample

    Parameters
    ----------
    deviations : np.ndarray (nout_samples, nqois)
         The samples

    weights : np.ndarray (nout_samples, 1)
        Weights associated with each inner loop sample

    Returns
    -------
    expected_obs_deviations : np.ndarray (nqois, 1)
        The deviation vals
    """
    expected_obs_deviations = np.einsum(
        "ij,i->j", deviations, weights[:, 0])[:, None]
    return expected_obs_deviations
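

# Illustrative sketch added during editing (not part of the original
# module): the einsum in oed_data_expectation is just the weighted column
# sum sum_i weights[i]*deviations[i, :].
def _demo_oed_data_expectation():
    deviations = np.random.uniform(0., 1., (7, 2))
    weights = np.full((7, 1), 1/7)
    vals = oed_data_expectation(deviations, weights)
    assert np.allclose(vals, np.sum(deviations*weights, axis=0)[:, None])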


class AbstractBayesianOED(ABC):
    r"""Base Bayesian OED class"""

    def __init__(self, ndesign_candidates, obs_fun, noise_std,
                 prior_variable, out_quad_opts, in_quad_opts, nprocs=1,
                 max_ncollected_obs=2, ndata_per_candidate=1,
                 data_risk_fun=oed_data_expectation):
        """
        Constructor.

        Parameters
        ----------
        ndesign_candidates : integer
            The number of design candidates

        obs_fun : callable
            Function with the signature

            `obs_fun(samples) -> np.ndarray(nsamples, ndesign_candidates*ndata_per_candidate)`

            That returns noiseless evaluations of the forward model.

        noise_std : float or np.ndarray (nobs, 1)
            The standard deviation of the mean zero Gaussian noise added
            to each observation

        prior_variable : pya.IndependentMarginalsVariable
            The prior variable consisting of independent univariate random
            variables

        out_quad_opts : dict
            Options defining the quadrature rule, with respect to the
            joint density of the prior and noise, used to compute the
            outer integral over all possible observations. Must contain
            the key "method" and may contain the keys "args" and "kwargs"
            which are passed to
            :func:`pyapprox.surrogates.integrate.integrate`.

        in_quad_opts : dict
            Options defining the quadrature rule, with respect to the
            prior, used to compute the inner integral that evaluates the
            evidence for each realization of the predicted observations.
            Uses the same format as out_quad_opts.

        nprocs : integer
            The number of threads used to compute the OED design. Warning:
            this uses multiprocessing.Pool and seems to provide very
            little benefit and in many cases increases the CPU time.

        max_ncollected_obs : integer
            The maximum number of observations that will be collected.

        ndata_per_candidate : integer
            The number of observations returned by obs_fun for each design
            candidate.

        data_risk_fun : callable
            Function used to compute a risk measure of the utility over
            the outer-loop samples, e.g. :func:`oed_data_expectation`.
        """
        self.ndesign_candidates = ndesign_candidates
        self.ndata_per_candidate = ndata_per_candidate
        if not callable(obs_fun):
            raise ValueError("obs_fun must be a callable function")
        self.obs_fun = obs_fun
        self.noise_std = noise_std
        if isinstance(self.noise_std, (float, int)):
            self.noise_std = np.full(
                (self.ndesign_candidates, 1), self.noise_std)
        if (self.noise_std.shape[0] != self.ndesign_candidates):
            msg = "noise_std must be scalar or given for each design candidate"
            raise ValueError(msg)
        # For now assume homoscedastic noise. TODO: currently
        # noise_variable_marginals is used to create the quadrature rule
        # used for all design candidates. If we want to allow for
        # heteroscedastic noise then this quadrature rule must be scaled
        # for each design location when computing noisy data in the
        # utility functions. Also assume the same noise is applied to each
        # data point associated with a design candidate.
        # See compute_expected_utility.
        assert np.allclose(self.noise_std, self.noise_std[0])
        self.prior_variable = prior_variable
        self.max_ncollected_obs = max_ncollected_obs
        noise_variable_marginals = [
            stats.norm(0, self.noise_std[0, 0])]*self.max_ncollected_obs
        self.joint_prior_noise_variable = IndependentMarginalsVariable(
            prior_variable.marginals()+noise_variable_marginals)
        (self.out_quad_data, self.noise_samples, self.in_quad_data,
         self.econ) = self._get_quad_rules(out_quad_opts, in_quad_opts)
        self.nout_samples = self.out_quad_data[0].shape[1]
        self.nin_samples = self.in_quad_data[0].shape[1]
        self.out_prior_samples, self.out_weights = self.out_quad_data
        self.in_samples, self.in_weights = self.in_quad_data

        self.collected_design_indices = None
        self.out_pred_obs = None
        self.in_pred_obs = None
        self.nprocs = self._set_nprocs(nprocs)
        self.data_risk_fun = data_risk_fun

    def _get_quad_rules(self, out_quad_opts, in_quad_opts):
        # note kwargs must default to a mapping (not a list) because it is
        # unpacked with **
        out_quad_data = integrate(
            out_quad_opts["method"], self.joint_prior_noise_variable,
            *out_quad_opts.get("args", []),
            **out_quad_opts.get("kwargs", {}))
        out_prior_samples = out_quad_data[0][:self.prior_variable.num_vars()]
        out_noise_samples = out_quad_data[0][
            self.prior_variable.num_vars():].T
        out_quad_data = (out_prior_samples, out_quad_data[1])
        in_quad_data = integrate(
            in_quad_opts["method"], self.prior_variable,
            *in_quad_opts.get("args", []),
            **in_quad_opts.get("kwargs", {}))
        econ = True
        return out_quad_data, out_noise_samples, in_quad_data, econ

    def _set_nprocs(self, nprocs):
        if (nprocs > 1 and (
                'OMP_NUM_THREADS' not in os.environ or
                not int(os.environ['OMP_NUM_THREADS']) == 1)):
            msg = 'User requested multiprocessing but OMP_NUM_THREADS has '
            msg += 'not been set to 1. Run the script with '
            msg += 'OMP_NUM_THREADS=1 python script.py'
            raise Exception(msg)
        return nprocs
    @abstractmethod
    def populate(self):
        raise NotImplementedError()
    def update_design(self, return_all=False, nnew=1):
        if self.out_pred_obs is None:
            raise ValueError(
                "Must call self.populate before creating designs")
        if self.collected_design_indices is None:
            self.collected_design_indices = np.zeros((0), dtype=int)
        if (self.collected_design_indices.shape[0]+nnew >
                self.max_ncollected_obs):
            msg = "Too many new design points requested. Decrease nnew "
            msg += "and/or increase self.max_ncollected_obs"
            raise ValueError(msg)
        utility_vals, selected_indices, results = self.select_design(
            self.collected_design_indices, nnew, return_all)
        self.collected_design_indices = np.hstack(
            (self.collected_design_indices, selected_indices)).astype(int)
        if return_all is False:
            return utility_vals, selected_indices, None
        return utility_vals, selected_indices, results
    def set_collected_design_indices(self, indices):
        self.collected_design_indices = indices.copy()
    @abstractmethod
    def compute_expected_utility(self, collected_design_indices,
                                 new_design_indices, return_all=False):
        raise NotImplementedError()
    def _compute_utilities_parallel_shared(
            self, worker_fun, init_worker, _get_initargs, indices,
            collected_design_indices, return_all):
        nindices = indices.shape[0]
        splits = split_indices(nindices, self.nprocs)
        args = [(indices[splits[ii]:splits[ii+1]],
                 collected_design_indices, return_all,
                 self.ndesign_candidates, self.ndata_per_candidate)
                for ii in range(splits.shape[0]-1)]
        # t0 = time.time()
        with Pool(processes=self.nprocs, initializer=init_worker,
                  initargs=_get_initargs(self)) as pool:
            # print("pool startup", time.time()-t0)
            result = pool.map(worker_fun, args)
        utility_vals = np.hstack([r[0] for r in result])
        results = []
        for r in result:
            results += r[1]
        return utility_vals, results

    # does not work due to issues with pickling
    # def _compute_utilities_parallel(self, ncandidates,
    #                                 collected_design_indices, return_all):
    #     splits = split_indices(self.ndesign_candidates, self.nprocs)
    #     args = [(splits[ii], splits[ii+1])
    #             for ii in range(splits.shape[0]-1)]
    #     t0 = time.time()
    #     with Pool(processes=self.nprocs) as pool:
    #         print("pool startup", time.time()-t0)
    #         result = pool.map(
    #             partial(self._compute_utilities_serial,
    #                     self.compute_expected_utility,
    #                     collected_design_indices, return_all), args)
    #     utility_vals = np.hstack([r[0] for r in result])
    #     results = []
    #     for r in result:
    #         results += r[1]
    #     return utility_vals, results

    def _compute_utilities_serial(self, compute_expected_utility,
                                  collected_design_indices, return_all,
                                  indices):
        t0 = time.time()
        ncandidates = indices.shape[0]
        utility_vals = -np.ones(ncandidates)*np.inf
        results = [None for ii in range(ncandidates)]
        ii = 0
        for idx in indices:
            results[ii] = compute_expected_utility(
                collected_design_indices, np.asarray(idx, dtype=int),
                return_all=return_all)
            utility_vals[ii] = results[ii]["utility_val"]
            ii += 1
        print("Computing utilities in serial took", time.time()-t0)
        return utility_vals, results
    def compute_utilities(self, ncandidates, collected_design_indices,
                          new_indices, return_all):
        if self.nprocs == 1:
            return self._compute_utilities_serial(
                self.compute_expected_utility, collected_design_indices,
                return_all, new_indices)
        return self._compute_utilities_parallel_shared(
            kl_worker_fun, init_kl_worker,
            _get_kl_compute_utilities_initargs, new_indices,
            collected_design_indices, return_all)
        # return self._compute_utilities_parallel(
        #     ncandidates, collected_design_indices, return_all)
    def select_design(self, collected_design_indices, nnew, return_all):
        """
        Update an experimental design.

        Parameters
        ----------
        collected_design_indices : np.ndarray (nobs)
            The indices into the qoi vector associated with the
            collected observations

        Returns
        -------
        utility_vals : np.ndarray (ncandidates)
            The utility values at the candidate design samples. If a
            candidate sample is already in the collected design then its
            utility value will be set to -np.inf

        selected_index : np.ndarray (nnew)
            The indices of the best design, i.e. those with the largest
            utility

        results : dict
            Dictionary of useful data used to compute expected utility.
            At a minimum it has the keys ["utilities", "evidences",
            "weights"]
        """
        new_indices = np.asarray(list(itertools.combinations_with_replacement(
            np.arange(self.ndesign_candidates), nnew)))
        utility_vals, results = self.compute_utilities(
            self.ndesign_candidates, collected_design_indices, new_indices,
            return_all)
        selected_index = new_indices[np.argmax(np.round(utility_vals, 16))]
        if not return_all:
            results = None
        return utility_vals, selected_index, results
    def _define_design_data(self, collected_design_indices,
                            new_design_indices):
        return _define_design_data(
            collected_design_indices, new_design_indices,
            self.ndesign_candidates, self.ndata_per_candidate,
            self.noise_std)
class OEDSharedData():
    def __init__(self):
        self.attr_names = [
            "in_weights", "out_weights", "in_pred_obs", "out_pred_obs",
            "noise_samples", "noise_std"]

    def set_data(self, data):
        assert len(data) == 2*len(self.attr_names)
        for ii in range(len(self.attr_names)):
            setattr(self, self.attr_names[ii], data[2*ii])
            setattr(self, self.attr_names[ii]+"_shape", data[2*ii+1])

    def clear(self):
        # set_data expects an array and a shape for each attribute name
        self.set_data([None]*(2*len(self.attr_names)))
        self.in_pred_qois = None
        self.set_funs(None, None, None)

    def set_funs(self, deviation_fun, pred_risk_fun, data_risk_fun):
        self.deviation_fun = deviation_fun
        self.pred_risk_fun = pred_risk_fun
        self.data_risk_fun = data_risk_fun

    def set_in_pred_qois(self, in_pred_qois, shape):
        self.in_pred_qois = in_pred_qois
        self.in_pred_qois_shape = shape


global_oed_shared_data = OEDSharedData()


def _create_global_array_from_name(obj, name):
    array = getattr(obj, name)
    X = RawArray('d', int(np.prod(array.shape)))
    # X[:] = array.ravel() is very slow
    X_np = np.frombuffer(X).reshape(array.shape)
    np.copyto(X_np, array)
    return X, array.shape


def _get_kl_compute_utilities_initargs(obj):
    # t0 = time.time()
    initargs = []
    for name in global_oed_shared_data.attr_names:
        X, shape = _create_global_array_from_name(obj, name)
        initargs += [X, shape]
    initargs = (initargs, obj.data_risk_fun)
    # print("data time", time.time()-t0)
    return (initargs, )


def init_kl_worker(args):
    data, data_risk_fun = args
    global_oed_shared_data.set_data(data)
    funs = [None, None, data_risk_fun]
    global_oed_shared_data.set_funs(*funs)


def _get_deviation_compute_utilities_initargs(obj):
    # t0 = time.time()
    initargs = _get_kl_compute_utilities_initargs(obj)[0][0]
    X, shape = _create_global_array_from_name(obj, "in_pred_qois")
    initargs = (initargs, [X, shape],
                [obj.deviation_fun, obj.pred_risk_fun, obj.data_risk_fun])
    # print("data time", time.time()-t0)
    return (initargs, )


def init_deviation_worker(initargs):
    data, in_pred_qois, funs = initargs
    global_oed_shared_data.set_data(data)
    global_oed_shared_data.set_in_pred_qois(*in_pred_qois)
    global_oed_shared_data.set_funs(*funs)


def from_buffer(name):
    return np.frombuffer(getattr(global_oed_shared_data, name)).reshape(
        getattr(global_oed_shared_data, name+"_shape"))


def kl_worker_fun(arg):
    in_weights = from_buffer("in_weights")
    out_weights = from_buffer("out_weights")
    in_pred_obs = from_buffer("in_pred_obs")
    out_pred_obs = from_buffer("out_pred_obs")
    noise_samples = from_buffer("noise_samples")
    noise_std = from_buffer("noise_std")
    indices = arg[0]
    collected_design_indices, return_all = arg[1], arg[2]
    ndesign_candidates, ndata_per_candidate = arg[3], arg[4]
    data_risk_fun = global_oed_shared_data.data_risk_fun
    nindices = indices.shape[0]
    t0 = time.time()
    results = [None for ii in range(nindices)]
    utility_vals = np.full(nindices, -np.inf)
    ii = 0
    noise_std = np.hstack(
        [[noise_std[idx, 0]]*ndata_per_candidate
         for idx in range(ndesign_candidates)])[:, None]
    for new_design_indices in indices:
        if collected_design_indices is not None:
            design_indices = np.hstack(
                (collected_design_indices, new_design_indices))
        else:
            # assume the observations at the collected_design_indices are
            # already incorporated into the inner and outer loop weights
            design_indices = np.asarray(new_design_indices)
        active_indices = np.hstack([idx*ndata_per_candidate + np.arange(
            ndata_per_candidate) for idx in design_indices])
        results[ii] = _compute_expected_kl_utility_monte_carlo(
            out_pred_obs, in_pred_obs, in_weights, out_weights,
            active_indices, noise_samples, noise_std, data_risk_fun,
            return_all)
        utility_vals[ii] = results[ii]["utility_val"]
        ii += 1
    print("Worker Took", time.time()-t0, indices[0], indices[-1])
    return utility_vals, results


def deviation_worker_fun(arg):
    in_weights = from_buffer("in_weights")
    out_weights = from_buffer("out_weights")
    in_pred_obs = from_buffer("in_pred_obs")
    out_pred_obs = from_buffer("out_pred_obs")
    noise_samples = from_buffer("noise_samples")
    noise_std = from_buffer("noise_std")
    in_pred_qois = from_buffer("in_pred_qois")
    indices = arg[0]
    collected_design_indices, return_all = arg[1], arg[2]
    ndesign_candidates, ndata_per_candidate = arg[3], arg[4]
    deviation_fun, pred_risk_fun, data_risk_fun = (
        global_oed_shared_data.deviation_fun,
        global_oed_shared_data.pred_risk_fun,
        global_oed_shared_data.data_risk_fun)
    nindices = indices.shape[0]
    t0 = time.time()
    results = [None for ii in range(nindices)]
    utility_vals = np.full(nindices, -np.inf)
    ii = 0
    noise_std = np.hstack(
        [[noise_std[idx, 0]]*ndata_per_candidate
         for idx in range(ndesign_candidates)])[:, None]
    for new_design_indices in indices:
        if collected_design_indices is not None:
            design_indices = np.hstack(
                (collected_design_indices, new_design_indices))
        else:
            # assume the observations at the collected_design_indices are
            # already incorporated into the inner and outer loop weights
            design_indices = np.asarray(new_design_indices)
        active_indices = np.hstack([idx*ndata_per_candidate + np.arange(
            ndata_per_candidate) for idx in design_indices])
        results[ii] = _compute_negative_expected_deviation_monte_carlo(
            out_pred_obs, in_pred_obs, in_weights, out_weights,
            in_pred_qois, deviation_fun, pred_risk_fun, data_risk_fun,
            noise_samples, noise_std, active_indices, return_all)
        utility_vals[ii] = results[ii]["utility_val"]
        ii += 1
    print("Worker Took", time.time()-t0, indices[0], indices[-1])
    return utility_vals, results
class BayesianBatchKLOED(AbstractBayesianOED):
    r"""
    Compute open-loop OED by maximizing the KL divergence between the
    prior and posterior.
    """

    def populate(self):
        (self.out_pred_obs, self.in_pred_obs) = \
            _precompute_expected_kl_utility_data(
                self.out_quad_data, self.in_quad_data, self.obs_fun)
        assert (self.out_pred_obs.shape[1] ==
                self.ndesign_candidates*self.ndata_per_candidate)

    def compute_expected_utility(self, collected_design_indices,
                                 new_design_indices, return_all=False):
        """
        Compute the expected KL utility.

        Setting return_all=True is used for debugging: in addition to the
        utilities it returns intermediate data useful for testing.
        """
        active_indices, noise_std = self._define_design_data(
            collected_design_indices, new_design_indices)
        return _compute_expected_kl_utility_monte_carlo(
            self.out_pred_obs, self.in_pred_obs, self.in_weights,
            self.out_weights, active_indices, self.noise_samples,
            noise_std, self.data_risk_fun, return_all)
def oed_prediction_average(qoi_vals, weights=None):
    assert qoi_vals.ndim == 2 and qoi_vals.shape[1] == 1
    if weights is None:
        return qoi_vals.mean()
    assert weights.shape[1] == 1
    return np.sum(qoi_vals*weights, axis=0)


class OEDQOIDeviation():
    def __init__(self, name, *args):
        self.name = name
        self._args = args

    def __call__(self, out_obs, in_pred_obs, in_weights, active_indices,
                 noise_std, in_pred_qois):
        if self.name == "variance":
            deviation_fun = _posterior_push_fwd_variance_deviation
        elif self.name == "std_dev":
            deviation_fun = _posterior_push_fwd_standard_deviation
        elif self.name == 'entropic':
            deviation_fun = _posterior_push_fwd_entropic_deviation
        elif self.name == 'cvar':
            deviation_fun = _posterior_push_fwd_cvar_deviation
        else:
            msg = f"deviation: {self.name} is not supported"
            raise NotImplementedError(msg)
        return _compute_posterior_push_fwd_deviation(
            out_obs, in_pred_obs, in_weights, active_indices, noise_std,
            in_pred_qois, deviation_fun, *self._args)


@njit(cache=True)
def _posterior_push_fwd_variance_deviation(qoi_vals, nin_samples, weights):
    qoi_mean = np.zeros(qoi_vals.shape[1])
    deviations = np.zeros(qoi_vals.shape[1])
    for jj in range(nin_samples):
        qoi_mean += qoi_vals[jj, :]*weights[jj]
        deviations += qoi_vals[jj, :]**2*weights[jj]
    deviations -= qoi_mean**2
    return deviations


@njit(cache=True)
def _posterior_push_fwd_standard_deviation(qoi_vals, nin_samples, weights):
    deviations = np.sqrt(_posterior_push_fwd_variance_deviation(
        qoi_vals, nin_samples, weights))
    return deviations


@njit(cache=True)
def _posterior_push_fwd_entropic_deviation(qoi_vals, nin_samples, weights):
    qoi_mean = np.zeros(qoi_vals.shape[1])
    deviations = np.zeros(qoi_vals.shape[1])
    for jj in range(nin_samples):
        qoi_mean += qoi_vals[jj, :]*weights[jj]
        deviations += np.exp(qoi_vals[jj, :])*weights[jj]
    deviations = np.log(deviations)-qoi_mean
    return deviations


@njit(cache=True)
def _posterior_push_fwd_cvar_deviation(qoi_vals, nin_samples, weights, beta):
    qoi_means = np.zeros(qoi_vals.shape[1])
    # qoi_vars = np.zeros(qoi_vals.shape[1])
    for jj in range(nin_samples):
        qoi_means += qoi_vals[jj, :]*weights[jj]
        # qoi_vars += qoi_vals[jj, :]**2*weights[jj]
    # qoi_vars -= qoi_means**2
    nqoi = qoi_vals.shape[1]
    weights_expanded = np.empty_like(qoi_vals.T)
    for kk in range(nqoi):
        weights_expanded[kk] = weights
    # qoi_vals.copy() is necessary because numba is updating qoi_vals
    risks = conditional_value_at_risk_vectorized(
        qoi_vals.copy().T, beta, weights_expanded, samples_sorted=False)
    deviations = risks-qoi_means
    return deviations


@njit(cache=True)
def _compute_posterior_push_fwd_deviation(
        out_obs, in_pred_obs, in_weights, active_indices, noise_std,
        qoi_vals, deviation_fun, *args):
    nout_samples = out_obs.shape[0]
    nin_samples = in_pred_obs.shape[0]
    nactive_indices = active_indices.shape[0]
    const1 = -1/(2*noise_std[:, 0]**2)
    evidences = np.empty((nout_samples, 1))
    deviations = np.empty((nout_samples, qoi_vals.shape[1]))
    const2 = 0.5*np.sum(np.log(-const1[active_indices]/np.pi))
    weights = np.empty(nin_samples)
    for ii in range(nout_samples):
        evidences[ii, 0] = 0.0
        for jj in range(nin_samples):
            loglike_val = const2
            for kk in range(nactive_indices):
                loglike_val += const1[active_indices[kk]]*(
                    out_obs[ii, kk] -
                    in_pred_obs[jj, active_indices[kk]])**2
            weights[jj] = math.exp(loglike_val)*in_weights[jj, 0]
            evidences[ii, 0] += weights[jj]
        weights /= evidences[ii, 0]
        deviations[ii, :] = deviation_fun(
            qoi_vals, nin_samples, weights, *args)
    return deviations, evidences


def oed_variance_deviation(samples, weights):
    """
    Compute the variance deviation for each outer loop sample using the
    corresponding inner loop samples

    Parameters
    ----------
    samples : np.ndarray (nout_samples, nin_samples, nqois)
        The samples

    weights : np.ndarray (nout_samples, nin_samples)
        Weights associated with each inner loop sample

    Returns
    -------
    deviation_vals : np.ndarray (nout_samples, nqois)
        The deviation values
    """
    # For large arrays variance_3D_pyx is the same speed as the einsum
    # implementation below
    try:
        from pyapprox.cython.utilities import variance_3D_pyx
        return variance_3D_pyx(samples, weights)
    except (ImportError, ModuleNotFoundError):
        pass

    means = np.einsum("ijk,ij->ik", samples, weights)
    variances = np.einsum("ijk,ij->ik", samples**2, weights)-means**2
    return variances


def oed_entropic_deviation(samples, weights):
    """
    Compute the entropic risk deviation for each outer loop sample using
    the corresponding inner loop samples

    Parameters
    ----------
    samples : np.ndarray (nout_samples, nin_samples, nqois)
        The samples

    weights : np.ndarray (nout_samples, nin_samples)
        Weights associated with each inner loop sample

    Returns
    -------
    deviation_vals : np.ndarray (nout_samples, nqois)
        The deviation values
    """
    means = np.einsum("ijk,ij->ik", samples, weights)
    risks = np.log(np.einsum("ijk,ij->ik", np.exp(samples), weights))
    return risks-means


def oed_data_cvar(deviations, weights, quantile=None):
    """
    Compute the conditional value at risk of the deviations over the
    outer loop samples

    Parameters
    ----------
    deviations : np.ndarray (nout_samples, nqois)
        The deviation values for each outer loop sample

    weights : np.ndarray (nout_samples, 1)
        Weights associated with each outer loop sample

    quantile : float
        The quantile used to compute the conditional value at risk of
        the deviations over the outerloop observations

    Returns
    -------
    cvar_obs_deviations : np.ndarray (nqois, 1)
        The conditional values at risk of the deviations
    """
    assert quantile is not None
    cvar_obs_deviations = np.empty((deviations.shape[1], 1))
    for qq in range(deviations.shape[1]):
        cvar_obs_deviations[qq, 0] = conditional_value_at_risk(
            deviations[:, qq], quantile, weights[:, 0], False)
    return cvar_obs_deviations


def oed_standard_deviation(samples, weights):
    """
    Compute the standard deviation for each outer loop sample using the
    corresponding inner loop samples

    Parameters
    ----------
    samples : np.ndarray (nout_samples, nin_samples, nqois)
        The samples

    weights : np.ndarray (nout_samples, nin_samples)
        Weights associated with each inner loop sample

    Returns
    -------
    deviation_vals : np.ndarray (nout_samples, nqois)
        The deviation values
    """
    variance = oed_variance_deviation(samples, weights)
    # rounding error can cause slightly negative values
    variance[variance < 0] = 0
    return np.sqrt(variance)


def oed_conditional_value_at_risk_deviation(samples, weights, quantile=None,
                                            samples_sorted=True):
    """
    Compute the conditional value at risk deviation for each outer loop
    sample using the corresponding inner loop samples

    Parameters
    ----------
    samples : np.ndarray (nout_samples, nin_samples, nqois)
        The samples

    weights : np.ndarray (nout_samples, nin_samples)
        Weights associated with each inner loop sample

    quantile : float
        The quantile of the conditional value at risk used to compute
        the deviation

    Returns
    -------
    deviation_vals : np.ndarray (nout_samples, nqois)
        The deviation values
    """
    assert quantile is not None
    if samples.shape[2] > 1 and samples_sorted:
        raise ValueError("samples cannot be sorted if nqoi > 1")
    cvars = np.empty((samples.shape[0], samples.shape[2]))
    for ii in range(samples.shape[0]):
        for qq in range(samples.shape[2]):
            mean = np.sum(samples[ii, :, qq]*weights[ii, :])
            cvars[ii, qq] = (conditional_value_at_risk(
                samples[ii, :, qq], quantile, weights[ii, :],
                samples_sorted) - mean)
    return cvars
class BayesianBatchDeviationOED(AbstractBayesianOED):
    r"""
    Compute open-loop OED by minimizing the deviation of the push forward
    of the posterior through a QoI model.
    """

    def __init__(self, ndesign_candidates, obs_fun, noise_std,
                 prior_variable, out_quad_opts, in_quad_opts,
                 qoi_fun=None, deviation_fun=oed_standard_deviation,
                 pred_risk_fun=oed_prediction_average,
                 data_risk_fun=oed_data_expectation,
                 nprocs=1, max_ncollected_obs=2, ndata_per_candidate=1):
        r"""
        Constructor.

        Parameters
        ----------
        ndesign_candidates : integer
            The number of design candidates

        obs_fun : callable
            Function with the signature

            `obs_fun(samples) -> np.ndarray(nsamples, nqoi)`

            That returns noiseless evaluations of the forward model.

        noise_std : float or np.ndarray (nobs, 1)
            The standard deviation of the mean zero Gaussian noise added
            to each observation

        prior_variable : pya.IndependentMarginalsVariable
            The prior variable consisting of independent univariate
            random variables

        out_quad_opts : dict
            Options defining the outer quadrature rule.
            See :class:`AbstractBayesianOED`.

        in_quad_opts : dict
            Options defining the inner quadrature rule.
            See :class:`AbstractBayesianOED`.

        qoi_fun : callable
            Function with the signature

            `qoi_fun(samples) -> np.ndarray(nsamples, nqoi)`

            That returns evaluations of the prediction (QoI) model.
            Observations are assumed to be :math:`f(z)+\epsilon` where
            :math:`\epsilon` is additive noise.

        deviation_fun : callable
            Function with the signature

            `deviation_fun(in_pred_qois, weights) -> np.ndarray(nout_samples, nqois)`

            where in_pred_qois : np.ndarray (nout_samples, nin_samples,
            nqois) and weights : np.ndarray (nout_samples, nin_samples)

        pred_risk_fun : callable
            Function to compute the risk over multiple qoi with the
            signature

            `pred_risk_fun(expected_deviations) -> float`

            where expected_deviations : np.ndarray (nqois, 1)

        data_risk_fun : callable
            Function to compute the risk of the deviations over all
            outerloop samples with the signature

            `data_risk_fun(deviations) -> np.ndarray (nqois, 1)`

            where deviations : np.ndarray (nout_samples, nqois)

        nprocs : integer
            The number of threads used to compute the OED design. Warning:
            this uses multiprocessing.Pool and seems to provide very
            little benefit and in many cases increases the CPU time.

        max_ncollected_obs : integer
            The maximum number of observations that will be collected.

        ndata_per_candidate : integer
            The number of observations returned by obs_fun for each design
            candidate.
        """
        super().__init__(ndesign_candidates, obs_fun, noise_std,
                         prior_variable, out_quad_opts, in_quad_opts,
                         nprocs=nprocs,
                         max_ncollected_obs=max_ncollected_obs,
                         ndata_per_candidate=ndata_per_candidate,
                         data_risk_fun=data_risk_fun)
        # The qoi_fun default is None so that the same API can be used
        # for KL-based OED which does not require qoi_fun
        if not callable(qoi_fun):
            raise ValueError("qoi_fun must be a callable function")
        if not callable(deviation_fun):
            raise ValueError("deviation_fun must be a callable function")
        self.qoi_fun = qoi_fun
        self.deviation_fun = deviation_fun
        self.pred_risk_fun = pred_risk_fun

    def _populate(self):
        """
        Compute the data needed to initialize the OED algorithm.
        """
        (self.out_pred_obs, self.in_pred_obs,
         self.in_pred_qois) = _precompute_expected_deviation_data(
             self.out_quad_data, self.in_quad_data, self.obs_fun,
             self.qoi_fun)
        if (self.out_pred_obs.shape[1] !=
                self.ndesign_candidates*self.ndata_per_candidate):
            msg = "out_pred_obs.shape[1] != "
            msg += "self.ndesign_candidates*self.ndata_per_candidate. "
            msg += f"{self.out_pred_obs.shape[1]}"
            msg += f"!={self.ndesign_candidates}*{self.ndata_per_candidate} "
            msg += "check ndata_per_candidate.\nEach design candidate"
            msg += " must have the same number of data returned by obs_fun"
            raise ValueError(msg)

    def _sort_qoi(self):
        # Sort in_pred_qois and use this order to sort in_samples so that
        # the cvar deviation does not have to constantly sort samples
        if self.in_pred_qois.shape[2] != 1:
            raise ValueError("Sorting can only be used for a single QoI")
        return np.argsort(self.in_pred_qois, axis=1)

    def populate(self):
        """
        Compute the data needed to initialize the OED algorithm.
        """
        self._populate()
        # if self.in_pred_qois.shape[1] == 1:
        #     # speeds up calculation of avar
        #     self._sort_qoi()

    def compute_expected_utility(self, collected_design_indices,
                                 new_design_indices, return_all=False):
        """
        Compute the negative expected deviation in predictions of QoI

        Parameters
        ----------
        collected_design_indices : np.ndarray (nobs)
            The indices into the qoi vector associated with the
            collected observations

        new_design_indices : np.ndarray (nnew_obs)
            The indices into the qoi vector associated with new design
            locations under consideration

        return_all : boolean
            False - return only the utilities
            True - used for debugging; also returns intermediate data
            useful for testing

        Returns
        -------
        utility : float
            The negative expected deviation
        """
        active_indices, noise_std = self._define_design_data(
            collected_design_indices, new_design_indices)
        return _compute_negative_expected_deviation_monte_carlo(
            self.out_pred_obs, self.in_pred_obs, self.in_weights,
            self.out_weights, self.in_pred_qois, self.deviation_fun,
            self.pred_risk_fun, self.data_risk_fun, self.noise_samples,
            noise_std, active_indices, return_all)

    def compute_utilities(self, ncandidates, collected_design_indices,
                          new_indices, return_all):
        if self.nprocs == 1:
            return self._compute_utilities_serial(
                self.compute_expected_utility, collected_design_indices,
                return_all, new_indices)
        return self._compute_utilities_parallel_shared(
            deviation_worker_fun, init_deviation_worker,
            _get_deviation_compute_utilities_initargs, new_indices,
            collected_design_indices, return_all)
[docs]class BayesianSequentialOED(AbstractBayesianOED): r""" Compute sequential optimal experimental designs that collect data and use this to inform the choice of subsequent design locations. """ @abstractmethod def __init__(self, obs_process): if not callable(obs_process): raise ValueError("obs_process must be a callable function") self.obs_process = obs_process self.collected_obs = None self.inner_importance_weights = None self.outer_importance_weights = None self.in_weights_up = None self.out_weights_up = None self.evidence_from_prior = 1 self.evidence = None def _loglike_fun(self, obs, pred_obs, noise_std): return gaussian_loglike_fun(obs, pred_obs, noise_std) def _compute_evidence(self): """ Compute the evidence associated with using the true collected data. Notes ----- This is a private function because calling by user will upset evidence calculation Always just use the first inner loop sample set to compute evidence. To avoid numerical precision problems recompute evidence with all data as opposed to updating evidence just using new data """ # For now only allow one data per design location assert np.allclose( self.out_pred_obs.shape[1]/self.ndesign_candidates, 1.0, atol=1e-14) log_like_vals = self._loglike_fun( self.collected_obs, self.in_pred_obs[:, self.collected_design_indices], self.noise_std[self.collected_design_indices]) # compute evidence moving from initial prior to current posterior evidence_from_prior = np.sum( np.exp(log_like_vals)[:, 0]*self.in_weights[:, 0]) # compute evidence moving from previous posterior to current posterior self.evidence = evidence_from_prior/self.evidence_from_prior self.evidence_from_prior = evidence_from_prior
    def compute_importance_weights(self):
        """
        Compute the importance weights used in the computation of the
        expected utility that account for the fact that we want to use
        the current posterior as the prior in the utility formula.
        """
        self.outer_importance_weights = np.exp(self._loglike_fun(
            self.collected_obs, self.out_pred_obs[
                :, self.collected_design_indices],
            self.noise_std[self.collected_design_indices]))/(
                self.evidence_from_prior)
        nobs = self.collected_design_indices.shape[0]
        tmp = self.in_pred_obs[:, self.collected_design_indices].reshape(
            1, self.nin_samples, nobs)
        self.inner_importance_weights = (np.exp(
            self._loglike_fun(self.collected_obs, tmp, self.noise_std[
                self.collected_design_indices]))/(
                    self.evidence_from_prior)).T
    def update_observations(self, new_obs):
        """
        Store the newly collected observations, which will dictate the
        next design point.

        Parameters
        ----------
        new_obs : np.ndarray (1, nnew_obs)
            The new observations

        Notes
        -----
        self.inner_importance_weights contains the likelihood values
        divided by the evidence at in_samples

        self.in_weights contains the prior quadrature weights, which for
        random samples drawn from the prior are just 1/N and for Gauss
        quadrature are the quadrature rule weights.

        Similarly for self.outer_importance_weights
        """
        if self.collected_obs is None:
            self.collected_obs = new_obs
        else:
            self.collected_obs = np.hstack(
                (self.collected_obs, new_obs))
        self._compute_evidence()
        self.compute_importance_weights()
        self.out_weights_up = \
            self.out_weights*self.outer_importance_weights
        self.in_weights_up = \
            self.in_weights*self.inner_importance_weights
    def set_collected_design_indices(self, indices):
        """
        Set the initial design indices and collect data at the
        corresponding design points.

        Parameters
        ----------
        indices : np.ndarray (nindices, 1)
            The indices corresponding to an initial design
        """
        self.collected_design_indices = indices.copy()
        new_obs = self.obs_process(self.collected_design_indices)
        self.update_observations(new_obs)
    # def update_design(self, return_all=False, rounding_decimals=16):
    #     return super().update_design(return_all, rounding_decimals)
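

# A rough sketch of the closed-loop pattern the methods above support. It
# assumes `oed` is a populated sequential OED instance (e.g. the
# BayesianSequentialKLOED class below) whose obs_process returns real
# data; the index choices are purely illustrative.
def _example_closed_loop_step(oed):
    # collect data at an initial design; this computes the evidence and
    # folds the data into the importance weights
    oed.set_collected_design_indices(np.array([0]))
    # score a candidate next observation using the updated weights
    return oed.compute_expected_utility(
        oed.collected_design_indices, np.array([1]))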
class BayesianSequentialKLOED(BayesianSequentialOED, BayesianBatchKLOED):
    r"""
    Compute closed-loop OED by maximizing the KL divergence between the
    prior and posterior.
    """

    def __init__(self, ndesign_candidates, obs_fun, noise_std,
                 prior_variable, out_quad_opts, in_quad_opts,
                 obs_process=None, nprocs=1, max_ncollected_obs=2):
        r"""
        Constructor.

        Parameters
        ----------
        ndesign_candidates : integer
            The number of design sample candidates

        obs_fun : callable
            Function with the signature

            `obs_fun(samples) -> np.ndarray(nsamples, nqoi)`

            that returns noiseless evaluations of the forward model.

        noise_std : float or np.ndarray (nobs, 1)
            The standard deviation of the mean-zero Gaussian noise added
            to each observation

        prior_variable : pya.IndependentMarginalsVariable
            The prior variable consisting of independent univariate
            random variables

        out_quad_opts : dict
            Options defining the quadrature rule used to compute the
            outer integral over all possible observations

        in_quad_opts : dict
            Options defining the quadrature rule used for the inner
            integral that computes the evidence for each realization of
            the predicted observations

        obs_process : callable
            The true data-generation model with the signature

            `obs_process(design_indices) -> np.ndarray (1, ndesign_indices)`

            where design_samples is np.ndarray (nvars, ndesign_indices)

        nprocs : integer
            The number of threads used to compute the OED design.
            Warning: this uses multiprocessing.Pool and seems to provide
            very little benefit; in many cases it increases the CPU time.

        max_ncollected_obs : integer
            The maximum number of observations that will be collected
        """
        # obs_process default is None so the same API can be used as
        # open-loop design
        BayesianBatchKLOED.__init__(
            self, ndesign_candidates, obs_fun, noise_std, prior_variable,
            out_quad_opts, in_quad_opts, nprocs=nprocs,
            max_ncollected_obs=max_ncollected_obs)
        BayesianSequentialOED.__init__(self, obs_process)
    def compute_expected_utility(self, collected_design_indices,
                                 new_design_indices, return_all=False):
        """
        Compute the expected utility using the current posterior as the
        new prior.

        Parameters
        ----------
        collected_design_indices : np.ndarray (nobs)
            The indices into the qoi vector associated with the
            collected observations

        new_design_indices : np.ndarray (nnew_obs)
            The indices into the qoi vector associated with new design
            locations under consideration

        Notes
        -----
        Passing None for collected_design_indices will ensure that only
        the observations at new_design_indices are used to evaluate the
        likelihood; the data at the collected indices are incorporated
        into the inner- and outer-loop weights.
        """
        return _compute_expected_kl_utility_monte_carlo(
            self.out_pred_obs, self.in_pred_obs, self.in_weights_up,
            self.out_weights_up, new_design_indices, self.noise_samples,
            self.noise_std, self.data_risk_fun, return_all)
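

# An illustrative closed-loop construction: passing `obs_process` through
# `get_bayesian_oed_optimizer` (defined at the end of this module)
# dispatches to the sequential class above. The arguments are assumed to
# satisfy the contracts documented in the constructor; the noise level
# and initial index are hypothetical.
def _example_closed_loop_kl_oed(ndesign_candidates, obs_fun, obs_process,
                                prior_variable):
    oed = get_bayesian_oed_optimizer(
        "kl_params", ndesign_candidates, obs_fun, 0.1, prior_variable,
        obs_process=obs_process)
    # collecting initial data updates the inner- and outer-loop weights
    oed.set_collected_design_indices(np.array([0]))
    return oed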
class BayesianSequentialDeviationOED(
        BayesianSequentialOED, BayesianBatchDeviationOED):
    r"""
    Compute closed-loop OED by minimizing the deviation of the push
    forward of the posterior through a QoI model.
    """

    def __init__(self, ndesign_candidates, obs_fun, noise_std,
                 prior_variable, out_quad_opts, in_quad_opts,
                 qoi_fun=None, obs_process=None,
                 deviation_fun=oed_standard_deviation,
                 pred_risk_fun=oed_prediction_average,
                 data_risk_fun=oed_data_expectation,
                 nprocs=1, max_ncollected_obs=2):
        r"""
        Constructor.

        Parameters
        ----------
        ndesign_candidates : integer
            The number of design sample candidates

        obs_fun : callable
            Function with the signature

            `obs_fun(samples) -> np.ndarray(nsamples, nqoi)`

            that returns noiseless evaluations of the forward model.

        noise_std : float or np.ndarray (nobs, 1)
            The standard deviation of the mean-zero Gaussian noise added
            to each observation

        prior_variable : pya.IndependentMarginalsVariable
            The prior variable consisting of independent univariate
            random variables

        out_quad_opts : dict
            Options defining the quadrature rule used to compute the
            outer integral over all possible observations

        in_quad_opts : dict
            Options defining the quadrature rule used for the inner
            integral that computes the evidence for each realization of
            the predicted observations

        qoi_fun : callable
            Function with the signature

            `qoi_fun(samples) -> np.ndarray(nsamples, nqoi)`

            that returns evaluations of the forward model. Observations
            are assumed to be :math:`f(z)+\epsilon` where
            :math:`\epsilon` is additive noise

        obs_process : callable
            The true data-generation model with the signature

            `obs_process(design_indices) -> np.ndarray (1, ndesign_indices)`

            where design_samples is np.ndarray (nvars, ndesign_indices)

        deviation_fun : callable
            Function with the signature

            `deviation_fun(in_pred_qois, weights) ->
             np.ndarray(nout_samples, nqois)`

            where in_pred_qois : np.ndarray (nout_samples, nin_samples,
            nqois) and weights : np.ndarray (nout_samples, nin_samples)

        pred_risk_fun : callable
            Function to compute the risk over multiple QoI with the
            signature

            `pred_risk_fun(expected_deviations) -> float`

            where expected_deviations : np.ndarray (nqois, 1)

        data_risk_fun : callable
            Function to compute the risk of the deviations over all
            outer-loop samples with the signature

            `data_risk_fun(deviations) -> np.ndarray (nqois, 1)`

            where deviations : np.ndarray (nout_samples, nqois)

        nprocs : integer
            The number of threads used to compute the OED design.
            Warning: this uses multiprocessing.Pool and seems to provide
            very little benefit; in many cases it increases the CPU time.

        max_ncollected_obs : integer
            The maximum number of observations that will be collected
        """
        # obs_process default is None so the same API can be used as
        # open-loop design
        BayesianBatchDeviationOED.__init__(
            self, ndesign_candidates, obs_fun, noise_std, prior_variable,
            out_quad_opts, in_quad_opts, qoi_fun, deviation_fun,
            pred_risk_fun, data_risk_fun, nprocs, max_ncollected_obs)
        BayesianSequentialOED.__init__(self, obs_process)
    def compute_expected_utility(self, collected_design_indices,
                                 new_design_indices, return_all=False):
        """
        Compute the expected utility using the current posterior as the
        new prior.

        Parameters
        ----------
        collected_design_indices : np.ndarray (nobs)
            The indices into the qoi vector associated with the
            collected observations

        new_design_indices : np.ndarray (nnew_obs)
            The indices into the qoi vector associated with new design
            locations under consideration

        Notes
        -----
        Passing None for collected_design_indices will ensure that only
        the observations at new_design_indices are used to evaluate the
        likelihood; the data at the collected indices are incorporated
        into the inner- and outer-loop weights.
        """
        return _compute_negative_expected_deviation_monte_carlo(
            self.out_pred_obs, self.in_pred_obs, self.in_weights_up,
            self.out_weights_up, self.in_pred_qois, self.deviation_fun,
            self.pred_risk_fun, self.data_risk_fun, self.noise_samples,
            self.noise_std, new_design_indices, return_all)
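

# A minimal sketch of the deviation_fun contract documented above,
# applied to synthetic inner-loop QoI values. The shapes follow the
# docstrings; the random data and sizes are purely illustrative.
def _example_deviation_contract():
    nout_samples, nin_samples, nqois = 4, 50, 1
    in_pred_qois = np.random.normal(
        0., 1., (nout_samples, nin_samples, nqois))
    weights = np.full((nout_samples, nin_samples), 1/nin_samples)
    # the default deviation measure used by the classes above
    deviations = oed_standard_deviation(in_pred_qois, weights)
    # one deviation per outer-loop sample and QoI
    assert deviations.shape == (nout_samples, nqois)
    # a CVaR deviation can be built with get_deviation_fun (defined
    # later in this module); the opts follow its docstring
    cvar_dev = get_deviation_fun("cvar", {"quantile": 0.9})
    return deviations, cvar_dev(in_pred_qois, weights)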
def get_oed_inner_quadrature_rule(nin_samples, prior_variable,
                                  quad_method='gauss'):
    """
    Parameters
    ----------
    nin_samples : integer
        The number of quadrature samples in each dimension

    prior_variable : pya.IndependentMarginalsVariable
        The prior variable consisting of independent univariate random
        variables

    quad_method : string
        The method used to compute the inner-loop integral needed to
        evaluate the evidence for an outer-loop sample. Options are
        ["linear", "quadratic", "gauss", "monte_carlo"]. The first three
        construct tensor-product quadrature rules from univariate rules
        that are, respectively, piecewise linear, piecewise quadratic or
        Gauss quadrature.
    """
    nrandom_vars = prior_variable.num_vars()
    nin_samples_1d = nin_samples
    if quad_method == "gauss":
        var_trans = AffineTransform(prior_variable)
        univariate_quad_rules = \
            get_univariate_quadrature_rules_from_variable(
                prior_variable, [nin_samples_1d]*nrandom_vars)[0]
        x_quad, w_quad = get_tensor_product_quadrature_rule(
            [nin_samples_1d]*nrandom_vars, nrandom_vars,
            univariate_quad_rules, transform_samples=None)
        return x_quad, w_quad[:, None]

    degree = {'linear': 1, 'quadratic': 2}[quad_method]
    if prior_variable.is_bounded_continuous_variable():
        alpha = 1
    else:
        alpha = 1-1e-6
    new_ranges = prior_variable.get_statistics(
        "interval", confidence=alpha).flatten()
    x_quad, w_quad = \
        get_tensor_product_piecewise_polynomial_quadrature_rule(
            nin_samples_1d, new_ranges, degree)
    w_quad *= prior_variable.pdf(x_quad)[:, 0]
    return x_quad, w_quad[:, None]


def get_posterior_weights_at_in_samples(oed, nn, out_idx):
    # Compute the posterior weights for one realization of the data.
    # nn : the number of data used to form the posterior
    # out_idx : the outer-loop iteration used to generate the data
    assert nn > 0
    active_indices = np.hstack(
        [idx*oed.ndata_per_candidate + np.arange(oed.ndata_per_candidate)
         for idx in oed.collected_design_indices[:nn]])
    outer_log_likelihood_vals, evidences = _compute_evidences(
        oed.out_pred_obs[out_idx:out_idx+1], oed.in_pred_obs,
        oed.in_weights, oed.out_weights[out_idx:out_idx+1],
        active_indices, oed.noise_samples[out_idx:out_idx+1],
        oed.noise_std)
    inner_log_likelihood_vals = _loglike_fun_from_noiseless_obs(
        oed.out_pred_obs[out_idx:out_idx+1], oed.in_pred_obs,
        oed.noise_samples[out_idx:out_idx+1], oed.noise_std,
        active_indices)
    weights = np.exp(inner_log_likelihood_vals)*oed.in_weights/evidences
    return weights


def get_posterior_2d_interpolant_from_oed_data(
        oed, prior_variable, nn, out_idx, quad_method):
    # Build an interpolant of the posterior for one realization of the
    # data.
    # nn : the number of data used to form the posterior
    # out_idx : the outer-loop iteration used to generate the data
    assert prior_variable.num_vars() == 2
    weights = get_posterior_weights_at_in_samples(oed, nn, out_idx)
    vals = weights/oed.in_weights
    # multiply vals by the prior density
    vals *= prior_variable.pdf(oed.in_samples)
    nin_samples = vals.shape[0]
    if quad_method == "gauss":
        # interpolate the posterior vals onto an equidistant mesh for
        # plotting
        nvars = prior_variable.num_vars()
        abscissa_1d = []
        for dd in range(nvars):
            abscissa_1d.append(
                np.unique(oed.in_samples[dd, :nin_samples]))
        fun = partial(tensor_product_barycentric_interpolation,
                      abscissa_1d, vals)
        return fun

    quad_methods = ['linear', 'quadratic', 'gauss']
    if quad_method != "linear" and quad_method != "quadratic":
        raise ValueError(f"quad_method must be in {quad_methods}")

    # if using piecewise polynomial quadrature, interpolate using the
    # piecewise linear method from scipy
    from scipy.interpolate import griddata
    x_quad = oed.in_samples

    def fun(x):
        return griddata(x_quad.T, vals, x.T, method="linear")
    return fun


def plot_2d_posterior_from_oed_data(
        oed, prior_variable, nn, out_idx, method, ax=None,
        oed_results=None):
    from pyapprox.util.visualization import plt, get_meshgrid_function_data
    if ax is None:
        fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    if prior_variable.is_bounded_continuous_variable():
        alpha = 1
    else:
        alpha = 0.99
    plot_limits = prior_variable.get_statistics(
        "interval", confidence=alpha).flatten()
    fun = get_posterior_2d_interpolant_from_oed_data(
        oed, prior_variable, nn, out_idx, method)
    X, Y, Z = get_meshgrid_function_data(fun, plot_limits, 100)
    p = ax.contourf(
        X, Y, Z, levels=np.linspace(Z.min(), Z.max(), 21))
    plt.colorbar(p, ax=ax)


def tensor_product_barycentric_interpolation(abscissa_1d, values, samples):
    nvars = len(abscissa_1d)
    barycentric_weights_1d = []
    for dd in range(nvars):
        interval_length = abscissa_1d[dd].max()-abscissa_1d[dd].min()
        barycentric_weights_1d.append(
            compute_barycentric_weights_1d(
                abscissa_1d[dd], interval_length=interval_length))
    poly_vals = multivariate_barycentric_lagrange_interpolation(
        samples, abscissa_1d, barycentric_weights_1d, values,
        np.arange(nvars))
    return poly_vals


def generate_inner_prior_samples_fixed(x_quad, w_quad, nsamples):
    """
    Wrapper that can be used with functools.partial to create a function
    with the signature generate_inner_samples(nsamples) that always
    returns the same quadrature rule. This function will be called many
    times, and creating a quadrature rule each time can be
    computationally expensive.
    """
    assert nsamples == x_quad.shape[1], (nsamples, x_quad.shape)
    return x_quad, w_quad


def get_deviation_fun(name, opts={}):
    """
    Get the deviation function used to compute the deviation of the
    posterior push forward for a realization of the observational data.

    Parameters
    ----------
    name : string
        Name of the deviation function. Must be one of
        ["std", "cvar", "entropic"]

    opts : dict
        Any options needed by the desired deviation function. cvar
        requires {"quantile": p} where 0 <= p < 1.
        No options are needed for the other deviation functions.

    Returns
    -------
    deviation_fun : callable
        Function with the signature

        `deviation_fun(in_pred_qois, weights) ->
         np.ndarray(nout_samples, nqois)`

        where in_pred_qois : np.ndarray (nout_samples, nin_samples,
        nqois) and weights : np.ndarray (nout_samples, nin_samples)
    """
    deviation_funs = {
        "std": oed_standard_deviation,
        "cvar": oed_conditional_value_at_risk_deviation,
        "entropic": oed_entropic_deviation}
    if name not in deviation_funs:
        msg = f"{name} not in {deviation_funs.keys()}"
        raise ValueError(msg)
    fun = partial(deviation_funs[name], **opts)
    return fun


def get_data_risk_fun(name, opts={}):
    """
    Get the risk function used to compute the risk of the deviations over
    all outer-loop realizations of the observations.

    Parameters
    ----------
    name : string
        Name of the risk function. Must be one of ["mean", "cvar"]

    opts : dict
        Any options needed by the desired risk function. cvar requires
        {"quantile": p} where 0 <= p < 1. No options are needed for the
        other risk functions.

    Returns
    -------
    data_risk_fun : callable
        Function with the signature

        `data_risk_fun(deviations) -> np.ndarray (nqois, 1)`

        where deviations : np.ndarray (nout_samples, nqois)
    """
    risk_funs = {
        "mean": oed_data_expectation,
        "cvar": oed_data_cvar}
    if name not in risk_funs:
        msg = f"{name} not in {risk_funs.keys()}"
        raise ValueError(msg)
    fun = partial(risk_funs[name], **opts)
    return fun


def get_pred_risk_fun(name, **kwargs):
    risk_funs = {
        "mean": oed_prediction_average,
        "cvar": conditional_value_at_risk,
        "entropic": entropic_risk_measure}
    if name not in risk_funs:
        msg = f"{name} not in {risk_funs.keys()}"
        raise ValueError(msg)
    fun = partial(risk_funs[name], **kwargs)
    return fun


def extract_independent_noise_cov(cov, indices):
    """
    When computing Laplace approximations we need a covariance matrix
    that treats each observation as independent, even when indices are
    repeated, that is, when we have two or more observations at the same
    observation location.
    """
    nindices = len(indices)
    if np.unique(indices).shape[0] == nindices:
        return cov[np.ix_(indices, indices)]
    new_cov = np.diag(np.diag(cov)[indices])
    return new_cov


def sequential_oed_synthetic_observation_process(
        obs_fun, true_sample, noise_fun, new_design_indices):
    r"""
    Use obs_fun to generate all observations, then downselect. For true
    observation processes this defeats the purpose of experimental
    design; in those cases a custom observation process that takes design
    indices as an argument must be used.

    Parameters
    ----------
    obs_fun : callable
        Function with the signature

        `obs_fun(true_sample) -> np.ndarray(nsamples, nqoi)`

        that returns the synthetic truth for all design candidates.

    true_sample : np.ndarray (nvars, 1)
        The true sample used to generate the synthetic truth

    new_design_indices : np.ndarray (nnew_obs)
        The indices into the qoi vector associated with new design
        locations under consideration

    noise_fun : callable
        Function with the signature

        `noise_fun(values, new_design_indices) ->
         np.ndarray (values.shape[0], new_design_indices.shape[0])`

        that returns the noise for the new observations.
        Here values : np.ndarray (1, nobs) and new_design_indices :
        np.ndarray (nindices) where nindices <= nobs
    """
    all_obs = obs_fun(true_sample)
    noise = noise_fun(all_obs, new_design_indices)
    obs = all_obs[:, new_design_indices]+noise
    return obs


def gaussian_noise_fun(noise_std, values, active_indices=None):
    """
    Generate Gaussian, possibly heteroscedastic, random noise.

    Parameters
    ----------
    noise_std : float or np.ndarray (nobs)
        The standard deviation of the noise at each observation

    values : np.ndarray (nsamples, nobs)
        The observations at various realizations of the random parameters

    active_indices : np.ndarray (nindices)
        The indices of the active observations with nindices <= nobs

    Returns
    -------
    noise : np.ndarray (nsamples, nindices)
        The noise at the active observations. nindices = nobs if
        active_indices is None
    """
    if type(noise_std) == np.ndarray:
        noise_std = noise_std.flatten()
    if active_indices is None:
        return np.random.normal(0, noise_std, (values.shape))
    shape = (values.shape[0], active_indices.shape[0])
    if type(noise_std) != np.ndarray:
        return np.random.normal(0, noise_std, shape)
    return np.random.normal(
        0, noise_std[active_indices], shape)
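

# A minimal sketch pairing `gaussian_noise_fun` with
# `sequential_oed_synthetic_observation_process` via functools.partial to
# build a synthetic closed-loop observation process. `_toy_obs_fun`, the
# true sample and the noise level are hypothetical stand-ins.
def _example_synthetic_obs_process():
    def _toy_obs_fun(sample):
        # noiseless observations at 4 design candidates: shape (1, 4)
        return np.hstack([sample.T, 2*sample.T, 3*sample.T, 4*sample.T])

    true_sample = np.array([[0.5]])
    noise_fun = partial(gaussian_noise_fun, 0.1)
    obs_process = partial(
        sequential_oed_synthetic_observation_process,
        _toy_obs_fun, true_sample, noise_fun)
    # collect noisy data at design candidates 0 and 2: shape (1, 2)
    return obs_process(np.array([0, 2]))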
def get_bayesian_oed_optimizer(
        short_oed_type, ndesign_candidates, obs_fun, noise_std,
        prior_variable, out_quad_opts=None, in_quad_opts=None,
        nprocs=1, pre_collected_design_indices=None, **kwargs):
    r"""
    Initialize a Bayesian OED optimizer.

    Parameters
    ----------
    short_oed_type : string
        The type of experimental design strategy. Must be one of
        ["kl_params", "dev_pred"]

    ndesign_candidates : integer
        The number of design sample candidates

    obs_fun : callable
        Function with the signature

        `obs_fun(samples) -> np.ndarray(nsamples, nqoi)`

        that returns noiseless evaluations of the forward model.

    noise_std : float or np.ndarray (nobs, 1)
        The standard deviation of the mean-zero Gaussian noise added to
        each observation

    prior_variable : pya.IndependentMarginalsVariable
        The prior variable consisting of independent univariate random
        variables

    out_quad_opts : dict
        Options defining the quadrature rule used to compute the outer
        integral over all possible observations

    in_quad_opts : dict
        Options defining the quadrature rule used for the inner integral
        that computes the evidence for each realization of the predicted
        observations

    nprocs : integer
        The number of processes used to compute the OED design

    pre_collected_design_indices : np.ndarray (nobs)
        The indices into the qoi vector associated with data that have
        already been collected

    kwargs : kwargs
        Keyword arguments specific to the OED type

    Returns
    -------
    oed : pyapprox.expdesign.AbstractBayesianOED
        Bayesian OED optimizer object
    """
    if "obs_process" in kwargs:
        oed_type = "closed_loop_" + short_oed_type
    else:
        oed_type = "open_loop_" + short_oed_type

    oed_types = {"open_loop_kl_params": BayesianBatchKLOED,
                 "closed_loop_kl_params": BayesianSequentialKLOED,
                 "open_loop_dev_pred": BayesianBatchDeviationOED,
                 "closed_loop_dev_pred": BayesianSequentialDeviationOED}

    if oed_type not in oed_types:
        msg = f"oed_type {short_oed_type} not supported. "
        msg += "Select from [kl_params, dev_pred]"
        raise ValueError(msg)

    if (type(noise_std) == np.ndarray and
            noise_std.shape[0] != ndesign_candidates):
        msg = "noise_std must be specified for each design candidate"
        raise ValueError(msg)

    if out_quad_opts is None:
        out_quad_opts = {
            "method": "quasimontecarlo", "kwargs": {"nsamples": int(1e3)}}
    if in_quad_opts is None:
        in_quad_opts = {
            "method": "quasimontecarlo", "kwargs": {"nsamples": int(1e3)}}

    oed = oed_types[oed_type](
        ndesign_candidates, obs_fun, noise_std, prior_variable,
        out_quad_opts, in_quad_opts, nprocs=nprocs, **kwargs)
    oed.populate()
    if pre_collected_design_indices is not None:
        oed.set_collected_design_indices(
            np.asarray(pre_collected_design_indices))
    return oed
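

# An end-to-end sketch of the factory above for an open-loop KL design.
# The linear `_toy_obs_fun`, the prior, the noise level and the sample
# counts are illustrative assumptions, not pyapprox defaults.
def _example_get_bayesian_oed_optimizer():
    ndesign_candidates = 5
    design_candidates = np.linspace(-1., 1., ndesign_candidates)[None, :]

    def _toy_obs_fun(samples):
        # one noiseless observation per candidate: (nsamples, ncandidates)
        return samples.T*design_candidates

    prior_variable = IndependentMarginalsVariable([stats.norm(0, 1)])
    oed = get_bayesian_oed_optimizer(
        "kl_params", ndesign_candidates, _toy_obs_fun, 0.1, prior_variable,
        out_quad_opts={
            "method": "quasimontecarlo", "kwargs": {"nsamples": 100}},
        in_quad_opts={
            "method": "quasimontecarlo", "kwargs": {"nsamples": 100}})
    # greedy design steps would then be taken, e.g. via update_design()
    return oed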