separate pedestal taking and retrieval/processing into distinct writers

This commit is contained in:
Dmitry Ozerov
2023-02-15 13:05:08 +01:00
committed by Data Backend account
parent 51a079330a
commit 929f405342
3 changed files with 40 additions and 28 deletions
+1 -2
View File
@@ -90,8 +90,7 @@ def detector_retrieve(request, output_file_detector):
_logger.info("Finished retrieve from the buffer")
if pedestal_run:
# sleep to make sure the h5 file is readable (odd, but reading it occasionally failed without a delay)
sleep(60)
sleep(5)
time_start = time()
if detector in PEDESTAL_SPECIFIC:
+12 -8
View File
@@ -1,11 +1,13 @@
import argparse
import os
import json
from glob import glob
import logging
from datetime import datetime
from sf_daq_broker.writer.bsread_writer import write_from_imagebuffer, write_from_databuffer_api3
from sf_daq_broker.utils import get_data_api_request
import logging
from sf_daq_broker import config
from datetime import datetime
#logger = logging.getLogger("data_api3")
logger = logging.getLogger("broker_writer")
@@ -64,25 +66,27 @@ data_request["channels"] = [{'name': ch, 'backend': config.IMAGE_BACKEND if ch.e
run_number = run_info.get("run_number", 0)
acquisition_number = run_info.get("acquisition_number", 0)
user_tag = run_info.get("user_tag_cleaned", None)
parameters = None
if user_tag is not None:
run_dir_name = f'run{run_number:04}-{user_tag}'
list_data_directories_run = glob(f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/run{run_number:04}*')
if len(list_data_directories_run) != 1:
print(f"Ambiguous data directries : {list_data_directories_run}")
exit()
data_directory=list_data_directories_run[0]
if source == "image":
output_file = f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/{run_dir_name}/data/acq{acquisition_number:04}.CAMERAS.h5.2'
output_file = f'{data_directory}/data/acq{acquisition_number:04}.CAMERAS.h5.2'
write_from_imagebuffer(data_request, output_file, parameters)
elif source == "data_api3":
output_file = f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/{run_dir_name}/data/acq{acquisition_number:04}.BSDATA.h5.2'
output_file = f'{data_directory}/data/acq{acquisition_number:04}.BSDATA.h5.2'
write_from_databuffer_api3(data_request, output_file, parameters)
else:
output_file = f'/sf/{run_info["beamline"]}/data/{run_info["pgroup"]}/raw/run{run_number:04}/data/acq{acquisition_number:04}.PVCHANNELS.h5'
output_file = f'{data_directory}/data/acq{acquisition_number:04}.PVCHANNELS.h5'
metadata = {
"general/user": run_info["pgroup"],
+27 -18
View File
@@ -10,7 +10,8 @@ from pika import BlockingConnection, ConnectionParameters, BasicProperties
from sf_daq_broker import config, utils
import sf_daq_broker.rabbitmq.config as broker_config
from sf_daq_broker.utils import get_data_api_request
from sf_daq_broker.rabbitmq.msg_broker_client import RabbitMqClient
from sf_daq_broker.utils import get_data_api_request, get_writer_request
from sf_daq_broker.writer.bsread_writer import write_from_imagebuffer, write_from_databuffer_api3
from sf_daq_broker.detector.pedestal import take_pedestal
from sf_daq_broker.writer.detector_writer import detector_retrieve
@@ -64,7 +65,7 @@ def wait_for_delay(request_timestamp, writer_type):
sleep(adjusted_retrieval_delay)
def process_request(request):
def process_request(request, broker_client):
writer_type = request["writer_type"]
channels = request.get("channels", None)
@@ -108,7 +109,7 @@ def process_request(request):
wait_for_delay(request_timestamp, writer_type)
_logger.info("Starting download.")
_logger.info("Starting payload.")
start_time = time()
@@ -122,8 +123,10 @@ def process_request(request):
elif writer_type == broker_config.TAG_PEDESTAL:
_logger.info("Doing pedestal.")
detectors = request.get("detectors", [])
det_start_pulse_id, det_stop_pulse_id = take_pedestal(detectors_name=detectors, rate=request.get("rate_multiplicator", 1))
# overwrite start/stop pulse_id's in run_info json file
run_file_json = request.get("run_file_json", None)
if run_file_json is not None:
@@ -144,24 +147,28 @@ def process_request(request):
"directory_name" : request.get("directory_name"),
"request_time" : request.get("request_time", str(datetime.now()))
}
broker_client.open()
for detector in detectors:
request_det_retrieve["detector_name"] = detector
request_det_retrieve["detectors"] = {}
request_det_retrieve["detectors"][detector] = {}
output_file_prefix = request.get("output_file_prefix", "/tmp/error")
output_file_det = f'{output_file_prefix}.{detector}.h5'
try:
detector_retrieve(request_det_retrieve, output_file_det)
except Exception as ex:
_logger.exception("Error while trying to retrieve and convert pedestal data")
sleep(120)
try:
detector_retrieve(request_det_retrieve, output_file_det)
except Exception as ex2:
_logger.exception("(second attempt) Error while trying to retrieve and convert pedestal data")
run_log_file_det = run_log_file[:-4] + "." + detector + ".log"
write_request = get_writer_request(writer_type=broker_config.TAG_DETECTOR_RETRIEVE,
channels=request_det_retrieve,
output_file=output_file_det,
metadata=None,
start_pulse_id=det_start_pulse_id,
stop_pulse_id=det_start_pulse_id,
run_log_file=run_log_file_det)
broker_client.send(write_request, broker_config.TAG_DETECTOR_RETRIEVE)
broker_client.close()
elif writer_type == broker_config.TAG_POWER_ON:
_logger.info("Power ON detector")
@@ -222,7 +229,7 @@ def reject_request(channel, method_frame, body, output_file, e):
update_status(channel, body, "write_rejected", output_file, str(e))
def on_broker_message(channel, method_frame, header_frame, body, connection):
def on_broker_message(channel, method_frame, header_frame, body, connection, broker_client):
try:
request = json.loads(body.decode())
@@ -232,7 +239,7 @@ def on_broker_message(channel, method_frame, header_frame, body, connection):
def process_async():
try:
process_request(request)
process_request(request, broker_client)
except Exception as ex:
_logger.exception("Error while trying to write a requested data.")
@@ -282,7 +289,9 @@ def start_service(broker_url, writer_type=0):
channel.basic_qos(prefetch_count=1)
on_broker_message_f = partial(on_broker_message, connection=connection)
broker_client = RabbitMqClient(broker_url=broker_url)
on_broker_message_f = partial(on_broker_message, connection=connection, broker_client=broker_client)
channel.basic_consume(request_queue, on_broker_message_f)
try: