Source code for pyapprox.variables.sampling

import numpy as np

from pyapprox.variables.joint import IndependentMarginalsVariable





def generate_independent_random_samples(variable, num_samples,
                                        random_state=None):
    """
    Generate samples from a tensor-product probability measure. This function
    still exists to maintain backwards compatability. Please use
    variable.rvs() instead

    Parameters
    ----------
    num_samples : integer
        The number of samples to generate

    Returns
    -------
    samples : np.ndarray (num_vars, num_samples)
        Independent samples from the target distribution
    """
    assert type(variable) == IndependentMarginalsVariable, \
        "`variable` must be of IndependentMarginalsVariable type"
    return variable.rvs(num_samples, random_state)


def rejection_sampling(target_density, proposal_density,
                       generate_proposal_samples, envelope_factor,
                       num_vars, num_samples, verbose=False,
                       batch_size=None):
    """
    Obtain samples from a density f(x) using samples from a proposal
    distribution g(x).

    Parameters
    ----------
    target_density : callable vals = target_density(samples)
        The target density f(x)

    proposal_density : callable vals = proposal_density(samples)
        The proposal density g(x)

    generate_proposal_samples : callable samples = generate_samples(num_samples)
        Generate samples from the proposal density

    envelope_factor : double
        Factor M that satifies f(x)<=Mg(x). Set M such that inequality is as
        close to equality as possible

    num_vars : integer
        The number of variables

    num_samples : integer
        The number of samples required

    verbose : boolean
        Flag specifying whether to print diagnostic information

    batch_size : integer
        The number of evaluations of each density to be performed in a batch.
        Almost always we should set batch_size=num_samples

    Returns
    -------
    samples : np.ndarray (num_vars, num_samples)
        Independent samples from the target distribution
    """
    if batch_size is None:
        batch_size = num_samples

    cntr = 0
    num_proposal_samples = 0
    samples = np.empty((num_vars, num_samples), dtype=float)
    while cntr < num_samples:
        proposal_samples = generate_proposal_samples(batch_size)
        target_density_vals = target_density(proposal_samples)
        proposal_density_vals = proposal_density(proposal_samples)
        assert target_density_vals.shape[0] == batch_size
        assert proposal_density_vals.shape[0] == batch_size
        urand = np.random.uniform(0., 1., (batch_size))

        # ensure envelop_factor is large enough
        if np.any(target_density_vals > (envelope_factor*proposal_density_vals)):
            I = np.argmax(
                target_density_vals/(envelope_factor*proposal_density_vals))
            msg = 'proposal_density*envelop factor does not bound target '
            msg += 'density: %f,%f' % (
                target_density_vals[I],
                (envelope_factor*proposal_density_vals)[I])
            raise ValueError(msg)

        I = np.where(
            urand < target_density_vals/(envelope_factor*proposal_density_vals))[0]

        num_batch_samples_accepted = min(I.shape[0], num_samples-cntr)
        I = I[:num_batch_samples_accepted]
        samples[:, cntr:cntr+num_batch_samples_accepted] = proposal_samples[:, I]
        cntr += num_batch_samples_accepted
        num_proposal_samples += batch_size

    if verbose:
        print(('num accepted', num_samples))
        print(('num rejected', num_proposal_samples-num_samples))
        print(('inverse envelope factor', 1/envelope_factor))
        print(('acceptance probability', float(
            num_samples)/float(num_proposal_samples)))
    return samples


def discrete_sampling(N, probs, states=None):
    r"""
    discrete_sampling -- samples iid from a discrete probability measure

    x = discrete_sampling(N, prob, states)

    Generates N iid samples from a random variable X whose probability mass
    function is

    prob(X = states[j]) = prob[j],    1 <= j <= length(prob).

    If states is not given, the states are gives by 1 <= state <= length(prob)
    """

    p = probs.squeeze()/np.sum(probs)

    bins = np.digitize(
        np.random.uniform(0., 1., (N, 1)), np.hstack((0, np.cumsum(p))))-1

    if states is None:
        x = bins
    else:
        assert(states.shape[0] == probs.shape[0])
        x = states[bins]

    return x.squeeze()