330 lines
12 KiB
C++
330 lines
12 KiB
C++
// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
|
|
// SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
#include <random>
|
|
#include <catch2/catch_all.hpp>
|
|
#include "../image_puller/ZMQImagePuller.h"
|
|
#include "../image_pusher/ZMQStream2Pusher.h"
|
|
|
|
void test_puller(ZMQImagePuller *puller,
|
|
const DiffractionExperiment& x,
|
|
const std::vector<uint16_t> &image1,
|
|
int64_t nwriter,
|
|
int64_t writer_id,
|
|
std::vector<size_t> &diff_split,
|
|
std::vector<size_t> &diff_size,
|
|
std::vector<size_t> &diff_content,
|
|
std::vector<size_t> &nimages) {
|
|
|
|
auto timeout = std::chrono::minutes(3);
|
|
auto img = puller->PollImage(timeout);
|
|
|
|
if (!img || !img->cbor || !img->cbor->start_message) {
|
|
diff_content[writer_id]++;
|
|
return;
|
|
}
|
|
|
|
if ((!img->cbor->start_message->write_master_file) || (img->cbor->start_message->write_master_file.value() != (writer_id == 0)))
|
|
diff_content[writer_id]++;
|
|
img = puller->PollImage(timeout);
|
|
while (img && img->cbor && !img->cbor->end_message) {
|
|
if (img->cbor->data_message) {
|
|
if ((nwriter > 1) && ((img->cbor->data_message->number / 16) % nwriter != writer_id))
|
|
diff_split[writer_id]++;
|
|
|
|
|
|
if (img->cbor->data_message->image.size != x.GetPixelsNum() * sizeof(uint16_t))
|
|
diff_size[writer_id]++;
|
|
else if (memcmp(img->cbor->data_message->image.data, image1.data() + img->cbor->data_message->number * x.GetPixelsNum(),
|
|
x.GetPixelsNum() * sizeof(uint16_t)) != 0)
|
|
diff_content[writer_id]++;
|
|
if (img->cbor->data_message->image.xpixel != RAW_MODULE_COLS)
|
|
diff_content[writer_id]++;
|
|
if (img->cbor->data_message->image.ypixel != RAW_MODULE_LINES)
|
|
diff_content[writer_id]++;
|
|
if (img->cbor->data_message->image.pixel_depth_bytes != 2)
|
|
diff_content[writer_id]++;
|
|
if (img->cbor->data_message->image.algorithm != CompressionAlgorithm::NO_COMPRESSION)
|
|
diff_content[writer_id]++;
|
|
nimages[writer_id]++;
|
|
}
|
|
img = puller->PollImage(timeout);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
|
|
const size_t nframes = 256;
|
|
|
|
Logger logger(Catch::getResultCapture().getCurrentTestName());
|
|
|
|
DiffractionExperiment x(DetJF(1));
|
|
x.Mode(DetectorMode::Raw);
|
|
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4)
|
|
.ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION);
|
|
|
|
std::vector<DiffractionSpot> empty_spot_vector;
|
|
std::vector<float> empty_rad_int_profile;
|
|
|
|
REQUIRE(x.GetImageNum() == nframes);
|
|
|
|
std::mt19937 g1(1387);
|
|
std::uniform_int_distribution<uint16_t> dist;
|
|
|
|
std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
|
|
for (auto &i: image1) i = dist(g1);
|
|
|
|
// Puller needs to be declared first, but both objects need to exist till communication finished
|
|
// TODO: ImageSender should not allow if there are still completions to be done
|
|
ZMQStream2Pusher pusher({"ipc://*"});
|
|
|
|
std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);
|
|
|
|
auto pusher_addr = pusher.GetAddress();
|
|
ZMQImagePuller puller(pusher_addr[0]);
|
|
|
|
std::thread sender_thread = std::thread([&] {
|
|
std::vector<uint8_t> serialization_buffer(16*1024*1024);
|
|
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
|
|
|
|
StartMessage message {
|
|
.images_per_file = 16,
|
|
.write_master_file = true
|
|
};
|
|
EndMessage end_message{};
|
|
|
|
pusher.StartDataCollection(message);
|
|
for (int i = 0; i < nframes; i++) {
|
|
DataMessage data_message;
|
|
data_message.number = i;
|
|
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
|
|
serializer.SerializeImage(data_message);
|
|
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
|
|
}
|
|
|
|
pusher.EndDataCollection(end_message);
|
|
});
|
|
|
|
std::thread puller_thread(test_puller, &puller, std::cref(x), std::cref(image1), 1, 0,
|
|
std::ref(diff_split), std::ref(diff_size), std::ref(diff_content),
|
|
std::ref(nimages));
|
|
|
|
sender_thread.join();
|
|
puller_thread.join();
|
|
|
|
puller.Disconnect();
|
|
|
|
REQUIRE(nimages[0] == nframes);
|
|
REQUIRE(diff_size[0] == 0);
|
|
REQUIRE(diff_content[0] == 0);
|
|
}
|
|
|
|
TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
|
|
const size_t nframes = 256;
|
|
|
|
Logger logger(Catch::getResultCapture().getCurrentTestName());
|
|
DiffractionExperiment x(DetJF(1));
|
|
x.Mode(DetectorMode::Raw);
|
|
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4)
|
|
.ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION);
|
|
|
|
REQUIRE(x.GetImageNum() == nframes);
|
|
|
|
std::mt19937 g1(1387);
|
|
std::uniform_int_distribution<uint16_t> dist;
|
|
|
|
std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
|
|
for (auto &i: image1) i = dist(g1);
|
|
|
|
std::vector<DiffractionSpot> empty_spot_vector;
|
|
std::vector<float> empty_rad_int_profile;
|
|
|
|
std::vector<std::string> zmq_addr;
|
|
|
|
int64_t npullers = 2;
|
|
|
|
for (int i = 0; i < npullers; i++)
|
|
zmq_addr.push_back("ipc://*");
|
|
|
|
ZMQStream2Pusher pusher(zmq_addr);
|
|
|
|
// Puller needs to be declared first, but both objects need to exist till communication finished
|
|
// TODO: ImageSender should not allow if there are still completions to be done
|
|
std::vector<std::unique_ptr<ZMQImagePuller> > puller;
|
|
auto pusher_addr = pusher.GetAddress();
|
|
REQUIRE(pusher_addr.size() == 2);
|
|
for (int i = 0; i < npullers; i++) {
|
|
puller.push_back(std::make_unique<ZMQImagePuller>(pusher_addr[i]));
|
|
}
|
|
|
|
std::vector<size_t> diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers);
|
|
|
|
std::thread sender_thread = std::thread([&] {
|
|
std::vector<uint8_t> serialization_buffer(16*1024*1024);
|
|
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
|
|
|
|
StartMessage message {
|
|
.images_per_file = 16,
|
|
.write_master_file = true
|
|
};
|
|
EndMessage end_message{};
|
|
|
|
pusher.StartDataCollection(message);
|
|
for (int i = 0; i < nframes; i++) {
|
|
DataMessage data_message;
|
|
data_message.number = i;
|
|
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
|
|
serializer.SerializeImage(data_message);
|
|
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
|
|
}
|
|
|
|
pusher.EndDataCollection(end_message);
|
|
});
|
|
|
|
std::vector<std::thread> puller_threads;
|
|
for (int i = 0; i < npullers; i++)
|
|
puller_threads.emplace_back(test_puller, puller[i].get(), std::cref(x),
|
|
std::cref(image1), npullers, i,
|
|
std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), std::ref(nimages));
|
|
|
|
for (int i = 0; i < npullers; i++)
|
|
puller_threads[i].join();
|
|
|
|
sender_thread.join();
|
|
|
|
REQUIRE_NOTHROW(puller[0]->Disconnect());
|
|
REQUIRE_NOTHROW(puller[1]->Disconnect());
|
|
|
|
REQUIRE(nimages[0] == nframes / 2);
|
|
REQUIRE(nimages[1] == nframes / 2);
|
|
REQUIRE(diff_size[0] == 0);
|
|
REQUIRE(diff_content[0] == 0);
|
|
|
|
REQUIRE(diff_size[1] == 0);
|
|
REQUIRE(diff_content[1] == 0);
|
|
|
|
REQUIRE(diff_split[0] == 0);
|
|
REQUIRE(diff_split[1] == 0);
|
|
}
|
|
|
|
TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
|
|
const size_t nframes = 255;
|
|
|
|
Logger logger(Catch::getResultCapture().getCurrentTestName());
|
|
DiffractionExperiment x(DetJF(1));
|
|
x.Mode(DetectorMode::Raw);
|
|
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4)
|
|
.ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION);
|
|
|
|
REQUIRE(x.GetImageNum() == nframes);
|
|
|
|
std::mt19937 g1(1387);
|
|
std::uniform_int_distribution<uint16_t> dist;
|
|
|
|
std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
|
|
for (auto &i: image1) i = dist(g1);
|
|
|
|
std::vector<DiffractionSpot> empty_spot_vector;
|
|
std::vector<float> empty_rad_int_profile;
|
|
|
|
std::vector<std::string> zmq_addr;
|
|
|
|
int64_t npullers = 4;
|
|
|
|
for (int i = 0; i < npullers; i++)
|
|
zmq_addr.push_back("ipc://*");
|
|
|
|
ZMQStream2Pusher pusher(zmq_addr);
|
|
auto pusher_addr = pusher.GetAddress();
|
|
REQUIRE(pusher_addr.size() == npullers);
|
|
// Puller needs to be declared first, but both objects need to exist till communication finished
|
|
// TODO: ImageSender should not allow if there are still completions to be done
|
|
std::vector<std::unique_ptr<ZMQImagePuller> > puller;
|
|
for (int i = 0; i < npullers; i++) {
|
|
puller.push_back(std::make_unique<ZMQImagePuller>(pusher_addr[i]));
|
|
}
|
|
|
|
std::vector<size_t> diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers);
|
|
|
|
std::thread sender_thread = std::thread([&] {
|
|
std::vector<uint8_t> serialization_buffer(16*1024*1024);
|
|
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
|
|
|
|
StartMessage message {
|
|
.images_per_file = 16,
|
|
.write_master_file = true
|
|
};
|
|
|
|
EndMessage end_message{};
|
|
|
|
pusher.StartDataCollection(message);
|
|
for (int i = 0; i < nframes; i++) {
|
|
DataMessage data_message;
|
|
data_message.number = i;
|
|
PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
|
|
serializer.SerializeImage(data_message);
|
|
pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
|
|
}
|
|
|
|
pusher.EndDataCollection(end_message);
|
|
});
|
|
|
|
std::vector<std::thread> puller_threads;
|
|
for (int i = 0; i < npullers; i++)
|
|
puller_threads.emplace_back(test_puller, puller[i].get(), std::cref(x),
|
|
std::cref(image1), npullers, i,
|
|
std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), std::ref(nimages));
|
|
|
|
for (int i = 0; i < npullers; i++)
|
|
puller_threads[i].join();
|
|
|
|
sender_thread.join();
|
|
|
|
REQUIRE_NOTHROW(puller[0]->Disconnect());
|
|
REQUIRE_NOTHROW(puller[1]->Disconnect());
|
|
REQUIRE_NOTHROW(puller[2]->Disconnect());
|
|
REQUIRE_NOTHROW(puller[3]->Disconnect());
|
|
|
|
REQUIRE(nimages[0] == 64);
|
|
REQUIRE(nimages[1] == 64);
|
|
REQUIRE(nimages[2] == 64);
|
|
REQUIRE(nimages[3] == 63);
|
|
|
|
for (int i = 0; i < npullers; i++) {
|
|
REQUIRE(diff_size[i] == 0);
|
|
REQUIRE(diff_content[i] == 0);
|
|
REQUIRE(diff_split[i] == 0);
|
|
}
|
|
}
|
|
|
|
TEST_CASE("ZMQImageCommTest_NoWriter","[ZeroMQ]") {
|
|
Logger logger(Catch::getResultCapture().getCurrentTestName());
|
|
ZMQStream2Pusher pusher({"ipc://*"});
|
|
StartMessage msg{};
|
|
REQUIRE_THROWS(pusher.StartDataCollection(msg));
|
|
|
|
std::vector<uint8_t> test(512*1024, 11);
|
|
|
|
CompressedImage image {
|
|
.data = test.data(),
|
|
.size = 1024 * 512,
|
|
.xpixel = 1024,
|
|
.ypixel = 512,
|
|
.pixel_depth_bytes = 1,
|
|
.pixel_is_signed = false,
|
|
.pixel_is_float = false,
|
|
.algorithm = CompressionAlgorithm::NO_COMPRESSION,
|
|
.channel = "default"
|
|
};
|
|
|
|
DataMessage data_message{};
|
|
data_message.number = 1;
|
|
data_message.image = image;
|
|
std::vector<uint8_t> v(16*1024*1024);
|
|
CBORStream2Serializer serializer(v.data(), v.size());
|
|
serializer.SerializeImage(data_message);
|
|
REQUIRE(!pusher.SendImage(v.data(), serializer.GetBufferSize(), 1));
|
|
|
|
EndMessage end_message{};
|
|
REQUIRE(!pusher.EndDataCollection(end_message));
|
|
}
|