diff --git a/ophyd_devices/devices/XbpmBase.py b/ophyd_devices/devices/XbpmBase.py index 1237387..651fa05 100644 --- a/ophyd_devices/devices/XbpmBase.py +++ b/ophyd_devices/devices/XbpmBase.py @@ -2,24 +2,6 @@ import numpy as np from ophyd import Component, Device, EpicsSignal, EpicsSignalRO -class XbpmCsaxsOp(Device): - """Python wrapper for custom XBPMs in the cSAXS optics hutch - - This is completely custom XBPM with templates directly in the - VME repo. Thus it needs a custom ophyd template as well... - - WARN: The x and y are not updated by the IOC - """ - - sum = Component(EpicsSignalRO, "SUM", auto_monitor=True) - x = Component(EpicsSignalRO, "POSH", auto_monitor=True) - y = Component(EpicsSignalRO, "POSV", auto_monitor=True) - s1 = Component(EpicsSignalRO, "CHAN1", auto_monitor=True) - s2 = Component(EpicsSignalRO, "CHAN2", auto_monitor=True) - s3 = Component(EpicsSignalRO, "CHAN3", auto_monitor=True) - s4 = Component(EpicsSignalRO, "CHAN4", auto_monitor=True) - - class XbpmBase(Device): """Python wrapper for X-ray Beam Position Monitors diff --git a/ophyd_devices/devices/__init__.py b/ophyd_devices/devices/__init__.py index ec060a7..c40cc0c 100644 --- a/ophyd_devices/devices/__init__.py +++ b/ophyd_devices/devices/__init__.py @@ -3,19 +3,5 @@ from ophyd.quadem import QuadEM from ophyd.sim import SynAxis, SynPeriodicSignal, SynSignal from .epics_motor_ex import EpicsMotorEx -from .slits import SlitH, SlitV from .sls_devices import SLSInfo, SLSOperatorMessages -from .specMotors import ( - Bpm4i, - EnergyKev, - GirderMotorPITCH, - GirderMotorROLL, - GirderMotorX1, - GirderMotorY1, - GirderMotorYAW, - MonoTheta1, - MonoTheta2, - PmDetectorRotation, - PmMonoBender, -) from .SpmBase import SpmBase diff --git a/ophyd_devices/devices/mono_dccm.py b/ophyd_devices/devices/mono_dccm.py deleted file mode 100644 index 60160d3..0000000 --- a/ophyd_devices/devices/mono_dccm.py +++ /dev/null @@ -1,144 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Created on Wed Oct 13 18:06:15 2021 - -@author: mohacsi_i - -IMPORTANT: Virtual monochromator axes should be implemented already in EPICS!!! -""" - -from math import isclose - -import numpy as np -from ophyd import ( - Component, - Device, - EpicsMotor, - EpicsSignal, - EpicsSignalRO, - Kind, - PseudoPositioner, - PseudoSingle, -) -from ophyd.pseudopos import pseudo_position_argument, real_position_argument -from ophyd.sim import Syn2DGauss, SynAxis - -LN_CORR = 2e-4 - - -def a2e(angle, hkl=[1, 1, 1], lnc=False, bent=False, deg=False): - """Convert between angle and energy for Si monchromators - ATTENTION: 'angle' must be in radians, not degrees! - """ - lncorr = LN_CORR if lnc else 0.0 - angle = angle * np.pi / 180 if deg else angle - - # Lattice constant along direction - d0 = 5.43102 * (1.0 - lncorr) / np.linalg.norm(hkl) - energy = 12.39842 / (2.0 * d0 * np.sin(angle)) - return energy - - -def e2w(energy): - """Convert between energy and wavelength""" - return 0.1 * 12398.42 / energy - - -def w2e(wwl): - """Convert between wavelength and energy""" - return 12398.42 * 0.1 / wwl - - -def e2a(energy, hkl=[1, 1, 1], lnc=False, bent=False): - """Convert between energy and angle for Si monchromators - ATTENTION: 'angle' must be in radians, not degrees! - """ - lncorr = LN_CORR if lnc else 0.0 - - # Lattice constant along direction - d0 = 2 * 5.43102 * (1.0 - lncorr) / np.linalg.norm(hkl) - angle = np.arcsin(12.39842 / d0 / energy) - - # Rfine for bent mirror - if bent: - rho = 2 * 19.65 * 8.35 / 28 * np.sin(angle) - dt = 0.2e-3 / rho * 0.279 - d0 = 2 * 5.43102 * (1.0 + dt) / np.linalg.norm(hkl) - angle = np.arcsin(12.39842 / d0 / energy) - - return angle - - -class MonoMotor(PseudoPositioner): - """Monochromator axis - - Small wrapper to combine a real angular axis with the corresponding energy. - ATTENTION: 'angle' is in degrees, at least for PXIII - """ - - # Real axis (in degrees) - angle = Component(EpicsMotor, "", name="angle") - # Virtual axis - energy = Component(PseudoSingle, name="energy") - - _real = ["angle"] - - @pseudo_position_argument - def forward(self, pseudo_pos): - return self.RealPosition(angle=180.0 * e2a(pseudo_pos.energy) / 3.141592) - - @real_position_argument - def inverse(self, real_pos): - return self.PseudoPosition(energy=a2e(3.141592 * real_pos.angle / 180.0)) - - -class MonoDccm(PseudoPositioner): - """Combined DCCM monochromator - - The first crystal selects the energy, the second one is only following. - DCCMs are quite simple in terms that they can't crash and we don't - have a beam offset. - ATTENTION: 'angle' is in degrees, at least for PXIII - """ - - # Real axis (in degrees) - th1 = Component(EpicsMotor, "ROX1", name="theta1") - th2 = Component(EpicsMotor, "ROX2", name="theta2") - - # Virtual axes - en1 = Component(PseudoSingle, name="en1") - en2 = Component(PseudoSingle, name="en2") - energy = Component(PseudoSingle, name="energy", kind=Kind.hinted) - - # Other parameters - # feedback = Component(EpicsSignal, "MONOBEAM", name="feedback") - # enc1 = Component(EpicsSignalRO, "1:EXC1", name="enc1") - # enc2 = Component(EpicsSignalRO, "1:EXC2", name="enc2") - - @pseudo_position_argument - def forward(self, pseudo_pos): - """WARNING: We have an overdefined system! Not sure if common crystal movement is reliable without retuning""" - if ( - abs(pseudo_pos.energy - self.energy.position) > 0.0001 - and abs(pseudo_pos.en1 - self.en1.position) < 0.0001 - and abs(pseudo_pos.en2 - self.en2.position) < 0.0001 - ): - # Probably the common energy was changed - return self.RealPosition( - th1=-180.0 * e2a(pseudo_pos.energy) / 3.141592, - th2=180.0 * e2a(pseudo_pos.energy) / 3.141592, - ) - else: - # Probably the individual axes was changes - return self.RealPosition( - th1=-180.0 * e2a(pseudo_pos.en1) / 3.141592, - th2=180.0 * e2a(pseudo_pos.en2) / 3.141592, - ) - - @real_position_argument - def inverse(self, real_pos): - return self.PseudoPosition( - en1=-a2e(3.141592 * real_pos.th1 / 180.0), - en2=a2e(3.141592 * real_pos.th2 / 180.0), - energy=-a2e(3.141592 * real_pos.th1 / 180.0), - ) diff --git a/ophyd_devices/devices/slits.py b/ophyd_devices/devices/slits.py deleted file mode 100644 index c1ebfe6..0000000 --- a/ophyd_devices/devices/slits.py +++ /dev/null @@ -1,66 +0,0 @@ -from ophyd import Component, Device, EpicsMotor, PseudoPositioner, PseudoSingle -from ophyd.pseudopos import pseudo_position_argument, real_position_argument - - -class SlitH(PseudoPositioner): - """Python wrapper for virtual slits - - These devices should be implemented as an EPICS SoftMotor IOC, - but thats not the case for all slits. So here is a pure ophyd - implementation. Uses standard naming convention! - - NOTE: The real and virtual axes are wrapped together. - """ - - # Motor interface - x1 = Component(EpicsMotor, "TRX1") - x2 = Component(EpicsMotor, "TRX2") - - cenx = Component(PseudoSingle) - gapx = Component(PseudoSingle) - - @pseudo_position_argument - def forward(self, pseudo_pos): - """Run a forward (pseudo -> real) calculation""" - return self.RealPosition( - x1=pseudo_pos.cenx - pseudo_pos.gapx / 2, x2=pseudo_pos.cenx + pseudo_pos.gapx / 2 - ) - - @real_position_argument - def inverse(self, real_pos): - """Run an inverse (real -> pseudo) calculation""" - return self.PseudoPosition( - cenx=(real_pos.x1 + real_pos.x2) / 2, gapx=real_pos.x2 - real_pos.x1 - ) - - -class SlitV(PseudoPositioner): - """Python wrapper for virtual slits - - These devices should be implemented as an EPICS SoftMotor IOC, - but thats not the case for all slits. So here is a pure ophyd - implementation. Uses standard naming convention! - - NOTE: The real and virtual axes are wrapped together. - """ - - # Motor interface - y1 = Component(EpicsMotor, "TRY1") - y2 = Component(EpicsMotor, "TRY2") - - ceny = Component(PseudoSingle) - gapy = Component(PseudoSingle) - - @pseudo_position_argument - def forward(self, pseudo_pos): - """Run a forward (pseudo -> real) calculation""" - return self.RealPosition( - y1=pseudo_pos.ceny - pseudo_pos.gapy / 2, y2=pseudo_pos.ceny + pseudo_pos.gapy / 2 - ) - - @real_position_argument - def inverse(self, real_pos): - """Run an inverse (real -> pseudo) calculation""" - return self.PseudoPosition( - ceny=(real_pos.y1 + real_pos.y2) / 2, gapy=real_pos.y2 - real_pos.y1 - ) diff --git a/ophyd_devices/devices/specMotors.py b/ophyd_devices/devices/specMotors.py deleted file mode 100644 index 65a8e70..0000000 --- a/ophyd_devices/devices/specMotors.py +++ /dev/null @@ -1,304 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Created on Wed Oct 13 18:06:15 2021 - -@author: mohacsi_i - -IMPORTANT: Virtual monochromator axes should be implemented already in EPICS!!! -""" - -import time -from math import asin, atan, isclose, sin, sqrt, tan - -import numpy as np -from ophyd import ( - Component, - Device, - EpicsMotor, - EpicsSignal, - EpicsSignalRO, - Kind, - PseudoPositioner, - PseudoSingle, - PVPositioner, - Signal, -) -from ophyd.pseudopos import pseudo_position_argument, real_position_argument - - -class PmMonoBender(PseudoPositioner): - """Monochromator bender - - Small wrapper to combine the four monochromator bender motors. - """ - - # Real axes - ai = Component(EpicsMotor, "TRYA", name="ai") - bo = Component(EpicsMotor, "TRYB", name="bo") - co = Component(EpicsMotor, "TRYC", name="co") - di = Component(EpicsMotor, "TRYD", name="di") - - # Virtual axis - bend = Component(PseudoSingle, name="bend") - - _real = ["ai", "bo", "co", "di"] - - @pseudo_position_argument - def forward(self, pseudo_pos): - delta = pseudo_pos.bend - 0.25 * ( - self.ai.position + self.bo.position + self.co.position + self.di.position - ) - return self.RealPosition( - ai=self.ai.position + delta, - bo=self.bo.position + delta, - co=self.co.position + delta, - di=self.di.position + delta, - ) - - @real_position_argument - def inverse(self, real_pos): - return self.PseudoPosition( - bend=0.25 * (real_pos.ai + real_pos.bo + real_pos.co + real_pos.di) - ) - - -def r2d(radians): - return radians * 180 / 3.141592 - - -def d2r(degrees): - return degrees * 3.141592 / 180.0 - - -class PmDetectorRotation(PseudoPositioner): - """Detector rotation pseudo motor - - Small wrapper to convert detector pusher position to rotation angle. - """ - - _tables_dt_push_dist_mm = 890 - # Real axes - dtpush = Component(EpicsMotor, "", name="dtpush") - - # Virtual axis - dtth = Component(PseudoSingle, name="dtth") - - _real = ["dtpush"] - - @pseudo_position_argument - def forward(self, pseudo_pos): - return self.RealPosition( - dtpush=d2r(tan(-3.14 / 180 * pseudo_pos.dtth)) * self._tables_dt_push_dist_mm - ) - - @real_position_argument - def inverse(self, real_pos): - return self.PseudoPosition(dtth=r2d(-atan(real_pos.dtpush / self._tables_dt_push_dist_mm))) - - -class GirderMotorX1(PVPositioner): - """Girder X translation pseudo motor""" - - setpoint = Component(EpicsSignal, ":X_SET", name="sp") - readback = Component(EpicsSignalRO, ":X1", name="rbv") - done = Component(EpicsSignal, ":M-DMOV", name="dmov") - - -class GirderMotorY1(PVPositioner): - """Girder Y translation pseudo motor""" - - setpoint = Component(EpicsSignal, ":Y_SET", name="sp") - readback = Component(EpicsSignalRO, ":Y1", name="rbv") - done = Component(EpicsSignal, ":M-DMOV", name="dmov") - - -class GirderMotorYAW(PVPositioner): - """Girder YAW pseudo motor""" - - setpoint = Component(EpicsSignal, ":YAW_SET", name="sp") - readback = Component(EpicsSignalRO, ":YAW1", name="rbv") - done = Component(EpicsSignal, ":M-DMOV", name="dmov") - - -class GirderMotorROLL(PVPositioner): - """Girder ROLL pseudo motor""" - - setpoint = Component(EpicsSignal, ":ROLL_SET", name="sp") - readback = Component(EpicsSignalRO, ":ROLL1", name="rbv") - done = Component(EpicsSignal, ":M-DMOV", name="dmov") - - -class GirderMotorPITCH(PVPositioner): - """Girder YAW pseudo motor""" - - setpoint = Component(EpicsSignal, ":PITCH_SET", name="sp") - readback = Component(EpicsSignalRO, ":PITCH1", name="rbv") - done = Component(EpicsSignal, ":M-DMOV", name="dmov") - - -class VirtualEpicsSignalRO(EpicsSignalRO): - """This is a test class to create derives signals from one or - multiple original signals... - """ - - def calc(self, val): - return val - - def get(self, *args, **kwargs): - raw = super().get(*args, **kwargs) - return self.calc(raw) - - -class MonoTheta1(VirtualEpicsSignalRO): - """Converts the pusher motor position to theta angle""" - - _mono_a0_enc_scale1 = -1.0 - _mono_a1_lever_length1 = 206.706 - _mono_a2_pusher_offs1 = 6.85858 - _mono_a3_enc_offs1 = -16.9731 - - def calc(self, val): - asin_arg = (val - self._mono_a2_pusher_offs1) / self._mono_a1_lever_length1 - theta1 = ( - self._mono_a0_enc_scale1 * asin(asin_arg) / 3.141592 * 180.0 + self._mono_a3_enc_offs1 - ) - return theta1 - - -class MonoTheta2(VirtualEpicsSignalRO): - """Converts the pusher motor position to theta angle""" - - _mono_a3_enc_offs2 = -19.7072 - _mono_a2_pusher_offs2 = 5.93905 - _mono_a1_lever_length2 = 206.572 - _mono_a0_enc_scale2 = -1.0 - - def calc(self, val): - asin_arg = (val - self._mono_a2_pusher_offs2) / self._mono_a1_lever_length2 - theta2 = ( - self._mono_a0_enc_scale2 * asin(asin_arg) / 3.141592 * 180.0 + self._mono_a3_enc_offs2 - ) - return theta2 - - -MONO_THETA2_OFFSETS_FILENAME = ( - "/sls/X12SA/data/gac-x12saop/spec/macros/spec_data/mono_th2_offsets.txt" -) - - -class EnergyKev(VirtualEpicsSignalRO): - """Converts the pusher motor position to energy in keV""" - - _mono_add_offs = True - _mono_a3_enc_offs2 = -19.7072 - _mono_a2_pusher_offs2 = 5.93905 - _mono_a1_lever_length2 = 206.572 - _mono_a0_enc_scale2 = -1.0 - _mono_hce = 12.39852066 - _mono_2d2 = 2 * 5.43102 / sqrt(3) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._th2_offsets = np.loadtxt(MONO_THETA2_OFFSETS_FILENAME) - - def _mono_get_th2_offs(self, energy_keV): - if self._th2_offsets is None: - return 0.0 - - max_offs = np.max(self._th2_offsets[:, 1]) - - if max_offs > 0.2: - raise ValueError( - f"\nThe empirical moth2 corrections are as high as {max_offs} deg\nThis is unreasonable and the corrections will not be used.\n\n***PLEASE INFORM BEAMLINE SCIENTISTS***\n" - ) - - offs = np.interp(energy_keV, self._th2_offsets[:, 0], self._th2_offsets[:, 1]) - # print(offs) - return offs - - def calc(self, val): - _mono_sintheta2_to_Ekev = -self._mono_hce / self._mono_2d2 - asin_arg = (val - self._mono_a2_pusher_offs2) / self._mono_a1_lever_length2 - theta2_deg = ( - self._mono_a0_enc_scale2 * asin(asin_arg) / 3.141592 * 180.0 + self._mono_a3_enc_offs2 - ) - E_keV = _mono_sintheta2_to_Ekev / sin(theta2_deg / 180.0 * 3.141592) - - if self._mono_add_offs: - theta2_deg -= self._mono_get_th2_offs(E_keV) - E_keV = _mono_sintheta2_to_Ekev / sin(theta2_deg / 180.0 * 3.141592) - return E_keV - - -class CurrentSum(Signal): - """Adds up four current signals from the parent""" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.parent.ch1.subscribe(self._emit_value) - - def _emit_value(self, **kwargs): - timestamp = kwargs.pop("timestamp", time.time()) - self.wait_for_connection() - self._run_subs(sub_type="value", timestamp=timestamp, obj=self) - - def get(self, *args, **kwargs): - # self.parent._cnt.set(1).wait() - self._metadata["timestamp"] = time.time() - total = ( - self.parent.ch1.get() - + self.parent.ch2.get() - + self.parent.ch3.get() - + self.parent.ch4.get() - ) - return total - - -class Bpm4i(Device): - SUB_VALUE = "value" - _default_sub = SUB_VALUE - _cont = Component(EpicsSignal, "CONT", put_complete=True, kind=Kind.omitted) - _cnt = Component(EpicsSignal, "CNT", put_complete=True, kind=Kind.omitted) - ch1 = Component(EpicsSignalRO, "S2", auto_monitor=True, kind=Kind.omitted, name="ch1") - ch2 = Component(EpicsSignalRO, "S3", auto_monitor=True, kind=Kind.omitted, name="ch2") - ch3 = Component(EpicsSignalRO, "S4", auto_monitor=True, kind=Kind.omitted, name="ch3") - ch4 = Component(EpicsSignalRO, "S5", auto_monitor=True, kind=Kind.omitted, name="ch4") - sum = Component(CurrentSum, kind=Kind.hinted, name="sum") - - def __init__( - self, - prefix="", - *, - name, - kind=None, - read_attrs=None, - configuration_attrs=None, - parent=None, - **kwargs, - ): - super().__init__( - prefix, - name=name, - kind=kind, - read_attrs=read_attrs, - configuration_attrs=configuration_attrs, - parent=parent, - **kwargs, - ) - self.sum.name = self.name - # Ensure the scaler counts automatically - self._cont.wait_for_connection() - self._cont.set(1).wait() - self.ch1.subscribe(self._emit_value) - - def _emit_value(self, **kwargs): - timestamp = kwargs.pop("timestamp", time.time()) - self.wait_for_connection() - self._run_subs(sub_type=self.SUB_VALUE, timestamp=timestamp, obj=self) - - -if __name__ == "__main__": - dut = Bpm4i("X12SA-OP1-SCALER.", name="bpm4") - dut.wait_for_connection() - print(dut.read()) - print(dut.describe())