DAQ preview using standard detector class

This commit is contained in:
gac-x05la
2024-07-24 10:53:47 +02:00
committed by mohacsi_i
parent 51cfae82c4
commit da823f0a40
+182 -69
View File
@@ -6,16 +6,28 @@ Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
import sys
import json
import enum
import zmq
import numpy as np
from time import sleep, time
from threading import Thread
import numpy as np
import zmq
#import matplotlib.pyplot as plt
from ophyd import Device, Signal, Component, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
TOPIC_FILTER = ''
from bec_lib import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b''
class StdDaqPreviewState(enum.IntEnum):
"""Standard DAQ ophyd device states"""
UNKNOWN = 0
DETACHED = 1
MONITORING = 2
class StdDaqPreview(Device):
@@ -27,22 +39,19 @@ class StdDaqPreview(Device):
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# pylint: disable=too-many-instance-attributes
# Subscriptions for plotting image
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
# Status attributes
url = Component(Signal, kind=Kind.config)
status = Component(Signal, value="detached", kind=Kind.omitted)
process = Component(Signal, value=True, kind=Kind.omitted)
status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted)
image = Component(Signal, kind=Kind.normal)
frame = Component(Signal, kind=Kind.normal)
shape = Component(Signal, kind=Kind.omitted)
image_shape = Component(Signal, kind=Kind.omitted)
value = Component(Signal, kind=Kind.hinted)
_throttle = 0.05
@@ -54,10 +63,9 @@ class StdDaqPreview(Device):
self.status._metadata["write_access"] = False
self.image._metadata["write_access"] = False
self.frame._metadata["write_access"] = False
self.shape._metadata["write_access"] = False
self.image_shape._metadata["write_access"] = False
self.value._metadata["write_access"] = False
self.url.set(url, force=True).wait()
self._stream_url = url
self._stop_polling = False
self._mon = None
@@ -74,18 +82,19 @@ class StdDaqPreview(Device):
# Socket to talk to server
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, b'')
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self._stream_url)
self._socket.connect(self.url.get())
except ConnectionRefusedError:
sleep(5)
self._socket.connect(self._stream_url)
sleep(1)
self._socket.connect(self.url.get())
def configure(self, throttle: float = 0.5) -> tuple:
"""Set the DAQ preview parameters
Note that there's not much to do except for additional throtling if the
preview data stream is too fast.
preview data stream is too fast. Perhaps later we can add some online
processing to ophyd.
Example:
----------
@@ -100,6 +109,7 @@ class StdDaqPreview(Device):
def stage(self) -> list:
"""Start listening for preview data stream"""
self.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
@@ -114,67 +124,170 @@ class StdDaqPreview(Device):
"""Stop a running preview"""
self.unstage()
def plot(self):
"""Plot the current image"""
image = self.image.get()
#plt.imshow(np.log10(image+1), vmin=0, vmax=5)
#plt.pause(self._throttle)
def poll(self):
"""Collect streamed updates"""
self.status.set(StdDaqPreviewState.MONITORING, force=True)
t_last = time()
try:
while True:
try:
# pylint: disable=no-member
meta, data = self._socket.recv_multipart(flags=zmq.NOBLOCK)
header = json.loads(meta)
if header["type"]=="uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
raise ValueError(f"Unexpected array size of {image.size} for header: {header}")
image = image.reshape(header['shape'])
def plot_loop(self):
"""Blocking loop to keep plotting"""
while True:
self.plot()
# Update image and update subscribers
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed > self._throttle:
self.frame.put(header['frame'], force=True)
self.image_shape.put(header['shape'], force=True)
self.image.put(image, force=True)
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
t_last=t_curr
logger.info(f"[{self.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}")
def proc(self, image):
"""Basic image processing"""
return np.mean(image)
# Exit loop and finish monitoring
if self._stop_polling:
logger.info(f"[{self.name}]\tDetaching monitor")
break
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.name}]\t{str(ex)}")
raise
finally:
self._mon = None
self.status.set(StdDaqPreviewState.DETACHED, force=True)
class StdDaqPreviewMixin(CustomDetectorMixin):
"""Setup class for the standard DAQ preview stream
Parent class: CustomDetectorMixin
"""
def on_stage(self):
"""Start listening for preview data stream"""
self.parent.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def on_unstage(self):
"""Stop a running preview"""
self._stop_polling = True
def on_stop(self):
"""Stop a running preview"""
self.on_unstage()
def poll(self):
"""Collect streamed updates"""
self.status.set("attached", force=True)
self.parent.status.set(StdDaqPreviewState.MONITORING, force=True)
t_last = time()
while True:
try:
# pylint: disable=no-member
meta, data = self._socket.recv_multipart(flags=zmq.NOBLOCK)
header = json.loads(meta)
if header["type"]=="uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
self.status.set("detached", force=True)
raise ValueError(f"Unexpected array size of {image.size} for header: {header}")
image = image.reshape(header['shape'])
#print(f"Received frame {header['frame']}", file=sys.stderr)
try:
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
break
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed > self._throttle:
self.frame.put(header['frame'], force=True)
self.shape.put(header['shape'], force=True)
self.image.put(image, force=True)
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
# pylint: disable=no-member
meta, data = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
header = json.loads(meta)
if header["type"]=="uint16":
image = np.frombuffer(data, dtype=np.uint16)
image = image.reshape(header['shape'])
t_last=t_curr
print(f"[{self.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}", file=sys.stderr)
# Update image and update subscribers
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed > self.parent.throttle.get():
self.parent.frame.put(header['frame'], force=True)
self.parent.image_shape.put(header['shape'], force=True)
self.parent.image.put(image, force=True)
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last=t_curr
logger.info(f"[{self.parent.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}")
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.parent.name}]\t{str(ex)}")
raise
finally:
self._mon = None
self.parent.status.set(StdDaqPreviewState.DETACHED, force=True)
logger.info(f"[{self.parent.name}]\tDetaching monitor")
# Perform some basic analysis on the image
if self.process.get():
self.value.put(self.proc(image), force=True)
print(f"Frame: {header['frame']}\tMin: {np.min(image)}\tMax: {np.max(image)}")
if self._stop_polling:
self.status.set("detached", force=True)
print("Detaching monitor")
break
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
sleep(0.1)
except Exception as ex:
print(ex)
self.status.set("detached", force=True)
raise
class StdDaqPreviewDetector(PSIDetectorBase):
"""Detector wrapper class around the StdDaq preview image stream.
This was meant to provide live image stream directly from the StdDAQ.
Note that the preview stream must be already throtled in order to cope
with the incoming data and the python class might throttle it further.
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# Subscriptions for plotting image
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
custom_prepare_cls = StdDaqPreviewMixin
# Status attributes
url = Component(Signal, kind=Kind.config)
throttle = Component(Signal, value=0.1, kind=Kind.config)
status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted)
image = Component(Signal, kind=Kind.normal)
frame = Component(Signal, kind=Kind.hinted)
image_shape = Component(Signal, kind=Kind.omitted)
def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.url._metadata["write_access"] = False
self.status._metadata["write_access"] = False
self.image._metadata["write_access"] = False
self.frame._metadata["write_access"] = False
self.image_shape._metadata["write_access"] = False
self.url.set(url, force=True).wait()
# Connect ro the DAQ
self.connect()
def connect(self):
"""Connect to te StDAQs PUB-SUB streaming interface
StdDAQ may reject connection for a few seconds when it restarts,
so if it fails, wait a bit and try to connect again.
"""
# pylint: disable=no-member
# Socket to talk to server
context = zmq.Context()
self._socket = context.socket(zmq.SUB)
self._socket.setsockopt(zmq.SUBSCRIBE, ZMQ_TOPIC_FILTER)
try:
self._socket.connect(self.url.get())
except ConnectionRefusedError:
sleep(1)
self._socket.connect(self.url.get())
# Automatically connect to MicroSAXS testbench if directly invoked