273 lines
9.7 KiB
Python
273 lines
9.7 KiB
Python
import os.path
|
|
|
|
class ImageFilename(DeviceBase, Readable, Readable.StringType):
    """Readable string device that reports its parent's image file name."""

    def __init__(self, pixel):
        """Create the child device and attach it to ``pixel``.

        pixel: the parent device; it must provide ``get_image_filename()``,
               which ``read()`` calls.
        """
        DeviceBase.__init__(self, "image filename")
        self.setParent(pixel)
        self.timeout = None

    def read(self):
        """Return the image file name reported by the parent device."""
        detector = self.getParent()
        return detector.get_image_filename()
|
|
|
|
class Pixel(DeviceBase, Readable, Readable.IntegerType):
|
|
def __init__(self, name, prefix):
|
|
DeviceBase.__init__(self, name)
|
|
self.prefix = prefix
|
|
self.image_root_folder = "/sls/X04SA/data/x04sa/ES3/pixel/images/"
|
|
self.image_filename = ImageFilename(self)
|
|
|
|
def doInitialize(self):
|
|
self.PIX_TYPE = "PilatusII"
|
|
self.PIX_FILE_HEADER_LENGTH = 0
|
|
self.PIX_COLOR_DEPTH = 32
|
|
self.PIX_XDIM = 487
|
|
self.PIX_YDIM = 195
|
|
self.PIX_THRESH1 = 40
|
|
self.PIX_THRESH2 = 500
|
|
self.PIX_THRESH3 = 8000
|
|
self.PIX_THRESH4 = 100000
|
|
self.PIX_EXP_START_TIME = 0
|
|
self._update_header_length()
|
|
self.show()
|
|
|
|
if not self.is_connected():
|
|
print "The Pixel detector is not connected to Epics/Spec."
|
|
print "Please disconnect in tvx and connect via the CCD/Pixel Epics widget."
|
|
else :
|
|
if self.get_error_code() != 0:
|
|
print "The pixel detector returned an error. It may not be properly"
|
|
print "connected. Please disconnect in tvx and disconnect and"
|
|
print "re-connect with the CCD/Pixel Epics widget."
|
|
|
|
def set_image_root_folder(self, path):
|
|
self.image_root_folder = path
|
|
|
|
def set_threshold(self, threshold1= None, threshold2= None, threshold3= None, threshold4= None):
|
|
if threshold1 is not None:
|
|
self.PIX_THRESH1 = threshold1
|
|
if threshold2 is not None:
|
|
self.PIX_THRESH2 = threshold2
|
|
if threshold3 is not None:
|
|
self.PIX_THRESH3 = threshold3
|
|
if threshold4 is not None:
|
|
self.PIX_THRESH4 = threshold4
|
|
|
|
def is_connected(self):
|
|
return caget(self.prefix + ":STAT1", 's') == "Connected"
|
|
|
|
def get_error_code(self):
|
|
return caget(self.prefix + ":ERRCODE", 'i')
|
|
|
|
def assert_ready(self):
|
|
if not self.is_connected():
|
|
if not self.try_connect():
|
|
raise Exception (self.name + " not connected")
|
|
err = self.get_error_code()
|
|
if err!=0:
|
|
raise Exception (self.name + " error: " + str(err))
|
|
status = self.get_status()
|
|
if not self.is_status_ready(status):
|
|
raise Exception (self.name + " is not ready - status: " + str(status))
|
|
|
|
def try_connect(self):
|
|
if not self.is_connected():
|
|
print "Detector not disconnected. Trying to connect... ",
|
|
self.set_trigger("Connect")
|
|
start = time.time()
|
|
while not self.is_connected():
|
|
if (time.time() - start) > 3.0:
|
|
print "failure"
|
|
return False
|
|
print "success"
|
|
return True
|
|
|
|
def _update_header_length(self):
|
|
format = self.get_image_format()
|
|
ext = format.split('.')[-1]
|
|
if ext == "tif":
|
|
self.IMAGE_HEADER_LENGTH = 4096
|
|
elif ext == "edf":
|
|
self.IMAGE_HEADER_LENGTH = 1024
|
|
else:
|
|
self.IMAGE_HEADER_LENGTH = 0
|
|
|
|
def get_image_filename(self):
|
|
return self.get_full_path() + self.get_file_short_name()
|
|
|
|
def get_next_image_filename(self):
|
|
return self.get_full_path() + ( self.get_image_format() % (self.get_count_id()+1))
|
|
|
|
def get_image_format(self):
|
|
return caget(self.prefix + ":FNAM_FMT")
|
|
|
|
def set_image_format(self, fmt):
|
|
caput(self.prefix + ":FNAM_FMT", fmt)
|
|
self._update_header_length();
|
|
|
|
def get_count_id(self):
|
|
ret = caget (self.prefix + ":FNUM", 'i')
|
|
self.setCache(ret, None)
|
|
return ret
|
|
|
|
def set_count_id(self, val):
|
|
caput (self.prefix + ":FNUM", int(val))
|
|
self.setCache(int(val), None)
|
|
|
|
def get_expose(self):
|
|
return caget (self.prefix + ":EXPOSE", 'd') / 1000.0
|
|
|
|
def set_expose(self, val):
|
|
val = max(float(val), 0.000005)
|
|
caput (self.prefix + ":EXPOSE", val * 1000)
|
|
|
|
def get_status(self):
|
|
ret = caget (self.prefix + ":STATUS", 's')
|
|
if ret in ["Ready0", "Ready1"]:
|
|
self.setState(State.Ready)
|
|
elif ret in ["Snapping", "Writing", "Updating"]:
|
|
self.setState(State.Busy)
|
|
elif ret in ["Error"]:
|
|
self.setState(State.Error)
|
|
elif ret in ["Time-out", "Disconnected"]:
|
|
self.setState(State.Offline)
|
|
elif ret in ["Undefined","Connected",]:
|
|
self.setState(State.Invalid)
|
|
else:
|
|
self.setState(State.Invalid)
|
|
return ret
|
|
|
|
def is_status_ready(self, status=None):
|
|
if status is None:
|
|
status = self.get_status()
|
|
return status in ["Ready0", "Ready1"]
|
|
|
|
def set_path(self, patha, pathb=""):
|
|
if not patha.endswith("/"):
|
|
patha=patha + "/"
|
|
if pathb!="":
|
|
if not pathb.endswith("/"):
|
|
pathb=pathb + "/"
|
|
|
|
caput(self.prefix + ":PATHa", patha)
|
|
caput(self.prefix + ":PATHb", pathb)
|
|
|
|
def get_path(self):
|
|
return caget(self.prefix + ":PATHa") + caget(self.prefix + ":PATHb")
|
|
|
|
def get_full_path(self):
|
|
return self.image_root_folder + self.get_path()
|
|
|
|
def get_log_path(self):
|
|
return self.get_path() + "log/"
|
|
|
|
def get_log_full_path(self):
|
|
return self.image_root_folder + self.get_log_path()
|
|
|
|
def get_log_file(self):
|
|
myfile = self.get_file_short_name()
|
|
myfile = myfile[0:myfile.rfind(".")]+".pxl"
|
|
return self.get_log_full_path()+myfile
|
|
|
|
|
|
def get_file_short_name(self):
|
|
return caget(self.prefix + ":FNAM")
|
|
|
|
#Trigger Snap Increment Write Inc+Wr Inc+Sn+Wr Connect Disconnect Update
|
|
def set_trigger(self, val):
|
|
caput (self.prefix + ":TRIG", val)
|
|
|
|
def get_trigger(self):
|
|
return caget (self.prefix + ":TRIG")
|
|
|
|
|
|
def get_trigger_id(self):
|
|
return caget (self.prefix + ":TRIG", 'i')
|
|
|
|
def start(self):
|
|
self.wait_finished() # Wait until pixel detector is ready
|
|
self.PIX_EXPOSE = caget(self.prefix + ":EXPOSE")
|
|
self.set_trigger ("Inc+Sn+Wr")
|
|
self.PIX_EXP_START_TIME = time.time()
|
|
|
|
|
|
def wait_finished(self, file_timeout = None):
|
|
expose = self.get_expose()
|
|
|
|
# First wait for "TRIG" to go back to idle. This should ensure that execution of the command has started.
|
|
start = time.time()
|
|
while (self.get_trigger_id() != 0):
|
|
if (time.time() - start > 10.0):
|
|
raise Exception("Trigger signal still busy, pixwait timed-out!\n")
|
|
time.sleep (0.05)
|
|
|
|
# Then wait for exposure time to elapse.
|
|
while time.time() <= self.PIX_EXP_START_TIME + expose:
|
|
time.sleep(0.05)
|
|
|
|
# Then wait for the pixel detector to become ready.
|
|
sms_flag = 0
|
|
start = time.time()
|
|
while (True):
|
|
stat = self.get_status()
|
|
if self.is_status_ready(stat):
|
|
break
|
|
if stat in ("Undefined", "Time-out", "Error"):
|
|
raise Exception ("Invalid PIX status:", stat)
|
|
timeout=(self.timeout is not None) and (time.time() - start > self.timeout )
|
|
if (time.time() - start > 50.0) or timeout:
|
|
if (smsFlag == 0):
|
|
msg = "Pixel status did not return to 'Ready' within " + str(int(time.time() - start)) + " seconds."
|
|
print msg
|
|
notify (msg)
|
|
sms_flag = 1
|
|
if timedout:
|
|
raise Exception (msg)
|
|
time.sleep (0.05)
|
|
|
|
if sms_flag == 1:
|
|
notify ("Pixel detector seems to have recovered from error.")
|
|
|
|
if file_timeout is not None and file_timeout>0:
|
|
start = time.time()
|
|
filename = self.get_image_filename()
|
|
while (True):
|
|
if os.path.isfile(filename):
|
|
break
|
|
if (time.time() - start) > file_timeout:
|
|
raise Exception("Timeout waiting for file: " + filename)
|
|
time.sleep (0.05)
|
|
|
|
def set_roi():
|
|
#TODO
|
|
pass
|
|
|
|
def set_auto_threshold(self):
|
|
#TODO
|
|
pass
|
|
|
|
def show(self):
|
|
print "\nThe current pixel detector settings are:"
|
|
print " The frame number: ", self.get_count_id()
|
|
print " The file path is:", self.get_path()
|
|
print " The file template is: ", self.get_image_format()
|
|
print " The last file name was: ", self.get_image_filename()
|
|
print " The next file name will be: ", self.get_next_image_filename()
|
|
print " Exposure time = %5d sec" % (self.get_expose(), )
|
|
print " Image dimensions: %d x %d pixels (total = %d)" % (self.PIX_XDIM, self.PIX_YDIM, self.PIX_XDIM*self.PIX_YDIM)
|
|
print " Threshold values: Thresh1 = %d, Thresh2 = %d, Thresh3 = %d, Thresh4 = %d" % (self.PIX_THRESH1,self.PIX_THRESH2,self.PIX_THRESH3,self.PIX_THRESH4)
|
|
|
|
print "\n\nThe current pixel detector status is:", self.get_status()
|
|
|
|
def doUpdate(self):
|
|
self.get_status()
|
|
self.get_count_id()
|
|
|
|
def read(self):
|
|
#Readable interface: current
|
|
return self.get_count_id()
|
|
|
|
|
|
# Create the detector device for channel prefix "X04SA-ES3-CCD" and hand it to
# add_device(). The following lines use a global named `pixel`, which is
# presumably created by add_device() from the device name -- confirm.
add_device(Pixel(name="pixel", prefix="X04SA-ES3-CCD"), True)
pixel.polling = 1000
pixel.update()
|