This commit is contained in:
gac-x05la
2025-03-03 12:20:56 +01:00
parent 4266798e30
commit e02bb5892e
8 changed files with 231 additions and 255 deletions

View File

@@ -196,22 +196,22 @@ pcocam:
readoutPriority: monitored readoutPriority: monitored
softwareTrigger: true softwareTrigger: true
pcodaq: # pcodaq:
description: GigaFrost stdDAQ client # description: GigaFrost stdDAQ client
deviceClass: tomcat_bec.devices.StdDaqClient # deviceClass: tomcat_bec.devices.StdDaqClient
deviceConfig: # deviceConfig:
ws_url: 'ws://129.129.95.111:8081' # ws_url: 'ws://129.129.95.111:8081'
rest_url: 'http://129.129.95.111:5010' # rest_url: 'http://129.129.95.111:5010'
data_source_name: 'pcocam' # data_source_name: 'pcocam'
deviceTags: # deviceTags:
- std-daq # - std-daq
- daq # - daq
- pcocam # - pcocam
enabled: true # enabled: false
onFailure: buffer # onFailure: buffer
readOnly: false # readOnly: false
readoutPriority: monitored # readoutPriority: monitored
softwareTrigger: false # softwareTrigger: false
pco_stream0: pco_stream0:
description: stdDAQ preview (2 every 555) description: stdDAQ preview (2 every 555)

View File

@@ -40,7 +40,7 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
d["ddc_source0"] = scanargs["ddc_source0"] d["ddc_source0"] = scanargs["ddc_source0"]
if "ddc_source1" in scanargs: if "ddc_source1" in scanargs:
d["ddc_source1"] = scanargs["ddc_source1"] d["ddc_source1"] = scanargs["ddc_source1"]
# Number of points
if "ddc_num_points" in scanargs and scanargs["ddc_num_points"] is not None: if "ddc_num_points" in scanargs and scanargs["ddc_num_points"] is not None:
d["num_points_total"] = scanargs["ddc_num_points"] d["num_points_total"] = scanargs["ddc_num_points"]
elif "daq_num_points" in scanargs and scanargs["daq_num_points"] is not None: elif "daq_num_points" in scanargs and scanargs["daq_num_points"] is not None:
@@ -67,12 +67,12 @@ class AerotechDriveDataCollectionMixin(CustomDeviceMixin):
self.parent.configure(d=d) self.parent.configure(d=d)
# Stage the data collection if not in internally launced mode # Stage the data collection if not in internally launced mode
# NOTE: Scripted scans start acquiring from the scrits # NOTE: Scripted scans might configure and start acquiring from the scrits
self.parent.bluestage() self.parent.arm()
def on_unstage(self): def on_unstage(self):
"""Standard bluesky unstage""" """Standard bluesky unstage"""
self.parent._switch.set("Stop", settle_time=0.2).wait() self.parent.disarm()
class aa1AxisDriveDataCollection(PSIDeviceBase): class aa1AxisDriveDataCollection(PSIDeviceBase):
@@ -118,7 +118,7 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
_buffer1 = Component(EpicsSignalRO, "BUFFER1", auto_monitor=True, kind=Kind.normal) _buffer1 = Component(EpicsSignalRO, "BUFFER1", auto_monitor=True, kind=Kind.normal)
custom_prepare_cls = AerotechDriveDataCollectionMixin custom_prepare_cls = AerotechDriveDataCollectionMixin
USER_ACCESS = ["configure", "reset", "collect"] USER_ACCESS = ["configure", "reset", "collect", "arm", "disarm"]
def configure(self, d: dict = None) -> tuple: def configure(self, d: dict = None) -> tuple:
"""Configure data capture """Configure data capture
@@ -145,12 +145,12 @@ class aa1AxisDriveDataCollection(PSIDeviceBase):
new = self.read_configuration() new = self.read_configuration()
return (old, new) return (old, new)
def bluestage(self) -> None: def arm(self) -> None:
"""Bluesky-style stage""" """Bluesky-style stage"""
self._switch.set("ResetRB", settle_time=0.1).wait() self._switch.set("ResetRB", settle_time=0.1).wait()
self._switch.set("Start", settle_time=0.2).wait() self._switch.set("Start", settle_time=0.2).wait()
def blueunstage(self): def disarm(self):
"""Bluesky-style unstage""" """Bluesky-style unstage"""
self._switch.set("Stop", settle_time=0.2).wait() self._switch.set("Stop", settle_time=0.2).wait()

View File

@@ -33,7 +33,20 @@ class AerotechTasksMixin(CustomDeviceMixin):
PSO is not needed or when it'd conflict with other devices. PSO is not needed or when it'd conflict with other devices.
""" """
# Fish out our configuration from scaninfo (via explicit or generic addressing) # Fish out our configuration from scaninfo (via explicit or generic addressing)
d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config") d = {}
if "kwargs" in self.parent.scaninfo.scan_msg.info:
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
if self.parent.scaninfo.scan_type in ("script", "scripted"):
# NOTE: Scans don't have to fully configure the device
if "script_text" in scanargs and scanargs["script_text"] is not None:
d["script_text"] = scanargs["script_text"]
if "script_file" in scanargs and scanargs["script_file"] is not None:
d["script_file"] = scanargs["script_file"]
if "script_task" in scanargs and scanargs["script_task"] is not None:
d["script_task"] = scanargs["script_task"]
# FIXME: The above should be exchanged with:
# d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config")
# Perform bluesky-style configuration # Perform bluesky-style configuration
if d: if d:
@@ -41,19 +54,19 @@ class AerotechTasksMixin(CustomDeviceMixin):
self.parent.configure(d=d) self.parent.configure(d=d)
# The actual staging # The actual staging
self.parent.bluestage() self.parent.arm()
def on_unstage(self): def on_unstage(self):
"""Stop the currently selected task""" """Stop the currently selected task"""
self.parent.blueunstage() self.parent.disarm()
def on_stop(self): def on_stop(self):
"""Stop the currently selected task""" """Stop the currently selected task"""
self.parent.switch.set("Stop").wait() self.on_unstage()
def on_kickoff(self): def on_kickoff(self):
"""Start execution of the selected task""" """Start execution of the selected task"""
self.parent.bluekickoff() self.parent.launch()
class aa1Tasks(PSIDeviceBase): class aa1Tasks(PSIDeviceBase):
@@ -86,7 +99,7 @@ class aa1Tasks(PSIDeviceBase):
''' '''
""" """
USER_ACCESS = ["arm", "disarm", "launch", "kickoff"]
custom_prepare_cls = AerotechTasksMixin custom_prepare_cls = AerotechTasksMixin
_failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal) _failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal)
@@ -100,7 +113,6 @@ class aa1Tasks(PSIDeviceBase):
_executeReply = Component(EpicsSignalRO, "EXECUTE_RBV", string=True, auto_monitor=True) _executeReply = Component(EpicsSignalRO, "EXECUTE_RBV", string=True, auto_monitor=True)
fileName = Component(EpicsSignal, "FILENAME", string=True, kind=Kind.omitted, put_complete=True) fileName = Component(EpicsSignal, "FILENAME", string=True, kind=Kind.omitted, put_complete=True)
# _fileRead = Component(EpicsPassiveRO, "FILEREAD", string=True, kind=Kind.omitted)
_fileWrite = Component( _fileWrite = Component(
EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True
) )
@@ -129,7 +141,7 @@ class aa1Tasks(PSIDeviceBase):
new = self.read_configuration() new = self.read_configuration()
return (old, new) return (old, new)
def bluestage(self) -> None: def arm(self) -> None:
"""Bluesky style stage, prepare, but does not execute""" """Bluesky style stage, prepare, but does not execute"""
if self.taskIndex.get() in (0, 1, 2): if self.taskIndex.get() in (0, 1, 2):
logger.error(f"[{self.name}] Loading AeroScript on system task. Daring today are we?") logger.error(f"[{self.name}] Loading AeroScript on system task. Daring today are we?")
@@ -140,11 +152,11 @@ class aa1Tasks(PSIDeviceBase):
raise RuntimeError("Failed to load task, please check the Aerotech IOC") raise RuntimeError("Failed to load task, please check the Aerotech IOC")
return status return status
def blueunstage(self): def disarm(self):
"""Bluesky style unstage, stops execution""" """Bluesky style unstage, stops execution"""
self.switch.set("Stop").wait() self.switch.set("Stop").wait()
def bluekickoff(self): def launch(self):
"""Bluesky style kickoff""" """Bluesky style kickoff"""
# Launch and check success # Launch and check success
status = self.switch.set("Start", settle_time=0.2) status = self.switch.set("Start", settle_time=0.2)
@@ -159,7 +171,7 @@ class aa1Tasks(PSIDeviceBase):
########################################################################## ##########################################################################
# Bluesky flyer interface # Bluesky flyer interface
def complete(self) -> DeviceStatus: def complete(self) -> SubscriptionStatus:
"""Wait for a RUNNING task""" """Wait for a RUNNING task"""
timestamp_ = 0 timestamp_ = 0
task_idx = int(self.taskIndex.get()) task_idx = int(self.taskIndex.get())

View File

@@ -194,7 +194,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
if self.parent.infoSyncFlag.value == 0: if self.parent.infoSyncFlag.value == 0:
self.parent.cmdSyncHw.set(1).wait() self.parent.cmdSyncHw.set(1).wait()
# Switch to acquiring # Switch to acquiring
self.parent.bluestage() self.parent.arm()
def on_unstage(self) -> None: def on_unstage(self) -> None:
"""Specify actions to be executed during unstage. """Specify actions to be executed during unstage.
@@ -204,9 +204,7 @@ class GigaFrostCameraMixin(CustomDetectorMixin):
with flagged done to BEC. with flagged done to BEC.
""" """
# Switch to idle # Switch to idle
self.parent.cmdStartCamera.set(0).wait() self.parent.disarm()
if self.parent.autoSoftEnable.get():
self.parent.cmdSoftEnable.set(0).wait()
def on_stop(self) -> None: def on_stop(self) -> None:
""" """
@@ -440,7 +438,7 @@ class GigaFrostCamera(PSIDetectorBase):
cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True, kind=Kind.config) cfgInputPolarity2 = Component(EpicsSignalRO, "BNC5_RBV", auto_monitor=True, kind=Kind.config)
infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True) infoBoardTemp = Component(EpicsSignalRO, "T_BOARD", auto_monitor=True)
USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode", "initialize"] USER_ACCESS = ["exposure_mode", "fix_nframes_mode", "trigger_mode", "enable_mode", "initialize", "arm", "disarm"]
autoSoftEnable = Component(Signal, kind=Kind.config) autoSoftEnable = Component(Signal, kind=Kind.config)
backendUrl = Component(Signal, kind=Kind.config) backendUrl = Component(Signal, kind=Kind.config)
@@ -572,11 +570,18 @@ class GigaFrostCamera(PSIDetectorBase):
# Commit parameters # Commit parameters
self.cmdSetParam.set(1).wait() self.cmdSetParam.set(1).wait()
def bluestage(self): def arm(self):
"""Bluesky style stage""" """Bluesky style stage"""
# Switch to acquiring # Switch to acquiring
self.cmdStartCamera.set(1).wait() self.cmdStartCamera.set(1).wait()
def disarm(self):
""" Bluesky style unstage"""
# Switch to idle
self.cmdStartCamera.set(0).wait()
if self.autoSoftEnable.get():
self.cmdSoftEnable.set(0).wait()
def set_acquisition_mode(self, acq_mode): def set_acquisition_mode(self, acq_mode):
"""Set acquisition mode """Set acquisition mode
@@ -589,18 +594,17 @@ class GigaFrostCamera(PSIDetectorBase):
if acq_mode == "default": if acq_mode == "default":
# NOTE: Trigger using software events via softEnable (actually works) # NOTE: Trigger using software events via softEnable (actually works)
# Switch to physical enable signal
self.cfgEnableScheme.set(0).wait()
# Trigger parameters # Trigger parameters
self.cfgCntStartBit.set(1).wait() self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait() self.cfgCntEndBit.set(0).wait()
# Switch to physical enable signal
self.cfgEnableScheme.set(0).wait()
# Set modes # Set modes
# self.cmdSoftEnable.set(0).wait() # self.cmdSoftEnable.set(0).wait()
self.enable_mode = "soft" self.enable_mode = "soft"
self.trigger_mode = "auto" self.trigger_mode = "auto"
self.exposure_mode = "timer" self.exposure_mode = "timer"
elif acq_mode in ["ext_enable", "external_enable"]: elif acq_mode in ["ext_enable", "external_enable"]:
# NOTE: Trigger using external hardware events via enable input (actually works) # NOTE: Trigger using external hardware events via enable input (actually works)
# Switch to physical enable signal # Switch to physical enable signal
@@ -612,10 +616,14 @@ class GigaFrostCamera(PSIDetectorBase):
self.enable_mode = "external" self.enable_mode = "external"
self.trigger_mode = "auto" self.trigger_mode = "auto"
self.exposure_mode = "timer" self.exposure_mode = "timer"
elif acq_mode == "soft": elif acq_mode == "soft":
# NOTE: Fede's configuration for continous streaming # NOTE: Fede's configuration for continous streaming
# Switch to physical enable signal # Switch to physical enable signal
self.cfgEnableScheme.set(0).wait() self.cfgEnableScheme.set(0).wait()
# Set trigger edge to fixed frames on posedge
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
# Set enable signal to always # Set enable signal to always
self.cfgEnableExt.set(0).wait() self.cfgEnableExt.set(0).wait()
self.cfgEnableSoft.set(1).wait() self.cfgEnableSoft.set(1).wait()
@@ -626,16 +634,18 @@ class GigaFrostCamera(PSIDetectorBase):
self.cfgTrigTimer.set(1).wait() self.cfgTrigTimer.set(1).wait()
self.cfgTrigAuto.set(0).wait() self.cfgTrigAuto.set(0).wait()
# Set exposure mode to timer # Set exposure mode to timer
# self.exposure_mode = "timer"
self.cfgExpExt.set(0).wait() self.cfgExpExt.set(0).wait()
self.cfgExpSoft.set(0).wait() self.cfgExpSoft.set(0).wait()
self.cfgExpTimer.set(1).wait() self.cfgExpTimer.set(1).wait()
# Set trigger edge to fixed frames on posedge
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
elif acq_mode in ["ext", "external"]: elif acq_mode in ["ext", "external"]:
# NOTE: Untested # NOTE: Untested
# Switch to physical enable signal # Switch to physical enable signal
self.cfgEnableScheme.set(0).wait() self.cfgEnableScheme.set(0).wait()
# Set trigger edge to fixed frames on posedge
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
# Set enable signal to always # Set enable signal to always
self.cfgEnableExt.set(0).wait() self.cfgEnableExt.set(0).wait()
self.cfgEnableSoft.set(0).wait() self.cfgEnableSoft.set(0).wait()
@@ -646,12 +656,11 @@ class GigaFrostCamera(PSIDetectorBase):
self.cfgTrigTimer.set(0).wait() self.cfgTrigTimer.set(0).wait()
self.cfgTrigAuto.set(0).wait() self.cfgTrigAuto.set(0).wait()
# Set exposure mode to timer # Set exposure mode to timer
# self.exposure_mode = "timer"
self.cfgExpExt.set(0).wait() self.cfgExpExt.set(0).wait()
self.cfgExpSoft.set(0).wait() self.cfgExpSoft.set(0).wait()
self.cfgExpTimer.set(1).wait() self.cfgExpTimer.set(1).wait()
# Set trigger edge to fixed frames on posedge
self.cfgCntStartBit.set(1).wait()
self.cfgCntEndBit.set(0).wait()
else: else:
raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}") raise RuntimeError(f"Unsupported acquisition mode: {acq_mode}")

View File

@@ -75,15 +75,15 @@ class PcoEdgeCameraMixin(CustomPrepare):
self.parent.configure(d=d) self.parent.configure(d=d)
# ARM the camera # ARM the camera
self.parent.bluestage() self.parent.arm()
def on_unstage(self) -> None: def on_unstage(self) -> None:
"""Disarm the PCO.Edge camera""" """Disarm the PCO.Edge camera"""
self.parent.blueunstage() self.parent.disarm()
def on_stop(self) -> None: def on_stop(self) -> None:
"""Stop the PCO.Edge camera""" """Stop the PCO.Edge camera"""
self.parent.blueunstage() self.parent.disarm()
def on_trigger(self) -> None | DeviceStatus: def on_trigger(self) -> None | DeviceStatus:
"""Trigger mode operation """Trigger mode operation
@@ -333,7 +333,7 @@ class HelgeCameraBase(BECDeviceBase):
status = SubscriptionStatus(self.camSetParam, negedge, timeout=5, settle_time=0.5) status = SubscriptionStatus(self.camSetParam, negedge, timeout=5, settle_time=0.5)
status.wait() status.wait()
def bluestage(self): def arm(self):
"""Bluesky style stage: arm the detector""" """Bluesky style stage: arm the detector"""
logger.warning("Staging PCO") logger.warning("Staging PCO")
# Acquisition is only allowed when the IOC is not busy # Acquisition is only allowed when the IOC is not busy
@@ -358,7 +358,7 @@ class HelgeCameraBase(BECDeviceBase):
status = SubscriptionStatus(self.camStatusCode, is_running, timeout=5, settle_time=0.2) status = SubscriptionStatus(self.camStatusCode, is_running, timeout=5, settle_time=0.2)
status.wait() status.wait()
def blueunstage(self): def disarm(self):
"""Bluesky style unstage: stop the detector""" """Bluesky style unstage: stop the detector"""
self.camStatusCmd.set("Idle").wait() self.camStatusCmd.set("Idle").wait()
@@ -366,7 +366,7 @@ class HelgeCameraBase(BECDeviceBase):
# FIXME: This might interrupt data transfer # FIXME: This might interrupt data transfer
self.file_savestop.set(0).wait() self.file_savestop.set(0).wait()
def bluekickoff(self): def launch(self):
"""Start data transfer """Start data transfer
TODO: Need to revisit this once triggering is complete TODO: Need to revisit this once triggering is complete
@@ -383,7 +383,7 @@ class PcoEdge5M(HelgeCameraBase):
""" """
custom_prepare_cls = PcoEdgeCameraMixin custom_prepare_cls = PcoEdgeCameraMixin
USER_ACCESS = ["bluestage", "blueunstage", "bluekickoff"] USER_ACCESS = ["arm", "disarm", "launch"]
# ######################################################################## # ########################################################################
# Additional status info # Additional status info

View File

@@ -28,8 +28,6 @@ logger = bec_logger.logger
class StdDaqMixin(CustomDeviceMixin): class StdDaqMixin(CustomDeviceMixin):
# pylint: disable=protected-access # pylint: disable=protected-access
_mon = None
def on_stage(self) -> None: def on_stage(self) -> None:
"""Configuration and staging """Configuration and staging
@@ -100,42 +98,20 @@ class StdDaqMixin(CustomDeviceMixin):
sleep(0.5) sleep(0.5)
# Try to start a new run (reconnects) # Try to start a new run (reconnects)
# if std_daq_params.get("reconnect", True): self.parent.arm()
self.parent.bluestage()
# And start status monitoring
self._mon = Thread(target=self.monitor, daemon=True)
self._mon.start()
def on_unstage(self): def on_unstage(self):
"""Stop a running acquisition and close connection""" """Stop a running acquisition and close connection"""
print("Creating virtual dataset") print("Creating virtual dataset")
self.parent.create_virtual_dataset() self.parent.create_virtual_dataset()
self.parent.blueunstage() self.parent.disarm()
def on_stop(self): def on_stop(self):
"""Stop a running acquisition and close connection""" """Stop a running acquisition and close connection"""
self.parent.blueunstage() self.parent.disarm()
def monitor(self) -> None:
"""Monitor status messages while connection is open. This will block the reply monitoring
to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing
acquisition means StdDAQ will close connection, so there's no idle state polling.
"""
try:
sleep(0.2)
for msg in self.parent._wsclient:
message = json.loads(msg)
self.parent.runstatus.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except (ConnectionClosedError, ConnectionClosedOK, AssertionError):
# Libraty throws theese after connection is closed
return
except Exception as ex:
logger.warning(f"[{self.parent.name}] Exception in polling: {ex}")
return
finally:
self._mon = None
class StdDaqClient(PSIDeviceBase): class StdDaqClient(PSIDeviceBase):
@@ -161,9 +137,8 @@ class StdDaqClient(PSIDeviceBase):
"nuke", "nuke",
"connect", "connect",
"message", "message",
"state", "arm",
"bluestage", "disarm",
"blueunstage",
"complete", "complete",
] ]
_wsclient = None _wsclient = None
@@ -185,6 +160,7 @@ class StdDaqClient(PSIDeviceBase):
cfg_pixel_height = Component(Signal, kind=Kind.config) cfg_pixel_height = Component(Signal, kind=Kind.config)
cfg_pixel_width = Component(Signal, kind=Kind.config) cfg_pixel_width = Component(Signal, kind=Kind.config)
cfg_nr_writers = Component(Signal, kind=Kind.config) cfg_nr_writers = Component(Signal, kind=Kind.config)
_mon = None
def __init__( def __init__(
self, self,
@@ -198,7 +174,7 @@ class StdDaqClient(PSIDeviceBase):
device_manager=None, device_manager=None,
ws_url: str = "ws://localhost:8080", ws_url: str = "ws://localhost:8080",
rest_url: str = "http://localhost:5000", rest_url: str = "http://localhost:5000",
data_source_name=None, data_source_name="",
**kwargs, **kwargs,
) -> None: ) -> None:
super().__init__( super().__init__(
@@ -220,60 +196,59 @@ class StdDaqClient(PSIDeviceBase):
self.get_daq_config(update=True) self.get_daq_config(update=True)
except Exception as ex: except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}") logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}")
# Connect to websockets and start poller
try:
self.connect()
except Exception as ex:
logger.error(f"Failed to connect to the stdDAQ websocket interface\n{ex}")
def connect(self) -> ClientConnection: def connect(self) -> None:
"""Connect to the StdDAQ's websockets interface """Connect to the StdDAQ's websockets interface
StdDAQ may reject connection for a few seconds after restart, or when StdDAQ may reject connection for a few seconds after restart, or when
it wants so if it fails, wait a bit and try to connect again. it wants so if it fails, wait a bit and try to connect again.
""" """
num_retry = 0 # Connect to stdDAQ
while num_retry < 5: logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
try: self._wsclient = connect(self.ws_url.get())
logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}") # And start status monitoring
connection = connect(self.ws_url.get()) self._mon = Thread(target=self.monitor, daemon=True)
logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries") self._mon.start()
return connection
except ConnectionRefusedError:
num_retry += 1
sleep(2)
raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.")
def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str: def monitor(self) -> None:
"""Monitor status messages while connection is open. Status updates are
sent every 1 seconds, or when there's a transition.
"""
try:
for msg in self._wsclient:
message = json.loads(msg)
self.runstatus.put(message["status"], force=True)
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
except Exception as ex:
logger.warning(f"[{self.name}] Exception in polling: {ex}")
return
finally:
self._mon = None
def message(self, message: dict) -> None:
"""Send a message to the StdDAQ and receive a reply """Send a message to the StdDAQ and receive a reply
Note: finishing acquisition means StdDAQ will close connection, so
there's no idle state polling.
""" """
# Prepare message # Prepare message
msg = json.dumps(message) if isinstance(message, dict) else str(message) msg = json.dumps(message) if isinstance(message, dict) else str(message)
# Connect if client was destroyed # Connect if client was destroyed
if self._wsclient is None: if self._wsclient is None:
self._wsclient = self.connect() self.connect()
# Send message (reopen connection if needed) # Send message (reopen connection if needed)
msg = json.dumps(message) if isinstance(message, dict) else str(message) msg = json.dumps(message) if isinstance(message, dict) else str(message)
try: try:
self._wsclient.send(msg) self._wsclient.send(msg)
except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex: except (ConnectionClosedError, ConnectionClosedOK, AttributeError):
# Re-connect if the connection was closed # Re-connect if the connection was closed
self._wsclient = self.connect() self.connect()
self._wsclient.send(msg) self._wsclient.send(msg)
# Wait for reply
reply = None
if wait_reply:
try:
reply = self._wsclient.recv(timeout)
return reply
except (ConnectionClosedError, ConnectionClosedOK) as ex:
self._wsclient = None
logger.error(f"[{self.name}] WS connection was closed before reply: {ex}")
except (TimeoutError, RuntimeError) as ex:
logger.error(f"[{self.name}] Error in receiving ws reply: {ex}")
return reply
def configure(self, d: dict = None): def configure(self, d: dict = None):
"""Configure the next scan with the stdDAQ """Configure the next scan with the stdDAQ
@@ -323,8 +298,8 @@ class StdDaqClient(PSIDeviceBase):
): ):
# Stop if current status is not idle # Stop if current status is not idle
if self.state() != "idle": # if self.runstatus.get() != "idle":
logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files") # logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
# Update retrieved config # Update retrieved config
cfg["image_pixel_height"] = int(self.cfg_pixel_height.get()) cfg["image_pixel_height"] = int(self.cfg_pixel_height.get())
@@ -335,7 +310,7 @@ class StdDaqClient(PSIDeviceBase):
sleep(1) sleep(1)
self.get_daq_config(update=True) self.get_daq_config(update=True)
def bluestage(self): def arm(self):
"""Stages the stdDAQ """Stages the stdDAQ
Opens a new connection to the stdDAQ, sends the start command with Opens a new connection to the stdDAQ, sends the start command with
@@ -343,8 +318,8 @@ class StdDaqClient(PSIDeviceBase):
it for obvious failures. it for obvious failures.
""" """
# Can't stage into a running exposure # Can't stage into a running exposure
if self.state() != "idle": # if self.runstatus.get() != "idle":
raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}") # raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.runstatus.get()}")
# Ensure expected shape # Ensure expected shape
self.validate() self.validate()
@@ -354,83 +329,47 @@ class StdDaqClient(PSIDeviceBase):
num_images = self.num_images.get() num_images = self.num_images.get()
# New connection # New connection
self._wsclient = self.connect()
message = { message = {
"command": "start", "command": "start",
"path": file_path, "path": file_path,
"file_prefix": file_prefix, "file_prefix": file_prefix,
"n_image": num_images, "n_image": num_images,
} }
reply = self.message(message) self.message(message)
if reply is not None: def is_running(*args, value, timestamp, **kwargs):
reply = json.loads(reply) result = value not in ["idle", "unknown", "error"]
self.runstatus.set(reply["status"], force=True).wait() return result
logger.info(f"[{self.name}] Start DAQ reply: {reply}")
# Give it more time to reconfigure status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5)
if reply["status"] in ("rejected"): status.wait()
# FIXME: running exposure is a nogo
if reply["reason"] == "driver is busy!":
raise RuntimeError(
f"[{self.name}] Start stdDAQ command rejected: already running"
)
else:
# Give it more time to consolidate
sleep(1)
else:
# Success!!!
print(f"[{self.name}] Started stdDAQ in: {reply['status']}")
return
raise RuntimeError(f"[{self.name}] Failed to start the stdDAQ, reason: {reply['reason']}") logger.warning(f"[{self.name}] Started stdDAQ in: {self.runstatus.get()}")
return status
def blueunstage(self): def disarm(self):
"""Unstages the stdDAQ """Unstages the stdDAQ
Opens a new connection to the stdDAQ, sends the stop command and Opens a new connection to the stdDAQ, sends the stop command and
waits for the idle state. waits for the idle state.
""" """
ii = 0 # Stop the DAQ (will close connection) - reply is always "success"
while ii < 10: self.message({"command": "stop"})
# Stop the DAQ (will close connection) - reply is always "success"
self._wsclient = self.connect()
self.message({"command": "stop_all"}, wait_reply=False)
# Let it consolidate def is_running(*args, value, timestamp, **kwargs):
sleep(0.2) result = value in ["idle", "unknown", "error"]
return result
# Check final status (from new connection) status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5)
self._wsclient = self.connect() status.wait()
reply = self.message({"command": "status"})
if reply is not None:
logger.info(f"[{self.name}] DAQ status reply: {reply}")
reply = json.loads(reply)
if reply["status"] in ("idle", "error"): logger.warning(f"[{self.name}] Stopped stdDAQ in: {self.runstatus.get()}")
# Only 'idle' state accepted return status
print(f"DAQ stopped on try {ii}")
return
elif reply["status"] in ("stop"):
# Give it more time to stop
sleep(0.5)
elif ii >= 6:
raise RuntimeError(f"Failed to stop StdDAQ: {reply}")
ii += 1
raise RuntimeError(f"Failed to stop StdDAQ in time")
########################################################################## ##########################################################################
# Bluesky flyer interface # Bluesky flyer interface
def complete(self) -> SubscriptionStatus: def complete(self) -> SubscriptionStatus:
"""Wait for current run. Must end in status 'file_saved'.""" """Wait for current run. Must end in status 'file_saved'."""
# Return immediately if we're detached
# TODO: Maybe check for connection (not sure if it's better)
if self.state() in ["idle", "file_saved", "error"]:
status = Status(self)
status.set_finished()
return status
def is_running(*args, value, timestamp, **kwargs): def is_running(*args, value, timestamp, **kwargs):
result = value in ["idle", "file_saved", "error"] result = value in ["idle", "file_saved", "error"]
return result return result
@@ -519,17 +458,6 @@ class StdDaqClient(PSIDeviceBase):
self.set_daq_config(cfg) self.set_daq_config(cfg)
sleep(restarttime) sleep(restarttime)
def state(self) -> str | None:
"""Querry the current system status"""
try:
wsclient = self.connect()
wsclient.send(json.dumps({"command": "status"}))
r = wsclient.recv(timeout=1)
r = json.loads(r)
return r["status"]
except ConnectionRefusedError:
raise
# Automatically connect to microXAS testbench if directly invoked # Automatically connect to microXAS testbench if directly invoked
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -19,8 +19,9 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
) )
from bec_lib import bec_logger from bec_lib import bec_logger
logger = bec_logger.logger logger = bec_logger.logger
ZMQ_TOPIC_FILTER = b'' ZMQ_TOPIC_FILTER = b""
class StdDaqPreviewMixin(CustomDetectorMixin): class StdDaqPreviewMixin(CustomDetectorMixin):
@@ -28,6 +29,7 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
Parent class: CustomDetectorMixin Parent class: CustomDetectorMixin
""" """
def on_stage(self): def on_stage(self):
"""Start listening for preview data stream""" """Start listening for preview data stream"""
self.parent.arm() self.parent.arm()
@@ -40,6 +42,7 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
"""Stop a running preview""" """Stop a running preview"""
self.on_unstage() self.on_unstage()
class StdDaqPreviewDetector(PSIDetectorBase): class StdDaqPreviewDetector(PSIDetectorBase):
"""Detector wrapper class around the StdDaq preview image stream. """Detector wrapper class around the StdDaq preview image stream.
@@ -50,6 +53,7 @@ class StdDaqPreviewDetector(PSIDetectorBase):
You can add a preview widget to the dock by: You can add a preview widget to the dock by:
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1') cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
""" """
# Subscriptions for plotting image # Subscriptions for plotting image
USER_ACCESS = ["arm", "disarm", "get_last_image"] USER_ACCESS = ["arm", "disarm", "get_last_image"]
SUB_MONITOR = "device_monitor_2d" SUB_MONITOR = "device_monitor_2d"
@@ -112,8 +116,6 @@ class StdDaqPreviewDetector(PSIDetectorBase):
self._mon = Thread(target=self.poll, daemon=True) self._mon = Thread(target=self.poll, daemon=True)
self._mon.start() self._mon.start()
def disarm(self): def disarm(self):
"""Stop a running preview""" """Stop a running preview"""
if self._mon is not None: if self._mon is not None:
@@ -126,7 +128,6 @@ class StdDaqPreviewDetector(PSIDetectorBase):
except zmq.error.ZMQError: except zmq.error.ZMQError:
pass pass
def poll(self): def poll(self):
"""Collect streamed updates""" """Collect streamed updates"""
try: try:
@@ -142,8 +143,7 @@ class StdDaqPreviewDetector(PSIDetectorBase):
# Length and throtling checks # Length and throtling checks
if len(r) != 2: if len(r) != 2:
logger.warning( logger.warning(f"[{self.name}] Received malformed array of length {len(r)}")
f"[{self.name}] Received malformed array of length {len(r)}")
t_curr = time() t_curr = time()
t_elapsed = t_curr - t_last t_elapsed = t_curr - t_last
if t_elapsed < self.throttle.get(): if t_elapsed < self.throttle.get():
@@ -159,14 +159,14 @@ class StdDaqPreviewDetector(PSIDetectorBase):
if header["type"] == "uint16": if header["type"] == "uint16":
image = np.frombuffer(data, dtype=np.uint16) image = np.frombuffer(data, dtype=np.uint16)
if image.size != np.prod(header['shape']): if image.size != np.prod(header["shape"]):
err = f"Unexpected array size of {image.size} for header: {header}" err = f"Unexpected array size of {image.size} for header: {header}"
raise ValueError(err) raise ValueError(err)
image = image.reshape(header['shape']) image = image.reshape(header["shape"])
# Update image and update subscribers # Update image and update subscribers
self.frame.put(header['frame'], force=True) self.frame.put(header["frame"], force=True)
self.image_shape.put(header['shape'], force=True) self.image_shape.put(header["shape"], force=True)
self.image.put(image, force=True) self.image.put(image, force=True)
self._last_image = image self._last_image = image
self._run_subs(sub_type=self.SUB_MONITOR, value=image) self._run_subs(sub_type=self.SUB_MONITOR, value=image)
@@ -174,7 +174,7 @@ class StdDaqPreviewDetector(PSIDetectorBase):
logger.info( logger.info(
f"[{self.name}] Updated frame {header['frame']}\t" f"[{self.name}] Updated frame {header['frame']}\t"
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}" f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
) )
except ValueError: except ValueError:
# Happens when ZMQ partially delivers the multipart message # Happens when ZMQ partially delivers the multipart message
pass pass

View File

@@ -10,7 +10,8 @@ logger = bec_logger.logger
class Shutter: class Shutter:
""" Shutter status """ """Shutter status"""
CLOSED = 0 CLOSED = 0
OPEN = 1 OPEN = 1
@@ -38,21 +39,33 @@ class AcquireDarkV2(Acquire):
""" """
super().__init__(exp_burst=exp_burst, file_prefix="", **kwargs) super().__init__(exp_burst=exp_burst, file_prefix="", **kwargs)
self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger self.burst_at_each_point = 1 # At each point, how many times I want to individually trigger
self.exp_burst = exp_burst self.exp_burst = exp_burst
self.file_prefix = file_prefix self.file_prefix = file_prefix
def pre_scan(self): def pre_scan(self):
""" Close the shutter before scan""" """Close the shutter before scan"""
yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED]) yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED])
return super().pre_scan() return super().pre_scan()
def direct(self): def direct(self):
""" Direct scan procedure call""" """Direct scan procedure call"""
# Collect relevant devices # Collect relevant devices
self.cams = [cam.name for cam in self.device_manager.devices.get_devices_with_tags("camera") if cam.enabled] self.cams = [
self.prev = [pre.name for pre in self.device_manager.devices.get_devices_with_tags("preview") if pre.enabled] cam.name
self.daqs = [daq.name for daq in self.device_manager.devices.get_devices_with_tags("daq") if daq.enabled] for cam in self.device_manager.devices.get_devices_with_tags("camera")
if cam.enabled
]
self.prev = [
pre.name
for pre in self.device_manager.devices.get_devices_with_tags("preview")
if pre.enabled
]
self.daqs = [
daq.name
for daq in self.device_manager.devices.get_devices_with_tags("daq")
if daq.enabled
]
# Do not call stage, as there's no ScanInfo emitted for direct call # Do not call stage, as there's no ScanInfo emitted for direct call
for daq in self.daqs: for daq in self.daqs:
@@ -60,12 +73,14 @@ class AcquireDarkV2(Acquire):
prefix = f"{self.file_prefix}_{cam}_dark" prefix = f"{self.file_prefix}_{cam}_dark"
yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix) yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix)
yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst) yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst)
yield from self.stubs.send_rpc_and_wait(daq, "bluestage") yield from self.stubs.send_rpc_and_wait(daq, "arm")
for prev in self.prev: for prev in self.prev:
yield from self.stubs.send_rpc_and_wait(prev, "arm") yield from self.stubs.send_rpc_and_wait(prev, "arm")
for cam in self.cams: for cam in self.cams:
yield from self.stubs.send_rpc_and_wait(cam, "configure", {'exposure_num_burst': self.exp_burst}) yield from self.stubs.send_rpc_and_wait(
yield from self.stubs.send_rpc_and_wait(cam, "bluestage") cam, "configure", {"exposure_num_burst": self.exp_burst}
)
yield from self.stubs.send_rpc_and_wait(cam, "arm")
yield from self.pre_scan() yield from self.pre_scan()
yield from self.scan_core() yield from self.scan_core()
@@ -78,11 +93,19 @@ class AcquireWhiteV2(Acquire):
scan_name = "acquire_white_v2" scan_name = "acquire_white_v2"
gui_config = {"Acquisition parameters": ["exp_burst"]} gui_config = {"Acquisition parameters": ["exp_burst"]}
def __init__(self, motor: DeviceBase, exp_burst: int, sample_position_out: float, sample_angle_out: float, file_prefix: str="", **kwargs): def __init__(
self,
motor: DeviceBase,
exp_burst: int,
sample_position_out: float,
sample_angle_out: float,
file_prefix: str = "",
**kwargs,
):
""" """
Acquire flat field images. This scan is used to acquire flat field images. The flat field image is an image taken Acquire flat field images. This scan is used to acquire flat field images. The flat field
with the shutter open but the sample out of the beam. Flat field images are used to correct the data images for image is an image taken with the shutter open but the sample out of the beam. Flat field
non-uniformity in the detector. images are used to correct the data images for non-uniformity in the detector.
Args: Args:
motor : DeviceBase motor : DeviceBase
@@ -90,7 +113,7 @@ class AcquireWhiteV2(Acquire):
exp_burst : int exp_burst : int
Number of flat field images to acquire (no default) Number of flat field images to acquire (no default)
sample_position_out : float sample_position_out : float
Position to move the sample stage to position the sample out of beam and take flat field images Position to move the sample stage out of beam and take flat field images
sample_angle_out : float sample_angle_out : float
Angular position where to take the flat field images Angular position where to take the flat field images
@@ -108,7 +131,7 @@ class AcquireWhiteV2(Acquire):
self.out_position = [sample_position_out, sample_angle_out] self.out_position = [sample_position_out, sample_angle_out]
def pre_scan(self): def pre_scan(self):
""" Open the shutter before scan""" """Open the shutter before scan"""
# Move sample out # Move sample out
yield from self._move_scan_motors_and_wait(self.out_position) yield from self._move_scan_motors_and_wait(self.out_position)
# Open the main shutter (TODO change to the correct shutter device) # Open the main shutter (TODO change to the correct shutter device)
@@ -117,17 +140,29 @@ class AcquireWhiteV2(Acquire):
return super().pre_scan() return super().pre_scan()
def cleanup(self): def cleanup(self):
""" Close the shutter after scan""" """Close the shutter after scan"""
# Close fast shutter # Close fast shutter
yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED]) yield from self.stubs.set(device=["eyex"], value=[Shutter.CLOSED])
return super().cleanup() return super().cleanup()
def direct(self): def direct(self):
""" Direct scan procedure call""" """Direct scan procedure call"""
# Collect relevant devices # Collect relevant devices
self.cams = [cam.name for cam in self.device_manager.devices.get_devices_with_tags("camera") if cam.enabled] self.cams = [
self.prev = [pre.name for pre in self.device_manager.devices.get_devices_with_tags("preview") if pre.enabled] cam.name
self.daqs = [daq.name for daq in self.device_manager.devices.get_devices_with_tags("daq") if daq.enabled] for cam in self.device_manager.devices.get_devices_with_tags("camera")
if cam.enabled
]
self.prev = [
pre.name
for pre in self.device_manager.devices.get_devices_with_tags("preview")
if pre.enabled
]
self.daqs = [
daq.name
for daq in self.device_manager.devices.get_devices_with_tags("daq")
if daq.enabled
]
# Do not call stage, as there's no ScanInfo emitted for direct call # Do not call stage, as there's no ScanInfo emitted for direct call
for daq in self.daqs: for daq in self.daqs:
@@ -135,12 +170,14 @@ class AcquireWhiteV2(Acquire):
prefix = f"{self.file_prefix}_{cam}_white" prefix = f"{self.file_prefix}_{cam}_white"
yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix) yield from self.stubs.send_rpc_and_wait(daq, "file_prefix.set", prefix)
yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst) yield from self.stubs.send_rpc_and_wait(daq, "num_images.set", self.exp_burst)
yield from self.stubs.send_rpc_and_wait(daq, "bluestage") yield from self.stubs.send_rpc_and_wait(daq, "arm")
for prev in self.prev: for prev in self.prev:
yield from self.stubs.send_rpc_and_wait(prev, "arm") yield from self.stubs.send_rpc_and_wait(prev, "arm")
for cam in self.cams: for cam in self.cams:
yield from self.stubs.send_rpc_and_wait(cam, "configure", {'exposure_num_burst': self.exp_burst}) yield from self.stubs.send_rpc_and_wait(
yield from self.stubs.send_rpc_and_wait(cam, "bluestage") cam, "configure", {"exposure_num_burst": self.exp_burst}
)
yield from self.stubs.send_rpc_and_wait(cam, "arm")
yield from self.pre_scan() yield from self.pre_scan()
yield from self.scan_core() yield from self.scan_core()
@@ -267,9 +304,9 @@ class AcquireRefsV2(Acquire):
sample_angle_out: float = 0, sample_angle_out: float = 0,
sample_position_in: float = 0, sample_position_in: float = 0,
sample_position_out: float = 1, sample_position_out: float = 1,
file_prefix_dark: str = 'tmp_dark', file_prefix_dark: str = "tmp_dark",
file_prefix_white: str = 'tmp_white', file_prefix_white: str = "tmp_white",
**kwargs **kwargs,
): ):
""" """
Acquire reference images (darks + whites) and return to beam position. Acquire reference images (darks + whites) and return to beam position.
@@ -334,15 +371,11 @@ class AcquireRefsV2(Acquire):
if self.num_darks: if self.num_darks:
msg = f"Acquiring {self.num_darks} dark images" msg = f"Acquiring {self.num_darks} dark images"
logger.warning(msg) logger.warning(msg)
self.connector.send_client_info( self.connector.send_client_info(msg, show_asap=True, rid=self.metadata.get("RID"))
msg,
show_asap=True,
rid=self.metadata.get("RID"),
)
darks = AcquireDarkV2( darks = AcquireDarkV2(
exp_burst=self.num_darks, exp_burst=self.num_darks,
# file_prefix=self.file_prefix_dark, # file_prefix=self.file_prefix_dark,
device_manager=self.device_manager, device_manager=self.device_manager,
metadata=self.metadata, metadata=self.metadata,
instruction_handler=self.stubs._instruction_handler, instruction_handler=self.stubs._instruction_handler,
@@ -352,15 +385,10 @@ class AcquireRefsV2(Acquire):
yield from darks.direct() yield from darks.direct()
self.point_id = darks.point_id self.point_id = darks.point_id
if self.num_flats: if self.num_flats:
msg = f"Acquiring {self.num_flats} flat field images" msg = f"Acquiring {self.num_flats} flat field images"
logger.warning(msg) logger.warning(msg)
self.connector.send_client_info( self.connector.send_client_info(msg, show_asap=True, rid=self.metadata.get("RID"))
msg,
show_asap=True,
rid=self.metadata.get("RID"),
)
logger.warning("Calling AcquireWhite") logger.warning("Calling AcquireWhite")
flats = AcquireWhiteV2( flats = AcquireWhiteV2(
@@ -380,4 +408,3 @@ class AcquireRefsV2(Acquire):
## TODO move sample in beam and do not wait ## TODO move sample in beam and do not wait
## TODO move rotation to angle and do not wait ## TODO move rotation to angle and do not wait
logger.warning("[AcquireRefsV2] Done with scan_core") logger.warning("[AcquireRefsV2] Done with scan_core")