Added complete method to stdDAQ

This commit is contained in:
gac-x05la
2025-01-23 11:58:55 +01:00
parent 22e20edd71
commit 9e026388b5
2 changed files with 86 additions and 96 deletions
+85 -95
View File
@@ -12,7 +12,9 @@ from threading import Thread
import requests import requests
from ophyd import Device, Signal, Component, Kind, Staged from ophyd import Device, Signal, Component, Kind, Staged
from websockets.sync.client import connect from ophyd.status import SubscriptionStatus
from ophyd.flyers import FlyerInterface
from websockets.sync.client import connect, ClientConnection
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
@@ -82,6 +84,7 @@ class StdDaqMixin(CustomDeviceMixin):
def on_unstage(self): def on_unstage(self):
""" Stop a running acquisition and close connection """ Stop a running acquisition and close connection
""" """
print("Creating virtual dataset")
self.parent.create_virtual_dataset() self.parent.create_virtual_dataset()
self.parent.blueunstage() self.parent.blueunstage()
@@ -99,8 +102,8 @@ class StdDaqMixin(CustomDeviceMixin):
sleep(0.2) sleep(0.2)
for msg in self.parent._wsclient: for msg in self.parent._wsclient:
message = json.loads(msg) message = json.loads(msg)
self.parent.status.put(message["status"], force=True) self.parent.runstatus.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}") logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except (ConnectionClosedError, ConnectionClosedOK, AssertionError): except (ConnectionClosedError, ConnectionClosedOK, AssertionError):
# Libraty throws theese after connection is closed # Libraty throws theese after connection is closed
return return
@@ -114,11 +117,10 @@ class StdDaqMixin(CustomDeviceMixin):
class StdDaqClient(PSIDeviceBase): class StdDaqClient(PSIDeviceBase):
"""StdDaq API """StdDaq API
This class combines the new websocket and REST interfaces of the stdDAQ replaced the documented This class combines the new websocket and REST interfaces of the stdDAQ. The websocket
python client. The websocket interface starts and stops the acquisition and provides status, interface starts and stops the acquisition and provides status, while the REST interface
while the REST interface can read and write the JSON configuration file. can read and write the JSON configuration file. The stdDAQ needs to restart all services
to reconfigure with a new config, which might corrupt
The DAQ needs to restart all services to reconfigure with a new config, which might corrupt
the currently written files (fix is underway). the currently written files (fix is underway).
Example: Example:
@@ -132,12 +134,12 @@ class StdDaqClient(PSIDeviceBase):
_wsclient = None _wsclient = None
# Status attributes # Status attributes
ws_url = Component(Signal, kind=Kind.config) ws_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
status = Component(Signal, value="unknown", kind=Kind.normal) runstatus = Component(Signal, value="unknown", kind=Kind.normal, metadata={'write_access': False})
num_images = Component(Signal, value=10000, kind=Kind.config) num_images = Component(Signal, value=10000, kind=Kind.config)
file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config) file_path = Component(Signal, value="/gpfs/test/test-beamline", kind=Kind.config)
# Configuration attributes # Configuration attributes
rest_url = Component(Signal, kind=Kind.config) rest_url = Component(Signal, kind=Kind.config, metadata={'write_access': False})
cfg_detector_name = Component(Signal, kind=Kind.config) cfg_detector_name = Component(Signal, kind=Kind.config)
cfg_detector_type = Component(Signal, kind=Kind.config) cfg_detector_type = Component(Signal, kind=Kind.config)
cfg_bit_depth = Component(Signal, kind=Kind.config) cfg_bit_depth = Component(Signal, kind=Kind.config)
@@ -145,7 +147,6 @@ class StdDaqClient(PSIDeviceBase):
cfg_pixel_width = Component(Signal, kind=Kind.config) cfg_pixel_width = Component(Signal, kind=Kind.config)
cfg_nr_writers = Component(Signal, kind=Kind.config) cfg_nr_writers = Component(Signal, kind=Kind.config)
def __init__( def __init__(
self, self,
prefix="", prefix="",
@@ -156,27 +157,23 @@ class StdDaqClient(PSIDeviceBase):
configuration_attrs=None, configuration_attrs=None,
parent=None, parent=None,
device_manager=None, device_manager=None,
sim_mode=False,
ws_url: str = "ws://localhost:8080", ws_url: str = "ws://localhost:8080",
rest_url: str = "http://localhost:5000", rest_url: str = "http://localhost:5000",
data_source_name = None, data_source_name = None,
**kwargs, **kwargs,
) -> None: ) -> None:
super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs) super().__init__(prefix=prefix, name=name, kind=kind, read_attrs=read_attrs, configuration_attrs=configuration_attrs, parent=parent, device_manager=device_manager, **kwargs)
self.status._metadata["write_access"] = False
self.ws_url._metadata["write_access"] = False
self.ws_url.set(ws_url, force=True).wait() self.ws_url.set(ws_url, force=True).wait()
self.rest_url._metadata["write_access"] = False
self.rest_url.set(rest_url, force=True).wait() self.rest_url.set(rest_url, force=True).wait()
self.data_source_name = data_source_name self.data_source_name = data_source_name
# Connect to the DAQ and initialize values # Connect to the DAQ and initialize values
try: try:
self.read_daq_config() self.get_daq_config(update=True)
except Exception as ex: except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}") logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}")
def connect(self): def connect(self) -> ClientConnection:
"""Connect to the StdDAQ's websockets interface """Connect to the StdDAQ's websockets interface
StdDAQ may reject connection for a few seconds after restart, or when StdDAQ may reject connection for a few seconds after restart, or when
@@ -185,16 +182,14 @@ class StdDaqClient(PSIDeviceBase):
num_retry = 0 num_retry = 0
while num_retry < 5: while num_retry < 5:
try: try:
logger.debug(f"[{self.name}] Connecting to {self.ws_url.get()}") logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
self._wsclient = connect(self.ws_url.get()) connection = connect(self.ws_url.get())
break logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries")
return connection
except ConnectionRefusedError: except ConnectionRefusedError:
num_retry += 1 num_retry += 1
sleep(3) sleep(2)
if num_retry == 5: raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.")
raise ConnectionRefusedError(
"The stdDAQ websocket interface refused connection 5 times.")
logger.debug(f"[{self.name}] Connected to DAQ after {num_retry} tries")
def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str: def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str:
"""Send a message to the StdDAQ and receive a reply """Send a message to the StdDAQ and receive a reply
@@ -207,7 +202,7 @@ class StdDaqClient(PSIDeviceBase):
# Connect if client was destroyed # Connect if client was destroyed
if self._wsclient is None: if self._wsclient is None:
self.connect() self._wsclient = self.connect()
# Send message (reopen connection if needed) # Send message (reopen connection if needed)
msg = json.dumps(message) if isinstance(message, dict) else str(message) msg = json.dumps(message) if isinstance(message, dict) else str(message)
@@ -215,7 +210,7 @@ class StdDaqClient(PSIDeviceBase):
self._wsclient.send(msg) self._wsclient.send(msg)
except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex: except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex:
# Re-connect if the connection was closed # Re-connect if the connection was closed
self.connect() self._wsclient = self.connect()
self._wsclient.send(msg) self._wsclient.send(msg)
# Wait for reply # Wait for reply
@@ -234,49 +229,36 @@ class StdDaqClient(PSIDeviceBase):
def configure(self, d: dict = None): def configure(self, d: dict = None):
"""Configure the next scan with the stdDAQ """Configure the next scan with the stdDAQ
Parameters as 'd' dictionary Parameters as 'd' dictionary, the default is unchanged.
---------------------------- ----------------------------
num_points_total : int, optional num_points_total : int, optional
Number of images to be taken during each scan. Set to -1 for an Number of images to be taken during each scan. Set to -1 for an unlimited number of
unlimited number of images (limited by the ringbuffer size and images (limited by the ringbuffer size and backend speed).
backend speed). (default = 10) file_path: str, optional
exposure : float, optional File path to save the data, usually GPFS.
Exposure time [ms]. (default = 0.2) image_width : int, optional
period : float, optional ROI size in the x-direction [pixels].
Exposure period [ms], ignored in soft trigger mode. (default = 1.0) image_height : int, optional
pixel_width : int, optional ROI size in the y-direction [pixels].
ROI size in the x-direction [pixels] (default = 2016) bit_depth: int, optional
pixel_height : int, optional Image bit depth for cameras that can change it [int].
ROI size in the y-direction [pixels] (default = 2016)
scanid : int, optional
Scan identification number to be associated with the scan data
(default = 0)
trigger_mode : str, optional
Trigger mode of the gifafrost
(default = unchanged)
nr_writers: int, optional nr_writers: int, optional
Number of writers Number of writers [int].
(default = unchanged)
correction_mode : int, optional
The correction to be applied to the imaging data. The following
modes are available (default = 5):
* 0: Bypass. No corrections are applied to the data.
* 1: Send correction factor A instead of pixel values
* 2: Send correction factor B instead of pixel values
* 3: Send correction factor C instead of pixel values
* 4: Invert pixel values, but do not apply any linearity correction
* 5: Apply the full linearity correction
""" """
# Configuration parameters
if 'image_width' in d and d['image_width']!=None: if 'image_width' in d and d['image_width']!=None:
self.cfg_pixel_width.set(d['image_width']).wait() self.cfg_pixel_width.set(d['image_width']).wait()
if 'image_height' in d and d['image_height']!=None: if 'image_height' in d and d['image_height']!=None:
self.cfg_pixel_height.set(d['image_height']).wait() self.cfg_pixel_height.set(d['image_height']).wait()
if 'bit_depth' in d:
self.cfg_bit_depth.set(d['bit_depth']).wait()
if 'nr_writers' in d and d['nr_writers']!=None:
self.cfg_nr_writers.set(d['nr_writers']).wait()
# Run parameters
if 'num_points_total' in d: if 'num_points_total' in d:
self.num_images.set(d['num_points_total']).wait() self.num_images.set(d['num_points_total']).wait()
if 'nr_writers' in d and d['nr_writers']!=None: if 'file_path' in d and d['file_path']!=None:
print('Setting number of writers ' + str(d['nr_writers'])) self.file_path.set(d['file_path']).wait()
self.cfg_nr_writers.set(d['nr_writers']).wait()
# Restart the DAQ if resolution changed # Restart the DAQ if resolution changed
cfg = self.get_daq_config() cfg = self.get_daq_config()
@@ -286,23 +268,17 @@ class StdDaqClient(PSIDeviceBase):
cfg['number_of_writers'] != self.cfg_nr_writers.get(): cfg['number_of_writers'] != self.cfg_nr_writers.get():
# Stop if current status is not idle # Stop if current status is not idle
status = self.state()
if self.state() != "idle": if self.state() != "idle":
raise RuntimeWarning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") raise RuntimeWarning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
# Stop running acquisition
try:
self.stop()
except RuntimeError:
raise RuntimeWarning(f"[{self.name}] Told ya!")
# Update retrieved config # Update retrieved config
cfg['image_pixel_height'] = int(self.cfg_pixel_height.get()) cfg['image_pixel_height'] = int(self.cfg_pixel_height.get())
cfg['image_pixel_width'] = int(self.cfg_pixel_width.get()) cfg['image_pixel_width'] = int(self.cfg_pixel_width.get())
cfg['bit_depth'] = int(self.cfg_bit_depth.get()) cfg['bit_depth'] = int(self.cfg_bit_depth.get())
cfg['number_of_writers'] = int(self.cfg_nr_writers.get()) cfg['number_of_writers'] = int(self.cfg_nr_writers.get())
r = self.set_daq_config(cfg) self.set_daq_config(cfg)
cfg=self.read_daq_config() sleep(1)
self.get_daq_config(update=True)
def bluestage(self): def bluestage(self):
""" Stages the stdDAQ """ Stages the stdDAQ
@@ -312,8 +288,10 @@ class StdDaqClient(PSIDeviceBase):
it for obvious failures. it for obvious failures.
""" """
# Can't stage into a running exposure # Can't stage into a running exposure
print('Before')
if self.state() != 'idle': if self.state() != 'idle':
raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}") raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}")
print('After')
# Must make sure that image size matches the data source # Must make sure that image size matches the data source
if self.data_source_name is not None: if self.data_source_name is not None:
@@ -332,13 +310,13 @@ class StdDaqClient(PSIDeviceBase):
num_images = self.num_images.get() num_images = self.num_images.get()
# New connection # New connection
self.connect() self._wsclient = self.connect()
message = {"command": "start", "path": file_path, "n_image": num_images, } message = {"command": "start", "path": file_path, "n_image": num_images, }
reply = self.message(message) reply = self.message(message)
if reply is not None: if reply is not None:
reply = json.loads(reply) reply = json.loads(reply)
self.status.set(reply["status"], force=True).wait() self.runstatus.set(reply["status"], force=True).wait()
logger.info(f"[{self.name}] Start DAQ reply: {reply}") logger.info(f"[{self.name}] Start DAQ reply: {reply}")
# Give it more time to reconfigure # Give it more time to reconfigure
@@ -365,20 +343,20 @@ class StdDaqClient(PSIDeviceBase):
ii = 0 ii = 0
while ii<10: while ii<10:
# Stop the DAQ (will close connection) - reply is always "success" # Stop the DAQ (will close connection) - reply is always "success"
self.connect() self._wsclient = self.connect()
self.message({"command": "stop_all"}, wait_reply=False) self.message({"command": "stop_all"}, wait_reply=False)
# Let it consolidate # Let it consolidate
sleep(0.2) sleep(0.2)
# Check final status (from new connection) # Check final status (from new connection)
self.connect() self._wsclient = self.connect()
reply = self.message({"command": "status"}) reply = self.message({"command": "status"})
if reply is not None: if reply is not None:
logger.info(f"[{self.name}] DAQ status reply: {reply}") logger.info(f"[{self.name}] DAQ status reply: {reply}")
reply = json.loads(reply) reply = json.loads(reply)
if reply["status"] in ("idle"): if reply["status"] in ("idle", "error"):
# Only 'idle' state accepted # Only 'idle' state accepted
print(f"DAQ stopped on try {ii}") print(f"DAQ stopped on try {ii}")
return return
@@ -390,7 +368,18 @@ class StdDaqClient(PSIDeviceBase):
ii += 1 ii += 1
raise RuntimeError(f"Failed to stop StdDAQ in time") raise RuntimeError(f"Failed to stop StdDAQ in time")
def get_daq_config(self) -> dict: ##########################################################################
# Bluesky flyer interface
def complete(self) -> SubscriptionStatus:
"""Wait for current run. Must end in status 'file_saved'."""
def is_running(*args, value, timestamp, **kwargs):
result = value in ["idle", "file_saved", "error"]
return result
status = SubscriptionStatus(self.runstatus, is_running, settle_time=0.5)
return status
def get_daq_config(self, update=False) -> dict:
"""Read the current configuration from the DAQ """Read the current configuration from the DAQ
""" """
r = requests.get( r = requests.get(
@@ -399,21 +388,18 @@ class StdDaqClient(PSIDeviceBase):
timeout=2) timeout=2)
if r.status_code != 200: if r.status_code != 200:
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
return r.json() cfg = r.json()
def read_daq_config(self) -> dict: if update:
"""Read the current configuration from the DAQ and update the ophyd device self.cfg_detector_name.set(cfg['detector_name']).wait()
""" self.cfg_detector_type.set(cfg['detector_type']).wait()
cfg = self.get_daq_config() self.cfg_bit_depth.set(cfg['bit_depth']).wait()
self.cfg_detector_name.set(cfg['detector_name']).wait() self.cfg_pixel_height.set(cfg['image_pixel_height']).wait()
self.cfg_detector_type.set(cfg['detector_type']).wait() self.cfg_pixel_width.set(cfg['image_pixel_width']).wait()
self.cfg_bit_depth.set(cfg['bit_depth']).wait() self.cfg_nr_writers.set(cfg['number_of_writers']).wait()
self.cfg_pixel_height.set(cfg['image_pixel_height']).wait()
self.cfg_pixel_width.set(cfg['image_pixel_width']).wait()
self.cfg_nr_writers.set(cfg['number_of_writers']).wait()
return cfg return cfg
def set_daq_config(self, config): def set_daq_config(self, config, settle_time=1):
"""Write a full configuration to the DAQ """Write a full configuration to the DAQ
""" """
url = self.rest_url.get() + '/api/config/set' url = self.rest_url.get() + '/api/config/set'
@@ -426,6 +412,9 @@ class StdDaqClient(PSIDeviceBase):
) )
if r.status_code != 200: if r.status_code != 200:
raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}") raise ConnectionError(f"[{self.name}] Error {r.status_code}:\t{r.text}")
# Wait for service to restart (and connect to make sure)
sleep(settle_time)
self.connect()
return r.json() return r.json()
def create_virtual_dataset(self): def create_virtual_dataset(self):
@@ -433,14 +422,17 @@ class StdDaqClient(PSIDeviceBase):
h5 virtual dataset h5 virtual dataset
""" """
url = self.rest_url.get() + '/api/h5/create_interleaved_vds' url = self.rest_url.get() + '/api/h5/create_interleaved_vds'
file_path = self.file_path.get()
r = requests.post( r = requests.post(
url, url,
params = {'user': 'ioc'}, params = {'user': 'ioc'},
data = {'base_path': self.file_path, 'output_file': 'fede_virtual_test'}, data = {'base_path': file_path, 'output_file': 'fede_virtual_test'},
timeout = 2, timeout = 2,
headers = {'Content-type': 'application/json'} headers = {'Content-type': 'application/json'}
) )
print(r)
print(file_path)
def nuke(self, restarttime=5): def nuke(self, restarttime=5):
""" Reconfigures the stdDAQ to restart the services. This causes """ Reconfigures the stdDAQ to restart the services. This causes
@@ -452,13 +444,11 @@ class StdDaqClient(PSIDeviceBase):
sleep(restarttime) sleep(restarttime)
def state(self) -> str | None: def state(self) -> str | None:
""" Querry the current system state""" """ Querry the current system status"""
try: try:
logger.debug(f"[{self.name}] Connecting to {self.ws_url.get()}") wsclient = self.connect()
_wsclient = connect(self.ws_url.get()) wsclient.send(json.dumps({'command': 'status'}))
msg = json.dumps({'command': 'status'}) r = wsclient.recv(timeout=1)
_wsclient.send(msg)
r = _wsclient.recv(timeout=1)
r = json.loads(r) r = json.loads(r)
return r['status'] return r['status']
except ConnectionRefusedError: except ConnectionRefusedError:
+1 -1
View File
@@ -160,4 +160,4 @@ class Measurement:
### TODO: camera reset ### TODO: camera reset
print("Handing over to 'scans.acquire_dark") print("Handing over to 'scans.acquire_dark")
scans.acquire_dark(exp_burst=nimages_dark, exp_time=self.exposure_time, exp_period=self.exposure_period, image_width=self.roix, scans.acquire_dark(exp_burst=nimages_dark, exp_time=self.exposure_time, exp_period=self.exposure_period, image_width=self.roix,
image_height=self.roiy, acq_mode=acq_mode, file_path=file_path, nr_writers=1) image_height=self.roiy, acq_mode=acq_mode, file_path=file_path, nr_writers=2)