This commit is contained in:
lhdamiani
2021-07-20 17:22:49 +02:00
13 changed files with 142 additions and 37 deletions
+44
View File
@@ -102,6 +102,49 @@ ln -s "$(pwd)""/""sf_stream" /usr/bin/sf_stream
ln -s "$(pwd)""/""sf_writer" /usr/bin/sf_writer
```
### Integration testing
Apart from unit-testing an integration pipeline can be started on your local
machine or dedicated server.
You need to have RabbitMQ running locally in order to use it:
```bash
docker run -d --name sf-msg-broker -p 15672:15672 -p 5672:5672 rabbitmq:3-management
```
Go into the **docker/** folder and run:
```bash
docker-compose -f integration-compose.yml up
```
**Note**: you need to have docker-compose installed on your system. You can do this
by running:
```bash
yum install docker-compose
```
#### Manual start
To manually start the integration pipeline you will have to start the following
containers:
UDP generators:
```bash
docker run -d --rm --net=host --name=udp-sim paulscherrerinstitute/std-daq-buffer ./std_udp_sim example_detector.json 16
```
4 UDP receivers:
```bash
for i in {0..3}; do docker run --rm -d --net=host --ipc=host --shm-size=8G -v /tmp:/tmp --name=udp-recv-${i} paulscherrerinstitute/std-daq-buffer ./std_udp_recv example_detector.json ${i} 16; done
```
1 UDP synchronizer:
```bash
docker run -d --rm -v /tmp:/tmp --name=udp-sync paulscherrerinstitute/std-daq-buffer ./std_udp_sync example_detector.json
```
1 Image assembler:
```bash
docker run -d --rm --ipc=host --shm-size=8G -v /tmp:/tmp --name=image-assembler paulscherrerinstitute/std-daq-buffer ./eiger_assembler example_detector.json 16
```
### Warnings
#### UDP recv tests failing
@@ -112,6 +155,7 @@ problems is the rmem limit. Please increase your rmem_max to something large:
```bash
echo 2147483646 > /proc/sys/net/core/rmem_max
```
You need to do this on your host when running the integration pipeline.
#### Zeromq
+1 -1
View File
@@ -40,7 +40,7 @@ RamBuffer::RamBuffer(const string& buffer_name,
shm_fd_ = shm_open(buffer_name_.c_str(), O_RDWR | O_CREAT, 0777);
if (shm_fd_ < 0) {
throw runtime_error(strerror(errno));
throw runtime_error(string("shm_open failed: ") + strerror(errno));
}
if ((ftruncate(shm_fd_, buffer_bytes_)) == -1) {
+1 -1
View File
@@ -2,6 +2,6 @@
"detector_name": "cSAXS.EG01V01",
"detector_type": "eiger",
"n_modules": 4,
"image_n_pixels": 123456,
"image_n_pixels": 264196,
"start_udp_port": 50000
}
+66
View File
@@ -0,0 +1,66 @@
version: "3.6"
services:
udp-sim:
image: "paulscherrerinstitute/std-daq-buffer:1.0.0"
network_mode: "host"
container_name: "udp-sim"
command: ./std_udp_sim example_detector.json 16 95
udp-recv-0:
image: "paulscherrerinstitute/std-daq-buffer:1.0.0"
network_mode: "host"
ipc: "host"
shm_size: 2G
volumes:
- /tmp:/tmp
container_name: "udp-recv-0"
command: ./std_udp_recv example_detector.json 0 16
udp-recv-1:
image: "paulscherrerinstitute/std-daq-buffer:1.0.0"
network_mode: "host"
ipc: "host"
shm_size: 2G
volumes:
- /tmp:/tmp
container_name: "udp-recv-1"
command: ./std_udp_recv example_detector.json 1 16
udp-recv-2:
image: "paulscherrerinstitute/std-daq-buffer:1.0.0"
network_mode: "host"
ipc: "host"
shm_size: 2G
volumes:
- /tmp:/tmp
container_name: "udp-recv-2"
command: ./std_udp_recv example_detector.json 2 16
udp-recv-3:
image: "paulscherrerinstitute/std-daq-buffer:1.0.0"
network_mode: "host"
ipc: "host"
shm_size: 2G
volumes:
- /tmp:/tmp
container_name: "udp-recv-3"
command: ./std_udp_recv example_detector.json 3 16
udp-sync:
image: "paulscherrerinstitute/std-daq-buffer:1.0.0"
restart: always
network_mode: "host"
volumes:
- /tmp:/tmp
container_name: "udp-sync"
command: ./std_udp_sync example_detector.json
image-assembler:
image: "paulscherrerinstitute/std-daq-buffer:1.0.0"
network_mode: "host"
ipc: "host"
shm_size: 4G
volumes:
- /tmp:/tmp
container_name: "image-assembler"
command: ./eiger_assembler example_detector.json 16
+2 -2
View File
@@ -7,7 +7,7 @@
class AssemblerStats {
const std::string detector_name_;
const size_t stats_modulo_;
const size_t stats_time_;
int image_counter_;
int n_corrupted_images_;
@@ -19,7 +19,7 @@ class AssemblerStats {
public:
AssemblerStats(const std::string &detector_name,
const size_t stats_modulo);
const size_t stats_time);
void record_stats(const ImageMetadata *meta, const uint32_t n_lost_pulses);
};
+6 -3
View File
@@ -7,9 +7,9 @@ using namespace chrono;
AssemblerStats::AssemblerStats(
const std::string &detector_name,
const size_t stats_modulo) :
const size_t stats_time) :
detector_name_(detector_name),
stats_modulo_(stats_modulo)
stats_time_(stats_time)
{
reset_counters();
}
@@ -32,7 +32,10 @@ void AssemblerStats::record_stats(
n_corrupted_images_++;
}
if (image_counter_ == stats_modulo_) {
const auto time_passed = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
reset_counters();
}
+1 -1
View File
@@ -86,7 +86,7 @@ int main (int argc, char *argv[])
sizeof(ImageMetadata), IMAGE_N_BYTES, 1,
buffer_config::RAM_BUFFER_N_SLOTS);
AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO);
AssemblerStats stats(config.detector_name, STATS_TIME);
uint64_t image_id = 0;
+1 -1
View File
@@ -66,7 +66,7 @@ void FrameStats::print_stats()
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "std_udp_recv,";
cout << "std_udp_recv";
cout << ",detector_name=" << detector_name_;
cout << ",module_id=" << module_id_;
cout << " ";
+9 -3
View File
@@ -17,18 +17,21 @@ using namespace std;
int main(int argc, char **argv) {
if (argc != 3) {
if (argc != 4) {
cout << endl;
cout << "Usage: std_udp_sim [detector_json_filename] [bit_depth]";
cout << "Usage: std_udp_sim [detector_json_filename] [bit_depth] "
"[ms_delay]";
cout << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << "\tms_delay: delay in milliseconds between images." << endl;
cout << endl;
exit(-1);
}
const auto config = UdpRecvConfig::from_json_file(string(argv[1]));
const int bit_depth = atoi(argv[2]);
const int ms_delay = atoi(argv[3]);
if (DETECTOR_TYPE != config.detector_type) {
throw runtime_error("UDP recv version for " + DETECTOR_TYPE +
@@ -63,6 +66,9 @@ int main(int argc, char **argv) {
#ifdef USE_EIGER
send_udp_buffer.framenum = image_id;
send_udp_buffer.bunchid = image_id + 100;
send_udp_buffer.row = i_module / 2;
send_udp_buffer.column = i_module %2;
#else
send_udp_buffer.framenum = image_id+100;
send_udp_buffer.bunchid = image_id;
@@ -80,7 +86,7 @@ int main(int argc, char **argv) {
cout << "Sent image_id " << image_id << endl;
// 10Hz == 100ms between images
usleep(100 * 1000);
usleep(ms_delay * 1000);
image_id = ++image_id % MAX_IMAGE_ID;
}
+2 -2
View File
@@ -7,7 +7,7 @@
class SyncStats {
const std::string detector_name_;
const size_t stats_modulo_;
const size_t stats_time_;
int image_counter_;
int n_sync_lost_images_;
@@ -18,7 +18,7 @@ class SyncStats {
public:
SyncStats(const std::string &detector_name,
const size_t stats_modulo);
const size_t stats_time);
void record_stats(const uint32_t n_lost_pulses);
};
-3
View File
@@ -2,7 +2,4 @@ namespace sync_config
{
// Number of times we try to re-sync in case of failure.
const int SYNC_RETRY_LIMIT = 3;
// Number of pulses between each statistics print out.
const size_t SYNC_STATS_MODULO = 1000;
}
+6 -3
View File
@@ -7,9 +7,9 @@ using namespace chrono;
SyncStats::SyncStats(
const std::string &detector_name,
const size_t stats_modulo) :
const size_t stats_time) :
detector_name_(detector_name),
stats_modulo_(stats_modulo)
stats_time_(stats_time)
{
reset_counters();
}
@@ -26,7 +26,10 @@ void SyncStats::record_stats(const uint32_t n_lost_pulses)
image_counter_++;
n_sync_lost_images_ += n_lost_pulses;
if (image_counter_ == stats_modulo_) {
const auto time_passed = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
reset_counters();
}
+3 -17
View File
@@ -1,7 +1,6 @@
#include <iostream>
#include <string>
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include <SyncStats.hpp>
@@ -17,39 +16,26 @@ using namespace std;
using namespace sync_config;
using namespace buffer_config;
#ifdef USE_EIGER
#include "eiger.hpp"
#else
#include "jungfrau.hpp"
#endif
int main (int argc, char *argv[])
{
if (argc != 3) {
if (argc != 2) {
cout << endl;
cout << "Usage: std_udp_sync [detector_json_filename] [bit_depth]" << endl;
cout << "Usage: std_udp_sync [detector_json_filename]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << endl;
exit(-1);
}
const auto config = UdpSyncConfig::from_json_file(string(argv[1]));
const int bit_depth = atoi(argv[2]);
const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8;
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, 1);
auto sender = BufferUtils::bind_socket(ctx, config.detector_name, "sync");
RamBuffer frame_buffer(config.detector_name, sizeof(ModuleFrame),
FRAME_N_BYTES, config.n_modules, RAM_BUFFER_N_SLOTS);
ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules);
SyncStats stats(config.detector_name, SYNC_STATS_MODULO);
SyncStats stats(config.detector_name, STATS_TIME);
while (true) {
auto meta = receiver.get_next_pulse_id();