Module riid.anomaly
This module contains an algorithm for detecting statistical anomalies from sequences of gamma spectra.
Expand source code Browse git
# Copyright 2021 National Technology & Engineering Solutions of Sandia, LLC (NTESS).
# Under the terms of Contract DE-NA0003525 with NTESS,
# the U.S. Government retains certain rights in this software.
"""This module contains an algorithm for detecting statistical anomalies from
sequences of gamma spectra.
"""
import logging
from queue import Queue
from typing import Union
import numpy as np
from scipy.stats import poisson, norm
class PoissonNChannelEventDetector():
"""This class identifies sequences of anomalous spectra.
This algorithm assumes that the background environment will be consistent over known timescales.
I.e., this algorithm is only intended for static detector, mobile source scenarios.
"""
def __init__(self, long_term_duration: int = 120, short_term_duration: int = 1,
pre_event_duration: int = 5, max_event_duration: float = 120,
post_event_duration: int = 1.5, tolerable_false_alarms_per_day: float = 1.0,
anomaly_threshold_update_interval: float = 60):
"""
Args:
long_term_duration: duration (in seconds) of the long-term buffer;
this buffer contains background samples only if the correctness of the algorithm
and environmental assumptions both hold. A longer duration is often better with
the trade-off eventually becoming RAM (to store all of the spectra in the buffer)
and waiting time vs. converging on actually seeing the desired number of tolerable
false alarms per day.
short_term_duration: duration (in seconds) of the short-term buffer;
this buffer can contain background and/or anomalous samples.
This is the buffer of "most recent" samples that is compared to the long-term buffer
in order to determine if an anomaly is present. The basic question asked by this
algorithm is, "Is what I see in my short-term significantly different than what I've
seen in my long-term buffer? If so, it's an anomaly so I'll add it to the event
buffer. If not, it's background so I'll add it to the long-term buffer."
A short-term duration close to the measurement duration is recommended.
pre_event_duration: duration (in seconds) specifying the amount of pre-event,
background samples to include in the event result (but not summed into the event
spectrum) in order to visualize the run-up to the anomaly.
max_event_duration: maximum duration (in seconds) of the event buffer.
If anomalies remain present for more than this specified amount of time, the object
will produce an event so that something is reported. A new event will likely then
be started on the subsequent measurement if the anomaly is still present.
post_event_duration: duration (in seconds) that determines the number of
consecutive, insignificant measurements (i.e., measurements which are not considered
anomalies) that must be observed in order to end an event.
tolerable_false_alarms_per_day: desired maximum number of allowable false
positive events per day for all spectrum channels. The actual observed outcome of
this parameter primarily depends on the long-term duration (although other
parameters such as the short-term duration and limit update frequency also play a
part). Long-term duration influences the number of actual false alarms per day in
the following ways:
1. if the long-term buffer is not long enough to incorporate long-term
background fluctuations, such fluctuations can be interpreted as anomalies;
and
2. if the long-term buffer is too short for the desired false alarm rate, it
cannot accurately model the anomaly probability distribution
(more specifically, to achieve a very low false alarm rate, this algorithm
must witness a sufficiently large number of measurements around the tail of
the anomaly probability distribution where the false alarm threshold exists,
and measurements in that region do not happen very often, hence you must
wait a long time to see enough).
To give an idea of the challenge here, empirical evidence has shown that to
accurately achieve 1 false alarm per day requires 6 or more hours of long-term
duration for a 1024-channel, 3"x3" NaI detector at a sample interval of 0.25s.
The unfortunate reality of this is that you are now dealing with a timescale where
the background is likely to have a significantly different mean. So what is one to
do? The simple answer is leave this parameter at its default, and then accept the
reality of physics that will inevitably lead you to more false positives due to an
unobtainable background characterization. In other words, aim high but be realistic.
anomaly_threshold_update_interval: time (in seconds) between updates to the anomaly
probability thresholds. Since this calculation is expensive relative to the other
computations going on, and because background itself usually does not significantly
change between consecutive measurements, this parameter can be used to relax
computation.
"""
self._event_in_progress = False
self._measurement_duration = None
self._n_measurement_channels = 0
self._post_event_duration = post_event_duration
self._anomaly_threshold_update_interval = anomaly_threshold_update_interval
self._anomaly_threshold_update_counter = 0
self._pre_event_duration = pre_event_duration
self._max_event_duration = max_event_duration
self._tolerable_false_alarms_per_day = tolerable_false_alarms_per_day
self._short_term_buffer = Queue()
self._short_term_duration = short_term_duration
self._long_term_buffer = Queue()
self._long_term_duration = long_term_duration
self._event_buffer = Queue()
self._reset_stats()
# region Properties
@property
def event_in_progress(self):
"""Whether an event is in progress."""
return self._event_in_progress
@property
def short_term_buffer_length(self):
"""Get the number of measurements in the short term buffer (foreground)."""
return self._short_term_max_count
@property
def background_percent_complete(self):
"""Get the percent fullness as a percent (not decimal)."""
if self._long_term_max_count == 0:
return 0
long_term_buffer_percent_full = self._long_term_count / self._long_term_max_count * 100
return long_term_buffer_percent_full
@property
def long_term_sum_norm(self):
"""Get the normalized long term buffer sum (background)."""
return self._long_term_sum_norm
@property
def long_term_duration(self):
"""Get or set the duration (in seconds) of the long term buffer (background)."""
return self._long_term_duration
@long_term_duration.setter
def long_term_duration(self, value):
f_value = float(value)
if f_value <= 0:
raise ValueError("Long term duration must be greater than zero.")
self._long_term_duration = f_value
self.clear_background()
@property
def short_term_duration(self):
"""Get or set the duration (in seconds) of the short term buffer."""
return self._short_term_duration
@short_term_duration.setter
def short_term_duration(self, value):
f_value = float(value)
if f_value <= 0:
raise ValueError("Short term duration must be greater than zero.")
self._short_term_duration = f_value
@property
def pre_event_duration(self):
"""Get or set the pre-event duration, e.g. the allowable time before an event
occurs that is considered a part of the event."""
return self._pre_event_duration
@pre_event_duration.setter
def pre_event_duration(self, value):
f_value = float(value)
if f_value <= 0:
raise ValueError("Pre-event duration must be greater than zero.")
self._pre_event_duration = f_value
@property
def max_event_duration(self):
"""Get or set the maximum event duration."""
return self._max_event_duration
@max_event_duration.setter
def max_event_duration(self, value):
f_value = float(value)
if f_value <= 0:
raise ValueError("Max event duration must be greater than zero.")
self._max_event_duration = f_value
@property
def post_event_duration(self):
"""Get or set the post-event duration, e.g. the allowable time after an event
occurs that is still considered part of the event."""
return self._post_event_duration
@post_event_duration.setter
def post_event_duration(self, value):
f_value = float(value)
if f_value < 0:
raise ValueError("Post event duration must be nonnegative.")
self._post_event_duration = f_value
@property
def tolerable_false_alarms_per_day(self):
"""Get or set the number of tolerable false alarms per day."""
return self._tolerable_false_alarms_per_day
@tolerable_false_alarms_per_day.setter
def tolerable_false_alarms_per_day(self, value):
f_value = float(value)
if f_value <= 0:
raise ValueError("Tolerable false alarms must be greater than zero.")
self._tolerable_false_alarms_per_day = f_value
@property
def limit_update_frequency(self):
"""Get or set the frequency (in Hz) which which to recompute anomaly thresholds.
"""
return self._limit_update_frequency
@limit_update_frequency.setter
def limit_update_frequency(self, value):
f_value = float(value)
if f_value < 0:
raise ValueError("Limit update frequency must be nonnegative.")
self._limit_update_frequency = f_value
# endregion
def _reset_stats(self):
"""Reset the Event Detector stats to a zeroed, initial state."""
self._measurement_duration = None
self._anomalous_count = 0
self._nonanomalous_count_to_trigger_off = 0
self._nonanomalous_count = 0
self._anomaly_threshold_update_counter = 0
self._anomaly_probas = 0
self._anomaly_proba = None
self._anomaly_thresholds = None
self._short_term_buffer_full = False
self._short_term_count = 0
self._short_term_max_count = 0
self._short_term_sum = None
self._long_term_buffer_full = False
self._long_term_count = 0
self._long_term_max_count = 0
self._long_term_sum = None
self._long_term_cps = 0
self._long_term_sum_norm = 0
self._long_term_proba_buffer = Queue()
self._long_term_proba_stats = RunningStats(0, 0, 0)
def clear_background(self):
"""Clear the short-term buffer, long-term buffer, and stats."""
with self._short_term_buffer.mutex:
self._short_term_buffer.queue.clear()
with self._long_term_buffer.mutex:
self._long_term_buffer.queue.clear()
self._reset_stats()
def add_measurement(self, measurement_id: int, measurement: Union[np.array, int],
duration: float, verbose=True):
"""Add the next measurement to the event detector.
Args:
measurement_id: unique identifier for the measurement
measurement: 1D array-like object containing one or more channels of measured data
duration: float representing the length of time over which the measurement was taken
verbose: whether to print log messages
Returns:
if adding a spectrum concludes an event, a tuple is returned containing:
- the channel-wise sum of all spectra that were part of the event
- the channel-wise sum of all spectra in the background buffer,
normalized to match the duration of the event
- the duration (in seconds) of the event
- list of measurement IDs that comprise the event
else if adding a spectrum does not conclude event, return None
"""
if self._measurement_duration != duration:
if verbose:
logging.debug("Measurement duration changed")
self.clear_background()
self._measurement_duration = duration
self._n_measurement_channels = len(measurement)
self._short_term_max_count = self._short_term_duration / duration
self._long_term_max_count = self._long_term_duration / duration
self._nonanomalous_count_to_trigger_off = self._post_event_duration / duration
self._sigma_deviation = \
(self._tolerable_false_alarms_per_day / (24 * 60 * 60 / duration))
self._long_term_proba_stats.N = self._long_term_max_count
# Update short-term buffer
self._short_term_buffer.put((measurement_id, measurement))
self._short_term_buffer_full = self._short_term_count >= self._short_term_max_count
if self._short_term_sum is not None:
self._short_term_sum += np.array(measurement)
else:
self._short_term_sum = np.array(measurement)
if self._short_term_buffer_full:
_, oldest_short_term_measurement = self._short_term_buffer.get()
self._short_term_sum -= oldest_short_term_measurement
else:
self._short_term_count += 1
# Anomaly & event determination
self._long_term_buffer_full = self._long_term_max_count and \
self._long_term_count >= self._long_term_max_count
event_result = None
if self._long_term_buffer_full:
# Check if it's time to update the anomaly thresholds
time_to_update_anomaly_thresholds = not self._anomaly_thresholds or \
self._anomaly_threshold_update_counter >= \
self._anomaly_threshold_update_interval
if time_to_update_anomaly_thresholds:
# It is known that the anomaly probability distribution is more perfectly
# modeled with a skewed normal distribution, however the number of measurements
# required to accurately obtain the skew term is infeasible so we use a
# gaussian for now.
self._anomaly_thresholds = norm.ppf(
self._sigma_deviation,
self._long_term_proba_stats.average,
self._long_term_proba_stats.stddev
)
self._anomaly_threshold_update_counter = 0
else:
self._anomaly_threshold_update_counter += self._measurement_duration
# Check anomaly thresholds
self._anomaly_probas = poisson.logpmf(
self._short_term_sum,
self._long_term_sum_norm
)
self._anomaly_proba = np.sum(self._anomaly_probas)
measurement_is_anomalous = self._anomaly_proba < self._anomaly_thresholds
# Now determine if signficant measurements meet on_time / off_time requirements
if measurement_is_anomalous:
if not self._event_in_progress:
if verbose:
logging.debug(f"Event started @ {measurement_id}")
self._event_in_progress = True
self._nonanomalous_count = 0
# event duration = anomalous count * measurement duration
self._anomalous_count += 1
self._event_buffer.put((measurement_id, measurement))
elif self._event_in_progress:
self._nonanomalous_count += 1
anomaly_gone = self._nonanomalous_count >= self._nonanomalous_count_to_trigger_off
event_duration = self._anomalous_count * self._measurement_duration
event_reached_max_duration = event_duration >= self.max_event_duration
event_over = anomaly_gone or event_reached_max_duration
if event_over:
if verbose:
logging.debug(f"Event ended @ {measurement_id}")
self._event_in_progress = False
# Build the event tuple
event_measurement_ids, event_measurements = zip(
*[self._event_buffer.get() for _ in range(self._event_buffer.qsize())]
)
event_measurement = np.array(event_measurements).sum(axis=0)
self._anomalous_count = 0
long_term_cps = self._long_term_sum / self._long_term_duration
event_bg_measurement = long_term_cps * event_duration
gross_counts = sum(event_measurement)
bg_counts = sum(event_bg_measurement)
fg_counts = gross_counts - bg_counts
snr = fg_counts / bg_counts
is_positive_snr_event = snr > 0
if not is_positive_snr_event and verbose:
logging.debug(f"Event ending @ {measurement_id} had a SNR <= 0 ({snr:.2f})")
event_result = (
event_measurement,
event_bg_measurement,
event_duration,
event_measurement_ids
)
else:
self._event_buffer.put((measurement_id, measurement))
# Update long-term buffer
if not self._event_in_progress:
self._long_term_buffer.put((measurement_id, measurement))
if self._long_term_sum is not None:
self._long_term_sum += np.array(measurement)
else:
self._long_term_sum = np.array(measurement)
if self._long_term_buffer_full:
_, oldest_long_term_measurement = self._long_term_buffer.get()
self._long_term_sum -= oldest_long_term_measurement
else:
self._long_term_count += 1
self._long_term_cps = np.divide(self._long_term_sum,
(self._long_term_count * self._measurement_duration))
self._long_term_sum_norm = (self._long_term_cps * self._short_term_duration).clip(
1 / self._n_measurement_channels
)
# Calculate some stats for thresholding
long_term_anomaly_probas = poisson.logpmf(
self._short_term_sum,
self._long_term_sum_norm
)
long_term_anomaly_proba = np.sum(long_term_anomaly_probas)
# Rolling Welford's algorithm
self._long_term_proba_buffer.put(long_term_anomaly_proba)
oldest_long_term_proba = 0
if self._long_term_buffer_full:
oldest_long_term_proba = self._long_term_proba_buffer.get()
self._long_term_proba_stats.update(long_term_anomaly_proba, oldest_long_term_proba)
return event_result
class RunningStats(object):
"""Helper for calculating stats in a rolling fashion."""
def __init__(self, window_size, average, variance):
"""
Args:
window_size: size of the window across which the probabilities are calculated
average: average within the window
variance: variance within the window
"""
self.N = window_size
self.average = average
self.variance = variance
self.stddev = np.sqrt(variance)
def update(self, new, old):
"""Update the average, variance, and stddev for the RunningStats.
Args:
new: new value to include in window
old: oldest value in window (this must be externally tracked - queue)
"""
oldavg = self.average
newavg = oldavg + (new - old) / self.N
self.average = newavg
self.variance += (new - old) * (new - newavg + old - oldavg) / (self.N - 1)
self.stddev = np.sqrt(self.variance)
Classes
class PoissonNChannelEventDetector (long_term_duration: int = 120, short_term_duration: int = 1, pre_event_duration: int = 5, max_event_duration: float = 120, post_event_duration: int = 1.5, tolerable_false_alarms_per_day: float = 1.0, anomaly_threshold_update_interval: float = 60)-
This class identifies sequences of anomalous spectra.
This algorithm assumes that the background environment will be consistent over known timescales. I.e., this algorithm is only intended for static detector, mobile source scenarios.
Args
long_term_duration- duration (in seconds) of the long-term buffer; this buffer contains background samples only if the correctness of the algorithm and environmental assumptions both hold. A longer duration is often better with the trade-off eventually becoming RAM (to store all of the spectra in the buffer) and waiting time vs. converging on actually seeing the desired number of tolerable false alarms per day.
short_term_duration- duration (in seconds) of the short-term buffer; this buffer can contain background and/or anomalous samples. This is the buffer of "most recent" samples that is compared to the long-term buffer in order to determine if an anomaly is present. The basic question asked by this algorithm is, "Is what I see in my short-term significantly different than what I've seen in my long-term buffer? If so, it's an anomaly so I'll add it to the event buffer. If not, it's background so I'll add it to the long-term buffer." A short-term duration close to the measurement duration is recommended.
pre_event_duration- duration (in seconds) specifying the amount of pre-event, background samples to include in the event result (but not summed into the event spectrum) in order to visualize the run-up to the anomaly.
max_event_duration- maximum duration (in seconds) of the event buffer. If anomalies remain present for more than this specified amount of time, the object will produce an event so that something is reported. A new event will likely then be started on the subsequent measurement if the anomaly is still present.
post_event_duration- duration (in seconds) that determines the number of consecutive, insignificant measurements (i.e., measurements which are not considered anomalies) that must be observed in order to end an event.
tolerable_false_alarms_per_day-
desired maximum number of allowable false positive events per day for all spectrum channels. The actual observed outcome of this parameter primarily depends on the long-term duration (although other parameters such as the short-term duration and limit update frequency also play a part). Long-term duration influences the number of actual false alarms per day in the following ways:
- if the long-term buffer is not long enough to incorporate long-term background fluctuations, such fluctuations can be interpreted as anomalies; and
- if the long-term buffer is too short for the desired false alarm rate, it cannot accurately model the anomaly probability distribution (more specifically, to achieve a very low false alarm rate, this algorithm must witness a sufficiently large number of measurements around the tail of the anomaly probability distribution where the false alarm threshold exists, and measurements in that region do not happen very often, hence you must wait a long time to see enough).
To give an idea of the challenge here, empirical evidence has shown that to accurately achieve 1 false alarm per day requires 6 or more hours of long-term duration for a 1024-channel, 3"x3" NaI detector at a sample interval of 0.25s. The unfortunate reality of this is that you are now dealing with a timescale where the background is likely to have a significantly different mean. So what is one to do? The simple answer is leave this parameter at its default, and then accept the reality of physics that will inevitably lead you to more false positives due to an unobtainable background characterization. In other words, aim high but be realistic.
anomaly_threshold_update_interval- time (in seconds) between updates to the anomaly probability thresholds. Since this calculation is expensive relative to the other computations going on, and because background itself usually does not significantly change between consecutive measurements, this parameter can be used to relax computation.
Expand source code Browse git
class PoissonNChannelEventDetector(): """This class identifies sequences of anomalous spectra. This algorithm assumes that the background environment will be consistent over known timescales. I.e., this algorithm is only intended for static detector, mobile source scenarios. """ def __init__(self, long_term_duration: int = 120, short_term_duration: int = 1, pre_event_duration: int = 5, max_event_duration: float = 120, post_event_duration: int = 1.5, tolerable_false_alarms_per_day: float = 1.0, anomaly_threshold_update_interval: float = 60): """ Args: long_term_duration: duration (in seconds) of the long-term buffer; this buffer contains background samples only if the correctness of the algorithm and environmental assumptions both hold. A longer duration is often better with the trade-off eventually becoming RAM (to store all of the spectra in the buffer) and waiting time vs. converging on actually seeing the desired number of tolerable false alarms per day. short_term_duration: duration (in seconds) of the short-term buffer; this buffer can contain background and/or anomalous samples. This is the buffer of "most recent" samples that is compared to the long-term buffer in order to determine if an anomaly is present. The basic question asked by this algorithm is, "Is what I see in my short-term significantly different than what I've seen in my long-term buffer? If so, it's an anomaly so I'll add it to the event buffer. If not, it's background so I'll add it to the long-term buffer." A short-term duration close to the measurement duration is recommended. pre_event_duration: duration (in seconds) specifying the amount of pre-event, background samples to include in the event result (but not summed into the event spectrum) in order to visualize the run-up to the anomaly. max_event_duration: maximum duration (in seconds) of the event buffer. If anomalies remain present for more than this specified amount of time, the object will produce an event so that something is reported. A new event will likely then be started on the subsequent measurement if the anomaly is still present. post_event_duration: duration (in seconds) that determines the number of consecutive, insignificant measurements (i.e., measurements which are not considered anomalies) that must be observed in order to end an event. tolerable_false_alarms_per_day: desired maximum number of allowable false positive events per day for all spectrum channels. The actual observed outcome of this parameter primarily depends on the long-term duration (although other parameters such as the short-term duration and limit update frequency also play a part). Long-term duration influences the number of actual false alarms per day in the following ways: 1. if the long-term buffer is not long enough to incorporate long-term background fluctuations, such fluctuations can be interpreted as anomalies; and 2. if the long-term buffer is too short for the desired false alarm rate, it cannot accurately model the anomaly probability distribution (more specifically, to achieve a very low false alarm rate, this algorithm must witness a sufficiently large number of measurements around the tail of the anomaly probability distribution where the false alarm threshold exists, and measurements in that region do not happen very often, hence you must wait a long time to see enough). To give an idea of the challenge here, empirical evidence has shown that to accurately achieve 1 false alarm per day requires 6 or more hours of long-term duration for a 1024-channel, 3"x3" NaI detector at a sample interval of 0.25s. The unfortunate reality of this is that you are now dealing with a timescale where the background is likely to have a significantly different mean. So what is one to do? The simple answer is leave this parameter at its default, and then accept the reality of physics that will inevitably lead you to more false positives due to an unobtainable background characterization. In other words, aim high but be realistic. anomaly_threshold_update_interval: time (in seconds) between updates to the anomaly probability thresholds. Since this calculation is expensive relative to the other computations going on, and because background itself usually does not significantly change between consecutive measurements, this parameter can be used to relax computation. """ self._event_in_progress = False self._measurement_duration = None self._n_measurement_channels = 0 self._post_event_duration = post_event_duration self._anomaly_threshold_update_interval = anomaly_threshold_update_interval self._anomaly_threshold_update_counter = 0 self._pre_event_duration = pre_event_duration self._max_event_duration = max_event_duration self._tolerable_false_alarms_per_day = tolerable_false_alarms_per_day self._short_term_buffer = Queue() self._short_term_duration = short_term_duration self._long_term_buffer = Queue() self._long_term_duration = long_term_duration self._event_buffer = Queue() self._reset_stats() # region Properties @property def event_in_progress(self): """Whether an event is in progress.""" return self._event_in_progress @property def short_term_buffer_length(self): """Get the number of measurements in the short term buffer (foreground).""" return self._short_term_max_count @property def background_percent_complete(self): """Get the percent fullness as a percent (not decimal).""" if self._long_term_max_count == 0: return 0 long_term_buffer_percent_full = self._long_term_count / self._long_term_max_count * 100 return long_term_buffer_percent_full @property def long_term_sum_norm(self): """Get the normalized long term buffer sum (background).""" return self._long_term_sum_norm @property def long_term_duration(self): """Get or set the duration (in seconds) of the long term buffer (background).""" return self._long_term_duration @long_term_duration.setter def long_term_duration(self, value): f_value = float(value) if f_value <= 0: raise ValueError("Long term duration must be greater than zero.") self._long_term_duration = f_value self.clear_background() @property def short_term_duration(self): """Get or set the duration (in seconds) of the short term buffer.""" return self._short_term_duration @short_term_duration.setter def short_term_duration(self, value): f_value = float(value) if f_value <= 0: raise ValueError("Short term duration must be greater than zero.") self._short_term_duration = f_value @property def pre_event_duration(self): """Get or set the pre-event duration, e.g. the allowable time before an event occurs that is considered a part of the event.""" return self._pre_event_duration @pre_event_duration.setter def pre_event_duration(self, value): f_value = float(value) if f_value <= 0: raise ValueError("Pre-event duration must be greater than zero.") self._pre_event_duration = f_value @property def max_event_duration(self): """Get or set the maximum event duration.""" return self._max_event_duration @max_event_duration.setter def max_event_duration(self, value): f_value = float(value) if f_value <= 0: raise ValueError("Max event duration must be greater than zero.") self._max_event_duration = f_value @property def post_event_duration(self): """Get or set the post-event duration, e.g. the allowable time after an event occurs that is still considered part of the event.""" return self._post_event_duration @post_event_duration.setter def post_event_duration(self, value): f_value = float(value) if f_value < 0: raise ValueError("Post event duration must be nonnegative.") self._post_event_duration = f_value @property def tolerable_false_alarms_per_day(self): """Get or set the number of tolerable false alarms per day.""" return self._tolerable_false_alarms_per_day @tolerable_false_alarms_per_day.setter def tolerable_false_alarms_per_day(self, value): f_value = float(value) if f_value <= 0: raise ValueError("Tolerable false alarms must be greater than zero.") self._tolerable_false_alarms_per_day = f_value @property def limit_update_frequency(self): """Get or set the frequency (in Hz) which which to recompute anomaly thresholds. """ return self._limit_update_frequency @limit_update_frequency.setter def limit_update_frequency(self, value): f_value = float(value) if f_value < 0: raise ValueError("Limit update frequency must be nonnegative.") self._limit_update_frequency = f_value # endregion def _reset_stats(self): """Reset the Event Detector stats to a zeroed, initial state.""" self._measurement_duration = None self._anomalous_count = 0 self._nonanomalous_count_to_trigger_off = 0 self._nonanomalous_count = 0 self._anomaly_threshold_update_counter = 0 self._anomaly_probas = 0 self._anomaly_proba = None self._anomaly_thresholds = None self._short_term_buffer_full = False self._short_term_count = 0 self._short_term_max_count = 0 self._short_term_sum = None self._long_term_buffer_full = False self._long_term_count = 0 self._long_term_max_count = 0 self._long_term_sum = None self._long_term_cps = 0 self._long_term_sum_norm = 0 self._long_term_proba_buffer = Queue() self._long_term_proba_stats = RunningStats(0, 0, 0) def clear_background(self): """Clear the short-term buffer, long-term buffer, and stats.""" with self._short_term_buffer.mutex: self._short_term_buffer.queue.clear() with self._long_term_buffer.mutex: self._long_term_buffer.queue.clear() self._reset_stats() def add_measurement(self, measurement_id: int, measurement: Union[np.array, int], duration: float, verbose=True): """Add the next measurement to the event detector. Args: measurement_id: unique identifier for the measurement measurement: 1D array-like object containing one or more channels of measured data duration: float representing the length of time over which the measurement was taken verbose: whether to print log messages Returns: if adding a spectrum concludes an event, a tuple is returned containing: - the channel-wise sum of all spectra that were part of the event - the channel-wise sum of all spectra in the background buffer, normalized to match the duration of the event - the duration (in seconds) of the event - list of measurement IDs that comprise the event else if adding a spectrum does not conclude event, return None """ if self._measurement_duration != duration: if verbose: logging.debug("Measurement duration changed") self.clear_background() self._measurement_duration = duration self._n_measurement_channels = len(measurement) self._short_term_max_count = self._short_term_duration / duration self._long_term_max_count = self._long_term_duration / duration self._nonanomalous_count_to_trigger_off = self._post_event_duration / duration self._sigma_deviation = \ (self._tolerable_false_alarms_per_day / (24 * 60 * 60 / duration)) self._long_term_proba_stats.N = self._long_term_max_count # Update short-term buffer self._short_term_buffer.put((measurement_id, measurement)) self._short_term_buffer_full = self._short_term_count >= self._short_term_max_count if self._short_term_sum is not None: self._short_term_sum += np.array(measurement) else: self._short_term_sum = np.array(measurement) if self._short_term_buffer_full: _, oldest_short_term_measurement = self._short_term_buffer.get() self._short_term_sum -= oldest_short_term_measurement else: self._short_term_count += 1 # Anomaly & event determination self._long_term_buffer_full = self._long_term_max_count and \ self._long_term_count >= self._long_term_max_count event_result = None if self._long_term_buffer_full: # Check if it's time to update the anomaly thresholds time_to_update_anomaly_thresholds = not self._anomaly_thresholds or \ self._anomaly_threshold_update_counter >= \ self._anomaly_threshold_update_interval if time_to_update_anomaly_thresholds: # It is known that the anomaly probability distribution is more perfectly # modeled with a skewed normal distribution, however the number of measurements # required to accurately obtain the skew term is infeasible so we use a # gaussian for now. self._anomaly_thresholds = norm.ppf( self._sigma_deviation, self._long_term_proba_stats.average, self._long_term_proba_stats.stddev ) self._anomaly_threshold_update_counter = 0 else: self._anomaly_threshold_update_counter += self._measurement_duration # Check anomaly thresholds self._anomaly_probas = poisson.logpmf( self._short_term_sum, self._long_term_sum_norm ) self._anomaly_proba = np.sum(self._anomaly_probas) measurement_is_anomalous = self._anomaly_proba < self._anomaly_thresholds # Now determine if signficant measurements meet on_time / off_time requirements if measurement_is_anomalous: if not self._event_in_progress: if verbose: logging.debug(f"Event started @ {measurement_id}") self._event_in_progress = True self._nonanomalous_count = 0 # event duration = anomalous count * measurement duration self._anomalous_count += 1 self._event_buffer.put((measurement_id, measurement)) elif self._event_in_progress: self._nonanomalous_count += 1 anomaly_gone = self._nonanomalous_count >= self._nonanomalous_count_to_trigger_off event_duration = self._anomalous_count * self._measurement_duration event_reached_max_duration = event_duration >= self.max_event_duration event_over = anomaly_gone or event_reached_max_duration if event_over: if verbose: logging.debug(f"Event ended @ {measurement_id}") self._event_in_progress = False # Build the event tuple event_measurement_ids, event_measurements = zip( *[self._event_buffer.get() for _ in range(self._event_buffer.qsize())] ) event_measurement = np.array(event_measurements).sum(axis=0) self._anomalous_count = 0 long_term_cps = self._long_term_sum / self._long_term_duration event_bg_measurement = long_term_cps * event_duration gross_counts = sum(event_measurement) bg_counts = sum(event_bg_measurement) fg_counts = gross_counts - bg_counts snr = fg_counts / bg_counts is_positive_snr_event = snr > 0 if not is_positive_snr_event and verbose: logging.debug(f"Event ending @ {measurement_id} had a SNR <= 0 ({snr:.2f})") event_result = ( event_measurement, event_bg_measurement, event_duration, event_measurement_ids ) else: self._event_buffer.put((measurement_id, measurement)) # Update long-term buffer if not self._event_in_progress: self._long_term_buffer.put((measurement_id, measurement)) if self._long_term_sum is not None: self._long_term_sum += np.array(measurement) else: self._long_term_sum = np.array(measurement) if self._long_term_buffer_full: _, oldest_long_term_measurement = self._long_term_buffer.get() self._long_term_sum -= oldest_long_term_measurement else: self._long_term_count += 1 self._long_term_cps = np.divide(self._long_term_sum, (self._long_term_count * self._measurement_duration)) self._long_term_sum_norm = (self._long_term_cps * self._short_term_duration).clip( 1 / self._n_measurement_channels ) # Calculate some stats for thresholding long_term_anomaly_probas = poisson.logpmf( self._short_term_sum, self._long_term_sum_norm ) long_term_anomaly_proba = np.sum(long_term_anomaly_probas) # Rolling Welford's algorithm self._long_term_proba_buffer.put(long_term_anomaly_proba) oldest_long_term_proba = 0 if self._long_term_buffer_full: oldest_long_term_proba = self._long_term_proba_buffer.get() self._long_term_proba_stats.update(long_term_anomaly_proba, oldest_long_term_proba) return event_resultInstance variables
var background_percent_complete-
Get the percent fullness as a percent (not decimal).
Expand source code Browse git
@property def background_percent_complete(self): """Get the percent fullness as a percent (not decimal).""" if self._long_term_max_count == 0: return 0 long_term_buffer_percent_full = self._long_term_count / self._long_term_max_count * 100 return long_term_buffer_percent_full var event_in_progress-
Whether an event is in progress.
Expand source code Browse git
@property def event_in_progress(self): """Whether an event is in progress.""" return self._event_in_progress var limit_update_frequency-
Get or set the frequency (in Hz) which which to recompute anomaly thresholds.
Expand source code Browse git
@property def limit_update_frequency(self): """Get or set the frequency (in Hz) which which to recompute anomaly thresholds. """ return self._limit_update_frequency var long_term_duration-
Get or set the duration (in seconds) of the long term buffer (background).
Expand source code Browse git
@property def long_term_duration(self): """Get or set the duration (in seconds) of the long term buffer (background).""" return self._long_term_duration var long_term_sum_norm-
Get the normalized long term buffer sum (background).
Expand source code Browse git
@property def long_term_sum_norm(self): """Get the normalized long term buffer sum (background).""" return self._long_term_sum_norm var max_event_duration-
Get or set the maximum event duration.
Expand source code Browse git
@property def max_event_duration(self): """Get or set the maximum event duration.""" return self._max_event_duration var post_event_duration-
Get or set the post-event duration, e.g. the allowable time after an event occurs that is still considered part of the event.
Expand source code Browse git
@property def post_event_duration(self): """Get or set the post-event duration, e.g. the allowable time after an event occurs that is still considered part of the event.""" return self._post_event_duration var pre_event_duration-
Get or set the pre-event duration, e.g. the allowable time before an event occurs that is considered a part of the event.
Expand source code Browse git
@property def pre_event_duration(self): """Get or set the pre-event duration, e.g. the allowable time before an event occurs that is considered a part of the event.""" return self._pre_event_duration var short_term_buffer_length-
Get the number of measurements in the short term buffer (foreground).
Expand source code Browse git
@property def short_term_buffer_length(self): """Get the number of measurements in the short term buffer (foreground).""" return self._short_term_max_count var short_term_duration-
Get or set the duration (in seconds) of the short term buffer.
Expand source code Browse git
@property def short_term_duration(self): """Get or set the duration (in seconds) of the short term buffer.""" return self._short_term_duration var tolerable_false_alarms_per_day-
Get or set the number of tolerable false alarms per day.
Expand source code Browse git
@property def tolerable_false_alarms_per_day(self): """Get or set the number of tolerable false alarms per day.""" return self._tolerable_false_alarms_per_day
Methods
def add_measurement(self, measurement_id: int, measurement: Union[, int], duration: float, verbose=True) -
Add the next measurement to the event detector.
Args
measurement_id- unique identifier for the measurement
measurement- 1D array-like object containing one or more channels of measured data
duration- float representing the length of time over which the measurement was taken
verbose- whether to print log messages
Returns
if adding a spectrum concludes an event, a tuple is returned containing: - the channel-wise sum of all spectra that were part of the event - the channel-wise sum of all spectra in the background buffer, normalized to match the duration of the event - the duration (in seconds) of the event - list of measurement IDs that comprise the event else if adding a spectrum does not conclude event, return None
Expand source code Browse git
def add_measurement(self, measurement_id: int, measurement: Union[np.array, int], duration: float, verbose=True): """Add the next measurement to the event detector. Args: measurement_id: unique identifier for the measurement measurement: 1D array-like object containing one or more channels of measured data duration: float representing the length of time over which the measurement was taken verbose: whether to print log messages Returns: if adding a spectrum concludes an event, a tuple is returned containing: - the channel-wise sum of all spectra that were part of the event - the channel-wise sum of all spectra in the background buffer, normalized to match the duration of the event - the duration (in seconds) of the event - list of measurement IDs that comprise the event else if adding a spectrum does not conclude event, return None """ if self._measurement_duration != duration: if verbose: logging.debug("Measurement duration changed") self.clear_background() self._measurement_duration = duration self._n_measurement_channels = len(measurement) self._short_term_max_count = self._short_term_duration / duration self._long_term_max_count = self._long_term_duration / duration self._nonanomalous_count_to_trigger_off = self._post_event_duration / duration self._sigma_deviation = \ (self._tolerable_false_alarms_per_day / (24 * 60 * 60 / duration)) self._long_term_proba_stats.N = self._long_term_max_count # Update short-term buffer self._short_term_buffer.put((measurement_id, measurement)) self._short_term_buffer_full = self._short_term_count >= self._short_term_max_count if self._short_term_sum is not None: self._short_term_sum += np.array(measurement) else: self._short_term_sum = np.array(measurement) if self._short_term_buffer_full: _, oldest_short_term_measurement = self._short_term_buffer.get() self._short_term_sum -= oldest_short_term_measurement else: self._short_term_count += 1 # Anomaly & event determination self._long_term_buffer_full = self._long_term_max_count and \ self._long_term_count >= self._long_term_max_count event_result = None if self._long_term_buffer_full: # Check if it's time to update the anomaly thresholds time_to_update_anomaly_thresholds = not self._anomaly_thresholds or \ self._anomaly_threshold_update_counter >= \ self._anomaly_threshold_update_interval if time_to_update_anomaly_thresholds: # It is known that the anomaly probability distribution is more perfectly # modeled with a skewed normal distribution, however the number of measurements # required to accurately obtain the skew term is infeasible so we use a # gaussian for now. self._anomaly_thresholds = norm.ppf( self._sigma_deviation, self._long_term_proba_stats.average, self._long_term_proba_stats.stddev ) self._anomaly_threshold_update_counter = 0 else: self._anomaly_threshold_update_counter += self._measurement_duration # Check anomaly thresholds self._anomaly_probas = poisson.logpmf( self._short_term_sum, self._long_term_sum_norm ) self._anomaly_proba = np.sum(self._anomaly_probas) measurement_is_anomalous = self._anomaly_proba < self._anomaly_thresholds # Now determine if signficant measurements meet on_time / off_time requirements if measurement_is_anomalous: if not self._event_in_progress: if verbose: logging.debug(f"Event started @ {measurement_id}") self._event_in_progress = True self._nonanomalous_count = 0 # event duration = anomalous count * measurement duration self._anomalous_count += 1 self._event_buffer.put((measurement_id, measurement)) elif self._event_in_progress: self._nonanomalous_count += 1 anomaly_gone = self._nonanomalous_count >= self._nonanomalous_count_to_trigger_off event_duration = self._anomalous_count * self._measurement_duration event_reached_max_duration = event_duration >= self.max_event_duration event_over = anomaly_gone or event_reached_max_duration if event_over: if verbose: logging.debug(f"Event ended @ {measurement_id}") self._event_in_progress = False # Build the event tuple event_measurement_ids, event_measurements = zip( *[self._event_buffer.get() for _ in range(self._event_buffer.qsize())] ) event_measurement = np.array(event_measurements).sum(axis=0) self._anomalous_count = 0 long_term_cps = self._long_term_sum / self._long_term_duration event_bg_measurement = long_term_cps * event_duration gross_counts = sum(event_measurement) bg_counts = sum(event_bg_measurement) fg_counts = gross_counts - bg_counts snr = fg_counts / bg_counts is_positive_snr_event = snr > 0 if not is_positive_snr_event and verbose: logging.debug(f"Event ending @ {measurement_id} had a SNR <= 0 ({snr:.2f})") event_result = ( event_measurement, event_bg_measurement, event_duration, event_measurement_ids ) else: self._event_buffer.put((measurement_id, measurement)) # Update long-term buffer if not self._event_in_progress: self._long_term_buffer.put((measurement_id, measurement)) if self._long_term_sum is not None: self._long_term_sum += np.array(measurement) else: self._long_term_sum = np.array(measurement) if self._long_term_buffer_full: _, oldest_long_term_measurement = self._long_term_buffer.get() self._long_term_sum -= oldest_long_term_measurement else: self._long_term_count += 1 self._long_term_cps = np.divide(self._long_term_sum, (self._long_term_count * self._measurement_duration)) self._long_term_sum_norm = (self._long_term_cps * self._short_term_duration).clip( 1 / self._n_measurement_channels ) # Calculate some stats for thresholding long_term_anomaly_probas = poisson.logpmf( self._short_term_sum, self._long_term_sum_norm ) long_term_anomaly_proba = np.sum(long_term_anomaly_probas) # Rolling Welford's algorithm self._long_term_proba_buffer.put(long_term_anomaly_proba) oldest_long_term_proba = 0 if self._long_term_buffer_full: oldest_long_term_proba = self._long_term_proba_buffer.get() self._long_term_proba_stats.update(long_term_anomaly_proba, oldest_long_term_proba) return event_result def clear_background(self)-
Clear the short-term buffer, long-term buffer, and stats.
Expand source code Browse git
def clear_background(self): """Clear the short-term buffer, long-term buffer, and stats.""" with self._short_term_buffer.mutex: self._short_term_buffer.queue.clear() with self._long_term_buffer.mutex: self._long_term_buffer.queue.clear() self._reset_stats()
class RunningStats (window_size, average, variance)-
Helper for calculating stats in a rolling fashion.
Args
window_size- size of the window across which the probabilities are calculated
average- average within the window
variance- variance within the window
Expand source code Browse git
class RunningStats(object): """Helper for calculating stats in a rolling fashion.""" def __init__(self, window_size, average, variance): """ Args: window_size: size of the window across which the probabilities are calculated average: average within the window variance: variance within the window """ self.N = window_size self.average = average self.variance = variance self.stddev = np.sqrt(variance) def update(self, new, old): """Update the average, variance, and stddev for the RunningStats. Args: new: new value to include in window old: oldest value in window (this must be externally tracked - queue) """ oldavg = self.average newavg = oldavg + (new - old) / self.N self.average = newavg self.variance += (new - old) * (new - newavg + old - oldavg) / (self.N - 1) self.stddev = np.sqrt(self.variance)Methods
def update(self, new, old)-
Update the average, variance, and stddev for the RunningStats.
Args
new- new value to include in window
old- oldest value in window (this must be externally tracked - queue)
Expand source code Browse git
def update(self, new, old): """Update the average, variance, and stddev for the RunningStats. Args: new: new value to include in window old: oldest value in window (this must be externally tracked - queue) """ oldavg = self.average newavg = oldavg + (new - old) / self.N self.average = newavg self.variance += (new - old) * (new - newavg + old - oldavg) / (self.N - 1) self.stddev = np.sqrt(self.variance)