import copy
import numpy as np
from functools import partial
from scipy.optimize import OptimizeResult
from scipy.linalg import LinAlgWarning
from sklearn.linear_model import (
LassoCV, LassoLarsCV, LarsCV, OrthogonalMatchingPursuitCV, Lasso,
LassoLars, Lars, OrthogonalMatchingPursuit
)
from sklearn.exceptions import ConvergenceWarning
from sklearn.utils._testing import ignore_warnings
from sklearn.linear_model._base import LinearModel
from pyapprox.util.utilities import hash_array
from pyapprox.surrogates.interp.indexing import (
get_forward_neighbor, get_backward_neighbor,
compute_hyperbolic_indices
)
from pyapprox.variables.sampling import (
generate_independent_random_samples
)
from pyapprox.surrogates.polychaos.adaptive_polynomial_chaos import (
AdaptiveLejaPCE, variance_pce_refinement_indicator, AdaptiveInducedPCE
)
from pyapprox.surrogates.orthopoly.leja_sequences import christoffel_weights
from pyapprox.variables.marginals import is_bounded_continuous_variable
from pyapprox.surrogates.interp.adaptive_sparse_grid import (
variance_refinement_indicator,
CombinationSparseGrid, constant_increment_growth_rule,
get_sparse_grid_univariate_leja_quadrature_rules_economical,
max_level_admissibility_function, get_unique_max_level_1d,
get_unique_quadrule_variables
)
from pyapprox.variables.joint import IndependentMarginalsVariable
from pyapprox.variables.transforms import (
AffineTransform
)
from pyapprox.expdesign.low_discrepancy_sequences import halton_sequence
from pyapprox.surrogates.gaussianprocess.gaussian_process import (
AdaptiveGaussianProcess, CholeskySampler, GaussianProcess
)
from pyapprox.surrogates.gaussianprocess.kernels import (
Matern, WhiteKernel, ConstantKernel)
from pyapprox.surrogates.polychaos.gpc import (
PolynomialChaosExpansion, define_poly_options_from_variable_transformation
)
from pyapprox.surrogates.neural_networks import NeuralNetwork
class ApproximateResult(OptimizeResult):
pass
def adaptive_approximate_sparse_grid(
fun, variables, callback=None,
refinement_indicator=variance_refinement_indicator,
univariate_quad_rule_info=None, max_nsamples=100, tol=0, verbose=0,
config_variables_idx=None, config_var_trans=None, cost_function=None,
max_level_1d=None, max_level=None, basis_type="barycentric"):
r"""
Compute a sparse grid approximation of a function.
Parameters
----------
fun : callable
The function to be approximated
``fun(z) -> np.ndarray``
where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
output is a 2D np.ndarray with shape (nsamples,nqoi)
variables : IndependentMarginalsVariable
A set of independent univariate random variables
callback : callable
Function called after each iteration with the signature
``callback(approx_k)``
where approx_k is the current approximation object.
refinement_indicator : callable
A function that retuns an estimate of the error of a sparse grid
subspace with signature
``refinement_indicator(subspace_index,nnew_subspace_samples,sparse_grid) -> float, float``
where ``subspace_index`` is 1D np.ndarray of size (nvars),
``nnew_subspace_samples`` is an integer specifying the number
of new samples that will be added to the sparse grid by adding the
subspace specified by subspace_index and ``sparse_grid`` is the current
:class:`pyapprox.adaptive_sparse_grid.CombinationSparseGrid` object.
The two outputs are, respectively, the indicator used to control
refinement of the sparse grid and the change in error from adding the
current subspace. The indicator is typically but now always dependent
on the error.
univariate_quad_rule_info : list
List containing four entries. The first entry is a list
(or single callable) of univariate quadrature rules for each variable
with signature
``quad_rule(l)->np.ndarray,np.ndarray``
where the integer ``l`` specifies the level of the quadrature rule and
``x`` and ``w`` are 1D np.ndarray of size (nsamples) containing the
quadrature rule points and weights, respectively.
The second entry is a list (or single callable) of growth rules
with signature
``growth_rule(l)->integer``
where the output ``nsamples`` specifies the number of samples in the
quadrature rule of level ``l``.
If either entry is a callable then the same quad or growth rule is
applied to every variable.
The third entry is a list of np.ndarray (or single scalar) specifying
the variable dimensions that each unique quadrature rule is applied to.
The forth entry is a list which specifies the maximum level of each
unique quadrature rule. If None then max_level is assumed to be np.inf
for each quadrature rule. If a scalar then the same value is applied
to all quadrature rules. This entry is useful for certain quadrature
rules, e.g. Gauss Patterson, or Leja sequences for bounded discrete
variables where there is a limit on the number of levels that can be
used
max_nsamples : float
If ``cost_function==None`` then this argument is the maximum number of
evaluations of fun. If fun has configure variables
If ``cost_function!=None`` Then max_nsamples is the maximum cost of
constructing the sparse grid, i.e. the sum of the cost of evaluating
each point in the sparse grid.
The ``cost_function!=None` same behavior as ``cost_function==None``
can be obtained by setting cost_function = lambda config_sample: 1.
This is particularly useful if ``fun`` has configure variables
and evaluating ``fun`` at these different values of these configure
variables has different cost. For example if there is one configure
variable that can take two different values with cost 0.5, and 1
then 10 samples of both models will be measured as 15 samples and
so if max_nsamples is 19 the algorithm will not terminate because
even though the number of samples is the sparse grid is 20.
tol : float
Tolerance for termination. The construction of the sparse grid is
terminated when the estimate error in the sparse grid (determined by
``refinement_indicator`` is below tol.
verbose : integer
Controls messages printed during construction.
config_variable_idx : integer
The position in a sample array that the configure variables start
config_var_trans : pyapprox.adaptive_sparse_grid.ConfigureVariableTransformation
An object that takes configure indices in [0,1,2,3...]
and maps them to the configure values accepted by ``fun``
cost_function : callable
A function with signature
``cost_function(config_sample) -> float``
where config_sample is a np.ndarray of shape (nconfig_vars). The output
is the cost of evaluating ``fun`` at ``config_sample``. The units can
be anything but the units must be consistent with the units of
max_nsamples which specifies the maximum cost of constructing the
sparse grid.
max_level_1d : np.ndarray (nvars)
The maximum level of the sparse grid in each dimension. If None
There is no limit
max_level : integer
The maximum level l of the sparse grid. Only subspaces with indices
i that satisfy : math:`\lvert i \rvert_1\le l` can be added. If None
l=np.inf
basis_type : string (default="barycentric")
Specify the basis type to use. Currently the same basis must be used
for all dimensions. Options "barycentric", "linear", "quadratic"
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx : :class:`pyapprox.adaptive_sparse_grid.CombinationSparseGrid`
The sparse grid approximation
"""
var_trans = AffineTransform(variables)
nvars = var_trans.num_vars()
if config_var_trans is not None:
nvars += config_var_trans.num_vars()
sparse_grid = CombinationSparseGrid(nvars, basis_type)
if max_level_1d is None:
max_level_1d = [np.inf]*nvars
elif np.isscalar(max_level_1d):
max_level_1d = [max_level_1d]*nvars
if max_level is None:
max_level = np.inf
if univariate_quad_rule_info is None:
quad_rules, growth_rules, unique_quadrule_indices, \
unique_max_level_1d = \
get_sparse_grid_univariate_leja_quadrature_rules_economical(
var_trans, method='pdf')
# Some quadrature rules have max_level enforce this here
for ii in range(len(unique_quadrule_indices)):
for ind in unique_quadrule_indices[ii]:
max_level_1d[ind] = min(
max_level_1d[ind], unique_max_level_1d[ii])
else:
quad_rules, growth_rules, unique_quadrule_indices, \
unique_max_level_1d = univariate_quad_rule_info
if unique_max_level_1d is None:
max_level_1d = np.minimum([np.inf]*nvars, max_level_1d)
elif np.isscalar(unique_max_level_1d):
max_level_1d = np.minimum(
[unique_max_level_1d]*nvars, max_level_1d)
else:
nunique_vars = len(quad_rules)
assert len(unique_max_level_1d) == nunique_vars
for ii in range(nunique_vars):
for jj in unique_quadrule_indices[ii]:
max_level_1d[jj] = np.minimum(
unique_max_level_1d[ii], max_level_1d[jj])
if config_var_trans is not None:
if max_level_1d is None:
msg = "max_level_1d must be set if config_var_trans is provided"
#raise ValueError(msg)
for ii, cv in enumerate(config_var_trans.config_values):
if len(cv) <= max_level_1d[config_variables_idx+ii]:
msg = f"maxlevel_1d {max_level_1d} and "
msg += "config_var_trans.config_values with shapes {0}".format(
[len(v) for v in config_var_trans.config_values])
msg += " are inconsistent."
raise ValueError(msg)
assert len(max_level_1d) == nvars
# todo change np.inf to argument that is passed into approximate
admissibility_function = partial(
max_level_admissibility_function, max_level, max_level_1d, max_nsamples,
tol, verbose=verbose)
sparse_grid.setup(
fun, config_variables_idx, refinement_indicator,
admissibility_function, growth_rules, quad_rules,
var_trans, unique_quadrule_indices=unique_quadrule_indices,
verbose=verbose, cost_function=cost_function,
config_var_trans=config_var_trans)
sparse_grid.build(callback)
return ApproximateResult({'approx': sparse_grid})
def adaptive_approximate_polynomial_chaos(
fun, variable, method="leja", options={}):
methods = {"leja": adaptive_approximate_polynomial_chaos_leja,
"induced": adaptive_approximate_polynomial_chaos_induced}
# "random": adaptive_approximate_polynomial_chaos_random}
if method not in methods:
msg = f'Method {method} not found.\n Available methods are:\n'
for key in methods.keys():
msg += f"\t{key}\n"
raise Exception(msg)
if options is None:
options = {}
return methods[method](fun, variable, **options)
def __initialize_leja_pce(
variables, generate_candidate_samples, ncandidate_samples):
for rv in variables.marginals():
if not is_bounded_continuous_variable(rv):
msg = "For now leja sampling based PCE is only supported for "
msg += " bounded continouous random variables when"
msg += " generate_candidate_samples is not provided."
if generate_candidate_samples is None:
raise Exception(msg)
else:
break
nvars = variables.num_vars()
if generate_candidate_samples is None:
# Todo implement default for non-bounded variables that uses induced
# sampling
# candidate samples must be in canonical domain
candidate_samples = -np.cos(
np.pi*halton_sequence(nvars, int(ncandidate_samples), 1))
# candidate_samples = -np.cos(
# np.random.uniform(0,np.pi,(nvars,int(ncandidate_samples))))
else:
candidate_samples = generate_candidate_samples(ncandidate_samples)
pce = AdaptiveLejaPCE(
nvars, candidate_samples, factorization_type='fast')
return pce
def __setup_adaptive_pce(pce, verbose, fun, var_trans, growth_rules,
refinement_indicator, tol, max_nsamples, callback,
max_level_1d):
pce.verbose = verbose
pce.set_function(fun, var_trans)
if growth_rules is None:
growth_incr = 2
growth_rules = partial(constant_increment_growth_rule, growth_incr)
assert callable(growth_rules)
unique_quadrule_variables, unique_quadrule_indices = \
get_unique_quadrule_variables(var_trans)
growth_rules = [growth_rules]*len(unique_quadrule_indices)
admissibility_function = None # provide after growth_rules have been added
pce.set_refinement_functions(
refinement_indicator, admissibility_function, growth_rules,
unique_quadrule_indices=unique_quadrule_indices)
nvars = var_trans.num_vars()
if max_level_1d is None:
max_level_1d = [np.inf]*nvars
elif np.isscalar(max_level_1d):
max_level_1d = [max_level_1d]*nvars
unique_max_level_1d = get_unique_max_level_1d(
var_trans, pce.compact_univariate_growth_rule)
nunique_vars = len(unique_quadrule_indices)
assert len(unique_max_level_1d) == nunique_vars
for ii in range(nunique_vars):
for jj in unique_quadrule_indices[ii]:
max_level_1d[jj] = np.minimum(
unique_max_level_1d[ii], max_level_1d[jj])
admissibility_function = partial(
max_level_admissibility_function, np.inf, max_level_1d, max_nsamples,
tol, verbose=verbose)
pce.admissibility_function = admissibility_function
def adaptive_approximate_polynomial_chaos_induced(
fun, variables,
callback=None,
refinement_indicator=variance_pce_refinement_indicator,
growth_rules=None, max_nsamples=100, tol=0, verbose=0,
max_level_1d=None, induced_sampling=True, cond_tol=1e6,
fit_opts={'omp_tol': 0}):
r"""
Compute an adaptive Polynomial Chaos Expansion of a function based upon
random induced or probability measure sampling.
Parameters
----------
fun : callable
The function to be minimized
``fun(z) -> np.ndarray``
where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
output is a 2D np.ndarray with shape (nsamples,nqoi)
variables : IndependentMarginalsVariable
A set of independent univariate random variables
callback : callable
Function called after each iteration with the signature
``callback(approx_k)``
where approx_k is the current approximation object.
refinement_indicator : callable
A function that retuns an estimate of the error of a sparse grid
subspace with signature
``refinement_indicator(subspace_index,nnew_subspace_samples,sparse_grid) -> float, float``
where ``subspace_index`` is 1D np.ndarray of size (nvars),
``nnew_subspace_samples`` is an integer specifying the number
of new samples that will be added to the sparse grid by adding the
subspace specified by subspace_index and ``sparse_grid`` is the current
:class:`pyapprox.adaptive_sparse_grid.CombinationSparseGrid` object.
The two outputs are, respectively, the indicator used to control
refinement of the sparse grid and the change in error from adding the
current subspace. The indicator is typically but now always dependent
on the error.
growth_rules : list or callable
a list (or single callable) of growth rules with signature
``growth_rule(l)->integer``
where the output ``nsamples`` specifies the number of indices of the
univariate basis of level ``l``.
If the entry is a callable then the same growth rule is
applied to every variable.
max_nsamples : integer
The maximum number of evaluations of fun.
tol : float
Tolerance for termination. The construction of the sparse grid is
terminated when the estimate error in the sparse grid (determined by
``refinement_indicator`` is below tol.
verbose : integer
Controls messages printed during construction.
max_level_1d : np.ndarray (nvars)
The maximum level of the sparse grid in each dimension. If None
There is no limit
induced_sampling : boolean
True - use induced sampling
False - sample from probability measure
cond_tol : float
The maximum allowable condition number of the regression problem.
If induced_sampling is False and cond_tol < 0 then we do not sample
until cond number is below cond_tol but rather simply add
nnew_indices*abs(cond_tol) samples. That is we specify an
over sampling factor
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx: :class:`pyapprox.surrogates.polychaos.gpc.PolynomialChaosExpansion`
The PCE approximation
"""
var_trans = AffineTransform(variables)
pce = AdaptiveInducedPCE(
var_trans.num_vars(), induced_sampling=induced_sampling,
cond_tol=cond_tol, fit_opts=fit_opts)
__setup_adaptive_pce(pce, verbose, fun, var_trans, growth_rules,
refinement_indicator, tol, max_nsamples, callback,
max_level_1d)
pce.build(callback)
return ApproximateResult({'approx': pce})
def adaptive_approximate_polynomial_chaos_leja(
fun, variables,
callback=None,
refinement_indicator=variance_pce_refinement_indicator,
growth_rules=None, max_nsamples=100, tol=0, verbose=0,
max_level_1d=None,
ncandidate_samples=1e4, generate_candidate_samples=None):
r"""
Compute an adaptive Polynomial Chaos Expansion of a function based upon
multivariate Leja sequences.
Parameters
----------
fun : callable
The function to be minimized
``fun(z) -> np.ndarray``
where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
output is a 2D np.ndarray with shape (nsamples,nqoi)
variables : IndependentMarginalsVariable
A set of independent univariate random variables
callback : callable
Function called after each iteration with the signature
``callback(approx_k)``
where approx_k is the current approximation object.
refinement_indicator : callable
A function that retuns an estimate of the error of a sparse grid
subspace with signature
``refinement_indicator(subspace_index,nnew_subspace_samples,sparse_grid) -> float, float``
where ``subspace_index`` is 1D np.ndarray of size (nvars),
``nnew_subspace_samples`` is an integer specifying the number
of new samples that will be added to the sparse grid by adding the
subspace specified by subspace_index and ``sparse_grid`` is the current
:class:`pyapprox.adaptive_sparse_grid.CombinationSparseGrid` object.
The two outputs are, respectively, the indicator used to control
refinement of the sparse grid and the change in error from adding the
current subspace. The indicator is typically but now always dependent
on the error.
growth_rules : list or callable
a list (or single callable) of growth rules with signature
``growth_rule(l)->integer``
where the output ``nsamples`` specifies the number of indices of the
univariate basis of level ``l``.
If the entry is a callable then the same growth rule is
applied to every variable.
max_nsamples : integer
The maximum number of evaluations of fun.
tol : float
Tolerance for termination. The construction of the sparse grid is
terminated when the estimate error in the sparse grid (determined by
``refinement_indicator`` is below tol.
verbose : integer
Controls messages printed during construction.
max_level_1d : np.ndarray (nvars)
The maximum level of the sparse grid in each dimension. If None
There is no limit
ncandidate_samples : integer
The number of candidate samples used to generate the Leja sequence
The Leja sequence will be a subset of these samples.
generate_candidate_samples : callable
A function that generates the candidate samples used to build the Leja
sequence with signature
``generate_candidate_samples(ncandidate_samples) -> np.ndarray``
The output is a 2D np.ndarray with size(nvars,ncandidate_samples)
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx: :class:`pyapprox.surrogates.polychaos.gpc.PolynomialChaosExpansion`
The PCE approximation
"""
var_trans = AffineTransform(variables)
pce = __initialize_leja_pce(
variables, generate_candidate_samples, ncandidate_samples)
__setup_adaptive_pce(pce, verbose, fun, var_trans, growth_rules,
refinement_indicator, tol, max_nsamples, callback,
max_level_1d)
pce.build(callback)
return ApproximateResult({'approx': pce})
def adaptive_approximate_gaussian_process(
fun, variable, callback=None,
max_nsamples=100, verbose=0, ncandidate_samples=1e4,
checkpoints=None, nu=np.inf, n_restarts_optimizer=1,
normalize_y=False, alpha=0,
noise_level=None, noise_level_bounds='fixed',
kernel_variance=None,
kernel_variance_bounds='fixed',
length_scale=1,
length_scale_bounds=(1e-2, 10),
generate_candidate_samples=None,
weight_function=None,
normalize_inputs=False):
r"""
Adaptively construct a Gaussian process approximation of a function using
weighted-pivoted-Cholesky sampling and the Matern kernel
.. math::
k(z_i, z_j) = \frac{1}{\Gamma(\nu)2^{\nu-1}}\Bigg(
\frac{\sqrt{2\nu}}{l} \lVert z_i - z_j \rVert_2\Bigg)^\nu K_\nu\Bigg(
\frac{\sqrt{2\nu}}{l} \lVert z_i - z_j \rVert_2\Bigg)
where :math:`\lVert \cdot \rVert_2` is the Euclidean distance,
:math:`\Gamma(\cdot)` is the gamma function, :math:`K_\nu(\cdot)` is the
modified Bessel function.
Starting from an initial guess, the algorithm learns the kernel length
scale as more training data is collected.
Parameters
----------
fun : callable
The function to be approximated
``fun(z) -> np.ndarray``
where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
output is a 2D np.ndarray with shape (nsamples,nqoi)
variable : IndependentMarginalsVariable
A set of independent univariate random variables
callback : callable
Function called after each iteration with the signature
``callback(approx_k)``
where approx_k is the current approximation object.
nu : string
The parameter :math:`\nu` of the Matern kernel. When :math:`\nu\to\inf`
the Matern kernel is equivalent to the squared-exponential kernel.
checkpoints : iterable
The set of points at which the length scale of the kernel will be
recomputed and new training data obtained. If None then
``checkpoints = np.linspace(10, max_nsamples, 10).astype(int)``
max_nsamples : float
The maximum number of evaluations of fun. If fun has configure
variables.
ncandidate_samples : integer
The number of candidate samples used to select the training samples
The final training samples will be a subset of these samples.
alpha : float
Nugget added to diagonal of the covariance kernel evaluated at
the training data. Used to improve numerical conditionining. This
parameter is different to noise_level which applies to both training
and test data
normalize_y : bool
True - normalize the training values to have zero mean and unit
variance
length_scale : float
The initial length scale used to generate the first batch of training
samples
length_scale_bounds : tuple (2)
The lower and upper bound on length_scale used in optimization of
the Gaussian process hyper-parameters
noise_level : float
The noise_level used when training the GP
noise_level_bounds : tuple (2)
The lower and upper bound on noise_level used in optimization of
the Gaussian process hyper-parameters
kernel_variance : float
The kernel_variance used when training the GP
noise_level_bounds : tuple (2)
The lower and upper bound on kernel_variance used in optimization of
the Gaussian process hyper-parameters
n_restarts_optimizer : int
The number of local optimizeation problems solved to find the
GP hyper-parameters
verbose : integer
Controls the amount of information printed to screen
generate_candidate_samples : callable
A function that generates the candidate samples used to build the Leja
sequence with signature
``generate_candidate_samples(ncandidate_samples) -> np.ndarray``
The output is a 2D np.ndarray with size(nvars,ncandidate_samples)
weight_function : callable
Function used to precondition kernel with the signature
``weight_function(samples) -> np.ndarray (num_samples)``
where samples is a np.ndarray (num_vars,num_samples)
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx : :class:`pyapprox.surrogates.gaussianprocess.gaussian_process.AdaptiveGaussianProcess`
The Gaussian process
"""
assert max_nsamples <= ncandidate_samples
nvars = variable.num_vars()
if normalize_inputs:
var_trans = AffineTransform(variable)
else:
var_trans = None
if normalize_y:
raise ValueError("normalize_y=True not currently supported")
kernel = __setup_gaussian_process_kernel(
nvars, length_scale, length_scale_bounds,
kernel_variance, kernel_variance_bounds,
noise_level, noise_level_bounds, nu)
sampler = CholeskySampler(
nvars, ncandidate_samples, variable,
gen_candidate_samples=generate_candidate_samples,
var_trans=var_trans)
sampler_kernel = copy.deepcopy(kernel)
sampler.set_kernel(sampler_kernel)
sampler.set_weight_function(weight_function)
gp = AdaptiveGaussianProcess(
kernel, n_restarts_optimizer=n_restarts_optimizer, alpha=alpha,
normalize_y=normalize_y)
gp.setup(fun, sampler)
if var_trans is not None:
gp.set_variable_transformation(var_trans)
if checkpoints is None:
nsteps = 10
if max_nsamples-10 < nsteps:
nsteps = max_nsamples-10
checkpoints = np.linspace(10, max_nsamples, nsteps).astype(int)
checkpoints = np.unique(checkpoints.astype(int))
assert checkpoints[-1] <= max_nsamples
nsteps = len(checkpoints)
for kk in range(nsteps):
chol_flag = gp.refine(checkpoints[kk])
gp.sampler.set_kernel(copy.deepcopy(gp.kernel_))
if callback is not None:
callback(gp)
if chol_flag != 0:
msg = "Cannot add additional samples. "
msg += "Kernel is now ill conditioned. "
msg += 'If more samples are really required increase alpha or '
msg += 'manually fix kernel_length to a smaller value'
print('Exiting: ' + msg)
# print(gp.kernel_)
# print(np.linalg.norm(gp.sampler.candidate_samples))
break
return ApproximateResult({'approx': gp})
def compute_l2_error(f, g, variable, nsamples, rel=False):
r"""
Compute the :math:`\ell^2` error of the output of two functions f and g,
i.e.
.. math:: \lVert f(z)-g(z)\rVert\approx \sum_{m=1}^M f(z^{(m)})
from a set of random draws :math:`\mathcal{Z}=\{z^{(m)}\}_{m=1}^M`
from the PDF of :math:`z`.
Parameters
----------
f : callable
Function with signature
``g(z) -> np.ndarray``
where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
output is a 2D np.ndarray with shaoe (nsamples,nqoi)
g : callable
Function with signature
``f(z) -> np.ndarray``
where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
output is a 2D np.ndarray with shaoe (nsamples,nqoi)
variable : pya.IndependentMarginalsVariable
Object containing information of the joint density of the inputs z.
This is used to generate random samples from this join density
nsamples : integer
The number of samples used to compute the :math:`\ell^2` error
rel : boolean
True - compute relative error
False - compute absolute error
Returns
-------
error : np.ndarray (nqoi)
"""
validation_samples = generate_independent_random_samples(
variable, nsamples)
validation_vals = f(validation_samples)
approx_vals = g(validation_samples)
assert validation_vals.shape == approx_vals.shape
error = np.linalg.norm(approx_vals-validation_vals, axis=0)
if not rel:
error /= np.sqrt(validation_samples.shape[1])
else:
error /= np.linalg.norm(validation_vals, axis=0)
return error
[docs]def adaptive_approximate(fun, variable, method, options=None):
r"""
Adaptive approximation of a scalar or vector-valued function of one or
more variables. These methods choose the samples to at which to
evaluate the function being approximated.
Parameters
----------
fun : callable
The function to be minimized
``fun(z) -> np.ndarray``
where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
output is a 2D np.ndarray with shaoe (nsamples,nqoi)
method : string
Type of approximation. Should be one of
- 'sparse_grid'
- 'polynomial_chaos'
- 'gaussian_process'
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object. For more details see
- :func:`pyapprox.surrogates.approximate.adaptive_approximate_sparse_grid`
- :func:`pyapprox.surrogates.approximate.adaptive_approximate_polynomial_chaos`
- :func:`pyapprox.surrogates.approximate.adaptive_approximate_gaussian_process`
"""
methods = {'sparse_grid': adaptive_approximate_sparse_grid,
'polynomial_chaos': adaptive_approximate_polynomial_chaos,
'gaussian_process': adaptive_approximate_gaussian_process}
if type(variable) != IndependentMarginalsVariable:
variable = IndependentMarginalsVariable(
variable)
if method not in methods:
msg = f'Method {method} not found.\n Available methods are:\n'
for key in methods.keys():
msg += f"\t{key}\n"
raise Exception(msg)
if options is None:
options = {}
return methods[method](fun, variable, **options)
def approximate_polynomial_chaos(train_samples, train_vals, verbosity=0,
basis_type='expanding_basis',
variable=None, options=None, poly_opts=None):
r"""
Compute a Polynomial Chaos Expansion of a function from a fixed data set.
Parameters
----------
train_samples : np.ndarray (nvars,nsamples)
The inputs of the function used to train the approximation
train_vals : np.ndarray (nvars,nsamples)
The values of the function at ``train_samples``
basis_type : string
Type of approximation. Should be one of
- 'hyperbolic_cross' see :func:`pyapprox.surrogates.approximate.cross_validate_pce_degree`
- 'expanding_basis' see :func:`pyapprox.surrogates.approximate.expanding_basis_pce`
- 'fixed' see :func:`pyapprox.surrogates.approximate.approximate_fixed_pce`
variable : pya.IndependentMarginalsVariable
Object containing information of the joint density of the inputs z.
This is used to generate random samples from this join density
verbosity : integer
Controls the amount of information printed to screen
poly_opts : dictionary
Dictionary definining the custom configuration of the polynomial
chaos expansion basis. See :func:`pyapprox.surrogates.polychaos.gpc.PolynomialChaosExpansion.configure`
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object. For more details see
- :func:`pyapprox.surrogates.approximate.cross_validate_pce_degree`
- :func:`pyapprox.surrogates.approximate.expanding_basis_pce`
- :func:`pyapprox.surrogates.approximate.approximate_fixed_pce`
"""
funcs = {'expanding_basis': expanding_basis_pce,
'hyperbolic_cross': cross_validate_pce_degree,
'fixed': approximate_fixed_pce}
if variable is None:
msg = 'pce requires that variable be defined'
raise Exception(msg)
if basis_type not in funcs:
msg = f'Basis type {basis_type} not found.\n Available types are:\n'
for key in funcs.keys():
msg += f"\t{key}\n"
raise Exception(msg)
poly = PolynomialChaosExpansion()
if poly_opts is None:
var_trans = AffineTransform(variable)
poly_opts = define_poly_options_from_variable_transformation(
var_trans)
poly.configure(poly_opts)
if options is None:
options = {}
res = funcs[basis_type](poly, train_samples, train_vals, **options)
return res
def approximate_neural_network(train_samples, train_vals,
network_opts, verbosity=0,
variable=None, optimizer_opts=None, x0=None):
print(network_opts)
network = NeuralNetwork(network_opts)
if x0 is None:
nrestarts = 10
x0 = np.random.uniform(-1, 2, (network.nparams, nrestarts))
if optimizer_opts is None:
optimizer_opts = {"method": "L-BFGS-B",
"options": {"maxiter": 1000}}
network.fit(train_samples, train_vals, x0, verbose=verbosity,
opts=optimizer_opts)
return ApproximateResult({'approx': network})
[docs]def approximate(train_samples, train_vals, method, options=None):
r"""
Approximate a scalar or vector-valued function of one or
more variables from a set of points provided by the user
Parameters
----------
train_samples : np.ndarray (nvars,nsamples)
The inputs of the function used to train the approximation
train_vals : np.ndarray (nvars,nsamples)
The values of the function at ``train_samples``
method : string
Type of approximation. Should be one of
- 'polynomial_chaos'
- 'gaussian_process'
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
"""
methods = {'polynomial_chaos': approximate_polynomial_chaos,
'gaussian_process': approximate_gaussian_process,
'neural_network': approximate_neural_network}
# 'tensor-train':approximate_tensor_train,
if method not in methods:
msg = f'Method {method} not found.\n Available methods are:\n'
for key in methods.keys():
msg += f"\t{key}\n"
raise Exception(msg)
if options is None:
options = {}
return methods[method](train_samples, train_vals, **options)
class LinearLeastSquares(LinearModel):
def __init__(self, alpha=0.):
self.alpha = alpha
def fit(self, X, y):
gram_mat = X.T.dot(X)
gram_mat += self.alpha*np.eye(gram_mat.shape[0])
self.coef_ = np.linalg.lstsq(
gram_mat, X.T.dot(y), rcond=None)[0]
# Do not current support fit_intercept = True
self.intercept_ = 0
return self
class LinearLeastSquaresCV(LinearModel):
"""
Parameters
----------
- None, to use the default 5-fold cross-validation
- integer to specify the number of folds
"""
def __init__(self, alphas=[0.], cv=None, random_folds=True):
if cv is None:
# sklearn RidgeCV can only be applied with leave one out cross
# validation. Use this as the default here
cv = 1
self.cv = cv
self.alphas = alphas
self.random_folds = random_folds
def fit(self, X, y):
if y.ndim == 1:
y = y[:, None]
assert y.shape[1] == 1
if self.cv != y.shape[0]:
fold_sample_indices = get_random_k_fold_sample_indices(
X.shape[0], self.cv, self.random_folds)
results = [leave_many_out_lsq_cross_validation(
X, y, fold_sample_indices, alpha) for alpha in self.alphas]
else:
results = [leave_one_out_lsq_cross_validation(
X, y, alpha) for alpha in self.alphas]
cv_scores = [r[1] for r in results]
ii_best_alpha = np.argmin(cv_scores)
self.cv_score_ = cv_scores[ii_best_alpha][0]
self.alpha_ = self.alphas[ii_best_alpha]
self.coef_ = results[ii_best_alpha][2]
# Do not current support fit_intercept = True
self.intercept_ = 0
return self
@ignore_warnings(category=ConvergenceWarning)
@ignore_warnings(category=LinAlgWarning)
@ignore_warnings(category=RuntimeWarning)
def fit_linear_model(basis_matrix, train_vals, solver_type, **kwargs):
# verbose=1 will display lars path on entire data setUp
# verbose>1 will also show this plus paths on each cross validation set
solvers = {'lasso': [LassoLarsCV, LassoLars],
'lasso_grad': [LassoCV, Lasso],
'lars': [LarsCV, Lars],
'omp': [OrthogonalMatchingPursuitCV, OrthogonalMatchingPursuit],
'lstsq': [LinearLeastSquaresCV, LinearLeastSquares]}
if solver_type not in solvers:
msg = f'Solver type {solver_type} not supported\n'
msg += 'Supported solvers are:\n'
for key in solvers.keys():
msg += f'\t{key}\n'
raise Exception(msg)
if solver_type == 'lars':
msg = 'Currently lars does not exit when alpha starts to grow '
msg += 'this causes problems with cross validation. The lasso variant '
msg += 'lars does work because this exit condition is implemented'
raise Exception(msg)
assert train_vals.ndim == 2
assert train_vals.shape[1] == 1
# The following comment and two conditional statements are only true
# for lars which I have switched off.
# cv interpolates each residual onto a common set of alphas
# This is problematic if the alpha path is not monotonically decreasing
# For some problems alpha will increase for last few sample sizes. This
# messes up interpolation and causes the best_alpha to be estimated
# very poorly in some cases. I belive all_alphas = np.unique(all_alphas)
# is the culprit. To avoid the aforementioned issue set max_iter to
# ntrain_samples//2 This is typically stops the algorithm after
# what would have been chosen as the best_alpha but before
# alphas start increasing. Ideally sklearn should exit when
# alphas increase.
# if solver_type != 'lstsq' and 'max_iter' not in kwargs:
# kwargs['max_iter'] = basis_matrix.shape[0]//2
# if 'max_iter' in kwargs and kwargs['max_iter'] > basis_matrix.shape[0]//2:
# msg = "Warning: max_iter is set large this can effect not just "
# msg += "Computational cost but also final accuracy"
# print(msg)
if solver_type == 'omp' and 'max_iter' in kwargs:
# unlike lasso/lars max iter is not allowed to be greater than
# number of columns (features/bases)
kwargs['max_iter'] = min(kwargs['max_iter'], basis_matrix.shape[1])
# for omp to work sklean must be patched to store mse_path_.
# Add the line self.mse_path_ = mse_folds.T
# as the last line (913) before return self in the function fit of
# OrthogonalMatchingPursuitCV in
# site-packages/sklearn/linear_model/_omp.py
if 'cv' not in kwargs or kwargs['cv'] is False:
solver_idx = 1
else:
solver_idx = 0
if kwargs.get('precondition', False):
weights = np.sqrt(christoffel_weights(basis_matrix))[:, None]
basis_matrix = basis_matrix.copy()*weights
train_vals = train_vals.copy()*weights
if 'precondition' in kwargs:
del kwargs['precondition']
fit = solvers[solver_type][solver_idx](**kwargs).fit
res = fit(basis_matrix, train_vals[:, 0])
coef = res.coef_
if coef.ndim == 1:
coef = coef[:, np.newaxis]
# some methods allow fit_intercept to be set. If True this method
# extracts of mean of data before computing coefficients.
# res.predict then makes predictions as X.dot(coef_) + res.intercept_
# I assume first coefficient is constant basis and want to be able
# to simply return X.dot(coef_) (e.g. as done be Polynomial Chaos
# Expansion). Thus add res.intercept_ to first coefficient, i.e.
coef[0] += res.intercept_
if 'cv' in kwargs:
cv_score = extract_cross_validation_score(res)
best_regularization_param = extract_best_regularization_parameters(res)
else:
cv_score = None
best_regularization_param = None
return coef, cv_score, best_regularization_param
def extract_best_regularization_parameters(res):
if (type(res) == LassoLarsCV or type(res) == LinearLeastSquaresCV):
return res.alpha_
elif type(res) == LarsCV:
assert len(res.n_iter_) == 1
# The Lars (not LarsCV) model takes max_iters as regularization
# parameter so return it here as well as the alpha. Sklearn
# has an inconsistency alpha is used to choose best cv score but Lars
# uses max_iters to stop algorithm early.
return (res.alpha_, res.n_iter_[0])
elif type(res) == OrthogonalMatchingPursuitCV:
return res.n_nonzero_coefs_
else:
raise Exception()
def extract_cross_validation_score(linear_model):
if hasattr(linear_model, 'cv_score_'):
return linear_model.cv_score_
elif hasattr(linear_model, 'mse_path_'):
return np.sqrt(linear_model.mse_path_.mean(axis=-1).min())
else:
raise Exception('attribute mse_path_ not found')
def cross_validate_pce_degree(
pce, train_samples, train_vals, min_degree=1, max_degree=3,
hcross_strength=1, solver_type='lasso', verbose=0,
linear_solver_options={'cv': 10}):
r"""
Use cross validation to find the polynomial degree which best fits the
data.
A polynomial is constructed for each degree and the degree with the highest
cross validation score is returned.
Parameters
----------
train_samples : np.ndarray (nvars,nsamples)
The inputs of the function used to train the approximation
train_vals : np.ndarray (nvars,nsamples)
The values of the function at ``train_samples``
min_degree : integer
The minimum degree to consider
max_degree : integer
The maximum degree to consider.
All degrees in ``range(min_degree,max_deree+1)`` are considered
hcross_strength : float
The strength of the hyperbolic cross index set. hcross_strength must be
in (0,1]. A value of 1 produces total degree polynomials
cv : integer
The number of cross validation folds used to compute the cross
validation error
solver_type : string
The type of regression used to train the polynomial
- 'lasso'
- 'lars'
- 'lasso_grad'
- 'omp'
- 'lstsq'
verbose : integer
Controls the amount of information printed to screen
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx: :class:`pyapprox.surrogates.polychaos.gpc.PolynomialChaosExpansion`
The PCE approximation
scores : np.ndarray (nqoi)
The best cross validation score for each QoI
degrees : np.ndarray (nqoi)
The best degree for each QoI
reg_params : np.ndarray (nqoi)
The best regularization parameters for each QoI chosen by cross
validation.
"""
coefs = []
scores = []
indices = []
degrees = []
reg_params = []
indices_dict = dict()
unique_indices = []
nqoi = train_vals.shape[1]
for ii in range(nqoi):
if verbose > 1:
print(f'Approximating QoI: {ii}')
pce_ii, score_ii, degree_ii, reg_param_ii = _cross_validate_pce_degree(
pce, train_samples, train_vals[:, ii:ii+1], min_degree,
max_degree, hcross_strength, linear_solver_options,
solver_type, verbose)
coefs.append(pce_ii.get_coefficients())
scores.append(score_ii)
indices.append(pce_ii.get_indices())
degrees.append(degree_ii)
reg_params.append(reg_param_ii)
for index in indices[ii].T:
key = hash_array(index)
if key not in indices_dict:
indices_dict[key] = len(unique_indices)
unique_indices.append(index)
unique_indices = np.array(unique_indices).T
all_coefs = np.zeros((unique_indices.shape[1], nqoi))
for ii in range(nqoi):
for jj, index in enumerate(indices[ii].T):
key = hash_array(index)
all_coefs[indices_dict[key], ii] = coefs[ii][jj, 0]
pce.set_indices(unique_indices)
pce.set_coefficients(all_coefs)
return ApproximateResult({'approx': pce, 'scores': np.array(scores),
'degrees': np.array(degrees),
'reg_params': reg_params})
def _cross_validate_pce_degree(
pce, train_samples, train_vals, min_degree=1, max_degree=3,
hcross_strength=1, linear_solver_options={'cv': 10},
solver_type='lasso', verbose=0):
assert train_vals.shape[1] == 1
if min_degree is None:
min_degree = 2
if max_degree is None:
max_degree = np.iinfo(int).max-1
best_coef = None
best_cv_score = np.finfo(np.double).max
best_degree = min_degree
prev_num_terms = 0
if verbose > 0:
print("{:<8} {:<10} {:<18}".format('degree', 'num_terms', 'cv score',))
rng_state = np.random.get_state()
for degree in range(min_degree, max_degree+1):
indices = compute_hyperbolic_indices(
pce.num_vars(), degree, hcross_strength)
pce.set_indices(indices)
if ((pce.num_terms() > 100000) and
(100000-prev_num_terms < pce.num_terms()-100000)):
break
basis_matrix = pce.basis_matrix(train_samples)
# use the same state (thus cross validation folds) for each degree
np.random.set_state(rng_state)
coef, cv_score, reg_param = fit_linear_model(
basis_matrix, train_vals, solver_type, **linear_solver_options)
np.random.set_state(rng_state)
pce.set_coefficients(coef)
if verbose > 0:
print("{:<8} {:<10} {:<18} ".format(
degree, pce.num_terms(), cv_score))
if ((cv_score >= best_cv_score) and (degree-best_degree > 1)):
break
if (cv_score < best_cv_score):
best_cv_score = cv_score
best_coef = coef.copy()
best_degree = degree
best_reg_param = reg_param
prev_num_terms = pce.num_terms()
pce.set_indices(compute_hyperbolic_indices(
pce.num_vars(), best_degree, hcross_strength))
pce.set_coefficients(best_coef)
if verbose > 0:
print('best degree:', best_degree)
return pce, best_cv_score, best_degree, best_reg_param
def restrict_basis(indices, coefficients, tol):
II = np.where(np.absolute(coefficients) > tol)[0]
restricted_indices = indices[:, II]
degrees = indices.sum(axis=0)
JJ = np.where(degrees == 0)[0]
assert JJ.shape[0] == 1
if JJ not in II:
# always include zero degree polynomial in restricted_indices
restricted_indices = np.concatenate(
[indices[:JJ], restricted_indices])
return restricted_indices
def expand_basis(indices):
nvars, nindices = indices.shape
indices_set = set()
for ii in range(nindices):
indices_set.add(hash_array(indices[:, ii]))
new_indices = []
for ii in range(nindices):
index = indices[:, ii]
for dd in range(nvars):
forward_index = get_forward_neighbor(index, dd)
key = hash_array(forward_index)
if key not in indices_set:
admissible = True
active_vars = np.nonzero(forward_index)
for kk in active_vars:
backward_index = get_backward_neighbor(forward_index, kk)
if hash_array(backward_index) not in indices_set:
admissible = False
break
if admissible:
indices_set.add(key)
new_indices.append(forward_index)
return np.asarray(new_indices).T
def expanding_basis_pce(pce, train_samples, train_vals, hcross_strength=1,
verbose=1, max_num_init_terms=None, max_num_terms=None,
solver_type='lasso',
linear_solver_options={'cv': 10},
restriction_tol=np.finfo(float).eps*2,
max_num_expansion_steps_iter=1,
max_iters=20,
max_num_step_increases=1):
r"""
Iteratively expand and restrict the polynomial basis and use
cross validation to find the best basis [JESJCP2015]_
Parameters
----------
train_samples : np.ndarray (nvars,nsamples)
The inputs of the function used to train the approximation
train_vals : np.ndarray (nvars,nqoi)
The values of the function at ``train_samples``
hcross_strength : float
The strength of the hyperbolic cross index set. hcross_strength must be
in (0,1]. A value of 1 produces total degree polynomials
cv : integer
The number of cross validation folds used to compute the cross
validation error
solver_type : string
The type of regression used to train the polynomial
- 'lasso'
- 'lars'
- 'lasso_grad'
- 'omp'
- 'lstsq'
verbose : integer
Controls the amount of information printed to screen
restriction_tol : float
The tolerance used to prune inactive indices
max_num_init_terms : integer
The number of terms used to initialize the algorithm
max_num_terms : integer
The maximum number of terms allowed
max_iters : integer
The number of expansion/restriction iterations
max_num_expansion_steps_iter : integer (1,2,3)
The number of times a basis can expanded after
the last restriction step
max_num_expansion_steps_iter : integer
The number of iterations error does not decrease before
terminating
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx: :class:`pyapprox.surrogates.polychaos.gpc.PolynomialChaosExpansion`
The PCE approximation
scores : np.ndarray (nqoi)
The best cross validation score for each QoI
reg_params : np.ndarray (nqoi)
The best regularization parameters for each QoI chosen by cross
validation.
References
----------
.. [JESJCP2015] `J.D. Jakeman, M.S. Eldred, and K. Sargsyan. Enhancing l1-minimization estimates of polynomial chaos expansions using basis selection. Journal of Computational Physics, 289(0):18 – 34, 2015 <https://doi.org/10.1016/j.jcp.2015.02.025>`_
"""
coefs = []
scores = []
indices = []
reg_params = []
indices_dict = dict()
unique_indices = []
nqoi = train_vals.shape[1]
for ii in range(nqoi):
if verbose > 1:
print(f'Approximating QoI: {ii}')
pce_ii, score_ii, reg_param_ii = _expanding_basis_pce(
pce, train_samples, train_vals[:, ii:ii+1], hcross_strength,
verbose, max_num_init_terms, max_num_terms, solver_type,
linear_solver_options, restriction_tol,
max_num_expansion_steps_iter, max_iters, max_num_step_increases)
coefs.append(pce_ii.get_coefficients())
scores.append(score_ii)
indices.append(pce_ii.get_indices())
reg_params.append(reg_param_ii)
for index in indices[ii].T:
key = hash_array(index)
if key not in indices_dict:
indices_dict[key] = len(unique_indices)
unique_indices.append(index)
unique_indices = np.array(unique_indices).T
all_coefs = np.zeros((unique_indices.shape[1], nqoi))
for ii in range(nqoi):
for jj, index in enumerate(indices[ii].T):
key = hash_array(index)
all_coefs[indices_dict[key], ii] = coefs[ii][jj, 0]
pce.set_indices(unique_indices)
pce.set_coefficients(all_coefs)
return ApproximateResult({'approx': pce, 'scores': np.array(scores),
'reg_params': reg_params})
def _expanding_basis_pce(pce, train_samples, train_vals, hcross_strength=1,
verbose=1, max_num_init_terms=None,
max_num_terms=None,
solver_type='lasso',
linear_solver_options={'cv': 10},
restriction_tol=np.finfo(float).eps*2,
max_num_expansion_steps_iter=3,
max_iters=20,
max_num_step_increases=1):
assert train_vals.shape[1] == 1
num_vars = pce.num_vars()
if max_num_init_terms is None:
max_num_init_terms = train_vals.shape[0]
if max_num_terms is None:
max_num_terms = 10*train_vals.shape[0]
degree = 2
prev_num_terms = 0
while True:
indices = compute_hyperbolic_indices(num_vars, degree, hcross_strength)
num_terms = indices.shape[1]
if (num_terms > max_num_init_terms):
break
degree += 1
prev_num_terms = num_terms
if (abs(num_terms - max_num_init_terms) >
abs(prev_num_terms - max_num_init_terms)):
degree -= 1
pce.set_indices(
compute_hyperbolic_indices(num_vars, degree, hcross_strength))
if verbose > 0:
msg = f'Initializing basis with hyperbolic cross of degree {degree} '
msg += f'and strength {hcross_strength} with {pce.num_terms()} terms'
print(msg)
rng_state = np.random.get_state()
basis_matrix = pce.basis_matrix(train_samples)
best_coef, best_cv_score, best_reg_param = fit_linear_model(
basis_matrix, train_vals, solver_type, **linear_solver_options)
np.random.set_state(rng_state)
pce.set_coefficients(best_coef)
best_indices = pce.get_indices()
best_cv_score_iter = best_cv_score
best_indices_iter = best_indices.copy()
best_coef_iter = best_coef.copy()
best_reg_param_iter = best_reg_param
if verbose > 0:
print("{:<10} {:<10} {:<18}".format('nterms', 'nnz terms', 'cv score'))
print("{:<10} {:<10} {:<18}".format(
pce.num_terms(), np.count_nonzero(pce.coefficients),
best_cv_score))
it = 0
best_it = 0
while True:
current_max_num_expansion_steps_iter = 1
best_cv_score_iter = best_cv_score
indices_iter = pce.indices.copy()
coef_iter = pce.coefficients.copy()
while True:
# -------------- #
# Expand basis #
# -------------- #
num_expansion_steps_iter = 0
indices = restrict_basis(
# pce.indices, pce.coefficients, restriction_tol)
indices_iter, coef_iter, restriction_tol)
msg = f'Expanding {indices.shape[1]} restricted from '
msg += f'{pce.indices.shape[1]} terms'
while (num_expansion_steps_iter <
current_max_num_expansion_steps_iter):
new_indices = expand_basis(indices)
indices = np.hstack([indices, new_indices])
num_expansion_steps_iter += 1
pce.set_indices(indices)
num_terms = pce.num_terms()
msg += f' New number of terms {pce.indices.shape[1]}'
print(msg)
# -----------------#
# Compute solution #
# -----------------#
basis_matrix = pce.basis_matrix(train_samples)
np.random.set_state(rng_state)
coef, cv_score, reg_param = fit_linear_model(
basis_matrix, train_vals, solver_type, **linear_solver_options)
np.random.set_state(rng_state)
pce.set_coefficients(coef)
if verbose > 0:
print("{:<10} {:<10} {:<18}".format(
pce.num_terms(), np.count_nonzero(pce.coefficients),
cv_score))
if (cv_score < best_cv_score_iter):
best_cv_score_iter = cv_score
best_indices_iter = pce.indices.copy()
best_coef_iter = pce.coefficients.copy()
best_reg_param_iter = reg_param
if (num_terms >= max_num_terms):
if verbose > 0:
print(f'Max number of terms {max_num_terms} reached')
break
if (current_max_num_expansion_steps_iter >=
max_num_expansion_steps_iter):
if verbose > 0:
msg = 'Max number of inner expansion steps '
msg += f'({max_num_expansion_steps_iter}) reached'
print(msg)
break
current_max_num_expansion_steps_iter += 1
it += 1
pce.set_indices(best_indices_iter)
pce.set_coefficients(best_coef_iter)
if (best_cv_score_iter < best_cv_score):
best_cv_score = best_cv_score_iter
best_coef = best_coef_iter.copy()
best_indices = best_indices_iter.copy()
best_reg_param = best_reg_param_iter
best_it = it
elif (it - best_it >= max_num_step_increases):
if verbose > 0:
msg = 'Terminating: error did not decrease'
msg += f' in last {max_num_step_increases} iterations'
msg += f'best error: {best_cv_score}'
print(msg)
break
if it >= max_iters:
if verbose > 0:
msg = 'Terminating: max iterations reached'
print(msg)
break
nindices = best_indices.shape[1]
II = np.nonzero(best_coef[:, 0])[0]
pce.set_indices(best_indices[:, II])
pce.set_coefficients(best_coef[II])
if verbose > 0:
msg = f'Final basis has {pce.num_terms()} terms selected from '
msg += f'{nindices} using {train_samples.shape[1]} samples'
print(msg)
return pce, best_cv_score, best_reg_param
def approximate_fixed_pce(pce, train_samples, train_vals, indices,
verbose=1, solver_type='lasso',
linear_solver_options={}):
r"""
Estimate the coefficients of a polynomial chaos using regression methods
and pre-specified (fixed) basis and regularization parameters
Parameters
----------
train_samples : np.ndarray (nvars, nsamples)
The inputs of the function used to train the approximation
train_vals : np.ndarray (nvars, nqoi)
The values of the function at ``train_samples``
indices : np.ndarray (nvars, nindices)
The multivariate indices representing each basis in the expansion.
solver_type : string
The type of regression used to train the polynomial
- 'lasso'
- 'lars'
- 'lasso_grad'
- 'omp'
- 'lstsq'
verbose : integer
Controls the amount of information printed to screen
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx: :class:`pyapprox.surrogates.polychaos.gpc.PolynomialChaosExpansion`
The PCE approximation
reg_params : np.ndarray (nqoi)
The regularization parameters for each QoI.
"""
nqoi = train_vals.shape[1]
coefs = []
if type(linear_solver_options) == dict:
linear_solver_options = [linear_solver_options]*nqoi
if type(indices) == np.ndarray:
indices = [indices.copy() for ii in range(nqoi)]
unique_indices = []
indices_dict = dict()
for ii in range(nqoi):
pce.set_indices(indices[ii])
basis_matrix = pce.basis_matrix(train_samples)
coef_ii, _, reg_param_ii = fit_linear_model(
basis_matrix, train_vals[:, ii:ii+1], solver_type,
**linear_solver_options[ii])
coefs.append(coef_ii)
for index in indices[ii].T:
key = hash_array(index)
if key not in indices_dict:
indices_dict[key] = len(unique_indices)
unique_indices.append(index)
unique_indices = np.array(unique_indices).T
all_coefs = np.zeros((unique_indices.shape[1], nqoi))
for ii in range(nqoi):
for jj, index in enumerate(indices[ii].T):
key = hash_array(index)
all_coefs[indices_dict[key], ii] = coefs[ii][jj, 0]
pce.set_indices(unique_indices)
pce.set_coefficients(all_coefs)
return ApproximateResult({'approx': pce})
def __setup_gaussian_process_kernel(nvars, length_scale, length_scale_bounds,
kernel_variance, kernel_variance_bounds,
noise_level, noise_level_bounds, nu):
if np.isscalar(length_scale):
length_scale = np.array([length_scale]*nvars)
assert length_scale.shape[0] == nvars
kernel = Matern(length_scale, length_scale_bounds=length_scale_bounds,
nu=nu)
# optimize variance
if kernel_variance is not None:
kernel = ConstantKernel(
constant_value=kernel_variance,
constant_value_bounds=kernel_variance_bounds)*kernel
# optimize gp noise
if noise_level is not None:
kernel += WhiteKernel(
noise_level, noise_level_bounds=noise_level_bounds)
# Note noise_level is different to alpha
# noise_kernel applies nugget to both training and test data
# alpha only applies it to training data
return kernel
def approximate_gaussian_process(train_samples, train_vals, nu=np.inf,
n_restarts_optimizer=5, verbose=0,
normalize_y=False, alpha=0,
noise_level=None, noise_level_bounds='fixed',
kernel_variance=None,
kernel_variance_bounds='fixed',
var_trans=None,
length_scale=1,
length_scale_bounds=(1e-2, 10)):
r"""
Compute a Gaussian process approximation of a function from a fixed data
set using the Matern kernel
.. math::
k(z_i, z_j) = \frac{1}{\Gamma(\nu)2^{\nu-1}}\Bigg(
\frac{\sqrt{2\nu}}{l} \lVert z_i - z_j \rVert_2\Bigg)^\nu K_\nu\Bigg(
\frac{\sqrt{2\nu}}{l} \lVert z_i - z_j \rVert_2\Bigg)
where :math:`\lVert \cdot \rVert_2` is the Euclidean distance,
:math:`\Gamma(\cdot)` is the gamma function, :math:`K_\nu(\cdot)` is the
modified Bessel function.
Parameters
----------
train_samples : np.ndarray (nvars, nsamples)
The inputs of the function used to train the approximation
train_vals : np.ndarray (nvars, 1)
The values of the function at ``train_samples``
nu : string
The parameter :math:`\nu` of the Matern kernel. When :math:`\nu\to\inf`
the Matern kernel is equivalent to the squared-exponential kernel.
alpha : float
Nugget added to diagonal of the covariance kernel evaluated at
the training data. Used to improve numerical conditionining. This
parameter is different to noise_level which applies to both training
and test data
normalize_y : bool
True - normalize the training values to have zero mean and unit
variance
length_scale : float
The initial length scale used to generate the first batch of training
samples
length_scale_bounds : tuple (2)
The lower and upper bound on length_scale used in optimization of
the Gaussian process hyper-parameters
noise_level : float
The noise_level used when training the GP
noise_level_bounds : tuple (2)
The lower and upper bound on noise_level used in optimization of
the Gaussian process hyper-parameters
kernel_variance : float
The kernel_variance used when training the GP
noise_level_bounds : tuple (2)
The lower and upper bound on kernel_variance used in optimization of
the Gaussian process hyper-parameters
n_restarts_optimizer : int
The number of local optimizeation problems solved to find the
GP hyper-parameters
verbose : integer
Controls the amount of information printed to screen
Returns
-------
result : :class:`pyapprox.surrogates.approximate.ApproximateResult`
Result object with the following attributes
approx : :class:`pyapprox.surrogates.gaussianprocess.gaussian_process.GaussianProcess`
The Gaussian process
"""
nvars = train_samples.shape[0]
kernel = __setup_gaussian_process_kernel(
nvars, length_scale, length_scale_bounds,
kernel_variance, kernel_variance_bounds,
noise_level, noise_level_bounds, nu)
gp = GaussianProcess(kernel, n_restarts_optimizer=n_restarts_optimizer,
normalize_y=normalize_y, alpha=alpha)
if var_trans is not None:
gp.set_variable_transformation(var_trans)
gp.fit(train_samples, train_vals)
return ApproximateResult({'approx': gp})
from pyapprox.util.utilities import get_random_k_fold_sample_indices, \
leave_many_out_lsq_cross_validation, leave_one_out_lsq_cross_validation
def cross_validate_approximation(
train_samples, train_vals, options, nfolds, method, random_folds=True):
ntrain_samples = train_samples.shape[1]
if random_folds != 'sklearn':
fold_sample_indices = get_random_k_fold_sample_indices(
ntrain_samples, nfolds, random_folds)
else:
from sklearn.model_selection._split import KFold
sklearn_cv = KFold(nfolds)
# indices = np.arange(train_samples.shape[1])
fold_sample_indices = [
te for tr, te in sklearn_cv.split(train_vals, train_vals)]
approx_list = []
residues_list = []
cv_score = 0
for kk in range(len(fold_sample_indices)):
K = np.ones(ntrain_samples, dtype=bool)
K[fold_sample_indices[kk]] = False
train_samples_kk = train_samples[:, K]
train_vals_kk = train_vals[K, :]
test_samples_kk = train_samples[:, fold_sample_indices[kk]]
test_vals_kk = train_vals[fold_sample_indices[kk]]
approx_kk = approximate(
train_samples_kk, train_vals_kk, method, options).approx
residues = approx_kk(test_samples_kk) - test_vals_kk
approx_list.append(approx_kk)
residues_list.append(residues)
cv_score += np.sum(residues**2, axis=0)
cv_score = np.sqrt(cv_score/ntrain_samples)
return approx_list, residues_list, cv_score
def quadratic_oversampling_ratio(nindices):
return nindices**2
def increment_samples_using_oversampling_ratio(
train_samples, indices, oversampling_ratio, generate_samples):
ndesired_samples = oversampling_ratio(indices.shape[1])
nnew_samples = ndesired_samples-train_samples.shape[1]
new_train_samples = generate_samples(nnew_samples)
return new_train_samples
def increment_samples_using_condition_number(
train_samples, indices, pce, generate_samples, sample_growth_factor,
cond_tol, max_nsamples):
pce.set_indices(indices)
ndesired_samples = indices.shape[1]
cond_num = np.inf
ncurrent_samples = train_samples.shape[1]
new_train_samples = np.zeros((pce.num_vars(), 0))
if train_samples.shape[1] == 0:
basis_matrix = np.zeros((0, indices.shape[1]))
else:
basis_matrix = pce.basis_matrix(train_samples)
while True:
ndesired_samples = min(
int(ndesired_samples*sample_growth_factor), max_nsamples)
if ndesired_samples < ncurrent_samples:
ndesired_samples = int(ncurrent_samples*sample_growth_factor)
assert ndesired_samples > ncurrent_samples
# generate a new increment of samples
nsample_incr = ndesired_samples-(
ncurrent_samples+new_train_samples.shape[1])
# print(ncurrent_samples, nsample_incr, train_samples.shape,
# ndesired_samples, indices.shape)
samples_incr = generate_samples(nsample_incr)
# add increment to total set of new samples
new_train_samples = np.hstack((
new_train_samples, samples_incr))
# compute basis matrix for sample increment
basis_matrix_incr = pce.basis_matrix(samples_incr)
# compute condition number of entire sample set
basis_matrix = np.vstack((basis_matrix, basis_matrix_incr))
cond_num = np.linalg.cond(basis_matrix)
if cond_num <= cond_tol:
break
if ncurrent_samples + new_train_samples.shape[1] >= max_nsamples:
break
return new_train_samples
def adaptive_approximate_polynomial_chaos_increment_degree(
fun, variable, max_degree, max_nsamples=100, cond_tol=None,
sample_growth_factor=2, verbose=0, hcross_strength=1,
oversampling_ratio=quadratic_oversampling_ratio,
solver_type='lasso', linear_solver_options={},
callback=None):
var_trans = AffineTransform(variable)
pce = PolynomialChaosExpansion()
pce_opts = define_poly_options_from_variable_transformation(var_trans)
pce.configure(pce_opts)
if cond_tol is not None and oversampling_ratio is not None:
raise ValueError("cond_tol or over_sampling_ratio must be None")
if cond_tol is None and oversampling_ratio is None:
raise ValueError("cond_tol or over_sampling_ratio must be None")
degree = 0
generate_samples = partial(
generate_independent_random_samples, pce.var_trans.variable)
train_samples = np.zeros((pce.num_vars(), 0))
while True:
if degree > max_degree:
break
if train_samples.shape[1] > max_nsamples:
break
indices = compute_hyperbolic_indices(
pce.num_vars(), degree, hcross_strength)
if cond_tol is not None:
new_train_samples = increment_samples_using_condition_number(
train_samples, indices, pce, generate_samples,
sample_growth_factor, cond_tol, max_nsamples)
else:
new_train_samples = increment_samples_using_oversampling_ratio(
train_samples, indices, oversampling_ratio, generate_samples)
new_train_values = fun(new_train_samples)
if train_samples.shape[1] == 0:
train_values = new_train_values
else:
train_values = np.vstack((train_values, new_train_values))
train_samples = np.hstack((train_samples, new_train_samples))
# Todo allow expanding basis as well
result = approximate_fixed_pce(
pce, train_samples, train_values, indices, verbose,
solver_type, linear_solver_options)
if callback is not None:
callback(result.approx)
# TODO: if requested add exit condition that checks cross validation
# error. For now if want to print cross validation error
# implement that inside a callback
# if nfolds > 0:
# method_opts = {"basis_type": "fixed", "verbose": verbose}
# method = approximate_polynomial_chaos()
# cv_score = cross_validate_approximation(
# train_samples, train_values, method_options, nfolds, method)
degree += 1
result = ApproximateResult(
{'approx': result.approx, 'train_samples': train_samples,
'train_values': train_values})
return result