Files
x03da/script/local_femto.py
gac-x03da da38c92c97 Closedown
2022-02-02 16:54:53 +01:00

462 lines
17 KiB
Python

import random
import ch.psi.pshell.device.Readable.ReadableArray as ReadableArray
import ch.psi.pshell.device.Readable.ReadableCalibratedArray as ReadableCalibratedArray
import ch.psi.pshell.device.ArrayCalibration as ArrayCalibration
import ch.psi.utils.Str
from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian
import java.awt.Color as Color
Scienta = get_device("Scienta") #Scienta class is imported in startup.py, shadowing Scienta device name
#Synchrronized Scienta counts
for stat in Scienta.stats:
add_device(stat, True)
beam_ok = True
class SimulatedOutput(Writable):
def write(self, value):
pass
class SimulatedInput(Readable):
def __init__(self):
self.x = 0.0
def read(self):
self.x = self.x + 0.2
noise = (random.random() - 0.5) / 20.0
return math.sin(self.x) + noise
sout = SimulatedOutput()
sinp = SimulatedInput()
def integrate_image(vertical = True):
data = Scienta.dataArray.read()
#Integrate and plot
(width,height) = Scienta.getImageSize().tolist()
integration = []
if vertical:
for i in range(width):
p=0.0
for j in range(height):
p=p+data[j*width+i]
integration.append(p)
else:
for j in range(height):
p=0.0
for i in range(width):
p=p+data[j*width+i]
integration.append(p)
return integration
class ImageEnergyDistribution(ReadableCalibratedArray):
def getSize(self):
(width,height) = Scienta.getImageSize().tolist()
return width
def read(self):
return to_array(integrate_image(),'d')
def getCalibration(self):
c=Scienta.readImageDescriptor().calibration
if c is None:
return None
return ArrayCalibration(c.scaleX, c.offsetX)
EnergyDistribution = ImageEnergyDistribution()
class ImageAngleDistribution(ReadableCalibratedArray):
def getSize(self):
(width,height) = Scienta.getImageSize().tolist()
return height
def read(self):
return to_array(integrate_image(False),'d')
def getCalibration(self):
c=Scienta.readImageDescriptor().calibration
if c is None:
return None
return ArrayCalibration(c.scaleY, c.offsetY)
AngleDistribution = ImageAngleDistribution()
class ImageCounts(Readable):
def read(self):
data = Scienta.dataArray.read()
counts = sum(data)
return counts
Counts = ImageCounts()
def init_scienta():
"""
turn on the analyser and start a mock measurement so that we get the correct array size.
start a scienta acquisition and abort after 4 seconds.
"""
if Scienta.isSimulated():
time.sleep(0.1)
else:
image_id = Scienta.currentImageCount
Scienta.start()
Scienta.waitReady(4000)
Scienta.stop()
Scienta.waitNewImage(500, image_id)
def trig_scienta():
if Scienta.isSimulated():
time.sleep(0.1)
else:
image_id = Scienta.currentImageCount
Scienta.start()
Scienta.waitReady(-1)
Scienta.waitNewImage(3000, image_id)
def otf(mode="ENERGY", e1=None, e2=None, beta1=None, beta2=None, theta1=None, theta2=None, \
time=1.0, modulo=1, turn_off_beam=False):
"""
mode: "ENERGY" or "AMNGLE"
"""
run("otf", {
"MODE":mode, \
"E1":float(e1) if e1 is not None else None, \
"E2":float(e2) if e2 is not None else None, \
"BETA1":float(beta1) if beta1 is not None else None, \
"BETA2":float(beta2) if beta2 is not None else None, \
"THETA1":float(theta1) if theta1 is not None else None, \
"THETA2":float(theta2) if theta2 is not None else None, \
"TIME":float(time), \
"MODULO":int(modulo), \
"ENDSCAN":turn_off_beam, \
})
diag_channels = []
#Manipulator Settings
diag_channels.append(ManipulatorX.readback)
diag_channels.append(ManipulatorY.readback)
diag_channels.append(ManipulatorZ.readback)
diag_channels.append(ManipulatorTheta.readback)
diag_channels.append(ManipulatorTilt.readback)
diag_channels.append(ManipulatorPhi.readback)
# Beamline Settings
diag_channels.append(MachineBumpXOffset)
diag_channels.append(MachineBumpXAngle)
diag_channels.append(MachineBumpYOffset)
diag_channels.append(MachineBumpYAngle)
diag_channels.append(DynamicBumpYOffset)
diag_channels.append(DynamicBumpYAngle)
diag_channels.append(FrontendVCenter)
diag_channels.append(FrontendVSize)
diag_channels.append(FrontendHCenter)
diag_channels.append(FrontendHSize)
diag_channels.append(MonoVCenter)
diag_channels.append(MonoVSize)
diag_channels.append(MonoBladeDown)
diag_channels.append(MonoBladeUp)
diag_channels.append(MonoHCenter)
diag_channels.append(MonoHSize)
diag_channels.append(MonoApertureMode)
diag_channels.append(RefocusVCenter)
diag_channels.append(RefocusVSize)
diag_channels.append(RefocusHCenter)
diag_channels.append(RefocusHSize)
diag_channels.append(FocusYTrans)
diag_channels.append(FocusZTrans)
diag_channels.append(FocusXRot)
diag_channels.append(FocusYRot)
diag_channels.append(FocusZRot)
diag_channels.append(RefocusYTrans)
diag_channels.append(RefocusZTrans)
diag_channels.append(RefocusXRot)
diag_channels.append(RefocusYRot)
diag_channels.append(RefocusZRot)
diag_channels.append(MonoEnergy)
diag_channels.append(MonoCff)
diag_channels.append(MonoBeta)
diag_channels.append(MonoTheta)
diag_channels.append(MonoGrating)
diag_channels.append(ExitSlit)
# Auxiliary Measurements
diag_channels.append(MachineCurrent)
diag_channels.append(FocusWaterTemp)
diag_channels.append(SampleCurrent)
diag_channels.append(RefCurrent)
#diag_channels.append(AuxCurrent)
#diag_channels.append(AuxVoltage)
diag_channels.append(ChamberPressure)
diag_channels.append(BeamlinePressure)
diag_channels.append(ManipulatorTempA)
diag_channels.append(ManipulatorTempB)
if get_device("ManipulatorCoolFlow"):
diag_channels.append(ManipulatorCoolFlow)
if get_device("ManipulatorCoolFlowSet"):
diag_channels.append(ManipulatorCoolFlowSet)
diag_channels_no_Scienta = list(diag_channels)
# Scienta
diag_channels.append(Scienta.channelBegin) #diag_channels.append(ChannelDouble("ChannelBegin", "X03DA-SCIENTA:cam1:CHANNEL_BEGIN_RBV"))
diag_channels.append(Scienta.channelEnd) #diag_channels.append(ChannelDouble("ChannelEnd", "X03DA-SCIENTA:cam1:CHANNEL_END_RBV"))
diag_channels.append(Scienta.sliceBegin) # diag_channels.append(ChannelDouble("SliceBegin", "X03DA-SCIENTA:cam1:SLICE_BEGIN_RBV"))
diag_channels.append(Scienta.sliceEnd) #diag_channels.append(ChannelDouble("StepTime", "X03DA-SCIENTA:cam1:SLICE_END_RBV"))
diag_channels.append(Scienta.numSlices) # diag_channels.append(ChannelDouble("NumSlices", "X03DA-SCIENTA:cam1:SLICES_RBV"))
#diag_channels.append(Scienta.frames) # diag_channels.append(ChannelDouble("NumFrames", "X03DA-SCIENTA:cam1:FRAMES"))
diag_channels.append(Scienta.numChannels) #diag_channels.append(ChannelDouble("NumChannels", "X03DA-SCIENTA:cam1:NUM_CHANNELS_RBV"))
diag_channels.append(Scienta.lowEnergy) #diag_channels.append(ChannelDouble("LowEnergy", "X03DA-SCIENTA:cam1:LOW_ENERGY_RBV"))
diag_channels.append(Scienta.centerEnergy) #diag_channels.append(ChannelDouble("CenterEnergy", "X03DA-SCIENTA:cam1:CENTRE_ENERGY_RBV"))
diag_channels.append(Scienta.highEnergy) #diag_channels.append(ChannelDouble("HighEnergy", "X03DA-SCIENTA:cam1:HIGH_ENERGY_RBV"))
diag_channels.append(ScientaDwellTime)
diag_channels.append(AcquisitionMode) #diag_attrs.append(ChannelString("AcquisitionMode", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV"))
diag_channels.append(EnergyMode) #diag_attrs.append(ChannelString("EnergyMode", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV"))
diag_channels.append(LensMode) #diag_attrs.append(ChannelString("LensMode", "X03DA-SCIENTA:cam1:LENS_MODE_RBV"))
diag_channels.append(DetectorMode) #diag_attrs.append(ChannelString("DetectorMode", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV"))
diag_channels.append(PassEnergy) #diag_attrs.append(ChannelString("PassEnergy", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV"))
diag_channels.append(ElementSet) #diag_attrs.append(ChannelString("ElementSet", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV"))
diag_channels.append(ExcitationEnergy) #diag_channels.append(ChannelDouble("ExcitationEnergy", "X03DA-SCIENTA:cam1:EXCITATION_ENERGY_RBV"))
diag_channels.append(StepSize) #diag_channels.append(ChannelDouble("StepSize", "X03DA-SCIENTA:cam1:STEP_SIZE_RBV"))
diag_channels.append(NumIterations) #diag_channels.append(ChannelDouble("NumIterations", "X03DA-SCIENTA:cam1:NumExposures_RBV"))
diag_channels.append(AnalyserSlit) #diag_attrs.append(ChannelString("ElemeAnalyserSlitntSet", "X03DA-SCIENTA:cam1:ANALYSER_SLIT_RBV"))
snap_channels = []
snap_channels.append(SampleCurrentGain)
snap_channels.append(RefCurrentGain)
#snap_channels.append(AuxCurrentGain)
snap_channels.append(SampleCurrentAveraging)
snap_channels.append(RefCurrentAveraging)
#snap_channels.append(AuxCurrentAveraging)
#snap_channels.append(AuxVoltageAveraging)
snap_channels.append(SampleCurrentSampling)
snap_channels.append(RefCurrentSampling)
#snap_channels.append(AuxCurrentSampling)
#snap_channels.append(AuxVoltageSampling)
diag_channels = sorted(diag_channels, key=lambda channel: channel.name)
snap_channels = sorted(snap_channels, key=lambda channel: channel.name)
def get_diag_name(diag):
return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "").replace("Readback", "")
def print_diag():
for f in diag_channels:
print "%-25s %s" % (get_diag_name(f) , str(f.read()))
def create_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd')
def append_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
try:
x = f.read()
if x is None:
x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
append_dataset(group+get_diag_name(f), x)
except:
log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))
def create_metadata_datasets(parent = None):
if parent is None:
parent = "/"
group = parent + "general/"
for name in ["proposer", "proposal", "pgroup", "sample"]:
setting = get_setting(name)
save_dataset(group+name, setting if setting is not None else "", 's')
setting = get_setting("authors")
save_dataset(group+"authors", setting.split("|") if setting is not None else [""], '[s')
def wait_beam():
if not beam_ok:
print "Waiting for beam..."
while not beam_ok:
time.sleep(0.1)
print "Beam ok"
def before_readout():
sample_scienta = False
for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix", "Counts"]:
if dev in SENSORS:
sample_scienta = True
break
for dev in [Scienta.spectrum,EnergyDistribution, AngleDistribution, Scienta.dataMatrix, Counts]:
if dev in SENSORS:
sample_scienta = True
break
wait_beam()
if sample_scienta:
trig_scienta()
else:
time.sleep(SampleCurrentAveraging.read() / 10.0)
def after_readout(rec, scan):
if beam_ok:
if get_exec_pars().save:
if rec.index == 0:
if scan.index == 1:
create_metadata_datasets()
create_diag_datasets()
append_diag_datasets()
else:
rec.invalidate()
def after_scan():
"""
Close shutter and turn off analyser
"""
caput("X03DA-PC:AFTER-SCAN.PROC", 1)
caput("X03DA-OP-VG7:WT_SET", 0)
#caput("X03DA-FE-AB1:CLOSE4BL", 0)
def set_adc_averaging(dwelltime=0.0):
if dwelltime < 0.001:
dwelltime = Scienta.getStepTime().read()
dwelltime = min(dwelltime, 20.0)
dwelltime = max(dwelltime, 0.1)
fixed = AcquisitionMode.read() == "Fixed"
else:
fixed = True
#value = int(dwelltime * 10.0) #averaging count in 100ms
#SampleCurrentAveraging.write(value)
#RefCurrentAveraging.write(value)
#AuxCurrentAveraging.write(value)
#AuxVoltageAveraging.write(value)
def adjust_sensors():
#Updating ranges from Scienta
Scienta.update()
global SENSORS
if SENSORS is not None:
# Move integration to end
#sample_scienta = False
for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]:
if dev in SENSORS:
#sample_scienta = True
SENSORS=SENSORS+[SENSORS.pop(SENSORS.index(dev))]
for dev in ["Counts"]:
if dev in SENSORS:
#sample_scienta = True
SENSORS=[SENSORS.pop(SENSORS.index(dev))] + SENSORS
if "Scienta.dataMatrix" in SENSORS or Scienta.dataMatrix in SENSORS:
print "Not ACC"
set_exec_pars(accumulate = False)
#if sample_scienta:
# init_scienta()
#Device aliases for data files
set_device_alias(Scienta.dataMatrix, "ScientaImage")
set_device_alias(Scienta.spectrum, "ScientaSpectrum")
set_device_alias(Scienta.channelBegin, get_diag_name(Scienta.channelBegin))
set_device_alias(Scienta.channelEnd, get_diag_name(Scienta.channelEnd))
set_device_alias(Scienta.sliceBegin, get_diag_name(Scienta.sliceBegin))
set_device_alias(Scienta.sliceEnd, get_diag_name(Scienta.sliceEnd))
set_device_alias(Scienta.numChannels, get_diag_name(Scienta.numChannels))
set_device_alias(Scienta.numSlices, get_diag_name(Scienta.numSlices))
set_device_alias(Scienta.lowEnergy, get_diag_name(Scienta.lowEnergy))
set_device_alias(Scienta.centerEnergy, get_diag_name(Scienta.centerEnergy))
set_device_alias(Scienta.highEnergy, get_diag_name(Scienta.highEnergy))
set_device_alias(ManipulatorX.readback, get_diag_name(ManipulatorX.readback))
set_device_alias(ManipulatorY.readback, get_diag_name(ManipulatorY.readback))
set_device_alias(ManipulatorZ.readback, get_diag_name(ManipulatorZ.readback))
set_device_alias(ManipulatorTheta.readback, get_diag_name(ManipulatorTheta.readback))
set_device_alias(ManipulatorTilt.readback, get_diag_name(ManipulatorTilt.readback))
set_device_alias(ManipulatorPhi.readback, get_diag_name(ManipulatorPhi.readback))
#Additional device configuration
ManipulatorPhi.trustedWrite = False
def fit(ydata, xdata = None):
"""
"""
if xdata is None:
xdata = frange(0, len(ydata), 1)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
print "Max index:" + str(index_max),
print " x:" + str(max_x),
print " y:" + str(max_y)
gaussians = fit_gaussians(ydata, xdata, [index_max,])
(norm, mean, sigma) = gaussians[0]
p = plot([ydata],["data"],[xdata], title="Fit" )[0]
fitted_gaussian_function = Gaussian(norm, mean, sigma)
scale_x = [float(min(xdata)), float(max(xdata)) ]
points = max((len(xdata)+1), 100)
resolution = (scale_x[1]-scale_x[0]) / points
fit_y = []
fit_x = frange(scale_x[0],scale_x[1],resolution, True)
for x in fit_x:
fit_y.append(fitted_gaussian_function.value(x))
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2):
print "Mean -> " + str(mean)
p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker())
return (norm, mean, sigma)
else:
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
print "Invalid gaussian fit: " + str(mean)
return (None, None, None)
def elog(title, message, attachments = [], author = None, category = "Info", domain = "", logbook = "Experiments", encoding=1):
"""
Add entry to ELOG.
"""
if author is None:
author = "pshell" #get_context().getUser().name
typ = "pshell"
entry = ""
cmd = 'G_CS_ELOG_add -l "' + logbook+ '" '
cmd = cmd + '-a "Author=' + author + '" '
cmd = cmd + '-a "Type=' + typ + '" '
cmd = cmd + '-a "Entry=' + entry + '" '
cmd = cmd + '-a "Title=' + title + '" '
cmd = cmd + '-a "Category=' + category + '" '
cmd = cmd + '-a "Domain=' + domain + '" '
for attachment in attachments:
cmd = cmd + '-f "' + attachment + '" '
cmd = cmd + '-n ' + str(encoding)
cmd = cmd + ' "' + message + '"'
#print cmd
#os.system (cmd)
#print os.popen(cmd).read()
import subprocess
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
if (err is not None) and err!="":
raise Exception(err)
print out
def get_plot_snapshots(title = None, file_type = "jpg", temp_path = get_context().setup.getContextPath()):
"""
Returns list with file names of plots snapshots from a plotting context.
"""
sleep(0.02) #Give some time to plot to be finished - it is not sync with acquisition
ret = []
for p in get_plots(title):
file_name = os.path.abspath(temp_path + "/" + p.getTitle() + "." + file_type)
p.saveSnapshot(file_name , file_type)
ret.append(file_name)
return ret