From 4437bb13b8e9ee399013d172620a5d6b5b78eb76 Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Wed, 16 Apr 2025 15:01:06 +0200 Subject: [PATCH] WIP --- tomcat_bec/devices/aerotech/AerotechPso.py | 164 +++++++----- tomcat_bec/devices/aerotech/AerotechTasks.py | 140 +++++----- .../devices/gigafrost/gigafrostcamera.py | 9 - tomcat_bec/devices/gigafrost/stddaq_client.py | 249 +++++++++++------- .../devices/gigafrost/stddaq_preview.py | 181 ++++++------- 5 files changed, 406 insertions(+), 337 deletions(-) diff --git a/tomcat_bec/devices/aerotech/AerotechPso.py b/tomcat_bec/devices/aerotech/AerotechPso.py index 9202393..7dab0a3 100644 --- a/tomcat_bec/devices/aerotech/AerotechPso.py +++ b/tomcat_bec/devices/aerotech/AerotechPso.py @@ -7,78 +7,17 @@ synchronized output (PSO) interface. """ from time import sleep import numpy as np -from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind +from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind from ophyd.status import DeviceStatus -from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin as CustomDeviceMixin, -) +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + from bec_lib import bec_logger + logger = bec_logger.logger -class AerotechPsoDistanceMixin(CustomDeviceMixin): - """Mixin class for self-configuration and staging - """ - # parent : aa1Tasks - def on_stage(self) -> None: - """Configuration and staging - - NOTE: Scans don't have to fully configure the device, that can be done - manually outside. However we expect that the device is disabled - when not in use. I.e. this method is not expected to be called when - PSO is not needed or when it'd conflict with other devices. - """ - # Fish out configuration from scaninfo (does not need to be full configuration) - d = {} - if "kwargs" in self.parent.scaninfo.scan_msg.info: - scanargs = self.parent.scaninfo.scan_msg.info["kwargs"] - if "pso_distance" in scanargs: - d["pso_distance"] = scanargs["pso_distance"] - if "pso_wavemode" in scanargs: - d["pso_wavemode"] = scanargs["pso_wavemode"] - if "pso_w_pulse" in scanargs: - d["pso_w_pulse"] = scanargs["pso_w_pulse"] - if "pso_t_pulse" in scanargs: - d["pso_t_pulse"] = scanargs["pso_t_pulse"] - if "pso_n_pulse" in scanargs: - d["pso_n_pulse"] = scanargs["pso_n_pulse"] - - # Perform bluesky-style configuration - if len(d) > 0: - logger.info(f"[{self.parent.name}] Configuring with:\n{d}") - self.parent.configure(d=d) - - # Stage the PSO distance module - self.parent.bluestage() - - def on_unstage(self): - """Standard bluesky unstage""" - # Ensure output is set to low - # if self.parent.output.value: - # self.parent.toggle() - # Turn off window mode - self.parent.winOutput.set("Off").wait() - self.parent.winEvents.set("Off").wait() - # Turn off distance mode - self.parent.dstEventsEna.set("Off").wait() - self.parent.dstCounterEna.set("Off").wait() - # Disable output - self.parent.outSource.set("None").wait() - # Sleep for one poll period - sleep(0.2) - - def on_trigger(self) -> None | DeviceStatus: - """Fire a single PSO event (i.e. manual software trigger)""" - # Only trigger if distance was set to invalid - logger.warning(f"[{self.parent.name}] Triggerin...") - if self.parent.dstDistanceVal.get() == 0: - status = self.parent._eventSingle.set(1, settle_time=0.1) - return status - - -class aa1AxisPsoBase(PSIDeviceBase): +class aa1AxisPsoBase(PSIDeviceBase, Device): """Position Sensitive Output - Base class This class provides convenience wrappers around the Aerotech IOC's PSO @@ -152,8 +91,34 @@ class aa1AxisPsoBase(PSIDeviceBase): outPin = Component(EpicsSignalRO, "PIN", auto_monitor=True, kind=Kind.config) outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.config) - def trigger(self, settle_time=0.1) -> DeviceStatus: + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + scan_info=None, + **kwargs, + ): + # Need to call super() to call the mixin class + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + scan_info=scan_info, + **kwargs, + ) + + def fire(self, settle_time=0.1) -> None | DeviceStatus: """Fire a single PSO event (i.e. manual software trigger)""" + # Only trigger if distance was set to invalid + logger.warning(f"[{self.name}] Triggerin...") self._eventSingle.set(1, settle_time=settle_time).wait() status = DeviceStatus(self) status.set_finished() @@ -163,7 +128,7 @@ class aa1AxisPsoBase(PSIDeviceBase): """Toggle waveform outup""" orig_wave_mode = self.waveMode.get() self.waveMode.set("Toggle").wait() - self.trigger(0.1) + self.fire(0.1) self.waveMode.set(orig_wave_mode).wait() def configure(self, d: dict): @@ -232,13 +197,12 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): ``` """ - custom_prepare_cls = AerotechPsoDistanceMixin USER_ACCESS = ["configure", "prepare", "toggle"] _distance_value = None # ######################################################################## # PSO high level interface - def configure(self, d: dict = {}) -> tuple: + def configure(self, d: dict = None) -> tuple: """Simplified configuration interface to access the most common functionality for distance mode PSO. @@ -282,7 +246,49 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode") return (old, new) - def bluestage(self) -> None: + def on_stage(self) -> None: + """Configuration and staging + + NOTE: Scans don't have to fully configure the device, that can be done + manually outside. However we expect that the device is disabled + when not in use. I.e. this method is not expected to be called when + PSO is not needed or when it'd conflict with other devices. + """ + # Fish out configuration from scaninfo (does not need to be full configuration) + d = {} + if "kwargs" in self.scaninfo.scan_msg.info: + scanargs = self.scaninfo.scan_msg.info["kwargs"] + if "pso_distance" in scanargs: + d["pso_distance"] = scanargs["pso_distance"] + if "pso_wavemode" in scanargs: + d["pso_wavemode"] = scanargs["pso_wavemode"] + if "pso_w_pulse" in scanargs: + d["pso_w_pulse"] = scanargs["pso_w_pulse"] + if "pso_t_pulse" in scanargs: + d["pso_t_pulse"] = scanargs["pso_t_pulse"] + if "pso_n_pulse" in scanargs: + d["pso_n_pulse"] = scanargs["pso_n_pulse"] + + # Perform bluesky-style configuration + if d: + # logger.info(f"[{self.name}] Configuring with:\n{d}") + self.configure(d=d) + + # Stage the PSO distance module + self.arm() + + def on_unstage(self): + """Turn off the PSO module""" + self.disarm() + + def on_trigger(self, settle_time=0.1) -> None | DeviceStatus: + """Fire a single PSO event (i.e. manual software trigger)""" + # Only trigger if distance was set to invalid + # if self.dstDistanceVal.get() == 0: + logger.warning(f"[{self.name}] Triggerin...") + return self.fire(settle_time) + + def arm(self) -> None: """Bluesky style stage""" # Stage the PSO distance module and zero counter if isinstance(self._distance_value, (np.ndarray, list, tuple)): @@ -293,3 +299,19 @@ class aa1AxisPsoDistance(aa1AxisPsoBase): if self.dstDistanceVal.get() > 0: self.dstEventsEna.set("On").wait() self.dstCounterEna.set("On").wait() + + def disarm(self): + """Standard bluesky unstage""" + # Ensure output is set to low + # if self.output.value: + # self.toggle() + # Turn off window mode + self.winOutput.set("Off").wait() + self.winEvents.set("Off").wait() + # Turn off distance mode + self.dstEventsEna.set("Off").wait() + self.dstCounterEna.set("Off").wait() + # Disable output + self.outSource.set("None").wait() + # Sleep for one poll period + sleep(0.2) diff --git a/tomcat_bec/devices/aerotech/AerotechTasks.py b/tomcat_bec/devices/aerotech/AerotechTasks.py index 62ae0c0..d7a2639 100644 --- a/tomcat_bec/devices/aerotech/AerotechTasks.py +++ b/tomcat_bec/devices/aerotech/AerotechTasks.py @@ -5,71 +5,17 @@ interface. @author: mohacsi_i """ -from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind +from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind from ophyd.status import DeviceStatus, SubscriptionStatus -from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase -from ophyd_devices.interfaces.base_classes.psi_detector_base import ( - CustomDetectorMixin as CustomDeviceMixin, -) +from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase + from bec_lib import bec_logger logger = bec_logger.logger -class AerotechTasksMixin(CustomDeviceMixin): - """Mixin class for self-configuration and staging""" - - # parent : aa1Tasks - def on_stage(self) -> None: - """Configuration and staging - - In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'. - I.e. they need to know which parameters are relevant for them at each scan. - - NOTE: Scans don't have to fully configure the device, that can be done - manually outside. However we expect that the device is disabled - when not in use. I.e. this method is not expected to be called when - PSO is not needed or when it'd conflict with other devices. - """ - # Fish out our configuration from scaninfo (via explicit or generic addressing) - d = {} - if "kwargs" in self.parent.scaninfo.scan_msg.info: - scanargs = self.parent.scaninfo.scan_msg.info["kwargs"] - if self.parent.scaninfo.scan_type in ("script", "scripted"): - # NOTE: Scans don't have to fully configure the device - if "script_text" in scanargs and scanargs["script_text"] is not None: - d["script_text"] = scanargs["script_text"] - if "script_file" in scanargs and scanargs["script_file"] is not None: - d["script_file"] = scanargs["script_file"] - if "script_task" in scanargs and scanargs["script_task"] is not None: - d["script_task"] = scanargs["script_task"] - - # FIXME: The above should be exchanged with: - # d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config") - - # Perform bluesky-style configuration - if d: - # logger.warning(f"[{self.parent.name}] Configuring with:\n{d}") - self.parent.configure(d=d) - - # The actual staging - self.parent.arm() - - def on_unstage(self): - """Stop the currently selected task""" - self.parent.disarm() - - def on_stop(self): - """Stop the currently selected task""" - self.on_unstage() - - def on_kickoff(self): - """Start execution of the selected task""" - self.parent.launch() - - -class aa1Tasks(PSIDeviceBase): +class aa1Tasks(PSIDeviceBase, Device): """Task management API The place to manage tasks and AeroScript user files on the controller. @@ -99,8 +45,8 @@ class aa1Tasks(PSIDeviceBase): ''' """ + USER_ACCESS = ["arm", "disarm", "launch", "kickoff"] - custom_prepare_cls = AerotechTasksMixin _failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal) errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.normal) @@ -117,6 +63,30 @@ class aa1Tasks(PSIDeviceBase): EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True ) + def __init__( + self, + prefix="", + *, + name, + kind=None, + read_attrs=None, + configuration_attrs=None, + parent=None, + scan_info=None, + **kwargs, + ): + # Need to call super() to call the mixin class + super().__init__( + prefix=prefix, + name=name, + kind=kind, + read_attrs=read_attrs, + configuration_attrs=configuration_attrs, + parent=parent, + scan_info=scan_info, + **kwargs, + ) + def configure(self, d: dict) -> tuple: """Configuration interface for flying""" # Common operations @@ -141,6 +111,52 @@ class aa1Tasks(PSIDeviceBase): new = self.read_configuration() return (old, new) + def on_stage(self) -> None: + """Configuration and staging + + In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'. + I.e. they need to know which parameters are relevant for them at each scan. + + NOTE: Scans don't have to fully configure the device, that can be done + manually outside. However we expect that the device is disabled + when not in use. I.e. this method is not expected to be called when + PSO is not needed or when it'd conflict with other devices. + """ + # Fish out our configuration from scaninfo (via explicit or generic addressing) + d = {} + if "kwargs" in self.scaninfo.scan_msg.info: + scanargs = self.scaninfo.scan_msg.info["kwargs"] + if self.scaninfo.scan_type in ("script", "scripted"): + # NOTE: Scans don't have to fully configure the device + if "script_text" in scanargs and scanargs["script_text"] is not None: + d["script_text"] = scanargs["script_text"] + if "script_file" in scanargs and scanargs["script_file"] is not None: + d["script_file"] = scanargs["script_file"] + if "script_task" in scanargs and scanargs["script_task"] is not None: + d["script_task"] = scanargs["script_task"] + + # FIXME: The above should be exchanged with: + # d = self.scan_info.scan_msg.scan_parameters.get("aerotech_config") + + # Perform bluesky-style configuration + if d: + self.configure(d=d) + + # The actual staging + self.arm() + + def on_unstage(self): + """Stop the currently selected task""" + self.disarm() + + def on_stop(self): + """Stop the currently selected task""" + self.unstage() + + def on_kickoff(self): + """Start execution of the selected task""" + self.launch() + def arm(self) -> None: """Bluesky style stage, prepare, but does not execute""" if self.taskIndex.get() in (0, 1, 2): @@ -165,12 +181,6 @@ class aa1Tasks(PSIDeviceBase): raise RuntimeError("Failed to load task, please check the Aerotech IOC") return status - def kickoff(self): - """Missing kickoff, for real?""" - self.custom_prepare.on_kickoff() - - ########################################################################## - # Bluesky flyer interface def complete(self) -> SubscriptionStatus: """Wait for a RUNNING task""" timestamp_ = 0 diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py index 5214704..8527627 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostcamera.py +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -189,13 +189,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): if daq_update: self.backend.set_config(daq_update, force=False) - def disarm(self): - """ Bluesky style unstage""" - # Switch to idle - self.cmdStartCamera.set(0).wait() - if self.autoSoftEnable.get(): - self.cmdSoftEnable.set(0).wait() - def set_acquisition_mode(self, acq_mode): """Set acquisition mode @@ -217,7 +210,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): self.enable_mode = "soft" self.trigger_mode = "auto" self.exposure_mode = "timer" - elif acq_mode in ["ext_enable", "external_enable"]: # NOTE: Trigger using external hardware events via enable input (actually works) # Switch to physical enable signal @@ -228,7 +220,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase): self.enable_mode = "external" self.trigger_mode = "auto" self.exposure_mode = "timer" - elif acq_mode == "soft": # NOTE: Fede's configuration for continous streaming # Switch to physical enable signal diff --git a/tomcat_bec/devices/gigafrost/stddaq_client.py b/tomcat_bec/devices/gigafrost/stddaq_client.py index 042cf48..34006b2 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_client.py +++ b/tomcat_bec/devices/gigafrost/stddaq_client.py @@ -13,7 +13,7 @@ import requests import os from ophyd import Signal, Component, Kind -from ophyd.status import SubscriptionStatus, Status +from ophyd.status import SubscriptionStatus from websockets.sync.client import connect, ClientConnection from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError @@ -28,6 +28,8 @@ logger = bec_logger.logger class StdDaqMixin(CustomDeviceMixin): # pylint: disable=protected-access + _mon = None + def on_stage(self) -> None: """Configuration and staging @@ -39,9 +41,6 @@ class StdDaqMixin(CustomDeviceMixin): # Fish out our configuration from scaninfo (via explicit or generic addressing) # NOTE: Scans don't have to fully configure the device d = {} - # scan_parameters = self.parent.scaninfo.scan_msg.scan_parameters - # std_daq_params = scan_parameters.get("std_daq_params") - if "kwargs" in self.parent.scaninfo.scan_msg.info: scanargs = self.parent.scaninfo.scan_msg.info["kwargs"] if "image_width" in scanargs and scanargs["image_width"] is not None: @@ -50,10 +49,6 @@ class StdDaqMixin(CustomDeviceMixin): d["image_height"] = scanargs["image_height"] if "nr_writers" in scanargs and scanargs["nr_writers"] is not None: d["nr_writers"] = scanargs["nr_writers"] - if "system_config" in scanargs and scanargs["system_config"] is not None: - if scanargs["system_config"]["file_directory"]: - file_directory = scanargs["system_config"]["file_directory"] - ### to be used in the future to substitute the procedure using file path if "file_path" in scanargs and scanargs["file_path"] is not None: self.parent.file_path.set(scanargs["file_path"].replace("data", "gpfs")).wait() print(scanargs["file_path"]) @@ -98,20 +93,40 @@ class StdDaqMixin(CustomDeviceMixin): sleep(0.5) # Try to start a new run (reconnects) - self.parent.arm() - - + self.parent.bluestage() + # And start status monitoring + self._mon = Thread(target=self.monitor, daemon=True) + self._mon.start() def on_unstage(self): """Stop a running acquisition and close connection""" print("Creating virtual dataset") self.parent.create_virtual_dataset() - self.parent.disarm() + self.parent.blueunstage() def on_stop(self): """Stop a running acquisition and close connection""" - self.parent.disarm() + self.parent.blueunstage() + def monitor(self) -> None: + """Monitor status messages while connection is open. This will block the reply monitoring + to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing + acquisition means StdDAQ will close connection, so there's no idle state polling. + """ + try: + sleep(0.2) + for msg in self.parent._wsclient: + message = json.loads(msg) + self.parent.runstatus.put(message["status"], force=True) + # logger.info(f"[{self.parent.name}] Pushed status: {message['status']}") + except (ConnectionClosedError, ConnectionClosedOK, AssertionError): + # Libraty throws theese after connection is closed + return + except Exception as ex: + logger.warning(f"[{self.parent.name}] Exception in polling: {ex}") + return + finally: + self._mon = None class StdDaqClient(PSIDeviceBase): @@ -137,9 +152,9 @@ class StdDaqClient(PSIDeviceBase): "nuke", "connect", "message", - "arm", - "disarm", - "complete", + "state", + "bluestage", + "blueunstage", ] _wsclient = None @@ -153,14 +168,12 @@ class StdDaqClient(PSIDeviceBase): file_prefix = Component(Signal, value="file", kind=Kind.config) # Configuration attributes rest_url = Component(Signal, kind=Kind.config, metadata={"write_access": False}) - datasource = Component(Signal, kind=Kind.config, metadata={"write_access": False}) cfg_detector_name = Component(Signal, kind=Kind.config) cfg_detector_type = Component(Signal, kind=Kind.config) cfg_bit_depth = Component(Signal, kind=Kind.config) cfg_pixel_height = Component(Signal, kind=Kind.config) cfg_pixel_width = Component(Signal, kind=Kind.config) cfg_nr_writers = Component(Signal, kind=Kind.config) - _mon = None def __init__( self, @@ -174,7 +187,7 @@ class StdDaqClient(PSIDeviceBase): device_manager=None, ws_url: str = "ws://localhost:8080", rest_url: str = "http://localhost:5000", - data_source_name="", + data_source_name=None, **kwargs, ) -> None: super().__init__( @@ -189,66 +202,67 @@ class StdDaqClient(PSIDeviceBase): ) self.ws_url.set(ws_url, force=True).wait() self.rest_url.set(rest_url, force=True).wait() - self.datasource.set(data_source_name, force=True).wait() + self.data_source_name = data_source_name # Connect to the DAQ and initialize values try: self.get_daq_config(update=True) except Exception as ex: logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}") - # Connect to websockets and start poller - try: - self.connect() - except Exception as ex: - logger.error(f"Failed to connect to the stdDAQ websocket interface\n{ex}") - def connect(self) -> None: + def connect(self) -> ClientConnection: """Connect to the StdDAQ's websockets interface StdDAQ may reject connection for a few seconds after restart, or when it wants so if it fails, wait a bit and try to connect again. """ - # Connect to stdDAQ - logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}") - self._wsclient = connect(self.ws_url.get()) - # And start status monitoring - self._mon = Thread(target=self.monitor, daemon=True) - self._mon.start() + num_retry = 0 + while num_retry < 5: + try: + logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}") + connection = connect(self.ws_url.get()) + logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries") + return connection + except ConnectionRefusedError: + num_retry += 1 + sleep(2) + raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.") - def monitor(self) -> None: - """Monitor status messages while connection is open. Status updates are - sent every 1 seconds, or when there's a transition. - """ - try: - for msg in self._wsclient: - message = json.loads(msg) - self.runstatus.put(message["status"], force=True) - # logger.info(f"[{self.parent.name}] Pushed status: {message['status']}") - except Exception as ex: - logger.warning(f"[{self.name}] Exception in polling: {ex}") - return - finally: - self._mon = None - - def message(self, message: dict) -> None: + def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str: """Send a message to the StdDAQ and receive a reply + + Note: finishing acquisition means StdDAQ will close connection, so + there's no idle state polling. """ # Prepare message msg = json.dumps(message) if isinstance(message, dict) else str(message) # Connect if client was destroyed if self._wsclient is None: - self.connect() + self._wsclient = self.connect() # Send message (reopen connection if needed) msg = json.dumps(message) if isinstance(message, dict) else str(message) try: self._wsclient.send(msg) - except (ConnectionClosedError, ConnectionClosedOK, AttributeError): + except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex: # Re-connect if the connection was closed - self.connect() + self._wsclient = self.connect() self._wsclient.send(msg) + # Wait for reply + reply = None + if wait_reply: + try: + reply = self._wsclient.recv(timeout) + return reply + except (ConnectionClosedError, ConnectionClosedOK) as ex: + self._wsclient = None + logger.error(f"[{self.name}] WS connection was closed before reply: {ex}") + except (TimeoutError, RuntimeError) as ex: + logger.error(f"[{self.name}] Error in receiving ws reply: {ex}") + return reply + def configure(self, d: dict = None): """Configure the next scan with the stdDAQ @@ -259,8 +273,6 @@ class StdDaqClient(PSIDeviceBase): images (limited by the ringbuffer size and backend speed). file_path: str, optional File path to save the data, usually GPFS. - file_prefix: str, optional - File prefix to save the data [default = file]. image_width : int, optional ROI size in the x-direction [pixels]. image_height : int, optional @@ -283,10 +295,6 @@ class StdDaqClient(PSIDeviceBase): # Run parameters if "num_points_total" in d: self.num_images.set(d["num_points_total"]).wait() - if "file_path" in d: - self.file_path.set(d["file_path"]).wait() - if "file_prefix" in d: - self.file_prefix.set(d["file_prefix"]).wait() # Restart the DAQ if resolution changed cfg = self.get_daq_config() @@ -298,8 +306,8 @@ class StdDaqClient(PSIDeviceBase): ): # Stop if current status is not idle - # if self.runstatus.get() != "idle": - # logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") + if self.state() != "idle": + logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") # Update retrieved config cfg["image_pixel_height"] = int(self.cfg_pixel_height.get()) @@ -310,7 +318,7 @@ class StdDaqClient(PSIDeviceBase): sleep(1) self.get_daq_config(update=True) - def arm(self): + def bluestage(self): """Stages the stdDAQ Opens a new connection to the stdDAQ, sends the start command with @@ -318,58 +326,103 @@ class StdDaqClient(PSIDeviceBase): it for obvious failures. """ # Can't stage into a running exposure - # if self.runstatus.get() != "idle": - # raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.runstatus.get()}") + if self.state() != "idle": + raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}") - # Ensure expected shape - self.validate() + # Must make sure that image size matches the data source + if self.data_source_name is not None: + cam_img_w = self.device_manager.devices[self.data_source_name].cfgRoiX.get() + cam_img_h = self.device_manager.devices[self.data_source_name].cfgRoiY.get() + daq_img_w = self.cfg_pixel_width.get() + daq_img_h = self.cfg_pixel_height.get() + + if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h): + raise RuntimeError( + f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})" + ) + else: + logger.warning( + f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})" + ) file_path = self.file_path.get() - file_prefix = self.file_prefix.get() num_images = self.num_images.get() + file_prefix = self.file_prefix.get() + print(file_prefix) # New connection + self._wsclient = self.connect() message = { "command": "start", "path": file_path, "file_prefix": file_prefix, "n_image": num_images, } - self.message(message) + reply = self.message(message) - def is_running(*args, value, timestamp, **kwargs): - result = value not in ["idle", "unknown", "error"] - return result + if reply is not None: + reply = json.loads(reply) + self.runstatus.set(reply["status"], force=True).wait() + logger.info(f"[{self.name}] Start DAQ reply: {reply}") - status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5) - status.wait() + # Give it more time to reconfigure + if reply["status"] in ("rejected"): + # FIXME: running exposure is a nogo + if reply["reason"] == "driver is busy!": + raise RuntimeError( + f"[{self.name}] Start stdDAQ command rejected: already running" + ) + else: + # Give it more time to consolidate + sleep(1) + else: + # Success!!! + print(f"[{self.name}] Started stdDAQ in: {reply['status']}") + return - logger.warning(f"[{self.name}] Started stdDAQ in: {self.runstatus.get()}") - return status + raise RuntimeError( + f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}" + ) - def disarm(self): + def blueunstage(self): """Unstages the stdDAQ Opens a new connection to the stdDAQ, sends the stop command and waits for the idle state. """ - # Stop the DAQ (will close connection) - reply is always "success" - self.message({"command": "stop"}) + ii = 0 + while ii < 10: + # Stop the DAQ (will close connection) - reply is always "success" + self._wsclient = self.connect() + self.message({"command": "stop_all"}, wait_reply=False) - def is_running(*args, value, timestamp, **kwargs): - result = value in ["idle", "unknown", "error"] - return result + # Let it consolidate + sleep(0.2) - status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5) - status.wait() + # Check final status (from new connection) + self._wsclient = self.connect() + reply = self.message({"command": "status"}) + if reply is not None: + logger.info(f"[{self.name}] DAQ status reply: {reply}") + reply = json.loads(reply) - logger.warning(f"[{self.name}] Stopped stdDAQ in: {self.runstatus.get()}") - return status + if reply["status"] in ("idle", "error"): + # Only 'idle' state accepted + print(f"DAQ stopped on try {ii}") + return + elif reply["status"] in ("stop"): + # Give it more time to stop + sleep(0.5) + elif ii >= 6: + raise RuntimeError(f"Failed to stop StdDAQ: {reply}") + ii += 1 + raise RuntimeError(f"Failed to stop StdDAQ in time") ########################################################################## # Bluesky flyer interface def complete(self) -> SubscriptionStatus: """Wait for current run. Must end in status 'file_saved'.""" + def is_running(*args, value, timestamp, **kwargs): result = value in ["idle", "file_saved", "error"] return result @@ -377,25 +430,6 @@ class StdDaqClient(PSIDeviceBase): status = SubscriptionStatus(self.runstatus, is_running, settle_time=0.5) return status - def validate(self): - """Validate camera state - - Ensure that data source shape matches with the shape expected by the stdDAQ. - """ - # Must make sure that image size matches the data source - source = self.datasource.get() - if source is not None and len(source) > 0: - if source == "gfcam": - cam_img_w = self.device_manager.devices[source].cfgRoiX.get() - cam_img_h = self.device_manager.devices[source].cfgRoiY.get() - daq_img_w = self.cfg_pixel_width.get() - daq_img_h = self.cfg_pixel_height.get() - - if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h): - raise RuntimeError( - f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})" - ) - def get_daq_config(self, update=False) -> dict: """Read the current configuration from the DAQ""" r = requests.get(self.rest_url.get() + "/api/config/get", params={"user": "ioc"}, timeout=2) @@ -458,6 +492,17 @@ class StdDaqClient(PSIDeviceBase): self.set_daq_config(cfg) sleep(restarttime) + def state(self) -> str | None: + """Querry the current system status""" + try: + wsclient = self.connect() + wsclient.send(json.dumps({"command": "status"})) + r = wsclient.recv(timeout=1) + r = json.loads(r) + return r["status"] + except ConnectionRefusedError: + raise + # Automatically connect to microXAS testbench if directly invoked if __name__ == "__main__": diff --git a/tomcat_bec/devices/gigafrost/stddaq_preview.py b/tomcat_bec/devices/gigafrost/stddaq_preview.py index f2e23a6..3a08126 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_preview.py +++ b/tomcat_bec/devices/gigafrost/stddaq_preview.py @@ -19,9 +19,15 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import ( ) from bec_lib import bec_logger - logger = bec_logger.logger -ZMQ_TOPIC_FILTER = b"" +ZMQ_TOPIC_FILTER = b'' + + +class StdDaqPreviewState(enum.IntEnum): + """Standard DAQ ophyd device states""" + UNKNOWN = 0 + DETACHED = 1 + MONITORING = 2 class StdDaqPreviewMixin(CustomDetectorMixin): @@ -29,19 +35,94 @@ class StdDaqPreviewMixin(CustomDetectorMixin): Parent class: CustomDetectorMixin """ + _mon = None def on_stage(self): """Start listening for preview data stream""" - self.parent.arm() + if self._mon is not None: + self.parent.unstage() + sleep(0.5) + + self.parent.connect() + self._stop_polling = False + self._mon = Thread(target=self.poll, daemon=True) + self._mon.start() def on_unstage(self): """Stop a running preview""" - self.parent.disarm() + if self._mon is not None: + self._stop_polling = True + # Might hang on recv_multipart + self._mon.join(timeout=1) + # So also disconnect the socket + self.parent._socket.disconnect(self.parent.url.get()) def on_stop(self): """Stop a running preview""" self.on_unstage() + def poll(self): + """Collect streamed updates""" + self.parent.status.set(StdDaqPreviewState.MONITORING, force=True) + try: + t_last = time() + while True: + try: + # Exit loop and finish monitoring + if self._stop_polling: + logger.info(f"[{self.parent.name}]\tDetaching monitor") + break + + # pylint: disable=no-member + r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK) + + # Length and throtling checks + if len(r) != 2: + logger.warning( + f"[{self.parent.name}] Received malformed array of length {len(r)}") + t_curr = time() + t_elapsed = t_curr - t_last + if t_elapsed < self.parent.throttle.get(): + sleep(0.1) + continue + + # Unpack the Array V1 reply to metadata and array data + meta, data = r + + # Update image and update subscribers + header = json.loads(meta) + if header["type"] == "uint16": + image = np.frombuffer(data, dtype=np.uint16) + if image.size != np.prod(header['shape']): + err = f"Unexpected array size of {image.size} for header: {header}" + raise ValueError(err) + image = image.reshape(header['shape']) + + # Update image and update subscribers + self.parent.frame.put(header['frame'], force=True) + self.parent.image_shape.put(header['shape'], force=True) + self.parent.image.put(image, force=True) + self.parent._last_image = image + self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image) + t_last = t_curr + logger.info( + f"[{self.parent.name}] Updated frame {header['frame']}\t" + f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}" + ) + except ValueError: + # Happens when ZMQ partially delivers the multipart message + pass + except zmq.error.Again: + # Happens when receive queue is empty + sleep(0.1) + except Exception as ex: + logger.info(f"[{self.parent.name}]\t{str(ex)}") + raise + finally: + self._mon = None + self.parent.status.set(StdDaqPreviewState.DETACHED, force=True) + logger.info(f"[{self.parent.name}]\tDetaching monitor") + class StdDaqPreviewDetector(PSIDetectorBase): """Detector wrapper class around the StdDaq preview image stream. @@ -53,9 +134,8 @@ class StdDaqPreviewDetector(PSIDetectorBase): You can add a preview widget to the dock by: cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') """ - # Subscriptions for plotting image - USER_ACCESS = ["arm", "disarm", "get_last_image"] + USER_ACCESS = ["kickoff", "get_last_image"] SUB_MONITOR = "device_monitor_2d" _default_sub = SUB_MONITOR @@ -64,19 +144,19 @@ class StdDaqPreviewDetector(PSIDetectorBase): # Status attributes url = Component(Signal, kind=Kind.config) throttle = Component(Signal, value=0.25, kind=Kind.config) + status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted) frame = Component(Signal, kind=Kind.hinted) image_shape = Component(Signal, kind=Kind.normal) # FIXME: The BEC client caches the read()s from the last 50 scans image = Component(Signal, kind=Kind.omitted) _last_image = None - _stop_polling = True - _mon = None def __init__( self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs ) -> None: super().__init__(*args, parent=parent, **kwargs) self.url._metadata["write_access"] = False + self.status._metadata["write_access"] = False self.image._metadata["write_access"] = False self.frame._metadata["write_access"] = False self.image_shape._metadata["write_access"] = False @@ -105,88 +185,9 @@ class StdDaqPreviewDetector(PSIDetectorBase): def get_image(self): return self._last_image - def arm(self): - """Start listening for preview data stream""" - if self._mon is not None: - self.unstage() - sleep(0.5) - - self.connect() - self._stop_polling = False - self._mon = Thread(target=self.poll, daemon=True) - self._mon.start() - - def disarm(self): - """Stop a running preview""" - if self._mon is not None: - self._stop_polling = True - # Might hang on recv_multipart - self._mon.join(timeout=1) - # So also disconnect the socket (if not already disconnected) - try: - self._socket.disconnect(self.url.get()) - except zmq.error.ZMQError: - pass - - def poll(self): - """Collect streamed updates""" - try: - t_last = time() - while True: - try: - # Exit loop and finish monitoring - if self._stop_polling: - break - - # pylint: disable=no-member - r = self._socket.recv_multipart(flags=zmq.NOBLOCK) - - # Length and throtling checks - if len(r) != 2: - logger.warning(f"[{self.name}] Received malformed array of length {len(r)}") - t_curr = time() - t_elapsed = t_curr - t_last - if t_elapsed < self.throttle.get(): - sleep(0.1) - continue - - # Unpack the Array V1 reply to metadata and array data - meta, data = r - - # Update image and update subscribers - header = json.loads(meta) - image = None - if header["type"] == "uint16": - image = np.frombuffer(data, dtype=np.uint16) - - if image.size != np.prod(header["shape"]): - err = f"Unexpected array size of {image.size} for header: {header}" - raise ValueError(err) - image = image.reshape(header["shape"]) - - # Update image and update subscribers - self.frame.put(header["frame"], force=True) - self.image_shape.put(header["shape"], force=True) - self.image.put(image, force=True) - self._last_image = image - self._run_subs(sub_type=self.SUB_MONITOR, value=image) - t_last = t_curr - logger.info( - f"[{self.name}] Updated frame {header['frame']}\t" - f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}" - ) - except ValueError: - # Happens when ZMQ partially delivers the multipart message - pass - except zmq.error.Again: - # Happens when receive queue is empty - sleep(0.1) - except Exception as ex: - logger.info(f"[{self.name}]\t{str(ex)}") - raise - finally: - self._mon = None - logger.info(f"[{self.name}]\tDetaching monitor") + def kickoff(self) -> DeviceStatus: + """ The DAQ was not meant to be toggled""" + return DeviceStatus(self, done=True, success=True, settle_time=0.1) # Automatically connect to MicroSAXS testbench if directly invoked