Coverage for src/pytribeam/GUI/common/threading_utils.py: 0%

206 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2026-06-16 18:30 +0000

1"""Thread management utilities for GUI operations. 

2 

3This module provides thread management capabilities including stoppable threads 

4and Windows-specific keyboard event generation for emergency stops. 

5""" 

6 

7import ctypes 

8import inspect 

9import threading 

10import time 

11from collections import namedtuple 

12from ctypes import wintypes 

13from typing import Any, Callable, Dict, Optional, Tuple 

14 

15 

16class StoppableThread(threading.Thread): 

17 """Thread that can be stopped via exception injection. 

18 

19 This thread wrapper allows raising exceptions in a running thread, 

20 which is useful for implementing emergency stops in long-running operations. 

21 

22 Attributes: 

23 result: Dictionary containing 'value' and 'error' from thread execution 

24 """ 

25 

26 def __init__( 

27 self, 

28 target: Callable, 

29 args: Tuple = (), 

30 kwargs: Optional[dict] = None, 

31 name: Optional[str] = None, 

32 ): 

33 """Initialize stoppable thread. 

34 

35 Args: 

36 target: Function to run in thread 

37 args: Positional arguments for target function 

38 kwargs: Keyword arguments for target function 

39 name: Optional thread name for debugging 

40 """ 

41 self._user_target = target 

42 self._result: Dict[str, Any] = {"value": None, "error": None, "stopped": False} 

43 self._thread_id: Optional[int] = None 

44 

45 super().__init__( 

46 target=self._wrapped_target, 

47 args=args, 

48 kwargs=kwargs or {}, 

49 name=name, 

50 ) 

51 

52 self.daemon = False 

53 

54 def _wrapped_target(self, *args, **kwargs): 

55 """Wrap target to capture result/errors.""" 

56 try: 

57 result = self._user_target(*args, **kwargs) 

58 self._result["value"] = result 

59 return result 

60 except BaseException as e: 

61 self._result["error"] = e 

62 self._result["value"] = None 

63 self._result["stopped"] = isinstance(e, (KeyboardInterrupt, SystemExit)) 

64 return None 

65 

66 @property 

67 def result(self) -> Dict[str, Any]: 

68 """Get result dictionary containing 'value' and 'error'.""" 

69 return self._result 

70 

71 def _get_thread_id(self) -> int: 

72 """Get thread ID for exception raising. 

73 

74 Returns: 

75 Thread ID 

76 

77 Raises: 

78 threading.ThreadError: If thread is not active 

79 AssertionError: If thread ID cannot be determined 

80 """ 

81 if not self.is_alive(): 

82 raise threading.ThreadError("the thread is not active") 

83 

84 if self._thread_id is not None: 

85 return self._thread_id 

86 

87 # Look for thread in _active dict 

88 for tid, tobj in threading._active.items(): 

89 if tobj is self: 

90 self._thread_id = tid 

91 return tid 

92 

93 raise AssertionError("could not determine the thread's id") 

94 

95 def raise_exception(self, exc_type: type): 

96 """Raise exception in the context of this thread. 

97 

98 Note: If thread is in a system call, exception may be ignored. 

99 For critical stops, repeatedly call this method until thread exits. 

100 

101 Args: 

102 exc_type: Exception type to raise (e.g., KeyboardInterrupt) 

103 

104 Raises: 

105 TypeError: If exc_type is not a type 

106 ValueError: If thread ID is invalid 

107 SystemError: If exception raising fails 

108 """ 

109 if not inspect.isclass(exc_type): 

110 raise TypeError("Only types can be raised (not instances)") 

111 

112 tid = self._get_thread_id() 

113 res = ctypes.pythonapi.PyThreadState_SetAsyncExc( 

114 ctypes.c_long(tid), ctypes.py_object(exc_type) 

115 ) 

116 

117 if res == 0: 

118 raise ValueError("invalid thread id") 

119 elif res != 1: 

120 # Revert the effect if multiple threads were affected 

121 ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), None) 

122 raise SystemError("PyThreadState_SetAsyncExc failed") 

123 

124 

125class ThreadManager: 

126 """Manages lifecycle of multiple threads. 

127 

128 Provides centralized thread management with named thread tracking 

129 and emergency stop capabilities. 

130 """ 

131 

132 def __init__(self): 

133 """Initialize thread manager.""" 

134 self._threads: Dict[str, StoppableThread] = {} 

135 

136 def run_async( 

137 self, 

138 name: str, 

139 target: Callable, 

140 args: Tuple = (), 

141 kwargs: Optional[dict] = None, 

142 ) -> StoppableThread: 

143 """Run function in managed thread. 

144 

145 Args: 

146 name: Unique name for this thread 

147 target: Function to run 

148 args: Positional arguments 

149 kwargs: Keyword arguments 

150 

151 Returns: 

152 Started StoppableThread instance 

153 """ 

154 thread = StoppableThread(target=target, args=args, kwargs=kwargs, name=name) 

155 self._threads[name] = thread 

156 thread.start() 

157 return thread 

158 

159 def get_thread(self, name: str) -> Optional[StoppableThread]: 

160 """Get thread by name. 

161 

162 Args: 

163 name: Thread name 

164 

165 Returns: 

166 Thread instance or None if not found 

167 """ 

168 return self._threads.get(name) 

169 

170 def is_running(self, name: str) -> bool: 

171 """Check if named thread is still running. 

172 

173 Args: 

174 name: Thread name 

175 

176 Returns: 

177 True if thread exists and is alive 

178 """ 

179 thread = self._threads.get(name) 

180 return thread is not None and thread.is_alive() 

181 

182 def stop_thread(self, name: str, exc_type: type = KeyboardInterrupt): 

183 """Stop specific thread by raising exception. 

184 

185 Args: 

186 name: Thread name 

187 exc_type: Exception type to raise 

188 """ 

189 thread = self._threads.get(name) 

190 if thread and thread.is_alive(): 

191 thread.raise_exception(exc_type) 

192 

193 def stop_all(self, exc_type: type = KeyboardInterrupt): 

194 """Stop all managed threads. 

195 

196 Args: 

197 exc_type: Exception type to raise in all threads 

198 """ 

199 for thread in self._threads.values(): 

200 if thread.is_alive(): 

201 try: 

202 thread.raise_exception(exc_type) 

203 except (ValueError, SystemError): 

204 # Thread may have already stopped 

205 pass 

206 

207 def wait_for_thread(self, name: str, timeout: Optional[float] = None) -> bool: 

208 """Wait for specific thread to complete. 

209 

210 Args: 

211 name: Thread name 

212 timeout: Maximum time to wait in seconds 

213 

214 Returns: 

215 True if thread completed, False if timeout occurred 

216 """ 

217 thread = self._threads.get(name) 

218 if thread is None: 

219 return True 

220 thread.join(timeout) 

221 return not thread.is_alive() 

222 

223 def cleanup(self): 

224 """Clean up stopped threads from tracking.""" 

225 self._threads = { 

226 name: thread for name, thread in self._threads.items() if thread.is_alive() 

227 } 

228 

229 

230class TextRedirector: 

231 """Redirect stdout/stderr to a Tkinter Text widget. 

232 

233 This allows capturing print statements and displaying them in the GUI 

234 while optionally logging to a file. Thread-safe for use with background threads. 

235 """ 

236 

237 def __init__(self, widget, tag: str = "stdout", log_path: Optional[str] = None): 

238 """Initialize text redirector. 

239 

240 Args: 

241 widget: Tkinter Text widget to write to 

242 tag: Tag for text styling (e.g., 'stdout', 'stderr') 

243 log_path: Optional file path to also log output 

244 """ 

245 self.widget = widget 

246 self.tag = tag 

247 self.log_path = log_path 

248 self._main_thread_id = threading.current_thread().ident 

249 

250 if self.log_path is not None: 

251 import os 

252 

253 if not os.path.exists(self.log_path): 

254 os.makedirs(os.path.dirname(self.log_path), exist_ok=True) 

255 with open(self.log_path, "w") as f: 

256 f.write(time.strftime("%Y-%m-%d %H:%M:%S") + "\n") 

257 

258 def write(self, text: str): 

259 """Write text to widget and optional log file. 

260 

261 Args: 

262 text: Text to write 

263 """ 

264 import tkinter as tk 

265 

266 # Write to log file FIRST to ensure it always happens 

267 # even if widget access fails 

268 if self.log_path is not None: 

269 try: 

270 with open(self.log_path, "a") as f: 

271 f.write(text) 

272 except Exception: 

273 # Ignore file write errors to avoid breaking stdout 

274 pass 

275 

276 # Write to widget using thread-safe approach 

277 # If we're on the main thread, write directly 

278 # If we're on a background thread, schedule on main thread 

279 if threading.current_thread().ident == self._main_thread_id: 

280 self._write_to_widget(text) 

281 else: 

282 # Schedule widget update on main thread using after() 

283 try: 

284 self.widget.after(0, self._write_to_widget, text) 

285 except Exception: 

286 # If after() fails (widget destroyed), fall back to direct write 

287 self._write_to_widget(text) 

288 

289 def _write_to_widget(self, text: str): 

290 """Internal method to write text to widget. 

291 

292 Args: 

293 text: Text to write 

294 """ 

295 import tkinter as tk 

296 

297 # Protect ALL widget access in try/except 

298 try: 

299 # Check if we should autoscroll 

300 autoscroll = getattr(self.widget, "autoscroll", True) 

301 if autoscroll: 

302 bottom = self.widget.yview()[1] 

303 

304 # Write to widget 

305 self.widget.config(state=tk.NORMAL) 

306 self.widget.insert(tk.END, text, (self.tag,)) 

307 self.widget.config(state=tk.DISABLED) 

308 

309 # Autoscroll if at bottom 

310 if autoscroll and bottom == 1: 

311 self.widget.see(tk.END) 

312 

313 # Force GUI update to show text immediately 

314 self.widget.update_idletasks() 

315 except tk.TclError: 

316 # Widget may have been destroyed or is not accessible 

317 # This can happen when writing from a background thread 

318 pass 

319 except Exception: 

320 # Catch any other widget-related errors to prevent 

321 # breaking stdout/stderr redirection 

322 pass 

323 

324 def flush(self): 

325 """Flush output (required for file-like interface).""" 

326 pass 

327 

328 

329def generate_escape_keypress(): 

330 """Generate escape keypress for Windows microscope control. 

331 

332 This function finds the microscope control window and sends escape 

333 and F6 key presses to stop ongoing operations. 

334 

335 Note: This is Windows-specific and uses ctypes to interact with Win32 API. 

336 

337 Raises: 

338 OSError: If on non-Windows platform 

339 """ 

340 import sys 

341 

342 if sys.platform != "win32": 

343 raise OSError("generate_escape_keypress only works on Windows") 

344 

345 user32 = ctypes.WinDLL("user32", use_last_error=True) 

346 

347 # Window enumeration setup 

348 def check_zero(result, func, args): 

349 if not result: 

350 err = ctypes.get_last_error() 

351 if err: 

352 raise ctypes.WinError(err) 

353 return args 

354 

355 if not hasattr(wintypes, "LPDWORD"): 

356 wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD) 

357 

358 WindowInfo = namedtuple("WindowInfo", "pid title") 

359 

360 WNDENUMPROC = ctypes.WINFUNCTYPE( 

361 wintypes.BOOL, 

362 wintypes.HWND, 

363 wintypes.LPARAM, 

364 ) 

365 

366 user32.EnumWindows.errcheck = check_zero 

367 user32.EnumWindows.argtypes = (WNDENUMPROC, wintypes.LPARAM) 

368 user32.IsWindowVisible.argtypes = (wintypes.HWND,) 

369 user32.GetWindowThreadProcessId.restype = wintypes.DWORD 

370 user32.GetWindowThreadProcessId.argtypes = (wintypes.HWND, wintypes.LPDWORD) 

371 user32.GetWindowTextLengthW.errcheck = check_zero 

372 user32.GetWindowTextLengthW.argtypes = (wintypes.HWND,) 

373 user32.GetWindowTextW.errcheck = check_zero 

374 user32.GetWindowTextW.argtypes = (wintypes.HWND, wintypes.LPWSTR, ctypes.c_int) 

375 

376 # Enumerate windows 

377 result = [] 

378 

379 @WNDENUMPROC 

380 def enum_proc(hWnd, lParam): 

381 if user32.IsWindowVisible(hWnd): 

382 pid = wintypes.DWORD() 

383 user32.GetWindowThreadProcessId(hWnd, ctypes.byref(pid)) 

384 length = user32.GetWindowTextLengthW(hWnd) + 1 

385 title = ctypes.create_unicode_buffer(length) 

386 user32.GetWindowTextW(hWnd, title, length) 

387 result.append(WindowInfo(pid.value, title.value)) 

388 return True 

389 

390 user32.EnumWindows(enum_proc, 0) 

391 

392 # Input simulation setup 

393 INPUT_KEYBOARD = 1 

394 KEYEVENTF_KEYUP = 0x0002 

395 KEYEVENTF_UNICODE = 0x0004 

396 MAPVK_VK_TO_VSC = 0 

397 

398 wintypes.ULONG_PTR = wintypes.WPARAM 

399 

400 class MOUSEINPUT(ctypes.Structure): 

401 _fields_ = ( 

402 ("dx", wintypes.LONG), 

403 ("dy", wintypes.LONG), 

404 ("mouseData", wintypes.DWORD), 

405 ("dwFlags", wintypes.DWORD), 

406 ("time", wintypes.DWORD), 

407 ("dwExtraInfo", wintypes.ULONG_PTR), 

408 ) 

409 

410 class KEYBDINPUT(ctypes.Structure): 

411 _fields_ = ( 

412 ("wVk", wintypes.WORD), 

413 ("wScan", wintypes.WORD), 

414 ("dwFlags", wintypes.DWORD), 

415 ("time", wintypes.DWORD), 

416 ("dwExtraInfo", wintypes.ULONG_PTR), 

417 ) 

418 

419 def __init__(self, *args, **kwds): 

420 super(KEYBDINPUT, self).__init__(*args, **kwds) 

421 if not self.dwFlags & KEYEVENTF_UNICODE: 

422 self.wScan = user32.MapVirtualKeyExW(self.wVk, MAPVK_VK_TO_VSC, 0) 

423 

424 class HARDWAREINPUT(ctypes.Structure): 

425 _fields_ = ( 

426 ("uMsg", wintypes.DWORD), 

427 ("wParamL", wintypes.WORD), 

428 ("wParamH", wintypes.WORD), 

429 ) 

430 

431 class INPUT(ctypes.Structure): 

432 class _INPUT(ctypes.Union): 

433 _fields_ = (("ki", KEYBDINPUT), ("mi", MOUSEINPUT), ("hi", HARDWAREINPUT)) 

434 

435 _anonymous_ = ("_input",) 

436 _fields_ = (("type", wintypes.DWORD), ("_input", _INPUT)) 

437 

438 LPINPUT = ctypes.POINTER(INPUT) 

439 

440 def _check_count(result, func, args): 

441 if result == 0: 

442 raise ctypes.WinError(ctypes.get_last_error()) 

443 return args 

444 

445 user32.SendInput.errcheck = _check_count 

446 user32.SendInput.argtypes = (wintypes.UINT, LPINPUT, ctypes.c_int) 

447 

448 def press_key(hex_key_code): 

449 """Press a key.""" 

450 x = INPUT(type=INPUT_KEYBOARD, ki=KEYBDINPUT(wVk=hex_key_code)) 

451 user32.SendInput(1, ctypes.byref(x), ctypes.sizeof(x)) 

452 

453 def release_key(hex_key_code): 

454 """Release a key.""" 

455 x = INPUT( 

456 type=INPUT_KEYBOARD, 

457 ki=KEYBDINPUT(wVk=hex_key_code, dwFlags=KEYEVENTF_KEYUP), 

458 ) 

459 user32.SendInput(1, ctypes.byref(x), ctypes.sizeof(x)) 

460 

461 # Find and activate microscope window 

462 VK_ESC = 0x1B 

463 VK_F6 = 0x75 

464 VKs = [VK_ESC, VK_F6, VK_F6] 

465 

466 for window_info in result: 

467 if "Microscope Control" in window_info.title: 

468 window = user32.FindWindowW(None, window_info.title) 

469 if window: 

470 user32.ShowWindow(window, 9) # SW_RESTORE 

471 user32.SetForegroundWindow(window) 

472 for vk in VKs: 

473 press_key(vk) 

474 time.sleep(0.05) 

475 release_key(vk) 

476 break