Dummy PCO edge consumer
This commit is contained in:
192
tomcat_bec/devices/gigafrost/pco_datasink.py
Normal file
192
tomcat_bec/devices/gigafrost/pco_datasink.py
Normal file
@@ -0,0 +1,192 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
Standard DAQ preview image stream module
|
||||||
|
|
||||||
|
Created on Thu Jun 27 17:28:43 2024
|
||||||
|
|
||||||
|
@author: mohacsi_i
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import enum
|
||||||
|
from time import sleep, time
|
||||||
|
from threading import Thread
|
||||||
|
import zmq
|
||||||
|
import numpy as np
|
||||||
|
from ophyd import Device, Signal, Component, Kind, DeviceStatus
|
||||||
|
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||||
|
CustomDetectorMixin,
|
||||||
|
PSIDetectorBase,
|
||||||
|
)
|
||||||
|
|
||||||
|
from bec_lib import bec_logger
|
||||||
|
logger = bec_logger.logger
|
||||||
|
ZMQ_TOPIC_FILTER = b''
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class PcoTestConsumerMixin(CustomDetectorMixin):
|
||||||
|
"""Setup class for the standard DAQ preview stream
|
||||||
|
|
||||||
|
Parent class: CustomDetectorMixin
|
||||||
|
"""
|
||||||
|
def on_stage(self):
|
||||||
|
"""Start listening for preview data stream"""
|
||||||
|
if self.parent._mon is not None:
|
||||||
|
self.parent.unstage()
|
||||||
|
sleep(0.5)
|
||||||
|
|
||||||
|
self.parent.connect()
|
||||||
|
self._stop_polling = False
|
||||||
|
self.parent._mon = Thread(target=self.poll, daemon=True)
|
||||||
|
self.parent._mon.start()
|
||||||
|
|
||||||
|
def on_unstage(self):
|
||||||
|
"""Stop a running preview"""
|
||||||
|
if self.parent._mon is not None:
|
||||||
|
self._stop_polling = True
|
||||||
|
# Might hang on recv_multipart
|
||||||
|
self.parent._mon.join(timeout=1)
|
||||||
|
# So also disconnect the socket
|
||||||
|
self.parent.disconnect()
|
||||||
|
|
||||||
|
def on_stop(self):
|
||||||
|
"""Stop a running preview"""
|
||||||
|
self.parent.disconnect()
|
||||||
|
|
||||||
|
def poll(self):
|
||||||
|
"""Collect streamed updates"""
|
||||||
|
try:
|
||||||
|
t_last = time()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Exit loop and finish monitoring
|
||||||
|
if self._stop_polling:
|
||||||
|
logger.info(f"[{self.parent.name}]\tDetaching monitor")
|
||||||
|
break
|
||||||
|
|
||||||
|
# pylint: disable=no-member
|
||||||
|
r = self.parent._socket.recv()
|
||||||
|
|
||||||
|
# Length and throtling checks
|
||||||
|
t_curr = time()
|
||||||
|
t_elapsed = t_curr - t_last
|
||||||
|
if t_elapsed < self.parent.throttle.get():
|
||||||
|
continue
|
||||||
|
"""
|
||||||
|
# Unpack the Array V1 reply to metadata and array data
|
||||||
|
meta, data = r
|
||||||
|
print(meta)
|
||||||
|
|
||||||
|
# Update image and update subscribers
|
||||||
|
header = json.loads(meta)
|
||||||
|
if header["type"] == "uint16":
|
||||||
|
image = np.frombuffer(data, dtype=np.uint16)
|
||||||
|
if image.size != np.prod(header['shape']):
|
||||||
|
err = f"Unexpected array size of {image.size} for header: {header}"
|
||||||
|
raise ValueError(err)
|
||||||
|
image = image.reshape(header['shape'])
|
||||||
|
|
||||||
|
# Update image and update subscribers
|
||||||
|
self.parent.frame.put(header['frame'], force=True)
|
||||||
|
self.parent.image_shape.put(header['shape'], force=True)
|
||||||
|
self.parent.image.put(image, force=True)
|
||||||
|
self.parent._last_image = image
|
||||||
|
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
|
||||||
|
"""
|
||||||
|
t_last = t_curr
|
||||||
|
#logger.info(
|
||||||
|
# f"[{self.parent.name}] Updated frame {header['frame']}\t"
|
||||||
|
# f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
|
||||||
|
# )
|
||||||
|
except ValueError:
|
||||||
|
# Happens when ZMQ partially delivers the multipart message
|
||||||
|
pass
|
||||||
|
except zmq.error.Again:
|
||||||
|
# Happens when receive queue is empty
|
||||||
|
sleep(0.1)
|
||||||
|
except Exception as ex:
|
||||||
|
logger.info(f"[{self.parent.name}]\t{str(ex)}")
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
self.parent._socket.disconnect()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
self.parent._mon = None
|
||||||
|
logger.info(f"[{self.parent.name}]\tDetaching monitor")
|
||||||
|
|
||||||
|
|
||||||
|
class PcoTestConsumer(PSIDetectorBase):
|
||||||
|
"""Detector wrapper class around the StdDaq preview image stream.
|
||||||
|
|
||||||
|
This was meant to provide live image stream directly from the StdDAQ.
|
||||||
|
Note that the preview stream must be already throtled in order to cope
|
||||||
|
with the incoming data and the python class might throttle it further.
|
||||||
|
|
||||||
|
You can add a preview widget to the dock by:
|
||||||
|
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
|
||||||
|
"""
|
||||||
|
# Subscriptions for plotting image
|
||||||
|
USER_ACCESS = ["get_last_image"]
|
||||||
|
SUB_MONITOR = "device_monitor_2d"
|
||||||
|
_default_sub = SUB_MONITOR
|
||||||
|
|
||||||
|
custom_prepare_cls = PcoTestConsumerMixin
|
||||||
|
|
||||||
|
# Status attributes
|
||||||
|
url = Component(Signal, kind=Kind.config)
|
||||||
|
throttle = Component(Signal, value=0.25, kind=Kind.config)
|
||||||
|
frame = Component(Signal, kind=Kind.hinted)
|
||||||
|
image_shape = Component(Signal, kind=Kind.normal)
|
||||||
|
# FIXME: The BEC client caches the read()s from the last 50 scans
|
||||||
|
image = Component(Signal, kind=Kind.omitted)
|
||||||
|
_last_image = None
|
||||||
|
_mon = None
|
||||||
|
_socket = None
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
|
||||||
|
) -> None:
|
||||||
|
super().__init__(*args, parent=parent, **kwargs)
|
||||||
|
self.url._metadata["write_access"] = False
|
||||||
|
self.image._metadata["write_access"] = False
|
||||||
|
self.frame._metadata["write_access"] = False
|
||||||
|
self.image_shape._metadata["write_access"] = False
|
||||||
|
self.url.set(url, force=True).wait()
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""Connect to te StDAQs PUB-SUB streaming interface
|
||||||
|
|
||||||
|
StdDAQ may reject connection for a few seconds when it restarts,
|
||||||
|
so if it fails, wait a bit and try to connect again.
|
||||||
|
"""
|
||||||
|
# pylint: disable=no-member
|
||||||
|
# Socket to talk to server
|
||||||
|
context = zmq.Context()
|
||||||
|
self._socket = context.socket(zmq.PULL)
|
||||||
|
try:
|
||||||
|
self._socket.connect(self.url.get())
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
sleep(1)
|
||||||
|
self._socket.connect(self.url.get())
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""Disconnect
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if self._socket is not None:
|
||||||
|
self._socket.disconnect(self.url.get())
|
||||||
|
except zmq.ZMQError:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
self._socket = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_image(self):
|
||||||
|
return self._last_image
|
||||||
|
|
||||||
|
|
||||||
|
# Automatically connect to MicroSAXS testbench if directly invoked
|
||||||
|
if __name__ == "__main__":
|
||||||
|
daq = PcoTestConsumerMixin(url="tcp://129.129.106.124:8080", name="preview")
|
||||||
|
daq.wait_for_connection()
|
||||||
@@ -104,6 +104,8 @@ class HelgeCameraBase(PSIDeviceBase):
|
|||||||
if STOREMODE == FIFO buffer
|
if STOREMODE == FIFO buffer
|
||||||
Continously streams out data using the buffer as a FIFO queue and SAVESTART and SAVESTOP selects a ROI of images to be streamed continously (i.e. a large SAVESTOP streams indefinitely)
|
Continously streams out data using the buffer as a FIFO queue and SAVESTART and SAVESTOP selects a ROI of images to be streamed continously (i.e. a large SAVESTOP streams indefinitely)
|
||||||
|
|
||||||
|
Note that in FIFO mode buffer reads are destructive, to prevent this, we don't have EPICS preview
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user