BEC free consumer
This commit is contained in:
158
tomcat_bec/devices/gigafrost/pco_consumer.py
Normal file
158
tomcat_bec/devices/gigafrost/pco_consumer.py
Normal file
@@ -0,0 +1,158 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
Standard DAQ preview image stream module
|
||||||
|
|
||||||
|
Created on Thu Jun 27 17:28:43 2024
|
||||||
|
|
||||||
|
@author: mohacsi_i
|
||||||
|
"""
|
||||||
|
from time import sleep, time
|
||||||
|
import threading
|
||||||
|
import zmq
|
||||||
|
import json
|
||||||
|
|
||||||
|
ZMQ_TOPIC_FILTER = b""
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class PcoTestConsumer:
|
||||||
|
"""Detector wrapper class around the StdDaq preview image stream.
|
||||||
|
|
||||||
|
This was meant to provide live image stream directly from the StdDAQ.
|
||||||
|
Note that the preview stream must be already throtled in order to cope
|
||||||
|
with the incoming data and the python class might throttle it further.
|
||||||
|
|
||||||
|
You can add a preview widget to the dock by:
|
||||||
|
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Subscriptions for plotting image
|
||||||
|
_shutdown_event = threading.Event()
|
||||||
|
_monitor_mutex = threading.Lock()
|
||||||
|
_monitor_thread = None
|
||||||
|
|
||||||
|
# Status attributes
|
||||||
|
_url = None
|
||||||
|
_image = None
|
||||||
|
_frame = None
|
||||||
|
_socket = None
|
||||||
|
|
||||||
|
def __init__(self, url: str = "tcp://129.129.95.38:20000") -> None:
|
||||||
|
super().__init__()
|
||||||
|
self._url = url
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""Connect to te StDAQs PUB-SUB streaming interface"""
|
||||||
|
# Socket to talk to server
|
||||||
|
context = zmq.Context()
|
||||||
|
self._socket = context.socket(zmq.PULL)
|
||||||
|
try:
|
||||||
|
self._socket.connect(self.url)
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
sleep(1)
|
||||||
|
self._socket.connect(self.url)
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""Disconnect"""
|
||||||
|
try:
|
||||||
|
if self._socket is not None:
|
||||||
|
self._socket.disconnect(self.url)
|
||||||
|
except zmq.ZMQError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self._socket = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def url(self):
|
||||||
|
return self._url
|
||||||
|
|
||||||
|
@property
|
||||||
|
def image(self):
|
||||||
|
return self._image
|
||||||
|
|
||||||
|
@property
|
||||||
|
def frame(self):
|
||||||
|
return self._frame
|
||||||
|
|
||||||
|
# pylint: disable=protected-access
|
||||||
|
def start(self):
|
||||||
|
"""Start listening for preview data stream"""
|
||||||
|
if self._monitor_mutex.locked():
|
||||||
|
raise RuntimeError("Only one consumer permitted")
|
||||||
|
|
||||||
|
self.connect()
|
||||||
|
self._mon = threading.Thread(target=self.poll, daemon=True)
|
||||||
|
self._mon.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop a running preview"""
|
||||||
|
self._shutdown_event.set()
|
||||||
|
if self._mon is not None:
|
||||||
|
self._stop_polling = True
|
||||||
|
# Might hang on recv_multipart
|
||||||
|
self._mon.join(timeout=1)
|
||||||
|
# So also disconnect the socket
|
||||||
|
self.disconnect()
|
||||||
|
self._shutdown_event.clear()
|
||||||
|
|
||||||
|
def poll(self):
|
||||||
|
"""Collect streamed updates"""
|
||||||
|
try:
|
||||||
|
t_last = time()
|
||||||
|
print("Starting monitor")
|
||||||
|
with self._monitor_mutex:
|
||||||
|
while not self._shutdown_event.is_set():
|
||||||
|
try:
|
||||||
|
# pylint: disable=no-member
|
||||||
|
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||||
|
|
||||||
|
# Length and throtling checks
|
||||||
|
t_curr = time()
|
||||||
|
t_elapsed = t_curr - t_last
|
||||||
|
if t_elapsed < self.parent.throttle.get():
|
||||||
|
continue
|
||||||
|
# # Unpack the Array V1 reply to metadata and array data
|
||||||
|
meta, data = r
|
||||||
|
|
||||||
|
# Update image and update subscribers
|
||||||
|
header = json.loads(meta)
|
||||||
|
self.header = header
|
||||||
|
# if header["type"] == "uint16":
|
||||||
|
# image = np.frombuffer(data, dtype=np.uint16)
|
||||||
|
# if image.size != np.prod(header['shape']):
|
||||||
|
# err = f"Unexpected array size of {image.size} for header: {header}"
|
||||||
|
# raise ValueError(err)
|
||||||
|
# image = image.reshape(header['shape'])
|
||||||
|
|
||||||
|
# # Update image and update subscribers
|
||||||
|
# self._frame = header['frame']
|
||||||
|
# self._image = image
|
||||||
|
t_last = t_curr
|
||||||
|
# print(
|
||||||
|
# f"[{self.name}] Updated frame {header['frame']}\t"
|
||||||
|
# f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
|
||||||
|
# )
|
||||||
|
except ValueError:
|
||||||
|
# Happens when ZMQ partially delivers the multipart message
|
||||||
|
pass
|
||||||
|
except zmq.error.Again:
|
||||||
|
# Happens when receive queue is empty
|
||||||
|
sleep(0.1)
|
||||||
|
except Exception as ex:
|
||||||
|
print(f"{str(ex)}")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
self._socket.disconnect(self.url)
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
self._monitor_thread = None
|
||||||
|
print(f"Detaching monitor")
|
||||||
|
|
||||||
|
|
||||||
|
# Automatically connect to MicroSAXS testbench if directly invoked
|
||||||
|
if __name__ == "__main__":
|
||||||
|
daq = PcoTestConsumer(url="tcp://10.4.0.82:8080")
|
||||||
|
daq.start()
|
||||||
|
sleep(500)
|
||||||
|
daq.stop()
|
||||||
Reference in New Issue
Block a user