###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################

import os
import os.path
import sys
import time


###################################################################################################
# Interlocks
###################################################################################################

class MyInterlock1(Interlock):
    """Interlock over the ``alpha`` and ``gamma`` devices.

    Not active by default: the instantiation below is commented out.
    """

    def __init__(self):
        Interlock.__init__(self, (alpha, gamma))

    def check(self, values):
        """Return True when the (alpha, gamma) pair is acceptable.

        ``values`` is a 2-item sequence ``(a, g)``; the pair is rejected
        (False) when ``a >= g``.
        """
        a, g = values
        return not (a >= g)

#interlock1 = MyInterlock1()


###################################################################################################
# Hardware
###################################################################################################

def open_shutter():
    """Write "On" to the shutter, then pause for 0.1 s."""
    shutter.write("On")
    # presumably gives the shutter time to open before acquisition - TODO confirm
    time.sleep(0.1)


def close_shutter():
    """Write "Off" to the shutter."""
    shutter.write("Off")


###################################################################################################
# Pseudo-devices
###################################################################################################

run("cpython/wrapper")

if not get_context().isSimulation():
    run("device/Mythen")
    run("device/Pixel")
    run("device/Image")
else:
    energy.write(9.5)


def after_sample(record, scan):
    """Scan callback: close the shutter. Both arguments are unused."""
    close_shutter()


###################################################################################################
# Configuration
###################################################################################################

COUNT_TIME_PREFERENCE = "count_time"
GEOMETRY_PREFERENCE = "geometry"
ROI_PREFERENCE = "roi"
BG_ROI_PREFERENCE = "bg_roi"


def get_count_time():
    """Return the stored count time as a float.

    Falls back to 1.0 when the setting is missing or is not a number.
    """
    setting = get_setting(COUNT_TIME_PREFERENCE)
    try:
        return float(setting)
    except (TypeError, ValueError):
        return 1.0


def set_count_time(value):
    """Store ``value`` as the count time setting."""
    set_setting(COUNT_TIME_PREFERENCE, value)


def get_geometry():
    """Return the stored geometry name, or None when it is unset or blank."""
    setting = get_setting(GEOMETRY_PREFERENCE)
    if setting is None or not setting.strip():
        return None
    return setting


def set_geometry(value, apply=None):
    """Select a geometry and optionally run its definition script.

    value: geometry name, resolved to ``{script}/geometry/<value>.py``.
           None or a blank string clears the setting and removes the
           "wavelength", "hkl_group", "h", "k" and "l" devices if present.
    apply: True   - always store the setting and run the script;
           None   - do so only when ``value`` differs from the stored one;
           other  - validate the file only.

    Raises Exception when the geometry file does not exist.
    """
    if value is None or not value.strip():
        set_setting(GEOMETRY_PREFERENCE, "")
        for name in ("wavelength", "hkl_group", "h", "k", "l"):
            dev = get_device(name)
            if dev is not None:
                remove_device(dev)
        return

    filename = get_context().setup.expandPath("{script}/geometry/" + str(value) + ".py")
    if not os.path.isfile(filename):
        # str() so that a non-string value still yields this message, not a TypeError
        raise Exception("Invalid geometry file: " + str(value))

    former = get_geometry()
    if (apply is None and former != value) or (apply == True):
        set_setting(GEOMETRY_PREFERENCE, value)
        run(filename)


def is_geometry_set():
    """Return True when a device named "wavelength" is registered."""
    return get_device("wavelength") is not None


def _read_roi(preference):
    """Parse the setting ``preference`` as four space-separated integers.

    Returns (x1, y1, x2, y2). When the setting is missing or malformed the
    default (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1) is returned.
    """
    setting = get_setting(preference)
    try:
        t = setting.strip().split(" ")
        return (int(t[0]), int(t[1]), int(t[2]), int(t[3]))
    except (AttributeError, IndexError, ValueError):
        return (1, 1, pixel.PIX_XDIM - 1, pixel.PIX_YDIM - 1)


def _write_roi(preference, x1, y1, x2, y2):
    """Store the four corners under ``preference`` as space-separated integers."""
    set_setting(preference, " ".join(str(int(v)) for v in (x1, y1, x2, y2)))


def get_roi():
    """Return the stored ROI as (x1, y1, x2, y2), or the default ROI."""
    return _read_roi(ROI_PREFERENCE)


def set_roi(x1, y1, x2, y2):
    """Store the ROI corners (truncated to integers)."""
    _write_roi(ROI_PREFERENCE, x1, y1, x2, y2)


def get_bg_roi():
    """Return the stored background ROI as (x1, y1, x2, y2), or the default ROI."""
    return _read_roi(BG_ROI_PREFERENCE)


def set_bg_roi(x1, y1, x2, y2):
    """Store the background ROI corners (truncated to integers)."""
    _write_roi(BG_ROI_PREFERENCE, x1, y1, x2, y2)


###################################################################################################
# Scan callbacks
###################################################################################################

def trigger_pilatus(position, scan):
    """Scan callback: expose the pixel detector with the shutter open.

    The shutter is closed again even if the acquisition raises.
    Both arguments are unused.
    """
    count_time = get_count_time()
    open_shutter()
    try:
        # NOTE(review): nesting reconstructed from unindented source - confirm that
        # start/wait are meant to be skipped when count_time <= 0.
        if count_time > 0:
            pixel.set_expose(count_time)
            pixel.start()
            pixel.wait_finished(30.0)
    finally:
        close_shutter()


count_id = None
image_filename = None


def trigger_detectors(position, scan):
    """Scan callback: acquire on whichever of pixel / mythen is in the scan.

    Updates the module globals ``image_filename`` and ``count_id``.
    The shutter is closed again even if the acquisition raises.
    """
    global count_id, image_filename
    count_time = get_count_time()
    pix_is_on = pixel in scan.readables
    myt_is_on = mythen in scan.readables

    if myt_is_on:
        mythen.abort()

    open_shutter()
    try:
        # NOTE(review): nesting reconstructed from unindented source - confirm that
        # the two wait sections belong inside this count_time check.
        if count_time != 0:
            if pix_is_on:
                pixel.set_expose(count_time)
                pixel.start()
            if myt_is_on:
                if count_time > 0:
                    mythen.set_acquire_time(count_time)
                    mythen.set_acquire_mode("Single")
                    mythen.start()
                else:
                    print("Preset monitor counting is not supported!")
            if pix_is_on:
                pixel.wait_finished(10.0)
            if myt_is_on:
                # Poll every 50 ms; give up after twice the count time.
                i = 0
                while mythen.is_acquiring():
                    time.sleep(0.05)
                    i += 1
                    if i * 0.05 >= count_time * 2:
                        print("MYTHEN Izero times out, status: " + str(mythen.get_status()))
                        break

        if pix_is_on:
            image_filename = pixel.get_image_filename()
            count_id = pixel.doUpdate()
        else:
            image_filename = None
            # NOTE(review): this assigns the constant 1 (unary plus); it looks like an
            # increment was intended, but count_id starts as None - confirm intent.
            count_id = +1
    finally:
        close_shutter()


def save_metadata(rec):
    """Print the record index and, when saving is enabled, sample the diagnostics.

    The diagnostic datasets are created on the first record (index 0).
    """
    print("Acquired record  " + str(rec.index))
    if get_exec_pars().save:
        if rec.index == 0:
            create_diag_datasets()
        append_diag_datasets()


###################################################################################################
# Scan metadata
###################################################################################################

def get_diag_channels():
    """Return the list of devices sampled as scan metadata.

    When a geometry is set, the h/k/l readbacks and the wavelength device
    are included as well.
    """
    diag_channels = [
        phi,
        omegaH,
        nu,
        omegaV,
        alpha,
        delta,
        gamma,
        xv,
        y1,
        y2,
        y3,
        trx,
        thy,
    ]
    if is_geometry_set():
        diag_channels.append(h.readback)
        diag_channels.append(k.readback)
        diag_channels.append(l.readback)
        # The device itself, not its name: consumers of this list call
        # getName() and read() on every entry.
        diag_channels.append(get_device("wavelength"))
    return diag_channels


def get_diag_name(diag):
    """Return the device name in title case with the spaces removed."""
    return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "")


def print_diag():
    """Print one "name value" line per diagnostic channel."""
    for f in get_diag_channels():
        print("%-25s %s" % (get_diag_name(f), str(f.read())))


def create_diag_datasets(parent=None):
    """Create one dataset per diagnostic channel under ``<parent>meta/``.

    parent: group path prefix; defaults to the current execution group.
    String channels get type 's', every other channel type 'd'.
    """
    if parent is None:
        parent = get_exec_pars().group
    group = parent + "meta/"
    for f in get_diag_channels():
        is_string = type(f) is ch.psi.pshell.epics.ChannelString
        create_dataset(group + get_diag_name(f), 's' if is_string else 'd')


def append_diag_datasets(parent=None):
    """Append the current value of every diagnostic channel to its dataset.

    parent: group path prefix; defaults to the current execution group.
    A None reading is stored as '' for string channels and NaN otherwise.
    Failures are logged per channel and do not stop the remaining channels.
    """
    if parent is None:
        parent = get_exec_pars().group
    group = parent + "meta/"
    for f in get_diag_channels():
        try:
            x = f.read()
            if x is None:
                x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
            append_dataset(group + get_diag_name(f), x)
        except:
            # Deliberately broad: one failing channel must not abort the record.
            log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))


###################################################################################################
# File utilities
###################################################################################################

def is_locked(filepath):
    """Check whether a file is locked by trying to open it in append mode.

    Returns False when the file opens, True when opening raises IOError,
    and None when the file does not exist.
    """
    locked = None
    file_object = None
    filepath = os.path.abspath(filepath)
    if os.path.exists(filepath):
        try:
            buffer_size = 8
            # The file is only opened (append mode), never written to.
            file_object = open(filepath, 'a', buffer_size)
            if file_object:
                locked = False
        except IOError as message:
            print("File is locked (unable to open in append mode). %s." % message)
            locked = True
        finally:
            if file_object:
                file_object.close()
    else:
        print("%s not found." % filepath)
    return locked


def wait_for_files(filepaths):
    """Block until every file exists and can be opened in append mode.

    Polls every 10 ms; there is no timeout.
    """
    wait_time = 0.01
    for filepath in filepaths:
        filepath = os.path.abspath(filepath)
        # Wait until the file shows up.
        while not os.path.exists(filepath):
            time.sleep(wait_time)
        # Then wait until it is no longer locked.
        while is_locked(filepath):
            time.sleep(wait_time)


def wait_for_file_size(filepath, size):
    """Block until the file exists and is at least ``size`` bytes long.

    Polls every 10 ms; there is no timeout.
    """
    wait_time = 0.01
    filepath = os.path.abspath(filepath)
    # "or", not "and": keep waiting while the file is missing OR still too
    # small. The short-circuit also keeps getsize() away from a missing file.
    while not os.path.exists(filepath) or os.path.getsize(filepath) < size:
        time.sleep(wait_time)


###################################################################################################
# Startup
###################################################################################################

# Re-apply the stored geometry (apply=True forces its script to run).
set_geometry(get_geometry(), True)