Coverage for src / sdynpy / fileio / sdynpy_rattlesnake.py: 6%
787 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-17 21:21 +0000
1# -*- coding: utf-8 -*-
2"""
3Load in time data from Rattlesnake runs
4"""
5# Copyright 2022 National Technology & Engineering Solutions of Sandia,
6# LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
7# Government retains certain rights in this software.
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
22from __future__ import annotations
23import netCDF4 as nc4
24import numpy as np
25from ..core.sdynpy_coordinate import coordinate_array, outer_product, CoordinateArray, _string_map
26from ..core.sdynpy_data import (
27 data_array,
28 FunctionTypes,
29 time_history_array,
30 TimeHistoryArray,
31 power_spectral_density_array,
32 PowerSpectralDensityArray,
33 transfer_function_array,
34 TransferFunctionArray,
35 coherence_array,
36 CoherenceArray,
37 multiple_coherence_array,
38 MultipleCoherenceArray,
39)
40from ..core.sdynpy_system import System
41from ..core.sdynpy_matrix import matrix
42import pandas as pd
43import sys
44import openpyxl as opxl
45import os
46import warnings
def read_rattlesnake_output(
    file,
    coordinate_override_column=None,
    read_only_indices=None,
    read_variable="time_data",
    abscissa_start=None,
    abscissa_stop=None,
    downsample=None,
):
    """
    Reads in an nc4 Rattlesnake data file and returns the time history array as well
    as the channel table

    Parameters
    ----------
    file : str or netCDF4.Dataset
        Path to the nc4 file to read, or an already-open ``netCDF4.Dataset``.
    coordinate_override_column : str, optional
        Name of a channel-table column whose string values are parsed as
        coordinates. When not specified, coordinates are assembled from the
        ``node_number`` and ``node_direction`` channel-table columns.
    read_only_indices : slice or iterable, optional
        A valid indexing operation to select which channel indices to read
    read_variable : str, optional
        The time variable from the Rattlesnake file to read. These will
        generally be time_data, time_data_1, time_data_2, etc. depending on
        how many streams exist in the file. The default is 'time_data'.
    abscissa_start : float, optional
        Data will not be extracted for abscissa values less than this value
    abscissa_stop : float, optional
        Data will not be extracted for abscissa values greater than this value
    downsample : int, optional
        A step size to use to downsample the dataset when reading

    Returns
    -------
    data_array : TimeHistoryArray
        Time history data in the Rattlesnake output file
    channel_table : DataFrame
        Pandas Dataframe containing the channel table information (all
        channels, regardless of ``read_only_indices``)

    Raises
    ------
    TypeError
        If ``file`` is neither a path string nor a ``netCDF4.Dataset``.
    """
    if isinstance(file, str):
        ds = nc4.Dataset(file, "r")
    elif isinstance(file, nc4.Dataset):
        ds = file
    else:
        # Previously an unsupported argument left ``ds`` unbound and the
        # function failed later with a confusing NameError; fail fast instead.
        raise TypeError(
            f"`file` must be a str path or netCDF4.Dataset, not {type(file).__name__}"
        )
    try:
        if read_only_indices is None:
            read_only_indices = slice(None)
        # Convert abscissa limits (in seconds) to integer sample indices
        start_index = (
            None if abscissa_start is None else int(np.ceil(abscissa_start * ds.sample_rate))
        )
        stop_index = (
            None if abscissa_stop is None else int(np.ceil(abscissa_stop * ds.sample_rate))
        )
        abscissa_slice = slice(start_index, stop_index, downsample)
        output_data = np.array(ds[read_variable][:, abscissa_slice][read_only_indices])
        # Reconstruct the time axis from sample indices and the sample rate
        abscissa = (
            np.arange(
                0 if start_index is None else start_index,
                ds[read_variable].shape[-1] if stop_index is None else stop_index,
                1 if downsample is None else downsample,
            )
            / ds.sample_rate
        )
        if coordinate_override_column is None:
            # Node numbers may contain non-numeric characters; keep digits only
            nodes = [
                int("".join(char for char in node if char in "0123456789"))
                for node in ds["channels"]["node_number"][...][read_only_indices]
            ]
            directions = np.array(
                ds["channels"]["node_direction"][...][read_only_indices], dtype="<U3"
            )
            coordinates = coordinate_array(nodes, directions)[:, np.newaxis]
        else:
            coordinates = coordinate_array(
                string_array=ds["channels"][coordinate_override_column][read_only_indices]
            )[:, np.newaxis]
        # Channel table is built from the full channel group, not the subset
        array = {
            name: np.array(variable[:]) for name, variable in ds["channels"].variables.items()
        }
        channel_table = pd.DataFrame(array)
        # Comment fields mirror the Rattlesnake channel-table metadata so the
        # provenance of each channel survives in the TimeHistoryArray
        comment1 = np.char.add(
            np.char.add(
                np.array(ds["channels"]["channel_type"][...][read_only_indices], dtype="<U80"),
                np.array(" :: "),
            ),
            np.array(ds["channels"]["unit"][...][read_only_indices], dtype="<U80"),
        )
        comment2 = np.char.add(
            np.char.add(
                np.array(ds["channels"]["physical_device"][...][read_only_indices], dtype="<U80"),
                np.array(" :: "),
            ),
            np.array(ds["channels"]["physical_channel"][...][read_only_indices], dtype="<U80"),
        )
        comment3 = np.char.add(
            np.char.add(
                np.array(ds["channels"]["feedback_device"][...][read_only_indices], dtype="<U80"),
                np.array(" :: "),
            ),
            np.array(ds["channels"]["feedback_channel"][...][read_only_indices], dtype="<U80"),
        )
        comment4 = np.array(ds["channels"]["comment"][...][read_only_indices], dtype="<U80")
        comment5 = np.array(ds["channels"]["make"][...][read_only_indices], dtype="<U80")
        for key in ("model", "serial_number", "triax_dof"):
            comment5 = np.char.add(comment5, np.array(" "))
            comment5 = np.char.add(
                comment5, np.array(ds["channels"][key][...][read_only_indices], dtype="<U80")
            )
        time_data = data_array(
            FunctionTypes.TIME_RESPONSE,
            abscissa,
            output_data,
            coordinates,
            comment1,
            comment2,
            comment3,
            comment4,
            comment5,
        )
        return time_data, channel_table
    finally:
        # Only close datasets this function opened; caller-supplied Dataset
        # objects remain open for the caller to manage.  The try/finally
        # guarantees the handle is released even if reading fails.
        if isinstance(file, str):
            ds.close()
def read_system_id_data(file):
    """Read system-identification data from a Rattlesnake npz file.

    Parses the FRF matrix, response CPSD, reference CPSD, response noise
    CPSD, reference noise CPSD, and multiple coherence from a numpy ``.npz``
    file produced by the Rattlesnake system-identification routine.  When a
    response or reference transformation matrix is present in the file
    (i.e. the stored array is not ``NaN``), the corresponding degrees of
    freedom are labeled with sequential virtual channel numbers rather than
    the physical channel coordinates; the spectral data itself is read as
    stored.

    Parameters
    ----------
    file : str or numpy.lib.npyio.NpzFile
        Path to the Rattlesnake ``.npz`` system-ID file, or an already-loaded
        ``NpzFile`` object.

    Returns
    -------
    frfs : TransferFunctionArray
        Frequency response function matrix from the system ID.
    response_cpsd : PowerSpectralDensityArray
        Response cross-power spectral density array.
    reference_cpsd : PowerSpectralDensityArray
        Reference cross-power spectral density array.
    response_noise_cpsd : PowerSpectralDensityArray
        Response noise cross-power spectral density array.
    reference_noise_cpsd : PowerSpectralDensityArray
        Reference noise cross-power spectral density array.
    coherence : MultipleCoherenceArray
        Multiple coherence array for each response channel.
    """
    if isinstance(file, str):
        file = np.load(file)
    df = file["sysid_frequency_spacing"]

    def _dofs(transform_key, index_key):
        # Build the coordinate labels for one side (response or reference)
        # of the system ID.  When no transformation matrix was used,
        # Rattlesnake stores a scalar NaN; otherwise a 2D matrix is stored.
        # ``np.isnan(...).all()`` covers both -- a bare ``if np.isnan(...)``
        # raises "truth value of an array is ambiguous" on the 2D case.
        if np.isnan(file[transform_key]).all():
            try:
                # Preferred: coordinates from the stored channel table
                return coordinate_array(
                    [int(v) for v in file["channel_node_number"][file[index_key]]],
                    file["channel_node_direction"][file[index_key]],
                )
            except Exception:
                # Channel table missing/unparseable: number channels 1..N
                return coordinate_array(file[index_key] + 1, 0)
        # Transformed (virtual) channels get sequential coordinates
        return coordinate_array(np.arange(file[transform_key].shape[0]) + 1, 0)

    response_dofs = _dofs("response_transformation_matrix", "response_indices")
    reference_dofs = _dofs("reference_transformation_matrix", "reference_indices")

    def _spectral(key, function_type, dofs):
        # Rattlesnake stores frequency as the first axis; SDynPy data arrays
        # expect it last, so move it before wrapping.
        ordinate = np.moveaxis(file[key], 0, -1)
        return data_array(
            function_type,
            df * np.arange(ordinate.shape[-1]),
            ordinate,
            dofs,
        )

    frfs = _spectral(
        "frf_data",
        FunctionTypes.FREQUENCY_RESPONSE_FUNCTION,
        outer_product(response_dofs, reference_dofs),
    )
    response_cpsd = _spectral(
        "response_cpsd",
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        outer_product(response_dofs, response_dofs),
    )
    reference_cpsd = _spectral(
        "reference_cpsd",
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        outer_product(reference_dofs, reference_dofs),
    )
    response_noise_cpsd = _spectral(
        "response_noise_cpsd",
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        outer_product(response_dofs, response_dofs),
    )
    reference_noise_cpsd = _spectral(
        "reference_noise_cpsd",
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        outer_product(reference_dofs, reference_dofs),
    )
    coherence = _spectral(
        "coherence",
        FunctionTypes.MULTIPLE_COHERENCE,
        outer_product(response_dofs),
    )
    return frfs, response_cpsd, reference_cpsd, response_noise_cpsd, reference_noise_cpsd, coherence
def read_system_id_nc4(file, coordinate_override_column=None):
    """Read system-identification spectral data from a Rattlesnake nc4 file.

    Parses FRF, response CPSD, drive CPSD, noise CPSD, and coherence arrays
    from the first non-channel environment group found in *file*.  When a
    response or reference transformation matrix is present in the environment
    group, the returned arrays are labeled with sequential "Transformed
    Response" / "Transformed Drive" virtual channels instead of physical
    channel coordinates.  NOTE(review): the spectral data is read as stored
    and is not multiplied by the transformation matrices in this function --
    confirm whether Rattlesnake writes already-transformed data.

    Parameters
    ----------
    file : str or netCDF4.Dataset
        Path to a Rattlesnake nc4 file, or an already-open ``netCDF4.Dataset``.
    coordinate_override_column : str, optional
        Name of a channel-table column whose string values are parsed as
        coordinates. When ``None`` (default) coordinates are assembled from
        the ``node_number`` and ``node_direction`` channel-table columns.

    Returns
    -------
    frfs : TransferFunctionArray
        Frequency response functions with shape ``(n_responses, n_drives)``.
    response_cpsd : PowerSpectralDensityArray
        Response cross-power spectral density matrix.
    drive_cpsd : PowerSpectralDensityArray
        Drive (reference) cross-power spectral density matrix.
    response_noise_cpsd : PowerSpectralDensityArray
        Response noise cross-power spectral density matrix.
    drive_noise_cpsd : PowerSpectralDensityArray
        Drive noise cross-power spectral density matrix.
    coherence : MultipleCoherenceArray
        Multiple coherence for each response channel.
    """
    # Accept a path or an already-open dataset.  NOTE(review): any other
    # argument type leaves ``ds`` unbound and fails later with a NameError;
    # also, a dataset opened here from a path is never closed in this
    # function -- confirm whether the caller is expected to manage that.
    if isinstance(file, str):
        ds = nc4.Dataset(file, "r")
    elif isinstance(file, nc4.Dataset):
        ds = file
    # The first group that is not the channel table is taken as the
    # environment of interest (assumes a single-environment file).
    environment = [group for group in ds.groups if not group == "channels"][0]
    # Get the channels in the group
    if coordinate_override_column is None:
        # Node numbers may contain non-numeric characters; keep digits only
        nodes = [
            int("".join(char for char in node if char in "0123456789"))
            for node in ds["channels"]["node_number"]
        ]
        directions = np.array(ds["channels"]["node_direction"][:], dtype="<U3")
        coordinates = coordinate_array(nodes, directions)
    else:
        coordinates = coordinate_array(string_array=ds["channels"][coordinate_override_column])
    # A channel is a drive if it has a feedback device assigned
    drives = ds["channels"]["feedback_device"][:] != ""
    # Cull down to just those in the environment
    environment_index = np.where(ds["environment_names"][:] == environment)[0][0]
    environment_channels = ds["environment_active_channels"][:, environment_index].astype(bool)
    drives = drives[environment_channels]
    coordinates = coordinates[environment_channels]
    # Indices (within the environment's channels) of the control channels
    control_indices = ds[environment]["control_channel_indices"][:]
    # --- Response side: virtual channels if a transformation matrix exists ---
    if "response_transformation_matrix" in ds[environment].variables:
        control_coordinates = coordinate_array(
            np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1, 0
        )
        # Synthetic comment fields mirroring the five physical-channel
        # comment fields built further below
        response_transform_comment1 = np.array(
            [
                f"Unknown :: Transformed Response {i}"
                for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        response_transform_comment2 = np.array(
            [
                f"Transformed Response {i} :: Transformed Response {i}"
                for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        response_transform_comment3 = np.array(
            [
                f"Transformed Response {i} :: Transformed Response {i}"
                for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        response_transform_comment4 = np.array(
            [
                f"Transformed Response {i}"
                for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        response_transform_comment5 = np.array(
            [
                f"Transformed Response {i}"
                for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        # Control indices now refer to the virtual (transformed) channels
        control_indices = np.arange(ds[environment]["response_transformation_matrix"].shape[0])
    else:
        control_coordinates = coordinates[control_indices]
    # --- Drive side: same pattern as the response side above ---
    if "reference_transformation_matrix" in ds[environment].variables:
        drive_coordinates = coordinate_array(
            np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1, 0
        )
        drive_transform_comment1 = np.array(
            [
                f"Unknown :: Transformed Drive {i}"
                for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        drive_transform_comment2 = np.array(
            [
                f"Transformed Drive {i} :: Transformed Drive {i}"
                for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        drive_transform_comment3 = np.array(
            [
                f"Transformed Drive {i} :: Transformed Drive {i}"
                for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        drive_transform_comment4 = np.array(
            [
                f"Transformed Drive {i}"
                for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        drive_transform_comment5 = np.array(
            [
                f"Transformed Drive {i}"
                for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
            ],
            dtype="<U80",
        )
        # Every virtual drive channel is considered a drive
        drives = np.ones(ds[environment]["reference_transformation_matrix"].shape[0], dtype=bool)
    else:
        drive_coordinates = coordinates[drives]
    # Load the spectral data
    frequency_spacing = ds.sample_rate / ds[environment].sysid_frame_size
    fft_lines = ds[environment].dimensions["sysid_fft_lines"].size
    frequencies = np.arange(fft_lines) * frequency_spacing
    # Complex arrays are stored as separate real/imag variables; frequency is
    # the first stored axis and is moved to the last axis for SDynPy.
    frf_array = np.moveaxis(
        np.array(ds[environment]["frf_data_real"][:] + 1j * ds[environment]["frf_data_imag"][:]),
        0,
        -1,
    )
    response_cpsd_array = np.moveaxis(
        np.array(
            ds[environment]["response_cpsd_real"][:] + 1j * ds[environment]["response_cpsd_imag"][:]
        ),
        0,
        -1,
    )
    drive_cpsd_array = np.moveaxis(
        np.array(
            ds[environment]["reference_cpsd_real"][:]
            + 1j * ds[environment]["reference_cpsd_imag"][:]
        ),
        0,
        -1,
    )
    response_noise_cpsd_array = np.moveaxis(
        np.array(
            ds[environment]["response_noise_cpsd_real"][:]
            + 1j * ds[environment]["response_noise_cpsd_imag"][:]
        ),
        0,
        -1,
    )
    drive_noise_cpsd_array = np.moveaxis(
        np.array(
            ds[environment]["reference_noise_cpsd_real"][:]
            + 1j * ds[environment]["reference_noise_cpsd_imag"][:]
        ),
        0,
        -1,
    )
    # Coherence is real-valued; only the axis move is needed.  Note this
    # local name shadows the ``coherence_array`` imported at module level.
    coherence_array = np.moveaxis(np.array(ds[environment]["frf_coherence"][:]), 0, -1)
    # Coordinate matrices for each returned spectral quantity
    response_coordinates_cpsd = outer_product(control_coordinates, control_coordinates)
    drive_coordinates_cpsd = outer_product(drive_coordinates, drive_coordinates)
    frf_coordinates = outer_product(control_coordinates, drive_coordinates)
    coherence_coordinates = control_coordinates[:, np.newaxis]
    # Five per-channel comment fields assembled from the channel table
    # (type::unit, physical device::channel, feedback device::channel,
    # free-form comment, and make/model/serial/triax info)
    comment1 = np.char.add(
        np.char.add(np.array(ds["channels"]["channel_type"][:], dtype="<U80"), np.array(" :: ")),
        np.array(ds["channels"]["unit"][:], dtype="<U80"),
    )
    comment2 = np.char.add(
        np.char.add(np.array(ds["channels"]["physical_device"][:], dtype="<U80"), np.array(" :: ")),
        np.array(ds["channels"]["physical_channel"][:], dtype="<U80"),
    )
    comment3 = np.char.add(
        np.char.add(np.array(ds["channels"]["feedback_device"][:], dtype="<U80"), np.array(" :: ")),
        np.array(ds["channels"]["feedback_channel"][:], dtype="<U80"),
    )
    comment4 = np.array(ds["channels"]["comment"][:], dtype="<U80")
    comment5 = np.array(ds["channels"]["make"][:], dtype="<U80")
    for key in ("model", "serial_number", "triax_dof"):
        comment5 = np.char.add(comment5, np.array(" "))
        comment5 = np.char.add(comment5, np.array(ds["channels"][key][:], dtype="<U80"))
    # Restrict the physical-channel comments to this environment's channels
    full_comment1 = comment1[environment_channels]
    full_comment2 = comment2[environment_channels]
    full_comment3 = comment3[environment_channels]
    full_comment4 = comment4[environment_channels]
    full_comment5 = comment5[environment_channels]
    # Select response-side comments: virtual-channel comments if a response
    # transformation exists, otherwise the physical-channel comments.
    # ``comment1``..``comment5`` are rebound here and indexed by
    # ``control_indices``, which were redefined consistently above.
    if "response_transformation_matrix" in ds[environment].variables:
        comment1 = response_transform_comment1
        comment2 = response_transform_comment2
        comment3 = response_transform_comment3
        comment4 = response_transform_comment4
        comment5 = response_transform_comment5
    else:
        comment1 = full_comment1
        comment2 = full_comment2
        comment3 = full_comment3
        comment4 = full_comment4
        comment5 = full_comment5
    # Pairwise ("row // column") comment matrices for the response CPSD
    comment1_response_cpsd = np.empty(
        (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
        dtype=comment1.dtype,
    )
    comment2_response_cpsd = np.empty(
        (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
        dtype=comment1.dtype,
    )
    comment3_response_cpsd = np.empty(
        (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
        dtype=comment1.dtype,
    )
    comment4_response_cpsd = np.empty(
        (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
        dtype=comment1.dtype,
    )
    comment5_response_cpsd = np.empty(
        (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
        dtype=comment1.dtype,
    )
    # Per-channel comment vectors for the multiple coherence
    comment1_coherence = np.empty(response_coordinates_cpsd.shape[0], dtype=comment1.dtype)
    comment2_coherence = np.empty(response_coordinates_cpsd.shape[0], dtype=comment1.dtype)
    comment3_coherence = np.empty(response_coordinates_cpsd.shape[0], dtype=comment1.dtype)
    comment4_coherence = np.empty(response_coordinates_cpsd.shape[0], dtype=comment1.dtype)
    comment5_coherence = np.empty(response_coordinates_cpsd.shape[0], dtype=comment1.dtype)
    for i, idx in enumerate(control_indices):
        comment1_coherence[i] = comment1[idx]
        comment2_coherence[i] = comment2[idx]
        comment3_coherence[i] = comment3[idx]
        comment4_coherence[i] = comment4[idx]
        comment5_coherence[i] = comment5[idx]
        for j, jdx in enumerate(control_indices):
            comment1_response_cpsd[i, j] = comment1[idx] + " // " + comment1[jdx]
            comment2_response_cpsd[i, j] = comment2[idx] + " // " + comment2[jdx]
            comment3_response_cpsd[i, j] = comment3[idx] + " // " + comment3[jdx]
            comment4_response_cpsd[i, j] = comment4[idx] + " // " + comment4[jdx]
            comment5_response_cpsd[i, j] = comment5[idx] + " // " + comment5[jdx]
    # Select drive-side comments (same rebinding pattern as the response side)
    if "reference_transformation_matrix" in ds[environment].variables:
        comment1 = drive_transform_comment1
        comment2 = drive_transform_comment2
        comment3 = drive_transform_comment3
        comment4 = drive_transform_comment4
        comment5 = drive_transform_comment5
    else:
        comment1 = full_comment1
        comment2 = full_comment2
        comment3 = full_comment3
        comment4 = full_comment4
        comment5 = full_comment5
    # Pairwise comment matrices for the drive CPSD
    comment1_drive_cpsd = np.empty(
        (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
    )
    comment2_drive_cpsd = np.empty(
        (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
    )
    comment3_drive_cpsd = np.empty(
        (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
    )
    comment4_drive_cpsd = np.empty(
        (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
    )
    comment5_drive_cpsd = np.empty(
        (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
    )
    drive_indices = np.where(drives)[0]
    for i, idx in enumerate(drive_indices):
        for j, jdx in enumerate(drive_indices):
            comment1_drive_cpsd[i, j] = comment1[idx] + " // " + comment1[jdx]
            comment2_drive_cpsd[i, j] = comment2[idx] + " // " + comment2[jdx]
            comment3_drive_cpsd[i, j] = comment3[idx] + " // " + comment3[jdx]
            comment4_drive_cpsd[i, j] = comment4[idx] + " // " + comment4[jdx]
            comment5_drive_cpsd[i, j] = comment5[idx] + " // " + comment5[jdx]
    # FRF comments combine response-side (rows) and drive-side (columns)
    # comment sets; re-select both independently of the rebinding above
    if "response_transformation_matrix" in ds[environment].variables:
        rcomment1 = response_transform_comment1
        rcomment2 = response_transform_comment2
        rcomment3 = response_transform_comment3
        rcomment4 = response_transform_comment4
        rcomment5 = response_transform_comment5
    else:
        rcomment1 = full_comment1
        rcomment2 = full_comment2
        rcomment3 = full_comment3
        rcomment4 = full_comment4
        rcomment5 = full_comment5
    if "reference_transformation_matrix" in ds[environment].variables:
        dcomment1 = drive_transform_comment1
        dcomment2 = drive_transform_comment2
        dcomment3 = drive_transform_comment3
        dcomment4 = drive_transform_comment4
        dcomment5 = drive_transform_comment5
    else:
        dcomment1 = full_comment1
        dcomment2 = full_comment2
        dcomment3 = full_comment3
        dcomment4 = full_comment4
        dcomment5 = full_comment5
    comment1_frf = np.empty(
        (frf_coordinates.shape[0], frf_coordinates.shape[1]), dtype=comment1.dtype
    )
    comment2_frf = np.empty(
        (frf_coordinates.shape[0], frf_coordinates.shape[1]), dtype=comment1.dtype
    )
    comment3_frf = np.empty(
        (frf_coordinates.shape[0], frf_coordinates.shape[1]), dtype=comment1.dtype
    )
    comment4_frf = np.empty(
        (frf_coordinates.shape[0], frf_coordinates.shape[1]), dtype=comment1.dtype
    )
    comment5_frf = np.empty(
        (frf_coordinates.shape[0], frf_coordinates.shape[1]), dtype=comment1.dtype
    )
    for i, idx in enumerate(control_indices):
        for j, jdx in enumerate(drive_indices):
            comment1_frf[i, j] = rcomment1[idx] + " // " + dcomment1[jdx]
            comment2_frf[i, j] = rcomment2[idx] + " // " + dcomment2[jdx]
            comment3_frf[i, j] = rcomment3[idx] + " // " + dcomment3[jdx]
            comment4_frf[i, j] = rcomment4[idx] + " // " + dcomment4[jdx]
            comment5_frf[i, j] = rcomment5[idx] + " // " + dcomment5[jdx]
    # Save the data to SDynpy objects
    response_cpsd = data_array(
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        frequencies,
        response_cpsd_array,
        response_coordinates_cpsd,
        comment1_response_cpsd,
        comment2_response_cpsd,
        comment3_response_cpsd,
        comment4_response_cpsd,
        comment5_response_cpsd,
    )
    response_noise_cpsd = data_array(
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        frequencies,
        response_noise_cpsd_array,
        response_coordinates_cpsd,
        comment1_response_cpsd,
        comment2_response_cpsd,
        comment3_response_cpsd,
        comment4_response_cpsd,
        comment5_response_cpsd,
    )
    drive_cpsd = data_array(
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        frequencies,
        drive_cpsd_array,
        drive_coordinates_cpsd,
        comment1_drive_cpsd,
        comment2_drive_cpsd,
        comment3_drive_cpsd,
        comment4_drive_cpsd,
        comment5_drive_cpsd,
    )
    drive_noise_cpsd = data_array(
        FunctionTypes.POWER_SPECTRAL_DENSITY,
        frequencies,
        drive_noise_cpsd_array,
        drive_coordinates_cpsd,
        comment1_drive_cpsd,
        comment2_drive_cpsd,
        comment3_drive_cpsd,
        comment4_drive_cpsd,
        comment5_drive_cpsd,
    )
    frfs = data_array(
        FunctionTypes.FREQUENCY_RESPONSE_FUNCTION,
        frequencies,
        frf_array,
        frf_coordinates,
        comment1_frf,
        comment2_frf,
        comment3_frf,
        comment4_frf,
        comment5_frf,
    )
    coherence = data_array(
        FunctionTypes.MULTIPLE_COHERENCE,
        frequencies,
        coherence_array,
        coherence_coordinates,
        comment1_coherence,
        comment2_coherence,
        comment3_coherence,
        comment4_coherence,
        comment5_coherence,
    )
    return frfs, response_cpsd, drive_cpsd, response_noise_cpsd, drive_noise_cpsd, coherence
703def read_random_spectral_data(file, coordinate_override_column=None):
704 """Read random-vibration spectral data from a Rattlesnake nc4 file.
706 Reads the response CPSD, specification CPSD, and drive CPSD arrays that
707 are written to disk while a Rattlesnake Random environment is running
708 (i.e. during active control, not from a system-identification run).
709 Applies response and reference transformation matrices when present.
711 Parameters
712 ----------
713 file : str or netCDF4.Dataset
714 Path to a Rattlesnake nc4 file, or an already-open ``netCDF4.Dataset``.
715 coordinate_override_column : str, optional
716 Name of a channel-table column whose string values are parsed as
717 coordinates. When ``None`` (default) coordinates are assembled from
718 the ``node_number`` and ``node_direction`` channel-table columns.
720 Returns
721 -------
722 response_cpsd : PowerSpectralDensityArray
723 Measured response cross-power spectral density matrix.
724 spec_cpsd : PowerSpectralDensityArray
725 Specification (target) cross-power spectral density matrix.
726 drive_cpsd : PowerSpectralDensityArray
727 Drive (output) cross-power spectral density matrix.
728 """
730 if isinstance(file, str):
731 ds = nc4.Dataset(file, "r")
732 elif isinstance(file, nc4.Dataset):
733 ds = file
735 environment = [group for group in ds.groups if not group == "channels"][0]
737 # Get the channels in the group
738 if coordinate_override_column is None:
739 nodes = [
740 int("".join(char for char in node if char in "0123456789"))
741 for node in ds["channels"]["node_number"]
742 ]
743 directions = np.array(ds["channels"]["node_direction"][:], dtype="<U3")
744 coordinates = coordinate_array(nodes, directions)
745 else:
746 coordinates = coordinate_array(string_array=ds["channels"][coordinate_override_column])
747 drives = ds["channels"]["feedback_device"][:] != ""
749 # Cull down to just those in the environment
750 environment_index = np.where(ds["environment_names"][:] == environment)[0][0]
751 environment_channels = ds["environment_active_channels"][:, environment_index].astype(bool)
753 drives = drives[environment_channels]
754 coordinates = coordinates[environment_channels]
756 control_indices = ds[environment]["control_channel_indices"][:]
758 if "response_transformation_matrix" in ds[environment].variables:
759 control_coordinates = coordinate_array(
760 np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1, 0
761 )
762 response_transform_comment1 = np.array(
763 [
764 f"Unknown :: Transformed Response {i}"
765 for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
766 ],
767 dtype="<U80",
768 )
769 response_transform_comment2 = np.array(
770 [
771 f"Transformed Response {i} :: Transformed Response {i}"
772 for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
773 ],
774 dtype="<U80",
775 )
776 response_transform_comment3 = np.array(
777 [
778 f"Transformed Response {i} :: Transformed Response {i}"
779 for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
780 ],
781 dtype="<U80",
782 )
783 response_transform_comment4 = np.array(
784 [
785 f"Transformed Response {i}"
786 for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
787 ],
788 dtype="<U80",
789 )
790 response_transform_comment5 = np.array(
791 [
792 f"Transformed Response {i}"
793 for i in np.arange(ds[environment]["response_transformation_matrix"].shape[0]) + 1
794 ],
795 dtype="<U80",
796 )
797 control_indices = np.arange(ds[environment]["response_transformation_matrix"].shape[0])
798 else:
799 control_coordinates = coordinates[control_indices]
801 if "reference_transformation_matrix" in ds[environment].variables:
802 drive_coordinates = coordinate_array(
803 np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1, 0
804 )
805 drive_transform_comment1 = np.array(
806 [
807 f"Unknown :: Transformed Drive {i}"
808 for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
809 ],
810 dtype="<U80",
811 )
812 drive_transform_comment2 = np.array(
813 [
814 f"Transformed Drive {i} :: Transformed Drive {i}"
815 for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
816 ],
817 dtype="<U80",
818 )
819 drive_transform_comment3 = np.array(
820 [
821 f"Transformed Drive {i} :: Transformed Drive {i}"
822 for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
823 ],
824 dtype="<U80",
825 )
826 drive_transform_comment4 = np.array(
827 [
828 f"Transformed Drive {i}"
829 for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
830 ],
831 dtype="<U80",
832 )
833 drive_transform_comment5 = np.array(
834 [
835 f"Transformed Drive {i}"
836 for i in np.arange(ds[environment]["reference_transformation_matrix"].shape[0]) + 1
837 ],
838 dtype="<U80",
839 )
840 drives = np.ones(ds[environment]["reference_transformation_matrix"].shape[0], dtype=bool)
841 else:
842 drive_coordinates = coordinates[drives]
844 # Load the spectral data
845 frequencies = np.array(ds[environment]["specification_frequency_lines"][:])
847 spec_cpsd = np.moveaxis(
848 np.array(
849 ds[environment]["specification_cpsd_matrix_real"][:]
850 + 1j * ds[environment]["specification_cpsd_matrix_imag"][:]
851 ),
852 0,
853 -1,
854 )
856 response_cpsd = np.moveaxis(
857 np.array(
858 ds[environment]["response_cpsd_real"][:] + 1j * ds[environment]["response_cpsd_imag"][:]
859 ),
860 0,
861 -1,
862 )
864 drive_cpsd = np.moveaxis(
865 np.array(
866 ds[environment]["drive_cpsd_real"][:] + 1j * ds[environment]["drive_cpsd_imag"][:]
867 ),
868 0,
869 -1,
870 )
872 response_coordinates_cpsd = outer_product(control_coordinates, control_coordinates)
873 drive_coordinates_cpsd = outer_product(drive_coordinates, drive_coordinates)
875 comment1 = np.char.add(
876 np.char.add(np.array(ds["channels"]["channel_type"][:], dtype="<U80"), np.array(" :: ")),
877 np.array(ds["channels"]["unit"][:], dtype="<U80"),
878 )
879 comment2 = np.char.add(
880 np.char.add(np.array(ds["channels"]["physical_device"][:], dtype="<U80"), np.array(" :: ")),
881 np.array(ds["channels"]["physical_channel"][:], dtype="<U80"),
882 )
883 comment3 = np.char.add(
884 np.char.add(np.array(ds["channels"]["feedback_device"][:], dtype="<U80"), np.array(" :: ")),
885 np.array(ds["channels"]["feedback_channel"][:], dtype="<U80"),
886 )
887 comment4 = np.array(ds["channels"]["comment"][:], dtype="<U80")
888 comment5 = np.array(ds["channels"]["make"][:], dtype="<U80")
889 for key in ("model", "serial_number", "triax_dof"):
890 comment5 = np.char.add(comment5, np.array(" "))
891 comment5 = np.char.add(comment5, np.array(ds["channels"][key][:], dtype="<U80"))
893 full_comment1 = comment1[environment_channels]
894 full_comment2 = comment2[environment_channels]
895 full_comment3 = comment3[environment_channels]
896 full_comment4 = comment4[environment_channels]
897 full_comment5 = comment5[environment_channels]
899 if "response_transformation_matrix" in ds[environment].variables:
900 comment1 = response_transform_comment1
901 comment2 = response_transform_comment2
902 comment3 = response_transform_comment3
903 comment4 = response_transform_comment4
904 comment5 = response_transform_comment5
905 else:
906 comment1 = full_comment1
907 comment2 = full_comment2
908 comment3 = full_comment3
909 comment4 = full_comment4
910 comment5 = full_comment5
911 comment1_response_cpsd = np.empty(
912 (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
913 dtype=comment1.dtype,
914 )
915 comment2_response_cpsd = np.empty(
916 (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
917 dtype=comment1.dtype,
918 )
919 comment3_response_cpsd = np.empty(
920 (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
921 dtype=comment1.dtype,
922 )
923 comment4_response_cpsd = np.empty(
924 (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
925 dtype=comment1.dtype,
926 )
927 comment5_response_cpsd = np.empty(
928 (response_coordinates_cpsd.shape[0], response_coordinates_cpsd.shape[1]),
929 dtype=comment1.dtype,
930 )
931 for i, idx in enumerate(control_indices):
932 for j, jdx in enumerate(control_indices):
933 comment1_response_cpsd[i, j] = comment1[idx] + " // " + comment1[jdx]
934 comment2_response_cpsd[i, j] = comment2[idx] + " // " + comment2[jdx]
935 comment3_response_cpsd[i, j] = comment3[idx] + " // " + comment3[jdx]
936 comment4_response_cpsd[i, j] = comment4[idx] + " // " + comment4[jdx]
937 comment5_response_cpsd[i, j] = comment5[idx] + " // " + comment5[jdx]
939 if "reference_transformation_matrix" in ds[environment].variables:
940 comment1 = drive_transform_comment1
941 comment2 = drive_transform_comment2
942 comment3 = drive_transform_comment3
943 comment4 = drive_transform_comment4
944 comment5 = drive_transform_comment5
945 else:
946 comment1 = full_comment1
947 comment2 = full_comment2
948 comment3 = full_comment3
949 comment4 = full_comment4
950 comment5 = full_comment5
951 comment1_drive_cpsd = np.empty(
952 (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
953 )
954 comment2_drive_cpsd = np.empty(
955 (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
956 )
957 comment3_drive_cpsd = np.empty(
958 (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
959 )
960 comment4_drive_cpsd = np.empty(
961 (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
962 )
963 comment5_drive_cpsd = np.empty(
964 (drive_coordinates_cpsd.shape[0], drive_coordinates_cpsd.shape[1]), dtype=comment1.dtype
965 )
966 drive_indices = np.where(drives)[0]
967 for i, idx in enumerate(drive_indices):
968 for j, jdx in enumerate(drive_indices):
969 comment1_drive_cpsd[i, j] = comment1[idx] + " // " + comment1[jdx]
970 comment2_drive_cpsd[i, j] = comment2[idx] + " // " + comment2[jdx]
971 comment3_drive_cpsd[i, j] = comment3[idx] + " // " + comment3[jdx]
972 comment4_drive_cpsd[i, j] = comment4[idx] + " // " + comment4[jdx]
973 comment5_drive_cpsd[i, j] = comment5[idx] + " // " + comment5[jdx]
975 # Save the data to SDynpy objects
976 response_cpsd = data_array(
977 FunctionTypes.POWER_SPECTRAL_DENSITY,
978 frequencies,
979 response_cpsd,
980 response_coordinates_cpsd,
981 comment1_response_cpsd,
982 comment2_response_cpsd,
983 comment3_response_cpsd,
984 comment4_response_cpsd,
985 comment5_response_cpsd,
986 )
987 spec_cpsd = data_array(
988 FunctionTypes.POWER_SPECTRAL_DENSITY,
989 frequencies,
990 spec_cpsd,
991 response_coordinates_cpsd,
992 comment1_response_cpsd,
993 comment2_response_cpsd,
994 comment3_response_cpsd,
995 comment4_response_cpsd,
996 comment5_response_cpsd,
997 )
998 drive_cpsd = data_array(
999 FunctionTypes.POWER_SPECTRAL_DENSITY,
1000 frequencies,
1001 drive_cpsd,
1002 drive_coordinates_cpsd,
1003 comment1_drive_cpsd,
1004 comment2_drive_cpsd,
1005 comment3_drive_cpsd,
1006 comment4_drive_cpsd,
1007 comment5_drive_cpsd,
1008 )
1009 return response_cpsd, spec_cpsd, drive_cpsd
def read_modal_data(file, coordinate_override_column=None, read_only_indices=None):
    """Read modal test data from a Rattlesnake nc4 file.

    Reads the full time history, FRF matrix, and multiple coherence array
    from a Rattlesnake modal-test nc4 file.  Time data are reshaped into
    ``(n_averages, n_channels, samples_per_frame)`` blocks.

    Parameters
    ----------
    file : str or netCDF4.Dataset
        Path to a Rattlesnake nc4 file, or an already-open ``netCDF4.Dataset``.
    coordinate_override_column : str, optional
        Channel-table column whose string values are parsed as coordinates.
        When ``None`` (default) coordinates are built from ``node_number`` and
        ``node_direction``.
    read_only_indices : slice or array_like, optional
        Index expression applied to the channel axis when loading time data.
        Defaults to ``slice(None)`` (all channels).

    Returns
    -------
    time_data : TimeHistoryArray
        Averaged time blocks with shape ``(n_averages, n_channels)``.
    frf_data : TransferFunctionArray
        Frequency response function matrix.
    coherence_data : MultipleCoherenceArray
        Multiple coherence for each response channel.
    channel_table : pandas.DataFrame
        Full channel table from the nc4 file.

    Warns
    -----
    UserWarning
        If the number of complete averages in the time data does not match the
        ``num_averages`` attribute stored in the test settings group.
    """
    # Accept either a path or an already-open dataset.
    # NOTE(review): any other type leaves ``ds`` unbound and the next access
    # raises NameError rather than a clear TypeError -- confirm intended.
    if isinstance(file, str):
        ds = nc4.Dataset(file, "r")
    elif isinstance(file, nc4.Dataset):
        ds = file
    if read_only_indices is None:
        read_only_indices = slice(None)
    # Get parameters
    num_channels = ds.groups["channels"].variables["physical_device"].size
    # The only group besides the channel table is the test-settings group.
    group_key = [g for g in ds.groups if not g == "channels"][0]
    group = ds.groups[group_key]
    sample_rate = ds.sample_rate
    samples_per_frame = group.samples_per_frame
    num_averages = group.num_averages
    # Load in the time data.  Stored layout is (channel, samples); reshape into
    # per-average frames and move averages to the leading axis.
    # NOTE(review): the reshape uses the full ``num_channels`` even though the
    # data were just sliced with ``read_only_indices`` -- a non-trivial slice
    # would change the channel count and make this reshape fail.  Verify.
    try:
        output_data = (
            np.array(ds["time_data"][...][read_only_indices])
            .reshape(num_channels, num_averages, samples_per_frame)
            .transpose(1, 0, 2)
        )
    except ValueError:
        # An incomplete test has fewer full frames than num_averages; infer the
        # actual number of averages from the data length instead.
        warnings.warn(
            "Number of averages in the time data does not match the number of averages specified in the test settings. Your test may be incomplete."
        )
        output_data = (
            np.array(ds["time_data"][...][read_only_indices])
            .reshape(num_channels, -1, samples_per_frame)
            .transpose(1, 0, 2)
        )
    # Time axis for a single frame
    abscissa = np.arange(samples_per_frame) / sample_rate
    if coordinate_override_column is None:
        # Keep only digits from the node label so entries like "N101" parse.
        nodes = [
            int("".join(char for char in node if char in "0123456789"))
            for node in ds["channels"]["node_number"][...][read_only_indices]
        ]
        directions = np.array(ds["channels"]["node_direction"][...][read_only_indices], dtype="<U3")
        coordinates = coordinate_array(nodes, directions)[:, np.newaxis]
    else:
        coordinates = coordinate_array(
            string_array=ds["channels"][coordinate_override_column][read_only_indices]
        )[:, np.newaxis]
    # Full channel table (all columns, all channels) as a DataFrame
    array = {name: np.array(variable[:]) for name, variable in ds["channels"].variables.items()}
    channel_table = pd.DataFrame(array)
    # Build the five standard SDynPy comment fields from channel-table columns
    comment1 = np.char.add(
        np.char.add(
            np.array(ds["channels"]["channel_type"][...][read_only_indices], dtype="<U80"),
            np.array(" :: "),
        ),
        np.array(ds["channels"]["unit"][...][read_only_indices], dtype="<U80"),
    )
    comment2 = np.char.add(
        np.char.add(
            np.array(ds["channels"]["physical_device"][...][read_only_indices], dtype="<U80"),
            np.array(" :: "),
        ),
        np.array(ds["channels"]["physical_channel"][...][read_only_indices], dtype="<U80"),
    )
    comment3 = np.char.add(
        np.char.add(
            np.array(ds["channels"]["feedback_device"][...][read_only_indices], dtype="<U80"),
            np.array(" :: "),
        ),
        np.array(ds["channels"]["feedback_channel"][...][read_only_indices], dtype="<U80"),
    )
    comment4 = np.array(ds["channels"]["comment"][...][read_only_indices], dtype="<U80")
    comment5 = np.array(ds["channels"]["make"][...][read_only_indices], dtype="<U80")
    # Append model, serial number, and triax DOF to the make string
    for key in ("model", "serial_number", "triax_dof"):
        comment5 = np.char.add(comment5, np.array(" "))
        comment5 = np.char.add(
            comment5, np.array(ds["channels"][key][...][read_only_indices], dtype="<U80")
        )
    time_data = data_array(
        FunctionTypes.TIME_RESPONSE,
        abscissa,
        output_data,
        coordinates,
        comment1,
        comment2,
        comment3,
        comment4,
        comment5,
    )
    # Response and Reference Indices
    # kept_indices maps the read_only_indices selection back to full-channel
    # index space; reference/response indices stored in the file are also in
    # full-channel index space.
    kept_indices = np.arange(num_channels)[read_only_indices]
    reference_indices = np.array(group.variables["reference_channel_indices"][:])
    response_indices = np.array(group.variables["response_channel_indices"][:])
    keep_response_indices = np.array(
        [i for i, index in enumerate(response_indices) if index in kept_indices]
    )
    keep_reference_indices = np.array(
        [i for i, index in enumerate(reference_indices) if index in kept_indices]
    )
    frequency_lines = (
        np.arange(group.dimensions["fft_lines"].size) * sample_rate / samples_per_frame
    )
    # Multiple coherence, transposed to (channel, frequency)
    coherence_data = np.array(group["coherence"][:, keep_response_indices]).T
    # Comments for the response channels only
    comment1 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["channel_type"][...][response_indices[keep_response_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["unit"][...][response_indices[keep_response_indices]], dtype="<U80"
        ),
    )
    comment2 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["physical_device"][...][response_indices[keep_response_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["physical_channel"][...][response_indices[keep_response_indices]],
            dtype="<U80",
        ),
    )
    comment3 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["feedback_device"][...][response_indices[keep_response_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["feedback_channel"][...][response_indices[keep_response_indices]],
            dtype="<U80",
        ),
    )
    comment4 = np.array(
        ds["channels"]["comment"][...][response_indices[keep_response_indices]], dtype="<U80"
    )
    comment5 = np.array(
        ds["channels"]["make"][...][response_indices[keep_response_indices]], dtype="<U80"
    )
    for key in ("model", "serial_number", "triax_dof"):
        comment5 = np.char.add(comment5, np.array(" "))
        comment5 = np.char.add(
            comment5,
            np.array(
                ds["channels"][key][...][response_indices[keep_response_indices]], dtype="<U80"
            ),
        )
    # NOTE(review): ``coordinates`` was already culled by read_only_indices but
    # is indexed here with full-channel-space indices; this only lines up when
    # read_only_indices is slice(None).  Confirm against callers.
    coherence_data = data_array(
        FunctionTypes.MULTIPLE_COHERENCE,
        frequency_lines,
        coherence_data,
        coordinates[response_indices[keep_response_indices]],
        comment1,
        comment2,
        comment3,
        comment4,
        comment5,
    )
    # Frequency Response Functions: assemble complex FRFs from the real and
    # imaginary parts and move the frequency axis to the end.
    frf_data = np.moveaxis(
        np.array(group["frf_data_real"])[
            :, keep_response_indices[:, np.newaxis], keep_reference_indices
        ]
        + np.array(group["frf_data_imag"])[
            :, keep_response_indices[:, np.newaxis], keep_reference_indices
        ]
        * 1j,
        0,
        -1,
    )
    frf_coordinate = outer_product(
        coordinates[response_indices[keep_response_indices], 0],
        coordinates[reference_indices[keep_reference_indices], 0],
    )
    # Response-side comments for the FRF matrix
    response_comment1 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["channel_type"][...][response_indices[keep_response_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["unit"][...][response_indices[keep_response_indices]], dtype="<U80"
        ),
    )
    response_comment2 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["physical_device"][...][response_indices[keep_response_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["physical_channel"][...][response_indices[keep_response_indices]],
            dtype="<U80",
        ),
    )
    response_comment3 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["feedback_device"][...][response_indices[keep_response_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["feedback_channel"][...][response_indices[keep_response_indices]],
            dtype="<U80",
        ),
    )
    response_comment4 = np.array(
        ds["channels"]["comment"][...][response_indices[keep_response_indices]], dtype="<U80"
    )
    response_comment5 = np.array(
        ds["channels"]["make"][...][response_indices[keep_response_indices]], dtype="<U80"
    )
    for key in ("model", "serial_number", "triax_dof"):
        response_comment5 = np.char.add(response_comment5, np.array(" "))
        response_comment5 = np.char.add(
            response_comment5,
            np.array(
                ds["channels"][key][...][response_indices[keep_response_indices]], dtype="<U80"
            ),
        )
    # Reference-side comments for the FRF matrix
    reference_comment1 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["channel_type"][...][reference_indices[keep_reference_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["unit"][...][reference_indices[keep_reference_indices]], dtype="<U80"
        ),
    )
    reference_comment2 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["physical_device"][...][reference_indices[keep_reference_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["physical_channel"][...][reference_indices[keep_reference_indices]],
            dtype="<U80",
        ),
    )
    reference_comment3 = np.char.add(
        np.char.add(
            np.array(
                ds["channels"]["feedback_device"][...][reference_indices[keep_reference_indices]],
                dtype="<U80",
            ),
            np.array(" :: "),
        ),
        np.array(
            ds["channels"]["feedback_channel"][...][reference_indices[keep_reference_indices]],
            dtype="<U80",
        ),
    )
    reference_comment4 = np.array(
        ds["channels"]["comment"][...][reference_indices[keep_reference_indices]], dtype="<U80"
    )
    reference_comment5 = np.array(
        ds["channels"]["make"][...][reference_indices[keep_reference_indices]], dtype="<U80"
    )
    for key in ("model", "serial_number", "triax_dof"):
        reference_comment5 = np.char.add(reference_comment5, np.array(" "))
        reference_comment5 = np.char.add(
            reference_comment5,
            np.array(
                ds["channels"][key][...][reference_indices[keep_reference_indices]], dtype="<U80"
            ),
        )
    # Broadcast response (rows) against reference (columns) and join the
    # comment strings for each FRF matrix entry.
    response_comment1, reference_comment1 = np.broadcast_arrays(
        response_comment1[:, np.newaxis], reference_comment1
    )
    comment1 = np.char.add(np.char.add(response_comment1, np.array(" / ")), reference_comment1)
    response_comment2, reference_comment2 = np.broadcast_arrays(
        response_comment2[:, np.newaxis], reference_comment2
    )
    comment2 = np.char.add(np.char.add(response_comment2, np.array(" / ")), reference_comment2)
    response_comment3, reference_comment3 = np.broadcast_arrays(
        response_comment3[:, np.newaxis], reference_comment3
    )
    comment3 = np.char.add(np.char.add(response_comment3, np.array(" / ")), reference_comment3)
    response_comment4, reference_comment4 = np.broadcast_arrays(
        response_comment4[:, np.newaxis], reference_comment4
    )
    comment4 = np.char.add(np.char.add(response_comment4, np.array(" / ")), reference_comment4)
    response_comment5, reference_comment5 = np.broadcast_arrays(
        response_comment5[:, np.newaxis], reference_comment5
    )
    comment5 = np.char.add(np.char.add(response_comment5, np.array(" / ")), reference_comment5)
    frf_data = data_array(
        FunctionTypes.FREQUENCY_RESPONSE_FUNCTION,
        frequency_lines,
        frf_data,
        frf_coordinate,
        comment1,
        comment2,
        comment3,
        comment4,
        comment5,
    )
    return time_data, frf_data, coherence_data, channel_table
def _transformed_channel_comments(num_transformed, label):
    """Build the five SDynPy comment arrays for virtual (transformed) channels.

    Parameters
    ----------
    num_transformed : int
        Number of rows in the transformation matrix (virtual channel count).
    label : str
        Either ``"Response"`` or ``"Drive"``; inserted into the comment text.

    Returns
    -------
    tuple of numpy.ndarray
        ``(comment1, comment2, comment3, comment4, comment5)``, each of dtype
        ``<U80`` and length ``num_transformed``.
    """
    indices = np.arange(num_transformed) + 1
    comment1 = np.array(
        [f"Unknown :: Transformed {label} {i}" for i in indices], dtype="<U80"
    )
    comment2 = np.array(
        [f"Transformed {label} {i} :: Transformed {label} {i}" for i in indices],
        dtype="<U80",
    )
    # Comments 2 and 3 are identical for virtual channels, as are 4 and 5.
    comment3 = comment2.copy()
    comment4 = np.array([f"Transformed {label} {i}" for i in indices], dtype="<U80")
    comment5 = comment4.copy()
    return comment1, comment2, comment3, comment4, comment5


def read_transient_control_data(file, coordinate_override_column=None):
    """Read transient control time-history data from a Rattlesnake nc4 file.

    Reads the control response signal, specification signal, and drive signal
    time histories from the first non-channel environment group in *file*.
    Applies response and reference transformation matrices when present.

    Parameters
    ----------
    file : str or netCDF4.Dataset
        Path to a Rattlesnake nc4 file, or an already-open ``netCDF4.Dataset``.
    coordinate_override_column : str, optional
        Channel-table column whose string values are parsed as coordinates.
        When ``None`` (default) coordinates are built from ``node_number`` and
        ``node_direction``.

    Returns
    -------
    response_signal : TimeHistoryArray
        Measured control-response time history.
    spec_signal : TimeHistoryArray
        Specification (target) control signal time history.
    drive_signal : TimeHistoryArray
        Drive (output) signal time history.

    Raises
    ------
    TypeError
        If *file* is neither a path string nor a ``netCDF4.Dataset``.
    """
    if isinstance(file, str):
        ds = nc4.Dataset(file, "r")
    elif isinstance(file, nc4.Dataset):
        ds = file
    else:
        # Fail fast; previously an unrecognized type left ``ds`` unbound and
        # raised a confusing NameError further down.
        raise TypeError("file must be a file path string or an open netCDF4.Dataset")

    # The only group besides the channel table is the environment group.
    environment = [group for group in ds.groups if not group == "channels"][0]

    # Get the channels in the group.  The coordinate_override_column argument
    # is honored here; a leftover debugging assignment previously forced it to
    # None, making the parameter dead.
    if coordinate_override_column is None:
        # Keep only digits from the node label so entries like "N101" parse.
        nodes = [
            int("".join(char for char in node if char in "0123456789"))
            for node in ds["channels"]["node_number"]
        ]
        directions = np.array(ds["channels"]["node_direction"][:], dtype="<U3")
        coordinates = coordinate_array(nodes, directions)
    else:
        coordinates = coordinate_array(
            string_array=ds["channels"][coordinate_override_column][:]
        )
    # Drive channels are those with a feedback device assigned
    drives = ds["channels"]["feedback_device"][:] != ""

    # Cull down to just the channels active in this environment
    environment_index = np.where(ds["environment_names"][:] == environment)[0][0]
    environment_channels = ds["environment_active_channels"][:, environment_index].astype(bool)

    drives = drives[environment_channels]
    coordinates = coordinates[environment_channels]

    control_indices = ds[environment]["control_channel_indices"][:]

    if "response_transformation_matrix" in ds[environment].variables:
        # Responses are virtual (transformed) channels: synthesize coordinates
        # and comments rather than pulling them from the channel table.
        num_transformed = ds[environment]["response_transformation_matrix"].shape[0]
        control_coordinates = coordinate_array(np.arange(num_transformed) + 1, 0)
        (
            response_transform_comment1,
            response_transform_comment2,
            response_transform_comment3,
            response_transform_comment4,
            response_transform_comment5,
        ) = _transformed_channel_comments(num_transformed, "Response")
        control_indices = np.arange(num_transformed)
    else:
        control_coordinates = coordinates[control_indices]

    if "reference_transformation_matrix" in ds[environment].variables:
        # Drives are virtual (transformed) channels as well
        num_transformed = ds[environment]["reference_transformation_matrix"].shape[0]
        drive_coordinates = coordinate_array(np.arange(num_transformed) + 1, 0)
        (
            drive_transform_comment1,
            drive_transform_comment2,
            drive_transform_comment3,
            drive_transform_comment4,
            drive_transform_comment5,
        ) = _transformed_channel_comments(num_transformed, "Drive")
        drives = np.ones(num_transformed, dtype=bool)
    else:
        drive_coordinates = coordinates[drives]

    # Load the time data
    timesteps = np.arange(ds[environment].dimensions["signal_samples"].size) / ds.sample_rate

    spec_signal = np.array(ds[environment]["control_signal"][...])
    response_signal = np.array(ds[environment]["control_response"][...])
    drive_signal = np.array(ds[environment]["control_drives"][...])

    response_coordinates = control_coordinates[:, np.newaxis]
    drive_coordinates = drive_coordinates[:, np.newaxis]

    # Build the five standard SDynPy comment fields from channel-table columns
    comment1 = np.char.add(
        np.char.add(np.array(ds["channels"]["channel_type"][:], dtype="<U80"), np.array(" :: ")),
        np.array(ds["channels"]["unit"][:], dtype="<U80"),
    )
    comment2 = np.char.add(
        np.char.add(np.array(ds["channels"]["physical_device"][:], dtype="<U80"), np.array(" :: ")),
        np.array(ds["channels"]["physical_channel"][:], dtype="<U80"),
    )
    comment3 = np.char.add(
        np.char.add(np.array(ds["channels"]["feedback_device"][:], dtype="<U80"), np.array(" :: ")),
        np.array(ds["channels"]["feedback_channel"][:], dtype="<U80"),
    )
    comment4 = np.array(ds["channels"]["comment"][:], dtype="<U80")
    comment5 = np.array(ds["channels"]["make"][:], dtype="<U80")
    # Append model, serial number, and triax DOF to the make string
    for key in ("model", "serial_number", "triax_dof"):
        comment5 = np.char.add(comment5, np.array(" "))
        comment5 = np.char.add(comment5, np.array(ds["channels"][key][:], dtype="<U80"))

    full_comment1 = comment1[environment_channels]
    full_comment2 = comment2[environment_channels]
    full_comment3 = comment3[environment_channels]
    full_comment4 = comment4[environment_channels]
    full_comment5 = comment5[environment_channels]

    # Use the synthesized comments when responses are transformed, otherwise
    # select the control channels from the real channel-table comments.
    if "response_transformation_matrix" in ds[environment].variables:
        comment1 = response_transform_comment1
        comment2 = response_transform_comment2
        comment3 = response_transform_comment3
        comment4 = response_transform_comment4
        comment5 = response_transform_comment5
    else:
        comment1 = full_comment1[control_indices]
        comment2 = full_comment2[control_indices]
        comment3 = full_comment3[control_indices]
        comment4 = full_comment4[control_indices]
        comment5 = full_comment5[control_indices]

    # Save the data to SDynPy objects
    response_signal = data_array(
        FunctionTypes.TIME_RESPONSE,
        timesteps,
        response_signal,
        response_coordinates,
        comment1,
        comment2,
        comment3,
        comment4,
        comment5,
    )
    spec_signal = data_array(
        FunctionTypes.TIME_RESPONSE,
        timesteps,
        spec_signal,
        response_coordinates,
        comment1,
        comment2,
        comment3,
        comment4,
        comment5,
    )

    if "reference_transformation_matrix" in ds[environment].variables:
        comment1 = drive_transform_comment1
        comment2 = drive_transform_comment2
        comment3 = drive_transform_comment3
        comment4 = drive_transform_comment4
        comment5 = drive_transform_comment5
    else:
        comment1 = full_comment1[drives]
        comment2 = full_comment2[drives]
        comment3 = full_comment3[drives]
        comment4 = full_comment4[drives]
        comment5 = full_comment5[drives]

    drive_signal = data_array(
        FunctionTypes.TIME_RESPONSE,
        timesteps,
        drive_signal,
        drive_coordinates,
        comment1,
        comment2,
        comment3,
        comment4,
        comment5,
    )

    return response_signal, spec_signal, drive_signal
def create_synthetic_test(
    spreadsheet_file_name: str,
    system_filename: str,
    system: System,
    excitation_coordinates: CoordinateArray,
    response_coordinates: CoordinateArray,
    rattlesnake_directory: str,
    displacement_derivative=2,
    sample_rate: int = None,
    time_per_read: float = None,
    time_per_write: float = None,
    integration_oversample: int = 10,
    environments: list = None,
    channel_comment_data: list = None,
    channel_serial_number_data: list = None,
    channel_triax_dof_data: list = None,
    channel_engineering_unit_data: list = None,
    channel_warning_level_data: list = None,
    channel_abort_level_data: list = None,
    channel_active_in_environment_data: dict = None,
):
    """Create a Rattlesnake synthetic-test spreadsheet and save the system model.

    Saves the structural dynamics *system* to *system_filename*, then uses the
    Rattlesnake ``components`` API to write a combined-environments profile
    template spreadsheet to *spreadsheet_file_name*.  The channel table and
    hardware sheets are populated with the supplied coordinates, sample rate,
    and optional per-channel data.

    Parameters
    ----------
    spreadsheet_file_name : str
        Path (including filename) for the output Rattlesnake Excel spreadsheet.
    system_filename : str
        Path where the ``System`` object is saved before populating the sheet.
    system : System
        Structural dynamics system model to save.
    excitation_coordinates : CoordinateArray
        Coordinates for excitation (force/drive) channels.
    response_coordinates : CoordinateArray
        Coordinates for response (acceleration/measurement) channels.
    rattlesnake_directory : str
        Directory containing the Rattlesnake ``components`` Python package.
    displacement_derivative : int, optional
        Order of derivative used when mapping from displacement to the measured
        quantity (e.g. 2 for acceleration).  Default is 2.
        NOTE(review): this value is not currently written into the spreadsheet
        anywhere in this function -- confirm intended use.
    sample_rate : int, optional
        Hardware sample rate in Hz written to the spreadsheet.  Default is
        ``None`` (left blank).
    time_per_read : float, optional
        Acquisition frame duration in seconds.  Default is ``None``.
    time_per_write : float, optional
        Output frame duration in seconds.  Default is ``None``.
    integration_oversample : int, optional
        Integration oversample factor written to the Hardware sheet.
        Default is 10.
    environments : list of (str, str), optional
        List of ``(environment_type, environment_name)`` tuples describing the
        environments to include in the profile template.  Default is an empty
        list (no environments).
    channel_comment_data : list, optional
        Per-channel comment strings (column 4).  Default is ``None``.
    channel_serial_number_data : list, optional
        Per-channel serial number values (column 5).  Default is ``None``.
    channel_triax_dof_data : list, optional
        Per-channel triaxial DOF labels (column 6).  Default is ``None``.
    channel_engineering_unit_data : list, optional
        Per-channel engineering unit strings (column 8).  Default is ``None``.
    channel_warning_level_data : list, optional
        Per-channel warning level values (column 22).  Default is ``None``.
    channel_abort_level_data : list, optional
        Per-channel abort level values (column 23).  Default is ``None``.
    channel_active_in_environment_data : dict, optional
        Mapping of environment name to a boolean list indicating which channels
        are active in that environment.  When ``None`` (default) all channels
        are marked active in every environment.
    """
    # Avoid the shared mutable-default pitfall (previously ``environments=[]``)
    if environments is None:
        environments = []
    system.save(system_filename)
    # Load in Rattlesnake to create a template for the test.  The path entry is
    # removed in a ``finally`` so a failed import cannot pollute sys.path.
    sys.path.insert(0, rattlesnake_directory)
    try:
        import components as rs

        environment_data = []
        for environment_type, environment_name in environments:
            # Map the string name onto Rattlesnake's ControlTypes enumeration
            environment_type = rs.environments.ControlTypes[environment_type.upper()]
            environment_data.append((environment_type, environment_name))
        rs.ui_utilities.save_combined_environments_profile_template(
            spreadsheet_file_name, environment_data
        )
    finally:
        sys.path.pop(0)
    # Populate the channel table.  Subscript access replaces the deprecated
    # Workbook.get_sheet_by_name, which was removed in openpyxl 3.x.
    workbook = opxl.load_workbook(spreadsheet_file_name)
    worksheet = workbook["Channel Table"]
    # Channel rows start at row 3 of the template
    index = 3
    for i, channel in enumerate(response_coordinates):
        worksheet.cell(index, 1, i + 1)
        worksheet.cell(index, 2, channel.node)
        worksheet.cell(index, 3, _string_map[channel.direction])
        worksheet.cell(index, 12, "Virtual")
        worksheet.cell(index, 14, "Accel")
        index += 1
    for i, channel in enumerate(excitation_coordinates):
        # Excitation channels are numbered after the response channels
        worksheet.cell(index, 1, len(response_coordinates) + i + 1)
        worksheet.cell(index, 2, channel.node)
        worksheet.cell(index, 3, _string_map[channel.direction])
        worksheet.cell(index, 12, "Virtual")
        worksheet.cell(index, 14, "Force")
        worksheet.cell(index, 20, "Shaker")
        index += 1
    # Go through the various channel table data that could have been optionally
    # provided
    for column, data in [
        (4, channel_comment_data),
        (5, channel_serial_number_data),
        (6, channel_triax_dof_data),
        (8, channel_engineering_unit_data),
        (22, channel_warning_level_data),
        (23, channel_abort_level_data),
    ]:
        if data is None:
            continue
        for row_index, value in enumerate(data):
            worksheet.cell(3 + row_index, column, value)
    # Now fill out the environment table; an "X" marks a channel as active in
    # that environment's column (starting at column 24).
    if channel_active_in_environment_data is not None:
        for environment_index, (environment_type, environment_name) in enumerate(environment_data):
            for row_index, value in enumerate(channel_active_in_environment_data[environment_name]):
                if value:
                    worksheet.cell(3 + row_index, 24 + environment_index, "X")
    else:
        # No per-environment data supplied: mark every channel active everywhere
        for environment_index, (environment_type, environment_name) in enumerate(environment_data):
            for row_index in range(response_coordinates.size + excitation_coordinates.size):
                worksheet.cell(3 + row_index, 24 + environment_index, "X")
    # Fill out the Hardware sheet: hardware type 6 is the synthetic (virtual)
    # hardware backed by the saved System file.
    worksheet = workbook["Hardware"]
    worksheet.cell(1, 2, 6)
    worksheet.cell(2, 2, os.path.abspath(system_filename))
    if sample_rate is not None:
        worksheet.cell(3, 2, sample_rate)
    if time_per_read is not None:
        worksheet.cell(4, 2, time_per_read)
    if time_per_write is not None:
        worksheet.cell(5, 2, time_per_write)
    worksheet.cell(6, 2, 1)
    worksheet.cell(7, 2, integration_oversample)
    workbook.save(spreadsheet_file_name)
def read_sine_control_data(
    control_file,
    read_quantities="control_response_signals_combined",
    excitation_dofs=None,
    control_dofs=None,
):
    """Read sine-control data from a Rattlesnake npz control file.

    Extracts one or more control quantities and returns them as
    ``TimeHistoryArray`` objects. Quantities whose keys are suffixed with
    block indices (e.g. ``control_response_signals_combined_0``) are
    concatenated along the last axis before wrapping.

    Parameters
    ----------
    control_file : str or numpy.lib.npyio.NpzFile
        Path to a Rattlesnake ``.npz`` control file, or an already-loaded
        ``NpzFile`` object.
    read_quantities : str or list of str, optional
        Name or list of names of the quantity/quantities to extract. Valid
        values are:

        *Concatenated* (built from numbered sub-keys):
            ``'control_response_signals_combined'``,
            ``'control_response_amplitudes'``,
            ``'control_response_phases'``,
            ``'control_drive_modifications'``

        *Unconcatenated* (single array):
            ``'control_response_frequencies'``,
            ``'control_response_arguments'``,
            ``'control_target_phases'``,
            ``'control_target_amplitudes'``

        Defaults to ``'control_response_signals_combined'``.
    excitation_dofs : CoordinateArray, optional
        Coordinate labels for excitation channels. When ``None`` (default)
        sequential integer nodes with direction 0 are used.
    control_dofs : CoordinateArray, optional
        Coordinate labels for response/control channels. When ``None``
        (default) sequential integer nodes with direction 0 are used.

    Returns
    -------
    TimeHistoryArray or list of TimeHistoryArray
        If *read_quantities* is a single string the return value is a single
        ``TimeHistoryArray``. If it is a list, a list of
        ``TimeHistoryArray`` objects is returned in the same order.

    Raises
    ------
    ValueError
        If any entry in *read_quantities* is not one of the valid quantity
        names listed above, or if a concatenated quantity has no matching
        numbered sub-keys in the control file.
    """
    concatenated_keys = [
        "control_response_signals_combined",
        "control_response_amplitudes",
        "control_response_phases",
        "control_drive_modifications",
    ]
    unconcatenated_keys = [
        "control_response_frequencies",
        "control_response_arguments",
        "control_target_phases",
        "control_target_amplitudes",
    ]
    # Maps each known quantity to the meaning of its array axes; the last two
    # labels drive the abscissa and coordinate construction below.
    dimension_labels = {
        "control_response_signals_combined": ("response", "timestep"),
        "control_response_amplitudes": ("tone", "response", "timestep"),
        "control_response_phases": ("tone", "response", "timestep"),
        "control_drive_modifications": ("tone", "excitation", "block_num"),
        "achieved_excitation_signals_combined": ("excitation", "timestep"),
        "achieved_excitation_signals": ("tone", "excitation", "timestep"),
        "control_response_frequencies": ("tone", "timestep"),
        "control_response_arguments": ("tone", "timestep"),
        "control_target_amplitudes": ("tone", "response", "timestep"),
        "control_target_phases": ("tone", "response", "timestep"),
    }
    if isinstance(control_file, str):
        control_file = np.load(control_file)
    sample_rate = control_file["sample_rate"]
    return_single = isinstance(read_quantities, str)
    if return_single:
        read_quantities = [read_quantities]
    return_data = []
    for read_quantity in read_quantities:
        # Validate against the two supported key groups up front so every
        # unsupported quantity gets the same, single error message (the
        # original code had a second, duplicated ValueError branch that only
        # triggered for quantities with a dimension label but no key group).
        if read_quantity not in concatenated_keys and read_quantity not in unconcatenated_keys:
            raise ValueError(
                f"{read_quantity} is not a valid quantity to read. read_quantity must be one of {concatenated_keys+unconcatenated_keys}."
            )
        dimension_label = dimension_labels[read_quantity]
        # Extract the data and concatenate if necessary
        if read_quantity in concatenated_keys:
            data_blocks = []
            for key in control_file:
                # Numbered sub-keys look like '<quantity>_<block_index>'
                if read_quantity == "_".join(key.split("_")[:-1]):
                    this_data = control_file[key]
                    # Pad missing trailing dimensions so all blocks stack
                    while this_data.ndim < len(dimension_label):
                        this_data = this_data[..., np.newaxis]
                    data_blocks.append(this_data)
            # Fail clearly instead of letting np.concatenate([]) raise a
            # cryptic "need at least one array" error.
            if not data_blocks:
                raise ValueError(
                    f"No numbered sub-keys matching {read_quantity} were found in the control file."
                )
            data = np.concatenate(data_blocks, axis=-1)
        else:
            data = control_file[read_quantity]
        # Set up the abscissa
        if dimension_label[-1] == "timestep":
            abscissa = np.arange(data.shape[-1]) / sample_rate
        elif dimension_label[-1] == "block_num":
            abscissa = np.arange(data.shape[-1])
        else:
            raise ValueError(f"{dimension_label[-1]} is an invalid entry. How did you get here?")
        # Set up degrees of freedom
        if dimension_label[-2] == "response":
            if control_dofs is None:
                dofs = coordinate_array(np.arange(data.shape[-2]) + 1, 0)
            else:
                dofs = control_dofs
        elif dimension_label[-2] == "excitation":
            if excitation_dofs is None:
                dofs = coordinate_array(np.arange(data.shape[-2]) + 1, 0)
            else:
                dofs = excitation_dofs
        elif dimension_label[-2] == "tone":
            dofs = coordinate_array(np.arange(data.shape[-2]) + 1, 0)
        else:
            raise ValueError(f"{dimension_label[-2]} is an invalid entry. How did you get here?")
        # Tone-resolved quantities are labelled with the tone names stored in
        # the file; the reshape broadcasts names across non-tone dimensions.
        if any(dimension == "tone" for dimension in dimension_label):
            comment1 = control_file["names"].reshape(
                *[-1 if dimension == "tone" else 1 for dimension in dimension_label][:-1]
            )
        else:
            comment1 = ""
        # Construct the TimeHistoryArray
        return_data.append(data_array(FunctionTypes.TIME_RESPONSE, abscissa, data, dofs, comment1))
    return return_data[0] if return_single else return_data
class RattlesnakeRandomEnvironmentData:
    """Data for a Rattlesnake Random vibration environment.

    Parses variables, dimensions, and attributes from the environment group of
    a Rattlesnake nc4 file and provides convenience properties that return
    sdynpy array objects.

    Parameters
    ----------
    channel_coordinates : CoordinateArray
        Coordinate array for all physical channels in the parent
        ``RattlesnakeData`` object. Used to resolve control-channel
        coordinates without storing a back-reference to the parent.
    sysid_averages : int
        Number of averages used during system identification.
    samples_per_frame : int
        Number of time samples per data frame.
    cpsd_overlap : int
        Overlap count used when computing the CPSD.
    cpsd_window : str
        Window function name applied before CPSD computation. Stored
        lower-cased.
    specification_frequency_lines : np.ndarray
        Frequency axis (Hz) for the specification CPSD matrices.
    specification_cpsd_matrix : np.ndarray
        Target CPSD specification matrix with shape
        ``(n_freq, n_control, n_control)``.
    specification_warning_matrix : np.ndarray
        Warning-band CPSD matrix with shape ``(2, n_freq, n_control, n_control)``
        where index 0 is the lower bound and index 1 is the upper bound.
    specification_abort_matrix : np.ndarray
        Abort-band CPSD matrix with shape ``(2, n_freq, n_control, n_control)``
        where index 0 is the lower bound and index 1 is the upper bound.
    control_channel_indices : np.ndarray
        Integer indices into the parent channel table identifying the control
        channels for this environment.
    fft_lines : int
        Number of FFT frequency lines used in control.
    **kwargs
        Any additional nc4 attributes, variables, or dimensions not covered
        by the explicit parameters above are stored directly as instance
        attributes via ``setattr``.
    """

    def __init__(
        self,
        channel_coordinates: CoordinateArray,
        sysid_averages: int,
        samples_per_frame: int,
        cpsd_overlap: int,
        cpsd_window: str,
        specification_frequency_lines: np.ndarray,
        specification_cpsd_matrix: np.ndarray,
        specification_warning_matrix: np.ndarray,
        specification_abort_matrix: np.ndarray,
        control_channel_indices: np.ndarray,
        fft_lines: int,
        sysid_frame_size: int = None,
        sysid_averaging_type: str = None,
        sysid_noise_averages: int = None,
        sysid_exponential_averaging_coefficient: float = None,
        sysid_estimator: str = None,
        sysid_level: float = None,
        sysid_level_ramp_time: float = None,
        sysid_signal_type: str = None,
        sysid_window: str = None,
        sysid_overlap: float = None,
        sysid_burst_on: float = None,
        sysid_pretrigger: float = None,
        sysid_burst_ramp_fraction: float = None,
        sysid_low_frequency_cutoff: float = None,
        sysid_high_frequency_cutoff: float = None,
        test_level_ramp_time: float = None,
        update_tf_during_control: int = None,
        cola_window: str = None,
        cola_overlap: float = None,
        cola_window_exponent: float = None,
        frames_in_cpsd: int = None,
        control_python_script: os.PathLike = None,
        control_python_function: str = None,
        control_python_function_type: int = None,
        control_python_function_parameters: str = None,
        allow_automatic_aborts: int = None,
        specification_channels: int = None,
        control_channels: int = None,
        **kwargs,
    ):
        """Store all environment parameters as instance attributes.

        Every named parameter is assigned directly to ``self``. Any
        additional keyword arguments (extra nc4 attributes, variables, or
        dimensions) are stored via ``setattr``. Window names are normalized
        to lower case; ``sysid_window`` may legitimately be absent and is
        kept as ``None`` in that case.
        """
        self.channel_coordinates = channel_coordinates
        self.sysid_averages = sysid_averages
        self.samples_per_frame = samples_per_frame
        self.cpsd_overlap = cpsd_overlap
        self.cpsd_window = cpsd_window.lower()
        self.specification_frequency_lines = specification_frequency_lines
        self.specification_cpsd_matrix = specification_cpsd_matrix
        self.specification_warning_matrix = specification_warning_matrix
        self.specification_abort_matrix = specification_abort_matrix
        self.control_channel_indices = control_channel_indices
        self.fft_lines = fft_lines
        self.sysid_frame_size = sysid_frame_size
        self.sysid_averaging_type = sysid_averaging_type
        self.sysid_noise_averages = sysid_noise_averages
        self.sysid_exponential_averaging_coefficient = sysid_exponential_averaging_coefficient
        self.sysid_estimator = sysid_estimator
        self.sysid_level = sysid_level
        self.sysid_level_ramp_time = sysid_level_ramp_time
        self.sysid_signal_type = sysid_signal_type
        # sysid_window defaults to None; calling .lower() unconditionally
        # raised AttributeError whenever it was not provided.
        self.sysid_window = None if sysid_window is None else sysid_window.lower()
        self.sysid_overlap = sysid_overlap
        self.sysid_burst_on = sysid_burst_on
        self.sysid_pretrigger = sysid_pretrigger
        self.sysid_burst_ramp_fraction = sysid_burst_ramp_fraction
        self.sysid_low_frequency_cutoff = sysid_low_frequency_cutoff
        self.sysid_high_frequency_cutoff = sysid_high_frequency_cutoff
        self.test_level_ramp_time = test_level_ramp_time
        self.update_tf_during_control = update_tf_during_control
        self.cola_window = cola_window
        self.cola_overlap = cola_overlap
        self.cola_window_exponent = cola_window_exponent
        self.frames_in_cpsd = frames_in_cpsd
        self.control_python_script = control_python_script
        self.control_python_function = control_python_function
        self.control_python_function_type = control_python_function_type
        self.control_python_function_parameters = control_python_function_parameters
        self.allow_automatic_aborts = allow_automatic_aborts
        self.specification_channels = specification_channels
        self.control_channels = control_channels
        for key, value in kwargs.items():
            setattr(self, key, value)

    @classmethod
    def load_from_env_group(
        cls, parent: "RattlesnakeData", env_group: nc4.Group
    ) -> "RattlesnakeRandomEnvironmentData":
        """Construct an instance by reading all data from an nc4 environment group.

        Loads nc4 attributes, variables (including complex split-variable pairs
        stored as ``<name>_real`` / ``<name>_imag``), and dimension sizes from
        *env_group*, then calls the constructor with
        ``channel_coordinates=parent.coordinate``.

        Parameters
        ----------
        parent : RattlesnakeData
            The top-level data object whose ``coordinate`` property provides
            the full physical-channel coordinate array.
        env_group : netCDF4.Group
            The nc4 group for this environment (e.g. ``data['Random']``).

        Returns
        -------
        RattlesnakeRandomEnvironmentData
            Fully populated environment data object.
        """
        kwargs = {}
        # Load environment attributes
        for attr in env_group.ncattrs():
            kwargs[attr] = getattr(env_group, attr)

        # Load environment variables (handle complex split-variable convention)
        loaded_vars = set()
        for var in list(env_group.variables.keys()):
            if var.endswith(("_real", "_imag")):
                # Reassemble the complex array the first time either half is
                # seen; the matching half is skipped on a later iteration.
                base = var.removesuffix("_real").removesuffix("_imag")
                if base in loaded_vars:
                    continue
                loaded_vars.add(base)
                # Direct arithmetic instead of np.vectorize(complex): the
                # vectorized form loops in Python and is far slower on large
                # CPSD matrices.
                kwargs[base] = np.asarray(env_group[base + "_real"][:]) + 1j * np.asarray(
                    env_group[base + "_imag"][:]
                )
            else:
                loaded_vars.add(var)
                kwargs[var] = np.array(env_group[var])

        # Load environment dimensions
        for dim in env_group.dimensions:
            kwargs[dim] = env_group.dimensions[dim].size

        # Create the object
        return cls(channel_coordinates=parent.coordinate, **kwargs)

    @property
    def control_coordinate(self) -> CoordinateArray:
        """Coordinate array for the control channels of this environment.

        When a ``response_transformation_matrix`` is present the coordinates
        are synthetic sequential nodes (1-based, direction 0) whose count
        equals the number of transformed virtual channels. Otherwise
        ``channel_coordinates`` is indexed by ``control_channel_indices`` to
        return the physical control-channel coordinates.

        Returns
        -------
        CoordinateArray
            Coordinates for each control channel (or virtual transformed
            channel) in this environment.
        """
        if hasattr(self, "response_transformation_matrix"):
            control_coordinates = coordinate_array(
                node=np.arange(self.response_transformation_matrix.shape[0]) + 1, direction=0
            )
        else:
            control_coordinates = self.channel_coordinates[self.control_channel_indices]
        return control_coordinates

    @property
    def specification_cpsd(self) -> PowerSpectralDensityArray:
        """Target CPSD specification for this random environment.

        Assembles a square ``PowerSpectralDensityArray`` from the
        ``specification_cpsd_matrix`` and ``specification_frequency_lines``
        attributes loaded from the nc4 file. Returns ``None`` when any of
        those attributes are absent.

        Returns
        -------
        PowerSpectralDensityArray or None
            Square cross-power spectral density array with shape
            ``(n_control, n_control)`` at each frequency line, or ``None``
            if the required attributes are not available.
        """
        if (
            hasattr(self, "specification_cpsd_matrix")
            and hasattr(self, "control_coordinate")
            and hasattr(self, "specification_frequency_lines")
        ):
            return power_spectral_density_array(
                abscissa=self.specification_frequency_lines,
                ordinate=np.moveaxis(self.specification_cpsd_matrix, 0, -1),
                coordinate=outer_product(self.control_coordinate, self.control_coordinate),
            )

    @property
    def specification_warning_psd(self) -> PowerSpectralDensityArray:
        """Upper and lower warning-band PSDs for this random environment.

        Assembles two ``PowerSpectralDensityArray`` objects from the lower
        (index 0) and upper (index -1) slices of ``specification_warning_matrix``
        along the first axis and concatenates them into a two-element array.
        Returns ``None`` when any required attribute is absent.

        Returns
        -------
        PowerSpectralDensityArray or None
            Array with shape ``(2, n_control)`` where index 0 is the lower
            warning bound and index 1 is the upper warning bound, or ``None``
            if ``specification_warning_matrix``, ``control_coordinate``, or
            ``specification_frequency_lines`` are not available.
        """
        # Guard on the warning matrix itself (the original code checked the
        # abort matrix here, and vice versa in specification_abort_psd).
        if (
            hasattr(self, "specification_warning_matrix")
            and hasattr(self, "control_coordinate")
            and hasattr(self, "specification_frequency_lines")
        ):
            output_low = power_spectral_density_array(
                abscissa=self.specification_frequency_lines,
                ordinate=np.moveaxis(self.specification_warning_matrix[0, ...], 0, -1),
                coordinate=np.tile(self.control_coordinate, (2, 1)).T,
            )
            output_high = power_spectral_density_array(
                abscissa=self.specification_frequency_lines,
                ordinate=np.moveaxis(self.specification_warning_matrix[-1, ...], 0, -1),
                coordinate=np.tile(self.control_coordinate, (2, 1)).T,
            )
            return np.concatenate((output_low[np.newaxis, :], output_high[np.newaxis, :]))

    @property
    def specification_abort_psd(self) -> PowerSpectralDensityArray:
        """Upper and lower abort-band PSDs for this random environment.

        Assembles two ``PowerSpectralDensityArray`` objects from the lower
        (index 0) and upper (index -1) slices of ``specification_abort_matrix``
        along the first axis and concatenates them into a two-element array.
        Returns ``None`` when any required attribute is absent.

        Returns
        -------
        PowerSpectralDensityArray or None
            Array with shape ``(2, n_control)`` where index 0 is the lower
            abort bound and index 1 is the upper abort bound, or ``None`` if
            ``specification_abort_matrix``, ``control_coordinate``, or
            ``specification_frequency_lines`` are not available.
        """
        # Guard on the abort matrix itself (see specification_warning_psd).
        if (
            hasattr(self, "specification_abort_matrix")
            and hasattr(self, "control_coordinate")
            and hasattr(self, "specification_frequency_lines")
        ):
            output_low = power_spectral_density_array(
                abscissa=self.specification_frequency_lines,
                ordinate=np.moveaxis(self.specification_abort_matrix[0, ...], 0, -1),
                coordinate=np.tile(self.control_coordinate, (2, 1)).T,
            )
            output_high = power_spectral_density_array(
                abscissa=self.specification_frequency_lines,
                ordinate=np.moveaxis(self.specification_abort_matrix[-1, ...], 0, -1),
                coordinate=np.tile(self.control_coordinate, (2, 1)).T,
            )
            return np.concatenate((output_low[np.newaxis, :], output_high[np.newaxis, :]))
class RattlesnakeData:
    """Top-level container for all data read from a Rattlesnake nc4 file.

    Provides the global channel table, per-environment data objects, and
    convenience methods for assembling sdynpy array objects directly from the
    file data.

    Instances are normally created via :meth:`read_rattlesnake_nc4` rather
    than by calling the constructor directly.

    Parameters
    ----------
    sample_rate : int
        Acquisition sample rate in Hz.
    time_data : np.ndarray
        Raw time-domain data array loaded from the nc4 file.
    channel_table : pd.DataFrame
        Channel metadata table loaded from the ``channels`` group of the
        nc4 file.
    file_version : str, optional
        Version string stored in the nc4 file's global attributes.
    time_per_write : float, optional
        Duration of each write block in seconds.
    time_per_read : float, optional
        Duration of each read block in seconds.
    hardware : int, optional
        Hardware identifier stored in the nc4 file.
    hardware_file : str, optional
        Path to the hardware configuration file recorded in the nc4 file.
    output_oversample : int, optional
        Output oversample factor.
    task_trigger : int, optional
        Trigger channel index used to start/stop acquisition.
    task_trigger_output_channel : str, optional
        Name of the output channel used for task triggering.
    environment_names : list or np.ndarray, optional
        Ordered list of environment group names present in the nc4 file.
        Default is ``[]``.
    environment_active_channels : list or np.ndarray, optional
        Per-environment active channel masks. Default is ``[]``.
    environments : dict, optional
        Mapping from environment name to the corresponding
        :class:`RattlesnakeRandomEnvironmentData` instance. Populated by
        :meth:`read_rattlesnake_nc4`. Default is ``{}``.
    **kwargs
        Any additional global nc4 attributes or variables not covered by
        the explicit parameters above are stored as instance attributes via
        ``__setattr__``.
    """

    # All known environment data classes, in order of preference.
    # read_rattlesnake_nc4 tries each one against every environment group and
    # keeps the first loader that succeeds; extend this tuple when new
    # environment types gain loader classes.
    _ENV_CLASSES = (RattlesnakeRandomEnvironmentData,)
2259 def __init__(
2260 self,
2261 sample_rate: int,
2262 time_data: np.ndarray,
2263 channel_table: pd.DataFrame,
2264 file_version: str = None,
2265 time_per_write: float = None,
2266 time_per_read: float = None,
2267 hardware: int = None,
2268 hardware_file: str = None,
2269 output_oversample: int = None,
2270 task_trigger: int = None,
2271 task_trigger_output_channel: str = None,
2272 environment_names: list | np.ndarray = [],
2273 environment_active_channels: list | np.ndarray = [],
2274 environments: dict[str, RattlesnakeRandomEnvironmentData] = {},
2275 **kwargs,
2276 ):
2277 """Store all data parameters as instance attributes.
2279 Every named parameter is assigned directly to ``self``. Any
2280 additional keyword arguments (extra global nc4 attributes or variables)
2281 are stored via ``__setattr__``. See the class docstring for parameter
2282 descriptions.
2283 """
2284 self.sample_rate = sample_rate
2285 self.time_data = time_data
2286 self.channel_table = channel_table
2287 self.file_version = file_version
2288 self.time_per_write = time_per_write
2289 self.time_per_read = time_per_read
2290 self.hardware = hardware
2291 self.hardware_file = hardware_file
2292 self.output_oversample = output_oversample
2293 self.task_trigger = task_trigger
2294 self.task_trigger_output_channel = task_trigger_output_channel
2295 self.environment_names = environment_names
2296 self.environment_active_channels = environment_active_channels
2297 self.environments = environments
2298 for key, value in kwargs.items():
2299 self.__setattr__(key, value)
    @classmethod
    def read_rattlesnake_nc4(cls, filename: os.PathLike | nc4.Dataset):
        """Read a Rattlesnake nc4 file and return a populated data object.

        Loads global nc4 attributes, global variables (including all
        ``time_data*`` streams), the channel table, and each environment group
        into a new ``RattlesnakeData`` instance. Each environment group is
        tried against every class in ``_ENV_CLASSES`` (currently only
        :class:`RattlesnakeRandomEnvironmentData`); the first class that loads
        without error wins.

        Parameters
        ----------
        filename : os.PathLike or netCDF4.Dataset
            Path to a Rattlesnake nc4 file, or an already-open
            ``netCDF4.Dataset``.

        Returns
        -------
        RattlesnakeData
            Fully populated data object with ``channel_table`` and
            ``environments`` attributes set.

        Warns
        -----
        UserWarning
            If ``environments`` ends up empty -- either because none of the
            known environment classes could load any group, or because the
            file lists no environments at all.
        """
        # Accept either an already-open dataset or a path to one on disk
        if isinstance(filename, nc4.Dataset):
            data = filename
        else:
            data = nc4.Dataset(filename)

        kwargs = {}
        # get global attributes
        for attr in data.ncattrs():
            kwargs[attr] = data.__getattr__(attr)

        # get global variables (this includes every time_data* stream)
        for var in list(data.variables.keys()):
            kwargs[var] = np.array(data.variables[var])

        # get channel table: one DataFrame column per variable in the
        # 'channels' group
        array = {
            name: np.array(variable[:])
            for name, variable in data.groups["channels"].variables.items()
        }
        kwargs["channel_table"] = pd.DataFrame(array)

        # create the object
        obj = cls(**kwargs)

        # get environment properties — detect which class to use for each
        # environment group and assign the loaded environments to the object
        for env_name in obj.environment_names:
            env_group = data[env_name]
            for env_cls in cls._ENV_CLASSES:
                try:
                    obj.environments[env_name] = env_cls.load_from_env_group(
                        parent=obj, env_group=env_group
                    )
                    break  # first class that loads cleanly wins
                except Exception:
                    # NOTE(review): loader failures are deliberately swallowed
                    # so the next candidate class can try this group; a group
                    # no class can parse is only surfaced by the blanket
                    # warning below.
                    continue

        if not obj.environments:
            warnings.warn("None of the known environment classes could load any environment group from this file.")
        return obj
2371 @property
2372 def coordinate(self) -> CoordinateArray:
2373 """Coordinate array for all physical channels.
2375 Builds a ``CoordinateArray`` from the ``node_number`` and
2376 ``node_direction`` columns of the channel table.
2378 Returns
2379 -------
2380 CoordinateArray
2381 Coordinate array for all physical channels.
2382 """
2383 nodestrings = [
2384 "".join(char for char in node if char in "0123456789")
2385 for node in self.channel_table["node_number"]
2386 ]
2387 nodes = [i if node == "" else int(node) for i, node in enumerate(nodestrings)]
2388 directions = np.array(self.channel_table["node_direction"][:], dtype="<U3")
2389 return coordinate_array(nodes, directions)
2391 def get_coordinate(self, env_name: str = None) -> CoordinateArray:
2392 """Coordinate array for all channels, optionally including virtual channels.
2394 Parameters
2395 ----------
2396 env_name : str, optional
2397 Name of the environment to query for a response transformation
2398 matrix. When ``None`` only the physical channel coordinates are
2399 returned.
2401 Returns
2402 -------
2403 CoordinateArray
2404 Physical channel coordinates, with virtual transformed-channel
2405 coordinates appended when *env_name* is supplied and a
2406 ``response_transformation_matrix`` is present.
2407 """
2408 coordinates = self.coordinate
2409 if env_name is not None and hasattr(
2410 self.environments[env_name], "response_transformation_matrix"
2411 ):
2412 virtual_coordinates = coordinate_array(
2413 node=np.arange(self.environments[env_name].response_transformation_matrix.shape[0])
2414 + 1,
2415 direction=0,
2416 )
2417 coordinates = np.concatenate([coordinates, virtual_coordinates])
2418 return coordinates
2420 @property
2421 def excitation_coordinate(self) -> CoordinateArray:
2422 """Coordinate array for excitation (drive) channels.
2424 Selects channels whose ``feedback_channel`` entry in the channel table
2425 is non-empty, which identifies them as drive/excitation channels.
2427 Returns
2428 -------
2429 CoordinateArray
2430 Subset of the full channel coordinate array containing only
2431 channels that have a non-empty ``feedback_channel`` value.
2432 """
2433 excitation_coordinate = self.coordinate[
2434 self.channel_table.feedback_channel.astype(str).str.strip().astype(bool)
2435 ]
2436 return excitation_coordinate
2438 @property
2439 def response_coordinate(self) -> CoordinateArray:
2440 """Coordinate array for response (non-excitation) channels.
2442 Returns all channel coordinates that are not present in
2443 :attr:`excitation_coordinate`.
2445 Returns
2446 -------
2447 CoordinateArray
2448 Subset of the full channel coordinate array containing only
2449 channels that do not have a non-empty ``feedback_channel`` value.
2450 """
2451 return np.setdiff1d(self.coordinate, self.excitation_coordinate)
2453 def units(self, env_name: str = None) -> np.ndarray:
2454 """Engineering-unit label for each channel, optionally including virtual channels.
2456 Reads the ``unit`` column of the channel table. When *env_name* is
2457 provided and the named environment contains a
2458 ``response_transformation_matrix``, the array is zero-padded (empty
2459 string ``''``) for each virtual transformed channel.
2461 Parameters
2462 ----------
2463 env_name : str, optional
2464 Name of the environment to query for a response transformation
2465 matrix. When ``None`` (default) only the physical channel units
2466 are returned.
2468 Returns
2469 -------
2470 np.ndarray of str
2471 Array of unit strings, one per channel (physical channels first,
2472 followed by empty-string placeholders for virtual transformed
2473 channels when *env_name* is supplied and a transformation matrix
2474 is present).
2475 """
2476 units = self.channel_table["unit"].astype(str).to_numpy()
2477 # Append Units for Virtual Channels of 'Transformed' responses
2478 if env_name is not None and hasattr(
2479 self.environments[env_name], "response_transformation_matrix"
2480 ):
2481 units = np.pad(
2482 units,
2483 (0, self.environments[env_name].response_transformation_matrix.shape[0]),
2484 constant_values="",
2485 ).astype(str)
2486 return units
    def get_time_data(
        self, index: int | None = None, env_name: str = None
    ) -> TimeHistoryArray | list[TimeHistoryArray]:
        """Build ``TimeHistoryArray`` objects from the source nc4 file data.

        Iterates over all ``time_data`` variables found on this data object
        (populated by :meth:`read_rattlesnake_nc4`), converts each to a
        ``TimeHistoryArray``, and optionally applies a response transformation
        matrix to produce virtual channel time histories which are appended to
        the array.

        Parameters
        ----------
        index : int or None, optional
            Zero-based index selecting a single time-data stream. When
            ``None`` (default) all streams are returned as a list.
        env_name : str, optional
            Name of the environment to query for a ``response_transformation_matrix``.
            When provided and the environment has such a matrix, the
            transformed virtual-channel responses are computed and appended
            to each ``TimeHistoryArray``. When ``None`` (default) no
            transformation is applied.

        Returns
        -------
        TimeHistoryArray or list of TimeHistoryArray or None
            A single ``TimeHistoryArray`` when *index* is specified, or a list
            of ``TimeHistoryArray`` objects (one per time-data stream) when
            *index* is ``None``. Returns ``None`` if ``time_data`` or
            ``sample_rate`` attributes are not present on this object.
        """
        # Deliberately returns None (falls off the end) instead of raising
        # when the required attributes are missing -- see the docstring.
        if hasattr(self, "time_data") and hasattr(self, "sample_rate"):
            # Every instance attribute whose name starts with 'time_data'
            # (time_data, time_data_1, ...) is a separate recorded stream.
            names = [name for name in vars(self).keys() if name.startswith("time_data")]
            if index is not None:
                names = [names[index]]
            output = []
            for name in names:
                time_data_array = getattr(self, name)
                # Uniformly sampled time axis starting at t = 0
                abscissa = np.arange(0, time_data_array.shape[-1]) / self.sample_rate
                time_data = time_history_array(
                    abscissa=abscissa,
                    ordinate=time_data_array,
                    coordinate=self.coordinate[:, np.newaxis],
                )

                if env_name is not None:
                    # If a response transformation is used, compute the
                    # transformed responses and append them to the end of
                    # time_data, labelled with sequential node numbers
                    # starting at 1 and direction 0.
                    if hasattr(self.environments[env_name], "response_transformation_matrix"):
                        column_coordinates = self.coordinate[
                            self.environments[env_name].control_channel_indices
                        ]
                        row_coordinates = coordinate_array(
                            node=np.arange(
                                self.environments[env_name].response_transformation_matrix.shape[0]
                            )
                            + 1,
                            direction=0,
                        )
                        response_transformation = matrix(
                            matrix=self.environments[env_name].response_transformation_matrix,
                            row_coordinate=row_coordinates,
                            column_coordinate=column_coordinates,
                        )
                        # Only the control channels feed the transformation;
                        # the virtual responses are appended after the
                        # physical channels.
                        time_data_transformed = time_data[
                            self.environments[env_name].control_channel_indices
                        ].apply_transformation(response_transformation)
                        time_data = np.concatenate([time_data, time_data_transformed])
                output.append(time_data)
            return output if index is None else output[0]