add try to put full scan logig into one single device

This commit is contained in:
gac-x07mb
2024-12-21 15:25:42 +01:00
parent 03789c34f4
commit 77403aea1e
6 changed files with 508 additions and 32 deletions

View File

@ -10,11 +10,24 @@ PH_TTL:
- phoenix_devices.yaml - phoenix_devices.yaml
- class PhoenixTrigger - class PhoenixTrigger
onFailure: buffer onFailure: buffer
enabled: true enabled: false
readoutPriority: monitored readoutPriority: monitored
softwareTrigger: true softwareTrigger: true
PH_SCAN_1:
description: PHOENIX Scan_1 (Experimental)
deviceClass: phoenix_bec.devices.phoenix_scan_1.PhoenixScan_1
deviceConfig:
prefix: 'X07MB-OP2:'
deviceTags:
- phoenix
- TTL Trigger
- phoenix_devices.yaml
- class PhoenixTrigger
onFailure: buffer
enabled: false
readoutPriority: monitored
softwareTrigger: true
################################################### ###################################################
# #
# phoenix standard devices (motors) # phoenix standard devices (motors)

View File

@ -0,0 +1,433 @@
"""
This module serve to test whether we can abuse the device concept to
Define the scan logic in a device which collects EPICS Channels whic operate the
dat aaquisition from several devices, such as
such as trigger channel, xmap falcon, etc.
Aim is to define the data aquisition logic in the on_trigger() routine in this file,
but leave data reading in the device definitions.
Hops is to create an easy to maintain/ edit setup.
The module is bases on the PhoenixTrigger class to connect to the ADC card
that creates TTL signals to trigger cameras and detectors at Phoenix.
TO Do
-- allow for variable dwell times
-- add erase/Start for XMAP and FALCON
-- check values for time.sleep()
-- check in on_triggerthe status check for Falcon
-- rework togther with Xiaoquiang the functionality of involved EPICS channels
(Callbacks etc.) to minimize the need of sleeping times.
"""
import enum
import time
import numpy as np
from bec_lib import bec_logger
from ophyd import Component as Cpt
from ophyd import DeviceStatus, EpicsSignal, EpicsSignalRO, Kind, Device
from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin,
PSIDetectorBase,
)
from ophyd_devices.devices.dxp import xMAP
#, EpicsMCARecord
#from phoenix_bec.scripts.phoenix import PhoenixBL
logger = bec_logger.logger
DETECTOR_TIMEOUT = 5
"""
#class ph_Logger():
# PHOENIX Logger to my Logfile #
# p_s = PhoenixBL.my_log
"""
class PhoenixTriggerError(Exception):
"""PhoenixTrigger specific error"""
class SAMPLING(int, enum.Enum):
"""Sampling Done PV
This class serves redabilty of missinx class and ensure correct setting of
certain conditions.
defiend preset values for certain states to be called in the
mixing class, where we for example check whether the sampling is done of not
by comparing with SAMPLING.DONE
xxx==SAMPLING.DONE
"""
RUNNING = 0
DONE = 1
class PhoenixScan_1Setup(CustomDetectorMixin):
"""
Mixin Class to setup the PhoenixTrigger device
"""
def on_stage(self) -> None:
"""
On stage actions which are executed upon staging the device
"""
if self.parent.scaninfo.scan_type == "step": # check whether we use step scanning
###############
# next lines ensure that TTL trigger is on single sampling mode (posible )
##############
self.parent.start_csmpl.set(0)
time.sleep(0.1)
self.parent.total_cycles.set(1)
self.parent.smpl.put(1)
time.sleep(0.5)
#####
# set sampling to dwell time of scan
######
self.parent.total_cycles.set(np.ceil(self.parent.scaninfo.exp_time * 5))
logger.info(f"Device {self.parent.name} was staged for step scan")
def on_unstage(self) -> None:
"""On unstage actions which are executed upon unstaging the device"""
self.on_stop()
def on_trigger(self) -> DeviceStatus:
"""On trigger actions which are executed upon triggering the device"""
# TODO Test the proper check for the falcon state
# Check first that falcon is set to acquiring
# check for falcon
print('on_trigger')
falcon = self.parent.device_manager.devices.get("falcon", None) # get device
timeout = 20
print('ss')
print('Check for falcon')
if falcon is not None:
# TODO Check that falcon.aquiring.get() == 1 is the correct check.
# --> When is the falcon acquiring, this assumes 1?
# self.wait_for_signals is defined in PSI_detector_base.CustomDetectorMixin
# ALSO check for enabled or not here!!!
#
if not self.wait_for_signals([(falcon.acquiring.get, 1)], timeout=timeout):
raise PhoenixTriggerError(
f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s"
)
#endif
else:
print('Falcon is None')
#endif
print(self.parent.simulate_step_logic)
if ( self.parent.scaninfo.scan_type == "step") or self.parent.simulate_step_logic==True:
print('stepscan')
time.sleep(0.2)
self.parent.smpl.put(1)
self.parent.xmap_erase_start.put(1)
# Minimum of 1 cycle has to be waited. Cycle == 0.2s
time.sleep(0.2)
# Trigger function from ophyd.Device returns a DeviceStatus. This function
# starts a process that creates a DeviceStatus, and waits for the signal_conditions
# self.parent.smpl_done.get to change to the value SAMPLING.DONE
# Once this takes place, the DeviceStatus.done flag will be set to True.
# When BEC calls trigger() on the devices, this method will be called assuming that
# the devices config softwareTrigger=True is set.
# In ScanBase, the _at_each_point function calls
# self.stubs.wait(wait_type="trigger", group="trigger", wait_time=self.exp_time)
# which ensures that the DeviceStatus object resolves before continuing,
# i.e. DeviceStatus.done = True
#calling this does not seem to wait, but seem sto call on_stop
#status = self.wait_with_status(
# signal_conditions=[(self.parent.smpl_done.get, SAMPLING.DONE)],
# timeout=5 * self.parent.scaninfo.exp_time, # Check if timeout is appropriate
# check_stopped=True,
#)
print('dd',self.parent.smpl_done.get())
# explanation of last line (self.parent.smpl_done.get, SAMPLINGDONE.DONE)
# creates a tuple which defines a
# condition, which is tested in self.wait_with_status, here it tests for :
# (self.parent.smpl_done.get() == SAMPLINGDONE.DONE),
# where SAMPLINGDONE.DONE =1, as set in code above
# As this is in mixing class (PhoenixtriggerSetup), parent.sample_done is defined in
# main class as smpl_done = Cpt(EpicsSignalRO, "SMPL-DONE", kind=Kind.config)
# the wait for status did not work so explicit, make own routine later
#ii=0
#while (self.parent.smpl_done.get() == 0 and ii< 10000):
# time.sleep(0.05)
# ii=ii+1
# print(ii,self.parent.smpl_done.get())
##endwhile
self.parent.xmap_stop_all.put(1)
print('leave on trigger')
time.sleep(4)
return
#return status # should this be in if clause level or outside?
# endif
def on_stop(self) -> None:
"""
Actions to stop the Device
Here the device is switched back to continuous sampling.
"""
print('phoenix_scan_1 on_stop')
self.parent.total_cycles.set(5)
self.parent.start_csmpl.set(1)
time.sleep(0.5)
self.parent.smpl.put(1)
time.sleep(0.5)
self.parent.smpl.put(1)
time.sleep(0.2)
if self.parent.smpl_done.get() == SAMPLING.RUNNING:
return
self.parent.smpl.put(1)
class PhoenixScan_1(PSIDetectorBase):
"""
Class for PHOENIX TTL hardware trigger: 'X07MB-OP2:'
This device is used to trigger communicate with an ADC card
that creates TTL signals to trigger cameras and detectors at Phoenix.
"""
##################################################################
#
# The Variable USER_ACCESS contains an ascii list of functions which will be
# visible in dev.TTL. Here, this list is empty.
# note that components are alway public
#
##################################################################
USER_ACCESS = ["describe", "help"]
####################################################################
#
# # specify Setup class into variable custom_prepare_cls
#
####################################################################
custom_prepare_cls = PhoenixScan_1Setup
###################################################################
# in __init__ of PSIDetectorBase will the initialzed by
# self.custom_prepare = self.custom_prepare_cls(parent=self, **kwargs)
# making the instance of PSIDetectorBase availble to functions
# in PhoenixTriggerSetup.
#
# This inherits, among other things, the input parameters, such as
# the notable prefix, which is here 'X07MB-OP2:'
#
# The input of Component=Cpt is Cpt(deviceClass,suffix)
# if Cpt is used in a class, which has interited Device, here via:
#
# (Here PhoenixTrigger <-- PSIDetectorBase <- Device
#
# intr_count = Cpt(
# EpicsSignal, "INTR-COUN
# then Cpt will construct - magically- the
# Epics channel name = prefix+suffix,
#
# for example
# 'X07MB-OP2:' + 'START-CSMPL' -> 'X07MB-OP2:' + 'START-CSMPL'
#
# to construct names, for now we keep the convention to derive
# them from the EPICS NAMES (such as ) X07MB-OP2:SMPL-DONE --> smpl_done
#
# this mean access to channel using dev.PH_TTL.smpl_done.get()
#
#
###################################################################
simulate_step_logic=True # set to true for debugging should be false in operation
# SIGNALS FOR TRIGGER
start_csmpl = Cpt(
EpicsSignal, "START-CSMPL", kind=Kind.config, put_complete=True
) # cont on / off
# intr_count = Cpt(
# EpicsSignal, "INTR-COUNT", kind=Kind.config, put_complete=True
# ) # conter run up
intr_count = Cpt(EpicsSignal, "INTR-COUNT") # counter run up NEEDS TO BE READ OUT !!!!
total_cycles = Cpt(
EpicsSignal, "TOTAL-CYCLES", kind=Kind.config, put_complete=True
) # cycles set
smpl = Cpt(
EpicsSignal, "SMPL", kind=Kind.config, put_complete=True
) # start sampling --> aquire
csmpl = Cpt(
EpicsSignal, "START-CSMPL", kind=Kind.config, put_complete=True
) # start sampling --> aquire
smpl_done = Cpt(
EpicsSignalRO, "SMPL-DONE", kind=Kind.config
) # show trigger is done, consider using string=True
# create subset in name spacs
ph_start_csmpl = start_csmpl
ph_intr_count = intr_count
ph_total_cycles = total_cycles
ph_smpl = smpl
ph_csmpl = csmpl
ph_smpl_done = smpl_done
##################################################################
# channels for xmap
##################################################################
class xmap_local(Device):
"""
subclass to PhoenixScan_1
local class for xmap controll
consider importing general xmap setup ??
less channels might be less risk free and faster!
cannot import on higher level as it takes wron prefix,
possibly import here and set prefic in def __init__
for now we keep it simple
"""
erase_start = Cpt(EpicsSignal,'EraseStart',kind=Kind.config)
start_all = Cpt(EpicsSignal,'StartAll',kind=Kind.config)
stop_all = Cpt(EpicsSignal,'StopAll',kind=Kind.config)
erase_all = Cpt(EpicsSignal,'EraseAll',kind=Kind.config)
acquiring = Cpt(EpicsSignal,'Acquiring',kind=Kind.config)
#endclass
xmap_control=xmap_local(name='ddd',prefix='X07MB-XMAP:')
xmap_erase_start = xmap_control.erase_start
xmap_start_all = xmap_control.start_all
xmap_stop_all = xmap_control.stop_all
xmap_erase_all = xmap_control.erase_all
xmap_acquiring = xmap_control.acquiring
ph_xmap_erase_start = xmap_erase_start
ph_xmap_start_all = xmap_start_all
ph_xmap_stop_all = xmap_stop_all
ph_xmap_erase_all = xmap_erase_all
ph_xmap_aquiring = xmap_acquiring
def help(self):
"""
Help function for phoenix_trigger
"""
help_text = """
PHOENIXScan_1
HELP NT CORRECT NEEDSA to be updated
signal attributes vs Epics Channels
description device Epics Channel
attribute
Cont sampling on/off .start_csmpl ~ X07MB-OP2:SMPL
1: continous
0: continous off
Counter run up .intr_count ~ " :INTR-COUNT
Cycles set .total_cycles ~ " :TOTAL-CYCLES
1 cycle = 200 ms
Start sampling (aquiring) .smpl ~ " :SMPL
Show trigger is done .smpl_done ~ " :SMPL-DONE
1: aquiring
0: not aquiring
Assuming device is called dev.PH_TTL
Read Channel dev.PH_TTL.start_csmpl.get()
Write into Channel dev.PH_TTL.total_cycles.put(1)
dev.PH_TTL.total_cycles.set(1)
For further general help try the attributes
.describe() considered as stable dict
.describe_confguration()
.summary()
._config ._info
Important channel are collected with prefix ph, to ease finding of the channels
for example
dev.PH_TTl.ph_total_cycles = dev.PH_TTl.total_cycles
.. etc
"""
print("Name of device is", self.name)
print("")
print(help_text)
# end def help
if __name__ == "__main__":
# Test the PhoenixTrigger class
trigger = PhoenixScan_1(name="Scan_1", prefix="X07MB-OP2:")
trigger.wait_for_connection(all_signals=True)
trigger.read()
trigger.read_configuration()
trigger.stage()
device_status = trigger.trigger()
device_status.wait()
trigger.unstage()

View File

@ -24,12 +24,18 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
CustomDetectorMixin, CustomDetectorMixin,
PSIDetectorBase, PSIDetectorBase,
) )
#from phoenix_bec.scripts.phoenix import PhoenixBL
logger = bec_logger.logger logger = bec_logger.logger
DETECTOR_TIMEOUT = 5 DETECTOR_TIMEOUT = 5
"""
#class ph_Logger():
# PHOENIX Logger to my Logfile #
# p_s = PhoenixBL.my_log
"""
class PhoenixTriggerError(Exception): class PhoenixTriggerError(Exception):
"""PhoenixTrigger specific error""" """PhoenixTrigger specific error"""
@ -93,9 +99,11 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
# Check first that falcon is set to acquiring # Check first that falcon is set to acquiring
# check for falcon # check for falcon
print('on_trigger')
falcon = self.parent.device_manager.devices.get("falcon", None) # get device falcon = self.parent.device_manager.devices.get("falcon", None) # get device
timeout = 2 timeout = 20
print('ss')
if falcon is not None: if falcon is not None:
# TODO Check that falcon.aquiring.get() == 1 is the correct check. # TODO Check that falcon.aquiring.get() == 1 is the correct check.
# --> When is the falcon acquiring, this assumes 1? # --> When is the falcon acquiring, this assumes 1?
@ -103,31 +111,34 @@ class PhoenixTriggerSetup(CustomDetectorMixin):
# ALSO check for enabled or not here!!! # ALSO check for enabled or not here!!!
# #
if not self.wait_for_signals([(falcon.aquiring.get, 1)], timeout=timeout): if not self.wait_for_signals([(falcon.acquiring.get, 1)], timeout=timeout):
raise PhoenixTriggerError( raise PhoenixTriggerError(
f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s" f"Device {self.parent.name} is not ready to take trigger, timeout due to waiting for Falcon to get ready. Timeout after {timeout}s"
) )
#endif
#endif
xmap = self.parent.device_manager.devices.get("xmap", None) # get device xmap = self.parent.device_manager.devices.get("xmap", None) # get device
timeout = 2
timeout = 20 # 21.17 change to 20 sec
print('ss2')
if xmap is not None: if xmap is not None:
print('xmap erase start')
# TODO Check that falcon.aquiring.get() == 1 is the correct check. xmap.erase_start.put(1) # .. should be in phoenix_xmap?
# --> When is the falcon acquiring, this assumes 1?
# self.wait_for_signals is defined in PSI_detector_base.CustomDetectorMixin
#
print('xmap erase start') print('xmap erase start')
xmap.erase_start.set(1) # .. should be in phoenix_xmap? ww=xmap.acquiring.get()
print(ww)
print('xmap erase start')
print(xmap.acquiring.get())
time.sleep(0.1) time.sleep(0.1)
print(xmap.acquiring.get()) w2=xmap.acquiring.get()
print(w2)
if not self.wait_for_signals([(xmap.acquiring.get, 1)], timeout=timeout): if not self.wait_for_signals([(xmap.acquiring.get, 1)], timeout=timeout):
raise PhoenixTriggerError( raise PhoenixTriggerError(

View File

@ -5,3 +5,5 @@ from ophyd import Component as Cpt
import phoenix_bec.devices.phoenix_trigger as pt import phoenix_bec.devices.phoenix_trigger as pt
trig = pt.PhoenixTrigger(name="phoenixTrigger", prefix="X07MB-OP2:") trig = pt.PhoenixTrigger(name="phoenixTrigger", prefix="X07MB-OP2:")
trig.describe()
trig.custom_prepare.on_trigger

View File

@ -0,0 +1,19 @@
import time
from phoenix_bec.scripts.phoenix import PhoenixBL
from ophyd import Component as Cpt
import phoenix_bec.devices.phoenix_scan_1 as sc_1
ss = sc_1.PhoenixScan_1(name="phoenixTrigger", prefix="X07MB-OP2:")
print('s')
ss.describe()
print('o')
ss.ph_xmap_stop_all.put(1)
ss.csmpl.set(0)
time.sleep(1.4)
ss.custom_prepare.on_trigger()

View File

@ -25,39 +25,37 @@ import time as tt
import os import os
import sys import sys
# logger = bec_logger.logger logger = bec_logger.logger
# load simulation # load simulation
# bec.config.load_demo_config() # bec.config.load_demo_config()
# bec.config.update_session_with_file("config/config_1.yaml") # bec.config.update_session_with_file("config/config_1.yaml")
# create PHOENIX base configuration # create PHOENIX base configuration
load_data=True
phoenix.create_base_config() if load_data == True:
phoenix.add_xmap() phoenix.create_base_config()
phoenix.add_xmap()
#endif
bec.queue.request_queue_reset()
dev.MA1_ScanX.enabled = True dev.MA1_ScanX.enabled = True
dev.xmap.enabled=True dev.xmap.enabled=True
dev.xmap.unstage() # needed in case scan went wrong
print("---------------------------------") print("---------------------------------")
# scan will not diode # scan will not diode
print(" SCAN DO NOT READ DIODE ") print(" SCAN DO NOT READ DIODE ")
dev.SAI_01_MEAN.readout_priority = "monitored" # do not read detector dev.SAI_01_MEAN.readout_priority = "monitored" # do not read detector
ti = tt.time_ns() ti = tt.time_ns()
phoenix.run_shell("sh monitor.sh > monitor.out & ")
dev.PP2_VO5.enabled = True
dev.MA1_ScanX.enabled = True
#dev.xmap.enabled = True
"""
s1 = scans.line_scan( s1 = scans.line_scan(
dev.MA1_ScanX, 0, 0.1, dev.PP2_VO5, 0, 5, steps=4, exp_time=1, relative=False, delay=2 dev.PP2_VO5, 0, 1, steps=4, exp_time=1, relative=False, delay=2)
)
"""
s1 = scans.line_scan(dev.MA1_ScanX, 0, 0.1, steps=4, exp_time=1, relative=False, delay=2)
#s1 = scans.line_scan(dev.MA1_ScanX, 0, 0.1, steps=4, exp_time=1, relative=False, delay=2)
tf = tt.time_ns() tf = tt.time_ns()
dev.PH_TTL.start_csmpl.put(0) dev.PH_TTL.start_csmpl.put(0)