TCPImagePusher: Further improvement
All checks were successful
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 11m9s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 14m35s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 15m33s
Build Packages / Generate python client (push) Successful in 40s
Build Packages / Build documentation (push) Successful in 57s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 17m53s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9) (push) Successful in 18m28s
Build Packages / build:rpm (rocky8) (push) Successful in 19m26s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 19m43s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 20m31s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 11m24s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m28s
Build Packages / Unit tests (push) Successful in 57m13s
All checks were successful
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 11m9s
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 14m35s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 15m33s
Build Packages / Generate python client (push) Successful in 40s
Build Packages / Build documentation (push) Successful in 57s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 17m53s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (rocky9) (push) Successful in 18m28s
Build Packages / build:rpm (rocky8) (push) Successful in 19m26s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 19m43s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 20m31s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 11m24s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 9m28s
Build Packages / Unit tests (push) Successful in 57m13s
This commit is contained in:
@@ -31,6 +31,11 @@ TEST_CASE("TCPImageCommTest_2Writers_WithAck", "[TCP]") {
|
||||
|
||||
TCPStreamPusher pusher(addr,npullers);
|
||||
|
||||
// Wait for all pullers to connect before starting data collection
|
||||
for (int attempt = 0; attempt < 100 && pusher.GetConnectedWriters() < static_cast<size_t>(npullers); ++attempt)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
REQUIRE(pusher.GetConnectedWriters() == static_cast<size_t>(npullers));
|
||||
|
||||
std::vector<size_t> received(npullers, 0);
|
||||
std::vector<size_t> processed(npullers, 0);
|
||||
|
||||
@@ -152,6 +157,11 @@ TEST_CASE("TCPImageCommTest_DataFatalAck_PropagatesToPusher", "[TCP]") {
|
||||
|
||||
TCPStreamPusher pusher(addr,npullers);
|
||||
|
||||
// Wait for all pullers to connect before starting data collection
|
||||
for (int attempt = 0; attempt < 100 && pusher.GetConnectedWriters() < static_cast<size_t>(npullers); ++attempt)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
REQUIRE(pusher.GetConnectedWriters() == static_cast<size_t>(npullers));
|
||||
|
||||
std::atomic<bool> sent_fatal{false};
|
||||
|
||||
std::thread sender([&] {
|
||||
@@ -278,6 +288,11 @@ TEST_CASE("TCPImageCommTest_GetAckProgress_Correct", "[TCP]") {
|
||||
|
||||
TCPStreamPusher pusher(addr,npullers);
|
||||
|
||||
// Wait for all pullers to connect before starting data collection
|
||||
for (int attempt = 0; attempt < 100 && pusher.GetConnectedWriters() < static_cast<size_t>(npullers); ++attempt)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
REQUIRE(pusher.GetConnectedWriters() == static_cast<size_t>(npullers));
|
||||
|
||||
std::thread sender([&] {
|
||||
std::vector<uint8_t> serialization_buffer(16 * 1024 * 1024);
|
||||
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
|
||||
@@ -382,3 +397,129 @@ TEST_CASE("TCPImageCommTest_GetAckProgress_Correct", "[TCP]") {
|
||||
for (auto &p : puller)
|
||||
p->Disconnect();
|
||||
}
|
||||
|
||||
TEST_CASE("TCPImageCommTest_AutoPort_StarBind", "[TCP]") {
|
||||
const size_t nframes = 8;
|
||||
const int64_t images_per_file = 4;
|
||||
|
||||
DiffractionExperiment x(DetJF(1));
|
||||
x.Raw();
|
||||
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4)
|
||||
.ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION);
|
||||
|
||||
std::vector<uint16_t> image1(x.GetPixelsNum() * nframes, 7u);
|
||||
|
||||
TCPStreamPusher pusher("tcp://127.0.0.1:*", 1);
|
||||
TCPImagePuller puller(pusher.GetAddress(), 64 * 1024 * 1024);
|
||||
|
||||
std::thread receiver([&] {
|
||||
bool seen_end = false;
|
||||
uint64_t processed = 0;
|
||||
|
||||
while (!seen_end) {
|
||||
auto out = puller.PollImage(std::chrono::seconds(10));
|
||||
REQUIRE(out.has_value());
|
||||
REQUIRE(out->cbor != nullptr);
|
||||
REQUIRE(out->tcp_msg != nullptr);
|
||||
|
||||
const auto &h = out->tcp_msg->header;
|
||||
if (out->cbor->start_message) {
|
||||
PullerAckMessage ack{.ack_for = TCPFrameType::START, .ok = true, .run_number = h.run_number,
|
||||
.socket_number = h.socket_number, .error_code = TCPAckCode::None};
|
||||
REQUIRE(puller.SendAck(ack));
|
||||
} else if (out->cbor->data_message) {
|
||||
processed++;
|
||||
} else if (out->cbor->end_message) {
|
||||
PullerAckMessage ack{.ack_for = TCPFrameType::END, .ok = true, .run_number = h.run_number,
|
||||
.socket_number = h.socket_number, .processed_images = processed, .error_code = TCPAckCode::None};
|
||||
REQUIRE(puller.SendAck(ack));
|
||||
seen_end = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
std::vector<uint8_t> serialization_buffer(16 * 1024 * 1024);
|
||||
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
|
||||
|
||||
StartMessage start{.images_per_file = images_per_file, .write_master_file = true};
|
||||
EndMessage end{};
|
||||
pusher.StartDataCollection(start);
|
||||
|
||||
for (int64_t i = 0; i < static_cast<int64_t>(nframes); i++) {
|
||||
DataMessage data_message;
|
||||
data_message.number = i;
|
||||
data_message.image = CompressedImage(image1.data() + i * x.GetPixelsNum(),
|
||||
x.GetPixelsNum() * sizeof(uint16_t),
|
||||
x.GetXPixelsNum(), x.GetYPixelsNum(),
|
||||
x.GetImageMode(), x.GetCompressionAlgorithm());
|
||||
serializer.SerializeImage(data_message);
|
||||
REQUIRE(pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i));
|
||||
}
|
||||
|
||||
REQUIRE(pusher.EndDataCollection(end));
|
||||
receiver.join();
|
||||
puller.Disconnect();
|
||||
}
|
||||
|
||||
TEST_CASE("TCPImageCommTest_DisconnectMidWrite_NoHang", "[TCP]") {
|
||||
const size_t nframes = 256;
|
||||
const int64_t images_per_file = 16;
|
||||
|
||||
DiffractionExperiment x(DetJF(1));
|
||||
x.Raw();
|
||||
x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).IncidentEnergy_keV(12.4)
|
||||
.ImagesPerTrigger(nframes).Compression(CompressionAlgorithm::NO_COMPRESSION);
|
||||
|
||||
std::vector<uint16_t> image1(x.GetPixelsNum() * nframes, 11u);
|
||||
|
||||
TCPStreamPusher pusher("tcp://127.0.0.1:*", 1);
|
||||
TCPImagePuller puller(pusher.GetAddress(), 64 * 1024 * 1024);
|
||||
|
||||
std::thread receiver([&] {
|
||||
bool disconnected = false;
|
||||
while (!disconnected) {
|
||||
auto out = puller.PollImage(std::chrono::seconds(10));
|
||||
REQUIRE(out.has_value());
|
||||
REQUIRE(out->cbor != nullptr);
|
||||
REQUIRE(out->tcp_msg != nullptr);
|
||||
|
||||
const auto &h = out->tcp_msg->header;
|
||||
if (out->cbor->start_message) {
|
||||
PullerAckMessage ack{.ack_for = TCPFrameType::START, .ok = true, .run_number = h.run_number,
|
||||
.socket_number = h.socket_number, .error_code = TCPAckCode::None};
|
||||
REQUIRE(puller.SendAck(ack));
|
||||
} else if (out->cbor->data_message) {
|
||||
puller.Disconnect(); // simulate puller disappearing mid-stream
|
||||
disconnected = true;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
auto sender = std::async(std::launch::async, [&] {
|
||||
std::vector<uint8_t> serialization_buffer(16 * 1024 * 1024);
|
||||
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
|
||||
|
||||
StartMessage start{.images_per_file = images_per_file, .write_master_file = true};
|
||||
EndMessage end{};
|
||||
|
||||
pusher.StartDataCollection(start);
|
||||
|
||||
for (int64_t i = 0; i < static_cast<int64_t>(nframes); i++) {
|
||||
DataMessage data_message;
|
||||
data_message.number = i;
|
||||
data_message.image = CompressedImage(image1.data() + i * x.GetPixelsNum(),
|
||||
x.GetPixelsNum() * sizeof(uint16_t),
|
||||
x.GetXPixelsNum(), x.GetYPixelsNum(),
|
||||
x.GetImageMode(), x.GetCompressionAlgorithm());
|
||||
serializer.SerializeImage(data_message);
|
||||
(void)pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i);
|
||||
}
|
||||
|
||||
return pusher.EndDataCollection(end);
|
||||
});
|
||||
|
||||
REQUIRE(sender.wait_for(std::chrono::seconds(20)) == std::future_status::ready);
|
||||
CHECK(sender.get() == false);
|
||||
|
||||
receiver.join();
|
||||
}
|
||||
Reference in New Issue
Block a user