From 861012344377d9ae2cd12f6ca754eaa5e3736be3 Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Thu, 5 Mar 2020 12:12:20 +0000 Subject: [PATCH] first iteration re-write BS/DIA wrapper and Acquisition (Thread wrapper) --- slic/daq/__init__.py | 4 ++-- slic/daq/bs.py | 55 +++++++++++++++++-------------------------- slic/daq/dia.py | 54 ++++++++++++++++++++++-------------------- slic/daq/utilities.py | 39 +++++++++++++++++------------- 4 files changed, 75 insertions(+), 77 deletions(-) diff --git a/slic/daq/__init__.py b/slic/daq/__init__.py index c84b60440..014751fc7 100644 --- a/slic/daq/__init__.py +++ b/slic/daq/__init__.py @@ -1,5 +1,5 @@ -from .bs import BS -from .dia import DIA +from .bs import BSCounter +from .dia import DIACounter diff --git a/slic/daq/bs.py b/slic/daq/bs.py index ce2a9b7d8..fee95c050 100644 --- a/slic/daq/bs.py +++ b/slic/daq/bs.py @@ -1,37 +1,26 @@ -from bsread import Source +import datetime +import os +import zmq + +import data_api as dapi from bsread.h5 import receive from bsread.avail import dispatcher -import zmq -import os -import data_api as api -import datetime -from threading import Thread from .utilities import Acquisition -from time import sleep +from .basecounter import BaseCounter -## for a hack called DIA -#try: -# import sys, os -# tpath = os.path.dirname(__file__) -# sys.path.insert(0,os.path.join(tpath,'../../detector_integration_api')) -# #ask Leo(2018.03.14): -# #sys.path.insert(0,os.path.join(tpath,'../../jungfrau_utils')) -# from detector_integration_api import DetectorIntegrationClient -#except: -# print('NB: detector integration could not be imported!') -class BS: +class BSCounter(BaseCounter): + def __init__(self, default_channel_list={'listname':[]}, - default_file_path='%s', - elog=None): + default_file_path='%s' + ): self._default_file_path = default_file_path self._default_channel_list = default_channel_list - self._elog = elog - def avail(self,*args,**kwargs): - return dispatcher.get_current_channels(*args,**kwargs) + def avail(self, *args, **kwargs): + return dispatcher.get_current_channels(*args, **kwargs) def check_channel_list(self, printResult=True, printOnlineChannels=False): all_available = set([i['name'] for i in self.avail()]) @@ -45,13 +34,13 @@ class BS: for listname in status.keys(): if printOnlineChannels: print('#### Online Channels in {} ####'.format(listname)) - print('\n'.join(status[listname]['online'])) + print('\n'.join(status[listname]['online'])) print('\n') print('#### Offline Channels in {} ####'.format(listname)) print('\n'.join(status[listname]['offline'])) else: return status - + def cleanup_channel_list(self,listname): status = self.check_channel_list(printResult=False) self._default_channel_list[listname] = \ @@ -61,11 +50,11 @@ class BS: print('NB: The channels will be back after restart if they originate from a config file.') - + def h5(self,fina=None,channel_list=None,N_pulses=None,default_path=True,queue_size=100): # if default_path: # fina = self._default_file_path%fina -# + if os.path.isfile(fina): print('!!! File %s already exists, would you like to delete it?'%fina) if input('(y/n)')=='y': @@ -76,7 +65,7 @@ class BS: if not channel_list: print('No channels specified, using default list \'%s\' instead.'%list(self._default_channel_list.keys())[0]) channel_list = self._default_channel_list[list(self._default_channel_list.keys())[0]] - + source = dispatcher.request_stream(channel_list) mode = zmq.SUB receive(source, fina, queue_size=queue_size, mode=mode, n_messages=N_pulses) @@ -88,13 +77,13 @@ class BS: now = datetime.datetime.now() end = now-datetime.timedelta(**end_time_delta) start = end-datetime.timedelta(**start_time_delta) - return api.get_data(channels=channel_list, start=start, end=end) + return dapi.get_data(channels=channel_list, start=start, end=end) def h5_db(self,fina,channel_list=None,start_time_delta=dict(),end_time_delta=dict(),default_path=True): - data = self.db(channel_list=None,start_time_delta=start_time_delta,end_time_delta=end_time_delta,default_path=True) + data = self.db(channel_list=None,start_time_delta=start_time_delta,end_time_delta=end_time_delta,default_path=True) if default_path: fina = self._default_file_path%fina - + if os.path.isfile(fina): print('!!! File %s already exists, would you like to delete it?'%fina) if input('(y/n)')=='y': @@ -102,7 +91,7 @@ class BS: os.remove(fina) else: return - + data.to_hdf(fina,"/data") @@ -110,7 +99,7 @@ class BS: file_name += '.h5' def acquire(): self.h5(fina=file_name,N_pulses=Npulses) - return Acquisition(acquire=acquire,acquisition_kwargs={'file_names':[file_name], 'Npulses':Npulses},hold=False) + return Acquisition(acquire=acquire, hold=False) def wait_done(self): self.check_running() diff --git a/slic/daq/dia.py b/slic/daq/dia.py index 834bccbac..2060f9339 100644 --- a/slic/daq/dia.py +++ b/slic/daq/dia.py @@ -3,11 +3,13 @@ import datetime from time import sleep from detector_integration_api import DetectorIntegrationClient from .utilities import Acquisition +from .basecounter import BaseCounter -class DIA: +class DIACounter(BaseCounter): + def __init__(self, instrument=None, api_address = "http://sf-daq-alvra:10000", jf_name=None, pgroup=None): - self._api_address = api_address + self._api_address = api_address self.client = DetectorIntegrationClient(api_address) print("\nDetector Integration API on %s" % api_address) # No pgroup by default @@ -28,8 +30,8 @@ class DIA: def update_config(self, ): self.writer_config = { - "output_file": "/sf/%s/data/%s/raw/test_data.h5" % (self.instrument, self.pgroup), - "user_id": self.pgroup, + "output_file": "/sf/%s/data/%s/raw/test_data.h5" % (self.instrument, self.pgroup), + "user_id": self.pgroup, "n_frames": self.n_frames, "general/user": str(self.pgroup), "general/process": __name__, @@ -39,21 +41,21 @@ class DIA: } self.backend_config = { - "n_frames": self.n_frames, - "bit_depth": 16, - "gain_corrections_filename": self.gain_file, # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5", - #"gain_corrections_dataset": "gains", - #"pede_corrections_filename": "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup, - #"pede_corrections_dataset": "gains", - #"pede_mask_dataset": "pixel_mask", + "n_frames": self.n_frames, + "bit_depth": 16, + "gain_corrections_filename": self.gain_file, # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5", + #"gain_corrections_dataset": "gains", + #"pede_corrections_filename": "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup, + #"pede_corrections_dataset": "gains", + #"pede_mask_dataset": "pixel_mask", #"activate_corrections_preview": True, "is_HG0": self.isHG0 } if self.pede_file != "": - self.backend_config["gain_corrections_filename"] = self.gain_file # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5", + self.backend_config["gain_corrections_filename"] = self.gain_file # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5", self.backend_config["gain_corrections_dataset"] = "gains" - self.backend_config["pede_corrections_filename"] = self.pede_file # "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup, + self.backend_config["pede_corrections_filename"] = self.pede_file # "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup, self.backend_config["pede_corrections_dataset"] = "gains" self.backend_config["pede_mask_dataset"] = "pixel_mask" self.backend_config["activate_corrections_preview"] = True @@ -68,20 +70,20 @@ class DIA: "timing": "trigger", # FIXME: HARDCODED: For Alvra JF4.5 it's 0.000005, Bernina is using 0.00001 - "exptime": 0.000005, + "exptime": 0.000005, "cycles": self.n_frames, #"delay" : 0.001992, "frames" : 1, "dr": 16, } - + # Not needed anymore? #default_channels_list = parseChannelListFile( # '/sf/alvra/config/com/channel_lists/default_channel_list') self.bsread_config = { - 'output_file': '/sf/%s/data/%s/raw/test_bsread.h5' % (self.instrument, self.pgroup), - 'user_id': self.pgroup, + 'output_file': '/sf/%s/data/%s/raw/test_bsread.h5' % (self.instrument, self.pgroup), + 'user_id': self.pgroup, "general/user": str(self.pgroup), "general/process": __name__, "general/created": str(datetime.datetime.now()), @@ -124,7 +126,7 @@ class DIA: #self.reset() #print('Just resetted in set_config') self.client.set_config({"writer": self.writer_config, "backend": self.backend_config, "detector": self.detector_config, "bsread": self.bsread_config}) - + def check_still_running(self, time_interval=.5): cfg = self.get_config() running = True @@ -137,14 +139,14 @@ class DIA: # break else: sleep(time_interval) - + def take_pedestal(self, n_frames, analyze=True, n_bad_modules=0, update_config=True, period=0.04): from jungfrau_utils.scripts.jungfrau_run_pedestals import run as jungfrau_utils_run directory = '/sf/%s/data/p%d/raw/JF_pedestals/' % (self.instrument, self.pgroup) if not os.path.exists(directory): print("Directory %s not existing, AND I CAN NOT CREATE IT, CALL DIMA" % directory) #os.makedirs(directory) - + res_dir = directory.replace("/raw/", "/res/") if not os.path.exists(res_dir): print("Directory %s not existing, creating it" % res_dir) @@ -158,7 +160,7 @@ class DIA: self.pede_file = (directory + filename).replace("raw/", "res/").replace(".h5", ".res.h5") print("Pedestal file updated to %s" % self.pede_file) return self.pede_file - + def start(self): self.client.start() print("start acquisition") @@ -166,7 +168,7 @@ class DIA: def stop(self): self.client.stop() - print("stop acquisition") + print("stop acquisition") pass def config_and_start_test(self): @@ -184,12 +186,12 @@ class DIA: bsread_padding? """ file_rootdir = '/sf/%s/data/%s/raw/' % (self.instrument, self.pgroup) - + if file_name is None: # FIXME /dev/null crashes the data taking (h5py can't close /dev/null and crashes) print("Not saving any data, as file_name is not set") - file_name_JF = file_rootdir + "DelMe" - file_name_bsread = file_rootdir + "DelMe" + file_name_JF = file_rootdir + "DelMe" + file_name_bsread = file_rootdir + "DelMe" else: # FIXME hardcoded file_name_JF = file_rootdir + file_name @@ -218,7 +220,7 @@ class DIA: 'output_file':file_name_bsread, # 'Npulses': Npulses + bsread_padding }) - + if reset_before: # print('Starting reset in acquire') self.reset() diff --git a/slic/daq/utilities.py b/slic/daq/utilities.py index e4d2b8c7d..78caafe27 100644 --- a/slic/daq/utilities.py +++ b/slic/daq/utilities.py @@ -1,29 +1,36 @@ from threading import Thread -class Acquisition: - def __init__(self, parent=None, acquire=None, acquisition_kwargs = {}, hold=True, stopper=None): - self.acquisition_kwargs = acquisition_kwargs - self.file_names = acquisition_kwargs['file_names'] - self._acquire = acquire - self._stopper = stopper - self._thread = Thread(target=self._acquire) - if not hold: - self._thread.start() - def wait(self): - self._thread.join() +class Acquisition: + + def __init__(self, acquire, stopper=None, hold=True): + self._stopper = stopper + self._thread = Thread(target=acquire) + if not hold: + self.start() def start(self): self._thread.start() + def stop(self): + if self._stopper is not None: + self._stopper() + + def wait(self): + self._thread.join() + + @property def status(self): if self._thread.ident is None: - return 'waiting' + return "waiting" else: if self._thread.isAlive(): - return 'acquiring' + return "acquiring" else: - return 'done' - def stop(self): - self._stopper() + return "done" + + def __repr__(self): + return "Acquisition {}".format(self.status) + +