Coverage for  / opt / hostedtoolcache / Python / 3.11.14 / x64 / lib / python3.11 / site-packages / rattlesnake / components / data_collector.py: 79%

356 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-02-27 18:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3A general subprocess that collects data, splits it into measurement frames, then 

4sends it to the environment. 

5 

6Rattlesnake Vibration Control Software 

7Copyright (C) 2021 National Technology & Engineering Solutions of Sandia, LLC 

8(NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

9Government retains certain rights in this software. 

10 

11This program is free software: you can redistribute it and/or modify 

12it under the terms of the GNU General Public License as published by 

13the Free Software Foundation, either version 3 of the License, or 

14(at your option) any later version. 

15 

16This program is distributed in the hope that it will be useful, 

17but WITHOUT ANY WARRANTY; without even the implied warranty of 

18MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19GNU General Public License for more details. 

20 

21You should have received a copy of the GNU General Public License 

22along with this program. If not, see <https://www.gnu.org/licenses/>. 

23""" 

24 

25import copy 

26import multiprocessing as mp 

27from enum import Enum 

28from time import sleep 

29from typing import List 

30 

31import numpy as np 

32import scipy.signal as sig 

33from scipy.fft import rfft 

34 

35from .abstract_message_process import AbstractMessageProcess 

36from .utilities import VerboseMessageQueue, flush_queue, load_python_module, rms_time 

37 

38DEBUG = False 

39 

40if DEBUG: 

41 import pickle 

42 

43 

44class FrameBuffer: 

45 """A class that stores most recently acquired data in a buffer to facilitate overlapping, 

46 triggering, and spectral processing""" 

47 

48 def __init__( 

49 self, 

50 num_channels, 

51 trigger_index, 

52 pretrigger, 

53 positive_slope, 

54 trigger_level, 

55 hysteresis_level, 

56 hysteresis_samples, 

57 samples_per_frame, 

58 maximum_overlap, 

59 manual_accept, 

60 trigger_enabled, 

61 trigger_only_first, 

62 wait_samples, 

63 dtype="float64", 

64 starting_value=np.nan, 

65 buffer_size_frame_multiplier=2, 

66 ): 

67 """Initializes a framebuffer object 

68 

69 Parameters 

70 ---------- 

71 num_channels : int 

72 The number of signals in the buffer 

73 trigger_index : int 

74 The signal index to use for a trigger. Only used if trigger_enabled is True 

75 pretrigger : float 

76 The fraction of the frame sized used for a pretrigger 

77 positive_slope : bool 

78 If True, the trigger is detected with a positive slow. If False, a negative slope. 

79 trigger_level : float 

80 The value of the signal required to activate the trigger 

81 hysteresis_level : float 

82 The value of the signal required to reset the trigger 

83 hysteresis_samples : int 

84 The number of samples required to satisfy the hysteresis level to reset the trigger 

85 samples_per_frame : int 

86 The number of samples per measurement frame 

87 maximum_overlap : float 

88 The fraction of the frame overlapping with the next frame 

89 manual_accept : bool 

90 If True, wait for an acceptance before returning data 

91 trigger_enabled : bool 

92 If True, data will only be returned after a trigger. If False, all data will be 

93 returned in a "free run" 

94 trigger_only_first : bool 

95 If True, only the first frame requires a trigger, and all remaining frames will be 

96 "free run" 

97 wait_samples : int 

98 The number of samples to wait before returning a frame; for example, to wait until a 

99 system is at steady state 

100 dtype : str, optional 

101 A dtype designator in string format for the type of data in the buffer, by default 

102 "float64" 

103 starting_value : float, optional 

104 Initial values in the buffer, by default np.nan 

105 buffer_size_frame_multiplier : int, optional 

106 Buffer size as specified by a multiplier on the frame size, by default 2 

107 """ 

108 self.samples_per_frame = samples_per_frame 

109 self.trigger_index = trigger_index 

110 self.pretrigger_samples = int(pretrigger * samples_per_frame) if trigger_enabled else 0 

111 self.positive_slope = positive_slope 

112 self.trigger_level = trigger_level 

113 self.hysteresis_level = hysteresis_level 

114 self.hysteresis_samples = hysteresis_samples 

115 self.samples_per_frame = samples_per_frame 

116 self.overlap_samples = samples_per_frame - int(maximum_overlap * samples_per_frame) 

117 self.manual_accept = manual_accept 

118 self.waiting_for_accept = False 

119 self._buffer = starting_value * np.ones( 

120 ( 

121 num_channels, 

122 int(np.ceil(buffer_size_frame_multiplier * samples_per_frame)), 

123 ), 

124 dtype=dtype, 

125 ) 

126 self.buffer_size_frame_multiplier = buffer_size_frame_multiplier 

127 self.wait_samples = wait_samples 

128 self.last_trigger = self.overlap_samples - self.wait_samples 

129 self.last_reset = self.overlap_samples + 1 - self.wait_samples 

130 self.trigger_enabled = trigger_enabled 

131 self.trigger_only_first = trigger_only_first 

132 self.first_trigger = True 

133 

134 @property 

135 def buffer_data(self): 

136 """Gets the data currently in the buffer""" 

137 return self._buffer 

138 

139 def add_data(self, data): 

140 """Adds the provided data to the buffer 

141 

142 Parameters 

143 ---------- 

144 data : np.ndarray 

145 Data to add to the buffer 

146 """ 

147 data = np.array(data) 

148 self.last_trigger += data.shape[-1] 

149 self.last_reset += data.shape[-1] 

150 # Make sure the data will fit into the buffer 

151 data = data[..., -self.buffer_data.shape[-1] :] 

152 # Figure out how much we need to roll the buffer 

153 self.buffer_data[:] = np.concatenate( 

154 (self.buffer_data[..., data.shape[-1] :], data), axis=-1 

155 ) 

156 

157 def find_triggers(self): 

158 """Goes through the buffer and finds triggers to denote a measurement frame""" 

159 # print('Finding Triggers, first trigger {:}'.format(self.first_trigger)) 

160 if self.manual_accept and self.waiting_for_accept: 

161 # print('Waiting for Accept') 

162 return [] 

163 if self.trigger_enabled and ( 

164 (self.trigger_only_first and self.first_trigger) or not self.trigger_only_first 

165 ): 

166 # print('Getting trigger based on signal') 

167 trigger_data = self.buffer_data[ 

168 self.trigger_index, 

169 self.pretrigger_samples : self.samples_per_frame + self.pretrigger_samples, 

170 ] 

171 if self.positive_slope: 

172 indices = (trigger_data[:-1] < self.trigger_level) & ( 

173 trigger_data[1:] > self.trigger_level 

174 ) 

175 reset_indices = trigger_data < self.hysteresis_level 

176 else: 

177 indices = (trigger_data[:-1] > self.trigger_level) & ( 

178 trigger_data[1:] < self.trigger_level 

179 ) 

180 reset_indices = trigger_data > self.hysteresis_level 

181 if self.hysteresis_samples > 1: 

182 zeros = ~reset_indices 

183 iszero = np.concatenate(([0], np.equal(zeros, 0).view(np.int8), [0])) 

184 absdiff = np.abs(np.diff(iszero)) 

185 ranges = np.where(absdiff == 1)[0].reshape(-1, 2) 

186 reset_indices = np.array( 

187 [r[-1] - 1 for r in ranges if r[1] - r[0] > self.hysteresis_samples - 2] 

188 ) 

189 triggers = list( 

190 self.buffer_size_frame_multiplier * self.samples_per_frame 

191 - (np.where(indices)[0] + 1 + self.pretrigger_samples) 

192 ) 

193 resets = np.concatenate( 

194 [ 

195 [self.last_reset], 

196 self.buffer_size_frame_multiplier * self.samples_per_frame 

197 - reset_indices 

198 - self.pretrigger_samples, 

199 ] 

200 ) 

201 

202 final_triggers = [] 

203 

204 while len(triggers) > 0: 

205 trigger = triggers.pop(0) 

206 # Check to see if the trigger is far enough away from the last one 

207 if self.last_trigger - trigger < self.overlap_samples: 

208 continue 

209 # Check to see if there has been a reset since the last trigger 

210 if not np.any((resets < self.last_trigger) & (resets > trigger)): 

211 continue 

212 if self.trigger_only_first and not self.first_trigger: 

213 continue 

214 final_triggers.append(trigger) 

215 self.first_trigger = False 

216 self.last_trigger = trigger 

217 

218 self.last_reset = resets.min() 

219 

220 if self.manual_accept and len(final_triggers) > 0: 

221 self.last_trigger = self.overlap_samples 

222 self.last_reset = self.overlap_samples - 1 

223 self.waiting_for_accept = True 

224 return [final_triggers[0]] 

225 else: 

226 return final_triggers 

227 else: 

228 # Get the next triggers that are in the data 

229 # print('Getting trigger based on spacing') 

230 last_trigger_rectified = ( 

231 self.buffer_size_frame_multiplier * self.samples_per_frame - self.last_trigger 

232 ) 

233 triggers_available = int( 

234 (self.samples_per_frame - last_trigger_rectified) / self.overlap_samples 

235 ) 

236 final_triggers = [ 

237 self.last_trigger - (i + 1) * self.overlap_samples 

238 for i in range(triggers_available) 

239 ] 

240 if len(final_triggers) > 0: 

241 self.last_trigger = final_triggers[-1] 

242 return final_triggers 

243 

244 def reset_trigger(self): 

245 """Resets the last trigger in the buffer""" 

246 self.last_trigger = self.overlap_samples - self.wait_samples 

247 self.last_reset = self.overlap_samples - 1 - self.wait_samples 

248 

249 def accept(self): 

250 """Manually accept the last frame""" 

251 self.last_trigger = self.overlap_samples 

252 self.last_reset = self.overlap_samples - 1 

253 self.waiting_for_accept = False 

254 

255 def add_data_get_frame(self, data): 

256 """Add data and get the next measurement frame""" 

257 self.add_data(data) 

258 # print('Last Trigger: {:}'.format(self.last_trigger)) 

259 triggers = self.find_triggers() 

260 # print('Triggers: {:}'.format(triggers)) 

261 frame_indices = ( 

262 self.buffer_size_frame_multiplier * self.samples_per_frame 

263 - np.array(triggers)[:, np.newaxis] 

264 + np.arange(self.samples_per_frame) 

265 - self.pretrigger_samples 

266 ).astype(int) 

267 return np.moveaxis(self.buffer_data[:, frame_indices], 1, 0) 

268 

269 def __getitem__(self, key): 

270 return self._buffer[key] 

271 

272 def __setitem__(self, key, val): 

273 self._buffer[key] = val 

274 

275 

276class KurtosisBuffer: 

277 """A buffer that computes a running kurtosis value""" 

278 

279 def __init__(self, n_channels: int, averages: int = 100) -> None: 

280 # this will keep track of our place in the buffers (alternative to using np.roll, this is 

281 # more efficient since we don't actually care what order the buffer is in) 

282 self.idx = 0 

283 self.averages = averages # number of frames to keep for kurtosis calculation 

284 self.g0 = np.zeros((n_channels, averages)) # number of samples per frame 

285 self.g1 = np.zeros((n_channels, averages)) # sum of samples per frame 

286 self.g2 = np.zeros( 

287 (n_channels, averages) 

288 ) # sum of (second moments * samples/frame) per frame 

289 self.g3 = np.zeros( 

290 (n_channels, averages) 

291 ) # sum of (third moments * samples/frame) per frame 

292 self.g4 = np.zeros( 

293 (n_channels, averages) 

294 ) # sum of (fourth moments * samples/frame) per frame 

295 

296 def clear(self) -> None: 

297 """Clears the kurtosis buffer""" 

298 self.idx = 0 

299 self.g0[:] = 0.0 

300 self.g1[:] = 0.0 

301 self.g2[:] = 0.0 

302 self.g3[:] = 0.0 

303 self.g4[:] = 0.0 

304 

305 def add_data(self, arr: np.ndarray, axis=-1) -> None: 

306 """Adds data to the kurtosis buffer 

307 

308 Choi M, Sweetman B. Efficient Calculation of Statistical Moments for Structural Health 

309 Monitoring. Structural Health Monitoring. 2009;9(1):13-24. doi:10.1177/1475921709341014 

310 (https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance) 

311 

312 Implements a method of moments approach used for kurtosis calculation. Gamma values 

313 are calculated using raw moments and sample length. These gamma values can be combined 

314 additively as new data appears, at which point the raw moments can be backed out. Using 

315 known relationships between raw moments and central moments, we can calculate kurtosis. 

316 

317 Parameters 

318 ---------- 

319 arr : np.ndarray 

320 A numpy array containing the data to add 

321 axis : int, optional 

322 The axis across which the buffer will be computed, by default -1 

323 """ 

324 

325 # Calculate gamma values of new data (raw moment * sample length, equivalent to sum of 

326 # moments if time delta is constant) 

327 self.g0[:, self.idx] = arr.shape[ 

328 axis 

329 ] # gamma_0 is taken to be number of points (assuming constant time delta) 

330 self.g1[:, self.idx] = np.sum(arr, axis=axis) 

331 self.g2[:, self.idx] = np.sum(arr**2, axis=axis) 

332 self.g3[:, self.idx] = np.sum(arr**3, axis=axis) 

333 self.g4[:, self.idx] = np.sum(arr**4, axis=axis) 

334 

335 # increment our index (wrap around if buffer is full) 

336 self.idx = (self.idx + 1) % self.averages 

337 

338 def get_kurtosis(self, fisher=False) -> None: 

339 """Gets the current kurtosis values 

340 

341 Parameters 

342 ---------- 

343 fisher : bool, optional 

344 If True, the fisher kurtosis (excess kurtosis) is presented. If False, the regular 

345 kurtosis is presented (e.g. 3 for a normal distribution), by default False 

346 

347 Returns 

348 ------- 

349 kurtosis values for each channel 

350 """ 

351 # sum the gamma values that are in the buffer 

352 g0 = np.sum(self.g0, axis=-1) 

353 g1 = np.sum(self.g1, axis=-1) 

354 g2 = np.sum(self.g2, axis=-1) 

355 g3 = np.sum(self.g3, axis=-1) 

356 g4 = np.sum(self.g4, axis=-1) 

357 

358 # back out raw moments from gamma values 

359 m1 = g1 / g0 

360 m2 = g2 / g0 

361 m3 = g3 / g0 

362 m4 = g4 / g0 

363 

364 # compute central moments from raw moments 

365 c2 = m2 - (m1**2) 

366 # c3 = m3 - (3*m1*m2) + (2*(m1**3)) # not needed for kurtosis 

367 c4 = m4 - (4 * m1 * m3) + (6 * (m1**2) * m2) - (3 * (m1**4)) 

368 

369 # compute kurtosis 

370 k = c4 / (c2**2) 

371 return k - 3 if fisher else k 

372 

373 

374class DataCollectorCommands(Enum): 

375 """Commands that the Random Data Collector Process can accept""" 

376 

377 INITIALIZE_COLLECTOR = 1 

378 FORCE_INITIALIZE_COLLECTOR = 2 

379 ACQUIRE = 3 

380 STOP = 4 

381 ACCEPT = 5 

382 SET_TEST_LEVEL = 6 

383 ACCEPTED = 7 

384 SHUTDOWN_ACHIEVED = 8 

385 CLEAR_KURTOSIS_BUFFER = 9 

386 

387 

388class AcquisitionType(Enum): 

389 """Enumeration of different triggering strategies""" 

390 

391 FREE_RUN = 0 

392 TRIGGER_EVERY_FRAME = 1 

393 TRIGGER_FIRST_FRAME = 2 

394 

395 

396class Acceptance(Enum): 

397 """Enumeration of different acceptance strategies""" 

398 

399 MANUAL = 0 

400 AUTOMATIC = 1 

401 

402 

403class TriggerSlope(Enum): 

404 """Enumeration of valid trigger slopes""" 

405 

406 POSITIVE = 0 

407 NEGATIVE = 1 

408 

409 

410class Window(Enum): 

411 """Enumeration of valid window functions""" 

412 

413 RECTANGLE = 0 

414 HANN = 1 

415 HAMMING = 2 

416 FLATTOP = 3 

417 TUKEY = 4 

418 BLACKMANHARRIS = 5 

419 EXPONENTIAL = 6 

420 EXPONENTIAL_FORCE = 7 

421 

422 

423class CollectorMetadata: 

424 """Metadata associated with the data collector""" 

425 

426 def __init__( 

427 self, 

428 num_channels, 

429 response_channel_indices, 

430 reference_channel_indices, 

431 acquisition_type, 

432 acceptance, 

433 acceptance_function, 

434 overlap_fraction, 

435 trigger_channel_index, 

436 trigger_slope, 

437 trigger_level, 

438 trigger_hysteresis, 

439 trigger_hysteresis_samples, 

440 pretrigger_fraction, 

441 frame_size, 

442 window, 

443 window_parameter_1=0, 

444 window_parameter_2=0, 

445 window_parameter_3=0, 

446 wait_samples=0, 

447 kurtosis_buffer_length=None, 

448 response_transformation_matrix=None, 

449 reference_transformation_matrix=None, 

450 ): 

451 """Initializes data collector metadata 

452 

453 Parameters 

454 ---------- 

455 num_channels : int 

456 The number of channels in the data acquisition 

457 response_channel_indices : np.ndarray 

458 Indices associated with control or response channels 

459 reference_channel_indices : np.ndarray 

460 Indices associated with drive or reference channels 

461 acquisition_type : AcquisitionType 

462 The type of acquisition used by the collector 

463 acceptance : AcceptanceType 

464 The type of frame acceptance used by the collector 

465 acceptance_function : tuple 

466 A tuple containing a path to a Python module and a function name in that module 

467 that is used to automatically determine if a frame should be accepted 

468 overlap_fraction : float 

469 The fraction of the frame to overlap in the data collection 

470 trigger_channel_index : int 

471 The index of the channel used as the trigger 

472 trigger_slope : TriggerSlope 

473 The slope of the trigger 

474 trigger_level : float 

475 The trigger level 

476 trigger_hysteresis : float 

477 The level below which the trigger must return before another trigger can be obtained 

478 trigger_hysteresis_samples : int 

479 The number of samples the trigger must be below the hysteresis level before another 

480 trigger can be obtained 

481 pretrigger_fraction : float 

482 The fraction of the frame used as a pretrigger 

483 frame_size : int 

484 The number of samples in a measurement frame 

485 window : Window 

486 The window function used by the collector 

487 window_parameter_1 : float, optional 

488 Optional parameters required by the window function, by default 0 

489 window_parameter_2 : float, optional 

490 Optional parameters required by the window function, by default 0 

491 window_parameter_3 : float, optional 

492 Optional parameters required by the window function, by default 0 

493 wait_samples : int, optional 

494 Number of samples to wait before returning a frame, by default 0 

495 kurtosis_buffer_length : int, optional 

496 The number of samples in the running kurtosis calculation. 

497 response_transformation_matrix : np.ndarray, optional 

498 A transformation applied to the response channels, by default None 

499 reference_transformation_matrix : np.ndarray, optional 

500 A transformation applied to the reference channels, by default None 

501 """ 

502 self.num_channels = num_channels 

503 self.response_channel_indices = response_channel_indices 

504 self.reference_channel_indices = reference_channel_indices 

505 self.acquisition_type = acquisition_type 

506 self.acceptance = acceptance 

507 self.acceptance_function = acceptance_function 

508 self.overlap_fraction = overlap_fraction 

509 self.trigger_channel_index = trigger_channel_index 

510 self.trigger_slope = trigger_slope 

511 self.trigger_level = trigger_level 

512 self.trigger_hysteresis = trigger_hysteresis 

513 self.trigger_hysteresis_samples = trigger_hysteresis_samples 

514 self.pretrigger_fraction = pretrigger_fraction 

515 self.frame_size = frame_size 

516 self.window = window 

517 self.window_parameter_1 = window_parameter_1 

518 self.window_parameter_2 = window_parameter_2 

519 self.window_parameter_3 = window_parameter_3 

520 self.response_transformation_matrix = response_transformation_matrix 

521 self.reference_transformation_matrix = reference_transformation_matrix 

522 self.wait_samples = wait_samples 

523 self.kurtosis_buffer_length = kurtosis_buffer_length 

524 

525 def __eq__(self, other): 

526 try: 

527 return np.all( 

528 [np.all(value == other.__dict__[field]) for field, value in self.__dict__.items()] 

529 ) 

530 except (AttributeError, KeyError): 

531 return False 

532 

533 

534class DataCollectorProcess(AbstractMessageProcess): 

535 """Class that takes data from the data_in_queue and distributes to the environment 

536 

537 This class keeps track of the test level used when acquiring data so the 

538 data can be scaled back to full level for control. It will also skip 

539 frames that are acquired while the system is ramping.""" 

540 

541 def __init__( 

542 self, 

543 process_name: str, 

544 command_queue: VerboseMessageQueue, 

545 data_in_queue: mp.queues.Queue, 

546 data_out_queues: List[mp.queues.Queue], 

547 environment_command_queue: VerboseMessageQueue, 

548 log_file_queue: mp.queues.Queue, 

549 gui_update_queue: mp.queues.Queue, 

550 environment_name, 

551 ): 

552 """ 

553 Constructs the data collector class 

554 

555 Parameters 

556 ---------- 

557 process_name : str 

558 A name to assign the process, primarily for logging purposes. 

559 queues : RandomEnvironmentQueues 

560 A list of Random Environment queues for communcation with other parts 

561 of the environment and the controller 

562 environment_name : str 

563 The name of the environment that this process is generating signals for. 

564 

565 """ 

566 super().__init__(process_name, log_file_queue, command_queue, gui_update_queue) 

567 self.map_command(DataCollectorCommands.INITIALIZE_COLLECTOR, self.initialize_collector) 

568 self.map_command( 

569 DataCollectorCommands.FORCE_INITIALIZE_COLLECTOR, 

570 self.force_initialize_collector, 

571 ) 

572 self.map_command(DataCollectorCommands.ACQUIRE, self.acquire) 

573 self.map_command(DataCollectorCommands.STOP, self.stop) 

574 self.map_command(DataCollectorCommands.ACCEPT, self.accept) 

575 self.map_command(DataCollectorCommands.SET_TEST_LEVEL, self.set_test_level) 

576 self.map_command(DataCollectorCommands.CLEAR_KURTOSIS_BUFFER, self.clear_kurtosis_buffer) 

577 self.environment_command_queue = environment_command_queue 

578 self.environment_name = environment_name 

579 self.collector_metadata = None 

580 self.frame_buffer = None 

581 self.kurtosis_buffer = None 

582 self.reference_window = None 

583 self.response_window = None 

584 self.window_correction_factor = None 

585 self.acceptance_function = None 

586 self.skip_frames = 0 

587 self.test_level = None 

588 self.data_in_queue = data_in_queue 

589 self.data_out_queues = data_out_queues 

590 self.last_frame = None 

591 self.window_correction = None 

592 if DEBUG: 

593 self.received_data_index = 0 

594 

595 def initialize_collector(self, data: CollectorMetadata): 

596 """Initializes the collector with the provided metadata 

597 

598 Parameters 

599 ---------- 

600 data : CollectorMetadata 

601 An object containing metadata to define the collector 

602 """ 

603 if not self.collector_metadata == data: 

604 self.force_initialize_collector(data) 

605 

606 def force_initialize_collector(self, data: CollectorMetadata): 

607 """Initializes the collector with the provided metadata even if the 

608 metadata is already equivalent. 

609 

610 Parameters 

611 ---------- 

612 data : CollectorMetadata 

613 An object containing metadata to define the collector 

614 """ 

615 # Flush the outputs to make sure that there's nothing hanging out on 

616 # the queue when we start up. 

617 for queue in self.data_out_queues: 

618 flush_queue(queue) 

619 self.collector_metadata = data 

620 self.frame_buffer = FrameBuffer( 

621 self.collector_metadata.num_channels, 

622 self.collector_metadata.trigger_channel_index, 

623 self.collector_metadata.pretrigger_fraction, 

624 self.collector_metadata.trigger_slope == TriggerSlope.POSITIVE, 

625 self.collector_metadata.trigger_level, 

626 self.collector_metadata.trigger_hysteresis, 

627 self.collector_metadata.trigger_hysteresis_samples, 

628 self.collector_metadata.frame_size, 

629 self.collector_metadata.overlap_fraction, 

630 self.collector_metadata.acceptance == Acceptance.MANUAL, 

631 self.collector_metadata.acquisition_type != AcquisitionType.FREE_RUN, 

632 self.collector_metadata.acquisition_type == AcquisitionType.TRIGGER_FIRST_FRAME, 

633 self.collector_metadata.wait_samples, 

634 ) 

635 if self.collector_metadata.kurtosis_buffer_length is not None: 

636 self.kurtosis_buffer = KurtosisBuffer( 

637 self.collector_metadata.num_channels, 

638 self.collector_metadata.kurtosis_buffer_length, 

639 ) 

640 if self.collector_metadata.acceptance_function is None: 

641 self.acceptance_function = lambda x: True 

642 else: 

643 module = load_python_module(self.collector_metadata.acceptance_function[0]) 

644 self.acceptance_function = getattr( 

645 module, self.collector_metadata.acceptance_function[1] 

646 ) 

647 self.log("Loaded acceptance function") 

648 if self.collector_metadata.window == Window.RECTANGLE: 

649 self.reference_window = 1 

650 self.response_window = 1 

651 elif self.collector_metadata.window == Window.HANN: 

652 self.reference_window = sig.get_window("hann", self.collector_metadata.frame_size) 

653 self.response_window = self.reference_window.copy() 

654 elif self.collector_metadata.window == Window.HAMMING: 

655 self.reference_window = sig.get_window("hamming", self.collector_metadata.frame_size) 

656 self.response_window = self.reference_window.copy() 

657 elif self.collector_metadata.window == Window.FLATTOP: 

658 self.reference_window = sig.get_window("flattop", self.collector_metadata.frame_size) 

659 self.response_window = self.reference_window.copy() 

660 elif self.collector_metadata.window == Window.TUKEY: 

661 self.reference_window = sig.get_window( 

662 ("tukey", self.collector_metadata.window_parameter_1), 

663 self.collector_metadata.frame_size, 

664 ) 

665 self.response_window = self.reference_window.copy() 

666 elif self.collector_metadata.window == Window.BLACKMANHARRIS: 

667 self.reference_window = sig.get_window( 

668 "blackmanharris", self.collector_metadata.frame_size 

669 ) 

670 self.response_window = self.reference_window.copy() 

671 elif self.collector_metadata.window == Window.EXPONENTIAL: 

672 self.reference_window = sig.get_window( 

673 ( 

674 "exponential", 

675 self.collector_metadata.window_parameter_1, 

676 self.collector_metadata.window_parameter_2, 

677 ), 

678 self.collector_metadata.frame_size, 

679 ) 

680 self.response_window = self.reference_window.copy() 

681 elif self.collector_metadata.window == Window.EXPONENTIAL_FORCE: 

682 self.reference_window = sig.get_window( 

683 ( 

684 "exponential", 

685 self.collector_metadata.window_parameter_2, 

686 self.collector_metadata.window_parameter_3, 

687 ), 

688 self.collector_metadata.frame_size, 

689 ) 

690 self.response_window = self.reference_window.copy() 

691 non_pulse_samples = ( 

692 np.arange(self.collector_metadata.frame_size) + 1 

693 ) / self.collector_metadata.frame_size > self.collector_metadata.window_parameter_1 

694 self.reference_window[non_pulse_samples] = 0 

695 else: 

696 raise ValueError("Invalid Window Type") 

697 self.window_correction = np.sqrt(1 / np.mean(self.response_window**2)) 

698 if DEBUG: 

699 with open("debug_data/collector_metadata.pkl", "wb") as f: 

700 pickle.dump(self.collector_metadata, f) 

701 with open("debug_data/framebuffer.pkl", "wb") as f: 

702 pickle.dump(self.frame_buffer, f) 

703 self.received_data_index = 0 

704 

705 def acquire(self, data): # pylint: disable=unused-argument 

706 """Acquires data from the data_in_queue and sends to the environment 

707 

708 This function will take data and scale it by the test level, or skip 

709 sending the data if the test level is currently changing. It will 

710 also apply the transformation matrices if they are defined. 

711 

712 It will stop itself if the last data is acquired. 

713 

714 Parameters 

715 ---------- 

716 data : Ignored 

717 Unused argument required due to the expectation that functions called 

718 by the RandomDataCollector.run function will have one argument 

719 accepting any data passed along with the instruction. 

720 """ 

721 try: 

722 acquisition_data, last_data = self.data_in_queue.get(timeout=10) 

723 self.log(f"Acquired Data with shape {acquisition_data.shape} and Last Data {last_data}") 

724 self.log(f"Data Average RMS: {rms_time(acquisition_data):0.4f}") 

725 except mp.queues.Empty: 

726 # Keep running until stopped 

727 # self.log('No Incoming Data!') 

728 self.command_queue.put(self.process_name, (DataCollectorCommands.ACQUIRE, None)) 

729 return 

730 # Add data to buffer 

731 self.log("Putting Data to Buffer") 

732 output_frames = self.frame_buffer.add_data_get_frame(acquisition_data) 

733 if DEBUG: 

734 np.save( 

735 f"debug_data/acquisition_data_{self.received_data_index:05d}.npy", 

736 acquisition_data, 

737 ) 

738 np.save( 

739 f"debug_data/output_frames_{self.received_data_index:05d}.npy", 

740 output_frames, 

741 ) 

742 with open( 

743 f"debug_data/framebuffer_{self.received_data_index:05d}.pkl", 

744 "wb", 

745 ) as f: 

746 pickle.dump(self.frame_buffer, f) 

747 self.received_data_index += 1 

748 if output_frames.shape[0] > 0: 

749 self.log(f"Measurement Frames Received ({output_frames.shape[0]})") 

750 for frame in output_frames: 

751 if self.skip_frames > 0: 

752 self.skip_frames -= 1 

753 self.log(f"Skipped Frame, {self.skip_frames} left to skip") 

754 # Reset the buffer. It isn't clear if this is needed, and in the current 

755 # implementation it breaks things... 

756 # self.frame_buffer.reset_trigger() 

757 continue 

758 frame = np.copy(frame) 

759 accepted = self.acceptance_function(frame) 

760 response_frame = frame[self.collector_metadata.response_channel_indices] 

761 reference_frame = frame[self.collector_metadata.reference_channel_indices] 

762 if self.collector_metadata.response_transformation_matrix is not None: 

763 response_frame = ( 

764 self.collector_metadata.response_transformation_matrix @ response_frame 

765 ) 

766 if self.collector_metadata.reference_transformation_matrix is not None: 

767 reference_frame = ( 

768 self.collector_metadata.reference_transformation_matrix @ reference_frame 

769 ) 

770 self.log( 

771 f"Received output from framebuffer with RMS: \n " 

772 f"{rms_time(reference_frame, axis=-1)}" 

773 ) 

774 # Apply window functions 

775 response_frame *= self.response_window / self.test_level 

776 reference_frame *= self.reference_window / self.test_level 

777 if accepted and not self.frame_buffer.manual_accept: 

778 self.gui_update_queue.put( 

779 (self.environment_name, ("time_frame", (frame, True))) 

780 ) 

781 self.log("Sending data") 

782 if self.collector_metadata.kurtosis_buffer_length is not None: 

783 self.kurtosis_buffer.add_data(frame) 

784 self.gui_update_queue.put( 

785 ( 

786 self.environment_name, 

787 ("kurtosis", self.kurtosis_buffer.get_kurtosis()), 

788 ) 

789 ) 

790 # Separate into response and reference 

791 response_fft = rfft(response_frame, axis=-1) * self.window_correction 

792 reference_fft = rfft(reference_frame, axis=-1) * self.window_correction 

793 for queue in self.data_out_queues: 

794 queue.put(copy.deepcopy((response_fft, reference_fft))) 

795 self.log("Sent Data") 

796 elif self.frame_buffer.manual_accept: 

797 self.last_frame = frame 

798 self.gui_update_queue.put( 

799 (self.environment_name, ("time_frame", (frame, False))) 

800 ) 

801 else: 

802 self.gui_update_queue.put( 

803 (self.environment_name, ("time_frame", (frame, False))) 

804 ) 

805 # Keep running until stopped 

806 if not last_data: 

807 self.command_queue.put(self.process_name, (DataCollectorCommands.ACQUIRE, None)) 

808 else: 

809 self.stop(None) 

810 

811 def accept(self, keep_frame): 

812 """Accepts or rejects the data 

813 

814 Parameters 

815 ---------- 

816 keep_frame : bool 

817 If True, the frame will be accepted. If False, it will be rejected. 

818 """ 

819 self.log(f"Received Accept Signal {keep_frame}") 

820 self.frame_buffer.accept() 

821 if keep_frame: 

822 self.log("Sending data manually") 

823 self.gui_update_queue.put( 

824 (self.environment_name, ("time_frame", (self.last_frame, True))) 

825 ) 

826 frame_fft = rfft(self.last_frame, axis=-1) * self.window_correction 

827 # Separate into response and reference 

828 reference_fft = frame_fft[self.collector_metadata.reference_channel_indices] 

829 response_fft = frame_fft[self.collector_metadata.response_channel_indices] 

830 for queue in self.data_out_queues: 

831 queue.put(copy.deepcopy((response_fft, reference_fft))) 

832 self.log("Sent Data") 

833 self.last_frame = None 

834 self.environment_command_queue.put( 

835 self.process_name, (DataCollectorCommands.ACCEPTED, keep_frame) 

836 ) 

837 

838 def stop(self, data): # pylint: disable=unused-argument 

839 """Stops acquiring data from the data_in_queue and flushes queues. 

840 

841 Parameters 

842 ---------- 

843 data : Ignored 

844 Unused argument required due to the expectation that functions called 

845 by the RandomDataCollector.run function will have one argument 

846 accepting any data passed along with the instruction. 

847 """ 

848 sleep(0.05) 

849 self.log("Stopping Data Collection") 

850 for queue in self.data_out_queues: 

851 flush_queue(queue) 

852 self.command_queue.flush(self.process_name) 

853 self.frame_buffer.reset_trigger() 

854 self.environment_command_queue.put( 

855 self.process_name, (DataCollectorCommands.SHUTDOWN_ACHIEVED, None) 

856 ) 

857 

858 def set_test_level(self, data): 

859 """Updates the value of the current test level due and sets the number 

860 of frames to skip. 

861 

862 Parameters 

863 ---------- 

864 data : tuple 

865 Tuple containing the number of frames to skip and the new test 

866 level 

867 

868 """ 

869 self.skip_frames, self.test_level = data 

870 self.log( 

871 f"Setting Test Level to {self.test_level}, skipping next {self.skip_frames} frames" 

872 ) 

873 

874 def clear_kurtosis_buffer(self, data): # pylint: disable=unused-argument 

875 """Clears the kurtosis buffer 

876 

877 Parameters 

878 ---------- 

879 data : ignored 

880 The argument is not used, but is required by the calling signature of functions that 

881 get called via the command map. 

882 """ 

883 if self.kurtosis_buffer is not None: 

884 self.kurtosis_buffer.clear() 

885 

886 

887def data_collector_process( 

888 environment_name: str, 

889 command_queue: VerboseMessageQueue, 

890 data_in_queue: mp.queues.Queue, 

891 data_out_queues: List[mp.queues.Queue], 

892 environment_command_queue: VerboseMessageQueue, 

893 log_file_queue: mp.queues.Queue, 

894 gui_update_queue: mp.queues.Queue, 

895 process_name: str = None, 

896): 

897 """Random vibration data collector process function called by multiprocessing 

898 

899 This function defines the Random Vibration Data Collector process that 

900 gets run by the multiprocessing module when it creates a new process. It 

901 creates a ModalDataCollectorProcess object and runs it. 

902 

903 Parameters 

904 ---------- 

905 environment_name : str : 

906 Name of the environment associated with this signal generation process 

907 """ 

908 

909 data_collector_instance = DataCollectorProcess( 

910 environment_name + " Data Collector" if process_name is None else process_name, 

911 command_queue, 

912 data_in_queue, 

913 data_out_queues, 

914 environment_command_queue, 

915 log_file_queue, 

916 gui_update_queue, 

917 environment_name, 

918 ) 

919 

920 data_collector_instance.run()