From e02bb5892eb06cfcb664b40eab07390a54b8d81a Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Mon, 3 Mar 2025 12:20:56 +0100 Subject: [PATCH] WIP --- .../device_configs/microxas_test_bed.yaml | 32 +-- .../aerotech/AerotechDriveDataCollection.py | 14 +- tomcat_bec/devices/aerotech/AerotechTasks.py | 34 ++- .../devices/gigafrost/gigafrostcamera.py | 41 ++-- tomcat_bec/devices/gigafrost/pcoedgecamera.py | 14 +- tomcat_bec/devices/gigafrost/stddaq_client.py | 196 ++++++------------ .../devices/gigafrost/stddaq_preview.py | 24 +-- tomcat_bec/scans/advanced_scans.py | 131 +++++++----- 8 files changed, 231 insertions(+), 255 deletions(-) diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index 4087d59..613b127 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -196,22 +196,22 @@ pcocam: readoutPriority: monitored softwareTrigger: true -pcodaq: - description: GigaFrost stdDAQ client - deviceClass: tomcat_bec.devices.StdDaqClient - deviceConfig: - ws_url: 'ws://129.129.95.111:8081' - rest_url: 'http://129.129.95.111:5010' - data_source_name: 'pcocam' - deviceTags: - - std-daq - - daq - - pcocam - enabled: true - onFailure: buffer - readOnly: false - readoutPriority: monitored - softwareTrigger: false +# pcodaq: +# description: GigaFrost stdDAQ client +# deviceClass: tomcat_bec.devices.StdDaqClient +# deviceConfig: +# ws_url: 'ws://129.129.95.111:8081' +# rest_url: 'http://129.129.95.111:5010' +# data_source_name: 'pcocam' +# deviceTags: +# - std-daq +# - daq +# - pcocam +# enabled: false +# onFailure: buffer +# readOnly: false +# readoutPriority: monitored +# softwareTrigger: false pco_stream0: description: stdDAQ preview (2 every 555) diff --git a/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py b/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py index d8087df..aa5d136 100644 --- a/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py +++ b/tomcat_bec/devices/aerotech/AerotechDriveDataCollection.py @@ -40,7 +40,7 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin): d["ddc_source0"] = scanargs["ddc_source0"] if "ddc_source1" in scanargs: d["ddc_source1"] = scanargs["ddc_source1"] - + # Number of points if "ddc_num_points" in scanargs and scanargs["ddc_num_points"] is not None: d["num_points_total"] = scanargs["ddc_num_points"] elif "daq_num_points" in scanargs and scanargs["daq_num_points"] is not None: @@ -67,12 +67,12 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin): self.parent.configure(d=d) # Stage the data collection if not in internally launced mode - # NOTE: Scripted scans start acquiring from the scrits - self.parent.bluestage() + # NOTE: Scripted scans might configure and start acquiring from the scrits + self.parent.arm() def on_unstage(self): """Standard bluesky unstage""" - self.parent._switch.set("Stop", settle_time=0.2).wait() + self.parent.disarm() class aa1AxisDriveDataCollection(PSIDeviceBase): @@ -118,7 +118,7 @@ class aa1AxisDriveDataCollection(PSIDeviceBase): _buffer1 = Component(EpicsSignalRO, "BUFFER1", auto_monitor=True, kind=Kind.normal) custom_prepare_cls = AerotechDriveDataCollectionMixin - USER_ACCESS = ["configure", "reset", "collect"] + USER_ACCESS = ["configure", "reset", "collect", "arm", "disarm"] def configure(self, d: dict = None) -> tuple: """Configure data capture @@ -145,12 +145,12 @@ class aa1AxisDriveDataCollection(PSIDeviceBase): new = self.read_configuration() return (old, new) - def bluestage(self) -> None: + def arm(self) -> None: """Bluesky-style stage""" self._switch.set("ResetRB", settle_time=0.1).wait() self._switch.set("Start", settle_time=0.2).wait() - def blueunstage(self): + def disarm(self): """Bluesky-style unstage""" self._switch.set("Stop", settle_time=0.2).wait() diff --git a/tomcat_bec/devices/aerotech/AerotechTasks.py b/tomcat_bec/devices/aerotech/AerotechTasks.py index da8c5d7..62ae0c0 100644 --- a/tomcat_bec/devices/aerotech/AerotechTasks.py +++ b/tomcat_bec/devices/aerotech/AerotechTasks.py @@ -33,7 +33,20 @@ class AerotechTasksMixin(CustomDeviceMixin): PSO is not needed or when it'd conflict with other devices. """ # Fish out our configuration from scaninfo (via explicit or generic addressing) - d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config") + d = {} + if "kwargs" in self.parent.scaninfo.scan_msg.info: + scanargs = self.parent.scaninfo.scan_msg.info["kwargs"] + if self.parent.scaninfo.scan_type in ("script", "scripted"): + # NOTE: Scans don't have to fully configure the device + if "script_text" in scanargs and scanargs["script_text"] is not None: + d["script_text"] = scanargs["script_text"] + if "script_file" in scanargs and scanargs["script_file"] is not None: + d["script_file"] = scanargs["script_file"] + if "script_task" in scanargs and scanargs["script_task"] is not None: + d["script_task"] = scanargs["script_task"] + + # FIXME: The above should be exchanged with: + # d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config") # Perform bluesky-style configuration if d: @@ -41,19 +54,19 @@ class AerotechTasksMixin(CustomDeviceMixin): self.parent.configure(d=d) # The actual staging - self.parent.bluestage() + self.parent.arm() def on_unstage(self): """Stop the currently selected task""" - self.parent.blueunstage() + self.parent.disarm() def on_stop(self): """Stop the currently selected task""" - self.parent.switch.set("Stop").wait() + self.on_unstage() def on_kickoff(self): """Start execution of the selected task""" - self.parent.bluekickoff() + self.parent.launch() class aa1Tasks(PSIDeviceBase): @@ -86,7 +99,7 @@ class aa1Tasks(PSIDeviceBase): ''' """ - + USER_ACCESS = ["arm", "disarm", "launch", "kickoff"] custom_prepare_cls = AerotechTasksMixin _failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal) @@ -100,7 +113,6 @@ class aa1Tasks(PSIDeviceBase): _executeReply = Component(EpicsSignalRO, "EXECUTE_RBV", string=True, auto_monitor=True) fileName = Component(EpicsSignal, "FILENAME", string=True, kind=Kind.omitted, put_complete=True) - # _fileRead = Component(EpicsPassiveRO, "FILEREAD", string=True, kind=Kind.omitted) _fileWrite = Component( EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True ) @@ -129,7 +141,7 @@ class aa1Tasks(PSIDeviceBase): new = self.read_configuration() return (old, new) - def bluestage(self) -> None: + def arm(self) -> None: """Bluesky style stage, prepare, but does not execute""" if self.taskIndex.get() in (0, 1, 2): logger.error(f"[{self.name}] Loading AeroScript on system task. Daring today are we?") @@ -140,11 +152,11 @@ class aa1Tasks(PSIDeviceBase): raise RuntimeError("Failed to load task, please check the Aerotech IOC") return status - def blueunstage(self): + def disarm(self): """Bluesky style unstage, stops execution""" self.switch.set("Stop").wait() - def bluekickoff(self): + def launch(self): """Bluesky style kickoff""" # Launch and check success status = self.switch.set("Start", settle_time=0.2) @@ -159,7 +171,7 @@ class aa1Tasks(PSIDeviceBase): ########################################################################## # Bluesky flyer interface - def complete(self) -> DeviceStatus: + def complete(self) -> SubscriptionStatus: """Wait for a RUNNING task""" timestamp_ = 0 task_idx = int(self.taskIndex.get()) diff --git a/tomcat_bec/devices/gigafrost/gigafrostcamera.py b/tomcat_bec/devices/gigafrost/gigafrostcamera.py index 0e86170..778f678 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostcamera.py +++ b/tomcat_bec/devices/gigafrost/gigafrostcamera.py @@ -194,7 +194,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin): if self.parent.infoSyncFlag.value == 0: self.parent.cmdSyncHw.set(1).wait() # Switch to acquiring - self.parent.bluestage() + self.parent.arm() def on_unstage(self) -> None: """Specify actions to be executed during unstage. @@ -204,9 +204,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin): with flagged done to BEC. """ # Switch to idle - self.parent.cmdStartCamera.set(0).wait() - if self.parent.autoSoftEnable.get(): - self.parent.cmdSoftEnable.set(0).wait() + self.parent.disarm() def on_stop(self) -> None: """ @@ -440,7 +438,7 @@ class GigaFrostCamera(PSIDetectorBase): cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True, kind=Kind.config) infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True) - USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode", "initialize"] + USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode", "initialize", "arm", "disarm"] autoSoftEnable = Component(Signal, kind=Kind.config) backendUrl = Component(Signal, kind=Kind.config) @@ -572,11 +570,18 @@ class GigaFrostCamera(PSIDetectorBase): # Commit parameters self.cmdSetParam.set(1).wait() - def bluestage(self): + def arm(self): """Bluesky style stage""" # Switch to acquiring self.cmdStartCamera.set(1).wait() + def disarm(self): + """ Bluesky style unstage""" + # Switch to idle + self.cmdStartCamera.set(0).wait() + if self.autoSoftEnable.get(): + self.cmdSoftEnable.set(0).wait() + def set_acquisition_mode(self, acq_mode): """Set acquisition mode @@ -589,18 +594,17 @@ class GigaFrostCamera(PSIDetectorBase): if acq_mode == "default": # NOTE: Trigger using software events via softEnable (actually works) + # Switch to physical enable signal + self.cfgEnableScheme.set(0).wait() # Trigger parameters self.cfgCntStartBit.set(1).wait() self.cfgCntEndBit.set(0).wait() - - # Switch to physical enable signal - self.cfgEnableScheme.set(0).wait() - # Set modes # self.cmdSoftEnable.set(0).wait() self.enable_mode = "soft" self.trigger_mode = "auto" self.exposure_mode = "timer" + elif acq_mode in ["ext_enable", "external_enable"]: # NOTE: Trigger using external hardware events via enable input (actually works) # Switch to physical enable signal @@ -612,10 +616,14 @@ class GigaFrostCamera(PSIDetectorBase): self.enable_mode = "external" self.trigger_mode = "auto" self.exposure_mode = "timer" + elif acq_mode == "soft": # NOTE: Fede's configuration for continous streaming # Switch to physical enable signal self.cfgEnableScheme.set(0).wait() + # Set trigger edge to fixed frames on posedge + self.cfgCntStartBit.set(1).wait() + self.cfgCntEndBit.set(0).wait() # Set enable signal to always self.cfgEnableExt.set(0).wait() self.cfgEnableSoft.set(1).wait() @@ -626,16 +634,18 @@ class GigaFrostCamera(PSIDetectorBase): self.cfgTrigTimer.set(1).wait() self.cfgTrigAuto.set(0).wait() # Set exposure mode to timer + # self.exposure_mode = "timer" self.cfgExpExt.set(0).wait() self.cfgExpSoft.set(0).wait() self.cfgExpTimer.set(1).wait() - # Set trigger edge to fixed frames on posedge - self.cfgCntStartBit.set(1).wait() - self.cfgCntEndBit.set(0).wait() + elif acq_mode in ["ext", "external"]: # NOTE: Untested # Switch to physical enable signal self.cfgEnableScheme.set(0).wait() + # Set trigger edge to fixed frames on posedge + self.cfgCntStartBit.set(1).wait() + self.cfgCntEndBit.set(0).wait() # Set enable signal to always self.cfgEnableExt.set(0).wait() self.cfgEnableSoft.set(0).wait() @@ -646,12 +656,11 @@ class GigaFrostCamera(PSIDetectorBase): self.cfgTrigTimer.set(0).wait() self.cfgTrigAuto.set(0).wait() # Set exposure mode to timer + # self.exposure_mode = "timer" self.cfgExpExt.set(0).wait() self.cfgExpSoft.set(0).wait() self.cfgExpTimer.set(1).wait() - # Set trigger edge to fixed frames on posedge - self.cfgCntStartBit.set(1).wait() - self.cfgCntEndBit.set(0).wait() + else: raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}") diff --git a/tomcat_bec/devices/gigafrost/pcoedgecamera.py b/tomcat_bec/devices/gigafrost/pcoedgecamera.py index d9e0f07..d3aa728 100644 --- a/tomcat_bec/devices/gigafrost/pcoedgecamera.py +++ b/tomcat_bec/devices/gigafrost/pcoedgecamera.py @@ -75,15 +75,15 @@ class PcoEdgeCameraMixin(CustomPrepare): self.parent.configure(d=d) # ARM the camera - self.parent.bluestage() + self.parent.arm() def on_unstage(self) -> None: """Disarm the PCO.Edge camera""" - self.parent.blueunstage() + self.parent.disarm() def on_stop(self) -> None: """Stop the PCO.Edge camera""" - self.parent.blueunstage() + self.parent.disarm() def on_trigger(self) -> None | DeviceStatus: """Trigger mode operation @@ -333,7 +333,7 @@ class HelgeCameraBase(BECDeviceBase): status = SubscriptionStatus(self.camSetParam, negedge, timeout=5, settle_time=0.5) status.wait() - def bluestage(self): + def arm(self): """Bluesky style stage: arm the detector""" logger.warning("Staging PCO") # Acquisition is only allowed when the IOC is not busy @@ -358,7 +358,7 @@ class HelgeCameraBase(BECDeviceBase): status = SubscriptionStatus(self.camStatusCode, is_running, timeout=5, settle_time=0.2) status.wait() - def blueunstage(self): + def disarm(self): """Bluesky style unstage: stop the detector""" self.camStatusCmd.set("Idle").wait() @@ -366,7 +366,7 @@ class HelgeCameraBase(BECDeviceBase): # FIXME: This might interrupt data transfer self.file_savestop.set(0).wait() - def bluekickoff(self): + def launch(self): """Start data transfer TODO: Need to revisit this once triggering is complete @@ -383,7 +383,7 @@ class PcoEdge5M(HelgeCameraBase): """ custom_prepare_cls = PcoEdgeCameraMixin - USER_ACCESS = ["bluestage", "blueunstage", "bluekickoff"] + USER_ACCESS = ["arm", "disarm", "launch"] # ######################################################################## # Additional status info diff --git a/tomcat_bec/devices/gigafrost/stddaq_client.py b/tomcat_bec/devices/gigafrost/stddaq_client.py index 82345d9..042cf48 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_client.py +++ b/tomcat_bec/devices/gigafrost/stddaq_client.py @@ -28,8 +28,6 @@ logger = bec_logger.logger class StdDaqMixin(CustomDeviceMixin): # pylint: disable=protected-access - _mon = None - def on_stage(self) -> None: """Configuration and staging @@ -100,42 +98,20 @@ class StdDaqMixin(CustomDeviceMixin): sleep(0.5) # Try to start a new run (reconnects) - # if std_daq_params.get("reconnect", True): - self.parent.bluestage() + self.parent.arm() + - # And start status monitoring - self._mon = Thread(target=self.monitor, daemon=True) - self._mon.start() def on_unstage(self): """Stop a running acquisition and close connection""" print("Creating virtual dataset") self.parent.create_virtual_dataset() - self.parent.blueunstage() + self.parent.disarm() def on_stop(self): """Stop a running acquisition and close connection""" - self.parent.blueunstage() + self.parent.disarm() - def monitor(self) -> None: - """Monitor status messages while connection is open. This will block the reply monitoring - to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing - acquisition means StdDAQ will close connection, so there's no idle state polling. - """ - try: - sleep(0.2) - for msg in self.parent._wsclient: - message = json.loads(msg) - self.parent.runstatus.put(message["status"], force=True) - # logger.info(f"[{self.parent.name}] Pushed status: {message['status']}") - except (ConnectionClosedError, ConnectionClosedOK, AssertionError): - # Libraty throws theese after connection is closed - return - except Exception as ex: - logger.warning(f"[{self.parent.name}] Exception in polling: {ex}") - return - finally: - self._mon = None class StdDaqClient(PSIDeviceBase): @@ -161,9 +137,8 @@ class StdDaqClient(PSIDeviceBase): "nuke", "connect", "message", - "state", - "bluestage", - "blueunstage", + "arm", + "disarm", "complete", ] _wsclient = None @@ -185,6 +160,7 @@ class StdDaqClient(PSIDeviceBase): cfg_pixel_height = Component(Signal, kind=Kind.config) cfg_pixel_width = Component(Signal, kind=Kind.config) cfg_nr_writers = Component(Signal, kind=Kind.config) + _mon = None def __init__( self, @@ -198,7 +174,7 @@ class StdDaqClient(PSIDeviceBase): device_manager=None, ws_url: str = "ws://localhost:8080", rest_url: str = "http://localhost:5000", - data_source_name=None, + data_source_name="", **kwargs, ) -> None: super().__init__( @@ -220,60 +196,59 @@ class StdDaqClient(PSIDeviceBase): self.get_daq_config(update=True) except Exception as ex: logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}") + # Connect to websockets and start poller + try: + self.connect() + except Exception as ex: + logger.error(f"Failed to connect to the stdDAQ websocket interface\n{ex}") - def connect(self) -> ClientConnection: + def connect(self) -> None: """Connect to the StdDAQ's websockets interface StdDAQ may reject connection for a few seconds after restart, or when it wants so if it fails, wait a bit and try to connect again. """ - num_retry = 0 - while num_retry < 5: - try: - logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}") - connection = connect(self.ws_url.get()) - logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries") - return connection - except ConnectionRefusedError: - num_retry += 1 - sleep(2) - raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.") + # Connect to stdDAQ + logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}") + self._wsclient = connect(self.ws_url.get()) + # And start status monitoring + self._mon = Thread(target=self.monitor, daemon=True) + self._mon.start() - def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str: + def monitor(self) -> None: + """Monitor status messages while connection is open. Status updates are + sent every 1 seconds, or when there's a transition. + """ + try: + for msg in self._wsclient: + message = json.loads(msg) + self.runstatus.put(message["status"], force=True) + # logger.info(f"[{self.parent.name}] Pushed status: {message['status']}") + except Exception as ex: + logger.warning(f"[{self.name}] Exception in polling: {ex}") + return + finally: + self._mon = None + + def message(self, message: dict) -> None: """Send a message to the StdDAQ and receive a reply - - Note: finishing acquisition means StdDAQ will close connection, so - there's no idle state polling. """ # Prepare message msg = json.dumps(message) if isinstance(message, dict) else str(message) # Connect if client was destroyed if self._wsclient is None: - self._wsclient = self.connect() + self.connect() # Send message (reopen connection if needed) msg = json.dumps(message) if isinstance(message, dict) else str(message) try: self._wsclient.send(msg) - except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex: + except (ConnectionClosedError, ConnectionClosedOK, AttributeError): # Re-connect if the connection was closed - self._wsclient = self.connect() + self.connect() self._wsclient.send(msg) - # Wait for reply - reply = None - if wait_reply: - try: - reply = self._wsclient.recv(timeout) - return reply - except (ConnectionClosedError, ConnectionClosedOK) as ex: - self._wsclient = None - logger.error(f"[{self.name}] WS connection was closed before reply: {ex}") - except (TimeoutError, RuntimeError) as ex: - logger.error(f"[{self.name}] Error in receiving ws reply: {ex}") - return reply - def configure(self, d: dict = None): """Configure the next scan with the stdDAQ @@ -323,8 +298,8 @@ class StdDaqClient(PSIDeviceBase): ): # Stop if current status is not idle - if self.state() != "idle": - logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") + # if self.runstatus.get() != "idle": + # logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") # Update retrieved config cfg["image_pixel_height"] = int(self.cfg_pixel_height.get()) @@ -335,7 +310,7 @@ class StdDaqClient(PSIDeviceBase): sleep(1) self.get_daq_config(update=True) - def bluestage(self): + def arm(self): """Stages the stdDAQ Opens a new connection to the stdDAQ, sends the start command with @@ -343,8 +318,8 @@ class StdDaqClient(PSIDeviceBase): it for obvious failures. """ # Can't stage into a running exposure - if self.state() != "idle": - raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}") + # if self.runstatus.get() != "idle": + # raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.runstatus.get()}") # Ensure expected shape self.validate() @@ -354,83 +329,47 @@ class StdDaqClient(PSIDeviceBase): num_images = self.num_images.get() # New connection - self._wsclient = self.connect() message = { "command": "start", "path": file_path, "file_prefix": file_prefix, "n_image": num_images, } - reply = self.message(message) + self.message(message) - if reply is not None: - reply = json.loads(reply) - self.runstatus.set(reply["status"], force=True).wait() - logger.info(f"[{self.name}] Start DAQ reply: {reply}") + def is_running(*args, value, timestamp, **kwargs): + result = value not in ["idle", "unknown", "error"] + return result - # Give it more time to reconfigure - if reply["status"] in ("rejected"): - # FIXME: running exposure is a nogo - if reply["reason"] == "driver is busy!": - raise RuntimeError( - f"[{self.name}] Start stdDAQ command rejected: already running" - ) - else: - # Give it more time to consolidate - sleep(1) - else: - # Success!!! - print(f"[{self.name}] Started stdDAQ in: {reply['status']}") - return + status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5) + status.wait() - raise RuntimeError(f"[{self.name}] Failed to start the stdDAQ, reason: {reply['reason']}") + logger.warning(f"[{self.name}] Started stdDAQ in: {self.runstatus.get()}") + return status - def blueunstage(self): + def disarm(self): """Unstages the stdDAQ Opens a new connection to the stdDAQ, sends the stop command and waits for the idle state. """ - ii = 0 - while ii < 10: - # Stop the DAQ (will close connection) - reply is always "success" - self._wsclient = self.connect() - self.message({"command": "stop_all"}, wait_reply=False) + # Stop the DAQ (will close connection) - reply is always "success" + self.message({"command": "stop"}) - # Let it consolidate - sleep(0.2) + def is_running(*args, value, timestamp, **kwargs): + result = value in ["idle", "unknown", "error"] + return result - # Check final status (from new connection) - self._wsclient = self.connect() - reply = self.message({"command": "status"}) - if reply is not None: - logger.info(f"[{self.name}] DAQ status reply: {reply}") - reply = json.loads(reply) + status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5) + status.wait() - if reply["status"] in ("idle", "error"): - # Only 'idle' state accepted - print(f"DAQ stopped on try {ii}") - return - elif reply["status"] in ("stop"): - # Give it more time to stop - sleep(0.5) - elif ii >= 6: - raise RuntimeError(f"Failed to stop StdDAQ: {reply}") - ii += 1 - raise RuntimeError(f"Failed to stop StdDAQ in time") + logger.warning(f"[{self.name}] Stopped stdDAQ in: {self.runstatus.get()}") + return status ########################################################################## # Bluesky flyer interface def complete(self) -> SubscriptionStatus: """Wait for current run. Must end in status 'file_saved'.""" - - # Return immediately if we're detached - # TODO: Maybe check for connection (not sure if it's better) - if self.state() in ["idle", "file_saved", "error"]: - status = Status(self) - status.set_finished() - return status - def is_running(*args, value, timestamp, **kwargs): result = value in ["idle", "file_saved", "error"] return result @@ -519,17 +458,6 @@ class StdDaqClient(PSIDeviceBase): self.set_daq_config(cfg) sleep(restarttime) - def state(self) -> str | None: - """Querry the current system status""" - try: - wsclient = self.connect() - wsclient.send(json.dumps({"command": "status"})) - r = wsclient.recv(timeout=1) - r = json.loads(r) - return r["status"] - except ConnectionRefusedError: - raise - # Automatically connect to microXAS testbench if directly invoked if __name__ == "__main__": diff --git a/tomcat_bec/devices/gigafrost/stddaq_preview.py b/tomcat_bec/devices/gigafrost/stddaq_preview.py index 29e16f5..f2e23a6 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_preview.py +++ b/tomcat_bec/devices/gigafrost/stddaq_preview.py @@ -19,8 +19,9 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import ( ) from bec_lib import bec_logger + logger = bec_logger.logger -ZMQ_TOPIC_FILTER = b'' +ZMQ_TOPIC_FILTER = b"" class StdDaqPreviewMixin(CustomDetectorMixin): @@ -28,6 +29,7 @@ class StdDaqPreviewMixin(CustomDetectorMixin): Parent class: CustomDetectorMixin """ + def on_stage(self): """Start listening for preview data stream""" self.parent.arm() @@ -40,6 +42,7 @@ class StdDaqPreviewMixin(CustomDetectorMixin): """Stop a running preview""" self.on_unstage() + class StdDaqPreviewDetector(PSIDetectorBase): """Detector wrapper class around the StdDaq preview image stream. @@ -50,6 +53,7 @@ class StdDaqPreviewDetector(PSIDetectorBase): You can add a preview widget to the dock by: cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') """ + # Subscriptions for plotting image USER_ACCESS = ["arm", "disarm", "get_last_image"] SUB_MONITOR = "device_monitor_2d" @@ -112,8 +116,6 @@ class StdDaqPreviewDetector(PSIDetectorBase): self._mon = Thread(target=self.poll, daemon=True) self._mon.start() - - def disarm(self): """Stop a running preview""" if self._mon is not None: @@ -126,7 +128,6 @@ class StdDaqPreviewDetector(PSIDetectorBase): except zmq.error.ZMQError: pass - def poll(self): """Collect streamed updates""" try: @@ -142,8 +143,7 @@ class StdDaqPreviewDetector(PSIDetectorBase): # Length and throtling checks if len(r) != 2: - logger.warning( - f"[{self.name}] Received malformed array of length {len(r)}") + logger.warning(f"[{self.name}] Received malformed array of length {len(r)}") t_curr = time() t_elapsed = t_curr - t_last if t_elapsed < self.throttle.get(): @@ -158,15 +158,15 @@ class StdDaqPreviewDetector(PSIDetectorBase): image = None if header["type"] == "uint16": image = np.frombuffer(data, dtype=np.uint16) - - if image.size != np.prod(header['shape']): + + if image.size != np.prod(header["shape"]): err = f"Unexpected array size of {image.size} for header: {header}" raise ValueError(err) - image = image.reshape(header['shape']) + image = image.reshape(header["shape"]) # Update image and update subscribers - self.frame.put(header['frame'], force=True) - self.image_shape.put(header['shape'], force=True) + self.frame.put(header["frame"], force=True) + self.image_shape.put(header["shape"], force=True) self.image.put(image, force=True) self._last_image = image self._run_subs(sub_type=self.SUB_MONITOR, value=image) @@ -174,7 +174,7 @@ class StdDaqPreviewDetector(PSIDetectorBase): logger.info( f"[{self.name}] Updated frame {header['frame']}\t" f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}" - ) + ) except ValueError: # Happens when ZMQ partially delivers the multipart message pass diff --git a/tomcat_bec/scans/advanced_scans.py b/tomcat_bec/scans/advanced_scans.py index 8bb0808..2d9442f 100644 --- a/tomcat_bec/scans/advanced_scans.py +++ b/tomcat_bec/scans/advanced_scans.py @@ -10,7 +10,8 @@ logger = bec_logger.logger class Shutter: - """ Shutter status """ + """Shutter status""" + CLOSED = 0 OPEN = 1 @@ -25,34 +26,46 @@ class AcquireDarkV2(Acquire): Acquire dark images. This scan is used to acquire dark images. Dark images are images taken with the shutter closed and no beam on the sample. Dark images are used to correct the data images for dark current. - NOTE: this scan has a special operation mode that does not call + NOTE: this scan has a special operation mode that does not call Args: exp_burst : int Number of dark images to acquire (no default) file_prefix : str - File prefix + File prefix Examples: >>> scans.acquire_dark(5) """ super().__init__(exp_burst=exp_burst, file_prefix="", **kwargs) - self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger + self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger self.exp_burst = exp_burst self.file_prefix = file_prefix def pre_scan(self): - """ Close the shutter before scan""" + """Close the shutter before scan""" yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED]) return super().pre_scan() def direct(self): - """ Direct scan procedure call""" + """Direct scan procedure call""" # Collect relevant devices - self.cams = [cam.name for cam in self.device_manager.devices.get_devices_with_tags("camera") if cam.enabled] - self.prev = [pre.name for pre in self.device_manager.devices.get_devices_with_tags("preview") if pre.enabled] - self.daqs = [daq.name for daq in self.device_manager.devices.get_devices_with_tags("daq") if daq.enabled] + self.cams = [ + cam.name + for cam in self.device_manager.devices.get_devices_with_tags("camera") + if cam.enabled + ] + self.prev = [ + pre.name + for pre in self.device_manager.devices.get_devices_with_tags("preview") + if pre.enabled + ] + self.daqs = [ + daq.name + for daq in self.device_manager.devices.get_devices_with_tags("daq") + if daq.enabled + ] # Do not call stage, as there's no ScanInfo emitted for direct call for daq in self.daqs: @@ -60,12 +73,14 @@ class AcquireDarkV2(Acquire): prefix = f"{self.file_prefix}_{cam}_dark" yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix) yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst) - yield from self.stubs.send_rpc_and_wait(daq, "bluestage") + yield from self.stubs.send_rpc_and_wait(daq, "arm") for prev in self.prev: yield from self.stubs.send_rpc_and_wait(prev, "arm") for cam in self.cams: - yield from self.stubs.send_rpc_and_wait(cam, "configure", {'exposure_num_burst': self.exp_burst}) - yield from self.stubs.send_rpc_and_wait(cam, "bluestage") + yield from self.stubs.send_rpc_and_wait( + cam, "configure", {"exposure_num_burst": self.exp_burst} + ) + yield from self.stubs.send_rpc_and_wait(cam, "arm") yield from self.pre_scan() yield from self.scan_core() @@ -78,11 +93,19 @@ class AcquireWhiteV2(Acquire): scan_name = "acquire_white_v2" gui_config = {"Acquisition parameters": ["exp_burst"]} - def __init__(self, motor: DeviceBase, exp_burst: int, sample_position_out: float, sample_angle_out: float, file_prefix: str="", **kwargs): + def __init__( + self, + motor: DeviceBase, + exp_burst: int, + sample_position_out: float, + sample_angle_out: float, + file_prefix: str = "", + **kwargs, + ): """ - Acquire flat field images. This scan is used to acquire flat field images. The flat field image is an image taken - with the shutter open but the sample out of the beam. Flat field images are used to correct the data images for - non-uniformity in the detector. + Acquire flat field images. This scan is used to acquire flat field images. The flat field + image is an image taken with the shutter open but the sample out of the beam. Flat field + images are used to correct the data images for non-uniformity in the detector. Args: motor : DeviceBase @@ -90,7 +113,7 @@ class AcquireWhiteV2(Acquire): exp_burst : int Number of flat field images to acquire (no default) sample_position_out : float - Position to move the sample stage to position the sample out of beam and take flat field images + Position to move the sample stage out of beam and take flat field images sample_angle_out : float Angular position where to take the flat field images @@ -108,7 +131,7 @@ class AcquireWhiteV2(Acquire): self.out_position = [sample_position_out, sample_angle_out] def pre_scan(self): - """ Open the shutter before scan""" + """Open the shutter before scan""" # Move sample out yield from self._move_scan_motors_and_wait(self.out_position) # Open the main shutter (TODO change to the correct shutter device) @@ -117,17 +140,29 @@ class AcquireWhiteV2(Acquire): return super().pre_scan() def cleanup(self): - """ Close the shutter after scan""" + """Close the shutter after scan""" # Close fast shutter yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED]) return super().cleanup() def direct(self): - """ Direct scan procedure call""" + """Direct scan procedure call""" # Collect relevant devices - self.cams = [cam.name for cam in self.device_manager.devices.get_devices_with_tags("camera") if cam.enabled] - self.prev = [pre.name for pre in self.device_manager.devices.get_devices_with_tags("preview") if pre.enabled] - self.daqs = [daq.name for daq in self.device_manager.devices.get_devices_with_tags("daq") if daq.enabled] + self.cams = [ + cam.name + for cam in self.device_manager.devices.get_devices_with_tags("camera") + if cam.enabled + ] + self.prev = [ + pre.name + for pre in self.device_manager.devices.get_devices_with_tags("preview") + if pre.enabled + ] + self.daqs = [ + daq.name + for daq in self.device_manager.devices.get_devices_with_tags("daq") + if daq.enabled + ] # Do not call stage, as there's no ScanInfo emitted for direct call for daq in self.daqs: @@ -135,12 +170,14 @@ class AcquireWhiteV2(Acquire): prefix = f"{self.file_prefix}_{cam}_white" yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix) yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst) - yield from self.stubs.send_rpc_and_wait(daq, "bluestage") + yield from self.stubs.send_rpc_and_wait(daq, "arm") for prev in self.prev: yield from self.stubs.send_rpc_and_wait(prev, "arm") for cam in self.cams: - yield from self.stubs.send_rpc_and_wait(cam, "configure", {'exposure_num_burst': self.exp_burst}) - yield from self.stubs.send_rpc_and_wait(cam, "bluestage") + yield from self.stubs.send_rpc_and_wait( + cam, "configure", {"exposure_num_burst": self.exp_burst} + ) + yield from self.stubs.send_rpc_and_wait(cam, "arm") yield from self.pre_scan() yield from self.scan_core() @@ -157,7 +194,7 @@ class AcquireWhiteV2(Acquire): # "Camera": ["exp_time", "exp_burst"] # } -# def __init__(self, +# def __init__(self, # motor: DeviceBase, # exp_burst: int, # sample_position_in: float, @@ -166,7 +203,7 @@ class AcquireWhiteV2(Acquire): # exp_time:float, # **kwargs): # """ -# Acquire projection images. +# Acquire projection images. # Args: # motor : DeviceBase @@ -223,7 +260,7 @@ class AcquireWhiteV2(Acquire): # yield from self._set_position_offset() # def scan_core(self): - + # # move to in position and go to start angular position # yield from self.stubs.set(device=["eyez", self.motor], value=[self.sample_position_in, self.positions[0][0]]) @@ -238,10 +275,10 @@ class AcquireWhiteV2(Acquire): # self.connector.send_client_info( # "Starting the scan", show_asap=True, rid=self.metadata.get("RID") -# ) +# ) # yield from self.stubs.trigger() - + # while not flyer_request.done: # yield from self.stubs.read( @@ -267,13 +304,13 @@ class AcquireRefsV2(Acquire): sample_angle_out: float = 0, sample_position_in: float = 0, sample_position_out: float = 1, - file_prefix_dark: str = 'tmp_dark', - file_prefix_white: str = 'tmp_white', - **kwargs + file_prefix_dark: str = "tmp_dark", + file_prefix_white: str = "tmp_white", + **kwargs, ): """ Acquire reference images (darks + whites) and return to beam position. - + Reference images are acquired automatically in an optimized sequence and the sample is returned to the sample_in_position afterwards. @@ -291,16 +328,16 @@ class AcquireRefsV2(Acquire): sample_position_out : float ,optional Sample stage X position for sample out of the beam [um] exp_time : float, optional - Exposure time [ms]. If not specified, the currently configured value + Exposure time [ms]. If not specified, the currently configured value on the camera will be used exp_period : float, optional Exposure period [ms]. If not specified, the currently configured value on the camera will be used image_width : int, optional - ROI size in the x-direction [pixels]. If not specified, the currently + ROI size in the x-direction [pixels]. If not specified, the currently configured value on the camera will be used image_height : int, optional - ROI size in the y-direction [pixels]. If not specified, the currently + ROI size in the y-direction [pixels]. If not specified, the currently configured value on the camera will be used acq_mode : str, optional Predefined acquisition mode (default= 'default') @@ -334,15 +371,11 @@ class AcquireRefsV2(Acquire): if self.num_darks: msg = f"Acquiring {self.num_darks} dark images" logger.warning(msg) - self.connector.send_client_info( - msg, - show_asap=True, - rid=self.metadata.get("RID"), - ) + self.connector.send_client_info(msg, show_asap=True, rid=self.metadata.get("RID")) darks = AcquireDarkV2( exp_burst=self.num_darks, -# file_prefix=self.file_prefix_dark, + # file_prefix=self.file_prefix_dark, device_manager=self.device_manager, metadata=self.metadata, instruction_handler=self.stubs._instruction_handler, @@ -351,16 +384,11 @@ class AcquireRefsV2(Acquire): yield from darks.direct() self.point_id = darks.point_id - if self.num_flats: msg = f"Acquiring {self.num_flats} flat field images" logger.warning(msg) - self.connector.send_client_info( - msg, - show_asap=True, - rid=self.metadata.get("RID"), - ) + self.connector.send_client_info(msg, show_asap=True, rid=self.metadata.get("RID")) logger.warning("Calling AcquireWhite") flats = AcquireWhiteV2( @@ -373,11 +401,10 @@ class AcquireRefsV2(Acquire): instruction_handler=self.stubs._instruction_handler, **self.caller_kwargs, ) - + flats.point_id = self.point_id yield from flats.direct() self.point_id = flats.point_id ## TODO move sample in beam and do not wait ## TODO move rotation to angle and do not wait logger.warning("[AcquireRefsV2] Done with scan_core") -