This commit is contained in:
@@ -1,100 +1,113 @@
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal, ProcessedSignalModel
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
from ophyd import Kind, Signal
|
||||
import time
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd_devices import PSIDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
|
||||
class BPM(PSIDeviceBase):
|
||||
|
||||
# Blade signals, a,b,c,d
|
||||
top = Cpt(BECProcessedSignal, name="top", model=None,kind=Kind.config, doc="... top blade")
|
||||
right = Cpt(BECProcessedSignal, name="right", model=None,kind=Kind.config, doc="... right blade")
|
||||
bot = Cpt(BECProcessedSignal, name="bot", model=None,kind=Kind.config, doc="... bot blade")
|
||||
left = Cpt(BECProcessedSignal, name="left", model=None,kind=Kind.config, doc="... left blade")
|
||||
top = Cpt(BECProcessedSignal, name="top", model=None, kind=Kind.config, doc="... top blade")
|
||||
right = Cpt(
|
||||
BECProcessedSignal, name="right", model=None, kind=Kind.config, doc="... right blade"
|
||||
)
|
||||
bot = Cpt(BECProcessedSignal, name="bot", model=None, kind=Kind.config, doc="... bot blade")
|
||||
left = Cpt(BECProcessedSignal, name="left", model=None, kind=Kind.config, doc="... left blade")
|
||||
|
||||
# Virtual signals
|
||||
# pos_x = Cpt(BECProcessedSignal, name="pos_x", model=None,kind=Kind.config, doc="... pos_x")
|
||||
# pos_y = Cpt(BECProcessedSignal, name="pos_y", model=None,kind=Kind.config, doc="... pos_y")
|
||||
# diagonal = Cpt(BECProcessedSignal, name="diagonal", model=None,kind=Kind.config, doc="... diagonal")
|
||||
intensity = Cpt(BECProcessedSignal, name="intensity", model=None,kind=Kind.config, doc="... intensity")
|
||||
# Virtual signals
|
||||
pos_x = Cpt(BECProcessedSignal, name="pos_x", model=None, kind=Kind.config, doc="... pos_x")
|
||||
pos_y = Cpt(BECProcessedSignal, name="pos_y", model=None, kind=Kind.config, doc="... pos_y")
|
||||
diagonal = Cpt(
|
||||
BECProcessedSignal, name="diagonal", model=None, kind=Kind.config, doc="... diagonal"
|
||||
)
|
||||
intensity = Cpt(
|
||||
BECProcessedSignal, name="intensity", model=None, kind=Kind.config, doc="... intensity"
|
||||
)
|
||||
|
||||
|
||||
def __init__(self,
|
||||
name,
|
||||
blade_t: str,
|
||||
blade_r: str,
|
||||
blade_b: str,
|
||||
blade_l: str,
|
||||
device_manager=None,
|
||||
**kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
blade_t: str,
|
||||
blade_r: str,
|
||||
blade_b: str,
|
||||
blade_l: str,
|
||||
device_manager=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, **kwargs)
|
||||
self.top.set_model(devices= {"signal", blade_t}, compute_method=self._compute_blade_signal)
|
||||
self.top.set_model(devices= {"signal", blade_r}, compute_method=self._compute_blade_signal)
|
||||
self.top.set_model(devices= {"signal", blade_b}, compute_method=self._compute_blade_signal)
|
||||
self.top.set_model(devices= {"signal", blade_l}, compute_method=self._compute_blade_signal)
|
||||
self.intensity
|
||||
|
||||
signal_t = self.top.get_device_object_from_bec(
|
||||
object_name=blade_t, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self.top.set_compute_method(self._compute_blade_signal, signal=signal_t)
|
||||
|
||||
def wait_for_connection(self, *args, **kwargs):
|
||||
# Blade models...
|
||||
model = ProcessedSignalModel(
|
||||
devices = {"signal" : self._blade_names[0]},
|
||||
compute_method=self._compute_blade_signal,
|
||||
return_type=float,
|
||||
signal_r = self.right.get_device_object_from_bec(
|
||||
object_name=blade_r, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self.top.set_model(model)
|
||||
self.top.wait_for_connection()
|
||||
model = ProcessedSignalModel(
|
||||
devices = {"signal" : self._blade_names[1]},
|
||||
compute_method=self._compute_blade_signal,
|
||||
return_type=float,
|
||||
)
|
||||
self.right.set_model(model)
|
||||
self.right.wait_for_connection()
|
||||
model = ProcessedSignalModel(
|
||||
devices = {"signal" : self._blade_names[2]},
|
||||
compute_method=self._compute_blade_signal,
|
||||
return_type=float,
|
||||
)
|
||||
self.bot.set_model(model)
|
||||
self.bot.wait_for_connection()
|
||||
model = ProcessedSignalModel(
|
||||
devices = {"signal" : self._blade_names[3]},
|
||||
compute_method=self._compute_blade_signal,
|
||||
return_type=float,
|
||||
)
|
||||
self.left.set_model(model)
|
||||
self.left.wait_for_connection()
|
||||
self.right.set_compute_method(self._compute_blade_signal, signal=signal_r)
|
||||
|
||||
# Intensity
|
||||
model = ProcessedSignalModel(
|
||||
devices = {
|
||||
"top" : f"{self.name}_{self.top.name}",
|
||||
"right" : f"{self.name}_{self.right.name}",
|
||||
"bot" : f"{self.name}_{self.bot.name}",
|
||||
"left" : f"{self.name}_{self.left.name}",
|
||||
},
|
||||
compute_method=self._compute_intensity,
|
||||
return_type=float
|
||||
signal_b = self.bot.get_device_object_from_bec(
|
||||
object_name=blade_b, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self.intensity.set_model(model)
|
||||
dev_objs = {
|
||||
"top" : self.top,
|
||||
"right" : self.right,
|
||||
"bot" : self.bot,
|
||||
"left" : self.left,
|
||||
}
|
||||
self.intensity.set_device_object(device_objects=dev_objs)
|
||||
self.intensity.wait_for_connection()
|
||||
return super().wait_for_connection(*args, **kwargs)
|
||||
self.bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
|
||||
|
||||
def compute(self, signal_1: Signal, signal_2: Signal) -> float:
|
||||
"""Compute the value of the pseudo signal based on the values of signal_1 and signal_2."""
|
||||
return float(signal_1.get() + signal_2.get())
|
||||
|
||||
signal_l = self.left.get_device_object_from_bec(
|
||||
object_name=blade_l, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self.left.set_compute_method(self._compute_blade_signal, signal=signal_l)
|
||||
|
||||
def _compute_blade_signal(self, signal:Signal) -> float:
|
||||
self.intensity.set_compute_method(
|
||||
self._compute_intensity, top=self.top, right=self.right, bot=self.bot, left=self.left
|
||||
)
|
||||
self.pos_x.set_compute_method(
|
||||
self._compute_pos_x, left=self.left, top=self.top, right=self.right, bot=self.bot
|
||||
)
|
||||
self.pos_y.set_compute_method(
|
||||
self._compute_pos_y, left=self.left, top=self.top, right=self.right, bot=self.bot
|
||||
)
|
||||
self.diagonal.set_compute_method(
|
||||
self._compute_diagonal, left=self.left, top=self.top, right=self.right, bot=self.bot
|
||||
)
|
||||
|
||||
def _compute_blade_signal(self, signal: Signal) -> float:
|
||||
timestamp = time.time()
|
||||
self._metadata["timestamp"] = timestamp
|
||||
return signal.get()
|
||||
|
||||
def _compute_intensity(self, top, right, bot, left) -> float:
|
||||
intensity = top.get() + right.get() + bot.get() + left.get()
|
||||
return intensity
|
||||
|
||||
def _compute_intensity(self, top: Signal, right: Signal, bot: Signal, left: Signal) -> float:
|
||||
timestamp = time.time()
|
||||
self._metadata["timestamp"] = timestamp
|
||||
intensity = top.get() + right.get() + bot.get() + left.get()
|
||||
return intensity
|
||||
|
||||
def _compute_pos_x(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
|
||||
timestamp = time.time()
|
||||
self._metadata["timestamp"] = timestamp
|
||||
sum_left = left.get() + top.get()
|
||||
sum_right = right.get() + bot.get()
|
||||
sum_total = sum_left + sum_right
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_left - sum_right) / sum_total
|
||||
|
||||
def _compute_pos_y(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
|
||||
timestamp = time.time()
|
||||
self._metadata["timestamp"] = timestamp
|
||||
sum_top = top.get() + right.get()
|
||||
sum_bot = bot.get() + left.get()
|
||||
sum_total = sum_top + sum_bot
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_top - sum_bot) / sum_total
|
||||
|
||||
def _compute_diagonal(self, left: Signal, top: Signal, right: Signal, bot: Signal) -> float:
|
||||
timestamp = time.time()
|
||||
self._metadata["timestamp"] = timestamp
|
||||
sum_diag1 = left.get() + right.get()
|
||||
sum_diag2 = top.get() + bot.get()
|
||||
sum_total = sum_diag1 + sum_diag2
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_diag1 - sum_diag2) / sum_total
|
||||
|
||||
Reference in New Issue
Block a user