This commit is contained in:
gac-x05la
2025-04-16 15:01:06 +02:00
parent bea5f95503
commit 4437bb13b8
5 changed files with 406 additions and 337 deletions

View File

@@ -7,78 +7,17 @@ synchronized output (PSO) interface.
""" """
from time import sleep from time import sleep
import numpy as np import numpy as np
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus from ophyd.status import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from bec_lib import bec_logger from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
class AerotechPsoDistanceMixin(CustomDeviceMixin): class aa1AxisPsoBase(PSIDeviceBase, Device):
"""Mixin class for self-configuration and staging
"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.info(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Stage the PSO distance module
self.parent.bluestage()
def on_unstage(self):
"""Standard bluesky unstage"""
# Ensure output is set to low
# if self.parent.output.value:
# self.parent.toggle()
# Turn off window mode
self.parent.winOutput.set("Off").wait()
self.parent.winEvents.set("Off").wait()
# Turn off distance mode
self.parent.dstEventsEna.set("Off").wait()
self.parent.dstCounterEna.set("Off").wait()
# Disable output
self.parent.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)
def on_trigger(self) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
logger.warning(f"[{self.parent.name}] Triggerin...")
if self.parent.dstDistanceVal.get() == 0:
status = self.parent._eventSingle.set(1, settle_time=0.1)
return status
class aa1AxisPsoBase(PSIDeviceBase):
"""Position Sensitive Output - Base class """Position Sensitive Output - Base class
This class provides convenience wrappers around the Aerotech IOC's PSO This class provides convenience wrappers around the Aerotech IOC's PSO
@@ -152,8 +91,34 @@ class aa1AxisPsoBase(PSIDeviceBase):
outPin = Component(EpicsSignalRO, "PIN", auto_monitor=True, kind=Kind.config) outPin = Component(EpicsSignalRO, "PIN", auto_monitor=True, kind=Kind.config)
outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.config) outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.config)
def trigger(self, settle_time=0.1) -> DeviceStatus: def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# Need to call super() to call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def fire(self, settle_time=0.1) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)""" """Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
logger.warning(f"[{self.name}] Triggerin...")
self._eventSingle.set(1, settle_time=settle_time).wait() self._eventSingle.set(1, settle_time=settle_time).wait()
status = DeviceStatus(self) status = DeviceStatus(self)
status.set_finished() status.set_finished()
@@ -163,7 +128,7 @@ class aa1AxisPsoBase(PSIDeviceBase):
"""Toggle waveform outup""" """Toggle waveform outup"""
orig_wave_mode = self.waveMode.get() orig_wave_mode = self.waveMode.get()
self.waveMode.set("Toggle").wait() self.waveMode.set("Toggle").wait()
self.trigger(0.1) self.fire(0.1)
self.waveMode.set(orig_wave_mode).wait() self.waveMode.set(orig_wave_mode).wait()
def configure(self, d: dict): def configure(self, d: dict):
@@ -232,13 +197,12 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
``` ```
""" """
custom_prepare_cls = AerotechPsoDistanceMixin
USER_ACCESS = ["configure", "prepare", "toggle"] USER_ACCESS = ["configure", "prepare", "toggle"]
_distance_value = None _distance_value = None
# ######################################################################## # ########################################################################
# PSO high level interface # PSO high level interface
def configure(self, d: dict = {}) -> tuple: def configure(self, d: dict = None) -> tuple:
"""Simplified configuration interface to access the most common """Simplified configuration interface to access the most common
functionality for distance mode PSO. functionality for distance mode PSO.
@@ -282,7 +246,49 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode") logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode")
return (old, new) return (old, new)
def bluestage(self) -> None: def on_stage(self) -> None:
"""Configuration and staging
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
# Perform bluesky-style configuration
if d:
# logger.info(f"[{self.name}] Configuring with:\n{d}")
self.configure(d=d)
# Stage the PSO distance module
self.arm()
def on_unstage(self):
"""Turn off the PSO module"""
self.disarm()
def on_trigger(self, settle_time=0.1) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
# if self.dstDistanceVal.get() == 0:
logger.warning(f"[{self.name}] Triggerin...")
return self.fire(settle_time)
def arm(self) -> None:
"""Bluesky style stage""" """Bluesky style stage"""
# Stage the PSO distance module and zero counter # Stage the PSO distance module and zero counter
if isinstance(self._distance_value, (np.ndarray, list, tuple)): if isinstance(self._distance_value, (np.ndarray, list, tuple)):
@@ -293,3 +299,19 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
if self.dstDistanceVal.get() > 0: if self.dstDistanceVal.get() > 0:
self.dstEventsEna.set("On").wait() self.dstEventsEna.set("On").wait()
self.dstCounterEna.set("On").wait() self.dstCounterEna.set("On").wait()
def disarm(self):
"""Standard bluesky unstage"""
# Ensure output is set to low
# if self.output.value:
# self.toggle()
# Turn off window mode
self.winOutput.set("Off").wait()
self.winEvents.set("Off").wait()
# Turn off distance mode
self.dstEventsEna.set("Off").wait()
self.dstCounterEna.set("Off").wait()
# Disable output
self.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)

View File

@@ -5,71 +5,17 @@ interface.
@author: mohacsi_i @author: mohacsi_i
""" """
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from bec_lib import bec_logger from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
class AerotechTasksMixin(CustomDeviceMixin): class aa1Tasks(PSIDeviceBase, Device):
"""Mixin class for self-configuration and staging"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if self.parent.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# FIXME: The above should be exchanged with:
# d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config")
# Perform bluesky-style configuration
if d:
# logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# The actual staging
self.parent.arm()
def on_unstage(self):
"""Stop the currently selected task"""
self.parent.disarm()
def on_stop(self):
"""Stop the currently selected task"""
self.on_unstage()
def on_kickoff(self):
"""Start execution of the selected task"""
self.parent.launch()
class aa1Tasks(PSIDeviceBase):
"""Task management API """Task management API
The place to manage tasks and AeroScript user files on the controller. The place to manage tasks and AeroScript user files on the controller.
@@ -99,8 +45,8 @@ class aa1Tasks(PSIDeviceBase):
''' '''
""" """
USER_ACCESS = ["arm", "disarm", "launch", "kickoff"] USER_ACCESS = ["arm", "disarm", "launch", "kickoff"]
custom_prepare_cls = AerotechTasksMixin
_failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal) _failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal)
errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.normal) errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.normal)
@@ -117,6 +63,30 @@ class aa1Tasks(PSIDeviceBase):
EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True
) )
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# Need to call super() to call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def configure(self, d: dict) -> tuple: def configure(self, d: dict) -> tuple:
"""Configuration interface for flying""" """Configuration interface for flying"""
# Common operations # Common operations
@@ -141,6 +111,52 @@ class aa1Tasks(PSIDeviceBase):
new = self.read_configuration() new = self.read_configuration()
return (old, new) return (old, new)
def on_stage(self) -> None:
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if self.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# FIXME: The above should be exchanged with:
# d = self.scan_info.scan_msg.scan_parameters.get("aerotech_config")
# Perform bluesky-style configuration
if d:
self.configure(d=d)
# The actual staging
self.arm()
def on_unstage(self):
"""Stop the currently selected task"""
self.disarm()
def on_stop(self):
"""Stop the currently selected task"""
self.unstage()
def on_kickoff(self):
"""Start execution of the selected task"""
self.launch()
def arm(self) -> None: def arm(self) -> None:
"""Bluesky style stage, prepare, but does not execute""" """Bluesky style stage, prepare, but does not execute"""
if self.taskIndex.get() in (0, 1, 2): if self.taskIndex.get() in (0, 1, 2):
@@ -165,12 +181,6 @@ class aa1Tasks(PSIDeviceBase):
raise RuntimeError("Failed to load task, please check the Aerotech IOC") raise RuntimeError("Failed to load task, please check the Aerotech IOC")
return status return status
def kickoff(self):
"""Missing kickoff, for real?"""
self.custom_prepare.on_kickoff()
##########################################################################
# Bluesky flyer interface
def complete(self) -> SubscriptionStatus: def complete(self) -> SubscriptionStatus:
"""Wait for a RUNNING task""" """Wait for a RUNNING task"""
timestamp_ = 0 timestamp_ = 0

View File

@@ -189,13 +189,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
if daq_update: if daq_update:
self.backend.set_config(daq_update, force=False) self.backend.set_config(daq_update, force=False)
def disarm(self):
""" Bluesky style unstage"""
# Switch to idle
self.cmdStartCamera.set(0).wait()
if self.autoSoftEnable.get():
self.cmdSoftEnable.set(0).wait()
def set_acquisition_mode(self, acq_mode): def set_acquisition_mode(self, acq_mode):
"""Set acquisition mode """Set acquisition mode
@@ -217,7 +210,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
self.enable_mode = "soft" self.enable_mode = "soft"
self.trigger_mode = "auto" self.trigger_mode = "auto"
self.exposure_mode = "timer" self.exposure_mode = "timer"
elif acq_mode in ["ext_enable", "external_enable"]: elif acq_mode in ["ext_enable", "external_enable"]:
# NOTE: Trigger using external hardware events via enable input (actually works) # NOTE: Trigger using external hardware events via enable input (actually works)
# Switch to physical enable signal # Switch to physical enable signal
@@ -228,7 +220,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
self.enable_mode = "external" self.enable_mode = "external"
self.trigger_mode = "auto" self.trigger_mode = "auto"
self.exposure_mode = "timer" self.exposure_mode = "timer"
elif acq_mode == "soft": elif acq_mode == "soft":
# NOTE: Fede's configuration for continous streaming # NOTE: Fede's configuration for continous streaming
# Switch to physical enable signal # Switch to physical enable signal

View File

@@ -13,7 +13,7 @@ import requests
import os import os
from ophyd import Signal, Component, Kind from ophyd import Signal, Component, Kind
from ophyd.status import SubscriptionStatus, Status from ophyd.status import SubscriptionStatus
from websockets.sync.client import connect, ClientConnection from websockets.sync.client import connect, ClientConnection
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
@@ -28,6 +28,8 @@ logger = bec_logger.logger
class StdDaqMixin(CustomDeviceMixin): class StdDaqMixin(CustomDeviceMixin):
# pylint: disable=protected-access # pylint: disable=protected-access
_mon = None
def on_stage(self) -> None: def on_stage(self) -> None:
"""Configuration and staging """Configuration and staging
@@ -39,9 +41,6 @@ class StdDaqMixin(CustomDeviceMixin):
# Fish out our configuration from scaninfo (via explicit or generic addressing) # Fish out our configuration from scaninfo (via explicit or generic addressing)
# NOTE: Scans don't have to fully configure the device # NOTE: Scans don't have to fully configure the device
d = {} d = {}
# scan_parameters = self.parent.scaninfo.scan_msg.scan_parameters
# std_daq_params = scan_parameters.get("std_daq_params")
if "kwargs" in self.parent.scaninfo.scan_msg.info: if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"] scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if "image_width" in scanargs and scanargs["image_width"] is not None: if "image_width" in scanargs and scanargs["image_width"] is not None:
@@ -50,10 +49,6 @@ class StdDaqMixin(CustomDeviceMixin):
d["image_height"] = scanargs["image_height"] d["image_height"] = scanargs["image_height"]
if "nr_writers" in scanargs and scanargs["nr_writers"] is not None: if "nr_writers" in scanargs and scanargs["nr_writers"] is not None:
d["nr_writers"] = scanargs["nr_writers"] d["nr_writers"] = scanargs["nr_writers"]
if "system_config" in scanargs and scanargs["system_config"] is not None:
if scanargs["system_config"]["file_directory"]:
file_directory = scanargs["system_config"]["file_directory"]
### to be used in the future to substitute the procedure using file path
if "file_path" in scanargs and scanargs["file_path"] is not None: if "file_path" in scanargs and scanargs["file_path"] is not None:
self.parent.file_path.set(scanargs["file_path"].replace("data", "gpfs")).wait() self.parent.file_path.set(scanargs["file_path"].replace("data", "gpfs")).wait()
print(scanargs["file_path"]) print(scanargs["file_path"])
@@ -98,20 +93,40 @@ class StdDaqMixin(CustomDeviceMixin):
sleep(0.5) sleep(0.5)
# Try to start a new run (reconnects) # Try to start a new run (reconnects)
self.parent.arm() self.parent.bluestage()
# And start status monitoring
self._mon = Thread(target=self.monitor, daemon=True)
self._mon.start()
def on_unstage(self): def on_unstage(self):
"""Stop a running acquisition and close connection""" """Stop a running acquisition and close connection"""
print("Creating virtual dataset") print("Creating virtual dataset")
self.parent.create_virtual_dataset() self.parent.create_virtual_dataset()
self.parent.disarm() self.parent.blueunstage()
def on_stop(self): def on_stop(self):
"""Stop a running acquisition and close connection""" """Stop a running acquisition and close connection"""
self.parent.disarm() self.parent.blueunstage()
def monitor(self) -> None:
"""Monitor status messages while connection is open. This will block the reply monitoring
to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing
acquisition means StdDAQ will close connection, so there's no idle state polling.
"""
try:
sleep(0.2)
for msg in self.parent._wsclient:
message = json.loads(msg)
self.parent.runstatus.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except (ConnectionClosedError, ConnectionClosedOK, AssertionError):
# Libraty throws theese after connection is closed
return
except Exception as ex:
logger.warning(f"[{self.parent.name}] Exception in polling: {ex}")
return
finally:
self._mon = None
class StdDaqClient(PSIDeviceBase): class StdDaqClient(PSIDeviceBase):
@@ -137,9 +152,9 @@ class StdDaqClient(PSIDeviceBase):
"nuke", "nuke",
"connect", "connect",
"message", "message",
"arm", "state",
"disarm", "bluestage",
"complete", "blueunstage",
] ]
_wsclient = None _wsclient = None
@@ -153,14 +168,12 @@ class StdDaqClient(PSIDeviceBase):
file_prefix = Component(Signal, value="file", kind=Kind.config) file_prefix = Component(Signal, value="file", kind=Kind.config)
# Configuration attributes # Configuration attributes
rest_url = Component(Signal, kind=Kind.config, metadata={"write_access": False}) rest_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
datasource = Component(Signal, kind=Kind.config, metadata={"write_access": False})
cfg_detector_name = Component(Signal, kind=Kind.config) cfg_detector_name = Component(Signal, kind=Kind.config)
cfg_detector_type = Component(Signal, kind=Kind.config) cfg_detector_type = Component(Signal, kind=Kind.config)
cfg_bit_depth = Component(Signal, kind=Kind.config) cfg_bit_depth = Component(Signal, kind=Kind.config)
cfg_pixel_height = Component(Signal, kind=Kind.config) cfg_pixel_height = Component(Signal, kind=Kind.config)
cfg_pixel_width = Component(Signal, kind=Kind.config) cfg_pixel_width = Component(Signal, kind=Kind.config)
cfg_nr_writers = Component(Signal, kind=Kind.config) cfg_nr_writers = Component(Signal, kind=Kind.config)
_mon = None
def __init__( def __init__(
self, self,
@@ -174,7 +187,7 @@ class StdDaqClient(PSIDeviceBase):
device_manager=None, device_manager=None,
ws_url: str = "ws://localhost:8080", ws_url: str = "ws://localhost:8080",
rest_url: str = "http://localhost:5000", rest_url: str = "http://localhost:5000",
data_source_name="", data_source_name=None,
**kwargs, **kwargs,
) -> None: ) -> None:
super().__init__( super().__init__(
@@ -189,66 +202,67 @@ class StdDaqClient(PSIDeviceBase):
) )
self.ws_url.set(ws_url, force=True).wait() self.ws_url.set(ws_url, force=True).wait()
self.rest_url.set(rest_url, force=True).wait() self.rest_url.set(rest_url, force=True).wait()
self.datasource.set(data_source_name, force=True).wait() self.data_source_name = data_source_name
# Connect to the DAQ and initialize values # Connect to the DAQ and initialize values
try: try:
self.get_daq_config(update=True) self.get_daq_config(update=True)
except Exception as ex: except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}") logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}")
# Connect to websockets and start poller
try:
self.connect()
except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ websocket interface\n{ex}")
def connect(self) -> None: def connect(self) -> ClientConnection:
"""Connect to the StdDAQ's websockets interface """Connect to the StdDAQ's websockets interface
StdDAQ may reject connection for a few seconds after restart, or when StdDAQ may reject connection for a few seconds after restart, or when
it wants so if it fails, wait a bit and try to connect again. it wants so if it fails, wait a bit and try to connect again.
""" """
# Connect to stdDAQ num_retry = 0
logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}") while num_retry < 5:
self._wsclient = connect(self.ws_url.get()) try:
# And start status monitoring logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
self._mon = Thread(target=self.monitor, daemon=True) connection = connect(self.ws_url.get())
self._mon.start() logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries")
return connection
except ConnectionRefusedError:
num_retry += 1
sleep(2)
raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.")
def monitor(self) -> None: def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str:
"""Monitor status messages while connection is open. Status updates are
sent every 1 seconds, or when there's a transition.
"""
try:
for msg in self._wsclient:
message = json.loads(msg)
self.runstatus.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except Exception as ex:
logger.warning(f"[{self.name}] Exception in polling: {ex}")
return
finally:
self._mon = None
def message(self, message: dict) -> None:
"""Send a message to the StdDAQ and receive a reply """Send a message to the StdDAQ and receive a reply
Note: finishing acquisition means StdDAQ will close connection, so
there's no idle state polling.
""" """
# Prepare message # Prepare message
msg = json.dumps(message) if isinstance(message, dict) else str(message) msg = json.dumps(message) if isinstance(message, dict) else str(message)
# Connect if client was destroyed # Connect if client was destroyed
if self._wsclient is None: if self._wsclient is None:
self.connect() self._wsclient = self.connect()
# Send message (reopen connection if needed) # Send message (reopen connection if needed)
msg = json.dumps(message) if isinstance(message, dict) else str(message) msg = json.dumps(message) if isinstance(message, dict) else str(message)
try: try:
self._wsclient.send(msg) self._wsclient.send(msg)
except (ConnectionClosedError, ConnectionClosedOK, AttributeError): except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex:
# Re-connect if the connection was closed # Re-connect if the connection was closed
self.connect() self._wsclient = self.connect()
self._wsclient.send(msg) self._wsclient.send(msg)
# Wait for reply
reply = None
if wait_reply:
try:
reply = self._wsclient.recv(timeout)
return reply
except (ConnectionClosedError, ConnectionClosedOK) as ex:
self._wsclient = None
logger.error(f"[{self.name}] WS connection was closed before reply: {ex}")
except (TimeoutError, RuntimeError) as ex:
logger.error(f"[{self.name}] Error in receiving ws reply: {ex}")
return reply
def configure(self, d: dict = None): def configure(self, d: dict = None):
"""Configure the next scan with the stdDAQ """Configure the next scan with the stdDAQ
@@ -259,8 +273,6 @@ class StdDaqClient(PSIDeviceBase):
images (limited by the ringbuffer size and backend speed). images (limited by the ringbuffer size and backend speed).
file_path: str, optional file_path: str, optional
File path to save the data, usually GPFS. File path to save the data, usually GPFS.
file_prefix: str, optional
File prefix to save the data [default = file].
image_width : int, optional image_width : int, optional
ROI size in the x-direction [pixels]. ROI size in the x-direction [pixels].
image_height : int, optional image_height : int, optional
@@ -283,10 +295,6 @@ class StdDaqClient(PSIDeviceBase):
# Run parameters # Run parameters
if "num_points_total" in d: if "num_points_total" in d:
self.num_images.set(d["num_points_total"]).wait() self.num_images.set(d["num_points_total"]).wait()
if "file_path" in d:
self.file_path.set(d["file_path"]).wait()
if "file_prefix" in d:
self.file_prefix.set(d["file_prefix"]).wait()
# Restart the DAQ if resolution changed # Restart the DAQ if resolution changed
cfg = self.get_daq_config() cfg = self.get_daq_config()
@@ -298,8 +306,8 @@ class StdDaqClient(PSIDeviceBase):
): ):
# Stop if current status is not idle # Stop if current status is not idle
# if self.runstatus.get() != "idle": if self.state() != "idle":
# logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
# Update retrieved config # Update retrieved config
cfg["image_pixel_height"] = int(self.cfg_pixel_height.get()) cfg["image_pixel_height"] = int(self.cfg_pixel_height.get())
@@ -310,7 +318,7 @@ class StdDaqClient(PSIDeviceBase):
sleep(1) sleep(1)
self.get_daq_config(update=True) self.get_daq_config(update=True)
def arm(self): def bluestage(self):
"""Stages the stdDAQ """Stages the stdDAQ
Opens a new connection to the stdDAQ, sends the start command with Opens a new connection to the stdDAQ, sends the start command with
@@ -318,58 +326,103 @@ class StdDaqClient(PSIDeviceBase):
it for obvious failures. it for obvious failures.
""" """
# Can't stage into a running exposure # Can't stage into a running exposure
# if self.runstatus.get() != "idle": if self.state() != "idle":
# raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.runstatus.get()}") raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}")
# Ensure expected shape # Must make sure that image size matches the data source
self.validate() if self.data_source_name is not None:
cam_img_w = self.device_manager.devices[self.data_source_name].cfgRoiX.get()
cam_img_h = self.device_manager.devices[self.data_source_name].cfgRoiY.get()
daq_img_w = self.cfg_pixel_width.get()
daq_img_h = self.cfg_pixel_height.get()
if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h):
raise RuntimeError(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})"
)
else:
logger.warning(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})"
)
file_path = self.file_path.get() file_path = self.file_path.get()
file_prefix = self.file_prefix.get()
num_images = self.num_images.get() num_images = self.num_images.get()
file_prefix = self.file_prefix.get()
print(file_prefix)
# New connection # New connection
self._wsclient = self.connect()
message = { message = {
"command": "start", "command": "start",
"path": file_path, "path": file_path,
"file_prefix": file_prefix, "file_prefix": file_prefix,
"n_image": num_images, "n_image": num_images,
} }
self.message(message) reply = self.message(message)
def is_running(*args, value, timestamp, **kwargs): if reply is not None:
result = value not in ["idle", "unknown", "error"] reply = json.loads(reply)
return result self.runstatus.set(reply["status"], force=True).wait()
logger.info(f"[{self.name}] Start DAQ reply: {reply}")
status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5) # Give it more time to reconfigure
status.wait() if reply["status"] in ("rejected"):
# FIXME: running exposure is a nogo
if reply["reason"] == "driver is busy!":
raise RuntimeError(
f"[{self.name}] Start stdDAQ command rejected: already running"
)
else:
# Give it more time to consolidate
sleep(1)
else:
# Success!!!
print(f"[{self.name}] Started stdDAQ in: {reply['status']}")
return
logger.warning(f"[{self.name}] Started stdDAQ in: {self.runstatus.get()}") raise RuntimeError(
return status f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}"
)
def disarm(self): def blueunstage(self):
"""Unstages the stdDAQ """Unstages the stdDAQ
Opens a new connection to the stdDAQ, sends the stop command and Opens a new connection to the stdDAQ, sends the stop command and
waits for the idle state. waits for the idle state.
""" """
# Stop the DAQ (will close connection) - reply is always "success" ii = 0
self.message({"command": "stop"}) while ii < 10:
# Stop the DAQ (will close connection) - reply is always "success"
self._wsclient = self.connect()
self.message({"command": "stop_all"}, wait_reply=False)
def is_running(*args, value, timestamp, **kwargs): # Let it consolidate
result = value in ["idle", "unknown", "error"] sleep(0.2)
return result
status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5) # Check final status (from new connection)
status.wait() self._wsclient = self.connect()
reply = self.message({"command": "status"})
if reply is not None:
logger.info(f"[{self.name}] DAQ status reply: {reply}")
reply = json.loads(reply)
logger.warning(f"[{self.name}] Stopped stdDAQ in: {self.runstatus.get()}") if reply["status"] in ("idle", "error"):
return status # Only 'idle' state accepted
print(f"DAQ stopped on try {ii}")
return
elif reply["status"] in ("stop"):
# Give it more time to stop
sleep(0.5)
elif ii >= 6:
raise RuntimeError(f"Failed to stop StdDAQ: {reply}")
ii += 1
raise RuntimeError(f"Failed to stop StdDAQ in time")
########################################################################## ##########################################################################
# Bluesky flyer interface # Bluesky flyer interface
def complete(self) -> SubscriptionStatus: def complete(self) -> SubscriptionStatus:
"""Wait for current run. Must end in status 'file_saved'.""" """Wait for current run. Must end in status 'file_saved'."""
def is_running(*args, value, timestamp, **kwargs): def is_running(*args, value, timestamp, **kwargs):
result = value in ["idle", "file_saved", "error"] result = value in ["idle", "file_saved", "error"]
return result return result
@@ -377,25 +430,6 @@ class StdDaqClient(PSIDeviceBase):
status = SubscriptionStatus(self.runstatus, is_running, settle_time=0.5) status = SubscriptionStatus(self.runstatus, is_running, settle_time=0.5)
return status return status
def validate(self):
"""Validate camera state
Ensure that data source shape matches with the shape expected by the stdDAQ.
"""
# Must make sure that image size matches the data source
source = self.datasource.get()
if source is not None and len(source) > 0:
if source == "gfcam":
cam_img_w = self.device_manager.devices[source].cfgRoiX.get()
cam_img_h = self.device_manager.devices[source].cfgRoiY.get()
daq_img_w = self.cfg_pixel_width.get()
daq_img_h = self.cfg_pixel_height.get()
if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h):
raise RuntimeError(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})"
)
def get_daq_config(self, update=False) -> dict: def get_daq_config(self, update=False) -> dict:
"""Read the current configuration from the DAQ""" """Read the current configuration from the DAQ"""
r = requests.get(self.rest_url.get() + "/api/config/get", params={"user": "ioc"}, timeout=2) r = requests.get(self.rest_url.get() + "/api/config/get", params={"user": "ioc"}, timeout=2)
@@ -458,6 +492,17 @@ class StdDaqClient(PSIDeviceBase):
self.set_daq_config(cfg) self.set_daq_config(cfg)
sleep(restarttime) sleep(restarttime)
def state(self) -> str | None:
"""Querry the current system status"""
try:
wsclient = self.connect()
wsclient.send(json.dumps({"command": "status"}))
r = wsclient.recv(timeout=1)
r = json.loads(r)
return r["status"]
except ConnectionRefusedError:
raise
# Automatically connect to microXAS testbench if directly invoked # Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -19,9 +19,15 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
) )
from bec_lib import bec_logger from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b"" ZMQ_TOPIC_FILTER = b''
class StdDaqPreviewState(enum.IntEnum):
"""Standard DAQ ophyd device states"""
UNKNOWN = 0
DETACHED = 1
MONITORING = 2
class StdDaqPreviewMixin(CustomDetectorMixin): class StdDaqPreviewMixin(CustomDetectorMixin):
@@ -29,19 +35,94 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
Parent class: CustomDetectorMixin Parent class: CustomDetectorMixin
""" """
_mon = None
def on_stage(self): def on_stage(self):
"""Start listening for preview data stream""" """Start listening for preview data stream"""
self.parent.arm() if self._mon is not None:
self.parent.unstage()
sleep(0.5)
self.parent.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def on_unstage(self): def on_unstage(self):
"""Stop a running preview""" """Stop a running preview"""
self.parent.disarm() if self._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self._mon.join(timeout=1)
# So also disconnect the socket
self.parent._socket.disconnect(self.parent.url.get())
def on_stop(self): def on_stop(self):
"""Stop a running preview""" """Stop a running preview"""
self.on_unstage() self.on_unstage()
def poll(self):
"""Collect streamed updates"""
self.parent.status.set(StdDaqPreviewState.MONITORING, force=True)
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
logger.info(f"[{self.parent.name}]\tDetaching monitor")
break
# pylint: disable=no-member
r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
if len(r) != 2:
logger.warning(
f"[{self.parent.name}] Received malformed array of length {len(r)}")
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.parent.throttle.get():
sleep(0.1)
continue
# Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
if header["type"] == "uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header['shape'])
# Update image and update subscribers
self.parent.frame.put(header['frame'], force=True)
self.parent.image_shape.put(header['shape'], force=True)
self.parent.image.put(image, force=True)
self.parent._last_image = image
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last = t_curr
logger.info(
f"[{self.parent.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.parent.name}]\t{str(ex)}")
raise
finally:
self._mon = None
self.parent.status.set(StdDaqPreviewState.DETACHED, force=True)
logger.info(f"[{self.parent.name}]\tDetaching monitor")
class StdDaqPreviewDetector(PSIDetectorBase): class StdDaqPreviewDetector(PSIDetectorBase):
"""Detector wrapper class around the StdDaq preview image stream. """Detector wrapper class around the StdDaq preview image stream.
@@ -53,9 +134,8 @@ class StdDaqPreviewDetector(PSIDetectorBase):
You can add a preview widget to the dock by: You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
""" """
# Subscriptions for plotting image # Subscriptions for plotting image
USER_ACCESS = ["arm", "disarm", "get_last_image"] USER_ACCESS = ["kickoff", "get_last_image"]
SUB_MONITOR = "device_monitor_2d" SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR _default_sub = SUB_MONITOR
@@ -64,19 +144,19 @@ class StdDaqPreviewDetector(PSIDetectorBase):
# Status attributes # Status attributes
url = Component(Signal, kind=Kind.config) url = Component(Signal, kind=Kind.config)
throttle = Component(Signal, value=0.25, kind=Kind.config) throttle = Component(Signal, value=0.25, kind=Kind.config)
status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted)
frame = Component(Signal, kind=Kind.hinted) frame = Component(Signal, kind=Kind.hinted)
image_shape = Component(Signal, kind=Kind.normal) image_shape = Component(Signal, kind=Kind.normal)
# FIXME: The BEC client caches the read()s from the last 50 scans # FIXME: The BEC client caches the read()s from the last 50 scans
image = Component(Signal, kind=Kind.omitted) image = Component(Signal, kind=Kind.omitted)
_last_image = None _last_image = None
_stop_polling = True
_mon = None
def __init__( def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
) -> None: ) -> None:
super().__init__(*args, parent=parent, **kwargs) super().__init__(*args, parent=parent, **kwargs)
self.url._metadata["write_access"] = False self.url._metadata["write_access"] = False
self.status._metadata["write_access"] = False
self.image._metadata["write_access"] = False self.image._metadata["write_access"] = False
self.frame._metadata["write_access"] = False self.frame._metadata["write_access"] = False
self.image_shape._metadata["write_access"] = False self.image_shape._metadata["write_access"] = False
@@ -105,88 +185,9 @@ class StdDaqPreviewDetector(PSIDetectorBase):
def get_image(self): def get_image(self):
return self._last_image return self._last_image
def arm(self): def kickoff(self) -> DeviceStatus:
"""Start listening for preview data stream""" """ The DAQ was not meant to be toggled"""
if self._mon is not None: return DeviceStatus(self, done=True, success=True, settle_time=0.1)
self.unstage()
sleep(0.5)
self.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def disarm(self):
"""Stop a running preview"""
if self._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self._mon.join(timeout=1)
# So also disconnect the socket (if not already disconnected)
try:
self._socket.disconnect(self.url.get())
except zmq.error.ZMQError:
pass
def poll(self):
"""Collect streamed updates"""
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
break
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
if len(r) != 2:
logger.warning(f"[{self.name}] Received malformed array of length {len(r)}")
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.throttle.get():
sleep(0.1)
continue
# Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
image = None
if header["type"] == "uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header["shape"]):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header["shape"])
# Update image and update subscribers
self.frame.put(header["frame"], force=True)
self.image_shape.put(header["shape"], force=True)
self.image.put(image, force=True)
self._last_image = image
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
t_last = t_curr
logger.info(
f"[{self.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.name}]\t{str(ex)}")
raise
finally:
self._mon = None
logger.info(f"[{self.name}]\tDetaching monitor")
# Automatically connect to MicroSAXS testbench if directly invoked # Automatically connect to MicroSAXS testbench if directly invoked