From 84bc0d692f7080d75843fabbafda990adcbd264a Mon Sep 17 00:00:00 2001 From: gac-x05la Date: Mon, 31 Mar 2025 12:24:10 +0200 Subject: [PATCH] BEC free consumer --- tomcat_bec/devices/gigafrost/pco_consumer.py | 158 +++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 tomcat_bec/devices/gigafrost/pco_consumer.py diff --git a/tomcat_bec/devices/gigafrost/pco_consumer.py b/tomcat_bec/devices/gigafrost/pco_consumer.py new file mode 100644 index 0000000..00f9c1f --- /dev/null +++ b/tomcat_bec/devices/gigafrost/pco_consumer.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +""" +Standard DAQ preview image stream module + +Created on Thu Jun 27 17:28:43 2024 + +@author: mohacsi_i +""" +from time import sleep, time +import threading +import zmq +import json + +ZMQ_TOPIC_FILTER = b"" + + + +class PcoTestConsumer: + """Detector wrapper class around the StdDaq preview image stream. + + This was meant to provide live image stream directly from the StdDAQ. + Note that the preview stream must be already throtled in order to cope + with the incoming data and the python class might throttle it further. + + You can add a preview widget to the dock by: + cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') + """ + + # Subscriptions for plotting image + _shutdown_event = threading.Event() + _monitor_mutex = threading.Lock() + _monitor_thread = None + + # Status attributes + _url = None + _image = None + _frame = None + _socket = None + + def __init__(self, url: str = "tcp://129.129.95.38:20000") -> None: + super().__init__() + self._url = url + + def connect(self): + """Connect to te StDAQs PUB-SUB streaming interface""" + # Socket to talk to server + context = zmq.Context() + self._socket = context.socket(zmq.PULL) + try: + self._socket.connect(self.url) + except ConnectionRefusedError: + sleep(1) + self._socket.connect(self.url) + + def disconnect(self): + """Disconnect""" + try: + if self._socket is not None: + self._socket.disconnect(self.url) + except zmq.ZMQError: + pass + finally: + self._socket = None + + @property + def url(self): + return self._url + + @property + def image(self): + return self._image + + @property + def frame(self): + return self._frame + + # pylint: disable=protected-access + def start(self): + """Start listening for preview data stream""" + if self._monitor_mutex.locked(): + raise RuntimeError("Only one consumer permitted") + + self.connect() + self._mon = threading.Thread(target=self.poll, daemon=True) + self._mon.start() + + def stop(self): + """Stop a running preview""" + self._shutdown_event.set() + if self._mon is not None: + self._stop_polling = True + # Might hang on recv_multipart + self._mon.join(timeout=1) + # So also disconnect the socket + self.disconnect() + self._shutdown_event.clear() + + def poll(self): + """Collect streamed updates""" + try: + t_last = time() + print("Starting monitor") + with self._monitor_mutex: + while not self._shutdown_event.is_set(): + try: + # pylint: disable=no-member + r = self._socket.recv_multipart(flags=zmq.NOBLOCK) + + # Length and throtling checks + t_curr = time() + t_elapsed = t_curr - t_last + if t_elapsed < self.parent.throttle.get(): + continue + # # Unpack the Array V1 reply to metadata and array data + meta, data = r + + # Update image and update subscribers + header = json.loads(meta) + self.header = header + # if header["type"] == "uint16": + # image = np.frombuffer(data, dtype=np.uint16) + # if image.size != np.prod(header['shape']): + # err = f"Unexpected array size of {image.size} for header: {header}" + # raise ValueError(err) + # image = image.reshape(header['shape']) + + # # Update image and update subscribers + # self._frame = header['frame'] + # self._image = image + t_last = t_curr + # print( + # f"[{self.name}] Updated frame {header['frame']}\t" + # f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}" + # ) + except ValueError: + # Happens when ZMQ partially delivers the multipart message + pass + except zmq.error.Again: + # Happens when receive queue is empty + sleep(0.1) + except Exception as ex: + print(f"{str(ex)}") + raise + finally: + try: + self._socket.disconnect(self.url) + except RuntimeError: + pass + self._monitor_thread = None + print(f"Detaching monitor") + + +# Automatically connect to MicroSAXS testbench if directly invoked +if __name__ == "__main__": + daq = PcoTestConsumer(url="tcp://10.4.0.82:8080") + daq.start() + sleep(500) + daq.stop()