add ph_ names to xmap to ease finding of epics channels

This commit is contained in:
gac-x07mb
2024-12-19 18:41:55 +01:00
parent c021972d97
commit cd81030430
10 changed files with 256 additions and 53 deletions

View File

@ -133,6 +133,23 @@ MA1_TRX1:
#
#
PP2_VO5:
readoutPriority: baseline
description: Chamber light
deviceClass: ophyd.EpicsSignal
deviceConfig:
auto_monitor: true
read_pv: 'X07MB-ES1-PP2:VO5'
deviceTags:
- PHOENIX
- phoenix_devices.yaml
- class EpicsSignal
onFailure: buffer
enabled: true
readOnly: false
softwareTrigger: false
SAI_01_MEAN:
readoutPriority: baseline
description: DIODE SAI01

View File

@ -17,18 +17,20 @@ from ophyd_devices.interfaces.base_classes.psi_detector_base import (
PSIDetectorBase,
)
from ophyd.mca import EpicsDXP
#from ophyd_devices.devices.dxp import xMAP, EpicsMCARecord
from phoenix_bec.devices.dxp_loc import xMAP, EpicsMCARecord
from ophyd_devices.devices.dxp import xMAP, EpicsMCARecord
#from phoenix_bec.devices.dxp_loc import xMAP, EpicsMCARecord
from ophyd_devices.devices.areadetector.plugins import HDF5Plugin_V35 as HDF5Plugin
logger = bec_logger.logger
#bec_logger.level = bec_logger.LOGLEVEL.TRACE
# bec_logger.level = bec_logger.LOGLEVEL.TRACE
bec_logger.level = bec_logger.LOGLEVEL.INFO
class XMAPError(Exception):
"""Base class for exceptions in this module."""
@ -86,8 +88,9 @@ class XMAPSetup(CustomDetectorMixin):
- value_pixel_per_buffer (int): number of spectra in buffer of XMAP
"""
self.parent.value_pixel_per_buffer = 20
self.update_readout_time()
# self.parent.value_pixel_per_buffer = 20
# self.update_readout_time()
pass
def update_readout_time(self) -> None:
"""Set readout time for Eiger9M detector"""
@ -100,6 +103,7 @@ class XMAPSetup(CustomDetectorMixin):
def initialize_detector(self) -> None:
"""Initialize XMAP detector"""
"""
self.stop_detector()
self.stop_detector_backend()
self.set_trigger(
@ -113,28 +117,46 @@ class XMAPSetup(CustomDetectorMixin):
self.parent.auto_pixels_per_buffer.put(0)
# Sets the number of pixels/spectra in the buffer
self.parent.pixels_per_buffer.put(self.parent.value_pixel_per_buffer)
"""
pass
def initialize_detector_backend(self) -> None:
"""Initialize the detector backend for XMAP."""
w=0
#----------------------------------------------------------------------
#self.parent.hdf5.enable.put(1)
"""
Initialize the detector backend for XMAP.
currently no function
"""
pass
# ----------------------------------------------------------------------
# self.parent.hdf5.enable.put(1)
# file location of h5 layout for cSAXS
#self.parent.hdf5.xml_file_name.put("layout.xml")
# self.parent.hdf5.xml_file_name.put("layout.xml")
# TODO Check if lazy open is needed and wanted!
#self.parent.hdf5.lazy_open.put(1)
#self.parent.hdf5.temp_suffix.put("")
# size of queue for number of spectra allowed in the buffer, if too small at high throughput, data is lost
#self.parent.hdf5.queue_size.put(2000)
# self.parent.hdf5.lazy_open.put(1)
# self.parent.hdf5.temp_suffix.put("")
# size of queue for number of spectra a time.sleep(0.05)llowed in the buffer, if too small at high throughput, data is lost
# self.parent.hdf5.queue_size.put(2000)
# Segmentation into Spectra within EPICS, 1 is activate, 0 is deactivate
#self.parent.nd_array_mode.put(1)
# self.parent.nd_array_mode.put(1)
def on_stage(self) -> None:
"""Prepare detector and backend for acquisition"""
self.prepare_detector()
self.prepare_data_backend()
self.publish_file_location(done=False, successful=False)
self.arm_acquisition()
# staging for XMAP as used in FDA
self.parent.stop_all.set(1)
time.sleep(0.05)
self.parent.collect_mode.set(0)
time.sleep(0.05)
self.parent.apply(0)
time.sleep(0.05)
self.parent.preset_real.set(0)
time.sleep(0.05)
# self.prepare_detector()
# self.prepare_data_backend()
# self.publish_file_location(done=False, successful=False)
# self.arm_acquisition() time.sleep(0.05)
def prepare_detector(self) -> None:
"""Prepare detector for acquisition"""
@ -148,7 +170,7 @@ class XMAPSetup(CustomDetectorMixin):
def prepare_data_backend(self) -> None:
"""Prepare data backend for acquisition"""
w=9
pass
""" --------------------------------------------------------------
self.parent.filepath.set(
self.parent.filewriter.compile_full_filename(f"{self.parent.name}.h5")
@ -192,27 +214,29 @@ class XMAPSetup(CustomDetectorMixin):
def on_complete(self) -> None:
"""Complete detector and backend"""
#------------------------------------------------------------------
#self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
#self.publish_file_location(done=True, successful=True)
w=9
# ------------------------------------------------------------------
# self.finished(timeout=self.parent.TIMEOUT_FOR_SIGNALS)
# self.publish_file_location(done=True, successful=True)
w = 9
def on_stop(self) -> None:
"""Stop detector and backend"""
self.stop_detector()
#self.stop_detector_backend()
# self.stop_detector_backend()
def stop_detector(self) -> None:
"""Stops detector"""
self.parent.stop_all.put(1)
self.parent.erase_all.put(1)
#-------------------------------------------------------------------
#signal_conditions = [
# -------------------------------------------------------------------
# signal_conditions = [
# (lambda: self.parent.acquiring.read()[self.parent.acquiring.name]["value"], DetectorState.DONE)
#]stage2 = StageXY(prefix='X07MB',name='-ES-MA1', name='stage2')
# ]stage2 = StageXY(prefix='X07MB',name='-ES-MA1', name='stage2')
# timeout=self.parent.TIMEOUT_FOR_SIGNALS - self.parent.TIMEOUT_FOR_SIGNALS // 2,
# all_signals=False,
#):
# ):
# # Retry stop detector and wait for remaining time
# raise XMAPTimeoutError(
# f"Failed to stop detector, timeout with state {signal_conditions[0][0]}"
@ -220,8 +244,8 @@ class XMAPSetup(CustomDetectorMixin):
def stop_detector_backend(self) -> None:
"""Stop the detector backend"""
#self.parent.hdf5.capture.put(0)
w=0
# self.parent.hdf5.capture.put(0)
w = 0
def finished(self, timeout: int = 5) -> None:
"""Check if scan finished succesfully"""
@ -280,7 +304,7 @@ class XMAPPhoenix(PSIDetectorBase, xMAP):
"""
# Specify which functions are revealed to the user in BEC client
USER_ACCESS = ["describe"]
USER_ACCESS = ["describe", "ep"]
# specify Setup class
custom_prepare_cls = XMAPSetup
@ -289,13 +313,26 @@ class XMAPPhoenix(PSIDetectorBase, xMAP):
TIMEOUT_FOR_SIGNALS = 5
dxp1 = Cpt(EpicsDXP, "dxp1:")
#dxp2 = Cpt(EpicsDXP, "dxp2:")
#dxp3 = Cpt(EpicsDXP, "dxp3:")
#dxp4 = Cpt(EpicsDXP, "dxp4:")
# dxp2 = Cpt(EpicsDXP, "dxp2:")
# dxp3 = Cpt(EpicsDXP, "dxp3:")
# dxp4 = Cpt(EpicsDXP, "dxp4:")
mca1 = Cpt(EpicsMCARecord, "mca1")
#mca2 = Cpt(EpicsMCARecord, "mca2")
#mca3 = Cpt(EpicsMCARecord, "mca3")
#mca4 = Cpt(EpicsMCARecord, "mca4")
# mca2 = Cpt(EpicsMCARecord, "mca2")
# mca3 = Cpt(EpicsMCARecord, "mca3")
# mca4 = Cpt(EpicsMCARecord, "mca4")
hdf5 = Cpt(HDF5Plugin, "HDF1:")
# create some easy to find names for frequently used channels
ph_erase_start = xMAP.erase_start
ph_start_all = xMAP.start_all
ph_stop_all = xMAP.stop_all
ph_erase_all = xMAP.erase_all
ph_collect_mode = xMAP.collect_mode
ph_preset_mode = xMAP.preset_mode
ph_preset_real = xMAP.preset_real_time
ph_preset_live = xMAP.preset_live_time
ph_elapsed_real = xMAP.elapsed_real
ph_elapsed_live = xMAP.elapsed_live

View File

@ -0,0 +1,49 @@
# against all rues, make sure ff and falcon are really
# creates newly
ff = 0
falcon = 0
from ophyd import Component as Cpt
import phoenix_bec.devices.phoenix_xmap as ff
xmap = ff.XMAPPhoenix(name="xmap", prefix="X07MB-XMAP:")
# xmap = ff.FalconPhoenix(name="falcon_hdf5", prefix="X07MB-XMAP:")
# make a 'get to read all epics channels
# there will be an error message, if device contains a channel whcih does not exist
w = xmap.read()
# phoenix_bec / local_scripts / Code_to_test_devices / test_falcon.py
# for attr in falcon.hdf5.component_names:
# if not attr.startswith("_"):
# print(attr)
# signal = getattr(falcon, attr)
# signal.get()
"""
print(this_scan.scan.data.keys())
for outer_key in this_scan.scan.data.keys():
print("outer_key", outer_key)
n_outer = len(this_scan.scan.data.keys())
for inner_key in this_scan.scan.data[outer_key].keys():
print("inner_key", inner_key)
# calculate nunber of points
n_inner = len(this_scan.scan.data[outer_key][inner_key].keys())
value = np.zeros(n_inner)
timestamp = np.zeros(n_inner)
for i in range(n_inner):
try:
value[i] = this_scan.scan.data[outer_key][inner_key][i]["value"]
except:
value = None
try:
timestamp[i] = this_scan.scan.data[outer_key][inner_key][i]["timestamp"]
except:
timestamp[i] = None
# endfor
self.add(inner_key + "_" + outer_key + "_val", value)
self.add(innerprint("test")
"""

View File

@ -0,0 +1,26 @@
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
from ophyd import Component as Cpt
# option I via direct acces to classes
def print_dic(clname, cl):
print("")
print("-------- ", clname)
for ii in cl.__dict__:
if "_" not in ii:
try:
print(ii, " ---- ", cl.__getattribute__(ii))
except:
print(ii)
ScanX = EpicsMotor(name="ScanX", prefix="X07MB-ES-MA1:ScanX")
ScanX_RBV = EpicsMotor(name="ScanX", prefix="X07MB-ES-MA1:ScanX.RBV")
ScanY = EpicsMotor(name="ScanY", prefix="X07MB-ES-MA1:ScanY")
DIODE = EpicsSignal(name="SI", read_pv="X07MB-OP2-SAI_07:MEAN")
SMPL = EpicsSignal(name="SMPL", read_pv="X07MB-OP2:SMPL")
CYCLES = EpicsSignal(
name="SMPL", read_pv="X07MB-OP2:TOTAL-CYCLES", write_pv="X07MB-OP2:TOTAL-CYCLES"
)

View File

@ -18,6 +18,7 @@ from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
# from ophyd.pv_positioner import PVPositionerComparator
# from ophyd.status import DeviceStatus, SubscriptionStatus
import time as tt
# import ophyd
@ -39,23 +40,29 @@ print("---------------------------------")
# scan will not diode
print(" SCAN DO NOT READ DIODE ")
dev.SAI_01_MEAN.readout_priority = "baseline" # do not read detector
dev.SAI_01_MEAN.readout_priority = "monitored" # do not read detector
ti = tt.time_ns()
phoenix.run_shell("sh monitor.sh > monitor.out & ")
s1 = scans.line_scan(dev.MA1_ScanX, 0, 0.002, steps=4, exp_time=1, relative=False, delay=2)
tf = tt.time_ns()
dev.PP2_VO5.enabled = True
dev.MA1_ScanX.enabled = True
"""
s1 = scans.line_scan(
dev.MA1_ScanX, 0, 0.1, dev.PP2_VO5, 0, 5, steps=4, exp_time=1, relative=False, delay=2
)
"""
s1 = scans.line_scan(dev.MA1_ScanX, 0, 0.1, steps=4, exp_time=1, relative=False, delay=2)
tf = tt.time_ns()
dev.PH_TTL.start_csmpl.put(0)
print("elapsed time", (tf - ti) / 1e9)
# scan will read diode
print(" SCAN READ DIODE ")
tt.sleep(2)
dev.SAI_01_MEAN.readout_priority = "monitored" # read detector
s2 = scans.line_scan(dev.MA1_ScanX, 0, 0.002, steps=11, exp_time=0.01, relative=False, delay=2)
dev.MA1_ScanX.enabled = False
# next lines do not work as pandas is not installed on test system
@ -64,7 +71,3 @@ re1 = res1.to_numpy()
print("Scana")
print(res1)
print("")
print("Scan2 at pandas ")
print(res2)
print("Scan2 as numpy ")
print(res2)

View File

@ -0,0 +1,72 @@
"""
test script for linescans
"""
# from unittest import mock
import numpy as np
# import pandas
# import pytest
# from bec_lib import messages
# import device_server
# from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
# from ophyd import FormattedComponent as FCpt
# from ophyd import Kind, PVPositioner, Signal
# from ophyd.flyers import FlyerInterface
# from ophyd.pv_positioner import PVPositionerComparator
# from ophyd.status import DeviceStatus, SubscriptionStatus
import time as tt
# import ophyd
import os
import sys
# logger = bec_logger.logger
# load simulation
# bec.config.load_demo_config()
# bec.config.update_session_with_file("config/config_1.yaml")
# create PHOENIX base configuration
phoenix.create_base_config()
dev.MA1_ScanX.enabled = True
print("---------------------------------")
# scan will not diode
print(" SCAN DO NOT READ DIODE ")
dev.SAI_01_MEAN.readout_priority = "baseline" # do not read detector
ti = tt.time_ns()
phoenix.run_shell("sh monitor.sh > monitor.out & ")
asdhjk
s1 = scans.line_scan(dev.MA1_ScanX, 0, 0.002, steps=4, exp_time=1, relative=False, delay=2)
tf = tt.time_ns()
print("elapsed time", (tf - ti) / 1e9)
# scan will read diode
print(" SCAN READ DIODE ")
tt.sleep(2)
dev.SAI_01_MEAN.readout_priority = "monitored" # read detector
s2 = scans.line_scan(dev.MA1_ScanX, 0, 0.002, steps=11, exp_time=0.01, relative=False, delay=2)
dev.MA1_ScanX.enabled = False
# next lines do not work as pandas is not installed on test system
res1 = s1.scan.to_pandas()
re1 = res1.to_numpy()
print("Scana")
print(res1)
print("")
print("Scan2 at pandas ")
print(res2)
print("Scan2 as numpy ")
print(res2)

View File

@ -1,2 +1 @@
camonitor X07MB-ES-MA1:ScanX.VAL X07MB-ES-MA1:ScanX.RBV X07MB-OP2:START-CSMPL X07MB-OP2:SMPL X07MB-OP2:INTR-COUNT X07MB-XMAP:StartAll X07MB-XMAP:EraseStart X07MB-XMAP:StopAll
W