Coverage for src/pytribeam/GUI/config_ui/editor_controller.py: 0%

178 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2026-06-16 18:30 +0000

1"""Editor controller for configuration UI. 

2 

3This module manages the state and logic for the configuration editor, 

4separated from the UI presentation layer. 

5""" 

6 

7from copy import deepcopy 

8from pathlib import Path 

9from typing import Callable, Dict, Optional, Any, Tuple, List 

10import tkinter as tk 

11 

12import pytribeam.GUI.config_ui.lookup as lut 

13from pytribeam.GUI.config_ui.pipeline_model import PipelineConfig, StepConfig 

14from pytribeam.GUI.config_ui.validator import ConfigValidator 

15 

16 

17class EditorController: 

18 """Controls configuration editor state and operations. 

19 

20 This class manages the pipeline configuration, step selection, 

21 parameter editing, and validation without UI dependencies. 

22 

23 Attributes: 

24 pipeline: Current pipeline configuration 

25 current_step_index: Index of currently selected step (0 for general) 

26 validator: Configuration validator instance 

27 """ 

28 

29 def __init__(self, version: Optional[float] = None): 

30 """Initialize editor controller. 

31 

32 Args: 

33 version: Configuration file version (uses latest if not specified) 

34 """ 

35 self.pipeline: Optional[PipelineConfig] = None 

36 self.current_step_index: int = 0 

37 self._callbacks: Dict[str, Callable] = {} 

38 self._version = version or float(lut.VERSIONS[-1]) 

39 

40 # Initialize validator 

41 self.validator = ConfigValidator() 

42 self.validator.set_version(self._version) 

43 

44 # Track expanded frames for UI state 

45 self.expanded_frames: Dict[str, bool] = {} 

46 

47 def register_callback(self, event: str, callback: Callable): 

48 """Register callback for events. 

49 

50 Args: 

51 event: Event name (e.g., 'pipeline_changed', 'step_selected') 

52 callback: Function to call when event occurs 

53 """ 

54 self._callbacks[event] = callback 

55 

56 def _notify(self, event: str, *args, **kwargs): 

57 """Trigger registered callback. 

58 

59 Args: 

60 event: Event name 

61 *args: Arguments for callback 

62 **kwargs: Keyword arguments for callback 

63 """ 

64 if event in self._callbacks: 

65 try: 

66 self._callbacks[event](*args, **kwargs) 

67 except Exception as e: 

68 print(f"Error in callback '{event}': {e}") 

69 

70 def create_new_pipeline(self, version: Optional[float] = None): 

71 """Create new empty pipeline. 

72 

73 Args: 

74 version: Configuration version (uses default if not specified) 

75 """ 

76 if version is None: 

77 version = self._version 

78 

79 self.pipeline = PipelineConfig.create_new(version=version) 

80 self.current_step_index = 0 

81 self._notify("pipeline_created", self.pipeline) 

82 # self._notify("step_selected", 0, self.pipeline.general) 

83 

84 def load_pipeline(self, yaml_path: Path) -> Tuple[bool, Optional[str]]: 

85 """Load pipeline from YAML file. 

86 

87 Args: 

88 yaml_path: Path to YAML file 

89 

90 Returns: 

91 Tuple of (success, error_message) 

92 """ 

93 try: 

94 self.pipeline = PipelineConfig.from_yaml(yaml_path) 

95 self.current_step_index = 0 

96 self.set_version(self.pipeline.version) 

97 self._notify("step_selected", 0, self.pipeline.general) 

98 self._notify("pipeline_loaded", self.pipeline) 

99 return True, None 

100 except Exception as e: 

101 return False, str(e) 

102 

103 def save_pipeline(self, yaml_path: Path) -> Tuple[bool, Optional[str]]: 

104 """Save pipeline to YAML file. 

105 

106 Args: 

107 yaml_path: Path where YAML should be saved 

108 

109 Returns: 

110 Tuple of (success, error_message) 

111 """ 

112 if self.pipeline is None: 

113 return False, "No pipeline to save" 

114 

115 try: 

116 self.pipeline.to_yaml(yaml_path) 

117 self._notify("pipeline_saved", yaml_path) 

118 return True, None 

119 except Exception as e: 

120 return False, str(e) 

121 

122 def add_step(self, step_type: str) -> StepConfig: 

123 """Add new step to pipeline. 

124 

125 Args: 

126 step_type: Type of step to add 

127 

128 Returns: 

129 Newly created step 

130 """ 

131 if self.pipeline is None: 

132 raise ValueError("No pipeline loaded") 

133 

134 step = self.pipeline.add_step(step_type) 

135 # self._notify("pipeline_changed", self.pipeline) 

136 self._notify("step_added", step) 

137 return step 

138 

139 def remove_step(self, index: int) -> bool: 

140 """Remove step from pipeline. 

141 

142 Args: 

143 index: Index of step to remove 

144 

145 Returns: 

146 True if removed successfully 

147 """ 

148 if self.pipeline is None: 

149 return False 

150 

151 success = self.pipeline.remove_step(index) 

152 if success: 

153 # Adjust current selection if needed 

154 if self.current_step_index == index: 

155 self.current_step_index = max(0, index - 1) 

156 elif self.current_step_index > index: 

157 self.current_step_index -= 1 

158 

159 self._notify("pipeline_changed", self.pipeline) 

160 self._notify("step_removed", index) 

161 return success 

162 

163 def move_step(self, index: int, direction: int) -> bool: 

164 """Move step up or down in pipeline. 

165 

166 Args: 

167 index: Index of step to move 

168 direction: -1 for up, +1 for down 

169 

170 Returns: 

171 True if moved successfully 

172 """ 

173 if self.pipeline is None: 

174 return False 

175 

176 success = self.pipeline.move_step(index, direction) 

177 if success: 

178 # Update current selection if affected 

179 if self.current_step_index == index: 

180 self.current_step_index += direction 

181 elif abs(self.current_step_index - index) == 1: 

182 self.current_step_index -= direction 

183 

184 self._notify("pipeline_changed", self.pipeline) 

185 self._notify("step_moved", index, direction) 

186 return success 

187 

188 def duplicate_step(self, index: int) -> Optional[StepConfig]: 

189 """Duplicate existing step. 

190 

191 Args: 

192 index: Index of step to duplicate 

193 

194 Returns: 

195 Newly created step or None if failed 

196 """ 

197 if self.pipeline is None: 

198 return None 

199 

200 step = self.pipeline.duplicate_step(index) 

201 if step: 

202 self._notify("pipeline_changed", self.pipeline) 

203 self._notify("step_added", step) 

204 return step 

205 

206 def select_step(self, index: int): 

207 """Select step for editing. 

208 

209 Args: 

210 index: Index of step to select (0 for general) 

211 """ 

212 if self.pipeline is None: 

213 return 

214 

215 step = self.pipeline.get_step(index) 

216 if step: 

217 self.current_step_index = index 

218 self._notify("step_selected", index, step) 

219 

220 def get_current_step(self) -> Optional[StepConfig]: 

221 """Get currently selected step. 

222 

223 Returns: 

224 Current step or None 

225 """ 

226 if self.pipeline is None: 

227 return None 

228 return self.pipeline.get_step(self.current_step_index) 

229 

230 def update_parameter(self, path: str, value: Any): 

231 """Update parameter value in current step. 

232 

233 Args: 

234 path: Parameter path (e.g., 'beam/voltage_kv') 

235 value: New value 

236 """ 

237 step = self.get_current_step() 

238 if step: 

239 # Preserve boolean type, convert others to string 

240 param_value = value if isinstance(value, bool) else str(value) 

241 step.set_param(path, param_value) 

242 self._notify("parameter_changed", path, value) 

243 

244 def get_parameter(self, path: str, default: Any = None) -> Any: 

245 """Get parameter value from current step. 

246 

247 Args: 

248 path: Parameter path 

249 default: Default value if not found 

250 

251 Returns: 

252 Parameter value 

253 """ 

254 step = self.get_current_step() 

255 if step: 

256 return step.get_param(path, default) 

257 return default 

258 

259 def validate_structure(self) -> Tuple[bool, str]: 

260 """Validate pipeline structure (names, step count). 

261 

262 Returns: 

263 Tuple of (is_valid, message) 

264 """ 

265 if self.pipeline is None: 

266 return False, "No pipeline loaded" 

267 

268 results = self.validator.validate_pipeline_structure(self.pipeline) 

269 success, summary = ConfigValidator.get_summary(results) 

270 return success, summary 

271 

272 def validate_full(self) -> Tuple[bool, str]: 

273 """Validate full pipeline configuration. 

274 

275 Args: 

276 microscope: Optional microscope connection 

277 

278 Returns: 

279 Tuple of (is_valid, message) 

280 """ 

281 if self.pipeline is None: 

282 return False, "No pipeline loaded" 

283 

284 results = self.validator.validate_pipeline_model(self.pipeline) 

285 success, summary = ConfigValidator.get_summary(results) 

286 self._notify("pipeline_validation_complete", success, summary) 

287 return success, summary 

288 

289 def validate_step(self, index: int, microscope=None) -> Tuple[bool, str]: 

290 """Validate specific step configuration. 

291 

292 Args: 

293 index: Index of step to validate 

294 microscope: Optional microscope connection 

295 

296 Returns: 

297 Tuple of (is_valid, message) 

298 """ 

299 # Make sure pipeline is loaded 

300 if self.pipeline is None: 

301 return False, "No pipeline loaded" 

302 

303 # Get general settings for validation context, making sure they are valid 

304 result = self.validator.validate_general( 

305 self.pipeline.general.get_all_params(flat=False) 

306 ) 

307 if not result.success: 

308 return False, "General configuration is invalid" 

309 general = result.settings 

310 

311 # Get the step of interest 

312 step = self.pipeline.get_step(index) 

313 if step is None: 

314 return False, f"Step at index {index} not found" 

315 

316 step_db = step.get_all_params(flat=False) 

317 result = self.validator.validate_step(microscope, step.name, step_db, general) 

318 success = result.success 

319 message = ( 

320 "Step is valid." if success else f"Step validation failed: {result.message}" 

321 ) 

322 self._notify("step_validation_complete", index, success, message) 

323 return success, message 

324 

325 def validate_general(self) -> Tuple[bool, str]: 

326 """Validate general configuration. 

327 

328 Returns: 

329 Tuple of (is_valid, message) 

330 """ 

331 result = self.validator.validate_general( 

332 self.pipeline.general.get_all_params(flat=False) 

333 ) 

334 success = result.success 

335 message = ( 

336 "General configuration is valid." 

337 if success 

338 else f"General validation failed: {result.message}" 

339 ) 

340 self._notify("step_validation_complete", 0, success, message) 

341 return success, message 

342 

343 def get_step_count(self) -> int: 

344 """Get number of steps in pipeline. 

345 

346 Returns: 

347 Step count (excluding general) 

348 """ 

349 if self.pipeline is None: 

350 return 0 

351 return self.pipeline.get_step_count() 

352 

353 def get_step_names(self) -> List[str]: 

354 """Get list of step names. 

355 

356 Returns: 

357 List of step names 

358 """ 

359 if self.pipeline is None: 

360 return [] 

361 return [step.name for step in self.pipeline.steps] 

362 

363 def rename_step(self, index: int, new_name: str) -> bool: 

364 """Rename a step. 

365 

366 Args: 

367 index: Index of step to rename 

368 new_name: New step name 

369 

370 Returns: 

371 True if renamed successfully 

372 """ 

373 if self.pipeline is None or index == 0: 

374 return False 

375 

376 step = self.pipeline.get_step(index) 

377 if step: 

378 step.name = new_name 

379 step.set_param("step_general/step_name", new_name) 

380 self._notify("pipeline_changed", self.pipeline) 

381 self._notify("step_renamed", index, new_name) 

382 return True 

383 return False 

384 

385 def set_version(self, version: float): 

386 """Set configuration file version and migrate parameters. 

387 

388 This updates the pipeline version and migrates all step parameters 

389 to match the new version's schema (adds new params, removes old ones). 

390 

391 Args: 

392 version: New version number 

393 """ 

394 self._version = version 

395 self.validator.set_version(self._version) 

396 if self.pipeline: 

397 self.pipeline.set_version(version) 

398 self._notify("version_changed", version) 

399 

400 def get_version(self) -> float: 

401 """Get current configuration version. 

402 

403 Returns: 

404 Version number 

405 """ 

406 return self._version 

407 

408 def is_modified(self) -> bool: 

409 """Check if pipeline has unsaved changes. 

410 

411 Returns: 

412 True if there are unsaved changes 

413 """ 

414 # This would need to track changes since last save 

415 # For now, return False (can be implemented later) 

416 return False 

417 

418 def get_pipeline_summary(self) -> Dict[str, Any]: 

419 """Get summary of pipeline configuration. 

420 

421 Returns: 

422 Dictionary with pipeline information 

423 """ 

424 if self.pipeline is None: 

425 return { 

426 "version": self._version, 

427 "step_count": 0, 

428 "has_general": False, 

429 "step_types": [], 

430 } 

431 

432 step_types = [step.step_type for step in self.pipeline.steps] 

433 return { 

434 "version": self.pipeline.version, 

435 "step_count": len(self.pipeline.steps), 

436 "has_general": True, 

437 "step_types": step_types, 

438 "file_path": ( 

439 str(self.pipeline.file_path) if self.pipeline.file_path else None 

440 ), 

441 }