Coverage for src/pytribeam/utilities.py: 0%

228 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2026-06-16 18:30 +0000

1#!/usr/bin/python3 

2""" 

3Utilities Module 

4================ 

5 

6This module contains various utility functions and decorators for managing and controlling the microscope, handling YAML files, and performing other common tasks. 

7 

8Functions 

9--------- 

10beam_type(beam) -> property 

11 Return the beam property object as ion and electron beams have the same internal hierarchy. 

12 

13connect_microscope(microscope: tbt.Microscope, quiet_output: bool = True, connection_host: str = None, connection_port: int = None) -> bool 

14 Connect to the microscope with the option to suppress printout. 

15 

16dict_to_yml(db: dict, file_path: Path) -> Path 

17 Convert a dictionary to a YAML file. 

18 

19disconnect_microscope(microscope: tbt.Microscope, quiet_output: bool = True) -> bool 

20 Disconnect from the microscope with the option to suppress printout. 

21 

22general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict 

23 Grab general experiment settings from a .yml file and return them as a dictionary. 

24 

25step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType 

26 Determine the step type for a specific step settings dictionary. 

27 

28in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool 

29 Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType. 

30 

31gen_dict_extract(key, var) 

32 Extract values from a nested dictionary by key. 

33 

34nested_dictionary_location(d: dict, key: str, value: Any) -> List[str] 

35 Find the nested location of a key-value pair in a dictionary. 

36 

37nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str] 

38 Find a key-value pair in a nested dictionary. 

39 

40_flatten(dictionary: dict) -> dict 

41 Flatten a dictionary using pandas. 

42 

43none_value_dictionary(dictionary: dict) -> bool 

44 Check if all values in a dictionary are None. 

45 

46nostdout() 

47 Create a dummy file to suppress output. 

48 

49step_count(exp_settings: dict, yml_format: tbt.YMLFormatVersion) -> int 

50 Determine the maximum step number from a settings dictionary. 

51 

52step_settings(exp_settings: dict, step_number_key: str, step_number_val: int, yml_format: tbt.YMLFormatVersion) -> Tuple[str, dict] 

53 Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name. 

54 

55valid_microscope_connection(host: str, port: str) -> bool 

56 Determine if a microscope connection can be made. 

57 

58enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool 

59 Determine whether to enable external device control. 

60 

61valid_enum_entry(obj: Any, check_type: Enum) -> bool 

62 Determine if an object is a member of an Enum class. 

63 

64yml_format(version: float) -> tbt.YMLFormatVersion 

65 Return the YML file format for a given version. 

66 

67yml_to_dict(*, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]) -> Dict 

68 Convert a YAML file to a dictionary. 

69 

70yml_version(file: Path, key_name="config_file_version") -> float 

71 Return the version of a YAML file if the proper key exists. 

72 

73yes_no(question) -> bool 

74 Simple Yes/No function. 

75 

76remove_directory(directory: Path) 

77 Recursively remove a directory. 

78 

79split_list(data: List, chunk_size: int) -> List 

80 Split a list into equal-sized chunks. 

81 

82tabular_list(data: List, num_columns: int = Constants.default_column_count, column_width: int = Constants.default_column_width) -> str 

83 Format a list into a tabular string. 

84 

85Decorators 

86---------- 

87hardware_movement(func) 

88 Decorator to run a function only when hardware testing is enabled. 

89 

90run_on_standalone_machine(func) 

91 Decorator to run a function only on a standalone machine. 

92 

93run_on_microscope_machine(func) 

94 Decorator to run a function only on a microscope machine. 

95""" 

96 

97# Default python modules 

98from pathlib import Path 

99from typing import Dict, Tuple, Any, List 

100from enum import Enum 

101import platform 

102import pytest 

103from functools import singledispatch 

104import shutil 

105 

106# # Autoscript modules 

107import yaml 

108import contextlib 

109import sys 

110from pandas import json_normalize 

111 

112# # # 3rd party module 

113# from schema import Schema, And, Use, Optional, SchemaError 

114 

115# # Local scripts 

116import pytribeam.types as tbt 

117 

118# import pytribeam.constants as cs 

119from pytribeam.constants import Constants 

120 

121 

@singledispatch
def beam_type(beam, microscope=None) -> property:
    """
    Return the beam property object as ion and electron beams have the same internal hierarchy.

    This is the single-dispatch fallback that runs when no implementation is
    registered for the type of ``beam``. It accepts the same optional
    ``microscope`` argument as the registered implementations so that an
    unsupported beam type always results in ``NotImplementedError`` rather
    than a ``TypeError`` caused by an unexpected extra argument.

    Parameters
    ----------
    beam : Any
        The beam object; its type selects the registered implementation.
    microscope : Any, optional
        The microscope object. Unused by this fallback (default is None).

    Returns
    -------
    property
        The beam property object (never returned by this fallback).

    Raises
    ------
    NotImplementedError
        If the beam type is not implemented.
    """
    raise NotImplementedError(
        f"No beam property lookup is registered for type '{type(beam).__name__}'."
    )

144 

145 

@beam_type.register
def _(beam: tbt.ElectronBeam, microscope: tbt.Microscope) -> property:
    """
    Return the electron beam property object.

    Parameters
    ----------
    beam : tbt.ElectronBeam
        The electron beam object. Only its type is used, to select this
        implementation; the value itself is not read.
    microscope : tbt.Microscope
        The microscope object whose beams are queried.

    Returns
    -------
    property
        The electron beam property object.
    """
    available_beams = microscope.beams
    return available_beams.electron_beam

164 

165 

@beam_type.register
def _(beam: tbt.IonBeam, microscope: tbt.Microscope) -> property:
    """
    Return the ion beam property object.

    Parameters
    ----------
    beam : tbt.IonBeam
        The ion beam object. Only its type is used, to select this
        implementation; the value itself is not read.
    microscope : tbt.Microscope
        The microscope object whose beams are queried.

    Returns
    -------
    property
        The ion beam property object.
    """
    available_beams = microscope.beams
    return available_beams.ion_beam

184 

185 

def connect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
    connection_host: str = None,
    connection_port: int = None,
) -> bool:
    """
    Connect to the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to connect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).
    connection_host : str, optional
        The connection host (default is None).
    connection_port : int, optional
        The connection port (default is None).

    Returns
    -------
    bool
        True if the connection is successful.

    Raises
    ------
    ConnectionError
        If ``microscope.server_host`` is still None after the connection
        attempt.
    """
    # Forward only the arguments that were supplied: host and port when a
    # port is given, host alone when only a host is given, otherwise nothing.
    if connection_port is not None:
        connect_args = (connection_host, connection_port)
    elif connection_host is not None:
        connect_args = (connection_host,)
    else:
        connect_args = ()

    # Redirect stdout for the duration of the call only when requested.
    output_guard = nostdout() if quiet_output else contextlib.nullcontext()
    with output_guard:
        microscope.connect(*connect_args)

    if microscope.server_host is None:
        raise ConnectionError(
            f"Connection failed with connection_host of '{connection_host}' and connection_port of '{connection_port}' microscope not connected."
        )
    return True

251 

252 

def dict_to_yml(db: dict, file_path: Path) -> Path:
    """
    Write a dictionary to a YAML file.

    The file is written as UTF-8 in block style, and keys keep the order they
    have in ``db`` (no sorting).

    Parameters
    ----------
    db : dict
        The dictionary to serialize.
    file_path : Path
        Destination of the YAML file.

    Returns
    -------
    Path
        The same ``file_path`` that was passed in.
    """
    dump_options = {"default_flow_style": False, "sort_keys": False}
    with open(file_path, mode="w", encoding="utf-8") as yml_stream:
        yaml.dump(db, yml_stream, **dump_options)
    return file_path

278 

279 

def disconnect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
):
    """
    Disconnect from the microscope, optionally silencing stdout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to disconnect.
    quiet_output : bool, optional
        Whether to suppress printout while disconnecting (default is True).

    Returns
    -------
    bool
        True if ``microscope.server_host`` is None after disconnecting.

    Raises
    ------
    ConnectionError
        If ``microscope.server_host`` is still set after disconnecting.
    """
    # Redirect stdout for the duration of the call only when requested.
    output_guard = nostdout() if quiet_output else contextlib.nullcontext()
    with output_guard:
        microscope.disconnect()

    if microscope.server_host is not None:
        raise ConnectionError("Disconnection failed, microscope still connected")
    return True

314 

315 

def general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict:
    """
    Return the general section of an experiment settings dictionary.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format; its ``general_section_key`` names the section to
        return.

    Returns
    -------
    dict
        The value stored under the general section key.

    Raises
    ------
    KeyError
        If the general section key is not present in ``exp_settings``.
    """
    return exp_settings[yml_format.general_section_key]

334 

335 

def step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType:
    """
    Determine the step type for a specific step settings dictionary.

    Parameters
    ----------
    settings : dict
        The step settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format; its ``step_general_key`` and ``step_type_key`` locate
        the raw step type inside ``settings``.

    Returns
    -------
    tbt.StepType
        The step type built from the raw value found in ``settings``.
    """
    general_block = settings[yml_format.step_general_key]
    raw_type = general_block[yml_format.step_type_key]
    return tbt.StepType(raw_type)

357 

358 

def in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool:
    """
    Test whether a value lies within an interval of the given type.

    Parameters
    ----------
    val : float
        The input value to be compared against ``limit.min`` and ``limit.max``.
    limit : tbt.Limit
        The bounds of the interval.
    type : tbt.IntervalType
        The type of interval: OPEN excludes both bounds, CLOSED includes both,
        LEFT_OPEN excludes only the minimum and RIGHT_OPEN excludes only the
        maximum.

    Returns
    -------
    bool
        True if within the interval, False otherwise. If ``type`` matches none
        of the four interval types, None is returned.
    """
    interval = tbt.IntervalType
    if type == interval.OPEN:
        include_min, include_max = False, False
    elif type == interval.CLOSED:
        include_min, include_max = True, True
    elif type == interval.LEFT_OPEN:
        include_min, include_max = False, True
    elif type == interval.RIGHT_OPEN:
        include_min, include_max = True, False
    else:
        # Unrecognized interval type: no comparison is performed.
        return None

    above_min = (val >= limit.min) if include_min else (val > limit.min)
    # The upper bound is only inspected when the lower bound check passes.
    return above_min and (
        (val <= limit.max) if include_max else (val < limit.max)
    )

385 

386 

def gen_dict_extract(key, var):
    """
    Yield every value stored under ``key`` in a nested dictionary.

    Nested dictionaries are searched recursively, and so are the elements of
    any list found as a value. A matching value that is itself a dictionary is
    yielded and then searched as well.

    Parameters
    ----------
    key : str
        The key to search for.
    var : dict
        The nested dictionary to search. Objects without an ``items``
        attribute yield nothing.

    Yields
    ------
    Any
        The values associated with the specified key.
    """
    if not hasattr(var, "items"):
        return
    for name, child in var.items():
        if name == key:
            yield child
        if isinstance(child, dict):
            yield from gen_dict_extract(key, child)
        elif isinstance(child, list):
            for element in child:
                yield from gen_dict_extract(key, element)

414 

415 

def nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]:
    """
    Find the nested location of a key-value pair in a dictionary.

    The result lists the keys to follow from the highest to the lowest level
    of nested dictionaries, ending with ``key`` itself.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value that must be stored under ``key``.

    Returns
    -------
    List[str]
        The nested location of the key-value pair.

    Raises
    ------
    KeyError
        If the key-value pair is not found in the dictionary.
    """
    location = nested_find_key_value_pair(d=d, key=key, value=value)
    if location is not None:
        return location
    raise KeyError(
        f'Key : value pair of "{key} : {value}" not found in the provided dictionary.'
    )

447 

448 

def nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str]:
    """
    Find a key-value pair in a nested dictionary.

    Entries are visited in dictionary order and nested dictionaries are
    searched depth-first; the first match wins. Only values that are
    dictionaries are descended into.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value that must be stored under ``key``.

    Returns
    -------
    List[str]
        The keys leading to the pair, from the highest to the lowest level,
        or None when the pair is not found.
    """
    for name, child in d.items():
        if name == key and child == value:
            return [name]
        if not isinstance(child, dict):
            continue
        deeper = nested_find_key_value_pair(child, key, value)
        if deeper:
            return [name, *deeper]
    return None

477 

478 

def _flatten(dictionary: dict) -> dict:
    """
    Flatten a nested dictionary using pandas.

    Nested keys are joined with an underscore. This relies on pandas, which
    can be slow on large dictionaries.
    From https://stackoverflow.com/questions/6027558/flatten-nested-dictionaries-compressing-keys

    Parameters
    ----------
    dictionary : dict
        The dictionary to flatten.

    Returns
    -------
    dict
        The flattened dictionary.
    """
    records = json_normalize(dictionary, sep="_").to_dict(orient="records")
    # The normalized frame holds the whole dictionary in its first row.
    return records[0]

499 

500 

def none_value_dictionary(dictionary: dict) -> bool:
    """
    Check if all values in a dictionary are None.

    The dictionary is flattened first, so values inside nested dictionaries
    are checked as well.

    Parameters
    ----------
    dictionary : dict
        The dictionary to check.

    Returns
    -------
    bool
        True if all values in the dictionary are None, False otherwise.
    """
    flattened = _flatten(dictionary)
    for entry in flattened.values():
        if entry is not None:
            return False
    return True

520 

521 

@contextlib.contextmanager
def nostdout():
    """
    Suppress output by redirecting stdout to a dummy file.

    While the context is active ``sys.stdout`` is replaced with a
    ``tbt.DummyFile`` instance. The previous stdout is always restored on
    exit, even if KeyboardInterrupt or other exceptions occur.

    Yields
    ------
    None
    """
    with contextlib.redirect_stdout(tbt.DummyFile()):
        yield

540 

541 

def step_count(
    exp_settings: dict,
    yml_format: tbt.YMLFormatVersion,
):
    """
    Determine the number of steps in a settings dictionary.

    Steps are counted by looking for step numbers 1, 2, 3, ... under the
    format's ``step_number_key`` until one is missing. The count is then
    checked against the step count declared in the general section.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    int
        The maximum step number.

    Raises
    ------
    ValueError
        If the number of top-level sections is not the number of non-step
        sections plus one, or if the number of steps found does not match the
        expected step count.
    """
    step_number_key = yml_format.step_number_key
    non_step_sections = yml_format.non_step_section_count

    # All steps must live in a single top-level section, so the dictionary
    # must contain exactly one section more than the non-step sections.
    required_sections = non_step_sections + 1
    total_sections = len(exp_settings)
    if total_sections != required_sections:
        raise ValueError(
            f"Invalid .yml file, {total_sections} sections were found but the input .yml should have {required_sections} sections. Please verify that all top-level keys in the .yml have unique strings and that all steps are contained in a single top-level section."
        )

    general_section = exp_settings[yml_format.general_section_key]
    expected_step_count = general_section[yml_format.step_count_key]

    def _step_exists(number: int) -> bool:
        # A KeyError from the lookup means no step carries this number.
        try:
            nested_dictionary_location(
                d=exp_settings,
                key=step_number_key,
                value=number,
            )
        except KeyError:
            return False
        return True

    # TODO YAML safeloader will ignore duplicate top level keys, so this check
    # relies on unique step numbers in ascending order (no gaps) to be found.
    found_step_count = 0
    while _step_exists(found_step_count + 1):
        found_step_count += 1

    if expected_step_count != found_step_count:
        raise ValueError(
            f"Invalid .yml file, {found_step_count} steps were found but the input .yml should have {expected_step_count} steps from the general setting key '{yml_format.step_count_key}' within the '{yml_format.general_section_key}' section. Please verify that all step_name keys in the .yml have unique strings and that step numbers are continuously-increasing positive integers starting at 1."
        )

    return found_step_count

605 

606 

def step_settings(
    exp_settings: dict,
    step_number_key: str,
    step_number_val: int,
    yml_format: tbt.YMLFormatVersion,
) -> Tuple[str, dict]:
    """
    Return the name and settings of the step with a given step number.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    step_number_key : str
        The key under which each step stores its number.
    step_number_val : int
        The number of the step to retrieve.
    yml_format : tbt.YMLFormatVersion
        The YAML format version; its ``step_section_key`` names the section
        holding all steps.

    Returns
    -------
    Tuple[str, dict]
        The user-defined step name and the step settings dictionary.

    Raises
    ------
    KeyError
        If no step with the requested number exists.
    """
    key_path = nested_dictionary_location(
        d=exp_settings,
        key=step_number_key,
        value=step_number_val,
    )
    # key_path[0] is the top-level section; the step name is nested directly
    # inside it, at index 1.
    step_name = key_path[1]
    step_section = exp_settings[yml_format.step_section_key]
    return step_name, step_section[step_name]

642 

643 

def valid_microscope_connection(host: str, port: str) -> bool:
    """
    Determine if a microscope connection can be made.

    A fresh microscope object is connected with stdout suppressed and, when
    that succeeds, disconnected again.

    Parameters
    ----------
    host : str
        The connection host.
    port : str
        The connection port.

    Returns
    -------
    bool
        True if both the connection and the disconnection report success,
        False otherwise.
    """
    microscope = tbt.Microscope()
    connected = connect_microscope(
        microscope=microscope,
        quiet_output=True,
        connection_host=host,
        connection_port=port,
    )
    if not connected:
        return False
    disconnected = disconnect_microscope(
        microscope=microscope,
        quiet_output=True,
    )
    return bool(disconnected)

675 

676 

def enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool:
    """
    Determine whether to enable external device control.

    Control is enabled for every OEM except ``ExternalDeviceOEM.NONE``.

    Parameters
    ----------
    oem : tbt.ExternalDeviceOEM
        The OEM of the external device.

    Returns
    -------
    bool
        True if the external device control should be enabled, False otherwise.

    Raises
    ------
    NotImplementedError
        If ``oem`` is not an ``ExternalDeviceOEM`` instance.
    """
    if isinstance(oem, tbt.ExternalDeviceOEM):
        return oem != tbt.ExternalDeviceOEM.NONE
    raise NotImplementedError(
        f"Unsupported type of {type(oem)}, only 'ExternalDeviceOEM' types are supported."
    )

705 

706 

def valid_enum_entry(obj: Any, check_type: Enum) -> bool:
    """
    Determine if an object is a valid value of an Enum class.

    The check is made against the *values* of the enumeration members, so
    ``valid_enum_entry("red", Color)`` is True when some member of ``Color``
    has the value ``"red"``.

    Parameters
    ----------
    obj : Any
        The object to check.
    check_type : Enum
        The Enum class to check against.

    Returns
    -------
    bool
        True if the object is the value of a member of the Enum class, False
        otherwise.
    """
    try:
        return obj in check_type._value2member_map_
    except TypeError:
        # Unhashable objects (lists, dicts, ...) cannot be looked up in the
        # value map; compare against each member's value instead of failing.
        return any(member.value == obj for member in check_type)

726 

727 

def yml_format(version: float) -> tbt.YMLFormatVersion:
    """
    Return the YML file format for a given version.

    The ``version`` attribute of every ``tbt.YMLFormatVersion`` member is
    compared with the requested version and the first matching member is
    returned.

    Parameters
    ----------
    version : float
        The version of the YML file.

    Returns
    -------
    tbt.YMLFormatVersion
        The YML file format for the given version.

    Raises
    ------
    NotImplementedError
        If the YML file version is unsupported.
    """
    known_formats = list(tbt.YMLFormatVersion)
    known_versions = [entry.version for entry in known_formats]
    if version not in known_versions:
        raise NotImplementedError(
            f'Unsupported YML file version for version "{version}". Valid formats include: {[entry.value for entry in known_formats]}'
        )
    return known_formats[known_versions.index(version)]

757 

758 

def yml_to_dict(
    *, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]
) -> Dict:
    """
    Convert a YAML file to a dictionary.

    The file is read with the PyYAML safe loader, checked for the required
    top-level keys and checked against the requested file version.

    Parameters
    ----------
    yml_path_file : Path
        The fully pathed location to the input file.
    version : float
        The version of the YAML file in x.y format.
    required_keys : Tuple[str, ...]
        The key(s) that must be in the YAML file for conversion to a dictionary to occur.

    Returns
    -------
    dict
        The YAML file represented as a dictionary.

    Raises
    ------
    TypeError
        If the file type is unsupported.
    OSError
        If the YAML file cannot be opened or decoded.
    KeyError
        If the required keys or the "config_file_version" key are not found
        in the YAML file. This includes empty files and files whose top level
        is not a mapping.
    ValueError
        If the version specified in the file does not match the requested version.
    """
    # casefold() is a stronger, more aggressive variant of lower(), so that
    # suffixes such as ".YML" are accepted as well.
    file_type = yml_path_file.suffix.casefold()
    if file_type not in (".yaml", ".yml"):
        raise TypeError("Only file types .yaml, and .yml are supported.")

    try:
        with open(file=yml_path_file, mode="r", encoding="utf-8") as stream:
            # See deprecation warning for plain yaml.load(input) at
            # https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
            db = yaml.load(stream, Loader=yaml.SafeLoader)
    except yaml.YAMLError as error:
        print(f"Error with YAML file: {error}")
        print(f"Could not open or decode: {yml_path_file}")
        raise OSError(f"Could not open or decode: {yml_path_file}") from error

    # An empty document loads as None and a document may also be a list or a
    # scalar; neither has top-level keys, so treat them as having none.
    found_keys = tuple(db.keys()) if isinstance(db, dict) else ()
    if not all(key in found_keys for key in required_keys):
        raise KeyError(f"Input files must have these keys defined: {required_keys}")

    version_key = "config_file_version"
    if version_key not in found_keys:
        raise KeyError(f"Input files must define the '{version_key}' key.")

    version_specified = db[version_key]
    if version_specified != version:
        raise ValueError(
            f"Version mismatch: specified in file was {version_specified}, "
            f"requested is {version}"
        )

    return db

832 

833 

def yml_version(
    file: Path,
    key_name="config_file_version",
) -> float:
    """
    Return the version of a YAML file if the proper key exists.

    Parameters
    ----------
    file : Path
        The path to the YAML file.
    key_name : str, optional
        The key name for the version in the YAML file (default is "config_file_version").

    Returns
    -------
    float
        The version of the YAML file.

    Raises
    ------
    KeyError
        If the version key is not found in the YAML file. This includes empty
        files and files whose top level is not a mapping.
    ValueError
        If the version value is not a valid float.
    """
    # Read as UTF-8 explicitly so the result does not depend on the platform
    # default encoding.
    with open(file, "r", encoding="utf-8") as stream:
        data = yaml.load(stream, Loader=yaml.SafeLoader)

    # An empty document loads as None; it cannot contain the version key.
    if not isinstance(data, dict) or key_name not in data:
        raise KeyError(f"Error with version key, '{key_name}' key not found in {file}.")

    version = data[key_name]
    try:
        return float(version)
    except (TypeError, ValueError) as error:
        # TypeError covers values such as None or a list, which float()
        # rejects with a different exception than a malformed string.
        raise ValueError(
            f"Could not find valid version in {file} for key {key_name}, found '{version}' which is not a float."
        ) from error

875 

876 

def yes_no(question):
    """
    Ask a yes/no question on the console until a valid answer is given.

    The answer is stripped of surrounding whitespace and lower-cased; anything
    other than "y" or "n" prints a notice and asks again.

    Parameters
    ----------
    question : str
        The question to ask the user.

    Returns
    -------
    bool
        True if the user answers "y", False if the user answers "n".
    """
    prompt = f"{question} (y/n): "
    while True:
        ans = input(prompt).strip().lower()
        if ans in ("y", "n"):
            return ans == "y"
        print(f"{ans} is invalid, please try again...")

899 

900 

def remove_directory(directory: Path):
    """
    Delete a directory together with everything it contains.

    Parameters
    ----------
    directory : Path
        The path to the directory to remove.
    """
    shutil.rmtree(directory)

911 

912 

def split_list(data: List, chunk_size: int) -> List:
    """
    Split a list into consecutive chunks of a given size.

    Every chunk holds ``chunk_size`` items except possibly the last one, which
    holds whatever remains.

    Parameters
    ----------
    data : List
        The list to split.
    chunk_size : int
        The size of each chunk.

    Returns
    -------
    List
        A list of chunks.
    """
    return [
        data[start : start + chunk_size]
        for start in range(0, len(data), chunk_size)
    ]

933 

934 

def tabular_list(
    data: List,
    num_columns: int = Constants.default_column_count,
    column_width: int = Constants.default_column_width,
) -> str:
    """
    Format a list into a tabular string.

    Items are laid out row by row. Every row starts with a newline and every
    item is centered in a field of ``column_width`` characters.

    Parameters
    ----------
    data : List
        The list to format.
    num_columns : int, optional
        The number of columns in the table (default is Constants.default_column_count).
    column_width : int, optional
        The width of each column in the table (default is Constants.default_column_width).

    Returns
    -------
    str
        The formatted tabular string.
    """
    table_rows = []
    for row in split_list(data, chunk_size=num_columns):
        cells = "".join(f"{item:^{column_width}}" for item in row)
        table_rows.append("\n" + cells)
    return "".join(table_rows)

964 

965 

966### Functions for tests and CI/CD### 

967 

968 

def get_test_description() -> str:
    """
    Build a short description of the machine the tests are running on.

    The network node name of the current machine is compared, case
    insensitively and by substring in either direction, with the machine names
    in ``Constants.offline_machines`` and ``Constants.microscope_machines``.
    The description is a prefix followed by the autoscript version:

    - "simulated_" when the node matches an offline machine,
    - "laser_hardware_" when it matches a microscope machine and the laser
      package is available,
    - "hardware_" in every other case.

    Returns
    -------
    str
        The prefix followed by the value of ``get_autoscript_version()``.
    """
    node = platform.uname().node.lower()

    def _node_matches(machines) -> bool:
        # Substring match in either direction between node and machine names.
        return any(
            node in machine.lower() or machine.lower() in node
            for machine in machines
        )

    offline_machine = _node_matches(Constants.offline_machines)
    hardware_machine = _node_matches(Constants.microscope_machines)
    laser_machine = is_laser_available()
    api_version = get_autoscript_version()

    if offline_machine:
        description = "simulated_"
    elif hardware_machine and laser_machine:
        description = "laser_hardware_"
    else:
        description = "hardware_"

    return description + api_version

1005 

1006 

def get_autoscript_version() -> str:
    """
    Get the version of autoscript for the present system.

    Returns
    -------
    version : str
        The short version string reported by the autoscript client package,
        or "none" when the package cannot be imported.
    """
    try:
        import autoscript_sdb_microscope_client as client

        return client.build_information.INFO_VERSIONSHORT
    except ImportError:
        return "none"

1023 

1024 

def is_laser_available() -> bool:
    """
    Report whether the laser control package can be imported on this system.

    Returns
    -------
    bool
        True if ``Laser.PythonControl`` can be imported, False otherwise.
    """
    try:
        # Imported only to probe availability; the module itself is not used.
        import Laser.PythonControl  # noqa: F401
    except ImportError:
        return False
    return True