diff --git a/ophyd_devices/epics/DeviceFactory.py b/ophyd_devices/epics/DeviceFactory.py index 88db585..cbd6215 100644 --- a/ophyd_devices/epics/DeviceFactory.py +++ b/ophyd_devices/epics/DeviceFactory.py @@ -36,7 +36,7 @@ lut_db.update(yaml.load(fp, Loader=yaml.Loader)) def createProxy(name: str, connect=True) -> OphydObject: """Factory routine to create an ophyd device with a pre-defined schema. - Does nothing if the device is already an OphydObject! + Does nothing if the device is already an OphydObject! """ if issubclass(type(name), OphydObject): return name @@ -51,7 +51,7 @@ def createProxy(name: str, connect=True) -> OphydObject: ret.wait_for_connection(timeout=5) return ret else: - raise RuntimeError(f"Unsupported return class: {entry["type"]}") + raise RuntimeError(f"Unsupported return class: {entry['type']}") if __name__ == "__main__": diff --git a/ophyd_devices/epics/proxies/DelayGeneratorDG645.py b/ophyd_devices/epics/proxies/DelayGeneratorDG645.py index b4cc615..6674217 100644 --- a/ophyd_devices/epics/proxies/DelayGeneratorDG645.py +++ b/ophyd_devices/epics/proxies/DelayGeneratorDG645.py @@ -7,21 +7,37 @@ Created on Tue Nov 9 16:12:47 2021 from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind from ophyd import PositionerBase -from ophyd.pseudopos import pseudo_position_argument, real_position_argument, PseudoSingle, PseudoPositioner +from ophyd.pseudopos import ( + pseudo_position_argument, + real_position_argument, + PseudoSingle, + PseudoPositioner, +) class DelayStatic(Device): """Static axis for the T0 output channel - It allows setting the logic levels, but the timing is fixed. - The signal is high after receiving the trigger until the end - of the holdoff period. + It allows setting the logic levels, but the timing is fixed. + The signal is high after receiving the trigger until the end + of the holdoff period. """ + # Other channel stuff ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config) nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config) - polarity = Component(EpicsSignal, "OutputPolarityBI", write_pv="OutputPolarityBO", name='polarity', kind=Kind.config) - amplitude = Component(EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name='amplitude', kind=Kind.config) - polarity = Component(EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name='offset', kind=Kind.config) + polarity = Component( + EpicsSignal, + "OutputPolarityBI", + write_pv="OutputPolarityBO", + name="polarity", + kind=Kind.config, + ) + amplitude = Component( + EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config + ) + polarity = Component( + EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config + ) class DummyPositioner(Device, PositionerBase): @@ -32,28 +48,29 @@ class DummyPositioner(Device, PositionerBase): class DelayPair(PseudoPositioner): """Delay pair interface for DG645 - Virtual motor interface to a pair of signals (on the frontpanel). - It offers a simple delay and pulse width interface for scanning. + Virtual motor interface to a pair of signals (on the frontpanel). + It offers a simple delay and pulse width interface for scanning. """ + # The pseudo positioner axes - delay = Component(PseudoSingle, limits=(0, 2000.0), name='delay') - width = Component(PseudoSingle, limits=(0, 2000.0), name='pulsewidth') + delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay") + width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth") # The real delay axes - ch1 = Component(DummyPositioner, name='ch1') - ch2 = Component(DummyPositioner, name='ch2') + ch1 = Component(DummyPositioner, name="ch1") + ch2 = Component(DummyPositioner, name="ch2") def __init__(self, *args, **kwargs): # Change suffix names before connecting (a bit of dynamic connections) - self.__class__.__dict__['ch1'].suffix = kwargs['channel'][0] - self.__class__.__dict__['ch2'].suffix = kwargs['channel'][1] - del kwargs['channel'] + self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0] + self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1] + del kwargs["channel"] # Call parent to start the connections super().__init__(*args, **kwargs) @pseudo_position_argument def forward(self, pseudo_pos): """Run a forward (pseudo -> real) calculation""" - return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay+pseudo_pos.width) + return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width) @real_position_argument def inverse(self, real_pos): @@ -64,52 +81,95 @@ class DelayPair(PseudoPositioner): class DelayGeneratorDG645(Device): """DG645 delay generator - This class implements a thin Ophyd wrapper around the Stanford Research DG645 - digital delay generator. + This class implements a thin Ophyd wrapper around the Stanford Research DG645 + digital delay generator. - Internally, the DG645 generates 8+1 signals: A, B, C, D, E, F, G, H and T0 - Front panel outputs T0, AB, CD, EF and GH are a combination of these signals. - Back panel outputs are directly routed signals. So signals are NOT INDEPENDENT. + Internally, the DG645 generates 8+1 signals: A, B, C, D, E, F, G, H and T0 + Front panel outputs T0, AB, CD, EF and GH are a combination of these signals. + Back panel outputs are directly routed signals. So signals are NOT INDEPENDENT. - Front panel signals: - All signals go high after their defined delays and go low after the trigger - holdoff period, i.e. this is the trigger window. Front panel outputs provide - a combination of these events. - Option 1 back panel 5V signals: - All signals go high after their defined delays and go low after the trigger - holdoff period, i.e. this is the trigger window. The signals will stay high - until the end of the window. - Option 2 back panel 30V signals: - All signals go high after their defined delays for ~100ns. This is fixed by - electronics (30V needs quite some power). This is not implemented in the - current device + Front panel signals: + All signals go high after their defined delays and go low after the trigger + holdoff period, i.e. this is the trigger window. Front panel outputs provide + a combination of these events. + Option 1 back panel 5V signals: + All signals go high after their defined delays and go low after the trigger + holdoff period, i.e. this is the trigger window. The signals will stay high + until the end of the window. + Option 2 back panel 30V signals: + All signals go high after their defined delays for ~100ns. This is fixed by + electronics (30V needs quite some power). This is not implemented in the + current device """ - state = Component(EpicsSignalRO, "EventStatusLI", name='status_register') - status = Component(EpicsSignalRO, "StatusSI", name='status') + + state = Component(EpicsSignalRO, "EventStatusLI", name="status_register") + status = Component(EpicsSignalRO, "StatusSI", name="status") # Front Panel - channelT0 = Component(DelayStatic, "T0", name='T0') - channelAB = Component(DelayPair, "", name='AB', channel="AB") - channelCD = Component(DelayPair, "", name='CD', channel="CD") - channelEF = Component(DelayPair, "", name='EF', channel="EF") - channelGH = Component(DelayPair, "", name='GH', channel="GH") + channelT0 = Component(DelayStatic, "T0", name="T0") + channelAB = Component(DelayPair, "", name="AB", channel="AB") + channelCD = Component(DelayPair, "", name="CD", channel="CD") + channelEF = Component(DelayPair, "", name="EF", channel="EF") + channelGH = Component(DelayPair, "", name="GH", channel="GH") # Minimum time between triggers - holdoff = Component(EpicsSignal, "TriggerHoldoffAI", write_pv="TriggerHoldoffAO", name='trigger_holdoff', kind=Kind.config) - inhibit = Component(EpicsSignal, "TriggerInhibitMI", write_pv="TriggerInhibitMO", name='trigger_inhibit', kind=Kind.config) - source = Component(EpicsSignal, "TriggerSourceMI", write_pv="TriggerSourceMO", name='trigger_source', kind=Kind.config) - level = Component(EpicsSignal, "TriggerLevelAI", write_pv="TriggerLevelAO", name='trigger_level', kind=Kind.config) - rate = Component(EpicsSignal, "TriggerRateAI", write_pv="TriggerRateAO", name='trigger_rate', kind=Kind.config) + holdoff = Component( + EpicsSignal, + "TriggerHoldoffAI", + write_pv="TriggerHoldoffAO", + name="trigger_holdoff", + kind=Kind.config, + ) + inhibit = Component( + EpicsSignal, + "TriggerInhibitMI", + write_pv="TriggerInhibitMO", + name="trigger_inhibit", + kind=Kind.config, + ) + source = Component( + EpicsSignal, + "TriggerSourceMI", + write_pv="TriggerSourceMO", + name="trigger_source", + kind=Kind.config, + ) + level = Component( + EpicsSignal, + "TriggerLevelAI", + write_pv="TriggerLevelAO", + name="trigger_level", + kind=Kind.config, + ) + rate = Component( + EpicsSignal, + "TriggerRateAI", + write_pv="TriggerRateAO", + name="trigger_rate", + kind=Kind.config, + ) # Command PVs - arm = Component(EpicsSignal, "TriggerDelayBI", write_pv="TriggerDelayBO", name='arm', kind=Kind.omitted) + arm = Component( + EpicsSignal, "TriggerDelayBI", write_pv="TriggerDelayBO", name="arm", kind=Kind.omitted + ) # Burst mode - burstMode = Component(EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name='burstmode', kind=Kind.config) - burstConfig = Component(EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name='burstconfig', kind=Kind.config) - burstCount = Component(EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name='burstcount', kind=Kind.config) - burstDelay = Component(EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name='burstdelay', kind=Kind.config) - burstPeriod = Component(EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name='burstperiod', kind=Kind.config) + burstMode = Component( + EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config + ) + burstConfig = Component( + EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config + ) + burstCount = Component( + EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config + ) + burstDelay = Component( + EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config + ) + burstPeriod = Component( + EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config + ) def stage(self): """Trigger the generator by arming to accept triggers""" @@ -126,16 +186,16 @@ class DelayGeneratorDG645(Device): assert count > 0, "Number of bursts must be positive" assert delay > 0, "Burst delay must be positive" assert period > 0, "Burst period must be positive" - assert config in ['all', 'first'], "Supported bust configs are 'all' and 'first'" + assert config in ["all", "first"], "Supported bust configs are 'all' and 'first'" self.burstMode.set(1).wait() self.burstCount.set(count).wait() self.burstDelay.set(delay).wait() self.burstPeriod.set(period).wait() - if config=="all": + if config == "all": self.burstConfig.set(0).wait() - elif config=="first": + elif config == "first": self.burstConfig.set(1).wait() def busrtDisable(self): diff --git a/ophyd_devices/epics/proxies/InsertionDevice.py b/ophyd_devices/epics/proxies/InsertionDevice.py index dbba07e..4144bc5 100644 --- a/ophyd_devices/epics/proxies/InsertionDevice.py +++ b/ophyd_devices/epics/proxies/InsertionDevice.py @@ -10,6 +10,7 @@ class InsertionDevice(PVPositioner): WARN: The x and y are not updated by the IOC """ + status = Component(EpicsSignalRO, "-USER:STATUS", auto_monitor=True) errorSource = Component(EpicsSignalRO, "-USER:ERROR-SOURCE", auto_monitor=True) isOpen = Component(EpicsSignalRO, "-GAP:ISOPEN", auto_monitor=True) @@ -20,6 +21,7 @@ class InsertionDevice(PVPositioner): done = Component(EpicsSignalRO, ":DONE", auto_monitor=True) stop_signal = Component(EpicsSignal, "-GAP:STOP", kind=Kind.omitted) + # Automatically start simulation if directly invoked # (NA for important devices) if __name__ == "__main__": diff --git a/ophyd_devices/epics/proxies/SpmBase.py b/ophyd_devices/epics/proxies/SpmBase.py index dc01b29..502d3c1 100644 --- a/ophyd_devices/epics/proxies/SpmBase.py +++ b/ophyd_devices/epics/proxies/SpmBase.py @@ -6,13 +6,14 @@ import matplotlib.pyplot as plt class SpmBase(Device): """Python wrapper for the Staggered Blade Pair Monitors - SPM's consist of a set of four horizontal tungsten blades and are - used to monitor the beam height (only Y) for the bending magnet - beamlines of SLS. + SPM's consist of a set of four horizontal tungsten blades and are + used to monitor the beam height (only Y) for the bending magnet + beamlines of SLS. - Note: EPICS provided signals are read only, but the users can - change the beam position offset. + Note: EPICS provided signals are read only, but the users can + change the beam position offset. """ + # Motor interface s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True) s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True) @@ -20,20 +21,21 @@ class SpmBase(Device): s4 = Component(EpicsSignalRO, "Current4", auto_monitor=True) sum = Component(EpicsSignalRO, "SumAll", auto_monitor=True) y = Component(EpicsSignalRO, "Y", auto_monitor=True) - scale = Component(EpicsSignal, "PositionScaleY", auto_monitor=True) + scale = Component(EpicsSignal, "PositionScaleY", auto_monitor=True) offset = Component(EpicsSignal, "PositionOffsetY", auto_monitor=True) class SpmSim(SpmBase): """Python wrapper for simulated Staggered Blade Pair Monitors - SPM's consist of a set of four horizontal tungsten blades and are - used to monitor the beam height (only Y) for the bending magnet - beamlines of SLS. + SPM's consist of a set of four horizontal tungsten blades and are + used to monitor the beam height (only Y) for the bending magnet + beamlines of SLS. - This simulation device extends the basic proxy with a script that - fills signals with quasi-randomized values. + This simulation device extends the basic proxy with a script that + fills signals with quasi-randomized values. """ + # Motor interface s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False) s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False) @@ -46,16 +48,18 @@ class SpmSim(SpmBase): self._MX = 0 self._MY = 0 - self._I0 = 255.0 + self._I0 = 255.0 self._x = np.linspace(-5, 5, 64) self._y = np.linspace(-5, 5, 64) - self._x, self._y = np.meshgrid(self._x, self._y) + self._x, self._y = np.meshgrid(self._x, self._y) def _simFrame(self): """Generator to simulate a jumping gaussian""" # Define normalized 2D gaussian def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1): - return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.))) + return np.exp( + -((x - mx) ** 2.0 / (2.0 * sx**2.0) + (y - my) ** 2.0 / (2.0 * sy**2.0)) + ) # Generator for dynamic values self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random() - 5.0) diff --git a/ophyd_devices/epics/proxies/XbpmBase.py b/ophyd_devices/epics/proxies/XbpmBase.py index 344ea28..1cc540b 100644 --- a/ophyd_devices/epics/proxies/XbpmBase.py +++ b/ophyd_devices/epics/proxies/XbpmBase.py @@ -11,6 +11,7 @@ class XbpmCsaxsOp(Device): WARN: The x and y are not updated by the IOC """ + sum = Component(EpicsSignalRO, "SUM", auto_monitor=True) x = Component(EpicsSignalRO, "POSH", auto_monitor=True) y = Component(EpicsSignalRO, "POSV", auto_monitor=True) @@ -32,6 +33,7 @@ class XbpmBase(Device): Note: EPICS provided signals are read only, but the user can change the beam position offset. """ + # Motor interface s1 = Component(EpicsSignalRO, "Current1", auto_monitor=True) s2 = Component(EpicsSignalRO, "Current2", auto_monitor=True) @@ -42,9 +44,9 @@ class XbpmBase(Device): asymV = Component(EpicsSignalRO, "asymV", auto_monitor=True) x = Component(EpicsSignalRO, "X", auto_monitor=True) y = Component(EpicsSignalRO, "Y", auto_monitor=True) - scaleH = Component(EpicsSignal, "PositionScaleX", auto_monitor=False) + scaleH = Component(EpicsSignal, "PositionScaleX", auto_monitor=False) offsetH = Component(EpicsSignal, "PositionOffsetX", auto_monitor=False) - scaleV = Component(EpicsSignal, "PositionScaleY", auto_monitor=False) + scaleV = Component(EpicsSignal, "PositionScaleY", auto_monitor=False) offsetV = Component(EpicsSignal, "PositionOffsetY", auto_monitor=False) @@ -63,6 +65,7 @@ class XbpmSim(XbpmBase): This simulation device extends the basic proxy with a script that fills signals with quasi-randomized values. """ + # Motor interface s1w = Component(EpicsSignal, "Current1:RAW.VAL", auto_monitor=False) s2w = Component(EpicsSignal, "Current2:RAW.VAL", auto_monitor=False) @@ -84,11 +87,13 @@ class XbpmSim(XbpmBase): """Generator to simulate a jumping gaussian""" # define normalized 2D gaussian def gaus2d(x=0, y=0, mx=0, my=0, sx=1, sy=1): - return np.exp(-((x - mx)**2. / (2. * sx**2.) + (y - my)**2. / (2. * sy**2.))) + return np.exp( + -((x - mx) ** 2.0 / (2.0 * sx**2.0) + (y - my) ** 2.0 / (2.0 * sy**2.0)) + ) - #Generator for dynamic values - self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random()-5.0) - self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random()-5.0) + # Generator for dynamic values + self._MX = 0.75 * self._MX + 0.25 * (10.0 * np.random.random() - 5.0) + self._MY = 0.75 * self._MY + 0.25 * (10.0 * np.random.random() - 5.0) self._I0 = 0.75 * self._I0 + 0.25 * (255.0 * np.random.random()) arr = self._I0 * gaus2d(self._x, self._y, self._MX, self._MY) @@ -98,11 +103,11 @@ class XbpmSim(XbpmBase): # Get next frame beam = self._simFrame() total = np.sum(beam) - rnge = np.floor(np.log10(total) - 0.0 ) - s1 = np.sum(beam[32:64,32:64]) / 10**rnge - s2 = np.sum(beam[0:32,32:64]) / 10**rnge - s3 = np.sum(beam[32:64,0:32]) / 10**rnge - s4 = np.sum(beam[0:32,0:32]) / 10**rnge + rnge = np.floor(np.log10(total) - 0.0) + s1 = np.sum(beam[32:64, 32:64]) / 10**rnge + s2 = np.sum(beam[0:32, 32:64]) / 10**rnge + s3 = np.sum(beam[32:64, 0:32]) / 10**rnge + s4 = np.sum(beam[0:32, 0:32]) / 10**rnge self.s1w.set(s1).wait() self.s2w.set(s2).wait() @@ -111,8 +116,8 @@ class XbpmSim(XbpmBase): self.rangew.set(rnge).wait() # Print debug info print(f"Raw signals: R={rnge}\t{s1}\t{s2}\t{s3}\t{s4}") - #plt.imshow(beam) - #plt.show(block=False) + # plt.imshow(beam) + # plt.show(block=False) plt.pause(0.5) diff --git a/ophyd_devices/epics/proxies/slits.py b/ophyd_devices/epics/proxies/slits.py index 4c5f5db..61e29a3 100644 --- a/ophyd_devices/epics/proxies/slits.py +++ b/ophyd_devices/epics/proxies/slits.py @@ -11,6 +11,7 @@ class SlitH(PseudoPositioner): NOTE: The real and virtual axes are wrapped together. """ + # Motor interface x1 = Component(EpicsMotor, "TRX1") x2 = Component(EpicsMotor, "TRX2") @@ -21,14 +22,16 @@ class SlitH(PseudoPositioner): @pseudo_position_argument def forward(self, pseudo_pos): """Run a forward (pseudo -> real) calculation""" - return self.RealPosition(x1=pseudo_pos.cenx-pseudo_pos.gapx/2, - x2=pseudo_pos.cenx+pseudo_pos.gapx/2) + return self.RealPosition( + x1=pseudo_pos.cenx - pseudo_pos.gapx / 2, x2=pseudo_pos.cenx + pseudo_pos.gapx / 2 + ) @real_position_argument def inverse(self, real_pos): """Run an inverse (real -> pseudo) calculation""" - return self.PseudoPosition(cenx=(real_pos.x1+real_pos.x2)/2, - gapx=real_pos.x2-real_pos.x1) + return self.PseudoPosition( + cenx=(real_pos.x1 + real_pos.x2) / 2, gapx=real_pos.x2 - real_pos.x1 + ) class SlitV(PseudoPositioner): @@ -40,6 +43,7 @@ class SlitV(PseudoPositioner): NOTE: The real and virtual axes are wrapped together. """ + # Motor interface y1 = Component(EpicsMotor, "TRY1") y2 = Component(EpicsMotor, "TRY2") @@ -50,11 +54,13 @@ class SlitV(PseudoPositioner): @pseudo_position_argument def forward(self, pseudo_pos): """Run a forward (pseudo -> real) calculation""" - return self.RealPosition(y1=pseudo_pos.ceny-pseudo_pos.gapy/2, - y2=pseudo_pos.ceny+pseudo_pos.gapy/2) + return self.RealPosition( + y1=pseudo_pos.ceny - pseudo_pos.gapy / 2, y2=pseudo_pos.ceny + pseudo_pos.gapy / 2 + ) @real_position_argument def inverse(self, real_pos): """Run an inverse (real -> pseudo) calculation""" - return self.PseudoPosition(ceny=(real_pos.y1+real_pos.y2)/2, - gapy=real_pos.y2-real_pos.y1) + return self.PseudoPosition( + ceny=(real_pos.y1 + real_pos.y2) / 2, gapy=real_pos.y2 - real_pos.y1 + )