From 43fe8e37bd7824f01f22909ee49434a866ad92f2 Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Wed, 26 Jun 2024 13:53:45 +0200 Subject: [PATCH] GigaFrost works in soft trigger mode --- .../devices/gigafrost/gigafrostclient.py | 57 +++++- tomcat_bec/devices/gigafrost/stddaq_rest.py | 155 +++++++++++++++ tomcat_bec/devices/gigafrost/stddaq_ws.py | 179 ++++++++++++++++++ tomcat_bec/devices/stddaqclient.py | 131 ------------- 4 files changed, 387 insertions(+), 135 deletions(-) create mode 100644 tomcat_bec/devices/gigafrost/stddaq_rest.py create mode 100644 tomcat_bec/devices/gigafrost/stddaq_ws.py delete mode 100644 tomcat_bec/devices/stddaqclient.py diff --git a/tomcat_bec/devices/gigafrost/gigafrostclient.py b/tomcat_bec/devices/gigafrost/gigafrostclient.py index 67717d9..6bb3eea 100644 --- a/tomcat_bec/devices/gigafrost/gigafrostclient.py +++ b/tomcat_bec/devices/gigafrost/gigafrostclient.py @@ -1,6 +1,8 @@ from ophyd import Device, Component, EpicsMotor, EpicsSignal, EpicsSignalRO, Kind, DerivedSignal from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus from ophyd.flyers import FlyerInterface +from ophyd.utils import RedundantStaging +from ophyd.device import Staged from time import sleep import warnings import numpy as np @@ -82,10 +84,10 @@ class GigaFrostClient(Device): cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True) infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True) - def __init__(self, prefix="", *, name, backend_url=const.BE999_DAFL_CLIENT, + def __init__(self, prefix="", *, name, auto_soft_enable=False, backend_url=const.BE999_DAFL_CLIENT, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, **kwargs) - self.oldInitializer(backend_url=backend_url) + self.oldInitializer(backend_url=backend_url, auto_soft_enable=auto_soft_enable) def oldInitializer(self, auto_soft_enable=False, @@ -207,7 +209,7 @@ class GigaFrostClient(Device): ## Stop acquisition self.cmdStartCamera.set(0).wait() if self._auto_soft_enable: - self.set_soft_enable(0) + self.cmdSoftEnable.set(0).wait() # change settings self.cfgExposure.set(exposure).wait() @@ -244,6 +246,9 @@ class GigaFrostClient(Device): if self._auto_soft_enable: self.cmdSoftEnable.set(1).wait() self.state = const.GfStatus.ACQUIRING + + # Gigafrost can finish a run without explicit unstaging + self._staged = Staged.no return super().stage() def unstage(self): @@ -263,6 +268,12 @@ class GigaFrostClient(Device): """ self.unstage() + def trigger(self): + """ + Sends a software trigger + """ + self.cmdSoftTrigger.set(1).wait() + def reset(self): try: self.unstage() @@ -346,6 +357,44 @@ class GigaFrostClient(Device): else: return None + def put_trigger_mode(self, mode): + """ + Set the trigger mode for the GigaFRoST camera. + + Parameters + ---------- + mode : {'auto', 'external', 'timer', 'soft'} + The GigaFRoST trigger mode. + + """ + + if mode not in self._valid_trigger_modes: + raise ValueError("Invalid trigger mode! Valid modes are:\n" + "{}".format(self._valid_trigger_modes)) + + if mode == 'auto': + self.cfgTrigAuto.set(1).wait() + self.cfgTrigSoft.set(0).wait() + self.cfgTrigTimer.set(0).wait() + self.cfgTrigExt.set(0).wait() + elif mode == 'external': + self.cfgTrigAuto.set(0).wait() + self.cfgTrigSoft.set(0).wait() + self.cfgTrigTimer.set(0).wait() + self.cfgTrigExt.set(1).wait() + elif mode == 'timer': + self.cfgTrigAuto.set(0).wait() + self.cfgTrigSoft.set(0).wait() + self.cfgTrigTimer.set(1).wait() + self.cfgTrigExt.set(0).wait() + elif mode == 'soft': + self.cfgTrigAuto.set(0).wait() + self.cfgTrigSoft.set(1).wait() + self.cfgTrigTimer.set(0).wait() + self.cfgTrigExt.set(0).wait() + # Commit parameters + self.cmdSetParam.set(1).wait() + def put_enable_mode(self, mode): """ Apply the enable mode for the GigaFRoST camera. @@ -516,7 +565,7 @@ class GigaFrostClient(Device): -# Automatically start simulation if directly invoked +# Automatically connect to MicroSAXS testbench if directly invoked if __name__ == "__main__": gf = GigaFrostClient("X02DA-CAM-GF2:", name="gf2") gf.wait_for_connection() diff --git a/tomcat_bec/devices/gigafrost/stddaq_rest.py b/tomcat_bec/devices/gigafrost/stddaq_rest.py new file mode 100644 index 0000000..a2b6216 --- /dev/null +++ b/tomcat_bec/devices/gigafrost/stddaq_rest.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Jun 3 14:16:29 2024 + +@author: mohacsi_i +""" + +from time import sleep +from ophyd import Device, SignalRO, Component +from std_daq_client import StdDaqClient + + + +class StdDaqRestClient(Device): + """ Lightweight wrapper around the official StdDaqClient ophyd package. + Coincidentally also the StdDaqClient is using a Redis broker, that can + potentially be directly fed to the BEC. + + """ + # Status attributes + num_images = Component(SignalRO) + num_images_counter = Component(SignalRO) + output_file = Component(SignalRO) + run_id = Component(SignalRO) + state = Component(SignalRO) + + # Configuration attributes + bit_depth = Component(SignalRO) + detector_name = Component(SignalRO) + detector_type = Component(SignalRO) + image_pixel_width = Component(SignalRO) + image_pixel_height = Component(SignalRO) + start_udp_port = Component(SignalRO) + + def __init__(self, *args, url: str="http://localhost:5000", parent: Device = None, **kwargs) -> None: + + + super().__init__(*args, parent=parent, **kwargs) + self.url = url + + self._n_images = None + self._output_file = None + # Fill signals from current DAQ config + #self.poll_device_config() + #self.poll() + + def connect(self): + self.client = StdDaqClient(url_base=self.url) + + def configure(self, d: dict) -> tuple: + """ + Example: + std.configure(d={'bit_depth': 16, 'writer_user_id': 0}) + + """ + if "n_images" in d: + self._n_images = d['n_images'] + del d['n_images'] + if "output_file" in d: + self._output_file = d['output_file'] + del d['output_file'] + + old_config = self.client.get_config() + + self.client.set_config(daq_config=d) + + new_config = self.client.get_config() + return (old_config, new_config) + + + def stage(self): + self.client.start_writer_async( + {'output_file': self._output_file, 'n_images': self._n_images} + ) + sleep(0.1) + return super().stage() + #while True: + # sleep(0.1) + # daq_status = self.client.get_status() + # if daq_status['acquisition']['state'] in ["ACQUIRING"]: + # break + + def unstage(self): + """ Stop a running acquisition """ + self.client.stop_writer() + return super().unstage() + + def stop(self, *, success=False): + """ Stop a running acquisition """ + self.client.stop_writer() + + if success: + while True: + sleep(0.1) + daq_status = self.client.get_status() + if daq_status['acquisition']['state'] in ["STOPPED", "FINISHED"]: + break + + def poll(self): + """ Querry the currrent status from Std DAQ""" + daq_status = self.client.get_status() + + # Put if new value (put runs subscriptions) + if self.n_images.value != daq_status['acquisition']['info']['n_images']: + self.n_images.put(daq_status['acquisition']['info']['n_images'])get_ + + if self.n_written.value != daq_status['acquisition']['stats']['n_write_completed']: + self.n_written.put(daq_status['acquisition']['stats']['n_write_completed']) + + if self.output_file.value != daq_status['acquisition']['info']['output_file']: + self.output_file.put(daq_status['acquisition']['info']['output_file']) + + if self.run_id.value != daq_status['acquisition']['info']['run_id']: + self.run_id.put(daq_status['acquisition']['info']['run_id']) + + if self.state.value != daq_status['acquisition']['state']: + self.state.put(daq_status['acquisition']['state']) + + + def poll_device_config(self): + """ Querry the currrent configuration from Std DAQ""" + daq_config = self.client.get_config() + + # Put if new value (put runs subscriptions) + if self.bit_depth.value != daq_config['bit_depth']: + self.bit_depth.put(daq_config['bit_depth']) + + if self.detector_name.value != daq_config['detector_name']: + self.detector_name.put(daq_config['detector_name']) + + if self.detector_type.value != daq_config['detector_type']: + self.detector_type.put(daq_config['detector_type']) + + if self.image_pixel_width.value != daq_config['image_pixel_width']: + self.image_pixel_width.put(daq_config['image_pixel_width']) + + if self.image_pixel_height.value != daq_config['image_pixel_height']: + self.image_pixel_height.put(daq_config['image_pixel_height']) + + if self.start_udp_port.value != daq_config['start_udp_port']: + self.start_udp_port.put(daq_config['start_udp_port']) + + + + + +# Automatically connect to MicroSAXS testbench if directly invoked +if __name__ == "__main__": + daq = StdDaqRestClient(name='daq', url="http://xbl-daq-29:5001") + daq.wait_for_connection() + + + + + diff --git a/tomcat_bec/devices/gigafrost/stddaq_ws.py b/tomcat_bec/devices/gigafrost/stddaq_ws.py new file mode 100644 index 0000000..79f7712 --- /dev/null +++ b/tomcat_bec/devices/gigafrost/stddaq_ws.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +""" +Created on Mon Jun 3 14:16:29 2024 + +@author: mohacsi_i +""" +from time import sleep +from threading import Thread +from ophyd import Device, Signal, Component +from ophyd.utils import ReadOnlyError +from websockets.sync.client import connect +from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError + +import json + + + +class SignalRO(Signal): + """ Reimplementation of SignalRO that allows forced writes""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._metadata.update( + connected=True, + write_access=False, + ) + + def put(self, value, *, timestamp=None, force=False): + if not force: + raise ReadOnlyError("The signal {} is readonly.".format(self.name)) + super().put(value, timestamp=timestamp, force=force) + + def set(self, value, *, timestamp=None, force=False): + raise ReadOnlyError("The signal {} is readonly.".format(self.name)) + + + +class StdDaqWsClient(Device): + """ Lightweight wrapper around the undocumented StdDaq websocket interface. + This was meant to replace the documented python client. We cannot read + or change the current configuration through this interface. + + A bit more about the Standard DAQ configuration: + + The standard DAQ configuration is a single JSON file locally autodeployed + to the DAQ servers (as root!!!). Previously there was a service to offer + a REST API to write this file, but since there's no frontend group, this + is no longer available. + """ + # Status attributes + status = Component(SignalRO, value='unknown') + n_image = Component(Signal, value=100) + file_path = Component(Signal, value="/gpfs/test/test-beamline") + + def __init__(self, *args, url: str="ws://localhost:8080", parent: Device = None, **kwargs) -> None: + super().__init__(*args, parent=parent, **kwargs) + self._ws_url = url + + # Connect ro the DAQ + self.connect() + + def connect(self): + # StdDAQ may reject connection for a few seconds + try: + self._client = connect(self._ws_url) + except ConnectionRefusedError: + sleep(5) + self._client = connect(self._ws_url) + + def configure(self, d: dict) -> tuple: + """ + Example: + std.configure(d={'n_images': 234, 'file_path': "/data/test/raw"}) + """ + if "num_images" in d: + self.n_images.set(d['n_images']) + del d['num_images'] + if "file_path" in d: + self.output_file.set(d['file_path']) + del d['file_path'] + return (old_config, new_config) + + def stage(self) -> list: + """Behavior: the StdDAQ can stop the previous run either by itself or + by calling unstage. So it might start from an already running state or + not, we can't query if not running. + """ + + file_path = self.file_path.get() + n_image = self.n_image.get() + + message = {"command":"start", "path": file_path, "n_image": n_image} + reply = self.message(message) + + + reply = json.loads(reply) + if reply['status'] in ('creating_file'): + self.status.put(reply['status'], force=True) + elif reply['status'] in ('rejected'): + raise RuntimeError(f"Start command rejected (might be already running): {reply['reason']}") + + self.t = Thread(target = self.poll) + self.t.start() + return super().stage() + + def unstage(self): + """ Stop a running acquisition + + WARN: This will also close the connection!!! + """ + message = {"command":"stop"} + self.message(message, wait_reply=False) + return super().unstage() + + def stop(self, *, success=False): + """ Stop a running acquisition + + WARN: This will also close the connection!!! + """ + message = {"command":"stop"} + # The poller thread locks recv raising a RuntimeError + self.message(message, wait_reply=False) + + def abort(self): + return self.message({"command": "abort"}) + + def message(self, d: dict, timeout=1, wait_reply=True): + """Send a message to the StdDAQ and receive a reply + + Note: finishing acquisition meang StdDAQ will close connections so + there's no idle state polling. + """ + reply = None + if isinstance(d, dict): + msg = json.dumps(d) + else: + msg = str(d) + + # Send message (reopen connection if needed) + try: + self._client.send(msg) + except ConnectionClosedError: + self.connect() + self._client.send(msg) + except ConnectionClosedOK: + self.connect() + self._client.send(msg) + # Wait for reply + if wait_reply: + try: + reply = self._client.recv(timeout) + print("A: ", reply) + return reply + except ConnectionClosedError as ex: + print(ex) + return None + except TimeoutError: + return None + return None + + def poll(self): + """Monitor status messages until connection is open""" + for msg in self._client: + try: + message = json.loads(msg) + self.status.put(message['status'], force=True) + except Exception as ex: + print(ex) + return + + +# Automatically connect to MicroSAXS testbench if directly invoked +if __name__ == "__main__": + daq = StdDaqWsClient(name='daq', url="ws://xbl-daq-29:8080") + daq.wait_for_connection() + + + + + diff --git a/tomcat_bec/devices/stddaqclient.py b/tomcat_bec/devices/stddaqclient.py deleted file mode 100644 index b606b05..0000000 --- a/tomcat_bec/devices/stddaqclient.py +++ /dev/null @@ -1,131 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Created on Mon Jun 3 14:16:29 2024 - -@author: mohacsi_i -""" -from time import sleep -from ophyd import Device, Signal, Component -from websockets.sync.client import connect -from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError - -import json - -class StdDaqClientDevice(Device): - """ Lightweight wrapper around the undocumented StdDaq websocket interface. - This was meant to replace the documented python client. - - - - - - - A bit more about the Standard DAQ configuration: - - The standard DAQ configuration is a single JSON file locally autodeployed - to the DAQ servers (as root!!!). Previously there was a service to offer - a REST API to write this file, but since there's no frontend group, this - is no longer available. - """ - # Status attributes - n_image = Component(Signal) - file_path = Component(Signal) - - def __init__(self, *args, parent: Device = None, **kwargs) -> None: - super().__init__(*args, parent=parent, **kwargs) - self.ws_server_url = ( - kwargs["daq_url"] if "daq_url" in kwargs else "ws://xbl-daq-29:8080") - self._client = connect(self.ws_server_url) - - self.n_image.set(100) - self.file_path.set("/gpfs/test/test-beamline") - - def connect(self): - self._client = connect(self.ws_server_url) - - def configure(self, d: dict) -> tuple: - """ - Example: - std.configure(d={'n_images': 234, 'file_path': "/data/test/raw"}) - """ - if "num_images" in d: - self.n_images.set(d['n_images']) - del d['num_images'] - if "file_path" in d: - self.output_file.set(d['file_path']) - del d['file_path'] - return (old_config, new_config) - - def stage(self): - file_path = self.file_path.get() - n_image = self.n_image.get() - - message = {"command":"start", "path": file_path, "n_image": n_image} - self.message(message) - return super().stage() - - def unstage(self): - """ Stop a running acquisition - - WARN: This will also close the connection!!! - """ - message = {"command":"stop"} - self.message(message) - return super().unstage() - - def stop(self, *, success=False): - """ Stop a running acquisition - - WARN: This will also close the connection!!! - """ - message = {"command":"stop"} - self.message(message) - - def status(self): - return self.message({"command": "status"}) - - def abort(self): - return self.message({"command": "abort"}) - - def message(self, d: dict, timeout=1): - """ - - Note: finishing acquisition meang StdDAQ will close connections - """ - reply = None - if isinstance(d, dict): - msg = json.dumps(d) - else: - msg = str(d) - print("Q: ", msg) - # Send message (reopen connection if needed) - try: - self._client.send(msg) - except ConnectionClosedError: - # StdDAQ may reject connection for a few seconds - try: - self._client = connect(self.ws_server_url) - except ConnectionRefusedError: - sleep(5) - self._client = connect(self.ws_server_url) - self._client.send(msg) - except ConnectionClosedOK: - # StdDAQ may reject connection for a few seconds - try: - self._client = connect(self.ws_server_url) - except ConnectionRefusedError: - sleep(5) - self._client = connect(self.ws_server_url) - self._client.send(msg) - # Wait for reply - try: - reply = self._client.recv(timeout) - print("A: ", reply) - except ConnectionClosedError as ex: - print(ex) - pass - except TimeoutError: - pass - return reply - -