Coverage for cli/src/xyfigure/xymodel.py: 61%
274 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-18 01:09 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-18 01:09 +0000
1# https://www.python.org/dev/peps/pep-0008/#imports
2# standard library imports
3# import os
4import sys
6# related third-party imports
7import numpy as np
8from pathlib import Path
9from scipy import signal
11# from scipy.integrate import cumtrapz # version 0.14.0, now deprecated
12from scipy.integrate import cumulative_trapezoid # version 1.14.1
15# local application/library specific imports
16# from xyfigure.xybase import XYBase
17# from xyfigure.code.xybase import XYBase
18# from xyfigure.xybase import XYBase, absolute_path
19from xyfigure.xybase import XYBase
22# Helper functions
23def cross_correlation(reference, subject, verbose=False):
25 if verbose:
26 print("\nThis is xymodel.cross_correlation...")
27 print(f"reference: {reference}")
28 print(f"subject: {subject}")
30 ref_delta_t = reference[1, 0] - reference[0, 0]
31 ref_t_min = reference[0, 0]
32 ref_t_max = reference[-1, 0]
34 dt = subject[1, 0] - subject[0, 0] # sample delta t
35 t_min = subject[0, 0]
36 t_max = subject[-1, 0]
38 # globalized interval and frequency
39 DT = np.minimum(ref_delta_t, dt)
40 T_MIN = np.minimum(ref_t_min, t_min)
41 T_MAX = np.maximum(ref_t_max, t_max)
43 n_samples = int((T_MAX - T_MIN) / DT) + 1
44 t_span = np.linspace(T_MIN, T_MAX, n_samples, endpoint=True)
46 if verbose:
47 print("\nSynchronization...")
48 print(
49 f" Reference [t_min, t_max] by dt (s): [{ref_t_min}, {ref_t_max}] by {ref_delta_t}"
50 )
51 print(f" Subject [t_min, t_max] by dt (s): [{t_min}, {t_max}] by {dt}")
52 print(f" Globalized [t_min, t_max] by dt (s): [{T_MIN}, {T_MAX}] by {DT}")
53 print(f" Globalized times: {t_span}")
54 print(f" Length of globalized times: {len(t_span)}")
56 ref_y_span = np.interp(
57 t_span, reference[:, 0], reference[:, 1], left=0.0, right=0.0
58 )
60 y_span = np.interp(t_span, subject[:, 0], subject[:, 1], left=0.0, right=0.0)
62 cross_corr = np.correlate(ref_y_span, y_span, mode="full")
63 cross_corr_max = np.max(cross_corr)
65 cross_corr_unit = np.correlate(
66 ref_y_span / np.linalg.norm(ref_y_span),
67 y_span / np.linalg.norm(y_span),
68 mode="full",
69 )
71 ref_self_corr = np.correlate(ref_y_span, ref_y_span)[0] # self correlated reference
72 rel_corr_error = 0.0
74 if ref_self_corr > 0:
75 rel_corr_error = abs(cross_corr_max - ref_self_corr) / ref_self_corr
77 offset_index = np.argmax(cross_corr)
79 # shift time full-left, then incrementally to the right
80 # t_shift = t_span - t_span[-1] + t_span[offset_index] # nope!
81 # t_shift = t_span - t_span[-1] + offset_index * DT # bug! should shift to t0 referance signal
82 t_shift = t_span - t_span[-1] + t_span[0] + offset_index * DT
84 T_MIN_CORR = np.minimum(ref_t_min, t_shift[0])
85 T_MAX_CORR = np.maximum(ref_t_max, t_shift[-1])
87 n_samples_corr = int((T_MAX_CORR - T_MIN_CORR) / DT) + 1 # DT unchanged pre-shift
88 t_span_corr = np.linspace(T_MIN_CORR, T_MAX_CORR, n_samples_corr, endpoint=True)
90 ref_y_span_corr = np.interp(
91 t_span_corr, reference[:, 0], reference[:, 1], left=0.0, right=0.0
92 )
94 y_span_corr = np.interp(t_span_corr, t_shift, y_span, left=0.0, right=0.0)
96 error = y_span_corr - ref_y_span_corr
98 L2_norm_error_rate = np.linalg.norm(error) / n_samples_corr
100 if verbose:
101 print("\nCorrelation...")
102 print(f" Sliding dot product (cross-correlation): {cross_corr}")
103 print(f" Length of the sliding dot product: {len(cross_corr)}")
104 print(f" Max sliding dot product (cross-correlation): {cross_corr_max}")
105 print(
106 f" Sliding dot product of normalized signals (cross-correlation): {cross_corr_unit}"
107 )
109 print(f" Correlated time_shift (from full left)={offset_index * DT}")
110 print(f" Correlated index_shift (from full left)={offset_index}")
112 print(f" Correlated time step (s): {DT}")
113 print(f" Correlated t_min (s): {T_MIN_CORR}")
114 print(f" Correlated t_max (s): {T_MAX_CORR}")
115 print(f" Correlated times: {t_span_corr}")
116 print(f" Correlated reference f(t): {ref_y_span_corr}")
117 print(f" Correlated subject f(t): {y_span_corr}")
118 print(f" Correlated error f(t): {error}")
120 print(f" reference_self_correlation: {ref_self_corr}")
121 print(f" cross_correlation: {cross_corr_max}")
122 print(f" >> cross_correlation_relative_error={rel_corr_error}")
123 print(f" >> L2-norm error rate: {L2_norm_error_rate}")
125 return t_span_corr, y_span_corr, rel_corr_error, L2_norm_error_rate
128class XYModel(XYBase):
129 """The data to be plotted in XY format."""
131 def __init__(self, guid, **kwargs):
132 super().__init__(guid, **kwargs)
134 # TODO: rearchitect into single parent for both XYModel and XYModelAbaqus
135 # make sure models have an input file that exists
136 if not self._file_pathlib.is_file():
137 print('Error: keyword "file" has a value (e.g., a file name):')
138 print(self._file)
139 print("with full path specification:")
140 print(self._file_pathlib)
141 raise KeyError("file not found")
143 self._skip_rows = kwargs.get("skip_rows", 0)
144 self._skip_rows_footer = kwargs.get("skip_rows_footer", 0)
145 self._xcolumn = kwargs.get("xcolumn", 0) # default to the 1st column
146 self._ycolumn = kwargs.get("ycolumn", 1) # default to the 2nd column
148 # relative to current run location
149 # rel_path_and_file = os.path.join(self._folder, self._file)
150 # self._data = np.genfromtxt(
151 # rel_path_and_file,
152 # dtype="float",
153 # delimiter=",",
154 # skip_header=self._skip_rows,
155 # skip_footer=self._skip_rows_footer,
156 # usecols=(self._xcolumn, self._ycolumn),
157 # )
158 self._data = np.genfromtxt(
159 self._path_file_input,
160 dtype="float",
161 delimiter=",",
162 skip_header=self._skip_rows,
163 skip_footer=self._skip_rows_footer,
164 usecols=(self._xcolumn, self._ycolumn),
165 )
167 # default value if plot_kwargs not client-supplied
168 default = {"linewidth": 2.0, "linestyle": "-"}
169 self._plot_kwargs = kwargs.get("plot_kwargs", default)
171 self._xscale = kwargs.get("xscale", 1.0)
172 self._yscale = kwargs.get("yscale", 1.0)
173 self._xoffset = kwargs.get("xoffset", 0.0)
174 self._yoffset = kwargs.get("yoffset", 0.0)
176 self._signal_process = kwargs.get("signal_process", None)
178 if self._signal_process:
179 for process_id in self._signal_process:
180 process_dict = self._signal_process.get(process_id)
181 key = next(iter(process_dict))
182 value = process_dict[key]
184 try:
185 method = getattr(self, key)
186 method(value)
187 except AttributeError as error:
188 print(f"Error: invalid signal process key: {key}")
189 print(error.__class__.__name__)
191 if self._verbose:
192 print(' Signal process "' + key + '" completed.')
194 @property
195 def x(self):
196 """Returns the model's x data."""
197 return self._data[:, 0] * self._xscale + self._xoffset
199 @property
200 def y(self):
201 """Returns the model's y data."""
202 return self._data[:, 1] * self._yscale + self._yoffset
204 @property
205 def plot_kwargs(self):
206 """Returns kwargs passed to matplotlib.pyplot.plot()."""
207 return self._plot_kwargs
209 def serialize(self, folder, filename, header=""): # extend base class
210 super().serialize(folder, filename)
211 np.savetxt(
212 self._path_file_output,
213 np.transpose([self._data[:, 0], self._data[:, 1]]),
214 delimiter=",",
215 header=header,
216 )
217 if self._verbose:
218 print(f" Serialized model to: {self._path_file_output}")
220 def butterworth(self, value):
222 fc = value.get("cutoff", None) # Hz, cutoff frequency
223 if fc is None:
224 print('Error: keyword "cutoff" not found.')
225 sys.exit("Abnormal termination.")
227 filter_order = value.get("order", None)
228 if filter_order is None:
229 print('Error: keyword "order" not found.')
230 sys.exit("Abnormal termination.")
232 filter_type = value.get("type", None)
233 if filter_type is None:
234 print('Error: keyword "type" not found.')
235 sys.exit("Abnormal termination.")
237 if self._verbose:
238 print('Signal process "butterworth" with:')
239 print(f" cutoff frequency = {fc} Hz")
240 print(f" filter order = {filter_order}")
241 print(" filter type = " + filter_type)
243 dt = self._data[1, 0] - self._data[0, 0] # sample delta t
245 if self._verbose:
246 print(f" sample delta t = {dt} seconds")
248 fs = 1.0 / dt # Hz
249 if self._verbose:
250 print(f" sample frequency = {fs} Hz")
252 Wn = fc / (fs / 2) # normalized critical frequency
253 if self._verbose:
254 print(f" normalized critical frequency = {Wn} Hz/Hz")
256 b, a = signal.butter(filter_order, Wn, filter_type)
257 yfiltered = signal.filtfilt(b, a, self._data[:, 1])
258 self._data[:, 1] = yfiltered # overwrite
260 serialize = value.get("serialize", 0) # default is not to serialize
261 if serialize:
262 folder = value.get("folder", ".") # default to current folder
263 file_output = value.get("file", None)
265 if file_output is None:
266 print('Error: keyword "file" not found.')
267 sys.exit("Abnormal termination.")
268 else:
269 self.serialize(folder, file_output)
271 def correlation(self, value):
273 reference = value.get("reference", None)
274 if reference is None:
275 print('Error: keyword "reference" not found.')
276 sys.exit("Abnormal termination.")
278 # ref is the reference signal to compare with
279 ref_default_folder = "."
280 ref_folder = reference.get("folder", ref_default_folder)
282 ref_file = reference.get("file", None)
284 verbosity = value.get("verbose", False)
286 if ref_file is None:
287 print('Error: keyword "file" not found.')
288 sys.exit("Abnormal termination.")
290 # abs_path = absolute_path(ref_folder)
291 abs_path = Path(ref_folder).expanduser()
292 # ref_path_file_input = os.path.join(abs_path, ref_file)
293 ref_path_file_input = abs_path.joinpath(ref_file)
295 ref_skip_rows = reference.get("skip_rows", 0)
296 ref_skip_rows_footer = reference.get("skip_rows_footer", 0)
297 ref_xcolumn = reference.get("xcolumn", 0) # default to the 1st column
298 ref_ycolumn = reference.get("ycolumn", 1) # default to the 2nd column
300 ref_data = np.genfromtxt(
301 ref_path_file_input,
302 dtype="float",
303 delimiter=",",
304 skip_header=ref_skip_rows,
305 skip_footer=ref_skip_rows_footer,
306 usecols=(ref_xcolumn, ref_ycolumn),
307 )
309 tcorrelated, ycorrelated, cc_relative_error, L2_error = cross_correlation(
310 ref_data, self._data, verbose=verbosity
311 )
313 self._data = np.transpose([tcorrelated, ycorrelated]) # overwrite
315 serialize = value.get("serialize", 0) # default is not to serialize
316 if serialize:
317 folder = value.get("folder", ".") # default to current folder
318 file_output = value.get("file", None)
320 if file_output is None:
321 print('Error: keyword "file" not found.')
322 sys.exit("Abnormal termination.")
323 else:
324 # self.serialize(folder, file_output, header=corr_str)
325 self.serialize(folder, file_output)
327 def gradient(self, value):
329 gradient_order = value.get("order", None)
330 if gradient_order is None:
331 print('Error: keyword "order" not found.')
332 sys.exit("Abnormal termination.")
334 if self._verbose:
335 print(
336 "Signal process:"
337 " gradient applied " + str(gradient_order) + " time(s)"
338 )
339 # numerical gradient
340 # https://docs.scipy.org/doc/numpy/reference/generated/numpy.gradient.html
341 # for k in range(value):
342 for k in range(gradient_order):
343 ydot = np.gradient(self._data[:, 1], self._data[:, 0], edge_order=2)
344 self._data[:, 1] = ydot # overwrite
345 if self._verbose:
346 print(f" Derivative {k+1} completed.")
348 serialize = value.get("serialize", 0) # default is not to serialize
349 if serialize:
350 folder = value.get("folder", ".") # default to current folder
351 file_output = value.get("file", None)
353 if file_output is None:
354 print('Error: keyword "file" not found.')
355 sys.exit("Abnormal termination.")
356 else:
357 self.serialize(folder, file_output)
359 def integration(self, value):
361 integration_order = value.get("order", None)
362 if integration_order is None:
363 print('Error: keyword "order" not found.')
364 sys.exit("Abnormal termination.")
366 print(
367 'Signal process: "integration" applied '
368 + str(integration_order)
369 + " time(s) with"
370 )
372 default_ics = np.zeros(integration_order)
373 ics = value.get("initial_conditions", default_ics)
375 print(f"initial condition(s) as {ics}")
377 if len(ics) != integration_order:
378 print("Error: length of initial condition(s) array")
379 print("must be equal to the order of integration.")
380 print(f"Specified order = {integration_order}")
381 print(f"Length of intial condition(s) array = {len(ics)}")
382 sys.exit("Abnormal termination.")
384 # https://docs.scipy.org/doc/numpy/reference/generated/numpy.trapz.html
385 # https://docs.scipy.org/doc/scipy-0.14.0/reference/generated/scipy.integrate.cumtrapz.html
386 # https://docs.scipy.org/doc/scipy/reference/generated/scipy.integrate.cumulative_trapezoid.html#scipy.integrate.cumulative_trapezoid
387 for k in range(integration_order):
388 # inty = np.trapz(self._data[:, 1], self._data[:, 0]) + ics[k]
389 # inty = cumtrapz(self._data[:, 1], self._data[:, 0], initial=ics[k])
390 # inty = (
391 # cumtrapz(self._data[:, 1], self._data[:, 0], initial=0)
392 # + ics[k]
393 # )
394 inty = (
395 cumulative_trapezoid(self._data[:, 1], self._data[:, 0], initial=0)
396 + ics[k]
397 )
399 self._data[:, 1] = inty # overwrite
400 print(f" Integral {k+1} completed.")
402 serialize = value.get("serialize", 0) # default is not to serialize
403 if serialize:
404 folder = value.get("folder", ".") # default to current folder
405 file_output = value.get("file", None)
407 if file_output is None:
408 print('Error: keyword "file" not found.')
409 sys.exit("Abnormal termination.")
410 else:
411 self.serialize(folder, file_output)
414class XYModelAbaqus(XYBase):
415 """The ABAQUS mesh data to be plotted in XY format."""
417 def __init__(self, guid, **kwargs):
418 super().__init__(guid, **kwargs)
420 # TODO: rearchitect into single parent for both XYModel and XYModelAbaqus
421 # make sure models have an input file that exists
422 if not self._file_pathlib.is_file():
423 print('Error: keyword "file" has a value (e.g., a file name):')
424 print(self._file)
425 print("with full path specification:")
426 print(self._file_pathlib)
427 raise KeyError("file not found")
429 with open(str(self._file_pathlib), "rt") as f:
430 self._nodes = tuple()
431 self._elements = tuple()
432 # nice ref: https://www.pythontutorial.net/python-basics/python-read-text-file/
433 try:
434 # lines = f.readlines()
436 # for line in f:
437 line = f.readline()
439 while line:
440 if "*NODE" in line or "*Node" in line:
441 # collect all nodes
442 line = f.readline() # get the next line
443 while "*" not in line:
444 line = line.split(",")
445 # assume a 3D format even for planar problems, and catch
446 # exceptions as needed
447 try:
448 new_nodes = (
449 tuple(
450 [
451 float(eval(line[1])),
452 float(eval(line[2])),
453 float(eval(line[3])),
454 ]
455 ),
456 )
457 except (
458 IndexError
459 ): # handle 2D input files, append 0.0 as z coordinate
460 new_nodes = (
461 tuple(
462 [
463 float(eval(line[1])),
464 float(eval(line[2])),
465 0.0,
466 ]
467 ),
468 )
470 self._nodes = self._nodes + new_nodes
471 # print(self._nodes)
472 line = f.readline()
473 # print(line)
474 elif "*ELEMENT" in line or "*Element" in line:
475 # collect all elements
476 line = f.readline() # get the next line
477 while "*" not in line and len(line) > 0:
478 line = line.split(",")
479 new_element = (
480 tuple(
481 [
482 int(line[1]),
483 int(line[2]),
484 int(line[3]),
485 int(line[4]),
486 ]
487 ),
488 )
489 self._elements = self._elements + new_element
490 # print(self._elements)
491 line = f.readline()
492 else:
493 line = f.readline()
495 print(f"Finished reading file: {self._file_pathlib}")
497 except OSError:
498 print(f"Cannot read file: {self._file_pathlib}")
500 # default value if plot_kwargs not client-supplied
501 # default = {"linewidth": 2.0, "linestyle": "solid", "color": "black", }
502 _default = {
503 "alpha": 0.8,
504 "edgecolor": "navy",
505 "facecolor": "gray",
506 "linestyle": "solid",
507 "linewidth": 1.0,
508 }
509 self._plot_kwargs = kwargs.get("plot_kwargs", _default)
510 self._alpha = self._plot_kwargs.get("alpha", _default["alpha"])
511 self._edgecolor = self._plot_kwargs.get("edgecolor", _default["edgecolor"])
512 self._facecolor = self._plot_kwargs.get("facecolor", _default["facecolor"])
513 self._linestyle = self._plot_kwargs.get("linestyle", _default["linestyle"])
514 self._linewidth = self._plot_kwargs.get("linewidth", _default["linewidth"])
517"""
518Copyright 2023 Sandia National Laboratories
520Notice: This computer software was prepared by National Technology and Engineering Solutions of
521Sandia, LLC, hereinafter the Contractor, under Contract DE-NA0003525 with the Department of Energy
522(DOE). All rights in the computer software are reserved by DOE on behalf of the United States
523Government and the Contractor as provided in the Contract. You are authorized to use this computer
524software for Governmental purposes but it is not to be released or distributed to the public.
525NEITHER THE U.S. GOVERNMENT NOR THE CONTRACTOR MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES
526ANY LIABILITY FOR THE USE OF THIS SOFTWARE. This notice including this sentence must appear on any
527copies of this computer software. Export of this data may require a license from the United States
528Government.
529"""