use FlyerInterface for PGM OtF scan

This commit is contained in:
Xiaoqiang Wang 2023-03-13 10:18:11 +01:00
parent 5cc8cb7d73
commit b7ead905c0

View File

@ -1,12 +1,16 @@
"""
ophyd device classes for X07MA beamline
"""
from collections import OrderedDict
import time
from typing import Any
from ophyd import Component as Cpt
from ophyd import FormattedComponent as FCpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind, PVPositioner, EpicsMotor
from ophyd.flyers import FlyerInterface
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd.pv_positioner import PVPositionerComparator
__all__ = [
@ -53,7 +57,7 @@ class PGMMonochromator(PVPositioner):
with_undulator = Cpt(EpicsSignal, "PHS-E:OPT", kind=Kind.config)
class PGMOtFScan(Device):
class PGMOtFScan(FlyerInterface, Device):
"""
PGM on-the-fly scan
"""
@ -63,8 +67,45 @@ class PGMOtFScan(Device):
time = Cpt(EpicsSignal, "TIME")
folder = Cpt(EpicsSignal, "FOLDER")
file = Cpt(EpicsSignal, "FILE")
start = Cpt(EpicsSignal, "START")
acquire = Cpt(EpicsSignal, "START", auto_monitor=True)
edata = Cpt(EpicsSignalRO, "EDATA", kind=Kind.hinted, auto_monitor=True)
data = Cpt(EpicsSignalRO, "DATA", kind=Kind.hinted, auto_monitor=True)
idata = Cpt(EpicsSignalRO, "IDATA", kind=Kind.hinted, auto_monitor=True)
fdata = Cpt(EpicsSignalRO, "FDATA", kind=Kind.hinted, auto_monitor=True)
def kickoff(self):
self._start_time = time.time()
self.acquire.put(1, use_complete=True)
status = DeviceStatus(self)
status.set_finished()
return status
def complete(self):
def check_value(*, old_value, value, **kwargs):
return (old_value == 1 and value == 0)
status = SubscriptionStatus(self.acquire, check_value, event_type=self.acquire.SUB_VALUE)
return status
def collect(self):
data = {
"time": self._start_time,
"data": {},
"timestamps": {}
}
for attr in ("edata", "data", "idata", "fdata"):
obj = getattr(self, attr)
data["data"][attr] = obj.value
data["timestamps"][attr] = obj.timestamp
return data
def describe(self):
desc = OrderedDict()
for attr in ("edata", "data", "idata", "fdata"):
desc.update(getattr(self, attr).describe())
desc[attr]["dtype"] = "array"
return desc
class VacuumValve(PVPositionerComparator):
"""