Files
phoenix_bec/phoenix_bec/scripts/phoenix.py
2024-12-17 17:02:39 +01:00

416 lines
12 KiB
Python

"""
General collection of classes for the PHOENIX beamline
"""
# from unittest import mock
import os
# import sys
import time
import numpy as np
# import pandas
# import pytest
# from bec_lib import messages
# import device_server
# from ophyd import Component as Cpt
# from ophyd import Device, EpicsMotor, EpicsSignal, EpicsSignalRO
# from ophyd import FormattedComponent as FCpt
# from ophyd import Kind, PVPositioner, Signal
# from ophyd.flyers import FlyerInterface
# from ophyd.pv_positioner import PVPositionerComparator
# from ophyd.status import DeviceStatus, SubscriptionStatus
from bec_lib.config_helper import ConfigHelper
from bec_lib.logger import bec_logger
from phoenix_bec.scripts.phoenix_help import PhoenixHelp
logger = bec_logger.logger
# import ophyd
# logger = bec_logger.logger
# load simulation
# bec.config.load_demo_config()
# .. define base path for directory with scripts
class Utilities:
    """
    Utility class for PHOENIX.

    Provides helpers that print the signals of a device together with the
    ``source`` entry of each signal.
    """

    def __init__(self):
        print("init phoenix.Utilities")

    def list_signals_(self, cls=None, check=False):
        """
        Print all signals of a device together with their ``source`` entry.

        The signals are read from ``cls._info["describe"]``. The leading
        ``"<cls.name>_"`` part of every key is stripped and the remainder is
        cut at character 40 of the key before printing.

        Parameters
        ----------
        cls :
            Object offering ``name`` and ``_info["describe"]``. If the
            description cannot be read (e.g. ``cls`` is None), whatever was
            obtained is printed followed by ``no key``.
        check :
            Currently unused; kept so existing callers keep working.
        """
        print("List of all Signals")
        print("......try also attributes")
        print(" .describe_configuration()")
        print(" ._info")

        # Number of leading characters ("<name>_") removed from every key.
        # Falls back to 0 so that keys of a nameless object are still listed
        # instead of failing on an unbound variable.
        try:
            prefix_len = len(cls.name) + 1
        except (AttributeError, TypeError):
            prefix_len = 0

        try:
            config = cls._info["describe"]
        except (AttributeError, KeyError, TypeError):
            config = " "

        # Entries are printed one by one; if an entry is malformed the raw
        # description is dumped so the user can inspect it.
        try:
            for key in config.keys():
                print(key[prefix_len:40], config[key]["source"])
        except (AttributeError, KeyError, TypeError):
            print(config)
            print("no key")

    def list_signals_falcon(self):
        """
        List signals for falcon (device, mca1, dxp1 and roi0).

        NOTE(review): ``dev`` is not defined in this file - presumably it is
        provided by the running BEC session; confirm.
        """
        print("........... dev.falcon ")
        self.list_signals_(dev.falcon)
        print("........... falcon.mca1 ")
        self.list_signals_(dev.falcon.mca1)
        print("........... falcon.dxp1 ")
        self.list_signals_(dev.falcon.dxp1)
        print("........... falcon.roi0")
        self.list_signals_(dev.falcon.roi0)

    def list_signals_xmap(self):
        """
        List signals for XMAP (device, mca1, dxp1 and roi0).

        NOTE(review): ``dev`` is not defined in this file - presumably it is
        provided by the running BEC session; confirm.
        """
        print("........... dev.xmap ")
        self.list_signals_(dev.xmap)
        print("........... xmap.mca1 ")
        self.list_signals_(dev.xmap.mca1)
        print("........... xmap.dxp1 ")
        self.list_signals_(dev.xmap.dxp1)
        print("........... xmap.roi0")
        self.list_signals_(dev.xmap.roi0)
class PhoenixBL(Utilities, PhoenixHelp):
    """
    General class for the PHOENIX beamline, located in
    phoenix_bec/phoenix_bec/scripts.

    Holds the paths of the phoenix_bec installation and offers helpers to
    assemble a temporary device configuration file and load it with
    ``bec.config.update_session_with_file``, to run shell commands in a new
    terminal, and to write a simple user log file.

    NOTE(review): ``bec`` is not defined in this file - presumably it is
    provided by the running BEC session; confirm.
    """

    # Time stamp taken when the class body is executed.
    t0 = time.time()

    def __init__(self):
        """
        Init PhoenixBL() in phoenix_bec/scripts: define paths and file names.
        """
        self.silent = False
        print("..... init PhoenixBL from phoenix_bec/scripts/phoenix.py")

        # Define important paths
        self.base_path = "/data/test/x07mb-test-bec/production/phoenix_bec/"
        self.path_phoenix_bec = self.base_path + "phoenix_bec/"
        self.path_devices = self.path_phoenix_bec + "device_configs/"
        self.path_scripts = self.path_phoenix_bec + "scripts/"
        self.path_scans = self.path_phoenix_bec + "scans/"
        self.path_scripts_local = self.path_phoenix_bec + "local_scripts/"
        self.path_config_local = self.path_scripts_local + "TEST_ConfigPhoenix/"

        # yaml files for the default configurations
        self.file_devices_file = self.path_devices + "phoenix_devices.yaml"
        self.file_devices_xmap = self.path_devices + "phoenix_xmap.yaml"
        self.file_devices_falcon = self.path_devices + "phoenix_falcon.yaml"

        # temporary yaml file to allow adding devices with one single command
        self.file_devices_tmp = self.path_devices + "current_devices_tmp.yaml"

        # Time stamp of instance creation (shadows the class attribute t0)
        self.t0 = time.time()

    def create_base_config(self):
        """
        Create the temporary yaml file from the standard configuration
        (overwriting it) and load it into the session.
        """
        os.system("cat " + self.file_devices_file + " > " + self.file_devices_tmp)
        bec.config.update_session_with_file(self.file_devices_tmp)

    def add_phoenix_config(self):
        """
        Append the standard phoenix configuration to the temporary yaml file
        and load the temporary file into the session.
        """
        print("add_phoenix_config ")
        # Show which file is appended (the path itself, not the variable name)
        print(self.file_devices_file)
        os.system("cat " + self.file_devices_file + " >> " + self.file_devices_tmp)
        bec.config.update_session_with_file(self.file_devices_tmp)

    def add_xmap(self):
        """Append the XMAP configuration to the temporary file and load it."""
        print("add xmap ")
        os.system("cat " + self.file_devices_xmap + " >> " + self.file_devices_tmp)
        bec.config.update_session_with_file(self.file_devices_tmp)

    def load_xmap(self):
        """
        Load XMAP only.

        The temporary file is overwritten with the XMAP configuration; the
        session itself is updated from the XMAP file directly.
        """
        print("load_xmap")
        os.system("cat " + self.file_devices_xmap + " > " + self.file_devices_tmp)
        bec.config.update_session_with_file(self.file_devices_xmap)

    def add_falcon(self):
        """Append the Falcon configuration to the temporary file and load it."""
        print("add_falcon to existing configuration ")
        os.system("cat " + self.file_devices_falcon + " >> " + self.file_devices_tmp)
        bec.config.update_session_with_file(self.file_devices_tmp)

    def load_falcon(self):
        """
        Load FALCON only.

        The temporary file is overwritten with the Falcon configuration; the
        session itself is updated from the Falcon file directly.
        """
        print("load_falcon")
        os.system("cat " + self.file_devices_falcon + " > " + self.file_devices_tmp)
        bec.config.update_session_with_file(self.file_devices_falcon)

    def run_shell(self, commands):
        """
        Run one or several shell commands in a new gnome-terminal.

        The commands are joined with `` ; `` and wrapped in double quotes for
        ``bash -c``; the terminal stays open afterwards (``exec bash``).
        Commands containing double quotes therefore break the quoting.

        Parameters
        ----------
        commands : str or iterable of str
            A single command or a list/tuple of commands.

        Example:
            run_shell('ls')
            run_shell(['sh script.sh', 'pwd', 'ls'])
        """
        cmd = 'gnome-terminal --geometry 100X30 -- bash -c "'
        print(cmd)
        # A single command is treated like a one-element list.
        if isinstance(commands, str):
            commands = [commands]
        cmd += "".join(command + " ; " for command in commands)
        cmd += ' exec bash " '
        print(cmd)
        os.system(cmd)
        # Example of a full command line:
        # 'gnome-terminal --geometry 100X30 -- bash -c "source /data/test/x07mb-test-bec/production/bec_venv/bin/activate ; bec-server attach ; exec bash"'

    @classmethod
    def my_log(cls, x):
        """
        Write a user defined log entry.

        The entry is ``"<seconds> sec <x>"``, where the time is the current
        epoch time modulo 600 s, i.e. seconds relative to some point at most
        10 minutes ago. The entry is sent to ``logger.success`` and appended
        to ``MyLogfile.txt`` in the current working directory.

        Parameters
        ----------
        x : str
            Text of the log entry.
        """
        print(time.time())
        # 86400 and 3600 are multiples of 600, so reducing by days, hours and
        # 10-minute blocks in sequence equals one modulo 600.
        now = time.time() % 600.0
        m = str(now) + " sec " + x
        logger.success(m)
        # The context manager closes the file even if the write fails.
        with open("MyLogfile.txt", "a") as logfile:
            logfile.write(m + "\n")
class PhGroup:
    """
    Class to create data groups compatible with larch groups.

    Initialize by

        ww = PhGroup('YourLabel')

    which creates a group with the default attributes

        ww.label = 'YourLabel'        --- for compatibility with larch groups
        ww.description = 'YourLabel'

    Further data can be added with new tags by

        ww.newtag = 67

    ww.keys()          -- list all keys
    ww.linescan2group  -- converts bec linescan data to group format
    """

    def __init__(self, description):
        self.description = description
        # attribute 'label' for compatibility with larch groups
        self.label = description

    def add(self, NameTag, content):
        """
        Add a tag to the group.

        Parameters
        ----------
        NameTag : str
            Name of the attribute to create or overwrite.
        content :
            Object stored under that name.

        Returns
        -------
        None.
        """
        setattr(self, NameTag, content)

    def keys(self):
        """
        Get the names of all attributes which are not bound methods and
        whose name does not contain ``__``.

        Returns
        -------
        box : list of str
            Attribute names.
        """
        box = []
        for name in self.__dir__():
            # skip dunder names (and any other name containing "__")
            if "__" in name:
                continue
            if str(type(self.__getattribute__(name))) != "<class 'method'>":
                box.append(name)
        return box

    def linescan2group(self, this_scan):
        """
        Merge the results of a linescan into the group.

        ``this_scan.scan.data`` is read as a two level mapping
        ``data[outer_key][inner_key][i]`` whose entries offer ``"value"`` and
        ``"timestamp"``. For every (outer_key, inner_key) pair two attributes
        are created:

            {inner_key}_{outer_key}_val   (values)
            {inner_key}_{outer_key}_ts    (timestamps)

        Values are collected in a float numpy array; if any point cannot be
        read or stored as a float, the ``_val`` attribute is set to None.
        Timestamps which cannot be read are stored as NaN.
        """
        data = this_scan.scan.data
        print(data.keys())
        for outer_key in data.keys():
            print("outer_key", outer_key)
            for inner_key in data[outer_key].keys():
                print("inner_key", inner_key)
                points = data[outer_key][inner_key]
                # calculate number of points
                n_inner = len(points.keys())
                value = np.zeros(n_inner)
                timestamp = np.zeros(n_inner)
                for i in range(n_inner):
                    # Once one value failed, the whole value array is dropped.
                    if value is not None:
                        try:
                            value[i] = points[i]["value"]
                        except Exception:
                            value = None
                    try:
                        timestamp[i] = points[i]["timestamp"]
                    except Exception:
                        # None cannot be stored in a float array; mark the
                        # missing timestamp with NaN instead.
                        timestamp[i] = np.nan
                self.add(inner_key + "_" + outer_key + "_val", value)
                self.add(inner_key + "_" + outer_key + "_ts", timestamp)
        print(time.time())
if __name__ == "__main__":
    # Self check: report for every attribute of PhoenixBL whose name contains
    # "path" whether it is an existing directory, and for every attribute
    # whose name contains "file" whether it is an existing file.
    PH = PhoenixBL()
    for attr_name in PH.__dir__():
        for keyword, exists in (("path", os.path.isdir), ("file", os.path.isfile)):
            if keyword not in attr_name:
                continue
            target = getattr(PH, attr_name)
            if exists(target):
                print(target, " exists")
            else:
                print("!!- ", target, " does not exist")