from time import sleep

import numpy as np
from epics import PV
from tqdm import tqdm

from slic.core.acquisition import SFAcquisition
from slic.core.acquisition.sfacquisition import BSChannels, transpose_dicts, print_response
from slic.core.task import DAQTask

from devices import primeSample


class CCTA:
    """Thin wrapper around the EPICS PVs of a CTA sequencer identified by *ID*.

    The PV names are built by appending fixed suffixes to *ID*; their exact
    semantics are defined by the IOC (presumably: repetition mode, number of
    repetitions, start trigger and the pulse ID at which the sequence
    started -- confirm against the IOC documentation).
    """

    def __init__(self, ID):
        self.ID = ID
        self.pv_mode = PV(ID + ":REPETITION-SP")
        self.pv_nreps = PV(ID + ":NR-REPETITIONS-SP")
        self.pv_go = PV(ID + ":CTA-START-SEQ")
        self.pv_pid = PV(ID + ":seq0Ctrl-StartedAt-O")

    def burst(self, n=1):
        """Set *n* repetitions, switch to burst mode and start the sequence."""
        self.set_nreps(n)
        self.set_mode_burst()
        self.go()

    def set_nreps(self, n):
        """Write *n* to the number-of-repetitions PV."""
        self.pv_nreps.put(n)

    def set_mode_continuous(self):
        """Write 0 to the repetition-mode PV."""
        self.pv_mode.put(0)

    def set_mode_burst(self):
        """Write 1 to the repetition-mode PV."""
        self.pv_mode.put(1)

    def go(self):
        """Write 1 to the start-sequence PV."""
        self.pv_go.put(1)

    @property
    def pid(self):
        """Value of the started-at PV as an int, or None if the read returned None."""
        pid = self.pv_pid.get()
        if pid is not None:
            pid = int(pid)
        return pid

    def __repr__(self):
        tn = type(self).__name__
        return f"{tn} \"{self.ID}\" started at pulse ID {self.pid}"


ccta = CCTA("SAR-CCTA-ESA")


def _print_positions(coord):
    """Print one 'index / X / Y' line per position followed by a separator."""
    for index, (x, y) in enumerate(coord):
        print('index {}:\tX={:.3f},\tY={:.3f}'.format(index, x, y))
    print('----------------------')


def _calc_stepped_grid(A, B, C, stepsize_X, stepsize_Y, print_flag, snake):
    """Build a rotated, fixed-step grid of (x, y) positions starting at A.

    The grid orientation is taken from the A->B edge only; the B->C edge
    contributes the number of rows and an angle that is printed for
    information. Positions are returned row by row; with ``snake=True``
    every second row is traversed in the opposite direction.
    """
    ang1 = np.arctan((B[1] - A[1]) / (B[0] - A[0]))
    ang2 = np.arctan((C[0] - B[0]) / (C[1] - B[1]))
    ang = ang1  # (np.abs(ang1)+np.abs(ang2))/2

    # Components of one step along the row (X) and column (Y) directions.
    stepX_x = stepsize_X * np.cos(ang)
    stepX_y = stepsize_X * np.sin(ang)
    stepY_x = -stepsize_Y * np.sin(ang)
    stepY_y = stepsize_Y * np.cos(ang)

    targetsX = int(1 + np.rint(np.sqrt((B[0] - A[0])**2 + (B[1] - A[1])**2) / stepsize_X))
    targetsY = int(1 + np.rint(np.sqrt((C[0] - B[0])**2 + (C[1] - B[1])**2) / stepsize_Y))

    rowX = np.linspace(A[0], A[0] + targetsX * stepX_x, targetsX, endpoint=False)
    colY = np.linspace(A[1], A[1] + targetsY * stepY_y, targetsY, endpoint=False)

    # Both grids have shape (targetsY, targetsX): element [row, col] is the
    # position reached after `col` X-steps and `row` Y-steps from A.
    grid_x = rowX + stepY_x * np.arange(targetsY)[:, np.newaxis]
    grid_y = colY[:, np.newaxis] + stepX_y * np.arange(targetsX)

    if snake:
        # Reverse the column order of every second row in BOTH arrays. On a
        # rotated grid Y varies along a row too, so reversing only X would
        # pair each X with the Y of a different column.
        grid_x[1::2] = grid_x[1::2, ::-1].copy()
        grid_y[1::2] = grid_y[1::2, ::-1].copy()

    flat_x = grid_x.flatten()
    flat_y = grid_y.flatten()
    coord = list(zip(flat_x, flat_y))

    if print_flag:
        _print_positions(coord)
        print('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1 * 180 / np.pi), np.abs(ang2 * 180 / np.pi)))
        print('Targets in X = {}, Targets in Y = {}'.format(targetsX, targetsY))
        print('Calc Stepsizes: X = {:.5f}, Y = {:.5f}'.format(stepX_x, stepY_y))
        print('{} total positions'.format(len(flat_x)))

    return coord


def calc_coordinates_snake(A, B, C, stepsize_X, stepsize_Y, print_flag=True):
    """Return a snake-ordered list of (x, y) targets on a rotated grid.

    Parameters:
        A, B, C: corner points as (x, y); A->B defines the row direction and
            length, B->C the column length.
        stepsize_X, stepsize_Y: step lengths along rows and columns.
        print_flag: if true, print every position and a summary.

    Returns:
        List of (x, y) tuples, row by row, with odd rows reversed.

    Note:
        B[0] == A[0] or C[1] == B[1] leads to a division by zero in the
        angle computation.
    """
    return _calc_stepped_grid(A, B, C, stepsize_X, stepsize_Y, print_flag, snake=True)


def calc_coordinates_test(A, B, C, D, stepsize_X, stepsize_Y, print_flag=True, snake=False):
    """Return (x, y) targets interpolated inside the quadrilateral A-B-C-D.

    The A->D and B->C edges are each divided into the same number of points
    and every row is a linear interpolation between the matching pair.

    Parameters:
        A, B, C, D: corner points as (x, y); A->B and D->C are the row
            edges, B->C and A->D the column edges.
        stepsize_X, stepsize_Y: nominal step lengths used to derive the
            number of targets per row / column.
        print_flag: if true, print every position and a summary.
        snake: if true, reverse every second row.

    Returns:
        List of (x, y) tuples, row by row.

    Raises:
        ValueError: if opposite edges yield different target counts.
    """
    def count(P, Q, stepsize):
        # Number of targets on the edge P->Q for the given step length.
        return 1 + int(round(np.sqrt((Q[0] - P[0])**2 + (Q[1] - P[1])**2) / stepsize))

    ntargetsX = count(A, B, stepsize_X)
    ntargetsY = count(B, C, stepsize_Y)
    ntargetsX_comp = count(C, D, stepsize_X)
    ntargetsY_comp = count(A, D, stepsize_Y)

    if ntargetsX != ntargetsX_comp:
        raise ValueError(f"arguments inconsistent (X): {ntargetsX} != {ntargetsX_comp}")
    if ntargetsY != ntargetsY_comp:
        raise ValueError(f"arguments inconsistent (Y): {ntargetsY} != {ntargetsY_comp}")

    leftlineX = np.linspace(A[0], D[0], ntargetsY)
    leftlineY = np.linspace(A[1], D[1], ntargetsY)
    rightlineX = np.linspace(B[0], C[0], ntargetsY)
    rightlineY = np.linspace(B[1], C[1], ntargetsY)

    coordsX = np.array([
        np.linspace(left, right, ntargetsX)
        for left, right in zip(leftlineX, rightlineX)
    ])
    coordsY = np.array([
        np.linspace(left, right, ntargetsX)
        for left, right in zip(leftlineY, rightlineY)
    ])

    if snake:
        coordsX[1::2] = coordsX[1::2][..., ::-1]
        coordsY[1::2] = coordsY[1::2][..., ::-1]

    coordsXF = coordsX.flatten()
    coordsYF = coordsY.flatten()
    coord = list(zip(coordsXF, coordsYF))

    if print_flag:
        _print_positions(coord)
        print('Targets in X = {}, Targets in Y = {}'.format(ntargetsX, ntargetsY))
        print('{} total positions'.format(len(coordsXF)))

    return coord


def calc_coordinates2(A, B, C, stepsize_X, stepsize_Y, print_flag=True):
    """Return a list of (x, y) targets on a rotated grid, every row left to right.

    Same grid as calc_coordinates_snake, without reversing alternate rows.

    Parameters:
        A, B, C: corner points as (x, y); A->B defines the row direction and
            length, B->C the column length.
        stepsize_X, stepsize_Y: step lengths along rows and columns.
        print_flag: if true, print every position and a summary.

    Returns:
        List of (x, y) tuples, row by row.
    """
    return _calc_stepped_grid(A, B, C, stepsize_X, stepsize_Y, print_flag, snake=False)


def calc_coordinates(A, B, C, targets_X, targets_Y, print_flag=True):
    """Return a snake-ordered grid with a fixed number of targets per side.

    The grid is built axis-aligned in a frame rotated about A so that A->B
    lies along x, and is then rotated back into the original frame.

    Parameters:
        A, B, C: corner points as (x, y); A->B defines the row direction,
            B->C the column extent.
        targets_X, targets_Y: number of targets per row / per column.
        print_flag: if true, print every position and a summary.

    Returns:
        List of (x, y) tuples, row by row, with odd rows reversed.
    """
    ang1 = np.arctan((B[1] - A[1]) / (B[0] - A[0]))
    ang2 = np.arctan((C[0] - B[0]) / (C[1] - B[1]))
    ang = ang1  # (np.abs(ang1)+np.abs(ang2))/2
    cos_a = np.cos(ang)
    sin_a = np.sin(ang)

    # Rotate B and C by -ang about A (not about the origin): the grid below
    # is rotated back about A, so both transforms must share the same pivot
    # for the first point to coincide with A.
    Bx = A[0] + cos_a * (B[0] - A[0]) + sin_a * (B[1] - A[1])
    By = A[1] - sin_a * (B[0] - A[0]) + cos_a * (B[1] - A[1])
    Cy = A[1] - sin_a * (C[0] - A[0]) + cos_a * (C[1] - A[1])

    x = np.linspace(A[0], Bx, targets_X)
    y = np.linspace(By, Cy, targets_Y)

    gx, gy = np.meshgrid(x, y)
    # gy is constant along each row in this frame, so only gx needs reversing.
    gx[1::2] = gx[1::2][..., ::-1]

    # Rotate the axis-aligned grid by +ang about A.
    gxr = A[0] + (gx - A[0]) * cos_a - (gy - A[1]) * sin_a
    gyr = A[1] + (gx - A[0]) * sin_a + (gy - A[1]) * cos_a

    coord = list(zip(gxr.flatten(), gyr.flatten()))

    if print_flag:
        _print_positions(coord)
        print('Angles = {:.5f} and {:.5f} degrees'.format(np.abs(ang1 * 180 / np.pi), np.abs(ang2 * 180 / np.pi)))
        print('{} total positions'.format(len(coord)))

    return coord


def static_acq(coord, nshots, start_pos=0):
    """Move to up to *nshots* targets from *coord* and fire one burst at each.

    Parameters:
        coord: sequence of (x, y) target positions.
        nshots: maximum number of targets to use.
        start_pos: index into *coord* of the first target to use.

    Returns:
        Tuple ``(pids_list, next_pos)``: the value of ``ccta.pid`` read after
        each burst, and the index of the first unused target.
    """
    targets = coord[start_pos:start_pos + nshots]
    if len(targets) == 0:
        # Nothing to shoot at (nshots == 0 or start_pos beyond the last target).
        print('No targets available from index {}; nothing acquired.'.format(start_pos))
        return [], start_pos

    pids_list = []
    for index_pos, (x, y) in enumerate(tqdm(targets)):
        moves = (primeSample.x.set(x), primeSample.y.set(y))
        for move in moves:
            move.wait()
        ccta.burst()
        # NOTE(review): presumably gives the started-at PV time to update
        # before it is read -- confirm.
        sleep(0.1)
        pid = ccta.pid  # read once so the logged and the stored value agree
        pids_list.append(pid)
        current_shot = start_pos + index_pos
        tqdm.write('index = {}, X={:.3f}, Y={:.3f}: {}'.format(current_shot, primeSample.x.get_current_value(), primeSample.y.get_current_value(), pid))

    next_pos = current_shot + 1
    print('----------------------')
    print('Used {} targets until position: ({:.3f}, {:.3f})'.format((next_pos), *coord[current_shot]))
    print('{} targets left before reaching end point ({:.3f}, {:.3f})'.format(len(coord) - (next_pos), *coord[-1]))
    print('----------------------')
    return pids_list, next_pos


# Legacy delay-scan routines, kept disabled for reference.

#def delay_acq2(filename, delaylist, t0, coord, nshots, start_pos=0, savescan=True):
#    next_pos=start_pos
#    laser.pumpTopas_delay.delay.set(t0).wait()
#    targets_remaining = len(coord)-start_pos
#    pids_scan = []
#    #delays = np.arange(start, stop+stepsize_delay, stepsize_delay)
#    delays = delaylist
#    run_number = None
#    if (len(delays)*nshots) > targets_remaining:
#        print ("Not enough targets to complete the scan! Missing {} targets".format(len(delays)*nshots - targets_remaining))
#    else:
#        print ("Will use {} targets out of the {} remaining".format(len(delays)*nshots, targets_remaining))
#    print ('----------------------')
#    for index_delay, delay in enumerate(delays):
#        new_t = t0+delay
#        laser.pumpTopas_delay.delay.set(new_t).wait()
#        print ('Delay = {} fs, will record {} shots'.format(delay, nshots))
#        pids_list, next_pos = static_acq(coord, nshots, start_pos=next_pos)
#        pids_scan.extend(pids_list)
#        if next_pos == len(coord):
#            break
#        if savescan:
#            if run_number is None: run_number = daq.client.next_run()
#            daq.retrieve(filename, pids_list, run_number)
#        else:
#            print("This is a dry mode, in reality will get run_number and do acquisition step retrieve")
#    laser.pumpTopas_delay.delay.set(t0).wait()
#    targets_used = next_pos
#    print ('----------------------')
#    if targets_used == len(coord):
#        print ('All {} targets used, reached end position {}'.format(len(coord), coord[-1]))
#        next_pos=0
#    else:
#        print ('Used {} targets until position: {}'.format((targets_used), coord[next_pos-1]))
#        print ('{} targets left before reaching end point {}'.format(len(coord)-(targets_used), coord[-1]))
#        next_pos = next_pos
#    print ('Next scan will start from index {} at position {}'.format(next_pos, coord[next_pos]))
#    return pids_scan, next_pos


#def delay_acq(filename, start, stop, stepsize_delay, t0, coord, nshots, start_pos=0, savescan=True):
#    laser.pumpTopas_delay.delay.set(t0).wait()
#    targets_remaining = len(coord)-start_pos
#    pids_scan = []
#    delays = np.arange(start, stop+stepsize_delay, stepsize_delay)
#    if (len(delays)*nshots) > targets_remaining:
#        print ("Not enough targets to complete the scan! Missing {} targets".format(len(delays)*nshots - targets_remaining))
#    else:
#        print ("Will use {} targets out of the {} remaining".format(len(delays)*nshots, targets_remaining))
#    print ('----------------------')
#    for index_delay, delay in enumerate(delays):
#        new_t = t0+delay
#        laser.pumpTopas_delay.delay.set(new_t).wait()
#        for index_pos, (x,y) in enumerate(coord[start_pos+index_delay*nshots:start_pos+index_delay*nshots+nshots]):
#            tx = primeSample.x.set(x)
#            ty = primeSample.y.set(y)
#            for t in (tx,ty):
#                t.wait()
#            ccta.burst()
#            sleep(0.05)
#            pids_scan.append(ccta.pid)
#            current_shot = start_pos+(index_pos+index_delay*nshots)
#            print ('delay={} fs, index = {}, X={:.3f}, Y={:.3f}: {}'.format(delay, current_shot, primeSample.x.get_current_value(), primeSample.y.get_current_value(), ccta.pid))
#    laser.pumpTopas_delay.delay.set(t0).wait()
#    targets_used = current_shot+1
#    print ('----------------------')
#    if targets_used == len(coord):
#        print ('All {} targets used, reached end position {}'.format(len(coord), coord[-1]))
#        next_pos=0
#    else:
#        print ('Used {} targets until position: {}'.format((targets_used), coord[current_shot]))
#        print ('{} targets left before reaching end point {}'.format(len(coord)-(targets_used), coord[-1]))
#        next_pos = current_shot+1
#    print ('Next scan will start from index {} at position {}'.format(next_pos, coord[next_pos]))
#    if savescan:
#        daq.retrieve(filename, pids_scan)
#    return pids_scan, next_pos


class CCAcquisition(SFAcquisition):
    """SFAcquisition variant that fires one burst per sample target.

    A list of target coordinates is kept in ``coords`` together with the
    index of the next unused target in ``current_pos``; every call to
    ``acquire`` consumes up to ``n_pulses`` targets.
    """

    # List of (x, y) targets, or None until one of the update_coords* methods ran.
    coords = None
    # Index into coords of the next unused target.
    current_pos = 0

    def go_to_pos(self, pos):
        """Move the sample stage to ``pos`` (x, y), wait, and print the reached position."""
        tx = primeSample.x.set(pos[0])
        ty = primeSample.y.set(pos[1])
        for t in (tx, ty):
            t.wait()
        targetx = primeSample.x.get_current_value()
        targety = primeSample.y.get_current_value()
        print("Moved to pos: X={:.3f}, Y={:.3f}".format(targetx, targety))

    def update_coords(self, A, B, C, stepsize_x, stepsize_y, print_flag=True, snake=False):
        """Recompute the target list from three corners and reset the position to 0.

        Uses calc_coordinates_snake when ``snake`` is true, calc_coordinates2
        otherwise.
        """
        if snake:
            self.coords = calc_coordinates_snake(A, B, C, stepsize_x, stepsize_y, print_flag=print_flag)
        else:
            self.coords = calc_coordinates2(A, B, C, stepsize_x, stepsize_y, print_flag=print_flag)
        self.current_pos = 0

    def update_coords_test(self, A, B, C, D, stepsize_x, stepsize_y, print_flag=True, snake=False):
        """Recompute the target list from four corners and reset the position to 0."""
        self.coords = calc_coordinates_test(A, B, C, D, stepsize_x, stepsize_y, print_flag=print_flag, snake=snake)
        self.current_pos = 0

    def acquire(self, filename, data_base_dir=None, detectors=None, channels=None, pvs=None, scan_info=None, n_pulses=100, n_repeat=1, is_scan_step=False, wait=True):
        """Shoot up to ``n_pulses`` targets and retrieve the data of the recorded pulse IDs.

        Parameters:
            filename: output name; if empty or "/dev/null" nothing is acquired.
            data_base_dir: accepted for interface compatibility; not used here.
            detectors, channels, pvs: data sources; None selects the
                corresponding default list of this instance.
            scan_info: passed on to ``client.set_config``.
            n_pulses: number of targets to shoot (one burst each).
            n_repeat: accepted for interface compatibility; not used here.
            is_scan_step: if false the run number is advanced first,
                otherwise the current run number is reused.
            wait: if true, block until the task finished (Ctrl-C returns early).

        Returns:
            The DAQTask running the acquisition, or None if no filename was given.

        Raises:
            ValueError: if no coordinates are set or all targets are used up.
        """
        if self.coords is None:
            raise ValueError("Please use update_coords(...)")

        if len(self.coords) <= self.current_pos:
            raise ValueError("All targets shots, need new ones, please use update_coords(...)")

        # The run number is advanced before the filename check below, so a
        # skipped acquisition still consumes a run number.
        if not is_scan_step:
            run_number = self.client.next_run()
            print(f"Advanced run number to {run_number}.")
        else:
            run_number = self.client.run_number
            print(f"Continuing run number {run_number}.")

        if not filename or filename == "/dev/null":
            print("Skipping retrieval since no filename was given.")
            return

        if detectors is None:
            print("No detectors specified, using default detector list.")
            detectors = self.default_detectors

        if pvs is None:
            print("No PVs specified, using default PV list.")
            pvs = self.default_pvs

        if channels is None:
            print("No channels specified, using default channel list.")
            channels = self.default_channels

        bschs = BSChannels(*channels)
        bschs.check()

        client = self.client
        client.set_config(n_pulses, filename, detectors=detectors, channels=channels, pvs=pvs, scan_info=scan_info)

        def _acquire():
            # Shoot the targets, remember where to continue, then retrieve
            # the data for exactly the pulse IDs that were recorded.
            pids_list, self.current_pos = static_acq(self.coords, n_pulses, start_pos=self.current_pos)
            res = self.retrieve(filename, pids_list, run_number=run_number)
            res = transpose_dicts(res)
            filenames = res.pop("filenames")
            print_response(res)
            return filenames

        task = DAQTask(_acquire, stopper=client.stop, filename=filename, hold=False)
        self.current_task = task

        if wait:
            try:
                task.wait()
            except KeyboardInterrupt:
                print("Stopped current DAQ task:")

        return task

    def __repr__(self):
        if self.coords is None:
            return super().__repr__()

        coords = self.coords
        current_pos = self.current_pos

        if len(coords) <= current_pos:
            return "is done"

        res = [
            super().__repr__(),
            'Used {} targets until position: ({:.3f}, {:.3f})'.format(current_pos, *coords[current_pos]),
            '{} targets left before reaching the end point ({:.3f}, {:.3f})'.format(len(coords) - current_pos, *coords[-1]),
        ]
        return "\n".join(res)