From ae59265ef749091b2424699e5a6501d1bcd16159 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Tue, 29 Nov 2022 14:40:08 +0100 Subject: [PATCH] Two solutions for combined PVs --- ophyd_devices/epics/devices/specMotors.py | 105 ++++++++++++++++++++-- 1 file changed, 96 insertions(+), 9 deletions(-) diff --git a/ophyd_devices/epics/devices/specMotors.py b/ophyd_devices/epics/devices/specMotors.py index b2d54ef..4b75e1f 100644 --- a/ophyd_devices/epics/devices/specMotors.py +++ b/ophyd_devices/epics/devices/specMotors.py @@ -8,6 +8,7 @@ IMPORTANT: Virtual monochromator axes should be implemented already in EPICS!!! """ import numpy as np +import time from math import isclose, tan, atan, sqrt, sin, asin from ophyd import ( EpicsSignal, @@ -18,6 +19,7 @@ from ophyd import ( PVPositioner, Device, Component, + DynamicDeviceComponent, Kind, ) from ophyd.pseudopos import pseudo_position_argument, real_position_argument @@ -95,7 +97,8 @@ class PmDetectorRotation(PseudoPositioner): class GirderMotorX1(PVPositioner): - """Girder X translation pseudo motor""" + """Girder X translation pseudo motor + """ setpoint = Component(EpicsSignal, ":X_SET", name="sp") readback = Component(EpicsSignalRO, ":X1", name="rbv") @@ -103,7 +106,8 @@ class GirderMotorX1(PVPositioner): class GirderMotorY1(PVPositioner): - """Girder Y translation pseudo motor""" + """Girder Y translation pseudo motor + """ setpoint = Component(EpicsSignal, ":Y_SET", name="sp") readback = Component(EpicsSignalRO, ":Y1", name="rbv") @@ -111,7 +115,8 @@ class GirderMotorY1(PVPositioner): class GirderMotorYAW(PVPositioner): - """Girder YAW pseudo motor""" + """Girder YAW pseudo motor + """ setpoint = Component(EpicsSignal, ":YAW_SET", name="sp") readback = Component(EpicsSignalRO, ":YAW1", name="rbv") @@ -119,7 +124,8 @@ class GirderMotorYAW(PVPositioner): class GirderMotorROLL(PVPositioner): - """Girder ROLL pseudo motor""" + """Girder ROLL pseudo motor + """ setpoint = Component(EpicsSignal, ":ROLL_SET", name="sp") readback = Component(EpicsSignalRO, ":ROLL1", name="rbv") @@ -127,7 +133,8 @@ class GirderMotorROLL(PVPositioner): class GirderMotorPITCH(PVPositioner): - """Girder YAW pseudo motor""" + """Girder YAW pseudo motor + """ setpoint = Component(EpicsSignal, ":PITCH_SET", name="sp") readback = Component(EpicsSignalRO, ":PITCH1", name="rbv") @@ -135,7 +142,7 @@ class GirderMotorPITCH(PVPositioner): class VirtualEpicsSignalRO(EpicsSignalRO): - """This is a test class to create derives signals from one or + """This is a test class to create derives signals from one or multiple original signals... """ @@ -154,7 +161,8 @@ class VirtualEpicsSignalRO(EpicsSignalRO): class MonoTheta1(VirtualEpicsSignalRO): - """Converts the pusher motor position to theta angle""" + """Converts the pusher motor position to theta angle + """ _mono_a0_enc_scale1 = -1.0 _mono_a1_lever_length1 = 206.706 @@ -170,7 +178,8 @@ class MonoTheta1(VirtualEpicsSignalRO): class MonoTheta2(VirtualEpicsSignalRO): - """Converts the pusher motor position to theta angle""" + """Converts the pusher motor position to theta angle + """ _mono_a3_enc_offs2 = -19.7072 _mono_a2_pusher_offs2 = 5.93905 @@ -186,7 +195,8 @@ class MonoTheta2(VirtualEpicsSignalRO): class EnergyKev(VirtualEpicsSignalRO): - """Converts the pusher motor position to energy in keV""" + """Converts the pusher motor position to energy in keV + """ _mono_a3_enc_offs2 = -19.7072 _mono_a2_pusher_offs2 = 5.93905 @@ -202,3 +212,80 @@ class EnergyKev(VirtualEpicsSignalRO): ) E_keV = -self._mono_hce / self._mono_2d2 / sin(theta2_deg / 180.0 * 3.14152) return E_keV + +class CombinedEpicsSignalRO(EpicsSignalRO): + """This is a test class to create derives signals from one or + multiple original signals... + """ + def __init__(self, *args, **kwargs): + if "pvs" in kwargs: + self._private_signals = [] + for key in kwargs["pvs"]: + self.__dict__[key] = EpicsSignalRO(kwargs["pvs"][key], auto_monitor=True) + self._private_signals.append(key) + del kwargs["pvs"] + super().__init__(*args, **kwargs) + def wait_for_connection(self, *args, **kwargs): + for key in self._private_signals: + print(key) + self.__dict__[key].wait_for_connection() + def calc(self, vals: list): + return vals + def get(self, *args, **kwargs): + raw = [self.__dict__[key].value for key in self._private_signals] + print(raw) + return self.calc(raw) + +class SumPvs(CombinedEpicsSignalRO): + """Adds up four current signals""" + def calc(self, vals): + total = 0 + for vv in vals: + total += vv + return total + + +class Bpm4i(Device): + SUB_VALUE = "value" + _default_sub = SUB_VALUE + + ch1 = Component(EpicsSignalRO, "S2", auto_monitor=True, kind=Kind.omitted, name="ch1") + ch2 = Component(EpicsSignalRO, "S3", auto_monitor=True, kind=Kind.omitted, name="ch2") + ch3 = Component(EpicsSignalRO, "S4", auto_monitor=True, kind=Kind.omitted, name="ch3") + ch4 = Component(EpicsSignalRO, "S5", auto_monitor=True, kind=Kind.omitted, name="ch4") + + def __init__(self, prefix="", *, name, **kwargs): + super().__init__(prefix, name=name, **kwargs) + self.ch1.subscribe(self._emit_value) + + def _emit_value(self, **kwargs): + timestamp = kwargs.pop("timestamp", time.time()) + self.wait_for_connection() + self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) + + def get(self, *args, **kwargs): + return self.ch1.get() + self.ch2.get() + self.ch3.get() + self.ch4.get() + + def read(self, *args, **kwargs): + return {self.name: {'value': self.get(), 'timestamp': time.time()}} + + + + + + + +if __name__ == "__main__": + #dut = SumPvs("X12SA-OP1-SCALER.S2", pvs={"q1": "X12SA-OP1-SCALER.S2", "q2": "X12SA-OP1-SCALER.S3", "q3": "X12SA-OP1-SCALER.S4", "q4": "X12SA-OP1-SCALER.S5"}, name="sum_all") + #dut.wait_for_connection() + #print(dut.read()) + dut = Bpm4i("X12SA-OP1-SCALER.", name="bpm4i") + dut.wait_for_connection() + print(dut.read()) + + + + + + +