diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index e03016e..85cb980 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -89,11 +89,7 @@ void RamBuffer::write_frame( cout << " || src_meta.n_recv_packets " << src_meta.n_recv_packets; cout << " || src_meta.daq_rec " << src_meta.daq_rec; cout << " || src_meta.module_id " << src_meta.module_id; - cout << " || dst_data " << dst_data; - cout << " || image_buffer_ " << image_buffer_; - cout << " || image_bytes_ " << image_bytes_; cout << " || slot_n " << slot_n; - cout << " || MODULE_N_BYTES " << MODULE_N_BYTES; cout << endl; #endif @@ -136,23 +132,20 @@ void RamBuffer::assemble_image( if (!is_good_frame) { is_good_image = false; - continue; - } - - if (!is_pulse_init) { #ifdef DEBUG_OUTPUT using namespace date; cout << " [" << std::chrono::system_clock::now(); - cout << "] [RamBuffer::read_image] !is_pulse_init:"; - cout << "Frame_meta pulse id: " << frame_meta->pulse_id; - cout << " || pulse id: " << pulse_id; - cout << " || frame_meta->n_recv_packets " << frame_meta->n_recv_packets; - cout << " || frame_index: " << frame_meta->frame_index; + cout << "] [RamBuffer::assemble_image] "; + cout << " not a good frame " << is_good_frame; + cout << "n_recv_packets != N_PACKETS_PER_FRAME"; cout << endl; #endif - if (frame_meta->pulse_id != pulse_id) { + continue; + } + if (!is_pulse_init) { + if (frame_meta->frame_index != pulse_id) { stringstream err_msg; - err_msg << "[RamBuffer::read_image]"; + err_msg << "[RamBuffer::assemble_image]"; err_msg << " Unexpected pulse_id in ram buffer."; err_msg << " expected=" << pulse_id; err_msg << " got=" << frame_meta->pulse_id; @@ -178,15 +171,38 @@ void RamBuffer::assemble_image( if (is_good_image) { if (frame_meta->pulse_id != image_meta.pulse_id) { is_good_image = false; + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [RamBuffer::assemble_image] "; + cout << "not good image"; + cout << "frame_meta->pulse_id != image_meta.pulse_id"; + cout << endl; + #endif + // TODO: Add some diagnostics in case this happens. } if (frame_meta->frame_index != image_meta.frame_index) { is_good_image = false; + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [RamBuffer::assemble_image] !is_pulse_init:"; + cout << "frame_meta->frame_index != image_meta.frame_index"; + cout << endl; + #endif } if (frame_meta->daq_rec != image_meta.daq_rec) { is_good_image = false; + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [RamBuffer::assemble_image] !is_pulse_init:"; + cout << "frame_meta->daq_rec != image_meta.daq_rec"; + cout << endl; + #endif } } } diff --git a/jf-assembler/src/ZmqPulseSyncReceiver.cpp b/jf-assembler/src/ZmqPulseSyncReceiver.cpp index 6dbe2fc..33490fd 100644 --- a/jf-assembler/src/ZmqPulseSyncReceiver.cpp +++ b/jf-assembler/src/ZmqPulseSyncReceiver.cpp @@ -7,6 +7,7 @@ #include #include #include +#include "date.h" #include "assembler_config.hpp" @@ -52,6 +53,13 @@ PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const } if (modules_in_sync) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] modules_in_sync true"; + cout << "] returning pulses[0]"; + cout << endl; + #endif return {pulses[0], 0}; } @@ -102,6 +110,12 @@ PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const n_lost_pulses += i_sync_lost_pulses; if (modules_in_sync) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] modules_in_sync false"; + cout << endl; + #endif return {pulses[0], n_lost_pulses}; } } diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp index e1b76ab..32cae0e 100644 --- a/jf-assembler/src/main.cpp +++ b/jf-assembler/src/main.cpp @@ -16,7 +16,11 @@ int main (int argc, char *argv[]) { if (argc != 2) { cout << endl; - cout << "Usage: jf_assembler [detector_json_filename]" << endl; + #ifndef USE_EIGER + cout << "Usage: jf_assembler [detector_json_filename]" << endl; + #else + cout << "Usage: eiger_assembler [detector_json_filename]" << endl; + #endif cout << "\tdetector_json_filename: detector config file path." << endl; cout << endl; diff --git a/jf-udp-recv/src/FrameUdpReceiver.cpp b/jf-udp-recv/src/FrameUdpReceiver.cpp index 81efbe6..503924d 100644 --- a/jf-udp-recv/src/FrameUdpReceiver.cpp +++ b/jf-udp-recv/src/FrameUdpReceiver.cpp @@ -52,8 +52,6 @@ inline void FrameUdpReceiver::init_frame( cout << " [" << std::chrono::system_clock::now(); cout << "] [FrameUdpReceiver::init_frame] :"; cout << " Frame number: " << frame_metadata.frame_index << endl; - // cout << " packet_buffer_ " << packet_buffer_[i_packet] << endl; - cout << "i_packet" << i_packet << endl; cout << endl; #endif } @@ -98,7 +96,7 @@ inline uint64_t FrameUdpReceiver::process_packets( // Continue on this packet. packet_buffer_offset_ = i_packet; - return metadata.pulse_id; + return metadata.frame_index; } copy_packet_to_buffers(metadata, frame_buffer, i_packet); @@ -114,7 +112,8 @@ inline uint64_t FrameUdpReceiver::process_packets( cout << " Frame " << metadata.frame_index << " || "; cout << packet_buffer_[i_packet].packetnum << " packets received."; cout << " packet_buffer_n_packets_ " << packet_buffer_n_packets_; - cout << " i_packet "<< i_packet; + cout << " I_PACKET "<< i_packet; + cout << " PULSE ID "<< metadata.pulse_id; cout << endl; #endif // Buffer is loaded only if this is not the last message. diff --git a/jf-udp-recv/src/main.cpp b/jf-udp-recv/src/main.cpp index 1038b54..eb36bc9 100644 --- a/jf-udp-recv/src/main.cpp +++ b/jf-udp-recv/src/main.cpp @@ -56,12 +56,10 @@ int main (int argc, char *argv[]) { bool bad_pulse_id = false; - if ( ( meta.frame_index != (frame_index_previous+1) ) || - ( (pulse_id-pulse_id_previous) < 0 ) || - ( (pulse_id-pulse_id_previous) > 1000 ) ) { + if ( meta.frame_index != (frame_index_previous+1) && frame_index_previous != 0) { bad_pulse_id = true; - + } else { buffer.write_frame(meta, data); diff --git a/sf-stream/CMakeLists.txt b/sf-stream/CMakeLists.txt index 9957767..e75bc4a 100644 --- a/sf-stream/CMakeLists.txt +++ b/sf-stream/CMakeLists.txt @@ -8,7 +8,14 @@ target_link_libraries(sf-stream-lib core-buffer-lib) add_executable(sf-stream src/main.cpp) -set_target_properties(sf-stream PROPERTIES OUTPUT_NAME sf_stream) + +if (USE_EIGER) + set (LIB_NAME_STREAMER "eiger_stream") +else() + set (LIB_NAME_STREAMER "sf_stream") +endif() + +set_target_properties(sf-stream PROPERTIES OUTPUT_NAME ${LIB_NAME_STREAMER}) target_link_libraries(sf-stream external core-buffer-lib diff --git a/sf-stream/include/StreamStats.hpp b/sf-stream/include/StreamStats.hpp index bca5ce0..f4c6a2c 100644 --- a/sf-stream/include/StreamStats.hpp +++ b/sf-stream/include/StreamStats.hpp @@ -1,6 +1,7 @@ #ifndef SF_DAQ_BUFFER_STREAMSTATS_HPP #define SF_DAQ_BUFFER_STREAMSTATS_HPP +#include "date.h" #include #include #include diff --git a/sf-stream/src/StreamStats.cpp b/sf-stream/src/StreamStats.cpp index 7408629..141c881 100644 --- a/sf-stream/src/StreamStats.cpp +++ b/sf-stream/src/StreamStats.cpp @@ -13,6 +13,15 @@ StreamStats::StreamStats( stream_name_(stream_name), stats_modulo_(stats_modulo) { + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [StreamStats::StreamStats] "; + cout << " detector_name: " << detector_name_; + cout << " || stream_name: " << stream_name; + cout << " || stats_modulo: " << stats_modulo; + cout << endl; + #endif reset_counters(); } diff --git a/sf-stream/src/ZmqLiveSender.cpp b/sf-stream/src/ZmqLiveSender.cpp index 724bf04..195bbde 100644 --- a/sf-stream/src/ZmqLiveSender.cpp +++ b/sf-stream/src/ZmqLiveSender.cpp @@ -2,6 +2,9 @@ #include "stream_config.hpp" #include "zmq.h" +#include "date.h" +#include +#include #include #include #include @@ -121,17 +124,19 @@ void ZmqLiveSender::send(const ImageMetadata& meta, const char *data) text_header.size(), ZMQ_SNDMORE); + auto size_to_send = buffer_config::MODULE_N_BYTES * config_.n_modules; if ( send_streamvis == 0 ) { - zmq_send(socket_streamvis_, - (char*)data, - buffer_config::MODULE_N_BYTES * config_.n_modules, - 0); - } else { - zmq_send(socket_streamvis_, - (char*)data_empty, - 8, - 0); + size_to_send = 8; } + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ZmqLiveSender::send] "; + cout << "send_streamvis : " << text_header.c_str(); + cout << endl; + #endif + + zmq_send(socket_streamvis_, (char*)data, size_to_send, 0); //same for live analysis int send_live_analysis = 0; @@ -156,24 +161,27 @@ void ZmqLiveSender::send(const ImageMetadata& meta, const char *data) text_header = buffer.GetString(); } - // TODO: Ugly. Fix this flow control. + size_to_send = buffer_config::MODULE_N_BYTES * config_.n_modules; + if ( send_live_analysis == 0 ) { + size_to_send = 8; + } + if (zmq_send(socket_live_, text_header.c_str(), text_header.size(), ZMQ_SNDMORE | ZMQ_NOBLOCK) != -1) { - - if ( send_live_analysis == 0 ) { - zmq_send(socket_live_, - (char*)data, - buffer_config::MODULE_N_BYTES * config_.n_modules, - ZMQ_NOBLOCK); - } else { - zmq_send(socket_live_, - (char*)data_empty, - 8, - ZMQ_NOBLOCK); - } + zmq_send(socket_live_, + (char*)data, + size_to_send, + ZMQ_NOBLOCK); } + #ifdef DEBUG_OUTPUT + using namespace date; + cout << " [" << std::chrono::system_clock::now(); + cout << "] [ZmqLiveSender::send] "; + cout << "send_live_analysis : " << text_header.c_str(); + cout << endl; + #endif } diff --git a/sf-stream/src/ZmqPulseSyncReceiver.cpp b/sf-stream/src/ZmqPulseSyncReceiver.cpp index 96221f3..f4b50a6 100644 --- a/sf-stream/src/ZmqPulseSyncReceiver.cpp +++ b/sf-stream/src/ZmqPulseSyncReceiver.cpp @@ -42,6 +42,12 @@ PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const { uint64_t pulses[n_modules_]; + #ifdef DEBUG_OUTPUT + cout << "[ZmqPulseSyncReceiver::get_next_pulse_id()]"; + cout << "n_modules_" << n_modules_; + cout << endl; + #endif + bool modules_in_sync = true; for (int i = 0; i < n_modules_; i++) { zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0); diff --git a/sf-stream/src/main.cpp b/sf-stream/src/main.cpp index dbe313a..1e1f65e 100644 --- a/sf-stream/src/main.cpp +++ b/sf-stream/src/main.cpp @@ -19,6 +19,7 @@ int main (int argc, char *argv[]) cout << "Usage: sf_stream [detector_json_filename]" " [stream_name]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; + cout << "\tstream_name: name of the stream." << endl; cout << endl; exit(-1); @@ -40,7 +41,8 @@ int main (int argc, char *argv[]) ImageMetadata meta; while (true) { zmq_recv(receiver, &meta, sizeof(meta), 0); - char* data = ram_buffer.read_image(meta.pulse_id); + char* data = ram_buffer.read_image(meta.frame_index); + sender.send(meta, data);