From 0dc6412ed85f66c3485a1070f2be58190e33dabd Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Wed, 5 Feb 2025 17:32:34 +0100 Subject: [PATCH] Dummy PCO edge consumer --- tomcat_bec/devices/gigafrost/pco_datasink.py | 192 ++++++++++++++++++ tomcat_bec/devices/gigafrost/pcoedgecamera.py | 2 + 2 files changed, 194 insertions(+) create mode 100644 tomcat_bec/devices/gigafrost/pco_datasink.py diff --git a/tomcat_bec/devices/gigafrost/pco_datasink.py b/tomcat_bec/devices/gigafrost/pco_datasink.py new file mode 100644 index 0000000..b76defb --- /dev/null +++ b/tomcat_bec/devices/gigafrost/pco_datasink.py @@ -0,0 +1,192 @@ +# -*- coding: utf-8 -*- +""" +Standard DAQ preview image stream module + +Created on Thu Jun 27 17:28:43 2024 + +@author: mohacsi_i +""" +import json +import enum +from time import sleep, time +from threading import Thread +import zmq +import numpy as np +from ophyd import Device, Signal, Component, Kind, DeviceStatus +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) + +from bec_lib import bec_logger +logger = bec_logger.logger +ZMQ_TOPIC_FILTER = b'' + + + +class PcoTestConsumerMixin(CustomDetectorMixin): + """Setup class for the standard DAQ preview stream + + Parent class: CustomDetectorMixin + """ + def on_stage(self): + """Start listening for preview data stream""" + if self.parent._mon is not None: + self.parent.unstage() + sleep(0.5) + + self.parent.connect() + self._stop_polling = False + self.parent._mon = Thread(target=self.poll, daemon=True) + self.parent._mon.start() + + def on_unstage(self): + """Stop a running preview""" + if self.parent._mon is not None: + self._stop_polling = True + # Might hang on recv_multipart + self.parent._mon.join(timeout=1) + # So also disconnect the socket + self.parent.disconnect() + + def on_stop(self): + """Stop a running preview""" + self.parent.disconnect() + + def poll(self): + """Collect streamed updates""" + try: + t_last = time() + while True: + try: + # Exit loop and finish monitoring + if self._stop_polling: + logger.info(f"[{self.parent.name}]\tDetaching monitor") + break + + # pylint: disable=no-member + r = self.parent._socket.recv() + + # Length and throtling checks + t_curr = time() + t_elapsed = t_curr - t_last + if t_elapsed < self.parent.throttle.get(): + continue + """ + # Unpack the Array V1 reply to metadata and array data + meta, data = r + print(meta) + + # Update image and update subscribers + header = json.loads(meta) + if header["type"] == "uint16": + image = np.frombuffer(data, dtype=np.uint16) + if image.size != np.prod(header['shape']): + err = f"Unexpected array size of {image.size} for header: {header}" + raise ValueError(err) + image = image.reshape(header['shape']) + + # Update image and update subscribers + self.parent.frame.put(header['frame'], force=True) + self.parent.image_shape.put(header['shape'], force=True) + self.parent.image.put(image, force=True) + self.parent._last_image = image + self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image) + """ + t_last = t_curr + #logger.info( + # f"[{self.parent.name}] Updated frame {header['frame']}\t" + # f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}" + # ) + except ValueError: + # Happens when ZMQ partially delivers the multipart message + pass + except zmq.error.Again: + # Happens when receive queue is empty + sleep(0.1) + except Exception as ex: + logger.info(f"[{self.parent.name}]\t{str(ex)}") + raise + finally: + try: + self.parent._socket.disconnect() + except: + pass + self.parent._mon = None + logger.info(f"[{self.parent.name}]\tDetaching monitor") + + +class PcoTestConsumer(PSIDetectorBase): + """Detector wrapper class around the StdDaq preview image stream. + + This was meant to provide live image stream directly from the StdDAQ. + Note that the preview stream must be already throtled in order to cope + with the incoming data and the python class might throttle it further. + + You can add a preview widget to the dock by: + cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') + """ + # Subscriptions for plotting image + USER_ACCESS = ["get_last_image"] + SUB_MONITOR = "device_monitor_2d" + _default_sub = SUB_MONITOR + + custom_prepare_cls = PcoTestConsumerMixin + + # Status attributes + url = Component(Signal, kind=Kind.config) + throttle = Component(Signal, value=0.25, kind=Kind.config) + frame = Component(Signal, kind=Kind.hinted) + image_shape = Component(Signal, kind=Kind.normal) + # FIXME: The BEC client caches the read()s from the last 50 scans + image = Component(Signal, kind=Kind.omitted) + _last_image = None + _mon = None + _socket = None + + def __init__( + self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs + ) -> None: + super().__init__(*args, parent=parent, **kwargs) + self.url._metadata["write_access"] = False + self.image._metadata["write_access"] = False + self.frame._metadata["write_access"] = False + self.image_shape._metadata["write_access"] = False + self.url.set(url, force=True).wait() + + def connect(self): + """Connect to te StDAQs PUB-SUB streaming interface + + StdDAQ may reject connection for a few seconds when it restarts, + so if it fails, wait a bit and try to connect again. + """ + # pylint: disable=no-member + # Socket to talk to server + context = zmq.Context() + self._socket = context.socket(zmq.PULL) + try: + self._socket.connect(self.url.get()) + except ConnectionRefusedError: + sleep(1) + self._socket.connect(self.url.get()) + + def disconnect(self): + """Disconnect + """ + try: + if self._socket is not None: + self._socket.disconnect(self.url.get()) + except zmq.ZMQError: + pass + finally: + self._socket = None + + + def get_image(self): + return self._last_image + + +# Automatically connect to MicroSAXS testbench if directly invoked +if __name__ == "__main__": + daq = PcoTestConsumerMixin(url="tcp://129.129.106.124:8080", name="preview") + daq.wait_for_connection() diff --git a/tomcat_bec/devices/gigafrost/pcoedgecamera.py b/tomcat_bec/devices/gigafrost/pcoedgecamera.py index cf7a34f..edbfa5e 100644 --- a/tomcat_bec/devices/gigafrost/pcoedgecamera.py +++ b/tomcat_bec/devices/gigafrost/pcoedgecamera.py @@ -104,6 +104,8 @@ class HelgeCameraBase(PSIDeviceBase): if STOREMODE == FIFO buffer Continously streams out data using the buffer as a FIFO queue and SAVESTART and SAVESTOP selects a ROI of images to be streamed continously (i.e. a large SAVESTOP streams indefinitely) + Note that in FIFO mode buffer reads are destructive, to prevent this, we don't have EPICS preview + """