feat: added tests for connecting devices
This commit is contained in:
parent
bb02a619e5
commit
8c6d0f50cd
@ -3,89 +3,167 @@ import copy
|
|||||||
from io import TextIOWrapper
|
from io import TextIOWrapper
|
||||||
|
|
||||||
import ophyd
|
import ophyd
|
||||||
import ophyd.sim as ops
|
|
||||||
import yaml
|
import yaml
|
||||||
from bec_lib.scibec_validator import SciBecValidator
|
from bec_lib.scibec_validator import SciBecValidator
|
||||||
|
|
||||||
import ophyd_devices as opd
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from bec_plugins import devices as plugin_devices
|
from bec_plugins import devices as plugin_devices
|
||||||
except ImportError:
|
except ImportError:
|
||||||
plugin_devices = None
|
plugin_devices = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
from device_server.devices.devicemanager import DeviceManagerDS as device_manager
|
||||||
|
except ImportError:
|
||||||
|
device_manager = None
|
||||||
|
|
||||||
|
|
||||||
class StaticDeviceTest:
|
class StaticDeviceTest:
|
||||||
|
"""Class to perform tests on an ophyd device config file."""
|
||||||
|
|
||||||
def __init__(self, config: str, output_file: TextIOWrapper) -> None:
|
def __init__(self, config: str, output_file: TextIOWrapper) -> None:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
config(str): path to the config file
|
||||||
|
output_file(TextIOWrapper): file to write the output to
|
||||||
|
"""
|
||||||
self.config = self.read_config(config)
|
self.config = self.read_config(config)
|
||||||
self.file = output_file
|
self.file = output_file
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def read_config(config) -> dict:
|
def read_config(config) -> dict:
|
||||||
"""load a config from disk"""
|
"""
|
||||||
|
Read the config file
|
||||||
|
|
||||||
|
Args:
|
||||||
|
config(str): path to the config file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: config content
|
||||||
|
"""
|
||||||
content = None
|
content = None
|
||||||
with open(config, "r", encoding="utf-8") as file:
|
with open(config, "r", encoding="utf-8") as file:
|
||||||
file_content = file.read()
|
file_content = file.read()
|
||||||
content = yaml.safe_load(file_content)
|
content = yaml.safe_load(file_content)
|
||||||
return content
|
return content
|
||||||
|
|
||||||
def check_signals(self, name, conf) -> None:
|
def _check_all_signals_of_device(self, name: str, device: ophyd.Device) -> None:
|
||||||
"""run checks on EpicsSignals"""
|
"""
|
||||||
|
Check if all signals of the device that are not omitted are configured with auto_monitor=True
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name(str): name of the device
|
||||||
|
device(ophyd.Device): device object
|
||||||
|
"""
|
||||||
|
for _, sub_name, item in device.walk_components():
|
||||||
|
if not issubclass(item.cls, ophyd.signal.EpicsSignalBase):
|
||||||
|
continue
|
||||||
|
if not item.is_signal:
|
||||||
|
continue
|
||||||
|
if not item.kind == ophyd.Kind.omitted:
|
||||||
|
continue
|
||||||
|
# check if auto_monitor is in kwargs
|
||||||
|
self._has_auto_monitor(f"{name}/{sub_name}", item.kwargs)
|
||||||
|
|
||||||
|
def _check_epics_motor(self, name: str, config: dict) -> None:
|
||||||
|
"""
|
||||||
|
Check if the epics motor config is valid.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name(str): name of the device
|
||||||
|
config(dict): device config
|
||||||
|
"""
|
||||||
|
if "prefix" in config["deviceConfig"]:
|
||||||
|
return
|
||||||
|
msg_suffix = ""
|
||||||
|
|
||||||
|
# check if the device specifies a read_pv instead of a prefix.
|
||||||
|
# This is a common copy-paste error.
|
||||||
|
if "read_pv" in config["deviceConfig"]:
|
||||||
|
msg_suffix = "Maybe a typo? The device specifies a read_pv instead."
|
||||||
|
raise ValueError(f"{name}: does not specify the prefix. {msg_suffix}")
|
||||||
|
|
||||||
|
def _check_epics_signal(self, name: str, config: dict) -> None:
|
||||||
|
"""
|
||||||
|
Check if the epics signal config is valid. The device must specify a read_pv.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name(str): name of the device
|
||||||
|
config(dict): device config
|
||||||
|
"""
|
||||||
|
self._has_auto_monitor(name, config["deviceConfig"])
|
||||||
|
if "read_pv" not in config["deviceConfig"]:
|
||||||
|
raise ValueError(f"{name}: does not specify the read_pv")
|
||||||
|
|
||||||
|
def check_device_classes(self, name: str, conf: dict) -> int:
|
||||||
|
"""
|
||||||
|
Run checks on the device class
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name(str): name of the device
|
||||||
|
conf(dict): device config
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: 0 if all checks passed, 1 otherwise
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
dev_class = self._get_device_class(conf["deviceClass"])
|
dev_class = device_manager._get_device_class(conf["deviceClass"])
|
||||||
|
|
||||||
if issubclass(dev_class, ophyd.EpicsMotor):
|
if issubclass(dev_class, ophyd.EpicsMotor):
|
||||||
if "prefix" not in conf["deviceConfig"]:
|
self._check_epics_motor(name, conf)
|
||||||
msg_suffix = ""
|
|
||||||
if "read_pv" in conf["deviceConfig"]:
|
|
||||||
msg_suffix = "Maybe a typo? The device specifies a read_pv instead."
|
|
||||||
raise ValueError(f"{name}: does not specify the prefix. {msg_suffix}")
|
|
||||||
if not issubclass(dev_class, ophyd.signal.EpicsSignalBase):
|
|
||||||
if issubclass(dev_class, ophyd.Device):
|
|
||||||
for _, sub_name, item in dev_class.walk_components():
|
|
||||||
if not issubclass(item.cls, ophyd.signal.EpicsSignalBase):
|
|
||||||
continue
|
|
||||||
if not item.is_signal:
|
|
||||||
continue
|
|
||||||
if not item.kind < ophyd.Kind.normal:
|
|
||||||
continue
|
|
||||||
# check if auto_monitor is in kwargs
|
|
||||||
self._has_auto_monitor(f"{name}/{sub_name}", item.kwargs)
|
|
||||||
return 0
|
return 0
|
||||||
self._has_auto_monitor(name, conf["deviceConfig"])
|
|
||||||
if "read_pv" not in conf["deviceConfig"]:
|
if issubclass(dev_class, ophyd.signal.EpicsSignalBase):
|
||||||
raise ValueError(f"{name}: does not specify the read_pv")
|
self._check_epics_signal(name, conf)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if issubclass(dev_class, ophyd.Device):
|
||||||
|
self._check_all_signals_of_device(name, dev_class)
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.print_and_write(f"ERROR: {name} is not valid: {e}")
|
self.print_and_write(f"ERROR: {name} is not valid: {e}")
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
@staticmethod
|
def _has_auto_monitor(self, name: str, config: dict) -> None:
|
||||||
def _has_auto_monitor(name: str, config: dict) -> None:
|
|
||||||
if "auto_monitor" not in config:
|
|
||||||
print(f"WARNING: Device {name} is configured without auto monitor.")
|
|
||||||
|
|
||||||
def _get_device_class(self, dev_type):
|
|
||||||
"""Return the class object from 'dev_type' string in the form '[module:][submodule:]class_name'
|
|
||||||
|
|
||||||
The class is looked after in ophyd devices[.module][.submodule] first, if it is not
|
|
||||||
present plugin_devices, ophyd, ophyd_devices.sim are searched too
|
|
||||||
"""
|
"""
|
||||||
submodule, _, class_name = dev_type.rpartition(":")
|
Check if the config has an auto_monitor key and print a warning if not.
|
||||||
if submodule:
|
|
||||||
submodule = f".{submodule.replace(':', '.')}"
|
Args:
|
||||||
for parent_module in (opd, plugin_devices, ophyd, ops):
|
name(str): name of the device
|
||||||
try:
|
config(dict): device config
|
||||||
module = __import__(f"{parent_module.__name__}{submodule}", fromlist=[""])
|
"""
|
||||||
except ModuleNotFoundError:
|
if "auto_monitor" not in config:
|
||||||
continue
|
self.print_and_write(f"WARNING: Device {name} is configured without auto monitor.")
|
||||||
else:
|
|
||||||
break
|
def connect_device(self, name: str, conf: dict) -> int:
|
||||||
else:
|
"""
|
||||||
raise TypeError(f"Unknown device class {dev_type}")
|
Connect to the device
|
||||||
return getattr(module, class_name)
|
|
||||||
|
Args:
|
||||||
|
name(str): name of the device
|
||||||
|
conf(dict): device config
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: 0 if all checks passed, 1 otherwise
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
obj, _ = device_manager.construct_device_obj(conf, None)
|
||||||
|
|
||||||
|
device_manager.connect_device(obj)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.print_and_write(f"ERROR: {name} is not connectable: {e}")
|
||||||
|
return 1
|
||||||
|
|
||||||
def validate_schema(self, name: str, conf: dict) -> None:
|
def validate_schema(self, name: str, conf: dict) -> None:
|
||||||
"""validate the device config against the json schema"""
|
"""
|
||||||
|
Validate the device config against the BEC DB schema
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name(str): name of the device
|
||||||
|
conf(dict): device config
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
validator = SciBecValidator()
|
validator = SciBecValidator()
|
||||||
db_config = self._translate_to_db_config(name, conf)
|
db_config = self._translate_to_db_config(name, conf)
|
||||||
@ -97,22 +175,39 @@ class StaticDeviceTest:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _translate_to_db_config(name, config) -> dict:
|
def _translate_to_db_config(name, config) -> dict:
|
||||||
"""translate the config to the format used by the database"""
|
"""
|
||||||
|
Translate the device config to a db config
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name(str): name of the device
|
||||||
|
config(dict): device config
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict: db config
|
||||||
|
"""
|
||||||
db_config = copy.deepcopy(config)
|
db_config = copy.deepcopy(config)
|
||||||
db_config["name"] = name
|
db_config["name"] = name
|
||||||
if "deviceConfig" in db_config and db_config["deviceConfig"] is None:
|
if "deviceConfig" in db_config and db_config["deviceConfig"] is None:
|
||||||
db_config["deviceConfig"] = {}
|
db_config["deviceConfig"] = {}
|
||||||
db_config.pop("deviceType")
|
db_config.pop("deviceType", None)
|
||||||
return db_config
|
return db_config
|
||||||
|
|
||||||
def run(self) -> None:
|
def run(self, connect: bool) -> None:
|
||||||
"""run the test"""
|
"""
|
||||||
|
Run the tests
|
||||||
|
|
||||||
|
Args:
|
||||||
|
connect(bool): connect to the devices
|
||||||
|
"""
|
||||||
failed_devices = []
|
failed_devices = []
|
||||||
for name, conf in self.config.items():
|
for name, conf in self.config.items():
|
||||||
return_val = 0
|
return_val = 0
|
||||||
self.print_and_write(f"Checking {name}...")
|
self.print_and_write(f"Checking {name}...")
|
||||||
return_val += self.validate_schema(name, conf)
|
return_val += self.validate_schema(name, conf)
|
||||||
return_val += self.check_signals(name, conf)
|
return_val += self.check_device_classes(name, conf)
|
||||||
|
if connect:
|
||||||
|
return_val += self.connect_device(name, conf)
|
||||||
|
|
||||||
if return_val == 0:
|
if return_val == 0:
|
||||||
self.print_and_write("OK")
|
self.print_and_write("OK")
|
||||||
else:
|
else:
|
||||||
@ -134,6 +229,12 @@ class StaticDeviceTest:
|
|||||||
self.file.write(f" {device}\n")
|
self.file.write(f" {device}\n")
|
||||||
|
|
||||||
def print_and_write(self, text: str) -> None:
|
def print_and_write(self, text: str) -> None:
|
||||||
|
"""
|
||||||
|
Print and write to the output file
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text(str): text to print and write
|
||||||
|
"""
|
||||||
print(text)
|
print(text)
|
||||||
self.file.write(text + "\n")
|
self.file.write(text + "\n")
|
||||||
|
|
||||||
@ -148,10 +249,17 @@ def launch() -> None:
|
|||||||
parser.add_argument("--config", help="path to the config file", required=True, type=str)
|
parser.add_argument("--config", help="path to the config file", required=True, type=str)
|
||||||
optional = parser.add_argument_group("optional arguments")
|
optional = parser.add_argument_group("optional arguments")
|
||||||
optional.add_argument("--output", default="./report.txt", help="path to the output file")
|
optional.add_argument("--output", default="./report.txt", help="path to the output file")
|
||||||
|
optional.add_argument("--connect", action="store_true", help="connect to the devices")
|
||||||
parser.add_help = True
|
parser.add_help = True
|
||||||
|
|
||||||
clargs = parser.parse_args()
|
clargs = parser.parse_args()
|
||||||
|
|
||||||
|
if device_manager is None:
|
||||||
|
raise ImportError(
|
||||||
|
"device_server is not installed. Please install it first with pip install"
|
||||||
|
" bec-device-server."
|
||||||
|
)
|
||||||
|
|
||||||
with open("./report.txt", "w", encoding="utf-8") as file:
|
with open("./report.txt", "w", encoding="utf-8") as file:
|
||||||
device_config_test = StaticDeviceTest(clargs.config, output_file=file)
|
device_config_test = StaticDeviceTest(clargs.config, output_file=file)
|
||||||
device_config_test.run()
|
device_config_test.run(clargs.connect)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user