Source code for pyapprox.interface.wrappers

import time
import numpy as np
import subprocess
import os
import glob
from functools import partial
from multiprocessing import Pool
import pickle
import copy
# from tqdm import tqdm

from pyapprox.util.utilities import (
    get_all_sample_combinations, hash_array, cartesian_product
)
from pyapprox.util.sys_utilities import has_kwarg


def evaluate_1darray_function_on_2d_array(function, samples, statusbar=False,
                                          return_grad=False):
    """
    Evaluate a function at a set of samples using a function that only
    takes one sample at a time.

    Parameters
    ----------
    function : callable
        A function with signature ``function(sample) -> np.ndarray``
        where sample is a 1d np.ndarray of shape (num_vars) and the
        output is a np.ndarray of values of shape (num_qoi). The output
        can also be a scalar

    samples : np.ndarray (num_vars, num_samples)
        The samples at which to evaluate the model

    statusbar : boolean
        True - print status bar showing progress to stdout
        False - do not print

    return_grad : boolean
        True - return values and gradients
        False - return just values
        If function does not accept the return_grad kwarg an exception
        will be raised

    Returns
    -------
    values : np.ndarray (num_samples, num_qoi)
        The value of each requested QoI of the model for each sample
    """
    has_return_grad = has_kwarg(function, "return_grad")
    if return_grad and not has_return_grad:
        msg = "return_grad set to true but function does not return grad"
        raise ValueError(msg)
    assert samples.ndim == 2
    num_samples = samples.shape[1]
    grads = []
    if not has_return_grad or return_grad is False:
        values_0 = function(samples[:, 0])
    else:
        values_0, grad_0 = function(samples[:, 0], return_grad=return_grad)
        assert grad_0.ndim == 1
        grads.append(grad_0)
    values_0 = np.atleast_1d(values_0)
    assert values_0.ndim == 1, values_0.shape
    num_qoi = values_0.shape[0]
    values = np.empty((num_samples, num_qoi), float)
    values[0, :] = values_0
    # if statusbar:
    #     pbar = tqdm(total=num_samples)
    #     pbar.update(1)
    for ii in range(1, num_samples):
        if not has_return_grad or return_grad is False:
            values[ii, :] = function(samples[:, ii])
        else:
            val_ii, grad_ii = function(
                samples[:, ii], return_grad=return_grad)
            values[ii, :] = val_ii
            grads.append(grad_ii)
        # if statusbar:
        #     pbar.update(1)
    if not return_grad:
        return values
    if num_qoi == 1:
        return values, np.vstack(grads)
    return values, grads
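# Example usage (an illustrative sketch; ``fun`` is a hypothetical
# user-supplied function, not part of this module):
#
#     def fun(sample):
#         # sample is 1D with shape (num_vars,); return shape (num_qoi,)
#         return np.array([np.sum(sample**2), np.prod(sample)])
#
#     samples = np.random.uniform(0., 1., (3, 10))
#     values = evaluate_1darray_function_on_2d_array(fun, samples)
#     # values.shape == (10, 2)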
class PyFunction(object):
    def __init__(self, function):
        self.function = function

    def __call__(self, samples, opts=dict()):
        return evaluate_1darray_function_on_2d_array(
            self.function, samples)


def run_shell_command(shell_command, opts={}):
    """
    Execute a shell command.

    Parameters
    ----------
    shell_command : string
        The command that you want executed

    opts : dict
        Options controlling how the command is run. Supported keys are:

        verbosity : integer (default=1)
            0 - suppress all model output
            1 - write output to file
            2 - write output to stdout

        filename : string (default=None)
            The filename to which the output of the shell command is
            written. A file is only written if verbosity=1.
            If verbosity=1 and filename is None then
            filename = 'shell_command.out'

        env : os.environ (default=None)
            Mapping that defines the environment variables for the new
            process; these are used instead of inheriting the current
            process environment, which is the default behavior.
    """
    output_verbosity = opts.get('verbosity', 1)
    env = opts.get('env', None)
    filename = opts.get('filename', None)

    if output_verbosity == 0:
        subprocess.check_output(shell_command, shell=True, env=env)
    elif output_verbosity == 1:
        if filename is None:
            filename = 'shell_command.out'
        with open(filename, 'w') as f:
            subprocess.call(shell_command, shell=True, stdout=f,
                            stderr=f, env=env)
    else:
        subprocess.call(shell_command, shell=True, env=env)
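# Example usage of run_shell_command (an illustrative sketch; the command
# and output filename are placeholders):
#
#     run_shell_command(
#         "echo hello", opts={"verbosity": 1, "filename": "run.out"})
#     # the stdout/stderr of the command is written to run.out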
class DataFunctionModel(object):
    r"""
    Create a queriable function that stores samples and associated
    function values, returning stored values for samples already in the
    database and evaluating the function otherwise.
    """

    def __init__(self, function, data=None, data_basename=None,
                 save_frequency=None, use_hash=True, digits=16,
                 base_model=None):
        """
        Parameters
        ----------
        function : callable
            A function with signature

            ``function(w) -> np.ndarray (nsamples, nqoi)``

            where ``w`` is a np.ndarray of shape (nvars, nsamples).

        data : tuple
            (samples, values, grads) of any previously evaluated samples
            and their associated values and gradients

        data_basename : string
            The basename of the file used to store the database of
            samples and values.

        save_frequency : integer
            The number of function evaluations run before data is saved.
            E.g. if save_frequency is 10 and __call__(samples) is run
            with samples containing 30 samples, the data will be stored
            at 3 checkpoints, i.e. after 10, 20 and 30 samples have been
            evaluated

        use_hash : boolean
            True - hash samples to determine if values have already been
            collected
            False - np.allclose is used to match samples by looping over
            all samples in the database. This is slower.

        digits : integer
            The number of significant digits used to hash or compare
            samples in the database
        """
        self.function = function
        self.base_model = base_model

        self.data = dict()
        self.samples = np.zeros((0, 0))
        self.values = None
        self.grads = None
        self.num_evaluations_ran = 0
        self.num_evaluations = 0
        self.digits = digits
        self.tol = 10**(-self.digits)
        self.use_hash = use_hash

        self.data_basename = data_basename
        self.save_frequency = save_frequency
        if self.data_basename is not None:
            assert save_frequency is not None
        if self.save_frequency and self.data_basename is None:
            msg = 'Warning save_frequency not being used because '
            msg += 'data_basename is None'
            print(msg)

        if data_basename is not None:
            file_data = combine_saved_model_data(data_basename)
            if file_data[0] is not None:
                self.add_new_data(file_data)

        if data is not None:
            self.samples, self.values, self.grads = data
            assert self.samples.shape[1] == self.values.shape[0]
            self.add_new_data(data)
    def hash_sample(self, sample):
        # if samples have undergone a transformation their values
        # may not be exactly the same, so one could hash samples
        # with fixed precision:
        # sample = np.round(sample, self.digits)
        # I = np.where(np.abs(sample) < self.tol)[0]
        # sample[I] = 0.
        key = hash_array(sample)  # , decimals=self.digits)
        return key
    def add_new_data(self, data):
        samples, values, grads = data
        for ii in range(samples.shape[1]):
            if self.use_hash:
                key = self.hash_sample(samples[:, ii])
                if key in self.data:
                    if not np.allclose(
                            self.values[self.data[key]], values[ii]):
                        msg = 'Duplicate samples found but values do '
                        msg += 'not match'
                        raise Exception(msg)
                    found = True
                else:
                    self.data[key] = ii
                    found = False
            else:
                found = False
                for jj in range(self.samples.shape[1]):
                    if np.allclose(self.samples[:, jj], samples[:, ii],
                                   atol=self.tol):
                        found = True
                        break
            if not found:
                if self.samples.shape[1] > 0:
                    self.samples = np.hstack(
                        [self.samples, samples[:, ii:ii+1]])
                    self.values = np.vstack(
                        [self.values, values[ii:ii+1, :]])
                    if grads is not None:
                        self.grads += grads[ii:ii+1]
                else:
                    self.samples = samples[:, ii:ii+1]
                    self.values = values[ii:ii+1, :]
                    if grads is not None:
                        self.grads = grads[ii:ii+1]

        # set counter so that the next file takes into account all
        # previously run samples
        self.num_evaluations_ran = self.samples.shape[1]
    def _batch_call(self, samples, return_grad):
        assert self.save_frequency > 0
        num_batch_samples = self.save_frequency
        lb = 0
        vals = None
        grads = None
        while lb < samples.shape[1]:
            ub = min(lb+num_batch_samples, samples.shape[1])
            num_evaluations_ran = self.num_evaluations_ran
            batch_vals, batch_grads, new_sample_indices = self._call(
                samples[:, lb:ub], return_grad)
            if return_grad:
                grads_4_save = [batch_grads[ii] for ii in new_sample_indices]
            else:
                grads_4_save = [None for ii in new_sample_indices]
            # I think this code will work only if function always does or
            # does not return grads
            if vals is None:
                vals = batch_vals
                grads = copy.deepcopy(batch_grads)
            else:
                vals = np.vstack((vals, batch_vals))
                grads += copy.deepcopy(batch_grads)
            if len(new_sample_indices) == 0:
                lb = ub
                continue
            data_filename = self.data_basename+'-%d-%d.pkl' % (
                num_evaluations_ran,
                num_evaluations_ran+len(new_sample_indices)-1)
            # np.savez(data_filename, vals=batch_vals[new_sample_indices],
            #          samples=samples[:, lb:ub][:, new_sample_indices],
            #          grads=grads_4_save)
            with open(data_filename, "wb") as f:
                pickle.dump((
                    batch_vals[new_sample_indices],
                    samples[:, lb:ub][:, new_sample_indices],
                    grads_4_save), f)
            lb = ub
        if not return_grad:
            return vals
        return vals, grads

    def _call(self, samples, return_grad):
        has_return_grad = has_kwarg(self.function, "return_grad")
        if return_grad and not has_return_grad:
            msg = "return_grad set to true but function does not return grad"
            raise ValueError(msg)
        evaluated_sample_indices = []
        new_sample_indices = []
        for ii in range(samples.shape[1]):
            if self.use_hash:
                key = self.hash_sample(samples[:, ii])
                if key in self.data:
                    evaluated_sample_indices.append([ii, self.data[key]])
                else:
                    new_sample_indices.append(ii)
            else:
                found = False
                for jj in range(self.samples.shape[1]):
                    if np.allclose(self.samples[:, jj], samples[:, ii],
                                   atol=self.tol):
                        found = True
                        break
                if found:
                    evaluated_sample_indices.append([ii, jj])
                else:
                    new_sample_indices.append(ii)
        evaluated_sample_indices = np.asarray(evaluated_sample_indices)

        if len(new_sample_indices) > 0:
            new_samples = samples[:, new_sample_indices]
            if not has_return_grad or not return_grad:
                new_values = self.function(new_samples)
                num_qoi = new_values.shape[1]
                new_grads = None
            else:
                new_values, new_grads = self.function(
                    new_samples, return_grad=return_grad)
                num_qoi = new_values.shape[1]
        else:
            num_qoi = self.values.shape[1]

        values = np.empty((samples.shape[1], num_qoi), dtype=float)
        grads = [None for ii in range(samples.shape[1])]
        if len(new_sample_indices) > 0:
            values[new_sample_indices, :] = new_values
            new_grads_list = [None for ii in range(len(new_sample_indices))]
            if new_grads is not None:
                for ii in range(len(new_sample_indices)):
                    # TODO need to make sure fun(samples, return_grad=True)
                    # can return a list of grads for each sample
                    # or a 2d array if nqoi=1
                    new_grads_list[ii] = np.atleast_2d(new_grads[ii]).copy()
                    grads[new_sample_indices[ii]] = copy.deepcopy(
                        new_grads[ii])
            new_grads = new_grads_list
        if len(new_sample_indices) < samples.shape[1]:
            values[evaluated_sample_indices[:, 0]] = \
                self.values[evaluated_sample_indices[:, 1], :]
            if has_return_grad:
                for ii in range(evaluated_sample_indices.shape[0]):
                    grads[evaluated_sample_indices[ii, 0]] = copy.deepcopy(
                        self.grads[evaluated_sample_indices[ii, 1]])

        if len(new_sample_indices) > 0:
            if self.samples.shape[1] == 0:
                jj = 0
                self.samples = samples
                self.values = values
                if has_return_grad:
                    self.grads = copy.deepcopy(grads)
            else:
                jj = self.samples.shape[1]
                self.samples = np.hstack(
                    (self.samples, samples[:, new_sample_indices]))
                self.values = np.vstack((self.values, new_values))
                if has_return_grad:
                    self.grads += copy.deepcopy(new_grads)
            for ii in range(len(new_sample_indices)):
                key = hash_array(samples[:, new_sample_indices[ii]])
                self.data[key] = jj+ii

        self.num_evaluations_ran += len(new_sample_indices)
        # increment the number of samples passed to __call__ since the
        # object was created. This includes samples drawn from the archive
        # and samples used to evaluate self.function
        self.num_evaluations += samples.shape[1]

        return values, grads, new_sample_indices

    @staticmethod
    def _grads_valid(grads):
        for g in grads:
            if g is None:
                # TODO remove exception and rerun samples to get grads
                msg = "return_grad=True but previous samples evaluated did "
                msg += "not have grads"
                raise ValueError(msg)
    def __call__(self, samples, return_grad=False):
        if self.save_frequency is not None and self.save_frequency > 0:
            out = self._batch_call(samples, return_grad)
            if not return_grad:
                return out
            self._grads_valid(out[1])
            return out
        values, grads = self._call(samples, return_grad)[:-1]
        if return_grad:
            self._grads_valid(grads)
            return values, grads
        return values
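# Example usage of DataFunctionModel (an illustrative sketch; ``fun`` and
# the basename are hypothetical). Repeated samples are looked up in the
# database instead of being re-evaluated, and checkpoints are pickled
# every ``save_frequency`` evaluations:
#
#     def fun(samples):
#         return np.sum(samples**2, axis=0)[:, None]
#
#     model = DataFunctionModel(fun, data_basename="fun_data",
#                               save_frequency=10)
#     samples = np.random.uniform(0., 1., (2, 30))
#     values = model(samples)        # evaluates fun, writes 3 checkpoints
#     values_again = model(samples)  # served entirely from the database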
def run_model_samples_in_parallel(model, max_eval_concurrency, samples,
                                  pool=None, assert_omp=True):
    """
    Warning
    -------
    pool.map serializes each argument, so if model is a class, any of its
    member variables that are updated in __call__ will not persist once
    each __call__ to pool completes.
    """
    if max_eval_concurrency == 1:
        return model(samples)
    num_samples = samples.shape[1]
    if assert_omp and max_eval_concurrency > 1:
        if ('OMP_NUM_THREADS' not in os.environ or
                not int(os.environ['OMP_NUM_THREADS']) == 1):
            msg = 'User set assert_omp=True but OMP_NUM_THREADS has not '
            msg += 'been set to 1. Run script with '
            msg += 'OMP_NUM_THREADS=1 python script.py'
            raise Exception(msg)

    if pool is None:
        pool_given = False
        pool = Pool(max_eval_concurrency)
    else:
        pool_given = True
    result = pool.map(
        model, [(samples[:, ii:ii+1]) for ii in range(samples.shape[1])])
    if pool_given is False:
        pool.close()

    if type(result[0]) == tuple:
        assert len(result[0]) == 2
        num_qoi = result[0][0].shape[1]
        return_grad = True
        return_grads = []
    else:
        num_qoi = result[0].shape[1]
        return_grad = False
    values = np.empty((num_samples, num_qoi))
    for ii in range(len(result)):
        if not return_grad:
            values[ii, :] = result[ii][0, :]
        else:
            values[ii, :] = result[ii][0][0, :]
            if type(result[ii][1]) == list:
                return_grads += result[ii][1]
            else:
                return_grads += [result[ii][1]]
    if not return_grad:
        return values
    return values, return_grads


def time_function_evaluations(function, samples, return_grad=False):
    has_return_grad = has_kwarg(function, "return_grad")
    if return_grad and not has_return_grad:
        msg = "return_grad set to true but function does not return grad"
        raise ValueError(msg)
    vals = []
    grads = []
    times = []
    for ii in range(samples.shape[1]):
        t0 = time.time()
        if not has_return_grad or not return_grad:
            val = function(samples[:, ii:ii+1])[0, :]
        else:
            out = function(samples[:, ii:ii+1], return_grad=return_grad)
            val = out[0][0, :]
            grads.append(out[1])
        t1 = time.time()
        vals.append(val)
        times.append([t1-t0])
    vals = np.asarray(vals)
    times = np.asarray(times)
    if len(grads) == 0:
        return np.hstack([vals, times])
    if vals.shape[1] == 1:
        grads = np.vstack(grads)
    return np.hstack([vals, times]), grads
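# Example usage of time_function_evaluations (an illustrative sketch;
# ``fun`` is hypothetical). The last column of the returned array is the
# wall time of each evaluation:
#
#     def fun(samples):
#         return np.sum(samples**2, axis=0)[:, None]
#
#     samples = np.random.uniform(0., 1., (2, 5))
#     data = time_function_evaluations(fun, samples)
#     # data.shape == (5, 2): column 0 is the QoI, column 1 the wall time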
class TimerModel(object):
    r"""
    Return the wall time needed to evaluate a function at each sample as
    an additional quantity of interest.
    """

    def __init__(self, function, base_model=None):
        """
        Parameters
        ----------
        function : callable
            The function to be timed, with signature

            ``function(w) -> np.ndarray (nsamples, nqoi)``

            where ``w`` is a np.ndarray of shape (nvars, nsamples).
            __call__ appends the wall time of each evaluation as an
            extra column of the output array.

        base_model : callable
            A function with signature ``base_model(w) -> float`` where
            ``w`` is a np.ndarray of shape (nvars, nsamples). This is
            useful when function is a wrapper of another model, i.e.
            base_model, and algorithms or the user want access to the
            attributes of the base_model.
        """
        self.function_to_time = function
        self.base_model = base_model

    # def x__getattr__(self, name):
    #     """
    #     Cannot get the following to work.
    #
    #     If defining a custom __getattr__ it seems I cannot have member
    #     variables with the same name in this class and in the class
    #     definition of function.
    #
    #     If self.function is itself a model object, allow access to
    #     self.function.name using self.name. Note __getattr__ will be
    #     invoked on python objects only when the requested attribute is
    #     not found in the particular object's space.
    #     """
    #     if hasattr(self.function_to_time, name):
    #         attr = getattr(self.function_to_time, name)
    #         return attr
    #     raise AttributeError(
    #         f"{self} or its member {self}.function has no attribute "
    #         f"'{name}'")
    def __call__(self, samples, return_grad=False):
        return time_function_evaluations(
            self.function_to_time, samples, return_grad=return_grad)
class WorkTracker(object):
    r"""
    Store the cost needed to evaluate a function under different
    configurations, e.g. mesh resolution of a finite element model used
    to solve a PDE.
    """

    def __init__(self):
        self.costs = dict()

    def __call__(self, config_samples=None):
        """
        Read the cost of evaluating the functions with the ids given in
        a set of config_samples.

        Parameters
        ----------
        config_samples : np.ndarray (nconfig_vars, nsamples)
            The configuration indices. If None the default id [0] is used
        """
        if config_samples is None:
            config_samples = np.asarray([[0]])
        num_config_vars, nqueries = config_samples.shape
        costs = np.empty((nqueries))
        for ii in range(nqueries):
            # key = tuple([int(ll) for ll in config_samples[:, ii]])
            key = tuple([ll for ll in config_samples[:, ii]])
            if key not in self.costs:
                msg = 'Asking for cost before function cost has been '
                msg += 'provided'
                raise Exception(msg)
            costs[ii] = np.median(self.costs[key])
        return costs

    def update(self, config_samples, costs):
        """
        Update the cost of evaluating the functions with the ids given in
        a set of config_samples.

        Parameters
        ----------
        config_samples : np.ndarray (nconfig_vars, nsamples)
            The configuration indices

        costs : np.ndarray (nsamples)
            The cost of evaluating the function indexed by each index in
            ``config_samples``
        """
        num_config_vars, nqueries = config_samples.shape
        assert costs.shape[0] == nqueries
        assert costs.ndim == 1
        for ii in range(nqueries):
            # key = tuple([int(ll) for ll in config_samples[:, ii]])
            key = tuple([ll for ll in config_samples[:, ii]])
            if key in self.costs:
                self.costs[key].append(costs[ii])
            else:
                self.costs[key] = [costs[ii]]

    def __str__(self):
        msg = 'WorkTracker Cost Summary\n'
        msg += '{:<10} {:<10}\n'.format('Function ID', 'Median Cost')
        for item in self.costs.items():
            msg += '{:<10} {:<10}\n'.format(
                str(item[0]), np.median(item[1]))
        return msg


# def eval(function, samples):
#     return function(samples)
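# Example usage of WorkTracker (an illustrative sketch). Costs are stored
# per configuration key and the median is returned on query:
#
#     tracker = WorkTracker()
#     config_samples = np.array([[0, 0, 1]], dtype=float)
#     tracker.update(config_samples, np.array([0.1, 0.3, 2.0]))
#     tracker(np.array([[0]], dtype=float))  # -> array([0.2])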
class WorkTrackingModel(object):
    r"""
    Keep track of the wall time needed to evaluate a function.
    """

    def __init__(self, function, base_model=None, num_config_vars=0,
                 enforce_timer_model=True):
        """
        Keep track of the wall time needed to evaluate a function.

        Parameters
        ----------
        function : callable
            A function with signature

            ``function(w) -> np.ndarray (nsamples, nqoi+1)``

            where ``w`` is a np.ndarray of shape (nvars, nsamples).
            The last qoi returned by function (i.e. the last column of
            the output array) must be the cost of the simulation. This
            column is removed from the output of __call__, e.g.

            ``model = WorkTrackingModel(TimerModel(benchmark.fun), num_config_vars=1)``

        base_model : callable
            A function with signature ``base_model(w) -> float`` where
            ``w`` is a np.ndarray of shape (nvars, nsamples). This is
            useful when function is a wrapper of another model, i.e.
            base_model, and algorithms or the user want access to the
            attributes of the base_model.

        num_config_vars : integer
            The number of configuration variables of fun. For most
            functions this will be zero.

        enforce_timer_model : boolean
            If True function must be an instance of TimerModel. If False
            function must return the qoi plus an additional column which
            is the model run time

        Notes
        -----
        If defining a custom __getattr__ it seems I cannot have member
        variables with the same name in this class and in the class
        definition of function
        """
        self.wt_function = function
        if enforce_timer_model and not isinstance(function, TimerModel):
            raise ValueError("Function is not an instance of TimerModel")
        self.work_tracker = WorkTracker()
        self.base_model = base_model
        self.num_config_vars = num_config_vars
    def __call__(self, samples, return_grad=False):
        """
        Evaluate self.function.

        Parameters
        ----------
        samples : np.ndarray (nvars, nsamples)
            Samples used to evaluate self.function

        Returns
        -------
        values : np.ndarray (nsamples, nqoi)
            The values of self.function. The last qoi returned by
            self.function (i.e. the last column of the output array of
            size (nsamples, nqoi+1)) is the cost of the simulation. This
            column is not included in values.
        """
        has_return_grad = has_kwarg(self.wt_function, "return_grad")
        if return_grad and not has_return_grad:
            msg = "return_grad set to true but function does not return grad"
            raise ValueError(msg)
        if not has_return_grad or not return_grad:
            data = self.wt_function(samples)
        else:
            data, grads = self.wt_function(samples, return_grad=return_grad)
        if data.shape[1] <= 1:
            raise RuntimeError(
                "function did not return at least one QoI and time")
        values = data[:, :-1]
        work = data[:, -1]
        if self.num_config_vars > 0:
            config_samples = samples[-self.num_config_vars:, :]
        else:
            config_samples = np.zeros((1, samples.shape[1]))
        self.work_tracker.update(config_samples, work)
        if not return_grad:
            return values
        return values, grads
    def cost_function(self, config_samples=None):
        """
        Return the cost of evaluating the functions with the ids given in
        a set of config_samples. These samples are assumed to be in user
        space, not canonical space.

        Parameters
        ----------
        config_samples : np.ndarray (nconfig_vars, nsamples)
            The configuration indices
        """
        if config_samples is None:
            config_samples = np.zeros((1, 1))
        cost = self.work_tracker(config_samples)
        return cost
    def __repr__(self):
        if self.base_model is not None:
            return "{0}(base_model={1})".format(
                self.__class__.__name__, self.base_model)
        return "{0}(model={1})".format(
            self.__class__.__name__, self.wt_function)
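# Example usage (an illustrative sketch; ``fun`` is hypothetical).
# TimerModel appends the wall time as an extra column, which
# WorkTrackingModel strips off and records:
#
#     def fun(samples):
#         return np.sum(samples**2, axis=0)[:, None]
#
#     model = WorkTrackingModel(TimerModel(fun))
#     values = model(np.random.uniform(0., 1., (2, 4)))
#     # values.shape == (4, 1); the timings are stored internally
#     median_cost = model.cost_function()  # median wall time per sample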
class PoolModel(object):
    r"""
    Evaluate a function at multiple samples in parallel using
    multiprocessing.Pool.
    """

    def __init__(self, function, max_eval_concurrency, assert_omp=True,
                 base_model=None):
        """
        Parameters
        ----------
        function : callable
            A function with signature

            ``function(w) -> np.ndarray (nsamples, nqoi)``

            where ``w`` is a np.ndarray of shape (nvars, nsamples).

        max_eval_concurrency : integer
            The maximum number of simulations that can be run in
            parallel. Should be no more than the maximum number of cores
            on the computer being used

        assert_omp : boolean
            If True make sure that python is only using one thread per
            model instance. On OSX and Linux machines this means that the
            environment variable OMP_NUM_THREADS has been set to 1 with,
            e.g. export OMP_NUM_THREADS=1

            This is useful because many python packages, e.g. SciPy and
            NumPy, use multiple threads and this can cause running
            multiple evaluations of function to be slow because of
            resource allocation issues.

        base_model : callable
            A function with signature ``base_model(w) -> float`` where
            ``w`` is a np.ndarray of shape (nvars, nsamples). This is
            useful when function is a wrapper of another model, i.e.
            base_model, and algorithms or the user want access to the
            attributes of the base_model.

        Notes
        -----
        If defining a custom __getattr__ it seems I cannot have member
        variables with the same name in this class and in the class
        definition of function
        """
        self.base_model = base_model
        self.set_max_eval_concurrency(max_eval_concurrency)
        self.num_evaluations = 0
        self.assert_omp = assert_omp
        self.pool_function = function
    def set_max_eval_concurrency(self, max_eval_concurrency):
        """
        Set the number of worker processes used to evaluate the function.

        Parameters
        ----------
        max_eval_concurrency : integer
            The maximum number of simulations that can be run in
            parallel. Should be no more than the maximum number of cores
            on the computer being used
        """
        self.max_eval_concurrency = max_eval_concurrency
    def __call__(self, samples, verbose=False, return_grad=False):
        """
        Evaluate a function at multiple samples in parallel using
        multiprocessing.Pool.

        Parameters
        ----------
        samples : np.ndarray (nvars, nsamples)
            Samples used to evaluate self.function
        """
        has_return_grad = has_kwarg(self.pool_function, "return_grad")
        if return_grad and not has_return_grad:
            msg = "return_grad set to true but function does not return grad"
            raise ValueError(msg)
        if not has_return_grad or not return_grad:
            fun = self.pool_function
        else:
            fun = partial(self.pool_function, return_grad=return_grad)
        t0 = time.time()
        vals = run_model_samples_in_parallel(
            fun, self.max_eval_concurrency, samples, pool=None,
            assert_omp=self.assert_omp)
        if verbose:
            msg = f"Evaluating all {samples.shape[1]} samples took "
            msg += f"{time.time()-t0} seconds"
            print(msg)
        return vals
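# Example usage of PoolModel (an illustrative sketch; ``fun`` is
# hypothetical). Note that the function passed to PoolModel must be
# picklable, e.g. defined at module level, and OMP_NUM_THREADS must be
# set to 1 unless assert_omp=False:
#
#     def fun(samples):
#         return np.sum(samples**2, axis=0)[:, None]
#
#     model = PoolModel(fun, max_eval_concurrency=4, assert_omp=False)
#     values = model(np.random.uniform(0., 1., (2, 100)))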
def get_active_set_model_from_variable(function, variable,
                                       active_var_indices, nominal_values,
                                       base_model=None):
    from pyapprox.variables.joint import IndependentMarginalsVariable
    active_variable = IndependentMarginalsVariable(
        [variable.marginals()[ii] for ii in active_var_indices])
    mask = np.ones(variable.num_vars(), dtype=bool)
    mask[active_var_indices] = False
    inactive_var_values = nominal_values[mask]
    model = ActiveSetVariableModel(
        function, variable.num_vars(), inactive_var_values,
        active_var_indices, base_model=base_model)
    return model, active_variable


class ActiveSetVariableModel(object):
    r"""
    Create a model wrapper that only accepts a subset of the model
    variables.
    """

    def __init__(self, function, num_vars, inactive_var_values,
                 active_var_indices, base_model=None):
        # num_vars can be determined from the inputs, but requiring it
        # allows for better error checking
        self.function = function
        assert inactive_var_values.ndim == 2
        self.inactive_var_values = np.asarray(inactive_var_values)
        self.active_var_indices = np.asarray(active_var_indices)
        assert self.active_var_indices.shape[0] + \
            self.inactive_var_values.shape[0] == num_vars
        self.num_vars = num_vars
        assert np.all(self.active_var_indices < self.num_vars)
        self.inactive_var_indices = np.delete(
            np.arange(self.num_vars), active_var_indices)
        self.base_model = base_model

    def _expand_samples(self, reduced_samples):
        assert reduced_samples.ndim == 2
        raw_samples = get_all_sample_combinations(
            self.inactive_var_values, reduced_samples)
        samples = np.empty_like(raw_samples)
        samples[self.inactive_var_indices, :] = \
            raw_samples[:self.inactive_var_indices.shape[0]]
        samples[self.active_var_indices, :] = \
            raw_samples[self.inactive_var_indices.shape[0]:]
        return samples

    def __call__(self, reduced_samples, return_grad=False):
        has_return_grad = has_kwarg(self.function, "return_grad")
        if return_grad and not has_return_grad:
            msg = "return_grad set to true but function does not return grad"
            raise ValueError(msg)
        samples = self._expand_samples(reduced_samples)
        if not has_return_grad:
            return self.function(samples)
        return self.function(samples, return_grad)

    def num_active_vars(self):
        return len(self.active_var_indices)


def combine_saved_model_data(saved_data_basename):
    filenames = glob.glob(saved_data_basename+'*.pkl')
    ii = 0
    grads = None
    for filename in filenames:
        # data = np.load(filename, allow_pickle=True)
        # data_vals, data_samples, data_grads = (
        #     data["vals"], data["samples"], data["grads"])
        with open(filename, "rb") as f:
            data_vals, data_samples, data_grads = pickle.load(f)
        if ii == 0:
            vals = data_vals
            samples = data_samples
            grads = data_grads
        else:
            vals = np.vstack((vals, data_vals))
            samples = np.hstack((samples, data_samples))
            grads += data_grads
        ii += 1
    if len(filenames) == 0:
        return None, None, None
    return samples, vals, grads


class SingleFidelityWrapper(object):
    r"""
    Create a single fidelity model that fixes the configuration variables
    to user-defined nominal values.
""" def __init__(self, model, config_values, base_model=None): self.model = model assert config_values.ndim == 1 self.config_values = config_values[:, np.newaxis] if base_model is None: base_model = model self.base_model = base_model def __call__(self, samples): multif_samples = np.vstack( (samples, np.tile(self.config_values, (1, samples.shape[1])))) return self.model(multif_samples) def default_map_to_multidimensional_index(num_config_vars, indices): indices = np.atleast_2d(indices) assert indices.ndim == 2 and indices.shape[0] == 1 multiindex_indices = np.empty( (num_config_vars, indices.shape[1]), dtype=indices.dtype) for jj in range(indices.shape[1]): multiindex_indices[:, jj] = indices[0, jj] return multiindex_indices class MultiLevelWrapper(object): r""" Specify a one-dimension model hierachy from a multiple dimensional hierarchy For example if model has configure variables which refine the x and y physical directions then one can specify a multilevel hierarchy by creating new indices with the mapping k=(i,i). map_to_multidimensional_index : callable Function which maps 1D model index to multi-dimensional index See function default_map_to_multidimensional_index """ def __init__(self, model, multiindex_num_config_vars, map_to_multidimensional_index=None): self.model = model self.multiindex_num_config_vars = multiindex_num_config_vars if map_to_multidimensional_index is None: self.map_to_multidimensional_index =\ partial(default_map_to_multidimensional_index, multiindex_num_config_vars) else: self.map_to_multidimensional_index = map_to_multidimensional_index self.num_evaluations = 0 self.num_config_vars = 1 def __call__(self, samples): config_values = self.map_to_multidimensional_index(samples[-1:, :]) assert config_values.shape[0] == self.multiindex_num_config_vars multi_index_samples = np.vstack((samples[:-1], config_values)) return self.model(multi_index_samples) @property def num_evaluations(self): return self.model.num_evaluations @num_evaluations.setter def num_evaluations(self, nn): self.__num_evaluations = nn self.model.num_evaluations = nn
class ModelEnsemble(object):
    r"""
    Wrapper class to allow easy one-dimensional indexing of models in an
    ensemble.
    """

    def __init__(self, functions, names=None):
        r"""
        Parameters
        ----------
        functions : list of callable
            A list of functions defining the model ensemble. The
            functions must have the call signature
            values=function(samples)
        """
        self.functions = functions
        self.nmodels = len(self.functions)
        if names is None:
            names = ['f%d' % ii for ii in range(self.nmodels)]
        self.names = names
    def evaluate_at_separated_samples(self, samples_list,
                                      active_model_ids):
        r"""
        Evaluate a set of models at different sets of samples.
        The models need not have the same parameters.

        Parameters
        ----------
        samples_list : list[np.ndarray (nvars_ii, nsamples_ii)]
            Realizations of the multivariate random variable used to
            evaluate each model.

        active_model_ids : iterable
            The ids of the models to evaluate

        Returns
        -------
        values_list : list[np.ndarray (nsamples_ii, nqoi)]
            The values of the models at the different sets of samples
        """
        values_0 = self.functions[active_model_ids[0]](samples_list[0])
        assert values_0.ndim == 2
        values_list = [values_0]
        for ii in range(1, active_model_ids.shape[0]):
            values_list.append(
                self.functions[active_model_ids[ii]](samples_list[ii]))
        return values_list
    def evaluate_models(self, samples_per_model):
        """
        Evaluate a set of models at a set of samples.

        Parameters
        ----------
        samples_per_model : list (nmodels)
            The ith entry contains the set of samples
            np.ndarray (nvars, nsamples_ii) used to evaluate the ith
            model.

        Returns
        -------
        values_per_model : list (nmodels)
            The ith entry contains the set of values
            np.ndarray (nsamples_ii, nqoi) obtained from the ith model.
        """
        nmodels = len(samples_per_model)
        if nmodels != self.nmodels:
            raise ValueError("Samples must be provided for each model")
        nvars = samples_per_model[0].shape[0]
        nsamples = np.sum([ss.shape[1] for ss in samples_per_model])
        samples = np.empty((nvars+1, nsamples))
        ubs = np.cumsum([ss.shape[1] for ss in samples_per_model])
        lbs = np.hstack((0, ubs[:-1]))
        for ii, samples_ii in enumerate(samples_per_model):
            samples[:-1, lbs[ii]:ubs[ii]] = samples_ii
            samples[-1, lbs[ii]:ubs[ii]] = ii
        values = self(samples)
        values_per_model = [
            values[lbs[ii]:ubs[ii]] for ii in range(nmodels)]
        return values_per_model
    def __call__(self, samples):
        r"""
        Evaluate a set of models at a set of samples.
        The models must have the same parameters.

        Parameters
        ----------
        samples : np.ndarray (nvars+1, nsamples)
            Realizations of a multivariate random variable, each with an
            additional scalar model id indicating which model to
            evaluate.

        Returns
        -------
        values : np.ndarray (nsamples, nqoi)
            The values of the models at samples
        """
        model_ids = samples[-1, :]
        assert model_ids.max() < self.nmodels
        active_model_ids = np.unique(model_ids).astype(int)
        active_model_id = active_model_ids[0]
        II = np.where(model_ids == active_model_id)[0]
        values_0 = self.functions[active_model_id](samples[:-1, II])
        assert values_0.ndim == 2
        nqoi = values_0.shape[1]
        values = np.empty((samples.shape[1], nqoi))
        values[II, :] = values_0
        for ii in range(1, active_model_ids.shape[0]):
            active_model_id = active_model_ids[ii]
            II = np.where(model_ids == active_model_id)[0]
            values_II = self.functions[active_model_id](samples[:-1, II])
            assert values_II.ndim == 2
            values[II] = values_II
        return values
    def __repr__(self):
        return "{0}(nmodels={1})".format(
            self.__class__.__name__, self.nmodels)
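# Example usage of ModelEnsemble (an illustrative sketch). The last row
# of samples selects which model in the ensemble is evaluated:
#
#     ensemble = ModelEnsemble(
#         [lambda s: np.sum(s**2, axis=0)[:, None],
#          lambda s: np.sum(s**3, axis=0)[:, None]])
#     samples = np.vstack((np.random.uniform(0., 1., (2, 6)),
#                          np.array([[0, 0, 0, 1, 1, 1]])))
#     values = ensemble(samples)  # rows 0-2 use model 0, rows 3-5 model 1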
class MultiIndexModel():
    """
    Define a multi-index model to be used for multi-index collocation.

    Parameters
    ----------
    setup_model : callable
        Function with the signature

        ``setup_model(config_values) -> model_instance``

        where config_values is a np.ndarray (nconfig_vars, 1) defining
        the numerical resolution of the model instance, and
        model_instance is a callable with the signature

        ``model_instance(random_samples) -> np.ndarray (nsamples, nqoi)``

        where random_samples is a np.ndarray (nvars, nsamples).

    config_values : 2d list [nconfig_vars, nconfig_values_i]
        Contains the possible model discretization values for each config
        var, e.g. [[100, 200], [0.1, 0.2, 0.3]] would be used to specify
        a 1D spatial mesh with 100 elements or 200 elements and time step
        sizes of [0.1, 0.2, 0.3]
    """

    def __init__(self, setup_model, config_values):
        self._nconfig_vars = len(config_values)
        self._config_values = config_values
        self._model_ensemble, self._multi_index_to_model_id_map = (
            self._create_model_ensemble(setup_model, config_values))

    def _create_model_ensemble(self, setup_model, config_values):
        # config_var_trans = ConfigureVariableTransformation(config_values)
        config_samples = cartesian_product(config_values).astype(np.double)
        models = [None for ii in range(config_samples.shape[1])]
        multi_index_to_model_id_map = {}
        for ii in range(config_samples.shape[1]):
            models[ii] = setup_model(config_samples[:, ii])
            multi_index_to_model_id_map[
                hash_array(config_samples[:, ii])] = ii
        return ModelEnsemble(models), multi_index_to_model_id_map
    def __call__(self, samples):
        """
        Parameters
        ----------
        samples : np.ndarray (nvars+nconfig_vars, nsamples)
            Each column is the concatenation of a random sample and a
            configuration sample.

        Returns
        -------
        values : np.ndarray (nsamples, nqoi)
            Evaluations of the model at the samples.
        """
        nsamples = samples.shape[1]
        config_samples = samples[-self._nconfig_vars:, :]
        model_ids = np.empty((1, nsamples))
        for ii in range(nsamples):
            key = hash_array(config_samples[:, ii])
            if key not in self._multi_index_to_model_id_map:
                msg = f"Model ID for : {config_samples[:, ii]} not found."
                raise RuntimeError(msg)
            model_ids[0, ii] = self._multi_index_to_model_id_map[key]
        return self._model_ensemble(
            np.vstack((samples[:-self._nconfig_vars, :], model_ids)))
    def __repr__(self):
        return "{0}(nconfig_values_per_config_var={1})".format(
            self.__class__.__name__,
            [len(v) for v in self._config_values])
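# Example usage of MultiIndexModel (an illustrative sketch;
# ``setup_model`` is hypothetical). A model instance is built for every
# combination of the config values and selected at call time via the
# trailing config rows of each sample:
#
#     def setup_model(config_vals):
#         nelems, dt = config_vals
#         return lambda s: np.sum(s**2, axis=0)[:, None]/(nelems*dt)
#
#     model = MultiIndexModel(setup_model, [[100, 200], [0.1, 0.2]])
#     sample = np.array([[0.5], [0.3], [100], [0.1]])
#     values = model(sample)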
class ArchivedDataModel():
    def __init__(self, samples, values):
        # TODO add gradients and hessian-vector products as optional args
        if values.ndim != 2 or values.shape[0] != samples.shape[1]:
            msg = "values must have shape (nsamples, nqoi) but has shape"
            msg += f" {values.shape}"
            raise ValueError(msg)
        self.samples = samples
        self.values = values
        self._samples_dict = self._set_samples_dict(samples)
        # when randomness is None, rvs iterates sequentially through
        # samples until none are left. Useful for methods that require
        # unique samples
        self._sample_cnt = 0

    def _set_samples_dict(self, samples):
        samples_dict = dict()
        for ii in range(samples.shape[1]):
            key = self._hash_sample(samples[:, ii])
            if key in samples_dict:
                raise ValueError("Duplicate samples detected")
            samples_dict[key] = ii
        return samples_dict

    def num_vars(self):
        return self.samples.shape[0]

    def _hash_sample(self, sample):
        key = hash_array(sample)
        return key

    def __call__(self, samples):
        values = []
        for ii in range(samples.shape[1]):
            key = self._hash_sample(samples[:, ii])
            if key not in self._samples_dict:
                raise ValueError("Sample not found")
            sample_id = self._samples_dict[key]
            values.append(self.values[sample_id])
        return np.array(values)

    def rvs(self, nsamples, weights=None, randomness="wo_replacement",
            return_indices=False):
        """
        Randomly sample from all available samples: without replacement
        by default (randomness="wo_replacement"), with replacement if
        randomness="replacement", or sequentially if randomness is None.
        If weights is None uniform weights are applied to each sample,
        otherwise sample according to weights.
        """
        if randomness is None:
            if self._sample_cnt+nsamples > self.samples.shape[1]:
                msg = "Too many samples requested when randomness is None. "
                msg += "self._sample_cnt+nsamples="
                msg += f"{self._sample_cnt+nsamples}"
                msg += f" but only {self.samples.shape[1]} samples "
                msg += "available. This can be overridden by resetting "
                msg += "self._sample_cnt=0"
                raise ValueError(msg)
            indices = np.arange(
                self._sample_cnt, self._sample_cnt+nsamples, dtype=int)
            self._sample_cnt += nsamples
        else:
            indices = np.random.choice(
                np.arange(self.samples.shape[1], dtype=int), nsamples,
                p=weights, replace=(randomness == "replacement"))
        if not return_indices:
            return self.samples[:, indices]
        return self.samples[:, indices], indices
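# Example usage of ArchivedDataModel (an illustrative sketch). The model
# can only be queried at archived samples; rvs draws from the archive:
#
#     samples = np.random.uniform(0., 1., (2, 20))
#     values = np.sum(samples**2, axis=0)[:, None]
#     model = ArchivedDataModel(samples, values)
#     subset = model.rvs(5)
#     assert np.allclose(
#         model(subset), np.sum(subset**2, axis=0)[:, None])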