From c7185f249b510b5ab81c648e3abd3eff1a245c83 Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Fri, 17 Jul 2020 11:20:50 +0200 Subject: [PATCH] Cleanup sf_streamer --- sf-stream/include/FastQueue.hpp | 40 ------- sf-stream/include/LiveRecvModule.hpp | 34 ------ sf-stream/src/FastQueue.cpp | 107 ------------------ sf-stream/src/LiveRecvModule.cpp | 71 ------------ sf-stream/src/main.cpp | 2 +- sf-stream/test/main.cpp | 2 - sf-stream/test/test_FastQueue.cpp | 150 ------------------------- sf-stream/test/test_LiveRecvModule.cpp | 87 -------------- 8 files changed, 1 insertion(+), 492 deletions(-) delete mode 100644 sf-stream/include/FastQueue.hpp delete mode 100644 sf-stream/include/LiveRecvModule.hpp delete mode 100644 sf-stream/src/FastQueue.cpp delete mode 100644 sf-stream/src/LiveRecvModule.cpp delete mode 100644 sf-stream/test/test_FastQueue.cpp delete mode 100644 sf-stream/test/test_LiveRecvModule.cpp diff --git a/sf-stream/include/FastQueue.hpp b/sf-stream/include/FastQueue.hpp deleted file mode 100644 index 916b838..0000000 --- a/sf-stream/include/FastQueue.hpp +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef FASTQUEUE_HPP -#define FASTQUEUE_HPP - -#include -#include -#include - -template -class FastQueue { - const size_t slot_n_bytes_; - const size_t n_slots_; - char* buffer_; - std::atomic_int* buffer_status_; - - std::atomic_int write_slot_id_; - std::atomic_int read_slot_id_; - -public: - - enum SLOT_STATUS { - EMPTY=0, - RESERVED=1, - READY=2 - }; - - FastQueue(const size_t slot_data_n_bytes, const uint16_t n_slots); - virtual ~FastQueue(); - - T* get_metadata_buffer(const int slot_id); - char* get_data_buffer(const int slot_id); - - int reserve(); - void commit(); - - int read(); - void release(); -}; - - -#endif //FASTQUEUE_HPP diff --git a/sf-stream/include/LiveRecvModule.hpp b/sf-stream/include/LiveRecvModule.hpp deleted file mode 100644 index 837e5e4..0000000 --- a/sf-stream/include/LiveRecvModule.hpp +++ /dev/null @@ -1,34 +0,0 @@ -#ifndef SF_DAQ_BUFFER_LIVERECVMODULE_HPP -#define SF_DAQ_BUFFER_LIVERECVMODULE_HPP - -#include -#include - -#include "FastQueue.hpp" -#include "jungfrau.hpp" -#include "formats.hpp" -#include "ZmqLiveReceiver.hpp" - - -class LiveRecvModule { - - FastQueue& queue_; - ZmqLiveReceiver& receiver_; - - std::atomic_bool is_receiving_; - std::thread receiving_thread_; - - void receive_thread(); - - void stop(); - -public: - LiveRecvModule( - FastQueue& queue, - ZmqLiveReceiver& receiver); - - ~LiveRecvModule(); -}; - - -#endif //SF_DAQ_BUFFER_LIVERECVMODULE_HPP diff --git a/sf-stream/src/FastQueue.cpp b/sf-stream/src/FastQueue.cpp deleted file mode 100644 index e8f5987..0000000 --- a/sf-stream/src/FastQueue.cpp +++ /dev/null @@ -1,107 +0,0 @@ -#include -#include -#include -#include "FastQueue.hpp" - -using namespace std; - -template -FastQueue::FastQueue( - const size_t slot_data_n_bytes, - const uint16_t n_slots) : - slot_n_bytes_(slot_data_n_bytes + sizeof(T)), - n_slots_(n_slots) -{ - buffer_ = new char[slot_n_bytes_ * n_slots_]; - buffer_status_ = new atomic_int[n_slots]; - - for (size_t i=0; i < n_slots_; i++) { - buffer_status_[i] = 0; - } - - write_slot_id_ = 0; - read_slot_id_ = 0; -} - -template -FastQueue::~FastQueue() -{ - delete[] buffer_; - delete[] buffer_status_; -} - -template -T* FastQueue::get_metadata_buffer(const int slot_id) -{ - return (T*)(buffer_ + (slot_id * slot_n_bytes_)); -} - -template -char* FastQueue::get_data_buffer(const int slot_id) -{ - return (char*)(buffer_ + (slot_id * slot_n_bytes_) + sizeof(T)); -} - -template -int FastQueue::reserve() -{ - int expected = SLOT_STATUS::EMPTY; - // If (buffer_status==SLOT_EMPTY) buffer_status=SLOT_RESERVED. - bool slot_reserved = - buffer_status_[write_slot_id_].compare_exchange_strong( - expected, SLOT_STATUS::RESERVED); - - if (!slot_reserved) { - return -1; - } - - return write_slot_id_; -} - -template -void FastQueue::commit() -{ - int expected = SLOT_STATUS::RESERVED; - // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. - bool slot_ready = - buffer_status_[write_slot_id_].compare_exchange_strong( - expected, SLOT_STATUS::READY); - - if (!slot_ready) { - throw runtime_error("Slot should be reserved first."); - } - - write_slot_id_++; - write_slot_id_ = write_slot_id_ % n_slots_; -} - -template -int FastQueue::read() -{ - if (buffer_status_[read_slot_id_] != SLOT_STATUS::READY) { - return -1; - } - - return read_slot_id_; -} - -template -void FastQueue::release() -{ - int expected = SLOT_STATUS::READY; - // If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY. - bool slot_empty = - buffer_status_[read_slot_id_].compare_exchange_strong( - expected, SLOT_STATUS::EMPTY); - - if (!slot_empty) { - throw runtime_error("Slot should be ready first."); - } - - read_slot_id_++; - read_slot_id_ = read_slot_id_ % n_slots_; -} - -template class FastQueue; -template class FastQueue; -template class FastQueue; diff --git a/sf-stream/src/LiveRecvModule.cpp b/sf-stream/src/LiveRecvModule.cpp deleted file mode 100644 index 264dc21..0000000 --- a/sf-stream/src/LiveRecvModule.cpp +++ /dev/null @@ -1,71 +0,0 @@ -#include "LiveRecvModule.hpp" - -#include -#include - -#include "buffer_config.hpp" -#include "stream_config.hpp" - -using namespace std; -using namespace chrono; -using namespace buffer_config; -using namespace stream_config; - -LiveRecvModule::LiveRecvModule( - FastQueue& queue_, - ZmqLiveReceiver& receiver) : - queue_(queue_), - receiver_(receiver), - is_receiving_(true) -{ - receiving_thread_ = thread(&LiveRecvModule::receive_thread, this); -} - -LiveRecvModule::~LiveRecvModule() -{ - stop(); -} - -void LiveRecvModule::stop() -{ - is_receiving_ = false; - receiving_thread_.join(); -} - -void LiveRecvModule::receive_thread() -{ - try { - - int slot_id; - while(is_receiving_.load(memory_order_relaxed)) { - - while ((slot_id == queue_.reserve()) == -1) { - this_thread::sleep_for(milliseconds(RB_READ_RETRY_INTERVAL_MS)); - } - - auto meta = queue_.get_metadata_buffer(slot_id); - auto data = queue_.get_data_buffer(slot_id); - - auto n_lost_pulses = receiver_.get_next_image(meta, data); - - if (n_lost_pulses > 0) { - cout << "sf_stream:sync_lost_pulses " << n_lost_pulses << endl; - } - - queue_.commit(); - } - - } catch (const std::exception& e) { - is_receiving_ = false; - - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[LiveRecvModule::receive_thread]"; - cout << " Stopped because of exception: " << endl; - cout << e.what() << endl; - - throw; - } -} \ No newline at end of file diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index 78e798d..07e0ac3 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -4,10 +4,10 @@ #include #include -#include "LiveRecvModule.hpp" #include "buffer_config.hpp" #include "stream_config.hpp" #include "ZmqLiveSender.hpp" +#include "ZmqLiveReceiver.hpp" using namespace std; using namespace chrono; diff --git a/sf-stream/test/main.cpp b/sf-stream/test/main.cpp index e4c5df7..e819294 100644 --- a/sf-stream/test/main.cpp +++ b/sf-stream/test/main.cpp @@ -1,6 +1,4 @@ #include "gtest/gtest.h" -#include "test_LiveRecvModule.cpp" -#include "test_FastQueue.cpp" using namespace std; diff --git a/sf-stream/test/test_FastQueue.cpp b/sf-stream/test/test_FastQueue.cpp deleted file mode 100644 index 8683242..0000000 --- a/sf-stream/test/test_FastQueue.cpp +++ /dev/null @@ -1,150 +0,0 @@ -//#include "FastQueue.hpp" -//#include "formats.hpp" -//#include "gtest/gtest.h" -// -//using namespace buffer_config; -// -//TEST(FastQueue, basic_interaction) -//{ -// size_t n_slots = 5; -// size_t slot_data_n_bytes = MODULE_N_BYTES * 2; -// FastQueue queue(slot_data_n_bytes, n_slots); -// int slot_id; -// -// // The queue at the beginning should be empty. -// ASSERT_EQ(queue.read(), -1); -// // Cannot commit a slot until you reserve it. -// ASSERT_THROW(queue.commit(), runtime_error); -// // Cannot release a slot until its ready. -// ASSERT_THROW(queue.release(), runtime_error); -// -// // Reserve a slot. -// slot_id = queue.reserve(); -// ASSERT_NE(slot_id, -1); -// // But you cannot reserve 2 slots at once. -// ASSERT_EQ(queue.reserve(), -1); -// // And cannot read this slot until its committed. -// ASSERT_EQ(queue.read(), -1); -// -// auto detector_frame = queue.get_metadata_buffer(slot_id); -// char* meta_ptr = (char*) detector_frame; -// char* data_ptr = (char*) queue.get_data_buffer(slot_id); -// -// queue.commit(); -// -// slot_id = queue.read(); -// // Once the slot is committed we should be able to read it. -// ASSERT_NE(slot_id, -1); -// // You can read the same slot multiple times. -// ASSERT_NE(queue.read(), -1); -// // The 2 buffers should match the committed slot. -// ASSERT_EQ(meta_ptr, (char*)(queue.get_metadata_buffer(slot_id))); -// ASSERT_EQ(data_ptr, (char*)(queue.get_data_buffer(slot_id))); -// -// queue.release(); -//} -// -//TEST(FastQueue, queue_full) -//{ -// size_t n_slots = 5; -// size_t slot_data_n_bytes = MODULE_N_BYTES * 2; -// FastQueue queue(slot_data_n_bytes, n_slots); -// -// // There is nothing to be read in the queue. -// ASSERT_EQ(queue.read(), -1); -// -// for (size_t i=0; i queue(slot_data_n_bytes, n_slots); -// -// int write_slot_id = queue.reserve(); -// -// auto w_metadata = queue.get_metadata_buffer(write_slot_id); -// w_metadata->pulse_id = 1; -// w_metadata->frame_index = 2; -// w_metadata->daq_rec = 3; -// w_metadata->is_good_frame = 4; -// -// auto w_data = (uint16_t*)(queue.get_data_buffer(write_slot_id)); -// for (size_t i=0; ipulse_id, -// r_metadata->pulse_id); -// EXPECT_EQ(w_metadata->frame_index, -// r_metadata->frame_index); -// EXPECT_EQ(w_metadata->daq_rec, -// r_metadata->daq_rec); -// EXPECT_EQ(w_metadata->is_good_frame, -// r_metadata->is_good_frame); -// -// auto r_data = (uint16_t*)(queue.get_data_buffer(read_slot_id)); -// for (size_t i=0; i queue( -// n_modules * MODULE_N_BYTES, -// WRITER_FASTQUEUE_N_SLOTS); -// -// ModuleFrame frame; -// -// auto slot_id = queue.reserve(); -// auto metadata = queue.get_metadata_buffer(slot_id); -// -// for (int i_module=0; i_modulemodule[i_module]; -// -// frame.pulse_id = i_module; -// frame.frame_index = i_module; -// frame.daq_rec = i_module; -// frame.n_recv_packets = i_module; -// frame.module_id = i_module; -// -// ModuleFrame* p_metadata = &module_metadata; -// -// memcpy(p_metadata, &frame, sizeof(ModuleFrame)); -// } -// -// for (int i_module=0; i_modulemodule[i_module]; -// -// ASSERT_EQ(module_metadata.pulse_id, i_module); -// ASSERT_EQ(module_metadata.frame_index, i_module); -// ASSERT_EQ(module_metadata.daq_rec, i_module); -// ASSERT_EQ(module_metadata.n_recv_packets, i_module); -// ASSERT_EQ(module_metadata.module_id, i_module); -// } -//} -// -//// TODO: Test with payload of zero (metadata only). \ No newline at end of file diff --git a/sf-stream/test/test_LiveRecvModule.cpp b/sf-stream/test/test_LiveRecvModule.cpp deleted file mode 100644 index 4fec472..0000000 --- a/sf-stream/test/test_LiveRecvModule.cpp +++ /dev/null @@ -1,87 +0,0 @@ -#include -#include "LiveRecvModule.hpp" -#include "gtest/gtest.h" -#include "buffer_config.hpp" -#include - -using namespace std; -using namespace buffer_config; - -TEST(LiveRecvModule, transfer_test) { - // TODO: Make this test work again. -// auto ctx = zmq_ctx_new(); -// -// size_t n_modules = 32; -// size_t n_slots = 5; -// FastQueue queue(MODULE_N_BYTES * n_modules, n_slots); -// -// void *sockets[n_modules]; -// for (size_t i = 0; i < n_modules; i++) { -// sockets[i] = zmq_socket(ctx, ZMQ_PUB); -// -// int linger = 0; -// if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, -// sizeof(linger)) != 0) { -// throw runtime_error(zmq_strerror(errno)); -// } -// -// stringstream ipc_addr; -// ipc_addr << BUFFER_LIVE_IPC_URL << i; -// const auto ipc = ipc_addr.str(); -// -// if (zmq_bind(sockets[i], ipc.c_str()) != 0) { -// throw runtime_error(zmq_strerror(errno)); -// } -// } -// -// LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); -// -// // Nothing should be committed, queue, should be empty. -// ASSERT_EQ(queue.read(), -1); -// -// ModuleFrame metadata; -// auto data = make_unique(MODULE_N_BYTES); -// -// for (size_t i = 0; i < n_modules; i++) { -// metadata.pulse_id = 1; -// metadata.frame_index = 2; -// metadata.daq_rec = 3; -// metadata.n_received_packets = 4; -// metadata.module_id = i; -// -// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); -// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0); -// } -// -// this_thread::sleep_for(chrono::milliseconds(100)); -// -// auto slot_id = queue.read(); -// // We should have the first Detector frame in the buffer. -// //ASSERT_NE(slot_id, -1); -// -// auto recv_stopped = async(launch::async, [&](){ -// recv_module.stop(); -// }); -// -// this_thread::sleep_for(chrono::milliseconds(100)); -// -// for (size_t i = 0; i < n_modules; i++) { -// metadata.pulse_id = 1; -// metadata.frame_index = 2; -// metadata.daq_rec = 3; -// metadata.n_received_packets = 4; -// metadata.module_id = i; -// -// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); -// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0); -// } -// -// recv_stopped.wait(); -// -// for (size_t i = 0; i < n_modules; i++) { -// zmq_close(sockets[i]); -// } -// -// zmq_ctx_destroy(ctx); -// cout << "We are finished" << endl; -} \ No newline at end of file