// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include #include #include "../image_puller/ZMQImagePuller.h" #include "../image_pusher/ZMQStream2Pusher.h" void test_puller(ZMQImagePuller *puller, const DiffractionExperiment& x, const std::vector &image1, int64_t nwriter, int64_t writer_id, std::vector &diff_split, std::vector &diff_size, std::vector &diff_content, std::vector &nimages) { auto timeout = std::chrono::minutes(3); auto img = puller->PollImage(timeout); if (!img || !img->cbor || !img->cbor->start_message) { diff_content[writer_id]++; return; } if ((!img->cbor->start_message->write_master_file) || (img->cbor->start_message->write_master_file.value() != (writer_id == 0))) diff_content[writer_id]++; img = puller->PollImage(timeout); while (img && img->cbor && !img->cbor->end_message) { if (img->cbor->data_message) { if ((nwriter > 1) && ((img->cbor->data_message->number / 16) % nwriter != writer_id)) diff_split[writer_id]++; if (img->cbor->data_message->image.GetCompressedSize() != x.GetPixelsNum() * sizeof(uint16_t)) diff_size[writer_id]++; else if (memcmp(img->cbor->data_message->image.GetCompressed(), image1.data() + img->cbor->data_message->number * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)) != 0) diff_content[writer_id]++; if (img->cbor->data_message->image.GetWidth() != RAW_MODULE_COLS) diff_content[writer_id]++; if (img->cbor->data_message->image.GetHeight() != RAW_MODULE_LINES) diff_content[writer_id]++; if (img->cbor->data_message->image.GetByteDepth() != 2) diff_content[writer_id]++; if (img->cbor->data_message->image.GetCompressionAlgorithm() != CompressionAlgorithm::NO_COMPRESSION) diff_content[writer_id]++; nimages[writer_id]++; } img = puller->PollImage(timeout); } } TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") { const size_t nframes = 256; Logger logger(Catch::getResultCapture().getCurrentTestName()); DiffractionExperiment x(DetJF(1)); x.Raw(); x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4) .ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION); std::vector empty_spot_vector; std::vector empty_rad_int_profile; REQUIRE(x.GetImageNum() == nframes); std::mt19937 g1(1387); std::uniform_int_distribution dist; std::vector image1(x.GetPixelsNum()*nframes); for (auto &i: image1) i = dist(g1); // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done ZMQStream2Pusher pusher({"ipc://*"}); std::vector diff_size(1), diff_content(1), diff_split(1), nimages(1); auto pusher_addr = pusher.GetAddress(); ZMQImagePuller puller(pusher_addr[0]); std::thread sender_thread = std::thread([&] { std::vector serialization_buffer(16*1024*1024); CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); StartMessage message { .images_per_file = 16, .write_master_file = true }; EndMessage end_message{}; pusher.StartDataCollection(message); for (int i = 0; i < nframes; i++) { DataMessage data_message; data_message.number = i; data_message.image = CompressedImage(image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t), x.GetXPixelsNum(), x.GetYPixelsNum(), x.GetImageMode(), x.GetCompressionAlgorithm()); serializer.SerializeImage(data_message); pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); } pusher.EndDataCollection(end_message); }); std::thread puller_thread(test_puller, &puller, std::cref(x), std::cref(image1), 1, 0, std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), std::ref(nimages)); sender_thread.join(); puller_thread.join(); puller.Disconnect(); REQUIRE(nimages[0] == nframes); REQUIRE(diff_size[0] == 0); REQUIRE(diff_content[0] == 0); } TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { const size_t nframes = 256; Logger logger(Catch::getResultCapture().getCurrentTestName()); DiffractionExperiment x(DetJF(1)); x.Raw(); x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4) .ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION); REQUIRE(x.GetImageNum() == nframes); std::mt19937 g1(1387); std::uniform_int_distribution dist; std::vector image1(x.GetPixelsNum()*nframes); for (auto &i: image1) i = dist(g1); std::vector empty_spot_vector; std::vector empty_rad_int_profile; std::vector zmq_addr; int64_t npullers = 2; for (int i = 0; i < npullers; i++) zmq_addr.push_back("ipc://*"); ZMQStream2Pusher pusher(zmq_addr); // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done std::vector > puller; auto pusher_addr = pusher.GetAddress(); REQUIRE(pusher_addr.size() == 2); for (int i = 0; i < npullers; i++) { puller.push_back(std::make_unique(pusher_addr[i])); } std::vector diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers); std::thread sender_thread = std::thread([&] { std::vector serialization_buffer(16*1024*1024); CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); StartMessage message { .images_per_file = 16, .write_master_file = true }; EndMessage end_message{}; pusher.StartDataCollection(message); for (int i = 0; i < nframes; i++) { DataMessage data_message; data_message.number = i; data_message.image = CompressedImage(image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t), x.GetXPixelsNum(), x.GetYPixelsNum(), x.GetImageMode(), x.GetCompressionAlgorithm()); serializer.SerializeImage(data_message); pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); } pusher.EndDataCollection(end_message); }); std::vector puller_threads; for (int i = 0; i < npullers; i++) puller_threads.emplace_back(test_puller, puller[i].get(), std::cref(x), std::cref(image1), npullers, i, std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), std::ref(nimages)); for (int i = 0; i < npullers; i++) puller_threads[i].join(); sender_thread.join(); REQUIRE_NOTHROW(puller[0]->Disconnect()); REQUIRE_NOTHROW(puller[1]->Disconnect()); REQUIRE(nimages[0] == nframes / 2); REQUIRE(nimages[1] == nframes / 2); REQUIRE(diff_size[0] == 0); REQUIRE(diff_content[0] == 0); REQUIRE(diff_size[1] == 0); REQUIRE(diff_content[1] == 0); REQUIRE(diff_split[0] == 0); REQUIRE(diff_split[1] == 0); } TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") { const size_t nframes = 255; Logger logger(Catch::getResultCapture().getCurrentTestName()); DiffractionExperiment x(DetJF(1)); x.Raw(); x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4) .ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION); REQUIRE(x.GetImageNum() == nframes); std::mt19937 g1(1387); std::uniform_int_distribution dist; std::vector image1(x.GetPixelsNum()*nframes); for (auto &i: image1) i = dist(g1); std::vector empty_spot_vector; std::vector empty_rad_int_profile; std::vector zmq_addr; int64_t npullers = 4; for (int i = 0; i < npullers; i++) zmq_addr.push_back("ipc://*"); ZMQStream2Pusher pusher(zmq_addr); auto pusher_addr = pusher.GetAddress(); REQUIRE(pusher_addr.size() == npullers); // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done std::vector > puller; for (int i = 0; i < npullers; i++) { puller.push_back(std::make_unique(pusher_addr[i])); } std::vector diff_size(npullers), diff_content(npullers), diff_split(npullers), nimages(npullers); std::thread sender_thread = std::thread([&] { std::vector serialization_buffer(16*1024*1024); CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); StartMessage message { .images_per_file = 16, .write_master_file = true }; EndMessage end_message{}; pusher.StartDataCollection(message); for (int i = 0; i < nframes; i++) { DataMessage data_message; data_message.number = i; data_message.image = CompressedImage(image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t), x.GetXPixelsNum(), x.GetYPixelsNum(), x.GetImageMode(), x.GetCompressionAlgorithm()); serializer.SerializeImage(data_message); pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); } pusher.EndDataCollection(end_message); }); std::vector puller_threads; for (int i = 0; i < npullers; i++) puller_threads.emplace_back(test_puller, puller[i].get(), std::cref(x), std::cref(image1), npullers, i, std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), std::ref(nimages)); for (int i = 0; i < npullers; i++) puller_threads[i].join(); sender_thread.join(); REQUIRE_NOTHROW(puller[0]->Disconnect()); REQUIRE_NOTHROW(puller[1]->Disconnect()); REQUIRE_NOTHROW(puller[2]->Disconnect()); REQUIRE_NOTHROW(puller[3]->Disconnect()); REQUIRE(nimages[0] == 64); REQUIRE(nimages[1] == 64); REQUIRE(nimages[2] == 64); REQUIRE(nimages[3] == 63); for (int i = 0; i < npullers; i++) { REQUIRE(diff_size[i] == 0); REQUIRE(diff_content[i] == 0); REQUIRE(diff_split[i] == 0); } } TEST_CASE("ZMQImageCommTest_NoWriter","[ZeroMQ]") { Logger logger(Catch::getResultCapture().getCurrentTestName()); ZMQStream2Pusher pusher({"ipc://*"}); StartMessage msg{}; REQUIRE_THROWS(pusher.StartDataCollection(msg)); std::vector test(512*1024, 11); CompressedImage image(test, 1024, 512); DataMessage data_message{ .number = 1, .image = image }; std::vector v(16*1024*1024); CBORStream2Serializer serializer(v.data(), v.size()); serializer.SerializeImage(data_message); REQUIRE(!pusher.SendImage(v.data(), serializer.GetBufferSize(), 1)); EndMessage end_message{}; REQUIRE(!pusher.EndDataCollection(end_message)); }