pytribeam.utilities
Utilities Module
This module contains various utility functions and decorators for managing and controlling the microscope, handling YAML files, and performing other common tasks.
Functions
beam_type(beam) -> property Return the microscope's beam property object for the given beam type; ion and electron beams share the same internal hierarchy.
connect_microscope(microscope: tbt.Microscope, quiet_output: bool = True, connection_host: str = None, connection_port: int = None) -> bool Connect to the microscope with the option to suppress printout.
dict_to_yml(db: dict, file_path: Path) -> Path Convert a dictionary to a YAML file.
disconnect_microscope(microscope: tbt.Microscope, quiet_output: bool = True) -> bool Disconnect from the microscope with the option to suppress printout.
general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict Grab general experiment settings from a .yml file and return them as a dictionary.
step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType Determine the step type for a specific step settings dictionary.
in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.
gen_dict_extract(key, var) Yield every value stored under a key in a nested dictionary, descending into nested dictionaries and lists.
nested_dictionary_location(d: dict, key: str, value: Any) -> List[str] Find the nested location of a key-value pair in a dictionary.
nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str] Find a key-value pair in a nested dictionary.
_flatten(dictionary: dict) -> dict Flatten a dictionary using pandas.
none_value_dictionary(dictionary: dict) -> bool Check if all values in a dictionary are None.
nostdout() Context manager that suppresses printed output by temporarily replacing sys.stdout with a dummy file object.
step_count(exp_settings: dict, yml_format: tbt.YMLFormatVersion) -> int Determine the maximum step number from a settings dictionary.
step_settings(exp_settings: dict, step_number_key: str, step_number_val: int, yml_format: tbt.YMLFormatVersion) -> Tuple[str, dict] Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.
valid_microscope_connection(host: str, port: str) -> bool Determine if a microscope connection can be made.
enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool Determine whether to enable external device control.
valid_enum_entry(obj: Any, check_type: Enum) -> bool Determine if an object is a member of an Enum class.
yml_format(version: float) -> tbt.YMLFormatVersion Return the YML file format for a given version.
yml_to_dict(*, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]) -> Dict Convert a YAML file to a dictionary.
yml_version(file: Path, key_name="config_file_version") -> float Return the version of a YAML file if the proper key exists.
yes_no(question) -> bool Prompt the user with a yes/no question, repeating until "y" or "n" is entered; return True for "y" and False for "n".
remove_directory(directory: Path) Recursively remove a directory.
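split_list(data: List, chunk_size: int) -> List Split a list into consecutive chunks of chunk_size items; the final chunk is shorter when the list length is not an exact multiple of chunk_size.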
tabular_list(data: List, num_columns: int = Constants.default_column_count, column_width: int = Constants.default_column_width) -> str Format a list into a tabular string.
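As an illustration of the interval semantics described for in_interval, here is a minimal standalone sketch. The Limit and IntervalType classes below are simplified stand-ins written for this example (their fields and enum values are assumptions); the real tbt.Limit and tbt.IntervalType may be defined differently.

```python
from enum import Enum
from typing import NamedTuple


class Limit(NamedTuple):
    """Stand-in for tbt.Limit (assumed to expose min and max)."""
    min: float
    max: float


class IntervalType(Enum):
    """Stand-in for tbt.IntervalType; the string values are hypothetical."""
    OPEN = "open"              # (min, max)
    CLOSED = "closed"          # [min, max]
    LEFT_OPEN = "left_open"    # (min, max]
    RIGHT_OPEN = "right_open"  # [min, max)


def in_interval(val: float, limit: Limit, type: IntervalType) -> bool:
    """Return True if val lies inside the interval of the given type."""
    if type == IntervalType.OPEN:
        return limit.min < val < limit.max
    if type == IntervalType.CLOSED:
        return limit.min <= val <= limit.max
    if type == IntervalType.LEFT_OPEN:
        return limit.min < val <= limit.max
    if type == IntervalType.RIGHT_OPEN:
        return limit.min <= val < limit.max
    # This sketch rejects unknown interval types explicitly.
    raise ValueError(f"Unsupported interval type: {type!r}")


bounds = Limit(min=1.0, max=2.0)
on_lower_edge_closed = in_interval(1.0, bounds, IntervalType.CLOSED)
on_lower_edge_open = in_interval(1.0, bounds, IntervalType.OPEN)
```

The only difference between the four interval types is whether each endpoint is included, so values on the boundary are the cases worth testing.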
Decorators
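hardware_movement(func) Decorator to run a test function only on a microscope machine with hardware testing enabled; otherwise the test is skipped.
run_on_standalone_machine(func) Decorator to run a test function only on a standalone (offline license) machine; otherwise the test is skipped.
run_on_microscope_machine(func) Decorator to run a test function only on a microscope machine; otherwise the test is skipped.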
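The machine-gating decorators can be pictured with the sketch below, which skips a function unless the current host name appears in an allowed list. This is an illustrative approximation, not the module's code: the decorator name run_on_listed_machines and the host names are hypothetical, and unittest.SkipTest is used in place of pytest.skip so the example needs only the standard library.

```python
import functools
import platform
import unittest


def run_on_listed_machines(machines):
    """Build a decorator that skips the function on unlisted hosts."""
    allowed = {name.lower() for name in machines}

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Compare host names case-insensitively.
            current_machine = platform.uname().node.lower()
            if current_machine not in allowed:
                raise unittest.SkipTest("Run on listed machines only.")
            return func(*args, **kwargs)

        return wrapper

    return decorator


@run_on_listed_machines([platform.uname().node])
def runs_here():
    return "ran"


@run_on_listed_machines(["some-other-host-that-does-not-exist"])
def skipped_here():
    return "ran"
```

Using functools.wraps and forwarding arguments keeps the wrapped function's name and signature visible to the test runner, which matters when decorated tests take fixtures or parameters.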
#!/usr/bin/python3
"""
Utilities Module
================

This module contains various utility functions and decorators for managing and controlling the microscope, handling YAML files, and performing other common tasks.

Functions
---------
beam_type(beam) -> property
    Return the beam property object as ion and electron beams have the same internal hierarchy.

connect_microscope(microscope: tbt.Microscope, quiet_output: bool = True, connection_host: str = None, connection_port: int = None) -> bool
    Connect to the microscope with the option to suppress printout.

dict_to_yml(db: dict, file_path: Path) -> Path
    Convert a dictionary to a YAML file.

disconnect_microscope(microscope: tbt.Microscope, quiet_output: bool = True) -> bool
    Disconnect from the microscope with the option to suppress printout.

general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict
    Grab general experiment settings from a .yml file and return them as a dictionary.

step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType
    Determine the step type for a specific step settings dictionary.

in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool
    Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.

gen_dict_extract(key, var)
    Extract values from a nested dictionary by key.

nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]
    Find the nested location of a key-value pair in a dictionary.

nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str]
    Find a key-value pair in a nested dictionary.

_flatten(dictionary: dict) -> dict
    Flatten a dictionary using pandas.

none_value_dictionary(dictionary: dict) -> bool
    Check if all values in a dictionary are None.

nostdout()
    Create a dummy file to suppress output.

step_count(exp_settings: dict, yml_format: tbt.YMLFormatVersion) -> int
    Determine the maximum step number from a settings dictionary.

step_settings(exp_settings: dict, step_number_key: str, step_number_val: int, yml_format: tbt.YMLFormatVersion) -> Tuple[str, dict]
    Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.

valid_microscope_connection(host: str, port: str) -> bool
    Determine if a microscope connection can be made.

enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool
    Determine whether to enable external device control.

valid_enum_entry(obj: Any, check_type: Enum) -> bool
    Determine if an object is a member of an Enum class.

yml_format(version: float) -> tbt.YMLFormatVersion
    Return the YML file format for a given version.

yml_to_dict(*, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]) -> Dict
    Convert a YAML file to a dictionary.

yml_version(file: Path, key_name="config_file_version") -> float
    Return the version of a YAML file if the proper key exists.

yes_no(question) -> bool
    Simple Yes/No function.

remove_directory(directory: Path)
    Recursively remove a directory.

split_list(data: List, chunk_size: int) -> List
    Split a list into equal-sized chunks.

tabular_list(data: List, num_columns: int = Constants.default_column_count, column_width: int = Constants.default_column_width) -> str
    Format a list into a tabular string.

Decorators
----------
hardware_movement(func)
    Decorator to run a function only when hardware testing is enabled.

run_on_standalone_machine(func)
    Decorator to run a function only on a standalone machine.

run_on_microscope_machine(func)
    Decorator to run a function only on a microscope machine.
"""

# Default python modules
import os
from pathlib import Path
import time
import warnings
import math
from typing import Dict, NamedTuple, Tuple, Any, List
from enum import Enum, IntEnum
import platform
import pytest
from functools import singledispatch
import shutil

# # Autoscript modules
import yaml
import contextlib
import sys
from pandas import json_normalize

# # # 3rd party module
# from schema import Schema, And, Use, Optional, SchemaError

# # Local scripts
import pytribeam.types as tbt

# import pytribeam.constants as cs
from pytribeam.constants import Constants


@singledispatch
def beam_type(beam) -> property:
    """
    Return the beam property object as ion and electron beams have the same internal hierarchy.

    Parameters
    ----------
    beam : Any
        The beam object.

    Returns
    -------
    property
        The beam property object.

    Raises
    ------
    NotImplementedError
        If the beam type is not implemented.
    """
    _ = beam  # no operation
    raise NotImplementedError()


@beam_type.register
def _(beam: tbt.ElectronBeam, microscope: tbt.Microscope) -> property:
    """
    Return the electron beam property object.

    Parameters
    ----------
    beam : tbt.ElectronBeam
        The electron beam object.
    microscope : tbt.Microscope
        The microscope object.

    Returns
    -------
    property
        The electron beam property object.
    """
    return microscope.beams.electron_beam


@beam_type.register
def _(beam: tbt.IonBeam, microscope: tbt.Microscope) -> property:
    """
    Return the ion beam property object.

    Parameters
    ----------
    beam : tbt.IonBeam
        The ion beam object.
    microscope : tbt.Microscope
        The microscope object.

    Returns
    -------
    property
        The ion beam property object.
    """
    return microscope.beams.ion_beam


def connect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
    connection_host: str = None,
    connection_port: int = None,
):
    """
    Connect to the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to connect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).
    connection_host : str, optional
        The connection host (default is None).
    connection_port : int, optional
        The connection port (default is None).

    Returns
    -------
    bool
        True if the connection is successful.

    Raises
    ------
    ConnectionError
        If the connection fails.
    """

    # TODO clean up inner function
    def connect(
        microscope: tbt.Microscope,
        connection_host: str = None,
        connection_port: int = None,
    ) -> bool:
        if connection_port is not None:
            microscope.connect(connection_host, connection_port)
        elif connection_host is not None:
            microscope.connect(connection_host)
        else:
            microscope.connect()

    if quiet_output:
        with nostdout():
            connect(
                microscope=microscope,
                connection_host=connection_host,
                connection_port=connection_port,
            )
    else:
        connect(
            microscope=microscope,
            connection_host=connection_host,
            connection_port=connection_port,
        )

    if microscope.server_host is not None:
        return True
    else:
        raise ConnectionError(
            f"Connection failed with connection_host of '{connection_host}' and connection_port of '{connection_port}' microscope not connected."
        )


def dict_to_yml(db: dict, file_path: Path) -> Path:
    """
    Convert a dictionary to a YAML file.

    Parameters
    ----------
    db : dict
        The dictionary to convert.
    file_path : Path
        The path to save the YAML file.

    Returns
    -------
    Path
        The path to the saved YAML file.
    """
    with open(file_path, "w", encoding="utf-8") as out_file:
        yaml.dump(
            db,
            out_file,
            default_flow_style=False,
            sort_keys=False,
        )

    return file_path


def disconnect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
):
    """
    Disconnect from the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to disconnect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).

    Returns
    -------
    bool
        True if the disconnection is successful.

    Raises
    ------
    ConnectionError
        If the disconnection fails.
    """
    if quiet_output:
        with nostdout():
            microscope.disconnect()
    else:
        microscope.disconnect()

    if microscope.server_host is None:
        return True
    else:
        raise ConnectionError("Disconnection failed, microscope still connected")


def general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict:
    """
    Grab general experiment settings from a .yml file and return them as a dictionary.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    dict
        The general experiment settings as a dictionary.
    """
    general_key = yml_format.general_section_key
    return exp_settings[general_key]


def step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType:
    """
    Determine the step type for a specific step settings dictionary.

    Parameters
    ----------
    settings : dict
        The step settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    tbt.StepType
        The step type.
    """
    step_type = tbt.StepType(
        settings[yml_format.step_general_key][yml_format.step_type_key]
    )

    return step_type


def in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool:
    """
    Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.

    Parameters
    ----------
    val : float
        The input value to be compared against min and max.
    limit : tbt.Limit
        The bounds of the interval.
    type : tbt.IntervalType
        The type of interval.

    Returns
    -------
    bool
        True if within the interval, False otherwise.
    """
    if type == tbt.IntervalType.OPEN:
        return (val > limit.min) and (val < limit.max)
    if type == tbt.IntervalType.CLOSED:
        return (val >= limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.LEFT_OPEN:
        return (val > limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.RIGHT_OPEN:
        return (val >= limit.min) and (val < limit.max)


def gen_dict_extract(key, var):
    """
    Extract values from a nested dictionary by key.

    Parameters
    ----------
    key : str
        The key to search for.
    var : dict
        The nested dictionary to search.

    Yields
    ------
    Any
        The values associated with the specified key.
    """
    if hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                for result in gen_dict_extract(key, v):
                    yield result
            elif isinstance(v, list):
                for d in v:
                    for result in gen_dict_extract(key, d):
                        yield result


def nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]:
    """
    Find the nested location of a key-value pair in a dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair.

    Raises
    ------
    KeyError
        If the key-value pair is not found in the dictionary.
    """
    nesting = nested_find_key_value_pair(d=d, key=key, value=value)
    if nesting is None:
        raise KeyError(
            f'Key : value pair of "{key} : {value}" not found in the provided dictionary.'
        )
    return nesting


def nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str]:
    """
    Find a key-value pair in a nested dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair.
    """
    for k, v in d.items():
        if k == key:
            if v == value:
                return [k]
        if isinstance(v, dict):
            p = nested_find_key_value_pair(v, key, value)
            if p:
                return [k] + p


def _flatten(dictionary: dict) -> dict:
    """
    Flatten a dictionary using pandas.

    This function flattens a nested dictionary using pandas, which can be slow on large dictionaries.
    From https://stackoverflow.com/questions/6027558/flatten-nested-dictionaries-compressing-keys

    Parameters
    ----------
    dictionary : dict
        The dictionary to flatten.

    Returns
    -------
    dict
        The flattened dictionary.
    """
    data_frame = json_normalize(dictionary, sep="_")
    db_flat = data_frame.to_dict(orient="records")[0]
    return db_flat


def none_value_dictionary(dictionary: dict) -> bool:
    """
    Check if all values in a dictionary are None.

    This function returns True if all values in the dictionary are None, and False otherwise.

    Parameters
    ----------
    dictionary : dict
        The dictionary to check.

    Returns
    -------
    bool
        True if all values in the dictionary are None, False otherwise.
    """
    # flatten the dictionary first
    db_flat = _flatten(dictionary)
    return all([v is None for v in db_flat.values()])


@contextlib.contextmanager
def nostdout():
    """
    Create a dummy file to suppress output.

    This function creates a dummy file to suppress output.

    Yields
    ------
    None
    """
    save_stdout = sys.stdout
    sys.stdout = tbt.DummyFile()
    try:
        yield
    finally:
        # Always restore stdout, even if KeyboardInterrupt or other exceptions occur
        sys.stdout = save_stdout


def step_count(
    exp_settings: dict,
    yml_format: tbt.YMLFormatVersion,
):
    """
    Determine the maximum step number from a settings dictionary.

    This function determines the maximum step number from a settings dictionary, as specified by the step_number_key.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    int
        The maximum step number.

    Raises
    ------
    ValueError
        If the number of steps found does not match the expected step count.
    """

    step_number_key = yml_format.step_number_key
    non_step_sections = yml_format.non_step_section_count

    # make sure dict from yml has correct section count
    # (steps should all be in one section)
    total_sections = len(exp_settings)
    if total_sections != non_step_sections + 1:
        raise ValueError(
            f"Invalid .yml file, {total_sections} sections were found but the input .yml should have {non_step_sections + 1} sections. Please verify that all top-level keys in the .yml have unique strings and that all steps are contained in a single top-level section."
        )

    expected_step_count = exp_settings[yml_format.general_section_key][
        yml_format.step_count_key
    ]

    found_step_count = 0
    while True:
        try:
            nested_dictionary_location(
                d=exp_settings,
                key=step_number_key,
                value=found_step_count + 1,
            )
        except KeyError:
            break
        found_step_count += 1

    # validate number of steps found with steps read by YAML loader
    # TODO YAML safeloader will ignore duplicate top level keys, so this check relies on unique step numbers in ascending order (no gaps) to be found.

    if expected_step_count != found_step_count:
        raise ValueError(
            f"Invalid .yml file, {found_step_count} steps were found but the input .yml should have {expected_step_count} steps from the general setting key '{yml_format.step_count_key}' within the '{yml_format.general_section_key}' section. Please verify that all step_name keys in the .yml have unique strings and that step numbers are continuously-increasing positive integers starting at 1."
        )

    return found_step_count


def step_settings(
    exp_settings: dict,
    step_number_key: str,
    step_number_val: int,
    yml_format: tbt.YMLFormatVersion,
) -> Tuple[str, dict]:
    """
    Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    step_number_key : str
        The key for the step number.
    step_number_val : int
        The value for the step number.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    Tuple[str, dict]
        The step name and the step settings dictionary.
    """

    nested_locations = nested_dictionary_location(
        d=exp_settings,
        key=step_number_key,
        value=step_number_val,
    )
    ### top level dictionary key name is first index, need key name nested within it (second level, index = 1)
    step_name = nested_locations[1]
    step_section_key = yml_format.step_section_key
    return step_name, exp_settings[step_section_key][step_name]


def valid_microscope_connection(host: str, port: str) -> bool:
    """
    Determine if a microscope connection can be made.

    This function checks if a microscope connection can be made and disconnects if a connection can be made.

    Parameters
    ----------
    host : str
        The connection host.
    port : str
        The connection port.

    Returns
    -------
    bool
        True if the connection can be made, False otherwise.
    """
    microscope = tbt.Microscope()
    if connect_microscope(
        microscope=microscope,
        quiet_output=True,
        connection_host=host,
        connection_port=port,
    ):
        if disconnect_microscope(
            microscope=microscope,
            quiet_output=True,
        ):
            return True
    return False


def enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool:
    """
    Determine whether to enable external device control.

    This function checks if the external device control should be enabled based on the OEM.

    Parameters
    ----------
    oem : tbt.ExternalDeviceOEM
        The OEM of the external device.

    Returns
    -------
    bool
        True if the external device control should be enabled, False otherwise.

    Raises
    ------
    NotImplementedError
        If the OEM type is unsupported.
    """
    if not isinstance(oem, tbt.ExternalDeviceOEM):
        raise NotImplementedError(
            f"Unsupported type of {type(oem)}, only 'ExternalDeviceOEM' types are supported."
        )
    if oem != tbt.ExternalDeviceOEM.NONE:
        return True
    return False


def valid_enum_entry(obj: Any, check_type: Enum) -> bool:
    """
    Determine if an object is a member of an Enum class.

    This function checks if an object is a member of an Enum class.

    Parameters
    ----------
    obj : Any
        The object to check.
    check_type : Enum
        The Enum class to check against.

    Returns
    -------
    bool
        True if the object is a member of the Enum class, False otherwise.
    """
    return obj in check_type._value2member_map_


def yml_format(version: float) -> tbt.YMLFormatVersion:
    """
    Return the YML file format for a given version.

    This function returns the YML file format for a given version.

    Parameters
    ----------
    version : float
        The version of the YML file.

    Returns
    -------
    tbt.YMLFormatVersion
        The YML file format for the given version.

    Raises
    ------
    NotImplementedError
        If the YML file version is unsupported.
    """
    supported_versions = [file.version for file in tbt.YMLFormatVersion]
    if not version in supported_versions:
        raise NotImplementedError(
            f'Unsupported YML file version for version "{version}". Valid formats include: {[i.value for i in tbt.YMLFormatVersion]}'
        )
    yml_file_idx = supported_versions.index(version)
    yml_format = list(tbt.YMLFormatVersion)[yml_file_idx]
    return yml_format


def yml_to_dict(
    *, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]
) -> Dict:
    """
    Convert a YAML file to a dictionary.

    This function reads a YAML file and returns the result as a dictionary.

    Parameters
    ----------
    yml_path_file : Path
        The fully pathed location to the input file.
    version : float
        The version of the YAML file in x.y format.
    required_keys : Tuple[str, ...]
        The key(s) that must be in the YAML file for conversion to a dictionary to occur.

    Returns
    -------
    dict
        The YAML file represented as a dictionary.

    Raises
    ------
    TypeError
        If the file type is unsupported.
    OSError
        If the YAML file cannot be opened or decoded.
    KeyError
        If the required keys are not found in the YAML file.
    ValueError
        If the version specified in the file does not match the requested version.
    """

    # Compared to the lower() method, the casefold() method is stronger.
    # It will convert more characters into lower case, and will find more
    # matches on comparison of two strings that are both are converted
    # using the casefold() method.
    file_type = yml_path_file.suffix.casefold()

    supported_types = (".yaml", ".yml")

    if file_type not in supported_types:
        raise TypeError("Only file types .yaml, and .yml are supported.")

    try:
        with open(file=yml_path_file, mode="r", encoding="utf-8") as stream:
            # See deprecation warning for plain yaml.load(input) at
            # https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
            db = yaml.load(stream, Loader=yaml.SafeLoader)
    except yaml.YAMLError as error:
        print(f"Error with YAML file: {error}")
        # print(f"Could not open: {self.self.path_file_in}")
        print(f"Could not open or decode: {yml_path_file}")
        # raise yaml.YAMLError
        raise OSError from error

    # check keys found in input file against required keys
    found_keys = tuple(db.keys())
    keys_exist = tuple(map(lambda x: x in found_keys, required_keys))
    has_required_keys = all(keys_exist)
    if not has_required_keys:
        raise KeyError(f"Input files must have these keys defined: {required_keys}")

    version_specified = db["config_file_version"]
    version_requested = version

    if version_specified != version_requested:
        ee = f"Version mismatch: specified in file was {version_specified},"
        ee += f"requested is {version_requested}"
        raise ValueError(ee)

    return db


def yml_version(
    file: Path,
    key_name="config_file_version",
) -> float:
    """
    Return the version of a YAML file if the proper key exists.

    Parameters
    ----------
    file : Path
        The path to the YAML file.
    key_name : str, optional
        The key name for the version in the YAML file (default is "config_file_version").

    Returns
    -------
    float
        The version of the YAML file.

    Raises
    ------
    KeyError
        If the version key is not found in the YAML file.
    ValueError
        If the version value is not a valid float.
    """
    with open(file, "r") as stream:
        data = yaml.load(stream, Loader=yaml.SafeLoader)

    try:
        version = data[key_name]
    except KeyError:
        # print(f"Error with version key: {error}")
        raise KeyError(f"Error with version key, '{key_name}' key not found in {file}.")
    try:
        version = float(version)
    except ValueError:
        raise ValueError(
            f"Could not find valid version in {file} for key {key_name}, found '{version}' which is not a float."
        )
    return version


def yes_no(question):
    """
    Simple Yes/No function.

    Parameters
    ----------
    question : str
        The question to ask the user.

    Returns
    -------
    bool
        True if the user answers "yes", False otherwise.
    """
    prompt = f"{question} (y/n): "
    ans = input(prompt).strip().lower()
    if ans not in ["y", "n"]:
        print(f"{ans} is invalid, please try again...")
        return yes_no(question)
    if ans == "y":
        return True
    return False


def remove_directory(directory: Path):
    """
    Recursively remove a directory.

    Parameters
    ----------
    directory : Path
        The path to the directory to remove.
    """
    shutil.rmtree(directory)


def split_list(data: List, chunk_size: int) -> List:
    """
    Split a list into equal-sized chunks.

    Parameters
    ----------
    data : List
        The list to split.
    chunk_size : int
        The size of each chunk.

    Returns
    -------
    List
        A list of chunks.
    """
    result = []
    for i in range(0, len(data), chunk_size):
        result.append(data[i : i + chunk_size])
    return result


def tabular_list(
    data: List,
    num_columns: int = Constants.default_column_count,
    column_width: int = Constants.default_column_width,
) -> str:
    """
    Format a list into a tabular string.

    Parameters
    ----------
    data : List
        The list to format.
    num_columns : int, optional
        The number of columns in the table (default is Constants.default_column_count).
    column_width : int, optional
        The width of each column in the table (default is Constants.default_column_width).

    Returns
    -------
    str
        The formatted tabular string.
    """
    rows = split_list(data, chunk_size=num_columns)
    result = ""
    for sublist in rows:
        result += "\n"
        for item in sublist:
            result += f"{item:^{column_width}}"
    return result


### Custom Decorators ###


def hardware_movement(func):
    """
    Decorator to run a function only when hardware testing is enabled.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    @run_on_microscope_machine
    def wrapper_func():
        if not Constants.test_hardware_movement:
            pytest.skip("Run only when hardware testing is enabled")
        func()

    return wrapper_func


def run_on_standalone_machine(func):
    """
    Decorator to run a function only on a standalone machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().offline_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Offline License Machine Only.")
        func()

    return wrapper_func


def run_on_microscope_machine(func):
    """
    Decorator to run a function only on a microscope machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().microscope_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Microscope Machine Only.")
        func()

    return wrapper_func
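The step lookup used by nested_dictionary_location, step_count and step_settings can be sketched as a recursive search that returns the chain of keys leading to a matching key-value pair. The function name find_pair and the settings layout below (section and key names such as "steps" and "step_number") are hypothetical, chosen only to illustrate the idea; the real key names come from the YAML format definition.

```python
from typing import Any, List, Optional


def find_pair(d: dict, key: str, value: Any) -> Optional[List[str]]:
    """Return the key path to the first key/value match, or None."""
    for k, v in d.items():
        if k == key and v == value:
            return [k]
        if isinstance(v, dict):
            path = find_pair(v, key, value)
            if path:
                # Prepend the current key while unwinding the recursion.
                return [k] + path
    return None


# Hypothetical experiment settings: one general section, one steps section.
settings = {
    "general": {"step_count": 2},
    "steps": {
        "image_1": {"step_general": {"step_number": 1}},
        "mill_1": {"step_general": {"step_number": 2}},
    },
}

path = find_pair(settings, "step_number", 2)
step_name = path[1]  # second entry is the user-defined step name
```

Because the path runs from the top-level section down to the matched key, index 0 is the steps section and index 1 is the step name, which is how the step name is recovered from a step number.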
@singledispatch
def beam_type(beam) -> property:
    """
    Return the beam property object as ion and electron beams have the same internal hierarchy.

    Parameters
    ----------
    beam : Any
        The beam object.

    Returns
    -------
    property
        The beam property object.

    Raises
    ------
    NotImplementedError
        If the beam type is not implemented.
    """
    _ = beam  # no operation
    raise NotImplementedError()
Return the beam property object as ion and electron beams have the same internal hierarchy.
Parameters
beam : Any The beam object.
Returns
property The beam property object.
Raises
NotImplementedError If the beam type is not implemented.
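The base function above only raises; concrete behaviour comes from implementations registered per beam type. The following is a minimal sketch of that dispatch pattern, not pytribeam's actual registrations: `ElectronBeam`, `IonBeam`, and `beam_kind` are hypothetical names used only for illustration.

```python
from functools import singledispatch


class ElectronBeam:
    """Hypothetical stand-in for an electron beam settings type."""


class IonBeam:
    """Hypothetical stand-in for an ion beam settings type."""


@singledispatch
def beam_kind(beam) -> str:
    # Fallback for unregistered types, mirroring the base case above.
    raise NotImplementedError(f"No handler registered for {type(beam).__name__}")


@beam_kind.register
def _(beam: ElectronBeam) -> str:
    # Chosen when the first argument is an ElectronBeam instance.
    return "electron"


@beam_kind.register
def _(beam: IonBeam) -> str:
    # Chosen when the first argument is an IonBeam instance.
    return "ion"


print(beam_kind(ElectronBeam()))
print(beam_kind(IonBeam()))
```

Calling `beam_kind` with any other type falls back to the base function and raises NotImplementedError.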
def connect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
    connection_host: str = None,
    connection_port: int = None,
) -> bool:
    """
    Connect to the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to connect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).
    connection_host : str, optional
        The connection host (default is None).
    connection_port : int, optional
        The connection port (default is None).

    Returns
    -------
    bool
        True if the connection is successful.

    Raises
    ------
    ConnectionError
        If the connection fails.
    """

    # TODO clean up inner function
    def connect(
        microscope: tbt.Microscope,
        connection_host: str = None,
        connection_port: int = None,
    ) -> None:
        # Pass only the arguments that were actually provided.
        if connection_port is not None:
            microscope.connect(connection_host, connection_port)
        elif connection_host is not None:
            microscope.connect(connection_host)
        else:
            microscope.connect()

    if quiet_output:
        with nostdout():
            connect(
                microscope=microscope,
                connection_host=connection_host,
                connection_port=connection_port,
            )
    else:
        connect(
            microscope=microscope,
            connection_host=connection_host,
            connection_port=connection_port,
        )

    if microscope.server_host is not None:
        return True
    else:
        raise ConnectionError(
            f"Connection failed with connection_host of '{connection_host}' and connection_port of '{connection_port}'; microscope not connected."
        )
Connect to the microscope with the option to suppress printout.
Parameters
microscope : tbt.Microscope
    The microscope object to connect.
quiet_output : bool, optional
    Whether to suppress printout (default is True).
connection_host : str, optional
    The connection host (default is None).
connection_port : int, optional
    The connection port (default is None).
Returns
bool True if the connection is successful.
Raises
ConnectionError If the connection fails.
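The connection logic can be tried without hardware. The sketch below is an illustration under assumptions, not the library's implementation: `FakeMicroscope` and `connect_quietly` are hypothetical, the default host and port in the fake are arbitrary, and the standard library's `contextlib.redirect_stdout` and `contextlib.nullcontext` stand in for `nostdout()` so the connect call is written only once.

```python
import contextlib
import io
from typing import Optional


class FakeMicroscope:
    """Hypothetical stand-in that mimics connect() and server_host."""

    def __init__(self) -> None:
        self.server_host: Optional[str] = None

    def connect(self, host: str = "localhost", port: int = 1234) -> None:
        # Arbitrary defaults; a real microscope object defines its own.
        print(f"connecting to {host}:{port}")
        self.server_host = host


def connect_quietly(microscope, quiet_output=True, host=None, port=None) -> bool:
    # Choose the output context once instead of duplicating the call.
    if quiet_output:
        silence = contextlib.redirect_stdout(io.StringIO())
    else:
        silence = contextlib.nullcontext()
    with silence:
        # Pass only the arguments that were actually provided.
        if port is not None:
            microscope.connect(host, port)
        elif host is not None:
            microscope.connect(host)
        else:
            microscope.connect()
    if microscope.server_host is None:
        raise ConnectionError(f"Connection failed for host '{host}', port '{port}'.")
    return True


scope = FakeMicroscope()
print(connect_quietly(scope, host="10.0.0.1"))
```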
def dict_to_yml(db: dict, file_path: Path) -> Path:
    """
    Convert a dictionary to a YAML file.

    Parameters
    ----------
    db : dict
        The dictionary to convert.
    file_path : Path
        The path to save the YAML file.

    Returns
    -------
    Path
        The path to the saved YAML file.
    """
    with open(file_path, "w", encoding="utf-8") as out_file:
        yaml.dump(
            db,
            out_file,
            default_flow_style=False,
            sort_keys=False,
        )

    return file_path
Convert a dictionary to a YAML file.
Parameters
db : dict The dictionary to convert. file_path : Path The path to save the YAML file.
Returns
Path The path to the saved YAML file.
def disconnect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
) -> bool:
    """
    Disconnect from the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to disconnect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).

    Returns
    -------
    bool
        True if the disconnection is successful.

    Raises
    ------
    ConnectionError
        If the disconnection fails.
    """
    if quiet_output:
        with nostdout():
            microscope.disconnect()
    else:
        microscope.disconnect()

    if microscope.server_host is None:
        return True
    else:
        raise ConnectionError("Disconnection failed, microscope still connected")
Disconnect from the microscope with the option to suppress printout.
Parameters
microscope : tbt.Microscope The microscope object to disconnect. quiet_output : bool, optional Whether to suppress printout (default is True).
Returns
bool True if the disconnection is successful.
Raises
ConnectionError If the disconnection fails.
def general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict:
    """
    Grab general experiment settings from a .yml file and return them as a dictionary.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    dict
        The general experiment settings as a dictionary.
    """
    general_key = yml_format.general_section_key
    return exp_settings[general_key]
Grab general experiment settings from a .yml file and return them as a dictionary.
Parameters
exp_settings : dict The experiment settings dictionary. yml_format : tbt.YMLFormat The YAML format version.
Returns
dict The general experiment settings as a dictionary.
def step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType:
    """
    Determine the step type for a specific step settings dictionary.

    Parameters
    ----------
    settings : dict
        The step settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    tbt.StepType
        The step type.
    """
    step_type = tbt.StepType(
        settings[yml_format.step_general_key][yml_format.step_type_key]
    )

    return step_type
Determine the step type for a specific step settings dictionary.
Parameters
settings : dict The step settings dictionary. yml_format : tbt.YMLFormat The YAML format version.
Returns
tbt.StepType The step type.
def in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool:
    """
    Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.

    Parameters
    ----------
    val : float
        The input value to be compared against min and max.
    limit : tbt.Limit
        The bounds of the interval.
    type : tbt.IntervalType
        The type of interval.

    Returns
    -------
    bool
        True if within the interval, False otherwise.
    """
    if type == tbt.IntervalType.OPEN:
        return (val > limit.min) and (val < limit.max)
    if type == tbt.IntervalType.CLOSED:
        return (val >= limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.LEFT_OPEN:
        return (val > limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.RIGHT_OPEN:
        return (val >= limit.min) and (val < limit.max)
Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.
Parameters
val : float
    The input value to be compared against min and max.
limit : tbt.Limit
    The bounds of the interval.
type : tbt.IntervalType
    The type of interval.
Returns
bool True if within the interval, False otherwise.
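The four interval types differ only in how the endpoints are treated. The sketch below illustrates this with stand-in types: `Limit` and `IntervalType` here are simplified assumptions, not the real `tbt` definitions, and the final `raise` is an addition of this sketch (the source function has no branch for other values).

```python
from enum import Enum
from typing import NamedTuple


class IntervalType(Enum):
    """Simplified stand-in for tbt.IntervalType."""

    OPEN = "open"
    CLOSED = "closed"
    LEFT_OPEN = "left_open"
    RIGHT_OPEN = "right_open"


class Limit(NamedTuple):
    """Simplified stand-in for tbt.Limit."""

    min: float
    max: float


def in_interval(val: float, limit: Limit, type: IntervalType) -> bool:
    if type == IntervalType.OPEN:
        return limit.min < val < limit.max
    if type == IntervalType.CLOSED:
        return limit.min <= val <= limit.max
    if type == IntervalType.LEFT_OPEN:
        return limit.min < val <= limit.max
    if type == IntervalType.RIGHT_OPEN:
        return limit.min <= val < limit.max
    # Addition in this sketch only: reject unknown interval types explicitly.
    raise ValueError(f"Unsupported interval type: {type!r}")


bounds = Limit(min=0.0, max=10.0)
for kind in IntervalType:
    # Show how each interval type treats the two endpoints.
    print(kind.name, in_interval(0.0, bounds, kind), in_interval(10.0, bounds, kind))
```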
def gen_dict_extract(key, var):
    """
    Extract values from a nested dictionary by key.

    Parameters
    ----------
    key : str
        The key to search for.
    var : dict
        The nested dictionary to search.

    Yields
    ------
    Any
        The values associated with the specified key.
    """
    if hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                for result in gen_dict_extract(key, v):
                    yield result
            elif isinstance(v, list):
                for d in v:
                    for result in gen_dict_extract(key, d):
                        yield result
Extract values from a nested dictionary by key.
Parameters
key : str The key to search for. var : dict The nested dictionary to search.
Yields
Any The values associated with the specified key.
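As a usage illustration, the generator can be run on a small nested dictionary. The function body below restates the source using `yield from`; the `settings` dictionary and its key names are made up for this example and are not a real pytribeam settings layout.

```python
def gen_dict_extract(key, var):
    """Yield every value stored under `key`, at any nesting depth."""
    if hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                yield from gen_dict_extract(key, v)
            elif isinstance(v, list):
                # List elements without .items() are skipped by the guard above.
                for d in v:
                    yield from gen_dict_extract(key, d)


# Made-up settings layout for illustration only.
settings = {
    "general": {"step_count": 2},
    "steps": {
        "image": {"step_general": {"step_number": 1}},
        "mill": {"step_general": {"step_number": 2}},
    },
    "extras": [{"step_number": 99}, "not a dict"],
}

print(list(gen_dict_extract("step_number", settings)))  # [1, 2, 99]
```

Because the function is a generator, wrap the call in `list()` (or iterate over it) to obtain the values.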
def nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]:
    """
    Find the nested location of a key-value pair in a dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair.

    Raises
    ------
    KeyError
        If the key-value pair is not found in the dictionary.
    """
    nesting = nested_find_key_value_pair(d=d, key=key, value=value)
    if nesting is None:
        raise KeyError(
            f'Key : value pair of "{key} : {value}" not found in the provided dictionary.'
        )
    return nesting
Find the nested location of a key-value pair in a dictionary.
This function returns a list of key values from the highest to the lowest level of nested dictionaries.
Parameters
d : dict
    The dictionary to search.
key : str
    The key to search for.
value : Any
    The value to search for.
Returns
List[str] The nested location of the key-value pair.
Raises
KeyError If the key-value pair is not found in the dictionary.
def nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str]:
    """
    Find a key-value pair in a nested dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair, or None if the pair is not found.
    """
    for k, v in d.items():
        if k == key:
            if v == value:
                return [k]
        if isinstance(v, dict):
            p = nested_find_key_value_pair(v, key, value)
            if p:
                return [k] + p
Find a key-value pair in a nested dictionary.
This function returns a list of key values from the highest to the lowest level of nested dictionaries.
Parameters
d : dict
    The dictionary to search.
key : str
    The key to search for.
value : Any
    The value to search for.
Returns
List[str] The nested location of the key-value pair, or None if the pair is not found.
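The two lookup functions are easiest to follow on a concrete dictionary. The sketch below restates both from the source (with an explicit `return None` and an `Optional` annotation added for readability) and runs them on a made-up settings dictionary whose key names are illustrative only.

```python
from typing import Any, List, Optional


def nested_find_key_value_pair(d: dict, key: str, value: Any) -> Optional[List[str]]:
    for k, v in d.items():
        if k == key and v == value:
            return [k]
        if isinstance(v, dict):
            path = nested_find_key_value_pair(v, key, value)
            if path:
                # Prepend the current key to the path found deeper down.
                return [k] + path
    return None  # explicit here; the source falls off the end of the function


def nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]:
    nesting = nested_find_key_value_pair(d=d, key=key, value=value)
    if nesting is None:
        raise KeyError(f'Key : value pair of "{key} : {value}" not found.')
    return nesting


# Made-up settings layout for illustration only.
settings = {
    "general": {"step_count": 2},
    "steps": {
        "image": {"step_general": {"step_number": 1}},
        "mill": {"step_general": {"step_number": 2}},
    },
}

print(nested_dictionary_location(settings, "step_number", 2))
# ['steps', 'mill', 'step_general', 'step_number']
```

In this layout, index 1 of the returned path is the user-defined step name ("mill").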
def none_value_dictionary(dictionary: dict) -> bool:
    """
    Check if all values in a dictionary are None.

    This function returns True if all values in the dictionary are None, and False otherwise.

    Parameters
    ----------
    dictionary : dict
        The dictionary to check.

    Returns
    -------
    bool
        True if all values in the dictionary are None, False otherwise.
    """
    # flatten the dictionary first
    db_flat = _flatten(dictionary)
    return all([v is None for v in db_flat.values()])
Check if all values in a dictionary are None.
This function returns True if all values in the dictionary are None, and False otherwise.
Parameters
dictionary : dict The dictionary to check.
Returns
bool True if all values in the dictionary are None, False otherwise.
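Because the dictionary is flattened first, values in nested dictionaries are checked as well. The sketch below shows the idea with a pure-Python `flatten` helper, which is an assumption made for this example; the module's own `_flatten` uses pandas and may treat edge cases such as empty nested dictionaries differently.

```python
def flatten(d: dict, parent: str = "") -> dict:
    """Hypothetical pure-Python stand-in for the pandas-based _flatten."""
    flat = {}
    for k, v in d.items():
        name = f"{parent}.{k}" if parent else str(k)
        if isinstance(v, dict) and v:
            # Descend into non-empty nested dictionaries.
            flat.update(flatten(v, name))
        else:
            flat[name] = v
    return flat


def none_value_dictionary(dictionary: dict) -> bool:
    # True only if every leaf value is None.
    return all(v is None for v in flatten(dictionary).values())


print(none_value_dictionary({"a": None, "b": {"c": None}}))  # True
print(none_value_dictionary({"a": None, "b": {"c": 0}}))     # False
```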
@contextlib.contextmanager
def nostdout():
    """
    Create a dummy file to suppress output.

    This function creates a dummy file to suppress output.

    Yields
    ------
    None
    """
    save_stdout = sys.stdout
    sys.stdout = tbt.DummyFile()
    try:
        yield
    finally:
        # Always restore stdout, even if KeyboardInterrupt or other exceptions occur
        sys.stdout = save_stdout
Create a dummy file to suppress output.
While the context is active, sys.stdout is replaced by a tbt.DummyFile; the original stream is always restored on exit, even if an exception is raised.
Yields
None
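The context manager can be exercised without the rest of the package. In this sketch `DummyFile` is a simplified stand-in for `tbt.DummyFile`, assumed here to be a file-like object that discards whatever is written to it.

```python
import contextlib
import sys


class DummyFile:
    """Simplified stand-in for tbt.DummyFile: swallows all writes."""

    def write(self, text: str) -> int:
        return len(text)

    def flush(self) -> None:
        pass


@contextlib.contextmanager
def nostdout():
    save_stdout = sys.stdout
    sys.stdout = DummyFile()
    try:
        yield
    finally:
        # Restore stdout even if the body raised.
        sys.stdout = save_stdout


with nostdout():
    print("this line is discarded")
print("this line is shown")
```

The standard library's `contextlib.redirect_stdout(io.StringIO())` gives a similar effect when the suppressed text should be kept for inspection.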
def step_count(
    exp_settings: dict,
    yml_format: tbt.YMLFormatVersion,
) -> int:
    """
    Determine the maximum step number from a settings dictionary.

    This function determines the maximum step number from a settings dictionary, as specified by the step_number_key.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    int
        The maximum step number.

    Raises
    ------
    ValueError
        If the number of steps found does not match the expected step count.
    """

    step_number_key = yml_format.step_number_key
    non_step_sections = yml_format.non_step_section_count

    # make sure dict from yml has correct section count
    # (steps should all be in one section)
    total_sections = len(exp_settings)
    if total_sections != non_step_sections + 1:
        raise ValueError(
            f"Invalid .yml file, {total_sections} sections were found but the input .yml should have {non_step_sections + 1} sections. Please verify that all top-level keys in the .yml have unique strings and that all steps are contained in a single top-level section."
        )

    expected_step_count = exp_settings[yml_format.general_section_key][
        yml_format.step_count_key
    ]

    # search for step numbers 1, 2, 3, ... until one is missing
    found_step_count = 0
    while True:
        try:
            nested_dictionary_location(
                d=exp_settings,
                key=step_number_key,
                value=found_step_count + 1,
            )
        except KeyError:
            break
        found_step_count += 1

    # validate number of steps found with steps read by YAML loader
    # TODO YAML safeloader will ignore duplicate top level keys, so this check relies on unique step numbers in ascending order (no gaps) to be found.

    if expected_step_count != found_step_count:
        raise ValueError(
            f"Invalid .yml file, {found_step_count} steps were found but the input .yml should have {expected_step_count} steps from the general setting key '{yml_format.step_count_key}' within the '{yml_format.general_section_key}' section. Please verify that all step_name keys in the .yml have unique strings and that step numbers are continuously-increasing positive integers starting at 1."
        )

    return found_step_count
Determine the maximum step number from a settings dictionary.
Steps are counted by searching the settings for step_number_key values 1, 2, 3, and so on until one is missing. The count must match the step count declared in the general section.
Parameters
exp_settings : dict The experiment settings dictionary. yml_format : tbt.YMLFormatVersion The YAML format version.
Returns
int The maximum step number.
Raises
ValueError If the number of top-level sections is not the expected non-step section count plus one, or if the number of steps found does not match the expected step count.
def step_settings(
    exp_settings: dict,
    step_number_key: str,
    step_number_val: int,
    yml_format: tbt.YMLFormatVersion,
) -> Tuple[str, dict]:
    """
    Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    step_number_key : str
        The key for the step number.
    step_number_val : int
        The value for the step number.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    Tuple[str, dict]
        The step name and the step settings dictionary.
    """

    nested_locations = nested_dictionary_location(
        d=exp_settings,
        key=step_number_key,
        value=step_number_val,
    )
    ### top level dictionary key name is first index, need key name nested within it (second level, index = 1)
    step_name = nested_locations[1]
    step_section_key = yml_format.step_section_key
    return step_name, exp_settings[step_section_key][step_name]
Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.
Parameters
exp_settings : dict
    The experiment settings dictionary.
step_number_key : str
    The key for the step number.
step_number_val : int
    The value for the step number.
yml_format : tbt.YMLFormatVersion
    The YAML format version.
Returns
Tuple[str, dict] The step name and the step settings dictionary.
def valid_microscope_connection(host: str, port: str) -> bool:
    """
    Determine if a microscope connection can be made.

    This function checks if a microscope connection can be made and disconnects if a connection can be made.

    Parameters
    ----------
    host : str
        The connection host.
    port : str
        The connection port.

    Returns
    -------
    bool
        True if the connection can be made, False otherwise.
    """
    microscope = tbt.Microscope()
    if connect_microscope(
        microscope=microscope,
        quiet_output=True,
        connection_host=host,
        connection_port=port,
    ):
        if disconnect_microscope(
            microscope=microscope,
            quiet_output=True,
        ):
            return True
    return False
Determine if a microscope connection can be made.
This function checks if a microscope connection can be made and disconnects if a connection can be made.
Parameters
host : str The connection host. port : str The connection port.
Returns
bool True if the connection can be made, False otherwise.
def enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool:
    """
    Determine whether to enable external device control.

    This function checks if the external device control should be enabled based on the OEM.

    Parameters
    ----------
    oem : tbt.ExternalDeviceOEM
        The OEM of the external device.

    Returns
    -------
    bool
        True if the external device control should be enabled, False otherwise.

    Raises
    ------
    NotImplementedError
        If the OEM type is unsupported.
    """
    if not isinstance(oem, tbt.ExternalDeviceOEM):
        raise NotImplementedError(
            f"Unsupported type of {type(oem)}, only 'ExternalDeviceOEM' types are supported."
        )
    if oem != tbt.ExternalDeviceOEM.NONE:
        return True
    return False
Determine whether to enable external device control.
This function checks if the external device control should be enabled based on the OEM.
Parameters
oem : tbt.ExternalDeviceOEM The OEM of the external device.
Returns
bool True if the external device control should be enabled, False otherwise.
Raises
NotImplementedError If the OEM type is unsupported.
def valid_enum_entry(obj: Any, check_type: Enum) -> bool:
    """
    Determine if an object is a member of an Enum class.

    This function checks if an object is a member of an Enum class.

    Parameters
    ----------
    obj : Any
        The object to check.
    check_type : Enum
        The Enum class to check against.

    Returns
    -------
    bool
        True if the object is a member of the Enum class, False otherwise.
    """
    return obj in check_type._value2member_map_
Determine if an object is a member of an Enum class.
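The check looks obj up in the Enum class's value-to-member map, so it tests whether obj is a valid member value (for example, a string read from a settings file), not whether obj is itself an Enum member.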
Parameters
obj : Any The object to check. check_type : Enum The Enum class to check against.
Returns
bool True if the object is a member of the Enum class, False otherwise.
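The difference between a member and a member's value matters for this check. The sketch below demonstrates it with a hypothetical `StepKind` enum; `is_valid_value` is an additional illustration (not part of the module) that performs a similar test through the public Enum constructor.

```python
from enum import Enum


class StepKind(Enum):
    """Hypothetical enum used only for this illustration."""

    IMAGE = "image"
    LASER = "laser"


def valid_enum_entry(obj, check_type) -> bool:
    # Same lookup as the source: membership in the value-to-member map.
    return obj in check_type._value2member_map_


def is_valid_value(obj, check_type) -> bool:
    # Alternative sketch using only the public Enum API.
    try:
        check_type(obj)
    except ValueError:
        return False
    return True


print(valid_enum_entry("image", StepKind))         # True: "image" is a member value
print(valid_enum_entry("mill", StepKind))          # False: no member has this value
print(valid_enum_entry(StepKind.IMAGE, StepKind))  # False: a member is not a value
```

The two helpers agree on raw values; they differ for member objects, since `StepKind(StepKind.IMAGE)` returns the member rather than raising.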
def yml_format(version: float) -> tbt.YMLFormatVersion:
    """
    Return the YML file format for a given version.

    This function returns the YML file format for a given version.

    Parameters
    ----------
    version : float
        The version of the YML file.

    Returns
    -------
    tbt.YMLFormatVersion
        The YML file format for the given version.

    Raises
    ------
    NotImplementedError
        If the YML file version is unsupported.
    """
    supported_versions = [file.version for file in tbt.YMLFormatVersion]
    if version not in supported_versions:
        raise NotImplementedError(
            f'Unsupported YML file version for version "{version}". Valid formats include: {[i.value for i in tbt.YMLFormatVersion]}'
        )
    yml_file_idx = supported_versions.index(version)
    yml_format = list(tbt.YMLFormatVersion)[yml_file_idx]
    return yml_format
Return the YML file format for a given version.
This function returns the YML file format for a given version.
Parameters
version : float The version of the YML file.
Returns
tbt.YMLFormatVersion The YML file format for the given version.
Raises
NotImplementedError If the YML file version is unsupported.
def yml_to_dict(
    *, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]
) -> Dict:
    """
    Convert a YAML file to a dictionary.

    This function reads a YAML file and returns the result as a dictionary.

    Parameters
    ----------
    yml_path_file : Path
        The fully pathed location to the input file.
    version : float
        The version of the YAML file in x.y format.
    required_keys : Tuple[str, ...]
        The key(s) that must be in the YAML file for conversion to a dictionary to occur.

    Returns
    -------
    dict
        The YAML file represented as a dictionary.

    Raises
    ------
    TypeError
        If the file type is unsupported.
    OSError
        If the YAML file cannot be opened or decoded.
    KeyError
        If the required keys are not found in the YAML file.
    ValueError
        If the version specified in the file does not match the requested version.
    """

    # Compared to the lower() method, the casefold() method is stronger.
    # It will convert more characters into lower case, and will find more
    # matches on comparison of two strings that are both converted
    # using the casefold() method.
    file_type = yml_path_file.suffix.casefold()

    supported_types = (".yaml", ".yml")

    if file_type not in supported_types:
        raise TypeError("Only file types .yaml, and .yml are supported.")

    try:
        with open(file=yml_path_file, mode="r", encoding="utf-8") as stream:
            # See deprecation warning for plain yaml.load(input) at
            # https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
            db = yaml.load(stream, Loader=yaml.SafeLoader)
    except yaml.YAMLError as error:
        print(f"Error with YAML file: {error}")
        # print(f"Could not open: {self.self.path_file_in}")
        print(f"Could not open or decode: {yml_path_file}")
        # raise yaml.YAMLError
        raise OSError from error

    # check keys found in input file against required keys
    found_keys = tuple(db.keys())
    keys_exist = tuple(map(lambda x: x in found_keys, required_keys))
    has_required_keys = all(keys_exist)
    if not has_required_keys:
        raise KeyError(f"Input files must have these keys defined: {required_keys}")

    version_specified = db["config_file_version"]
    version_requested = version

    if version_specified != version_requested:
        # note the space after the comma so the two parts do not run together
        ee = f"Version mismatch: specified in file was {version_specified}, "
        ee += f"requested is {version_requested}"
        raise ValueError(ee)

    return db
Convert a YAML file to a dictionary.
This function reads a YAML file and returns the result as a dictionary.
Parameters
yml_path_file : Path
    The fully pathed location to the input file.
version : float
    The version of the YAML file in x.y format.
required_keys : Tuple[str, ...]
    The key(s) that must be in the YAML file for conversion to a dictionary to occur.
Returns
dict The YAML file represented as a dictionary.
Raises
TypeError
    If the file type is unsupported.
OSError
    If the YAML file cannot be opened or decoded.
KeyError
    If the required keys are not found in the YAML file.
ValueError
    If the version specified in the file does not match the requested version.
def yml_version(
    file: Path,
    key_name="config_file_version",
) -> float:
    """
    Return the version of a YAML file if the proper key exists.

    Parameters
    ----------
    file : Path
        The path to the YAML file.
    key_name : str, optional
        The key name for the version in the YAML file (default is "config_file_version").

    Returns
    -------
    float
        The version of the YAML file.

    Raises
    ------
    KeyError
        If the version key is not found in the YAML file.
    ValueError
        If the version value is not a valid float.
    """
    with open(file, "r") as stream:
        data = yaml.load(stream, Loader=yaml.SafeLoader)

    try:
        version = data[key_name]
    except KeyError:
        # print(f"Error with version key: {error}")
        raise KeyError(f"Error with version key, '{key_name}' key not found in {file}.")
    try:
        version = float(version)
    except ValueError:
        raise ValueError(
            f"Could not find valid version in {file} for key {key_name}, found '{version}' which is not a float."
        )
    return version
Return the version of a YAML file if the proper key exists.
Parameters
file : Path The path to the YAML file. key_name : str, optional The key name for the version in the YAML file (default is "config_file_version").
Returns
float The version of the YAML file.
Raises
KeyError If the version key is not found in the YAML file. ValueError If the version value is not a valid float.
def yes_no(question):
    """
    Simple Yes/No function.

    Parameters
    ----------
    question : str
        The question to ask the user.

    Returns
    -------
    bool
        True if the user answers "yes", False otherwise.
    """
    prompt = f"{question} (y/n): "
    ans = input(prompt).strip().lower()
    if ans not in ["y", "n"]:
        print(f"{ans} is invalid, please try again...")
        return yes_no(question)
    if ans == "y":
        return True
    return False
Simple Yes/No function.
Parameters
question : str The question to ask the user.
Returns
bool True if the user answers "y", False if the user answers "n" (case-insensitive). Any other input prompts the user again.
def remove_directory(directory: Path):
    """
    Recursively remove a directory.

    Parameters
    ----------
    directory : Path
        The path to the directory to remove.
    """
    shutil.rmtree(directory)
Recursively remove a directory.
Parameters
directory : Path The path to the directory to remove.
def split_list(data: List, chunk_size: int) -> List:
    """
    Split a list into equal-sized chunks.

    Parameters
    ----------
    data : List
        The list to split.
    chunk_size : int
        The size of each chunk.

    Returns
    -------
    List
        A list of chunks.
    """
    result = []
    for i in range(0, len(data), chunk_size):
        result.append(data[i : i + chunk_size])
    return result
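Split a list into consecutive chunks of chunk_size items; the last chunk is shorter when the list length is not a multiple of chunk_size.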
Parameters
data : List The list to split. chunk_size : int The size of each chunk.
Returns
List A list of chunks.
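A short usage illustration, restating the function from the source, shows how the chunks come out when the list length is not a multiple of `chunk_size`. The sample data is made up.

```python
from typing import List


def split_list(data: List, chunk_size: int) -> List:
    result = []
    # Step through the list in strides of chunk_size; slicing past the end
    # simply yields a shorter final chunk.
    for i in range(0, len(data), chunk_size):
        result.append(data[i : i + chunk_size])
    return result


print(split_list([1, 2, 3, 4, 5, 6, 7], chunk_size=3))  # [[1, 2, 3], [4, 5, 6], [7]]
print(split_list([], chunk_size=3))                     # []
```

A `chunk_size` of 0 raises ValueError, because `range()` does not accept a zero step.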
def tabular_list(
    data: List,
    num_columns: int = Constants.default_column_count,
    column_width: int = Constants.default_column_width,
) -> str:
    """
    Format a list into a tabular string.

    Parameters
    ----------
    data : List
        The list to format.
    num_columns : int, optional
        The number of columns in the table (default is Constants.default_column_count).
    column_width : int, optional
        The width of each column in the table (default is Constants.default_column_width).

    Returns
    -------
    str
        The formatted tabular string.
    """
    rows = split_list(data, chunk_size=num_columns)
    result = ""
    for sublist in rows:
        result += "\n"
        for item in sublist:
            result += f"{item:^{column_width}}"
    return result
Format a list into a tabular string.
Parameters
data : List
    The list to format.
num_columns : int, optional
    The number of columns in the table (default is Constants.default_column_count).
column_width : int, optional
    The width of each column in the table (default is Constants.default_column_width).
Returns
str The formatted tabular string.
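The output layout is easiest to see on a small example. The sketch below restates the formatting loop from the source; because the values of `Constants.default_column_count` and `Constants.default_column_width` are not shown here, the sketch makes both arguments required instead of guessing defaults.

```python
from typing import List


def split_list(data: List, chunk_size: int) -> List:
    return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]


def tabular_list(data: List, num_columns: int, column_width: int) -> str:
    rows = split_list(data, chunk_size=num_columns)
    result = ""
    for sublist in rows:
        # Every row, including the first, starts with a newline.
        result += "\n"
        for item in sublist:
            # Center each item within a fixed-width column.
            result += f"{item:^{column_width}}"
    return result


table = tabular_list(["a", "b", "c", "d", "e"], num_columns=2, column_width=5)
print(repr(table))
```

The returned string begins with a newline, and items wider than `column_width` are not truncated.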
def hardware_movement(func):
    """
    Decorator to run a function only when hardware testing is enabled.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    @run_on_microscope_machine
    def wrapper_func():
        if not Constants.test_hardware_movement:
            pytest.skip("Run only when hardware testing is enabled")
        func()

    return wrapper_func
Decorator to run a function only when hardware testing is enabled.
Parameters
func : function The function to decorate.
Returns
function The decorated function.
def run_on_standalone_machine(func):
    """
    Decorator to run a function only on a standalone machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().offline_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Offline License Machine Only.")
        func()

    return wrapper_func
Decorator to run a function only on a standalone machine.
Parameters
func : function The function to decorate.
Returns
function The decorated function.
def run_on_microscope_machine(func):
    """
    Decorator to run a function only on a microscope machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().microscope_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Microscope Machine Only.")
        func()

    return wrapper_func
Decorator to run a function only on a microscope machine.
Parameters
func : function The function to decorate.
Returns
function The decorated function.
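The machine-gating decorators above share one pattern: compare the current hostname against an allow-list and skip the test otherwise. The sketch below generalizes that pattern under stated assumptions: `run_on_machines` is a hypothetical helper, `unittest.SkipTest` stands in for `pytest.skip` so the example needs only the standard library, and `functools.wraps` and argument forwarding are additions that the source wrappers do not have.

```python
import functools
import platform
import unittest


def run_on_machines(allowed_hosts):
    """Build a decorator that skips unless the hostname is in allowed_hosts."""
    allowed = {host.lower() for host in allowed_hosts}

    def decorator(func):
        @functools.wraps(func)  # keep the test's name and docstring
        def wrapper(*args, **kwargs):
            current_machine = platform.uname().node.lower()
            if current_machine not in allowed:
                # Stand-in for pytest.skip(...) in the source.
                raise unittest.SkipTest("Run on listed machines only.")
            return func(*args, **kwargs)

        return wrapper

    return decorator


@run_on_machines([platform.uname().node])
def test_runs_here():
    return "ran"


print(test_runs_here())
```

Preserving the wrapped function's name with `functools.wraps` keeps test reports readable, since each wrapped test is otherwise named `wrapper_func`.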