WIP
This commit is contained in:
@@ -7,78 +7,17 @@ synchronized output (PSO) interface.
|
||||
"""
|
||||
from time import sleep
|
||||
import numpy as np
|
||||
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
|
||||
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
|
||||
from ophyd.status import DeviceStatus
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin as CustomDeviceMixin,
|
||||
)
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class AerotechPsoDistanceMixin(CustomDeviceMixin):
|
||||
"""Mixin class for self-configuration and staging
|
||||
"""
|
||||
# parent : aa1Tasks
|
||||
def on_stage(self) -> None:
|
||||
"""Configuration and staging
|
||||
|
||||
NOTE: Scans don't have to fully configure the device, that can be done
|
||||
manually outside. However we expect that the device is disabled
|
||||
when not in use. I.e. this method is not expected to be called when
|
||||
PSO is not needed or when it'd conflict with other devices.
|
||||
"""
|
||||
# Fish out configuration from scaninfo (does not need to be full configuration)
|
||||
d = {}
|
||||
if "kwargs" in self.parent.scaninfo.scan_msg.info:
|
||||
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
|
||||
if "pso_distance" in scanargs:
|
||||
d["pso_distance"] = scanargs["pso_distance"]
|
||||
if "pso_wavemode" in scanargs:
|
||||
d["pso_wavemode"] = scanargs["pso_wavemode"]
|
||||
if "pso_w_pulse" in scanargs:
|
||||
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
|
||||
if "pso_t_pulse" in scanargs:
|
||||
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
|
||||
if "pso_n_pulse" in scanargs:
|
||||
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
|
||||
|
||||
# Perform bluesky-style configuration
|
||||
if len(d) > 0:
|
||||
logger.info(f"[{self.parent.name}] Configuring with:\n{d}")
|
||||
self.parent.configure(d=d)
|
||||
|
||||
# Stage the PSO distance module
|
||||
self.parent.bluestage()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Standard bluesky unstage"""
|
||||
# Ensure output is set to low
|
||||
# if self.parent.output.value:
|
||||
# self.parent.toggle()
|
||||
# Turn off window mode
|
||||
self.parent.winOutput.set("Off").wait()
|
||||
self.parent.winEvents.set("Off").wait()
|
||||
# Turn off distance mode
|
||||
self.parent.dstEventsEna.set("Off").wait()
|
||||
self.parent.dstCounterEna.set("Off").wait()
|
||||
# Disable output
|
||||
self.parent.outSource.set("None").wait()
|
||||
# Sleep for one poll period
|
||||
sleep(0.2)
|
||||
|
||||
def on_trigger(self) -> None | DeviceStatus:
|
||||
"""Fire a single PSO event (i.e. manual software trigger)"""
|
||||
# Only trigger if distance was set to invalid
|
||||
logger.warning(f"[{self.parent.name}] Triggerin...")
|
||||
if self.parent.dstDistanceVal.get() == 0:
|
||||
status = self.parent._eventSingle.set(1, settle_time=0.1)
|
||||
return status
|
||||
|
||||
|
||||
class aa1AxisPsoBase(PSIDeviceBase):
|
||||
class aa1AxisPsoBase(PSIDeviceBase, Device):
|
||||
"""Position Sensitive Output - Base class
|
||||
|
||||
This class provides convenience wrappers around the Aerotech IOC's PSO
|
||||
@@ -152,8 +91,34 @@ class aa1AxisPsoBase(PSIDeviceBase):
|
||||
outPin = Component(EpicsSignalRO, "PIN", auto_monitor=True, kind=Kind.config)
|
||||
outSource = Component(EpicsSignal, "SOURCE", put_complete=True, kind=Kind.config)
|
||||
|
||||
def trigger(self, settle_time=0.1) -> DeviceStatus:
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name,
|
||||
kind=None,
|
||||
read_attrs=None,
|
||||
configuration_attrs=None,
|
||||
parent=None,
|
||||
scan_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
# Need to call super() to call the mixin class
|
||||
super().__init__(
|
||||
prefix=prefix,
|
||||
name=name,
|
||||
kind=kind,
|
||||
read_attrs=read_attrs,
|
||||
configuration_attrs=configuration_attrs,
|
||||
parent=parent,
|
||||
scan_info=scan_info,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def fire(self, settle_time=0.1) -> None | DeviceStatus:
|
||||
"""Fire a single PSO event (i.e. manual software trigger)"""
|
||||
# Only trigger if distance was set to invalid
|
||||
logger.warning(f"[{self.name}] Triggerin...")
|
||||
self._eventSingle.set(1, settle_time=settle_time).wait()
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
@@ -163,7 +128,7 @@ class aa1AxisPsoBase(PSIDeviceBase):
|
||||
"""Toggle waveform outup"""
|
||||
orig_wave_mode = self.waveMode.get()
|
||||
self.waveMode.set("Toggle").wait()
|
||||
self.trigger(0.1)
|
||||
self.fire(0.1)
|
||||
self.waveMode.set(orig_wave_mode).wait()
|
||||
|
||||
def configure(self, d: dict):
|
||||
@@ -232,13 +197,12 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
|
||||
```
|
||||
"""
|
||||
|
||||
custom_prepare_cls = AerotechPsoDistanceMixin
|
||||
USER_ACCESS = ["configure", "prepare", "toggle"]
|
||||
_distance_value = None
|
||||
|
||||
# ########################################################################
|
||||
# PSO high level interface
|
||||
def configure(self, d: dict = {}) -> tuple:
|
||||
def configure(self, d: dict = None) -> tuple:
|
||||
"""Simplified configuration interface to access the most common
|
||||
functionality for distance mode PSO.
|
||||
|
||||
@@ -282,7 +246,49 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
|
||||
logger.info(f"[{self.name}] PSO configured to {pso_wavemode} mode")
|
||||
return (old, new)
|
||||
|
||||
def bluestage(self) -> None:
|
||||
def on_stage(self) -> None:
|
||||
"""Configuration and staging
|
||||
|
||||
NOTE: Scans don't have to fully configure the device, that can be done
|
||||
manually outside. However we expect that the device is disabled
|
||||
when not in use. I.e. this method is not expected to be called when
|
||||
PSO is not needed or when it'd conflict with other devices.
|
||||
"""
|
||||
# Fish out configuration from scaninfo (does not need to be full configuration)
|
||||
d = {}
|
||||
if "kwargs" in self.scaninfo.scan_msg.info:
|
||||
scanargs = self.scaninfo.scan_msg.info["kwargs"]
|
||||
if "pso_distance" in scanargs:
|
||||
d["pso_distance"] = scanargs["pso_distance"]
|
||||
if "pso_wavemode" in scanargs:
|
||||
d["pso_wavemode"] = scanargs["pso_wavemode"]
|
||||
if "pso_w_pulse" in scanargs:
|
||||
d["pso_w_pulse"] = scanargs["pso_w_pulse"]
|
||||
if "pso_t_pulse" in scanargs:
|
||||
d["pso_t_pulse"] = scanargs["pso_t_pulse"]
|
||||
if "pso_n_pulse" in scanargs:
|
||||
d["pso_n_pulse"] = scanargs["pso_n_pulse"]
|
||||
|
||||
# Perform bluesky-style configuration
|
||||
if d:
|
||||
# logger.info(f"[{self.name}] Configuring with:\n{d}")
|
||||
self.configure(d=d)
|
||||
|
||||
# Stage the PSO distance module
|
||||
self.arm()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Turn off the PSO module"""
|
||||
self.disarm()
|
||||
|
||||
def on_trigger(self, settle_time=0.1) -> None | DeviceStatus:
|
||||
"""Fire a single PSO event (i.e. manual software trigger)"""
|
||||
# Only trigger if distance was set to invalid
|
||||
# if self.dstDistanceVal.get() == 0:
|
||||
logger.warning(f"[{self.name}] Triggerin...")
|
||||
return self.fire(settle_time)
|
||||
|
||||
def arm(self) -> None:
|
||||
"""Bluesky style stage"""
|
||||
# Stage the PSO distance module and zero counter
|
||||
if isinstance(self._distance_value, (np.ndarray, list, tuple)):
|
||||
@@ -293,3 +299,19 @@ class aa1AxisPsoDistance(aa1AxisPsoBase):
|
||||
if self.dstDistanceVal.get() > 0:
|
||||
self.dstEventsEna.set("On").wait()
|
||||
self.dstCounterEna.set("On").wait()
|
||||
|
||||
def disarm(self):
|
||||
"""Standard bluesky unstage"""
|
||||
# Ensure output is set to low
|
||||
# if self.output.value:
|
||||
# self.toggle()
|
||||
# Turn off window mode
|
||||
self.winOutput.set("Off").wait()
|
||||
self.winEvents.set("Off").wait()
|
||||
# Turn off distance mode
|
||||
self.dstEventsEna.set("Off").wait()
|
||||
self.dstCounterEna.set("Off").wait()
|
||||
# Disable output
|
||||
self.outSource.set("None").wait()
|
||||
# Sleep for one poll period
|
||||
sleep(0.2)
|
||||
|
||||
@@ -5,71 +5,17 @@ interface.
|
||||
|
||||
@author: mohacsi_i
|
||||
"""
|
||||
from ophyd import Component, EpicsSignal, EpicsSignalRO, Kind
|
||||
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO, Kind
|
||||
from ophyd.status import DeviceStatus, SubscriptionStatus
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase as PSIDeviceBase
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin as CustomDeviceMixin,
|
||||
)
|
||||
from ophyd_devices.interfaces.base_classes.psi_device_base import PSIDeviceBase
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class AerotechTasksMixin(CustomDeviceMixin):
|
||||
"""Mixin class for self-configuration and staging"""
|
||||
|
||||
# parent : aa1Tasks
|
||||
def on_stage(self) -> None:
|
||||
"""Configuration and staging
|
||||
|
||||
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
|
||||
I.e. they need to know which parameters are relevant for them at each scan.
|
||||
|
||||
NOTE: Scans don't have to fully configure the device, that can be done
|
||||
manually outside. However we expect that the device is disabled
|
||||
when not in use. I.e. this method is not expected to be called when
|
||||
PSO is not needed or when it'd conflict with other devices.
|
||||
"""
|
||||
# Fish out our configuration from scaninfo (via explicit or generic addressing)
|
||||
d = {}
|
||||
if "kwargs" in self.parent.scaninfo.scan_msg.info:
|
||||
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
|
||||
if self.parent.scaninfo.scan_type in ("script", "scripted"):
|
||||
# NOTE: Scans don't have to fully configure the device
|
||||
if "script_text" in scanargs and scanargs["script_text"] is not None:
|
||||
d["script_text"] = scanargs["script_text"]
|
||||
if "script_file" in scanargs and scanargs["script_file"] is not None:
|
||||
d["script_file"] = scanargs["script_file"]
|
||||
if "script_task" in scanargs and scanargs["script_task"] is not None:
|
||||
d["script_task"] = scanargs["script_task"]
|
||||
|
||||
# FIXME: The above should be exchanged with:
|
||||
# d = self.parent.scan_info.scan_msg.scan_parameters.get("aerotech_config")
|
||||
|
||||
# Perform bluesky-style configuration
|
||||
if d:
|
||||
# logger.warning(f"[{self.parent.name}] Configuring with:\n{d}")
|
||||
self.parent.configure(d=d)
|
||||
|
||||
# The actual staging
|
||||
self.parent.arm()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Stop the currently selected task"""
|
||||
self.parent.disarm()
|
||||
|
||||
def on_stop(self):
|
||||
"""Stop the currently selected task"""
|
||||
self.on_unstage()
|
||||
|
||||
def on_kickoff(self):
|
||||
"""Start execution of the selected task"""
|
||||
self.parent.launch()
|
||||
|
||||
|
||||
class aa1Tasks(PSIDeviceBase):
|
||||
class aa1Tasks(PSIDeviceBase, Device):
|
||||
"""Task management API
|
||||
|
||||
The place to manage tasks and AeroScript user files on the controller.
|
||||
@@ -99,8 +45,8 @@ class aa1Tasks(PSIDeviceBase):
|
||||
'''
|
||||
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["arm", "disarm", "launch", "kickoff"]
|
||||
custom_prepare_cls = AerotechTasksMixin
|
||||
|
||||
_failure = Component(EpicsSignalRO, "FAILURE", auto_monitor=True, kind=Kind.normal)
|
||||
errStatus = Component(EpicsSignalRO, "ERRW", auto_monitor=True, kind=Kind.normal)
|
||||
@@ -117,6 +63,30 @@ class aa1Tasks(PSIDeviceBase):
|
||||
EpicsSignal, "FILEWRITE", string=True, kind=Kind.omitted, put_complete=True
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
prefix="",
|
||||
*,
|
||||
name,
|
||||
kind=None,
|
||||
read_attrs=None,
|
||||
configuration_attrs=None,
|
||||
parent=None,
|
||||
scan_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
# Need to call super() to call the mixin class
|
||||
super().__init__(
|
||||
prefix=prefix,
|
||||
name=name,
|
||||
kind=kind,
|
||||
read_attrs=read_attrs,
|
||||
configuration_attrs=configuration_attrs,
|
||||
parent=parent,
|
||||
scan_info=scan_info,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def configure(self, d: dict) -> tuple:
|
||||
"""Configuration interface for flying"""
|
||||
# Common operations
|
||||
@@ -141,6 +111,52 @@ class aa1Tasks(PSIDeviceBase):
|
||||
new = self.read_configuration()
|
||||
return (old, new)
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Configuration and staging
|
||||
|
||||
In the BEC model ophyd devices must fish out their own configuration from the 'scaninfo'.
|
||||
I.e. they need to know which parameters are relevant for them at each scan.
|
||||
|
||||
NOTE: Scans don't have to fully configure the device, that can be done
|
||||
manually outside. However we expect that the device is disabled
|
||||
when not in use. I.e. this method is not expected to be called when
|
||||
PSO is not needed or when it'd conflict with other devices.
|
||||
"""
|
||||
# Fish out our configuration from scaninfo (via explicit or generic addressing)
|
||||
d = {}
|
||||
if "kwargs" in self.scaninfo.scan_msg.info:
|
||||
scanargs = self.scaninfo.scan_msg.info["kwargs"]
|
||||
if self.scaninfo.scan_type in ("script", "scripted"):
|
||||
# NOTE: Scans don't have to fully configure the device
|
||||
if "script_text" in scanargs and scanargs["script_text"] is not None:
|
||||
d["script_text"] = scanargs["script_text"]
|
||||
if "script_file" in scanargs and scanargs["script_file"] is not None:
|
||||
d["script_file"] = scanargs["script_file"]
|
||||
if "script_task" in scanargs and scanargs["script_task"] is not None:
|
||||
d["script_task"] = scanargs["script_task"]
|
||||
|
||||
# FIXME: The above should be exchanged with:
|
||||
# d = self.scan_info.scan_msg.scan_parameters.get("aerotech_config")
|
||||
|
||||
# Perform bluesky-style configuration
|
||||
if d:
|
||||
self.configure(d=d)
|
||||
|
||||
# The actual staging
|
||||
self.arm()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Stop the currently selected task"""
|
||||
self.disarm()
|
||||
|
||||
def on_stop(self):
|
||||
"""Stop the currently selected task"""
|
||||
self.unstage()
|
||||
|
||||
def on_kickoff(self):
|
||||
"""Start execution of the selected task"""
|
||||
self.launch()
|
||||
|
||||
def arm(self) -> None:
|
||||
"""Bluesky style stage, prepare, but does not execute"""
|
||||
if self.taskIndex.get() in (0, 1, 2):
|
||||
@@ -165,12 +181,6 @@ class aa1Tasks(PSIDeviceBase):
|
||||
raise RuntimeError("Failed to load task, please check the Aerotech IOC")
|
||||
return status
|
||||
|
||||
def kickoff(self):
|
||||
"""Missing kickoff, for real?"""
|
||||
self.custom_prepare.on_kickoff()
|
||||
|
||||
##########################################################################
|
||||
# Bluesky flyer interface
|
||||
def complete(self) -> SubscriptionStatus:
|
||||
"""Wait for a RUNNING task"""
|
||||
timestamp_ = 0
|
||||
|
||||
@@ -189,13 +189,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
|
||||
if daq_update:
|
||||
self.backend.set_config(daq_update, force=False)
|
||||
|
||||
def disarm(self):
|
||||
""" Bluesky style unstage"""
|
||||
# Switch to idle
|
||||
self.cmdStartCamera.set(0).wait()
|
||||
if self.autoSoftEnable.get():
|
||||
self.cmdSoftEnable.set(0).wait()
|
||||
|
||||
def set_acquisition_mode(self, acq_mode):
|
||||
"""Set acquisition mode
|
||||
|
||||
@@ -217,7 +210,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
|
||||
self.enable_mode = "soft"
|
||||
self.trigger_mode = "auto"
|
||||
self.exposure_mode = "timer"
|
||||
|
||||
elif acq_mode in ["ext_enable", "external_enable"]:
|
||||
# NOTE: Trigger using external hardware events via enable input (actually works)
|
||||
# Switch to physical enable signal
|
||||
@@ -228,7 +220,6 @@ class GigaFrostCamera(PSIDeviceBase, GigaFrostBase):
|
||||
self.enable_mode = "external"
|
||||
self.trigger_mode = "auto"
|
||||
self.exposure_mode = "timer"
|
||||
|
||||
elif acq_mode == "soft":
|
||||
# NOTE: Fede's configuration for continous streaming
|
||||
# Switch to physical enable signal
|
||||
|
||||
@@ -13,7 +13,7 @@ import requests
|
||||
import os
|
||||
|
||||
from ophyd import Signal, Component, Kind
|
||||
from ophyd.status import SubscriptionStatus, Status
|
||||
from ophyd.status import SubscriptionStatus
|
||||
from websockets.sync.client import connect, ClientConnection
|
||||
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
|
||||
|
||||
@@ -28,6 +28,8 @@ logger = bec_logger.logger
|
||||
|
||||
class StdDaqMixin(CustomDeviceMixin):
|
||||
# pylint: disable=protected-access
|
||||
_mon = None
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Configuration and staging
|
||||
|
||||
@@ -39,9 +41,6 @@ class StdDaqMixin(CustomDeviceMixin):
|
||||
# Fish out our configuration from scaninfo (via explicit or generic addressing)
|
||||
# NOTE: Scans don't have to fully configure the device
|
||||
d = {}
|
||||
# scan_parameters = self.parent.scaninfo.scan_msg.scan_parameters
|
||||
# std_daq_params = scan_parameters.get("std_daq_params")
|
||||
|
||||
if "kwargs" in self.parent.scaninfo.scan_msg.info:
|
||||
scanargs = self.parent.scaninfo.scan_msg.info["kwargs"]
|
||||
if "image_width" in scanargs and scanargs["image_width"] is not None:
|
||||
@@ -50,10 +49,6 @@ class StdDaqMixin(CustomDeviceMixin):
|
||||
d["image_height"] = scanargs["image_height"]
|
||||
if "nr_writers" in scanargs and scanargs["nr_writers"] is not None:
|
||||
d["nr_writers"] = scanargs["nr_writers"]
|
||||
if "system_config" in scanargs and scanargs["system_config"] is not None:
|
||||
if scanargs["system_config"]["file_directory"]:
|
||||
file_directory = scanargs["system_config"]["file_directory"]
|
||||
### to be used in the future to substitute the procedure using file path
|
||||
if "file_path" in scanargs and scanargs["file_path"] is not None:
|
||||
self.parent.file_path.set(scanargs["file_path"].replace("data", "gpfs")).wait()
|
||||
print(scanargs["file_path"])
|
||||
@@ -98,20 +93,40 @@ class StdDaqMixin(CustomDeviceMixin):
|
||||
sleep(0.5)
|
||||
|
||||
# Try to start a new run (reconnects)
|
||||
self.parent.arm()
|
||||
|
||||
|
||||
self.parent.bluestage()
|
||||
# And start status monitoring
|
||||
self._mon = Thread(target=self.monitor, daemon=True)
|
||||
self._mon.start()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Stop a running acquisition and close connection"""
|
||||
print("Creating virtual dataset")
|
||||
self.parent.create_virtual_dataset()
|
||||
self.parent.disarm()
|
||||
self.parent.blueunstage()
|
||||
|
||||
def on_stop(self):
|
||||
"""Stop a running acquisition and close connection"""
|
||||
self.parent.disarm()
|
||||
self.parent.blueunstage()
|
||||
|
||||
def monitor(self) -> None:
|
||||
"""Monitor status messages while connection is open. This will block the reply monitoring
|
||||
to calling unstage() might throw. Status updates are sent every 1 seconds, but finishing
|
||||
acquisition means StdDAQ will close connection, so there's no idle state polling.
|
||||
"""
|
||||
try:
|
||||
sleep(0.2)
|
||||
for msg in self.parent._wsclient:
|
||||
message = json.loads(msg)
|
||||
self.parent.runstatus.put(message["status"], force=True)
|
||||
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
|
||||
except (ConnectionClosedError, ConnectionClosedOK, AssertionError):
|
||||
# Libraty throws theese after connection is closed
|
||||
return
|
||||
except Exception as ex:
|
||||
logger.warning(f"[{self.parent.name}] Exception in polling: {ex}")
|
||||
return
|
||||
finally:
|
||||
self._mon = None
|
||||
|
||||
|
||||
class StdDaqClient(PSIDeviceBase):
|
||||
@@ -137,9 +152,9 @@ class StdDaqClient(PSIDeviceBase):
|
||||
"nuke",
|
||||
"connect",
|
||||
"message",
|
||||
"arm",
|
||||
"disarm",
|
||||
"complete",
|
||||
"state",
|
||||
"bluestage",
|
||||
"blueunstage",
|
||||
]
|
||||
_wsclient = None
|
||||
|
||||
@@ -153,14 +168,12 @@ class StdDaqClient(PSIDeviceBase):
|
||||
file_prefix = Component(Signal, value="file", kind=Kind.config)
|
||||
# Configuration attributes
|
||||
rest_url = Component(Signal, kind=Kind.config, metadata={"write_access": False})
|
||||
datasource = Component(Signal, kind=Kind.config, metadata={"write_access": False})
|
||||
cfg_detector_name = Component(Signal, kind=Kind.config)
|
||||
cfg_detector_type = Component(Signal, kind=Kind.config)
|
||||
cfg_bit_depth = Component(Signal, kind=Kind.config)
|
||||
cfg_pixel_height = Component(Signal, kind=Kind.config)
|
||||
cfg_pixel_width = Component(Signal, kind=Kind.config)
|
||||
cfg_nr_writers = Component(Signal, kind=Kind.config)
|
||||
_mon = None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -174,7 +187,7 @@ class StdDaqClient(PSIDeviceBase):
|
||||
device_manager=None,
|
||||
ws_url: str = "ws://localhost:8080",
|
||||
rest_url: str = "http://localhost:5000",
|
||||
data_source_name="",
|
||||
data_source_name=None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
@@ -189,66 +202,67 @@ class StdDaqClient(PSIDeviceBase):
|
||||
)
|
||||
self.ws_url.set(ws_url, force=True).wait()
|
||||
self.rest_url.set(rest_url, force=True).wait()
|
||||
self.datasource.set(data_source_name, force=True).wait()
|
||||
self.data_source_name = data_source_name
|
||||
|
||||
# Connect to the DAQ and initialize values
|
||||
try:
|
||||
self.get_daq_config(update=True)
|
||||
except Exception as ex:
|
||||
logger.error(f"Failed to connect to the stdDAQ REST API\n{ex}")
|
||||
# Connect to websockets and start poller
|
||||
try:
|
||||
self.connect()
|
||||
except Exception as ex:
|
||||
logger.error(f"Failed to connect to the stdDAQ websocket interface\n{ex}")
|
||||
|
||||
def connect(self) -> None:
|
||||
def connect(self) -> ClientConnection:
|
||||
"""Connect to the StdDAQ's websockets interface
|
||||
|
||||
StdDAQ may reject connection for a few seconds after restart, or when
|
||||
it wants so if it fails, wait a bit and try to connect again.
|
||||
"""
|
||||
# Connect to stdDAQ
|
||||
logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
|
||||
self._wsclient = connect(self.ws_url.get())
|
||||
# And start status monitoring
|
||||
self._mon = Thread(target=self.monitor, daemon=True)
|
||||
self._mon.start()
|
||||
num_retry = 0
|
||||
while num_retry < 5:
|
||||
try:
|
||||
logger.debug(f"[{self.name}] Connecting to stdDAQ at {self.ws_url.get()}")
|
||||
connection = connect(self.ws_url.get())
|
||||
logger.debug(f"[{self.name}] Connected to stdDAQ after {num_retry} tries")
|
||||
return connection
|
||||
except ConnectionRefusedError:
|
||||
num_retry += 1
|
||||
sleep(2)
|
||||
raise ConnectionRefusedError("The stdDAQ websocket interface refused connection 5 times.")
|
||||
|
||||
def monitor(self) -> None:
|
||||
"""Monitor status messages while connection is open. Status updates are
|
||||
sent every 1 seconds, or when there's a transition.
|
||||
"""
|
||||
try:
|
||||
for msg in self._wsclient:
|
||||
message = json.loads(msg)
|
||||
self.runstatus.put(message["status"], force=True)
|
||||
# logger.info(f"[{self.parent.name}] Pushed status: {message['status']}")
|
||||
except Exception as ex:
|
||||
logger.warning(f"[{self.name}] Exception in polling: {ex}")
|
||||
return
|
||||
finally:
|
||||
self._mon = None
|
||||
|
||||
def message(self, message: dict) -> None:
|
||||
def message(self, message: dict, timeout=1, wait_reply=True, client=None) -> None | str:
|
||||
"""Send a message to the StdDAQ and receive a reply
|
||||
|
||||
Note: finishing acquisition means StdDAQ will close connection, so
|
||||
there's no idle state polling.
|
||||
"""
|
||||
# Prepare message
|
||||
msg = json.dumps(message) if isinstance(message, dict) else str(message)
|
||||
|
||||
# Connect if client was destroyed
|
||||
if self._wsclient is None:
|
||||
self.connect()
|
||||
self._wsclient = self.connect()
|
||||
|
||||
# Send message (reopen connection if needed)
|
||||
msg = json.dumps(message) if isinstance(message, dict) else str(message)
|
||||
try:
|
||||
self._wsclient.send(msg)
|
||||
except (ConnectionClosedError, ConnectionClosedOK, AttributeError):
|
||||
except (ConnectionClosedError, ConnectionClosedOK, AttributeError) as ex:
|
||||
# Re-connect if the connection was closed
|
||||
self.connect()
|
||||
self._wsclient = self.connect()
|
||||
self._wsclient.send(msg)
|
||||
|
||||
# Wait for reply
|
||||
reply = None
|
||||
if wait_reply:
|
||||
try:
|
||||
reply = self._wsclient.recv(timeout)
|
||||
return reply
|
||||
except (ConnectionClosedError, ConnectionClosedOK) as ex:
|
||||
self._wsclient = None
|
||||
logger.error(f"[{self.name}] WS connection was closed before reply: {ex}")
|
||||
except (TimeoutError, RuntimeError) as ex:
|
||||
logger.error(f"[{self.name}] Error in receiving ws reply: {ex}")
|
||||
return reply
|
||||
|
||||
def configure(self, d: dict = None):
|
||||
"""Configure the next scan with the stdDAQ
|
||||
|
||||
@@ -259,8 +273,6 @@ class StdDaqClient(PSIDeviceBase):
|
||||
images (limited by the ringbuffer size and backend speed).
|
||||
file_path: str, optional
|
||||
File path to save the data, usually GPFS.
|
||||
file_prefix: str, optional
|
||||
File prefix to save the data [default = file].
|
||||
image_width : int, optional
|
||||
ROI size in the x-direction [pixels].
|
||||
image_height : int, optional
|
||||
@@ -283,10 +295,6 @@ class StdDaqClient(PSIDeviceBase):
|
||||
# Run parameters
|
||||
if "num_points_total" in d:
|
||||
self.num_images.set(d["num_points_total"]).wait()
|
||||
if "file_path" in d:
|
||||
self.file_path.set(d["file_path"]).wait()
|
||||
if "file_prefix" in d:
|
||||
self.file_prefix.set(d["file_prefix"]).wait()
|
||||
|
||||
# Restart the DAQ if resolution changed
|
||||
cfg = self.get_daq_config()
|
||||
@@ -298,8 +306,8 @@ class StdDaqClient(PSIDeviceBase):
|
||||
):
|
||||
|
||||
# Stop if current status is not idle
|
||||
# if self.runstatus.get() != "idle":
|
||||
# logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
|
||||
if self.state() != "idle":
|
||||
logger.warning(f"[{self.name}] stdDAQ reconfiguration might corrupt files")
|
||||
|
||||
# Update retrieved config
|
||||
cfg["image_pixel_height"] = int(self.cfg_pixel_height.get())
|
||||
@@ -310,7 +318,7 @@ class StdDaqClient(PSIDeviceBase):
|
||||
sleep(1)
|
||||
self.get_daq_config(update=True)
|
||||
|
||||
def arm(self):
|
||||
def bluestage(self):
|
||||
"""Stages the stdDAQ
|
||||
|
||||
Opens a new connection to the stdDAQ, sends the start command with
|
||||
@@ -318,58 +326,103 @@ class StdDaqClient(PSIDeviceBase):
|
||||
it for obvious failures.
|
||||
"""
|
||||
# Can't stage into a running exposure
|
||||
# if self.runstatus.get() != "idle":
|
||||
# raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.runstatus.get()}")
|
||||
if self.state() != "idle":
|
||||
raise RuntimeError(f"[{self.name}] stdDAQ can't stage from state: {self.state()}")
|
||||
|
||||
# Ensure expected shape
|
||||
self.validate()
|
||||
# Must make sure that image size matches the data source
|
||||
if self.data_source_name is not None:
|
||||
cam_img_w = self.device_manager.devices[self.data_source_name].cfgRoiX.get()
|
||||
cam_img_h = self.device_manager.devices[self.data_source_name].cfgRoiY.get()
|
||||
daq_img_w = self.cfg_pixel_width.get()
|
||||
daq_img_h = self.cfg_pixel_height.get()
|
||||
|
||||
if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h):
|
||||
raise RuntimeError(
|
||||
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) matches camera with ({cam_img_w} , {cam_img_h})"
|
||||
)
|
||||
|
||||
file_path = self.file_path.get()
|
||||
file_prefix = self.file_prefix.get()
|
||||
num_images = self.num_images.get()
|
||||
file_prefix = self.file_prefix.get()
|
||||
print(file_prefix)
|
||||
|
||||
# New connection
|
||||
self._wsclient = self.connect()
|
||||
message = {
|
||||
"command": "start",
|
||||
"path": file_path,
|
||||
"file_prefix": file_prefix,
|
||||
"n_image": num_images,
|
||||
}
|
||||
self.message(message)
|
||||
reply = self.message(message)
|
||||
|
||||
def is_running(*args, value, timestamp, **kwargs):
|
||||
result = value not in ["idle", "unknown", "error"]
|
||||
return result
|
||||
if reply is not None:
|
||||
reply = json.loads(reply)
|
||||
self.runstatus.set(reply["status"], force=True).wait()
|
||||
logger.info(f"[{self.name}] Start DAQ reply: {reply}")
|
||||
|
||||
status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5)
|
||||
status.wait()
|
||||
# Give it more time to reconfigure
|
||||
if reply["status"] in ("rejected"):
|
||||
# FIXME: running exposure is a nogo
|
||||
if reply["reason"] == "driver is busy!":
|
||||
raise RuntimeError(
|
||||
f"[{self.name}] Start stdDAQ command rejected: already running"
|
||||
)
|
||||
else:
|
||||
# Give it more time to consolidate
|
||||
sleep(1)
|
||||
else:
|
||||
# Success!!!
|
||||
print(f"[{self.name}] Started stdDAQ in: {reply['status']}")
|
||||
return
|
||||
|
||||
logger.warning(f"[{self.name}] Started stdDAQ in: {self.runstatus.get()}")
|
||||
return status
|
||||
raise RuntimeError(
|
||||
f"[{self.name}] Failed to start the stdDAQ in 1 tries, reason: {reply['reason']}"
|
||||
)
|
||||
|
||||
def disarm(self):
|
||||
def blueunstage(self):
|
||||
"""Unstages the stdDAQ
|
||||
|
||||
Opens a new connection to the stdDAQ, sends the stop command and
|
||||
waits for the idle state.
|
||||
"""
|
||||
# Stop the DAQ (will close connection) - reply is always "success"
|
||||
self.message({"command": "stop"})
|
||||
ii = 0
|
||||
while ii < 10:
|
||||
# Stop the DAQ (will close connection) - reply is always "success"
|
||||
self._wsclient = self.connect()
|
||||
self.message({"command": "stop_all"}, wait_reply=False)
|
||||
|
||||
def is_running(*args, value, timestamp, **kwargs):
|
||||
result = value in ["idle", "unknown", "error"]
|
||||
return result
|
||||
# Let it consolidate
|
||||
sleep(0.2)
|
||||
|
||||
status = SubscriptionStatus(self.runstatus, is_running, timeout=3, settle_time=0.5)
|
||||
status.wait()
|
||||
# Check final status (from new connection)
|
||||
self._wsclient = self.connect()
|
||||
reply = self.message({"command": "status"})
|
||||
if reply is not None:
|
||||
logger.info(f"[{self.name}] DAQ status reply: {reply}")
|
||||
reply = json.loads(reply)
|
||||
|
||||
logger.warning(f"[{self.name}] Stopped stdDAQ in: {self.runstatus.get()}")
|
||||
return status
|
||||
if reply["status"] in ("idle", "error"):
|
||||
# Only 'idle' state accepted
|
||||
print(f"DAQ stopped on try {ii}")
|
||||
return
|
||||
elif reply["status"] in ("stop"):
|
||||
# Give it more time to stop
|
||||
sleep(0.5)
|
||||
elif ii >= 6:
|
||||
raise RuntimeError(f"Failed to stop StdDAQ: {reply}")
|
||||
ii += 1
|
||||
raise RuntimeError(f"Failed to stop StdDAQ in time")
|
||||
|
||||
##########################################################################
|
||||
# Bluesky flyer interface
|
||||
def complete(self) -> SubscriptionStatus:
|
||||
"""Wait for current run. Must end in status 'file_saved'."""
|
||||
|
||||
def is_running(*args, value, timestamp, **kwargs):
|
||||
result = value in ["idle", "file_saved", "error"]
|
||||
return result
|
||||
@@ -377,25 +430,6 @@ class StdDaqClient(PSIDeviceBase):
|
||||
status = SubscriptionStatus(self.runstatus, is_running, settle_time=0.5)
|
||||
return status
|
||||
|
||||
def validate(self):
|
||||
"""Validate camera state
|
||||
|
||||
Ensure that data source shape matches with the shape expected by the stdDAQ.
|
||||
"""
|
||||
# Must make sure that image size matches the data source
|
||||
source = self.datasource.get()
|
||||
if source is not None and len(source) > 0:
|
||||
if source == "gfcam":
|
||||
cam_img_w = self.device_manager.devices[source].cfgRoiX.get()
|
||||
cam_img_h = self.device_manager.devices[source].cfgRoiY.get()
|
||||
daq_img_w = self.cfg_pixel_width.get()
|
||||
daq_img_h = self.cfg_pixel_height.get()
|
||||
|
||||
if not (daq_img_w == cam_img_w and daq_img_h == cam_img_h):
|
||||
raise RuntimeError(
|
||||
f"[{self.name}] stdDAQ image resolution ({daq_img_w} , {daq_img_h}) does not match camera with ({cam_img_w} , {cam_img_h})"
|
||||
)
|
||||
|
||||
def get_daq_config(self, update=False) -> dict:
|
||||
"""Read the current configuration from the DAQ"""
|
||||
r = requests.get(self.rest_url.get() + "/api/config/get", params={"user": "ioc"}, timeout=2)
|
||||
@@ -458,6 +492,17 @@ class StdDaqClient(PSIDeviceBase):
|
||||
self.set_daq_config(cfg)
|
||||
sleep(restarttime)
|
||||
|
||||
def state(self) -> str | None:
|
||||
"""Querry the current system status"""
|
||||
try:
|
||||
wsclient = self.connect()
|
||||
wsclient.send(json.dumps({"command": "status"}))
|
||||
r = wsclient.recv(timeout=1)
|
||||
r = json.loads(r)
|
||||
return r["status"]
|
||||
except ConnectionRefusedError:
|
||||
raise
|
||||
|
||||
|
||||
# Automatically connect to microXAS testbench if directly invoked
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -19,9 +19,15 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
)
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
ZMQ_TOPIC_FILTER = b""
|
||||
ZMQ_TOPIC_FILTER = b''
|
||||
|
||||
|
||||
class StdDaqPreviewState(enum.IntEnum):
|
||||
"""Standard DAQ ophyd device states"""
|
||||
UNKNOWN = 0
|
||||
DETACHED = 1
|
||||
MONITORING = 2
|
||||
|
||||
|
||||
class StdDaqPreviewMixin(CustomDetectorMixin):
|
||||
@@ -29,19 +35,94 @@ class StdDaqPreviewMixin(CustomDetectorMixin):
|
||||
|
||||
Parent class: CustomDetectorMixin
|
||||
"""
|
||||
_mon = None
|
||||
|
||||
def on_stage(self):
|
||||
"""Start listening for preview data stream"""
|
||||
self.parent.arm()
|
||||
if self._mon is not None:
|
||||
self.parent.unstage()
|
||||
sleep(0.5)
|
||||
|
||||
self.parent.connect()
|
||||
self._stop_polling = False
|
||||
self._mon = Thread(target=self.poll, daemon=True)
|
||||
self._mon.start()
|
||||
|
||||
def on_unstage(self):
|
||||
"""Stop a running preview"""
|
||||
self.parent.disarm()
|
||||
if self._mon is not None:
|
||||
self._stop_polling = True
|
||||
# Might hang on recv_multipart
|
||||
self._mon.join(timeout=1)
|
||||
# So also disconnect the socket
|
||||
self.parent._socket.disconnect(self.parent.url.get())
|
||||
|
||||
def on_stop(self):
|
||||
"""Stop a running preview"""
|
||||
self.on_unstage()
|
||||
|
||||
def poll(self):
|
||||
"""Collect streamed updates"""
|
||||
self.parent.status.set(StdDaqPreviewState.MONITORING, force=True)
|
||||
try:
|
||||
t_last = time()
|
||||
while True:
|
||||
try:
|
||||
# Exit loop and finish monitoring
|
||||
if self._stop_polling:
|
||||
logger.info(f"[{self.parent.name}]\tDetaching monitor")
|
||||
break
|
||||
|
||||
# pylint: disable=no-member
|
||||
r = self.parent._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
|
||||
# Length and throtling checks
|
||||
if len(r) != 2:
|
||||
logger.warning(
|
||||
f"[{self.parent.name}] Received malformed array of length {len(r)}")
|
||||
t_curr = time()
|
||||
t_elapsed = t_curr - t_last
|
||||
if t_elapsed < self.parent.throttle.get():
|
||||
sleep(0.1)
|
||||
continue
|
||||
|
||||
# Unpack the Array V1 reply to metadata and array data
|
||||
meta, data = r
|
||||
|
||||
# Update image and update subscribers
|
||||
header = json.loads(meta)
|
||||
if header["type"] == "uint16":
|
||||
image = np.frombuffer(data, dtype=np.uint16)
|
||||
if image.size != np.prod(header['shape']):
|
||||
err = f"Unexpected array size of {image.size} for header: {header}"
|
||||
raise ValueError(err)
|
||||
image = image.reshape(header['shape'])
|
||||
|
||||
# Update image and update subscribers
|
||||
self.parent.frame.put(header['frame'], force=True)
|
||||
self.parent.image_shape.put(header['shape'], force=True)
|
||||
self.parent.image.put(image, force=True)
|
||||
self.parent._last_image = image
|
||||
self.parent._run_subs(sub_type=self.parent.SUB_MONITOR, value=image)
|
||||
t_last = t_curr
|
||||
logger.info(
|
||||
f"[{self.parent.name}] Updated frame {header['frame']}\t"
|
||||
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
|
||||
)
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
sleep(0.1)
|
||||
except Exception as ex:
|
||||
logger.info(f"[{self.parent.name}]\t{str(ex)}")
|
||||
raise
|
||||
finally:
|
||||
self._mon = None
|
||||
self.parent.status.set(StdDaqPreviewState.DETACHED, force=True)
|
||||
logger.info(f"[{self.parent.name}]\tDetaching monitor")
|
||||
|
||||
|
||||
class StdDaqPreviewDetector(PSIDetectorBase):
|
||||
"""Detector wrapper class around the StdDaq preview image stream.
|
||||
@@ -53,9 +134,8 @@ class StdDaqPreviewDetector(PSIDetectorBase):
|
||||
You can add a preview widget to the dock by:
|
||||
cam_widget = gui.add_dock('cam_dock1').add_widget('BECFigure').image('daq_stream1')
|
||||
"""
|
||||
|
||||
# Subscriptions for plotting image
|
||||
USER_ACCESS = ["arm", "disarm", "get_last_image"]
|
||||
USER_ACCESS = ["kickoff", "get_last_image"]
|
||||
SUB_MONITOR = "device_monitor_2d"
|
||||
_default_sub = SUB_MONITOR
|
||||
|
||||
@@ -64,19 +144,19 @@ class StdDaqPreviewDetector(PSIDetectorBase):
|
||||
# Status attributes
|
||||
url = Component(Signal, kind=Kind.config)
|
||||
throttle = Component(Signal, value=0.25, kind=Kind.config)
|
||||
status = Component(Signal, value=StdDaqPreviewState.UNKNOWN, kind=Kind.omitted)
|
||||
frame = Component(Signal, kind=Kind.hinted)
|
||||
image_shape = Component(Signal, kind=Kind.normal)
|
||||
# FIXME: The BEC client caches the read()s from the last 50 scans
|
||||
image = Component(Signal, kind=Kind.omitted)
|
||||
_last_image = None
|
||||
_stop_polling = True
|
||||
_mon = None
|
||||
|
||||
def __init__(
|
||||
self, *args, url: str = "tcp://129.129.95.38:20000", parent: Device = None, **kwargs
|
||||
) -> None:
|
||||
super().__init__(*args, parent=parent, **kwargs)
|
||||
self.url._metadata["write_access"] = False
|
||||
self.status._metadata["write_access"] = False
|
||||
self.image._metadata["write_access"] = False
|
||||
self.frame._metadata["write_access"] = False
|
||||
self.image_shape._metadata["write_access"] = False
|
||||
@@ -105,88 +185,9 @@ class StdDaqPreviewDetector(PSIDetectorBase):
|
||||
def get_image(self):
|
||||
return self._last_image
|
||||
|
||||
def arm(self):
|
||||
"""Start listening for preview data stream"""
|
||||
if self._mon is not None:
|
||||
self.unstage()
|
||||
sleep(0.5)
|
||||
|
||||
self.connect()
|
||||
self._stop_polling = False
|
||||
self._mon = Thread(target=self.poll, daemon=True)
|
||||
self._mon.start()
|
||||
|
||||
def disarm(self):
|
||||
"""Stop a running preview"""
|
||||
if self._mon is not None:
|
||||
self._stop_polling = True
|
||||
# Might hang on recv_multipart
|
||||
self._mon.join(timeout=1)
|
||||
# So also disconnect the socket (if not already disconnected)
|
||||
try:
|
||||
self._socket.disconnect(self.url.get())
|
||||
except zmq.error.ZMQError:
|
||||
pass
|
||||
|
||||
def poll(self):
|
||||
"""Collect streamed updates"""
|
||||
try:
|
||||
t_last = time()
|
||||
while True:
|
||||
try:
|
||||
# Exit loop and finish monitoring
|
||||
if self._stop_polling:
|
||||
break
|
||||
|
||||
# pylint: disable=no-member
|
||||
r = self._socket.recv_multipart(flags=zmq.NOBLOCK)
|
||||
|
||||
# Length and throtling checks
|
||||
if len(r) != 2:
|
||||
logger.warning(f"[{self.name}] Received malformed array of length {len(r)}")
|
||||
t_curr = time()
|
||||
t_elapsed = t_curr - t_last
|
||||
if t_elapsed < self.throttle.get():
|
||||
sleep(0.1)
|
||||
continue
|
||||
|
||||
# Unpack the Array V1 reply to metadata and array data
|
||||
meta, data = r
|
||||
|
||||
# Update image and update subscribers
|
||||
header = json.loads(meta)
|
||||
image = None
|
||||
if header["type"] == "uint16":
|
||||
image = np.frombuffer(data, dtype=np.uint16)
|
||||
|
||||
if image.size != np.prod(header["shape"]):
|
||||
err = f"Unexpected array size of {image.size} for header: {header}"
|
||||
raise ValueError(err)
|
||||
image = image.reshape(header["shape"])
|
||||
|
||||
# Update image and update subscribers
|
||||
self.frame.put(header["frame"], force=True)
|
||||
self.image_shape.put(header["shape"], force=True)
|
||||
self.image.put(image, force=True)
|
||||
self._last_image = image
|
||||
self._run_subs(sub_type=self.SUB_MONITOR, value=image)
|
||||
t_last = t_curr
|
||||
logger.info(
|
||||
f"[{self.name}] Updated frame {header['frame']}\t"
|
||||
f"Shape: {header['shape']}\tMean: {np.mean(image):.3f}"
|
||||
)
|
||||
except ValueError:
|
||||
# Happens when ZMQ partially delivers the multipart message
|
||||
pass
|
||||
except zmq.error.Again:
|
||||
# Happens when receive queue is empty
|
||||
sleep(0.1)
|
||||
except Exception as ex:
|
||||
logger.info(f"[{self.name}]\t{str(ex)}")
|
||||
raise
|
||||
finally:
|
||||
self._mon = None
|
||||
logger.info(f"[{self.name}]\tDetaching monitor")
|
||||
def kickoff(self) -> DeviceStatus:
|
||||
""" The DAQ was not meant to be toggled"""
|
||||
return DeviceStatus(self, done=True, success=True, settle_time=0.1)
|
||||
|
||||
|
||||
# Automatically connect to MicroSAXS testbench if directly invoked
|
||||
|
||||
Reference in New Issue
Block a user