refactor(lazy-wait-for-connection): add context manager for lazy-wait-for-connection disable

This commit is contained in:
2025-06-23 14:53:25 +02:00
committed by Christian Appel
parent 40a05796c5
commit c63e455d9c
@@ -4,7 +4,8 @@ is used to create the device interface for proxy objects on other services.
"""
import functools
from typing import Any
from contextlib import contextmanager
from typing import Any, Generator
import msgpack
from ophyd import Device, Kind, PositionerBase, Signal
@@ -20,6 +21,21 @@ from bec_lib.signature_serializer import signature_to_dict
logger = bec_logger.logger
@contextmanager
def disable_lazy_wait_for_connection(
device: PositionerBase | ComputedSignal | Signal | Device | BECDeviceBase,
) -> Generator[None, None, None]:
"""Context manager to disable lazy wait for connection for a device and its subdevices."""
device_dict = get_lazy_wait_for_connection(device)
try:
for dev, _ in device_dict.values(): # Set all to False
dev.lazy_wait_for_connection = False
yield
finally:
for dev, initial_value in device_dict.values():
dev.lazy_wait_for_connection = initial_value
def is_serializable(var: Any) -> bool:
"""
Check if a variable is serializable
@@ -193,12 +209,8 @@ def get_device_info(
# needed because ophyd signals have empty hints
hints = {"fields": [obj.name]}
elif connect: # only works if PVs are connected
device_dict = get_lazy_wait_for_connection(obj)
for _, info in device_dict.items(): # Set all to False
info[0].lazy_wait_for_connection = False
hints = obj.hints
for _, info in device_dict.items(): # Set back to initial value
info[0].lazy_wait_for_connection = info[1]
with disable_lazy_wait_for_connection(obj):
hints = obj.hints
else:
hints = {}