###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################

import os
import os.path
import sys
import time
from shutil import copyfile
import json

# NOTE(review): `sys` and `time` are used below but were not imported by this file; they were
# presumably inherited from the startup.py namespace. Importing them explicitly is harmless.
# Names such as get_setting, set_setting, run, get_context, lscan, tscan, hklscan, the device
# objects (shutter, transm, pixel, mythen, ...) and the `ch` package are not defined here and
# are expected to be provided by the hosting environment.


###################################################################################################
# Interlocks
###################################################################################################

INTERLOCKS = []


def clear_interlocks():
    """Close every registered interlock (ignoring close errors) and empty the registry."""
    global INTERLOCKS
    for interlock in INTERLOCKS:
        try:
            interlock.close()
        except:
            # Best effort: a failing close must not prevent the others from being closed.
            # Bare except kept on purpose - NOTE(review): under Jython, Java exceptions may
            # not derive from the Python Exception class; confirm before narrowing.
            pass
    INTERLOCKS = []


def add_interlock(interlock):
    """Register an interlock so that clear_interlocks() can close it later."""
    INTERLOCKS.append(interlock)


###################################################################################################
# Hardware
###################################################################################################

def open_shutter():
    """Write "On" to the shutter and wait 0.1 s."""
    shutter.write("On")
    time.sleep(0.1)


def close_shutter():
    """Write "Off" to the shutter."""
    shutter.write("Off")


def transm_up(factor=10.0):
    """Close the shutter and multiply the transmission by `factor`, capped at 1.0."""
    close_shutter()
    transm.write(min(transm.position * factor, 1.0))


def transm_down(factor=10.0):
    """Close the shutter and divide the transmission by `factor`."""
    close_shutter()
    transm.write(transm.position / factor)


def lup():
    """After user confirmation, move gamma and delta to 0.0 with minimal transmission.

    The shutter is closed, the auto level set to 0 and the transmission set to 1e-8
    before both motors are started asynchronously; each is then awaited for up to 60000
    (units as defined by waitReady - presumably milliseconds).
    """
    if get_option("Moving detector to direct beam... are you sure?") == "Yes":
        close_shutter()
        auto_set_level(0)
        transm.write(1e-8)
        gamma.moveAsync(0.0)
        delta.moveAsync(0.0)
        delta.waitReady(60000)
        gamma.waitReady(60000)


def kickstart():
    """Run the kickstart method of alpha, delta, gamma and omegaV through parallelize()."""
    parallelize(alpha.kickstart, delta.kickstart, gamma.kickstart, omegaV.kickstart)


###################################################################################################
# Pseudo-devices
###################################################################################################

run("cpython/wrapper")

if not get_context().isSimulation():
    run("device/Mythen")
    run("device/Pixel")
    run("device/Image")
    run("device/Hexapod")
else:
    energy.write(9.5)
run("utils/Auto")


###################################################################################################
# Configuration
###################################################################################################

COUNT_TIME_PREFERENCE = "count_time"
GEOMETRY_PREFERENCE = "geometry"
ROI_PREFERENCE = "roi"
BG_ROI_PREFERENCE = "bg_roi"
USER_NAME_PREFERENCE = "user_name"
USER_EXP_PREFERENCE = "user_exp"
DATA_ROOT_PREFERENCE = "data_root"
DETECTORS_PREFERENCE = "detectors"


def get_count_time():
    """Return the stored count time as a float, or 1.0 if it is missing or not numeric."""
    setting = get_setting(COUNT_TIME_PREFERENCE)
    try:
        return float(setting)
    except (TypeError, ValueError):
        return 1.0


def set_count_time(value):
    """Store `value` as the count time setting."""
    set_setting(COUNT_TIME_PREFERENCE, value)


def _read_roi(preference):
    """Parse the setting `preference` ("x1 y1 x2 y2") into a tuple of 4 ints.

    Falls back to (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1) when the setting is missing
    or cannot be parsed.
    """
    setting = get_setting(preference)
    try:
        t = setting.strip().split(" ")
        return (int(t[0]), int(t[1]), int(t[2]), int(t[3]))
    except (AttributeError, IndexError, TypeError, ValueError):
        return (1, 1, pixel.PIX_XDIM - 1, pixel.PIX_YDIM - 1)


def _write_roi(preference, x1, y1, x2, y2):
    """Store the 4 coordinates, truncated to int, as a space separated string."""
    set_setting(preference, " ".join([str(int(v)) for v in (x1, y1, x2, y2)]))


def get_roi():
    """Return the stored region of interest as (x1, y1, x2, y2), or the default region."""
    return _read_roi(ROI_PREFERENCE)


def set_roi(x1, y1, x2, y2):
    """Store the region of interest."""
    _write_roi(ROI_PREFERENCE, x1, y1, x2, y2)


def get_bg_roi():
    """Return the stored background region as (x1, y1, x2, y2), or the default region."""
    return _read_roi(BG_ROI_PREFERENCE)


def set_bg_roi(x1, y1, x2, y2):
    """Store the background region of interest."""
    _write_roi(BG_ROI_PREFERENCE, x1, y1, x2, y2)


###################################################################################################
# Context
###################################################################################################

def get_geometry():
    """Return the stored geometry name, or None if it is unset or blank."""
    setting = get_setting(GEOMETRY_PREFERENCE)
    if setting is None or (len(setting.strip()) == 0):
        return None
    return setting


def set_geometry(value, apply=None):
    """Select the geometry `value`, running "{script}/geometry/<value>.py" when applied.

    A None or blank `value` clears the setting and removes the devices named
    "wavelength", "hkl_group", "h", "k" and "l" if they exist.

    apply: None  - store and run the script only if `value` differs from the stored one;
           True  - always store and run the script;
           other - only validate that the geometry file exists.

    Raises Exception if the geometry file does not exist.
    """
    if value is None or (len(value.strip()) == 0):
        set_setting(GEOMETRY_PREFERENCE, "")
        for name in "wavelength", "hkl_group", "h", "k", "l":
            dev = get_device(name)
            if dev is not None:
                remove_device(dev)
        return
    filename = get_context().setup.expandPath("{script}/geometry/" + str(value) + ".py")
    if not os.path.isfile(filename):
        raise Exception("Invalid geometry file: " + value)
    former = get_geometry()
    if ((apply is None) and former != value) or (apply == True):
        set_setting(GEOMETRY_PREFERENCE, value)
        run(filename)


def is_geometry_set():
    """Return True if a device named "wavelength" exists."""
    return get_device("wavelength") is not None


def set_user_env(user, exp, path="/sls/X04SA/data/x04sa/ES3/expdata"):
    """Store user, experiment and data root, and set up the data and image folders.

    The data path is <path>/<user>/<exp>; it and the pixel detector image folder are
    created with mode 0o777 when missing.
    """
    set_setting(USER_NAME_PREFERENCE, user)
    set_setting(USER_EXP_PREFERENCE, exp)
    set_setting(DATA_ROOT_PREFERENCE, path)
    data_path = path + "/" + user + "/" + exp
    data_path = os.path.abspath(data_path)
    set_data_path(data_path)
    # NOTE(review): the source indentation was lost; the chmod calls are assumed to belong
    # to the "folder was just created" branches - confirm.
    if not os.path.exists(data_path):
        os.makedirs(data_path)
        os.chmod(data_path, 0o777)
        os.chmod(os.path.dirname(data_path), 0o777)
    # Formerly: pixel.set_path(path, user + "/" + exp + "/images")
    pixel.set_path("../../expdata/" + user + "/" + exp, "images")
    if not os.path.exists(pixel.get_full_path()):
        os.makedirs(pixel.get_full_path())
        os.chmod(pixel.get_full_path(), 0o777)


def get_user_env():
    """Return the stored (user, experiment, data root) settings."""
    return (get_setting(USER_NAME_PREFERENCE),
            get_setting(USER_EXP_PREFERENCE),
            get_setting(DATA_ROOT_PREFERENCE))


def load_user_env():
    """Re-apply the stored user environment if user, experiment and path are all set."""
    (user, exp, path) = get_user_env()
    if user and path and exp:
        set_user_env(user, exp, path)


# Sub-devices that can be selected as detectors, mapped to the expression stored in the
# detectors setting (and evaluated back by get_detectors).
SUB_DEVICE_DETECTORS = {
    mythen.acquire_time: "mythen.acquire_time",
    pixel.image_filename: "pixel.image_filename",
    image.intensity: "image.intensity",
    image.corrected_intensity: "image.corrected_intensity",
    image.matrix: "image.matrix",
}


def get_detectors():
    """Return the list of devices named in the comma separated detectors setting.

    Each name is first looked up in the device pool; if not found it is evaluated as an
    expression (e.g. "mythen.acquire_time") and kept only if the result is a Device.
    Names that cannot be resolved are silently skipped.
    """
    det_str = get_setting(DETECTORS_PREFERENCE)
    if not det_str or not (det_str.strip()):
        return []
    ret = []
    for name in det_str.split(","):
        name = name.strip()
        dev = get_context().devicePool.getByName(name)
        if not dev:
            try:
                # SECURITY NOTE: eval() executes whatever is stored in the setting; this is
                # only acceptable while the settings store is trusted.
                dev = eval(name)
                if not issubclass(dev.__class__, Device):
                    dev = None
            except:
                # Bare except kept on purpose: any failure means "not a usable detector".
                dev = None
        if dev:
            ret.append(dev)
    return ret


def set_detectors(detectors):
    """Store the given detectors as a comma separated list of names.

    Entries may be strings (stored as is), keys of SUB_DEVICE_DETECTORS (stored as their
    mapped expression) or objects with a `name` attribute. None or empty stores "".
    """
    devs = []
    if detectors:
        for d in detectors:
            if is_string(d):
                name = d
            elif d in SUB_DEVICE_DETECTORS.keys():
                name = SUB_DEVICE_DETECTORS[d]
            else:
                name = d.name
            devs.append(name)
    set_setting(DETECTORS_PREFERENCE, ",".join(devs))

# Example:
# set_detectors([mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,
#                image.corrected_intensity, image.matrix])


###################################################################################################
# Scan callbacks
###################################################################################################

def trigger_pilatus(position, scan):
    """Open the shutter, expose the pixel detector for the count time, close the shutter.

    The shutter is closed even if the acquisition raises.
    """
    count_time = get_count_time()
    open_shutter()
    try:
        # NOTE(review): the source indentation was lost; start/wait are assumed to be
        # guarded by the positive count time check, mirroring trigger_detectors - confirm.
        if count_time > 0:
            pixel.set_expose(count_time)
            pixel.start()
            pixel.wait_finished(10.0)
    finally:
        close_shutter()


count_id = None
image_filename = None


def trigger_detectors(position, scan):
    """Acquire with the pixel and/or mythen detectors that are readables of `scan`.

    Updates the globals `image_filename` and `count_id`. The shutter is opened for the
    acquisition and always closed afterwards.
    """
    global count_id, image_filename
    count_time = get_count_time()
    pix_is_on = pixel in scan.readables
    myt_is_on = mythen in scan.readables
    if myt_is_on:
        mythen.abort()
    open_shutter()
    try:
        if count_time != 0:
            if pix_is_on:
                pixel.set_expose(count_time)
                pixel.start()
            if myt_is_on:
                if count_time > 0:
                    mythen.set_acquire_time(count_time)
                    mythen.set_acquire_mode("Single")
                    mythen.start()
                else:
                    # The original literal ended in "\!", which printed a stray backslash.
                    print("Preset monitor counting is not supported!")
            if pix_is_on:
                pixel.wait_finished(10.0)
            if myt_is_on:
                # Poll every 50 ms, giving up after twice the count time.
                i = 0
                while mythen.is_acquiring():
                    time.sleep(0.05)
                    i += 1
                    if i * 0.05 >= count_time * 2:
                        print("MYTHEN Izero times out, status: " + str(mythen.get_status()))
                        break
        if pix_is_on:
            image_filename = pixel.get_image_filename()
            count_id = pixel.doUpdate()
        else:
            image_filename = None
            # NOTE(review): the original reads "count_id = + 1", which assigns 1 rather
            # than incrementing. It may be a typo for "+= 1", but count_id starts as None,
            # so the behaviour is left unchanged until the intent is confirmed.
            count_id = +1
    finally:
        close_shutter()


def save_metadata(rec, scan):
    """Print the record index and, if saving is enabled, append the diagnostics datasets.

    The datasets are created on the first record (index 0).
    """
    # Two spaces reproduce the output of the former 'print "Acquired record ", rec.index'.
    print("Acquired record  %s" % (rec.index,))
    if get_exec_pars().save:
        if rec.index == 0:
            create_diag_datasets()
        append_diag_datasets()


def save_experiment_context(parent=None):
    """Save user, experiment, geometry and diffraction context under <parent>experiment/.

    `parent` defaults to the group of the current execution parameters.
    """
    if parent is None:
        parent = get_exec_pars().group
    group = parent + "experiment/"
    if get_setting(USER_NAME_PREFERENCE):
        save_dataset(group + "user", get_setting(USER_NAME_PREFERENCE))
    if get_setting(USER_EXP_PREFERENCE):
        save_dataset(group + "name", get_setting(USER_EXP_PREFERENCE))
    if get_geometry():
        save_dataset(group + "geometry", get_geometry())
    # NOTE(review): the source indentation was lost; the context is assumed to be read
    # unconditionally and the UB matrix saved only when a UB name is defined - confirm.
    ctxt = get_exp_context()
    limits, constraints, ub_name = ctxt["limits"], ctxt["constraints"], ctxt["ub"]
    if limits:
        save_dataset(group + "limits", str(limits))
    if constraints:
        save_dataset(group + "constraints", str(constraints))
    if ub_name:
        save_dataset(group + "ub_name", str(ub_name))
        ub_matrix = getub()
        save_dataset(group + "ub", ub_matrix)
        save_dataset(group + "ub_str", str(ub_matrix))


def before_sample(position, scan):
    """Scan before-read callback: save the context on record 1, then trigger detectors."""
    if scan.recordIndex == 1:
        save_experiment_context()
    auto_before_sample(position, scan)
    trigger_detectors(position, scan)


def after_sample(rec, scan):
    """Scan after-read callback: close the shutter and save metadata if the record is kept.

    The metadata is saved only when auto_after_sample() returns a true value.
    """
    close_shutter()
    if auto_after_sample(rec, scan):
        save_metadata(rec, scan)


###################################################################################################
# Scan metadata
###################################################################################################

def get_diag_channels():
    """Return the devices sampled as scan metadata.

    When a geometry is set, the h, k, l readbacks and the wavelength are included.
    """
    diag_channels = [
        phi,
        omegaH, nu,
        omegaV,
        alpha,
        delta,
        gamma,
        xv,
        y1,
        y2,
        y3,
        trx,
        thy,
    ]
    if is_geometry_set():
        diag_channels.append(h.readback)
        diag_channels.append(k.readback)
        diag_channels.append(l.readback)
        diag_channels.append(wavelength)
    return diag_channels


def get_diag_name(diag):
    """Return the title-cased name of `diag` (a string or a device) with spaces removed."""
    name = diag if is_string(diag) else diag.getName()
    return ch.psi.utils.Str.toTitleCase(name).replace(" ", "")


def print_diag():
    """Print the name and current value of every diagnostics channel."""
    for f in get_diag_channels():
        print("%-25s %s" % (get_diag_name(f), str(f.read())))


def create_diag_datasets(parent=None):
    """Create one dataset per diagnostics channel under <parent>meta/.

    String channels get type 's', all others 'd'.
    """
    if parent is None:
        parent = get_exec_pars().group
    group = parent + "meta/"
    for f in get_diag_channels():
        is_text = type(f) is ch.psi.pshell.epics.ChannelString
        create_dataset(group + get_diag_name(f), 's' if is_text else 'd')


def append_diag_datasets(parent=None):
    """Read every diagnostics channel and append its value to its dataset.

    A None reading is stored as '' for string channels and NaN otherwise. Failures are
    logged per channel and do not stop the remaining channels from being sampled.
    """
    if parent is None:
        parent = get_exec_pars().group
    group = parent + "meta/"
    for f in get_diag_channels():
        try:
            x = f.read()
            if x is None:
                x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
            append_dataset(group + get_diag_name(f), x)
        except:
            # Bare except kept on purpose so that a failing channel never aborts a scan.
            log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))


###################################################################################################
# Utilities
###################################################################################################

def is_locked(filepath):
    """Check whether a file is locked by trying to open it in append mode.

    Returns False if the file could be opened, True if opening raised IOError, and None
    if the file does not exist.
    """
    locked = None
    file_object = None
    filepath = os.path.abspath(filepath)
    if os.path.exists(filepath):
        try:
            print("Trying to open %s." % filepath)
            buffer_size = 8
            # Open in append mode (nothing is read or written); 8 is the buffer size.
            file_object = open(filepath, 'a', buffer_size)
            if file_object:
                print("%s is not locked." % filepath)
                locked = False
        except IOError as message:
            print("File is locked (unable to open in append mode). %s." % message)
            locked = True
        finally:
            if file_object:
                file_object.close()
                print("%s closed." % filepath)
    else:
        print("%s not found." % filepath)
    return locked


def wait_for_files(filepaths, timeout=None):
    """Wait until every file in `filepaths` exists and can be opened in append mode.

    `timeout` (seconds, None = wait forever) applies to the whole call.
    Raises Exception if a file does not arrive or unlock in time.
    """
    wait_time = 0.01
    # Fix: `start` was never assigned, so any call with a timeout raised NameError.
    start = time.time()
    for filepath in filepaths:
        filepath = os.path.abspath(filepath)
        # Wait for the file to appear.
        while not os.path.exists(filepath):
            if (timeout is not None) and (time.time() - start > timeout):
                err = filepath + " hasn't arrived in time"
                print(err)
                raise Exception(err)
            time.sleep(wait_time)
        # Wait until no other process holds it locked.
        while is_locked(filepath):
            if (timeout is not None) and (time.time() - start > timeout):
                err = filepath + " hasn't unlock in time"
                print(err)
                raise Exception(err)
            time.sleep(wait_time)


def wait_for_file_size(filepath, size, timeout=None):
    """Wait for a file to exist and to reach at least `size` bytes.

    Raises Exception if that does not happen within `timeout` seconds (None = no limit).
    """
    wait_time = 0.01
    filepath = os.path.abspath(filepath)
    start = time.time()
    while not os.path.exists(filepath) or size > os.path.getsize(filepath):
        if (timeout is not None) and (time.time() - start > timeout):
            err = filepath + " hasn't arrived in time"
            print(err)
            raise Exception(err)
        time.sleep(wait_time)


def set_data_path(path):
    """Change the data root path."""
    get_context().setDataPath(path)


def set_script_path(path):
    """Change the script root path."""
    get_context().setScriptPath(path)


def backup_ub(name=None, destination="{data}"):
    """Copy a UB matrix file (default: the current one) to `destination`.

    Raises Exception if no UB matrix is defined or the named file does not exist.
    """
    if not name:
        name = ub.ubcalc._state.name
    if not name:
        raise Exception("No UB matrix defined")
    name = name + ".json"
    f = settings.persistence_path + "/" + name
    if not os.path.isfile(f):
        raise Exception("Invalid UB name: " + str(name))
    copyfile(f, get_context().setup.expandPath(destination + "/" + name))


def restore_ub(name, origin="{data}"):
    """Copy the UB matrix file <name>.json from `origin` to the persistence path, then load it."""
    f = settings.persistence_path + "/" + name + ".json"
    copyfile(get_context().setup.expandPath(origin + "/" + name + ".json"), f)
    loadub(name)


###################################################################################################
# HKL commands
###################################################################################################

def ci(positions, energy=None):
    """Return the first element of hklci(positions, energy)."""
    return hklci(positions, energy)[0]


def ca(hkl, energy=None):
    """Return the first element of hklca(hkl, energy)."""
    return hklca(hkl, energy)[0]


def wh():
    """Call hklwh()."""
    hklwh()


###################################################################################################
# Scan commands
###################################################################################################

def _prepare_scan(count_time):
    """Store the count time, show the detector image and check the pixel detector is ready."""
    set_count_time(count_time)
    show_panel(detector_image)
    pixel.show()
    pixel.assert_ready()


def relscan(motor, start, end, number_of_steps, count_time):
    """Relative scan of one motor using the configured detectors."""
    _prepare_scan(count_time)
    # Fix: used the undefined-in-this-file name `detectors`; all other scans use
    # get_detectors().
    return lscan(motor, get_detectors(), start, end, int(number_of_steps), relative=True,
                 before_read=before_sample, after_read=after_sample, name="relscan")


def absscan(motor, start, end, number_of_steps, count_time, name="absscan"):
    """Absolute scan of one motor using the configured detectors.

    `name` is the scan name passed to lscan; it lets the h/k/l rod scans label themselves.
    """
    _prepare_scan(count_time)
    return lscan(motor, get_detectors(), start, end, int(number_of_steps), relative=False,
                 before_read=before_sample, after_read=after_sample, name=name)


def abs2scan(motor1, start1, end1, motor2, start2, end2, number_of_steps, count_time):
    """Absolute scan of 2 motors using the configured detectors."""
    _prepare_scan(count_time)
    return lscan([motor1, motor2], get_detectors(), [start1, start2], [end1, end2],
                 int(number_of_steps), relative=False,
                 before_read=before_sample, after_read=after_sample, name="abs2scan")


def hrodscan(start, end, number_of_steps, count_time):
    """Absolute scan on h."""
    return absscan(h, start, end, number_of_steps, count_time, name="hrodscan")


def krodscan(start, end, number_of_steps, count_time):
    """Absolute scan on k."""
    # Fix: was labelled "hrodscan" (copy-paste).
    return absscan(k, start, end, number_of_steps, count_time, name="krodscan")


def lrodscan(start, end, number_of_steps, count_time):
    """Absolute scan on l."""
    return absscan(l, start, end, number_of_steps, count_time, name="lrodscan")


def hkllinscan(hstart, hfinish, kstart, kfinish, lstart, lfinish, number_of_steps, count_time):
    """Linear scan in hkl space with number_of_steps + 1 equally spaced points."""
    # Coerce like the other scan commands do, so that a float step count does not break
    # range().
    number_of_steps = int(number_of_steps)
    hs = float(hfinish - hstart) / number_of_steps
    ks = float(kfinish - kstart) / number_of_steps
    ls = float(lfinish - lstart) / number_of_steps
    vector = [[hstart + i * hs, kstart + i * ks, lstart + i * ls]
              for i in range(number_of_steps + 1)]
    _prepare_scan(count_time)
    return hklscan(vector, get_detectors(), latency=0.0,
                   before_read=before_sample, after_read=after_sample, name="hkllinscan")


def ct(count_time):
    """Single sampling of the configured detectors.

    A true `count_time` is stored first; otherwise the stored count time is used.
    """
    if count_time:
        set_count_time(count_time)
    # NOTE(review): the source indentation was lost; the acquisition is assumed to run
    # regardless of whether a new count time was given - confirm.
    tscan(get_detectors(), 1, 1.0, before_read=before_sample, after_read=after_sample, name="ct")


###################################################################################################
# Configuration
###################################################################################################

set_geometry(get_geometry(), True)
load_user_env()
load_exp_context()


###################################################################################################
# Device initialization
###################################################################################################

kickstart()