diff --git a/ophyd_devices/rt_lamni/rt_flomni_ophyd.py b/ophyd_devices/rt_lamni/rt_flomni_ophyd.py index 7d1deb5..ded8dfd 100644 --- a/ophyd_devices/rt_lamni/rt_flomni_ophyd.py +++ b/ophyd_devices/rt_lamni/rt_flomni_ophyd.py @@ -126,9 +126,9 @@ class RtFlomniController(RtController): ) if not np.isclose(fsamx.obj.readback.get(), fsamx_in, atol=0.01): - fsamx.enabled_set = True + fsamx.read_only = False fsamx.obj.move(fsamx_in, wait=True) - fsamx.enabled_set = False + fsamx.read_only = True time.sleep(1) self.socket_put("l1") @@ -169,13 +169,13 @@ class RtFlomniController(RtController): wait_on_exit = True self.socket_put("v0") fsamx = self.get_device_manager().devices.fsamx - fsamx.enabled_set = True + fsamx.read_only = False fsamx.obj.controller.socket_put_confirmed("axspeed[4]=0.1*stppermm[4]") fsamx.obj.pid_x_correction -= (self.get_pid_x() - expected_voltage) * 0.007 logger.info(f"Correcting fsamx by {fsamx.obj.pid_x_correction}") fsamx_in = fsamx.user_parameter.get("in") fsamx.obj.move(fsamx_in + cenx / 1000 + fsamx.obj.pid_x_correction, wait=True) - fsamx.enabled_set = False + fsamx.read_only = True time.sleep(0.1) self.laser_tracker_on() time.sleep(0.01) diff --git a/ophyd_devices/rt_lamni/rt_lamni_ophyd.py b/ophyd_devices/rt_lamni/rt_lamni_ophyd.py index 0e70d64..6a67912 100644 --- a/ophyd_devices/rt_lamni/rt_lamni_ophyd.py +++ b/ophyd_devices/rt_lamni/rt_lamni_ophyd.py @@ -3,7 +3,7 @@ import threading import time import numpy as np -from bec_lib import messages, MessageEndpoints, bec_logger +from bec_lib import MessageEndpoints, bec_logger, messages from ophyd import Component as Cpt from ophyd import Device, PositionerBase, Signal from ophyd.status import wait as status_wait @@ -435,8 +435,7 @@ class RtLamniController(Controller): self.get_device_manager().producer.send( MessageEndpoints.device_read("rt_lamni"), messages.DeviceMessage( - signals=signals, - metadata={"pointID": pointID, **self.readout_metadata}, + signals=signals, metadata={"pointID": pointID, **self.readout_metadata} ).dumps(), ) @@ -532,7 +531,7 @@ class RtLamniController(Controller): f"Device {device_name} is not configured and cannot be enabled/disabled." ) return - self.get_device_manager().devices[device_name].enabled_set = enabled + self.get_device_manager().devices[device_name].read_only = not enabled class RtLamniSignalBase(SocketSignal): @@ -623,11 +622,7 @@ class RtLamniMotorIsMoving(RtLamniSignalRO): def get(self): val = super().get() if val is not None: - self._run_subs( - sub_type=self.SUB_VALUE, - value=val, - timestamp=time.time(), - ) + self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time()) return val @@ -642,11 +637,7 @@ class RtLamniFeedbackRunning(RtLamniSignalRO): class RtLamniMotor(Device, PositionerBase): USER_ACCESS = ["controller"] - readback = Cpt( - RtLamniReadbackSignal, - signal_name="readback", - kind="hinted", - ) + readback = Cpt(RtLamniReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(RtLamniSetpointSignal, signal_name="setpoint") motor_is_moving = Cpt(RtLamniMotorIsMoving, signal_name="motor_is_moving", kind="normal") @@ -769,11 +760,7 @@ class RtLamniMotor(Device, PositionerBase): while self.motor_is_moving.get(): print("motor is moving") val = self.readback.read() - self._run_subs( - sub_type=self.SUB_READBACK, - value=val, - timestamp=time.time(), - ) + self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) time.sleep(0.01) print("Move finished") self._done_moving() diff --git a/ophyd_devices/rt_lamni/rt_ophyd.py b/ophyd_devices/rt_lamni/rt_ophyd.py index 202c7ef..3c70b33 100644 --- a/ophyd_devices/rt_lamni/rt_ophyd.py +++ b/ophyd_devices/rt_lamni/rt_ophyd.py @@ -405,8 +405,7 @@ class RtController(Controller): self.get_device_manager().producer.send( MessageEndpoints.device_read("rt_lamni"), messages.DeviceMessage( - signals=signals, - metadata={"pointID": pointID, **self.readout_metadata}, + signals=signals, metadata={"pointID": pointID, **self.readout_metadata} ).dumps(), ) @@ -502,7 +501,7 @@ class RtController(Controller): f"Device {device_name} is not configured and cannot be enabled/disabled." ) return - self.get_device_manager().devices[device_name].enabled_set = enabled + self.get_device_manager().devices[device_name].read_only = not enabled class RtSignalBase(SocketSignal): @@ -593,11 +592,7 @@ class RtMotorIsMoving(RtSignalRO): def get(self): val = super().get() if val is not None: - self._run_subs( - sub_type=self.SUB_VALUE, - value=val, - timestamp=time.time(), - ) + self._run_subs(sub_type=self.SUB_VALUE, value=val, timestamp=time.time()) return val @@ -612,11 +607,7 @@ class RtFeedbackRunning(RtSignalRO): class RtMotor(Device, PositionerBase): USER_ACCESS = ["controller"] - readback = Cpt( - RtReadbackSignal, - signal_name="readback", - kind="hinted", - ) + readback = Cpt(RtReadbackSignal, signal_name="readback", kind="hinted") user_setpoint = Cpt(RtSetpointSignal, signal_name="setpoint") motor_is_moving = Cpt(RtMotorIsMoving, signal_name="motor_is_moving", kind="normal") @@ -739,11 +730,7 @@ class RtMotor(Device, PositionerBase): while self.motor_is_moving.get(): print("motor is moving") val = self.readback.read() - self._run_subs( - sub_type=self.SUB_READBACK, - value=val, - timestamp=time.time(), - ) + self._run_subs(sub_type=self.SUB_READBACK, value=val, timestamp=time.time()) time.sleep(0.01) print("Move finished") self._done_moving() diff --git a/ophyd_devices/utils/bec_utils.py b/ophyd_devices/utils/bec_utils.py index 94c9cf7..c2cf88e 100644 --- a/ophyd_devices/utils/bec_utils.py +++ b/ophyd_devices/utils/bec_utils.py @@ -2,12 +2,10 @@ import time from bec_lib import bec_logger from bec_lib.devicemanager import DeviceContainer - -from ophyd import Signal, Kind +from ophyd import Kind, Signal from ophyd_devices.utils.socket import data_shape, data_type - logger = bec_logger.logger DEFAULT_EPICSSIGNAL_VALUE = object() @@ -18,7 +16,7 @@ class DeviceMock: self.name = name self.read_buffer = value self._config = {"deviceConfig": {"limits": [-50, 50]}, "userParameter": None} - self._enabled_set = True + self._read_only = False self._enabled = True def read(self): @@ -28,12 +26,12 @@ class DeviceMock: return self.read_buffer @property - def enabled_set(self) -> bool: - return self._enabled_set + def read_only(self) -> bool: + return self._read_only - @enabled_set.setter - def enabled_set(self, val: bool): - self._enabled_set = val + @read_only.setter + def read_only(self, val: bool): + self._read_only = val @property def enabled(self) -> bool: @@ -191,14 +189,7 @@ class ConfigSignal(Signal): self._readback = getattr(self.parent, self.storage_name)[self.name] return self._readback - def put( - self, - value, - connection_timeout=1, - callback=None, - timeout=1, - **kwargs, - ): + def put(self, value, connection_timeout=1, callback=None, timeout=1, **kwargs): """Using channel access, set the write PV to `value`. Keyword arguments are passed on to callbacks @@ -224,10 +215,7 @@ class ConfigSignal(Signal): getattr(self.parent, self.storage_name)[self.name] = value super().put(value, timestamp=timestamp, force=True) self._run_subs( - sub_type=self.SUB_VALUE, - old_value=old_value, - value=value, - timestamp=timestamp, + sub_type=self.SUB_VALUE, old_value=old_value, value=value, timestamp=timestamp ) def describe(self): diff --git a/tests/utils.py b/tests/utils.py index 84ebce8..f06eea8 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,8 +1,8 @@ +from unittest import mock + from bec_lib.devicemanager import DeviceContainer from bec_lib.tests.utils import ProducerMock -from unittest import mock - class SocketMock: """Socket Mock. Used for testing""" @@ -251,7 +251,7 @@ class DeviceMock: self.name = name self.read_buffer = value self._config = {"deviceConfig": {"limits": [-50, 50]}, "userParameter": None} - self._enabled_set = True + self._read_only = False self._enabled = True def read(self): @@ -263,14 +263,14 @@ class DeviceMock: return self.read_buffer @property - def enabled_set(self) -> bool: - """enabled_set property""" - return self._enabled_set + def read_only(self) -> bool: + """read only property""" + return self._read_only - @enabled_set.setter - def enabled_set(self, val: bool): - """enabled_set setter""" - self._enabled_set = val + @read_only.setter + def read_only(self, val: bool): + """read only setter""" + self._read_only = val @property def enabled(self) -> bool: