Source code for pyapprox.analysis.parameter_sweeps

import numpy as np
import os
import matplotlib.pyplot as plt

from pyapprox.variables.joint import get_truncated_ranges
from pyapprox.variables.transforms import map_hypercube_samples
from pyapprox.variables.density import map_from_canonical_gaussian


def get_parameter_sweeps_samples_using_rotation(
        rotation_matrix, get_sweep_bounds, num_samples_per_sweep=50,
        random_samples=False):
    """
    Get the samples of parameter sweeps through  directions
    d-dimensional hypercube on [a_0,b_0] x ... x [a_d,b_d]. Directions are
    specified by the rows of rotation matrix

    Parameters
    ----------
    rotation_matrix : np.ndarray (num_vars,num_sweeps)
        Each row contains the rotation vector of each sweep.

    get_sweep_bounds : callable
        lb,ub = get_sweep_uppper_and_lower_bounds(W)
        Function to compute the upper and lower bounds of the parameter sweep

    num_samples_per_sweep : integer
        The number of samples in each of the parameter sweeps

    random_samples : boolean
        True  - generate uniform random samples within bounds of sweep
        False - generate equidistant samples within bounds of sweep

    Returns
    -------
    samples : np.ndarray (num_vars, num_samples_per_sweep*num_sweeps)
        The samples in the d-dimensional space. Each sweep is listed
        consecutivelty. That is num_samples_per_sweep for first sweep
        are the first rows, then the second sweep are the next set of
        num_samples_per_sweep rows, and so on.

    active_samples : np.ndarray (num_sweeps, num_samples_per_sweep)
        The univariate samples of the parameter sweeps. These samples are
        for normalized hypercubes [-1,1]^d.
    """
    # num_sweeps, num_vars = rotation_matrix.shape
    num_vars, num_sweeps = rotation_matrix.shape

    samples = np.empty((num_vars, num_samples_per_sweep*num_sweeps))
    active_samples = np.empty((num_sweeps, num_samples_per_sweep))
    for i in range(num_sweeps):
        W = rotation_matrix[:, i:i+1]
        # find approximate upper and lower bounds for active variable
        y_lb, y_ub = get_sweep_bounds(W)
        # define samples in sweep inside approximate upper and lower bounds
        if num_samples_per_sweep == 1:
            y = np.asarray([[(y_lb+y_ub)/2.]])
        else:
            if random_samples:
                y = np.random.uniform(y_lb, y_ub, (1, num_samples_per_sweep))
            else:
                y = np.asarray(
                    [np.linspace(y_lb, y_ub, num_samples_per_sweep)])
        x = np.dot(W, y)
        active_samples[i, :] = y[0, :]
        samples[:, i*num_samples_per_sweep:(i+1)*num_samples_per_sweep] = x

    return samples, active_samples


def get_parameter_sweeps_samples(
        num_vars, get_sweep_bounds, num_samples_per_sweep=50, num_sweeps=1,
        random_samples=False, sweep_rotation_matrix=None):
    """
    Get the samples of parameter sweeps through random directions of a
    d-dimensional hypercube on [a_0,b_0] x ... x [a_d,b_d]

    Parameters
    ----------
    num_vars : integer
        The number of variables d

    get_sweep_bounds : callable
        lb,ub = get_sweep_uppper_and_lower_bounds(W)
        Function to compute the upper and lower bounds of the parameter sweep

    num_samples_per_sweep : integer
        The number of samples in each of the parameter sweeps

    num_sweeps : integer
        The number of sweeps

    random_samples : boolean
        True  - return random samples along sweep
        False - return equidistanly spaced samples

    sweep_rotation_matrix : np.ndarray (num_vars, num_sweeps)
        Precomputed directions along which to compute the parameter sweeps.
        If matrices is zeros except a value of one in each column
        the parameter sweeps will be along the axial directions

    Returns
    -------
    samples : np.ndarray (num_vars, num_samples_per_sweep*num_sweeps)
        The samples in the d-dimensional space. Each sweep is listed
        consecutivelty. That is num_samples_per_sweep for first sweep
        are the first rows, then the second sweep are the next set of
        num_samples_per_sweep rows, and so on.

    active_samples : np.ndarray (num_sweeps, num_samples_per_sweep)
        The univariate samples of the parameter sweeps. These samples are
        for normalized hypercubes [-1,1]^d.

    rotation_matrix : np.ndarray (num_sweeps, num_vars)
        Each row contains the rotation vector of each sweep.
    """
    samples = np.empty((num_vars, num_samples_per_sweep*num_sweeps))
    active_samples = np.empty((num_sweeps, num_samples_per_sweep))

    if sweep_rotation_matrix is not None:
        assert num_vars == sweep_rotation_matrix.shape[0]
        assert num_sweeps == sweep_rotation_matrix.shape[1]
        samples, active_samples = get_parameter_sweeps_samples_using_rotation(
            sweep_rotation_matrix, get_sweep_bounds, num_samples_per_sweep,
            random_samples)
        return samples, active_samples, sweep_rotation_matrix

    # can only generate a maximum of num_var sweeps for each rotation matrix
    # to generate more create different rotation matrices until enough
    # sweeps are generated
    ii = 0
    samples, active_samples, rot_mats = [], [], []
    while ii*num_vars < num_sweeps:
        A = np.random.normal(0, 1, (num_vars, num_sweeps))
        Q, R = np.linalg.qr(A)
        sweep_rotation_matrix_ii = Q[:, :min(num_vars, num_sweeps-ii*num_vars)]

        samples_ii, active_samples_ii = \
            get_parameter_sweeps_samples_using_rotation(
                sweep_rotation_matrix_ii, get_sweep_bounds,
                num_samples_per_sweep, random_samples)
        samples.append(samples_ii)
        active_samples.append(active_samples_ii)
        rot_mats.append(sweep_rotation_matrix_ii)
        ii += 1

    return np.hstack(samples), np.vstack(active_samples), rot_mats


def get_hypercube_sweep_bounds(W):
    num_vars = W.shape[0]
    maxdist = np.sqrt(num_vars*4)
    y = np.asarray([np.linspace(-maxdist/2., maxdist/2., 1000)])
    x = np.dot(W, y)
    II = np.where(np.all(x >= -1, axis=0) & np.all(x <= 1, axis=0))[0]
    y_lb = y[0, II[0]]
    y_ub = y[0, II[-1]]
    return y_lb, y_ub


def get_hypercube_parameter_sweeps_samples(
        ranges, num_samples_per_sweep=50, num_sweeps=1,
        sweep_rotation_matrix=None):
    """
    Get the samples of parameter sweeps through random directions of a
    d-dimensional hypercube on [a_0,b_0] x ... x [a_d,b_d]

    Parameters
    ----------
    ranges : np.ndarray (2*num_vars)
        lower and upper bounds for each of the d random variables
        [lb_1,ub_1,...,lb_d,ub_d]

    num_samples_per_sweep : integer
        The number of samples in each of the parameter sweeps

    num_sweeps : integer
        The number of sweeps

    Returns
    -------
    samples : np.ndarray (num_vars, num_samples_per_sweep*num_sweeps)
        The samples in the d-dimensional space. Each sweep is listed
        consecutivelty. That is num_samples_per_sweep for first sweep
        are the first rows, then the second sweep are the next set of
        num_samples_per_sweep rows, and so on.

    active_samples : np.ndarray (num_sweeps, num_samples_per_sweep)
        The univariate samples of the parameter sweeps. These samples are
        for normalized hypercubes [-1,1]^d.
    """

    num_vars = ranges.shape[0]//2

    samples, active_samples, W = get_parameter_sweeps_samples(
        num_vars, get_hypercube_sweep_bounds,
        num_samples_per_sweep, num_sweeps,
        sweep_rotation_matrix=sweep_rotation_matrix)

    canonical_ranges = np.ones((num_vars*2), dtype=float)
    canonical_ranges[::2] = -1
    samples = map_hypercube_samples(
        samples, canonical_ranges, ranges)
    return samples, active_samples, W


def get_gaussian_parameter_sweeps_samples(
        mean, covariance=None, covariance_sqrt=None,
        sweep_radius=1, num_samples_per_sweep=50, num_sweeps=1,
        sweep_rotation_matrix=None):
    """
    Get the samples of parameter sweeps through random directions of a
    zero-mean multivariate Gaussian

    One and only one of covariance and covariance_sqrt must be
    not None.

    Parameters
    ----------
    mean : np.ndarray (num_vars)
        The mean of the multivariate Gaussian

    covariance : np.ndarray (num_vars, num_vars)
        The covariance of the multivariate Gaussian

    covariance_sqrt : callable
        correlated_samples = covariance_sqrt(stdnormal_samples)
        An operator that applies the sqrt of the Gaussian covariance to a set
        of vectors. Useful for large scale applications.

    sweep_radius : float
        The radius of the parameter sweep as a multiple of
        one standard deviation of the standard normal

    num_samples_per_sweep : integer
        The number of samples in each of the parameter sweeps

    num_sweeps : integer
        The number of sweeps

    Returns
    -------
    samples : np.ndarray (num_vars, num_samples_per_sweep*num_sweeps)
        The samples in the d-dimensional space. Each sweep is listed
        consecutivelty. That is num_samples_per_sweep for first sweep
        are the first rows, then the second sweep are the next set of
        num_samples_per_sweep rows, and so on.

    active_samples : np.ndarray (num_sweeps, num_samples_per_sweep)
        The univariate samples of the parameter sweeps. These samples are
        for normalized hypercubes [-1,1]^d.
    """

    def get_gaussian_sweep_bounds(W):
        z = sweep_radius  # number of standard deviations
        return np.asarray([-z, z])

    num_vars = mean.shape[0]
    samples, active_samples, W = get_parameter_sweeps_samples(
        num_vars, get_gaussian_sweep_bounds, num_samples_per_sweep, num_sweeps,
        random_samples=False, sweep_rotation_matrix=sweep_rotation_matrix)

    if covariance is not None:
        assert covariance_sqrt is None
        covariance_chol_factor = np.linalg.cholesky(covariance)
    else:
        covariance_chol_factor = None

    samples = map_from_canonical_gaussian(
        samples, mean, covariance_chol_factor=covariance_chol_factor,
        covariance_sqrt=covariance_sqrt)

    return samples, active_samples, W


def plot_parameter_sweep_single_qoi(active_samples, vals, num_sweeps,
                                    num_samples_per_sweep, label_opts=dict(),
                                    axs=plt, markers='o', colors=None,
                                    alpha=1):
    if type(markers) == str:
        markers = [markers]*num_sweeps
    assert len(markers) == num_sweeps

    if type(colors) == str or colors is None:
        colors = [colors]*num_sweeps
    assert len(colors) == num_sweeps

    for i in range(num_sweeps):
        # scale y to [-1,1]
        lb = active_samples[i, :].min()
        ub = active_samples[i, :].max()
        y = (active_samples[i, :]-lb)/(ub-lb)*2-1.
        sweep_label = label_opts.get('sweep_label', r'$\mathrm{Sweep}$')
        axs.plot(y, vals[i*num_samples_per_sweep:(i+1)*num_samples_per_sweep],
                 markers[i], lw=2, label=sweep_label+r' $%d$' % i,
                 color=colors[i], alpha=alpha)
    try:
        if 'title' in label_opts:
            axs.title(label_opts['title'])
        xlabel = label_opts.get('xlabel',  r"$\mathrm{Sweep\;variable}$")
        if xlabel is not None:
            axs.set_xlabel(xlabel)
        axs.ylabel(label_opts.get('ylabel', r"$\mathrm{Function\;value}$"))
    except:
        # needed if axs is of type  subplot
        if 'title' in label_opts:
            axs.set_title(label_opts['title'])
        xlabel = label_opts.get('xlabel', r"$\mathrm{Sweep\;variable}$")
        if xlabel is not None:
            axs.set_xlabel(xlabel)
        axs.set_ylabel(label_opts.get('ylabel', r"$\mathrm{Function\;value}$"))
    axs.legend(loc=0, numpoints=1)


def plot_parameter_sweeps(active_samples, vals, fig_basename=None,
                          qoi_indices=None, show=False, axs=None,
                          axes_label_opts=None, markers='o', colors=None,
                          alpha=1):
    num_sweeps, num_samples_per_sweep = active_samples.shape
    assert vals.shape[0] == num_sweeps*num_samples_per_sweep
    if qoi_indices is None:
        qoi_indices = np.arange(vals.shape[1])
    assert np.all(qoi_indices < vals.shape[1])

    if axs is None:
        fig, axs = plt.subplots(1, len(qoi_indices),
                                figsize=(8*len(qoi_indices), 6), sharey=True)
        if len(qoi_indices) == 1:
            axs = [axs]

    if axes_label_opts is None:
        axes_label_opts = [dict() for ii in range(len(qoi_indices))]

    if type(axes_label_opts) == dict:
        axes_label_opts = [axes_label_opts for ii in range(len(qoi_indices))]

    for j in range(len(qoi_indices)):
        label_opts = axes_label_opts[j]
        if 'title' not in label_opts:
            label_opts['title'] = r"$\mathrm{QoI\;%d}$" % qoi_indices[j]
        plot_parameter_sweep_single_qoi(
            active_samples, vals[:, j], num_sweeps, num_samples_per_sweep,
            label_opts, axs[j], markers, colors, alpha)
    if fig_basename is not None:
        plt.savefig(fig_basename+'.pdf', bbox_inches='tight')
    if show:
        plt.show()
    return axs


def generate_parameter_sweeps_samples(opts, sweep_type, num_samples_per_sweep,
                                      num_sweeps, sweep_rotation_matrix):
    if sweep_type == 'hypercube':
        ranges = opts['ranges']
        samples, active_samples, W = \
            get_hypercube_parameter_sweeps_samples(
                ranges, num_samples_per_sweep, num_sweeps,
                sweep_rotation_matrix)
        # print(samples.T)
    elif sweep_type == 'gaussian':
        mean = opts['mean']
        covariance = opts.get('covariance', None)
        covariance_sqrt = opts.get('covariance_sqrt', None)
        sweep_radius = opts['sweep_radius']
        samples, active_samples, W = get_gaussian_parameter_sweeps_samples(
            mean, covariance=covariance, covariance_sqrt=covariance_sqrt,
            sweep_radius=sweep_radius,
            num_samples_per_sweep=num_samples_per_sweep,
            num_sweeps=num_sweeps,
            sweep_rotation_matrix=sweep_rotation_matrix)
    else:
        raise Exception('incorrect sweep_type : %s' % sweep_type)
    return samples, active_samples, W


def generate_parameter_sweeps(opts, sweep_type, num_samples_per_sweep,
                              num_sweeps, sweep_rotation_matrix, model):
    samples, active_samples, W = generate_parameter_sweeps_samples(
        opts, sweep_type, num_samples_per_sweep, num_sweeps,
        sweep_rotation_matrix)
    vals = model(samples)
    assert vals.ndim == 2
    assert vals.shape[0] == samples.shape[1]
    return samples, active_samples, W, vals


def generate_parameter_sweeps_and_plot(
        model, opts, filename, sweep_type, num_samples_per_sweep=50,
        num_sweeps=2, qoi_indices=None, show=False,
        sweep_rotation_matrix=None, axes_label_opts=None, axs=None):

    if filename is None or not os.path.exists(filename):
        samples, active_samples, W, vals = generate_parameter_sweeps(
            opts, sweep_type, num_samples_per_sweep, num_sweeps,
            sweep_rotation_matrix, model)

        if filename is not None:
            path = os.path.split(filename)[0]
            if len(path) > 0 and not os.path.exists(path):
                os.makedirs(path)
            np.savez(filename, samples=samples, vals=vals,
                     active_samples=active_samples)
    else:
        data = np.load(filename)
        samples = data['samples']
        vals = data['vals']
        active_samples = data['active_samples']

    if filename is not None:
        if filename[-4] == '.':
            figbasename = filename[:-4]
        else:
            figbasename = filename
    else:
        figbasename = None

    return plot_parameter_sweeps(
        active_samples, vals, figbasename, qoi_indices,
        show, axes_label_opts=axes_label_opts, axs=axs)


[docs]def generate_parameter_sweeps_and_plot_from_variable( model, variable, filename=None, num_samples_per_sweep=50, num_sweeps=2, qoi_indices=None, show=False, axes_label_opts=None, axs=None): """ Plot parameter sweeps of a function. Parameters ---------- model : callable Function with the signature `model(samples) -> np.ndarray(nsamples, nqoi)` where samples : np.ndarray (nvars, nsamples) variable : :class:`pyapprox.variables.IndependentMarginalsVariable` Random variable filename : string Name of file to store parameter sweeps. If None no file is written num_samples_per_sweep : interger The number of samples in a parameter sweep num_samples : integer The number of parameter sweeps qoi_indices : iterable The column indices in the values outputed by model which will be plotted. A separate subplot will be used for each QoI show : boolean True - plt.show() is called axes_label_opts : list of dict Dictionary specifying plt kwargs for each axes axs : list of :class:`matplotlib.pyplot.axes` If provided will be used to plot each QoI Otherwise axes will be created """ opts = {"ranges": get_truncated_ranges(variable)} return generate_parameter_sweeps_and_plot( model, opts, filename, "hypercube", num_samples_per_sweep, num_sweeps, qoi_indices, show, None, axes_label_opts, axs)