From 683103d868db7e5766649a353538523cc9c007ba Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Sat, 25 Mar 2023 15:54:07 +0100 Subject: [PATCH] added CTA stuff --- CTAstuff.py | 447 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 447 insertions(+) create mode 100644 CTAstuff.py diff --git a/CTAstuff.py b/CTAstuff.py new file mode 100644 index 0000000..17be279 --- /dev/null +++ b/CTAstuff.py @@ -0,0 +1,447 @@ +from time import sleep +import numpy as np +from epics import PV +from tqdm import tqdm + +from slic.core.acquisition import SFAcquisition +from slic.core.acquisition.sfacquisition import BSChannels, transpose_dicts, print_response +from slic.core.task import DAQTask + +from devices import primeSample + + + +class CCTA: + + def __init__(self, ID): + self.ID = ID + self.pv_mode = PV(ID + ":REPETITION-SP") + self.pv_nreps = PV(ID + ":NR-REPETITIONS-SP") + self.pv_go = PV(ID + ":CTA-START-SEQ") + self.pv_pid = PV(ID + ":seq0Ctrl-StartedAt-O") + + def burst(self, n=1): + self.set_nreps(n) + self.set_mode_burst() + self.go() + + def set_nreps(self, n): + self.pv_nreps.put(n) + + def set_mode_continuous(self): + self.pv_mode.put(0) + + def set_mode_burst(self): + self.pv_mode.put(1) + + def go(self): + self.pv_go.put(1) + + @property + def pid(self): + pid = self.pv_pid.get() + if pid is not None: + pid = int(pid) + return pid + + def __repr__(self): + tn = type(self).__name__ + return f"{tn} \"{self.ID}\" started at pulse ID {self.pid}" + + + +ccta = CCTA("SAR-CCTA-ESA") + +def calc_coordinates_snake(A, B, C, stepsize_X, stepsize_Y, print_flag=True): + + ang1 = np.arctan((B[1]-A[1])/(B[0]-A[0])) + ang2 = np.arctan((C[0]-B[0])/(C[1]-B[1])) + ang = ang1 #(np.abs(ang1)+np.abs(ang2))/2 + + stepX_x = stepsize_X*np.cos(ang) + stepX_y = stepsize_X*np.sin(ang) + + stepY_x = - stepsize_Y*np.sin(ang) + stepY_y = stepsize_Y*np.cos(ang) + + targetsX = int(1+np.rint(np.sqrt((B[0]-A[0])**2+(B[1]-A[1])**2) / stepsize_X)) + targetsY = int(1+np.rint(np.sqrt((C[0]-B[0])**2+(C[1]-B[1])**2) / stepsize_Y)) + + rowX = np.linspace(A[0], A[0]+targetsX*stepX_x, targetsX, endpoint=False) + rowY = np.linspace(A[1], A[1]+targetsX*stepX_y, targetsX, endpoint=False) + + colX = np.linspace(A[0], A[0]+targetsY*stepY_x, targetsY, endpoint=False) + colY = np.linspace(A[1], A[1]+targetsY*stepY_y, targetsY, endpoint=False) + + coordx = [] + for row in range(targetsY): + coordx.append(rowX+stepY_x*row) + + coordy = [] + for col in range(targetsX): + coordy.append(colY+stepX_y*col) + + coordxF = np.array(coordx) + coordxF[1::2] = coordxF[1::2][..., ::-1] + + coordxF = coordxF.flatten() + coordyTF = (np.array(coordy).T).flatten() + + coord = list(zip(coordxF, coordyTF)) + + if print_flag: + for index, (x,y) in enumerate(coord): + print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y)) + print ('----------------------') + print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi))) + print ('Targets in X = {}, Targets in Y = {}'.format(targetsX, targetsY)) + print ('Calc Stepsizes: X = {:.5f}, Y = {:.5f}'.format(stepX_x, stepY_y) ) + print ('{} total positions'.format(len(coordxF))) + + return coord + + + +def calc_coordinates_test(A, B, C, D, stepsize_X, stepsize_Y, print_flag=True, snake=False): + ntargetsX = 1 + int(round( np.sqrt((B[0]-A[0])**2+(B[1]-A[1])**2) / stepsize_X )) + ntargetsY = 1 + int(round( np.sqrt((C[0]-B[0])**2+(C[1]-B[1])**2) / stepsize_Y )) + + ntargetsX_comp = 1 + int(round( np.sqrt((D[0]-C[0])**2+(D[1]-C[1])**2) / stepsize_X )) + ntargetsY_comp = 1 + int(round( np.sqrt((D[0]-A[0])**2+(D[1]-A[1])**2) / stepsize_Y )) + + if ntargetsX != ntargetsX_comp: + raise ValueError(f"arguments inconsistent (X): {ntargetsX} != {ntargetsX_comp}") + if ntargetsY != ntargetsY_comp: + raise ValueError(f"arguments inconsistent (Y): {ntargetsY} != {ntargetsY_comp}") + + leftlineX = np.linspace(A[0], D[0], ntargetsY) + leftlineY = np.linspace(A[1], D[1], ntargetsY) + + rightlineX = np.linspace(B[0], C[0], ntargetsY) + rightlineY = np.linspace(B[1], C[1], ntargetsY) + + coordsX = [] + for left, right in zip(leftlineX, rightlineX): + coordsX.append( + np.linspace(left, right, ntargetsX) + ) + + coordsY = [] + for left, right in zip(leftlineY, rightlineY): + coordsY.append( + np.linspace(left, right, ntargetsX) + ) + + coordsX = np.array(coordsX) + coordsY = np.array(coordsY) + + if snake: + coordsX[1::2] = coordsX[1::2][..., ::-1] + coordsY[1::2] = coordsY[1::2][..., ::-1] + + coordsXF = coordsX.flatten() + coordsYF = coordsY.flatten() + + coord = list(zip(coordsXF, coordsYF)) + + if print_flag: + for index, (x,y) in enumerate(coord): + print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y)) + print ('----------------------') +# print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi))) + print ('Targets in X = {}, Targets in Y = {}'.format(ntargetsX, ntargetsY)) +# print ('Calc Stepsizes: X = {:.5f}, Y = {:.5f}'.format(stepX_x, stepY_y) ) + print ('{} total positions'.format(len(coordsXF))) + + return coord + + + +def calc_coordinates2(A, B, C, stepsize_X, stepsize_Y, print_flag=True): + + ang1 = np.arctan((B[1]-A[1])/(B[0]-A[0])) + ang2 = np.arctan((C[0]-B[0])/(C[1]-B[1])) + ang = ang1 #(np.abs(ang1)+np.abs(ang2))/2 + + stepX_x = stepsize_X*np.cos(ang) + stepX_y = stepsize_X*np.sin(ang) + + stepY_x = - stepsize_Y*np.sin(ang) + stepY_y = stepsize_Y*np.cos(ang) + + targetsX = int(1+np.rint(np.sqrt((B[0]-A[0])**2+(B[1]-A[1])**2) / stepsize_X)) + targetsY = int(1+np.rint(np.sqrt((C[0]-B[0])**2+(C[1]-B[1])**2) / stepsize_Y)) + + rowX = np.linspace(A[0], A[0]+targetsX*stepX_x, targetsX, endpoint=False) + rowY = np.linspace(A[1], A[1]+targetsX*stepX_y, targetsX, endpoint=False) + + colX = np.linspace(A[0], A[0]+targetsY*stepY_x, targetsY, endpoint=False) + colY = np.linspace(A[1], A[1]+targetsY*stepY_y, targetsY, endpoint=False) + + coordx = [] + for row in range(targetsY): + coordx.extend(rowX+stepY_x*row) + + coordy = [] + for col in range(targetsX): + coordy.append(colY+stepX_y*col) + + coordyTF = (np.array(coordy).T).flatten() + + coord = list(zip(coordx, coordyTF)) + + if print_flag: + for index, (x,y) in enumerate(coord): + print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y)) + print ('----------------------') + print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi))) + print ('Targets in X = {}, Targets in Y = {}'.format(targetsX, targetsY)) + print ('Calc Stepsizes: X = {:.5f}, Y = {:.5f}'.format(stepX_x, stepY_y) ) + print ('{} total positions'.format(len(coordx))) + + return coord + + + +def calc_coordinates(A, B, C, targets_X, targets_Y, print_flag=True): + ang1 = np.arctan((B[1]-A[1])/(B[0]-A[0])) + ang2 = np.arctan((C[0]-B[0])/(C[1]-B[1])) + ang = ang1 #(np.abs(ang1)+np.abs(ang2))/2 + + #Bx = A[0]+(B[0]-A[0])*np.cos(ang) + #Cy = C[1]-(C[1]-B[1])*np.sin(ang) + + Bx = np.cos(ang)*B[0]+np.sin(ang)*B[1] + By = -np.sin(ang)*B[0]+np.cos(ang)*B[1] + Cx = np.cos(ang)*C[0]+np.sin(ang)*C[1] + Cy = -np.sin(ang)*C[0]+np.cos(ang)*C[1] + + x = np.linspace(A[0], Bx, targets_X) + y = np.linspace(By, Cy, targets_Y) + + #x = np.arange(A[0], Bx+stepsize_x, stepsize_x) + #y = np.arange(A[1], Cy+stepsize_y, stepsize_y) + + gx, gy = np.meshgrid(x,y) + gx[1::2] = gx[1::2][..., ::-1] + + gxr = A[0] + (gx-A[0])*np.cos(ang) - (gy-A[1])*np.sin(ang) + gyr = A[1] + (gx-A[0])*np.sin(ang) + (gy-A[1])*np.cos(ang) + + coord = list(zip(gxr.flatten(), gyr.flatten())) + if print_flag: + for index, (x,y) in enumerate(coord): + print ('index {}:\tX={:.3f},\tY={:.3f}'.format(index,x,y)) + print ('----------------------') + print ('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1*180/np.pi), np.abs(ang2*180/np.pi))) + print ('{} total positions'.format(len(gxr.flatten()))) + + return coord + + +def static_acq(coord, nshots, start_pos=0): + pids_list = [] + for index_pos, (x,y) in enumerate(tqdm(coord[start_pos:start_pos+nshots])): + tx = primeSample.x.set(x) + ty = primeSample.y.set(y) + for t in (tx,ty): + t.wait() + ccta.burst() + sleep(0.1) + pids_list.append(ccta.pid) + current_shot = start_pos + index_pos + tqdm.write('index = {}, X={:.3f}, Y={:.3f}: {}'.format(current_shot, primeSample.x.get_current_value(), primeSample.y.get_current_value(), ccta.pid)) + next_pos = current_shot+1 + + print ('----------------------') + print ('Used {} targets until position: ({:.3f}, {:.3f})'.format((next_pos), *coord[current_shot])) + print ('{} targets left before reaching end point ({:.3f}, {:.3f})'.format(len(coord)-(next_pos), *coord[-1])) + print ('----------------------') + + return pids_list, next_pos + + +#def delay_acq2(filename, delaylist, t0, coord, nshots, start_pos=0, savescan=True): +# next_pos=start_pos +# laser.pumpTopas_delay.delay.set(t0).wait() +# targets_remaining = len(coord)-start_pos +# pids_scan = [] +# #delays = np.arange(start, stop+stepsize_delay, stepsize_delay) +# delays = delaylist +# run_number = None +# if (len(delays)*nshots) > targets_remaining: +# print ("Not enough targets to complete the scan! Missing {} targets".format(len(delays)*nshots - targets_remaining)) +# else: +# print ("Will use {} targets out of the {} remaining".format(len(delays)*nshots, targets_remaining)) +# print ('----------------------') +# for index_delay, delay in enumerate(delays): +# new_t = t0+delay +# laser.pumpTopas_delay.delay.set(new_t).wait() +# print ('Delay = {} fs, will record {} shots'.format(delay, nshots)) +# pids_list, next_pos = static_acq(coord, nshots, start_pos=next_pos) +# pids_scan.extend(pids_list) +# if next_pos == len(coord): +# break +# if savescan: +# if run_number is None: run_number = daq.client.next_run() +# daq.retrieve(filename, pids_list, run_number) +# else: +# print("This is a dry mode, in reality will get run_number and do acquisition step retrieve") +# laser.pumpTopas_delay.delay.set(t0).wait() +# targets_used = next_pos +# print ('----------------------') +# if targets_used == len(coord): +# print ('All {} targets used, reached end position {}'.format(len(coord), coord[-1])) +# next_pos=0 +# else: +# print ('Used {} targets until position: {}'.format((targets_used), coord[next_pos-1])) +# print ('{} targets left before reaching end point {}'.format(len(coord)-(targets_used), coord[-1])) +# next_pos = next_pos +# print ('Next scan will start from index {} at position {}'.format(next_pos, coord[next_pos])) +# return pids_scan, next_pos + + +#def delay_acq(filename, start, stop, stepsize_delay, t0, coord, nshots, start_pos=0, savescan=True): +# laser.pumpTopas_delay.delay.set(t0).wait() +# targets_remaining = len(coord)-start_pos +# pids_scan = [] +# delays = np.arange(start, stop+stepsize_delay, stepsize_delay) +# if (len(delays)*nshots) > targets_remaining: +# print ("Not enough targets to complete the scan! Missing {} targets".format(len(delays)*nshots - targets_remaining)) +# else: +# print ("Will use {} targets out of the {} remaining".format(len(delays)*nshots, targets_remaining)) +# print ('----------------------') +# for index_delay, delay in enumerate(delays): +# new_t = t0+delay +# laser.pumpTopas_delay.delay.set(new_t).wait() +# for index_pos, (x,y) in enumerate(coord[start_pos+index_delay*nshots:start_pos+index_delay*nshots+nshots]): +# tx = primeSample.x.set(x) +# ty = primeSample.y.set(y) +# for t in (tx,ty): +# t.wait() +# ccta.burst() +# sleep(0.05) +# pids_scan.append(ccta.pid) +# current_shot = start_pos+(index_pos+index_delay*nshots) +# print ('delay={} fs, index = {}, X={:.3f}, Y={:.3f}: {}'.format(delay, current_shot, primeSample.x.get_current_value(), primeSample.y.get_current_value(), ccta.pid)) +# laser.pumpTopas_delay.delay.set(t0).wait() +# targets_used = current_shot+1 +# print ('----------------------') +# if targets_used == len(coord): +# print ('All {} targets used, reached end position {}'.format(len(coord), coord[-1])) +# next_pos=0 +# else: +# print ('Used {} targets until position: {}'.format((targets_used), coord[current_shot])) +# print ('{} targets left before reaching end point {}'.format(len(coord)-(targets_used), coord[-1])) +# next_pos = current_shot+1 +# print ('Next scan will start from index {} at position {}'.format(next_pos, coord[next_pos])) +# if savescan: +# daq.retrieve(filename, pids_scan) +# return pids_scan, next_pos + + + +class CCAcquisition(SFAcquisition): + + coords = None + current_pos = 0 + + def go_to_pos(self, pos): + tx = primeSample.x.set(pos[0]) + ty = primeSample.y.set(pos[1]) + for t in (tx,ty): + t.wait() + targetx = primeSample.x.get_current_value() + targety = primeSample.y.get_current_value() + print ("Moved to pos: X={:.3f}, Y={:.3f}".format(targetx, targety)) + + def update_coords(self, A, B, C, stepsize_x, stepsize_y, print_flag=True, snake=False): + if snake: + self.coords = calc_coordinates_snake(A, B, C, stepsize_x, stepsize_y, print_flag=print_flag) + else: + self.coords = calc_coordinates2(A, B, C, stepsize_x, stepsize_y, print_flag=print_flag) + self.current_pos = 0 + + + def update_coords_test(self, A, B, C, D, stepsize_x, stepsize_y, print_flag=True, snake=False): + self.coords = calc_coordinates_test(A, B, C, D, stepsize_x, stepsize_y, print_flag=print_flag, snake=snake) + self.current_pos = 0 + + + def acquire(self, filename, data_base_dir=None, detectors=None, channels=None, pvs=None, scan_info=None, n_pulses=100, n_repeat=1, is_scan_step=False, wait=True): + if self.coords is None: + raise ValueError("Please use update_coords(...)") + + if len(self.coords) <= self.current_pos: + raise ValueError("All targets shots, need new ones, please use update_coords(...)") + + if not is_scan_step: + run_number = self.client.next_run() + print(f"Advanced run number to {run_number}.") + else: + run_number = self.client.run_number + print(f"Continuing run number {run_number}.") + + if not filename or filename == "/dev/null": + print("Skipping retrieval since no filename was given.") + return + + if detectors is None: + print("No detectors specified, using default detector list.") + detectors = self.default_detectors + + if pvs is None: + print("No PVs specified, using default PV list.") + pvs = self.default_pvs + + if channels is None: + print("No channels specified, using default channel list.") + channels = self.default_channels + + bschs = BSChannels(*channels) + bschs.check() + + client = self.client + client.set_config(n_pulses, filename, detectors=detectors, channels=channels, pvs=pvs, scan_info=scan_info) + + def _acquire(): + pids_list, self.current_pos = static_acq(self.coords, n_pulses, start_pos=self.current_pos) + res = self.retrieve(filename, pids_list, run_number=run_number) + + res = transpose_dicts(res) + filenames = res.pop("filenames") + print_response(res) + return filenames + + task = DAQTask(_acquire, stopper=client.stop, filename=filename, hold=False) + self.current_task = task + + if wait: + try: + task.wait() + except KeyboardInterrupt: + print("Stopped current DAQ task:") + + return task + + + + def __repr__(self): + if self.coords is None: + return super().__repr__() + + coords = self.coords + current_pos = self.current_pos + + if len(coords) <= current_pos: + return "is done" + + res = [ + super().__repr__(), + 'Used {} targets until position: ({:.3f}, {:.3f})'.format(current_pos, *coords[current_pos]), + '{} targets left before reaching the end point ({:.3f}, {:.3f})'.format(len(coords) - current_pos, *coords[-1]), + ] + return "\n".join(res) + + +