Coverage for src\pytribeam\workflow.py: 66%
199 statements
« prev ^ index » next coverage.py v7.5.1, created at 2025-03-04 17:41 -0800
« prev ^ index » next coverage.py v7.5.1, created at 2025-03-04 17:41 -0800
1#!/usr/bin/python3
3# Default python modules
4# from functools import singledispatch
5import os
6from pathlib import Path
7import time
8import warnings
9import math
10from typing import NamedTuple, List, Tuple
11from functools import singledispatch
12import subprocess
14# Autoscript included modules
15import numpy as np
16from matplotlib import pyplot as plt
18# 3rd party module
20# Local scripts
21import pytribeam.constants as cs
22from pytribeam.constants import Conversions
23import pytribeam.insertable_devices as devices
24import pytribeam.factory as factory
25import pytribeam.types as tbt
26import pytribeam.utilities as ut
27import pytribeam.stage as stage
28import pytribeam.log as log
29import pytribeam.laser as laser
30import pytribeam.image as img
31import pytribeam.fib as fib
@singledispatch
def perform_operation(
    step_settings,
    step: tbt.Step,
    general_settings: tbt.GeneralSettings,
    slice_number: int,
) -> bool:
    """Run the operation for a step, dispatching on the type of its settings.

    This is the ``singledispatch`` fallback: it is only reached when no
    handler has been registered for ``type(step_settings)``. Handlers are
    registered below with ``@perform_operation.register`` and are selected
    by the annotation on their first parameter.

    Args:
        step_settings: Operation settings object whose type selects the handler.
        step: The step being executed (unused by the fallback).
        general_settings: Experiment-wide settings (unused by the fallback).
        slice_number: Current slice number (unused by the fallback).

    Raises:
        NotImplementedError: Always, since no handler matches the given type.
    """
    raise NotImplementedError(f"No handler for type {type(step_settings)}")
@perform_operation.register
def _(
    step_settings: tbt.ImageSettings,
    step: tbt.Step,
    general_settings: tbt.GeneralSettings,
    slice_number: int,
) -> bool:
    """Handle an imaging step by delegating to ``img.image_operation``.

    The image settings passed on are read from ``step.operation_settings``;
    the result of ``img.image_operation`` is returned unchanged.
    """
    outcome = img.image_operation(
        step=step,
        image_settings=step.operation_settings,
        general_settings=general_settings,
        slice_number=slice_number,
    )
    return outcome
@perform_operation.register
def _(
    step_settings: tbt.FIBSettings,
    step: tbt.Step,
    general_settings: tbt.GeneralSettings,
    slice_number: int,
) -> bool:
    """Handle a FIB step: collect an image, then mill the pattern.

    An imaging step is synthesized from the FIB step (same name, number,
    frequency and stage, with ``step_settings.image`` as its operation
    settings) and run through ``perform_operation`` before milling.

    Returns:
        Always ``True``; the return values of the image and mill operations
        are not inspected.
    """
    # collect image
    image_step = tbt.Step(
        type=tbt.StepType.IMAGE,
        name=step.name,
        number=step.number,
        frequency=step.frequency,
        stage=step.stage,
        operation_settings=step_settings.image,
    )
    perform_operation(
        image_step.operation_settings,
        step=image_step,
        general_settings=general_settings,
        slice_number=slice_number,
    )

    # mill pattern
    fib.mill_operation(
        step=step,
        fib_settings=step_settings,
        general_settings=general_settings,
        slice_number=slice_number,
    )

    return True
@perform_operation.register
def _(
    step_settings: tbt.CustomSettings,
    step: tbt.Step,
    general_settings: tbt.GeneralSettings,
    slice_number: int,
) -> bool:
    """Handle a custom step by running a user script in a subprocess.

    Before the script runs, a ``slice_info.yml`` file containing the
    experiment directory and current slice number is written into the
    experiment directory. The file is removed once the script has finished,
    whether it succeeded or not.

    Returns:
        ``True`` when the script exits with return code 0.

    Raises:
        ValueError: If the subprocess exits with a non-zero return code.
    """
    # dump out .yml with experiment info
    slice_info_path = Path.joinpath(general_settings.exp_dir, "slice_info.yml")
    db = {"exp_dir": str(general_settings.exp_dir), "slice_number": slice_number}
    ut.dict_to_yml(db=db, file_path=slice_info_path)

    try:
        # run script
        output = subprocess.run(
            [step_settings.executable_path, step_settings.script_path],
            capture_output=True,
        )
        # errors="replace" keeps non-UTF-8 console output from raising
        # UnicodeDecodeError and masking the script's actual result.
        stdout = output.stdout.decode("utf-8", errors="replace")
        stderr = output.stderr.decode("utf-8", errors="replace")
        if stdout:
            print(f"\nCustom script output: {stdout}\n")

        if output.returncode != 0:
            if stderr:
                print(f"\nCustom script errors: {stderr}\n")
            raise ValueError(
                f"Subprocess call for script {step_settings.script_path} using executable {step_settings.executable_path} did not execute correctly."
            )
    finally:
        # Clean up on failure too, so a stale file is never left behind;
        # tolerate the script having already removed it.
        if slice_info_path.exists():
            slice_info_path.unlink()

    return True
@perform_operation.register
def _(
    step_settings: tbt.EBSDSettings,
    step: tbt.Step,
    general_settings: tbt.GeneralSettings,
    slice_number: int,
) -> bool:
    """Handle an EBSD step (optionally with EDS).

    Sequence: insert the detector(s), measure and log the specimen current,
    take an image, apply dynamic focus / tilt correction, collect the map,
    then retract the detector(s). Always returns ``True``.
    """
    imaging = step_settings.image
    scope = imaging.microscope

    # Detector insertion: EBSD always, EDS only when enabled for this step.
    devices.insert_EBSD(microscope=scope)
    if step_settings.enable_eds:
        devices.insert_EDS(microscope=scope)

    # Specimen current is measured once and written to the log.
    log.specimen_current(
        step_number=step.number,
        step_name=step.name,
        slice_number=slice_number,
        log_filepath=general_settings.log_filepath,
        dataset_name=cs.Constants.specimen_current_dataset_name,
        specimen_current_na=devices.specimen_current(microscope=scope),
    ) if False else None
    return _ebsd_step_body(
        step_settings, step, general_settings, slice_number, imaging, scope
    ) if False else _run_ebsd_sequence(
        step_settings, step, general_settings, slice_number, imaging, scope
    )


def _run_ebsd_sequence(
    step_settings, step, general_settings, slice_number, imaging, scope
) -> bool:
    """Carry out the post-insertion part of an EBSD step, in original order."""
    # measure and log specimen current
    measured_na = devices.specimen_current(microscope=scope)
    log.specimen_current(
        step_number=step.number,
        step_name=step.name,
        slice_number=slice_number,
        log_filepath=general_settings.log_filepath,
        dataset_name=cs.Constants.specimen_current_dataset_name,
        specimen_current_na=measured_na,
    )

    # take image
    img.image_operation(
        step=step,
        image_settings=imaging,
        general_settings=general_settings,
        slice_number=slice_number,
    )

    # set dynamic focus/tilt correction
    beam_settings = imaging.beam.settings
    focus_flag = beam_settings.dynamic_focus
    tilt_flag = beam_settings.tilt_correction
    img.beam_angular_correction(
        microscope=scope,
        dynamic_focus=focus_flag,
        tilt_correction=tilt_flag,
    )

    # take map
    laser.map_ebsd()

    # retract detector(s)
    devices.retract_EBSD(microscope=scope)
    if step_settings.enable_eds:
        devices.retract_EDS(microscope=scope)

    return True
@perform_operation.register
def _(
    step_settings: tbt.EDSSettings,
    step: tbt.Step,
    general_settings: tbt.GeneralSettings,
    slice_number: int,
) -> bool:
    """Handle an EDS step.

    Sequence: insert the EDS detector, measure and log the specimen current,
    take an image, apply dynamic focus / tilt correction, collect the map,
    then retract the detector. Always returns ``True``.
    """
    imaging = step_settings.image
    scope = imaging.microscope

    # insert detector
    devices.insert_EDS(microscope=scope)

    # measure and log specimen current
    measured_na = devices.specimen_current(microscope=scope)
    log.specimen_current(
        step_number=step.number,
        step_name=step.name,
        slice_number=slice_number,
        log_filepath=general_settings.log_filepath,
        dataset_name=cs.Constants.specimen_current_dataset_name,
        specimen_current_na=measured_na,
    )

    # take image
    img.image_operation(
        step=step,
        image_settings=imaging,
        general_settings=general_settings,
        slice_number=slice_number,
    )

    # set dynamic focus/tilt correction
    beam_settings = imaging.beam.settings
    focus_flag = beam_settings.dynamic_focus
    tilt_flag = beam_settings.tilt_correction
    img.beam_angular_correction(
        microscope=scope,
        dynamic_focus=focus_flag,
        tilt_correction=tilt_flag,
    )

    # take map
    laser.map_eds()

    # retract detector
    devices.retract_EDS(microscope=scope)

    return True
@perform_operation.register
def _(
    step_settings: tbt.LaserSettings,
    step: tbt.Step,
    general_settings: tbt.GeneralSettings,
    slice_number: int,
) -> bool:
    """Handle a laser step by delegating to ``laser.laser_operation``.

    ``step_settings`` is used only for dispatch; the result of
    ``laser.laser_operation`` is returned unchanged.
    """
    outcome = laser.laser_operation(
        step=step,
        general_settings=general_settings,
        slice_number=slice_number,
    )
    return outcome
249# @perform_operation(tbt.StageSettings)
250# def _(
251# step_settings,
252# step: tbt.Step,
253# log_filepath: Path,
254# ) -> bool:
255# pass
def ebsd_eds_conflict_free(step_sequence: List[tbt.Step]) -> bool:
    """Check that a step sequence does not mix EBSD and EDS step types.

    Args:
        step_sequence: Steps to inspect, in execution order.

    Returns:
        ``True`` when the sequence contains EBSD steps only, EDS steps only,
        or neither (including an empty sequence).

    Raises:
        ValueError: If an EBSD step appears after an EDS step, or an EDS
            step appears after an EBSD step.
    """
    EBSD_EDS_conflict_msg = "Due to current limitations in 3rd party EBSD/EDS integration with the TriBeam, only one of these step types is allowed as only one map can be configured for an experiment, but EDS can be configured to be included with an EBSD type step. See User Guide for more details."

    found_EBSD = False
    found_EDS = False

    for step in step_sequence:
        if step.type == tbt.StepType.EBSD:
            if found_EDS:
                raise ValueError(
                    f"EBSD step found in sequence after EDS step was already defined. {EBSD_EDS_conflict_msg}"
                )
            found_EBSD = True

        if step.type == tbt.StepType.EDS:
            if found_EBSD:
                raise ValueError(
                    f"EDS step found in sequence after EBSD step was already defined. {EBSD_EDS_conflict_msg}"
                )
            found_EDS = True

    return True
def pre_flight_check(yml_path: Path) -> tbt.ExperimentSettings:
    """Load and validate an experiment configuration before running it.

    Reads the yml configuration, builds the general settings, connects to
    the EBSD/EDS devices (when enabled) and to the microscope, then builds
    and validates every step in the sequence.

    Args:
        yml_path: Path to the experiment configuration file.

    Returns:
        The validated experiment settings, including the connected
        microscope object and the ordered step sequence.

    Raises:
        SystemError: If an enabled EBSD/EDS device reports an error state,
            or a step needs laser/EDS/EBSD control that is not enabled.
        KeyError: If a step has an empty name.
        ValueError: If the number of parsed steps differs from the expected
            count, or the sequence mixes EBSD and EDS steps.
    """
    # get configuration from yml
    yml_version = ut.yml_version(yml_path)
    experiment_settings = ut.yml_to_dict(
        yml_path_file=yml_path,
        version=yml_version,
        required_keys=(
            "general",
            "config_file_version",
        ),
    )
    yml_format = ut.yml_format(version=yml_version)

    # get general settings and validate them
    general_db = ut.general_settings(
        exp_settings=experiment_settings, yml_format=yml_format
    )
    general_settings = factory.general(
        general_db=general_db,
        yml_format=yml_format,
    )

    # whether to enable EBSD and EDS control
    enable_EBSD = ut.enable_external_device(general_settings.EBSD_OEM)
    enable_EDS = ut.enable_external_device(general_settings.EDS_OEM)
    if enable_EBSD:
        status = devices.connect_EBSD()
        if status == tbt.RetractableDeviceState.ERROR:
            raise SystemError("EBSD camera is connected but in error state.")
    if enable_EDS:
        status = devices.connect_EDS()
        if status == tbt.RetractableDeviceState.ERROR:
            raise SystemError("EDS camera is connected but in error state.")

    # connect to microscope:
    connection = general_settings.connection
    microscope = tbt.Microscope()
    ut.connect_microscope(
        microscope=microscope,
        quiet_output=True,
        connection_host=connection.host,
        connection_port=connection.port,
    )

    # get step_count and validate settings
    num_steps = ut.step_count(exp_settings=experiment_settings, yml_format=yml_format)
    step_sequence = []  # empty list of tbt.Step type objects
    # step numbers in the configuration are 1-indexed
    for step_number in range(1, num_steps + 1):
        step_name, step_settings = ut.step_settings(
            exp_settings=experiment_settings,
            step_number_key=yml_format.step_number_key,
            step_number_val=step_number,
            yml_format=yml_format,
        )
        if not step_name:
            raise KeyError(
                f"Step name for step {step_number} of {num_steps} is empty. Please provide a unique name for each step in your configuration."
            )
        step_type = ut.step_type(
            settings=step_settings,
            yml_format=yml_format,
        )

        # validate connections for specific step types
        if step_type == tbt.StepType.LASER:
            laser_enabled = laser.laser_connected()
            if not laser_enabled:
                raise SystemError(
                    f"Step name '{step_name}' is a Laser step type but Laser control is not currently enabled. Ensure TFS laser API is installed, Laser Control application is open."
                )
        if (step_type == tbt.StepType.EDS) and (not enable_EDS):
            raise SystemError(
                f"Step name '{step_name}' is an EDS step type but EDS control is not currently enabled."
            )
        if (step_type == tbt.StepType.EBSD) and (not enable_EBSD):
            # Message previously named EDS here, misreporting the device.
            raise SystemError(
                f"Step name '{step_name}' is an EBSD step type but EBSD control is not currently enabled."
            )

        # create the step settings
        step = factory.step(
            microscope=microscope,
            step_name=step_name,
            step_settings=step_settings,
            general_settings=general_settings,
            yml_format=yml_format,
        )

        step_sequence.append(step)

    if len(step_sequence) != num_steps:
        raise ValueError(
            f"Settings not parsed correctly, expected {num_steps} but only {len(step_sequence)} have been parsed."
        )

    # ensure only EBSD or EDS step type exists
    ebsd_eds_conflict_free(step_sequence=step_sequence)

    experiment_settings = tbt.ExperimentSettings(
        microscope=microscope,
        general_settings=general_settings,
        step_sequence=step_sequence,
        enable_EBSD=enable_EBSD,
        enable_EDS=enable_EDS,
    )
    return experiment_settings
def setup_experiment(
    yml_path: Path,
) -> tbt.ExperimentSettings:
    """Validate the configuration and prepare the instrument for a run.

    Runs the pre-flight check on ``yml_path``, creates the log file, links
    the stage, and retracts all devices.

    Returns:
        The experiment settings produced by ``pre_flight_check``.
    """
    # validate yml
    settings = pre_flight_check(yml_path=yml_path)

    # create the experiment log
    log.create_file(settings.general_settings.log_filepath)

    scope = settings.microscope

    # link stage to free working distance
    scope.specimen.stage.link()

    # retract all devices
    print("\tRetracting all devices...")
    devices.retract_all_devices(
        microscope=scope,
        enable_EBSD=settings.enable_EBSD,
        enable_EDS=settings.enable_EDS,
    )

    return settings
def perform_step(
    slice_number: int,
    step_number: int,
    experiment_settings: tbt.ExperimentSettings,
):
    """Execute one step of the sequence on the given slice.

    The step is skipped (after printing a notice) when the slice does not
    fall on the step's frequency; slices start at 1 and every step runs on
    slice 1. Otherwise the stage position is logged, devices are retracted,
    the stage is moved to the step's start position, the operation runs,
    the end position is logged, and devices are retracted again.

    Args:
        slice_number: Current slice number (1-indexed).
        step_number: Step number within the sequence (1-indexed).
        experiment_settings: Validated experiment settings.
    """
    # break out experiment settings elements
    scope = experiment_settings.microscope
    general = experiment_settings.general_settings
    sequence = experiment_settings.step_sequence
    ebsd_on = experiment_settings.enable_EBSD
    eds_on = experiment_settings.enable_EDS

    def _record_position(dataset_name):
        # Log the currently active stage position under the given dataset.
        log.position(
            step_number=step_number,
            step_name=current.name,
            slice_number=slice_number,
            log_filepath=general.log_filepath,
            dataset_name=dataset_name,
            current_position=factory.active_stage_position_settings(
                microscope=scope,
            ),
        )

    def _retract_devices(done_message):
        # Retract everything with device chatter silenced, then report.
        print("\tRetracting all devices...")
        with ut.nostdout():
            devices.retract_all_devices(
                microscope=scope,
                enable_EBSD=ebsd_on,
                enable_EDS=eds_on,
            )
        print(done_message)

    # get operation settings; the sequence list is 0-indexed
    current = sequence[step_number - 1]
    print(
        f"Slice {slice_number}, Step {step_number} of {general.step_count}, '{current.name}', a {current.type.value} type step."
    )

    # slices start at 1, so every step is performed on slice 1
    slices_since_first = slice_number - 1
    if slices_since_first % current.frequency != 0:
        print(
            f"\tStep frequency is every {current.frequency} slices, starting on slice 1. Skipping step on this slice.\n"
        )
        return

    # log step start position
    _record_position(cs.Constants.pre_position_dataset_name)

    _retract_devices("\tDevices retracted.")

    # move stage to starting position for slice
    stage.step_start_position(
        microscope=scope,
        slice_number=slice_number,
        operation=current,
        general_settings=general,
    )

    # perform specific operation
    perform_operation(
        current.operation_settings,
        step=current,
        general_settings=general,
        slice_number=slice_number,
    )

    # log step end position
    _record_position(cs.Constants.post_position_dataset_name)

    _retract_devices("\tDevices retracted. Step Complete.\n")
def run_experiment_cli(
    start_slice: int,
    start_step: int,
    yml_path: Path,
) -> None:
    """Main loop for the experiment, accessed through the command line.

    Sets up the experiment, asks the user to confirm when EBSD and/or EDS
    control is not enabled, logs the experiment settings, then runs every
    step for every slice from ``start_slice`` through the configured
    maximum slice number (inclusive). Only the first slice begins at
    ``start_step``; later slices begin at step 1.

    Args:
        start_slice: Slice number to begin on.
        start_step: Step number to begin on within the first slice
            (1-indexed).
        yml_path: Path to the experiment configuration file.

    Raises:
        SystemExit: If the user declines to continue at a detector warning.
    """
    experiment_settings = setup_experiment(yml_path=yml_path)

    # warn user of any EBSD/EDS lack of control
    # Built by concatenation so the printed warning does not carry the
    # source file's line breaks and indentation.
    warning_text = (
        "is not enabled, you will not have access to safety checking and "
        "these modalities during data collection. Please ensure this "
        "detector is retracted before proceeding."
    )
    for detector_name, enabled in (
        ("EBSD", experiment_settings.enable_EBSD),
        ("EDS", experiment_settings.enable_EDS),
    ):
        if enabled:
            continue
        print(f"\nWARNING: {detector_name} {warning_text}")
        if not ut.yes_no("Continue?"):
            print("\nExiting now...")
            # exit() is injected by the site module and is not guaranteed
            # to exist in every runtime; raise SystemExit directly.
            raise SystemExit

    # main loop
    log.experiment_settings(
        slice_number=start_slice,
        step_number=start_step,
        log_filepath=experiment_settings.general_settings.log_filepath,
        yml_path=yml_path,
    )
    num_steps = len(experiment_settings.step_sequence)
    print(
        f"\n\nBeginning serial sectioning experiment on slice {start_slice}, step {start_step} of {num_steps}.\n"
    )

    for slice_number in range(
        start_slice, experiment_settings.general_settings.max_slice_number + 1
    ):  # inclusive of max slice number
        for step_number in range(start_step, num_steps + 1):  # steps are 1-indexed
            perform_step(
                slice_number=slice_number,
                step_number=step_number,
                experiment_settings=experiment_settings,
            )

        # reset start_step to 1 at end of slice
        start_step = 1

    ut.disconnect_microscope(
        microscope=experiment_settings.microscope,
        quiet_output=True,
    )

    print("\n\nExperiment complete.")
# Running this module directly does nothing: the guard body is a bare `pass`.
if __name__ == "__main__":
    pass