Source code for pyapprox.variables.joint

import numpy as np
from scipy import stats
from abc import ABC, abstractmethod

from pyapprox.variables.marginals import (
    get_unique_variables, get_distribution_info,
    is_bounded_continuous_variable, get_truncated_range
)
from pyapprox.variables.nataf import (
    nataf_joint_density, generate_x_samples_using_gaussian_copula,
    transform_correlations, scipy_gauss_hermite_pts_wts_1D
)


[docs]class JointVariable(ABC): r""" Base class for multivariate variables. """
[docs] @abstractmethod def rvs(self, num_samples): """ Generate samples from a random variable. Parameters ---------- num_samples : integer The number of samples to generate Returns ------- samples : np.ndarray (num_vars, num_samples) Independent samples from the target distribution """ raise NotImplementedError()
def __str__(self): return "JointVariable"
[docs]class IndependentMarginalsVariable(JointVariable): """ Class representing independent random variables Examples -------- >>> from pyapprox.variables.joint import IndependentMarginalsVariable >>> from scipy.stats import norm, beta >>> marginals = [norm(0,1),beta(0,1),norm()] >>> variable = IndependentMarginalsVariable(marginals) >>> print(variable) I.I.D. Variable Number of variables: 3 Unique variables and global id: norm(loc=0,scale=1): z0, z2 beta(a=0,b=1,loc=0,scale=1): z1 """ def __init__(self, unique_variables, unique_variable_indices=None, variable_labels=None): """ Constructor method """ if unique_variable_indices is None: self.unique_variables, self.unique_variable_indices =\ get_unique_variables(unique_variables) else: self.unique_variables = unique_variables.copy() self.unique_variable_indices = unique_variable_indices.copy() self.nunique_vars = len(self.unique_variables) assert self.nunique_vars == len(self.unique_variable_indices) self.nvars = 0 for ii in range(self.nunique_vars): self.unique_variable_indices[ii] = np.asarray( self.unique_variable_indices[ii]) self.nvars += self.unique_variable_indices[ii].shape[0] if unique_variable_indices is None: assert self.nvars == len(unique_variables) self.variable_labels = variable_labels
[docs] def num_vars(self): """ Return The number of independent 1D variables Returns ------- nvars : integer The number of independent 1D variables """ return self.nvars
[docs] def marginals(self): """ Return a list of all the 1D scipy.stats random variables. Returns ------- variables : list List of :class:`scipy.stats.dist` variables """ all_variables = [None for ii in range(self.nvars)] for ii in range(self.nunique_vars): for jj in self.unique_variable_indices[ii]: all_variables[jj] = self.unique_variables[ii] return all_variables
[docs] def get_statistics(self, function_name, *args, **kwargs): """ Get a statistic from each univariate random variable. Parameters ---------- function_name : string The function name of the scipy random variable statistic of interest kwargs : kwargs The arguments to the scipy statistic function Returns ------- stat : np.ndarray The output of the stat function Examples -------- >>> import pyapprox as pya >>> from scipy.stats import uniform >>> num_vars = 2 >>> variable = pya.IndependentMarginalsVariable([uniform(-2, 3)], [np.arange(num_vars)]) >>> variable.get_statistics("interval", confidence=1) array([[-2., 1.], [-2., 1.]]) >>> variable.get_statistics("pdf",x=np.linspace(-2, 1, 3)) array([[0.33333333, 0.33333333, 0.33333333], [0.33333333, 0.33333333, 0.33333333]]) """ for ii in range(self.nunique_vars): var = self.unique_variables[ii] indices = self.unique_variable_indices[ii] stats_ii = np.atleast_1d(getattr(var, function_name)( *args, **kwargs)) assert stats_ii.ndim == 1 if ii == 0: stats = np.empty((self.num_vars(), stats_ii.shape[0])) stats[indices] = stats_ii return stats
[docs] def evaluate(self, function_name, x): """ Evaluate a frunction for each univariate random variable. Parameters ---------- function_name : string The function name of the scipy random variable statistic of interest x : np.ndarray (nsamples) The input to the scipy statistic function Returns ------- stat : np.ndarray (nsamples, nqoi) The outputs of the stat function for each variable """ stats = None for ii in range(self.nunique_vars): var = self.unique_variables[ii] indices = self.unique_variable_indices[ii] for jj in indices: stats_jj = np.atleast_1d(getattr(var, function_name)(x[jj, :])) assert stats_jj.ndim == 1 if stats is None: stats = np.empty((self.num_vars(), stats_jj.shape[0])) stats[jj] = stats_jj return stats
[docs] def pdf(self, x, log=False): """ Evaluate the joint probability distribution function. Parameters ---------- x : np.ndarray (nvars, nsamples) Values in the domain of the random variable X log : boolean True - return the natural logarithm of the PDF values False - return the PDF values Returns ------- values : np.ndarray (nsamples, 1) The values of the PDF at x """ assert x.shape[0] == self.num_vars() if log is False: marginal_vals = self.evaluate("pdf", x) return np.prod(marginal_vals, axis=0)[:, None] marginal_vals = self.evaluate("logpdf", x) return np.sum(marginal_vals, axis=0)[:, None]
def _evaluate(self, function_name, x): """ Evaluate a frunction for each univariate random variable using rv.dist This is faster than evaluate because it avoids error checks. Use with caution. dist is only for canonical distribution so user must transform variables before using this function Parameters ---------- function_name : string The function name of the scipy random variable statistic of interest x : np.ndarray (nsamples) The input to the scipy statistic function Returns ------- stat : np.ndarray (nsamples, nqoi) The outputs of the stat function for each variable """ stats = None for ii in range(self.nunique_vars): var = self.unique_variables[ii] indices = self.unique_variable_indices[ii] for jj in indices: stats_jj = np.atleast_1d( getattr(var.dist, function_name)(x[jj, :])) assert stats_jj.ndim == 1 if stats is None: stats = np.empty((self.num_vars(), stats_jj.shape[0])) stats[jj] = stats_jj return stats def _pdf(self, x, log=False): if not log: marginal_vals = self._evaluate("_pdf", x) return np.prod(marginal_vals, axis=0)[:, None] marginal_vals = self._evaluate("_logpdf", x) return np.sum(marginal_vals, axis=0)[:, None] def __str__(self): variable_labels = self.variable_labels if variable_labels is None: variable_labels = ["z%d" % ii for ii in range(self.num_vars())] string = "Independent Marginal Variable\n" string += f"Number of variables: {self.num_vars()}\n" string += "Unique variables and global id:\n" for ii in range(self.nunique_vars): var = self.unique_variables[ii] indices = self.unique_variable_indices[ii] name, scales, shapes = get_distribution_info(var) shape_string = ",".join( [f"{name}={val}" for name, val in shapes.items()]) scales_string = ",".join( [f"{name}={val}" for name, val in scales.items()]) string += " "+var.dist.name + "(" if len(shapes) > 0: string += ",".join([shape_string, scales_string]) else: string += scales_string string += "): " string += ", ".join( [variable_labels[idx] for idx in indices]) if ii < self.nunique_vars-1: string += "\n" return string def __repr__(self): return self.__str__()
[docs] def is_bounded_continuous_variable(self): """ Are all 1D variables are continuous and bounded. Returns ------- is_bounded : boolean True - all 1D variables are continuous and bounded False - otherwise """ for rv in self.unique_variables: if not is_bounded_continuous_variable(rv): return False return True
[docs] def rvs(self, num_samples, random_states=None): """ Generate samples from a tensor-product probability measure. Parameters ---------- num_samples : integer The number of samples to generate Returns ------- samples : np.ndarray (num_vars, num_samples) Independent samples from the target distribution """ num_samples = int(num_samples) samples = np.empty((self.num_vars(), num_samples), dtype=float) if random_states is not None: assert len(random_states) == self.num_vars() else: random_states = [None]*self.num_vars() for ii in range(self.nunique_vars): var = self.unique_variables[ii] indices = self.unique_variable_indices[ii] samples[indices, :] = var.rvs( size=(indices.shape[0], num_samples), random_state=random_states[ii]) return samples
[docs]class GaussCopulaVariable(JointVariable): """ Multivariate random variable with Gaussian correlation and arbitrary marginals """ def __init__(self, marginals, x_correlation, bisection_opts={}): self.nvars = len(marginals) self._marginals = marginals self.x_correlation = x_correlation self.x_marginal_means = np.array([m.mean() for m in self._marginals]) self.x_marginal_stdevs = np.array([m.std() for m in self._marginals]) self.x_marginal_pdfs = [m.pdf for m in self._marginals] self.x_marginal_cdfs = [m.cdf for m in self._marginals] self.x_marginal_inv_cdfs = [m.ppf for m in self._marginals] quad_rule = scipy_gauss_hermite_pts_wts_1D(11) self.z_correlation = transform_correlations( self.x_correlation, self.x_marginal_inv_cdfs, self.x_marginal_means, self.x_marginal_stdevs, quad_rule, bisection_opts) self.z_variable = stats.multivariate_normal( mean=np.zeros((self.nvars)), cov=self.z_correlation)
[docs] def z_joint_density(self, z_samples): return self.z_variable.pdf(z_samples.T)
[docs] def pdf(self, x_samples, log=False): vals = nataf_joint_density( x_samples, self.x_marginal_cdfs, self.x_marginal_pdfs, self.z_joint_density) if not log: return vals return np.log(vals)
[docs] def num_vars(self): return self.nvars
[docs] def rvs(self, nsamples, return_all=False): out = generate_x_samples_using_gaussian_copula( self.nvars, self.z_correlation, self.x_marginal_inv_cdfs, nsamples) if not return_all: return out[0] return out
[docs] def marginals(self): return self._marginals
def define_iid_random_variable(rv, num_vars): """ Create independent identically distributed variables Parameters ---------- rv : :class:`scipy.stats.dist` A 1D random variable object num_vars : integer The number of 1D variables Returns ------- variable : :class:`pyapprox.variables.IndependentMarginalsVariable` The multivariate random variable """ unique_variables = [rv] unique_var_indices = [np.arange(num_vars)] return IndependentMarginalsVariable( unique_variables, unique_var_indices) def get_truncated_ranges(variable, unbounded_alpha=0.99, bounded_alpha=1.0): r""" Get truncated ranges for independent random variables or Copulas Parameters ---------- variable : :class:`pyapprox.variables.IndependentMarginalsVariable` Variable unbounded_alpha : float fraction in (0, 1) of probability captured by ranges for unbounded random variables bounded_alpha : float fraction in (0, 1) of probability captured by ranges for bounded random variables. bounded_alpha < 1 is useful when variable is bounded but is used in a copula Returns ------- ranges : np.ndarray (2*nvars) The finite (possibly truncated) ranges of the random variables [lb0, ub0, lb1, ub1, ...] """ ranges = [] if (type(variable) == GaussCopulaVariable) and (bounded_alpha == 1): bounded_alpha = unbounded_alpha for rv in variable.marginals(): ranges += get_truncated_range(rv, unbounded_alpha, bounded_alpha) return np.array(ranges)
[docs]def combine_uncertain_and_bounded_design_variables( random_variable, design_variable, random_variable_indices=None): """ Convert design variables to random variables defined over them optimization bounds. Parameters ---------- random_variable_indices : np.ndarray The variable numbers of the random variables in the new combined variable. """ if random_variable_indices is None: random_variable_indices = np.arange(random_variable.num_vars()) if len(random_variable_indices) != random_variable.num_vars(): raise ValueError nvars = random_variable.num_vars() + design_variable.num_vars() design_variable_indices = np.setdiff1d( np.arange(nvars), random_variable_indices) variable_list = [None for ii in range(nvars)] all_random_variables = random_variable.marginals() for ii in range(random_variable.num_vars()): variable_list[random_variable_indices[ii]] = all_random_variables[ii] for ii in range(design_variable.num_vars()): lb = design_variable.bounds.lb[ii] ub = design_variable.bounds.ub[ii] if not np.isfinite(lb) or not np.isfinite(ub): raise ValueError(f"Design variable {ii} is not bounded") rv = stats.uniform(lb, ub-lb) variable_list[design_variable_indices[ii]] = rv return IndependentMarginalsVariable(variable_list)
class DesignVariable(object): """ Design variables with no probability information """ def __init__(self, bounds): """ Constructor method Parameters ---------- bounds : array_like Lower and upper bounds for each variable [lb0,ub0, lb1, ub1, ...] """ self.bounds = bounds def num_vars(self): """ Return The number of independent 1D variables Returns ------- nvars : integer The number of independent 1D variables """ return len(self.bounds.lb)