Coverage for src / sdynpy / fileio / sdynpy_tshaker.py: 8%

198 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-11 16:22 +0000

1# -*- coding: utf-8 -*- 

2""" 

3This module handles reading data from output files of T-shaker, which is a 

4Labview-based Vibration software in use at Sandia National Laboratories. 

5 

6T-shaker writes TDMS files natively, but can also output .mat files as well. 

7There are .mat file formats for shaker shock, random vibration, and time 

8histories. 

9""" 

10""" 

11Copyright 2022 National Technology & Engineering Solutions of Sandia, 

12LLC (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. 

13Government retains certain rights in this software. 

14 

15This program is free software: you can redistribute it and/or modify 

16it under the terms of the GNU General Public License as published by 

17the Free Software Foundation, either version 3 of the License, or 

18(at your option) any later version. 

19 

20This program is distributed in the hope that it will be useful, 

21but WITHOUT ANY WARRANTY; without even the implied warranty of 

22MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

23GNU General Public License for more details. 

24 

25You should have received a copy of the GNU General Public License 

26along with this program. If not, see <https://www.gnu.org/licenses/>. 

27""" 

28 

29import numpy as np 

30try: 

31 import nptdms as tdms 

32except ImportError: 

33 tdms = None 

34 

35from ..core.sdynpy_coordinate import (coordinate_array, outer_product, 

36 CoordinateArray, _string_map) 

37from ..core.sdynpy_data import data_array, FunctionTypes 

38from glob import glob 

39import os 

40from scipy.io import loadmat 

41 

42 

43def read_tdms(file, coordinate_property="Ch_Location", 

44 coordinate_string_map=None, invalid_coordinate_value='999999', 

45 min_time=None, max_time=None, channel_indices=None): 

46 if tdms is None: 

47 raise ValueError('Must have npTDMS installed to import T-shaker files') 

48 # Read in the file if we've passed a string. 

49 if not isinstance(file, tdms.TdmsFile): 

50 file = tdms.TdmsFile.read(file) 

51 

52 # Set up the coordinate_string_map if necessary 

53 if coordinate_string_map is None: 

54 coordinate_string_map = {} 

55 

56 # Go through and get the metadata 

57 dt = 1/file['TH Data Info'].properties['Sample Rate'] 

58 run_number = file['Test Run Info'].properties['Run Number'] 

59 run_description = file['Test Run Info'].properties['Run Description'] 

60 test_axis = file['Test Run Info'].properties['Test Axis'] 

61 test_type = file['Test Run Info'].properties['Test Type'] 

62 test_title = file['Test Series Info'].properties['Test Series Title'] 

63 

64 # Set up abscissa limits 

65 if min_time is None: 

66 min_index = 0 

67 else: 

68 min_index = int(np.ceil(min_time / dt)) 

69 if max_time is None: 

70 max_index = None 

71 else: 

72 max_index = int(np.floor(max_time / dt))+1 

73 abscissa_slice = slice(min_index, max_index) 

74 

75 # Now let's go through the channels 

76 data = [] 

77 for i, channel in enumerate(file['TH Data'].channels()): 

78 if channel_indices is not None and i not in channel_indices: 

79 continue 

80 print('Reading {:}'.format(channel.name)) 

81 dsa_device = channel.properties['DSA_Device'] 

82 dsa_sn = channel.properties['DSA_S/N'] 

83 dsa_channel = channel.properties['DSA_Channel'] 

84 trans_type = channel.properties['Trans_Type'] 

85 ch_eu = channel.properties['Ch_EU'] 

86 trans_model = channel.properties['Trans_Model'] 

87 trans_axis = channel.properties['Trans_Axis'] 

88 trans_sn = channel.properties['Trans_S/N'] 

89 coordinate = channel.properties[coordinate_property] 

90 if coordinate in coordinate_string_map: 

91 coordinate = coordinate_string_map[coordinate] 

92 try: 

93 coordinate = coordinate_array(string_array=coordinate) 

94 except (KeyError, ValueError): 

95 coordinate = coordinate_array(string_array=invalid_coordinate_value) 

96 time_data = channel[abscissa_slice] 

97 num_elements = time_data.size 

98 abscissa = np.arange(num_elements)*dt + min_index*dt 

99 # Comment 1 will be Trans_Type :: Ch_EU 

100 # Comment 2 will be DSA_Device DSA_S/N :: DSA_Channel 

101 # Comment 3 will be Test Series Title :: Run Number :: Run Description, Test Axis :: Test Type 

102 # Comment 5 will be the coordinate value as a string, just in case we can't convert it to a coordinate 

103 comment1 = 'Type: {:}; Unit: {:}'.format(trans_type, ch_eu) 

104 comment2 = 'Device: {:}; S/N: {:}; Ch: {:}'.format(dsa_device, dsa_sn, dsa_channel) 

105 comment3 = '{:} Run {:}: {:} {:} {:}'.format(test_title, run_number, run_description, test_axis, test_type) 

106 comment4 = 'Sensor: {:} {:} {:}'.format(trans_model, trans_sn, trans_axis) 

107 comment5 = '{:}'.format(channel.properties[coordinate_property]) 

108 # Now create the data object 

109 data.append(data_array(FunctionTypes.TIME_RESPONSE, abscissa, time_data, coordinate, 

110 comment1, comment2, comment3, comment4, comment5)[np.newaxis]) 

111 data = np.concatenate(data) 

112 return data 

113 

114 

115def read_mat_time_history(data_directory, coordinate_property="Ch_ID", 

116 coordinate_string_map=None, invalid_coordinate_value='999999', 

117 min_time=None, max_time=None, file_pattern='Ch*_THData.mat', 

118 file_sort_key=lambda x: int(os.path.split(x)[-1].split('_')[0].replace('Ch', ''))): 

119 # Set up the coordinate_string_map if necessary 

120 if coordinate_string_map is None: 

121 coordinate_string_map = {} 

122 

123 # Find files in the directory 

124 mat_files = glob(os.path.join(data_directory, file_pattern)) 

125 # Sort the files by number 

126 mat_files = sorted(mat_files, key=file_sort_key) 

127 data_arrays = [] 

128 for i, data_file in enumerate(mat_files): 

129 print('Reading File {:}'.format(data_file)) 

130 data = loadmat(data_file) 

131 # Get sample rate 

132 dt = data['dt'].squeeze() 

133 # Set up abscissa limits 

134 if min_time is None: 

135 min_index = 0 

136 else: 

137 min_index = int(np.ceil(min_time / dt)) 

138 if max_time is None: 

139 max_index = None 

140 else: 

141 max_index = int(np.floor(max_time / dt))+1 

142 abscissa_slice = slice(min_index, max_index) 

143 # Get metadata 

144 run_number = str(data['Test_RunNum'].squeeze()) 

145 run_description = str(data['Test_Description'].squeeze()) 

146 test_axis = str(data['Test_Axis'].squeeze()) 

147 dsa_channel = str(data['Daq_Ch'].squeeze()) 

148 trans_type = str(data['Data_Description'].squeeze()) 

149 ch_eu = str(data['Ch_Unit'].squeeze()) 

150 trans_model = str(data['Trans_Model'].squeeze()) 

151 trans_sn = str(data['Trans_SN'].squeeze()) 

152 coordinate = str(data[coordinate_property].squeeze()) 

153 if coordinate in coordinate_string_map: 

154 coordinate = coordinate_string_map[coordinate] 

155 try: 

156 coordinate = coordinate_array(string_array=coordinate) 

157 except (KeyError, ValueError): 

158 coordinate = coordinate_array(string_array=invalid_coordinate_value) 

159 # Get the data 

160 time_data = data['THData'].squeeze()[abscissa_slice] 

161 num_elements = time_data.size 

162 abscissa = np.arange(num_elements)*dt + min_index*dt 

163 comment1 = 'Type: {:}; Unit: {:}'.format(trans_type, ch_eu) 

164 comment2 = 'Channel: {:}'.format(dsa_channel) 

165 comment3 = 'Run {:}: {:} {:}'.format(run_number, run_description, test_axis) 

166 comment4 = 'Sensor: {:} {:}'.format(trans_model, trans_sn) 

167 comment5 = '{:}'.format(str(data[coordinate_property].squeeze())) 

168 # Now create the data object 

169 data_arrays.append(data_array(FunctionTypes.TIME_RESPONSE, abscissa, time_data, coordinate, 

170 comment1, comment2, comment3, comment4, comment5)[np.newaxis]) 

171 data_arrays = np.concatenate(data_arrays) 

172 return data_arrays 

173 

174 

175def read_mat_shock(data_file, coordinate_property="Ch_Info", 

176 coordinate_string_map=None, invalid_coordinate_value='999999', 

177 min_time=None, max_time=None, read_filtered_time_data=True): 

178 # Set up the coordinate_string_map if necessary 

179 if coordinate_string_map is None: 

180 coordinate_string_map = {} 

181 data = loadmat(data_file) 

182 # Get sample rate 

183 dt = data['dt'].squeeze() 

184 # Set up abscissa limits 

185 if min_time is None: 

186 min_index = 0 

187 else: 

188 min_index = int(np.ceil(min_time / dt)) 

189 if max_time is None: 

190 max_index = None 

191 else: 

192 max_index = int(np.floor(max_time / dt))+1 

193 abscissa_slice = slice(min_index, max_index) 

194 # Get metadata 

195 run_number = str(data['Test_RunNum'].squeeze()) 

196 run_description = str(data['Test_Description'].squeeze()) 

197 test_axis = str(data['Test_Axis'].squeeze()) 

198 dsa_channel = [str(ch) for ch in data['Daq_Ch'].squeeze()] 

199 trans_type = [str(ch) for ch in data['Data_Description'].squeeze()] 

200 ch_eu = [str(ch) for ch in data['Ch_Unit'].squeeze()] 

201 coordinates_raw = data[coordinate_property] 

202 coordinates = [] 

203 for coordinate in coordinates_raw: 

204 coordinate = str(coordinate).strip() 

205 if coordinate in coordinate_string_map: 

206 coordinate = coordinate_string_map[coordinate] 

207 try: 

208 coordinate = coordinate_array(string_array=coordinate) 

209 except (KeyError, ValueError): 

210 coordinate = coordinate_array(string_array=invalid_coordinate_value) 

211 coordinates.append(coordinate[np.newaxis]) 

212 coordinates = np.concatenate(coordinates)[:, np.newaxis] 

213 time_data = data['shk' if read_filtered_time_data else 'unfilshk'][abscissa_slice].T 

214 num_elements = time_data.shape[-1] 

215 abscissa = np.arange(num_elements)*dt + min_index*dt 

216 if len(trans_type) == 0: 

217 comment1 = ['Unit: {:}'.format(ch) for ch in ch_eu] 

218 else: 

219 comment1 = ['Type: {:}; Unit: {:}'.format(t, ch) for t, ch in zip(trans_type, ch_eu)] 

220 comment2 = ['Channel: {:}'.format(ch) for ch in dsa_channel] 

221 comment3 = 'Run {:}: {:} {:}'.format(run_number, run_description, test_axis) 

222 comment5 = data[coordinate_property] 

223 time_data = data_array(FunctionTypes.TIME_RESPONSE, abscissa, time_data, 

224 coordinates, comment1, comment2, comment3, 

225 comment5=comment5) 

226 return time_data 

227 

228 

229def read_mat_random(data_file, coordinate_property="Ch_Info", 

230 reference_coordinate_property="FRF_RefID", 

231 coordinate_string_map=None, invalid_coordinate_value='999999'): 

232 # Set up the coordinate_string_map if necessary 

233 if coordinate_string_map is None: 

234 coordinate_string_map = {} 

235 data = loadmat(data_file) 

236 abscissa = data['Freq'].squeeze() 

237 coherence = data['Coh'].T 

238 frf = data['FRF'].T 

239 psd = data['PSD'].T 

240 coordinates_raw = data[coordinate_property] 

241 coordinates = [] 

242 for coordinate in coordinates_raw: 

243 coordinate = str(coordinate).strip() 

244 if coordinate in coordinate_string_map: 

245 coordinate = coordinate_string_map[coordinate] 

246 try: 

247 coordinate = coordinate_array(string_array=coordinate) 

248 except (KeyError, ValueError): 

249 coordinate = coordinate_array(string_array=invalid_coordinate_value) 

250 coordinates.append(coordinate[np.newaxis]) 

251 coordinates = np.concatenate(coordinates)[:, np.newaxis] 

252 reference_coordinates_raw = data[reference_coordinate_property] 

253 reference_coordinates = [] 

254 for coordinate in reference_coordinates_raw: 

255 coordinate = str(coordinate).strip() 

256 if coordinate in coordinate_string_map: 

257 coordinate = coordinate_string_map[coordinate] 

258 try: 

259 coordinate = coordinate_array(string_array=coordinate) 

260 except (KeyError, ValueError): 

261 coordinate = coordinate_array(string_array=invalid_coordinate_value) 

262 reference_coordinates.append(coordinate[np.newaxis]) 

263 reference_coordinates = np.concatenate(reference_coordinates) 

264 # Get metadata 

265 run_number = str(data['Test_RunNum'].squeeze()) 

266 run_description = str(data['Test_Description'].squeeze()) 

267 test_axis = str(data['Test_Axis'].squeeze()) 

268 dsa_channel = [str(ch).strip() for ch in data['Daq_Ch'].squeeze()] 

269 trans_type = [str(ch).strip() for ch in data['Ch_Type'].squeeze()] 

270 ch_eu = [str(ch).strip() for ch in data['Ch_Unit'].squeeze()] 

271 if len(trans_type) == 0: 

272 psd_comment1 = ['Unit: {:}^2/Hz'.format(ch) for ch in ch_eu] 

273 coh_comment1 = ['' for ch in ch_eu] 

274 frf_comment1 = ['Unit: {:}'.format(ch) for ch in ch_eu] 

275 else: 

276 psd_comment1 = ['Type: {:}; Unit: {:}^2/Hz'.format(t, ch) for t, ch in zip(trans_type, ch_eu)] 

277 coh_comment1 = ['Type: {:};'.format(t) for t, ch in zip(trans_type, ch_eu)] 

278 frf_comment1 = ['Type: {:}; Unit: {:}'.format(t, ch) for t, ch in zip(trans_type, ch_eu)] 

279 comment2 = ['Channel: {:}'.format(ch) for ch in dsa_channel] 

280 comment3 = 'Run {:}: {:} {:}'.format(run_number, run_description, test_axis) 

281 comment5 = data[coordinate_property] 

282 psd_data = data_array(FunctionTypes.POWER_SPECTRAL_DENSITY, 

283 abscissa, psd, np.concatenate((coordinates, coordinates), axis=-1), 

284 psd_comment1, comment2, comment3, comment5=comment5) 

285 coh_data = data_array(FunctionTypes.COHERENCE, 

286 abscissa, coherence, outer_product(coordinates.flatten(), reference_coordinates.flatten())[:, 0, :], 

287 coh_comment1, comment2, comment3, comment5=comment5) 

288 frf_data = data_array(FunctionTypes.FREQUENCY_RESPONSE_FUNCTION, 

289 abscissa, frf, outer_product(coordinates.flatten(), reference_coordinates.flatten())[:, 0, :], 

290 frf_comment1, comment2, comment3, comment5=comment5) 

291 return psd_data, coh_data, frf_data