"""Source code for pyapprox.variables.transforms."""
import numpy as np
from pyapprox.variables.marginals import (
is_bounded_continuous_variable, transform_scale_parameters
)
from pyapprox.variables.rosenblatt import (
rosenblatt_transformation, inverse_rosenblatt_transformation
)
from pyapprox.variables.nataf import (
covariance_to_correlation, trans_x_to_u, trans_u_to_x,
transform_correlations, scipy_gauss_hermite_pts_wts_1D
)
from pyapprox.variables.joint import (
define_iid_random_variable, IndependentMarginalsVariable
)
def _map_hypercube_samples(current_samples, current_ranges, new_ranges):
# no error checking or notion of active_vars
clbs, cubs = current_ranges[0::2], current_ranges[1::2]
nlbs, nubs = new_ranges[0::2], new_ranges[1::2]
return ((current_samples.T-clbs)/(cubs-clbs)*(nubs-nlbs)+nlbs).T
def map_hypercube_samples(current_samples, current_ranges, new_ranges,
                          active_vars=None, tol=2*np.finfo(float).eps):
    """
    Transform samples from one hypercube to another hypercube with different
    bounds.

    Parameters
    ----------
    current_samples : np.ndarray (num_vars, num_samples)
        The samples to be transformed

    current_ranges : np.ndarray (2*num_vars)
        The lower and upper bound of each variable of the current samples
        [lb_1,ub_1,...,lb_d,ub_d]

    new_ranges : np.ndarray (2*num_vars)
        The desired lower and upper bound of each variable
        [lb_1,ub_1,...,lb_d,ub_d]

    active_vars : np.ndarray (num_active_vars)
        The active vars to which the variable transformation should be applied
        The inactive vars have the identity applied, i.e. they remain
        unchanged.

    tol : float
        Some functions such as optimizers will create points very close
        but outside current bounds. In this case allow small error
        and move these points to boundary

    Returns
    -------
    new_samples : np.ndarray (num_vars, num_samples)
        The transformed samples

    Raises
    ------
    ValueError
        If any sample is more than ``tol`` outside the current ranges.
    """
    new_ranges = np.asarray(new_ranges)
    current_ranges = np.asarray(current_ranges)
    if np.allclose(new_ranges, current_ranges):
        # nothing to do; return the input unchanged (same object, as before)
        return current_samples
    num_vars = current_samples.shape[0]
    assert num_vars == new_ranges.shape[0]//2
    assert num_vars == current_ranges.shape[0]//2
    clbs, cubs = current_ranges[0::2], current_ranges[1::2]
    nlbs, nubs = new_ranges[0::2], new_ranges[1::2]
    if np.any((current_samples.min(axis=1)) <= clbs-tol):
        raise ValueError("samples outside lower bounds")
    if np.any((current_samples.max(axis=1)) >= cubs+tol):
        raise ValueError("samples outside upper bounds")
    # new_samples keeps the original (unclipped) values for inactive dims
    new_samples = current_samples.copy()
    # BUGFIX: clip on a copy. Previously the caller's array was modified
    # in place when samples were within tol of (but outside) the bounds.
    clipped = np.clip(current_samples.T, clbs, cubs).T
    if active_vars is None:
        return _map_hypercube_samples(clipped, current_ranges, new_ranges)
    new_samples[active_vars] = (
        (clipped[active_vars].T-clbs[active_vars])/(
            cubs[active_vars]-clbs[active_vars])*(
                nubs[active_vars]-nlbs[active_vars])+nlbs[active_vars]).T
    return new_samples
class IdentityTransformation(object):
    """Trivial transformation whose canonical and user spaces coincide."""

    def __init__(self, num_vars):
        # number of variables; exposed through the num_vars() accessor
        self.nvars = num_vars

    def map_from_canonical(self, samples):
        """Return the samples unchanged (identity map)."""
        return samples

    def map_to_canonical(self, samples):
        """Return the samples unchanged (identity map)."""
        return samples

    def num_vars(self):
        """Return the number of variables."""
        return self.nvars

    def map_derivatives_from_canonical_space(self, derivatives):
        """Return the derivatives unchanged (the Jacobian is the identity)."""
        return derivatives
class AffineBoundedVariableTransformation(object):
    """Affine map between two hypercubes given as interleaved bound arrays
    [lb_1, ub_1, ..., lb_d, ub_d]."""

    def __init__(self, canonical_ranges, user_ranges):
        # both range arrays must describe the same number of variables
        assert len(user_ranges) == len(canonical_ranges)
        self.canonical_ranges = np.asarray(canonical_ranges)
        self.user_ranges = np.asarray(user_ranges)
        self.nvars = len(self.user_ranges)//2

    def map_from_canonical(self, canonical_samples):
        """Map samples from the canonical hypercube to the user hypercube."""
        return map_hypercube_samples(
            canonical_samples, self.canonical_ranges, self.user_ranges)

    def map_to_canonical(self, user_samples):
        """Map samples from the user hypercube to the canonical hypercube."""
        return map_hypercube_samples(
            user_samples, self.user_ranges, self.canonical_ranges)

    def num_vars(self):
        """Return the number of variables."""
        return self.nvars
class AffineTransform(object):
    r"""
    Apply an affine transformation to a
    :py:class:`pyapprox.variables.IndependentMarginalsVariable`
    """

    def __init__(self, variable, enforce_bounds=False):
        """
        Parameters
        ----------
        variable : IndependentMarginalsVariable or list of marginals
            Variable uniqueness depends on both the type of random variable
            e.g. beta, gaussian, etc. and the parameters of that distribution
            e.g. loc and scale parameters as well as any additional parameters

        enforce_bounds : bool
            If True raise an error when samples of a bounded variable passed
            to :func:`map_to_canonical` lie outside the variable's support
        """
        # accept a raw list of marginals for convenience
        if not isinstance(variable, IndependentMarginalsVariable):
            variable = IndependentMarginalsVariable(variable)
        self.variable = variable
        self.enforce_bounds = enforce_bounds
        # dimensions mapped with the identity instead of the affine map
        self.identity_map_indices = None

        # scale_parameters[ii] = (loc, scale) of the iith unique marginal
        self.scale_parameters = np.empty((self.variable.nunique_vars, 2))
        for ii in range(self.variable.nunique_vars):
            var = self.variable.unique_variables[ii]
            self.scale_parameters[ii, :] = np.hstack(
                transform_scale_parameters(var))

    def num_vars(self):
        """Return the number of variables.

        NOTE(review): this accessor was missing from the scraped source but
        is required by map_derivatives_* and get_ranges below; it delegates
        to the underlying variable — confirm against upstream pyapprox.
        """
        return self.variable.num_vars()

    def set_identity_maps(self, identity_map_indices):
        """
        Set the dimensions we do not want to map to and from
        canonical space

        Parameters
        ----------
        identity_map_indices : iterable
            The dimensions we do not want to map to and from
            canonical space
        """
        self.identity_map_indices = identity_map_indices

    def _affine_indices(self, indices):
        # Return the subset of indices that should be affinely mapped,
        # i.e. those not flagged for the identity map.
        if self.identity_map_indices is None:
            return indices
        return np.setdiff1d(
            indices, self.identity_map_indices, assume_unique=True)

    def map_to_canonical(self, user_samples):
        """Map samples from user space to canonical space.

        Raises
        ------
        ValueError
            If enforce_bounds is True and a sample of a bounded continuous
            variable lies outside its support.
        """
        canonical_samples = user_samples.copy().astype(float)
        for ii in range(self.variable.nunique_vars):
            indices = self.variable.unique_variable_indices[ii]
            loc, scale = self.scale_parameters[ii, :]
            # user-space support of a bounded marginal mapped onto [-1, 1]
            bounds = [loc-scale, loc+scale]
            var = self.variable.unique_variables[ii]
            active_indices = self._affine_indices(indices)
            if ((self.enforce_bounds is True) and
                    (is_bounded_continuous_variable(var) is True) and
                    ((np.any(user_samples[active_indices, :] < bounds[0])) or
                     (np.any(user_samples[active_indices, :] > bounds[1])))):
                II = np.where((user_samples[active_indices, :] < bounds[0]) |
                              (user_samples[active_indices, :] > bounds[1]))[1]
                print(user_samples[np.ix_(active_indices, II)], bounds)
                raise ValueError(f'Sample outside the bounds {bounds}')
            canonical_samples[active_indices, :] = (
                user_samples[active_indices, :]-loc)/scale
        return canonical_samples

    def map_from_canonical(self, canonical_samples):
        """Map samples from canonical space to user space."""
        user_samples = canonical_samples.copy().astype(float)
        for ii in range(self.variable.nunique_vars):
            indices = self.variable.unique_variable_indices[ii]
            loc, scale = self.scale_parameters[ii, :]
            active_indices = self._affine_indices(indices)
            user_samples[active_indices, :] = (
                canonical_samples[active_indices, :]*scale+loc)
        return user_samples

    def _is_identity_dim(self, ii):
        # True when dimension ii must be left unmapped.
        # BUGFIX: previously the 1D maps returned the identity for ALL
        # dimensions whenever identity_map_indices was set, which is
        # inconsistent with the setdiff logic used by map_to_canonical.
        return (self.identity_map_indices is not None and
                ii in self.identity_map_indices)

    def map_to_canonical_1d(self, samples, ii):
        """Map samples of the single dimension ii to canonical space."""
        for jj in range(self.variable.nunique_vars):
            if ii not in self.variable.unique_variable_indices[jj]:
                continue
            if self._is_identity_dim(ii):
                return samples
            loc, scale = self.scale_parameters[jj, :]
            return (samples-loc)/scale
        raise ValueError(f"Dimension {ii} not found")

    def map_from_canonical_1d(self, canonical_samples, ii):
        """Map samples of the single dimension ii from canonical space."""
        for jj in range(self.variable.nunique_vars):
            if ii not in self.variable.unique_variable_indices[jj]:
                continue
            if self._is_identity_dim(ii):
                return canonical_samples
            loc, scale = self.scale_parameters[jj, :]
            return canonical_samples*scale+loc
        raise ValueError(f"Dimension {ii} not found")

    def _derivative_row_indices(self, var_indices, num_samples):
        # Rows of the stacked derivative array in var-major ordering:
        # row = var_index*num_samples + sample_index.
        # BUGFIX: the original used np.tile for BOTH terms, which produces
        # duplicate and missing rows whenever len(var_indices) and
        # num_samples share a common factor; np.repeat is required for
        # the variable-index term.
        return (np.repeat(var_indices*num_samples, num_samples) +
                np.tile(np.arange(num_samples), var_indices.shape[0]))

    def map_derivatives_from_canonical_space(self, derivatives):
        """
        Apply the chain rule to canonical-space derivatives.

        Parameters
        ----------
        derivatives : np.ndarray (nvars*nsamples, nqoi)
            Derivatives of each qoi. The ith column consists of the derivatives

            [d/dx_1 f(x^{(1)}), ..., f(x^{(M)}),
             d/dx_2 f(x^{(1)}), ..., f(x^{(M)})
             ...,
             d/dx_D f(x^{(1)}), ..., f(x^{(M)})]

            where M is the number of samples and D=nvars

            Derivatives can also be (nvars, nsamples) - transpose of Jacobian -
            Here each sample is considered a different QoI

        Returns
        -------
        mapped_derivatives : np.ndarray (nvars*nsamples, nqoi)
            The derivatives with respect to the user variables (each row is
            divided by the scale of the corresponding variable's affine map)
        """
        assert derivatives.shape[0] % self.num_vars() == 0
        num_samples = derivatives.shape[0]//self.num_vars()
        mapped_derivatives = derivatives.copy()
        for ii in range(self.variable.nunique_vars):
            var_indices = self.variable.unique_variable_indices[ii]
            idx = self._derivative_row_indices(var_indices, num_samples)
            _, scale = self.scale_parameters[ii, :]
            mapped_derivatives[idx, :] /= scale
        return mapped_derivatives

    def map_derivatives_to_canonical_space(self, canonical_derivatives):
        """
        Inverse of :func:`map_derivatives_from_canonical_space`.

        Parameters
        ----------
        canonical_derivatives : np.ndarray (nvars*nsamples, nqoi)
            Same layout as in map_derivatives_from_canonical_space.

        Returns
        -------
        derivatives : np.ndarray (nvars*nsamples, nqoi)
            The canonical-space derivatives (each row multiplied by the
            scale of the corresponding variable's affine map)
        """
        assert canonical_derivatives.shape[0] % self.num_vars() == 0
        num_samples = canonical_derivatives.shape[0]//self.num_vars()
        derivatives = canonical_derivatives.copy()
        for ii in range(self.variable.nunique_vars):
            var_indices = self.variable.unique_variable_indices[ii]
            idx = self._derivative_row_indices(var_indices, num_samples)
            _, scale = self.scale_parameters[ii, :]
            derivatives[idx, :] *= scale
        return derivatives

    def samples_of_bounded_variables_inside_domain(self, samples):
        """Return False (and print the violation) if any sample lies outside
        the support of its marginal; unbounded marginals trivially pass."""
        for ii in range(self.variable.nunique_vars):
            var = self.variable.unique_variables[ii]
            lb, ub = var.interval(1)
            indices = self.variable.unique_variable_indices[ii]
            if samples[indices, :].max() > ub:
                print(samples[indices, :].max(), ub, 'ub violated')
                return False
            if samples[indices, :].min() < lb:
                print(samples[indices, :].min(), lb, 'lb violated')
                return False
        return True

    def get_ranges(self):
        """Return the supports of all variables interleaved as
        [lb_1, ub_1, ..., lb_d, ub_d]."""
        ranges = np.empty((2*self.num_vars()), dtype=float)
        for ii in range(self.variable.nunique_vars):
            var = self.variable.unique_variables[ii]
            lb, ub = var.interval(1)
            indices = self.variable.unique_variable_indices[ii]
            ranges[2*indices], ranges[2*indices+1] = lb, ub
        return ranges
def define_iid_random_variable_transformation(variable_1d, num_vars):
    """Build an AffineTransform for num_vars iid copies of variable_1d."""
    return AffineTransform(
        define_iid_random_variable(variable_1d, num_vars))
class RosenblattTransform(object):
    r"""
    Apply the Rosenblatt transformation to an arbitrary multivariate
    random variable.
    """

    def __init__(self, joint_density, num_vars, opts):
        """
        Parameters
        ----------
        joint_density : callable
            The joint density of the user variable

        num_vars : integer
            The number of variables

        opts : dict
            Options with required keys 'limits' and 'num_quad_samples_1d'
            and optional keys 'tol' (default 1e-12) and 'num_bins'
            (default 101)
        """
        self.joint_density = joint_density
        self.limits = opts['limits']
        self.num_quad_samples_1d = opts['num_quad_samples_1d']
        self.tol = opts.get('tol', 1e-12)
        self.num_bins = opts.get('num_bins', 101)
        self.nvars = num_vars
        # canonical variables produced by the Rosenblatt map are uniform
        self.canonical_variable_types = ['uniform']*self.num_vars()

    def num_vars(self):
        """Return the number of variables.

        NOTE(review): this accessor was missing from the scraped source but
        is called in __init__ above; restored — confirm against upstream.
        """
        return self.nvars

    def map_from_canonical(self, canonical_samples):
        """Map canonical (uniform) samples to user space via the inverse
        Rosenblatt transformation."""
        user_samples = inverse_rosenblatt_transformation(
            canonical_samples, self.joint_density, self.limits,
            self.num_quad_samples_1d, self.tol, self.num_bins)
        return user_samples

    def map_to_canonical(self, user_samples):
        """Map user samples to canonical (uniform) space via the Rosenblatt
        transformation."""
        canonical_samples = rosenblatt_transformation(
            user_samples, self.joint_density, self.limits,
            self.num_quad_samples_1d)
        return canonical_samples
class UniformMarginalTransformation(object):
    r"""
    Transform variables to have uniform marginals on [0,1]
    """

    def __init__(self, x_marginal_cdfs, x_marginal_inv_cdfs,
                 enforce_open_bounds=True):
        """
        enforce_open_bounds: boolean
            If True - enforce that canonical samples are in (0,1)
            If False - enforce that canonical samples are in [0,1]
        """
        self.nvars = len(x_marginal_cdfs)
        self.x_marginal_cdfs = x_marginal_cdfs
        self.x_marginal_inv_cdfs = x_marginal_inv_cdfs
        self.enforce_open_bounds = enforce_open_bounds

    def map_from_canonical(self, canonical_samples):
        """Apply each marginal inverse CDF row-wise."""
        # The inverse CDFs of (semi) unbounded distributions are singular at
        # the boundary of the unit hypercube, hence the optional open-bounds
        # check.
        if self.enforce_open_bounds:
            assert canonical_samples.min() > 0 and canonical_samples.max() < 1
        user_samples = np.empty_like(canonical_samples)
        for dim in range(self.nvars):
            inv_cdf = self.x_marginal_inv_cdfs[dim]
            user_samples[dim, :] = inv_cdf(canonical_samples[dim, :])
        return user_samples

    def map_to_canonical(self, user_samples):
        """Apply each marginal CDF row-wise."""
        canonical_samples = np.empty_like(user_samples)
        for dim in range(self.nvars):
            cdf = self.x_marginal_cdfs[dim]
            canonical_samples[dim, :] = cdf(user_samples[dim, :])
        return canonical_samples

    def num_vars(self):
        """Return the number of variables."""
        return self.nvars
class NatafTransform(object):
    r"""
    Apply the Nataf transformation to an arbitrary multivariate
    random variable.
    """

    def __init__(self, x_marginal_cdfs, x_marginal_inv_cdfs,
                 x_marginal_pdfs, x_covariance, x_marginal_means,
                 bisection_opts=None):
        """
        Parameters
        ----------
        x_marginal_cdfs : list of callables
            The CDF of each marginal

        x_marginal_inv_cdfs : list of callables
            The inverse CDF of each marginal

        x_marginal_pdfs : list of callables
            The PDF of each marginal

        x_covariance : np.ndarray (nvars, nvars)
            The covariance of the user variable

        x_marginal_means : np.ndarray (nvars)
            The mean of each marginal

        bisection_opts : dict, optional
            Options passed to the correlation bisection solver.
            Defaults to an empty dict (was a mutable default argument).
        """
        if bisection_opts is None:
            bisection_opts = dict()
        self.nvars = len(x_marginal_cdfs)
        self.x_marginal_cdfs = x_marginal_cdfs
        self.x_marginal_inv_cdfs = x_marginal_inv_cdfs
        self.x_marginal_pdfs = x_marginal_pdfs
        self.x_marginal_means = x_marginal_means
        self.x_correlation = covariance_to_correlation(x_covariance)
        self.x_marginal_stdevs = np.sqrt(np.diag(x_covariance))

        # fixed-order Gauss-Hermite rule used to transform correlations
        quad_rule = scipy_gauss_hermite_pts_wts_1D(11)
        self.z_correlation = transform_correlations(
            self.x_correlation, self.x_marginal_inv_cdfs,
            self.x_marginal_means, self.x_marginal_stdevs, quad_rule,
            bisection_opts)
        self.z_correlation_cholesky_factor = np.linalg.cholesky(
            self.z_correlation)

    def num_vars(self):
        """Return the number of variables (consistent with the other
        transformation classes in this module)."""
        return self.nvars

    def map_from_canonical(self, canonical_samples):
        """Map canonical (standard normal) samples to user space."""
        return trans_u_to_x(
            canonical_samples, self.x_marginal_inv_cdfs,
            self.z_correlation_cholesky_factor)

    def map_to_canonical(self, user_samples):
        """Map user samples to canonical (standard normal) space."""
        return trans_x_to_u(
            user_samples, self.x_marginal_cdfs,
            self.z_correlation_cholesky_factor)
class ComposeTransforms(object):
    r"""
    Apply a composition of transformation to an multivariate
    random variable.
    """

    def __init__(self, transformations):
        """
        Parameters
        ----------
        transformations : list of transformation objects
            The transformations are applied first to last for
            map_to_canonical and in reverse order for
            map_from_canonical
        """
        self.transformations = transformations

    def map_from_canonical(self, canonical_samples):
        """Apply the inverse maps in reverse order (last transform first)."""
        samples = canonical_samples
        for transform in reversed(self.transformations):
            samples = transform.map_from_canonical(samples)
        return samples

    def map_to_canonical(self, user_samples):
        """Apply the forward maps in order (first transform first)."""
        samples = user_samples
        for transform in self.transformations:
            samples = transform.map_to_canonical(samples)
        return samples

    def num_vars(self):
        """Return the number of variables of the first transformation."""
        return self.transformations[0].num_vars()
class ConfigureVariableTransformation(object):
    """
    Class which maps one-to-one configure indices in [0, 1, 2, 3,...]
    to a set of configure values accepted by a function
    """

    def __init__(self, config_values, labels=None):
        """
        Parameters
        ----------
        config_values : list
            The list of configure values for each configure variable.
            Each entry in the list is a 1D np.ndarray or list with
            potentially different sizes

        labels : list, optional
            Labels of the configure variables
        """
        self.nvars = len(config_values)
        # each entry must be an ordered sequence of admissible values
        assert isinstance(config_values[0], (list, np.ndarray))
        self.config_values = config_values
        self.variable_labels = labels

    def map_from_canonical(self, canonical_samples):
        """
        Map a configure multi-dimensional index to the corresponding
        configure values
        """
        assert canonical_samples.shape[0] == self.nvars
        samples = np.empty_like(canonical_samples, dtype=float)
        for ii in range(samples.shape[1]):
            for jj in range(self.nvars):
                # canonical entries are (float) indices into config_values
                kk = canonical_samples[jj, ii]
                samples[jj, ii] = self.config_values[jj][int(kk)]
        return samples

    def map_to_canonical(self, samples):
        """
        Map configure values to their multi-dimensional indices.

        This is the naive slow implementation that searches through all
        canonical samples to find one that matches each sample provided

        Raises
        ------
        ValueError
            If a sample value is not among the admissible configure values.
        """
        assert samples.shape[0] == self.nvars
        canonical_samples = np.empty_like(samples, dtype=float)
        for ii in range(samples.shape[1]):
            for jj in range(self.nvars):
                for kk, value in enumerate(self.config_values[jj]):
                    if samples[jj, ii] == value:
                        canonical_samples[jj, ii] = kk
                        break
                else:
                    raise ValueError("Configure value not found")
        return canonical_samples

    def num_vars(self):
        """Return the number of configure variables.

        Returns
        -------
        The number of configure variables
        """
        return self.nvars

    def __repr__(self):
        return "{0}(nvars={1}, {2})".format(
            self.__class__.__name__, self.num_vars(), self.config_values)