next some base classed copied and commented to local scripts

This commit is contained in:
gac-x07mb
2024-08-21 17:52:24 +02:00
parent 7359d1b8f6
commit 20aaa069de
8 changed files with 3965 additions and 243 deletions

View File

@ -1,243 +0,0 @@
from ophyd import (
ADComponent as ADCpt,
Device,
DeviceStatus,
)
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
class PhoenixTriggerError(Exception):
"""Base class for exceptions in this module."""
class PhoenixTriggerTimeoutError(XMAPError):
"""Raised when the PhoenixTrigger does not respond in time."""
class PhoenixTriggerDetectorState(enum.IntEnum):
"""Detector states for XMAP detector"""
DONE = 0
ACQUIRING = 1
class PhoenixTriggerSetup(CustomDetectorMixin):
"""
This defines the trigger setup.
"""
def __init__(self, *args, parent:Device = None, **kwargs):
super().__init__(*args, parent=parent, **kwargs)
self._counter = 0
def on_stage(self):
exposure_time = self.parent.scaninfo.exp_time
num_points = self.parent.scaninfo.num_points
# camera acquisition parameters
self.parent.cam.array_counter.put(0)
if self.parent.scaninfo.scan_type == 'step':
self.parent.cam.acquire_time.put(exposure_time)
self.parent.cam.num_images.put(1)
self.parent.cam.image_mode.put(0) # Single
self.parent.cam.trigger_mode.put(0) # auto
else:
# In flyscan, the exp_time is the time between two triggers,
# which minus 15% is used as the acquisition time.
self.parent.cam.acquire_time.put(exposure_time * 0.85)
self.parent.cam.num_images.put(num_points)
self.parent.cam.image_mode.put(1) # Multiple
self.parent.cam.trigger_mode.put(1) # trigger
self.parent.cam.acquire.put(1, wait=False) # arm
# file writer
self.parent.hdf.lazy_open.put(1)
self.parent.hdf.num_capture.put(num_points)
self.parent.hdf.file_write_mode.put(2) # Stream
self.parent.hdf.capture.put(1, wait=False)
self.parent.hdf.enable.put(1) # enable plugin
# roi statistics to collect signal and background in a timeseries
self.parent.roistat.enable.put(1)
self.parent.roistat.ts_num_points.put(num_points)
self.parent.roistat.ts_control.put(0, wait=False) # Erase/Start
logger.success('XXXX stage XXXX')
def on_trigger(self):
self.parent.cam.acquire.put(1, wait=False)
logger.success('XXXX trigger XXXX')
return self.wait_with_status(
[(self.parent.cam.acquire.get, 0)],
self.parent.scaninfo.exp_time + DETECTOR_TIMEOUT,
all_signals=True
)
def on_complete(self):
status = DeviceStatus(self.parent)
if self.parent.scaninfo.scan_type == 'step':
timeout = DETECTOR_TIMEOUT
else:
timeout = self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points + DETECTOR_TIMEOUT
logger.success('XXXX %s XXXX' % self.parent.roistat.ts_acquiring.get())
success = self.wait_for_signals(
[
(self.parent.cam.acquire.get, 0),
(self.parent.hdf.capture.get, 0),
(self.parent.roistat.ts_acquiring.get, 'Done')
],
timeout,
check_stopped=True,
all_signals=True
)
# publish file location
self.parent.filepath.put(self.parent.hdf.full_file_name.get())
self.publish_file_location(done=True, successful=success)
# publish timeseries data
metadata = self.parent.scaninfo.scan_msg.metadata
metadata.update({"async_update": "append", "num_lines": self.parent.roistat.ts_current_point.get()})
msg = messages.DeviceMessage(
signals={
self.parent.roistat.roi1.name_.get(): {
'value': self.parent.roistat.roi1.ts_total.get(),
},
self.parent.roistat.roi2.name_.get(): {
'value': self.parent.roistat.roi2.ts_total.get(),
},
},
metadata=self.parent.scaninfo.scan_msg.metadata
)
self.parent.connector.xadd(
topic=MessageEndpoints.device_async_readback(
scan_id=self.parent.scaninfo.scan_id, device=self.parent.name
),
msg_dict={"data": msg},
expire=1800,
)
logger.success('XXXX complete %d XXXX' % success)
if success:
status.set_finished()
else:
status.set_exception(TimeoutError())
return status
def on_stop(self):
logger.success('XXXX stop XXXX')
self.parent.cam.acquire.put(0)
self.parent.hdf.capture.put(0)
self.parent.roistat.ts_control.put(2)
def on_unstage(self):
self.parent.cam.acquire.put(0)
self.parent.hdf.capture.put(0)
self.parent.roistat.ts_control.put(2)
logger.success('XXXX unstage XXXX')
class EigerROIStatPlugin(ROIStatPlugin):
roi1 = ADCpt(ROIStatNPlugin, '1:')
roi2 = ADCpt(ROIStatNPlugin, '2:')
class PhoenixTrigger(PSIDetectorBase):
"""
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
in __init__ of PSIDetecor bases
class is initialized
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector
mca (EpicsMCARecord) : MCA parameters for XMAP detector
hdf5 (XMAPHDF5Plugins) : HDF5 parameters for XMAP detector
MIN_READOUT (float) : Minimum readout time for the detector
The class PhoenixTrigger is the class to be called via yaml configuration file
the input arguments are defined by PSIDetectorBase,
and need to be given in the yaml configuration file.
To adress chanels such as 'X07MB-OP2:SMPL-DONE':
use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file.
PSIDetectorBase(
prefix='',
*,Q
name,
kind=None,
parent=None,
device_manager=None,
**kwargs,
)
Docstring:
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device
device_manager (object): bec device manager
**kwargs: keyword arguments
File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py
Type: type
Subclasses: SimCamera, SimMonitorAsync
"""
#custom_prepare_cls = PhoenixTriggerSetup
#cam = ADCpt(SLSDetectorCam, 'cam1:')
#X07MB-OP2:START-CSMPL.. cont on / off
# X07MB-OP2:SMPL.. take single sample
#X07MB-OP2:INTR-COUNT.. counter run up
#X07MB-OP2:TOTAL-CYCLES .. cycles set
#X07MB-OP2:SMPL-DONE
QUESTION HOW does ADCpt kno the EPICS prefix??????
#image = ADCpt(ImagePlugin, 'image1:')
#roi1 = ADCpt(ROIPlugin, 'ROI1:')
#roi2 = ADCpt(ROIPlugin, 'ROI2:')
#stats1 = ADCpt(StatsPlugin, 'Stats1:')
#stats2 = ADCpt(StatsPlugin, 'Stats2:')
roistat = ADCpt(EigerROIStatPlugin, 'ROIStat1:')
#roistat1 = ADCpt(ROIStatNPlugin, 'ROIStat1:1:')
#roistat2 = ADCpt(ROIStatNPlugin, 'ROIStat1:2:')
hdf = ADCpt(HDF5Plugin, 'HDF1:')
)

View File

@ -0,0 +1,182 @@
from ophyd import (
ADComponent as ADCpt,
Device,
DeviceStatus,
)
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
from bec_lib import bec_logger, messages
from bec_lib.endpoints import MessageEndpoints
import time
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
#class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off
class PhoenixTriggerSetup(CustomDetectorMixin):
"""
This defines the PHOENIX trigger setup.
"""
def __init__(self, *args, parent:Device = None, **kwargs):
super().__init__(*args, parent=parent, **kwargs)
self._counter = 0
WW
def on_stage(self):
# is this called on each point in scan or just before scan ???
print('on stage')
self.parent.start_csmpl.put(0)
time.sleep(0.05)
cycles=self.parent.total_cycles.get()
time.sleep(0.05)
cycles=self.parent.total_cycles.put(0)
time.sleep(0.05)
cycles=self.parent.smpl.put(2)
time.sleep(0.5)
cycles=self.parent.total_cycles.put(cycles)
logger.success('PhoenixTrigger on stage')
def on_trigger(self):
self.parent.start_smpl.put(1)
time.sleep(0.05) # use blocking
logger.success('PhoenixTrigger on_trigger')
return self.wait_with_status(
[(self.parent.smpl_done.get, 1)])
# logger.success(' PhoenixTrigger on_trigger complete ')
# if success:
# status.set_finished()
# else:
# status.set_exception(TimeoutError())
# return status
def on_complete(self):
timeout =10
logger.success('XXXX complete %d XXXX' % success)
success = self.wait_for_signals(
[
(self.parent.smpl_done.get, 0))
],
timeout,
check_stopped=True,
all_signals=True
)
if success:
status.set_finished()
else:
status.set_exception(TimeoutError())
return status
def on_stop(self):
logger.success(' PhoenixTrigger on_stop ')
self.parent.csmpl.put(1)
logger.success(' PhoenixTrigger on_stop finished ')
def on_unstage(self):
logger.success(' PhoenixTrigger on_unstage ')
self.parent.csmpl.put(1)
self.parent.smpl.put(1)
logger.success(' PhoenixTrigger on_unstage finished ')
class PhoenixTrigger(PSIDetectorBase):
"""
Parent class: PSIDetectorBase
class attributes:
custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS,
inherits from CustomDetectorMixin
in __init__ of PSIDetecor bases
class is initialized
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector
dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector
mca (EpicsMCARecord) : MCA parameters for XMAP detector
hdf5 (XMAPHDF5Plugins) : HDF5 parameters for XMAP detector
MIN_READOUT (float) : Minimum readout time for the detector
The class PhoenixTrigger is the class to be called via yaml configuration file
the input arguments are defined by PSIDetectorBase,
and need to be given in the yaml configuration file.
To adress chanels such as 'X07MB-OP2:SMPL-DONE':
use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file.
PSIDetectorBase(
prefix='',
*,Q
name,
kind=None,
parent=None,
device_manager=None,
**kwargs,
)
Docstring:
Abstract base class for SLS detectors
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str): EPICS PV prefix for component (optional)
name (str): name of the device, as will be reported via read()
kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
parent (object): instance of the parent device
device_manager (object): bec device manager
**kwargs: keyword arguments
File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py
Type: type
Subclasses: EpicsSignal
"""
custom_prepare_cls = PhoenixTriggerSetup
start_csmpl = Cpt(EpicsSignal,'START-CSMPL') # cont on / off
intr_count = Cpt(EpicsSignal,'INTR-COUNT') # conter run up
total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES') # cycles set
smpl_done = Cpt(EpicsSignal,'SMPL-DONE') # show trigger is done

View File

@ -0,0 +1,510 @@
import enum
import time
from typing import Any
from bec_lib import bec_logger
from ophyd import (
Component,
Device,
DeviceStatus,
EpicsSignal,
EpicsSignalRO,
Kind,
PVPositioner,
Signal,
)
from ophyd.device import Staged
from ophyd.pseudopos import (
PseudoPositioner,
PseudoSingle,
pseudo_position_argument,
real_position_argument,
)
from ophyd_devices.utils import bec_utils
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
logger = bec_logger.logger
class DelayGeneratorError(Exception):
"""Exception raised for errors."""
class DeviceInitError(DelayGeneratorError):
"""Error upon failed initialization, invoked by missing device manager or device not started in sim_mode."""
class DelayGeneratorNotOkay(DelayGeneratorError):
"""Error when DDG is not okay"""
class TriggerSource(enum.IntEnum):
"""
Class for trigger options of DG645
Used to set the trigger source of the DG645 by setting the value
e.g. source.put(TriggerSource.Internal)
Exp:
TriggerSource.Internal
"""
INTERNAL = 0
EXT_RISING_EDGE = 1
EXT_FALLING_EDGE = 2
SS_EXT_RISING_EDGE = 3
SS_EXT_FALLING_EDGE = 4
SINGLE_SHOT = 5
LINE = 6
class DelayStatic(Device):
"""
Static axis for the T0 output channel
It allows setting the logic levels, but the timing is fixed.
The signal is high after receiving the trigger until the end
of the holdoff period.
"""
# Other channel stuff
ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config)
nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config)
polarity = Component(
EpicsSignal,
"OutputPolarityBI",
write_pv="OutputPolarityBO",
name="polarity",
kind=Kind.config,
)
amplitude = Component(
EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config
)
offset = Component(
EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config
)
class DummyPositioner(PVPositioner):
"""Dummy Positioner to set AO, AI and ReferenceMO."""
setpoint = Component(EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config)
readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config)
done = Component(Signal, value=1)
reference = Component(EpicsSignal, "ReferenceMO", put_complete=True, kind=Kind.config)
class DelayPair(PseudoPositioner):
"""
Delay pair interface
Virtual motor interface to a pair of signals (on the frontpanel - AB/CD/EF/GH).
It offers a simple delay and pulse width interface.
"""
# The pseudo positioner axes
delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay")
width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth")
ch1 = Component(DummyPositioner, name="ch1")
ch2 = Component(DummyPositioner, name="ch2")
io = Component(DelayStatic, name="io")
def __init__(self, *args, **kwargs):
# Change suffix names before connecting (a bit of dynamic connections)
self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0]
self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1]
self.__class__.__dict__["io"].suffix = kwargs["channel"]
del kwargs["channel"]
# Call parent to start the connections
super().__init__(*args, **kwargs)
@pseudo_position_argument
def forward(self, pseudo_pos):
"""Run a forward (pseudo -> real) calculation"""
return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width)
@real_position_argument
def inverse(self, real_pos):
"""Run an inverse (real -> pseudo) calculation"""
return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1)
class DDGCustomMixin:
"""
Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase.
This class provides a parent class for implementation of BL specific logic of the device.
It is also possible to pass implementing certain methods, e.g. finished or on_trigger,
based on the setup and desired operation mode at the beamline.
Args:
parent (object): instance of PSIDelayGeneratorBase
**kwargs: keyword arguments
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
self.parent = parent
def initialize_default_parameter(self) -> None:
"""
Method to initialize default parameters for DDG.
Called upon initiating the base class.
It should be used to set the DDG default parameters.
These may include: amplitude, offsets, delays, etc.
"""
def prepare_ddg(self) -> None:
"""
Method to prepare the DDG for the upcoming scan.
Called by the stage method of the base class.
It should be used to set the DDG parameters for the upcoming scan.
"""
def on_trigger(self) -> None:
"""Method executed upon trigger call in parent class"""
def finished(self) -> None:
"""Method to check if DDG is finished with the scan"""
def on_pre_scan(self) -> None:
"""
Method executed upon pre_scan call in parent class.
Covenient to implement time sensitive actions to be executed right before start of the scan.
Example could be to open the shutter by triggering a pulse via pre_scan.
"""
def check_scan_id(self) -> None:
"""Method to check if there is a new scan_id, called by stage."""
def is_ddg_okay(self, raise_on_error=False) -> None:
"""
Method to check if DDG is okay
It checks the status PV of the DDG and tries to clear the error if it is not okay.
It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay.
Args:
raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False.
"""
status = self.parent.status.read()[self.parent.status.name]["value"]
if status != "STATUS OK" and not raise_on_error:
logger.warning(f"DDG returns {status}, trying to clear ERROR")
self.parent.clear_error()
time.sleep(1)
self.is_ddg_okay(raise_on_error=True)
elif status != "STATUS OK":
raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}")
class PSIDelayGeneratorBase(Device):
"""
Abstract base class for DelayGenerator DG645
This class implements a thin Ophyd wrapper around the Stanford Research DG645
digital delay generator.
The DG645 generates 8+1 signals: A, B, C, D, E, F, G, H and T0. Front panel outputs
T0, AB, CD, EF and GH are combinations of these signals. Back panel outputs are
directly routed signals. Signals are not independent.
Signal pairs, e.g. AB, CD, EF, GH, are implemented as DelayPair objects. They
have a TTL pulse width, delay and a reference signal to which they are being triggered.
In addition, the io layer allows setting amplitude, offset and polarity for each pair.
Detailed information can be found in the manual:
https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf
Class attributes:
custom_prepare_cls (object): class for custom prepare logic (BL specific)
Args:
prefix (str) : EPICS PV prefix for component (optional)
name (str) : name of the device, as will be reported via read()
kind (str) : member of class 'ophydobj.Kind', defaults to Kind.normal
omitted -> readout ignored for read 'ophydobj.read()'
normal -> readout for read
config -> config parameter for 'ophydobj.read_configuration()'
hinted -> which attribute is readout for read
read_attrs (list) : sequence of attribute names to read
configuration_attrs (list) : sequence of attribute names via config_parameters
parent (object) : instance of the parent device
device_manager (object) : bec device manager
sim_mode (bool) : simulation mode, if True, no device manager is required
**kwargs : keyword arguments
attributes : lazy_wait_for_connection : bool
"""
# Custom_prepare_cls
custom_prepare_cls = DDGCustomMixin
SUB_PROGRESS = "progress"
SUB_VALUE = "value"
_default_sub = SUB_VALUE
USER_ACCESS = ["set_channels", "_set_trigger", "burst_enable", "burst_disable", "reload_config"]
# Assign PVs from DDG645
trigger_burst_readout = Component(
EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout"
)
burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state")
delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished")
status = Component(EpicsSignalRO, "StatusSI", name="status")
clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error")
# Front Panel
channelT0 = Component(DelayStatic, "T0", name="T0")
channelAB = Component(DelayPair, "", name="AB", channel="AB")
channelCD = Component(DelayPair, "", name="CD", channel="CD")
channelEF = Component(DelayPair, "", name="EF", channel="EF")
channelGH = Component(DelayPair, "", name="GH", channel="GH")
holdoff = Component(
EpicsSignal,
"TriggerHoldoffAI",
write_pv="TriggerHoldoffAO",
name="trigger_holdoff",
kind=Kind.config,
)
inhibit = Component(
EpicsSignal,
"TriggerInhibitMI",
write_pv="TriggerInhibitMO",
name="trigger_inhibit",
kind=Kind.config,
)
source = Component(
EpicsSignal,
"TriggerSourceMI",
write_pv="TriggerSourceMO",
name="trigger_source",
kind=Kind.config,
)
level = Component(
EpicsSignal,
"TriggerLevelAI",
write_pv="TriggerLevelAO",
name="trigger_level",
kind=Kind.config,
)
rate = Component(
EpicsSignal,
"TriggerRateAI",
write_pv="TriggerRateAO",
name="trigger_rate",
kind=Kind.config,
)
trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config")
burstMode = Component(
EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config
)
burstConfig = Component(
EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config
)
burstCount = Component(
EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config
)
burstDelay = Component(
EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config
)
burstPeriod = Component(
EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
**kwargs,
):
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
**kwargs,
)
if device_manager is None and not sim_mode:
raise DeviceInitError(
f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add"
" DeviceManager to initialization or init with sim_mode=True"
)
# Init variables
self.sim_mode = sim_mode
self.stopped = False
self.name = name
self.scaninfo = None
self.timeout = 5
self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"]
self.all_delay_pairs = ["AB", "CD", "EF", "GH"]
self.wait_for_connection(all_signals=True)
# Init custom prepare class with BL specific logic
self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
if not sim_mode:
self.device_manager = device_manager
else:
self.device_manager = bec_utils.DMMock()
self.connector = self.device_manager.connector
self._update_scaninfo()
self._init()
def _update_scaninfo(self) -> None:
"""
Method to updated scaninfo from BEC.
In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py
"""
self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode)
self.scaninfo.load_scan_metadata()
def _init(self) -> None:
"""Method to initialize custom parameters of the DDG."""
self.custom_prepare.initialize_default_parameter()
self.custom_prepare.is_ddg_okay()
def set_channels(self, signal: str, value: Any, channels: list = None) -> None:
"""
Method to set signals on DelayPair and DelayStatic channels.
Signals can be set on the DelayPair and DelayStatic channels. The method checks
if the signal is available on the channel and sets it. It works for both, DelayPair
and Delay Static although signals are hosted in different layers.
Args:
signal (str) : signal to set (width, delay, amplitude, offset, polarity)
value (Any) : value to set
channels (list, optional) : list of channels to set. Defaults to self.all_channels (T0,AB,CD,EF,GH)
"""
if not channels:
channels = self.all_channels
for chname in channels:
channel = getattr(self, chname, None)
if not channel:
continue
if signal in channel.component_names:
getattr(channel, signal).set(value)
continue
if "io" in channel.component_names and signal in channel.io.component_names:
getattr(channel.io, signal).set(value)
def set_trigger(self, trigger_source: TriggerSource) -> None:
"""Set trigger source on DDG - possible values defined in TriggerSource enum"""
value = int(trigger_source)
self.source.put(value)
def burst_enable(self, count, delay, period, config="all"):
"""Enable the burst mode"""
# Validate inputs
count = int(count)
assert count > 0, "Number of bursts must be positive"
assert delay >= 0, "Burst delay must be larger than 0"
assert period > 0, "Burst period must be positive"
assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'"
self.burstMode.put(1)
self.burstCount.put(count)
self.burstDelay.put(delay)
self.burstPeriod.put(period)
if config == "all":
self.burstConfig.put(0)
elif config == "first":
self.burstConfig.put(1)
def burst_disable(self):
"""Disable burst mode"""
self.burstMode.put(0)
def stage(self) -> list[object]:
"""
Method to stage the device.
Called in preparation for a scan.
Internal Calls:
- scaninfo.load_scan_metadata : load scan metadata
- custom_prepare.prepare_ddg : prepare DDG for measurement
- is_ddg_okay : check if DDG is okay
Returns:
list(object): list of objects that were staged
"""
if self._staged != Staged.no:
return super().stage()
self.stopped = False
self.scaninfo.load_scan_metadata()
self.custom_prepare.prepare_ddg()
self.custom_prepare.is_ddg_okay()
# At the moment needed bc signal might not be reliable, BEC too fast.
# Consider removing this overhead in future!
time.sleep(0.05)
return super().stage()
def trigger(self) -> DeviceStatus:
"""
Method to trigger the acquisition.
Internal Call:
- custom_prepare.on_trigger : execute BL specific action
"""
self.custom_prepare.on_trigger()
return super().trigger()
def pre_scan(self) -> None:
"""
Method pre_scan gets executed directly before the scan
Internal Call:
- custom_prepare.on_pre_scan : execute BL specific action
"""
self.custom_prepare.on_pre_scan()
def unstage(self) -> list[object]:
"""
Method unstage gets called at the end of a scan.
If scan (self.stopped is True) is stopped, returns directly.
Otherwise, checks if the DDG finished acquisition
Internal Calls:
- custom_prepare.check_scan_id : check if scan_id changed or detector stopped
- custom_prepare.finished : check if device finished acquisition (succesfully)
- is_ddg_okay : check if DDG is okay
Returns:
list(object): list of objects that were unstaged
"""
self.custom_prepare.check_scan_id()
if self.stopped is True:
return super().unstage()
self.custom_prepare.finished()
self.custom_prepare.is_ddg_okay()
self.stopped = False
return super().unstage()
def stop(self, *, success=False) -> None:
"""
Method to stop the DDG
#TODO Check if the pulse generation can be interruppted
Internal Call:
- custom_prepare.is_ddg_okay : check if DDG is okay
"""
self.custom_prepare.is_ddg_okay()
super().stop(success=success)
self.stopped = True

View File

@ -0,0 +1,570 @@
"""
Scan stubs are commands that can be used to control devices during a scan. They typically yield device messages that are
consumed by the scan worker and potentially forwarded to the device server.
"""
from __future__ import annotations
import threading
import time
import uuid
from collections.abc import Callable
from typing import Generator, Literal
import numpy as np
from bec_lib import messages
from bec_lib.connector import ConnectorBase
from bec_lib.device import Status
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from .errors import DeviceMessageError, ScanAbortion
logger = bec_logger.logger
class ScanStubs:
"""
Scan stubs are commands that can be used to control devices during a scan. They typically yield device messages that are
consumed by the scan worker and potentially forwarded to the device server.
"""
def __init__(
self,
connector: ConnectorBase,
device_msg_callback: Callable = None,
shutdown_event: threading.Event = None,
) -> None:
self.connector = connector
self.device_msg_metadata = (
device_msg_callback if device_msg_callback is not None else lambda: {}
)
self.shutdown_event = shutdown_event
@staticmethod
def _exclude_nones(input_dict: dict):
for key in list(input_dict.keys()):
if input_dict[key] is None:
input_dict.pop(key)
def _device_msg(self, **kwargs):
""""""
msg = messages.DeviceInstructionMessage(**kwargs)
msg.metadata = {**self.device_msg_metadata(), **msg.metadata}
return msg
def send_rpc_and_wait(self, device: str, func_name: str, *args, **kwargs) -> any:
"""Perform an RPC (remote procedure call) on a device and wait for its return value.
Args:
device (str): Name of the device
func_name (str): Function name. The function name will be appended to the device.
args (tuple): Arguments to pass on to the RPC function
kwargs (dict): Keyword arguments to pass on to the RPC function
Raises:
ScanAbortion: Raised if the RPC's success is False
Returns:
any: Return value of the executed rpc function
Examples:
>>> send_rpc_and_wait("samx", "controller.my_custom_function")
"""
rpc_id = str(uuid.uuid4())
parameter = {
"device": device,
"func": func_name,
"rpc_id": rpc_id,
"args": args,
"kwargs": kwargs,
}
yield from self.rpc(
device=device, parameter=parameter, metadata={"response": True, "RID": rpc_id}
)
return self._get_from_rpc(rpc_id)
def _get_from_rpc(self, rpc_id) -> any:
"""
Get the return value from an RPC call.
Args:
rpc_id (str): RPC ID
Raises:
ScanAbortion: Raised if the RPC's success flag is False
Returns:
any: Return value of the RPC call
"""
while not self.shutdown_event.is_set():
msg = self.connector.get(MessageEndpoints.device_rpc(rpc_id))
if msg:
break
time.sleep(0.001)
if self.shutdown_event.is_set():
raise ScanAbortion("The scan was aborted.")
if not msg.content["success"]:
error = msg.content["out"]
if isinstance(error, dict) and {"error", "msg", "traceback"}.issubset(
set(error.keys())
):
error_msg = f"During an RPC, the following error occured:\n{error['error']}: {error['msg']}.\nTraceback: {error['traceback']}\n The scan will be aborted."
else:
error_msg = "During an RPC, an error occured"
raise ScanAbortion(error_msg)
logger.debug(msg.content.get("out"))
return_val = msg.content.get("return_val")
if not isinstance(return_val, dict):
return return_val
if return_val.get("type") == "status" and return_val.get("RID"):
return Status(self.connector, return_val.get("RID"))
return return_val
def set_with_response(
self, *, device: str, value: float, request_id: str = None, metadata=None
) -> Generator[None, None, None]:
"""Set a device to a specific value and return the request ID. Use :func:`request_is_completed` to later check if the request is completed.
Args:
device (str): Device name.
value (float): Target value.
Returns:
Generator[None, None, None]: Generator that yields a device message.
see also: :func:`request_is_completed`
"""
request_id = str(uuid.uuid4()) if request_id is None else request_id
metadata = metadata if metadata is not None else {}
metadata.update({"response": True, "RID": request_id})
yield from self.set(device=device, value=value, wait_group="set", metadata=metadata)
return request_id
def request_is_completed(self, RID: str) -> bool:
"""Check if a request that was initiated with :func:`set_with_response` is completed.
Args:
RID (str): Request ID.
Returns:
bool: True if the request is completed, False otherwise.
"""
msg = self.connector.lrange(MessageEndpoints.device_req_status_container(RID), 0, -1)
if not msg:
return False
return True
def set_and_wait(
self, *, device: list[str], positions: list | np.ndarray
) -> Generator[None, None, None]:
"""Set devices to a specific position and wait completion.
Args:
device (list[str]): List of device names.
positions (list | np.ndarray): Target position.
Returns:
Generator[None, None, None]: Generator that yields a device message.
see also: :func:`set`, :func:`wait`, :func:`set_with_response`
"""
if not isinstance(positions, list) and not isinstance(positions, np.ndarray):
positions = [positions]
if len(positions) == 0:
return
for ind, val in enumerate(device):
yield from self.set(device=val, value=positions[ind], wait_group="scan_motor")
yield from self.wait(device=device, wait_type="move", wait_group="scan_motor")
def read_and_wait(
self, *, wait_group: str, device: list = None, group: str = None, point_id: int = None
) -> Generator[None, None, None]:
"""Trigger a reading and wait for completion.
Args:
wait_group (str): wait group
device (list, optional): List of device names. Can be specified instead of group. Defaults to None.
group (str, optional): Group name of devices. Can be specified instead of device. Defaults to None.
point_id (int, optional): _description_. Defaults to None.
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
self._check_device_and_groups(device, group)
yield from self.read(device=device, group=group, wait_group=wait_group, point_id=point_id)
yield from self.wait(device=device, wait_type="read", group=group, wait_group=wait_group)
def open_scan(
self,
*,
scan_motors: list,
readout_priority: dict,
num_pos: int,
scan_name: str,
scan_type: Literal["step", "fly"],
positions=None,
metadata=None,
) -> Generator[None, None, None]:
"""Open a new scan.
Args:
scan_motors (list): List of scan motors.
readout_priority (dict): Modification of the readout priority.
num_pos (int): Number of positions within the scope of this scan.
positions (list): List of positions for this scan.
scan_name (str): Scan name.
scan_type (str): Scan type (e.g. 'step' or 'fly')
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(
device=None,
action="open_scan",
parameter={
"scan_motors": scan_motors,
"readout_priority": readout_priority,
"num_points": num_pos,
"positions": positions,
"scan_name": scan_name,
"scan_type": scan_type,
},
metadata=metadata,
)
def kickoff(
self, *, device: str, parameter: dict = None, wait_group="kickoff", metadata=None
) -> Generator[None, None, None]:
"""Kickoff a fly scan device.
Args:
device (str): Device name of flyer.
parameter (dict, optional): Additional parameters that should be forwarded to the device. Defaults to {}.
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
parameter = parameter if parameter is not None else {}
parameter = {"configure": parameter, "wait_group": wait_group}
yield self._device_msg(
device=device, action="kickoff", parameter=parameter, metadata=metadata
)
def complete(self, *, device: str, metadata=None) -> Generator[None, None, None]:
"""Complete a fly scan device.
Args:
device (str): Device name of flyer.
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(device=device, action="complete", parameter={}, metadata=metadata)
def get_req_status(self, device: str, RID: str, DIID: int) -> int:
"""Check if a device request status matches the given RID and DIID
Args:
device (str): device under inspection
RID (str): request ID
DIID (int): device instruction ID
Returns:
int: 1 if the request status matches the RID and DIID, 0 otherwise
"""
msg = self.connector.get(MessageEndpoints.device_req_status(device))
if not msg:
return 0
matching_RID = msg.metadata.get("RID") == RID
matching_DIID = msg.metadata.get("DIID") == DIID
if matching_DIID and matching_RID:
return 1
return 0
def get_device_progress(self, device: str, RID: str) -> float | None:
"""Get reported device progress
Args:
device (str): Name of the device
RID (str): request ID
Returns:
float: reported progress value
"""
msg = self.connector.get(MessageEndpoints.device_progress(device))
if not msg:
return None
matching_RID = msg.metadata.get("RID") == RID
if not matching_RID:
return None
if not isinstance(msg, messages.ProgressMessage):
raise DeviceMessageError(
f"Expected to receive a Progressmessage for device {device} but instead received {msg}."
)
return msg.content["value"]
def close_scan(self) -> Generator[None, None, None]:
"""
Close the scan.
Returns:
Generator[None, None, None]: Generator that yields a device message.
see also: :func:`open_scan`
"""
yield self._device_msg(device=None, action="close_scan", parameter={})
def stage(self) -> Generator[None, None, None]:
"""
Stage all devices
Returns:
Generator[None, None, None]: Generator that yields a device message.
see also: :func:`unstage`
"""
yield self._device_msg(device=None, action="stage", parameter={})
def unstage(self) -> Generator[None, None, None]:
"""
Unstage all devices
Returns:
Generator[None, None, None]: Generator that yields a device message.
see also: :func:`stage`
"""
yield self._device_msg(device=None, action="unstage", parameter={})
def pre_scan(self) -> Generator[None, None, None]:
"""
Trigger pre-scan actions on all devices. Typically, pre-scan actions are called directly before the scan core starts and
are used to perform time-critical actions.
The event will be sent to all devices that have a pre_scan method implemented.
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(device=None, action="pre_scan", parameter={})
def baseline_reading(self) -> Generator[None, None, None]:
"""
Run the baseline readings. This will readout all devices that are marked with the readout_priority "baseline".
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(
device=None,
action="baseline_reading",
parameter={},
metadata={"readout_priority": "baseline"},
)
def wait(
self,
*,
wait_type: Literal["move", "read", "trigger"],
device: list[str] | str | None = None,
group: Literal["scan_motor", "primary", None] = None,
wait_group: str = None,
wait_time: float = None,
):
"""Wait for an event.
Args:
wait_type (Literal["move", "read", "trigger"]): Type of wait event. Can be "move", "read" or "trigger".
device (list[str] | str, optional): List of device names. Defaults to None.
group (Literal["scan_motor", "primary", None]): Device group that can be used instead of device. Defaults to None.
wait_group (str, optional): Wait group for this event. Defaults to None.
wait_time (float, optional): Wait time (for wait_type="trigger"). Defaults to None.
Returns:
Generator[None, None, None]: Generator that yields a device message.
Example:
>>> yield from self.stubs.wait(wait_type="move", group="scan_motor", wait_group="scan_motor")
>>> yield from self.stubs.wait(wait_type="read", group="scan_motor", wait_group="my_readout_motors")
"""
self._check_device_and_groups(device, group)
parameter = {"type": wait_type, "time": wait_time, "group": group, "wait_group": wait_group}
self._exclude_nones(parameter)
yield self._device_msg(device=device, action="wait", parameter=parameter)
def read(
self,
*,
wait_group: str,
device: list[str] | str | None = None,
point_id: int | None = None,
group: Literal["scan_motor", "primary", None] = None,
) -> Generator[None, None, None]:
"""
Trigger a reading on a device or device group.
Args:
wait_group (str): Wait group for this event. The specified wait group can later be used
to wait for the completion of this event. Please note that the wait group has to be
unique. within the scope of the read / wait event.
device (list, optional): Device name. Can be used instead of group. Defaults to None.
point_id (int, optional): point_id to assign this reading to point within the scan. Defaults to None.
group (Literal["scan_motor", "primary", None], optional): Device group. Can be used instead of device. Defaults to None.
Returns:
Generator[None, None, None]: Generator that yields a device message.
Example:
>>> yield from self.stubs.read(wait_group="readout_primary", group="primary", point_id=self.point_id)
>>> yield from self.stubs.read(wait_group="sample_stage", device="samx", point_id=self.point_id)
"""
self._check_device_and_groups(device, group)
parameter = {"group": group, "wait_group": wait_group}
metadata = {"point_id": point_id}
self._exclude_nones(parameter)
self._exclude_nones(metadata)
yield self._device_msg(device=device, action="read", parameter=parameter, metadata=metadata)
def publish_data_as_read(
self, *, device: str, data: dict, point_id: int
) -> Generator[None, None, None]:
"""
Publish the given data as a read event and assign it to the given point_id.
This method can be used to customize the assignment of data to a specific point within a scan.
Args:
device (str): Device name.
data (dict): Data that should be published.
point_id (int): point_id that should be attached to this data.
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
metadata = {"point_id": point_id}
yield self._device_msg(
device=device,
action="publish_data_as_read",
parameter={"data": data},
metadata=metadata,
)
def trigger(self, *, group: str, point_id: int) -> Generator[None, None, None]:
"""Trigger a device group. Note that the trigger event is not blocking and does not wait for the completion of the trigger event.
To wait for the completion of the trigger event, use the :func:`wait` command, specifying the wait_type as "trigger".
Args:
group (str): Device group that should receive the trigger.
point_id (int): point_id that should be attached to this trigger event.
Returns:
Generator[None, None, None]: Generator that yields a device message.
see also: :func:`wait`
"""
yield self._device_msg(
device=None,
action="trigger",
parameter={"group": group},
metadata={"point_id": point_id},
)
def set(self, *, device: str, value: float, wait_group: str, metadata=None):
"""Set the device to a specific value. This is similar to the direct set command
in the command-line interface. The wait_group can be used to wait for the completion of this event.
For a set operation, this simply means that the device has acknowledged the set command and does not
necessarily mean that the device has reached the target value.
Args:
device (str): Device name
value (float): Target value.
wait_group (str): wait group for this event.
Returns:
Generator[None, None, None]: Generator that yields a device message.
.. warning::
Do not use this command to kickoff a long running operation. Use :func:`kickoff` instead or, if the
device does not support the kickoff command, use :func:`set_with_response` instead.
see also: :func:`wait`, :func:`set_and_wait`, :func:`set_with_response`
"""
yield self._device_msg(
device=device,
action="set",
parameter={"value": value, "wait_group": wait_group},
metadata=metadata,
)
def open_scan_def(self) -> Generator[None, None, None]:
"""
Open a new scan definition
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(device=None, action="open_scan_def", parameter={})
def close_scan_def(self) -> Generator[None, None, None]:
"""
Close a scan definition
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(device=None, action="close_scan_def", parameter={})
def close_scan_group(self) -> Generator[None, None, None]:
"""
Close a scan group
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(device=None, action="close_scan_group", parameter={})
def rpc(self, *, device: str, parameter: dict, metadata=None) -> Generator[None, None, None]:
"""Perfrom an RPC (remote procedure call) on a device.
Args:
device (str): Device name.
parameter (dict): parameters used for this rpc instructions.
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(device=device, action="rpc", parameter=parameter, metadata=metadata)
def scan_report_instruction(self, instructions: dict) -> Generator[None, None, None]:
"""Scan report instructions
Args:
instructions (dict): Dict containing the scan report instructions
Returns:
Generator[None, None, None]: Generator that yields a device message.
"""
yield self._device_msg(
device=None, action="scan_report_instruction", parameter=instructions
)
def _check_device_and_groups(self, device, group) -> None:
if device and group:
raise DeviceMessageError("Device and device group was specified. Pick one.")
if device is None and group is None:
raise DeviceMessageError("Either devices or device groups have to be specified.")

View File

@ -0,0 +1,513 @@
"""
This module contains the Scans class and related classes for defining and running scans in BEC
from the client side.
"""
from __future__ import annotations
import builtins
import uuid
from collections.abc import Callable
from contextlib import ContextDecorator
from copy import deepcopy
from typing import TYPE_CHECKING, Dict, Literal
from toolz import partition
from typeguard import typechecked
from bec_lib import messages
from bec_lib.bec_errors import ScanAbortion
from bec_lib.client import SystemConfig
from bec_lib.device import DeviceBase
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_lib.scan_report import ScanReport
from bec_lib.signature_serializer import dict_to_signature
from bec_lib.utils import scan_to_csv
if TYPE_CHECKING:
from bec_lib.client import BECClient
from bec_lib.connector import ConsumerConnector
logger = bec_logger.logger
class ScanObject:
"""ScanObject is a class for scans"""
def __init__(self, scan_name: str, scan_info: dict, client: BECClient = None) -> None:
self.scan_name = scan_name
self.scan_info = scan_info
self.client = client
# run must be an anonymous function to allow for multiple doc strings
# pylint: disable=unnecessary-lambda
self.run = lambda *args, **kwargs: self._run(*args, **kwargs)
def _run(
self,
*args,
callback: Callable = None,
async_callback: Callable = None,
hide_report: bool = False,
metadata: dict = None,
monitored: list[str | DeviceBase] = None,
file_suffix: str = None,
file_directory: str = None,
**kwargs,
) -> ScanReport:
"""
Run the request with the given arguments.
Args:
*args: Arguments for the scan
callback: Callback function
async_callback: Asynchronous callback function
hide_report: Hide the report
metadata: Metadata dictionary
monitored: List of monitored devices
**kwargs: Keyword arguments
Returns:
ScanReport
"""
if self.client.alarm_handler.alarms_stack:
logger.info("The alarm stack is not empty but will be cleared now.")
self.client.clear_all_alarms()
scans = self.client.scans
# pylint: disable=protected-access
hide_report = hide_report or scans._hide_report
user_metadata = deepcopy(self.client.metadata)
sys_config = self.client.system_config.model_copy(deep=True)
if file_suffix:
sys_config.file_suffix = file_suffix
if file_directory:
sys_config.file_directory = file_directory
if "sample_name" not in user_metadata:
var = self.client.get_globa file_suffix: str = None,
l_var("sample_name")
if var is not None:
user_metadata["sample_name"] = var
if metadata is not None:
user_metadata.update(metadata)
if monitored is not None:
if not isinstance(monitored, list):
monitored = [monitored]
for mon_device in monitored:
if isinstance(mon_device, str):
mon_device = self.client.device_manager.devices.get(mon_device)
if not mon_device:
raise RuntimeError(
f"Specified monitored device {mon_device} does not exist in the current device configuration."
)
kwargs["monitored"] = monitored
sys_config = sys_config.model_dump()
# pylint: disable=protected-access
if scans._scan_group:
sys_config["queue_group"] = scans._scan_group
if scans._scan_def_id:
sys_config["scan_def_id"] = scans._scan_def_id
if scans._dataset_id_on_hold:
sys_config["dataset_id_on_hold"] = scans._dataset_id_on_hold
kwargs["user_metadata"] = user_metadata
kwargs["system_config"] = sys_config
request = Scans.prepare_scan_request(self.scan_name, self.scan_info, *args, **kwargs)
request_id = str(uuid.uuid4())
# pylint: disable=unsupported-assignment-operation
request.metadata["RID"] = request_id
self._send_scan_request(request)
report = ScanReport.from_request(request, client=self.client)
report.request.callbacks.register_many("scan_segment", callback, sync=True)
report.request.callbacks.register_many("scan_segment", async_callback, sync=False)
if scans._scan_export and scans._scan_export.scans is not None:
scans._scan_export.scans.append(report)
if not hide_report and self.client.live_updates:
self.client.live_updates.process_request(request, callback)
self.client.callbacks.poll()
return report
def _start_register(self, request: messages.ScanQueueMessage) -> ConsumerConnector:
"""Start a register for the given request"""
register = self.client.device_manager.connector.register(
[
MessageEndpoints.device_readback(dev)
for dev in request.content["parameter"]["args"].keys()
],
threaded=False,
cb=(lambda msg: msg),
)
return register
def _send_scan_request(self, request: messages.ScanQueueMessage) -> None:
"""Send a scan request to the scan server"""
self.client.device_manager.connector.send(MessageEndpoints.scan_queue_request(), request)
class Scans:
"""Scans is a class for available scans in BEC"""
def __init__(self, parent):
self.parent = parent
self._available_scans = {}
self._import_scans()
self._scan_group = None
self._scan_def_id = None
self._scan_group_ctx = ScanGroup(parent=self)
self._scan_def_ctx = ScanDef(parent=self)
self._hide_report = None
self._hide_report_ctx = HideReport(parent=self)
self._dataset_id_on_hold = None
self._dataset_id_on_hold_ctx = DatasetIdOnHold(parent=self)
self._scan_export = None
def _import_scans(self):
"""Import scans from the scan server"""
available_scans = self.parent.connector.get(MessageEndpoints.available_scans())
if available_scans is None:
logger.warning("No scans available. Are redis and the BEC server running?")
return
for scan_name, scan_info in available_scans.resource.items():
self._available_scans[scan_name] = ScanObject(scan_name, scan_info, client=self.parent)
setattr(self, scan_name, self._available_scans[scan_name].run)
setattr(getattr(self, scan_name), "__doc__", scan_info.get("doc"))
setattr(
getattr(self, scan_name),
"__signature__",
dict_to_signature(scan_info.get("signature")),
)
@staticmethod
def get_arg_type(in_type: str):
"""translate type string into python type"""
# pylint: disable=too-many-return-statements
if in_type == "float":
return (float, int)
if in_type == "int":
return int
if in_type == "list":
return list
if in_type == "boolean":
return bool
if in_type == "str":
return str
if in_type == "dict":
return dict
if in_type == "device":
return DeviceBase
raise TypeError(f"Unknown type {in_type}")
@staticmethod
def prepare_scan_request(
scan_name: str, scan_info: dict, *args, **kwargs
) -> messages.ScanQueueMessage:
"""Prepare scan request message with given scan arguments
Args:
scan_name (str): scan name (matching a scan name on the scan server)
scan_info (dict): dictionary describing the scan (e.g. doc string, required kwargs etc.)
Raises:
TypeError: Raised if not all required keyword arguments have been specified.
TypeError: Raised if the number of args do fit into the required bundling pattern.
TypeError: Raised if an argument is not of the required type as specified in scan_info.
Returns:
messages.ScanQueueMessage: scan request message
"""
arg_input = list(scan_info.get("arg_input", {}).values())
arg_bundle_size = scan_info.get("arg_bundle_size", {})
bundle_size = arg_bundle_size.get("bundle")
if len(arg_input) > 0:
if len(args) % len(arg_input) != 0:
raise TypeError(
f"{scan_info.get('doc')}\n {scan_name} takes multiples of"
f" {len(arg_input)} arguments ({len(args)} given)."
)
if not all(req_kwarg in kwargs for req_kwarg in scan_info.get("required_kwargs")):
raise TypeError(
f"{scan_info.get('doc')}\n Not all required keyword arguments have been"
f" specified. The required arguments are: {scan_info.get('required_kwargs')}"
)
# check that all specified devices in args are different objects
for arg in args:
if not isinstance(arg, DeviceBase):
continue
if args.count(arg) > 1:
raise TypeError(
f"{scan_info.get('doc')}\n All specified devices must be different"
f" objects."
)
# check that all arguments are of the correct type
for ii, arg in enumerate(args):
if not isinstance(arg, Scans.get_arg_type(arg_input[ii % len(arg_input)])):
raise TypeError(
f"{scan_info.get('doc')}\n Argument {ii} must be of type"
f" {arg_input[ii%len(arg_input)]}, not {type(arg).__name__}."
)
metadata = {}
metadata.update(kwargs["system_config"])
metadata["user_metadata"] = kwargs.pop("user_metadata", {})
params = {"args": Scans._parameter_bundler(args, bundle_size), "kwargs": kwargs}
# check the number of arg bundles against the number of required bundles
if bundle_size:
num_bundles = len(params["args"])
min_bundles = arg_bundle_size.get("min")
max_bundles = arg_bundle_size.get("max")
if min_bundles and num_bundles < min_bundles:
raise TypeError(
f"{scan_info.get('doc')}\n {scan_name} requires at least {min_bundles} bundles"
f" of arguments ({num_bundles} given)."
)
if max_bundles and num_bundles > max_bundles:
raise TypeError(
f"{scan_info.get('doc')}\n {scan_name} requires at most {max_bundles} bundles"
f" of arguments ({num_bundles} given)."
)
return messages.ScanQueueMessage(
scan_type=scan_name, parameter=params, queue="primary", metadata=metadata
)
@staticmethod
def _parameter_bundler(args, bundle_size):
"""
Args:
args:
bundle_size: number of parameters per bundle
Returns:
"""
if not bundle_size:
return tuple(cmd.name if hasattr(cmd, "name") else cmd for cmd in args)
params = {}
for cmds in partition(bundle_size, args):
cmds_serialized = [cmd.name if hasattr(cmd, "name") else cmd for cmd in cmds]
params[cmds_serialized[0]] = cmds_serialized[1:]
return params
@property
def scan_group(self):
"""Context manager / decorator for defining scan groups"""
return self._scan_group_ctx
@property
def scan_def(self):
"""Context manager / decorator for defining new scans"""
return self._scan_def_ctx
@property
def hide_report(self):
"""Context manager / decorator for hiding the report"""
return self._hide_report_ctx
@property
def dataset_id_on_hold(self):
"""Context manager / decorator for setting the dataset id on hold"""
return self._dataset_id_on_hold_ctx
def scan_export(self, output_file: str):
"""Context manager / decorator for exporting scans"""
return ScanExport(output_file)
class ScanGroup(ContextDecorator):
"""ScanGroup is a ContextDecorator for defining a scan group"""
def __init__(self, parent: Scans = None) -> None:
super().__init__()
self.parent = parent
def __enter__(self):
group_id = str(uuid.uuid4())
self.parent._scan_group = group_id
return self
def __exit__(self, *exc):
self.parent.close_scan_group()
self.parent._scan_group = None
class ScanDef(ContextDecorator):
"""ScanDef is a ContextDecorator for defining a new scan"""
def __init__(self, parent: Scans = None) -> None:
super().__init__()
self.parent = parent
def __enter__(self):
if self.parent._scan_def_id is not None:
raise ScanAbortion("Nested scan definitions currently not supported.")
scan_def_id = str(uuid.uuid4())
self.parent._scan_def_id = scan_def_id
self.parent.open_scan_def()
return self
def __exit__(self, *exc):
if exc[0] is None:
self.parent.close_scan_def()
self.parent._scan_def_id = None
class HideReport(ContextDecorator):
"""HideReport is a ContextDecorator for hiding the report"""
def __init__(self, parent: Scans = None) -> None:
super().__init__()
self.parent = parent
def __enter__(self):
if self.parent._hide_report is None:
self.parent._hide_report = True
return self
def __exit__(self, *exc):
self.parent._hide_report = None
class DatasetIdOnHold(ContextDecorator):
"""DatasetIdOnHold is a ContextDecorator for setting the dataset id on hold"""
def __init__(self, parent: Scans = None) -> None:
super().__init__()
self.parent = parent
self._call_count = 0
def __enter__(self):
self._call_count += 1
if self.parent._dataset_id_on_hold is None:
self.parent._dataset_id_on_hold = True
return self
def __exit__(self, *exc):
self._call_count -= 1
if self._call_count:
return
self.parent._dataset_id_on_hold = None
queue = self.parent.parent.queue
queue.next_dataset_number += 1
class FileWriter:
@typechecked
def __init__(self, file_suffix: str = None, file_directory: str = None) -> None:
"""Context manager for updating metadata
Args:
fw_config (dict): Dictionary with metadata for the filewriter, can only have keys "file_suffix" and "file_directory"
"""
self.client = self._get_client()
self.system_config = self.client.system_config
self._orig_system_config = None
self._orig_metadata = None
self.file_suffix = file_suffix
self.file_directory = file_directory
def _get_client(self):
"""Get BEC client"""
return builtins.__dict__["bec"]
def __enter__(self):
"""Enter the context manager"""
self._orig_metadata = deepcopy(self.client.metadata)
self._orig_system_config = self.system_config.model_copy(deep=True)
self.system_config.file_suffix = self.file_suffix
self.system_config.file_directory = self.file_directory
return self
def __exit__(self, *exc):
"""Exit the context manager"""
self.client.metadata = self._orig_metadata
self.system_config.file_suffix = self._orig_system_config.file_suffix
self.system_config.file_directory = self._orig_system_config.file_directory
class Metadata:
@typechecked
def __init__(self, metadata: dict) -> None:
"""Context manager for updating metadata
Args:
metadata (dict): Metadata dictionary
"""
self.client = self._get_client()
self._metadata = metadata
self._orig_metadata = None
def _get_client(self):
"""Get BEC client"""
return builtins.__dict__["bec"]
def __enter__(self):
"""Enter the context manager"""
self._orig_metadata = deepcopy(self.client.metadata)
self.client.metadata.update(self._metadata)
return self
def __exit__(self, *exc):
"""Exit the context manager"""
self.client.metadata = self._orig_metadata
class ScanExport:
def __init__(self, output_file: str) -> None:
"""Context manager for exporting scans
Args:
output_file (str): Output file name
"""
self.output_file = output_file
self.client = None
self.scans = None
def _check_abort_on_ctrl_c(self):
"""Check if scan should be aborted on Ctrl-C"""
# pylint: disable=protected-access
if not self.client._service_config.abort_on_ctrl_c:
raise RuntimeError(
"ScanExport context manager can only be used if abort_on_ctrl_c is set to True"
)
def _get_client(self):
return builtins.__dict__["bec"]
def __enter__(self):
self.scans = []
self.client = self._get_client()
self.client.scans._scan_export = self
self._check_abort_on_ctrl_c()
return self
def _export_to_csv(self):
scan_to_csv(self.scans, self.output_file)
def __exit__(self, *exc):
try:
for scan in self.scans:
scan.wait()
finally:
try:
self._export_to_csv()
self.scans = None
except Exception as exc:
logger.warning(f"Could not export scans to csv file, due to exception {exc}")

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,120 @@
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import Component as Cpt
from phoenix_bec.local_scripts.Examples.my_ophyd import Device,EpicsMotor, EpicsSignal, EpicsSignalRO
from phoenix_bec.local_scripts.Examples.my_ophyd import Component as Cpt
############################################
#
# KEEP my_ophyd zipped to avoid chaos local version does not run
# so this is rather useless
#
##########################################
#option I via direct acces to classes
def print_dic(clname,cl):
print('')
print('-------- ',clname)
for ii in cl.__dict__:
if '_' not in ii:
try:
print(ii,' ---- ',cl.__getattribute__(ii))
except:
print(ii)
ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX')
ScanY = EpicsMotor(name='ScanY',prefix='X07MB-ES-MA1:ScanY')
DIODE = EpicsSignal(name='SI',read_pv='X07MB-OP2-SAI_07:MEAN')
SMPL = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:SMPL')
CYCLES = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:TOTAL-CYCLES',write_pv='X07MB-OP2:TOTAL-CYCLES')
#prefix='XXXX:'
y_cpt = Cpt(EpicsMotor, 'ScanX')
# Option 2 using component
device_ins=Device('X07MB-ES-MA1:',name=('device_name'))
print(' initialzation of device_in=Device(X07MB-ES-MA1:,name=(device_name)')
print('device_ins.__init__')
print(device_ins.__init__)
print_dic('class Device',Device)
print_dic('instance of device device_ins',device_ins)
print(' ')
print('DEFINE class StageXY... prefix variable not defined ')
EpicsMotor, 'ScanY'
"""
class MyCpt(typing.Generic[K]):
def __init__(
self,
cls: Type[K],
suffix: Optional[str] = None,
*,
lazy: Optional[bool] = None,
trigger_value: Optional[Any] = None,
add_prefix: Optional[Sequence[str]] = None,
doc: Optional[str] = None,
kind: Union[str, Kind] = Kind.normal,
**kwargs,
):
self.attr = None # attr is set later by the device when known
self.cls = cls
self.kwargs = kwargs
#self.lazy = lazy if lazy is not None else self.lazy_default
self.suffix = suffix
self.doc = doc
self.trigger_value = trigger_value # TODO discuss
self.kind = Kind[kind.lower()] if isinstance(kind, str) else Kind(kind)
if add_prefix is None:
add_prefix = ("suffix", "write_pv")
self.add_prefix = tuple(add_prefix)
self._subscriptions = collections.defaultdict(list)
print(' ')
"""
in Device we have this class method
Class device(..):
....
@classmethod
def _initialize_device(cls):
....
class StageXY(Device):
# Here the whole namespace and finctionality
# of Device(Blueskyinterface,Pphydobject) is inherited
# into class StageXY
x = Cpt(EpicsMotor, 'ScanX')
y = Cpt(EpicsMotor, 'ScanY')
# end class
print()
print('init xy_stage, use input parameter from Device and prefix is defined in call ')
xy = StageXY('X07MB-ES-MA1:', name='xy_name')
print_dic('class StageXY',StageXY)
print_dic('instance of StageXY',xy)
#print('xy.x.prefix')
#print(xy.x.prefix)
xy.__dict__

View File

@ -0,0 +1,9 @@
# purpose of directory
This diretory is for scripts, test etc. which are not loaded into the bec-server.
For now we keep it in the phoenix_bec structure, but for operation, such files should be located out side of the
bec_phoenix plugin.
TO avoid poading of these files, there should be no files called __init__.py anywhere in the directory tree