Coverage for src/pytribeam/GUI/common/threading_utils.py: 0%
206 statements
« prev ^ index » next coverage.py v7.6.1, created at 2026-06-16 18:30 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2026-06-16 18:30 +0000
1"""Thread management utilities for GUI operations.
3This module provides thread management capabilities including stoppable threads
4and Windows-specific keyboard event generation for emergency stops.
5"""
7import ctypes
8import inspect
9import threading
10import time
11from collections import namedtuple
12from ctypes import wintypes
13from typing import Any, Callable, Dict, Optional, Tuple
class StoppableThread(threading.Thread):
    """Thread that can be stopped via exception injection.

    This thread wrapper allows raising exceptions in a running thread,
    which is useful for implementing emergency stops in long-running operations.

    Attributes:
        result: Dictionary containing 'value', 'error' and 'stopped' from
            thread execution
    """

    def __init__(
        self,
        target: Callable,
        args: Tuple = (),
        kwargs: Optional[dict] = None,
        name: Optional[str] = None,
    ):
        """Initialize stoppable thread.

        Args:
            target: Function to run in thread
            args: Positional arguments for target function
            kwargs: Keyword arguments for target function
            name: Optional thread name for debugging
        """
        self._user_target = target
        self._result: Dict[str, Any] = {"value": None, "error": None, "stopped": False}
        self._thread_id: Optional[int] = None

        # The thread runs the wrapper, which in turn calls the user target so
        # that its return value / exception can be recorded.
        super().__init__(
            target=self._wrapped_target,
            args=args,
            kwargs=kwargs or {},
            name=name,
        )

        self.daemon = False

    def _wrapped_target(self, *args, **kwargs):
        """Wrap target to capture result/errors.

        Any exception (including KeyboardInterrupt/SystemExit injected via
        raise_exception) is stored in the result dictionary instead of
        propagating out of the thread.
        """
        try:
            result = self._user_target(*args, **kwargs)
            self._result["value"] = result
            return result
        except BaseException as e:
            self._result["error"] = e
            self._result["value"] = None
            # KeyboardInterrupt/SystemExit are treated as a requested stop.
            self._result["stopped"] = isinstance(e, (KeyboardInterrupt, SystemExit))
            return None

    @property
    def result(self) -> Dict[str, Any]:
        """Get result dictionary containing 'value', 'error' and 'stopped'."""
        return self._result

    def _get_thread_id(self) -> int:
        """Get thread ID for exception raising.

        Returns:
            Thread ID

        Raises:
            threading.ThreadError: If thread is not active
            AssertionError: If thread ID cannot be determined
        """
        if not self.is_alive():
            raise threading.ThreadError("the thread is not active")

        if self._thread_id is None:
            # Thread.ident is the public thread identifier; it is the same key
            # the interpreter uses in its private table of active threads.
            ident = self.ident
            if ident is None:
                raise AssertionError("could not determine the thread's id")
            self._thread_id = ident

        return self._thread_id

    def raise_exception(self, exc_type: type):
        """Raise exception in the context of this thread.

        Note: If thread is in a system call, exception may be ignored.
        For critical stops, repeatedly call this method until thread exits.

        Args:
            exc_type: Exception type to raise (e.g., KeyboardInterrupt)

        Raises:
            TypeError: If exc_type is not an exception class
            threading.ThreadError: If thread is not active
            ValueError: If thread ID is invalid
            SystemError: If exception raising fails
        """
        if not inspect.isclass(exc_type):
            raise TypeError("Only types can be raised (not instances)")
        if not issubclass(exc_type, BaseException):
            raise TypeError("Only exception types can be raised")

        # The C API takes the thread id as an unsigned long.
        tid = ctypes.c_ulong(self._get_thread_id())
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            tid, ctypes.py_object(exc_type)
        )

        if res == 0:
            raise ValueError("invalid thread id")
        if res != 1:
            # Revert the effect if multiple threads were affected
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            raise SystemError("PyThreadState_SetAsyncExc failed")
class ThreadManager:
    """Manages lifecycle of multiple threads.

    Provides centralized thread management with named thread tracking
    and emergency stop capabilities.
    """

    def __init__(self):
        """Initialize thread manager."""
        # Maps thread name -> most recently started thread with that name.
        self._threads: Dict[str, "StoppableThread"] = {}

    def run_async(
        self,
        name: str,
        target: Callable,
        args: Tuple = (),
        kwargs: Optional[dict] = None,
    ) -> "StoppableThread":
        """Run function in managed thread.

        If a thread is already registered under ``name`` it is replaced in the
        tracking table by the new one.

        Args:
            name: Unique name for this thread
            target: Function to run
            args: Positional arguments
            kwargs: Keyword arguments

        Returns:
            Started StoppableThread instance
        """
        thread = StoppableThread(target=target, args=args, kwargs=kwargs, name=name)
        self._threads[name] = thread
        thread.start()
        return thread

    def get_thread(self, name: str) -> Optional["StoppableThread"]:
        """Get thread by name.

        Args:
            name: Thread name

        Returns:
            Thread instance or None if not found
        """
        return self._threads.get(name)

    def is_running(self, name: str) -> bool:
        """Check if named thread is still running.

        Args:
            name: Thread name

        Returns:
            True if thread exists and is alive
        """
        thread = self._threads.get(name)
        return thread is not None and thread.is_alive()

    def stop_thread(self, name: str, exc_type: type = KeyboardInterrupt):
        """Stop specific thread by raising exception.

        Does nothing if the name is unknown or the thread is not alive.

        Args:
            name: Thread name
            exc_type: Exception type to raise
        """
        thread = self._threads.get(name)
        if thread is None or not thread.is_alive():
            return
        try:
            thread.raise_exception(exc_type)
        except threading.ThreadError:
            # The thread finished between the is_alive() check and the
            # injection; it is already stopped, which is what was asked for.
            pass

    def stop_all(self, exc_type: type = KeyboardInterrupt):
        """Stop all managed threads.

        A failure to inject into one thread never prevents the remaining
        threads from being signalled.

        Args:
            exc_type: Exception type to raise in all threads
        """
        # Iterate over a snapshot so a concurrent run_async() cannot invalidate
        # the iteration while an emergency stop is in progress.
        for thread in list(self._threads.values()):
            if not thread.is_alive():
                continue
            try:
                thread.raise_exception(exc_type)
            except (ValueError, SystemError, threading.ThreadError):
                # Thread may have already stopped
                pass

    def wait_for_thread(self, name: str, timeout: Optional[float] = None) -> bool:
        """Wait for specific thread to complete.

        Args:
            name: Thread name
            timeout: Maximum time to wait in seconds

        Returns:
            True if thread completed (or is unknown), False if timeout occurred
        """
        thread = self._threads.get(name)
        if thread is None:
            return True
        thread.join(timeout)
        return not thread.is_alive()

    def cleanup(self):
        """Clean up stopped threads from tracking."""
        self._threads = {
            name: thread for name, thread in self._threads.items() if thread.is_alive()
        }
class TextRedirector:
    """Redirect stdout/stderr to a Tkinter Text widget.

    This allows capturing print statements and displaying them in the GUI
    while optionally logging to a file. Writes coming from a thread other than
    the one that created the redirector are scheduled through ``widget.after``.
    """

    def __init__(self, widget, tag: str = "stdout", log_path: Optional[str] = None):
        """Initialize text redirector.

        Args:
            widget: Tkinter Text widget to write to
            tag: Tag for text styling (e.g., 'stdout', 'stderr')
            log_path: Optional file path to also log output
        """
        self.widget = widget
        self.tag = tag
        self.log_path = log_path
        # The creating thread is treated as the GUI thread by write().
        # NOTE(review): assumes the redirector is constructed on the Tk main
        # thread - confirm against callers.
        self._main_thread_id = threading.current_thread().ident

        if self.log_path is not None:
            import os

            if not os.path.exists(self.log_path):
                # A bare file name has an empty dirname, and os.makedirs("")
                # raises, so only create the directory when there is one.
                log_dir = os.path.dirname(self.log_path)
                if log_dir:
                    os.makedirs(log_dir, exist_ok=True)
                # New log files start with a timestamp header line.
                with open(self.log_path, "w") as f:
                    f.write(time.strftime("%Y-%m-%d %H:%M:%S") + "\n")

    def write(self, text: str):
        """Write text to widget and optional log file.

        Args:
            text: Text to write

        Returns:
            Number of characters in ``text`` (file-like ``write`` convention)
        """
        # Write to log file FIRST to ensure it always happens
        # even if widget access fails
        if self.log_path is not None:
            try:
                with open(self.log_path, "a") as f:
                    f.write(text)
            except Exception:
                # Ignore file write errors to avoid breaking stdout
                pass

        # Write to widget using thread-safe approach
        # If we're on the main thread, write directly
        # If we're on a background thread, schedule on main thread
        if threading.current_thread().ident == self._main_thread_id:
            self._write_to_widget(text)
        else:
            # Schedule widget update on main thread using after()
            try:
                self.widget.after(0, self._write_to_widget, text)
            except Exception:
                # If after() fails (widget destroyed), fall back to direct write
                self._write_to_widget(text)

        return len(text)

    def _write_to_widget(self, text: str):
        """Internal method to write text to widget.

        All widget errors are swallowed so that a destroyed or inaccessible
        widget never breaks stdout/stderr redirection.

        Args:
            text: Text to write
        """
        import tkinter as tk

        # Protect ALL widget access in try/except
        try:
            # Check if we should autoscroll
            autoscroll = getattr(self.widget, "autoscroll", True)
            if autoscroll:
                # Upper bound of the visible fraction, sampled before inserting
                bottom = self.widget.yview()[1]

            # Write to widget (temporarily enabled, then disabled again)
            self.widget.config(state=tk.NORMAL)
            self.widget.insert(tk.END, text, (self.tag,))
            self.widget.config(state=tk.DISABLED)

            # Autoscroll if the view was at the bottom before the insert
            if autoscroll and bottom == 1:
                self.widget.see(tk.END)

            # Force GUI update to show text immediately
            self.widget.update_idletasks()
        except Exception:
            # Covers tk.TclError (widget destroyed / not accessible, e.g. when
            # writing from a background thread) and any other widget error.
            pass

    def flush(self):
        """Flush output (required for file-like interface)."""
        pass
def generate_escape_keypress():
    """Generate escape keypress for Windows microscope control.

    This function enumerates the visible top-level windows, brings the first
    one whose title contains "Microscope Control" to the foreground and sends
    it an Escape key press followed by two F6 key presses.

    Note: This is Windows-specific and uses ctypes to interact with Win32 API.

    Raises:
        OSError: If on non-Windows platform, or if a Win32 call fails
    """
    import sys

    if sys.platform != "win32":
        raise OSError("generate_escape_keypress only works on Windows")

    user32 = ctypes.WinDLL("user32", use_last_error=True)

    # Window enumeration setup
    def check_zero(result, func, args):
        # A zero result is only an error when the last-error code is set.
        if not result:
            err = ctypes.get_last_error()
            if err:
                raise ctypes.WinError(err)
        return args

    if not hasattr(wintypes, "LPDWORD"):
        wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)

    WindowInfo = namedtuple("WindowInfo", "pid title")

    WNDENUMPROC = ctypes.WINFUNCTYPE(
        wintypes.BOOL,
        wintypes.HWND,
        wintypes.LPARAM,
    )

    user32.EnumWindows.errcheck = check_zero
    user32.EnumWindows.argtypes = (WNDENUMPROC, wintypes.LPARAM)
    user32.IsWindowVisible.argtypes = (wintypes.HWND,)
    user32.GetWindowThreadProcessId.restype = wintypes.DWORD
    user32.GetWindowThreadProcessId.argtypes = (wintypes.HWND, wintypes.LPDWORD)
    user32.GetWindowTextLengthW.errcheck = check_zero
    user32.GetWindowTextLengthW.argtypes = (wintypes.HWND,)
    user32.GetWindowTextW.errcheck = check_zero
    user32.GetWindowTextW.argtypes = (wintypes.HWND, wintypes.LPWSTR, ctypes.c_int)

    # Without explicit prototypes ctypes treats every return value as a C int
    # and guesses argument types, which is wrong for window handles.
    user32.FindWindowW.restype = wintypes.HWND
    user32.FindWindowW.argtypes = (wintypes.LPCWSTR, wintypes.LPCWSTR)
    user32.ShowWindow.restype = wintypes.BOOL
    user32.ShowWindow.argtypes = (wintypes.HWND, ctypes.c_int)
    user32.SetForegroundWindow.restype = wintypes.BOOL
    user32.SetForegroundWindow.argtypes = (wintypes.HWND,)
    user32.MapVirtualKeyExW.restype = wintypes.UINT
    user32.MapVirtualKeyExW.argtypes = (wintypes.UINT, wintypes.UINT, wintypes.HKL)

    # Enumerate windows
    result = []

    @WNDENUMPROC
    def enum_proc(hWnd, lParam):
        if user32.IsWindowVisible(hWnd):
            pid = wintypes.DWORD()
            user32.GetWindowThreadProcessId(hWnd, ctypes.byref(pid))
            # +1 leaves room for the terminating NUL character
            length = user32.GetWindowTextLengthW(hWnd) + 1
            title = ctypes.create_unicode_buffer(length)
            user32.GetWindowTextW(hWnd, title, length)
            result.append(WindowInfo(pid.value, title.value))
        # Returning True keeps the enumeration going
        return True

    user32.EnumWindows(enum_proc, 0)

    # Input simulation setup
    INPUT_KEYBOARD = 1
    KEYEVENTF_KEYUP = 0x0002
    KEYEVENTF_UNICODE = 0x0004
    MAPVK_VK_TO_VSC = 0

    wintypes.ULONG_PTR = wintypes.WPARAM

    class MOUSEINPUT(ctypes.Structure):
        _fields_ = (
            ("dx", wintypes.LONG),
            ("dy", wintypes.LONG),
            ("mouseData", wintypes.DWORD),
            ("dwFlags", wintypes.DWORD),
            ("time", wintypes.DWORD),
            ("dwExtraInfo", wintypes.ULONG_PTR),
        )

    class KEYBDINPUT(ctypes.Structure):
        _fields_ = (
            ("wVk", wintypes.WORD),
            ("wScan", wintypes.WORD),
            ("dwFlags", wintypes.DWORD),
            ("time", wintypes.DWORD),
            ("dwExtraInfo", wintypes.ULONG_PTR),
        )

        def __init__(self, *args, **kwds):
            super(KEYBDINPUT, self).__init__(*args, **kwds)
            # For non-unicode events, derive the scan code from the virtual key
            if not self.dwFlags & KEYEVENTF_UNICODE:
                self.wScan = user32.MapVirtualKeyExW(self.wVk, MAPVK_VK_TO_VSC, 0)

    class HARDWAREINPUT(ctypes.Structure):
        _fields_ = (
            ("uMsg", wintypes.DWORD),
            ("wParamL", wintypes.WORD),
            ("wParamH", wintypes.WORD),
        )

    class INPUT(ctypes.Structure):
        class _INPUT(ctypes.Union):
            _fields_ = (("ki", KEYBDINPUT), ("mi", MOUSEINPUT), ("hi", HARDWAREINPUT))

        _anonymous_ = ("_input",)
        _fields_ = (("type", wintypes.DWORD), ("_input", _INPUT))

    LPINPUT = ctypes.POINTER(INPUT)

    def _check_count(result, func, args):
        # SendInput returns the number of events inserted; 0 means failure
        if result == 0:
            raise ctypes.WinError(ctypes.get_last_error())
        return args

    user32.SendInput.errcheck = _check_count
    user32.SendInput.argtypes = (wintypes.UINT, LPINPUT, ctypes.c_int)

    def press_key(hex_key_code):
        """Press a key."""
        x = INPUT(type=INPUT_KEYBOARD, ki=KEYBDINPUT(wVk=hex_key_code))
        user32.SendInput(1, ctypes.byref(x), ctypes.sizeof(x))

    def release_key(hex_key_code):
        """Release a key."""
        x = INPUT(
            type=INPUT_KEYBOARD,
            ki=KEYBDINPUT(wVk=hex_key_code, dwFlags=KEYEVENTF_KEYUP),
        )
        user32.SendInput(1, ctypes.byref(x), ctypes.sizeof(x))

    # Find and activate microscope window
    VK_ESC = 0x1B
    VK_F6 = 0x75
    VKs = [VK_ESC, VK_F6, VK_F6]

    for window_info in result:
        if "Microscope Control" not in window_info.title:
            continue
        window = user32.FindWindowW(None, window_info.title)
        if not window:
            # Lookup by title failed; try the next matching window, if any
            continue
        user32.ShowWindow(window, 9)  # SW_RESTORE
        user32.SetForegroundWindow(window)
        for vk in VKs:
            press_key(vk)
            time.sleep(0.05)
            release_key(vk)
        # Keys were delivered to one microscope window; stop searching
        break