From 0ecc6030c6aa0eb3a47bf59f448e26ef24b76b76 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Mon, 5 May 2025 14:03:00 +0200 Subject: [PATCH] fixed fram sync issue. mainly due to zmq msg being cleaned up before sending it --- .../src/FrameSynchronizerApp.cpp | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp index 96275e1a7..22983dc88 100644 --- a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp +++ b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp @@ -107,11 +107,11 @@ void zmq_free(void *data, void *hint) { delete[] static_cast(data); } void print_frames(const PortFrameMap &frame_port_map) { LOG(sls::logDEBUG) << "Printing frames"; for (const auto &it : frame_port_map) { - uint16_t udpPort = it.first; + const uint16_t udpPort = it.first; const auto &frame_map = it.second; LOG(sls::logDEBUG) << "UDP port: " << udpPort; for (const auto &frame : frame_map) { - uint64_t fnum = frame.first; + const uint64_t fnum = frame.first; const auto &msg_list = frame.second; LOG(sls::logDEBUG) << " acq index: " << fnum << '[' << msg_list.size() << ']'; @@ -130,31 +130,32 @@ std::set get_valid_fnums(const PortFrameMap &port_frame_map) { // collect all unique frame numbers from all ports std::set unique_fnums; - for (auto it = port_frame_map.begin(); it != port_frame_map.end(); ++it) { - const FrameMap &frame_map = it->second; - for (auto frame = frame_map.begin(); frame != frame_map.end(); - ++frame) { - unique_fnums.insert(frame->first); + for (const auto &it : port_frame_map) { + const FrameMap &frame_map = it.second; + for (const auto &frame : frame_map) { + unique_fnums.insert(frame.first); } } // collect valid frame numbers for (auto &fnum : unique_fnums) { bool is_valid = true; - for (auto it = port_frame_map.begin(); it != port_frame_map.end(); - ++it) { - uint16_t port = it->first; - const FrameMap &frame_map = it->second; + for (const auto &it : port_frame_map) { + const uint16_t port = it.first; + const FrameMap &frame_map = it.second; auto frame = frame_map.find(fnum); // invalid: fnum missing in one port if (frame == frame_map.end()) { LOG(sls::logDEBUG) << "Fnum " << fnum << " is missing in port " << port; + /* // invalid: fnum greater than all in that port - auto last_frame = - frame_map.upper_bound(fnum); // std::prev(frame_map.end()); - // auto last_fnum = last_frame->first; - if (last_frame == frame_map.end()) { //(fnum > last_fnum) { + auto last_frame = std::prev(frame_map.end()); + auto last_fnum = last_frame->first; + if (fnum > last_fnum) { + */ + auto upper_frame = frame_map.upper_bound(fnum); + if (upper_frame == frame_map.end()) { LOG(sls::logDEBUG) << "And no larger fnum found. Fnum " << fnum << " is invalid.\n"; is_valid = false; @@ -224,29 +225,38 @@ void Correlate(FrameStatus *stat) { // sending all valid fnum data packets for (const auto &fnum : valid_fnums) { ZmqMsgList msg_list; - PortFrameMap &port_frame_map = stat->frames; - for (auto it = port_frame_map.begin(); - it != port_frame_map.end(); ++it) { - uint16_t port = it->first; - const FrameMap &frame_map = it->second; + // PortFrameMap &port_frame_map = stat->frames; + for (const auto &it : stat->frames) { + const uint16_t port = it.first; + const FrameMap &frame_map = it.second; auto frame = frame_map.find(fnum); if (frame != frame_map.end()) { msg_list.insert(msg_list.end(), stat->frames[port][fnum].begin(), stat->frames[port][fnum].end()); // clean up - for (zmq_msg_t *msg : stat->frames[port][fnum]) { + /*for (zmq_msg_t *msg : stat->frames[port][fnum]) { if (msg) { zmq_msg_close(msg); delete msg; } - } + }*/ stat->frames[port].erase(fnum); } } LOG(printHeadersLevel) << "Sending data packets for fnum " << fnum; zmq_send_multipart(socket, msg_list); + for (const auto &it : stat->frames) { + const uint16_t port = it.first; + // clean up + for (zmq_msg_t *msg : stat->frames[port][fnum]) { + if (msg) { + zmq_msg_close(msg); + delete msg; + } + } + } } } // sending all end packets