diff --git a/phoenix_bec/bec_ipython_client/startup/post_startup.py b/phoenix_bec/bec_ipython_client/startup/post_startup.py index 385e9e2..6ad6153 100644 --- a/phoenix_bec/bec_ipython_client/startup/post_startup.py +++ b/phoenix_bec/bec_ipython_client/startup/post_startup.py @@ -36,6 +36,7 @@ to setup the prompts. # pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module import time as tt import sys +import os from IPython.core.magic import register_line_magic @@ -132,6 +133,10 @@ def ph_load_config(line): print('elapsed time:', tt.time()-t0) #enddef +@register_line_magic +def ph_restart_bec_server(line): + os.system('bec-server restart') + os.system('gnome-terminal --geometry 120X50 -- bash -c "bec-server attach; exec bash"') ##@register_line_magic #def ph_post_startup(line): diff --git a/phoenix_bec/device_configs/phoenix_devices.yaml b/phoenix_bec/device_configs/phoenix_devices.yaml index 5218499..c9c57fc 100644 --- a/phoenix_bec/device_configs/phoenix_devices.yaml +++ b/phoenix_bec/device_configs/phoenix_devices.yaml @@ -3,23 +3,15 @@ # phoenix standard devices (motors) # # -##################################################### - - -#################### -# -# TRIGGER/Delay -# -################### - -phoenix_trigger: - description: Trigger +####################################################: +TTL: + description: PHOENIX TTL trigger deviceClass: phoenix_bec.devices.phoenix_trigger.PhoenixTrigger deviceConfig: prefix: 'X07MB-OP2:' deviceTags: - phoenix - - trigger + - TTL Trigger - phoenix_devices.yaml onFailure: buffer enabled: true diff --git a/phoenix_bec/devices/__init__.py b/phoenix_bec/devices/__init__.py index 9ba919d..7322b2b 100644 --- a/phoenix_bec/devices/__init__.py +++ b/phoenix_bec/devices/__init__.py @@ -1 +1 @@ -from PhoenixTrigger import PhoenixTrigger \ No newline at end of file +from .phoenix_trigger import PhoenixTrigger \ No newline at end of file diff --git a/phoenix_bec/devices/falcon_phoenix_no_hdf5.py b/phoenix_bec/devices/falcon_phoenix_no_hdf5.py index 371abcd..443e4e5 100644 --- a/phoenix_bec/devices/falcon_phoenix_no_hdf5.py +++ b/phoenix_bec/devices/falcon_phoenix_no_hdf5.py @@ -256,7 +256,7 @@ class FalconSetup(CustomDetectorMixin): #): # # Retry stop detector and wait for remaining time # raise FalconTimeoutError( - # f"Failed to stop detector, timeout with state {signal_conditions[0][0]}" + # f"Failed to stop detector, timeou t with state {signal_conditions[0][0]}" # ) def stop_detector_backend(self) -> None: diff --git a/phoenix_bec/devices/phoenix_trigger.py b/phoenix_bec/devices/phoenix_trigger.py index 0b83026..8d1ea1d 100644 --- a/phoenix_bec/devices/phoenix_trigger.py +++ b/phoenix_bec/devices/phoenix_trigger.py @@ -1,3 +1,5 @@ +import time + from ophyd import ( ADComponent as ADCpt, Device, @@ -18,10 +20,6 @@ DETECTOR_TIMEOUT = 5 #class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off - - - - class PhoenixTriggerSetup(CustomDetectorMixin): """ This defines the PHOENIX trigger setup. 
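For orientation, once the TTL entry from phoenix_devices.yaml above is loaded into a running BEC session (for instance via the existing %ph_load_config magic), the helper methods added in the hunks below would be reachable from the client roughly as follows. This is an illustrative sketch only; the device name TTL is taken from the yaml entry above and the a_* names from phoenix_trigger.py below:

    dev.TTL.a_cont_sample_off()   # stop continuous sampling (START-CSMPL <- 0)
    dev.TTL.a_acquire()           # request a single sample (SMPL <- 1)
    dev.TTL.a_done()              # poll SMPL-DONE
    dev.TTL.a_cont_sample_on()    # resume continuous sampling (START-CSMPL <- 1)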
@@ -29,11 +27,38 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
     """
 
+    #self.acquire = self.parent.smpl.put(1)
+    #self.continuous_sampling_on = parent.start_cmpl.put(1)
+    #self.continuous_sampling_off = self.parent.start_cmpl.put(0)
+
     def __init__(self, *args, parent:Device = None, **kwargs):
         super().__init__(*args, parent=parent, **kwargs)
         self._counter = 0
-        WW
+
+    def on_acquire(self):
+        self.parent.smpl.put(1)
+        print('on_acquire')
+
+    def on_cont_sample_on(self):
+        self.parent.start_csmpl.put(1)
+        print('on_cont_sample_on')
+
+    def on_cont_sample_off(self):
+        self.parent.start_csmpl.put(0)
+        print('on_cont_sample_off')
+
+    def on_done(self):
+        done = self.parent.smpl_done.get()
+        return done
+
+    def on_dwell(self, t):
+        """Calculate cycles from time in sec and write them to TOTAL-CYCLES
+        (the factor of 5 cycles per second is an assumption kept from the draft)."""
+        cycles = int(t * 5)
+        self.parent.total_cycles.put(cycles)
+
     def on_stage(self):
         # is this called on each point in scan or just before scan ???
         print('on stage')
@@ -43,23 +68,26 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
         time.sleep(0.05)
         cycles=self.parent.total_cycles.put(0)
         time.sleep(0.05)
-        cycles=self.parent.smpl.put(2)
+        cycles=self.parent.smpl.put(1)
         time.sleep(0.5)
         cycles=self.parent.total_cycles.put(cycles)
 
-        logger.success('PhoenixTrigger on stage')
-
-    def on_trigger(self):
-        print('on_trigger')
-        self.parent.start_smpl.put(1)
-        logger.success('PhoenixTrigger on_trigger')
-
-        return self.wait_with_status(
-            [(self.parent.smpl_done.get, 1)])
-
+
+    #def on_trigger(self):
+    #    print('on_trigger')
+    #    self.parent.start_smpl.put(1)
+    #    logger.success('PhoenixTrigger on_trigger')
+    #
+    #    return self.wait_with_status(
+    #        [(self.parent.smpl_done.get, 1)])
+
+
+
 
 #        logger.success(' PhoenixTrigger on_trigger complete ')
 
 #        if success:
@@ -70,67 +98,57 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
 
-    def on_complete(self):
-        print('on_complete')
-        timeout =10
+    #def on_complete(self):
+    #    print('on_complete')
+    #    timeout =10
 
-        logger.success('XXXX complete %d XXXX' % success)
+    #    logger.success('XXXX complete %d XXXX' % success)
 
-        success = self.wait_for_signals(
-            [
-                (self.parent.smpl_done.get, 0))
-            ],
-            timeout,
-            check_stopped=True,
-            all_signals=True
-        )
+    #    success = self.wait_for_signals(
+    #        [
+    #            (self.parent.smpl_done.get, 0)
+    #        ],
+    #        timeout,
+    #        check_stopped=True,
+    #        all_signals=True
+    #    )
 
-        if success:
-            status.set_finished()
-        else:
-            status.set_exception(TimeoutError())
-        return status
+    #    if success:
+    #        status.set_finished()
+    #    else:
+    #        status.set_exception(TimeoutError())
+    #    return status
 
-
-    def on_stop(self):
-        logger.success(' PhoenixTrigger on_stop ')
-
-        self.parent.csmpl.put(1)
-        logger.success(' PhoenixTrigger on_stop finished ')
-
-    def on_unstage(self):
-        logger.success(' PhoenixTrigger on_unstage ')
-        self.parent.csmpl.put(1)
-        self.parent.smpl.put(1)
-        logger.success(' PhoenixTrigger on_unstage finished ')
-
+    # logger.success(' PhoenixTrigger on_unstage ')
+    # self.parent.csmpl.put(1)
+    # self.parent.smpl.put(1)
+    # logger.success(' PhoenixTrigger on_unstage finished ')
+
+    #def on_trigger():
+    #    print('on_trigger')
 
 class PhoenixTrigger(PSIDetectorBase):
 
     """
+    Class for the PHOENIX TTL hardware trigger.
+
     Parent class: PSIDetectorBase
 
     class attributes:
-        custom_prepare_cls (XMAPSetup)           : Custom detector setup class for cSAXS,
+        custom_prepare_cls (PhoenixTriggerSetup) : Custom setup for the TTL trigger at PHOENIX,
                                                    inherits from CustomDetectorMixin
                                                    in __init__ of PSIDetecor bases
                                                    class is initialized
                                                    self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
-        PSIDetectorBase.set_min_readout (float)  : Minimum readout time for the detector
-        dxp (EpicsDXPXMAP)                       : DXP parameters for XMAP detector
-        mca (EpicsMCARecord)                     : MCA parameters for XMAP detector
-        hdf5 (XMAPHDF5Plugins)                   : HDF5 parameters for XMAP detector
-        MIN_READOUT (float)                      : Minimum readout time for the detector
-
-
     The class PhoenixTrigger is the class to be called via yaml configuration file
     the input arguments are defined by PSIDetectorBase,
     and need to be given in the yaml configuration file.
@@ -138,6 +156,8 @@ class PhoenixTrigger(PSIDetectorBase):
     use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file.
+
+
     PSIDetectorBase(
         prefix='',
         *,Q
@@ -147,12 +167,6 @@ class PhoenixTrigger(PSIDetectorBase):
         device_manager=None,
         **kwargs,
     )
-    Docstring:
-    Abstract base class for SLS detectors
-
-    Class attributes:
-        custom_prepare_cls (object): class for custom prepare logic (BL specific)
-
     Args:
         prefix (str): EPICS PV prefix for component (optional)
         name (str): name of the device, as will be reported via read()
@@ -167,13 +181,66 @@ class PhoenixTrigger(PSIDetectorBase):
     File:           /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py
     Type:           type
     Subclasses:     EpicsSignal
+    """
+
+    ##################################################################
+    # Specify which functions are revealed to the user in the BEC client:
+    # only this set of predefined functions will be visible in dev.TTL.
+    # The variable USER_ACCESS contains a list of function names (strings) which
+    # will be visible in dev.TTL as well.
+    # Alternatively one could also create a 2nd instance of PhoenixTrigger,
+    # which is probably not ideal.
+
+    USER_ACCESS = ["a_acquire", "a_cont_sample_on", "a_cont_sample_off", "prefix", "a_done"]
+
+    #####################################################################
+    # Specify the setup class in the variable custom_prepare_cls.
+    # In __init__ of PSIDetectorBase it will be initialized by
+    # self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs),
+    # making the instance of PSIDetectorBase available in these functions.
+
     custom_prepare_cls = PhoenixTriggerSetup
-
-    start_csmpl = Cpt(EpicsSignal,'START-CSMPL') # cont on / off
+    #############################################################
+    # Now use Component to provide channel access.
+    # When PhoenixTrigger is initialized, the parameters of the base class are
+    # inherited, most notably prefix, which is here X07MB-OP2:
+    # The input of Component = Cpt is Cpt(deviceClass, suffix).
+    # If Cpt is used in a class which has inherited Device, here via
+    # PhoenixTrigger <-- PSIDetectorBase <-- Device,
+    # the Cpt will construct - magically - the EPICS channel name
+    # EpicsPV = prefix + suffix,
+    # for example
+    # 'X07MB-OP2:' + 'START-CSMPL' -> 'X07MB-OP2:START-CSMPL'
+    #
+    start_csmpl = Cpt(EpicsSignal, 'START-CSMPL')    # cont on / off
     intr_count = Cpt(EpicsSignal,'INTR-COUNT')       # conter run up
     total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES')   # cycles set
+    smpl = Cpt(EpicsSignal,'SMPL')                   # start sampling --> acquire
     smpl_done = Cpt(EpicsSignal,'SMPL-DONE')         # show trigger is done
+
+    # link to reasonable names
+    # start with a_ to see the functions quickly in a listing
+    #
+    #
+    def a_acquire(self):
+        self.custom_prepare.on_acquire()
+
+    def a_cont_sample_on(self):
+        self.custom_prepare.on_cont_sample_on()
+
+    def a_cont_sample_off(self):
+        self.custom_prepare.on_cont_sample_off()
+
+    def a_done(self):
+        done = self.custom_prepare.on_done()
+        return done
+
+    def a_dwell(self, t):
+        self.custom_prepare.on_dwell(t)
\ No newline at end
of file diff --git a/phoenix_bec/devices/phoenix_trigger.py~ b/phoenix_bec/devices/phoenix_trigger.py~ new file mode 100644 index 0000000..d457eb6 --- /dev/null +++ b/phoenix_bec/devices/phoenix_trigger.py~ @@ -0,0 +1,182 @@ +from ophyd import ( + ADComponent as ADCpt, + Device, + DeviceStatus, +) + +from ophyd import Component as Cpt +from ophyd import Device, EpicsSignal, EpicsSignalRO + +from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin + +from bec_lib import bec_logger, messages +from bec_lib.endpoints import MessageEndpoints + +import time + +logger = bec_logger.logger + +DETECTOR_TIMEOUT = 5 + +#class PhoenixTriggerError(Exce start_csmpl=Cpt(EPicsSignal,'START-CSMPL') # cont on / off + + + + + +class PhoenixTriggerSetup(CustomDetectorMixin): + """ + This defines the PHOENIX trigger setup. + + + """ + + def __init__(self, *args, parent:Device = None, **kwargs): + super().__init__(*args, parent=parent, **kwargs) + self._counter = 0 + + WW + def on_stage(self): + # is this called on each point in scan or just before scan ??? + print('on stage') + self.parent.start_csmpl.put(0) + time.sleep(0.05) + cycles=self.parent.total_cycles.get() + time.sleep(0.05) + cycles=self.parent.total_cycles.put(0) + time.sleep(0.05) + cycles=self.parent.smpl.put(2) + time.sleep(0.5) + cycles=self.parent.total_cycles.put(cycles) + + logger.success('PhoenixTrigger on stage') + + def on_trigger(self): + + self.parent.start_smpl.put(1) + time.sleep(0.05) # use blocking + logger.success('PhoenixTrigger on_trigger') + + return self.wait_with_status( + [(self.parent.smpl_done.get, 1)]) + + + + +# logger.success(' PhoenixTrigger on_trigger complete ') + +# if success: +# status.set_finished() +# else: +# status.set_exception(TimeoutError()) +# return status + + + + def on_complete(self): + + timeout =10 + + + logger.success('XXXX complete %d XXXX' % success) + + success = self.wait_for_signals( + [ + (self.parent.smpl_done.get, 0)) + ], + timeout, + check_stopped=True, + all_signals=True + ) + + + + if success: + status.set_finished() + else: + status.set_exception(TimeoutError()) + return status + + + + + def on_stop(self): + logger.success(' PhoenixTrigger on_stop ') + + self.parent.csmpl.put(1) + logger.success(' PhoenixTrigger on_stop finished ') + + def on_unstage(self): + logger.success(' PhoenixTrigger on_unstage ') + self.parent.csmpl.put(1) + self.parent.smpl.put(1) + logger.success(' PhoenixTrigger on_unstage finished ') + + + + + +class PhoenixTrigger(PSIDetectorBase): + + """ + Parent class: PSIDetectorBase + + class attributes: + custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS, + inherits from CustomDetectorMixin + in __init__ of PSIDetecor bases + class is initialized + self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) + PSIDetectorBase.set_min_readout (float) : Minimum readout time for the detector + dxp (EpicsDXPXMAP) : DXP parameters for XMAP detector + mca (EpicsMCARecord) : MCA parameters for XMAP detector + hdf5 (XMAPHDF5Plugins) : HDF5 parameters for XMAP detector + MIN_READOUT (float) : Minimum readout time for the detector + + + The class PhoenixTrigger is the class to be called via yaml configuration file + the input arguments are defined by PSIDetectorBase, + and need to be given in the yaml configuration file. + To adress chanels such as 'X07MB-OP2:SMPL-DONE': + + use prefix 'X07MB-OP2:' in the device definition in the yaml configuration file. 
+ + PSIDetectorBase( + prefix='', + *,Q + name, + kind=None, + parent=None, + device_manager=None, + **kwargs, + ) + Docstring: + Abstract base class for SLS detectors + + Class attributes: + custom_prepare_cls (object): class for custom prepare logic (BL specific) + + Args: + prefix (str): EPICS PV prefix for component (optional) + name (str): name of the device, as will be reported via read() + kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal + omitted -> readout ignored for read 'ophydobj.read()' + normal -> readout for read + config -> config parameter for 'ophydobj.read_configuration()' + hinted -> which attribute is readout for read + parent (object): instance of the parent device + device_manager (object): bec device manager + **kwargs: keyword arguments + File: /data/test/x07mb-test-bec/bec_deployment/ophyd_devices/ophyd_devices/interfaces/base_classes/psi_detector_base.py + Type: type + Subclasses: EpicsSignal + """ + + custom_prepare_cls = PhoenixTriggerSetup + + + start_csmpl = Cpt(EpicsSignal,'START-CSMPL') # cont on / off + intr_count = Cpt(EpicsSignal,'INTR-COUNT') # conter run up + total_cycles = Cpt(EpicsSignal,'TOTAL-CYCLES') # cycles set + smpl_done = Cpt(EpicsSignal,'SMPL-DONE') # show trigger is done + diff --git a/phoenix_bec/devices/xmap_phoenix_no_hdf5.py b/phoenix_bec/devices/xmap_phoenix_no_hdf5.py index 57b0d19..87bb9f1 100644 --- a/phoenix_bec/devices/xmap_phoenix_no_hdf5.py +++ b/phoenix_bec/devices/xmap_phoenix_no_hdf5.py @@ -310,10 +310,7 @@ class XMAPSetup(CustomDetectorMixin): class XMAPphoenix(PSIDetectorBase): """MCA XMAP detector for phoenix - - Parent class: PSIDetectorBase - - class attributes: + custom_prepare_cls (XMAPSetu custom_prepare_cls (XMAPSetup) : Custom detector setup class for cSAXS, inherits from CustomDetectorMixin in __init__ of PSIDetecor base @@ -337,9 +334,9 @@ class XMAPphoenix(PSIDetectorBase): dxp = Cpt(EpicsDXPXMAP, "dxp1:") mca1 = Cpt(EpicsMCARecord, "mca1") - mca2 = Cpt(EpicsMCARecord, "mca2") - mca3 = Cpt(EpicsMCARecord, "mca3") - mca4 = Cpt(EpicsMCARecord, "mca4") + #mca2 = Cpt(EpicsMCARecord, "mca2") + #mca3 = Cpt(EpicsMCARecord, "mca3") + #mca4 = Cpt(EpicsMCARecord, "mca4") print('load hdf5') #hdf5 = Cpt(XMAPHDF5Plugins, "HDF1:") @@ -362,5 +359,11 @@ class XMAPphoenix(PSIDetectorBase): auto_pixels_per_buffer = Cpt(EpicsSignal, "AutoPixelsPerBuffer") pixels_per_buffer = Cpt(EpicsSignal, "PixelsPerBuffer") pixels_per_run = Cpt(EpicsSignal, "PixelsPerRun") + + #nd_array_mode = Cpt(EpicsSignal, "NDArrayMode") print('DONE connecton chanels in XMAPphoenix') + + + def aaaa(self): + print('aaaa') diff --git a/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_PSI_detector_base.txt b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_PSI_detector_base.txt new file mode 100644 index 0000000..fe67b35 --- /dev/null +++ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_PSI_detector_base.txt @@ -0,0 +1,435 @@ +FILE ophyd_devices/ophy_devices/devices/interfaces/base_classes + + +"""This module contains the base class for SLS detectors. We follow the approach to integrate +PSI detectors into the BEC system based on this base class. The base class is used to implement +certain methods that are expected by BEC, such as stage, unstage, trigger, stop, etc... +We use composition with a custom prepare class to implement BL specific logic for the detector. 
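A minimal sketch of that composition pattern (illustrative only; the class names are placeholders, not part of this module):

    from ophyd_devices.interfaces.base_classes.psi_detector_base import (
        CustomDetectorMixin,
        PSIDetectorBase,
    )

    class MyBeamlineSetup(CustomDetectorMixin):
        def on_stage(self):
            # beamline-specific preparation before a scan goes here
            pass

    class MyBeamlineDetector(PSIDetectorBase):
        # PSIDetectorBase instantiates this class and exposes it as self.custom_prepare
        custom_prepare_cls = MyBeamlineSetup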
+The beamlines need to inherit from the Custoon_ +import threading +import time +import traceback + +from bec_lib import messages +from bec_lib.endpoints import MessageEndpoints +from bec_lib.file_utils import FileWriter +from bec_lib.logger import bec_logger +from ophyd import Component, Device, DeviceStatus, Kind +from ophyd.device import Staged + +from ophyd_devices.sim.sim_signals import SetableSignal +from ophyd_devices.utils import bec_utils +from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin +from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError + +logger = bec_logger.logger + + +class DetectorInitError(Exception): + """Raised when initiation of the device class fails, + due to missing device manager or not started in sim_mode.""" + + +class CustomDetectorMixin: + """ + Mixin class for custom detector logic + + This class is used to implement BL specific logic for the detector. + It is used in the PSIDetectorBase class. + + For the integration of a new detector, the following functions should + help with integrating functionality, but additional ones can be added. + + Check PSIDetectorBase for the functions that are called during relevant function calls of + stage, unstage, trigger, stop and _init. + """ + + def __init__(self, *_args, parent: Device = None, **_kwargs) -> None: + self.parent = parent + + def on_init(self) -> None: + """ + Init sequence for the detector + """ + + def on_stage(self) -> None: + """ + Specify actions to be executed during stage in preparation for a scan. + self.parent.scaninfo already has all current parameters for the upcoming scan. + + In case the backend service is writing data on disk, this step should include publishing + a file_event and file_message to BEC to inform the system where the data is written to. + + IMPORTANT: + It must be safe to assume that the device is ready for the scan + to start immediately once this function is finished. + """ + + def on_unstage(self) -> None: + """ + Specify actions to be executed during unstage. + + This step should include checking if the acqusition was successful, + and publishing the file location and file event message, + with flagged done to BEC. + """ + + def on_stop(self) -> None: + """ + Specify actions to be executed during stop. + This must also set self.parent.stopped to True. + + This step should include stopping the detector and backend service. + """ + + def on_trigger(self) -> None | DeviceStatus: + """ + Specify actions to be executed upon receiving trigger signal. + Return a DeviceStatus object or None + """ + + def on_pre_scan(self) -> None: + """ + Specify actions to be executed right before a scan starts. + + Only use if needed, and it is recommended to keep this function as short/fast as possible. + """ + + def on_complete(self) -> None | DeviceStatus: + """ + Specify actions to be executed when the scan is complete. + + This can for instance be to check with the detector and backend if all data is written succsessfully. + """ + + def publish_file_location(self, done: bool, successful: bool, metadata: dict = None) -> None: + """ + Publish the filepath to REDIS. + + We publish two events here: + - file_event: event for the filewriter + - public_file: event for any secondary service (e.g. 
radial integ code) + + Args: + done (bool): True if scan is finished + successful (bool): True if scan was successful + metadata (dict): additional metadata to publish + """ + if metadata is None: + metadata = {} + + msg = messages.FileMessage( + file_path=self.parent.filepath.get(), + done=done, + successful=successful, + metadata=metadata, + ) + pipe = self.parent.connector.pipeline() + self.parent.connector.set_and_publish( + MessageEndpoints.public_file(self.parent.scaninfo.scan_id, self.parent.name), + msg, + pipe=pipe, + ) + self.parent.connector.set_and_publish( + MessageEndpoints.file_event(self.parent.name), msg, pipe=pipe + ) + pipe.execute() + + def wait_for_signals( + self, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + ) -> bool: + """ + Convenience wrapper to allow waiting for signals to reach a certain condition. + For EPICs PVs, an example usage is pasted at the bottom. + + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + + Returns: + bool: True if all signals are in the desired state, False if timeout is reached + + >>> Example usage for EPICS PVs: + >>> self.wait_for_signals(signal_conditions=[(self.acquiring.get, False)], timeout=5, interval=0.05, check_stopped=True, all_signals=True) + """ + + timer = 0 + while True: + checks = [ + get_current_state() == condition + for get_current_state, condition in signal_conditions + ] + if check_stopped is True and self.parent.stopped is True: + return False + if (all_signals and all(checks)) or (not all_signals and any(checks)): + return True + if timer > timeout: + return False + time.sleep(interval) + timer += interval + + def wait_with_status( + self, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool = False, + interval: float = 0.05, + all_signals: bool = False, + exception_on_timeout: Exception = None, + ) -> DeviceStatus: + """Utility function to wait for signals in a thread. + Returns a DevicesStatus object that resolves either to set_finished or set_exception. + The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase. + + Usage: + This function should be used to wait for signals to reach a certain condition, especially in the context of + on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC. + It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met, + the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception. + The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError. 
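An illustrative on_trigger built on this helper (mirroring the commented-out version in phoenix_trigger.py; the smpl/smpl_done signal names are beamline-specific assumptions):

    >>> def on_trigger(self):
    ...     self.parent.smpl.put(1)
    ...     return self.wait_with_status(
    ...         signal_conditions=[(self.parent.smpl_done.get, 1)],
    ...         timeout=5,
    ...         check_stopped=True,
    ...     )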
+ + Args: + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + exception_on_timeout (Exception): Exception to raise on timeout + + Returns: + DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception + """ + if exception_on_timeout is None: + exception_on_timeout = DeviceTimeoutError( + f"Timeout error for {self.parent.name} while waiting for signals {signal_conditions}" + ) + + status = DeviceStatus(self.parent) + + # utility function to wrap the wait_for_signals function + def wait_for_signals_wrapper( + status: DeviceStatus, + signal_conditions: list[tuple], + timeout: float, + check_stopped: bool, + interval: float, + all_signals: bool, + exception_on_timeout: Exception, + ): + """Convenient wrapper around wait_for_signals to set status based on the result. + + Args: + status (DeviceStatus): DeviceStatus object to be set + signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check + timeout (float): timeout in seconds + check_stopped (bool): True if stopped flag should be checked + interval (float): interval in seconds + all_signals (bool): True if all signals should be True, False if any signal should be True + exception_on_timeout (Exception): Exception to raise on timeout + """ + try: + result = self.wait_for_signals( + signal_conditions, timeout, check_stopped, interval, all_signals + ) + if result: + status.set_finished() + else: + if self.parent.stopped: + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=DeviceStopError(f"{self.parent.name} was stopped")) + else: + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=exception_on_timeout) + # pylint: disable=broad-except + except Exception as exc: + content = traceback.format_exc() + logger.warning( + f"Error in wait_for_signals in {self.parent.name}; Traceback: {content}" + ) + # INFO This will execute a callback to the parent device.stop() method + status.set_exception(exc=exc) + + thread = threading.Thread( + target=wait_for_signals_wrapper, + args=( + status, + signal_conditions, + timeout, + check_stopped, + interval, + all_signals, + exception_on_timeout, + ), + daemon=True, + ) + thread.start() + return status + + +class PSIDetectorBase(Device): + """ + Abstract base class for SLS detectors + + Class attributes: + custom_prepare_cls (object): class for custom prepare logic (BL specific) + + Args: + prefix (str): EPICS PV prefix for component (optional) + name (str): name of the device, as will be reported via read() + kind (str): member of class 'ophydobj.Kind', defaults to Kind.normal + omitted -> readout ignored for read 'ophydobj.read()' + normal -> readout for read + config -> config parameter for 'ophydobj.read_configuration()' + hinted -> which attribute is readout for read + parent (object): instance of the parent device + device_manager (object): bec device manager + **kwargs: keyword arguments + """ + + filepath = Component(SetableSignal, value="", kind=Kind.config) + + custom_prepare_cls = CustomDetectorMixin + + def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs): + super().__init__(prefix=prefix, 
name=name, kind=kind, parent=parent, **kwargs) + self.stopped = False + self.name = name + self.service_cfg = None + self.scaninfo = None + self.filewriter = None + + if not issubclass(self.custom_prepare_cls, CustomDetectorMixin): + raise DetectorInitError("Custom prepare class must be subclass of CustomDetectorMixin") + self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs) + + if device_manager: + self._update_service_config() + self.device_manager = device_manager + else: + self.device_manager = bec_utils.DMMock() + base_path = kwargs["basepath"] if "basepath" in kwargs else "." + self.service_cfg = {"base_path": os.path.abspath(base_path)} + + self.connector = self.device_manager.connector + self._update_scaninfo() + self._update_filewriter() + self._init() + + def _update_filewriter(self) -> None: + """Update filewriter with service config""" + self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector) + + def _update_scaninfo(self) -> None: + """Update scaninfo from BecScaninfoMixing + This depends on device manager and operation/sim_mode + """ + self.scaninfo = BecScaninfoMixin(self.device_manager) + self.scaninfo.load_scan_metadata() + + def _update_service_config(self) -> None: + """Update service config from BEC service config + + If bec services are not running and SERVICE_CONFIG is NONE, we fall back to the current directory. + """ + # pylint: disable=import-outside-toplevel + from bec_lib.bec_service import SERVICE_CONFIG + + if SERVICE_CONFIG: + self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"] + return + self.service_cfg = {"base_path": os.path.abspath(".")} + + def check_scan_id(self) -> None: + """Checks if scan_id has changed and set stopped flagged to True if it has.""" + old_scan_id = self.scaninfo.scan_id + self.scaninfo.load_scan_metadata() + if self.scaninfo.scan_id != old_scan_id: + self.stopped = True + + def _init(self) -> None: + """Initialize detector, filewriter and set default parameters""" + self.custom_prepare.on_init() + + def stage(self) -> list[object]: + """ + Stage device in preparation for a scan. + First we check if the device is already staged. Stage is idempotent, + if staged twice it should raise (we let ophyd.Device handle the raise here). + We reset the stopped flag and get the scaninfo from BEC, before calling custom_prepare.on_stage. + + Returns: + list(object): list of objects that were staged + + """ + if self._staged != Staged.no: + return super().stage() + self.stopped = False + self.scaninfo.load_scan_metadata() + self.custom_prepare.on_stage() + return super().stage() + + def pre_scan(self) -> None: + """Pre-scan logic. + + This function will be called from BEC directly before the scan core starts, and should only implement + time-critical actions. Therefore, it should also be kept as short/fast as possible. + I.e. Arming a detector in case there is a risk of timing out. + """ + self.custom_prepare.on_pre_scan() + + def trigger(self) -> DeviceStatus: + """Trigger the detector, called from BEC.""" + # pylint: disable=assignment-from-no-return + status = self.custom_prepare.on_trigger() + if isinstance(status, DeviceStatus): + return status + return super().trigger() + + def complete(self) -> None: + """Complete the acquisition, called from BEC. + + This function is called after the scan is complete, just before unstage. + We can check here with the data backend and detector if the acquisition successfully finished. 
+ + Actions are implemented in custom_prepare.on_complete since they are beamline specific. + """ + # pylint: disable=assignment-from-no-return + status = self.custom_prepare.on_complete() + if isinstance(status, DeviceStatus): + return status + status = DeviceStatus(self) + status.set_finished() + return status + + def unstage(self) -> list[object]: + """ + Unstage device after a scan. + + We first check if the scanID has changed, thus, the scan was unexpectedly interrupted but the device was not stopped. + If that is the case, the stopped flag is set to True, which will immediately unstage the device. + + Custom_prepare.on_unstage is called to allow for BL specific logic to be executed. + + Returns: + list(object): list of objects that were unstaged + """ + self.check_scan_id() + self.custom_prepare.on_unstage() + self.stopped = False + return super().unstage() + + def stop(self, *, success=False) -> None: + """ + Stop the scan, with camera and file writer + + """ + self.custom_prepare.on_stop() + super().stop(success=success) + self.stopped = True diff --git a/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scans.txt b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_Scans.txt similarity index 99% rename from phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scans.txt rename to phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_Scans.txt index 177b228..ed0ced0 100644 --- a/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_scans.txt +++ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BASE_CLASS_Scans.txt @@ -13,8 +13,7 @@ from copy import deepcopy from typing import TYPE_CHECKING, Dict, Literal from toolz import partition -from typeguard import typechecked - +from typeguard import typecheck from bec_lib import messages from bec_lib.bec_errors import ScanAbortion from bec_lib.client import SystemConfig diff --git a/phoenix_bec/local_scripts/Documentation/Base_Classes/BAse_CLASS_ScanBase.txt~ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BAse_CLASS_ScanBase.txt~ new file mode 100644 index 0000000..c841ee1 --- /dev/null +++ b/phoenix_bec/local_scripts/Documentation/Base_Classes/BAse_CLASS_ScanBase.txt~ @@ -0,0 +1,2025 @@ +https://bec.readthedocs.io/en/latest/api_reference/_autosummary/bec_server.scan_server.scans.ScanBase.html#bec_server.scan_server.scans.ScanBase + + +Sequence of events: + + + read_scan_motors + + prepare_positions + + _calculate_positions + + _optimize_trajectory + + _set_position_offset + + _check_limits + + open_scan + + stage + + run_baseline_reading + + pre_scan + + scan_core + + finalize + + unstage + + cleanup + + + +class ScanBase(*args, device_manager: DeviceManagerBase +| None = None, parameter: dict \ +| None = None, exp_time: float = 0, readout_time: float = 0, acquisition_config: dict +| None = None, settling_time: float = 0, relative: bool = False, burst_at_each_point: int = 1, +frames_per_trigger: int = 1, optim_trajectory: Literal['corridor', None] +| None = None, monitored: list | None = None, metadata: dict | None = None, **kwargs) + + + +Methods + + + + + + + + + + + + + + + + + + + +sourece code in bec_server.scan_server.scans + + + +from __future__ import annotations + +import ast +import enum +import threading +import time +import uuid +from abc import ABC, abstractmethod +from typing import Any, Literal + +import numpy as np + +from bec_lib.device import DeviceBase +from bec_lib.devicemanager import DeviceManagerBase +from bec_lib.endpoints import MessageEndpoints 
+from bec_lib.logger import bec_logger + +from .errors import LimitError, ScanAbortion +from .path_optimization import PathOptimizerMixin +from .scan_stubs import ScanStubs + +logger = bec_logger.logger + + + +[docs] +class ScanArgType(str, enum.Enum): + DEVICE = "device" + FLOAT = "float" + INT = "int" + BOOL = "boolean" + STR = "str" + LIST = "list" + DICT = "dict" + + + + +[docs] +def unpack_scan_args(scan_args: dict[str, Any]) -> list: + """unpack_scan_args unpacks the scan arguments and returns them as a tuple. + + Args: + scan_args (dict[str, Any]): scan arguments + + Returns: + list: list of arguments + """ + args = [] + if not scan_args: + return args + if not isinstance(scan_args, dict): + return scan_args + for cmd_name, cmd_args in scan_args.items(): + args.append(cmd_name) + args.extend(cmd_args) + return args + + + + +[docs] +def get_2D_raster_pos(axis, snaked=True): + """get_2D_raster_post calculates and returns the positions for a 2D + + snaked==True: + ->->->->- + -<-<-<-<- + ->->->->- + snaked==False: + ->->->->- + ->->->->- + ->->->->- + + Args: + axis (list): list of positions for each axis + snaked (bool, optional): If true, the positions will be calculcated for a snake scan. Defaults to True. + + Returns: + array: calculated positions + """ + + x_grid, y_grid = np.meshgrid(axis[0], axis[1]) + if snaked: + y_grid.T[::2] = np.fliplr(y_grid.T[::2]) + x_flat = x_grid.T.ravel() + y_flat = y_grid.T.ravel() + positions = np.vstack((x_flat, y_flat)).T + return positions + + + +# pylint: disable=too-many-arguments + +[docs] +def get_fermat_spiral_pos( + m1_start, m1_stop, m2_start, m2_stop, step=1, spiral_type=0, center=False +): + """get_fermat_spiral_pos calculates and returns the positions for a Fermat spiral scan. + + Args: + m1_start (float): start position motor 1 + m1_stop (float): end position motor 1 + m2_start (float): start position motor 2 + m2_stop (float): end position motor 2 + step (float, optional): Step size. Defaults to 1. + spiral_type (float, optional): Angular offset in radians that determines the shape of the spiral. + A spiral with spiral_type=2 is the same as spiral_type=0. Defaults to 0. + center (bool, optional): Add a center point. Defaults to False. + + Returns: + array: calculated positions in the form [[m1, m2], ...] + """ + positions = [] + phi = 2 * np.pi * ((1 + np.sqrt(5)) / 2.0) + spiral_type * np.pi + + start = int(not center) + + length_axis1 = abs(m1_stop - m1_start) + length_axis2 = abs(m2_stop - m2_start) + n_max = int(length_axis1 * length_axis2 * 3.2 / step / step) + + for ii in range(start, n_max): + radius = step * 0.57 * np.sqrt(ii) + if abs(radius * np.sin(ii * phi)) > length_axis1 / 2: + continue + if abs(radius * np.cos(ii * phi)) > length_axis2 / 2: + continue + positions.extend([(radius * np.sin(ii * phi), radius * np.cos(ii * phi))]) + return np.array(positions) + + + + +[docs] +def get_round_roi_scan_positions(lx: float, ly: float, dr: float, nth: int, cenx=0, ceny=0): + """ + get_round_roi_scan_positions calculates and returns the positions for a round scan in a rectangular region of interest. + + Args: + lx (float): length in x + ly (float): length in y + dr (float): step size + nth (int): number of angles in the inner ring + cenx (int, optional): center in x. Defaults to 0. + ceny (int, optional): center in y. Defaults to 0. + + Returns: + array: calculated positions in the form [[x, y], ...] 
+ """ + positions = [] + nr = 1 + int(np.floor(max([lx, ly]) / dr)) + for ir in range(1, nr + 2): + rr = ir * dr + dth = 2 * np.pi / (nth * ir) + pos = [ + (rr * np.cos(ith * dth) + cenx, rr * np.sin(ith * dth) + ceny) + for ith in range(nth * ir) + if np.abs(rr * np.cos(ith * dth)) < lx / 2 and np.abs(rr * np.sin(ith * dth)) < ly / 2 + ] + positions.extend(pos) + return np.array(positions) + + + + +[docs] +def get_round_scan_positions(r_in: float, r_out: float, nr: int, nth: int, cenx=0, ceny=0): + """ + get_round_scan_positions calculates and returns the positions for a round scan. + + Args: + r_in (float): inner radius + r_out (float): outer radius + nr (int): number of radii + nth (int): number of angles in the inner ring + cenx (int, optional): center in x. Defaults to 0. + ceny (int, optional): center in y. Defaults to 0. + + Returns: + array: calculated positions in the form [[x, y], ...] + + """ + positions = [] + dr = (r_in - r_out) / nr + for ir in range(1, nr + 2): + rr = r_in + ir * dr + dth = 2 * np.pi / (nth * ir) + positions.extend( + [ + (rr * np.sin(ith * dth) + cenx, rr * np.cos(ith * dth) + ceny) + for ith in range(nth * ir) + ] + ) + return np.array(positions, dtype=float) + + + + +[docs] +class RequestBase(ABC): + """ + Base class for all scan requests. + """ + + scan_name = "" + arg_input = {} + arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None} + gui_args = {} + required_kwargs = [] + return_to_start_after_abort = False + use_scan_progress_report = False + + def __init__( + self, + *args, + device_manager: DeviceManagerBase = None, + monitored: list = None, + parameter: dict = None, + metadata: dict = None, + **kwargs, + ) -> None: + super().__init__() + self._shutdown_event = threading.Event() + self.parameter = parameter if parameter is not None else {} + self.caller_args = self.parameter.get("args", {}) + self.caller_kwargs = self.parameter.get("kwargs", {}) + self.metadata = metadata + self.device_manager = device_manager + self.connector = device_manager.connector + self.DIID = 0 + self.scan_motors = [] + self.positions = [] + self._pre_scan_macros = [] + self._scan_report_devices = None + self._get_scan_motors() + self.readout_priority = { + "monitored": monitored if monitored is not None else [], + "baseline": [], + "on_request": [], + "async": [], + } + self.update_readout_priority() + if metadata is None: + self.metadata = {} + self.stubs = ScanStubs( + connector=self.device_manager.connector, + device_msg_callback=self.device_msg_metadata, + shutdown_event=self._shutdown_event, + ) + + @property + def scan_report_devices(self): + """devices to be included in the scan report""" + if self._scan_report_devices is None: + return self.readout_priority["monitored"] + return self._scan_report_devices + + @scan_report_devices.setter + def scan_report_devices(self, devices: list): + self._scan_report_devices = devices + + def device_msg_metadata(self): + default_metadata = {"readout_priority": "monitored", "DIID": self.DIID} + metadata = {**default_metadata, **self.metadata} + self.DIID += 1 + return metadata + + @staticmethod + def _get_func_name_from_macro(macro: str): + return ast.parse(macro).body[0].name + + +[docs] + def run_pre_scan_macros(self): + """run pre scan macros if any""" + macros = self.device_manager.connector.lrange(MessageEndpoints.pre_scan_macros(), 0, -1) + for macro in macros: + macro = macro.value.strip() + func_name = self._get_func_name_from_macro(macro) + exec(macro) + eval(func_name)(self.device_manager.devices, self) 
+ + + def initialize(self): + self.run_pre_scan_macros() + + def _check_limits(self): + logger.debug("check limits") + for ii, dev in enumerate(self.scan_motors): + low_limit, high_limit = self.device_manager.devices[dev].limits + if low_limit >= high_limit: + # if both limits are equal or low > high, no restrictions ought to be applied + return + for pos in self.positions: + pos_axis = pos[ii] + if not low_limit <= pos_axis <= high_limit: + raise LimitError( + f"Target position {pos} for motor {dev} is outside of range: [{low_limit}," + f" {high_limit}]" + ) + + def _get_scan_motors(self): + if len(self.caller_args) == 0: + return + if self.arg_bundle_size.get("bundle"): + self.scan_motors = list(self.caller_args.keys()) + return + for motor in self.caller_args: + if motor not in self.device_manager.devices: + continue + self.scan_motors.append(motor) + + +[docs] + def update_readout_priority(self): + """update the readout priority for this request. Typically the monitored devices should also include the scan motors.""" + self.readout_priority["monitored"].extend(self.scan_motors) + self.readout_priority["monitored"] = list( + sorted( + set(self.readout_priority["monitored"]), + key=self.readout_priority["monitored"].index, + ) + ) + + + @abstractmethod + def run(self): + pass + + + + +[docs] +class ScanBase(RequestBase, PathOptimizerMixin): + """ + Base class for all scans. The following methods are called in the following order during the scan + 1. initialize + - run_pre_scan_macros + 2. read_scan_motors + 3. prepare_positions + - _calculate_positions + - _optimize_trajectory + - _set_position_offset + - _check_limits + 4. open_scan + 5. stage + 6. run_baseline_reading + 7. pre_scan + 8. scan_core + 9. finalize + 10. unstage + 11. cleanup + + A subclass of ScanBase must implement the following methods: + - _calculate_positions + + Attributes: + scan_name (str): name of the scan + scan_type (str): scan type. 
Can be "step" or "fly" + arg_input (list): list of scan argument types + arg_bundle_size (dict): + - bundle: number of arguments that are bundled together + - min: minimum number of bundles + - max: maximum number of bundles + required_kwargs (list): list of required kwargs + return_to_start_after_abort (bool): if True, the scan will return to the start position after an abort + """ + + scan_name = "" + scan_type = "step" + required_kwargs = ["required"] + return_to_start_after_abort = True + use_scan_progress_report = True + + # perform pre-move action before the pre_scan trigger is sent + pre_move = True + + def __init__( + self, + *args, + device_manager: DeviceManagerBase = None, + parameter: dict = None, + exp_time: float = 0, + readout_time: float = 0, + acquisition_config: dict = None, + settling_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + frames_per_trigger: int = 1, + optim_trajectory: Literal["corridor", None] = None, + monitored: list = None, + metadata: dict = None, + **kwargs, + ): + super().__init__( + *args, + device_manager=device_manager, + monitored=monitored, + parameter=parameter, + metadata=metadata, + **kwargs, + ) + self.DIID = 0 + self.point_id = 0 + self.exp_time = exp_time + self.readout_time = readout_time + self.acquisition_config = acquisition_config + self.settling_time = settling_time + self.relative = relative + self.burst_at_each_point = burst_at_each_point + self.frames_per_trigger = frames_per_trigger + self.optim_trajectory = optim_trajectory + self.burst_index = 0 + + self.start_pos = [] + self.positions = [] + self.num_pos = 0 + + if self.scan_name == "": + raise ValueError("scan_name cannot be empty") + + if acquisition_config is None or "default" not in acquisition_config: + self.acquisition_config = { + "default": {"exp_time": self.exp_time, "readout_time": self.readout_time} + } + + @property + def monitor_sync(self): + """ + monitor_sync is a property that defines how monitored devices are synchronized. + It can be either bec or the name of the device. If set to bec, the scan bundler + will synchronize scan segments based on the bec triggered readouts. If set to a device name, + the scan bundler will synchronize based on the readouts of the device, i.e. upon + receiving a new readout of the device, cached monitored readings will be added + to the scan segment. 
+ """ + return "bec" + + +[docs] + def read_scan_motors(self): + """read the scan motors""" + yield from self.stubs.read_and_wait(device=self.scan_motors, wait_group="scan_motor") + + + @abstractmethod + def _calculate_positions(self) -> None: + """Calculate the positions""" + + def _optimize_trajectory(self): + if not self.optim_trajectory: + return + if self.optim_trajectory == "corridor": + self.positions = self.optimize_corridor(self.positions) + return + return + + +[docs] + def prepare_positions(self): + """prepare the positions for the scan""" + self._calculate_positions() + self._optimize_trajectory() + self.num_pos = len(self.positions) * self.burst_at_each_point + yield from self._set_position_offset() + self._check_limits() + + + +[docs] + def open_scan(self): + """open the scan""" + positions = self.positions if isinstance(self.positions, list) else self.positions.tolist() + yield from self.stubs.open_scan( + scan_motors=self.scan_motors, + readout_priority=self.readout_priority, + num_pos=self.num_pos, + positions=positions, + scan_name=self.scan_name, + scan_type=self.scan_type, + ) + + + +[docs] + def stage(self): + """call the stage procedure""" + yield from self.stubs.stage() + + + +[docs] + def run_baseline_reading(self): + """perform a reading of all baseline devices""" + yield from self.stubs.baseline_reading() + + + def _set_position_offset(self): + for dev in self.scan_motors: + val = yield from self.stubs.send_rpc_and_wait(dev, "read") + self.start_pos.append(val[dev].get("value")) + if self.relative: + self.positions += self.start_pos + + +[docs] + def close_scan(self): + """close the scan""" + yield from self.stubs.close_scan() + + + +[docs] + def scan_core(self): + """perform the scan core procedure""" + for ind, pos in self._get_position(): + for self.burst_index in range(self.burst_at_each_point): + yield from self._at_each_point(ind, pos) + self.burst_index = 0 + + + +[docs] + def return_to_start(self): + """return to the start position""" + yield from self._move_scan_motors_and_wait(self.start_pos) + + + +[docs] + def finalize(self): + """finalize the scan""" + yield from self.return_to_start() + yield from self.stubs.wait(wait_type="read", group="primary", wait_group="readout_primary") + yield from self.stubs.complete(device=None) + + + +[docs] + def unstage(self): + """call the unstage procedure""" + yield from self.stubs.unstage() + + + +[docs] + def cleanup(self): + """call the cleanup procedure""" + yield from self.close_scan() + + + def _at_each_point(self, ind=None, pos=None): + yield from self._move_scan_motors_and_wait(pos) + if ind > 0: + yield from self.stubs.wait( + wait_type="read", group="primary", wait_group="readout_primary" + ) + time.sleep(self.settling_time) + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) + yield from self.stubs.read( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + yield from self.stubs.wait( + wait_type="read", group="scan_motor", wait_group="readout_primary" + ) + + self.point_id += 1 + + def _move_scan_motors_and_wait(self, pos): + if not isinstance(pos, list) and not isinstance(pos, np.ndarray): + pos = [pos] + if len(pos) == 0: + return + for ind, val in enumerate(self.scan_motors): + yield from self.stubs.set(device=val, value=pos[ind], wait_group="scan_motor") + + yield from self.stubs.wait(wait_type="move", group="scan_motor", wait_group="scan_motor") + + def 
_get_position(self): + for ind, pos in enumerate(self.positions): + yield (ind, pos) + + def scan_report_instructions(self): + yield None + + +[docs] + def pre_scan(self): + """ + pre scan procedure. This method is called before the scan_core method and can be used to + perform additional tasks before the scan is started. This + """ + if self.pre_move and len(self.positions) > 0: + yield from self._move_scan_motors_and_wait(self.positions[0]) + yield from self.stubs.pre_scan() + + + +[docs] + def run(self): + """run the scan. This method is called by the scan server and is the main entry point for the scan.""" + self.initialize() + yield from self.read_scan_motors() + yield from self.prepare_positions() + yield from self.scan_report_instructions() + yield from self.open_scan() + yield from self.stage() + yield from self.run_baseline_reading() + yield from self.pre_scan() + yield from self.scan_core() + yield from self.finalize() + yield from self.unstage() + yield from self.cleanup() + + + @classmethod + def scan(cls, *args, **kwargs): + scan = cls(args, **kwargs) + yield from scan.run() + + + + +[docs] +class SyncFlyScanBase(ScanBase, ABC): + """ + Fly scan base class for all synchronous fly scans. A synchronous fly scan is a scan where the flyer is + synced with the monitored devices. + Classes inheriting from SyncFlyScanBase must at least implement the scan_core method and the monitor_sync property. + """ + + scan_type = "fly" + pre_move = False + + def _get_scan_motors(self): + # fly scans normally do not have stepper scan motors so + # the default way of retrieving scan motors is not applicable + return [] + + @property + @abstractmethod + def monitor_sync(self) -> str: + """ + monitor_sync is the flyer that will be used to synchronize the monitor readings in the scan bundler. + The return value should be the name of the flyer device. + """ + + def _calculate_positions(self) -> None: + pass + + +[docs] + def read_scan_motors(self): + yield None + + + +[docs] + def prepare_positions(self): + yield None + + + +[docs] + @abstractmethod + def scan_core(self): + """perform the scan core procedure""" + + + ############################################ + # Example of how to kickoff and wait for a flyer: + ############################################ + + # yield from self.stubs.kickoff(device=self.flyer, parameter=self.caller_kwargs) + # yield from self.stubs.complete(device=self.flyer) + # target_diid = self.DIID - 1 + + # while True: + # status = self.stubs.get_req_status( + # device=self.flyer, RID=self.metadata["RID"], DIID=target_diid + # ) + # progress = self.stubs.get_device_progress( + # device=self.flyer, RID=self.metadata["RID"] + # ) + # if progress: + # self.num_pos = progress + # if status: + # break + # time.sleep(1) + + # def _get_flyer_status(self) -> list: + # connector = self.device_manager.connector + + # pipe = connector.pipeline() + # connector.lrange( + # MessageEndpoints.device_req_status_container(self.metadata["RID"]), 0, -1, pipe + # ) + # connector.get(MessageEndpoints.device_readback(self.flyer), pipe) + # return connector.execute_pipeline(pipe) + + + +[docs] +class AsyncFlyScanBase(SyncFlyScanBase): + """ + Fly scan base class for all asynchronous fly scans. An asynchronous fly scan is a scan where the flyer is + not synced with the monitored devices. + Classes inheriting from AsyncFlyScanBase must at least implement the scan_core method. 
+ """ + + @property + def monitor_sync(self): + return "bec" + + + + +[docs] +class ScanStub(RequestBase): + pass + + + + +[docs] +class OpenScanDef(ScanStub): + scan_name = "open_scan_def" + + def run(self): + yield from self.stubs.open_scan_def() + + + + +[docs] +class CloseScanDef(ScanStub): + scan_name = "close_scan_def" + + def run(self): + yield from self.stubs.close_scan_def() + + + + +[docs] +class CloseScanGroup(ScanStub): + scan_name = "close_scan_group" + + def run(self): + yield from self.stubs.close_scan_group() + + + + +[docs] +class DeviceRPC(ScanStub): + scan_name = "device_rpc" + arg_input = { + "device": ScanArgType.DEVICE, + "func": ScanArgType.STR, + "args": ScanArgType.LIST, + "kwargs": ScanArgType.DICT, + } + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": 1} + + def _get_scan_motors(self): + pass + + def run(self): + # different to calling self.device_rpc, this procedure will not wait for a reply and therefore not check any errors. + yield from self.stubs.rpc(device=self.parameter.get("device"), parameter=self.parameter) + + + + +[docs] +class Move(RequestBase): + scan_name = "mv" + arg_input = {"device": ScanArgType.DEVICE, "target": ScanArgType.FLOAT} + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + required_kwargs = ["relative"] + + def __init__(self, *args, relative=False, **kwargs): + """ + Move device(s) to an absolute position + Args: + *args (Device, float): pairs of device / position arguments + relative (bool): if True, move relative to current position + + Returns: + ScanReport + + Examples: + >>> scans.mv(dev.samx, 1, dev.samy,2) + """ + super().__init__(**kwargs) + self.relative = relative + self.start_pos = [] + + def _calculate_positions(self): + self.positions = np.asarray([[val[0] for val in self.caller_args.values()]], dtype=float) + + def _at_each_point(self, pos=None): + for ii, motor in enumerate(self.scan_motors): + yield from self.stubs.set( + device=motor, + value=self.positions[0][ii], + wait_group="scan_motor", + metadata={"response": True}, + ) + + def cleanup(self): + pass + + def _set_position_offset(self): + self.start_pos = [] + for dev in self.scan_motors: + val = yield from self.stubs.send_rpc_and_wait(dev, "read") + self.start_pos.append(val[dev].get("value")) + if not self.relative: + return + self.positions += self.start_pos + + def prepare_positions(self): + self._calculate_positions() + yield from self._set_position_offset() + self._check_limits() + + def scan_report_instructions(self): + yield None + + def run(self): + self.initialize() + yield from self.prepare_positions() + yield from self.scan_report_instructions() + yield from self._at_each_point() + + + + +[docs] +class UpdatedMove(Move): + """ + Move device(s) to an absolute position and show live updates. This is a blocking call. For non-blocking use Move. 
+ Args: + *args (Device, float): pairs of device / position arguments + relative (bool): if True, move relative to current position + + Returns: + ScanReport + + Examples: + >>> scans.umv(dev.samx, 1, dev.samy,2) + """ + + scan_name = "umv" + + def _at_each_point(self, pos=None): + for ii, motor in enumerate(self.scan_motors): + yield from self.stubs.set( + device=motor, value=self.positions[0][ii], wait_group="scan_motor" + ) + + for motor in self.scan_motors: + yield from self.stubs.wait(wait_type="move", device=motor, wait_group="scan_motor") + + def scan_report_instructions(self): + yield from self.stubs.scan_report_instruction( + { + "readback": { + "RID": self.metadata["RID"], + "devices": self.scan_motors, + "start": self.start_pos, + "end": self.positions[0], + } + } + ) + + + + +[docs] +class Scan(ScanBase): + scan_name = "grid_scan" + arg_input = { + "device": ScanArgType.DEVICE, + "start": ScanArgType.FLOAT, + "stop": ScanArgType.FLOAT, + "steps": ScanArgType.INT, + } + arg_bundle_size = {"bundle": len(arg_input), "min": 2, "max": None} + required_kwargs = ["relative"] + gui_config = { + "Scan Parameters": ["exp_time", "settling_time", "burst_at_each_point", "relative"] + } + + def __init__( + self, + *args, + exp_time: float = 0, + settling_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + Scan two or more motors in a grid. + + Args: + *args (Device, float, float, int): pairs of device / start / stop / steps arguments + exp_time (float): exposure time in seconds. Default is 0. + settling_time (float): settling time in seconds. Default is 0. + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + + Returns: + ScanReport + + Examples: + >>> scans.grid_scan(dev.motor1, -5, 5, 10, dev.motor2, -5, 5, 10, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, + settling_time=settling_time, + relative=relative, + burst_at_each_point=burst_at_each_point, + **kwargs, + ) + + def _calculate_positions(self): + axis = [] + for _, val in self.caller_args.items(): + axis.append(np.linspace(val[0], val[1], val[2], dtype=float)) + if len(axis) > 1: + self.positions = get_2D_raster_pos(axis) + else: + self.positions = np.vstack(tuple(axis)).T + + + + +[docs] +class FermatSpiralScan(ScanBase): + scan_name = "fermat_scan" + required_kwargs = ["step", "relative"] + gui_config = { + "Device 1": ["motor1", "start_motor1", "stop_motor1"], + "Device 2": ["motor2", "start_motor2", "stop_motor2"], + "Movement Parameters": ["step", "relative"], + "Acquisition Parameters": ["exp_time", "settling_time", "burst_at_each_point"], + } + + def __init__( + self, + motor1: DeviceBase, + start_motor1: float, + stop_motor1: float, + motor2: DeviceBase, + start_motor2: float, + stop_motor2: float, + step: float = 0.1, + exp_time: float = 0, + settling_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + spiral_type: float = 0, + optim_trajectory: Literal["corridor", None] = None, + **kwargs, + ): + """ + A scan following Fermat's spiral. + + Args: + motor1 (DeviceBase): first motor + start_motor1 (float): start position motor 1 + stop_motor1 (float): end position motor 1 + motor2 (DeviceBase): second motor + start_motor2 (float): start position motor 2 + stop_motor2 (float): end position motor 2 + step (float): step size in motor units. Default is 0.1. 
+ exp_time (float): exposure time in seconds. Default is 0. + settling_time (float): settling time in seconds. Default is 0. + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + spiral_type (float): type of spiral to use. Default is 0. + optim_trajectory (str): trajectory optimization method. Default is None. Options are "corridor" and "none". + + Returns: + ScanReport + + Examples: + >>> scans.fermat_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, step=0.5, exp_time=0.1, relative=True, optim_trajectory="corridor") + + """ + super().__init__( + exp_time=exp_time, + settling_time=settling_time, + relative=relative, + burst_at_each_point=burst_at_each_point, + optim_trajectory=optim_trajectory, + **kwargs, + ) + self.motor1 = motor1 + self.motor2 = motor2 + self.start_motor1 = start_motor1 + self.stop_motor1 = stop_motor1 + self.start_motor2 = start_motor2 + self.stop_motor2 = stop_motor2 + self.step = step + self.spiral_type = spiral_type + + def _calculate_positions(self): + self.positions = get_fermat_spiral_pos( + self.start_motor1, + self.stop_motor1, + self.start_motor2, + self.stop_motor2, + step=self.step, + spiral_type=self.spiral_type, + center=False, + ) + + + + +[docs] +class RoundScan(ScanBase): + scan_name = "round_scan" + required_kwargs = ["relative"] + gui_config = { + "Motors": ["motor_1", "motor_2"], + "Ring Parameters": ["inner_ring", "outer_ring", "number_of_rings", "pos_in_first_ring"], + "Scan Parameters": ["relative", "burst_at_each_point"], + } + + def __init__( + self, + motor_1: DeviceBase, + motor_2: DeviceBase, + inner_ring: float, + outer_ring: float, + number_of_rings: int, + pos_in_first_ring: int, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A scan following a round shell-like pattern. + + Args: + motor_1 (DeviceBase): first motor + motor_2 (DeviceBase): second motor + inner_ring (float): inner radius + outer_ring (float): outer radius + number_of_rings (int): number of rings + pos_in_first_ring (int): number of positions in the first ring + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. 
+ + Returns: + ScanReport + + Examples: + >>> scans.round_scan(dev.motor1, dev.motor2, 0, 25, 5, 3, exp_time=0.1, relative=True) + + """ + super().__init__(relative=relative, burst_at_each_point=burst_at_each_point, **kwargs) + self.axis = [] + self.motor_1 = motor_1 + self.motor_2 = motor_2 + self.inner_ring = inner_ring + self.outer_ring = outer_ring + self.number_of_rings = number_of_rings + self.pos_in_first_ring = pos_in_first_ring + + def _get_scan_motors(self): + caller_args = list(self.caller_args.items())[0] + self.scan_motors = [caller_args[0], caller_args[1][0]] + + def _calculate_positions(self): + self.positions = get_round_scan_positions( + r_in=self.inner_ring, + r_out=self.outer_ring, + nr=self.number_of_rings, + nth=self.pos_in_first_ring, + ) + + + + +[docs] +class ContLineScan(ScanBase): + scan_name = "cont_line_scan" + required_kwargs = ["steps", "relative"] + scan_type = "step" + gui_config = { + "Device": ["device", "start", "stop"], + "Movement Parameters": ["steps", "relative", "offset", "atol"], + "Acquisition Parameters": ["exp_time", "burst_at_each_point"], + } + + def __init__( + self, + device: DeviceBase, + start: float, + stop: float, + offset: float = 1, + atol: float = 0.5, + exp_time: float = 0, + steps: int = 10, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A continuous line scan. Use this scan if you want to move a motor continuously from start to stop position whilst + acquiring data at predefined positions. The scan will abort if the motor moves too fast and a point is skipped. + + Args: + device (DeviceBase): motor to move continuously from start to stop position + start (float): start position + stop (float): stop position + exp_time (float): exposure time in seconds. Default is 0. + steps (int): number of steps. Default is 10. + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. + offset (float): offset in motor units. Default is 1. + atol (float): absolute tolerance for position check. Default is 0.5. + + Returns: + ScanReport + + Examples: + >>> scans.cont_line_scan(dev.motor1, -5, 5, steps=10, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs + ) + self.steps = steps + self.device = device + self.offset = offset + self.start = start + self.stop = stop + self.atol = atol + self.motor_velocity = self.device_manager.devices[self.device].read()[ + f"{self.device}_velocity" + ]["value"] + + def _calculate_positions(self) -> None: + self.positions = np.linspace(self.start, self.stop, self.steps, dtype=float)[ + np.newaxis, : + ].T + # Check if the motor is moving faster than the exp_time + dist_setp = self.positions[1][0] - self.positions[0][0] + time_per_step = dist_setp / self.motor_velocity + if time_per_step < self.exp_time: + raise ScanAbortion( + f"Motor {self.device} is moving too fast. Time per step: {time_per_step:.03f} < Exp_time: {self.exp_time:.03f}." 
+ + f" Consider reducing speed {self.motor_velocity} or reducing exp_time {self.exp_time}" + ) + + def _check_limits(self): + logger.debug("check limits") + low_limit, high_limit = self.device_manager.devices[self.device].limits + if low_limit >= high_limit: + # if both limits are equal or low > high, no restrictions ought to be applied + return + for ii, pos in enumerate(self.positions): + if ii == 0: + pos_axis = pos - self.offset + else: + pos_axis = pos + if not low_limit <= pos_axis <= high_limit: + raise LimitError( + f"Target position {pos} for motor {self.device} is outside of range: [{low_limit}," + f" {high_limit}]" + ) + + def _at_each_point(self, _pos=None): + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.read(group="primary", wait_group="primary", point_id=self.point_id) + self.point_id += 1 + + +[docs] + def scan_core(self): + yield from self._move_scan_motors_and_wait(self.positions[0] - self.offset) + # send the slow motor on its way + yield from self.stubs.set( + device=self.scan_motors[0], value=self.positions[-1][0], wait_group="scan_motor" + ) + + while self.point_id < len(self.positions[:]): + cont_motor_positions = self.device_manager.devices[self.scan_motors[0]].readback.read() + + if not cont_motor_positions: + continue + + cont_motor_positions = cont_motor_positions[self.scan_motors[0]].get("value") + logger.debug(f"Current position of {self.scan_motors[0]}: {cont_motor_positions}") + # TODO: consider the alternative, which triggers a readout for each point right after the motor passed it + # if cont_motor_positions > self.positions[self.point_id][0]: + if np.isclose(cont_motor_positions, self.positions[self.point_id][0], atol=self.atol): + logger.debug(f"reading point {self.point_id}") + yield from self._at_each_point() + continue + if cont_motor_positions > self.positions[self.point_id][0]: + raise ScanAbortion( + f"Skipped point {self.point_id + 1}:" + f"Consider reducing speed {self.device_manager.devices[self.scan_motors[0]].velocity.get()}, " + f"increasing the atol {self.atol}, or increasing the offset {self.offset}" + ) + + + + + +[docs] +class ContLineFlyScan(AsyncFlyScanBase): + scan_name = "cont_line_fly_scan" + required_kwargs = [] + use_scan_progress_report = False + gui_config = {"Device": ["motor", "start", "stop"], "Scan Parameters": ["exp_time", "relative"]} + + def __init__( + self, + motor: DeviceBase, + start: float, + stop: float, + exp_time: float = 0, + relative: bool = False, + **kwargs, + ): + """ + A continuous line fly scan. Use this scan if you want to move a motor continuously from start to stop position whilst + acquiring data as fast as possible (respecting the exposure time). The scan will stop automatically when the motor + reaches the end position. + + Args: + motor (DeviceBase): motor to move continuously from start to stop position + start (float): start position + stop (float): stop position + exp_time (float): exposure time in seconds. Default is 0. + relative (bool): if True, the motor will be moved relative to its current position. Default is False. 
+ + Returns: + ScanReport + + Examples: + >>> scans.cont_line_fly_scan(dev.sam_rot, 0, 180, exp_time=0.1) + + """ + super().__init__(relative=relative, exp_time=exp_time, **kwargs) + self.motor = motor + self.start = start + self.stop = stop + self.device_move_request_id = str(uuid.uuid4()) + + +[docs] + def prepare_positions(self): + self.positions = np.array([[self.start], [self.stop]], dtype=float) + self.num_pos = None + yield from self._set_position_offset() + + + def scan_report_instructions(self): + yield from self.stubs.scan_report_instruction( + { + "readback": { + "RID": self.device_move_request_id, + "devices": [self.motor], + "start": [self.start], + "end": [self.stop], + } + } + ) + + +[docs] + def scan_core(self): + # move the motor to the start position + yield from self.stubs.set_and_wait(device=[self.motor], positions=self.positions[0]) + + # start the flyer + flyer_request = yield from self.stubs.set_with_response( + device=self.motor, value=self.positions[1][0], request_id=self.device_move_request_id + ) + + while True: + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.read_and_wait( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + yield from self.stubs.wait( + wait_type="trigger", group="trigger", wait_time=self.exp_time + ) + if self.stubs.request_is_completed(flyer_request): + break + self.point_id += 1 + + + +[docs] + def finalize(self): + yield from super().finalize() + self.num_pos = self.point_id + 1 + + + + + +[docs] +class RoundScanFlySim(SyncFlyScanBase): + scan_name = "round_scan_fly" + scan_type = "fly" + pre_move = False + required_kwargs = ["relative"] + gui_config = { + "Fly Parameters": ["flyer", "relative"], + "Ring Parameters": ["inner_ring", "outer_ring", "number_of_rings", "number_pos"], + } + + def __init__( + self, + flyer: DeviceBase, + inner_ring: float, + outer_ring: float, + number_of_rings: int, + number_pos: int, + relative: bool = False, + **kwargs, + ): + """ + A fly scan following a round shell-like pattern. + + Args: + flyer (DeviceBase): flyer device + inner_ring (float): inner radius + outer_ring (float): outer radius + number_of_rings (int): number of rings + number_pos (int): number of positions in the first ring + relative (bool): if True, the motors will be moved relative to their current position. Default is False. + burst_at_each_point (int): number of exposures at each point. Default is 1. 
+ + Returns: + ScanReport + + Examples: + >>> scans.round_scan_fly(dev.flyer_sim, 0, 50, 5, 3, exp_time=0.1, relative=True) + + """ + super().__init__(**kwargs) + self.flyer = flyer + self.inner_ring = inner_ring + self.outer_ring = outer_ring + self.number_of_rings = number_of_rings + self.number_pos = number_pos + + def _get_scan_motors(self): + self.scan_motors = [] + + @property + def monitor_sync(self): + return self.flyer + + +[docs] + def prepare_positions(self): + self._calculate_positions() + self.num_pos = len(self.positions) * self.burst_at_each_point + self._check_limits() + yield None + + + +[docs] + def finalize(self): + yield + + + def _calculate_positions(self): + self.positions = get_round_scan_positions( + r_in=self.inner_ring, + r_out=self.outer_ring, + nr=self.number_of_rings, + nth=self.number_pos, + ) + + +[docs] + def scan_core(self): + yield from self.stubs.kickoff( + device=self.flyer, + parameter={ + "num_pos": self.num_pos, + "positions": self.positions.tolist(), + "exp_time": self.exp_time, + }, + ) + target_DIID = self.DIID - 1 + + while True: + yield from self.stubs.read_and_wait(group="primary", wait_group="readout_primary") + status = self.device_manager.connector.get(MessageEndpoints.device_status(self.flyer)) + if status: + device_is_idle = status.content.get("status", 1) == 0 + matching_RID = self.metadata.get("RID") == status.metadata.get("RID") + matching_DIID = target_DIID == status.metadata.get("DIID") + if device_is_idle and matching_RID and matching_DIID: + break + + time.sleep(1) + logger.debug("reading monitors") + + + + + +[docs] +class RoundROIScan(ScanBase): + scan_name = "round_roi_scan" + required_kwargs = ["dr", "nth", "relative"] + gui_config = { + "Motor 1": ["motor_1", "width_1"], + "Motor 2": ["motor_2", "width_2"], + "Shell Parametes": ["dr", "nth"], + "Acquisition Parameters": ["exp_time", "relative", "burst_at_each_point"], + } + + def __init__( + self, + motor_1: DeviceBase, + width_1: float, + motor_2: DeviceBase, + width_2: float, + dr: float = 1, + nth: int = 5, + exp_time: float = 0, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A scan following a round-roi-like pattern. + + Args: + motor_1 (DeviceBase): first motor + width_1 (float): width of region of interest for motor_1 + motor_2 (DeviceBase): second motor + width_2 (float): width of region of interest for motor_2 + dr (float): shell width. Default is 1. + nth (int): number of points in the first shell. Default is 5. + exp_time (float): exposure time in seconds. Default is 0. + relative (bool): Start from an absolute or relative position. Default is False. + burst_at_each_point (int): number of acquisition per point. Default is 1. 
+ + Returns: + ScanReport + + Examples: + >>> scans.round_roi_scan(dev.motor1, 20, dev.motor2, 20, dr=2, nth=3, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs + ) + self.motor_1 = motor_1 + self.motor_2 = motor_2 + self.width_1 = width_1 + self.width_2 = width_2 + self.dr = dr + self.nth = nth + + def _calculate_positions(self) -> None: + self.positions = get_round_roi_scan_positions( + lx=self.width_1, ly=self.width_2, dr=self.dr, nth=self.nth + ) + + + + +[docs] +class ListScan(ScanBase): + scan_name = "list_scan" + required_kwargs = ["relative"] + arg_input = {"device": ScanArgType.DEVICE, "positions": ScanArgType.LIST} + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + + def __init__(self, *args, parameter: dict = None, **kwargs): + """ + A scan following the positions specified in a list. + Please note that all lists must be of equal length. + + Args: + *args: pairs of motors and position lists + relative: Start from an absolute or relative position + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.list_scan(dev.motor1, [0,1,2,3,4], dev.motor2, [4,3,2,1,0], exp_time=0.1, relative=True) + + """ + super().__init__(parameter=parameter, **kwargs) + if len(set(len(entry[0]) for entry in self.caller_args.values())) != 1: + raise ValueError("All position lists must be of equal length.") + + def _calculate_positions(self): + self.positions = np.vstack(tuple(self.caller_args.values()), dtype=float).T.tolist() + + + + +[docs] +class TimeScan(ScanBase): + scan_name = "time_scan" + required_kwargs = ["points", "interval"] + gui_config = {"Scan Parameters": ["points", "interval", "exp_time", "burst_at_each_point"]} + + def __init__( + self, + points: int, + interval: float, + exp_time: float = 0, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + Trigger and readout devices at a fixed interval. + Note that the interval time cannot be less than the exposure time. 
+ The effective "sleep" time between points is + sleep_time = interval - exp_time + + Args: + points: number of points + interval: time interval between points + exp_time: exposure time in s + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.time_scan(points=10, interval=1.5, exp_time=0.1, relative=True) + + """ + super().__init__(exp_time=exp_time, burst_at_each_point=burst_at_each_point, **kwargs) + self.points = points + self.interval = interval + self.interval -= self.exp_time + + def _calculate_positions(self) -> None: + pass + + +[docs] + def prepare_positions(self): + self.num_pos = self.points + yield None + + + def _at_each_point(self, ind=None, pos=None): + if ind > 0: + yield from self.stubs.wait( + wait_type="read", group="primary", wait_group="readout_primary" + ) + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) + yield from self.stubs.read( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.interval) + self.point_id += 1 + + +[docs] + def scan_core(self): + for ind in range(self.num_pos): + yield from self._at_each_point(ind) + + + + + +[docs] +class MonitorScan(ScanBase): + scan_name = "monitor_scan" + required_kwargs = ["relative"] + scan_type = "fly" + gui_config = {"Device": ["device", "start", "stop"], "Scan Parameters": ["relative"]} + + def __init__( + self, device: DeviceBase, start: float, stop: float, relative: bool = False, **kwargs + ): + """ + Readout all primary devices at each update of the monitored device. + + Args: + device (Device): monitored device + start (float): start position of the monitored device + stop (float): stop position of the monitored device + relative (bool): if True, the motor will be moved relative to its current position. Default is False. 
+ + Returns: + ScanReport + + Examples: + >>> scans.monitor_scan(dev.motor1, -5, 5, exp_time=0.1, relative=True) + + """ + self.device = device + super().__init__(relative=relative, **kwargs) + self.start = start + self.stop = stop + + def _get_scan_motors(self): + self.scan_motors = [self.device] + self.flyer = self.device + + @property + def monitor_sync(self): + return self.flyer + + def _calculate_positions(self) -> None: + self.positions = np.array([[self.start], [self.stop]], dtype=float) + + +[docs] + def prepare_positions(self): + self._calculate_positions() + self.num_pos = 0 + yield from self._set_position_offset() + self._check_limits() + + + def _get_flyer_status(self) -> list: + connector = self.device_manager.connector + + pipe = connector.pipeline() + connector.lrange( + MessageEndpoints.device_req_status_container(self.metadata["RID"]), 0, -1, pipe + ) + connector.get(MessageEndpoints.device_readback(self.flyer), pipe) + return connector.execute_pipeline(pipe) + + +[docs] + def scan_core(self): + yield from self.stubs.set( + device=self.flyer, value=self.positions[0][0], wait_group="scan_motor" + ) + yield from self.stubs.wait(wait_type="move", device=self.flyer, wait_group="scan_motor") + + # send the slow motor on its way + yield from self.stubs.set( + device=self.flyer, + value=self.positions[1][0], + wait_group="scan_motor", + metadata={"response": True}, + ) + + while True: + move_completed, readback = self._get_flyer_status() + + if move_completed: + break + + if not readback: + continue + readback = readback.content["signals"] + yield from self.stubs.publish_data_as_read( + device=self.flyer, data=readback, point_id=self.point_id + ) + self.point_id += 1 + self.num_pos += 1 + + + + + +[docs] +class Acquire(ScanBase): + scan_name = "acquire" + required_kwargs = [] + gui_config = {"Scan Parameters": ["exp_time", "burst_at_each_point"]} + + def __init__(self, *args, exp_time: float = 0, burst_at_each_point: int = 1, **kwargs): + """ + A simple acquisition at the current position. 
+ + Args: + exp_time (float): exposure time in s + burst: number of acquisition per point + + Returns: + ScanReport + + Examples: + >>> scans.acquire(exp_time=0.1, relative=True) + + """ + super().__init__(exp_time=exp_time, burst_at_each_point=burst_at_each_point, **kwargs) + + def _calculate_positions(self) -> None: + self.num_pos = self.burst_at_each_point + + +[docs] + def prepare_positions(self): + self._calculate_positions() + + + def _at_each_point(self, ind=None, pos=None): + if ind > 0: + yield from self.stubs.wait( + wait_type="read", group="primary", wait_group="readout_primary" + ) + yield from self.stubs.trigger(group="trigger", point_id=self.point_id) + yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time) + yield from self.stubs.read( + group="primary", wait_group="readout_primary", point_id=self.point_id + ) + self.point_id += 1 + + +[docs] + def scan_core(self): + for self.burst_index in range(self.burst_at_each_point): + yield from self._at_each_point(self.burst_index) + self.burst_index = 0 + + + +[docs] + def run(self): + self.initialize() + self.prepare_positions() + yield from self.open_scan() + yield from self.stage() + yield from self.run_baseline_reading() + yield from self.pre_scan() + yield from self.scan_core() + yield from self.finalize() + yield from self.unstage() + yield from self.cleanup() + + + + + +[docs] +class LineScan(ScanBase): + scan_name = "line_scan" + required_kwargs = ["steps", "relative"] + arg_input = { + "device": ScanArgType.DEVICE, + "start": ScanArgType.FLOAT, + "stop": ScanArgType.FLOAT, + } + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + gui_config = { + "Movement Parameters": ["steps", "relative"], + "Acquisition Parameters": ["exp_time", "burst_at_each_point"], + } + + def __init__( + self, + *args, + exp_time: float = 0, + steps: int = None, + relative: bool = False, + burst_at_each_point: int = 1, + **kwargs, + ): + """ + A line scan for one or more motors. + + Args: + *args (Device, float, float): pairs of device / start position / end position + exp_time (float): exposure time in s. Default: 0 + steps (int): number of steps. Default: 10 + relative (bool): if True, the start and end positions are relative to the current position. Default: False + burst_at_each_point (int): number of acquisition per point. Default: 1 + + Returns: + ScanReport + + Examples: + >>> scans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True) + + """ + super().__init__( + exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs + ) + self.steps = steps + + def _calculate_positions(self) -> None: + axis = [] + for _, val in self.caller_args.items(): + ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float) + axis.append(ax_pos) + self.positions = np.array(list(zip(*axis)), dtype=float) + + + + +[docs] +class ScanComponent(ScanBase): + pass + + + + +[docs] +class OpenInteractiveScan(ScanComponent): + scan_name = "open_interactive_scan" + required_kwargs = [] + arg_input = {"device": ScanArgType.DEVICE} + arg_bundle_size = {"bundle": len(arg_input), "min": 1, "max": None} + + def __init__(self, *args, **kwargs): + """ + An interactive scan for one or more motors. 
diff --git a/phoenix_bec/local_scripts/PhoenixTemplate.py b/phoenix_bec/local_scripts/PhoenixTemplate.py
index af47e90..71c13dd 100644
--- a/phoenix_bec/local_scripts/PhoenixTemplate.py
+++ b/phoenix_bec/local_scripts/PhoenixTemplate.py
@@ -19,25 +19,15 @@ import importlib
 import ophyd
-#logger = bec_logger.logger
-
 # load local configuration
-#bec.config.load_demo_config()
-
-# .. define base path for directory with scripts
-
-PhoenixBL=0
-from ConfigPHOENIX.config.phoenix import PhoenixBL
-#from ConfigPHOENIX.devices.falcon_csaxs import FalconSetup
-# initialize general parameter
-ph=PhoenixBL()
-
-bec.config.update_session_with_file('./ConfigPHOENIX/device_config/phoenix_devices.yaml')
+#
+phoenix.add_phoenix_config()
+#bec.config.update_session_with_file('./ConfigPHOENIX/device_config/phoenix_devices.yaml')
 time.sleep(1)
-s1=scans.line_scan(dev.ScanX,0,0.002,steps=4,exp_time=1,relative=False,delay=2)
+#s1=scans.line_scan(dev.ScanX,0,0.1,steps=4,exp_time=1,relative=False,delay=2)
 s2=scans.phoenix_line_scan(dev.ScanX,0,0.002,steps=4,exp_time=.01,relative=False,delay=2)
diff --git a/phoenix_bec/local_scripts/README.md~ b/phoenix_bec/local_scripts/README.md~
new file mode 100644
index 0000000..6d32ff0
--- /dev/null
+++ b/phoenix_bec/local_scripts/README.md~
@@ -0,0 +1,8 @@
+This directory is for scripts, tests, etc. which are not loaded into the server.
+
+Hence no directory should contain a file named
+__init__.py
+
+
+For now we keep it in the phoenix_bec structure, but for operation, such files should be located outside of the
+phoenix_bec plugin.
diff --git a/phoenix_bec/scans/__init__.py b/phoenix_bec/scans/__init__.py
index 2cead0a..6e9a21e 100644
--- a/phoenix_bec/scans/__init__.py
+++ b/phoenix_bec/scans/__init__.py
@@ -1 +1 @@
-from .phoenix_line_scan import PhoenixLineScan
\ No newline at end of file
+from .phoenix_scans import PhoenixLineScan
\ No newline at end of file
diff --git a/phoenix_bec/scans/phoenix_line_scan.py b/phoenix_bec/scans/phoenix_scans.py
similarity index 71%
rename from phoenix_bec/scans/phoenix_line_scan.py
rename to phoenix_bec/scans/phoenix_scans.py
index 1ac0ff8..cce9c87 100644
--- a/phoenix_bec/scans/phoenix_line_scan.py
+++ b/phoenix_bec/scans/phoenix_scans.py
@@ -22,19 +22,41 @@ but they are executed in a specific order:
 - self.cleanup # send a close scan message and perform additional cleanups if needed
 """
+# imports in ScanBase
+#from __future__ import annotations
+
+#import ast
+#import enum
+#import threading
+#import time
+#import uuid
+#from abc import ABC, abstractmethod
+#from typing import Any, Literal
+
+#import numpy as np
+
+#from bec_lib.device import DeviceBase
+#from bec_lib.devicemanager import DeviceManagerBase
+#from bec_lib.endpoints import MessageEndpoints
+#from bec_lib.logger import bec_logger
+
+#from .errors import LimitError, ScanAbortion
+#from .path_optimization import PathOptimizerMixin
+#from .scan_stubs import ScanStubs
+# end imports in ScanBase
+
 # import time
 # import numpy as np
 # from bec_lib.endpoints import MessageEndpoints
-# from bec_lib.logger import bec_logger
+from bec_lib.logger import bec_logger
 # from bec_lib import messages
 # from bec_server.scan_server.errors import ScanAbortion
 # from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
 # logger = bec_logger.logger
-
 from bec_server.scan_server.scans import ScanBase, ScanArgType
 import numpy as np
 import time
@@ -42,8 +64,63 @@ from bec_lib.logger import bec_logger
 logger = bec_logger.logger
-class PhoenixLineScan(ScanBase):
-    scan_name = "phoenix_line_scanZZZ"
+
+class LogTime():
+
+    def __init__(self):
+        self.t0=time.process_time()
+
+    def p_s(self,x):
+        now=time.process_time()
+        delta=now-self.t0
+        m=str(delta)+' sec '+x
+        logger.success(m)
+        self.t0=now
+
+ll=LogTime()
+
+
+class PhoenixScanBaseTTL(ScanBase):
+    """
+    Base scan class for PHOENIX scans using the TTL trigger.
+    """
+
+
+    ll.p_s('enter scripts.phoenix.scans.PhoenixScanBaseTTL')
+    def scan_core(self):
+        """perform the scan core procedure"""
+        ll.p_s('PhoenixScanBaseTTL.scan_core')
+        for ind, pos in self._get_position():
+            for self.burst_index in range(self.burst_at_each_point):
+                ll.p_s('PhoenixScanBaseTTL.scan_core in loop')
+
+                yield from self._at_each_point(ind, pos)
+            self.burst_index = 0
+
+    def _at_each_point(self, ind=None, pos=None):
+        ll.p_s('PhoenixScanBaseTTL._at_each_point')
+        yield from self._move_scan_motors_and_wait(pos)
+        if ind > 0:
+            yield from self.stubs.wait(
+                wait_type="read", group="primary", wait_group="readout_primary"
+            )
+        time.sleep(self.settling_time)
+        yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
+        yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
+        yield from self.stubs.read(
+            group="primary", wait_group="readout_primary", point_id=self.point_id
+        )
+        yield from self.stubs.wait(
+            wait_type="read", group="scan_motor", wait_group="readout_primary"
+        )
+
+        self.point_id += 1
+        ll.p_s('done')
+
+class PhoenixLineScan(PhoenixScanBaseTTL):
+
+    ll.p_s('enter scripts.phoenix.scans.PhoenixLineScan')
+    scan_name = "phoenix_line_scan"
     required_kwargs = ["steps", "relative"]
     arg_input = {
         "device": ScanArgType.DEVICE,
@@ -78,38 +155,22 @@ class PhoenixLineScan(ScanBase):
         ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
         """
+        ll.p_s('init scripts.phoenix.scans.PhoenixLineScan')
         super().__init__(
             exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
         )
         self.steps = steps
         self.setup_device = setup_device
-        print('INIT CLASS PhoenixLineScan')
-        time.sleep(1)
+        time.sleep(1)
+        ll.p_s('done')
     def _calculate_positions(self) -> None:
+        ll.p_s('PhoenixLineScan._calculate_positions')
         axis = []
         for _, val in self.caller_args.items():
             ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
             axis.append(ax_pos)
         self.positions = np.array(list(zip(*axis)), dtype=float)
+        ll.p_s('done')
-    def _at_each_point(self, ind=None, pos=None):
-        yield from self._move_scan_motors_and_wait(pos)
-        if ind > 0:
-            yield from self.stubs.wait(
-                wait_type="read", group="primary", wait_group="readout_primary"
-            )
-        time.sleep(self.settling_time)
-        if self.setup_device:
-            yield from self.stubs.send_rpc_and_wait(self.setup_device, "velocity.set", 1)
-        yield from self.stubs.trigger(group="trigger", point_id=self.point_id)
-        yield from self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
-        yield from self.stubs.read(
-            group="primary", wait_group="readout_primary", point_id=self.point_id
-        )
-        yield from self.stubs.wait(
-            wait_type="read", group="scan_motor", wait_group="readout_primary"
-        )
-
-        self.point_id += 1
\ No newline at end of file
diff --git a/phoenix_bec/scripts/phoenix.py b/phoenix_bec/scripts/phoenix.py
index ab1f1cd..ff0975d 100644
--- a/phoenix_bec/scripts/phoenix.py
+++ b/phoenix_bec/scripts/phoenix.py
@@ -28,6 +28,9 @@ logger = bec_logger.logger
 # .. define base path for directory with scripts
+
+
+
 class PhoenixBL():
     """
     #
@@ -68,7 +71,7 @@ class PhoenixBL():
         print('add xmap ')
         print(self.path_devices+'phoenix_xmap.yaml')
-        bec.config.update_session_with_file(self.path_devices+'phoenix_xmap.yaml',timeout=100)
+        bec.config.update_session_with_file(self.path_devices+'phoenix_xmap.yaml')#,timeout=100)
     def add_falcon(self):
         print('add_xmap')
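For reference, a minimal usage sketch of the renamed scan module as seen from the BEC IPython client, based on PhoenixTemplate.py above. It is a sketch under assumptions, not a verified recipe: it assumes that dev and scans are provided by the client session and that the phoenix object is a PhoenixBL instance whose add_phoenix_config() helper (called in PhoenixTemplate.py but not defined in this diff) loads phoenix_devices.yaml into the current session.

# Minimal sketch, assuming a running BEC client session (dev and scans in the namespace)
# and that PhoenixBL provides add_phoenix_config() as used in PhoenixTemplate.py.
import time

from phoenix_bec.scripts.phoenix import PhoenixBL

phoenix = PhoenixBL()
phoenix.add_phoenix_config()   # assumed helper that loads the PHOENIX device config into the session
time.sleep(1)                  # give the device server a moment to apply the new config

# TTL-triggered line scan registered as "phoenix_line_scan" in phoenix_bec/scans/phoenix_scans.py,
# same call as in PhoenixTemplate.py: 4 steps from 0 to 0.002 on ScanX, 10 ms exposure
s2 = scans.phoenix_line_scan(dev.ScanX, 0, 0.002, steps=4, exp_time=0.01, relative=False, delay=2)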