Coverage for cli/src/xyfigure/xymodel.py: 61%

274 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-18 01:09 +0000

1# https://www.python.org/dev/peps/pep-0008/#imports 

2# standard library imports 

3# import os 

4import sys 

5 

6# related third-party imports 

7import numpy as np 

8from pathlib import Path 

9from scipy import signal 

10 

11# from scipy.integrate import cumtrapz # version 0.14.0, now deprecated 

12from scipy.integrate import cumulative_trapezoid # version 1.14.1 

13 

14 

15# local application/library specific imports 

16# from xyfigure.xybase import XYBase 

17# from xyfigure.code.xybase import XYBase 

18# from xyfigure.xybase import XYBase, absolute_path 

19from xyfigure.xybase import XYBase 

20 

21 

22# Helper functions 

23def cross_correlation(reference, subject, verbose=False): 

24 

25 if verbose: 

26 print("\nThis is xymodel.cross_correlation...") 

27 print(f"reference: {reference}") 

28 print(f"subject: {subject}") 

29 

30 ref_delta_t = reference[1, 0] - reference[0, 0] 

31 ref_t_min = reference[0, 0] 

32 ref_t_max = reference[-1, 0] 

33 

34 dt = subject[1, 0] - subject[0, 0] # sample delta t 

35 t_min = subject[0, 0] 

36 t_max = subject[-1, 0] 

37 

38 # globalized interval and frequency 

39 DT = np.minimum(ref_delta_t, dt) 

40 T_MIN = np.minimum(ref_t_min, t_min) 

41 T_MAX = np.maximum(ref_t_max, t_max) 

42 

43 n_samples = int((T_MAX - T_MIN) / DT) + 1 

44 t_span = np.linspace(T_MIN, T_MAX, n_samples, endpoint=True) 

45 

46 if verbose: 

47 print("\nSynchronization...") 

48 print( 

49 f" Reference [t_min, t_max] by dt (s): [{ref_t_min}, {ref_t_max}] by {ref_delta_t}" 

50 ) 

51 print(f" Subject [t_min, t_max] by dt (s): [{t_min}, {t_max}] by {dt}") 

52 print(f" Globalized [t_min, t_max] by dt (s): [{T_MIN}, {T_MAX}] by {DT}") 

53 print(f" Globalized times: {t_span}") 

54 print(f" Length of globalized times: {len(t_span)}") 

55 

56 ref_y_span = np.interp( 

57 t_span, reference[:, 0], reference[:, 1], left=0.0, right=0.0 

58 ) 

59 

60 y_span = np.interp(t_span, subject[:, 0], subject[:, 1], left=0.0, right=0.0) 

61 

62 cross_corr = np.correlate(ref_y_span, y_span, mode="full") 

63 cross_corr_max = np.max(cross_corr) 

64 

65 cross_corr_unit = np.correlate( 

66 ref_y_span / np.linalg.norm(ref_y_span), 

67 y_span / np.linalg.norm(y_span), 

68 mode="full", 

69 ) 

70 

71 ref_self_corr = np.correlate(ref_y_span, ref_y_span)[0] # self correlated reference 

72 rel_corr_error = 0.0 

73 

74 if ref_self_corr > 0: 

75 rel_corr_error = abs(cross_corr_max - ref_self_corr) / ref_self_corr 

76 

77 offset_index = np.argmax(cross_corr) 

78 

79 # shift time full-left, then incrementally to the right 

80 # t_shift = t_span - t_span[-1] + t_span[offset_index] # nope! 

81 # t_shift = t_span - t_span[-1] + offset_index * DT # bug! should shift to t0 referance signal 

82 t_shift = t_span - t_span[-1] + t_span[0] + offset_index * DT 

83 

84 T_MIN_CORR = np.minimum(ref_t_min, t_shift[0]) 

85 T_MAX_CORR = np.maximum(ref_t_max, t_shift[-1]) 

86 

87 n_samples_corr = int((T_MAX_CORR - T_MIN_CORR) / DT) + 1 # DT unchanged pre-shift 

88 t_span_corr = np.linspace(T_MIN_CORR, T_MAX_CORR, n_samples_corr, endpoint=True) 

89 

90 ref_y_span_corr = np.interp( 

91 t_span_corr, reference[:, 0], reference[:, 1], left=0.0, right=0.0 

92 ) 

93 

94 y_span_corr = np.interp(t_span_corr, t_shift, y_span, left=0.0, right=0.0) 

95 

96 error = y_span_corr - ref_y_span_corr 

97 

98 L2_norm_error_rate = np.linalg.norm(error) / n_samples_corr 

99 

100 if verbose: 

101 print("\nCorrelation...") 

102 print(f" Sliding dot product (cross-correlation): {cross_corr}") 

103 print(f" Length of the sliding dot product: {len(cross_corr)}") 

104 print(f" Max sliding dot product (cross-correlation): {cross_corr_max}") 

105 print( 

106 f" Sliding dot product of normalized signals (cross-correlation): {cross_corr_unit}" 

107 ) 

108 

109 print(f" Correlated time_shift (from full left)={offset_index * DT}") 

110 print(f" Correlated index_shift (from full left)={offset_index}") 

111 

112 print(f" Correlated time step (s): {DT}") 

113 print(f" Correlated t_min (s): {T_MIN_CORR}") 

114 print(f" Correlated t_max (s): {T_MAX_CORR}") 

115 print(f" Correlated times: {t_span_corr}") 

116 print(f" Correlated reference f(t): {ref_y_span_corr}") 

117 print(f" Correlated subject f(t): {y_span_corr}") 

118 print(f" Correlated error f(t): {error}") 

119 

120 print(f" reference_self_correlation: {ref_self_corr}") 

121 print(f" cross_correlation: {cross_corr_max}") 

122 print(f" >> cross_correlation_relative_error={rel_corr_error}") 

123 print(f" >> L2-norm error rate: {L2_norm_error_rate}") 

124 

125 return t_span_corr, y_span_corr, rel_corr_error, L2_norm_error_rate 

126 

127 

128class XYModel(XYBase): 

129 """The data to be plotted in XY format.""" 

130 

131 def __init__(self, guid, **kwargs): 

132 super().__init__(guid, **kwargs) 

133 

134 # TODO: rearchitect into single parent for both XYModel and XYModelAbaqus 

135 # make sure models have an input file that exists 

136 if not self._file_pathlib.is_file(): 

137 print('Error: keyword "file" has a value (e.g., a file name):') 

138 print(self._file) 

139 print("with full path specification:") 

140 print(self._file_pathlib) 

141 raise KeyError("file not found") 

142 

143 self._skip_rows = kwargs.get("skip_rows", 0) 

144 self._skip_rows_footer = kwargs.get("skip_rows_footer", 0) 

145 self._xcolumn = kwargs.get("xcolumn", 0) # default to the 1st column 

146 self._ycolumn = kwargs.get("ycolumn", 1) # default to the 2nd column 

147 

148 # relative to current run location 

149 # rel_path_and_file = os.path.join(self._folder, self._file) 

150 # self._data = np.genfromtxt( 

151 # rel_path_and_file, 

152 # dtype="float", 

153 # delimiter=",", 

154 # skip_header=self._skip_rows, 

155 # skip_footer=self._skip_rows_footer, 

156 # usecols=(self._xcolumn, self._ycolumn), 

157 # ) 

158 self._data = np.genfromtxt( 

159 self._path_file_input, 

160 dtype="float", 

161 delimiter=",", 

162 skip_header=self._skip_rows, 

163 skip_footer=self._skip_rows_footer, 

164 usecols=(self._xcolumn, self._ycolumn), 

165 ) 

166 

167 # default value if plot_kwargs not client-supplied 

168 default = {"linewidth": 2.0, "linestyle": "-"} 

169 self._plot_kwargs = kwargs.get("plot_kwargs", default) 

170 

171 self._xscale = kwargs.get("xscale", 1.0) 

172 self._yscale = kwargs.get("yscale", 1.0) 

173 self._xoffset = kwargs.get("xoffset", 0.0) 

174 self._yoffset = kwargs.get("yoffset", 0.0) 

175 

176 self._signal_process = kwargs.get("signal_process", None) 

177 

178 if self._signal_process: 

179 for process_id in self._signal_process: 

180 process_dict = self._signal_process.get(process_id) 

181 key = next(iter(process_dict)) 

182 value = process_dict[key] 

183 

184 try: 

185 method = getattr(self, key) 

186 method(value) 

187 except AttributeError as error: 

188 print(f"Error: invalid signal process key: {key}") 

189 print(error.__class__.__name__) 

190 

191 if self._verbose: 

192 print(' Signal process "' + key + '" completed.') 

193 

194 @property 

195 def x(self): 

196 """Returns the model's x data.""" 

197 return self._data[:, 0] * self._xscale + self._xoffset 

198 

199 @property 

200 def y(self): 

201 """Returns the model's y data.""" 

202 return self._data[:, 1] * self._yscale + self._yoffset 

203 

204 @property 

205 def plot_kwargs(self): 

206 """Returns kwargs passed to matplotlib.pyplot.plot().""" 

207 return self._plot_kwargs 

208 

209 def serialize(self, folder, filename, header=""): # extend base class 

210 super().serialize(folder, filename) 

211 np.savetxt( 

212 self._path_file_output, 

213 np.transpose([self._data[:, 0], self._data[:, 1]]), 

214 delimiter=",", 

215 header=header, 

216 ) 

217 if self._verbose: 

218 print(f" Serialized model to: {self._path_file_output}") 

219 

220 def butterworth(self, value): 

221 

222 fc = value.get("cutoff", None) # Hz, cutoff frequency 

223 if fc is None: 

224 print('Error: keyword "cutoff" not found.') 

225 sys.exit("Abnormal termination.") 

226 

227 filter_order = value.get("order", None) 

228 if filter_order is None: 

229 print('Error: keyword "order" not found.') 

230 sys.exit("Abnormal termination.") 

231 

232 filter_type = value.get("type", None) 

233 if filter_type is None: 

234 print('Error: keyword "type" not found.') 

235 sys.exit("Abnormal termination.") 

236 

237 if self._verbose: 

238 print('Signal process "butterworth" with:') 

239 print(f" cutoff frequency = {fc} Hz") 

240 print(f" filter order = {filter_order}") 

241 print(" filter type = " + filter_type) 

242 

243 dt = self._data[1, 0] - self._data[0, 0] # sample delta t 

244 

245 if self._verbose: 

246 print(f" sample delta t = {dt} seconds") 

247 

248 fs = 1.0 / dt # Hz 

249 if self._verbose: 

250 print(f" sample frequency = {fs} Hz") 

251 

252 Wn = fc / (fs / 2) # normalized critical frequency 

253 if self._verbose: 

254 print(f" normalized critical frequency = {Wn} Hz/Hz") 

255 

256 b, a = signal.butter(filter_order, Wn, filter_type) 

257 yfiltered = signal.filtfilt(b, a, self._data[:, 1]) 

258 self._data[:, 1] = yfiltered # overwrite 

259 

260 serialize = value.get("serialize", 0) # default is not to serialize 

261 if serialize: 

262 folder = value.get("folder", ".") # default to current folder 

263 file_output = value.get("file", None) 

264 

265 if file_output is None: 

266 print('Error: keyword "file" not found.') 

267 sys.exit("Abnormal termination.") 

268 else: 

269 self.serialize(folder, file_output) 

270 

271 def correlation(self, value): 

272 

273 reference = value.get("reference", None) 

274 if reference is None: 

275 print('Error: keyword "reference" not found.') 

276 sys.exit("Abnormal termination.") 

277 

278 # ref is the reference signal to compare with 

279 ref_default_folder = "." 

280 ref_folder = reference.get("folder", ref_default_folder) 

281 

282 ref_file = reference.get("file", None) 

283 

284 verbosity = value.get("verbose", False) 

285 

286 if ref_file is None: 

287 print('Error: keyword "file" not found.') 

288 sys.exit("Abnormal termination.") 

289 

290 # abs_path = absolute_path(ref_folder) 

291 abs_path = Path(ref_folder).expanduser() 

292 # ref_path_file_input = os.path.join(abs_path, ref_file) 

293 ref_path_file_input = abs_path.joinpath(ref_file) 

294 

295 ref_skip_rows = reference.get("skip_rows", 0) 

296 ref_skip_rows_footer = reference.get("skip_rows_footer", 0) 

297 ref_xcolumn = reference.get("xcolumn", 0) # default to the 1st column 

298 ref_ycolumn = reference.get("ycolumn", 1) # default to the 2nd column 

299 

300 ref_data = np.genfromtxt( 

301 ref_path_file_input, 

302 dtype="float", 

303 delimiter=",", 

304 skip_header=ref_skip_rows, 

305 skip_footer=ref_skip_rows_footer, 

306 usecols=(ref_xcolumn, ref_ycolumn), 

307 ) 

308 

309 tcorrelated, ycorrelated, cc_relative_error, L2_error = cross_correlation( 

310 ref_data, self._data, verbose=verbosity 

311 ) 

312 

313 self._data = np.transpose([tcorrelated, ycorrelated]) # overwrite 

314 

315 serialize = value.get("serialize", 0) # default is not to serialize 

316 if serialize: 

317 folder = value.get("folder", ".") # default to current folder 

318 file_output = value.get("file", None) 

319 

320 if file_output is None: 

321 print('Error: keyword "file" not found.') 

322 sys.exit("Abnormal termination.") 

323 else: 

324 # self.serialize(folder, file_output, header=corr_str) 

325 self.serialize(folder, file_output) 

326 

327 def gradient(self, value): 

328 

329 gradient_order = value.get("order", None) 

330 if gradient_order is None: 

331 print('Error: keyword "order" not found.') 

332 sys.exit("Abnormal termination.") 

333 

334 if self._verbose: 

335 print( 

336 "Signal process:" 

337 " gradient applied " + str(gradient_order) + " time(s)" 

338 ) 

339 # numerical gradient 

340 # https://docs.scipy.org/doc/numpy/reference/generated/numpy.gradient.html 

341 # for k in range(value): 

342 for k in range(gradient_order): 

343 ydot = np.gradient(self._data[:, 1], self._data[:, 0], edge_order=2) 

344 self._data[:, 1] = ydot # overwrite 

345 if self._verbose: 

346 print(f" Derivative {k+1} completed.") 

347 

348 serialize = value.get("serialize", 0) # default is not to serialize 

349 if serialize: 

350 folder = value.get("folder", ".") # default to current folder 

351 file_output = value.get("file", None) 

352 

353 if file_output is None: 

354 print('Error: keyword "file" not found.') 

355 sys.exit("Abnormal termination.") 

356 else: 

357 self.serialize(folder, file_output) 

358 

359 def integration(self, value): 

360 

361 integration_order = value.get("order", None) 

362 if integration_order is None: 

363 print('Error: keyword "order" not found.') 

364 sys.exit("Abnormal termination.") 

365 

366 print( 

367 'Signal process: "integration" applied ' 

368 + str(integration_order) 

369 + " time(s) with" 

370 ) 

371 

372 default_ics = np.zeros(integration_order) 

373 ics = value.get("initial_conditions", default_ics) 

374 

375 print(f"initial condition(s) as {ics}") 

376 

377 if len(ics) != integration_order: 

378 print("Error: length of initial condition(s) array") 

379 print("must be equal to the order of integration.") 

380 print(f"Specified order = {integration_order}") 

381 print(f"Length of intial condition(s) array = {len(ics)}") 

382 sys.exit("Abnormal termination.") 

383 

384 # https://docs.scipy.org/doc/numpy/reference/generated/numpy.trapz.html 

385 # https://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.integrate.cumtrapz.html 

386 # https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.cumulative_trapezoid.html#scipy.integrate.cumulative_trapezoid 

387 for k in range(integration_order): 

388 # inty = np.trapz(self._data[:, 1], self._data[:, 0]) + ics[k] 

389 # inty = cumtrapz(self._data[:, 1], self._data[:, 0], initial=ics[k]) 

390 # inty = ( 

391 # cumtrapz(self._data[:, 1], self._data[:, 0], initial=0) 

392 # + ics[k] 

393 # ) 

394 inty = ( 

395 cumulative_trapezoid(self._data[:, 1], self._data[:, 0], initial=0) 

396 + ics[k] 

397 ) 

398 

399 self._data[:, 1] = inty # overwrite 

400 print(f" Integral {k+1} completed.") 

401 

402 serialize = value.get("serialize", 0) # default is not to serialize 

403 if serialize: 

404 folder = value.get("folder", ".") # default to current folder 

405 file_output = value.get("file", None) 

406 

407 if file_output is None: 

408 print('Error: keyword "file" not found.') 

409 sys.exit("Abnormal termination.") 

410 else: 

411 self.serialize(folder, file_output) 

412 

413 

414class XYModelAbaqus(XYBase): 

415 """The ABAQUS mesh data to be plotted in XY format.""" 

416 

417 def __init__(self, guid, **kwargs): 

418 super().__init__(guid, **kwargs) 

419 

420 # TODO: rearchitect into single parent for both XYModel and XYModelAbaqus 

421 # make sure models have an input file that exists 

422 if not self._file_pathlib.is_file(): 

423 print('Error: keyword "file" has a value (e.g., a file name):') 

424 print(self._file) 

425 print("with full path specification:") 

426 print(self._file_pathlib) 

427 raise KeyError("file not found") 

428 

429 with open(str(self._file_pathlib), "rt") as f: 

430 self._nodes = tuple() 

431 self._elements = tuple() 

432 # nice ref: https://www.pythontutorial.net/python-basics/python-read-text-file/ 

433 try: 

434 # lines = f.readlines() 

435 

436 # for line in f: 

437 line = f.readline() 

438 

439 while line: 

440 if "*NODE" in line or "*Node" in line: 

441 # collect all nodes 

442 line = f.readline() # get the next line 

443 while "*" not in line: 

444 line = line.split(",") 

445 # assume a 3D format even for planar problems, and catch 

446 # exceptions as needed 

447 try: 

448 new_nodes = ( 

449 tuple( 

450 [ 

451 float(eval(line[1])), 

452 float(eval(line[2])), 

453 float(eval(line[3])), 

454 ] 

455 ), 

456 ) 

457 except ( 

458 IndexError 

459 ): # handle 2D input files, append 0.0 as z coordinate 

460 new_nodes = ( 

461 tuple( 

462 [ 

463 float(eval(line[1])), 

464 float(eval(line[2])), 

465 0.0, 

466 ] 

467 ), 

468 ) 

469 

470 self._nodes = self._nodes + new_nodes 

471 # print(self._nodes) 

472 line = f.readline() 

473 # print(line) 

474 elif "*ELEMENT" in line or "*Element" in line: 

475 # collect all elements 

476 line = f.readline() # get the next line 

477 while "*" not in line and len(line) > 0: 

478 line = line.split(",") 

479 new_element = ( 

480 tuple( 

481 [ 

482 int(line[1]), 

483 int(line[2]), 

484 int(line[3]), 

485 int(line[4]), 

486 ] 

487 ), 

488 ) 

489 self._elements = self._elements + new_element 

490 # print(self._elements) 

491 line = f.readline() 

492 else: 

493 line = f.readline() 

494 

495 print(f"Finished reading file: {self._file_pathlib}") 

496 

497 except OSError: 

498 print(f"Cannot read file: {self._file_pathlib}") 

499 

500 # default value if plot_kwargs not client-supplied 

501 # default = {"linewidth": 2.0, "linestyle": "solid", "color": "black", } 

502 _default = { 

503 "alpha": 0.8, 

504 "edgecolor": "navy", 

505 "facecolor": "gray", 

506 "linestyle": "solid", 

507 "linewidth": 1.0, 

508 } 

509 self._plot_kwargs = kwargs.get("plot_kwargs", _default) 

510 self._alpha = self._plot_kwargs.get("alpha", _default["alpha"]) 

511 self._edgecolor = self._plot_kwargs.get("edgecolor", _default["edgecolor"]) 

512 self._facecolor = self._plot_kwargs.get("facecolor", _default["facecolor"]) 

513 self._linestyle = self._plot_kwargs.get("linestyle", _default["linestyle"]) 

514 self._linewidth = self._plot_kwargs.get("linewidth", _default["linewidth"]) 

515 

516 

517""" 

518Copyright 2023 Sandia National Laboratories 

519 

520Notice: This computer software was prepared by National Technology and Engineering Solutions of 

521Sandia, LLC, hereinafter the Contractor, under Contract DE-NA0003525 with the Department of Energy 

522(DOE). All rights in the computer software are reserved by DOE on behalf of the United States 

523Government and the Contractor as provided in the Contract. You are authorized to use this computer 

524software for Governmental purposes but it is not to be released or distributed to the public. 

525NEITHER THE U.S. GOVERNMENT NOR THE CONTRACTOR MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES 

526ANY LIABILITY FOR THE USE OF THIS SOFTWARE. This notice including this sentence must appear on any 

527copies of this computer software. Export of this data may require a license from the United States 

528Government. 

529"""