Files
x03da/script/local.py
gac-x03da 1b9f2a51fa Closedown
2025-07-16 14:42:30 +02:00

512 lines
18 KiB
Python

import random
import ch.psi.pshell.device.Readable.ReadableArray as ReadableArray
import ch.psi.pshell.device.Readable.ReadableCalibratedArray as ReadableCalibratedArray
import ch.psi.pshell.device.ArrayCalibration as ArrayCalibration
import ch.psi.utils.Str
from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian
import java.awt.Color as Color
Scienta = get_device("Scienta") #Scienta class is imported in startup.py, shadowing Scienta device name
#Synchrronized Scienta counts
for stat in Scienta.stats:
add_device(stat, True)
beam_ok = True
class SimulatedOutput(Writable):
def write(self, value):
pass
class SimulatedInput(Readable):
def __init__(self):
self.x = 0.0
def read(self):
self.x = self.x + 0.2
noise = (random.random() - 0.5) / 20.0
return math.sin(self.x) + noise
sout = SimulatedOutput()
sinp = SimulatedInput()
def integrate_image(vertical = True):
data = Scienta.dataArray.read()
#Integrate and plot
dims = Scienta.getImageSize().tolist()
if len(dims)==2:
(width,height) = dims
else:
(width,height, images) = dims
integration = []
if vertical:
for i in range(width):
p=0.0
for j in range(height):
p=p+data[j*width+i]
integration.append(p)
else:
for j in range(height):
p=0.0
for i in range(width):
p=p+data[j*width+i]
integration.append(p)
return integration
class ImageEnergyDistribution(ReadableCalibratedArray):
def getSize(self):
dims = Scienta.getImageSize().tolist()
if len(dims)==2:
(width,height) = dims
else:
(width,height, images) = dims
return width
def read(self):
return to_array(integrate_image(),'d')
def getCalibration(self):
c=Scienta.readImageDescriptor().calibration
if c is None:
return None
return ArrayCalibration(c.scaleX, c.offsetX)
EnergyDistribution = ImageEnergyDistribution()
class ImageAngleDistribution(ReadableCalibratedArray):
def getSize(self):
dims = Scienta.getImageSize().tolist()
if len(dims)==2:
(width,height) = dims
else:
(width,height, images) = dims
return height
def read(self):
return to_array(integrate_image(False),'d')
def getCalibration(self):
c=Scienta.readImageDescriptor().calibration
if c is None:
return None
return ArrayCalibration(c.scaleY, c.offsetY)
AngleDistribution = ImageAngleDistribution()
class ImageCounts(Readable):
def read(self):
data = Scienta.dataArray.read()
counts = sum(data)
return counts
Counts = ImageCounts()
def init_scienta():
"""
turn on the analyser and start a mock measurement so that we get the correct array size.
start a scienta acquisition and abort after 4 seconds.
"""
if Scienta.isSimulated():
time.sleep(0.1)
else:
image_id = Scienta.currentImageCount
Scienta.start()
Scienta.waitReady(4000)
Scienta.stop()
Scienta.waitNewImage(500, image_id)
def trig_scienta():
if Scienta.isSimulated():
time.sleep(0.1)
else:
image_id = Scienta.currentImageCount
Scienta.start()
Scienta.waitReady(-1)
Scienta.waitNewImage(3000, image_id)
from keithley import KeiSample, KeiReference
def prepare_keithleys(dwell, triggered):
"""
prepare keithleys.
at the moment, the dwell time has to be set manually by selecting one of the poll modes
slow = 100 ms, medium = 20 ms, fast = 2 ms.
"""
KeiSample.prepare(dwell, triggered)
KeiReference.prepare(dwell, triggered)
def trig_keithleys():
"""
trigger keithleys, do not wait.
after this, you have to wait for at least the dwell time before reading the value!
"""
KeiSample.trig()
KeiReference.trig()
def wait_keithleys():
"""
wait for one dwell time so that the keithleys can finish their measurement.
"""
time.sleep(KeiSample.dwell * 2.2)
def fetch_keithleys():
"""
read the keithley readings into EPICS.
this requires that at least the dwell time has passed since the last trigger.
the value can then be read from the SampleCurrent and ReferenceCurrent devices.
"""
KeiSample.fetch()
KeiReference.fetch()
def release_keithleys():
"""
switch keithleys to free run.
0.1 s polling and dwell time
"""
KeiSample.release()
KeiReference.release()
def otf(mode="ENERGY", e1=None, e2=None, beta1=None, beta2=None, theta1=None, theta2=None, \
time=1.0, modulo=1, turn_off_beam=False):
"""
mode: "ENERGY" or "AMNGLE"
"""
run("otf", {
"MODE":mode, \
"E1":float(e1) if e1 is not None else None, \
"E2":float(e2) if e2 is not None else None, \
"BETA1":float(beta1) if beta1 is not None else None, \
"BETA2":float(beta2) if beta2 is not None else None, \
"THETA1":float(theta1) if theta1 is not None else None, \
"THETA2":float(theta2) if theta2 is not None else None, \
"TIME":float(time), \
"MODULO":int(modulo), \
"ENDSCAN":turn_off_beam, \
})
diag_channels = []
#Manipulator Settings
diag_channels.append(ManipulatorX.readback)
diag_channels.append(ManipulatorY.readback)
diag_channels.append(ManipulatorZ.readback)
diag_channels.append(ManipulatorTheta.readback)
diag_channels.append(ManipulatorTilt.readback)
diag_channels.append(ManipulatorPhi.readback)
# Beamline Settings
# Auxiliary Measurements
diag_channels.append(SampleCurrent)
diag_channels.append(RefCurrent)
#diag_channels.append(AuxCurrent)
#diag_channels.append(AuxVoltage)
diag_channels.append(SampleCurrentGain)
diag_channels.append(RefCurrentGain)
#diag_channels.append(AuxCurrentGain)
#diag_channels.append(SampleCurrentAveraging)
#diag_channels.append(RefCurrentAveraging)
#diag_channels.append(AuxCurrentAveraging)
#diag_channels.append(AuxVoltageAveraging)
#diag_channels.append(SampleCurrentSampling)
#diag_channels.append(RefCurrentSampling)
#diag_channels.append(AuxCurrentSampling)
#diag_channels.append(AuxVoltageSampling)
diag_channels.append(ChamberPressure)
diag_channels.append(ManipulatorTempA)
diag_channels.append(ManipulatorTempB)
if get_device("ManipulatorCoolFlow"):
diag_channels.append(ManipulatorCoolFlow)
if get_device("ManipulatorCoolFlowSet"):
diag_channels.append(ManipulatorCoolFlowSet)
diag_channels_no_Scienta = list(diag_channels)
# Scienta
diag_channels.append(Scienta.channelBegin) #diag_channels.append(ChannelDouble("ChannelBegin", "X03DA-SCIENTA:cam1:CHANNEL_BEGIN_RBV"))
diag_channels.append(Scienta.channelEnd) #diag_channels.append(ChannelDouble("ChannelEnd", "X03DA-SCIENTA:cam1:CHANNEL_END_RBV"))
diag_channels.append(Scienta.sliceBegin) # diag_channels.append(ChannelDouble("SliceBegin", "X03DA-SCIENTA:cam1:SLICE_BEGIN_RBV"))
diag_channels.append(Scienta.sliceEnd) #diag_channels.append(ChannelDouble("StepTime", "X03DA-SCIENTA:cam1:SLICE_END_RBV"))
diag_channels.append(Scienta.numSlices) # diag_channels.append(ChannelDouble("NumSlices", "X03DA-SCIENTA:cam1:SLICES_RBV"))
#diag_channels.append(Scienta.frames) # diag_channels.append(ChannelDouble("NumFrames", "X03DA-SCIENTA:cam1:FRAMES"))
diag_channels.append(Scienta.numChannels) #diag_channels.append(ChannelDouble("NumChannels", "X03DA-SCIENTA:cam1:NUM_CHANNELS_RBV"))
diag_channels.append(Scienta.lowEnergy) #diag_channels.append(ChannelDouble("LowEnergy", "X03DA-SCIENTA:cam1:LOW_ENERGY_RBV"))
diag_channels.append(Scienta.centerEnergy) #diag_channels.append(ChannelDouble("CenterEnergy", "X03DA-SCIENTA:cam1:CENTRE_ENERGY_RBV"))
diag_channels.append(Scienta.highEnergy) #diag_channels.append(ChannelDouble("HighEnergy", "X03DA-SCIENTA:cam1:HIGH_ENERGY_RBV"))
diag_channels.append(ScientaDwellTime)
diag_channels.append(AcquisitionMode) #diag_attrs.append(ChannelString("AcquisitionMode", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV"))
diag_channels.append(EnergyMode) #diag_attrs.append(ChannelString("EnergyMode", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV"))
diag_channels.append(LensMode) #diag_attrs.append(ChannelString("LensMode", "X03DA-SCIENTA:cam1:LENS_MODE_RBV"))
diag_channels.append(DetectorMode) #diag_attrs.append(ChannelString("DetectorMode", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV"))
diag_channels.append(PassEnergy) #diag_attrs.append(ChannelString("PassEnergy", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV"))
diag_channels.append(ElementSet) #diag_attrs.append(ChannelString("ElementSet", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV"))
diag_channels.append(ExcitationEnergy) #diag_channels.append(ChannelDouble("ExcitationEnergy", "X03DA-SCIENTA:cam1:EXCITATION_ENERGY_RBV"))
diag_channels.append(StepSize) #diag_channels.append(ChannelDouble("StepSize", "X03DA-SCIENTA:cam1:STEP_SIZE_RBV"))
diag_channels.append(NumIterations) #diag_channels.append(ChannelDouble("NumIterations", "X03DA-SCIENTA:cam1:NumExposures_RBV"))
#diag_channels.append(AnalyserSlit) #diag_attrs.append(ChannelString("ElemeAnalyserSlitntSet", "X03DA-SCIENTA:cam1:ANALYSER_SLIT_RBV"))
snap_channels = []
snap_channels.append(KeiSample.rangeCh)
snap_channels.append(KeiSample.usermodeCh)
snap_channels.append(KeiSample.tottimeCh)
snap_channels.append(KeiSample.setvoltageCh)
snap_channels.append(KeiSample.voltoutCh)
snap_channels.append(KeiSample.invertreadoutCh)
snap_channels.append(KeiReference.rangeCh)
snap_channels.append(KeiReference.usermodeCh)
snap_channels.append(KeiReference.tottimeCh)
snap_channels.append(KeiReference.setvoltageCh)
snap_channels.append(KeiReference.voltoutCh)
snap_channels.append(KeiReference.invertreadoutCh)
diag_channels = sorted(diag_channels, key=lambda channel: channel.name)
snap_channels = sorted(snap_channels, key=lambda channel: channel.name)
def get_diag_name(diag):
return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "").replace("Readback", "")
def print_diag():
for f in diag_channels:
print "%-25s %s" % (get_diag_name(f) , str(f.read()))
def create_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd')
def append_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
try:
x = f.read()
if x is None:
x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
append_dataset(group+get_diag_name(f), x)
except:
log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))
def create_metadata_datasets(parent = None):
if parent is None:
parent = "/"
group = parent + "general/"
for name in ["proposer", "proposal", "pgroup", "sample"]:
setting = get_setting(name)
save_dataset(group+name, setting if setting is not None else "", 's')
setting = get_setting("authors")
save_dataset(group+"authors", setting.split("|") if setting is not None else [""], '[s')
def wait_beam():
if not beam_ok:
print "Waiting for beam..."
while not beam_ok:
time.sleep(0.1)
print "Beam ok"
def is_scienta_sampling():
global SENSORS
sample_scienta = False
for dev in [ #Scienta.spectrum",
"EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix", "Counts"]:
if dev in SENSORS:
sample_scienta = True
break
for dev in [ #"Scienta.spectrum," ,
EnergyDistribution, AngleDistribution, Scienta.dataMatrix, Counts]:
if dev in SENSORS:
sample_scienta = True
break
return sample_scienta
def before_readout():
sample_scienta = is_scienta_sampling()
wait_beam()
trig_keithleys()
if sample_scienta:
trig_scienta()
else:
wait_keithleys()
fetch_keithleys()
def after_readout(rec, scan):
if beam_ok:
if get_exec_pars().save:
if rec.index == 0:
if scan.index == 1:
create_metadata_datasets()
create_diag_datasets()
append_diag_datasets()
else:
rec.invalidate()
def after_scan():
"""
Close shutter and turn off analyser
"""
caput("X03DA-SCIENTA:cam1:ZERO_SUPPLIES", 1)
# caput("X03DA-PC:AFTER-SCAN.PROC", 1)
# caput("X03DA-OP-VG7:WT_SET", 0)
#caput("X03DA-FE-AB1:CLOSE4BL", 0)
#release_keithleys()
def set_adc_averaging(dwelltime=0.0):
if dwelltime == 0.0:
if is_scienta_sampling():
dwelltime = Scienta.getStepTime().read()
fixed = AcquisitionMode.read() == "Fixed"
else:
fixed = True
dwelltime = min(dwelltime, 20.0)
dwelltime = max(dwelltime, 0.1)
else:
fixed = True
prepare_keithleys(dwelltime, fixed)
#value = Scienta.getStepTime().read() * 10.0 #averaging count in 100ms
#SampleCurrentAveraging.write(value)
#RefCurrentAveraging.write(value)
#AuxCurrentAveraging.write(value)
#AuxVoltageAveraging.write(value)
def adjust_sensors():
#Updating ranges from Scienta
if is_scienta_sampling():
Scienta.update()
global SENSORS
if SENSORS is not None:
# Move integration to end
#sample_scienta = False
for dev in [ # "Scienta.spectrum",
"EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]:
if dev in SENSORS:
#sample_scienta = True
SENSORS=SENSORS+[SENSORS.pop(SENSORS.index(dev))]
for dev in ["Counts"]:
if dev in SENSORS:
#sample_scienta = True
SENSORS=[SENSORS.pop(SENSORS.index(dev))] + SENSORS
if "Scienta.dataMatrix" in SENSORS or Scienta.dataMatrix in SENSORS:
print "Not ACC"
set_exec_pars(accumulate = False)
#if sample_scienta:
# init_scienta()
#Device aliases for data files
set_device_alias(Scienta.dataMatrix, "ScientaImage")
### NOT AVAILABLE IN NEW DRIVER
#set_device_alias(Scienta.spectrum, "ScientaSpectrum")
set_device_alias(Scienta.channelBegin, get_diag_name(Scienta.channelBegin))
set_device_alias(Scienta.channelEnd, get_diag_name(Scienta.channelEnd))
set_device_alias(Scienta.sliceBegin, get_diag_name(Scienta.sliceBegin))
set_device_alias(Scienta.sliceEnd, get_diag_name(Scienta.sliceEnd))
set_device_alias(Scienta.numChannels, get_diag_name(Scienta.numChannels))
set_device_alias(Scienta.numSlices, get_diag_name(Scienta.numSlices))
set_device_alias(Scienta.lowEnergy, get_diag_name(Scienta.lowEnergy))
set_device_alias(Scienta.centerEnergy, get_diag_name(Scienta.centerEnergy))
set_device_alias(Scienta.highEnergy, get_diag_name(Scienta.highEnergy))
set_device_alias(ManipulatorX.readback, get_diag_name(ManipulatorX.readback))
set_device_alias(ManipulatorY.readback, get_diag_name(ManipulatorY.readback))
set_device_alias(ManipulatorZ.readback, get_diag_name(ManipulatorZ.readback))
set_device_alias(ManipulatorTheta.readback, get_diag_name(ManipulatorTheta.readback))
set_device_alias(ManipulatorTilt.readback, get_diag_name(ManipulatorTilt.readback))
set_device_alias(ManipulatorPhi.readback, get_diag_name(ManipulatorPhi.readback))
#Additional device configuration
ManipulatorPhi.trustedWrite = False
def fit(ydata, xdata = None):
"""
"""
if xdata is None:
xdata = frange(0, len(ydata), 1)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
print "Max index:" + str(index_max),
print " x:" + str(max_x),
print " y:" + str(max_y)
gaussians = fit_gaussians(ydata, xdata, [index_max,])
(norm, mean, sigma) = gaussians[0]
p = plot([ydata],["data"],[xdata], title="Fit" )[0]
fitted_gaussian_function = Gaussian(norm, mean, sigma)
scale_x = [float(min(xdata)), float(max(xdata)) ]
points = max((len(xdata)+1), 100)
resolution = (scale_x[1]-scale_x[0]) / points
fit_y = []
fit_x = frange(scale_x[0],scale_x[1],resolution, True)
for x in fit_x:
fit_y.append(fitted_gaussian_function.value(x))
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2):
print "Mean -> " + str(mean)
p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker())
return (norm, mean, sigma)
else:
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
print "Invalid gaussian fit: " + str(mean)
return (None, None, None)
def elog(title, message, attachments = [], author = None, category = "Info", domain = "", logbook = "Experiments", encoding=1):
"""
Add entry to ELOG.
"""
if author is None:
author = "pshell" #get_context().getUser().name
typ = "pshell"
entry = ""
cmd = 'G_CS_ELOG_add -l "' + logbook+ '" '
cmd = cmd + '-a "Author=' + author + '" '
cmd = cmd + '-a "Type=' + typ + '" '
cmd = cmd + '-a "Entry=' + entry + '" '
cmd = cmd + '-a "Title=' + title + '" '
cmd = cmd + '-a "Category=' + category + '" '
cmd = cmd + '-a "Domain=' + domain + '" '
for attachment in attachments:
cmd = cmd + '-f "' + attachment + '" '
cmd = cmd + '-n ' + str(encoding)
cmd = cmd + ' "' + message + '"'
#print cmd
#os.system (cmd)
#print os.popen(cmd).read()
import subprocess
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
if (err is not None) and err!="":
raise Exception(err)
print out
def get_plot_snapshots(title = None, file_type = "jpg", temp_path = get_context().setup.getContextPath()):
"""
Returns list with file names of plots snapshots from a plotting context.
"""
sleep(0.02) #Give some time to plot to be finished - it is not sync with acquisition
ret = []
for p in get_plots(title):
file_name = os.path.abspath(temp_path + "/" + p.getTitle() + "." + file_type)
p.saveSnapshot(file_name , file_type)
ret.append(file_name)
return ret