Files
x04sa-es3/script/local.py
gac-x04sa eaa5cb6919 Closedown
2019-08-22 16:06:35 +02:00

632 lines
20 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
import os
import os.path
from shutil import copyfile
import json
###################################################################################################
# Interlocks
###################################################################################################
INTERLOCKS = []
def clear_interlocks():
global INTERLOCKS
for interlock in INTERLOCKS:
try:
interlock.close()
except:
pass
INTERLOCKS = []
def add_interlock(interlock):
global INTERLOCKS
INTERLOCKS.append(interlock)
###################################################################################################
# Hardware
###################################################################################################
def open_shutter():
"""
"""
shutter.write("On")
time.sleep(0.1)
def close_shutter():
"""
"""
shutter.write("Off")
def transm_up(factor = 10.0):
close_shutter()
transm.write(min(transm.position * factor, 1.0))
def transm_down(factor = 10.0):
close_shutter()
transm.write(transm.position / factor)
def lup():
if get_option("Moving detector to direct beam... are you sure?") == "Yes":
close_shutter()
auto_set_level(0)
transm.write(1e-8)
gamma.moveAsync(0.0)
delta.moveAsync(0.0)
delta.waitReady(60000)
gamma.waitReady(60000)
def kickstart():
parallelize(alpha.kickstart, delta.kickstart, gamma.kickstart, omegaV.kickstart)
###################################################################################################
# Pseudo-devices
###################################################################################################
run("cpython/wrapper")
if not get_context().isSimulation():
run("device/Mythen")
run("device/Pixel")
run("device/Image")
run("device/Hexapod")
else:
energy.write(9.5)
run("utils/Auto")
###################################################################################################
# Configuration
###################################################################################################
COUNT_TIME_PREFERENCE = "count_time"
GEOMETRY_PREFERENCE = "geometry"
ROI_PREFERENCE = "roi"
BG_ROI_PREFERENCE = "bg_roi"
USER_NAME_PREFERENCE = "user_name"
USER_EXP_PREFERENCE = "user_exp"
DATA_ROOT_PREFERENCE = "data_root"
DATA_ROOT_PREFERENCE = "data_root"
DETECTORS_PREFERENCE = "detectors"
def get_count_time():
"""
"""
setting = get_setting(COUNT_TIME_PREFERENCE)
try:
return float(setting)
except:
return 1.0
def set_count_time(value):
"""
"""
set_setting(COUNT_TIME_PREFERENCE, value )
def get_roi():
"""
"""
setting = get_setting(ROI_PREFERENCE)
try:
t = setting.strip().split(" ")
return (int(t[0]), int(t[1]), int(t[2]), int(t[3]))
except:
return (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1)
def set_roi(x1, y1, x2, y2):
"""
"""
set_setting(ROI_PREFERENCE, str(int(x1)) + " " + str(int(y1)) + " " + str(int(x2)) + " " + str(int(y2)) )
def get_bg_roi():
"""
"""
setting = get_setting(BG_ROI_PREFERENCE)
try:
t = setting.strip().split(" ")
return (int(t[0]), int(t[1]), int(t[2]), int(t[3]))
except:
return (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1)
def set_bg_roi(x1, y1, x2, y2):
"""
"""
set_setting(BG_ROI_PREFERENCE, str(int(x1)) + " " + str(int(y1)) + " " + str(int(x2)) + " " + str(int(y2)) )
###################################################################################################
# Context
###################################################################################################
def get_geometry():
"""
"""
setting = get_setting(GEOMETRY_PREFERENCE)
if setting is None or (len(setting.strip()) == 0):
return None
return setting
def set_geometry(value, apply = None):
"""
"""
if value is None or (len(value.strip()) == 0):
set_setting(GEOMETRY_PREFERENCE, "" )
for name in "wavelength", "hkl_group", "h", "k", "l":
dev = get_device(name)
if dev is not None:
remove_device(dev)
return
filename = get_context().setup.expandPath("{script}/geometry/"+ str(value)+".py")
if not os.path.isfile(filename):
raise Exception("Invalid geometry file: " + value)
former = get_geometry()
if ((apply is None) and former != value) or (apply==True) :
set_setting(GEOMETRY_PREFERENCE, value )
run(filename)
def is_geometry_set():
return get_device("wavelength") is not None
def set_user_env(user, exp, path="/sls/X04SA/data/x04sa/ES3/expdata"):
"""
Sets user folders
"""
set_setting(USER_NAME_PREFERENCE, user)
set_setting(USER_EXP_PREFERENCE, exp)
set_setting(DATA_ROOT_PREFERENCE, path)
data_path = path + "/" +user + "/" +exp
data_path = os.path.abspath(data_path)
set_data_path(data_path)
if not os.path.exists(data_path):
os.makedirs(data_path)
os.chmod (data_path, 0o777)
os.chmod (os.path.dirname(data_path), 0o777)
#pixel.set_path(path, user + "/" +exp + "/images")
pixel.set_path("../../expdata/" + user + "/" +exp, "images")
if not os.path.exists(pixel.get_full_path()):
os.makedirs(pixel.get_full_path())
os.chmod (pixel.get_full_path(), 0o777)
def get_user_env():
return (get_setting(USER_NAME_PREFERENCE), get_setting(USER_EXP_PREFERENCE), get_setting(DATA_ROOT_PREFERENCE))
def load_user_env():
(user, exp, path) = get_user_env()
if user and path and exp:
set_user_env(user, exp, path)
SUB_DEVICE_DETECTORS = {mythen.acquire_time : "mythen.acquire_time",
pixel.image_filename: "pixel.image_filename",
image.intensity : "image.intensity",
image.corrected_intensity : "image.corrected_intensity",
image.matrix : "image.matrix"}
def get_detectors():
det_str = get_setting(DETECTORS_PREFERENCE)
if not det_str or not (det_str.strip()):
return []
devs = det_str.split(",")
ret = []
for name in devs:
name = name.strip()
dev = get_context().devicePool.getByName(name)
if not dev:
try:
dev = eval(name)
assert issubclass(dev.__class__,Device)
except:
dev = None
if dev: ret.append(dev)
return ret
def set_detectors(detectors):
"""
"""
devs = []
if detectors:
for d in detectors:
if is_string(d):
name = d
elif d in SUB_DEVICE_DETECTORS.keys():
name = SUB_DEVICE_DETECTORS[d]
else:
name = d.name
devs.append(name)
set_setting(DETECTORS_PREFERENCE, ",".join(devs))
#set_detectors([mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix])
###################################################################################################
# Scan callbacks
###################################################################################################
def trigger_pilatus(position, scan):
count_time = get_count_time()
open_shutter()
try:
if count_time>0:
pixel.set_expose(count_time)
pixel.start()
pixel.wait_finished(10.0)
finally:
close_shutter()
count_id = None
image_filename = None
def trigger_detectors(position, scan):
global count_id, image_filename
count_time = get_count_time()
pix_is_on = pixel in scan.readables
myt_is_on = mythen in scan.readables
if myt_is_on:
mythen.abort()
open_shutter()
try:
if (count_time != 0):
if pix_is_on:
pixel.set_expose(count_time)
pixel.start()
if myt_is_on:
if (count_time > 0):
mythen.set_acquire_time(count_time)
mythen.set_acquire_mode("Single")
mythen.start()
else:
print "Preset monitor counting is not supported\!"
if pix_is_on:
pixel.wait_finished(10.0)
if myt_is_on:
i = 0
while (mythen.is_acquiring()):
time.sleep (0.05)
i += 1
if (i * 0.05 >= count_time * 2):
print "MYTHEN Izero times out, status: " + str(mythen.get_status())
break
if pix_is_on:
image_filename = pixel.get_image_filename()
count_id = pixel.doUpdate()
else :
image_filename = None
count_id = + 1
finally:
close_shutter()
def save_metadata(rec, scan):
print "Acquired record ", rec.index
if get_exec_pars().save:
if rec.index == 0:
create_diag_datasets()
append_diag_datasets()
def save_experiment_context(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "experiment/"
if get_setting(USER_NAME_PREFERENCE): save_dataset(group+"user" , get_setting(USER_NAME_PREFERENCE))
if get_setting(USER_EXP_PREFERENCE): save_dataset(group+"name" , get_setting(USER_EXP_PREFERENCE))
if get_geometry(): save_dataset(group+"geometry" , get_geometry())
ctxt = get_exp_context()
limits,constraints,ub_name = ctxt["limits"], ctxt["constraints"], ctxt["ub"]
if limits:
save_dataset(group+"limits" , str(limits))
if (constraints):
save_dataset(group+"constraints" , str(constraints))
if (ub_name):
save_dataset(group+"ub_name" , str(ub_name))
ub_matrix = getub()
save_dataset(group+"ub" ,ub_matrix)
save_dataset(group+"ub_str" ,str(ub_matrix))
def before_sample(position, scan):
if scan.recordIndex == 1:
save_experiment_context()
auto_before_sample(position, scan)
trigger_detectors(position, scan)
def after_sample(rec, scan):
close_shutter()
if auto_after_sample(rec, scan):
save_metadata(rec, scan)
###################################################################################################
# Scan metadata
###################################################################################################
def get_diag_channels():
diag_channels = [ phi, \
omegaH,
nu,\
omegaV, \
alpha, \
delta, \
gamma, \
xv, \
y1, \
y2, \
y3, \
trx, \
thy, \
]
if is_geometry_set():
diag_channels.append(h.readback)
diag_channels.append(k.readback)
diag_channels.append(l.readback)
diag_channels.append(wavelength)
return diag_channels
def get_diag_name(diag):
return ch.psi.utils.Str.toTitleCase(diag if is_string(diag) else diag.getName()).replace(" ", "")
def print_diag():
for f in get_diag_channels():
print "%-25s %s" % (get_diag_name(f) , str(f.read()))
def create_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "meta/"
for f in get_diag_channels():
create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd')
def append_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "meta/"
for f in get_diag_channels():
try:
x = f.read()
if x is None:
x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
append_dataset(group+get_diag_name(f), x)
except:
log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))
###################################################################################################
# Utilities
###################################################################################################
def is_locked(filepath):
"""Checks if a file is locked by opening it in append mode.
If no exception thrown, then the file is not locked.
"""
locked = None
file_object = None
filepath = os.path.abspath(filepath)
if os.path.exists(filepath):
try:
print "Trying to open %s." % filepath
buffer_size = 8
# Opening file in append mode and read the first 8 characters.
file_object = open(filepath, 'a', buffer_size)
if file_object:
print "%s is not locked." % filepath
locked = False
except IOError, message:
print "File is locked (unable to open in append mode). %s." % \
message
locked = True
finally:
if file_object:
file_object.close()
print "%s closed." % filepath
else:
print "%s not found." % filepath
return locked
def wait_for_files(filepaths, timeout = None):
"""Checks if the files are ready.
For a file to be ready it must exist and can be opened in append
mode.
"""
wait_time = 0.01
for filepath in filepaths:
filepath = os.path.abspath(filepath)
# If the file doesn't exist, wait wait_time seconds and try again
# until it's found.
while not os.path.exists(filepath):
if (timeout is not None) and (time.time() -start > timeout):
err = filepath + " hasn't arrived in time"
print err
raise Exception(err)
time.sleep(wait_time)
# If the file exists but locked, wait wait_time seconds and check
# again until it's no longer locked by another process.
while is_locked(filepath):
if (timeout is not None) and (time.time() -start > timeout):
err = filepath + " hasn't unlock in time"
print err
raise Exception(err)
time.sleep(wait_time)
def wait_for_file_size(filepath, size, timeout = None):
"""Wait for a file to exist, and reach a given size.
"""
wait_time = 0.01
filepath = os.path.abspath(filepath)
start = time.time()
# If the file doesn't exist, wait wait_time seconds and try again
# until it's found.
while not os.path.exists(filepath) or size > os.path.getsize(filepath):
if (timeout is not None) and (time.time() -start > timeout):
err = filepath + " hasn't arrived in time"
print err
raise Exception(err)
time.sleep(wait_time)
def set_data_path(path):
"""Changes data root path.
"""
get_context().setDataPath(path)
def set_script_path(path):
"""Changes script root path.
"""
get_context().setScriptPath(path)
def backup_ub(name=None, destination = "{data}"):
"""Copies ub matrix (default= current) to user space.
"""
if not name:
name = ub.ubcalc._state.name
if not name:
raise Exception("No UB matrix defined")
name = name + ".json"
f = settings.persistence_path + "/" + name
if not os.path.isfile(f):
raise Exception("Invalid UB name: " + str(name))
copyfile(f, get_context().setup.expandPath(destination + "/" +name))
def restore_ub(name, origin = "{data}"):
"""Restores ub matrix from user space and loads it.
"""
f = settings.persistence_path + "/" + name + ".json"
copyfile(get_context().setup.expandPath(origin + "/" +name + ".json"), f)
loadub(name)
###################################################################################################
# HKL commands
###################################################################################################
def ci(positions, energy=None):
return hklci(positions, energy)[0]
def ca(hkl, energy=None):
return hklca(hkl, energy)[0]
def wh():
hklwh()
###################################################################################################
# Scan commands
###################################################################################################
def relscan(motor, start, end, number_of_steps, count_time):
"""
Relative scan
"""
set_count_time(count_time)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return lscan (motor, detectors, start, end, int(number_of_steps), relative=True, before_read=before_sample, after_read=after_sample, name="relscan")
def absscan(motor, start, end, number_of_steps, count_time):
"""
Absolute scan
"""
set_count_time(count_time)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return lscan (motor, get_detectors(), start, end, int(number_of_steps), relative=False, before_read=before_sample, after_read=after_sample, name="absscan")
def abs2scan(motor1, start1, end1, motor2, start2, end2, number_of_steps, count_time):
"""
Absolute scan of 2 motors
"""
set_count_time(count_time)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return lscan ([motor1, motor2], get_detectors(), [start1, start2], [end1, end2], int(number_of_steps), relative=False, before_read=before_sample, after_read=after_sample, name="abs2scan")
def hrodscan(start, end, number_of_steps, count_time):
"""
Scan on h
"""
return absscan (h, start, end, number_of_steps, count_time, name="hrodscan")
def krodscan(start, end, number_of_steps, count_time):
"""
Scan on k
"""
return absscan (k, start, end, number_of_steps, count_time, name="hrodscan")
def lrodscan(start, end, number_of_steps, count_time):
"""
Scan on l
"""
return absscan (l, start, end, number_of_steps, count_time, name="lrodscan")
def hkllinscan(hstart, hfinish, kstart, kfinish, lstart, lfinish, number_of_steps, count_time):
"""
Linear scan on hkl
"""
vector = []
hs = float(hfinish - hstart)/number_of_steps
ks = float(kfinish - kstart)/number_of_steps
ls = float(lfinish - lstart)/number_of_steps
for i in range(number_of_steps+1):
hp = hstart + i * hs
kp = kstart + i * ks
lp = lstart + i * ls
vector.append([hp, kp, lp] )
set_count_time(count_time)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return hklscan(vector, get_detectors(),latency = 0.0, before_read=before_sample, after_read=after_sample, name="hkllinscan")
def ct(count_time):
"""
Detectors single sampling
"""
if count_time: set_count_time(count_time)
tscan(get_detectors(), 1, 1.0 , before_read=before_sample, after_read=after_sample, name="ct")
###################################################################################################
# Configuration
###################################################################################################
set_geometry(get_geometry(),True)
load_user_env()
load_exp_context()
###################################################################################################
# Device initialization
###################################################################################################
kickstart()