Files
x05la/script/local.py
gac-x05la ff161e0297 05_2022
2022-05-24 15:50:20 +02:00

138 lines
4.5 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
import ch.psi.fda.ProcessorFDA as ProcessorFDA
import ch.psi.pshell.crlogic.CrlogicPositioner as CrlogicPositioner
import ch.psi.pshell.crlogic.CrlogicSensor as CrlogicSensor
import ch.psi.pshell.epics.ChannelDouble as ChannelDouble
import ch.psi.pshell.epics.ChannelDoubleArray as ChannelDoubleArray
import ch.psi.pshell.epics.ChannelInteger as ChannelInteger
import ch.psi.pshell.epics.ChannelIntegerArray as ChannelIntegerArray
crlogic_config = {}
crlogic_config["class"] = "ch.psi.pshell.crlogic.CrlogicScan"
#crlogic_config["prefix"] = "X10DA-ES1-CRL"
#crlogic_config["ioc"] = "X10DA-VME-ES1"
#crlogic_config["integrationTime"] = 0.2
#crlogic_config["additionalBacklash"] = 0.01
run("CPython/wrapper")
DETECTORS={ \
'Eiger4m/scan': {"path":"X05LA-ES1-EIGER1:cam1:FilePath", "enabled": False}, \
'FalconX/scan': {"path":"X05LA-SITORO:HDF1:FilePath", "enabled": False}, \
'XRayEye': {"path":"X05LA-ES2-GIGE01:TIFF1:FilePath", "enabled": False} \
}
###################################################################################################
# EPICS utilities
###################################################################################################
def caget_str(ch):
return ''.join((chr(i) if i else "") for i in caget(ch))
def caput_str(ch, val):
ret = [ord(c) for c in val]
ret = ret + ([0] * (256-len(ret)))
ret = to_array(ret, 'b')
caput(ch, ret)
###################################################################################################
# FDA utilities
###################################################################################################
def run_fda(file_name, arguments={}):
"""
Run FDA loop
"""
ProcessorFDA().execute(file_name,arguments)
###################################################################################################
# System callbaks
###################################################################################################
def on_command_started(info):
pass
def on_command_finished(info):
pass
def check_detector_raw_data_folder(name, raw_data_folder):
info = DETECTORS[name]
dev_path = raw_data_folder + "/" + name
channel_path = info["path"]
try:
caget_str(channel_path)
info["enabled"] = True
channel_path = info["path"]
caput_str(channel_path, dev_path)
return name + " ok"
except:
info["enabled"] = False
return name + " error"
finally:
log(name + " enabled = " + str(info["enabled"])+ " folder = " + str(dev_path))
_detector_check_futures = None
def check_detectors_raw_data_folders(raw_data_folder, timeout = 0.5):
global _detector_check_futures
if _detector_check_futures is not None:
join(_detector_check_futures)
functions = []
for name, info in DETECTORS.items():
functions.append((check_detector_raw_data_folder,(name,raw_data_folder)))
_detector_check_futures = fork(*functions)
chrono = Chrono()
while True:
if not False in [f.isDone() for f in _detector_check_futures]:
break
if timeout >0:
if chrono.isTimeout(int(timeout*1000)):
break
time.sleep(0.01)
def on_change_data_path(path):
if path is not None:
print "Data path: " + str(path)
raw_data_folder = path + "/raw_data"
os.makedirs(raw_data_folder)
"""
for name, info in DETECTORS.items():
log(name + " enabled = " + str(info["enabled"]))
if info["enabled"]:
channel_path = info["path"]
caput_str(channel_path, raw_data_folder + "/" + name)
"""
check_detectors_raw_data_folders(raw_data_folder)
def on_session_started(id):
pass
def on_session_finished(id):
pass
###################################################################################################
# System utiomities
###################################################################################################
def memory():
import java.lang.Runtime as Runtime
r=Runtime.getRuntime()
print "Used: ", r.totalMemory()
print "Max : ", r.maxMemory()