From 7c5df6d0b46e438ea10ce6c9a09ec4a9a3f8ca93 Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Thu, 18 Jul 2024 12:08:16 +0200 Subject: [PATCH] DAQ config reader --- .../device_configs/microxas_test_bed.yaml | 13 + tomcat_bec/devices/gigafrost/stddaq_rest.py | 112 ++++++++ tomcat_bec/devices/gigafrost/stddaq_urllib.py | 249 ------------------ 3 files changed, 125 insertions(+), 249 deletions(-) create mode 100644 tomcat_bec/devices/gigafrost/stddaq_rest.py delete mode 100644 tomcat_bec/devices/gigafrost/stddaq_urllib.py diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index 0093dfa..5c282e7 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -119,6 +119,19 @@ daq: readoutPriority: monitored softwareTrigger: false +daqcfg: + description: Standard DAQ config + deviceClass: tomcat_bec.devices.gigafrost.stddaq_rest.StdDaqRestConfig + deviceConfig: + url: 'http://xbl-daq-29:5000' + deviceTags: + - std-daq + enabled: true + onFailure: buffer + readOnly: false + readoutPriority: monitored + softwareTrigger: false + daq_stream0: description: Standard DAQ preview stream 2 frames every 1000 deviceClass: tomcat_bec.devices.gigafrost.stddaq_preview.StdDaqPreview diff --git a/tomcat_bec/devices/gigafrost/stddaq_rest.py b/tomcat_bec/devices/gigafrost/stddaq_rest.py new file mode 100644 index 0000000..8560056 --- /dev/null +++ b/tomcat_bec/devices/gigafrost/stddaq_rest.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +""" +Standard DAQ class module + +Created on Thu Jun 27 17:28:43 2024 + +@author: mohacsi_i +""" +import sys +import json +from time import sleep +from threading import Thread +from ophyd import Device, Signal, Component, Kind +import requests + + +class StdDaqRestConfig(Device): + """Wrapper class around the new StdDaq REST interface. + + This was meant to replace the websocket inteface that replaced the + documented python client. We can finally read configuration through + standard HTTP requests, although the secondary server is ot reachable + at the time. + """ + # pylint: disable=too-many-instance-attributes + + # Status attributes + cfg_detector_name = Component(Signal, kind=Kind.config) + cfg_detector_type = Component(Signal, kind=Kind.config) + cfg_n_modules = Component(Signal, kind=Kind.config) + cfg_bit_depth = Component(Signal, kind=Kind.config) + cfg_image_pixel_height = Component(Signal, kind=Kind.config) + cfg_image_pixel_width = Component(Signal, kind=Kind.config) + cfg_start_udp_port = Component(Signal, kind=Kind.config) + cfg_writer_user_id = Component(Signal, kind=Kind.config) + cfg_submodule_info = Component(Signal, kind=Kind.config) + cfg_max_number_of_forwarders_spawned = Component(Signal, kind=Kind.config) + cfg_use_all_forwarders = Component(Signal, kind=Kind.config) + cfg_module_sync_queue_size = Component(Signal, kind=Kind.config) + cfg_module_positions = Component(Signal, kind=Kind.config) + + + def __init__( + self, *args, url: str = "http://xbl-daq-29:5000", parent: Device = None, **kwargs + ) -> None: + super().__init__(*args, parent=parent, **kwargs) + self._url_base = url + + # Connect ro the DAQ and initialize values + self.read_daq_config() + + def read_daq_config(self): + """Read the current configuration from the JSON file + """ + r = requests.get(self._url_base + '/api/config/get', params={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"}) + if r.status_code != 200: + raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") + + cfg = r.json() + for key, val in cfg.items(): + if isinstance(val, (int, float, str)): + getattr(self, "cfg_"+key).set(val).wait() + return cfg + + def write_daq_config(self): + config = { + 'detector_name': str(self.cfg_detector_name.get()), + 'detector_type': str(self.cfg_detector_type.get()), + 'n_modules': int(self.cfg_n_modules.get()), + 'bit_depth': int(self.cfg_bit_depth.get()), + 'image_pixel_height': int(self.cfg_image_pixel_height.get()), + 'image_pixel_width': int(self.cfg_image_pixel_width.get()), + 'start_udp_port': int(self.cfg_start_udp_port.get()), + 'writer_user_id': int(self.cfg_writer_user_id.get()), + 'submodule_info': self.cfg_submodule_info.get(), + 'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders_spawned.get()), + 'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()), + 'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()), + 'module_positions': self.cfg_module_positions.get() + } + + params = {"user": "ioc", "config_file": "/etc/std_daq/configs/gf1.json"} + + r = requests.post(self._url_base +'/api/config/set', params=params, json=config, headers={"Content-Type": "application/json"}) + if r.status_code != 200: + raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") + + + def stage(self) -> list: + """Read the current configuration from the DAQ + """ + self.read_daq_config() + return super().stage() + + + def unstage(self): + """Read the current configuration from the DAQ + """ + self.read_daq_config() + return super().unstage() + + + def stop(self): + """Read the current configuration from the DAQ + """ + self.unstage() + + +# Automatically connect to MicroSAXS testbench if directly invoked +if __name__ == "__main__": + daqcfg = StdDaqRestConfig(name="daqcfg", url="http://xbl-daq-29:5000") + daqcfg.wait_for_connection() diff --git a/tomcat_bec/devices/gigafrost/stddaq_urllib.py b/tomcat_bec/devices/gigafrost/stddaq_urllib.py deleted file mode 100644 index df93639..0000000 --- a/tomcat_bec/devices/gigafrost/stddaq_urllib.py +++ /dev/null @@ -1,249 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Standard DAQ class module - -Created on Thu Jun 27 17:28:43 2024 - -@author: mohacsi_i -""" -import sys -import json -from time import sleep -from threading import Thread -from ophyd import Device, Signal, Component, Kind -from websockets.sync.client import connect -from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError - - -class StdDaqRestClient(Device): - """Wrapper class around the new StdDaq REST interface. - - This was meant to replace the websocket inteface that replaced the - documented python client. We can finally read configuration with urllib3: - ''' - import urllib3 as url3 - r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"}) - ''' - """ - # pylint: disable=too-many-instance-attributes - - # Status attributes - status = Component(Signal, value="unknown", kind=Kind.hinted) - n_images = Component(Signal, value=10000, kind=Kind.config) - file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config) - - - cfg_detector_name = Component(Signal, kind=Kind.config) - cfg_detector_type = Component(Signal, kind=Kind.config) - cfg_num_modules = Component(Signal, kind=Kind.config) - cfg_bit_depth = Component(Signal, kind=Kind.config) - cfg_image_pixel_height = Component(Signal, kind=Kind.config) - cfg_image_pixel_width = Component(Signal, kind=Kind.config) - cfg_start_udp_port = Component(Signal, kind=Kind.config) - cfg_writer_user_id = Component(Signal, kind=Kind.config) - cfg_submodule_info = Component(Signal, kind=Kind.config) - cfg_max_number_of_forwarders_spawned = Component(Signal, kind=Kind.config) - cfg_use_all_forwarders = Component(Signal, kind=Kind.config) - cfg_module_sync_queue_size = Component(Signal, kind=Kind.config) - cfg_module_positions = Component(Signal, kind=Kind.config) - - - - def __init__( - self, *args, url: str = "http://xbl-daq-29:5000", parent: Device = None, **kwargs - ) -> None: - super().__init__(*args, parent=parent, **kwargs) - self.status._metadata["write_access"] = False - self._url = url - self._mon = None - - # Connect ro the DAQ - self.connect() - - def read_daq_config(self): - """Read the current configuration from the JSON file - """ - r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"}) - r = r.json() - - if 'detail' in r: - # Failed to read config, probbaly wrong file - return - - for key, val in r: - getattr(self, "cfg_"+key).set(val).wait() - - def write_daq_config(self): - - - config = { - 'detector_name': str(self.cfg_detector_name.get()), - 'detector_type': str(self.cfg_detector_type.get()), - 'n_modules': int(self.cfg_n_modules.get()), - 'bit_depth': int(self.cfg_bit_depth.get()), - 'image_pixel_height': int(self.cfg_image_pixel_height.get()), - 'image_pixel_width': int(self.cfg_image_pixel_width.get()), - 'start_udp_port': int(self.cfg_start_udp_port.get()), - 'writer_user_id' int(self.cfg_writer_user_id.get()), - 'submodule_info': self.cfg_submodule_info.get(), - 'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders_spawned.get()), - 'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()), - 'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()), - 'module_positions': self.cfg_module_positions.get() - } - - - - - - def connect(self): - """Connect to te StDAQs websockets interface - - StdDAQ may reject connection for a few seconds, so if it fails, wait - a bit and try to connect again. - """ - try: - self._client = connect(self._ws_url) - except ConnectionRefusedError: - sleep(5) - self._client = connect(self._ws_url) - - def monitor(self): - """Attach monitoring to the DAQ""" - self._client = connect(self._ws_url) - self._mon = Thread(target=self.poll, daemon=True) - self._mon.start() - - - - - - - - - def configure(self, n_images: int = None, file_path: str = None) -> tuple: - """Set the standard DAQ parameters for the next run - - Note that full reconfiguration is not possible with the websocket - interface, only changing acquisition parameters. These changes are only - activated upon staging! - - Example: - ---------- - std.configure(n_images=10000, file_path="/data/test/raw") - - Parameters - ---------- - n_images : int, optional - Number of images to be taken during each scan. Set to -1 for an - unlimited number of images (limited by the ringbuffer size and - backend speed). (default = 10000) - file_path : string, optional - Save file path. (default = '/gpfs/test/test-beamline') - - """ - old_config = self.read_configuration() - # If Bluesky style configure - if isinstance(n_images, dict): - d = n_images.copy() - n_images = d.get('n_images', None) - file_path = d.get('file_path', None) - - if n_images is not None: - self.n_images.set(int(n_images)) - if file_path is not None: - self.output_file.set(str(file_path)) - - new_config = self.read_configuration() - return (old_config, new_config) - - def stage(self) -> list: - """Start a new run with the standard DAQ - - Behavior: the StdDAQ can stop the previous run either by itself or - by calling unstage. So it might start from an already running state or - not, we can't query if not running. - """ - file_path = self.file_path.get() - n_image = self.n_images.get() - - message = {"command": "start", "path": file_path, "n_image": n_image} - reply = self.message(message) - - reply = json.loads(reply) - if reply["status"] in ("creating_file"): - self.status.put(reply["status"], force=True) - elif reply["status"] in ("rejected"): - raise RuntimeError( - f"Start command rejected (might be already running): {reply['reason']}" - ) - - self._mon = Thread(target=self.poll, daemon=True) - self._mon.start() - return super().stage() - - def unstage(self): - """ Stop a running acquisition - - WARN: This will also close the connection!!! - """ - message = {"command": "stop"} - _ = self.message(message, wait_reply=False) - return super().unstage() - - def stop(self): - """ Stop a running acquisition - - WARN: This will also close the connection!!! - """ - message = {"command": "stop"} - # The poller thread locks recv raising a RuntimeError - self.message(message, wait_reply=False) - - def message(self, message: dict, timeout=1, wait_reply=True): - """Send a message to the StdDAQ and receive a reply - - Note: finishing acquisition meang StdDAQ will close connections so - there's no idle state polling. - """ - if isinstance(message, dict): - msg = json.dumps(message) - else: - msg = str(message) - - # Send message (reopen connection if needed) - try: - self._client.send(msg) - except (ConnectionClosedError, ConnectionClosedOK): - self.connect() - self._client.send(msg) - # Wait for reply - reply = None - if wait_reply: - try: - reply = self._client.recv(timeout) - return reply - except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex: - print(ex) - return reply - - def poll(self): - """Monitor status messages until connection is open""" - try: - for msg in self._client: - try: - message = json.loads(msg) - self.status.put(message["status"], force=True) - except (ConnectionClosedError, ConnectionClosedOK) as ex: - return - except Exception as ex: - print(ex) - return - finally: - self._mon = None - - -# Automatically connect to MicroSAXS testbench if directly invoked -if __name__ == "__main__": - daq = StdDaqWsClient(name="daq", url="ws://xbl-daq-29:8080") - daq.wait_for_connection()