From 8b74d08097705f66b4aa92816cb0c8e87d736080 Mon Sep 17 00:00:00 2001 From: gac-x12sa Date: Mon, 14 Nov 2022 15:19:35 +0100 Subject: [PATCH] XBPM proxies --- ophyd_devices/epics/db/x12sa_database.yml | 15 ++- ophyd_devices/epics/proxies/SpmBase.py | 104 +++++++++++++++ ophyd_devices/epics/proxies/XbpmBase.py | 153 ++++++++++++++++++++++ ophyd_devices/epics/proxies/__init__.py | 2 + ophyd_devices/epics/proxies/quadem.py | 130 ++++++++++++++++++ 5 files changed, 400 insertions(+), 4 deletions(-) create mode 100644 ophyd_devices/epics/proxies/SpmBase.py create mode 100644 ophyd_devices/epics/proxies/XbpmBase.py create mode 100644 ophyd_devices/epics/proxies/quadem.py diff --git a/ophyd_devices/epics/db/x12sa_database.yml b/ophyd_devices/epics/db/x12sa_database.yml index 979e7e9..846844c 100644 --- a/ophyd_devices/epics/db/x12sa_database.yml +++ b/ophyd_devices/epics/db/x12sa_database.yml @@ -566,14 +566,21 @@ led: status: {enabled: true} type: EpicsSignalRO bpm3: - desc: 'XBPM 3: White beam before mono' + desc: 'XBPM 3: White beam before mono (VME)' acquisition: {schedule: sync} - config: {name: bpm3, prefix: 'X12SA-OP-BPM3:'} + config: {name: bpm3, prefix: 'X12SA-OP-BPM1:'} deviceGroup: monitor status: {enabled: true} - type: QuadEM + type: XbpmCsaxsOp +bpm4: + desc: 'XBPM 4: Somewhere around mono (VME)' + acquisition: {schedule: sync} + config: {name: bpm4, prefix: 'X12SA-OP-BPM2:'} + deviceGroup: monitor + status: {enabled: true} + type: XbpmCsaxsOp bpm5: - desc: 'XBPM 5: After mirror' + desc: 'XBPM 5: Not commissioned' acquisition: {schedule: sync} config: {name: bpm5, prefix: 'X12SA-OP-BPM5:'} deviceGroup: monitor diff --git a/ophyd_devices/epics/proxies/SpmBase.py b/ophyd_devices/epics/proxies/SpmBase.py new file mode 100644 index 0000000..b080ac2 --- /dev/null +++ b/ophyd_devices/epics/proxies/SpmBase.py @@ -0,0 +1,104 @@ +import numpy as np +from ophyd import Device, Component, EpicsSignal, EpicsSignalRO +import matplotlib.pyplot as plt + +class SpmBase(Device): + """ Python wrapper for the Staggered Blade Pair Monitors + + SPM's consist of a set of four horizontal tungsten blades and are + used to monitor the beam height (only Y) for the bending magnet + beamlines of SLS. + + Note: EPICS provided signals are read only, but the user can + change the beam position offset. + """ + # Motor interface + s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True) + s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True) + s3 = Component(EpicsSignalRO, "Current3", auto_monitor=True) + s4 = Component(EpicsSignalRO, "Current4", auto_monitor=True) + sum = Component(EpicsSignalRO, "SumAll", auto_monitor=True) + y = Component(EpicsSignalRO, "Y", auto_monitor=True) + scale = Component(EpicsSignal, "PositionScaleY", auto_monitor=True) + offset = Component(EpicsSignal, "PositionOffsetY", auto_monitor=True) + + +class SpmSim(SpmBase): + """ Python wrapper for simulated Staggered Blade Pair Monitors + + SPM's consist of a set of four horizontal tungsten blades and are + used to monitor the beam height (only Y) for the bending magnet + beamlines of SLS. + + This simulation device extends the basic proxy with a script that + fills signals with quasi-randomized values. + """ + # Motor interface + s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False) + s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False) + s3w = Component(EpicsSignal, "Current3:RAW.VAL", auto_monitor=False) + s4w = Component(EpicsSignal, "Current4:RAW.VAL", auto_monitor=False) + rangew = Component(EpicsSignal, "RANGEraw", auto_monitor=False) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._MX = 0 + self._MY = 0 + self._I0 = 255.0 + self._x = np.linspace(-5, 5, 64) + self._y = np.linspace(-5, 5, 64) + self._x, self._y = np.meshgrid(self._x, self._y) + + def _simFrame(self): + """Generator to simulate a jumping gaussian""" + # define normalized 2D gaussian + def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1): + return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.))) + + #Generator for dynamic values + self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random()-5.0) + self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random()-5.0) + self._I0 = 0.75 * self._I0 + 0.25 * (255.0 * np.random.random()) + + arr = self._I0 * gaus2d(self._x, self._y, self._MX, self._MY) + return arr + + def sim(self): + # Get next frame + beam = self._simFrame() + total = np.sum(beam) - np.sum(beam[24:48,:]) + rnge = np.floor(np.log10(total) - 0.0 ) + s1 = np.sum(beam[0:16,:]) / 10**rnge + s2 = np.sum(beam[16:24,:]) / 10**rnge + s3 = np.sum(beam[40:48,:]) / 10**rnge + s4 = np.sum(beam[48:64,:]) / 10**rnge + + self.s1w.set(s1).wait() + self.s2w.set(s2).wait() + self.s3w.set(s3).wait() + self.s4w.set(s4).wait() + self.rangew.set(rnge).wait() + # Print debug info + print(f"Raw signals: R={rnge}\t{s1}\t{s2}\t{s3}\t{s4}") + #plt.imshow(beam) + #plt.show(block=False) + plt.pause(0.5) + + +# Automatically start simulation if directly invoked +if __name__ == "__main__": + spm1 = SpmSim("X06D-FE-BM1:", name="spm1") + spm2 = SpmSim("X06D-FE-BM2:", name="spm2") + + spm1.wait_for_connection(timeout=5) + spm2.wait_for_connection(timeout=5) + + spm1.rangew.set(1).wait() + spm2.rangew.set(1).wait() + + while True: + print("---") + spm1.sim() + spm2.sim() + diff --git a/ophyd_devices/epics/proxies/XbpmBase.py b/ophyd_devices/epics/proxies/XbpmBase.py new file mode 100644 index 0000000..bcd30bd --- /dev/null +++ b/ophyd_devices/epics/proxies/XbpmBase.py @@ -0,0 +1,153 @@ +import numpy as np +from ophyd import Device, Component, EpicsSignal, EpicsSignalRO +import matplotlib.pyplot as plt + + +class XbpmCsaxsOp(Device): + """ Python wrapper for custom XBPMs in the cSAXS optics hutch + + This is completely custom XBPM with templates directly in the + VME repo. Thus it needs a custom ophyd template as well... + + WARN: The x and y are not updated by the IOC + """ + sum = Component(EpicsSignalRO, "SUM", auto_monitor=True) + x = Component(EpicsSignalRO, "POSH", auto_monitor=True) + y = Component(EpicsSignalRO, "POSV", auto_monitor=True) + s1 = Component(EpicsSignalRO, "CHAN1", auto_monitor=True) + s2 = Component(EpicsSignalRO, "CHAN2", auto_monitor=True) + s3 = Component(EpicsSignalRO, "CHAN3", auto_monitor=True) + s4 = Component(EpicsSignalRO, "CHAN4", auto_monitor=True) + + +class XbpmBase(Device): + """ Python wrapper for X-ray Beam Position Monitors + + XBPM's consist of a metal-coated diamond window that ejects + photoelectrons from the incoming X-ray beam. These electons + are collected and their current is measured. Effectively + they act as four quadrant photodiodes and are used as BPMs + at the undulator beamlines of SLS. + + Note: EPICS provided signals are read only, but the user can + change the beam position offset. + """ + # Motor interface + s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True) + s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True) + s3 = Component(EpicsSignalRO, "Current3", auto_monitor=True) + s4 = Component(EpicsSignalRO, "Current4", auto_monitor=True) + sum = Component(EpicsSignalRO, "SumAll", auto_monitor=True) + asymH = Component(EpicsSignalRO, "asymH", auto_monitor=True) + asymV = Component(EpicsSignalRO, "asymV", auto_monitor=True) + x = Component(EpicsSignalRO, "X", auto_monitor=True) + y = Component(EpicsSignalRO, "Y", auto_monitor=True) + scaleH = Component(EpicsSignal, "PositionScaleX", auto_monitor=False) + offsetH = Component(EpicsSignal, "PositionOffsetX", auto_monitor=False) + scaleV = Component(EpicsSignal, "PositionScaleY", auto_monitor=False) + offsetV = Component(EpicsSignal, "PositionOffsetY", auto_monitor=False) + + + + + +class XbpmSim(XbpmBase): + """ Python wrapper for simulated X-ray Beam Position Monitors + + XBPM's consist of a metal-coated diamond window that ejects + photoelectrons from the incoming X-ray beam. These electons + are collected and their current is measured. Effectively + they act as four quadrant photodiodes and are used as BPMs + at the undulator beamlines of SLS. + + Note: EPICS provided signals are read only, but the user can + change the beam position offset. + + This simulation device extends the basic proxy with a script that + fills signals with quasi-randomized values. + """ + # Motor interface + s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False) + s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False) + s3w = Component(EpicsSignal, "Current3:RAW.VAL", auto_monitor=False) + s4w = Component(EpicsSignal, "Current4:RAW.VAL", auto_monitor=False) + rangew = Component(EpicsSignal, "RANGEraw", auto_monitor=False) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._MX = 0 + self._MY = 0 + self._I0 = 255.0 + self._x = np.linspace(-5, 5, 64) + self._y = np.linspace(-5, 5, 64) + self._x, self._y = np.meshgrid(self._x, self._y) + + def _simFrame(self): + """Generator to simulate a jumping gaussian""" + # define normalized 2D gaussian + def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1): + return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.))) + + #Generator for dynamic values + self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random()-5.0) + self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random()-5.0) + self._I0 = 0.75 * self._I0 + 0.25 * (255.0 * np.random.random()) + + arr = self._I0 * gaus2d(self._x, self._y, self._MX, self._MY) + return arr + + def sim(self): + # Get next frame + beam = self._simFrame() + total = np.sum(beam) + rnge = np.floor(np.log10(total) - 0.0 ) + s1 = np.sum(beam[32:64,32:64]) / 10**rnge + s2 = np.sum(beam[0:32,32:64]) / 10**rnge + s3 = np.sum(beam[32:64,0:32]) / 10**rnge + s4 = np.sum(beam[0:32,0:32]) / 10**rnge + + self.s1w.set(s1).wait() + self.s2w.set(s2).wait() + self.s3w.set(s3).wait() + self.s4w.set(s4).wait() + self.rangew.set(rnge).wait() + # Print debug info + print(f"Raw signals: R={rnge}\t{s1}\t{s2}\t{s3}\t{s4}") + #plt.imshow(beam) + #plt.show(block=False) + plt.pause(0.5) + + +# Automatically start simulation if directly invoked +if __name__ == "__main__": + xbpm1 = XbpmSim("X01DA-FE-XBPM1:", name="xbpm1") + xbpm2 = XbpmSim("X01DA-FE-XBPM2:", name="xbpm2") + + xbpm1.wait_for_connection(timeout=5) + xbpm2.wait_for_connection(timeout=5) + + xbpm1.rangew.set(1).wait() + xbpm2.rangew.set(1).wait() + + while True: + print("---") + xbpm1.sim() + xbpm2.sim() + + + + + + + + + + + + + + + + + diff --git a/ophyd_devices/epics/proxies/__init__.py b/ophyd_devices/epics/proxies/__init__.py index 7a737c9..5c7c1e3 100644 --- a/ophyd_devices/epics/proxies/__init__.py +++ b/ophyd_devices/epics/proxies/__init__.py @@ -1,2 +1,4 @@ from .DelayGeneratorDG645 import DelayGeneratorDG645 from .slits import SlitH, SlitV +from .XbpmBase import XbpmBase, XbpmCsaxsOp +from .SpmBase import SpmBase diff --git a/ophyd_devices/epics/proxies/quadem.py b/ophyd_devices/epics/proxies/quadem.py new file mode 100644 index 0000000..25f2414 --- /dev/null +++ b/ophyd_devices/epics/proxies/quadem.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Oct 13 18:06:15 2021 + +@author: mohacsi_i +""" + + + + +import numpy as np +from math import isclose +from ophyd import EpicsSignal, EpicsSignalRO, EpicsMotor, PseudoPositioner, PseudoSingle, Device, Component, Kind +from ophyd.pseudopos import pseudo_position_argument, real_position_argument +from ophyd.sim import SynAxis, Syn2DGauss + +LN_CORR = 2e-4 + +def a2e(angle, hkl=[1,1,1], lnc=False, bent=False, deg=False): + """ Convert between angle and energy for Si monchromators + ATTENTION: 'angle' must be in radians, not degrees! + """ + lncorr = LN_CORR if lnc else 0.0 + angle = angle*np.pi/180 if deg else angle + + # Lattice constant along direction + d0 = 5.43102 * (1.0-lncorr) / np.linalg.norm(hkl) + energy = 12.39842 / (2.0 * d0 * np.sin(angle)) + return energy + + +def e2w(energy): + """ Convert between energy and wavelength + """ + return 0.1 * 12398.42 / energy + + +def w2e(wwl): + """ Convert between wavelength and energy + """ + return 12398.42 * 0.1 / wwl + + +def e2a(energy, hkl=[1,1,1], lnc=False, bent=False): + """ Convert between energy and angle for Si monchromators + ATTENTION: 'angle' must be in radians, not degrees! + """ + lncorr = LN_CORR if lnc else 0.0 + + # Lattice constant along direction + d0 = 2*5.43102 * (1.0-lncorr) / np.linalg.norm(hkl) + angle = np.arcsin(12.39842/d0/energy) + + # Rfine for bent mirror + if bent: + rho = 2 * 19.65 * 8.35 / 28 * np.sin(angle) + dt = 0.2e-3 / rho * 0.279 + d0 = 2 * 5.43102 * (1.0+dt) / np.linalg.norm(hkl) + angle = np.arcsin(12.39842/d0/energy) + + return angle + + + + +class MonoMotor(PseudoPositioner): + """ Monochromator axis + + Small wrapper to combine a real angular axis with the corresponding energy. + ATTENTION: 'angle' is in degrees, at least for PXIII + """ + # Real axis (in degrees) + angle = Component(EpicsMotor, "", name='angle') + # Virtual axis + energy = Component(PseudoSingle, name='energy') + + _real = ['angle'] + + @pseudo_position_argument + def forward(self, pseudo_pos): + return self.RealPosition(angle=180.0*e2a(pseudo_pos.energy)/3.141592) + + @real_position_argument + def inverse(self, real_pos): + return self.PseudoPosition(energy=a2e(3.141592*real_pos.angle/180.0)) + + +class MonoDccm(PseudoPositioner): + """ Combined DCCM monochromator + + The first crystal selects the energy, the second one is only following. + DCCMs are quite simple in terms that they can't crash and we don't + have a beam offset. + ATTENTION: 'angle' is in degrees, at least for PXIII + """ + + # Real axis (in degrees) + th1 = Component(EpicsMotor, "ROX1", name='theta1') + th2 = Component(EpicsMotor, "ROX2", name='theta2') + + # Virtual axes + en1 = Component(PseudoSingle, name='en1') + en2 = Component(PseudoSingle, name='en2') + energy = Component(PseudoSingle, name='energy', kind=Kind.hinted) + + # Other parameters + #feedback = Component(EpicsSignal, "MONOBEAM", name="feedback") + #enc1 = Component(EpicsSignalRO, "1:EXC1", name="enc1") + #enc2 = Component(EpicsSignalRO, "1:EXC2", name="enc2") + + @pseudo_position_argument + def forward(self, pseudo_pos): + """ + WARNING: We have an overdefined system! Not sure if common crystal movement is reliable without retuning + + """ + if abs(pseudo_pos.energy-self.energy.position) > 0.0001 and abs(pseudo_pos.en1-self.en1.position) < 0.0001 and abs(pseudo_pos.en2-self.en2.position) < 0.0001: + # Probably the common energy was changed + return self.RealPosition(th1=-180.0*e2a(pseudo_pos.energy)/3.141592, th2=180.0*e2a(pseudo_pos.energy)/3.141592) + else: + # Probably the individual axes was changes + return self.RealPosition(th1=-180.0*e2a(pseudo_pos.en1)/3.141592, th2=180.0*e2a(pseudo_pos.en2)/3.141592) + + @real_position_argument + def inverse(self, real_pos): + return self.PseudoPosition(en1=-a2e(3.141592*real_pos.th1/180.0), + en2=a2e(3.141592*real_pos.th2/180.0), + energy=-a2e(3.141592*real_pos.th1/180.0)) + +