Files
x03da/script/local.py
gac-x03da b2373fbcf4 Closedown
2019-05-01 10:52:28 +02:00

456 lines
17 KiB
Python

import random
import ch.psi.pshell.device.Readable.ReadableArray as ReadableArray
import ch.psi.pshell.device.Readable.ReadableCalibratedArray as ReadableCalibratedArray
import ch.psi.pshell.device.ArrayCalibration as ArrayCalibration
import ch.psi.utils.Str
from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian
import java.awt.Color as Color
# Synchronized Scienta counts: register every device listed in Scienta.stats.
for stat in Scienta.stats:
    add_device(stat, True)
# Module-level beam status flag, read by wait_beam() and after_readout().
# Nothing visible in this file changes it - presumably it is updated externally.
beam_ok = True
class SimulatedOutput(Writable):
    """
    Writable stand-in that discards every value written to it.
    """

    def write(self, value):
        """
        Accept value and do nothing with it.
        """
        return None
class SimulatedInput(Readable):
    """
    Readable stand-in producing a noisy sine wave.

    Each read() advances an internal phase by 0.2 and returns sin(phase)
    plus uniform noise in the range [-0.025, 0.025).
    """

    def __init__(self):
        # Phase of the sine wave, advanced on every read().
        self.x = 0.0

    def read(self):
        """
        Advance the phase and return the next noisy sample.
        """
        self.x += 0.2
        jitter = (random.random() - 0.5) / 20.0
        return math.sin(self.x) + jitter
# Shared instances of the simulated output and input devices.
sout = SimulatedOutput()
sinp = SimulatedInput()
def integrate_image(vertical = True):
    """
    Sum the current Scienta image along one axis.

    The image is read as a flat sequence from Scienta.dataArray and addressed as
    data[row * width + column], with (width, height) taken from Scienta.getImageSize().

    Args:
        vertical: if True, add up each column over all rows (one value per column);
            otherwise add up each row over all columns (one value per row).

    Returns:
        list with the sums, each accumulated starting from 0.0.
    """
    pixels = Scienta.dataArray.read()
    (n_cols, n_rows) = Scienta.getImageSize().tolist()
    if vertical:
        return [sum((pixels[row * n_cols + col] for row in range(n_rows)), 0.0)
                for col in range(n_cols)]
    return [sum((pixels[row * n_cols + col] for col in range(n_cols)), 0.0)
            for row in range(n_rows)]
class ImageEnergyDistribution(ReadableCalibratedArray):
    """
    Calibrated array device: the Scienta image summed over its rows,
    giving one value per image column.
    """

    def getSize(self):
        """
        Return the image width, which is the number of values read() produces.
        """
        width, _height = Scienta.getImageSize().tolist()
        return width

    def read(self):
        """
        Return the column sums from integrate_image() converted with to_array(..., 'd').
        """
        return to_array(integrate_image(vertical=True), 'd')

    def getCalibration(self):
        """
        Return an ArrayCalibration built from the X scale and offset of the image
        descriptor's calibration, or None if the descriptor has no calibration.
        """
        calibration = Scienta.readImageDescriptor().calibration
        if calibration is not None:
            return ArrayCalibration(calibration.scaleX, calibration.offsetX)
        return None


# Device instance; before_readout() checks for it in SENSORS.
EnergyDistribution = ImageEnergyDistribution()
class ImageAngleDistribution(ReadableCalibratedArray):
    """
    Calibrated array device: the Scienta image summed over its columns,
    giving one value per image row.
    """

    def getSize(self):
        """
        Return the image height, which is the number of values read() produces.
        """
        _width, height = Scienta.getImageSize().tolist()
        return height

    def read(self):
        """
        Return the row sums from integrate_image(False) converted with to_array(..., 'd').
        """
        return to_array(integrate_image(vertical=False), 'd')

    def getCalibration(self):
        """
        Return an ArrayCalibration built from the Y scale and offset of the image
        descriptor's calibration, or None if the descriptor has no calibration.
        """
        calibration = Scienta.readImageDescriptor().calibration
        if calibration is not None:
            return ArrayCalibration(calibration.scaleY, calibration.offsetY)
        return None


# Device instance; before_readout() checks for it in SENSORS.
AngleDistribution = ImageAngleDistribution()
def init_scienta():
    """
    turn on the analyser and run a mock measurement so that we get the correct array size.

    simulated analyser: only sleeps for 0.1 s.
    real analyser: remembers the current image count, starts an acquisition,
    calls waitReady(4000), stops, then calls waitNewImage(500, <remembered count>).
    the timeouts are presumably milliseconds (the original note says "abort after 4 seconds").
    """
    if Scienta.isSimulated():
        time.sleep(0.1)
        return
    count_before = Scienta.currentImageCount
    Scienta.start()
    Scienta.waitReady(4000)
    Scienta.stop()
    Scienta.waitNewImage(500, count_before)
def trig_scienta():
    """
    run one scienta acquisition and wait for its image.

    simulated analyser: only sleeps for 0.1 s.
    real analyser: remembers the current image count, starts the acquisition,
    calls waitReady(-1) (presumably "no timeout" - confirm) and then
    waitNewImage(3000, <remembered count>).
    """
    if Scienta.isSimulated():
        time.sleep(0.1)
        return
    count_before = Scienta.currentImageCount
    Scienta.start()
    Scienta.waitReady(-1)
    Scienta.waitNewImage(3000, count_before)
from keithley import KeiSample, KeiReference
def prepare_keithleys(dwell, triggered):
    """
    prepare both keithleys (KeiSample first, then KeiReference).

    both arguments are passed unchanged to each device's prepare().
    note from the original author: at the moment, the dwell time has to be set
    manually by selecting one of the poll modes
    slow = 100 ms, medium = 20 ms, fast = 2 ms.
    """
    for keithley in (KeiSample, KeiReference):
        keithley.prepare(dwell, triggered)
def trig_keithleys():
    """
    trigger both keithleys (KeiSample first, then KeiReference) without waiting.

    after this, you have to wait for at least the dwell time before reading the value!
    """
    for keithley in (KeiSample, KeiReference):
        keithley.trig()
def wait_keithleys():
    """
    sleep so that the keithleys can finish their measurement.

    the pause is 2.2 times KeiSample.dwell, i.e. longer than a single dwell time.
    """
    pause = KeiSample.dwell * 2.2
    time.sleep(pause)
def fetch_keithleys():
    """
    read the keithley readings into EPICS (KeiSample first, then KeiReference).

    this requires that at least the dwell time has passed since the last trigger.
    the value can then be read from the SampleCurrent and ReferenceCurrent devices.
    """
    for keithley in (KeiSample, KeiReference):
        keithley.fetch()
def release_keithleys():
    """
    switch both keithleys (KeiSample first, then KeiReference) to free run.

    0.1 s polling and dwell time
    """
    for keithley in (KeiSample, KeiReference):
        keithley.release()
# Devices recorded as diagnostics: print_diag() prints them and
# create_diag_datasets()/append_diag_datasets() store them in the "attrs/" datasets.
# The trailing comments are presumably the earlier direct EPICS channel definitions
# that these device references stand in for.
diag_channels = []
# Scienta analyser settings
diag_channels.append(Scienta.channelBegin) #diag_channels.append(ChannelDouble("ChannelBegin", "X03DA-SCIENTA:cam1:CHANNEL_BEGIN_RBV"))
diag_channels.append(Scienta.channelEnd) #diag_channels.append(ChannelDouble("ChannelEnd", "X03DA-SCIENTA:cam1:CHANNEL_END_RBV"))
diag_channels.append(Scienta.sliceBegin) # diag_channels.append(ChannelDouble("SliceBegin", "X03DA-SCIENTA:cam1:SLICE_BEGIN_RBV"))
diag_channels.append(Scienta.sliceEnd) #diag_channels.append(ChannelDouble("StepTime", "X03DA-SCIENTA:cam1:SLICE_END_RBV")) - legacy name "StepTime" presumably meant "SliceEnd"
diag_channels.append(Scienta.numSlices) # diag_channels.append(ChannelDouble("NumSlices", "X03DA-SCIENTA:cam1:SLICES_RBV"))
#diag_channels.append(Scienta.frames) # diag_channels.append(ChannelDouble("NumFrames", "X03DA-SCIENTA:cam1:FRAMES"))
diag_channels.append(Scienta.numChannels) #diag_channels.append(ChannelDouble("NumChannels", "X03DA-SCIENTA:cam1:NUM_CHANNELS_RBV"))
diag_channels.append(Scienta.lowEnergy) #diag_channels.append(ChannelDouble("LowEnergy", "X03DA-SCIENTA:cam1:LOW_ENERGY_RBV"))
diag_channels.append(Scienta.centerEnergy) #diag_channels.append(ChannelDouble("CenterEnergy", "X03DA-SCIENTA:cam1:CENTRE_ENERGY_RBV"))
diag_channels.append(Scienta.highEnergy) #diag_channels.append(ChannelDouble("HighEnergy", "X03DA-SCIENTA:cam1:HIGH_ENERGY_RBV"))
#TODO: These are not part of the Scienta device interface. Should they be included?
#diag_channels.append(ChannelDouble("AcquisitionModeNum", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV"))
#diag_channels.append(ChannelDouble("EnergyModeNum", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV"))
#diag_channels.append(ChannelDouble("LensModeNum", "X03DA-SCIENTA:cam1:LENS_MODE_RBV"))
#diag_channels.append(ChannelDouble("DetectorModeNum", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV"))
#diag_channels.append(ChannelDouble("PassEnergyNum", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV"))
#diag_channels.append(ChannelDouble("ElementSetNum", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV"))
diag_channels.append(ScientaDwellTime)
diag_channels.append(AcquisitionMode) #diag_attrs.append(ChannelString("AcquisitionMode", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV"))
diag_channels.append(EnergyMode) #diag_attrs.append(ChannelString("EnergyMode", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV"))
diag_channels.append(LensMode) #diag_attrs.append(ChannelString("LensMode", "X03DA-SCIENTA:cam1:LENS_MODE_RBV"))
diag_channels.append(DetectorMode) #diag_attrs.append(ChannelString("DetectorMode", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV"))
diag_channels.append(PassEnergy) #diag_attrs.append(ChannelString("PassEnergy", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV"))
diag_channels.append(ElementSet) #diag_attrs.append(ChannelString("ElementSet", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV"))
diag_channels.append(ExcitationEnergy) #diag_channels.append(ChannelDouble("ExcitationEnergy", "X03DA-SCIENTA:cam1:EXCITATION_ENERGY_RBV"))
diag_channels.append(StepSize) #diag_channels.append(ChannelDouble("StepSize", "X03DA-SCIENTA:cam1:STEP_SIZE_RBV"))
diag_channels.append(NumIterations) #diag_channels.append(ChannelDouble("NumIterations", "X03DA-SCIENTA:cam1:NumExposures_RBV"))
diag_channels.append(AnalyserSlit) #diag_attrs.append(ChannelString("AnalyserSlit", "X03DA-SCIENTA:cam1:ANALYSER_SLIT_RBV"))
# Manipulator Settings (readback devices; get_diag_name() strips "Readback" from their names)
diag_channels.append(ManipulatorX.readback)
diag_channels.append(ManipulatorY.readback)
diag_channels.append(ManipulatorZ.readback)
diag_channels.append(ManipulatorTheta.readback)
diag_channels.append(ManipulatorTilt.readback)
diag_channels.append(ManipulatorPhi.readback)
# Beamline Settings
diag_channels.append(MachineBumpXOffset)
diag_channels.append(MachineBumpXAngle)
diag_channels.append(MachineBumpYOffset)
diag_channels.append(MachineBumpYAngle)
diag_channels.append(DynamicBumpYOffset)
diag_channels.append(DynamicBumpYAngle)
diag_channels.append(FrontendVCenter)
diag_channels.append(FrontendVSize)
diag_channels.append(FrontendHCenter)
diag_channels.append(FrontendHSize)
diag_channels.append(MonoVCenter)
diag_channels.append(MonoVSize)
diag_channels.append(MonoBladeDown)
diag_channels.append(MonoBladeUp)
diag_channels.append(MonoHCenter)
diag_channels.append(MonoHSize)
diag_channels.append(MonoApertureMode)
diag_channels.append(RefocusVCenter)
diag_channels.append(RefocusVSize)
diag_channels.append(RefocusHCenter)
diag_channels.append(RefocusHSize)
diag_channels.append(FocusYTrans)
diag_channels.append(FocusZTrans)
diag_channels.append(FocusXRot)
diag_channels.append(FocusYRot)
diag_channels.append(FocusZRot)
diag_channels.append(RefocusYTrans)
diag_channels.append(RefocusZTrans)
diag_channels.append(RefocusXRot)
diag_channels.append(RefocusYRot)
diag_channels.append(RefocusZRot)
diag_channels.append(MonoEnergy)
diag_channels.append(MonoCff)
diag_channels.append(MonoBeta)
diag_channels.append(MonoTheta)
diag_channels.append(ExitSlit)
# Auxiliary Measurements
diag_channels.append(MachineCurrent)
diag_channels.append(FocusWaterTemp)
diag_channels.append(SampleCurrent)
diag_channels.append(RefCurrent)
#diag_channels.append(AuxCurrent)
#diag_channels.append(AuxVoltage)
diag_channels.append(SampleCurrentGain)
diag_channels.append(RefCurrentGain)
#diag_channels.append(AuxCurrentGain)
#diag_channels.append(SampleCurrentAveraging)
#diag_channels.append(RefCurrentAveraging)
#diag_channels.append(AuxCurrentAveraging)
#diag_channels.append(AuxVoltageAveraging)
#diag_channels.append(SampleCurrentSampling)
#diag_channels.append(RefCurrentSampling)
#diag_channels.append(AuxCurrentSampling)
#diag_channels.append(AuxVoltageSampling)
diag_channels.append(ChamberPressure)
diag_channels.append(BeamlinePressure)
diag_channels.append(ManipulatorTempA)
diag_channels.append(ManipulatorTempB)
diag_channels.append(ManipulatorCoolFlow)
diag_channels.append(ManipulatorCoolFlowSet)
diag_channels.append(MonoGrating)
# Order the list by each device's "name" attribute, so printing and dataset
# creation follow that order rather than the order of the appends above.
diag_channels = sorted(diag_channels, key=lambda channel: channel.name)
def get_diag_name(diag):
    """
    Build the display and dataset name of a diagnostic device.

    The device name from diag.getName() is converted with
    ch.psi.utils.Str.toTitleCase, then all spaces and every occurrence of
    "Readback" are removed.
    """
    title_cased = ch.psi.utils.Str.toTitleCase(diag.getName())
    without_spaces = title_cased.replace(" ", "")
    return without_spaces.replace("Readback", "")
def print_diag():
    """
    Print one line per entry of diag_channels.

    Each line holds the name from get_diag_name(), left-justified in 25 columns,
    followed by the string form of the value returned by the device's read().
    """
    for channel in diag_channels:
        label = get_diag_name(channel)
        reading = str(channel.read())
        print("%-25s %s" % (label, reading))
def create_diag_datasets(parent = None):
    """
    Create one dataset per entry of diag_channels under <parent>attrs/.

    Args:
        parent: group path prefix; defaults to get_exec_pars().group.

    Devices whose exact type is ch.psi.pshell.epics.ChannelString get type 's',
    all others get type 'd'.
    """
    base = get_exec_pars().group if parent is None else parent
    attrs_group = base + "attrs/"
    for channel in diag_channels:
        dataset_path = attrs_group + get_diag_name(channel)
        is_string = type(channel) is ch.psi.pshell.epics.ChannelString
        create_dataset(dataset_path, 's' if is_string else 'd')
def append_diag_datasets(parent = None):
    """
    Read every entry of diag_channels and append the value to its dataset under <parent>attrs/.

    Args:
        parent: group path prefix; defaults to get_exec_pars().group.

    A reading of None is replaced by '' for devices whose exact type is
    ch.psi.pshell.epics.ChannelString and by NaN for all others.
    A failure on one device is logged and does not stop the remaining devices.
    """
    base = get_exec_pars().group if parent is None else parent
    attrs_group = base + "attrs/"
    for channel in diag_channels:
        try:
            value = channel.read()
            if value is None:
                if type(channel) is ch.psi.pshell.epics.ChannelString:
                    value = ''
                else:
                    value = float('nan')
            append_dataset(attrs_group + get_diag_name(channel), value)
        except:
            # Bare except kept on purpose (best effort per device).
            # NOTE(review): under Jython this presumably also traps Java exceptions - confirm before narrowing.
            failure = sys.exc_info()[1]
            log("Error sampling " + str(get_diag_name(channel)) + ": " + str(failure))
def wait_beam():
    """
    Block until the module-level flag beam_ok is true.

    Returns immediately and silently if the flag is already true; otherwise
    prints a notice, polls the flag every 0.1 s and prints a second notice
    once it becomes true.
    """
    if beam_ok:
        return
    print("Waiting for beam...")
    while not beam_ok:
        time.sleep(0.1)
    print("Beam ok")
def before_readout():
    """
    Trigger the detectors before a scan point is read out.

    Sequence: wait for beam, trigger the keithleys, then either run a Scienta
    acquisition (if SENSORS contains a Scienta image/spectrum device, given by
    name or as device object) or just wait for the keithleys, and finally fetch
    the keithley readings.
    """
    # SENSORS may list the Scienta devices by name ...
    listed_by_name = any(
        name in SENSORS
        for name in ["Scienta.spectrum", "EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]
    )
    # ... or as the device objects themselves.
    listed_by_device = any(
        device in SENSORS
        for device in [Scienta.spectrum, EnergyDistribution, AngleDistribution, Scienta.dataMatrix]
    )
    sample_scienta = listed_by_name or listed_by_device

    wait_beam()
    trig_keithleys()
    if sample_scienta:
        trig_scienta()
    else:
        wait_keithleys()
    fetch_keithleys()
def after_readout(rec):
    """
    Store the diagnostics for a scan record, or invalidate the record.

    Args:
        rec: scan record; its index is read, and invalidate() is called on it
            when beam_ok is false.

    With beam and with saving enabled (get_exec_pars().save), the diagnostic
    datasets are created on the first record (index 0) and a sample is appended
    on every record.
    """
    if not beam_ok:
        rec.invalidate()
        return
    if not get_exec_pars().save:
        return
    if rec.index == 0:
        create_diag_datasets()
    append_diag_datasets()
def after_scan():
    """
    Close shutter and turn off analyser (purpose as stated by the original author).

    Writes 1 to "X03DA-PC:AFTER-SCAN.PROC" and then 0 to "X03DA-OP-VG7:WT_SET".
    NOTE(review): which write performs which action is not visible here - confirm.
    """
    final_writes = (
        ("X03DA-PC:AFTER-SCAN.PROC", 1),
        ("X03DA-OP-VG7:WT_SET", 0),
    )
    for pv_name, pv_value in final_writes:
        caput(pv_name, pv_value)
    # Disabled steps, kept for reference:
    #caput("X03DA-FE-AB1:CLOSE4BL", 0)
    #release_keithleys()
def set_adc_averaging(dwelltime=0.0):
    """
    Prepare the keithleys for the given or the current Scienta dwell time.

    Args:
        dwelltime: dwell time handed to prepare_keithleys(). The default 0.0
            means: use the Scienta step time, limited to the range 0.1 .. 20.0.

    With an explicit dwell time the keithleys are always prepared as triggered;
    with the default they are triggered only if AcquisitionMode reads "Fixed".
    """
    if dwelltime != 0.0:
        prepare_keithleys(dwelltime, True)
        return
    step_time = Scienta.getStepTime().read()
    clamped = max(min(step_time, 20.0), 0.1)
    fixed = AcquisitionMode.read() == "Fixed"
    prepare_keithleys(clamped, fixed)
    # Disabled legacy ADC averaging, kept for reference:
    #value = Scienta.getStepTime().read() * 10.0 #averaging count in 100ms
    #SampleCurrentAveraging.write(value)
    #RefCurrentAveraging.write(value)
    #AuxCurrentAveraging.write(value)
    #AuxVoltageAveraging.write(value)
def adjust_sensors():
    """
    Update the Scienta device and reorder the global SENSORS list.

    If SENSORS is not None:
    - the Scienta entries given by name are moved to the end of the list,
    - a "Counts" entry is moved to the front,
    - accumulation is switched off when the Scienta image (by name or as
      device object) is among the sensors.
    Only entries given as name strings are moved; device objects keep their place.
    """
    #Updating ranges from Scienta
    Scienta.update()
    global SENSORS
    if SENSORS is not None:
        # Move integration to end
        #sample_scienta = False
        for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]:
            if dev in SENSORS:
                #sample_scienta = True
                # pop() removes the entry from the current list object in place;
                # the concatenation then rebinds SENSORS to a new list with that entry last.
                SENSORS=SENSORS+[SENSORS.pop(SENSORS.index(dev))]
        for dev in ["Counts"]:
            if dev in SENSORS:
                #sample_scienta = True
                # Same pop-and-rebind, but the entry is put first.
                SENSORS=[SENSORS.pop(SENSORS.index(dev))] + SENSORS
        if "Scienta.dataMatrix" in SENSORS or Scienta.dataMatrix in SENSORS:
            print "Not ACC"
            set_exec_pars(accumulate = False)
    #if sample_scienta:
    # init_scienta()
# Device aliases for data files.
set_device_alias(Scienta.dataMatrix, "ScientaImage")
set_device_alias(Scienta.spectrum, "ScientaSpectrum")
# The energy devices get the same name that get_diag_name() derives for them.
set_device_alias(Scienta.centerEnergy, get_diag_name(Scienta.centerEnergy))
set_device_alias(Scienta.lowEnergy, get_diag_name(Scienta.lowEnergy))
set_device_alias(Scienta.highEnergy, get_diag_name(Scienta.highEnergy))
# Additional device configuration
# NOTE(review): presumably relaxes write checking on the phi manipulator - confirm against the device API.
ManipulatorPhi.trustedWrite = False
def fit(ydata, xdata = None):
    """
    Fit a single gaussian around the maximum of ydata and plot data and fit.

    Args:
        ydata: list of y values (list.index() is used to locate the maximum).
        xdata: optional list of x values; defaults to frange(0, len(ydata), 1).

    Returns:
        (norm, mean, sigma) of the fitted gaussian when the fitted mean lies
        within half the x range of the position of the data maximum,
        otherwise (None, None, None).

    The plot titled "Fit" gets a "fit" series and a marker: at the fitted mean
    for an accepted fit, at the data maximum otherwise.
    """
    if xdata is None:
        xdata = frange(0, len(ydata), 1)
    max_y = max(ydata)
    index_max = ydata.index(max_y)
    max_x = xdata[index_max]
    # One line, same text as the former three comma-terminated print statements.
    print("Max index:" + str(index_max) + "  x:" + str(max_x) + "  y:" + str(max_y))

    gaussians = fit_gaussians(ydata, xdata, [index_max,])
    (norm, mean, sigma) = gaussians[0]
    p = plot([ydata],["data"],[xdata], title="Fit" )[0]
    fitted_gaussian_function = Gaussian(norm, mean, sigma)

    # Sample the fitted curve on at least 100 points across the x range.
    x_min = float(min(xdata))
    x_max = float(max(xdata))
    points = max((len(xdata)+1), 100)
    resolution = (x_max - x_min) / points
    fit_x = frange(x_min, x_max, resolution, True)
    fit_y = [fitted_gaussian_function.value(x) for x in fit_x]
    p.addSeries(LinePlotSeries("fit"))
    p.getSeries(1).setData(fit_x, fit_y)

    # Accept the fit only if its mean is closer to the data maximum than half
    # the width of the x range. The width (max - min) is used, not the midpoint
    # (min + max) / 2, which depends on where the axis sits and is <= 0 for
    # ranges centred at or below zero.
    if abs(mean - max_x) < (x_max - x_min) / 2:
        print("Mean -> " + str(mean))
        # Label the marker with the mean itself (it previously showed norm).
        p.addMarker(mean, None, "Mean="+str(round(mean,2)), Color.MAGENTA.darker())
        return (norm, mean, sigma)
    p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
    print("Invalid gaussian fit: " + str(mean))
    return (None, None, None)
def elog(title, message, attachments = None, author = None, category = "Info", domain = "", logbook = "Experiments", encoding=1):
    """
    Add entry to ELOG by running the external G_CS_ELOG_add command.

    Args:
        title: value of the "Title" attribute.
        message: entry text, passed as the last command argument.
        attachments: optional list of file names, each passed with -f.
        author: value of the "Author" attribute; defaults to "pshell".
        category: value of the "Category" attribute.
        domain: value of the "Domain" attribute.
        logbook: logbook name, passed with -l.
        encoding: passed with -n after conversion to a string.

    Raises:
        Exception: with the command's stderr text, if it wrote anything to stderr.
        OSError: if the command cannot be started.

    The command's stdout is printed.
    """
    import subprocess
    if attachments is None:
        attachments = []
    if author is None:
        author = "pshell" #get_context().getUser().name
    typ = "pshell"
    entry = ""
    # Argument list instead of a shell string: quotes, "$" or backticks in the
    # title or message are passed through literally instead of being
    # interpreted by a shell.
    cmd = [
        "G_CS_ELOG_add",
        "-l", logbook,
        "-a", "Author=" + author,
        "-a", "Type=" + typ,
        "-a", "Entry=" + entry,
        "-a", "Title=" + title,
        "-a", "Category=" + category,
        "-a", "Domain=" + domain,
    ]
    for attachment in attachments:
        cmd.extend(["-f", attachment])
    cmd.extend(["-n", str(encoding), message])
    # stderr must be piped as well, otherwise communicate() returns None for it
    # and the error check below can never trigger.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = proc.communicate()
    if err:
        raise Exception(err)
    print(out)
def get_plot_snapshots(title = None, file_type = "jpg", temp_path = get_context().setup.getContextPath()):
    """
    Save a snapshot of every plot of a plotting context and return the file names.

    Args:
        title: passed to get_plots() to select the plotting context.
        file_type: snapshot format, also used as the file extension.
        temp_path: folder for the files. The default is evaluated once, when
            this function is defined.

    Returns:
        list with the absolute file name of each snapshot, named
        <temp_path>/<plot title>.<file_type>.
    """
    sleep(0.02) #Give some time to plot to be finished - it is not sync with acquisition
    snapshot_files = []
    for plot_panel in get_plots(title):
        raw_name = temp_path + "/" + plot_panel.getTitle() + "." + file_type
        snapshot_path = os.path.abspath(raw_name)
        plot_panel.saveSnapshot(snapshot_path, file_type)
        snapshot_files.append(snapshot_path)
    return snapshot_files