DAQ config reader
This commit is contained in:
@@ -119,6 +119,19 @@ daq:
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: false
|
||||
|
||||
daqcfg:
|
||||
description: Standard DAQ config
|
||||
deviceClass: tomcat_bec.devices.gigafrost.stddaq_rest.StdDaqRestConfig
|
||||
deviceConfig:
|
||||
url: 'http://xbl-daq-29:5000'
|
||||
deviceTags:
|
||||
- std-daq
|
||||
enabled: true
|
||||
onFailure: buffer
|
||||
readOnly: false
|
||||
readoutPriority: monitored
|
||||
softwareTrigger: false
|
||||
|
||||
daq_stream0:
|
||||
description: Standard DAQ preview stream 2 frames every 1000
|
||||
deviceClass: tomcat_bec.devices.gigafrost.stddaq_preview.StdDaqPreview
|
||||
|
||||
112
tomcat_bec/devices/gigafrost/stddaq_rest.py
Normal file
112
tomcat_bec/devices/gigafrost/stddaq_rest.py
Normal file
@@ -0,0 +1,112 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Standard DAQ class module
|
||||
|
||||
Created on Thu Jun 27 17:28:43 2024
|
||||
|
||||
@author: mohacsi_i
|
||||
"""
|
||||
import sys
|
||||
import json
|
||||
from time import sleep
|
||||
from threading import Thread
|
||||
from ophyd import Device, Signal, Component, Kind
|
||||
import requests
|
||||
|
||||
|
||||
class StdDaqRestConfig(Device):
|
||||
"""Wrapper class around the new StdDaq REST interface.
|
||||
|
||||
This was meant to replace the websocket inteface that replaced the
|
||||
documented python client. We can finally read configuration through
|
||||
standard HTTP requests, although the secondary server is ot reachable
|
||||
at the time.
|
||||
"""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
# Status attributes
|
||||
cfg_detector_name = Component(Signal, kind=Kind.config)
|
||||
cfg_detector_type = Component(Signal, kind=Kind.config)
|
||||
cfg_n_modules = Component(Signal, kind=Kind.config)
|
||||
cfg_bit_depth = Component(Signal, kind=Kind.config)
|
||||
cfg_image_pixel_height = Component(Signal, kind=Kind.config)
|
||||
cfg_image_pixel_width = Component(Signal, kind=Kind.config)
|
||||
cfg_start_udp_port = Component(Signal, kind=Kind.config)
|
||||
cfg_writer_user_id = Component(Signal, kind=Kind.config)
|
||||
cfg_submodule_info = Component(Signal, kind=Kind.config)
|
||||
cfg_max_number_of_forwarders_spawned = Component(Signal, kind=Kind.config)
|
||||
cfg_use_all_forwarders = Component(Signal, kind=Kind.config)
|
||||
cfg_module_sync_queue_size = Component(Signal, kind=Kind.config)
|
||||
cfg_module_positions = Component(Signal, kind=Kind.config)
|
||||
|
||||
|
||||
def __init__(
|
||||
self, *args, url: str = "http://xbl-daq-29:5000", parent: Device = None, **kwargs
|
||||
) -> None:
|
||||
super().__init__(*args, parent=parent, **kwargs)
|
||||
self._url_base = url
|
||||
|
||||
# Connect ro the DAQ and initialize values
|
||||
self.read_daq_config()
|
||||
|
||||
def read_daq_config(self):
|
||||
"""Read the current configuration from the JSON file
|
||||
"""
|
||||
r = requests.get(self._url_base + '/api/config/get', params={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"})
|
||||
if r.status_code != 200:
|
||||
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
|
||||
|
||||
cfg = r.json()
|
||||
for key, val in cfg.items():
|
||||
if isinstance(val, (int, float, str)):
|
||||
getattr(self, "cfg_"+key).set(val).wait()
|
||||
return cfg
|
||||
|
||||
def write_daq_config(self):
|
||||
config = {
|
||||
'detector_name': str(self.cfg_detector_name.get()),
|
||||
'detector_type': str(self.cfg_detector_type.get()),
|
||||
'n_modules': int(self.cfg_n_modules.get()),
|
||||
'bit_depth': int(self.cfg_bit_depth.get()),
|
||||
'image_pixel_height': int(self.cfg_image_pixel_height.get()),
|
||||
'image_pixel_width': int(self.cfg_image_pixel_width.get()),
|
||||
'start_udp_port': int(self.cfg_start_udp_port.get()),
|
||||
'writer_user_id': int(self.cfg_writer_user_id.get()),
|
||||
'submodule_info': self.cfg_submodule_info.get(),
|
||||
'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders_spawned.get()),
|
||||
'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()),
|
||||
'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()),
|
||||
'module_positions': self.cfg_module_positions.get()
|
||||
}
|
||||
|
||||
params = {"user": "ioc", "config_file": "/etc/std_daq/configs/gf1.json"}
|
||||
|
||||
r = requests.post(self._url_base +'/api/config/set', params=params, json=config, headers={"Content-Type": "application/json"})
|
||||
if r.status_code != 200:
|
||||
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
|
||||
|
||||
|
||||
def stage(self) -> list:
|
||||
"""Read the current configuration from the DAQ
|
||||
"""
|
||||
self.read_daq_config()
|
||||
return super().stage()
|
||||
|
||||
|
||||
def unstage(self):
|
||||
"""Read the current configuration from the DAQ
|
||||
"""
|
||||
self.read_daq_config()
|
||||
return super().unstage()
|
||||
|
||||
|
||||
def stop(self):
|
||||
"""Read the current configuration from the DAQ
|
||||
"""
|
||||
self.unstage()
|
||||
|
||||
|
||||
# Automatically connect to MicroSAXS testbench if directly invoked
|
||||
if __name__ == "__main__":
|
||||
daqcfg = StdDaqRestConfig(name="daqcfg", url="http://xbl-daq-29:5000")
|
||||
daqcfg.wait_for_connection()
|
||||
@@ -1,249 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Standard DAQ class module
|
||||
|
||||
Created on Thu Jun 27 17:28:43 2024
|
||||
|
||||
@author: mohacsi_i
|
||||
"""
|
||||
import sys
|
||||
import json
|
||||
from time import sleep
|
||||
from threading import Thread
|
||||
from ophyd import Device, Signal, Component, Kind
|
||||
from websockets.sync.client import connect
|
||||
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
|
||||
|
||||
|
||||
class StdDaqRestClient(Device):
|
||||
"""Wrapper class around the new StdDaq REST interface.
|
||||
|
||||
This was meant to replace the websocket inteface that replaced the
|
||||
documented python client. We can finally read configuration with urllib3:
|
||||
'''
|
||||
import urllib3 as url3
|
||||
r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"})
|
||||
'''
|
||||
"""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
# Status attributes
|
||||
status = Component(Signal, value="unknown", kind=Kind.hinted)
|
||||
n_images = Component(Signal, value=10000, kind=Kind.config)
|
||||
file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)
|
||||
|
||||
|
||||
cfg_detector_name = Component(Signal, kind=Kind.config)
|
||||
cfg_detector_type = Component(Signal, kind=Kind.config)
|
||||
cfg_num_modules = Component(Signal, kind=Kind.config)
|
||||
cfg_bit_depth = Component(Signal, kind=Kind.config)
|
||||
cfg_image_pixel_height = Component(Signal, kind=Kind.config)
|
||||
cfg_image_pixel_width = Component(Signal, kind=Kind.config)
|
||||
cfg_start_udp_port = Component(Signal, kind=Kind.config)
|
||||
cfg_writer_user_id = Component(Signal, kind=Kind.config)
|
||||
cfg_submodule_info = Component(Signal, kind=Kind.config)
|
||||
cfg_max_number_of_forwarders_spawned = Component(Signal, kind=Kind.config)
|
||||
cfg_use_all_forwarders = Component(Signal, kind=Kind.config)
|
||||
cfg_module_sync_queue_size = Component(Signal, kind=Kind.config)
|
||||
cfg_module_positions = Component(Signal, kind=Kind.config)
|
||||
|
||||
|
||||
|
||||
def __init__(
|
||||
self, *args, url: str = "http://xbl-daq-29:5000", parent: Device = None, **kwargs
|
||||
) -> None:
|
||||
super().__init__(*args, parent=parent, **kwargs)
|
||||
self.status._metadata["write_access"] = False
|
||||
self._url = url
|
||||
self._mon = None
|
||||
|
||||
# Connect ro the DAQ
|
||||
self.connect()
|
||||
|
||||
def read_daq_config(self):
|
||||
"""Read the current configuration from the JSON file
|
||||
"""
|
||||
r = url3.request(method='GET', url='http://xbl-daq-29:5000/api/config/get', fields={'config_file': "/etc/std_daq/configs/gf1.json", 'user':"ioc"})
|
||||
r = r.json()
|
||||
|
||||
if 'detail' in r:
|
||||
# Failed to read config, probbaly wrong file
|
||||
return
|
||||
|
||||
for key, val in r:
|
||||
getattr(self, "cfg_"+key).set(val).wait()
|
||||
|
||||
def write_daq_config(self):
|
||||
|
||||
|
||||
config = {
|
||||
'detector_name': str(self.cfg_detector_name.get()),
|
||||
'detector_type': str(self.cfg_detector_type.get()),
|
||||
'n_modules': int(self.cfg_n_modules.get()),
|
||||
'bit_depth': int(self.cfg_bit_depth.get()),
|
||||
'image_pixel_height': int(self.cfg_image_pixel_height.get()),
|
||||
'image_pixel_width': int(self.cfg_image_pixel_width.get()),
|
||||
'start_udp_port': int(self.cfg_start_udp_port.get()),
|
||||
'writer_user_id' int(self.cfg_writer_user_id.get()),
|
||||
'submodule_info': self.cfg_submodule_info.get(),
|
||||
'max_number_of_forwarders_spawned': int(self.cfg_max_number_of_forwarders_spawned.get()),
|
||||
'use_all_forwarders': bool(self.cfg_use_all_forwarders.get()),
|
||||
'module_sync_queue_size': int(self.cfg_module_sync_queue_size.get()),
|
||||
'module_positions': self.cfg_module_positions.get()
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def connect(self):
|
||||
"""Connect to te StDAQs websockets interface
|
||||
|
||||
StdDAQ may reject connection for a few seconds, so if it fails, wait
|
||||
a bit and try to connect again.
|
||||
"""
|
||||
try:
|
||||
self._client = connect(self._ws_url)
|
||||
except ConnectionRefusedError:
|
||||
sleep(5)
|
||||
self._client = connect(self._ws_url)
|
||||
|
||||
def monitor(self):
|
||||
"""Attach monitoring to the DAQ"""
|
||||
self._client = connect(self._ws_url)
|
||||
self._mon = Thread(target=self.poll, daemon=True)
|
||||
self._mon.start()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def configure(self, n_images: int = None, file_path: str = None) -> tuple:
|
||||
"""Set the standard DAQ parameters for the next run
|
||||
|
||||
Note that full reconfiguration is not possible with the websocket
|
||||
interface, only changing acquisition parameters. These changes are only
|
||||
activated upon staging!
|
||||
|
||||
Example:
|
||||
----------
|
||||
std.configure(n_images=10000, file_path="/data/test/raw")
|
||||
|
||||
Parameters
|
||||
----------
|
||||
n_images : int, optional
|
||||
Number of images to be taken during each scan. Set to -1 for an
|
||||
unlimited number of images (limited by the ringbuffer size and
|
||||
backend speed). (default = 10000)
|
||||
file_path : string, optional
|
||||
Save file path. (default = '/gpfs/test/test-beamline')
|
||||
|
||||
"""
|
||||
old_config = self.read_configuration()
|
||||
# If Bluesky style configure
|
||||
if isinstance(n_images, dict):
|
||||
d = n_images.copy()
|
||||
n_images = d.get('n_images', None)
|
||||
file_path = d.get('file_path', None)
|
||||
|
||||
if n_images is not None:
|
||||
self.n_images.set(int(n_images))
|
||||
if file_path is not None:
|
||||
self.output_file.set(str(file_path))
|
||||
|
||||
new_config = self.read_configuration()
|
||||
return (old_config, new_config)
|
||||
|
||||
def stage(self) -> list:
|
||||
"""Start a new run with the standard DAQ
|
||||
|
||||
Behavior: the StdDAQ can stop the previous run either by itself or
|
||||
by calling unstage. So it might start from an already running state or
|
||||
not, we can't query if not running.
|
||||
"""
|
||||
file_path = self.file_path.get()
|
||||
n_image = self.n_images.get()
|
||||
|
||||
message = {"command": "start", "path": file_path, "n_image": n_image}
|
||||
reply = self.message(message)
|
||||
|
||||
reply = json.loads(reply)
|
||||
if reply["status"] in ("creating_file"):
|
||||
self.status.put(reply["status"], force=True)
|
||||
elif reply["status"] in ("rejected"):
|
||||
raise RuntimeError(
|
||||
f"Start command rejected (might be already running): {reply['reason']}"
|
||||
)
|
||||
|
||||
self._mon = Thread(target=self.poll, daemon=True)
|
||||
self._mon.start()
|
||||
return super().stage()
|
||||
|
||||
def unstage(self):
|
||||
""" Stop a running acquisition
|
||||
|
||||
WARN: This will also close the connection!!!
|
||||
"""
|
||||
message = {"command": "stop"}
|
||||
_ = self.message(message, wait_reply=False)
|
||||
return super().unstage()
|
||||
|
||||
def stop(self):
|
||||
""" Stop a running acquisition
|
||||
|
||||
WARN: This will also close the connection!!!
|
||||
"""
|
||||
message = {"command": "stop"}
|
||||
# The poller thread locks recv raising a RuntimeError
|
||||
self.message(message, wait_reply=False)
|
||||
|
||||
def message(self, message: dict, timeout=1, wait_reply=True):
|
||||
"""Send a message to the StdDAQ and receive a reply
|
||||
|
||||
Note: finishing acquisition meang StdDAQ will close connections so
|
||||
there's no idle state polling.
|
||||
"""
|
||||
if isinstance(message, dict):
|
||||
msg = json.dumps(message)
|
||||
else:
|
||||
msg = str(message)
|
||||
|
||||
# Send message (reopen connection if needed)
|
||||
try:
|
||||
self._client.send(msg)
|
||||
except (ConnectionClosedError, ConnectionClosedOK):
|
||||
self.connect()
|
||||
self._client.send(msg)
|
||||
# Wait for reply
|
||||
reply = None
|
||||
if wait_reply:
|
||||
try:
|
||||
reply = self._client.recv(timeout)
|
||||
return reply
|
||||
except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex:
|
||||
print(ex)
|
||||
return reply
|
||||
|
||||
def poll(self):
|
||||
"""Monitor status messages until connection is open"""
|
||||
try:
|
||||
for msg in self._client:
|
||||
try:
|
||||
message = json.loads(msg)
|
||||
self.status.put(message["status"], force=True)
|
||||
except (ConnectionClosedError, ConnectionClosedOK) as ex:
|
||||
return
|
||||
except Exception as ex:
|
||||
print(ex)
|
||||
return
|
||||
finally:
|
||||
self._mon = None
|
||||
|
||||
|
||||
# Automatically connect to MicroSAXS testbench if directly invoked
|
||||
if __name__ == "__main__":
|
||||
daq = StdDaqWsClient(name="daq", url="ws://xbl-daq-29:8080")
|
||||
daq.wait_for_connection()
|
||||
Reference in New Issue
Block a user