fixed fram sync issue. mainly due to zmq msg being cleaned up before sending it

This commit is contained in:
maliakal_d 2025-05-05 14:03:00 +02:00
parent 6c604e2340
commit 0ecc6030c6

View File

@ -107,11 +107,11 @@ void zmq_free(void *data, void *hint) { delete[] static_cast<char *>(data); }
void print_frames(const PortFrameMap &frame_port_map) { void print_frames(const PortFrameMap &frame_port_map) {
LOG(sls::logDEBUG) << "Printing frames"; LOG(sls::logDEBUG) << "Printing frames";
for (const auto &it : frame_port_map) { for (const auto &it : frame_port_map) {
uint16_t udpPort = it.first; const uint16_t udpPort = it.first;
const auto &frame_map = it.second; const auto &frame_map = it.second;
LOG(sls::logDEBUG) << "UDP port: " << udpPort; LOG(sls::logDEBUG) << "UDP port: " << udpPort;
for (const auto &frame : frame_map) { for (const auto &frame : frame_map) {
uint64_t fnum = frame.first; const uint64_t fnum = frame.first;
const auto &msg_list = frame.second; const auto &msg_list = frame.second;
LOG(sls::logDEBUG) LOG(sls::logDEBUG)
<< " acq index: " << fnum << '[' << msg_list.size() << ']'; << " acq index: " << fnum << '[' << msg_list.size() << ']';
@ -130,31 +130,32 @@ std::set<uint64_t> get_valid_fnums(const PortFrameMap &port_frame_map) {
// collect all unique frame numbers from all ports // collect all unique frame numbers from all ports
std::set<uint64_t> unique_fnums; std::set<uint64_t> unique_fnums;
for (auto it = port_frame_map.begin(); it != port_frame_map.end(); ++it) { for (const auto &it : port_frame_map) {
const FrameMap &frame_map = it->second; const FrameMap &frame_map = it.second;
for (auto frame = frame_map.begin(); frame != frame_map.end(); for (const auto &frame : frame_map) {
++frame) { unique_fnums.insert(frame.first);
unique_fnums.insert(frame->first);
} }
} }
// collect valid frame numbers // collect valid frame numbers
for (auto &fnum : unique_fnums) { for (auto &fnum : unique_fnums) {
bool is_valid = true; bool is_valid = true;
for (auto it = port_frame_map.begin(); it != port_frame_map.end(); for (const auto &it : port_frame_map) {
++it) { const uint16_t port = it.first;
uint16_t port = it->first; const FrameMap &frame_map = it.second;
const FrameMap &frame_map = it->second;
auto frame = frame_map.find(fnum); auto frame = frame_map.find(fnum);
// invalid: fnum missing in one port // invalid: fnum missing in one port
if (frame == frame_map.end()) { if (frame == frame_map.end()) {
LOG(sls::logDEBUG) LOG(sls::logDEBUG)
<< "Fnum " << fnum << " is missing in port " << port; << "Fnum " << fnum << " is missing in port " << port;
/*
// invalid: fnum greater than all in that port // invalid: fnum greater than all in that port
auto last_frame = auto last_frame = std::prev(frame_map.end());
frame_map.upper_bound(fnum); // std::prev(frame_map.end()); auto last_fnum = last_frame->first;
// auto last_fnum = last_frame->first; if (fnum > last_fnum) {
if (last_frame == frame_map.end()) { //(fnum > last_fnum) { */
auto upper_frame = frame_map.upper_bound(fnum);
if (upper_frame == frame_map.end()) {
LOG(sls::logDEBUG) << "And no larger fnum found. Fnum " LOG(sls::logDEBUG) << "And no larger fnum found. Fnum "
<< fnum << " is invalid.\n"; << fnum << " is invalid.\n";
is_valid = false; is_valid = false;
@ -224,29 +225,38 @@ void Correlate(FrameStatus *stat) {
// sending all valid fnum data packets // sending all valid fnum data packets
for (const auto &fnum : valid_fnums) { for (const auto &fnum : valid_fnums) {
ZmqMsgList msg_list; ZmqMsgList msg_list;
PortFrameMap &port_frame_map = stat->frames; // PortFrameMap &port_frame_map = stat->frames;
for (auto it = port_frame_map.begin(); for (const auto &it : stat->frames) {
it != port_frame_map.end(); ++it) { const uint16_t port = it.first;
uint16_t port = it->first; const FrameMap &frame_map = it.second;
const FrameMap &frame_map = it->second;
auto frame = frame_map.find(fnum); auto frame = frame_map.find(fnum);
if (frame != frame_map.end()) { if (frame != frame_map.end()) {
msg_list.insert(msg_list.end(), msg_list.insert(msg_list.end(),
stat->frames[port][fnum].begin(), stat->frames[port][fnum].begin(),
stat->frames[port][fnum].end()); stat->frames[port][fnum].end());
// clean up // clean up
for (zmq_msg_t *msg : stat->frames[port][fnum]) { /*for (zmq_msg_t *msg : stat->frames[port][fnum]) {
if (msg) { if (msg) {
zmq_msg_close(msg); zmq_msg_close(msg);
delete msg; delete msg;
} }
} }*/
stat->frames[port].erase(fnum); stat->frames[port].erase(fnum);
} }
} }
LOG(printHeadersLevel) LOG(printHeadersLevel)
<< "Sending data packets for fnum " << fnum; << "Sending data packets for fnum " << fnum;
zmq_send_multipart(socket, msg_list); zmq_send_multipart(socket, msg_list);
for (const auto &it : stat->frames) {
const uint16_t port = it.first;
// clean up
for (zmq_msg_t *msg : stat->frames[port][fnum]) {
if (msg) {
zmq_msg_close(msg);
delete msg;
}
}
}
} }
} }
// sending all end packets // sending all end packets