This commit is contained in:
gac-x05la
2025-04-16 15:01:06 +02:00
parent bea5f95503
commit 4437bb13b8
5 changed files with 406 additions and 337 deletions

View File

@@ -7,78 +7,17 @@ synchronized output (PSO) interface.
"""
from time import sleep
import numpy as np
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib import bec_logger
logger = bec_logger.logger
class AerotechPsoDistanceMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging
"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
# Perform bluesky-style configuration
if len(d) > 0:
logger.info(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# Stage the PSO distance module
self.parent.bluestage()
def on_unstage(self):
"""Standard bluesky unstage"""
# Ensure output is set to low
# if self.parent.output.value:
# self.parent.toggle()
# Turn off window mode
self.parent.winOutput.set("Off").wait()
self.parent.winEvents.set("Off").wait()
# Turn off distance mode
self.parent.dstEventsEna.set("Off").wait()
self.parent.dstCounterEna.set("Off").wait()
# Disable output
self.parent.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)
def on_trigger(self) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
logger.warning(f"[{self.parent.name}] Triggerin...")
if self.parent.dstDistanceVal.get() == 0:
status = self.parent._eventSingle.set(1, settle_time=0.1)
return status
class aa1AxisPsoBase(PSIDeviceBase):
class aa1AxisPsoBase(PSIDeviceBase, Device):
"""Position Sensitive Output - Base class
This class provides convenience wrappers around the Aerotech IOC's PSO
@@ -152,8 +91,34 @@ class aa1AxisPsoBase(PSIDeviceBase):
outPin = Component(EpicsSignalRO, "PIN", auto_monitor=True, kind=Kind.config)
outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.config)
def trigger(self, settle_time=0.1) -> DeviceStatus:
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# Need to call super() to call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def fire(self, settle_time=0.1) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
logger.warning(f"[{self.name}] Triggerin...")
self._eventSingle.set(1, settle_time=settle_time).wait()
status = DeviceStatus(self)
status.set_finished()
@@ -163,7 +128,7 @@ class aa1AxisPsoBase(PSIDeviceBase):
"""Toggle waveform outup"""
orig_wave_mode = self.waveMode.get()
self.waveMode.set("Toggle").wait()
self.trigger(0.1)
self.fire(0.1)
self.waveMode.set(orig_wave_mode).wait()
def configure(self, d: dict):
@@ -232,13 +197,12 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
```
"""
custom_prepare_cls = AerotechPsoDistanceMixin
USER_ACCESS = ["configure", "prepare", "toggle"]
_distance_value = None
# ########################################################################
# PSO high level interface
def configure(self, d: dict = {}) -> tuple:
def configure(self, d: dict = None) -> tuple:
"""Simplified configuration interface to access the most common
functionality for distance mode PSO.
@@ -282,7 +246,49 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode")
return (old, new)
def bluestage(self) -> None:
def on_stage(self) -> None:
"""Configuration and staging
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out configuration from scaninfo (does not need to be full configuration)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if "pso_distance" in scanargs:
d["pso_distance"] = scanargs["pso_distance"]
if "pso_wavemode" in scanargs:
d["pso_wavemode"] = scanargs["pso_wavemode"]
if "pso_w_pulse" in scanargs:
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
if "pso_t_pulse" in scanargs:
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
if "pso_n_pulse" in scanargs:
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
# Perform bluesky-style configuration
if d:
# logger.info(f"[{self.name}] Configuring with:\n{d}")
self.configure(d=d)
# Stage the PSO distance module
self.arm()
def on_unstage(self):
"""Turn off the PSO module"""
self.disarm()
def on_trigger(self, settle_time=0.1) -> None | DeviceStatus:
"""Fire a single PSO event (i.e. manual software trigger)"""
# Only trigger if distance was set to invalid
# if self.dstDistanceVal.get() == 0:
logger.warning(f"[{self.name}] Triggerin...")
return self.fire(settle_time)
def arm(self) -> None:
"""Bluesky style stage"""
# Stage the PSO distance module and zero counter
if isinstance(self._distance_value, (np.ndarray, list, tuple)):
@@ -293,3 +299,19 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
if self.dstDistanceVal.get() > 0:
self.dstEventsEna.set("On").wait()
self.dstCounterEna.set("On").wait()
def disarm(self):
"""Standard bluesky unstage"""
# Ensure output is set to low
# if self.output.value:
# self.toggle()
# Turn off window mode
self.winOutput.set("Off").wait()
self.winEvents.set("Off").wait()
# Turn off distance mode
self.dstEventsEna.set("Off").wait()
self.dstCounterEna.set("Off").wait()
# Disable output
self.outSource.set("None").wait()
# Sleep for one poll period
sleep(0.2)

View File

@@ -5,71 +5,17 @@ interface.
@author: mohacsi_i
"""
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin as CustomDeviceMixin,
)
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
from bec_lib import bec_logger
logger = bec_logger.logger
class AerotechTasksMixin(CustomDeviceMixin):
"""Mixin class for self-configuration and staging"""
# parent : aa1Tasks
def on_stage(self) -> None:
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if self.parent.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# FIXME: The above should be exchanged with:
# d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config")
# Perform bluesky-style configuration
if d:
# logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
self.parent.configure(d=d)
# The actual staging
self.parent.arm()
def on_unstage(self):
"""Stop the currently selected task"""
self.parent.disarm()
def on_stop(self):
"""Stop the currently selected task"""
self.on_unstage()
def on_kickoff(self):
"""Start execution of the selected task"""
self.parent.launch()
class aa1Tasks(PSIDeviceBase):
class aa1Tasks(PSIDeviceBase, Device):
"""Task management API
The place to manage tasks and AeroScript user files on the controller.
@@ -99,8 +45,8 @@ class aa1Tasks(PSIDeviceBase):
'''
"""
USER_ACCESS = ["arm", "disarm", "launch", "kickoff"]
custom_prepare_cls = AerotechTasksMixin
_failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal)
errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.normal)
@@ -117,6 +63,30 @@ class aa1Tasks(PSIDeviceBase):
EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True
)
def __init__(
self,
prefix="",
*,
name,
kind=None,
read_attrs=None,
configuration_attrs=None,
parent=None,
scan_info=None,
**kwargs,
):
# Need to call super() to call the mixin class
super().__init__(
prefix=prefix,
name=name,
kind=kind,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs,
parent=parent,
scan_info=scan_info,
**kwargs,
)
def configure(self, d: dict) -> tuple:
"""Configuration interface for flying"""
# Common operations
@@ -141,6 +111,52 @@ class aa1Tasks(PSIDeviceBase):
new = self.read_configuration()
return (old, new)
def on_stage(self) -> None:
"""Configuration and staging
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
I.e. they need to know which parameters are relevant for them at each scan.
NOTE: Scans don't have to fully configure the device, that can be done
manually outside. However we expect that the device is disabled
when not in use. I.e. this method is not expected to be called when
PSO is not needed or when it'd conflict with other devices.
"""
# Fish out our configuration from scaninfo (via explicit or generic addressing)
d = {}
if "kwargs" in self.scaninfo.scan_msg.info:
scanargs = self.scaninfo.scan_msg.info["kwargs"]
if self.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# FIXME: The above should be exchanged with:
# d = self.scan_info.scan_msg.scan_parameters.get("aerotech_config")
# Perform bluesky-style configuration
if d:
self.configure(d=d)
# The actual staging
self.arm()
def on_unstage(self):
"""Stop the currently selected task"""
self.disarm()
def on_stop(self):
"""Stop the currently selected task"""
self.unstage()
def on_kickoff(self):
"""Start execution of the selected task"""
self.launch()
def arm(self) -> None:
"""Bluesky style stage, prepare, but does not execute"""
if self.taskIndex.get() in (0, 1, 2):
@@ -165,12 +181,6 @@ class aa1Tasks(PSIDeviceBase):
raise RuntimeError("Failed to load task, please check the Aerotech IOC")
return status
def kickoff(self):
"""Missing kickoff, for real?"""
self.custom_prepare.on_kickoff()
##########################################################################
# Bluesky flyer interface
def complete(self) -> SubscriptionStatus:
"""Wait for a RUNNING task"""
timestamp_ = 0

View File

@@ -189,13 +189,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
if daq_update:
self.backend.set_config(daq_update, force=False)
def disarm(self):
""" Bluesky style unstage"""
# Switch to idle
self.cmdStartCamera.set(0).wait()
if self.autoSoftEnable.get():
self.cmdSoftEnable.set(0).wait()
def set_acquisition_mode(self, acq_mode):
"""Set acquisition mode
@@ -217,7 +210,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
self.enable_mode = "soft"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
elif acq_mode in ["ext_enable", "external_enable"]:
# NOTE: Trigger using external hardware events via enable input (actually works)
# Switch to physical enable signal
@@ -228,7 +220,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
self.enable_mode = "external"
self.trigger_mode = "auto"
self.exposure_mode = "timer"
elif acq_mode == "soft":
# NOTE: Fede's configuration for continous streaming
# Switch to physical enable signal

View File

@@ -13,7 +13,7 @@ import requests
import os
from ophyd import Signal, Component, Kind
from ophyd.status import SubscriptionStatus, Status
from ophyd.status import SubscriptionStatus
from websockets.sync.client import connect, ClientConnection
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
@@ -28,6 +28,8 @@ logger = bec_logger.logger
class StdDaqMixin(CustomDeviceMixin):
# pylint: disable=protected-access
_mon = None
def on_stage(self) -> None:
"""Configuration and staging
@@ -39,9 +41,6 @@ class StdDaqMixin(CustomDeviceMixin):
# Fish out our configuration from scaninfo (via explicit or generic addressing)
# NOTE: Scans don't have to fully configure the device
d = {}
# scan_parameters = self.parent.scaninfo.scan_msg.scan_parameters
# std_daq_params = scan_parameters.get("std_daq_params")
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if "image_width" in scanargs and scanargs["image_width"] is not None:
@@ -50,10 +49,6 @@ class StdDaqMixin(CustomDeviceMixin):
d["image_height"] = scanargs["image_height"]
if "nr_writers" in scanargs and scanargs["nr_writers"] is not None:
d["nr_writers"] = scanargs["nr_writers"]
if "system_config" in scanargs and scanargs["system_config"] is not None:
if scanargs["system_config"]["file_directory"]:
file_directory = scanargs["system_config"]["file_directory"]
### to be used in the future to substitute the procedure using file path
if "file_path" in scanargs and scanargs["file_path"] is not None:
self.parent.file_path.set(scanargs["file_path"].replace("data", "gpfs")).wait()
print(scanargs["file_path"])
@@ -98,20 +93,40 @@ class StdDaqMixin(CustomDeviceMixin):
sleep(0.5)
# Try to start a new run (reconnects)
self.parent.arm()
self.parent.bluestage()
# And start status monitoring
self._mon = Thread(target=self.monitor, daemon=True)
self._mon.start()
def on_unstage(self):
"""Stop a running acquisition and close connection"""
print("Creating virtual dataset")
self.parent.create_virtual_dataset()
self.parent.disarm()
self.parent.blueunstage()
def on_stop(self):
"""Stop a running acquisition and close connection"""
self.parent.disarm()
self.parent.blueunstage()
def monitor(self) -> None:
"""Monitor status messages while connection is open. This will block the reply monitoring
to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing
acquisition means StdDAQ will close connection, so there's no idle state polling.
"""
try:
sleep(0.2)
for msg in self.parent._wsclient:
message = json.loads(msg)
self.parent.runstatus.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except (ConnectionClosedError, ConnectionClosedOK, AssertionError):
# Libraty throws theese after connection is closed
return
except Exception as ex:
logger.warning(f"[{self.parent.name}] Exception in polling: {ex}")
return
finally:
self._mon = None
class StdDaqClient(PSIDeviceBase):
@@ -137,9 +152,9 @@ class StdDaqClient(PSIDeviceBase):
"nuke",
"connect",
"message",
"arm",
"disarm",
"complete",
"state",
"bluestage",
"blueunstage",
]
_wsclient = None
@@ -153,14 +168,12 @@ class StdDaqClient(PSIDeviceBase):
file_prefix = Component(Signal, value="file", kind=Kind.config)
# Configuration attributes
rest_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
datasource = Component(Signal, kind=Kind.config, metadata={"write_access": False})
cfg_detector_name = Component(Signal, kind=Kind.config)
cfg_detector_type = Component(Signal, kind=Kind.config)
cfg_bit_depth = Component(Signal, kind=Kind.config)
cfg_pixel_height = Component(Signal, kind=Kind.config)
cfg_pixel_width = Component(Signal, kind=Kind.config)
cfg_nr_writers = Component(Signal, kind=Kind.config)
_mon = None
def __init__(
self,
@@ -174,7 +187,7 @@ class StdDaqClient(PSIDeviceBase):
device_manager=None,
ws_url: str = "ws://localhost:8080",
rest_url: str = "http://localhost:5000",
data_source_name="",
data_source_name=None,
**kwargs,
) -> None:
super().__init__(
@@ -189,66 +202,67 @@ class StdDaqClient(PSIDeviceBase):
)
self.ws_url.set(ws_url, force=True).wait()
self.rest_url.set(rest_url, force=True).wait()
self.datasource.set(data_source_name, force=True).wait()
self.data_source_name = data_source_name
# Connect to the DAQ and initialize values
try:
self.get_daq_config(update=True)
except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}")
# Connect to websockets and start poller
try:
self.connect()
except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ websocket interface\n{ex}")
def connect(self) -> None:
def connect(self) -> ClientConnection:
"""Connect to the StdDAQ's websockets interface
StdDAQ may reject connection for a few seconds after restart, or when
it wants so if it fails, wait a bit and try to connect again.
"""
# Connect to stdDAQ
logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
self._wsclient = connect(self.ws_url.get())
# And start status monitoring
self._mon = Thread(target=self.monitor, daemon=True)
self._mon.start()
num_retry = 0
while num_retry < 5:
try:
logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
connection = connect(self.ws_url.get())
logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries")
return connection
except ConnectionRefusedError:
num_retry += 1
sleep(2)
raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.")
def monitor(self) -> None:
"""Monitor status messages while connection is open. Status updates are
sent every 1 seconds, or when there's a transition.
"""
try:
for msg in self._wsclient:
message = json.loads(msg)
self.runstatus.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except Exception as ex:
logger.warning(f"[{self.name}] Exception in polling: {ex}")
return
finally:
self._mon = None
def message(self, message: dict) -> None:
def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str:
"""Send a message to the StdDAQ and receive a reply
Note: finishing acquisition means StdDAQ will close connection, so
there's no idle state polling.
"""
# Prepare message
msg = json.dumps(message) if isinstance(message, dict) else str(message)
# Connect if client was destroyed
if self._wsclient is None:
self.connect()
self._wsclient = self.connect()
# Send message (reopen connection if needed)
msg = json.dumps(message) if isinstance(message, dict) else str(message)
try:
self._wsclient.send(msg)
except (ConnectionClosedError, ConnectionClosedOK, AttributeError):
except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex:
# Re-connect if the connection was closed
self.connect()
self._wsclient = self.connect()
self._wsclient.send(msg)
# Wait for reply
reply = None
if wait_reply:
try:
reply = self._wsclient.recv(timeout)
return reply
except (ConnectionClosedError, ConnectionClosedOK) as ex:
self._wsclient = None
logger.error(f"[{self.name}] WS connection was closed before reply: {ex}")
except (TimeoutError, RuntimeError) as ex:
logger.error(f"[{self.name}] Error in receiving ws reply: {ex}")
return reply
def configure(self, d: dict = None):
"""Configure the next scan with the stdDAQ
@@ -259,8 +273,6 @@ class StdDaqClient(PSIDeviceBase):
images (limited by the ringbuffer size and backend speed).
file_path: str, optional
File path to save the data, usually GPFS.
file_prefix: str, optional
File prefix to save the data [default = file].
image_width : int, optional
ROI size in the x-direction [pixels].
image_height : int, optional
@@ -283,10 +295,6 @@ class StdDaqClient(PSIDeviceBase):
# Run parameters
if "num_points_total" in d:
self.num_images.set(d["num_points_total"]).wait()
if "file_path" in d:
self.file_path.set(d["file_path"]).wait()
if "file_prefix" in d:
self.file_prefix.set(d["file_prefix"]).wait()
# Restart the DAQ if resolution changed
cfg = self.get_daq_config()
@@ -298,8 +306,8 @@ class StdDaqClient(PSIDeviceBase):
):
# Stop if current status is not idle
# if self.runstatus.get() != "idle":
# logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
if self.state() != "idle":
logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
# Update retrieved config
cfg["image_pixel_height"] = int(self.cfg_pixel_height.get())
@@ -310,7 +318,7 @@ class StdDaqClient(PSIDeviceBase):
sleep(1)
self.get_daq_config(update=True)
def arm(self):
def bluestage(self):
"""Stages the stdDAQ
Opens a new connection to the stdDAQ, sends the start command with
@@ -318,58 +326,103 @@ class StdDaqClient(PSIDeviceBase):
it for obvious failures.
"""
# Can't stage into a running exposure
# if self.runstatus.get() != "idle":
# raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.runstatus.get()}")
if self.state() != "idle":
raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}")
# Ensure expected shape
self.validate()
# Must make sure that image size matches the data source
if self.data_source_name is not None:
cam_img_w = self.device_manager.devices[self.data_source_name].cfgRoiX.get()
cam_img_h = self.device_manager.devices[self.data_source_name].cfgRoiY.get()
daq_img_w = self.cfg_pixel_width.get()
daq_img_h = self.cfg_pixel_height.get()
if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h):
raise RuntimeError(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})"
)
else:
logger.warning(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})"
)
file_path = self.file_path.get()
file_prefix = self.file_prefix.get()
num_images = self.num_images.get()
file_prefix = self.file_prefix.get()
print(file_prefix)
# New connection
self._wsclient = self.connect()
message = {
"command": "start",
"path": file_path,
"file_prefix": file_prefix,
"n_image": num_images,
}
self.message(message)
reply = self.message(message)
def is_running(*args, value, timestamp, **kwargs):
result = value not in ["idle", "unknown", "error"]
return result
if reply is not None:
reply = json.loads(reply)
self.runstatus.set(reply["status"], force=True).wait()
logger.info(f"[{self.name}] Start DAQ reply: {reply}")
status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5)
status.wait()
# Give it more time to reconfigure
if reply["status"] in ("rejected"):
# FIXME: running exposure is a nogo
if reply["reason"] == "driver is busy!":
raise RuntimeError(
f"[{self.name}] Start stdDAQ command rejected: already running"
)
else:
# Give it more time to consolidate
sleep(1)
else:
# Success!!!
print(f"[{self.name}] Started stdDAQ in: {reply['status']}")
return
logger.warning(f"[{self.name}] Started stdDAQ in: {self.runstatus.get()}")
return status
raise RuntimeError(
f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}"
)
def disarm(self):
def blueunstage(self):
"""Unstages the stdDAQ
Opens a new connection to the stdDAQ, sends the stop command and
waits for the idle state.
"""
# Stop the DAQ (will close connection) - reply is always "success"
self.message({"command": "stop"})
ii = 0
while ii < 10:
# Stop the DAQ (will close connection) - reply is always "success"
self._wsclient = self.connect()
self.message({"command": "stop_all"}, wait_reply=False)
def is_running(*args, value, timestamp, **kwargs):
result = value in ["idle", "unknown", "error"]
return result
# Let it consolidate
sleep(0.2)
status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5)
status.wait()
# Check final status (from new connection)
self._wsclient = self.connect()
reply = self.message({"command": "status"})
if reply is not None:
logger.info(f"[{self.name}] DAQ status reply: {reply}")
reply = json.loads(reply)
logger.warning(f"[{self.name}] Stopped stdDAQ in: {self.runstatus.get()}")
return status
if reply["status"] in ("idle", "error"):
# Only 'idle' state accepted
print(f"DAQ stopped on try {ii}")
return
elif reply["status"] in ("stop"):
# Give it more time to stop
sleep(0.5)
elif ii >= 6:
raise RuntimeError(f"Failed to stop StdDAQ: {reply}")
ii += 1
raise RuntimeError(f"Failed to stop StdDAQ in time")
##########################################################################
# Bluesky flyer interface
def complete(self) -> SubscriptionStatus:
"""Wait for current run. Must end in status 'file_saved'."""
def is_running(*args, value, timestamp, **kwargs):
result = value in ["idle", "file_saved", "error"]
return result
@@ -377,25 +430,6 @@ class StdDaqClient(PSIDeviceBase):
status = SubscriptionStatus(self.runstatus, is_running, settle_time=0.5)
return status
def validate(self):
"""Validate camera state
Ensure that data source shape matches with the shape expected by the stdDAQ.
"""
# Must make sure that image size matches the data source
source = self.datasource.get()
if source is not None and len(source) > 0:
if source == "gfcam":
cam_img_w = self.device_manager.devices[source].cfgRoiX.get()
cam_img_h = self.device_manager.devices[source].cfgRoiY.get()
daq_img_w = self.cfg_pixel_width.get()
daq_img_h = self.cfg_pixel_height.get()
if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h):
raise RuntimeError(
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})"
)
def get_daq_config(self, update=False) -> dict:
"""Read the current configuration from the DAQ"""
r = requests.get(self.rest_url.get() + "/api/config/get", params={"user": "ioc"}, timeout=2)
@@ -458,6 +492,17 @@ class StdDaqClient(PSIDeviceBase):
self.set_daq_config(cfg)
sleep(restarttime)
def state(self) -> str | None:
"""Querry the current system status"""
try:
wsclient = self.connect()
wsclient.send(json.dumps({"command": "status"}))
r = wsclient.recv(timeout=1)
r = json.loads(r)
return r["status"]
except ConnectionRefusedError:
raise
# Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__":

View File

@@ -19,9 +19,15 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
)
from bec_lib import bec_logger
logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b""
ZMQ_TOPIC_FILTER = b''
class StdDaqPreviewState(enum.IntEnum):
"""Standard DAQ ophyd device states"""
UNKNOWN = 0
DETACHED = 1
MONITORING = 2
class StdDaqPreviewMixin(CustomDetectorMixin):
@@ -29,19 +35,94 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
Parent class: CustomDetectorMixin
"""
_mon = None
def on_stage(self):
"""Start listening for preview data stream"""
self.parent.arm()
if self._mon is not None:
self.parent.unstage()
sleep(0.5)
self.parent.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def on_unstage(self):
"""Stop a running preview"""
self.parent.disarm()
if self._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self._mon.join(timeout=1)
# So also disconnect the socket
self.parent._socket.disconnect(self.parent.url.get())
def on_stop(self):
"""Stop a running preview"""
self.on_unstage()
def poll(self):
"""Collect streamed updates"""
self.parent.status.set(StdDaqPreviewState.MONITORING, force=True)
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
logger.info(f"[{self.parent.name}]\tDetaching monitor")
break
# pylint: disable=no-member
r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
if len(r) != 2:
logger.warning(
f"[{self.parent.name}] Received malformed array of length {len(r)}")
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.parent.throttle.get():
sleep(0.1)
continue
# Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
if header["type"] == "uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header['shape'])
# Update image and update subscribers
self.parent.frame.put(header['frame'], force=True)
self.parent.image_shape.put(header['shape'], force=True)
self.parent.image.put(image, force=True)
self.parent._last_image = image
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
t_last = t_curr
logger.info(
f"[{self.parent.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.parent.name}]\t{str(ex)}")
raise
finally:
self._mon = None
self.parent.status.set(StdDaqPreviewState.DETACHED, force=True)
logger.info(f"[{self.parent.name}]\tDetaching monitor")
class StdDaqPreviewDetector(PSIDetectorBase):
"""Detector wrapper class around the StdDaq preview image stream.
@@ -53,9 +134,8 @@ class StdDaqPreviewDetector(PSIDetectorBase):
You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
"""
# Subscriptions for plotting image
USER_ACCESS = ["arm", "disarm", "get_last_image"]
USER_ACCESS = ["kickoff", "get_last_image"]
SUB_MONITOR = "device_monitor_2d"
_default_sub = SUB_MONITOR
@@ -64,19 +144,19 @@ class StdDaqPreviewDetector(PSIDetectorBase):
# Status attributes
url = Component(Signal, kind=Kind.config)
throttle = Component(Signal, value=0.25, kind=Kind.config)
status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted)
frame = Component(Signal, kind=Kind.hinted)
image_shape = Component(Signal, kind=Kind.normal)
# FIXME: The BEC client caches the read()s from the last 50 scans
image = Component(Signal, kind=Kind.omitted)
_last_image = None
_stop_polling = True
_mon = None
def __init__(
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
) -> None:
super().__init__(*args, parent=parent, **kwargs)
self.url._metadata["write_access"] = False
self.status._metadata["write_access"] = False
self.image._metadata["write_access"] = False
self.frame._metadata["write_access"] = False
self.image_shape._metadata["write_access"] = False
@@ -105,88 +185,9 @@ class StdDaqPreviewDetector(PSIDetectorBase):
def get_image(self):
return self._last_image
def arm(self):
"""Start listening for preview data stream"""
if self._mon is not None:
self.unstage()
sleep(0.5)
self.connect()
self._stop_polling = False
self._mon = Thread(target=self.poll, daemon=True)
self._mon.start()
def disarm(self):
"""Stop a running preview"""
if self._mon is not None:
self._stop_polling = True
# Might hang on recv_multipart
self._mon.join(timeout=1)
# So also disconnect the socket (if not already disconnected)
try:
self._socket.disconnect(self.url.get())
except zmq.error.ZMQError:
pass
def poll(self):
"""Collect streamed updates"""
try:
t_last = time()
while True:
try:
# Exit loop and finish monitoring
if self._stop_polling:
break
# pylint: disable=no-member
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
# Length and throtling checks
if len(r) != 2:
logger.warning(f"[{self.name}] Received malformed array of length {len(r)}")
t_curr = time()
t_elapsed = t_curr - t_last
if t_elapsed < self.throttle.get():
sleep(0.1)
continue
# Unpack the Array V1 reply to metadata and array data
meta, data = r
# Update image and update subscribers
header = json.loads(meta)
image = None
if header["type"] == "uint16":
image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header["shape"]):
err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err)
image = image.reshape(header["shape"])
# Update image and update subscribers
self.frame.put(header["frame"], force=True)
self.image_shape.put(header["shape"], force=True)
self.image.put(image, force=True)
self._last_image = image
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
t_last = t_curr
logger.info(
f"[{self.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
)
except ValueError:
# Happens when ZMQ partially delivers the multipart message
pass
except zmq.error.Again:
# Happens when receive queue is empty
sleep(0.1)
except Exception as ex:
logger.info(f"[{self.name}]\t{str(ex)}")
raise
finally:
self._mon = None
logger.info(f"[{self.name}]\tDetaching monitor")
def kickoff(self) -> DeviceStatus:
""" The DAQ was not meant to be toggled"""
return DeviceStatus(self, done=True, success=True, settle_time=0.1)
# Automatically connect to MicroSAXS testbench if directly invoked