From 64236ad22eeb8160d494adc93bfc597b0747917b Mon Sep 17 00:00:00 2001 From: Mohacsi Istvan Date: Mon, 28 Jun 2021 15:52:15 +0200 Subject: [PATCH] Use standard shared mutexes --- jfj-combined/include/JfjFrameCache.hpp | 68 ++++++++++------------ jfj-combined/include/ZmqImagePublisher.hpp | 30 +++++++--- jfj-combined/src/ZmqImagePublisher.cpp | 61 ------------------- jfj-combined/src/main.cpp | 26 +++++---- 4 files changed, 67 insertions(+), 118 deletions(-) delete mode 100644 jfj-combined/src/ZmqImagePublisher.cpp diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/JfjFrameCache.hpp index af9d02f..8c34279 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/JfjFrameCache.hpp @@ -6,10 +6,12 @@ #include #include #include +#include #include #include #include #include +#include #include "../../core-buffer/include/formats.hpp" @@ -25,8 +27,13 @@ class FrameCache{ public: FrameCache(uint64_t _C, uint64_t N_MOD, std::function callback): m_CAP(_C), m_M(N_MOD), - m_buffer(_C, ImageBinaryFormat(512*9, 1024, sizeof(uint16_t))), - f_send(callback), m_vlock(_C), m_valid(_C), m_fill(_C) { + m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))), + f_send(callback), m_lock(_C), m_valid(_C) { + // Initialize buffer metadata + for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); } + + // Initialize Mutexes + for(auto& it: m_valid){ it = 0; } }; @@ -36,40 +43,25 @@ public: This simultaneously handles buffering and assembly. **/ void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& ref_frame){ uint64_t idx = pulseID % m_CAP; - std::cout << " Emplace: " << idx << std::endl; - - - // Wait for unlocking block - // while(m_vlock[idx]){ std::this_thread::yield(); } - - // Invalid cache line: Just start a new line - //if(m_valid[idx]){ start_line(idx, ref_frame.meta); } - + // A new frame is starting - std::cout << " Pulse_ids: " << ref_frame.meta.pulse_id << "\t" << m_buffer[idx].meta.pulse_id << std::endl; + std::cout << " Pulse_ids: " << ref_frame.meta.pulse_id << " (new)\t" << m_buffer[idx].meta.pulse_id << " (old)" << std::endl; if(ref_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){ - std::cout << "NOT EQUAL" << std::endl; - flush_line(idx); start_line(idx, ref_frame.meta); - } - - std::cout << " fill/cpy" << std::endl; - m_fill[idx]++; - char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize; - //char* ptr_dest = m_buffer[idx].data; - std::cout << " Root: " << (void*)m_buffer[idx].data.data() << "\tTarget:" << (void*)ptr_dest - << "\tblocksize: " << m_blocksize << "\tcontainer: " << m_buffer[idx].data.size() << std::endl; + } + + // Shared lock for concurrent PUT operations + std::shared_lock guard(m_lock[idx]); + // Copy metadata (manually for now...) m_buffer[idx].meta.pulse_id = ref_frame.meta.pulse_id; m_buffer[idx].meta.frame_index = ref_frame.meta.frame_index; m_buffer[idx].meta.daq_rec = ref_frame.meta.daq_rec; - std::cout << "NI " << std::endl; - std::memcpy((void*)ptr_dest, (void*)&ref_frame.data, m_blocksize); - // std::memcpy((void*)&ptr_dest[moduleIDX * m_blocksize], (void*)&ref_frame.data, m_blocksize); - std::cout << " Fill ctr: " << m_fill[idx] << std::endl; - + // Calculate destination pointer (easier to debug) + char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize; + std::memcpy((void*)ptr_dest, (void*)&ref_frame.data, m_blocksize); } void flush_all(){ @@ -78,26 +70,29 @@ public: } } + // Flush and invalidate a line (incl. lock) void flush_line(uint64_t idx){ + std::unique_lock guard(m_lock[idx]); if(m_valid[idx]){ - std::cout << "Flushing line: " << idx << std::endl; - m_vlock[idx] = 1; f_send(m_buffer[idx]); m_valid[idx] = 0; - m_fill[idx] = 0; - m_vlock[idx] = 0; } } + // Flush and start a new line (incl. lock) void start_line(uint64_t idx, ModuleFrame& ref_meta){ - m_vlock[idx] = 1; + // 0. Guard + std::unique_lock guard(m_lock[idx]); + // 1. Flush + if(m_valid[idx]){ + f_send(m_buffer[idx]); + } + // 2. Init m_buffer[idx].meta.pulse_id = ref_meta.pulse_id; m_buffer[idx].meta.frame_index = ref_meta.frame_index; m_buffer[idx].meta.daq_rec = ref_meta.daq_rec; m_buffer[idx].meta.is_good_image = true; - m_valid[idx].exchange(1); - m_fill[idx] = 0; - m_vlock[idx] = 0; + m_valid[idx] = 1; } private: @@ -107,9 +102,8 @@ private: std::function f_send; /** Main container and mutex guard **/ - std::vector> m_vlock; + std::vector m_lock; std::vector> m_valid; - std::vector> m_fill; std::vector m_buffer; }; diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp index 12cdba9..c5061d7 100644 --- a/jfj-combined/include/ZmqImagePublisher.hpp +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -6,13 +6,22 @@ #include "../../core-buffer/include/formats.hpp" -#define ASSERT_FALSE(expr, msg) +#define ASSERT_FALSE(expr, msg) \ if(bool(expr)){ \ std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ - text = text + "Message:" + msg + "\nErrno: " + std::to_sting(errno); \ + text = text + "Reason: " + std::to_string(expr) + "\n"; \ + text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ throw std::runtime_error(text); \ } \ +#define ASSERT_TRUE(expr, msg) \ + if(!bool(expr)){ \ + std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \ + text = text + "Reason: " + std::to_string(expr) + "\n"; \ + text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \ + throw std::runtime_error(text); \ + } + /** ZMQ Publisher Lightweight wrapper base class to initialize a ZMQ Publisher. @@ -29,11 +38,10 @@ class ZmqPublisher { std::mutex g_zmq_socket; public: - ZmqPublisher(const uint16_t port): + ZmqPublisher(uint16_t port) : m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) { // Bind the socket - auto err = m_socket.bind(m_address.c_str(); - ASSERT_FALSE( err, "Failed to bind ZMQ socket" ) + m_socket.bind(m_address.c_str()); std::cout << "Initialized ZMQ publisher at " << m_address << std::endl; }; @@ -48,12 +56,16 @@ class ZmqPublisher { **/ class ZmqImagePublisher: public ZmqPublisher { public: + ZmqImagePublisher(uint16_t port) : ZmqPublisher(port) {}; + void sendImage(ImageBinaryFormat& image){ std::lock_guard guard(g_zmq_socket); - int err = 0; - err |= m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE); - err |= m_socket.send(image.data, image.size, 0); - ASSERT_FALSE( err, "Failed to send image data" ) + int len; + len = m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE); + ASSERT_TRUE( len >=0, "Failed to send image data" ) + len = m_socket.send(image.data.data(), image.data.size(), 0); + ASSERT_TRUE( len >=0, "Failed to send image data" ) + std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl; } }; diff --git a/jfj-combined/src/ZmqImagePublisher.cpp b/jfj-combined/src/ZmqImagePublisher.cpp deleted file mode 100644 index 84de80b..0000000 --- a/jfj-combined/src/ZmqImagePublisher.cpp +++ /dev/null @@ -1,61 +0,0 @@ -#include -#include - -using namespace std; -using namespace buffer_config; - -PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { } - -PacketUdpReceiver::~PacketUdpReceiver() { - disconnect(); -} - -void PacketUdpReceiver::bind(const uint16_t port){ - if (socket_fd_ > -1) { - throw runtime_error("Socket already bound."); - } - - socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); - if (socket_fd_ < 0) { - throw runtime_error("Cannot open socket."); - } - - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(port); - - timeval udp_socket_timeout; - udp_socket_timeout.tv_sec = 0; - udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) { - throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno))); - } - - if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { - throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno))); - }; - //TODO: try to set SO_RCVLOWAT - - auto bind_result = ::bind(socket_fd_, reinterpret_cast(&server_address), sizeof(server_address)); - - if (bind_result < 0) { - throw runtime_error("Cannot bind socket."); - } -} - -int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){ - return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); -} - -bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){ - auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); - - return (data_len == buffer_n_bytes) ? true : false; -} - -void PacketUdpReceiver::disconnect(){ - close(socket_fd_); - socket_fd_ = -1; -} diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index ee0393a..8d61a70 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -5,32 +5,36 @@ #include "../../core-buffer/include/formats.hpp" #include "../include/JfjFrameCache.hpp" #include "../include/JfjFrameWorker.hpp" - - -void dummy_sender(ImageBinaryFormat& image){ - std::cout << "Sending " << image.meta.frame_index << std::endl; -} - +#include "../include/ZmqImagePublisher.hpp" int main (int argc, char *argv[]) { + + std::cout << "Creating ZMQ socket..." << std::endl; + ZmqImagePublisher pub(5558); + std::function zmq_publish = + std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1); + + std::cout << "Creating frame cache..." << std::endl; - FrameCache cache(32, 3, &dummy_sender); + FrameCache cache(32, 3, zmq_publish); std::function push_cb = std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); std::cout << "Creating workers..." << std::endl; - JfjFrameWorker W0(5005, 1, push_cb); - //JfjFrameWorker W1(5005, 0, push_cb); + JfjFrameWorker W0(5005, 0, push_cb); + JfjFrameWorker W1(5006, 2, push_cb); // JfjFrameWorker W2(5007, 2, push_cb); + + std::thread T0(&JfjFrameWorker::run, &W0); - //std::thread T1(&JfjFrameWorker::run, &W1); + std::thread T1(&JfjFrameWorker::run, &W1); T0.join(); - //T1.join(); + T1.join(); std::cout << "Exiting program..." << std::endl; return 0; }