From 94072d362698ba8c6b8f3e4bf64f198dc2c0811f Mon Sep 17 00:00:00 2001 From: Filip Leonarski Date: Wed, 4 Mar 2026 13:37:52 +0100 Subject: [PATCH] jfjoch_test: Add TCP/IP integration test --- tests/JFJochReceiverProcessingTest.cpp | 81 ++++++++++++++++++++++++++ writer/StreamWriter.cpp | 17 ++++-- 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/tests/JFJochReceiverProcessingTest.cpp b/tests/JFJochReceiverProcessingTest.cpp index 79b80591..03427553 100644 --- a/tests/JFJochReceiverProcessingTest.cpp +++ b/tests/JFJochReceiverProcessingTest.cpp @@ -14,6 +14,8 @@ #include "../writer/StreamWriter.h" #include "../image_pusher/NonePusher.h" #include "../image_pusher/HDF5FilePusher.h" +#include "../image_pusher/TCPStreamPusher.h" +#include "../image_puller/TCPImagePuller.h" TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver]") { Logger logger(Catch::getResultCapture().getCurrentTestName()); @@ -1467,3 +1469,82 @@ TEST_CASE("JFJochIntegrationTest_HDF5FilePusher_Raw", "[JFJochReceiver]") { CHECK(receiver_out.status.images_sent == 5); CHECK(!receiver_out.status.cancelled); } + +TEST_CASE("JFJochIntegrationTest_TCP_lysozyme_spot_and_index", "[JFJochReceiver]") { + Logger logger(Catch::getResultCapture().getCurrentTestName()); + + RegisterHDF5Filter(); + + const uint16_t nthreads = 4; + + DiffractionExperiment experiment(DetJF4M()); + experiment.ImagesPerTrigger(5).NumTriggers(1).UseInternalPacketGenerator(true).ImagesPerFile(2) + .FilePrefix("lyso_test_tcp").JungfrauConvPhotonCnt(false).SetFileWriterFormat(FileWriterFormat::NXmxVDS).OverwriteExistingFiles(true) + .DetectorDistance_mm(75).BeamY_pxl(1136).BeamX_pxl(1090).IncidentEnergy_keV(12.4) + .SetUnitCell(UnitCell{.a = 36.9, .b = 78.95, .c = 78.95, .alpha =90, .beta = 90, .gamma = 90}); + experiment.SampleTemperature_K(123.0).RingCurrent_mA(115); + + PixelMask pixel_mask(experiment); + + // Load example image + HDF5ReadOnlyFile data("../../tests/test_data/compression_benchmark.h5"); + HDF5DataSet dataset(data, "/entry/data/data"); + HDF5DataSpace file_space(dataset); + + REQUIRE(file_space.GetDimensions()[2] == experiment.GetXPixelsNum()); + REQUIRE(file_space.GetDimensions()[1] == experiment.GetYPixelsNum()); + std::vector image_conv (file_space.GetDimensions()[1] * file_space.GetDimensions()[2]); + + std::vector start = {4,0,0}; + std::vector file_size = {1, file_space.GetDimensions()[1], file_space.GetDimensions()[2]}; + dataset.ReadVector(image_conv, start, file_size); + + std::vector image_raw_geom(experiment.GetModulesNum() * RAW_MODULE_SIZE); + ConvertedToRawGeometry(experiment, image_raw_geom.data(), image_conv.data()); + logger.Info("Loaded image"); + + // Setup acquisition device + AcquisitionDeviceGroup aq_devices; + std::unique_ptr test = std::make_unique(0, 64); + for (int m = 0; m < experiment.GetModulesNum(); m++) + test->SetInternalGeneratorFrame((uint16_t *) image_raw_geom.data() + m * RAW_MODULE_SIZE, m); + + aq_devices.Add(std::move(test)); + + TCPStreamPusher pusher({"tcp://127.0.0.1:9121"}); + + TCPImagePuller puller("tcp://127.0.0.1:9121"); + StreamWriter writer(logger, puller); + auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer); + + JFJochReceiverService service(aq_devices, logger, pusher); + service.NumThreads(nthreads); + service.Indexing(experiment.GetIndexingSettings()); + + // No progress value at the start of measurement + REQUIRE(!service.GetProgress().has_value()); + + SpotFindingSettings settings = DiffractionExperiment::DefaultDataProcessingSettings(); + settings.signal_to_noise_threshold = 2.5; + settings.photon_count_threshold = 5; + settings.min_pix_per_spot = 1; + settings.max_pix_per_spot = 200; + settings.high_resolution_limit = 2.0; + settings.low_resolution_limit = 50.0; + service.SetSpotFindingSettings(settings); + + service.Start(experiment, pixel_mask, nullptr); + auto receiver_out = service.Stop(); + + CHECK(receiver_out.efficiency == 1.0); + REQUIRE(receiver_out.status.indexing_rate); + CHECK(receiver_out.status.indexing_rate.value() == 1.0); + CHECK(receiver_out.status.images_sent == experiment.GetImageNum()); + CHECK(receiver_out.writer_err.empty()); + CHECK(!receiver_out.status.cancelled); + + // No progress value at the end of measurement + REQUIRE(!service.GetProgress().has_value()); + + REQUIRE_NOTHROW(writer_future.get()); +} diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index 372b6abd..ff7e7440 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -91,6 +91,7 @@ void StreamWriter::ProcessCalibrationImage() { } catch (const std::exception &e) { logger.Warning(e.what()); logger.Warning("Error during writing calibration data - skipping"); + NotifyTcpAck(TCPFrameType::CALIBRATION, false, false, TCPAckCode::DataWriteFailed, e.what()); } break; case StreamWriterState::Receiving: @@ -139,6 +140,7 @@ void StreamWriter::ProcessDataImage() { } break; case StreamWriterState::Error: + // Error state => Wait till end only case StreamWriterState::Finalized: break; } @@ -164,7 +166,16 @@ void StreamWriter::ProcessEndMessage() { err = e.what(); } } + bool error_state = (state == StreamWriterState::Error); + FinalizeDataCollection(); + + // Notifications happen only when handling END message + // No end message ==> no need to ACK + NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr); + NotifyTcpAck(TCPFrameType::END, !error_state, error_state, + error_state ? TCPAckCode::EndFailed : TCPAckCode::None, + error_state ? err : ""); } void StreamWriter::FinalizeDataCollection() { @@ -183,12 +194,6 @@ void StreamWriter::FinalizeDataCollection() { hdf5_data_file_statistics.clear(); } file_writer.reset(); - NotifyReceiverOnFinalizedWrite(writer_notification_zmq_addr); - NotifyTcpAck(TCPFrameType::END, - state != StreamWriterState::Error, - state == StreamWriterState::Error, - state == StreamWriterState::Error ? TCPAckCode::EndFailed : TCPAckCode::None, - state == StreamWriterState::Error ? err : ""); logger.Info("Data writing finished"); state = StreamWriterState::Finalized; }