pytribeam.utilities
Utilities Module
This module contains various utility functions and decorators for managing and controlling the microscope, handling YAML files, and performing other common tasks.
Functions
beam_type(beam) -> property Return the beam property object as ion and electron beams have the same internal hierarchy.
connect_microscope(microscope: tbt.Microscope, quiet_output: bool = True, connection_host: str = None, connection_port: int = None) -> bool Connect to the microscope with the option to suppress printout.
dict_to_yml(db: dict, file_path: Path) -> Path Convert a dictionary to a YAML file.
disconnect_microscope(microscope: tbt.Microscope, quiet_output: bool = True) -> bool Disconnect from the microscope with the option to suppress printout.
general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict Return the general section of an experiment settings dictionary (as loaded from a .yml file), looked up by the format's general_section_key.
step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType Determine the step type for a specific step settings dictionary.
in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.
gen_dict_extract(key, var) Extract values from a nested dictionary by key.
nested_dictionary_location(d: dict, key: str, value: Any) -> List[str] Find the nested location of a key-value pair in a dictionary.
nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str] Find a key-value pair in a nested dictionary.
_flatten(dictionary: dict) -> dict Flatten a dictionary using pandas.
none_value_dictionary(dictionary: dict) -> bool Check if all values in a dictionary are None.
nostdout() Context manager that temporarily replaces sys.stdout with a dummy file to suppress printed output.
step_count(exp_settings: dict, yml_format: tbt.YMLFormatVersion) -> int Determine the maximum step number from a settings dictionary.
step_settings(exp_settings: dict, step_number_key: str, step_number_val: int, yml_format: tbt.YMLFormatVersion) -> Tuple[str, dict] Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.
valid_microscope_connection(host: str, port: str) -> bool Determine if a microscope connection can be made.
enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool Determine whether to enable external device control.
valid_enum_entry(obj: Any, check_type: Enum) -> bool Determine if an object is a member of an Enum class.
yml_format(version: float) -> tbt.YMLFormatVersion Return the YML file format for a given version.
yml_to_dict(*, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]) -> Dict Convert a YAML file to a dictionary.
yml_version(file: Path, key_name="config_file_version") -> float Return the version of a YAML file if the proper key exists.
yes_no(question) -> bool Prompt the user with a yes/no question, repeating until the answer is "y" or "n"; return True for "y".
remove_directory(directory: Path) Recursively remove a directory.
split_list(data: List, chunk_size: int) -> List Split a list into consecutive chunks of chunk_size items; the last chunk may be shorter.
tabular_list(data: List, num_columns: int = Constants.default_column_count, column_width: int = Constants.default_column_width) -> str Format a list into a tabular string.
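To illustrate how split_list and tabular_list from the list above fit together, here is a self-contained sketch that mirrors their logic. The defaults of 3 columns and 10 characters are placeholders chosen for this example; the real defaults come from Constants.default_column_count and Constants.default_column_width, whose values are not shown on this page.

```python
from typing import List

# Placeholder defaults for illustration only; pytribeam reads these from Constants.
DEFAULT_COLUMN_COUNT = 3
DEFAULT_COLUMN_WIDTH = 10


def split_list(data: List, chunk_size: int) -> List:
    """Split data into consecutive chunks; the last chunk may be shorter."""
    return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]


def tabular_list(
    data: List,
    num_columns: int = DEFAULT_COLUMN_COUNT,
    column_width: int = DEFAULT_COLUMN_WIDTH,
) -> str:
    """Center each item in a fixed-width column, one text row per chunk."""
    result = ""
    for row in split_list(data, chunk_size=num_columns):
        result += "\n"  # every row, including the first, starts on a new line
        for item in row:
            result += f"{item:^{column_width}}"
    return result


chunks = split_list(["a", "b", "c", "d", "e"], chunk_size=2)
table = tabular_list(["a", "b", "c", "d"], num_columns=2, column_width=5)
print(chunks)
print(table)
```

Note that the returned string begins with a newline, so the table prints cleanly when appended to a preceding message.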
Decorators
hardware_movement(func) Decorator to run a test only on a microscope machine and only when hardware testing is enabled (Constants.test_hardware_movement); otherwise the test is skipped.
run_on_standalone_machine(func) Decorator to run a function only on a standalone machine.
run_on_microscope_machine(func) Decorator to run a function only on a microscope machine.
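The machine-gating decorators above compare the current host name against a configured list and skip the test when it is absent. The following is a minimal sketch of that pattern, not the library's implementation: it raises unittest.SkipTest instead of calling pytest.skip so that it runs with the standard library alone, it adds functools.wraps and argument forwarding, and both the decorator factory name and the host name are hypothetical.

```python
import functools
import platform
import unittest

# Hypothetical host name; pytribeam keeps its real lists in Constants.
MICROSCOPE_MACHINES = ["hypothetical-scope-pc"]


def run_on_listed_machine(machines):
    """Build a decorator that skips the wrapped test on unlisted hosts."""
    allowed = [name.lower() for name in machines]

    def decorator(func):
        @functools.wraps(func)  # keep the test's name and docstring
        def wrapper(*args, **kwargs):
            current = platform.uname().node.lower()
            if current not in allowed:
                # Stand-in for pytest.skip(...) used by the module itself.
                raise unittest.SkipTest("Run on listed machines only.")
            return func(*args, **kwargs)

        return wrapper

    return decorator


@run_on_listed_machine(MICROSCOPE_MACHINES)
def test_stage_move():
    return "moved"


# Listing the current host lets the wrapped function run normally.
always_runs = run_on_listed_machine([platform.uname().node])(lambda: "ran")
print(always_runs())
```

Calling test_stage_move() on any host other than the hypothetical one raises SkipTest, which test runners report as a skipped test rather than a failure.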
#!/usr/bin/python3
"""
Utilities Module
================

This module contains various utility functions and decorators for managing and controlling the microscope, handling YAML files, and performing other common tasks.

Functions
---------
beam_type(beam) -> property
    Return the beam property object as ion and electron beams have the same internal hierarchy.

connect_microscope(microscope: tbt.Microscope, quiet_output: bool = True, connection_host: str = None, connection_port: int = None) -> bool
    Connect to the microscope with the option to suppress printout.

dict_to_yml(db: dict, file_path: Path) -> Path
    Convert a dictionary to a YAML file.

disconnect_microscope(microscope: tbt.Microscope, quiet_output: bool = True) -> bool
    Disconnect from the microscope with the option to suppress printout.

general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict
    Grab general experiment settings from a .yml file and return them as a dictionary.

step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType
    Determine the step type for a specific step settings dictionary.

in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool
    Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.

gen_dict_extract(key, var)
    Extract values from a nested dictionary by key.

nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]
    Find the nested location of a key-value pair in a dictionary.

nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str]
    Find a key-value pair in a nested dictionary.

_flatten(dictionary: dict) -> dict
    Flatten a dictionary using pandas.

none_value_dictionary(dictionary: dict) -> bool
    Check if all values in a dictionary are None.

nostdout()
    Create a dummy file to suppress output.

step_count(exp_settings: dict, yml_format: tbt.YMLFormatVersion) -> int
    Determine the maximum step number from a settings dictionary.

step_settings(exp_settings: dict, step_number_key: str, step_number_val: int, yml_format: tbt.YMLFormatVersion) -> Tuple[str, dict]
    Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.

valid_microscope_connection(host: str, port: str) -> bool
    Determine if a microscope connection can be made.

enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool
    Determine whether to enable external device control.

valid_enum_entry(obj: Any, check_type: Enum) -> bool
    Determine if an object is a member of an Enum class.

yml_format(version: float) -> tbt.YMLFormatVersion
    Return the YML file format for a given version.

yml_to_dict(*, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]) -> Dict
    Convert a YAML file to a dictionary.

yml_version(file: Path, key_name="config_file_version") -> float
    Return the version of a YAML file if the proper key exists.

yes_no(question) -> bool
    Simple Yes/No function.

remove_directory(directory: Path)
    Recursively remove a directory.

split_list(data: List, chunk_size: int) -> List
    Split a list into equal-sized chunks.

tabular_list(data: List, num_columns: int = Constants.default_column_count, column_width: int = Constants.default_column_width) -> str
    Format a list into a tabular string.

Decorators
----------
hardware_movement(func)
    Decorator to run a function only when hardware testing is enabled.

run_on_standalone_machine(func)
    Decorator to run a function only on a standalone machine.

run_on_microscope_machine(func)
    Decorator to run a function only on a microscope machine.
"""

# Default python modules
import os
from pathlib import Path
import time
import warnings
import math
from typing import Dict, NamedTuple, Tuple, Any, List
from enum import Enum, IntEnum
import platform
import pytest
from functools import singledispatch
import shutil

# # Autoscript modules
import yaml
import contextlib
import sys
from pandas import json_normalize

# # # 3rd party module
# from schema import Schema, And, Use, Optional, SchemaError

# # Local scripts
import pytribeam.types as tbt

# import pytribeam.constants as cs
from pytribeam.constants import Constants


@singledispatch
def beam_type(beam) -> property:
    """
    Return the beam property object as ion and electron beams have the same internal hierarchy.

    Parameters
    ----------
    beam : Any
        The beam object.

    Returns
    -------
    property
        The beam property object.

    Raises
    ------
    NotImplementedError
        If the beam type is not implemented.
    """
    _ = beam  # no operation
    raise NotImplementedError()


@beam_type.register
def _(beam: tbt.ElectronBeam, microscope: tbt.Microscope) -> property:
    """
    Return the electron beam property object.

    Parameters
    ----------
    beam : tbt.ElectronBeam
        The electron beam object.
    microscope : tbt.Microscope
        The microscope object.

    Returns
    -------
    property
        The electron beam property object.
    """
    return microscope.beams.electron_beam


@beam_type.register
def _(beam: tbt.IonBeam, microscope: tbt.Microscope) -> property:
    """
    Return the ion beam property object.

    Parameters
    ----------
    beam : tbt.IonBeam
        The ion beam object.
    microscope : tbt.Microscope
        The microscope object.

    Returns
    -------
    property
        The ion beam property object.
    """
    return microscope.beams.ion_beam


def connect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
    connection_host: str = None,
    connection_port: int = None,
):
    """
    Connect to the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to connect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).
    connection_host : str, optional
        The connection host (default is None).
    connection_port : int, optional
        The connection port (default is None).

    Returns
    -------
    bool
        True if the connection is successful.

    Raises
    ------
    ConnectionError
        If the connection fails.
    """

    # TODO clean up inner function
    def connect(
        microscope: tbt.Microscope,
        connection_host: str = None,
        connection_port: int = None,
    ) -> bool:
        if connection_port is not None:
            microscope.connect(connection_host, connection_port)
        elif connection_host is not None:
            microscope.connect(connection_host)
        else:
            microscope.connect()

    if quiet_output:
        with nostdout():
            connect(
                microscope=microscope,
                connection_host=connection_host,
                connection_port=connection_port,
            )
    else:
        connect(
            microscope=microscope,
            connection_host=connection_host,
            connection_port=connection_port,
        )

    if microscope.server_host is not None:
        return True
    else:
        raise ConnectionError(
            f"Connection failed with connection_host of '{connection_host}' and connection_port of '{connection_port}' microscope not connected."
        )


def dict_to_yml(db: dict, file_path: Path) -> Path:
    """
    Convert a dictionary to a YAML file.

    Parameters
    ----------
    db : dict
        The dictionary to convert.
    file_path : Path
        The path to save the YAML file.

    Returns
    -------
    Path
        The path to the saved YAML file.
    """
    with open(file_path, "w", encoding="utf-8") as out_file:
        yaml.dump(
            db,
            out_file,
            default_flow_style=False,
            sort_keys=False,
        )

    return file_path


def disconnect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
):
    """
    Disconnect from the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to disconnect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).

    Returns
    -------
    bool
        True if the disconnection is successful.

    Raises
    ------
    ConnectionError
        If the disconnection fails.
    """
    if quiet_output:
        with nostdout():
            microscope.disconnect()
    else:
        microscope.disconnect()

    if microscope.server_host is None:
        return True
    else:
        raise ConnectionError("Disconnection failed, microscope still connected")


def general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict:
    """
    Grab general experiment settings from a .yml file and return them as a dictionary.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    dict
        The general experiment settings as a dictionary.
    """
    general_key = yml_format.general_section_key
    return exp_settings[general_key]


def step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType:
    """
    Determine the step type for a specific step settings dictionary.

    Parameters
    ----------
    settings : dict
        The step settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    tbt.StepType
        The step type.
    """
    step_type = tbt.StepType(
        settings[yml_format.step_general_key][yml_format.step_type_key]
    )

    return step_type


def in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool:
    """
    Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.

    Parameters
    ----------
    val : float
        The input value to be compared against min and max.
    limit : tbt.Limit
        The bounds of the interval.
    type : tbt.IntervalType
        The type of interval.

    Returns
    -------
    bool
        True if within the interval, False otherwise.
    """
    if type == tbt.IntervalType.OPEN:
        return (val > limit.min) and (val < limit.max)
    if type == tbt.IntervalType.CLOSED:
        return (val >= limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.LEFT_OPEN:
        return (val > limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.RIGHT_OPEN:
        return (val >= limit.min) and (val < limit.max)


def gen_dict_extract(key, var):
    """
    Extract values from a nested dictionary by key.

    Parameters
    ----------
    key : str
        The key to search for.
    var : dict
        The nested dictionary to search.

    Yields
    ------
    Any
        The values associated with the specified key.
    """
    if hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                for result in gen_dict_extract(key, v):
                    yield result
            elif isinstance(v, list):
                for d in v:
                    for result in gen_dict_extract(key, d):
                        yield result


def nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]:
    """
    Find the nested location of a key-value pair in a dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair.

    Raises
    ------
    KeyError
        If the key-value pair is not found in the dictionary.
    """
    nesting = nested_find_key_value_pair(d=d, key=key, value=value)
    if nesting is None:
        raise KeyError(
            f'Key : value pair of "{key} : {value}" not found in the provided dictionary.'
        )
    return nesting


def nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str]:
    """
    Find a key-value pair in a nested dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair.
    """
    for k, v in d.items():
        if k == key:
            if v == value:
                return [k]
        if isinstance(v, dict):
            p = nested_find_key_value_pair(v, key, value)
            if p:
                return [k] + p


def _flatten(dictionary: dict) -> dict:
    """
    Flatten a dictionary using pandas.

    This function flattens a nested dictionary using pandas, which can be slow on large dictionaries.
    From https://stackoverflow.com/questions/6027558/flatten-nested-dictionaries-compressing-keys

    Parameters
    ----------
    dictionary : dict
        The dictionary to flatten.

    Returns
    -------
    dict
        The flattened dictionary.
    """
    data_frame = json_normalize(dictionary, sep="_")
    db_flat = data_frame.to_dict(orient="records")[0]
    return db_flat


def none_value_dictionary(dictionary: dict) -> bool:
    """
    Check if all values in a dictionary are None.

    This function returns True if all values in the dictionary are None, and False otherwise.

    Parameters
    ----------
    dictionary : dict
        The dictionary to check.

    Returns
    -------
    bool
        True if all values in the dictionary are None, False otherwise.
    """
    # flatten the dictionary first
    db_flat = _flatten(dictionary)
    return all([v is None for v in db_flat.values()])


@contextlib.contextmanager
def nostdout():
    """
    Create a dummy file to suppress output.

    This function creates a dummy file to suppress output.

    Yields
    ------
    None
    """
    save_stdout = sys.stdout
    sys.stdout = tbt.DummyFile()
    yield
    sys.stdout = save_stdout


def step_count(
    exp_settings: dict,
    yml_format: tbt.YMLFormatVersion,
):
    """
    Determine the maximum step number from a settings dictionary.

    This function determines the maximum step number from a settings dictionary, as specified by the step_number_key.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    int
        The maximum step number.

    Raises
    ------
    ValueError
        If the number of steps found does not match the expected step count.
    """

    step_number_key = yml_format.step_number_key
    non_step_sections = yml_format.non_step_section_count

    # make sure dict from yml has correct section count
    # (steps should all be in one section)
    total_sections = len(exp_settings)
    if total_sections != non_step_sections + 1:
        raise ValueError(
            f"Invalid .yml file, {total_sections} sections were found but the input .yml should have {non_step_sections + 1} sections. Please verify that all top-level keys in the .yml have unique strings and that all steps are contained in a single top-level section."
        )

    expected_step_count = exp_settings[yml_format.general_section_key][
        yml_format.step_count_key
    ]

    found_step_count = 0
    while True:
        try:
            nested_dictionary_location(
                d=exp_settings,
                key=step_number_key,
                value=found_step_count + 1,
            )
        except KeyError:
            break
        found_step_count += 1

    # validate number of steps found with steps read by YAML loader
    # TODO YAML safeloader will ignore duplicate top level keys, so this check relies on unique step numbers in ascending order (no gaps) to be found.

    if expected_step_count != found_step_count:
        raise ValueError(
            f"Invalid .yml file, {found_step_count} steps were found but the input .yml should have {expected_step_count} steps from the general setting key '{yml_format.step_count_key}' within the '{yml_format.general_section_key}' section. Please verify that all step_name keys in the .yml have unique strings and that step numbers are continuously-increasing positive integers starting at 1."
        )

    return found_step_count


def step_settings(
    exp_settings: dict,
    step_number_key: str,
    step_number_val: int,
    yml_format: tbt.YMLFormatVersion,
) -> Tuple[str, dict]:
    """
    Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    step_number_key : str
        The key for the step number.
    step_number_val : int
        The value for the step number.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    Tuple[str, dict]
        The step name and the step settings dictionary.
    """

    nested_locations = nested_dictionary_location(
        d=exp_settings,
        key=step_number_key,
        value=step_number_val,
    )
    ### top level dictionary key name is first index, need key name nested within it (second level, index = 1)
    step_name = nested_locations[1]
    step_section_key = yml_format.step_section_key
    return step_name, exp_settings[step_section_key][step_name]


def valid_microscope_connection(host: str, port: str) -> bool:
    """
    Determine if a microscope connection can be made.

    This function checks if a microscope connection can be made and disconnects if a connection can be made.

    Parameters
    ----------
    host : str
        The connection host.
    port : str
        The connection port.

    Returns
    -------
    bool
        True if the connection can be made, False otherwise.
    """
    microscope = tbt.Microscope()
    if connect_microscope(
        microscope=microscope,
        quiet_output=True,
        connection_host=host,
        connection_port=port,
    ):
        if disconnect_microscope(
            microscope=microscope,
            quiet_output=True,
        ):
            return True
    return False


def enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool:
    """
    Determine whether to enable external device control.

    This function checks if the external device control should be enabled based on the OEM.

    Parameters
    ----------
    oem : tbt.ExternalDeviceOEM
        The OEM of the external device.

    Returns
    -------
    bool
        True if the external device control should be enabled, False otherwise.

    Raises
    ------
    NotImplementedError
        If the OEM type is unsupported.
    """
    if not isinstance(oem, tbt.ExternalDeviceOEM):
        raise NotImplementedError(
            f"Unsupported type of {type(oem)}, only 'ExternalDeviceOEM' types are supported."
        )
    if oem != tbt.ExternalDeviceOEM.NONE:
        return True
    return False


def valid_enum_entry(obj: Any, check_type: Enum) -> bool:
    """
    Determine if an object is a member of an Enum class.

    This function checks if an object is a member of an Enum class.

    Parameters
    ----------
    obj : Any
        The object to check.
    check_type : Enum
        The Enum class to check against.

    Returns
    -------
    bool
        True if the object is a member of the Enum class, False otherwise.
    """
    return obj in check_type._value2member_map_


def yml_format(version: float) -> tbt.YMLFormatVersion:
    """
    Return the YML file format for a given version.

    This function returns the YML file format for a given version.

    Parameters
    ----------
    version : float
        The version of the YML file.

    Returns
    -------
    tbt.YMLFormatVersion
        The YML file format for the given version.

    Raises
    ------
    NotImplementedError
        If the YML file version is unsupported.
    """
    supported_versions = [file.version for file in tbt.YMLFormatVersion]
    if not version in supported_versions:
        raise NotImplementedError(
            f'Unsupported YML file version for version "{version}". Valid formats include: {[i.value for i in tbt.YMLFormatVersion]}'
        )
    yml_file_idx = supported_versions.index(version)
    yml_format = list(tbt.YMLFormatVersion)[yml_file_idx]
    return yml_format


def yml_to_dict(
    *, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]
) -> Dict:
    """
    Convert a YAML file to a dictionary.

    This function reads a YAML file and returns the result as a dictionary.

    Parameters
    ----------
    yml_path_file : Path
        The fully pathed location to the input file.
    version : float
        The version of the YAML file in x.y format.
    required_keys : Tuple[str, ...]
        The key(s) that must be in the YAML file for conversion to a dictionary to occur.

    Returns
    -------
    dict
        The YAML file represented as a dictionary.

    Raises
    ------
    TypeError
        If the file type is unsupported.
    OSError
        If the YAML file cannot be opened or decoded.
    KeyError
        If the required keys are not found in the YAML file.
    ValueError
        If the version specified in the file does not match the requested version.
    """

    # Compared to the lower() method, the casefold() method is stronger.
    # It will convert more characters into lower case, and will find more
    # matches on comparison of two strings that are both are converted
    # using the casefold() method.
    file_type = yml_path_file.suffix.casefold()

    supported_types = (".yaml", ".yml")

    if file_type not in supported_types:
        raise TypeError("Only file types .yaml, and .yml are supported.")

    try:
        with open(file=yml_path_file, mode="r", encoding="utf-8") as stream:
            # See deprecation warning for plain yaml.load(input) at
            # https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
            db = yaml.load(stream, Loader=yaml.SafeLoader)
    except yaml.YAMLError as error:
        print(f"Error with YAML file: {error}")
        # print(f"Could not open: {self.self.path_file_in}")
        print(f"Could not open or decode: {yml_path_file}")
        # raise yaml.YAMLError
        raise OSError from error

    # check keys found in input file against required keys
    found_keys = tuple(db.keys())
    keys_exist = tuple(map(lambda x: x in found_keys, required_keys))
    has_required_keys = all(keys_exist)
    if not has_required_keys:
        raise KeyError(f"Input files must have these keys defined: {required_keys}")

    version_specified = db["config_file_version"]
    version_requested = version

    if version_specified != version_requested:
        ee = f"Version mismatch: specified in file was {version_specified},"
        ee += f"requested is {version_requested}"
        raise ValueError(ee)

    return db


def yml_version(
    file: Path,
    key_name="config_file_version",
) -> float:
    """
    Return the version of a YAML file if the proper key exists.

    Parameters
    ----------
    file : Path
        The path to the YAML file.
    key_name : str, optional
        The key name for the version in the YAML file (default is "config_file_version").

    Returns
    -------
    float
        The version of the YAML file.

    Raises
    ------
    KeyError
        If the version key is not found in the YAML file.
    ValueError
        If the version value is not a valid float.
    """
    with open(file, "r") as stream:
        data = yaml.load(stream, Loader=yaml.SafeLoader)

    try:
        version = data[key_name]
    except KeyError:
        # print(f"Error with version key: {error}")
        raise KeyError(f"Error with version key, '{key_name}' key not found in {file}.")
    try:
        version = float(version)
    except ValueError:
        raise ValueError(
            f"Could not find valid version in {file} for key {key_name}, found '{version}' which is not a float."
        )
    return version


def yes_no(question):
    """
    Simple Yes/No function.

    Parameters
    ----------
    question : str
        The question to ask the user.

    Returns
    -------
    bool
        True if the user answers "yes", False otherwise.
    """
    prompt = f"{question} (y/n): "
    ans = input(prompt).strip().lower()
    if ans not in ["y", "n"]:
        print(f"{ans} is invalid, please try again...")
        return yes_no(question)
    if ans == "y":
        return True
    return False


def remove_directory(directory: Path):
    """
    Recursively remove a directory.

    Parameters
    ----------
    directory : Path
        The path to the directory to remove.
    """
    shutil.rmtree(directory)


def split_list(data: List, chunk_size: int) -> List:
    """
    Split a list into equal-sized chunks.

    Parameters
    ----------
    data : List
        The list to split.
    chunk_size : int
        The size of each chunk.

    Returns
    -------
    List
        A list of chunks.
    """
    result = []
    for i in range(0, len(data), chunk_size):
        result.append(data[i : i + chunk_size])
    return result


def tabular_list(
    data: List,
    num_columns: int = Constants.default_column_count,
    column_width: int = Constants.default_column_width,
) -> str:
    """
    Format a list into a tabular string.

    Parameters
    ----------
    data : List
        The list to format.
    num_columns : int, optional
        The number of columns in the table (default is Constants.default_column_count).
    column_width : int, optional
        The width of each column in the table (default is Constants.default_column_width).

    Returns
    -------
    str
        The formatted tabular string.
    """
    rows = split_list(data, chunk_size=num_columns)
    result = ""
    for sublist in rows:
        result += "\n"
        for item in sublist:
            result += f"{item:^{column_width}}"
    return result


### Custom Decorators ###


def hardware_movement(func):
    """
    Decorator to run a function only when hardware testing is enabled.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    @run_on_microscope_machine
    def wrapper_func():
        if not Constants.test_hardware_movement:
            pytest.skip("Run only when hardware testing is enabled")
        func()

    return wrapper_func


def run_on_standalone_machine(func):
    """
    Decorator to run a function only on a standalone machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().offline_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Offline License Machine Only.")
        func()

    return wrapper_func


def run_on_microscope_machine(func):
    """
    Decorator to run a function only on a microscope machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().microscope_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Microscope Machine Only.")
        func()

    return wrapper_func
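The step helpers in the listing above (step_count and step_settings) both rely on finding the path to a key-value pair inside nested dictionaries. The sketch below reproduces that search on a small settings dictionary. The section and key names ("general", "steps", "step_general", "step_number") and the step names are made up for illustration; the real names come from the tbt.YMLFormatVersion entry in use.

```python
from typing import Any, List, Optional


def nested_find_key_value_pair(d: dict, key: str, value: Any) -> Optional[List[str]]:
    """Return the chain of keys leading to key == value, or None if absent."""
    for k, v in d.items():
        if k == key and v == value:
            return [k]
        if isinstance(v, dict):
            path = nested_find_key_value_pair(v, key, value)
            if path:
                return [k] + path
    return None


# Hypothetical experiment settings, shaped like a loaded .yml file.
settings = {
    "general": {"step_count": 2},
    "steps": {
        "survey_image": {"step_general": {"step_number": 1}},
        "mill_slice": {"step_general": {"step_number": 2}},
    },
}

# Path runs from the top-level section down to the matching key.
path = nested_find_key_value_pair(settings, "step_number", 2)
step_name = path[1]  # index 0 is the section, index 1 the user-defined step name

# Counting steps: probe 1, 2, 3, ... until a step number is missing.
found = 0
while nested_find_key_value_pair(settings, "step_number", found + 1):
    found += 1

print(path, step_name, found)
```

Because the count stops at the first missing number, a gap in the step numbering (for example 1, 2, 4) would be reported as two steps, which is why step_count compares the result with the expected count from the general section.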
@singledispatch
def beam_type(beam) -> property:
    """
    Return the beam property object as ion and electron beams have the same internal hierarchy.

    Parameters
    ----------
    beam : Any
        The beam object.

    Returns
    -------
    property
        The beam property object.

    Raises
    ------
    NotImplementedError
        If the beam type is not implemented.
    """
    _ = beam  # no operation
    raise NotImplementedError()
Return the beam property object as ion and electron beams have the same internal hierarchy.
Parameters
beam : Any The beam object.
Returns
property The beam property object.
Raises
NotImplementedError If the beam type is not implemented.
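Because `beam_type` is declared with `functools.singledispatch`, the base implementation only runs for types that have no registered overload. The sketch below illustrates that dispatch pattern in isolation. The `ElectronBeam` and `IonBeam` classes, the `beam_label` function, and its return values are hypothetical stand-ins chosen for illustration, not pytribeam types or behavior.

```python
from functools import singledispatch


class ElectronBeam:
    """Hypothetical stand-in for an electron beam settings object."""


class IonBeam:
    """Hypothetical stand-in for an ion beam settings object."""


@singledispatch
def beam_label(beam) -> str:
    # Fallback: reached only when no overload is registered for type(beam).
    raise NotImplementedError(f"No handler for {type(beam).__name__}")


@beam_label.register
def _(beam: ElectronBeam) -> str:
    # Overload selected from the type annotation of the first argument.
    return "electron"


@beam_label.register
def _(beam: IonBeam) -> str:
    return "ion"


electron_label = beam_label(ElectronBeam())
ion_label = beam_label(IonBeam())
```

Calling `beam_label` with any other type, such as a float, falls through to the base function and raises `NotImplementedError`.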
def connect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
    connection_host: str = None,
    connection_port: int = None,
) -> bool:
    """
    Connect to the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to connect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).
    connection_host : str, optional
        The connection host (default is None).
    connection_port : int, optional
        The connection port (default is None).

    Returns
    -------
    bool
        True if the connection is successful.

    Raises
    ------
    ConnectionError
        If the connection fails.
    """

    # TODO clean up inner function
    def connect(
        microscope: tbt.Microscope,
        connection_host: str = None,
        connection_port: int = None,
    ) -> None:
        # Pass only the arguments that were provided so that the
        # microscope's own defaults apply to the rest.
        if connection_port is not None:
            microscope.connect(connection_host, connection_port)
        elif connection_host is not None:
            microscope.connect(connection_host)
        else:
            microscope.connect()

    if quiet_output:
        with nostdout():
            connect(
                microscope=microscope,
                connection_host=connection_host,
                connection_port=connection_port,
            )
    else:
        connect(
            microscope=microscope,
            connection_host=connection_host,
            connection_port=connection_port,
        )

    if microscope.server_host is not None:
        return True
    else:
        raise ConnectionError(
            f"Connection failed with connection_host of '{connection_host}' and connection_port of '{connection_port}'; microscope not connected."
        )
Connect to the microscope with the option to suppress printout.
Parameters
microscope : tbt.Microscope The microscope object to connect.
quiet_output : bool, optional Whether to suppress printout (default is True).
connection_host : str, optional The connection host (default is None).
connection_port : int, optional The connection port (default is None).
Returns
bool True if the connection is successful.
Raises
ConnectionError If the connection fails.
def dict_to_yml(db: dict, file_path: Path) -> Path:
    """
    Convert a dictionary to a YAML file.

    Parameters
    ----------
    db : dict
        The dictionary to convert.
    file_path : Path
        The path to save the YAML file.

    Returns
    -------
    Path
        The path to the saved YAML file.
    """
    with open(file_path, "w", encoding="utf-8") as out_file:
        yaml.dump(
            db,
            out_file,
            default_flow_style=False,
            sort_keys=False,
        )

    return file_path
Convert a dictionary to a YAML file.
Parameters
db : dict The dictionary to convert.
file_path : Path The path to save the YAML file.
Returns
Path The path to the saved YAML file.
def disconnect_microscope(
    microscope: tbt.Microscope,
    quiet_output: bool = True,
) -> bool:
    """
    Disconnect from the microscope with the option to suppress printout.

    Parameters
    ----------
    microscope : tbt.Microscope
        The microscope object to disconnect.
    quiet_output : bool, optional
        Whether to suppress printout (default is True).

    Returns
    -------
    bool
        True if the disconnection is successful.

    Raises
    ------
    ConnectionError
        If the disconnection fails.
    """
    if quiet_output:
        with nostdout():
            microscope.disconnect()
    else:
        microscope.disconnect()

    # A cleared server_host indicates that the session was closed.
    if microscope.server_host is None:
        return True
    else:
        raise ConnectionError("Disconnection failed, microscope still connected")
Disconnect from the microscope with the option to suppress printout.
Parameters
microscope : tbt.Microscope The microscope object to disconnect.
quiet_output : bool, optional Whether to suppress printout (default is True).
Returns
bool True if the disconnection is successful.
Raises
ConnectionError If the disconnection fails.
def general_settings(exp_settings: dict, yml_format: tbt.YMLFormat) -> dict:
    """
    Grab general experiment settings from a .yml file and return them as a dictionary.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    dict
        The general experiment settings as a dictionary.
    """
    general_key = yml_format.general_section_key
    return exp_settings[general_key]
Grab general experiment settings from a .yml file and return them as a dictionary.
Parameters
exp_settings : dict The experiment settings dictionary.
yml_format : tbt.YMLFormat The YAML format version.
Returns
dict The general experiment settings as a dictionary.
def step_type(settings: dict, yml_format: tbt.YMLFormat) -> tbt.StepType:
    """
    Determine the step type for a specific step settings dictionary.

    Parameters
    ----------
    settings : dict
        The step settings dictionary.
    yml_format : tbt.YMLFormat
        The YAML format version.

    Returns
    -------
    tbt.StepType
        The step type.
    """
    step_type = tbt.StepType(
        settings[yml_format.step_general_key][yml_format.step_type_key]
    )

    return step_type
Determine the step type for a specific step settings dictionary.
Parameters
settings : dict The step settings dictionary.
yml_format : tbt.YMLFormat The YAML format version.
Returns
tbt.StepType The step type.
def in_interval(val: float, limit: tbt.Limit, type: tbt.IntervalType) -> bool:
    """
    Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.

    Parameters
    ----------
    val : float
        The input value to be compared against min and max.
    limit : tbt.Limit
        The bounds of the interval.
    type : tbt.IntervalType
        The type of interval.

    Returns
    -------
    bool
        True if within the interval, False otherwise.
    """
    if type == tbt.IntervalType.OPEN:
        return (val > limit.min) and (val < limit.max)
    if type == tbt.IntervalType.CLOSED:
        return (val >= limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.LEFT_OPEN:
        return (val > limit.min) and (val <= limit.max)
    if type == tbt.IntervalType.RIGHT_OPEN:
        return (val >= limit.min) and (val < limit.max)
Test whether a value is within an interval, with the interval type defined by an enumerated IntervalType.
Parameters
val : float The input value to be compared against min and max.
limit : tbt.Limit The bounds of the interval.
type : tbt.IntervalType The type of interval.
Returns
bool True if within the interval, False otherwise.
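The four interval types differ only in whether each endpoint is included. The sketch below reproduces the comparison logic with local stand-ins so it can run without pytribeam. The `IntervalType` enum values and the `Limit` named tuple here are assumptions made for illustration; only the member names and the `min`/`max` attributes are taken from the source above.

```python
from enum import Enum
from typing import NamedTuple


class IntervalType(Enum):
    # Hypothetical stand-in for tbt.IntervalType (values are assumed).
    OPEN = "open"
    CLOSED = "closed"
    LEFT_OPEN = "left_open"
    RIGHT_OPEN = "right_open"


class Limit(NamedTuple):
    # Hypothetical stand-in for tbt.Limit.
    min: float
    max: float


def in_interval(val: float, limit: Limit, type: IntervalType) -> bool:
    if type == IntervalType.OPEN:
        return limit.min < val < limit.max
    if type == IntervalType.CLOSED:
        return limit.min <= val <= limit.max
    if type == IntervalType.LEFT_OPEN:
        return limit.min < val <= limit.max
    if type == IntervalType.RIGHT_OPEN:
        return limit.min <= val < limit.max


unit = Limit(min=0.0, max=1.0)
# Evaluate the lower endpoint under each interval type.
at_lower_bound = {t.name: in_interval(0.0, unit, t) for t in IntervalType}
# at_lower_bound == {"OPEN": False, "CLOSED": True,
#                    "LEFT_OPEN": False, "RIGHT_OPEN": True}
```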
def gen_dict_extract(key, var):
    """
    Extract values from a nested dictionary by key.

    Parameters
    ----------
    key : str
        The key to search for.
    var : dict
        The nested dictionary to search.

    Yields
    ------
    Any
        The values associated with the specified key.
    """
    if hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            # Recurse into nested dictionaries and into lists of dictionaries.
            if isinstance(v, dict):
                yield from gen_dict_extract(key, v)
            elif isinstance(v, list):
                for d in v:
                    yield from gen_dict_extract(key, d)
Extract values from a nested dictionary by key.
Parameters
key : str The key to search for.
var : dict The nested dictionary to search.
Yields
Any The values associated with the specified key.
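As a usage illustration, the generator can be run against a small nested dictionary. The function body below follows the source above; the `settings` dictionary and its key names are made up for this example.

```python
def gen_dict_extract(key, var):
    # Yield every value stored under `key`, at any nesting depth.
    if hasattr(var, "items"):
        for k, v in var.items():
            if k == key:
                yield v
            if isinstance(v, dict):
                yield from gen_dict_extract(key, v)
            elif isinstance(v, list):
                for d in v:
                    yield from gen_dict_extract(key, d)


# Hypothetical settings; the key names are illustrative only.
settings = {
    "general": {"step_count": 2},
    "steps": {
        "image": {"step_number": 1, "detectors": [{"step_number": 99}]},
        "mill": {"step_number": 2},
    },
}

found = list(gen_dict_extract("step_number", settings))
# found == [1, 99, 2]
```

Matches are yielded in traversal order, and dictionaries held inside lists are searched as well, which is why the value 99 appears in the result.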
def nested_dictionary_location(d: dict, key: str, value: Any) -> List[str]:
    """
    Find the nested location of a key-value pair in a dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair.

    Raises
    ------
    KeyError
        If the key-value pair is not found in the dictionary.
    """
    nesting = nested_find_key_value_pair(d=d, key=key, value=value)
    if nesting is None:
        raise KeyError(
            f'Key : value pair of "{key} : {value}" not found in the provided dictionary.'
        )
    return nesting
Find the nested location of a key-value pair in a dictionary.
This function returns a list of key values from the highest to the lowest level of nested dictionaries.
Parameters
d : dict The dictionary to search.
key : str The key to search for.
value : Any The value to search for.
Returns
List[str] The nested location of the key-value pair.
Raises
KeyError If the key-value pair is not found in the dictionary.
def nested_find_key_value_pair(d: dict, key: str, value: Any) -> List[str]:
    """
    Find a key-value pair in a nested dictionary.

    This function returns a list of key values from the highest to the lowest level of nested dictionaries.

    Parameters
    ----------
    d : dict
        The dictionary to search.
    key : str
        The key to search for.
    value : Any
        The value to search for.

    Returns
    -------
    List[str]
        The nested location of the key-value pair, or None if the pair is not found.
    """
    for k, v in d.items():
        if k == key:
            if v == value:
                return [k]
        if isinstance(v, dict):
            p = nested_find_key_value_pair(v, key, value)
            if p:
                # Prepend the current key to the path found deeper down.
                return [k] + p
    # No match at this level or below.
    return None
Find a key-value pair in a nested dictionary.
This function returns a list of key values from the highest to the lowest level of nested dictionaries.
Parameters
d : dict The dictionary to search.
key : str The key to search for.
value : Any The value to search for.
Returns
List[str] The nested location of the key-value pair, or None if the pair is not found.
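To make the shape of the returned path concrete, the sketch below runs the same search on a small dictionary. The search logic follows the source above; the `settings` dictionary and its key names are invented for the example, and the `Optional` annotation is added here only to make the not-found case explicit.

```python
from typing import Any, List, Optional


def nested_find_key_value_pair(d: dict, key: str, value: Any) -> Optional[List[str]]:
    for k, v in d.items():
        if k == key and v == value:
            return [k]
        if isinstance(v, dict):
            p = nested_find_key_value_pair(v, key, value)
            if p:
                # Build the path from the outermost key inward.
                return [k] + p
    return None


# Hypothetical settings; the key names are illustrative only.
settings = {
    "general": {"step_count": 2},
    "steps": {
        "image": {"step_number": 1},
        "mill": {"step_number": 2},
    },
}

path = nested_find_key_value_pair(settings, "step_number", 2)
# path == ["steps", "mill", "step_number"]
missing = nested_find_key_value_pair(settings, "step_number", 3)
# missing is None
```

The first element of the path is the top-level section and the second is the key nested directly inside it, which is the element `step_settings` reads as the step name.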
def none_value_dictionary(dictionary: dict) -> bool:
    """
    Check if all values in a dictionary are None.

    This function returns True if all values in the dictionary are None, and False otherwise.

    Parameters
    ----------
    dictionary : dict
        The dictionary to check.

    Returns
    -------
    bool
        True if all values in the dictionary are None, False otherwise.
    """
    # flatten the dictionary first
    db_flat = _flatten(dictionary)
    return all(v is None for v in db_flat.values())
Check if all values in a dictionary are None.
This function returns True if all values in the dictionary are None, and False otherwise.
Parameters
dictionary : dict The dictionary to check.
Returns
bool True if all values in the dictionary are None, False otherwise.
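The check depends on flattening nested dictionaries first, so that a `None` test on each leaf is enough. The sketch below shows the idea with a pure-Python `flatten` helper. That helper is a hypothetical substitute for the module's pandas-based `_flatten`, whose exact output (key naming, handling of empty dictionaries) is not shown in this section and may differ.

```python
def flatten(dictionary: dict, parent: str = "") -> dict:
    # Hypothetical stand-in for the module's pandas-based _flatten:
    # nested keys are joined with "." to form a single-level dictionary.
    flat = {}
    for key, value in dictionary.items():
        path = f"{parent}.{key}" if parent else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


def none_value_dictionary(dictionary: dict) -> bool:
    return all(v is None for v in flatten(dictionary).values())


all_empty = none_value_dictionary({"a": None, "b": {"c": None}})
# all_empty == True
has_value = none_value_dictionary({"a": None, "b": {"c": 0}})
# has_value == False (0 is falsy but is not None)
```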
@contextlib.contextmanager
def nostdout():
    """
    Create a dummy file to suppress output.

    While the context is active, sys.stdout is replaced with a tbt.DummyFile; the previous stream is restored when the block finishes.

    Yields
    ------
    None
    """
    save_stdout = sys.stdout
    sys.stdout = tbt.DummyFile()
    try:
        yield
    finally:
        # Restore stdout even if the body of the with-block raises.
        sys.stdout = save_stdout
Create a dummy file to suppress output.
While the context is active, sys.stdout is replaced with a tbt.DummyFile; the previous stream is restored when the block finishes.
Yields
None
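A minimal, self-contained sketch of this pattern follows, written with `try`/`finally` so that stdout is restored even if the body raises. The `DummyFile` class here is a hypothetical stand-in for `tbt.DummyFile`, assumed to accept writes and discard them.

```python
import contextlib
import sys


class DummyFile:
    # Hypothetical stand-in for tbt.DummyFile: swallows everything written.
    def write(self, text):
        pass

    def flush(self):
        pass


@contextlib.contextmanager
def nostdout():
    saved = sys.stdout
    sys.stdout = DummyFile()
    try:
        yield
    finally:
        # Runs on normal exit and when an exception propagates.
        sys.stdout = saved


original = sys.stdout
with nostdout():
    print("this line is discarded")
restored = sys.stdout is original
```

The standard library's `contextlib.redirect_stdout` provides comparable redirection to any file-like object if a hand-written context manager is not needed.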
def step_count(
    exp_settings: dict,
    yml_format: tbt.YMLFormatVersion,
) -> int:
    """
    Determine the maximum step number from a settings dictionary.

    Steps are counted by searching for consecutive step numbers, starting at 1, under the format's step_number_key. The count is then validated against the expected step count stored in the general section.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    int
        The maximum step number.

    Raises
    ------
    ValueError
        If the number of top-level sections is not the expected number, or if the number of steps found does not match the expected step count.
    """

    step_number_key = yml_format.step_number_key
    non_step_sections = yml_format.non_step_section_count

    # make sure dict from yml has correct section count
    # (steps should all be in one section)
    total_sections = len(exp_settings)
    if total_sections != non_step_sections + 1:
        raise ValueError(
            f"Invalid .yml file, {total_sections} sections were found but the input .yml should have {non_step_sections + 1} sections. Please verify that all top-level keys in the .yml have unique strings and that all steps are contained in a single top-level section."
        )

    expected_step_count = exp_settings[yml_format.general_section_key][
        yml_format.step_count_key
    ]

    # Count steps by looking up step numbers 1, 2, 3, ... until one is missing.
    found_step_count = 0
    while True:
        try:
            nested_dictionary_location(
                d=exp_settings,
                key=step_number_key,
                value=found_step_count + 1,
            )
        except KeyError:
            break
        found_step_count += 1

    # validate number of steps found with steps read by YAML loader
    # TODO YAML safeloader will ignore duplicate top level keys, so this check relies on unique step numbers in ascending order (no gaps) to be found.

    if expected_step_count != found_step_count:
        raise ValueError(
            f"Invalid .yml file, {found_step_count} steps were found but the input .yml should have {expected_step_count} steps from the general setting key '{yml_format.step_count_key}' within the '{yml_format.general_section_key}' section. Please verify that all step_name keys in the .yml have unique strings and that step numbers are continuously-increasing positive integers starting at 1."
        )

    return found_step_count
Determine the maximum step number from a settings dictionary.
Steps are counted by searching for consecutive step numbers, starting at 1, under the format's step_number_key. The count is then validated against the expected step count stored in the general section.
Parameters
exp_settings : dict The experiment settings dictionary.
yml_format : tbt.YMLFormatVersion The YAML format version.
Returns
int The maximum step number.
Raises
ValueError If the number of top-level sections is not the expected number, or if the number of steps found does not match the expected step count.
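The counting loop stops at the first missing step number, which is why a gap in the numbering leads to a mismatch with the expected count. The sketch below isolates that behavior. The `has_pair` helper is a simplified, hypothetical replacement for `nested_dictionary_location`, and the dictionary layout and key names are invented for the example.

```python
def has_pair(d: dict, key: str, value) -> bool:
    # Simplified stand-in for nested_dictionary_location: reports whether
    # the key/value pair occurs anywhere in the nested dictionary.
    for k, v in d.items():
        if k == key and v == value:
            return True
        if isinstance(v, dict) and has_pair(v, key, value):
            return True
    return False


def count_steps(exp_settings: dict, step_number_key: str) -> int:
    # Look for step numbers 1, 2, 3, ... and stop at the first one missing.
    found = 0
    while has_pair(exp_settings, step_number_key, found + 1):
        found += 1
    return found


# Hypothetical settings with a gap: step number 3 is missing.
settings = {
    "general": {"step_count": 3},
    "steps": {
        "image": {"step_number": 1},
        "mill": {"step_number": 2},
        "scan": {"step_number": 4},
    },
}

found_steps = count_steps(settings, "step_number")
# found_steps == 2, which differs from the expected count of 3
```

In `step_count`, a mismatch like this one between the found and expected counts raises `ValueError`.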
def step_settings(
    exp_settings: dict,
    step_number_key: str,
    step_number_val: int,
    yml_format: tbt.YMLFormatVersion,
) -> Tuple[str, dict]:
    """
    Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.

    Parameters
    ----------
    exp_settings : dict
        The experiment settings dictionary.
    step_number_key : str
        The key for the step number.
    step_number_val : int
        The value for the step number.
    yml_format : tbt.YMLFormatVersion
        The YAML format version.

    Returns
    -------
    Tuple[str, dict]
        The step name and the step settings dictionary.
    """

    nested_locations = nested_dictionary_location(
        d=exp_settings,
        key=step_number_key,
        value=step_number_val,
    )
    # top level dictionary key name is first index, need key name nested within it (second level, index = 1)
    step_name = nested_locations[1]
    step_section_key = yml_format.step_section_key
    return step_name, exp_settings[step_section_key][step_name]
Grab specific step settings from an experimental dictionary and return them as a dictionary along with the user-defined step name.
Parameters
exp_settings : dict The experiment settings dictionary.
step_number_key : str The key for the step number.
step_number_val : int The value for the step number.
yml_format : tbt.YMLFormatVersion The YAML format version.
Returns
Tuple[str, dict] The step name and the step settings dictionary.
def valid_microscope_connection(host: str, port: str) -> bool:
    """
    Determine if a microscope connection can be made.

    This function checks if a microscope connection can be made and disconnects if a connection can be made.

    Parameters
    ----------
    host : str
        The connection host.
    port : str
        The connection port.

    Returns
    -------
    bool
        True if the connection can be made, False otherwise.
    """
    microscope = tbt.Microscope()
    if connect_microscope(
        microscope=microscope,
        quiet_output=True,
        connection_host=host,
        connection_port=port,
    ):
        if disconnect_microscope(
            microscope=microscope,
            quiet_output=True,
        ):
            return True
    return False
Determine if a microscope connection can be made.
This function checks if a microscope connection can be made and disconnects if a connection can be made.
Parameters
host : str The connection host.
port : str The connection port.
Returns
bool True if the connection can be made, False otherwise.
def enable_external_device(oem: tbt.ExternalDeviceOEM) -> bool:
    """
    Determine whether to enable external device control.

    This function checks if the external device control should be enabled based on the OEM.

    Parameters
    ----------
    oem : tbt.ExternalDeviceOEM
        The OEM of the external device.

    Returns
    -------
    bool
        True if the external device control should be enabled, False otherwise.

    Raises
    ------
    NotImplementedError
        If the OEM type is unsupported.
    """
    if not isinstance(oem, tbt.ExternalDeviceOEM):
        raise NotImplementedError(
            f"Unsupported type of {type(oem)}, only 'ExternalDeviceOEM' types are supported."
        )
    if oem != tbt.ExternalDeviceOEM.NONE:
        return True
    return False
Determine whether to enable external device control.
This function checks if the external device control should be enabled based on the OEM.
Parameters
oem : tbt.ExternalDeviceOEM The OEM of the external device.
Returns
bool True if the external device control should be enabled, False otherwise.
Raises
NotImplementedError If the OEM type is unsupported.
def valid_enum_entry(obj: Any, check_type: Enum) -> bool:
    """
    Determine if an object is a member of an Enum class.

    The check is made against the values of the Enum members (via _value2member_map_), so obj should be a candidate value rather than a member instance.

    Parameters
    ----------
    obj : Any
        The object to check.
    check_type : Enum
        The Enum class to check against.

    Returns
    -------
    bool
        True if the object is a member of the Enum class, False otherwise.
    """
    return obj in check_type._value2member_map_
Determine if an object is a member of an Enum class.
The check is made against the values of the Enum members (via _value2member_map_), so obj should be a candidate value rather than a member instance.
Parameters
obj : Any The object to check.
check_type : Enum The Enum class to check against.
Returns
bool True if the object is a member of the Enum class, False otherwise.
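Since the lookup uses the Enum's value-to-member map, the function tests candidate values, not member names or member instances. The sketch below demonstrates this with a hypothetical `Detector` enum, which is not a pytribeam type.

```python
from enum import Enum
from typing import Any


class Detector(Enum):
    # Hypothetical enum used only for this illustration.
    ETD = "etd"
    CBS = "cbs"


def valid_enum_entry(obj: Any, check_type) -> bool:
    # _value2member_map_ maps each member's value to the member itself.
    return obj in check_type._value2member_map_


by_value = valid_enum_entry("etd", Detector)
# by_value == True: "etd" is the value of Detector.ETD
by_name = valid_enum_entry("ETD", Detector)
# by_name == False: member names are not values
by_member = valid_enum_entry(Detector.ETD, Detector)
# by_member == False: a member instance is not one of the values
```

Because the lookup is a dictionary membership test, an unhashable `obj` such as a list raises `TypeError` instead of returning False.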
def yml_format(version: float) -> tbt.YMLFormatVersion:
    """
    Return the YML file format for a given version.

    This function returns the YML file format for a given version.

    Parameters
    ----------
    version : float
        The version of the YML file.

    Returns
    -------
    tbt.YMLFormatVersion
        The YML file format for the given version.

    Raises
    ------
    NotImplementedError
        If the YML file version is unsupported.
    """
    supported_versions = [file.version for file in tbt.YMLFormatVersion]
    if version not in supported_versions:
        raise NotImplementedError(
            f'Unsupported YML file version for version "{version}". Valid formats include: {[i.value for i in tbt.YMLFormatVersion]}'
        )
    # Enum iteration order matches supported_versions, so the index lines up.
    yml_file_idx = supported_versions.index(version)
    yml_format = list(tbt.YMLFormatVersion)[yml_file_idx]
    return yml_format
Return the YML file format for a given version.
This function returns the YML file format for a given version.
Parameters
version : float The version of the YML file.
Returns
tbt.YMLFormatVersion The YML file format for the given version.
Raises
NotImplementedError If the YML file version is unsupported.
def yml_to_dict(
    *, yml_path_file: Path, version: float, required_keys: Tuple[str, ...]
) -> Dict:
    """
    Convert a YAML file to a dictionary.

    This function reads a YAML file and returns the result as a dictionary.

    Parameters
    ----------
    yml_path_file : Path
        The fully pathed location to the input file.
    version : float
        The version of the YAML file in x.y format.
    required_keys : Tuple[str, ...]
        The key(s) that must be in the YAML file for conversion to a dictionary to occur.

    Returns
    -------
    dict
        The YAML file represented as a dictionary.

    Raises
    ------
    TypeError
        If the file type is unsupported.
    OSError
        If the YAML file cannot be opened or decoded.
    KeyError
        If the required keys are not found in the YAML file.
    ValueError
        If the version specified in the file does not match the requested version.
    """

    # casefold() is stronger than lower(): it converts more characters to
    # lower case, so comparing two casefolded strings finds more matches.
    file_type = yml_path_file.suffix.casefold()

    supported_types = (".yaml", ".yml")

    if file_type not in supported_types:
        raise TypeError("Only file types .yaml and .yml are supported.")

    try:
        with open(file=yml_path_file, mode="r", encoding="utf-8") as stream:
            # See deprecation warning for plain yaml.load(input) at
            # https://github.com/yaml/pyyaml/wiki/PyYAML-yaml.load(input)-Deprecation
            db = yaml.load(stream, Loader=yaml.SafeLoader)
    except yaml.YAMLError as error:
        print(f"Error with YAML file: {error}")
        print(f"Could not open or decode: {yml_path_file}")
        raise OSError from error

    # check keys found in input file against required keys
    found_keys = tuple(db.keys())
    keys_exist = tuple(map(lambda x: x in found_keys, required_keys))
    has_required_keys = all(keys_exist)
    if not has_required_keys:
        raise KeyError(f"Input files must have these keys defined: {required_keys}")

    version_specified = db["config_file_version"]
    version_requested = version

    if version_specified != version_requested:
        ee = f"Version mismatch: specified in file was {version_specified}, "
        ee += f"requested is {version_requested}"
        raise ValueError(ee)

    return db
Convert a YAML file to a dictionary.
This function reads a YAML file and returns the result as a dictionary.
Parameters
yml_path_file : Path The fully pathed location to the input file.
version : float The version of the YAML file in x.y format.
required_keys : Tuple[str, ...] The key(s) that must be in the YAML file for conversion to a dictionary to occur.
Returns
dict The YAML file represented as a dictionary.
Raises
TypeError If the file type is unsupported.
OSError If the YAML file cannot be opened or decoded.
KeyError If the required keys are not found in the YAML file.
ValueError If the version specified in the file does not match the requested version.
def yml_version(
    file: Path,
    key_name="config_file_version",
) -> float:
    """
    Return the version of a YAML file if the proper key exists.

    Parameters
    ----------
    file : Path
        The path to the YAML file.
    key_name : str, optional
        The key name for the version in the YAML file (default is "config_file_version").

    Returns
    -------
    float
        The version of the YAML file.

    Raises
    ------
    KeyError
        If the version key is not found in the YAML file.
    ValueError
        If the version value is not a valid float.
    """
    with open(file, "r", encoding="utf-8") as stream:
        data = yaml.load(stream, Loader=yaml.SafeLoader)

    try:
        version = data[key_name]
    except KeyError as error:
        raise KeyError(
            f"Error with version key, '{key_name}' key not found in {file}."
        ) from error
    try:
        version = float(version)
    except ValueError as error:
        raise ValueError(
            f"Could not find valid version in {file} for key {key_name}, found '{version}' which is not a float."
        ) from error
    return version
Return the version of a YAML file if the proper key exists.
Parameters
file : Path The path to the YAML file.
key_name : str, optional The key name for the version in the YAML file (default is "config_file_version").
Returns
float The version of the YAML file.
Raises
KeyError If the version key is not found in the YAML file.
ValueError If the version value is not a valid float.
def yes_no(question):
    """
    Ask the user a yes/no question, repeating the prompt until "y" or "n" is entered.

    Parameters
    ----------
    question : str
        The question to ask the user.

    Returns
    -------
    bool
        True if the user answers "y", False if the user answers "n".
    """
    prompt = f"{question} (y/n): "
    ans = input(prompt).strip().lower()
    if ans not in ["y", "n"]:
        print(f"{ans} is invalid, please try again...")
        # Ask again until a valid answer is given.
        return yes_no(question)
    if ans == "y":
        return True
    return False
Ask the user a yes/no question, repeating the prompt until "y" or "n" is entered.
Parameters
question : str The question to ask the user.
Returns
bool True if the user answers "y", False if the user answers "n".
def remove_directory(directory: Path):
    """
    Recursively remove a directory.

    Parameters
    ----------
    directory : Path
        The path to the directory to remove.
    """
    shutil.rmtree(directory)
Recursively remove a directory.
Parameters
directory : Path The path to the directory to remove.
def split_list(data: List, chunk_size: int) -> List:
    """
    Split a list into chunks of chunk_size items; the last chunk may be shorter.

    Parameters
    ----------
    data : List
        The list to split.
    chunk_size : int
        The size of each chunk.

    Returns
    -------
    List
        A list of chunks.
    """
    result = []
    for i in range(0, len(data), chunk_size):
        result.append(data[i : i + chunk_size])
    return result
Split a list into chunks of chunk_size items; the last chunk may be shorter.
Parameters
data : List The list to split.
chunk_size : int The size of each chunk.
Returns
List A list of chunks.
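A short usage sketch shows what happens when the list length is not a multiple of the chunk size. The function body is equivalent to the source above, written as a list comprehension.

```python
from typing import List


def split_list(data: List, chunk_size: int) -> List:
    # Slice the list at every multiple of chunk_size.
    return [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]


chunks = split_list([1, 2, 3, 4, 5], chunk_size=2)
# chunks == [[1, 2], [3, 4], [5]]
```

A `chunk_size` of 0 raises `ValueError`, because `range` does not accept a zero step.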
def tabular_list(
    data: List,
    num_columns: int = Constants.default_column_count,
    column_width: int = Constants.default_column_width,
) -> str:
    """
    Format a list into a tabular string.

    Parameters
    ----------
    data : List
        The list to format.
    num_columns : int, optional
        The number of columns in the table (default is Constants.default_column_count).
    column_width : int, optional
        The width of each column in the table (default is Constants.default_column_width).

    Returns
    -------
    str
        The formatted tabular string.
    """
    rows = split_list(data, chunk_size=num_columns)
    result = ""
    for sublist in rows:
        result += "\n"
        for item in sublist:
            result += f"{item:^{column_width}}"
    return result
Format a list into a tabular string.
Parameters
data : List The list to format.
num_columns : int, optional The number of columns in the table (default is Constants.default_column_count).
column_width : int, optional The width of each column in the table (default is Constants.default_column_width).
Returns
str The formatted tabular string.
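The sketch below illustrates the layout this produces: every row starts with a newline and each item is centered in a fixed-width cell. The default values of 3 columns and 6 characters are placeholders chosen for this example; the actual values in `Constants` are not shown in this section.

```python
from typing import List


def tabular_list(data: List, num_columns: int = 3, column_width: int = 6) -> str:
    # Defaults here are assumed placeholders, not the Constants values.
    result = ""
    for start in range(0, len(data), num_columns):
        result += "\n"  # each row begins on a new line
        for item in data[start : start + num_columns]:
            # "^" centers the item within column_width characters.
            result += f"{item:^{column_width}}"
    return result


table = tabular_list(["a", "b", "c", "d"], num_columns=3, column_width=5)
# table == "\n  a    b    c  \n  d  "
```

Items are formatted with a centering format specification, so they need to support it; strings and numbers do, while a value such as `None` raises `TypeError` unless it is converted with `str` first.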
def hardware_movement(func):
    """
    Decorator to run a function only when hardware testing is enabled.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    @run_on_microscope_machine
    def wrapper_func():
        if not Constants.test_hardware_movement:
            pytest.skip("Run only when hardware testing is enabled")
        func()

    return wrapper_func
Decorator to run a function only when hardware testing is enabled.
Parameters
func : function The function to decorate.
Returns
function The decorated function.
def run_on_standalone_machine(func):
    """
    Decorator to run a function only on a standalone machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().offline_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Offline License Machine Only.")
        func()

    return wrapper_func
Decorator to run a function only on a standalone machine.
Parameters
func : function The function to decorate.
Returns
function The decorated function.
def run_on_microscope_machine(func):
    """
    Decorator to run a function only on a microscope machine.

    Parameters
    ----------
    func : function
        The function to decorate.

    Returns
    -------
    function
        The decorated function.
    """

    def wrapper_func():
        current_machine = platform.uname().node.lower()
        test_machines = [machine.lower() for machine in Constants().microscope_machines]
        if current_machine not in test_machines:
            pytest.skip("Run on Microscope Machine Only.")
        func()

    return wrapper_func
Decorator to run a function only on a microscope machine.
Parameters
func : function The function to decorate.
Returns
function The decorated function.
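The three decorators share one pattern: compare the current host name with a list of allowed machines and skip the wrapped test otherwise. The sketch below shows a variant of that pattern under stated assumptions. It raises the standard library's `unittest.SkipTest` in place of `pytest.skip` so that it runs without pytest, it adds `functools.wraps` and argument forwarding, which the source above does not use, and the `ALLOWED_MACHINES` list is a hypothetical replacement for the machine lists held in `Constants`.

```python
import functools
import platform
import unittest

# Hypothetical machine list; the real names live in Constants.
ALLOWED_MACHINES = ["hypothetical-scope-pc"]


def run_on_listed_machine(func):
    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        current_machine = platform.uname().node.lower()
        allowed = [machine.lower() for machine in ALLOWED_MACHINES]
        if current_machine not in allowed:
            # Stand-in for pytest.skip(...)
            raise unittest.SkipTest("Run on listed machines only.")
        return func(*args, **kwargs)

    return wrapper


@run_on_listed_machine
def check_stage_move():
    """Hypothetical test body."""
    return "moved"


try:
    outcome = check_stage_move()
except unittest.SkipTest:
    outcome = "skipped"
```

On any machine whose host name is not in the list, `outcome` is `"skipped"`.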