diff --git a/tomcat_bec/device_configs/microxas_test_bed.yaml b/tomcat_bec/device_configs/microxas_test_bed.yaml index 8b32ecc..0093dfa 100644 --- a/tomcat_bec/device_configs/microxas_test_bed.yaml +++ b/tomcat_bec/device_configs/microxas_test_bed.yaml @@ -120,7 +120,7 @@ daq: softwareTrigger: false daq_stream0: - description: Standard DAQ controls + description: Standard DAQ preview stream 2 frames every 1000 deviceClass: tomcat_bec.devices.gigafrost.stddaq_preview.StdDaqPreview deviceConfig: url: 'tcp://129.129.95.38:20000' @@ -133,7 +133,7 @@ daq_stream0: softwareTrigger: false daq_stream1: - description: Standard DAQ controls + description: Standard DAQ preview stream 4 frames at 10 Hz deviceClass: tomcat_bec.devices.gigafrost.stddaq_preview.StdDaqPreview deviceConfig: url: 'tcp://129.129.95.38:20001' diff --git a/tomcat_bec/devices/gigafrost/stddaq_preview.py b/tomcat_bec/devices/gigafrost/stddaq_preview.py index 85dd745..776229d 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_preview.py +++ b/tomcat_bec/devices/gigafrost/stddaq_preview.py @@ -24,9 +24,18 @@ class StdDaqPreview(Device): This was meant to provide live image preview directly from the StdDAQ. Note that the preview stream must be heavily throtled in order to cope with the incoming data. + + You can add a preview widget to the dock by: + cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') + """ # pylint: disable=too-many-instance-attributes + + SUB_MONITOR = "monitor" + _default_sub = SUB_MONITOR + + # Status attributes url = Component(Signal, kind=Kind.config) status = Component(Signal, value="detached", kind=Kind.omitted) @@ -143,8 +152,10 @@ class StdDaqPreview(Device): self.frame.put(header['frame'], force=True) self.shape.put(header['shape'], force=True) self.image.put(image, force=True) + self._run_subs(sub_type=self.SUB_MONITOR, value=image) + t_last=t_curr - print(f"[DPREV] Updated frame {header['frame']}\tMean: {np.mean(image)}", file=sys.stderr) + print(f"[{self.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}", file=sys.stderr) # Perform some basic analysis on the image if self.process.get(): diff --git a/tomcat_bec/devices/gigafrost/stddaq_urllib.py b/tomcat_bec/devices/gigafrost/stddaq_urllib.py new file mode 100644 index 0000000..df93639 --- /dev/null +++ b/tomcat_bec/devices/gigafrost/stddaq_urllib.py @@ -0,0 +1,249 @@ +# -*- coding: utf-8 -*- +""" +Standard DAQ class module + +Created on Thu Jun 27 17:28:43 2024 + +@author: mohacsi_i +""" +import sys +import json +from time import sleep +from threading import Thread +from ophyd import Device, Signal, Component, Kind +from websockets.sync.client import connect +from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError + + +class StdDaqRestClient(Device): + """Wrapper class around the new StdDaq REST interface. + + This was meant to replace the websocket inteface that replaced the + documented python client. We can finally read configuration with urllib3: + ''' + import urllib3 as url3 + r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"}) + ''' + """ + # pylint: disable=too-many-instance-attributes + + # Status attributes + status = Component(Signal, value="unknown", kind=Kind.hinted) + n_images = Component(Signal, value=10000, kind=Kind.config) + file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config) + + + cfg_detector_name = Component(Signal, kind=Kind.config) + cfg_detector_type = Component(Signal, kind=Kind.config) + cfg_num_modules = Component(Signal, kind=Kind.config) + cfg_bit_depth = Component(Signal, kind=Kind.config) + cfg_image_pixel_height = Component(Signal, kind=Kind.config) + cfg_image_pixel_width = Component(Signal, kind=Kind.config) + cfg_start_udp_port = Component(Signal, kind=Kind.config) + cfg_writer_user_id = Component(Signal, kind=Kind.config) + cfg_submodule_info = Component(Signal, kind=Kind.config) + cfg_max_number_of_forwarders_spawned = Component(Signal, kind=Kind.config) + cfg_use_all_forwarders = Component(Signal, kind=Kind.config) + cfg_module_sync_queue_size = Component(Signal, kind=Kind.config) + cfg_module_positions = Component(Signal, kind=Kind.config) + + + + def __init__( + self, *args, url: str = "http://xbl-daq-29:5000", parent: Device = None, **kwargs + ) -> None: + super().__init__(*args, parent=parent, **kwargs) + self.status._metadata["write_access"] = False + self._url = url + self._mon = None + + # Connect ro the DAQ + self.connect() + + def read_daq_config(self): + """Read the current configuration from the JSON file + """ + r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"}) + r = r.json() + + if 'detail' in r: + # Failed to read config, probbaly wrong file + return + + for key, val in r: + getattr(self, "cfg_"+key).set(val).wait() + + def write_daq_config(self): + + + config = { + 'detector_name': str(self.cfg_detector_name.get()), + 'detector_type': str(self.cfg_detector_type.get()), + 'n_modules': int(self.cfg_n_modules.get()), + 'bit_depth': int(self.cfg_bit_depth.get()), + 'image_pixel_height': int(self.cfg_image_pixel_height.get()), + 'image_pixel_width': int(self.cfg_image_pixel_width.get()), + 'start_udp_port': int(self.cfg_start_udp_port.get()), + 'writer_user_id' int(self.cfg_writer_user_id.get()), + 'submodule_info': self.cfg_submodule_info.get(), + 'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders_spawned.get()), + 'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()), + 'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()), + 'module_positions': self.cfg_module_positions.get() + } + + + + + + def connect(self): + """Connect to te StDAQs websockets interface + + StdDAQ may reject connection for a few seconds, so if it fails, wait + a bit and try to connect again. + """ + try: + self._client = connect(self._ws_url) + except ConnectionRefusedError: + sleep(5) + self._client = connect(self._ws_url) + + def monitor(self): + """Attach monitoring to the DAQ""" + self._client = connect(self._ws_url) + self._mon = Thread(target=self.poll, daemon=True) + self._mon.start() + + + + + + + + + def configure(self, n_images: int = None, file_path: str = None) -> tuple: + """Set the standard DAQ parameters for the next run + + Note that full reconfiguration is not possible with the websocket + interface, only changing acquisition parameters. These changes are only + activated upon staging! + + Example: + ---------- + std.configure(n_images=10000, file_path="/data/test/raw") + + Parameters + ---------- + n_images : int, optional + Number of images to be taken during each scan. Set to -1 for an + unlimited number of images (limited by the ringbuffer size and + backend speed). (default = 10000) + file_path : string, optional + Save file path. (default = '/gpfs/test/test-beamline') + + """ + old_config = self.read_configuration() + # If Bluesky style configure + if isinstance(n_images, dict): + d = n_images.copy() + n_images = d.get('n_images', None) + file_path = d.get('file_path', None) + + if n_images is not None: + self.n_images.set(int(n_images)) + if file_path is not None: + self.output_file.set(str(file_path)) + + new_config = self.read_configuration() + return (old_config, new_config) + + def stage(self) -> list: + """Start a new run with the standard DAQ + + Behavior: the StdDAQ can stop the previous run either by itself or + by calling unstage. So it might start from an already running state or + not, we can't query if not running. + """ + file_path = self.file_path.get() + n_image = self.n_images.get() + + message = {"command": "start", "path": file_path, "n_image": n_image} + reply = self.message(message) + + reply = json.loads(reply) + if reply["status"] in ("creating_file"): + self.status.put(reply["status"], force=True) + elif reply["status"] in ("rejected"): + raise RuntimeError( + f"Start command rejected (might be already running): {reply['reason']}" + ) + + self._mon = Thread(target=self.poll, daemon=True) + self._mon.start() + return super().stage() + + def unstage(self): + """ Stop a running acquisition + + WARN: This will also close the connection!!! + """ + message = {"command": "stop"} + _ = self.message(message, wait_reply=False) + return super().unstage() + + def stop(self): + """ Stop a running acquisition + + WARN: This will also close the connection!!! + """ + message = {"command": "stop"} + # The poller thread locks recv raising a RuntimeError + self.message(message, wait_reply=False) + + def message(self, message: dict, timeout=1, wait_reply=True): + """Send a message to the StdDAQ and receive a reply + + Note: finishing acquisition meang StdDAQ will close connections so + there's no idle state polling. + """ + if isinstance(message, dict): + msg = json.dumps(message) + else: + msg = str(message) + + # Send message (reopen connection if needed) + try: + self._client.send(msg) + except (ConnectionClosedError, ConnectionClosedOK): + self.connect() + self._client.send(msg) + # Wait for reply + reply = None + if wait_reply: + try: + reply = self._client.recv(timeout) + return reply + except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex: + print(ex) + return reply + + def poll(self): + """Monitor status messages until connection is open""" + try: + for msg in self._client: + try: + message = json.loads(msg) + self.status.put(message["status"], force=True) + except (ConnectionClosedError, ConnectionClosedOK) as ex: + return + except Exception as ex: + print(ex) + return + finally: + self._mon = None + + +# Automatically connect to MicroSAXS testbench if directly invoked +if __name__ == "__main__": + daq = StdDaqWsClient(name="daq", url="ws://xbl-daq-29:8080") + daq.wait_for_connection()