From 20aaa069de8666b59a68d8849bf01054f6c88fe3 Mon Sep 17 00:00:00 2001 From: gac-x07mb Date: Wed, 21 Aug 2024 17:52:24 +0200 Subject: [PATCH] next some base classed copied and commented to local scripts --- phoenix_bec/devices/Phoenix_trigger.py | 243 -- phoenix_bec/devices/phoenix_trigger.py | 182 ++ .../BASE_CLASS_delay_GENERATOR.TXT | 510 ++++ .../Base_Classes/BASE_CLASS_scan_stubs.txt | 570 +++++ .../Base_Classes/BASE_CLASS_scans.txt | 513 ++++ .../Base_Classes/BAse_CLASS_ScanBase.txt | 2061 +++++++++++++++++ .../DefiningEpics_Channels_details.py | 120 + phoenix_bec/local_scripts/README.md | 9 + 8 files changed, 3965 insertions(+), 243 deletions(-) delete mode 100644 phoenix_bec/devices/Phoenix_trigger.py create mode 100644 phoenix_bec/devices/phoenix_trigger.py create mode 100644 phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_delay_GENERATOR.TXT create mode 100644 phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scan_stubs.txt create mode 100644 phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scans.txt create mode 100644 phoenix_bec/local_scripts/Documentation/Base_Classes/BAse_CLASS_ScanBase.txt create mode 100644 phoenix_bec/local_scripts/Examples/Learn_about_ophyd/DefiningEpics_Channels_details.py create mode 100644 phoenix_bec/local_scripts/README.md diff --git a/phoenix_bec/devices/Phoenix_trigger.py b/phoenix_bec/devices/Phoenix_trigger.py deleted file mode 100644 index a948ee5..0000000 --- a/phoenix_bec/devices/Phoenix_trigger.py +++ /dev/null @@ -1,243 +0,0 @@ -from ophyd import ( - ADComponent as ADCpt, - Device, - DeviceStatus, -) - -from ophyd import Component as Cpt -from ophyd import Device, EpicsSignal, EpicsSignalRO - -from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin - -from bec_lib import bec_logger, messages -from bec_lib.endpoints import MessageEndpoints - -logger = bec_logger.logger - -DETECTOR_TIMEOUT = 5 - -class 
PhoenixTriggerError(Exception): - """Base class for exceptions in this module.""" - - -class PhoenixTriggerTimeoutError(XMAPError): - """Raised when the PhoenixTrigger does not respond in time.""" - - -class PhoenixTriggerDetectorState(enum.IntEnum): - """Detector states for XMAP detector""" - - DONE = 0 - ACQUIRING = 1 - - - - -class PhoenixTriggerSetup(CustomDetectorMixin): - """ - This defines the trigger setup. - - - """ - - def __init__(self, *args, parent:Device = None, **kwargs): - super().__init__(*args, parent=parent, **kwargs) - self._counter = 0 - - def on_stage(self): - exposure_time = self.parent.scaninfo.exp_time - num_points = self.parent.scaninfo.num_points - - # camera acquisition parameters - self.parent.cam.array_counter.put(0) - if self.parent.scaninfo.scan_type == 'step': - self.parent.cam.acquire_time.put(exposure_time) - self.parent.cam.num_images.put(1) - self.parent.cam.image_mode.put(0) # Single - self.parent.cam.trigger_mode.put(0) # auto - else: - # In flyscan, the exp_time is the time between two triggers, - # which minus 15% is used as the acquisition time. 
- self.parent.cam.acquire_time.put(exposure_time * 0.85) - self.parent.cam.num_images.put(num_points) - self.parent.cam.image_mode.put(1) # Multiple - self.parent.cam.trigger_mode.put(1) # trigger - self.parent.cam.acquire.put(1, wait=False) # arm - - # file writer - self.parent.hdf.lazy_open.put(1) - self.parent.hdf.num_capture.put(num_points) - self.parent.hdf.file_write_mode.put(2) # Stream - self.parent.hdf.capture.put(1, wait=False) - self.parent.hdf.enable.put(1) # enable plugin - - # roi statistics to collect signal and background in a timeseries - self.parent.roistat.enable.put(1) - self.parent.roistat.ts_num_points.put(num_points) - self.parent.roistat.ts_control.put(0, wait=False) # Erase/Start - - logger.success('XXXX stage XXXX') - - def on_trigger(self): - self.parent.cam.acquire.put(1, wait=False) - logger.success('XXXX trigger XXXX') - - return self.wait_with_status( - [(self.parent.cam.acquire.get, 0)], - self.parent.scaninfo.exp_time + DETECTOR_TIMEOUT, - all_signals=True - ) - - def on_complete(self): - status = DeviceStatus(self.parent) - - if self.parent.scaninfo.scan_type == 'step': - timeout = DETECTOR_TIMEOUT - else: - timeout = self.parent.scaninfo.exp_time * self.parent.scaninfo.num_points + DETECTOR_TIMEOUT - logger.success('XXXX %s XXXX' % self.parent.roistat.ts_acquiring.get()) - success = self.wait_for_signals( - [ - (self.parent.cam.acquire.get, 0), - (self.parent.hdf.capture.get, 0), - (self.parent.roistat.ts_acquiring.get, 'Done') - ], - timeout, - check_stopped=True, - all_signals=True - ) - - # publish file location - self.parent.filepath.put(self.parent.hdf.full_file_name.get()) - self.publish_file_location(done=True, successful=success) - - # publish timeseries data - metadata = self.parent.scaninfo.scan_msg.metadata - metadata.update({"async_update": "append", "num_lines": self.parent.roistat.ts_current_point.get()}) - msg = messages.DeviceMessage( - signals={ - self.parent.roistat.roi1.name_.get(): { - 'value': 
self.parent.roistat.roi1.ts_total.get(), - }, - self.parent.roistat.roi2.name_.get(): { - 'value': self.parent.roistat.roi2.ts_total.get(), - }, - }, - metadata=self.parent.scaninfo.scan_msg.metadata - ) - self.parent.connector.xadd( - topic=MessageEndpoints.device_async_readback( - scan_id=self.parent.scaninfo.scan_id, device=self.parent.name - ), - msg_dict={"data": msg}, - expire=1800, - ) - - logger.success('XXXX complete %d XXXX' % success) - if success: - status.set_finished() - else: - status.set_exception(TimeoutError()) - return status - - def on_stop(self): - logger.success('XXXX stop XXXX') - self.parent.cam.acquire.put(0) - self.parent.hdf.capture.put(0) - self.parent.roistat.ts_control.put(2) - - def on_unstage(self): - self.parent.cam.acquire.put(0) - self.parent.hdf.capture.put(0) - self.parent.roistat.ts_control.put(2) - logger.success('XXXX unstage XXXX') - - -class EigerROIStatPlugin(ROIStatPlugin): - roi1 = ADCpt(ROIStatNPlugin, '1:') - roi2 = ADCpt(ROIStatNPlugin, '2:') - -class PhoenixTrigger(PSIDetectorBase): - - """ - Parent class: PSIDetectorBase - - class attributes: - custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS, - inherits from CustomDetectorMixin - in __init__ of PSIDetecor bases - class is initialized - self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) - PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector - dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector - mca (EpicsMCARecord) : MCA parameters for XMAP detector - hdf5 (XMAPHDF5Plugins) : HDF5 parameters for XMAP detector - MIN_READOUT (float) : Minimum readout time for the detector - - - The class PhoenixTrigger is the class to be called via yaml configuration file - the input arguments are defined by PSIDetectorBase, - and need to be given in the yaml configuration file. - To adress chanels such as 'X07MB-OP2:SMPL-DONE': - - use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file. 
- - PSIDetectorBase( - prefix='', - *,Q - name, - kind=None, - parent=None, - device_manager=None, - **kwargs, - ) - Docstring: - Abstract base class for SLS detectors - - Class attributes: - custom_prepare_cls (object): class for custom prepare logic (BL specific) - - Args: - prefix (str): EPICS PV prefix for component (optional) - name (str): name of the device, as will be reported via read() - kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal - omitted -> readout ignored for read 'ophydobj.read()' - normal -> readout for read - config -> config parameter for 'ophydobj.read_configuration()' - hinted -> which attribute is readout for read - parent (object): instance of the parent device - device_manager (object): bec device manager - **kwargs: keyword arguments - File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py - Type: type - Subclasses: SimCamera, SimMonitorAsync - - - - - - """ - #custom_prepare_cls = PhoenixTriggerSetup - - #cam = ADCpt(SLSDetectorCam, 'cam1:') - - - #X07MB-OP2:START-CSMPL.. cont on / off - - # X07MB-OP2:SMPL.. take single sample - #X07MB-OP2:INTR-COUNT.. counter run up - #X07MB-OP2:TOTAL-CYCLES .. cycles set - #X07MB-OP2:SMPL-DONE - - QUESTION HOW does ADCpt kno the EPICS prefix?????? 
- - #image = ADCpt(ImagePlugin, 'image1:') - #roi1 = ADCpt(ROIPlugin, 'ROI1:') - #roi2 = ADCpt(ROIPlugin, 'ROI2:') - #stats1 = ADCpt(StatsPlugin, 'Stats1:') - #stats2 = ADCpt(StatsPlugin, 'Stats2:') - roistat = ADCpt(EigerROIStatPlugin, 'ROIStat1:') - #roistat1 = ADCpt(ROIStatNPlugin, 'ROIStat1:1:') - #roistat2 = ADCpt(ROIStatNPlugin, 'ROIStat1:2:') - hdf = ADCpt(HDF5Plugin, 'HDF1:') - ) diff --git a/phoenix_bec/devices/phoenix_trigger.py b/phoenix_bec/devices/phoenix_trigger.py new file mode 100644 index 0000000..d457eb6 --- /dev/null +++ b/phoenix_bec/devices/phoenix_trigger.py @@ -0,0 +1,182 @@ +from ophyd import ( + ADComponent as ADCpt, + Device, + DeviceStatus, +) + +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO + +from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin + +from bec_lib import bec_logger, messages +from bec_lib.endpoints import MessageEndpoints + +import time + +logger = bec_logger.logger + +DETECTOR_TIMEOUT = 5 + +#class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off + + + + + +class PhoenixTriggerSetup(CustomDetectorMixin): + """ + This defines the PHOENIX trigger setup. + + + """ + + def __init__(self, *args, parent:Device = None, **kwargs): + super().__init__(*args, parent=parent, **kwargs) + self._counter = 0 + + WW + def on_stage(self): + # is this called on each point in scan or just before scan ??? 
+ print('on stage') + self.parent.start_csmpl.put(0) + time.sleep(0.05) + cycles=self.parent.total_cycles.get() + time.sleep(0.05) + cycles=self.parent.total_cycles.put(0) + time.sleep(0.05) + cycles=self.parent.smpl.put(2) + time.sleep(0.5) + cycles=self.parent.total_cycles.put(cycles) + + logger.success('PhoenixTrigger on stage') + + def on_trigger(self): + + self.parent.start_smpl.put(1) + time.sleep(0.05) # use blocking + logger.success('PhoenixTrigger on_trigger') + + return self.wait_with_status( + [(self.parent.smpl_done.get, 1)]) + + + + +# logger.success(' PhoenixTrigger on_trigger complete ') + +# if success: +# status.set_finished() +# else: +# status.set_exception(TimeoutError()) +# return status + + + + def on_complete(self): + + timeout =10 + + + logger.success('XXXX complete %d XXXX' % success) + + success = self.wait_for_signals( + [ + (self.parent.smpl_done.get, 0)) + ], + timeout, + check_stopped=True, + all_signals=True + ) + + + + if success: + status.set_finished() + else: + status.set_exception(TimeoutError()) + return status + + + + + def on_stop(self): + logger.success(' PhoenixTrigger on_stop ') + + self.parent.csmpl.put(1) + logger.success(' PhoenixTrigger on_stop finished ') + + def on_unstage(self): + logger.success(' PhoenixTrigger on_unstage ') + self.parent.csmpl.put(1) + self.parent.smpl.put(1) + logger.success(' PhoenixTrigger on_unstage finished ') + + + + + +class PhoenixTrigger(PSIDetectorBase): + + """ + Parent class: PSIDetectorBase + + class attributes: + custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS, + inherits from CustomDetectorMixin + in __init__ of PSIDetecor bases + class is initialized + self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) + PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector + dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector + mca (EpicsMCARecord) : MCA parameters for XMAP detector + hdf5 (XMAPHDF5Plugins) : HDF5 parameters for 
XMAP detector + MIN_READOUT (float) : Minimum readout time for the detector + + + The class PhoenixTrigger is the class to be called via yaml configuration file + the input arguments are defined by PSIDetectorBase, + and need to be given in the yaml configuration file. + To adress chanels such as 'X07MB-OP2:SMPL-DONE': + + use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file. + + PSIDetectorBase( + prefix='', + *,Q + name, + kind=None, + parent=None, + device_manager=None, + **kwargs, + ) + Docstring: + Abstract base class for SLS detectors + + Class attributes: + custom_prepare_cls (object): class for custom prepare logic (BL specific) + + Args: + prefix (str): EPICS PV prefix for component (optional) + name (str): name of the device, as will be reported via read() + kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal + omitted -> readout ignored for read 'ophydobj.read()' + normal -> readout for read + config -> config parameter for 'ophydobj.read_configuration()' + hinted -> which attribute is readout for read + parent (object): instance of the parent device + device_manager (object): bec device manager + **kwargs: keyword arguments + File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py + Type: type + Subclasses: EpicsSignal + """ + + custom_prepare_cls = PhoenixTriggerSetup + + + start_csmpl = Cpt(EpicsSignal,'START-CSMPL') # cont on / off + intr_count = Cpt(EpicsSignal,'INTR-COUNT') # conter run up + total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES') # cycles set + smpl_done = Cpt(EpicsSignal,'SMPL-DONE') # show trigger is done + diff --git a/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_delay_GENERATOR.TXT b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_delay_GENERATOR.TXT new file mode 100644 index 0000000..03c76ea --- /dev/null +++ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_delay_GENERATOR.TXT 
@@ -0,0 +1,510 @@ +import enum +import time +from typing import Any + +from bec_lib import bec_logger +from ophyd import ( + Component, + Device, + DeviceStatus, + EpicsSignal, + EpicsSignalRO, + Kind, + PVPositioner, + Signal, +) +from ophyd.device import Staged +from ophyd.pseudopos import ( + PseudoPositioner, + PseudoSingle, + pseudo_position_argument, + real_position_argument, +) + +from ophyd_devices.utils import bec_utils +from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin + +logger = bec_logger.logger + + +class DelayGeneratorError(Exception): + """Exception raised for errors.""" + + +class DeviceInitError(DelayGeneratorError): + """Error upon failed initialization, invoked by missing device manager or device not started in sim_mode.""" + + +class DelayGeneratorNotOkay(DelayGeneratorError): + """Error when DDG is not okay""" + + +class TriggerSource(enum.IntEnum): + """ + Class for trigger options of DG645 + + Used to set the trigger source of the DG645 by setting the value + e.g. source.put(TriggerSource.Internal) + Exp: + TriggerSource.Internal + """ + + INTERNAL = 0 + EXT_RISING_EDGE = 1 + EXT_FALLING_EDGE = 2 + SS_EXT_RISING_EDGE = 3 + SS_EXT_FALLING_EDGE = 4 + SINGLE_SHOT = 5 + LINE = 6 + + +class DelayStatic(Device): + """ + Static axis for the T0 output channel + + It allows setting the logic levels, but the timing is fixed. + The signal is high after receiving the trigger until the end + of the holdoff period. 
+ """ + + # Other channel stuff + ttl_mode = Component(EpicsSignal, "OutputModeTtlSS.PROC", kind=Kind.config) + nim_mode = Component(EpicsSignal, "OutputModeNimSS.PROC", kind=Kind.config) + polarity = Component( + EpicsSignal, + "OutputPolarityBI", + write_pv="OutputPolarityBO", + name="polarity", + kind=Kind.config, + ) + amplitude = Component( + EpicsSignal, "OutputAmpAI", write_pv="OutputAmpAO", name="amplitude", kind=Kind.config + ) + offset = Component( + EpicsSignal, "OutputOffsetAI", write_pv="OutputOffsetAO", name="offset", kind=Kind.config + ) + + +class DummyPositioner(PVPositioner): + """Dummy Positioner to set AO, AI and ReferenceMO.""" + + setpoint = Component(EpicsSignal, "DelayAO", put_complete=True, kind=Kind.config) + readback = Component(EpicsSignalRO, "DelayAI", kind=Kind.config) + done = Component(Signal, value=1) + reference = Component(EpicsSignal, "ReferenceMO", put_complete=True, kind=Kind.config) + + +class DelayPair(PseudoPositioner): + """ + Delay pair interface + + Virtual motor interface to a pair of signals (on the frontpanel - AB/CD/EF/GH). + It offers a simple delay and pulse width interface. 
+ """ + + # The pseudo positioner axes + delay = Component(PseudoSingle, limits=(0, 2000.0), name="delay") + width = Component(PseudoSingle, limits=(0, 2000.0), name="pulsewidth") + ch1 = Component(DummyPositioner, name="ch1") + ch2 = Component(DummyPositioner, name="ch2") + io = Component(DelayStatic, name="io") + + def __init__(self, *args, **kwargs): + # Change suffix names before connecting (a bit of dynamic connections) + self.__class__.__dict__["ch1"].suffix = kwargs["channel"][0] + self.__class__.__dict__["ch2"].suffix = kwargs["channel"][1] + self.__class__.__dict__["io"].suffix = kwargs["channel"] + + del kwargs["channel"] + # Call parent to start the connections + super().__init__(*args, **kwargs) + + @pseudo_position_argument + def forward(self, pseudo_pos): + """Run a forward (pseudo -> real) calculation""" + return self.RealPosition(ch1=pseudo_pos.delay, ch2=pseudo_pos.delay + pseudo_pos.width) + + @real_position_argument + def inverse(self, real_pos): + """Run an inverse (real -> pseudo) calculation""" + return self.PseudoPosition(delay=real_pos.ch1, width=real_pos.ch2 - real_pos.ch1) + + +class DDGCustomMixin: + """ + Mixin class for custom DelayGenerator logic within PSIDelayGeneratorBase. + + This class provides a parent class for implementation of BL specific logic of the device. + It is also possible to pass implementing certain methods, e.g. finished or on_trigger, + based on the setup and desired operation mode at the beamline. + + Args: + parent (object): instance of PSIDelayGeneratorBase + **kwargs: keyword arguments + """ + + def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: + self.parent = parent + + def initialize_default_parameter(self) -> None: + """ + Method to initialize default parameters for DDG. + + Called upon initiating the base class. + It should be used to set the DDG default parameters. + These may include: amplitude, offsets, delays, etc. 
+ """ + + def prepare_ddg(self) -> None: + """ + Method to prepare the DDG for the upcoming scan. + + Called by the stage method of the base class. + It should be used to set the DDG parameters for the upcoming scan. + """ + + def on_trigger(self) -> None: + """Method executed upon trigger call in parent class""" + + def finished(self) -> None: + """Method to check if DDG is finished with the scan""" + + def on_pre_scan(self) -> None: + """ + Method executed upon pre_scan call in parent class. + + Covenient to implement time sensitive actions to be executed right before start of the scan. + Example could be to open the shutter by triggering a pulse via pre_scan. + """ + + def check_scan_id(self) -> None: + """Method to check if there is a new scan_id, called by stage.""" + + def is_ddg_okay(self, raise_on_error=False) -> None: + """ + Method to check if DDG is okay + + It checks the status PV of the DDG and tries to clear the error if it is not okay. + It will rerun itself and raise DelayGeneratorNotOkay if DDG is still not okay. + + Args: + raise_on_error (bool, optional): raise exception if DDG is not okay. Defaults to False. + """ + status = self.parent.status.read()[self.parent.status.name]["value"] + if status != "STATUS OK" and not raise_on_error: + logger.warning(f"DDG returns {status}, trying to clear ERROR") + self.parent.clear_error() + time.sleep(1) + self.is_ddg_okay(raise_on_error=True) + elif status != "STATUS OK": + raise DelayGeneratorNotOkay(f"DDG failed to start with status: {status}") + + +class PSIDelayGeneratorBase(Device): + """ + Abstract base class for DelayGenerator DG645 + + This class implements a thin Ophyd wrapper around the Stanford Research DG645 + digital delay generator. + + The DG645 generates 8+1 signals: A, B, C, D, E, F, G, H and T0. Front panel outputs + T0, AB, CD, EF and GH are combinations of these signals. Back panel outputs are + directly routed signals. Signals are not independent. + + Signal pairs, e.g. 
AB, CD, EF, GH, are implemented as DelayPair objects. They + have a TTL pulse width, delay and a reference signal to which they are being triggered. + In addition, the io layer allows setting amplitude, offset and polarity for each pair. + + Detailed information can be found in the manual: + https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf + + Class attributes: + custom_prepare_cls (object): class for custom prepare logic (BL specific) + + Args: + prefix (str) : EPICS PV prefix for component (optional) + name (str) : name of the device, as will be reported via read() + kind (str) : member of class 'ophydobj.Kind', defaults to Kind.normal + omitted -> readout ignored for read 'ophydobj.read()' + normal -> readout for read + config -> config parameter for 'ophydobj.read_configuration()' + hinted -> which attribute is readout for read + read_attrs (list) : sequence of attribute names to read + configuration_attrs (list) : sequence of attribute names via config_parameters + parent (object) : instance of the parent device + device_manager (object) : bec device manager + sim_mode (bool) : simulation mode, if True, no device manager is required + **kwargs : keyword arguments + attributes : lazy_wait_for_connection : bool + """ + + # Custom_prepare_cls + custom_prepare_cls = DDGCustomMixin + + SUB_PROGRESS = "progress" + SUB_VALUE = "value" + _default_sub = SUB_VALUE + + USER_ACCESS = ["set_channels", "_set_trigger", "burst_enable", "burst_disable", "reload_config"] + + # Assign PVs from DDG645 + trigger_burst_readout = Component( + EpicsSignal, "EventStatusLI.PROC", name="trigger_burst_readout" + ) + burst_cycle_finished = Component(EpicsSignalRO, "EventStatusMBBID.B3", name="read_burst_state") + delay_finished = Component(EpicsSignalRO, "EventStatusMBBID.B2", name="delay_finished") + status = Component(EpicsSignalRO, "StatusSI", name="status") + clear_error = Component(EpicsSignal, "StatusClearBO", name="clear_error") + + # Front Panel + channelT0 = 
Component(DelayStatic, "T0", name="T0") + channelAB = Component(DelayPair, "", name="AB", channel="AB") + channelCD = Component(DelayPair, "", name="CD", channel="CD") + channelEF = Component(DelayPair, "", name="EF", channel="EF") + channelGH = Component(DelayPair, "", name="GH", channel="GH") + + holdoff = Component( + EpicsSignal, + "TriggerHoldoffAI", + write_pv="TriggerHoldoffAO", + name="trigger_holdoff", + kind=Kind.config, + ) + inhibit = Component( + EpicsSignal, + "TriggerInhibitMI", + write_pv="TriggerInhibitMO", + name="trigger_inhibit", + kind=Kind.config, + ) + source = Component( + EpicsSignal, + "TriggerSourceMI", + write_pv="TriggerSourceMO", + name="trigger_source", + kind=Kind.config, + ) + level = Component( + EpicsSignal, + "TriggerLevelAI", + write_pv="TriggerLevelAO", + name="trigger_level", + kind=Kind.config, + ) + rate = Component( + EpicsSignal, + "TriggerRateAI", + write_pv="TriggerRateAO", + name="trigger_rate", + kind=Kind.config, + ) + trigger_shot = Component(EpicsSignal, "TriggerDelayBO", name="trigger_shot", kind="config") + burstMode = Component( + EpicsSignal, "BurstModeBI", write_pv="BurstModeBO", name="burstmode", kind=Kind.config + ) + burstConfig = Component( + EpicsSignal, "BurstConfigBI", write_pv="BurstConfigBO", name="burstconfig", kind=Kind.config + ) + burstCount = Component( + EpicsSignal, "BurstCountLI", write_pv="BurstCountLO", name="burstcount", kind=Kind.config + ) + burstDelay = Component( + EpicsSignal, "BurstDelayAI", write_pv="BurstDelayAO", name="burstdelay", kind=Kind.config + ) + burstPeriod = Component( + EpicsSignal, "BurstPeriodAI", write_pv="BurstPeriodAO", name="burstperiod", kind=Kind.config + ) + + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + device_manager=None, + sim_mode=False, + **kwargs, + ): + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + 
configuration_attrs=configuration_attrs, + parent=parent, + **kwargs, + ) + if device_manager is None and not sim_mode: + raise DeviceInitError( + f"No device manager for device: {name}, and not started sim_mode: {sim_mode}. Add" + " DeviceManager to initialization or init with sim_mode=True" + ) + # Init variables + self.sim_mode = sim_mode + self.stopped = False + self.name = name + self.scaninfo = None + self.timeout = 5 + self.all_channels = ["channelT0", "channelAB", "channelCD", "channelEF", "channelGH"] + self.all_delay_pairs = ["AB", "CD", "EF", "GH"] + self.wait_for_connection(all_signals=True) + + # Init custom prepare class with BL specific logic + self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) + if not sim_mode: + self.device_manager = device_manager + else: + self.device_manager = bec_utils.DMMock() + self.connector = self.device_manager.connector + self._update_scaninfo() + self._init() + + def _update_scaninfo(self) -> None: + """ + Method to updated scaninfo from BEC. + + In sim_mode, scaninfo output is mocked - see bec_scaninfo_mixin.py + """ + self.scaninfo = BecScaninfoMixin(self.device_manager, self.sim_mode) + self.scaninfo.load_scan_metadata() + + def _init(self) -> None: + """Method to initialize custom parameters of the DDG.""" + self.custom_prepare.initialize_default_parameter() + self.custom_prepare.is_ddg_okay() + + def set_channels(self, signal: str, value: Any, channels: list = None) -> None: + """ + Method to set signals on DelayPair and DelayStatic channels. + + Signals can be set on the DelayPair and DelayStatic channels. The method checks + if the signal is available on the channel and sets it. It works for both, DelayPair + and Delay Static although signals are hosted in different layers. + + Args: + signal (str) : signal to set (width, delay, amplitude, offset, polarity) + value (Any) : value to set + channels (list, optional) : list of channels to set. 
Defaults to self.all_channels (T0,AB,CD,EF,GH) + """ + if not channels: + channels = self.all_channels + for chname in channels: + channel = getattr(self, chname, None) + if not channel: + continue + if signal in channel.component_names: + getattr(channel, signal).set(value) + continue + if "io" in channel.component_names and signal in channel.io.component_names: + getattr(channel.io, signal).set(value) + + def set_trigger(self, trigger_source: TriggerSource) -> None: + """Set trigger source on DDG - possible values defined in TriggerSource enum""" + value = int(trigger_source) + self.source.put(value) + + def burst_enable(self, count, delay, period, config="all"): + """Enable the burst mode""" + # Validate inputs + count = int(count) + assert count > 0, "Number of bursts must be positive" + assert delay >= 0, "Burst delay must be larger than 0" + assert period > 0, "Burst period must be positive" + assert config in ["all", "first"], "Supported burst configs are 'all' and 'first'" + + self.burstMode.put(1) + self.burstCount.put(count) + self.burstDelay.put(delay) + self.burstPeriod.put(period) + + if config == "all": + self.burstConfig.put(0) + elif config == "first": + self.burstConfig.put(1) + + def burst_disable(self): + """Disable burst mode""" + self.burstMode.put(0) + + def stage(self) -> list[object]: + """ + Method to stage the device. + + Called in preparation for a scan. + + Internal Calls: + - scaninfo.load_scan_metadata : load scan metadata + - custom_prepare.prepare_ddg : prepare DDG for measurement + - is_ddg_okay : check if DDG is okay + + Returns: + list(object): list of objects that were staged + """ + if self._staged != Staged.no: + return super().stage() + self.stopped = False + self.scaninfo.load_scan_metadata() + self.custom_prepare.prepare_ddg() + self.custom_prepare.is_ddg_okay() + # At the moment needed bc signal might not be reliable, BEC too fast. + # Consider removing this overhead in future! 
+ time.sleep(0.05) + return super().stage() + + def trigger(self) -> DeviceStatus: + """ + Method to trigger the acquisition. + + Internal Call: + - custom_prepare.on_trigger : execute BL specific action + """ + self.custom_prepare.on_trigger() + return super().trigger() + + def pre_scan(self) -> None: + """ + Method pre_scan gets executed directly before the scan + + Internal Call: + - custom_prepare.on_pre_scan : execute BL specific action + """ + self.custom_prepare.on_pre_scan() + + def unstage(self) -> list[object]: + """ + Method unstage gets called at the end of a scan. + + If scan (self.stopped is True) is stopped, returns directly. + Otherwise, checks if the DDG finished acquisition + + Internal Calls: + - custom_prepare.check_scan_id : check if scan_id changed or detector stopped + - custom_prepare.finished : check if device finished acquisition (succesfully) + - is_ddg_okay : check if DDG is okay + + Returns: + list(object): list of objects that were unstaged + """ + self.custom_prepare.check_scan_id() + if self.stopped is True: + return super().unstage() + self.custom_prepare.finished() + self.custom_prepare.is_ddg_okay() + self.stopped = False + return super().unstage() + + def stop(self, *, success=False) -> None: + """ + Method to stop the DDG + + #TODO Check if the pulse generation can be interruppted + + Internal Call: + - custom_prepare.is_ddg_okay : check if DDG is okay + """ + self.custom_prepare.is_ddg_okay() + super().stop(success=success) + self.stopped = True diff --git a/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scan_stubs.txt b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scan_stubs.txt new file mode 100644 index 0000000..f672ae6 --- /dev/null +++ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scan_stubs.txt @@ -0,0 +1,570 @@ +""" +Scan stubs are commands that can be used to control devices during a scan. 
They typically yield device messages that are +consumed by the scan worker and potentially forwarded to the device server. +""" + +from __future__ import annotations + +import threading +import time +import uuid +from collections.abc import Callable +from typing import Generator, Literal + +import numpy as np + +from bec_lib import messages +from bec_lib.connector import ConnectorBase +from bec_lib.device import Status +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger + +from .errors import DeviceMessageError, ScanAbortion + +logger = bec_logger.logger + + +class ScanStubs: + """ + Scan stubs are commands that can be used to control devices during a scan. They typically yield device messages that are + consumed by the scan worker and potentially forwarded to the device server. + """ + + def __init__( + self, + connector: ConnectorBase, + device_msg_callback: Callable = None, + shutdown_event: threading.Event = None, + ) -> None: + self.connector = connector + self.device_msg_metadata = ( + device_msg_callback if device_msg_callback is not None else lambda: {} + ) + self.shutdown_event = shutdown_event + + @staticmethod + def _exclude_nones(input_dict: dict): + for key in list(input_dict.keys()): + if input_dict[key] is None: + input_dict.pop(key) + + def _device_msg(self, **kwargs): + """""" + msg = messages.DeviceInstructionMessage(**kwargs) + msg.metadata = {**self.device_msg_metadata(), **msg.metadata} + return msg + + def send_rpc_and_wait(self, device: str, func_name: str, *args, **kwargs) -> any: + """Perform an RPC (remote procedure call) on a device and wait for its return value. + + Args: + device (str): Name of the device + func_name (str): Function name. The function name will be appended to the device. 
+ args (tuple): Arguments to pass on to the RPC function + kwargs (dict): Keyword arguments to pass on to the RPC function + + Raises: + ScanAbortion: Raised if the RPC's success is False + + Returns: + any: Return value of the executed rpc function + + Examples: + >>> send_rpc_and_wait("samx", "controller.my_custom_function") + """ + rpc_id = str(uuid.uuid4()) + parameter = { + "device": device, + "func": func_name, + "rpc_id": rpc_id, + "args": args, + "kwargs": kwargs, + } + yield from self.rpc( + device=device, parameter=parameter, metadata={"response": True, "RID": rpc_id} + ) + return self._get_from_rpc(rpc_id) + + def _get_from_rpc(self, rpc_id) -> any: + """ + Get the return value from an RPC call. + + Args: + rpc_id (str): RPC ID + + Raises: + ScanAbortion: Raised if the RPC's success flag is False + + Returns: + any: Return value of the RPC call + """ + + while not self.shutdown_event.is_set(): + msg = self.connector.get(MessageEndpoints.device_rpc(rpc_id)) + if msg: + break + time.sleep(0.001) + if self.shutdown_event.is_set(): + raise ScanAbortion("The scan was aborted.") + if not msg.content["success"]: + error = msg.content["out"] + if isinstance(error, dict) and {"error", "msg", "traceback"}.issubset( + set(error.keys()) + ): + error_msg = f"During an RPC, the following error occured:\n{error['error']}: {error['msg']}.\nTraceback: {error['traceback']}\n The scan will be aborted." + else: + error_msg = "During an RPC, an error occured" + raise ScanAbortion(error_msg) + + logger.debug(msg.content.get("out")) + return_val = msg.content.get("return_val") + if not isinstance(return_val, dict): + return return_val + if return_val.get("type") == "status" and return_val.get("RID"): + return Status(self.connector, return_val.get("RID")) + return return_val + + def set_with_response( + self, *, device: str, value: float, request_id: str = None, metadata=None + ) -> Generator[None, None, None]: + """Set a device to a specific value and return the request ID. 
Use :func:`request_is_completed` to later check if the request is completed. + + Args: + device (str): Device name. + value (float): Target value. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + see also: :func:`request_is_completed` + + """ + request_id = str(uuid.uuid4()) if request_id is None else request_id + metadata = metadata if metadata is not None else {} + metadata.update({"response": True, "RID": request_id}) + yield from self.set(device=device, value=value, wait_group="set", metadata=metadata) + return request_id + + def request_is_completed(self, RID: str) -> bool: + """Check if a request that was initiated with :func:`set_with_response` is completed. + + Args: + RID (str): Request ID. + + Returns: + bool: True if the request is completed, False otherwise. + + """ + msg = self.connector.lrange(MessageEndpoints.device_req_status_container(RID), 0, -1) + if not msg: + return False + return True + + def set_and_wait( + self, *, device: list[str], positions: list | np.ndarray + ) -> Generator[None, None, None]: + """Set devices to a specific position and wait completion. + + Args: + device (list[str]): List of device names. + positions (list | np.ndarray): Target position. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + see also: :func:`set`, :func:`wait`, :func:`set_with_response` + + """ + if not isinstance(positions, list) and not isinstance(positions, np.ndarray): + positions = [positions] + if len(positions) == 0: + return + for ind, val in enumerate(device): + yield from self.set(device=val, value=positions[ind], wait_group="scan_motor") + yield from self.wait(device=device, wait_type="move", wait_group="scan_motor") + + def read_and_wait( + self, *, wait_group: str, device: list = None, group: str = None, point_id: int = None + ) -> Generator[None, None, None]: + """Trigger a reading and wait for completion. 
+ + Args: + wait_group (str): wait group + device (list, optional): List of device names. Can be specified instead of group. Defaults to None. + group (str, optional): Group name of devices. Can be specified instead of device. Defaults to None. + point_id (int, optional): _description_. Defaults to None. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + """ + self._check_device_and_groups(device, group) + yield from self.read(device=device, group=group, wait_group=wait_group, point_id=point_id) + yield from self.wait(device=device, wait_type="read", group=group, wait_group=wait_group) + + def open_scan( + self, + *, + scan_motors: list, + readout_priority: dict, + num_pos: int, + scan_name: str, + scan_type: Literal["step", "fly"], + positions=None, + metadata=None, + ) -> Generator[None, None, None]: + """Open a new scan. + + Args: + scan_motors (list): List of scan motors. + readout_priority (dict): Modification of the readout priority. + num_pos (int): Number of positions within the scope of this scan. + positions (list): List of positions for this scan. + scan_name (str): Scan name. + scan_type (str): Scan type (e.g. 'step' or 'fly') + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + """ + yield self._device_msg( + device=None, + action="open_scan", + parameter={ + "scan_motors": scan_motors, + "readout_priority": readout_priority, + "num_points": num_pos, + "positions": positions, + "scan_name": scan_name, + "scan_type": scan_type, + }, + metadata=metadata, + ) + + def kickoff( + self, *, device: str, parameter: dict = None, wait_group="kickoff", metadata=None + ) -> Generator[None, None, None]: + """Kickoff a fly scan device. + + Args: + device (str): Device name of flyer. + parameter (dict, optional): Additional parameters that should be forwarded to the device. Defaults to {}. + + Returns: + Generator[None, None, None]: Generator that yields a device message. 
+ """ + parameter = parameter if parameter is not None else {} + parameter = {"configure": parameter, "wait_group": wait_group} + yield self._device_msg( + device=device, action="kickoff", parameter=parameter, metadata=metadata + ) + + def complete(self, *, device: str, metadata=None) -> Generator[None, None, None]: + """Complete a fly scan device. + + Args: + device (str): Device name of flyer. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + """ + yield self._device_msg(device=device, action="complete", parameter={}, metadata=metadata) + + def get_req_status(self, device: str, RID: str, DIID: int) -> int: + """Check if a device request status matches the given RID and DIID + + Args: + device (str): device under inspection + RID (str): request ID + DIID (int): device instruction ID + + Returns: + int: 1 if the request status matches the RID and DIID, 0 otherwise + """ + msg = self.connector.get(MessageEndpoints.device_req_status(device)) + if not msg: + return 0 + matching_RID = msg.metadata.get("RID") == RID + matching_DIID = msg.metadata.get("DIID") == DIID + if matching_DIID and matching_RID: + return 1 + return 0 + + def get_device_progress(self, device: str, RID: str) -> float | None: + """Get reported device progress + + Args: + device (str): Name of the device + RID (str): request ID + + Returns: + float: reported progress value + + """ + msg = self.connector.get(MessageEndpoints.device_progress(device)) + if not msg: + return None + matching_RID = msg.metadata.get("RID") == RID + if not matching_RID: + return None + if not isinstance(msg, messages.ProgressMessage): + raise DeviceMessageError( + f"Expected to receive a Progressmessage for device {device} but instead received {msg}." + ) + return msg.content["value"] + + def close_scan(self) -> Generator[None, None, None]: + """ + Close the scan. + + Returns: + Generator[None, None, None]: Generator that yields a device message. 
+ + see also: :func:`open_scan` + """ + + yield self._device_msg(device=None, action="close_scan", parameter={}) + + def stage(self) -> Generator[None, None, None]: + """ + Stage all devices + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + see also: :func:`unstage` + """ + yield self._device_msg(device=None, action="stage", parameter={}) + + def unstage(self) -> Generator[None, None, None]: + """ + Unstage all devices + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + see also: :func:`stage` + """ + yield self._device_msg(device=None, action="unstage", parameter={}) + + def pre_scan(self) -> Generator[None, None, None]: + """ + Trigger pre-scan actions on all devices. Typically, pre-scan actions are called directly before the scan core starts and + are used to perform time-critical actions. + The event will be sent to all devices that have a pre_scan method implemented. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + """ + yield self._device_msg(device=None, action="pre_scan", parameter={}) + + def baseline_reading(self) -> Generator[None, None, None]: + """ + Run the baseline readings. This will readout all devices that are marked with the readout_priority "baseline". + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + """ + yield self._device_msg( + device=None, + action="baseline_reading", + parameter={}, + metadata={"readout_priority": "baseline"}, + ) + + def wait( + self, + *, + wait_type: Literal["move", "read", "trigger"], + device: list[str] | str | None = None, + group: Literal["scan_motor", "primary", None] = None, + wait_group: str = None, + wait_time: float = None, + ): + """Wait for an event. + + Args: + wait_type (Literal["move", "read", "trigger"]): Type of wait event. Can be "move", "read" or "trigger". + device (list[str] | str, optional): List of device names. Defaults to None. 
+ group (Literal["scan_motor", "primary", None]): Device group that can be used instead of device. Defaults to None. + wait_group (str, optional): Wait group for this event. Defaults to None. + wait_time (float, optional): Wait time (for wait_type="trigger"). Defaults to None. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + Example: + >>> yield from self.stubs.wait(wait_type="move", group="scan_motor", wait_group="scan_motor") + >>> yield from self.stubs.wait(wait_type="read", group="scan_motor", wait_group="my_readout_motors") + + """ + self._check_device_and_groups(device, group) + parameter = {"type": wait_type, "time": wait_time, "group": group, "wait_group": wait_group} + self._exclude_nones(parameter) + yield self._device_msg(device=device, action="wait", parameter=parameter) + + def read( + self, + *, + wait_group: str, + device: list[str] | str | None = None, + point_id: int | None = None, + group: Literal["scan_motor", "primary", None] = None, + ) -> Generator[None, None, None]: + """ + Trigger a reading on a device or device group. + + Args: + wait_group (str): Wait group for this event. The specified wait group can later be used + to wait for the completion of this event. Please note that the wait group has to be + unique. within the scope of the read / wait event. + device (list, optional): Device name. Can be used instead of group. Defaults to None. + point_id (int, optional): point_id to assign this reading to point within the scan. Defaults to None. + group (Literal["scan_motor", "primary", None], optional): Device group. Can be used instead of device. Defaults to None. + + Returns: + Generator[None, None, None]: Generator that yields a device message. 
+ + Example: + >>> yield from self.stubs.read(wait_group="readout_primary", group="primary", point_id=self.point_id) + >>> yield from self.stubs.read(wait_group="sample_stage", device="samx", point_id=self.point_id) + + """ + self._check_device_and_groups(device, group) + parameter = {"group": group, "wait_group": wait_group} + metadata = {"point_id": point_id} + self._exclude_nones(parameter) + self._exclude_nones(metadata) + yield self._device_msg(device=device, action="read", parameter=parameter, metadata=metadata) + + def publish_data_as_read( + self, *, device: str, data: dict, point_id: int + ) -> Generator[None, None, None]: + """ + Publish the given data as a read event and assign it to the given point_id. + This method can be used to customize the assignment of data to a specific point within a scan. + + Args: + device (str): Device name. + data (dict): Data that should be published. + point_id (int): point_id that should be attached to this data. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + """ + metadata = {"point_id": point_id} + yield self._device_msg( + device=device, + action="publish_data_as_read", + parameter={"data": data}, + metadata=metadata, + ) + + def trigger(self, *, group: str, point_id: int) -> Generator[None, None, None]: + """Trigger a device group. Note that the trigger event is not blocking and does not wait for the completion of the trigger event. + To wait for the completion of the trigger event, use the :func:`wait` command, specifying the wait_type as "trigger". + + Args: + group (str): Device group that should receive the trigger. + point_id (int): point_id that should be attached to this trigger event. + + Returns: + Generator[None, None, None]: Generator that yields a device message. 
+ + see also: :func:`wait` + """ + yield self._device_msg( + device=None, + action="trigger", + parameter={"group": group}, + metadata={"point_id": point_id}, + ) + + def set(self, *, device: str, value: float, wait_group: str, metadata=None): + """Set the device to a specific value. This is similar to the direct set command + in the command-line interface. The wait_group can be used to wait for the completion of this event. + For a set operation, this simply means that the device has acknowledged the set command and does not + necessarily mean that the device has reached the target value. + + Args: + device (str): Device name + value (float): Target value. + wait_group (str): wait group for this event. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + .. warning:: + + Do not use this command to kickoff a long running operation. Use :func:`kickoff` instead or, if the + device does not support the kickoff command, use :func:`set_with_response` instead. + + see also: :func:`wait`, :func:`set_and_wait`, :func:`set_with_response` + + """ + yield self._device_msg( + device=device, + action="set", + parameter={"value": value, "wait_group": wait_group}, + metadata=metadata, + ) + + def open_scan_def(self) -> Generator[None, None, None]: + """ + Open a new scan definition + + Returns: + Generator[None, None, None]: Generator that yields a device message. + """ + yield self._device_msg(device=None, action="open_scan_def", parameter={}) + + def close_scan_def(self) -> Generator[None, None, None]: + """ + Close a scan definition + + Returns: + Generator[None, None, None]: Generator that yields a device message. + """ + yield self._device_msg(device=None, action="close_scan_def", parameter={}) + + def close_scan_group(self) -> Generator[None, None, None]: + """ + Close a scan group + + Returns: + Generator[None, None, None]: Generator that yields a device message. 
+ """ + yield self._device_msg(device=None, action="close_scan_group", parameter={}) + + def rpc(self, *, device: str, parameter: dict, metadata=None) -> Generator[None, None, None]: + """Perfrom an RPC (remote procedure call) on a device. + + Args: + device (str): Device name. + parameter (dict): parameters used for this rpc instructions. + + Returns: + Generator[None, None, None]: Generator that yields a device message. + + """ + yield self._device_msg(device=device, action="rpc", parameter=parameter, metadata=metadata) + + def scan_report_instruction(self, instructions: dict) -> Generator[None, None, None]: + """Scan report instructions + + Args: + instructions (dict): Dict containing the scan report instructions + + Returns: + Generator[None, None, None]: Generator that yields a device message. + """ + yield self._device_msg( + device=None, action="scan_report_instruction", parameter=instructions + ) + + def _check_device_and_groups(self, device, group) -> None: + if device and group: + raise DeviceMessageError("Device and device group was specified. Pick one.") + if device is None and group is None: + raise DeviceMessageError("Either devices or device groups have to be specified.") diff --git a/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scans.txt b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scans.txt new file mode 100644 index 0000000..177b228 --- /dev/null +++ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scans.txt @@ -0,0 +1,513 @@ +""" +This module contains the Scans class and related classes for defining and running scans in BEC +from the client side. 
+""" + +from __future__ import annotations + +import builtins +import uuid +from collections.abc import Callable +from contextlib import ContextDecorator +from copy import deepcopy +from typing import TYPE_CHECKING, Dict, Literal + +from toolz import partition +from typeguard import typechecked + +from bec_lib import messages +from bec_lib.bec_errors import ScanAbortion +from bec_lib.client import SystemConfig +from bec_lib.device import DeviceBase +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger +from bec_lib.scan_report import ScanReport +from bec_lib.signature_serializer import dict_to_signature +from bec_lib.utils import scan_to_csv + +if TYPE_CHECKING: + from bec_lib.client import BECClient + from bec_lib.connector import ConsumerConnector + +logger = bec_logger.logger + + +class ScanObject: + """ScanObject is a class for scans""" + + def __init__(self, scan_name: str, scan_info: dict, client: BECClient = None) -> None: + self.scan_name = scan_name + self.scan_info = scan_info + self.client = client + + # run must be an anonymous function to allow for multiple doc strings + # pylint: disable=unnecessary-lambda + self.run = lambda *args, **kwargs: self._run(*args, **kwargs) + + def _run( + self, + *args, + callback: Callable = None, + async_callback: Callable = None, + hide_report: bool = False, + metadata: dict = None, + monitored: list[str | DeviceBase] = None, + file_suffix: str = None, + file_directory: str = None, + **kwargs, + ) -> ScanReport: + """ + Run the request with the given arguments. 
+ + Args: + *args: Arguments for the scan + callback: Callback function + async_callback: Asynchronous callback function + hide_report: Hide the report + metadata: Metadata dictionary + monitored: List of monitored devices + **kwargs: Keyword arguments + + Returns: + ScanReport + """ + if self.client.alarm_handler.alarms_stack: + logger.info("The alarm stack is not empty but will be cleared now.") + self.client.clear_all_alarms() + scans = self.client.scans + + # pylint: disable=protected-access + hide_report = hide_report or scans._hide_report + + user_metadata = deepcopy(self.client.metadata) + + sys_config = self.client.system_config.model_copy(deep=True) + if file_suffix: + sys_config.file_suffix = file_suffix + if file_directory: + sys_config.file_directory = file_directory + + if "sample_name" not in user_metadata: + var = self.client.get_global_var("sample_name") + if var is not None: + user_metadata["sample_name"] = var + + if metadata is not None: + user_metadata.update(metadata) + + if monitored is not None: + if not isinstance(monitored, list): + monitored = [monitored] + for mon_device in monitored: + if isinstance(mon_device, str): + mon_device = self.client.device_manager.devices.get(mon_device) + if not mon_device: + raise RuntimeError( + f"Specified monitored device {mon_device} does not exist in the current device configuration." 
+ ) + kwargs["monitored"] = monitored + + sys_config = sys_config.model_dump() + # pylint: disable=protected-access + if scans._scan_group: + sys_config["queue_group"] = scans._scan_group + if scans._scan_def_id: + sys_config["scan_def_id"] = scans._scan_def_id + if scans._dataset_id_on_hold: + sys_config["dataset_id_on_hold"] = scans._dataset_id_on_hold + + kwargs["user_metadata"] = user_metadata + kwargs["system_config"] = sys_config + + request = Scans.prepare_scan_request(self.scan_name, self.scan_info, *args, **kwargs) + request_id = str(uuid.uuid4()) + + # pylint: disable=unsupported-assignment-operation + request.metadata["RID"] = request_id + + self._send_scan_request(request) + + report = ScanReport.from_request(request, client=self.client) + report.request.callbacks.register_many("scan_segment", callback, sync=True) + report.request.callbacks.register_many("scan_segment", async_callback, sync=False) + + if scans._scan_export and scans._scan_export.scans is not None: + scans._scan_export.scans.append(report) + + if not hide_report and self.client.live_updates: + self.client.live_updates.process_request(request, callback) + + self.client.callbacks.poll() + + return report + + def _start_register(self, request: messages.ScanQueueMessage) -> ConsumerConnector: + """Start a register for the given request""" + register = self.client.device_manager.connector.register( + [ + MessageEndpoints.device_readback(dev) + for dev in request.content["parameter"]["args"].keys() + ], + threaded=False, + cb=(lambda msg: msg), + ) + return register + + def _send_scan_request(self, request: messages.ScanQueueMessage) -> None: + """Send a scan request to the scan server""" + self.client.device_manager.connector.send(MessageEndpoints.scan_queue_request(), request) + + +class Scans: + """Scans is a class for available scans in BEC""" + + def __init__(self, parent): + self.parent = parent + self._available_scans = {} + self._import_scans() + self._scan_group = None + 
self._scan_def_id = None + self._scan_group_ctx = ScanGroup(parent=self) + self._scan_def_ctx = ScanDef(parent=self) + self._hide_report = None + self._hide_report_ctx = HideReport(parent=self) + self._dataset_id_on_hold = None + self._dataset_id_on_hold_ctx = DatasetIdOnHold(parent=self) + self._scan_export = None + + def _import_scans(self): + """Import scans from the scan server""" + available_scans = self.parent.connector.get(MessageEndpoints.available_scans()) + if available_scans is None: + logger.warning("No scans available. Are redis and the BEC server running?") + return + for scan_name, scan_info in available_scans.resource.items(): + self._available_scans[scan_name] = ScanObject(scan_name, scan_info, client=self.parent) + setattr(self, scan_name, self._available_scans[scan_name].run) + setattr(getattr(self, scan_name), "__doc__", scan_info.get("doc")) + setattr( + getattr(self, scan_name), + "__signature__", + dict_to_signature(scan_info.get("signature")), + ) + + @staticmethod + def get_arg_type(in_type: str): + """translate type string into python type""" + # pylint: disable=too-many-return-statements + if in_type == "float": + return (float, int) + if in_type == "int": + return int + if in_type == "list": + return list + if in_type == "boolean": + return bool + if in_type == "str": + return str + if in_type == "dict": + return dict + if in_type == "device": + return DeviceBase + raise TypeError(f"Unknown type {in_type}") + + @staticmethod + def prepare_scan_request( + scan_name: str, scan_info: dict, *args, **kwargs + ) -> messages.ScanQueueMessage: + """Prepare scan request message with given scan arguments + + Args: + scan_name (str): scan name (matching a scan name on the scan server) + scan_info (dict): dictionary describing the scan (e.g. doc string, required kwargs etc.) + + Raises: + TypeError: Raised if not all required keyword arguments have been specified. + TypeError: Raised if the number of args do fit into the required bundling pattern. 
+ TypeError: Raised if an argument is not of the required type as specified in scan_info. + + Returns: + messages.ScanQueueMessage: scan request message + """ + arg_input = list(scan_info.get("arg_input", {}).values()) + + arg_bundle_size = scan_info.get("arg_bundle_size", {}) + bundle_size = arg_bundle_size.get("bundle") + if len(arg_input) > 0: + if len(args) % len(arg_input) != 0: + raise TypeError( + f"{scan_info.get('doc')}\n {scan_name} takes multiples of" + f" {len(arg_input)} arguments ({len(args)} given)." + ) + if not all(req_kwarg in kwargs for req_kwarg in scan_info.get("required_kwargs")): + raise TypeError( + f"{scan_info.get('doc')}\n Not all required keyword arguments have been" + f" specified. The required arguments are: {scan_info.get('required_kwargs')}" + ) + # check that all specified devices in args are different objects + for arg in args: + if not isinstance(arg, DeviceBase): + continue + if args.count(arg) > 1: + raise TypeError( + f"{scan_info.get('doc')}\n All specified devices must be different" + f" objects." + ) + + # check that all arguments are of the correct type + for ii, arg in enumerate(args): + if not isinstance(arg, Scans.get_arg_type(arg_input[ii % len(arg_input)])): + raise TypeError( + f"{scan_info.get('doc')}\n Argument {ii} must be of type" + f" {arg_input[ii%len(arg_input)]}, not {type(arg).__name__}." + ) + + metadata = {} + metadata.update(kwargs["system_config"]) + metadata["user_metadata"] = kwargs.pop("user_metadata", {}) + + params = {"args": Scans._parameter_bundler(args, bundle_size), "kwargs": kwargs} + # check the number of arg bundles against the number of required bundles + if bundle_size: + num_bundles = len(params["args"]) + min_bundles = arg_bundle_size.get("min") + max_bundles = arg_bundle_size.get("max") + if min_bundles and num_bundles < min_bundles: + raise TypeError( + f"{scan_info.get('doc')}\n {scan_name} requires at least {min_bundles} bundles" + f" of arguments ({num_bundles} given)." 
+ ) + if max_bundles and num_bundles > max_bundles: + raise TypeError( + f"{scan_info.get('doc')}\n {scan_name} requires at most {max_bundles} bundles" + f" of arguments ({num_bundles} given)." + ) + return messages.ScanQueueMessage( + scan_type=scan_name, parameter=params, queue="primary", metadata=metadata + ) + + @staticmethod + def _parameter_bundler(args, bundle_size): + """ + + Args: + args: + bundle_size: number of parameters per bundle + + Returns: + + """ + if not bundle_size: + return tuple(cmd.name if hasattr(cmd, "name") else cmd for cmd in args) + params = {} + for cmds in partition(bundle_size, args): + cmds_serialized = [cmd.name if hasattr(cmd, "name") else cmd for cmd in cmds] + params[cmds_serialized[0]] = cmds_serialized[1:] + return params + + @property + def scan_group(self): + """Context manager / decorator for defining scan groups""" + return self._scan_group_ctx + + @property + def scan_def(self): + """Context manager / decorator for defining new scans""" + return self._scan_def_ctx + + @property + def hide_report(self): + """Context manager / decorator for hiding the report""" + return self._hide_report_ctx + + @property + def dataset_id_on_hold(self): + """Context manager / decorator for setting the dataset id on hold""" + return self._dataset_id_on_hold_ctx + + def scan_export(self, output_file: str): + """Context manager / decorator for exporting scans""" + return ScanExport(output_file) + + +class ScanGroup(ContextDecorator): + """ScanGroup is a ContextDecorator for defining a scan group""" + + def __init__(self, parent: Scans = None) -> None: + super().__init__() + self.parent = parent + + def __enter__(self): + group_id = str(uuid.uuid4()) + self.parent._scan_group = group_id + return self + + def __exit__(self, *exc): + self.parent.close_scan_group() + self.parent._scan_group = None + + +class ScanDef(ContextDecorator): + """ScanDef is a ContextDecorator for defining a new scan""" + + def __init__(self, parent: Scans = None) -> None: 
+ super().__init__() + self.parent = parent + + def __enter__(self): + if self.parent._scan_def_id is not None: + raise ScanAbortion("Nested scan definitions currently not supported.") + scan_def_id = str(uuid.uuid4()) + self.parent._scan_def_id = scan_def_id + self.parent.open_scan_def() + return self + + def __exit__(self, *exc): + if exc[0] is None: + self.parent.close_scan_def() + self.parent._scan_def_id = None + + +class HideReport(ContextDecorator): + """HideReport is a ContextDecorator for hiding the report""" + + def __init__(self, parent: Scans = None) -> None: + super().__init__() + self.parent = parent + + def __enter__(self): + if self.parent._hide_report is None: + self.parent._hide_report = True + return self + + def __exit__(self, *exc): + self.parent._hide_report = None + + +class DatasetIdOnHold(ContextDecorator): + """DatasetIdOnHold is a ContextDecorator for setting the dataset id on hold""" + + def __init__(self, parent: Scans = None) -> None: + super().__init__() + self.parent = parent + self._call_count = 0 + + def __enter__(self): + self._call_count += 1 + if self.parent._dataset_id_on_hold is None: + self.parent._dataset_id_on_hold = True + return self + + def __exit__(self, *exc): + self._call_count -= 1 + if self._call_count: + return + self.parent._dataset_id_on_hold = None + queue = self.parent.parent.queue + queue.next_dataset_number += 1 + + +class FileWriter: + @typechecked + def __init__(self, file_suffix: str = None, file_directory: str = None) -> None: + """Context manager for updating metadata + + Args: + fw_config (dict): Dictionary with metadata for the filewriter, can only have keys "file_suffix" and "file_directory" + """ + self.client = self._get_client() + self.system_config = self.client.system_config + self._orig_system_config = None + self._orig_metadata = None + self.file_suffix = file_suffix + self.file_directory = file_directory + + def _get_client(self): + """Get BEC client""" + return builtins.__dict__["bec"] + + 
def __enter__(self): + """Enter the context manager""" + self._orig_metadata = deepcopy(self.client.metadata) + self._orig_system_config = self.system_config.model_copy(deep=True) + self.system_config.file_suffix = self.file_suffix + self.system_config.file_directory = self.file_directory + return self + + def __exit__(self, *exc): + """Exit the context manager""" + self.client.metadata = self._orig_metadata + self.system_config.file_suffix = self._orig_system_config.file_suffix + self.system_config.file_directory = self._orig_system_config.file_directory + + +class Metadata: + @typechecked + def __init__(self, metadata: dict) -> None: + """Context manager for updating metadata + + Args: + metadata (dict): Metadata dictionary + """ + self.client = self._get_client() + self._metadata = metadata + self._orig_metadata = None + + def _get_client(self): + """Get BEC client""" + return builtins.__dict__["bec"] + + def __enter__(self): + """Enter the context manager""" + self._orig_metadata = deepcopy(self.client.metadata) + self.client.metadata.update(self._metadata) + return self + + def __exit__(self, *exc): + """Exit the context manager""" + self.client.metadata = self._orig_metadata + + +class ScanExport: + def __init__(self, output_file: str) -> None: + """Context manager for exporting scans + + Args: + output_file (str): Output file name + """ + self.output_file = output_file + self.client = None + self.scans = None + + def _check_abort_on_ctrl_c(self): + """Check if scan should be aborted on Ctrl-C""" + # pylint: disable=protected-access + if not self.client._service_config.abort_on_ctrl_c: + raise RuntimeError( + "ScanExport context manager can only be used if abort_on_ctrl_c is set to True" + ) + + def _get_client(self): + return builtins.__dict__["bec"] + + def __enter__(self): + self.scans = [] + self.client = self._get_client() + self.client.scans._scan_export = self + self._check_abort_on_ctrl_c() + return self + + def _export_to_csv(self): + 
scan_to_csv(self.scans, self.output_file) + + def __exit__(self, *exc): + try: + for scan in self.scans: + scan.wait() + finally: + try: + self._export_to_csv() + self.scans = None + except Exception as exc: + logger.warning(f"Could not export scans to csv file, due to exception {exc}") diff --git a/phoenix_bec/local_scripts/Documentation/Base_Classes/BAse_CLASS_ScanBase.txt b/phoenix_bec/local_scripts/Documentation/Base_Classes/BAse_CLASS_ScanBase.txt new file mode 100644 index 0000000..b34195f --- /dev/null +++ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BAse_CLASS_ScanBase.txt @@ -0,0 +1,2061 @@ +https://bec.readthedocs.io/en/latest/api_reference/_autosummary/bec_server.scan_server.scans.ScanBase.html#bec_server.scan_server.scans.ScanBase + + +Sequence of events: + + + read_scan_motors + + prepare_positions + + _calculate_positions + + _optimize_trajectory + + _set_position_offset + + _check_limits + + open_scan + + stage + + run_baseline_reading + + pre_scan + + scan_core + + finalize + + unstage + + cleanup + + + + + + +class ScanBase(*args, device_manager: DeviceManagerBase +| None = None, parameter: dict \ +| None = None, exp_time: float = 0, readout_time: float = 0, acquisition_config: dict +| None = None, settling_time: float = 0, relative: bool = False, burst_at_each_point: int = 1, +frames_per_trigger: int = 1, optim_trajectory: Literal['corridor', None] +| None = None, monitored: list | None = None, metadata: dict | None = None, **kwargs) + +Key functions in ScanBase during step by step scanning are + + +# This walks through all positions and calls _at_each_point() + + def scan_core(self): + """perform the scan core procedure""" + for ind, pos in self._get_position(): + for self.burst_index in range(self.burst_at_each_point): + yield from self._at_each_point(ind, pos) + self.burst_index = 0 + + + + def _at_each_point(self, ind=None, pos=None): + yield from self._move_scan_motors_and_wait(pos) -------- > wait until all motors have arrived + if 
ind > 0: ----------> now wait for primary group + yield from self.stubs.wait( + wait_type="read", group="primary", wait_group="readout_primary" + ) + time.sleep(self.settling_time) + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) ------ trigger after settling time + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) ---- wait for trigger finish + yield from self.stubs.read( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + yield from self.stubs.wait( + wait_type="read", group="scan_motor", wait_group="readout_primary" + ) + + self.point_id += 1 + + + + +Methods + + + + + + + + + + + + + + + + + + + +sourece code in bec_server.scan_server.scans + + + +from __future__ import annotations + +import ast +import enum +import threading +import time +import uuid +from abc import ABC, abstractmethod +from typing import Any, Literal + +import numpy as np + +from bec_lib.device import DeviceBase +from bec_lib.devicemanager import DeviceManagerBase +from bec_lib.endpoints import MessageEndpoints +from bec_lib.logger import bec_logger + +from .errors import LimitError, ScanAbortion +from .path_optimization import PathOptimizerMixin +from .scan_stubs import ScanStubs + +logger = bec_logger.logger + + + +[docs] +class ScanArgType(str, enum.Enum): + DEVICE = "device" + FLOAT = "float" + INT = "int" + BOOL = "boolean" + STR = "str" + LIST = "list" + DICT = "dict" + + + + +[docs] +def unpack_scan_args(scan_args: dict[str, Any]) -> list: + """unpack_scan_args unpacks the scan arguments and returns them as a tuple. 
+ + Args: + scan_args (dict[str, Any]): scan arguments + + Returns: + list: list of arguments + """ + args = [] + if not scan_args: + return args + if not isinstance(scan_args, dict): + return scan_args + for cmd_name, cmd_args in scan_args.items(): + args.append(cmd_name) + args.extend(cmd_args) + return args + + + + +[docs] +def get_2D_raster_pos(axis, snaked=True): + """get_2D_raster_post calculates and returns the positions for a 2D + + snaked==True: + ->->->->- + -<-<-<-<- + ->->->->- + snaked==False: + ->->->->- + ->->->->- + ->->->->- + + Args: + axis (list): list of positions for each axis + snaked (bool, optional): If true, the positions will be calculcated for a snake scan. Defaults to True. + + Returns: + array: calculated positions + """ + + x_grid, y_grid = np.meshgrid(axis[0], axis[1]) + if snaked: + y_grid.T[::2] = np.fliplr(y_grid.T[::2]) + x_flat = x_grid.T.ravel() + y_flat = y_grid.T.ravel() + positions = np.vstack((x_flat, y_flat)).T + return positions + + + +# pylint: disable=too-many-arguments + +[docs] +def get_fermat_spiral_pos( + m1_start, m1_stop, m2_start, m2_stop, step=1, spiral_type=0, center=False +): + """get_fermat_spiral_pos calculates and returns the positions for a Fermat spiral scan. + + Args: + m1_start (float): start position motor 1 + m1_stop (float): end position motor 1 + m2_start (float): start position motor 2 + m2_stop (float): end position motor 2 + step (float, optional): Step size. Defaults to 1. + spiral_type (float, optional): Angular offset in radians that determines the shape of the spiral. + A spiral with spiral_type=2 is the same as spiral_type=0. Defaults to 0. + center (bool, optional): Add a center point. Defaults to False. + + Returns: + array: calculated positions in the form [[m1, m2], ...] 
+ """ + positions = [] + phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi + + start = int(not center) + + length_axis1 = abs(m1_stop - m1_start) + length_axis2 = abs(m2_stop - m2_start) + n_max = int(length_axis1 * length_axis2 * 3.2 / step / step) + + for ii in range(start, n_max): + radius = step * 0.57 * np.sqrt(ii) + if abs(radius * np.sin(ii * phi)) > length_axis1 / 2: + continue + if abs(radius * np.cos(ii * phi)) > length_axis2 / 2: + continue + positions.extend([(radius * np.sin(ii * phi), radius * np.cos(ii * phi))]) + return np.array(positions) + + + + +[docs] +def get_round_roi_scan_positions(lx: float, ly: float, dr: float, nth: int, cenx=0, ceny=0): + """ + get_round_roi_scan_positions calculates and returns the positions for a round scan in a rectangular region of interest. + + Args: + lx (float): length in x + ly (float): length in y + dr (float): step size + nth (int): number of angles in the inner ring + cenx (int, optional): center in x. Defaults to 0. + ceny (int, optional): center in y. Defaults to 0. + + Returns: + array: calculated positions in the form [[x, y], ...] + """ + positions = [] + nr = 1 + int(np.floor(max([lx, ly]) / dr)) + for ir in range(1, nr + 2): + rr = ir * dr + dth = 2 * np.pi / (nth * ir) + pos = [ + (rr * np.cos(ith * dth) + cenx, rr * np.sin(ith * dth) + ceny) + for ith in range(nth * ir) + if np.abs(rr * np.cos(ith * dth)) < lx / 2 and np.abs(rr * np.sin(ith * dth)) < ly / 2 + ] + positions.extend(pos) + return np.array(positions) + + + + +[docs] +def get_round_scan_positions(r_in: float, r_out: float, nr: int, nth: int, cenx=0, ceny=0): + """ + get_round_scan_positions calculates and returns the positions for a round scan. + + Args: + r_in (float): inner radius + r_out (float): outer radius + nr (int): number of radii + nth (int): number of angles in the inner ring + cenx (int, optional): center in x. Defaults to 0. + ceny (int, optional): center in y. Defaults to 0. 
+ + Returns: + array: calculated positions in the form [[x, y], ...] + + """ + positions = [] + dr = (r_in - r_out) / nr + for ir in range(1, nr + 2): + rr = r_in + ir * dr + dth = 2 * np.pi / (nth * ir) + positions.extend( + [ + (rr * np.sin(ith * dth) + cenx, rr * np.cos(ith * dth) + ceny) + for ith in range(nth * ir) + ] + ) + return np.array(positions, dtype=float) + + + + +[docs] +class RequestBase(ABC): + """ + Base class for all scan requests. + """ + + scan_name = "" + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + gui_args = {} + required_kwargs = [] + return_to_start_after_abort = False + use_scan_progress_report = False + + def __init__( + self, + *args, + device_manager: DeviceManagerBase = None, + monitored: list = None, + parameter: dict = None, + metadata: dict = None, + **kwargs, + ) -> None: + super().__init__() + self._shutdown_event = threading.Event() + self.parameter = parameter if parameter is not None else {} + self.caller_args = self.parameter.get("args", {}) + self.caller_kwargs = self.parameter.get("kwargs", {}) + self.metadata = metadata + self.device_manager = device_manager + self.connector = device_manager.connector + self.DIID = 0 + self.scan_motors = [] + self.positions = [] + self._pre_scan_macros = [] + self._scan_report_devices = None + self._get_scan_motors() + self.readout_priority = { + "monitored": monitored if monitored is not None else [], + "baseline": [], + "on_request": [], + "async": [], + } + self.update_readout_priority() + if metadata is None: + self.metadata = {} + self.stubs = ScanStubs( + connector=self.device_manager.connector, + device_msg_callback=self.device_msg_metadata, + shutdown_event=self._shutdown_event, + ) + + @property + def scan_report_devices(self): + """devices to be included in the scan report""" + if self._scan_report_devices is None: + return self.readout_priority["monitored"] + return self._scan_report_devices + + @scan_report_devices.setter + def 
scan_report_devices(self, devices: list): + self._scan_report_devices = devices + + def device_msg_metadata(self): + default_metadata = {"readout_priority": "monitored", "DIID": self.DIID} + metadata = {**default_metadata, **self.metadata} + self.DIID += 1 + return metadata + + @staticmethod + def _get_func_name_from_macro(macro: str): + return ast.parse(macro).body[0].name + + +[docs] + def run_pre_scan_macros(self): + """run pre scan macros if any""" + macros = self.device_manager.connector.lrange(MessageEndpoints.pre_scan_macros(), 0, -1) + for macro in macros: + macro = macro.value.strip() + func_name = self._get_func_name_from_macro(macro) + exec(macro) + eval(func_name)(self.device_manager.devices, self) + + + def initialize(self): + self.run_pre_scan_macros() + + def _check_limits(self): + logger.debug("check limits") + for ii, dev in enumerate(self.scan_motors): + low_limit, high_limit = self.device_manager.devices[dev].limits + if low_limit >= high_limit: + # if both limits are equal or low > high, no restrictions ought to be applied + return + for pos in self.positions: + pos_axis = pos[ii] + if not low_limit <= pos_axis <= high_limit: + raise LimitError( + f"Target position {pos} for motor {dev} is outside of range: [{low_limit}," + f" {high_limit}]" + ) + + def _get_scan_motors(self): + if len(self.caller_args) == 0: + return + if self.arg_bundle_size.get("bundle"): + self.scan_motors = list(self.caller_args.keys()) + return + for motor in self.caller_args: + if motor not in self.device_manager.devices: + continue + self.scan_motors.append(motor) + + +[docs] + def update_readout_priority(self): + """update the readout priority for this request. 
Typically the monitored devices should also include the scan motors.""" + self.readout_priority["monitored"].extend(self.scan_motors) + self.readout_priority["monitored"] = list( + sorted( + set(self.readout_priority["monitored"]), + key=self.readout_priority["monitored"].index, + ) + ) + + + @abstractmethod + def run(self): + pass + + + + +[docs] +class ScanBase(RequestBase, PathOptimizerMixin): + """ + Base class for all scans. The following methods are called in the following order during the scan + 1. initialize + - run_pre_scan_macros + 2. read_scan_motors + 3. prepare_positions + - _calculate_positions + - _optimize_trajectory + - _set_position_offset + - _check_limits + 4. open_scan + 5. stage + 6. run_baseline_reading + 7. pre_scan + 8. scan_core + 9. finalize + 10. unstage + 11. cleanup + + A subclass of ScanBase must implement the following methods: + - _calculate_positions + + Attributes: + scan_name (str): name of the scan + scan_type (str): scan type. Can be "step" or "fly" + arg_input (list): list of scan argument types + arg_bundle_size (dict): + - bundle: number of arguments that are bundled together + - min: minimum number of bundles + - max: maximum number of bundles + required_kwargs (list): list of required kwargs + return_to_start_after_abort (bool): if True, the scan will return to the start position after an abort + """ + + scan_name = "" + scan_type = "step" + required_kwargs = ["required"] + return_to_start_after_abort = True + use_scan_progress_report = True + + # perform pre-move action before the pre_scan trigger is sent + pre_move = True + + def __init__( + self, + *args, + device_manager: DeviceManagerBase = None, + parameter: dict = None, + exp_time: float = 0, + readout_time: float = 0, + acquisition_config: dict = None, + settling_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + frames_per_trigger: int = 1, + optim_trajectory: Literal["corridor", None] = None, + monitored: list = None, + metadata: dict = 
None, + **kwargs, + ): + super().__init__( + *args, + device_manager=device_manager, + monitored=monitored, + parameter=parameter, + metadata=metadata, + **kwargs, + ) + self.DIID = 0 + self.point_id = 0 + self.exp_time = exp_time + self.readout_time = readout_time + self.acquisition_config = acquisition_config + self.settling_time = settling_time + self.relative = relative + self.burst_at_each_point = burst_at_each_point + self.frames_per_trigger = frames_per_trigger + self.optim_trajectory = optim_trajectory + self.burst_index = 0 + + self.start_pos = [] + self.positions = [] + self.num_pos = 0 + + if self.scan_name == "": + raise ValueError("scan_name cannot be empty") + + if acquisition_config is None or "default" not in acquisition_config: + self.acquisition_config = { + "default": {"exp_time": self.exp_time, "readout_time": self.readout_time} + } + + @property + def monitor_sync(self): + """ + monitor_sync is a property that defines how monitored devices are synchronized. + It can be either bec or the name of the device. If set to bec, the scan bundler + will synchronize scan segments based on the bec triggered readouts. If set to a device name, + the scan bundler will synchronize based on the readouts of the device, i.e. upon + receiving a new readout of the device, cached monitored readings will be added + to the scan segment. 
+ """ + return "bec" + + +[docs] + def read_scan_motors(self): + """read the scan motors""" + yield from self.stubs.read_and_wait(device=self.scan_motors, wait_group="scan_motor") + + + @abstractmethod + def _calculate_positions(self) -> None: + """Calculate the positions""" + + def _optimize_trajectory(self): + if not self.optim_trajectory: + return + if self.optim_trajectory == "corridor": + self.positions = self.optimize_corridor(self.positions) + return + return + + +[docs] + def prepare_positions(self): + """prepare the positions for the scan""" + self._calculate_positions() + self._optimize_trajectory() + self.num_pos = len(self.positions) * self.burst_at_each_point + yield from self._set_position_offset() + self._check_limits() + + + +[docs] + def open_scan(self): + """open the scan""" + positions = self.positions if isinstance(self.positions, list) else self.positions.tolist() + yield from self.stubs.open_scan( + scan_motors=self.scan_motors, + readout_priority=self.readout_priority, + num_pos=self.num_pos, + positions=positions, + scan_name=self.scan_name, + scan_type=self.scan_type, + ) + + + +[docs] + def stage(self): + """call the stage procedure""" + yield from self.stubs.stage() + + + +[docs] + def run_baseline_reading(self): + """perform a reading of all baseline devices""" + yield from self.stubs.baseline_reading() + + + def _set_position_offset(self): + for dev in self.scan_motors: + val = yield from self.stubs.send_rpc_and_wait(dev, "read") + self.start_pos.append(val[dev].get("value")) + if self.relative: + self.positions += self.start_pos + + +[docs] + def close_scan(self): + """close the scan""" + yield from self.stubs.close_scan() + + + +[docs] + def scan_core(self): + """perform the scan core procedure""" + for ind, pos in self._get_position(): + for self.burst_index in range(self.burst_at_each_point): + yield from self._at_each_point(ind, pos) + self.burst_index = 0 + + + +[docs] + def return_to_start(self): + """return to the start 
position""" + yield from self._move_scan_motors_and_wait(self.start_pos) + + + +[docs] + def finalize(self): + """finalize the scan""" + yield from self.return_to_start() + yield from self.stubs.wait(wait_type="read", group="primary", wait_group="readout_primary") + yield from self.stubs.complete(device=None) + + + +[docs] + def unstage(self): + """call the unstage procedure""" + yield from self.stubs.unstage() + + + +[docs] + def cleanup(self): + """call the cleanup procedure""" + yield from self.close_scan() + + + def _at_each_point(self, ind=None, pos=None): + yield from self._move_scan_motors_and_wait(pos) + if ind > 0: + yield from self.stubs.wait( + wait_type="read", group="primary", wait_group="readout_primary" + ) + time.sleep(self.settling_time) + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) + yield from self.stubs.read( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + yield from self.stubs.wait( + wait_type="read", group="scan_motor", wait_group="readout_primary" + ) + + self.point_id += 1 + + def _move_scan_motors_and_wait(self, pos): + if not isinstance(pos, list) and not isinstance(pos, np.ndarray): + pos = [pos] + if len(pos) == 0: + return + for ind, val in enumerate(self.scan_motors): + yield from self.stubs.set(device=val, value=pos[ind], wait_group="scan_motor") + + yield from self.stubs.wait(wait_type="move", group="scan_motor", wait_group="scan_motor") + + def _get_position(self): + for ind, pos in enumerate(self.positions): + yield (ind, pos) + + def scan_report_instructions(self): + yield None + + +[docs] + def pre_scan(self): + """ + pre scan procedure. This method is called before the scan_core method and can be used to + perform additional tasks before the scan is started. 
This + """ + if self.pre_move and len(self.positions) > 0: + yield from self._move_scan_motors_and_wait(self.positions[0]) + yield from self.stubs.pre_scan() + + + +[docs] + def run(self): + """run the scan. This method is called by the scan server and is the main entry point for the scan.""" + self.initialize() + yield from self.read_scan_motors() + yield from self.prepare_positions() + yield from self.scan_report_instructions() + yield from self.open_scan() + yield from self.stage() + yield from self.run_baseline_reading() + yield from self.pre_scan() + yield from self.scan_core() + yield from self.finalize() + yield from self.unstage() + yield from self.cleanup() + + + @classmethod + def scan(cls, *args, **kwargs): + scan = cls(args, **kwargs) + yield from scan.run() + + + + +[docs] +class SyncFlyScanBase(ScanBase, ABC): + """ + Fly scan base class for all synchronous fly scans. A synchronous fly scan is a scan where the flyer is + synced with the monitored devices. + Classes inheriting from SyncFlyScanBase must at least implement the scan_core method and the monitor_sync property. + """ + + scan_type = "fly" + pre_move = False + + def _get_scan_motors(self): + # fly scans normally do not have stepper scan motors so + # the default way of retrieving scan motors is not applicable + return [] + + @property + @abstractmethod + def monitor_sync(self) -> str: + """ + monitor_sync is the flyer that will be used to synchronize the monitor readings in the scan bundler. + The return value should be the name of the flyer device. 
+ """ + + def _calculate_positions(self) -> None: + pass + + +[docs] + def read_scan_motors(self): + yield None + + + +[docs] + def prepare_positions(self): + yield None + + + +[docs] + @abstractmethod + def scan_core(self): + """perform the scan core procedure""" + + + ############################################ + # Example of how to kickoff and wait for a flyer: + ############################################ + + # yield from self.stubs.kickoff(device=self.flyer, parameter=self.caller_kwargs) + # yield from self.stubs.complete(device=self.flyer) + # target_diid = self.DIID - 1 + + # while True: + # status = self.stubs.get_req_status( + # device=self.flyer, RID=self.metadata["RID"], DIID=target_diid + # ) + # progress = self.stubs.get_device_progress( + # device=self.flyer, RID=self.metadata["RID"] + # ) + # if progress: + # self.num_pos = progress + # if status: + # break + # time.sleep(1) + + # def _get_flyer_status(self) -> list: + # connector = self.device_manager.connector + + # pipe = connector.pipeline() + # connector.lrange( + # MessageEndpoints.device_req_status_container(self.metadata["RID"]), 0, -1, pipe + # ) + # connector.get(MessageEndpoints.device_readback(self.flyer), pipe) + # return connector.execute_pipeline(pipe) + +g + +[docs] +class AsyncFlyScanBase(SyncFlyScanBase): + """ + Fly scan base class for all asynchronous fly scans. An asynchronous fly scan is a scan where the flyer is + not synced with the monitored devices. + Classes inheriting from AsyncFlyScanBase must at least implement the scan_core method. 
+ """ + + @property + def monitor_sync(self): + return "bec" + + + + +[docs] +class ScanStub(RequestBase): + pass + + + + +[docs] +class OpenScanDef(ScanStub): + scan_name = "open_scan_def" + + def run(self): + yield from self.stubs.open_scan_def() + + + + +[docs] +class CloseScanDef(ScanStub): + scan_name = "close_scan_def" + + def run(self): + yield from self.stubs.close_scan_def() + + + + +[docs] +class CloseScanGroup(ScanStub): + scan_name = "close_scan_group" + + def run(self): + yield from self.stubs.close_scan_group() + + + + +[docs] +class DeviceRPC(ScanStub): + scan_name = "device_rpc" + arg_input = { + "device": ScanArgType.DEVICE, + "func": ScanArgType.STR, + "args": ScanArgType.LIST, + "kwargs": ScanArgType.DICT, + } + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": 1} + + def _get_scan_motors(self): + pass + + def run(self): + # different to calling self.device_rpc, this procedure will not wait for a reply and therefore not check any errors. + yield from self.stubs.rpc(device=self.parameter.get("device"), parameter=self.parameter) + + + + +[docs] +class Move(RequestBase): + scan_name = "mv" + arg_input = {"device": ScanArgType.DEVICE, "target": ScanArgType.FLOAT} + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + required_kwargs = ["relative"] + + def __init__(self, *args, relative=False, **kwargs): + """ + Move device(s) to an absolute position + Args: + *args (Device, float): pairs of device / position arguments + relative (bool): if True, move relative to current position + + Returns: + ScanReport + + Examples: + >>> scans.mv(dev.samx, 1, dev.samy,2) + """ + super().__init__(**kwargs) + self.relative = relative + self.start_pos = [] + + def _calculate_positions(self): + self.positions = np.asarray([[val[0] for val in self.caller_args.values()]], dtype=float) + + def _at_each_point(self, pos=None): + for ii, motor in enumerate(self.scan_motors): + yield from self.stubs.set( + device=motor, + 
value=self.positions[0][ii], + wait_group="scan_motor", + metadata={"response": True}, + ) + + def cleanup(self): + pass + + def _set_position_offset(self): + self.start_pos = [] + for dev in self.scan_motors: + val = yield from self.stubs.send_rpc_and_wait(dev, "read") + self.start_pos.append(val[dev].get("value")) + if not self.relative: + return + self.positions += self.start_pos + + def prepare_positions(self): + self._calculate_positions() + yield from self._set_position_offset() + self._check_limits() + + def scan_report_instructions(self): + yield None + + def run(self): + self.initialize() + yield from self.prepare_positions() + yield from self.scan_report_instructions() + yield from self._at_each_point() + + + + +[docs] +class UpdatedMove(Move): + """ + Move device(s) to an absolute position and show live updates. This is a blocking call. For non-blocking use Move. + Args: + *args (Device, float): pairs of device / position arguments + relative (bool): if True, move relative to current position + + Returns: + ScanReport + + Examples: + >>> scans.umv(dev.samx, 1, dev.samy,2) + """ + + scan_name = "umv" + + def _at_each_point(self, pos=None): + for ii, motor in enumerate(self.scan_motors): + yield from self.stubs.set( + device=motor, value=self.positions[0][ii], wait_group="scan_motor" + ) + + for motor in self.scan_motors: + yield from self.stubs.wait(wait_type="move", device=motor, wait_group="scan_motor") + + def scan_report_instructions(self): + yield from self.stubs.scan_report_instruction( + { + "readback": { + "RID": self.metadata["RID"], + "devices": self.scan_motors, + "start": self.start_pos, + "end": self.positions[0], + } + } + ) + + + + +[docs] +class Scan(ScanBase): + scan_name = "grid_scan" + arg_input = { + "device": ScanArgType.DEVICE, + "start": ScanArgType.FLOAT, + "stop": ScanArgType.FLOAT, + "steps": ScanArgType.INT, + } + arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": None} + required_kwargs = ["relative"] + gui_config = 
{ + "Scan Parameters": ["exp_time", "settling_time", "burst_at_each_point", "relative"] + } + + def __init__( + self, + *args, + exp_time: float = 0, + settling_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + Scan two or more motors in a grid. + + Args: + *args (Device, float, float, int): pairs of device / start / stop / steps arguments + exp_time (float): exposure time in seconds. Default is 0. + settling_time (float): settling time in seconds. Default is 0. + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + + Returns: + ScanReport + + Examples: + >>> scans.grid_scan(dev.motor1, -5, 5, 10, dev.motor2, -5, 5, 10, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, + settling_time=settling_time, + relative=relative, + burst_at_each_point=burst_at_each_point, + **kwargs, + ) + + def _calculate_positions(self): + axis = [] + for _, val in self.caller_args.items(): + axis.append(np.linspace(val[0], val[1], val[2], dtype=float)) + if len(axis) > 1: + self.positions = get_2D_raster_pos(axis) + else: + self.positions = np.vstack(tuple(axis)).T + + + + +[docs] +class FermatSpiralScan(ScanBase): + scan_name = "fermat_scan" + required_kwargs = ["step", "relative"] + gui_config = { + "Device 1": ["motor1", "start_motor1", "stop_motor1"], + "Device 2": ["motor2", "start_motor2", "stop_motor2"], + "Movement Parameters": ["step", "relative"], + "Acquisition Parameters": ["exp_time", "settling_time", "burst_at_each_point"], + } + + def __init__( + self, + motor1: DeviceBase, + start_motor1: float, + stop_motor1: float, + motor2: DeviceBase, + start_motor2: float, + stop_motor2: float, + step: float = 0.1, + exp_time: float = 0, + settling_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + spiral_type: float = 0, + optim_trajectory: 
Literal["corridor", None] = None, + **kwargs, + ): + """ + A scan following Fermat's spiral. + + Args: + motor1 (DeviceBase): first motor + start_motor1 (float): start position motor 1 + stop_motor1 (float): end position motor 1 + motor2 (DeviceBase): second motor + start_motor2 (float): start position motor 2 + stop_motor2 (float): end position motor 2 + step (float): step size in motor units. Default is 0.1. + exp_time (float): exposure time in seconds. Default is 0. + settling_time (float): settling time in seconds. Default is 0. + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + spiral_type (float): type of spiral to use. Default is 0. + optim_trajectory (str): trajectory optimization method. Default is None. Options are "corridor" and "none". + + Returns: + ScanReport + + Examples: + >>> scans.fermat_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, step=0.5, exp_time=0.1, relative=True, optim_trajectory="corridor") + + """ + super().__init__( + exp_time=exp_time, + settling_time=settling_time, + relative=relative, + burst_at_each_point=burst_at_each_point, + optim_trajectory=optim_trajectory, + **kwargs, + ) + self.motor1 = motor1 + self.motor2 = motor2 + self.start_motor1 = start_motor1 + self.stop_motor1 = stop_motor1 + self.start_motor2 = start_motor2 + self.stop_motor2 = stop_motor2 + self.step = step + self.spiral_type = spiral_type + + def _calculate_positions(self): + self.positions = get_fermat_spiral_pos( + self.start_motor1, + self.stop_motor1, + self.start_motor2, + self.stop_motor2, + step=self.step, + spiral_type=self.spiral_type, + center=False, + ) + + + + +[docs] +class RoundScan(ScanBase): + scan_name = "round_scan" + required_kwargs = ["relative"] + gui_config = { + "Motors": ["motor_1", "motor_2"], + "Ring Parameters": ["inner_ring", "outer_ring", "number_of_rings", "pos_in_first_ring"], + "Scan Parameters": 
["relative", "burst_at_each_point"], + } + + def __init__( + self, + motor_1: DeviceBase, + motor_2: DeviceBase, + inner_ring: float, + outer_ring: float, + number_of_rings: int, + pos_in_first_ring: int, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A scan following a round shell-like pattern. + + Args: + motor_1 (DeviceBase): first motor + motor_2 (DeviceBase): second motor + inner_ring (float): inner radius + outer_ring (float): outer radius + number_of_rings (int): number of rings + pos_in_first_ring (int): number of positions in the first ring + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + + Returns: + ScanReport + + Examples: + >>> scans.round_scan(dev.motor1, dev.motor2, 0, 25, 5, 3, exp_time=0.1, relative=True) + + """ + super().__init__(relative=relative, burst_at_each_point=burst_at_each_point, **kwargs) + self.axis = [] + self.motor_1 = motor_1 + self.motor_2 = motor_2 + self.inner_ring = inner_ring + self.outer_ring = outer_ring + self.number_of_rings = number_of_rings + self.pos_in_first_ring = pos_in_first_ring + + def _get_scan_motors(self): + caller_args = list(self.caller_args.items())[0] + self.scan_motors = [caller_args[0], caller_args[1][0]] + + def _calculate_positions(self): + self.positions = get_round_scan_positions( + r_in=self.inner_ring, + r_out=self.outer_ring, + nr=self.number_of_rings, + nth=self.pos_in_first_ring, + ) + + + + +[docs] +class ContLineScan(ScanBase): + scan_name = "cont_line_scan" + required_kwargs = ["steps", "relative"] + scan_type = "step" + gui_config = { + "Device": ["device", "start", "stop"], + "Movement Parameters": ["steps", "relative", "offset", "atol"], + "Acquisition Parameters": ["exp_time", "burst_at_each_point"], + } + + def __init__( + self, + device: DeviceBase, + start: float, + stop: float, + offset: float = 1, + atol: float = 
0.5, + exp_time: float = 0, + steps: int = 10, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A continuous line scan. Use this scan if you want to move a motor continuously from start to stop position whilst + acquiring data at predefined positions. The scan will abort if the motor moves too fast and a point is skipped. + + Args: + device (DeviceBase): motor to move continuously from start to stop position + start (float): start position + stop (float): stop position + exp_time (float): exposure time in seconds. Default is 0. + steps (int): number of steps. Default is 10. + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + offset (float): offset in motor units. Default is 1. + atol (float): absolute tolerance for position check. Default is 0.5. + + Returns: + ScanReport + + Examples: + >>> scans.cont_line_scan(dev.motor1, -5, 5, steps=10, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs + ) + self.steps = steps + self.device = device + self.offset = offset + self.start = start + self.stop = stop + self.atol = atol + self.motor_velocity = self.device_manager.devices[self.device].read()[ + f"{self.device}_velocity" + ]["value"] + + def _calculate_positions(self) -> None: + self.positions = np.linspace(self.start, self.stop, self.steps, dtype=float)[ + np.newaxis, : + ].T + # Check if the motor is moving faster than the exp_time + dist_setp = self.positions[1][0] - self.positions[0][0] + time_per_step = dist_setp / self.motor_velocity + if time_per_step < self.exp_time: + raise ScanAbortion( + f"Motor {self.device} is moving too fast. Time per step: {time_per_step:.03f} < Exp_time: {self.exp_time:.03f}." 
+ + f" Consider reducing speed {self.motor_velocity} or reducing exp_time {self.exp_time}" + ) + + def _check_limits(self): + logger.debug("check limits") + low_limit, high_limit = self.device_manager.devices[self.device].limits + if low_limit >= high_limit: + # if both limits are equal or low > high, no restrictions ought to be applied + return + for ii, pos in enumerate(self.positions): + if ii == 0: + pos_axis = pos - self.offset + else: + pos_axis = pos + if not low_limit <= pos_axis <= high_limit: + raise LimitError( + f"Target position {pos} for motor {self.device} is outside of range: [{low_limit}," + f" {high_limit}]" + ) + + def _at_each_point(self, _pos=None): + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.read(group="primary", wait_group="primary", point_id=self.point_id) + self.point_id += 1 + + +[docs] + def scan_core(self): + yield from self._move_scan_motors_and_wait(self.positions[0] - self.offset) + # send the slow motor on its way + yield from self.stubs.set( + device=self.scan_motors[0], value=self.positions[-1][0], wait_group="scan_motor" + ) + + while self.point_id < len(self.positions[:]): + cont_motor_positions = self.device_manager.devices[self.scan_motors[0]].readback.read() + + if not cont_motor_positions: + continue + + cont_motor_positions = cont_motor_positions[self.scan_motors[0]].get("value") + logger.debug(f"Current position of {self.scan_motors[0]}: {cont_motor_positions}") + # TODO: consider the alternative, which triggers a readout for each point right after the motor passed it + # if cont_motor_positions > self.positions[self.point_id][0]: + if np.isclose(cont_motor_positions, self.positions[self.point_id][0], atol=self.atol): + logger.debug(f"reading point {self.point_id}") + yield from self._at_each_point() + continue + if cont_motor_positions > self.positions[self.point_id][0]: + raise ScanAbortion( + f"Skipped point {self.point_id + 1}:" + f"Consider reducing speed 
{self.device_manager.devices[self.scan_motors[0]].velocity.get()}, " + f"increasing the atol {self.atol}, or increasing the offset {self.offset}" + ) + + + + + +[docs] +class ContLineFlyScan(AsyncFlyScanBase): + scan_name = "cont_line_fly_scan" + required_kwargs = [] + use_scan_progress_report = False + gui_config = {"Device": ["motor", "start", "stop"], "Scan Parameters": ["exp_time", "relative"]} + + def __init__( + self, + motor: DeviceBase, + start: float, + stop: float, + exp_time: float = 0, + relative: bool = False, + **kwargs, + ): + """ + A continuous line fly scan. Use this scan if you want to move a motor continuously from start to stop position whilst + acquiring data as fast as possible (respecting the exposure time). The scan will stop automatically when the motor + reaches the end position. + + Args: + motor (DeviceBase): motor to move continuously from start to stop position + start (float): start position + stop (float): stop position + exp_time (float): exposure time in seconds. Default is 0. + relative (bool): if True, the motor will be moved relative to its current position. Default is False. 
+ + Returns: + ScanReport + + Examples: + >>> scans.cont_line_fly_scan(dev.sam_rot, 0, 180, exp_time=0.1) + + """ + super().__init__(relative=relative, exp_time=exp_time, **kwargs) + self.motor = motor + self.start = start + self.stop = stop + self.device_move_request_id = str(uuid.uuid4()) + + +[docs] + def prepare_positions(self): + self.positions = np.array([[self.start], [self.stop]], dtype=float) + self.num_pos = None + yield from self._set_position_offset() + + + def scan_report_instructions(self): + yield from self.stubs.scan_report_instruction( + { + "readback": { + "RID": self.device_move_request_id, + "devices": [self.motor], + "start": [self.start], + "end": [self.stop], + } + } + ) + + +[docs] + def scan_core(self): + # move the motor to the start position + yield from self.stubs.set_and_wait(device=[self.motor], positions=self.positions[0]) + + # start the flyer + flyer_request = yield from self.stubs.set_with_response( + device=self.motor, value=self.positions[1][0], request_id=self.device_move_request_id + ) + + while True: + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.read_and_wait( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + yield from self.stubs.wait( + wait_type="trigger", group="trigger", wait_time=self.exp_time + ) + if self.stubs.request_is_completed(flyer_request): + break + self.point_id += 1 + + + +[docs] + def finalize(self): + yield from super().finalize() + self.num_pos = self.point_id + 1 + + + + + +[docs] +class RoundScanFlySim(SyncFlyScanBase): + scan_name = "round_scan_fly" + scan_type = "fly" + pre_move = False + required_kwargs = ["relative"] + gui_config = { + "Fly Parameters": ["flyer", "relative"], + "Ring Parameters": ["inner_ring", "outer_ring", "number_of_rings", "number_pos"], + } + + def __init__( + self, + flyer: DeviceBase, + inner_ring: float, + outer_ring: float, + number_of_rings: int, + number_pos: int, + relative: bool = False, + 
**kwargs, + ): + """ + A fly scan following a round shell-like pattern. + + Args: + flyer (DeviceBase): flyer device + inner_ring (float): inner radius + outer_ring (float): outer radius + number_of_rings (int): number of rings + number_pos (int): number of positions in the first ring + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + + Returns: + ScanReport + + Examples: + >>> scans.round_scan_fly(dev.flyer_sim, 0, 50, 5, 3, exp_time=0.1, relative=True) + + """ + super().__init__(**kwargs) + self.flyer = flyer + self.inner_ring = inner_ring + self.outer_ring = outer_ring + self.number_of_rings = number_of_rings + self.number_pos = number_pos + + def _get_scan_motors(self): + self.scan_motors = [] + + @property + def monitor_sync(self): + return self.flyer + + +[docs] + def prepare_positions(self): + self._calculate_positions() + self.num_pos = len(self.positions) * self.burst_at_each_point + self._check_limits() + yield None + + + +[docs] + def finalize(self): + yield + + + def _calculate_positions(self): + self.positions = get_round_scan_positions( + r_in=self.inner_ring, + r_out=self.outer_ring, + nr=self.number_of_rings, + nth=self.number_pos, + ) + + +[docs] + def scan_core(self): + yield from self.stubs.kickoff( + device=self.flyer, + parameter={ + "num_pos": self.num_pos, + "positions": self.positions.tolist(), + "exp_time": self.exp_time, + }, + ) + target_DIID = self.DIID - 1 + + while True: + yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary") + status = self.device_manager.connector.get(MessageEndpoints.device_status(self.flyer)) + if status: + device_is_idle = status.content.get("status", 1) == 0 + matching_RID = self.metadata.get("RID") == status.metadata.get("RID") + matching_DIID = target_DIID == status.metadata.get("DIID") + if device_is_idle and matching_RID and matching_DIID: + break 
+ + time.sleep(1) + logger.debug("reading monitors") + + + + + +[docs] +class RoundROIScan(ScanBase): + scan_name = "round_roi_scan" + required_kwargs = ["dr", "nth", "relative"] + gui_config = { + "Motor 1": ["motor_1", "width_1"], + "Motor 2": ["motor_2", "width_2"], + "Shell Parametes": ["dr", "nth"], + "Acquisition Parameters": ["exp_time", "relative", "burst_at_each_point"], + } + + def __init__( + self, + motor_1: DeviceBase, + width_1: float, + motor_2: DeviceBase, + width_2: float, + dr: float = 1, + nth: int = 5, + exp_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A scan following a round-roi-like pattern. + + Args: + motor_1 (DeviceBase): first motor + width_1 (float): width of region of interest for motor_1 + motor_2 (DeviceBase): second motor + width_2 (float): width of region of interest for motor_2 + dr (float): shell width. Default is 1. + nth (int): number of points in the first shell. Default is 5. + exp_time (float): exposure time in seconds. Default is 0. + relative (bool): Start from an absolute or relative position. Default is False. + burst_at_each_point (int): number of acquisition per point. Default is 1. 
+ + Returns: + ScanReport + + Examples: + >>> scans.round_roi_scan(dev.motor1, 20, dev.motor2, 20, dr=2, nth=3, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs + ) + self.motor_1 = motor_1 + self.motor_2 = motor_2 + self.width_1 = width_1 + self.width_2 = width_2 + self.dr = dr + self.nth = nth + + def _calculate_positions(self) -> None: + self.positions = get_round_roi_scan_positions( + lx=self.width_1, ly=self.width_2, dr=self.dr, nth=self.nth + ) + + + + +[docs] +class ListScan(ScanBase): + scan_name = "list_scan" + required_kwargs = ["relative"] + arg_input = {"device": ScanArgType.DEVICE, "positions": ScanArgType.LIST} + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """ + A scan following the positions specified in a list. + Please note that all lists must be of equal length. + + Args: + *args: pairs of motors and position lists + relative: Start from an absolute or relative position + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.list_scan(dev.motor1, [0,1,2,3,4], dev.motor2, [4,3,2,1,0], exp_time=0.1, relative=True) + + """ + super().__init__(parameter=parameter, **kwargs) + if len(set(len(entry[0]) for entry in self.caller_args.values())) != 1: + raise ValueError("All position lists must be of equal length.") + + def _calculate_positions(self): + self.positions = np.vstack(tuple(self.caller_args.values()), dtype=float).T.tolist() + + + + +[docs] +class TimeScan(ScanBase): + scan_name = "time_scan" + required_kwargs = ["points", "interval"] + gui_config = {"Scan Parameters": ["points", "interval", "exp_time", "burst_at_each_point"]} + + def __init__( + self, + points: int, + interval: float, + exp_time: float = 0, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + Trigger and readout devices at a fixed interval. 
+ Note that the interval time cannot be less than the exposure time. + The effective "sleep" time between points is + sleep_time = interval - exp_time + + Args: + points: number of points + interval: time interval between points + exp_time: exposure time in s + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.time_scan(points=10, interval=1.5, exp_time=0.1, relative=True) + + """ + super().__init__(exp_time=exp_time, burst_at_each_point=burst_at_each_point, **kwargs) + self.points = points + self.interval = interval + self.interval -= self.exp_time + + def _calculate_positions(self) -> None: + pass + + +[docs] + def prepare_positions(self): + self.num_pos = self.points + yield None + + + def _at_each_point(self, ind=None, pos=None): + if ind > 0: + yield from self.stubs.wait( + wait_type="read", group="primary", wait_group="readout_primary" + ) + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) + yield from self.stubs.read( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.interval) + self.point_id += 1 + + +[docs] + def scan_core(self): + for ind in range(self.num_pos): + yield from self._at_each_point(ind) + + + + + +[docs] +class MonitorScan(ScanBase): + scan_name = "monitor_scan" + required_kwargs = ["relative"] + scan_type = "fly" + gui_config = {"Device": ["device", "start", "stop"], "Scan Parameters": ["relative"]} + + def __init__( + self, device: DeviceBase, start: float, stop: float, relative: bool = False, **kwargs + ): + """ + Readout all primary devices at each update of the monitored device. 
+ + Args: + device (Device): monitored device + start (float): start position of the monitored device + stop (float): stop position of the monitored device + relative (bool): if True, the motor will be moved relative to its current position. Default is False. + + Returns: + ScanReport + + Examples: + >>> scans.monitor_scan(dev.motor1, -5, 5, exp_time=0.1, relative=True) + + """ + self.device = device + super().__init__(relative=relative, **kwargs) + self.start = start + self.stop = stop + + def _get_scan_motors(self): + self.scan_motors = [self.device] + self.flyer = self.device + + @property + def monitor_sync(self): + return self.flyer + + def _calculate_positions(self) -> None: + self.positions = np.array([[self.start], [self.stop]], dtype=float) + + +[docs] + def prepare_positions(self): + self._calculate_positions() + self.num_pos = 0 + yield from self._set_position_offset() + self._check_limits() + + + def _get_flyer_status(self) -> list: + connector = self.device_manager.connector + + pipe = connector.pipeline() + connector.lrange( + MessageEndpoints.device_req_status_container(self.metadata["RID"]), 0, -1, pipe + ) + connector.get(MessageEndpoints.device_readback(self.flyer), pipe) + return connector.execute_pipeline(pipe) + + +[docs] + def scan_core(self): + yield from self.stubs.set( + device=self.flyer, value=self.positions[0][0], wait_group="scan_motor" + ) + yield from self.stubs.wait(wait_type="move", device=self.flyer, wait_group="scan_motor") + + # send the slow motor on its way + yield from self.stubs.set( + device=self.flyer, + value=self.positions[1][0], + wait_group="scan_motor", + metadata={"response": True}, + ) + + while True: + move_completed, readback = self._get_flyer_status() + + if move_completed: + break + + if not readback: + continue + readback = readback.content["signals"] + yield from self.stubs.publish_data_as_read( + device=self.flyer, data=readback, point_id=self.point_id + ) + self.point_id += 1 + self.num_pos += 1 + + + + + 
+[docs] +class Acquire(ScanBase): + scan_name = "acquire" + required_kwargs = [] + gui_config = {"Scan Parameters": ["exp_time", "burst_at_each_point"]} + + def __init__(self, *args, exp_time: float = 0, burst_at_each_point: int = 1, **kwargs): + """ + A simple acquisition at the current position. + + Args: + exp_time (float): exposure time in s + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.acquire(exp_time=0.1, relative=True) + + """ + super().__init__(exp_time=exp_time, burst_at_each_point=burst_at_each_point, **kwargs) + + def _calculate_positions(self) -> None: + self.num_pos = self.burst_at_each_point + + +[docs] + def prepare_positions(self): + self._calculate_positions() + + + def _at_each_point(self, ind=None, pos=None): + if ind > 0: + yield from self.stubs.wait( + wait_type="read", group="primary", wait_group="readout_primary" + ) + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) + yield from self.stubs.read( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + self.point_id += 1 + + +[docs] + def scan_core(self): + for self.burst_index in range(self.burst_at_each_point): + yield from self._at_each_point(self.burst_index) + self.burst_index = 0 + + + +[docs] + def run(self): + self.initialize() + self.prepare_positions() + yield from self.open_scan() + yield from self.stage() + yield from self.run_baseline_reading() + yield from self.pre_scan() + yield from self.scan_core() + yield from self.finalize() + yield from self.unstage() + yield from self.cleanup() + + + + + +[docs] +class LineScan(ScanBase): + scan_name = "line_scan" + required_kwargs = ["steps", "relative"] + arg_input = { + "device": ScanArgType.DEVICE, + "start": ScanArgType.FLOAT, + "stop": ScanArgType.FLOAT, + } + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + gui_config = { + 
"Movement Parameters": ["steps", "relative"], + "Acquisition Parameters": ["exp_time", "burst_at_each_point"], + } + + def __init__( + self, + *args, + exp_time: float = 0, + steps: int = None, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A line scan for one or more motors. + + Args: + *args (Device, float, float): pairs of device / start position / end position + exp_time (float): exposure time in s. Default: 0 + steps (int): number of steps. Default: 10 + relative (bool): if True, the start and end positions are relative to the current position. Default: False + burst_at_each_point (int): number of acquisition per point. Default: 1 + + Returns: + ScanReport + + Examples: + >>> scans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs + ) + self.steps = steps + + def _calculate_positions(self) -> None: + axis = [] + for _, val in self.caller_args.items(): + ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float) + axis.append(ax_pos) + self.positions = np.array(list(zip(*axis)), dtype=float) + + + + +[docs] +class ScanComponent(ScanBase): + pass + + + + +[docs] +class OpenInteractiveScan(ScanComponent): + scan_name = "open_interactive_scan" + required_kwargs = [] + arg_input = {"device": ScanArgType.DEVICE} + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + + def __init__(self, *args, **kwargs): + """ + An interactive scan for one or more motors. 
+ + Args: + *args: devices + exp_time: exposure time in s + steps: number of steps (please note: 5 steps == 6 positions) + relative: Start from an absolute or relative position + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.open_interactive_scan(dev.motor1, dev.motor2, exp_time=0.1) + + """ + super().__init__(**kwargs) + + def _calculate_positions(self): + pass + + def _get_scan_motors(self): + caller_args = list(self.caller_args.keys()) + self.scan_motors = caller_args + + +[docs] + def run(self): + yield from self.stubs.open_scan_def() + self.initialize() + yield from self.read_scan_motors() + yield from self.open_scan() + yield from self.stage() + yield from self.run_baseline_reading() + + + + + +[docs] +class AddInteractiveScanPoint(ScanComponent): + scan_name = "interactive_scan_trigger" + arg_input = {"device": ScanArgType.DEVICE} + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + + def __init__(self, *args, **kwargs): + """ + An interactive scan for one or more motors. 
+ + Args: + *args: devices + exp_time: exposure time in s + steps: number of steps (please note: 5 steps == 6 positions) + relative: Start from an absolute or relative position + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.interactive_scan_trigger() + + """ + super().__init__(**kwargs) + + def _calculate_positions(self): + pass + + def _get_scan_motors(self): + self.scan_motors = list(self.caller_args.keys()) + + def _at_each_point(self, ind=None, pos=None): + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) + yield from self.stubs.read_and_wait( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + self.point_id += 1 + + +[docs] + def run(self): + yield from self.open_scan() + yield from self._at_each_point() + yield from self.close_scan() + + + + + +[docs] +class CloseInteractiveScan(ScanComponent): + scan_name = "close_interactive_scan" + + def __init__(self, *args, **kwargs): + """ + An interactive scan for one or more motors. + + Args: + *args: devices + Parameters +Next, we need to define the scan parameters. 
In exp_time: exposure time in s + steps: number of steps (please note: 5 steps == 6 positions) + relative: Start from an absolute or relative position + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.close_interactive_scan(dev.motor1, dev.motor2, exp_time=0.1) + + """ + super().__init__(**kwargs) + + def _calculate_positions(self): + pass + + +[docs] + def run(self): + yield from self.finalize() + yield from self.unstage() + yield from self.cleanup() + yield from self.stubs.close_scan_def() + diff --git a/phoenix_bec/local_scripts/Examples/Learn_about_ophyd/DefiningEpics_Channels_details.py b/phoenix_bec/local_scripts/Examples/Learn_about_ophyd/DefiningEpics_Channels_details.py new file mode 100644 index 0000000..aa40aaa --- /dev/null +++ b/phoenix_bec/local_scripts/Examples/Learn_about_ophyd/DefiningEpics_Channels_details.py @@ -0,0 +1,120 @@ +from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO +from ophyd import Component as Cpt + + +from phoenix_bec.local_scripts.Examples.my_ophyd import Device,EpicsMotor, EpicsSignal, EpicsSignalRO +from phoenix_bec.local_scripts.Examples.my_ophyd import Component as Cpt + +############################################ +# +# KEEP my_ophyd zipped to avoid chaos local version does not run +# so this is rather useless +# +########################################## + + + +#option I via direct acces to classes + +def print_dic(clname,cl): + print('') + print('-------- ',clname) + for ii in cl.__dict__: + if '_' not in ii: + + try: + print(ii,' ---- ',cl.__getattribute__(ii)) + except: + print(ii) + + + + +ScanX = EpicsMotor(name='ScanX',prefix='X07MB-ES-MA1:ScanX') +ScanY = EpicsMotor(name='ScanY',prefix='X07MB-ES-MA1:ScanY') +DIODE = EpicsSignal(name='SI',read_pv='X07MB-OP2-SAI_07:MEAN') +SMPL = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:SMPL') +CYCLES = EpicsSignal(name='SMPL',read_pv='X07MB-OP2:TOTAL-CYCLES',write_pv='X07MB-OP2:TOTAL-CYCLES') + + +#prefix='XXXX:' +y_cpt = 
Cpt(EpicsMotor, 'ScanX') +# Option 2 using component + +device_ins=Device('X07MB-ES-MA1:',name=('device_name')) +print(' initialzation of device_in=Device(X07MB-ES-MA1:,name=(device_name)') +print('device_ins.__init__') +print(device_ins.__init__) +print_dic('class Device',Device) +print_dic('instance of device device_ins',device_ins) + + +print(' ') +print('DEFINE class StageXY... prefix variable not defined ') + +EpicsMotor, 'ScanY' +""" +class MyCpt(typing.Generic[K]): + + def __init__( + self, + cls: Type[K], + suffix: Optional[str] = None, + *, + lazy: Optional[bool] = None, + trigger_value: Optional[Any] = None, + add_prefix: Optional[Sequence[str]] = None, + doc: Optional[str] = None, + kind: Union[str, Kind] = Kind.normal, + **kwargs, + ): + self.attr = None # attr is set later by the device when known + self.cls = cls + self.kwargs = kwargs + #self.lazy = lazy if lazy is not None else self.lazy_default + self.suffix = suffix + self.doc = doc + self.trigger_value = trigger_value # TODO discuss + self.kind = Kind[kind.lower()] if isinstance(kind, str) else Kind(kind) + if add_prefix is None: + add_prefix = ("suffix", "write_pv") + self.add_prefix = tuple(add_prefix) + self._subscriptions = collections.defaultdict(list) + print(' ') +""" + + +in Device we have this class method +Class device(..): +.... + @classmethod + def _initialize_device(cls): +.... 
+ + +class StageXY(Device): + # Here the whole namespace and functionality + # of Device(BlueskyInterface,OphydObject) is inherited + # into class StageXY + + x = Cpt(EpicsMotor, 'ScanX') + y = Cpt(EpicsMotor, 'ScanY') + +# end class + + + + +print() +print('init xy_stage, use input parameter from Device and prefix is defined in call ') +xy = StageXY('X07MB-ES-MA1:', name='xy_name') + +print_dic('class StageXY',StageXY) +print_dic('instance of StageXY',xy) + + + + +#print('xy.x.prefix') +#print(xy.x.prefix) +xy.__dict__ diff --git a/phoenix_bec/local_scripts/README.md b/phoenix_bec/local_scripts/README.md new file mode 100644 index 0000000..3f9b602 --- /dev/null +++ b/phoenix_bec/local_scripts/README.md @@ -0,0 +1,9 @@ +# purpose of directory + +This directory is for scripts, tests etc. which are not loaded into the bec-server. + +For now we keep it in the phoenix_bec structure, but for operation, such files should be located outside of the +bec_phoenix plugin. + + +To avoid loading of these files, there should be no files called __init__.py anywhere in the directory tree