This commit is contained in:
2024-04-22 14:19:49 +02:00
parent b3a4a35a7f
commit 432b275ceb

View File

@ -10,6 +10,102 @@ from ophyd.status import Status, SubscriptionStatus, StatusBase, DeviceStatus
from time import sleep from time import sleep
import warnings import warnings
import numpy as np import numpy as np
import time
from ophyd_devices.epics.devices.psi_detector_base import CustomDetectorMixin, PSIDetectorBase
class HelgeCameraMixin(CustomDetectorMixin):
"""Mixin class to setup the Helge camera bae class.
This class will be called by the custom_prepare_cls attribute of the detector class.
"""
def __init__(self, *_args, parent: Device = None, **_kwargs) -> None:
super().__init__(*_args, parent=parent, **_kwargs)
self.monitor_thread = None
self.stop_monitor = False
self.update_frequency = 1
def set_exposure_time(self, exposure_time: float) -> None:
"""Set the detector framerate.
Args:
exposure_time (float): Desired exposure time in [sec]
"""
if exposure_time is not None:
self.parent.acqExpTime.set(exposure_time).wait()
def prepare_detector(self) -> None:
"""Prepare detector for acquisition.
State machine:
BUSY and SET both low -> BUSY high, SET low -> BUSY low, SET high -> BUSY low, SET low
"""
self.camSetParam.set(1).wait()
def risingEdge(*args, old_value, value, timestamp, **kwargs):
return bool(not old_value and value)
def fallingEdge(*args, old_value, value, timestamp, **kwargs):
return bool(old_value and not value)
# Subscribe and wait for update
status = SubscriptionStatus(self.camSetParam, fallingEdge, settle_time=0.5)
status.wait()
def arm_acquisition(self) -> None:
"""Arm camera for acquisition"""
# Acquisition is only allowed when the IOC is not busy
if self.parent.state in ("OFFLINE", "BUSY", "REMOVED", "RUNNING"):
raise RuntimeError(f"Camera in in state: {self.parent.state}")
# Start the acquisition (this sets parameers and starts acquisition)
self.parent.camStatusCmd.set("Running").wait()
# Subscribe and wait for update
def isRunning(*args, old_value, value, timestamp, **kwargs):
return bool(self.state=="RUNNING")
status = SubscriptionStatus(self.parent.camStatusCode, isRunning, settle_time=0.2)
status.wait()
def stop_detector(self) -> None:
self.camStatusCmd.set("Idle").wait()
# Subscribe and wait for update
def isIdle(*args, old_value, value, timestamp, **kwargs):
return bool(value==2)
status = SubscriptionStatus(self.parent.camStatusCode, isIdle, settle_time=0.5)
status.wait()
def send_data(self) -> None:
"""Send data to monitor endpoint in redis."""
try:
img = self.parent.image.get()
# pylint: disable=protected-access
self.parent._run_subs(sub_type=self.parent.SUB_VALUE, value=img)
except Exception as e:
logger.debug(f"{e} for image with shape {self.parent.image.get().shape}")
def monitor_loop(self) -> None:
"""
Monitor the detector status and send data.
"""
while True:
self.send_data()
time.sleep(1 / self.update_frequency)
if self.parent.state != "RUNNING":
break
if self.stop_monitor:
break
def run_monitor(self) -> None:
"""
Run the monitor loop in a separate thread.
"""
self.monitor_thread = threading.Thread(target=self.monitor_loop, daemon=True)
self.monitor_thread.start()
class HelgeCameraCore(Device): class HelgeCameraCore(Device):
@ -23,6 +119,10 @@ class HelgeCameraCore(Device):
""" """
# Specify Mixin class
custom_prepare_cls = HelgeCameraMixin
USER_ACCESS = ["kickoff"] USER_ACCESS = ["kickoff"]
# ######################################################################## # ########################################################################
# General hardware info # General hardware info
@ -49,6 +149,8 @@ class HelgeCameraCore(Device):
camStateString = Component(EpicsSignalRO, "SS_CAMERA", string=True, auto_monitor=True, kind=Kind.config) camStateString = Component(EpicsSignalRO, "SS_CAMERA", string=True, auto_monitor=True, kind=Kind.config)
image = Component(EpicsSignalRO, "FPICTURE", kind=Kind.omitted)
@property @property
def state(self) -> str: def state(self) -> str:
""" Single word camera state""" """ Single word camera state"""
@ -73,51 +175,47 @@ class HelgeCameraCore(Device):
if self.state in ["OFFLINE", "REMOVED", "RUNNING"]: if self.state in ["OFFLINE", "REMOVED", "RUNNING"]:
raise RuntimeError(f"Can't change configuration from state {self.state}") raise RuntimeError(f"Can't change configuration from state {self.state}")
def stage(self) -> None: def stage(self) -> list[object]:
""" Start acquisition""" """ Start acquisition"""
self.custom_prepare.arm_acquisition()
return super().stage()
# Acquisition is only allowed when the IOC is not busy
if self.state in ("OFFLINE", "BUSY", "REMOVED", "RUNNING"):
raise RuntimeError(f"Camera in in state: {self.state}")
# Start the acquisition (this sets parameers and starts acquisition) def kickoff(self) -> DeviceStatus:
self.camStatusCmd.set("Running").wait()
# Subscribe and wait for update
def isRunning(*args, old_value, value, timestamp, **kwargs):
return bool(self.state=="RUNNING")
status = SubscriptionStatus(self.camStatusCode, isRunning, settle_time=0.2)
status.wait()
# Call parent
super().stage()
def kickoff(self, settle_time=0.2) -> DeviceStatus:
""" Start acquisition""" """ Start acquisition"""
return self.stage()
# Acquisition is only allowed when the IOC is not busy
if self.state in ("OFFLINE", "BUSY", "REMOVED", "RUNNING"):
raise RuntimeError(f"Camera in in state: {self.state}")
# Start the acquisition
self.camStatusCmd.set("Running").wait()
# Subscribe and wait for update
def isRunning(*args, old_value, value, timestamp, **kwargs):
# result = bool(value==6 and self.camInit.value==1)
result = bool(self.state=="RUNNING")
return result
status = SubscriptionStatus(self.camStatusCode, isRunning, settle_time=0.2)
return status
def stop(self): def stop(self):
""" Stop the running acquisition """ """ Stop the running acquisition """
self.camStatusCmd.set("Idle").wait() self.camStatusCmd.set("Idle").wait()
super().unstage() self.custom_prepare.stop_monitor = True
return super().unstage()
def unstage(self): def unstage(self):
""" Stop the running acquisition and unstage the device""" """ Stop the running acquisition and unstage the device"""
self.camStatusCmd.set("Idle").wait() self.camStatusCmd.set("Idle").wait()
super().unstage() self.custom_prepare.stop_monitor = True
return super().unstage()
@ -147,6 +245,8 @@ class HelgeCameraBase(HelgeCameraCore):
""" """
USER_ACCESS = ["describe", "shape", "bin", "roi"] USER_ACCESS = ["describe", "shape", "bin", "roi"]
# ######################################################################## # ########################################################################
# Additional status info # Additional status info