Coverage for src / sdynpy / fileio / sdynpy_tshaker.py: 8%
198 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-11 16:22 +0000
1# -*- coding: utf-8 -*-
2"""
3This module handles reading data from output files of T-shaker, which is a
4Labview-based Vibration software in use at Sandia National Laboratories.
6T-shaker writes TDMS files natively, but can also output .mat files as well.
7There are .mat file formats for shaker shock, random vibration, and time
8histories.
9"""
10"""
11Copyright 2022 National Technology & Engineering Solutions of Sandia,
12LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
13Government retains certain rights in this software.
15This program is free software: you can redistribute it and/or modify
16it under the terms of the GNU General Public License as published by
17the Free Software Foundation, either version 3 of the License, or
18(at your option) any later version.
20This program is distributed in the hope that it will be useful,
21but WITHOUT ANY WARRANTY; without even the implied warranty of
22MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23GNU General Public License for more details.
25You should have received a copy of the GNU General Public License
26along with this program. If not, see <https://www.gnu.org/licenses/>.
27"""
29import numpy as np
30try:
31 import nptdms as tdms
32except ImportError:
33 tdms = None
35from ..core.sdynpy_coordinate import (coordinate_array, outer_product,
36 CoordinateArray, _string_map)
37from ..core.sdynpy_data import data_array, FunctionTypes
38from glob import glob
39import os
40from scipy.io import loadmat
def read_tdms(file, coordinate_property="Ch_Location",
              coordinate_string_map=None, invalid_coordinate_value='999999',
              min_time=None, max_time=None, channel_indices=None):
    """Read a T-shaker TDMS file into a time-history data array.

    Parameters
    ----------
    file : str or tdms.TdmsFile
        Path to a TDMS file, or an already-opened ``TdmsFile`` object.
    coordinate_property : str, optional
        Name of the channel property holding the coordinate string.
        Default is ``"Ch_Location"``.
    coordinate_string_map : dict, optional
        Mapping applied to each raw coordinate string before it is parsed
        into a coordinate array.
    invalid_coordinate_value : str, optional
        Coordinate string substituted when a channel's coordinate cannot
        be parsed.  Default is ``'999999'``.
    min_time : float, optional
        Earliest time to keep.  ``None`` keeps data from the start.
    max_time : float, optional
        Latest time to keep.  ``None`` keeps data to the end.
    channel_indices : iterable of int, optional
        If given, only channels at these positions in the file are read.

    Returns
    -------
    Time history data assembled from the selected channels, one entry
    per channel.

    Raises
    ------
    ValueError
        If the optional npTDMS dependency is not installed.
    """
    if tdms is None:
        raise ValueError('Must have npTDMS installed to import T-shaker files')
    # Accept either a path or an already-open TdmsFile object
    if not isinstance(file, tdms.TdmsFile):
        file = tdms.TdmsFile.read(file)
    if coordinate_string_map is None:
        coordinate_string_map = {}
    # File-wide metadata used to construct the comment fields below
    dt = 1 / file['TH Data Info'].properties['Sample Rate']
    run_info = file['Test Run Info'].properties
    run_number = run_info['Run Number']
    run_description = run_info['Run Description']
    test_axis = run_info['Test Axis']
    test_type = run_info['Test Type']
    test_title = file['Test Series Info'].properties['Test Series Title']
    # Convert the requested time window into an index slice
    start = 0 if min_time is None else int(np.ceil(min_time / dt))
    stop = None if max_time is None else int(np.floor(max_time / dt)) + 1
    abscissa_slice = slice(start, stop)
    # Walk the channels, keeping only the requested indices
    collected = []
    for index, channel in enumerate(file['TH Data'].channels()):
        if channel_indices is not None and index not in channel_indices:
            continue
        print('Reading {:}'.format(channel.name))
        props = channel.properties
        raw_coordinate = props[coordinate_property]
        # Remap the raw string first, then attempt to parse it; fall back
        # to the invalid placeholder when parsing fails.
        coordinate = coordinate_string_map.get(raw_coordinate, raw_coordinate)
        try:
            coordinate = coordinate_array(string_array=coordinate)
        except (KeyError, ValueError):
            coordinate = coordinate_array(string_array=invalid_coordinate_value)
        ordinate = channel[abscissa_slice]
        abscissa = np.arange(ordinate.size) * dt + start * dt
        # Comment 1: transducer type and engineering unit
        comment1 = 'Type: {:}; Unit: {:}'.format(props['Trans_Type'],
                                                 props['Ch_EU'])
        # Comment 2: acquisition device, serial number, and channel
        comment2 = 'Device: {:}; S/N: {:}; Ch: {:}'.format(
            props['DSA_Device'], props['DSA_S/N'], props['DSA_Channel'])
        # Comment 3: test series, run, description, axis, and type
        comment3 = '{:} Run {:}: {:} {:} {:}'.format(
            test_title, run_number, run_description, test_axis, test_type)
        # Comment 4: sensor model, serial number, and axis
        comment4 = 'Sensor: {:} {:} {:}'.format(
            props['Trans_Model'], props['Trans_S/N'], props['Trans_Axis'])
        # Comment 5: the raw coordinate string, preserved in case it could
        # not be converted to a coordinate array
        comment5 = '{:}'.format(raw_coordinate)
        collected.append(
            data_array(FunctionTypes.TIME_RESPONSE, abscissa, ordinate,
                       coordinate, comment1, comment2, comment3, comment4,
                       comment5)[np.newaxis])
    return np.concatenate(collected)
def read_mat_time_history(data_directory, coordinate_property="Ch_ID",
                          coordinate_string_map=None, invalid_coordinate_value='999999',
                          min_time=None, max_time=None, file_pattern='Ch*_THData.mat',
                          file_sort_key=lambda x: int(os.path.split(x)[-1].split('_')[0].replace('Ch', ''))):
    """Read per-channel T-shaker time-history ``.mat`` files from a directory.

    Parameters
    ----------
    data_directory : str
        Directory containing the per-channel ``.mat`` files.
    coordinate_property : str, optional
        Key in each ``.mat`` file holding the channel coordinate string.
        Default is ``"Ch_ID"``.
    coordinate_string_map : dict, optional
        Mapping applied to each raw coordinate string before it is parsed.
    invalid_coordinate_value : str, optional
        Coordinate string substituted when a coordinate cannot be parsed.
    min_time : float, optional
        Earliest time to keep.  ``None`` keeps data from the start.
    max_time : float, optional
        Latest time to keep.  ``None`` keeps data to the end.
    file_pattern : str, optional
        Glob pattern selecting the data files.  Default ``'Ch*_THData.mat'``.
    file_sort_key : callable, optional
        Key function used to order the files; the default extracts the
        channel number from a ``Ch<number>_...`` filename.

    Returns
    -------
    Time history data assembled from the files, one entry per file.
    """
    if coordinate_string_map is None:
        coordinate_string_map = {}
    # Collect the matching files and order them by channel number
    mat_files = sorted(glob(os.path.join(data_directory, file_pattern)),
                       key=file_sort_key)
    arrays = []
    for mat_file in mat_files:
        print('Reading File {:}'.format(mat_file))
        contents = loadmat(mat_file)
        dt = contents['dt'].squeeze()
        # Convert the requested time window into an index slice; the
        # sample rate can differ per file, so this is computed each pass.
        start = 0 if min_time is None else int(np.ceil(min_time / dt))
        stop = None if max_time is None else int(np.floor(max_time / dt)) + 1
        # Remap the raw coordinate string, then attempt to parse it,
        # falling back to the invalid placeholder on failure.
        raw_coordinate = str(contents[coordinate_property].squeeze())
        coordinate = coordinate_string_map.get(raw_coordinate, raw_coordinate)
        try:
            coordinate = coordinate_array(string_array=coordinate)
        except (KeyError, ValueError):
            coordinate = coordinate_array(string_array=invalid_coordinate_value)
        ordinate = contents['THData'].squeeze()[start:stop]
        abscissa = np.arange(ordinate.size) * dt + start * dt
        # Comment fields mirror the TDMS reader's layout
        comment1 = 'Type: {:}; Unit: {:}'.format(
            str(contents['Data_Description'].squeeze()),
            str(contents['Ch_Unit'].squeeze()))
        comment2 = 'Channel: {:}'.format(str(contents['Daq_Ch'].squeeze()))
        comment3 = 'Run {:}: {:} {:}'.format(
            str(contents['Test_RunNum'].squeeze()),
            str(contents['Test_Description'].squeeze()),
            str(contents['Test_Axis'].squeeze()))
        comment4 = 'Sensor: {:} {:}'.format(
            str(contents['Trans_Model'].squeeze()),
            str(contents['Trans_SN'].squeeze()))
        # Preserve the raw coordinate string in case parsing failed
        comment5 = '{:}'.format(raw_coordinate)
        arrays.append(
            data_array(FunctionTypes.TIME_RESPONSE, abscissa, ordinate,
                       coordinate, comment1, comment2, comment3, comment4,
                       comment5)[np.newaxis])
    return np.concatenate(arrays)
def read_mat_shock(data_file, coordinate_property="Ch_Info",
                   coordinate_string_map=None, invalid_coordinate_value='999999',
                   min_time=None, max_time=None, read_filtered_time_data=True):
    """Read a T-shaker shaker-shock ``.mat`` file into time-history data.

    Parameters
    ----------
    data_file : str
        Path to the shock ``.mat`` file.
    coordinate_property : str, optional
        Key in the ``.mat`` file holding the per-channel coordinate
        strings.  Default is ``"Ch_Info"``.
    coordinate_string_map : dict, optional
        Mapping applied to each raw coordinate string before it is parsed.
    invalid_coordinate_value : str, optional
        Coordinate string substituted when a channel's coordinate cannot
        be parsed.  Default is ``'999999'``.
    min_time : float, optional
        Earliest time to keep.  ``None`` keeps data from the start.
    max_time : float, optional
        Latest time to keep.  ``None`` keeps data to the end.
    read_filtered_time_data : bool, optional
        If True (default), read the filtered signal (key ``'shk'``);
        otherwise read the unfiltered signal (key ``'unfilshk'``).

    Returns
    -------
    Time history data with one row per channel.
    """
    # Set up the coordinate_string_map if necessary
    if coordinate_string_map is None:
        coordinate_string_map = {}
    data = loadmat(data_file)
    # Get sample rate
    dt = data['dt'].squeeze()
    # Set up abscissa limits: translate the time window into indices
    if min_time is None:
        min_index = 0
    else:
        min_index = int(np.ceil(min_time / dt))
    if max_time is None:
        max_index = None
    else:
        # +1 so a sample landing exactly at max_time is included
        max_index = int(np.floor(max_time / dt))+1
    abscissa_slice = slice(min_index, max_index)
    # Get metadata (scalar fields squeezed to plain strings, per-channel
    # fields expanded into lists)
    run_number = str(data['Test_RunNum'].squeeze())
    run_description = str(data['Test_Description'].squeeze())
    test_axis = str(data['Test_Axis'].squeeze())
    dsa_channel = [str(ch) for ch in data['Daq_Ch'].squeeze()]
    trans_type = [str(ch) for ch in data['Data_Description'].squeeze()]
    ch_eu = [str(ch) for ch in data['Ch_Unit'].squeeze()]
    coordinates_raw = data[coordinate_property]
    coordinates = []
    for coordinate in coordinates_raw:
        coordinate = str(coordinate).strip()
        # Apply the user-supplied remapping before attempting to parse
        if coordinate in coordinate_string_map:
            coordinate = coordinate_string_map[coordinate]
        try:
            coordinate = coordinate_array(string_array=coordinate)
        except (KeyError, ValueError):
            # Unparseable coordinate: substitute the invalid placeholder
            coordinate = coordinate_array(string_array=invalid_coordinate_value)
        coordinates.append(coordinate[np.newaxis])
    coordinates = np.concatenate(coordinates)[:, np.newaxis]
    # Slice the time window on the leading axis, then transpose so the
    # result is channels-by-samples.  NOTE(review): assumes the stored
    # matrix is samples-by-channels — confirm against the file layout.
    time_data = data['shk' if read_filtered_time_data else 'unfilshk'][abscissa_slice].T
    num_elements = time_data.shape[-1]
    abscissa = np.arange(num_elements)*dt + min_index*dt
    # trans_type may be empty when the file has no channel descriptions
    if len(trans_type) == 0:
        comment1 = ['Unit: {:}'.format(ch) for ch in ch_eu]
    else:
        comment1 = ['Type: {:}; Unit: {:}'.format(t, ch) for t, ch in zip(trans_type, ch_eu)]
    comment2 = ['Channel: {:}'.format(ch) for ch in dsa_channel]
    comment3 = 'Run {:}: {:} {:}'.format(run_number, run_description, test_axis)
    # Comment 5 preserves the raw coordinate values in case parsing failed
    comment5 = data[coordinate_property]
    # Now create the data object
    time_data = data_array(FunctionTypes.TIME_RESPONSE, abscissa, time_data,
                           coordinates, comment1, comment2, comment3,
                           comment5=comment5)
    return time_data
def read_mat_random(data_file, coordinate_property="Ch_Info",
                    reference_coordinate_property="FRF_RefID",
                    coordinate_string_map=None, invalid_coordinate_value='999999'):
    """Read a T-shaker random-vibration ``.mat`` file.

    Parameters
    ----------
    data_file : str
        Path to the random-vibration ``.mat`` file.
    coordinate_property : str, optional
        Key in the ``.mat`` file holding the response coordinate strings.
        Default is ``"Ch_Info"``.
    reference_coordinate_property : str, optional
        Key holding the reference coordinate strings used for the FRF and
        coherence functions.  Default is ``"FRF_RefID"``.
    coordinate_string_map : dict, optional
        Mapping applied to each raw coordinate string before it is parsed.
    invalid_coordinate_value : str, optional
        Coordinate string substituted when a coordinate cannot be parsed.

    Returns
    -------
    tuple
        ``(psd_data, coh_data, frf_data)`` — power spectral densities,
        coherences, and frequency response functions from the file.
    """
    if coordinate_string_map is None:
        coordinate_string_map = {}

    def _parse(raw_entries):
        # Turn raw coordinate strings into a stacked coordinate array,
        # substituting the invalid placeholder for unparseable entries.
        parsed = []
        for entry in raw_entries:
            label = str(entry).strip()
            label = coordinate_string_map.get(label, label)
            try:
                coord = coordinate_array(string_array=label)
            except (KeyError, ValueError):
                coord = coordinate_array(string_array=invalid_coordinate_value)
            parsed.append(coord[np.newaxis])
        return np.concatenate(parsed)

    contents = loadmat(data_file)
    abscissa = contents['Freq'].squeeze()
    # Stored matrices are transposed to function-by-frequency layout
    coherence = contents['Coh'].T
    frf = contents['FRF'].T
    psd = contents['PSD'].T
    coordinates = _parse(contents[coordinate_property])[:, np.newaxis]
    reference_coordinates = _parse(contents[reference_coordinate_property])
    # Metadata for the comment fields
    run_number = str(contents['Test_RunNum'].squeeze())
    run_description = str(contents['Test_Description'].squeeze())
    test_axis = str(contents['Test_Axis'].squeeze())
    dsa_channel = [str(ch).strip() for ch in contents['Daq_Ch'].squeeze()]
    trans_type = [str(ch).strip() for ch in contents['Ch_Type'].squeeze()]
    ch_eu = [str(ch).strip() for ch in contents['Ch_Unit'].squeeze()]
    # Per-function comment 1 strings; transducer types may be absent
    if trans_type:
        psd_comment1 = ['Type: {:}; Unit: {:}^2/Hz'.format(t, ch)
                        for t, ch in zip(trans_type, ch_eu)]
        coh_comment1 = ['Type: {:};'.format(t)
                        for t, _ in zip(trans_type, ch_eu)]
        frf_comment1 = ['Type: {:}; Unit: {:}'.format(t, ch)
                        for t, ch in zip(trans_type, ch_eu)]
    else:
        psd_comment1 = ['Unit: {:}^2/Hz'.format(ch) for ch in ch_eu]
        coh_comment1 = ['' for ch in ch_eu]
        frf_comment1 = ['Unit: {:}'.format(ch) for ch in ch_eu]
    comment2 = ['Channel: {:}'.format(ch) for ch in dsa_channel]
    comment3 = 'Run {:}: {:} {:}'.format(run_number, run_description, test_axis)
    # Preserve the raw coordinate values in case parsing failed
    comment5 = contents[coordinate_property]
    # Response/reference pairing shared by the coherence and FRF data
    response_reference = outer_product(coordinates.flatten(),
                                       reference_coordinates.flatten())[:, 0, :]
    psd_data = data_array(FunctionTypes.POWER_SPECTRAL_DENSITY,
                          abscissa, psd,
                          np.concatenate((coordinates, coordinates), axis=-1),
                          psd_comment1, comment2, comment3, comment5=comment5)
    coh_data = data_array(FunctionTypes.COHERENCE,
                          abscissa, coherence, response_reference,
                          coh_comment1, comment2, comment3, comment5=comment5)
    frf_data = data_array(FunctionTypes.FREQUENCY_RESPONSE_FUNCTION,
                          abscissa, frf, response_reference,
                          frf_comment1, comment2, comment3, comment5=comment5)
    return psd_data, coh_data, frf_data