Feature/phoenix trigger #4
@@ -1,2 +1 @@
|
||||
from .phoenix_trigger import PhoenixTrigger
|
||||
from .dummy_devices import Dummy_PSIDetector
|
||||
|
||||
@@ -13,26 +13,27 @@ from bec_lib import messages
|
||||
from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.file_utils import FileWriter
|
||||
from bec_lib.logger import bec_logger
|
||||
from ophyd import Component, Device, DeviceStatus, Kind
|
||||
from ophyd import Component
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Device, DeviceStatus, EpicsSignal, EpicsSignalRO
|
||||
from ophyd import FormattedComponent as FCpt
|
||||
from ophyd import Kind
|
||||
from ophyd.device import Staged
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
CustomDetectorMixin,
|
||||
PSIDetectorBase,
|
||||
)
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
from ophyd_devices.utils import bec_utils
|
||||
from ophyd_devices.utils.bec_scaninfo_mixin import BecScaninfoMixin
|
||||
from ophyd_devices.utils.errors import DeviceStopError, DeviceTimeoutError
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import PSIDetectorBase, CustomDetectorMixin
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import FormattedComponent as FCpt
|
||||
from ophyd import Device, EpicsSignal, EpicsSignalRO
|
||||
|
||||
from phoenix_bec.scripts.phoenix import PhoenixBL
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
|
||||
#class LogTime():
|
||||
# class LogTime():
|
||||
|
||||
# def __init__(self):
|
||||
# self.t0=time.time()
|
||||
@@ -48,8 +49,8 @@ logger = bec_logger.logger
|
||||
# file.close
|
||||
|
||||
|
||||
p_s = PhoenixBL.my_log
|
||||
|
||||
p_s=PhoenixBL.my_log
|
||||
|
||||
class DetectorInitError(Exception):
|
||||
"""Raised when initiation of the device class fails,
|
||||
@@ -74,8 +75,10 @@ class SetupDummy(CustomDetectorMixin):
|
||||
self.parent = parent
|
||||
|
||||
def on_init(self) -> None:
|
||||
"""
|
||||
def on_stage(self) -> None:e is writing data on disk, this step should include publishing
|
||||
""" """
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""e is writing data on disk, this step should include publishing
|
||||
a file_event and file_message to BEC to inform the system where the data is written to.
|
||||
|
||||
IMPORTANT:
|
||||
@@ -203,27 +206,27 @@ class SetupDummy(CustomDetectorMixin):
|
||||
exception_on_timeout: Exception = None,
|
||||
) -> DeviceStatus:
|
||||
"""Utility function to wait for signals in a thread.
|
||||
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
|
||||
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
|
||||
Returns a DevicesStatus object that resolves either to set_finished or set_exception.
|
||||
The DeviceStatus is attached to the parent device, i.e. the detector object inheriting from PSIDetectorBase.
|
||||
|
||||
Usage:
|
||||
This function should be used to wait for signals to reach a certain condition, especially in the context of
|
||||
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
|
||||
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
|
||||
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
|
||||
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
|
||||
Usage:
|
||||
This function should be used to wait for signals to reach a certain condition, especially in the context of
|
||||
on_trigger and on_complete. If it is not used, functions may block and slow down the performance of BEC.
|
||||
It will return a DeviceStatus object that is to be returned from the function. Once the conditions are met,
|
||||
the DeviceStatus will be set to set_finished in case of success or set_exception in case of a timeout or exception.
|
||||
The exception can be specified with the exception_on_timeout argument. The default exception is a TimeoutError.
|
||||
|
||||
Args:
|
||||
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
|
||||
timeout (float): timeout in seconds
|
||||
check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value
|
||||
rue if stopped flag should be checked
|
||||
interval (float): interval in seconds
|
||||
all_signals (bool): True if all signals should be True, False if any signal should be True
|
||||
exception_on_timeout (Exception): Exception to raise on timeout
|
||||
Args:
|
||||
signal_conditions (list[tuple]): tuple of executable calls for conditions (get_current_state, condition) to check
|
||||
timeout (float): timeout in seconds
|
||||
check_stopped (bool): T t_offset = 1724683600 # subtract some arbtrary offset from the time value
|
||||
rue if stopped flag should be checked
|
||||
interval (float): interval in seconds
|
||||
all_signals (bool): True if all signals should be True, False if any signal should be True
|
||||
exception_on_timeout (Exception): Exception to raise on timeout
|
||||
|
||||
Returns:
|
||||
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
|
||||
Returns:
|
||||
DeviceStatus: DeviceStatus object that resolves either to set_finished or set_exception
|
||||
"""
|
||||
if exception_on_timeout is None:
|
||||
exception_on_timeout = DeviceTimeoutError(
|
||||
@@ -308,19 +311,17 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
|
||||
filepath = Component(SetableSignal, value="", kind=Kind.config)
|
||||
|
||||
custom_prepare_cls = SetupDummy
|
||||
custom_prepare_cls = SetupDummy
|
||||
|
||||
#prefix=X07MB-PC-PSCAN
|
||||
|
||||
|
||||
D = Cpt(EpicsSignal, 'P-P0D0') # cont on / off
|
||||
# prefix=X07MB-PC-PSCAN
|
||||
|
||||
D = Cpt(EpicsSignal, "P-P0D0") # cont on / off
|
||||
|
||||
def __init__(self, prefix="", *, name, kind=None, parent=None, device_manager=None, **kwargs):
|
||||
|
||||
self.p_s=PhoenixBL.my_log #must be before super!!!
|
||||
self.p_s = PhoenixBL.my_log # must be before super!!!
|
||||
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.__init__ ')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.__init__ ")
|
||||
|
||||
super().__init__(prefix=prefix, name=name, kind=kind, parent=parent, **kwargs)
|
||||
|
||||
@@ -345,28 +346,25 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
self._update_scaninfo()
|
||||
self._update_filewriter()
|
||||
self._init()
|
||||
#.. prepare my own log file
|
||||
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.__init__ .. done ')
|
||||
# .. prepare my own log file
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.__init__ .. done ")
|
||||
|
||||
def _update_filewriter(self) -> None:
|
||||
"""Update filewriter with service config"""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter")
|
||||
self.filewriter = FileWriter(service_config=self.service_cfg, connector=self.connector)
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._update_filewriter .. done ')
|
||||
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._update_filewriter .. done ")
|
||||
|
||||
def _update_scaninfo(self) -> None:
|
||||
"""Update scaninfo from BecScaninfoMixing
|
||||
This depends on device manager and operation/sim_mode
|
||||
"""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo")
|
||||
|
||||
self.scaninfo = BecScaninfoMixin(self.device_manager)
|
||||
self.scaninfo.load_scan_metadata()
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._update_scaninfo .. done ')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._update_scaninfo .. done ")
|
||||
|
||||
def _update_service_config(self) -> None:
|
||||
"""Update service config from BEC service config
|
||||
@@ -376,32 +374,31 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
# pylint: disable=import-outside-toplevel
|
||||
|
||||
from bec_lib.bec_service import SERVICE_CONFIG
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config')
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._update_service_config")
|
||||
|
||||
if SERVICE_CONFIG:
|
||||
self.service_cfg = SERVICE_CONFIG.config["service_config"]["file_writer"]
|
||||
return
|
||||
self.service_cfg = {"base_path": os.path.abspath(".")}
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._update_service_config .. done')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._update_service_config .. done")
|
||||
|
||||
def check_scan_id(self) -> None:
|
||||
"""Checks if scan_id has changed and set stopped flagged to True if it has."""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id")
|
||||
|
||||
old_scan_id = self.scaninfo.scan_id
|
||||
self.scaninfo.load_scan_metadata()
|
||||
if self.scaninfo.scan_id != old_scan_id:
|
||||
self.stopped = True
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.check_scan_id .. done ')
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.check_scan_id .. done ")
|
||||
|
||||
def _init(self) -> None:
|
||||
"""Initialize detector, filewriter and set default parameters"""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._init')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._init")
|
||||
|
||||
self.custom_prepare.on_init()
|
||||
self.p_s('Dummy_device Dummy_PSIDetector._init ... done ')
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector._init ... done ")
|
||||
|
||||
def stage(self) -> list[object]:
|
||||
"""
|
||||
@@ -414,15 +411,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
list(object): list of objects that were staged
|
||||
|
||||
"""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.stage')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.stage")
|
||||
|
||||
if self._staged != Staged.no:
|
||||
return super().stage()
|
||||
self.stopped = False
|
||||
self.scaninfo.load_scan_metadata()
|
||||
self.custom_prepare.on_stage()
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.stage done ')
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.stage done ")
|
||||
|
||||
return super().stage()
|
||||
|
||||
@@ -433,22 +429,21 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
time-critical actions. Therefore, it should also be kept as short/fast as possible.
|
||||
I.e. Arming a detector in case there is a risk of timing out.
|
||||
"""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.pre_scan")
|
||||
|
||||
self.custom_prepare.on_pre_scan()
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.pre_scan .. done ')
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.pre_scan .. done ")
|
||||
|
||||
def trigger(self) -> DeviceStatus:
|
||||
"""Trigger the detector, called from BEC."""
|
||||
|
||||
# pylint: disable=assignment-from-no-return
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.trigger')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.trigger")
|
||||
|
||||
status = self.custom_prepare.on_trigger()
|
||||
if isinstance(status, DeviceStatus):
|
||||
return status
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.trigger.. done ')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.trigger.. done ")
|
||||
|
||||
return super().trigger()
|
||||
|
||||
@@ -461,14 +456,14 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
Actions are implemented in custom_prepare.on_complete since they are beamline specific.
|
||||
"""
|
||||
# pylint: disable=assignment-from-no-return
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.complete')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.complete")
|
||||
|
||||
status = self.custom_prepare.on_complete()
|
||||
if isinstance(status, DeviceStatus):
|
||||
return status
|
||||
status = DeviceStatus(self)
|
||||
status.set_finished()
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.complete ... done ')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.complete ... done ")
|
||||
|
||||
return status
|
||||
|
||||
@@ -484,11 +479,11 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
Returns:
|
||||
list(object): list of objects that were unstaged
|
||||
"""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.unstage')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.unstage")
|
||||
self.check_scan_id()
|
||||
self.custom_prepare.on_unstage()
|
||||
self.stopped = False
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.unstage .. done')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.unstage .. done")
|
||||
|
||||
return super().unstage()
|
||||
|
||||
@@ -497,9 +492,8 @@ class Dummy_PSIDetector(PSIDetectorBase):
|
||||
Stop the scan, with camera and file writer
|
||||
|
||||
"""
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.stop')
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.stop")
|
||||
self.custom_prepare.on_stop()
|
||||
super().stop(success=success)
|
||||
self.stopped = True
|
||||
self.p_s('Dummy_device Dummy_PSIDetector.stop ... done')
|
||||
|
||||
self.p_s("Dummy_device Dummy_PSIDetector.stop ... done")
|
||||
|
||||
@@ -1,18 +1,18 @@
|
||||
import time
|
||||
""" Module for the PhoenixTrigger class to connect to the ADC card
|
||||
that creates TTL signals to trigger cameras and detectors at Phoenix. """
|
||||
|
||||
import enum
|
||||
import numpy as np
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
from bec_lib import bec_logger
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind
|
||||
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
|
||||
PSIDetectorBase,
|
||||
CustomDetectorMixin,
|
||||
PSIDetectorBase,
|
||||
)
|
||||
|
||||
from bec_lib import bec_logger
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
DETECTOR_TIMEOUT = 5
|
||||
@@ -22,7 +22,7 @@ class PhoenixTriggerError(Exception):
|
||||
"""PhoenixTrigger specific error"""
|
||||
|
||||
|
||||
class SAMPLINGDONE(int, enum.Enum):
|
||||
class SAMPLING(int, enum.Enum):
|
||||
"""Sampling Done PV"""
|
||||
|
||||
RUNNING = 0
|
||||
@@ -31,13 +31,11 @@ class SAMPLINGDONE(int, enum.Enum):
|
||||
|
||||
class PhoenixTriggerSetup(CustomDetectorMixin):
|
||||
"""
|
||||
This defines the PHOENIX trigger setup.
|
||||
|
||||
|
||||
Mixin Class to setup the PhoenixTrigger device
|
||||
"""
|
||||
|
||||
def on_stage(self) -> None:
|
||||
"""Actions to take place on stage"""
|
||||
"""On stage actions which are executed upon staging the device"""
|
||||
if self.parent.scaninfo.scan_type == "step":
|
||||
self.parent.start_csmpl.set(0)
|
||||
self.parent.total_cycles.set(1)
|
||||
@@ -47,19 +45,21 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
|
||||
logger.info(f"Device {self.parent.name} was staged for step scan")
|
||||
|
||||
def on_unstage(self) -> None:
|
||||
"""Actions to take place on unstage"""
|
||||
"""On unstage actions which are executed upon unstaging the device"""
|
||||
self.on_stop()
|
||||
|
||||
def on_trigger(self) -> DeviceStatus:
|
||||
"""Actions to be performed upon receiving a software trigger"""
|
||||
"""On trigger actions which are executed upon triggering the device"""
|
||||
# TODO Test the proper check for the falcon state
|
||||
# Check first that falcon is set to acquiring
|
||||
falcon = self.parent.device_manager.devices.get("falcon_nohdf5", None)
|
||||
timeout = 1
|
||||
if falcon is not None:
|
||||
if self.wait_for_signals([(falcon.state.get, 1)], timeout=1):
|
||||
# TODO Check that falcon.state.get() == 1 is the correct check. --> When is the falcon acquiring, this assumes 1?
|
||||
if not self.wait_for_signals([(falcon.state.get, 1)], timeout=timeout):
|
||||
raise PhoenixTriggerError(
|
||||
f"Falcon not ready to take trigger after 1s timeout in trigger"
|
||||
f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s"
|
||||
)
|
||||
falcon.state.get() == 1 # Acquiring
|
||||
if self.parent.scaninfo.scan_type == "step":
|
||||
time.sleep(0.2)
|
||||
self.parent.smpl.put(1)
|
||||
@@ -67,7 +67,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
|
||||
time.sleep(0.2)
|
||||
# Trigger function from ophyd.Device returns a DeviceStatus. This function
|
||||
# starts a process that creates a DeviceStatus, and waits for the signal_conditions
|
||||
# self.parent.smpl_done.get to change to the value SAMPLINGDONE.DONE
|
||||
# self.parent.smpl_done.get to change to the value SAMPLING.DONE
|
||||
# Once this takes place, the DeviceStatus.done flag will be set to True.
|
||||
# When BEC calls trigger() on the devices, this method will be called assuming that
|
||||
# the devices config softwareTrigger=True is set.
|
||||
@@ -75,7 +75,7 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
|
||||
# self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
|
||||
# which ensures that the DeviceStatus object resolves before continuing, i.e. DeviceStatus.done = True
|
||||
status = self.wait_with_status(
|
||||
signal_conditions=[(self.parent.smpl_done.get, SAMPLINGDONE.DONE)],
|
||||
signal_conditions=[(self.parent.smpl_done.get, SAMPLING.DONE)],
|
||||
timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate
|
||||
check_stopped=True,
|
||||
)
|
||||
@@ -83,24 +83,23 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
|
||||
|
||||
def on_stop(self) -> None:
|
||||
"""Actions to stop the Device"""
|
||||
# Put the Device in cont mode
|
||||
# Put the Device again in continous acquisition mode
|
||||
self.parent.total_cycles.set(5)
|
||||
self.parent.start_csmpl.set(1)
|
||||
self.parent.smpl.put(1)
|
||||
time.sleep(0.5)
|
||||
self.parent.smpl.put(1)
|
||||
time.sleep(0.2)
|
||||
if self.parent.smpl_done.get() == SAMPLINGDONE.RUNNING:
|
||||
if self.parent.smpl_done.get() == SAMPLING.RUNNING:
|
||||
return
|
||||
self.parent.smpl.put(1)
|
||||
|
||||
|
||||
class PhoenixTrigger(PSIDetectorBase):
|
||||
"""
|
||||
Docstring:
|
||||
|
||||
Class for PHOENIX TTL hardware trigger (X07MB-OP2:)
|
||||
Class for PHOENIX TTL hardware trigger: 'X07MB-OP2:'
|
||||
|
||||
This device is used to trigger communicate with an ADC card that creates TTL signals to trigger cameras and detectors at Phoenix.
|
||||
"""
|
||||
|
||||
custom_prepare_cls = PhoenixTriggerSetup
|
||||
@@ -123,15 +122,13 @@ class PhoenixTrigger(PSIDetectorBase):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Test the PhoenixTrigger class
|
||||
trigger = PhoenixTrigger(name="trigger", prefix="X07MB-OP2:")
|
||||
trigger.wait_for_connection(all_signals=True)
|
||||
trigger.read()
|
||||
trigger.read_configuration()
|
||||
|
||||
trigger.stage()
|
||||
status = trigger.trigger()
|
||||
while status.done is False:
|
||||
print(f" Waiting for status, flag is {status.done}")
|
||||
time.sleep(0.2)
|
||||
|
||||
device_status = trigger.trigger()
|
||||
device_status.wait()
|
||||
trigger.unstage()
|
||||
|
||||
@@ -23,90 +23,87 @@ but they are executed in a specific order:
|
||||
"""
|
||||
|
||||
# imports in ScanBase
|
||||
#from __future__ import annotations
|
||||
# from __future__ import annotations
|
||||
|
||||
#import ast
|
||||
#import enum
|
||||
#import threading
|
||||
#import time
|
||||
#import uuid
|
||||
#from abc import ABC, abstractmethod
|
||||
#from typing import Any, Literal
|
||||
# import ast
|
||||
# import enum
|
||||
# import threading
|
||||
# import time
|
||||
# import uuid
|
||||
# from abc import ABC, abstractmethod
|
||||
# from typing import Any, Literal
|
||||
|
||||
#import numpy as np
|
||||
# import numpy as np
|
||||
|
||||
#from bec_lib.device import DeviceBase
|
||||
#from bec_lib.devicemanager import DeviceManagerBase
|
||||
#from bec_lib.endpoints import MessageEndpoints
|
||||
#from bec_lib.logger import bec_logger
|
||||
# from bec_lib.device import DeviceBase
|
||||
# from bec_lib.devicemanager import DeviceManagerBase
|
||||
# from bec_lib.endpoints import MessageEndpoints
|
||||
# from bec_lib.logger import bec_logger
|
||||
|
||||
#from .errors import LimitError, ScanAbortion
|
||||
#from .path_optimization import PathOptimizerMixin
|
||||
#from .scan_stubs import ScanStubs
|
||||
# from .errors import LimitError, ScanAbortion
|
||||
# from .path_optimization import PathOptimizerMixin
|
||||
# from .scan_stubs import ScanStubs
|
||||
# end imports in ScanBase
|
||||
|
||||
# import time
|
||||
|
||||
# import numpy as np
|
||||
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
|
||||
# from bec_lib.endpoints import MessageEndpoints
|
||||
from bec_lib.logger import bec_logger
|
||||
from bec_server.scan_server.scans import ScanArgType, ScanBase
|
||||
|
||||
from phoenix_bec.scripts.phoenix import PhoenixBL
|
||||
|
||||
# from bec_lib import messages
|
||||
# from bec_server.scan_server.errors import ScanAbortion
|
||||
# from bec_server.scan_server.scans import FlyScanBase, RequestBase, ScanArgType, ScanBase
|
||||
|
||||
# logger = bec_logger.logger
|
||||
|
||||
from bec_server.scan_server.scans import ScanBase, ScanArgType
|
||||
import numpy as np
|
||||
import time
|
||||
from bec_lib.logger import bec_logger
|
||||
from phoenix_bec.scripts.phoenix import PhoenixBL
|
||||
|
||||
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class LogTime():
|
||||
class LogTime:
|
||||
|
||||
def __init__(self):
|
||||
logger.success('init LogTime')
|
||||
self.t0=time.time()
|
||||
logger.success("init LogTime")
|
||||
self.t0 = time.time()
|
||||
|
||||
def p_s(self,x):
|
||||
now=time.time()
|
||||
#delta=now-self.t0
|
||||
m=str(now)+' sec '+x
|
||||
logger.success(m)custom_prepare_cls(parent=self, **kwargs)
|
||||
# making the instance of PSID
|
||||
#self.t0=now
|
||||
file=open('MyLogfile.txt','a')
|
||||
file.write(m+'\n')
|
||||
def p_s(self, x):
|
||||
now = time.time()
|
||||
# delta=now-self.t0
|
||||
m = str(now) + " sec " + x
|
||||
logger.success(m)
|
||||
# making the instance of PSID
|
||||
# self.t0=now
|
||||
file = open("MyLogfile.txt", "a")
|
||||
file.write(m + "\n")
|
||||
file.close
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class PhoenixScanBaseTTL(ScanBase):
|
||||
"""
|
||||
Base scan cl p_s('init scrips.phoenix.scans.PhoenixLineScan')
|
||||
"""
|
||||
|
||||
|
||||
|
||||
def scan_core(self):
|
||||
"""perform the scan core procedure"""
|
||||
self.p_s('PhoenixScanBaseTT.scan_core')
|
||||
self.p_s("PhoenixScanBaseTT.scan_core")
|
||||
for ind, pos in self._get_position():
|
||||
for self.burst_index in range(self.burst_at_each_point):
|
||||
self.p_s('PhoenixScanBaseTT.scan_core in loop ')
|
||||
self.p_s("PhoenixScanBaseTT.scan_core in loop ")
|
||||
|
||||
yield from self._at_each_point(ind, pos)
|
||||
self.burst_index = 0
|
||||
|
||||
def _at_each_point(self, ind=None, pos=None):
|
||||
self.p_s('PhoenixScanBaseTT._at_each_point')
|
||||
self.p_s("PhoenixScanBaseTT._at_each_point")
|
||||
yield from self._move_scan_motors_and_wait(pos)
|
||||
if ind > 0:
|
||||
yield from self.stubs.wait(
|
||||
@@ -123,12 +120,11 @@ class PhoenixScanBaseTTL(ScanBase):
|
||||
)
|
||||
|
||||
self.point_id += 1
|
||||
self.p_s('done')
|
||||
self.p_s("done")
|
||||
|
||||
|
||||
class PhoenixLineScan(PhoenixScanBaseTTL):
|
||||
|
||||
|
||||
|
||||
scan_name = "phoenix_line_scan"
|
||||
required_kwargs = ["steps", "relative"]
|
||||
arg_input = {
|
||||
@@ -149,25 +145,25 @@ class PhoenixLineScan(PhoenixScanBaseTTL):
|
||||
steps: int = None,
|
||||
relative: bool = False,
|
||||
burst_at_each_point: int = 1,
|
||||
setup_device:str= None,
|
||||
setup_device: str = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
A phoenix line scan for one or more motors.
|
||||
A phoenix line scan for one or more motors.
|
||||
|
||||
Args:
|
||||
*args (Device, float, float): pairs of device / start position / end position
|
||||
exp_time (float): exposure time in s. Default: 0
|
||||
steps (int): number of steps. Default: 10
|
||||
relative (bool): if True, the start and end positions are relative to the current position. Default: False
|
||||
burst_a Specifies the level of type checking analysis to perform.
|
||||
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
|
||||
Args:
|
||||
*args (Device, float, float): pairs of device / start position / end position
|
||||
exp_time (float): exposure time in s. Default: 0
|
||||
steps (int): number of steps. Default: 10
|
||||
relative (bool): if True, the start and end positions are relative to the current position. Default: False
|
||||
burst_a Specifies the level of type checking analysis to perform.
|
||||
ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, relative=True)
|
||||
|
||||
"""
|
||||
#from phoenix_bec.scripts.phoenix import PhoenixBL
|
||||
self.p_s=PhoenixBL.my_log
|
||||
# from phoenix_bec.scripts.phoenix import PhoenixBL
|
||||
self.p_s = PhoenixBL.my_log
|
||||
|
||||
self.p_s('init scripts.phoenix.scans.PhoenixLineScan')
|
||||
self.p_s("init scripts.phoenix.scans.PhoenixLineScan")
|
||||
|
||||
super().__init__(
|
||||
exp_time=exp_time, relative=relative, burst_at_each_point=burst_at_each_point, **kwargs
|
||||
@@ -176,14 +172,13 @@ ans.line_scan(dev.motor1, -5, 5, dev.motor2, -5, 5, steps=10, exp_time=0.1, rela
|
||||
self.setup_device = setup_device
|
||||
|
||||
time.sleep(1)
|
||||
self.p_s('done')
|
||||
self.p_s("done")
|
||||
|
||||
def _calculate_positions(self) -> None:
|
||||
self.p_s('PhoenixLineScan._calculate_positions')
|
||||
self.p_s("PhoenixLineScan._calculate_positions")
|
||||
axis = []
|
||||
for _, val in self.caller_args.items():
|
||||
ax_pos = np.linspace(val[0], val[1], self.steps, dtype=float)
|
||||
axis.append(ax_pos)
|
||||
self.positions = np.array(list(zip(*axis)), dtype=float)
|
||||
self.p_s('done')
|
||||
|
||||
self.p_s("done")
|
||||
|
||||
97
tests/tests_devices/test_phoenix_trigger.py
Normal file
97
tests/tests_devices/test_phoenix_trigger.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import threading
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
import numpy as np
|
||||
import ophyd
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
from ophyd_devices.interfaces.base_classes.psi_detector_base import DeviceTimeoutError
|
||||
from ophyd_devices.tests.utils import MockPV, patch_dual_pvs
|
||||
|
||||
from phoenix_bec.devices.phoenix_trigger import SAMPLING, PhoenixTrigger
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def mock_trigger():
|
||||
name = "phoenix_trigger"
|
||||
prefix = "X07MB-OP2:"
|
||||
dm = DMMock()
|
||||
with mock.patch.object(dm, "connector"):
|
||||
with (
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.FileWriter"
|
||||
) as filemixin,
|
||||
mock.patch(
|
||||
"ophyd_devices.interfaces.base_classes.psi_detector_base.PSIDetectorBase._update_service_config"
|
||||
) as mock_service_config,
|
||||
):
|
||||
with mock.patch.object(ophyd, "cl") as mock_cl:
|
||||
mock_cl.get_pv = MockPV
|
||||
mock_cl.thread_class = threading.Thread
|
||||
with mock.patch.object(PhoenixTrigger, "_init"):
|
||||
det = PhoenixTrigger(name=name, prefix=prefix, device_manager=dm)
|
||||
patch_dual_pvs(det)
|
||||
det.TIMEOUT_FOR_SIGNALS = 0.1
|
||||
yield det
|
||||
|
||||
|
||||
def test_phoenix_trigger_init(mock_trigger):
|
||||
"""Test PhoenixTrigger init"""
|
||||
assert mock_trigger.name == "phoenix_trigger"
|
||||
assert mock_trigger.prefix == "X07MB-OP2:"
|
||||
|
||||
|
||||
def test_phoenix_trigger_stage(mock_trigger):
|
||||
"""Test PhoenixTrigger on_stage"""
|
||||
with mock.patch.object(mock_trigger.scaninfo, "load_scan_metadata") as mock_load_scan_metadata:
|
||||
mock_trigger.scaninfo.scan_type = "step"
|
||||
mock_trigger.scaninfo.exp_time = exp_time = 1
|
||||
mock_trigger.stage()
|
||||
assert mock_load_scan_metadata.call_count == 1
|
||||
assert mock_trigger.start_csmpl.get() == 0
|
||||
assert mock_trigger.total_cycles.get() == np.ceil(exp_time * 5)
|
||||
assert mock_trigger.smpl.get() == 1
|
||||
|
||||
|
||||
def test_phoenix_trigger_unstage(mock_trigger):
|
||||
"""Test PhoenixTrigger on_unstage"""
|
||||
with mock.patch.object(mock_trigger.custom_prepare, "on_stop") as mock_on_stop:
|
||||
mock_trigger.unstage()
|
||||
assert mock_on_stop.call_count == 1
|
||||
|
||||
|
||||
def test_phoenix_trigger_stop(mock_trigger):
|
||||
"""Test PhoenixTrigger on_stop"""
|
||||
with mock.patch.object(mock_trigger.smpl, "put") as mock_smpl_put:
|
||||
mock_trigger.smpl_done._read_pv.mock_data = SAMPLING.RUNNING
|
||||
mock_trigger.stop()
|
||||
assert mock_trigger.stopped is True
|
||||
assert mock_trigger.total_cycles.get() == 5
|
||||
assert mock_trigger.start_csmpl.get() == 1
|
||||
assert mock_smpl_put.call_args_list == [mock.call(1), mock.call(1)]
|
||||
|
||||
|
||||
def test_phoenix_trigger_trigger(mock_trigger):
|
||||
"""Test PhoenixTrigger on_trigger
|
||||
|
||||
First test that the trigger timeouts due to readback from smpl_done not being done.
|
||||
Afterwards, check that status object resolved correctly if smpl_done is done.
|
||||
"""
|
||||
exp_time = 0.05
|
||||
mock_trigger.device_manager.add_device("falcon_nohdf5")
|
||||
falcon_state = mock_trigger.device_manager.devices.falcon_nohdf5.state = mock.MagicMock()
|
||||
falcon_state.get = mock.MagicMock(return_value=1)
|
||||
mock_trigger.scaninfo.scan_type = "step"
|
||||
mock_trigger.scaninfo.exp_time = exp_time
|
||||
|
||||
with mock.patch.object(
|
||||
mock_trigger.custom_prepare, "wait_with_status", return_value=mock.MagicMock()
|
||||
) as mock_wait_with_status:
|
||||
status = mock_trigger.trigger()
|
||||
assert mock_wait_with_status.call_count == 1
|
||||
assert mock_wait_with_status.call_args[1]["signal_conditions"] == [
|
||||
(mock_trigger.smpl_done.get, SAMPLING.DONE)
|
||||
]
|
||||
assert mock_wait_with_status.call_args[1]["timeout"] == 5 * exp_time
|
||||
assert mock_wait_with_status.call_args[1]["check_stopped"] is True
|
||||
Reference in New Issue
Block a user