Package riid

PyRIID

PyRIID is a Python package providing modeling and data synthesis utilities for machine learning-based research and development of radioisotope-related detection, identification, and quantification.

Installation

Requirements:

  • Python version: 3.10 to 3.12
  • Note: we recommend the highest Python version you can manage; anecdotally, we have noticed that everything tends to get faster.
  • Operating systems: Windows, Mac, or Ubuntu

Tests and examples are run via GitHub Actions on many combinations of Python version and operating system. You can verify support for your platform by checking the workflow files.

For Use

To use the latest version on PyPI, run:

pip install riid

Note that changes are slower to appear on PyPI, so for the latest features, run:

pip install git+https://github.com/sandialabs/pyriid.git@main

For Development

If you are developing PyRIID, clone this repository and run:

pip install -e ".[dev]"

If you encounter Pylance issues, try:

pip install -e ".[dev]" --config-settings editable_mode=compat

Examples

Examples for how to use this package can be found here.

Tests

Unit tests for this package can be found here.

Run all unit tests with the following:

python -m unittest tests/*.py -v

You can also run one of the run_tests.* scripts, whichever is appropriate for your platform.

Docs

API documentation can be found here.

Docs can be built locally with the following:

pip install -r pdoc/requirements.txt
pdoc riid -o docs/ --html --template-dir pdoc

Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

Please make sure to update tests as appropriate and adhere to our code of conduct.

Contacts

Maintainers and authors can be found here.

Full copyright details can be found here.

Acknowledgements

Thank you to the U.S. Department of Energy, National Nuclear Security Administration, Office of Defense Nuclear Nonproliferation Research and Development (DNN R&D) for funding that has led to versions 2.0 and 2.1.

Additionally, thank you to the following individuals who have provided invaluable subject-matter expertise:

  • Paul Thelen (also an author)
  • Ben Maestas
  • Greg Thoreson
  • Michael Enghauser
  • Elliott Leonard

Citing

When citing PyRIID, please reference the U.S. Department of Energy Office of Science and Technology Information (OSTI) record here: 10.11578/dc.20221017.2

  1. Alan Van Omen, "A Semi-Supervised Model for Multi-Label Radioisotope Classification and Out-of-Distribution Detection." Diss. 2023. doi: 10.7302/7200.
  2. Tyler Morrow, "Questionnaire for Radioisotope Identification and Estimation from Gamma Spectra using PyRIID v2." United States: N. p., 2023. Web. doi: 10.2172/2229893.
  3. Aaron Fjeldsted, Tyler Morrow, and Douglas Wolfe, "Identifying Signal-to-Noise Ratios Representative of Gamma Detector Response in Realistic Scenarios," 2023 IEEE Nuclear Science Symposium, Medical Imaging Conference and International Symposium on Room-Temperature Semiconductor Detectors (NSS MIC RTSD), Vancouver, BC, Canada, 2023. doi: 10.1109/NSSMICRTSD49126.2023.10337860.
  4. Alan Van Omen and Tyler Morrow, "A Semi-supervised Learning Method to Produce Explainable Radioisotope Proportion Estimates for NaI-based Synthetic and Measured Gamma Spectra." United States: N. p., 2024. Web. doi: 10.2172/2335904.
  5. Alan Van Omen and Tyler Morrow, "Controlling Radioisotope Proportions When Randomly Sampling from Dirichlet Distributions in PyRIID." United States: N. p., 2024. Web. doi: 10.2172/2335905.
  6. Alan Van Omen, Tyler Morrow, et al., "Multilabel Proportion Prediction and Out-of-distribution Detection on Gamma Spectra of Short-lived Fission Products." Annals of Nuclear Energy 208 (2024): 110777. doi: 10.1016/j.anucene.2024.110777.
  7. Aaron Fjeldsted, Tyler Morrow, et al., "A Novel Methodology for Gamma-Ray Spectra Dataset Procurement over Varying Standoff Distances and Source Activities," Nuclear Instruments and Methods in Physics Research Section A (2024): 169681. doi: 10.1016/j.nima.2024.169681.
# Copyright 2021 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
# Under the terms of Contract DE-NA0003525 with NTESS,
# the U.S. Government retains certain rights in this software.
"""
.. include:: ../README.md
"""
import logging
import os
import sys
from importlib.metadata import version

from riid.data.sampleset import (SampleSet, SpectraState, SpectraType,
                                 read_hdf, read_json, read_pcf)
from riid.data.synthetic.passby import PassbySynthesizer
from riid.data.synthetic.seed import (SeedMixer, SeedSynthesizer,
                                      get_dummy_seeds)
from riid.data.synthetic.static import StaticSynthesizer

HANDLER = logging.StreamHandler(sys.stdout)
logging.root.addHandler(HANDLER)
logging.root.setLevel(logging.DEBUG)
MPL_LOGGER = logging.getLogger("matplotlib")
MPL_LOGGER.setLevel(logging.WARNING)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1"

SAMPLESET_HDF_FILE_EXTENSION = ".h5"
SAMPLESET_JSON_FILE_EXTENSION = ".json"
PCF_FILE_EXTENSION = ".pcf"
ONNX_MODEL_FILE_EXTENSION = ".onnx"
TFLITE_MODEL_FILE_EXTENSION = ".tflite"
RIID = "riid"

__version__ = version(RIID)

__pdoc__ = {
    "riid.data.synthetic.seed.SeedMixer.__call__": True,
    "riid.data.synthetic.passby.PassbySynthesizer._generate_single_passby": True,
    "riid.data.sampleset.SampleSet._channels_to_energies": True,
}

__all__ = ["SampleSet", "SpectraState", "SpectraType",
           "read_hdf", "read_json", "read_pcf", "get_dummy_seeds",
           "PassbySynthesizer", "SeedSynthesizer", "StaticSynthesizer", "SeedMixer"]

Sub-modules

riid.anomaly

This module contains an algorithm for detecting statistical anomalies from sequences of gamma spectra.

riid.data

This sub-package contains all utilities for synthesizing, reading, writing, and converting data.

riid.gadras

This module wraps various GADRAS features by calling into the DLLs of a GADRAS installation.

riid.losses

This module contains custom loss functions.

riid.metrics

This module provides custom model metrics.

riid.models

This module contains PyRIID models.

riid.visualize

This module provides visualization functions primarily for visualizing SampleSets.

Functions

def get_dummy_seeds(n_channels: int = 512, live_time: float = 600.0, count_rate: float = 1000.0, normalize: bool = True, rng: numpy.random._generator.Generator = Generator(PCG64)) ‑> SampleSet

Get a random, dummy SampleSet of ideal seeds.

WARNING: the spectra returned by this function each contain one Gaussian peak that does not overlap with the peaks of other spectra. Such data is about as ideal as one could hope to work with and does not represent anything real. Therefore, do not use this data for any purpose other than testing, debugging, or examples where code, not results, is being demonstrated. Any use in scientific studies does not make sense.

Args

n_channels
number of channels in the spectra DataFrame
live_time
collection time on which to base seeds (higher creates a less noisy shape)
count_rate
count rate on which to base seeds (higher creates a less noisy shape)
normalize
whether to apply an L1-norm to the spectra
rng
NumPy random number generator, useful for experiment repeatability

Returns

SampleSet with randomly generated spectra
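The construction of these seeds can be sketched with NumPy alone (a simplified, standalone rendition of the logic, not the library itself): each seed is a histogram of normally distributed counts centered in its own slice of the channel range, so peaks never overlap.

```python
import numpy as np

# Standalone sketch of how the dummy seeds are built:
# one non-overlapping Gaussian peak per source.
rng = np.random.default_rng(42)
n_channels, n_sources = 512, 10
n_counts = int(1000.0 * 600.0)                # count_rate * live_time
channels_per_source = n_channels / n_sources
std = np.sqrt(n_channels / n_sources)

spectra = []
for i in range(n_sources):
    mu = i * channels_per_source + channels_per_source / 2
    counts = rng.normal(mu, std, size=n_counts)
    hist, _ = np.histogram(counts, bins=n_channels, range=(0, n_channels))
    spectra.append(hist)
spectra = np.asarray(spectra, dtype=float)

# L1-normalize each spectrum, as normalize=True does for the returned SampleSet
spectra /= spectra.sum(axis=1, keepdims=True)
```

Passing a seeded `rng` (as in the `rng` argument above) makes the result reproducible across runs.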

def get_dummy_seeds(n_channels: int = 512, live_time: float = 600.0,
                    count_rate: float = 1000.0, normalize: bool = True,
                    rng: Generator = np.random.default_rng()) -> SampleSet:
    """Get a random, dummy `SampleSet` of ideal seeds.

    WARNING: the spectra returned by this function each contain one Gaussian peak that does
    not overlap with the peaks of other spectra.  Such data is about as *ideal* as one
    could hope to be working with and does not represent anything real.
    Therefore, **do not** use this data for any purpose other than testing, debugging, or
    examples where code, not results, is being demonstrated. Any use in scientific studies
    does not make sense.

    Args:
        n_channels: number of channels in the spectra DataFrame
        live_time: collection time on which to base seeds
            (higher creates a less noisy shape)
        count_rate: count rate on which to base seeds
            (higher creates a less noisy shape)
        normalize: whether to apply an L1-norm to the spectra
        rng: NumPy random number generator, useful for experiment repeatability

    Returns:
        `SampleSet` with randomly generated spectra
    """
    ss = SampleSet()
    ss.measured_or_synthetic = "synthetic"
    ss.spectra_state = SpectraState.Counts
    ss.spectra_type = SpectraType.BackgroundForeground
    ss.synthesis_info = {
        "subtract_background": True,
    }
    sources = [
        ("Industrial",  "Am241",    "Unshielded Am241"),
        ("Industrial",  "Ba133",    "Unshielded Ba133"),
        ("NORM",        "K40",      "PotassiumInSoil"),
        ("NORM",        "K40",      "Moderately Shielded K40"),
        ("NORM",        "Ra226",    "UraniumInSoil"),
        ("NORM",        "Th232",    "ThoriumInSoil"),
        ("SNM",         "U238",     "Unshielded U238"),
        ("SNM",         "Pu239",    "Unshielded Pu239"),
        ("SNM",         "Pu239",    "Moderately Shielded Pu239"),
        ("SNM",         "Pu239",    "Heavily Shielded Pu239"),
    ]
    n_sources = len(sources)
    n_fg_sources = n_sources
    sources_cols = pd.MultiIndex.from_tuples(
        sources,
        names=SampleSet.SOURCES_MULTI_INDEX_NAMES
    )
    sources_data = np.identity(n_sources)
    ss.sources = pd.DataFrame(data=sources_data, columns=sources_cols)

    histograms = []
    N_FG_COUNTS = int(count_rate * live_time)
    fg_std = np.sqrt(n_channels / n_sources)
    channels_per_sources = n_channels / n_fg_sources
    for i in range(n_fg_sources):
        mu = i * channels_per_sources + channels_per_sources / 2
        counts = rng.normal(mu, fg_std, size=N_FG_COUNTS)
        fg_histogram, _ = np.histogram(counts, bins=n_channels, range=(0, n_channels))
        histograms.append(fg_histogram)
    histograms = np.array(histograms)

    ss.spectra = pd.DataFrame(data=histograms)

    ss.info.total_counts = ss.spectra.sum(axis=1)
    ss.info.live_time = live_time
    ss.info.real_time = live_time
    ss.info.snr = None
    ss.info.ecal_order_0 = 0
    ss.info.ecal_order_1 = 3000
    ss.info.ecal_order_2 = 100
    ss.info.ecal_order_3 = 0
    ss.info.ecal_low_e = 0
    ss.info.description = ""
    ss.update_timestamp()

    if normalize:
        ss.normalize()

    return ss
def read_hdf(path: str | Path) ‑> SampleSet

Read an HDF file in as a SampleSet object.

Args

path
path for the HDF file to be read in

Returns

SampleSet object
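Before parsing, the function expands `~` and fails fast on a missing file; that validation step can be sketched on its own (`check_path` is a hypothetical helper name, not part of the riid API):

```python
from pathlib import Path

def check_path(path: str) -> Path:
    """Expand the user home directory and verify a file exists, mirroring
    the validation read_hdf performs before delegating to its HDF reader."""
    expanded_path = Path(path).expanduser()
    if not expanded_path.is_file():
        raise FileNotFoundError(f"No file found at location '{expanded_path}'.")
    return expanded_path
```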

def read_hdf(path: str | Path) -> SampleSet:
    """Read an HDF file in as a `SampleSet` object.

    Args:
        path: path for the HDF file to be read in

    Returns:
        `SampleSet` object
    """
    expanded_path = Path(path).expanduser()
    if not expanded_path.is_file():
        raise FileNotFoundError(f"No file found at location '{expanded_path}'.")

    ss = _read_hdf(expanded_path)

    if not ss:
        raise FileNotFoundError(f"No data found at location '{expanded_path}'.")

    return ss
def read_json(path: str | Path) ‑> SampleSet
def read_json(path: str | Path) -> SampleSet:
    expanded_path = Path(path).expanduser()
    with open(expanded_path, "r") as fin:
        ss_dict = json.load(fin)
    ss = _pcf_dict_to_ss(ss_dict)
    ss.detector_info = ss_dict["detector_info"]
    return ss
def read_pcf(path: str | Path, verbose: bool = False) ‑> SampleSet

Read a PCF file in as a SampleSet object.

Args

path
path for the PCF file to be read in
verbose
whether to show verbose function output in terminal

Returns

SampleSet object

def read_pcf(path: str | Path, verbose: bool = False) -> SampleSet:
    """Read a PCF file in as a `SampleSet` object.

    Args:
        path: path for the PCF file to be read in
        verbose: whether to show verbose function output in terminal

    Returns:
        `SampleSet` object
    """
    expanded_path = Path(path).expanduser()
    return _pcf_dict_to_ss(_pcf_to_dict(expanded_path, verbose), verbose)

Classes

class PassbySynthesizer (events_per_seed: int = 2, sample_interval: float = 0.125, dwell_time_function: str = 'uniform', dwell_time_function_args=(0.25, 8.0), fwhm_function: str = 'discrete', fwhm_function_args=(1,), snr_function: str = 'uniform', snr_function_args=(1.0, 10.0), min_fraction: float = 0.005, normalize_sources: bool = True, bg_cps: float = 300.0, long_bg_live_time: float = 120.0, apply_poisson_noise: bool = True, return_fg: bool = True, return_gross: bool = False, rng: numpy.random._generator.Generator = Generator(PCG64))

Synthesizer for creating pass-by events as sequences of gamma spectra.

Args

events_per_seed
number of pass-bys to generate per source-background seed pair
dwell_time_function
string that names the method of sampling for target dwell time values (options: uniform, log10, discrete, list)
dwell_time_function_args
range of values which are sampled in the fashion specified by the dwell_time_function argument
snr_function
string that names the method of sampling for target signal-to-noise ratio values (options: uniform, log10, discrete, list)
snr_function_args
range of values which are sampled in the fashion specified by the snr_function argument
min_fraction
minimum proportion of peak amplitude to exclude
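The pass-by count-rate profile computed internally is a Lorentzian-like curve, 1 / (x² + 1), with a unit peak and truncated where it falls to `min_fraction`. A minimal standalone NumPy sketch using the default parameter values:

```python
import numpy as np

# Sketch of the pass-by profile: peak of 1, truncated at min_fraction
# on both sides, sampled on a grid derived from sample_interval and fwhm.
min_fraction = 0.005
sample_interval = 0.125
fwhm = 1.0

lim = np.sqrt((1 - min_fraction) / min_fraction)   # where the curve equals min_fraction
x = np.arange(-lim, lim, sample_interval / fwhm / 2)
shape = 1 / (x ** 2 + 1)
```

Scaling this profile by a target SNR yields the per-sample SNR targets used during synthesis.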
class PassbySynthesizer(Synthesizer):
    """Synthesizer for creating pass-by events as sequences of gamma spectra."""

    _supported_functions = ["uniform", "log10", "discrete", "list"]

    def __init__(self, events_per_seed: int = 2, sample_interval: float = 0.125,
                 dwell_time_function: str = "uniform", dwell_time_function_args=(0.25, 8.0),
                 fwhm_function: str = "discrete", fwhm_function_args=(1,),
                 snr_function: str = "uniform", snr_function_args=(1.0, 10.0),
                 min_fraction: float = 0.005, normalize_sources: bool = True,
                 bg_cps: float = 300.0, long_bg_live_time: float = 120.0,
                 apply_poisson_noise: bool = True,
                 return_fg: bool = True, return_gross: bool = False,
                 rng: Generator = np.random.default_rng()):
        """
        Args:
            events_per_seed: number of pass-bys to generate per source-background seed pair
            dwell_time_function: string that names the method of sampling
                for target dwell time values (options: uniform, log10, discrete, list)
            dwell_time_function_args: range of values which are sampled in the
                fashion specified by the `dwell_time_function` argument
            snr_function: string that names the method of sampling for target
                signal-to-noise ratio values (options: uniform, log10, discrete, list)
            snr_function_args: range of values which are sampled in the fashion
                specified by the `snr_function` argument
            min_fraction: minimum proportion of peak amplitude to exclude
        """
        super().__init__(bg_cps, long_bg_live_time, apply_poisson_noise, normalize_sources,
                         return_fg, return_gross, rng)

        self.events_per_seed = events_per_seed
        self.sample_interval = sample_interval
        self.dwell_time_function = dwell_time_function
        self.dwell_time_function_args = dwell_time_function_args
        self.fwhm_function = fwhm_function
        self.fwhm_function_args = fwhm_function_args
        self.snr_function = snr_function
        self.snr_function_args = snr_function_args
        self.min_fraction = min_fraction

    # region Properties

    @property
    def dwell_time_function(self) -> str:
        """Get or set the function used to randomly sample the desired dwell time space.

        Raises:
            `ValueError` when an unsupported function type is provided
        """
        return self._dwell_time_function

    @dwell_time_function.setter
    def dwell_time_function(self, value: str):
        if value not in self._supported_functions:
            raise ValueError("{} is not a valid function.".format(value))
        self._dwell_time_function = value

    @property
    def dwell_time_function_args(self) -> tuple:
        """Get or set the dwell time space to be randomly sampled."""
        return self._dwell_time_function_args

    @dwell_time_function_args.setter
    def dwell_time_function_args(self, value):
        self._dwell_time_function_args = value

    @property
    def events_per_seed(self) -> int:
        """Get or set the number of samples to create per seed (excluding the background seed).
        """
        return self._events_per_seed

    @events_per_seed.setter
    def events_per_seed(self, value):
        self._events_per_seed = value

    @property
    def fwhm_function(self) -> str:
        """Get or set the function used to randomly sample the desired full-width-half-max (FWHM)
        ratio space.

        Raises:
            `ValueError` when an unsupported function type is provided
        """
        return self._fwhm_function

    @fwhm_function.setter
    def fwhm_function(self, value: str):
        if value not in self._supported_functions:
            raise ValueError("{} is not a valid function.".format(value))
        self._fwhm_function = value

    @property
    def fwhm_function_args(self) -> tuple:
        """Get or set the full-width-half-max (FWHM) space to be randomly sampled."""
        return self._fwhm_function_args

    @fwhm_function_args.setter
    def fwhm_function_args(self, value):
        self._fwhm_function_args = value

    @property
    def min_fraction(self) -> float:
        """Get or set the percentage of the peak amplitude to exclude."""
        return self._min_fraction

    @min_fraction.setter
    def min_fraction(self, value: float):
        self._min_fraction = value

    @property
    def sample_interval(self) -> float:
        """Get or set the sample interval (in seconds) at which the events are simulated."""
        return self._sample_interval

    @sample_interval.setter
    def sample_interval(self, value: float):
        self._sample_interval = value

    @property
    def snr_function(self) -> str:
        """Get or set the function used to randomly sample the desired signal-to-noise
        (SNR) ratio space.

        Raises:
            `ValueError` when an unsupported function type is provided
        """
        return self._snr_function

    @snr_function.setter
    def snr_function(self, value: str):
        if value not in self._supported_functions:
            raise ValueError("{} is not a valid function.".format(value))
        self._snr_function = value

    @property
    def snr_function_args(self) -> tuple:
        """Get or set the signal-to-noise (SNR) space to be randomly sampled."""
        return self._snr_function_args

    @snr_function_args.setter
    def snr_function_args(self, value):
        self._snr_function_args = value

    # endregion

    def _calculate_passby_shape(self, fwhm: float):
        """Calculates a pass-by shape with maximum of 1 which goes from min_fraction to
        min_fraction of signal with specified fwhm.

        Args:
            fwhm: full width at half maximum value to use for calculating
                the passby shape

        Returns:
            Array of floats representing the passby shape
        """
        lim = np.sqrt((1-self.min_fraction)/self.min_fraction)
        samples = np.arange(-lim, lim, self.sample_interval / fwhm / 2)
        return 1 / (np.power(samples, 2) + 1)

    def _generate_single_passby(self, fwhm: float, snr: float, dwell_time: float,
                                fg_seed: np.array, bg_seed: np.array,
                                fg_sources: pd.Series, bg_sources: pd.Series):
        """Generate a `SampleSet` with a sequence of spectra representative of a single pass-by.

        A source template is scaled up and then back down over the duration of a pass-by in a
        Gaussian fashion.
        Each of the `dwell_time / sample_interval` samples contains counts accumulated over `sample_interval` seconds.

        Args:
            fwhm: full width at half maximum of the (bell-like) shape of the source count rate
                over time
            snr: overall signal-to-noise ratio (SNR) of the passby event
            dwell_time: overall time (seconds) the source is present
            fg_seed: source template to use for calculating the passby; the value of each
                channel should be calculated as counts divided by total counts
            bg_seed: background template to use for calculating the passby; the value of each
                channel should be calculated as counts divided by total counts
            fg_sources: underlying ground truth proportions of anomalous sources
            bg_sources: underlying ground truth proportions of background sources

        Returns:
            `SampleSet` object containing the synthesized pass-by data
        """
        event_snr_targets = self._calculate_passby_shape(fwhm) * snr
        dwell_targets = np.zeros(int(dwell_time / self.sample_interval))
        snr_targets = np.concatenate((event_snr_targets, dwell_targets))
        n_samples = len(snr_targets)
        live_times = np.ones(n_samples) * self.sample_interval

        fg_ss, gross_ss = self._get_batch(
            fg_seed,
            fg_sources,
            bg_seed,
            bg_sources,
            live_times,
            snr_targets
        )

        if self.normalize_sources:
            if self.return_fg:
                fg_ss.normalize_sources()
            if self.return_gross:
                gross_ss.normalize_sources()

        return fg_ss, gross_ss

    def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet,
                 skip_health_check: bool = False, verbose: bool = True) \
            -> List[Tuple[SampleSet, SampleSet, SampleSet]]:
        """Generate a list of `SampleSet`s where each contains a pass-by as a sequence of spectra.

        Args:
            fg_seeds_ss: spectra normalized by total counts to be used as the
                source component(s) of spectra
            bg_seeds_ss: spectra normalized by total counts to be used as the
                background components of gross spectra
            skip_health_check: whether to skip seed health checks
            verbose: whether to display output from synthesis

        Returns:
            List of tuples of SampleSets where each tuple represents a pass-by event

        Raises:
            - `ValueError` when either foreground or background seeds are not provided
            - `AssertionError` if seed health check fails
        """
        if not fg_seeds_ss or not bg_seeds_ss:
            raise ValueError("At least one foreground and background seed must be provided.")

        if not skip_health_check:
            fg_seeds_ss.check_seed_health()
            bg_seeds_ss.check_seed_health()

        self._reset_progress()
        if verbose:
            tstart = time()

        args = []
        for bg_i in range(bg_seeds_ss.n_samples):
            bg_pmf = bg_seeds_ss.spectra.iloc[bg_i]
            bg_sources = bg_seeds_ss.sources.iloc[bg_i]
            fwhm_targets = get_distribution_values(self.fwhm_function,
                                                   self.fwhm_function_args,
                                                   self.events_per_seed,
                                                   self._rng)
            snr_targets = get_distribution_values(self.snr_function,
                                                  self.snr_function_args,
                                                  self.events_per_seed,
                                                  self._rng)
            dwell_time_targets = get_distribution_values(self.dwell_time_function,
                                                         self.dwell_time_function_args,
                                                         self.events_per_seed,
                                                         self._rng)
            for fg_i in range(fg_seeds_ss.n_samples):
                fg_pmf = fg_seeds_ss.spectra.iloc[fg_i]
                fg_sources = fg_seeds_ss.sources.iloc[fg_i]
                for t_i in range(self.events_per_seed):
                    fwhm = fwhm_targets[t_i]
                    snr = snr_targets[t_i]
                    dwell_time = dwell_time_targets[t_i]
                    pb_args = (fg_i, fwhm, snr, dwell_time, fg_pmf, bg_pmf,
                               fg_sources, bg_sources)
                    args.append(pb_args)

        # TODO: the following prevents periodic progress reports
        passbys = []
        for a in args:
            f, fwhm, snr, dwell_time, fg_pmf, bg_pmf, fg_sources, bg_sources = a
            fg_passby_ss, gross_passby_ss = self._generate_single_passby(
                fwhm, snr, dwell_time, fg_pmf, bg_pmf, fg_sources, bg_sources
            )
            fg_seed_info = fg_seeds_ss.info.iloc[f]
            set_ss_info_from_seed_info(fg_passby_ss, fg_seed_info, self._synthesis_start_dt)
            set_ss_info_from_seed_info(gross_passby_ss, fg_seed_info, self._synthesis_start_dt)

            passbys.append((fg_passby_ss, gross_passby_ss))

        if verbose:
            delay = time() - tstart
            self._report_completion(delay)

        return passbys

Ancestors

Synthesizer

Instance variables

var dwell_time_function : str

Get or set the function used to randomly sample the desired dwell time space.

Raises

ValueError when an unsupported function type is provided

@property
def dwell_time_function(self) -> str:
    """Get or set the function used to randomly sample the desired dwell time space.

    Raises:
        `ValueError` when an unsupported function type is provided
    """
    return self._dwell_time_function
var dwell_time_function_args : tuple

Get or set the dwell time space to be randomly sampled.

@property
def dwell_time_function_args(self) -> tuple:
    """Get or set the dwell time space to be randomly sampled."""
    return self._dwell_time_function_args
var events_per_seed : int

Get or set the number of samples to create per seed (excluding the background seed).

@property
def events_per_seed(self) -> int:
    """Get or set the number of samples to create per seed (excluding the background seed).
    """
    return self._events_per_seed
var fwhm_function : str

Get or set the function used to randomly sample the desired full-width-half-max (FWHM) ratio space.

Raises

ValueError when an unsupported function type is provided

@property
def fwhm_function(self) -> str:
    """Get or set the function used to randomly sample the desired full-width-half-max (FWHM)
    ratio space.

    Raises:
        `ValueError` when an unsupported function type is provided
    """
    return self._fwhm_function
var fwhm_function_args : tuple

Get or set the full-width-half-max (FWHM) space to be randomly sampled.

@property
def fwhm_function_args(self) -> tuple:
    """Get or set the full-width-half-max (FWHM) space to be randomly sampled."""
    return self._fwhm_function_args
var min_fraction : float

Get or set the percentage of the peak amplitude to exclude.

@property
def min_fraction(self) -> float:
    """Get or set the percentage of the peak amplitude to exclude."""
    return self._min_fraction
var sample_interval : float

Get or set the sample interval (in seconds) at which the events are simulated.

@property
def sample_interval(self) -> float:
    """Get or set the sample interval (in seconds) at which the events are simulated."""
    return self._sample_interval
var snr_function : str

Get or set the function used to randomly sample the desired signal-to-noise (SNR) ratio space.

Raises

ValueError when an unsupported function type is provided

@property
def snr_function(self) -> str:
    """Get or set the function used to randomly sample the desired signal-to-noise
    (SNR) ratio space.

    Raises:
        `ValueError` when an unsupported function type is provided
    """
    return self._snr_function
var snr_function_args : tuple

Get or set the signal-to-noise (SNR) space to be randomly sampled.

@property
def snr_function_args(self) -> tuple:
    """Get or set the signal-to-noise (SNR) space to be randomly sampled."""
    return self._snr_function_args

Methods

def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet, skip_health_check: bool = False, verbose: bool = True) ‑> List[Tuple[SampleSet, SampleSet, SampleSet]]

Generate a list of SampleSets where each contains a pass-by as a sequence of spectra.

Args

fg_seeds_ss
spectra normalized by total counts to be used as the source component(s) of spectra
bg_seeds_ss
spectra normalized by total counts to be used as the background components of gross spectra
skip_health_check
whether to skip seed health checks
verbose
whether to display output from synthesis

Returns

List of tuples of SampleSets where each tuple represents a pass-by event

Raises

  • ValueError when either foreground or background seeds are not provided
  • AssertionError if seed health check fails
def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet,
             skip_health_check: bool = False, verbose: bool = True) \
        -> List[Tuple[SampleSet, SampleSet, SampleSet]]:
    """Generate a list of `SampleSet`s where each contains a pass-by as a sequence of spectra.

    Args:
        fg_seeds_ss: spectra normalized by total counts to be used as the
            source component(s) of spectra
        bg_seeds_ss: spectra normalized by total counts to be used as the
            background components of gross spectra
        skip_health_check: whether to skip seed health checks
        verbose: whether to display output from synthesis

    Returns:
        List of tuples of SampleSets where each tuple represents a pass-by event

    Raises:
        - `ValueError` when either foreground or background seeds are not provided
        - `AssertionError` if seed health check fails
    """
    if not fg_seeds_ss or not bg_seeds_ss:
        raise ValueError("At least one foreground and background seed must be provided.")

    if not skip_health_check:
        fg_seeds_ss.check_seed_health()
        bg_seeds_ss.check_seed_health()

    self._reset_progress()
    if verbose:
        tstart = time()

    args = []
    for bg_i in range(bg_seeds_ss.n_samples):
        bg_pmf = bg_seeds_ss.spectra.iloc[bg_i]
        bg_sources = bg_seeds_ss.sources.iloc[bg_i]
        fwhm_targets = get_distribution_values(self.fwhm_function,
                                               self.fwhm_function_args,
                                               self.events_per_seed,
                                               self._rng)
        snr_targets = get_distribution_values(self.snr_function,
                                              self.snr_function_args,
                                              self.events_per_seed,
                                              self._rng)
        dwell_time_targets = get_distribution_values(self.dwell_time_function,
                                                     self.dwell_time_function_args,
                                                     self.events_per_seed,
                                                     self._rng)
        for fg_i in range(fg_seeds_ss.n_samples):
            fg_pmf = fg_seeds_ss.spectra.iloc[fg_i]
            fg_sources = fg_seeds_ss.sources.iloc[fg_i]
            for t_i in range(self.events_per_seed):
                fwhm = fwhm_targets[t_i]
                snr = snr_targets[t_i]
                dwell_time = dwell_time_targets[t_i]
                pb_args = (fg_i, fwhm, snr, dwell_time, fg_pmf, bg_pmf,
                           fg_sources, bg_sources)
                args.append(pb_args)

    # TODO: the following prevents periodic progress reports
    passbys = []
    for a in args:
        f, fwhm, snr, dwell_time, fg_pmf, bg_pmf, fg_sources, bg_sources = a
        fg_passby_ss, gross_passby_ss = self._generate_single_passby(
            fwhm, snr, dwell_time, fg_pmf, bg_pmf, fg_sources, bg_sources
        )
        fg_seed_info = fg_seeds_ss.info.iloc[f]
        set_ss_info_from_seed_info(fg_passby_ss, fg_seed_info, self._synthesis_start_dt)
        set_ss_info_from_seed_info(gross_passby_ss, fg_seed_info, self._synthesis_start_dt)

        passbys.append((fg_passby_ss, gross_passby_ss))

    if verbose:
        delay = time() - tstart
        self._report_completion(delay)

    return passbys
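The nested loops above pair every background seed with every foreground seed and one set of per-event targets drawn per background seed. A minimal, standalone sketch of that argument assembly (plain Python, independent of PyRIID; the seed IDs and the uniform `get_targets` stand-in for `get_distribution_values` are illustrative assumptions):

```python
import itertools
import random

rng = random.Random(42)

def get_targets(n, low, high):
    """Stand-in for get_distribution_values: draw n uniform targets."""
    return [rng.uniform(low, high) for _ in range(n)]

bg_seeds = ["bg_0", "bg_1"]          # hypothetical background seed IDs
fg_seeds = ["fg_0", "fg_1", "fg_2"]  # hypothetical foreground seed IDs
events_per_seed = 2

args = []
for bg in bg_seeds:
    # One batch of targets per background seed, reused across all foregrounds
    snr_targets = get_targets(events_per_seed, 1.0, 100.0)
    for fg, t_i in itertools.product(fg_seeds, range(events_per_seed)):
        args.append((fg, bg, snr_targets[t_i]))

# Every (foreground, background, event) combination appears exactly once
assert len(args) == len(bg_seeds) * len(fg_seeds) * events_per_seed
```

Each tuple in `args` then drives one call to the single-pass-by synthesis routine.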
class SampleSet

A collection of spectrum samples and their metadata.

Expected sizes for DataFrames:

- `self._spectra`: [n_samples, n_channels]
- `self._sources`: [n_samples, n_sources]

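The row-aligned relationship between the `spectra`, `sources`, and `info` DataFrames can be sketched with plain pandas (shapes only; the source labels and column names here are illustrative, not the package's defaults):

```python
import numpy as np
import pandas as pd

n_samples, n_channels, n_sources = 4, 8, 3

# Rows are samples in every DataFrame, so all three share the same index
spectra = pd.DataFrame(np.random.default_rng(0).poisson(5, (n_samples, n_channels)))
sources = pd.DataFrame(
    np.eye(n_sources)[np.array([0, 1, 2, 0])],  # one-hot ground truth per sample
    columns=["Cs137", "Co60", "Background"],    # illustrative source labels
)
info = pd.DataFrame({"live_time": 1.0, "total_counts": spectra.sum(axis=1)})

assert spectra.shape == (n_samples, n_channels)
assert sources.shape == (n_samples, n_sources)
assert len(info) == n_samples
```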
class SampleSet():
    """A collection of spectrum samples and their metadata."""
    DEAD_TIME_PROP_INFO_KEY = "dead_time_prop"
    SOURCES_MULTI_INDEX_NAMES = (
        "Category",
        "Isotope",
        "Seed"
    )
    ECAL_INFO_COLUMNS = (
        "ecal_order_0",
        "ecal_order_1",
        "ecal_order_2",
        "ecal_order_3",
        "ecal_low_e"
    )
    DEFAULT_INFO_COLUMNS = (
        "description",
        "timestamp",
        "live_time",
        "real_time",
        "total_counts",
        "snr",
        "neutron_counts",
        "distance_cm",
        *ECAL_INFO_COLUMNS,
        "areal_density",
        "atomic_number",
        "occupancy_flag",
        "tag",
    )
    DEFAULT_BG_SEED_NAMES = [
        "Cosmic",
        "PotassiumInSoil",
        "UraniumInSoil",
        "ThoriumInSoil",
    ]
    SUPPORTED_STATES_FOR_ARITHMETIC = (
        SpectraState.Counts.value,
        SpectraState.L1Normalized.value
    )
    DEFAULT_EXCLUSIONS_FROM_COMPARISON = (
        "timestamp",
        "description",
        "occupancy_flag",
        "tag",
    )

    def __init__(self):
        """
        Expected sizes for DataFrames:
            self._spectra: [n_samples, n_channels]
            self._sources: [n_samples, n_sources]
        """
        self._spectra = pd.DataFrame()
        self._sources = pd.DataFrame()
        self._info = pd.DataFrame(columns=SampleSet.DEFAULT_INFO_COLUMNS)
        self._detector_info = {}
        self._synthesis_info = {}
        self._prediction_probas = pd.DataFrame()
        self._measured_or_synthetic = None
        self.pyriid_version = riid.__version__
        self._spectra_state = SpectraState.Unknown
        self._spectra_type = SpectraType.Unknown
        self._classified_by = ""

    def __bool__(self):
        return bool(len(self))

    def __getitem__(self, key: Union[slice, int, list]):
        selection = key
        if isinstance(key, int):
            selection = slice(key, key+1)
        elif isinstance(key, pd.Series) and key.dtype == bool:
            selection = self.spectra.index[key]

        sub_ss = copy.copy(self)
        sub_ss.spectra = sub_ss.spectra.iloc[selection].reset_index(drop=True)
        sub_ss.sources = sub_ss.sources.iloc[selection].reset_index(drop=True)
        sub_ss.info = sub_ss.info.iloc[selection].reset_index(drop=True)
        if not sub_ss.prediction_probas.empty:
            sub_ss.prediction_probas = sub_ss.prediction_probas.iloc[selection] \
                .reset_index(drop=True)

        return sub_ss

    def __len__(self):
        return self.n_samples

    def __str__(self):
        UNKNOWN_VALUE = "Unknown"
        detector_info_str = self.detector_info if self.detector_info else UNKNOWN_VALUE
        sources_present_str = ", ".join(sorted(np.unique(self.get_labels()))) \
            if not self.sources.empty \
            else UNKNOWN_VALUE
        predictions_str = ", ".join(sorted(np.unique(self.get_labels()))) \
            if not self.prediction_probas.empty \
            else "None"
        pyriid_version_str = self.pyriid_version if self.pyriid_version else UNKNOWN_VALUE
        info_dict = {
            "# of samples": self.n_samples,
            "# of channels": self.n_channels,
            "Detector": detector_info_str,
            "Present predictions": predictions_str,
            "Present sources": sources_present_str,
            "PyRIID version": pyriid_version_str,
        }
        summary = "\n".join(_dict_to_bulleted_list(info_dict))
        return f"SampleSet Summary:\n{summary}"

    def __repr__(self):
        return self.__str__()

    def __eq__(self, ss: SampleSet):
        return np.allclose(self._spectra.values, ss._spectra.values, atol=1e-3)

    def _check_arithmetic_supported(self, ss2: SampleSet):
        if ss2.n_samples != 1 and ss2.n_samples != self.n_samples:
            n_sample_str = (
                "You can only add/subtract SampleSet objects "
                "when the second SampleSet has only one spectrum "
                "(the spectrum will be scaled to match each sample of the first SampleSet) or "
                "when the second SampleSet has the same number of samples as the first "
                "(no rescaling will occur)."
            )
            raise InvalidSampleCountError(n_sample_str)
        if self.n_channels != ss2.n_channels:
            channel_str = f"({self.n_channels} != {ss2.n_channels})"
            raise ChannelCountMismatchError(f"Mismatched spectra channels {channel_str}!")
        if self.spectra_state != ss2.spectra_state:
            state_str = f"({self.spectra_state} != {ss2.spectra_state})"
            raise SpectraStateMismatchError(f"Mismatched spectra states {state_str}!")
        if self.spectra_state.value not in self.SUPPORTED_STATES_FOR_ARITHMETIC:
            raise ValueError(f"{self.spectra_state} spectra state not supported for arithmetic!")
            # Don't need to check spectra state of ss2 since they passed the prior equality check

    def _get_scaled_spectra(self, ss: SampleSet) -> np.ndarray:
        spectra_in_counts = ss.spectra.iloc[0].values.copy()
        if ss.spectra_state == SpectraState.L1Normalized:
            spectra_in_counts *= ss.info.iloc[0].total_counts
        live_times = ss.info.iloc[0].live_time
        spectra_in_cps = spectra_in_counts / live_times
        scaled_spectra = np.concatenate(
            spectra_in_cps * self.info.live_time.values[:, np.newaxis, np.newaxis]
        )
        return scaled_spectra

    def _get_arithmetic_result(self, ss: SampleSet, op) -> SampleSet:
        self._check_arithmetic_supported(ss)

        if ss.n_samples == 1:
            scaled_spectra = self._get_scaled_spectra(ss)
        else:
            scaled_spectra = ss.spectra.values  # Assumed to already be scaled
        new_ss = self[:]

        is_l1_normalized = new_ss.spectra_state == SpectraState.L1Normalized
        if is_l1_normalized:
            new_ss.spectra = new_ss.spectra.multiply(new_ss.info.total_counts, axis=0)

        new_ss.spectra = op(new_ss.spectra, scaled_spectra)

        if is_l1_normalized:
            new_ss.normalize()

        return new_ss

    def __add__(self, bg_ss: SampleSet) -> SampleSet:
        """Add the given background spectr(um|a) to the spectra of the current SampleSet.
        """
        if bg_ss.spectra_type.value != SpectraType.Background.value:
            msg = (
                "`bg_ss` argument must have a `spectra_type` of `Background`. "
                f"Its `spectra_type` is `{bg_ss.spectra_type}`."
            )
            raise ValueError(msg)
        if self.spectra_type.value != SpectraType.Gross.value and \
           self.spectra_type.value != SpectraType.Foreground.value:
            msg = (
                "Current `SampleSet` must have a `spectra_type` of `Gross` or `Foreground`. "
                f"Current `spectra_type` is `{self.spectra_type}`."
            )
            raise ValueError(msg)

        new_ss = self._get_arithmetic_result(bg_ss, operator.add)
        new_ss.spectra_type = SpectraType.Gross
        return new_ss

    def __sub__(self, ss: SampleSet) -> SampleSet:
        """Subtract the given background or foreground spectr(um|a) from the spectra of the current
        `SampleSet`.
        """
        if ss.spectra_type.value != SpectraType.Background.value and \
           ss.spectra_type.value != SpectraType.Foreground.value:
            msg = (
                "`ss` argument must have a `spectra_type` of `Background` or `Foreground`. "
                f"Its `spectra_type` is `{ss.spectra_type}`."
            )
            raise ValueError(msg)
        if self.spectra_type.value != SpectraType.Gross.value:
            msg = (
                "Current `SampleSet` must have a `spectra_type` of `Gross`. "
                f"Current `spectra_type` is `{self.spectra_type}`."
            )
            raise ValueError(msg)

        new_ss = self._get_arithmetic_result(ss, operator.sub)
        if ss.spectra_type == SpectraType.Background:
            new_ss.spectra_type = SpectraType.Foreground
        else:
            new_ss.spectra_type = SpectraType.Background
        return new_ss

    # region Properties

    @property
    def category_names(self):
        """Get the names of the categories involved in this SampleSet."""
        return self.sources.columns.get_level_values("Category")

    @property
    def classified_by(self):
        """Get or set the UUID of the model that classified the SampleSet.

        TODO: Implement as third dimension to `prediction_probas` DataFrame
        """
        return self._classified_by

    @classified_by.setter
    def classified_by(self, value):
        self._classified_by = value

    @property
    def detector_info(self):
        """Get or set the detector info on which this `SampleSet` is based.

        TODO: implement as DataFrame
        """
        return self._detector_info

    @detector_info.setter
    def detector_info(self, value):
        self._detector_info = value

    @property
    def difficulty_score(self, mean=10.0, std=3.0) -> float:
        """Compute the "difficulty" of the `SampleSet` on a scale of 0 to 1,
        where 0 is easiest and 1 is hardest.

        The difficulty of a `SampleSet` is the mean of the individual sample difficulties.
        Each sample's difficulty is determined by where its signal strength (SNR)
        falls on the survival function (AKA reliability function, or 1 - CDF) for a
        logistic distribution.
        The decision to use this distribution is based on empirical data showing that
        classifier performance increases in a logistic fashion as signal strength increases.

        The primary use of this function is for situations where ground truth is unknown
        and you want to get an idea of how "difficult" it will be for a model
        to make predictions.  Consider the following example:
            Given a `SampleSet` for which ground truth is unknown and the signal strength for
            each spectrum is low.
            A model considering, say, 100+ isotopes will see this `SampleSet` as quite
            difficult, whereas a model considering 5 isotopes will, being more specialized,
            see the `SampleSet` as easier.
            Of course, this makes the assumption that both models were trained to the
            isotope(s) contained in the test `SampleSet`.

        Based on the previous example, the unfortunate state of this function is that you
        must know how to pick means and standard deviations which properly reflect the:

        - number of target isotopes
        - amount of variation within each isotope (i.e., shielding, scattering, etc.)
        - detector resolution

        A future goal of ours is to provide updates to this function and docstring which make
        choosing the mean and standard deviation easier/automated based on more of our findings.

        Arguments:
            mean: SNR value representing the mean of the logistic distribution
            std: standard deviation of the logistic distribution

        Returns:
            The mean of all SNR values passed through a logistic survival function
        """
        snrs: np.ndarray = self.info.snr.clip(1e-6)
        score = float(stats.logistic.sf(snrs, loc=mean, scale=std).mean())

        return score

    @property
    def ecal(self):
        """Get or set the ecal terms."""
        ecal_terms = self.info[list(self.ECAL_INFO_COLUMNS)].to_numpy(dtype=float)
        return ecal_terms

    @ecal.setter
    def ecal(self, value):
        self.info.loc[:, self.ECAL_INFO_COLUMNS[0]] = value[0]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[1]] = value[1]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[2]] = value[2]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[3]] = value[3]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[4]] = value[4]

    @property
    def info(self):
        """Get or set the info DataFrame."""
        return self._info

    @info.setter
    def info(self, value):
        self._info = value

    @property
    def isotope_names(self):
        """Get the names of the isotopes involved in this SampleSet."""
        return self.sources.columns.get_level_values("Isotope")

    @property
    def measured_or_synthetic(self):
        """Get or set the value for measured_or_synthetic for the SampleSet object."""
        return self._measured_or_synthetic

    @measured_or_synthetic.setter
    def measured_or_synthetic(self, value):
        self._measured_or_synthetic = value

    @property
    def n_channels(self):
        """Get the number of channels included in the spectra DataFrame.
        Channels may also be referred to as bins.
        """
        return self._spectra.shape[1]

    @property
    def n_samples(self):
        """Get the number of samples included in the spectra dataframe,
        where each row is a sample.
        """
        return self._spectra.shape[0]

    @property
    def prediction_probas(self):
        """Get or set the prediction_probas DataFrame."""
        return self._prediction_probas

    @prediction_probas.setter
    def prediction_probas(self, value):
        if isinstance(value, pd.DataFrame):
            self._prediction_probas = value
        else:
            self._prediction_probas = pd.DataFrame(value, columns=self.sources.columns)

    @property
    def seed_names(self):
        """Get the names of the seeds involved in this `SampleSet`."""
        return self.sources.columns.get_level_values("Seed")

    @property
    def sources(self):
        """Get or set the sources `DataFrame`."""
        return self._sources

    @sources.setter
    def sources(self, value):
        self._sources = value.replace(np.nan, 0)

    @property
    def spectra(self):
        """Get or set the spectra `DataFrame`."""
        return self._spectra

    @spectra.setter
    def spectra(self, value):
        self._spectra = value

    @property
    def spectra_state(self: SpectraState):
        """Get or set the spectra state."""
        return self._spectra_state

    @spectra_state.setter
    def spectra_state(self, value: SpectraState):
        self._spectra_state = value

    @property
    def spectra_type(self: SpectraType):
        """Get or set the spectra type."""
        return self._spectra_type

    @spectra_type.setter
    def spectra_type(self, value: SpectraType):
        self._spectra_type = value

    @property
    def synthesis_info(self):
        """Get or set the information that was used by a synthesizer."""
        return self._synthesis_info

    @synthesis_info.setter
    def synthesis_info(self, value):
        self._synthesis_info = value

    # endregion

    def _channels_to_energies(self, fractional_energy_bins,
                              offset: float, gain: float, quadratic: float, cubic: float,
                              low_energy: float) -> np.ndarray:
        r"""Convert fractional energy bins to the energy represented by the lower edge of the bin.

        A spectrum represents a range of energy (e.g., 0 keV to 3000 keV).
        The energy range represented by a spectrum can be inferred from its energy calibration
        (e-cal for short), if known.  Using the convention in ANSI N42.42-2006, the e-cal is
        defined using the following terms:

        - \( a_0 \): offset (ecal_order_0)
        - \( a_1 \): gain (ecal_order_1)
        - \( a_2 \): quadratic (ecal_order_2)
        - \( a_3 \): cubic (ecal_order_3)
        - \( a_4 \): low energy (ecal_low_e)

        The energy represented by the lower edge of channel \( i \) is then calculated as follows:

        $$
        E_i = a_0 + a_1 * x + a_2 * x^2 + a_3 * x^3 + \frac{a_4}{1 + 60x}
        $$

        where \( x \) is the fractional energy of the spectral range.
        Also, under this scheme and ignoring the non-linear terms (e.g., \( a_2 \) through
        \( a_4 \)), the range of energy represented by a spectrum can be quickly inferred
        from the e-cal as:

        - `min_energy = offset`
        - `max_energy = gain + offset`

        Note: spline interpolation is another way to characterize the energy calibration of a
        spectrum. In GADRAS, this is accomplished via deviation pairs; PyRIID does not currently
        support this but may in the future.

        Args:
            fractional_energy_bins: array of fractions representing the spacing of the channels
            offset: e-cal order 0 term
            gain: e-cal order 1 term
            quadratic: e-cal order 2 term
            cubic: e-cal order 3 term
            low_energy: e-cal low energy term

        Returns:
            Array containing the energies represented by the lower edge of each channel
            (shape depends on the shape of input values)
        """
        channel_energies = \
            offset + \
            fractional_energy_bins * gain + \
            fractional_energy_bins**2 * quadratic + \
            fractional_energy_bins**3 * cubic + \
            low_energy / (1 + 60 * fractional_energy_bins)
        return channel_energies

    def _check_target_level(self, target_level,
                            levels_allowed=SOURCES_MULTI_INDEX_NAMES):
        if target_level not in levels_allowed:
            raise ValueError((
                f"'{target_level}' is not an appropriate target level. "
                f"Acceptable values are: {levels_allowed}."
            ))

    def _get_dead_time_proportions(self):
        live_times = self.info.live_time.values
        real_times = self.info.real_time.values
        dead_time_props = (1.0 - live_times / real_times)
        dead_time_props = np.nan_to_num(dead_time_props, nan=1.0)
        return dead_time_props

    def all_spectra_sum_to_one(self, rtol: float = 0.0, atol: float = 1e-4) -> bool:
        """Checks if all spectra are normalized to sum to one."""
        spectra_counts = self.spectra.sum(axis=1).values
        all_sum_to_one = np.all(np.isclose(spectra_counts, 1, rtol=rtol, atol=atol))
        return all_sum_to_one

    def as_ecal(self, new_offset: float, new_gain: float,
                new_quadratic: float, new_cubic: float,
                new_low_energy: float) -> SampleSet:
        """Re-bin spectra based on energy by interpolating the current shape from the current
        binning structure to a new one.

        Warning: this operation is likely to add or remove spectral information
        depending on the new energy calibration values used.

        Args:
            new_offset: new offset value, i.e. the 0-th e-cal term
            new_gain: new gain value, i.e. the 1-th e-cal term
            new_quadratic: new quadratic value, i.e. the 2-th e-cal term
            new_cubic: new cubic value, i.e. the 3-th e-cal term
            new_low_energy: new low energy term

        Returns:
            A new `SampleSet` with `spectra` and `info` DataFrames

        Raises:
            `ValueError` when no argument values are provided
        """
        new_args = [new_offset, new_gain, new_quadratic, new_cubic, new_low_energy]
        if all(v is None for v in new_args):
            raise ValueError("At least one argument value must be provided.")

        ecal_cols = list(self.ECAL_INFO_COLUMNS)
        new_ecal = self.info[ecal_cols].copy()
        new_ecal.ecal_order_0 = new_offset
        new_ecal.ecal_order_1 = new_gain
        new_ecal.ecal_order_2 = new_quadratic
        new_ecal.ecal_order_3 = new_cubic
        new_ecal.ecal_low_e = new_low_energy

        all_original_channel_energies = self.get_all_channel_energies()
        all_new_channel_energies = []
        fractional_energy_bins = np.linspace(0, 1, self.n_channels)
        for i in range(self.n_samples):
            offset, gain, quadratic, cubic, low_energy = new_ecal.iloc[i].values
            new_channel_energies = self._channels_to_energies(
                fractional_energy_bins,
                offset, gain, quadratic, cubic, low_energy
            )
            all_new_channel_energies.append(new_channel_energies)

        new_spectra = np.zeros([self.n_samples, self.n_channels])
        # Perform interpolation
        for i in range(self.n_samples):
            f = interp1d(
                all_original_channel_energies[i],
                self.spectra.values[i],
                kind="slinear",
                fill_value=0,
                bounds_error=False
            )
            new_spectrum = f(all_new_channel_energies[i])
            new_spectra[i] = new_spectrum

        # Update spectra and info DataFrames
        new_ss = self[:]
        new_ss.spectra = pd.DataFrame(new_spectra)
        new_ss.info.total_counts = new_ss.spectra.sum(axis=1)
        new_ss.info[ecal_cols] = new_ecal
        return new_ss

    def as_regions(self, rois: list) -> SampleSet:
        """Obtains a new `SampleSet` where the spectra are limited to specific
        regions of interest (ROIs).

        Notes:
            - If your samples have disparate energy calibration terms, call `as_ecal()` first
              to align channel space, then you may call this function. Otherwise, it is possible
              to end up with a ragged array of spectra, which we do not support.
            - After this call, `spectra` will have columns filled in with energy values for
              convenience. As such, in the context of the returned `SampleSet`, the energy
              calibration terms in `info` will no longer have any meaning, and any subsequent
              calls to methods like `as_ecal()` would not make sense.  This method is intended
              as a last step to be performed right before analysis of whatever kind.

        Args:
            rois: a list of 2-tuples where each tuple represents (low energy, high energy)

        Returns:
            A new `SampleSet` with only ROIs remaining in the `spectra` DataFrame

        Raises:
            `ValueError` when no argument values are provided
        """
        if not rois:
            raise ValueError("At least one ROI must be provided.")
        all_ecals = self.ecal
        all_ecals_are_same = np.isclose(all_ecals, all_ecals[0]).all()
        if not all_ecals_are_same:
            msg = "Spectra have different energy calibrations; consider `as_ecal()` first."
            raise ValueError(msg)

        energies = self.get_channel_energies(0)
        mask = _get_energy_roi_masks(rois, energies)
        new_spectra = self.spectra.to_numpy(dtype=float)[:, mask]
        new_spectra = new_spectra.reshape((self.n_samples, -1))
        mask_energies = energies[mask]

        new_ss = self[:]
        new_ss.spectra = pd.DataFrame(new_spectra, columns=mask_energies)
        new_ss.info.total_counts = new_ss.spectra.sum(axis=1)
        return new_ss

    def check_seed_health(self, dead_time_threshold=1.0):
        """Checks health of all spectra and info assuming they are seeds.

        Invalid states for which we currently check:

        - spectra do not sum to 1
        - dead time greater than or equal to provided threshold

        Args:
            dead_time_threshold: value at which seed dead time is unacceptable

        Raises:
            `AssertionError` if any check fails
        """
        all_spectra_sum_to_one = self.all_spectra_sum_to_one()
        assert all_spectra_sum_to_one

        if self.DEAD_TIME_PROP_INFO_KEY not in self.info.columns:
            self.set_dead_time_proportions()
        dead_samples = self.info[self.DEAD_TIME_PROP_INFO_KEY] >= dead_time_threshold
        all_samples_are_alive = not np.any(dead_samples)
        assert all_samples_are_alive

    def clip_negatives(self, min_value: float = 0):
        """Clip spectrum values to some minimum value.

        Args:
            min_value: value to which to clip existing spectrum values
        """
        self._spectra = pd.DataFrame(data=self._spectra.clip(min_value))

    def compare_to(self, ss: SampleSet, n_bins: int = 20, density: bool = False) \
            -> Tuple[dict, dict, dict]:
        """Compare the current `SampleSet` to another `SampleSet`.

        The function compares only the shared, numeric `info` columns of
        each `SampleSet` by computing the Jensen-Shannon distance between
        histograms of that information.

        Args:
            ss: `SampleSet` to compare to
            n_bins: number of histogram bins used when comparing both `SampleSet`s
            density: whether histograms should be in counts or density

        Returns:
            Tuple of the following:

            - ss1_stats: dictionary of stats describing the first `SampleSet`
            - ss2_stats: dictionary of stats describing the second `SampleSet`
            - col_comparisons: dictionary of distance values comparing each stat
        """
        TARGET_SUMMARY_STATS = ["min", "max", "median", "mean", "std"]
        STAT_PRECISION = 3

        # Get info columns we want to compare
        info_columns = [x for x in list(self.DEFAULT_INFO_COLUMNS)
                        if x not in list(self.DEFAULT_EXCLUSIONS_FROM_COMPARISON)]

        # Get both sample sets comparable columns of data
        info_df1 = self.info[info_columns]
        info_df2 = ss.info[info_columns]

        # Check valid columns in each sample set (cannot have None or 0)
        ss1_valid_cols = [
            c for c in info_df1.columns
            if pd.to_numeric(info_df1[c], errors="coerce").notnull().all() and any(info_df1[c])
        ]
        ss2_valid_cols = [
            c for c in info_df2.columns
            if pd.to_numeric(info_df2[c], errors="coerce").notnull().all() and any(info_df2[c])
        ]

        # Keep only the columns that are valid in both SampleSets
        # (building a new list avoids mutating a list while iterating over it)
        shared_cols = [c for c in ss1_valid_cols if c in ss2_valid_cols]
        ss1_valid_cols = shared_cols
        ss2_valid_cols = shared_cols

        # Remove non shared column data
        info_df1 = info_df1[ss1_valid_cols]
        info_df2 = info_df2[ss2_valid_cols]

        # Bin the data
        ss1_stats = {}
        ss2_stats = {}
        col_comparisons = {}
        for i in ss1_valid_cols:
            ss1_stats[i] = {}
            ss2_stats[i] = {}
            hist_range = (
                min(min(info_df1[i]), min(info_df2[i])),
                max(max(info_df1[i]), max(info_df2[i]))
            )
            # Get data in bins and get counts for each bin
            hist1, bins1 = np.histogram(info_df1[i], bins=n_bins, range=hist_range, density=density)
            hist2, bins2 = np.histogram(info_df2[i], bins=n_bins, range=hist_range, density=density)
            ss1_stats[i]["density"] = density
            ss2_stats[i]["density"] = density
            ss1_stats[i]["hist"] = hist1
            ss2_stats[i]["hist"] = hist2
            ss1_stats[i]["bins"] = bins1
            ss2_stats[i]["bins"] = bins2
            ss1_stats[i]["range"] = hist_range
            ss2_stats[i]["range"] = hist_range

            summary_stats_df1 = info_df1[i].agg(TARGET_SUMMARY_STATS).round(decimals=STAT_PRECISION)
            ss1_stats[i].update(summary_stats_df1.to_dict())
            summary_stats_df2 = info_df2[i].agg(TARGET_SUMMARY_STATS).round(decimals=STAT_PRECISION)
            ss2_stats[i].update(summary_stats_df2.to_dict())

            js_dist = distance.jensenshannon(hist1, hist2, axis=0)
            col_comparisons[i] = js_dist

        return ss1_stats, ss2_stats, col_comparisons

    def concat(self, ss_list: list):
        """Vertically concatenate multiple `SampleSet`s into one `SampleSet`.

        Combining of non-DataFrame properties (e.g., detector_info, synthesis_info, etc.)
        is NOT performed - it is the responsibility of the user to ensure proper bookkeeping for
        these properties when concatenating `SampleSet`s.

        This method is most applicable to `SampleSet`s containing measured data from the same
        detector which could not be made as a single `SampleSet` because of how measurements had to
        be taken (i.e., measurements were taken by distinct processes separated by time).
        Therefore, the user should avoid concatenating `SampleSet`s when:

        - the data is from different detectors
        (note that `SampleSet`s are given to models for prediction, and models should be 1-to-1
        with physical detectors);
        - a mix of synthetic and measured data would occur (this could make an existing
        `synthesis_info` value ambiguous, but one way to handle this would be to add a new column
        to `info` distinguishing between synthetic and measured on a per-sample basis).

        Args:
            ss_list: list of `SampleSet`s to concatenate

        Notes:
            The current `SampleSet` is modified in place; nothing is returned.
        """
        if not ss_list:
            return
        if not isinstance(ss_list, list) and not isinstance(ss_list, tuple):
            ss_list = [ss_list]
        spectra_types = list(
            set([self.spectra_type.value] + [x.spectra_type.value for x in ss_list])
        )
        n_spectra_types = len(spectra_types)
        if n_spectra_types == 2:
            if SpectraType.Background.value in spectra_types and \
               SpectraType.Foreground.value in spectra_types:
                self.spectra_type = SpectraType.BackgroundForeground
            else:
                self.spectra_type = SpectraType.Unknown
        elif n_spectra_types == 0 or n_spectra_types > 2:
            self.spectra_type = SpectraType.Unknown

        self._spectra = pd.concat(
            [self._spectra] + [ss.spectra for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._sources = pd.concat(
            [self._sources] + [ss.sources for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._sources = self._sources.where(pd.notnull(self._sources), 0)
        existing_info_df = self._info if not self._info.empty else None
        self._info = pd.concat(
            [existing_info_df] + [ss.info for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._info = self._info.where(pd.notnull(self._info), None)
        self._prediction_probas = pd.concat(
            [self._prediction_probas] + [ss.prediction_probas for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._prediction_probas = self._prediction_probas.where(
            pd.notnull(self._prediction_probas),
            0
        )
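
The concatenation pattern above can be sketched with plain pandas (the frames, column names, and values here are hypothetical stand-ins, not riid data): columns missing from one frame come back as NaN after `pd.concat` and are then filled with zero, just as missing source contributions are above.

```python
import pandas as pd

# Hypothetical stand-ins for the sources tables of two SampleSets; the second
# has a column ("Co60") that the first lacks, mimicking differing seed sets.
a = pd.DataFrame({"Cs137": [0.9, 0.8], "K40": [0.1, 0.2]})
b = pd.DataFrame({"Cs137": [0.5], "Co60": [0.5]})

combined = pd.concat([a, b], ignore_index=True, sort=False)
# Columns absent from one frame come back as NaN; fill them with 0,
# matching how missing source contributions are treated.
combined = combined.where(pd.notnull(combined), 0)
```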

    def downsample_spectra(self, target_bins: int = 128, min_frac=1e-8):
        """Uniformly down-bin spectra.

        Args:
            target_bins: number of channels to which to bin the existing spectra
            min_frac: threshold below which down-binned values are zeroed out

        Raises:
            `ValueError` when the current binning is not a multiple of the target binning
        """
        if self.n_channels % target_bins == 0:
            n_per_group = int(self.n_channels / target_bins)
            transformation = np.zeros([target_bins, self.n_channels])
            for i in range(target_bins):
                transformation[i, (i * n_per_group):((i * n_per_group) + n_per_group)] = 1
        else:
            msg = "Current binning ({}) is not a multiple of the target binning ({})".format(
                self.n_channels,
                target_bins
            )
            raise ValueError(msg)
        self._spectra = pd.DataFrame(
            data=np.matmul(
                self._spectra.values,
                transformation.T
            )
        )
        self._spectra[self._spectra < min_frac] = 0
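
The down-binning transformation can be sketched standalone: when the channel count is a multiple of the target, a reshape-and-sum is equivalent to the transformation-matrix product above. The `downbin` helper and its inputs are illustrative, not part of riid.

```python
import numpy as np

def downbin(spectra: np.ndarray, target_bins: int) -> np.ndarray:
    """Sum adjacent channels so each row ends up with target_bins channels."""
    n_channels = spectra.shape[1]
    assert n_channels % target_bins == 0, "channels must be a multiple of target"
    n_per_group = n_channels // target_bins
    # Equivalent to multiplying by the block-sum transformation matrix.
    return spectra.reshape(-1, target_bins, n_per_group).sum(axis=2)

spectrum = np.arange(8, dtype=float).reshape(1, 8)  # one 8-channel spectrum
binned = downbin(spectrum, 4)                       # down to 4 channels
```

Because each output bin is a sum of adjacent input bins, total counts are preserved.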

    def drop_sources(self, col_names: Iterable = DEFAULT_BG_SEED_NAMES,
                     normalize_sources: bool = True,
                     target_level: str = "Seed"):
        """Drop columns from `sources`.

        Args:
            col_names: names of the sources columns to be dropped.
                The names of background seeds are used by default as removing the
                ground truth for background sources when dealing with synthetic,
                gross spectra is a common operation.
            normalize_sources: whether to normalize the sources DataFrame after dropping
                the column(s)
            target_level: `SampleSet.sources` column level to use
        """
        self.sources.drop(col_names, axis=1, inplace=True, level=target_level)
        if normalize_sources:
            self.normalize_sources()

    def drop_sources_columns_with_all_zeros(self):
        """Remove columns from the sources `DataFrame` that contain only zeros.

        Modifications are made in-place.
        """
        idxs = (self.sources != 0).any(axis=0)
        self.sources = self.sources.loc[:, idxs]
        self.sources.columns = self.sources.columns.remove_unused_levels()

        return idxs

    def drop_spectra_with_no_contributors(self) -> np.ndarray:
        """Remove samples where the spectrum has no recorded contributor.

        Modifications are made in place.

        Returns:
            A boolean mask of which rows were kept
        """
        idxs = np.where(self.sources.values.sum(axis=1) == 0)[0]
        kept_mask = ~np.isin(self.sources.index.values, idxs)

        self.spectra = self.spectra.drop(index=idxs).reset_index(drop=True)
        self.sources = self.sources.drop(index=idxs).reset_index(drop=True)
        self.info = self.info.drop(index=idxs).reset_index(drop=True)

        return kept_mask
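
The row-dropping logic can be sketched with hypothetical parallel tables: rows whose ground truth sums to zero are removed from every table, and indices are reset so the tables stay aligned.

```python
import numpy as np
import pandas as pd

# Hypothetical parallel tables: the second row has no source contributions.
sources = pd.DataFrame({"Cs137": [1.0, 0.0], "Co60": [0.0, 0.0]})
spectra = pd.DataFrame(np.ones((2, 4)))

# Keep only rows whose contributions sum to something nonzero, then
# reset indices so the parallel tables stay aligned.
kept_mask = sources.values.sum(axis=1) != 0
spectra = spectra[kept_mask].reset_index(drop=True)
sources = sources[kept_mask].reset_index(drop=True)
```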

    def extend(self, spectra: Union[dict, list, np.array, pd.DataFrame],
               sources: pd.DataFrame, info: Union[list, pd.DataFrame]):
        """Extend the current SampleSet with the given data.

        Always verify that the data was appended as you expect for the input type
        you are working with.

        Args:
            spectra: spectra to append to the current spectra DataFrame
            sources: sources to append to the current sources DataFrame
            info: info to append to the current info DataFrame

        Raises:
            `ValueError` when internal `DataFrame` lengths do not match
        """
        if not spectra.shape[0] == sources.shape[0] == info.shape[0]:
            msg = ("The number of rows in each of the required positional arguments "
                   "must be the same.")
            raise ValueError(msg)

        self._spectra = pd.concat(
            [self._spectra, pd.DataFrame(spectra)],
            ignore_index=True,
            sort=True
        ).fillna(0)

        new_sources_column_index = pd.MultiIndex.from_tuples(
            sources.keys(),
            names=SampleSet.SOURCES_MULTI_INDEX_NAMES
        )
        new_sources_df = pd.DataFrame(sources.values(), columns=new_sources_column_index)
        self._sources = pd.concat(
            [self._sources, new_sources_df],
            ignore_index=True,
            sort=True
        ).fillna(0)

        self._info = pd.concat(
            [self._info, pd.DataFrame(info)],
            ignore_index=True,
            sort=True
        )

    def get_all_channel_energies(self, fractional_energy_bins=None) -> np.ndarray:
        """Get the energy (in keV) represented by the lower edge of each channel for all samples.

        See docstring for `_channels_to_energies()` for more details.

        Args:
            fractional_energy_bins: array of fractions representing the spacing of
                the channels (for arbitrary channel structures)

        Returns:
            List of 1-D arrays of energy values in keV, one per sample
        """
        if fractional_energy_bins is None:
            fractional_energy_bins = np.linspace(0, 1, self.n_channels)

        all_channel_energies = []
        for i in range(self.n_samples):
            channel_energies = self.get_channel_energies(i, fractional_energy_bins)
            all_channel_energies.append(channel_energies)

        return all_channel_energies

    def get_channel_energies(self, sample_index, fractional_energy_bins=None) -> np.ndarray:
        """Get the energy (in keV) represented by the lower edge of each channel for a
        single sample at a specific index.

        See docstring for `_channels_to_energies()` for more details.

        Args:
            sample_index: index of the specific sample for which to obtain energies
            fractional_energy_bins: array of fractions representing the spacing of
                the channels (for arbitrary channel structures)

        Returns:
            Array of energy values in keV
        """
        if fractional_energy_bins is None:
            fractional_energy_bins = np.linspace(0, 1, self.n_channels)

        # TODO: raise error if ecal info is missing for any row

        offset, gain, quadratic, cubic, low_energy = self.ecal[sample_index]
        channel_energies = self._channels_to_energies(
            fractional_energy_bins,
            offset, gain, quadratic, cubic, low_energy
        )

        return channel_energies

    def get_confidences(self, fg_seeds_ss: SampleSet, bg_seed_ss: SampleSet = None,
                        bg_cps: float = None, is_lpe: bool = False,
                        confidence_func: Callable = None,
                        **confidence_func_kwargs) -> np.ndarray:
        """Get confidence measure for predictions as a NumPy array.

        Args:
            fg_seeds_ss: `SampleSet` containing foreground seeds
            bg_seed_ss: `SampleSet` containing a single background seed
            bg_cps: count rate used to scale `bg_seed_ss`
            is_lpe: whether predictions are for label proportion estimation (LPE)
                rather than multi-class classification
            confidence_func: metric function used to compare spectral reconstructions
                against measured spectra
            confidence_func_kwargs: kwargs for confidence metric function

        Returns:
            Array containing confidence metric for each prediction
        """
        if fg_seeds_ss.spectra_type != SpectraType.Foreground:
            msg = (
                "`fg_seeds_ss` must have a `spectra_type` of `Foreground`. "
                f"Its `spectra_type` is `{fg_seeds_ss.spectra_type}`."
            )
            raise ValueError(msg)
        if self.spectra_type == SpectraType.Gross:
            if self.info.total_counts.isna().any() or (self.info.total_counts <= 0).any():
                msg = (
                    "This `SampleSet` must have the `info.total_counts` column filled in "
                    "with values greater than zero. "
                    "If there are samples with spectra of all zeros, remove them first."
                )
                raise ValueError(msg)
            if self.info.live_time.isna().any() or (self.info.live_time <= 0).any():
                msg = (
                    "This `SampleSet` must have the `info.live_time` column filled in "
                    "with values greater than zero. "
                    "If there are samples with a live time of zero, remove them first."
                )
                raise ValueError(msg)
            if not bg_seed_ss:
                raise ValueError("`bg_seed_ss` is required for current `spectra_type` of `Gross`.")
            if bg_seed_ss.n_samples != 1:
                raise ValueError("`bg_seed_ss` must have exactly one sample.")
            if bg_seed_ss.spectra_type != SpectraType.Background:
                msg = (
                    "`bg_seed_ss` must have a `spectra_type` of `Background`. "
                    f"Its `spectra_type` is `{bg_seed_ss.spectra_type}`."
                )
                raise ValueError(msg)
            if not bg_cps or bg_cps <= 0:
                raise ValueError("Positive background count rate required.")

        if confidence_func is None:
            from riid.losses import jensen_shannon_divergence
            confidence_func = jensen_shannon_divergence

        if is_lpe:
            recon_proportions = self.prediction_probas.values
        else:
            max_indices = np.argmax(self.prediction_probas.values, axis=1)
            recon_proportions = np.zeros(self.prediction_probas.values.shape)
            recon_proportions[np.arange(self.n_samples), max_indices] = 1

        reconstructions = np.dot(recon_proportions, fg_seeds_ss.spectra.values)

        if self.spectra_type == SpectraType.Gross:
            bg_spectrum = bg_seed_ss.spectra.iloc[0].values
            normalized_bg_spectrum = bg_spectrum / bg_spectrum.sum()

            bg_counts = bg_cps * self.info.live_time.values
            fg_counts = (self.info.total_counts.values - bg_counts).clip(1)

            scaled_bg_spectra = get_expected_spectra(normalized_bg_spectrum, bg_counts)
            scaled_fg_spectra = reconstructions * fg_counts[:, np.newaxis]

            reconstructions = scaled_fg_spectra + scaled_bg_spectra

        confidences = confidence_func(
            reconstructions.astype(np.float64),
            self.spectra.values,
            **confidence_func_kwargs
        )
        confidences = np.array(confidences)
        return confidences
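
A minimal sketch of the reconstruction step, assuming L1-normalized seed spectra and a hand-rolled Jensen-Shannon divergence standing in for `riid.losses.jensen_shannon_divergence` (all names and numbers here are hypothetical): each reconstruction is the matrix product of predicted mixture proportions and seed spectra, and confidence is a distance between reconstruction and measurement.

```python
import numpy as np

def jsd(p: np.ndarray, q: np.ndarray) -> float:
    """Jensen-Shannon divergence between two PMFs (base 2, bounded by [0, 1])."""
    p = p / p.sum()
    q = q / q.sum()
    m = 0.5 * (p + q)
    def kl(a, b):
        mask = a > 0
        return float(np.sum(a[mask] * np.log2(a[mask] / b[mask])))
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

# Hypothetical L1-normalized seed spectra (one row per seed) and predicted
# mixture proportions for a single measured sample.
fg_seed_spectra = np.array([[0.7, 0.2, 0.1],
                            [0.1, 0.2, 0.7]])
proportions = np.array([[0.5, 0.5]])
reconstruction = proportions @ fg_seed_spectra  # expected mixed spectrum
measured = np.array([0.4, 0.2, 0.4])
confidence = jsd(reconstruction[0], measured)   # near 0 for a good match
```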

    def _get_spectral_distances(self, distance_func=distance.jensenshannon) -> np.array:
        n_samples = self.n_samples
        spectra = self.spectra.values.copy()
        spectrum_pairs = [
            (spectra[i], spectra[j])
            for i in range(n_samples)
            for j in range(i, n_samples)
        ]
        left_spectrum, right_spectrum = zip(*spectrum_pairs)
        distance_values = distance_func(left_spectrum, right_spectrum, axis=1)

        return distance_values

    def get_multiclass_jsds(self, fg_seeds_ss: SampleSet, target_level: str) -> list:
        """For each sample, this constructs a dictionary containing seed name to JSD using
        the sample's top prediction.

        Args:
            fg_seeds_ss: `SampleSet` from which to pull seeds for computing JSD
            target_level: the level at which predictions were made by a model

        Returns:
            List of dictionaries where keys are seed name and values are JSD
        """
        test_prediction_labels = self.get_predictions(target_level)
        fg_seed_labels = fg_seeds_ss.get_labels(target_level)

        jsds = []
        for i, pred_label in enumerate(test_prediction_labels):
            test_spectrum = self.spectra.iloc[i]
            seeds_for_pred = fg_seeds_ss[fg_seed_labels == pred_label]
            seed_labels_for_pred = seeds_for_pred.get_labels("Seed")
            jsds_for_sample = {}
            for j, seed_label in enumerate(seed_labels_for_pred):
                seed_spectrum = seeds_for_pred.spectra.iloc[j]
                jsd = distance.jensenshannon(test_spectrum, seed_spectrum)
                jsds_for_sample[seed_label] = jsd
            jsds.append(jsds_for_sample)
        return jsds

    def get_spectral_distance_matrix(self, distance_func=distance.jensenshannon,
                                     target_level="Seed") -> pd.DataFrame:
        """Compute the distance between all pairs of spectra.

        This method is intended for use on seed spectra (i.e., templates).
        Calling this method on large samplesets, such as those produced by the static
        synthesizer, is not recommended.
        This method only computes the upper triangle and diagonal of the matrix since the lower
        triangle would be a copy of the upper triangle.
        The diagonal is computed, for when Poisson noise is present, for two main reasons:

        - the distance between the same source will effectively never be zero.
        - there is sometimes research interest in comparing the diagonal to the upper triangle.

        Args:
            distance_func: specific distance function to use
            target_level: `SampleSet.sources` column level to use

        Returns:
            A 2-D array of distance values
        """
        distance_values = self._get_spectral_distances(distance_func)
        row_labels = self.get_labels(target_level=target_level)
        col_labels = self.sources.columns.get_level_values(level=target_level)
        distance_df = _get_distance_df_from_values(distance_values, row_labels, col_labels)

        return distance_df

    def get_labels(self, target_level="Isotope", max_only=True,
                   include_value=False, min_value=0.01,
                   level_aggregation=None):
        """Get row ground truth labels for each sample based on `sources` values.

        See docstring for `_get_row_labels()` for more details.
        """
        labels = _get_row_labels(
            self.sources,
            target_level=target_level,
            max_only=max_only,
            include_value=include_value,
            min_value=min_value,
            level_aggregation=level_aggregation,
        )
        return labels

    def get_predictions(self, target_level="Isotope", max_only=True,
                        include_value=False, min_value=0.01,
                        level_aggregation=None):
        """Get row predictions for each spectrum based on `prediction_probas` values.

        See docstring for `_get_row_labels()` for more details.
        """
        labels = _get_row_labels(
            self.prediction_probas,
            target_level=target_level,
            max_only=max_only,
            include_value=include_value,
            min_value=min_value,
            level_aggregation=level_aggregation,
        )
        return labels

    def get_samples(self) -> np.ndarray:
        """Get `spectra` as a NumPy array.

        Replaces NaN values with 0.

        Returns:
            Array containing spectrum data
        """
        spectra_values = np.nan_to_num(self._spectra)
        return spectra_values

    def get_source_contributions(self, target_level="Isotope") -> np.ndarray:
        """Get `sources` as a NumPy array.

        Replaces NaN values with 0.

        Returns:
            Array containing the ground truth contributions for each sample
        """
        collapsed_sources = self.sources.T.groupby(target_level).sum().T
        sources_values = np.nan_to_num(collapsed_sources)
        return sources_values

    def normalize(self, p: float = 1, clip_negatives: bool = True):
        """Apply L-p normalization to `spectra` in place.

        The default is the L1 norm (p=1), which can be interpreted as forming a
        probability mass function (PMF).
        The L2 norm (p=2) yields unit energy (sum of squares == 1).

        Reference: https://en.wikipedia.org/wiki/Parseval's_theorem

        Args:
            p: exponent to which the spectra values are raised
            clip_negatives: whether negative values will be removed from the spectra and
                replaced with 0
        """
        if clip_negatives:
            self.clip_negatives()
        elif (self._spectra.values < 0).sum():
            logging.warning(
                "You are performing a normalization operation on spectra which contain negative " +
                "values.  Consider applying `clip_negatives`."
            )

        energy = (self._spectra.values ** p).sum(axis=1)[:, np.newaxis]
        energy[energy == 0] = 1
        self._spectra = pd.DataFrame(data=self._spectra.values / energy ** (1/p))
        if p == 1:
            self.spectra_state = SpectraState.L1Normalized
        elif p == 2:
            self.spectra_state = SpectraState.L2Normalized
        else:
            self.spectra_state = SpectraState.Unknown
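
The L-p normalization above can be sketched standalone (the `lp_normalize` helper and its inputs are illustrative): each row is divided by the p-th root of the sum of its values raised to p, with all-zero rows left untouched.

```python
import numpy as np

def lp_normalize(spectra: np.ndarray, p: float = 1) -> np.ndarray:
    """Divide each row by the p-th root of the sum of its values raised to p."""
    energy = (spectra ** p).sum(axis=1, keepdims=True)
    energy[energy == 0] = 1  # leave all-zero rows untouched
    return spectra / energy ** (1 / p)

spectra = np.array([[1.0, 3.0],
                    [0.0, 0.0]])
pmf = lp_normalize(spectra, p=1)   # each nonzero row sums to 1 (a PMF)
unit = lp_normalize(spectra, p=2)  # each nonzero row has unit energy
```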

    def normalize_sources(self):
        """Normalize `sources` such that rows sum to 1."""
        row_sums = self._sources.sum(axis=1)
        row_sums[row_sums == 0] = 1
        self._sources = self._sources.clip(0).divide(
            row_sums,
            axis=0,
        )

    def replace_nan(self, replace_value: float = 0):
        """Replace np.nan values.

        Args:
            replace_value: value with which NaN will be replaced

        """
        self._spectra = self._spectra.replace(np.nan, replace_value)
        self._sources = self._sources.replace(np.nan, replace_value)
        self._info = self._info.replace(np.nan, replace_value)

    def sample(self, n_samples: int, random_seed: int = None) -> SampleSet:
        """Randomly sample the SampleSet.

        Args:
            n_samples: number of random samples to be returned
            random_seed: seed value for random number generation

        Returns:
            `SampleSet` of randomly selected measurements
        """
        if random_seed is not None:
            random.seed(random_seed)

        if n_samples > self.n_samples:
            n_samples = self.n_samples

        random_indices = random.sample(self.spectra.index.values.tolist(), n_samples)
        random_mask = np.isin(self.spectra.index, random_indices)
        return self[random_mask]

    def set_dead_time_proportions(self):
        """Compute the dead time proportion for each sample and save it in the `SampleSet` info.
        """
        dead_time_props = self._get_dead_time_proportions()
        self.info[self.DEAD_TIME_PROP_INFO_KEY] = dead_time_props

    def shuffle(self, inplace: bool = True, random_state: int = None) -> SampleSet:
        """Randomly reorder all DataFrames.

        Args:
            inplace: whether to perform the operation in-place
            random_state: random seed for reproducing specific shuffles

        Returns:
            `SampleSet` object when `inplace` is False
        """
        new_row_ordering = np.arange(self.n_samples)
        np.random.default_rng(random_state).shuffle(new_row_ordering)

        new_ss = self if inplace else self[:]

        new_ss.spectra = self.spectra.iloc[new_row_ordering]\
            .reset_index(drop=True)
        new_ss.sources = self.sources.iloc[new_row_ordering]\
            .reset_index(drop=True)
        new_ss.info = self.info.iloc[new_row_ordering]\
            .reset_index(drop=True)
        if not new_ss.prediction_probas.empty:
            new_ss.prediction_probas = self.prediction_probas.iloc[new_row_ordering]\
                .reset_index(drop=True)

        if not inplace:
            return new_ss
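
The core of the shuffle can be sketched with hypothetical parallel tables: a single permutation is drawn and applied to every table, as `shuffle()` does with its `new_row_ordering`, so the rows stay aligned across tables.

```python
import numpy as np
import pandas as pd

# Hypothetical parallel tables whose rows must stay aligned after shuffling.
spectra = pd.DataFrame({"counts": [1, 2, 3]})
info = pd.DataFrame({"live_time": [10, 20, 30]})

# Draw one permutation and apply it to every table; the seed makes
# the shuffle reproducible, mirroring the random_state argument.
order = np.random.default_rng(42).permutation(len(spectra))
spectra = spectra.iloc[order].reset_index(drop=True)
info = info.iloc[order].reset_index(drop=True)
```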

    def sources_columns_to_dict(self, target_level="Isotope") -> Union[dict, list]:
        """Convert the MultiIndex columns of `sources` to a dictionary.

        Note: depending on `target_level` and `sources` columns, duplicate values are possible.

        Args:
            target_level: `SampleSet.sources` column level to use

        Returns:
            If `target_level` is "Category" or "Isotope," then a dict is returned.
            If `target_level` is "Seed," then a list is returned.

        Raises:
            `ValueError` when `target_level` is invalid
        """
        self._check_target_level(
            target_level,
            levels_allowed=SampleSet.SOURCES_MULTI_INDEX_NAMES
        )

        d = {}
        column_tuples = self.sources.columns.to_list()
        if target_level == "Seed":
            d = column_tuples
        elif target_level == "Isotope":
            for t in column_tuples:
                _, i, _ = t
                if i not in d:
                    d[i] = [t]
                if t not in d[i]:
                    d[i].append(t)
        else:  # target_level == "Category":
            for t in column_tuples:
                c, i, _ = t
                if c not in d:
                    d[c] = {}
                if i not in d[c]:
                    d[c][i] = [t]
                if t not in d[c][i]:
                    d[c][i].append(t)

        return d
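
The "Isotope" branch above can be sketched against a standalone MultiIndex (the category, isotope, and seed names here are hypothetical): the full column tuples are grouped under each isotope name.

```python
import pandas as pd

# Hypothetical (Category, Isotope, Seed) column tuples for a sources table.
tuples = [
    ("NORM", "K40", "K40,100cm"),
    ("Industrial", "Cs137", "Cs137,50cm"),
    ("Industrial", "Cs137", "Cs137,100cm"),
]
columns = pd.MultiIndex.from_tuples(tuples, names=["Category", "Isotope", "Seed"])

# Group the full column tuples under each isotope, mirroring the
# target_level == "Isotope" branch.
by_isotope = {}
for t in columns.to_list():
    _, isotope, _ = t
    by_isotope.setdefault(isotope, []).append(t)
```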

    def split_fg_and_bg(self, bg_seed_names: Iterable = DEFAULT_BG_SEED_NAMES) \
            -> Tuple[SampleSet, SampleSet]:
        """Split the current `SampleSet` into two new `SampleSet`s, one containing only foreground
        sources and the other containing only background sources.

        Foreground sources are assumed to be anything that is not designated as a background source.

        Args:
            bg_seed_names: names of the seeds which are considered background sources.
                This list can be customized to also extract atypical background sources
                such as calibration sources.

        Returns:
            Two `SampleSet`s, the first containing only foregrounds and the second
            containing only backgrounds.
        """
        seed_labels = self.get_labels(target_level="Seed")
        bg_mask = seed_labels.isin(bg_seed_names)

        fg_seeds_ss = self[~bg_mask]
        fg_seeds_ss.drop_sources_columns_with_all_zeros()
        fg_seeds_ss.spectra_type = SpectraType.Foreground
        fg_seeds_ss.spectra_state = self.spectra_state

        bg_seeds_ss = self[bg_mask]
        bg_seeds_ss.drop_sources_columns_with_all_zeros()
        bg_seeds_ss.spectra_type = SpectraType.Background
        bg_seeds_ss.spectra_state = self.spectra_state

        return fg_seeds_ss, bg_seeds_ss
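
The split can be sketched with a boolean mask over hypothetical labels: one mask drives both halves, so every sample lands in exactly one of the two outputs.

```python
import numpy as np
import pandas as pd

# Hypothetical per-sample seed labels and the seeds considered background.
labels = pd.Series(["Cs137,50cm", "K40,100cm", "Cs137,50cm"])
bg_seed_names = ["K40,100cm"]

# One boolean mask drives both halves of the split, so no sample is lost.
bg_mask = labels.isin(bg_seed_names)
spectra = pd.DataFrame(np.eye(3))
fg_spectra = spectra[~bg_mask].reset_index(drop=True)
bg_spectra = spectra[bg_mask].reset_index(drop=True)
```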

    def squash(self) -> SampleSet:
        """Combine all rows of the `SampleSet` into a single row.

        All data suited for summing is summed; otherwise, the info from the first
        sample is used.

        Returns:
            A flattened `SampleSet`
        """
        flat_spectra = self.spectra.sum(axis=0).to_frame().T
        flat_sources = self.sources.sum(axis=0).to_frame().T
        flat_prediction_probas = self.prediction_probas.sum(axis=0).to_frame().T
        flat_info = self.info.iloc[0].to_frame().T

        if "description" in flat_info:
            flat_info.description = "squashed"
        if "live_time" in flat_info:
            flat_info.live_time = self.info.live_time.sum()
        if "real_time" in flat_info:
            flat_info.real_time = self.info.real_time.sum()
        if "total_counts" in flat_info:
            flat_info.total_counts = self.info.total_counts.sum()
        if "snr" in flat_info:
            flat_info.snr = self.info.snr.sum()

        # Create a new SampleSet with the flattened data
        flat_ss = SampleSet()
        flat_ss.spectra = flat_spectra
        flat_ss.sources = flat_sources
        flat_ss.prediction_probas = flat_prediction_probas
        flat_ss.info = flat_info
        # Copy dictionary and other non-DataFrame objects from full ss
        flat_ss._detector_info = self._detector_info
        flat_ss._synthesis_info = self._synthesis_info
        flat_ss._measured_or_synthetic = self._measured_or_synthetic

        return flat_ss
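
The squashing behavior can be sketched with hypothetical tables: summable columns collapse by summation into a single row, while other info columns keep the first row's value.

```python
import pandas as pd

# Hypothetical tables for a two-sample SampleSet.
spectra = pd.DataFrame({"c0": [1.0, 2.0], "c1": [3.0, 4.0]})
info = pd.DataFrame({"live_time": [10.0, 20.0], "description": ["a", "b"]})

# Summable data collapses by summation; other info keeps the first row's value.
flat_spectra = spectra.sum(axis=0).to_frame().T
flat_info = info.iloc[[0]].reset_index(drop=True)
flat_info["live_time"] = info["live_time"].sum()
```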

    def to_hdf(self, path: str | Path, verbose=False, **kwargs):
        """Write the `SampleSet` to disk as an HDF file.

        Args:
            path: file path at which to save as an HDF file
            verbose: whether to display detailed output
            kwargs: additional arguments to be passed to the `pandas.HDFStore` constructor

        A warning is logged when the path extension does not match the expected one.
        """
        path = Path(path)
        if path.suffix != riid.SAMPLESET_HDF_FILE_EXTENSION:
            logging.warning(f"Path does not end in {riid.SAMPLESET_HDF_FILE_EXTENSION}")

        _write_hdf(self, path, **kwargs)
        if verbose:
            logging.info(f"Saved SampleSet to '{path}'")

    def to_json(self, path: str | Path, verbose=False):
        """Write the `SampleSet` to disk as a JSON file.

        Warning: it is not recommended that you use this on large `SampleSet` objects.
        Consider `SampleSet.to_hdf()` instead in such cases.

        Args:
            path: file path at which to save as a JSON file
            verbose: whether to display detailed output

        A warning is logged when the path extension does not match the expected one.
        """
        path = Path(path)
        if path.suffix != riid.SAMPLESET_JSON_FILE_EXTENSION:
            logging.warning(f"Path does not end in {riid.SAMPLESET_JSON_FILE_EXTENSION}")

        _write_json(self, path)
        if verbose:
            logging.info(f"Saved SampleSet to '{path}'")

    def to_pcf(self, path: str | Path, verbose=False):
        """Write the `SampleSet` to disk as a PCF.

        Args:
            path: file path at which to save as a PCF
            verbose: whether to display detailed output

        A warning is logged when the path extension does not match the expected one.
        """
        path = Path(path)
        if path.suffix != riid.PCF_FILE_EXTENSION:
            logging.warning(f"Path does not end in {riid.PCF_FILE_EXTENSION}")

        _dict_to_pcf(_ss_to_pcf_dict(self, verbose), path, verbose)

        if verbose:
            logging.info(f"Saved SampleSet to '{path}'")

    def update_timestamp(self):
        """Set the timestamp for all samples to the current UTC date and time."""
        self.info.timestamp = _get_utc_timestamp()

    def upsample_spectra(self, target_bins: int = 4096):
        """Uniformly up-bin spectra.

        Args:
            target_bins: number of bins into which the current spectra
                should be split.
        """
        if target_bins % self.n_channels == 0:
            n_per_group = int(target_bins / self.n_channels)
            transformation = np.zeros([target_bins, self.n_channels])
            for i in range(self.n_channels):
                transformation[(i * n_per_group):((i * n_per_group) + n_per_group), i] = \
                                    1.0 / n_per_group
        else:
            raise ValueError(
                ("Desired number of bins ({}) cannot"
                 " be uniformly mapped from the existing"
                 " channels ({})").format(target_bins, self.n_channels))
        self._spectra = pd.DataFrame(
            data=np.matmul(
                self._spectra.values,
                transformation.T))

Class variables

var DEAD_TIME_PROP_INFO_KEY
var DEFAULT_BG_SEED_NAMES
var DEFAULT_EXCLUSIONS_FROM_COMPARISON
var DEFAULT_INFO_COLUMNS
var ECAL_INFO_COLUMNS
var SOURCES_MULTI_INDEX_NAMES
var SUPPORTED_STATES_FOR_ARITHMETIC

Instance variables

var category_names

Get the names of the categories involved in this SampleSet.

@property
def category_names(self):
    """Get the names of the categories involved in this SampleSet."""
    return self.sources.columns.get_level_values("Category")
var classified_by

Get or set the UUID of the model that classified the SampleSet.

TODO: Implement as third dimension to prediction_probas DataFrame

@property
def classified_by(self):
    """Get or set the UUID of the model that classified the SampleSet.

    TODO: Implement as third dimension to `prediction_probas` DataFrame
    """
    return self._classified_by
var detector_info

Get or set the detector info on which this SampleSet is based.

TODO: implement as DataFrame

@property
def detector_info(self):
    """Get or set the detector info on which this `SampleSet` is based.

    TODO: implement as DataFrame
    """
    return self._detector_info
var difficulty_score : float

Compute the "difficulty" of the SampleSet on a scale of 0 to 1, where 0 is easiest and 1 is hardest.

The difficulty of a SampleSet is the mean of the individual sample difficulties. Each sample's difficulty is determined by where its signal strength (SNR) falls on the survival function (AKA reliability function, or 1 - CDF) for a logistic distribution. The decision to use this distribution is based on empirical data showing that classifier performance increases in a logistic fashion as signal strength increases.

The primary use of this function is for situations where ground truth is unknown and you want to get an idea of how "difficult" it will be for a model to make predictions. Consider the following example: Given a SampleSet for which ground truth is unknown and the signal strength for each spectrum is low. A model considering, say, 100+ isotopes will see this SampleSet as quite difficult, whereas a model considering 5 isotopes will, being more specialized, see the SampleSet as easier. Of course, this makes the assumption that both models were trained to the isotope(s) contained in the test SampleSet.

Based on the previous example, the unfortunate state of this function is that you must know how to pick means and standard deviations which properly reflect the:

  • number of target isotopes
  • amount of variation within each isotope (i.e., shielding, scattering, etc.)
  • detector resolution

A future goal of ours is to provide updates to this function and docstring which make choosing the mean and standard deviation easier/automated based on more of our findings.

Arguments

mean: SNR value representing the mean of the logistic distribution
std: standard deviation of the logistic distribution

Returns

The mean of all SNR values passed through a logistic survival function

@property
def difficulty_score(self, mean=10.0, std=3.0) -> float:
    """Compute the "difficulty" of the `SampleSet` on a scale of 0 to 1,
    where 0 is easiest and 1 is hardest.

    The difficulty of a `SampleSet` is the mean of the individual sample difficulties.
    Each sample's difficulty is determined by where its signal strength (SNR)
    falls on the survival function (AKA reliability function, or 1 - CDF) for a
    logistic distribution.
    The decision to use this distribution is based on empirical data showing that
    classifier performance increases in a logistic fashion as signal strength increases.

    The primary use of this function is for situations where ground truth is unknown
    and you want to get an idea of how "difficult" it will be for a model
    to make predictions.  Consider the following example:
        Given a `SampleSet` for which ground truth is unknown and the signal strength for
        each spectrum is low.
        A model considering, say, 100+ isotopes will see this `SampleSet` as quite
        difficult, whereas a model considering 5 isotopes will, being more specialized,
        see the `SampleSet` as easier.
        Of course, this makes the assumption that both models were trained to the
        isotope(s) contained in the test `SampleSet`.

    Based on the previous example, the unfortunate state of this function is that you
    must know how to pick means and standard deviations which properly reflect the:

    - number of target isotopes
    - amount of variation within each isotope (i.e., shielding, scattering, etc.)
    - detector resolution

    A future goal of ours is to provide updates to this function and docstring which make
    choosing the mean and standard deviation easier/automated based on more of our findings.

    Arguments:
        mean: SNR value representing the mean of the logistic distribution
        std: standard deviation of the logistic distribution

    Returns:
        The mean of all SNR values passed through a logistic survival function
    """
    snrs: np.ndarray = self.info.snr.clip(1e-6)
    score = float(stats.logistic.sf(snrs, loc=mean, scale=std).mean())

    return score
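The scoring above can be reproduced standalone: clip the SNR values, pass them through the logistic survival function, and average. A minimal sketch mirroring the property above (the function name and sample SNR values are illustrative):

```python
import numpy as np
from scipy import stats

def difficulty_score(snrs, mean=10.0, std=3.0):
    """Mean of SNR values passed through a logistic survival function (1 - CDF)."""
    snrs = np.asarray(snrs, dtype=float).clip(1e-6)
    return float(stats.logistic.sf(snrs, loc=mean, scale=std).mean())

hard = difficulty_score([1.0, 2.0, 3.0])      # weak signals -> score near 1 (hard)
easy = difficulty_score([50.0, 80.0, 120.0])  # strong signals -> score near 0 (easy)
```

Note that the score is only meaningful relative to a sensible choice of `mean` and `std` for your detector and isotope set, per the caveats above.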
var ecal

Get or set the ecal terms.

@property
def ecal(self):
    """Get or set the ecal terms."""
    ecal_terms = self.info[list(self.ECAL_INFO_COLUMNS)].to_numpy(dtype=float)
    return ecal_terms
var info

Get or set the info DataFrame.

@property
def info(self):
    """Get or set the info DataFrame."""
    return self._info
var isotope_names

Get the names of the isotopes involved in this SampleSet.

@property
def isotope_names(self):
    """Get the names of the isotopes involved in this SampleSet."""
    return self.sources.columns.get_level_values("Isotope")
var measured_or_synthetic

Get or set the value for measured_or_synthetic for the SampleSet object.

@property
def measured_or_synthetic(self):
    """Get or set the value for measured_or_synthetic for the SampleSet object."""
    return self._measured_or_synthetic
var n_channels

Get the number of channels included in the spectra DataFrame. Channels may also be referred to as bins.

@property
def n_channels(self):
    """Get the number of channels included in the spectra DataFrame.
    Channels may also be referred to as bins.
    """
    return self._spectra.shape[1]
var n_samples

Get the number of samples included in the spectra dataframe, where each row is a sample.

@property
def n_samples(self):
    """Get the number of samples included in the spectra dataframe,
    where each row is a sample.
    """
    return self._spectra.shape[0]
var prediction_probas

Get or set the prediction_probas DataFrame.

@property
def prediction_probas(self):
    """Get or set the prediction_probas DataFrame."""
    return self._prediction_probas
var seed_names

Get the names of the seeds involved in this SampleSet.

@property
def seed_names(self):
    """Get the names of the seeds involved in this `SampleSet`."""
    return self.sources.columns.get_level_values("Seed")
var sources

Get or set the sources DataFrame.

@property
def sources(self):
    """Get or set the sources `DataFrame`."""
    return self._sources
var spectra

Get or set the spectra DataFrame.

@property
def spectra(self):
    """Get or set the spectra `DataFrame`."""
    return self._spectra
var spectra_state

Get or set the spectra state.

@property
def spectra_state(self: SpectraState):
    """Get or set the spectra state."""
    return self._spectra_state
var spectra_type

Get or set the spectra type.

@property
def spectra_type(self: SpectraType):
    """Get or set the spectra type."""
    return self._spectra_type
var synthesis_info

Get or set the information that was used by a synthesizer.

@property
def synthesis_info(self):
    """Get or set the information that was used to by a synthesizer."""
    return self._synthesis_info

Methods

def all_spectra_sum_to_one(self, rtol: float = 0.0, atol: float = 0.0001) ‑> bool

Checks if all spectra are normalized to sum to one.

def all_spectra_sum_to_one(self, rtol: float = 0.0, atol: float = 1e-4) -> bool:
    """Checks if all spectra are normalized to sum to one."""
    spectra_counts = self.spectra.sum(axis=1).values
    all_sum_to_one = np.all(np.isclose(spectra_counts, 1, rtol=rtol, atol=atol))
    return all_sum_to_one
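The check amounts to comparing per-row sums against 1 with `np.isclose`, mirroring the method above; a small self-contained sketch (the example spectra are made up):

```python
import numpy as np

spectra = np.array([
    [0.25, 0.25, 0.50],     # sums to exactly 1
    [0.20, 0.30, 0.49995],  # sums to 0.99995, still within atol=1e-4
])
sums = spectra.sum(axis=1)
all_sum_to_one = bool(np.all(np.isclose(sums, 1, rtol=0.0, atol=1e-4)))
```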
def as_ecal(self, new_offset: float, new_gain: float, new_quadratic: float, new_cubic: float, new_low_energy: float) ‑> SampleSet

Re-bin spectra based on energy by interpolating the current shape from the current binning structure to a new one.

Warning: this operation is likely to add or remove spectral information depending on the new energy calibration values used.

Args

new_offset
new offset value, i.e. the 0-th e-cal term
new_gain
new gain value, i.e. the 1-th e-cal term
new_quadratic
new quadratic value, i.e. the 2-th e-cal term
new_cubic
new cubic value, i.e. the 3-th e-cal term
new_low_energy
new low energy term

Returns

A new SampleSet with spectra and info DataFrames

Raises

ValueError when no argument values are provided

def as_ecal(self, new_offset: float, new_gain: float,
            new_quadratic: float, new_cubic: float,
            new_low_energy: float) -> SampleSet:
    """Re-bin spectra based on energy by interpolating the current shape from the current
    binning structure to a new one.

    Warning: this operation is likely to add or remove spectral information
    depending on the new energy calibration values used.

    Args:
        new_offset: new offset value, i.e. the 0-th e-cal term
        new_gain: new gain value, i.e. the 1-th e-cal term
        new_quadratic: new quadratic value, i.e. the 2-th e-cal term
        new_cubic: new cubic value, i.e. the 3-th e-cal term
        new_low_energy: new low energy term

    Returns:
        A new `SampleSet` with `spectra` and `info` DataFrames

    Raises:
        `ValueError` when no argument values are provided
    """
    new_args = [new_offset, new_gain, new_quadratic, new_cubic, new_low_energy]
    if all(v is None for v in new_args):
        raise ValueError("At least one argument value must be provided.")

    ecal_cols = list(self.ECAL_INFO_COLUMNS)
    new_ecal = self.info[ecal_cols].copy()
    new_ecal.ecal_order_0 = new_offset
    new_ecal.ecal_order_1 = new_gain
    new_ecal.ecal_order_2 = new_quadratic
    new_ecal.ecal_order_3 = new_cubic
    new_ecal.ecal_low_e = new_low_energy

    all_original_channel_energies = self.get_all_channel_energies()
    all_new_channel_energies = []
    fractional_energy_bins = np.linspace(0, 1, self.n_channels)
    for i in range(self.n_samples):
        offset, gain, quadratic, cubic, low_energy = new_ecal.iloc[i].values
        new_channel_energies = self._channels_to_energies(
            fractional_energy_bins,
            offset, gain, quadratic, cubic, low_energy
        )
        all_new_channel_energies.append(new_channel_energies)

    new_spectra = np.zeros([self.n_samples, self.n_channels])
    # Perform interpolation
    for i in range(self.n_samples):
        f = interp1d(
            all_original_channel_energies[i],
            self.spectra.values[i],
            kind="slinear",
            fill_value=0,
            bounds_error=False
        )
        new_spectrum = f(all_new_channel_energies[i])
        new_spectra[i] = new_spectrum

    # Update spectra and info DataFrames
    new_ss = self[:]
    new_ss.spectra = pd.DataFrame(new_spectra)
    new_ss.info.total_counts = new_ss.spectra.sum(axis=1)
    new_ss.info[ecal_cols] = new_ecal
    return new_ss
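The per-sample re-binning relies on `scipy.interpolate.interp1d`, with energies outside the original range filled with zero, as in the method above; a minimal sketch of that step (the energy grids and spectral shape are made up):

```python
import numpy as np
from scipy.interpolate import interp1d

old_energies = np.linspace(0, 3000, 16)   # original channel energies (keV)
spectrum = np.exp(-old_energies / 500.0)  # made-up spectral shape
new_energies = np.linspace(0, 6000, 16)   # new calibration extends past the old range

f = interp1d(old_energies, spectrum, kind="slinear",
             fill_value=0, bounds_error=False)
new_spectrum = f(new_energies)            # channels beyond 3000 keV become 0
```

This illustrates the warning above: the new calibration's out-of-range channels carry no information, so spectral content can be lost (or zero-padded in) by the operation.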
def as_regions(self, rois: list) ‑> SampleSet

Obtains a new SampleSet where the spectra are limited to specific regions of interest (ROIs).

Notes

  • If your samples have disparate energy calibration terms, call as_ecal() first to align channel space, then you may call this function. Otherwise, it is possible to end up with a ragged array of spectra, which we do not support.
  • After this call, spectra will have columns filled in with energy values for convenience. As such, in the context of the returned SampleSet, the energy calibration terms in info will no longer have any meaning, and any subsequent calls to methods like as_ecal() would not make sense. This method is intended as a last step to be performed right before analysis of whatever kind.

Args

rois
a list of 2-tuples where each tuple represents (low energy, high energy)

Returns

A new SampleSet with only ROIs remaining in the spectra DataFrame

Raises

ValueError when no ROIs are provided or when spectra have differing energy calibrations

def as_regions(self, rois: list) -> SampleSet:
    """Obtains a new `SampleSet` where the spectra are limited to specific
    regions of interest (ROIs).

    Notes:
        - If your samples have disparate energy calibration terms, call `as_ecal()` first
          to align channel space, then you may call this function. Otherwise, it is possible
          to end up with a ragged array of spectra, which we do not support.
        - After this call, `spectra` will have columns filled in with energy values for
          convenience. As such, in the context of the returned `SampleSet`, the energy
          calibration terms in `info` will no longer have any meaning, and any subsequent
          calls to methods like `as_ecal()` would not make sense.  This method is intended
          as a last step to be performed right before analysis of whatever kind.

    Args:
        rois: a list of 2-tuples where each tuple represents (low energy, high energy)

    Returns:
        A new `SampleSet` with only ROIs remaining in the `spectra` DataFrame

    Raises:
        `ValueError` when no ROIs are provided or when spectra have differing energy calibrations
    """
    if not rois:
        raise ValueError("At least one ROI must be provided.")
    all_ecals = self.ecal
    all_ecals_are_same = np.isclose(all_ecals, all_ecals[0]).all()
    if not all_ecals_are_same:
        msg = "Spectra have different energy calibrations; consider `as_ecal()` first."
        raise ValueError(msg)

    energies = self.get_channel_energies(0)
    mask = _get_energy_roi_masks(rois, energies)
    new_spectra = self.spectra.to_numpy(dtype=float)[:, mask]
    new_spectra = new_spectra.reshape((self.n_samples, -1))
    mask_energies = energies[mask]

    new_ss = self[:]
    new_ss.spectra = pd.DataFrame(new_spectra, columns=mask_energies)
    new_ss.info.total_counts = new_ss.spectra.sum(axis=1)
    return new_ss
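The ROI selection boils down to a boolean mask over channel energies. A hypothetical stand-in for the private `_get_energy_roi_masks()` helper used above (PyRIID's exact boundary handling may differ):

```python
import numpy as np

def get_energy_roi_mask(rois, energies):
    """Select channels whose energy falls inside any (low, high) ROI."""
    mask = np.zeros(energies.shape, dtype=bool)
    for low, high in rois:
        mask |= (energies >= low) & (energies <= high)
    return mask

energies = np.linspace(0, 3000, 7)  # [0, 500, ..., 3000] keV
mask = get_energy_roi_mask([(400, 1100), (2400, 3000)], energies)
kept_energies = energies[mask]      # channels surviving the ROI cut
```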
def check_seed_health(self, dead_time_threshold=1.0)

Checks health of all spectra and info assuming they are seeds.

Invalid states for which we currently check:

  • spectra do not sum to 1
  • dead time greater than or equal to provided threshold

Args

dead_time_threshold
value at which seed dead time is unacceptable

Raises

AssertionError if any check fails

def check_seed_health(self, dead_time_threshold=1.0):
    """Checks health of all spectra and info assuming they are seeds.

    Invalid states for which we currently check:

    - spectra do not sum to 1
    - dead time greater than or equal to provided threshold

    Args:
        dead_time_threshold: value at which seed dead time is unacceptable

    Raises:
        `AssertionError` if any check fails
    """
    all_spectra_sum_to_one = self.all_spectra_sum_to_one()
    assert all_spectra_sum_to_one

    if self.DEAD_TIME_PROP_INFO_KEY not in self.info.columns:
        self.set_dead_time_proportions()
    dead_samples = self.info[self.DEAD_TIME_PROP_INFO_KEY] >= dead_time_threshold
    all_samples_are_alive = not np.any(dead_samples)
    assert all_samples_are_alive
def clip_negatives(self, min_value: float = 0)

Clip spectrum values to some minimum value.

Args

min_value
value to which to clip existing spectrum values
def clip_negatives(self, min_value: float = 0):
    """Clip spectrum values to some minimum value.

    Args:
        min_value: value to which to clip existing spectrum values
    """
    self._spectra = pd.DataFrame(data=self._spectra.clip(min_value))
def compare_to(self, ss: SampleSet, n_bins: int = 20, density: bool = False) ‑> Tuple[dict, dict, dict]

Compare the current SampleSet to another SampleSet.

The function only compares the selected info columns shared by both SampleSets, computing the Jensen-Shannon distance between histograms of that information.

Args

ss
SampleSet to compare to
n_bins
number of bins we will sort both sample sets by
density
whether histograms should be in counts or density

Returns

Tuple of the following:

  • ss1_stats: dictionary of stats describing the first SampleSet
  • ss2_stats: dictionary of stats describing the second SampleSet
  • col_comparisons: dictionary of distance values comparing each stat
def compare_to(self, ss: SampleSet, n_bins: int = 20, density: bool = False) \
        -> Tuple[dict, dict, dict]:
    """Compare the current `SampleSet` to another `SampleSet`.

    The function only compares the selected `info` columns shared by
    both `SampleSet`s, computing the Jensen-Shannon distance between
    histograms of that information.

    Args:
        ss: `SampleSet` to compare to
        n_bins: number of bins we will sort both sample sets by
        density: whether histograms should be in counts or density

    Returns:
        Tuple of the following:

        - ss1_stats: dictionary of stats describing the first `SampleSet`
        - ss2_stats: dictionary of stats describing the second `SampleSet`
        - col_comparisons: dictionary of distance values comparing each stat
    """
    TARGET_SUMMARY_STATS = ["min", "max", "median", "mean", "std"]
    STAT_PRECISION = 3

    # Get info columns we want to compare
    info_columns = [x for x in list(self.DEFAULT_INFO_COLUMNS)
                    if x not in list(self.DEFAULT_EXCLUSIONS_FROM_COMPARISON)]

    # Get both sample sets comparable columns of data
    info_df1 = self.info[info_columns]
    info_df2 = ss.info[info_columns]

    # Check valid columns in each sample set (cannot have None or 0)
    ss1_valid_cols = [
        c for c in info_df1.columns
        if pd.to_numeric(info_df1[c], errors="coerce").notnull().all() and any(info_df1[c])
    ]
    ss2_valid_cols = [
        c for c in info_df2.columns
        if pd.to_numeric(info_df2[c], errors="coerce").notnull().all() and any(info_df2[c])
    ]

    # Remove non shared column lists
    for i in ss1_valid_cols:
        if i not in ss2_valid_cols:
            ss1_valid_cols.remove(i)
    for i in ss2_valid_cols:
        if i not in ss1_valid_cols:
            ss2_valid_cols.remove(i)

    # Remove non shared column data
    info_df1 = info_df1[ss1_valid_cols]
    info_df2 = info_df2[ss2_valid_cols]

    # Bin the data
    ss1_stats = {}
    ss2_stats = {}
    col_comparisons = {}
    for i in ss1_valid_cols:
        ss1_stats[i] = {}
        ss2_stats[i] = {}
        hist_range = (
            min(min(info_df1[i]), min(info_df2[i])),
            max(max(info_df1[i]), max(info_df2[i]))
        )
        # Get data in bins and get counts for each bin
        hist1, bins1 = np.histogram(info_df1[i], bins=n_bins, range=hist_range, density=density)
        hist2, bins2 = np.histogram(info_df2[i], bins=n_bins, range=hist_range, density=density)
        ss1_stats[i]["density"] = density
        ss2_stats[i]["density"] = density
        ss1_stats[i]["hist"] = hist1
        ss2_stats[i]["hist"] = hist2
        ss1_stats[i]["bins"] = bins1
        ss2_stats[i]["bins"] = bins2
        ss1_stats[i]["range"] = hist_range
        ss2_stats[i]["range"] = hist_range

        summary_stats_df1 = info_df1[i].agg(TARGET_SUMMARY_STATS).round(decimals=STAT_PRECISION)
        ss1_stats[i].update(summary_stats_df1.to_dict())
        summary_stats_df2 = info_df2[i].agg(TARGET_SUMMARY_STATS).round(decimals=STAT_PRECISION)
        ss2_stats[i].update(summary_stats_df2.to_dict())

        js_dist = distance.jensenshannon(hist1, hist2, axis=0)
        col_comparisons[i] = js_dist

    return ss1_stats, ss2_stats, col_comparisons
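The core of the comparison — histogramming two samples over a shared range and taking the Jensen-Shannon distance — can be sketched standalone, mirroring the method above (the data here is synthetic):

```python
import numpy as np
from scipy.spatial import distance

rng = np.random.default_rng(42)
x1 = rng.normal(0.0, 1.0, 1000)  # stand-in for one info column
x2 = rng.normal(0.5, 1.0, 1000)  # same column from the other SampleSet

# Shared range so the histogram bins line up
hist_range = (min(x1.min(), x2.min()), max(x1.max(), x2.max()))
hist1, _ = np.histogram(x1, bins=20, range=hist_range)
hist2, _ = np.histogram(x2, bins=20, range=hist_range)

# 0 = identical distributions; larger = more different
js_dist = distance.jensenshannon(hist1, hist2)
```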
def concat(self, ss_list: list)

Vertically concatenate multiple SampleSets into one SampleSet.

Combining of non-DataFrame properties (e.g., detector_info, synthesis_info, etc.) is NOT performed - it is the responsibility of the user to ensure proper bookkeeping for these properties when concatenating SampleSets.

This method is most applicable to SampleSets containing measured data from the same detector which could not be made as a single SampleSet because of how measurements had to be taken (i.e., measurements were taken by distinct processes separated by time). Therefore, the user should avoid concatenating SampleSets when:

  • the data is from different detectors (note that SampleSets are given to models for prediction, and models should be 1-to-1 with physical detectors);
  • a mix of synthetic and measured data would occur (this could make an existing synthesis_info value ambiguous, but one way to handle this would be to add a new column to info distinguishing between synthetic and measured on a per-sample basis).

Args

ss_list
list of SampleSets to concatenate

Returns

SampleSet object

def concat(self, ss_list: list):
    """Vertically concatenate multiple `SampleSet`s into one `SampleSet`.

    Combining of non-DataFrame properties (e.g., detector_info, synthesis_info, etc.)
    is NOT performed - it is the responsibility of the user to ensure proper bookkeeping for
    these properties when concatenating `SampleSet`s.

    This method is most applicable to `SampleSet`s containing measured data from the same
    detector which could not be made as a single `SampleSet` because of how measurements had to
    be taken (i.e., measurements were taken by distinct processes separated by time).
    Therefore, the user should avoid concatenating `SampleSet`s when:

    - the data is from different detectors
    (note that `SampleSet`s are given to models for prediction, and models should be 1-to-1
    with physical detectors);
    - a mix of synthetic and measured data would occur (this could make an existing
    `synthesis_info` value ambiguous, but one way to handle this would be to add a new column
    to `info` distinguishing between synthetic and measured on a per-sample basis).

    Args:
        ss_list: list of `SampleSet`s to concatenate

    Returns:
        `SampleSet` object
    """
    if not ss_list:
        return
    if not isinstance(ss_list, list) and not isinstance(ss_list, tuple):
        ss_list = [ss_list]
    spectra_types = list(
        set([self.spectra_type.value] + [x.spectra_type.value for x in ss_list])
    )
    n_spectra_types = len(spectra_types)
    if n_spectra_types == 2:
        if SpectraType.Background.value in spectra_types and \
           SpectraType.Foreground.value in spectra_types:
            self.spectra_type = SpectraType.BackgroundForeground
        else:
            self.spectra_type = SpectraType.Unknown
    elif n_spectra_types == 0 or n_spectra_types > 2:
        self.spectra_type = SpectraType.Unknown

    self._spectra = pd.concat(
        [self._spectra] + [ss.spectra for ss in ss_list],
        ignore_index=True,
        sort=False
    )
    self._sources = pd.concat(
        [self._sources] + [ss.sources for ss in ss_list],
        ignore_index=True,
        sort=False
    )
    self._sources = self._sources.where(pd.notnull(self._sources), 0)
    existing_info_df = self._info if not self._info.empty else None
    self._info = pd.concat(
        [existing_info_df] + [ss.info for ss in ss_list],
        ignore_index=True,
        sort=False
    )
    self._info = self._info.where(pd.notnull(self._info), None)
    self._prediction_probas = pd.concat(
        [self._prediction_probas] + [ss.prediction_probas for ss in ss_list],
        ignore_index=True,
        sort=False
    )
    self._prediction_probas = self._prediction_probas.where(
        pd.notnull(self._prediction_probas),
        0
    )
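Each internal DataFrame is stacked with `pd.concat(..., ignore_index=True)`, then residual NaNs from column mismatches are filled, as in the method above; a minimal sketch of the pattern (the toy frames are made up):

```python
import pandas as pd

spectra1 = pd.DataFrame([[1.0, 2.0]], columns=[0, 1])
spectra2 = pd.DataFrame([[3.0, 4.0]], columns=[0, 1])

# Vertically stack, discarding the original row indices
combined = pd.concat([spectra1, spectra2], ignore_index=True, sort=False)
# Fill any unmatched columns with 0 (as done for spectra/sources above)
combined = combined.where(pd.notnull(combined), 0)
```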
def downsample_spectra(self, target_bins: int = 128, min_frac=1e-08)

Uniformly down-bin spectra.

Args

target_bins
number of channels to which to bin the existing spectra

Raises

ValueError when binning is not a multiple of the target binning

def downsample_spectra(self, target_bins: int = 128, min_frac=1e-8):
    """Uniformly down-bin spectra.

    Args:
        target_bins: number of channels to which to bin the existing spectra

    Raises:
        `ValueError` when binning is not a multiple of the target binning
    """
    if self.n_channels % target_bins == 0:
        n_per_group = int(self.n_channels / target_bins)
        transformation = np.zeros([target_bins, self.n_channels])
        for i in range(target_bins):
            transformation[i, (i * n_per_group):((i * n_per_group) + n_per_group)] = 1
    else:
        msg = "Current binning ({}) is not a multiple of the target binning ({})".format(
            self.n_channels,
            target_bins
        )
        raise ValueError(msg)
    self._spectra = pd.DataFrame(
        data=np.matmul(
            self._spectra.values,
            transformation.T
        )
    )
    self._spectra[self._spectra < min_frac] = 0
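The down-binning is a matrix multiply with a 0/1 aggregation matrix, as in the method above; a self-contained sketch (the example spectrum is made up):

```python
import numpy as np

def downsample(spectra, target_bins):
    """Sum adjacent channels into target_bins groups via a 0/1 transformation matrix."""
    n_channels = spectra.shape[1]
    if n_channels % target_bins != 0:
        raise ValueError("Current binning is not a multiple of the target binning")
    n_per_group = n_channels // target_bins
    transformation = np.zeros((target_bins, n_channels))
    for i in range(target_bins):
        transformation[i, i * n_per_group:(i + 1) * n_per_group] = 1
    return spectra @ transformation.T

spectra = np.arange(8.0).reshape(1, 8)  # [[0, 1, ..., 7]]
binned = downsample(spectra, 4)         # [[1., 5., 9., 13.]] (pairwise sums)
```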
def drop_sources(self, col_names: Iterable = ['Cosmic', 'PotassiumInSoil', 'UraniumInSoil', 'ThoriumInSoil'], normalize_sources: bool = True, target_level: str = 'Seed')

Drop columns from sources.

Args

col_names
names of the sources columns to be dropped. The names of background seeds are used by default as removing the ground truth for background sources when dealing with synthetic, gross spectra is a common operation.
normalize_sources
whether to normalize the sources DataFrame after dropping the column(s)
target_level
SampleSet.sources column level to use
def drop_sources(self, col_names: Iterable = DEFAULT_BG_SEED_NAMES,
                 normalize_sources: bool = True,
                 target_level: str = "Seed"):
    """Drop columns from `sources`.

    Args:
        col_names: names of the sources columns to be dropped.
            The names of background seeds are used by default as removing the
            ground truth for background sources when dealing with synthetic,
            gross spectra is a common operation.
        normalize_sources: whether to normalize the sources DataFrame after dropping
            the column(s)
        target_level: `SampleSet.sources` column level to use
    """
    self.sources.drop(col_names, axis=1, inplace=True, level=target_level)
    if normalize_sources:
        self.normalize_sources()
def drop_sources_columns_with_all_zeros(self)

Remove columns from the sources DataFrame that contain only zeros.

Modifications are made in-place. Returns a boolean mask indicating which columns were kept.

def drop_sources_columns_with_all_zeros(self):
    """Remove columns from the sources `DataFrame` that contain only zeros.

    Modifications are made in-place.
    """
    idxs = (self.sources != 0).any(axis=0)
    self.sources = self.sources.loc[:, idxs]
    self.sources.columns = self.sources.columns.remove_unused_levels()

    return idxs
def drop_spectra_with_no_contributors(self) ‑> numpy.ndarray

Remove samples where the spectrum has no recorded contributor.

Modifications are made in-place.

Returns

A boolean mask of which rows were kept

def drop_spectra_with_no_contributors(self) -> np.ndarray:
    """Remove samples where the spectrum has no recorded contributor.

    Modifications are made in-place.

    Returns:
        A boolean mask of which rows were kept
    """
    idxs = np.where(self.sources.values.sum(axis=1) == 0)[0]
    kept_mask = ~np.in1d(self.sources.index.values, idxs)

    self.spectra = self.spectra.drop(index=idxs).reset_index(drop=True)
    self.sources = self.sources.drop(index=idxs).reset_index(drop=True)
    self.info = self.info.drop(index=idxs).reset_index(drop=True)

    return kept_mask
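The kept-row mask simply flags rows of `sources` whose values sum to zero; a small sketch of that selection (the source matrix is made up):

```python
import numpy as np

sources = np.array([
    [0.5, 0.5],  # has contributors -> kept
    [0.0, 0.0],  # no contributors -> dropped
    [1.0, 0.0],  # kept
])
kept_mask = sources.sum(axis=1) != 0
kept_rows = sources[kept_mask]
```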
def extend(self, spectra: Union[dict, list, np.array, pd.DataFrame], sources: pd.DataFrame, info: Union[list, pd.DataFrame])

Extend the current SampleSet with the given data.

Always verify that the data was appended as you expect for the input type you are working with.

Args

spectra
spectra to append to the current spectra DataFrame
sources
sources to append to the current sources DataFrame
info
info to append to the current info DataFrame

Raises

ValueError when internal DataFrame lengths do not match

def extend(self, spectra: Union[dict, list, np.array, pd.DataFrame],
           sources: pd.DataFrame, info: Union[list, pd.DataFrame]):
    """Extend the current SampleSet with the given data.

    Always verify that the data was appended as you expect for the input type
    you are working with.

    Args:
        spectra: spectra to append to the current spectra DataFrame
        sources: sources to append to the current sources DataFrame
        info: info to append to the current info DataFrame

    Raises:
        `ValueError` when internal `DataFrame` lengths do not match
    """
    if not spectra.shape[0] == sources.shape[0] == info.shape[0]:
        msg = "The number of rows in each of the required positional arguments must be same."
        raise ValueError(msg)

    self._spectra = self._spectra\
        .append(pd.DataFrame(spectra), ignore_index=True, sort=True)\
        .fillna(0)

    new_sources_column_index = pd.MultiIndex.from_tuples(
        sources.keys(),
        names=SampleSet.SOURCES_MULTI_INDEX_NAMES
    )
    new_sources_df = pd.DataFrame(sources.values(), columns=new_sources_column_index)
    self._sources = self._sources\
        .append(new_sources_df, ignore_index=True, sort=True)\
        .fillna(0)

    self._info = self._info\
        .append(pd.DataFrame(info), ignore_index=True, sort=True)
def get_all_channel_energies(self, fractional_energy_bins=None) ‑> numpy.ndarray

Get the energy (in keV) represented by the lower edge of each channel for all samples.

See docstring for _channels_to_energies() for more details.

Args

fractional_energy_bins
array of fractions representing the spacing of the channels (for arbitrary channel structures)

Returns

2-D array of energy values in keV for all samples

def get_all_channel_energies(self, fractional_energy_bins=None) -> np.ndarray:
    """Get the energy (in keV) represented by the lower edge of each channel for all samples.

    See docstring for `_channels_to_energies()` for more details.

    Args:
        fractional_energy_bins: array of fractions representing the spacing of
            the channels (for arbitrary channel structures)

    Returns:
        2-D array of energy values in keV for all samples
    """
    if fractional_energy_bins is None:
        fractional_energy_bins = np.linspace(0, 1, self.n_channels)

    all_channel_energies = []
    for i in range(self.n_samples):
        channel_energies = self.get_channel_energies(i, fractional_energy_bins)
        all_channel_energies.append(channel_energies)

    return all_channel_energies
def get_channel_energies(self, sample_index, fractional_energy_bins=None) ‑> numpy.ndarray

Get the energy (in keV) represented by the lower edge of each channel for a single sample at a specific index.

See docstring for _channels_to_energies() for more details.

Args

sample_index
index of the specific sample for which to obtain energies
fractional_energy_bins
array of fractions representing the spacing of the channels (for arbitrary channel structures)

Returns

Array of energy values in keV

def get_channel_energies(self, sample_index, fractional_energy_bins=None) -> np.ndarray:
    """Get the energy (in keV) represented by the lower edge of each channel for a
    single sample at a specific index.

    See docstring for `_channels_to_energies()` for more details.

    Args:
        sample_index: index of the specific sample for which to obtain energies
        fractional_energy_bins: array of fractions representing the spacing of
            the channels (for arbitrary channel structures)

    Returns:
        Array of energy values in keV
    """
    if fractional_energy_bins is None:
        fractional_energy_bins = np.linspace(0, 1, self.n_channels)

    # TODO: raise error if ecal info is missing for any row

    offset, gain, quadratic, cubic, low_energy = self.ecal[sample_index]
    channel_energies = self._channels_to_energies(
        fractional_energy_bins,
        offset, gain, quadratic, cubic, low_energy
    )

    return channel_energies
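The mapping from fractional channel position to energy is polynomial in the calibration terms. A hypothetical sketch of the idea (the exact formula, including the low-energy term, lives in the private `_channels_to_energies()` and may differ):

```python
import numpy as np

def channels_to_energies(frac, offset, gain, quadratic, cubic):
    """Hypothetical cubic calibration: E(x) = c0 + c1*x + c2*x^2 + c3*x^3."""
    frac = np.asarray(frac, dtype=float)
    return offset + gain * frac + quadratic * frac**2 + cubic * frac**3

frac_bins = np.linspace(0, 1, 5)  # [0, 0.25, 0.5, 0.75, 1]
energies = channels_to_energies(frac_bins, 0.0, 3000.0, 0.0, 0.0)
# purely linear case: lower channel edges at [0, 750, 1500, 2250, 3000] keV
```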
def get_confidences(self, fg_seeds_ss: SampleSet, bg_seed_ss: SampleSet = None, bg_cps: float = None, is_lpe: bool = False, confidence_func: Callable = None, **confidence_func_kwargs) ‑> numpy.ndarray

Get confidence measure for predictions as a NumPy array.

Args

fg_seeds_ss
Sampleset containing foreground seeds
bg_seed_ss
Sampleset containing a single background seed
bg_cps
count rate used to scale bg_seed_ss
is_lpe
whether predictions are for multi-class classification or label proportion estimation (LPE)
confidence_func
metric function used to compare spectral reconstructions with measured spectra
confidence_func_kwargs
kwargs for confidence metric function

Returns

Array containing confidence metric for each prediction

def get_confidences(self, fg_seeds_ss: SampleSet, bg_seed_ss: SampleSet = None,
                    bg_cps: float = None, is_lpe: bool = False,
                    confidence_func: Callable = None,
                    **confidence_func_kwargs) -> np.ndarray:
    """Get confidence measure for predictions as a NumPy array.

    Args:
        fg_seeds_ss: `Sampleset` containing foreground seeds
        bg_seed_ss: `Sampleset` containing a single background seed
        bg_cps: count rate used to scale `bg_seed_ss`
        is_lpe: whether predictions are for multi-class classification or label proportion
            estimation (LPE)
        confidence_func: metric function used to compare spectral reconstructions with measured spectra
        confidence_func_kwargs: kwargs for confidence metric function

    Returns:
        Array containing confidence metric for each prediction
    """
    if fg_seeds_ss.spectra_type != SpectraType.Foreground:
        msg = (
            "`fg_seeds_ss` must have a `spectra_type` of `Foreground`. "
            f"Its `spectra_type` is `{fg_seeds_ss.spectra_type}`."
        )
        raise ValueError(msg)
    if self.spectra_type == SpectraType.Gross:
        if self.info.total_counts.isna().any() or (self.info.total_counts <= 0).any():
            msg = (
                "This `SampleSet` must have the `info.total_counts` column filled in "
                "with values greater than zero."
                "If there are samples with spectra of all zeros, remove them first."
            )
            raise ValueError(msg)
        if self.info.live_time.isna().any() or (self.info.live_time <= 0).any():
            msg = (
                "This `SampleSet` must have the `info.live_time` column filled in "
                "with values greater than zero."
                "If there are samples with live of zero, remove them first."
            )
            raise ValueError(msg)
        if not bg_seed_ss:
            raise ValueError("`bg_seed_ss` is required for current `spectra_type` of `Gross`.")
        if bg_seed_ss.n_samples != 1:
            raise ValueError("`bg_seed_ss` must have exactly one sample.")
        if bg_seed_ss.spectra_type != SpectraType.Background:
            msg = (
                "`bg_seed_ss` must have a `spectra_type` of `Background`. "
                f"Its `spectra_type` is `{bg_seed_ss.spectra_type}`."
            )
            raise ValueError(msg)
        if not bg_cps or bg_cps <= 0:
            raise ValueError("Positive background count rate required.")

    if confidence_func is None:
        from riid.losses import jensen_shannon_divergence
        confidence_func = jensen_shannon_divergence

    if is_lpe:
        recon_proportions = self.prediction_probas.values
    else:
        max_indices = np.argmax(self.prediction_probas.values, axis=1)
        recon_proportions = np.zeros(self.prediction_probas.values.shape)
        recon_proportions[np.arange(self.n_samples), max_indices] = 1

    reconstructions = np.dot(recon_proportions, fg_seeds_ss.spectra.values)

    if self.spectra_type == SpectraType.Gross:
        bg_spectrum = bg_seed_ss.spectra.iloc[0].values
        normalized_bg_spectrum = bg_spectrum / bg_spectrum.sum()

        bg_counts = bg_cps * self.info.live_time.values
        fg_counts = (self.info.total_counts.values - bg_counts).clip(1)

        scaled_bg_spectra = get_expected_spectra(normalized_bg_spectrum, bg_counts)
        scaled_fg_spectra = reconstructions * fg_counts[:, np.newaxis]

        reconstructions = scaled_fg_spectra + scaled_bg_spectra

    confidences = confidence_func(
        reconstructions.astype(np.float64),
        self.spectra.values,
        **confidence_func_kwargs
    )
    confidences = np.array(confidences)
    return confidences
def get_labels(self, target_level='Isotope', max_only=True, include_value=False, min_value=0.01, level_aggregation=None)

Get row ground truth labels for each sample based on sources values.

See docstring for _get_row_labels() for more details.

def get_labels(self, target_level="Isotope", max_only=True,
               include_value=False, min_value=0.01,
               level_aggregation=None):
    """Get row ground truth labels for each sample based on `sources` values.

    See docstring for `_get_row_labels()` for more details.
    """
    labels = _get_row_labels(
        self.sources,
        target_level=target_level,
        max_only=max_only,
        include_value=include_value,
        min_value=min_value,
        level_aggregation=level_aggregation,
    )
    return labels
def get_multiclass_jsds(self, fg_seeds_ss: SampleSet, target_level: str) ‑> list

For each sample, this constructs a dictionary mapping seed name to JSD using the sample's top prediction.

Args

fg_seeds_ss
SampleSet from which to pull seeds for computing JSD
target_level
the level at which predictions were made by a model

Returns

List of dictionaries where keys are seed name and values are JSD

def get_multiclass_jsds(self, fg_seeds_ss: SampleSet, target_level: str) -> list:
    """For each sample, this constructs a dictionary containing seed name to JSD using
    the sample's top prediction.

    Args:
        fg_seeds_ss: `SampleSet` from which to pull seeds for computing JSD
        target_level: the level at which predictions were made by a model

    Returns:
        List of dictionaries where keys are seed name and values are JSD
    """
    test_prediction_labels = self.get_predictions(target_level)
    fg_seed_labels = fg_seeds_ss.get_labels(target_level)

    jsds = []
    for i, pred_label in enumerate(test_prediction_labels):
        test_spectrum = self.spectra.iloc[i]
        seeds_for_pred = fg_seeds_ss[fg_seed_labels == pred_label]
        seed_labels_for_pred = seeds_for_pred.get_labels("Seed")
        jsds_for_sample = {}
        for j, seed_label in enumerate(seed_labels_for_pred):
            seed_spectrum = seeds_for_pred.spectra.iloc[j]
            jsd = distance.jensenshannon(test_spectrum, seed_spectrum)
            jsds_for_sample[seed_label] = jsd
        jsds.append(jsds_for_sample)
    return jsds
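`get_multiclass_jsds` relies on `scipy.spatial.distance.jensenshannon`. A minimal NumPy sketch of that metric (the square root of the Jensen-Shannon divergence, natural-log base) shows what each dictionary value represents; the function name here is illustrative, not part of the PyRIID API:

```python
import numpy as np

def jensen_shannon_distance(p, q):
    """Jensen-Shannon distance between two spectra treated as PMFs."""
    p = np.asarray(p, dtype=float)
    q = np.asarray(q, dtype=float)
    p = p / p.sum()  # normalize each spectrum to a probability mass function
    q = q / q.sum()
    m = 0.5 * (p + q)

    def _kl(a, b):
        # Kullback-Leibler divergence, skipping zero-probability bins
        mask = a > 0
        return np.sum(a[mask] * np.log(a[mask] / b[mask]))

    return np.sqrt(0.5 * _kl(p, m) + 0.5 * _kl(q, m))

same_shape = jensen_shannon_distance([1, 2, 3], [2, 4, 6])  # 0.0 (identical PMFs)
disjoint = jensen_shannon_distance([1, 0], [0, 1])          # sqrt(ln 2), the maximum
```

A small distance means the test spectrum closely resembles the seed template, which is why the per-seed JSDs above are useful for inspecting a model's top prediction.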
def get_predictions(self, target_level='Isotope', max_only=True, include_value=False, min_value=0.01, level_aggregation=None)

Get row predictions for each spectrum based on prediction_probas values.

See docstring for _get_row_labels() for more details.

def get_predictions(self, target_level="Isotope", max_only=True,
                    include_value=False, min_value=0.01,
                    level_aggregation=None):
    """Get row predictions for each spectrum based on `prediction_probas` values.

    See docstring for `_get_row_labels()` for more details.
    """
    labels = _get_row_labels(
        self.prediction_probas,
        target_level=target_level,
        max_only=max_only,
        include_value=include_value,
        min_value=min_value,
        level_aggregation=level_aggregation,
    )
    return labels
def get_samples(self) ‑> numpy.ndarray

Get spectra as a NumPy array.

Replaces NaN values with 0.

Returns

Array containing spectrum data

def get_samples(self) -> np.ndarray:
    """Get `spectra` as a NumPy array.

    Replaces NaN values with 0.

    Returns:
        Array containing spectrum data
    """
    spectra_values = np.nan_to_num(self._spectra)
    return spectra_values
def get_source_contributions(self, target_level='Isotope') ‑> numpy.ndarray

Get sources as a NumPy array.

Replaces NaN values with 0.

Returns

Array containing the ground truth contributions for each sample

def get_source_contributions(self, target_level="Isotope") -> np.ndarray:
    """Get `sources` as a NumPy array.

    Replaces NaN values with 0.

    Returns:
        Array containing the ground truth contributions for each sample
    """
    collapsed_sources = self.sources.T.groupby(target_level).sum().T
    sources_values = np.nan_to_num(collapsed_sources)
    return sources_values
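The collapse above groups `sources` columns by one level of the MultiIndex. A small standalone sketch with hypothetical seed names (the `Category`/`Isotope`/`Seed` level names mirror `SOURCES_MULTI_INDEX_NAMES`):

```python
import numpy as np
import pandas as pd

# Hypothetical sources frame: two Ba133 seeds and one Cs137 seed
columns = pd.MultiIndex.from_tuples(
    [
        ("NORM", "Ba133", "Ba133A"),
        ("NORM", "Ba133", "Ba133B"),
        ("Industrial", "Cs137", "Cs137A"),
    ],
    names=["Category", "Isotope", "Seed"],
)
sources = pd.DataFrame([[0.25, 0.25, 0.5]], columns=columns)

# Transpose, group rows by the "Isotope" level, sum, then transpose back
collapsed = sources.T.groupby("Isotope").sum().T
values = np.nan_to_num(collapsed)  # NumPy array with NaNs replaced by 0
```

Both Ba133 seed contributions merge into a single 0.5 entry, so the row still sums to 1.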
def get_spectral_distance_matrix(self, distance_func=<function jensenshannon>, target_level='Seed') ‑> pandas.core.frame.DataFrame

Compute the distance between all pairs of spectra.

This method is intended for use on seed spectra (i.e., templates). Calling this method on large samplesets, such as those produced by the static synthesizer, is not recommended. This method only computes the upper triangle and diagonal of the matrix since the lower triangle would be a copy of the upper triangle. When Poisson noise is present, the diagonal is still computed for two main reasons:

  • the distance between the same source will effectively never be zero.
  • there is sometimes research interest in comparing the diagonal to the upper triangle.

Args

distance_func
specific distance function to use
target_level
SampleSet.sources column level to use

Returns

A 2-D array of distance values

def get_spectral_distance_matrix(self, distance_func=distance.jensenshannon,
                                 target_level="Seed") -> pd.DataFrame:
    """Compute the distance between all pairs of spectra.

    This method is intended for use on seed spectra (i.e., templates).
    Calling this method on large samplesets, such as those produced by the static
    synthesizer, is not recommended.
    This method only computes the upper triangle and diagonal of the matrix since the lower
    triangle would be a copy of the upper triangle.
    When Poisson noise is present, the diagonal is still computed for two main reasons:

    - the distance between the same source will effectively never be zero.
    - there is sometimes research interest in comparing the diagonal to the upper triangle.

    Args:
        distance_func: specific distance function to use
        target_level: `SampleSet.sources` column level to use

    Returns:
        A 2-D array of distance values
    """
    distance_values = self._get_spectral_distances(distance_func)
    row_labels = self.get_labels(target_level=target_level)
    col_labels = self.sources.columns.get_level_values(level=target_level)
    distance_df = _get_distance_df_from_values(distance_values, row_labels, col_labels)

    return distance_df
def normalize(self, p: float = 1, clip_negatives: bool = True)

Apply L-p normalization to spectra in place.

The default is the L1 norm (p=1), which can be interpreted as forming a probability mass function (PMF). The L2 norm (p=2) yields unit energy (sum of squares == 1).

Reference: https://en.wikipedia.org/wiki/Parseval's_theorem

Args

p
exponent to which the spectra values are raised
clip_negatives
whether negative values will be removed from the spectra and replaced with 0
def normalize(self, p: float = 1, clip_negatives: bool = True):
    """Apply L-p normalization to `spectra` in place.

    The default is the L1 norm (p=1), which can be interpreted as forming a probability
    mass function (PMF). The L2 norm (p=2) yields unit energy (sum of squares == 1).

    Reference: https://en.wikipedia.org/wiki/Parseval's_theorem

    Args:
        p: exponent to which the spectra values are raised
        clip_negatives: whether negative values will be removed from the spectra and
            replaced with 0
    """
    if clip_negatives:
        self.clip_negatives()
    elif (self._spectra.values < 0).sum():
        logging.warning(
            "You are performing a normalization operation on spectra which contain negative " +
            "values.  Consider applying `clip_negatives`."
        )

    energy = (self._spectra.values ** p).sum(axis=1)[:, np.newaxis]
    energy[energy == 0] = 1
    self._spectra = pd.DataFrame(data=self._spectra.values / energy ** (1/p))
    if p == 1:
        self.spectra_state = SpectraState.L1Normalized
    elif p == 2:
        self.spectra_state = SpectraState.L2Normalized
    else:
        self.spectra_state = SpectraState.Unknown
def normalize_sources(self)

Normalize sources such that rows sum to 1.

def normalize_sources(self):
    """Normalize `sources` such that rows sum to 1."""
    row_sums = self._sources.sum(axis=1)
    row_sums[row_sums == 0] = 1
    self._sources = self._sources.clip(0).divide(
        row_sums,
        axis=0,
    )
def replace_nan(self, replace_value: float = 0)

Replace np.nan values.

Args

replace_value
value with which NaN will be replaced
def replace_nan(self, replace_value: float = 0):
    """Replace np.nan values.

    Args:
        replace_value: value with which NaN will be replaced

    """
    self._spectra = self._spectra.replace(np.nan, replace_value)
    self._sources = self._sources.replace(np.nan, replace_value)
    self._info = self._info.replace(np.nan, replace_value)
def sample(self, n_samples: int, random_seed: int = None) ‑> SampleSet

Randomly sample the SampleSet.

Args

n_samples
number of random samples to be returned
random_seed
seed value for random number generation

Returns

SampleSet of randomly selected measurements

def sample(self, n_samples: int, random_seed: int = None) -> SampleSet:
    """Randomly sample the SampleSet.

    Args:
        n_samples: number of random samples to be returned
        random_seed: seed value for random number generation

    Returns:
        `SampleSet` of randomly selected measurements
    """
    if random_seed is not None:
        random.seed(random_seed)

    if n_samples > self.n_samples:
        n_samples = self.n_samples

    random_indices = random.sample(self.spectra.index.values.tolist(), n_samples)
    random_mask = np.isin(self.spectra.index, random_indices)
    return self[random_mask]
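The selection logic can be sketched with the standard library's `random.sample` plus a NumPy membership mask, here on a plain index list standing in for `self.spectra.index`:

```python
import random

import numpy as np

index = list(range(10))            # stand-in for self.spectra.index
random.seed(42)                    # mirrors the optional random_seed argument
chosen = random.sample(index, 3)   # 3 distinct indices, drawn without replacement
mask = np.isin(index, chosen)      # boolean mask suitable for SampleSet slicing
```

Because `random.sample` draws without replacement, the returned subset never contains duplicate rows.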
def set_dead_time_proportions(self)

Computes dead time proportion for each sample and saves it in the SampleSet info.

def set_dead_time_proportions(self):
    """Computes dead time proportion for each sample and saves it in the `SampleSet` info.
    """
    dead_time_props = self._get_dead_time_proportions()
    self.info[self.DEAD_TIME_PROP_INFO_KEY] = dead_time_props
def shuffle(self, inplace: bool = True, random_state: int = None) ‑> SampleSet

Randomly reorder all DataFrames.

Args

inplace
whether to perform the operation in-place
random_state
random seed for reproducing specific shuffles

Returns

SampleSet object when inplace is False

def shuffle(self, inplace: bool = True, random_state: int = None) -> SampleSet:
    """Randomly reorder all DataFrames.

    Args:
        inplace: whether to perform the operation in-place
        random_state: random seed for reproducing specific shuffles

    Returns:
        `SampleSet` object when `inplace` is False
    """
    new_row_ordering = np.arange(self.n_samples)
    np.random.default_rng(random_state).shuffle(new_row_ordering)

    new_ss = self if inplace else self[:]

    new_ss.spectra = self.spectra.iloc[new_row_ordering]\
        .reset_index(drop=True)
    new_ss.sources = self.sources.iloc[new_row_ordering]\
        .reset_index(drop=True)
    new_ss.info = self.info.iloc[new_row_ordering]\
        .reset_index(drop=True)
    if not new_ss.prediction_probas.empty:
        new_ss.prediction_probas = self.prediction_probas.iloc[new_row_ordering]\
            .reset_index(drop=True)

    if not inplace:
        return new_ss
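The key detail above is that a single permutation is applied to every DataFrame, so rows stay aligned across `spectra`, `sources`, `info`, and `prediction_probas`. A sketch with two toy frames:

```python
import numpy as np
import pandas as pd

spectra = pd.DataFrame({"ch0": [1, 2, 3], "ch1": [4, 5, 6]})
info = pd.DataFrame({"live_time": [10.0, 20.0, 30.0]})

order = np.arange(len(spectra))
np.random.default_rng(0).shuffle(order)  # same seed -> same reordering

spectra = spectra.iloc[order].reset_index(drop=True)
info = info.iloc[order].reset_index(drop=True)
# Row i of spectra still corresponds to row i of info after the shuffle.
```

Drawing a fresh permutation per frame would silently decouple spectra from their ground truth, which is why one `order` array is reused.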
def sources_columns_to_dict(self, target_level='Isotope') ‑> Union[dict, list]

Convert the MultiIndex columns of sources to a dictionary.

Note: depending on target_level and sources columns, duplicate values are possible.

Args

target_level
SampleSet.sources column level to use

Returns

If target_level is "Category" or "Isotope," then a dict is returned. If target_level is "Seed," then a list is returned.

Raises

ValueError when target_level is invalid

def sources_columns_to_dict(self, target_level="Isotope") -> Union[dict, list]:
    """Convert the MultiIndex columns of `sources` to a dictionary.

    Note: depending on `target_level` and `sources` columns, duplicate values are possible.

    Args:
        target_level: `SampleSet.sources` column level to use

    Returns:
        If `target_level` is "Category" or "Isotope," then a dict is returned.
        If `target_level` is "Seed," then a list is returned.

    Raises:
        `ValueError` when `target_level` is invalid
    """
    self._check_target_level(
        target_level,
        levels_allowed=SampleSet.SOURCES_MULTI_INDEX_NAMES
    )

    d = {}
    column_tuples = self.sources.columns.to_list()
    if target_level == "Seed":
        d = column_tuples
    elif target_level == "Isotope":
        for t in column_tuples:
            _, i, _ = t
            if i not in d:
                d[i] = [t]
            if t not in d[i]:
                d[i].append(t)
    else:  # target_level == "Category":
        for t in column_tuples:
            c, i, _ = t
            if c not in d:
                d[c] = {}
            if i not in d[c]:
                d[c][i] = [t]
            if t not in d[c][i]:
                d[c][i].append(t)

    return d
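The isotope-level grouping above can be sketched for a standalone list of `(Category, Isotope, Seed)` tuples (hypothetical seed names; `setdefault` replaces the explicit membership checks for brevity):

```python
columns = [
    ("NORM", "Ba133", "Ba133A"),
    ("NORM", "Ba133", "Ba133B"),
    ("Industrial", "Cs137", "Cs137A"),
]

# Map each isotope to the full column tuples that fall under it
isotope_to_seeds = {}
for t in columns:
    _, isotope, _ = t
    isotope_to_seeds.setdefault(isotope, []).append(t)
```

With `target_level="Category"` the same idea simply adds one more nesting level keyed by category.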
def split_fg_and_bg(self, bg_seed_names: Iterable = ['Cosmic', 'PotassiumInSoil', 'UraniumInSoil', 'ThoriumInSoil']) ‑> Tuple[SampleSet, SampleSet]

Split the current SampleSet into two new SampleSets, one containing only foreground sources and the other containing only background sources.

Foreground sources are assumed to be anything that is not designated as a background source.

Args

bg_seed_names
names of the seeds which are considered background sources. This list can be customized to also extract atypical background sources such as calibration sources.

Returns

Two SampleSets, the first containing only foregrounds and the second only containing backgrounds.

def split_fg_and_bg(self, bg_seed_names: Iterable = DEFAULT_BG_SEED_NAMES) \
        -> Tuple[SampleSet, SampleSet]:
    """Split the current `SampleSet` into two new `SampleSet`s, one containing only foreground
    sources and the other containing only background sources.

    Foreground sources are assumed to be anything that is not designated as a background source.

    Args:
        bg_seed_names: names of the seeds which are considered background sources.
            This list can be customized to also extract atypical background sources such as
            calibration sources.

    Returns:
        Two `SampleSet`s, the first containing only foregrounds and the second only
        containing backgrounds.
    """
    seed_labels = self.get_labels(target_level="Seed")
    bg_mask = seed_labels.isin(bg_seed_names)

    fg_seeds_ss = self[~bg_mask]
    fg_seeds_ss.drop_sources_columns_with_all_zeros()
    fg_seeds_ss.spectra_type = SpectraType.Foreground
    fg_seeds_ss.spectra_state = self.spectra_state

    bg_seeds_ss = self[bg_mask]
    bg_seeds_ss.drop_sources_columns_with_all_zeros()
    bg_seeds_ss.spectra_type = SpectraType.Background
    bg_seeds_ss.spectra_state = self.spectra_state

    return fg_seeds_ss, bg_seeds_ss
def squash(self) ‑> SampleSet

Combine all rows of the SampleSet into a single row.

All data suited for summing is summed, otherwise the info from the first sample is used.

Returns

A flattened SampleSet

def squash(self) -> SampleSet:
    """Combine all rows of the `SampleSet` into a single row.

    All data suited for summing is summed, otherwise the info from the first
    sample is used.

    Returns:
        A flattened `SampleSet`
    """
    flat_spectra = self.spectra.sum(axis=0).to_frame().T
    flat_sources = self.sources.sum(axis=0).to_frame().T
    flat_prediction_probas = self.prediction_probas.sum(axis=0).to_frame().T
    flat_info = self.info.iloc[0].to_frame().T

    if "description" in flat_info:
        flat_info.description = "squashed"
    if "live_time" in flat_info:
        flat_info.live_time = self.info.live_time.sum()
    if "real_time" in flat_info:
        flat_info.real_time = self.info.real_time.sum()
    if "total_counts" in flat_info:
        flat_info.total_counts = self.info.total_counts.sum()
    if "snr" in flat_info:
        flat_info.snr = self.info.snr.sum()

    # Create a new SampleSet with the flattened data
    flat_ss = SampleSet()
    flat_ss.spectra = flat_spectra
    flat_ss.sources = flat_sources
    flat_ss.prediction_probas = flat_prediction_probas
    flat_ss.info = flat_info
    # Copy dictionary and other non-DataFrame objects from full ss
    flat_ss._detector_info = self._detector_info
    flat_ss._synthesis_info = self._synthesis_info
    flat_ss._measured_or_synthetic = self._measured_or_synthetic

    return flat_ss
def to_hdf(self, path: str | Path, verbose=False, **kwargs)

Write the SampleSet to disk as an HDF file.

Args

path
file path at which to save as an HDF file
verbose
whether to display detailed output
kwargs
additional arguments to be passed to the Pandas.HDFStore constructor

Raises

ValueError when provided path extension is invalid

def to_hdf(self, path: str | Path, verbose=False, **kwargs):
    """Write the `SampleSet` to disk as a HDF file.

    Args:
        path: file path at which to save as an HDF file
        verbose: whether to display detailed output
        kwargs: additional arguments to be passed to the `Pandas.HDFStore` constructor

    Raises:
        `ValueError` when provided path extension is invalid
    """
    path = Path(path)
    if path.suffix != riid.SAMPLESET_HDF_FILE_EXTENSION:
        logging.warning(f"Path does not end in {riid.SAMPLESET_HDF_FILE_EXTENSION}")

    _write_hdf(self, path, **kwargs)
    if verbose:
        logging.info(f"Saved SampleSet to '{path}'")
def to_json(self, path: str | Path, verbose=False)

Write the SampleSet to disk as a JSON file.

Warning: it is not recommended that you use this on large SampleSet objects. Consider SampleSet.to_hdf() instead in such cases.

Args

path
file path at which to save as a JSON file
verbose
whether to display detailed output

Raises

ValueError when provided path extension is invalid

def to_json(self, path: str | Path, verbose=False):
    """Write the `SampleSet` to disk as a JSON file.

    Warning: it is not recommended that you use this on large `SampleSet` objects.
    Consider `SampleSet.to_hdf()` instead in such cases.

    Args:
        path: file path at which to save as a JSON file
        verbose: whether to display detailed output

    Raises:
        `ValueError` when provided path extension is invalid
    """
    path = Path(path)
    if path.suffix != riid.SAMPLESET_JSON_FILE_EXTENSION:
        logging.warning(f"Path does not end in {riid.SAMPLESET_JSON_FILE_EXTENSION}")

    _write_json(self, path)
    if verbose:
        logging.info(f"Saved SampleSet to '{path}'")
def to_pcf(self, path: str | Path, verbose=False)

Write the SampleSet to disk as a PCF.

Args

path
file path at which to save as a PCF
verbose
whether to display detailed output

Raises

ValueError when provided path extension is invalid

def to_pcf(self, path: str | Path, verbose=False):
    """Write the `SampleSet` to disk as a PCF.

    Args:
        path: file path at which to save as a PCF
        verbose: whether to display detailed output

    Raises:
        `ValueError` when provided path extension is invalid
    """
    path = Path(path)
    if path.suffix != riid.PCF_FILE_EXTENSION:
        logging.warning(f"Path does not end in {riid.PCF_FILE_EXTENSION}")

    _dict_to_pcf(_ss_to_pcf_dict(self, verbose), path, verbose)

    if verbose:
        logging.info(f"Saved SampleSet to '{path}'")
def update_timestamp(self)

Set the timestamp for all samples to the current UTC date and time.

def update_timestamp(self):
    """Set the timestamp for all samples to the current UTC date and time."""
    self.info.timestamp = _get_utc_timestamp()
def upsample_spectra(self, target_bins: int = 4096)

Uniformly up-bin spectra.

Args

target_bins
number of bins into which the current spectra should be split.
def upsample_spectra(self, target_bins: int = 4096):
    """Uniformly up-bin spectra.

    Args:
        target_bins: number of bins into which the current spectra
            should be split.
    """
    if target_bins % self.n_channels == 0:
        n_per_group = int(target_bins / self.n_channels)
        transformation = np.zeros([target_bins, self.n_channels])
        for i in range(self.n_channels):
            transformation[(i * n_per_group):((i * n_per_group) + n_per_group), i] = \
                                1.0 / n_per_group
    else:
        raise ValueError(
            ("Desired number of bins ({}) is cannot"
             " be uniformly mapped from existing"
             " channels ({})").format(target_bins, self.n_channels))
    self._spectra = pd.DataFrame(
        data=np.matmul(
            self._spectra.values,
            transformation.T))
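The up-binning is a linear map: each original channel's counts are spread evenly across `target_bins / n_channels` new bins, so total counts per spectrum are preserved. A standalone sketch of the same transformation matrix:

```python
import numpy as np

spectra = np.array([[8.0, 4.0], [2.0, 6.0]])  # 2 samples, 2 channels
n_channels = spectra.shape[1]
target_bins = 8                                # must be a multiple of n_channels
n_per_group = target_bins // n_channels

# Each original channel maps to n_per_group new bins at weight 1/n_per_group
transformation = np.zeros((target_bins, n_channels))
for i in range(n_channels):
    transformation[i * n_per_group:(i + 1) * n_per_group, i] = 1.0 / n_per_group

upsampled = spectra @ transformation.T  # shape (2, 8); row sums unchanged
```

Since the columns of `transformation` each sum to 1, the map conserves total counts; that is why a non-divisible `target_bins` must raise instead of interpolating.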
class SeedMixer (seeds_ss: SampleSet, mixture_size: int = 2, dirichlet_alpha: float = 2.0, restricted_isotope_pairs: List[Tuple[str, str]] = [], rng: numpy.random._generator.Generator = None)

Randomly mixes seeds in a SampleSet

Args

seeds_ss
SampleSet of n seed spectra where n >= mixture_size
mixture_size
number of templates to mix
dirichlet_alpha
Dirichlet parameter controlling the nature of proportions
restricted_isotope_pairs
list of 2-tuples containing pairs of isotope strings that are not to be mixed together
rng
NumPy random number generator, useful for experiment repeatability

Raises

AssertionError when mixture_size is less than 2

class SeedMixer():
    """Randomly mixes seeds in a `SampleSet` """
    def __init__(self, seeds_ss: SampleSet, mixture_size: int = 2, dirichlet_alpha: float = 2.0,
                 restricted_isotope_pairs: List[Tuple[str, str]] = [], rng: Generator = None):
        """
        Args:
            seeds_ss: `SampleSet` of `n` seed spectra where `n` >= `mixture_size`
            mixture_size: number of templates to mix
            dirichlet_alpha: Dirichlet parameter controlling the nature of proportions
            restricted_isotope_pairs: list of 2-tuples containing pairs of isotope strings that
                are not to be mixed together
            rng: NumPy random number generator, useful for experiment repeatability

        Raises:
            AssertionError when `mixture_size` is less than 2
        """
        assert mixture_size >= 2

        self.seeds_ss = seeds_ss
        self.mixture_size = mixture_size
        self.dirichlet_alpha = dirichlet_alpha
        self.restricted_isotope_pairs = restricted_isotope_pairs
        if rng is None:
            self.rng = np.random.default_rng()
        else:
            self.rng = rng

        self._check_seeds()

    def _check_seeds(self, skip_health_check: bool = False):
        if not skip_health_check:
            self.seeds_ss.check_seed_health()
        n_sources_per_row = np.count_nonzero(
            self.seeds_ss.get_source_contributions(),
            axis=1
        )
        if not np.all(n_sources_per_row == 1):
            raise ValueError("At least one provided seed contains a mixture of sources.")
        if np.any(np.count_nonzero(self.seeds_ss.get_source_contributions(), axis=1) == 0):
            raise ValueError("At least one provided seed contains no ground truth.")
        for ecal_column in self.seeds_ss.ECAL_INFO_COLUMNS:
            ecal_values_consistent = np.all(np.isclose(
                self.seeds_ss.info[ecal_column],
                self.seeds_ss.info[ecal_column].iloc[0]
            ))
            if not ecal_values_consistent:
                raise ValueError((
                    f"{ecal_column} is not consistent. "
                    "All seeds must have the same energy calibration."
                ))

    def __call__(self, n_samples: int, max_batch_size: int = 100,
                 skip_health_check: bool = False) -> Iterator[SampleSet]:
        """Yields batches of seeds one at a time until a specified number of samples has
        been reached.

        Dirichlet intuition:

        - Higher alpha: values will converge on 1/N where N is mixture size
        - Lower alpha: values will converge on ~0 but there will be a single 1

        Using `np.random.dirichlet` with too small of an alpha will result in NaNs
            (per https://github.com/rust-random/rand/pull/1209)
        Using `numpy.random.Generator.dirichlet` instead avoids this.

        TODO: seed-level restrictions

        Args:
            n_samples: total number of mixture seeds to produce across all batches
            max_batch_size: maximum size of a batch per yield
            skip_health_check: whether to skip the seed health check

        Returns:
            Generator of `SampleSet`s
        """
        self._check_seeds(skip_health_check)

        isotope_to_seeds = self.seeds_ss.sources_columns_to_dict(target_level="Isotope")
        isotopes = list(isotope_to_seeds.keys())
        seeds = list(isotope_to_seeds.values())  # not necessarily distinct
        seeds = [item for sublist in seeds for item in sublist]
        n_seeds = len(seeds)
        n_distinct_seeds = len(set(seeds))
        if n_distinct_seeds != n_seeds:
            raise ValueError("Seed names must be unique.")
        isotope_probas = list([len(isotope_to_seeds[i]) / n_seeds for i in isotopes])
        spectra_row_labels = self.seeds_ss.sources.idxmax(axis=1)
        restricted_isotope_bidict = bidict({k: v for k, v in self.restricted_isotope_pairs})
        seed_spectra_df = self.seeds_ss.spectra.copy()
        seed_spectra_df.index = spectra_row_labels

        try:
            _ = iter(self.dirichlet_alpha)
        except TypeError:
            seed_to_alpha = {s: self.dirichlet_alpha for s in seeds}
        else:
            if n_seeds != len(self.dirichlet_alpha):
                raise ValueError("Number of Dirichlet alphas does not equal the number of seeds.")
            seed_to_alpha = {s: a for s, a in zip(seeds, self.dirichlet_alpha)}

        n_samples_produced = 0
        while n_samples_produced < n_samples:
            batch_size = n_samples - n_samples_produced
            if batch_size > max_batch_size:
                batch_size = max_batch_size
            # Make batch
            isotope_choices = [
                get_choices(
                    [],
                    isotopes.copy(),
                    np.array(isotope_probas.copy()),
                    restricted_isotope_bidict,
                    self.mixture_size,
                    self.rng,
                )
                for _ in range(batch_size)
            ]
            seed_choices = [
                [isotope_to_seeds[i][self.rng.choice(len(isotope_to_seeds[i]))] for i in c]
                for c in isotope_choices
            ]
            batch_dirichlet_alphas = np.array([
                [seed_to_alpha[i] for i in s]
                for s in seed_choices
            ])
            seed_ratios = [
                self.rng.dirichlet(
                    alpha=alpha
                ) for alpha in batch_dirichlet_alphas
            ]

            # Compute the spectra
            spectra = np.array([
                (seed_ratios[i] * seed_spectra_df.loc[m].T).sum(axis=1)
                for i, m in enumerate(seed_choices)
            ])

            # Build SampleSet
            batch_ss = SampleSet()
            batch_ss.detector_info = self.seeds_ss.detector_info
            batch_ss.spectra_state = self.seeds_ss.spectra_state
            batch_ss.spectra_type = self.seeds_ss.spectra_type
            batch_ss.spectra = pd.DataFrame(spectra)
            batch_ss.info = pd.DataFrame(
                [self.seeds_ss.info.iloc[0].values] * batch_size,
                columns=self.seeds_ss.info.columns
            )
            batch_sources_dfs = []
            for r, s in zip(seed_ratios, seed_choices):
                sources_cols = pd.MultiIndex.from_tuples(
                    s,
                    names=SampleSet.SOURCES_MULTI_INDEX_NAMES,
                )
                sources_df = pd.DataFrame([r], columns=sources_cols)
                batch_sources_dfs.append(sources_df)
            sources_df = pd\
                .concat(batch_sources_dfs)\
                .fillna(0.0)
            batch_ss.sources = sources_df.reindex(
                columns=self.seeds_ss.sources.columns,
                fill_value=0.0
            )

            n_samples_produced += batch_size

            yield batch_ss

    def generate(self, n_samples: int, max_batch_size: int = 100,
                 skip_health_check: bool = False) -> SampleSet:
        """Computes random mixtures of seeds at the isotope level.
        """
        batches = []
        batch_iterable = self(
            n_samples,
            max_batch_size=max_batch_size,
            skip_health_check=skip_health_check
        )
        for batch_ss in batch_iterable:
            batches.append(batch_ss)
        mixtures_ss = SampleSet()
        mixtures_ss.spectra_type = self.seeds_ss.spectra_type
        mixtures_ss.concat(batches)

        return mixtures_ss
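The Dirichlet intuition described in `__call__`'s docstring can be checked directly with `numpy.random.Generator.dirichlet`, which (unlike `np.random.dirichlet`) stays numerically stable for very small alphas:

```python
import numpy as np

rng = np.random.default_rng(0)
n = 3  # mixture size

high_alpha = rng.dirichlet(alpha=[100.0] * n)  # proportions cluster near 1/n
low_alpha = rng.dirichlet(alpha=[0.01] * n)    # tends toward one value near 1
```

Every draw is a valid set of mixture proportions (non-negative, summing to 1), which is exactly what populates each row of the batch `sources` DataFrame.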

Methods

def generate(self, n_samples: int, max_batch_size: int = 100, skip_health_check: bool = False) ‑> SampleSet

Computes random mixtures of seeds at the isotope level.

def generate(self, n_samples: int, max_batch_size: int = 100,
             skip_health_check: bool = False) -> SampleSet:
    """Computes random mixtures of seeds at the isotope level.
    """
    batches = []
    batch_iterable = self(
        n_samples,
        max_batch_size=max_batch_size,
        skip_health_check=skip_health_check
    )
    for batch_ss in batch_iterable:
        batches.append(batch_ss)
    mixtures_ss = SampleSet()
    mixtures_ss.spectra_type = self.seeds_ss.spectra_type
    mixtures_ss.concat(batches)

    return mixtures_ss
class SeedSynthesizer
class SeedSynthesizer():

    @contextmanager
    def _cwd(self, path):
        """Temporarily change working directory.

        This is used to change the execution location, which is necessary due to how the
        GADRAS API uses relative pathing from its installation directory.
        """
        oldpwd = os.getcwd()
        os.chdir(path)
        try:
            yield
        finally:
            os.chdir(oldpwd)

    def _get_detector_parameters(self, gadras_api) -> dict:
        params = {}
        for k in gadras_api.detectorGetParameters().Keys:
            if k not in DETECTOR_PARAMS:
                continue
            params[k] = gadras_api.detectorGetParameter(k)
        return params

    def _set_detector_parameters(self, gadras_api, new_parameters: dict, verbose=False,
                                 dry_run=False) -> None:
        for k, v in new_parameters.items():
            k_upper = k.upper()
            if k_upper in INJECT_PARAMS:
                continue
            v_type = DETECTOR_PARAMS[k_upper]["type"]

            if v_type == "float":
                gadras_api.detectorSetParameter(k_upper, float(v))
                if verbose:
                    print(f"i: Setting parameter '{k_upper}' to {v}")
            elif v_type == "int":
                gadras_api.detectorSetParameter(k_upper, int(v))
                if verbose:
                    print(f"i: Setting parameter '{k_upper}' to {v}")
            else:
                print(f"Warning: parameter '{k}'s type of {v_type} is not supported - not set.")
        if not dry_run:
            gadras_api.detectorSaveParameters()

    def generate(self, config: Union[str, dict],
                 normalize_spectra: bool = True, normalize_sources: bool = True,
                 verbose: bool = False) -> SampleSet:
        """Produce a `SampleSet` containing foreground and/or background seeds using GADRAS based
        on the given inject configuration.

        Args:
            config: a dictionary is treated as the config itself, containing the information
                needed to perform injects via the GADRAS API, while a string is treated as a
                path to a YAML file which is deserialized into a dictionary
            normalize_spectra: whether to divide each row of `SampleSet.spectra` by each row's sum
            normalize_sources: whether to divide each row of `SampleSet.sources` by each row's sum
            verbose: whether to show detailed output

        Returns:
            `SampleSet` containing foreground and/or background seeds generated by GADRAS
        """
        if isinstance(config, str):
            with open(config, "r") as stream:
                config = yaml.safe_load(stream)
        elif not isinstance(config, dict):
            msg = (
                "The provided config for seed synthesis must either be "
                "a path to a properly structured YAML file or "
                "a properly structured dictionary."
            )
            raise ValueError(msg)

        validate_inject_config(config)

        setups = get_inject_setups(config)
        with self._cwd(GADRAS_ASSEMBLY_PATH):
            gadras_api = get_gadras_api()
            detector_name = config["gamma_detector"]["name"]
            gadras_api.detectorSetCurrent(detector_name)
            original_detector_parameters = self._get_detector_parameters(gadras_api)
            if verbose:
                print(f"Obtaining sources for '{detector_name}'")

            source_list = []
            for s in setups:
                source_injector = SourceInjector(gadras_api)
                new_detector_parameters = s["gamma_detector"]["parameters"]
                now = _get_utc_timestamp().replace(":", "_")  # replace() prevents error on Windows
                rel_output_path = f"{now}_sources.pcf"
                try:
                    self._set_detector_parameters(gadras_api, new_detector_parameters, verbose)
                    # TODO: propagate dry_run to injectors
                    # Source injects
                    pcf_abs_path = source_injector.generate(s, rel_output_path, verbose=verbose)
                    seeds_ss = read_pcf(pcf_abs_path)
                    # Manually set distance_cm so it works with expanded configs
                    seeds_ss.info["distance_cm"] = new_detector_parameters["distance_cm"]
                    seeds_ss.info["height_cm"] = new_detector_parameters["height_cm"]
                    for k, v in new_detector_parameters.items():
                        if k not in DETECTOR_PARAMS or k.startswith("ECAL"):
                            # e-cal info will come from the PCF
                            continue
                        seeds_ss.info[k.lower()] = v
                    if not normalize_sources:
                        seeds_ss.sources *= seeds_ss.spectra.sum(axis=1).values
                    source_list.append(seeds_ss)
                except Exception as e:
                    # Try to restore .dat file to original state even when an error occurs
                    self._set_detector_parameters(gadras_api, original_detector_parameters)
                    raise e

            # Restore .dat file to original state
            self._set_detector_parameters(gadras_api, original_detector_parameters)

        ss = SampleSet()
        ss.spectra_state = SpectraState.Counts
        ss.concat(source_list)
        ss.detector_info = deepcopy(config["gamma_detector"])
        ss.set_dead_time_proportions()
        if normalize_spectra:
            ss.normalize()

        return ss

Methods

def generate(self, config: Union[str, dict], normalize_spectra: bool = True, normalize_sources: bool = True, verbose: bool = False) ‑> SampleSet

Produce a SampleSet containing foreground and/or background seeds using GADRAS based on the given inject configuration.

Args

config
a dictionary is treated as the config itself, containing the information needed to perform injects via the GADRAS API, while a string is treated as a path to a YAML file which is deserialized into a dictionary
normalize_spectra
whether to divide each row of SampleSet.spectra by each row's sum
normalize_sources
whether to divide each row of SampleSet.sources by each row's sum
verbose
whether to show detailed output

Returns

SampleSet containing foreground and/or background seeds generated by GADRAS
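For orientation, here is a hypothetical, minimal config fragment covering only the keys the method above is shown to read (`gamma_detector.name` and `gamma_detector.parameters`, including `distance_cm` and `height_cm`). The detector name is a placeholder; a real inject config contains additional fields (e.g., source definitions) and must pass `validate_inject_config()`.

```python
# Hypothetical fragment of an inject config; only the keys the generate()
# method above is shown to read. Real configs contain more fields and must
# pass validate_inject_config().
config = {
    "gamma_detector": {
        "name": "SomeDetector",        # placeholder detector name
        "parameters": {
            "distance_cm": 100.0,      # copied into SampleSet.info
            "height_cm": 50.0,         # copied into SampleSet.info
        },
    },
}
```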

def generate(self, config: Union[str, dict],
             normalize_spectra: bool = True, normalize_sources: bool = True,
             verbose: bool = False) -> SampleSet:
    """Produce a `SampleSet` containing foreground and/or background seeds using GADRAS based
    on the given inject configuration.

    Args:
        config: a dictionary is treated as the config itself, containing the information
            needed to perform injects via the GADRAS API, while a string is treated as a
            path to a YAML file which is deserialized into a dictionary
        normalize_spectra: whether to divide each row of `SampleSet.spectra` by each row's sum
        normalize_sources: whether to divide each row of `SampleSet.sources` by each row's sum
        verbose: whether to show detailed output

    Returns:
        `SampleSet` containing foreground and/or background seeds generated by GADRAS
    """
    if isinstance(config, str):
        with open(config, "r") as stream:
            config = yaml.safe_load(stream)
    elif not isinstance(config, dict):
        msg = (
            "The provided config for seed synthesis must either be "
            "a path to a properly structured YAML file or "
            "a properly structured dictionary."
        )
        raise ValueError(msg)

    validate_inject_config(config)

    setups = get_inject_setups(config)
    with self._cwd(GADRAS_ASSEMBLY_PATH):
        gadras_api = get_gadras_api()
        detector_name = config["gamma_detector"]["name"]
        gadras_api.detectorSetCurrent(detector_name)
        original_detector_parameters = self._get_detector_parameters(gadras_api)
        if verbose:
            print(f"Obtaining sources for '{detector_name}'")

        source_list = []
        for s in setups:
            source_injector = SourceInjector(gadras_api)
            new_detector_parameters = s["gamma_detector"]["parameters"]
            now = _get_utc_timestamp().replace(":", "_")  # replace() prevents error on Windows
            rel_output_path = f"{now}_sources.pcf"
            try:
                self._set_detector_parameters(gadras_api, new_detector_parameters, verbose)
                # TODO: propagate dry_run to injectors
                # Source injects
                pcf_abs_path = source_injector.generate(s, rel_output_path, verbose=verbose)
                seeds_ss = read_pcf(pcf_abs_path)
                # Manually set distance_cm so it works with expanded configs
                seeds_ss.info["distance_cm"] = new_detector_parameters["distance_cm"]
                seeds_ss.info["height_cm"] = new_detector_parameters["height_cm"]
                for k, v in new_detector_parameters.items():
                    if k not in DETECTOR_PARAMS or k.startswith("ECAL"):
                        # e-cal info will come from the PCF
                        continue
                    seeds_ss.info[k.lower()] = v
                if not normalize_sources:
                    seeds_ss.sources *= seeds_ss.spectra.sum(axis=1).values
                source_list.append(seeds_ss)
            except Exception as e:
                # Try to restore .dat file to original state even when an error occurs
                self._set_detector_parameters(gadras_api, original_detector_parameters)
                raise e

        # Restore .dat file to original state
        self._set_detector_parameters(gadras_api, original_detector_parameters)

    ss = SampleSet()
    ss.spectra_state = SpectraState.Counts
    ss.concat(source_list)
    ss.detector_info = deepcopy(config["gamma_detector"])
    ss.set_dead_time_proportions()
    if normalize_spectra:
        ss.normalize()

    return ss
class SpectraState (value, names=None, *, module=None, qualname=None, type=None, start=1)

States in which SampleSet spectra can exist.

class SpectraState(int, Enum):
    """States in which SampleSet spectra can exist."""
    Unknown = 0
    Counts = 1
    L1Normalized = 2
    L2Normalized = 3
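Because `SpectraState` subclasses both `int` and `Enum`, its members compare equal to plain integers and can be constructed from raw values. A minimal re-creation of the class above demonstrates the behavior:

```python
from enum import Enum

# Minimal copy of the SpectraState enum shown above, to illustrate what
# subclassing (int, Enum) provides.
class SpectraState(int, Enum):
    Unknown = 0
    Counts = 1
    L1Normalized = 2
    L2Normalized = 3

assert SpectraState.Counts == 1                       # compares as an int
assert SpectraState(2) is SpectraState.L1Normalized   # construct from value
```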

Ancestors

  • builtins.int
  • enum.Enum

Class variables

var Counts
var L1Normalized
var L2Normalized
var Unknown
class SpectraType (value, names=None, *, module=None, qualname=None, type=None, start=1)

Types for SampleSet spectra.

class SpectraType(int, Enum):
    """Types for SampleSet spectra."""
    Unknown = 0
    Background = 1
    Foreground = 2
    Gross = 3
    BackgroundForeground = 4

Ancestors

  • builtins.int
  • enum.Enum

Class variables

var Background
var BackgroundForeground
var Foreground
var Gross
var Unknown
class StaticSynthesizer (samples_per_seed: int = 100, live_time_function: str = 'uniform', live_time_function_args=(0.25, 8.0), snr_function: str = 'uniform', snr_function_args=(0.01, 100.0), bg_cps: float = 300.0, long_bg_live_time: float = 120.0, apply_poisson_noise: bool = True, normalize_sources: bool = True, return_fg: bool = True, return_gross: bool = False, rng: numpy.random._generator.Generator = Generator(PCG64))

Creates a set of synthetic gamma spectra from seed templates.

The "seed" templates are count-normalized spectra representing signature shapes of interest. The static synthesizer takes the seeds you have chosen and scales them up in terms of three components:

  • live time (the amount of time over which the spectrum was collected/integrated)
  • signal-to-noise ratio (SNR) (source counts divided by the square root of background counts, i.e., the number of standard deviations above background)
  • a fixed background rate (as a reference point)

The static synthesizer is meant to capture various count rates from sources in a statically placed detector scenario where the background can be characterized (and usually subtracted). Effects related to pile-up, scattering, or other shape-changing outcomes must be represented in the seeds.
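The SNR definition above fixes the relationship between the three scaling components. A quick worked example, using the default `bg_cps` of 300.0 from the signature and assumed draws for live time and target SNR:

```python
import math

# Worked example of the SNR relationship described above:
# SNR = source_counts / sqrt(background_counts).
bg_cps = 300.0        # fixed background rate (default from the signature)
live_time = 4.0       # seconds; assumed draw from the live time distribution
target_snr = 10.0     # assumed draw from the SNR distribution

bg_counts = bg_cps * live_time                 # 1200 expected background counts
fg_counts = target_snr * math.sqrt(bg_counts)  # source counts needed to hit the SNR

assert abs(fg_counts / math.sqrt(bg_counts) - target_snr) < 1e-9
```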

Args

samples_per_seed
number of synthetic samples to generate per source-background seed pair
live_time_function
string that names the method of sampling for target live time values (options: uniform, log10, discrete, list)
live_time_function_args
range of values which are sampled in the fashion specified by the live_time_function argument
snr_function
string that names the method of sampling for target signal-to-noise ratio values (options: uniform, log10, discrete, list)
snr_function_args
range of values which are sampled in the fashion specified by the snr_function argument
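The `uniform` and `log10` options named above can be illustrated with NumPy (a sketch of the sampling behavior, not the library's `get_distribution_values` implementation): `uniform` draws linearly between the bounds, while `log10` draws uniformly in log space so each decade is equally represented.

```python
import numpy as np

rng = np.random.default_rng(42)
lo, hi = 0.01, 100.0  # e.g. the default snr_function_args

# "uniform": sample linearly between the bounds.
uniform_draws = rng.uniform(lo, hi, size=1000)

# "log10": sample the exponent uniformly, so each decade between the
# bounds is equally represented.
log10_draws = 10 ** rng.uniform(np.log10(lo), np.log10(hi), size=1000)

assert uniform_draws.min() >= lo and uniform_draws.max() <= hi
assert log10_draws.min() >= lo and log10_draws.max() <= hi
```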
class StaticSynthesizer(Synthesizer):
    """Creates a set of synthetic gamma spectra from seed templates.

    The "seed" templates are count-normalized spectra representing signature shapes of interest.
    The static synthesizer takes the seeds you have chosen and scales them up in terms of three
    components:

    - live time (the amount of time over which the spectrum was collected/integrated)
    - signal-to-noise ratio (SNR) (source counts divided by the square root of background counts,
      i.e., the number of standard deviations above background)
    - a fixed background rate (as a reference point)

    The static synthesizer is meant to capture various count rates from sources in a statically
    placed detector scenario where the background can be characterized (and usually subtracted).
    Effects related to pile-up, scattering, or other shape-changing outcomes must be represented
    in the seeds.
    """

    def __init__(self, samples_per_seed: int = 100,
                 live_time_function: str = "uniform", live_time_function_args=(0.25, 8.0),
                 snr_function: str = "uniform", snr_function_args=(0.01, 100.0),
                 bg_cps: float = 300.0, long_bg_live_time: float = 120.0,
                 apply_poisson_noise: bool = True,
                 normalize_sources: bool = True,
                 return_fg: bool = True, return_gross: bool = False,
                 rng: Generator = np.random.default_rng()) -> None:
        """
        Args:
            samples_per_seed: number of synthetic samples to generate
                per source-background seed pair
            live_time_function: string that names the method of sampling
                for target live time values (options: uniform, log10, discrete, list)
            live_time_function_args: range of values which are sampled in the
                fashion specified by the `live_time_function` argument
            snr_function: string that names the method of sampling for target
                signal-to-noise ratio values (options: uniform, log10, discrete, list)
            snr_function_args: range of values which are sampled in the fashion
                specified by the `snr_function` argument
        """
        super().__init__(bg_cps, long_bg_live_time, apply_poisson_noise, normalize_sources,
                         return_fg, return_gross, rng)

        self.samples_per_seed = samples_per_seed
        self.live_time_function = live_time_function
        self.live_time_function_args = live_time_function_args
        self.snr_function = snr_function
        self.snr_function_args = snr_function_args

    # region Properties

    @property
    def samples_per_seed(self):
        """Get or set the number of samples to create per seed (excluding the background seed).

        Raises:
            `TypeError` when the provided `samples_per_seed` is not an integer.
        """
        return self._samples_per_seed

    @samples_per_seed.setter
    def samples_per_seed(self, value: int):
        if not isinstance(value, int):
            raise TypeError("Property 'samples_per_seed' must be of type 'int'!")

        self._samples_per_seed = value

    @property
    def live_time_function(self) -> str:
        """Get or set the function used to randomly sample the desired live time space.

        Raises:
            `ValueError` when an unsupported function type is provided.
        """
        return self._live_time_function

    @live_time_function.setter
    def live_time_function(self, value: str):
        if value not in self.SUPPORTED_SAMPLING_FUNCTIONS:
            raise ValueError("{} is not a valid function.".format(value))
        self._live_time_function = value

    @property
    def live_time_function_args(self) -> tuple:
        """Get or set the live time space to be randomly sampled."""
        return self._live_time_function_args

    @live_time_function_args.setter
    def live_time_function_args(self, value):
        self._live_time_function_args = value

    @property
    def snr_function(self) -> str:
        """Get or set the function used to randomly sample the desired signal-to-noise (SNR)
        ratio space.

        Raises:
            `ValueError` when an unsupported function type is provided.
        """
        return self._snr_function

    @snr_function.setter
    def snr_function(self, value: str):
        if value not in self.SUPPORTED_SAMPLING_FUNCTIONS:
            raise ValueError("{} is not a valid function.".format(value))
        self._snr_function = value

    @property
    def snr_function_args(self) -> tuple:
        """Get or set the signal-to-noise (SNR) space to be randomly sampled."""
        return self._snr_function_args

    @snr_function_args.setter
    def snr_function_args(self, value):
        self._snr_function_args = value

    # endregion

    def _get_concatenated_batches(self, ss_batches, n_samples_expected, spectra_type):
        ss = SampleSet()
        ss.measured_or_synthetic = self.SYNTHETIC_STR
        ss.spectra_state = SpectraState.Counts
        ss.spectra_type = spectra_type
        ss.concat(ss_batches)
        self._verify_n_samples_synthesized(ss.n_samples, n_samples_expected)
        if self.normalize_sources:
            ss.normalize_sources()
        return ss

    def _get_synthetic_samples(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet, verbose=True):
        """Iterate over each background, then each source, to generate a batch of spectra that
            target a set of SNR and live time values.
        """
        n_returns = self.return_fg + self.return_gross
        n_samples_per_return = self.samples_per_seed * fg_seeds_ss.n_samples * bg_seeds_ss.n_samples
        n_samples_expected = n_returns * n_samples_per_return
        fg_ss_batches = []
        gross_ss_batches = []

        lt_targets = get_distribution_values(self.live_time_function,
                                             self.live_time_function_args,
                                             n_samples_per_return,
                                             self._rng)
        snr_targets = get_distribution_values(self.snr_function,
                                              self.snr_function_args,
                                              n_samples_per_return,
                                              self._rng)
        batch_configs = [
            (
                b,
                f,
                self.samples_per_seed * (b * fg_seeds_ss.n_samples + f),
                self.samples_per_seed * (b * fg_seeds_ss.n_samples + f + 1),
            )
            for f in range(fg_seeds_ss.n_samples)
            for b in range(bg_seeds_ss.n_samples)
        ]

        fg_labels = fg_seeds_ss.get_labels(target_level="Seed", max_only=False,
                                           level_aggregation=None)
        for b, f, batch_begin_idx, batch_end_idx in batch_configs:
            bg_seed = bg_seeds_ss.spectra.iloc[b]
            bg_sources = bg_seeds_ss.sources.iloc[b]
            fg_seed = fg_seeds_ss.spectra.iloc[f]
            fg_sources = fg_seeds_ss.sources.iloc[f]
            batch_lt_targets = lt_targets[batch_begin_idx:batch_end_idx]
            batch_snr_targets = snr_targets[batch_begin_idx:batch_end_idx]

            fg_batch_ss, gross_batch_ss = self._get_batch(
                fg_seed, fg_sources,
                bg_seed, bg_sources,
                batch_lt_targets,
                batch_snr_targets
            )

            fg_seed_info = fg_seeds_ss.info.iloc[f]
            set_ss_info_from_seed_info(fg_batch_ss, fg_seed_info, self._synthesis_start_dt)
            set_ss_info_from_seed_info(gross_batch_ss, fg_seed_info, self._synthesis_start_dt)

            fg_ss_batches.append(fg_batch_ss)
            gross_ss_batches.append(gross_batch_ss)

            if verbose:
                self._report_progress(
                    n_samples_expected,
                    fg_labels[f]
                )

        fg_ss = gross_ss = None
        if self.return_fg:
            fg_ss = self._get_concatenated_batches(
                fg_ss_batches,
                n_samples_per_return,
                SpectraType.Foreground
            )
        if self.return_gross:
            gross_ss = self._get_concatenated_batches(
                gross_ss_batches,
                n_samples_per_return,
                SpectraType.Gross
            )

        return fg_ss, gross_ss

    def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet,
                 skip_health_check: bool = False,
                 verbose: bool = True) -> Tuple[SampleSet, SampleSet]:
        """Generate a `SampleSet` of gamma spectra from the provided config.

        Args:
            fg_seeds_ss: spectra normalized by total counts to be used
                as the source component(s) of spectra
            bg_seeds_ss: spectra normalized by total counts to be used
                as the background components of gross spectra
            fixed_bg_ss: single spectrum to be used as a fixed (or intrinsic)
                background source; live time information must be present.
                This spectrum is used to represent things like:

                - cosmic background (which is location-specific);
                - one or more calibration sources; or
                - intrinsic counts from the detector material (e.g., LaBr3).

                This spectrum will form the base of each background spectrum where seeds in
                `bg_seeds_ss`, which represent mixtures of K-U-T, get added on top.
                Note: this spectrum is not considered part of the `bg_cps` parameter,
                but is instead added on top of it.
            skip_health_check: whether to skip seed health checks
            verbose: whether to show detailed output

        Returns:
            Tuple of synthetic foreground and gross `SampleSet`s

        Raises:
        - `ValueError` when either foreground or background seeds are not provided
            - `AssertionError` if seed health check fails
        """
        if not fg_seeds_ss or not bg_seeds_ss:
            raise ValueError("At least one foreground and background seed must be provided.")
        if not skip_health_check:
            fg_seeds_ss.check_seed_health()
            bg_seeds_ss.check_seed_health()

        self._reset_progress()
        if verbose:
            tstart = time()

        fg_ss, gross_ss = self._get_synthetic_samples(
            fg_seeds_ss,
            bg_seeds_ss,
            verbose=verbose
        )

        if verbose:
            delay = time() - tstart
            self._report_completion(delay)

        return fg_ss, gross_ss
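Putting the pieces together: each synthesized gross spectrum conceptually scales a count-normalized foreground seed to hit a target SNR against the background and, when `apply_poisson_noise` is enabled, samples channel counts Poisson-wise. A pure-NumPy sketch of that idea (toy 8-channel seeds and assumed target values; not the library's `_get_batch`):

```python
import numpy as np

rng = np.random.default_rng(0)

# Count-normalized "seed" shapes (toy 8-channel spectra; each sums to 1).
fg_seed = np.array([0.0, 0.1, 0.4, 0.3, 0.1, 0.05, 0.05, 0.0])
bg_seed = np.full(8, 1 / 8)

bg_cps, live_time, target_snr = 300.0, 4.0, 10.0     # assumed targets
bg_counts = bg_cps * live_time
fg_counts = target_snr * np.sqrt(bg_counts)          # SNR definition from above

# Expected gross spectrum, then (optionally) Poisson-sampled counts.
expected_gross = fg_counts * fg_seed + bg_counts * bg_seed
gross_spectrum = rng.poisson(expected_gross)

assert np.isclose(expected_gross.sum(), fg_counts + bg_counts)
```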

Ancestors

Instance variables

var live_time_function : str

Get or set the function used to randomly sample the desired live time space.

Raises

ValueError when an unsupported function type is provided.

@property
def live_time_function(self) -> str:
    """Get or set the function used to randomly sample the desired live time space.

    Raises:
        `ValueError` when an unsupported function type is provided.
    """
    return self._live_time_function
var live_time_function_args : tuple

Get or set the live time space to be randomly sampled.

@property
def live_time_function_args(self) -> tuple:
    """Get or set the live time space to be randomly sampled."""
    return self._live_time_function_args
var samples_per_seed

Get or set the number of samples to create per seed (excluding the background seed).

Raises

TypeError when the provided samples_per_seed is not an integer.

@property
def samples_per_seed(self):
    """Get or set the number of samples to create per seed (excluding the background seed).

    Raises:
        `TypeError` when the provided `samples_per_seed` is not an integer.
    """
    return self._samples_per_seed
var snr_function : str

Get or set the function used to randomly sample the desired signal-to-noise (SNR) ratio space.

Raises

ValueError when an unsupported function type is provided.

@property
def snr_function(self) -> str:
    """Get or set the function used to randomly sample the desired signal-to-noise (SNR)
    ratio space.

    Raises:
        `ValueError` when an unsupported function type is provided.
    """
    return self._snr_function
var snr_function_args : tuple

Get or set the signal-to-noise (SNR) space to be randomly sampled.

@property
def snr_function_args(self) -> tuple:
    """Get or set the signal-to-noise (SNR) space to be randomly sampled."""
    return self._snr_function_args

Methods

def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet, skip_health_check: bool = False, verbose: bool = True) ‑> Tuple[SampleSet, SampleSet]

Generate a SampleSet of gamma spectra from the provided config.

Args

fg_seeds_ss
spectra normalized by total counts to be used as the source component(s) of spectra
bg_seeds_ss
spectra normalized by total counts to be used as the background components of gross spectra
fixed_bg_ss

single spectrum to be used as a fixed (or intrinsic) background source; live time information must be present. This spectrum is used to represent things like:

  • cosmic background (which is location-specific);
  • one or more calibration sources; or
  • intrinsic counts from the detector material (e.g., LaBr3).

This spectrum will form the base of each background spectrum where seeds in bg_seeds_ss, which represent mixtures of K-U-T, get added on top. Note: this spectrum is not considered part of the bg_cps parameter, but is instead added on top of it.

skip_health_check
whether to skip seed health checks
verbose
whether to show detailed output

Returns

Tuple of synthetic foreground and gross SampleSets

Raises

  • ValueError when either foreground or background seeds are not provided
  • AssertionError if seed health check fails
def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet,
             skip_health_check: bool = False,
             verbose: bool = True) -> Tuple[SampleSet, SampleSet]:
    """Generate a `SampleSet` of gamma spectra from the provided config.

    Args:
        fg_seeds_ss: spectra normalized by total counts to be used
            as the source component(s) of spectra
        bg_seeds_ss: spectra normalized by total counts to be used
            as the background components of gross spectra
        fixed_bg_ss: single spectrum to be used as a fixed (or intrinsic)
            background source; live time information must be present.
            This spectrum is used to represent things like:

            - cosmic background (which is location-specific);
            - one or more calibration sources; or
            - intrinsic counts from the detector material (e.g., LaBr3).

            This spectrum will form the base of each background spectrum where seeds in
            `bg_seeds_ss`, which represent mixtures of K-U-T, get added on top.
            Note: this spectrum is not considered part of the `bg_cps` parameter,
            but is instead added on top of it.
        skip_health_check: whether to skip seed health checks
        verbose: whether to show detailed output

    Returns:
        Tuple of synthetic foreground and gross `SampleSet`s

    Raises:
        - `ValueError` when either foreground or background seeds are not provided
        - `AssertionError` if seed health check fails
    """
    if not fg_seeds_ss or not bg_seeds_ss:
        raise ValueError("At least one foreground and background seed must be provided.")
    if not skip_health_check:
        fg_seeds_ss.check_seed_health()
        bg_seeds_ss.check_seed_health()

    self._reset_progress()
    if verbose:
        tstart = time()

    fg_ss, gross_ss = self._get_synthetic_samples(
        fg_seeds_ss,
        bg_seeds_ss,
        verbose=verbose
    )

    if verbose:
        delay = time() - tstart
        self._report_completion(delay)

    return fg_ss, gross_ss