first iteration re-write BS/DIA wrapper and Acquisition (Thread wrapper)

This commit is contained in:
2020-03-05 12:12:20 +00:00
parent cec3923501
commit 8610123443
4 changed files with 75 additions and 77 deletions
+2 -2
View File
@@ -1,5 +1,5 @@
from .bs import BS
from .dia import DIA
from .bs import BSCounter
from .dia import DIACounter
+22 -33
View File
@@ -1,37 +1,26 @@
from bsread import Source
import datetime
import os
import zmq
import data_api as dapi
from bsread.h5 import receive
from bsread.avail import dispatcher
import zmq
import os
import data_api as api
import datetime
from threading import Thread
from .utilities import Acquisition
from time import sleep
from .basecounter import BaseCounter
## for a hack called DIA
#try:
# import sys, os
# tpath = os.path.dirname(__file__)
# sys.path.insert(0,os.path.join(tpath,'../../detector_integration_api'))
# #ask Leo(2018.03.14):
# #sys.path.insert(0,os.path.join(tpath,'../../jungfrau_utils'))
# from detector_integration_api import DetectorIntegrationClient
#except:
# print('NB: detector integration could not be imported!')
class BS:
class BSCounter(BaseCounter):
def __init__(self,
default_channel_list={'listname':[]},
default_file_path='%s',
elog=None):
default_file_path='%s'
):
self._default_file_path = default_file_path
self._default_channel_list = default_channel_list
self._elog = elog
def avail(self,*args,**kwargs):
return dispatcher.get_current_channels(*args,**kwargs)
def avail(self, *args, **kwargs):
return dispatcher.get_current_channels(*args, **kwargs)
def check_channel_list(self, printResult=True, printOnlineChannels=False):
all_available = set([i['name'] for i in self.avail()])
@@ -45,13 +34,13 @@ class BS:
for listname in status.keys():
if printOnlineChannels:
print('#### Online Channels in {} ####'.format(listname))
print('\n'.join(status[listname]['online']))
print('\n'.join(status[listname]['online']))
print('\n')
print('#### Offline Channels in {} ####'.format(listname))
print('\n'.join(status[listname]['offline']))
else:
return status
def cleanup_channel_list(self,listname):
status = self.check_channel_list(printResult=False)
self._default_channel_list[listname] = \
@@ -61,11 +50,11 @@ class BS:
print('NB: The channels will be back after restart if they originate from a config file.')
def h5(self,fina=None,channel_list=None,N_pulses=None,default_path=True,queue_size=100):
# if default_path:
# fina = self._default_file_path%fina
#
if os.path.isfile(fina):
print('!!! File %s already exists, would you like to delete it?'%fina)
if input('(y/n)')=='y':
@@ -76,7 +65,7 @@ class BS:
if not channel_list:
print('No channels specified, using default list \'%s\' instead.'%list(self._default_channel_list.keys())[0])
channel_list = self._default_channel_list[list(self._default_channel_list.keys())[0]]
source = dispatcher.request_stream(channel_list)
mode = zmq.SUB
receive(source, fina, queue_size=queue_size, mode=mode, n_messages=N_pulses)
@@ -88,13 +77,13 @@ class BS:
now = datetime.datetime.now()
end = now-datetime.timedelta(**end_time_delta)
start = end-datetime.timedelta(**start_time_delta)
return api.get_data(channels=channel_list, start=start, end=end)
return dapi.get_data(channels=channel_list, start=start, end=end)
def h5_db(self,fina,channel_list=None,start_time_delta=dict(),end_time_delta=dict(),default_path=True):
data = self.db(channel_list=None,start_time_delta=start_time_delta,end_time_delta=end_time_delta,default_path=True)
data = self.db(channel_list=None,start_time_delta=start_time_delta,end_time_delta=end_time_delta,default_path=True)
if default_path:
fina = self._default_file_path%fina
if os.path.isfile(fina):
print('!!! File %s already exists, would you like to delete it?'%fina)
if input('(y/n)')=='y':
@@ -102,7 +91,7 @@ class BS:
os.remove(fina)
else:
return
data.to_hdf(fina,"/data")
@@ -110,7 +99,7 @@ class BS:
file_name += '.h5'
def acquire():
self.h5(fina=file_name,N_pulses=Npulses)
return Acquisition(acquire=acquire,acquisition_kwargs={'file_names':[file_name], 'Npulses':Npulses},hold=False)
return Acquisition(acquire=acquire, hold=False)
def wait_done(self):
self.check_running()
+28 -26
View File
@@ -3,11 +3,13 @@ import datetime
from time import sleep
from detector_integration_api import DetectorIntegrationClient
from .utilities import Acquisition
from .basecounter import BaseCounter
class DIA:
class DIACounter(BaseCounter):
def __init__(self, instrument=None, api_address = "http://sf-daq-alvra:10000", jf_name=None, pgroup=None):
self._api_address = api_address
self._api_address = api_address
self.client = DetectorIntegrationClient(api_address)
print("\nDetector Integration API on %s" % api_address)
# No pgroup by default
@@ -28,8 +30,8 @@ class DIA:
def update_config(self, ):
self.writer_config = {
"output_file": "/sf/%s/data/%s/raw/test_data.h5" % (self.instrument, self.pgroup),
"user_id": self.pgroup,
"output_file": "/sf/%s/data/%s/raw/test_data.h5" % (self.instrument, self.pgroup),
"user_id": self.pgroup,
"n_frames": self.n_frames,
"general/user": str(self.pgroup),
"general/process": __name__,
@@ -39,21 +41,21 @@ class DIA:
}
self.backend_config = {
"n_frames": self.n_frames,
"bit_depth": 16,
"gain_corrections_filename": self.gain_file, # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5",
#"gain_corrections_dataset": "gains",
#"pede_corrections_filename": "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup,
#"pede_corrections_dataset": "gains",
#"pede_mask_dataset": "pixel_mask",
"n_frames": self.n_frames,
"bit_depth": 16,
"gain_corrections_filename": self.gain_file, # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5",
#"gain_corrections_dataset": "gains",
#"pede_corrections_filename": "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup,
#"pede_corrections_dataset": "gains",
#"pede_mask_dataset": "pixel_mask",
#"activate_corrections_preview": True,
"is_HG0": self.isHG0
}
if self.pede_file != "":
self.backend_config["gain_corrections_filename"] = self.gain_file # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5",
self.backend_config["gain_corrections_filename"] = self.gain_file # "/sf/alvra/config/jungfrau/jungfrau_4p5_gaincorrections_v0.h5",
self.backend_config["gain_corrections_dataset"] = "gains"
self.backend_config["pede_corrections_filename"] = self.pede_file # "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup,
self.backend_config["pede_corrections_filename"] = self.pede_file # "/sf/alvra/data/res/p%d/pedestal_20171210_1628_res.h5" % self.pgroup,
self.backend_config["pede_corrections_dataset"] = "gains"
self.backend_config["pede_mask_dataset"] = "pixel_mask"
self.backend_config["activate_corrections_preview"] = True
@@ -68,20 +70,20 @@ class DIA:
"timing": "trigger",
# FIXME: HARDCODED: For Alvra JF4.5 it's 0.000005, Bernina is using 0.00001
"exptime": 0.000005,
"exptime": 0.000005,
"cycles": self.n_frames,
#"delay" : 0.001992,
"frames" : 1,
"dr": 16,
}
# Not needed anymore?
#default_channels_list = parseChannelListFile(
# '/sf/alvra/config/com/channel_lists/default_channel_list')
self.bsread_config = {
'output_file': '/sf/%s/data/%s/raw/test_bsread.h5' % (self.instrument, self.pgroup),
'user_id': self.pgroup,
'output_file': '/sf/%s/data/%s/raw/test_bsread.h5' % (self.instrument, self.pgroup),
'user_id': self.pgroup,
"general/user": str(self.pgroup),
"general/process": __name__,
"general/created": str(datetime.datetime.now()),
@@ -124,7 +126,7 @@ class DIA:
#self.reset()
#print('Just resetted in set_config')
self.client.set_config({"writer": self.writer_config, "backend": self.backend_config, "detector": self.detector_config, "bsread": self.bsread_config})
def check_still_running(self, time_interval=.5):
cfg = self.get_config()
running = True
@@ -137,14 +139,14 @@ class DIA:
# break
else:
sleep(time_interval)
def take_pedestal(self, n_frames, analyze=True, n_bad_modules=0, update_config=True, period=0.04):
from jungfrau_utils.scripts.jungfrau_run_pedestals import run as jungfrau_utils_run
directory = '/sf/%s/data/p%d/raw/JF_pedestals/' % (self.instrument, self.pgroup)
if not os.path.exists(directory):
print("Directory %s not existing, AND I CAN NOT CREATE IT, CALL DIMA" % directory)
#os.makedirs(directory)
res_dir = directory.replace("/raw/", "/res/")
if not os.path.exists(res_dir):
print("Directory %s not existing, creating it" % res_dir)
@@ -158,7 +160,7 @@ class DIA:
self.pede_file = (directory + filename).replace("raw/", "res/").replace(".h5", ".res.h5")
print("Pedestal file updated to %s" % self.pede_file)
return self.pede_file
def start(self):
self.client.start()
print("start acquisition")
@@ -166,7 +168,7 @@ class DIA:
def stop(self):
self.client.stop()
print("stop acquisition")
print("stop acquisition")
pass
def config_and_start_test(self):
@@ -184,12 +186,12 @@ class DIA:
bsread_padding?
"""
file_rootdir = '/sf/%s/data/%s/raw/' % (self.instrument, self.pgroup)
if file_name is None:
# FIXME /dev/null crashes the data taking (h5py can't close /dev/null and crashes)
print("Not saving any data, as file_name is not set")
file_name_JF = file_rootdir + "DelMe"
file_name_bsread = file_rootdir + "DelMe"
file_name_JF = file_rootdir + "DelMe"
file_name_bsread = file_rootdir + "DelMe"
else:
# FIXME hardcoded
file_name_JF = file_rootdir + file_name
@@ -218,7 +220,7 @@ class DIA:
'output_file':file_name_bsread,
# 'Npulses': Npulses + bsread_padding
})
if reset_before:
# print('Starting reset in acquire')
self.reset()
+23 -16
View File
@@ -1,29 +1,36 @@
from threading import Thread
class Acquisition:
def __init__(self, parent=None, acquire=None, acquisition_kwargs = {}, hold=True, stopper=None):
self.acquisition_kwargs = acquisition_kwargs
self.file_names = acquisition_kwargs['file_names']
self._acquire = acquire
self._stopper = stopper
self._thread = Thread(target=self._acquire)
if not hold:
self._thread.start()
def wait(self):
self._thread.join()
class Acquisition:
def __init__(self, acquire, stopper=None, hold=True):
self._stopper = stopper
self._thread = Thread(target=acquire)
if not hold:
self.start()
def start(self):
self._thread.start()
def stop(self):
if self._stopper is not None:
self._stopper()
def wait(self):
self._thread.join()
@property
def status(self):
if self._thread.ident is None:
return 'waiting'
return "waiting"
else:
if self._thread.isAlive():
return 'acquiring'
return "acquiring"
else:
return 'done'
def stop(self):
self._stopper()
return "done"
def __repr__(self):
return "Acquisition {}".format(self.status)