Source code for pyapprox.multifidelity.factory

import copy
from itertools import combinations
from functools import partial
from multiprocessing import Pool

import numpy as np

from pyapprox.multifidelity.acv import (
    CVEstimator, MLMCEstimator, MFMCEstimator, GMFEstimator, GISEstimator,
    GRDEstimator, MCEstimator, ACVEstimator,
    log_trace_variance, determinant_variance)
from pyapprox.multifidelity.stats import (
    MultiOutputMean, MultiOutputVariance, MultiOutputMeanAndVariance,
    _nqoi_nqoi_subproblem)
from pyapprox.multifidelity.groupacv import MLBLUEEstimator
from pyapprox.util.utilities import get_all_sample_combinations
from pyapprox.multifidelity.etc import AETCBLUE


class BestEstimator():
    def __init__(self, est_types, stat, costs, max_nmodels, **est_kwargs):
        """
        Parameters
        ----------
        estimator_types : list or string
            List of strings of each estimator type to compute, e.g. [gmf, grd]
        """
        self.best_est = None

        self._estimator_types = est_types
        self._stat = stat
        self._candidate_costs = np.asarray(costs)
        # self._ncandidate_nmodels is the number of total models
        self._ncandidate_models = len(self._candidate_costs)
        self._lf_model_indices = np.arange(1, self._ncandidate_models)
        self._nqoi = stat._nqoi
        if max_nmodels is not None and max_nmodels < 2:
            raise ValueError("Ensure max_nmodels > 1")
        self._max_nmodels = max_nmodels
        self._allow_failures = est_kwargs.get("allow_failures", False)
        if "allow_failures" in est_kwargs:
            del est_kwargs["allow_failures"]
        self._kwargs = est_kwargs
        self._best_model_indices = None
        self._all_model_labels = None

        self._save_candidate_estimators = est_kwargs.pop(
            "save_candidate_estimators", False)
        self._candidate_estimators = None

    @property
    def model_labels(self):
        return [self._all_model_labels[idx]
                for idx in self._best_model_indices]

    @model_labels.setter
    def model_labels(self, labels):
        self._all_model_labels = labels

    def _validate_kwargs(self, nsubset_lfmodels):
        sub_kwargs = copy.deepcopy(self._kwargs)
        if "recursion_index" in sub_kwargs:
            index = sub_kwargs["recursion_index"]
            if (np.allclose(index, np.arange(len(index))) or
                    np.allclose(index, np.zeros(len(index)))):
                sub_kwargs["recursion_index"] = index[:nsubset_lfmodels]
            else:
                msg = "model selection can only be used with recursion indices"
                msg += " (0, 1, 2, ...) or (0, ..., 0) or tree_depth is"
                msg += " not None"
                # There is no logical way to reduce a recursion index to use
                # a subset of model unless they are one of these two indices
                # or tree_depth is not None so that all possible recursion
                # indices are considered
                raise ValueError(msg)
        if "tree_depth" in sub_kwargs:
            sub_kwargs["tree_depth"] = min(
                sub_kwargs["tree_depth"], nsubset_lfmodels)
        return sub_kwargs

    def _get_estimator(self, est_type, subset_costs,
                       target_cost, sub_stat, sub_kwargs, optim_options):
        try:
            est = get_estimator(est_type, sub_stat, subset_costs, **sub_kwargs)
        except ValueError as e:
            if optim_options.get("verbosity", 0) > 0:
                print(e)
            # Some estimators, e.g. MFMC, fail when certain criteria
            # are not satisfied
            return None
        try:
            est.allocate_samples(target_cost, optim_options)
            return est
        except (RuntimeError, ValueError) as e:
            if optim_options.get("verbosity", 0) > 0:
                print(e)
            if self._allow_failures:
                return None
            raise e

    def _get_model_subset_estimator(
            self, qoi_idx, nsubset_lfmodels, optim_options,
            target_cost, lf_model_subset_indices):
        """
        Compute estimator covariance for all estimator types in
        self._estimator_types
        """
        idx = np.hstack(([0], lf_model_subset_indices)).astype(int)
        subset_costs = self._candidate_costs[idx]
        sub_stat = self._stat.__class__(self._stat._nqoi)
        sub_stat.set_pilot_quantities(*self._stat.get_pilot_quantities_subset(
            self._ncandidate_models, self._nqoi, idx))
        sub_kwargs = self._validate_kwargs(nsubset_lfmodels)

        best_est = None
        best_criteria = np.inf
        for est_type in self._estimator_types:
            est = self._get_estimator(
                est_type, subset_costs, target_cost,
                sub_stat, sub_kwargs, optim_options)
            if optim_options.get("verbosity", 0) > 1:
                msg = "\t Models {0}".format(idx)
                print(msg)
            if self._save_candidate_estimators:
                self._candidate_estimators.append(
                    [est, lf_model_subset_indices])
            if est is not None and est._optimized_criteria < best_criteria:
                best_est = est
                if hasattr(est._optimized_criteria, "item"):
                    best_criteria = est._optimized_criteria.item()
                else:
                    best_criteria = est._optimized_criteria
        return best_est

    def _get_best_model_subset_for_estimator_pool(
            self, nsubset_lfmodels, target_cost,
            best_criteria, best_model_indices, best_est, nprocs,
            optim_options):
        """
        Compute estimator covariances for all model subets
        """
        qoi_idx = np.arange(self._nqoi)
        pool = Pool(nprocs)
        indices = list(
            combinations(self._lf_model_indices, nsubset_lfmodels))
        result = pool.map(
            partial(self._get_model_subset_estimator,
                    qoi_idx, nsubset_lfmodels, optim_options,
                    target_cost), indices)
        pool.close()
        criteria = [
            np.array(est._optimized_criteria)
            if est is not None else np.inf for est in result]
        II = np.argmin(criteria)
        if not np.isfinite(criteria[II]):
            best_est = None
        else:
            best_est = result[II]
            best_model_indices = np.hstack(
                ([0], indices[II])).astype(int)
            best_criteria = best_est._optimized_criteria
        return best_criteria, best_model_indices, best_est

    def _get_best_model_subset_for_estimator_serial(
            self, nsubset_lfmodels, target_cost,
            best_criteria, best_model_indices, best_est, optim_options):
        qoi_idx = np.arange(self._nqoi)
        for lf_model_subset_indices in combinations(
                self._lf_model_indices, nsubset_lfmodels):
            est = self._get_model_subset_estimator(
                qoi_idx, nsubset_lfmodels, optim_options,
                target_cost, lf_model_subset_indices)
            if est is not None and est._optimized_criteria < best_criteria:
                best_est = est
                best_model_indices = np.hstack(
                    ([0], lf_model_subset_indices)).astype(int)
                best_criteria = best_est._optimized_criteria
        return best_criteria, best_model_indices, best_est

    def _get_best_estimator(self, target_cost, optim_options):
        best_criteria = np.inf
        best_est, best_model_indices = None, None
        nprocs = optim_options.get("nprocs", 1)

        if optim_options.get("verbosity", 0) > 0:
            print(f"Finding best model using {nprocs} processors")
        if "nprocs" in optim_options:
            del optim_options["nprocs"]

        if self._max_nmodels is None:
            min_nlfmodels = self._ncandidate_models-1
            max_nmodels = self._ncandidate_models
        else:
            min_nlfmodels = 1
            max_nmodels = self._max_nmodels

        best_est = None
        best_criteria = np.inf
        for nsubset_lfmodels in range(min_nlfmodels, max_nmodels):
            if nprocs > 1:
                criteria, model_indices, est = (
                    self._get_best_model_subset_for_estimator_pool(
                        nsubset_lfmodels, target_cost,
                        best_criteria, best_model_indices, best_est, nprocs,
                        optim_options))
            else:
                criteria, model_indices, est = (
                    self._get_best_model_subset_for_estimator_serial(
                        nsubset_lfmodels, target_cost,
                        best_criteria, best_model_indices, best_est,
                        optim_options))
            if optim_options.get("verbosity", 0) > 0:
                msg = "\t No of lf models {0}: {1} best criteria {2}".format(
                    nsubset_lfmodels, est, best_criteria)
                print(msg)
            if criteria < best_criteria:
                best_criteria = criteria
                best_est = est
                best_model_indices = model_indices

        if best_est is None:
            raise RuntimeError("No solutions found for any model subset")
        return best_est, best_model_indices

    def allocate_samples(self, target_cost, optim_options={}):
        if self._save_candidate_estimators:
            self._candidate_estimators = []
        best_est, best_model_indices = self._get_best_estimator(
            target_cost, optim_options)
        self.best_est = best_est
        self._best_model_indices = best_model_indices
        self._set_best_est_attributes()

    def _set_best_est_attributes(self):
        # allow direct access of important self.best_est attributes
        # __call__ cannot be set using this approach.
        attr_list = [
            # public functions
            "combine_acv_samples",
            "combine_acv_values",
            "generate_samples_per_model",
            "insert_pilot_values",
            "bootstrap",
            "plot_allocation",
            # private functions and variables
            "_separate_values_per_model",
            "_covariance_from_npartition_samples",
            "_covariance_from_partition_ratios",
            "_rounded_partition_ratios", "_stat",
            "_nmodels", "_cov", "_rounded_npartition_samples",
            "_rounded_nsamples_per_model", "_costs",
            "_optimized_criteria", "_get_discrepancy_covariances",
            "_rounded_target_cost",
            "_get_allocation_matrix",
            "_optimization_criteria",
            "_optimized_covariance",
            "_allocation_mat",
            "_npartitions"]
        for attr in attr_list:
            # Improve this function so that it works without the following
            # hack for MLBLUE
            if hasattr(self.best_est, attr):
                setattr(self, attr, getattr(self.best_est, attr))

    def __repr__(self):
        if self._optimized_criteria is None:
            return "{0}".format(self.__class__.__name__)
        return "{0}(est={1}, subset={2})".format(
            self.__class__.__name__, self.best_est, self._best_model_indices)

    def __call__(self, values):
        return self.best_est(values)


multioutput_estimators = {
    "cv": CVEstimator,
    "gmf": GMFEstimator,
    "gis": GISEstimator,
    "grd": GRDEstimator,
    "mfmc": MFMCEstimator,
    "mlmc": MLMCEstimator,
    "mc": MCEstimator,
    "mlblue": MLBLUEEstimator}


multioutput_stats = {
    "mean": MultiOutputMean,
    "variance": MultiOutputVariance,
    "mean_variance": MultiOutputMeanAndVariance,
}


[docs]def get_estimator(estimator_types, stat, costs, max_nmodels=None, **est_kwargs): """ Parameters ---------- estimator_types : list [str] or str If str (or len(estimators_types==1), then return the estimator named estimator_type (or estimator_types[0]) stat_type : str The type of statistics to compute costs : np.ndarray (nmodels) The computational cost of evaluating each model stat_args : list or tuple The arguments that are needed to compute the statistic max_nmodels : integer If None, compute the estimator using all the models. If not None, find the model subset that uses at most max_nmodels that minimizes the estimator covariance. est_kwargs : dict Keyword arguments that will be passed when creating each estimator. """ if isinstance(estimator_types, list) or max_nmodels is not None: if not isinstance(estimator_types, list): estimator_types = [estimator_types] return BestEstimator( estimator_types, stat, costs, max_nmodels, **est_kwargs) if isinstance(estimator_types, list): estimator_type = estimator_types[0] else: estimator_type = estimator_types if estimator_type not in multioutput_estimators: msg = f"Estimator {estimator_type} not supported. " msg += f"Must be one of {multioutput_estimators.keys()}" raise ValueError(msg) return multioutput_estimators[estimator_type]( stat, costs, **est_kwargs)
def _estimate_components(variable, est, funs, ii): """ Notes ----- To create reproducible results when running numpy.random in parallel must use RandomState. If not the results will be non-deterministic. This is happens because of a race condition. numpy.random.* uses only one global PRNG that is shared across all the threads without synchronization. Since the threads are running in parallel, at the same time, and their access to this global PRNG is not synchronized between them, they are all racing to access the PRNG state (so that the PRNG's state might change behind other threads' backs). Giving each thread its own PRNG (RandomState) solves this problem because there is no longer any state that's shared by multiple threads without synchronization. Also see new features https://docs.scipy.org/doc/numpy/reference/random/parallel.html https://docs.scipy.org/doc/numpy/reference/random/multithreading.html """ random_states = [np.random.RandomState(ii*variable.num_vars()+jj) for jj in range(variable.num_vars())] samples_per_model = est.generate_samples_per_model( partial(variable.rvs, random_states=random_states)) values_per_model = [ fun(samples) for fun, samples in zip(funs, samples_per_model)] mc_est = est._stat.sample_estimate if ((isinstance(est, ACVEstimator) or isinstance(est, BestEstimator))): # the above condition does not allow BestEstimator to be # applied to CVEstimator est_val = est(values_per_model) acv_values = est._separate_values_per_model(values_per_model) Q = mc_est(acv_values[1]) delta = np.hstack([mc_est(acv_values[2*ii]) - mc_est(acv_values[2*ii+1]) for ii in range(1, est._nmodels)]) elif isinstance(est, CVEstimator): est_val = est(values_per_model) Q = mc_est(values_per_model[0]) delta = np.hstack( [mc_est(values_per_model[ii]) - est._lowfi_stats[ii-1] for ii in range(1, est._nmodels)]) else: est_val = est(values_per_model[0]) Q = mc_est(values_per_model[0]) delta = Q*0 return est_val, Q, delta def _estimate_components_loop( variable, ntrials, est, funs, max_eval_concurrency): if max_eval_concurrency == 1: Q = [] delta = [] estimator_vals = [] for ii in range(ntrials): est_val, Q_val, delta_val = _estimate_components( variable, est, funs, ii) estimator_vals.append(est_val) Q.append(Q_val) delta.append(delta_val) Q = np.array(Q) delta = np.array(delta) estimator_vals = np.array(estimator_vals) return estimator_vals, Q, delta # set flat funs to none so funs can be pickled pool = Pool(max_eval_concurrency) func = partial(_estimate_components, variable, est, funs) result = pool.map(func, list(range(ntrials))) pool.close() estimator_vals = np.asarray([r[0] for r in result]) Q = np.asarray([r[1] for r in result]) delta = np.asarray([r[2] for r in result]) return estimator_vals, Q, delta def numerically_compute_estimator_variance( funs, variable, est, ntrials=int(1e3), max_eval_concurrency=1, return_all=False): r""" Numerically estimate the variance of an approximate control variate estimator. Parameters ---------- funs : list [callable] List of functions with signature `fun(samples) -> np.ndarray (nsamples, nqoi)` where samples has shape (nvars, nsamples) est : :class:`pyapprox.multifidelity.multioutput_monte_carlo.MCEstimator` A Monte Carlo like estimator for computing sample based statistics ntrials : integer The number of times to compute estimator using different randomly generated set of samples max_eval_concurrency : integer The number of processors used to compute realizations of the estimators which can be run independently and in parallel. Returns ------- hf_covar_numer : np.ndarray (nstats, nstats) The estimator covariance of the single high-fidelity Monte Carlo estimator hf_covar : np.ndarray (nstats, nstats) The analytical value of the estimator covariance of the single high-fidelity Monte Carlo estimator covar_numer : np.ndarray (nstats, nstats) The estimator covariance of est hf_covar : np.ndarray (nstats, nstats) The analytical value of the estimator covariance of est est_vals : np.ndarray (ntrials, nstats) The values for the est for each trial. Only returned if return_all=True Q0 : np.ndarray (ntrials, nstats) The values for the single fidelity MC estimator for each trial. Only returned if return_all=True delta : np.ndarray (ntrials, nstats) The values for the differences between the low-fidelty estimators :math:`\mathcal{Z}_\alpha` and :math:`\mathcal{Z}_\alpha^*` for each trial. Only returned if return_all=True """ ntrials = int(ntrials) est_vals, Q0, delta = _estimate_components_loop( variable, ntrials, est, funs, max_eval_concurrency) hf_covar_numer = np.cov(Q0, ddof=1, rowvar=False) hf_covar = est._stat.high_fidelity_estimator_covariance( est._rounded_npartition_samples[0]) covar_numer = np.cov(est_vals, ddof=1, rowvar=False) covar = est._covariance_from_npartition_samples( est._rounded_npartition_samples).numpy() if not return_all: return hf_covar_numer, hf_covar, covar_numer, covar return hf_covar_numer, hf_covar, covar_numer, covar, est_vals, Q0, delta def compare_estimator_variances(target_costs, estimators, optim_opts={}): """ Compute the variances of different Monte-Carlo like estimators. Parameters ---------- target_costs : np.ndarray (ntarget_costs) Different total cost budgets estimators : list (nestimators) List of Monte Carlo estimator objects, e.g. :class:`~pyapprox.multifidelity.multioutput_monte_carlo.MCEstimator` Returns ------- optimized_estimators : list Each entry is a list of optimized estimators for a set of target costs """ optimized_estimators = [] for est in estimators: est_copies = [] for target_cost in target_costs: est_copy = copy.deepcopy(est) est_copy.allocate_samples( target_cost, optim_options=optim_opts) est_copies.append(est_copy) optimized_estimators.append(est_copies) return optimized_estimators class ComparisonCriteria(): def __init__(self, criteria_type=None): self._criteria_type = criteria_type def __call__(self, est_covariance, est): if self._criteria_type == "det": return determinant_variance(est_covariance) if self._criteria_type == "trace": return np.exp(log_trace_variance(est_covariance)) raise ValueError( "Criteria {0} not supported".format(self._criteria_type)) def __repr__(self): return "{0}(citeria={1})".format( self.__class__.__name__, self._criteria_type) class SingleQoiAndStatComparisonCriteria(ComparisonCriteria): def __init__(self, stat_type, qoi_idx): """ Compare estimators based on the variance of a single statistic for a single QoI even though mutiple QoI may have been used to compute multiple statistics Parameters ---------- stat_type: str The stat type. Must be one of ["mean", "variance", "mean_variance"] qoi_idx: integer The index of the QoI as it appears in the covariance matrix """ self._stat_type = stat_type self._qoi_idx = qoi_idx def __call__(self, est_covariance, est): if self._stat_type != "mean" and isinstance( est._stat, MultiOutputMeanAndVariance): return ( est_covariance[est.nqoi+self._qoi_idx, est._nqoi+self._qoi_idx]) elif (isinstance( est._stat, (MultiOutputVariance, MultiOutputMean)) or self._stat_type == "mean"): return est_covariance[self._qoi_idx, self._qoi_idx] raise ValueError("{0} not supported".format(est._stat)) def __repr__(self): return "{0}(stat={1}, qoi={2})".format( self.__class__.__name__, self._stat_type, self._qoi_idx) def compute_variance_reductions(optimized_estimators, criteria=ComparisonCriteria("det"), nhf_samples=None): """ Compute the variance reduction (relative to single model MC) for a list of optimized estimtors. Parameters ---------- optimized_estimators : list Each entry is a list of optimized estimators for a set of target costs est_labels : list (nestimators) String used to label each estimator criteria : callable A function that returns a scalar metric of the estimator covariance with signature `criteria(cov) -> float` where cov is an np.ndarray (nstats, nstats) is the estimator covariance nhf_samples : int The number of samples of the high-fidelity model used for the high-fidelity only estimator. If None, then the number of high-fidelity evaluations that produce a estimator cost equal to the optimized target cost of the estimator is used. Usually, nhf_samples should be set to None. """ var_red, est_criterias, sf_criterias = [], [], [] optimized_estimators = optimized_estimators.copy() nestimators = len(optimized_estimators) for ii in range(nestimators): est = optimized_estimators[ii] est_criteria = criteria(est._covariance_from_npartition_samples( est._rounded_npartition_samples), est) if nhf_samples is None: nhf_samples = int(est._rounded_target_cost/est._costs[0]) sf_criteria = criteria( est._stat.high_fidelity_estimator_covariance( nhf_samples), est) var_red.append(sf_criteria/est_criteria) sf_criterias.append(sf_criteria) est_criterias.append(est_criteria) return (np.asarray(var_red), np.asarray(est_criterias), np.asarray(sf_criterias))
[docs]def estimate_model_ensemble_covariance(npilot_samples, generate_samples, model_ensemble, nmodels): r""" Estimate the covariance of a model ensemble from a set of pilot samples Parameters ---------- npilot_samples : integer The number of samples used to estimate the covariance generate_samples : callable Function used to generate realizations of the random variable with call signature samples = generate_samples(npilot_samples) model_ensemble : callable or np.ndarray (nvars, nsamples) Function that takes a set of samples and models ids and evaluates a set of models. See ModelEnsemble. call signature values = model_emsemble(samples) relizations of the random variable nmodels : integer The number of models in the ensemble Returns ------- cov : np.ndarray (nqoi,nqoi) The covariance between the model qoi pilot_random_samples : np.ndarray (nvars,npilot_samples) The random samples used to compute the covariance. These samples DO NOT have a model id pilot_values : np.ndaray (npilot_samples,nmodels) The values of each model at the pilot samples """ # generate pilot samples if callable(generate_samples): pilot_random_samples = generate_samples(npilot_samples) else: pilot_random_samples = generate_samples[:, :npilot_samples] config_vars = np.arange(nmodels)[np.newaxis, :] # append model ids to pilot smaples pilot_samples = get_all_sample_combinations( pilot_random_samples, config_vars) # evaluate models at pilot samples pilot_values = model_ensemble(pilot_samples) pilot_values = np.reshape( pilot_values, (npilot_samples, nmodels)) # compute covariance cov = np.cov(pilot_values, rowvar=False) return cov, pilot_random_samples, pilot_values