From 70c6e754f3a717e80b64994bc81c1d091de57b4e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Tue, 29 Sep 2020 09:56:01 +0200 Subject: [PATCH] Cleanup of version files --- CMakeLists.txt | 2 - core-broker/core-broker/__init__.py | 0 core-broker/core-broker/broker_client.py | 71 ------- core-broker/core-broker/broker_debugger.py | 70 ------- epics-writer/Readme.md | 34 --- epics-writer/conda-recipe/build.sh | 3 - epics-writer/conda-recipe/meta.yaml | 20 -- epics-writer/epics_writer/__init__.py | 0 epics-writer/epics_writer/start.py | 99 --------- epics-writer/epics_writer/writer.py | 120 ----------- epics-writer/setup.py | 15 -- epics-writer/tests/__init__.py | 0 epics-writer/tests/channels.txt | 6 - epics-writer/tests/test_download_data.py | 34 --- jf-live-daq/CMakeLists.txt | 26 --- jf-live-daq/src/main.cpp | 49 ----- jf-live-daq/test/CMakeLists.txt | 10 - jf-live-daq/test/main.cpp | 9 - jf-live-writer/CMakeLists.txt | 22 -- jf-live-writer/include/BinaryReader.hpp | 28 --- jf-live-writer/include/JFH5LiveWriter.hpp | 49 ----- jf-live-writer/include/LiveImageAssembler.hpp | 51 ----- jf-live-writer/include/live_writer_config.hpp | 9 - jf-live-writer/src/BinaryReader.cpp | 102 --------- jf-live-writer/src/JFH5LiveWriter.cpp | 133 ------------ jf-live-writer/src/LiveImageAssembler.cpp | 159 -------------- jf-live-writer/src/main.cpp | 195 ------------------ jf-live-writer/test/CMakeLists.txt | 10 - jf-live-writer/test/main.cpp | 10 - jf-live-writer/test/test_BinaryReader.cpp | 10 - sf-stream/src/ZmqLiveSender.cpp | 25 --- 31 files changed, 1371 deletions(-) delete mode 100644 core-broker/core-broker/__init__.py delete mode 100644 core-broker/core-broker/broker_client.py delete mode 100644 core-broker/core-broker/broker_debugger.py delete mode 100644 epics-writer/Readme.md delete mode 100644 epics-writer/conda-recipe/build.sh delete mode 100644 epics-writer/conda-recipe/meta.yaml delete mode 100644 epics-writer/epics_writer/__init__.py delete mode 100644 epics-writer/epics_writer/start.py delete mode 100644 epics-writer/epics_writer/writer.py delete mode 100644 epics-writer/setup.py delete mode 100644 epics-writer/tests/__init__.py delete mode 100644 epics-writer/tests/channels.txt delete mode 100644 epics-writer/tests/test_download_data.py delete mode 100644 jf-live-daq/CMakeLists.txt delete mode 100644 jf-live-daq/src/main.cpp delete mode 100644 jf-live-daq/test/CMakeLists.txt delete mode 100644 jf-live-daq/test/main.cpp delete mode 100644 jf-live-writer/CMakeLists.txt delete mode 100644 jf-live-writer/include/BinaryReader.hpp delete mode 100644 jf-live-writer/include/JFH5LiveWriter.hpp delete mode 100644 jf-live-writer/include/LiveImageAssembler.hpp delete mode 100644 jf-live-writer/include/live_writer_config.hpp delete mode 100644 jf-live-writer/src/BinaryReader.cpp delete mode 100644 jf-live-writer/src/JFH5LiveWriter.cpp delete mode 100644 jf-live-writer/src/LiveImageAssembler.cpp delete mode 100644 jf-live-writer/src/main.cpp delete mode 100644 jf-live-writer/test/CMakeLists.txt delete mode 100644 jf-live-writer/test/main.cpp delete mode 100644 jf-live-writer/test/test_BinaryReader.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 9bb6af6..6dbaafa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,5 +32,3 @@ add_subdirectory("core-buffer") add_subdirectory("sf-buffer") add_subdirectory("sf-stream") add_subdirectory("sf-writer") -#add_subdirectory("jf-live-writer") -#add_subdirectory("jf-live-daq") diff --git a/core-broker/core-broker/__init__.py b/core-broker/core-broker/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/core-broker/core-broker/broker_client.py b/core-broker/core-broker/broker_client.py deleted file mode 100644 index 2b676fe..0000000 --- a/core-broker/core-broker/broker_client.py +++ /dev/null @@ -1,71 +0,0 @@ -import json - -from pika import BlockingConnection, ConnectionParameters, BasicProperties - - -class BrokerClient(object): - - REQUEST_EXCHANGE = "request" - STATUS_EXCHANGE = "status" - DEFAULT_BROKER_URL = "127.0.0.1" - - def __init__(self, broker_url=DEFAULT_BROKER_URL): - self.connection = BlockingConnection(ConnectionParameters(broker_url)) - self.channel = self.connection.channel() - - self.channel.exchange_declare(exchange=self.REQUEST_EXCHANGE, - exchange_type="topic") - - self.channel.exchange_declare(exchange=self.STATUS_EXCHANGE, - exchange_type="fanout") - - def close(self): - self.connection.close() - - def request_write(self, - output_prefix, - metadata=None, - detectors=None, - bsread_channels=None, - epics_pvs=None): - - routing_key = "." - - if detectors: - for detector in detectors: - routing_key += detector + "." - - if bsread_channels: - routing_key += "bsread" + "." - - if epics_pvs: - routing_key += "epics" + "." - - body_bytes = json.dumps({ - "output_prefix": output_prefix, - "metadata": metadata, - "detectors": detectors, - "bsread_channels": bsread_channels, - "epics_pvs": epics_pvs - }).encode() - - self.channel.basic_publish(exchange=self.REQUEST_EXCHANGE, - routing_key=routing_key, - body=body_bytes) - - status_header = { - "action": "write_request", - "source": "BrokerClient", - "routing_key": routing_key - } - - self.channel.basic_publish(exchange=self.STATUS_EXCHANGE, - properties=BasicProperties( - headers=status_header), - routing_key="", - body=body_bytes) - - -broker = BrokerClient() -broker.request_write("/tmp/test", epics_pvs=["test"]) -broker.close() diff --git a/core-broker/core-broker/broker_debugger.py b/core-broker/core-broker/broker_debugger.py deleted file mode 100644 index 52b504b..0000000 --- a/core-broker/core-broker/broker_debugger.py +++ /dev/null @@ -1,70 +0,0 @@ -from datetime import datetime -import json - -from pika import BlockingConnection, ConnectionParameters - -DEFAULT_BROKER_URL = "127.0.0.1" -STATUS_EXCHANGE = "status" - -COLOR_END_MARKER = '\x1b[0m' - - -def get_color_for_action(action): - - color_mapping = { - "write_request": "\x1b[34;1m", - "write_start": "\x1b[1;33;1m", - "write_finished": "\x1b[1;32;1m" - } - - return color_mapping.get(action, "") - - -def on_status(channel, method_frame, header_frame, body): - header = header_frame.headers - request = json.loads(body.decode()) - - action = header["action"] - source = header["source"] - - action_output = get_color_for_action(action) + action + COLOR_END_MARKER - time_output = datetime.utcnow().strftime("%Y%m%d-%H:%M:%S.%f") - - print("[%s] %s %s" % (time_output, action_output, source)) - print(request) - - -def connect_to_broker(broker_url): - connection = BlockingConnection(ConnectionParameters(broker_url)) - channel = connection.channel() - - channel.exchange_declare(exchange=STATUS_EXCHANGE, - exchange_type="fanout") - queue = channel.queue_declare(queue="", exclusive=True).method.queue - channel.queue_bind(queue=queue, - exchange=STATUS_EXCHANGE) - - channel.basic_consume(queue, on_status) - - try: - channel.start_consuming() - except KeyboardInterrupt: - channel.stop_consuming() - - -def main(): - import argparse - parser = argparse.ArgumentParser( - description="Connect and listen to broker events.") - - parser.add_argument('--broker_url', dest='broker_url', - default=DEFAULT_BROKER_URL, - help='RabbitMQ broker URL') - - args = parser.parse_args() - - connect_to_broker(broker_url=args.broker_url) - - -if __name__ == '__main__': - main() diff --git a/epics-writer/Readme.md b/epics-writer/Readme.md deleted file mode 100644 index 5f6ab51..0000000 --- a/epics-writer/Readme.md +++ /dev/null @@ -1,34 +0,0 @@ -# Overview - -Simple server to dump Epics Channel Access data to an HDF5 file. -The server gets an http callback from the Broker whenever there was an acquisition. - - -__Note: THIS IS/WAS A FRIDAY AFTERNOON HACK TO MAKE THE SWISSFEL DAQ WORK__ - - -The format of the request is as follows: -``` -{ - 'range': { - 'startPulseId': 100, - 'endPulseId': 120 - }, - - 'parameters': { - 'general/created': 'test', - 'general/user': 'tester', - 'general/process': 'test_process', - 'general/instrument': 'mac', - 'output_file': '/bla/test.h5'} -} - -``` - -Right now this server needs to run on the same server than the - -# Testing - -```bash -curl -XPUT -d '{"range":{"startPulseId": 7281433214, "endPulseId": 7281489688}, "parameters":{"output_file":"test.h5"}}' http://localhost:10200/notify -``` diff --git a/epics-writer/conda-recipe/build.sh b/epics-writer/conda-recipe/build.sh deleted file mode 100644 index d7a34f9..0000000 --- a/epics-writer/conda-recipe/build.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env bash - -$PYTHON setup.py install # Python command to install the script diff --git a/epics-writer/conda-recipe/meta.yaml b/epics-writer/conda-recipe/meta.yaml deleted file mode 100644 index 8b464c1..0000000 --- a/epics-writer/conda-recipe/meta.yaml +++ /dev/null @@ -1,20 +0,0 @@ -package: - name: epics-writer - version: 0.0.1 - -source: - path: .. - -build: - noarch: python - entry_points: - - epics-writer = epics_writer.start:main - -requirements: - build: - - python - run: - - python - - data_api >=0.7.6 - - requests - - pika diff --git a/epics-writer/epics_writer/__init__.py b/epics-writer/epics_writer/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/epics-writer/epics_writer/start.py b/epics-writer/epics_writer/start.py deleted file mode 100644 index 3647d6d..0000000 --- a/epics-writer/epics_writer/start.py +++ /dev/null @@ -1,99 +0,0 @@ -import json -from pika import BlockingConnection, ConnectionParameters, BasicProperties - -from epics_writer.writer import write_epics_pvs - - -DEFAULT_BROKER_URL = "127.0.0.1" -REQUEST_EXCHANGE = "request" -STATUS_EXCHANGE = "status" -QUEUE_NAME = "epics" -OUTPUT_FILE_SUFFIX = ".PVCHANNELS.h5" - - -def update_status(channel, body, action, file, message=None): - status_header = { - "action": action, - "source": "epics_writer", - "routing_key": QUEUE_NAME, - "file": file, - "message": message - } - - channel.basic_publish(exchange=STATUS_EXCHANGE, - properties=BasicProperties( - headers=status_header), - routing_key="", - body=body) - - -def on_message(channel, method_frame, header_frame, body): - - output_file = None - - try: - request = json.loads(body.decode()) - output_prefix = request["output_prefix"] - start_pulse_id = 1 - stop_pulse_id = 10 - metadata = request["metadata"] - epics_pvs = request["epics_pvs"] - - output_file = output_prefix + OUTPUT_FILE_SUFFIX - - update_status(channel, body, "write_start", output_file) - - write_epics_pvs(output_file=output_file, - start_pulse_id=start_pulse_id, - stop_pulse_id=stop_pulse_id, - metadata=metadata, - epics_pvs=epics_pvs) - - except Exception as e: - channel.basic_reject(delivery_tag=method_frame.delivery_tag, - requeue=False) - - update_status(channel, body, "write_rejected", output_file, str(e)) - - else: - channel.basic_ack(delivery_tag=method_frame.delivery_tag) - - update_status(channel, body, "write_finished", output_file) - - -def connect_to_broker(broker_url): - connection = BlockingConnection(ConnectionParameters(broker_url)) - channel = connection.channel() - - channel.exchange_declare(exchange=STATUS_EXCHANGE, - exchange_type="fanout") - channel.exchange_declare(exchange=REQUEST_EXCHANGE, - exchange_type="topic") - - channel.queue_declare(queue=QUEUE_NAME, auto_delete=True) - channel.queue_bind(queue=QUEUE_NAME, - exchange=REQUEST_EXCHANGE, - routing_key="*.%s.*" % QUEUE_NAME) - channel.basic_qos(prefetch_count=1) - channel.basic_consume(QUEUE_NAME, on_message) - - try: - channel.start_consuming() - except KeyboardInterrupt: - channel.stop_consuming() - - -def main(): - import argparse - parser = argparse.ArgumentParser(description='Epics HDF5 writer') - parser.add_argument('--broker_url', dest='broker_url', - default=DEFAULT_BROKER_URL, - help='RabbitMQ broker URL') - - args = parser.parse_args() - - connect_to_broker(broker_url=args.broker_url) - - -if __name__ == '__main__': - main() diff --git a/epics-writer/epics_writer/writer.py b/epics-writer/epics_writer/writer.py deleted file mode 100644 index fe9bc06..0000000 --- a/epics-writer/epics_writer/writer.py +++ /dev/null @@ -1,120 +0,0 @@ -import datetime -import time -import logging -import requests -import data_api - -logger = logging.getLogger(__name__) - -DATA_API_QUERY_URL = "https://data-api.psi.ch/sf/query" - - -def write_epics_pvs(output_file, start_pulse_id, stop_pulse_id, metadata, epics_pvs): - - start_date = get_pulse_id_date_mapping(start_pulse_id) - stop_date = get_pulse_id_date_mapping(stop_pulse_id) - - data = get_data(epics_pvs, start=start_date, stop=stop_date) - # TODO: Merge metadata to data. - - if data: - logger.info("Persist data to hdf5 file") - data_api.to_hdf5(data, output_file, overwrite=True, compression=None, shuffle=False) - else: - logger.error("No data retrieved") - open(output_file + "_NO_DATA", 'a').close() - - -def get_data(channel_list, start=None, stop=None, base_url=None): - logger.info("Requesting range %s to %s for channels: " % (start, stop, channel_list)) - - query = {"range": {"startDate": datetime.datetime.isoformat(start), - "endDate": datetime.datetime.isoformat(stop), - "startExpansion": True}, - "channels": channel_list, - "fields": ["pulseId", "globalSeconds", "globalDate", "value", - "eventCount"]} - logger.debug(query) - - response = requests.post(DATA_API_QUERY_URL, json=query) - - # Check for successful return of data - if response.status_code != 200: - logger.info("Data retrievali failed, sleep for another time and try") - - itry = 0 - while itry < 5: - itry += 1 - time.sleep(60) - response = requests.post(DATA_API_QUERY_URL, json=query) - if response.status_code == 200: - break - - logger.info("Data retrieval failed, post attempt %d" % itry) - - if response.status_code != 200: - raise RuntimeError("Unable to retrieve data from server: ", response) - - logger.info("Data retieval is successful") - - data = response.json() - - return data_api.client._build_pandas_data_frame(data, index_field="globalDate") - - -def get_pulse_id_date_mapping(pulse_id): - # See https://jira.psi.ch/browse/ATEST-897 for more details ... - logger.info("Retrieve pulse-id/date mapping for pulse_id %s" % pulse_id) - - try: - - query = {"range": {"startPulseId": 0, - "endPulseId": pulse_id}, - "limit": 1, - "ordering": "desc", - "channels": ["SIN-CVME-TIFGUN-EVR0:BUNCH-1-OK"], - "fields": ["pulseId", "globalDate"]} - - for c in range(10): - - response = requests.post("https://data-api.psi.ch/sf/query", json=query) - - # Check for successful return of data - if response.status_code != 200: - raise RuntimeError("Unable to retrieve data from server: ", response) - - data = response.json() - - if len(data[0]["data"]) == 0 or not "pulseId" in data[0]["data"][0]: - raise RuntimeError( - "Didn't get good responce from data_api : %s " % data) - - if not pulse_id == data[0]["data"][0]["pulseId"]: - logger.info("retrieval failed") - if c == 0: - ref_date = data[0]["data"][0]["globalDate"] - ref_date = dateutil.parser.parse(ref_date) - - now_date = datetime.datetime.now() - now_date = pytz.timezone('Europe/Zurich').localize( - now_date) - - check_date = ref_date + datetime.timedelta( - seconds=24) # 20 seconds should be enough - delta_date = check_date - now_date - - s = delta_date.seconds - logger.info("retry in " + str(s) + " seconds ") - if not s <= 0: - time.sleep(s) - continue - - raise RuntimeError('Unable to retrieve mapping') - - date = data[0]["data"][0]["globalDate"] - date = dateutil.parser.parse(date) - dates.append(date) - break - - except Exception as e: - raise RuntimeError('Unable to retrieve pulse_id date mapping') from e diff --git a/epics-writer/setup.py b/epics-writer/setup.py deleted file mode 100644 index d4358df..0000000 --- a/epics-writer/setup.py +++ /dev/null @@ -1,15 +0,0 @@ -from setuptools import setup - -setup( - name="cadump", - version="0.0.12", - author="Paul Scherrer Institute", - author_email="daq@psi.ch", - description="Interface to dump data from archiver/databuffer", - packages=["cadump"], - entry_points={ - 'console_scripts': [ - 'cadump_server = cadump.cadump:main', - ], - } -) diff --git a/epics-writer/tests/__init__.py b/epics-writer/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/epics-writer/tests/channels.txt b/epics-writer/tests/channels.txt deleted file mode 100644 index 63868d2..0000000 --- a/epics-writer/tests/channels.txt +++ /dev/null @@ -1,6 +0,0 @@ -S10-CPCL-VM1MGC:LOAD - -ONE - TWO -# Comment that need to be removed -THREE # one more comment at the end that need to be removed diff --git a/epics-writer/tests/test_download_data.py b/epics-writer/tests/test_download_data.py deleted file mode 100644 index 87a7194..0000000 --- a/epics-writer/tests/test_download_data.py +++ /dev/null @@ -1,34 +0,0 @@ -import unittest -from unittest import TestCase -from cadump import cadump -import logging - -class TestDownloadData(TestCase): - - def test_download_data(self): - config = { - 'range': { - 'startPulseId': 9618913001, - 'endPulseId': 9618923000 - }, - - 'parameters': { - 'general/created': 'test', - 'general/user': 'tester', - 'general/process': 'test_process', - 'general/instrument': 'mac', - 'output_file': 'test.h5'} # this is usually the full path - } - - cadump.base_url = "https://data-api.psi.ch/sf" - cadump.download_data(config) - # self.fail() - - def test_read_channels(self): - channels = cadump.read_channels("channels.txt") - logging.info(channels) - self.assertEqual(len(channels), 4) - - -if __name__ == '__main__': - unittest.main() diff --git a/jf-live-daq/CMakeLists.txt b/jf-live-daq/CMakeLists.txt deleted file mode 100644 index f2763dc..0000000 --- a/jf-live-daq/CMakeLists.txt +++ /dev/null @@ -1,26 +0,0 @@ - -find_package(MPI REQUIRED) -# Because of openmpi. -add_definitions(-DOMPI_SKIP_MPICXX) - -file(GLOB SOURCES - src/*.cpp) - -add_library(jf-live-daq-lib STATIC ${SOURCES}) -target_include_directories(jf-live-daq-lib - PUBLIC include/ - SYSTEM ${MPI_INCLUDE_PATH}) - -target_link_libraries(jf-live-daq-lib - external - core-buffer-lib - ${MPI_LIBRARIES}) - -add_executable(jf-live-daq src/main.cpp) -set_target_properties(jf-live-daq PROPERTIES OUTPUT_NAME jf_live_daq) -target_link_libraries(jf-live-daq - jf-live-daq-lib - ) - -enable_testing() -add_subdirectory(test/) \ No newline at end of file diff --git a/jf-live-daq/src/main.cpp b/jf-live-daq/src/main.cpp deleted file mode 100644 index 6069a0b..0000000 --- a/jf-live-daq/src/main.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include -#include - -void receive() -{ - -} - -void assemble() -{ - -} - -void write() -{ - -} - -int main(int argc, char** argv) -{ - // Initialize the MPI environment - MPI_Init(NULL, NULL); - - // Get the number of processes - int world_size; - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - - // Get the rank of the process - int world_rank; - MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); - - // Get the name of the processor - char processor_name[MPI_MAX_PROCESSOR_NAME]; - int name_len; - MPI_Get_processor_name(processor_name, &name_len); - - const int n_modules = 16; - - if (world_rank == 0) { - assemble(); - } else if (world_rank <= n_modules) { - receive(); - } else { - write(); - } - - // Finalize the MPI environment. - MPI_Finalize(); -} diff --git a/jf-live-daq/test/CMakeLists.txt b/jf-live-daq/test/CMakeLists.txt deleted file mode 100644 index 7dc93bb..0000000 --- a/jf-live-daq/test/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -add_executable(jf-live-daq-tests main.cpp) - -target_link_libraries(jf-live-daq-tests - jf-live-daq-lib - hdf5 - hdf5_hl - hdf5_cpp - zmq - gtest - ) diff --git a/jf-live-daq/test/main.cpp b/jf-live-daq/test/main.cpp deleted file mode 100644 index 1ea4d8a..0000000 --- a/jf-live-daq/test/main.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "gtest/gtest.h" - - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/jf-live-writer/CMakeLists.txt b/jf-live-writer/CMakeLists.txt deleted file mode 100644 index 6028bfe..0000000 --- a/jf-live-writer/CMakeLists.txt +++ /dev/null @@ -1,22 +0,0 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(jf-live-writer-lib STATIC ${SOURCES}) -target_include_directories(jf-live-writer-lib PUBLIC include/) -target_link_libraries(jf-live-writer-lib - external - core-buffer-lib) - -add_executable(jf-live-writer src/main.cpp) -set_target_properties(jf-live-writer PROPERTIES OUTPUT_NAME jf_live_writer) -target_link_libraries(jf-live-writer - jf-live-writer-lib - sf-writer-lib - hdf5 - hdf5_hl - hdf5_cpp - pthread - ) - -enable_testing() -add_subdirectory(test/) \ No newline at end of file diff --git a/jf-live-writer/include/BinaryReader.hpp b/jf-live-writer/include/BinaryReader.hpp deleted file mode 100644 index 85d2a0c..0000000 --- a/jf-live-writer/include/BinaryReader.hpp +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BINARYREADER_HPP -#define SF_DAQ_BUFFER_BINARYREADER_HPP - - -#include - -class BinaryReader { - - const std::string detector_folder_; - const std::string module_name_; - - std::string current_input_file_; - int input_file_fd_; - - void open_file(const std::string& filename); - void close_current_file(); - -public: - BinaryReader(const std::string &detector_folder, - const std::string &module_name); - - ~BinaryReader(); - - void get_frame(const uint64_t pulse_id, BufferBinaryFormat *buffer); -}; - - -#endif //SF_DAQ_BUFFER_BINARYREADER_HPP diff --git a/jf-live-writer/include/JFH5LiveWriter.hpp b/jf-live-writer/include/JFH5LiveWriter.hpp deleted file mode 100644 index a417631..0000000 --- a/jf-live-writer/include/JFH5LiveWriter.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef SFWRITER_HPP -#define SFWRITER_HPP - -#include -#include -#include - -#include "LiveImageAssembler.hpp" - -const auto& H5_UINT64 = H5::PredType::NATIVE_UINT64; -const auto& H5_UINT32 = H5::PredType::NATIVE_UINT32; -const auto& H5_UINT16 = H5::PredType::NATIVE_UINT16; -const auto& H5_UINT8 = H5::PredType::NATIVE_UINT8; - -class JFH5LiveWriter { - - const std::string detector_name_; - const size_t n_modules_; - const size_t n_pulses_; - - size_t write_index_; - - H5::H5File file_; - H5::DataSet image_dataset_; - - uint64_t* b_pulse_id_; - uint64_t* b_frame_index_; - uint32_t* b_daq_rec_; - uint8_t* b_is_good_frame_ ; - - void init_file(const std::string &output_file); - void write_dataset(const std::string name, - const void *buffer, - const H5::PredType &type); - void write_metadata(); - std::string get_detector_name(const std::string& detector_folder); - - void close_file(); - -public: - JFH5LiveWriter(const std::string& output_file, - const std::string& detector_folder, - const size_t n_modules, - const size_t n_pulses); - ~JFH5LiveWriter(); - void write(const ImageMetadata* metadata, const char* data); -}; - -#endif //SFWRITER_HPP diff --git a/jf-live-writer/include/LiveImageAssembler.hpp b/jf-live-writer/include/LiveImageAssembler.hpp deleted file mode 100644 index 5bcb749..0000000 --- a/jf-live-writer/include/LiveImageAssembler.hpp +++ /dev/null @@ -1,51 +0,0 @@ -#ifndef SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP -#define SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP - -#include - -#include "buffer_config.hpp" -#include "formats.hpp" - -const uint64_t IA_EMPTY_SLOT_VALUE = 0; - -struct ImageMetadata -{ - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint8_t is_good_image; -}; - -class LiveImageAssembler { - const size_t n_modules_; - const size_t image_buffer_slot_n_bytes_; - - char* image_buffer_; - ImageMetadata* image_meta_buffer_; - ModuleFrame* frame_meta_buffer_; - std::atomic_int* buffer_status_; - std::atomic_uint64_t* buffer_pulse_id_; - - size_t get_data_offset(const uint64_t slot_id, const int i_module); - size_t get_frame_metadata_offset(const uint64_t slot_id, const int i_module); - -public: - LiveImageAssembler(const size_t n_modules); - - virtual ~LiveImageAssembler(); - - bool is_slot_free(const uint64_t pulse_id); - bool is_slot_full(const uint64_t pulse_id); - - void process(const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* block_buffer); - - void free_slot(const uint64_t pulse_id); - - ImageMetadata* get_metadata_buffer(const uint64_t pulse_id); - char* get_data_buffer(const uint64_t pulse_id); -}; - - -#endif //SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP diff --git a/jf-live-writer/include/live_writer_config.hpp b/jf-live-writer/include/live_writer_config.hpp deleted file mode 100644 index 0a62457..0000000 --- a/jf-live-writer/include/live_writer_config.hpp +++ /dev/null @@ -1,9 +0,0 @@ -#include - -namespace live_writer_config -{ - // MS to retry reading from the image assembler. - const size_t ASSEMBLER_RETRY_MS = 5; - // Number of slots in the reconstruction buffer. - const size_t WRITER_IA_N_SLOTS = 200; -} \ No newline at end of file diff --git a/jf-live-writer/src/BinaryReader.cpp b/jf-live-writer/src/BinaryReader.cpp deleted file mode 100644 index 0512ac7..0000000 --- a/jf-live-writer/src/BinaryReader.cpp +++ /dev/null @@ -1,102 +0,0 @@ -#include "BinaryReader.hpp" - -#include -#include -#include -#include - -#include "BufferUtils.hpp" -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -BinaryReader::BinaryReader( - const std::string &detector_folder, - const std::string &module_name) : - detector_folder_(detector_folder), - module_name_(module_name), - current_input_file_(""), - input_file_fd_(-1) -{} - -BinaryReader::~BinaryReader() -{ - close_current_file(); -} - -void BinaryReader::get_frame( - const uint64_t pulse_id, BufferBinaryFormat* buffer) -{ - - auto current_frame_file = BufferUtils::get_filename( - detector_folder_, module_name_, pulse_id); - - if (current_frame_file != current_input_file_) { - open_file(current_frame_file); - } - - size_t file_index = BufferUtils::get_file_frame_index(pulse_id); - size_t n_bytes_offset = file_index * sizeof(BufferBinaryFormat); - - auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET); - if (lseek_result < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_frame]"; - err_msg << " Error while lseek on file "; - err_msg << current_input_file_ << " for n_bytes_offset "; - err_msg << n_bytes_offset << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - auto n_bytes = ::read(input_file_fd_, buffer, sizeof(BufferBinaryFormat)); - - if (n_bytes < sizeof(BufferBinaryFormat)) { - stringstream err_msg; - - err_msg << "[BinaryReader::get_block]"; - err_msg << " Error while reading from file "; - err_msg << current_input_file_ << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } -} - -void BinaryReader::open_file(const std::string& filename) -{ - close_current_file(); - - input_file_fd_ = open(filename.c_str(), O_RDONLY); - - if (input_file_fd_ < 0) { - stringstream err_msg; - - err_msg << "[BinaryReader::open_file]"; - err_msg << " Cannot open file " << filename << ": "; - err_msg << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - current_input_file_ = filename; -} - -void BinaryReader::close_current_file() -{ - if (input_file_fd_ != -1) { - if (close(input_file_fd_) < 0) { - stringstream err_msg; - - err_msg << "[BinaryWriter::close_current_file]"; - err_msg << " Error while closing file " << current_input_file_; - err_msg << ": " << strerror(errno) << endl; - - throw runtime_error(err_msg.str()); - } - - input_file_fd_ = -1; - current_input_file_ = ""; - } -} diff --git a/jf-live-writer/src/JFH5LiveWriter.cpp b/jf-live-writer/src/JFH5LiveWriter.cpp deleted file mode 100644 index 5928a6e..0000000 --- a/jf-live-writer/src/JFH5LiveWriter.cpp +++ /dev/null @@ -1,133 +0,0 @@ -#include "JFH5LiveWriter.hpp" - -#include -#include - - -#include "buffer_config.hpp" - -using namespace std; -using namespace buffer_config; - -JFH5LiveWriter::JFH5LiveWriter(const string& output_file, - const string& detector_folder, - const size_t n_modules, - const size_t n_pulses) : - detector_name_(get_detector_name(detector_folder)), - n_modules_(n_modules), - n_pulses_(n_pulses), - write_index_(0) -{ - b_pulse_id_ = new uint64_t[n_pulses_]; - b_frame_index_= new uint64_t[n_pulses_]; - b_daq_rec_ = new uint32_t[n_pulses_]; - b_is_good_frame_ = new uint8_t[n_pulses_]; - - init_file(output_file); -} - -void JFH5LiveWriter::init_file(const string& output_file) -{ - file_ = H5::H5File(output_file, H5F_ACC_TRUNC); - file_.createGroup("/data"); - file_.createGroup("/data/" + detector_name_); - - H5::DataSpace att_space(H5S_SCALAR); - H5::DataType data_type = H5::StrType(0, H5T_VARIABLE); - - file_.createGroup("/general"); - auto detector_dataset = file_.createDataSet( - "/general/detector_name", data_type ,att_space); - - detector_dataset.write(detector_name_, data_type); - - hsize_t image_dataset_dims[3] = - {n_pulses_, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - - H5::DataSpace image_dataspace(3, image_dataset_dims); - - hsize_t image_dataset_chunking[3] = - {1, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE}; - H5::DSetCreatPropList image_dataset_properties; - image_dataset_properties.setChunk(3, image_dataset_chunking); - - image_dataset_ = file_.createDataSet( - "/data/" + detector_name_ + "/data", - H5_UINT16, - image_dataspace, - image_dataset_properties); -} - - -std::string JFH5LiveWriter::get_detector_name(const string& detector_folder) -{ - size_t last_separator; - if ((last_separator = detector_folder.rfind("/")) == string::npos) { - return detector_folder; - } - - return detector_folder.substr(last_separator + 1); -} - -JFH5LiveWriter::~JFH5LiveWriter() -{ - close_file(); - - delete[] b_pulse_id_; - delete[] b_frame_index_; - delete[] b_daq_rec_; - delete[] b_is_good_frame_; -} - -void JFH5LiveWriter::write_dataset( - const string name, const void* buffer, const H5::PredType& type) -{ - hsize_t b_m_dims[] = {n_pulses_}; - H5::DataSpace b_m_space (1, b_m_dims); - - hsize_t f_m_dims[] = {n_pulses_, 1}; - H5::DataSpace f_m_space(2, f_m_dims); - - auto complete_name = "/data/" + detector_name_ + "/" + name; - auto dataset = file_.createDataSet(complete_name, type, f_m_space); - - dataset.write(buffer, type, b_m_space, f_m_space); - - dataset.close(); -} - -void JFH5LiveWriter::write_metadata() -{ - write_dataset("pulse_id", &b_pulse_id_, H5_UINT64); - write_dataset("frame_index", &b_frame_index_, H5_UINT64); - write_dataset("daq_rec", &b_daq_rec_, H5_UINT32); - write_dataset("is_good_frame", &b_is_good_frame_, H5_UINT8); -} - -void JFH5LiveWriter::close_file() -{ - if (file_.getId() == -1) { - return; - } - - image_dataset_.close(); - - write_metadata(); - - file_.close(); -} - -void JFH5LiveWriter::write(const ImageMetadata* metadata, const char* data) -{ - hsize_t offset[] = {write_index_, 0, 0}; - - H5DOwrite_chunk(image_dataset_.getId(), H5P_DEFAULT, 0, - offset, MODULE_N_BYTES * n_modules_, data); - - b_pulse_id_[write_index_] = metadata->pulse_id; - b_frame_index_[write_index_] = metadata->frame_index; - b_daq_rec_[write_index_] = metadata->daq_rec; - b_is_good_frame_[write_index_] = metadata->is_good_image; - - write_index_++; -} diff --git a/jf-live-writer/src/LiveImageAssembler.cpp b/jf-live-writer/src/LiveImageAssembler.cpp deleted file mode 100644 index 57cf48b..0000000 --- a/jf-live-writer/src/LiveImageAssembler.cpp +++ /dev/null @@ -1,159 +0,0 @@ -#include - -#include "LiveImageAssembler.hpp" -#include "buffer_config.hpp" -#include "live_writer_config.hpp" - -using namespace std; -using namespace buffer_config; -using namespace live_writer_config; - -LiveImageAssembler::LiveImageAssembler(const size_t n_modules) : - n_modules_(n_modules), - image_buffer_slot_n_bytes_(MODULE_N_BYTES * n_modules_) -{ - image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_]; - image_meta_buffer_ = new ImageMetadata[WRITER_IA_N_SLOTS]; - frame_meta_buffer_ = new ModuleFrame[WRITER_IA_N_SLOTS * n_modules]; - buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS]; - buffer_pulse_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS]; - - for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) { - free_slot(i); - } -} - -LiveImageAssembler::~LiveImageAssembler() -{ - delete[] image_buffer_; - delete[] image_meta_buffer_; -} - -bool LiveImageAssembler::is_slot_free(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - uint64_t slot_pulse_id = IA_EMPTY_SLOT_VALUE; - if (buffer_pulse_id_[slot_id].compare_exchange_strong( - slot_pulse_id, pulse_id)) { - return true; - } - - auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0; - return is_free && (slot_pulse_id == pulse_id); -} - -bool LiveImageAssembler::is_slot_full(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return buffer_status_[slot_id].load(memory_order_relaxed) == 0; -} - -size_t LiveImageAssembler::get_data_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_i_offset = slot_id * image_buffer_slot_n_bytes_; - size_t module_i_offset = i_module * MODULE_N_BYTES; - - return slot_i_offset + module_i_offset; -} - -size_t LiveImageAssembler::get_frame_metadata_offset( - const uint64_t slot_id, const int i_module) -{ - size_t slot_m_offset = slot_id * n_modules_; - size_t module_m_offset = i_module; - - return slot_m_offset + module_m_offset; -} - -void LiveImageAssembler::process( - const uint64_t pulse_id, - const int i_module, - const BufferBinaryFormat* file_buffer) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, i_module); - auto image_offset = get_data_offset(slot_id, i_module); - - memcpy( - &(frame_meta_buffer_[frame_meta_offset]), - &(file_buffer->metadata), - sizeof(file_buffer->metadata)); - - memcpy( - image_buffer_ + image_offset, - &(file_buffer->data[0]), - MODULE_N_BYTES); - - buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed); -} - -void LiveImageAssembler::free_slot(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - buffer_status_[slot_id].store(n_modules_, memory_order_relaxed); - buffer_pulse_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed); -} - -ImageMetadata* LiveImageAssembler::get_metadata_buffer(const uint64_t pulse_id) -{ - const auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - - ImageMetadata& image_meta = image_meta_buffer_[slot_id]; - - auto frame_meta_offset = get_frame_metadata_offset(slot_id, 0); - - auto is_pulse_init = false; - image_meta.is_good_image = 1; - image_meta.pulse_id = 0; - - for (size_t i_module=0; i_module < n_modules_; i_module++) { - - auto& frame_meta = frame_meta_buffer_[frame_meta_offset]; - frame_meta_offset += 1; - - auto is_good_frame = - frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME; - - if (!is_good_frame) { - image_meta.pulse_id = 0; - continue; - } - - if (!is_pulse_init) { - image_meta.pulse_id = frame_meta.pulse_id; - image_meta.frame_index = frame_meta.frame_index; - image_meta.daq_rec = frame_meta.daq_rec; - - is_pulse_init = true; - } - - if (image_meta.is_good_image == 1) { - if (frame_meta.pulse_id != image_meta.pulse_id) { - image_meta.is_good_image = 0; - } - - if (frame_meta.frame_index != image_meta.frame_index) { - image_meta.is_good_image = 0; - } - - if (frame_meta.daq_rec != image_meta.daq_rec) { - image_meta.is_good_image = 0; - } - - if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) { - image_meta.is_good_image = 0; - } - } - } - - return &image_meta; -} - -char* LiveImageAssembler::get_data_buffer(const uint64_t pulse_id) -{ - auto slot_id = pulse_id % WRITER_IA_N_SLOTS; - return image_buffer_ + (slot_id * image_buffer_slot_n_bytes_); -} diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp deleted file mode 100644 index 139a34f..0000000 --- a/jf-live-writer/src/main.cpp +++ /dev/null @@ -1,195 +0,0 @@ -#include -#include -#include -#include -#include - -#include "zmq.h" -#include "live_writer_config.hpp" -#include "buffer_config.hpp" -#include "bitshuffle/bitshuffle.h" -#include "JFH5LiveWriter.hpp" -#include "LiveImageAssembler.hpp" -#include "BinaryReader.hpp" - -using namespace std; -using namespace chrono; -using namespace buffer_config; -using namespace live_writer_config; - -void read_buffer( - const string detector_folder, - const string module_name, - const int i_module, - const vector& pulse_ids_to_write, - LiveImageAssembler& image_assembler, - void* ctx) -{ - BinaryReader reader(detector_folder, module_name); - auto frame_buffer = new BufferBinaryFormat(); - - void* socket = zmq_socket(ctx, ZMQ_SUB); - if (socket == nullptr) { - throw runtime_error(zmq_strerror(errno)); - } - - int rcvhwm = 100; - if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - int linger = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - // In milliseconds. - int rcvto = 2000; - if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){ - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const uint64_t PULSE_ID_DELAY = 100; - - uint64_t live_pulse_id = pulse_ids_to_write.front(); - for (uint64_t pulse_id:pulse_ids_to_write) { - - while(!image_assembler.is_slot_free(pulse_id)) { - this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS)); - } - - auto start_time = steady_clock::now(); - - // Enforce a delay of 1 second for writing. - while (live_pulse_id - pulse_id < PULSE_ID_DELAY) { - if (zmq_recv(socket, &live_pulse_id, - sizeof(live_pulse_id), 0) == -1) { - if (errno == EAGAIN) { - throw runtime_error("Did not receive pulse_id in time."); - } else { - throw runtime_error(zmq_strerror(errno)); - } - } - } - - reader.get_frame(pulse_id, frame_buffer); - - auto end_time = steady_clock::now(); - uint64_t read_us_duration = duration_cast( - end_time-start_time).count(); - - start_time = steady_clock::now(); - - image_assembler.process(pulse_id, i_module, frame_buffer); - - end_time = steady_clock::now(); - uint64_t compose_us_duration = duration_cast( - end_time-start_time).count(); - - cout << "sf_writer:avg_read_us "; - cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; - cout << "sf_writer:avg_assemble_us "; - cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - delete frame_buffer; -} - -int main (int argc, char *argv[]) -{ - if (argc != 7) { - cout << endl; - cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]"; - cout << " [start_pulse_id] [n_pulses] [pulse_id_step]"; - cout << endl; - cout << "\toutput_file: Complete path to the output file." << endl; - cout << "\tdetector_folder: Absolute path to detector buffer." << endl; - cout << "\tn_modules: number of modules" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tn_pulses: Number of pulses to write." << endl; - cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl; - cout << endl; - - exit(-1); - } - - string output_file = string(argv[1]); - const string detector_folder = string(argv[2]); - size_t n_modules = atoi(argv[3]); - uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); - size_t n_pulses = (size_t) atoll(argv[5]); - int pulse_id_step = atoi(argv[6]); - - std::vector pulse_ids_to_write; - uint64_t i_pulse_id = start_pulse_id; - for (size_t i=0; i reading_threads(n_modules); - for (size_t i_module=0; i_module( - end_time-start_time).count(); - - image_assembler.free_slot(pulse_id); - - cout << "sf_writer:avg_write_us "; - cout << write_us_duration / BUFFER_BLOCK_SIZE << endl; - } - - for (auto& reading_thread : reading_threads) { - if (reading_thread.joinable()) { - reading_thread.join(); - } - } - - return 0; -} diff --git a/jf-live-writer/test/CMakeLists.txt b/jf-live-writer/test/CMakeLists.txt deleted file mode 100644 index 1079fc2..0000000 --- a/jf-live-writer/test/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -add_executable(jf-live-writer-tests main.cpp) - -target_link_libraries(jf-live-writer-tests - jf-live-writer-lib - hdf5 - hdf5_hl - hdf5_cpp - zmq - gtest - ) diff --git a/jf-live-writer/test/main.cpp b/jf-live-writer/test/main.cpp deleted file mode 100644 index 69b7f53..0000000 --- a/jf-live-writer/test/main.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include "gtest/gtest.h" - -#include "test_BinaryReader.cpp" - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/jf-live-writer/test/test_BinaryReader.cpp b/jf-live-writer/test/test_BinaryReader.cpp deleted file mode 100644 index cc30157..0000000 --- a/jf-live-writer/test/test_BinaryReader.cpp +++ /dev/null @@ -1,10 +0,0 @@ -#include -#include "gtest/gtest.h" - -TEST(BinaryReader, basic_interaction) { - // TODO: Write some real tests. - auto detector_folder = "test_device"; - auto module_name = "M1"; - BinaryReader reader(detector_folder, module_name); -} - diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index cf1b1aa..78cf055 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -59,27 +59,6 @@ ZmqLiveSender::ZmqLiveSender( throw runtime_error(zmq_strerror(errno)); } } - - if (false) { - socket_pulse_ = zmq_socket(ctx, ZMQ_PUB); - - if (zmq_bind(socket_pulse_, config.pulse_address.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const int sndhwm = PULSE_ZMQ_SNDHWM; - if (zmq_setsockopt( - socket_pulse_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const int linger = 0; - if (zmq_setsockopt( - socket_pulse_, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } - } ZmqLiveSender::~ZmqLiveSender() @@ -121,10 +100,6 @@ void ZmqLiveSender::send(const ModuleFrameBuffer *meta, const char *data) } } -// if(zmq_send(socket_pulse_, &pulse_id, sizeof(pulse_id), 0) == -1) { -// throw runtime_error(zmq_strerror(errno)); -// } - // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) header.AddMember("frame", frame_index, header_alloc);