merged conflicts. assembler with external sync.

This commit is contained in:
lhdamiani
2021-07-06 14:37:41 +02:00
13 changed files with 141 additions and 139 deletions
+9
View File
@@ -104,6 +104,15 @@ ln -s "$(pwd)""/""sf_writer" /usr/bin/sf_writer
### Warnings
#### UDP recv tests failing
If the unit tests for std-udp-recv fail, the most common cause is the rmem
limit. Increase your rmem_max to a large value:
```bash
echo 2147483646 > /proc/sys/net/core/rmem_max
```
#### Zeromq
Zeromq version 4.1.4 (default on RH7) has a LINGER bug. Sometimes, the last
+2
View File
@@ -1,6 +1,8 @@
#ifndef SF_DAQ_BUFFER_FORMATS_HPP
#define SF_DAQ_BUFFER_FORMATS_HPP
#define INVALID_FRAME_INDEX UINT64_C(-1)
#pragma pack(push)
#pragma pack(1)
struct ModuleFrame {
+1 -1
View File
@@ -89,7 +89,7 @@ int main (int argc, char *argv[])
AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO);
uint64_t image_id = 0;
while (true) {
+4 -4
View File
@@ -17,16 +17,16 @@ struct StreamSendConfig {
return {
config_parameters["detector_name"].GetString(),
config_parameters["detector_type"].GetString(),
config_parameters["n_modules"].GetInt(),
config_parameters["start_udp_port"].GetInt(),
config_parameters["image_n_pixels"].GetInt(),
config_parameters["stream_address"].GetString()
};
}
const std::string detector_name;
const std::string detector_type;
const int n_modules;
const int start_udp_port;
const int image_n_pixels;
const std::string stream_address;
};
+20 -2
View File
@@ -3,14 +3,30 @@
#include "stream_config.hpp"
#include <chrono>
#include <thread>
#include <StreamSendConfig.hpp>
#include "RamBuffer.hpp"
using namespace std;
using namespace stream_config;
using namespace buffer_config;
int main (int argc, char *argv[])
{
if (argc != 3) {
cout << endl;
cout << "Usage: std_stream_send [detector_json_filename]"
" [bit_depth]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << endl;
exit(-1);
}
const auto config = StreamSendConfig::from_json_file(string(argv[1]));
const int bit_depth = atoi(argv[2]);
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
@@ -27,12 +43,14 @@ int main (int argc, char *argv[])
throw runtime_error(zmq_strerror(errno));
}
if (zmq_bind(sender, "tcp://127.0.0.1:10000") != 0) {
if (zmq_bind(sender, config.stream_address.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const size_t IMAGE_N_BYTES = config.image_n_pixels * bit_depth / 8;
RamBuffer image_buffer(config.detector_name + "_assembler",
sizeof(ImageMetadata), assembler.get_image_n_bytes(), 1);
sizeof(ImageMetadata), IMAGE_N_BYTES,
config.n_modules, RAM_BUFFER_N_SLOTS);
while (true) {
+13 -9
View File
@@ -56,15 +56,17 @@ inline void FrameUdpReceiver::init_frame(
frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug;
frame_metadata.pos_y = (int16_t) packet_buffer_[i_packet].row;
frame_metadata.pos_x = (int16_t) packet_buffer_[i_packet].column;
frame_metadata.n_recv_packets = 0;
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [FrameUdpReceiver::init_frame] :";
cout << " || pos_y: " << frame_metadata.pos_x;
cout << " || pos_y: " << frame_metadata.pos_y;
cout << " || pos_x: " << frame_metadata.pos_x;
cout << " || pulse_id: " << frame_metadata.pulse_id;
cout << " || frame_index: " << frame_metadata.frame_index;
cout << " || daq_rec: " << frame_metadata.daq_rec;
cout << endl;
#endif
@@ -95,7 +97,7 @@ inline uint64_t FrameUdpReceiver::process_packets(
i_packet++) {
// First packet for this frame.
if (metadata.frame_index == 0) {
if (metadata.frame_index == INVALID_FRAME_INDEX) {
init_frame(metadata, i_packet);
// Happens if the last packet from the previous frame gets lost.
@@ -118,10 +120,10 @@ inline uint64_t FrameUdpReceiver::process_packets(
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [frameudpreceiver::process_packets] :";
cout << "] [FrameUdpReceiver::process_packets] :";
cout << " frame " << metadata.frame_index << " || ";
cout << packet_buffer_[i_packet].packetnum << " packets received.";
cout << " id "<< metadata.id;
cout << packet_buffer_[i_packet].packetnum+1;
cout << " packets received.";
cout << endl;
#endif
// buffer is loaded only if this is not the last message.
@@ -136,25 +138,27 @@ inline uint64_t FrameUdpReceiver::process_packets(
packet_buffer_offset_ = 0;
}
return metadata.pulse_id;
return metadata.frame_index;
}
}
// We emptied the buffer.
packet_buffer_loaded_ = false;
packet_buffer_offset_ = 0;
return 0;
return INVALID_FRAME_INDEX;
}
uint64_t FrameUdpReceiver::get_frame_from_udp(
ModuleFrame& meta, char* frame_buffer)
{
meta.frame_index = INVALID_FRAME_INDEX;
// Happens when last packet from previous frame was missed.
if (packet_buffer_loaded_) {
auto frame_index = process_packets(
packet_buffer_offset_, meta, frame_buffer);
if (frame_index != 0) {
if (frame_index != INVALID_FRAME_INDEX) {
return frame_index;
}
}
@@ -170,7 +174,7 @@ uint64_t FrameUdpReceiver::get_frame_from_udp(
auto frame_index = process_packets(0, meta, frame_buffer);
if (frame_index != 0) {
if (frame_index != INVALID_FRAME_INDEX) {
return frame_index;
}
}
+4 -4
View File
@@ -20,10 +20,10 @@ int main (int argc, char *argv[]) {
if (argc != 4) {
cout << endl;
cout << "Usage: std_udp_recv [udp_recv_config_filename] [module_id] "
cout << "Usage: std_udp_recv [detector_json_filename] [module_id] "
"[bit_depth]";
cout << endl;
cout << "\tudp_recv_config_filename: detector config file path." << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tmodule_id: id of the module for this process." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << endl;
@@ -45,8 +45,7 @@ int main (int argc, char *argv[]) {
FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME);
RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame),
FRAME_N_BYTES, config.n_modules,
buffer_config::RAM_BUFFER_N_SLOTS);
FRAME_N_BYTES, config.n_modules, RAM_BUFFER_N_SLOTS);
FrameStats stats(config.detector_name, module_id,
N_PACKETS_PER_FRAME, STATS_TIME);
@@ -63,6 +62,7 @@ int main (int argc, char *argv[]) {
// Reset the metadata and frame buffer for the next frame.
meta.frame_index = 0;
meta.n_recv_packets = 0;
// Reset the data buffer.
memset(data, 0, FRAME_N_BYTES);
receiver.get_frame_from_udp(meta, data);
+3 -3
View File
@@ -1,8 +1,8 @@
add_executable(jf-udp-recv-tests main.cpp)
add_executable(std-udp-recv-tests main.cpp)
target_link_libraries(jf-udp-recv-tests
target_link_libraries(std-udp-recv-tests
core-buffer-lib
jf-udp-recv-lib
std-udp-recv-lib
gtest
)
+48 -45
View File
@@ -1,5 +1,4 @@
#include <netinet/in.h>
#include <jungfrau.hpp>
#include "gtest/gtest.h"
#include "FrameUdpReceiver.hpp"
#include "mock/udp.hpp"
@@ -10,10 +9,12 @@
using namespace std;
const int DATA_BYTES_PER_FRAME = 512*1024*2;
TEST(BufferUdpReceiver, simple_recv)
{
auto n_packets = N_PACKETS_PER_FRAME;
int source_id = 1234;
auto n_packets = 128;
int module_id = 0;
int n_frames = 5;
uint16_t udp_port = MOCK_UDP_PORT;
@@ -21,7 +22,7 @@ TEST(BufferUdpReceiver, simple_recv)
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
FrameUdpReceiver udp_receiver(udp_port, n_packets);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
@@ -45,19 +46,20 @@ TEST(BufferUdpReceiver, simple_recv)
handle.wait();
ModuleFrame metadata;
ModuleFrame meta;
meta.module_id = module_id;
meta.bit_depth = 16;
auto frame_buffer = make_unique<char[]>(DATA_BYTES_PER_FRAME);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
udp_receiver.get_frame_from_udp(meta, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
ASSERT_EQ(i_frame + 1, meta.pulse_id);
ASSERT_EQ(meta.frame_index, i_frame + 1000);
ASSERT_EQ(meta.daq_rec, i_frame + 10000);
// No packet was skipped in this test, so all packets must be received.
ASSERT_EQ(metadata.n_recv_packets, n_packets);
ASSERT_EQ(metadata.module_id, source_id);
ASSERT_EQ(meta.n_recv_packets, n_packets);
ASSERT_EQ(meta.module_id, module_id);
}
::close(send_socket_fd);
@@ -65,8 +67,8 @@ TEST(BufferUdpReceiver, simple_recv)
TEST(BufferUdpReceiver, missing_middle_packet)
{
auto n_packets = N_PACKETS_PER_FRAME;
int source_id = 1234;
auto n_packets = 128;
int module_id = 1234;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
@@ -74,7 +76,7 @@ TEST(BufferUdpReceiver, missing_middle_packet)
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
FrameUdpReceiver udp_receiver(udp_port, n_packets);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
@@ -103,19 +105,20 @@ TEST(BufferUdpReceiver, missing_middle_packet)
handle.wait();
ModuleFrame metadata;
ModuleFrame meta;
meta.module_id = module_id;
auto frame_buffer = make_unique<char[]>(DATA_BYTES_PER_FRAME);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
udp_receiver.get_frame_from_udp(meta, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
ASSERT_EQ(i_frame + 1, meta.pulse_id);
ASSERT_EQ(meta.frame_index, i_frame + 1000);
ASSERT_EQ(meta.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
ASSERT_EQ(metadata.module_id, source_id);
ASSERT_EQ(meta.n_recv_packets, n_packets - 1);
ASSERT_EQ(meta.module_id, module_id);
}
::close(send_socket_fd);
@@ -123,8 +126,8 @@ TEST(BufferUdpReceiver, missing_middle_packet)
TEST(BufferUdpReceiver, missing_first_packet)
{
auto n_packets = N_PACKETS_PER_FRAME;
int source_id = 1234;
auto n_packets = 128;
int module_id = 1234;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
@@ -132,7 +135,7 @@ TEST(BufferUdpReceiver, missing_first_packet)
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
FrameUdpReceiver udp_receiver(udp_port, n_packets);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
@@ -161,19 +164,19 @@ TEST(BufferUdpReceiver, missing_first_packet)
handle.wait();
ModuleFrame metadata;
ModuleFrame meta;
meta.module_id = module_id;
auto frame_buffer = make_unique<char[]>(DATA_BYTES_PER_FRAME);
for (int i_frame=0; i_frame < n_frames; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
udp_receiver.get_frame_from_udp(meta, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
ASSERT_EQ(i_frame + 1, meta.pulse_id);
ASSERT_EQ(meta.frame_index, i_frame + 1000);
ASSERT_EQ(meta.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
ASSERT_EQ(metadata.module_id, source_id);
ASSERT_EQ(meta.n_recv_packets, n_packets - 1);
ASSERT_EQ(meta.module_id, module_id);
}
::close(send_socket_fd);
@@ -181,8 +184,8 @@ TEST(BufferUdpReceiver, missing_first_packet)
TEST(BufferUdpReceiver, missing_last_packet)
{
auto n_packets = N_PACKETS_PER_FRAME;
int source_id = 1234;
auto n_packets = 128;
int module_id = 1234;
int n_frames = 3;
uint16_t udp_port = MOCK_UDP_PORT;
@@ -190,7 +193,7 @@ TEST(BufferUdpReceiver, missing_last_packet)
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
ASSERT_TRUE(send_socket_fd >= 0);
FrameUdpReceiver udp_receiver(udp_port, source_id);
FrameUdpReceiver udp_receiver(udp_port, n_packets);
auto handle = async(launch::async, [&](){
for (int i_frame=0; i_frame < n_frames; i_frame++){
@@ -219,20 +222,20 @@ TEST(BufferUdpReceiver, missing_last_packet)
handle.wait();
ModuleFrame metadata;
ModuleFrame meta;
meta.module_id = module_id;
auto frame_buffer = make_unique<char[]>(DATA_BYTES_PER_FRAME);
// n_frames -1 because the last frame is not complete.
for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {
auto pulse_id = udp_receiver.get_frame_from_udp(
metadata, frame_buffer.get());
udp_receiver.get_frame_from_udp(meta, frame_buffer.get());
ASSERT_EQ(i_frame + 1, pulse_id);
ASSERT_EQ(metadata.frame_index, i_frame + 1000);
ASSERT_EQ(metadata.daq_rec, i_frame + 10000);
ASSERT_EQ(i_frame + 1, meta.pulse_id);
ASSERT_EQ(meta.frame_index, i_frame + 1000);
ASSERT_EQ(meta.daq_rec, i_frame + 10000);
// -1 because we skipped a packet.
ASSERT_EQ(metadata.n_recv_packets, n_packets - 1);
ASSERT_EQ(metadata.module_id, source_id);
ASSERT_EQ(meta.n_recv_packets, n_packets - 1);
ASSERT_EQ(meta.module_id, module_id);
}
::close(send_socket_fd);
+8 -1
View File
@@ -1,5 +1,4 @@
#include <netinet/in.h>
#include <jungfrau.hpp>
#include "gtest/gtest.h"
#include "mock/udp.hpp"
#include "PacketUdpReceiver.hpp"
@@ -7,8 +6,16 @@
#include <thread>
#include <chrono>
#ifdef USE_EIGER
#include "eiger.hpp"
#else
#include "jungfrau.hpp"
#endif
using namespace std;
const int N_PACKETS_PER_FRAME = 128;
TEST(PacketUdpReceiver, simple_recv)
{
uint16_t udp_port = MOCK_UDP_PORT;
-3
View File
@@ -1,8 +1,5 @@
namespace sync_config
{
// If the modules are offset more than 1000 pulses, crush.
const uint64_t PULSE_OFFSET_LIMIT = 100;
// Number of times we try to re-sync in case of failure.
const int SYNC_RETRY_LIMIT = 3;
+26 -65
View File
@@ -39,88 +39,49 @@ ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver()
PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
{
uint64_t pulses[n_modules_];
bool modules_in_sync = true;
for (int i = 0; i < n_modules_; i++) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
if (pulses[0] != pulses[i]) {
modules_in_sync = false;
}
}
if (modules_in_sync) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id] ";
cout << "] (modules_in_sync) Frame index:" << pulses[0];
cout << endl;
#endif
return {pulses[0], 0};
}
// How many pulses we lost in total to get the next pulse_id.
uint32_t n_lost_pulses = 0;
for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {
uint64_t min_pulse_id = numeric_limits<uint64_t>::max();;
uint64_t max_pulse_id = 0;
uint64_t ids[n_modules_];
for (uint32_t i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {
bool modules_in_sync = true;
for (int i = 0; i < n_modules_; i++) {
min_pulse_id = min(min_pulse_id, pulses[i]);
max_pulse_id = max(max_pulse_id, pulses[i]);
}
auto max_diff = max_pulse_id - min_pulse_id;
if (max_diff > PULSE_OFFSET_LIMIT) {
stringstream err_msg;
err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
err_msg << " PULSE_OFFSET_LIMIT exceeded.";
err_msg << " max_diff=" << max_diff << " pulses.";
zmq_recv(sockets_[i], &ids[i], sizeof(uint64_t), 0);
for (int i = 0; i < n_modules_; i++) {
err_msg << " (module " << i << ", ";
err_msg << pulses[i] << "),";
}
err_msg << endl;
throw runtime_error(err_msg.str());
}
modules_in_sync = true;
// Max pulses we lost in this sync attempt.
uint32_t i_sync_lost_pulses = 0;
for (int i = 0; i < n_modules_; i++) {
// How many pulses we lost for this specific module.
uint32_t i_module_lost_pulses = 0;
while (pulses[i] < max_pulse_id) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
i_module_lost_pulses++;
}
i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses);
if (pulses[i] != max_pulse_id) {
if (ids[0] != ids[i]) {
modules_in_sync = false;
}
}
n_lost_pulses += i_sync_lost_pulses;
if (modules_in_sync) {
#ifdef DEBUG_OUTPUT
using namespace date;
cout << " [" << std::chrono::system_clock::now();
cout << "] [ZmqPulseSyncReceiver::get_next_pulse_id]";
cout << " modules_in_sync false";
cout << "[" << std::chrono::system_clock::now() << "]";
cout << " [ZmqPulseSyncReceiver::get_next_pulse_id]";
cout << " Modules in sync (";
cout << " pulse_id " << ids[0] <<").";
cout << endl;
#endif
return {pulses[0], n_lost_pulses};
return {ids[0], i_sync};
}
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << " [ZmqPulseSyncReceiver::get_next_pulse_id]";
cout << " Modules out of sync:" << endl;
for (int i=0; i < n_modules_; i++) {
cout << " module" << i << ":" << ids[i];
}
cout << endl;
#endif
}
stringstream err_msg;
err_msg << "[ZmqLiveReceiver::get_next_pulse_id]";
err_msg << " SYNC_RETRY_LIMIT exceeded.";
err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
err_msg << " SYNC_RETRY_LIMIT exceeded. State:";
for (int i=0; i < n_modules_; i++) {
err_msg << " module" << i << ":" << ids[i];
}
err_msg << endl;
throw runtime_error(err_msg.str());
+3 -2
View File
@@ -10,10 +10,12 @@
#include "sync_config.hpp"
#include "ZmqPulseSyncReceiver.hpp"
#include "UdpSyncConfig.hpp"
#include "buffer_config.hpp"
using namespace std;
using namespace sync_config;
using namespace buffer_config;
#ifdef USE_EIGER
#include "eiger.hpp"
@@ -44,8 +46,7 @@ int main (int argc, char *argv[])
auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync");
RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame),
FRAME_N_BYTES, config.n_modules,
buffer_config::RAM_BUFFER_N_SLOTS);
FRAME_N_BYTES, config.n_modules, RAM_BUFFER_N_SLOTS);
ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules);
SyncStats stats(config.detector_name, SYNC_STATS_MODULO);