refactor: added common controller methods

This commit is contained in:
wakonig_k 2024-05-07 18:00:19 +02:00
parent 44de499d40
commit 00b3ae8258

View File

@ -8,6 +8,10 @@ from ophyd.ophydobj import OphydObject
logger = bec_logger.logger
class ControllerCommunicationError(Exception):
"""Exception raised when a communication error occurs with the controller."""
def threadlocked(fcn):
"""Ensure that the thread acquires and releases the lock."""
@ -20,6 +24,20 @@ def threadlocked(fcn):
return wrapper
def retry_once(fcn):
"""Decorator to rerun a function in case a CommunicationError was raised. This may happen if the buffer was not empty."""
@functools.wraps(fcn)
def wrapper(self, *args, **kwargs):
try:
val = fcn(self, *args, **kwargs)
except ControllerCommunicationError:
val = fcn(self, *args, **kwargs)
return val
return wrapper
def axis_checked(fcn):
"""Decorator to catch attempted access to channels that are not available."""
@ -70,6 +88,60 @@ class Controller(OphydObject):
self._socket_host = socket_host
self._socket_port = socket_port
@threadlocked
def socket_put(self, val: str):
"""
Send a command to the controller.
Args:
val (str): Command to send
"""
self.sock.put(f"{val}\n".encode())
@threadlocked
def socket_get(self):
return self.sock.receive().decode()
@retry_once
@threadlocked
def socket_put_and_receive(self, val: str, remove_trailing_chars=True) -> str:
"""
Send a command to the controller and receive the response.
Override this method in the derived class if necessary, especially if the response
needs to be parsed differently.
"""
self.socket_put(val)
if remove_trailing_chars:
return self._remove_trailing_characters(self.sock.receive().decode())
return self.socket_get()
def _remove_trailing_characters(self, var) -> str:
if len(var) > 1:
return var.split("\r\n")[0]
return var
def get_device_manager(self):
for axis in self._axis:
if hasattr(axis, "device_manager") and axis.device_manager:
return axis.device_manager
raise BECConfigError("Could not access the device_manager")
def get_axis_by_name(self, name):
for axis in self._axis:
if axis:
if axis.name == name:
return axis
raise RuntimeError(f"Could not find an axis with name {name}")
def set_device_enabled(self, device_name: str, enabled: bool) -> None:
"""enable / disable a device"""
if device_name not in self.get_device_manager().devices:
logger.warning(
f"Device {device_name} is not configured and cannot be enabled/disabled."
)
return
self.get_device_manager().devices[device_name].read_only = not enabled
def _initialize(self):
self._connected = False
self._set_default_values()