diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/README.md b/csaxs_bec/devices/epics/delay_generator_csaxs/README.md new file mode 100644 index 0000000..439d920 --- /dev/null +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/README.md @@ -0,0 +1,58 @@ +# Delay Generator implementation at the CSAXS beamline + +This module provides an ophyd device implementation for the Stanford Research Systems Delay Generator DDG645, used at the cSAXS beamline as a master timing source for detector triggering and other beamline devices. Detailed information about the DDG manual can be found here: +https://www.thinksrs.com/downloads/pdfs/manuals/DG645m.pdf. +The implementation is based on a community EPICS driver (https://github.com/epics-modules/delaygen?tab=readme-ov-file). + +**EPICS Interface** + +At the cSAXS beamline, the DDG panel is avaiable via caqtdm on the beamline consoles. + +``` bash +caqtdm -noMsg -attach -macro P=X12SA-CPCL-DDG,R=1: srsDG645.ui +``` +with R=1,2,3,4,5 for 5 different DDG units installed at CSAXS. + +# Ophyd Device integration at cSAXS +For cSAXS, a custom ophyd device class implementation of the DDG is provided [here](./delay_generator_csaxs.py). This class provides a basic interface to the DDG PVs. The interface provides channels 'A', B', 'C', ... with setpoint, readback and references, as well as high level parameters such as *width* and *delay*. Please check the source code of the class for more details of the implementation. + +In addition, the class provides a set of utility methods to configure sets of channel pairs 'AB', 'CD', ... as commonly needed in operation at the beamline. At the cSAXS beamline, a single DDG device is used as a master timing source for other devices. The general scheme is described in a [PDF document here](./trigger_scheme_ddg1_ddg2.pdf). Below is a description of the configuration of the two DDG units used at cSAXS for detector triggering and beamline shutter control. + +## Master card: DDG1 (X12SA-CPCL-DDG1) +The master [delay generator DDG1](./ddg_1.py) is configured to provide the following signals: + +**Connection Scheme**: +- EXT/EN: May be connected to external devices, e.g. SGalil motion controller for fly scans. +- Operation Mode: Burst mode, but with single burst (burst count = 1). This is for practical reasons as it allows +to interrupt and ongoing sequence if needed. +- Software Trigger: Controlled through BEC. +- State Control: BEC checks the *state* of this DDG to wait for the completion of a timing sequence. + +**Delay Pairs**: +- DelayPair 'AB': Provides the external enable (EXT/EN) signal to the second DDG (R=2). +- DelayPair 'CD': Controls the beamline shutter. +- DelayPair 'EF': Generates pulses for the MCS card, combined with the detector pulse train via an OR gate. This ensures the MCS card receives an additional pulse required for proper operation. + +**Delay Channels**: +- a = t0 + 2ms (2ms delay to allow the shutter to open) +- b = a + 1us (short pulse) +- c = t0 +- d = a + exp_time * burst_count + 1ms (to allow the shutter to close) +- e = d +- f = e + 1us (short pulse to OR gate for MCS triggering) + +## Detector card: DDG2 (X12SA-CPCL-DDG2) +The second [delay generator DDG2](./ddg_2.py) is configured to provide the following signals: + +**Connection Scheme**: +- EXT/EN: Connected to the DelayPair AB of the master DDG (R=1). +- Operation Mode: Burst mode: The *burst count* is set to the number of frames per trigger. The *burst delay* is set to 0, and the *burst period* is set to the exposure time. +- Software Trigger: Irrelevant, as the device is externally triggered by DDG1. + +**Delay Pairs**: +- DelayPair 'AB': Provides the trigger signal to the detector. + +**Delay Channels**: +- a = t0 +- b = a + (exp_time - READOUT_TIMES) + diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py index 6d92e96..628617f 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_1.py @@ -61,6 +61,13 @@ if TYPE_CHECKING: # pragma: no cover logger = bec_logger.logger +######################## +## DEFAULT SETTINGS #### +######################## + +# NOTE Default channel configuration for all channels of the DDG1 delay generator +# This can be adapted as needed, or fine-tuned per channel. On every reload of the +# device configuration in BEC, these values will be set into the DDG1 device. _DEFAULT_CHANNEL_CONFIG: ChannelConfig = { "amplitude": 5.0, "offset": 0.0, @@ -68,6 +75,8 @@ _DEFAULT_CHANNEL_CONFIG: ChannelConfig = { "mode": "ttl", } +# NOTE Here you can adapt the default IO configuration for all channels of the DDG1 +# Currently, all channels are set to the same default configuration `_DEFAULT_CHANNEL_CONFIG`. DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { "t0": _DEFAULT_CHANNEL_CONFIG, "ab": _DEFAULT_CHANNEL_CONFIG, @@ -75,9 +84,19 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { "ef": _DEFAULT_CHANNEL_CONFIG, "gh": _DEFAULT_CHANNEL_CONFIG, } + DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.SINGLE_SHOT + +# NOTE Default readout times for each channel, can be adapted as needed. +# These values are relevant to calculate proper widths of the timing signals. +# They also define a minimum exposure time that can be used as they are subtracted +# as dead times from the exposure time. DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz +# NOTE Default channel references for each channel of the DDG1 delay generator. +# This needs to be carefully adjusted to match the envisioned trigger scheme. +# If the trigger scheme changes, adapt the values here together with the README and +# PDF `trigger_scheme_ddg1_ddg2.pdf`. DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [ ("A", CHANNELREFERENCE.T0), # T0 + 2ms delay ("B", CHANNELREFERENCE.A), @@ -89,14 +108,27 @@ DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [ ("H", CHANNELREFERENCE.G), ] +############################### +## DDG1 IMPLEMENTATION ######## +############################### + class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): """ - Implementation of DelayGeneratorCSAXS for master trigger delay generator at X12SA-CPCL-DDG1. - It will be triggered by a soft trigger from BEC or a hardware trigger from a beamline device - (e.g. the Galil stages). It is operated in standard mode, not burst mode and will trigger the - EXT/EN of DDG2 (channel ab). It is responsible for opening the shutter (channel cd) and sending - an extra trigger to an or gate for the MCS card (channel ef). + + Implementation of the DelayGenerator DDG1 for the cSAXS beamline. It is the main trigger + source for the cSAXS beamline, and will be triggered by BEC through a software trigger or + by a hardware trigger from a beamline device (e.g. Galil stages). Specific implementation + of the cabling logic expected for this device are described in the module README, the attached + PDF 'trigger_scheme_ddg1_ddg2.pdf' and the module docstring. + + The IOC prefix is 'X12SA-CPCL-DDG1:'. + + Args: + name (str): Name of the device. + prefix (str, optional): EPICS prefix for the device. Defaults to ''. + scan_info (ScanInfo | None, optional): Scan info object. Defaults to None. + device_manager (DeviceManagerBase | None, optional): Device manager. Defaults to None. """ def __init__( @@ -107,9 +139,6 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): device_manager: DeviceManagerBase | None = None, **kwargs, ): - """ - Initialize the MCSCardCSAXS with the given arguments and keyword arguments. - """ super().__init__( name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs ) @@ -123,17 +152,30 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): # pylint: disable=attribute-defined-outside-init def on_connected(self) -> None: """ - Set the default values on the device - intended to overwrite everything to a usable default state. - Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE, - and turns off burst mode. + + This method is called after the device is initialized and all signals are connected. This happens + when a device configuration is loaded in BEC. + + It sets the default values for this device - intended to overwrite everything to a usable default state. + For this purpose, we use the DEFAULT SETTINGS defined at the top of this module. + + To ensure that this process is robust, we follow these steps: + - First, we stop any ongoing burst mode operation. + - Then, we set the DEFAULT_IO_CONFIG for each channel, the trigger source to DEFAULT_TRIGGER_SOURCE, + and the channel references to DEFAULT_REFERENCES. + - We set the state proc_status to be event based. This triggers readouts of the EventStatusLI bit + based on events. This was empirically found to be a stable solution in combination with the poll + loop of the state. + - Finally, we set the burst delay to 0, to set it to be of no delay. """ - # NOTE First we make sure that there is nothing running on the DDG. This seems to + + # NOTE First we make sure that there is nothing running on the DDG. This seems to # help to tackle that the DDG occasionally freezes during the first scan # after reconnecting to it. Do not remove. self.stop_ddg() # NOTE Setting DEFAULT configurations for IO config, trigger config and references. - # The three dictionaries above 'DEFAULT_IO_CONFIG', 'DEFAULT_TRIGGER_SOURCE' and + # The three dictionaries above 'DEFAULT_IO_CONFIG', 'DEFAULT_TRIGGER_SOURCE' and # 'DEFAULT_REFERNCES' should be used to adapt configurations if needed. for channel, config in DEFAULT_IO_CONFIG.items(): self.set_io_values(channel, **config) @@ -146,45 +188,71 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): self.state.proc_status_mode.put(PROC_EVENT_MODE.EVENT) # NOTE Burst delay should be set to 0, don't remove as this will not be checked + # Also set the burst count to 1 to only have a single pulse for DDG1. self.burst_delay.put(0) + self.burst_count.put(1) def on_stage(self) -> None: """ - Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS. - For standard scans, it will be triggered by a soft trigger from BEC. - It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages. - This DDG is always not in burst mode. + This method is called in preparation for a scan. All information about the upcoming + scan is available in self.scan_info.msg at this point. We use this information to + configure the DDG1 for the upcoming scan. + + The DDG is operated in burst mode for the scan, but with only a single burst pulse. + THe length of the pulse is set to the expected exposure time for a single trigger, + which includes any burst acquisitions if frames_per_trigger > 1. + + The logic is as follows: + - We check if any default burst parameters need to be set, and set them if needed. + - We calculate the burst pulse width based on the exposure time and frames_per_trigger. + - We set the burst_period and the shutter signal (delay pairs cd) to be + exposure_time * frames_per_trigger + 3ms (2ms for shutter to open, 1ms to close). + - We set the delay pairs ab to be 2ms delayed (to allow the shutter to open) with a width of 1us to trigger DDG2. + - We set the delay pairs ef to be triggered after the shutter closes with a width of 1us to trigger the MCS card. + - Finally, we add a short sleep to ensure that the IOC and DDG HW process the values properly. """ - # NOTE Only set relevant channels on burst_mode channel - # After mutliple tests with the HW, this procedure has been determined empirically - # to improve stability and avoid HW getting stuck in triggering cycles - # Please also note that this should happen first, before setting delay times on the chabnnels. + + ######################################## + ### Burst mode settings ################ + ######################################## + + # NOTE We check here if the delay generator is not in burst mode. We check these values + # and set them to the requried values if they differ from the expected ones. + # This has been found empirically to improve stability and avoid HW getting stuck in triggering cycles. if self.burst_mode.get() == 0: self.burst_mode.put(1) - exp_time = self.scan_info.msg.scan_parameters["exp_time"] - if self.burst_period.get() != exp_time: - self.burst_period.put(exp_time) - if self.burst_delay.get() != 0: self.burst_delay.put(0) + if self.burst_count.get() != 1: + self.burst_count.put(1) + ######################################### - ### Setup delay pairs for acquisition ### + ### Setup timing for burst and delays ### ######################################### frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"] + exp_time = self.scan_info.msg.scan_parameters["exp_time"] + + # Burst Period DDG1 + # Set burst_period to shutter width + # c/t0 + 2ms + exp_time * burst_count + 1ms + shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3 + if self.burst_period.get() != shutter_width: + self.burst_period.put(shutter_width) # Trigger DDG2 # a = t0 + 2ms, b = a + 1us # a has reference to t0, b has reference to a + # Add delay of 2ms to allow shutter to open self.set_delay_pairs(channel="ab", delay=2e-3, width=1e-6) - + # Trigger shutter # d = c/t0 + 2ms + exp_time * burst_count + 1ms # c has reference to t0, d has reference to c - shutter_width = 2e-3 + exp_time * frames_per_trigger + 1e-3 + # Shutter opens without delay at t0, closes after exp_time * burst_count + 3ms (2ms open, 1ms close) self.set_delay_pairs(channel="cd", delay=0, width=shutter_width) # Trigger extra pulse for MCS OR gate @@ -193,50 +261,64 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): self.set_delay_pairs(channel="ef", delay=0, width=1e-6) # NOTE Add additional sleep to make sure that the IOC and DDG HW process the values properly - # This value has been choosen empirically after testing with the HW. Please acknowledge that - # this is called in parallel, so it should not add significant overhead to acquisition. It's - # also just called once per scan. + # This value has been choosen empirically after testing with the HW. It's + # also just called once per scan and has been found to improve stability of the HW. time.sleep(0.2) def _prepare_mcs_on_trigger(self, mcs: MCSCardCSAXS) -> None: - """Prepare the MCS card for the next trigger. - This method holds the logic to ensure that the MCS card is ready to read. - It's logic is coupled to the MCS card implementation and the DDG1 trigger logic. """ - # NOTE: It is crucial to first wait for the MCS card to finish it's acquisition before - # the DDG moves on to the next trigger cycle. + This method is used by the DDG1 on_trigger method to prepare the MCS card for the next trigger. + It checks that the MCS card is properly prepared before BEC sends a software trigger to the DDG1, + which is needed for step scans. + + It relies on the MCS card implementation and needs to be adapted if the MCS card logic changes. + """ + + # NOTE First we wait that the MCS card is not acquiring. We add here a timeout of 5s to avoid + # a deadlock in case the MCS card is stuck for some reason. This should not happen normally. status = CompareStatus(mcs.acquiring, ACQUIRING.DONE) self.cancel_on_stop(status) status.wait(timeout=5) - # NOTE: Important logic on the MCS card, this makes sure that callbacks from the MCA channels - # are not surpressed. Please check MCS card and 'erase_all' comment. - mcs._omit_mca_callbacks.clear() + # NOTE Clear the '_omit_mca_callbacks' flag. This makes sure that data received from the mca1...mca3 + # counters are forwarded to BEC. Once the flag is set, we create a TransitionStatus DONE->ACQUIRING + # and start the acquisition through erase_start.put(1). Finally, we wait for the card to go to ACQUIRING state. + mcs._omit_mca_callbacks.clear() # pylint: disable=protected-access status_acquiring = TransitionStatus(mcs.acquiring, [ACQUIRING.DONE, ACQUIRING.ACQUIRING]) self.cancel_on_stop(status_acquiring) - mcs.erase_start.put(1) + mcs.erase_start.put(1) - # NOTE: Now we wait for the card to go to Acuiring after we've called erase_start - # Please increase the timeout if this turns out to be problematic - status_acquiring.wait(timeout=3) + # NOTE Timeout of 3s should be plenty, any longer wait should checked. If this happens to crash + # an acquisition regularly with a WaitTimeoutError, the timeout can be increased but it should + # be investigated why the EPICS interface is slow to respond. + status_acquiring.wait(timeout=3) def _poll_event_status(self) -> None: """ - Poll the event status register in a background thread. Control - the polling with the _poll_thread_run_event and _poll_thread_kill_event. + + Polling loop to retrieve the event status register of the delay generator DDG1. + This method runs in a background thread and the polling is controlled through the + '_poll_thread_run_event' and '_poll_thread_kill_event'. Polling should only become + active when a software trigger was sent in BEC and we are waiting for the burst to complete. """ - # NOTE hook to kill the loop, only needed if device is destroyed + # Main loop of the polling thread. As long as the kill event is not set, the loop continues. while not self._poll_thread_kill_event.is_set(): - # The thread will wait in this event if IDLE. Polling can be started - # by setting 'poll_thread_run_event.set()'. Please check usage for software - # triggered scans from BEC within on_trigger. + # NOTE Main wait event for the polling thread. If the _poll_thread_run_event is not set, + # The thread will wait here. This event is used to start/stop polling from outside the thread, + # as used in on_trigger and on_stop. Please make sure to set this event also when the thread + # should be killed as its otherwise stuck inside the wait. self._poll_thread_run_event.wait() - # NOTE Event to indicate that polling is taking place currently. This is needed as there - # are sleeps of 20ms in the poll loop which were empirically determined after long testing - # to improve stability in communication with the HW. + # NOTE Set the event to indicate that we are currently still in the poll_loop. This is needed + # as we have to use sleeps of 20ms within the poll loop. These sleeps were empirically detetermined + # to ensure that no state changes are missed. However, these sleeps have the side effect that + # setting the '_poll_thread_run_event' may not immediately stop the polling. Therefore, we need the + # '_poll_thread_poll_loop_done' event to indicate that polling has finished. If this logic is changed, + # it requires careful testing as failure rates can be in the 1 out of 500 events rate, which are still + # not acceptable for operation. The current implementation has been tested with failure rates smaller then + # ~ 1:100000 if failures happened at all. self._poll_thread_poll_loop_done.clear() while ( self._poll_thread_run_event.is_set() and not self._poll_thread_kill_event.is_set() @@ -248,36 +330,49 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): logger.error( f"Exception in polling loop thread, polling continues...\n Error content:\n{content}" ) - # NOTE Important to set the event again. The next trigger loop waits for the poll thread to become - # IDLE again. Do not remove. + # NOTE Set the _poll_thread_poll_loop_done event to indicate that we are done polling. Do not remove! self._poll_thread_poll_loop_done.set() def _poll_loop(self) -> None: """ - Poll loop to update event status. - The checks ensure that the loop exist after each operation and be stuck in sleep. - The 20ms sleep was added to ensure that the event status is not polled too frequently, - and to give the device time to process the previous command. This was found empirically - to be necessary to avoid missing events. + This method is the actual poll loop to update the event status from the satus register + of the delay generator DDG1. + + It follows a procedure that was established empirically after extended testing with the HW. + Any adaptations to this logic need to be carefully tested to avoid that the HW becomes unstable. + NOTE: Sleeps are important in this logic, and should not be removed or optimized without extensive testing. + 20ms has been found to be the minimum sleep time that proofed to be stable in operation. + + The logic is as follows: + - Set the 'proc_status' to 1 with use_complete=True to trigger an event based readout of the EventStatusLI. + - Sleep 20ms to give the device time to process the command. + - Check if the kill event or run event are cleared, and exit the loop if so. + - Read the EventStatusLI channel to update the event status. + - Check again if the kill event or run event are cleared, and exit the loop if so. + + Please note that any important changes of the status register reading will trigger callbacks + if attached to the event status signal. These callbacks hold the logic to resolve status objects + when waiting for specific events (e.g. end of burst). + - IMPORTANT: Do not remove sleeps or try to optimize this logic. This seems to be a - fragile balance between polling frequency and device processing time. Also in between - start/stop of polling. Please also consider that there is a sleep in on_trigger and - that this might also be necessary to avoid that HW becomes unavailable/unstable. """ self.state.proc_status.put(1, use_complete=True) - #NOTE: Important sleep that has been empirically determined after testing for a long time + + # NOTE: Important sleep that has been empirically determined after testing for a long time # Only remove if absolutely certain that the DDG logic of polling the EventStatusLI works without it. - time.sleep(0.02) + time.sleep(0.02) + if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set(): return + self.state.event_status.get(use_monitor=False) if self._poll_thread_kill_event.is_set() or not self._poll_thread_run_event.is_set(): return - #NOTE: Again important sleep that has been empirically determined after testing for a long time + + # NOTE: Again important sleep that has been empirically determined after testing for a long time # Only remove if certain that logic can be replaced to not risk HW failures. - time.sleep(0.02) + time.sleep(0.02) def _start_polling(self) -> None: """Start the polling loop in the background thread.""" @@ -297,8 +392,23 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): else: logger.info("Polling thread stopped.") - def _prepare_trigger_status_event(self, timeout: float | None = None) -> DeviceStatus: - """Prepare the trigger status event for the DDG1, and trigger the de""" + def _prepare_trigger_status_event( + self, timeout: float | None = None + ) -> StatusBitsCompareStatus: + """ + Method to prepare a status object that indicates the end of a burst cycle. + It also sets up a callback to cancel the polling of the event status register + if the status is cancelled externally (e.g. by stopping the device). In addition, + a timeout can either be specified, or is automatically calculated based on the + exposure time, frames_per_trigger and a default extra time of 5 seconds. + + Args: + timeout (float | None, optional): Timeout for the status object. If None, a + default timeout based on exposure time and frames_per_trigger is used. + + Returns: + StatusBitsCompareStatus: + """ if timeout is None: # Default timeout of 5 seconds + exposure time * frames_per_trigger timeout = 5 + self.scan_info.msg.scan_parameters.get( @@ -322,43 +432,57 @@ class DDG1(PSIDeviceBase, DelayGeneratorCSAXS): def on_trigger(self) -> DeviceStatus: """ This method is called from BEC as a software trigger. - It first stops any active polling if still running. The sleep of 20ms is important - for proper functionality of the card. Then it checks if the 'mcs' card is in the config - and enabled, and prepares the card for triggering. For now this is still relevant, but may - be moved to a high level logic in BEC in the future (neeeds). - Then a status_object is prepared that receives the EventStatusLI epics channel (self.state.event_status), - and attaches a callback that resolves once the burst is done. The polling thread is enabled to manually - trigger a reading of the event status before a software trigger is sent via trigger_shot. + It follows a specific procedure to ensure that the DDG1 and MCS card are properly handled + on a trigger event. The established logic is as follows: + + - Stop polling the event status register to avoid that the polling loop is still active + before sending the software trigger. This needs to be done to avoid conflicts + in reading the event status register. + - Wait for the _poll_thread_poll_loop_done event to ensure that the polling loop is no + longer active. A timeout of 1s is plenty as sleeps of 20ms are used in the poll loop. + - Add an extra sleep of 20ms to make sure that the HW is again ready to process new commands. + This has been found empirically after long testing to improve stability. + - If the MCS card is present in the current session of BEC, prepare the card for the next trigger. + - Prepare a status StatusBitsCompareStatus that will be resolved once the burst is done. + - Start the polling loop again to monitor the event status register. + - Send the software trigger to the DDG1 + - Return the status object to BEC which will automatically resolve once the status register has + the END_OF_BURST bit set. The callback of the status object will also stop the polling loop. """ - # Stop polling, poll once manually to ensure that the register is clean self._stop_polling() self._poll_thread_poll_loop_done.wait(timeout=1) - # NOTE: THis sleep is important for the HW to process the event and avoid that - # becomes unresponsive. This was found empirically after long testing. + # NOTE: This sleep is important to ensure that the HW is ready to process new commands. + # It has been empirically determined after long testing that this improves stability. time.sleep(0.02) + # NOTE If the MCS card is present in the current session of BEC, + # we prepare the card for the next trigger. The procedure is implemented + # in the '_prepare_mcs_on_trigger' method. # Prepare the MCS card for the next software trigger mcs = self.device_manager.devices.get("mcs", None) if mcs is None or mcs.enabled is False: logger.info("Did not find mcs card with name 'mcs' in current session") else: self._prepare_mcs_on_trigger(mcs) - # Prepare status with callback to cancel the polling once finished + + # Prepare StatusBitsCompareStatus to resolve once the END_OF_BURST bit was set. status = self._prepare_trigger_status_event() - # Start polling + + # Start polling thread again to monitor event status self._start_polling() # Trigger the DDG1 self.trigger_shot.put(1, use_complete=True) return status def on_stop(self) -> None: - """Stop the delay generator by setting the burst mode to 0""" + """Stop the delay generator HW and polling thread when the device is stopped.""" self.stop_ddg() self._stop_polling() def on_destroy(self) -> None: """Clean up resources when the device is destroyed.""" + self.stop_ddg() self._kill_poll_thread() diff --git a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py index 4d8d0c4..16172b5 100644 --- a/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py +++ b/csaxs_bec/devices/epics/delay_generator_csaxs/ddg_2.py @@ -41,6 +41,11 @@ from csaxs_bec.devices.epics.delay_generator_csaxs.delay_generator_csaxs import logger = bec_logger.logger +######################## +## DEFAULT SETTINGS #### +######################## + +# NOTE Default channel configuration for the DDG2 delay generator channels _DEFAULT_CHANNEL_CONFIG: ChannelConfig = { "amplitude": 5.0, "offset": 0.0, @@ -48,6 +53,9 @@ _DEFAULT_CHANNEL_CONFIG: ChannelConfig = { "mode": "ttl", } +# NOTE Default IO configuration for all channels in DDG2 +# Each channel uses the same default configuration as defined above +# If needed, individual channel configurations should be modified here. DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { "t0": _DEFAULT_CHANNEL_CONFIG, "ab": _DEFAULT_CHANNEL_CONFIG, @@ -55,9 +63,16 @@ DEFAULT_IO_CONFIG: dict[AllChannelNames, ChannelConfig] = { "ef": _DEFAULT_CHANNEL_CONFIG, "gh": _DEFAULT_CHANNEL_CONFIG, } + DEFAULT_TRIGGER_SOURCE: TRIGGERSOURCE = TRIGGERSOURCE.EXT_RISING_EDGE + +# NOTE Default readout times for the detectors connected to DDG2 +# These values are used to calculate the difference between the burst_period and the pulse width of +# individual channel pairs. They also mark a lower limit for the exposure time. Needs to be +# adjusted if the exposure time should possibly go below 0.2 ms. DEFAULT_READOUT_TIMES = {"ab": 2e-4, "cd": 2e-4, "ef": 2e-4, "gh": 2e-4} # 0.2 ms 5kHz +# NOTE Default refernce settings for each channel in DDG2 DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [ ("A", CHANNELREFERENCE.T0), ("B", CHANNELREFERENCE.A), @@ -69,9 +84,27 @@ DEFAULT_REFERENCES: list[tuple[LiteralChannels, CHANNELREFERENCE]] = [ ("H", CHANNELREFERENCE.G), ] +############################### +## DDG2 IMPLEMENTATION ######## +############################### + class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): """ + + Implementation of the DelayGenerator DDG2 for the cSAXS beamline. This delay generator is + reponsible to create triggers for the detectors. It is configured in burst mode. Please + check the module docstring, the module README and the attached PDF 'trigger_scheme_ddg1_ddg2.pdf' + for more information about the expected cabling and trigger logic. + + The IOC prefix is 'X12SA-CPCL-DDG2:'. + + Args: + name (str): Name of the device. + prefix (str, optional): EPICS prefix for the device. Defaults to ''. + scan_info (ScanInfo | None, optional): Scan info object. Defaults to None. + device_manager (DeviceManagerBase | None, optional): Device manager. Defaults to None. + Implementation of DelayGeneratorCSAXS for the CSAXS master trigger delay generator at X12SA-CPCL-DDG2. This device is responsible for creating triggers in burst mode and is connected to a multiplexer that distributes the trigger to the detectors. The DDG2 is triggered by the DDG1 through the EXT/EN channel. @@ -80,10 +113,22 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): # pylint: disable=attribute-defined-outside-init def on_connected(self) -> None: """ - Set the default values on the device - intended to overwrite everything to a usable default state. - Sets DEFAULT_IO_CONFIG into each channel, sets the trigger source to DEFAULT_TRIGGER_SOURCE. + + This method is called after the device is initialized and all signals are connected. This happens + when a device configuration is loaded in BEC. + + It sets the default values for this device - intended to overwrite everything to a usable default state. + For this purpose, we use the DEFAULT SETTINGS defined at the top of this module. + + The following procedure is followed: + - Stop the DDG to ensure it is not running. + - Then, we set the DEFAULT_IO_CONFIG for each channel, the trigger source to DEFAULT_TRIGGER_SOURCE, + and the channel references to DEFAULT_REFERENCES. """ - self.burst_disable() # it is possible to miss setting settings if burst is enabled + self.stop_ddg() + + # NOTE Please adjust the default settings under 'DEFAULT SETTINGS' at the top of this module if needed. + # This makes sure that we have a well defined default state for the DDG2 device. for channel, config in DEFAULT_IO_CONFIG.items(): self.set_io_values(channel, **config) self.set_trigger(DEFAULT_TRIGGER_SOURCE) @@ -91,66 +136,73 @@ class DDG2(PSIDeviceBase, DelayGeneratorCSAXS): def on_stage(self) -> DeviceStatus | StatusBase | None: """ - Stage logic for the DDG1 device, being th main trigger delay generator for CSAXS. - For standard scans, it will be triggered by a soft trigger from BEC. - It also has a hardware trigger feeded into the EXT/EN for fly-scanning, i.e. Galil stages. - This DDG is always not in burst mode. + This method is called when the device is staged before a scan. All information about the scan + is available through self.scan_info.msg at this point. The DDG2 needs to be configured to + create a sequence of TTL pulses in burst mode that are sent to the detectors. It therefore needs + to know the exposure time and frames per trigger from the self.scan_info.msg.scan_parameters. + + This logic is robust for step scans as well as fly scans, as the DDG2 is triggered by the DDG1 + through the EXT/EN channel. """ + ######################################## + ### Burst mode settings ################ + ######################################## + + # NOTE Only adjust settings if needed. DDG2 should always be in burst mode when used at CSAXS. + if self.burst_mode.get() == 0: + self.burst_mode.put(1) + + # Ensure that there is no delay for the burst + if self.burst_delay.get() != 0: + self.burst_delay.put(0) + exp_time = self.scan_info.msg.scan_parameters["exp_time"] frames_per_trigger = self.scan_info.msg.scan_parameters["frames_per_trigger"] - # a = t0 - # a has reference to t0, b has reference to a + + # NOTE Check if the exposure time is longer than all readout times. + # Raise a ValueError if requested exposure time is too short. if any(exp_time <= rt for rt in DEFAULT_READOUT_TIMES.values()): raise ValueError( f"Exposure time {exp_time} is too short for the readout times {DEFAULT_READOUT_TIMES}" ) + + ######################################### + ### Setup timing for burst and delays ### + ######################################### + + # Burst Period DDG2 settings. Only adjust them if needed. + if self.burst_count.get() != frames_per_trigger: + self.burst_count.put(frames_per_trigger) + if self.burst_period.get() != exp_time: + self.burst_period.put(exp_time) + + # Calculate the pulse width for the channel pair 'ab' burst_pulse_width = exp_time - DEFAULT_READOUT_TIMES["ab"] + + # Trigger detectors with delay 0, and pulse width = exp_time - readout_time self.set_delay_pairs(channel="ab", delay=0, width=burst_pulse_width) - self.burst_enable(count=frames_per_trigger, delay=0, period=exp_time) def on_pre_scan(self): """ - The delay generator occasionally needs a bit extra time to process all - commands from stage. Therefore, we introduce here a short sleep + + Method that is called just before a scan starts. It was observed that a short delay of 50ms + improves the overall stability in operation. This may be removed as other parts were adjusted, + but for now we will keep it as the delay is short. """ - # Delay Generator occasionaly needs a bit extra time to process all commands, sleep 50ms + # NOTE Short delay to allow for the HW to process the commands before the scan starts. + # This may no longer be needed after other adjustments, and may be removed in the future. time.sleep(0.05) def on_trigger(self) -> DeviceStatus | StatusBase | None: """ - DDG2 will not receive a trigger from BEC, but will be triggered by the DDG1 through the EXT/EN channel. - """ - def wait_for_status( - self, status: DeviceStatus, bit_event: STATUSBITS, timeout: float = 5 - ) -> None: - """Wait for a event status bit to be set. - - Args: - status (StatusBase): The status object to update. - bit_event (STATUSBITS): The event status bit to wait for. - timeout (float): Maximum time to wait for the event status bit to be set. + DDG2 does not implement any trigger specific logic as it is triggered by DDG1 through the EXT/EN channel. """ - current_time = time.time() - while not status.done: - self.state.proc_status.put(1, use_complete=True) - event_status = self.state.event_status.get() - if (STATUSBITS(event_status) & bit_event) == bit_event: - status.set_finished() - if time.time() - current_time > timeout: - status.set_exception( - TimeoutError( - f"Timeout waiting for status of device {self.name} for event_status {bit_event}" - ) - ) - break - time.sleep(0.1) - time.sleep(0.05) # Give time for the IOC to be ready again - return status + pass def on_stop(self) -> None: - """Stop the delay generator by setting the burst mode to 0""" + """Stop the delay generator""" self.stop_ddg() diff --git a/csaxs_bec/devices/epics/mcs_card/README.md b/csaxs_bec/devices/epics/mcs_card/README.md new file mode 100644 index 0000000..d76f8a3 --- /dev/null +++ b/csaxs_bec/devices/epics/mcs_card/README.md @@ -0,0 +1,13 @@ +# MCS Card implementation at the CSAXS beamline + +This module provides an ophyd device implementation for the SIS3820 Multi-Channel Scaler (MCS) card, used at the cSAXS beamline for time-resolved data acquisition. It interfaces with the EPICS IOC for the SIS3820 MCS card. +Information about the EPICS driver can be found here (https://millenia.cars.aps.anl.gov/software/epics/mcaStruck.html). + +# Important Notes +Operation of the MCS card requires proper configuration as some of the parameters are interdependent. In addition, empirical adjustments have been found to be necessary for optimal performance at the beamline. In its current implementation, comments about these dependencies are highlighted in the source code of the ophyd device classes [MCSCard](./mcs_card.py) and [MCSCardCSAXS](./mcs_card_csaxs.py). It is highly recommended to review these comments before refactoring, modifying, or extending the code. + +## Ophyd Device Implementation + +The ophyd device implementation is provided [MCSCard](./mcs_card.py). This class provides a basic interface to the MCS PVs, including configuration of parameters such as number of channels, dwell time, and control of acquisition start/stop. Please check the source code of the class for more details of the implementation. + +The [MCSCardCSAXS](./mcs_card_csaxs.py) class extends the basic MCSCard implementation with cSAXS-specific logic and configurations. Please be aware that this is also linked to the implementation of other devices, most notably the [delay generator integration](../delay_generator/README.md), which is used as the trigger source for the MCS card during operation. \ No newline at end of file diff --git a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py index e86fa76..3f21f71 100644 --- a/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py +++ b/csaxs_bec/devices/epics/mcs_card/mcs_card_csaxs.py @@ -1,4 +1,12 @@ -"""Module for the MCSCard CSAXS implementation.""" +""" +Module for the MCSCard CSAXS implementation at cSAXS. + +Please respect the comments regarding timing and procedures of the MCS card. These +are highlighted with NOTE comments directly in the code, indicating requirements +for stable device operation. Most of these constraints were identified +empirically through extensive testing with the SIS3820 MCS card IOC and are intended +to prevent unexpected hardware or IOC behavior. +""" from __future__ import annotations @@ -14,9 +22,8 @@ import numpy as np from bec_lib.logger import bec_logger from ophyd import Component as Cpt from ophyd import EpicsSignalRO, Kind -from ophyd_devices import AsyncSignal, CompareStatus, ProgressSignal, StatusBase +from ophyd_devices import AsyncMultiSignal, CompareStatus, ProgressSignal, StatusBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase -from ophyd_devices.utils.bec_signals import AsyncMultiSignal from csaxs_bec.devices.epics.mcs_card.mcs_card import ( ACQUIREMODE, @@ -32,19 +39,33 @@ from csaxs_bec.devices.epics.mcs_card.mcs_card import ( @contextmanager -def suppress_mca_callbacks(mcs_card: MCSCard): +def suppress_mca_callbacks(mcs_card: MCSCard, restore_after_timeout: None | float = None): """ - Context manager to temporarily disable MCA channel callbacks. - Required to avoid additional callbacks when erasing all channels. + Utility context manager to suppress MCA channel callbacks temporarily. + It is required because erasing all channels via 'erase_all' PV triggers + callbacks for each channel. Depending on timing, this can interfere with + ongoing data acquisition so this context manager can be used to suppress + those callbacks temporarily. If used with restore_after_timeout, the suppression + will be automatically cleared after the specified timeout in seconds. + + NOTE: Please be aware that it does not restore previous state, which means + that _omit_mca_callbacks will remain set after exiting the context. It has + to be cleared manually if needed. This can be improved in the future, but + should be carefully coordinated with the logic implemented within '_on_counter_update'. Args: mcs_card (MCSCard): The MCSCard instance to suppress callbacks for. + restore_after_timeout (float | None): Optional timeout in seconds to automatically + clear the suppression after the specified time. If None, the original state + is not restored. """ - mcs_card._omit_mca_callbacks.set() + mcs_card._omit_mca_callbacks.set() # pylint: disable=protected-access try: yield finally: - pass + if restore_after_timeout is not None: + time.sleep(restore_after_timeout) + mcs_card._omit_mca_callbacks.clear() # pylint: disable=protected-access if TYPE_CHECKING: # pragma: no cover @@ -59,27 +80,43 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): The basic functionality is inherited from the MCSCard class. Please note that the number of channels is fixed to 32, so there will be data for all - 32 channels even if not all channels are used in the experiment. This setting can not - be realibly changed on the SIS3820 card's IOC through mux_output, so it is fixed here. - Mux_output should therefore also be set to 32 in the IOC configuration. + 32 channels. In addition, the logic of the card is linked to the timing system (DDG) + and therefore changes have to be coordinated with the logic on the DDG side. + + Args: + name (str): Name of the device. + prefix (str, optional): Prefix for the EPICS PVs. Defaults to "". """ + USER_ACCESS = ["mcs_recovery"] + + # NOTE The number of MCA channels is fixed to 32 for the CSAXS MCS card. + # On the IOC, we receive a 'warning' or 'error' once we set this channel for the + # envisioned input/output mode settings of the card. However, we need to know the + # channels set as callback timing relies on the channels to be set. + # For the future, we may consider adding an initialization parameter to set + # the number of channels, which in return limits the number of subscriptions + # on the channels. However, mux_output should still be set to 32 on the IOC side. + # If this limits performance, this should be investigated with Controls engineers and + # the IOC. NUM_MCA_CHANNELS: int = 32 - # All counter from the MCS card. + # MCA counters for the card. Channels 1-32 will be sent to BEC. mca = Cpt( AsyncMultiSignal, name="counters", signals=[ f"mca{i}" for i in range(1, 33) - ], # This needs to be in sync with counters DynamicDeviceComponent + ], # NOTE Channels 1-32, they need to be in sync with the 'counters' component (DynamicDeviceComponent) of the MCSCard ndim=1, async_update={"type": "add", "max_shape": [None]}, max_size=1000, kind=Kind.normal, - doc="AsyncMultiSignal for MCA card channels 1-32", + doc=( + "AsyncMultiSignal for MCA card channels 1-32." + "Cabling of the MCS card determines which channel corresponds to which input." + ), ) - # Progress Signal progress = Cpt(ProgressSignal, doc="ProgressSignal indicating the progress of the device") def __init__( @@ -90,21 +127,30 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): device_manager: DeviceManagerBase | None = None, **kwargs, ): - """ - Initialize the MCSCardCSAXS with the given arguments and keyword arguments. - """ super().__init__( name=name, prefix=prefix, scan_info=scan_info, device_manager=device_manager, **kwargs ) + # NOTE MCS Clock frequency. This is linked to the settings of the SIS3820 IOC and + # cabeling of the card. Currently, the 'output_mode' is set to MODE_2 and one of the outputs + # 6 or 7 (both 10MHz clocks) is used on channel 5 input for the timing signal of the IOC. + # Please adjust this comment if the cabling or IOC settings change. self._mcs_clock = 1e7 # 10MHz clock -> 1e7 Hz self._pv_timeout = 2.0 # seconds - self._rlock = RLock() # Needed to ensure thread safety for counter updates - self._acquisition_group: str = "monitored" + self._rlock = RLock() + + # NOTE This parameter will be sent with async data of the mcs counters. + # Based on scan-paramters, e.g. frames_per_trigger, this will be either + # 'monitored' or 'burst_group'. This means whether data from this channel + # is in sync with monitored devices or another group. In this scenario, + # the other group is called burst_group. Other detectors connected and + # triggered through the same timing system should implement the same logic + # to allow data to be properly grouped afterwards. + self._acquisition_group: str = "monitored" # default value, will be updated in on_stage self._num_total_triggers: int = 0 - # Event logic to schedule async data emission & monitoring - # Please note that complete needs to wait until all data was sent - # This requires additional logic and a thread to monitor the data emission + # Thread and event logic for monitoring async data emission after scan is done + # These are mostly internal variables for which values should not be changed externally. + # Adjusting the logic of them should also be handled with care and proper testing. self._scan_done_thread_kill_event: threading.Event = threading.Event() self._start_monitor_async_data_emission: threading.Event = threading.Event() self._scan_done_callbacks: list[Callable[[], None]] = [] @@ -118,17 +164,40 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): def on_connected(self): """ - Called when the device is connected. + This method is called once the device and all its PVs are connected. Any initial + setup of PVs should be managed here. Please be aware that settings of the MCS card + correlate with its operation mode, input/output modes, and timing. Changing single + parameters without understanding the overall logic may lead to unexpected behavior + of the device.Therefore, any modification of these parameters should be handled + with care and tested. + + A brief summary of the procesdure that is implemented here: + - Stop any ongoing acquisiton. + - Setup the Initial initial settings of the MCS card with respective operation modes + - Run 'mcs_recovery' procedure to ensure that no pending acquisition data is scheduled + to be pushed through mcs channels + - Subscribe a callback '_on_counter_update' to mcs counter PVs to forward + data through AsyncMultiSignal to BEC + - Start the monitoring thread for async data emission after scan is done """ + # NOTE Stop any ongoing acquisition first. This shut be done before setting any PVs. self.stop_all.put(1) - - # Setup the MCS card settings + ######################### + ### Setup MCS Card ### + ######################### + # Setup the MCS card settings. Please note that any runtime modification + # these parameter may lead to unexpected behavior of the device. + # Therefore this has to be set up correctly. self.channel_advance.set(CHANNELADVANCE.EXTERNAL).wait(timeout=self._pv_timeout) self.channel1_source.set(CHANNEL1SOURCE.EXTERNAL).wait(timeout=self._pv_timeout) self.prescale.set(1).wait(timeout=self._pv_timeout) self.user_led.set(0).wait(timeout=self._pv_timeout) + # NOTE The number of output channels has to be set to NUM_MCA_CHANNELS. + # The logic to send data to BEC relies on knowing how many channels are active. + self.mux_output.put(self.NUM_MCA_CHANNELS) + # Set the input and output modes & polarities self.input_mode.set(INPUTMODE.MODE_3).wait(timeout=self._pv_timeout) self.input_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout) @@ -136,7 +205,10 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self.output_polarity.set(POLARITY.NORMAL).wait(timeout=self._pv_timeout) self.count_on_start.set(0).wait(timeout=self._pv_timeout) - # Set ReadMode to PASSIVE, card will wait for external trigger to be read + # NOTE Data is read out when the MCS card finishes an acquisition. The logic for this + # is also linked to triggering on the DDG. + # Set ReadMode to PASSIVE, the card will wait either wait for readout command or + # automatically readout once acquisition is done. self.read_mode.set(READMODE.PASSIVE).wait(timeout=self._pv_timeout) # Set the acquire mode @@ -145,57 +217,93 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): # Subscribe the progress signal self.current_channel.subscribe(self._progress_update, run=False) - self.mcs_recovery() + # NOTE: Run a recovery procedure to ensure that the card has no pending data + # that needs to be pushed through the mca channels. The procedure involves + # stopping any ongoing acquisition and erasing all data on the card. Including + # a short sleep to allow the IOC to process the commands. + self.mcs_recovery(timeout=1) - # Subscribe to the mca updates + #################################### + ### Setup MCS Subscriptions ### + #################################### for sig in self.counters.component_names: sig_obj: EpicsSignalRO = getattr(self.counters, sig) sig_obj.subscribe(self._on_counter_update, run=False) # Start monitoring thread self._scan_done_thread.start() - def _on_counter_update(self, value, **kwargs) -> None: """ - Callback for counter updates of the mca channels (1-32). + Callback for counter updates of the mca channels (1-32). This callback is attached + to each mca channel PV on the MCS card. It collects data from all channels + and once all channels have been updated for a given acquisition, it pushes + the data to BEC through the AsyncMultiSignal 'mca'. - Data from the mca channels will be pushed to a list, and then forwarded to - the async multi signal 'raw' for readout after the trigger is complete. + It is important that mux_output is set to the correct number of channels in on_connected, + because the callback here waits for updates on all channels before pushing data to BEC. + The _rlock is used to ensure thread safety as multiple callbacks may be executed + simultaneously from different threads. + + If _omit_mca_callbacks is set, the callback will return immediately without processing the + data. This is used when erasing all channels to avoid interference with ongoing acquisition. + It has to manually cleared after the context manager 'suppress_mca_callbacks' is used. + + Args: + value: The new value from the counter PV. + **kwargs: Additional keyword arguments from the subscription, including 'obj' (the EpicsSignalRO instance). """ with self._rlock: self._mca_counter_index += 1 if self._omit_mca_callbacks.is_set(): return # Suppress callbacks when erasing all channels - signal = kwargs.get("obj", None) + signal: EpicsSignalRO | None = kwargs.get("obj", None) if signal is None: logger.error(f"Called without 'obj' in kwargs: {kwargs}") return - signal: EpicsSignalRO - attr_name = signal.attr_name - # Ignore updates for channels that are not setup through num_connected_channels + # NOTE: This relies on the naming convention of the mca channels being 'mca1', 'mca2', ..., 'mca32'. + # for the MCSCard class with the 'counters' DynamicDeviceComponent. + # Ignore any updates from channels beyond NUM_MCA_CHANNELS + attr_name = signal.attr_name index = int(attr_name[3:]) # Extract index from 'mcaX' if index > self.NUM_MCA_CHANNELS: return + # NOTE Depending on the scan parameters, we may either receive single values or numpy arrays. + # Therefore, we need to handle both cases here to ensure that data is always stored. We do + # this by converting single values to a list with one element, and numpy arrays to lists. if isinstance(value, np.ndarray): value = value.tolist() # Convert numpy array to list else: value = [value] # Received single value, convert to list + + # Store the value with timestamp. If available in kwargs, use provided timestamp from CA, + # otherwise use current time when received. self._current_data.update( {attr_name: {"value": value, "timestamp": kwargs.get("timestamp") or time.time()}} ) + + # Once we have received all channels, push data to BEC and reset for next accumulation if len(self._current_data) == self.NUM_MCA_CHANNELS: - # Send out data on multi async signal self.mca.put(self._current_data, acquisition_group=self._acquisition_group) self._current_data.clear() self._mca_counter_index = 0 self._current_data_index += 1 + # NOTE The logic for the device progress is not yet fully refined for all scan types. + # This has to be adjusted once fly scan and step scan logic is fully implemented. + # pylint: disable=unused-argument def _progress_update(self, *args, old_value: any, value: any, **kwargs) -> None: - """Callback for progress updates from ophyd subscription on current_channel.""" + """ + Callback to update the progress signals base on values of current_channel in respect to expected total triggers. + Logic for these updates need to be extended once fly and step scan logic is fully implemented. + + Args: + old_value: Previous value of the signal. + value: New value of the signal. + """ scan_done = bool(value == self._num_total_triggers) self.progress.put(value=value, max_value=self._num_total_triggers, done=scan_done) if scan_done: @@ -203,7 +311,16 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): def on_stage(self) -> None: """ - Called when the device is staged. + This method is called when the device is staged before a scan. Any bootstrapping required + for the scan should be handled here. We also need to handle MCS card specific logic to ensure + that the card is properly prepared for the scan. + + The following procedure is implemented here: + - Ensure that any ongoing acquisition is stopped (should never happen if not interfered with manually) + - Erase all data on the MCS card to ensure a clean start (should never + - Set acquisition parameters based on scan parameters (frames_per_trigger, num_points, acquisition_group) + - Clear any events and buffers related to async data emission. This includes '_omit_mca_callbacks', + '_start_monitor_async_data_emission', '_scan_done_callbacks', and '_current_data'. """ # NOTE: If for some reason, the card is still acquiring, we need to stop it first @@ -217,21 +334,23 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): status = CompareStatus(self.acquiring, ACQUIRING.DONE) status.wait(timeout=10) - # NOTE: Erase all will result in data emission through mca callback subscriptions + # NOTE: If current_channel != 0, erase all data on the card. This + # needs to be done with the 'suppress_mca_callbacks' context manager as erase_all will result + # in data emission through mca callback subscriptions. # The buffer needs to be cleared as this will otherwise lead to missing # triggers during the scan. Again, this should not happen if unstage is properly called. # But user interference or a restart of the device_server may lead to this situation. - # self.erase_all.put(1) - # time.sleep(3) if self.current_channel.get() != 0: - with suppress_mca_callbacks(self): + with suppress_mca_callbacks(self, restore_after_timeout=1.0): logger.warning( f"MCS Card {self.name} had still data in buffer Erased all data on staging and sleeping for 1 second." ) # Erase all data on the MCS card self.erase_all.put(1) - time.sleep(1) # Allow time to process erase + ##################################### + ### Setup Acquisition Parameters ### + ##################################### triggers = self.scan_info.msg.scan_parameters.get("frames_per_trigger", 1) num_points = self.scan_info.msg.num_points self._num_total_triggers = triggers * num_points @@ -239,72 +358,130 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): self.preset_real.set(0).wait(timeout=self._pv_timeout) self.num_use_all.set(triggers).wait(timeout=self._pv_timeout) - # Make sure to clear any remaining data in the local buffer + # Clear any previous data, just to be sure with self._rlock: self._current_data.clear() - # Reset monitoring of async data emission + + # NOTE Reset events for monitoring async_data_emission thread which is + # running during complete to wait for all data from the card + # to be emitted to BEC. self._start_monitor_async_data_emission.clear() + + # Clear any previous scan done callbacks self._scan_done_callbacks.clear() + + # Reset counter for data index of emitted data, NOTE for fly scans, this logic may have to be adjusted. self._current_data_index = 0 + # NOTE Make sure that the signal that omits mca callbacks is cleared + self._omit_mca_callbacks.clear() + def on_unstage(self) -> None: """ - Called when the device is unstaged. + Called when the device is unstaged. This method should be omnipotent and resolve fast. + It stops any ongoing acquisition, erases all data on the MCS and clears the local buffer '_current_data'. + + NOTE: It is important that the logic for on_complete is solid and properly waiting for mca data to be emitted + to BEC. Otherwise, unstage may interfere with ongoing data emission. Unstage is called after complete during scans. + It is crucial that the device itself calls '_omit_mca_callbacks' in its on_stage method to make sure + that data is emitted once the card is properly staged. """ self.stop_all.put(1) - # Make sure that upon unstaging, all data on the MCS card is erased. with suppress_mca_callbacks(self): with self._rlock: self._current_data.clear() self.erase_all.put(1) - def _monitor_async_data_emission(self, timeout: int = 10) -> None: - """Monitor data emission after scan is done.""" + def _monitor_async_data_emission(self) -> None: + """ + Monitoring loop that runs in a separate thread to check if all async data has been emitted to BEC. + It is IDLE most of the time, but activate in the 'on_complete' method called by 'complete'. + + The check is done by comparing the number of data updates '_current_data_index' received through + mca channel callbacks with the expected number of points in the scan. Once they match, all + callbacks in _scan_done_callbacks are called to indicate that data emission is done. + Callbacks need to also accept and handle exceptions to properly report failure. + NOTE! This logic currently works for any step scan, but has to be extended for fly scans. + """ while not self._scan_done_thread_kill_event.is_set(): while self._start_monitor_async_data_emission.wait(): try: if self._current_data_index == self.scan_info.msg.num_points: for callback in self._scan_done_callbacks: - callback() + callback(exception=None) time.sleep(0.02) # 20ms delay to avoid busy loop - except Exception: # pylint: disable=broad-except + except Exception as exc: # pylint: disable=broad-except content = traceback.format_exc() logger.error( f"Exception in monitoring thread of complete for {self.name}:\n{content}" "Running callbacks to avoid deadlock." ) for callback in self._scan_done_callbacks: - callback() + callback(exception=exc) - def _status_callback(self, status: StatusBase) -> None: + def _status_callback(self, status: StatusBase, exception=None) -> None: """Callback for status completion.""" - if not status.done: - status.set_finished() self._start_monitor_async_data_emission.clear() # Stop monitoring + # NOTE Important check as set_finished or set_exception should not be called + # if the status is already done (e.g. cancelled externally) + if status.done: + return # Already done and cancelled externally. + if exception: + status.set_exception(exception) + else: + status.set_finished() def _status_failed_callback(self, status: StatusBase) -> None: - """Callback for status failure.""" - if status.done and not status.success: + """Callback for status failure, the monitoring thread should be stopped.""" + # NOTE Check for status.done and status.success is important to avoid + if status.done: self._start_monitor_async_data_emission.clear() # Stop monitoring def on_complete(self) -> CompareStatus: - """On scan completion.""" - # Prepare callback for data emission done + """ + + Method that is called at the end of scan core, but before unstage. This method is + used to report whether the device successfully completed its data acquisition for the scan. + The check has to be implemented asynchronously and resolve through a status (future) object + returned by this method. + NOTE: For the MCS card, we need to ensure that all data has been acquired + and emitted to BEC as updates after 'on_complete' resolved will be rejected by BEC. + Therefore, we need to ensure that all data has been emitted to BEC before + reporting completion of the device. + + This method implements the following procedure: + - Starts the IDLE async data monitoring thread that checks if all expected data + has been emitted to BEC through the mca channel callbacks. + - Use a CompareStatus to monitor when the MCS card becomes DONE. Please note that this + only indicates that the card has finished acquisition, but not that all data has been + emitted to BEC. + - Return combined status object. A callback is registered to handle failure of the status + if it is stopped externally, e.g. through scan abort. This should ensure that the + monitoring thread is stopped properly. + + """ + # Prepare and register status callback for the async monitoring loop status_async_data = StatusBase(obj=self) self._scan_done_callbacks.append(partial(self._status_callback, status_async_data)) - # Start done callback loop + # Set the event to start monitoring async data emission self._start_monitor_async_data_emission.set() + # Add CompareStatus for Acquiring DONE status = CompareStatus(self.acquiring, ACQUIRING.DONE) # Combine both statuses ret_status = status & status_async_data + # Handle external stop/cancel, and stop monitoring ret_status.add_callback(self._status_failed_callback) self.cancel_on_stop(ret_status) return status def on_destroy(self): + """ + The on destroy hook is called when the device is destroyed, but also reloaded. + Here, we need to clean up all resources used up by the device, including running threads. + """ self._scan_done_thread_kill_event.set() self._start_monitor_async_data_emission.set() if self._scan_done_thread.is_alive(): @@ -313,18 +490,30 @@ class MCSCardCSAXS(PSIDeviceBase, MCSCard): logger.warning(f"Thread for device {self.name} did not terminate properly.") def on_stop(self) -> None: - """ - Called when the scan is stopped. - """ + """Hook called when the device is stopped. In addition, any status that is registered through cancel_on_stop will be cancelled here.""" self.stop_all.put(1) self.erase_all.put(1) - def mcs_recovery(self, timeout:int=1) -> None: - """Recovery procedure for the mcs card""" - sleep_time = timeout / 2 # 2 sleeps - logger.info(f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep") + def mcs_recovery(self, timeout: int = 1) -> None: + """ + Recovery procedure for the mcs card. This procedure has been empirically found and can + be used to ensure that the MCS card is stopped and has no pending data to be emitted. + It involves stopping any ongoing acquisition and erasing all data on the card, with + a sleep in between to allow the IOC to process the commands. + + Args: + timeout (int): Total timeout for the recovery procedure. Defaults to 1 second. + """ + sleep_time = timeout / 2 # 2 sleeps + logger.info( + f"Running recovery procedure for MCS card {self.name} with {sleep_time}s sleep, calling stop_all and erase_all, and another {sleep_time}s sleep" + ) + # First erase and start ongoing acquisition. self.erase_start.put(1) time.sleep(sleep_time) + # After a brief processing time, we stop any ongoing acquisition. self.stop_all.put(1) - self.erase_all.put(1) - time.sleep(sleep_time) \ No newline at end of file + # Finally, we erase all data while suppressing mca callbacks to avoid interference. + # We restore the callback suppression after timeout to ensure proper operation afterwards. + with suppress_mca_callbacks(self, restore_after_timeout=sleep_time): + self.erase_all.put(1)