From 9990a70d55e7370dd8cb2a2ad591a920805fbada Mon Sep 17 00:00:00 2001 From: appel_c Date: Tue, 24 Mar 2026 08:21:05 +0100 Subject: [PATCH] bpm draft --- csaxs_bec/devices/pseudo_devices/bpm.py | 181 +++++++++++++----------- 1 file changed, 97 insertions(+), 84 deletions(-) diff --git a/csaxs_bec/devices/pseudo_devices/bpm.py b/csaxs_bec/devices/pseudo_devices/bpm.py index 5dd4840..1d5991f 100644 --- a/csaxs_bec/devices/pseudo_devices/bpm.py +++ b/csaxs_bec/devices/pseudo_devices/bpm.py @@ -1,100 +1,113 @@ -from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal, ProcessedSignalModel -from ophyd_devices import PSIDeviceBase -from ophyd import Kind, Signal +import time + from ophyd import Component as Cpt +from ophyd import Kind, Signal +from ophyd_devices import PSIDeviceBase +from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal + class BPM(PSIDeviceBase): # Blade signals, a,b,c,d - top = Cpt(BECProcessedSignal, name="top", model=None,kind=Kind.config, doc="... top blade") - right = Cpt(BECProcessedSignal, name="right", model=None,kind=Kind.config, doc="... right blade") - bot = Cpt(BECProcessedSignal, name="bot", model=None,kind=Kind.config, doc="... bot blade") - left = Cpt(BECProcessedSignal, name="left", model=None,kind=Kind.config, doc="... left blade") + top = Cpt(BECProcessedSignal, name="top", model=None, kind=Kind.config, doc="... top blade") + right = Cpt( + BECProcessedSignal, name="right", model=None, kind=Kind.config, doc="... right blade" + ) + bot = Cpt(BECProcessedSignal, name="bot", model=None, kind=Kind.config, doc="... bot blade") + left = Cpt(BECProcessedSignal, name="left", model=None, kind=Kind.config, doc="... left blade") - # Virtual signals - # pos_x = Cpt(BECProcessedSignal, name="pos_x", model=None,kind=Kind.config, doc="... pos_x") - # pos_y = Cpt(BECProcessedSignal, name="pos_y", model=None,kind=Kind.config, doc="... pos_y") - # diagonal = Cpt(BECProcessedSignal, name="diagonal", model=None,kind=Kind.config, doc="... diagonal") - intensity = Cpt(BECProcessedSignal, name="intensity", model=None,kind=Kind.config, doc="... intensity") + # Virtual signals + pos_x = Cpt(BECProcessedSignal, name="pos_x", model=None, kind=Kind.config, doc="... pos_x") + pos_y = Cpt(BECProcessedSignal, name="pos_y", model=None, kind=Kind.config, doc="... pos_y") + diagonal = Cpt( + BECProcessedSignal, name="diagonal", model=None, kind=Kind.config, doc="... diagonal" + ) + intensity = Cpt( + BECProcessedSignal, name="intensity", model=None, kind=Kind.config, doc="... intensity" + ) - - def __init__(self, - name, - blade_t: str, - blade_r: str, - blade_b: str, - blade_l: str, - device_manager=None, - **kwargs): + def __init__( + self, + name, + blade_t: str, + blade_r: str, + blade_b: str, + blade_l: str, + device_manager=None, + **kwargs, + ): super().__init__(name=name, device_manager=device_manager, **kwargs) - self.top.set_model(devices= {"signal", blade_t}, compute_method=self._compute_blade_signal) - self.top.set_model(devices= {"signal", blade_r}, compute_method=self._compute_blade_signal) - self.top.set_model(devices= {"signal", blade_b}, compute_method=self._compute_blade_signal) - self.top.set_model(devices= {"signal", blade_l}, compute_method=self._compute_blade_signal) - self.intensity + signal_t = self.top.get_device_object_from_bec( + object_name=blade_t, signal_name=self.name, device_manager=device_manager + ) + self.top.set_compute_method(self._compute_blade_signal, signal=signal_t) - def wait_for_connection(self, *args, **kwargs): - # Blade models... - model = ProcessedSignalModel( - devices = {"signal" : self._blade_names[0]}, - compute_method=self._compute_blade_signal, - return_type=float, + signal_r = self.right.get_device_object_from_bec( + object_name=blade_r, signal_name=self.name, device_manager=device_manager ) - self.top.set_model(model) - self.top.wait_for_connection() - model = ProcessedSignalModel( - devices = {"signal" : self._blade_names[1]}, - compute_method=self._compute_blade_signal, - return_type=float, - ) - self.right.set_model(model) - self.right.wait_for_connection() - model = ProcessedSignalModel( - devices = {"signal" : self._blade_names[2]}, - compute_method=self._compute_blade_signal, - return_type=float, - ) - self.bot.set_model(model) - self.bot.wait_for_connection() - model = ProcessedSignalModel( - devices = {"signal" : self._blade_names[3]}, - compute_method=self._compute_blade_signal, - return_type=float, - ) - self.left.set_model(model) - self.left.wait_for_connection() + self.right.set_compute_method(self._compute_blade_signal, signal=signal_r) - # Intensity - model = ProcessedSignalModel( - devices = { - "top" : f"{self.name}_{self.top.name}", - "right" : f"{self.name}_{self.right.name}", - "bot" : f"{self.name}_{self.bot.name}", - "left" : f"{self.name}_{self.left.name}", - }, - compute_method=self._compute_intensity, - return_type=float + signal_b = self.bot.get_device_object_from_bec( + object_name=blade_b, signal_name=self.name, device_manager=device_manager ) - self.intensity.set_model(model) - dev_objs = { - "top" : self.top, - "right" : self.right, - "bot" : self.bot, - "left" : self.left, - } - self.intensity.set_device_object(device_objects=dev_objs) - self.intensity.wait_for_connection() - return super().wait_for_connection(*args, **kwargs) + self.bot.set_compute_method(self._compute_blade_signal, signal=signal_b) - def compute(self, signal_1: Signal, signal_2: Signal) -> float: - """Compute the value of the pseudo signal based on the values of signal_1 and signal_2.""" - return float(signal_1.get() + signal_2.get()) - + signal_l = self.left.get_device_object_from_bec( + object_name=blade_l, signal_name=self.name, device_manager=device_manager + ) + self.left.set_compute_method(self._compute_blade_signal, signal=signal_l) - def _compute_blade_signal(self, signal:Signal) -> float: + self.intensity.set_compute_method( + self._compute_intensity, top=self.top, right=self.right, bot=self.bot, left=self.left + ) + self.pos_x.set_compute_method( + self._compute_pos_x, left=self.left, top=self.top, right=self.right, bot=self.bot + ) + self.pos_y.set_compute_method( + self._compute_pos_y, left=self.left, top=self.top, right=self.right, bot=self.bot + ) + self.diagonal.set_compute_method( + self._compute_diagonal, left=self.left, top=self.top, right=self.right, bot=self.bot + ) + + def _compute_blade_signal(self, signal: Signal) -> float: + timestamp = time.time() + self._metadata["timestamp"] = timestamp return signal.get() - - def _compute_intensity(self, top, right, bot, left) -> float: - intensity = top.get() + right.get() + bot.get() + left.get() - return intensity \ No newline at end of file + + def _compute_intensity(self, top: Signal, right: Signal, bot: Signal, left: Signal) -> float: + timestamp = time.time() + self._metadata["timestamp"] = timestamp + intensity = top.get() + right.get() + bot.get() + left.get() + return intensity + + def _compute_pos_x(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float: + timestamp = time.time() + self._metadata["timestamp"] = timestamp + sum_left = left.get() + top.get() + sum_right = right.get() + bot.get() + sum_total = sum_left + sum_right + if sum_total == 0: + return 0.0 + return (sum_left - sum_right) / sum_total + + def _compute_pos_y(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float: + timestamp = time.time() + self._metadata["timestamp"] = timestamp + sum_top = top.get() + right.get() + sum_bot = bot.get() + left.get() + sum_total = sum_top + sum_bot + if sum_total == 0: + return 0.0 + return (sum_top - sum_bot) / sum_total + + def _compute_diagonal(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float: + timestamp = time.time() + self._metadata["timestamp"] = timestamp + sum_diag1 = left.get() + right.get() + sum_diag2 = top.get() + bot.get() + sum_total = sum_diag1 + sum_diag2 + if sum_total == 0: + return 0.0 + return (sum_diag1 - sum_diag2) / sum_total