Works with 3 module but synchronization problem

This commit is contained in:
Mohacsi Istvan
2021-06-29 17:09:16 +02:00
parent 64236ad22e
commit c9863a3071
4 changed files with 19 additions and 12 deletions
-2
View File
@@ -45,8 +45,6 @@ public:
uint64_t idx = pulseID % m_CAP;
// A new frame is starting
std::cout << " Pulse_ids: " << ref_frame.meta.pulse_id << " (new)\t" << m_buffer[idx].meta.pulse_id << " (old)" << std::endl;
if(ref_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){
start_line(idx, ref_frame.meta);
}
+9 -3
View File
@@ -38,7 +38,7 @@ class ZmqPublisher {
std::mutex g_zmq_socket;
public:
ZmqPublisher(uint16_t port) :
ZmqPublisher(std::string ip, uint16_t port) :
m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) {
// Bind the socket
m_socket.bind(m_address.c_str());
@@ -56,15 +56,21 @@ class ZmqPublisher {
**/
class ZmqImagePublisher: public ZmqPublisher {
public:
ZmqImagePublisher(uint16_t port) : ZmqPublisher(port) {};
ZmqImagePublisher(std::string ip, uint16_t port) : ZmqPublisher(ip, port) {};
const std::string topic = "IMAGEDATA";
void sendImage(ImageBinaryFormat& image){
std::lock_guard<std::mutex> guard(g_zmq_socket);
int len;
len = m_socket.send(topic.c_str(), topic.size(), ZMQ_SNDMORE);
ASSERT_TRUE( len >=0, "Failed to send topic data" )
len = m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE);
ASSERT_TRUE( len >=0, "Failed to send image data" )
ASSERT_TRUE( len >=0, "Failed to send meta data" )
// std::cout << "\tPT1 Sent " << len << "\n";
len = m_socket.send(image.data.data(), image.data.size(), 0);
ASSERT_TRUE( len >=0, "Failed to send image data" )
// std::cout << "\tPT1 Sent " << len << "\n";
std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl;
}
+4 -1
View File
@@ -47,6 +47,10 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){
buffer.meta.module_id = m_moduleID;
// Copy data to frame buffer
if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME){
std::cout << "Too high packet index: " << c_packet.packetnum << std::endl;
return 0;
}
size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum;
memcpy( (void*) (buffer.data + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET);
buffer.meta.n_recv_packets++;
@@ -105,7 +109,6 @@ void JfjFrameWorker::run(){
auto pulse_id = get_frame(buffer);
if(pulse_id>10){
std::cout << "Pushing " << pulse_id << std::endl;
f_push_callback(pulse_id, m_moduleID, buffer);
}
}
+6 -6
View File
@@ -11,11 +11,10 @@
int main (int argc, char *argv[]) {
std::cout << "Creating ZMQ socket..." << std::endl;
ZmqImagePublisher pub(5558);
ZmqImagePublisher pub("129.129.144.76", 5158);
std::function<void(ImageBinaryFormat&)> zmq_publish =
std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1);
std::cout << "Creating frame cache..." << std::endl;
FrameCache cache(32, 3, zmq_publish);
@@ -24,17 +23,18 @@ int main (int argc, char *argv[]) {
std::cout << "Creating workers..." << std::endl;
JfjFrameWorker W0(5005, 0, push_cb);
JfjFrameWorker W1(5006, 2, push_cb);
// JfjFrameWorker W2(5007, 2, push_cb);
JfjFrameWorker W1(5006, 1, push_cb);
JfjFrameWorker W2(5007, 2, push_cb);
std::cout << "Starting worker threads..." << std::endl;
std::thread T0(&JfjFrameWorker::run, &W0);
std::thread T1(&JfjFrameWorker::run, &W1);
std::thread T2(&JfjFrameWorker::run, &W2);
T0.join();
T1.join();
T2.join();
std::cout << "Exiting program..." << std::endl;
return 0;
}