#!/usr/bin/env python # *-----------------------------------------------------------------------* # | | # | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) | # | | # | Author xxx (xxx.xxx@psi.ch) | # *-----------------------------------------------------------------------* ''' slic interface class for CTAAcquisition based on xxx code snipplets ''' import numpy as np from tqdm import tqdm from time import time from time import sleep from slic.core.acquisition import SFAcquisition from slic.core.acquisition.sfacquisition import BSChannels, transpose_dicts, print_response from slic.core.task import DAQTask class CTAAcquisition(SFAcquisition): def __init__(self, cta, *args, n_block_size=None, **kwargs): self.cta = cta self.n_block_size = n_block_size self.wait_time=60.0 super().__init__(*args, **kwargs) def acquire(self, filename, data_base_dir=None, detectors=None, channels=None, pvs=None, scan_info=None, n_block_size=None, n_pulses=100, n_repeat=1, is_scan_step=False, wait=True, **kwargs): if n_repeat != 1: raise NotImplementedError("Repetitions are not implemented") #TODO if not is_scan_step: run_number = self.client.next_run() print(f"Advanced run number to {run_number}.") else: run_number = self.client.run_number print(f"Continuing run number {run_number}.") # if not filename or filename == "/dev/null": # print("Skipping retrieval since no filename was given.") # return if detectors is None: print("No detectors specified, using default detector list.") detectors = self.default_detectors if pvs is None: print("No PVs specified, using default PV list.") pvs = self.default_pvs if channels is None: print("No channels specified, using default channel list.") channels = self.default_channels n_block_size = n_block_size or self.n_block_size #print("block size:", n_block_size) bschs = BSChannels(*channels) bschs.check() client = self.client print(kwargs) client.set_config(n_pulses, filename, detectors=detectors, channels=channels, pvs=pvs, scan_info=scan_info, **kwargs) #print('set client') def _acquire(): try: self.cta.stop() self.cta.start() sleep(0.1) ###? time_start = time() start_pid = self.cta.get_start_pid() print("CTA start pid:", start_pid) stop_pid = start_pid + n_pulses #print("CTA stop pid:", stop_pid) pids = np.arange(start_pid, stop_pid) pids_blocks = split(pids, n_block_size) #print(pids_blocks) for pb in tqdm(pids_blocks): #print(pb) if filename: sleep(n_block_size / 100 + 0.2) res = self.retrieve(filename, pb, run_number=run_number, scan_info=scan_info, **kwargs) # error here res = transpose_dicts(res) filenames = res.pop("filenames") #print(filenames) else: res = {} #print(res) print_response(res) # possibly optional, because cta should be done! while self.cta.is_running(): sleep(self.wait_time) delta_time = time() - time_start print(f"Waiting since {delta_time} seconds for CTA sequence to finish") return filenames except Exception as e: print("error we should get: ", e) def stopper(): client.stop() self.cta.stop() task = DAQTask(_acquire, stopper=stopper, filename=filename, hold=False) self.current_task = task if wait: try: task.wait() except KeyboardInterrupt: print("Stopped current DAQ task:") return task def split(a, block_size): if block_size is None: return [a] length = len(a) indices = np.arange(block_size, length, block_size) # must not start at 0, otherwise the first entry is an empty array return np.array_split(a, indices) if __name__ == "__main__": from slic.devices.timing.events import CTASequencer cta = CTASequencer("SAT-CCTA-ESC") daq = CTAAcquisition(cta, "cristallina", "p19150", default_channels=["SAR-CVME-TIFALL6:EvtSet"], append_user_tag_to_data_dir=True) cta.cfg.repetitions = n_pulses # etc. etc. daq.acquire("test")