diff --git a/sf-stream/CMakeLists.txt b/sf-stream/CMakeLists.txt new file mode 100644 index 0000000..216e2dd --- /dev/null +++ b/sf-stream/CMakeLists.txt @@ -0,0 +1,18 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(sf-stream-lib STATIC ${SOURCES}) +target_include_directories(sf-stream-lib PUBLIC include/) +target_link_libraries(sf-stream-lib + external + core-buffer-lib) + +add_executable(sf-stream src/main.cpp) +set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) +target_link_libraries(sf-stream + sf-stream-lib + zmq + pthread) + +enable_testing() +add_subdirectory(test/) \ No newline at end of file diff --git a/sf-stream/include/LiveRecvModule.hpp b/sf-stream/include/LiveRecvModule.hpp new file mode 100644 index 0000000..be0f8e5 --- /dev/null +++ b/sf-stream/include/LiveRecvModule.hpp @@ -0,0 +1,39 @@ +#ifndef SF_DAQ_BUFFER_LIVERECVMODULE_HPP +#define SF_DAQ_BUFFER_LIVERECVMODULE_HPP + +#include "FastQueue.hpp" +#include +#include "jungfrau.hpp" +#include + +class LiveRecvModule { + + FastQueue& queue_; + const size_t n_modules_; + void* ctx_; + const std::string ipc_prefix_; + std::atomic_bool is_receiving_; + std::thread receiving_thread_; + +public: + LiveRecvModule( + FastQueue& queue, + const size_t n_modules, + void* ctx, + const std::string& ipc_prefix); + + virtual ~LiveRecvModule(); + void* connect_socket(size_t module_id); + void recv_single_module(void* socket, ModuleFrame* metadata, char* data); + void receive_thread(); + uint64_t align_modules( + const std::vector& sockets, + ModuleFrameBuffer *metadata, + char *data); + + void stop(); + +}; + + +#endif //SF_DAQ_BUFFER_LIVERECVMODULE_HPP diff --git a/sf-stream/src/LiveRecvModule.cpp b/sf-stream/src/LiveRecvModule.cpp new file mode 100644 index 0000000..d18054b --- /dev/null +++ b/sf-stream/src/LiveRecvModule.cpp @@ -0,0 +1,223 @@ +#include "LiveRecvModule.hpp" +#include "date.h" +#include +#include +#include "zmq.h" +#include "buffer_config.hpp" + +using namespace std; +using namespace core_buffer; + +LiveRecvModule::LiveRecvModule( + FastQueue& queue_, + const size_t n_modules, + void* ctx_, + const string& ipc_prefix) : + queue_(queue_), + n_modules_(n_modules), + ctx_(ctx_), + ipc_prefix_(ipc_prefix), + is_receiving_(true) +{ + receiving_thread_ = thread(&LiveRecvModule::receive_thread, this); +} + +LiveRecvModule::~LiveRecvModule() +{ + stop(); +} + +void LiveRecvModule::stop() +{ + is_receiving_ = false; + receiving_thread_.join(); +} + +void* LiveRecvModule::connect_socket(size_t module_id) +{ + void* sock = zmq_socket(ctx_, ZMQ_SUB); + if (sock == nullptr) { + throw runtime_error(zmq_strerror(errno)); + } + + int rcvhwm = STREAM_RCVHWM; + if (zmq_setsockopt(sock, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + int linger = 0; + if (zmq_setsockopt(sock, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + stringstream ipc_addr; + ipc_addr << ipc_prefix_ << module_id; + const auto ipc = ipc_addr.str(); + + if (zmq_connect(sock, ipc.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + return sock; +} + +void LiveRecvModule::recv_single_module( + void* socket, ModuleFrame* metadata, char* data) +{ + auto n_bytes_metadata = zmq_recv( + socket, + metadata, + sizeof(ModuleFrame), + 0); + + if (n_bytes_metadata == -1) { + throw runtime_error(zmq_strerror(errno)); + }else if (n_bytes_metadata != sizeof(ModuleFrame)) { + throw runtime_error("Stream header of wrong size."); + } + + if (metadata->pulse_id == 0) { + throw runtime_error("Received invalid pulse_id=0."); + } + + auto n_bytes_image = zmq_recv( + socket, + data, + MODULE_N_BYTES, + 0); + + if (n_bytes_image == -1) { + throw runtime_error(zmq_strerror(errno)); + } else if (n_bytes_image != MODULE_N_BYTES) { + throw runtime_error("Stream data of wrong size."); + } +} + +uint64_t LiveRecvModule::align_modules( + const vector& sockets, ModuleFrameBuffer *metadata, char *data) +{ + uint64_t max_pulse_id = 0; + + // First pass - determine current max_pulse_id. + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + auto& module_metadata = metadata->module[i_module]; + max_pulse_id = max(max_pulse_id, module_metadata.pulse_id); + } + + // Second pass - align all receivers to max_pulse_id. + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + auto& module_metadata = metadata->module[i_module]; + + size_t diff_to_max = max_pulse_id - module_metadata.pulse_id; + for (size_t i = 0; i < diff_to_max; i++) { + recv_single_module( + sockets[i_module], + &module_metadata, + data + (MODULE_N_BYTES * i_module)); + } + + if (module_metadata.pulse_id != max_pulse_id) { + throw runtime_error("Cannot align pulse_ids."); + } + } + + return max_pulse_id; +} + +void LiveRecvModule::receive_thread() +{ + try { + + vector sockets(n_modules_); + + for (size_t i = 0; i < n_modules_; i++) { + sockets[i] = connect_socket(i); + } + + auto slot_id = queue_.reserve(); + if (slot_id == -1) throw runtime_error("This cannot really happen"); + + auto metadata = queue_.get_metadata_buffer(slot_id); + auto data = queue_.get_data_buffer(slot_id); + + // First buffer load for alignment. + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + auto& module_metadata = metadata->module[i_module]; + + recv_single_module( + sockets[i_module], + &module_metadata, + data + (MODULE_N_BYTES * i_module)); + } + + auto current_pulse_id = align_modules(sockets, metadata, data); + + queue_.commit(); + current_pulse_id++; + + while(is_receiving_.load(memory_order_relaxed)) { + auto slot_id = queue_.reserve(); + + if (slot_id == -1){ + this_thread::sleep_for(chrono::milliseconds(5)); + continue; + } + + metadata = queue_.get_metadata_buffer(slot_id); + data = queue_.get_data_buffer(slot_id); + + bool sync_needed = false; + for (size_t i_module = 0; i_module < n_modules_; i_module++) { + auto& module_metadata = metadata->module[i_module]; + + recv_single_module( + sockets[i_module], + &module_metadata, + data + (MODULE_N_BYTES * i_module)); + + if (module_metadata.pulse_id != current_pulse_id) { + sync_needed = true; + } + } + + if (sync_needed) { + auto start_time = chrono::steady_clock::now(); + + auto new_pulse_id = align_modules(sockets, metadata, data); + auto lost_pulses = new_pulse_id - current_pulse_id; + current_pulse_id = new_pulse_id; + + auto end_time = chrono::steady_clock::now(); + auto us_duration = chrono::duration_cast( + end_time-start_time).count(); + + cout << "sf_stream:sync_lost_pulses " << lost_pulses; + cout << " sf_stream::sync_us " << us_duration; + cout << endl; + } + + queue_.commit(); + current_pulse_id++; + } + + for (size_t i = 0; i < n_modules_; i++) { + zmq_close(sockets[i]); + } + + } catch (const std::exception& e) { + is_receiving_ = false; + + using namespace date; + using namespace chrono; + + cout << "[" << system_clock::now() << "]"; + cout << "[LiveRecvModule::receive_thread]"; + cout << " Stopped because of exception: " << endl; + cout << e.what() << endl; + + throw; + } +} \ No newline at end of file diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp new file mode 100644 index 0000000..2c236ef --- /dev/null +++ b/sf-stream/src/main.cpp @@ -0,0 +1,220 @@ +#include +#include +#include "buffer_config.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include "date.h" +#include + +using namespace std; +using namespace core_buffer; + +int main (int argc, char *argv[]) +{ + if (argc != 5) { + cout << endl; + cout << "Usage: sf_stream "; + cout << " [streamvis_address] [reduction_factor_streamvis]"; + cout << " [live_analysis_address] [reduction_factor_live_analysis]"; + cout << endl; + cout << "\tstreamvis_address: address to streamvis, example tcp://129.129.241.42:9007" << endl; + cout << "\treduction_factor_streamvis: 1 out of N (example 10) images to send to streamvis. For remaining send metadata." << endl; + cout << "\tlive_analysis_address: address to live_analysis, example tcp://129.129.241.42:9107" << endl; + cout << "\treduction_factor_live_analysis: 1 out of N (example 10) images to send to live analysis. For remaining send metadata. N<=1 - send every image" << endl; + cout << endl; + + exit(-1); + } + + string streamvis_address = string(argv[1]); + int reduction_factor_streamvis = (int) atoll(argv[2]); + string live_analysis_address = string(argv[3]); + int reduction_factor_live_analysis = (uint64_t) atoll(argv[4]); + + size_t n_modules = 32; + FastQueue queue( + n_modules * MODULE_N_BYTES, + STREAM_FASTQUEUE_SLOTS); + + auto ctx = zmq_ctx_new(); + zmq_ctx_set (ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + + LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); + + // 0mq sockets to streamvis and live analysis + void *socket_streamvis = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_streamvis, streamvis_address.c_str()) != 0) { + throw runtime_error(strerror(errno)); + } + void *socket_live = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(socket_live, live_analysis_address.c_str()) != 0) { + throw runtime_error(strerror(errno)); + } + + uint16_t data_empty [] = { 0, 0, 0, 0}; + Json::Value header; + Json::StreamWriterBuilder builder; + + // TODO: Remove stats trash. + int stats_counter = 0; + + size_t read_total_us = 0; + size_t read_max_us = 0; + + while (true) { + + auto start_time = chrono::steady_clock::now(); + + auto slot_id = queue.read(); + + if(slot_id == -1) { + this_thread::sleep_for(chrono::milliseconds( + core_buffer::RB_READ_RETRY_INTERVAL_MS)); + continue; + } + + auto metadata = queue.get_metadata_buffer(slot_id); + auto data = queue.get_data_buffer(slot_id); + + auto read_end_time = chrono::steady_clock::now(); + auto read_us_duration = chrono::duration_cast( + read_end_time-start_time).count(); + + uint64_t pulse_id = 0; + uint64_t frame_index = 0; + uint64_t daq_rec = 0; + bool is_good_frame = true; + + for (size_t i_module = 0; i_module < n_modules; i_module++) { + // TODO: Place this tests in the appropriate spot. + auto& module_metadata = metadata->module[i_module]; + if (i_module == 0) { + pulse_id = module_metadata.pulse_id; + frame_index = module_metadata.frame_index; + daq_rec = module_metadata.daq_rec; + + if ( module_metadata.n_received_packets != 128 ) is_good_frame = false; + } else { + if (module_metadata.pulse_id != pulse_id) is_good_frame = false; + + if (module_metadata.frame_index != frame_index) is_good_frame = false; + + if (module_metadata.daq_rec != daq_rec) is_good_frame = false; + + if (module_metadata.n_received_packets != 128 ) is_good_frame = false; + } + } + + //Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) + + header["frame"] = (Json::Value::UInt64)frame_index; + header["is_good_frame"] = is_good_frame; + header["daq_rec"] = (Json::Value::UInt64)daq_rec; + header["pulse_id"] = (Json::Value::UInt64)pulse_id; + + //this needs to be re-read from external source + header["pedestal_file"] = "/sf/bernina/data/p17534/res/JF_pedestals/pedestal_20200423_1018.JF07T32V01.res.h5"; + header["gain_file"] = "/sf/bernina/config/jungfrau/gainMaps/JF07T32V01/gains.h5"; + + header["number_frames_expected"] = 10000; + header["run_name"] = to_string(uint64_t(pulse_id/10000)*10000); + + // detector name should come as parameter to sf_stream + header["detector_name"] = "JF07T32V01"; + + header["htype"] = "array-1.0"; + header["type"] = "uint16"; + + int send_streamvis = 0; + if ( reduction_factor_streamvis > 1 ) { + send_streamvis = rand() % reduction_factor_streamvis; + } + if ( send_streamvis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + string text_header = Json::writeString(builder, header); + + zmq_send(socket_streamvis, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_streamvis, + (char*)data_empty, + 8, + 0); + } + + //same for live analysis + int send_live_analysis = 0; + if ( reduction_factor_live_analysis > 1 ) { + send_live_analysis = rand() % reduction_factor_live_analysis; + } + if ( send_live_analysis == 0 ) { + header["shape"][0] = 16384; + header["shape"][1] = 1024; + } else{ + header["shape"][0] = 2; + header["shape"][1] = 2; + } + + text_header = Json::writeString(builder, header); + + zmq_send(socket_live, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_live_analysis == 0 ) { + zmq_send(socket_live, + (char*)data, + core_buffer::MODULE_N_BYTES*n_modules, + 0); + } else { + zmq_send(socket_live, + (char*)data_empty, + 8, + 0); + } + + queue.release(); + + // TODO: Some poor statistics. + stats_counter++; + read_total_us += read_us_duration; + + if (read_us_duration > read_max_us) { + read_max_us = read_us_duration; + } + + if (stats_counter == STATS_MODULO) { + cout << "sf_stream:read_us " << read_total_us / STATS_MODULO; + cout << " sf_stream:read_max_us " << read_max_us; + cout << endl; + + stats_counter = 0; + read_total_us = 0; + read_max_us = 0; + } + } + + return 0; +} diff --git a/sf-stream/test/CMakeLists.txt b/sf-stream/test/CMakeLists.txt new file mode 100644 index 0000000..45c6f8d --- /dev/null +++ b/sf-stream/test/CMakeLists.txt @@ -0,0 +1,7 @@ +add_executable(sf-stream-tests main.cpp) + +target_link_libraries(sf-stream-tests + sf-stream-lib + gtest + ) + diff --git a/sf-stream/test/main.cpp b/sf-stream/test/main.cpp new file mode 100644 index 0000000..e1b1b18 --- /dev/null +++ b/sf-stream/test/main.cpp @@ -0,0 +1,9 @@ +#include "gtest/gtest.h" +#include "test_LiveRecvModule.cpp" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/sf-stream/test/test_LiveRecvModule.cpp b/sf-stream/test/test_LiveRecvModule.cpp new file mode 100644 index 0000000..31c35d4 --- /dev/null +++ b/sf-stream/test/test_LiveRecvModule.cpp @@ -0,0 +1,87 @@ +#include +#include "LiveRecvModule.hpp" +#include "gtest/gtest.h" +#include "buffer_config.hpp" +#include + +using namespace std; +using namespace core_buffer; + +TEST(LiveRecvModule, transfer_test) { + // TODO: Make this test work again. +// auto ctx = zmq_ctx_new(); +// +// size_t n_modules = 32; +// size_t n_slots = 5; +// FastQueue queue(MODULE_N_BYTES * n_modules, n_slots); +// +// void *sockets[n_modules]; +// for (size_t i = 0; i < n_modules; i++) { +// sockets[i] = zmq_socket(ctx, ZMQ_PUB); +// +// int linger = 0; +// if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, +// sizeof(linger)) != 0) { +// throw runtime_error(zmq_strerror(errno)); +// } +// +// stringstream ipc_addr; +// ipc_addr << BUFFER_LIVE_IPC_URL << i; +// const auto ipc = ipc_addr.str(); +// +// if (zmq_bind(sockets[i], ipc.c_str()) != 0) { +// throw runtime_error(zmq_strerror(errno)); +// } +// } +// +// LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL); +// +// // Nothing should be committed, queue, should be empty. +// ASSERT_EQ(queue.read(), -1); +// +// ModuleFrame metadata; +// auto data = make_unique(MODULE_N_BYTES); +// +// for (size_t i = 0; i < n_modules; i++) { +// metadata.pulse_id = 1; +// metadata.frame_index = 2; +// metadata.daq_rec = 3; +// metadata.n_received_packets = 4; +// metadata.module_id = i; +// +// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); +// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0); +// } +// +// this_thread::sleep_for(chrono::milliseconds(100)); +// +// auto slot_id = queue.read(); +// // We should have the first Detector frame in the buffer. +// //ASSERT_NE(slot_id, -1); +// +// auto recv_stopped = async(launch::async, [&](){ +// recv_module.stop(); +// }); +// +// this_thread::sleep_for(chrono::milliseconds(100)); +// +// for (size_t i = 0; i < n_modules; i++) { +// metadata.pulse_id = 1; +// metadata.frame_index = 2; +// metadata.daq_rec = 3; +// metadata.n_received_packets = 4; +// metadata.module_id = i; +// +// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); +// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0); +// } +// +// recv_stopped.wait(); +// +// for (size_t i = 0; i < n_modules; i++) { +// zmq_close(sockets[i]); +// } +// +// zmq_ctx_destroy(ctx); +// cout << "We are finished" << endl; +} \ No newline at end of file