Dummy PCO edge consumer
This commit is contained in:
192
tomcat_bec/devices/gigafrost/pco_datasink.py
Normal file
192
tomcat_bec/devices/gigafrost/pco_datasink.py
Normal file
@@ -0,0 +1,192 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Standard DAQ preview image stream module
|
||||
|
||||
Created on Thu Jun 27 17:28:43 2024
|
||||
|
||||
@author: mohacsi_i
|
||||
"""
|
||||
import json
|
||||
import enum
|
||||
from time import sleep, time
|
||||
from threading import Thread
|
||||
import zmq
|
||||
import numpy as np
|
||||
from ophyd import Device, Signal, Component, Kind, DeviceStatus
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin,
|
||||
PSIDetectorBase,
|
||||
)
|
||||
|
||||
from bec_lib import bec_logger
|
||||
logger = bec_logger.logger
|
||||
ZMQ_TOPIC_FILTER = b''
|
||||
|
||||
|
||||
|
||||
class PcoTestConsumerMixin(CustomDetectorMixin):
|
||||
"""Setup class for the standard DAQ preview stream
|
||||
|
||||
Parent class: CustomDetectorMixin
|
||||
"""
|
||||
def on_stage(self):
|
||||
"""Start listening for preview data stream"""
|
||||
if self.parent._mon is not None:
|
||||
self.parent.unstage()
|
||||
sleep(0.5)
|
||||
|
||||
self.parent.connect()
|
||||
self._stop_polling = False
|
||||
self.parent._mon = Thread(target=self.poll, daemon=True)
|
||||
self.parent._mon.start()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Stop a running preview"""
|
||||
if self.parent._mon is not None:
|
||||
self._stop_polling = True
|
||||
# Might hang on recv_multipart
|
||||
self.parent._mon.join(timeout=1)
|
||||
# So also disconnect the socket
|
||||
self.parent.disconnect()
|
||||
|
||||
def on_stop(self):
|
||||
"""Stop a running preview"""
|
||||
self.parent.disconnect()
|
||||
|
||||
def poll(self):
|
||||
"""Collect streamed updates"""
|
||||
try:
|
||||
t_last = time()
|
||||
while True:
|
||||
try:
|
||||
# Exit loop and finish monitoring
|
||||
if self._stop_polling:
|
||||
logger.info(f"[{self.parent.name}]\tDetaching monitor")
|
||||
break
|
||||
|
||||
# pylint: disable=no-member
|
||||
r = self.parent._socket.recv()
|
||||
|
||||
# Length and throtling checks
|
||||
t_curr = time()
|
||||
t_elapsed = t_curr - t_last
|
||||
if t_elapsed < self.parent.throttle.get():
|
||||
continue
|
||||
"""
|
||||
# Unpack the Array V1 reply to metadata and array data
|
||||
meta, data = r
|
||||
print(meta)
|
||||
|
||||
# Update image and update subscribers
|
||||
header = json.loads(meta)
|
||||
if header["type"] == "uint16":
|
||||
image = np.frombuffer(data, dtype=np.uint16)
|
||||
if image.size != np.prod(header['shape']):
|
||||
err = f"Unexpected array size of {image.size} for header: {header}"
|
||||
raise ValueError(err)
|
||||
image = image.reshape(header['shape'])
|
||||
|
||||
# Update image and update subscribers
|
||||
self.parent.frame.put(header['frame'], force=True)
|
||||
self.parent.image_shape.put(header['shape'], force=True)
|
||||
self.parent.image.put(image, force=True)
|
||||
self.parent._last_image = image
|
||||
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
|
||||
"""
|
||||
t_last = t_curr
|
||||
#logger.info(
|
||||
# f"[{self.parent.name}] Updated frame {header['frame']}\t"
|
||||
# f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
|
||||
# )
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
sleep(0.1)
|
||||
except Exception as ex:
|
||||
logger.info(f"[{self.parent.name}]\t{str(ex)}")
|
||||
raise
|
||||
finally:
|
||||
try:
|
||||
self.parent._socket.disconnect()
|
||||
except:
|
||||
pass
|
||||
self.parent._mon = None
|
||||
logger.info(f"[{self.parent.name}]\tDetaching monitor")
|
||||
|
||||
|
||||
class PcoTestConsumer(PSIDetectorBase):
|
||||
"""Detector wrapper class around the StdDaq preview image stream.
|
||||
|
||||
This was meant to provide live image stream directly from the StdDAQ.
|
||||
Note that the preview stream must be already throtled in order to cope
|
||||
with the incoming data and the python class might throttle it further.
|
||||
|
||||
You can add a preview widget to the dock by:
|
||||
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
|
||||
"""
|
||||
# Subscriptions for plotting image
|
||||
USER_ACCESS = ["get_last_image"]
|
||||
SUB_MONITOR = "device_monitor_2d"
|
||||
_default_sub = SUB_MONITOR
|
||||
|
||||
custom_prepare_cls = PcoTestConsumerMixin
|
||||
|
||||
# Status attributes
|
||||
url = Component(Signal, kind=Kind.config)
|
||||
throttle = Component(Signal, value=0.25, kind=Kind.config)
|
||||
frame = Component(Signal, kind=Kind.hinted)
|
||||
image_shape = Component(Signal, kind=Kind.normal)
|
||||
# FIXME: The BEC client caches the read()s from the last 50 scans
|
||||
image = Component(Signal, kind=Kind.omitted)
|
||||
_last_image = None
|
||||
_mon = None
|
||||
_socket = None
|
||||
|
||||
def __init__(
|
||||
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
|
||||
) -> None:
|
||||
super().__init__(*args, parent=parent, **kwargs)
|
||||
self.url._metadata["write_access"] = False
|
||||
self.image._metadata["write_access"] = False
|
||||
self.frame._metadata["write_access"] = False
|
||||
self.image_shape._metadata["write_access"] = False
|
||||
self.url.set(url, force=True).wait()
|
||||
|
||||
def connect(self):
|
||||
"""Connect to te StDAQs PUB-SUB streaming interface
|
||||
|
||||
StdDAQ may reject connection for a few seconds when it restarts,
|
||||
so if it fails, wait a bit and try to connect again.
|
||||
"""
|
||||
# pylint: disable=no-member
|
||||
# Socket to talk to server
|
||||
context = zmq.Context()
|
||||
self._socket = context.socket(zmq.PULL)
|
||||
try:
|
||||
self._socket.connect(self.url.get())
|
||||
except ConnectionRefusedError:
|
||||
sleep(1)
|
||||
self._socket.connect(self.url.get())
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect
|
||||
"""
|
||||
try:
|
||||
if self._socket is not None:
|
||||
self._socket.disconnect(self.url.get())
|
||||
except zmq.ZMQError:
|
||||
pass
|
||||
finally:
|
||||
self._socket = None
|
||||
|
||||
|
||||
def get_image(self):
|
||||
return self._last_image
|
||||
|
||||
|
||||
# Automatically connect to MicroSAXS testbench if directly invoked
|
||||
if __name__ == "__main__":
|
||||
daq = PcoTestConsumerMixin(url="tcp://129.129.106.124:8080", name="preview")
|
||||
daq.wait_for_connection()
|
||||
@@ -104,6 +104,8 @@ class HelgeCameraBase(PSIDeviceBase):
|
||||
if STOREMODE == FIFO buffer
|
||||
Continously streams out data using the buffer as a FIFO queue and SAVESTART and SAVESTOP selects a ROI of images to be streamed continously (i.e. a large SAVESTOP streams indefinitely)
|
||||
|
||||
Note that in FIFO mode buffer reads are destructive, to prevent this, we don't have EPICS preview
|
||||
|
||||
|
||||
"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user