diff --git a/jfj-combined/include/JfjFrameCache.hpp b/jfj-combined/include/JfjFrameCache.hpp index 8c34279..0207d14 100644 --- a/jfj-combined/include/JfjFrameCache.hpp +++ b/jfj-combined/include/JfjFrameCache.hpp @@ -45,8 +45,6 @@ public: uint64_t idx = pulseID % m_CAP; // A new frame is starting - std::cout << " Pulse_ids: " << ref_frame.meta.pulse_id << " (new)\t" << m_buffer[idx].meta.pulse_id << " (old)" << std::endl; - if(ref_frame.meta.pulse_id != m_buffer[idx].meta.pulse_id){ start_line(idx, ref_frame.meta); } diff --git a/jfj-combined/include/ZmqImagePublisher.hpp b/jfj-combined/include/ZmqImagePublisher.hpp index c5061d7..5ababe4 100644 --- a/jfj-combined/include/ZmqImagePublisher.hpp +++ b/jfj-combined/include/ZmqImagePublisher.hpp @@ -38,7 +38,7 @@ class ZmqPublisher { std::mutex g_zmq_socket; public: - ZmqPublisher(uint16_t port) : + ZmqPublisher(std::string ip, uint16_t port) : m_port(port), m_address("tcp://*:" + std::to_string(port)), m_ctx(1), m_socket(m_ctx, ZMQ_PUB) { // Bind the socket m_socket.bind(m_address.c_str()); @@ -56,15 +56,21 @@ class ZmqPublisher { **/ class ZmqImagePublisher: public ZmqPublisher { public: - ZmqImagePublisher(uint16_t port) : ZmqPublisher(port) {}; + ZmqImagePublisher(std::string ip, uint16_t port) : ZmqPublisher(ip, port) {}; + const std::string topic = "IMAGEDATA"; void sendImage(ImageBinaryFormat& image){ std::lock_guard guard(g_zmq_socket); int len; + + len = m_socket.send(topic.c_str(), topic.size(), ZMQ_SNDMORE); + ASSERT_TRUE( len >=0, "Failed to send topic data" ) len = m_socket.send(&image.meta, sizeof(image.meta), ZMQ_SNDMORE); - ASSERT_TRUE( len >=0, "Failed to send image data" ) + ASSERT_TRUE( len >=0, "Failed to send meta data" ) + // std::cout << "\tPT1 Sent " << len << "\n"; len = m_socket.send(image.data.data(), image.data.size(), 0); ASSERT_TRUE( len >=0, "Failed to send image data" ) + // std::cout << "\tPT1 Sent " << len << "\n"; std::cout << "Sent ZMQ stream of pulse: " << image.meta.pulse_id << std::endl; } diff --git a/jfj-combined/src/JfjFrameWorker.cpp b/jfj-combined/src/JfjFrameWorker.cpp index f1d5056..8ab4fbc 100644 --- a/jfj-combined/src/JfjFrameWorker.cpp +++ b/jfj-combined/src/JfjFrameWorker.cpp @@ -47,6 +47,10 @@ inline uint64_t JfjFrameWorker::process_packets(BufferBinaryFormat& buffer){ buffer.meta.module_id = m_moduleID; // Copy data to frame buffer + if(c_packet.packetnum >= JF_N_PACKETS_PER_FRAME){ + std::cout << "Too high packet index: " << c_packet.packetnum << std::endl; + return 0; + } size_t offset = JUNGFRAU_DATA_BYTES_PER_PACKET * c_packet.packetnum; memcpy( (void*) (buffer.data + offset), c_packet.data, JUNGFRAU_DATA_BYTES_PER_PACKET); buffer.meta.n_recv_packets++; @@ -105,7 +109,6 @@ void JfjFrameWorker::run(){ auto pulse_id = get_frame(buffer); if(pulse_id>10){ - std::cout << "Pushing " << pulse_id << std::endl; f_push_callback(pulse_id, m_moduleID, buffer); } } diff --git a/jfj-combined/src/main.cpp b/jfj-combined/src/main.cpp index 8d61a70..dbe7662 100644 --- a/jfj-combined/src/main.cpp +++ b/jfj-combined/src/main.cpp @@ -11,11 +11,10 @@ int main (int argc, char *argv[]) { std::cout << "Creating ZMQ socket..." << std::endl; - ZmqImagePublisher pub(5558); + ZmqImagePublisher pub("129.129.144.76", 5158); std::function zmq_publish = std::bind(&ZmqImagePublisher::sendImage, &pub, std::placeholders::_1); - std::cout << "Creating frame cache..." << std::endl; FrameCache cache(32, 3, zmq_publish); @@ -24,17 +23,18 @@ int main (int argc, char *argv[]) { std::cout << "Creating workers..." << std::endl; JfjFrameWorker W0(5005, 0, push_cb); - JfjFrameWorker W1(5006, 2, push_cb); - // JfjFrameWorker W2(5007, 2, push_cb); + JfjFrameWorker W1(5006, 1, push_cb); + JfjFrameWorker W2(5007, 2, push_cb); - - + std::cout << "Starting worker threads..." << std::endl; std::thread T0(&JfjFrameWorker::run, &W0); std::thread T1(&JfjFrameWorker::run, &W1); + std::thread T2(&JfjFrameWorker::run, &W2); T0.join(); T1.join(); + T2.join(); std::cout << "Exiting program..." << std::endl; return 0; }