From 90534583a86ffaafb35d8b6bdd46bc8565c35e30 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 27 May 2020 18:25:33 +0200 Subject: [PATCH] Remove sf_replay as its not needed anymore --- CMakeLists.txt | 1 - sf-replay/CMakeLists.txt | 21 ---- sf-replay/include/BlockZmqSender.hpp | 23 ----- sf-replay/src/BlockZmqSender.cpp | 47 --------- sf-replay/src/main.cpp | 137 ------------------------- sf-replay/test/CMakeLists.txt | 10 -- sf-replay/test/main.cpp | 9 -- sf-replay/test/test_ReplayH5Reader.cpp | 73 ------------- 8 files changed, 321 deletions(-) delete mode 100644 sf-replay/CMakeLists.txt delete mode 100644 sf-replay/include/BlockZmqSender.hpp delete mode 100644 sf-replay/src/BlockZmqSender.cpp delete mode 100644 sf-replay/src/main.cpp delete mode 100644 sf-replay/test/CMakeLists.txt delete mode 100644 sf-replay/test/main.cpp delete mode 100644 sf-replay/test/test_ReplayH5Reader.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index aed6e98..44e8243 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -30,6 +30,5 @@ add_subdirectory( add_subdirectory("core-buffer") add_subdirectory("sf-buffer") -add_subdirectory("sf-replay") add_subdirectory("sf-stream") add_subdirectory("sf-writer") \ No newline at end of file diff --git a/sf-replay/CMakeLists.txt b/sf-replay/CMakeLists.txt deleted file mode 100644 index 51b15c6..0000000 --- a/sf-replay/CMakeLists.txt +++ /dev/null @@ -1,21 +0,0 @@ -file(GLOB SOURCES - src/*.cpp) - -add_library(sf-replay-lib STATIC ${SOURCES}) -target_include_directories(sf-replay-lib PUBLIC include/) -target_link_libraries(sf-replay-lib - external - core-buffer-lib) - -add_executable(sf-replay src/main.cpp) -set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) -target_link_libraries(sf-replay - core-buffer-lib - sf-replay-lib - zmq - hdf5 - hdf5_cpp - pthread) - -enable_testing() -add_subdirectory(test/) \ No newline at end of file diff --git a/sf-replay/include/BlockZmqSender.hpp b/sf-replay/include/BlockZmqSender.hpp deleted file mode 100644 index e4eb813..0000000 --- a/sf-replay/include/BlockZmqSender.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef SF_DAQ_BUFFER_BLOCKZMQSENDER_HPP -#define SF_DAQ_BUFFER_BLOCKZMQSENDER_HPP - -#include -#include -#include - -class BlockZmqSender { - - void* ctx_; - void* socket_; - -public: - BlockZmqSender(const std::string& ipc_id, const int source_id); - virtual ~BlockZmqSender(); - - void close(); - - void send(const BufferBinaryBlock* block_data); -}; - - -#endif //SF_DAQ_BUFFER_BLOCKZMQSENDER_HPP diff --git a/sf-replay/src/BlockZmqSender.cpp b/sf-replay/src/BlockZmqSender.cpp deleted file mode 100644 index fc6667f..0000000 --- a/sf-replay/src/BlockZmqSender.cpp +++ /dev/null @@ -1,47 +0,0 @@ -#include "BlockZmqSender.hpp" - -#include -#include - -#include "buffer_config.hpp" - -using namespace std; -using namespace core_buffer; - - -BlockZmqSender::BlockZmqSender(const string& ipc_id, const int source_id) -{ - auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-"; - stringstream ipc_stream; - ipc_stream << ipc_base << source_id; - const auto ipc_address = ipc_stream.str(); - - ctx_ = zmq_ctx_new(); - socket_ = zmq_socket(ctx_, ZMQ_PUSH); - - const int sndhwm = REPLAY_SNDHWM; - if (zmq_setsockopt(socket_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) - throw runtime_error(zmq_strerror (errno)); - - const int linger_ms = -1; - if (zmq_setsockopt(socket_, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) - throw runtime_error(zmq_strerror (errno)); - - if (zmq_bind(socket_, ipc_address.c_str()) != 0) - throw runtime_error(zmq_strerror (errno)); -} - -BlockZmqSender::~BlockZmqSender() -{ - close(); -} - -void BlockZmqSender::close() { - zmq_close(socket_); - zmq_ctx_destroy(ctx_); -} - -void BlockZmqSender::send(const BufferBinaryBlock* block_data) -{ - zmq_send(socket_, block_data, sizeof(BufferBinaryBlock), ZMQ_SNDMORE); -} diff --git a/sf-replay/src/main.cpp b/sf-replay/src/main.cpp deleted file mode 100644 index 5b96e46..0000000 --- a/sf-replay/src/main.cpp +++ /dev/null @@ -1,137 +0,0 @@ -#include -#include - -#include "FastQueue.hpp" -#include "buffer_config.hpp" -#include "BufferBinaryReader.hpp" -#include "BlockZmqSender.hpp" - -using namespace std; -using namespace core_buffer; -using namespace chrono; - -void sf_replay ( - const string device, - const string channel_name, - FastQueue& queue, - const uint64_t start_pulse_id, - const uint64_t stop_pulse_id - ) -{ - BufferBinaryReader block_reader(device, channel_name); - - uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE; - uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE; - - // "<= stop_block" because we include the stop_block in the transfer. - for (uint64_t curr_block=start_block; - curr_block <= stop_block; - curr_block++) { - - int slot_id; - while((slot_id = queue.reserve()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - auto start_time = steady_clock::now(); - - auto block_buffer = queue.get_metadata_buffer(slot_id); - - block_reader.get_block(curr_block, block_buffer); - - auto end_time = steady_clock::now(); - uint64_t read_us_duration = duration_cast( - end_time-start_time).count(); - - queue.commit(); - - // TODO: Proper statistics - cout << "sf_replay:avg_read_us "; - cout << read_us_duration / BUFFER_BLOCK_SIZE << endl; - } -} - -int main (int argc, char *argv[]) { - - if (argc != 7) { - cout << endl; - cout << "Usage: sf_replay [ipc_id] [device]"; - cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]"; - cout << endl; - cout << "\tdevice: Name of detector." << endl; - cout << "\tchannel_name: M00-M31 for JF16M." << endl; - cout << "\tsource_id: Module index" << endl; - cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl; - cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl; - cout << endl; - - exit(-1); - } - - const string ipc_id = string(argv[1]); - const string device = string(argv[2]); - const string channel_name = string(argv[3]); - const auto source_id = atoi(argv[4]); - const auto start_pulse_id = (uint64_t) atoll(argv[5]); - const auto stop_pulse_id = (uint64_t) atoll(argv[6]); - - // 0 bytes for data since everything is in the header. - FastQueue queue(0, REPLAY_FASTQUEUE_N_SLOTS); - - thread file_read_thread(sf_replay, - device, channel_name, ref(queue), - start_pulse_id, stop_pulse_id); - - uint64_t send_us = 0; - uint64_t max_send_us = 0; - uint64_t n_stats = 0; - - BlockZmqSender sender(ipc_id, source_id); - - uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE; - uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE; - - // "<= stop_block" because we include the stop_block in the transfer. - for (uint64_t curr_block=start_block; - curr_block <= stop_block; - curr_block++) { - - int slot_id; - while((slot_id = queue.read()) == -1) { - this_thread::sleep_for(chrono::milliseconds( - RB_READ_RETRY_INTERVAL_MS)); - } - - auto block_buffer = queue.get_metadata_buffer(slot_id); - - auto start_time = steady_clock::now(); - - sender.send(block_buffer); - - auto end_time = steady_clock::now(); - uint64_t send_us_duration = - duration_cast(end_time-start_time).count(); - - queue.release(); - - // TODO: Proper statistics - n_stats++; - - send_us += send_us_duration; - max_send_us = max(max_send_us, send_us_duration); - - if (n_stats == STATS_MODULO) { - cout << "sf_replay:avg_send_us " << send_us / STATS_MODULO; - cout << " sf_replay:max_send_us " << max_send_us; - cout << endl; - - n_stats = 0; - send_us = 0; - max_send_us = 0; - } - } - - file_read_thread.join(); - return 0; -} diff --git a/sf-replay/test/CMakeLists.txt b/sf-replay/test/CMakeLists.txt deleted file mode 100644 index 45588cf..0000000 --- a/sf-replay/test/CMakeLists.txt +++ /dev/null @@ -1,10 +0,0 @@ -add_executable(sf-replay-tests main.cpp) - -target_link_libraries(sf-replay-tests - core-buffer-lib - sf-buffer-lib - sf-replay-lib - hdf5 - hdf5_cpp - gtest - ) diff --git a/sf-replay/test/main.cpp b/sf-replay/test/main.cpp deleted file mode 100644 index d40a5df..0000000 --- a/sf-replay/test/main.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "gtest/gtest.h" -#include "test_ReplayH5Reader.cpp" - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/sf-replay/test/test_ReplayH5Reader.cpp b/sf-replay/test/test_ReplayH5Reader.cpp deleted file mode 100644 index 17be0a0..0000000 --- a/sf-replay/test/test_ReplayH5Reader.cpp +++ /dev/null @@ -1,73 +0,0 @@ -#include -#include - -#include "ReplayH5Reader.hpp" -#include "BufferH5Writer.hpp" - -using namespace std; -using namespace core_buffer; - -TEST(ReplayH5Reader, basic_interaction) -{ - auto root_folder = "."; - auto device_name = "fast_device"; - size_t pulse_id = 65; - uint16_t source_id = 124; - - // This 2 must be compatible by design. - BufferH5Writer writer(root_folder, device_name); - ReplayH5Reader reader(root_folder, device_name); - - ModuleFrame w_metadata; - ModuleFrame* r_metadata; - auto w_frame_buffer = make_unique(MODULE_N_PIXELS); - char* r_frame_buffer; - - // Setup test values. - w_metadata.pulse_id = pulse_id; - w_metadata.frame_index = 2; - w_metadata.daq_rec = 3; - w_metadata.n_received_packets = 128; - w_metadata.module_id = source_id; - - for (size_t i=0; ipulse_id, pulse_id); - ASSERT_EQ(r_metadata->module_id, source_id); - ASSERT_EQ(r_metadata->frame_index, 2); - ASSERT_EQ(r_metadata->daq_rec, 3); - ASSERT_EQ(r_metadata->n_received_packets, 128); - - // Data as well. - auto offset = MODULE_N_PIXELS * (pulse_id-1); - for (size_t i=0; ipulse_id, 0); - ASSERT_EQ(r_metadata->frame_index, 0); - ASSERT_EQ(r_metadata->daq_rec, 0); - ASSERT_EQ(r_metadata->n_received_packets, 0); - ASSERT_EQ(r_metadata->module_id, 0); - } - - reader.close_file(); -}