BEC free consumer
This commit is contained in:
158
tomcat_bec/devices/gigafrost/pco_consumer.py
Normal file
158
tomcat_bec/devices/gigafrost/pco_consumer.py
Normal file
@@ -0,0 +1,158 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Standard DAQ preview image stream module
|
||||
|
||||
Created on Thu Jun 27 17:28:43 2024
|
||||
|
||||
@author: mohacsi_i
|
||||
"""
|
||||
from time import sleep, time
|
||||
import threading
|
||||
import zmq
|
||||
import json
|
||||
|
||||
ZMQ_TOPIC_FILTER = b""
|
||||
|
||||
|
||||
|
||||
class PcoTestConsumer:
|
||||
"""Detector wrapper class around the StdDaq preview image stream.
|
||||
|
||||
This was meant to provide live image stream directly from the StdDAQ.
|
||||
Note that the preview stream must be already throtled in order to cope
|
||||
with the incoming data and the python class might throttle it further.
|
||||
|
||||
You can add a preview widget to the dock by:
|
||||
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
|
||||
"""
|
||||
|
||||
# Subscriptions for plotting image
|
||||
_shutdown_event = threading.Event()
|
||||
_monitor_mutex = threading.Lock()
|
||||
_monitor_thread = None
|
||||
|
||||
# Status attributes
|
||||
_url = None
|
||||
_image = None
|
||||
_frame = None
|
||||
_socket = None
|
||||
|
||||
def __init__(self, url: str = "tcp://129.129.95.38:20000") -> None:
|
||||
super().__init__()
|
||||
self._url = url
|
||||
|
||||
def connect(self):
|
||||
"""Connect to te StDAQs PUB-SUB streaming interface"""
|
||||
# Socket to talk to server
|
||||
context = zmq.Context()
|
||||
self._socket = context.socket(zmq.PULL)
|
||||
try:
|
||||
self._socket.connect(self.url)
|
||||
except ConnectionRefusedError:
|
||||
sleep(1)
|
||||
self._socket.connect(self.url)
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect"""
|
||||
try:
|
||||
if self._socket is not None:
|
||||
self._socket.disconnect(self.url)
|
||||
except zmq.ZMQError:
|
||||
pass
|
||||
finally:
|
||||
self._socket = None
|
||||
|
||||
@property
|
||||
def url(self):
|
||||
return self._url
|
||||
|
||||
@property
|
||||
def image(self):
|
||||
return self._image
|
||||
|
||||
@property
|
||||
def frame(self):
|
||||
return self._frame
|
||||
|
||||
# pylint: disable=protected-access
|
||||
def start(self):
|
||||
"""Start listening for preview data stream"""
|
||||
if self._monitor_mutex.locked():
|
||||
raise RuntimeError("Only one consumer permitted")
|
||||
|
||||
self.connect()
|
||||
self._mon = threading.Thread(target=self.poll, daemon=True)
|
||||
self._mon.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop a running preview"""
|
||||
self._shutdown_event.set()
|
||||
if self._mon is not None:
|
||||
self._stop_polling = True
|
||||
# Might hang on recv_multipart
|
||||
self._mon.join(timeout=1)
|
||||
# So also disconnect the socket
|
||||
self.disconnect()
|
||||
self._shutdown_event.clear()
|
||||
|
||||
def poll(self):
|
||||
"""Collect streamed updates"""
|
||||
try:
|
||||
t_last = time()
|
||||
print("Starting monitor")
|
||||
with self._monitor_mutex:
|
||||
while not self._shutdown_event.is_set():
|
||||
try:
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
|
||||
# Length and throtling checks
|
||||
t_curr = time()
|
||||
t_elapsed = t_curr - t_last
|
||||
if t_elapsed < self.parent.throttle.get():
|
||||
continue
|
||||
# # Unpack the Array V1 reply to metadata and array data
|
||||
meta, data = r
|
||||
|
||||
# Update image and update subscribers
|
||||
header = json.loads(meta)
|
||||
self.header = header
|
||||
# if header["type"] == "uint16":
|
||||
# image = np.frombuffer(data, dtype=np.uint16)
|
||||
# if image.size != np.prod(header['shape']):
|
||||
# err = f"Unexpected array size of {image.size} for header: {header}"
|
||||
# raise ValueError(err)
|
||||
# image = image.reshape(header['shape'])
|
||||
|
||||
# # Update image and update subscribers
|
||||
# self._frame = header['frame']
|
||||
# self._image = image
|
||||
t_last = t_curr
|
||||
# print(
|
||||
# f"[{self.name}] Updated frame {header['frame']}\t"
|
||||
# f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
|
||||
# )
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
sleep(0.1)
|
||||
except Exception as ex:
|
||||
print(f"{str(ex)}")
|
||||
raise
|
||||
finally:
|
||||
try:
|
||||
self._socket.disconnect(self.url)
|
||||
except RuntimeError:
|
||||
pass
|
||||
self._monitor_thread = None
|
||||
print(f"Detaching monitor")
|
||||
|
||||
|
||||
# Automatically connect to MicroSAXS testbench if directly invoked
|
||||
if __name__ == "__main__":
|
||||
daq = PcoTestConsumer(url="tcp://10.4.0.82:8080")
|
||||
daq.start()
|
||||
sleep(500)
|
||||
daq.stop()
|
||||
Reference in New Issue
Block a user