Added complete method to stdDAQ

This commit is contained in:
gac-x05la
2025-01-23 11:58:55 +01:00
parent 22e20edd71
commit 9e026388b5
2 changed files with 86 additions and 96 deletions

View File

@@ -12,7 +12,9 @@ from threading import Thread
import requests
from ophyd import Device, Signal, Component, Kind, Staged
from websockets.sync.client import connect
from ophyd.status import SubscriptionStatus
from ophyd.flyers import FlyerInterface
from websockets.sync.client import connect, ClientConnection
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
@@ -82,6 +84,7 @@ class StdDaqMixin(CustomDeviceMixin):
def on_unstage(self):
""" Stop a running acquisition and close connection
"""
print("Creating virtual dataset")
self.parent.create_virtual_dataset()
self.parent.blueunstage()
@@ -99,8 +102,8 @@ class StdDaqMixin(CustomDeviceMixin):
sleep(0.2)
for msg in self.parent._wsclient:
message = json.loads(msg)
self.parent.status.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
self.parent.runstatus.put(message["status"], force=True)
logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except (ConnectionClosedError, ConnectionClosedOK, AssertionError):
# Libraty throws theese after connection is closed
return
@@ -114,11 +117,10 @@ class StdDaqMixin(CustomDeviceMixin):
class StdDaqClient(PSIDeviceBase):
"""StdDaq API
This class combines the new websocket and REST interfaces of the stdDAQ replaced the documented
python client. The websocket interface starts and stops the acquisition and provides status,
while the REST interface can read and write the JSON configuration file.
The DAQ needs to restart all services to reconfigure with a new config, which might corrupt
This class combines the new websocket and REST interfaces of the stdDAQ. The websocket
interface starts and stops the acquisition and provides status, while the REST interface
can read and write the JSON configuration file. The stdDAQ needs to restart all services
to reconfigure with a new config, which might corrupt
the currently written files (fix is underway).
Example:
@@ -132,12 +134,12 @@ class StdDaqClient(PSIDeviceBase):
_wsclient = None
# Status attributes
ws_url = Component(Signal, kind=Kind.config)
status = Component(Signal, value="unknown", kind=Kind.normal)
ws_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
runstatus = Component(Signal, value="unknown", kind=Kind.normal, metadata={'write_access': False})
num_images = Component(Signal, value=10000, kind=Kind.config)
file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)
# Configuration attributes
rest_url = Component(Signal, kind=Kind.config)
rest_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
cfg_detector_name = Component(Signal, kind=Kind.config)
cfg_detector_type = Component(Signal, kind=Kind.config)
cfg_bit_depth = Component(Signal, kind=Kind.config)
@@ -145,7 +147,6 @@ class StdDaqClient(PSIDeviceBase):
cfg_pixel_width = Component(Signal, kind=Kind.config)
cfg_nr_writers = Component(Signal, kind=Kind.config)
def __init__(
self,
prefix="",
@@ -156,27 +157,23 @@ class StdDaqClient(PSIDeviceBase):
configuration_attrs=None,
parent=None,
device_manager=None,
sim_mode=False,
ws_url: str = "ws://localhost:8080",
rest_url: str = "http://localhost:5000",
data_source_name = None,
**kwargs,
) -> None:
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
self.status._metadata["write_access"] = False
self.ws_url._metadata["write_access"] = False
self.ws_url.set(ws_url, force=True).wait()
self.rest_url._metadata["write_access"] = False
self.rest_url.set(rest_url, force=True).wait()
self.data_source_name = data_source_name
# Connect to the DAQ and initialize values
try:
self.read_daq_config()
self.get_daq_config(update=True)
except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}")
def connect(self):
def connect(self) -> ClientConnection:
"""Connect to the StdDAQ's websockets interface
StdDAQ may reject connection for a few seconds after restart, or when
@@ -185,16 +182,14 @@ class StdDaqClient(PSIDeviceBase):
num_retry = 0
while num_retry < 5:
try:
logger.debug(f"[{self.name}] Connecting to {self.ws_url.get()}")
self._wsclient = connect(self.ws_url.get())
break
logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
connection = connect(self.ws_url.get())
logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries")
return connection
except ConnectionRefusedError:
num_retry += 1
sleep(3)
if num_retry == 5:
raise ConnectionRefusedError(
"The stdDAQ websocket interface refused connection 5 times.")
logger.debug(f"[{self.name}] Connected to DAQ after {num_retry} tries")
sleep(2)
raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.")
def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str:
"""Send a message to the StdDAQ and receive a reply
@@ -207,7 +202,7 @@ class StdDaqClient(PSIDeviceBase):
# Connect if client was destroyed
if self._wsclient is None:
self.connect()
self._wsclient = self.connect()
# Send message (reopen connection if needed)
msg = json.dumps(message) if isinstance(message, dict) else str(message)
@@ -215,7 +210,7 @@ class StdDaqClient(PSIDeviceBase):
self._wsclient.send(msg)
except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex:
# Re-connect if the connection was closed
self.connect()
self._wsclient = self.connect()
self._wsclient.send(msg)
# Wait for reply
@@ -234,49 +229,36 @@ class StdDaqClient(PSIDeviceBase):
def configure(self, d: dict = None):
"""Configure the next scan with the stdDAQ
Parameters as 'd' dictionary
Parameters as 'd' dictionary, the default is unchanged.
----------------------------
num_points_total : int, optional
Number of images to be taken during each scan. Set to -1 for an
unlimited number of images (limited by the ringbuffer size and
backend speed). (default = 10)
exposure : float, optional
Exposure time [ms]. (default = 0.2)
period : float, optional
Exposure period [ms], ignored in soft trigger mode. (default = 1.0)
pixel_width : int, optional
ROI size in the x-direction [pixels] (default = 2016)
pixel_height : int, optional
ROI size in the y-direction [pixels] (default = 2016)
scanid : int, optional
Scan identification number to be associated with the scan data
(default = 0)
trigger_mode : str, optional
Trigger mode of the gifafrost
(default = unchanged)
Number of images to be taken during each scan. Set to -1 for an unlimited number of
images (limited by the ringbuffer size and backend speed).
file_path: str, optional
File path to save the data, usually GPFS.
image_width : int, optional
ROI size in the x-direction [pixels].
image_height : int, optional
ROI size in the y-direction [pixels].
bit_depth: int, optional
Image bit depth for cameras that can change it [int].
nr_writers: int, optional
Number of writers
(default = unchanged)
correction_mode : int, optional
The correction to be applied to the imaging data. The following
modes are available (default = 5):
* 0: Bypass. No corrections are applied to the data.
* 1: Send correction factor A instead of pixel values
* 2: Send correction factor B instead of pixel values
* 3: Send correction factor C instead of pixel values
* 4: Invert pixel values, but do not apply any linearity correction
* 5: Apply the full linearity correction
Number of writers [int].
"""
# Configuration parameters
if 'image_width' in d and d['image_width']!=None:
self.cfg_pixel_width.set(d['image_width']).wait()
if 'image_height' in d and d['image_height']!=None:
self.cfg_pixel_height.set(d['image_height']).wait()
if 'bit_depth' in d:
self.cfg_bit_depth.set(d['bit_depth']).wait()
if 'nr_writers' in d and d['nr_writers']!=None:
self.cfg_nr_writers.set(d['nr_writers']).wait()
# Run parameters
if 'num_points_total' in d:
self.num_images.set(d['num_points_total']).wait()
if 'nr_writers' in d and d['nr_writers']!=None:
print('Setting number of writers ' + str(d['nr_writers']))
self.cfg_nr_writers.set(d['nr_writers']).wait()
if 'file_path' in d and d['file_path']!=None:
self.file_path.set(d['file_path']).wait()
# Restart the DAQ if resolution changed
cfg = self.get_daq_config()
@@ -286,23 +268,17 @@ class StdDaqClient(PSIDeviceBase):
cfg['number_of_writers'] != self.cfg_nr_writers.get():
# Stop if current status is not idle
status = self.state()
if self.state() != "idle":
raise RuntimeWarning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
# Stop running acquisition
try:
self.stop()
except RuntimeError:
raise RuntimeWarning(f"[{self.name}] Told ya!")
# Update retrieved config
cfg['image_pixel_height'] = int(self.cfg_pixel_height.get())
cfg['image_pixel_width'] = int(self.cfg_pixel_width.get())
cfg['bit_depth'] = int(self.cfg_bit_depth.get())
cfg['number_of_writers'] = int(self.cfg_nr_writers.get())
r = self.set_daq_config(cfg)
cfg=self.read_daq_config()
self.set_daq_config(cfg)
sleep(1)
self.get_daq_config(update=True)
def bluestage(self):
""" Stages the stdDAQ
@@ -312,8 +288,10 @@ class StdDaqClient(PSIDeviceBase):
it for obvious failures.
"""
# Can't stage into a running exposure
print('Before')
if self.state() != 'idle':
raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}")
print('After')
# Must make sure that image size matches the data source
if self.data_source_name is not None:
@@ -332,13 +310,13 @@ class StdDaqClient(PSIDeviceBase):
num_images = self.num_images.get()
# New connection
self.connect()
self._wsclient = self.connect()
message = {"command": "start", "path": file_path, "n_image": num_images, }
reply = self.message(message)
if reply is not None:
reply = json.loads(reply)
self.status.set(reply["status"], force=True).wait()
self.runstatus.set(reply["status"], force=True).wait()
logger.info(f"[{self.name}] Start DAQ reply: {reply}")
# Give it more time to reconfigure
@@ -365,20 +343,20 @@ class StdDaqClient(PSIDeviceBase):
ii = 0
while ii<10:
# Stop the DAQ (will close connection) - reply is always "success"
self.connect()
self._wsclient = self.connect()
self.message({"command": "stop_all"}, wait_reply=False)
# Let it consolidate
sleep(0.2)
# Check final status (from new connection)
self.connect()
self._wsclient = self.connect()
reply = self.message({"command": "status"})
if reply is not None:
logger.info(f"[{self.name}] DAQ status reply: {reply}")
reply = json.loads(reply)
if reply["status"] in ("idle"):
if reply["status"] in ("idle", "error"):
# Only 'idle' state accepted
print(f"DAQ stopped on try {ii}")
return
@@ -390,7 +368,18 @@ class StdDaqClient(PSIDeviceBase):
ii += 1
raise RuntimeError(f"Failed to stop StdDAQ in time")
def get_daq_config(self) -> dict:
##########################################################################
# Bluesky flyer interface
def complete(self) -> SubscriptionStatus:
"""Wait for current run. Must end in status 'file_saved'."""
def is_running(*args, value, timestamp, **kwargs):
result = value in ["idle", "file_saved", "error"]
return result
status = SubscriptionStatus(self.runstatus, is_running, settle_time=0.5)
return status
def get_daq_config(self, update=False) -> dict:
"""Read the current configuration from the DAQ
"""
r = requests.get(
@@ -399,21 +388,18 @@ class StdDaqClient(PSIDeviceBase):
timeout=2)
if r.status_code != 200:
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
return r.json()
cfg = r.json()
def read_daq_config(self) -> dict:
"""Read the current configuration from the DAQ and update the ophyd device
"""
cfg = self.get_daq_config()
self.cfg_detector_name.set(cfg['detector_name']).wait()
self.cfg_detector_type.set(cfg['detector_type']).wait()
self.cfg_bit_depth.set(cfg['bit_depth']).wait()
self.cfg_pixel_height.set(cfg['image_pixel_height']).wait()
self.cfg_pixel_width.set(cfg['image_pixel_width']).wait()
self.cfg_nr_writers.set(cfg['number_of_writers']).wait()
if update:
self.cfg_detector_name.set(cfg['detector_name']).wait()
self.cfg_detector_type.set(cfg['detector_type']).wait()
self.cfg_bit_depth.set(cfg['bit_depth']).wait()
self.cfg_pixel_height.set(cfg['image_pixel_height']).wait()
self.cfg_pixel_width.set(cfg['image_pixel_width']).wait()
self.cfg_nr_writers.set(cfg['number_of_writers']).wait()
return cfg
def set_daq_config(self, config):
def set_daq_config(self, config, settle_time=1):
"""Write a full configuration to the DAQ
"""
url = self.rest_url.get() + '/api/config/set'
@@ -426,6 +412,9 @@ class StdDaqClient(PSIDeviceBase):
)
if r.status_code != 200:
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
# Wait for service to restart (and connect to make sure)
sleep(settle_time)
self.connect()
return r.json()
def create_virtual_dataset(self):
@@ -433,14 +422,17 @@ class StdDaqClient(PSIDeviceBase):
h5 virtual dataset
"""
url = self.rest_url.get() + '/api/h5/create_interleaved_vds'
file_path = self.file_path.get()
r = requests.post(
url,
params = {'user': 'ioc'},
data = {'base_path': self.file_path, 'output_file': 'fede_virtual_test'},
data = {'base_path': file_path, 'output_file': 'fede_virtual_test'},
timeout = 2,
headers = {'Content-type': 'application/json'}
)
print(r)
print(file_path)
def nuke(self, restarttime=5):
""" Reconfigures the stdDAQ to restart the services. This causes
@@ -452,13 +444,11 @@ class StdDaqClient(PSIDeviceBase):
sleep(restarttime)
def state(self) -> str | None:
""" Querry the current system state"""
""" Querry the current system status"""
try:
logger.debug(f"[{self.name}] Connecting to {self.ws_url.get()}")
_wsclient = connect(self.ws_url.get())
msg = json.dumps({'command': 'status'})
_wsclient.send(msg)
r = _wsclient.recv(timeout=1)
wsclient = self.connect()
wsclient.send(json.dumps({'command': 'status'}))
r = wsclient.recv(timeout=1)
r = json.loads(r)
return r['status']
except ConnectionRefusedError:

View File

@@ -160,4 +160,4 @@ class Measurement:
### TODO: camera reset
print("Handing over to 'scans.acquire_dark")
scans.acquire_dark(exp_burst=nimages_dark, exp_time=self.exposure_time, exp_period=self.exposure_period, image_width=self.roix,
image_height=self.roiy, acq_mode=acq_mode, file_path=file_path, nr_writers=1)
image_height=self.roiy, acq_mode=acq_mode, file_path=file_path, nr_writers=2)