Files
x11ma/script/test/Simu2Img.py

267 lines
8.4 KiB
Python

import ch.psi.pshell.imaging.Data as Data
#If running from editor
# Default parameters, assigned only when the script is started from the UI.
# Presumably the same names are supplied by the caller in every other case.
if get_exec_pars().source == CommandSource.ui:
    METHOD = "Two_Pol"
    SWITCHING = "Tune_Detune"
    MEASUREMENTS = 4
    AUTO_SAVE = True
    EXPOSURE_1 = 1.0
    AVERAGE_1 = 2.0
    ENERGY_1 = 850.0
    #POLARIZATION_1 = "Circ_Plus"
    EXPOSURE_2 = 1.0
    AVERAGE_2 = 2.0
    ENERGY_2 = 900.0
    #POLARIZATION_2 = "Circ_Minus"
    DRY_RUN = True
    SHOW_IMAGES = True
    # Was "Fals", which raised NameError as soon as the script ran from the editor.
    SAVE_DIAGS = False

ID = get_setting("ID")
# Per-step parameters, indexed by the step number (0 or 1) of a cycle.
EXPOSURE = [EXPOSURE_1, EXPOSURE_2]
AVERAGE = [int(AVERAGE_1), int(AVERAGE_2)]    # frame counts must be integers
ENERGY = [ENERGY_1, ENERGY_2]
#POLARIZATION = [POLARIZATION_1, POLARIZATION_2]

# Reject bad energies before touching any hardware.
# NOTE(review): change_energy() accepts 91..2500 while this check accepts
# 89.9..2000 - confirm which limits are the intended ones.
if METHOD == "Two_Energies":
    for en in ENERGY:
        if en < 89.9 or en > 2000:
            raise Exception("Bad energy argument")
# Reference time for the total running time printed at the end of the script.
start = time.time()
# Handles of the parallel TIFF writers; filled by save_image_file().
threads = []
# Output location pattern; the {...} tokens are presumably resolved by
# Setup.expandPath() / set_exec_pars().
path = "{data}/{year}_{month}/{date}/{seq}%03d_" + METHOD
set_exec_pars(path=path, format="txt", layout="table", open=True)
data_path = Setup.expandPath(path)
#Context.incrementFileSequentialNumber()
def get_image_file_name(cycle=-1, frame_index=0, short_name=False):
    """Return the expanded TIFF file name of one image.

    cycle       -- measurement cycle; a negative value selects the name
                   without cycle number and without sub-folder.
    frame_index -- index of the image within the cycle; for cycle >= 0 it is
                   also used as the sub-folder name.
    short_name  -- when true the name is not prefixed with data_path.

    The assembled pattern is passed through Setup.expandPath() before it is
    returned.
    """
    prefix = "" if short_name else (data_path + "/")
    # "{seq}%03d" is a path token, not a Python format: it is only ever
    # concatenated, never used with the % operator.
    if cycle < 0:
        name = "i{seq}%03d_" + ("%d.tif" % frame_index)
    else:
        folder = "%d/" % frame_index
        name = folder + "i{seq}%03d_" + str(cycle) + ("_%d.tif" % frame_index)
    return Setup.expandPath(prefix + name)
"""
def getLEEM():
global startvoltage,objective, LEEMtemp
startvoltage=getStartvoltage()
objective=getObjective()
LEEMtemp=getLEEMtemp()
"""
# Same output configuration and data_path assignment as already issued
# earlier in this script, with identical arguments.
# NOTE(review): looks like an accidental duplicate - confirm that calling
# set_exec_pars(..., open=True) a second time has no side effect before removing it.
set_exec_pars(path= path, format="txt", layout="table", open=True)
data_path = Setup.expandPath(path)
def assert_status_ok():
    """Run the machine status check, except in a dry run.

    Calls assert_machine_ok() unless DRY_RUN is set. The beamline check
    (assert_beamline_ok) is currently commented out.
    """
    if not DRY_RUN:
        assert_machine_ok()
        #assert_beamline_ok()
"""
getbeamline
getLEEM
Rem check status of machine
If checkRing = 3 Then
Close
Exit Sub
End If
Rem check status of the beamline
If checkbeamline = 3 Then
Close
Exit Sub
"""
#Initialize variables
# Open the valve / the fast shutter when the corresponding setting is "true".
# NOTE(review): the valve test uses the script variable DRY_RUN while the
# shutter test uses get_dry_run() - confirm that the difference is intended.
if not DRY_RUN and str(get_setting("AUTO_SWITCH_VALVE")).lower() == "true":
    open_vg10()
if not get_dry_run() and str(get_setting("AUTO_SWITCH_SHUTTER")).lower() == "true":
    caput(FAST_SHTR, 'Open')

# State shared with nextpol() and switch_pol().
active_id = 1        # toggled between 1 and 2 by switch_pol() in "Tune_Detune" mode
current_pol=None     # polarization id of the first image; only set for METHOD "Two_Pol"
pol_id1=POL_IDS[get_setting("POL_ID_1")]
pol_id2=POL_IDS[get_setting("POL_ID_2")]
#rbkEnergy=energy_rbk.read()

# NOTE(review): the source lost its indentation; the nesting below (the
# SWITCHING test inside the "ID1_ID2" branch, wait_channel at "Two_Pol" level)
# is a reconstruction - verify against the original file.
# NOTE(review): tune_detune() / put_id_pol() are called here even when
# DRY_RUN is set, unlike in switch_pol() - confirm.
if METHOD == "Two_Pol":
    if ID == "ID1":
        current_pol = pol_id1
    elif ID == "ID2":
        current_pol = pol_id2
    elif ID == "ID1_ID2":
        current_pol=pol_id1
        if SWITCHING == "Tune_Detune":
            tune_detune(1) #Tune ID1, Detune ID2
        else:
            if ID == "ID1_ID2":
                put_id_pol(2, current_pol) #Force both IDs to same polarization
    # Block until the channel reports the value 1.
    wait_channel("X11PHS:alldone", 1)
def imageinfo(info):
    """Placeholder: accepts *info*, does nothing and returns None."""
    return None
#Beamline setting
def nextpol():
    """Return the polarization id to apply at the next switch.

    "Normal" switching: the target is derived from current_pol
        1 (circ+)   -> 2 (circ-)
        2 (circ-)   -> 1 (circ+)
        3 (lin hor) -> 4 (lin vert)
        4 (lin vert)-> 3 (lin hor)
        5 (lin rot) -> 5 (lin rot)
    "Tune_Detune" switching: the polarization configured for the ID that is
    not active (pol_id2 when active_id is 1, pol_id1 when it is 2).

    Returns None for any other SWITCHING value or unknown state.
    """
    if SWITCHING == "Normal":
        return {1: 2, 2: 1, 3: 4, 4: 3, 5: 5}.get(current_pol)
    if SWITCHING == "Tune_Detune":
        return {1: pol_id2, 2: pol_id1}.get(active_id)
    return None
def switch_pol():
    """Switch to the polarization returned by nextpol() and wait until done.

    Does nothing when DRY_RUN is set. Otherwise, depending on ID:
      "ID1" / "ID2" -- the new polarization is put on that ID only.
      "ID1_ID2"     -- "Normal" switching puts it on both IDs; "Tune_Detune"
                       switching toggles active_id and calls tune_detune().
    Afterwards sleeps 1 s and waits for "X11PHS:alldone" to become 1.

    In "Normal" switching current_pol is updated to the polarization just
    requested. nextpol() computes its result from current_pol, so without
    this update every call asked for the same polarization and the second
    switch of a cycle never went back to the initial one.
    """
    global active_id, current_pol
    if DRY_RUN:
        return
    newpol = nextpol()
    if ID == "ID1":
        put_id_pol(1, newpol)
    elif ID == "ID2":
        put_id_pol(2, newpol)
    elif ID == "ID1_ID2":
        if SWITCHING == "Normal":
            put_id_pol(1, newpol)
            put_id_pol(2, newpol)
        elif SWITCHING == "Tune_Detune":
            active_id = 2 if active_id == 1 else 1
            tune_detune(active_id)
    time.sleep(1.0)
    wait_channel("X11PHS:alldone", 1)
    # Only "Normal" switching reads current_pol in nextpol(); "Tune_Detune"
    # tracks its state through active_id, so current_pol is left untouched there.
    if SWITCHING == "Normal":
        current_pol = newpol
def change_energy(v):
    """Set the energy to *v* after a range check.

    Raises Exception("Invalid energy: <v>") when v is below 91 or above 2500.
    The check runs even in a dry run; put_energy(v) is only called when
    DRY_RUN is not set.
    """
    out_of_range = v < 91 or v > 2500
    if out_of_range:
        raise Exception("Invalid energy: " + str(v))
    if not DRY_RUN:
        put_energy(v)
def save_image_file(frame, cycle=-1, frame_index=0):
    """Optionally plot *frame* and queue it for writing as a TIFF file.

    frame       -- a Data object (its .matrix is plotted) or any other image
                   object, which is converted with get_ip_array() for plotting.
    cycle       -- forwarded to get_image_file_name().
    frame_index -- forwarded to get_image_file_name().

    Plotting happens when SHOW_IMAGES is set. Saving happens when AUTO_SAVE is
    set and frame is not None: save_as_tiff() is started with parallel=True,
    its result is appended to the global `threads` list, and the file name is
    logged and printed. The bare numbers printed are progress markers.
    """
    if SHOW_IMAGES:
        print(3)
        if type(frame) == Data:
            pixels = frame.matrix
        else:
            pixels = get_ip_array(frame)
        title = get_image_file_name(cycle, frame_index, True)
        plot(pixels, name=title)
    if not AUTO_SAVE or frame is None:
        return
    print(4)
    filename = get_image_file_name(cycle, frame_index)
    print(5)
    # Diagnostics are only collected when requested.
    metadata = get_diags() if SAVE_DIAGS else {}
    writer = save_as_tiff(frame, filename, check=False, parallel=True, metadata=metadata)
    threads.append(writer)
    print(6)
    log(filename)
    print(filename)
    # log("SV:"+Format(startvoltage,"0.000")+" OB:"+Format(objective,"0.00")+" ST:"+Format(LEEMtemp,"0.0"))
    print(7)
#eiger.stop()
#eiger.grabMode=eiger.GrabMode.Single
#time.sleep(0.3)
# Main acquisition: MEASUREMENTS cycles of up to two simulated images each,
# an optional ratio image per cycle, then an optional average over all cycles.
# NOTE(review): the source lost its indentation; the nesting below is a
# reconstruction - verify against the original file.
try:
    #Do the measurement loop
    for cycle in range(1, MEASUREMENTS + 1):
        frames = []    # images of this cycle: [step 0, step 1, ratio]
        if MEASUREMENTS > 1:
            log("")
            log("nround = " + str(cycle) + " / " + str(MEASUREMENTS))
        if METHOD == "Two_Energies":
            # Every cycle starts at the first energy.
            change_energy(ENERGY_1)
        for i in range(2):
            # For "Take_Image" the status is only checked in the first cycle.
            if (cycle == 1) or (METHOD != "Take_Image"):
                assert_status_ok()
            #set_exposure_time(EXPOSURE[i])
            print "--- Grabing " + str(AVERAGE[i])+ " frames - cycle: " + str(cycle) + " step: " + str(i)
            #ret = grab_frames(image, AVERAGE[i], roi=None, wait_next=True)
            # Simulation: synthetic Data objects stand in for grabbed frames.
            ret = []
            for j in range( AVERAGE[i]):
                ret.append(Data(Arr.indexesDouble(600),30,20))
            frames.append(average_frames(ret) if AVERAGE[i] > 1 else ret[0])
            imageinfo("I")
            print 1
            save_image_file(frames[i], cycle, i)
            print 2
            # "Take_Image" acquires a single image per cycle.
            if METHOD == "Take_Image":
                break
            # Executed after both steps, i.e. twice per cycle.
            if METHOD == "Two_Pol":
                print "--- Switching polatization..."
                switch_pol()
            if METHOD == "Two_Energies":
                print "--- Switching energy..."
                change_energy(ENERGY_2)
            #time.sleep(1)
        if (METHOD == "Two_Pol") or (METHOD == "Two_Energies"):
            # Third image: copy of one frame divided by the other. The result
            # of div() is not assigned, so it presumably works in place.
            if (METHOD == "Two_Energies") or ((current_pol == 2) or (current_pol==3)):
                ###??? autocontrast???
                frames.append(frames[0].copy())
                frames[2].div(frames[1])
            else:
                frames.append(frames[1].copy())
                frames[2].div(frames[0])
            save_image_file(frames[2],cycle, 2)
        # objective = getObjective()
        # temp = getLEEMtemp()
        # log(0 , "Temp : " + Format(temp,"0.00") + " OB : " + Format(objective,"0.00") + " StigmaA : " + " StigmaB : " )
    # Auto average and save
    # The files written in the background are re-opened below, so wait for them.
    print "Waiting file writing threads to finish..."
    for t in threads:
        join(t)
    if (MEASUREMENTS > 1) and (AUTO_SAVE == 1):
        print "--- Averaging..."
        # `frames` still holds the images of the last cycle; only its length is used.
        for i in range(len(frames)):
            """
            measures = []
            for cycle in range(1, MEASUREMENTS + 1):
            filename = get_image_file_name(cycle, i)
            ip = open_image(filename)
            #measures.append(Data(get_ip_array(ip)))
            measures.append(ip)
            #av = average_frames(measures) #Result is transposed???
            av=average_ips (measures, roi=None, as_float=True)
            """
            # Accumulate image i of every cycle, then divide by the number of cycles.
            integration = None
            for cycle in range(1, MEASUREMENTS + 1):
                filename = get_image_file_name(cycle, i)
                print "Open " , filename
                ip = open_image(filename)
                if integration is None:
                    integration = ip
                else:
                    integration =integrate_ips ([integration, ip], as_float=True)
            av=op_const(integration, "divide", float(MEASUREMENTS), in_place=True)
            # NOTE(review): the writers started here are appended to `threads`
            # but never joined - confirm they complete after the script ends.
            save_image_file(av, -1, i)
finally:
    # Mirror of the opening sequence at the start of the script; runs even
    # when the acquisition raised.
    if not DRY_RUN and str(get_setting("AUTO_SWITCH_VALVE")).lower() == "true":
        close_vg10()
    if not get_dry_run() and str(get_setting("AUTO_SWITCH_SHUTTER")).lower() == "true":
        caput(FAST_SHTR, 'Close')
    #restore_eiger()
# NOTE(review): placed after the try/finally as a reconstruction; it may
# originally have been inside the finally block.
print "Running time: " + str(time.time() - start)