Module riid.data.converters.topcoder
This module provides tools for handling data related to the "Detecting Radiological Threats in Urban Areas" TopCoder Challenge. https://doi.org/10.1038/s41597-020-00672-2
Expand source code Browse git
# Copyright 2021 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
# Under the terms of Contract DE-NA0003525 with NTESS,
# the U.S. Government retains certain rights in this software.
"""This module provides tools for handling data related to the
"Detecting Radiological Threats in Urban Areas" TopCoder Challenge.
https://doi.org/10.1038/s41597-020-00672-2
"""
import csv
import logging
import os
from pathlib import Path
import numpy as np
import pandas as pd
from riid import SAMPLESET_HDF_FILE_EXTENSION, SampleSet
from riid.data.converters import _validate_and_create_output_dir
from riid.data.labeling import label_to_index_element
SOURCE_ID_TO_LABEL = {
0: "Background",
1: "HEU",
2: "WGPu",
3: "I131",
4: "Co60",
5: "Tc99m",
6: "HEU + Tc99m",
}
DISTINCT_SOURCES = list(SOURCE_ID_TO_LABEL.values())
def _get_answers(answer_file_path: str):
answers = {}
with open(answer_file_path) as csvfile:
reader = csv.reader(csvfile, delimiter=",")
_ = next(reader) # skip header
# timestamp = 0 # in milliseconds
for row in reader:
run_id = row[0]
source_id = int(row[1])
source_time_secs = float(row[2])
answers[run_id] = {
"source_id": source_id,
"source_time_secs": source_time_secs
}
return answers
def topcoder_file_to_ss(file_path: str, sample_interval: float, n_bins: int = 1024,
max_energy_kev: int = 4000, answers_path: str = None) -> SampleSet:
"""Convert a TopCoder CSV file of list-mode data into a SampleSet.
Args:
file_path: file path of the CSV file
sample_interval: integration time (referred to as "live time" later) to use in seconds.
Warning: the final sample in the set will likely be truncated, i.e., the count rate
will appear low because the live time represented is too large.
Consider ignoring the last sample.
n_bins: desired number of bins in the resulting spectra.
Bins will be uniformly spaced from 0 to `max_energy_kev`.
max_energy_kev: desired maximum of the energy range represented in the resulting spectra.
Intuition (assuming a fixed number of bins): a higher max energy value "compresses" the
spectral information; a lower max energy value spreads out the spectral information and
counts are potentially lost off the high energy end of the specturm.
answers_path: path to the answer key for the data. If provided, this will fill out the
`SampleSet.sources` DataFrame.
Returns:
`SampleSet` containing the series of spectra for a single run
"""
file_name_with_dir = os.path.splitext(file_path)[0]
file_name = os.path.basename(file_name_with_dir)
slice_duration_ms = sample_interval * 1000 # in milliseconds
events = []
with open(file_path) as csvfile:
reader = csv.reader(csvfile, delimiter=",")
timestamp = 0 # in milliseconds
for row in reader:
timestamp += int(row[0]) / 1000 # microseconds to milliseconds
energy = float(row[1])
if energy > max_energy_kev:
msg = (
f"Encountered energy ({energy:.2f} keV) greater than "
f"specified max energy ({max_energy_kev:.2f})"
)
logging.warn(msg)
channel = int(n_bins * energy // max_energy_kev) # energy to bin
events.append((timestamp, channel, 1))
events_df = pd.DataFrame(
events,
columns=["timestamp", "channel", "counts"]
)
# Organize events into time intervals
event_time_intervals = pd.cut(
events_df["timestamp"],
np.arange(
start=0,
stop=events_df["timestamp"].max()+slice_duration_ms,
step=slice_duration_ms
)
)
# Group events by time intervals
event_time_groups = events_df.groupby(event_time_intervals)
# Within time intervals, sum counts by channel
result = event_time_groups.apply(
lambda x: x.groupby("channel").sum().loc[:, ["counts"]]
)
# Create new dataframe where: row = time interval, column = channel
spectra_df = result.unstack(level=-1, fill_value=0)
spectra_df = spectra_df["counts"]
# Add in missing columns as needed
col_list = np.arange(0, n_bins)
cols_to_add = np.setdiff1d(col_list, spectra_df.columns)
missing_df = pd.DataFrame(
0,
columns=cols_to_add,
index=spectra_df.index
)
combined_unsorted_df = pd.concat([spectra_df, missing_df], axis=1)
combined_df = combined_unsorted_df.reindex(
sorted(combined_unsorted_df.columns),
axis=1
)
spectra_df = combined_df.astype(int)
spectra_df.reset_index(inplace=True, drop=True)
# SampleSet creation
ss = SampleSet()
ss.measured_or_synthetic = "synthetic"
ss.detector_info = {
"name": "2\"x4\"x16\" NaI(Tl)",
"height_cm": 100,
"fwhm_at_661_kev": 0.075,
}
ss.spectra = spectra_df
ss.info.total_counts = spectra_df.sum(axis=1)
ss.info.live_time = sample_interval
ss.info.description = file_name
ss.info.ecal_order_0 = 0
ss.info.ecal_order_1 = max_energy_kev
ss.info.ecal_order_2 = 0
ss.info.ecal_order_3 = 0
ss.info.ecal_low_e = 0
if answers_path:
answers = _get_answers(answers_path)
run_id = file_name.split("runID-")[-1].split(".")[0]
ss.info.timestamp = answers[run_id]["source_time_secs"]
source_id = answers[run_id]["source_id"]
sources_mi = pd.MultiIndex.from_tuples([
label_to_index_element(x, label_level="Seed")
for x in DISTINCT_SOURCES
], names=SampleSet.SOURCES_MULTI_INDEX_NAMES)
sources_df = pd.DataFrame(
np.zeros((ss.n_samples, len(DISTINCT_SOURCES))),
columns=sources_mi,
)
sources_df.sort_index(axis=1, inplace=True)
source = label_to_index_element(SOURCE_ID_TO_LABEL[source_id], label_level="Seed")
sources_df[source] = 1.0
ss.sources = sources_df
return ss
def convert_and_save(input_file_path: str, output_dir: str = None,
skip_existing: bool = True, **kwargs):
"""Convert TopCoder file to SampleSet and save as HDF.
Output file will have same name with different extension.
Args:
input_file_path: file path of the CSV file
output_dir: alternative directory in which to save HDF files
(defaults to `input_file_path` parent if not provided)
skip_existing: whether to skip conversion if the file already exists
kwargs: keyword args passed to `topcoder_file_to_ss()`
"""
input_path = Path(input_file_path)
if not output_dir:
output_dir = input_path.parent
_validate_and_create_output_dir(output_dir)
output_file_path = os.path.join(output_dir, input_path.stem + SAMPLESET_HDF_FILE_EXTENSION)
if skip_existing and os.path.exists(output_file_path):
return
ss = topcoder_file_to_ss(input_file_path, **kwargs)
ss.to_hdf(output_file_path)
Functions
def convert_and_save(input_file_path: str, output_dir: str = None, skip_existing: bool = True, **kwargs)-
Convert TopCoder file to SampleSet and save as HDF.
Output file will have same name with different extension.
Args
input_file_path- file path of the CSV file
output_dir- alternative directory in which to save HDF files
(defaults to
input_file_pathparent if not provided) skip_existing- whether to skip conversion if the file already exists
kwargs- keyword args passed to
topcoder_file_to_ss()
Expand source code Browse git
def convert_and_save(input_file_path: str, output_dir: str = None, skip_existing: bool = True, **kwargs): """Convert TopCoder file to SampleSet and save as HDF. Output file will have same name with different extension. Args: input_file_path: file path of the CSV file output_dir: alternative directory in which to save HDF files (defaults to `input_file_path` parent if not provided) skip_existing: whether to skip conversion if the file already exists kwargs: keyword args passed to `topcoder_file_to_ss()` """ input_path = Path(input_file_path) if not output_dir: output_dir = input_path.parent _validate_and_create_output_dir(output_dir) output_file_path = os.path.join(output_dir, input_path.stem + SAMPLESET_HDF_FILE_EXTENSION) if skip_existing and os.path.exists(output_file_path): return ss = topcoder_file_to_ss(input_file_path, **kwargs) ss.to_hdf(output_file_path) def topcoder_file_to_ss(file_path: str, sample_interval: float, n_bins: int = 1024, max_energy_kev: int = 4000, answers_path: str = None) ‑> SampleSet-
Convert a TopCoder CSV file of list-mode data into a SampleSet.
Args
file_path- file path of the CSV file
sample_interval- integration time (referred to as "live time" later) to use in seconds. Warning: the final sample in the set will likely be truncated, i.e., the count rate will appear low because the live time represented is too large. Consider ignoring the last sample.
n_bins- desired number of bins in the resulting spectra.
Bins will be uniformly spaced from 0 to
max_energy_kev. max_energy_kev- desired maximum of the energy range represented in the resulting spectra. Intuition (assuming a fixed number of bins): a higher max energy value "compresses" the spectral information; a lower max energy value spreads out the spectral information and counts are potentially lost off the high energy end of the specturm.
answers_path- path to the answer key for the data. If provided, this will fill out the
SampleSet.sourcesDataFrame.
Returns
SampleSetcontaining the series of spectra for a single runExpand source code Browse git
def topcoder_file_to_ss(file_path: str, sample_interval: float, n_bins: int = 1024, max_energy_kev: int = 4000, answers_path: str = None) -> SampleSet: """Convert a TopCoder CSV file of list-mode data into a SampleSet. Args: file_path: file path of the CSV file sample_interval: integration time (referred to as "live time" later) to use in seconds. Warning: the final sample in the set will likely be truncated, i.e., the count rate will appear low because the live time represented is too large. Consider ignoring the last sample. n_bins: desired number of bins in the resulting spectra. Bins will be uniformly spaced from 0 to `max_energy_kev`. max_energy_kev: desired maximum of the energy range represented in the resulting spectra. Intuition (assuming a fixed number of bins): a higher max energy value "compresses" the spectral information; a lower max energy value spreads out the spectral information and counts are potentially lost off the high energy end of the specturm. answers_path: path to the answer key for the data. If provided, this will fill out the `SampleSet.sources` DataFrame. Returns: `SampleSet` containing the series of spectra for a single run """ file_name_with_dir = os.path.splitext(file_path)[0] file_name = os.path.basename(file_name_with_dir) slice_duration_ms = sample_interval * 1000 # in milliseconds events = [] with open(file_path) as csvfile: reader = csv.reader(csvfile, delimiter=",") timestamp = 0 # in milliseconds for row in reader: timestamp += int(row[0]) / 1000 # microseconds to milliseconds energy = float(row[1]) if energy > max_energy_kev: msg = ( f"Encountered energy ({energy:.2f} keV) greater than " f"specified max energy ({max_energy_kev:.2f})" ) logging.warn(msg) channel = int(n_bins * energy // max_energy_kev) # energy to bin events.append((timestamp, channel, 1)) events_df = pd.DataFrame( events, columns=["timestamp", "channel", "counts"] ) # Organize events into time intervals event_time_intervals = pd.cut( events_df["timestamp"], np.arange( start=0, stop=events_df["timestamp"].max()+slice_duration_ms, step=slice_duration_ms ) ) # Group events by time intervals event_time_groups = events_df.groupby(event_time_intervals) # Within time intervals, sum counts by channel result = event_time_groups.apply( lambda x: x.groupby("channel").sum().loc[:, ["counts"]] ) # Create new dataframe where: row = time interval, column = channel spectra_df = result.unstack(level=-1, fill_value=0) spectra_df = spectra_df["counts"] # Add in missing columns as needed col_list = np.arange(0, n_bins) cols_to_add = np.setdiff1d(col_list, spectra_df.columns) missing_df = pd.DataFrame( 0, columns=cols_to_add, index=spectra_df.index ) combined_unsorted_df = pd.concat([spectra_df, missing_df], axis=1) combined_df = combined_unsorted_df.reindex( sorted(combined_unsorted_df.columns), axis=1 ) spectra_df = combined_df.astype(int) spectra_df.reset_index(inplace=True, drop=True) # SampleSet creation ss = SampleSet() ss.measured_or_synthetic = "synthetic" ss.detector_info = { "name": "2\"x4\"x16\" NaI(Tl)", "height_cm": 100, "fwhm_at_661_kev": 0.075, } ss.spectra = spectra_df ss.info.total_counts = spectra_df.sum(axis=1) ss.info.live_time = sample_interval ss.info.description = file_name ss.info.ecal_order_0 = 0 ss.info.ecal_order_1 = max_energy_kev ss.info.ecal_order_2 = 0 ss.info.ecal_order_3 = 0 ss.info.ecal_low_e = 0 if answers_path: answers = _get_answers(answers_path) run_id = file_name.split("runID-")[-1].split(".")[0] ss.info.timestamp = answers[run_id]["source_time_secs"] source_id = answers[run_id]["source_id"] sources_mi = pd.MultiIndex.from_tuples([ label_to_index_element(x, label_level="Seed") for x in DISTINCT_SOURCES ], names=SampleSet.SOURCES_MULTI_INDEX_NAMES) sources_df = pd.DataFrame( np.zeros((ss.n_samples, len(DISTINCT_SOURCES))), columns=sources_mi, ) sources_df.sort_index(axis=1, inplace=True) source = label_to_index_element(SOURCE_ID_TO_LABEL[source_id], label_level="Seed") sources_df[source] = 1.0 ss.sources = sources_df return ss