Files
Jungfraujoch/tests/ZMQImagePusherTest.cpp

344 lines
12 KiB
C++

// Copyright (2019-2023) Paul Scherrer Institute
#include <catch2/catch.hpp>
#include "../writer/ZMQImagePuller.h"
#include "../frame_serialize/ZMQStream2PusherGroup.h"
// Receives one complete data collection through `puller` and counts every deviation
// from what the test cases in this file send.
//
// Expected frame sequence: START, any number of IMAGE frames, END. If the first frame
// is not START, one content error is recorded and the function returns immediately.
//
// Parameters:
//   puller       - receiver to read frames from
//   x            - experiment description; provides the number of pixels per image
//   image1       - reference pixel data, frames stored back to back (uint16_t pixels)
//   nwriter      - total number of pullers taking part in the collection
//   writer_id    - index of this puller; selects the slot used in all counter vectors
//   diff_split   - incremented when an image arrives at the wrong puller
//   diff_size    - incremented when the payload size is not one uint16_t frame
//   diff_content - incremented on pixel/metadata mismatch or unexpected START/END content
//   nimages      - incremented once per IMAGE frame received
//
// Each puller thread writes only to element [writer_id] of the counter vectors.
void test_puller(ZMQImagePuller *puller,
                 const DiffractionExperiment& x,
                 const std::vector<uint16_t> &image1,
                 int64_t nwriter,
                 int64_t writer_id,
                 std::vector<size_t> &diff_split,
                 std::vector<size_t> &diff_size,
                 std::vector<size_t> &diff_content,
                 std::vector<size_t> &nimages) {
    puller->WaitForImage();
    if (puller->GetFrameType() != CBORStream2Deserializer::Type::START) {
        diff_content[writer_id]++;
        return;
    }

    // Geometry of one reference frame; does not change while the loop runs.
    const size_t pixels_per_image = x.GetPixelsNum();
    const size_t bytes_per_image = pixels_per_image * sizeof(uint16_t);
    // Number of complete frames available in the reference buffer.
    const size_t reference_frames = (pixels_per_image > 0) ? image1.size() / pixels_per_image : 0;

    puller->WaitForImage();
    while (puller->GetFrameType() != CBORStream2Deserializer::Type::END) {
        if (puller->GetFrameType() == CBORStream2Deserializer::Type::IMAGE) {
            auto image = puller->GetDataMessage();
            const auto number = static_cast<int64_t>(image.number);

            // Images are expected to be distributed in groups of 16 consecutive numbers,
            // round-robin over the pullers (16 is the images_per_file value the test
            // cases in this file put into the start message).
            if ((nwriter > 1) && ((image.number / 16) % nwriter != writer_id))
                diff_split[writer_id]++;

            if (image.image.size != bytes_per_image)
                diff_size[writer_id]++;
            else if ((number < 0) || (static_cast<size_t>(number) >= reference_frames))
                // The image number does not address a frame inside the reference buffer:
                // count it as a content mismatch instead of reading out of bounds.
                diff_content[writer_id]++;
            else if (memcmp(image.image.data,
                            image1.data() + static_cast<size_t>(number) * pixels_per_image,
                            bytes_per_image) != 0)
                diff_content[writer_id]++;

            // Metadata must describe an uncompressed 16-bit raw module image.
            if (image.image.xpixel != RAW_MODULE_COLS)
                diff_content[writer_id]++;
            if (image.image.ypixel != RAW_MODULE_LINES)
                diff_content[writer_id]++;
            if (image.image.pixel_depth_bytes != 2)
                diff_content[writer_id]++;
            if (image.image.algorithm != CompressionAlgorithm::NO_COMPRESSION)
                diff_content[writer_id]++;

            nimages[writer_id]++;
        }
        puller->WaitForImage();
    }

    // Current frame is END: write_master_file must be set, and true only for puller 0.
    auto end = puller->GetEndMessage();
    if ((!end.write_master_file) || (end.write_master_file.value() != (writer_id == 0)))
        diff_content[writer_id]++;
}
// One pusher socket, one puller: 256 random frames are sent and the puller must
// receive all of them with the correct size, content and metadata.
TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") {
    const size_t nframes = 256;

    ZMQContext context;
    Logger logger("test");

    DiffractionExperiment x(DetectorGeometry(1));
    x.Mode(DetectorMode::Raw);
    x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
            .ImagesPerTrigger(nframes);

    REQUIRE(x.GetImageNum() == nframes);

    // Reference data: nframes frames of pseudo-random pixels (fixed seed => reproducible).
    std::mt19937 g1(1387);
    std::uniform_int_distribution<uint16_t> dist;
    std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
    for (auto &i: image1) i = dist(g1);

    std::string zmq_addr = "ipc://*";

    // Puller needs to be declared first, but both objects need to exist till communication finished
    // TODO: ImageSender should not allow if there are still completions to be done
    ZMQImagePuller puller(context);
    ZMQStream2PusherGroup pusher(context, {zmq_addr});

    std::vector<size_t> diff_size(1), diff_content(1), diff_split(1), nimages(1);

    // Connect to the address reported by the pusher for the "ipc://*" request.
    auto pusher_addr = pusher.GetAddress();
    puller.Connect(pusher_addr[0]);

    std::thread sender_thread = std::thread([&] {
        StartMessage message {
                .images_per_file = 16
        };
        EndMessage end_message{
                .write_master_file = true
        };

        pusher.StartDataCollection(message);
        for (size_t i = 0; i < nframes; i++) {
            DataMessage data_message;
            data_message.number = static_cast<int64_t>(i);
            PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
            pusher.SendImage(data_message);
        }
        pusher.EndDataCollection(end_message);
    });

    std::thread puller_thread(test_puller, &puller, std::cref(x), std::cref(image1), 1, 0,
                              std::ref(diff_split), std::ref(diff_size), std::ref(diff_content),
                              std::ref(nimages));

    // Both threads are joined before any counter is inspected.
    sender_thread.join();
    puller_thread.join();

    puller.Disconnect();

    REQUIRE(nimages[0] == nframes);
    REQUIRE(diff_size[0] == 0);
    REQUIRE(diff_content[0] == 0);
    // test_puller only touches diff_split when there is more than one writer,
    // so this must stay at zero here.
    REQUIRE(diff_split[0] == 0);
}
// One pusher group with two sockets and two pullers: 256 frames must be split evenly
// (groups of 16 consecutive images, round-robin) and arrive unmodified.
TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") {
    const size_t nframes = 256;
    const int64_t npullers = 2;

    ZMQContext context;
    Logger logger("test");

    DiffractionExperiment x(DetectorGeometry(1));
    x.Mode(DetectorMode::Raw);
    x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
            .ImagesPerTrigger(nframes);

    REQUIRE(x.GetImageNum() == nframes);

    // Reference data: nframes frames of pseudo-random pixels (fixed seed => reproducible).
    std::mt19937 g1(1387);
    std::uniform_int_distribution<uint16_t> dist;
    std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
    for (auto &i: image1) i = dist(g1);

    // One "ipc://*" request per puller.
    const std::vector<std::string> zmq_addr(npullers, "ipc://*");

    ZMQStream2PusherGroup pusher(context, zmq_addr);

    // Both pusher and pullers need to exist till communication finished.
    // NOTE(review): the single-writer test says the puller has to be declared first;
    // here the pusher is constructed before the pullers - confirm which order is required.
    // TODO: ImageSender should not allow if there are still completions to be done
    std::vector<std::unique_ptr<ZMQImagePuller> > puller;

    auto pusher_addr = pusher.GetAddress();
    REQUIRE(pusher_addr.size() == static_cast<size_t>(npullers));

    puller.reserve(npullers);
    for (int64_t i = 0; i < npullers; i++) {
        puller.emplace_back(std::make_unique<ZMQImagePuller>(context));
        puller[i]->Connect(pusher_addr[i]);
    }

    std::vector<size_t> diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers);

    std::thread sender_thread = std::thread([&] {
        StartMessage message {
                .images_per_file = 16
        };
        EndMessage end_message{
                .write_master_file = true
        };

        pusher.StartDataCollection(message);
        for (size_t i = 0; i < nframes; i++) {
            DataMessage data_message;
            data_message.number = static_cast<int64_t>(i);
            PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
            pusher.SendImage(data_message);
        }
        pusher.EndDataCollection(end_message);
    });

    std::vector<std::thread> puller_threads;
    puller_threads.reserve(npullers);
    for (int64_t i = 0; i < npullers; i++)
        puller_threads.emplace_back(test_puller, puller[i].get(), std::cref(x),
                                    std::cref(image1), npullers, i,
                                    std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), std::ref(nimages));

    // All threads are joined before any counter is inspected.
    for (auto &t: puller_threads)
        t.join();
    sender_thread.join();

    for (int64_t i = 0; i < npullers; i++)
        REQUIRE_NOTHROW(puller[i]->Disconnect());

    // 256 frames in groups of 16 => 16 groups, alternating between the two pullers.
    for (int64_t i = 0; i < npullers; i++) {
        REQUIRE(nimages[i] == nframes / 2);
        REQUIRE(diff_size[i] == 0);
        REQUIRE(diff_content[i] == 0);
        REQUIRE(diff_split[i] == 0);
    }
}
// One pusher group with four sockets and four pullers, with a frame count (255) that is
// not a multiple of the group size, so the last puller receives one image fewer.
TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") {
    const size_t nframes = 255;
    const int64_t npullers = 4;

    ZMQContext context;
    Logger logger("test");

    DiffractionExperiment x(DetectorGeometry(1));
    x.Mode(DetectorMode::Raw);
    x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
            .ImagesPerTrigger(nframes);

    REQUIRE(x.GetImageNum() == nframes);

    // Reference data: nframes frames of pseudo-random pixels (fixed seed => reproducible).
    std::mt19937 g1(1387);
    std::uniform_int_distribution<uint16_t> dist;
    std::vector<uint16_t> image1(x.GetPixelsNum()*nframes);
    for (auto &i: image1) i = dist(g1);

    // One "ipc://*" request per puller.
    const std::vector<std::string> zmq_addr(npullers, "ipc://*");

    ZMQStream2PusherGroup pusher(context, zmq_addr);

    auto pusher_addr = pusher.GetAddress();
    REQUIRE(pusher_addr.size() == static_cast<size_t>(npullers));

    // Both pusher and pullers need to exist till communication finished.
    // NOTE(review): the single-writer test says the puller has to be declared first;
    // here the pusher is constructed before the pullers - confirm which order is required.
    // TODO: ImageSender should not allow if there are still completions to be done
    std::vector<std::unique_ptr<ZMQImagePuller> > puller;
    puller.reserve(npullers);
    for (int64_t i = 0; i < npullers; i++) {
        puller.emplace_back(std::make_unique<ZMQImagePuller>(context));
        puller[i]->Connect(pusher_addr[i]);
    }

    std::vector<size_t> diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers);

    std::thread sender_thread = std::thread([&] {
        StartMessage message {
                .images_per_file = 16
        };
        EndMessage end_message{
                .write_master_file = true
        };

        pusher.StartDataCollection(message);
        for (size_t i = 0; i < nframes; i++) {
            DataMessage data_message;
            data_message.number = static_cast<int64_t>(i);
            PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t));
            pusher.SendImage(data_message);
        }
        pusher.EndDataCollection(end_message);
    });

    std::vector<std::thread> puller_threads;
    puller_threads.reserve(npullers);
    for (int64_t i = 0; i < npullers; i++)
        puller_threads.emplace_back(test_puller, puller[i].get(), std::cref(x),
                                    std::cref(image1), npullers, i,
                                    std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), std::ref(nimages));

    // All threads are joined before any counter is inspected.
    for (auto &t: puller_threads)
        t.join();
    sender_thread.join();

    for (int64_t i = 0; i < npullers; i++)
        REQUIRE_NOTHROW(puller[i]->Disconnect());

    // Images 0..254 form 16 groups of 16 (the last group holds only 15 images).
    // Group g is expected at puller g % 4, so pullers 0-2 get 4 full groups (64 images)
    // and puller 3 gets 3 full groups plus the short one (48 + 15 = 63 images).
    const std::vector<size_t> expected_images{64, 64, 64, 63};

    for (int64_t i = 0; i < npullers; i++) {
        REQUIRE(nimages[i] == expected_images[i]);
        REQUIRE(diff_size[i] == 0);
        REQUIRE(diff_content[i] == 0);
        REQUIRE(diff_split[i] == 0);
    }
}
// A puller that is never connected to any pusher is aborted while a receiver thread
// is running test_puller on it; the thread has to terminate without having counted
// a single image.
TEST_CASE("ZMQImagePuller_abort","[ZeroMQ]") {
    const size_t nframes = 256;

    DiffractionExperiment x;
    x.Mode(DetectorMode::Raw);
    x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4)
            .ImagesPerTrigger(nframes);

    ZMQContext context;
    ZMQImagePuller puller(context);

    // Single-slot counters, as for a lone writer with id 0.
    std::vector<size_t> size_errors(1);
    std::vector<size_t> content_errors(1);
    std::vector<size_t> split_errors(1);
    std::vector<size_t> received(1);

    // One frame of zeroed reference pixels; no image is expected to be compared against it.
    std::vector<uint16_t> reference(x.GetPixelsNum());

    std::thread receiver([&] {
        test_puller(&puller, x, reference, 1, 0,
                    split_errors, size_errors, content_errors, received);
    });

    // NOTE(review): Abort() may run before or after the receiver thread reaches
    // WaitForImage(); the test presumably relies on Abort() working in either order - confirm.
    puller.Abort();
    receiver.join();

    REQUIRE(received[0] == 0);
}
// Pusher group with no puller connected: starting a collection must throw, while
// sending an image and ending the collection must both report failure via their
// return value.
TEST_CASE("ZMQImageCommTest_NoWriter","[ZeroMQ]") {
    // Dimensions of the dummy 8-bit image used below.
    constexpr int image_width = 1024;
    constexpr int image_height = 512;

    ZMQStream2PusherGroup pusher({"ipc://*"});

    StartMessage start_message;
    REQUIRE_THROWS(pusher.StartDataCollection(start_message));

    // Payload: every pixel set to 11, one byte per pixel.
    std::vector<uint8_t> pixels(image_width * image_height, 11);

    CompressedImage dummy_image {
            .data = pixels.data(),
            .size = image_width * image_height,
            .xpixel = image_width,
            .ypixel = image_height,
            .pixel_depth_bytes = 1,
            .pixel_is_signed = false,
            .pixel_is_float = false,
            .algorithm = CompressionAlgorithm::NO_COMPRESSION,
            .channel = "default"
    };

    DataMessage data_message;
    data_message.number = 1;
    data_message.image = dummy_image;
    REQUIRE_FALSE(pusher.SendImage(data_message));

    EndMessage end_message;
    REQUIRE_FALSE(pusher.EndDataCollection(end_message));
}