Preview data stream

This commit is contained in:
gac-x05la
2024-07-18 10:40:28 +02:00
committed by mohacsi_i
parent 15f08b8bf3
commit 0e020f326d
3 changed files with 263 additions and 3 deletions

View File

@@ -120,7 +120,7 @@ daq:
softwareTrigger: false
daq_stream0:
description: Standard DAQ controls
description: Standard DAQ preview stream 2 frames every 1000
deviceClass: tomcat_bec.devices.gigafrost.stddaq_preview.StdDaqPreview
deviceConfig:
url: 'tcp://129.129.95.38:20000'
@@ -133,7 +133,7 @@ daq_stream0:
softwareTrigger: false
daq_stream1:
description: Standard DAQ controls
description: Standard DAQ preview stream 4 frames at 10 Hz
deviceClass: tomcat_bec.devices.gigafrost.stddaq_preview.StdDaqPreview
deviceConfig:
url: 'tcp://129.129.95.38:20001'

View File

@@ -24,9 +24,18 @@ class StdDaqPreview(Device):
This was meant to provide live image preview directly from the StdDAQ.
Note that the preview stream must be heavily throtled in order to cope
with the incoming data.
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# pylint: disable=too-many-instance-attributes
SUB_MONITOR = "monitor"
_default_sub = SUB_MONITOR
# Status attributes
url = Component(Signal, kind=Kind.config)
status = Component(Signal, value="detached", kind=Kind.omitted)
@@ -143,8 +152,10 @@ class StdDaqPreview(Device):
self.frame.put(header['frame'], force=True)
self.shape.put(header['shape'], force=True)
self.image.put(image, force=True)
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
t_last=t_curr
print(f"[DPREV] Updated frame {header['frame']}\tMean: {np.mean(image)}", file=sys.stderr)
print(f"[{self.name}]\tUpdated frame {header['frame']}\tMean: {np.mean(image)}", file=sys.stderr)
# Perform some basic analysis on the image
if self.process.get():

View File

@@ -0,0 +1,249 @@
# -*- coding: utf-8 -*-
"""
Standard DAQ class module
Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
import sys
import json
from time import sleep
from threading import Thread
from ophyd import Device, Signal, Component, Kind
from websockets.sync.client import connect
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
class StdDaqRestClient(Device):
"""Wrapper class around the new StdDaq REST interface.
This was meant to replace the websocket inteface that replaced the
documented python client. We can finally read configuration with urllib3:
'''
import urllib3 as url3
r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"})
'''
"""
# pylint: disable=too-many-instance-attributes
# Status attributes
status = Component(Signal, value="unknown", kind=Kind.hinted)
n_images = Component(Signal, value=10000, kind=Kind.config)
file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)
cfg_detector_name = Component(Signal, kind=Kind.config)
cfg_detector_type = Component(Signal, kind=Kind.config)
cfg_num_modules = Component(Signal, kind=Kind.config)
cfg_bit_depth = Component(Signal, kind=Kind.config)
cfg_image_pixel_height = Component(Signal, kind=Kind.config)
cfg_image_pixel_width = Component(Signal, kind=Kind.config)
cfg_start_udp_port = Component(Signal, kind=Kind.config)
cfg_writer_user_id = Component(Signal, kind=Kind.config)
cfg_submodule_info = Component(Signal, kind=Kind.config)
cfg_max_number_of_forwarders_spawned = Component(Signal, kind=Kind.config)
cfg_use_all_forwarders = Component(Signal, kind=Kind.config)
cfg_module_sync_queue_size = Component(Signal, kind=Kind.config)
cfg_module_positions = Component(Signal, kind=Kind.config)
def __init__(
self, *args, url: str = "http://xbl-daq-29:5000", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.status._metadata["write_access"] = False
self._url = url
self._mon = None
# Connect ro the DAQ
self.connect()
def read_daq_config(self):
"""Read the current configuration from the JSON file
"""
r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"})
r = r.json()
if 'detail' in r:
# Failed to read config, probbaly wrong file
return
for key, val in r:
getattr(self, "cfg_"+key).set(val).wait()
def write_daq_config(self):
config = {
'detector_name': str(self.cfg_detector_name.get()),
'detector_type': str(self.cfg_detector_type.get()),
'n_modules': int(self.cfg_n_modules.get()),
'bit_depth': int(self.cfg_bit_depth.get()),
'image_pixel_height': int(self.cfg_image_pixel_height.get()),
'image_pixel_width': int(self.cfg_image_pixel_width.get()),
'start_udp_port': int(self.cfg_start_udp_port.get()),
'writer_user_id' int(self.cfg_writer_user_id.get()),
'submodule_info': self.cfg_submodule_info.get(),
'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders_spawned.get()),
'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()),
'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()),
'module_positions': self.cfg_module_positions.get()
}
def connect(self):
"""Connect to te StDAQs websockets interface
StdDAQ may reject connection for a few seconds, so if it fails, wait
a bit and try to connect again.
"""
try:
self._client = connect(self._ws_url)
except ConnectionRefusedError:
sleep(5)
self._client = connect(self._ws_url)
def monitor(self):
"""Attach monitoring to the DAQ"""
self._client = connect(self._ws_url)
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def configure(self, n_images: int = None, file_path: str = None) -> tuple:
"""Set the standard DAQ parameters for the next run
Note that full reconfiguration is not possible with the websocket
interface, only changing acquisition parameters. These changes are only
activated upon staging!
Example:
----------
std.configure(n_images=10000, file_path="/data/test/raw")
Parameters
----------
n_images : int, optional
Number of images to be taken during each scan. Set to -1 for an
unlimited number of images (limited by the ringbuffer size and
backend speed). (default = 10000)
file_path : string, optional
Save file path. (default = '/gpfs/test/test-beamline')
"""
old_config = self.read_configuration()
# If Bluesky style configure
if isinstance(n_images, dict):
d = n_images.copy()
n_images = d.get('n_images', None)
file_path = d.get('file_path', None)
if n_images is not None:
self.n_images.set(int(n_images))
if file_path is not None:
self.output_file.set(str(file_path))
new_config = self.read_configuration()
return (old_config, new_config)
def stage(self) -> list:
"""Start a new run with the standard DAQ
Behavior: the StdDAQ can stop the previous run either by itself or
by calling unstage. So it might start from an already running state or
not, we can't query if not running.
"""
file_path = self.file_path.get()
n_image = self.n_images.get()
message = {"command": "start", "path": file_path, "n_image": n_image}
reply = self.message(message)
reply = json.loads(reply)
if reply["status"] in ("creating_file"):
self.status.put(reply["status"], force=True)
elif reply["status"] in ("rejected"):
raise RuntimeError(
f"Start command rejected (might be already running): {reply['reason']}"
)
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
return super().stage()
def unstage(self):
""" Stop a running acquisition
WARN: This will also close the connection!!!
"""
message = {"command": "stop"}
_ = self.message(message, wait_reply=False)
return super().unstage()
def stop(self):
""" Stop a running acquisition
WARN: This will also close the connection!!!
"""
message = {"command": "stop"}
# The poller thread locks recv raising a RuntimeError
self.message(message, wait_reply=False)
def message(self, message: dict, timeout=1, wait_reply=True):
"""Send a message to the StdDAQ and receive a reply
Note: finishing acquisition meang StdDAQ will close connections so
there's no idle state polling.
"""
if isinstance(message, dict):
msg = json.dumps(message)
else:
msg = str(message)
# Send message (reopen connection if needed)
try:
self._client.send(msg)
except (ConnectionClosedError, ConnectionClosedOK):
self.connect()
self._client.send(msg)
# Wait for reply
reply = None
if wait_reply:
try:
reply = self._client.recv(timeout)
return reply
except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex:
print(ex)
return reply
def poll(self):
"""Monitor status messages until connection is open"""
try:
for msg in self._client:
try:
message = json.loads(msg)
self.status.put(message["status"], force=True)
except (ConnectionClosedError, ConnectionClosedOK) as ex:
return
except Exception as ex:
print(ex)
return
finally:
self._mon = None
# Automatically connect to MicroSAXS testbench if directly invoked
if __name__ == "__main__":
daq = StdDaqWsClient(name="daq", url="ws://xbl-daq-29:8080")
daq.wait_for_connection()