Coverage for src\pytribeam\workflow.py: 66%

199 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2025-03-04 17:41 -0800

1#!/usr/bin/python3 

2 

3# Default python modules 

4# from functools import singledispatch 

5import os 

6from pathlib import Path 

7import time 

8import warnings 

9import math 

10from typing import NamedTuple, List, Tuple 

11from functools import singledispatch 

12import subprocess 

13 

14# Autoscript included modules 

15import numpy as np 

16from matplotlib import pyplot as plt 

17 

18# 3rd party module 

19 

20# Local scripts 

21import pytribeam.constants as cs 

22from pytribeam.constants import Conversions 

23import pytribeam.insertable_devices as devices 

24import pytribeam.factory as factory 

25import pytribeam.types as tbt 

26import pytribeam.utilities as ut 

27import pytribeam.stage as stage 

28import pytribeam.log as log 

29import pytribeam.laser as laser 

30import pytribeam.image as img 

31import pytribeam.fib as fib 

32 

33 

34@singledispatch 

35def perform_operation( 

36 step_settings, 

37 step: tbt.Step, 

38 general_settings: tbt.GeneralSettings, 

39 slice_number: int, 

40) -> bool: 

41 """Create step object for different step types, including validation.""" 

42 _ = step_settings 

43 __ = step 

44 ___ = general_settings 

45 ____ = slice_number 

46 raise NotImplementedError(f"No handler for type {type(step_settings)}") 

47 

48 

49@perform_operation.register 

50def _( 

51 step_settings: tbt.ImageSettings, 

52 step: tbt.Step, 

53 general_settings: tbt.GeneralSettings, 

54 slice_number: int, 

55) -> bool: 

56 return img.image_operation( 

57 step=step, 

58 image_settings=step.operation_settings, 

59 general_settings=general_settings, 

60 slice_number=slice_number, 

61 ) 

62 

63 

64@perform_operation.register 

65def _( 

66 step_settings: tbt.FIBSettings, 

67 step: tbt.Step, 

68 general_settings: tbt.GeneralSettings, 

69 slice_number: int, 

70) -> bool: 

71 # collect image 

72 image_step = tbt.Step( 

73 type=tbt.StepType.IMAGE, 

74 name=step.name, 

75 number=step.number, 

76 frequency=step.frequency, 

77 stage=step.stage, 

78 operation_settings=step_settings.image, 

79 ) 

80 type(image_step.operation_settings) 

81 perform_operation( 

82 image_step.operation_settings, 

83 step=image_step, 

84 general_settings=general_settings, 

85 slice_number=slice_number, 

86 ) 

87 # mill pattern 

88 fib.mill_operation( 

89 step=step, 

90 fib_settings=step_settings, 

91 general_settings=general_settings, 

92 slice_number=slice_number, 

93 ) 

94 

95 return True 

96 

97 

98@perform_operation.register 

99def _( 

100 step_settings: tbt.CustomSettings, 

101 step: tbt.Step, 

102 general_settings: tbt.GeneralSettings, 

103 slice_number: int, 

104) -> bool: 

105 # run script 

106 aa = 2 

107 # dump out .yml with experiment info 

108 slice_info_path = Path.joinpath(general_settings.exp_dir, "slice_info.yml") 

109 db = {"exp_dir": str(general_settings.exp_dir), "slice_number": slice_number} 

110 ut.dict_to_yml(db=db, file_path=slice_info_path) 

111 

112 output = subprocess.run( 

113 [step_settings.executable_path, step_settings.script_path], 

114 capture_output=True, 

115 ) 

116 stdout, stderr = output.stdout.decode("utf-8"), output.stderr.decode("utf-8") 

117 if stdout: 

118 print(f"\nCustom script output: {stdout}\n") 

119 

120 if output.returncode != 0: 

121 if stderr: 

122 print(f"\nCustom script errors: {stderr}\n") 

123 raise ValueError( 

124 f"Subprocess call for script {step_settings.script_path} using executable {step_settings.executable_path} did not execute correctly." 

125 ) 

126 

127 slice_info_path.unlink() 

128 return True 

129 

130 

131@perform_operation.register 

132def _( 

133 step_settings: tbt.EBSDSettings, 

134 step: tbt.Step, 

135 general_settings: tbt.GeneralSettings, 

136 slice_number: int, 

137) -> bool: 

138 image_settings = step_settings.image 

139 microscope = image_settings.microscope 

140 

141 # insert detector 

142 devices.insert_EBSD(microscope=microscope) 

143 if step_settings.enable_eds: 

144 devices.insert_EDS(microscope=microscope) 

145 

146 # measure and log specimen current 

147 found_current_na = devices.specimen_current(microscope=microscope) 

148 log.specimen_current( 

149 step_number=step.number, 

150 step_name=step.name, 

151 slice_number=slice_number, 

152 log_filepath=general_settings.log_filepath, 

153 dataset_name=cs.Constants.specimen_current_dataset_name, 

154 specimen_current_na=found_current_na, 

155 ) 

156 

157 # take image 

158 img.image_operation( 

159 step=step, 

160 image_settings=image_settings, 

161 general_settings=general_settings, 

162 slice_number=slice_number, 

163 ) 

164 

165 # set dynamic focus/tilt correction 

166 dynamic_focus = image_settings.beam.settings.dynamic_focus 

167 tilt_correction = image_settings.beam.settings.tilt_correction 

168 img.beam_angular_correction( 

169 microscope=microscope, 

170 dynamic_focus=dynamic_focus, 

171 tilt_correction=tilt_correction, 

172 ) 

173 

174 # take map 

175 laser.map_ebsd() 

176 

177 # retract detector(s) 

178 devices.retract_EBSD(microscope=microscope) 

179 if step_settings.enable_eds: 

180 devices.retract_EDS(microscope=microscope) 

181 

182 return True 

183 

184 

185@perform_operation.register 

186def _( 

187 step_settings: tbt.EDSSettings, 

188 step: tbt.Step, 

189 general_settings: tbt.GeneralSettings, 

190 slice_number: int, 

191) -> bool: 

192 image_settings = step_settings.image 

193 microscope = image_settings.microscope 

194 

195 # insert detector 

196 devices.insert_EDS(microscope=microscope) 

197 

198 # measure and log specimen current 

199 found_current_na = devices.specimen_current(microscope=microscope) 

200 log.specimen_current( 

201 step_number=step.number, 

202 step_name=step.name, 

203 slice_number=slice_number, 

204 log_filepath=general_settings.log_filepath, 

205 dataset_name=cs.Constants.specimen_current_dataset_name, 

206 specimen_current_na=found_current_na, 

207 ) 

208 

209 # take image 

210 img.image_operation( 

211 step=step, 

212 image_settings=image_settings, 

213 general_settings=general_settings, 

214 slice_number=slice_number, 

215 ) 

216 

217 # set dynamic focus/tilt correction 

218 dynamic_focus = image_settings.beam.settings.dynamic_focus 

219 tilt_correction = image_settings.beam.settings.tilt_correction 

220 img.beam_angular_correction( 

221 microscope=microscope, 

222 dynamic_focus=dynamic_focus, 

223 tilt_correction=tilt_correction, 

224 ) 

225 

226 # take map 

227 laser.map_eds() 

228 

229 # retract detector 

230 devices.retract_EDS(microscope=microscope) 

231 

232 return True 

233 

234 

235@perform_operation.register 

236def _( 

237 step_settings: tbt.LaserSettings, 

238 step: tbt.Step, 

239 general_settings: tbt.GeneralSettings, 

240 slice_number: int, 

241) -> bool: 

242 return laser.laser_operation( 

243 step=step, 

244 general_settings=general_settings, 

245 slice_number=slice_number, 

246 ) 

247 

248 

249# @perform_operation(tbt.StageSettings) 

250# def _( 

251# step_settings, 

252# step: tbt.Step, 

253# log_filepath: Path, 

254# ) -> bool: 

255# pass 

256 

257 

258def ebsd_eds_conflict_free(step_sequence: List[tbt.Step]) -> bool: 

259 EBSD_EDS_conflict_msg = "Due to current limitations in 3rd party EBSD/EDS integration with the TriBeam, only one of these step types is allowed as only one map can be configured for an experiment, but EDS can be configured to be included with an EBSD type step. See User Guide for more details." 

260 

261 found_EBSD = False 

262 found_EDS = False 

263 

264 for step in step_sequence: 

265 if step.type == tbt.StepType.EBSD: 

266 if found_EDS == True: 

267 raise ValueError( 

268 f"EBSD step found in sequence after EDS step was already defined. {EBSD_EDS_conflict_msg}" 

269 ) 

270 found_EBSD = True 

271 

272 if step.type == tbt.StepType.EDS: 

273 if found_EBSD == True: 

274 raise ValueError( 

275 f"EDS step found in sequence after EBSD step was already defined. {EBSD_EDS_conflict_msg}" 

276 ) 

277 found_EDS = True 

278 

279 return True 

280 

281 

282def pre_flight_check(yml_path: Path) -> tbt.ExperimentSettings: 

283 # get configuration from yml 

284 yml_version = ut.yml_version(yml_path) 

285 experiment_settings = ut.yml_to_dict( 

286 yml_path_file=yml_path, 

287 version=yml_version, 

288 required_keys=( 

289 "general", 

290 "config_file_version", 

291 ), 

292 ) 

293 yml_format = ut.yml_format(version=yml_version) 

294 

295 # get general settings and validate them 

296 general_db = ut.general_settings( 

297 exp_settings=experiment_settings, yml_format=yml_format 

298 ) 

299 general_settings = factory.general( 

300 general_db=general_db, 

301 yml_format=yml_format, 

302 ) 

303 

304 # whether to enable EBSD and EDS control 

305 enable_EBSD = ut.enable_external_device(general_settings.EBSD_OEM) 

306 enable_EDS = ut.enable_external_device(general_settings.EDS_OEM) 

307 if enable_EBSD: 

308 status = devices.connect_EBSD() 

309 if status == tbt.RetractableDeviceState.ERROR: 

310 raise SystemError("EBSD camera is connected but in error state.") 

311 if enable_EDS: 

312 status = devices.connect_EDS() 

313 if status == tbt.RetractableDeviceState.ERROR: 

314 raise SystemError("EDS camera is connected but in error state.") 

315 

316 # connect to microscope: 

317 connection = general_settings.connection 

318 microscope = tbt.Microscope() 

319 ut.connect_microscope( 

320 microscope=microscope, 

321 quiet_output=True, 

322 connection_host=connection.host, 

323 connection_port=connection.port, 

324 ) 

325 

326 # get step_count and validate settings 

327 num_steps = ut.step_count(exp_settings=experiment_settings, yml_format=yml_format) 

328 step_sequence = [] # empty list of tbt.Step type objects 

329 for step in range(1, num_steps + 1): 

330 step_name, step_settings = ut.step_settings( 

331 exp_settings=experiment_settings, 

332 step_number_key=yml_format.step_number_key, 

333 step_number_val=step, 

334 yml_format=yml_format, 

335 ) 

336 if not step_name: 

337 raise KeyError( 

338 f"Step name for step {step} of {num_steps} is empty. Please provide a unique name for each step in your configuration." 

339 ) 

340 step_type = ut.step_type( 

341 settings=step_settings, 

342 yml_format=yml_format, 

343 ) 

344 

345 # validate connections for specific step types 

346 if step_type == tbt.StepType.LASER: 

347 laser_enabled = laser.laser_connected() 

348 if not laser_enabled: 

349 raise SystemError( 

350 f"Step name '{step_name}' is a Laser step type but Laser control is not currently enabled. Ensure TFS laser API is installed, Laser Control application is open." 

351 ) 

352 if (step_type == tbt.StepType.EDS) and (not enable_EDS): 

353 raise SystemError( 

354 f"Step name '{step_name}' is an EDS step type but EDS control is not currently enabled." 

355 ) 

356 if (step_type == tbt.StepType.EBSD) and (not enable_EBSD): 

357 raise SystemError( 

358 f"Step name '{step_name}' is an EDS step type but EDS control is not currently enabled." 

359 ) 

360 # if (step_type == tbt.StepType.EBSD_EDS) and ( 

361 # (not enable_EBSD) or (not enable_EDS) 

362 # ): 

363 # raise SystemError( 

364 # f"Step name '{step_name}' is an EBSD_EDS step type but EBSD and EDS control are not both currently enabled." 

365 # ) 

366 # create the step settings 

367 step = factory.step( 

368 microscope=microscope, 

369 step_name=step_name, 

370 step_settings=step_settings, 

371 general_settings=general_settings, 

372 yml_format=yml_format, 

373 ) 

374 

375 step_sequence.append(step) 

376 

377 if len(step_sequence) != num_steps: 

378 raise ValueError( 

379 f"Settings not parsed correctly, expected {num_steps} but only {len(step_sequence)} have been parsed." 

380 ) 

381 

382 # ensure only EBSD or EDS step type exists 

383 ebsd_eds_conflict_free(step_sequence=step_sequence) 

384 

385 experiment_settings = tbt.ExperimentSettings( 

386 microscope=microscope, 

387 general_settings=general_settings, 

388 step_sequence=step_sequence, 

389 enable_EBSD=enable_EBSD, 

390 enable_EDS=enable_EDS, 

391 ) 

392 # print("Pre-flight check complete.") 

393 return experiment_settings 

394 

395 

396def setup_experiment( 

397 yml_path: Path, 

398) -> tbt.ExperimentSettings: 

399 # validate yml 

400 experiment_settings = pre_flight_check(yml_path=yml_path) 

401 

402 log_filepath = experiment_settings.general_settings.log_filepath 

403 log.create_file(log_filepath) 

404 

405 # link stage to free working distance 

406 experiment_settings.microscope.specimen.stage.link() 

407 

408 # retract all devices 

409 print("\tRetracting all devices...") 

410 devices.retract_all_devices( 

411 microscope=experiment_settings.microscope, 

412 enable_EBSD=experiment_settings.enable_EBSD, 

413 enable_EDS=experiment_settings.enable_EDS, 

414 ) 

415 

416 return experiment_settings 

417 

418 

419def perform_step( 

420 slice_number: int, 

421 step_number: int, 

422 experiment_settings: tbt.ExperimentSettings, 

423): 

424 # # breakout experiment settings elements 

425 microscope = experiment_settings.microscope 

426 general_settings = experiment_settings.general_settings 

427 step_sequence = experiment_settings.step_sequence 

428 enable_EBSD = experiment_settings.enable_EBSD 

429 enable_EDS = experiment_settings.enable_EDS 

430 

431 # get operation settings, execute operation. 

432 operation = step_sequence[step_number - 1] # list is 0-indexed 

433 print( 

434 f"Slice {slice_number}, Step {step_number} of {general_settings.step_count}, '{operation.name}', a {operation.type.value} type step." 

435 ) 

436 if ( 

437 slice_number - 1 

438 ) % operation.frequency != 0: # slices start at 1, perform all steps on slice 1. 

439 print( 

440 f"\tStep frequency is every {operation.frequency} slices, starting on slice 1. Skipping step on this slice.\n" 

441 ) 

442 return 

443 

444 # log step_start position 

445 log.position( 

446 step_number=step_number, 

447 step_name=operation.name, 

448 slice_number=slice_number, 

449 log_filepath=general_settings.log_filepath, 

450 dataset_name=cs.Constants.pre_position_dataset_name, 

451 current_position=factory.active_stage_position_settings( 

452 microscope=microscope, 

453 ), 

454 ) 

455 

456 # retract all devices 

457 print("\tRetracting all devices...") 

458 with ut.nostdout(): 

459 devices.retract_all_devices( 

460 microscope=microscope, 

461 enable_EBSD=enable_EBSD, 

462 enable_EDS=enable_EDS, 

463 ) 

464 print("\tDevices retracted.") 

465 

466 # move stage to starting position for slice 

467 stage.step_start_position( 

468 microscope=microscope, 

469 slice_number=slice_number, 

470 operation=operation, 

471 general_settings=general_settings, 

472 ) 

473 

474 # perform specific operation 

475 perform_operation( 

476 operation.operation_settings, 

477 step=operation, 

478 general_settings=general_settings, 

479 slice_number=slice_number, 

480 ) 

481 

482 # log step end position 

483 log.position( 

484 step_number=step_number, 

485 step_name=operation.name, 

486 slice_number=slice_number, 

487 log_filepath=general_settings.log_filepath, 

488 dataset_name=cs.Constants.post_position_dataset_name, 

489 current_position=factory.active_stage_position_settings( 

490 microscope=microscope, 

491 ), 

492 ) 

493 

494 # retract all devices 

495 print("\tRetracting all devices...") 

496 with ut.nostdout(): 

497 devices.retract_all_devices( 

498 microscope=microscope, 

499 enable_EBSD=enable_EBSD, 

500 enable_EDS=enable_EDS, 

501 ) 

502 print("\tDevices retracted. Step Complete.\n") 

503 

504 

505def run_experiment_cli( 

506 start_slice: int, 

507 start_step: int, 

508 yml_path: Path, 

509): 

510 """main loop for the experiment, accessed through command line""" 

511 

512 experiment_settings = setup_experiment(yml_path=yml_path) 

513 

514 # warn user of any EBSD/EDS lack of control 

515 warning_text = """is not enabled, you will not have access to safety 

516 checking and these modalities during data collection. Please ensure  

517 this detector is retracted before proceeding.""" 

518 if not experiment_settings.enable_EBSD: 

519 print(f"\nWARNING: EBSD {warning_text}") 

520 if not ut.yes_no("Continue?"): 

521 print("\nExiting now...") 

522 exit() 

523 if not experiment_settings.enable_EDS: 

524 print(f"\nWARNING: EDS {warning_text}") 

525 if not ut.yes_no("Continue?"): 

526 print("\nExiting now...") 

527 exit() 

528 

529 # main loop 

530 log.experiment_settings( 

531 slice_number=start_slice, 

532 step_number=start_step, 

533 log_filepath=experiment_settings.general_settings.log_filepath, 

534 yml_path=yml_path, 

535 ) 

536 num_steps = len(experiment_settings.step_sequence) 

537 print( 

538 f"\n\nBeginning serial sectioning experiment on slice {start_slice}, step {start_step} of {num_steps}.\n" 

539 ) 

540 

541 for slice_number in range( 

542 start_slice, experiment_settings.general_settings.max_slice_number + 1 

543 ): # inclusive of max slice number 

544 for step_number in range(start_step, num_steps + 1): # list is 1-indexed 

545 perform_step( 

546 slice_number=slice_number, 

547 step_number=step_number, 

548 experiment_settings=experiment_settings, 

549 ) 

550 

551 # reset start_step to 1 at end of slice 

552 start_step = 1 

553 

554 ut.disconnect_microscope( 

555 microscope=experiment_settings.microscope, 

556 quiet_output=True, 

557 ) 

558 

559 print("\n\nExperiment complete.") 

560 

561 

562if __name__ == "__main__": 

563 pass