mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-23 19:27:59 +02:00
XBPM proxies
This commit is contained in:
@ -566,14 +566,21 @@ led:
|
|||||||
status: {enabled: true}
|
status: {enabled: true}
|
||||||
type: EpicsSignalRO
|
type: EpicsSignalRO
|
||||||
bpm3:
|
bpm3:
|
||||||
desc: 'XBPM 3: White beam before mono'
|
desc: 'XBPM 3: White beam before mono (VME)'
|
||||||
acquisition: {schedule: sync}
|
acquisition: {schedule: sync}
|
||||||
config: {name: bpm3, prefix: 'X12SA-OP-BPM3:'}
|
config: {name: bpm3, prefix: 'X12SA-OP-BPM1:'}
|
||||||
deviceGroup: monitor
|
deviceGroup: monitor
|
||||||
status: {enabled: true}
|
status: {enabled: true}
|
||||||
type: QuadEM
|
type: XbpmCsaxsOp
|
||||||
|
bpm4:
|
||||||
|
desc: 'XBPM 4: Somewhere around mono (VME)'
|
||||||
|
acquisition: {schedule: sync}
|
||||||
|
config: {name: bpm4, prefix: 'X12SA-OP-BPM2:'}
|
||||||
|
deviceGroup: monitor
|
||||||
|
status: {enabled: true}
|
||||||
|
type: XbpmCsaxsOp
|
||||||
bpm5:
|
bpm5:
|
||||||
desc: 'XBPM 5: After mirror'
|
desc: 'XBPM 5: Not commissioned'
|
||||||
acquisition: {schedule: sync}
|
acquisition: {schedule: sync}
|
||||||
config: {name: bpm5, prefix: 'X12SA-OP-BPM5:'}
|
config: {name: bpm5, prefix: 'X12SA-OP-BPM5:'}
|
||||||
deviceGroup: monitor
|
deviceGroup: monitor
|
||||||
|
104
ophyd_devices/epics/proxies/SpmBase.py
Normal file
104
ophyd_devices/epics/proxies/SpmBase.py
Normal file
@ -0,0 +1,104 @@
|
|||||||
|
import numpy as np
|
||||||
|
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
class SpmBase(Device):
|
||||||
|
""" Python wrapper for the Staggered Blade Pair Monitors
|
||||||
|
|
||||||
|
SPM's consist of a set of four horizontal tungsten blades and are
|
||||||
|
used to monitor the beam height (only Y) for the bending magnet
|
||||||
|
beamlines of SLS.
|
||||||
|
|
||||||
|
Note: EPICS provided signals are read only, but the user can
|
||||||
|
change the beam position offset.
|
||||||
|
"""
|
||||||
|
# Motor interface
|
||||||
|
s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True)
|
||||||
|
s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True)
|
||||||
|
s3 = Component(EpicsSignalRO, "Current3", auto_monitor=True)
|
||||||
|
s4 = Component(EpicsSignalRO, "Current4", auto_monitor=True)
|
||||||
|
sum = Component(EpicsSignalRO, "SumAll", auto_monitor=True)
|
||||||
|
y = Component(EpicsSignalRO, "Y", auto_monitor=True)
|
||||||
|
scale = Component(EpicsSignal, "PositionScaleY", auto_monitor=True)
|
||||||
|
offset = Component(EpicsSignal, "PositionOffsetY", auto_monitor=True)
|
||||||
|
|
||||||
|
|
||||||
|
class SpmSim(SpmBase):
|
||||||
|
""" Python wrapper for simulated Staggered Blade Pair Monitors
|
||||||
|
|
||||||
|
SPM's consist of a set of four horizontal tungsten blades and are
|
||||||
|
used to monitor the beam height (only Y) for the bending magnet
|
||||||
|
beamlines of SLS.
|
||||||
|
|
||||||
|
This simulation device extends the basic proxy with a script that
|
||||||
|
fills signals with quasi-randomized values.
|
||||||
|
"""
|
||||||
|
# Motor interface
|
||||||
|
s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False)
|
||||||
|
s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False)
|
||||||
|
s3w = Component(EpicsSignal, "Current3:RAW.VAL", auto_monitor=False)
|
||||||
|
s4w = Component(EpicsSignal, "Current4:RAW.VAL", auto_monitor=False)
|
||||||
|
rangew = Component(EpicsSignal, "RANGEraw", auto_monitor=False)
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
self._MX = 0
|
||||||
|
self._MY = 0
|
||||||
|
self._I0 = 255.0
|
||||||
|
self._x = np.linspace(-5, 5, 64)
|
||||||
|
self._y = np.linspace(-5, 5, 64)
|
||||||
|
self._x, self._y = np.meshgrid(self._x, self._y)
|
||||||
|
|
||||||
|
def _simFrame(self):
|
||||||
|
"""Generator to simulate a jumping gaussian"""
|
||||||
|
# define normalized 2D gaussian
|
||||||
|
def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1):
|
||||||
|
return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.)))
|
||||||
|
|
||||||
|
#Generator for dynamic values
|
||||||
|
self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random()-5.0)
|
||||||
|
self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random()-5.0)
|
||||||
|
self._I0 = 0.75 * self._I0 + 0.25 * (255.0 * np.random.random())
|
||||||
|
|
||||||
|
arr = self._I0 * gaus2d(self._x, self._y, self._MX, self._MY)
|
||||||
|
return arr
|
||||||
|
|
||||||
|
def sim(self):
|
||||||
|
# Get next frame
|
||||||
|
beam = self._simFrame()
|
||||||
|
total = np.sum(beam) - np.sum(beam[24:48,:])
|
||||||
|
rnge = np.floor(np.log10(total) - 0.0 )
|
||||||
|
s1 = np.sum(beam[0:16,:]) / 10**rnge
|
||||||
|
s2 = np.sum(beam[16:24,:]) / 10**rnge
|
||||||
|
s3 = np.sum(beam[40:48,:]) / 10**rnge
|
||||||
|
s4 = np.sum(beam[48:64,:]) / 10**rnge
|
||||||
|
|
||||||
|
self.s1w.set(s1).wait()
|
||||||
|
self.s2w.set(s2).wait()
|
||||||
|
self.s3w.set(s3).wait()
|
||||||
|
self.s4w.set(s4).wait()
|
||||||
|
self.rangew.set(rnge).wait()
|
||||||
|
# Print debug info
|
||||||
|
print(f"Raw signals: R={rnge}\t{s1}\t{s2}\t{s3}\t{s4}")
|
||||||
|
#plt.imshow(beam)
|
||||||
|
#plt.show(block=False)
|
||||||
|
plt.pause(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
# Automatically start simulation if directly invoked
|
||||||
|
if __name__ == "__main__":
|
||||||
|
spm1 = SpmSim("X06D-FE-BM1:", name="spm1")
|
||||||
|
spm2 = SpmSim("X06D-FE-BM2:", name="spm2")
|
||||||
|
|
||||||
|
spm1.wait_for_connection(timeout=5)
|
||||||
|
spm2.wait_for_connection(timeout=5)
|
||||||
|
|
||||||
|
spm1.rangew.set(1).wait()
|
||||||
|
spm2.rangew.set(1).wait()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
print("---")
|
||||||
|
spm1.sim()
|
||||||
|
spm2.sim()
|
||||||
|
|
153
ophyd_devices/epics/proxies/XbpmBase.py
Normal file
153
ophyd_devices/epics/proxies/XbpmBase.py
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
import numpy as np
|
||||||
|
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
|
||||||
|
import matplotlib.pyplot as plt
|
||||||
|
|
||||||
|
|
||||||
|
class XbpmCsaxsOp(Device):
|
||||||
|
""" Python wrapper for custom XBPMs in the cSAXS optics hutch
|
||||||
|
|
||||||
|
This is completely custom XBPM with templates directly in the
|
||||||
|
VME repo. Thus it needs a custom ophyd template as well...
|
||||||
|
|
||||||
|
WARN: The x and y are not updated by the IOC
|
||||||
|
"""
|
||||||
|
sum = Component(EpicsSignalRO, "SUM", auto_monitor=True)
|
||||||
|
x = Component(EpicsSignalRO, "POSH", auto_monitor=True)
|
||||||
|
y = Component(EpicsSignalRO, "POSV", auto_monitor=True)
|
||||||
|
s1 = Component(EpicsSignalRO, "CHAN1", auto_monitor=True)
|
||||||
|
s2 = Component(EpicsSignalRO, "CHAN2", auto_monitor=True)
|
||||||
|
s3 = Component(EpicsSignalRO, "CHAN3", auto_monitor=True)
|
||||||
|
s4 = Component(EpicsSignalRO, "CHAN4", auto_monitor=True)
|
||||||
|
|
||||||
|
|
||||||
|
class XbpmBase(Device):
|
||||||
|
""" Python wrapper for X-ray Beam Position Monitors
|
||||||
|
|
||||||
|
XBPM's consist of a metal-coated diamond window that ejects
|
||||||
|
photoelectrons from the incoming X-ray beam. These electons
|
||||||
|
are collected and their current is measured. Effectively
|
||||||
|
they act as four quadrant photodiodes and are used as BPMs
|
||||||
|
at the undulator beamlines of SLS.
|
||||||
|
|
||||||
|
Note: EPICS provided signals are read only, but the user can
|
||||||
|
change the beam position offset.
|
||||||
|
"""
|
||||||
|
# Motor interface
|
||||||
|
s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True)
|
||||||
|
s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True)
|
||||||
|
s3 = Component(EpicsSignalRO, "Current3", auto_monitor=True)
|
||||||
|
s4 = Component(EpicsSignalRO, "Current4", auto_monitor=True)
|
||||||
|
sum = Component(EpicsSignalRO, "SumAll", auto_monitor=True)
|
||||||
|
asymH = Component(EpicsSignalRO, "asymH", auto_monitor=True)
|
||||||
|
asymV = Component(EpicsSignalRO, "asymV", auto_monitor=True)
|
||||||
|
x = Component(EpicsSignalRO, "X", auto_monitor=True)
|
||||||
|
y = Component(EpicsSignalRO, "Y", auto_monitor=True)
|
||||||
|
scaleH = Component(EpicsSignal, "PositionScaleX", auto_monitor=False)
|
||||||
|
offsetH = Component(EpicsSignal, "PositionOffsetX", auto_monitor=False)
|
||||||
|
scaleV = Component(EpicsSignal, "PositionScaleY", auto_monitor=False)
|
||||||
|
offsetV = Component(EpicsSignal, "PositionOffsetY", auto_monitor=False)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class XbpmSim(XbpmBase):
|
||||||
|
""" Python wrapper for simulated X-ray Beam Position Monitors
|
||||||
|
|
||||||
|
XBPM's consist of a metal-coated diamond window that ejects
|
||||||
|
photoelectrons from the incoming X-ray beam. These electons
|
||||||
|
are collected and their current is measured. Effectively
|
||||||
|
they act as four quadrant photodiodes and are used as BPMs
|
||||||
|
at the undulator beamlines of SLS.
|
||||||
|
|
||||||
|
Note: EPICS provided signals are read only, but the user can
|
||||||
|
change the beam position offset.
|
||||||
|
|
||||||
|
This simulation device extends the basic proxy with a script that
|
||||||
|
fills signals with quasi-randomized values.
|
||||||
|
"""
|
||||||
|
# Motor interface
|
||||||
|
s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False)
|
||||||
|
s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False)
|
||||||
|
s3w = Component(EpicsSignal, "Current3:RAW.VAL", auto_monitor=False)
|
||||||
|
s4w = Component(EpicsSignal, "Current4:RAW.VAL", auto_monitor=False)
|
||||||
|
rangew = Component(EpicsSignal, "RANGEraw", auto_monitor=False)
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
self._MX = 0
|
||||||
|
self._MY = 0
|
||||||
|
self._I0 = 255.0
|
||||||
|
self._x = np.linspace(-5, 5, 64)
|
||||||
|
self._y = np.linspace(-5, 5, 64)
|
||||||
|
self._x, self._y = np.meshgrid(self._x, self._y)
|
||||||
|
|
||||||
|
def _simFrame(self):
|
||||||
|
"""Generator to simulate a jumping gaussian"""
|
||||||
|
# define normalized 2D gaussian
|
||||||
|
def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1):
|
||||||
|
return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.)))
|
||||||
|
|
||||||
|
#Generator for dynamic values
|
||||||
|
self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random()-5.0)
|
||||||
|
self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random()-5.0)
|
||||||
|
self._I0 = 0.75 * self._I0 + 0.25 * (255.0 * np.random.random())
|
||||||
|
|
||||||
|
arr = self._I0 * gaus2d(self._x, self._y, self._MX, self._MY)
|
||||||
|
return arr
|
||||||
|
|
||||||
|
def sim(self):
|
||||||
|
# Get next frame
|
||||||
|
beam = self._simFrame()
|
||||||
|
total = np.sum(beam)
|
||||||
|
rnge = np.floor(np.log10(total) - 0.0 )
|
||||||
|
s1 = np.sum(beam[32:64,32:64]) / 10**rnge
|
||||||
|
s2 = np.sum(beam[0:32,32:64]) / 10**rnge
|
||||||
|
s3 = np.sum(beam[32:64,0:32]) / 10**rnge
|
||||||
|
s4 = np.sum(beam[0:32,0:32]) / 10**rnge
|
||||||
|
|
||||||
|
self.s1w.set(s1).wait()
|
||||||
|
self.s2w.set(s2).wait()
|
||||||
|
self.s3w.set(s3).wait()
|
||||||
|
self.s4w.set(s4).wait()
|
||||||
|
self.rangew.set(rnge).wait()
|
||||||
|
# Print debug info
|
||||||
|
print(f"Raw signals: R={rnge}\t{s1}\t{s2}\t{s3}\t{s4}")
|
||||||
|
#plt.imshow(beam)
|
||||||
|
#plt.show(block=False)
|
||||||
|
plt.pause(0.5)
|
||||||
|
|
||||||
|
|
||||||
|
# Automatically start simulation if directly invoked
|
||||||
|
if __name__ == "__main__":
|
||||||
|
xbpm1 = XbpmSim("X01DA-FE-XBPM1:", name="xbpm1")
|
||||||
|
xbpm2 = XbpmSim("X01DA-FE-XBPM2:", name="xbpm2")
|
||||||
|
|
||||||
|
xbpm1.wait_for_connection(timeout=5)
|
||||||
|
xbpm2.wait_for_connection(timeout=5)
|
||||||
|
|
||||||
|
xbpm1.rangew.set(1).wait()
|
||||||
|
xbpm2.rangew.set(1).wait()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
print("---")
|
||||||
|
xbpm1.sim()
|
||||||
|
xbpm2.sim()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -1,2 +1,4 @@
|
|||||||
from .DelayGeneratorDG645 import DelayGeneratorDG645
|
from .DelayGeneratorDG645 import DelayGeneratorDG645
|
||||||
from .slits import SlitH, SlitV
|
from .slits import SlitH, SlitV
|
||||||
|
from .XbpmBase import XbpmBase, XbpmCsaxsOp
|
||||||
|
from .SpmBase import SpmBase
|
||||||
|
130
ophyd_devices/epics/proxies/quadem.py
Normal file
130
ophyd_devices/epics/proxies/quadem.py
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
Created on Wed Oct 13 18:06:15 2021
|
||||||
|
|
||||||
|
@author: mohacsi_i
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from math import isclose
|
||||||
|
from ophyd import EpicsSignal, EpicsSignalRO, EpicsMotor, PseudoPositioner, PseudoSingle, Device, Component, Kind
|
||||||
|
from ophyd.pseudopos import pseudo_position_argument, real_position_argument
|
||||||
|
from ophyd.sim import SynAxis, Syn2DGauss
|
||||||
|
|
||||||
|
LN_CORR = 2e-4
|
||||||
|
|
||||||
|
def a2e(angle, hkl=[1,1,1], lnc=False, bent=False, deg=False):
|
||||||
|
""" Convert between angle and energy for Si monchromators
|
||||||
|
ATTENTION: 'angle' must be in radians, not degrees!
|
||||||
|
"""
|
||||||
|
lncorr = LN_CORR if lnc else 0.0
|
||||||
|
angle = angle*np.pi/180 if deg else angle
|
||||||
|
|
||||||
|
# Lattice constant along direction
|
||||||
|
d0 = 5.43102 * (1.0-lncorr) / np.linalg.norm(hkl)
|
||||||
|
energy = 12.39842 / (2.0 * d0 * np.sin(angle))
|
||||||
|
return energy
|
||||||
|
|
||||||
|
|
||||||
|
def e2w(energy):
|
||||||
|
""" Convert between energy and wavelength
|
||||||
|
"""
|
||||||
|
return 0.1 * 12398.42 / energy
|
||||||
|
|
||||||
|
|
||||||
|
def w2e(wwl):
|
||||||
|
""" Convert between wavelength and energy
|
||||||
|
"""
|
||||||
|
return 12398.42 * 0.1 / wwl
|
||||||
|
|
||||||
|
|
||||||
|
def e2a(energy, hkl=[1,1,1], lnc=False, bent=False):
|
||||||
|
""" Convert between energy and angle for Si monchromators
|
||||||
|
ATTENTION: 'angle' must be in radians, not degrees!
|
||||||
|
"""
|
||||||
|
lncorr = LN_CORR if lnc else 0.0
|
||||||
|
|
||||||
|
# Lattice constant along direction
|
||||||
|
d0 = 2*5.43102 * (1.0-lncorr) / np.linalg.norm(hkl)
|
||||||
|
angle = np.arcsin(12.39842/d0/energy)
|
||||||
|
|
||||||
|
# Rfine for bent mirror
|
||||||
|
if bent:
|
||||||
|
rho = 2 * 19.65 * 8.35 / 28 * np.sin(angle)
|
||||||
|
dt = 0.2e-3 / rho * 0.279
|
||||||
|
d0 = 2 * 5.43102 * (1.0+dt) / np.linalg.norm(hkl)
|
||||||
|
angle = np.arcsin(12.39842/d0/energy)
|
||||||
|
|
||||||
|
return angle
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class MonoMotor(PseudoPositioner):
|
||||||
|
""" Monochromator axis
|
||||||
|
|
||||||
|
Small wrapper to combine a real angular axis with the corresponding energy.
|
||||||
|
ATTENTION: 'angle' is in degrees, at least for PXIII
|
||||||
|
"""
|
||||||
|
# Real axis (in degrees)
|
||||||
|
angle = Component(EpicsMotor, "", name='angle')
|
||||||
|
# Virtual axis
|
||||||
|
energy = Component(PseudoSingle, name='energy')
|
||||||
|
|
||||||
|
_real = ['angle']
|
||||||
|
|
||||||
|
@pseudo_position_argument
|
||||||
|
def forward(self, pseudo_pos):
|
||||||
|
return self.RealPosition(angle=180.0*e2a(pseudo_pos.energy)/3.141592)
|
||||||
|
|
||||||
|
@real_position_argument
|
||||||
|
def inverse(self, real_pos):
|
||||||
|
return self.PseudoPosition(energy=a2e(3.141592*real_pos.angle/180.0))
|
||||||
|
|
||||||
|
|
||||||
|
class MonoDccm(PseudoPositioner):
|
||||||
|
""" Combined DCCM monochromator
|
||||||
|
|
||||||
|
The first crystal selects the energy, the second one is only following.
|
||||||
|
DCCMs are quite simple in terms that they can't crash and we don't
|
||||||
|
have a beam offset.
|
||||||
|
ATTENTION: 'angle' is in degrees, at least for PXIII
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Real axis (in degrees)
|
||||||
|
th1 = Component(EpicsMotor, "ROX1", name='theta1')
|
||||||
|
th2 = Component(EpicsMotor, "ROX2", name='theta2')
|
||||||
|
|
||||||
|
# Virtual axes
|
||||||
|
en1 = Component(PseudoSingle, name='en1')
|
||||||
|
en2 = Component(PseudoSingle, name='en2')
|
||||||
|
energy = Component(PseudoSingle, name='energy', kind=Kind.hinted)
|
||||||
|
|
||||||
|
# Other parameters
|
||||||
|
#feedback = Component(EpicsSignal, "MONOBEAM", name="feedback")
|
||||||
|
#enc1 = Component(EpicsSignalRO, "1:EXC1", name="enc1")
|
||||||
|
#enc2 = Component(EpicsSignalRO, "1:EXC2", name="enc2")
|
||||||
|
|
||||||
|
@pseudo_position_argument
|
||||||
|
def forward(self, pseudo_pos):
|
||||||
|
"""
|
||||||
|
WARNING: We have an overdefined system! Not sure if common crystal movement is reliable without retuning
|
||||||
|
|
||||||
|
"""
|
||||||
|
if abs(pseudo_pos.energy-self.energy.position) > 0.0001 and abs(pseudo_pos.en1-self.en1.position) < 0.0001 and abs(pseudo_pos.en2-self.en2.position) < 0.0001:
|
||||||
|
# Probably the common energy was changed
|
||||||
|
return self.RealPosition(th1=-180.0*e2a(pseudo_pos.energy)/3.141592, th2=180.0*e2a(pseudo_pos.energy)/3.141592)
|
||||||
|
else:
|
||||||
|
# Probably the individual axes was changes
|
||||||
|
return self.RealPosition(th1=-180.0*e2a(pseudo_pos.en1)/3.141592, th2=180.0*e2a(pseudo_pos.en2)/3.141592)
|
||||||
|
|
||||||
|
@real_position_argument
|
||||||
|
def inverse(self, real_pos):
|
||||||
|
return self.PseudoPosition(en1=-a2e(3.141592*real_pos.th1/180.0),
|
||||||
|
en2=a2e(3.141592*real_pos.th2/180.0),
|
||||||
|
energy=-a2e(3.141592*real_pos.th1/180.0))
|
||||||
|
|
||||||
|
|
Reference in New Issue
Block a user