added CTA stuff

This commit is contained in:
2023-03-25 15:54:07 +01:00
parent 57ce108ab7
commit 683103d868

447
CTAstuff.py Normal file
View File

@ -0,0 +1,447 @@
from time import sleep
import numpy as np
from epics import PV
from tqdm import tqdm
from slic.core.acquisition import SFAcquisition
from slic.core.acquisition.sfacquisition import BSChannels, transpose_dicts, print_response
from slic.core.task import DAQTask
from devices import primeSample
class CCTA:
def __init__(self, ID):
self.ID = ID
self.pv_mode = PV(ID + ":REPETITION-SP")
self.pv_nreps = PV(ID + ":NR-REPETITIONS-SP")
self.pv_go = PV(ID + ":CTA-START-SEQ")
self.pv_pid = PV(ID + ":seq0Ctrl-StartedAt-O")
def burst(self, n=1):
self.set_nreps(n)
self.set_mode_burst()
self.go()
def set_nreps(self, n):
self.pv_nreps.put(n)
def set_mode_continuous(self):
self.pv_mode.put(0)
def set_mode_burst(self):
self.pv_mode.put(1)
def go(self):
self.pv_go.put(1)
@property
def pid(self):
pid = self.pv_pid.get()
if pid is not None:
pid = int(pid)
return pid
def __repr__(self):
tn = type(self).__name__
return f"{tn} \"{self.ID}\" started at pulse ID {self.pid}"
ccta = CCTA("SAR-CCTA-ESA")
def calc_coordinates_snake(A, B, C, stepsize_X, stepsize_Y, print_flag=True):
ang1 = np.arctan((B[1]-A[1])/(B[0]-A[0]))
ang2 = np.arctan((C[0]-B[0])/(C[1]-B[1]))
ang = ang1 #(np.abs(ang1)+np.abs(ang2))/2
stepX_x = stepsize_X*np.cos(ang)
stepX_y = stepsize_X*np.sin(ang)
stepY_x = - stepsize_Y*np.sin(ang)
stepY_y = stepsize_Y*np.cos(ang)
targetsX = int(1+np.rint(np.sqrt((B[0]-A[0])**2+(B[1]-A[1])**2) / stepsize_X))
targetsY = int(1+np.rint(np.sqrt((C[0]-B[0])**2+(C[1]-B[1])**2) / stepsize_Y))
rowX = np.linspace(A[0], A[0]+targetsX*stepX_x, targetsX, endpoint=False)
rowY = np.linspace(A[1], A[1]+targetsX*stepX_y, targetsX, endpoint=False)
colX = np.linspace(A[0], A[0]+targetsY*stepY_x, targetsY, endpoint=False)
colY = np.linspace(A[1], A[1]+targetsY*stepY_y, targetsY, endpoint=False)
coordx = []
for row in range(targetsY):
coordx.append(rowX+stepY_x*row)
coordy = []
for col in range(targetsX):
coordy.append(colY+stepX_y*col)
coordxF = np.array(coordx)
coordxF[1::2] = coordxF[1::2][..., ::-1]
coordxF = coordxF.flatten()
coordyTF = (np.array(coordy).T).flatten()
coord = list(zip(coordxF, coordyTF))
if print_flag:
for index, (x,y) in enumerate(coord):
print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y))
print ('----------------------')
print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi)))
print ('Targets in X = {}, Targets in Y = {}'.format(targetsX, targetsY))
print ('Calc Stepsizes: X = {:.5f}, Y = {:.5f}'.format(stepX_x, stepY_y) )
print ('{} total positions'.format(len(coordxF)))
return coord
def calc_coordinates_test(A, B, C, D, stepsize_X, stepsize_Y, print_flag=True, snake=False):
ntargetsX = 1 + int(round( np.sqrt((B[0]-A[0])**2+(B[1]-A[1])**2) / stepsize_X ))
ntargetsY = 1 + int(round( np.sqrt((C[0]-B[0])**2+(C[1]-B[1])**2) / stepsize_Y ))
ntargetsX_comp = 1 + int(round( np.sqrt((D[0]-C[0])**2+(D[1]-C[1])**2) / stepsize_X ))
ntargetsY_comp = 1 + int(round( np.sqrt((D[0]-A[0])**2+(D[1]-A[1])**2) / stepsize_Y ))
if ntargetsX != ntargetsX_comp:
raise ValueError(f"arguments inconsistent (X): {ntargetsX} != {ntargetsX_comp}")
if ntargetsY != ntargetsY_comp:
raise ValueError(f"arguments inconsistent (Y): {ntargetsY} != {ntargetsY_comp}")
leftlineX = np.linspace(A[0], D[0], ntargetsY)
leftlineY = np.linspace(A[1], D[1], ntargetsY)
rightlineX = np.linspace(B[0], C[0], ntargetsY)
rightlineY = np.linspace(B[1], C[1], ntargetsY)
coordsX = []
for left, right in zip(leftlineX, rightlineX):
coordsX.append(
np.linspace(left, right, ntargetsX)
)
coordsY = []
for left, right in zip(leftlineY, rightlineY):
coordsY.append(
np.linspace(left, right, ntargetsX)
)
coordsX = np.array(coordsX)
coordsY = np.array(coordsY)
if snake:
coordsX[1::2] = coordsX[1::2][..., ::-1]
coordsY[1::2] = coordsY[1::2][..., ::-1]
coordsXF = coordsX.flatten()
coordsYF = coordsY.flatten()
coord = list(zip(coordsXF, coordsYF))
if print_flag:
for index, (x,y) in enumerate(coord):
print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y))
print ('----------------------')
# print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi)))
print ('Targets in X = {}, Targets in Y = {}'.format(ntargetsX, ntargetsY))
# print ('Calc Stepsizes: X = {:.5f}, Y = {:.5f}'.format(stepX_x, stepY_y) )
print ('{} total positions'.format(len(coordsXF)))
return coord
def calc_coordinates2(A, B, C, stepsize_X, stepsize_Y, print_flag=True):
ang1 = np.arctan((B[1]-A[1])/(B[0]-A[0]))
ang2 = np.arctan((C[0]-B[0])/(C[1]-B[1]))
ang = ang1 #(np.abs(ang1)+np.abs(ang2))/2
stepX_x = stepsize_X*np.cos(ang)
stepX_y = stepsize_X*np.sin(ang)
stepY_x = - stepsize_Y*np.sin(ang)
stepY_y = stepsize_Y*np.cos(ang)
targetsX = int(1+np.rint(np.sqrt((B[0]-A[0])**2+(B[1]-A[1])**2) / stepsize_X))
targetsY = int(1+np.rint(np.sqrt((C[0]-B[0])**2+(C[1]-B[1])**2) / stepsize_Y))
rowX = np.linspace(A[0], A[0]+targetsX*stepX_x, targetsX, endpoint=False)
rowY = np.linspace(A[1], A[1]+targetsX*stepX_y, targetsX, endpoint=False)
colX = np.linspace(A[0], A[0]+targetsY*stepY_x, targetsY, endpoint=False)
colY = np.linspace(A[1], A[1]+targetsY*stepY_y, targetsY, endpoint=False)
coordx = []
for row in range(targetsY):
coordx.extend(rowX+stepY_x*row)
coordy = []
for col in range(targetsX):
coordy.append(colY+stepX_y*col)
coordyTF = (np.array(coordy).T).flatten()
coord = list(zip(coordx, coordyTF))
if print_flag:
for index, (x,y) in enumerate(coord):
print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y))
print ('----------------------')
print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi)))
print ('Targets in X = {}, Targets in Y = {}'.format(targetsX, targetsY))
print ('Calc Stepsizes: X = {:.5f}, Y = {:.5f}'.format(stepX_x, stepY_y) )
print ('{} total positions'.format(len(coordx)))
return coord
def calc_coordinates(A, B, C, targets_X, targets_Y, print_flag=True):
ang1 = np.arctan((B[1]-A[1])/(B[0]-A[0]))
ang2 = np.arctan((C[0]-B[0])/(C[1]-B[1]))
ang = ang1 #(np.abs(ang1)+np.abs(ang2))/2
#Bx = A[0]+(B[0]-A[0])*np.cos(ang)
#Cy = C[1]-(C[1]-B[1])*np.sin(ang)
Bx = np.cos(ang)*B[0]+np.sin(ang)*B[1]
By = -np.sin(ang)*B[0]+np.cos(ang)*B[1]
Cx = np.cos(ang)*C[0]+np.sin(ang)*C[1]
Cy = -np.sin(ang)*C[0]+np.cos(ang)*C[1]
x = np.linspace(A[0], Bx, targets_X)
y = np.linspace(By, Cy, targets_Y)
#x = np.arange(A[0], Bx+stepsize_x, stepsize_x)
#y = np.arange(A[1], Cy+stepsize_y, stepsize_y)
gx, gy = np.meshgrid(x,y)
gx[1::2] = gx[1::2][..., ::-1]
gxr = A[0] + (gx-A[0])*np.cos(ang) - (gy-A[1])*np.sin(ang)
gyr = A[1] + (gx-A[0])*np.sin(ang) + (gy-A[1])*np.cos(ang)
coord = list(zip(gxr.flatten(), gyr.flatten()))
if print_flag:
for index, (x,y) in enumerate(coord):
print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y))
print ('----------------------')
print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi)))
print ('{} total positions'.format(len(gxr.flatten())))
return coord
def static_acq(coord, nshots, start_pos=0):
pids_list = []
for index_pos, (x,y) in enumerate(tqdm(coord[start_pos:start_pos+nshots])):
tx = primeSample.x.set(x)
ty = primeSample.y.set(y)
for t in (tx,ty):
t.wait()
ccta.burst()
sleep(0.1)
pids_list.append(ccta.pid)
current_shot = start_pos + index_pos
tqdm.write('index = {}, X={:.3f}, Y={:.3f}: {}'.format(current_shot, primeSample.x.get_current_value(), primeSample.y.get_current_value(), ccta.pid))
next_pos = current_shot+1
print ('----------------------')
print ('Used {} targets until position: ({:.3f}, {:.3f})'.format((next_pos), *coord[current_shot]))
print ('{} targets left before reaching end point ({:.3f}, {:.3f})'.format(len(coord)-(next_pos), *coord[-1]))
print ('----------------------')
return pids_list, next_pos
#def delay_acq2(filename, delaylist, t0, coord, nshots, start_pos=0, savescan=True):
# next_pos=start_pos
# laser.pumpTopas_delay.delay.set(t0).wait()
# targets_remaining = len(coord)-start_pos
# pids_scan = []
# #delays = np.arange(start, stop+stepsize_delay, stepsize_delay)
# delays = delaylist
# run_number = None
# if (len(delays)*nshots) > targets_remaining:
# print ("Not enough targets to complete the scan! Missing {} targets".format(len(delays)*nshots - targets_remaining))
# else:
# print ("Will use {} targets out of the {} remaining".format(len(delays)*nshots, targets_remaining))
# print ('----------------------')
# for index_delay, delay in enumerate(delays):
# new_t = t0+delay
# laser.pumpTopas_delay.delay.set(new_t).wait()
# print ('Delay = {} fs, will record {} shots'.format(delay, nshots))
# pids_list, next_pos = static_acq(coord, nshots, start_pos=next_pos)
# pids_scan.extend(pids_list)
# if next_pos == len(coord):
# break
# if savescan:
# if run_number is None: run_number = daq.client.next_run()
# daq.retrieve(filename, pids_list, run_number)
# else:
# print("This is a dry mode, in reality will get run_number and do acquisition step retrieve")
# laser.pumpTopas_delay.delay.set(t0).wait()
# targets_used = next_pos
# print ('----------------------')
# if targets_used == len(coord):
# print ('All {} targets used, reached end position {}'.format(len(coord), coord[-1]))
# next_pos=0
# else:
# print ('Used {} targets until position: {}'.format((targets_used), coord[next_pos-1]))
# print ('{} targets left before reaching end point {}'.format(len(coord)-(targets_used), coord[-1]))
# next_pos = next_pos
# print ('Next scan will start from index {} at position {}'.format(next_pos, coord[next_pos]))
# return pids_scan, next_pos
#def delay_acq(filename, start, stop, stepsize_delay, t0, coord, nshots, start_pos=0, savescan=True):
# laser.pumpTopas_delay.delay.set(t0).wait()
# targets_remaining = len(coord)-start_pos
# pids_scan = []
# delays = np.arange(start, stop+stepsize_delay, stepsize_delay)
# if (len(delays)*nshots) > targets_remaining:
# print ("Not enough targets to complete the scan! Missing {} targets".format(len(delays)*nshots - targets_remaining))
# else:
# print ("Will use {} targets out of the {} remaining".format(len(delays)*nshots, targets_remaining))
# print ('----------------------')
# for index_delay, delay in enumerate(delays):
# new_t = t0+delay
# laser.pumpTopas_delay.delay.set(new_t).wait()
# for index_pos, (x,y) in enumerate(coord[start_pos+index_delay*nshots:start_pos+index_delay*nshots+nshots]):
# tx = primeSample.x.set(x)
# ty = primeSample.y.set(y)
# for t in (tx,ty):
# t.wait()
# ccta.burst()
# sleep(0.05)
# pids_scan.append(ccta.pid)
# current_shot = start_pos+(index_pos+index_delay*nshots)
# print ('delay={} fs, index = {}, X={:.3f}, Y={:.3f}: {}'.format(delay, current_shot, primeSample.x.get_current_value(), primeSample.y.get_current_value(), ccta.pid))
# laser.pumpTopas_delay.delay.set(t0).wait()
# targets_used = current_shot+1
# print ('----------------------')
# if targets_used == len(coord):
# print ('All {} targets used, reached end position {}'.format(len(coord), coord[-1]))
# next_pos=0
# else:
# print ('Used {} targets until position: {}'.format((targets_used), coord[current_shot]))
# print ('{} targets left before reaching end point {}'.format(len(coord)-(targets_used), coord[-1]))
# next_pos = current_shot+1
# print ('Next scan will start from index {} at position {}'.format(next_pos, coord[next_pos]))
# if savescan:
# daq.retrieve(filename, pids_scan)
# return pids_scan, next_pos
class CCAcquisition(SFAcquisition):
coords = None
current_pos = 0
def go_to_pos(self, pos):
tx = primeSample.x.set(pos[0])
ty = primeSample.y.set(pos[1])
for t in (tx,ty):
t.wait()
targetx = primeSample.x.get_current_value()
targety = primeSample.y.get_current_value()
print ("Moved to pos: X={:.3f}, Y={:.3f}".format(targetx, targety))
def update_coords(self, A, B, C, stepsize_x, stepsize_y, print_flag=True, snake=False):
if snake:
self.coords = calc_coordinates_snake(A, B, C, stepsize_x, stepsize_y, print_flag=print_flag)
else:
self.coords = calc_coordinates2(A, B, C, stepsize_x, stepsize_y, print_flag=print_flag)
self.current_pos = 0
def update_coords_test(self, A, B, C, D, stepsize_x, stepsize_y, print_flag=True, snake=False):
self.coords = calc_coordinates_test(A, B, C, D, stepsize_x, stepsize_y, print_flag=print_flag, snake=snake)
self.current_pos = 0
def acquire(self, filename, data_base_dir=None, detectors=None, channels=None, pvs=None, scan_info=None, n_pulses=100, n_repeat=1, is_scan_step=False, wait=True):
if self.coords is None:
raise ValueError("Please use update_coords(...)")
if len(self.coords) <= self.current_pos:
raise ValueError("All targets shots, need new ones, please use update_coords(...)")
if not is_scan_step:
run_number = self.client.next_run()
print(f"Advanced run number to {run_number}.")
else:
run_number = self.client.run_number
print(f"Continuing run number {run_number}.")
if not filename or filename == "/dev/null":
print("Skipping retrieval since no filename was given.")
return
if detectors is None:
print("No detectors specified, using default detector list.")
detectors = self.default_detectors
if pvs is None:
print("No PVs specified, using default PV list.")
pvs = self.default_pvs
if channels is None:
print("No channels specified, using default channel list.")
channels = self.default_channels
bschs = BSChannels(*channels)
bschs.check()
client = self.client
client.set_config(n_pulses, filename, detectors=detectors, channels=channels, pvs=pvs, scan_info=scan_info)
def _acquire():
pids_list, self.current_pos = static_acq(self.coords, n_pulses, start_pos=self.current_pos)
res = self.retrieve(filename, pids_list, run_number=run_number)
res = transpose_dicts(res)
filenames = res.pop("filenames")
print_response(res)
return filenames
task = DAQTask(_acquire, stopper=client.stop, filename=filename, hold=False)
self.current_task = task
if wait:
try:
task.wait()
except KeyboardInterrupt:
print("Stopped current DAQ task:")
return task
def __repr__(self):
if self.coords is None:
return super().__repr__()
coords = self.coords
current_pos = self.current_pos
if len(coords) <= current_pos:
return "is done"
res = [
super().__repr__(),
'Used {} targets until position: ({:.3f}, {:.3f})'.format(current_pos, *coords[current_pos]),
'{} targets left before reaching the end point ({:.3f}, {:.3f})'.format(len(coords) - current_pos, *coords[-1]),
]
return "\n".join(res)