Source code for pyapprox.control_variate_monte_carlo

"""
Functions for estimating expectations using frequentist control-variate Monte-Carlo based methods such as multi-level Monte-Carlo, control-variate Monte-Carlo, and approximate control-variate Monte-Carlo.
"""
import numpy as np, os
from scipy.optimize import minimize
try:
    #use torch to compute gradients for sample allocation optimization
    import torch
    use_torch=True
except:
    #msg = 'Could not import Torch'
    #print(msg)
    use_torch=False
    
import copy
from pyapprox.utilities import get_all_sample_combinations
from functools import partial

[docs]def compute_correlations_from_covariance(cov): """ Compute the correlation matrix of a covariance matrix. Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] Returns ------- corr : np.ndarray (nmodels,nmodels) The correlation matrix """ corr_sqrt = np.diag(1/np.sqrt((np.diag(cov)))) corr = np.dot(corr_sqrt, np.dot(cov, corr_sqrt)) return corr
[docs]def standardize_sample_ratios(nhf_samples,nsample_ratios): """ Ensure num high fidelity samples is positive (>0) and then recompute sample ratios. This is useful when num high fidelity samples and sample ratios are computed by an optimization process. This function is useful for optimization problems with a numerical or analytical solution. Parameters ---------- nhf_samples : integer The number of samples of the high fidelity model nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1 Returns ------- nhf_samples : integer The corrected number of samples of the high fidelity model nsample_ratios : np.ndarray (nmodels-1) The corrected sample ratios """ nsamples = np.array([r*nhf_samples for r in nsample_ratios]) nhf_samples = int(max(1,np.floor(nhf_samples))) nsample_ratios = np.floor(nsamples)/nhf_samples #nhf_samples = int(max(1,np.round(nhf_samples))) #nsample_ratios = [max(np.round(nn/nhf_samples),0) for nn in nsamples] return nhf_samples, np.asarray(nsample_ratios)
[docs]def get_variance_reduction(get_rsquared,cov,nsample_ratios): r""" Compute the variance reduction: .. math:: \gamma = 1-r^2 Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1 Returns ------- gamma : float The variance reduction """ return 1-get_rsquared(cov,nsample_ratios)
[docs]def get_control_variate_rsquared(cov): r""" Compute :math:`r^2` used to compute the variance reduction of control variate Monte Carlo .. math:: \gamma = 1-r^2, \qquad r^2 = c^TC^{-1}c where c is the first column of C Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] Returns ------- rsquared : float The value :math:`r^2` """ nmodels = cov.shape[0] rsquared = cov[0,1:].dot(np.linalg.solve(cov[1:, 1:], cov[1:, 0])) rsquared /= cov[0,0] return rsquared
[docs]def get_rsquared_mfmc(cov,nsample_ratios): r""" Compute r^2 used to compute the variance reduction of Multifidelity Monte Carlo (MFMC) Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1 Returns ------- rsquared : float The value r^2 """ nmodels = cov.shape[0] assert len(nsample_ratios)==nmodels-1 rsquared=(nsample_ratios[0]-1)/(nsample_ratios[0])*cov[0,1]/( cov[0,0]*cov[1,1])*cov[0,1] for ii in range(1,nmodels-1): p1 = (nsample_ratios[ii]-nsample_ratios[ii-1])/( nsample_ratios[ii]*nsample_ratios[ii-1]) p1 *= cov[0,ii+1]/(cov[0,0]*cov[ii+1,ii+1])*cov[0,ii+1] rsquared += p1 return rsquared
[docs]def get_rsquared_mlmc(cov,nsample_ratios,pkg=np): r""" Compute r^2 used to compute the variance reduction of Multilevel Monte Carlo (MLMC) See Equation 2.24 in ARXIV paper where alpha_i=-1 for all i Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1. The values r_i correspond to eta_i in Equation 2.24 Returns ------- gamma : float The variance reduction """ nmodels = cov.shape[0] assert len(nsample_ratios)==nmodels-1 gamma = 0.0 rhat = pkg.ones(nmodels) for ii in range(1, nmodels): rhat[ii] = nsample_ratios[ii-1] - rhat[ii-1] for ii in range(nmodels-1): vardelta = cov[ii, ii] + cov[ii+1, ii+1] - 2*cov[ii, ii+1] gamma += vardelta / (rhat[ii]) v = cov[nmodels-1, nmodels-1] gamma += v / (rhat[-1]) gamma /= cov[0, 0] return 1-gamma
[docs]def get_mlmc_control_variate_weights(nmodels): r""" Get the weights used by the MLMC control variate estimator Returns ------- weights : np.ndarray (nmodels-1) The control variate weights """ return -np.ones(nmodels-1)
[docs]def compute_approximate_control_variate_mean_estimate(weights,values): r""" Use approximate control variate Monte Carlo to estimate the mean of high-fidelity data with low-fidelity models with unknown means Parameters ---------- values : list (nmodels) Each entry of the list contains values0 : np.ndarray (num_samples_i0,num_qoi) Evaluations of each model used to compute the estimator :math:`Q_{i,N}` of values1: np.ndarray (num_samples_i1,num_qoi) Evaluations used compute the approximate mean :math:`\mu_{i,r_iN}` of the low fidelity models. weights : np.ndarray (nmodels-1) the control variate weights Returns ------- est : float The control variate estimate of the mean """ nmodels = len(values) assert len(values)==nmodels # high fidelity monte carlo estimate of mean est = values[0][0].mean() for ii in range(nmodels-1): est += weights[ii]*(values[ii+1][0].mean()-values[ii+1][1].mean()) return est
[docs]def compute_control_variate_mean_estimate(weights,values,lf_means): r""" Use control variate Monte Carlo to estimate the mean of high-fidelity data with low-fidelity models with known means Parameters ---------- values : list (nmodels) Each entry of the list contains values0 : np.ndarray (num_samples_i0,num_qoi) Evaluations of each model used to compute the estimator :math:`Q_{i,N}` of weights : np.ndarray (nmodels-1) the control variate weights lf_means : np.ndarray (nmodels-1): The known means of the low fidelity models Returns ------- est : float The control variate estimate of the mean """ nmodels = len(values) assert len(values)==nmodels # high fidelity monte carlo estimate of mean est = values[0].mean() for ii in range(nmodels-1): est += weights[ii]*(values[ii+1].mean()-lf_means[ii]) return est
[docs]def allocate_samples_mfmc(cov, costs, target_cost, standardize=True): r""" Determine the samples to be allocated to each model when using MFMC Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] costs : np.ndarray (nmodels) The relative costs of evaluating each model target_cost : float The total cost budget standardize : boolean If true make sure that nhf_samples is an integer and that nhf_samples*nsamples_ratios are integers. False is only ever used for testing. Returns ------- nhf_samples : integer The number of samples of the high fidelity model nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i=r_i*nhf_samples, i=1,...,nmodels-1 log10_variance : float The base 10 logarithm of the variance of the estimator """ nmodels = cov.shape[0] corr = compute_correlations_from_covariance(cov) I = np.argsort(np.absolute(corr[0,1:]))[::-1] if not np.allclose(I, np.arange(nmodels-1)): msg = 'Models must be ordered with decreasing correlation with ' msg += 'high-fidelity model' raise Exception(msg) r = [] for ii in range(nmodels-1): # Step 3 in Algorithm 2 in Peherstorfer et al 2016 num = costs[0] * (corr[0, ii]**2 - corr[0, ii+1]**2) den = costs[ii] * ( 1 - corr[0, 1]**2) r.append(np.sqrt(num/den)) num = costs[0]*corr[0,-1]**2 den = costs[-1] * (1 - corr[0, 1]**2) r.append(np.sqrt(num/den)) # Step 4 in Algorithm 2 in Peherstorfer et al 2016 nhf_samples = target_cost / np.dot(costs, r) nhf_samples = max(nhf_samples, 1) nsample_ratios = r[1:] if standardize: nhf_samples, nsample_ratios = standardize_sample_ratios( nhf_samples, nsample_ratios) gamma = get_variance_reduction(get_rsquared_mfmc,cov,nsample_ratios) log10_variance = np.log10(gamma)+np.log10(cov[0, 0])-np.log10( nhf_samples) return nhf_samples, np.atleast_1d(nsample_ratios), log10_variance
[docs]def allocate_samples_mlmc(cov, costs, target_cost, standardize=True): r""" Determine the samples to be allocated to each model when using MLMC Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] costs : np.ndarray (nmodels) The relative costs of evaluating each model target_cost : float The total cost budget standardize : boolean If true make sure that nhf_samples is an integer and that nhf_samples*nsamples_ratios are integers. False is only ever used for testing. Returns ------- nhf_samples : integer The number of samples of the high fidelity model nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1. For model i>0 nsample_ratio*nhf_samples equals the number of samples in the two different discrepancies involving the ith model. log10_variance : float The base 10 logarithm of the variance of the estimator """ nmodels = cov.shape[0] sum1 = 0.0 nsamples = [] vardeltas=[] for ii in range(nmodels-1): # compute the variance of the discrepancy vardelta = cov[ii, ii] + cov[ii+1, ii+1] - 2*cov[ii, ii+1] vardeltas.append(vardelta) # compute the variance * cost vc = vardelta * (costs[ii] + costs[ii+1]) # compute the unnormalized number of samples\ # these values will be normalized by lamda later nsamp = np.sqrt(vardelta / (costs[ii] + costs[ii+1])) nsamples.append(nsamp) sum1 += np.sqrt(vc) I = np.argsort(vardeltas) #assert np.allclose(I,np.arange(nmodels-1)) # compute information for lowest fidelity model v = cov[nmodels-1, nmodels-1] c = costs[nmodels-1] nsamples.append(np.sqrt(v/c)) sum1 += np.sqrt(v*c) # compute the ML estimator variance from the target cost variance = sum1**2 / target_cost # compute the lagrangian parameter sqrt_lamda = sum1/variance # compute the number of samples allocated to resolving each # discrepancy. nl = [sqrt_lamda * n for n in nsamples] # compute the number of samples allocated to each model. For # all but the highest fidelity model we need to collect samples # from two discrepancies. nhf_samples = nl[0] nsample_ratios = [] for ii in range(1, nmodels-1): nsample_ratios.append((nl[ii-1] + nl[ii])/nl[0]) if nmodels>1: nsample_ratios.append((nl[-2]+nl[-1])/nl[0]) nsample_ratios = np.asarray(nsample_ratios) if standardize: nhf_samples = max(nhf_samples, 1) nhf_samples, nsample_ratios = standardize_sample_ratios( nhf_samples, nsample_ratios) gamma = get_variance_reduction(get_rsquared_mlmc,cov,nsample_ratios) log10_variance=np.log10(gamma)+np.log10(cov[0, 0])-np.log10( nhf_samples) #print(log10_variance) if np.isnan(log10_variance): raise Exception('MLMC variance is NAN') return nhf_samples, np.atleast_1d(nsample_ratios), log10_variance
[docs]def get_lagrange_multiplier_mlmc(cov,costs,nhf_samples): r""" Given an optimal sample allocation recover the optimal value of the Lagrange multiplier. This is only used for testing """ ii=0 # 0th discrepancy var_delta = cov[ii, ii] + cov[ii+1, ii+1] - 2*cov[ii, ii+1] cost_delta = (costs[ii] + costs[ii+1]) lagrange_mult=nhf_samples**2/(var_delta/cost_delta) return lagrange_mult
[docs]def get_discrepancy_covariances_IS(cov,nsample_ratios,pkg=np): r""" Get the covariances of the discrepancies :math:`\delta` between each low-fidelity model and its estimated mean when the same :math:`N` samples are used to compute the covariance between each models and :math:`N-r_\alpha` samples are allocated to estimate the low-fidelity means, and each of these sets are drawn independently from one another. Parameters ---------- cov : np.ndarray (nmodels,nmodels) The estimated covariance between each model. nsample_ratios : iterable (nmodels-1) The sample ratioss :math:`r_\alpha>1` for each low-fidelity model pkg : package (optional) A python package (numpy or torch) used to store the covariances. Results ------- CF : np.ndarray (nmodels-1,nmodels-1) The matrix of covariances between the discrepancies :math:`\delta` cf : np.ndarray (nmodels-1) The vector of covariances between the discrepancies and the high-fidelity model. """ nmodels = cov.shape[0] F = pkg.zeros((nmodels-1, nmodels-1), dtype=pkg.double) for ii in range(nmodels-1): F[ii, ii]=(nsample_ratios[ii]-1)/nsample_ratios[ii] for jj in range(ii+1,nmodels-1): F[ii, jj] = (nsample_ratios[ii]-1)/nsample_ratios[ii] * ( nsample_ratios[jj]-1)/nsample_ratios[jj] F[jj, ii] = F[ii, jj] CF = cov[1:,1:] * F cf = pkg.diag(F) * cov[1:, 0] return CF,cf
[docs]def get_discrepancy_covariances_MF(cov,nsample_ratios,pkg=np): r""" Get the covariances of the discrepancies :math:`\delta` between each low-fidelity model and its estimated mean using the MFMC sampling strategy. Parameters ---------- cov : np.ndarray (nmodels,nmodels) The estimated covariance between each model. nsample_ratios : iterable (nmodels-1) The sample ratioss :math:`r_\alpha>1` for each low-fidelity model pkg : package (optional) A python package (numpy or torch) used to store the covariances. Results ------- CF : np.ndarray (nmodels-1,nmodels-1) The matrix of covariances between the discrepancies :math:`\delta` cf : np.ndarray (nmodels-1) The vector of covariances between the discrepancies and the high-fidelity model. """ nmodels = cov.shape[0] F = pkg.zeros((nmodels-1, nmodels-1), dtype=pkg.double) for ii in range(nmodels-1): for jj in range(nmodels-1): rr = min(nsample_ratios[ii],nsample_ratios[jj]) F[ii, jj] = (rr - 1) / rr CF = cov[1:,1:] * F cf = pkg.diag(F) * cov[1:, 0] return CF,cf
[docs]def get_discrepancy_covariances_KL(cov,nsample_ratios,K,L,pkg=np): r""" Get the covariances of the discrepancies :math:`\delta` between each low-fidelity model and its estimated mean using the MFMC sampling strategy and the ACV KL estimator. The ACV-KL estimator partitions all of the control variates into two groups; the first K variables form a K -level approximate control variate, and the last :math:`M-K` variables are used to reduce the variance of estimating :math:`\mu_L` some :math:`L \le K` . The resulting estimator accelerates convergence to OCV-K , and L provides a degree of freedom for targeting a control variate level that contributes the greatest to the estimator variance. Parameters ---------- cov : np.ndarray (nmodels,nmodels) The estimated covariance between each model. nsample_ratios : iterable (nmodels-1) The sample ratioss :math:`r_\alpha>1` for each low-fidelity model K : integer (K<=nmodels-1) The number of effective control variates. L : integer (1<=L<=K+1) The id of the models whose mean is being targeted by the remaining nmodels-K low fidelity models. pkg : package (optional) A python package (numpy or torch) used to store the covariances. Results ------- CF : np.ndarray (nmodels-1,nmodels-1) The matrix of covariances between the discrepancies :math:`\delta` cf : np.ndarray (nmodels-1) The vector of covariances between the discrepancies and the high-fidelity model. """ nmodels = cov.shape[0] assert L<=K+1 and L>=1 and K<nmodels K,L=K-1,L-1 F = pkg.zeros((nmodels-1, nmodels-1), dtype=pkg.double) rs = nsample_ratios for ii in range(nmodels-1): if ii <= K: F[ii, ii] = (rs[ii]-1)/(rs[ii]+1e-20) else: F[ii, ii] = (rs[ii]-rs[L])/(rs[ii]*rs[L]) for jj in range(ii+1,nmodels-1): if (ii <= K) and (jj <= K): ri = min(rs[ii], rs[jj]) F[ii, jj] = (ri - 1) / (ri + 1e-20) elif (jj > K) and (ii > K): ri = min(rs[ii], rs[jj]) t1 = (rs[ii]-rs[L])*(rs[jj]-rs[L])/(rs[ii]*rs[jj]*rs[L] +1e-20) t2 = (ri - rs[L]) / (rs[ii] * rs[jj] + 1e-20) F[ii, jj] = t1 + t2 elif (ii > L) and (ii <= K) and (jj > K): F[ii, jj] = (rs[ii] - rs[L]) / (rs[ii] * rs[L] + 1e-20) elif (jj > L) and (jj <= K) and (ii > K): F[ii, jj] = (rs[jj] - rs[L]) / (rs[jj] * rs[L] + 1e-20) else: F[ii, jj] = 0.0 F[jj, ii] = F[ii, jj] CF = cov[1:,1:] * F cf = pkg.diag(F) * cov[1:, 0] return CF,cf
[docs]def get_control_variate_weights(cov): r""" Get the weights used by the control variate estimator with known low fidelity means. Parameters ---------- cov : np.ndarray (nmodels,nmodels) The estimated covariance between each model. Returns ------- weights : np.ndarray (nmodels-1) The control variate weights """ weights = -np.linalg.solve(cov[1:,1:], cov[0,1:]) return weights
[docs]def get_approximate_control_variate_weights(cov,nsample_ratios, get_discrepancy_covariances): r""" Get the weights used by the approximate control variate estimator. Parameters ---------- cov : np.ndarray (nmodels,nmodels) The estimated covariance between each model. nsample_ratios : iterable (nmodels-1) The sample ratioss :math:`r_\alpha>1` for each low-fidelity model get_discrepancy_covariances : callable Function with signature get_discrepancy_covariances(cov,nsample_ratios) which returns the covariances between the discrepancies betweem the low-fidelity models and their approximated mean. Returns ------- weights : np.ndarray (nmodels-1) The control variate weights """ CF,cf = get_discrepancy_covariances(cov,nsample_ratios) weights = -np.linalg.solve(CF, cf) return weights
[docs]def get_rsquared_acv(cov,nsample_ratios,get_discrepancy_covariances): r""" Compute r^2 used to compute the variance reduction of Approximate Control Variate Algorithms Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1 get_discrepancy_covariances : callable Function that returns the covariances of the control variate discrepancies. Functions must have the signature CF,cf = get_discrepancy_covariances(cov,nsample_ratios) Returns ------- rsquared : float The value r^2 """ CF,cf = get_discrepancy_covariances(cov,nsample_ratios) if type(cov)==np.ndarray: try: rsquared = np.dot(cf,np.linalg.solve(CF,cf))/cov[0, 0] except: return np.array([0.0])*nsample_ratios[0] else: try: rsquared = torch.dot(cf, torch.mv(torch.inverse(CF),cf))/cov[0, 0] except: #print("Error computing inverse of CF") return torch.tensor([0.0], dtype=torch.double)*nsample_ratios[0] return rsquared
[docs]def acv_sample_allocation_sample_ratio_constraint(ratios, *args): ind = args[0] return ratios[ind] - ratios[ind-1]
[docs]def generate_samples_and_values_acv_IS(nhf_samples,nsample_ratios, functions,generate_samples): nmodels = len(nsample_ratios)+1 if not callable(functions): assert len(functions)==nmodels samples1 = [generate_samples(nhf_samples)]*nmodels samples2 = [None]+[np.hstack( [samples1[ii+1],generate_samples(int(nhf_samples*r-nhf_samples))]) for ii,r in enumerate(nsample_ratios)] if not callable(functions): values2 = [None]+[f(s) for f,s in zip(functions[1:],samples2[1:])] values1 = [functions[0](samples1[0])] values1 += [values2[ii][:nhf_samples] for ii in range(1,nmodels)] else: nsamples2 = [0] samples_with_id = np.vstack([samples1[0],np.zeros((1,nhf_samples))]) for ii in range(1,nmodels): samples2_ii = np.vstack( [samples2[ii],ii*np.ones((1,samples2[ii].shape[1]))]) nsamples2.append(samples2[ii].shape[1]) samples_with_id = np.hstack([ samples_with_id,samples2_ii]) values_flattened = functions(samples_with_id) values1 = [values_flattened[:nhf_samples]] values2 = [None] cnt = nhf_samples for ii in range(1,nmodels): values1.append(values_flattened[cnt:cnt+nhf_samples]) values2.append(values_flattened[cnt:cnt+nsamples2[ii]]) cnt += nsamples2[ii] samples = [[s1,s2] for s1,s2 in zip(samples1,samples2)] values = [[v1,v2] for v1,v2 in zip(values1,values2)] return samples,values
[docs]def generate_samples_and_values_mlmc(nhf_samples,nsample_ratios,functions, generate_samples): r""" Parameters ========== nhf_samples : integer The number of samples of the high fidelity model nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1 functions : list of callables The functions used to evaluate each model generate_samples : callable Function used to generate realizations of the random variables Returns ======= """ nmodels = len(nsample_ratios)+1 if not callable: assert nmodels==len(functions) assert np.all(nsample_ratios>=1) samples1 = [generate_samples(nhf_samples)] samples2 = [None] prev_samples = samples1[0] for ii in range(nmodels-1): total_samples = nsample_ratios[ii] * nhf_samples assert total_samples/int(total_samples)==1.0 total_samples = int(total_samples) samples1.append(prev_samples) nnew_samples = total_samples - prev_samples.shape[1] samples2.append(generate_samples(nnew_samples)) prev_samples = samples2[-1] if not callable(functions): values1 = [functions[0](samples1[0])] values2 = [None] for ii in range(1,nmodels): values1.append(functions[ii](samples1[ii])) values2.append(functions[ii](samples2[ii])) else: samples_with_id = np.vstack([samples1[0],np.zeros((1,nhf_samples))]) nsamples1 = [nhf_samples] nsamples2 = [0] for ii in range(1,nmodels): samples1_ii = np.vstack( [samples1[ii],ii*np.ones((1,samples1[ii].shape[1]))]) samples2_ii = np.vstack( [samples2[ii],ii*np.ones((1,samples2[ii].shape[1]))]) nsamples1.append(samples1[ii].shape[1]) nsamples2.append(samples2[ii].shape[1]) samples_with_id = np.hstack([ samples_with_id,samples1_ii,samples2_ii]) values_flattened = functions(samples_with_id) values1 = [values_flattened[:nsamples1[0]]] values2 = [None] cnt = nsamples1[0] for ii in range(1,nmodels): values1.append(values_flattened[cnt:cnt+nsamples1[ii]]) cnt += nsamples1[ii] values2.append(values_flattened[cnt:cnt+nsamples2[ii]]) cnt += nsamples2[ii] samples = [[s1,s2] for s1,s2 in zip(samples1,samples2)] values = [[v1,v2] for v1,v2 in zip(values1,values2)] return samples, values
[docs]def get_mfmc_control_variate_weights(cov): weights = -cov[0,1:]/np.diag(cov[1:,1:]) return weights
[docs]def validate_nsample_ratios(nhf_samples,nsample_ratios): r""" Check that nsample_ratios* nhf_samples are all integers and that nsample_ratios are all larger than 1 """ nmodels = len(nsample_ratios)+1 assert np.all(nsample_ratios>=1) # check nhf_samples is an integer assert nhf_samples/int(nhf_samples)==1.0 # convert to int if a float because numpy random assumes nsamples # is an int nhf_samples = int(nhf_samples) nlf_samples = nhf_samples*nsample_ratios for ii in range(nmodels-1): assert np.allclose(nlf_samples[ii]/int(nlf_samples[ii]),1.0,atol=1e-5) nlf_samples = np.asarray(nlf_samples,dtype=int) return nlf_samples
[docs]def generate_samples_and_values_acv_KL(nhf_samples,nsample_ratios,functions, generate_samples,K,L): r""" K : integer (K<=nmodels-1) The number of effective control variates. L : integer (1<=L<=K+1) The id of the models whose mean is being targeted by the remaining nmodels-K low fidelity models. """ nsample_ratios = np.asarray(nsample_ratios) nlf_samples = validate_nsample_ratios(nhf_samples,nsample_ratios) nmodels = nsample_ratios.shape[0]+1 assert L<=K+1 and L>=1 and K<nmodels K,L=K-1,L-1 max_nsamples = nlf_samples.max() samples = generate_samples(max_nsamples) samples1 = [samples[:,:nhf_samples]] samples2 = [None] nprev_samples1 = nhf_samples nprev_samples_total = nhf_samples for ii in range(1,nmodels): samples1.append(samples[:,:nprev_samples1]) samples2.append(samples[:,:nlf_samples[ii-1]]) if (ii<=K): nprev_samples1 = nhf_samples else: nprev_samples1 = nlf_samples[L] nprev_samples_total= nlf_samples[ii-1] if not callable(functions): values1 = [functions[0](samples1[0])] values2 = [None] for ii in range(1,nmodels): values_ii = functions[ii](samples2[ii]) values1.append(values_ii[:samples1[ii].shape[1]]) values2.append(values_ii) else: # collect all samples assign an id and then evaluate in one batch # this can be faster if functions is something like a pool model samples_with_id = np.vstack([samples1[0],np.zeros((1,nhf_samples))]) for ii in range(1,nmodels): samples_with_id = np.hstack([ samples_with_id, np.vstack( [samples2[ii],ii*np.ones((1,samples2[ii].shape[1]))])]) assert samples_with_id.shape[1]==nhf_samples+np.sum(nlf_samples) values_flattened = functions(samples_with_id) values1 = [values_flattened[:nhf_samples]] values2 = [None] nprev_samples1 = nhf_samples nprev_samples_total = nhf_samples cnt = nhf_samples for ii in range(1,nmodels): values1.append(values_flattened[cnt:cnt+nprev_samples1]) values2.append(values_flattened[cnt:cnt+nlf_samples[ii-1]]) cnt += nlf_samples[ii-1] if (ii <= K): nprev_samples1 = nhf_samples else: nprev_samples1 = nlf_samples[L] nprev_samples_total = nlf_samples[ii-1] assert cnt==values_flattened.shape[0] samples = [[s1,s2] for s1,s2 in zip(samples1,samples2)] values = [[v1,v2] for v1,v2 in zip(values1,values2)] return samples,values
[docs]def generate_samples_and_values_mfmc(nhf_samples,nsample_ratios,functions, generate_samples,acv_modification=False): r""" Parameters ========== nhf_samples : integer The number of samples of the high fidelity model nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i = r_i*nhf_samples, i=1,...,nmodels-1 functions : list of callables The functions used to evaluate each model generate_samples : callable Function used to generate realizations of the random variables Returns ======= samples : list List containing the samples :math:`\mathcal{Z}_{i,1}` and :math:`\mathcal{Z}_{i,2}` for each model :math:`i=0,\ldots,M-1`. The list is [[:math:`\mathcal{Z}_{0,1}`,:math:`\mathcal{Z}_{0,2}`],...,[:math:`\mathcal{Z}_{M-1,1}`,:math:`\mathcal{Z}_{M-1,2}`]], where :math:`M` is the number of models values : list Model values at the points in samples """ nsample_ratios = np.asarray(nsample_ratios) nlf_samples = validate_nsample_ratios(nhf_samples,nsample_ratios) nmodels = nsample_ratios.shape[0]+1 max_nsamples = nlf_samples.max() samples = generate_samples(max_nsamples) samples1 = [samples[:,:nhf_samples]] samples2 = [None] nprev_samples = nhf_samples for ii in range(1,nmodels): samples1.append(samples[:,:nprev_samples]) samples2.append(samples[:,:nlf_samples[ii-1]]) if acv_modification: nprev_samples = nhf_samples else: nprev_samples = samples2[ii].shape[1] if not callable(functions): values1 = [functions[0](samples1[0])] values2 = [None] for ii in range(1,nmodels): values_ii = functions[ii](samples2[ii]) values1.append(values_ii[:samples1[ii].shape[1]]) values2.append(values_ii) else: # collect all samples assign an id and then evaluate in one batch # this can be faster if functions is something like a pool model samples_with_id = np.vstack([samples1[0],np.zeros((1,nhf_samples))]) for ii in range(1,nmodels): samples_with_id = np.hstack([ samples_with_id, np.vstack( [samples2[ii],ii*np.ones((1,samples2[ii].shape[1]))])]) values_flattened = functions(samples_with_id) values1 = [values_flattened[:nhf_samples]] values2 = [None] nprev_samples = nhf_samples cnt = nhf_samples for ii in range(1,nmodels): values1.append(values_flattened[cnt:cnt+nprev_samples]) values2.append(values_flattened[cnt:cnt+nlf_samples[ii-1]]) cnt += nlf_samples[ii-1] if acv_modification: nprev_samples = nhf_samples else: nprev_samples = samples2[ii].shape[1] assert cnt==values_flattened.shape[0] assert cnt==nhf_samples + np.sum(nlf_samples) samples = [[s1,s2] for s1,s2 in zip(samples1,samples2)] values = [[v1,v2] for v1,v2 in zip(values1,values2)] return samples,values
[docs]def acv_sample_allocation_cost_constraint(ratios, nhf, costs, target_cost): cost = nhf*(costs[0] + np.dot(ratios, costs[1:])) return target_cost - cost
[docs]def acv_sample_allocation_cost_constraint_all(ratios, costs, target_cost): nhf, rats = ratios[0], ratios[1:] cost = nhf*(costs[0] + np.dot(rats, costs[1:])) return target_cost - cost
[docs]def acv_sample_allocation_cost_constraint_jacobian_all(ratios, costs, target_cost): nhf, rats = ratios[0], ratios[1:] jac = costs.copy().astype(float) jac[0] += np.dot(rats, costs[1:]) jac[1:] *= nhf return -jac
[docs]def acv_sample_allocation_objective(estimator, nsample_ratios): if use_torch: ratios = torch.tensor(nsample_ratios) gamma = estimator.variance_reduction(ratios) gamma = torch.log10(gamma) return gamma.item() else: gamma = estimator.variance_reduction(ratios) gamma = np.log10(gamma) return gamma
[docs]def acv_sample_allocation_jacobian_torch(estimator, nsample_ratios): ratios = torch.tensor(nsample_ratios, dtype=torch.double) ratios.requires_grad=True gamma = estimator.variance_reduction(ratios) gamma = torch.log10(gamma) gamma.backward() grad = ratios.grad.numpy().copy() ratios.grad.zero_() return grad
[docs]def acv_sample_allocation_objective_all(estimator, x): if use_torch: xrats = torch.tensor(x, dtype=torch.double) xrats.requires_grad=True else: xrats=x nhf, ratios = xrats[0], xrats[1:] #TODO make this consistent with other objective which does not use #variance as is used below. It is necessary here because need to include #the impact of nhf on objective gamma = estimator.variance_reduction(ratios) * estimator.cov[0, 0] / nhf if use_torch: gamma = torch.log10(gamma) return gamma.item() return np.log10(gamma)
[docs]def acv_sample_allocation_jacobian_all_torch(estimator,x): xrats = torch.tensor(x, dtype=torch.double) xrats.requires_grad=True nhf, ratios = xrats[0], xrats[1:] gamma = estimator.variance_reduction(ratios)*estimator.cov[0,0]/nhf gamma = torch.log10(gamma) gamma.backward() grad = xrats.grad.numpy().copy() xrats.grad.zero_() return grad
[docs]def acv_sample_allocation_objective_all_lagrange(estimator, x): if use_torch: xrats = torch.tensor(x, dtype=torch.double) xrats.requires_grad=True nhf, ratios, lagrange_mult = xrats[0], xrats[1:-1], xrats[-1] gamma = estimator.variance_reduction(ratios)*estimator.cov[0, 0]/nhf total_cost = estimator.costs[0]*nhf + estimator.costs[1:].dot( ratios*nhf) obj = lagrange_mult*gamma+total_cost if use_torch: obj = torch.log10(obj) return obj.item() else: return np.log10(obj)
[docs]def acv_sample_allocation_jacobian_all_lagrange_torch(estimator,x): xrats = torch.tensor(x, dtype=torch.double) xrats.requires_grad=True nhf, ratios, lagrange_mult = xrats[0], xrats[1:-1], xrats[-1] gamma = estimator.variance_reduction(ratios)*estimator.cov[0,0]/nhf total_cost = estimator.costs[0]*nhf+estimator.costs[1:].dot( ratios*nhf) obj = lagrange_mult*gamma+total_cost obj = torch.log10(obj) obj.backward() grad = xrats.grad.numpy().copy() xrats.grad.zero_() return grad
[docs]def get_allocate_samples_acv_trust_region_constraints(costs,target_cost): from scipy.optimize import NonlinearConstraint nonlinear_constraint = NonlinearConstraint( partial(acv_sample_allocation_cost_constraint_all, costs=costs, target_cost=target_cost),0,0) return [nonlinear_constraint]
[docs]def solve_allocate_samples_acv_trust_region_optimization( estimator,costs,target_cost,initial_guess,optim_options): nmodels = costs.shape[0] constraints = get_allocate_samples_acv_trust_region_constraints( costs,target_cost) if optim_options is None: tol=1e-10 optim_options={'verbose': 1, 'maxiter':1000, 'gtol':tol, 'xtol':1e-4*tol, 'barrier_tol':tol} from scipy.optimize import Bounds lbs,ubs = [1]*nmodels,[np.inf]*nmodels bounds = Bounds(lbs,ubs) jac = None if use_torch: jac=estimator.jacobian opt = minimize(estimator.objective,initial_guess,method='trust-constr', jac=jac,#hess=self.objective_hessian, constraints=constraints,options=optim_options, bounds=bounds) if opt.success == False: raise Exception('Trust-constr optimizer failed') return opt
[docs]def get_initial_guess(initial_guess, cov, costs, target_cost): if initial_guess is None: nhf_samples_init, nsample_ratios_init = allocate_samples_mfmc( cov, costs, target_cost, standardize=True)[:2] initial_guess = np.concatenate( [[nhf_samples_init],nsample_ratios_init]) return initial_guess
[docs]def solve_allocate_samples_acv_slsqp_optimization( estimator,costs,target_cost,initial_guess,optim_options): nmodels = len(costs) #alex had these bounds and constraints # bounds = [(1,np.inf)] + [(2, np.inf)]*(nmodels-1) # cons = [dict({'type':'ineq', # 'fun':acv_sample_allocation_cost_constraint_all, # 'args':(costs, target_cost)})] # for jj in range(2,nmodels-1): # cons.append( dict({'type':'ineq', # 'fun':acv_sample_allocation_ratio_constraint_all, # 'args':[jj]})) if optim_options is None: optim_options = {'disp':True,'ftol':1e-8, 'maxiter':10000,'iprint':0} #set iprint=2 to printing iteration info bounds = [(1,np.inf)] + [(1.1, np.inf)]*(nmodels-1) cons = [{'type':'eq', 'fun':acv_sample_allocation_cost_constraint_all, 'jac':acv_sample_allocation_cost_constraint_jacobian_all, 'args':(np.asarray(costs), target_cost)}] jac = None if use_torch: jac=estimator.jacobian opt = minimize( estimator.objective, initial_guess, method='SLSQP',jac=jac, bounds=bounds, constraints=cons, options = optim_options) if opt.success == False: print(opt) raise Exception('SLSQP optimizer failed'+f'{opt}') return opt
[docs]def allocate_samples_acv(cov, costs, target_cost, estimator, standardize=True, initial_guess=None, optim_options=None, optim_method='SLSQP'): r""" Determine the samples to be allocated to each model Parameters ---------- cov : np.ndarray (nmodels,nmodels) The covariance C between each of the models. The highest fidelity model is the first model, i.e its variance is cov[0,0] costs : np.ndarray (nmodels) The relative costs of evaluating each model target_cost : float The total cost budget Returns ------- nhf_samples : integer The number of samples of the high fidelity model nsample_ratios : np.ndarray (nmodels-1) The sample ratios r used to specify the number of samples of the lower fidelity models, e.g. N_i=r_i*nhf_samples,i=1,...,nmodels-1 log10_variance : float The base 10 logarithm of the variance of the estimator """ initial_guess=get_initial_guess( initial_guess, cov, costs, target_cost) if optim_method=='trust-constr': opt = solve_allocate_samples_acv_trust_region_optimization( estimator,costs,target_cost,initial_guess,optim_options) else: opt = solve_allocate_samples_acv_slsqp_optimization( estimator,costs,target_cost,initial_guess,optim_options) nhf_samples, nsample_ratios = opt.x[0], opt.x[1:] if standardize: nhf_samples, nsample_ratios = standardize_sample_ratios( nhf_samples, nsample_ratios) var = estimator.get_variance(nhf_samples,nsample_ratios) log10_var = np.log10(var.item()) return nhf_samples, nsample_ratios, log10_var
[docs]def get_rsquared_acv_KL_best(cov, nsample_ratios): r""" """ nmodels = cov.shape[1] opt_rsquared = -1 KL = None for K in range(1, nmodels): for L in range(1, K+1): get_discrepancy_covariances = partial( get_discrepancy_covariances_KL,K=K,L=L) get_rsquared = partial( get_rsquared_acv, get_discrepancy_covariances=get_discrepancy_covariances) rsquared = get_rsquared(cov, nsample_ratios) #print(K,L,rsquared) if rsquared > opt_rsquared: opt_rsquared = rsquared KL = (K, L) return opt_rsquared
[docs]def allocate_samples_acv_best_kl(cov,costs,target_cost,standardize=True, initial_guess=None,optim_options=None, optim_method='SLSQP'): nmodels = len(costs) sol, KL, opt_log10_var = None, None, np.inf for K in range(1,nmodels): for L in range(1, K+1): estimator = ACVMFKL(cov, costs,target_cost, K, L) nhf_samples, nsample_ratios, log10_var = allocate_samples_acv( cov, costs, target_cost, estimator,standardize, initial_guess,optim_options,optim_method) #print("K, L = ", K, L) #print("\t ", log10_var) if log10_var < opt_log10_var: opt_log10_var = log10_var sol = (nhf_samples, nsample_ratios) KL = (K, L) return sol[0], sol[1], opt_log10_var
[docs]class ModelEnsemble(object): r""" Wrapper class to allow easy one-dimensional indexing of models in an ensemble. """ def __init__(self,functions,names=None): r""" Parameters ---------- functions : list of callable A list of functions defining the model ensemble. The functions must have the call signature values=function(samples) """ self.functions=functions self.nmodels = len(self.functions) if names is None: names = ['f%d'%ii for ii in range(self.nmodels)] self.names=names
[docs] def __call__(self,samples): r""" Evaluate a set of models at a set of samples Parameters ---------- samples : np.ndarray (nvars+1,nsamples) Realizations of a multivariate random variable each with an additional scalar model id indicating which model to evaluate. Returns ------- values : np.ndarray (nsamples,nqoi) The values of the models at samples """ model_ids = samples[-1,:] #print(model_ids.max(),self.nmodels) assert model_ids.max()<self.nmodels active_model_ids = np.unique(model_ids).astype(int) active_model_id=active_model_ids[0] I = np.where(model_ids==active_model_id)[0] values_0 = self.functions[active_model_id](samples[:-1,I]) assert values_0.ndim==2 nqoi = values_0.shape[1] values = np.empty((samples.shape[1],nqoi)) values[I,:]=values_0 for ii in range(1,active_model_ids.shape[0]): active_model_id=active_model_ids[ii] I = np.where(model_ids==active_model_id)[0] values[I] = self.functions[active_model_id](samples[:-1,I]) return values
[docs]def estimate_model_ensemble_covariance(npilot_samples,generate_samples, model_ensemble): r""" Estimate the covariance of a model ensemble from a set of pilot samples Parameters ---------- npilot_samples : integer The number of samples used to estimate the covariance generate_samples : callable Function used to generate realizations of the random variables with call signature samples = generate_samples(npilot_samples) model_emsemble : callable Function that takes a set of samples and models ids and evaluates a set of models. See ModelEnsemble. call signature values = model_emsemble(samples) Returns ------- cov : np.ndarray (nqoi,nqoi) The covariance between the model qoi pilot_random_samples : np.ndarray (nvars,npilot_samples) The random samples used to compute the covariance. These samples DO NOT have a model id pilot_values : np.ndaray (npilot_samples,nmodels) The values of each model at the pilot samples """ # generate pilot samples pilot_random_samples = generate_samples(npilot_samples) config_vars = np.arange(model_ensemble.nmodels)[np.newaxis,:] # append model ids to pilot smaples pilot_samples = get_all_sample_combinations( pilot_random_samples,config_vars) # evaluate models at pilot samples pilot_values = model_ensemble(pilot_samples) pilot_values = np.reshape( pilot_values,(npilot_samples,model_ensemble.nmodels)) # compute covariance cov = np.cov(pilot_values,rowvar=False) return cov, pilot_random_samples, pilot_values
[docs]class ACVMF(object): def __init__(self,cov,costs): self.cov=cov self.costs=costs if use_torch: self.cov=torch.tensor(np.copy(self.cov), dtype=torch.double) self.costs=torch.tensor(np.copy(self.costs), dtype=torch.double) #self.objective_fun = partial( # acv_sample_allocation_objective,self) #self.jacobian_fun = partial( # acv_sample_allocation_jacobian,self) self.objective_fun_all = partial( acv_sample_allocation_objective_all,self) if use_torch: self.jacobian_fun_all = partial( acv_sample_allocation_jacobian_all_torch,self) else: self.jacobian_fun_all = None
[docs] def get_rsquared(self,nsample_ratios): if use_torch: pkg=torch else: pkg=np rsquared = get_rsquared_acv( self.cov,nsample_ratios, partial(get_discrepancy_covariances_MF,pkg=pkg)) try: return rsquared.numpy() except: return rsquared
[docs] def variance_reduction(self,nsample_ratios): """ This is not the variance reduction relative to the equivalent Monte Carlo estimator. A variance reduction can be smaller than one and still correspond to a multi-fidelity estimator that has a larger variance than the single fidelity Monte Carlo that uses the equivalent number of high-fidelity samples """ return 1-self.get_rsquared(nsample_ratios)
[docs] def objective(self,x): return self.objective_fun_all(x)
[docs] def jacobian(self,x): return self.jacobian_fun_all(x)
[docs] def allocate_samples(self,target_cost,**kwargs): return allocate_samples_acv(self.cov, self.costs, target_cost, self, **kwargs)
[docs] def get_nsamples(self,nhf_samples,nsample_ratios): return np.concatenate([[nhf_samples],nsample_ratios*nhf_samples])
[docs] def get_variance(self,nhf_samples,nsample_ratios): gamma = self.variance_reduction(nsample_ratios) return gamma*self.get_covariance()[0,0]/nhf_samples
[docs] def generate_data(self,nhf_samples,nsample_ratios,generate_samples, model_ensemble): return generate_samples_and_values_mfmc( nhf_samples,nsample_ratios,model_ensemble, generate_samples,acv_modification=True)
[docs] def get_covariance(self): try: return self.cov.numpy() except: return self.cov
[docs] def __call__(self,values): eta = get_mfmc_control_variate_weights(self.get_covariance()) return compute_approximate_control_variate_mean_estimate(eta,values)
[docs]class MC(object): def __init__(self,cov,costs): self.costs=costs self.cov=cov
[docs] def get_variance(self,nhf_samples,nsample_ratios): return self.cov[0,0]/nhf_samples
[docs] def allocate_samples(self,target_cost): return np.floor(target_cost/self.costs[0]),None,None
[docs] def get_nsamples(self,nhf_samples,nsample_ratios): return np.concatenate([[nhf_samples],np.zeros(self.cov.shape[0]-1)])
[docs] def generate_data(self,nhf_samples,nsample_ratios,generate_samples, model_ensemble): samples = generate_samples(int(nhf_samples)) if not callable(model_ensemble): values = model_ensemble[0](samples) else: samples_with_id=np.vstack([samples,np.zeros((1,int(nhf_samples)))]) values = model_ensemble(samples_with_id) return samples, values
[docs] def __call__(self,values): return values.mean()
[docs]class ACVMFKL(ACVMF): def __init__(self,cov,costs,target_cost,K,L): self.K, self.L = K, L super().__init__(cov, costs)
[docs] def get_rsquared(self,nsample_ratios): if use_torch: pkg=torch else: pkg=np return get_rsquared_acv( self.cov,nsample_ratios, partial(get_discrepancy_covariances_KL,K=self.K,L=self.L, pkg=pkg))
[docs]class ACVMFKLBest(ACVMF):
[docs] def get_rsquared(self,nsample_ratios): return get_rsquared_acv_KL_best(self.cov, nsample_ratios)
[docs] def allocate_samples(self,target_cost): return allocate_samples_acv_best_kl( self.cov,self.costs,target_cost,standardize=True, initial_guess=None,optim_options=None, optim_method='SLSQP')
[docs]class MFMC(ACVMF): def __init__(self,cov,costs): super().__init__(cov, costs)
[docs] def get_rsquared(self,nsample_ratios): rsquared = get_rsquared_mfmc(self.get_covariance(),nsample_ratios) return rsquared
[docs] def allocate_samples(self,target_cost): return allocate_samples_mfmc( self.get_covariance(), self.costs, target_cost)
[docs]class MLMC(ACVMF):
[docs] def use_lagrange_formulation(self,flag): r"""For testing purposes only""" if flag: self.objective_fun_all = partial( acv_sample_allocation_objective_all_lagrange,self) self.jacobian_fun_all = partial( acv_sample_allocation_jacobian_all_lagrange_torch,self) else: self.objective_fun_all = partial( acv_sample_allocation_objective_all,self) self.jacobian_fun_all = partial( acv_sample_allocation_jacobian_all_torch,self) if not use_torch: self.jacobian_fun_all = None
[docs] def get_rsquared(self,nsample_ratios): if use_torch: pkg=torch else: pkg=np rsquared = get_rsquared_mlmc(self.cov,nsample_ratios,pkg) if use_torch: rsquared=rsquared.numpy() return rsquared
[docs] def allocate_samples(self,target_cost): return allocate_samples_mlmc(self.cov, self.costs, target_cost)
[docs]def compute_single_fidelity_and_approximate_control_variate_mean_estimates( nhf_samples,nsample_ratios, model_ensemble,generate_samples, generate_samples_and_values,cov, get_cv_weights,seed): r""" Compute the approximate control variate estimate of a high-fidelity model from using it and a set of lower fidelity models. Also compute the single fidelity Monte Carlo estimate of the mean from only the high-fidelity data. Notes ----- To create reproducible results when running numpy.random in parallel must use RandomState. If not the results will be non-deterministic. This is happens because of a race condition. numpy.random.* uses only one global PRNG that is shared across all the threads without synchronization. Since the threads are running in parallel, at the same time, and their access to this global PRNG is not synchronized between them, they are all racing to access the PRNG state (so that the PRNG's state might change behind other threads' backs). Giving each thread its own PRNG (RandomState) solves this problem because there is no longer any state that's shared by multiple threads without synchronization. Also see new features https://docs.scipy.org/doc/numpy/reference/random/parallel.html https://docs.scipy.org/doc/numpy/reference/random/multithreading.html """ random_state = np.random.RandomState(seed) local_generate_samples = partial( generate_samples, random_state=random_state) samples,values =generate_samples_and_values( nhf_samples,nsample_ratios,model_ensemble,local_generate_samples) # compute mean using only hf data hf_mean = values[0][0].mean() # compute ACV mean eta = get_cv_weights(cov,nsample_ratios) acv_mean = compute_approximate_control_variate_mean_estimate(eta,values) return hf_mean, acv_mean
[docs]def estimate_variance_reduction(model_ensemble, cov, generate_samples, allocate_samples,generate_samples_and_values, get_cv_weights,get_rsquared=None, ntrials=1e3,max_eval_concurrency=1, target_cost=None, costs=None): r""" Numerically estimate the variance of an approximate control variate estimator and compare its value to the estimator using only the high-fidelity data. Parameters ---------- ntrials : integer The number of times to compute estimator using different randomly generated set of samples max_eval_concurrency : integer The number of processors used to compute realizations of the estimators, which can be run independently and in parallel. """ M = cov.shape[0]-1 # number of lower fidelity models if costs is None: costs = np.asarray([100//2**ii for ii in range(M+1)]) if target_cost is None: target_cost = int(1e4) nhf_samples,nsample_ratios = allocate_samples( cov, costs, target_cost)[:2] ntrials = int(ntrials) from multiprocessing import Pool pool = Pool(max_eval_concurrency) func = partial( compute_single_fidelity_and_approximate_control_variate_mean_estimates, nhf_samples,nsample_ratios,model_ensemble,generate_samples, generate_samples_and_values,cov,get_cv_weights) if max_eval_concurrency>1: assert int(os.environ['OMP_NUM_THREADS'])==1 means = np.asarray(pool.map(func,[ii for ii in range(ntrials)])) else: means = np.empty((ntrials,2)) for ii in range(ntrials): means[ii,:] = func(ii) numerical_var_reduction=means[:,1].var(axis=0)/means[:,0].var(axis=0) if get_rsquared is not None: true_var_reduction = 1-get_rsquared(cov[:M+1,:M+1],nsample_ratios) return means, numerical_var_reduction, true_var_reduction return means, numerical_var_reduction
[docs]def get_mfmc_control_variate_weights_pool_wrapper(cov,nsamples): r""" Create interface that adhears to assumed api for variance reduction check cannot be defined as a lambda locally in a test when using with multiprocessing pool because python cannot pickle such lambda functions """ return get_mfmc_control_variate_weights(cov)
[docs]def get_mlmc_control_variate_weights_pool_wrapper(cov,nsamples): r""" Create interface that adhears to assumed api for variance reduction check cannot be defined as a lambda locally in a test when using with multiprocessing pool because python cannot pickle such lambda functions """ return get_mlmc_control_variate_weights(cov.shape[0])
[docs]def plot_acv_sample_allocation(nsamples_history,costs,labels,ax): def autolabel(ax,rects,labels): #Attach a text label in each bar in *rects* for rect,label in zip(rects,labels): ax.annotate(label, xy=(rect.get_x() + rect.get_width()/2, rect.get_y() + rect.get_height()/2), xytext=(0, -10), # 3 points vertical offset textcoords="offset points", ha='center', va='bottom') nsamples_history = np.asarray(nsamples_history) xlocs = np.arange(nsamples_history.shape[0]) nmodels = nsamples_history.shape[1] cnt = 0 total_costs = nsamples_history.dot(costs) for ii in range(nmodels): rel_cost = nsamples_history[:,ii]*costs[ii] rel_cost /= total_costs rects = ax.bar(xlocs,rel_cost,bottom=cnt,edgecolor='white', label=labels[ii]) autolabel(ax,rects,['$%d$'%int(n) for n in nsamples_history[:,ii]]) cnt+=rel_cost ax.set_xticks(xlocs) ax.set_xticklabels(['$%d$'%t for t in total_costs]) ax.set_xlabel(r'$\mathrm{Total}\;\mathrm{Cost}$') ax.set_ylabel(r'$\mathrm{Percentage}\;\mathrm{of}\;\mathrm{Total}\;\mathrm{Cost}$')# / $N_\alpha$') ax.legend(loc=[0.925,0.25])
from pyapprox.probability_measure_sampling import \ generate_independent_random_samples from pyapprox.utilities import get_correlation_from_covariance
[docs]def get_pilot_covariance(nmodels,variable,model_ensemble,npilot_samples): """ Parameters ---------- nmodels : integer The number of information sources variable : :class:`pyapprox.variable.IndependentMultivariateRandomVariable` Object defining the nvar uncertain random variables. Samples will be drawn from its joint density. model_ensemble : callable Function with signature ``model_ensemble(samples) -> np.ndarray (nsamples,1)`` where samples is a np.ndarray with shape (nvars+1,nsamples) npilot_samples : integer The number of samples used to compute correlations Returns ------- cov_matrix : np.ndarray (nmodels,nmodels) The covariance between each information source pilot_samples : np.ndarray (nvars+1,nsamples) The samples used to evaluate each information source when computing correlations pilot_values : np.ndarray (nsamples,nmodels) The values of each information source at the pilot samples """ pilot_samples = generate_independent_random_samples( variable,npilot_samples) config_vars = np.arange(nmodels)[np.newaxis,:] pilot_samples = get_all_sample_combinations( pilot_samples,config_vars) pilot_values = model_ensemble(pilot_samples) pilot_values = np.reshape( pilot_values,(npilot_samples,model_ensemble.nmodels)) cov_matrix = np.cov(pilot_values,rowvar=False) return cov_matrix,pilot_samples,pilot_values
[docs]def bootstrap_monte_carlo_estimator(values,nbootstraps=10,verbose=True): """ Approxiamte the variance of the Monte Carlo estimate of the mean using bootstraping Parameters ---------- values : np.ndarry (nsamples,1) The values used to compute the mean nbootstraps : integer The number of boostraps used to compute estimator variance verbose: If True print the estimator mean and +/- 2 standard deviation interval Returns ------- bootstrap_mean : float The bootstrap estimate of the estimator mean bootstrap_variance : float The bootstrap estimate of the estimator variance """ values = values.squeeze() assert values.ndim==1 nsamples = values.shape[0] bootstrap_values = np.random.choice( values,size=(nsamples,nbootstraps),replace=True) bootstrap_means = bootstrap_values.mean(axis=0) bootstrap_mean = bootstrap_means.mean() bootstrap_variance = np.var(bootstrap_means) if verbose: print('No. samples', values.shape[0]) print('Mean',bootstrap_mean) print('Mean +/- 2 sigma', [bootstrap_mean-2*np.sqrt(bootstrap_variance),bootstrap_mean+2*np.sqrt(bootstrap_variance)]) return bootstrap_mean, bootstrap_variance
[docs]def bootstrap_mfmc_estimator(values,weights,nbootstraps=10, verbose=True,acv_modification=True): r""" Boostrap the approximate MFMC estimate of the mean of high-fidelity data with low-fidelity models with unknown means Parameters ---------- values : list (nmodels) The evaluations of each information source seperated in form necessary for control variate estimators. Each entry of the list contains values0 : np.ndarray (num_samples_i0,num_qoi) Evaluations of each model used to compute the estimator :math:`Q_{i,N}` of values1: np.ndarray (num_samples_i1,num_qoi) Evaluations used compute the approximate mean :math:`\mu_{i,r_iN}` of the low fidelity models. weights : np.ndarray (nmodels-1) The control variate weights nbootstraps : integer The number of boostraps used to compute estimator variance verbose: If True print the estimator mean and +/- 2 standard deviation interval Returns ------- bootstrap_mean : float The bootstrap estimate of the estimator mean bootstrap_variance : float The bootstrap estimate of the estimator variance """ assert acv_modification nmodels = len(values) assert len(values)==nmodels # high fidelity monte carlo estimate of mean bootstrap_means = [] for jj in range(nbootstraps): vals = values[0][0]; nhf_samples=vals.shape[0] I1 = np.random.choice( np.arange(nhf_samples),size=(nhf_samples),replace=True) est = vals[I1].mean() nprev_samples=nhf_samples for ii in range(nmodels-1): vals1 = values[ii+1][0]; nsamples1=vals1.shape[0] vals2 = values[ii+1][1]; nsamples2=vals2.shape[0] assert nsamples1==nhf_samples I2 = np.random.choice( np.arange(nhf_samples,nsamples2),size=(nsamples2-nhf_samples), replace=True) #maks sure same shared samples are still used. vals2_boot = np.vstack([vals2[I1],vals2[I2]]) est += weights[ii]*(vals1[I1].mean()-vals2_boot.mean()) if acv_modification: nprev_samples = nhf_samples else: nprev_samples=nsamples2 bootstrap_means.append(est) bootstrap_means = np.array(bootstrap_means) bootstrap_mean = np.mean(bootstrap_means) bootstrap_variance = np.var(bootstrap_means) return bootstrap_mean, bootstrap_variance
[docs]def compute_covariance_from_control_variate_samples(values): r""" Compute the covariance between information sources from a set of evaluations of each information source. Parameters ---------- values : list (nmodels) The evaluations of each information source seperated in form necessary for control variate estimators. Each entry of the list contains values0 : np.ndarray (num_samples_i0,num_qoi) Evaluations of each model used to compute the estimator :math:`Q_{i,N}` of values1: np.ndarray (num_samples_i1,num_qoi) Evaluations used compute the approximate mean :math:`\mu_{i,r_iN}` of the low fidelity models. Returns ------- cov : np.ndarray (nmodels) The covariance between the information sources """ shared_samples_values = np.hstack( [v[0].squeeze()[:,np.newaxis] for v in values]) cov = np.cov(shared_samples_values,rowvar=False) #print(cov,'\n',cov_matrix) return cov
[docs]def compare_estimator_variances(target_costs,estimators,cov_matrix,model_costs): variances, nsamples_history = [],[] for target_cost in target_costs: for estimator in estimators: est = estimator(cov_matrix,model_costs) nhf_samples,nsample_ratios = est.allocate_samples(target_cost)[:2] variances.append(est.get_variance(nhf_samples,nsample_ratios)) nsamples_history.append( est.get_nsamples(nhf_samples,nsample_ratios)) variances = np.asarray(variances) nsamples_history = np.asarray(nsamples_history) return nsamples_history, variances
[docs]def plot_estimator_variances(nsamples_history, variances, model_costs, est_labels, ax): linestyles=['-','--',':','-.'] nestimators = len(est_labels) assert len(nsamples_history)==len(variances) assert len(nsamples_history)%nestimators==0 for ii in range(nestimators): est_total_costs = np.array(nsamples_history[ii::nestimators]).dot( model_costs) est_variances = variances[ii::nestimators] ax.loglog(est_total_costs,est_variances,':',label=est_labels[ii], ls=linestyles[ii]) ax.set_xlabel(r'$\mathrm{Target\;Cost}$') ax.set_ylabel(r'$\mathrm{Estimator\;Variance}$') ax.legend()