This commit is contained in:
mohacsi_i 2022-11-15 19:03:55 +01:00
parent 15950e6d05
commit 5f251db891
6 changed files with 169 additions and 92 deletions

View File

@ -51,7 +51,7 @@ def createProxy(name: str, connect=True) -> OphydObject:
ret.wait_for_connection(timeout=5)
return ret
else:
raise RuntimeError(f"Unsupported return class: {entry["type"]}")
raise RuntimeError(f"Unsupported return class: {entry['type']}")
if __name__ == "__main__":

View File

@ -7,7 +7,12 @@ Created on Tue Nov 9 16:12:47 2021
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import PositionerBase
from ophyd.pseudopos import pseudo_position_argument, real_position_argument, PseudoSingle, PseudoPositioner
from ophyd.pseudopos import (
pseudo_position_argument,
real_position_argument,
PseudoSingle,
PseudoPositioner,
)
class DelayStatic(Device):
@ -16,12 +21,23 @@ class DelayStatic(Device):
The signal is high after receiving the trigger until the end
of the holdoff period.
"""
# Other channel stuff
ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config)
nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config)
polarity = Component(EpicsSignal, "OutputPolarityBI", write_pv="OutputPolarityBO", name='polarity', kind=Kind.config)
amplitude = Component(EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name='amplitude', kind=Kind.config)
polarity = Component(EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name='offset', kind=Kind.config)
polarity = Component(
EpicsSignal,
"OutputPolarityBI",
write_pv="OutputPolarityBO",
name="polarity",
kind=Kind.config,
)
amplitude = Component(
EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config
)
polarity = Component(
EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config
)
class DummyPositioner(Device, PositionerBase):
@ -35,25 +51,26 @@ class DelayPair(PseudoPositioner):
Virtual motor interface to a pair of signals (on the frontpanel).
It offers a simple delay and pulse width interface for scanning.
"""
# The pseudo positioner axes
delay = Component(PseudoSingle, limits=(0, 2000.0), name='delay')
width = Component(PseudoSingle, limits=(0, 2000.0), name='pulsewidth')
delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay")
width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth")
# The real delay axes
ch1 = Component(DummyPositioner, name='ch1')
ch2 = Component(DummyPositioner, name='ch2')
ch1 = Component(DummyPositioner, name="ch1")
ch2 = Component(DummyPositioner, name="ch2")
def __init__(self, *args, **kwargs):
# Change suffix names before connecting (a bit of dynamic connections)
self.__class__.__dict__['ch1'].suffix = kwargs['channel'][0]
self.__class__.__dict__['ch2'].suffix = kwargs['channel'][1]
del kwargs['channel']
self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0]
self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1]
del kwargs["channel"]
# Call parent to start the connections
super().__init__(*args, **kwargs)
@pseudo_position_argument
def forward(self, pseudo_pos):
"""Run a forward (pseudo -> real) calculation"""
return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay+pseudo_pos.width)
return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width)
@real_position_argument
def inverse(self, real_pos):
@ -84,32 +101,75 @@ class DelayGeneratorDG645(Device):
electronics (30V needs quite some power). This is not implemented in the
current device
"""
state = Component(EpicsSignalRO, "EventStatusLI", name='status_register')
status = Component(EpicsSignalRO, "StatusSI", name='status')
state = Component(EpicsSignalRO, "EventStatusLI", name="status_register")
status = Component(EpicsSignalRO, "StatusSI", name="status")
# Front Panel
channelT0 = Component(DelayStatic, "T0", name='T0')
channelAB = Component(DelayPair, "", name='AB', channel="AB")
channelCD = Component(DelayPair, "", name='CD', channel="CD")
channelEF = Component(DelayPair, "", name='EF', channel="EF")
channelGH = Component(DelayPair, "", name='GH', channel="GH")
channelT0 = Component(DelayStatic, "T0", name="T0")
channelAB = Component(DelayPair, "", name="AB", channel="AB")
channelCD = Component(DelayPair, "", name="CD", channel="CD")
channelEF = Component(DelayPair, "", name="EF", channel="EF")
channelGH = Component(DelayPair, "", name="GH", channel="GH")
# Minimum time between triggers
holdoff = Component(EpicsSignal, "TriggerHoldoffAI", write_pv="TriggerHoldoffAO", name='trigger_holdoff', kind=Kind.config)
inhibit = Component(EpicsSignal, "TriggerInhibitMI", write_pv="TriggerInhibitMO", name='trigger_inhibit', kind=Kind.config)
source = Component(EpicsSignal, "TriggerSourceMI", write_pv="TriggerSourceMO", name='trigger_source', kind=Kind.config)
level = Component(EpicsSignal, "TriggerLevelAI", write_pv="TriggerLevelAO", name='trigger_level', kind=Kind.config)
rate = Component(EpicsSignal, "TriggerRateAI", write_pv="TriggerRateAO", name='trigger_rate', kind=Kind.config)
holdoff = Component(
EpicsSignal,
"TriggerHoldoffAI",
write_pv="TriggerHoldoffAO",
name="trigger_holdoff",
kind=Kind.config,
)
inhibit = Component(
EpicsSignal,
"TriggerInhibitMI",
write_pv="TriggerInhibitMO",
name="trigger_inhibit",
kind=Kind.config,
)
source = Component(
EpicsSignal,
"TriggerSourceMI",
write_pv="TriggerSourceMO",
name="trigger_source",
kind=Kind.config,
)
level = Component(
EpicsSignal,
"TriggerLevelAI",
write_pv="TriggerLevelAO",
name="trigger_level",
kind=Kind.config,
)
rate = Component(
EpicsSignal,
"TriggerRateAI",
write_pv="TriggerRateAO",
name="trigger_rate",
kind=Kind.config,
)
# Command PVs
arm = Component(EpicsSignal, "TriggerDelayBI", write_pv="TriggerDelayBO", name='arm', kind=Kind.omitted)
arm = Component(
EpicsSignal, "TriggerDelayBI", write_pv="TriggerDelayBO", name="arm", kind=Kind.omitted
)
# Burst mode
burstMode = Component(EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name='burstmode', kind=Kind.config)
burstConfig = Component(EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name='burstconfig', kind=Kind.config)
burstCount = Component(EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name='burstcount', kind=Kind.config)
burstDelay = Component(EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name='burstdelay', kind=Kind.config)
burstPeriod = Component(EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name='burstperiod', kind=Kind.config)
burstMode = Component(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
)
burstConfig = Component(
EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config
)
burstCount = Component(
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config
)
burstDelay = Component(
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config
)
burstPeriod = Component(
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
)
def stage(self):
"""Trigger the generator by arming to accept triggers"""
@ -126,16 +186,16 @@ class DelayGeneratorDG645(Device):
assert count > 0, "Number of bursts must be positive"
assert delay > 0, "Burst delay must be positive"
assert period > 0, "Burst period must be positive"
assert config in ['all', 'first'], "Supported bust configs are 'all' and 'first'"
assert config in ["all", "first"], "Supported bust configs are 'all' and 'first'"
self.burstMode.set(1).wait()
self.burstCount.set(count).wait()
self.burstDelay.set(delay).wait()
self.burstPeriod.set(period).wait()
if config=="all":
if config == "all":
self.burstConfig.set(0).wait()
elif config=="first":
elif config == "first":
self.burstConfig.set(1).wait()
def busrtDisable(self):

View File

@ -10,6 +10,7 @@ class InsertionDevice(PVPositioner):
WARN: The x and y are not updated by the IOC
"""
status = Component(EpicsSignalRO, "-USER:STATUS", auto_monitor=True)
errorSource = Component(EpicsSignalRO, "-USER:ERROR-SOURCE", auto_monitor=True)
isOpen = Component(EpicsSignalRO, "-GAP:ISOPEN", auto_monitor=True)
@ -20,6 +21,7 @@ class InsertionDevice(PVPositioner):
done = Component(EpicsSignalRO, ":DONE", auto_monitor=True)
stop_signal = Component(EpicsSignal, "-GAP:STOP", kind=Kind.omitted)
# Automatically start simulation if directly invoked
# (NA for important devices)
if __name__ == "__main__":

View File

@ -13,6 +13,7 @@ class SpmBase(Device):
Note: EPICS provided signals are read only, but the users can
change the beam position offset.
"""
# Motor interface
s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True)
s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True)
@ -34,6 +35,7 @@ class SpmSim(SpmBase):
This simulation device extends the basic proxy with a script that
fills signals with quasi-randomized values.
"""
# Motor interface
s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False)
s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False)
@ -55,7 +57,9 @@ class SpmSim(SpmBase):
"""Generator to simulate a jumping gaussian"""
# Define normalized 2D gaussian
def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1):
return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.)))
return np.exp(
-((x - mx) ** 2.0 / (2.0 * sx**2.0) + (y - my) ** 2.0 / (2.0 * sy**2.0))
)
# Generator for dynamic values
self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random() - 5.0)

View File

@ -11,6 +11,7 @@ class XbpmCsaxsOp(Device):
WARN: The x and y are not updated by the IOC
"""
sum = Component(EpicsSignalRO, "SUM", auto_monitor=True)
x = Component(EpicsSignalRO, "POSH", auto_monitor=True)
y = Component(EpicsSignalRO, "POSV", auto_monitor=True)
@ -32,6 +33,7 @@ class XbpmBase(Device):
Note: EPICS provided signals are read only, but the user can
change the beam position offset.
"""
# Motor interface
s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True)
s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True)
@ -63,6 +65,7 @@ class XbpmSim(XbpmBase):
This simulation device extends the basic proxy with a script that
fills signals with quasi-randomized values.
"""
# Motor interface
s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False)
s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False)
@ -84,11 +87,13 @@ class XbpmSim(XbpmBase):
"""Generator to simulate a jumping gaussian"""
# define normalized 2D gaussian
def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1):
return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.)))
return np.exp(
-((x - mx) ** 2.0 / (2.0 * sx**2.0) + (y - my) ** 2.0 / (2.0 * sy**2.0))
)
#Generator for dynamic values
self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random()-5.0)
self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random()-5.0)
# Generator for dynamic values
self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random() - 5.0)
self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random() - 5.0)
self._I0 = 0.75 * self._I0 + 0.25 * (255.0 * np.random.random())
arr = self._I0 * gaus2d(self._x, self._y, self._MX, self._MY)
@ -98,11 +103,11 @@ class XbpmSim(XbpmBase):
# Get next frame
beam = self._simFrame()
total = np.sum(beam)
rnge = np.floor(np.log10(total) - 0.0 )
s1 = np.sum(beam[32:64,32:64]) / 10**rnge
s2 = np.sum(beam[0:32,32:64]) / 10**rnge
s3 = np.sum(beam[32:64,0:32]) / 10**rnge
s4 = np.sum(beam[0:32,0:32]) / 10**rnge
rnge = np.floor(np.log10(total) - 0.0)
s1 = np.sum(beam[32:64, 32:64]) / 10**rnge
s2 = np.sum(beam[0:32, 32:64]) / 10**rnge
s3 = np.sum(beam[32:64, 0:32]) / 10**rnge
s4 = np.sum(beam[0:32, 0:32]) / 10**rnge
self.s1w.set(s1).wait()
self.s2w.set(s2).wait()
@ -111,8 +116,8 @@ class XbpmSim(XbpmBase):
self.rangew.set(rnge).wait()
# Print debug info
print(f"Raw signals: R={rnge}\t{s1}\t{s2}\t{s3}\t{s4}")
#plt.imshow(beam)
#plt.show(block=False)
# plt.imshow(beam)
# plt.show(block=False)
plt.pause(0.5)

View File

@ -11,6 +11,7 @@ class SlitH(PseudoPositioner):
NOTE: The real and virtual axes are wrapped together.
"""
# Motor interface
x1 = Component(EpicsMotor, "TRX1")
x2 = Component(EpicsMotor, "TRX2")
@ -21,14 +22,16 @@ class SlitH(PseudoPositioner):
@pseudo_position_argument
def forward(self, pseudo_pos):
"""Run a forward (pseudo -> real) calculation"""
return self.RealPosition(x1=pseudo_pos.cenx-pseudo_pos.gapx/2,
x2=pseudo_pos.cenx+pseudo_pos.gapx/2)
return self.RealPosition(
x1=pseudo_pos.cenx - pseudo_pos.gapx / 2, x2=pseudo_pos.cenx + pseudo_pos.gapx / 2
)
@real_position_argument
def inverse(self, real_pos):
"""Run an inverse (real -> pseudo) calculation"""
return self.PseudoPosition(cenx=(real_pos.x1+real_pos.x2)/2,
gapx=real_pos.x2-real_pos.x1)
return self.PseudoPosition(
cenx=(real_pos.x1 + real_pos.x2) / 2, gapx=real_pos.x2 - real_pos.x1
)
class SlitV(PseudoPositioner):
@ -40,6 +43,7 @@ class SlitV(PseudoPositioner):
NOTE: The real and virtual axes are wrapped together.
"""
# Motor interface
y1 = Component(EpicsMotor, "TRY1")
y2 = Component(EpicsMotor, "TRY2")
@ -50,11 +54,13 @@ class SlitV(PseudoPositioner):
@pseudo_position_argument
def forward(self, pseudo_pos):
"""Run a forward (pseudo -> real) calculation"""
return self.RealPosition(y1=pseudo_pos.ceny-pseudo_pos.gapy/2,
y2=pseudo_pos.ceny+pseudo_pos.gapy/2)
return self.RealPosition(
y1=pseudo_pos.ceny - pseudo_pos.gapy / 2, y2=pseudo_pos.ceny + pseudo_pos.gapy / 2
)
@real_position_argument
def inverse(self, real_pos):
"""Run an inverse (real -> pseudo) calculation"""
return self.PseudoPosition(ceny=(real_pos.y1+real_pos.y2)/2,
gapy=real_pos.y2-real_pos.y1)
return self.PseudoPosition(
ceny=(real_pos.y1 + real_pos.y2) / 2, gapy=real_pos.y2 - real_pos.y1
)