import os.path class ImageFilename(DeviceBase, Readable, Readable.StringType): def __init__(self, pixel): DeviceBase.__init__(self, "image filename") self.setParent(pixel) def read(self): return self.getParent().get_image_filename() class Pixel(DeviceBase, Readable, Readable.IntegerType): def __init__(self, name, prefix): DeviceBase.__init__(self, name) self.prefix = prefix self.image_root_folder = "/sls/X04SA/data/x04sa/ES3/pixel/images/" self.image_filename = ImageFilename(self) self.PIX_TYPE = "PilatusII" self.PIX_COLOR_DEPTH = 32 self.PIX_XDIM = 487 self.PIX_YDIM = 195 self.set_threshold(40, 500, 8000, 100000) self.exposure_start_time = 0 self.timeout = None self._update_header_length() def doInitialize(self): self.show() if not self.is_connected(): print "The Pixel detector is not connected to Epics/Spec." print "Please disconnect in tvx and connect via the CCD/Pixel Epics widget." else : if self.get_error_code() != 0: print "The pixel detector returned an error. It may not be properly" print "connected. Please disconnect in tvx and disconnect and" print "re-connect with the CCD/Pixel Epics widget." def set_image_root_folder(self, path): self.image_root_folder = path def set_threshold(self, threshold1= None, threshold2= None, threshold3= None, threshold4= None): if threshold1 is not None: self.threshold1 = threshold1 if threshold2 is not None: self.threshold2 = threshold2 if threshold3 is not None: self.threshold3 = threshold3 if threshold4 is not None: self.threshold4 = threshold4 def is_connected(self): return caget(self.prefix + ":STAT1", 's') == "Connected" def get_error_code(self): return caget(self.prefix + ":ERRCODE", 'i') def assert_ready(self): if not self.is_connected(): if not self.try_connect(): raise Exception (self.name + " not connected") err = self.get_error_code() if err!=0: raise Exception (self.name + " error: " + str(err)) status = self.get_status() if not self.is_status_ready(status): raise Exception (self.name + " is not ready - status: " + str(status)) def try_connect(self): if not self.is_connected(): print "Detector not disconnected. Trying to connect... ", self.set_trigger("Connect") start = time.time() while not self.is_connected(): if (time.time() - start) > 3.0: print "failure" return False print "success" return True def _update_header_length(self): format = self.get_image_format() ext = format.split('.')[-1] if ext == "tif": self.image_header_length = 4096 elif ext == "edf": self.image_header_length = 1024 else: self.image_header_length = 0 def get_image_filename(self): return self.get_full_path() + self.get_file_short_name() def get_next_image_filename(self): return self.get_full_path() + ( self.get_image_format() % (self.get_count_id()+1)) def get_image_format(self): return caget(self.prefix + ":FNAM_FMT") def set_image_format(self, fmt): caput(self.prefix + ":FNAM_FMT", fmt) self._update_header_length(); def get_count_id(self): ret = caget (self.prefix + ":FNUM", 'i') self.setCache(ret, None) return ret def set_count_id(self, val): caput (self.prefix + ":FNUM", int(val)) self.setCache(int(val), None) def get_expose(self): return caget (self.prefix + ":EXPOSE", 'd') / 1000.0 def set_expose(self, val): val = max(float(val), 0.000005) caput (self.prefix + ":EXPOSE", val * 1000) def get_status(self): ret = caget (self.prefix + ":STATUS", 's') if ret in ["Ready0", "Ready1"]: self.setState(State.Ready) elif ret in ["Snapping", "Writing", "Updating"]: self.setState(State.Busy) elif ret in ["Error"]: self.setState(State.Error) elif ret in ["Time-out", "Disconnected"]: self.setState(State.Offline) elif ret in ["Undefined","Connected",]: self.setState(State.Invalid) else: self.setState(State.Invalid) return ret def is_status_ready(self, status=None): if status is None: status = self.get_status() return status in ["Ready0", "Ready1"] def set_path(self, patha, pathb=""): if not patha.endswith("/"): patha=patha + "/" if pathb!="": if not pathb.endswith("/"): pathb=pathb + "/" caput(self.prefix + ":PATHa", patha) caput(self.prefix + ":PATHb", pathb) def get_path(self): return caget(self.prefix + ":PATHa") + caget(self.prefix + ":PATHb") def get_full_path(self): return self.image_root_folder + self.get_path() def get_log_path(self): return self.get_path() + "log/" def get_log_full_path(self): return self.image_root_folder + self.get_log_path() def get_log_file(self): myfile = self.get_file_short_name() myfile = myfile[0:myfile.rfind(".")]+".pxl" return self.get_log_full_path()+myfile def get_file_short_name(self): return caget(self.prefix + ":FNAM") #Trigger Snap Increment Write Inc+Wr Inc+Sn+Wr Connect Disconnect Update def set_trigger(self, val): caput (self.prefix + ":TRIG", val) def get_trigger(self): return caget (self.prefix + ":TRIG") def get_trigger_id(self): return caget (self.prefix + ":TRIG", 'i') def start(self): self.wait_finished() # Wait until pixel detector is ready self.PIX_EXPOSE = caget(self.prefix + ":EXPOSE") self.set_trigger ("Inc+Sn+Wr") self.exposure_start_time = time.time() def wait_finished(self, file_timeout = None): expose = self.get_expose() # First wait for "TRIG" to go back to idle. This should ensure that execution of the command has started. start = time.time() while (self.get_trigger_id() != 0): if (time.time() - start > 10.0): raise Exception("Trigger signal still busy, pixwait timed-out!\n") time.sleep (0.05) # Then wait for exposure time to elapse. while time.time() <= self.exposure_start_time + expose: time.sleep(0.05) # Then wait for the pixel detector to become ready. sms_flag = 0 start = time.time() while (True): stat = self.get_status() if self.is_status_ready(stat): break if stat in ("Undefined", "Time-out", "Error"): raise Exception ("Invalid PIX status:", stat) timeout=(self.timeout is not None) and (time.time() - start > self.timeout ) if (time.time() - start > 50.0) or timeout: if (smsFlag == 0): msg = "Pixel status did not return to 'Ready' within " + str(int(time.time() - start)) + " seconds." print msg notify (msg) sms_flag = 1 if timedout: raise Exception (msg) time.sleep (0.05) if sms_flag == 1: notify ("Pixel detector seems to have recovered from error.") if file_timeout is not None and file_timeout>0: start = time.time() filename = self.get_image_filename() #print "Waiting for file: " + filename while (True): if os.path.isfile(filename): #print "Ok" break if (time.time() - start) > file_timeout: #raise Exception("Timeout waiting for file: " + filename) print "Timeout waiting for file: " + filename break time.sleep (0.05) def set_auto_threshold(self): #TODO pass def show(self): print "\nThe current pixel detector settings are:" print " The frame number: ", self.get_count_id() print " The file path is:", self.get_path() print " The file template is: ", self.get_image_format() print " The last file name was: ", self.get_image_filename() print " The next file name will be: ", self.get_next_image_filename() print " Exposure time = %5d sec" % (self.get_expose(), ) print " Image dimensions: %d x %d pixels (total = %d)" % (self.PIX_XDIM, self.PIX_YDIM, self.PIX_XDIM*self.PIX_YDIM) print " Threshold values: Thresh1 = %d, Thresh2 = %d, Thresh3 = %d, Thresh4 = %d" % (self.threshold1,self.threshold2,self.threshold3,self.threshold4) print "\n\nThe current pixel detector status is:", self.get_status() def doUpdate(self): self.get_status() self.get_count_id() def read(self): #Readable interface: current return self.get_count_id() add_device( Pixel("pixel", "X04SA-ES3-CCD"), True) pixel.polling = 1000 pixel.update()