import copy
from functools import partial
import numpy as np
from pyapprox.variables.transforms import AffineTransform
from pyapprox.variables.joint import IndependentMarginalsVariable
from pyapprox.util.utilities import (
cartesian_product, outer_product, hash_array, nchoosek
)
from pyapprox.util.utilities import get_tensor_product_quadrature_rule
from pyapprox.surrogates.orthopoly.recursion_factory import (
get_recursion_coefficients_from_variable
)
from pyapprox.surrogates.orthopoly.orthonormal_polynomials import (
evaluate_orthonormal_polynomial_deriv_1d,
evaluate_orthonormal_polynomial_1d, gauss_quadrature,
define_orthopoly_options_from_marginal
)
from pyapprox.surrogates.orthopoly.quadrature import (
get_gauss_quadrature_rule_from_marginal
)
from pyapprox.surrogates.interp.monomial import monomial_basis_matrix
from pyapprox.util.linalg import (
flattened_rectangular_lower_triangular_matrix_index
)
from pyapprox.surrogates.interp.manipulate_polynomials import add_polynomials
from pyapprox.surrogates.interp.indexing import (
compute_hyperbolic_level_indices
)
def make_2D_array(lis):
"""Funciton to get 2D array from a list of lists
numba does not support lists of lists so need to compute lists to 2D array
and store different lengths
"""
n = len(lis)
lengths = np.array([len(x) for x in lis])
max_len = np.max(lengths)
arr = np.zeros((n, max_len))
for i in range(n):
arr[i, :lengths[i]] = lis[i]
return arr, lengths
def precompute_multivariate_orthonormal_polynomial_univariate_values(
samples, indices, recursion_coeffs, deriv_order, basis_type_index_map):
num_vars = indices.shape[0]
# axis keyword is not supported when usingnumba
# max_level_1d = indices.max(axis=1)
# so replace with
max_level_1d = np.empty((num_vars), dtype=np.int_)
for ii in range(num_vars):
max_level_1d[ii] = indices[ii, :].max()
if basis_type_index_map is None:
# numba requires np.int_ not int or np.int
basis_type_index_map = np.zeros((num_vars), dtype=np.int_)
recursion_coeffs = [recursion_coeffs]
for dd in range(num_vars):
if (recursion_coeffs[basis_type_index_map[dd]].shape[0] <=
max_level_1d[dd]):
msg = f'max_level_1d for dimension {dd} exceeds number of '
msg += 'precomputed recusion coefficients'
raise Exception(msg)
basis_vals_1d = np.empty(
(num_vars, (1+deriv_order)*(max_level_1d.max()+1), samples.shape[1]),
dtype=np.float64)
# WARNING some entries of basis_vals_1d will remain uninitialized
# when max_level_1d[dd]=max_level_1d.max() for directions dd
# storing arrays of equal size allows fast vectorization based manipulation
# in downstream functions
for dd in range(num_vars):
basis_vals_1d[dd, :(deriv_order+1)*(max_level_1d[dd]+1), :] = \
evaluate_orthonormal_polynomial_deriv_1d(
samples[dd, :], max_level_1d[dd],
recursion_coeffs[basis_type_index_map[dd]], deriv_order).T
return basis_vals_1d
def evaluate_multivariate_orthonormal_polynomial_values(
indices, basis_vals_1d, num_samples):
num_vars, num_indices = indices.shape
temp1 = basis_vals_1d.reshape(
(num_vars*basis_vals_1d.shape[1], num_samples))
temp2 = temp1[indices.ravel()+np.repeat(
np.arange(num_vars)*basis_vals_1d.shape[1], num_indices), :].reshape(
num_vars, num_indices, num_samples)
values = np.prod(temp2, axis=0).T
return values
def evaluate_multivariate_orthonormal_polynomial_derivs(
indices, max_level_1d, basis_vals_1d, num_samples, deriv_order):
# TODO Consider combining
# evaluate_multivariate_orthonormal_polynomial_values and derivs and
# evaluate_multivariate_orthonormal_polynomial_derivs because they both
# compute temp2
assert deriv_order == 1
num_vars, num_indices = indices.shape
# extract derivatives
temp1 = basis_vals_1d.reshape(
(num_vars*basis_vals_1d.shape[1], num_samples))
temp2 = temp1[indices.ravel()+np.repeat(
np.arange(num_vars)*basis_vals_1d.shape[1], num_indices), :].reshape(
num_vars, num_indices, num_samples)
# values = np.prod(temp2, axis=0).T
# derivs are stored immeadiately after values in basis_vals_1d
# if max_level_1d[dd]!=max_level_1d.max() then there will be some
# uninitialized values at the end of the array but these are never accessed
temp3 = temp1[indices.ravel()+np.repeat(
max_level_1d+1+np.arange(num_vars)*basis_vals_1d.shape[1],
num_indices), :].reshape(num_vars, num_indices, num_samples)
derivs = np.empty((num_samples*num_vars, num_indices))
for jj in range(num_vars):
derivs[(jj)*num_samples:(jj+1)*num_samples] = (
np.prod(temp2[:jj], axis=0)*np.prod(
temp2[jj+1:], axis=0)*temp3[jj]).T
return derivs
def evaluate_multivariate_orthonormal_polynomial(
samples, indices, recursion_coeffs, deriv_order=0,
basis_type_index_map=None):
"""
Evaluate a multivaiate orthonormal polynomial and its s-derivatives
(s=1,...,num_derivs) using a three-term recurrence coefficients.
Parameters
----------
samples : np.ndarray (num_vars, num_samples)
Samples at which to evaluate the polynomial
indices : np.ndarray (num_vars, num_indices)
The exponents of each polynomial term
recursion_coeffs : list of np.ndarray (num_indices,2)
The recursion coefficients for each unique polynomial
deriv_order : integer in [0,1]
The maximum order of the derivatives to evaluate.
basis_type_index_map : list
The index into recursion coeffs that points to the unique recursion
coefficients associated with each dimension
Return
------
values : np.ndarray (1+deriv_order*num_samples,num_indices)
The values of the polynomials at the samples
"""
num_vars, num_indices = indices.shape
assert samples.shape[0] == num_vars
assert samples.shape[1] > 0
# assert recursion_coeffs.shape[0]>indices.max()
max_level_1d = indices.max(axis=1)
assert deriv_order >= 0 and deriv_order <= 1
# My cython implementaion is currently slower than pure python found here
# try:
# from pyapprox.cython.multivariate_polynomials import \
# evaluate_multivariate_orthonormal_polynomial_pyx
# return evaluate_multivariate_orthonormal_polynomial_pyx(
# samples,indices,recursion_coeffs,deriv_order)
# except:
# print('evaluate_multivariate_orthonormal_polynomial extension failed')
# precompute 1D basis functions for faster evaluation of
# multivariate terms
precompute_values = \
precompute_multivariate_orthonormal_polynomial_univariate_values
compute_values = evaluate_multivariate_orthonormal_polynomial_values
compute_derivs = evaluate_multivariate_orthonormal_polynomial_derivs
basis_vals_1d = precompute_values(
samples, indices, recursion_coeffs, deriv_order, basis_type_index_map)
num_samples = samples.shape[1]
values = compute_values(indices, basis_vals_1d, num_samples)
if deriv_order == 0:
return values
derivs = compute_derivs(
indices, max_level_1d, basis_vals_1d, num_samples, deriv_order)
values = np.vstack([values, derivs])
return values
[docs]class PolynomialChaosExpansion(object):
"""
A polynomial chaos expansion for independent random variables.
"""
def __init__(self):
""" Constructor. """
self.coefficients = None
self.indices = None
self.recursion_coeffs = []
self.basis_type_index_map = None
self.basis_type_var_indices = []
def __mul__(self, other):
if self.indices.shape[1] > other.indices.shape[1]:
poly1 = self
poly2 = other
else:
poly1 = other
poly2 = self
poly1 = copy.deepcopy(poly1)
poly2 = copy.deepcopy(poly2)
max_degrees1 = poly1.indices.max(axis=1)
max_degrees2 = poly2.indices.max(axis=1)
# print('###')
# print(max_degrees1,max_degrees2)
product_coefs_1d = compute_product_coeffs_1d_for_each_variable(
poly1, max_degrees1, max_degrees2)
# print(product_coefs_1d)
indices, coefs = \
multiply_multivariate_orthonormal_polynomial_expansions(
product_coefs_1d, poly1.get_indices(),
poly1.get_coefficients(),
poly2.get_indices(), poly2.get_coefficients())
# get_polynomial_from_variable(self.var_trans.variable)
poly = copy.deepcopy(self)
poly.set_indices(indices)
poly.set_coefficients(coefs)
return poly
def __add__(self, other):
indices_list = [self.indices, other.indices]
coefs_list = [self.coefficients, other.coefficients]
indices, coefs = add_polynomials(indices_list, coefs_list)
poly = get_polynomial_from_variable(self.var_trans.variable)
poly.set_indices(indices)
poly.set_coefficients(coefs)
return poly
def __sub__(self, other):
indices_list = [self.indices, other.indices]
coefs_list = [self.coefficients, -other.coefficients]
indices, coefs = add_polynomials(indices_list, coefs_list)
poly = get_polynomial_from_variable(self.var_trans.variable)
poly.set_indices(indices)
poly.set_coefficients(coefs)
return poly
def __pow__(self, order):
poly = get_polynomial_from_variable(self.var_trans.variable)
if order == 0:
poly.set_indices(np.zeros([self.num_vars(), 1], dtype=int))
poly.set_coefficients(np.ones([1, self.coefficients.shape[1]]))
return poly
poly = copy.deepcopy(self)
for ii in range(2, order+1):
poly = poly*self
return poly
# def substitute(self, other):
# """
# Final polynomial will use orthonormal basis of other for any variable
# in other. E.g. if other is denoted z = p(x,y) and we are computing
# f(z)
# then f(z) = f(x,y) where we use basis associated with x and y and not
# with z.
# I have code to do this but it requires a transformation from an
# orthogonal basis into the monomial basis then into another orthogonal
# basis and this transformation can be ill-conditioned.
# """
# raise Exception(Not implemented)
[docs] def update_recursion_coefficients(self, num_coefs_per_var):
num_coefs_per_var = np.atleast_1d(num_coefs_per_var)
initializing = False
if self.basis_type_index_map is None:
initializing = True
self.basis_type_index_map = np.zeros((self.num_vars()), dtype=int)
ii = 0
for key, poly_opts in self.config_opts['poly_types'].items():
if (initializing or (
np.any(num_coefs_per_var[self.basis_type_var_indices[ii]] >
self.max_degree[self.basis_type_var_indices[ii]]+1))):
if initializing:
self.basis_type_var_indices.append(poly_opts['var_nums'])
num_coefs = num_coefs_per_var[
self.basis_type_var_indices[ii]].max()
recursion_coeffs_ii = get_recursion_coefficients_from_variable(
poly_opts["var"], num_coefs, poly_opts)
if recursion_coeffs_ii is None:
# recursion coefficients will be None is returned if
# monomial basis is specified. Only allow monomials to
# be used if all variables use monomial basis
assert len(self.config_opts['poly_types']) == 1
if initializing:
self.recursion_coeffs.append(recursion_coeffs_ii)
else:
self.recursion_coeffs[ii] = recursion_coeffs_ii
# extract variables indices for which basis is to be used
self.basis_type_index_map[self.basis_type_var_indices[ii]] = ii
ii += 1
if (np.unique(np.hstack(self.basis_type_var_indices)).shape[0] !=
self.num_vars()):
msg = 'poly_opts does not specify a basis for each input '
msg += 'variable'
raise ValueError(msg)
[docs] def set_indices(self, indices):
# assert indices.dtype==int
if indices.ndim == 1:
indices = indices.reshape((1, indices.shape[0]))
self.indices = indices
assert indices.shape[0] == self.num_vars()
max_degree = indices.max(axis=1)
if np.any(self.max_degree < max_degree):
self.update_recursion_coefficients(max_degree+1)
self.max_degree = max_degree
[docs] def basis_matrix(self, samples, opts=dict()):
assert samples.ndim == 2
assert samples.shape[0] == self.num_vars()
canonical_samples = self.var_trans.map_to_canonical(
samples)
basis_matrix = self.canonical_basis_matrix(canonical_samples, opts)
deriv_order = opts.get('deriv_order', 0)
if deriv_order == 1:
basis_matrix[samples.shape[1]:, :] =\
self.var_trans.map_derivatives_from_canonical_space(
basis_matrix[samples.shape[1]:, :])
return basis_matrix
[docs] def canonical_basis_matrix(self, canonical_samples, opts=dict()):
deriv_order = opts.get('deriv_order', 0)
if self.recursion_coeffs[0] is not None:
basis_matrix = evaluate_multivariate_orthonormal_polynomial(
canonical_samples, self.indices, self.recursion_coeffs,
deriv_order, self.basis_type_index_map)
else:
basis_matrix = monomial_basis_matrix(
self.indices, canonical_samples, deriv_order)
return basis_matrix
[docs] def jacobian(self, sample):
assert sample.shape[1] == 1
derivative_matrix = self.basis_matrix(
sample, {'deriv_order': 1})[1:]
jac = derivative_matrix.dot(self.coefficients).T
return jac
[docs] def set_coefficients(self, coefficients):
assert coefficients.ndim == 2
assert coefficients.shape[0] == self.num_terms()
self.coefficients = coefficients.copy()
[docs] def get_coefficients(self):
if self.coefficients is not None:
return self.coefficients.copy()
[docs] def get_indices(self):
return self.indices.copy()
[docs] def value(self, samples):
basis_matrix = self.basis_matrix(samples)
return np.dot(basis_matrix, self.coefficients)
[docs] def num_vars(self):
return self.var_trans.num_vars()
[docs] def __call__(self, samples, return_grad=False):
vals = self.value(samples)
if not return_grad:
return vals
jacs = [self.jacobian(sample[:, None]) for sample in samples.T]
if samples.shape[1] == 1:
return vals, jacs[0]
return vals, jacs
[docs] def mean(self):
"""
Compute the mean of the polynomial chaos expansion
Returns
-------
mean : np.ndarray (nqoi)
The mean of each quantitity of interest
"""
return self.coefficients[0, :]
[docs] def variance(self):
"""
Compute the variance of the polynomial chaos expansion
Returns
-------
var : np.ndarray (nqoi)
The variance of each quantitity of interest
"""
var = np.sum(self.coefficients[1:, :]**2, axis=0)
return var
[docs] def covariance(self):
"""
Compute the covariance between each quantity of interest of the
polynomial chaos expansion
Returns
-------
covar : np.ndarray (nqoi)
The covariance between each quantitity of interest
"""
# nqoi = self.coefficients.shape[1]
covar = self.coefficients[1:, :].T.dot(self.coefficients[1:, :])
return covar
[docs] def num_terms(self):
# truncated svd creates basis with num_terms <= num_indices
return self.indices.shape[1]
def get_univariate_quadrature_rules_from_pce(pce, degrees):
num_vars = pce.num_vars()
degrees = np.atleast_1d(degrees)
if degrees.shape[0] == 1 and num_vars > 1:
degrees = np.array([degrees[0]]*num_vars)
if np.any(pce.max_degree < degrees):
pce.update_recursion_coefficients(degrees+1)
if len(pce.recursion_coeffs) == 1:
# update_recursion_coefficients may not return coefficients
# up to degree specified if using recursion for polynomial
# orthogonal to a discrete variable with finite non-zero
# probability measures
assert pce.recursion_coeffs[0].shape[0] >= degrees.max()+1
univariate_quadrature_rules = [
partial(gauss_quadrature, pce.recursion_coeffs[0])]*num_vars
else:
univariate_quadrature_rules = []
for dd in range(num_vars):
# update_recursion_coefficients may not return coefficients
# up to degree specified if using recursion for polynomial
# orthogonal to a discrete variable with finite non-zero
# probability measures
assert (
pce.recursion_coeffs[pce.basis_type_index_map[dd]].shape[0] >=
degrees[dd]+1)
univariate_quadrature_rules.append(
partial(gauss_quadrature,
pce.recursion_coeffs[pce.basis_type_index_map[dd]]))
return univariate_quadrature_rules
def get_tensor_product_quadrature_rule_from_pce(pce, degrees):
univariate_quadrature_rules = get_univariate_quadrature_rules_from_pce(
pce, degrees)
canonical_samples, weights = \
get_tensor_product_quadrature_rule(
degrees+1, pce.num_vars(), univariate_quadrature_rules)
samples = pce.var_trans.map_from_canonical(
canonical_samples)
return samples, weights
def define_poly_options_from_variable(variable):
basis_opts = dict()
for ii in range(len(variable.unique_variables)):
var = variable.unique_variables[ii]
opts = define_orthopoly_options_from_marginal(var)
opts['var_nums'] = variable.unique_variable_indices[ii]
basis_opts['basis%d' % ii] = opts
return basis_opts
def define_poly_options_from_variable_transformation(var_trans):
pce_opts = {'var_trans': var_trans}
basis_opts = define_poly_options_from_variable(var_trans.variable)
pce_opts['poly_types'] = basis_opts
return pce_opts
def conditional_moments_of_polynomial_chaos_expansion(
poly, samples, inactive_idx, return_variance=False):
"""
Return mean and variance of polynomial chaos expansion with some variables
fixed at specified values.
Parameters
----------
poly: PolynomialChaosExpansion
The polynomial used to compute moments
inactive_idx : np.ndarray (ninactive_vars)
The indices of the fixed variables
samples : np.ndarray (ninactive_vars)
The samples of the inacive dimensions fixed when computing moments
Returns
-------
mean : np.ndarray
The conditional mean (num_qoi)
variance : np.ndarray
The conditional variance (num_qoi). Only returned if
return_variance=True. Computing variance is significantly slower than
computing mean. TODO check it is indeed slower
"""
assert samples.shape[0] == len(inactive_idx)
assert samples.ndim == 2 and samples.shape[1] == 1
assert poly.coefficients is not None
coef = poly.get_coefficients()
indices = poly.get_indices()
# precompute 1D basis functions for faster evaluation of
# multivariate terms
basis_vals_1d = []
for dd in range(len(inactive_idx)):
basis_vals_1d_dd = evaluate_orthonormal_polynomial_1d(
samples[dd, :], indices[inactive_idx[dd], :].max(),
poly.recursion_coeffs[poly.basis_type_index_map[inactive_idx[dd]]])
basis_vals_1d.append(basis_vals_1d_dd)
active_idx = np.setdiff1d(np.arange(poly.num_vars()), inactive_idx)
mean = coef[0].copy()
for ii in range(1, indices.shape[1]):
index = indices[:, ii]
coef_ii = coef[ii] # this intentionally updates the coef matrix
for dd in range(len(inactive_idx)):
coef_ii *= basis_vals_1d[dd][0, index[inactive_idx[dd]]]
if index[active_idx].sum() == 0:
mean += coef_ii
if not return_variance:
return mean
unique_indices, repeated_idx = np.unique(
indices[active_idx, :], axis=1, return_inverse=True)
new_coef = np.zeros((unique_indices.shape[1], coef.shape[1]))
for ii in range(repeated_idx.shape[0]):
new_coef[repeated_idx[ii]] += coef[ii]
variance = np.sum(new_coef**2, axis=0)-mean**2
return mean, variance
def marginalize_polynomial_chaos_expansion(poly, inactive_idx, center=True):
"""
This function is not optimal. It will recreate the options
used to configure the polynomial. Any recursion coefficients
calculated which are still relevant will need to be computed.
This is probably not a large overhead though
"""
marginalized_pce = PolynomialChaosExpansion()
# poly.config_opts.copy will not work
opts = copy.deepcopy(poly.config_opts)
all_variables = poly.var_trans.variable.marginals()
active_idx = np.setdiff1d(np.arange(poly.num_vars()), inactive_idx)
active_variables = IndependentMarginalsVariable(
[all_variables[ii] for ii in active_idx])
opts['var_trans'] = AffineTransform(active_variables)
marginalized_var_nums = -np.ones(poly.num_vars())
marginalized_var_nums[active_idx] = np.arange(active_idx.shape[0])
keys_to_delete = []
for key, poly_opts in opts['poly_types'].items():
var_nums = poly_opts['var_nums']
poly_opts['var_nums'] = np.array(
[marginalized_var_nums[v] for v in var_nums
if v in active_idx], dtype=int)
if poly_opts['var_nums'].shape[0] == 0:
keys_to_delete.append(key)
for key in keys_to_delete:
del opts['poly_types'][key]
marginalized_pce.configure(opts)
if poly.indices is not None:
marginalized_array_indices = []
for ii, index in enumerate(poly.indices.T):
if ((index.sum() == 0 and center is False) or
np.any(index[active_idx]) and
(not np.any(index[inactive_idx] > 0))):
marginalized_array_indices.append(ii)
marginalized_pce.set_indices(
poly.indices[
np.ix_(active_idx, np.array(marginalized_array_indices))])
if poly.coefficients is not None:
marginalized_pce.set_coefficients(
poly.coefficients[marginalized_array_indices, :].copy())
return marginalized_pce
def get_polynomial_from_variable(variable):
var_trans = AffineTransform(
variable)
poly = PolynomialChaosExpansion()
poly_opts = define_poly_options_from_variable_transformation(var_trans)
poly.configure(poly_opts)
return poly
def compute_univariate_orthonormal_basis_products(get_recursion_coefficients,
max_degree1, max_degree2):
"""
Compute all the products of univariate orthonormal bases and re-express
them as expansions using the orthnormal basis.
"""
assert max_degree1 >= max_degree2
max_degree = max_degree1+max_degree2
num_quad_points = max_degree+1
recursion_coefs = get_recursion_coefficients(num_quad_points)
x_quad, w_quad = gauss_quadrature(recursion_coefs, num_quad_points)
w_quad = w_quad[:, np.newaxis]
# evaluate the orthonormal basis at the quadrature points. This can
# be computed once for all degrees up to the maximum degree
ortho_basis_matrix = evaluate_orthonormal_polynomial_1d(
x_quad, max_degree, recursion_coefs)
# compute coefficients of orthonormal basis using pseudo
# spectral projection
product_coefs = []
for d1 in range(max_degree1+1):
for d2 in range(min(d1+1, max_degree2+1)):
product_vals = ortho_basis_matrix[:, d1]*ortho_basis_matrix[:, d2]
coefs = w_quad.T.dot(
product_vals[:, np.newaxis]*ortho_basis_matrix[:, :d1+d2+1]).T
product_coefs.append(coefs)
return product_coefs
def compute_product_coeffs_1d_for_each_variable(
poly, max_degrees1, max_degrees2):
# must ensure that poly1 and poly2 have the same basis types
# in each dimension
num_vars = poly.num_vars()
def get_recursion_coefficients(N, dd):
poly.update_recursion_coefficients([N]*num_vars)
return poly.recursion_coeffs[poly.basis_type_index_map[dd]].copy()
# change this to only compute this for unique 1d polys
product_coefs_1d = [
compute_univariate_orthonormal_basis_products(
partial(get_recursion_coefficients, dd=dd),
max_degrees1[dd], max_degrees2[dd])
for dd in range(num_vars)]
return product_coefs_1d
def compute_multivariate_orthonormal_basis_product(
product_coefs_1d, poly_index_ii, poly_index_jj, max_degrees1,
max_degrees2, tol=2*np.finfo(float).eps):
"""
Compute the product of two multivariate orthonormal bases and re-express
as an expansion using the orthnormal basis.
"""
num_vars = poly_index_ii.shape[0]
poly_index = poly_index_ii+poly_index_jj
active_vars = np.where(poly_index > 0)[0]
if active_vars.shape[0] > 0:
coefs_1d = []
for dd in active_vars:
pii, pjj = poly_index_ii[dd], poly_index_jj[dd]
if pii < pjj:
tmp = pjj
pjj = pii
pii = tmp
kk = flattened_rectangular_lower_triangular_matrix_index(
pii, pjj, max_degrees1[dd]+1, max_degrees2[dd]+1)
coefs_1d.append(product_coefs_1d[dd][kk][:, 0])
indices_1d = [np.arange(poly_index[dd]+1)
for dd in active_vars]
product_coefs = outer_product(coefs_1d)[:, np.newaxis]
active_product_indices = cartesian_product(indices_1d)
II = np.where(np.absolute(product_coefs) > tol)[0]
active_product_indices = active_product_indices[:, II]
product_coefs = product_coefs[II]
product_indices = np.zeros(
(num_vars, active_product_indices.shape[1]), dtype=int)
product_indices[active_vars] = active_product_indices
else:
product_coefs = np.ones((1, 1))
product_indices = np.zeros([num_vars, 1], dtype=int)
return product_indices, product_coefs
def multiply_multivariate_orthonormal_polynomial_expansions(
product_coefs_1d, poly_indices1, poly_coefficients1, poly_indices2,
poly_coefficients2):
num_indices1 = poly_indices1.shape[1]
num_indices2 = poly_indices2.shape[1]
assert num_indices2 <= num_indices1
assert poly_coefficients1.shape[0] == num_indices1
assert poly_coefficients2.shape[0] == num_indices2
# following assumes the max degrees were used to create product_coefs_1d
max_degrees1 = poly_indices1.max(axis=1)
max_degrees2 = poly_indices2.max(axis=1)
basis_coefs, basis_indices = [], []
for ii in range(num_indices1):
poly_index_ii = poly_indices1[:, ii]
for jj in range(num_indices2):
poly_index_jj = poly_indices2[:, jj]
product_indices, product_coefs = \
compute_multivariate_orthonormal_basis_product(
product_coefs_1d, poly_index_ii, poly_index_jj,
max_degrees1, max_degrees2)
# print(ii,jj,product_coefs,poly_index_ii,poly_index_jj)
# TODO for unique polynomials the product_coefs and indices
# of [0,1,2] is the same as [2,1,0] so perhaps store
# sorted active indices and look up to reuse computations
product_coefs_iijj = product_coefs*poly_coefficients1[ii, :] *\
poly_coefficients2[jj, :]
basis_coefs.append(product_coefs_iijj)
basis_indices.append(product_indices)
assert basis_coefs[-1].shape[0] == basis_indices[-1].shape[1]
indices, coefs = add_polynomials(basis_indices, basis_coefs)
return indices, coefs
def get_univariate_quadrature_rules_from_variable(
variable, max_nsamples, canonical=False):
max_nsamples = np.atleast_1d(max_nsamples)
if max_nsamples.shape[0] == 1:
max_nsamples = np.ones(variable.num_vars(), dtype=int)*max_nsamples[0]
if max_nsamples.shape[0] != variable.num_vars():
raise ValueError(
"max_nsamples must be an integer or specfied for each marginal")
univariate_quad_rules = []
for ii, marginal in enumerate(variable.marginals()):
quad_rule = get_gauss_quadrature_rule_from_marginal(
marginal, max_nsamples[ii], canonical=canonical)
univariate_quad_rules.append(quad_rule)
return univariate_quad_rules
def get_coefficients_for_plotting(pce, qoi_idx):
"""
Get the coefficients of a
:class:`pyapprox.polynomial_chaos.multivariate_polynomials.PolynomialChaosExpansion`
"""
coeff = pce.get_coefficients()[:, qoi_idx]
indices = pce.indices.copy()
assert coeff.shape[0] == indices.shape[1]
num_vars = pce.num_vars()
degree = -1
indices_dict = dict()
max_degree = indices.sum(axis=0).max()
for ii in range(indices.shape[1]):
key = hash_array(indices[:, ii])
indices_dict[key] = ii
i = 0
degree_breaks = []
coeff_sorted = []
degree_indices_set = np.empty((num_vars, 0))
for degree in range(max_degree+1):
nterms = nchoosek(num_vars+degree, degree)
if nterms < 1e6:
degree_indices = compute_hyperbolic_level_indices(
num_vars, degree, 1.)
else:
'Could not plot coefficients of terms with degree >= %d' % degree
break
degree_indices_set = np.hstack((degree_indices_set, indices))
for ii in range(degree_indices.shape[1]-1, -1, -1):
index = degree_indices[:, ii]
key = hash_array(index)
if key in indices_dict:
coeff_sorted.append(coeff[indices_dict[key]])
else:
coeff_sorted.append(0.0)
i += 1
degree_breaks.append(i)
return np.array(coeff_sorted), degree_indices_set, degree_breaks
def plot_unsorted_pce_coefficients(coeffs,
indices,
ax,
degree_breaks=None,
axislabels=None,
legendlabels=None,
title=None,
cutoffs=None,
ylim=[1e-6, 1.]):
"""
Plot the coefficients of linear (in parameters) approximation
"""
np.set_printoptions(precision=16)
colors = ['k', 'r', 'b', 'g', 'm', 'y', 'c']
markers = ['s', 'o', 'd', ]
fill_style = ['none', 'full']
# fill_style = ['full','full']
ax.set_xlim([1, coeffs[0].shape[0]])
for i in range(len(coeffs)):
coeffs_i = np.absolute(coeffs[i])
assert coeffs_i.ndim == 1
mfc = colors[i]
fs = fill_style[min(i, 1)]
if (fs == 'none'):
mfc = 'none'
ms = 10.-2.*i
ax.plot(list(range(1, coeffs_i.shape[0]+1)), coeffs_i,
marker=markers[i % 3],
fillstyle=fs,
markeredgecolor=colors[i],
linestyle='None', markerfacecolor=mfc,
markersize=ms)
if degree_breaks is not None:
for i in np.array(degree_breaks) + 1:
ax.axvline(i, linestyle='--', color='k')
if cutoffs is not None:
i = 1
for cutoff in cutoffs:
plt.axhline(cutoff, linestyle='-', color=colors[i])
i += 1
ax.set_xscale('log')
ax.set_yscale('log')
if (axislabels is not None):
ax.set_xlabel(axislabels[0], fontsize=20)
ax.set_ylabel(axislabels[1], rotation='horizontal', fontsize=20)
if (title is not None):
ax.set_title(title)
if (legendlabels is not None):
msg = "Must provide a legend label for each filename"
assert (len(legendlabels) >= len(coeffs)), msg
ax.legend(legendlabels, numpoints=1)
ax.set_ylim(ylim)
def plot_pce_coefficients(ax, pces, ylim=[1e-6, 1], qoi_idx=0):
"""
Plot the coefficients of multiple
:class:`pyapprox.polynomial_chaos.multivariate_polynomials.PolynomialChaosExpansion`
"""
coeffs = []
breaks = []
indices_list = []
max_num_indices = 0
for pce in pces:
# only plot coeff that will fit inside axis limits
coeff, indices, degree_breaks = get_coefficients_for_plotting(
pce, qoi_idx)
coeffs.append(coeff)
indices_list.append(indices)
breaks.append(degree_breaks)
max_num_indices = max(max_num_indices, indices.shape[1])
for ii in range(len(indices_list)):
nn = indices.shape[1]
if (nn < max_num_indices):
indices_list[ii] += [None]*(max_num_indices-nn)
coeffs[ii] = np.resize(coeffs[ii], max_num_indices)
coeffs[ii][nn:] = 0.
plot_unsorted_pce_coefficients(
coeffs, indices_list, ax, degree_breaks=breaks[0], ylim=ylim)
def _marginalize_function_1d(fun, variable, quad_degrees, ii, samples_ii,
qoi=0):
assert samples_ii.ndim == 1
all_variables = variable.marginals()
sub_variable = IndependentMarginalsVariable(
[all_variables[jj]
for jj in range(variable.num_vars()) if ii != jj])
pce = get_polynomial_from_variable(sub_variable)
xquad, wquad = get_tensor_product_quadrature_rule_from_pce(
pce, quad_degrees)
samples = np.empty((xquad.shape[0]+1, xquad.shape[1]))
indices = np.delete(np.arange(variable.num_vars()), ii)
samples[indices, :] = xquad
values = np.empty_like(samples_ii)
for jj in range(samples_ii.shape[0]):
samples[ii, :] = samples_ii[jj]
# print(jj, samples.shape, samples_ii.shape)
values_jj = fun(samples)[:, qoi]
values[jj] = values_jj.dot(wquad)
return values
def _marginalize_function_nd(fun, variable, quad_degrees, sub_indices,
sub_samples, qoi=0):
assert sub_samples.ndim == 2
assert sub_indices.shape[0] == sub_samples.shape[0]
assert quad_degrees.shape[0] == variable.num_vars() - sub_indices.shape[0]
if sub_indices.shape[0] == variable.num_vars():
return fun(sub_samples)[:, qoi]
all_variables = variable.marginals()
sub_variable = IndependentMarginalsVariable(
[all_variables[kk]
for kk in range(variable.num_vars()) if kk not in sub_indices])
pce = get_polynomial_from_variable(sub_variable)
xquad, wquad = get_tensor_product_quadrature_rule_from_pce(
pce, quad_degrees)
samples = np.empty((variable.num_vars(), xquad.shape[1]))
indices = np.delete(np.arange(variable.num_vars()), sub_indices)
samples[indices, :] = xquad
nsamples = sub_samples.shape[1]
values = np.empty((nsamples, 1))
for kk in range(nsamples):
samples[sub_indices, :] = sub_samples[:, kk:kk+1]
values_jj = fun(samples)[:, qoi]
values[kk, 0] = values_jj.dot(wquad)
return values