ImagePusher: Serializer defined inside ImagePusher class, not in subclasses

This commit is contained in:
2026-05-08 17:47:01 +02:00
parent f5176b56a9
commit 27bcb19328
7 changed files with 10 additions and 21 deletions
-9
View File
@@ -27,12 +27,6 @@ void HDF5FilePusher::StartDataCollection(StartMessage &message) {
StartMessage repub_message = message;
repub_message.writer_notification_zmq_addr = "";
size_t approx_size = 1024 * 1024;
for (const auto &[key, value] : repub_message.pixel_mask)
approx_size += value.size() * sizeof(uint32_t);
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeSequenceStart(repub_message);
repub_active = repub_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
@@ -59,9 +53,6 @@ bool HDF5FilePusher::EndDataCollection(const EndMessage &message) {
if (repub_socket) {
try {
size_t approx_size = 1024 * 1024;
std::vector<uint8_t> serialization_buffer(approx_size);
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
serializer.SerializeSequenceEnd(message);
if (repub_active)
+4
View File
@@ -13,6 +13,10 @@ void PrepareCBORImage(DataMessage& message,
experiment.GetCompressionAlgorithm());
}
ImagePusher::ImagePusher()
: serialization_buffer(MESSAGE_SIZE_FOR_START_END),
serializer(serialization_buffer.data(), serialization_buffer.size()) {}
std::string ImagePusher::Finalize() {
return "";
}
+5
View File
@@ -10,6 +10,7 @@
#include "../common/JFJochMessages.h"
#include "../common/ZeroCopyReturnValue.h"
#include "../common/Logger.h"
#include "../frame_serialize/CBORStream2Serializer.h"
enum class ImagePusherType {HDF5, CBOR, TCP, ZMQ, Test, None};
@@ -33,6 +34,10 @@ void PrepareCBORImage(DataMessage& message,
void *image, size_t image_size);
class ImagePusher {
protected:
std::vector<uint8_t> serialization_buffer;
CBORStream2Serializer serializer;
ImagePusher();
public:
virtual void StartDataCollection(StartMessage& message) = 0;
virtual bool EndDataCollection(const EndMessage& message) = 0; // Non-blocking
+1 -3
View File
@@ -108,9 +108,7 @@ void TCPStreamPusher::CloseFd(std::atomic<int>& fd) {
TCPStreamPusher::TCPStreamPusher(const std::string& addr,
size_t in_max_connections,
std::optional<int32_t> in_send_buffer_size)
: serialization_buffer(256 * 1024 * 1024),
serializer(serialization_buffer.data(), serialization_buffer.size()),
endpoint(addr),
: endpoint(addr),
max_connections(in_max_connections),
send_buffer_size(in_send_buffer_size) {
if (endpoint.empty())
-3
View File
@@ -97,9 +97,6 @@ class TCPStreamPusher : public ImagePusher {
std::chrono::steady_clock::time_point last_keepalive_recv{};
};
std::vector<uint8_t> serialization_buffer;
CBORStream2Serializer serializer;
std::string endpoint;
size_t max_connections;
std::optional<int32_t> send_buffer_size;
-2
View File
@@ -7,8 +7,6 @@
ZMQStream2Pusher::ZMQStream2Pusher(const std::vector<std::string> &addr,
std::optional<int32_t> send_buffer_high_watermark,
std::optional<int32_t> send_buffer_size)
: serialization_buffer(256*1024*1024),
serializer(serialization_buffer.data(), serialization_buffer.size())
{
if (addr.empty())
throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writer ZMQ address provided");
-4
View File
@@ -9,12 +9,8 @@
#include "../preview/PreviewCounter.h"
#include "ZMQWriterNotificationPuller.h"
#include "ZMQStream2PusherSocket.h"
#include "../frame_serialize/CBORStream2Serializer.h"
class ZMQStream2Pusher : public ImagePusher {
std::vector<uint8_t> serialization_buffer;
CBORStream2Serializer serializer;
std::vector<std::unique_ptr<ZMQStream2PusherSocket>> socket;
std::unique_ptr<ZMQWriterNotificationPuller> writer_notification_socket;