Dummy PCO edge consumer

This commit is contained in:
gac-x05la
2025-02-05 17:32:34 +01:00
parent 5c168a902d
commit 0dc6412ed8
2 changed files with 194 additions and 0 deletions

View File

@@ -0,0 +1,192 @@
# -*- coding: utf-8 -*-
"""
Standard DAQ preview image stream module
Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
import json
import enum
from time import sleep, time
from threading import Thread
import zmq
import numpy as np
from ophyd import Device, Signal, Component, Kind, DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from bec_lib import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b''
class PcoTestConsumerMixin(CustomDetectorMixin):
"""Setup class for the standard DAQ preview stream
Parent class: CustomDetectorMixin
"""
def on_stage(self):
"""Start listening for preview data stream"""
if self.parent._mon is not None:
self.parent.unstage()
sleep(0.5)
self.parent.connect()
self._stop_polling = False
self.parent._mon = Thread(target=self.poll, daemon=True)
self.parent._mon.start()
def on_unstage(self):
"""Stop a running preview"""
if self.parent._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self.parent._mon.join(timeout=1)
# So also disconnect the socket
self.parent.disconnect()
def on_stop(self):
"""Stop a running preview"""
self.parent.disconnect()
def poll(self):
"""Collect streamed updates"""
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
logger.info(f"[{self.parent.name}]\tDetaching monitor")
break
# pylint: disable=no-member
r = self.parent._socket.recv()
# Length and throtling checks
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.parent.throttle.get():
continue
"""
# Unpack the Array V1 reply to metadata and array data
meta, data = r
print(meta)
# Update image and update subscribers
header = json.loads(meta)
if header["type"] == "uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header['shape'])
# Update image and update subscribers
self.parent.frame.put(header['frame'], force=True)
self.parent.image_shape.put(header['shape'], force=True)
self.parent.image.put(image, force=True)
self.parent._last_image = image
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
"""
t_last = t_curr
#logger.info(
# f"[{self.parent.name}] Updated frame {header['frame']}\t"
# f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
# )
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.parent.name}]\t{str(ex)}")
raise
finally:
try:
self.parent._socket.disconnect()
except:
pass
self.parent._mon = None
logger.info(f"[{self.parent.name}]\tDetaching monitor")
class PcoTestConsumer(PSIDetectorBase):
"""Detector wrapper class around the StdDaq preview image stream.
This was meant to provide live image stream directly from the StdDAQ.
Note that the preview stream must be already throtled in order to cope
with the incoming data and the python class might throttle it further.
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# Subscriptions for plotting image
USER_ACCESS = ["get_last_image"]
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
custom_prepare_cls = PcoTestConsumerMixin
# Status attributes
url = Component(Signal, kind=Kind.config)
throttle = Component(Signal, value=0.25, kind=Kind.config)
frame = Component(Signal, kind=Kind.hinted)
image_shape = Component(Signal, kind=Kind.normal)
# FIXME: The BEC client caches the read()s from the last 50 scans
image = Component(Signal, kind=Kind.omitted)
_last_image = None
_mon = None
_socket = None
def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.url._metadata["write_access"] = False
self.image._metadata["write_access"] = False
self.frame._metadata["write_access"] = False
self.image_shape._metadata["write_access"] = False
self.url.set(url, force=True).wait()
def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface
StdDAQ may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
# Socket to talk to server
context = zmq.Context()
self._socket = context.socket(zmq.PULL)
try:
self._socket.connect(self.url.get())
except ConnectionRefusedError:
sleep(1)
self._socket.connect(self.url.get())
def disconnect(self):
"""Disconnect
"""
try:
if self._socket is not None:
self._socket.disconnect(self.url.get())
except zmq.ZMQError:
pass
finally:
self._socket = None
def get_image(self):
return self._last_image
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
daq = PcoTestConsumerMixin(url="tcp://129.129.106.124:8080", name="preview")
daq.wait_for_connection()

View File

@@ -104,6 +104,8 @@ class HelgeCameraBase(PSIDeviceBase):
if STOREMODE == FIFO buffer
Continously streams out data using the buffer as a FIFO queue and SAVESTART and SAVESTOP selects a ROI of images to be streamed continously (i.e. a large SAVESTOP streams indefinitely)
Note that in FIFO mode buffer reads are destructive, to prevent this, we don't have EPICS preview
"""