Coverage for src/pytribeam/GUI/runner_util/experiment_controller.py: 0%

226 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2026-06-16 18:30 +0000

1"""Experiment execution controller. 

2 

3This module handles the business logic for running experiments, 

4separated from the GUI presentation layer. 

5""" 

6 

7import time 

8import datetime 

9import traceback 

10from dataclasses import dataclass 

11from pathlib import Path 

12from typing import Callable, Optional, Dict, Any, Tuple, List 

13 

14import pytribeam.types as tbt 

15from pytribeam import workflow, stage, insertable_devices, utilities 

16from pytribeam.GUI.common import AppConfig, StoppableThread 

17from pytribeam.GUI.common.threading_utils import generate_escape_keypress 

18 

19 

@dataclass
class ExperimentState:
    """Represents the current state of an experiment.

    This is a plain (mutable) dataclass. The controller keeps a single
    instance, updates its fields in place while the experiment runs, and
    passes that same instance to UI callbacks. Callbacks should therefore
    read the values they need immediately (or copy them) rather than hold
    on to the object and expect it to stay unchanged.

    Attributes:
        current_slice: Current slice number being processed
        current_step: Current step name being processed
        total_slices: Total number of slices in experiment
        total_steps: Total number of steps per slice
        is_running: Whether experiment is currently running
        should_stop_step: Flag to stop after current step
        should_stop_slice: Flag to stop after current slice
        should_stop_now: Flag for immediate stop
        progress_percent: Completion percentage (0-100)
        avg_slice_time_str: Average time per slice (formatted)
        remaining_time_str: Estimated remaining time (formatted)
    """

    # Position within the experiment ("-" means no step has been started yet)
    current_slice: int = 1
    current_step: str = "-"
    total_slices: int = 0
    total_steps: int = 0

    # Run / stop-request flags
    is_running: bool = False
    should_stop_step: bool = False
    should_stop_slice: bool = False
    should_stop_now: bool = False

    # Progress and timing figures shown to the user ("-" means not available yet)
    progress_percent: int = 0
    avg_slice_time_str: str = "-"
    remaining_time_str: str = "-"

52 

53 

class ExperimentController:
    """Controls experiment execution without UI dependencies.

    This class handles all experiment logic including validation,
    execution, stopping, and progress tracking. It communicates with
    the UI through callbacks only. The events it emits are: ``error``,
    ``validation_failed``, ``detector_warning``, ``experiment_started``,
    ``state_changed``, ``stop_requested``, ``experiment_interrupted``,
    ``experiment_stopped`` and ``experiment_completed``.

    Attributes:
        config_path: Path to experiment configuration file
        state: Current experiment state
        experiment_settings: Settings of the most recently started
            experiment, or None when nothing is cached
    """

    def __init__(self, config_path: Optional[Path] = None):
        """Initialize experiment controller.

        Args:
            config_path: Path to configuration file (can be set later)
        """
        self.config_path = config_path
        self.state = ExperimentState()
        self._callbacks: Dict[str, Callable] = {}
        self._thread: Optional[StoppableThread] = None
        self._slice_times: List[float] = []
        self.experiment_settings: Optional[tbt.ExperimentSettings] = None

    @staticmethod
    def _disconnect_quietly(experiment_settings, warning_prefix: str) -> None:
        """Disconnect the microscope held by ``experiment_settings``.

        Any error is reported with a printed warning instead of being
        raised, except the "already disconnected" error, which is ignored.

        Args:
            experiment_settings: Settings object exposing ``microscope``
            warning_prefix: Text printed in front of an unexpected error
        """
        try:
            microscope = experiment_settings.microscope
            if microscope is not None:
                utilities.disconnect_microscope(microscope, quiet_output=True)
        except Exception as e:
            # Disconnecting twice is expected (cleanup after a run, then a
            # later clear of the cached settings) and is not worth a warning.
            if str(e) != "Client is already disconnected.":
                print(f"Warning: {warning_prefix}: {e}")

    def clear_experiment_settings(self):
        """Clear cached experiment settings and release resources.

        This should be called when the configuration file is edited
        to ensure old microscope connections are properly released.
        """
        if self.experiment_settings is None:
            return

        # Disconnect microscope before clearing
        self._disconnect_quietly(
            self.experiment_settings, "Error disconnecting microscope"
        )

        # Clear the reference to allow garbage collection
        self.experiment_settings = None

    def set_config_path(self, path: Path):
        """Set or update configuration file path.

        Args:
            path: Path to configuration file
        """
        self.config_path = path

    def register_callback(self, event: str, callback: Callable):
        """Register a callback for state updates.

        Only one callback is kept per event; registering again for the
        same event replaces the previous callback.

        Args:
            event: Event name (e.g., 'state_changed', 'validation_failed')
            callback: Function to call when event occurs
        """
        self._callbacks[event] = callback

    def _notify(self, event: str, *args, **kwargs):
        """Trigger registered callback for event.

        Events without a registered callback are silently ignored.

        Args:
            event: Event name
            *args: Positional arguments for callback
            **kwargs: Keyword arguments for callback
        """
        callback = self._callbacks.get(event)
        if callback is None:
            return
        try:
            callback(*args, **kwargs)
        except Exception as e:
            # Don't let callback errors crash the controller
            print(f"Error in callback '{event}': {e}")

    def validate_config(
        self,
    ) -> Tuple[bool, Optional["tbt.ExperimentSettings"], Optional[str]]:
        """Validate the current configuration file.

        Previously cached experiment settings are cleared (and their
        microscope disconnected) before the new settings are created.

        Returns:
            Tuple of (is_valid, experiment_settings, error_message)
        """
        if self.config_path is None:
            return False, None, "No configuration file loaded"

        # Clear old experiment settings before creating new ones
        # This ensures old microscope connections are released
        self.clear_experiment_settings()

        try:
            experiment_settings = workflow.setup_experiment(self.config_path)
            return True, experiment_settings, None
        except Exception as e:
            return False, None, f"Validation failed: {e}"

    def start_experiment(
        self,
        starting_slice: int = 1,
        starting_step: Optional[str] = None,
    ) -> bool:
        """Start experiment execution.

        The experiment itself runs in a background thread; this method
        returns as soon as that thread has been started.

        Args:
            starting_slice: Slice number to start at
            starting_step: Step name to start at (or None for first step)

        Returns:
            True if experiment started successfully, False otherwise
        """
        if self.state.is_running:
            self._notify("error", "Experiment is already running")
            return False

        # Validate configuration
        is_valid, experiment_settings, error = self.validate_config()
        if not is_valid:
            self._notify("validation_failed", error)
            return False
        self.experiment_settings = experiment_settings

        # Resolve the starting step BEFORE marking the experiment as running,
        # so that a bad step name cannot leave the controller stuck in the
        # "running" state with an open microscope connection.
        step_names = [s.name for s in experiment_settings.step_sequence]
        if starting_step is None:
            starting_step_idx = 0
        else:
            try:
                starting_step_idx = step_names.index(starting_step)
            except ValueError:
                self.clear_experiment_settings()
                self._notify(
                    "validation_failed",
                    f"Validation failed: unknown starting step '{starting_step}'",
                )
                return False

        # Fresh state: this also resets all stop flags
        self.state = ExperimentState(
            total_slices=experiment_settings.general_settings.max_slice_number,
            total_steps=experiment_settings.general_settings.step_count,
            current_slice=starting_slice,
            is_running=True,
        )

        # Check EBSD/EDS detector status and warn user if needed
        self._check_detector_warning(experiment_settings)

        # Notify experiment start
        self._notify(
            "experiment_started", experiment_settings, starting_slice, starting_step_idx
        )

        # Run experiment in background thread (non-blocking)
        self._thread = StoppableThread(
            target=self._run_experiment_loop,
            args=(experiment_settings, starting_slice, starting_step_idx, step_names),
            name="ExperimentThread",
        )
        self._thread.start()

        return True

    def _check_detector_warning(self, experiment_settings: "tbt.ExperimentSettings"):
        """Check EBSD/EDS detector status and notify UI if warning needed.

        A ``detector_warning`` event is emitted unless both EBSD and EDS
        are enabled.

        Args:
            experiment_settings: Validated experiment settings
        """
        ebsd_enabled = experiment_settings.enable_EBSD
        eds_enabled = experiment_settings.enable_EDS
        if ebsd_enabled and eds_enabled:
            return

        if ebsd_enabled:
            message_part1 = "EDS is not enabled"
        elif eds_enabled:
            message_part1 = "EBSD is not enabled"
        else:
            message_part1 = "EBSD and EDS are not enabled"

        message_part2 = (
            ", you will not have access to safety checking and these modalities "
            "during data collection. Please ensure these detectors are retracted "
            "before proceeding."
        )

        # Notify UI to show warning
        self._notify("detector_warning", message_part1 + message_part2)

    def _run_experiment_loop(
        self,
        experiment_settings: "tbt.ExperimentSettings",
        starting_slice: int,
        starting_step_idx: int,
        step_names: List[str],
    ):
        """Execute the main experiment loop.

        This method runs the actual experiment, calling workflow steps
        and updating progress. Cleanup always runs at the end, whether
        the loop finished, was stopped, or was interrupted.

        Args:
            experiment_settings: Validated experiment settings
            starting_slice: Starting slice number
            starting_step_idx: Starting step index
            step_names: List of step names
        """
        self._slice_times = []
        ending_slice = self.state.total_slices
        num_steps = self.state.total_steps

        # Resume bookkeeping: the last step that was attempted and whether it
        # finished. Kept separately from the loop variables so the values are
        # always defined (even if no iteration runs) and never stale when an
        # interrupt arrives between two steps or two slices.
        last_slice = starting_slice
        last_step = starting_step_idx
        last_step_done = False

        try:
            for i in range(starting_slice, ending_slice + 1):
                if self.state.should_stop_now:
                    raise KeyboardInterrupt

                # Track slice start time
                slice_start = time.time()
                count_slice_for_time = True
                self.state.current_slice = i
                self._notify("state_changed", self.state)

                for j in range(num_steps):
                    # Skip steps if starting mid-slice; a partial slice must
                    # not contribute to the average slice time
                    if i == starting_slice and j < starting_step_idx:
                        count_slice_for_time = False
                        continue

                    if self.state.should_stop_now:
                        raise KeyboardInterrupt

                    # Update current step
                    self.state.current_step = step_names[j]
                    self._notify("state_changed", self.state)

                    # Execute step (workflow step indices are 1-based)
                    last_slice, last_step, last_step_done = i, j, False
                    success = self._execute_step(i, j + 1, experiment_settings)

                    # A step that returned while an immediate stop was pending
                    # is treated as not completed, so it is repeated on resume
                    if not success or self.state.should_stop_now:
                        self.state.should_stop_now = True
                        break
                    last_step_done = True

                    # Update progress
                    self._update_progress(i, j + 1, ending_slice, num_steps)

                    if self.state.should_stop_step:
                        break

                # Check stop conditions
                if (
                    self.state.should_stop_step
                    or self.state.should_stop_slice
                    or self.state.should_stop_now
                ):
                    break

                # Update timing stats
                slice_end = time.time()
                if count_slice_for_time:
                    self._slice_times.append(slice_end - slice_start)
                    self._update_timing_stats(i, ending_slice)

        except KeyboardInterrupt:
            self._notify("experiment_interrupted")
        finally:
            self._cleanup_experiment(
                experiment_settings,
                last_slice,
                last_step,
                num_steps,
                step_completed=last_step_done,
            )

    def _execute_step(
        self,
        slice_number: int,
        step_index: int,
        experiment_settings: "tbt.ExperimentSettings",
    ) -> bool:
        """Execute a single workflow step.

        On any failure the stage is asked to stop before returning.

        Args:
            slice_number: Current slice number
            step_index: Current step index (1-based)
            experiment_settings: Experiment settings

        Returns:
            True if step succeeded, False if error occurred
        """
        try:
            workflow.perform_step(slice_number, step_index, experiment_settings)
            return True
        except KeyboardInterrupt:
            self._try_stop_stage(experiment_settings.microscope)
            return False
        except Exception as e:
            message = f"Unexpected error in step {step_index} of slice {slice_number}: {e.__class__.__name__}: {e}"
            print(message)
            # Writing the log is best-effort: a logging failure must never
            # prevent the stage from being stopped below.
            try:
                self._log_error(e, slice_number, step_index)
            except Exception as log_exc:
                print(f"Warning: Failed to write error log: {log_exc}")
            self._try_stop_stage(experiment_settings.microscope)
            return False

    def _try_stop_stage(self, microscope):
        """Attempt to stop stage movement.

        This is best-effort and never raises an ``Exception`` subclass.

        Args:
            microscope: Microscope object
        """
        try:
            stage.stop(microscope)
            # NOTE(review): a normal return is reported as a failure and a
            # SystemError as success, which presumes stage.stop() raises
            # SystemError once the stage has been halted - confirm against
            # pytribeam.stage.stop.
            print("-----> Stage stop unsuccessful")
        except SystemError:
            print("-----> Stage stop successful")
        except Exception as e:
            # Any other error (e.g. a lost connection) means the stop request
            # did not go through; report it instead of letting it escape from
            # the error handler that called us.
            print(f"-----> Stage stop unsuccessful: {e}")

    def _log_error(self, error: Exception, slice_number: int, step_index: int):
        """Log error to file.

        The log file is overwritten on every call, so it only ever holds
        the most recent error.

        Args:
            error: Exception that occurred
            slice_number: Slice where error occurred
            step_index: Step where error occurred
        """
        app_config = AppConfig.from_env()
        app_config.ensure_directories()
        err_path = app_config.get_error_log_path()

        with open(err_path, "w", encoding="utf-8") as f:
            f.write(f"Error in slice {slice_number}, step {step_index}\n")
            f.write(f"Exception: {type(error).__name__} - {error}\n\n")
            # Format the exception that was passed in, rather than whatever
            # exception happens to be "current" in the calling context.
            traceback.print_exception(
                type(error), error, error.__traceback__, file=f
            )

        print(f"Error details saved to: {err_path}")

    def _update_progress(
        self, slice_num: int, step_num: int, total_slices: int, total_steps: int
    ):
        """Update progress percentage.

        Progress is measured against the whole experiment (all slices and
        steps), not only the part executed since the last start.

        Args:
            slice_num: Current slice number
            step_num: Current step number
            total_slices: Total slices
            total_steps: Total steps per slice
        """
        completed_steps = (slice_num - 1) * total_steps + step_num
        total_work = total_slices * total_steps
        if total_work > 0:
            self.state.progress_percent = int((completed_steps / total_work) * 100)
        else:
            # Nothing to do at all: avoid dividing by zero
            self.state.progress_percent = 0
        self._notify("state_changed", self.state)

    def _update_timing_stats(self, current_slice: int, total_slices: int):
        """Update timing statistics.

        Does nothing until at least one full slice has been timed.

        Args:
            current_slice: Current slice number
            total_slices: Total number of slices
        """
        if not self._slice_times:
            return

        avg_time = sum(self._slice_times) / len(self._slice_times)
        remaining_slices = total_slices - current_slice
        remaining_time = avg_time * remaining_slices

        self.state.avg_slice_time_str = str(datetime.timedelta(seconds=int(avg_time)))
        self.state.remaining_time_str = str(
            datetime.timedelta(seconds=int(remaining_time))
        )
        self._notify("state_changed", self.state)

    def _cleanup_experiment(
        self,
        experiment_settings: "tbt.ExperimentSettings",
        final_slice: int,
        final_step: int,
        total_steps: int,
        step_completed: Optional[bool] = None,
    ):
        """Clean up after experiment completion or stop.

        Retracts devices, disconnects the microscope, resets the run and
        stop flags, and finally emits either ``experiment_completed`` or
        ``experiment_stopped`` (with the slice and step name to resume at).

        Args:
            experiment_settings: Experiment settings
            final_slice: Last slice that was processed
            final_step: Last step that was processed (0-based index)
            total_steps: Total steps per slice
            step_completed: Whether the last step finished successfully.
                When None, this is derived from the immediate-stop flag
                (a pending immediate stop means "not completed").
        """
        # Retract all devices
        try:
            insertable_devices.retract_all_devices(
                microscope=experiment_settings.microscope,
                enable_EBSD=experiment_settings.enable_EBSD,
                enable_EDS=experiment_settings.enable_EDS,
            )
        except Exception as e:
            print(f"Warning: Failed to retract devices: {e}")

        # Disconnect microscope to release resources
        self._disconnect_quietly(
            experiment_settings, "Failed to disconnect microscope"
        )

        # Get whether or not last step was completed (must be read before
        # the stop flags are reset below)
        if step_completed is None:
            is_step_completed = not self.state.should_stop_now
        else:
            is_step_completed = step_completed

        # Update state
        self.state.is_running = False
        self.state.should_stop_step = False
        self.state.should_stop_slice = False
        self.state.should_stop_now = False

        # Notify completion. The experiment only counts as completed when the
        # very last step actually finished; a failed or aborted final step is
        # reported as a stop so that it can be resumed.
        reached_end = (
            final_slice == self.state.total_slices and final_step == total_steps - 1
        )
        if is_step_completed and reached_end:
            self._notify("experiment_completed")
            return

        # If step is completed, move to next step for resume
        if is_step_completed:
            if final_step + 1 < total_steps:
                final_step += 1
            else:
                final_slice += 1
                final_step = 0

        # Look up the name of the step to resume at
        final_step_name = experiment_settings.step_sequence[final_step].name
        self._notify("experiment_stopped", final_slice, final_step_name)

    def request_stop_after_step(self):
        """Request experiment stop after current step completes."""
        if not self.state.is_running:
            return

        self.state.should_stop_step = True
        self._notify("stop_requested", "step")
        print("-----> Stopping after current step")

    def request_stop_after_slice(self):
        """Request experiment stop after current slice completes."""
        if not self.state.is_running:
            return

        self.state.should_stop_slice = True
        self._notify("stop_requested", "slice")
        print("-----> Stopping after current slice")

    def request_stop_now(self):
        """Request immediate experiment stop.

        Sets the immediate-stop flag, sends an escape keypress, and then
        tries for up to about one second (10 attempts, 0.1 s apart) to
        raise ``KeyboardInterrupt`` inside the experiment thread.
        """
        if not self.state.is_running:
            return

        self.state.should_stop_now = True
        self._notify("stop_requested", "now")
        print("-----> Experiment stopped immediately by user")

        # Try to interrupt hardware
        try:
            generate_escape_keypress()
        except Exception as e:
            print(f"Warning: Failed to send escape keypress: {e}")

        # If there's a running thread, interrupt it
        if self._thread and self._thread.is_alive():
            try:
                # NOTE(review): the exception is raised repeatedly while the
                # thread is alive, so a later attempt may land while the
                # worker is already running its cleanup - confirm how
                # StoppableThread.raise_exception delivers the exception.
                for _ in range(10):
                    if not self._thread.is_alive():
                        break
                    self._thread.raise_exception(KeyboardInterrupt)
                    time.sleep(0.1)
            except Exception as e:
                print(f"Warning: Failed to interrupt thread: {e}")