From 8c6d0f50cdb61843532c7a2f2a03a421acdb126a Mon Sep 17 00:00:00 2001 From: wakonig_k Date: Wed, 24 Jan 2024 11:07:20 +0100 Subject: [PATCH] feat: added tests for connecting devices --- ophyd_devices/utils/static_device_test.py | 220 ++++++++++++++++------ 1 file changed, 164 insertions(+), 56 deletions(-) diff --git a/ophyd_devices/utils/static_device_test.py b/ophyd_devices/utils/static_device_test.py index 8593b77..14591fd 100644 --- a/ophyd_devices/utils/static_device_test.py +++ b/ophyd_devices/utils/static_device_test.py @@ -3,89 +3,167 @@ import copy from io import TextIOWrapper import ophyd -import ophyd.sim as ops import yaml from bec_lib.scibec_validator import SciBecValidator -import ophyd_devices as opd - try: from bec_plugins import devices as plugin_devices except ImportError: plugin_devices = None +try: + from device_server.devices.devicemanager import DeviceManagerDS as device_manager +except ImportError: + device_manager = None + class StaticDeviceTest: + """Class to perform tests on an ophyd device config file.""" + def __init__(self, config: str, output_file: TextIOWrapper) -> None: + """ + Args: + config(str): path to the config file + output_file(TextIOWrapper): file to write the output to + """ self.config = self.read_config(config) self.file = output_file @staticmethod def read_config(config) -> dict: - """load a config from disk""" + """ + Read the config file + + Args: + config(str): path to the config file + + Returns: + dict: config content + """ content = None with open(config, "r", encoding="utf-8") as file: file_content = file.read() content = yaml.safe_load(file_content) return content - def check_signals(self, name, conf) -> None: - """run checks on EpicsSignals""" + def _check_all_signals_of_device(self, name: str, device: ophyd.Device) -> None: + """ + Check if all signals of the device that are not omitted are configured with auto_monitor=True + + Args: + name(str): name of the device + device(ophyd.Device): device object + """ + for _, sub_name, item in device.walk_components(): + if not issubclass(item.cls, ophyd.signal.EpicsSignalBase): + continue + if not item.is_signal: + continue + if not item.kind == ophyd.Kind.omitted: + continue + # check if auto_monitor is in kwargs + self._has_auto_monitor(f"{name}/{sub_name}", item.kwargs) + + def _check_epics_motor(self, name: str, config: dict) -> None: + """ + Check if the epics motor config is valid. + + Args: + name(str): name of the device + config(dict): device config + """ + if "prefix" in config["deviceConfig"]: + return + msg_suffix = "" + + # check if the device specifies a read_pv instead of a prefix. + # This is a common copy-paste error. + if "read_pv" in config["deviceConfig"]: + msg_suffix = "Maybe a typo? The device specifies a read_pv instead." + raise ValueError(f"{name}: does not specify the prefix. {msg_suffix}") + + def _check_epics_signal(self, name: str, config: dict) -> None: + """ + Check if the epics signal config is valid. The device must specify a read_pv. + + Args: + name(str): name of the device + config(dict): device config + """ + self._has_auto_monitor(name, config["deviceConfig"]) + if "read_pv" not in config["deviceConfig"]: + raise ValueError(f"{name}: does not specify the read_pv") + + def check_device_classes(self, name: str, conf: dict) -> int: + """ + Run checks on the device class + + Args: + name(str): name of the device + conf(dict): device config + + Returns: + int: 0 if all checks passed, 1 otherwise + """ try: - dev_class = self._get_device_class(conf["deviceClass"]) + dev_class = device_manager._get_device_class(conf["deviceClass"]) + if issubclass(dev_class, ophyd.EpicsMotor): - if "prefix" not in conf["deviceConfig"]: - msg_suffix = "" - if "read_pv" in conf["deviceConfig"]: - msg_suffix = "Maybe a typo? The device specifies a read_pv instead." - raise ValueError(f"{name}: does not specify the prefix. {msg_suffix}") - if not issubclass(dev_class, ophyd.signal.EpicsSignalBase): - if issubclass(dev_class, ophyd.Device): - for _, sub_name, item in dev_class.walk_components(): - if not issubclass(item.cls, ophyd.signal.EpicsSignalBase): - continue - if not item.is_signal: - continue - if not item.kind < ophyd.Kind.normal: - continue - # check if auto_monitor is in kwargs - self._has_auto_monitor(f"{name}/{sub_name}", item.kwargs) + self._check_epics_motor(name, conf) return 0 - self._has_auto_monitor(name, conf["deviceConfig"]) - if "read_pv" not in conf["deviceConfig"]: - raise ValueError(f"{name}: does not specify the read_pv") + + if issubclass(dev_class, ophyd.signal.EpicsSignalBase): + self._check_epics_signal(name, conf) + return 0 + + if issubclass(dev_class, ophyd.Device): + self._check_all_signals_of_device(name, dev_class) return 0 + except Exception as e: self.print_and_write(f"ERROR: {name} is not valid: {e}") return 1 - @staticmethod - def _has_auto_monitor(name: str, config: dict) -> None: - if "auto_monitor" not in config: - print(f"WARNING: Device {name} is configured without auto monitor.") - - def _get_device_class(self, dev_type): - """Return the class object from 'dev_type' string in the form '[module:][submodule:]class_name' - - The class is looked after in ophyd devices[.module][.submodule] first, if it is not - present plugin_devices, ophyd, ophyd_devices.sim are searched too + def _has_auto_monitor(self, name: str, config: dict) -> None: """ - submodule, _, class_name = dev_type.rpartition(":") - if submodule: - submodule = f".{submodule.replace(':', '.')}" - for parent_module in (opd, plugin_devices, ophyd, ops): - try: - module = __import__(f"{parent_module.__name__}{submodule}", fromlist=[""]) - except ModuleNotFoundError: - continue - else: - break - else: - raise TypeError(f"Unknown device class {dev_type}") - return getattr(module, class_name) + Check if the config has an auto_monitor key and print a warning if not. + + Args: + name(str): name of the device + config(dict): device config + """ + if "auto_monitor" not in config: + self.print_and_write(f"WARNING: Device {name} is configured without auto monitor.") + + def connect_device(self, name: str, conf: dict) -> int: + """ + Connect to the device + + Args: + name(str): name of the device + conf(dict): device config + + Returns: + int: 0 if all checks passed, 1 otherwise + """ + try: + obj, _ = device_manager.construct_device_obj(conf, None) + + device_manager.connect_device(obj) + return 0 + + except Exception as e: + self.print_and_write(f"ERROR: {name} is not connectable: {e}") + return 1 def validate_schema(self, name: str, conf: dict) -> None: - """validate the device config against the json schema""" + """ + Validate the device config against the BEC DB schema + + Args: + name(str): name of the device + conf(dict): device config + """ try: validator = SciBecValidator() db_config = self._translate_to_db_config(name, conf) @@ -97,22 +175,39 @@ class StaticDeviceTest: @staticmethod def _translate_to_db_config(name, config) -> dict: - """translate the config to the format used by the database""" + """ + Translate the device config to a db config + + Args: + name(str): name of the device + config(dict): device config + + Returns: + dict: db config + """ db_config = copy.deepcopy(config) db_config["name"] = name if "deviceConfig" in db_config and db_config["deviceConfig"] is None: db_config["deviceConfig"] = {} - db_config.pop("deviceType") + db_config.pop("deviceType", None) return db_config - def run(self) -> None: - """run the test""" + def run(self, connect: bool) -> None: + """ + Run the tests + + Args: + connect(bool): connect to the devices + """ failed_devices = [] for name, conf in self.config.items(): return_val = 0 self.print_and_write(f"Checking {name}...") return_val += self.validate_schema(name, conf) - return_val += self.check_signals(name, conf) + return_val += self.check_device_classes(name, conf) + if connect: + return_val += self.connect_device(name, conf) + if return_val == 0: self.print_and_write("OK") else: @@ -134,6 +229,12 @@ class StaticDeviceTest: self.file.write(f" {device}\n") def print_and_write(self, text: str) -> None: + """ + Print and write to the output file + + Args: + text(str): text to print and write + """ print(text) self.file.write(text + "\n") @@ -148,10 +249,17 @@ def launch() -> None: parser.add_argument("--config", help="path to the config file", required=True, type=str) optional = parser.add_argument_group("optional arguments") optional.add_argument("--output", default="./report.txt", help="path to the output file") + optional.add_argument("--connect", action="store_true", help="connect to the devices") parser.add_help = True clargs = parser.parse_args() + if device_manager is None: + raise ImportError( + "device_server is not installed. Please install it first with pip install" + " bec-device-server." + ) + with open("./report.txt", "w", encoding="utf-8") as file: device_config_test = StaticDeviceTest(clargs.config, output_file=file) - device_config_test.run() + device_config_test.run(clargs.connect)