From 62f09eb12078589db95bef911bf2df2a2fb0e35c Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 20 May 2020 11:09:23 +0200 Subject: [PATCH] Merged to 1 project --- CMakeLists.txt | 1 - core-buffer/CMakeLists.txt | 50 +++- core-buffer/src/LiveH5Reader.cpp | 98 ------- core-buffer/src/RingBuffer.cpp | 255 ------------------ core-buffer/src/UdpRecvModule.cpp | 115 -------- .../src/{ => buffer}/BufferBinaryWriter.cpp | 0 .../src/{ => buffer}/BufferH5Writer.cpp | 0 .../src/{ => buffer}/BufferUdpReceiver.cpp | 0 core-buffer/src/{ => buffer}/UdpReceiver.cpp | 0 core-buffer/src/{ => buffer}/WriterUtils.cpp | 0 .../src/buffer}/sf_buffer.cpp | 0 .../src/{ => replay}/ReplayH5Reader.cpp | 0 .../src/replay}/sf_replay.cpp | 0 .../src/{ => stream}/LiveRecvModule.cpp | 0 .../src/stream}/sf_stream.cpp | 4 +- .../src/{ => writer}/BufferedFastQueue.cpp | 0 core-buffer/src/{ => writer}/FastQueue.cpp | 0 .../src/{ => writer}/WriterH5Writer.cpp | 0 .../src/{ => writer}/WriterZmqReceiver.cpp | 0 .../src/writer}/sf_writer.cpp | 0 core-buffer/test/main.cpp | 1 - core-buffer/test/test_UdpRecvModule.cpp | 89 ------ core-buffer/test/test_WriterH5Writer.cpp | 7 +- sf-buffer/CMakeLists.txt | 68 ----- sf-buffer/src/sf_live.cpp | 85 ------ sf-buffer/src/sf_reader.cpp | 121 --------- sf-buffer/test/CMakeLists.txt | 9 - sf-buffer/test/test_main.cpp | 9 - sf-buffer/test/test_sf_replay.cpp | 9 - 29 files changed, 55 insertions(+), 866 deletions(-) delete mode 100644 core-buffer/src/LiveH5Reader.cpp delete mode 100644 core-buffer/src/RingBuffer.cpp delete mode 100644 core-buffer/src/UdpRecvModule.cpp rename core-buffer/src/{ => buffer}/BufferBinaryWriter.cpp (100%) rename core-buffer/src/{ => buffer}/BufferH5Writer.cpp (100%) rename core-buffer/src/{ => buffer}/BufferUdpReceiver.cpp (100%) rename core-buffer/src/{ => buffer}/UdpReceiver.cpp (100%) rename core-buffer/src/{ => buffer}/WriterUtils.cpp (100%) rename {sf-buffer/src => core-buffer/src/buffer}/sf_buffer.cpp (100%) rename core-buffer/src/{ => replay}/ReplayH5Reader.cpp (100%) rename {sf-buffer/src => core-buffer/src/replay}/sf_replay.cpp (100%) rename core-buffer/src/{ => stream}/LiveRecvModule.cpp (100%) rename {sf-buffer/src => core-buffer/src/stream}/sf_stream.cpp (99%) rename core-buffer/src/{ => writer}/BufferedFastQueue.cpp (100%) rename core-buffer/src/{ => writer}/FastQueue.cpp (100%) rename core-buffer/src/{ => writer}/WriterH5Writer.cpp (100%) rename core-buffer/src/{ => writer}/WriterZmqReceiver.cpp (100%) rename {sf-buffer/src => core-buffer/src/writer}/sf_writer.cpp (100%) delete mode 100644 core-buffer/test/test_UdpRecvModule.cpp delete mode 100644 sf-buffer/CMakeLists.txt delete mode 100644 sf-buffer/src/sf_live.cpp delete mode 100644 sf-buffer/src/sf_reader.cpp delete mode 100644 sf-buffer/test/CMakeLists.txt delete mode 100644 sf-buffer/test/test_main.cpp delete mode 100644 sf-buffer/test/test_sf_replay.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1969585..aea01a3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -29,4 +29,3 @@ add_subdirectory( EXCLUDE_FROM_ALL) add_subdirectory("core-buffer") -add_subdirectory("sf-buffer") diff --git a/core-buffer/CMakeLists.txt b/core-buffer/CMakeLists.txt index 5be991e..1b931e5 100644 --- a/core-buffer/CMakeLists.txt +++ b/core-buffer/CMakeLists.txt @@ -1,7 +1,11 @@ add_subdirectory(external/) file(GLOB SOURCES - src/*.cpp) + src/*.cpp + src/buffer/*.cpp + src/replay/*.cpp + src/stream/*.cpp + src/writer/*.cpp) add_library(core-buffer STATIC ${SOURCES}) target_include_directories(core-buffer PUBLIC include/) @@ -11,5 +15,49 @@ if(CMAKE_BUILD_TYPE STREQUAL "Debug") target_compile_definitions(core-buffer PRIVATE DEBUG_OUTPUT) endif() +add_executable(sf-buffer src/buffer/sf_buffer.cpp) +set_target_properties(sf-buffer PROPERTIES OUTPUT_NAME sf_buffer) +target_link_libraries(sf-buffer + core-buffer + zmq + hdf5 + hdf5_hl + hdf5_cpp + boost_system + pthread) + +add_executable(sf-replay src/replay/sf_replay.cpp) +set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) +target_link_libraries(sf-replay + core-buffer + external + zmq + hdf5 + hdf5_hl + hdf5_cpp + boost_system + pthread) + +add_executable(sf-writer src/writer/sf_writer.cpp) +set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) +target_link_libraries(sf-writer + core-buffer + external + zmq + hdf5 + hdf5_hl + hdf5_cpp + boost_system + pthread) + +add_executable(sf-stream src/stream/sf_stream.cpp) +set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) +target_link_libraries(sf-stream + core-buffer + zmq + jsoncpp + boost_system + pthread) + enable_testing() add_subdirectory(test/) \ No newline at end of file diff --git a/core-buffer/src/LiveH5Reader.cpp b/core-buffer/src/LiveH5Reader.cpp deleted file mode 100644 index e5ef1d8..0000000 --- a/core-buffer/src/LiveH5Reader.cpp +++ /dev/null @@ -1,98 +0,0 @@ -#include "LiveH5Reader.hpp" -#include "BufferUtils.hpp" - -using namespace std; -using namespace core_buffer; - -LiveH5Reader::LiveH5Reader( - const std::string& device, - const std::string& channel_name, - const uint16_t source_id): - current_filename_(device + "/" + channel_name + "/CURRENT"), - source_id_(source_id), - pulse_id_buffer_(make_unique(FILE_MOD)), - data_buffer_(make_unique(MODULE_N_PIXELS)) -{ -// auto filename = BufferUtils::get_latest_file(current_filename_); -// file_ = H5::H5File(filename, H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); -// -// uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; -// base_pulse_id *= core_buffer::FILE_MOD; -// -// current_file_max_pulse_id_ = -// -// image_dataset_ = input_file.openDataSet("image"); -// pulse_id_dataset_ = input_file.openDataSet("pulse_id"); -// frame_index_dataset_ = input_file.openDataSet("frame_id"); -// daq_rec_dataset_ = input_file.openDataSet("daq_rec"); -// n_received_packets_dataset_ = input_file.openDataSet("received_packets"); - -} - -LiveH5Reader::~LiveH5Reader() { - close_file(); -} - - - -//void load_data_from_file ( -// FileBufferMetadata* metadata_buffer, -// char* image_buffer, -// const string &filename, -// const size_t start_index) -//{ -// -// hsize_t b_image_dim[3] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; -// H5::DataSpace b_i_space (3, b_image_dim); -// hsize_t b_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; -// hsize_t b_i_start[] = {0, 0, 0}; -// b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); -// -// hsize_t f_image_dim[3] = {FILE_MOD, 512, 1024}; -// H5::DataSpace f_i_space (3, f_image_dim); -// hsize_t f_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; -// hsize_t f_i_start[] = {start_index, 0, 0}; -// f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); -// -// hsize_t b_metadata_dim[2] = {REPLAY_READ_BLOCK_SIZE, 1}; -// H5::DataSpace b_m_space (2, b_metadata_dim); -// hsize_t b_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; -// hsize_t b_m_start[] = {0, 0}; -// b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); -// -// hsize_t f_metadata_dim[2] = {FILE_MOD, 1}; -// H5::DataSpace f_m_space (2, f_metadata_dim); -// hsize_t f_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; -// hsize_t f_m_start[] = {start_index, 0}; -// f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); -// -// H5::H5File input_file(filename, H5F_ACC_RDONLY); -// -// auto image_dataset = input_file.openDataSet("image"); -// image_dataset.read( -// image_buffer, H5::PredType::NATIVE_UINT16, -// b_i_space, f_i_space); -// -// auto pulse_id_dataset = input_file.openDataSet("pulse_id"); -// pulse_id_dataset.read( -// metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64, -// b_m_space, f_m_space); -// -// auto frame_id_dataset = input_file.openDataSet("frame_id"); -// frame_id_dataset.read( -// metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64, -// b_m_space, f_m_space); -// -// auto daq_rec_dataset = input_file.openDataSet("daq_rec"); -// daq_rec_dataset.read( -// metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32, -// b_m_space, f_m_space); -// -// auto received_packets_dataset = -// input_file.openDataSet("received_packets"); -// received_packets_dataset.read( -// metadata_buffer->n_received_packets, H5::PredType::NATIVE_UINT16, -// b_m_space, f_m_space); -// -// input_file.close(); -//} \ No newline at end of file diff --git a/core-buffer/src/RingBuffer.cpp b/core-buffer/src/RingBuffer.cpp deleted file mode 100644 index 61e1f02..0000000 --- a/core-buffer/src/RingBuffer.cpp +++ /dev/null @@ -1,255 +0,0 @@ -#include -#include -#include - -#include "RingBuffer.hpp" - -using namespace std; - -template -RingBuffer::RingBuffer(size_t n_slots) : - n_slots_(n_slots), - ringbuffer_slots_(n_slots, 0) -{ -} - -template -RingBuffer::~RingBuffer() -{ - if (frame_data_buffer_ != NULL) { - free(frame_data_buffer_); - frame_data_buffer_ = NULL; - } -} - -template -void RingBuffer::initialize(const size_t requested_slot_size) -{ - if (is_initialized()) { - return; - } - - lock_guard lock(ringbuffer_slots_mutex_); - - if (initialized_) { - - if (requested_slot_size > slot_size_) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[RingBuffer::initialize]"; - err_msg << " Already initialized with smaller slot_size "; - err_msg << slot_size_ << " than requested_slot_size "; - err_msg << requested_slot_size << endl; - - throw runtime_error(err_msg.str()); - } - - return; - } - - write_index_ = 0; - slot_size_ = requested_slot_size; - buffer_size_ = slot_size_ * n_slots_; - frame_data_buffer_ = new char[buffer_size_]; - buffer_used_slots_ = 0; - - initialized_ = true; -} - -template -char* RingBuffer::reserve(shared_ptr frame_metadata) -{ - if (!is_initialized()) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[RingBuffer::reserve]"; - err_msg << " Ringbuffer not initialized."; - - throw runtime_error(err_msg.str()); - } - - if (frame_metadata->frame_bytes_size > slot_size_) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[RingBuffer::reserve]"; - err_msg << " Received frame index " << frame_metadata->frame_index; - err_msg << " that is too large for ring buffer slot."; - err_msg << " Slot size " << slot_size_ << ", but frame bytes size "; - err_msg << frame_metadata->frame_bytes_size << endl; - - throw runtime_error(err_msg.str()); - } - - // Check and reserve slot in the buffer. - { - lock_guard lock(ringbuffer_slots_mutex_); - - if (!ringbuffer_slots_[write_index_]) { - ringbuffer_slots_[write_index_] = true; - - frame_metadata->buffer_slot_index = write_index_; - - write_index_ = (write_index_ + 1) % n_slots_; - buffer_used_slots_++; - - } else { - return nullptr; - } - } - - return get_buffer_slot_address(frame_metadata->buffer_slot_index); -} - -template -void RingBuffer::commit(shared_ptr frame_metadata) -{ - lock_guard lock(frame_metadata_queue_mutex_); - - frame_metadata_queue_.push_back(frame_metadata); -} - -template -char* RingBuffer::get_buffer_slot_address(size_t buffer_slot_index) -{ - char* slot_memory_address = - frame_data_buffer_ + (buffer_slot_index * slot_size_); - - // Check if the memory address is valid. - if (slot_memory_address > frame_data_buffer_ + buffer_size_) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[RingBuffer::get_buffer_slot_address]"; - err_msg << " Ring buffer address out of range." << endl; - - throw runtime_error(err_msg.str()); - } - - return slot_memory_address; -} - -template -pair, char*> RingBuffer::read() -{ - shared_ptr frame_metadata; - - { - lock_guard lock(frame_metadata_queue_mutex_); - - // A NULL char* means no waiting data in the ring buffer. - if (frame_metadata_queue_.empty()) { - return {NULL, NULL}; - } - - frame_metadata = frame_metadata_queue_.front(); - frame_metadata_queue_.pop_front(); - } - - // Check if the references ring buffer slot is valid. - { - lock_guard lock(ringbuffer_slots_mutex_); - - if (!ringbuffer_slots_[frame_metadata->buffer_slot_index]) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[RingBuffer::read]"; - err_msg << " Ring buffer slot"; - err_msg << " referenced in message header "; - err_msg << frame_metadata->buffer_slot_index << " is empty."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - } - - return {frame_metadata, - get_buffer_slot_address(frame_metadata->buffer_slot_index)}; -} - -template -void RingBuffer::release(size_t buffer_slot_index) -{ - if (buffer_slot_index >= n_slots_) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[RingBuffer::release]"; - err_msg << " Slot index " << buffer_slot_index << " is out of range."; - err_msg << " Ring buffer n_slots = " << n_slots_ << endl; - - throw runtime_error(err_msg.str()); - } - - { - lock_guard lock(ringbuffer_slots_mutex_); - - if (ringbuffer_slots_[buffer_slot_index]) { - ringbuffer_slots_[buffer_slot_index] = false; - - buffer_used_slots_--; - - } else { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[RingBuffer::release]"; - err_msg << " Cannot release empty ring buffer slot "; - err_msg << buffer_slot_index << endl; - - throw runtime_error(err_msg.str()); - } - } -} - -template -bool RingBuffer::is_empty() -{ - lock_guard lock(ringbuffer_slots_mutex_); - - return buffer_used_slots_ == 0; -} - -template -bool RingBuffer::is_initialized() -{ - return initialized_.load(memory_order_relaxed); -} - -template -void RingBuffer::clear() -{ - lock_guard lock_slots(ringbuffer_slots_mutex_); - lock_guard lock_metadata(frame_metadata_queue_mutex_); - - write_index_ = 0; - buffer_used_slots_ = 0; - ringbuffer_slots_ = vector(n_slots_, 0); - frame_metadata_queue_.clear(); -} - -template -size_t RingBuffer::get_slot_size() -{ - return slot_size_; -} - -template class RingBuffer; -template class RingBuffer; \ No newline at end of file diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp deleted file mode 100644 index d21f438..0000000 --- a/core-buffer/src/UdpRecvModule.cpp +++ /dev/null @@ -1,115 +0,0 @@ -#include "UdpRecvModule.hpp" -#include "jungfrau.hpp" -#include -#include - -using namespace std; - -UdpRecvModule::UdpRecvModule( - FastQueue& queue, - const uint16_t udp_port) : - queue_(queue), - is_receiving_(true) -{ - receiving_thread_ = thread( - &UdpRecvModule::receive_thread, this, - udp_port); -} - -UdpRecvModule::~UdpRecvModule() -{ - is_receiving_ = false; - receiving_thread_.join(); -} - -inline void UdpRecvModule::init_frame ( - ModuleFrame* frame_metadata, - jungfrau_packet& packet_buffer) -{ - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->daq_rec = (uint64_t)packet_buffer.debug; -} - -inline void UdpRecvModule::reserve_next_frame_buffers( - ModuleFrame*& frame_metadata, - char*& frame_buffer) -{ - int slot_id; - if ((slot_id = queue_.reserve()) == -1) - throw runtime_error("Queue is full."); - - frame_metadata = queue_.get_metadata_buffer(slot_id); - frame_metadata->pulse_id = 0; - frame_metadata->n_received_packets = 0; - - frame_buffer = queue_.get_data_buffer(slot_id); - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); -} - -void UdpRecvModule::receive_thread(const uint16_t udp_port) -{ - try { - - UdpReceiver udp_receiver; - udp_receiver.bind(udp_port); - - ModuleFrame* frame_metadata; - char* frame_buffer; - reserve_next_frame_buffers(frame_metadata, frame_buffer); - - jungfrau_packet packet_buffer; - - while (is_receiving_.load(memory_order_relaxed)) { - - if (!udp_receiver.receive( - &packet_buffer, - JUNGFRAU_BYTES_PER_PACKET)) { - continue; - } - - // First packet for this frame. - if (frame_metadata->pulse_id == 0) { - init_frame(frame_metadata, packet_buffer); - - // Happens if the last packet from the previous frame gets lost. - } else if (frame_metadata->pulse_id != packet_buffer.bunchid) { - queue_.commit(); - reserve_next_frame_buffers(frame_metadata, frame_buffer); - - init_frame(frame_metadata, packet_buffer); - } - - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum; - - memcpy( - (void*) (frame_buffer + frame_buffer_offset), - packet_buffer.data, - JUNGFRAU_DATA_BYTES_PER_PACKET); - - frame_metadata->n_received_packets++; - - // Last frame packet received. Frame finished. - if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) - { - queue_.commit(); - reserve_next_frame_buffers(frame_metadata, frame_buffer); - this_thread::yield(); - } - } - - } catch (const std::exception& e) { - is_receiving_ = false; - - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[UdpRecvModule::receive_thread]"; - cout << " Stopped because of exception: " << endl; - cout << e.what() << endl; - - throw; - } -} \ No newline at end of file diff --git a/core-buffer/src/BufferBinaryWriter.cpp b/core-buffer/src/buffer/BufferBinaryWriter.cpp similarity index 100% rename from core-buffer/src/BufferBinaryWriter.cpp rename to core-buffer/src/buffer/BufferBinaryWriter.cpp diff --git a/core-buffer/src/BufferH5Writer.cpp b/core-buffer/src/buffer/BufferH5Writer.cpp similarity index 100% rename from core-buffer/src/BufferH5Writer.cpp rename to core-buffer/src/buffer/BufferH5Writer.cpp diff --git a/core-buffer/src/BufferUdpReceiver.cpp b/core-buffer/src/buffer/BufferUdpReceiver.cpp similarity index 100% rename from core-buffer/src/BufferUdpReceiver.cpp rename to core-buffer/src/buffer/BufferUdpReceiver.cpp diff --git a/core-buffer/src/UdpReceiver.cpp b/core-buffer/src/buffer/UdpReceiver.cpp similarity index 100% rename from core-buffer/src/UdpReceiver.cpp rename to core-buffer/src/buffer/UdpReceiver.cpp diff --git a/core-buffer/src/WriterUtils.cpp b/core-buffer/src/buffer/WriterUtils.cpp similarity index 100% rename from core-buffer/src/WriterUtils.cpp rename to core-buffer/src/buffer/WriterUtils.cpp diff --git a/sf-buffer/src/sf_buffer.cpp b/core-buffer/src/buffer/sf_buffer.cpp similarity index 100% rename from sf-buffer/src/sf_buffer.cpp rename to core-buffer/src/buffer/sf_buffer.cpp diff --git a/core-buffer/src/ReplayH5Reader.cpp b/core-buffer/src/replay/ReplayH5Reader.cpp similarity index 100% rename from core-buffer/src/ReplayH5Reader.cpp rename to core-buffer/src/replay/ReplayH5Reader.cpp diff --git a/sf-buffer/src/sf_replay.cpp b/core-buffer/src/replay/sf_replay.cpp similarity index 100% rename from sf-buffer/src/sf_replay.cpp rename to core-buffer/src/replay/sf_replay.cpp diff --git a/core-buffer/src/LiveRecvModule.cpp b/core-buffer/src/stream/LiveRecvModule.cpp similarity index 100% rename from core-buffer/src/LiveRecvModule.cpp rename to core-buffer/src/stream/LiveRecvModule.cpp diff --git a/sf-buffer/src/sf_stream.cpp b/core-buffer/src/stream/sf_stream.cpp similarity index 99% rename from sf-buffer/src/sf_stream.cpp rename to core-buffer/src/stream/sf_stream.cpp index 9b71ee8..18f578f 100644 --- a/sf-buffer/src/sf_stream.cpp +++ b/core-buffer/src/stream/sf_stream.cpp @@ -101,7 +101,7 @@ int main (int argc, char *argv[]) frame_index = module_metadata.frame_index; daq_rec = module_metadata.daq_rec; - if ( module_metadata.n_received_packets != 128 ) is_good_frame = false; + if ( module_metadata.n_received_packets != 128 ) is_good_frame = false; } else { if (module_metadata.pulse_id != pulse_id) is_good_frame = false; @@ -114,7 +114,7 @@ int main (int argc, char *argv[]) } //Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) - + header["frame"] = (Json::Value::UInt64)frame_index; header["is_good_frame"] = is_good_frame; header["daq_rec"] = (Json::Value::UInt64)daq_rec; diff --git a/core-buffer/src/BufferedFastQueue.cpp b/core-buffer/src/writer/BufferedFastQueue.cpp similarity index 100% rename from core-buffer/src/BufferedFastQueue.cpp rename to core-buffer/src/writer/BufferedFastQueue.cpp diff --git a/core-buffer/src/FastQueue.cpp b/core-buffer/src/writer/FastQueue.cpp similarity index 100% rename from core-buffer/src/FastQueue.cpp rename to core-buffer/src/writer/FastQueue.cpp diff --git a/core-buffer/src/WriterH5Writer.cpp b/core-buffer/src/writer/WriterH5Writer.cpp similarity index 100% rename from core-buffer/src/WriterH5Writer.cpp rename to core-buffer/src/writer/WriterH5Writer.cpp diff --git a/core-buffer/src/WriterZmqReceiver.cpp b/core-buffer/src/writer/WriterZmqReceiver.cpp similarity index 100% rename from core-buffer/src/WriterZmqReceiver.cpp rename to core-buffer/src/writer/WriterZmqReceiver.cpp diff --git a/sf-buffer/src/sf_writer.cpp b/core-buffer/src/writer/sf_writer.cpp similarity index 100% rename from sf-buffer/src/sf_writer.cpp rename to core-buffer/src/writer/sf_writer.cpp diff --git a/core-buffer/test/main.cpp b/core-buffer/test/main.cpp index 209c258..b8e0cae 100644 --- a/core-buffer/test/main.cpp +++ b/core-buffer/test/main.cpp @@ -1,6 +1,5 @@ #include "gtest/gtest.h" #include "test_UdpReceiver.cpp" -#include "test_UdpRecvModule.cpp" #include "test_BufferBinaryWriter.cpp" #include "test_buffer_utils.cpp" #include "test_BufferH5Writer.cpp" diff --git a/core-buffer/test/test_UdpRecvModule.cpp b/core-buffer/test/test_UdpRecvModule.cpp deleted file mode 100644 index a4ecdf6..0000000 --- a/core-buffer/test/test_UdpRecvModule.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include "gtest/gtest.h" -#include "UdpRecvModule.hpp" -#include "jungfrau.hpp" -#include "mock/udp.hpp" - -using namespace std; - -TEST(UdpRecvModule, basic_interaction) -{ - uint16_t udp_port(MOCK_UDP_PORT); - - FastQueue queue(JUNGFRAU_DATA_BYTES_PER_FRAME, 10); - UdpRecvModule udp_recv_module(queue, udp_port); -} - -TEST(UdpRecvModule, simple_recv) -{ - int slot_id; - uint16_t udp_port(MOCK_UDP_PORT); - size_t n_msg(128); - - FastQueue queue(JUNGFRAU_DATA_BYTES_PER_FRAME, 10); - UdpRecvModule udp_recv_module(queue, udp_port); - - this_thread::sleep_for(chrono::milliseconds(100)); - - // The first slot should not be available to read yet. - ASSERT_EQ(queue.read(), -1); - - auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); - ASSERT_TRUE(send_socket_fd >= 0); - - auto server_address = get_server_address(udp_port); - - jungfrau_packet send_udp_buffer; - send_udp_buffer.bunchid = 100; - send_udp_buffer.debug = 1000; - - send_udp_buffer.framenum = 1; - for (size_t i=0; iframe_index, 1); - queue.release(); - - // Next slot not yet ready. - ASSERT_EQ(queue.read(), -1); - - send_udp_buffer.framenum = 2; - for (size_t i=0; i<128; i++){ - send_udp_buffer.packetnum = i; - - ::sendto( - send_socket_fd, - &send_udp_buffer, - JUNGFRAU_BYTES_PER_PACKET, - 0, - (sockaddr*) &server_address, - sizeof(server_address)); - - } - - this_thread::sleep_for(chrono::milliseconds(100)); - - slot_id = queue.read(); - // This time we are supposed to get slot 1. - ASSERT_EQ(slot_id, 1); - // We sent a frame with frame_index == 2. - ASSERT_EQ(queue.get_metadata_buffer(slot_id)->frame_index, 2); - queue.release(); - - ::close(send_socket_fd); -} diff --git a/core-buffer/test/test_WriterH5Writer.cpp b/core-buffer/test/test_WriterH5Writer.cpp index 20a571e..fd3f91c 100644 --- a/core-buffer/test/test_WriterH5Writer.cpp +++ b/core-buffer/test/test_WriterH5Writer.cpp @@ -12,12 +12,13 @@ TEST(WriterH5Writer, basic_interaction) size_t n_frames = 5; auto data = make_unique(n_modules*MODULE_N_BYTES); - auto metadata = make_shared(); + auto metadata = make_shared(); // Needed by writer. - metadata->data_n_bytes = 500; + metadata->data_n_bytes[0] = 500; + metadata->n_pulses_in_buffer = 1; - WriterH5Writer writer("ignore.h5", n_frames, n_modules, 1); + WriterH5Writer writer("ignore.h5", n_frames, n_modules); writer.write(metadata.get(), data.get()); writer.close_file(); } diff --git a/sf-buffer/CMakeLists.txt b/sf-buffer/CMakeLists.txt deleted file mode 100644 index 8a7152b..0000000 --- a/sf-buffer/CMakeLists.txt +++ /dev/null @@ -1,68 +0,0 @@ -add_executable(sf-buffer src/sf_buffer.cpp) -set_target_properties(sf-buffer PROPERTIES OUTPUT_NAME sf_buffer) -target_link_libraries(sf-buffer - core-buffer - zmq - hdf5 - hdf5_hl - hdf5_cpp - boost_system - pthread) - -#add_executable(sf-reader src/sf_reader.cpp) -#set_target_properties(sf-reader PROPERTIES OUTPUT_NAME sf_reader) -#target_link_libraries(sf-reader -# core-buffer -# zmq -# hdf5 -# hdf5_hl -# hdf5_cpp -# boost_system -# pthread) - -add_executable(sf-replay src/sf_replay.cpp) -set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay) -target_link_libraries(sf-replay - core-buffer - external - zmq - hdf5 - hdf5_hl - hdf5_cpp - boost_system - pthread) - -#add_executable(sf-live src/sf_live.cpp) -#set_target_properties(sf-live PROPERTIES OUTPUT_NAME sf_live) -#target_link_libraries(sf-live -# core-buffer -# zmq -# hdf5 -# hdf5_hl -# hdf5_cpp -# boost_system -# pthread) - -add_executable(sf-writer src/sf_writer.cpp) -set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer) -target_link_libraries(sf-writer - core-buffer - external - zmq - hdf5 - hdf5_hl - hdf5_cpp - boost_system - pthread) - -add_executable(sf-stream src/sf_stream.cpp) -set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) -target_link_libraries(sf-stream - core-buffer - zmq - jsoncpp - boost_system - pthread) - -enable_testing() -add_subdirectory(test/) diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp deleted file mode 100644 index b699485..0000000 --- a/sf-buffer/src/sf_live.cpp +++ /dev/null @@ -1,85 +0,0 @@ -#include -#include "jungfrau.hpp" -#include "zmq.h" -#include "buffer_config.hpp" -#include -#include "date.h" -#include "LiveH5Reader.hpp" - -using namespace std; -using namespace core_buffer; - -void sf_live ( - void* socket, - const string& device, - const string& channel_name, - const uint16_t source_id) -{ - LiveH5Reader reader(device, channel_name, source_id); - - auto current_pulse_id = reader.get_latest_pulse_id(); - while (true) { - - reader.load_pulse_id(current_pulse_id); - - auto metadata = reader.get_metadata(); - - zmq_send(socket, - &metadata, - sizeof(ModuleFrame), - ZMQ_SNDMORE); - - auto data = reader.get_data(); - - zmq_send(socket, - data, - MODULE_N_BYTES, - 0); - - current_pulse_id++; - } - - reader.close_file(); -} - -int main (int argc, char *argv[]) { - - if (argc != 6) { - cout << endl; - cout << "Usage: sf_live [device] [channel_name] [source_id]"; - cout << endl; - cout << "\tdevice: Name of detector." << endl; - cout << "\tchannel_name: M00-M31 for JF16M." << endl; - cout << "\tsource_id: Module index" << endl; - cout << endl; - - exit(-1); - } - - const string device = string(argv[1]); - const string channel_name = string(argv[2]); - const uint16_t source_id = (uint16_t) atoi(argv[3]); - - stringstream ipc_stream; - ipc_stream << BUFFER_LIVE_IPC_URL << (int)source_id; - const auto ipc_address = ipc_stream.str(); - - auto ctx = zmq_ctx_new(); - auto socket = zmq_socket(ctx, ZMQ_PUSH); - - const int sndhwm = REPLAY_READ_BLOCK_SIZE; - if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) - throw runtime_error(strerror (errno)); - - const int linger_ms = 0; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) - throw runtime_error(strerror (errno)); - - if (zmq_connect(socket, ipc_address.c_str()) != 0) - throw runtime_error(strerror (errno)); - - sf_live(socket, device, channel_name, source_id); - - zmq_close(socket); - zmq_ctx_destroy(ctx); -} diff --git a/sf-buffer/src/sf_reader.cpp b/sf-buffer/src/sf_reader.cpp deleted file mode 100644 index f23c3fd..0000000 --- a/sf-buffer/src/sf_reader.cpp +++ /dev/null @@ -1,121 +0,0 @@ -#include -#include -#include -#include -#include -#include "jungfrau.hpp" -#include "BufferUtils.hpp" - - -using namespace std; - -int main (int argc, char *argv[]) { - if (argc != 3) { - cout << endl; - cout << "Usage: sf_reader [device_name] [root_folder]"; - cout << endl; - cout << "\tdevice_name: Device files to read."; - cout << "\troot_folder: FS root folder." << endl; - cout << endl; - - exit(-1); - } - - string device_name = string(argv[1]); - string root_folder = string(argv[2]); - - string current_filename = root_folder + "/" + device_name + "/CURRENT"; - - uint64_t pulse_id_buffer[1000]; - uint16_t* image_buffer = new uint16_t[100*512*1024]; - - string last_open_file = ""; - uint64_t last_pulse_id = 0; - - int current_file_last_processed = -1; - - while (true) { -// auto filename = BufferUtils::get_latest_file(current_filename); -// -// // Next file not yet ready. -// if (last_open_file == filename) { -// this_thread::sleep_for(chrono::milliseconds(100)); -// cout << "Waiting for CURRENT to change." << endl; -// continue; -// } -// -// std::cout << "Opening " << filename << endl; -// last_open_file = filename; -// current_file_last_processed = -1; -// -// H5::H5File input_file(filename, H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); -// auto image_dataset = input_file.openDataSet("image"); -// auto pulse_id_dataset = input_file.openDataSet("pulse_id"); -// -// ::memset(pulse_id_buffer, 0, sizeof(pulse_id_buffer)); -// -// while (true) { -// -// pulse_id_dataset.read( -// pulse_id_buffer, -// H5::PredType::NATIVE_UINT64); -// -// size_t n_new_pulses = 0; -// for (size_t i=current_file_last_processed+1; i<1000; i++) { -// if (pulse_id_buffer[i] > 0) { -// n_new_pulses++; -// } -// } -// -// // There is more stuff to be processed. -// if (n_new_pulses > 0) { -// // TODO: Just temporary due to buffer size. -// if (n_new_pulses > 100) { -// n_new_pulses = 100; -// } -// -// H5Drefresh(image_dataset.getId()); -// -// uint64_t start_pulse_id = current_file_last_processed+1; -// uint64_t end_pulse_id = -// current_file_last_processed + n_new_pulses; -// -// -// hsize_t buff_dim[3] = {100, 512, 1024}; -// H5::DataSpace buffer_space (3, buff_dim); -// hsize_t b_count[] = {n_new_pulses, 512, 1024}; -// hsize_t b_start[] = {0, 0, 0}; -// buffer_space.selectHyperslab(H5S_SELECT_SET, b_count, b_start); -// -// hsize_t disk_dim[3] = {1000, 512, 1024}; -// H5::DataSpace disk_space(3, disk_dim); -// -// hsize_t d_count[] = {n_new_pulses, 512, 1024}; -// hsize_t d_start[] = {start_pulse_id, 0, 0}; -// disk_space.selectHyperslab(H5S_SELECT_SET, d_count, d_start); -// -// image_dataset.read( -// image_buffer, -// H5::PredType::NATIVE_UINT16, -// buffer_space, -// disk_space); -// -// current_file_last_processed = end_pulse_id; -// -// cout << "Read n_new_pulses=" << n_new_pulses; -// cout << " current_file_last_processed "; -// cout << current_file_last_processed << endl; -// } -// -// // Time for next file. -// if (pulse_id_buffer[999] != 0) { -// break; -// } -// -// // Stream delay. -// this_thread::sleep_for(chrono::milliseconds(100)); -// H5Drefresh(pulse_id_dataset.getId()); -// } - - } -} diff --git a/sf-buffer/test/CMakeLists.txt b/sf-buffer/test/CMakeLists.txt deleted file mode 100644 index 5408b02..0000000 --- a/sf-buffer/test/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -add_executable(sf-buffer_tests test_main.cpp) - -target_link_libraries(sf-buffer_tests - core-buffer - gtest - zmq - hdf5 - hdf5_hl - hdf5_cpp) \ No newline at end of file diff --git a/sf-buffer/test/test_main.cpp b/sf-buffer/test/test_main.cpp deleted file mode 100644 index a55e597..0000000 --- a/sf-buffer/test/test_main.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "gtest/gtest.h" -#include "test_sf_replay.cpp" - -using namespace std; - -int main(int argc, char **argv) { - ::testing::InitGoogleTest(&argc, argv); - return RUN_ALL_TESTS(); -} diff --git a/sf-buffer/test/test_sf_replay.cpp b/sf-buffer/test/test_sf_replay.cpp deleted file mode 100644 index c9e18e7..0000000 --- a/sf-buffer/test/test_sf_replay.cpp +++ /dev/null @@ -1,9 +0,0 @@ -#include "gtest/gtest.h" - -using namespace std; - - -TEST(sf_replay, non_aligned_start) -{ - // TODO: Write the test!! -} \ No newline at end of file