Coverage for src/pytribeam/GUI/runner_util/experiment_controller.py: 0%
226 statements
« prev ^ index » next coverage.py v7.6.1, created at 2026-06-16 18:30 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2026-06-16 18:30 +0000
"""Experiment execution controller.

This module handles the business logic for running experiments,
separated from the GUI presentation layer.
"""
7import time
8import datetime
9import traceback
10from dataclasses import dataclass
11from pathlib import Path
12from typing import Callable, Optional, Dict, Any, Tuple, List
14import pytribeam.types as tbt
15from pytribeam import workflow, stage, insertable_devices, utilities
16from pytribeam.GUI.common import AppConfig, StoppableThread
17from pytribeam.GUI.common.threading_utils import generate_escape_keypress
@dataclass
class ExperimentState:
    """Represents the current state of an experiment.

    This is a plain, mutable dataclass (it is not frozen). The
    ExperimentController in this module updates a single instance in place
    and passes that same object to UI callbacks, so callbacks should read
    the values they need right away rather than treat the object as a
    frozen snapshot.

    Attributes:
        current_slice: Current slice number being processed
        current_step: Current step name being processed ("-" when none)
        total_slices: Total number of slices in experiment
        total_steps: Total number of steps per slice
        is_running: Whether experiment is currently running
        should_stop_step: Flag to stop after current step
        should_stop_slice: Flag to stop after current slice
        should_stop_now: Flag for immediate stop
        progress_percent: Completion percentage (0-100)
        avg_slice_time_str: Average time per slice (formatted, "-" when unknown)
        remaining_time_str: Estimated remaining time (formatted, "-" when unknown)
    """

    current_slice: int = 1
    current_step: str = "-"
    total_slices: int = 0
    total_steps: int = 0
    is_running: bool = False
    should_stop_step: bool = False
    should_stop_slice: bool = False
    should_stop_now: bool = False
    progress_percent: int = 0
    avg_slice_time_str: str = "-"
    remaining_time_str: str = "-"
class ExperimentController:
    """Controls experiment execution without UI dependencies.

    This class handles all experiment logic including validation,
    execution, stopping, and progress tracking. It communicates with
    the UI through callbacks only.

    Events passed to registered callbacks by this class:
        "error" (message), "validation_failed" (message),
        "detector_warning" (message),
        "experiment_started" (settings, starting_slice, starting_step_idx),
        "state_changed" (state), "experiment_interrupted" (),
        "experiment_completed" (),
        "experiment_stopped" (resume_slice, resume_step_name),
        "stop_requested" ("step" | "slice" | "now").

    Attributes:
        config_path: Path to experiment configuration file
        state: Current experiment state
        experiment_settings: Settings of the most recently started
            experiment, or None when nothing is cached
    """

    # Exception text that marks a disconnect attempt on an already closed
    # client; it is silently ignored instead of being reported as a warning.
    _ALREADY_DISCONNECTED = "Client is already disconnected."

    def __init__(self, config_path: Optional[Path] = None):
        """Initialize experiment controller.

        Args:
            config_path: Path to configuration file (can be set later)
        """
        self.config_path = config_path
        self.state = ExperimentState()
        self._callbacks: Dict[str, Callable] = {}
        self._thread: Optional[StoppableThread] = None
        self._slice_times: List[float] = []
        self.experiment_settings: Optional[tbt.ExperimentSettings] = None

    def _disconnect_microscope(self, experiment_settings, warning_prefix: str) -> None:
        """Disconnect the microscope held by ``experiment_settings``, best effort.

        Never raises for ordinary errors: an "already disconnected" error is
        ignored, any other ``Exception`` is printed as a warning.

        Args:
            experiment_settings: Settings object whose ``microscope`` attribute
                is disconnected when it is not None
            warning_prefix: Text placed after "Warning: " if the disconnect fails
        """
        try:
            if experiment_settings.microscope is not None:
                utilities.disconnect_microscope(
                    experiment_settings.microscope, quiet_output=True
                )
        except Exception as e:
            if str(e) != self._ALREADY_DISCONNECTED:
                print(f"Warning: {warning_prefix}: {e}")

    def clear_experiment_settings(self):
        """Clear cached experiment settings and release resources.

        This should be called when the configuration file is edited
        to ensure old microscope connections are properly released.
        """
        if self.experiment_settings is not None:
            # Disconnect microscope before clearing
            self._disconnect_microscope(
                self.experiment_settings, "Error disconnecting microscope"
            )

        # Clear the reference to allow garbage collection
        self.experiment_settings = None

    def set_config_path(self, path: Path):
        """Set or update configuration file path.

        Args:
            path: Path to configuration file
        """
        self.config_path = path

    def register_callback(self, event: str, callback: Callable):
        """Register a callback for state updates.

        Only one callback is kept per event; registering again for the same
        event replaces the previous callback.

        Args:
            event: Event name (e.g., 'state_changed', 'validation_failed')
            callback: Function to call when event occurs
        """
        self._callbacks[event] = callback

    def _notify(self, event: str, *args, **kwargs):
        """Trigger registered callback for event.

        Events without a registered callback are ignored.

        Args:
            event: Event name
            *args: Positional arguments for callback
            **kwargs: Keyword arguments for callback
        """
        callback = self._callbacks.get(event)
        if callback is None:
            return
        try:
            callback(*args, **kwargs)
        except Exception as e:
            # Don't let callback errors crash the controller
            print(f"Error in callback '{event}': {e}")

    def validate_config(
        self,
    ) -> Tuple[bool, Optional[tbt.ExperimentSettings], Optional[str]]:
        """Validate the current configuration file.

        Any previously cached experiment settings are cleared (and their
        microscope disconnected) before the new settings are built.

        Returns:
            Tuple of (is_valid, experiment_settings, error_message)
        """
        if self.config_path is None:
            return False, None, "No configuration file loaded"

        # Clear old experiment settings before creating new ones
        # This ensures old microscope connections are released
        self.clear_experiment_settings()

        try:
            experiment_settings = workflow.setup_experiment(self.config_path)
        except Exception as e:
            return False, None, f"Validation failed: {e}"
        return True, experiment_settings, None

    def start_experiment(
        self,
        starting_slice: int = 1,
        starting_step: Optional[str] = None,
    ) -> bool:
        """Start experiment execution.

        All start parameters are checked before the controller is marked as
        running, so a rejected start never leaves ``state.is_running`` set.

        Args:
            starting_slice: Slice number to start at
            starting_step: Step name to start at (or None for first step)

        Returns:
            True if experiment started successfully, False otherwise
        """
        if self.state.is_running:
            self._notify("error", "Experiment is already running")
            return False

        # Validate configuration
        is_valid, experiment_settings, error = self.validate_config()
        if not is_valid:
            self._notify("validation_failed", error)
            return False
        self.experiment_settings = experiment_settings

        # Resolve the starting step before touching the run state; an unknown
        # name is reported through the "error" event instead of raising.
        step_names = [s.name for s in experiment_settings.step_sequence]
        if starting_step is None:
            starting_step_idx = 0
        elif starting_step in step_names:
            starting_step_idx = step_names.index(starting_step)
        else:
            self._notify("error", f"Unknown starting step: {starting_step}")
            return False

        # A start beyond the last slice would give the worker nothing to do.
        total_slices = experiment_settings.general_settings.max_slice_number
        if starting_slice > total_slices:
            self._notify(
                "error",
                f"Starting slice {starting_slice} exceeds the last slice "
                f"({total_slices})",
            )
            return False

        # Fresh state: this also resets all stop flags
        self.state = ExperimentState(
            total_slices=total_slices,
            total_steps=experiment_settings.general_settings.step_count,
            current_slice=starting_slice,
            is_running=True,
        )

        # Check EBSD/EDS detector status and warn user if needed
        self._check_detector_warning(experiment_settings)

        # Notify experiment start
        self._notify(
            "experiment_started", experiment_settings, starting_slice, starting_step_idx
        )

        # Run experiment in background thread (non-blocking)
        self._thread = StoppableThread(
            target=self._run_experiment_loop,
            args=(experiment_settings, starting_slice, starting_step_idx, step_names),
            name="ExperimentThread",
        )
        self._thread.start()

        return True

    def _check_detector_warning(self, experiment_settings: tbt.ExperimentSettings):
        """Check EBSD/EDS detector status and notify UI if warning needed.

        A "detector_warning" event is sent unless both EBSD and EDS are enabled.

        Args:
            experiment_settings: Validated experiment settings
        """
        ebsd_enabled = experiment_settings.enable_EBSD
        eds_enabled = experiment_settings.enable_EDS
        if ebsd_enabled and eds_enabled:
            return

        if ebsd_enabled:
            message_part1 = "EDS is not enabled"
        elif eds_enabled:
            message_part1 = "EBSD is not enabled"
        else:
            message_part1 = "EBSD and EDS are not enabled"

        message_part2 = (
            ", you will not have access to safety checking and these modalities "
            "during data collection. Please ensure these detectors are retracted "
            "before proceeding."
        )

        # Notify UI to show warning
        self._notify("detector_warning", message_part1 + message_part2)

    def _run_experiment_loop(
        self,
        experiment_settings: tbt.ExperimentSettings,
        starting_slice: int,
        starting_step_idx: int,
        step_names: List[str],
    ):
        """Execute the main experiment loop.

        This method runs the actual experiment, calling workflow steps
        and updating progress. Cleanup always runs, whether the loop ends
        normally, is stopped through a flag, or is interrupted.

        Args:
            experiment_settings: Validated experiment settings
            starting_slice: Starting slice number
            starting_step_idx: Starting step index (0-based)
            step_names: List of step names
        """
        self._slice_times = []
        ending_slice = self.state.total_slices
        num_steps = self.state.total_steps

        # Position handed to cleanup. Initialised up front so the ``finally``
        # clause has valid values even if the slice range turns out empty.
        slice_number = starting_slice
        step_idx = starting_step_idx

        try:
            for slice_number in range(starting_slice, ending_slice + 1):
                # Only the first slice of a run may begin mid-slice.
                first_step = starting_step_idx if slice_number == starting_slice else 0
                # Point at this slice's first step before any stop check, so an
                # interruption here does not report the previous slice's step.
                step_idx = first_step

                if self.state.should_stop_now:
                    raise KeyboardInterrupt

                # Track slice start time; partially executed slices are left
                # out of the average
                slice_start = time.time()
                count_slice_for_time = first_step == 0
                self.state.current_slice = slice_number
                self._notify("state_changed", self.state)

                for step_idx in range(first_step, num_steps):
                    if self.state.should_stop_now:
                        raise KeyboardInterrupt

                    # Update current step
                    self.state.current_step = step_names[step_idx]
                    self._notify("state_changed", self.state)

                    # Execute step (workflow step numbers are 1-based)
                    success = self._execute_step(
                        slice_number, step_idx + 1, experiment_settings
                    )

                    if not success or self.state.should_stop_now:
                        # Marks the step as not completed for cleanup
                        self.state.should_stop_now = True
                        break

                    # Update progress
                    self._update_progress(
                        slice_number, step_idx + 1, ending_slice, num_steps
                    )

                    if self.state.should_stop_step:
                        break

                # Check stop conditions
                if (
                    self.state.should_stop_step
                    or self.state.should_stop_slice
                    or self.state.should_stop_now
                ):
                    break

                # Update timing stats
                if count_slice_for_time:
                    self._slice_times.append(time.time() - slice_start)
                self._update_timing_stats(slice_number, ending_slice)

        except KeyboardInterrupt:
            self._notify("experiment_interrupted")
        finally:
            self._cleanup_experiment(
                experiment_settings, slice_number, step_idx, num_steps
            )

    def _execute_step(
        self,
        slice_number: int,
        step_index: int,
        experiment_settings: tbt.ExperimentSettings,
    ) -> bool:
        """Execute a single workflow step.

        On any failure the stage is asked to stop. Writing the error log is
        best effort so that a logging problem cannot prevent the stage stop.

        Args:
            slice_number: Current slice number
            step_index: Current step index (1-based)
            experiment_settings: Experiment settings

        Returns:
            True if step succeeded, False if error occurred
        """
        try:
            workflow.perform_step(slice_number, step_index, experiment_settings)
            return True
        except KeyboardInterrupt:
            self._try_stop_stage(experiment_settings.microscope)
            return False
        except Exception as e:
            message = f"Unexpected error in step {step_index} of slice {slice_number}: {e.__class__.__name__}: {e}"
            print(message)
            try:
                self._log_error(e, slice_number, step_index)
            except Exception as log_error:
                print(f"Warning: Failed to write error log: {log_error}")
            self._try_stop_stage(experiment_settings.microscope)
            return False

    def _try_stop_stage(self, microscope):
        """Attempt to stop stage movement.

        NOTE(review): the two messages look swapped, but the code treats a
        SystemError from ``stage.stop`` as the success signal - presumably
        ``stage.stop`` raises SystemError once the stage has been stopped.
        Confirm against ``pytribeam.stage.stop`` before changing this.

        Args:
            microscope: Microscope object
        """
        try:
            stage.stop(microscope)
            print("-----> Stage stop unsuccessful")
        except SystemError:
            print("-----> Stage stop successful")

    def _log_error(self, error: Exception, slice_number: int, step_index: int):
        """Log error to file.

        The log file is overwritten on every call, so it only holds the most
        recent error.

        Args:
            error: Exception that occurred
            slice_number: Slice where error occurred
            step_index: Step where error occurred
        """
        app_config = AppConfig.from_env()
        app_config.ensure_directories()
        err_path = app_config.get_error_log_path()

        with open(err_path, "w", encoding="utf-8") as f:
            f.write(f"Error in slice {slice_number}, step {step_index}\n")
            f.write(f"Exception: {type(error).__name__} - {error}\n\n")
            # Format the exception that was passed in rather than whatever
            # exception happens to be active in the calling context.
            traceback.print_exception(
                type(error), error, error.__traceback__, file=f
            )

        print(f"Error details saved to: {err_path}")

    def _update_progress(
        self, slice_num: int, step_num: int, total_slices: int, total_steps: int
    ):
        """Update progress percentage.

        Args:
            slice_num: Current slice number
            step_num: Current step number (1-based, i.e. steps done in slice)
            total_slices: Total slices
            total_steps: Total steps per slice
        """
        completed_steps = (slice_num - 1) * total_steps + step_num
        total_work = total_slices * total_steps
        if total_work > 0:
            percent = int((completed_steps / total_work) * 100)
        else:
            # Nothing to do: avoid dividing by zero
            percent = 0
        self.state.progress_percent = percent
        self._notify("state_changed", self.state)

    def _update_timing_stats(self, current_slice: int, total_slices: int):
        """Update timing statistics.

        Does nothing until at least one complete slice has been timed.

        Args:
            current_slice: Current slice number
            total_slices: Total number of slices
        """
        if not self._slice_times:
            return

        avg_time = sum(self._slice_times) / len(self._slice_times)
        remaining_time = avg_time * (total_slices - current_slice)

        self.state.avg_slice_time_str = str(datetime.timedelta(seconds=int(avg_time)))
        self.state.remaining_time_str = str(
            datetime.timedelta(seconds=int(remaining_time))
        )
        self._notify("state_changed", self.state)

    def _cleanup_experiment(
        self,
        experiment_settings: tbt.ExperimentSettings,
        final_slice: int,
        final_step: int,
        total_steps: int,
    ):
        """Clean up after experiment completion or stop.

        Retracts devices, disconnects the microscope, resets the run state and
        sends either "experiment_completed" or "experiment_stopped" with the
        slice number and step name to resume from.

        Args:
            experiment_settings: Experiment settings
            final_slice: Last slice that was processed
            final_step: Last step that was processed (0-based index)
            total_steps: Total steps per slice
        """
        # Retract all devices
        try:
            insertable_devices.retract_all_devices(
                microscope=experiment_settings.microscope,
                enable_EBSD=experiment_settings.enable_EBSD,
                enable_EDS=experiment_settings.enable_EDS,
            )
        except Exception as e:
            print(f"Warning: Failed to retract devices: {e}")

        # Disconnect microscope to release resources
        self._disconnect_microscope(
            experiment_settings, "Failed to disconnect microscope"
        )

        # The loop sets should_stop_now when a step failed or was aborted, so
        # read it before the flags are reset below.
        is_step_completed = not self.state.should_stop_now

        # Update state
        self.state.is_running = False
        self.state.should_stop_step = False
        self.state.should_stop_slice = False
        self.state.should_stop_now = False

        on_last_step = (
            final_slice == self.state.total_slices and final_step == total_steps - 1
        )

        # Only a *completed* last step of the last slice counts as completion;
        # a failed or aborted last step must be reported as a stop.
        if is_step_completed and on_last_step:
            self._notify("experiment_completed")
            return

        # If step is completed, move to next step for resume
        if is_step_completed:
            if final_step + 1 < total_steps:
                final_step += 1
            else:
                final_slice += 1
                final_step = 0

        # Translate the 0-based step index into the step's name
        final_step_name = experiment_settings.step_sequence[final_step].name
        self._notify("experiment_stopped", final_slice, final_step_name)

    def request_stop_after_step(self):
        """Request experiment stop after current step completes."""
        if not self.state.is_running:
            return

        self.state.should_stop_step = True
        self._notify("stop_requested", "step")
        print("-----> Stopping after current step")

    def request_stop_after_slice(self):
        """Request experiment stop after current slice completes."""
        if not self.state.is_running:
            return

        self.state.should_stop_slice = True
        self._notify("stop_requested", "slice")
        print("-----> Stopping after current slice")

    def request_stop_now(self):
        """Request immediate experiment stop.

        Sets the stop flag, sends an escape keypress and then raises
        KeyboardInterrupt in the worker thread up to 10 times, 0.1 s apart.
        The caller is therefore blocked for up to about one second.

        NOTE(review): a repeated injection may arrive while the worker is
        already in its cleanup phase - confirm how StoppableThread delivers
        the exception before relying on cleanup always completing.
        """
        if not self.state.is_running:
            return

        self.state.should_stop_now = True
        self._notify("stop_requested", "now")
        print("-----> Experiment stopped immediately by user")

        # Try to interrupt hardware
        try:
            generate_escape_keypress()
        except Exception as e:
            print(f"Warning: Failed to send escape keypress: {e}")

        # If there's a running thread, interrupt it
        if self._thread:
            try:
                for _ in range(10):
                    if not self._thread.is_alive():
                        break
                    self._thread.raise_exception(KeyboardInterrupt)
                    time.sleep(0.1)
            except Exception as e:
                print(f"Warning: Failed to interrupt thread: {e}")