More or less working DAQ with potential crash on idle stop

This commit is contained in:
gac-x05la
2024-10-25 17:10:42 +02:00
committed by mohacsi_i
parent 4e13938184
commit ff91a3c39e
4 changed files with 251 additions and 223 deletions
+1 -1
View File
@@ -12,7 +12,7 @@ classifiers = [
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = ["ophyd_devices", "bec_lib", "websockets", "pyzmq"]
dependencies = ["ophyd_devices", "bec_lib", "requests", "websockets", "pyzmq", "jinja2"]
[project.optional-dependencies]
dev = ["black", "isort", "coverage", "pylint", "pytest", "pytest-random-order", "ophyd_devices", "bec_server"]
@@ -108,7 +108,7 @@ class GigaFrostClient(PSIDetectorBase):
custom_prepare_cls = GigaFrostClientMixin
USER_ACCESS = ["kickoff"]
cam = Component(gfcam.GigaFrostCamera, prefix="X02DA-CAM-GF2:", name="cam")
# cam = Component(gfcam.GigaFrostCamera, prefix="X02DA-CAM-GF2:", name="cam")
daq = Component(stddaq.StdDaqClient, name="daq")
# pylint: disable=too-many-arguments
@@ -124,8 +124,8 @@ class GigaFrostClient(PSIDetectorBase):
kind=None,
**kwargs,
):
self.__class__.__dict__["cam"].kwargs['backend_url'] = backend_url
self.__class__.__dict__["cam"].kwargs['auto_soft_enable'] = auto_soft_enable
# self.__class__.__dict__["cam"].kwargs['backend_url'] = backend_url
# self.__class__.__dict__["cam"].kwargs['auto_soft_enable'] = auto_soft_enable
self.__class__.__dict__["daq"].kwargs['ws_url'] = daq_ws_url
self.__class__.__dict__["daq"].kwargs['rest_url'] = daq_rest_url
+235 -211
View File
@@ -9,181 +9,70 @@ Created on Thu Jun 27 17:28:43 2024
import json
from time import sleep
from threading import Thread
import requests
from ophyd import Device, Signal, Component, Kind, DeviceStatus, Staged
from websockets.sync.client import connect
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
try:
from stddaq_rest import StdDaqRestClient
except ModuleNotFoundError:
from tomcat_bec.devices.gigafrost.stddaq_rest import StdDaqRestClient
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin
from bec_lib import bec_logger
logger = bec_logger.logger
class StdDaqClient(Device):
"""StdDaq API
class StdDaqMixin(CustomDeviceMixin):
# parent : StdDaqClient
_mon = None
This class combines the new websocket and REST interfaces of the stdDAQ that
were meant to replace the documented python client. The websocket interface
starts and stops the acquisition and provides status, while the REST
interface can read and write the configuration. The DAQ needs to restart
all services to reconfigure with a new config.
def on_stage(self) -> None:
logger.warning(self.parent.scaninfo.__dict__)
The websocket provides status updates about a running acquisition but the
interface breaks connection at the end of the run.
# Restart the DAQ if resolution changed
The standard DAQ configuration is a single JSON file locally autodeployed
to the DAQ servers (as root!!!). It can only be written through a REST API
that is semi-supported. The DAQ might be distributed across several servers;
we only interface with the primary REST interface, which synchronizes with
all secondary REST servers. In the past this was a source of problems.
reconfigure = False
if hasattr(self.parent.scaninfo, 'image_width'):
reconfigure = True
self.parent.cfg_pixel_width.set(self.parent.scaninfo.image_width).wait()
if hasattr(self.parent.scaninfo, 'image_height'):
reconfigure = True
self.parent.cfg_pixel_height.set(self.parent.scaninfo.image_height).wait()
Example:
'''
daq = StdDaqClient(name="daq", ws_url="ws://xbl-daq-29:8080", rest_url="http://xbl-daq-29:5000")
'''
"""
# pylint: disable=too-many-instance-attributes
USER_ACCESS=["kickoff"]
# Restart the DAQ if resolution changed
cfg = self.parent.get_daq_config()
if cfg['image_pixel_height'] != self.parent.cfg_pixel_height.get() or cfg['image_pixel_width'] != self.parent.cfg_pixel_width.get():
self.parent.safestop()
sleep(1)
cfg = self.parent.get_daq_config()
changes = {
'image_pixel_height': int(self.parent.cfg_pixel_height.get()),
'image_pixel_width': int(self.parent.cfg_pixel_width.get()),
}
cfg = cfg.update(changes)
self.parent.set_daq_config(cfg)
self.parent.read_daq_config()
# Status attributes
url = Component(Signal, kind=Kind.config)
status = Component(Signal, value="unknown", kind=Kind.normal)
n_total = Component(Signal, value=10000, kind=Kind.config)
file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)
# Configuration
config = Component(StdDaqRestClient, kind=Kind.config)
# Online configurable changes
if hasattr(self.parent.scaninfo, 'num_images'):
self.parent.num_images.set(self.parent.scaninfo.num_images).wait()
if hasattr(self.parent.scaninfo, 'daq_file_path'):
self.parent.file_path.set(self.parent.scaninfo.daq_file_path).wait()
file_path = self.parent.file_path.get()
num_images = self.parent.num_images.get()
def __init__(
self,
*args,
ws_url: str = "ws://localhost:8080",
rest_url: str = "http://localhost:5000",
parent: Device = None,
**kwargs
) -> None:
self.__class__.__dict__['config'].kwargs['rest_url'] = rest_url
super().__init__(*args, parent=parent, **kwargs)
self.status._metadata["write_access"] = False
self.url._metadata["write_access"] = False
self.url.set(ws_url, force=True).wait()
self._ws_url = ws_url
self._mon = None
# Connect to the DAQ
self._client = None
self.connect()
def __del__(self) -> None:
self._client.close()
return super().__del__()
def connect(self):
"""Connect to the StdDAQ's websockets interface
StdDAQ may reject connection for a few seconds after restart, or when
it wants so if it fails, wait a bit and try to connect again.
"""
num_retry = 0
while num_retry < 5:
try:
self._client = connect(self._ws_url)
break
except ConnectionRefusedError:
num_retry += 1
sleep(3)
if num_retry == 5:
raise ConnectionRefusedError(
"The stdDAQ websocket interface refused connection 5 times.")
def monitor(self):
"""Attach monitoring to the DAQ"""
self._client = connect(self._ws_url)
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def configure(self, d: dict = None) -> tuple:
"""Set the standard DAQ parameters for the next run
Note that full reconfiguration is not possible with the websocket
interface, only changing acquisition parameters. These changes are only
activated upon staging!
Example:
----------
std.configure(n_total=10000, file_path="/data/test/raw")
Parameters
----------
n_total : int, optional
Total number of images to be taken during each scan. Set to -1 for
an unlimited number of images (limited by the ringbuffer size and
backend speed). (default = 10000)
file_path : string, optional
Save file path. (default = '/gpfs/test/test-beamline')
"""
old_config = self.read_configuration()
if d is not None:
# Set acquisition parameters
if 'n_total' in d:
self.n_total.set(int(d['n_total']))
if 'ntotal' in d:
self.n_total.set(int(d['ntotal']))
if 'file_path' in d:
self.output_file.set(str(d['file_path']))
# Configure DAQ
if 'pixel_width' in d or 'pixel_height' in d:
# Safe stop before configure (see 'reset')
self.reset()
self.config.configure(d)
new_config = self.read_configuration()
return (old_config, new_config)
def reset(self):
"""
The current stdDAQ refuses connection if another session is running. This is safety so
we don't accidentally kill a running exposure. But this also means that we have to wait
until a dead session dies of timeout.
NOTE: REST reconfiguration restarts with systemd and can corrupt currently written files.
"""
try:
if self._client is not None:
self._client.close()
self._client = connect(self._ws_url)
msg = json.dumps({"command": "stop"})
self._client.send(msg)
except (ConnectionClosedError, ConnectionClosedOK, ConnectionRefusedError):
pass
self._staged = Staged.no
sleep(1)
def stage(self) -> list:
"""Start a new run with the standard DAQ
Behavior: the StdDAQ can stop the previous run either by itself or
by calling unstage. So it might start from an already running state or
not, we can't query if not running.
"""
if self._staged:
self._client.close()
file_path = self.file_path.get()
n_total = self.n_total.get()
message = {"command": "start", "path": file_path, "n_image": n_total}
# Try to start a new run
message = {"command": "start", "path": file_path, "n_image": num_images}
ii = 0
self.parent.connect()
while True:
reply = self.message(message)
reply = self.parent.message(message)
if reply is not None:
reply = json.loads(reply)
self.status.put(reply["status"], force=True)
self.parent.status.set(reply["status"], force=True).wait()
logger.info(f"[{self.parent.name}] Start DAq reply: {reply['status']}")
# Give it more time to reconfigure
if reply["status"] in ("rejected"):
sleep(2)
@@ -196,76 +85,44 @@ class StdDaqClient(Device):
raise RuntimeError(
f"Start StdDAQ command rejected (might be already running): {reply['reason']}"
)
# And start status monitoring
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
return super().stage()
def unstage(self):
""" Stop a running acquisition
WARN: This will also close the connection!!!
def on_unstage(self) -> None:
""" Stop a running acquisition and close connection
"""
# The poller thread locks recv raising a RuntimeError
try:
if self.parent._wsclient is None:
self.parent.connect()
message = {"command": "stop"}
self.message(message, wait_reply=False)
self.parent.message(message, wait_reply=False)
except RuntimeError:
pass
self._client.close()
return super().unstage()
def kickoff(self) -> DeviceStatus:
""" The DAQ was not meant to quickly toggle"""
return DeviceStatus(self, done=True, success=True, settle_time=0.1)
def stop(self, *, success=False):
""" Stop a running acquisition
WARN: This will also close the connection!!!
"""
self.unstage()
def message(self, message: dict, timeout=1, wait_reply=True):
"""Send a message to the StdDAQ and receive a reply
Note: finishing acquisition means StdDAQ will close connection, so
there's no idle state polling.
"""
if isinstance(message, dict):
msg = json.dumps(message)
else:
msg = str(message)
# Send message (reopen connection if needed)
try:
self._client.send(msg)
except (ConnectionClosedError, ConnectionClosedOK):
self.connect()
self._client.send(msg)
# Wait for reply
reply = None
if wait_reply:
finally:
try:
reply = self._client.recv(timeout)
return reply
except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex:
print(ex)
return reply
self.parent._wsclient.close()
except TypeError:
# Already closed
pass
def poll(self):
"""Monitor status messages until connection is open
def on_stop(self):
""" Stop a running acquisition and close connection
"""
return self.parent.unstage()
This will block the reply monitoring, so calling unstage() might throw.
Status updates are sent every second.
def poll(self) -> None:
""" Monitor status messages while connection is open. This will block the reply monitoring,
so calling unstage() might throw. Status updates are sent every second, but finishing
acquisition means StdDAQ will close connection, so there's no idle state polling.
"""
try:
sleep(1.2)
for msg in self._client:
sleep(0.2)
for msg in self.parent._wsclient:
try:
message = json.loads(msg)
self.status.put(message["status"], force=True)
self.parent.status.put(message["status"], force=True)
except (ConnectionClosedError, ConnectionClosedOK):
return
except Exception as ex:
@@ -280,6 +137,173 @@ class StdDaqClient(Device):
self._mon = None
class StdDaqClient(PSIDeviceBase):
    """StdDaq API

    This class combines the new websocket and REST interfaces of the stdDAQ that replaced the
    documented python client. The websocket interface starts and stops the acquisition and
    provides status, while the REST interface can read and write the JSON configuration file.

    The DAQ needs to restart all services to reconfigure with a new config, which might corrupt
    the currently written files (fix is underway).

    Example:
    ```
    daq = StdDaqClient(name="daq", ws_url="ws://xbl-daq-29:8080", rest_url="http://xbl-daq-29:5000")
    ```
    """
    # pylint: disable=too-many-instance-attributes
    custom_prepare_cls = StdDaqMixin
    USER_ACCESS = ["set_daq_config", "get_daq_config", "safestop", "restart"]
    # Websocket client handle; stays None until connect() or safestop() opens a connection
    _wsclient = None

    # Status attributes
    ws_url = Component(Signal, kind=Kind.config)
    status = Component(Signal, value="unknown", kind=Kind.normal)
    num_images = Component(Signal, value=10000, kind=Kind.config)
    file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)

    # Configuration attributes
    rest_url = Component(Signal, kind=Kind.config)
    cfg_detector_name = Component(Signal, kind=Kind.config)
    cfg_detector_type = Component(Signal, kind=Kind.config)
    cfg_bit_depth = Component(Signal, kind=Kind.config)
    cfg_pixel_height = Component(Signal, kind=Kind.config)
    cfg_pixel_width = Component(Signal, kind=Kind.config)

    def __init__(
        self,
        *args,
        ws_url: str = "ws://localhost:8080",
        rest_url: str = "http://localhost:5000",
        parent: Device = None,
        **kwargs
    ) -> None:
        """Create the device and try to read the current DAQ configuration.

        Parameters
        ----------
        ws_url : str
            Address of the stdDAQ websocket interface.
        rest_url : str
            Address of the stdDAQ REST interface.
        parent : Device, optional
            Parent ophyd device, forwarded to the base class.
        """
        super().__init__(*args, parent=parent, **kwargs)
        # The URLs and the status are written with force=True only, never by the user
        self.status._metadata["write_access"] = False
        self.ws_url._metadata["write_access"] = False
        self.ws_url.set(ws_url, force=True).wait()
        self.rest_url._metadata["write_access"] = False
        self.rest_url.set(rest_url, force=True).wait()
        self._mon = None

        # Connect to the DAQ and initialize values. A failure is only logged so that the
        # device can still be instantiated while the DAQ is unreachable.
        try:
            self.read_daq_config()
        except Exception as ex:
            logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}")

    def __del__(self) -> None:
        """Close the websocket connection (if one was ever opened) on destruction."""
        # The websocket is opened lazily, so there might be nothing to close
        if self._wsclient is not None:
            self._wsclient.close()
        # Only chain up if a base class actually provides a finalizer
        parent_del = getattr(super(), "__del__", None)
        if parent_del is not None:
            parent_del()

    def connect(self):
        """Connect to the StdDAQ's websockets interface

        StdDAQ may reject connection for a few seconds after restart, or when
        it wants, so if it fails, wait a bit and try to connect again.

        Raises
        ------
        ConnectionRefusedError
            If the connection was refused on five consecutive attempts.
        """
        for _ in range(5):
            try:
                self._wsclient = connect(self.ws_url.get())
                return
            except ConnectionRefusedError:
                sleep(3)
        raise ConnectionRefusedError(
            "The stdDAQ websocket interface refused connection 5 times.")

    def message(self, message: dict, timeout=1, wait_reply=True):
        """Send a message to the StdDAQ and receive a reply

        Note: finishing acquisition means StdDAQ will close connection, so
        there's no idle state polling.

        Parameters
        ----------
        message : dict or any
            Command to send; dictionaries are serialized to JSON, anything else
            is converted with str().
        timeout : float
            Maximum time in seconds to wait for the reply.
        wait_reply : bool
            If False, return right after sending.

        Returns
        -------
        The raw reply as received from the websocket, or None if no reply was
        requested, the wait timed out, or the connection was closed meanwhile.
        """
        msg = json.dumps(message) if isinstance(message, dict) else str(message)

        # The connection is opened lazily, there might be no client yet
        if self._wsclient is None:
            self.connect()

        # Send message (reopen connection if needed)
        try:
            self._wsclient.send(msg)
        except (ConnectionClosedError, ConnectionClosedOK) as ex:
            # Re-sending on the closed socket would just fail again, so reconnect first
            logger.warning(f"[{self.name}] Websocket connection was closed, reconnecting: {ex}")
            self.connect()
            self._wsclient.send(msg)

        if not wait_reply:
            return None

        # Wait for reply
        try:
            return self._wsclient.recv(timeout)
        except (ConnectionClosedError, ConnectionClosedOK, TimeoutError) as ex:
            logger.warning(f"[{self.name}] No reply from the stdDAQ: {ex}")
            return None

    def get_daq_config(self) -> dict:
        """Read the current configuration from the DAQ

        Raises
        ------
        ConnectionError
            If the REST interface answers with a status code other than 200.
        """
        r = requests.get(
            self.rest_url.get() + '/api/config/get',
            params={'user': "ioc"},
            timeout=2)
        if r.status_code != 200:
            raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
        return r.json()

    def read_daq_config(self) -> dict:
        """Read the current configuration from the DAQ and update the ophyd device

        Returns
        -------
        dict
            The configuration as returned by the REST interface.
        """
        cfg = self.get_daq_config()
        self.cfg_detector_name.set(cfg['detector_name']).wait()
        self.cfg_detector_type.set(cfg['detector_type']).wait()
        self.cfg_bit_depth.set(cfg['bit_depth']).wait()
        self.cfg_pixel_height.set(cfg['image_pixel_height']).wait()
        self.cfg_pixel_width.set(cfg['image_pixel_width']).wait()
        return cfg

    def set_daq_config(self, config):
        """Write a full configuration to the DAQ

        Parameters
        ----------
        config : dict
            Complete configuration, sent as the JSON body of the request.

        Raises
        ------
        ConnectionError
            If the REST interface answers with a status code other than 200.
        """
        r = requests.post(
            self.rest_url.get() + '/api/config/set',
            params={"user": "ioc"},
            json=config,
            timeout=2,
            headers={"Content-Type": "application/json"}
        )
        if r.status_code != 200:
            raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
        return r.json()

    def restart(self):
        """ Reconfigures the stdDAQ to restart the services

        Writes back the configuration that was just read.
        """
        cfg = self.get_daq_config()
        self.set_daq_config(cfg)

    def safestop(self):
        """
        The current stdDAQ refuses connection if another session is running. This is safety so
        we don't accidentally kill a running exposure. But this also means that we have to wait
        until a dead session dies of timeout.

        NOTE: REST reconfiguration restarts with systemd and can corrupt currently written files.
        """
        try:
            if self._wsclient is not None:
                self._wsclient.close()
            # The URL is stored in the 'ws_url' signal, there is no '_ws_url' attribute
            self._wsclient = connect(self.ws_url.get())
            self._wsclient.send(json.dumps({"command": "stop"}))
        except (ConnectionClosedError, ConnectionClosedOK, ConnectionRefusedError) as ex:
            # Best effort only: the DAQ might be idle or still busy with another session
            logger.debug(f"[{self.name}] Safe stop could not reach the stdDAQ: {ex}")
        self._staged = Staged.no
        sleep(1)
# When run as a script, instantiate the client against the microXAS testbench
if __name__ == "__main__":
    daq = StdDaqClient(
        name="daq",
        ws_url="ws://xbl-daq-29:8080",
        rest_url="http://xbl-daq-29:5000",
    )
+12 -8
View File
@@ -7,15 +7,18 @@ Created on Thu Jun 27 17:28:43 2024
@author: mohacsi_i
"""
from time import sleep
from ophyd import Device, Signal, Component, Kind
import requests
from ophyd import Device, Signal, Component, Kind
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import CustomDetectorMixin as CustomDeviceMixin
from bec_lib import bec_logger
logger = bec_logger.logger
try:
from bec_lib import bec_logger
logger = bec_logger.logger
except ModuleNotFoundError:
import logging
logger = logging.getLogger("GfCam")
class StdDaqRestClient(Device):
@@ -57,9 +60,10 @@ class StdDaqRestClient(Device):
cfg_module_positions = Component(Signal, kind=Kind.config)
def __init__(
self, *args, rest_url: str = "http://localhost:5000", parent: Device = None, **kwargs
self, *args, rest_url: str = "http://localhost:5000", camera_name="gigafrost", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.camera_name = camera_name
self.rest_url._metadata["write_access"] = False
self.rest_url.put(rest_url, force=True)