Module riid.gadras.pcf

This module contains utilities for working with GADRAS PCF files.

Expand source code Browse git
# Copyright 2021 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
# Under the terms of Contract DE-NA0003525 with NTESS,
# the U.S. Government retains certain rights in this software.
"""This module contains utilities for working with GADRAS PCF files."""
import struct
from collections import defaultdict
from pathlib import Path
from typing import List, Tuple

import numpy as np
import tqdm

HEADER_DEFINITIONS = defaultdict(lambda: {
    "fields": (
        "NRPS",
        "Version",
        "Ecal_label",
        "Energy_Calibration_Offset",
        "Energy_Calibration_Gain",
        "Energy_Calibration_Quadratic",
        "Energy_Calibration_Cubic",
        "Energy_Calibration_Low_Energy"
    ),
    "mask": "<h3s4cfffff",
    "n_bytes": [2, 3, 4, 4, 4, 4, 4, 4]
})

HEADER_DEFINITIONS["DHS"] = {
    "fields": (
        "NRPS",
        "Version",
        "last_mod_hash",
        "UUID",
        "Inspection",
        "Lane_Number",
        "Measurement_Remark",
        "Intrument_Type",
        "Manufacturer",
        "Instrument_Model",
        "Instrument_ID",
        "Item_Description",
        "Item_Location",
        "Measurement_Coordinates",
        "Item_to_detector_distance",
        "Occupancy_Number",
        "Cargo_Type"
    ),
    "mask": "<h3s7s36s16sh26s28s28s18s18s20s16s16shh16s",
    "n_bytes": [2, 3, 7, 36, 16, 2, 26, 28, 28, 18, 18, 20, 16, 16, 2, 2, 16]
}

SPECTRUM_DEFINITION = {
    "fields": (
        "Compressed_Text_Buffer",
        "Date-time_VAX",
        "Tag",
        "Live_Time",
        "Total_time_per_real_time",
        "unused0",
        "unused1",
        "unused2",
        "Energy_Calibration_Offset",
        "Energy_Calibration_Gain",
        "Energy_Calibration_Quadratic",
        "Energy_Calibration_Cubic",
        "Energy_Calibration_Low_Energy",
        "Occupancy_Flag",
        "Total_Neutron_Counts",
        "Number_of_Channels"
    ),
    "mask": "180s23scffffffffffffi",
    "n_bytes": [180, 23, 1, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4]
}


def _get_srsi(file_bytes: list) -> Tuple[str, int]:
    """Obtains the PCF file Spectral Records Start Index (SRSI).

    Args:
        file_bytes: byte array of the pcf file contents

    Returns:
        Tuple containing:

        - return_value: string indicator for what Deviation pairs type is being used
        - srsi: Spectral Records Start Index, the index where spectra records begin
            after skipping past deviation pairs
    """
    test_lengths = [30, 20]
    index = 256
    return_value = "other"
    srsi = 2
    for i in test_lengths:
        value = struct.unpack(
            "{}s".format(i),
            file_bytes[index:index + i]
        )[0].decode("utf-8").strip()
        if value in "DeviationPairsInFile" or value in "DeviationPairsInFileCompressed":
            return_value = value
            srsi = 83
            break

    return return_value, srsi


def _get_spectrum_header_offset(spectrum_number: int, srsi: int, nrps: int) -> int:
    """Calculate the PCF header offset for a spectrum.

    Args:
        spectrum_number: number of spectrum for which to the
            obtain offset of header
        srsi: spectral record start index
        nrps: number of records (channels) per spectrum

    Returns:
        Integer representing the offset of the spectrum header
    """
    return 256 * (srsi + nrps * (spectrum_number - 1) - 1)


def _read_header(header_bytes: list, header_def: dict):
    """Converts bytes of header using header definition.

    Args:
        header_bytes: byte array of the values of the header
        header_def: dictionary defining the mask, field names, and lengths of each entry

    Returns:
        Dictionary containing the content of the header
    """
    header_values = struct.unpack(
        header_def["mask"],
        header_bytes[:sum(header_def["n_bytes"])]
    )
    header = {}
    for field, value in zip(header_def["fields"], header_values):
        if isinstance(value, bytes) and field != "last_mod_hash":
            value = value.decode("utf-8", "ignore")
        elif field == "last_mod_hash":
            value = str(value)
        header.update({field: value})
    return header


def _read_spectra(data: list, n_rec_per_spec: int, spec_rec_start_indx: int) -> List[dict]:
    """Read spectra from a PCF.

    Args:
        data: byte array of pcf file contents
        n_rec_per_spec: number of records per spectrum (NRPS in PCF ICD)
        spec_rec_start_index: spectral record start index (SRSI in PCF ICD)

    Returns:
        List of dictionaries containing pcf file spectra information
    """
    num_samples = int((len(data) - 256 * (spec_rec_start_indx - 1)) / (256 * n_rec_per_spec))
    spectra = []
    # The range below is due to the definition of the spectrum offset in
    # pcf definition document assuming a first index of 1
    for spectrum_number in range(1, num_samples + 1):
        spectrum_header_offset = _get_spectrum_header_offset(
            spectrum_number,
            spec_rec_start_indx,
            n_rec_per_spec
        )
        header_def = SPECTRUM_DEFINITION
        spectrum_header = _read_header(
            data[spectrum_header_offset: spectrum_header_offset+256],
            header_def
        )

        spctrum_offset = spectrum_header_offset + 256
        n_channels = int(256*(n_rec_per_spec-1) / 4)
        values = struct.unpack(
            "{}f".format(n_channels),
            data[spctrum_offset: spctrum_offset+(4*n_channels)]
        )

        spectrum = {
            "header": spectrum_header,
            "spectrum": np.array(values)
        }
        spectra.append(spectrum)

    return spectra


def _pcf_to_dict(pcf_file_path: Path, verbose: bool = False) -> dict:
    """Convert a PCF into a dictionary representation.

    Args:
        pcf_file_path: path to the PCF to be converted
        verbose: whether to show detailed output

    Returns:
        Dictionary with keys "header" and "spectra"
    """
    pcf_data = np.fromfile(pcf_file_path, dtype=np.uint8)
    version = struct.unpack("3s", pcf_data[2:5])[0].decode("utf-8")
    header_def = HEADER_DEFINITIONS[version]
    has_deviation_pairs = "DeviationPairsInFile" in header_def or \
        "DeviationPairsInFileCompressed" in header_def
    if has_deviation_pairs:
        deviation_values = struct.unpack("5120f", pcf_data[512:512+20480])
        if not set(deviation_values) == {0} and verbose:
            print("Deviation pairs exist in file")
    # Header provides metadata about the collection.
    # Spectra is a list of "header" and "spectrum" pairs for each spectrum in file.
    header = _read_header(pcf_data[:256], header_def)
    dev_type, spec_rec_start_indx = _get_srsi(pcf_data)
    header.update({"SRSI": spec_rec_start_indx, "DevType": dev_type})
    spectra = _read_spectra(pcf_data, header["NRPS"], spec_rec_start_indx)

    return {"header": header, "spectra": spectra}


def _convert_header(header_dict: dict, header_def: dict) -> bytes:
    """Convert the header to a bytes object.

    Args:
        header_dict: dictionary of header values to be converted
        header_def: dictionary of field and n_bytes values

    Returns:
        `bytes` object containing the converted header dictionary
    """
    values = []
    for i, tar_len in zip(header_def["fields"], header_def["n_bytes"]):
        if "unused" not in i:
            value = header_dict[i]
            if isinstance(value, str):
                if len(value) < tar_len:
                    value = value.ljust(tar_len, " ")
                value = value.encode("utf-8")
            if i == "Tag":
                if value == b"":
                    value = b" "
            values.append(value)
        else:
            values.append(0.0)
    return struct.pack(header_def["mask"], *values)


def _spectrum_byte_offset(spectrum_index: int, n_records_per_spectrum: int,
                          spec_rec_start_index: int = 83) -> int:
    """Get the byte offset in the file where the spectrum should occur, where
    spectrum_index begins with index equals 1.

    Args:
        spectrum_index: index of the spectrum to be located
        n_records_per_spectrum: number of records (channels) per spectrum

    Returns:
        The integer byte offset of the desired spectrum
    """
    return 256 * (spec_rec_start_index + n_records_per_spectrum * (spectrum_index - 1) - 1)


def _dict_to_pcf(pcf_dict: dict, save_path: Path, verbose=False):
    """Convert PCF dictionary representation to PCF binary.

    Args:
        pcf_dict: dictionary of PCF values
        save_path: path at which to save the PCF
    """
    header = pcf_dict["header"]
    n_records_per_spectrum = header["NRPS"]
    n_spectra = len(pcf_dict["spectra"])

    file_bytes = _convert_header(header, HEADER_DEFINITIONS["DHS"])
    file_bytes += struct.pack("20s", b"DeviationPairsInFile")
    loc_first_spectra = _spectrum_byte_offset(1, n_records_per_spectrum)
    n_pad = 512 - len(file_bytes)
    file_bytes += bytes((" " * n_pad).encode("utf-8"))
    n_pad = loc_first_spectra - len(file_bytes)
    file_bytes += bytes(n_pad)
    # save binary file
    with open(save_path, "wb") as fout:
        fout.write(file_bytes)

    # Add the spectra to file
    sample_range = range(n_spectra)
    if verbose:
        sample_range = tqdm.tqdm(
            sample_range,
            desc="Writing to file"
        )

    with open(save_path, "ab") as fout:
        for i in sample_range:
            spectrum_dict = pcf_dict["spectra"][i]
            spectrum_header_dict = spectrum_dict["header"]
            spectrum_header_bytes = _convert_header(spectrum_header_dict, SPECTRUM_DEFINITION)
            n_channels = len(spectrum_dict["spectrum"])
            spectrum_bytes = struct.pack("{}f".format(n_channels), *spectrum_dict["spectrum"])
            file_bytes = spectrum_header_bytes + spectrum_bytes
            fout.write(file_bytes)


def _unpack_compressed_text_buffer(ctb, field_len=60) -> Tuple[str, str, str]:
    """Unpack a compressed text buffer into title, description, and source."""
    if ord(ctb[0]) == 255:
        ctb_parts = ctb.split(ctb[0])
        title, description, source = ctb_parts[1:]
    else:
        title = ctb[:field_len]
        description = ctb[field_len:field_len*2]
        source = ctb[field_len*2:]
    title = title.strip()  # this is treated as the isotope
    # Note: description is described in PCF doc but currently does not appear accessible via API
    description = description.strip()
    source = source.strip()
    return title, description, source


def _pack_compressed_text_buffer(title, desc, source, field_len=60) -> str:
    """Convert title, description, and source strings into a single, PCF-compatible array of 180
    characters to be put into the compressed text buffer.

    Each argument should by 60 characters or less.
    PyRIID does not use the delimiter approach for the compressed text buffer when converting
    from `SampleSet` to PCF. Any part of the field exceeding 60 bytes will be truncated to fit.

    Args:
        title: isotope of the record
        desc: custom user description
        source: seed used to generate the sample, i.e., the inject source string
        field_len: fixed length of each field if delimeters are not to be used

    Returns:
        String of length 180
    """
    assert field_len >= 3

    ctb_len = field_len * 3
    title = title.strip()
    title_len = len(title)
    desc = desc.strip()
    desc_len = len(desc)
    source = source.strip()
    source_len = len(source)

    if title_len <= field_len and desc_len <= field_len and source_len <= field_len:
        ctb = \
            f"{title:{field_len}.{field_len}}" + \
            f"{desc:{field_len}.{field_len}}" + \
            f"{source:{field_len}.{field_len}}"
    else:
        if title_len + desc_len + source_len <= ctb_len - 3:
            ctb = \
                "\xFF" + \
                f"{title:{title_len}.{title_len}}" + \
                "\xFF" + \
                f"{desc:{desc_len}.{desc_len}}" + \
                "\xFF" + \
                f"{source:{source_len}.{source_len}}"
        else:
            # Description is thrown away
            title_len = min(title_len, ctb_len - 3 - source_len)
            ctb = \
                "\xFF" + \
                f"{title:{title_len}.{title_len}}" + \
                "\xFF\xFF" + \
                f"{source:{source_len}.{source_len}}"

    if len(ctb) < ctb_len:
        ctb = ctb.ljust(ctb_len)

    return ctb