#!/usr/bin/env python """ Scan guard utility for PEARL scans This utility watches the scan facilities at X03DA and responds to beam loss and other incidents: IMPLEMENTED: * Beam loss: Pause the sscan records. * Beam loss: Pause the Scienta analyser. * Beam loss: Repeat the last Scienta measurement when the beam is restored. * Beam loss: Close/open the station shutter. * Beam loss: Independent low and high trip levels. * Beam loss: Beam can be okay only if beamline is "attended" or "unattended" * Beam loss: Closed absorber or front end shutter will also trigger a beam loss TODO: Author: Matthias Muntwiler Created: Oct 03, 2013 Copyright: (c) 2013 Paul Scherrer Institut """ import time import logging import epicsPV ##### MonitoredDevice class ##### class MonitoredDevice(object): """ Base class for devices which detect beam loss/return. """ def __init__(self, name): """ self.name = device name from which the PV names can be derived. self.pv = an epicsPV.epicsPV object connecting to an EPICS PV self.device_active = the device is consulted to decide whether the beam is on or off. self.severity = log level to report on beam loss (logging.WARNING, logging.ERROR, or logging.CRITICAL) """ self.name = name self.pv = None self.device_active = True self.severity = logging.WARNING self._loss_log_func = logging.warning def connect(self): """ Connects to the EPICS PVs. Name of PVs can be derived from self.name. pend_io() should be done by caller. """ logging.info("setting up monitored device: %s", self.name) if self.severity == logging.ERROR: self._loss_log_func = logging.error elif self.severity == logging.CRITICAL: self._loss_log_func = logging.critical else: self._loss_log_func = logging.warning def poll(self): """ Polls the PVs used by the class if necessary. """ pass def set_monitors(self): """ Sets monitors on the PVs used by the class if necessary. """ pass def beam_okay(self, last_beam_status): """ Returns True if the beam is okay from the perspective of this device, False otherwise. """ return False ##### TripDevice class ##### class TripDevice(MonitoredDevice): """ Base class for devices which detect beam loss/return by checking that a float value is within the allowed range. """ def __init__(self, name, trip_lo, trip_hi, dead_band): """ name = complete name of the EPICS PV to be monitored trip_lo = beam loss is triggered if value falls below this level trip_hi = beam loss is triggered if value rises above this level dead_band = beam is restored if value returns to within trip_lo + dead_band ... trip_hi - dead_band """ super(TripDevice, self).__init__(name) self.trip_lo = trip_lo self.trip_hi = trip_hi self.dead_band = dead_band def connect(self): self.pv = epicsPV.epicsPV(self.name, wait=0) logging.info("%s low trip %g", self.name, self.trip_lo) logging.info("%s high trip %g", self.name, self.trip_hi) def set_monitors(self): self.pv.array_get() self.pv.setMonitor() def poll(self): logging.debug("caget %s: %g", self.pv.pvname, self.pv.getValue()) def beam_okay(self, last_beam_status): if not self.device_active: return True val = self.pv.getValue() if last_beam_status: result = self.trip_lo <= val <= self.trip_hi else: result = self.trip_lo + self.dead_band <= val <= self.trip_hi - self.dead_band logging.debug("beam_okay = %s (%g mA)", str(result), val) if last_beam_status and not result: self._loss_log_func("%s trip (%g mA)", self.name, val) return result ##### BeamDrop class ##### class BeamDrop(MonitoredDevice): def __init__(self, name): super(BeamDrop, self).__init__(name) def connect(self): self.pv = epicsPV.epicsPV(self.name, wait=0) def set_monitors(self): self.pv.array_get() self.pv.setMonitor() def poll(self): logging.debug("caget %s: %g", self.pv.pvname, self.pv.getValue()) def beam_okay(self, last_beam_status): if not self.device_active: return True val = self.pv.getValue() result = val == 0 logging.debug("operation_mode_okay() = %s (%g)", str(result), val) if last_beam_status and not result: self._loss_log_func("operation mode trip (%g)", val) return result ##### BeamlineMode class ##### class BeamlineMode(MonitoredDevice): OFFLINE = 0 ATTENDED = 1 UNATTENDED = 2 def __init__(self, name): super(BeamlineMode, self).__init__(name) def connect(self): self.pv = epicsPV.epicsPV(self.name, wait=0) def set_monitors(self): self.pv.array_get() self.pv.setMonitor() def poll(self): logging.debug("caget %s: %g", self.pv.pvname, self.pv.getValue()) def beam_okay(self, last_beam_status): if not self.device_active: return True val = self.pv.getValue() result = val == self.ATTENDED or val == self.UNATTENDED logging.debug("beamline_mode_okay = %s (%g)", str(result), val) return result ##### FrontendShutter class ##### class FrontendShutter(MonitoredDevice): """ Monitors a front end shutter. This can be any on/off device with a {name}:OPEN_EPS readback which reports 1 in the open state, and 0 in the closed state. """ CLOSED = 0 OPEN = 1 def __init__(self, name): super(FrontendShutter, self).__init__(name) def connect(self): """ Connects to the process variables. name = name of the shutter, e.g. "X03DA-FE-PH1" """ MonitoredDevice.connect(self) self.pv = epicsPV.epicsPV(self.name + ":OPEN_EPS", wait=0) def set_monitors(self): self.pv.array_get() self.pv.setMonitor() def poll(self): """ Polls the shutter position """ logging.debug("caget %s: %g", self.pv.pvname, self.pv.getValue()) def beam_okay(self, last_beam_status): """ Returns True if the shutter is open, False otherwise. """ if not self.device_active: return True val = self.pv.getValue() result = val == self.OPEN logging.debug("shutter %s = %s (%g)", self.name, str(result), val) if last_beam_status and not result: self._loss_log_func("shutter trip %s", self.name) return result ##### ControlledDevice class ##### class ControlledDevice(object): """ Base class for devices which are controlled in response to beam loss/return. """ def __init__(self, name): """ self.name = device name from which the PV names can be derived. self.device_active = the device participated in the measurement before it was paused. self.paused = the device is paused due to a call to the self.pause() method. """ self.name = name self.device_active = False self.paused = False def connect(self): """ Connects to the EPICS PVs. Name of PVs can be derived from self.name. pend_io() should be done by caller. """ logging.info("setting up controlled device: %s", self.name) pass def poll(self): """ Polls the PVs used by the class if necessary. """ pass def set_monitors(self): """ Sets monitors on the PVs used by the class if necessary. """ pass def pause(self): """ Pauses the device, e.g. in response to beam loss. The method should update self.device_active and self.paused. """ pass def resume(self): """ Resumes the device. This method should act only if the previous pause() actually changed the device, i.e. self.pause == True. The method must reset self.paused. """ pass ##### ScanRecord class ##### class ScanRecord(ControlledDevice): def __init__(self, name): """ self.faze: epicsPV.epicsPV connecting to the FAZE field of the scan record. self.wait: epicsPV.epicsPV connecting to the WAIT field of the scan record. """ super(ScanRecord, self).__init__(name) self.faze = None self.wait = None def connect(self): """ Connects to the process variables of the scan record name = complete name of the scan record """ ControlledDevice.connect(self) self.faze = epicsPV.epicsPV(self.name + '.FAZE', wait=0) self.wait = epicsPV.epicsPV(self.name + '.WAIT', wait=0) def set_monitors(self): # before we can set a monitor, we have to get the value once. self.faze.array_get() self.faze.setMonitor() self.wait.array_get() def poll(self): logging.debug("caget %s: %g", self.faze.pvname, self.faze.getValue()) logging.debug("caget %s: %g", self.wait.pvname, self.wait.getValue()) def pause(self): self.device_active = self.faze.getValue() >= 1 if self.device_active: logging.info("pausing scan on %s", self.name) self.wait.array_put(1) self.wait.pend_io() self.paused = True else: self.paused = False def resume(self): if self.paused: # do not resume if the user has aborted the scan if self.faze.getValue() >= 1: logging.info("resuming scan on %s", self.name) self.wait.array_put(0) self.wait.pend_io() else: logging.warning("scan on %s aborted by user", self.name) self.paused = False ##### AreaDetector class ##### class AreaDetector(ControlledDevice): """ This class pauses and resumes an Area Detector device. While pausing, array callbacks are disabled so that the incomplete measurement is not passed to the plugins. If the detector resumes before the scan advances, the incomplete measurement is effectively repeated. Pause: Callbacks are disabled. Acquisition is aborted. Resume: Callbacks are enabled. Acquisition is restarted if it was interrupted. The detector is paused only if all of the following conditions are true, otherwise pause() will have no effect: 1) Array callbacks are enabled on cam1 2) HDF1 plugin is capturing resume() has no effect unless the previous pause() stopped the detector. """ def __init__(self, name): super(AreaDetector, self).__init__(name) # indicates whether we stopped a running acquisition when the pause started self.acq_active = False self.ArrayCallbacks = None self.DetectorState = None self.Acquire = None self.AcquireTime = None self.Capturing = None def connect(self): """ Connects to the process variables. name = prefix of the area detector (everything before "cam1", e.g. "X03DA-SCIENTA") """ ControlledDevice.connect(self) self.ArrayCallbacks = epicsPV.epicsPV(self.name + ':cam1:ArrayCallbacks', wait=0) self.DetectorState = epicsPV.epicsPV(self.name + ':cam1:DetectorState_RBV', wait=0) self.Acquire = epicsPV.epicsPV(self.name + ':cam1:Acquire', wait=0) self.AcquireTime = epicsPV.epicsPV(self.name + ':cam1:AcquireTime_RBV', wait=0) self.Capturing = epicsPV.epicsPV(self.name + ':HDF1:Capture_RBV', wait=0) def poll(self): """ Polls the detector state. This class doesn't set monitors on the PVs. Thus, we have to explicitly poll them before reading their values. """ self.DetectorState.array_get() logging.debug("caget %s: %g", self.DetectorState.pvname, self.DetectorState.getValue()) self.ArrayCallbacks.array_get() logging.debug("caget %s: %g", self.ArrayCallbacks.pvname, self.ArrayCallbacks.getValue()) self.Capturing.array_get() logging.debug("caget %s: %g", self.Capturing.pvname, self.Capturing.getValue()) def pause(self): """ Pauses the area detector. The goal is that no corrupted data gets into a data file in stream mode. The detector is considered active if the HDF5 plugin is in capture mode, and if the array callbacks are enabled. In this case, 1) the callbacks are disabled so that the corrupted data is not passed on to the plugins, 2) the acquisition is stopped if running. """ self.device_active = ((self.ArrayCallbacks.getValue() == 1) and (self.Capturing.getValue() == 1)) self.acq_active = self.DetectorState.getValue() == 1 logging.debug("caget %s: %g", self.DetectorState.pvname, self.DetectorState.getValue()) logging.debug("acq_active = %s", str(self.acq_active)) if self.device_active: logging.info("disabling callbacks on %s", self.name) self.ArrayCallbacks.array_put(0) self.Acquire.pend_io() logging.info("stopping acquisition on %s", self.name) self.Acquire.array_put(0) self.Acquire.pend_io() self.paused = True else: logging.debug("device %s not active", self.name) self.paused = False def resume(self): """ Resumes the area detector. 1) The callbacks are re-enabled. 2) If the acquisition was interruped by the pause, it is restarted unless the user has stopped the capture mode. """ if self.paused: self.AcquireTime.array_get() timeout = self.AcquireTime.getValue() * 2.0 + 10.0 self.wait_idle(timeout=timeout) logging.info("re-enabling callbacks on %s", self.name) self.ArrayCallbacks.array_put(1) logging.debug("acq_active = %s", str(self.acq_active)) self.Capturing.array_get() if self.Capturing.getValue() >= 1: # note: we need to restart the acquisition regardless of previous state. # if self.acq_active: logging.info("repeating acquisition on %s", self.name) self.Acquire.array_put(1) self.Acquire.pend_io() if self.wait_acquire(timeout=10.0): self.wait_idle(timeout=timeout) else: # user stopped the capture logging.warning("file capture on %s aborted by user", self.name) self.paused = False def wait_idle(self, timeout=60.0): """ waits for the detector to become idle. returns True if detector is idle at the end, False if the timeout exceeded while the detector is not idle. """ self.DetectorState.array_get() if self.DetectorState.getValue() == 1: logging.info("waiting until acquisition stops on %s", self.name) if self.DetectorState.getValue() > 1: logging.error("unexpected detector state on %s: %g", self.name, self.DetectorState.getValue()) while (self.DetectorState.getValue() >= 1) and (timeout > 0.0): self.DetectorState.pend_event(timeout=1.0) self.DetectorState.array_get() timeout -= 1.0 if timeout <= 0.0: logging.error("timeout while waiting for detector %s to become idle. current state: %g", self.name, self.DetectorState.getValue()) return False else: return True def wait_acquire(self, timeout=60.0): """ waits for the detector to start acquisition. returns True if detector is acquiring at the end, False if the timeout exceeded while the detector is idle. """ self.DetectorState.array_get() if self.DetectorState.getValue() < 1: logging.info("waiting until acquisition starts on %s", self.name) if self.DetectorState.getValue() > 1: logging.error("unexpected detector state on %s: %g", self.name, self.DetectorState.getValue()) while (self.DetectorState.getValue() < 1) and (timeout > 0.0): self.DetectorState.pend_event(timeout=1.0) self.DetectorState.array_get() timeout -= 1.0 if timeout <= 0.0: logging.error("timeout while waiting for detector %s. current state: %g", self.name, self.DetectorState.getValue()) return False else: return True ##### AnalogChannels class ##### class AnalogChannels(ControlledDevice): """ This class pauses and resumes the Analog Channels device. We don't have much control over this devices. All we can do is trigger it. Triggering is allowed at any time even if the previous interval hasn't ended. Pause: no action. Resume: Device is triggered. resume() has no effect unless the previous pause() stopped the detector. """ def __init__(self, name): super(AnalogChannels, self).__init__(name) self.Trigger = None def connect(self): """ Connects to the process variables. name = prefix of the analog channels (everything before the colon ":", e.g. "X03DA-OP-10ADC") """ ControlledDevice.connect(self) self.Trigger = epicsPV.epicsPV(self.name + ':TRG', wait=0) def poll(self): """ Polls the detector state. This class doesn't set monitors on the PVs. Thus, we have to explicitly poll them before reading their values. """ self.Trigger.array_get() logging.debug("caget %s: %g", self.Trigger.pvname, self.Trigger.getValue()) def pause(self): """ Pauses the detector. This method does not change the state of the device. It is not possible (and not necessary) to stop the device. Writing any value to self.Trigger will trigger a new measurement at any time. """ self.device_active = True self.paused = True def resume(self): """ Triggers the detector. """ if self.paused: logging.info("triggering analog channels %s", self.name) self.Trigger.array_put(1) self.paused = False ##### StationShutter class ##### class StationShutter(ControlledDevice): CLOSED = 2 MOVING = 4 OPEN = 5 def __init__(self, name): super(StationShutter, self).__init__(name) self.ShutterOpen = None self.ShutterClose = None self.ShutterRbv = None def connect(self): """ Connects to the process variables. name = name of the station shutter, e.g. "X03DA-OP-ST1" """ ControlledDevice.connect(self) self.ShutterOpen = epicsPV.epicsPV(self.name + ":WT_SET_OPEN", wait=0) self.ShutterClose = epicsPV.epicsPV(self.name + ":WT_SET_CLOSE", wait=0) self.ShutterRbv = epicsPV.epicsPV(self.name + ":POSITION", wait=0) def set_monitors(self): self.ShutterRbv.array_get() self.ShutterRbv.setMonitor() def poll(self): """ Polls the shutter position """ logging.debug("caget %s: %g", self.ShutterRbv.pvname, self.ShutterRbv.getValue()) def pause(self): """ Closes the shutter. """ self.device_active = self.ShutterRbv.getValue() == self.OPEN if self.device_active: logging.info("closing shutter %s", self.name) self.set_shutter(self.CLOSED) self.paused = True else: self.paused = False def resume(self): """ Opens the shutter if it was open before the pause. """ if self.paused: logging.info("opening shutter %s", self.name) self.set_shutter(self.OPEN) self.paused = False def set_shutter(self, newpos): """ Opens or closes the shutter. newpos = StationShutter.OPEN or StationShutter.CLOSED """ # try 5 times before giving up tries = 5 while (self.ShutterRbv.getValue() != newpos) and (tries > 0): oldpos = self.ShutterRbv.getValue() curpos = oldpos logging.debug("shutter position before trigger: %g", curpos) # send the command sequence self.ShutterOpen.array_put(0) self.ShutterClose.array_put(0) self.ShutterRbv.pend_event(timeout=0.01) if newpos == self.OPEN: self.ShutterOpen.array_put(1) else: self.ShutterClose.array_put(1) self.ShutterRbv.pend_event(timeout=0.5) self.ShutterOpen.array_put(0) self.ShutterClose.array_put(0) self.ShutterRbv.pend_event(timeout=0.01) curpos = self.ShutterRbv.getValue() logging.debug("shutter position after command: %g", curpos) # wait until it starts moving timeout = 2.0 while (self.ShutterRbv.getValue() == oldpos) and (timeout > 0.0): self.ShutterRbv.pend_event(timeout=0.1) timeout -= 0.1 # wait while it is moving timeout = 5.0 while (self.ShutterRbv.getValue() == self.MOVING) and (timeout > 0.0): self.ShutterRbv.pend_event(timeout=0.1) timeout -= 0.1 # wait longer if the readback is unexpected timeout = 10.0 while (self.ShutterRbv.getValue() not in [self.OPEN, self.CLOSED]) and (timeout > 0.0): self.ShutterRbv.pend_event(timeout=0.5) timeout -= 0.5 curpos = self.ShutterRbv.getValue() logging.debug("shutter position after wait: %g", curpos) tries -= 1 curpos = self.ShutterRbv.getValue() result = curpos == newpos if not result: logging.error("failed to set station shutter (requested %g, actual %g)", newpos, curpos) return result ###Global flag class GlobalFlag(ControlledDevice): def __init__(self): ControlledDevice.__init__(self,"Global Flag") global beam_ok beam_ok = True def pause(self): """ """ global beam_ok beam_ok = False def resume(self): """ Opens the shutter if it was open before the pause. """ global beam_ok beam_ok = True ##### ScanGuard class ##### class ScanGuard(object): """ Info strings for operation mode. Similar to those used by ACOAU-ACCU:OP-MODE without spaces and punctuation. Notes: OP-MODE = 5 is signalled for orbit-feedback problems. """ OPMODES = ["MachineDown", "InjStopped", "Accumulating_", "Accumulating", "TopUpReady", "LightAvailable_", "LightAvailable"] def __init__(self): # list of epicsPV objects monitored by this class self.monitors = [] # list of ControlledDevice objects to be controlled self.devices = [] # False = test mode, do not actually send out commands self.controls_active = False # True = beam is okay self.beam_ok = False # True while executing the main loop self.running = False # For test and demo, simulate a beam loss after 15 seconds. self.test = False self.test_time_seconds = 15 self.test_start_seconds = time.time() # for testing, use a local PV # self._ringcurrent = TripDevice('X03DA-OP-EXS:AP', 10.0, 500.0, 10.0) self._ringcurrent = TripDevice('ARIDI-PCT:CURRENT', 390.0, 410.0, 10.0) self.monitors.append(self._ringcurrent) # guard against erratic motors self._fmu_rx = TripDevice('X03DA-OP-FMU:oRx', -1.0, 1.0, 0.1) self._fmu_rx.severity = logging.ERROR self.monitors.append(self._fmu_rx) self._fmu_ry = TripDevice('X03DA-OP-FMU:oRy', -1.1, 0.9, 0.1) self._fmu_ry.severity = logging.ERROR self.monitors.append(self._fmu_ry) self._fmu_rz = TripDevice('X03DA-OP-FMU:oRz', -0.5, 0.5, 0.1) self._fmu_rz.severity = logging.ERROR self.monitors.append(self._fmu_rz) # self.operation_mode = OperationMode('ACOAU-ACCU:OP-MODE') # self.monitors.append(self.operation_mode) self._beam_drop = BeamDrop('ACOAU-ACCU:OP-BEAMDROP') self.monitors.append(self._beam_drop) self._beamline_mode = BeamlineMode('ACOAU-ACCU:OP-X03DA') self.monitors.append(self._beamline_mode) self._frontend_shutter = FrontendShutter('X03DA-FE-PH1') self.monitors.append(self._frontend_shutter) self._frontend_absorber = FrontendShutter('X03DA-FE-AB1') self.monitors.append(self._frontend_absorber) # devices are paused in the order set here # they are resumed in reversed order self.devices.append(ScanRecord('X03DA-PC:scan1')) self.devices.append(ScanRecord('X03DA-PC:scan2')) self.devices.append(ScanRecord('X03DA-PC:scan3')) self.devices.append(ScanRecord('X03DA-PC:scan4')) # how can we detect whether channels are available? self.devices.append(AreaDetector('X03DA-SCIENTA')) self.devices.append(AnalogChannels('X03DA-OP-10ADC')) self.devices.append(StationShutter('X03DA-OP-ST1')) self.devices.append(GlobalFlag()) @property def ringcurrent_trip_lo(self): """ trip if ring current falls below this level. """ return self._ringcurrent.trip_lo @ringcurrent_trip_lo.setter def ringcurrent_trip_lo(self, value): self._ringcurrent.trip_lo = value @property def ringcurrent_trip_hi(self): """ trip if ring current rises above this level. """ return self._ringcurrent.trip_hi @ringcurrent_trip_hi.setter def ringcurrent_trip_hi(self, value): self._ringcurrent.trip_hi = value @property def ringcurrent_dead_band(self): """ resume if ring current returns above ringcurrent_trip_lo + ringcurrent_dead_band. """ return self._ringcurrent.dead_band @ringcurrent_dead_band.setter def ringcurrent_dead_band(self, value): self._ringcurrent.dead_band = value def connect(self): for monitor in self.monitors: monitor.connect() for device in self.devices: device.connect() # Wait for all PVs to connect try: self._ringcurrent.pv.pend_io() except: logging.error('EPICS channels not available') return False # get first value, initialize data structures, set monitors for monitor in self.monitors: monitor.set_monitors() monitor.poll() for device in self.devices: device.set_monitors() device.poll() return True def mainloop(self): self._ringcurrent.pv.pend_io(timeout=0.01) #self.beam_ok = self.test or self.check_beam() self.beam_ok = True self.test_start_seconds = time.time() self.running = True if self.test: logging.info("test mode. simulated beam loss in %g seconds.", self.test_time_seconds) while self.running: self._ringcurrent.pv.poll() #pend_event(timeout=1) new_beam_ok = self.check_beam() if new_beam_ok != self.beam_ok: self.beam_ok = new_beam_ok if new_beam_ok: logging.warning("beam restored") self.resume() else: logging.warning("beam down") self.pause() time.sleep(1.0) def check_beam(self): beam_ok = self.beam_ok if self.test: rem_time = self.test_time_seconds - (time.time() - self.test_start_seconds) if rem_time < 0: beam_ok = not beam_ok self.test_start_seconds = time.time() # end test after one instance self.test = not beam_ok if self.test and not beam_ok: logging.info("simulated beam restore in %g seconds.", self.test_time_seconds) if not self.test: logging.info("test done - resuming normal operation") else: beam_ok = True for monitor in self.monitors: beam_ok = beam_ok and monitor.beam_okay(self.beam_ok) logging.debug("check_beam() = %s", str(beam_ok)) return beam_ok def pause(self): for device in self.devices: device.poll() if self.controls_active: for device in self.devices: device.pause() def resume(self): for device in reversed(self.devices): device.poll() if self.controls_active: for device in reversed(self.devices): device.resume() ##### main ##### def main(): nll = logging.INFO logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=nll) logging.info("PEARL Scan Guard") logging.info("version 1.1.3") guard = ScanGuard() guard.ringcurrent_trip_lo = 390.0 guard.ringcurrent_trip_hi = 410.0 guard.ringcurrent_dead_band = 1.0 # True = block acquisition while beam is down, False = do not block (e.g. for He lamp measurements) guard.controls_active = True # True = simulate beam down after a few seconds, False = normal operation guard.test = False global beam_ok guard.connect() try: guard.mainloop() beam_ok = True except KeyboardInterrupt: logging.info("scan guard aborted by user") beam_ok = True except: print sys.exc_info()[1] #This is to dispose created channels - otherwise, if channel is off, old channels remain and CPU consumption goes 100%. import _ca _ca.initialize() main()