From da823f0a4078fff6a976230a55562a63ed517ef5 Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Wed, 24 Jul 2024 10:53:47 +0200 Subject: [PATCH] DAQ preview using standard detector class --- .../devices/gigafrost/stddaq_preview.py | 251 +++++++++++++----- 1 file changed, 182 insertions(+), 69 deletions(-) diff --git a/tomcat_bec/devices/gigafrost/stddaq_preview.py b/tomcat_bec/devices/gigafrost/stddaq_preview.py index 776229d..4c42fd3 100644 --- a/tomcat_bec/devices/gigafrost/stddaq_preview.py +++ b/tomcat_bec/devices/gigafrost/stddaq_preview.py @@ -6,16 +6,28 @@ Created on Thu Jun 27 17:28:43 2024 @author: mohacsi_i """ -import sys import json +import enum +import zmq +import numpy as np from time import sleep, time from threading import Thread -import numpy as np -import zmq -#import matplotlib.pyplot as plt from ophyd import Device, Signal, Component, Kind +from ophyd_devices.interfaces.base_classes.psi_detector_base import ( + CustomDetectorMixin, + PSIDetectorBase, +) -TOPIC_FILTER = '' +from bec_lib import bec_logger +logger = bec_logger.logger +ZMQ_TOPIC_FILTER = b'' + + +class StdDaqPreviewState(enum.IntEnum): + """Standard DAQ ophyd device states""" + UNKNOWN = 0 + DETACHED = 1 + MONITORING = 2 class StdDaqPreview(Device): @@ -27,22 +39,19 @@ class StdDaqPreview(Device): You can add a preview widget to the dock by: cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') - """ # pylint: disable=too-many-instance-attributes - + # Subscriptions for plotting image SUB_MONITOR = "monitor" _default_sub = SUB_MONITOR - # Status attributes url = Component(Signal, kind=Kind.config) - status = Component(Signal, value="detached", kind=Kind.omitted) - process = Component(Signal, value=True, kind=Kind.omitted) + status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted) image = Component(Signal, kind=Kind.normal) frame = Component(Signal, kind=Kind.normal) - shape = Component(Signal, kind=Kind.omitted) + image_shape = Component(Signal, kind=Kind.omitted) value = Component(Signal, kind=Kind.hinted) _throttle = 0.05 @@ -54,10 +63,9 @@ class StdDaqPreview(Device): self.status._metadata["write_access"] = False self.image._metadata["write_access"] = False self.frame._metadata["write_access"] = False - self.shape._metadata["write_access"] = False + self.image_shape._metadata["write_access"] = False self.value._metadata["write_access"] = False self.url.set(url, force=True).wait() - self._stream_url = url self._stop_polling = False self._mon = None @@ -74,18 +82,19 @@ class StdDaqPreview(Device): # Socket to talk to server context = zmq.Context() self._socket = context.socket(zmq.SUB) - self._socket.setsockopt(zmq.SUBSCRIBE, b'') + self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER) try: - self._socket.connect(self._stream_url) + self._socket.connect(self.url.get()) except ConnectionRefusedError: - sleep(5) - self._socket.connect(self._stream_url) + sleep(1) + self._socket.connect(self.url.get()) def configure(self, throttle: float = 0.5) -> tuple: """Set the DAQ preview parameters Note that there's not much to do except for additional throtling if the - preview data stream is too fast. + preview data stream is too fast. Perhaps later we can add some online + processing to ophyd. Example: ---------- @@ -100,6 +109,7 @@ class StdDaqPreview(Device): def stage(self) -> list: """Start listening for preview data stream""" + self.connect() self._stop_polling = False self._mon = Thread(target=self.poll, daemon=True) self._mon.start() @@ -114,67 +124,170 @@ class StdDaqPreview(Device): """Stop a running preview""" self.unstage() - def plot(self): - """Plot the current image""" - image = self.image.get() - #plt.imshow(np.log10(image+1), vmin=0, vmax=5) - #plt.pause(self._throttle) + def poll(self): + """Collect streamed updates""" + self.status.set(StdDaqPreviewState.MONITORING, force=True) + t_last = time() + try: + while True: + try: + # pylint: disable=no-member + meta, data = self._socket.recv_multipart(flags=zmq.NOBLOCK) + header = json.loads(meta) + if header["type"]=="uint16": + image = np.frombuffer(data, dtype=np.uint16) + if image.size != np.prod(header['shape']): + raise ValueError(f"Unexpected array size of {image.size} for header: {header}") + image = image.reshape(header['shape']) - def plot_loop(self): - """Blocking loop to keep plotting""" - while True: - self.plot() + # Update image and update subscribers + t_curr = time() + t_elapsed = t_curr - t_last + if t_elapsed > self._throttle: + self.frame.put(header['frame'], force=True) + self.image_shape.put(header['shape'], force=True) + self.image.put(image, force=True) + self._run_subs(sub_type=self.SUB_MONITOR, value=image) + t_last=t_curr + logger.info(f"[{self.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}") - def proc(self, image): - """Basic image processing""" - return np.mean(image) + # Exit loop and finish monitoring + if self._stop_polling: + logger.info(f"[{self.name}]\tDetaching monitor") + break + except ValueError: + # Happens when ZMQ partially delivers the multipart message + pass + except zmq.error.Again: + # Happens when receive queue is empty + sleep(0.1) + except Exception as ex: + logger.info(f"[{self.name}]\t{str(ex)}") + raise + finally: + self._mon = None + self.status.set(StdDaqPreviewState.DETACHED, force=True) + + + +class StdDaqPreviewMixin(CustomDetectorMixin): + """Setup class for the standard DAQ preview stream + + Parent class: CustomDetectorMixin + """ + def on_stage(self): + """Start listening for preview data stream""" + self.parent.connect() + self._stop_polling = False + self._mon = Thread(target=self.poll, daemon=True) + self._mon.start() + + def on_unstage(self): + """Stop a running preview""" + self._stop_polling = True + + def on_stop(self): + """Stop a running preview""" + self.on_unstage() def poll(self): """Collect streamed updates""" - self.status.set("attached", force=True) + self.parent.status.set(StdDaqPreviewState.MONITORING, force=True) t_last = time() - while True: - try: - # pylint: disable=no-member - meta, data = self._socket.recv_multipart(flags=zmq.NOBLOCK) - header = json.loads(meta) - if header["type"]=="uint16": - image = np.frombuffer(data, dtype=np.uint16) - if image.size != np.prod(header['shape']): - self.status.set("detached", force=True) - raise ValueError(f"Unexpected array size of {image.size} for header: {header}") - image = image.reshape(header['shape']) - #print(f"Received frame {header['frame']}", file=sys.stderr) + try: + while True: + try: + # Exit loop and finish monitoring + if self._stop_polling: + break - t_curr = time() - t_elapsed = t_curr - t_last - if t_elapsed > self._throttle: - self.frame.put(header['frame'], force=True) - self.shape.put(header['shape'], force=True) - self.image.put(image, force=True) - self._run_subs(sub_type=self.SUB_MONITOR, value=image) + # pylint: disable=no-member + meta, data = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK) + header = json.loads(meta) + if header["type"]=="uint16": + image = np.frombuffer(data, dtype=np.uint16) + image = image.reshape(header['shape']) - t_last=t_curr - print(f"[{self.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}", file=sys.stderr) + # Update image and update subscribers + t_curr = time() + t_elapsed = t_curr - t_last + if t_elapsed > self.parent.throttle.get(): + self.parent.frame.put(header['frame'], force=True) + self.parent.image_shape.put(header['shape'], force=True) + self.parent.image.put(image, force=True) + self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image) + t_last=t_curr + logger.info(f"[{self.parent.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}") + except ValueError: + # Happens when ZMQ partially delivers the multipart message + pass + except zmq.error.Again: + # Happens when receive queue is empty + sleep(0.1) + except Exception as ex: + logger.info(f"[{self.parent.name}]\t{str(ex)}") + raise + finally: + self._mon = None + self.parent.status.set(StdDaqPreviewState.DETACHED, force=True) + logger.info(f"[{self.parent.name}]\tDetaching monitor") - # Perform some basic analysis on the image - if self.process.get(): - self.value.put(self.proc(image), force=True) - print(f"Frame: {header['frame']}\tMin: {np.min(image)}\tMax: {np.max(image)}") - if self._stop_polling: - self.status.set("detached", force=True) - print("Detaching monitor") - break - except ValueError: - # Happens when ZMQ partially delivers the multipart message - pass - except zmq.error.Again: - sleep(0.1) - except Exception as ex: - print(ex) - self.status.set("detached", force=True) - raise +class StdDaqPreviewDetector(PSIDetectorBase): + """Detector wrapper class around the StdDaq preview image stream. + + This was meant to provide live image stream directly from the StdDAQ. + Note that the preview stream must be already throtled in order to cope + with the incoming data and the python class might throttle it further. + + You can add a preview widget to the dock by: + cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') + """ + # Subscriptions for plotting image + SUB_MONITOR = "monitor" + _default_sub = SUB_MONITOR + + custom_prepare_cls = StdDaqPreviewMixin + + # Status attributes + url = Component(Signal, kind=Kind.config) + throttle = Component(Signal, value=0.1, kind=Kind.config) + status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted) + image = Component(Signal, kind=Kind.normal) + frame = Component(Signal, kind=Kind.hinted) + image_shape = Component(Signal, kind=Kind.omitted) + + def __init__( + self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs + ) -> None: + super().__init__(*args, parent=parent, **kwargs) + self.url._metadata["write_access"] = False + self.status._metadata["write_access"] = False + self.image._metadata["write_access"] = False + self.frame._metadata["write_access"] = False + self.image_shape._metadata["write_access"] = False + self.url.set(url, force=True).wait() + + # Connect ro the DAQ + self.connect() + + def connect(self): + """Connect to te StDAQs PUB-SUB streaming interface + + StdDAQ may reject connection for a few seconds when it restarts, + so if it fails, wait a bit and try to connect again. + """ + # pylint: disable=no-member + # Socket to talk to server + context = zmq.Context() + self._socket = context.socket(zmq.SUB) + self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER) + try: + self._socket.connect(self.url.get()) + except ConnectionRefusedError: + sleep(1) + self._socket.connect(self.url.get()) + # Automatically connect to MicroSAXS testbench if directly invoked