diff --git a/README.md b/README.md index 3d97181..6f650e1 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,49 @@ ln -s "$(pwd)""/""sf_stream" /usr/bin/sf_stream ln -s "$(pwd)""/""sf_writer" /usr/bin/sf_writer ``` +### Integration testing +Apart from unit-testing an integration pipeline can be started on your local +machine or dedicated server. + +You need to have RabbitMQ running locally in order to use it: +```bash +docker run -d --name sf-msg-broker -p 15672:15672 -p 5672:5672 rabbitmq:3-management +``` + +Go into the **docker/** folder and run: +```bash +docker-compose -f integration-compose.yml up +``` + +**Note**: you need to have docker-compose installed on your system. You can do this +by running: +```bash +yum install docker-compose +``` + +#### Manual start +To manually start the integration pipeline you will have to start the following +containers: + +UDP generators: +```bash +docker run -d --rm --net=host --name=udp-sim paulscherrerinstitute/std-daq-buffer ./std_udp_sim example_detector.json 16 +``` + +4 UDP receivers: +```bash +for i in {0..3}; do docker run --rm -d --net=host --ipc=host --shm-size=8G -v /tmp:/tmp --name=udp-recv-${i} paulscherrerinstitute/std-daq-buffer ./std_udp_recv example_detector.json ${i} 16; done +``` + +1 UDP synchronizer: +```bash +docker run -d --rm -v /tmp:/tmp --name=udp-sync paulscherrerinstitute/std-daq-buffer ./std_udp_sync example_detector.json +``` + +1 Image assembler: +```bash +docker run -d --rm --ipc=host --shm-size=8G -v /tmp:/tmp --name=image-assembler paulscherrerinstitute/std-daq-buffer ./eiger_assembler example_detector.json 16 +``` ### Warnings #### UDP recv tests failing @@ -112,6 +155,7 @@ problems is the rmem limit. Please increase your rmem_max to something large: ```bash echo 2147483646 > /proc/sys/net/core/rmem_max ``` +You need to do this on your host when running the integration pipeline. #### Zeromq diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index 8b3f298..fea7407 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -40,7 +40,7 @@ RamBuffer::RamBuffer(const string& buffer_name, shm_fd_ = shm_open(buffer_name_.c_str(), O_RDWR | O_CREAT, 0777); if (shm_fd_ < 0) { - throw runtime_error(strerror(errno)); + throw runtime_error(string("shm_open failed: ") + strerror(errno)); } if ((ftruncate(shm_fd_, buffer_bytes_)) == -1) { diff --git a/docker/example_detector.json b/docker/example_detector.json index 02b0012..6886cd5 100644 --- a/docker/example_detector.json +++ b/docker/example_detector.json @@ -2,6 +2,6 @@ "detector_name": "cSAXS.EG01V01", "detector_type": "eiger", "n_modules": 4, - "image_n_pixels": 123456, + "image_n_pixels": 264196, "start_udp_port": 50000 } \ No newline at end of file diff --git a/docker/integration-compose.yml b/docker/integration-compose.yml new file mode 100644 index 0000000..45d9b91 --- /dev/null +++ b/docker/integration-compose.yml @@ -0,0 +1,66 @@ +version: "3.6" +services: + udp-sim: + image: "paulscherrerinstitute/std-daq-buffer:1.0.0" + network_mode: "host" + container_name: "udp-sim" + command: ./std_udp_sim example_detector.json 16 95 + + udp-recv-0: + image: "paulscherrerinstitute/std-daq-buffer:1.0.0" + network_mode: "host" + ipc: "host" + shm_size: 2G + volumes: + - /tmp:/tmp + container_name: "udp-recv-0" + command: ./std_udp_recv example_detector.json 0 16 + + udp-recv-1: + image: "paulscherrerinstitute/std-daq-buffer:1.0.0" + network_mode: "host" + ipc: "host" + shm_size: 2G + volumes: + - /tmp:/tmp + container_name: "udp-recv-1" + command: ./std_udp_recv example_detector.json 1 16 + + udp-recv-2: + image: "paulscherrerinstitute/std-daq-buffer:1.0.0" + network_mode: "host" + ipc: "host" + shm_size: 2G + volumes: + - /tmp:/tmp + container_name: "udp-recv-2" + command: ./std_udp_recv example_detector.json 2 16 + + udp-recv-3: + image: "paulscherrerinstitute/std-daq-buffer:1.0.0" + network_mode: "host" + ipc: "host" + shm_size: 2G + volumes: + - /tmp:/tmp + container_name: "udp-recv-3" + command: ./std_udp_recv example_detector.json 3 16 + + udp-sync: + image: "paulscherrerinstitute/std-daq-buffer:1.0.0" + restart: always + network_mode: "host" + volumes: + - /tmp:/tmp + container_name: "udp-sync" + command: ./std_udp_sync example_detector.json + + image-assembler: + image: "paulscherrerinstitute/std-daq-buffer:1.0.0" + network_mode: "host" + ipc: "host" + shm_size: 4G + volumes: + - /tmp:/tmp + container_name: "image-assembler" + command: ./eiger_assembler example_detector.json 16 diff --git a/jf-assembler/include/AssemblerStats.hpp b/jf-assembler/include/AssemblerStats.hpp index 170521a..6dc13fd 100644 --- a/jf-assembler/include/AssemblerStats.hpp +++ b/jf-assembler/include/AssemblerStats.hpp @@ -7,7 +7,7 @@ class AssemblerStats { const std::string detector_name_; - const size_t stats_modulo_; + const size_t stats_time_; int image_counter_; int n_corrupted_images_; @@ -19,7 +19,7 @@ class AssemblerStats { public: AssemblerStats(const std::string &detector_name, - const size_t stats_modulo); + const size_t stats_time); void record_stats(const ImageMetadata *meta, const uint32_t n_lost_pulses); }; diff --git a/jf-assembler/src/AssemblerStats.cpp b/jf-assembler/src/AssemblerStats.cpp index 15e821a..673fe4f 100644 --- a/jf-assembler/src/AssemblerStats.cpp +++ b/jf-assembler/src/AssemblerStats.cpp @@ -7,9 +7,9 @@ using namespace chrono; AssemblerStats::AssemblerStats( const std::string &detector_name, - const size_t stats_modulo) : + const size_t stats_time) : detector_name_(detector_name), - stats_modulo_(stats_modulo) + stats_time_(stats_time) { reset_counters(); } @@ -32,7 +32,10 @@ void AssemblerStats::record_stats( n_corrupted_images_++; } - if (image_counter_ == stats_modulo_) { + const auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { print_stats(); reset_counters(); } diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp index 2d646f5..6efe04b 100644 --- a/jf-assembler/src/main.cpp +++ b/jf-assembler/src/main.cpp @@ -86,7 +86,7 @@ int main (int argc, char *argv[]) sizeof(ImageMetadata), IMAGE_N_BYTES, 1, buffer_config::RAM_BUFFER_N_SLOTS); - AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO); + AssemblerStats stats(config.detector_name, STATS_TIME); uint64_t image_id = 0; diff --git a/std-udp-recv/src/FrameStats.cpp b/std-udp-recv/src/FrameStats.cpp index 22948d1..5e09867 100644 --- a/std-udp-recv/src/FrameStats.cpp +++ b/std-udp-recv/src/FrameStats.cpp @@ -66,7 +66,7 @@ void FrameStats::print_stats() system_clock::now()).time_since_epoch().count(); // Output in InfluxDB line protocol - cout << "std_udp_recv,"; + cout << "std_udp_recv"; cout << ",detector_name=" << detector_name_; cout << ",module_id=" << module_id_; cout << " "; diff --git a/std-udp-recv/test/simulator.cpp b/std-udp-recv/test/simulator.cpp index 078047b..5b3de3c 100644 --- a/std-udp-recv/test/simulator.cpp +++ b/std-udp-recv/test/simulator.cpp @@ -17,18 +17,21 @@ using namespace std; int main(int argc, char **argv) { - if (argc != 3) { + if (argc != 4) { cout << endl; - cout << "Usage: std_udp_sim [detector_json_filename] [bit_depth]"; + cout << "Usage: std_udp_sim [detector_json_filename] [bit_depth] " + "[ms_delay]"; cout << endl; cout << "\tdetector_json_filename: detector config file path." << endl; cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; + cout << "\tms_delay: delay in milliseconds between images." << endl; cout << endl; exit(-1); } const auto config = UdpRecvConfig::from_json_file(string(argv[1])); const int bit_depth = atoi(argv[2]); + const int ms_delay = atoi(argv[3]); if (DETECTOR_TYPE != config.detector_type) { throw runtime_error("UDP recv version for " + DETECTOR_TYPE + @@ -63,6 +66,9 @@ int main(int argc, char **argv) { #ifdef USE_EIGER send_udp_buffer.framenum = image_id; send_udp_buffer.bunchid = image_id + 100; + + send_udp_buffer.row = i_module / 2; + send_udp_buffer.column = i_module %2; #else send_udp_buffer.framenum = image_id+100; send_udp_buffer.bunchid = image_id; @@ -80,7 +86,7 @@ int main(int argc, char **argv) { cout << "Sent image_id " << image_id << endl; // 10Hz == 100ms between images - usleep(100 * 1000); + usleep(ms_delay * 1000); image_id = ++image_id % MAX_IMAGE_ID; } diff --git a/std-udp-sync/include/SyncStats.hpp b/std-udp-sync/include/SyncStats.hpp index 18b9d1d..f315bea 100644 --- a/std-udp-sync/include/SyncStats.hpp +++ b/std-udp-sync/include/SyncStats.hpp @@ -7,7 +7,7 @@ class SyncStats { const std::string detector_name_; - const size_t stats_modulo_; + const size_t stats_time_; int image_counter_; int n_sync_lost_images_; @@ -18,7 +18,7 @@ class SyncStats { public: SyncStats(const std::string &detector_name, - const size_t stats_modulo); + const size_t stats_time); void record_stats(const uint32_t n_lost_pulses); }; diff --git a/std-udp-sync/include/sync_config.hpp b/std-udp-sync/include/sync_config.hpp index 253b4e2..a66e08e 100644 --- a/std-udp-sync/include/sync_config.hpp +++ b/std-udp-sync/include/sync_config.hpp @@ -2,7 +2,4 @@ namespace sync_config { // Number of times we try to re-sync in case of failure. const int SYNC_RETRY_LIMIT = 3; - - // Number of pulses between each statistics print out. - const size_t SYNC_STATS_MODULO = 1000; } diff --git a/std-udp-sync/src/SyncStats.cpp b/std-udp-sync/src/SyncStats.cpp index e9bb76d..8b86af3 100644 --- a/std-udp-sync/src/SyncStats.cpp +++ b/std-udp-sync/src/SyncStats.cpp @@ -7,9 +7,9 @@ using namespace chrono; SyncStats::SyncStats( const std::string &detector_name, - const size_t stats_modulo) : + const size_t stats_time) : detector_name_(detector_name), - stats_modulo_(stats_modulo) + stats_time_(stats_time) { reset_counters(); } @@ -26,7 +26,10 @@ void SyncStats::record_stats(const uint32_t n_lost_pulses) image_counter_++; n_sync_lost_images_ += n_lost_pulses; - if (image_counter_ == stats_modulo_) { + const auto time_passed = duration_cast( + steady_clock::now()-stats_interval_start_).count(); + + if (time_passed >= stats_time_*1000) { print_stats(); reset_counters(); } diff --git a/std-udp-sync/src/main.cpp b/std-udp-sync/src/main.cpp index b2d5150..0a7396d 100644 --- a/std-udp-sync/src/main.cpp +++ b/std-udp-sync/src/main.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include @@ -17,39 +16,26 @@ using namespace std; using namespace sync_config; using namespace buffer_config; -#ifdef USE_EIGER - #include "eiger.hpp" -#else - #include "jungfrau.hpp" -#endif - int main (int argc, char *argv[]) { - if (argc != 3) { + if (argc != 2) { cout << endl; - cout << "Usage: std_udp_sync [detector_json_filename] [bit_depth]" << endl; + cout << "Usage: std_udp_sync [detector_json_filename]" << endl; cout << "\tdetector_json_filename: detector config file path." << endl; - cout << "\tbit_depth: bit depth of the incoming udp packets." << endl; cout << endl; exit(-1); } const auto config = UdpSyncConfig::from_json_file(string(argv[1])); - const int bit_depth = atoi(argv[2]); - - const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8; auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, 1); auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync"); - RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame), - FRAME_N_BYTES, config.n_modules, RAM_BUFFER_N_SLOTS); - ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); - SyncStats stats(config.detector_name, SYNC_STATS_MODULO); + SyncStats stats(config.detector_name, STATS_TIME); while (true) { auto meta = receiver.get_next_pulse_id();