Source code for pyapprox.util.utilities

from warnings import warn
from functools import partial

import numpy as np
from numpy.polynomial.legendre import leggauss
from scipy.special import roots_hermitenorm

from scipy.linalg import solve_triangular
from scipy.linalg import lapack

import matplotlib.pyplot as plt

from pyapprox.util.pya_numba import njit
from pyapprox.util.sys_utilities import hash_array
from pyapprox.util.sys_utilities import has_kwarg


def sub2ind(sizes, multi_index):
    r"""
    Map a d-dimensional index to the scalar index of the equivalent flat
    1D array

    Examples
    --------

    .. math::

       \begin{bmatrix}
       0,0 & 0,1 & 0,2\\
       1,0 & 1,1 & 1,2\\
       2,0 & 2,1 & 2,2
       \end{bmatrix}
       \rightarrow
       \begin{bmatrix}
       0 & 3 & 6\\
       1 & 4 & 7\\
       2 & 5 & 8
       \end{bmatrix}

    >>> from pyapprox.util.utilities import sub2ind
    >>> sizes = [3,3]
    >>> ind = sub2ind(sizes,[1,0])
    >>> print(ind)
    1

    Parameters
    ----------
    sizes : integer
        The number of elems in each dimension. For a 2D index
        sizes = [numRows, numCols]

    multi_index : np.ndarray (len(sizes))
       The d-dimensional index

    Returns
    -------
    scalar_index : integer
        The scalar index

    See Also
    --------
    pyapprox.util.sub2ind
    """
    num_sets = len(sizes)
    scalar_index = 0
    shift = 1
    for ii in range(num_sets):
        scalar_index += shift * multi_index[ii]
        shift *= sizes[ii]
    return scalar_index


def ind2sub(sizes, scalar_index, num_elems):
    r"""
    Map a scalar index of a flat 1D array to the equivalent d-dimensional index

    Examples
    --------

    .. math::

        \begin{bmatrix}
        0 & 3 & 6\\
        1 & 4 & 7\\
        2 & 5 & 8
        \end{bmatrix}
        \rightarrow
        \begin{bmatrix}
        0,0 & 0,1 & 0,2\\
        1,0 & 1,1 & 1,2\\
        2,0 & 2,1 & 2,2
        \end{bmatrix}

    >>> from pyapprox.util.utilities import ind2sub
    >>> sizes = [3,3]
    >>> sub = ind2sub(sizes,1,9)
    >>> print(sub)
    [1 0]

    Parameters
    ----------
    sizes : integer
        The number of elems in each dimension. For a 2D index
        sizes = [numRows, numCols]

    scalar_index : integer
        The scalar index

    num_elems : integer
        The total number of elements in the d-dimensional matrix

    Returns
    -------
    multi_index : np.ndarray (len(sizes))
       The d-dimensional index

    See Also
    --------
    pyapprox.util.sub2ind
    """
    denom = num_elems
    num_sets = len(sizes)
    multi_index = np.empty((num_sets), dtype=int)
    for ii in range(num_sets-1, -1, -1):
        denom /= sizes[ii]
        multi_index[ii] = scalar_index / denom
        scalar_index = scalar_index % denom
    return multi_index


[docs]def cartesian_product(input_sets, elem_size=1): r""" Compute the cartesian product of an arbitray number of sets. The sets can consist of numbers or themselves be lists or vectors. All the lists or vectors of a given set must have the same number of entries (elem_size). However each set can have a different number of scalars, lists, or vectors. Parameters ---------- input_sets The sets to be used in the cartesian product. elem_size : integer The size of the vectors within each set. Returns ------- result : np.ndarray (num_sets*elem_size, num_elems) The cartesian product. num_elems = np.prod(sizes)/elem_size, where sizes[ii] = len(input_sets[ii]), ii=0,..,num_sets-1. result.dtype will be set to the first entry of the first input_set """ import itertools out = [] # ::-1 reverse order to be backwards compatiable with old # function below for r in itertools.product(*input_sets[::-1]): out.append(r) out = np.asarray(out).T[::-1, :] return out
# try: # from pyapprox.cython.utilities import cartesian_product_pyx # # # fused type does not work for np.in32, np.float32, np.int64 # # # so envoke cython cast # # if np.issubdtype(input_sets[0][0],np.signedinteger): # # return cartesian_product_pyx(input_sets,1,elem_size) # # if np.issubdtype(input_sets[0][0],np.floating): # # return cartesian_product_pyx(input_sets,1.,elem_size) # # else: # # return cartesian_product_pyx( # # input_sets,input_sets[0][0],elem_size) # # always convert to float then cast back # cast_input_sets = [np.asarray(s, dtype=float) for s in input_sets] # out = cartesian_product_pyx(cast_input_sets, 1., elem_size) # out = np.asarray(out, dtype=input_sets[0].dtype) # return out # except: # print('cartesian_product extension failed') # num_elems = 1 # num_sets = len(input_sets) # sizes = np.empty((num_sets), dtype=int) # for ii in range(num_sets): # sizes[ii] = input_sets[ii].shape[0]/elem_size # num_elems *= sizes[ii] # # try: # # from pyapprox.weave import c_cartesian_product # # # note c_cartesian_product takes_num_elems as last arg and cython # # # takes elem_size # # return c_cartesian_product(input_sets, elem_size, sizes, num_elems) # # except: # # print ('cartesian_product extension failed') # result = np.empty( # (num_sets*elem_size, num_elems), dtype=type(input_sets[0][0])) # for ii in range(num_elems): # multi_index = ind2sub(sizes, ii, num_elems) # for jj in range(num_sets): # for kk in range(elem_size): # result[jj*elem_size+kk, ii] =\ # input_sets[jj][multi_index[jj]*elem_size+kk] # return result
[docs]def outer_product(input_sets, axis=0): r""" Construct the outer product of an arbitary number of sets. Examples -------- .. math:: \{1,2\}\times\{3,4\}=\{1\times3, 2\times3, 1\times4, 2\times4\} = \{3, 6, 4, 8\} Parameters ---------- input_sets The sets to be used in the outer product Returns ------- result : np.ndarray(np.prod(sizes)) The outer product of the sets. result.dtype will be set to the first entry of the first input_set """ out = cartesian_product(input_sets) return np.prod(out, axis=axis)
# try: # from pyapprox.cython.utilities import outer_product_pyx # # fused type does not work for np.in32, np.float32, np.int64 # # so envoke cython cast # if np.issubdtype(input_sets[0][0], np.signedinteger): # return outer_product_pyx(input_sets, 1) # if np.issubdtype(input_sets[0][0], np.floating): # return outer_product_pyx(input_sets, 1.) # else: # return outer_product_pyx(input_sets, input_sets[0][0]) # except ImportError: # print('outer_product extension failed') # num_elems = 1 # num_sets = len(input_sets) # sizes = np.empty((num_sets), dtype=int) # for ii in range(num_sets): # sizes[ii] = len(input_sets[ii]) # num_elems *= sizes[ii] # # try: # # from pyapprox.weave import c_outer_product # # return c_outer_product(input_sets) # # except: # # print ('outer_product extension failed') # result = np.empty((num_elems), dtype=type(input_sets[0][0])) # for ii in range(num_elems): # result[ii] = 1.0 # multi_index = ind2sub(sizes, ii, num_elems) # for jj in range(num_sets): # result[ii] *= input_sets[jj][multi_index[jj]] # return result def unique_matrix_row_indices(matrix): unique_row_indices = [] unique_rows_set = set() for ii in range(matrix.shape[0]): key = hash_array(matrix[ii, :]) if key not in unique_rows_set: unique_rows_set.add(key) unique_row_indices.append(ii) return np.array(unique_row_indices) def unique_matrix_rows(matrix): return matrix[unique_matrix_row_indices(matrix)] # unique_rows = [] # unique_rows_set = set() # for ii in range(matrix.shape[0]): # key = hash_array(matrix[ii, :]) # if key not in unique_rows_set: # unique_rows_set.add(key) # unique_rows.append(matrix[ii, :]) # return np.asarray(unique_rows) def common_matrix_rows(matrix): unique_rows_dict = dict() for ii in range(matrix.shape[0]): key = hash_array(matrix[ii, :]) if key not in unique_rows_dict: unique_rows_dict[key] = [ii] else: unique_rows_dict[key].append(ii) return unique_rows_dict def remove_common_rows(matrices): num_cols = matrices[0].shape[1] unique_rows_dict = dict() for ii in range(len(matrices)): matrix = matrices[ii] assert matrix.shape[1] == num_cols for jj in range(matrix.shape[0]): key = hash_array(matrix[jj, :]) if key not in unique_rows_dict: unique_rows_dict[key] = (ii, jj) elif unique_rows_dict[key][0] != ii: del unique_rows_dict[key] # else: # entry is a duplicate entry in the current. Allow this to # occur but only add one of the duplicates to the unique rows dict unique_rows = [] for key in list(unique_rows_dict.keys()): ii, jj = unique_rows_dict[key] unique_rows.append(matrices[ii][jj, :]) return np.asarray(unique_rows) def allclose_unsorted_matrix_rows(matrix1, matrix2): if matrix1.shape != matrix2.shape: return False matrix1_dict = dict() for ii in range(matrix1.shape[0]): key = hash_array(matrix1[ii, :]) # allow duplicates of rows if key not in matrix1_dict: matrix1_dict[key] = 0 else: matrix1_dict[key] += 1 matrix2_dict = dict() for ii in range(matrix2.shape[0]): key = hash_array(matrix2[ii, :]) # allow duplicates of rows if key not in matrix2_dict: matrix2_dict[key] = 0 else: matrix2_dict[key] += 1 if len(list(matrix1_dict.keys())) != len(list(matrix2_dict.keys())): return False for key in list(matrix1_dict.keys()): if key not in matrix2_dict: return False if matrix2_dict[key] != matrix1_dict[key]: return False return True def get_2d_cartesian_grid(num_pts_1d, ranges): r""" Get a 2d tensor grid with equidistant points. Parameters ---------- num_pts_1d : integer The number of points in each dimension ranges : np.ndarray (4) The lower and upper bound of each dimension [lb_1,ub_1,lb_2,ub_2] Returns ------- grid : np.ndarray (2,num_pts_1d**2) The points in the tensor product grid. [x1,x2,...x1,x2...] [y1,y1,...y2,y2...] """ # from math_tools_cpp import cartesian_product_double as cartesian_product from PyDakota.math_tools import cartesian_product x1 = np.linspace(ranges[0], ranges[1], num_pts_1d) x2 = np.linspace(ranges[2], ranges[3], num_pts_1d) abscissa_1d = [] abscissa_1d.append(x1) abscissa_1d.append(x2) grid = cartesian_product(abscissa_1d, 1) return grid def nchoosek(nn, kk): try: # SciPy >= 0.19 from scipy.special import comb except ImportError: from scipy.misc import comb result = np.asarray(np.round(comb(nn, kk)), dtype=int) if result.ndim == 0: result = result.item() # result = np.asscalar(result) return result def lists_of_arrays_equal(list1, list2): if len(list1) != len(list2): return False for ll in range(len(list1)): if not np.allclose(list1[ll], list2[ll]): return False return True def lists_of_lists_of_arrays_equal(list1, list2): if len(list1) != len(list2): return False for ll in range(len(list1)): for kk in range(len(list1[ll])): if not np.allclose(list1[ll][kk], list2[ll][kk]): return False return True def get_all_primes_less_than_or_equal_to_n(n): primes = list() primes.append(2) for num in range(3, n+1, 2): if all(num % i != 0 for i in range(2, int(num**.5) + 1)): primes.append(num) return np.asarray(primes) @njit(cache=True) def get_first_n_primes(n): primes = list() primes.append(2) num = 3 while len(primes) < n: # np.all does not work with numba # if np.all([num % i != 0 for i in range(2, int(num**.5) + 1)]): flag = True for i in range(2, int(num**.5) + 1): if (num % i == 0): flag = False break if flag is True: primes.append(num) num += 2 return np.asarray(primes) def approx_fprime(x, func, eps=np.sqrt(np.finfo(float).eps)): r"""Approx the gradient of a vector valued function at a single sample using finite_difference """ assert x.shape[1] == 1 nvars = x.shape[0] fprime = [] func_at_x = func(x).squeeze() # if func_at_x.ndim == 2: # func_at_x = func_at_x[:, 0] assert func_at_x.ndim < 2 for ii in range(nvars): x_plus_eps = x.copy() x_plus_eps[ii] += eps fprime.append((func(x_plus_eps).squeeze()-func_at_x)/eps) return np.array(fprime) def partial_functions_equal(func1, func2): if not (isinstance(func1, partial) and isinstance(func2, partial)): return False are_equal = all([getattr(func1, attr) == getattr(func2, attr) for attr in ['func', 'args', 'keywords']]) return are_equal def get_all_sample_combinations(samples1, samples2): r""" For two sample sets of different random variables loop over all combinations samples1 vary slowest and samples2 vary fastest Let samples1 = [[1,2],[2,3]] samples2 = [[0, 0, 0],[0, 1, 2]] Then samples will be ([1, 2, 0, 0, 0]) ([1, 2, 0, 1, 2]) ([3, 4, 0, 0, 0]) ([3, 4, 0, 1, 2]) """ import itertools samples = [] for r in itertools.product(*[samples1.T, samples2.T]): samples.append(np.concatenate(r)) return np.asarray(samples).T def get_correlation_from_covariance(cov): r""" Compute the correlation matrix from a covariance matrix Parameters ---------- cov : np.ndarray (nrows,nrows) The symetric covariance matrix Returns ------- cor : np.ndarray (nrows,nrows) The symetric correlation matrix Examples -------- >>> cov = np.asarray([[2,-1],[-1,2]]) >>> get_correlation_from_covariance(cov) array([[ 1. , -0.5], [-0.5, 1. ]]) """ stdev_inv = 1/np.sqrt(np.diag(cov)) cor = stdev_inv[np.newaxis, :]*cov*stdev_inv[:, np.newaxis] return cor def evaluate_quadratic_form(matrix, samples): r""" Evaluate x.T.dot(A).dot(x) for several vectors x Parameters ---------- num_samples : np.ndarray (nvars,nsamples) The vectors x matrix : np.ndarray(nvars,nvars) The matrix A Returns ------- vals : np.ndarray (nsamples) Evaluations of the quadratic form for each vector x """ return (samples.T.dot(matrix)*samples.T).sum(axis=1) def split_dataset(samples, values, ndata1): """ Split a data set into two sets. Parameters ---------- samples : np.ndarray (nvars,nsamples) The samples to be split values : np.ndarray (nsamples,nqoi) Values of the data at ``samples`` ndata1 : integer The number of samples allocated to the first split. All remaining samples will be added to the second split. Returns ------- samples1 : np.ndarray (nvars,ndata1) The samples of the first split data set values1 : np.ndarray (nvars,ndata1) The values of the first split data set samples2 : np.ndarray (nvars,ndata1) The samples of the first split data set values2 : np.ndarray (nvars,ndata1) The values of the first split data set """ assert ndata1 <= samples.shape[1] assert values.shape[0] == samples.shape[1] II = np.random.permutation(samples.shape[1]) samples1 = samples[:, II[:ndata1]] samples2 = samples[:, II[ndata1:]] values1 = values[II[:ndata1], :] values2 = values[II[ndata1:], :] return samples1, samples2, values1, values2 def leave_one_out_lsq_cross_validation(basis_mat, values, alpha=0, coef=None): """ let :math:`x_i` be the ith row of :math:`X` and let :math:`\beta=(X^\top X)^{-1}X^\top y` such that the residuals at the training samples satisfy .. math:: r_i = X\beta-y then the leave one out cross validation errors are given by .. math:: e_i = \frac{r_i}{1-h_i} where :math:`h_i = x_i^\top(X^\top X)^{-1}x_i` """ assert values.ndim == 2 assert basis_mat.shape[0] > basis_mat.shape[1]+2 gram_mat = basis_mat.T.dot(basis_mat) gram_mat += alpha*np.eye(gram_mat.shape[0]) H_mat = basis_mat.dot(np.linalg.inv(gram_mat).dot(basis_mat.T)) H_diag = np.diag(H_mat) if coef is None: coef = np.linalg.lstsq( gram_mat, basis_mat.T.dot(values), rcond=None)[0] assert coef.ndim == 2 residuals = basis_mat.dot(coef) - values cv_errors = residuals / (1-H_diag[:, None]) cv_score = np.sqrt(np.sum(cv_errors**2, axis=0)/basis_mat.shape[0]) return cv_errors, cv_score, coef def leave_many_out_lsq_cross_validation(basis_mat, values, fold_sample_indices, alpha=0, coef=None): nfolds = len(fold_sample_indices) nsamples = basis_mat.shape[0] cv_errors = [] cv_score = 0 gram_mat = basis_mat.T.dot(basis_mat) gram_mat += alpha*np.eye(gram_mat.shape[0]) if coef is None: coef = np.linalg.lstsq( gram_mat, basis_mat.T.dot(values), rcond=None)[0] residuals = basis_mat.dot(coef) - values gram_mat_inv = np.linalg.inv(gram_mat) for kk in range(nfolds): indices_kk = fold_sample_indices[kk] nvalidation_samples_kk = indices_kk.shape[0] assert nsamples - nvalidation_samples_kk >= basis_mat.shape[1] basis_mat_kk = basis_mat[indices_kk, :] residuals_kk = residuals[indices_kk, :] H_mat = np.eye(nvalidation_samples_kk) - basis_mat_kk.dot( gram_mat_inv.dot(basis_mat_kk.T)) # print('gram_mat cond number', np.linalg.cond(gram_mat)) # print('H_mat cond number', np.linalg.cond(H_mat)) H_mat_inv = np.linalg.inv(H_mat) cv_errors.append(H_mat_inv.dot(residuals_kk)) cv_score += np.sum(cv_errors[-1]**2, axis=0) return cv_errors, np.sqrt(cv_score/basis_mat.shape[0]), coef def get_random_k_fold_sample_indices(nsamples, nfolds, random=True): sample_indices = np.arange(nsamples) if random is True: sample_indices = np.random.permutation(sample_indices) fold_sample_indices = [np.empty(0, dtype=int) for kk in range(nfolds)] nn = 0 while nn < nsamples: for jj in range(nfolds): fold_sample_indices[jj] = np.append( fold_sample_indices[jj], sample_indices[nn]) nn += 1 if nn >= nsamples: break assert np.unique(np.hstack(fold_sample_indices)).shape[0] == nsamples return fold_sample_indices def get_cross_validation_rsquared_coefficient_of_variation( cv_score, train_vals): r""" cv_score = :math:`N^{-1/2}\left(\sum_{n=1}^N e_n\right^{1/2}` where :math:`e_n` are the cross validation residues at each test point and :math:`N` is the number of traing vals We define r_sq as .. math:: 1-\frac{N^{-1}\left(\sum_{n=1}^N e_n\right)}/mathbb{V}\left[ Y\right] where Y is the vector of training vals """ # total sum of squares (proportional to variance) denom = np.std(train_vals) # the factors of 1/N in numerator and denominator cancel out rsq = 1-(cv_score/denom)**2 return rsq def __integrate_using_univariate_gauss_legendre_quadrature_bounded( integrand, lb, ub, nquad_samples, rtol=1e-8, atol=1e-8, verbose=0, adaptive=True, tabulated_quad_rules=None): """ tabulated_quad_rules : dictionary each entry is a tuple (x,w) of gauss legendre with weight function p(x)=1 defined on [-1,1]. The number of points in x is defined by the key. User must ensure that the dictionary contains any nquad_samples that may be requested """ # Adaptive # nquad_samples = 10 prev_res = np.inf it = 0 while True: if (tabulated_quad_rules is None or nquad_samples not in tabulated_quad_rules): xx_canonical, ww_canonical = leggauss(nquad_samples) else: xx_canonical, ww_canonical = tabulated_quad_rules[nquad_samples] xx = (xx_canonical+1)/2*(ub-lb)+lb ww = ww_canonical*(ub-lb)/2 res = integrand(xx).T.dot(ww).T diff = np.absolute(prev_res-res) if verbose > 1: print(it, nquad_samples, diff) if (np.all(np.absolute(prev_res-res) < rtol*np.absolute(res)+atol) or adaptive is False): break prev_res = res nquad_samples *= 2 it += 1 if verbose > 0: print(f'adaptive quadrature converged in {it} iterations') return res def integrate_using_univariate_gauss_legendre_quadrature_unbounded( integrand, lb, ub, nquad_samples, atol=1e-8, rtol=1e-8, interval_size=2, max_steps=1000, verbose=0, adaptive=True, soft_error=False, tabulated_quad_rules=None): """ Compute unbounded integrals by moving left and right from origin. Assume that integral decays towards +/- infinity. And that once integral over a sub interval drops below tolerance it will not increase again if we keep moving in same direction. """ if interval_size <= 0: raise ValueError("Interval size must be positive") if np.isfinite(lb) and np.isfinite(ub): partial_lb, partial_ub = lb, ub elif np.isfinite(lb) and not np.isfinite(ub): partial_lb, partial_ub = lb, lb+interval_size elif not np.isfinite(lb) and np.isfinite(ub): partial_lb, partial_ub = ub-interval_size, ub else: partial_lb, partial_ub = -interval_size/2, interval_size/2 result = __integrate_using_univariate_gauss_legendre_quadrature_bounded( integrand, partial_lb, partial_ub, nquad_samples, rtol, atol, verbose-1, adaptive, tabulated_quad_rules) step = 0 partial_result = np.inf plb, pub = partial_lb-interval_size, partial_lb while (np.any(np.absolute(partial_result) >= rtol*np.absolute(result)+atol) and (plb >= lb) and step < max_steps): partial_result = \ __integrate_using_univariate_gauss_legendre_quadrature_bounded( integrand, plb, pub, nquad_samples, rtol, atol, verbose-1, adaptive, tabulated_quad_rules) result += partial_result pub = plb plb -= interval_size step += 1 if verbose > 1: print('Left', step, result, partial_result, plb, pub, interval_size) if verbose > 0: if step >= max_steps: msg = "Early termination when computing left integral" msg += f"max_steps {max_steps} reached" if soft_error is True: warn(msg, UserWarning) else: raise RuntimeError(msg) if np.all(np.abs(partial_result) < rtol*np.absolute(result)+atol): msg = f'Tolerance {atol} {rtol} for left integral reached in ' msg += f'{step} iterations' print(msg) step = 0 partial_result = np.inf plb, pub = partial_ub, partial_ub+interval_size while (np.any(np.absolute(partial_result) >= rtol*np.absolute(result)+atol) and (pub <= ub) and step < max_steps): partial_result = \ __integrate_using_univariate_gauss_legendre_quadrature_bounded( integrand, plb, pub, nquad_samples, rtol, atol, verbose-1, adaptive, tabulated_quad_rules) result += partial_result plb = pub pub += interval_size step += 1 if verbose > 1: print('Right', step, result, partial_result, plb, pub, interval_size) if verbose > 0: if step >= max_steps: msg = "Early termination when computing right integral. " msg += f"max_steps {max_steps} reached" if soft_error is True: warn(msg, UserWarning) else: raise RuntimeError(msg) if np.all(np.abs(partial_result) < rtol*np.absolute(result)+atol): msg = f'Tolerance {atol} {rtol} for right integral reached in ' msg += f'{step} iterations' print(msg) # print(partial_result, plb, pub) return result def qr_solve(Q, R, rhs): """ Find the least squares solution Ax = rhs given a QR factorization of the matrix A Parameters ---------- Q : np.ndarray (nrows, nrows) The unitary/upper triangular Q factor R : np.ndarray (nrows, ncols) The upper triangular R matrix rhs : np.ndarray (nrows, nqoi) The right hand side vectors Returns ------- x : np.ndarray (nrows, nqoi) The solution """ tmp = np.dot(Q.T, rhs) return solve_triangular(R, tmp, lower=False) def equality_constrained_linear_least_squares(A, B, y, z): """ Solve equality constrained least squares regression minimize || y - A*x ||_2 subject to B*x = z It is assumed that Parameters ---------- A : np.ndarray (M, N) P <= N <= M+P, and B : np.ndarray (N, P) P <= N <= M+P, and y : np.ndarray (M, 1) P <= N <= M+P, and z : np.ndarray (P, 1) P <= N <= M+P, and Returns ------- x : np.ndarray (N, 1) The solution """ return lapack.dgglse(A, B, y, z)[3] def extract_sub_list(mylist, indices): """ Extract a subset of items from a list Parameters ---------- mylist : list(nitems) The list containing all items indices : iterable (nindices) The indices of the desired items Returns ------- subset : list (nindices) The extracted items """ return [mylist[ii] for ii in indices] def unique_elements_from_2D_list(list_2d): """ Extract the unique elements from a list of lists Parameters ---------- list_2d : list(list) The list of lists Returns ------- unique_items : list (nunique_items) The unique items """ return list(set(flatten_2D_list(list_2d))) def flatten_2D_list(list_2d): """ Flatten a list of lists into a single list Parameters ---------- list_2d : list(list) The list of lists Returns ------- flattened_list : list (nitems) The unique items """ return [item for sub in list_2d for item in sub] def approx_jacobian(func, x, *args, epsilon=np.sqrt(np.finfo(float).eps)): x0 = np.asfarray(x) assert x0.ndim == 1 or x0.shape[1] == 1 f0 = np.atleast_1d(func(*((x0,)+args))) if f0.ndim == 2: assert f0.shape[1] == 1 f0 = f0[:, 0] jac = np.zeros([len(x0), len(f0)]) dx = np.zeros(x0.shape) for i in range(len(x0)): dx[i] = epsilon f1 = func(*((x0+dx,)+args)) if f1.ndim == 2: assert f1.shape[1] == 1 f1 = f1[:, 0] jac[i] = (f1 - f0)/epsilon dx[i] = 0.0 return jac.transpose() def _check_gradients(fun, zz, direction, plot, disp, rel, fd_eps): function_val, directional_derivative = fun(zz, direction) if isinstance(function_val, np.ndarray): function_val = function_val.squeeze() if fd_eps is None: fd_eps = np.logspace(-13, 0, 14)[::-1] errors = [] row_format = "{:<12} {:<25} {:<25} {:<25}" if disp: if rel: print( row_format.format( "Eps", "norm(jv)", "norm(jv_fd)", "Rel. Errors")) else: print(row_format.format( "Eps", "norm(jv)", "norm(jv_fd)", "Abs. Errors")) row_format = "{:<12.2e} {:<25} {:<25} {:<25}" for ii in range(fd_eps.shape[0]): zz_perturbed = zz.copy()+fd_eps[ii]*direction # perturbed_function_val = fun(zz_perturbed) # add jac=False so that exact gradient is not always computed perturbed_function_val = fun(zz_perturbed, direction=None) if isinstance(perturbed_function_val, np.ndarray): perturbed_function_val = perturbed_function_val.squeeze() # print(inspect.getfullargspec(fun).args) # print(perturbed_function_val, function_val, fd_eps[ii]) fd_directional_derivative = ( perturbed_function_val-function_val)/fd_eps[ii] # print(fd_directional_derivative) errors.append(np.linalg.norm( fd_directional_derivative.reshape(directional_derivative.shape) - directional_derivative)) if rel: errors[-1] /= np.linalg.norm(directional_derivative) if disp: print(row_format.format( fd_eps[ii], np.linalg.norm(directional_derivative), np.linalg.norm(fd_directional_derivative), errors[ii])) if plot: plt.loglog(fd_eps, errors, 'o-') plt.ylabel(r'$\lvert\nabla_\epsilon f\cdot p-\nabla f\cdot p\rvert$') plt.xlabel(r'$\epsilon$') plt.show() return np.asarray(errors) def _wrap_function_with_gradient(fun, return_grad): if ((return_grad is not None) and not callable(return_grad) and (return_grad != "return_gradp") and (return_grad != True)): raise ValueError("return_grad must be callable, 'jacp', or None") if callable(return_grad): def fun_wrapper(x, direction=None): if direction is None: return fun(x) return fun(x), return_grad(x).dot(direction) return fun_wrapper if return_grad == True and has_kwarg(fun, "return_grad"): # this is PyApprox's preferred convention def fun_wrapper(x, direction=None): if direction is None: val = fun(x, return_grad=False) return val vals, grad = fun(x, return_grad=True) return vals, grad.dot(direction) return fun_wrapper if return_grad == True: def fun_wrapper(x, direction=None): if direction is None: return fun(x)[0] vals, grad = fun(x) return vals, grad.dot(direction) return fun_wrapper if return_grad == "jacp": assert has_kwarg(fun, "return_grad") # this is PyApprox's other preferred convention def fun_wrapper(x, direction=None): if direction is None: return fun(x, return_grad=False) val, grad = fun(x, return_grad=True) return fun(x), grad.dot(direction) return fun_wrapper return fun
[docs]def check_gradients(fun, jac, zz, plot=False, disp=True, rel=True, direction=None, fd_eps=None): """ Compare a user specified jacobian with the jacobian computed with finite difference with multiple step sizes. Parameters ---------- fun : callable A function with one of the following signatures ``fun(z) -> (vals)`` or ``fun(z, jac) -> (vals, grad)`` or ``fun(z, direction) -> (vals, directional_grad)`` where ``z`` is a 2D np.ndarray with shape (nvars, 1) and the first output is a 2D np.ndarray with shape (nqoi, 1) and the second output is a gradient with shape (nqoi, nvars). jac is a flag that specifies if the function returns only the funciton value (False) or the function value and gradient (True) jac : callable or string If jac="jacp" then provided the jacobian of ``fun`` with signature ``jac(z) -> np.ndarray`` where ``z`` is a 2D np.ndarray with shape (nvars, 1) and the output is a 2D np.ndarray with shape (nqoi, nvars). This assumes that fun only returns a value (not gradient) and has signature ``fun(z) -> np.ndarray`` zz : np.ndarray (nvars, 1) A sample of ``z`` at which to compute the gradient plot : boolean Plot the errors as a function of the finite difference step size disp : boolean True - print the errors False - do not print rel : boolean True - compute the relative error in the directional derivative, i.e. the absolute error divided by the directional derivative using ``jac``. False - compute the absolute error in the directional derivative direction : np.ndarray (nvars, 1) Direction to which Jacobian is applied. Default is None in which case random direction is chosen. fd_eps : np.ndarray (nstep_sizes) The finite difference step sizes used to compute the gradient. If None then fd_eps=np.logspace(-13, 0, 14)[::-1] Returns ------- errors : np.ndarray (14, nqoi) The errors in the directional derivative of ``fun`` at 14 different values of finite difference tolerance for each quantity of interest """ assert zz.ndim == 2 assert zz.shape[1] == 1 fun_wrapper = _wrap_function_with_gradient(fun, jac) if direction is None: direction = np.random.normal(0, 1, (zz.shape[0], 1)) direction /= np.linalg.norm(direction) assert direction.ndim == 2 and direction.shape[1] == 1 return _check_gradients( fun_wrapper, zz, direction, plot, disp, rel, fd_eps)
def check_hessian(jac, hessian_matvec, zz, plot=False, disp=True, rel=True, direction=None, fd_eps=np.logspace(-13, 0, 14)[::-1]): """ Compare a user specified Hessian matrix-vector product with the Hessian matrix vector produced computed with finite difference with multiple step sizes using a user specified jacobian. Parameters --------- jac : callable The jacobian with signature ``jac(z) -> np.ndarray`` where ``z`` is a 2D np.ndarray with shape (nvars,1) and the output is a 2D np.ndarray with shape (nqoi,nvars) hessian_matvec : callable A function implementing the hessian matrix-vector product with signature ``hessian_matvec(z,p) -> np.ndarray`` where ``z`` is a 2D np.ndarray with shape (nvars,1), ``p`` is an arbitrary vector with shape (nvars,1) and the output is a 2D np.ndarray with shape (nqoi,nvars) zz : np.ndarray (nvars,1) A sample of ``z`` at which to compute the gradient plot : boolean Plot the errors as a function of the finite difference step size disp : boolean True - print the errors False - do not print rel : boolean True - compute the relative error in the directional derivative, i.e. the absolute error divided by the directional derivative using ``jac``. False - compute the absolute error in the directional derivative direction : np.ndarray (nvars, 1) Direction to which Hessian is applied. Default is None in which case random direction is chosen. Returns ------- errors : np.ndarray (14, nqoi) The errors in the directional derivative of ``jac`` at 14 different values of finite difference tolerance for each quantity of interest """ assert zz.ndim == 2 assert zz.shape[1] == 1 grad = jac(zz) if direction is None: direction = np.random.normal(0, 1, (zz.shape[0], 1)) direction /= np.linalg.norm(direction) directional_derivative = hessian_matvec(zz, direction) errors = [] row_format = "{:<12} {:<25} {:<25} {:<25}" if disp: if rel: print( row_format.format( "Eps", "norm(jv)", "norm(jv_fd)", "Rel. Errors")) else: print(row_format.format( "Eps", "norm(jv)", "norm(jv_fd)", "Abs. Errors")) for ii in range(fd_eps.shape[0]): zz_perturbed = zz.copy()+fd_eps[ii]*direction perturbed_grad = jac(zz_perturbed) fd_directional_derivative = (perturbed_grad-grad)/fd_eps[ii] # print(directional_derivative, fd_directional_derivative) errors.append(np.linalg.norm( fd_directional_derivative.reshape(directional_derivative.shape) - directional_derivative)) if rel: errors[-1] /= np.linalg.norm(directional_derivative) if disp: print(row_format.format(fd_eps[ii], np.linalg.norm(directional_derivative), np.linalg.norm(fd_directional_derivative), errors[ii])) # print(fd_directional_derivative,directional_derivative) if plot: plt.loglog(fd_eps, errors, 'o-') label = r'$\lvert\nabla^2_\epsilon \cdot p f-\nabla^2 f\cdot p\rvert$' plt.ylabel(label) plt.xlabel(r'$\epsilon$') plt.show() return np.asarray(errors) def scipy_gauss_hermite_pts_wts_1D(nn): x, w = roots_hermitenorm(nn) w /= np.sqrt(2*np.pi) return x, w def scipy_gauss_legendre_pts_wts_1D(nn): x, w = np.polynomial.legendre.leggauss(nn) w *= 0.5 return x, w def get_tensor_product_quadrature_rule( nsamples, num_vars, univariate_quadrature_rules, transform_samples=None, density_function=None): r""" if get error about outer product failing it may be because univariate_quadrature rule is returning a weights array for every level, i.e. l=0,...level """ nsamples = np.atleast_1d(nsamples) if nsamples.shape[0] == 1 and num_vars > 1: nsamples = np.array([nsamples[0]]*num_vars, dtype=int) if callable(univariate_quadrature_rules): univariate_quadrature_rules = [univariate_quadrature_rules]*num_vars x_1d = [] w_1d = [] for ii in range(len(univariate_quadrature_rules)): x, w = univariate_quadrature_rules[ii](nsamples[ii]) x_1d.append(x) w_1d.append(w) samples = cartesian_product(x_1d, 1) weights = outer_product(w_1d) if density_function is not None: weights *= density_function(samples) if transform_samples is not None: samples = transform_samples(samples) return samples, weights def split_indices(nelems, nsplits): indices = np.hstack(( np.full((nelems % nsplits), nelems//nsplits+1), np.full(nsplits-(nelems % nsplits), nelems//nsplits))) return np.hstack((0, np.cumsum(indices)))