more debug msgs. adjusts on streamer and assembler to work with frame_index instead of pulse_id.

This commit is contained in:
2021-02-19 18:16:34 +01:00
parent dda2f9610e
commit 7db438671c
11 changed files with 112 additions and 48 deletions
+31 -15
View File
@@ -89,11 +89,7 @@ void RamBuffer::write_frame(
cout << " || src_meta.n_recv_packets " << src_meta.n_recv_packets;
cout << " || src_meta.daq_rec " << src_meta.daq_rec;
cout << " || src_meta.module_id " << src_meta.module_id;
cout << " || dst_data " << dst_data;
cout << " || image_buffer_ " << image_buffer_;
cout << " || image_bytes_ " << image_bytes_;
cout << " || slot_n " << slot_n;
cout << " || MODULE_N_BYTES " << MODULE_N_BYTES;
cout << endl;
#endif
@@ -136,23 +132,20 @@ void RamBuffer::assemble_image(
if (!is_good_frame) {
is_good_image = false;
continue;
}
if (!is_pulse_init) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [RamBuffer::read_image] !is_pulse_init:";
cout << "Frame_meta pulse id: " << frame_meta->pulse_id;
cout << " || pulse id: " << pulse_id;
cout << " || frame_meta->n_recv_packets " << frame_meta->n_recv_packets;
cout << " || frame_index: " << frame_meta->frame_index;
cout << "] [RamBuffer::assemble_image] ";
cout << " not a good frame " << is_good_frame;
cout << "n_recv_packets != N_PACKETS_PER_FRAME";
cout << endl;
#endif
if (frame_meta->pulse_id != pulse_id) {
continue;
}
if (!is_pulse_init) {
if (frame_meta->frame_index != pulse_id) {
stringstream err_msg;
err_msg << "[RamBuffer::read_image]";
err_msg << "[RamBuffer::assemble_image]";
err_msg << " Unexpected pulse_id in ram buffer.";
err_msg << " expected=" << pulse_id;
err_msg << " got=" << frame_meta->pulse_id;
@@ -178,15 +171,38 @@ void RamBuffer::assemble_image(
if (is_good_image) {
if (frame_meta->pulse_id != image_meta.pulse_id) {
is_good_image = false;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [RamBuffer::assemble_image] ";
cout << "not good image";
cout << "frame_meta->pulse_id != image_meta.pulse_id";
cout << endl;
#endif
// TODO: Add some diagnostics in case this happens.
}
if (frame_meta->frame_index != image_meta.frame_index) {
is_good_image = false;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [RamBuffer::assemble_image] !is_pulse_init:";
cout << "frame_meta->frame_index != image_meta.frame_index";
cout << endl;
#endif
}
if (frame_meta->daq_rec != image_meta.daq_rec) {
is_good_image = false;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [RamBuffer::assemble_image] !is_pulse_init:";
cout << "frame_meta->daq_rec != image_meta.daq_rec";
cout << endl;
#endif
}
}
}
+14
View File
@@ -7,6 +7,7 @@
#include <chrono>
#include <algorithm>
#include <iostream>
#include "date.h"
#include "assembler_config.hpp"
@@ -52,6 +53,13 @@ PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
}
if (modules_in_sync) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] modules_in_sync true";
cout << "] returning pulses[0]";
cout << endl;
#endif
return {pulses[0], 0};
}
@@ -102,6 +110,12 @@ PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
n_lost_pulses += i_sync_lost_pulses;
if (modules_in_sync) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] modules_in_sync false";
cout << endl;
#endif
return {pulses[0], n_lost_pulses};
}
}
+5 -1
View File
@@ -16,7 +16,11 @@ int main (int argc, char *argv[])
{
if (argc != 2) {
cout << endl;
cout << "Usage: jf_assembler [detector_json_filename]" << endl;
#ifndef USE_EIGER
cout << "Usage: jf_assembler [detector_json_filename]" << endl;
#else
cout << "Usage: eiger_assembler [detector_json_filename]" << endl;
#endif
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << endl;
+3 -4
View File
@@ -52,8 +52,6 @@ inline void FrameUdpReceiver::init_frame(
cout << " [" << std::chrono::system_clock::now();
cout << "] [FrameUdpReceiver::init_frame] :";
cout << " Frame number: " << frame_metadata.frame_index << endl;
// cout << " packet_buffer_ " << packet_buffer_[i_packet] << endl;
cout << "i_packet" << i_packet << endl;
cout << endl;
#endif
}
@@ -98,7 +96,7 @@ inline uint64_t FrameUdpReceiver::process_packets(
// Continue on this packet.
packet_buffer_offset_ = i_packet;
return metadata.pulse_id;
return metadata.frame_index;
}
copy_packet_to_buffers(metadata, frame_buffer, i_packet);
@@ -114,7 +112,8 @@ inline uint64_t FrameUdpReceiver::process_packets(
cout << " Frame " << metadata.frame_index << " || ";
cout << packet_buffer_[i_packet].packetnum << " packets received.";
cout << " packet_buffer_n_packets_ " << packet_buffer_n_packets_;
cout << " i_packet "<< i_packet;
cout << " I_PACKET "<< i_packet;
cout << " PULSE ID "<< metadata.pulse_id;
cout << endl;
#endif
// Buffer is loaded only if this is not the last message.
+2 -4
View File
@@ -56,12 +56,10 @@ int main (int argc, char *argv[]) {
bool bad_pulse_id = false;
if ( ( meta.frame_index != (frame_index_previous+1) ) ||
( (pulse_id-pulse_id_previous) < 0 ) ||
( (pulse_id-pulse_id_previous) > 1000 ) ) {
if ( meta.frame_index != (frame_index_previous+1) && frame_index_previous != 0) {
bad_pulse_id = true;
} else {
buffer.write_frame(meta, data);
+8 -1
View File
@@ -8,7 +8,14 @@ target_link_libraries(sf-stream-lib
core-buffer-lib)
add_executable(sf-stream src/main.cpp)
set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream)
if (USE_EIGER)
set (LIB_NAME_STREAMER "eiger_stream")
else()
set (LIB_NAME_STREAMER "sf_stream")
endif()
set_target_properties(sf-stream PROPERTIES OUTPUT_NAME ${LIB_NAME_STREAMER})
target_link_libraries(sf-stream
external
core-buffer-lib
+1
View File
@@ -1,6 +1,7 @@
#ifndef SF_DAQ_BUFFER_STREAMSTATS_HPP
#define SF_DAQ_BUFFER_STREAMSTATS_HPP
#include "date.h"
#include <chrono>
#include <string>
#include <formats.hpp>
+9
View File
@@ -13,6 +13,15 @@ StreamStats::StreamStats(
stream_name_(stream_name),
stats_modulo_(stats_modulo)
{
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [StreamStats::StreamStats] ";
cout << " detector_name: " << detector_name_;
cout << " || stream_name: " << stream_name;
cout << " || stats_modulo: " << stats_modulo;
cout << endl;
#endif
reset_counters();
}
+30 -22
View File
@@ -2,6 +2,9 @@
#include "stream_config.hpp"
#include "zmq.h"
#include "date.h"
#include <chrono>
#include <iostream>
#include <stdexcept>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
@@ -121,17 +124,19 @@ void ZmqLiveSender::send(const ImageMetadata& meta, const char *data)
text_header.size(),
ZMQ_SNDMORE);
auto size_to_send = buffer_config::MODULE_N_BYTES * config_.n_modules;
if ( send_streamvis == 0 ) {
zmq_send(socket_streamvis_,
(char*)data,
buffer_config::MODULE_N_BYTES * config_.n_modules,
0);
} else {
zmq_send(socket_streamvis_,
(char*)data_empty,
8,
0);
size_to_send = 8;
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [ZmqLiveSender::send] ";
cout << "send_streamvis : " << text_header.c_str();
cout << endl;
#endif
zmq_send(socket_streamvis_, (char*)data, size_to_send, 0);
//same for live analysis
int send_live_analysis = 0;
@@ -156,24 +161,27 @@ void ZmqLiveSender::send(const ImageMetadata& meta, const char *data)
text_header = buffer.GetString();
}
// TODO: Ugly. Fix this flow control.
size_to_send = buffer_config::MODULE_N_BYTES * config_.n_modules;
if ( send_live_analysis == 0 ) {
size_to_send = 8;
}
if (zmq_send(socket_live_,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE | ZMQ_NOBLOCK) != -1) {
if ( send_live_analysis == 0 ) {
zmq_send(socket_live_,
(char*)data,
buffer_config::MODULE_N_BYTES * config_.n_modules,
ZMQ_NOBLOCK);
} else {
zmq_send(socket_live_,
(char*)data_empty,
8,
ZMQ_NOBLOCK);
}
zmq_send(socket_live_,
(char*)data,
size_to_send,
ZMQ_NOBLOCK);
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [ZmqLiveSender::send] ";
cout << "send_live_analysis : " << text_header.c_str();
cout << endl;
#endif
}
+6
View File
@@ -42,6 +42,12 @@ PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
{
uint64_t pulses[n_modules_];
#ifdef DEBUG_OUTPUT
cout << "[ZmqPulseSyncReceiver::get_next_pulse_id()]";
cout << "n_modules_" << n_modules_;
cout << endl;
#endif
bool modules_in_sync = true;
for (int i = 0; i < n_modules_; i++) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
+3 -1
View File
@@ -19,6 +19,7 @@ int main (int argc, char *argv[])
cout << "Usage: sf_stream [detector_json_filename]"
" [stream_name]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tstream_name: name of the stream." << endl;
cout << endl;
exit(-1);
@@ -40,7 +41,8 @@ int main (int argc, char *argv[])
ImageMetadata meta;
while (true) {
zmq_recv(receiver, &meta, sizeof(meta), 0);
char* data = ram_buffer.read_image(meta.pulse_id);
char* data = ram_buffer.read_image(meta.frame_index);
sender.send(meta, data);