Files
x04sa-es3/script/local.py
gac-x04sa 006d995620 Closedown
2019-08-20 10:34:11 +02:00

537 lines
17 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
import os
import os.path
from shutil import copyfile
import json
###################################################################################################
# Interlocks
###################################################################################################
class InterlockFourcv (Interlock):
def __init__(self):
Interlock.__init__(self, (fourcv, alpha, delta, gamma, omegaV))
def check(self, (p, a, d, g, o)):
if fourcv.isStartingSimultaneousMove():
a, d, g, o = p
if a>g:
return False
return True
#interlock1 = InterlockFourcv()
###################################################################################################
# Hardware
###################################################################################################
def open_shutter():
"""
"""
shutter.write("On")
time.sleep(0.1)
def close_shutter():
"""
"""
shutter.write("Off")
def transm_up(factor = 10.0):
close_shutter()
transm.write(min(transm.position * factor, 1.0))
def transm_down(factor = 10.0):
close_shutter()
transm.write(transm.position / factor)
def lup():
if get_option("Moving detector to direct beam... are you sure?") == "Yes":
close_shutter()
auto_set_level(0)
transm.write(1e-8)
gamma.moveAsync(0.0)
delta.moveAsync(0.0)
delta.waitReady(60000)
gamma.waitReady(60000)
def kickstart():
alpha.kickstart()
delta.kickstart()
gamma.kickstart()
omegaV.kickstart()
###################################################################################################
# Pseudo-devices
###################################################################################################
run("cpython/wrapper")
if not get_context().isSimulation():
run("device/Mythen")
run("device/Pixel")
run("device/Image")
run("device/Hexapod")
else:
energy.write(9.5)
run("utils/Auto")
###################################################################################################
# Configuration
###################################################################################################
COUNT_TIME_PREFERENCE = "count_time"
GEOMETRY_PREFERENCE = "geometry"
ROI_PREFERENCE = "roi"
BG_ROI_PREFERENCE = "bg_roi"
def get_count_time():
"""
"""
setting = get_setting(COUNT_TIME_PREFERENCE)
try:
return float(setting)
except:
return 1.0
def set_count_time(value):
"""
"""
set_setting(COUNT_TIME_PREFERENCE, value )
def get_roi():
"""
"""
setting = get_setting(ROI_PREFERENCE)
try:
t = setting.strip().split(" ")
return (int(t[0]), int(t[1]), int(t[2]), int(t[3]))
except:
return (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1)
def set_roi(x1, y1, x2, y2):
"""
"""
set_setting(ROI_PREFERENCE, str(int(x1)) + " " + str(int(y1)) + " " + str(int(x2)) + " " + str(int(y2)) )
def get_bg_roi():
"""
"""
setting = get_setting(BG_ROI_PREFERENCE)
try:
t = setting.strip().split(" ")
return (int(t[0]), int(t[1]), int(t[2]), int(t[3]))
except:
return (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1)
def set_bg_roi(x1, y1, x2, y2):
"""
"""
set_setting(BG_ROI_PREFERENCE, str(int(x1)) + " " + str(int(y1)) + " " + str(int(x2)) + " " + str(int(y2)) )
###################################################################################################
# Context
###################################################################################################
def get_geometry():
"""
"""
setting = get_setting(GEOMETRY_PREFERENCE)
if setting is None or (len(setting.strip()) == 0):
return None
return setting
def set_geometry(value, apply = None):
"""
"""
if value is None or (len(value.strip()) == 0):
set_setting(GEOMETRY_PREFERENCE, "" )
for name in "wavelength", "hkl_group", "h", "k", "l":
dev = get_device(name)
if dev is not None:
remove_device(dev)
return
filename = get_context().setup.expandPath("{script}/geometry/"+ str(value)+".py")
if not os.path.isfile(filename):
raise Exception("Invalid geometry file: " + value)
former = get_geometry()
if ((apply is None) and former != value) or (apply==True) :
set_setting(GEOMETRY_PREFERENCE, value )
run(filename)
def is_geometry_set():
return get_device("wavelength") is not None
###################################################################################################
# Scan callbacks
###################################################################################################
def trigger_pilatus(position, scan):
count_time = get_count_time()
open_shutter()
try:
if count_time>0:
pixel.set_expose(count_time)
pixel.start()
pixel.wait_finished(10.0)
finally:
close_shutter()
count_id = None
image_filename = None
def trigger_detectors(position, scan):
global count_id, image_filename
count_time = get_count_time()
pix_is_on = pixel in scan.readables
myt_is_on = mythen in scan.readables
if myt_is_on:
mythen.abort()
open_shutter()
try:
if (count_time != 0):
if pix_is_on:
pixel.set_expose(count_time)
pixel.start()
if myt_is_on:
if (count_time > 0):
mythen.set_acquire_time(count_time)
mythen.set_acquire_mode("Single")
mythen.start()
else:
print "Preset monitor counting is not supported\!"
if pix_is_on:
pixel.wait_finished(10.0)
if myt_is_on:
i = 0
while (mythen.is_acquiring()):
time.sleep (0.05)
i += 1
if (i * 0.05 >= count_time * 2):
print "MYTHEN Izero times out, status: " + str(mythen.get_status())
break
if pix_is_on:
image_filename = pixel.get_image_filename()
count_id = pixel.doUpdate()
else :
image_filename = None
count_id = + 1
finally:
close_shutter()
def save_metadata(rec, scan):
print "Acquired record ", rec.index
if get_exec_pars().save:
if rec.index == 0:
create_diag_datasets()
append_diag_datasets()
def before_sample(position, scan):
auto_before_sample(position, scan)
trigger_detectors(position, scan)
def after_sample(rec, scan):
close_shutter()
if auto_after_sample(rec, scan):
save_metadata(rec, scan)
###################################################################################################
# Scan metadata
###################################################################################################
def get_diag_channels():
diag_channels = [ phi, \
omegaH,
nu,\
omegaV, \
alpha, \
delta, \
gamma, \
xv, \
y1, \
y2, \
y3, \
trx, \
thy, \
]
if is_geometry_set():
diag_channels.append(h.readback)
diag_channels.append(k.readback)
diag_channels.append(l.readback)
diag_channels.append(wavelength)
return diag_channels
def get_diag_name(diag):
return ch.psi.utils.Str.toTitleCase(diag if is_string(diag) else diag.getName()).replace(" ", "")
def print_diag():
for f in get_diag_channels():
print "%-25s %s" % (get_diag_name(f) , str(f.read()))
def create_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "meta/"
for f in get_diag_channels():
create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd')
def append_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "meta/"
for f in get_diag_channels():
try:
x = f.read()
if x is None:
x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
append_dataset(group+get_diag_name(f), x)
except:
log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))
###################################################################################################
# Utilities
###################################################################################################
def is_locked(filepath):
"""Checks if a file is locked by opening it in append mode.
If no exception thrown, then the file is not locked.
"""
locked = None
file_object = None
filepath = os.path.abspath(filepath)
if os.path.exists(filepath):
try:
print "Trying to open %s." % filepath
buffer_size = 8
# Opening file in append mode and read the first 8 characters.
file_object = open(filepath, 'a', buffer_size)
if file_object:
print "%s is not locked." % filepath
locked = False
except IOError, message:
print "File is locked (unable to open in append mode). %s." % \
message
locked = True
finally:
if file_object:
file_object.close()
print "%s closed." % filepath
else:
print "%s not found." % filepath
return locked
def wait_for_files(filepaths, timeout = None):
"""Checks if the files are ready.
For a file to be ready it must exist and can be opened in append
mode.
"""
wait_time = 0.01
for filepath in filepaths:
filepath = os.path.abspath(filepath)
# If the file doesn't exist, wait wait_time seconds and try again
# until it's found.
while not os.path.exists(filepath):
if (timeout is not None) and (time.time() -start > timeout):
err = filepath + " hasn't arrived in time"
print err
raise Exception(err)
time.sleep(wait_time)
# If the file exists but locked, wait wait_time seconds and check
# again until it's no longer locked by another process.
while is_locked(filepath):
if (timeout is not None) and (time.time() -start > timeout):
err = filepath + " hasn't unlock in time"
print err
raise Exception(err)
time.sleep(wait_time)
def wait_for_file_size(filepath, size, timeout = None):
"""Wait for a file to exist, and reach a given size.
"""
wait_time = 0.01
filepath = os.path.abspath(filepath)
start = time.time()
# If the file doesn't exist, wait wait_time seconds and try again
# until it's found.
while not os.path.exists(filepath) or size > os.path.getsize(filepath):
if (timeout is not None) and (time.time() -start > timeout):
err = filepath + " hasn't arrived in time"
print err
raise Exception(err)
time.sleep(wait_time)
def set_data_path(path):
"""Changes data root path.
"""
get_context().setDataPath(path)
def set_script_path(path):
"""Changes script root path.
"""
get_context().setScriptPath(path)
def backup_ub(name=None, destination = "{data}"):
"""Copies ub matrix (default= current) to user space.
"""
if not name:
name = ub.ubcalc._state.name
name = name + ".json"
f = settings.persistence_path + "/" + name
if not os.path.isfile(f):
raise Exception("Invalid UB name: " + str(name))
copyfile(f, get_context().setup.expandPath(destination + "/" +name))
def restore_ub(name, origin = "{data}"):
"""Restores ub matrix from user space and loads it.
"""
f = settings.persistence_path + "/" + name + ".json"
copyfile(get_context().setup.expandPath(origin + "/" +name + ".json"), f)
loadub(name)
###################################################################################################
# HKL commands
###################################################################################################
def ci(positions, energy=None):
return hklci(positions, energy)[0]
def ca(hkl, energy=None):
return hklca(hkl, energy)[0]
def wh():
hklwh()
###################################################################################################
# Scan commands
###################################################################################################
def relscan(motor, start, end, number_of_steps, count_time):
"""
Relative scan
"""
set_count_time(count_time)
detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix]
#TODO: Set based on experiment context
pixel.set_path("../../expdata/bml_20190703/pixel/", "images")
#TODO: should be set automatically
set_roi(173,88,285,136)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return lscan (motor, detectors, start, end, int(number_of_steps), relative=True, before_read=before_sample, after_read=after_sample)
def absscan(motor, start, end, number_of_steps, count_time):
"""
Absolute scan
"""
set_count_time(count_time)
detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix]
#TODO: Set based on experiment context
pixel.set_path("../../expdata/bml_20190703/pixel/", "images")
#TODO: should be set automatically
set_roi(173,88,285,136)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return lscan (motor, detectors, start, end, int(number_of_steps), relative=False, before_read=before_sample, after_read=after_sample)
def abs2scan(motor1, start1, end1, motor2, start2, end2, number_of_steps, count_time):
"""
Absolute scan of 2 motors
"""
set_count_time(count_time)
detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix]
#TODO: Set based on experiment context
pixel.set_path("../../expdata/bml_20190703/pixel/", "images")
#TODO: should be set automatically
set_roi(173,88,285,136)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return lscan ([motor1, motor2], detectors, [start1, start2], [end1, end2], int(number_of_steps), relative=False, before_read=before_sample, after_read=after_sample)
def hrodscan(start, end, number_of_steps, count_time):
"""
Scan on l
"""
return absscan (h, start, end, number_of_steps, count_time)
def krodscan(start, end, number_of_steps, count_time):
"""
Scan on k
"""
return absscan (k, start, end, number_of_steps, count_time)
def lrodscan(start, end, number_of_steps, count_time):
"""
Scan on l
"""
return absscan (l, start, end, number_of_steps, count_time)
def hkllinscan(hstart, hfinish, kstart, kfinish, lstart, lfinish, number_of_steps, count_time):
"""
Linear scan on hkl
"""
vector = []
hs = float(hfinish - hstart)/number_of_steps
ks = float(kfinish - kstart)/number_of_steps
ls = float(lfinish - lstart)/number_of_steps
for i in range(number_of_steps+1):
hp = hstart + i * hs
kp = kstart + i * ks
lp = lstart + i * ls
vector.append([hp, kp, lp] )
set_count_time(count_time)
detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix]
#TODO: Set based on experiment context
pixel.set_path("../../expdata/bml_20190703/pixel/", "images")
#TODO: should be set automatically
set_roi(173,88,285,136)
show_panel(detector_image)
pixel.show()
pixel.assert_ready()
return hklscan(vector, detectors,latency = 0.0, before_read=before_sample, after_read=after_sample)
###################################################################################################
# Configuration
###################################################################################################
set_geometry(get_geometry(),True)
load_exp_context()