diff --git a/VERSION b/VERSION index 9c218192..fedbd0b1 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.0-rc.1 +1.0.0-rc.2 diff --git a/broker/jfjoch_broker.cpp b/broker/jfjoch_broker.cpp index a286edc8..ebaca167 100644 --- a/broker/jfjoch_broker.cpp +++ b/broker/jfjoch_broker.cpp @@ -2,7 +2,7 @@ // Using OpenAPI licensed with Apache License 2.0 #include -#include +#include #include #include @@ -12,7 +12,7 @@ #include "JFJochBrokerHttp.h" #include "JFJochBrokerParser.h" -#include "../frame_serialize/ZMQStream2PusherGroup.h" +#include "../frame_serialize/ZMQStream2Pusher.h" #include "../frame_serialize/DumpCBORToFilePusher.h" static Pistache::Http::Endpoint *httpEndpoint; @@ -73,8 +73,6 @@ int main (int argc, char **argv) { std::unique_ptr receiver; std::unique_ptr image_pusher; - ZMQContext context; - DiffractionExperiment experiment; experiment.MaskChipEdges(true).MaskModuleEdges(true); @@ -88,9 +86,15 @@ int main (int argc, char **argv) { int32_t zmq_send_watermark = ParseInt32(input, "zmq_send_watermark", 100); int32_t zmq_send_buffer_size = ParseInt32(input, "zmq_send_buffer_size", -1); - image_pusher = std::make_unique(ParseStringArray(input, "zmq_image_addr"), + auto tmp = std::make_unique(ParseStringArray(input, "zmq_image_addr"), zmq_send_watermark, zmq_send_buffer_size); + + std::string preview_addr = ParseString(input, "zmq_preview_addr", ""); + if (!preview_addr.empty()) + tmp->PreviewSocket(preview_addr); + + image_pusher = std::move(tmp); } else if (pusher_type == "dump_cbor") { image_pusher = std::make_unique(); } else diff --git a/common/ZMQWrappers.cpp b/common/ZMQWrappers.cpp index 4b4ebf76..c3054e57 100644 --- a/common/ZMQWrappers.cpp +++ b/common/ZMQWrappers.cpp @@ -8,8 +8,7 @@ ZMQContext::ZMQContext() { // Default is to have 2 I/O threads per ZMQ context if (zmq_ctx_set(context, ZMQ_IO_THREADS, 2) != 0) - throw JFJochException(JFJochExceptionCategory::ZeroMQ, - "Cannot set number of I/O threads"); + throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Cannot set number of I/O threads"); } ZMQContext &ZMQContext::NumThreads(int32_t threads) { @@ -27,7 +26,7 @@ void *ZMQContext::GetContext() const { return context; } -ZMQSocket::ZMQSocket(ZMQContext &context, ZMQSocketType in_socket_type) : socket_type(in_socket_type) { +ZMQSocket::ZMQSocket(ZMQSocketType in_socket_type) : socket_type(in_socket_type) { socket = zmq_socket(context.GetContext(), static_cast(socket_type)); if (socket == nullptr) diff --git a/common/ZMQWrappers.h b/common/ZMQWrappers.h index e9270734..032d9ada 100644 --- a/common/ZMQWrappers.h +++ b/common/ZMQWrappers.h @@ -42,14 +42,15 @@ public: class ZMQSocket { std::mutex m; + ZMQContext context; ZMQSocketType socket_type; void *socket; void SetSocketOption(int32_t option_name, int32_t value); public: ZMQSocket(ZMQSocket &socket) = delete; const ZMQSocket& operator=(ZMQSocket &socket) = delete; - ZMQSocket(ZMQContext &context, ZMQSocketType socket_type); - ~ZMQSocket(); + explicit ZMQSocket(ZMQSocketType socket_type); + ~ZMQSocket(); void Connect(const std::string& addr); void Disconnect(const std::string& addr); void Bind(const std::string& addr); diff --git a/frame_serialize/CMakeLists.txt b/frame_serialize/CMakeLists.txt index 5af4b5d3..0493a5ea 100644 --- a/frame_serialize/CMakeLists.txt +++ b/frame_serialize/CMakeLists.txt @@ -20,9 +20,7 @@ TARGET_LINK_LIBRARIES(CBORStream2FrameSerialize tinycbor) ADD_LIBRARY(ImagePusher STATIC ImagePusher.cpp ImagePusher.h TestImagePusher.cpp TestImagePusher.h - ZMQStream2PusherGroup.cpp ZMQStream2PusherGroup.h - ZMQStream2Pusher.cpp - ZMQStream2Pusher.h + ZMQStream2Pusher.cpp ZMQStream2Pusher.h DumpCBORToFilePusher.cpp DumpCBORToFilePusher.h) diff --git a/frame_serialize/ZMQStream2Pusher.cpp b/frame_serialize/ZMQStream2Pusher.cpp index 18d51153..e9464940 100644 --- a/frame_serialize/ZMQStream2Pusher.cpp +++ b/frame_serialize/ZMQStream2Pusher.cpp @@ -1,66 +1,117 @@ // Copyright (2019-2024) Paul Scherrer Institute #include "ZMQStream2Pusher.h" +#include "CBORStream2Serializer.h" -ZMQStream2Pusher::ZMQStream2Pusher(ZMQContext &context, const std::string &addr, int32_t send_buffer_high_watermark, - int32_t send_buffer_size) - : socket(context, ZMQSocketType::Push) { - Bind(addr, send_buffer_high_watermark, send_buffer_size); -} +ZMQStream2Pusher::ZMQStream2Pusher(const std::vector &addr, + int32_t send_buffer_high_watermark, int32_t send_buffer_size) + : serialization_buffer(256*1024*1024), + serializer(serialization_buffer.data(), serialization_buffer.size()), + preview_counter(std::chrono::seconds(1)) { + if (addr.empty()) + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writer ZMQ address provided"); -ZMQStream2Pusher::ZMQStream2Pusher(const std::string &addr, int32_t send_buffer_high_watermark, - int32_t send_buffer_size) - : context(std::make_unique()), - socket(*context, ZMQSocketType::Push) { - Bind(addr, send_buffer_high_watermark, send_buffer_size); -} - -void ZMQStream2Pusher::Bind(const std::string &addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size) { - - if (send_buffer_size > 0) - socket.SendBufferSize(send_buffer_size); - if (send_buffer_high_watermark > 0) - socket.SendWaterMark(send_buffer_high_watermark); - socket.SendTimeout(std::chrono::seconds(5)); // 5 seconds should be more than enough to flush buffers and to still give fast response - socket.Bind(addr); -} - -void ZMQStream2Pusher::StartDataCollection(StartMessage &message) { - size_t approx_size = 1024*1024; - for (const auto &x : message.pixel_mask) - approx_size += x.size; - - std::vector serialization_buffer(approx_size); - CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); - serializer.SerializeSequenceStart(message); - if (!socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true)) - throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Timeout on pushing start message on addr " + GetAddress()); + for (const auto &a : addr) { + auto s = std::make_unique(ZMQSocketType::Push); + if (send_buffer_size > 0) + s->SendBufferSize(send_buffer_size); + if (send_buffer_high_watermark > 0) + s->SendWaterMark(send_buffer_high_watermark); + s->SendTimeout(std::chrono::seconds(5)); // 5 seconds should be more than enough to flush buffers and to still give fast response + s->Bind(a); + socket.emplace_back(std::move(s)); + } } bool ZMQStream2Pusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { - return socket.Send(image_data, image_size, false); + if (preview_socket) { + if (preview_counter.GeneratePreview()) + preview_socket->Send(image_data, image_size, false); + } + + if (!socket.empty()) { + auto socket_number = (image_number / images_per_file) % socket.size(); + return socket[socket_number]->Send(image_data, image_size, false); + } else + return false; } void ZMQStream2Pusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - ZeroCopyReturnValue *z) { - socket.SendZeroCopy(image_data,image_size, z); + ZeroCopyReturnValue *z) { + if (preview_socket) { + if (preview_counter.GeneratePreview()) + preview_socket->Send(image_data, image_size, false); + } + + if (!socket.empty()) { + auto socket_number = (image_number / images_per_file) % socket.size(); + socket[socket_number]->SendZeroCopy(image_data, image_size, z); + } else + z->release(); } -bool ZMQStream2Pusher::EndDataCollection(const EndMessage &message) { - std::vector serialization_buffer(80 * 1024 * 1024); - CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); +void ZMQStream2Pusher::StartDataCollection(StartMessage& message) { + if (message.images_per_file < 1) + throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, + "Images per file cannot be zero or negative"); + images_per_file = message.images_per_file; - serializer.SerializeSequenceEnd(message); - return socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true); // Blocking + serializer.SerializeSequenceStart(message); + + for (auto &s: socket) { + if (!s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true)) + throw JFJochException(JFJochExceptionCategory::ZeroMQ, "Timeout on pushing start message on addr " + + s->GetEndpointName()); + if (message.write_master_file) { + message.write_master_file = false; + serializer.SerializeSequenceStart(message); + } + } + + if (preview_socket) + preview_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true); } bool ZMQStream2Pusher::SendCalibration(const CompressedImage &message) { - std::vector serialization_buffer(80 * 1024 * 1024); - CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + if (socket.empty()) + return false; + serializer.SerializeCalibration(message); - return socket.Send(serialization_buffer.data(), serializer.GetBufferSize(), true); // Blocking + + return socket[0]->Send(serialization_buffer.data(), serializer.GetBufferSize(), true); } -std::string ZMQStream2Pusher::GetAddress() { - return socket.GetEndpointName(); +bool ZMQStream2Pusher::EndDataCollection(const EndMessage& message) { + serializer.SerializeSequenceEnd(message); + + bool ret = true; + for (auto &s: socket) { + if (!s->Send(serialization_buffer.data(), serializer.GetBufferSize(), true)) + ret = false; + } + + if (preview_socket) + preview_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true); + + return ret; +} + +std::vector ZMQStream2Pusher::GetAddress() { + std::vector ret; + for (auto &p: socket) + ret.push_back(p->GetEndpointName()); + return ret; +} + +ZMQStream2Pusher &ZMQStream2Pusher::PreviewSocket(const std::string &addr) { + preview_socket = std::make_unique(ZMQSocketType::Pub); + preview_socket->Bind(addr); + return *this; +} + +std::string ZMQStream2Pusher::GetPreviewAddress() { + if (preview_socket) + return preview_socket->GetEndpointName(); + else + return ""; } diff --git a/frame_serialize/ZMQStream2Pusher.h b/frame_serialize/ZMQStream2Pusher.h index fe997449..bb7a622e 100644 --- a/frame_serialize/ZMQStream2Pusher.h +++ b/frame_serialize/ZMQStream2Pusher.h @@ -3,30 +3,40 @@ #ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H #define JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H +#include + #include "ImagePusher.h" #include "../common/ZMQWrappers.h" +#include "../preview/PreviewCounter.h" class ZMQStream2Pusher : public ImagePusher { - std::unique_ptr context; - ZMQSocket socket; -public: - ZMQStream2Pusher(ZMQContext& context, - const std::string& addr, - int32_t send_buffer_high_watermark = -1, - int32_t send_buffer_size = -1); + std::vector serialization_buffer; + CBORStream2Serializer serializer; - explicit ZMQStream2Pusher(const std::string& addr, + std::vector> socket; + + std::unique_ptr preview_socket; + PreviewCounter preview_counter; + + int64_t images_per_file = 1; +public: + explicit ZMQStream2Pusher(const std::vector& addr, int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); - void Bind(const std::string& addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size); + + ZMQStream2Pusher& PreviewSocket(const std::string& addr); + std::string GetPreviewAddress(); + + std::vector GetAddress(); + + // Strictly serial, as order of these is important void StartDataCollection(StartMessage& message) override; - bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; - void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override; - bool EndDataCollection(const EndMessage &message) override; + bool EndDataCollection(const EndMessage& message) override; bool SendCalibration(const CompressedImage& message) override; - std::string GetAddress(); + // Thread-safe + void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override; + bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; }; - #endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHER_H diff --git a/frame_serialize/ZMQStream2PusherGroup.cpp b/frame_serialize/ZMQStream2PusherGroup.cpp deleted file mode 100644 index b5d1dc04..00000000 --- a/frame_serialize/ZMQStream2PusherGroup.cpp +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright (2019-2024) Paul Scherrer Institute - -#include "ZMQStream2PusherGroup.h" -#include "CBORStream2Serializer.h" - -ZMQStream2PusherGroup::ZMQStream2PusherGroup(ZMQContext &zmq_context, const std::vector &addr, - int32_t send_buffer_high_watermark, int32_t send_buffer_size) { - if (addr.empty()) - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "No writer ZMQ address provided"); - - for (const auto &a : addr) - pusher.emplace_back(std::make_unique - (zmq_context, a, send_buffer_high_watermark, send_buffer_size)); -} - -ZMQStream2PusherGroup::ZMQStream2PusherGroup(const std::vector &addr, - int32_t send_buffer_high_watermark, int32_t send_buffer_size) { - if (addr.empty()) - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "No writer ZMQ address provided"); - for (const auto &a : addr) - pusher.emplace_back(std::make_unique - (a, send_buffer_high_watermark, send_buffer_size)); -} - -bool ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { - if (!pusher.empty()) { - auto socket_number = (image_number / images_per_file) % pusher.size(); - return pusher[socket_number]->SendImage(image_data, image_size, image_number); - } else - return false; -} - -void ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, - ZeroCopyReturnValue *z) { - if (!pusher.empty()) { - auto socket_number = (image_number / images_per_file) % pusher.size(); - pusher[socket_number]->SendImage(image_data, image_size, image_number, z); - } -} - -void ZMQStream2PusherGroup::StartDataCollection(StartMessage& message) { - if (message.images_per_file < 1) - throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, - "Images per file cannot be zero or negative"); - images_per_file = message.images_per_file; - - for (auto &p: pusher) { - p->StartDataCollection(message); - message.write_master_file = false; - } -} - -bool ZMQStream2PusherGroup::SendCalibration(const CompressedImage &message) { - if (pusher.empty()) - return false; - return pusher[0]->SendCalibration(message); -} - -bool ZMQStream2PusherGroup::EndDataCollection(const EndMessage& message) { - bool ret = true; - for (auto &p: pusher) { - if (!p->EndDataCollection(message)) - ret = false; - } - return ret; -} - -std::vector ZMQStream2PusherGroup::GetAddress() { - std::vector ret; - for (auto &p: pusher) - ret.push_back(p->GetAddress()); - return ret; -} diff --git a/frame_serialize/ZMQStream2PusherGroup.h b/frame_serialize/ZMQStream2PusherGroup.h deleted file mode 100644 index ab3c31bf..00000000 --- a/frame_serialize/ZMQStream2PusherGroup.h +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (2019-2024) Paul Scherrer Institute - -#ifndef JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H -#define JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H - -#include "ImagePusher.h" -#include "ZMQStream2Pusher.h" -#include "../common/ZMQWrappers.h" - -class ZMQStream2PusherGroup : public ImagePusher { - std::vector> pusher; - int64_t images_per_file = 1; -public: - ZMQStream2PusherGroup(ZMQContext &context, const std::vector& addr, - int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); - // High performance implementation, where each socket has dedicated ZMQ context - explicit ZMQStream2PusherGroup(const std::vector& addr, - int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); - - void StartDataCollection(StartMessage& message) override; - void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) override; - bool SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; - bool EndDataCollection(const EndMessage& message) override; - bool SendCalibration(const CompressedImage& message) override; - - std::vector GetAddress(); -}; - -#endif //JUNGFRAUJOCH_ZMQSTREAM2PUSHERGROUP_H diff --git a/frontend_ui/src/components/PreviewImage.tsx b/frontend_ui/src/components/PreviewImage.tsx index 19c688e3..51b60b1c 100644 --- a/frontend_ui/src/components/PreviewImage.tsx +++ b/frontend_ui/src/components/PreviewImage.tsx @@ -28,7 +28,7 @@ class PreviewImage extends Component { show_indexed: false, resolution_ring: 0.5 }, - s_url: "", + s_url: null, update: true, connection_error: true } @@ -124,9 +124,12 @@ class PreviewImage extends Component { clearInterval(this.interval); } - preview() { - return

- + render() { + return +
+   Preview image @@ -140,13 +143,13 @@ class PreviewImage extends Component { Show only indexed images    - +
Saturation value
    - + {

- { - this.state.s_url !== null ? + {(!this.state.connection_error && (this.state.s_url !== null)) ? { > - Live preview + Live preview - :
+ :
Preview not available
}
-
- } - - render() { - return - {(!this.state.connection_error && (this.state.s_url !== null)) ? this.preview() : "Preview not available"} } } diff --git a/image_analysis/MXAnalyzer.cpp b/image_analysis/MXAnalyzer.cpp index c69e5ff7..b40eb47b 100644 --- a/image_analysis/MXAnalyzer.cpp +++ b/image_analysis/MXAnalyzer.cpp @@ -27,8 +27,12 @@ MXAnalyzer::MXAnalyzer(const DiffractionExperiment &in_experiment) : experiment(in_experiment) { auto uc = experiment.GetUnitCell(); if (uc) { - do_indexing = true; - indexer.Setup(uc.value()); + try { + indexer = std::make_unique(); + indexer->Setup(uc.value()); + } catch (const std::exception &e) { + throw JFJochException(JFJochExceptionCategory::GPUCUDAError, e.what()); + } } if (experiment.IsSpotFindingEnabled()) find_spots = true; @@ -73,13 +77,13 @@ void MXAnalyzer::Process(DataMessage &message, const SpotFindingSettings& settin for (const auto &spot: spots_out) message.spots.push_back(spot); - if (do_indexing && settings.indexing) { + if (indexer && settings.indexing) { std::vector recip; recip.reserve(spots_out.size()); for (const auto &i: spots_out) recip.push_back(i.ReciprocalCoord(experiment)); - auto indexer_result = indexer.Run(recip, settings.indexing_tolerance); + auto indexer_result = indexer->Run(recip, settings.indexing_tolerance); if (!indexer_result.empty()) { message.indexing_result = true; diff --git a/image_analysis/MXAnalyzer.h b/image_analysis/MXAnalyzer.h index 3a8f677e..b01cea60 100644 --- a/image_analysis/MXAnalyzer.h +++ b/image_analysis/MXAnalyzer.h @@ -9,8 +9,7 @@ class MXAnalyzer { const DiffractionExperiment &experiment; - IndexerWrapper indexer; - bool do_indexing = false; + std::unique_ptr indexer; bool find_spots = false; std::vector spots; constexpr static const float spot_distance_threshold_pxl = 2.0f; diff --git a/preview/PreviewImage.cpp b/preview/PreviewImage.cpp index 12c67ddb..bfd40dba 100644 --- a/preview/PreviewImage.cpp +++ b/preview/PreviewImage.cpp @@ -33,6 +33,7 @@ constexpr const static rgb gray = {.r = 0xbe, .g = 0xbe, .b = 0xbe}; PreviewImage::PreviewImage(const DiffractionExperiment &in_experiment) : experiment(in_experiment), + initialized(false), xpixel(experiment.GetXPixelsNum()), ypixel(experiment.GetYPixelsNum()), beam_x(experiment.GetBeamX_pxl()), @@ -47,6 +48,7 @@ void PreviewImage::UpdateImage(const void *in_uncompressed_image, const std::vector &in_spots) { if (counter.GeneratePreview()) { std::unique_lock ul(m); + initialized = true; memcpy(uncompressed_image.data(), in_uncompressed_image, xpixel * ypixel * pixel_depth_bytes); spots = in_spots; } @@ -109,6 +111,10 @@ std::string PreviewImage::GenerateJPEG(const PreviewJPEGSettings &settings) cons { // JPEG compression is outside the critical loop protected by m std::unique_lock ul(m); + + if (!initialized) + return {}; + if (!pixel_is_signed) { if (pixel_depth_bytes == 2) v = GenerateRGB((uint16_t *) uncompressed_image.data(), xpixel * ypixel, @@ -141,6 +147,9 @@ std::string PreviewImage::GenerateJPEG(const PreviewJPEGSettings &settings) cons std::string PreviewImage::GenerateTIFF() const { std::unique_lock ul(m); + if (!initialized) + return {}; + std::string s = WriteTIFFToString(const_cast(uncompressed_image.data()), xpixel, ypixel, pixel_depth_bytes, pixel_is_signed); return s; @@ -167,6 +176,8 @@ std::vector GenerateDioptasPreview(const void* input, size_t xpixel, s std::string PreviewImage::GenerateTIFFDioptas() const { std::unique_lock ul(m); + if (!initialized) + return {}; std::vector vec; if (pixel_is_signed) { diff --git a/preview/PreviewImage.h b/preview/PreviewImage.h index 13e2c9df..e84e8ed9 100644 --- a/preview/PreviewImage.h +++ b/preview/PreviewImage.h @@ -29,6 +29,7 @@ class PreviewImage { mutable std::mutex m; DiffractionExperiment experiment; + bool initialized; const ROIMap roi_map; std::vector uncompressed_image; std::vector spots; diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 3ca04a8d..e6ee8459 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -257,18 +257,20 @@ void JFJochReceiver::RetrievePedestal() { void JFJochReceiver::FrameTransformationThread() { + std::unique_ptr analyzer; + try { numa_policy.Bind(); + analyzer = std::make_unique(experiment); } catch (const JFJochException &e) { frame_transformation_ready.count_down(); - logger.Error("HW bind error {}", e.what()); + logger.Error("Thread setup error {}", e.what()); Cancel(e); + return; } FrameTransformation transformation(experiment); - MXAnalyzer analyzer(experiment); - frame_transformation_ready.count_down(); uint16_t az_int_min_bin = std::floor(az_int_mapping.QToBin(experiment.GetLowQForBkgEstimate_recipA())); @@ -313,7 +315,7 @@ void JFJochReceiver::FrameTransformationThread() { adu_histogram_module[module_abs_number].Add(*output); az_int_profile_image.Add(*output); - analyzer.ReadFromFPGA(output, local_spot_finding_settings, module_abs_number); + analyzer->ReadFromFPGA(output, local_spot_finding_settings, module_abs_number); transformation.ProcessModule(output, d); } else @@ -332,7 +334,7 @@ void JFJochReceiver::FrameTransformationThread() { continue; } - analyzer.Process(message, local_spot_finding_settings); + analyzer->Process(message, local_spot_finding_settings); message.receiver_free_send_buf = send_buf_ctrl.GetAvailBufLocations(); message.az_int_profile = az_int_profile_image.GetResult(); diff --git a/receiver/JFJochReceiverTest.cpp b/receiver/JFJochReceiverTest.cpp index 3084322c..f3c6256e 100644 --- a/receiver/JFJochReceiverTest.cpp +++ b/receiver/JFJochReceiverTest.cpp @@ -2,7 +2,7 @@ #include "JFJochReceiverTest.h" #include "JFJochReceiverService.h" -#include "../frame_serialize/ZMQStream2PusherGroup.h" +#include "../frame_serialize/ZMQStream2Pusher.h" #include "../frame_serialize/TestImagePusher.h" #define STORAGE_CELL_FOR_TEST 11 diff --git a/tests/HDF5WritingTest.cpp b/tests/HDF5WritingTest.cpp index 2edc657c..3baf54bf 100644 --- a/tests/HDF5WritingTest.cpp +++ b/tests/HDF5WritingTest.cpp @@ -221,8 +221,6 @@ TEST_CASE("HDF5Writer", "[HDF5][Full]") { TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") { { - ZMQContext c; - RegisterHDF5Filter(); DiffractionExperiment x(DetectorGeometry(8, 2, 8, 36)); std::vector spots; @@ -233,10 +231,10 @@ TEST_CASE("HDF5Writer_Socket", "[HDF5][Full]") { x.FillMessage(start_message); HDF5Writer file_set(start_message); - file_set.SetupSocket(c, "ipc://#1"); + file_set.SetupSocket("ipc://#1"); std::vector image(x.GetPixelsNum()); - ZMQSocket s(c, ZMQSocketType::Sub); + ZMQSocket s(ZMQSocketType::Sub); s.Connect("ipc://#1"); s.SubscribeAll(); s.ReceiveTimeout(std::chrono::seconds(5)); diff --git a/tests/JFJochReceiverProcessingTest.cpp b/tests/JFJochReceiverProcessingTest.cpp index 5d27db14..778e39e4 100644 --- a/tests/JFJochReceiverProcessingTest.cpp +++ b/tests/JFJochReceiverProcessingTest.cpp @@ -52,13 +52,10 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index", "[JFJochReceiver] aq_devices.Add(std::move(test)); - ZMQContext context; - ZMQStream2Pusher pusher(context, "ipc://*"); - StreamWriter writer(context, logger, pusher.GetAddress()); + ZMQStream2Pusher pusher({"ipc://*"}); + StreamWriter writer(logger, pusher.GetAddress()[0]); auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer); - context.NumThreads(4); - JFJochReceiverService service(aq_devices, logger, pusher); service.NumThreads(nthreads); @@ -127,11 +124,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_lysozyme_spot_and_index_min_pix_2", "[JFJoc test->SetInternalGeneratorFrame((uint16_t *) image_raw_geom.data() + m * RAW_MODULE_SIZE, m); aq_devices.Add(std::move(test)); - ZMQContext context; - context.NumThreads(4); - - ZMQStream2Pusher pusher(context, "ipc://*"); - StreamWriter writer(context, logger, pusher.GetAddress()); + ZMQStream2Pusher pusher({"ipc://*"}); + StreamWriter writer(logger, pusher.GetAddress()[0]); auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer); JFJochReceiverService service(aq_devices, logger, pusher); @@ -241,11 +235,8 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_ROI", "[JFJochReceiver]") { aq_devices.Add(std::move(test)); - ZMQContext context; - - context.NumThreads(4); - ZMQStream2Pusher pusher(context, "ipc://*"); - StreamWriter writer(context, logger, pusher.GetAddress()); + ZMQStream2Pusher pusher({"ipc://*"}); + StreamWriter writer(logger, pusher.GetAddress()[0]); auto writer_future = std::async(std::launch::async, &StreamWriter::Run, &writer); JFJochReceiverService service(aq_devices, logger, pusher); @@ -269,6 +260,7 @@ TEST_CASE("JFJochIntegrationTest_ZMQ_ROI", "[JFJochReceiver]") { REQUIRE(plot.size() == 2); CHECK(plot[0].title == "roi0"); + REQUIRE(!plot[0].x.empty()); CHECK(plot[0].x[0] == 0); CHECK(plot[0].y[0] == roi_value); CHECK(plot[1].title == "roi1"); diff --git a/tests/JPEGTest.cpp b/tests/JPEGTest.cpp index cd2b9087..9eea75f0 100644 --- a/tests/JPEGTest.cpp +++ b/tests/JPEGTest.cpp @@ -58,7 +58,6 @@ TEST_CASE("PreviewImage_GenerateJPEG","[JPEG]") { {.x = 1200, .y = 500, .indexed = true} }; PreviewImage image(experiment); - image.UpdateImage(image_conv_2.data(), spots); PreviewJPEGSettings preview_settings{ .saturation_value = 5, @@ -66,6 +65,11 @@ TEST_CASE("PreviewImage_GenerateJPEG","[JPEG]") { .show_spots = true }; + REQUIRE(image.GenerateJPEG(preview_settings).empty()); + + image.UpdateImage(image_conv_2.data(), spots); + + std::string s; REQUIRE_NOTHROW(s = image.GenerateJPEG(preview_settings)); std::ofstream f("lyso_diff.jpeg", std::ios::binary); diff --git a/tests/StreamWriterTest.cpp b/tests/StreamWriterTest.cpp index 1dd70be1..23743d2f 100644 --- a/tests/StreamWriterTest.cpp +++ b/tests/StreamWriterTest.cpp @@ -4,15 +4,13 @@ #include #include "../writer/StreamWriter.h" -#include "../frame_serialize/ZMQStream2PusherGroup.h" +#include "../frame_serialize/ZMQStream2Pusher.h" #include "../receiver/JFJochReceiverService.h" TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { RegisterHDF5Filter(); Logger logger("test"); - ZMQContext context; - std::string zmq_addr = "ipc://*"; DiffractionExperiment x(DetectorGeometry(2)); x.FilePrefix("subdir/JFJochWriterTest").NumTriggers(1).ImagesPerTrigger(5) @@ -23,7 +21,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { for (int i = 0; i < x.GetDataStreamsNum(); i++) aq_devices.AddHLSDevice(64); - ZMQStream2PusherGroup pusher (context, {zmq_addr}); + ZMQStream2Pusher pusher ({"ipc://*"}); JFJochReceiverService fpga_receiver_service(aq_devices, logger, pusher); JFJochReceiverOutput receiver_output; @@ -32,7 +30,7 @@ TEST_CASE("StreamWriterTest_ZMQ","[JFJochWriter]") { REQUIRE(x.GetImageNum() == 5); auto pusher_addr = pusher.GetAddress(); REQUIRE(pusher_addr.size() == 1); - REQUIRE_NOTHROW(writer = std::make_unique(context, logger, pusher_addr[0])); + REQUIRE_NOTHROW(writer = std::make_unique(logger, pusher_addr[0])); CHECK (writer->GetStatistics().state == StreamWriterState::Idle); REQUIRE_NOTHROW(fpga_receiver_service.Start(x, nullptr)); diff --git a/tests/ZMQImagePusherTest.cpp b/tests/ZMQImagePusherTest.cpp index 7139891c..880d86bf 100644 --- a/tests/ZMQImagePusherTest.cpp +++ b/tests/ZMQImagePusherTest.cpp @@ -3,7 +3,7 @@ #include #include #include "../writer/ZMQImagePuller.h" -#include "../frame_serialize/ZMQStream2PusherGroup.h" +#include "../frame_serialize/ZMQStream2Pusher.h" void test_puller(ZMQImagePuller *puller, const DiffractionExperiment& x, @@ -53,7 +53,6 @@ void test_puller(ZMQImagePuller *puller, TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") { const size_t nframes = 256; - ZMQContext context; Logger logger("test"); DiffractionExperiment x(DetectorGeometry(1)); x.Mode(DetectorMode::Raw); @@ -71,12 +70,10 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") { std::vector image1(x.GetPixelsNum()*nframes); for (auto &i: image1) i = dist(g1); - std::string zmq_addr = "ipc://*"; - // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done - ZMQImagePuller puller(context); - ZMQStream2PusherGroup pusher(context, {zmq_addr}); + ZMQImagePuller puller; + ZMQStream2Pusher pusher({"ipc://*"}); std::vector diff_size(1), diff_content(1), diff_split(1), nimages(1); @@ -119,11 +116,87 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") { REQUIRE(diff_content[0] == 0); } +TEST_CASE("ZMQImageCommTest_1Writer_Preview","[ZeroMQ]") { + const size_t nframes = 1; + + Logger logger("test"); + DiffractionExperiment x(DetectorGeometry(1)); + x.Mode(DetectorMode::Raw); + x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4) + .ImagesPerTrigger(nframes); + + std::vector empty_spot_vector; + std::vector empty_rad_int_profile; + + REQUIRE(x.GetImageNum() == nframes); + + std::mt19937 g1(1387); + std::uniform_int_distribution dist; + + std::vector image1(x.GetPixelsNum()*nframes); + for (auto &i: image1) i = dist(g1); + + // Puller needs to be declared first, but both objects need to exist till communication finished + ZMQImagePuller puller; + ZMQStream2Pusher pusher({"ipc://*"}); + REQUIRE(pusher.GetPreviewAddress().empty()); + + pusher.PreviewSocket("ipc://*"); + REQUIRE(!pusher.GetPreviewAddress().empty()); + + ZMQSocket preview_sub_socket(ZMQSocketType::Sub); + preview_sub_socket.Connect(pusher.GetPreviewAddress()); + preview_sub_socket.Subscribe(""); + + std::vector diff_size(1), diff_content(1), diff_split(1), nimages(1); + + auto pusher_addr = pusher.GetAddress(); + puller.Connect(pusher_addr[0]); + + std::thread sender_thread = std::thread([&] { + std::vector serialization_buffer(16*1024*1024); + CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); + + StartMessage message { + .images_per_file = 16, + .write_master_file = true + }; + EndMessage end_message{}; + + pusher.StartDataCollection(message); + for (int i = 0; i < nframes; i++) { + DataMessage data_message; + data_message.number = i; + PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)); + serializer.SerializeImage(data_message); + pusher.SendImage(serialization_buffer.data(), serializer.GetBufferSize(), i); + } + + pusher.EndDataCollection(end_message); + }); + + std::thread puller_thread(test_puller, &puller, std::cref(x), std::cref(image1), 1, 0, + std::ref(diff_split), std::ref(diff_size), std::ref(diff_content), + std::ref(nimages)); + + sender_thread.join(); + puller_thread.join(); + + puller.Disconnect(); + + REQUIRE(nimages[0] == nframes); + REQUIRE(diff_size[0] == 0); + REQUIRE(diff_content[0] == 0); + + ZMQMessage msg; + REQUIRE(preview_sub_socket.Receive(msg)); + REQUIRE(preview_sub_socket.Receive(msg)); + REQUIRE(preview_sub_socket.Receive(msg)); +} TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { const size_t nframes = 256; - ZMQContext context; Logger logger("test"); DiffractionExperiment x(DetectorGeometry(1)); x.Mode(DetectorMode::Raw); @@ -148,7 +221,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { for (int i = 0; i < npullers; i++) zmq_addr.push_back("ipc://*"); - ZMQStream2PusherGroup pusher(context, zmq_addr); + ZMQStream2Pusher pusher(zmq_addr); // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done @@ -156,7 +229,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { auto pusher_addr = pusher.GetAddress(); REQUIRE(pusher_addr.size() == 2); for (int i = 0; i < npullers; i++) { - puller.push_back(std::make_unique(context)); + puller.push_back(std::make_unique()); puller[i]->Connect(pusher_addr[i]); } @@ -213,7 +286,6 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") { const size_t nframes = 255; - ZMQContext context; Logger logger("test"); DiffractionExperiment x(DetectorGeometry(1)); x.Mode(DetectorMode::Raw); @@ -238,14 +310,14 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") { for (int i = 0; i < npullers; i++) zmq_addr.push_back("ipc://*"); - ZMQStream2PusherGroup pusher(context, zmq_addr); + ZMQStream2Pusher pusher(zmq_addr); auto pusher_addr = pusher.GetAddress(); REQUIRE(pusher_addr.size() == npullers); // Puller needs to be declared first, but both objects need to exist till communication finished // TODO: ImageSender should not allow if there are still completions to be done std::vector > puller; for (int i = 0; i < npullers; i++) { - puller.push_back(std::make_unique(context)); + puller.push_back(std::make_unique()); puller[i]->Connect(pusher_addr[i]); } @@ -310,8 +382,7 @@ TEST_CASE("ZMQImagePuller_abort","[ZeroMQ]") { x.PedestalG0Frames(0).NumTriggers(1).UseInternalPacketGenerator(false).PhotonEnergy_keV(12.4) .ImagesPerTrigger(nframes); - ZMQContext context; - ZMQImagePuller puller(context); + ZMQImagePuller puller; std::vector diff_size(1), diff_content(1), diff_split(1), nimages(1); std::vector image1(x.GetPixelsNum()); @@ -325,7 +396,7 @@ TEST_CASE("ZMQImagePuller_abort","[ZeroMQ]") { } TEST_CASE("ZMQImageCommTest_NoWriter","[ZeroMQ]") { - ZMQStream2PusherGroup pusher({"ipc://*"}); + ZMQStream2Pusher pusher({"ipc://*"}); StartMessage msg; REQUIRE_THROWS(pusher.StartDataCollection(msg)); diff --git a/tools/jfjoch_writer_test.cpp b/tools/jfjoch_writer_test.cpp index 49d9e417..201b76ed 100644 --- a/tools/jfjoch_writer_test.cpp +++ b/tools/jfjoch_writer_test.cpp @@ -6,7 +6,7 @@ #include "../common/Logger.h" #include "../receiver/FrameTransformation.h" #include "../common/RawToConvertedGeometry.h" -#include "../frame_serialize/ZMQStream2PusherGroup.h" +#include "../frame_serialize/ZMQStream2Pusher.h" #define BASE_TCP_PORT 8000 @@ -54,7 +54,7 @@ int main(int argc, char **argv) { for (int i = 0; i < nsockets; i++) zmq_addr.emplace_back("tcp://0.0.0.0:" + std::to_string(BASE_TCP_PORT + i)); - ZMQStream2PusherGroup pusher(context, zmq_addr); + ZMQStream2Pusher pusher(zmq_addr); FrameTransformation transformation(x); diff --git a/writer/HDF5Writer.cpp b/writer/HDF5Writer.cpp index aa95dbad..5378eeb5 100644 --- a/writer/HDF5Writer.cpp +++ b/writer/HDF5Writer.cpp @@ -74,8 +74,8 @@ void HDF5Writer::AddStats(const std::optional& s) { } } -void HDF5Writer::SetupSocket(ZMQContext &c, const std::string &addr) { - socket = std::make_unique(c, ZMQSocketType::Pub); +void HDF5Writer::SetupSocket(const std::string &addr) { + socket = std::make_unique(ZMQSocketType::Pub); socket->Bind(addr); } diff --git a/writer/HDF5Writer.h b/writer/HDF5Writer.h index 84e4798f..5a85cfd1 100644 --- a/writer/HDF5Writer.h +++ b/writer/HDF5Writer.h @@ -25,7 +25,7 @@ public: explicit HDF5Writer(const StartMessage &request); void Write(const DataMessage& msg); std::vector Finalize(); - void SetupSocket(ZMQContext &c, const std::string &addr); + void SetupSocket(const std::string &addr); std::optional GetZMQAddr(); }; diff --git a/writer/StreamWriter.cpp b/writer/StreamWriter.cpp index 09fd26d4..2e453702 100644 --- a/writer/StreamWriter.cpp +++ b/writer/StreamWriter.cpp @@ -6,13 +6,11 @@ #include "HDF5NXmx.h" #include "MakeDirectory.h" -StreamWriter::StreamWriter(ZMQContext &in_context, - Logger &in_logger, +StreamWriter::StreamWriter(Logger &in_logger, const std::string &zmq_addr, const std::string &repub_address, const std::string &in_file_done_address) - : zmq_context(in_context), - image_puller(in_context, repub_address), + : image_puller(repub_address), logger(in_logger), file_done_address(in_file_done_address) { image_puller.Connect(zmq_addr); @@ -46,7 +44,7 @@ void StreamWriter::CollectImages(std::vector &v) { HDF5Writer writer(*image_puller_output.cbor->start_message); if (!file_done_address.empty()) - writer.SetupSocket(zmq_context, file_done_address); + writer.SetupSocket(file_done_address); std::unique_ptr master_file; if (!image_puller_output.cbor->start_message->write_master_file || image_puller_output.cbor->start_message->write_master_file.value()) diff --git a/writer/StreamWriter.h b/writer/StreamWriter.h index 02e7085d..eea3b536 100644 --- a/writer/StreamWriter.h +++ b/writer/StreamWriter.h @@ -24,7 +24,6 @@ struct StreamWriterOutput { }; class StreamWriter { - ZMQContext &zmq_context; std::string file_done_address; StreamWriterState state = StreamWriterState::Idle; @@ -42,8 +41,7 @@ class StreamWriter { void CollectImages(std::vector &v); bool WaitForImage(); public: - StreamWriter(ZMQContext& context, - Logger &logger, + StreamWriter(Logger &logger, const std::string& zmq_addr, const std::string& repub_address = "", const std::string& file_done_address = ""); diff --git a/writer/ZMQImagePuller.cpp b/writer/ZMQImagePuller.cpp index 44de993a..1986ee3e 100644 --- a/writer/ZMQImagePuller.cpp +++ b/writer/ZMQImagePuller.cpp @@ -2,13 +2,13 @@ #include "ZMQImagePuller.h" -ZMQImagePuller::ZMQImagePuller(ZMQContext &context, const std::string &repub_address) : - socket (context, ZMQSocketType::Pull) { +ZMQImagePuller::ZMQImagePuller(const std::string &repub_address) : + socket (ZMQSocketType::Pull) { socket.ReceiveWaterMark(ReceiverWaterMark); socket.ReceiveTimeout(ReceiveTimeout); if (!repub_address.empty()) { - repub_socket = std::make_unique(context, ZMQSocketType::Push); + repub_socket = std::make_unique(ZMQSocketType::Push); repub_socket->SendWaterMark(100); repub_socket->SendTimeout(std::chrono::milliseconds(100)); repub_socket->Bind(repub_address); diff --git a/writer/ZMQImagePuller.h b/writer/ZMQImagePuller.h index e276e17a..fedebb1e 100644 --- a/writer/ZMQImagePuller.h +++ b/writer/ZMQImagePuller.h @@ -42,7 +42,7 @@ class ZMQImagePuller { void RepubThread(); Logger logger{"ZMQImagePuller"}; public: - explicit ZMQImagePuller(ZMQContext &context, const std::string &repub_address = ""); + explicit ZMQImagePuller(const std::string &repub_address = ""); ~ZMQImagePuller(); void Connect(const std::string &in_address); void Disconnect(); diff --git a/writer/jfjoch_writer.cpp b/writer/jfjoch_writer.cpp index 28655f6e..082f7c6c 100644 --- a/writer/jfjoch_writer.cpp +++ b/writer/jfjoch_writer.cpp @@ -117,7 +117,7 @@ int main(int argc, char **argv) { ZMQContext context; Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(http_port)); - writer = new StreamWriter(context, logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr); + writer = new StreamWriter(logger, argv[first_argc], repub_zmq_addr, file_done_zmq_addr); httpEndpoint = new Pistache::Http::Endpoint(addr); auto router = std::make_shared();