Files
x03da/script/scan_guard.py
gac-x03da 2cffc6be30 Closedown
2018-08-28 14:28:29 +02:00

917 lines
31 KiB
Python

#!/usr/bin/env python
"""
Scan guard utility for PEARL scans
This utility watches the scan facilities at X03DA
and responds to beam loss and other incidents:
IMPLEMENTED:
* Beam loss: Pause the sscan records.
* Beam loss: Pause the Scienta analyser.
* Beam loss: Repeat the last Scienta measurement when the beam is restored.
* Beam loss: Close/open the station shutter.
* Beam loss: Independent low and high trip levels.
* Beam loss: Beam can be okay only if beamline is "attended" or "unattended"
* Beam loss: Closed absorber or front end shutter will also trigger a beam loss
TODO:
Author:
Matthias Muntwiler
Created:
Oct 03, 2013
Copyright:
(c) 2013 Paul Scherrer Institut
"""
import time
import logging
import epicsPV
##### MonitoredDevice class #####
class MonitoredDevice(object):
    """
    Common base of all devices that are watched to detect beam loss and beam return.

    Subclasses are expected to create their EPICS channel(s) in connect()
    and to implement the actual decision in beam_okay().
    """

    def __init__(self, name):
        """
        Initializes the attributes. No channel is created here.

        name = device name from which the PV names can be derived.

        Attributes:
        self.name = the device name passed in.
        self.pv = epicsPV.epicsPV object of the monitored PV (None until a subclass creates it).
        self.device_active = True if the device is consulted to decide whether the beam is on or off.
        self.severity = log level used to report a beam loss
                        (logging.WARNING, logging.ERROR, or logging.CRITICAL).
        """
        self.name = name
        self.pv = None
        self.device_active = True
        self.severity = logging.WARNING
        # function used to report a beam loss; refreshed from self.severity in connect()
        self._loss_log_func = logging.warning

    def connect(self):
        """
        Announces the device in the log and selects the loss-reporting function.

        Subclasses create their EPICS PVs here; the PV names can be derived from self.name.
        pend_io() should be done by the caller.
        """
        logging.info("setting up monitored device: %s", self.name)
        # any severity other than ERROR or CRITICAL is reported as a warning
        reporters = {
            logging.ERROR: logging.error,
            logging.CRITICAL: logging.critical,
        }
        self._loss_log_func = reporters.get(self.severity, logging.warning)

    def poll(self):
        """
        Hook for polling the PVs of the device. The base class has nothing to poll.
        """

    def set_monitors(self):
        """
        Hook for setting monitors on the PVs of the device. The base class sets none.
        """

    def beam_okay(self, last_beam_status):
        """
        Reports whether the beam is okay from the perspective of this device.

        last_beam_status = beam status (True = okay) found by the previous check.

        The base class always returns False; subclasses return True while the beam is okay.
        """
        return False
##### TripDevice class #####
class TripDevice(MonitoredDevice):
    """
    Monitored device which detects beam loss/return by checking that a float value is within the allowed range.
    """

    def __init__(self, name, trip_lo, trip_hi, dead_band):
        """
        name = complete name of the EPICS PV to be monitored
        trip_lo = beam loss is triggered if value falls below this level
        trip_hi = beam loss is triggered if value rises above this level
        dead_band = beam is restored if value returns to within trip_lo + dead_band ... trip_hi - dead_band
        """
        super(TripDevice, self).__init__(name)
        self.trip_lo = trip_lo
        self.trip_hi = trip_hi
        self.dead_band = dead_band

    def connect(self):
        """
        Creates the channel of the monitored PV and logs the trip levels.
        pend_io() should be done by the caller.
        """
        # The base class translates self.severity into the function that reports a trip.
        # Without this call a severity assigned after construction would be ignored
        # and every trip would be logged as a warning.
        super(TripDevice, self).connect()
        self.pv = epicsPV.epicsPV(self.name, wait=0)
        logging.info("%s low trip %g", self.name, self.trip_lo)
        logging.info("%s high trip %g", self.name, self.trip_hi)

    def set_monitors(self):
        """
        Sets a monitor on the PV.
        """
        # before we can set a monitor, we have to get the value once.
        self.pv.array_get()
        self.pv.setMonitor()

    def poll(self):
        """
        Writes the current value of the PV to the debug log.
        """
        logging.debug("caget %s: %g", self.pv.pvname, self.pv.getValue())

    def beam_okay(self, last_beam_status):
        """
        Returns True if the value is inside the allowed range, False otherwise.

        last_beam_status = beam status (True = okay) found by the previous check.

        While the beam is okay the plain trip levels apply.
        After a trip, the value must return into the range narrowed by dead_band on both sides
        before the beam is reported okay again.
        A device with device_active == False always reports True.
        """
        if not self.device_active:
            return True
        val = self.pv.getValue()
        if last_beam_status:
            result = self.trip_lo <= val <= self.trip_hi
        else:
            result = self.trip_lo + self.dead_band <= val <= self.trip_hi - self.dead_band
        # note: the messages carry the unit "mA" regardless of which PV is monitored
        logging.debug("beam_okay = %s (%g mA)", str(result), val)
        # report only the transition from okay to tripped, not every check while tripped
        if last_beam_status and not result:
            self._loss_log_func("%s trip (%g mA)", self.name, val)
        return result
##### BeamDrop class #####
class BeamDrop(MonitoredDevice):
    """
    Monitored device which reports a beam loss while the monitored PV reads a value other than 0.
    """

    def __init__(self, name):
        """
        name = complete name of the EPICS PV to be monitored
        """
        super(BeamDrop, self).__init__(name)

    def connect(self):
        """
        Creates the channel of the monitored PV.
        pend_io() should be done by the caller.
        """
        # let the base class log the setup and select the loss-reporting function from self.severity
        super(BeamDrop, self).connect()
        self.pv = epicsPV.epicsPV(self.name, wait=0)

    def set_monitors(self):
        """
        Sets a monitor on the PV.
        """
        # before we can set a monitor, we have to get the value once.
        self.pv.array_get()
        self.pv.setMonitor()

    def poll(self):
        """
        Writes the current value of the PV to the debug log.
        """
        logging.debug("caget %s: %g", self.pv.pvname, self.pv.getValue())

    def beam_okay(self, last_beam_status):
        """
        Returns True if the PV reads 0, False otherwise.

        last_beam_status = beam status (True = okay) found by the previous check.

        A device with device_active == False always reports True.
        """
        if not self.device_active:
            return True
        val = self.pv.getValue()
        result = val == 0
        logging.debug("operation_mode_okay() = %s (%g)", str(result), val)
        # report only the transition from okay to tripped
        if last_beam_status and not result:
            self._loss_log_func("operation mode trip (%g)", val)
        return result
##### BeamlineMode class #####
class BeamlineMode(MonitoredDevice):
    """
    Monitored device which reports the beam as okay only while the monitored PV
    reads ATTENDED or UNATTENDED.
    """
    # values of the monitored PV as interpreted by this class
    OFFLINE = 0
    ATTENDED = 1
    UNATTENDED = 2

    def __init__(self, name):
        """
        name = complete name of the EPICS PV to be monitored
        """
        MonitoredDevice.__init__(self, name)

    def connect(self):
        """
        Creates the channel of the monitored PV.
        pend_io() should be done by the caller.
        """
        channel = epicsPV.epicsPV(self.name, wait=0)
        self.pv = channel

    def set_monitors(self):
        """
        Reads the PV once and then sets a monitor on it.
        """
        channel = self.pv
        channel.array_get()
        channel.setMonitor()

    def poll(self):
        """
        Writes the current value of the PV to the debug log.
        """
        channel = self.pv
        logging.debug("caget %s: %g", channel.pvname, channel.getValue())

    def beam_okay(self, last_beam_status):
        """
        Returns True if the PV reads ATTENDED or UNATTENDED, False otherwise.

        last_beam_status = beam status found by the previous check (not used by this class).

        A device with device_active == False always reports True.
        """
        if self.device_active:
            mode = self.pv.getValue()
            usable = mode in (self.ATTENDED, self.UNATTENDED)
            logging.debug("beamline_mode_okay = %s (%g)", str(usable), mode)
            return usable
        return True
##### FrontendShutter class #####
class FrontendShutter(MonitoredDevice):
    """
    Watches a front end shutter.

    Any on/off device works as long as it has a {name}:OPEN_EPS readback
    that reads 1 in the open state and 0 in the closed state.
    """
    # values of the OPEN_EPS readback
    CLOSED = 0
    OPEN = 1

    def __init__(self, name):
        """
        name = name of the shutter, e.g. "X03DA-FE-PH1"
        """
        MonitoredDevice.__init__(self, name)

    def connect(self):
        """
        Creates the channel of the {name}:OPEN_EPS readback.
        pend_io() should be done by the caller.
        """
        super(FrontendShutter, self).connect()
        readback_name = self.name + ":OPEN_EPS"
        self.pv = epicsPV.epicsPV(readback_name, wait=0)

    def set_monitors(self):
        """
        Reads the readback once and then sets a monitor on it.
        """
        readback = self.pv
        readback.array_get()
        readback.setMonitor()

    def poll(self):
        """
        Writes the current shutter readback to the debug log.
        """
        readback = self.pv
        logging.debug("caget %s: %g", readback.pvname, readback.getValue())

    def beam_okay(self, last_beam_status):
        """
        Returns True if the shutter is open, False otherwise.

        last_beam_status = beam status (True = okay) found by the previous check.

        A device with device_active == False always reports True.
        """
        if not self.device_active:
            return True
        position = self.pv.getValue()
        is_open = position == self.OPEN
        logging.debug("shutter %s = %s (%g)", self.name, str(is_open), position)
        # report only the transition from okay to tripped
        newly_tripped = last_beam_status and not is_open
        if newly_tripped:
            self._loss_log_func("shutter trip %s", self.name)
        return is_open
##### ControlledDevice class #####
class ControlledDevice(object):
    """
    Common base of all devices that are acted upon when the beam is lost or returns.

    Subclasses override the hooks below; in the base class they do nothing
    apart from the log message in connect().
    """

    def __init__(self, name):
        """
        name = device name from which the PV names can be derived.

        Attributes:
        self.name = the device name passed in.
        self.device_active = the device participated in the measurement before it was paused.
        self.paused = the device is paused due to a call to the self.pause() method.
        """
        self.name = name
        self.paused = False
        self.device_active = False

    def connect(self):
        """
        Announces the device in the log.

        Subclasses create their EPICS PVs here; the PV names can be derived from self.name.
        pend_io() should be done by the caller.
        """
        logging.info("setting up controlled device: %s", self.name)

    def poll(self):
        """
        Hook for polling the PVs of the device. The base class has nothing to poll.
        """

    def set_monitors(self):
        """
        Hook for setting monitors on the PVs of the device. The base class sets none.
        """

    def pause(self):
        """
        Hook for pausing the device, e.g. in response to beam loss.

        Implementations should update self.device_active and self.paused.
        """

    def resume(self):
        """
        Hook for resuming the device.

        Implementations should act only if the previous pause() actually changed the device,
        i.e. self.paused == True, and they must reset self.paused.
        """
##### ScanRecord class #####
class ScanRecord(ControlledDevice):
    """
    Pauses and resumes a scan record through its WAIT field.

    The scan is treated as running while the FAZE field reads 1 or more.
    """

    def __init__(self, name):
        """
        name = complete name of the scan record

        Attributes:
        self.faze: epicsPV.epicsPV connecting to the FAZE field of the scan record.
        self.wait: epicsPV.epicsPV connecting to the WAIT field of the scan record.
        """
        ControlledDevice.__init__(self, name)
        self.faze = None
        self.wait = None

    def connect(self):
        """
        Creates the channels of the FAZE and WAIT fields of the scan record.
        pend_io() should be done by the caller.
        """
        super(ScanRecord, self).connect()
        self.faze = epicsPV.epicsPV(self.name + '.FAZE', wait=0)
        self.wait = epicsPV.epicsPV(self.name + '.WAIT', wait=0)

    def set_monitors(self):
        """
        Sets a monitor on FAZE. WAIT is read once but not monitored.
        """
        # a value has to be fetched once before a monitor can be set.
        self.faze.array_get()
        self.faze.setMonitor()
        self.wait.array_get()

    def poll(self):
        """
        Writes the current values of FAZE and WAIT to the debug log.
        """
        for field in (self.faze, self.wait):
            logging.debug("caget %s: %g", field.pvname, field.getValue())

    def pause(self):
        """
        Pauses the scan by writing 1 to WAIT, provided that a scan is running.

        self.device_active records whether a scan was running,
        self.paused records whether WAIT was actually written.
        """
        scanning = self.faze.getValue() >= 1
        self.device_active = scanning
        if not scanning:
            self.paused = False
            return
        logging.info("pausing scan on %s", self.name)
        self.wait.array_put(1)
        self.wait.pend_io()
        self.paused = True

    def resume(self):
        """
        Releases the scan by writing 0 to WAIT if it was paused by pause().

        WAIT is left untouched if the scan is no longer running,
        which is taken to mean that the user has aborted it.
        """
        if not self.paused:
            return
        still_scanning = self.faze.getValue() >= 1
        if still_scanning:
            logging.info("resuming scan on %s", self.name)
            self.wait.array_put(0)
            self.wait.pend_io()
        else:
            logging.warning("scan on %s aborted by user", self.name)
        self.paused = False
##### AreaDetector class #####
class AreaDetector(ControlledDevice):
    """
    This class pauses and resumes an Area Detector device.

    While pausing, array callbacks are disabled
    so that the incomplete measurement is not passed to the plugins.
    If the detector resumes before the scan advances,
    the incomplete measurement is effectively repeated.

    Pause: Callbacks are disabled. Acquisition is aborted.
    Resume: Callbacks are enabled. Acquisition is restarted if the HDF1 plugin is still capturing.

    The detector is paused only if all of the following conditions are true,
    otherwise pause() will have no effect:
    1) Array callbacks are enabled on cam1
    2) HDF1 plugin is capturing

    resume() has no effect unless the previous pause() stopped the detector.

    DetectorState_RBV is interpreted as follows by this class:
    below 1 = idle, 1 = acquiring, above 1 = unexpected (logged as an error, treated as busy).
    """

    def __init__(self, name):
        """
        name = prefix of the area detector (everything before "cam1", e.g. "X03DA-SCIENTA")
        """
        super(AreaDetector, self).__init__(name)
        # indicates whether we stopped a running acquisition when the pause started
        self.acq_active = False
        # epicsPV.epicsPV objects, created in connect()
        self.ArrayCallbacks = None
        self.DetectorState = None
        self.Acquire = None
        self.AcquireTime = None
        self.Capturing = None

    def connect(self):
        """
        Connects to the process variables.
        pend_io() should be done by the caller.
        """
        ControlledDevice.connect(self)
        self.ArrayCallbacks = epicsPV.epicsPV(self.name + ':cam1:ArrayCallbacks', wait=0)
        self.DetectorState = epicsPV.epicsPV(self.name + ':cam1:DetectorState_RBV', wait=0)
        self.Acquire = epicsPV.epicsPV(self.name + ':cam1:Acquire', wait=0)
        self.AcquireTime = epicsPV.epicsPV(self.name + ':cam1:AcquireTime_RBV', wait=0)
        self.Capturing = epicsPV.epicsPV(self.name + ':HDF1:Capture_RBV', wait=0)

    def poll(self):
        """
        Polls the detector state.

        This class doesn't set monitors on the PVs.
        Thus, we have to explicitly poll them before reading their values.
        """
        self.DetectorState.array_get()
        logging.debug("caget %s: %g", self.DetectorState.pvname, self.DetectorState.getValue())
        self.ArrayCallbacks.array_get()
        logging.debug("caget %s: %g", self.ArrayCallbacks.pvname, self.ArrayCallbacks.getValue())
        self.Capturing.array_get()
        logging.debug("caget %s: %g", self.Capturing.pvname, self.Capturing.getValue())

    def pause(self):
        """
        Pauses the area detector.

        The goal is that no corrupted data gets into a data file in stream mode.
        The detector is considered active if the HDF5 plugin is in capture mode,
        and if the array callbacks are enabled. In this case,
        1) the callbacks are disabled so that the corrupted data is not passed on to the plugins,
        2) the acquisition is stopped.

        The values are those fetched by the last poll().
        """
        self.device_active = ((self.ArrayCallbacks.getValue() == 1) and
                              (self.Capturing.getValue() == 1))
        self.acq_active = self.DetectorState.getValue() == 1
        logging.debug("caget %s: %g", self.DetectorState.pvname, self.DetectorState.getValue())
        logging.debug("acq_active = %s", str(self.acq_active))
        if self.device_active:
            logging.info("disabling callbacks on %s", self.name)
            self.ArrayCallbacks.array_put(0)
            # NOTE(review): pend_io() is issued on the Acquire channel although the put went to
            # ArrayCallbacks; presumably pend_io() flushes all pending requests - confirm.
            self.Acquire.pend_io()
            logging.info("stopping acquisition on %s", self.name)
            self.Acquire.array_put(0)
            self.Acquire.pend_io()
            self.paused = True
        else:
            logging.debug("device %s not active", self.name)
            self.paused = False

    def resume(self):
        """
        Resumes the area detector if it was paused by pause().

        1) Waits until the detector is idle.
        2) The callbacks are re-enabled.
        3) The acquisition is restarted unless the user has stopped the capture mode.
           After a successful start, the method waits until the detector is idle again.
        """
        if self.paused:
            # allow two exposure times plus a fixed margin for the detector to settle
            self.AcquireTime.array_get()
            timeout = self.AcquireTime.getValue() * 2.0 + 10.0
            self.wait_idle(timeout=timeout)
            logging.info("re-enabling callbacks on %s", self.name)
            self.ArrayCallbacks.array_put(1)
            logging.debug("acq_active = %s", str(self.acq_active))
            self.Capturing.array_get()
            if self.Capturing.getValue() >= 1:
                # note: we need to restart the acquisition regardless of previous state
                # (self.acq_active is deliberately not checked).
                logging.info("repeating acquisition on %s", self.name)
                self.Acquire.array_put(1)
                self.Acquire.pend_io()
                if self.wait_acquire(timeout=10.0):
                    self.wait_idle(timeout=timeout)
            else:
                # user stopped the capture
                logging.warning("file capture on %s aborted by user", self.name)
            self.paused = False

    def wait_idle(self, timeout=60.0):
        """
        waits for the detector to become idle.

        timeout = maximum number of one-second wait intervals.

        returns True if detector is idle at the end,
        False if the timeout exceeded while the detector is not idle.
        """
        self.DetectorState.array_get()
        if self.DetectorState.getValue() == 1:
            logging.info("waiting until acquisition stops on %s", self.name)
        if self.DetectorState.getValue() > 1:
            logging.error("unexpected detector state on %s: %g", self.name, self.DetectorState.getValue())
        while (self.DetectorState.getValue() >= 1) and (timeout > 0.0):
            self.DetectorState.pend_event(timeout=1.0)
            self.DetectorState.array_get()
            timeout -= 1.0
        # Decide on the detector state rather than on the remaining time:
        # the detector may have become idle during the very last wait interval.
        if self.DetectorState.getValue() >= 1:
            logging.error("timeout while waiting for detector %s to become idle. current state: %g", self.name,
                          self.DetectorState.getValue())
            return False
        return True

    def wait_acquire(self, timeout=60.0):
        """
        waits for the detector to start acquisition.

        timeout = maximum number of one-second wait intervals.

        returns True if detector is acquiring at the end,
        False if the timeout exceeded while the detector is idle.
        """
        self.DetectorState.array_get()
        if self.DetectorState.getValue() < 1:
            logging.info("waiting until acquisition starts on %s", self.name)
        if self.DetectorState.getValue() > 1:
            logging.error("unexpected detector state on %s: %g", self.name, self.DetectorState.getValue())
        while (self.DetectorState.getValue() < 1) and (timeout > 0.0):
            self.DetectorState.pend_event(timeout=1.0)
            self.DetectorState.array_get()
            timeout -= 1.0
        # Decide on the detector state rather than on the remaining time:
        # the acquisition may have started during the very last wait interval.
        if self.DetectorState.getValue() < 1:
            logging.error("timeout while waiting for detector %s. current state: %g", self.name,
                          self.DetectorState.getValue())
            return False
        return True
##### AnalogChannels class #####
class AnalogChannels(ControlledDevice):
    """
    Pauses and resumes the Analog Channels device.

    There is little control over this device: all that can be done is trigger it.
    Triggering is allowed at any time even if the previous interval hasn't ended.

    Pause: no action on the device.
    Resume: the device is triggered.

    resume() has no effect unless it was preceded by a pause().
    """

    def __init__(self, name):
        """
        name = prefix of the analog channels (everything before the colon ":", e.g. "X03DA-OP-10ADC")
        """
        ControlledDevice.__init__(self, name)
        # epicsPV.epicsPV object of the trigger PV, created in connect()
        self.Trigger = None

    def connect(self):
        """
        Creates the channel of the {name}:TRG process variable.
        pend_io() should be done by the caller.
        """
        super(AnalogChannels, self).connect()
        trigger_name = self.name + ':TRG'
        self.Trigger = epicsPV.epicsPV(trigger_name, wait=0)

    def poll(self):
        """
        Polls the trigger PV.

        No monitor is set by this class,
        so the value is requested explicitly before it is read.
        """
        trigger = self.Trigger
        trigger.array_get()
        logging.debug("caget %s: %g", trigger.pvname, trigger.getValue())

    def pause(self):
        """
        Marks the device as paused without touching it.

        It is not possible (and not necessary) to stop the device.
        Writing any value to self.Trigger will trigger a new measurement at any time.
        """
        self.paused = True
        self.device_active = True

    def resume(self):
        """
        Triggers the device if it was marked as paused, and clears the mark.
        """
        if not self.paused:
            return
        logging.info("triggering analog channels %s", self.name)
        self.Trigger.array_put(1)
        self.paused = False
##### StationShutter class #####
class StationShutter(ControlledDevice):
    """
    Closes the station shutter on beam loss and re-opens it when the beam returns.

    The shutter is re-opened only if it was open when pause() was called.
    """
    # values of the {name}:POSITION readback as interpreted by this class
    CLOSED = 2
    MOVING = 4
    OPEN = 5

    def __init__(self, name):
        """
        name = name of the station shutter, e.g. "X03DA-OP-ST1"
        """
        super(StationShutter, self).__init__(name)
        # epicsPV.epicsPV objects, created in connect()
        self.ShutterOpen = None
        self.ShutterClose = None
        self.ShutterRbv = None

    def connect(self):
        """
        Connects to the process variables.
        name = name of the station shutter, e.g. "X03DA-OP-ST1"
        pend_io() should be done by the caller.
        """
        ControlledDevice.connect(self)
        self.ShutterOpen = epicsPV.epicsPV(self.name + ":WT_SET_OPEN", wait=0)
        self.ShutterClose = epicsPV.epicsPV(self.name + ":WT_SET_CLOSE", wait=0)
        self.ShutterRbv = epicsPV.epicsPV(self.name + ":POSITION", wait=0)

    def set_monitors(self):
        """
        Reads the position readback once and then sets a monitor on it.
        """
        self.ShutterRbv.array_get()
        self.ShutterRbv.setMonitor()

    def poll(self):
        """
        Writes the current shutter position to the debug log.
        """
        logging.debug("caget %s: %g", self.ShutterRbv.pvname, self.ShutterRbv.getValue())

    def pause(self):
        """
        Closes the shutter if it is open.

        self.device_active records whether the shutter was open.
        self.paused is set whenever a close was attempted; the result of set_shutter() is not checked.
        """
        self.device_active = self.ShutterRbv.getValue() == self.OPEN
        if self.device_active:
            logging.info("closing shutter %s", self.name)
            self.set_shutter(self.CLOSED)
            self.paused = True
        else:
            self.paused = False

    def resume(self):
        """
        Opens the shutter if it was open before the pause.
        """
        if self.paused:
            logging.info("opening shutter %s", self.name)
            self.set_shutter(self.OPEN)
            self.paused = False

    def set_shutter(self, newpos):
        """
        Opens or closes the shutter.

        newpos = StationShutter.OPEN or StationShutter.CLOSED

        The command sequence is repeated up to 5 times until the readback equals newpos.
        Returns True if the readback equals newpos at the end, False otherwise
        (the failure is logged as an error).
        """
        # try 5 times before giving up
        tries = 5
        while (self.ShutterRbv.getValue() != newpos) and (tries > 0):
            oldpos = self.ShutterRbv.getValue()
            curpos = oldpos
            logging.debug("shutter position before trigger: %g", curpos)
            # send the command sequence:
            # clear both command PVs, raise the requested one for 0.5 s, then clear both again
            self.ShutterOpen.array_put(0)
            self.ShutterClose.array_put(0)
            self.ShutterRbv.pend_event(timeout=0.01)
            if newpos == self.OPEN:
                self.ShutterOpen.array_put(1)
            else:
                self.ShutterClose.array_put(1)
            self.ShutterRbv.pend_event(timeout=0.5)
            self.ShutterOpen.array_put(0)
            self.ShutterClose.array_put(0)
            self.ShutterRbv.pend_event(timeout=0.01)
            curpos = self.ShutterRbv.getValue()
            logging.debug("shutter position after command: %g", curpos)
            # wait until it starts moving (the readback leaves the old position), at most 2 s
            timeout = 2.0
            while (self.ShutterRbv.getValue() == oldpos) and (timeout > 0.0):
                self.ShutterRbv.pend_event(timeout=0.1)
                timeout -= 0.1
            # wait while it is moving, at most 5 s
            timeout = 5.0
            while (self.ShutterRbv.getValue() == self.MOVING) and (timeout > 0.0):
                self.ShutterRbv.pend_event(timeout=0.1)
                timeout -= 0.1
            # wait longer if the readback is unexpected (neither OPEN nor CLOSED), at most 10 s
            timeout = 10.0
            while (self.ShutterRbv.getValue() not in [self.OPEN, self.CLOSED]) and (timeout > 0.0):
                self.ShutterRbv.pend_event(timeout=0.5)
                timeout -= 0.5
            curpos = self.ShutterRbv.getValue()
            logging.debug("shutter position after wait: %g", curpos)
            tries -= 1
        # final verdict is based on the readback, not on the number of attempts used
        curpos = self.ShutterRbv.getValue()
        result = curpos == newpos
        if not result:
            logging.error("failed to set station shutter (requested %g, actual %g)", newpos, curpos)
        return result
##### GlobalFlag class #####
class GlobalFlag(ControlledDevice):
    """
    Publishes the beam status in the module-level variable beam_ok.

    beam_ok is True after construction and after resume(), and False after pause().
    The class does not connect to any PV.
    """

    def __init__(self):
        super(GlobalFlag, self).__init__("Global Flag")
        self._publish(True)

    @staticmethod
    def _publish(state):
        """
        Stores state in the module-level variable beam_ok.
        """
        global beam_ok
        beam_ok = state

    def pause(self):
        """
        Sets the global flag beam_ok to False.
        """
        self._publish(False)

    def resume(self):
        """
        Sets the global flag beam_ok to True.
        """
        self._publish(True)
##### ScanGuard class #####
class ScanGuard(object):
    """
    Watches a list of monitored devices and pauses/resumes a list of controlled devices
    when the beam status derived from the monitored devices changes.

    Usage: create the object, adjust the settings, call connect(), then mainloop().
    """

    # Info strings for operation mode.
    # Similar to those used by ACOAU-ACCU:OP-MODE without spaces and punctuation.
    # Notes:
    # OP-MODE = 5 is signalled for orbit-feedback problems.
    OPMODES = ["MachineDown", "InjStopped", "Accumulating_", "Accumulating", "TopUpReady", "LightAvailable_",
               "LightAvailable"]

    def __init__(self):
        """
        Creates the monitored and controlled devices. No EPICS channel is opened here, see connect().
        """
        # list of MonitoredDevice objects consulted by check_beam()
        self.monitors = []
        # list of ControlledDevice objects to be controlled
        self.devices = []
        # False = test mode, do not actually send out commands
        self.controls_active = False
        # True = beam is okay
        self.beam_ok = False
        # True while executing the main loop
        self.running = False
        # For test and demo, simulate a beam loss after 15 seconds.
        self.test = False
        self.test_time_seconds = 15
        self.test_start_seconds = time.time()

        # for testing, use a local PV
        # self._ringcurrent = TripDevice('X03DA-OP-EXS:AP', 10.0, 500.0, 10.0)
        self._ringcurrent = TripDevice('ARIDI-PCT:CURRENT', 390.0, 410.0, 10.0)
        self.monitors.append(self._ringcurrent)

        # guard against erratic motors
        self._fmu_rx = TripDevice('X03DA-OP-FMU:oRx', -1.0, 1.0, 0.1)
        self._fmu_rx.severity = logging.ERROR
        self.monitors.append(self._fmu_rx)
        self._fmu_ry = TripDevice('X03DA-OP-FMU:oRy', -1.1, 0.9, 0.1)
        self._fmu_ry.severity = logging.ERROR
        self.monitors.append(self._fmu_ry)
        self._fmu_rz = TripDevice('X03DA-OP-FMU:oRz', -0.5, 0.5, 0.1)
        self._fmu_rz.severity = logging.ERROR
        self.monitors.append(self._fmu_rz)

        # self.operation_mode = OperationMode('ACOAU-ACCU:OP-MODE')
        # self.monitors.append(self.operation_mode)
        self._beam_drop = BeamDrop('ACOAU-ACCU:OP-BEAMDROP')
        self.monitors.append(self._beam_drop)
        self._beamline_mode = BeamlineMode('ACOAU-ACCU:OP-X03DA')
        self.monitors.append(self._beamline_mode)
        self._frontend_shutter = FrontendShutter('X03DA-FE-PH1')
        self.monitors.append(self._frontend_shutter)
        self._frontend_absorber = FrontendShutter('X03DA-FE-AB1')
        self.monitors.append(self._frontend_absorber)

        # devices are paused in the order set here
        # they are resumed in reversed order
        self.devices.append(ScanRecord('X03DA-PC:scan1'))
        self.devices.append(ScanRecord('X03DA-PC:scan2'))
        self.devices.append(ScanRecord('X03DA-PC:scan3'))
        self.devices.append(ScanRecord('X03DA-PC:scan4'))
        # how can we detect whether channels are available?
        self.devices.append(AreaDetector('X03DA-SCIENTA'))
        self.devices.append(AnalogChannels('X03DA-OP-10ADC'))
        self.devices.append(StationShutter('X03DA-OP-ST1'))
        self.devices.append(GlobalFlag())

    @property
    def ringcurrent_trip_lo(self):
        """
        trip if ring current falls below this level.
        """
        return self._ringcurrent.trip_lo

    @ringcurrent_trip_lo.setter
    def ringcurrent_trip_lo(self, value):
        self._ringcurrent.trip_lo = value

    @property
    def ringcurrent_trip_hi(self):
        """
        trip if ring current rises above this level.
        """
        return self._ringcurrent.trip_hi

    @ringcurrent_trip_hi.setter
    def ringcurrent_trip_hi(self, value):
        self._ringcurrent.trip_hi = value

    @property
    def ringcurrent_dead_band(self):
        """
        resume if ring current returns above ringcurrent_trip_lo + ringcurrent_dead_band.
        """
        return self._ringcurrent.dead_band

    @ringcurrent_dead_band.setter
    def ringcurrent_dead_band(self, value):
        self._ringcurrent.dead_band = value

    def connect(self):
        """
        Connects all monitored and controlled devices, then sets the monitors and polls once.

        Returns True on success, False if waiting for the channels raised an exception
        (the failure is logged as an error).
        KeyboardInterrupt and SystemExit are not intercepted.
        """
        for monitor in self.monitors:
            monitor.connect()
        for device in self.devices:
            device.connect()

        # Wait for all PVs to connect
        try:
            self._ringcurrent.pv.pend_io()
        except Exception as exc:
            # Exception (not a bare except) so that Ctrl-C during the connection phase still aborts the program
            logging.error('EPICS channels not available')
            logging.debug("pend_io failed: %s", exc)
            return False

        # get first value, initialize data structures, set monitors
        for monitor in self.monitors:
            monitor.set_monitors()
            monitor.poll()
        for device in self.devices:
            device.set_monitors()
            device.poll()
        return True

    def mainloop(self):
        """
        Checks the beam once per second and pauses/resumes the devices when the status changes.

        The loop starts with the assumption that the beam is okay
        and runs until self.running is cleared or an exception is raised.
        """
        self._ringcurrent.pv.pend_io(timeout=0.01)
        # self.beam_ok = self.test or self.check_beam()
        self.beam_ok = True
        self.test_start_seconds = time.time()
        self.running = True
        if self.test:
            logging.info("test mode. simulated beam loss in %g seconds.", self.test_time_seconds)
        while self.running:
            self._ringcurrent.pv.poll()  # pend_event(timeout=1)
            new_beam_ok = self.check_beam()
            # act on transitions only
            if new_beam_ok != self.beam_ok:
                self.beam_ok = new_beam_ok
                if new_beam_ok:
                    logging.warning("beam restored")
                    self.resume()
                else:
                    logging.warning("beam down")
                    self.pause()
            time.sleep(1.0)

    def check_beam(self):
        """
        Returns True if the beam is okay, False otherwise.

        Normal operation: the beam is okay if every monitored device reports okay.
        The devices are asked in list order; after the first device that reports a loss
        the remaining devices are not consulted.

        Test mode (self.test): the monitored devices are ignored.
        The current status is returned until test_time_seconds have elapsed,
        then it is inverted. The test mode ends when the simulated beam is restored.
        """
        beam_ok = self.beam_ok
        if self.test:
            rem_time = self.test_time_seconds - (time.time() - self.test_start_seconds)
            if rem_time < 0:
                beam_ok = not beam_ok
                self.test_start_seconds = time.time()
                # end test after one instance
                self.test = not beam_ok
                if self.test and not beam_ok:
                    logging.info("simulated beam restore in %g seconds.", self.test_time_seconds)
                if not self.test:
                    logging.info("test done - resuming normal operation")
        else:
            beam_ok = True
            for monitor in self.monitors:
                beam_ok = beam_ok and monitor.beam_okay(self.beam_ok)
        logging.debug("check_beam() = %s", str(beam_ok))
        return beam_ok

    def pause(self):
        """
        Polls all controlled devices, then pauses them in list order if controls_active is set.
        """
        for device in self.devices:
            device.poll()
        if self.controls_active:
            for device in self.devices:
                device.pause()

    def resume(self):
        """
        Polls all controlled devices, then resumes them in reversed list order if controls_active is set.
        """
        for device in reversed(self.devices):
            device.poll()
        if self.controls_active:
            for device in reversed(self.devices):
                device.resume()
##### main #####
def main():
    """
    Configures logging and the scan guard, connects to the EPICS channels and runs the main loop.

    The function returns early if the channels cannot be connected.
    The module-level flag beam_ok is set to True when the main loop returns
    or is interrupted by the user.
    Any other exception raised by the main loop is logged with its traceback and not re-raised.
    """
    global beam_ok

    nll = logging.INFO
    logging.basicConfig(format="%(asctime)s %(levelname)s: %(message)s", level=nll)
    logging.info("PEARL Scan Guard")
    logging.info("version 1.1.3")

    guard = ScanGuard()
    guard.ringcurrent_trip_lo = 390.0
    guard.ringcurrent_trip_hi = 410.0
    guard.ringcurrent_dead_band = 1.0
    # True = block acquisition while beam is down, False = do not block (e.g. for He lamp measurements)
    guard.controls_active = True
    # True = simulate beam down after a few seconds, False = normal operation
    guard.test = False

    # Without connected channels the main loop cannot work; connect() has already logged the failure.
    if not guard.connect():
        return

    try:
        guard.mainloop()
        beam_ok = True
    except KeyboardInterrupt:
        logging.info("scan guard aborted by user")
        beam_ok = True
    except Exception:
        # logging.exception records the message together with the traceback;
        # the module does not import sys, so sys.exc_info() cannot be used here.
        logging.exception("scan guard terminated by an unexpected error")
# Initialize the channel access layer before the guard is started.
# Original author's note: this is to dispose of previously created channels - otherwise,
# if a channel is off, old channels remain and CPU consumption goes to 100%.
# NOTE(review): presumably this matters when the script is re-run in the same interpreter - confirm.
import _ca
_ca.initialize()
# start the scan guard; the call is unconditional, i.e. it also runs when the file is imported
main()