Use standard shared mutexes

This commit is contained in:
Mohacsi Istvan
2021-06-28 15:52:15 +02:00
parent a1573d354f
commit 64236ad22e
4 changed files with 67 additions and 118 deletions
+31 -37
View File
@@ -6,10 +6,12 @@
#include <stdexcept>
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <vector>
#include <atomic>
#include <functional>
#include <thread>
#include <mutex>
#include "../../core-buffer/include/formats.hpp"
@@ -25,8 +27,13 @@ class FrameCache{
public:
FrameCache(uint64_t _C, uint64_t N_MOD, std::function<void(ImageBinaryFormat&)> callback):
m_CAP(_C), m_M(N_MOD),
m_buffer(_C, ImageBinaryFormat(512*9, 1024, sizeof(uint16_t))),
f_send(callback), m_vlock(_C), m_valid(_C), m_fill(_C) {
m_buffer(_C, ImageBinaryFormat(512*N_MOD, 1024, sizeof(uint16_t))),
f_send(callback), m_lock(_C), m_valid(_C) {
// Initialize buffer metadata
for(auto& it: m_buffer){ memset(&it.meta, 0, sizeof(it.meta)); }
// Initialize Mutexes
for(auto& it: m_valid){ it = 0; }
};
@@ -36,40 +43,25 @@ public:
This simultaneously handles buffering and assembly. **/
void emplace(uint64_t pulseID, uint64_t moduleIDX, BufferBinaryFormat& ref_frame){
uint64_t idx = pulseID % m_CAP;
std::cout << " Emplace: " << idx << std::endl;
// Wait for unlocking block
// while(m_vlock[idx]){ std::this_thread::yield(); }
// Invalid cache line: Just start a new line
//if(m_valid[idx]){ start_line(idx, ref_frame.meta); }
// A new frame is starting
std::cout << " Pulse_ids: " << ref_frame.meta.pulse_id << "\t" << m_buffer[idx].meta.pulse_id << std::endl;
std::cout << " Pulse_ids: " << ref_frame.meta.pulse_id << " (new)\t" << m_buffer[idx].meta.pulse_id << " (old)" << std::endl;
if(ref_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
std::cout << "NOT EQUAL" << std::endl;
flush_line(idx);
start_line(idx, ref_frame.meta);
}
std::cout << " fill/cpy" << std::endl;
m_fill[idx]++;
char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize;
//char* ptr_dest = m_buffer[idx].data;
std::cout << " Root: " << (void*)m_buffer[idx].data.data() << "\tTarget:" << (void*)ptr_dest
<< "\tblocksize: " << m_blocksize << "\tcontainer: " << m_buffer[idx].data.size() << std::endl;
}
// Shared lock for concurrent PUT operations
std::shared_lock<std::shared_mutex> guard(m_lock[idx]);
// Copy metadata (manually for now...)
m_buffer[idx].meta.pulse_id = ref_frame.meta.pulse_id;
m_buffer[idx].meta.frame_index = ref_frame.meta.frame_index;
m_buffer[idx].meta.daq_rec = ref_frame.meta.daq_rec;
std::cout << "NI " << std::endl;
std::memcpy((void*)ptr_dest, (void*)&ref_frame.data, m_blocksize);
// std::memcpy((void*)&ptr_dest[moduleIDX * m_blocksize], (void*)&ref_frame.data, m_blocksize);
std::cout << " Fill ctr: " << m_fill[idx] << std::endl;
// Calculate destination pointer (easier to debug)
char* ptr_dest = m_buffer[idx].data.data() + moduleIDX * m_blocksize;
std::memcpy((void*)ptr_dest, (void*)&ref_frame.data, m_blocksize);
}
void flush_all(){
@@ -78,26 +70,29 @@ public:
}
}
// Flush and invalidate a line (incl. lock)
void flush_line(uint64_t idx){
std::unique_lock<std::shared_mutex> guard(m_lock[idx]);
if(m_valid[idx]){
std::cout << "Flushing line: " << idx << std::endl;
m_vlock[idx] = 1;
f_send(m_buffer[idx]);
m_valid[idx] = 0;
m_fill[idx] = 0;
m_vlock[idx] = 0;
}
}
// Flush and start a new line (incl. lock)
void start_line(uint64_t idx, ModuleFrame& ref_meta){
m_vlock[idx] = 1;
// 0. Guard
std::unique_lock<std::shared_mutex> guard(m_lock[idx]);
// 1. Flush
if(m_valid[idx]){
f_send(m_buffer[idx]);
}
// 2. Init
m_buffer[idx].meta.pulse_id = ref_meta.pulse_id;
m_buffer[idx].meta.frame_index = ref_meta.frame_index;
m_buffer[idx].meta.daq_rec = ref_meta.daq_rec;
m_buffer[idx].meta.is_good_image = true;
m_valid[idx].exchange(1);
m_fill[idx] = 0;
m_vlock[idx] = 0;
m_valid[idx] = 1;
}
private:
@@ -107,9 +102,8 @@ private:
std::function<void(ImageBinaryFormat&)> f_send;
/** Main container and mutex guard **/
std::vector<std::atomic<uint32_t>> m_vlock;
std::vector<std::shared_mutex> m_lock;
std::vector<std::atomic<uint32_t>> m_valid;
std::vector<std::atomic<uint32_t>> m_fill;
std::vector<ImageBinaryFormat> m_buffer;
};
+21 -9
View File
@@ -6,13 +6,22 @@
#include "../../core-buffer/include/formats.hpp"
#define ASSERT_FALSE(expr, msg)
#define ASSERT_FALSE(expr, msg) \
if(bool(expr)){ \
std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \
text = text + "Message:" + msg + "\nErrno: " + std::to_sting(errno); \
text = text + "Reason: " + std::to_string(expr) + "\n"; \
text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \
throw std::runtime_error(text); \
} \
#define ASSERT_TRUE(expr, msg) \
if(!bool(expr)){ \
std::string text = "ASSERTION called at " + std::string(__FILE__) + " line " + std::to_string(__LINE__) + "\n"; \
text = text + "Reason: " + std::to_string(expr) + "\n"; \
text = text + "Message:" + msg + "\nErrno: " + std::to_string(errno); \
throw std::runtime_error(text); \
}
/** ZMQ Publisher
Lightweight wrapper base class to initialize a ZMQ Publisher.
@@ -29,11 +38,10 @@ class ZmqPublisher {
std::mutex g_zmq_socket;
public:
ZmqPublisher(const uint16_t port):
ZmqPublisher(uint16_t port) :
m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) {
// Bind the socket
auto err = m_socket.bind(m_address.c_str();
ASSERT_FALSE( err, "Failed to bind ZMQ socket" )
m_socket.bind(m_address.c_str());
std::cout << "Initialized ZMQ publisher at " << m_address << std::endl;
};
@@ -48,12 +56,16 @@ class ZmqPublisher {
**/
class ZmqImagePublisher: public ZmqPublisher {
public:
ZmqImagePublisher(uint16_t port) : ZmqPublisher(port) {};
void sendImage(ImageBinaryFormat& image){
std::lock_guard<std::mutex> guard(g_zmq_socket);
int err = 0;
err |= m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE);
err |= m_socket.send(image.data, image.size, 0);
ASSERT_FALSE( err, "Failed to send image data" )
int len;
len = m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE);
ASSERT_TRUE( len >=0, "Failed to send image data" )
len = m_socket.send(image.data.data(), image.data.size(), 0);
ASSERT_TRUE( len >=0, "Failed to send image data" )
std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl;
}
};
-61
View File
@@ -1,61 +0,0 @@
#include <iostream>
#include <zmq.hpp>
using namespace std;
using namespace buffer_config;
PacketUdpReceiver::PacketUdpReceiver() : socket_fd_(-1) { }
PacketUdpReceiver::~PacketUdpReceiver() {
disconnect();
}
void PacketUdpReceiver::bind(const uint16_t port){
if (socket_fd_ > -1) {
throw runtime_error("Socket already bound.");
}
socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0);
if (socket_fd_ < 0) {
throw runtime_error("Cannot open socket.");
}
sockaddr_in server_address = {0};
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = INADDR_ANY;
server_address.sin_port = htons(port);
timeval udp_socket_timeout;
udp_socket_timeout.tv_sec = 0;
udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT;
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, &udp_socket_timeout, sizeof(timeval)) == -1) {
throw runtime_error("Cannot set SO_RCVTIMEO. " + string(strerror(errno)));
}
if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) {
throw runtime_error("Cannot set SO_RCVBUF. " + string(strerror(errno)));
};
//TODO: try to set SO_RCVLOWAT
auto bind_result = ::bind(socket_fd_, reinterpret_cast<const sockaddr *>(&server_address), sizeof(server_address));
if (bind_result < 0) {
throw runtime_error("Cannot bind socket.");
}
}
int PacketUdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs){
return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0);
}
bool PacketUdpReceiver::receive(void* buffer, const size_t buffer_n_bytes){
auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0);
return (data_len == buffer_n_bytes) ? true : false;
}
void PacketUdpReceiver::disconnect(){
close(socket_fd_);
socket_fd_ = -1;
}
+15 -11
View File
@@ -5,32 +5,36 @@
#include "../../core-buffer/include/formats.hpp"
#include "../include/JfjFrameCache.hpp"
#include "../include/JfjFrameWorker.hpp"
void dummy_sender(ImageBinaryFormat& image){
std::cout << "Sending " << image.meta.frame_index << std::endl;
}
#include "../include/ZmqImagePublisher.hpp"
int main (int argc, char *argv[]) {
std::cout << "Creating ZMQ socket..." << std::endl;
ZmqImagePublisher pub(5558);
std::function<void(ImageBinaryFormat&)> zmq_publish =
std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1);
std::cout << "Creating frame cache..." << std::endl;
FrameCache cache(32, 3, &dummy_sender);
FrameCache cache(32, 3, zmq_publish);
std::function<void(uint64_t, uint64_t, BufferBinaryFormat&)> push_cb =
std::bind(&FrameCache::emplace, &cache, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
std::cout << "Creating workers..." << std::endl;
JfjFrameWorker W0(5005, 1, push_cb);
//JfjFrameWorker W1(5005, 0, push_cb);
JfjFrameWorker W0(5005, 0, push_cb);
JfjFrameWorker W1(5006, 2, push_cb);
// JfjFrameWorker W2(5007, 2, push_cb);
std::thread T0(&JfjFrameWorker::run, &W0);
//std::thread T1(&JfjFrameWorker::run, &W1);
std::thread T1(&JfjFrameWorker::run, &W1);
T0.join();
//T1.join();
T1.join();
std::cout << "Exiting program..." << std::endl;
return 0;
}