Files
SwissMX/ctadaq.py

146 lines
4.5 KiB
Python

#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2024 by Paul Scherrer Institute (http://www.psi.ch) |
# | |
# | Author xxx (xxx.xxx@psi.ch) |
# *-----------------------------------------------------------------------*
'''
slic interface class for CTAAcquisition
based on xxx code snippets
'''
import numpy as np
from tqdm import tqdm
from time import time
from time import sleep
from slic.core.acquisition import SFAcquisition
from slic.core.acquisition.sfacquisition import BSChannels, transpose_dicts, print_response
from slic.core.task import DAQTask
class CTAAcquisition(SFAcquisition):
    """SFAcquisition whose data taking is driven by a CTA sequencer.

    The CTA sequence is (re)started for every acquisition; the pulse IDs it
    covers are then retrieved, optionally in blocks of ``n_block_size``
    pulses.
    """

    def __init__(self, cta, *args, n_block_size=None, **kwargs):
        """Store the sequencer and the default block size.

        Parameters
        ----------
        cta
            Sequencer object; ``stop()``, ``start()``, ``get_start_pid()``
            and ``is_running()`` are called on it during ``acquire``.
        n_block_size : int or None
            Default number of pulse IDs per retrieval block. ``None`` means
            all pulse IDs are retrieved as a single block.
        *args, **kwargs
            Forwarded unchanged to ``SFAcquisition.__init__``.
        """
        self.cta = cta
        self.n_block_size = n_block_size
        # Seconds slept between two polls of cta.is_running() in acquire().
        self.wait_time = 60.0
        super().__init__(*args, **kwargs)

    def acquire(
        self,
        filename,
        data_base_dir=None,
        detectors=None,
        channels=None,
        pvs=None,
        scan_info=None,
        n_block_size=None,
        n_pulses=100,
        n_repeat=1,
        is_scan_step=False,
        wait=True,
        **kwargs,
    ):
        """Start the CTA sequence and retrieve the pulse IDs it covers.

        Parameters
        ----------
        filename
            Name handed to the client config and to ``retrieve``. If falsy,
            the sequence is still run but nothing is retrieved.
        data_base_dir
            Accepted for signature compatibility; not used in this method.
        detectors, channels, pvs
            When ``None``, the respective ``self.default_*`` list is used.
        scan_info
            Passed through to ``client.set_config`` and ``retrieve``.
        n_block_size : int or None
            Pulse IDs per retrieval block; falls back to
            ``self.n_block_size`` when falsy.
        n_pulses : int
            Number of consecutive pulse IDs, counted from the CTA start
            pulse ID.
        n_repeat : int
            Only ``1`` is supported.
        is_scan_step : bool
            If true, the current run number is reused instead of advanced.
        wait : bool
            If true, block until the task has finished.
        **kwargs
            Passed through to ``client.set_config`` and ``retrieve``.

        Returns
        -------
        DAQTask
            The task wrapping the acquisition. Its function returns the
            ``"filenames"`` entry of the last retrieved block, or an empty
            list if nothing was retrieved.

        Raises
        ------
        NotImplementedError
            If ``n_repeat`` is not 1.
        """
        if n_repeat != 1:
            raise NotImplementedError("Repetitions are not implemented") #TODO

        if not is_scan_step:
            run_number = self.client.next_run()
            print(f"Advanced run number to {run_number}.")
        else:
            run_number = self.client.run_number
            print(f"Continuing run number {run_number}.")

        if detectors is None:
            print("No detectors specified, using default detector list.")
            detectors = self.default_detectors

        if pvs is None:
            print("No PVs specified, using default PV list.")
            pvs = self.default_pvs

        if channels is None:
            print("No channels specified, using default channel list.")
            channels = self.default_channels

        n_block_size = n_block_size or self.n_block_size

        bschs = BSChannels(*channels)
        bschs.check()

        client = self.client
        print(kwargs)
        client.set_config(n_pulses, filename, detectors=detectors, channels=channels, pvs=pvs, scan_info=scan_info, **kwargs)

        def _acquire():
            # Bound up front so the final return is valid even when no
            # block is retrieved (falsy filename).
            filenames = []
            try:
                # Restart the sequence so that the start pulse ID read
                # below belongs to this acquisition.
                self.cta.stop()
                self.cta.start()
                sleep(0.1) ###?
                time_start = time()

                start_pid = self.cta.get_start_pid()
                print("CTA start pid:", start_pid)
                stop_pid = start_pid + n_pulses

                pids = np.arange(start_pid, stop_pid)
                pids_blocks = split(pids, n_block_size)

                for pb in tqdm(pids_blocks):
                    if filename:
                        # Without a configured block size there is a single
                        # block holding all pulse IDs, so use its length;
                        # dividing None would raise a TypeError.
                        n_in_block = len(pb) if n_block_size is None else n_block_size
                        # presumably 100 is the pulse rate in Hz, i.e. wait
                        # until the block has been recorded -- TODO confirm
                        sleep(n_in_block / 100 + 0.2)
                        res = self.retrieve(filename, pb, run_number=run_number, scan_info=scan_info, **kwargs)
                        res = transpose_dicts(res)
                        # NOTE(review): overwritten on every iteration, so
                        # only the last block's filenames are returned.
                        filenames = res.pop("filenames")
                    else:
                        res = {}
                    print_response(res)

                # possibly optional, because cta should be done!
                while self.cta.is_running():
                    sleep(self.wait_time)
                    delta_time = time() - time_start
                    print(f"Waiting since {delta_time} seconds for CTA sequence to finish")

                return filenames
            except Exception as e:
                # NOTE(review): errors are only printed and the task then
                # returns None; kept as is so callers see no new exceptions.
                print("error we should get: ", e)

        def stopper():
            # Stop both the DAQ client and the sequencer.
            client.stop()
            self.cta.stop()

        task = DAQTask(_acquire, stopper=stopper, filename=filename, hold=False)
        self.current_task = task

        if wait:
            try:
                task.wait()
            except KeyboardInterrupt:
                print("Stopped current DAQ task:")

        return task
def split(a, block_size):
    """Cut *a* into consecutive chunks of at most *block_size* elements.

    With ``block_size=None`` no cutting takes place and the result is a
    one-element list holding *a* itself. Otherwise the chunks come from
    ``np.array_split``; the last chunk may be shorter than the others.
    """
    if block_size is not None:
        # Cut positions begin at block_size rather than 0: a leading 0
        # would yield an empty first chunk.
        cut_points = np.arange(block_size, len(a), block_size)
        return np.array_split(a, cut_points)
    return [a]
if __name__ == "__main__":
    # Manual smoke test: run one CTA-driven acquisition.
    from slic.devices.timing.events import CTASequencer

    # Previously undefined here (NameError); the same value is used for the
    # CTA repetitions and for the DAQ request so the two stay in agreement.
    n_pulses = 100

    cta = CTASequencer("SAT-CCTA-ESC")
    daq = CTAAcquisition(cta, "cristallina", "p19150", default_channels=["SAR-CVME-TIFALL6:EvtSet"], append_user_tag_to_data_dir=True)

    cta.cfg.repetitions = n_pulses # etc. etc.
    daq.acquire("test", n_pulses=n_pulses)