diff --git a/image_pusher/HDF5FilePusher.cpp b/image_pusher/HDF5FilePusher.cpp index d6c00c74..a5f9138c 100644 --- a/image_pusher/HDF5FilePusher.cpp +++ b/image_pusher/HDF5FilePusher.cpp @@ -27,12 +27,6 @@ void HDF5FilePusher::StartDataCollection(StartMessage &message) { StartMessage repub_message = message; repub_message.writer_notification_zmq_addr = ""; - size_t approx_size = 1024 * 1024; - for (const auto &[key, value] : repub_message.pixel_mask) - approx_size += value.size() * sizeof(uint32_t); - - std::vector serialization_buffer(approx_size); - CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); serializer.SerializeSequenceStart(repub_message); repub_active = repub_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true); @@ -59,9 +53,6 @@ bool HDF5FilePusher::EndDataCollection(const EndMessage &message) { if (repub_socket) { try { - size_t approx_size = 1024 * 1024; - std::vector serialization_buffer(approx_size); - CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size()); serializer.SerializeSequenceEnd(message); if (repub_active) diff --git a/image_pusher/ImagePusher.cpp b/image_pusher/ImagePusher.cpp index 8f5ca4d3..cca67c54 100644 --- a/image_pusher/ImagePusher.cpp +++ b/image_pusher/ImagePusher.cpp @@ -13,6 +13,10 @@ void PrepareCBORImage(DataMessage& message, experiment.GetCompressionAlgorithm()); } +ImagePusher::ImagePusher() +: serialization_buffer(MESSAGE_SIZE_FOR_START_END), +serializer(serialization_buffer.data(), serialization_buffer.size()) {} + std::string ImagePusher::Finalize() { return ""; } diff --git a/image_pusher/ImagePusher.h b/image_pusher/ImagePusher.h index cb5e64d1..ac41375b 100644 --- a/image_pusher/ImagePusher.h +++ b/image_pusher/ImagePusher.h @@ -10,6 +10,7 @@ #include "../common/JFJochMessages.h" #include "../common/ZeroCopyReturnValue.h" #include "../common/Logger.h" +#include "../frame_serialize/CBORStream2Serializer.h" enum class ImagePusherType {HDF5, CBOR, TCP, ZMQ, Test, None}; @@ -33,6 +34,10 @@ void PrepareCBORImage(DataMessage& message, void *image, size_t image_size); class ImagePusher { +protected: + std::vector serialization_buffer; + CBORStream2Serializer serializer; + ImagePusher(); public: virtual void StartDataCollection(StartMessage& message) = 0; virtual bool EndDataCollection(const EndMessage& message) = 0; // Non-blocking diff --git a/image_pusher/TCPStreamPusher.cpp b/image_pusher/TCPStreamPusher.cpp index 0f777b68..d3184d2c 100644 --- a/image_pusher/TCPStreamPusher.cpp +++ b/image_pusher/TCPStreamPusher.cpp @@ -108,9 +108,7 @@ void TCPStreamPusher::CloseFd(std::atomic& fd) { TCPStreamPusher::TCPStreamPusher(const std::string& addr, size_t in_max_connections, std::optional in_send_buffer_size) - : serialization_buffer(256 * 1024 * 1024), - serializer(serialization_buffer.data(), serialization_buffer.size()), - endpoint(addr), + : endpoint(addr), max_connections(in_max_connections), send_buffer_size(in_send_buffer_size) { if (endpoint.empty()) diff --git a/image_pusher/TCPStreamPusher.h b/image_pusher/TCPStreamPusher.h index 69b4d59c..1b6eb0ef 100644 --- a/image_pusher/TCPStreamPusher.h +++ b/image_pusher/TCPStreamPusher.h @@ -97,9 +97,6 @@ class TCPStreamPusher : public ImagePusher { std::chrono::steady_clock::time_point last_keepalive_recv{}; }; - std::vector serialization_buffer; - CBORStream2Serializer serializer; - std::string endpoint; size_t max_connections; std::optional send_buffer_size; diff --git a/image_pusher/ZMQStream2Pusher.cpp b/image_pusher/ZMQStream2Pusher.cpp index ab4ce8f7..f115ce94 100644 --- a/image_pusher/ZMQStream2Pusher.cpp +++ b/image_pusher/ZMQStream2Pusher.cpp @@ -7,8 +7,6 @@ ZMQStream2Pusher::ZMQStream2Pusher(const std::vector &addr, std::optional send_buffer_high_watermark, std::optional send_buffer_size) - : serialization_buffer(256*1024*1024), - serializer(serialization_buffer.data(), serialization_buffer.size()) { if (addr.empty()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writer ZMQ address provided"); diff --git a/image_pusher/ZMQStream2Pusher.h b/image_pusher/ZMQStream2Pusher.h index 5f922363..aa5dbd36 100644 --- a/image_pusher/ZMQStream2Pusher.h +++ b/image_pusher/ZMQStream2Pusher.h @@ -9,12 +9,8 @@ #include "../preview/PreviewCounter.h" #include "ZMQWriterNotificationPuller.h" #include "ZMQStream2PusherSocket.h" -#include "../frame_serialize/CBORStream2Serializer.h" class ZMQStream2Pusher : public ImagePusher { - std::vector serialization_buffer; - CBORStream2Serializer serializer; - std::vector> socket; std::unique_ptr writer_notification_socket;