Package riid
PyRIID is a Python package providing modeling and data synthesis utilities for machine learning-based research and development of radioisotope-related detection, identification, and quantification.
Installation
Requirements:
- Python version: 3.10 to 3.12
- Note: we recommend the highest Python version you can manage; anecdotally, newer versions tend to be faster.
- Operating systems: Windows, Mac, or Ubuntu
Tests and examples are run via Actions on many combinations of Python version and operating system. You can verify support for your platform by checking the workflow files.
For Use
To use the latest version on PyPI, run:
pip install riid
Note that changes are slower to appear on PyPI, so for the latest features, run:
pip install git+https://github.com/sandialabs/pyriid.git@main
For Development
If you are developing PyRIID, clone this repository and run:
pip install -e ".[dev]"
If you encounter Pylance issues, try:
pip install -e ".[dev]" --config-settings editable_mode=compat
Examples
Examples for how to use this package can be found here.
Tests
Unit tests for this package can be found here.
Run all unit tests with the following:
python -m unittest tests/*.py -v
You can also run one of the run_tests.* scripts, whichever is appropriate for your platform.
Docs
API documentation can be found here.
Docs can be built locally with the following:
pip install -r pdoc/requirements.txt
pdoc riid -o docs/ --html --template-dir pdoc
Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate and adhere to our code of conduct.
Contacts
Maintainers and authors can be found here.
Copyright
Full copyright details can be found here.
Acknowledgements
Thank you to the U.S. Department of Energy, National Nuclear Security Administration,
Office of Defense Nuclear Nonproliferation Research and Development (DNN R&D) for funding that has led to versions 2.0 and 2.1.
Additionally, thank you to the following individuals who have provided invaluable subject-matter expertise:
- Paul Thelen (also an author)
- Ben Maestas
- Greg Thoreson
- Michael Enghauser
- Elliott Leonard
Citing
When citing PyRIID, please reference the U.S. Department of Energy Office of Science and Technology Information (OSTI) record here: 10.11578/dc.20221017.2
Related Reports, Publications, and Projects
- Alan Van Omen, "A Semi-Supervised Model for Multi-Label Radioisotope Classification and Out-of-Distribution Detection." Diss. 2023. doi: 10.7302/7200.
- Tyler Morrow, "Questionnaire for Radioisotope Identification and Estimation from Gamma Spectra using PyRIID v2." United States: N. p., 2023. Web. doi: 10.2172/2229893.
- Aaron Fjeldsted, Tyler Morrow, and Douglas Wolfe, "Identifying Signal-to-Noise Ratios Representative of Gamma Detector Response in Realistic Scenarios," 2023 IEEE Nuclear Science Symposium, Medical Imaging Conference and International Symposium on Room-Temperature Semiconductor Detectors (NSS MIC RTSD), Vancouver, BC, Canada, 2023. doi: 10.1109/NSSMICRTSD49126.2023.10337860.
- Alan Van Omen and Tyler Morrow, "A Semi-supervised Learning Method to Produce Explainable Radioisotope Proportion Estimates for NaI-based Synthetic and Measured Gamma Spectra." United States: N. p., 2024. Web. doi: 10.2172/2335904.
- Alan Van Omen and Tyler Morrow, "Controlling Radioisotope Proportions When Randomly Sampling from Dirichlet Distributions in PyRIID." United States: N. p., 2024. Web. doi: 10.2172/2335905.
- Alan Van Omen, Tyler Morrow, et al., "Multilabel Proportion Prediction and Out-of-distribution Detection on Gamma Spectra of Short-lived Fission Products." Annals of Nuclear Energy 208 (2024): 110777. doi: 10.1016/j.anucene.2024.110777.
- Aaron Fjeldsted, Tyler Morrow, et al., "A Novel Methodology for Gamma-Ray Spectra Dataset Procurement over Varying Standoff Distances and Source Activities," Nuclear Instruments and Methods in Physics Research Section A (2024): 169681. doi: 10.1016/j.nima.2024.169681.
# Copyright 2021 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
# Under the terms of Contract DE-NA0003525 with NTESS,
# the U.S. Government retains certain rights in this software.
"""
.. include:: ../README.md
"""
import logging
import os
import sys
from importlib.metadata import version
from riid.data.sampleset import (SampleSet, SpectraState, SpectraType,
read_hdf, read_json, read_pcf)
from riid.data.synthetic.passby import PassbySynthesizer
from riid.data.synthetic.seed import (SeedMixer, SeedSynthesizer,
get_dummy_seeds)
from riid.data.synthetic.static import StaticSynthesizer
HANDLER = logging.StreamHandler(sys.stdout)
logging.root.addHandler(HANDLER)
logging.root.setLevel(logging.DEBUG)
MPL_LOGGER = logging.getLogger("matplotlib")
MPL_LOGGER.setLevel(logging.WARNING)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1"
SAMPLESET_HDF_FILE_EXTENSION = ".h5"
SAMPLESET_JSON_FILE_EXTENSION = ".json"
PCF_FILE_EXTENSION = ".pcf"
ONNX_MODEL_FILE_EXTENSION = ".onnx"
TFLITE_MODEL_FILE_EXTENSION = ".tflite"
RIID = "riid"
__version__ = version(RIID)
__pdoc__ = {
"riid.data.synthetic.seed.SeedMixer.__call__": True,
"riid.data.synthetic.passby.PassbySynthesizer._generate_single_passby": True,
"riid.data.sampleset.SampleSet._channels_to_energies": True,
}
__all__ = ["SampleSet", "SpectraState", "SpectraType",
"read_hdf", "read_json", "read_pcf", "get_dummy_seeds",
"PassbySynthesizer", "SeedSynthesizer", "StaticSynthesizer", "SeedMixer"]
Sub-modules
riid.anomaly
This module contains an algorithm for detecting statistical anomalies from sequences of gamma spectra.
riid.data
This sub-package contains all utilities for synthesizing, reading, writing, and converting data.
riid.gadras
This module wraps various GADRAS features by calling into the DLLs of a GADRAS installation.
riid.losses
This module contains custom loss functions.
riid.metrics
This module provides custom model metrics.
riid.models
This module contains PyRIID models.
riid.visualize
This module provides visualization functions primarily for visualizing SampleSets.
Functions
def get_dummy_seeds(n_channels: int = 512, live_time: float = 600.0, count_rate: float = 1000.0, normalize: bool = True, rng: numpy.random._generator.Generator = Generator(PCG64)) -> SampleSet
Get a random, dummy SampleSet of ideal seeds.
WARNING: the spectra returned by this function each contain one Gaussian peak that does not overlap with the peaks of other spectra. Such data is about as ideal as one could hope to be working with and does not represent anything real. Therefore, do not use this data for any purpose other than testing, debugging, or examples where code, not results, is being demonstrated. Any use in scientific studies does not make sense.
Args
n_channels: number of channels in the spectra DataFrame
live_time: collection time on which to base seeds (higher creates a less noisy shape)
count_rate: count rate on which to base seeds (higher creates a less noisy shape)
normalize: whether to apply an L1-norm to the spectra
rng: NumPy random number generator, useful for experiment repeatability
Returns
SampleSet with randomly generated spectra
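The seed-construction recipe described above can be sketched standalone. This is a hedged, self-contained approximation using only NumPy (not a call into PyRIID itself): each dummy seed is formed by drawing counts from a Gaussian centered in its own channel region, histogramming them into channels, and L1-normalizing.

```python
import numpy as np

# Standalone sketch (not the riid implementation) of how each dummy seed
# spectrum is formed: draw counts from a Gaussian centered in a distinct
# channel region, histogram into channels, then L1-normalize.
n_channels, n_sources = 512, 10
n_counts = int(1000.0 * 600.0)  # count_rate * live_time
std = np.sqrt(n_channels / n_sources)
channels_per_source = n_channels / n_sources
rng = np.random.default_rng(42)

spectra = []
for i in range(n_sources):
    mu = i * channels_per_source + channels_per_source / 2
    counts = rng.normal(mu, std, size=n_counts)
    hist, _ = np.histogram(counts, bins=n_channels, range=(0, n_channels))
    spectra.append(hist)
spectra = np.asarray(spectra, dtype=float)
spectra /= spectra.sum(axis=1, keepdims=True)  # L1-norm, as normalize=True would
```

Because the peaks sit in disjoint channel regions, the resulting "sources" are trivially separable, which is exactly why the WARNING above restricts this data to testing and demonstration.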
def get_dummy_seeds(n_channels: int = 512, live_time: float = 600.0,
                    count_rate: float = 1000.0, normalize: bool = True,
                    rng: Generator = np.random.default_rng()) -> SampleSet:
    """Get a random, dummy `SampleSet` of ideal seeds.

    WARNING: the spectra returned by this function each contain one Gaussian peak
    that does not overlap with the peaks of other spectra. Such data is about as
    *ideal* as one could hope to be working with and does not represent anything real.
    Therefore, **do not** use this data for any purpose other than testing, debugging,
    or examples where code, not results, is being demonstrated.
    Any use in scientific studies does not make sense.

    Args:
        n_channels: number of channels in the spectra DataFrame
        live_time: collection time on which to base seeds (higher creates a less noisy shape)
        count_rate: count rate on which to base seeds (higher creates a less noisy shape)
        normalize: whether to apply an L1-norm to the spectra
        rng: NumPy random number generator, useful for experiment repeatability

    Returns:
        `SampleSet` with randomly generated spectra
    """
    ss = SampleSet()
    ss.measured_or_synthetic = "synthetic"
    ss.spectra_state = SpectraState.Counts
    ss.spectra_type = SpectraType.BackgroundForeground
    ss.synthesis_info = {
        "subtract_background": True,
    }
    sources = [
        ("Industrial", "Am241", "Unshielded Am241"),
        ("Industrial", "Ba133", "Unshielded Ba133"),
        ("NORM", "K40", "PotassiumInSoil"),
        ("NORM", "K40", "Moderately Shielded K40"),
        ("NORM", "Ra226", "UraniumInSoil"),
        ("NORM", "Th232", "ThoriumInSoil"),
        ("SNM", "U238", "Unshielded U238"),
        ("SNM", "Pu239", "Unshielded Pu239"),
        ("SNM", "Pu239", "Moderately Shielded Pu239"),
        ("SNM", "Pu239", "Heavily Shielded Pu239"),
    ]
    n_sources = len(sources)
    n_fg_sources = n_sources
    sources_cols = pd.MultiIndex.from_tuples(
        sources,
        names=SampleSet.SOURCES_MULTI_INDEX_NAMES
    )
    sources_data = np.identity(n_sources)
    ss.sources = pd.DataFrame(data=sources_data, columns=sources_cols)
    histograms = []
    N_FG_COUNTS = int(count_rate * live_time)
    fg_std = np.sqrt(n_channels / n_sources)
    channels_per_sources = n_channels / n_fg_sources
    for i in range(n_fg_sources):
        mu = i * channels_per_sources + channels_per_sources / 2
        counts = rng.normal(mu, fg_std, size=N_FG_COUNTS)
        fg_histogram, _ = np.histogram(counts, bins=n_channels, range=(0, n_channels))
        histograms.append(fg_histogram)
    histograms = np.array(histograms)
    ss.spectra = pd.DataFrame(data=histograms)
    ss.info.total_counts = ss.spectra.sum(axis=1)
    ss.info.live_time = live_time
    ss.info.real_time = live_time
    ss.info.snr = None
    ss.info.ecal_order_0 = 0
    ss.info.ecal_order_1 = 3000
    ss.info.ecal_order_2 = 100
    ss.info.ecal_order_3 = 0
    ss.info.ecal_low_e = 0
    ss.info.description = ""
    ss.update_timestamp()
    if normalize:
        ss.normalize()
    return ss

def read_hdf(path: str | Path) -> SampleSet
Read an HDF file in as a SampleSet object.
Args
path: path for the HDF file to be read in
Returns
SampleSet object
def read_hdf(path: str | Path) -> SampleSet:
    """Read an HDF file in as a `SampleSet` object.

    Args:
        path: path for the HDF file to be read in

    Returns:
        `SampleSet` object
    """
    expanded_path = Path(path).expanduser()
    if not expanded_path.is_file():
        raise FileNotFoundError(f"No file found at location '{expanded_path}'.")
    ss = _read_hdf(expanded_path)
    if not ss:
        raise FileNotFoundError(f"No data found at location '{expanded_path}'.")
    return ss

def read_json(path: str | Path) -> SampleSet
def read_json(path: str | Path) -> SampleSet:
    expanded_path = Path(path).expanduser()
    with open(expanded_path, "r") as fin:
        ss_dict = json.load(fin)
    ss = _pcf_dict_to_ss(ss_dict)
    ss.detector_info = ss_dict["detector_info"]
    return ss

def read_pcf(path: str | Path, verbose: bool = False) -> SampleSet
Read a PCF file in as a SampleSet object.
Args
path: path for the PCF file to be read in
verbose: whether to show verbose function output in terminal
Returns
SampleSet object
def read_pcf(path: str | Path, verbose: bool = False) -> SampleSet:
    """Read a PCF file in as a `SampleSet` object.

    Args:
        path: path for the PCF file to be read in
        verbose: whether to show verbose function output in terminal

    Returns:
        `SampleSet` object
    """
    expanded_path = Path(path).expanduser()
    return _pcf_dict_to_ss(_pcf_to_dict(expanded_path, verbose), verbose)
Classes
class PassbySynthesizer (events_per_seed: int = 2, sample_interval: float = 0.125, dwell_time_function: str = 'uniform', dwell_time_function_args=(0.25, 8.0), fwhm_function: str = 'discrete', fwhm_function_args=(1,), snr_function: str = 'uniform', snr_function_args=(1.0, 10.0), min_fraction: float = 0.005, normalize_sources: bool = True, bg_cps: float = 300.0, long_bg_live_time: float = 120.0, apply_poisson_noise: bool = True, return_fg: bool = True, return_gross: bool = False, rng: numpy.random._generator.Generator = Generator(PCG64))
Synthesizer for creating pass-by events as sequences of gamma spectra.
Args
events_per_seed: number of pass-bys to generate per source-background seed pair
dwell_time_function: string that names the method of sampling for target dwell time values (options: uniform, log10, discrete, list)
dwell_time_function_args: range of values which are sampled in the fashion specified by the dwell_time_function argument
snr_function: string that names the method of sampling for target signal-to-noise ratio values (options: uniform, log10, discrete, list)
snr_function_args: range of values which are sampled in the fashion specified by the snr_function argument
min_fraction: minimum proportion of the peak amplitude to exclude
class PassbySynthesizer(Synthesizer):
    """Synthesizer for creating pass-by events as sequences of gamma spectra."""

    _supported_functions = ["uniform", "log10", "discrete", "list"]

    def __init__(self, events_per_seed: int = 2, sample_interval: float = 0.125,
                 dwell_time_function: str = "uniform", dwell_time_function_args=(0.25, 8.0),
                 fwhm_function: str = "discrete", fwhm_function_args=(1,),
                 snr_function: str = "uniform", snr_function_args=(1.0, 10.0),
                 min_fraction: float = 0.005, normalize_sources: bool = True,
                 bg_cps: float = 300.0, long_bg_live_time: float = 120.0,
                 apply_poisson_noise: bool = True,
                 return_fg: bool = True, return_gross: bool = False,
                 rng: Generator = np.random.default_rng()):
        """
        Args:
            events_per_seed: number of pass-bys to generate per source-background seed pair
            dwell_time_function: string that names the method of sampling for target
                dwell time values (options: uniform, log10, discrete, list)
            dwell_time_function_args: range of values which are sampled in the fashion
                specified by the `dwell_time_function` argument
            snr_function: string that names the method of sampling for target
                signal-to-noise ratio values (options: uniform, log10, discrete, list)
            snr_function_args: range of values which are sampled in the fashion
                specified by the `snr_function` argument
            min_fraction: minimum proportion of peak amplitude to exclude
        """
        super().__init__(bg_cps, long_bg_live_time, apply_poisson_noise,
                         normalize_sources, return_fg, return_gross, rng)
        self.events_per_seed = events_per_seed
        self.sample_interval = sample_interval
        self.dwell_time_function = dwell_time_function
        self.dwell_time_function_args = dwell_time_function_args
        self.fwhm_function = fwhm_function
        self.fwhm_function_args = fwhm_function_args
        self.snr_function = snr_function
        self.snr_function_args = snr_function_args
        self.min_fraction = min_fraction

    # region Properties

    @property
    def dwell_time_function(self) -> str:
        """Get or set the function used to randomly sample the desired dwell time space.

        Raises:
            `ValueError` when an unsupported function type is provided
        """
        return self._dwell_time_function

    @dwell_time_function.setter
    def dwell_time_function(self, value: str):
        if value not in self._supported_functions:
            raise ValueError("{} is not a valid function.".format(value))
        self._dwell_time_function = value

    @property
    def dwell_time_function_args(self) -> tuple:
        """Get or set the dwell time space to be randomly sampled."""
        return self._dwell_time_function_args

    @dwell_time_function_args.setter
    def dwell_time_function_args(self, value):
        self._dwell_time_function_args = value

    @property
    def events_per_seed(self) -> int:
        """Get or set the number of samples to create per seed
        (excluding the background seed).
        """
        return self._events_per_seed

    @events_per_seed.setter
    def events_per_seed(self, value):
        self._events_per_seed = value

    @property
    def fwhm_function(self) -> str:
        """Get or set the function used to randomly sample the desired
        full-width-half-max (FWHM) ratio space.

        Raises:
            `ValueError` when an unsupported function type is provided
        """
        return self._fwhm_function

    @fwhm_function.setter
    def fwhm_function(self, value: str):
        if value not in self._supported_functions:
            raise ValueError("{} is not a valid function.".format(value))
        self._fwhm_function = value

    @property
    def fwhm_function_args(self) -> tuple:
        """Get or set the full-width-half-max (FWHM) space to be randomly sampled."""
        return self._fwhm_function_args

    @fwhm_function_args.setter
    def fwhm_function_args(self, value):
        self._fwhm_function_args = value

    @property
    def min_fraction(self) -> float:
        """Get or set the percentage of the peak amplitude to exclude."""
        return self._min_fraction

    @min_fraction.setter
    def min_fraction(self, value: float):
        self._min_fraction = value

    @property
    def sample_interval(self) -> float:
        """Get or set the sample interval (in seconds) at which the events are simulated."""
        return self._sample_interval

    @sample_interval.setter
    def sample_interval(self, value: float):
        self._sample_interval = value

    @property
    def snr_function(self) -> str:
        """Get or set the function used to randomly sample the desired
        signal-to-noise (SNR) ratio space.

        Raises:
            `ValueError` when an unsupported function type is provided
        """
        return self._snr_function

    @snr_function.setter
    def snr_function(self, value: str):
        if value not in self._supported_functions:
            raise ValueError("{} is not a valid function.".format(value))
        self._snr_function = value

    @property
    def snr_function_args(self) -> tuple:
        """Get or set the signal-to-noise (SNR) space to be randomly sampled."""
        return self._snr_function_args

    @snr_function_args.setter
    def snr_function_args(self, value):
        self._snr_function_args = value

    # endregion

    def _calculate_passby_shape(self, fwhm: float):
        """Calculate a pass-by shape with maximum of 1 which goes from
        min_fraction to min_fraction of signal with specified FWHM.

        Args:
            fwhm: full width at half maximum value to use for calculating the pass-by shape

        Returns:
            Array of floats representing the pass-by shape
        """
        lim = np.sqrt((1 - self.min_fraction) / self.min_fraction)
        samples = np.arange(-lim, lim, self.sample_interval / fwhm / 2)
        return 1 / (np.power(samples, 2) + 1)

    def _generate_single_passby(self, fwhm: float, snr: float, dwell_time: float,
                                fg_seed: np.array, bg_seed: np.array,
                                fg_sources: pd.Series, bg_sources: pd.Series):
        """Generate a `SampleSet` with a sequence of spectra representative of
        a single pass-by.

        A source template is scaled up and then back down over the duration of a pass-by
        in a Gaussian fashion. Each sample has some total counts obtained over
        `dwell_time / sample_interval` seconds.

        Args:
            fwhm: full width at half maximum of the (bell-like) shape of the
                source count rate over time
            snr: overall signal-to-noise ratio (SNR) of the pass-by event
            dwell_time: overall time (seconds) the source is present
            fg_seed: source template to use for calculating the pass-by; the value of
                each channel should be calculated as counts divided by total counts
            bg_seed: background template to use for calculating the pass-by; the value of
                each channel should be calculated as counts divided by total counts
            fg_sources: underlying ground truth proportions of anomalous sources
            bg_sources: underlying ground truth proportions of background sources

        Returns:
            `SampleSet` object containing the synthesized pass-by data
        """
        event_snr_targets = self._calculate_passby_shape(fwhm) * snr
        dwell_targets = np.zeros(int(dwell_time / self.sample_interval))
        snr_targets = np.concatenate((event_snr_targets, dwell_targets))
        n_samples = len(snr_targets)
        live_times = np.ones(n_samples) * self.sample_interval
        fg_ss, gross_ss = self._get_batch(
            fg_seed, fg_sources,
            bg_seed, bg_sources,
            live_times, snr_targets
        )
        if self.normalize_sources:
            if self.return_fg:
                fg_ss.normalize_sources()
            if self.return_gross:
                gross_ss.normalize_sources()
        return fg_ss, gross_ss

    def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet,
                 skip_health_check: bool = False, verbose: bool = True) \
            -> List[Tuple[SampleSet, SampleSet, SampleSet]]:
        """Generate a list of `SampleSet`s where each contains a pass-by
        as a sequence of spectra.

        Args:
            fg_seeds_ss: spectra normalized by total counts to be used as the
                source component(s) of spectra
            bg_seeds_ss: spectra normalized by total counts to be used as the
                background components of gross spectra
            skip_health_check: whether to skip seed health checks
            verbose: whether to display output from synthesis

        Returns:
            List of tuples of SampleSets where each tuple represents a pass-by event

        Raises:
            - `ValueError` when either foreground or background seeds are not provided
            - `AssertionError` if seed health check fails
        """
        if not fg_seeds_ss or not bg_seeds_ss:
            raise ValueError("At least one foreground and background seed must be provided.")
        if not skip_health_check:
            fg_seeds_ss.check_seed_health()
            bg_seeds_ss.check_seed_health()
        self._reset_progress()
        if verbose:
            tstart = time()
        args = []
        for bg_i in range(bg_seeds_ss.n_samples):
            bg_pmf = bg_seeds_ss.spectra.iloc[bg_i]
            bg_sources = bg_seeds_ss.sources.iloc[bg_i]
            fwhm_targets = get_distribution_values(self.fwhm_function,
                                                   self.fwhm_function_args,
                                                   self.events_per_seed,
                                                   self._rng)
            snr_targets = get_distribution_values(self.snr_function,
                                                  self.snr_function_args,
                                                  self.events_per_seed,
                                                  self._rng)
            dwell_time_targets = get_distribution_values(self.dwell_time_function,
                                                         self.dwell_time_function_args,
                                                         self.events_per_seed,
                                                         self._rng)
            for fg_i in range(fg_seeds_ss.n_samples):
                fg_pmf = fg_seeds_ss.spectra.iloc[fg_i]
                fg_sources = fg_seeds_ss.sources.iloc[fg_i]
                for t_i in range(self.events_per_seed):
                    fwhm = fwhm_targets[t_i]
                    snr = snr_targets[t_i]
                    dwell_time = dwell_time_targets[t_i]
                    pb_args = (fg_i, fwhm, snr, dwell_time, fg_pmf, bg_pmf,
                               fg_sources, bg_sources)
                    args.append(pb_args)
        # TODO: the following prevents periodic progress reports
        passbys = []
        for a in args:
            f, fwhm, snr, dwell_time, fg_pmf, bg_pmf, fg_sources, bg_sources = a
            fg_passby_ss, gross_passby_ss = self._generate_single_passby(
                fwhm, snr, dwell_time, fg_pmf, bg_pmf, fg_sources, bg_sources
            )
            fg_seed_info = fg_seeds_ss.info.iloc[f]
            set_ss_info_from_seed_info(fg_passby_ss, fg_seed_info, self._synthesis_start_dt)
            set_ss_info_from_seed_info(gross_passby_ss, fg_seed_info, self._synthesis_start_dt)
            passbys.append((fg_passby_ss, gross_passby_ss))
        if verbose:
            delay = time() - tstart
            self._report_completion(delay)
        return passbys

Ancestors
Instance variables
var dwell_time_function : str
Get or set the function used to randomly sample the desired dwell time space.
Raises
ValueError: when an unsupported function type is provided
var dwell_time_function_args : tuple
Get or set the dwell time space to be randomly sampled.
var events_per_seed : int
Get or set the number of samples to create per seed (excluding the background seed).
var fwhm_function : str
Get or set the function used to randomly sample the desired full-width-half-max (FWHM) ratio space.
Raises
ValueError: when an unsupported function type is provided
var fwhm_function_args : tuple
Get or set the full-width-half-max (FWHM) space to be randomly sampled.
var min_fraction : float
Get or set the percentage of the peak amplitude to exclude.
var sample_interval : float
Get or set the sample interval (in seconds) at which the events are simulated.
var snr_function : str
Get or set the function used to randomly sample the desired signal-to-noise (SNR) ratio space.
Raises
ValueError: when an unsupported function type is provided
var snr_function_args : tuple
Get or set the signal-to-noise (SNR) space to be randomly sampled.
Methods
def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet, skip_health_check: bool = False, verbose: bool = True) -> List[Tuple[SampleSet, SampleSet, SampleSet]]
Generate a list of SampleSets where each contains a pass-by as a sequence of spectra.
Args
fg_seeds_ss: spectra normalized by total counts to be used as the source component(s) of spectra
bg_seeds_ss: spectra normalized by total counts to be used as the background components of gross spectra
skip_health_check: whether to skip seed health checks
verbose: whether to display output from synthesis
Returns
List of tuples of SampleSets where each tuple represents a pass-by event
Raises
ValueError: when either foreground or background seeds are not provided
AssertionError: if seed health check fails
class SampleSet
A collection of spectrum samples and their metadata.
Expected sizes for DataFrames:
self._spectra: [n_samples, n_channels]
self._sources: [n_samples, n_sources]
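The expected shapes above can be sketched with a small standalone pandas mock (hypothetical; the real class also carries info, prediction, and metadata tables): the spectra and sources DataFrames share the sample axis, and sources columns use the ("Category", "Isotope", "Seed") MultiIndex.

```python
import numpy as np
import pandas as pd

# Hypothetical mock of the two core SampleSet DataFrames; the real class wraps
# these with properties, an info table, and prediction probabilities.
n_samples, n_channels = 4, 512
sources_index = pd.MultiIndex.from_tuples(
    [("NORM", "K40", "PotassiumInSoil"),
     ("NORM", "Ra226", "UraniumInSoil"),
     ("NORM", "Th232", "ThoriumInSoil")],
    names=("Category", "Isotope", "Seed")  # SOURCES_MULTI_INDEX_NAMES
)
spectra = pd.DataFrame(np.zeros((n_samples, n_channels)))          # [n_samples, n_channels]
sources = pd.DataFrame(np.zeros((n_samples, len(sources_index))),  # [n_samples, n_sources]
                       columns=sources_index)
assert len(spectra) == len(sources)  # rows must align sample-for-sample
```

Keeping the two frames row-aligned is what lets slicing (see `__getitem__` in the source) subset spectra, sources, and info together.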
class SampleSet(): """A collection of spectrum samples and their metadata.""" DEAD_TIME_PROP_INFO_KEY = "dead_time_prop" SOURCES_MULTI_INDEX_NAMES = ( "Category", "Isotope", "Seed" ) ECAL_INFO_COLUMNS = ( "ecal_order_0", "ecal_order_1", "ecal_order_2", "ecal_order_3", "ecal_low_e" ) DEFAULT_INFO_COLUMNS = ( "description", "timestamp", "live_time", "real_time", "total_counts", "snr", "neutron_counts", "distance_cm", *ECAL_INFO_COLUMNS, "areal_density", "atomic_number", "occupancy_flag", "tag", ) DEFAULT_BG_SEED_NAMES = [ "Cosmic", "PotassiumInSoil", "UraniumInSoil", "ThoriumInSoil", ] SUPPORTED_STATES_FOR_ARITHMETIC = ( SpectraState.Counts.value, SpectraState.L1Normalized.value ) DEFAULT_EXCLUSIONS_FROM_COMPARISON = ( "timestamp", "description", "occupancy_flag", "tag", ) def __init__(self): """ Expected sizes for DataFrames: self._spectra: [n_samples, n_channels] self._sources: [n_samples, n_sources] """ self._spectra = pd.DataFrame() self._sources = pd.DataFrame() self._info = pd.DataFrame(columns=SampleSet.DEFAULT_INFO_COLUMNS) self._detector_info = {} self._synthesis_info = {} self._prediction_probas = pd.DataFrame() self._measured_or_synthetic = None self.pyriid_version = riid.__version__ self._spectra_state = SpectraState.Unknown self._spectra_type = SpectraType.Unknown self._classified_by = "" def __bool__(self): return bool(len(self)) def __getitem__(self, key: Union[slice, int, list]): selection = key if isinstance(key, int): selection = slice(key, key+1) elif isinstance(key, pd.Series) and key.dtype == bool: selection = self.spectra.index[key] sub_ss = copy.copy(self) sub_ss.spectra = sub_ss.spectra.iloc[selection].reset_index(drop=True) sub_ss.sources = sub_ss.sources.iloc[selection].reset_index(drop=True) sub_ss.info = sub_ss.info.iloc[selection].reset_index(drop=True) if not sub_ss.prediction_probas.empty: sub_ss.prediction_probas = sub_ss.prediction_probas.iloc[selection] \ .reset_index(drop=True) return sub_ss def __len__(self): return self.n_samples 
    def __str__(self):
        UNKNOWN_VALUE = "Unknown"
        detector_info_str = self.detector_info if self.detector_info else UNKNOWN_VALUE
        sources_present_str = ", ".join(sorted(np.unique(self.get_labels()))) \
            if not self.sources.empty \
            else UNKNOWN_VALUE
        predictions_str = ", ".join(sorted(np.unique(self.get_predictions()))) \
            if not self.prediction_probas.empty \
            else "None"
        pyriid_version_str = self.pyriid_version if self.pyriid_version else UNKNOWN_VALUE
        info_dict = {
            "# of samples": self.n_samples,
            "# of channels": self.n_channels,
            "Detector": detector_info_str,
            "Present predictions": predictions_str,
            "Present sources": sources_present_str,
            "PyRIID version": pyriid_version_str,
        }
        summary = "\n".join(_dict_to_bulleted_list(info_dict))
        return f"SampleSet Summary:\n{summary}"

    def __repr__(self):
        return self.__str__()

    def __eq__(self, ss: SampleSet):
        return np.allclose(self._spectra.values, ss._spectra.values, atol=1e-3)

    def _check_arithmetic_supported(self, ss2: SampleSet):
        if ss2.n_samples != 1 and ss2.n_samples != self.n_samples:
            n_sample_str = (
                "You can only add/subtract SampleSet objects "
                "when the second SampleSet has only one spectrum "
                "(the spectrum will be scaled to match each sample of the first SampleSet) or "
                "when the second SampleSet has the same number of samples as the first "
                "(no rescaling will occur)."
            )
            raise InvalidSampleCountError(n_sample_str)
        if self.n_channels != ss2.n_channels:
            channel_str = f"({self.n_channels} != {ss2.n_channels})"
            raise ChannelCountMismatchError(f"Mismatched spectra channels {channel_str}!")
        if self.spectra_state != ss2.spectra_state:
            state_str = f"({self.spectra_state} != {ss2.spectra_state})"
            raise SpectraStateMismatchError(f"Mismatched spectra states {state_str}!")
        if self.spectra_state.value not in self.SUPPORTED_STATES_FOR_ARITHMETIC:
            raise ValueError(f"{self.spectra_state} spectra state not supported for arithmetic!")
        # No need to check the spectra state of ss2 since it passed the prior equality check

    def _get_scaled_spectra(self, ss: SampleSet) -> np.ndarray:
        spectra_in_counts = ss.spectra.iloc[0].values.copy()
        if ss.spectra_state == SpectraState.L1Normalized:
            spectra_in_counts *= ss.info.iloc[0].total_counts
        live_times = ss.info.iloc[0].live_time
        spectra_in_cps = spectra_in_counts / live_times
        scaled_spectra = np.concatenate(
            spectra_in_cps * self.info.live_time.values[:, np.newaxis, np.newaxis]
        )
        return scaled_spectra

    def _get_arithmetic_result(self, ss: SampleSet, op) -> SampleSet:
        self._check_arithmetic_supported(ss)
        if ss.n_samples == 1:
            scaled_spectra = self._get_scaled_spectra(ss)
        else:
            scaled_spectra = ss.spectra.values  # Assumed to already be scaled
        new_ss = self[:]
        is_l1_normalized = new_ss.spectra_state == SpectraState.L1Normalized
        if is_l1_normalized:
            new_ss.spectra = new_ss.spectra.multiply(new_ss.info.total_counts, axis=0)
        new_ss.spectra = op(new_ss.spectra, scaled_spectra)
        if is_l1_normalized:
            new_ss.normalize()
        return new_ss

    def __add__(self, bg_ss: SampleSet) -> SampleSet:
        """Add the given background spectr(um|a) to the spectra of the current SampleSet."""
        if bg_ss.spectra_type.value != SpectraType.Background.value:
            msg = (
                "`bg_ss` argument must have a `spectra_type` of `Background`. "
                f"Its `spectra_type` is `{bg_ss.spectra_type}`."
            )
            raise ValueError(msg)
        if self.spectra_type.value != SpectraType.Gross.value and \
           self.spectra_type.value != SpectraType.Foreground.value:
            msg = (
                "Current `SampleSet` must have a `spectra_type` of `Gross` or `Foreground`. "
                f"Current `spectra_type` is `{self.spectra_type}`."
            )
            raise ValueError(msg)
        new_ss = self._get_arithmetic_result(bg_ss, operator.add)
        new_ss.spectra_type = SpectraType.Gross
        return new_ss

    def __sub__(self, ss: SampleSet) -> SampleSet:
        """Subtract the given background or foreground spectr(um|a) from the spectra
        of the current `SampleSet`.
        """
        if ss.spectra_type.value != SpectraType.Background.value and \
           ss.spectra_type.value != SpectraType.Foreground.value:
            msg = (
                "`ss` argument must have a `spectra_type` of `Background` or `Foreground`. "
                f"Its `spectra_type` is `{ss.spectra_type}`."
            )
            raise ValueError(msg)
        if self.spectra_type.value != SpectraType.Gross.value:
            msg = (
                "Current `SampleSet` must have a `spectra_type` of `Gross`. "
                f"Current `spectra_type` is `{self.spectra_type}`."
            )
            raise ValueError(msg)
        new_ss = self._get_arithmetic_result(ss, operator.sub)
        if ss.spectra_type == SpectraType.Background:
            new_ss.spectra_type = SpectraType.Foreground
        else:
            new_ss.spectra_type = SpectraType.Background
        return new_ss

    # region Properties

    @property
    def category_names(self):
        """Get the names of the categories involved in this SampleSet."""
        return self.sources.columns.get_level_values("Category")

    @property
    def classified_by(self):
        """Get or set the UUID of the model that classified the SampleSet.

        TODO: Implement as third dimension to `prediction_probas` DataFrame
        """
        return self._classified_by

    @classified_by.setter
    def classified_by(self, value):
        self._classified_by = value

    @property
    def detector_info(self):
        """Get or set the detector info on which this `SampleSet` is based.

        TODO: implement as DataFrame
        """
        return self._detector_info

    @detector_info.setter
    def detector_info(self, value):
        self._detector_info = value

    @property
    def difficulty_score(self, mean=10.0, std=3.0) -> float:
        """Compute the "difficulty" of the `SampleSet` on a scale of 0 to 1,
        where 0 is easiest and 1 is hardest.

        The difficulty of a `SampleSet` is the mean of the individual sample difficulties.
        Each sample's difficulty is determined by where its signal strength (SNR) falls
        on the survival function (AKA reliability function, or 1 - CDF) for a logistic
        distribution.
        The decision to use this distribution is based on empirical data showing that
        classifier performance increases in a logistic fashion as signal strength
        increases.

        The primary use of this function is for situations where ground truth is unknown
        and you want to get an idea of how "difficult" it will be for a model to make
        predictions.
        Consider the following example:
        given a `SampleSet` for which ground truth is unknown and the signal strength
        for each spectrum is low, a model considering, say, 100+ isotopes will see this
        `SampleSet` as quite difficult, whereas a model considering 5 isotopes will,
        being more specialized, see the `SampleSet` as easier.
        Of course, this assumes that both models were trained on the isotope(s)
        contained in the test `SampleSet`.

        Based on the previous example, the unfortunate state of this function is that
        you must know how to pick means and standard deviations which properly reflect
        the:

        - number of target isotopes
        - amount of variation within each isotope (i.e., shielding, scattering, etc.)
        - detector resolution

        A future goal of ours is to provide updates to this function and docstring which
        make choosing the mean and standard deviation easier/automated based on more of
        our findings.
        Arguments:
            mean: SNR value representing the mean of the logistic distribution
            std: standard deviation of the logistic distribution

        Returns:
            The mean of all SNR values passed through a logistic survival function
        """
        snrs: np.ndarray = self.info.snr.clip(1e-6)
        score = float(stats.logistic.sf(snrs, loc=mean, scale=std).mean())
        return score

    @property
    def ecal(self):
        """Get or set the ecal terms."""
        ecal_terms = self.info[list(self.ECAL_INFO_COLUMNS)].to_numpy(dtype=float)
        return ecal_terms

    @ecal.setter
    def ecal(self, value):
        self.info.loc[:, self.ECAL_INFO_COLUMNS[0]] = value[0]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[1]] = value[1]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[2]] = value[2]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[3]] = value[3]
        self.info.loc[:, self.ECAL_INFO_COLUMNS[4]] = value[4]

    @property
    def info(self):
        """Get or set the info DataFrame."""
        return self._info

    @info.setter
    def info(self, value):
        self._info = value

    @property
    def isotope_names(self):
        """Get the names of the isotopes involved in this SampleSet."""
        return self.sources.columns.get_level_values("Isotope")

    @property
    def measured_or_synthetic(self):
        """Get or set the value for measured_or_synthetic for the SampleSet object."""
        return self._measured_or_synthetic

    @measured_or_synthetic.setter
    def measured_or_synthetic(self, value):
        self._measured_or_synthetic = value

    @property
    def n_channels(self):
        """Get the number of channels included in the spectra DataFrame.

        Channels may also be referred to as bins.
        """
        return self._spectra.shape[1]

    @property
    def n_samples(self):
        """Get the number of samples included in the spectra DataFrame,
        where each row is a sample.
        """
        return self._spectra.shape[0]

    @property
    def prediction_probas(self):
        """Get or set the prediction_probas DataFrame."""
        return self._prediction_probas

    @prediction_probas.setter
    def prediction_probas(self, value):
        if isinstance(value, pd.DataFrame):
            self._prediction_probas = value
        else:
            self._prediction_probas = pd.DataFrame(value, columns=self.sources.columns)

    @property
    def seed_names(self):
        """Get the names of the seeds involved in this `SampleSet`."""
        return self.sources.columns.get_level_values("Seed")

    @property
    def sources(self):
        """Get or set the sources `DataFrame`."""
        return self._sources

    @sources.setter
    def sources(self, value):
        self._sources = value.replace(np.nan, 0)

    @property
    def spectra(self):
        """Get or set the spectra `DataFrame`."""
        return self._spectra

    @spectra.setter
    def spectra(self, value):
        self._spectra = value

    @property
    def spectra_state(self) -> SpectraState:
        """Get or set the spectra state."""
        return self._spectra_state

    @spectra_state.setter
    def spectra_state(self, value: SpectraState):
        self._spectra_state = value

    @property
    def spectra_type(self) -> SpectraType:
        """Get or set the spectra type."""
        return self._spectra_type

    @spectra_type.setter
    def spectra_type(self, value: SpectraType):
        self._spectra_type = value

    @property
    def synthesis_info(self):
        """Get or set the information that was used by a synthesizer."""
        return self._synthesis_info

    @synthesis_info.setter
    def synthesis_info(self, value):
        self._synthesis_info = value

    # endregion

    def _channels_to_energies(self, fractional_energy_bins, offset: float, gain: float,
                              quadratic: float, cubic: float, low_energy: float) -> np.ndarray:
        r"""Convert fractional energy bins to the energy represented by the lower edge
        of the bin.

        A spectrum represents a range of energy (e.g., 0 keV to 3000 keV).
        The energy range represented by a spectrum can be inferred from its
        energy calibration (e-cal for short), if known.
        Using the convention in ANSI N42.42-2006, the e-cal is defined using the
        following terms:

        - \( a_0 \): offset (ecal_order_0)
        - \( a_1 \): gain (ecal_order_1)
        - \( a_2 \): quadratic (ecal_order_2)
        - \( a_3 \): cubic (ecal_order_3)
        - \( a_4 \): low energy (ecal_low_e)

        The energy represented by the lower edge of channel \( i \) is then calculated
        as follows:

        $$
        E_i = a_0 + a_1 * x + a_2 * x^2 + a_3 * x^3 + \frac{a_4}{1 + 60x}
        $$

        where \( x \) is the fractional energy of the spectral range.

        Also, under this scheme and ignoring the non-linear terms (i.e., \( a_2 \)
        through \( a_4 \)), the range of energy represented by a spectrum can be quickly
        inferred from the e-cal as:

        - `min_energy = offset`
        - `max_energy = gain + offset`

        Note: spline interpolation is another way to characterize the energy calibration
        of a spectrum.
        In GADRAS, this is accomplished via deviation pairs; PyRIID does not currently
        support this but may in the future.

        Args:
            fractional_energy_bins: array of fractions representing the spacing of
                the channels
            offset: e-cal order 0 term
            gain: e-cal order 1 term
            quadratic: e-cal order 2 term
            cubic: e-cal order 3 term
            low_energy: e-cal low energy term

        Returns:
            Array containing the energies represented by the lower edge of each channel
            (shape depends on the shape of input values)
        """
        channel_energies = \
            offset + \
            fractional_energy_bins * gain + \
            fractional_energy_bins**2 * quadratic + \
            fractional_energy_bins**3 * cubic + \
            low_energy / (1 + 60 * fractional_energy_bins)
        return channel_energies

    def _check_target_level(self, target_level, levels_allowed=SOURCES_MULTI_INDEX_NAMES):
        if target_level not in levels_allowed:
            raise ValueError((
                f"'{target_level}' is not an appropriate target level. "
                f"Acceptable values are: {levels_allowed}."
            ))

    def _get_dead_time_proportions(self):
        live_times = self.info.live_time.values
        real_times = self.info.real_time.values
        dead_time_props = (1.0 - live_times / real_times)
        dead_time_props = np.nan_to_num(dead_time_props, nan=1.0)
        return dead_time_props

    def all_spectra_sum_to_one(self, rtol: float = 0.0, atol: float = 1e-4) -> bool:
        """Check whether all spectra are normalized to sum to one."""
        spectra_counts = self.spectra.sum(axis=1).values
        all_sum_to_one = np.all(np.isclose(spectra_counts, 1, rtol=rtol, atol=atol))
        return all_sum_to_one

    def as_ecal(self, new_offset: float, new_gain: float, new_quadratic: float,
                new_cubic: float, new_low_energy: float) -> SampleSet:
        """Re-bin spectra based on energy by interpolating the current shape
        from the current binning structure to a new one.

        Warning: this operation is likely to add or remove spectral information
        depending on the new energy calibration values used.

        Args:
            new_offset: new offset value, i.e., the 0th e-cal term
            new_gain: new gain value, i.e., the 1st e-cal term
            new_quadratic: new quadratic value, i.e., the 2nd e-cal term
            new_cubic: new cubic value, i.e., the 3rd e-cal term
            new_low_energy: new low energy term

        Returns:
            A new `SampleSet` with updated `spectra` and `info` DataFrames

        Raises:
            `ValueError` when no argument values are provided
        """
        new_args = [new_offset, new_gain, new_quadratic, new_cubic, new_low_energy]
        if all(v is None for v in new_args):
            raise ValueError("At least one argument value must be provided.")

        ecal_cols = list(self.ECAL_INFO_COLUMNS)
        new_ecal = self.info[ecal_cols].copy()
        new_ecal.ecal_order_0 = new_offset
        new_ecal.ecal_order_1 = new_gain
        new_ecal.ecal_order_2 = new_quadratic
        new_ecal.ecal_order_3 = new_cubic
        new_ecal.ecal_low_e = new_low_energy

        all_original_channel_energies = self.get_all_channel_energies()
        all_new_channel_energies = []
        fractional_energy_bins = np.linspace(0, 1, self.n_channels)
        for i in range(self.n_samples):
            offset, gain, quadratic, cubic, low_energy = new_ecal.iloc[i].values
            new_channel_energies = self._channels_to_energies(
                fractional_energy_bins,
                offset, gain, quadratic, cubic, low_energy
            )
            all_new_channel_energies.append(new_channel_energies)

        new_spectra = np.zeros([self.n_samples, self.n_channels])
        # Perform interpolation
        for i in range(self.n_samples):
            f = interp1d(
                all_original_channel_energies[i],
                self.spectra.values[i],
                kind="slinear",
                fill_value=0,
                bounds_error=False
            )
            new_spectrum = f(all_new_channel_energies[i])
            new_spectra[i] = new_spectrum

        # Update spectra and info DataFrames
        new_ss = self[:]
        new_ss.spectra = pd.DataFrame(new_spectra)
        new_ss.info.total_counts = new_ss.spectra.sum(axis=1)
        new_ss.info[ecal_cols] = new_ecal

        return new_ss

    def as_regions(self, rois: list) -> SampleSet:
        """Obtain a new `SampleSet` where the spectra are limited to specific
        regions of interest (ROIs).

        Notes:

        - If your samples have disparate energy calibration terms, call `as_ecal()`
          first to align channel space, then you may call this function.
          Otherwise, it is possible to end up with a ragged array of spectra,
          which we do not support.
        - After this call, `spectra` will have columns filled in with energy values
          for convenience.
          As such, in the context of the returned `SampleSet`, the energy calibration
          terms in `info` will no longer have any meaning, and any subsequent calls
          to methods like `as_ecal()` would not make sense.
          This method is intended as a last step, to be performed right before
          analysis of whatever kind.

        Args:
            rois: a list of 2-tuples where each tuple represents (low energy, high energy)

        Returns:
            A new `SampleSet` with only ROIs remaining in the `spectra` DataFrame

        Raises:
            `ValueError` when no argument values are provided
        """
        if not rois:
            raise ValueError("At least one ROI must be provided.")

        all_ecals = self.ecal
        all_ecals_are_same = np.isclose(all_ecals, all_ecals[0]).all()
        if not all_ecals_are_same:
            msg = "Spectra have different energy calibrations; consider `as_ecal()` first."
            raise ValueError(msg)

        energies = self.get_channel_energies(0)
        mask = _get_energy_roi_masks(rois, energies)
        new_spectra = self.spectra.to_numpy(dtype=float)[:, mask]
        new_spectra = new_spectra.reshape((self.n_samples, -1))
        mask_energies = energies[mask]

        new_ss = self[:]
        new_ss.spectra = pd.DataFrame(new_spectra, columns=mask_energies)
        new_ss.info.total_counts = new_ss.spectra.sum(axis=1)

        return new_ss

    def check_seed_health(self, dead_time_threshold=1.0):
        """Check the health of all spectra and info assuming they are seeds.
        Invalid states for which we currently check:

        - spectra do not sum to 1
        - dead time greater than or equal to the provided threshold

        Args:
            dead_time_threshold: value at which seed dead time is unacceptable

        Raises:
            `AssertionError` if any check fails
        """
        all_spectra_sum_to_one = self.all_spectra_sum_to_one()
        assert all_spectra_sum_to_one

        if self.DEAD_TIME_PROP_INFO_KEY not in self.info.columns:
            self.set_dead_time_proportions()
        dead_samples = self.info[self.DEAD_TIME_PROP_INFO_KEY] >= dead_time_threshold
        all_samples_are_alive = not np.any(dead_samples)
        assert all_samples_are_alive

    def clip_negatives(self, min_value: float = 0):
        """Clip spectrum values to some minimum value.

        Args:
            min_value: value to which to clip existing spectrum values
        """
        self._spectra = pd.DataFrame(data=self._spectra.clip(min_value))

    def compare_to(self, ss: SampleSet, n_bins: int = 20, density: bool = False) \
            -> Tuple[dict, dict, dict]:
        """Compare the current `SampleSet` to another `SampleSet`.

        This function only compares the selected, mutual information of each `SampleSet`
        by computing the Jensen-Shannon distance between histograms of that information.
        Args:
            ss: `SampleSet` to compare to
            n_bins: number of bins into which to sort both sample sets
            density: whether histograms should be in counts or density

        Returns:
            Tuple of the following:

            - ss1_stats: dictionary of stats describing the first `SampleSet`
            - ss2_stats: dictionary of stats describing the second `SampleSet`
            - col_comparisons: dictionary of distance values comparing each stat
        """
        TARGET_SUMMARY_STATS = ["min", "max", "median", "mean", "std"]
        STAT_PRECISION = 3

        # Get info columns we want to compare
        info_columns = [x for x in list(self.DEFAULT_INFO_COLUMNS)
                        if x not in list(self.DEFAULT_EXCLUSIONS_FROM_COMPARISON)]

        # Get both sample sets' comparable columns of data
        info_df1 = self.info[info_columns]
        info_df2 = ss.info[info_columns]

        # Check valid columns in each sample set (cannot have None or 0)
        ss1_valid_cols = [
            c for c in info_df1.columns
            if pd.to_numeric(info_df1[c], errors="coerce").notnull().all() and any(info_df1[c])
        ]
        ss2_valid_cols = [
            c for c in info_df2.columns
            if pd.to_numeric(info_df2[c], errors="coerce").notnull().all() and any(info_df2[c])
        ]

        # Keep only the columns shared by both sample sets
        # (note: removing items from a list while iterating over it skips elements)
        shared_cols = [c for c in ss1_valid_cols if c in ss2_valid_cols]
        info_df1 = info_df1[shared_cols]
        info_df2 = info_df2[shared_cols]

        # Bin the data
        ss1_stats = {}
        ss2_stats = {}
        col_comparisons = {}
        for i in shared_cols:
            ss1_stats[i] = {}
            ss2_stats[i] = {}
            hist_range = (
                min(min(info_df1[i]), min(info_df2[i])),
                max(max(info_df1[i]), max(info_df2[i]))
            )
            # Get data in bins and get counts for each bin
            hist1, bins1 = np.histogram(info_df1[i], bins=n_bins, range=hist_range,
                                        density=density)
            hist2, bins2 = np.histogram(info_df2[i], bins=n_bins, range=hist_range,
                                        density=density)
            ss1_stats[i]["density"] = density
            ss2_stats[i]["density"] = density
            ss1_stats[i]["hist"] = hist1
            ss2_stats[i]["hist"] = hist2
            ss1_stats[i]["bins"] = bins1
            ss2_stats[i]["bins"] = bins2
            ss1_stats[i]["range"] = hist_range
            ss2_stats[i]["range"] = hist_range

            summary_stats_df1 = info_df1[i].agg(TARGET_SUMMARY_STATS)\
                .round(decimals=STAT_PRECISION)
            ss1_stats[i].update(summary_stats_df1.to_dict())
            summary_stats_df2 = info_df2[i].agg(TARGET_SUMMARY_STATS)\
                .round(decimals=STAT_PRECISION)
            ss2_stats[i].update(summary_stats_df2.to_dict())

            js_dist = distance.jensenshannon(hist1, hist2, axis=0)
            col_comparisons[i] = js_dist

        return ss1_stats, ss2_stats, col_comparisons

    def concat(self, ss_list: list):
        """Vertically concatenate multiple `SampleSet`s into one `SampleSet`.

        Combining of non-DataFrame properties (e.g., detector_info, synthesis_info, etc.)
        is NOT performed - it is the responsibility of the user to ensure proper
        bookkeeping for these properties when concatenating `SampleSet`s.

        This method is most applicable to `SampleSet`s containing measured data from the
        same detector which could not be made as a single `SampleSet` because of how
        measurements had to be taken (i.e., measurements were taken by distinct processes
        separated by time).
        Therefore, the user should avoid concatenating `SampleSet`s when:

        - the data is from different detectors (note that `SampleSet`s are given to
          models for prediction, and models should be 1-to-1 with physical detectors);
        - a mix of synthetic and measured data would occur (this could make an existing
          `synthesis_info` value ambiguous, but one way to handle this would be to add
          a new column to `info` distinguishing between synthetic and measured on a
          per-sample basis).
        Args:
            ss_list: list of `SampleSet`s to concatenate

        Returns:
            `SampleSet` object
        """
        if not ss_list:
            return
        if not isinstance(ss_list, list) and not isinstance(ss_list, tuple):
            ss_list = [ss_list]

        spectra_types = list(
            set([self.spectra_type.value] + [x.spectra_type.value for x in ss_list])
        )
        n_spectra_types = len(spectra_types)
        if n_spectra_types == 2:
            if SpectraType.Background.value in spectra_types and \
               SpectraType.Foreground.value in spectra_types:
                self.spectra_type = SpectraType.BackgroundForeground
            else:
                self.spectra_type = SpectraType.Unknown
        elif n_spectra_types == 0 or n_spectra_types > 2:
            self.spectra_type = SpectraType.Unknown

        self._spectra = pd.concat(
            [self._spectra] + [ss.spectra for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._sources = pd.concat(
            [self._sources] + [ss.sources for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._sources = self._sources.where(pd.notnull(self._sources), 0)
        existing_info_df = self._info if not self._info.empty else None
        self._info = pd.concat(
            [existing_info_df] + [ss.info for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._info = self._info.where(pd.notnull(self._info), None)
        self._prediction_probas = pd.concat(
            [self._prediction_probas] + [ss.prediction_probas for ss in ss_list],
            ignore_index=True,
            sort=False
        )
        self._prediction_probas = self._prediction_probas.where(
            pd.notnull(self._prediction_probas), 0
        )

    def downsample_spectra(self, target_bins: int = 128, min_frac=1e-8):
        """Uniformly down-bin spectra.

        Args:
            target_bins: number of channels to which to bin the existing spectra
            min_frac: values below this threshold are zeroed out after down-binning

        Raises:
            `ValueError` when the current binning is not a multiple of the target binning
        """
        if self.n_channels % target_bins == 0:
            n_per_group = int(self.n_channels / target_bins)
            transformation = np.zeros([target_bins, self.n_channels])
            for i in range(target_bins):
                transformation[i, (i * n_per_group):((i * n_per_group) + n_per_group)] = 1
        else:
            msg = "Current binning ({}) is not a multiple of the target binning ({})".format(
                self.n_channels,
                target_bins
            )
            raise ValueError(msg)
        self._spectra = pd.DataFrame(
            data=np.matmul(
                self._spectra.values,
                transformation.T
            )
        )
        self._spectra[self._spectra < min_frac] = 0

    def drop_sources(self, col_names: Iterable = DEFAULT_BG_SEED_NAMES,
                     normalize_sources: bool = True, target_level: str = "Seed"):
        """Drop columns from `sources`.

        Args:
            col_names: names of the sources columns to be dropped.
                The names of background seeds are used by default, as removing the
                ground truth for background sources when dealing with synthetic,
                gross spectra is a common operation.
            normalize_sources: whether to normalize the sources DataFrame after
                dropping the column(s)
            target_level: `SampleSet.sources` column level to use
        """
        self.sources.drop(col_names, axis=1, inplace=True, level=target_level)
        if normalize_sources:
            self.normalize_sources()

    def drop_sources_columns_with_all_zeros(self):
        """Remove columns from the sources `DataFrame` that contain only zeros.

        Modifications are made in-place.
        """
        idxs = (self.sources != 0).any(axis=0)
        self.sources = self.sources.loc[:, idxs]
        self.sources.columns = self.sources.columns.remove_unused_levels()
        return idxs

    def drop_spectra_with_no_contributors(self) -> np.ndarray:
        """Remove samples where the spectrum has no recorded contributor.

        Modifications are made in-place.
        Returns:
            A boolean mask of which rows were kept
        """
        idxs = np.where(self.sources.values.sum(axis=1) == 0)[0]
        kept_mask = ~np.in1d(self.sources.index.values, idxs)
        self.spectra = self.spectra.drop(index=idxs).reset_index(drop=True)
        self.sources = self.sources.drop(index=idxs).reset_index(drop=True)
        self.info = self.info.drop(index=idxs).reset_index(drop=True)
        return kept_mask

    def extend(self, spectra: Union[dict, list, np.array, pd.DataFrame],
               sources: pd.DataFrame, info: Union[list, pd.DataFrame]):
        """Extend the current SampleSet with the given data.

        Always verify that the data was appended in the way you expect based on
        what input type you are preferring to work with.

        Args:
            spectra: spectra to append to the current spectra DataFrame
            sources: sources to append to the current sources DataFrame
            info: info to append to the current info DataFrame

        Raises:
            `ValueError` when internal `DataFrame` lengths do not match
        """
        if not spectra.shape[0] == sources.shape[0] == info.shape[0]:
            msg = ("The number of rows in each of the required positional arguments "
                   "must be the same.")
            raise ValueError(msg)

        # Note: `DataFrame.append` was removed in pandas 2.0; `pd.concat` is the
        # equivalent replacement.
        self._spectra = pd.concat(
            [self._spectra, pd.DataFrame(spectra)],
            ignore_index=True, sort=True
        ).fillna(0)
        new_sources_column_index = pd.MultiIndex.from_tuples(
            sources.keys(),
            names=SampleSet.SOURCES_MULTI_INDEX_NAMES
        )
        new_sources_df = pd.DataFrame(sources.values(), columns=new_sources_column_index)
        self._sources = pd.concat(
            [self._sources, new_sources_df],
            ignore_index=True, sort=True
        ).fillna(0)
        self._info = pd.concat(
            [self._info, pd.DataFrame(info)],
            ignore_index=True, sort=True
        )

    def get_all_channel_energies(self, fractional_energy_bins=None) -> np.ndarray:
        """Get the energy (in keV) represented by the lower edge of each channel
        for all samples.

        See docstring for `_channels_to_energies()` for more details.

        Args:
            fractional_energy_bins: array of fractions representing the spacing of
                the channels (for arbitrary channel structures)

        Returns:
            2-D array of energy values in keV for all samples
        """
        if fractional_energy_bins is None:
            fractional_energy_bins = np.linspace(0, 1, self.n_channels)
        all_channel_energies = []
        for i in range(self.n_samples):
            channel_energies = self.get_channel_energies(i, fractional_energy_bins)
            all_channel_energies.append(channel_energies)
        return all_channel_energies

    def get_channel_energies(self, sample_index, fractional_energy_bins=None) -> np.ndarray:
        """Get the energy (in keV) represented by the lower edge of each channel
        for a single sample at a specific index.

        See docstring for `_channels_to_energies()` for more details.

        Args:
            sample_index: index of the specific sample for which to obtain energies
            fractional_energy_bins: array of fractions representing the spacing of
                the channels (for arbitrary channel structures)

        Returns:
            Array of energy values in keV
        """
        if fractional_energy_bins is None:
            fractional_energy_bins = np.linspace(0, 1, self.n_channels)
        # TODO: raise error if ecal info is missing for any row
        offset, gain, quadratic, cubic, low_energy = self.ecal[sample_index]
        channel_energies = self._channels_to_energies(
            fractional_energy_bins,
            offset, gain, quadratic, cubic, low_energy
        )
        return channel_energies

    def get_confidences(self, fg_seeds_ss: SampleSet, bg_seed_ss: SampleSet = None,
                        bg_cps: float = None, is_lpe: bool = False,
                        confidence_func: Callable = None,
                        **confidence_func_kwargs) -> np.ndarray:
        """Get confidence measure for predictions as a NumPy array.
        Args:
            fg_seeds_ss: `SampleSet` containing foreground seeds
            bg_seed_ss: `SampleSet` containing a single background seed
            bg_cps: count rate used to scale `bg_seed_ss`
            is_lpe: whether predictions are for multi-class classification or
                label proportion estimation (LPE)
            confidence_func: metric function used to compare spectral reconstructions
                and measured spectra
            confidence_func_kwargs: kwargs for confidence metric function

        Returns:
            Array containing confidence metric for each prediction
        """
        if fg_seeds_ss.spectra_type != SpectraType.Foreground:
            msg = (
                "`fg_seeds_ss` must have a `spectra_type` of `Foreground`. "
                f"Its `spectra_type` is `{fg_seeds_ss.spectra_type}`."
            )
            raise ValueError(msg)
        if self.spectra_type == SpectraType.Gross:
            if self.info.total_counts.isna().any() or (self.info.total_counts <= 0).any():
                msg = (
                    "This `SampleSet` must have the `info.total_counts` column filled in "
                    "with values greater than zero. "
                    "If there are samples with spectra of all zeros, remove them first."
                )
                raise ValueError(msg)
            if self.info.live_time.isna().any() or (self.info.live_time <= 0).any():
                msg = (
                    "This `SampleSet` must have the `info.live_time` column filled in "
                    "with values greater than zero. "
                    "If there are samples with a live time of zero, remove them first."
                )
                raise ValueError(msg)
            if not bg_seed_ss:
                raise ValueError("`bg_seed_ss` is required for current `spectra_type` of `Gross`.")
            if bg_seed_ss.n_samples != 1:
                raise ValueError("`bg_seed_ss` must have exactly one sample.")
            if bg_seed_ss.spectra_type != SpectraType.Background:
                msg = (
                    "`bg_seed_ss` must have a `spectra_type` of `Background`. "
                    f"Its `spectra_type` is `{bg_seed_ss.spectra_type}`."
                )
                raise ValueError(msg)
            if not bg_cps or bg_cps <= 0:
                raise ValueError("A positive background count rate is required.")

        if confidence_func is None:
            from riid.losses import jensen_shannon_divergence
            confidence_func = jensen_shannon_divergence

        if is_lpe:
            recon_proportions = self.prediction_probas.values
        else:
            max_indices = np.argmax(self.prediction_probas.values, axis=1)
            recon_proportions = np.zeros(self.prediction_probas.values.shape)
            recon_proportions[np.arange(self.n_samples), max_indices] = 1
        reconstructions = np.dot(recon_proportions, fg_seeds_ss.spectra.values)
        if self.spectra_type == SpectraType.Gross:
            bg_spectrum = bg_seed_ss.spectra.iloc[0].values
            normalized_bg_spectrum = bg_spectrum / bg_spectrum.sum()
            bg_counts = bg_cps * self.info.live_time.values
            fg_counts = (self.info.total_counts.values - bg_counts).clip(1)
            scaled_bg_spectra = get_expected_spectra(normalized_bg_spectrum, bg_counts)
            scaled_fg_spectra = reconstructions * fg_counts[:, np.newaxis]
            reconstructions = scaled_fg_spectra + scaled_bg_spectra

        confidences = confidence_func(
            reconstructions.astype(np.float64),
            self.spectra.values,
            **confidence_func_kwargs
        )
        confidences = np.array(confidences)

        return confidences

    def _get_spectral_distances(self, distance_func=distance.jensenshannon) -> np.array:
        n_samples = self.n_samples
        spectra = self.spectra.values.copy()
        spectrum_pairs = [
            (spectra[i], spectra[j])
            for i in range(n_samples)
            for j in range(i, n_samples)
        ]
        left_spectrum, right_spectrum = zip(*spectrum_pairs)
        distance_values = distance_func(left_spectrum, right_spectrum, axis=1)
        return distance_values

    def get_multiclass_jsds(self, fg_seeds_ss: SampleSet, target_level: str) -> list:
        """For each sample, construct a dictionary mapping seed name to JSD using
        the sample's top prediction.
        Args:
            fg_seeds_ss: `SampleSet` from which to pull seeds for computing JSD
            target_level: the level at which predictions were made by a model

        Returns:
            List of dictionaries where keys are seed names and values are JSDs
        """
        test_prediction_labels = self.get_predictions(target_level)
        fg_seed_labels = fg_seeds_ss.get_labels(target_level)
        jsds = []
        for i, pred_label in enumerate(test_prediction_labels):
            test_spectrum = self.spectra.iloc[i]
            seeds_for_pred = fg_seeds_ss[fg_seed_labels == pred_label]
            seed_labels_for_pred = seeds_for_pred.get_labels("Seed")
            jsds_for_sample = {}
            for j, seed_label in enumerate(seed_labels_for_pred):
                seed_spectrum = seeds_for_pred.spectra.iloc[j]
                jsd = distance.jensenshannon(test_spectrum, seed_spectrum)
                jsds_for_sample[seed_label] = jsd
            jsds.append(jsds_for_sample)
        return jsds

    def get_spectral_distance_matrix(self, distance_func=distance.jensenshannon,
                                     target_level="Seed") -> pd.DataFrame:
        """Compute the distance between all pairs of spectra.

        This method is intended for use on seed spectra (i.e., templates).
        Calling this method on large sample sets, such as those produced by the
        static synthesizer, is not recommended.

        This method only computes the upper triangle and diagonal of the matrix
        since the lower triangle would be a copy of the upper triangle.
        The diagonal is computed, for when Poisson noise is present, for two main
        reasons:

        - the distance between the same source will effectively never be zero;
        - there is sometimes research interest in comparing the diagonal to the
          upper triangle.

        Args:
            distance_func: specific distance function to use
            target_level: `SampleSet.sources` column level to use

        Returns:
            A 2-D array of distance values
        """
        distance_values = self._get_spectral_distances(distance_func)
        row_labels = self.get_labels(target_level=target_level)
        col_labels = self.sources.columns.get_level_values(level=target_level)
        distance_df = _get_distance_df_from_values(distance_values, row_labels, col_labels)
        return distance_df

    def get_labels(self, target_level="Isotope", max_only=True, include_value=False,
                   min_value=0.01, level_aggregation=None):
        """Get row ground truth labels for each sample based on `sources` values.

        See docstring for `_get_row_labels()` for more details.
        """
        labels = _get_row_labels(
            self.sources,
            target_level=target_level,
            max_only=max_only,
            include_value=include_value,
            min_value=min_value,
            level_aggregation=level_aggregation,
        )
        return labels

    def get_predictions(self, target_level="Isotope", max_only=True, include_value=False,
                        min_value=0.01, level_aggregation=None):
        """Get row predictions for each spectrum based on `prediction_probas` values.

        See docstring for `_get_row_labels()` for more details.
        """
        labels = _get_row_labels(
            self.prediction_probas,
            target_level=target_level,
            max_only=max_only,
            include_value=include_value,
            min_value=min_value,
            level_aggregation=level_aggregation,
        )
        return labels

    def get_samples(self) -> np.ndarray:
        """Get `spectra` as a NumPy array.

        Replaces NaN values with 0.

        Returns:
            Array containing spectrum data
        """
        spectra_values = np.nan_to_num(self._spectra)
        return spectra_values

    def get_source_contributions(self, target_level="Isotope") -> np.ndarray:
        """Get `sources` as a NumPy array.

        Replaces NaN values with 0.
Returns: Array containing the ground truth contributions for each sample """ collapsed_sources = self.sources.T.groupby(target_level).sum().T sources_values = np.nan_to_num(collapsed_sources) return sources_values def normalize(self, p: float = 1, clip_negatives: bool = True): """Apply L-p normalization to `spectra` in place. Default is L1 norm (p=1) can be interpreted as a forming a probability mass function (PMF). L2 norm (p=2) is unit energy (sum of squares == 1). Reference: https://en.wikipedia.org/wiki/Parseval's_theorem Args: p: exponent to which the spectra values are raised clip_negatives: whether negative values will be removed from the spectra and replaced with 0 """ if clip_negatives: self.clip_negatives() elif (self._spectra.values < 0).sum(): logging.warning( "You are performing a normalization operation on spectra which contain negative " + "values. Consider applying `clip_negatives`." ) energy = (self._spectra.values ** p).sum(axis=1)[:, np.newaxis] energy[energy == 0] = 1 self._spectra = pd.DataFrame(data=self._spectra.values / energy ** (1/p)) if p == 1: self.spectra_state = SpectraState.L1Normalized elif p == 2: self.spectra_state = SpectraState.L2Normalized else: self.spectra_state = SpectraState.Unknown def normalize_sources(self): """Normalize `sources` such that rows sum to 1.""" row_sums = self._sources.sum(axis=1) row_sums[row_sums == 0] = 1 self._sources = self._sources.clip(0).divide( row_sums, axis=0, ) def replace_nan(self, replace_value: float = 0): """Replace np.nan values. Args: replace_value: value with which NaN will be replaced """ self._spectra.replace(np.nan, replace_value) self._sources.replace(np.nan, replace_value) self._info.replace(np.nan, replace_value) def sample(self, n_samples: int, random_seed: int = None) -> SampleSet: """Randomly sample the SampleSet. 
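`normalize` above divides each spectrum by its L-p "energy" term. A small standalone NumPy sketch of that operation, outside any `SampleSet` (the helper name `lp_normalize` is ours for illustration):

```python
import numpy as np

def lp_normalize(spectra, p=1):
    """Divide each row by its L-p energy, mirroring SampleSet.normalize()."""
    spectra = np.asarray(spectra, dtype=float)
    energy = (spectra ** p).sum(axis=1, keepdims=True)
    energy[energy == 0] = 1  # avoid division by zero for all-zero rows
    return spectra / energy ** (1 / p)

spectra = np.array([[1.0, 3.0], [2.0, 2.0]])
pmf = lp_normalize(spectra, p=1)    # rows sum to 1 (PMF interpretation)
unit = lp_normalize(spectra, p=2)   # rows have unit energy (sum of squares == 1)
```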
Args: n_samples: number of random samples to be returned random_seed: seed value for random number generation Returns: `Sampleset` of randomly selected measurements """ if random_seed: random.seed(random_seed) if n_samples > self.n_samples: n_samples = self.n_samples random_indices = random.sample(self.spectra.index.values.tolist(), n_samples) random_mask = np.isin(self.spectra.index, random_indices) return self[random_mask] def set_dead_time_proportions(self): """Computes dead time proportion for each sample and saves it in the `SampleSet` info. """ dead_time_props = self._get_dead_time_proportions() self.info[self.DEAD_TIME_PROP_INFO_KEY] = dead_time_props def shuffle(self, inplace: bool = True, random_state: int = None) -> SampleSet: """Randomly reorder all DataFrames. Args: inplace: whether to perform the operation in-place random_state: random seed for reproducing specific shuffles Returns: `SampleSet` object when `inplace` is False """ new_row_ordering = np.arange(self.n_samples) np.random.default_rng(random_state).shuffle(new_row_ordering) new_ss = self if inplace else self[:] new_ss.spectra = self.spectra.iloc[new_row_ordering]\ .reset_index(drop=True) new_ss.sources = self.sources.iloc[new_row_ordering]\ .reset_index(drop=True) new_ss.info = self.info.iloc[new_row_ordering]\ .reset_index(drop=True) if not new_ss.prediction_probas.empty: new_ss.prediction_probas = self.prediction_probas.iloc[new_row_ordering]\ .reset_index(drop=True) if not inplace: return new_ss def sources_columns_to_dict(self, target_level="Isotope") -> Union[dict, list]: """Convert the MultiIndex columns of `sources` to a dictionary. Note: depending on `target_level` and `sources` columns, duplicate values are possible. Args: target_level: `SampleSet.sources` column level to use Returns: If `target_level` is "Category" or "Isotope," then a dict is returned. If `target_level` is "Seed," then a list is returned. 
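`shuffle` above draws one permutation from `numpy.random.default_rng` and applies it to every DataFrame so rows stay aligned. Sketched standalone (the helper name `shuffled_order` is ours for illustration):

```python
import numpy as np

def shuffled_order(n_samples, random_state=None):
    """Row ordering used to reorder all DataFrames consistently."""
    order = np.arange(n_samples)
    np.random.default_rng(random_state).shuffle(order)
    return order

# The same `order` would be applied to spectra, sources, info, and
# prediction_probas so that rows stay aligned across DataFrames.
order = shuffled_order(5, random_state=42)
```

Passing the same `random_state` reproduces the same permutation, which is what makes the shuffle reproducible.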
Raises: `ValueError` when `target_level` is invalid """ self._check_target_level( target_level, levels_allowed=SampleSet.SOURCES_MULTI_INDEX_NAMES ) d = {} column_tuples = self.sources.columns.to_list() if target_level == "Seed": d = column_tuples elif target_level == "Isotope": for t in column_tuples: _, i, _ = t if i not in d: d[i] = [t] if t not in d[i]: d[i].append(t) else: # target_level == "Category": for t in column_tuples: c, i, _ = t if c not in d: d[c] = {} if i not in d[c]: d[c][i] = [t] if t not in d[c][i]: d[c][i].append(t) return d def split_fg_and_bg(self, bg_seed_names: Iterable = DEFAULT_BG_SEED_NAMES) \ -> Tuple[SampleSet, SampleSet]: """Split the current `SampleSet` into two new `SampleSet`s, one containing only foreground sources and the other containing only background sources. Foreground sources are assumed to be anything that is not designated as a background source. Args: bg_seeds_names: names of the seeds which are considered background sources. This list be customized to also extract atypical background sources such as calibration sources. Returns: Two `SampleSet`s, the first containing only foregrounds and the second only containing backgrounds. """ seed_labels = self.get_labels(target_level="Seed") bg_mask = seed_labels.isin(bg_seed_names) fg_seeds_ss = self[~bg_mask] fg_seeds_ss.drop_sources_columns_with_all_zeros() fg_seeds_ss.spectra_type = SpectraType.Foreground fg_seeds_ss.spectra_state = self.spectra_state bg_seeds_ss = self[bg_mask] bg_seeds_ss.drop_sources_columns_with_all_zeros() bg_seeds_ss.spectra_type = SpectraType.Background bg_seeds_ss.spectra_state = self.spectra_state return fg_seeds_ss, bg_seeds_ss def squash(self) -> SampleSet: """Combine all rows of the `SampleSet` into a single row. All data suited for summing is summed, otherwise the info from the first sample is used. 
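`split_fg_and_bg` above boils down to a boolean mask over seed labels. A standalone sketch using the default background seed names from the class (the example foreground labels are hypothetical):

```python
import numpy as np

# Default background seed names per SampleSet.DEFAULT_BG_SEED_NAMES
DEFAULT_BG_SEED_NAMES = ["Cosmic", "PotassiumInSoil", "UraniumInSoil", "ThoriumInSoil"]

seed_labels = np.array(["Cs137", "Cosmic", "Ba133", "PotassiumInSoil"])
bg_mask = np.isin(seed_labels, DEFAULT_BG_SEED_NAMES)

fg_labels = seed_labels[~bg_mask]  # anything not designated background is foreground
bg_labels = seed_labels[bg_mask]
```

In the real method, the mask indexes the `SampleSet` itself (`self[~bg_mask]`, `self[bg_mask]`) rather than a label array.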
Returns: A flattened `SampleSet` """ flat_spectra = self.spectra.sum(axis=0).to_frame().T flat_sources = self.sources.sum(axis=0).to_frame().T flat_prediction_probas = self.prediction_probas.sum(axis=0).to_frame().T flat_info = self.info.iloc[0].to_frame().T if "description" in flat_info: flat_info.description = "squashed" if "live_time" in flat_info: flat_info.live_time = self.info.live_time.sum() if "real_time" in flat_info: flat_info.real_time = self.info.real_time.sum() if "total_counts" in flat_info: flat_info.total_counts = self.info.total_counts.sum() if "snr" in flat_info: flat_info.snr = self.info.snr.sum() # Create a new SampleSet with the flattened data flat_ss = SampleSet() flat_ss.spectra = flat_spectra flat_ss.sources = flat_sources flat_ss.prediction_probas = flat_prediction_probas flat_ss.info = flat_info # Copy dictionary and other non-DataFrame objects from full ss flat_ss._detector_info = self._detector_info flat_ss._synthesis_info = self._synthesis_info flat_ss._measured_or_synthetic = self._measured_or_synthetic return flat_ss def to_hdf(self, path: str | Path, verbose=False, **kwargs): """Write the `SampleSet` to disk as a HDF file. Args: path: file path at which to save as an HDF file verbose: whether to display detailed output kwargs: additional arguments to be passed to the `Pandas.HDFStore` constructor Raises: `ValueError` when provided path extension is invalid """ path = Path(path) if path.suffix != riid.SAMPLESET_HDF_FILE_EXTENSION: logging.warning(f"Path does not end in {riid.SAMPLESET_HDF_FILE_EXTENSION}") _write_hdf(self, path, **kwargs) if verbose: logging.info(f"Saved SampleSet to '{path}'") def to_json(self, path: str | Path, verbose=False): """Write the `SampleSet` to disk as a JSON file. Warning: it is not recommended that you use this on large `SampleSet` objects. Consider `SampleSet.to_hdf()` instead in such cases. 
Args: path: file path at which to save as an HDF file verbose: whether to display detailed output Raises: `ValueError` when provided path extension is invalid """ path = Path(path) if path.suffix != riid.SAMPLESET_JSON_FILE_EXTENSION: logging.warning(f"Path does not end in {riid.SAMPLESET_JSON_FILE_EXTENSION}") _write_json(self, path) if verbose: logging.info(f"Saved SampleSet to '{path}'") def to_pcf(self, path: str | Path, verbose=False): """Write the `SampleSet` to disk as a PCF. Args: path: file path at which to save as a PCF verbose: whether to display detailed output Raises: `ValueError` when provided path extension is invalid """ path = Path(path) if path.suffix != riid.PCF_FILE_EXTENSION: logging.warning(f"Path does not end in {riid.PCF_FILE_EXTENSION}") _dict_to_pcf(_ss_to_pcf_dict(self, verbose), path, verbose) if verbose: logging.info(f"Saved SampleSet to '{path}'") def update_timestamp(self): """Set the timestamp for all samples to the current UTC date and time.""" self.info.timestamp = _get_utc_timestamp() def upsample_spectra(self, target_bins: int = 4096): """Uniformly up-bin spectra. Args: target_bins: number of bins into which the current spectra should be split. """ if target_bins % self.n_channels == 0: n_per_group = int(target_bins / self.n_channels) transformation = np.zeros([target_bins, self.n_channels]) for i in range(self.n_channels): transformation[(i * n_per_group):((i * n_per_group) + n_per_group), i] = \ 1.0 / n_per_group else: raise ValueError( ("Desired number of bins ({}) is cannot" " be uniformly mapped from existing" " channels ({})").format(target_bins, self.n_channels)) self._spectra = pd.DataFrame( data=np.matmul( self._spectra.values, transformation.T))Class variables
var DEAD_TIME_PROP_INFO_KEY
var DEFAULT_BG_SEED_NAMES
var DEFAULT_EXCLUSIONS_FROM_COMPARISON
var DEFAULT_INFO_COLUMNS
var ECAL_INFO_COLUMNS
var SOURCES_MULTI_INDEX_NAMES
var SUPPORTED_STATES_FOR_ARITHMETIC
Instance variables
var category_names-
Get the names of the categories involved in this SampleSet.
@property
def category_names(self):
    """Get the names of the categories involved in this SampleSet."""
    return self.sources.columns.get_level_values("Category")

var classified_by-
Get or set the UUID of the model that classified the SampleSet.
TODO: Implement as third dimension to `prediction_probas` DataFrame
@property
def classified_by(self):
    """Get or set the UUID of the model that classified the SampleSet.

    TODO: Implement as third dimension to `prediction_probas` DataFrame
    """
    return self._classified_by

var detector_info-
Get or set the detector info on which this `SampleSet` is based.

TODO: implement as DataFrame
@property
def detector_info(self):
    """Get or set the detector info on which this `SampleSet` is based.

    TODO: implement as DataFrame
    """
    return self._detector_info

var difficulty_score : float-
Compute the "difficulty" of the `SampleSet` on a scale of 0 to 1, where 0 is easiest and 1 is hardest.

The difficulty of a `SampleSet` is the mean of the individual sample difficulties. Each sample's difficulty is determined by where its signal strength (SNR) falls on the survival function (AKA reliability function, or 1 - CDF) for a logistic distribution. The decision to use this distribution is based on empirical data showing that classifier performance increases in a logistic fashion as signal strength increases.

The primary use of this function is for situations where ground truth is unknown and you want to get an idea of how "difficult" it will be for a model to make predictions. Consider the following example: given a `SampleSet` for which ground truth is unknown and the signal strength for each spectrum is low, a model considering, say, 100+ isotopes will see this `SampleSet` as quite difficult, whereas a model considering 5 isotopes will, being more specialized, see the `SampleSet` as easier. Of course, this assumes that both models were trained on the isotope(s) contained in the test `SampleSet`.

Based on the previous example, the unfortunate state of this function is that you must know how to pick means and standard deviations which properly reflect the:
- number of target isotopes
- amount of variation within each isotope (i.e., shielding, scattering, etc.)
- detector resolution
A future goal of ours is to provide updates to this function and docstring which make choosing the mean and standard deviation easier/automated based on more of our findings.
Arguments
mean: SNR value representing the mean of the logistic distribution std: standard deviation of the logistic distribution
Returns
The mean of all SNR values passed through a logistic survival function
@property def difficulty_score(self, mean=10.0, std=3.0) -> float: """Compute the "difficulty" of the `SampleSet` on a scale of 0 to 1, where 0 is easiest and 1 is hardest. The difficulty of a `SampleSet` is the mean of the individual sample difficulties. Each sample's difficulty is determined by where its signal strength (SNR) falls on the survival function (AKA reliability function, or 1 - CDF) for a logistic distribution. The decision to use this distribution is based on empirical data showing that classifier performance increases in a logistic fashion as signal strength increases. The primary use of this function is for situations where ground truth is unknown and you want to get an idea of how "difficult" it will be for a model to make predictions. Consider the following example: Given a `SampleSet` for which ground truth is unknown and the signal strength for each spectrum is low. A model considering, say, 100+ isotopes will see this `SampleSet` as quite difficult, whereas a model considering 5 isotopes will, being more specialized, see the `SampleSet` as easier. Of course, this makes the assumption that both models were trained to the isotope(s) contained in the test `SampleSet`. Based on the previous example, the unfortunate state of this function is that you must know how to pick means and standard deviations which properly reflect the: - number of target isotopes - amount of variation within each isotope (i.e., shielding, scattering, etc.) - detector resolution A future goal of ours is to provide updates to this function and docstring which make choosing the mean and standard deviation easier/automated based on more of our findings. 
    Arguments:
        mean: SNR value representing the mean of the logistic distribution
        std: standard deviation of the logistic distribution

    Returns:
        The mean of all SNR values passed through a logistic survival function
    """
    snrs: np.ndarray = self.info.snr.clip(1e-6)
    score = float(stats.logistic.sf(snrs, loc=mean, scale=std).mean())
    return score

var ecal-
Get or set the ecal terms.
@property
def ecal(self):
    """Get or set the ecal terms."""
    ecal_terms = self.info[list(self.ECAL_INFO_COLUMNS)].to_numpy(dtype=float)
    return ecal_terms

var info-
Get or set the info DataFrame.
@property
def info(self):
    """Get or set the info DataFrame."""
    return self._info

var isotope_names-
Get the names of the isotopes involved in this SampleSet.
@property
def isotope_names(self):
    """Get the names of the isotopes involved in this SampleSet."""
    return self.sources.columns.get_level_values("Isotope")

var measured_or_synthetic-
Get or set the value for measured_or_synthetic for the SampleSet object.
@property
def measured_or_synthetic(self):
    """Get or set the value for measured_or_synthetic for the SampleSet object."""
    return self._measured_or_synthetic

var n_channels-
Get the number of channels included in the spectra DataFrame. Channels may also be referred to as bins.
@property
def n_channels(self):
    """Get the number of channels included in the spectra DataFrame.

    Channels may also be referred to as bins.
    """
    return self._spectra.shape[1]

var n_samples-
Get the number of samples included in the spectra dataframe, where each row is a sample.
@property
def n_samples(self):
    """Get the number of samples included in the spectra dataframe, where each
    row is a sample.
    """
    return self._spectra.shape[0]

var prediction_probas-
Get or set the prediction_probas DataFrame.
@property
def prediction_probas(self):
    """Get or set the prediction_probas DataFrame."""
    return self._prediction_probas

var seed_names-
Get the names of the seeds involved in this `SampleSet`.
@property
def seed_names(self):
    """Get the names of the seeds involved in this `SampleSet`."""
    return self.sources.columns.get_level_values("Seed")

var sources-
Get or set the sources `DataFrame`.
@property
def sources(self):
    """Get or set the sources `DataFrame`."""
    return self._sources

var spectra-
Get or set the spectra `DataFrame`.
@property
def spectra(self):
    """Get or set the spectra `DataFrame`."""
    return self._spectra

var spectra_state-
Get or set the spectra state.
@property
def spectra_state(self) -> SpectraState:
    """Get or set the spectra state."""
    return self._spectra_state

var spectra_type-
Get or set the spectra type.
@property
def spectra_type(self) -> SpectraType:
    """Get or set the spectra type."""
    return self._spectra_type

var synthesis_info-
Get or set the information that was used by a synthesizer.
@property
def synthesis_info(self):
    """Get or set the information that was used by a synthesizer."""
    return self._synthesis_info
Methods
def all_spectra_sum_to_one(self, rtol: float = 0.0, atol: float = 0.0001) ‑> bool-
Checks if all spectra are normalized to sum to one.
def all_spectra_sum_to_one(self, rtol: float = 0.0, atol: float = 1e-4) -> bool:
    """Checks if all spectra are normalized to sum to one."""
    spectra_counts = self.spectra.sum(axis=1).values
    all_sum_to_one = np.all(np.isclose(spectra_counts, 1, rtol=rtol, atol=atol))
    return all_sum_to_one

def as_ecal(self, new_offset: float, new_gain: float, new_quadratic: float, new_cubic: float, new_low_energy: float) ‑> SampleSet-
Re-bin spectra based on energy by interpolating the current shape from the current binning structure to a new one.
Warning: this operation is likely to add or remove spectral information depending on the new energy calibration values used.
Args
new_offset - new offset value, i.e. the 0th e-cal term
new_gain - new gain value, i.e. the 1st e-cal term
new_quadratic - new quadratic value, i.e. the 2nd e-cal term
new_cubic - new cubic value, i.e. the 3rd e-cal term
new_low_energy - new low energy term
Returns
A new `SampleSet` with `spectra` and `info` DataFrames

Raises

`ValueError` when no argument values are provided
def as_ecal(self, new_offset: float, new_gain: float, new_quadratic: float, new_cubic: float, new_low_energy: float) -> SampleSet: """Re-bin spectra based on energy by interpolating the current shape from the current binning structure to a new one. Warning: this operation is likely to add or remove spectral information depending on the new energy calibration values used. Args: new_offset: new gain value, i.e. the 0-th e-cal term new_gain: new gain value, i.e. the 1-th e-cal term new_quadratic: new quadratic value, i.e. the 2-th e-cal term new_cubic: new cubic value, i.e. the 3-th e-cal term new_low_energy: new low energy term Returns: A new `SamleSet` with `spectra` and `info` DataFrames Raises: `ValueError` when no argument values are provided """ new_args = [new_offset, new_gain, new_quadratic, new_cubic, new_low_energy] if all(v is None for v in new_args): raise ValueError("At least one argument value must be provided.") ecal_cols = list(self.ECAL_INFO_COLUMNS) new_ecal = self.info[ecal_cols].copy() new_ecal.ecal_order_0 = new_offset new_ecal.ecal_order_1 = new_gain new_ecal.ecal_order_2 = new_quadratic new_ecal.ecal_order_3 = new_cubic new_ecal.ecal_low_e = new_low_energy all_original_channel_energies = self.get_all_channel_energies() all_new_channel_energies = [] fractional_energy_bins = np.linspace(0, 1, self.n_channels) for i in range(self.n_samples): offset, gain, quadratic, cubic, low_energy = new_ecal.iloc[i].values new_channel_energies = self._channels_to_energies( fractional_energy_bins, offset, gain, quadratic, cubic, low_energy ) all_new_channel_energies.append(new_channel_energies) new_spectra = np.zeros([self.n_samples, self.n_channels]) # Perform interpolation for i in range(self.n_samples): f = interp1d( all_original_channel_energies[i], self.spectra.values[i], kind="slinear", fill_value=0, bounds_error=False ) new_spectrum = f(all_new_channel_energies[i]) new_spectra[i] = new_spectrum # Update spectra and info DataFrames new_ss = self[:] 
new_ss.spectra = pd.DataFrame(new_spectra) new_ss.info.total_counts = new_ss.spectra.sum(axis=1) new_ss.info[ecal_cols] = new_ecal return new_ss def as_regions(self, rois: list) ‑> SampleSet-
Obtains a new `SampleSet` where the spectra are limited to specific regions of interest (ROIs).

Notes
- If your samples have disparate energy calibration terms, call `as_ecal()` first to align channel space, then call this function. Otherwise, it is possible to end up with a ragged array of spectra, which we do not support.
- After this call, `spectra` will have columns filled in with energy values for convenience. As such, in the context of the returned `SampleSet`, the energy calibration terms in `info` will no longer have any meaning, and any subsequent calls to methods like `as_ecal()` would not make sense. This method is intended as a last step to be performed right before analysis of whatever kind.
Args
rois - a list of 2-tuples where each tuple represents (low energy, high energy)
Returns
A new `SampleSet` with only ROIs remaining in the `spectra` DataFrame

Raises

`ValueError` when no ROIs are provided
def as_regions(self, rois: list) -> SampleSet: """Obtains a new `SampleSet` where the spectra are limited to specific regions of interest (ROIs). Notes: - If your samples have disparate energy calibration terms, call `as_ecal()` first to align channel space, then you may call this function. Otherwise, it is possible to end up with a ragged array of spectra, which we do not support. - After this call, `spectra` will have columns filled in with energy values for convenience. As such, in the context of the returned `SampleSet`, the energy calibration terms in `info` will no longer have any meaning, and any subsequent calls to methods like `as_ecal()` would not make sense. This method is intended as a last step to be performed right before analysis of whatever kind. Args: rois: a list of 2-tuples where tuple represents (low energy, high energy) Returns: A new `SamleSet` with only ROIs remaining in the `spectra` DataFrame Raises: `ValueError` when no argument values are provided """ if not rois: raise ValueError("At least one ROI must be provided.") all_ecals = self.ecal all_ecals_are_same = np.isclose(all_ecals, all_ecals[0]).all() if not all_ecals_are_same: msg = "Spectra have different energy calibrations; consider `as_ecal()` first." raise ValueError(msg) energies = self.get_channel_energies(0) mask = _get_energy_roi_masks(rois, energies) new_spectra = self.spectra.to_numpy(dtype=float)[:, mask] new_spectra = new_spectra.reshape((self.n_samples, -1)) mask_energies = energies[mask] new_ss = self[:] new_ss.spectra = pd.DataFrame(new_spectra, columns=mask_energies) new_ss.info.total_counts = new_ss.spectra.sum(axis=1) return new_ss - If your samples have disparate energy calibration terms, call
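The core of `as_regions` is an energy mask built from the ROI tuples. A standalone sketch of one plausible semantics for the masking step (half-open intervals and the helper name `energy_roi_mask` are our assumptions; the real helper is `_get_energy_roi_masks`):

```python
import numpy as np

def energy_roi_mask(rois, energies):
    """Boolean mask selecting channels whose energy falls in any ROI."""
    mask = np.zeros(energies.shape, dtype=bool)
    for low, high in rois:
        mask |= (energies >= low) & (energies < high)
    return mask

energies = np.arange(0, 100, 10, dtype=float)  # one energy per channel
mask = energy_roi_mask([(20, 50)], energies)
roi_energies = energies[mask]
```

The real method then keeps only the masked columns of `spectra` and relabels them with the retained energies.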
def check_seed_health(self, dead_time_threshold=1.0)-
Checks health of all spectra and info assuming they are seeds.
Invalid states for which we currently check:
- spectra do not sum to 1
- dead time greater than or equal to provided threshold
Args
dead_time_threshold- value at which seed dead time is unacceptable
Raises
`AssertionError` if any check fails
def check_seed_health(self, dead_time_threshold=1.0): """Checks health of all spectra and info assuming they are seeds. Invalidate states for which we currently check: - spectra do not sum to 1 - dead time greater than or equal to provided threshold Args: dead_time_threshold: value at which seed dead time is unacceptable Raises: `AssertionError` if any check fails """ all_spectra_sum_to_one = self.all_spectra_sum_to_one() assert all_spectra_sum_to_one if self.DEAD_TIME_PROP_INFO_KEY not in self.info.columns: self.set_dead_time_proportions() dead_samples = self.info[self.DEAD_TIME_PROP_INFO_KEY] >= dead_time_threshold all_samples_are_alive = not np.any(dead_samples) assert all_samples_are_alive def clip_negatives(self, min_value: float = 0)-
Clip spectrum values to some minimum value.
Args
min_value- value to which to clip existing spectrum values
def clip_negatives(self, min_value: float = 0): """Clip spectrum values to some minimum value. Args: min_value: value to which to clip existing spectrum values """ self._spectra = pd.DataFrame(data=self._spectra.clip(min_value)) def compare_to(self, ss: SampleSet, n_bins: int = 20, density: bool = False) ‑> Tuple[dict, dict, dict]-
Compare the current `SampleSet` to another `SampleSet`.

The function only compares the selected, mutual information of each `SampleSet` by computing the Jensen-Shannon distance between histograms of that information.

Args

ss - `SampleSet` to compare to
n_bins - number of histogram bins used for both sample sets
density - whether histograms should be in counts or density
Returns
Tuple of the following:
- ss1_stats: dictionary of stats describing the first `SampleSet`
- ss2_stats: dictionary of stats describing the second `SampleSet`
- col_comparisons: dictionary of distance values comparing each stat
def compare_to(self, ss: SampleSet, n_bins: int = 20, density: bool = False) \ -> Tuple[dict, dict, dict]: """Compare the current `SampleSet` to another `SampleSet`. The function only compares the selected, mutual information of each `SampleSet` by computing the Jensen-Shannon distance between histograms of that information. Args: ss: `SampleSet` to compare to n_bins: number of bins we will sort both sample sets by density: whether histograms should be in counts or density Returns: Tuple of the following: - ss1_stats: dictionary of stats describing the first `SampleSet` - ss2_stats: dictionary of stats describing the second `SampleSet` - col_comparisons: dictionary of distance values comparing each stat """ TARGET_SUMMARY_STATS = ["min", "max", "median", "mean", "std"] STAT_PRECISION = 3 # Get info columns we want to compare info_columns = [x for x in list(self.DEFAULT_INFO_COLUMNS) if x not in list(self.DEFAULT_EXCLUSIONS_FROM_COMPARISON)] # Get both sample sets comparable columns of data info_df1 = self.info[info_columns] info_df2 = ss.info[info_columns] # Check valid columns in each sample set (cannot have None or 0) ss1_valid_cols = [ c for c in info_df1.columns if pd.to_numeric(info_df1[c], errors="coerce").notnull().all() and any(info_df1[c]) ] ss2_valid_cols = [ c for c in info_df2.columns if pd.to_numeric(info_df2[c], errors="coerce").notnull().all() and any(info_df2[c]) ] # Remove non shared column lists for i in ss1_valid_cols: if i not in ss2_valid_cols: ss1_valid_cols.remove(i) for i in ss2_valid_cols: if i not in ss1_valid_cols: ss2_valid_cols.remove(i) # Remove non shared column data info_df1 = info_df1[ss1_valid_cols] info_df2 = info_df2[ss2_valid_cols] # Bin the data ss1_stats = {} ss2_stats = {} col_comparisons = {} for i in ss1_valid_cols: ss1_stats[i] = {} ss2_stats[i] = {} hist_range = ( min(min(info_df1[i]), min(info_df2[i])), max(max(info_df1[i]), max(info_df2[i])) ) # Get data in bins and get counts for each bin hist1, bins1 = 
np.histogram(info_df1[i], bins=n_bins, range=hist_range, density=density) hist2, bins2 = np.histogram(info_df2[i], bins=n_bins, range=hist_range, density=density) ss1_stats[i]["density"] = density ss2_stats[i]["density"] = density ss1_stats[i]["hist"] = hist1 ss2_stats[i]["hist"] = hist2 ss1_stats[i]["bins"] = bins1 ss2_stats[i]["bins"] = bins2 ss1_stats[i]["range"] = hist_range ss2_stats[i]["range"] = hist_range summary_stats_df1 = info_df1[i].agg(TARGET_SUMMARY_STATS).round(decimals=STAT_PRECISION) ss1_stats[i].update(summary_stats_df1.to_dict()) summary_stats_df2 = info_df2[i].agg(TARGET_SUMMARY_STATS).round(decimals=STAT_PRECISION) ss2_stats[i].update(summary_stats_df2.to_dict()) js_dist = distance.jensenshannon(hist1, hist2, axis=0) col_comparisons[i] = js_dist return ss1_stats, ss2_stats, col_comparisons def concat(self, ss_list: list)-
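`compare_to` histograms each shared info column over a common range, then compares the histograms with the Jensen-Shannon distance. The shared-range histogramming step alone looks like this (the example data is hypothetical):

```python
import numpy as np

a = np.array([1.0, 2.0, 2.0, 3.0])  # e.g., an info column from the first SampleSet
b = np.array([2.0, 3.0, 3.0, 4.0])  # the same column from the second SampleSet

# A shared range so both histograms use identical bin edges,
# which is what makes them directly comparable.
hist_range = (min(a.min(), b.min()), max(a.max(), b.max()))
hist_a, edges = np.histogram(a, bins=3, range=hist_range)
hist_b, _ = np.histogram(b, bins=3, range=hist_range)
```

`scipy.spatial.distance.jensenshannon(hist_a, hist_b)` would then produce the per-column distance that `compare_to` reports.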
Vertically concatenate multiple `SampleSet`s into one `SampleSet`.

Combining of non-DataFrame properties (e.g., detector_info, synthesis_info, etc.) is NOT performed - it is the responsibility of the user to ensure proper bookkeeping for these properties when concatenating `SampleSet`s.

This method is most applicable to `SampleSet`s containing measured data from the same detector which could not be made as a single `SampleSet` because of how measurements had to be taken (i.e., measurements were taken by distinct processes separated by time). Therefore, the user should avoid concatenating `SampleSet`s when:

- the data is from different detectors (note that `SampleSet`s are given to models for prediction, and models should be 1-to-1 with physical detectors);
- a mix of synthetic and measured data would occur (this could make an existing `synthesis_info` value ambiguous, but one way to handle this would be to add a new column to `info` distinguishing between synthetic and measured on a per-sample basis).
Args
ss_list - list of `SampleSet`s to concatenate
Returns
`SampleSet` object
def concat(self, ss_list: list): """Vertically concatenate multiple `SampleSet`s into one `SampleSet`. Combining of non-DataFrame properties (e.g., detector_info, synthesis_info, etc.) is NOT performed - it is the responsiblity of the user to ensure proper bookkeeping for these properties when concatenating `SampleSet`s. This method is most applicable to `SampleSet`s containing measured data from the same detector which could not be made as a single `SampleSet` because of how measurements had to be taken (i.e., measurements were taken by distinct processes separated by time). Therefore, the user should avoid concatenating `SampleSet`s when: - the data is from different detectors (note that `SampleSet`s are given to models for prediction, and models should be 1-to-1 with physical detectors); - a mix of synthetic and measured data would occur (this could make an existing `synthesis_info` value ambiguous, but one way to handle this would be to add a new column to `info` distinguishing between synthetic and measured on a per-sample basis). 
Args: ss_list: list of `SampleSet`s to concatenate Returns: `SampleSet` object """ if not ss_list: return if not isinstance(ss_list, list) and not isinstance(ss_list, tuple): ss_list = [ss_list] spectra_types = list( set([self.spectra_type.value] + [x.spectra_type.value for x in ss_list]) ) n_spectra_types = len(spectra_types) if n_spectra_types == 2: if SpectraType.Background.value in spectra_types and \ SpectraType.Foreground.value in spectra_types: self.spectra_type = SpectraType.BackgroundForeground else: self.spectra_type = SpectraType.Unknown elif n_spectra_types == 0 or n_spectra_types > 2: self.spectra_type = SpectraType.Unknown self._spectra = pd.concat( [self._spectra] + [ss.spectra for ss in ss_list], ignore_index=True, sort=False ) self._sources = pd.concat( [self._sources] + [ss.sources for ss in ss_list], ignore_index=True, sort=False ) self._sources = self._sources.where(pd.notnull(self._sources), 0) existing_info_df = self._info if not self._info.empty else None self._info = pd.concat( [existing_info_df] + [ss.info for ss in ss_list], ignore_index=True, sort=False ) self._info = self._info.where(pd.notnull(self._info), None) self._prediction_probas = pd.concat( [self._prediction_probas] + [ss.prediction_probas for ss in ss_list], ignore_index=True, sort=False ) self._prediction_probas = self._prediction_probas.where( pd.notnull(self._prediction_probas), 0 ) - the data is from different detectors
def downsample_spectra(self, target_bins: int = 128, min_frac=1e-08)-
Uniformly down-bin spectra.
Args
target_bins- number of channels to which to bin the existing spectra
Raises
`ValueError` when the current binning is not a multiple of the target binning
def downsample_spectra(self, target_bins: int = 128, min_frac=1e-8): """Uniformly down-bin spectra. Args: target_bins: number of channels to which to bin the existing spectra Raises: `ValueError` when binning is not a multiple of the target binning """ if self.n_channels % target_bins == 0: n_per_group = int(self.n_channels / target_bins) transformation = np.zeros([target_bins, self.n_channels]) for i in range(target_bins): transformation[i, (i * n_per_group):((i * n_per_group) + n_per_group)] = 1 else: msg = "Current binning ({}) is not a multiple of the target binning ({})".format( self.n_channels, target_bins ) raise ValueError(msg) self._spectra = pd.DataFrame( data=np.matmul( self._spectra.values, transformation.T ) ) self._spectra[self._spectra < min_frac] = 0 def drop_sources(self, col_names: Iterable = ['Cosmic', 'PotassiumInSoil', 'UraniumInSoil', 'ThoriumInSoil'], normalize_sources: bool = True, target_level: str = 'Seed')-
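`downsample_spectra` builds a block-summing transformation matrix; when the channel count divides evenly by `target_bins`, this is equivalent to a reshape-and-sum, sketched here standalone (the helper name `downbin` is ours for illustration):

```python
import numpy as np

def downbin(spectrum, target_bins):
    """Sum adjacent channels into target_bins groups.

    n_channels must be an integer multiple of target_bins, matching
    the constraint enforced by downsample_spectra.
    """
    n_channels = spectrum.shape[0]
    if n_channels % target_bins != 0:
        raise ValueError("Current binning is not a multiple of the target binning")
    return spectrum.reshape(target_bins, n_channels // target_bins).sum(axis=1)

coarse = downbin(np.array([1.0, 2.0, 3.0, 4.0]), 2)
```

Note that total counts are preserved; `upsample_spectra` is the inverse direction, spreading each channel's counts uniformly across its new sub-bins.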
Drop columns from `sources`.

Args
col_names- names of the sources columns to be dropped. The names of background seeds are used by default, as removing the ground truth for background sources is a common operation when dealing with synthetic, gross spectra.
normalize_sources- whether to normalize the sources DataFrame after dropping the column(s)
target_level- `SampleSet.sources` column level to use
```python
def drop_sources(self, col_names: Iterable = DEFAULT_BG_SEED_NAMES,
                 normalize_sources: bool = True, target_level: str = "Seed"):
    """Drop columns from `sources`.

    Args:
        col_names: names of the sources columns to be dropped.
            The names of background seeds are used by default as removing the
            ground truth for background sources when dealing with synthetic,
            gross spectra is a common operation.
        normalize_sources: whether to normalize the sources DataFrame
            after dropping the column(s)
        target_level: `SampleSet.sources` column level to use
    """
    self.sources.drop(col_names, axis=1, inplace=True, level=target_level)
    if normalize_sources:
        self.normalize_sources()
```

def drop_sources_columns_with_all_zeros(self)
Remove columns from the sources `DataFrame` that contain only zeros. Modifications are made in-place.
```python
def drop_sources_columns_with_all_zeros(self):
    """Remove columns from the sources `DataFrame` that contain only zeros.

    Modifications are made in-place.
    """
    idxs = (self.sources != 0).any(axis=0)
    self.sources = self.sources.loc[:, idxs]
    self.sources.columns = self.sources.columns.remove_unused_levels()
    return idxs
```

def drop_spectra_with_no_contributors(self) -> numpy.ndarray
Remove samples where the spectrum has no recorded contributor. Modifications are made in-place.

Returns
A boolean mask of which rows were kept
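The row-dropping above boils down to a boolean mask over the sources matrix: rows whose contributions sum to zero are removed everywhere. A small numpy sketch (data values are illustrative):

```python
import numpy as np

sources = np.array([
    [0.7, 0.3],   # labeled sample
    [0.0, 0.0],   # no recorded contributor
    [1.0, 0.0],
])
# Keep only rows with at least one nonzero contribution
kept_mask = sources.sum(axis=1) != 0
kept_sources = sources[kept_mask]
# kept_mask -> [True, False, True]; the middle row is dropped
```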
```python
def drop_spectra_with_no_contributors(self) -> np.ndarray:
    """Remove samples where the spectrum has no recorded contributor.

    Modifications are in-place.

    Returns:
        A boolean mask of which rows were kept
    """
    idxs = np.where(self.sources.values.sum(axis=1) == 0)[0]
    kept_mask = ~np.in1d(self.sources.index.values, idxs)
    self.spectra = self.spectra.drop(index=idxs).reset_index(drop=True)
    self.sources = self.sources.drop(index=idxs).reset_index(drop=True)
    self.info = self.info.drop(index=idxs).reset_index(drop=True)
    return kept_mask
```

def extend(self, spectra: Union[dict, list, np.array, pd.DataFrame], sources: pd.DataFrame, info: Union[list, pd.DataFrame])
Extend the current SampleSet with the given data.

Always verify that the data was appended in the way you expect based on the input type you prefer to work with.

Args
spectra- spectra to append to the current spectra DataFrame
sources- sources to append to the current sources DataFrame
info- info to append to the current info DataFrame

Raises
`ValueError` when internal `DataFrame` lengths do not match
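The core of extending is a length check followed by row-wise appending with column alignment. A sketch using `pd.concat` (note that `DataFrame.append`, used in the source below, was removed in pandas 2.0, so a concat-based equivalent is shown; names are illustrative):

```python
import pandas as pd

def extend_frames(current: pd.DataFrame, new_rows: pd.DataFrame) -> pd.DataFrame:
    """Append rows, aligning columns and filling gaps with 0."""
    if current.shape[1] and new_rows.shape[0] == 0:
        return current
    return pd.concat([current, new_rows], ignore_index=True, sort=True).fillna(0)

spectra = pd.DataFrame([[1, 2]], columns=["ch0", "ch1"])
more = pd.DataFrame([[3, 4]], columns=["ch0", "ch1"])
combined = extend_frames(spectra, more)  # 2 rows, re-indexed 0 and 1
```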
```python
def extend(self, spectra: Union[dict, list, np.array, pd.DataFrame],
           sources: pd.DataFrame, info: Union[list, pd.DataFrame]):
    """Extend the current SampleSet with the given data.

    Always verify that the data was appended in the way you expect
    based on what input type you are preferring to work with.

    Args:
        spectra: spectra to append to the current spectra DataFrame
        sources: sources to append to the current sources DataFrame
        info: info to append to the current info DataFrame

    Raises:
        `ValueError` when internal `DataFrame` lengths do not match
    """
    if not spectra.shape[0] == sources.shape[0] == info.shape[0]:
        msg = "The number of rows in each of the required positional arguments must be same."
        raise ValueError(msg)
    self._spectra = self._spectra\
        .append(pd.DataFrame(spectra), ignore_index=True, sort=True)\
        .fillna(0)
    new_sources_column_index = pd.MultiIndex.from_tuples(
        sources.keys(),
        names=SampleSet.SOURCES_MULTI_INDEX_NAMES
    )
    new_sources_df = pd.DataFrame(sources.values(), columns=new_sources_column_index)
    self._sources = self._sources\
        .append(new_sources_df, ignore_index=True, sort=True)\
        .fillna(0)
    self._info = self._info\
        .append(pd.DataFrame(info), ignore_index=True, sort=True)
```

def get_all_channel_energies(self, fractional_energy_bins=None) -> numpy.ndarray
Get the energy (in keV) represented by the lower edge of each channel for all samples.

See the docstring for `_channels_to_energies()` for more details.

Args
fractional_energy_bins- array of fractions representing the spacing of the channels (for arbitrary channel structures)

Returns
2-D array of energy values in keV for all samples
```python
def get_all_channel_energies(self, fractional_energy_bins=None) -> np.ndarray:
    """Get the energy (in keV) represented by the lower edge of each channel
    for all samples.

    See docstring for `_channels_to_energies()` for more details.

    Args:
        fractional_energy_bins: array of fractions representing the spacing
            of the channels (for arbitrary channel structures)

    Returns:
        2-D array of energy values in keV for all samples
    """
    if fractional_energy_bins is None:
        fractional_energy_bins = np.linspace(0, 1, self.n_channels)
    all_channel_energies = []
    for i in range(self.n_samples):
        channel_energies = self.get_channel_energies(i, fractional_energy_bins)
        all_channel_energies.append(channel_energies)
    return all_channel_energies
```

def get_channel_energies(self, sample_index, fractional_energy_bins=None) -> numpy.ndarray
Get the energy (in keV) represented by the lower edge of each channel for a single sample at a specific index.

See the docstring for `_channels_to_energies()` for more details.

Args
sample_index- index of the specific sample for which to obtain energies
fractional_energy_bins- array of fractions representing the spacing of the channels (for arbitrary channel structures)

Returns
Array of energy values in keV
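The calibration coefficients unpacked below (offset, gain, quadratic, cubic, low_energy) suggest a PCF-style polynomial calibration over fractional channel position; `_channels_to_energies()` itself is not shown here, so the exact form is an assumption. A hedged sketch of such a calibration:

```python
import numpy as np

def channels_to_energies(frac_bins, offset, gain, quadratic, cubic, low_energy):
    """Assumed PCF-style calibration: polynomial terms plus a low-energy correction.
    This mirrors the coefficient names used by `get_channel_energies`, not
    necessarily riid's exact formula."""
    x = np.asarray(frac_bins, dtype=float)
    return (offset + gain * x + quadratic * x**2 + cubic * x**3
            + low_energy / (1 + 60 * x))

frac_bins = np.linspace(0, 1, 4)
energies = channels_to_energies(frac_bins, offset=0.0, gain=3000.0,
                                quadratic=0.0, cubic=0.0, low_energy=0.0)
# With a purely linear calibration, energies span 0..3000 keV
```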
```python
def get_channel_energies(self, sample_index, fractional_energy_bins=None) -> np.ndarray:
    """Get the energy (in keV) represented by the lower edge of each channel
    for a single sample at a specific index.

    See docstring for `_channels_to_energies()` for more details.

    Args:
        sample_index: index of the specific sample for which to obtain energies
        fractional_energy_bins: array of fractions representing the spacing
            of the channels (for arbitrary channel structures)

    Returns:
        Array of energy values in keV
    """
    if fractional_energy_bins is None:
        fractional_energy_bins = np.linspace(0, 1, self.n_channels)
    # TODO: raise error if ecal info is missing for any row
    offset, gain, quadratic, cubic, low_energy = self.ecal[sample_index]
    channel_energies = self._channels_to_energies(
        fractional_energy_bins,
        offset, gain, quadratic, cubic, low_energy
    )
    return channel_energies
```

def get_confidences(self, fg_seeds_ss: SampleSet, bg_seed_ss: SampleSet = None, bg_cps: float = None, is_lpe: bool = False, confidence_func: Callable = None, **confidence_func_kwargs) -> numpy.ndarray
Get confidence measures for predictions as a NumPy array.

Args
fg_seeds_ss- `SampleSet` containing foreground seeds
bg_seed_ss- `SampleSet` containing a single background seed
bg_cps- count rate used to scale bg_seed_ss
is_lpe- whether predictions are for multi-class classification or label proportion estimation (LPE)
confidence_func- metric function used to compare spectral reconstructions and measured spectra
confidence_func_kwargs- kwargs for the confidence metric function

Returns
Array containing the confidence metric for each prediction
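The default confidence metric is a Jensen-Shannon divergence between the reconstructed spectrum and the measured spectrum. The riid implementation lives in `riid.losses`; the following is a generic standalone sketch of the metric, not that implementation:

```python
import numpy as np

def jsd(p, q):
    """Jensen-Shannon divergence (base-2) between two discrete distributions."""
    p, q = np.asarray(p, float), np.asarray(q, float)
    p, q = p / p.sum(), q / q.sum()
    m = 0.5 * (p + q)

    def kl(a, b):
        mask = a > 0  # 0 * log(0) terms contribute nothing
        return np.sum(a[mask] * np.log2(a[mask] / b[mask]))

    return 0.5 * kl(p, m) + 0.5 * kl(q, m)

reconstruction = [0.5, 0.3, 0.2]
measured = [0.5, 0.3, 0.2]
jsd(reconstruction, measured)  # identical distributions -> 0.0
```

Lower values indicate the reconstruction explains the measurement well; disjoint distributions reach the maximum of 1 in base 2.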
```python
def get_confidences(self, fg_seeds_ss: SampleSet, bg_seed_ss: SampleSet = None,
                    bg_cps: float = None, is_lpe: bool = False,
                    confidence_func: Callable = None,
                    **confidence_func_kwargs) -> np.ndarray:
    """Get confidence measure for predictions as a NumPy array.

    Args:
        fg_seeds_ss: `SampleSet` containing foreground seeds
        bg_seed_ss: `SampleSet` containing a single background seed
        bg_cps: count rate used to scale `bg_seed_ss`
        is_lpe: whether predictions are for multi-class classification
            or label proportion estimation (LPE)
        confidence_func: metric function used to compare spectral reconstructions
            and measured spectra
        confidence_func_kwargs: kwargs for confidence metric function

    Returns:
        Array containing confidence metric for each prediction
    """
    if fg_seeds_ss.spectra_type != SpectraType.Foreground:
        msg = (
            "`fg_seeds_ss` must have a `spectra_type` of `Foreground`. "
            f"Its `spectra_type` is `{fg_seeds_ss.spectra_type}`."
        )
        raise ValueError(msg)
    if self.spectra_type == SpectraType.Gross:
        if self.info.total_counts.isna().any() or (self.info.total_counts <= 0).any():
            msg = (
                "This `SampleSet` must have the `info.total_counts` column filled in "
                "with values greater than zero. "
                "If there are samples with spectra of all zeros, remove them first."
            )
            raise ValueError(msg)
        if self.info.live_time.isna().any() or (self.info.live_time <= 0).any():
            msg = (
                "This `SampleSet` must have the `info.live_time` column filled in "
                "with values greater than zero. "
                "If there are samples with live time of zero, remove them first."
            )
            raise ValueError(msg)
        if not bg_seed_ss:
            raise ValueError("`bg_seed_ss` is required for current `spectra_type` of `Gross`.")
        if bg_seed_ss.n_samples != 1:
            raise ValueError("`bg_seed_ss` must have exactly one sample.")
        if bg_seed_ss.spectra_type != SpectraType.Background:
            msg = (
                "`bg_seed_ss` must have a `spectra_type` of `Background`. "
                f"Its `spectra_type` is `{bg_seed_ss.spectra_type}`."
            )
            raise ValueError(msg)
        if not bg_cps or bg_cps <= 0:
            raise ValueError("Positive background count rate required.")
    if confidence_func is None:
        from riid.losses import jensen_shannon_divergence
        confidence_func = jensen_shannon_divergence
    if is_lpe:
        recon_proportions = self.prediction_probas.values
    else:
        max_indices = np.argmax(self.prediction_probas.values, axis=1)
        recon_proportions = np.zeros(self.prediction_probas.values.shape)
        recon_proportions[np.arange(self.n_samples), max_indices] = 1
    reconstructions = np.dot(recon_proportions, fg_seeds_ss.spectra.values)
    if self.spectra_type == SpectraType.Gross:
        bg_spectrum = bg_seed_ss.spectra.iloc[0].values
        normalized_bg_spectrum = bg_spectrum / bg_spectrum.sum()
        bg_counts = bg_cps * self.info.live_time.values
        fg_counts = (self.info.total_counts.values - bg_counts).clip(1)
        scaled_bg_spectra = get_expected_spectra(normalized_bg_spectrum, bg_counts)
        scaled_fg_spectra = reconstructions * fg_counts[:, np.newaxis]
        reconstructions = scaled_fg_spectra + scaled_bg_spectra
    confidences = confidence_func(
        reconstructions.astype(np.float64),
        self.spectra.values,
        **confidence_func_kwargs
    )
    confidences = np.array(confidences)
    return confidences
```

def get_labels(self, target_level='Isotope', max_only=True, include_value=False, min_value=0.01, level_aggregation=None)
Get row ground truth labels for each sample based on `sources` values.

See the docstring for `_get_row_labels()` for more details.
```python
def get_labels(self, target_level="Isotope", max_only=True, include_value=False,
               min_value=0.01, level_aggregation=None):
    """Get row ground truth labels for each sample based on `sources` values.

    See docstring for `_get_row_labels()` for more details.
    """
    labels = _get_row_labels(
        self.sources,
        target_level=target_level,
        max_only=max_only,
        include_value=include_value,
        min_value=min_value,
        level_aggregation=level_aggregation,
    )
    return labels
```

def get_multiclass_jsds(self, fg_seeds_ss: SampleSet, target_level: str) -> list
For each sample, construct a dictionary mapping seed name to JSD using the sample's top prediction.

Args
fg_seeds_ss- `SampleSet` from which to pull seeds for computing JSD
target_level- the level at which predictions were made by a model

Returns
List of dictionaries where keys are seed names and values are JSDs
```python
def get_multiclass_jsds(self, fg_seeds_ss: SampleSet, target_level: str) -> list:
    """For each sample, this constructs a dictionary containing seed name to JSD
    using the sample's top prediction.

    Args:
        fg_seeds_ss: `SampleSet` from which to pull seeds for computing JSD
        target_level: the level at which predictions were made by a model

    Returns:
        List of dictionaries where keys are seed name and values are JSD
    """
    test_prediction_labels = self.get_predictions(target_level)
    fg_seed_labels = fg_seeds_ss.get_labels(target_level)
    jsds = []
    for i, pred_label in enumerate(test_prediction_labels):
        test_spectrum = self.spectra.iloc[i]
        seeds_for_pred = fg_seeds_ss[fg_seed_labels == pred_label]
        seed_labels_for_pred = seeds_for_pred.get_labels("Seed")
        jsds_for_sample = {}
        for j, seed_label in enumerate(seed_labels_for_pred):
            seed_spectrum = seeds_for_pred.spectra.iloc[j]
            jsd = distance.jensenshannon(test_spectrum, seed_spectrum)
            jsds_for_sample[seed_label] = jsd
        jsds.append(jsds_for_sample)
    return jsds
```

def get_predictions(self, target_level='Isotope', max_only=True, include_value=False, min_value=0.01, level_aggregation=None)
Get row predictions for each spectrum based on `prediction_probas` values.

See the docstring for `_get_row_labels()` for more details.
```python
def get_predictions(self, target_level="Isotope", max_only=True, include_value=False,
                    min_value=0.01, level_aggregation=None):
    """Get row predictions for each spectrum based on `prediction_probas` values.

    See docstring for `_get_row_labels()` for more details.
    """
    labels = _get_row_labels(
        self.prediction_probas,
        target_level=target_level,
        max_only=max_only,
        include_value=include_value,
        min_value=min_value,
        level_aggregation=level_aggregation,
    )
    return labels
```

def get_samples(self) -> numpy.ndarray
Get `spectra` as a NumPy array. Replaces NaN values with 0.

Returns
Array containing spectrum data
```python
def get_samples(self) -> np.ndarray:
    """Get `spectra` as a NumPy array.

    Replaces NaN values with 0.

    Returns:
        Array containing spectrum data
    """
    spectra_values = np.nan_to_num(self._spectra)
    return spectra_values
```

def get_source_contributions(self, target_level='Isotope') -> numpy.ndarray
Get `sources` as a NumPy array, collapsed to the given target level. Replaces NaN values with 0.

Args
target_level- `SampleSet.sources` column level to use

Returns
Array containing the ground truth contributions for each sample
```python
def get_source_contributions(self, target_level="Isotope") -> np.ndarray:
    """Get `sources` as a NumPy array.

    Replaces NaN values with 0.

    Returns:
        Array containing the ground truth contributions for each sample
    """
    collapsed_sources = self.sources.T.groupby(target_level).sum().T
    sources_values = np.nan_to_num(collapsed_sources)
    return sources_values
```

def get_spectral_distance_matrix(self, distance_func=distance.jensenshannon, target_level='Seed') -> pandas.core.frame.DataFrame
Compute the distance between all pairs of spectra.

This method is intended for use on seed spectra (i.e., templates). Calling this method on large `SampleSet`s, such as those produced by the static synthesizer, is not recommended. This method only computes the upper triangle and diagonal of the matrix since the lower triangle would be a copy of the upper triangle. The diagonal is computed, for when Poisson noise is present, for two main reasons:
- the distance between the same source will effectively never be zero.
- there is sometimes research interest in comparing the diagonal to the upper triangle.

Args
distance_func- specific distance function to use
target_level- `SampleSet.sources` column level to use

Returns
A 2-D array of distance values
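The upper-triangle-plus-diagonal computation described above can be sketched in a few lines. Euclidean distance is used here for brevity (the method defaults to Jensen-Shannon); the helper name is illustrative:

```python
import numpy as np

def distance_matrix(spectra, distance_func):
    """Pairwise distances; only the upper triangle and diagonal are computed."""
    n = len(spectra)
    d = np.full((n, n), np.nan)
    for i in range(n):
        for j in range(i, n):  # lower triangle skipped: it mirrors the upper
            d[i, j] = distance_func(spectra[i], spectra[j])
    return d

spectra = np.array([[1.0, 0.0], [0.0, 1.0]])
euclidean = lambda a, b: np.sqrt(((a - b) ** 2).sum())
d = distance_matrix(spectra, euclidean)
# d[0, 1] == sqrt(2); d[1, 0] stays NaN since the lower triangle is not computed
```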
```python
def get_spectral_distance_matrix(self, distance_func=distance.jensenshannon,
                                 target_level="Seed") -> pd.DataFrame:
    """Compute the distance between all pairs of spectra.

    This method is intended for use on seed spectra (i.e., templates).
    Calling this method on large samplesets, such as those produced by the
    static synthesizer, is not recommended.
    This method only computes the upper triangle and diagonal of the matrix
    since the lower triangle would be a copy of the upper triangle.
    The diagonal is computed, for when Poisson noise is present,
    for two main reasons:
    - the distance between the same source will effectively never be zero.
    - there is sometimes research interest in comparing the diagonal
      to the upper triangle.

    Args:
        distance_func: specific distance function to use
        target_level: `SampleSet.sources` column level to use

    Returns:
        A 2-D array of distance values
    """
    distance_values = self._get_spectral_distances(distance_func)
    row_labels = self.get_labels(target_level=target_level)
    col_labels = self.sources.columns.get_level_values(level=target_level)
    distance_df = _get_distance_df_from_values(distance_values, row_labels, col_labels)
    return distance_df
```

def normalize(self, p: float = 1, clip_negatives: bool = True)
Apply L-p normalization to `spectra` in place.

The default, L1 norm (p=1), can be interpreted as forming a probability mass function (PMF). L2 norm (p=2) yields unit energy (sum of squares == 1).

Reference: https://en.wikipedia.org/wiki/Parseval's_theorem

Args
p- exponent to which the spectra values are raised
clip_negatives- whether negative values will be removed from the spectra and replaced with 0
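The normalization above divides each row by its L-p "energy" raised to 1/p, guarding against empty spectra. A standalone numpy sketch (the function name is illustrative):

```python
import numpy as np

def lp_normalize(spectra: np.ndarray, p: float = 1) -> np.ndarray:
    """Scale each row so that the sum of its values**p equals 1."""
    energy = (spectra ** p).sum(axis=1, keepdims=True)
    energy[energy == 0] = 1  # avoid division by zero for all-zero rows
    return spectra / energy ** (1 / p)

spectra = np.array([[2.0, 2.0]])
lp_normalize(spectra, p=1)  # rows sum to 1: [[0.5, 0.5]]
lp_normalize(spectra, p=2)  # sum of squares equals 1
```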
```python
def normalize(self, p: float = 1, clip_negatives: bool = True):
    """Apply L-p normalization to `spectra` in place.

    The default, L1 norm (p=1), can be interpreted as forming
    a probability mass function (PMF).
    L2 norm (p=2) is unit energy (sum of squares == 1).
    Reference: https://en.wikipedia.org/wiki/Parseval's_theorem

    Args:
        p: exponent to which the spectra values are raised
        clip_negatives: whether negative values will be removed from the spectra
            and replaced with 0
    """
    if clip_negatives:
        self.clip_negatives()
    elif (self._spectra.values < 0).sum():
        logging.warning(
            "You are performing a normalization operation on spectra which contain negative "
            + "values. Consider applying `clip_negatives`."
        )
    energy = (self._spectra.values ** p).sum(axis=1)[:, np.newaxis]
    energy[energy == 0] = 1
    self._spectra = pd.DataFrame(data=self._spectra.values / energy ** (1/p))
    if p == 1:
        self.spectra_state = SpectraState.L1Normalized
    elif p == 2:
        self.spectra_state = SpectraState.L2Normalized
    else:
        self.spectra_state = SpectraState.Unknown
```

def normalize_sources(self)
Normalize `sources` such that rows sum to 1.
```python
def normalize_sources(self):
    """Normalize `sources` such that rows sum to 1."""
    row_sums = self._sources.sum(axis=1)
    row_sums[row_sums == 0] = 1
    self._sources = self._sources.clip(0).divide(
        row_sums,
        axis=0,
    )
```

def replace_nan(self, replace_value: float = 0)
Replace np.nan values.

Args
replace_value- value with which NaN will be replaced
```python
def replace_nan(self, replace_value: float = 0):
    """Replace np.nan values.

    Args:
        replace_value: value with which NaN will be replaced
    """
    # Assign the results back: `DataFrame.replace` is not in-place by default.
    self._spectra = self._spectra.replace(np.nan, replace_value)
    self._sources = self._sources.replace(np.nan, replace_value)
    self._info = self._info.replace(np.nan, replace_value)
```

def sample(self, n_samples: int, random_seed: int = None) -> SampleSet
Randomly sample the SampleSet.

Args
n_samples- number of random samples to be returned
random_seed- seed value for random number generation

Returns
`SampleSet` of randomly selected measurements
```python
def sample(self, n_samples: int, random_seed: int = None) -> SampleSet:
    """Randomly sample the SampleSet.

    Args:
        n_samples: number of random samples to be returned
        random_seed: seed value for random number generation

    Returns:
        `SampleSet` of randomly selected measurements
    """
    if random_seed is not None:  # explicit check so a seed of 0 is honored
        random.seed(random_seed)
    if n_samples > self.n_samples:
        n_samples = self.n_samples
    random_indices = random.sample(self.spectra.index.values.tolist(), n_samples)
    random_mask = np.isin(self.spectra.index, random_indices)
    return self[random_mask]
```

def set_dead_time_proportions(self)
Compute the dead time proportion for each sample and save it in the `SampleSet` info.
```python
def set_dead_time_proportions(self):
    """Computes dead time proportion for each sample and saves it
    in the `SampleSet` info.
    """
    dead_time_props = self._get_dead_time_proportions()
    self.info[self.DEAD_TIME_PROP_INFO_KEY] = dead_time_props
```

def shuffle(self, inplace: bool = True, random_state: int = None) -> SampleSet
Randomly reorder all DataFrames.

Args
inplace- whether to perform the operation in-place
random_state- random seed for reproducing specific shuffles

Returns
`SampleSet` object when inplace is False
```python
def shuffle(self, inplace: bool = True, random_state: int = None) -> SampleSet:
    """Randomly reorder all DataFrames.

    Args:
        inplace: whether to perform the operation in-place
        random_state: random seed for reproducing specific shuffles

    Returns:
        `SampleSet` object when `inplace` is False
    """
    new_row_ordering = np.arange(self.n_samples)
    np.random.default_rng(random_state).shuffle(new_row_ordering)
    new_ss = self if inplace else self[:]
    new_ss.spectra = self.spectra.iloc[new_row_ordering]\
        .reset_index(drop=True)
    new_ss.sources = self.sources.iloc[new_row_ordering]\
        .reset_index(drop=True)
    new_ss.info = self.info.iloc[new_row_ordering]\
        .reset_index(drop=True)
    if not new_ss.prediction_probas.empty:
        new_ss.prediction_probas = self.prediction_probas.iloc[new_row_ordering]\
            .reset_index(drop=True)
    if not inplace:
        return new_ss
```

def sources_columns_to_dict(self, target_level='Isotope') -> Union[dict, list]
Convert the MultiIndex columns of `sources` to a dictionary.

Note: depending on target_level and the sources columns, duplicate values are possible.

Args
target_level- `SampleSet.sources` column level to use

Returns
If target_level is "Category" or "Isotope", a dict is returned. If target_level is "Seed", a list is returned.

Raises
`ValueError` when target_level is invalid
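At the "Isotope" level, the conversion groups the (Category, Isotope, Seed) column tuples by their isotope component. A standalone sketch with illustrative tuple values:

```python
# (Category, Isotope, Seed) tuples as they might appear in a sources MultiIndex
column_tuples = [
    ("SNM", "Pu239", "Pu239 seed A"),
    ("SNM", "Pu239", "Pu239 seed B"),
    ("NORM", "K40", "K40 seed"),
]

isotope_to_seeds = {}
for t in column_tuples:
    _, isotope, _ = t
    isotope_to_seeds.setdefault(isotope, [])
    if t not in isotope_to_seeds[isotope]:  # avoid duplicate entries
        isotope_to_seeds[isotope].append(t)
# isotope_to_seeds["Pu239"] holds both Pu239 seed tuples
```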
```python
def sources_columns_to_dict(self, target_level="Isotope") -> Union[dict, list]:
    """Convert the MultiIndex columns of `sources` to a dictionary.

    Note: depending on `target_level` and `sources` columns,
    duplicate values are possible.

    Args:
        target_level: `SampleSet.sources` column level to use

    Returns:
        If `target_level` is "Category" or "Isotope," then a dict is returned.
        If `target_level` is "Seed," then a list is returned.

    Raises:
        `ValueError` when `target_level` is invalid
    """
    self._check_target_level(
        target_level,
        levels_allowed=SampleSet.SOURCES_MULTI_INDEX_NAMES
    )
    d = {}
    column_tuples = self.sources.columns.to_list()
    if target_level == "Seed":
        d = column_tuples
    elif target_level == "Isotope":
        for t in column_tuples:
            _, i, _ = t
            if i not in d:
                d[i] = [t]
            if t not in d[i]:
                d[i].append(t)
    else:  # target_level == "Category"
        for t in column_tuples:
            c, i, _ = t
            if c not in d:
                d[c] = {}
            if i not in d[c]:
                d[c][i] = [t]
            if t not in d[c][i]:
                d[c][i].append(t)
    return d
```

def split_fg_and_bg(self, bg_seed_names: Iterable = ['Cosmic', 'PotassiumInSoil', 'UraniumInSoil', 'ThoriumInSoil']) -> Tuple[SampleSet, SampleSet]
Split the current `SampleSet` into two new `SampleSet`s, one containing only foreground sources and the other containing only background sources. Foreground sources are assumed to be anything not designated as a background source.

Args
bg_seed_names- names of the seeds which are considered background sources. This list can be customized to also extract atypical background sources such as calibration sources.

Returns
Two `SampleSet`s, the first containing only foregrounds and the second containing only backgrounds.
```python
def split_fg_and_bg(self, bg_seed_names: Iterable = DEFAULT_BG_SEED_NAMES) \
        -> Tuple[SampleSet, SampleSet]:
    """Split the current `SampleSet` into two new `SampleSet`s,
    one containing only foreground sources and the other containing
    only background sources.

    Foreground sources are assumed to be anything that is not designated
    as a background source.

    Args:
        bg_seed_names: names of the seeds which are considered background sources.
            This list can be customized to also extract atypical background sources
            such as calibration sources.

    Returns:
        Two `SampleSet`s, the first containing only foregrounds
        and the second only containing backgrounds.
    """
    seed_labels = self.get_labels(target_level="Seed")
    bg_mask = seed_labels.isin(bg_seed_names)

    fg_seeds_ss = self[~bg_mask]
    fg_seeds_ss.drop_sources_columns_with_all_zeros()
    fg_seeds_ss.spectra_type = SpectraType.Foreground
    fg_seeds_ss.spectra_state = self.spectra_state

    bg_seeds_ss = self[bg_mask]
    bg_seeds_ss.drop_sources_columns_with_all_zeros()
    bg_seeds_ss.spectra_type = SpectraType.Background
    bg_seeds_ss.spectra_state = self.spectra_state

    return fg_seeds_ss, bg_seeds_ss
```

def squash(self) -> SampleSet
Combine all rows of the `SampleSet` into a single row. All data suited for summing is summed; otherwise, the info from the first sample is used.

Returns
A flattened `SampleSet`
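The squashing strategy above (sum what is summable, keep the first row's metadata otherwise) can be sketched with plain pandas; the column names below are illustrative of typical info fields:

```python
import pandas as pd

spectra = pd.DataFrame([[1, 2], [3, 4]])
info = pd.DataFrame({"live_time": [10.0, 20.0], "total_counts": [3.0, 7.0]})

# Sum the summable data into a single row
flat_spectrum = spectra.sum(axis=0).to_frame().T  # one row: [[4, 6]]

# Start from the first sample's info, then overwrite the summable fields
flat_info = info.iloc[0].to_frame().T
flat_info["live_time"] = info.live_time.sum()        # 30.0
flat_info["total_counts"] = info.total_counts.sum()  # 10.0
```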
```python
def squash(self) -> SampleSet:
    """Combine all rows of the `SampleSet` into a single row.

    All data suited for summing is summed,
    otherwise the info from the first sample is used.

    Returns:
        A flattened `SampleSet`
    """
    flat_spectra = self.spectra.sum(axis=0).to_frame().T
    flat_sources = self.sources.sum(axis=0).to_frame().T
    flat_prediction_probas = self.prediction_probas.sum(axis=0).to_frame().T
    flat_info = self.info.iloc[0].to_frame().T
    if "description" in flat_info:
        flat_info.description = "squashed"
    if "live_time" in flat_info:
        flat_info.live_time = self.info.live_time.sum()
    if "real_time" in flat_info:
        flat_info.real_time = self.info.real_time.sum()
    if "total_counts" in flat_info:
        flat_info.total_counts = self.info.total_counts.sum()
    if "snr" in flat_info:
        flat_info.snr = self.info.snr.sum()

    # Create a new SampleSet with the flattened data
    flat_ss = SampleSet()
    flat_ss.spectra = flat_spectra
    flat_ss.sources = flat_sources
    flat_ss.prediction_probas = flat_prediction_probas
    flat_ss.info = flat_info

    # Copy dictionary and other non-DataFrame objects from full ss
    flat_ss._detector_info = self._detector_info
    flat_ss._synthesis_info = self._synthesis_info
    flat_ss._measured_or_synthetic = self._measured_or_synthetic

    return flat_ss
```

def to_hdf(self, path: str | Path, verbose=False, **kwargs)
Write the `SampleSet` to disk as an HDF file.

Args
path- file path at which to save as an HDF file
verbose- whether to display detailed output
kwargs- additional arguments to be passed to the `pandas.HDFStore` constructor

Raises
`ValueError` when the provided path extension is invalid
```python
def to_hdf(self, path: str | Path, verbose=False, **kwargs):
    """Write the `SampleSet` to disk as an HDF file.

    Args:
        path: file path at which to save as an HDF file
        verbose: whether to display detailed output
        kwargs: additional arguments to be passed to the
            `pandas.HDFStore` constructor

    Raises:
        `ValueError` when provided path extension is invalid
    """
    path = Path(path)
    if path.suffix != riid.SAMPLESET_HDF_FILE_EXTENSION:
        logging.warning(f"Path does not end in {riid.SAMPLESET_HDF_FILE_EXTENSION}")
    _write_hdf(self, path, **kwargs)
    if verbose:
        logging.info(f"Saved SampleSet to '{path}'")
```

def to_json(self, path: str | Path, verbose=False)
Write the `SampleSet` to disk as a JSON file.

Warning: using this on large `SampleSet` objects is not recommended; consider `SampleSet.to_hdf()` instead in such cases.

Args
path- file path at which to save as a JSON file
verbose- whether to display detailed output

Raises
`ValueError` when the provided path extension is invalid
```python
def to_json(self, path: str | Path, verbose=False):
    """Write the `SampleSet` to disk as a JSON file.

    Warning: it is not recommended that you use this on large `SampleSet`
    objects. Consider `SampleSet.to_hdf()` instead in such cases.

    Args:
        path: file path at which to save as a JSON file
        verbose: whether to display detailed output

    Raises:
        `ValueError` when provided path extension is invalid
    """
    path = Path(path)
    if path.suffix != riid.SAMPLESET_JSON_FILE_EXTENSION:
        logging.warning(f"Path does not end in {riid.SAMPLESET_JSON_FILE_EXTENSION}")
    _write_json(self, path)
    if verbose:
        logging.info(f"Saved SampleSet to '{path}'")
```

def to_pcf(self, path: str | Path, verbose=False)
Write the `SampleSet` to disk as a PCF.

Args
path- file path at which to save as a PCF
verbose- whether to display detailed output

Raises
`ValueError` when the provided path extension is invalid
```python
def to_pcf(self, path: str | Path, verbose=False):
    """Write the `SampleSet` to disk as a PCF.

    Args:
        path: file path at which to save as a PCF
        verbose: whether to display detailed output

    Raises:
        `ValueError` when provided path extension is invalid
    """
    path = Path(path)
    if path.suffix != riid.PCF_FILE_EXTENSION:
        logging.warning(f"Path does not end in {riid.PCF_FILE_EXTENSION}")
    _dict_to_pcf(_ss_to_pcf_dict(self, verbose), path, verbose)
    if verbose:
        logging.info(f"Saved SampleSet to '{path}'")
```

def update_timestamp(self)
Set the timestamp for all samples to the current UTC date and time.
```python
def update_timestamp(self):
    """Set the timestamp for all samples to the current UTC date and time."""
    self.info.timestamp = _get_utc_timestamp()
```

def upsample_spectra(self, target_bins: int = 4096)
Uniformly up-bin spectra.

Args
target_bins- number of bins into which the current spectra should be split
```python
def upsample_spectra(self, target_bins: int = 4096):
    """Uniformly up-bin spectra.

    Args:
        target_bins: number of bins into which the current spectra should be split.
    """
    if target_bins % self.n_channels == 0:
        n_per_group = int(target_bins / self.n_channels)
        transformation = np.zeros([target_bins, self.n_channels])
        for i in range(self.n_channels):
            transformation[(i * n_per_group):((i * n_per_group) + n_per_group), i] = \
                1.0 / n_per_group
    else:
        raise ValueError(
            ("Desired number of bins ({}) cannot"
             " be uniformly mapped from existing"
             " channels ({})").format(target_bins, self.n_channels))
    self._spectra = pd.DataFrame(
        data=np.matmul(
            self._spectra.values,
            transformation.T))
```
class SeedMixer(seeds_ss: SampleSet, mixture_size: int = 2, dirichlet_alpha: float = 2.0, restricted_isotope_pairs: List[Tuple[str, str]] = [], rng: numpy.random.Generator = None)

Randomly mixes seeds in a `SampleSet`.

Args
seeds_ss- `SampleSet` of n seed spectra where n >= mixture_size
mixture_size- number of templates to mix
dirichlet_alpha- Dirichlet parameter controlling the nature of the proportions
restricted_isotope_pairs- list of 2-tuples containing pairs of isotope strings that are not to be mixed together
rng- NumPy random number generator, useful for experiment repeatability

Raises
AssertionError when mixture_size is less than 2
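The heart of seed mixing is drawing mixture proportions from a Dirichlet distribution and taking a convex combination of normalized seed spectra. A standalone sketch with illustrative seed data:

```python
import numpy as np

rng = np.random.default_rng(42)

# Three single-isotope seed spectra (rows), each L1-normalized
seeds = np.array([
    [0.7, 0.2, 0.1],
    [0.1, 0.8, 0.1],
    [0.2, 0.2, 0.6],
])

mixture_size = 2
chosen = rng.choice(len(seeds), size=mixture_size, replace=False)
# Generator.dirichlet avoids the NaN issue np.random.dirichlet has for tiny alphas
ratios = rng.dirichlet(alpha=[2.0] * mixture_size)  # proportions sum to 1
mixture = ratios @ seeds[chosen]
# A convex combination of PMFs is itself a PMF: mixture sums to 1
```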
```python
class SeedMixer():
    """Randomly mixes seeds in a `SampleSet`."""
    def __init__(self, seeds_ss: SampleSet, mixture_size: int = 2,
                 dirichlet_alpha: float = 2.0,
                 restricted_isotope_pairs: List[Tuple[str, str]] = [],
                 rng: Generator = None):
        """
        Args:
            seeds_ss: `SampleSet` of `n` seed spectra where `n` >= `mixture_size`
            mixture_size: number of templates to mix
            dirichlet_alpha: Dirichlet parameter controlling the nature of proportions
            restricted_isotope_pairs: list of 2-tuples containing pairs of isotope
                strings that are not to be mixed together
            rng: NumPy random number generator, useful for experiment repeatability

        Raises:
            `AssertionError` when `mixture_size` is less than 2
        """
        assert mixture_size >= 2

        self.seeds_ss = seeds_ss
        self.mixture_size = mixture_size
        self.dirichlet_alpha = dirichlet_alpha
        self.restricted_isotope_pairs = restricted_isotope_pairs
        if rng is None:
            self.rng = np.random.default_rng()
        else:
            self.rng = rng

        self._check_seeds()

    def _check_seeds(self, skip_health_check: bool = False):
        if not skip_health_check:
            self.seeds_ss.check_seed_health()

        n_sources_per_row = np.count_nonzero(
            self.seeds_ss.get_source_contributions(),
            axis=1
        )
        if not np.all(n_sources_per_row == 1):
            raise ValueError("At least one provided seed contains a mixture of sources.")
        if np.any(np.count_nonzero(self.seeds_ss.get_source_contributions(), axis=1) == 0):
            raise ValueError("At least one provided seed contains no ground truth.")
        for ecal_column in self.seeds_ss.ECAL_INFO_COLUMNS:
            all_ecal_columns_close_to_one = np.all(np.isclose(
                self.seeds_ss.info[ecal_column],
                self.seeds_ss.info[ecal_column][0]
            ))
            if not all_ecal_columns_close_to_one:
                raise ValueError((
                    f"{ecal_column} is not consistent. "
                    "All seeds must have the same energy calibration."
                ))

    def __call__(self, n_samples: int, max_batch_size: int = 100,
                 skip_health_check: bool = False) -> Iterator[SampleSet]:
        """Yields batches of seeds one at a time until a specified number of
        samples has been reached.

        Dirichlet intuition:
        - Higher alpha: values will converge on 1/N where N is mixture size
        - Lower alpha: values will converge on ~0 but there will be a single 1

        Using `np.random.dirichlet` with too small of an alpha will result in
        NaNs (per https://github.com/rust-random/rand/pull/1209).
        Using `numpy.random.Generator.dirichlet` instead avoids this.

        TODO: seed-level restrictions

        Args:
            n_samples: total number of mixture seeds to produce across all batches
            max_batch_size: maximum size of a batch per yield
            skip_health_check: whether to skip the seed health check

        Returns:
            Generator of `SampleSet`s
        """
        self._check_seeds(skip_health_check)

        isotope_to_seeds = self.seeds_ss.sources_columns_to_dict(target_level="Isotope")
        isotopes = list(isotope_to_seeds.keys())
        seeds = list(isotope_to_seeds.values())  # not necessarily distinct
        seeds = [item for sublist in seeds for item in sublist]
        n_seeds = len(seeds)
        n_distinct_seeds = len(set(seeds))
        if n_distinct_seeds != n_seeds:
            raise ValueError("Seed names must be unique.")
        isotope_probas = list([len(isotope_to_seeds[i]) / n_seeds for i in isotopes])
        spectra_row_labels = self.seeds_ss.sources.idxmax(axis=1)
        restricted_isotope_bidict = bidict({k: v for k, v in self.restricted_isotope_pairs})

        seed_spectra_df = self.seeds_ss.spectra.copy()
        seed_spectra_df.index = spectra_row_labels

        try:
            _ = iter(self.dirichlet_alpha)
        except TypeError:
            seed_to_alpha = {s: self.dirichlet_alpha for s in seeds}
        else:
            if n_seeds != len(self.dirichlet_alpha):
                raise ValueError("Number of Dirichlet alphas does not equal the number of seeds.")
            seed_to_alpha = {s: a for s, a in zip(seeds, self.dirichlet_alpha)}

        n_samples_produced = 0
        while n_samples_produced < n_samples:
            batch_size = n_samples - n_samples_produced
            if batch_size > max_batch_size:
                batch_size = max_batch_size

            # Make batch
            isotope_choices = [
                get_choices(
                    [],
                    isotopes.copy(),
                    np.array(isotope_probas.copy()),
                    restricted_isotope_bidict,
                    self.mixture_size,
                    self.rng,
                )
                for _ in range(batch_size)
            ]
            seed_choices = [
                [isotope_to_seeds[i][self.rng.choice(len(isotope_to_seeds[i]))] for i in c]
                for c in isotope_choices
            ]
            batch_dirichlet_alphas = np.array([
                [seed_to_alpha[i] for i in s]
                for s in seed_choices
            ])
            seed_ratios = [
                self.rng.dirichlet(alpha=alpha)
                for alpha in batch_dirichlet_alphas
            ]

            # Compute the spectra
            spectra = np.array([
                (seed_ratios[i] * seed_spectra_df.loc[m].T).sum(axis=1)
                for i, m in enumerate(seed_choices)
            ])

            # Build SampleSet
            batch_ss = SampleSet()
            batch_ss.detector_info = self.seeds_ss.detector_info
            batch_ss.spectra_state = self.seeds_ss.spectra_state
            batch_ss.spectra_type = self.seeds_ss.spectra_type
            batch_ss.spectra = pd.DataFrame(spectra)
            batch_ss.info = pd.DataFrame(
                [self.seeds_ss.info.iloc[0].values] * batch_size,
                columns=self.seeds_ss.info.columns
            )

            batch_sources_dfs = []
            for r, s in zip(seed_ratios, seed_choices):
                sources_cols = pd.MultiIndex.from_tuples(
                    s,
                    names=SampleSet.SOURCES_MULTI_INDEX_NAMES,
                )
                sources_df = pd.DataFrame([r], columns=sources_cols)
                batch_sources_dfs.append(sources_df)
            sources_df = pd\
                .concat(batch_sources_dfs)\
                .fillna(0.0)
            batch_ss.sources = sources_df.reindex(
                columns=self.seeds_ss.sources.columns,
                fill_value=0.0
            )

            n_samples_produced += batch_size

            yield batch_ss

    def generate(self, n_samples: int, max_batch_size: int = 100,
                 skip_health_check: bool = False) -> SampleSet:
        """Computes random mixtures of seeds at the isotope level."""
        batches = []
        batch_iterable = self(
            n_samples,
            max_batch_size=max_batch_size,
            skip_health_check=skip_health_check
        )
        for batch_ss in batch_iterable:
            batches.append(batch_ss)
        mixtures_ss = SampleSet()
        mixtures_ss.spectra_type = self.seeds_ss.spectra_type
        mixtures_ss.concat(batches)
        return mixtures_ss
```

Methods
def generate(self, n_samples: int, max_batch_size: int = 100, skip_health_check: bool = False) -> SampleSet
Computes random mixtures of seeds at the isotope level.
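The core of `SeedMixer` is a convex combination of count-normalized seed spectra with Dirichlet-drawn proportions. A minimal standalone sketch of that idea, using hypothetical random "spectra" rather than a real `SampleSet`:

```python
import numpy as np

rng = np.random.default_rng(1)

# Three hypothetical count-normalized seed spectra (rows), 8 channels each
seeds = rng.random((3, 8))
seeds /= seeds.sum(axis=1, keepdims=True)

# Mixing proportions from a Dirichlet draw; higher alpha pushes proportions
# toward 1/N, lower alpha toward a single dominant seed
ratios = rng.dirichlet(alpha=[2.0, 2.0, 2.0])

# A convex combination of normalized spectra remains count-normalized
mixture = ratios @ seeds
```

Because the ratios sum to one, the mixture stays normalized, which is why `SeedMixer` can operate purely on seed shapes and defer count scaling to the synthesizers.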
class SeedSynthesizer
```python
class SeedSynthesizer():
    @contextmanager
    def _cwd(self, path):
        """Temporarily change working directory.

        This is used to change the execution location, which is necessary due
        to how the GADRAS API uses relative pathing from its installation
        directory.
        """
        oldpwd = os.getcwd()
        os.chdir(path)
        try:
            yield
        finally:
            os.chdir(oldpwd)

    def _get_detector_parameters(self, gadras_api) -> dict:
        params = {}
        for k in gadras_api.detectorGetParameters().Keys:
            if k not in DETECTOR_PARAMS:
                continue
            params[k] = gadras_api.detectorGetParameter(k)
        return params

    def _set_detector_parameters(self, gadras_api, new_parameters: dict,
                                 verbose=False, dry_run=False) -> None:
        for k, v in new_parameters.items():
            k_upper = k.upper()
            if k_upper in INJECT_PARAMS:
                continue
            v_type = DETECTOR_PARAMS[k_upper]["type"]
            if v_type == "float":
                gadras_api.detectorSetParameter(k_upper, float(v))
                if verbose:
                    print(f"i: Setting parameter '{k_upper}' to {v}")
            elif v_type == "int":
                gadras_api.detectorSetParameter(k_upper.upper(), int(v))
                if verbose:
                    print(f"i: Setting parameter '{k_upper}' to {v}")
            else:
                print(f"Warning: parameter '{k}'s type of {v_type} is not supported - not set.")
        if not dry_run:
            gadras_api.detectorSaveParameters()

    def generate(self, config: Union[str, dict],
                 normalize_spectra: bool = True,
                 normalize_sources: bool = True,
                 verbose: bool = False) -> SampleSet:
        """Produce a `SampleSet` containing foreground and/or background seeds
        using GADRAS based on the given inject configuration.

        Args:
            config: a dictionary is treated as the actual config containing the
                information needed to perform injects via the GADRAS API, while
                a string is treated as a path to a YAML file which is
                deserialized as a dictionary
            normalize_spectra: whether to divide each row of `SampleSet.spectra`
                by each row's sum
            normalize_sources: whether to divide each row of `SampleSet.sources`
                by each row's sum
            verbose: whether to show detailed output

        Returns:
            `SampleSet` containing foreground and/or background seeds generated by GADRAS
        """
        if isinstance(config, str):
            with open(config, "r") as stream:
                config = yaml.safe_load(stream)
        elif not isinstance(config, dict):
            msg = (
                "The provided config for seed synthesis must either be "
                "a path to a properly structured YAML file or "
                "a properly structured dictionary."
            )
            raise ValueError(msg)

        validate_inject_config(config)
        setups = get_inject_setups(config)

        with self._cwd(GADRAS_ASSEMBLY_PATH):
            gadras_api = get_gadras_api()
            detector_name = config["gamma_detector"]["name"]
            gadras_api.detectorSetCurrent(detector_name)
            original_detector_parameters = self._get_detector_parameters(gadras_api)
            if verbose:
                print(f"Obtaining sources for '{detector_name}'")

            source_list = []
            for s in setups:
                source_injector = SourceInjector(gadras_api)
                new_detector_parameters = s["gamma_detector"]["parameters"]
                now = _get_utc_timestamp().replace(":", "_")  # replace() prevents error on Windows
                rel_output_path = f"{now}_sources.pcf"
                try:
                    self._set_detector_parameters(gadras_api, new_detector_parameters, verbose)
                    # TODO: propagate dry_run to injectors

                    # Source injects
                    pcf_abs_path = source_injector.generate(s, rel_output_path, verbose=verbose)
                    seeds_ss = read_pcf(pcf_abs_path)
                    # Manually set distance_cm so it works with expanded configs
                    seeds_ss.info["distance_cm"] = new_detector_parameters["distance_cm"]
                    seeds_ss.info["height_cm"] = new_detector_parameters["height_cm"]
                    for k, v in new_detector_parameters.items():
                        if k not in DETECTOR_PARAMS or k.startswith("ECAL"):  # e-cal info will come from the PCF
                            continue
                        seeds_ss.info[k.lower()] = v
                    if not normalize_sources:
                        seeds_ss.sources *= seeds_ss.spectra.sum(axis=1).values
                    source_list.append(seeds_ss)
                except Exception as e:
                    # Try to restore .dat file to original state even when an error occurs
                    self._set_detector_parameters(gadras_api, original_detector_parameters)
                    raise e

            # Restore .dat file to original state
            self._set_detector_parameters(gadras_api, original_detector_parameters)

            ss = SampleSet()
            ss.spectra_state = SpectraState.Counts
            ss.concat(source_list)
            ss.detector_info = deepcopy(config["gamma_detector"])
            ss.set_dead_time_proportions()
            if normalize_spectra:
                ss.normalize()

        return ss
```

Methods
def generate(self, config: Union[str, dict], normalize_spectra: bool = True, normalize_sources: bool = True, verbose: bool = False) -> SampleSet
Produce a `SampleSet` containing foreground and/or background seeds using GADRAS based on the given inject configuration.

Args

- `config`: a dictionary is treated as the actual config containing the information needed to perform injects via the GADRAS API, while a string is treated as a path to a YAML file which is deserialized as a dictionary
- `normalize_spectra`: whether to divide each row of `SampleSet.spectra` by each row's sum
- `normalize_sources`: whether to divide each row of `SampleSet.sources` by each row's sum
- `verbose`: whether to show detailed output

Returns

`SampleSet` containing foreground and/or background seeds generated by GADRAS
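The `_cwd` helper above is a standard temporary-working-directory context manager, needed because the GADRAS API resolves paths relative to its installation directory. A self-contained sketch of the same pattern:

```python
import os
import tempfile
from contextlib import contextmanager

@contextmanager
def cwd(path):
    """Temporarily change the working directory, restoring it even on error."""
    oldpwd = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(oldpwd)

before = os.getcwd()
with tempfile.TemporaryDirectory() as tmp:
    with cwd(tmp):
        inside = os.getcwd()  # now somewhere under the temp directory
restored = os.getcwd() == before
```

The `try`/`finally` is what guarantees the original directory is restored even if the body raises, matching the error-handling intent of `_cwd`.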
class SpectraState (value, names=None, *, module=None, qualname=None, type=None, start=1)
States in which SampleSet spectra can exist.
```python
class SpectraState(int, Enum):
    """States in which SampleSet spectra can exist."""
    Unknown = 0
    Counts = 1
    L1Normalized = 2
    L2Normalized = 3
```

Ancestors
- builtins.int
- enum.Enum
Class variables
- `var Counts`
- `var L1Normalized`
- `var L2Normalized`
- `var Unknown`
class SpectraType (value, names=None, *, module=None, qualname=None, type=None, start=1)
Types for SampleSet spectra.
```python
class SpectraType(int, Enum):
    """Types for SampleSet spectra."""
    Unknown = 0
    Background = 1
    Foreground = 2
    Gross = 3
    BackgroundForeground = 4
```

Ancestors
- builtins.int
- enum.Enum
Class variables
- `var Background`
- `var BackgroundForeground`
- `var Foreground`
- `var Gross`
- `var Unknown`
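Both enums subclass `int`, so their members behave as plain integers in comparisons and serialization. A small illustration (re-declaring `SpectraState` locally for the example):

```python
from enum import Enum

class SpectraState(int, Enum):
    """Local re-declaration of the enum above, for illustration."""
    Unknown = 0
    Counts = 1
    L1Normalized = 2
    L2Normalized = 3

# int-backed enum members compare equal to their integer values and can be
# reconstructed from them, which keeps them easy to store in DataFrames
state = SpectraState(2)
```

This is why spectra-state values round-trip cleanly through file formats that only store integers.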
class StaticSynthesizer (samples_per_seed: int = 100, live_time_function: str = 'uniform', live_time_function_args=(0.25, 8.0), snr_function: str = 'uniform', snr_function_args=(0.01, 100.0), bg_cps: float = 300.0, long_bg_live_time: float = 120.0, apply_poisson_noise: bool = True, normalize_sources: bool = True, return_fg: bool = True, return_gross: bool = False, rng: numpy.random._generator.Generator = Generator(PCG64))
Creates a set of synthetic gamma spectra from seed templates.
The "seed" templates are count-normalized spectra representing signature shapes of interest. The static synthesizer takes the seeds you have chosen and scales them up in terms of three components:
- live time (the amount of time over which the spectrum was collected/integrated)
- signal-to-noise ratio (SNR) (source counts divided by the square root of background counts, i.e., the number of standard deviations above background)
- a fixed background rate (as a reference point)
The static synthesizer is meant to capture various count rates from sources in a statically placed detector scenario where the background can be characterized (and usually subtracted). Effects related to pile-up, scattering, or other shape-changing outcomes must be represented in the seeds.
Args
- `samples_per_seed`: number of synthetic samples to generate per source-background seed pair
- `live_time_function`: string that names the method of sampling for target live time values (options: uniform, log10, discrete, list)
- `live_time_function_args`: range of values which are sampled in the fashion specified by the `live_time_function` argument
- `snr_function`: string that names the method of sampling for target signal-to-noise ratio values (options: uniform, log10, discrete, list)
- `snr_function_args`: range of values which are sampled in the fashion specified by the `snr_function` argument
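To build intuition for the "uniform" versus "log10" sampling options, here is a plain NumPy sketch of what each means for the target values (this is an illustration of the concept, not the package's internal `get_distribution_values` implementation):

```python
import numpy as np

rng = np.random.default_rng(0)
n = 1000

# "uniform": draw targets directly within (low, high)
lt_targets = rng.uniform(0.25, 8.0, size=n)

# "log10": draw uniformly in log10-space, then exponentiate; on a linear
# scale this concentrates samples at the low end of a wide range like SNR
snr_targets = 10 ** rng.uniform(np.log10(0.01), np.log10(100.0), size=n)
```

Log-uniform sampling is the usual choice for SNR because the range spans four orders of magnitude; uniform sampling there would almost never produce low-SNR (hard) examples.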
```python
class StaticSynthesizer(Synthesizer):
    """Creates a set of synthetic gamma spectra from seed templates.

    The "seed" templates are count-normalized spectra representing signature
    shapes of interest. The static synthesizer takes the seeds you have chosen
    and scales them up in terms of three components:

    - live time (the amount of time over which the spectrum was collected/integrated)
    - signal-to-noise ratio (SNR) (source counts divided by the square root of
      background counts, i.e., the number of standard deviations above background)
    - a fixed background rate (as a reference point)

    The static synthesizer is meant to capture various count rates from sources
    in a statically placed detector scenario where the background can be
    characterized (and usually subtracted).
    Effects related to pile-up, scattering, or other shape-changing outcomes
    must be represented in the seeds.
    """
    def __init__(self, samples_per_seed: int = 100,
                 live_time_function: str = "uniform",
                 live_time_function_args=(0.25, 8.0),
                 snr_function: str = "uniform",
                 snr_function_args=(0.01, 100.0),
                 bg_cps: float = 300.0,
                 long_bg_live_time: float = 120.0,
                 apply_poisson_noise: bool = True,
                 normalize_sources: bool = True,
                 return_fg: bool = True,
                 return_gross: bool = False,
                 rng: Generator = np.random.default_rng()) -> None:
        """
        Args:
            samples_per_seed: number of synthetic samples to generate
                per source-background seed pair
            live_time_function: string that names the method of sampling
                for target live time values (options: uniform, log10, discrete, list)
            live_time_function_args: range of values which are sampled in the
                fashion specified by the `live_time_function` argument
            snr_function: string that names the method of sampling for target
                signal-to-noise ratio values (options: uniform, log10, discrete, list)
            snr_function_args: range of values which are sampled in the fashion
                specified by the `snr_function` argument
        """
        super().__init__(bg_cps, long_bg_live_time, apply_poisson_noise,
                         normalize_sources, return_fg, return_gross, rng)
        self.samples_per_seed = samples_per_seed
        self.live_time_function = live_time_function
        self.live_time_function_args = live_time_function_args
        self.snr_function = snr_function
        self.snr_function_args = snr_function_args

    # region Properties

    @property
    def samples_per_seed(self):
        """Get or set the number of samples to create per seed (excluding the
        background seed).

        Raises:
            `TypeError` when provided samples_per_seed is not an integer.
        """
        return self._samples_per_seed

    @samples_per_seed.setter
    def samples_per_seed(self, value: int):
        if not isinstance(value, int):
            raise TypeError("Property 'samples_per_seed' key must be of type 'int'!")
        self._samples_per_seed = value

    @property
    def live_time_function(self) -> str:
        """Get or set the function used to randomly sample the desired live
        time space.

        Raises:
            `ValueError` when an unsupported function type is provided.
        """
        return self._live_time_function

    @live_time_function.setter
    def live_time_function(self, value: str):
        if value not in self.SUPPORTED_SAMPLING_FUNCTIONS:
            raise ValueError("{} is not a valid function.".format(value))
        self._live_time_function = value

    @property
    def live_time_function_args(self) -> tuple:
        """Get or set the live time space to be randomly sampled."""
        return self._live_time_function_args

    @live_time_function_args.setter
    def live_time_function_args(self, value):
        self._live_time_function_args = value

    @property
    def snr_function(self) -> str:
        """Get or set the function used to randomly sample the desired
        signal-to-noise (SNR) ratio space.

        Raises:
            `ValueError` when an unsupported function type is provided.
        """
        return self._snr_function

    @snr_function.setter
    def snr_function(self, value: str):
        if value not in self.SUPPORTED_SAMPLING_FUNCTIONS:
            raise ValueError("{} is not a valid function.".format(value))
        self._snr_function = value

    @property
    def snr_function_args(self) -> tuple:
        """Get or set the signal-to-noise (SNR) space to be randomly sampled."""
        return self._snr_function_args

    @snr_function_args.setter
    def snr_function_args(self, value):
        self._snr_function_args = value

    # endregion

    def _get_concatenated_batches(self, ss_batches, n_samples_expected, spectra_type):
        ss = SampleSet()
        ss.measured_or_synthetic = self.SYNTHETIC_STR
        ss.spectra_state = SpectraState.Counts
        ss.spectra_type = spectra_type
        ss.concat(ss_batches)
        self._verify_n_samples_synthesized(ss.n_samples, n_samples_expected)
        if self.normalize_sources:
            ss.normalize_sources()
        return ss

    def _get_synthetic_samples(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet,
                               verbose=True):
        """Iterate over each background, then each source, to generate a batch
        of spectra that target a set of SNR and live time values.
        """
        n_returns = self.return_fg + self.return_gross
        n_samples_per_return = self.samples_per_seed * fg_seeds_ss.n_samples * bg_seeds_ss.n_samples
        n_samples_expected = n_returns * n_samples_per_return
        fg_ss_batches = []
        gross_ss_batches = []

        lt_targets = get_distribution_values(self.live_time_function,
                                             self.live_time_function_args,
                                             n_samples_per_return,
                                             self._rng)
        snr_targets = get_distribution_values(self.snr_function,
                                              self.snr_function_args,
                                              n_samples_per_return,
                                              self._rng)
        batch_configs = [
            (
                b,
                f,
                self.samples_per_seed * (b * fg_seeds_ss.n_samples + f),
                self.samples_per_seed * (b * fg_seeds_ss.n_samples + f + 1),
            )
            for f in range(fg_seeds_ss.n_samples)
            for b in range(bg_seeds_ss.n_samples)
        ]
        fg_labels = fg_seeds_ss.get_labels(target_level="Seed", max_only=False,
                                           level_aggregation=None)
        fg_ss_batches = []
        gross_ss_batches = []
        for b, f, batch_begin_idx, batch_end_idx in batch_configs:
            bg_seed = bg_seeds_ss.spectra.iloc[b]
            bg_sources = bg_seeds_ss.sources.iloc[b]
            fg_seed = fg_seeds_ss.spectra.iloc[f]
            fg_sources = fg_seeds_ss.sources.iloc[f]
            batch_lt_targets = lt_targets[batch_begin_idx:batch_end_idx]
            batch_snr_targets = snr_targets[batch_begin_idx:batch_end_idx]
            fg_batch_ss, gross_batch_ss = self._get_batch(
                fg_seed,
                fg_sources,
                bg_seed,
                bg_sources,
                batch_lt_targets,
                batch_snr_targets
            )
            fg_seed_info = fg_seeds_ss.info.iloc[f]
            set_ss_info_from_seed_info(fg_batch_ss, fg_seed_info, self._synthesis_start_dt)
            set_ss_info_from_seed_info(gross_batch_ss, fg_seed_info, self._synthesis_start_dt)
            fg_ss_batches.append(fg_batch_ss)
            gross_ss_batches.append(gross_batch_ss)
            if verbose:
                self._report_progress(
                    n_samples_expected,
                    fg_labels[f]
                )

        fg_ss = gross_ss = None
        if self.return_fg:
            fg_ss = self._get_concatenated_batches(
                fg_ss_batches, n_samples_per_return, SpectraType.Foreground
            )
        if self.return_gross:
            gross_ss = self._get_concatenated_batches(
                gross_ss_batches, n_samples_per_return, SpectraType.Gross
            )

        return fg_ss, gross_ss

    def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet,
                 skip_health_check: bool = False,
                 verbose: bool = True) -> Tuple[SampleSet, SampleSet]:
        """Generate a `SampleSet` of gamma spectra from the provided config.

        Args:
            fg_seeds_ss: spectra normalized by total counts to be used as
                the source component(s) of spectra
            bg_seeds_ss: spectra normalized by total counts to be used as
                the background components of gross spectra
            fixed_bg_ss: single spectrum to be used as a fixed (or intrinsic)
                background source; live time information must be present.
                This spectrum is used to represent things like:
                - cosmic background (which is location-specific);
                - one or more calibration sources; or
                - intrinsic counts from the detector material (e.g., LaBr3).
                This spectrum will form the base of each background spectrum
                where seeds in `bg_seeds_ss`, which represent mixtures of
                K-U-T, get added on top.
                Note: this spectrum is not considered part of the `bg_cps`
                parameter, but is instead added on top of it.
            skip_health_check: whether to skip seed health checks
            verbose: whether to show detailed output

        Returns:
            Tuple of synthetic foreground and gross `SampleSet`s

        Raises:
            - `ValueError` when either foreground or background seeds are not provided
            - `AssertionError` if seed health check fails
        """
        if not fg_seeds_ss or not bg_seeds_ss:
            raise ValueError("At least one foreground and background seed must be provided.")
        if not skip_health_check:
            fg_seeds_ss.check_seed_health()
            bg_seeds_ss.check_seed_health()

        self._reset_progress()
        if verbose:
            tstart = time()

        fg_ss, gross_ss = self._get_synthetic_samples(
            fg_seeds_ss,
            bg_seeds_ss,
            verbose=verbose
        )

        if verbose:
            delay = time() - tstart
            self._report_completion(delay)

        return fg_ss, gross_ss
```

Ancestors

- Synthesizer
Instance variables
`var live_time_function : str`
Get or set the function used to randomly sample the desired live time space.
Raises

`ValueError` when an unsupported function type is provided.

`var live_time_function_args : tuple`
Get or set the live time space to be randomly sampled.
`var samples_per_seed`
Get or set the number of samples to create per seed (excluding the background seed).
Raises

`TypeError` when provided samples_per_seed is not an integer.

`var snr_function : str`
Get or set the function used to randomly sample the desired signal-to-noise (SNR) ratio space.
Raises
`ValueError` when an unsupported function type is provided.

`var snr_function_args : tuple`
Get or set the signal-to-noise (SNR) space to be randomly sampled.
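The properties above all follow the same validated-property pattern: the setter rejects bad values, and the constructor routes its arguments through the setters so validation applies at construction time too. A minimal sketch (illustrative names, not the package's internals):

```python
class SamplingConfig:
    """Minimal sketch of the validated-property pattern."""
    SUPPORTED_SAMPLING_FUNCTIONS = ("uniform", "log10", "discrete", "list")

    def __init__(self, live_time_function: str = "uniform"):
        # Assigning to the property here runs the setter, so invalid
        # constructor arguments are rejected immediately
        self.live_time_function = live_time_function

    @property
    def live_time_function(self) -> str:
        return self._live_time_function

    @live_time_function.setter
    def live_time_function(self, value: str):
        if value not in self.SUPPORTED_SAMPLING_FUNCTIONS:
            raise ValueError(f"{value} is not a valid function.")
        self._live_time_function = value
```

This is why `StaticSynthesizer(live_time_function="gaussian")` would fail fast rather than surfacing an error later during synthesis.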
Methods
def generate(self, fg_seeds_ss: SampleSet, bg_seeds_ss: SampleSet, skip_health_check: bool = False, verbose: bool = True) -> Tuple[SampleSet, SampleSet]
Generate a `SampleSet` of gamma spectra from the provided config.

Args

- `fg_seeds_ss`: spectra normalized by total counts to be used as the source component(s) of spectra
- `bg_seeds_ss`: spectra normalized by total counts to be used as the background components of gross spectra
- `fixed_bg_ss`: single spectrum to be used as a fixed (or intrinsic) background source; live time information must be present. This spectrum is used to represent things like:
  - cosmic background (which is location-specific);
  - one or more calibration sources; or
  - intrinsic counts from the detector material (e.g., LaBr3).

  This spectrum will form the base of each background spectrum where seeds in `bg_seeds_ss`, which represent mixtures of K-U-T, get added on top. Note: this spectrum is not considered part of the `bg_cps` parameter, but is instead added on top of it.
- `skip_health_check`: whether to skip seed health checks
- `verbose`: whether to show detailed output

Returns

Tuple of synthetic foreground and gross `SampleSet`s

Raises

- `ValueError` when either foreground or background seeds are not provided
- `AssertionError` if seed health check fails
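Internally, `generate` pairs every background seed with every foreground seed, and each pair consumes a contiguous slice of the pre-sampled live time and SNR target arrays. The index bookkeeping can be checked in isolation with small stand-in numbers:

```python
samples_per_seed = 4
n_fg, n_bg = 3, 2

# One batch per (background, foreground) pair; begin/end delimit the slice of
# the target arrays that the pair consumes, mirroring `_get_synthetic_samples`
batch_configs = [
    (
        b, f,
        samples_per_seed * (b * n_fg + f),
        samples_per_seed * (b * n_fg + f + 1),
    )
    for f in range(n_fg)
    for b in range(n_bg)
]

# Collect every target index touched by some batch
covered = sorted(
    i
    for _, _, begin, end in batch_configs
    for i in range(begin, end)
)
```

Every target index is used exactly once, so the total sample count is `samples_per_seed * n_fg * n_bg` per returned `SampleSet`.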