This commit is contained in:
gac-x03da
2022-02-02 16:23:05 +01:00
parent eeda4ed629
commit a3e546ce0b
13 changed files with 1616 additions and 0 deletions

View File

@@ -0,0 +1,343 @@
"""
Line/vector/area/holo scan of multiple spectral regions
save this script into your script/user folder before editing!
usage:
1. uncomment one of the MOTORS lines.
add another line if necessary.
2. uncomment one of the scan blocks and adjust the parameters.
add another block if necessary.
3. declare the regions.
4. add the regions to the REGIONS list.
5. run the script.
"""
# dummy scan (time series)
MOTORS = [dummy]
# photon energy scan (do not include 'ephot' in regions in this case!)
#MOTORS = [Eph]
# phi scan
#MOTORS = [ManipulatorPhi]
# holo scan
#MOTORS = (ManipulatorPhi, ManipulatorTheta)
# 2D YZ scan
#MOTORS = [ManipulatorY, ManipulatorZ]
# line scan [start, stop, step]
POSITIONS = [0.]
SCAN = 'vscan'
# vector scan [pos1, pos2, pos3, ...]
#POSITIONS = [600., 900.]
#SCAN = 'vscan'
# area scan [(start1, start2), (stop1, stop2), (step1, step2)]
# corresponding to (positioner1, positioner2)
#POSITIONS = [(-1., 114.), (+1., 116.), (20, 20)]
#ZIGZAG = True
#SCAN = 'ascan'
# holo scan
#PHI_RANGE = (-160.0, 160.0) # (tuple (min, max))
#THETA_RANGE = (-9.0, 81.0) # (tuple (min, max))
#STEPS = (40.0, 1.0) # (tuple (phi, theta))
#ZIGZAG = True
#POSITIONS = [(PHI_RANGE[0], THETA_RANGE[0]), (PHI_RANGE[1], THETA_RANGE[1]), STEPS]
#SCAN = 'ascan'
# seconds to wait between positioning command and triggering the detector
LATENCY = 0.0
# region setup
#
# for each region, define a python dictionary with the following items.
# optional items can be left unspecified and will default to the indicated values.
# for swept mode, include 'elo', 'ehi', 'estep', 'iter' values, but do not include 'efix'.
# for fixed mode, include 'efix' value, but do not include 'elo', 'ehi', 'estep', 'iter'.
#
# 'name': user-specific name of the region (for graph title and RegionName attribute in data file)
# 'elo': lower kinetic energy boundary of the spectrum
# 'ehi': upper kinetic energy boundary of the spectrum
# 'estep': energy step size
# 'efix': center kinetic energy in fixed mode
# 'epass': pass energy
# 'ephot': photon energy (default: unchanged)
# 'tstep': dwell time in seconds
# 'iter': number of iterations/sweeps (default 1)
# 'cis': True = constant initial state (photoemission line), False = constant final state (Auger peak), (default False)
# 'slit': exit slit (default: unchanged)
# First window should be set to first photon energy (see energies above)
REFERENCE_POSITION = {'X':1.2 ,'Y':0.0, 'Z':110.6, 'Theta':-9.1, 'Tilt':0.9, 'Phi':-90.0}
SAMPLE_POSITION = {'X':0.65 ,'Y':0.0, 'Z':115.0, 'Theta':-9.1, 'Tilt':0.9, 'Phi':-90.0}
REGION1 = {'name': 'AuFermi', 'ephot':600. , 'elo': 595., 'ehi':597.0, 'estep':0.2, 'epass': 20., 'tstep': 1., 'iter': 1, 'cis': False, 'position': 'reference'}
REGION2 = {'name': 'In3d','ephot':600., 'elo': 140., 'ehi':153.0, 'estep':0.2, 'epass': 20., 'tstep': 1., 'iter': 1, 'cis': False, 'position': 'sample'}
REGION3 = {'name': 'AuFermi', 'ephot':900. , 'elo': 895., 'ehi':897.0, 'estep':0.1, 'epass': 20., 'tstep': 1., 'iter': 1, 'cis': False, 'position': 'reference'}
REGION4 = {'name': 'Au4f', 'ephot':900. , 'elo': 804., 'ehi':815.0, 'estep':0.1, 'epass': 20., 'tstep': 1., 'iter': 1, 'cis': False, 'position': 'reference'}
REGION5 = {'name': 'In3d','ephot':900., 'elo': 440., 'ehi':453.0, 'estep':0.1, 'epass': 20., 'tstep': 1., 'iter': 1, 'cis': False, 'position': 'sample'}
# list of region dictionaries to execute at each scan position
REGIONS = [REGION1, REGION2]#, REGION3, REGION4, REGION5]
# close beam shutter and turn off analyser at the end of the scan
CLOSE_SHUTTER_AT_END = False
def move_to_position(pdict):
ManipulatorX.move(pdict['X'])
ManipulatorY.move(pdict['Y'])
ManipulatorZ.move(pdict['Z'])
ManipulatorTheta.move(pdict['Theta'])
ManipulatorTilt.move(pdict['Tilt'])
ManipulatorPhi.move(pdict['Phi'])
# --- DO NOT EDIT BELOW THIS LINE! ---
set_exec_pars(keep=False)
def check_region(region):
"""
check region dictionary items and apply defaults where necessary
"""
region['fixed'] = 'efix' in region
if region['fixed']:
region['elo'] = region['efix']
region['ehi'] = region['efix']
if 'iter' not in region:
region['iter'] = 1
print("region {0}: setting default iter = {1}".format(region['name'], region['iter']))
if 'cis' not in region:
region['cis'] = False
print("region {0}: setting default cis = {1}".format(region['name'], region['cis']))
class SpectrumReader(ReadonlyRegisterBase, ReadonlyRegisterArray):
def initialize(self):
#super(SpectrumReader, self).initialize()
self.scan_index = -1
def create_datasets(self):
path = get_exec_pars().scanPath + self.region_name + "/"
if self.region['fixed']:
self.channel_center_dataset_name = path + "ScientaChannelCenter"
create_dataset(self.channel_center_dataset_name, 'd')
else:
self.channel_begin_dataset_name = path + "ScientaChannelBegin"
self.channel_end_dataset_name = path + "ScientaChannelEnd"
self.step_energy_dataset_name = path + "ScientaStepEnergy"
create_dataset(self.channel_begin_dataset_name, 'd')
create_dataset(self.channel_end_dataset_name, 'd')
create_dataset(self.step_energy_dataset_name, 'd')
if 'epass' in self.region:
self.pass_energy_dataset_name = path + "ScientaPassEnergy"
create_dataset(self.pass_energy_dataset_name, 'd')
if 'tstep' in self.region:
self.step_time_dataset_name = path + "ScientaStepTime"
create_dataset(self.step_time_dataset_name, 'd')
if 'iter' in self.region:
self.iterations_dataset_name = path + "ScientaIterations"
create_dataset(self.iterations_dataset_name, 'd')
if 'slit' in self.region:
self.slit_dataset_name = path + "ExitSlit"
create_dataset(self.slit_dataset_name, 'd')
if 'position' in self.region:
position_names = {key:path + "Position"+ key for key in ['X','Y','Z','Theta','Tilt','Phi']}
self.position_dataset_names = position_names
for datanames in self.position_dataset_names.values():
create_dataset(datanames, 'd')
def setup(self):
# print("spectrum.setup")
if self.scan_index != get_exec_pars().index:
self.scan_index = get_exec_pars().index
self.create_datasets()
if self.region_index == 0:
print "scan {0}".format(self.scan_index)
edelta = 0.0
try:
ephot = self.region['ephot']
Eph.move(ephot)
except KeyError:
ephot = Eph.take(100)
if isinstance(ephot, float) and ephot > 0.:
try:
if self.region['cis']:
edelta = ephot - self.ephot_start
except AttributeError:
self.ephot_start = ephot
elo = self.region['elo'] + edelta
ehi = self.region['ehi'] + edelta
if self.region['fixed']:
Scienta.setAcquisitionMode(ch.psi.pshell.epics.Scienta.AcquisitionMode.Fixed)
Scienta.centerEnergy.write(elo)
append_dataset(self.channel_center_dataset_name, elo)
else:
Scienta.setAcquisitionMode(ch.psi.pshell.epics.Scienta.AcquisitionMode.Swept)
Scienta.lowEnergy.write(elo)
Scienta.highEnergy.write(ehi)
Scienta.stepSize.write(self.region['estep'])
append_dataset(self.channel_begin_dataset_name, elo)
append_dataset(self.channel_end_dataset_name, ehi)
append_dataset(self.step_energy_dataset_name, self.region['estep'])
try:
Scienta.setPassEnergy(int(self.region['epass']))
append_dataset(self.pass_energy_dataset_name, self.region['epass'])
except KeyError:
pass
try:
Scienta.stepTime.write(self.region['tstep'])
append_dataset(self.step_time_dataset_name, self.region['tstep'])
except KeyError:
pass
try:
Scienta.setIterations(self.region['iter'])
append_dataset(self.iterations_dataset_name, self.region['iter'])
except KeyError:
pass
try:
ExitSlit.write(self.region['slit'])
append_dataset(self.slit_dataset_name, self.region['slit'])
except KeyError:
pass
if self.region['position'] == 'sample':
move_to_position(SAMPLE_POSITION)
for name in SAMPLE_POSITION.keys():
append_dataset(self.position_dataset_names[name], SAMPLE_POSITION[name])
elif self.region['position'] == 'reference':
move_to_position(REFERENCE_POSITION)
for name in REFERENCE_POSITION.keys():
append_dataset(self.position_dataset_names[name], SAMPLE_POSITION[name])
Scienta.update()
def read(self):
# print("spectrum.read")
global current_region_index
current_region_index = self.region_index
self.setup()
print("Acquiring region {0}.".format(self.region['name']))
trig_scienta()
time.sleep(0.5)
sp = Scienta.getSpectrum().read()
return sp
def getSize(self):
if self.region['fixed']:
nx = 992
else:
nx = int((self.region['ehi'] - self.region['elo']) / self.region['estep']) + 1
return nx
class ImageReader(ReadonlyRegisterBase, ReadonlyRegisterMatrix):
def read(self):
# print("image.read")
return Scienta.getDataMatrix().read()
def getWidth(self):
if self.region['fixed']:
nx = 992
else:
nx = int((self.region['ehi'] - self.region['elo']) / self.region['estep']) + 1
return nx
def getHeight(self):
ny = Scienta.slices.read()
return ny
def setup_live_plots(regions):
global live_plots
global current_region_index
names = [region['name'] for region in regions]
live_plots = plot(None, names, title="Live Spectra")
current_region_index = 0
def update_live_plots():
global live_plots
global current_region_index
try:
while get_context().state.running:
y = Scienta.spectrum.take(100)
x = Scienta.spectrumX
try:
series = live_plots[current_region_index].getSeries(0)
series.setData(x, y)
except IndexError:
pass
time.sleep(1.0)
finally:
print "Stopping live spectra"
def do_scan(scan, motors, positions, regions, latency):
global SENSORS
SENSORS = []
for (index, region) in enumerate(regions):
check_region(region)
reader = SpectrumReader()
reader.region_index = index
reader.region_name = "region{0}".format(index + 1)
reader.region = region
reader.initialize()
set_device_alias(reader, reader.region_name + "/ScientaSpectrum")
SENSORS.append(reader)
image = ImageReader()
image.region_index = index
image.region = region
image.initialize()
set_device_alias(image, reader.region_name + "/ScientaImage")
SENSORS.append(image)
SENSORS.append(SampleCurrent)
SENSORS.append(RefCurrent)
adjust_sensors()
set_adc_averaging()
if scan == 'ascan':
ascan(motors, SENSORS, positions[0], positions[1], positions[2], latency, False, zigzag = True, before_read=wait_beam, after_read = after_readout)
elif scan == 'lscan':
lscan(motors, SENSORS, positions[0], positions[1], positions[2], latency, False, before_read=wait_beam, after_read = after_readout)
elif scan == 'vscan':
vscan(motors, SENSORS, positions, True, latency,False, before_read=wait_beam, after_read = after_readout)
else:
print('unknown scan mode {}'.format(scan))
for (index, region) in enumerate(regions):
set_attribute(get_exec_pars().scanPath + "region{0}/ScientaSpectrum".format(index + 1), "RegionName", region['name'])
set_attribute(get_exec_pars().scanPath + "region{0}/ScientaImage".format(index + 1), "RegionName", region['name'])
set_attribute(get_exec_pars().scanPath, "Regions", [region['name'] for region in regions])
try:
setup_live_plots(REGIONS)
task = fork(update_live_plots)
do_scan(SCAN, MOTORS, POSITIONS, REGIONS, LATENCY)
finally:
if CLOSE_SHUTTER_AT_END:
after_scan()

View File

@@ -0,0 +1,67 @@
"""
Flying hologram scan (experimental)
Arguments:
THETA_RANGE (tuple (min, max))
PHI_RANGE (tuple (min, max))
THETA_STEP (scalar)
PHI_STEP (scalar)
ZIGZAG (BOOLEAN)
LATENCY (float) in seconds
SENSORS (list of devices)
"""
THETA_RANGE = (-9.0, 81.0)
THETA_STEP = 1.0
#PHI_RANGE = (-40.0, +40.0)
PHI_RANGE = (-160.0, +160.0)
PHI_STEP = 40.0
LATENCY = 0.0
ZIGZAG = True
ENDSCAN = True
MOTORS = (ManipulatorTheta)
#SENSORS = (Counts, Scienta.spectrum, SampleCurrent, RefCurrent, MachineCurrent)
SENSORS = (Counts, Scienta.dataMatrix, SampleCurrent, RefCurrent, MachineCurrent)
#set_preference(Preference.ENABLED_PLOTS, [ManipulatorPhi, ManipulatorTheta, Scienta.dataMatrix, ImageIntegrator])
#set_preference(Preference.PLOT_TYPES,{'ImageIntegrator':1})
adjust_sensors()
set_adc_averaging()
set_preference(Preference.PLOT_TYPES, {'Scienta spectrum':1})
# time per scienta acquisition in seconds
time1 = time.time()
trig_scienta()
time2 = time.time()
scienta_time = (time2 - time1)
print "scienta_time: ", scienta_time
# time for one theta scan in seconds
THETA_NSTEPS = int((THETA_RANGE[1] - THETA_RANGE[0]) / THETA_STEP) + 1
theta_time = scienta_time * THETA_NSTEPS
print "theta_time: ", theta_time
PHI_NSTEPS = int((PHI_RANGE[1] - PHI_RANGE[0]) / PHI_STEP) + 1
phi_positions = [PHI_RANGE[0] + PHI_STEP * i for i in range(PHI_NSTEPS)]
print "phi_positions: ", phi_positions
def before_pass(index, scan):
global phi_positions
print "Starting pass: ", index
phi = phi_positions[index-1]
ManipulatorPhi.write(phi)
print "phi = ", phi
ManipulatorPhi.waitValueInRange(phi, 1.0, 100)
try:
cscan(MOTORS, SENSORS, THETA_RANGE[0], THETA_RANGE[1], THETA_NSTEPS - 1, time=theta_time, passes=len(phi_positions), zigzag=ZIGZAG, before_read=before_readout, after_read = after_readout, before_pass = before_pass, check_positions = False)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,25 @@
"""
Arguments:
SENSORS (list)
PHI_RANGE (tuple (min, max))
THETA_RANGE (tuple (min, max))
STEPS (tuple (phi, theta))
LATENCY (double)
RELATIVE (BOOLEAN)
ZIGZAG (BOOLEAN)
"""
#set_preference(Preference.ENABLED_PLOTS, [ManipulatorPhi, ManipulatorTheta, Scienta.dataMatrix, ImageIntegrator])
#set_preference(Preference.PLOT_TYPES,{'ImageIntegrator':1})
adjust_sensors()
set_adc_averaging()
set_preference(Preference.PLOT_TYPES, {'Scienta spectrum':1})
try:
ascan((ManipulatorPhi, ManipulatorTheta), SENSORS, (PHI_RANGE[0], THETA_RANGE[0]), (PHI_RANGE[1], THETA_RANGE[1]), STEPS, LATENCY, RELATIVE, zigzag = ZIGZAG, \
before_read=before_readout, after_read = after_readout, compression = True)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,77 @@
"""
Continuous 1D Manipulator scan
set manipulator scan parameters below.
set analyser parameters in the scienta window. recommended: fixed mode, dwell time between 0.2 and 1.0 s
the motor speed is determined from the STEP parameter and the scienta busy time (dwell time + dead time).
note that the motors have a limited speed range!
"""
import math
MOTORS = (ManipulatorX)
CENTER = -2.8
WIDTH = 2.
RANGE = (CENTER - WIDTH / 2., CENTER + WIDTH / 2.)
STEP = 0.020
#MOTORS = (ManipulatorY)
#RANGE = (-3.5, +3.5)
#STEP = 0.1
# Z axis cannot be used in fly scan. minimum speed is too high.
#MOTORS = (ManipulatorZ)
#RANGE = (112., 118.)
#STEP = 0.4
#MOTORS = (ManipulatorTheta)
#RANGE = (-9., 81.)
## minimum speed 0.001, maximum speed 0.5 deg/s
#SPEED = 0.1
#MOTORS = (ManipulatorTilt)
#RANGE = (-20., +20.)
## minimum speed 0.1, maximum speed 1.4 mm/s
#SPEED = 1.0
#MOTORS = (ManipulatorPhi)
#RANGE = (-179., +180.)
## minimum speed 0.6, maximum speed 6.0 mm/s
#SPEED = 1.0
SENSORS = (Counts, Scienta.dataMatrix, SampleCurrent, RefCurrent, MachineCurrent, EnergyDistribution, AngleDistribution)
# --- do not edit below ---
RELATIVE = False
LATENCY = 0.0
ZIGZAG = False
ENDSCAN = False
adjust_sensors()
set_adc_averaging()
# time per scienta acquisition in seconds
time1 = time.time()
trig_scienta()
time2 = time.time()
scienta_time = (time2 - time1)
print "scienta time: ", scienta_time
# time for one scan in seconds
SPEED = STEP / scienta_time
print "speed: ", SPEED
fly_time = (RANGE[1] - RANGE[0]) / SPEED
STEPS = int(fly_time / scienta_time) + 1
print "scan time: ", fly_time
try:
cscan(MOTORS, SENSORS, RANGE[0], RANGE[1], STEPS, time=fly_time, before_read=before_readout, after_read = after_readout, check_positions = False)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,44 @@
"""
Arguments:
MOTOR (device)
SENSORS (list)
RANGE (tuple (min, max))
STEPS (int or tuple)
LATENCY (double)
RELATIVE (BOOLEAN)
FLY_SCAN (BOOLEAN)
"""
#set_preference(Preference.PLOT_TYPES,{'ImageIntegrator':1})
adjust_sensors()
set_adc_averaging()
set_preference(Preference.PLOT_TYPES, {'Scienta spectrum':1})
try:
if FLY_SCAN:
# time per scienta acquisition in seconds
time1 = time.time()
trig_scienta()
time.sleep(0.2)
time2 = time.time()
scienta_time = (time2 - time1)
print "scienta time: ", scienta_time
if isinstance(STEPS,int):
raise Exception ("Fly Scan must define step size, and not number of steps")
STEP = STEPS[0]
SPEED = STEP / scienta_time
fly_time = (RANGE[1] - RANGE[0]) / SPEED
STEPS = int(fly_time / scienta_time) + 1
print "speed: ", SPEED
print "scan time: ", fly_time
cscan(MOTOR, SENSORS, RANGE[0], RANGE[1], STEPS, LATENCY, fly_time, RELATIVE, before_read=before_readout, after_read = after_readout, check_positions = False)
else:
lscan(MOTOR, SENSORS, RANGE[0], RANGE[1], STEPS, LATENCY, RELATIVE, before_read=before_readout, after_read = after_readout)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,39 @@
"""
Manipulator scan across the beam relative to current position
set manipulator scan parameters below.
set analyser parameters separately!
move manipulator to center position before start!
set ANGLE = -30.0 to move the sample across the beam.
set ANGLE = +60.0 to move the sample along the beam.
"""
import math
# adjust the following parameters
DISTANCE = 1.
#ANGLE = -30.0 # move sample across beam
ANGLE = +60.0 # move sample along beam
STEPS = 50
LATENCY = 0.0
ENDSCAN = False
# do not edit below
DISTANCE_X = DISTANCE * math.cos(math.radians(ANGLE))
DISTANCE_Y = DISTANCE * math.sin(math.radians(ANGLE))
MOTOR = (ManipulatorX, ManipulatorY)
SENSORS = (Counts, SampleCurrent, RefCurrent, MachineCurrent, EnergyDistribution, AngleDistribution)
STARTPOS = (-DISTANCE_X / 2.0, -DISTANCE_Y / 2.0)
ENDPOS = (DISTANCE_X / 2.0, DISTANCE_Y / 2.0)
RELATIVE = True
adjust_sensors()
set_adc_averaging()
try:
lscan(MOTOR, SENSORS, STARTPOS, ENDPOS, STEPS, LATENCY, RELATIVE, before_read=before_readout, after_read = after_readout)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,60 @@
"""
Continuous 2D Manipulator scan
set manipulator scan parameters below.
set analyser parameters separately!
"""
import math
RANGE_Z = (114.6, 116.0)
# actual number of positions will be +1!
STEPS_Z = 10
RANGE_Y = (-1.5, 0.5)
# minimum speed 0.01, maximum speed 0.125 mm/s
SPEED_Y = 11
RELATIVE = False
LATENCY = 0.0
ZIGZAG = False
ENDSCAN = False
MOTORS = (ManipulatorY)
#SENSORS = (Counts, Scienta.spectrum, SampleCurrent, RefCurrent, MachineCurrent, EnergyDistribution, AngleDistribution)
SENSORS = (Counts, Scienta.dataMatrix, SampleCurrent, RefCurrent, MachineCurrent, EnergyDistribution, AngleDistribution)
adjust_sensors()
set_adc_averaging()
# time per scienta acquisition in seconds
time1 = time.time()
trig_scienta()
time2 = time.time()
scienta_time = (time2 - time1)
print "scienta time: ", scienta_time
# time for one Y scan in seconds
fly_time = (RANGE_Y[1] - RANGE_Y[0]) / SPEED_Y
STEPS_Y = int(fly_time / scienta_time) + 1
print "Y time: ", fly_time
STEP_Z = (RANGE_Z[1] - RANGE_Z[0]) / STEPS_Z
positions_z = [RANGE_Z[0] + STEP_Z * i for i in range(STEPS_Z + 1)]
print "Z positions: ", positions_z
def before_pass(index, scan):
global positions_z
print "Starting pass: ", index
z = positions_z[index-1]
ManipulatorZ.write(z)
print "z = ", z
ManipulatorZ.waitValueInRange(z, 1.0, 100)
try:
cscan(MOTORS, SENSORS, RANGE_Y[0], RANGE_Y[1], STEPS_Y, time=fly_time, passes=len(positions_z), zigzag=ZIGZAG, before_read=before_readout, after_read = after_readout, before_pass = before_pass, check_positions = False)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,34 @@
"""
2D Manipulator scan
set manipulator scan parameters below.
set analyser parameters separately!
move manipulator to center position before start!
"""
import math
# actual number of positions will be +1!
STEPS = (10, 10)
LATENCY = 0.0
ENDSCAN = False
ZIGZAG = True
MOTORS = (ManipulatorZ, ManipulatorY)
#SENSORS = (Counts, Scienta.spectrum, SampleCurrent, RefCurrent, MachineCurrent, EnergyDistribution, AngleDistribution)
SENSORS = (Counts, Scienta.dataMatrix, SampleCurrent, RefCurrent, MachineCurrent, EnergyDistribution, AngleDistribution)
STARTPOS = (114.6, -1.5)
ENDPOS = (116.0, 0.5)
RELATIVE = False
adjust_sensors()
set_adc_averaging()
#set_preference(Preference.PLOT_TYPES, {'Scienta spectrum':1})
try:
ascan(MOTORS, SENSORS, STARTPOS, ENDPOS, STEPS, LATENCY, RELATIVE, zigzag = ZIGZAG, before_read=before_readout, after_read = after_readout)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,35 @@
"""
Arguments:
VECTOR (Double[][], Scan vector: Eph,Elow,Ehigh or Eph,Ecenter)
SENSORS (list)
LATENCY (double)
MODE ('fixed' or 'swept')
TYPE ('CIS' or 'CFS')
STEP (double)
"""
if MODE == "swept":
Scienta.setAcquisitionMode(ch.psi.pshell.epics.Scienta.AcquisitionMode.Swept)
else:
Scienta.setAcquisitionMode(ch.psi.pshell.epics.Scienta.AcquisitionMode.Fixed)
if len(VECTOR[0]) == 2:
#FIXED
Scienta.centerEnergy.write(VECTOR[0][1])
writables = (Eph, Scienta.centerEnergy)
else:
#SWEPT
Scienta.lowEnergy.write(VECTOR[0][1])
Scienta.highEnergy.write(VECTOR[0][2])
writables = (Eph, Scienta.lowEnergy, Scienta.highEnergy)
adjust_sensors()
set_adc_averaging()
set_preference(Preference.PLOT_TYPES, {'Scienta spectrum':1})
try:
vscan(writables, SENSORS, VECTOR, True, LATENCY,False, before_read=before_readout, after_read = after_readout)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,30 @@
"""
XAS scan
"""
POSITIONERS = (Eph)
# SENSORS = (Keithley1, Keithley2, MachineCurrent)
SENSORS = (SampleCurrent, RefCurrent, MachineCurrent)
#SENSORS = (SampleCurrent, RefCurrent, AuxCurrent, MachineCurrent, OpticsCameraCentroidX, OpticsCameraSigmaX)
STARTPOS = (440.)
ENDPOS = (450.)
#NUMPOINTS = 76
STEPSIZE = 2.5
LATENCY = 0.0
ENDSCAN = False # close shutter at end
def trig():
time.sleep(10.)
before_readout()
#wait_beam()
#caput("X03DA-OP-10ADC:TRG.PROC", 1)
try:
#prepare_keithleys(DWELL)
lscan(POSITIONERS, SENSORS, STARTPOS, ENDPOS, STEPSIZE, LATENCY, before_read=trig, after_read=after_readout)
finally:
if ENDSCAN:
after_scan()

View File

@@ -0,0 +1,191 @@
#Parameters (global variables):
# ranges: list of RangeSelection havinf args = (step_size, step_time, iterations)
# pass_energy
# save_scienta_image
#
# skip_iteration: if set to 1 then skips after end of current iteration
from ch.psi.pshell.data.LayoutDefault import ATTR_WRITABLE_DIMENSION as ATTR_WRITABLE_DIMENSION
import org.jfree.chart.axis.NumberAxis as NumberAxis
import math
cur_range = 0
cur_iteration = 0
if Scienta.acquisitionMode != Scienta.AcquisitionMode.Swept:
Scienta.acquisitionMode = Scienta.AcquisitionMode.Swept
ret=[]
adjusted_ranges = []
for cur_range in range(len(ranges)):
r = ranges[cur_range]
print r
print r.vars
ar = [round(r.min / r.vars[1]) * r.vars[1], round(r.max / r.vars[1]) * r.vars[1]]
adjusted_ranges.append(ar)
set_exec_pars(open = True)
create_metadata_datasets()
#Global arguments
Scienta.passEnergy = pass_energy
names=[]
names.append("Online Spectrum")
for i in range(len(ranges)):
names.append(str(ranges[i]))
plots = plot(None, names)
for p in plots[1:]:
p.getAxis(p.AxisId.X).label = "kinetic energy"
p.getAxis(p.AxisId.X2).setLabel("binding energy")
p.getAxis(p.AxisId.X2).inverted = True
p = plots[0]
be_axis = plots[0].getAxis(p.AxisId.X2)
be_axis.inverted=True
be_axis.setLabel("binding energy")
spectrum_series = p.getSeries(0)
def get_binding_energy(e):
ephot = Eph.take(100)
workfunc = 4.5
if type(ephot) != float or ephot < 0.:
ephot = Scienta.highEnergy.take(100)
return ephot - e - workfunc
def get_binding_range(p=None):
if p is None:
return get_binding_energy(Scienta.highEnergy.take(100)), get_binding_energy(Scienta.lowEnergy.take(100))
else:
ke_range=p.getAxis(p.AxisId.X).getDisplayRange()
return get_binding_energy(ke_range.max), get_binding_energy(ke_range.min)
eb2, eb1 = get_binding_range(p)
be_axis.setRange(eb2, eb1)
def plot_cur_spectrum():
try:
while get_context().state.running:
y = Scienta.spectrum.take(100)
x = Scienta.spectrumX
spectrum_series.setData(x, y)
eb2, eb1 = get_binding_range(plots[0])
if (be_axis.min != eb2) or (be_axis.max != eb1):
plots[0].resetZoom()
be_axis.setRange(eb2, eb1)
time.sleep(1.0)
finally:
print "Stopping spectrum plotting"
task = None
# measurements
try:
for cur_range in range(len(ranges)):
cur_iteration = 0
skip_iteration = False
vars = ranges[cur_range].vars
#Check if photon energy is defined
if len(vars) > 2:
eph = vars[3]
if eph and (not math.isnan(eph)):
Eph.move(eph)
Scienta.lowEnergy.write(adjusted_ranges[cur_range][0])
Scienta.highEnergy.write(adjusted_ranges[cur_range][1])
Scienta.update()
Scienta.stepTime.write(vars[0])
Scienta.stepSize.write(vars[1])
Scienta.setIterations(1)
set_adc_averaging()
#iterations done in script
xdata = None
ydata = None
image_data = None
task = fork(plot_cur_spectrum)
path="scan" + str(cur_range+1) + "/"
for cur_iteration in range(vars[2]):
p = plots[cur_range+1]
p.setTitle(str(ranges[cur_range]) + " - iteration " + str(cur_iteration+1))
while True:
wait_beam()
trig_scienta()
spectrum_array = Scienta.spectrum.read()
if beam_ok:
if image_data is None:
time.sleep(2.0)
(_width, _height) = Scienta.getImageSize()
break
if ydata is None:
ydata = spectrum_array
else:
for k in range (len(spectrum_array)):
ydata[k] = ydata[k] + spectrum_array[k]
if xdata is None:
xdata = Scienta.spectrumX
p.getSeries(0).setData(xdata, ydata)
eb2, eb1 = get_binding_range()
p.getAxis(p.AxisId.X2).setRange(eb2, eb1)
if save_scienta_image:
image_array = Scienta.dataMatrix.read()
if _width != len(image_array[0]) or _height != len(image_array):
err = "Scienta image size changed during the acquisition: " + str((len(image_array[0]), len(image_array))) + " - original: " + str((_width, _height))
print err
log(err)
raise Exception(err)
if image_data is None:
image_data = image_array
else:
for k in range (len(image_data)):
for j in range (len(image_data[0])):
image_data[k][j] = image_data[k][j] + image_array[k][j]
if skip_iteration:
break
save_dataset(path + "ScientaSpectrum", ydata)
set_attribute(path, "Iterations",cur_iteration+1)
if save_scienta_image:
save_dataset(path + "ScientaImage", image_data, features = {"compression":True})
if cur_iteration==0:
save_dataset(path + "ScientaChannels", xdata)
set_attribute(path + "ScientaChannels", ATTR_WRITABLE_DIMENSION, 1)
set_attribute(path, "Range Low", adjusted_ranges[cur_range][0])
set_attribute(path, "Range High", adjusted_ranges[cur_range][1])
set_attribute(path, "Step Time", vars[0])
set_attribute(path, "Step Size", vars[1])
set_attribute(path, "Pass Energy",pass_energy)
set_attribute(path, "Readables", ["ScientaSpectrum","ScientaImage"] if save_scienta_image else ["ScientaSpectrum",])
set_attribute(path, "Writables", ["ScientaChannels",])
create_diag_datasets(path)
append_diag_datasets(path)
plots[cur_range+1].setTitle(str(ranges[cur_range]))
ret.append((xdata, ydata))
finally:
cur_range = -1
if not Scienta.isReady():
Scienta.stop()
Scienta.update()
if task:
task[0].cancel(True)
if ENDSCAN:
after_scan()
set_return(to_array(ret,'o'))

View File

@@ -0,0 +1,204 @@
import ch.psi.pshell.epics as epics
import math
class Keithley(object):
RANGE_STATES = ['AUTO', '20 mA', '2 mA', '200 uA', '20 uA', '2 uA', '200 nA', '20 nA', '2 nA', '200 pA', '20 pA']
TTYPE_STATES = ['IMM', 'TLIN', 'BUS', 'EXT']
USER_MODE_STATES = ['def setting', 'poll curr fast', 'poll curr medi', 'poll curr slow', 'trig setting', 'trigger BUS', 'trigger TLIN', 'trigger EXT', 'poll volt medi']
SCAN_STATES = ['Passive', 'Event', 'I/O Intr', '10 second', '5 second', '2 second', '1 second', '.5 second', '.2 second', '.1 second']
SCAN_INTERVALS = [0., 0., 0., 10., 5., 2., 1., .5, .2, .1]
def __init__(self, base_name, base_channel):
self.dwell = 0.
self.triggered = False
self.base_channel = base_channel
self.base_name = base_name
self.rangeCh = None
self.scanCh = None
self.ttypeCh = None
self.nplcCh = None
self.navgCh = None
self.tottimeCh = None
self.doinitCh = None
self.dotriggerCh = None
self.dofetchCh = None
self.readoutCh = None
def initialize(self):
self.rangeCh = epics.ChannelInteger(self.base_name + "Range", self.base_channel + "RANGE")
self.scanCh = epics.ChannelInteger(self.base_name + "Scan", self.base_channel + "READSCAN.SCAN")
self.ttypeCh = epics.ChannelInteger(self.base_name + "TType", self.base_channel + "TTYPE")
self.nplcCh = epics.ChannelDouble(self.base_name + "Nplc", self.base_channel + "NPLC")
self.navgCh = epics.ChannelDouble(self.base_name + "Navg", self.base_channel + "NAVG")
self.tottimeCh = epics.ChannelDouble(self.base_name + "TotTime", self.base_channel + "TOTTIME")
self.doinitCh = epics.ChannelInteger(self.base_name + "DoInit", self.base_channel + "DOINIT")
self.dotriggerCh = epics.ChannelInteger(self.base_name + "DoTrigger", self.base_channel + "DOTRIGGER")
self.dofetchCh = epics.ChannelInteger(self.base_name + "DoFetch", self.base_channel + "DOFETCH")
self.readoutCh = epics.ChannelDouble(self.base_name + "Readout", self.base_channel + "READOUT")
# DOZCHOFF
# ZCH_SP
# DOCURRENT
# DORESET
# DOSETDEFAULT
# USER_MODE
self.rangeCh.initialize()
self.scanCh.initialize()
self.ttypeCh.initialize()
self.nplcCh.initialize()
self.navgCh.initialize()
self.tottimeCh.initialize()
self.doinitCh.initialize()
self.dotriggerCh.initialize()
self.dofetchCh.initialize()
self.readoutCh.initialize()
def setup():
"""
EXPERIMENTAL
to set up the keithley after Reset, there are two options
1) do set defaults, set scan, set range
2) set user mode, set range
"""
#self.dosetdefaultCh.write(1)
#self.scanCh.write(9)
def prepare(self, dwell, triggered):
"""
prepare keithley for gpib polling.
setting keithley parameters has several issues.
the dwell time and trigger mode cannot be set programmatically at the moment.
the user should select poll slow (100 ms), medium (20 ms) or fast (2 ms).
this method just reads the current value and stores it in self.dwell.
dwell: dwell time in seconds.
0.1 - 20.0 in triggered mode,
0.1 - 1.0 in free running mode.
triggered:
True: wait for self.trig call and trigger once per call.
False: 1 Hz free run using EPICS SCAN attribute.
"""
self.triggered = False
self.dwell = self.tottimeCh.read() / 1000.
def prepare_not_working(self, dwell, triggered):
"""
prepare keithley for gpib polling:
scan passive, bus triggered, set dwell time
this doesn't to work.
dwell: dwell time in seconds.
0.1 - 20.0 in triggered mode,
0.1 - 1.0 in free running mode.
triggered:
True: wait for self.trig call and trigger once per call.
False: 1 Hz free run using EPICS SCAN attribute.
"""
self.triggered = triggered
if triggered:
self.scanCh.write(0)
self.ttypeCh.write(2)
else:
self.ttypeCh.write(0)
self.scanCh.write(6)
dwell = min(dwell, 1.)
nplc = 5.
navg = dwell / 0.1
if navg > 100:
nplc *= 2
navg /= 2
navg = min(navg, 100.)
nplc = min(nplc, 10.)
self.nplcCh.write(nplc)
self.navgCh.write(navg)
self.dwell = self.tottimeCh.read() / 1000.
def trig(self):
"""
trigger keithleys, wait until done, and read the result into EPICS.
the value can then be read by pshell from the channel.
if self.prepare was called with triggered = False,
this method has no effect.
"""
if self.triggered:
self.doinitCh.write(1)
self.dotriggerCh.write(1)
def get_dwell(self):
"""
get dwell time in seconds.
"""
dwell = self.tottimeCh.read() / 1000.
self.dwell = dwell
return dwell
def fetch(self):
"""
fetch the current value from the keithley into EPICS.
if self.prepare was called with triggered = False,
this method has no effect.
"""
if self.triggered:
self.dofetchCh.write(1)
def read(self):
"""
read the curent value.
"""
return self.readoutCh.read()
def release(self):
"""
switch keithleys to free run.
0.1 s dwell time, 1 s poll interval.
_do nothing for now!_
"""
return None
self.nplcCh.write(5.)
self.navgCh.write(1.)
self.scanCh.write(6)
self.ttypeCh.write(0)
def reset(self):
"""
switch to zero check
"""
self.doresetCh.write(1)
def set_range(self, value):
"""
set the current range.
value can be:
- float: current limit in A.
values between 2e-11 and 2e-3 set a fixed range.
values greater than 0.02 select AUTO.
- str: state label of EPICS channel (cf. self.RANGE_STATES).
must be one of the state labels, otherwise a ValueError is raised.
- int: state index of EPICS channel (0...10)
"""
if isinstance(value, float):
try:
value = int(-math.log10(value / 2)) - 1
value = max(value, 0)
value = min(value, len(self.RANGE_STATES)-1)
except ValueError:
value = 0
elif isinstance(value, str):
value = self.RANGE_STATES.index(value)
self.rangeCh.write(value)
KeiSample = Keithley("SampleKeithley", "X03DA-KEITHLEY-1:")
KeiReference = Keithley("ReferenceKeithley", "X03DA-KEITHLEY-2:")
KeiSample.initialize()
KeiReference.initialize()

View File

@@ -0,0 +1,467 @@
import random
import ch.psi.pshell.device.Readable.ReadableArray as ReadableArray
import ch.psi.pshell.device.Readable.ReadableCalibratedArray as ReadableCalibratedArray
import ch.psi.pshell.device.ArrayCalibration as ArrayCalibration
import ch.psi.utils.Str
from mathutils import estimate_peak_indexes, fit_gaussians, create_fit_point_list, Gaussian
import java.awt.Color as Color
#Synchrronized Scienta counts
for stat in Scienta.stats:
add_device(stat, True)
beam_ok = True
class SimulatedOutput(Writable):
def write(self, value):
pass
class SimulatedInput(Readable):
def __init__(self):
self.x = 0.0
def read(self):
self.x = self.x + 0.2
noise = (random.random() - 0.5) / 20.0
return math.sin(self.x) + noise
sout = SimulatedOutput()
sinp = SimulatedInput()
def integrate_image(vertical = True):
data = Scienta.dataArray.read()
#Integrate and plot
(width,height) = Scienta.getImageSize().tolist()
integration = []
if vertical:
for i in range(width):
p=0.0
for j in range(height):
p=p+data[j*width+i]
integration.append(p)
else:
for j in range(height):
p=0.0
for i in range(width):
p=p+data[j*width+i]
integration.append(p)
return integration
class ImageEnergyDistribution(ReadableCalibratedArray):
def getSize(self):
(width,height) = Scienta.getImageSize().tolist()
return width
def read(self):
return to_array(integrate_image(),'d')
def getCalibration(self):
c=Scienta.readImageDescriptor().calibration
if c is None:
return None
return ArrayCalibration(c.scaleX, c.offsetX)
EnergyDistribution = ImageEnergyDistribution()
class ImageAngleDistribution(ReadableCalibratedArray):
def getSize(self):
(width,height) = Scienta.getImageSize().tolist()
return height
def read(self):
return to_array(integrate_image(False),'d')
def getCalibration(self):
c=Scienta.readImageDescriptor().calibration
if c is None:
return None
return ArrayCalibration(c.scaleY, c.offsetY)
AngleDistribution = ImageAngleDistribution()
def init_scienta():
"""
turn on the analyser and start a mock measurement so that we get the correct array size.
start a scienta acquisition and abort after 4 seconds.
"""
if Scienta.isSimulated():
time.sleep(0.1)
else:
image_id = Scienta.currentImageCount
Scienta.start()
Scienta.waitReady(4000)
Scienta.stop()
Scienta.waitNewImage(500, image_id)
def trig_scienta():
if Scienta.isSimulated():
time.sleep(0.1)
else:
image_id = Scienta.currentImageCount
Scienta.start()
Scienta.waitReady(-1)
Scienta.waitNewImage(3000, image_id)
from keithley import KeiSample, KeiReference
def prepare_keithleys(dwell, triggered):
"""
prepare keithleys.
at the moment, the dwell time has to be set manually by selecting one of the poll modes
slow = 100 ms, medium = 20 ms, fast = 2 ms.
"""
KeiSample.prepare(dwell, triggered)
KeiReference.prepare(dwell, triggered)
def trig_keithleys():
"""
trigger keithleys, do not wait.
after this, you have to wait for at least the dwell time before reading the value!
"""
KeiSample.trig()
KeiReference.trig()
def wait_keithleys():
"""
wait for one dwell time so that the keithleys can finish their measurement.
"""
time.sleep(KeiSample.dwell * 2.2)
def fetch_keithleys():
"""
read the keithley readings into EPICS.
this requires that at least the dwell time has passed since the last trigger.
the value can then be read from the SampleCurrent and ReferenceCurrent devices.
"""
KeiSample.fetch()
KeiReference.fetch()
def release_keithleys():
"""
switch keithleys to free run.
0.1 s polling and dwell time
"""
KeiSample.release()
KeiReference.release()
diag_channels = []
diag_channels.append(Scienta.channelBegin) #diag_channels.append(ChannelDouble("ChannelBegin", "X03DA-SCIENTA:cam1:CHANNEL_BEGIN_RBV"))
diag_channels.append(Scienta.channelEnd) #diag_channels.append(ChannelDouble("ChannelEnd", "X03DA-SCIENTA:cam1:CHANNEL_END_RBV"))
diag_channels.append(Scienta.sliceBegin) # diag_channels.append(ChannelDouble("SliceBegin", "X03DA-SCIENTA:cam1:SLICE_BEGIN_RBV"))
diag_channels.append(Scienta.sliceEnd) #diag_channels.append(ChannelDouble("StepTime", "X03DA-SCIENTA:cam1:SLICE_END_RBV"))
diag_channels.append(Scienta.numSlices) # diag_channels.append(ChannelDouble("NumSlices", "X03DA-SCIENTA:cam1:SLICES_RBV"))
#diag_channels.append(Scienta.frames) # diag_channels.append(ChannelDouble("NumFrames", "X03DA-SCIENTA:cam1:FRAMES"))
diag_channels.append(Scienta.numChannels) #diag_channels.append(ChannelDouble("NumChannels", "X03DA-SCIENTA:cam1:NUM_CHANNELS_RBV"))
diag_channels.append(Scienta.lowEnergy) #diag_channels.append(ChannelDouble("LowEnergy", "X03DA-SCIENTA:cam1:LOW_ENERGY_RBV"))
diag_channels.append(Scienta.centerEnergy) #diag_channels.append(ChannelDouble("CenterEnergy", "X03DA-SCIENTA:cam1:CENTRE_ENERGY_RBV"))
diag_channels.append(Scienta.highEnergy) #diag_channels.append(ChannelDouble("HighEnergy", "X03DA-SCIENTA:cam1:HIGH_ENERGY_RBV"))
#TODO: These are not of Scienta device interface. Should be included?
#diag_channels.append(ChannelDouble("AcquisitionModeNum", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV"))
#diag_channels.append(ChannelDouble("EnergyModeNum", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV"))
#diag_channels.append(ChannelDouble("LensModeNum", "X03DA-SCIENTA:cam1:LENS_MODE_RBV"))
#diag_channels.append(ChannelDouble("DetectorModeNum", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV"))
#diag_channels.append(ChannelDouble("PassEnergyNum", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV"))
#diag_channels.append(ChannelDouble("ElementSetNum", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV"))
diag_channels.append(ScientaDwellTime)
diag_channels.append(AcquisitionMode) #diag_attrs.append(ChannelString("AcquisitionMode", "X03DA-SCIENTA:cam1:ACQ_MODE_RBV"))
diag_channels.append(EnergyMode) #diag_attrs.append(ChannelString("EnergyMode", "X03DA-SCIENTA:cam1:ENERGY_MODE_RBV"))
diag_channels.append(LensMode) #diag_attrs.append(ChannelString("LensMode", "X03DA-SCIENTA:cam1:LENS_MODE_RBV"))
diag_channels.append(DetectorMode) #diag_attrs.append(ChannelString("DetectorMode", "X03DA-SCIENTA:cam1:DETECTOR_MODE_RBV"))
diag_channels.append(PassEnergy) #diag_attrs.append(ChannelString("PassEnergy", "X03DA-SCIENTA:cam1:PASS_ENERGY_RBV"))
diag_channels.append(ElementSet) #diag_attrs.append(ChannelString("ElementSet", "X03DA-SCIENTA:cam1:ELEMENT_SET_RBV"))
diag_channels.append(ExcitationEnergy) #diag_channels.append(ChannelDouble("ExcitationEnergy", "X03DA-SCIENTA:cam1:EXCITATION_ENERGY_RBV"))
diag_channels.append(StepSize) #diag_channels.append(ChannelDouble("StepSize", "X03DA-SCIENTA:cam1:STEP_SIZE_RBV"))
diag_channels.append(NumIterations) #diag_channels.append(ChannelDouble("NumIterations", "X03DA-SCIENTA:cam1:NumExposures_RBV"))
diag_channels.append(AnalyserSlit) #diag_attrs.append(ChannelString("ElemeAnalyserSlitntSet", "X03DA-SCIENTA:cam1:ANALYSER_SLIT_RBV"))
#Manipulator Settings
diag_channels.append(ManipulatorX.readback)
diag_channels.append(ManipulatorY.readback)
diag_channels.append(ManipulatorZ.readback)
diag_channels.append(ManipulatorTheta.readback)
diag_channels.append(ManipulatorTilt.readback)
diag_channels.append(ManipulatorPhi.readback)
# Beamline Settings
diag_channels.append(MachineBumpXOffset)
diag_channels.append(MachineBumpXAngle)
diag_channels.append(MachineBumpYOffset)
diag_channels.append(MachineBumpYAngle)
diag_channels.append(DynamicBumpYOffset)
diag_channels.append(DynamicBumpYAngle)
diag_channels.append(FrontendVCenter)
diag_channels.append(FrontendVSize)
diag_channels.append(FrontendHCenter)
diag_channels.append(FrontendHSize)
diag_channels.append(MonoVCenter)
diag_channels.append(MonoVSize)
diag_channels.append(MonoBladeDown)
diag_channels.append(MonoBladeUp)
diag_channels.append(MonoHCenter)
diag_channels.append(MonoHSize)
diag_channels.append(MonoApertureMode)
diag_channels.append(RefocusVCenter)
diag_channels.append(RefocusVSize)
diag_channels.append(RefocusHCenter)
diag_channels.append(RefocusHSize)
diag_channels.append(FocusYTrans)
diag_channels.append(FocusZTrans)
diag_channels.append(FocusXRot)
diag_channels.append(FocusYRot)
diag_channels.append(FocusZRot)
diag_channels.append(RefocusYTrans)
diag_channels.append(RefocusZTrans)
diag_channels.append(RefocusXRot)
diag_channels.append(RefocusYRot)
diag_channels.append(RefocusZRot)
diag_channels.append(MonoEnergy)
diag_channels.append(MonoCff)
diag_channels.append(MonoBeta)
diag_channels.append(MonoTheta)
diag_channels.append(ExitSlit)
# Auxiliary Measurements
diag_channels.append(MachineCurrent)
diag_channels.append(FocusWaterTemp)
diag_channels.append(SampleCurrent)
diag_channels.append(RefCurrent)
#diag_channels.append(AuxCurrent)
#diag_channels.append(AuxVoltage)
diag_channels.append(SampleCurrentGain)
diag_channels.append(RefCurrentGain)
#diag_channels.append(AuxCurrentGain)
#diag_channels.append(SampleCurrentAveraging)
#diag_channels.append(RefCurrentAveraging)
#diag_channels.append(AuxCurrentAveraging)
#diag_channels.append(AuxVoltageAveraging)
#diag_channels.append(SampleCurrentSampling)
#diag_channels.append(RefCurrentSampling)
#diag_channels.append(AuxCurrentSampling)
#diag_channels.append(AuxVoltageSampling)
diag_channels.append(ChamberPressure)
diag_channels.append(BeamlinePressure)
diag_channels.append(ManipulatorTempA)
diag_channels.append(ManipulatorTempB)
diag_channels.append(ManipulatorCoolFlow)
diag_channels.append(ManipulatorCoolFlowSet)
diag_channels.append(MonoGrating)
diag_channels = sorted(diag_channels, key=lambda channel: channel.name)
def get_diag_name(diag):
return ch.psi.utils.Str.toTitleCase(diag.getName()).replace(" ", "").replace("Readback", "")
def print_diag():
for f in diag_channels:
print "%-25s %s" % (get_diag_name(f) , str(f.read()))
def create_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
create_dataset(group+get_diag_name(f) , 's' if (type(f) is ch.psi.pshell.epics.ChannelString) else 'd')
def append_diag_datasets(parent = None):
if parent is None:
parent = get_exec_pars().group
group = parent + "attrs/"
for f in diag_channels:
try:
x = f.read()
if x is None:
x = '' if (type(f) is ch.psi.pshell.epics.ChannelString) else float('nan')
append_dataset(group+get_diag_name(f), x)
except:
log("Error sampling " + str(get_diag_name(f)) + ": " + str(sys.exc_info()[1]))
def create_metadata_datasets(parent = None):
if parent is None:
parent = "/"
group = parent + "general/"
for name in ["proposer", "proposal", "pgroup", "sample"]:
setting = get_setting(name)
save_dataset(group+name, setting if setting is not None else "", 's')
setting = get_setting("authors")
save_dataset(group+"authors", setting.split("|") if setting is not None else [""], '[s')
def wait_beam():
if not beam_ok:
print "Waiting for beam..."
while not beam_ok:
time.sleep(0.1)
print "Beam ok"
def before_readout():
sample_scienta = False
for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix", "Counts"]:
if dev in SENSORS:
sample_scienta = True
break
for dev in [Scienta.spectrum,EnergyDistribution, AngleDistribution, Scienta.dataMatrix, Counts]:
if dev in SENSORS:
sample_scienta = True
break
wait_beam()
trig_keithleys()
if sample_scienta:
trig_scienta()
else:
wait_keithleys()
fetch_keithleys()
def after_readout(rec, scan):
if beam_ok:
if get_exec_pars().save:
if rec.index == 0:
if scan.index == 1:
create_metadata_datasets()
create_diag_datasets()
append_diag_datasets()
else:
rec.invalidate()
def after_scan():
"""
Close shutter and turn off analyser
"""
caput("X03DA-PC:AFTER-SCAN.PROC", 1)
caput("X03DA-OP-VG7:WT_SET", 0)
#caput("X03DA-FE-AB1:CLOSE4BL", 0)
#release_keithleys()
def set_adc_averaging(dwelltime=0.0):
if dwelltime == 0.0:
dwelltime = Scienta.getStepTime().read()
dwelltime = min(dwelltime, 20.0)
dwelltime = max(dwelltime, 0.1)
fixed = AcquisitionMode.read() == "Fixed"
else:
fixed = True
prepare_keithleys(dwelltime, fixed)
#value = Scienta.getStepTime().read() * 10.0 #averaging count in 100ms
#SampleCurrentAveraging.write(value)
#RefCurrentAveraging.write(value)
#AuxCurrentAveraging.write(value)
#AuxVoltageAveraging.write(value)
def adjust_sensors():
#Updating ranges from Scienta
Scienta.update()
global SENSORS
if SENSORS is not None:
# Move integration to end
#sample_scienta = False
for dev in ["Scienta.spectrum","EnergyDistribution", "AngleDistribution", "Scienta.dataMatrix"]:
if dev in SENSORS:
#sample_scienta = True
SENSORS=SENSORS+[SENSORS.pop(SENSORS.index(dev))]
for dev in ["Counts"]:
if dev in SENSORS:
#sample_scienta = True
SENSORS=[SENSORS.pop(SENSORS.index(dev))] + SENSORS
if "Scienta.dataMatrix" in SENSORS or Scienta.dataMatrix in SENSORS:
print "Not ACC"
set_exec_pars(accumulate = False)
#if sample_scienta:
# init_scienta()
#Device aliases for data files
set_device_alias(Scienta.dataMatrix, "ScientaImage")
set_device_alias(Scienta.spectrum, "ScientaSpectrum")
set_device_alias(Scienta.centerEnergy, get_diag_name(Scienta.centerEnergy))
set_device_alias(Scienta.lowEnergy, get_diag_name(Scienta.lowEnergy))
set_device_alias(Scienta.highEnergy, get_diag_name(Scienta.highEnergy))
#Additional device configuration
ManipulatorPhi.trustedWrite = False
def fit(ydata, xdata = None):
"""
"""
if xdata is None:
xdata = frange(0, len(ydata), 1)
max_y= max(ydata)
index_max = ydata.index(max_y)
max_x= xdata[index_max]
print "Max index:" + str(index_max),
print " x:" + str(max_x),
print " y:" + str(max_y)
gaussians = fit_gaussians(ydata, xdata, [index_max,])
(norm, mean, sigma) = gaussians[0]
p = plot([ydata],["data"],[xdata], title="Fit" )[0]
fitted_gaussian_function = Gaussian(norm, mean, sigma)
scale_x = [float(min(xdata)), float(max(xdata)) ]
points = max((len(xdata)+1), 100)
resolution = (scale_x[1]-scale_x[0]) / points
fit_y = []
fit_x = frange(scale_x[0],scale_x[1],resolution, True)
for x in fit_x:
fit_y.append(fitted_gaussian_function.value(x))
p.addSeries(LinePlotSeries("fit"))
p.getSeries(1).setData(fit_x, fit_y)
if abs(mean - xdata[index_max]) < ((scale_x[0] + scale_x[1])/2):
print "Mean -> " + str(mean)
p.addMarker(mean, None, "Mean="+str(round(norm,2)), Color.MAGENTA.darker())
return (norm, mean, sigma)
else:
p.addMarker(max_x, None, "Max="+str(round(max_x,2)), Color.GRAY)
print "Invalid gaussian fit: " + str(mean)
return (None, None, None)
def elog(title, message, attachments = [], author = None, category = "Info", domain = "", logbook = "Experiments", encoding=1):
"""
Add entry to ELOG.
"""
if author is None:
author = "pshell" #get_context().getUser().name
typ = "pshell"
entry = ""
cmd = 'G_CS_ELOG_add -l "' + logbook+ '" '
cmd = cmd + '-a "Author=' + author + '" '
cmd = cmd + '-a "Type=' + typ + '" '
cmd = cmd + '-a "Entry=' + entry + '" '
cmd = cmd + '-a "Title=' + title + '" '
cmd = cmd + '-a "Category=' + category + '" '
cmd = cmd + '-a "Domain=' + domain + '" '
for attachment in attachments:
cmd = cmd + '-f "' + attachment + '" '
cmd = cmd + '-n ' + str(encoding)
cmd = cmd + ' "' + message + '"'
#print cmd
#os.system (cmd)
#print os.popen(cmd).read()
import subprocess
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
(out, err) = proc.communicate()
if (err is not None) and err!="":
raise Exception(err)
print out
def get_plot_snapshots(title = None, file_type = "jpg", temp_path = get_context().setup.getContextPath()):
"""
Returns list with file names of plots snapshots from a plotting context.
"""
sleep(0.02) #Give some time to plot to be finished - it is not sync with acquisition
ret = []
for p in get_plots(title):
file_name = os.path.abspath(temp_path + "/" + p.getTitle() + "." + file_type)
p.saveSnapshot(file_name , file_type)
ret.append(file_name)
return ret