Source code for pyapprox.approximate

import numpy as np
from pyapprox.adaptive_sparse_grid import variance_refinement_indicator, \
    CombinationSparseGrid, \
    get_sparse_grid_univariate_leja_quadrature_rules_economical, \
    max_level_admissibility_function
from pyapprox.variables import IndependentMultivariateRandomVariable
from pyapprox.variable_transformations import AffineRandomVariableTransformation
from functools import partial

from scipy.optimize import OptimizeResult
class ApproximateResult(OptimizeResult):
    pass
def adaptive_approximate_sparse_grid(
        fun, univariate_variables, callback=None,
        refinement_indicator=variance_refinement_indicator,
        univariate_quad_rule_info=None, max_nsamples=100, tol=0, verbose=0,
        config_variables_idx=None, config_var_trans=None, cost_function=None,
        max_level_1d=None):
    """
    Compute a sparse grid approximation of a function.

    Parameters
    ----------
    fun : callable
        The function to be approximated

        ``fun(z) -> np.ndarray``

        where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
        output is a 2D np.ndarray with shape (nsamples,nqoi)

    univariate_variables : list
        A list of scipy.stats random variables of size (nvars)

    callback : callable
        Function called after each iteration with the signature

        ``callback(approx_k)``

        where approx_k is the current approximation object.

    refinement_indicator : callable
        A function that returns an estimate of the error of a sparse grid
        subspace with signature

        ``refinement_indicator(subspace_index,nnew_subspace_samples,sparse_grid) -> float, float``

        where ``subspace_index`` is a 1D np.ndarray of size (nvars),
        ``nnew_subspace_samples`` is an integer specifying the number of new
        samples that will be added to the sparse grid by adding the subspace
        specified by subspace_index and ``sparse_grid`` is the current
        :class:`pyapprox.adaptive_sparse_grid.CombinationSparseGrid` object.
        The two outputs are, respectively, the indicator used to control
        refinement of the sparse grid and the change in error from adding
        the current subspace. The indicator is typically, but not always,
        dependent on the error.

    univariate_quad_rule_info : list
        List containing two entries. The first entry is a list (or single
        callable) of univariate quadrature rules for each variable with
        signature

        ``quad_rule(l) -> np.ndarray, np.ndarray``

        where the integer ``l`` specifies the level of the quadrature rule
        and ``x`` and ``w`` are 1D np.ndarray of size (nsamples) containing
        the quadrature rule points and weights, respectively.

        The second entry is a list (or single callable) of growth rules with
        signature

        ``growth_rule(l) -> integer``

        where the output ``nsamples`` specifies the number of samples in the
        quadrature rule of level ``l``.

        If either entry is a callable then the same quadrature or growth
        rule is applied to every variable.

    max_nsamples : float
        If ``cost_function is None`` then this argument is the maximum
        number of evaluations of fun.

        If ``cost_function is not None`` then max_nsamples is the maximum
        cost of constructing the sparse grid, i.e. the sum of the costs of
        evaluating each point in the sparse grid. The same behavior as
        ``cost_function is None`` can be obtained by setting

        ``cost_function = lambda config_sample: 1``.

        Specifying a cost function is particularly useful if ``fun`` has
        configure variables and evaluating ``fun`` at different values of
        these configure variables has different costs. For example, if there
        is one configure variable that can take two different values with
        costs 0.5 and 1, then 10 samples of each model will be measured as a
        cost of 15, and so if max_nsamples is 19 the algorithm will not
        terminate at that point, even though the number of samples in the
        sparse grid is 20.

    tol : float
        Tolerance for termination. The construction of the sparse grid is
        terminated when the estimated error in the sparse grid (determined
        by ``refinement_indicator``) is below tol.

    verbose : integer
        Controls messages printed during construction.

    config_variables_idx : integer
        The position in a sample array at which the configure variables
        start

    config_var_trans : pyapprox.adaptive_sparse_grid.ConfigureVariableTransformation
        An object that takes configure indices in [0,1,2,3,...] and maps
        them to the configure values accepted by ``fun``

    cost_function : callable
        A function with signature

        ``cost_function(config_sample) -> float``

        where config_sample is a np.ndarray of shape (nconfig_vars). The
        output is the cost of evaluating ``fun`` at ``config_sample``. The
        units can be anything but must be consistent with the units of
        max_nsamples, which specifies the maximum cost of constructing the
        sparse grid.

    max_level_1d : np.ndarray (nvars)
        The maximum level of the sparse grid in each dimension. If None
        there is no limit.

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
        Result object with the following attributes

    approx : :class:`pyapprox.adaptive_sparse_grid.CombinationSparseGrid`
        The sparse grid approximation
    """
    variable = IndependentMultivariateRandomVariable(univariate_variables)
    var_trans = AffineRandomVariableTransformation(variable)
    nvars = var_trans.num_vars()
    if config_var_trans is not None:
        nvars += config_var_trans.num_vars()
    sparse_grid = CombinationSparseGrid(nvars)
    if univariate_quad_rule_info is None:
        quad_rules, growth_rules, unique_quadrule_indices = \
            get_sparse_grid_univariate_leja_quadrature_rules_economical(
                var_trans)
    else:
        quad_rules, growth_rules = univariate_quad_rule_info
        unique_quadrule_indices = None
    if max_level_1d is None:
        max_level_1d = [np.inf]*nvars
    assert len(max_level_1d) == nvars
    admissibility_function = partial(
        max_level_admissibility_function, np.inf, max_level_1d, max_nsamples,
        tol, verbose=verbose)
    sparse_grid.setup(
        fun, config_variables_idx, refinement_indicator,
        admissibility_function, growth_rules, quad_rules, var_trans,
        unique_quadrule_indices=unique_quadrule_indices, verbose=verbose,
        cost_function=cost_function, config_var_trans=config_var_trans)
    sparse_grid.build(callback)
    return ApproximateResult({'approx': sparse_grid})
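# A minimal usage sketch (illustrative, not part of the original module). It
# assumes ``fun`` is vectorized over the columns of ``z`` and that the
# returned sparse grid can be evaluated by calling it on a samples array, as
# done with the approximation in :func:`compute_l2_error` below. The helper
# name is hypothetical.
def _example_adaptive_sparse_grid():
    from scipy import stats

    def fun(z):
        # z: (nvars, nsamples) -> values: (nsamples, nqoi)
        return np.sum(z**2, axis=0)[:, np.newaxis]

    univariate_variables = [stats.uniform(-1, 2)]*2
    res = adaptive_approximate_sparse_grid(
        fun, univariate_variables, max_nsamples=100, tol=1e-8)
    samples = np.random.uniform(-1, 1, (2, 10))
    return res.approx(samples)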
from pyapprox.adaptive_polynomial_chaos import AdaptiveLejaPCE, \
    variance_pce_refinement_indicator
from pyapprox.variables import is_bounded_continuous_variable
from pyapprox.univariate_quadrature import clenshaw_curtis_rule_growth
def adaptive_approximate_polynomial_chaos(
        fun, univariate_variables, callback=None,
        refinement_indicator=variance_pce_refinement_indicator,
        growth_rules=None, max_nsamples=100, tol=0, verbose=0,
        ncandidate_samples=1e4, generate_candidate_samples=None):
    r"""
    Compute an adaptive Polynomial Chaos Expansion of a function.

    Parameters
    ----------
    fun : callable
        The function to be approximated

        ``fun(z) -> np.ndarray``

        where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
        output is a 2D np.ndarray with shape (nsamples,nqoi)

    univariate_variables : list
        A list of scipy.stats random variables of size (nvars)

    callback : callable
        Function called after each iteration with the signature

        ``callback(approx_k)``

        where approx_k is the current approximation object.

    refinement_indicator : callable
        A function that returns an estimate of the error of a subspace with
        signature

        ``refinement_indicator(subspace_index,nnew_subspace_samples,sparse_grid) -> float, float``

        where ``subspace_index`` is a 1D np.ndarray of size (nvars),
        ``nnew_subspace_samples`` is an integer specifying the number of new
        samples that will be added by adding the subspace specified by
        subspace_index and ``sparse_grid`` is the current
        :class:`pyapprox.adaptive_sparse_grid.CombinationSparseGrid` object.
        The two outputs are, respectively, the indicator used to control
        refinement and the change in error from adding the current subspace.
        The indicator is typically, but not always, dependent on the error.

    growth_rules : list or callable
        A list (or single callable) of growth rules with signature

        ``growth_rule(l) -> integer``

        where the output ``nsamples`` specifies the number of indices of the
        univariate basis of level ``l``. If the entry is a callable then the
        same growth rule is applied to every variable.

    max_nsamples : integer
        The maximum number of evaluations of fun.

    tol : float
        Tolerance for termination. The construction of the expansion is
        terminated when the estimated error (determined by
        ``refinement_indicator``) is below tol.

    verbose : integer
        Controls messages printed during construction.

    ncandidate_samples : integer
        The number of candidate samples used to generate the Leja sequence.
        The Leja sequence will be a subset of these samples.

    generate_candidate_samples : callable
        A function that generates the candidate samples used to build the
        Leja sequence with signature

        ``generate_candidate_samples(ncandidate_samples) -> np.ndarray``

        The output is a 2D np.ndarray with size (nvars,ncandidate_samples).

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
        Result object with the following attributes

    approx : :class:`pyapprox.multivariate_polynomials.PolynomialChaosExpansion`
        The PCE approximation
    """
    variable = IndependentMultivariateRandomVariable(univariate_variables)
    var_trans = AffineRandomVariableTransformation(variable)
    nvars = var_trans.num_vars()

    bounded_variables = True
    for rv in univariate_variables:
        if not is_bounded_continuous_variable(rv):
            bounded_variables = False
            msg = "For now Leja sampling based PCE is only supported for "
            msg += "bounded continuous random variables"
            if generate_candidate_samples is None:
                raise Exception(msg)
            else:
                break

    if generate_candidate_samples is None:
        # Todo implement default for non-bounded variables that uses induced
        # sampling
        # candidate samples must be in canonical domain
        from pyapprox import halton_sequence
        candidate_samples = -np.cos(
            np.pi*halton_sequence(nvars, 1, int(ncandidate_samples+1)))
        # candidate_samples = -np.cos(
        #     np.random.uniform(0,np.pi,(nvars,int(ncandidate_samples))))
    else:
        candidate_samples = generate_candidate_samples(ncandidate_samples)

    pce = AdaptiveLejaPCE(nvars, candidate_samples, factorization_type='fast')
    pce.verbose = verbose
    admissibility_function = partial(
        max_level_admissibility_function, np.inf, [np.inf]*nvars,
        max_nsamples, tol, verbose=verbose)
    pce.set_function(fun, var_trans)
    if growth_rules is None:
        growth_rules = clenshaw_curtis_rule_growth
    pce.set_refinement_functions(
        refinement_indicator, admissibility_function, growth_rules)
    pce.build(callback)
    return ApproximateResult({'approx': pce})
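# A minimal usage sketch (illustrative, not part of the original module);
# the variables must be bounded and continuous unless
# ``generate_candidate_samples`` is provided. The helper name is
# hypothetical.
def _example_adaptive_polynomial_chaos():
    from scipy import stats

    def fun(z):
        return np.sum(z**2, axis=0)[:, np.newaxis]

    univariate_variables = [stats.uniform(-1, 2)]*2
    res = adaptive_approximate_polynomial_chaos(
        fun, univariate_variables, max_nsamples=50, ncandidate_samples=1e3)
    return res.approx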
from pyapprox.probability_measure_sampling import \
    generate_independent_random_samples
def compute_l2_error(f, g, variable, nsamples, rel=False):
    r"""
    Compute the :math:`\ell^2` error between the outputs of two functions
    f and g, i.e.

    .. math:: \lVert f(z)-g(z)\rVert_{\ell^2} \approx
       \sqrt{\frac{1}{M}\sum_{m=1}^M \left(f(z^{(m)})-g(z^{(m)})\right)^2}

    using a set of random draws :math:`\mathcal{Z}=\{z^{(m)}\}_{m=1}^M`
    from the PDF of :math:`z`.

    Parameters
    ----------
    f : callable
        Function with signature

        ``f(z) -> np.ndarray``

        where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
        output is a 2D np.ndarray with shape (nsamples,nqoi)

    g : callable
        Function with signature

        ``g(z) -> np.ndarray``

        where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
        output is a 2D np.ndarray with shape (nsamples,nqoi)

    variable : pya.IndependentMultivariateRandomVariable
        Object containing information on the joint density of the inputs z.
        This is used to generate random samples from this joint density.

    nsamples : integer
        The number of samples used to compute the :math:`\ell^2` error

    rel : boolean
        True - compute relative error
        False - compute absolute error

    Returns
    -------
    error : np.ndarray (nqoi)
    """
    validation_samples = generate_independent_random_samples(
        variable, nsamples)
    validation_vals = f(validation_samples)
    approx_vals = g(validation_samples)
    assert validation_vals.shape == approx_vals.shape
    error = np.linalg.norm(approx_vals-validation_vals, axis=0)
    if not rel:
        error /= np.sqrt(validation_samples.shape[1])
    else:
        error /= np.linalg.norm(validation_vals, axis=0)
    return error
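# A minimal usage sketch (illustrative, not part of the original module)
# comparing a function against a perturbed copy of itself; the relative
# error should be close to zero. The helper name is hypothetical.
def _example_compute_l2_error():
    from scipy import stats

    def f(z):
        return np.sum(z**2, axis=0)[:, np.newaxis]

    def g(z):
        return f(z) + 1e-3

    variable = IndependentMultivariateRandomVariable(
        [stats.uniform(-1, 2)]*2)
    return compute_l2_error(f, g, variable, nsamples=1000, rel=True)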
def adaptive_approximate(fun, variable, method, options=None):
    r"""
    Adaptive approximation of a scalar or vector-valued function of one or
    more variables. These methods choose the samples at which to evaluate
    the function being approximated.

    Parameters
    ----------
    fun : callable
        The function to be approximated

        ``fun(z) -> np.ndarray``

        where ``z`` is a 2D np.ndarray with shape (nvars,nsamples) and the
        output is a 2D np.ndarray with shape (nsamples,nqoi)

    variable : list
        A list of scipy.stats random variables of size (nvars) passed to
        the chosen method as its ``univariate_variables`` argument

    method : string
        Type of approximation. Should be one of

        - 'sparse_grid'
        - 'polynomial_chaos'

    options : dict
        Options passed to the chosen method

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
        Result object. For more details see

        - :func:`pyapprox.approximate.adaptive_approximate_sparse_grid`

        - :func:`pyapprox.approximate.adaptive_approximate_polynomial_chaos`
    """
    methods = {'sparse_grid': adaptive_approximate_sparse_grid,
               'polynomial_chaos': adaptive_approximate_polynomial_chaos}

    if method not in methods:
        msg = f'Method {method} not found.\n Available methods are:\n'
        for key in methods.keys():
            msg += f"\t{key}\n"
        raise Exception(msg)

    if options is None:
        options = {}
    return methods[method](fun, variable, **options)
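# A minimal usage sketch (illustrative, not part of the original module)
# showing how method-specific options are forwarded through ``options``.
# The helper name is hypothetical.
def _example_adaptive_approximate():
    from scipy import stats

    def fun(z):
        return np.sum(z**2, axis=0)[:, np.newaxis]

    univariate_variables = [stats.uniform(-1, 2)]*2
    res = adaptive_approximate(
        fun, univariate_variables, 'sparse_grid', {'max_nsamples': 50})
    return res.approx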
def approximate_polynomial_chaos(train_samples, train_vals, verbosity=0,
                                 basis_type='expanding_basis',
                                 variable=None, options=None):
    r"""
    Compute a Polynomial Chaos Expansion of a function from a fixed data
    set.

    Parameters
    ----------
    train_samples : np.ndarray (nvars,nsamples)
        The inputs of the function used to train the approximation

    train_vals : np.ndarray (nsamples,nqoi)
        The values of the function at ``train_samples``

    basis_type : string
        Type of approximation. Should be one of

        - 'expanding_basis' see :func:`pyapprox.approximate.expanding_basis_omp_pce`
        - 'hyperbolic_cross' see :func:`pyapprox.approximate.cross_validate_pce_degree`

    variable : pya.IndependentMultivariateRandomVariable
        Object containing information on the joint density of the inputs z.

    verbosity : integer
        Controls the amount of information printed to screen

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
        Result object. For more details see

        - :func:`pyapprox.approximate.cross_validate_pce_degree`

        - :func:`pyapprox.approximate.expanding_basis_omp_pce`
    """
    funcs = {'expanding_basis': expanding_basis_omp_pce,
             'hyperbolic_cross': cross_validate_pce_degree}
    if variable is None:
        msg = 'pce requires that variable be defined'
        raise Exception(msg)
    if basis_type not in funcs:
        msg = f'Basis type {basis_type} not found.\n Available types are:\n'
        for key in funcs.keys():
            msg += f"\t{key}\n"
        raise Exception(msg)

    from pyapprox.multivariate_polynomials import PolynomialChaosExpansion, \
        define_poly_options_from_variable_transformation
    var_trans = AffineRandomVariableTransformation(variable)
    poly = PolynomialChaosExpansion()
    poly_opts = define_poly_options_from_variable_transformation(var_trans)
    poly.configure(poly_opts)

    if options is None:
        options = {}

    res = funcs[basis_type](poly, train_samples, train_vals, **options)
    return res
def approximate(train_samples, train_vals, method, options=None):
    r"""
    Approximate a scalar or vector-valued function of one or more variables
    from a set of points provided by the user.

    Parameters
    ----------
    train_samples : np.ndarray (nvars,nsamples)
        The inputs of the function used to train the approximation

    train_vals : np.ndarray (nsamples,nqoi)
        The values of the function at ``train_samples``

    method : string
        Type of approximation. Should be one of

        - 'polynomial_chaos'
        - 'gaussian_process'

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
    """
    methods = {'polynomial_chaos': approximate_polynomial_chaos,
               'gaussian_process': approximate_gaussian_process}
    # 'tensor-train': approximate_tensor_train,

    if method not in methods:
        msg = f'Method {method} not found.\n Available methods are:\n'
        for key in methods.keys():
            msg += f"\t{key}\n"
        raise Exception(msg)

    if options is None:
        options = {}
    return methods[method](train_samples, train_vals, **options)
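# A minimal usage sketch (illustrative, not part of the original module).
# 'polynomial_chaos' requires ``variable`` to be passed through ``options``;
# 'gaussian_process' needs only the data. The helper name is hypothetical.
def _example_approximate():
    from scipy import stats
    train_samples = np.random.uniform(-1, 1, (2, 100))
    train_vals = np.sum(train_samples**2, axis=0)[:, np.newaxis]
    variable = IndependentMultivariateRandomVariable(
        [stats.uniform(-1, 2)]*2)
    res = approximate(train_samples, train_vals, 'polynomial_chaos',
                      {'variable': variable})
    return res.approx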
from sklearn.linear_model import LassoCV, LassoLarsCV, LarsCV, \
    OrthogonalMatchingPursuitCV
def fit_linear_model(basis_matrix, train_vals, solver_type, **kwargs):
    # each solver must be instantiated before its bound ``fit`` method can
    # be used; referencing the unbound class method (e.g. ``LassoCV.fit``)
    # would fail when called
    solvers = {
        'lasso_lars': LassoLarsCV(cv=kwargs['cv']).fit,
        'lasso': LassoCV(cv=kwargs['cv']).fit,
        'lars': LarsCV(cv=kwargs['cv']).fit,
        'omp': OrthogonalMatchingPursuitCV(cv=kwargs['cv']).fit}
    assert train_vals.ndim == 2
    if solver_type in solvers:
        fit = solvers[solver_type]
        res = fit(basis_matrix, train_vals[:, 0])
    else:
        msg = f'Solver type {solver_type} not supported\n'
        msg += 'Supported solvers are:\n'
        for key in solvers.keys():
            msg += f'\t{key}\n'
        raise Exception(msg)

    cv_score = res.score(basis_matrix, train_vals[:, 0])
    coef = res.coef_[:, np.newaxis]
    coef[0] = res.intercept_
    return coef, cv_score
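# A minimal usage sketch (illustrative, not part of the original module):
# recover a sparse coefficient vector from a basis matrix whose first column
# is constant, mirroring the PCE convention that lets the function store the
# intercept in ``coef[0]``. Note that ``cv`` must always be passed as a
# keyword argument. The helper name is hypothetical.
def _example_fit_linear_model():
    nsamples, nterms = 30, 5
    basis_matrix = np.hstack(
        [np.ones((nsamples, 1)),
         np.random.normal(0, 1, (nsamples, nterms-1))])
    true_coef = np.zeros((nterms, 1))
    true_coef[1] = 1.0
    train_vals = basis_matrix.dot(true_coef)
    coef, cv_score = fit_linear_model(
        basis_matrix, train_vals, 'lasso_lars', cv=5)
    return coef, cv_score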
import copy
from pyapprox import compute_hyperbolic_indices
def cross_validate_pce_degree(
        pce, train_samples, train_vals, min_degree=1, max_degree=3,
        hcross_strength=1, cv=10, solver_type='lasso_lars', verbosity=0):
    r"""
    Use cross validation to find the polynomial degree which best fits the
    data. A polynomial is constructed for each degree and the degree with
    the highest cross validation score is returned.

    Parameters
    ----------
    pce : :class:`pyapprox.multivariate_polynomials.PolynomialChaosExpansion`
        A configured polynomial chaos expansion whose indices and
        coefficients will be set

    train_samples : np.ndarray (nvars,nsamples)
        The inputs of the function used to train the approximation

    train_vals : np.ndarray (nsamples,nqoi)
        The values of the function at ``train_samples``

    min_degree : integer
        The minimum degree to consider

    max_degree : integer
        The maximum degree to consider. All degrees in
        ``range(min_degree,max_degree+1)`` are considered

    hcross_strength : float
        The strength of the hyperbolic cross index set. hcross_strength
        must be in (0,1]. A value of 1 produces total degree polynomials

    cv : integer
        The number of cross validation folds used to compute the cross
        validation error

    solver_type : string
        The type of regression used to train the polynomial

        - 'lasso_lars'
        - 'lars'
        - 'lasso'
        - 'omp'

    verbosity : integer
        Controls the amount of information printed to screen

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
        Result object with the following attributes

    approx : :class:`pyapprox.multivariate_polynomials.PolynomialChaosExpansion`
        The PCE approximation

    scores : np.ndarray (nqoi)
        The best cross validation score for each QoI

    degrees : np.ndarray (nqoi)
        The best degree for each QoI
    """
    coefs = []
    scores = []
    indices = []
    degrees = []
    indices_dict = dict()
    unique_indices = []
    nqoi = train_vals.shape[1]
    for ii in range(nqoi):
        if verbosity > 1:
            print(f'Approximating QoI: {ii}')
        pce_ii, score_ii, degree_ii = _cross_validate_pce_degree(
            pce, train_samples, train_vals[:, ii:ii+1], min_degree,
            max_degree, hcross_strength, cv, solver_type, verbosity)
        coefs.append(pce_ii.get_coefficients())
        scores.append(score_ii)
        indices.append(pce_ii.get_indices())
        degrees.append(degree_ii)
        for index in indices[ii].T:
            key = hash_array(index)
            if key not in indices_dict:
                indices_dict[key] = len(unique_indices)
                unique_indices.append(index)

    unique_indices = np.array(unique_indices).T
    all_coefs = np.zeros((unique_indices.shape[1], nqoi))
    for ii in range(nqoi):
        for jj, index in enumerate(indices[ii].T):
            key = hash_array(index)
            all_coefs[indices_dict[key], ii] = coefs[ii][jj, 0]
    pce.set_indices(unique_indices)
    pce.set_coefficients(all_coefs)
    return ApproximateResult({'approx': pce, 'scores': np.array(scores),
                              'degrees': np.array(degrees)})
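# A minimal usage sketch (illustrative, not part of the original module);
# the PCE is configured the same way :func:`approximate_polynomial_chaos`
# configures it before delegating. The helper name is hypothetical.
def _example_cross_validate_pce_degree():
    from scipy import stats
    from pyapprox.multivariate_polynomials import PolynomialChaosExpansion, \
        define_poly_options_from_variable_transformation
    variable = IndependentMultivariateRandomVariable(
        [stats.uniform(-1, 2)]*2)
    var_trans = AffineRandomVariableTransformation(variable)
    pce = PolynomialChaosExpansion()
    pce.configure(
        define_poly_options_from_variable_transformation(var_trans))
    train_samples = np.random.uniform(-1, 1, (2, 100))
    train_vals = np.sum(train_samples**2, axis=0)[:, np.newaxis]
    res = cross_validate_pce_degree(pce, train_samples, train_vals,
                                    min_degree=1, max_degree=3)
    return res.approx, res.scores, res.degrees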
def _cross_validate_pce_degree(
        pce, train_samples, train_vals, min_degree=1, max_degree=3,
        hcross_strength=1, cv=10, solver_type='lasso_lars', verbosity=0):
    assert train_vals.shape[1] == 1
    num_samples = train_samples.shape[1]
    if min_degree is None:
        min_degree = 2
    if max_degree is None:
        max_degree = np.iinfo(int).max-1

    best_coef = None
    best_cv_score = -np.finfo(np.double).max
    best_degree = min_degree
    prev_num_terms = 0
    if verbosity > 0:
        print("{:<8} {:<10} {:<18}".format(
            'degree', 'num_terms', 'cv score'))
    for degree in range(min_degree, max_degree+1):
        indices = compute_hyperbolic_indices(
            pce.num_vars(), degree, hcross_strength)
        pce.set_indices(indices)
        if ((pce.num_terms() > 100000) and
                (100000-prev_num_terms < pce.num_terms()-100000)):
            break

        basis_matrix = pce.basis_matrix(train_samples)
        coef, cv_score = fit_linear_model(
            basis_matrix, train_vals, solver_type, cv=cv)
        pce.set_coefficients(coef)
        if verbosity > 0:
            print("{:<8} {:<10} {:<18} ".format(
                degree, pce.num_terms(), cv_score))
        if cv_score > best_cv_score:
            best_cv_score = cv_score
            best_coef = coef.copy()
            best_degree = degree
        if (cv_score >= best_cv_score) and (degree-best_degree > 1):
            break
        prev_num_terms = pce.num_terms()

    pce.set_indices(compute_hyperbolic_indices(
        pce.num_vars(), best_degree, hcross_strength))
    pce.set_coefficients(best_coef)
    if verbosity > 0:
        print('best degree:', best_degree)
    return pce, best_cv_score, best_degree
def restrict_basis(indices, coefficients, tol):
    I = np.where(np.absolute(coefficients) > tol)[0]
    restricted_indices = indices[:, I]
    degrees = indices.sum(axis=0)
    J = np.where(degrees == 0)[0]
    assert J.shape[0] == 1
    if J not in I:
        # always include the zero degree polynomial in restricted_indices
        restricted_indices = np.concatenate(
            [indices[:, J], restricted_indices], axis=1)
    return restricted_indices
from pyapprox import hash_array, get_forward_neighbor, get_backward_neighbor
def expand_basis(indices):
    nvars, nindices = indices.shape
    indices_set = set()
    for ii in range(nindices):
        indices_set.add(hash_array(indices[:, ii]))

    new_indices = []
    for ii in range(nindices):
        index = indices[:, ii]
        active_vars = np.nonzero(index)[0]
        for dd in range(nvars):
            forward_index = get_forward_neighbor(index, dd)
            key = hash_array(forward_index)
            if key not in indices_set:
                admissible = True
                for kk in active_vars:
                    backward_index = get_backward_neighbor(
                        forward_index, kk)
                    if hash_array(backward_index) not in indices_set:
                        admissible = False
                        break
                if admissible:
                    indices_set.add(key)
                    new_indices.append(forward_index)
    return np.asarray(new_indices).T
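# A small worked example (illustrative, not part of the original module) of
# the downward-closed expansion: starting from the 2D total-degree-1 set,
# the admissible forward neighbors are [2,0], [1,1] and [0,2]. The helper
# name is hypothetical.
def _example_expand_basis():
    # columns are the multivariate indices [0,0], [1,0] and [0,1]
    indices = np.array([[0, 1, 0],
                        [0, 0, 1]])
    return expand_basis(indices)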
def expanding_basis_omp_pce(pce, train_samples, train_vals,
                            hcross_strength=1, verbosity=1,
                            max_num_terms=None, solver_type='lasso_lars',
                            cv=10, restriction_tol=np.finfo(float).eps*2):
    r"""
    Iteratively expand and restrict the polynomial basis and use cross
    validation to find the best basis [JESJCP2015]_

    Parameters
    ----------
    train_samples : np.ndarray (nvars,nsamples)
        The inputs of the function used to train the approximation

    train_vals : np.ndarray (nsamples,nqoi)
        The values of the function at ``train_samples``

    hcross_strength : float
        The strength of the hyperbolic cross index set. hcross_strength
        must be in (0,1]. A value of 1 produces total degree polynomials

    cv : integer
        The number of cross validation folds used to compute the cross
        validation error

    solver_type : string
        The type of regression used to train the polynomial

        - 'lasso_lars'
        - 'lars'
        - 'lasso'
        - 'omp'

    verbosity : integer
        Controls the amount of information printed to screen

    restriction_tol : float
        The tolerance used to prune inactive indices

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
        Result object with the following attributes

    approx : :class:`pyapprox.multivariate_polynomials.PolynomialChaosExpansion`
        The PCE approximation

    scores : np.ndarray (nqoi)
        The best cross validation score for each QoI

    References
    ----------
    .. [JESJCP2015] `J.D. Jakeman, M.S. Eldred, and K. Sargsyan. Enhancing
       l1-minimization estimates of polynomial chaos expansions using basis
       selection. Journal of Computational Physics, 289(0):18-34, 2015
       <https://doi.org/10.1016/j.jcp.2015.02.025>`_
    """
    coefs = []
    scores = []
    indices = []
    indices_dict = dict()
    unique_indices = []
    nqoi = train_vals.shape[1]
    for ii in range(nqoi):
        if verbosity > 1:
            print(f'Approximating QoI: {ii}')
        pce_ii, score_ii = _expanding_basis_omp_pce(
            pce, train_samples, train_vals[:, ii:ii+1], hcross_strength,
            verbosity, max_num_terms, solver_type, cv, restriction_tol)
        coefs.append(pce_ii.get_coefficients())
        scores.append(score_ii)
        indices.append(pce_ii.get_indices())
        for index in indices[ii].T:
            key = hash_array(index)
            if key not in indices_dict:
                indices_dict[key] = len(unique_indices)
                unique_indices.append(index)

    unique_indices = np.array(unique_indices).T
    all_coefs = np.zeros((unique_indices.shape[1], nqoi))
    for ii in range(nqoi):
        for jj, index in enumerate(indices[ii].T):
            key = hash_array(index)
            all_coefs[indices_dict[key], ii] = coefs[ii][jj, 0]
    pce.set_indices(unique_indices)
    pce.set_coefficients(all_coefs)
    return ApproximateResult({'approx': pce, 'scores': np.array(scores)})
def _expanding_basis_omp_pce(pce, train_samples, train_vals,
                             hcross_strength=1, verbosity=1,
                             max_num_terms=None, solver_type='lasso_lars',
                             cv=10, restriction_tol=np.finfo(float).eps*2):
    assert train_vals.shape[1] == 1
    num_vars = pce.num_vars()
    if max_num_terms is None:
        max_num_terms = 10*train_vals.shape[1]
    degree = 2
    prev_num_terms = 0
    while True:
        indices = compute_hyperbolic_indices(
            num_vars, degree, hcross_strength)
        num_terms = indices.shape[1]
        if num_terms > max_num_terms:
            break
        degree += 1
        prev_num_terms = num_terms

    if abs(num_terms-max_num_terms) > abs(prev_num_terms-max_num_terms):
        degree -= 1
    pce.set_indices(
        compute_hyperbolic_indices(num_vars, degree, hcross_strength))

    if verbosity > 0:
        msg = f'Initializing basis with hyperbolic cross of degree {degree}'
        msg += f' and strength {hcross_strength} with {pce.num_terms()}'
        msg += ' terms'
        print(msg)

    basis_matrix = pce.basis_matrix(train_samples)
    best_coef, best_cv_score = fit_linear_model(
        basis_matrix, train_vals, solver_type, cv=cv)
    pce.set_coefficients(best_coef)
    best_indices = pce.get_indices()
    if verbosity > 0:
        print("{:<10} {:<10} {:<18}".format(
            'nterms', 'nnz terms', 'cv score'))
        print("{:<10} {:<10} {:<18}".format(
            pce.num_terms(), np.count_nonzero(pce.coefficients),
            best_cv_score))

    best_cv_score_iter = best_cv_score
    best_num_expansion_steps = 3
    it = 0
    best_it = 0
    while True:
        max_num_expansion_steps = 1
        best_num_expansion_steps_iter = best_num_expansion_steps
        while True:
            # -------------- #
            #  Expand basis  #
            # -------------- #
            num_expansion_steps = 0
            indices = restrict_basis(
                pce.indices, pce.coefficients, restriction_tol)
            while ((num_expansion_steps < max_num_expansion_steps) and
                   (num_expansion_steps < best_num_expansion_steps)):
                new_indices = expand_basis(pce.indices)
                pce.set_indices(np.hstack([pce.indices, new_indices]))
                num_terms = pce.num_terms()
                num_expansion_steps += 1

            # ------------------ #
            #  Compute solution  #
            # ------------------ #
            basis_matrix = pce.basis_matrix(train_samples)
            coef, cv_score = fit_linear_model(
                basis_matrix, train_vals, solver_type, cv=cv)
            pce.set_coefficients(coef)
            if verbosity > 0:
                print("{:<10} {:<10} {:<18}".format(
                    pce.num_terms(), np.count_nonzero(pce.coefficients),
                    cv_score))

            if cv_score > best_cv_score_iter:
                best_cv_score_iter = cv_score
                best_indices_iter = pce.indices.copy()
                best_coef_iter = pce.coefficients.copy()
                best_num_expansion_steps_iter = num_expansion_steps

            if num_terms >= max_num_terms:
                break
            if max_num_expansion_steps >= 3:
                break
            max_num_expansion_steps += 1

        if best_cv_score_iter > best_cv_score:
            best_cv_score = best_cv_score_iter
            best_coef = best_coef_iter.copy()
            best_indices = best_indices_iter.copy()
            best_num_expansion_steps = best_num_expansion_steps_iter
            best_it = it
        elif it - best_it >= 2:
            break
        it += 1

    nindices = best_indices.shape[1]
    I = np.nonzero(best_coef[:, 0])[0]
    pce.set_indices(best_indices[:, I])
    pce.set_coefficients(best_coef[I])
    if verbosity > 0:
        msg = f'Final basis has {pce.num_terms()} terms selected from '
        msg += f'{nindices} using {train_samples.shape[1]} samples'
        print(msg)
    return pce, best_cv_score
def approximate_gaussian_process(train_samples, train_vals, nu=np.inf,
                                 n_restarts_optimizer=5, verbosity=0):
    r"""
    Compute a Gaussian process approximation of a function from a fixed
    data set using the Matern kernel

    .. math::

       k(z_i, z_j) = \frac{1}{\Gamma(\nu)2^{\nu-1}}\Bigg(
       \frac{\sqrt{2\nu}}{l} \lVert z_i - z_j \rVert_2\Bigg)^\nu
       K_\nu\Bigg(\frac{\sqrt{2\nu}}{l} \lVert z_i - z_j \rVert_2\Bigg)

    where :math:`\lVert \cdot \rVert_2` is the Euclidean distance,
    :math:`\Gamma(\cdot)` is the gamma function and :math:`K_\nu(\cdot)`
    is the modified Bessel function.

    Parameters
    ----------
    train_samples : np.ndarray (nvars,nsamples)
        The inputs of the function used to train the approximation

    train_vals : np.ndarray (nsamples,nqoi)
        The values of the function at ``train_samples``

    nu : float
        The parameter :math:`\nu` of the Matern kernel. When
        :math:`\nu\to\infty` the Matern kernel is equivalent to the
        squared-exponential kernel.

    n_restarts_optimizer : int
        The number of local optimization problems solved to find the GP
        hyper-parameters

    verbosity : integer
        Controls the amount of information printed to screen

    Returns
    -------
    result : :class:`pyapprox.approximate.ApproximateResult`
        Result object with the following attributes

    approx : :class:`pyapprox.gaussian_process.GaussianProcess`
        The Gaussian process
    """
    from sklearn.gaussian_process.kernels import Matern, WhiteKernel
    from pyapprox.gaussian_process import GaussianProcess
    nvars = train_samples.shape[0]
    length_scale = np.array([1]*nvars)
    kernel = Matern(length_scale, length_scale_bounds=(1e-2, 10), nu=nu)
    # optimize variance
    kernel = 1*kernel
    # optimize gp noise
    kernel += WhiteKernel(noise_level=1, noise_level_bounds=(1e-8, 1))
    gp = GaussianProcess(kernel, n_restarts_optimizer=n_restarts_optimizer)
    gp.fit(train_samples, train_vals)
    return ApproximateResult({'approx': gp})
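# A minimal usage sketch (illustrative, not part of the original module). It
# assumes the returned :class:`pyapprox.gaussian_process.GaussianProcess`
# can be evaluated by calling it on a samples array, as done with the
# approximation in :func:`compute_l2_error`. The helper name is
# hypothetical.
def _example_approximate_gaussian_process():
    train_samples = np.random.uniform(-1, 1, (2, 30))
    train_vals = np.sum(train_samples**2, axis=0)[:, np.newaxis]
    res = approximate_gaussian_process(train_samples, train_vals, nu=1.5)
    test_samples = np.random.uniform(-1, 1, (2, 5))
    return res.approx(test_samples)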