Two solutions for combined PVs
This commit is contained in:
parent
c1cb33e0fc
commit
ae59265ef7
@ -8,6 +8,7 @@ IMPORTANT: Virtual monochromator axes should be implemented already in EPICS!!!
|
||||
"""
|
||||
|
||||
import numpy as np
|
||||
import time
|
||||
from math import isclose, tan, atan, sqrt, sin, asin
|
||||
from ophyd import (
|
||||
EpicsSignal,
|
||||
@ -18,6 +19,7 @@ from ophyd import (
|
||||
PVPositioner,
|
||||
Device,
|
||||
Component,
|
||||
DynamicDeviceComponent,
|
||||
Kind,
|
||||
)
|
||||
from ophyd.pseudopos import pseudo_position_argument, real_position_argument
|
||||
@ -95,7 +97,8 @@ class PmDetectorRotation(PseudoPositioner):
|
||||
|
||||
|
||||
class GirderMotorX1(PVPositioner):
|
||||
"""Girder X translation pseudo motor"""
|
||||
"""Girder X translation pseudo motor
|
||||
"""
|
||||
|
||||
setpoint = Component(EpicsSignal, ":X_SET", name="sp")
|
||||
readback = Component(EpicsSignalRO, ":X1", name="rbv")
|
||||
@ -103,7 +106,8 @@ class GirderMotorX1(PVPositioner):
|
||||
|
||||
|
||||
class GirderMotorY1(PVPositioner):
|
||||
"""Girder Y translation pseudo motor"""
|
||||
"""Girder Y translation pseudo motor
|
||||
"""
|
||||
|
||||
setpoint = Component(EpicsSignal, ":Y_SET", name="sp")
|
||||
readback = Component(EpicsSignalRO, ":Y1", name="rbv")
|
||||
@ -111,7 +115,8 @@ class GirderMotorY1(PVPositioner):
|
||||
|
||||
|
||||
class GirderMotorYAW(PVPositioner):
|
||||
"""Girder YAW pseudo motor"""
|
||||
"""Girder YAW pseudo motor
|
||||
"""
|
||||
|
||||
setpoint = Component(EpicsSignal, ":YAW_SET", name="sp")
|
||||
readback = Component(EpicsSignalRO, ":YAW1", name="rbv")
|
||||
@ -119,7 +124,8 @@ class GirderMotorYAW(PVPositioner):
|
||||
|
||||
|
||||
class GirderMotorROLL(PVPositioner):
|
||||
"""Girder ROLL pseudo motor"""
|
||||
"""Girder ROLL pseudo motor
|
||||
"""
|
||||
|
||||
setpoint = Component(EpicsSignal, ":ROLL_SET", name="sp")
|
||||
readback = Component(EpicsSignalRO, ":ROLL1", name="rbv")
|
||||
@ -127,7 +133,8 @@ class GirderMotorROLL(PVPositioner):
|
||||
|
||||
|
||||
class GirderMotorPITCH(PVPositioner):
|
||||
"""Girder YAW pseudo motor"""
|
||||
"""Girder YAW pseudo motor
|
||||
"""
|
||||
|
||||
setpoint = Component(EpicsSignal, ":PITCH_SET", name="sp")
|
||||
readback = Component(EpicsSignalRO, ":PITCH1", name="rbv")
|
||||
@ -135,7 +142,7 @@ class GirderMotorPITCH(PVPositioner):
|
||||
|
||||
|
||||
class VirtualEpicsSignalRO(EpicsSignalRO):
|
||||
"""This is a test class to create derives signals from one or
|
||||
"""This is a test class to create derives signals from one or
|
||||
multiple original signals...
|
||||
"""
|
||||
|
||||
@ -154,7 +161,8 @@ class VirtualEpicsSignalRO(EpicsSignalRO):
|
||||
|
||||
|
||||
class MonoTheta1(VirtualEpicsSignalRO):
|
||||
"""Converts the pusher motor position to theta angle"""
|
||||
"""Converts the pusher motor position to theta angle
|
||||
"""
|
||||
|
||||
_mono_a0_enc_scale1 = -1.0
|
||||
_mono_a1_lever_length1 = 206.706
|
||||
@ -170,7 +178,8 @@ class MonoTheta1(VirtualEpicsSignalRO):
|
||||
|
||||
|
||||
class MonoTheta2(VirtualEpicsSignalRO):
|
||||
"""Converts the pusher motor position to theta angle"""
|
||||
"""Converts the pusher motor position to theta angle
|
||||
"""
|
||||
|
||||
_mono_a3_enc_offs2 = -19.7072
|
||||
_mono_a2_pusher_offs2 = 5.93905
|
||||
@ -186,7 +195,8 @@ class MonoTheta2(VirtualEpicsSignalRO):
|
||||
|
||||
|
||||
class EnergyKev(VirtualEpicsSignalRO):
|
||||
"""Converts the pusher motor position to energy in keV"""
|
||||
"""Converts the pusher motor position to energy in keV
|
||||
"""
|
||||
|
||||
_mono_a3_enc_offs2 = -19.7072
|
||||
_mono_a2_pusher_offs2 = 5.93905
|
||||
@ -202,3 +212,80 @@ class EnergyKev(VirtualEpicsSignalRO):
|
||||
)
|
||||
E_keV = -self._mono_hce / self._mono_2d2 / sin(theta2_deg / 180.0 * 3.14152)
|
||||
return E_keV
|
||||
|
||||
class CombinedEpicsSignalRO(EpicsSignalRO):
|
||||
"""This is a test class to create derives signals from one or
|
||||
multiple original signals...
|
||||
"""
|
||||
def __init__(self, *args, **kwargs):
|
||||
if "pvs" in kwargs:
|
||||
self._private_signals = []
|
||||
for key in kwargs["pvs"]:
|
||||
self.__dict__[key] = EpicsSignalRO(kwargs["pvs"][key], auto_monitor=True)
|
||||
self._private_signals.append(key)
|
||||
del kwargs["pvs"]
|
||||
super().__init__(*args, **kwargs)
|
||||
def wait_for_connection(self, *args, **kwargs):
|
||||
for key in self._private_signals:
|
||||
print(key)
|
||||
self.__dict__[key].wait_for_connection()
|
||||
def calc(self, vals: list):
|
||||
return vals
|
||||
def get(self, *args, **kwargs):
|
||||
raw = [self.__dict__[key].value for key in self._private_signals]
|
||||
print(raw)
|
||||
return self.calc(raw)
|
||||
|
||||
class SumPvs(CombinedEpicsSignalRO):
|
||||
"""Adds up four current signals"""
|
||||
def calc(self, vals):
|
||||
total = 0
|
||||
for vv in vals:
|
||||
total += vv
|
||||
return total
|
||||
|
||||
|
||||
class Bpm4i(Device):
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
ch1 = Component(EpicsSignalRO, "S2", auto_monitor=True, kind=Kind.omitted, name="ch1")
|
||||
ch2 = Component(EpicsSignalRO, "S3", auto_monitor=True, kind=Kind.omitted, name="ch2")
|
||||
ch3 = Component(EpicsSignalRO, "S4", auto_monitor=True, kind=Kind.omitted, name="ch3")
|
||||
ch4 = Component(EpicsSignalRO, "S5", auto_monitor=True, kind=Kind.omitted, name="ch4")
|
||||
|
||||
def __init__(self, prefix="", *, name, **kwargs):
|
||||
super().__init__(prefix, name=name, **kwargs)
|
||||
self.ch1.subscribe(self._emit_value)
|
||||
|
||||
def _emit_value(self, **kwargs):
|
||||
timestamp = kwargs.pop("timestamp", time.time())
|
||||
self.wait_for_connection()
|
||||
self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self)
|
||||
|
||||
def get(self, *args, **kwargs):
|
||||
return self.ch1.get() + self.ch2.get() + self.ch3.get() + self.ch4.get()
|
||||
|
||||
def read(self, *args, **kwargs):
|
||||
return {self.name: {'value': self.get(), 'timestamp': time.time()}}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
#dut = SumPvs("X12SA-OP1-SCALER.S2", pvs={"q1": "X12SA-OP1-SCALER.S2", "q2": "X12SA-OP1-SCALER.S3", "q3": "X12SA-OP1-SCALER.S4", "q4": "X12SA-OP1-SCALER.S5"}, name="sum_all")
|
||||
#dut.wait_for_connection()
|
||||
#print(dut.read())
|
||||
dut = Bpm4i("X12SA-OP1-SCALER.", name="bpm4i")
|
||||
dut.wait_for_connection()
|
||||
print(dut.read())
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user