################################################################################################### # Deployment specific global definitions - executed after startup.py ################################################################################################### import os import os.path ################################################################################################### # Interlocks ################################################################################################### class MyInterlock1 (Interlock): def __init__(self): Interlock.__init__(self, (alpha, gamma)) def check(self, (a, g)): if a>=g: return False return True #interlock1 = MyInterlock1() ################################################################################################### # Hardware ################################################################################################### def open_shutter(): """ """ shutter.write("On") time.sleep(0.1) def close_shutter(): """ """ shutter.write("Off") def transm_up(factor = 10.0): close_shutter() transm.write(min(transm.position * factor, 1.0)) def transm_down(factor = 10.0): close_shutter() transm.write(transm.position / factor) def lup(): if get_option("Moving detector to direct beam... are you sure?") == "Yes": close_shutter() auto_set_level(0) transm.write(1e-8) gamma.moveAsync(0.0) delta.moveAsync(0.0) delta.waitReady(60000) gamma.waitReady(60000) ################################################################################################### # Pseudo-devices ################################################################################################### run("cpython/wrapper") if not get_context().isSimulation(): run("device/Mythen") run("device/Pixel") run("device/Image") run("device/Hexapod") else: energy.write(9.5) run("utils/Auto") ################################################################################################### # Configuration ################################################################################################### COUNT_TIME_PREFERENCE = "count_time" GEOMETRY_PREFERENCE = "geometry" ROI_PREFERENCE = "roi" BG_ROI_PREFERENCE = "bg_roi" def get_count_time(): """ """ setting = get_setting(COUNT_TIME_PREFERENCE) try: return float(setting) except: return 1.0 def set_count_time(value): """ """ set_setting(COUNT_TIME_PREFERENCE, value ) def get_geometry(): """ """ setting = get_setting(GEOMETRY_PREFERENCE) if setting is None or (len(setting.strip()) == 0): return None return setting def set_geometry(value, apply = None): """ """ if value is None or (len(value.strip()) == 0): set_setting(GEOMETRY_PREFERENCE, "" ) for name in "wavelength", "hkl_group", "h", "k", "l": dev = get_device(name) if dev is not None: remove_device(dev) return filename = get_context().setup.expandPath("{script}/geometry/"+ str(value)+".py") if not os.path.isfile(filename): raise Exception("Invalid geometry file: " + value) former = get_geometry() if ((apply is None) and former != value) or (apply==True) : set_setting(GEOMETRY_PREFERENCE, value ) run(filename) def is_geometry_set(): return get_device("wavelength") is not None def get_roi(): """ """ setting = get_setting(ROI_PREFERENCE) try: t = setting.strip().split(" ") return (int(t[0]), int(t[1]), int(t[2]), int(t[3])) except: return (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1) def set_roi(x1, y1, x2, y2): """ """ set_setting(ROI_PREFERENCE, str(int(x1)) + " " + str(int(y1)) + " " + str(int(x2)) + " " + str(int(y2)) ) def get_bg_roi(): """ """ setting = get_setting(BG_ROI_PREFERENCE) try: t = setting.strip().split(" ") return (int(t[0]), int(t[1]), int(t[2]), int(t[3])) except: return (1, 1, pixel.PIX_XDIM-1, pixel.PIX_YDIM-1) def set_bg_roi(x1, y1, x2, y2): """ """ set_setting(BG_ROI_PREFERENCE, str(int(x1)) + " " + str(int(y1)) + " " + str(int(x2)) + " " + str(int(y2)) ) ################################################################################################### # Scan callbacks ################################################################################################### def trigger_pilatus(position, scan): count_time = get_count_time() open_shutter() try: if count_time>0: pixel.set_expose(count_time) pixel.start() pixel.wait_finished(10.0) finally: close_shutter() count_id = None image_filename = None def trigger_detectors(position, scan): global count_id, image_filename count_time = get_count_time() pix_is_on = pixel in scan.readables myt_is_on = mythen in scan.readables if myt_is_on: mythen.abort() open_shutter() try: if (count_time != 0): if pix_is_on: pixel.set_expose(count_time) pixel.start() if myt_is_on: if (count_time > 0): mythen.set_acquire_time(count_time) mythen.set_acquire_mode("Single") mythen.start() else: print "Preset monitor counting is not supported\!" if pix_is_on: pixel.wait_finished(10.0) if myt_is_on: i = 0 while (mythen.is_acquiring()): time.sleep (0.05) i += 1 if (i * 0.05 >= count_time * 2): print "MYTHEN Izero times out, status: " + str(mythen.get_status()) break if pix_is_on: image_filename = pixel.get_image_filename() count_id = pixel.doUpdate() else : image_filename = None count_id = + 1 finally: close_shutter() def save_metadata(rec, scan): print "Acquired record ", rec.index if get_exec_pars().save: if rec.index == 0: create_diag_datasets() append_diag_datasets() def before_sample(position, scan): auto_before_sample(position, scan) trigger_detectors(position, scan) def after_sample(rec, scan): close_shutter() if auto_after_sample(rec, scan): save_metadata(rec, scan) ################################################################################################### # Scan metadata ################################################################################################### def get_diag_channels(): diag_channels = [ phi, \ omegaH, nu,\ omegaV, \ alpha, \ delta, \ gamma, \ xv, \ y1, \ y2, \ y3, \ trx, \ thy, \ ] if is_geometry_set(): diag_channels.append(h.readback) diag_channels.append(k.readback) diag_channels.append(l.readback) diag_channels.append(wavelength) return diag_channels def get_diag_name(diag): return ch.psi.utils.Str.toTitleCase(diag if is_string(diag) else diag.getName()).replace(" ", "") def print_diag(): for f in get_diag_channels(): print "%-25s %s" % (get_diag_name(f) , str(f.read())) def create_diag_datasets(parent = None): if parent is None: parent = get_exec_pars().group group = parent + "meta/" for f in get_diag_channels(): create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd') def append_diag_datasets(parent = None): if parent is None: parent = get_exec_pars().group group = parent + "meta/" for f in get_diag_channels(): try: x = f.read() if x is None: x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan') append_dataset(group+get_diag_name(f), x) except: log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1])) ################################################################################################### # Utilities ################################################################################################### def is_locked(filepath): """Checks if a file is locked by opening it in append mode. If no exception thrown, then the file is not locked. """ locked = None file_object = None filepath = os.path.abspath(filepath) if os.path.exists(filepath): try: print "Trying to open %s." % filepath buffer_size = 8 # Opening file in append mode and read the first 8 characters. file_object = open(filepath, 'a', buffer_size) if file_object: print "%s is not locked." % filepath locked = False except IOError, message: print "File is locked (unable to open in append mode). %s." % \ message locked = True finally: if file_object: file_object.close() print "%s closed." % filepath else: print "%s not found." % filepath return locked def wait_for_files(filepaths, timeout = None): """Checks if the files are ready. For a file to be ready it must exist and can be opened in append mode. """ wait_time = 0.01 for filepath in filepaths: filepath = os.path.abspath(filepath) # If the file doesn't exist, wait wait_time seconds and try again # until it's found. while not os.path.exists(filepath): if (timeout is not None) and (time.time() -start > timeout): err = filepath + " hasn't arrived in time" print err raise Exception(err) time.sleep(wait_time) # If the file exists but locked, wait wait_time seconds and check # again until it's no longer locked by another process. while is_locked(filepath): if (timeout is not None) and (time.time() -start > timeout): err = filepath + " hasn't unlock in time" print err raise Exception(err) time.sleep(wait_time) def wait_for_file_size(filepath, size, timeout = None): """Wait for a file to exist, and reach a given size. """ wait_time = 0.01 filepath = os.path.abspath(filepath) start = time.time() # If the file doesn't exist, wait wait_time seconds and try again # until it's found. while not os.path.exists(filepath) or size > os.path.getsize(filepath): if (timeout is not None) and (time.time() -start > timeout): err = filepath + " hasn't arrived in time" print err raise Exception(err) time.sleep(wait_time) def set_data_path(path): get_context().setDataPath(path) def set_script_path(path): get_context().setScriptPath(path) ################################################################################################### # HKL commands ################################################################################################### def ci(positions, energy=None): return hklci(positions, energy)[0] def ca(hkl, energy=None): return hklca(hkl, energy)[0] def wh(): hklwh() ################################################################################################### # Scan commands ################################################################################################### def relscan(motor, start, end, number_of_steps, count_time): """ Relative scan """ set_count_time(count_time) detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix] #TODO: Set based on experiment context pixel.set_path("../../expdata/bml_20190703/pixel/", "images") #TODO: should be set automatically set_roi(173,88,285,136) show_panel(detector_image) pixel.show() pixel.assert_ready() return lscan (motor, detectors, start, end, int(number_of_steps), relative=True, before_read=before_sample, after_read=after_sample) def absscan(motor, start, end, number_of_steps, count_time): """ Absolute scan """ set_count_time(count_time) detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix] #TODO: Set based on experiment context pixel.set_path("../../expdata/bml_20190703/pixel/", "images") #TODO: should be set automatically set_roi(173,88,285,136) show_panel(detector_image) pixel.show() pixel.assert_ready() return lscan (motor, detectors, start, end, int(number_of_steps), relative=False, before_read=before_sample, after_read=after_sample) def abs2scan(motor1, start1, end1, motor2, start2, end2, number_of_steps, count_time): """ Absolute scan of 2 motors """ set_count_time(count_time) detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix] #TODO: Set based on experiment context pixel.set_path("../../expdata/bml_20190703/pixel/", "images") #TODO: should be set automatically set_roi(173,88,285,136) show_panel(detector_image) pixel.show() pixel.assert_ready() return lscan ([motor1, motor2], detectors, [start1, start2], [end1, end2], int(number_of_steps), relative=False, before_read=before_sample, after_read=after_sample) def hrodscan(start, end, number_of_steps, count_time): """ Scan on l """ return absscan (h, start, end, number_of_steps, count_time) def krodscan(start, end, number_of_steps, count_time): """ Scan on k """ return absscan (k, start, end, number_of_steps, count_time) def lrodscan(start, end, number_of_steps, count_time): """ Scan on l """ return absscan (l, start, end, number_of_steps, count_time) def hkllinscan(hstart, hfinish, kstart, kfinish, lstart, lfinish, number_of_steps, count_time): """ Linear scan on hkl """ vector = [] hs = float(hfinish - hstart)/number_of_steps ks = float(kfinish - kstart)/number_of_steps ls = float(lfinish - lstart)/number_of_steps for i in range(number_of_steps+1): hp = hstart + i * hs kp = kstart + i * ks lp = lstart + i * ls vector.append([hp, kp, lp] ) set_count_time(count_time) detectors = [mythen, mythen.acquire_time, pixel, pixel.image_filename, image.intensity,image.corrected_intensity, image.matrix] #TODO: Set based on experiment context pixel.set_path("../../expdata/bml_20190703/pixel/", "images") #TODO: should be set automatically set_roi(173,88,285,136) show_panel(detector_image) pixel.show() pixel.assert_ready() return hklscan(vector, detectors,latency = 0.0, before_read=before_sample, after_read=after_sample) ################################################################################################### # Configuration ################################################################################################### set_geometry(get_geometry(),True)