# Source code for pyapprox.surrogates.polychaos.adaptive_polynomial_chaos

import numpy as np
from scipy.linalg import solve_triangular
from sklearn.linear_model import (
    OrthogonalMatchingPursuit,
    OrthogonalMatchingPursuitCV
)

from pyapprox.surrogates.orthopoly.leja_sequences import christoffel_weights
from pyapprox.surrogates.polychaos.gpc import (
    PolynomialChaosExpansion,
    define_poly_options_from_variable_transformation
)
from pyapprox.surrogates.polychaos.induced_sampling import (
    increment_induced_samples_migliorati,
    generate_induced_samples_migliorati_tolerance,
    compute_preconditioned_canonical_basis_matrix_condition_number
)
from pyapprox.util.utilities import hash_array
from pyapprox.util.linalg import (
    add_columns_to_pivoted_lu_factorization,
    continue_pivoted_lu_factorization,
    get_final_pivots_from_sequential_pivots,
    split_lu_factorization_matrix,
    pivot_rows,
    truncated_pivoted_lu_factorization, unprecondition_LU_factor
)
from pyapprox.surrogates.interp.adaptive_sparse_grid import (
    SubSpaceRefinementManager
)
from pyapprox.variables.sampling import (
    generate_independent_random_samples
)


def get_subspace_active_poly_array_indices(adaptive_pce, ii):
    """
    Return the column indices (into ``adaptive_pce.poly_indices``) of the
    polynomial basis terms introduced by the ii-th subspace.
    """
    start = adaptive_pce.unique_poly_indices_idx[ii]
    last_subspace = adaptive_pce.unique_poly_indices_idx.shape[0] - 1
    if ii >= last_subspace:
        # the final subspace owns all remaining basis terms
        stop = adaptive_pce.poly_indices.shape[1]
    else:
        stop = adaptive_pce.unique_poly_indices_idx[ii+1]
    return np.arange(start, stop)


def get_active_poly_array_indices(adaptive_pce):
    """
    Return the column indices (into ``adaptive_pce.poly_indices``) of the
    basis terms belonging to every active subspace.
    """
    index_sets = [
        get_subspace_active_poly_array_indices(adaptive_pce, ii)
        for ii in adaptive_pce.active_subspace_indices_dict.values()]
    # prepend an empty int array so the result is an int array even when
    # there are no active subspaces
    return np.hstack([np.empty((0), dtype=int)] + index_sets)


def variance_pce_refinement_indicator(
        subspace_index, num_new_subspace_samples, adaptive_pce,
        normalize=True, mean_only=False):
    """
    Compute a refinement indicator for a subspace from the variance
    contributed by the PCE coefficients of its new basis terms.

    Returns the negated cost-weighted indicator (so that larger
    contributions sort first in a priority queue) and the raw variance
    contribution of the chosen QoI.
    """
    subspace_key = hash_array(subspace_index)
    subspace_pos = adaptive_pce.active_subspace_indices_dict[subspace_key]
    coef_rows = get_subspace_active_poly_array_indices(
        adaptive_pce, subspace_pos)
    # variance contributed by this subspace's basis terms, per QoI
    error = np.sum(adaptive_pce.pce.coefficients[coef_rows]**2, axis=0)
    indicator = error.copy()

    if normalize:
        msg = """Attempted normalization of variance with values at first grid point close to 0.
        Possible options are:
        - Use a different sample or sampling sequence
          (if random sampling, try a different seed value)
        - Set the `normalize` option to False (see docs for: `variance_pce_refinement_indicator()`)
        """
        first_point_vals = np.absolute(adaptive_pce.values[0, :])
        assert np.all(first_point_vals > 1e-6), msg
        indicator /= first_point_vals**2

    qoi_chosen = np.argmax(indicator)
    indicator = indicator.max()

    cost_per_sample = adaptive_pce.eval_cost_function(
        subspace_index[:, np.newaxis])
    cost = cost_per_sample*num_new_subspace_samples

    # compute marginal benefit: variance reduction per unit cost
    indicator /= cost

    return -indicator, error[qoi_chosen]


def solve_preconditioned_least_squares(basis_matrix_func, samples, values,
                                       precond_func):
    """
    Solve a weighted least-squares regression problem.

    Both the basis matrix and the right-hand side are scaled row-wise by
    the preconditioning weights before solving.
    """
    unweighted_basis = basis_matrix_func(samples)
    weights = precond_func(unweighted_basis, samples)
    scale = weights[:, np.newaxis]
    lhs = unweighted_basis*scale
    rhs = values*scale
    return np.linalg.lstsq(lhs, rhs, rcond=None)[0]


def solve_preconditioned_orthogonal_matching_pursuit(basis_matrix_func,
                                                     samples, values,
                                                     precond_func,
                                                     tol=1e-8):
    """
    Solve a preconditioned sparse regression problem with orthogonal
    matching pursuit (OMP).

    When ``tol > 0`` (or there is only a single basis term) a fixed
    tolerance OMP is used; otherwise the tolerance is chosen by
    cross validation.
    """
    unweighted_basis = basis_matrix_func(samples)
    weights = precond_func(unweighted_basis, samples)
    scale = weights[:, np.newaxis]
    lhs = unweighted_basis*scale
    rhs = values*scale
    use_fixed_tol = lhs.shape[1] == 1 or tol > 0
    if use_fixed_tol:
        solver = OrthogonalMatchingPursuit(tol=tol)
    else:
        solver = OrthogonalMatchingPursuitCV(cv=min(samples.shape[1], 10))
    fitted = solver.fit(lhs, rhs)
    coef = solver.coef_
    # fold the fitted intercept into the constant basis coefficient
    coef[0] += fitted.intercept_
    return coef[:, np.newaxis]


def christoffel_preconditioning_function(basis_matrix, samples):
    """
    Return Christoffel least-squares weights scaled by the number of
    basis terms.
    """
    nterms = basis_matrix.shape[1]
    return np.sqrt(nterms*christoffel_weights(basis_matrix))


def generate_probability_samples_tolerance(
        pce, nindices, cond_tol, samples=None,
        verbosity=0):
    r"""
    Add samples in integer increments of nindices.
    E.g. if try nsamples = nindices, 2*nindices, 3*nindices
    until condition number is less than tolerance.

    Parameters
    ----------
    samples : np.ndarray
        samples in the canonical domain of the polynomial

    Returns
    -------
    new_samples : np.ndarray(nvars, nnew_samples)
        New samples appended to samples. must be in canonical space
    """
    variable = pce.var_trans.variable

    def _draw_canonical(nsamples):
        # draw in user space, then map into the polynomial's canonical space
        draw = generate_independent_random_samples(variable, nsamples)
        return pce.var_trans.map_to_canonical(draw)

    def _report(cond, nsamples):
        if verbosity > 0:
            print('\tCond No.', cond, 'No. samples', nsamples,
                  'cond tol', cond_tol)

    if samples is None:
        new_samples = _draw_canonical(nindices)
    else:
        new_samples = samples.copy()
    cond = compute_preconditioned_canonical_basis_matrix_condition_number(
        pce, new_samples)
    _report(cond, new_samples.shape[1])

    batch = 1
    max_nsamples = 1000*pce.indices.shape[1]
    while cond > cond_tol:
        new_samples = np.hstack(
            (new_samples, _draw_canonical(batch*nindices)))
        cond = compute_preconditioned_canonical_basis_matrix_condition_number(
            pce, new_samples)
        _report(cond, new_samples.shape[1])
        # double the number of samples added each iteration so the loop
        # does not take too long
        batch *= 2
        if new_samples.shape[1] > max_nsamples:
            raise RuntimeError(
                "Basis and sample combination is ill conditioned")
    return new_samples


def increment_probability_samples(pce, cond_tol, samples, indices,
                                  new_indices, verbosity=0):
    r"""
    Parameters
    ----------
    samples : np.ndarray
        samples in the canonical domain of the polynomial

    Returns
    -------
    new_samples : np.ndarray(nvars, nnew_samples)
        New samples appended to samples. must be in canonical space
    """
    nnew_indices = new_indices.shape[1]
    # initially allocate one sample for every new basis term
    extra = generate_independent_random_samples(
        pce.var_trans.variable, nnew_indices)
    extra = pce.var_trans.map_to_canonical(extra)
    augmented = np.hstack((samples, extra))
    # keep sampling until condition number is below cond_tol
    new_samples = generate_probability_samples_tolerance(
        pce, nnew_indices, cond_tol, augmented, verbosity)
    if verbosity > 0:
        print('No. samples', new_samples.shape[1])
        print('No. initial samples', samples.shape[1])
        print('No. indices', indices.shape[1], pce.indices.shape[1])
        print('No. new indices', new_indices.shape[1])
        print('No. new samples',
              new_samples.shape[1]-samples.shape[1])
    return new_samples


class AdaptiveInducedPCE(SubSpaceRefinementManager):
    """
    An adaptive PCE built using induced sampling and generalized sparse
    grid like refinement.
    """

    def __init__(self, num_vars, cond_tol=1e2, induced_sampling=True,
                 fit_opts=None):
        """
        Parameters
        ----------
        num_vars : integer
            The number of random variables

        cond_tol : float
            The target condition number of the basis matrix used for
            regression

        induced_sampling : boolean
            True - use induced sampling
            False - use random sampling

        fit_opts : dict
            Options used to solve the regression problem at each step of
            the adaptive algorithm. Defaults to ``{'omp_tol': 0}``.
        """
        super(AdaptiveInducedPCE, self).__init__(num_vars)
        self.cond_tol = cond_tol
        # Avoid a shared mutable default argument:
        # set_preconditioning_function mutates fit_opts in place, so a
        # dict default would leak state across instances.
        self.fit_opts = {'omp_tol': 0} if fit_opts is None else fit_opts
        self.set_preconditioning_function(christoffel_preconditioning_function)
        self.fit_function = self._fit
        self.induced_sampling = induced_sampling
        assert abs(cond_tol) > 1
        if not induced_sampling:
            # random sampling uses unit (no-op) preconditioning weights
            self.set_preconditioning_function(
                precond_func=lambda m, x: np.ones(x.shape[1]))
        self.moments = None
[docs] def set_function(self, function, var_trans=None, pce=None): super(AdaptiveInducedPCE, self).set_function(function, var_trans) self.set_polynomial_chaos_expansion(pce)
[docs] def set_polynomial_chaos_expansion(self, pce=None): if pce is None: poly_opts = define_poly_options_from_variable_transformation( self.var_trans) self.pce = PolynomialChaosExpansion() self.pce.configure(poly_opts) else: self.pce = pce
[docs] def increment_samples(self, current_poly_indices, unique_poly_indices): if self.induced_sampling: return increment_induced_samples_migliorati( self.pce, self.cond_tol, self.samples, current_poly_indices, unique_poly_indices) if self.cond_tol < 0: sample_ratio = -self.cond_tol samples = generate_independent_random_samples( self.pce.var_trans.variable, sample_ratio*unique_poly_indices.shape[1]) samples = self.pce.var_trans.map_to_canonical(samples) samples = np.hstack([self.samples, samples]) return samples return increment_probability_samples( self.pce, self.cond_tol, self.samples, current_poly_indices, unique_poly_indices)
[docs] def allocate_initial_samples(self): if self.induced_sampling: return generate_induced_samples_migliorati_tolerance( self.pce, self.cond_tol) if self.cond_tol < 0: sample_ratio = -self.cond_tol return generate_independent_random_samples( self.pce.var_trans.variable, sample_ratio*self.pce.num_terms()) return generate_probability_samples_tolerance( self.pce, self.pce.num_terms(), self.cond_tol)
[docs] def create_new_subspaces_data(self, new_subspace_indices): num_current_subspaces = self.subspace_indices.shape[1] self.initialize_subspaces(new_subspace_indices) self.pce.set_indices(self.poly_indices) if self.samples.shape[1] == 0: unique_subspace_samples = self.allocate_initial_samples() return unique_subspace_samples, np.array( [unique_subspace_samples.shape[1]]) num_vars, num_new_subspaces = new_subspace_indices.shape unique_poly_indices = np.zeros((num_vars, 0), dtype=int) for ii in range(num_new_subspaces): II = get_subspace_active_poly_array_indices( self, num_current_subspaces+ii) unique_poly_indices = np.hstack( [unique_poly_indices, self.poly_indices[:, II]]) # Current_poly_indices will include active indices not added # during this call, i.e. in new_subspace_indices. # thus cannot use # II = get_active_poly_array_indices(self) # unique_poly_indices = self.poly_indices[:,II] # to replace above loop current_poly_indices = self.poly_indices[ :, :self.unique_poly_indices_idx[num_current_subspaces]] num_samples = self.samples.shape[1] samples = self.increment_samples( current_poly_indices, unique_poly_indices) unique_subspace_samples = samples[:, num_samples:] # warning num_new_subspace_samples does not really make sense for # induced sampling as new samples are not directly tied to newly # added basis num_new_subspace_samples = unique_subspace_samples.shape[1]*np.ones( new_subspace_indices.shape[1])//new_subspace_indices.shape[1] return unique_subspace_samples, num_new_subspace_samples
def _fit(self, pce, canonical_basis_matrix, samples, values, precond_func=None, omp_tol=0): # do to, just add columns to stored basis matrix # store qr factorization of basis_matrix and update the factorization # self.samples are in canonical domain if omp_tol == 0: coef = solve_preconditioned_least_squares( canonical_basis_matrix, samples, values, precond_func) else: coef = solve_preconditioned_orthogonal_matching_pursuit( canonical_basis_matrix, samples, values, precond_func, omp_tol) self.pce.set_coefficients(coef)
[docs] def fit(self): return self.fit_function( self.pce, self.pce.canonical_basis_matrix, self.samples, self.values, **self.fit_opts)
[docs] def add_new_subspaces(self, new_subspace_indices): num_new_subspace_samples = super( AdaptiveInducedPCE, self).add_new_subspaces(new_subspace_indices) self.fit() return num_new_subspace_samples
[docs] def __call__(self, samples, return_grad=False): return self.pce(samples, return_grad)
[docs] def get_active_unique_poly_indices(self): II = get_active_poly_array_indices(self) return self.poly_indices[:, II]
[docs] def set_preconditioning_function(self, precond_func): """ precond_func : callable Callable function with signature precond_func(basis_matrix,samples) """ self.precond_func = precond_func self.fit_opts['precond_func'] = self.precond_func
[docs] def num_training_samples(self): return self.samples.shape[1]
[docs] def build(self, callback=None): """ """ while (not self.active_subspace_queue.empty() or self.subspace_indices.shape[1] == 0): self.refine() self.recompute_active_subspace_priorities() if callback is not None: callback(self)
class AdaptiveLejaPCE(AdaptiveInducedPCE):
    """
    An adaptive PCE built using multivariate Leja sequences and
    generalized sparse grid like refinement.
    """

    def __init__(self, num_vars, candidate_samples, factorization_type='fast'):
        """
        Parameters
        ----------
        num_vars : integer
            The number of random variables

        candidate_samples : np.ndarray (num_vars, ncandidate_samples)
            The candidate samples from which the leja sequence is selected

        factorization_type : string
            fast - update LU factorization at each step
            slow - recompute LU factorization at each step
        """
        super(AdaptiveLejaPCE, self).__init__(num_vars, 1e6)
        # Make sure the correct preconditioning function is used.
        # AdaptiveInducedPCE has some internal logic that can override the
        # default we want
        self.set_preconditioning_function(christoffel_preconditioning_function)
        # Must be in canonical space
        # TODO: generate candidate samples at each iteration from induced
        # distribution using current self.poly_indices
        self.candidate_samples = candidate_samples
        self.factorization_type = factorization_type
[docs] def precond_canonical_basis_matrix(self, samples): basis_matrix = self.pce.canonical_basis_matrix(samples) precond_weights = self.precond_func(basis_matrix, samples) precond_basis_matrix = basis_matrix*precond_weights[:, np.newaxis] return precond_basis_matrix, precond_weights
[docs] def get_num_new_subspace_samples(self, new_subspace_indices, num_current_subspaces): num_current_subspaces = self.subspace_indices.shape[1] num_vars, num_new_subspaces = new_subspace_indices.shape num_new_subspace_samples = np.empty((num_new_subspaces), dtype=int) for ii in range(num_new_subspaces): II = get_subspace_active_poly_array_indices( self, num_current_subspaces+ii) num_new_subspace_samples[ii] = II.shape[0] return num_new_subspace_samples
[docs] def condition_number(self): if self.factorization_type == 'slow': return np.linalg.cond(self.L_factor.dot(self.U_factor)) else: L, U = split_lu_factorization_matrix( self.LU_factor, num_pivots=self.samples.shape[1]) return np.linalg.cond(L.dot(U))
[docs] def update_leja_sequence_slow(self, new_subspace_indices): num_samples = self.samples.shape[1] # There will be two copies of self.samples in candidate_samples # but pivoting will only choose these samples once when number of # desired samples is smaller than # self.candidate_samples.shape[0]-self.samples.shape[1] candidate_samples = np.hstack([self.samples, self.candidate_samples]) self.pce.set_indices(self.poly_indices) precond_basis_matrix, precond_weights = \ self.precond_canonical_basis_matrix(candidate_samples) # TODO: update LU factorization using new candidate points, This # requires writing a function that updates not just new columns of # L and U factor but also allows new rows to be added. max_iters = self.poly_indices.shape[1] num_initial_rows = num_samples self.L_factor, self.U_factor, pivots =\ truncated_pivoted_lu_factorization( precond_basis_matrix, max_iters, num_initial_rows=num_initial_rows) self.pivots = np.arange(num_samples)[pivots[:num_initial_rows]] self.pivots = np.concatenate( [self.pivots, np.arange(num_initial_rows, pivots.shape[0])]) self.precond_weights = precond_weights[pivots, np.newaxis] return candidate_samples[:, pivots[num_samples:]]
[docs] def update_leja_sequence_fast(self, new_subspace_indices, num_current_subspaces): num_samples = self.samples.shape[1] if num_samples == 0: self.pce.set_indices(self.poly_indices) max_iters = self.poly_indices.shape[1] # Keep unconditioned self.basis_matrix = self.precond_canonical_basis_matrix( self.candidate_samples)[0] self.LU_factor, self.seq_pivots = \ truncated_pivoted_lu_factorization( self.basis_matrix, max_iters, truncate_L_factor=False) self.pivots = get_final_pivots_from_sequential_pivots( self.seq_pivots.copy())[:max_iters] self.precond_weights = self.precond_func( self.basis_matrix, self.candidate_samples)[:, np.newaxis] return self.candidate_samples[ :, self.pivots[num_samples:self.poly_indices.shape[1]]] num_vars, num_new_subspaces = new_subspace_indices.shape unique_poly_indices = np.zeros((num_vars, 0), dtype=int) for ii in range(num_new_subspaces): II = get_subspace_active_poly_array_indices( self, num_current_subspaces+ii) unique_poly_indices = np.hstack( [unique_poly_indices, self.poly_indices[:, II]]) self.pce.set_indices(unique_poly_indices) precond_weights_prev = self.precond_weights pivoted_precond_weights_prev = pivot_rows( self.seq_pivots, precond_weights_prev, False) new_cols = self.pce.canonical_basis_matrix(self.candidate_samples) self.basis_matrix = np.hstack([self.basis_matrix, np.array(new_cols)]) new_cols *= precond_weights_prev self.LU_factor = add_columns_to_pivoted_lu_factorization( np.array(self.LU_factor), new_cols, self.seq_pivots[:num_samples]) self.precond_weights = self.precond_func( self.basis_matrix, self.candidate_samples)[:, np.newaxis] pivoted_precond_weights = pivot_rows( self.seq_pivots, self.precond_weights, False) self.LU_factor = unprecondition_LU_factor( self.LU_factor, pivoted_precond_weights_prev/pivoted_precond_weights, num_samples) max_iters = self.poly_indices.shape[1] self.LU_factor, self.seq_pivots, _ = continue_pivoted_lu_factorization( self.LU_factor.copy(), self.seq_pivots, self.samples.shape[1], 
max_iters, num_initial_rows=0) self.pivots = get_final_pivots_from_sequential_pivots( self.seq_pivots.copy())[:max_iters] self.pce.set_indices(self.poly_indices) return self.candidate_samples[ :, self.pivots[num_samples:self.poly_indices.shape[1]]]
[docs] def create_new_subspaces_data(self, new_subspace_indices): num_current_subspaces = self.subspace_indices.shape[1] self.initialize_subspaces(new_subspace_indices) if self.factorization_type == 'fast': unique_subspace_samples = self.update_leja_sequence_fast( new_subspace_indices, num_current_subspaces) else: unique_subspace_samples = self.update_leja_sequence_slow( new_subspace_indices) num_new_subspace_samples = self.get_num_new_subspace_samples( new_subspace_indices, num_current_subspaces) return unique_subspace_samples, num_new_subspace_samples
[docs] def add_new_subspaces(self, new_subspace_indices): num_new_subspace_samples = super( AdaptiveInducedPCE, self).add_new_subspaces(new_subspace_indices) if self.factorization_type == 'fast': it = self.samples.shape[1] temp = solve_triangular( self.LU_factor[:it, :it], self.values*self.precond_weights[self.pivots], lower=True, unit_diagonal=True) a_f = self.LU_factor[:it, :it] else: temp = solve_triangular( self.L_factor, self.values[self.pivots]*self.precond_weights, lower=True) a_f = self.U_factor coef = solve_triangular(a_f, temp, lower=False) self.pce.set_coefficients(coef) return num_new_subspace_samples
[docs] def __call__(self, samples, return_grad=False): return self.pce(samples, return_grad)
[docs] def get_active_unique_poly_indices(self): II = get_active_poly_array_indices(self) return self.poly_indices[:, II]