Merge branch 'feat-combined-channels' into 'master'

Two solutions for combined EPICS PVs

See merge request bec/ophyd_devices!11
This commit is contained in:
wakonig_k 2022-11-29 16:13:22 +00:00
commit e11ef9533c
3 changed files with 53 additions and 13 deletions

View File

@ -7,6 +7,7 @@ from .devices.DelayGeneratorDG645 import DelayGeneratorDG645
from .devices.InsertionDevice import InsertionDevice from .devices.InsertionDevice import InsertionDevice
from .devices.slits import SlitH, SlitV from .devices.slits import SlitH, SlitV
from .devices.specMotors import ( from .devices.specMotors import (
Bpm4i,
EnergyKev, EnergyKev,
GirderMotorPITCH, GirderMotorPITCH,
GirderMotorROLL, GirderMotorROLL,

View File

@ -4,19 +4,19 @@ from .XbpmBase import XbpmBase, XbpmCsaxsOp
from .SpmBase import SpmBase from .SpmBase import SpmBase
from .InsertionDevice import InsertionDevice from .InsertionDevice import InsertionDevice
from .specMotors import ( from .specMotors import (
PmMonoBender, Bpm4i,
PmDetectorRotation, EnergyKev,
GirderMotorPITCH,
GirderMotorROLL,
GirderMotorX1, GirderMotorX1,
GirderMotorY1, GirderMotorY1,
GirderMotorROLL,
GirderMotorYAW, GirderMotorYAW,
GirderMotorPITCH,
MonoTheta1, MonoTheta1,
MonoTheta2, MonoTheta2,
EnergyKev, PmDetectorRotation,
PmMonoBender,
) )
# Standard ophyd classes # Standard ophyd classes
from ophyd import EpicsSignal, EpicsSignalRO, EpicsMotor from ophyd import EpicsSignal, EpicsSignalRO, EpicsMotor
from ophyd.sim import SynAxis, SynSignal, SynPeriodicSignal from ophyd.sim import SynAxis, SynSignal, SynPeriodicSignal

View File

@ -8,6 +8,7 @@ IMPORTANT: Virtual monochromator axes should be implemented already in EPICS!!!
""" """
import numpy as np import numpy as np
import time
from math import isclose, tan, atan, sqrt, sin, asin from math import isclose, tan, atan, sqrt, sin, asin
from ophyd import ( from ophyd import (
EpicsSignal, EpicsSignal,
@ -17,11 +18,11 @@ from ophyd import (
PseudoSingle, PseudoSingle,
PVPositioner, PVPositioner,
Device, Device,
Signal,
Component, Component,
Kind, Kind,
) )
from ophyd.pseudopos import pseudo_position_argument, real_position_argument from ophyd.pseudopos import pseudo_position_argument, real_position_argument
from ophyd.utils.epics_pvs import data_type
class PmMonoBender(PseudoPositioner): class PmMonoBender(PseudoPositioner):
@ -146,12 +147,6 @@ class VirtualEpicsSignalRO(EpicsSignalRO):
raw = super().get(*args, **kwargs) raw = super().get(*args, **kwargs)
return self.calc(raw) return self.calc(raw)
# def describe(self):
# val = self.get()
# d = super().describe()
# d[self.name]["dtype"] = data_type(val)
# return d
class MonoTheta1(VirtualEpicsSignalRO): class MonoTheta1(VirtualEpicsSignalRO):
"""Converts the pusher motor position to theta angle""" """Converts the pusher motor position to theta angle"""
@ -202,3 +197,47 @@ class EnergyKev(VirtualEpicsSignalRO):
) )
E_keV = -self._mono_hce / self._mono_2d2 / sin(theta2_deg / 180.0 * 3.14152) E_keV = -self._mono_hce / self._mono_2d2 / sin(theta2_deg / 180.0 * 3.14152)
return E_keV return E_keV
class CurrentSum(Signal):
"""Adds up four current signals from the parent"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.parent.ch1.subscribe(self._emit_value)
def _emit_value(self, **kwargs):
timestamp = kwargs.pop("timestamp", time.time())
self.wait_for_connection()
self._run_subs(sub_type="value", timestamp=timestamp, obj=self)
def get(self, *args, **kwargs):
total = (
self.parent.ch1.get()
+ self.parent.ch2.get()
+ self.parent.ch3.get()
+ self.parent.ch4.get()
)
return total
class Bpm4i(Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
ch1 = Component(EpicsSignalRO, "S2", auto_monitor=True, kind=Kind.omitted, name="ch1")
ch2 = Component(EpicsSignalRO, "S3", auto_monitor=True, kind=Kind.omitted, name="ch2")
ch3 = Component(EpicsSignalRO, "S4", auto_monitor=True, kind=Kind.omitted, name="ch3")
ch4 = Component(EpicsSignalRO, "S5", auto_monitor=True, kind=Kind.omitted, name="ch4")
sum = Component(
CurrentSum,
kind=Kind.hinted,
name="sum",
)
if __name__ == "__main__":
dut = Bpm4i("X12SA-OP1-SCALER.", name="bpm4")
dut.wait_for_connection()
print(dut.read())
print(dut.describe())