diff --git a/common/ImagePusher.cpp b/common/ImagePusher.cpp index 15335931..64c9f54f 100644 --- a/common/ImagePusher.cpp +++ b/common/ImagePusher.cpp @@ -15,15 +15,3 @@ void PrepareCBORImage(DataMessage& message, message.image.algorithm = experiment.GetCompressionAlgorithmEnum(); message.image.channel = "default"; } - - -void PrepareDataMessageSpots(DataMessage& message, - const std::vector& spots) { - message.spots.clear(); - for (const auto & spot : spots) - message.spots.push_back(spot); -} - -void ImagePusher::SendData(const std::vector &serialized_image, int64_t image_number) { - SendDataInternal(serialized_image, image_number); -} \ No newline at end of file diff --git a/common/ImagePusher.h b/common/ImagePusher.h index 2a16c632..7d2f4ee9 100644 --- a/common/ImagePusher.h +++ b/common/ImagePusher.h @@ -18,12 +18,10 @@ void PrepareCBORImage(DataMessage& message, void *image, size_t image_size); class ImagePusher { -protected: - virtual void SendDataInternal(const std::vector& serialized_image, int64_t image_number) = 0; public: virtual void StartDataCollection(const StartMessage& message) = 0; virtual void EndDataCollection(const EndMessage& message) = 0; - void SendData(const std::vector& serialized_image, int64_t image_number); + virtual void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) = 0; }; diff --git a/common/TestImagePusher.cpp b/common/TestImagePusher.cpp index e4331ab1..2bd2254a 100644 --- a/common/TestImagePusher.cpp +++ b/common/TestImagePusher.cpp @@ -29,14 +29,14 @@ void TestImagePusher::EndDataCollection(const EndMessage& message) { is_running = false; } -void TestImagePusher::SendDataInternal(const std::vector &serialized_image, int64_t image_number) { +void TestImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { std::unique_lock ul(m); frame_counter++; if (image_number == image_id) { JFJochFrameDeserializer deserializer; - deserializer.Process(serialized_image); + deserializer.Process(image_data, image_size); auto image_array = deserializer.GetDataMessage(); receiver_generated_image.resize(image_array.image.size); diff --git a/common/TestImagePusher.h b/common/TestImagePusher.h index 906348c3..47d89ba2 100644 --- a/common/TestImagePusher.h +++ b/common/TestImagePusher.h @@ -19,8 +19,8 @@ class TestImagePusher : public ImagePusher { bool correct_sequence = true; bool is_running = false; size_t frame_counter = 0; - void SendDataInternal(const std::vector &serialized_image, int64_t image_number) override; public: + void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; explicit TestImagePusher(int64_t image_number); void StartDataCollection(const StartMessage& message) override; void EndDataCollection(const EndMessage& message) override; diff --git a/common/ZMQImagePusher.cpp b/common/ZMQImagePusher.cpp index bf6bf01a..96537e5b 100644 --- a/common/ZMQImagePusher.cpp +++ b/common/ZMQImagePusher.cpp @@ -40,11 +40,11 @@ ZMQImagePusher::ZMQImagePusher(const std::vector &addr, } } -void ZMQImagePusher::SendDataInternal(const std::vector &serialized_image, int64_t image_number) { +void ZMQImagePusher::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { if (sockets.empty()) return; auto socket_number = (image_number % file_count) % sockets.size(); - sockets[socket_number]->Send(serialized_image.data(), serialized_image.size()); + sockets[socket_number]->Send(image_data, image_size); } void ZMQImagePusher::StartDataCollection(const StartMessage& message) { @@ -56,9 +56,9 @@ void ZMQImagePusher::StartDataCollection(const StartMessage& message) { file_count = message.data_file_count; serializer.SerializeSequenceStart(message); - auto &buffer = serializer.GetBuffer(); + for (const auto &s: sockets) - s->Send(buffer.data(), buffer.size(), true); + s->Send(serializer.GetBuffer(), serializer.GetBufferSize(), true); } @@ -69,9 +69,7 @@ void ZMQImagePusher::EndDataCollection(const EndMessage& message) { for (const auto &s: sockets) { serializer.SerializeSequenceEnd(end_message); - auto &buffer = serializer.GetBuffer(); - - s->Send(buffer.data(), buffer.size(), true); + s->Send(serializer.GetBuffer(), serializer.GetBufferSize(), true); end_message.write_master_file = false; } } \ No newline at end of file diff --git a/common/ZMQImagePusher.h b/common/ZMQImagePusher.h index cea7fb13..ccdaa544 100644 --- a/common/ZMQImagePusher.h +++ b/common/ZMQImagePusher.h @@ -16,8 +16,8 @@ class ZMQImagePusher : public ImagePusher { std::vector> contexts; std::vector> sockets; int64_t file_count = 1; - void SendDataInternal(const std::vector& serialized_image, int64_t image_number) override; public: + void SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) override; ZMQImagePusher(ZMQContext &context, const std::vector& addr, int32_t send_buffer_high_watermark = -1, int32_t send_buffer_size = -1); // High performance implementation, where each socket has dedicated ZMQ context diff --git a/frame_serialize/JFJochFrameDeserializer.cpp b/frame_serialize/JFJochFrameDeserializer.cpp index f3b62e2a..a7c46eb5 100644 --- a/frame_serialize/JFJochFrameDeserializer.cpp +++ b/frame_serialize/JFJochFrameDeserializer.cpp @@ -603,12 +603,16 @@ void JFJochFrameDeserializer::ProcessImageData(CborValue &value) { } void JFJochFrameDeserializer::Process(const std::vector &buffer) { + Process(buffer.data(), buffer.size()); +} + +void JFJochFrameDeserializer::Process(const uint8_t *msg, size_t msg_size) { std::unique_lock ul(m); data_message = DataMessage(); CborParser parser; CborValue value; - cborErr(cbor_parser_init(buffer.data(), buffer.size(), 0, &parser, &value)); + cborErr(cbor_parser_init(msg, msg_size, 0, &parser, &value)); if (GetCBORTag(value) != CborSignatureTag) throw JFJochException(JFJochExceptionCategory::CBORError, "CBOR must start with dedicated tag"); diff --git a/frame_serialize/JFJochFrameDeserializer.h b/frame_serialize/JFJochFrameDeserializer.h index 90011f1b..42172498 100644 --- a/frame_serialize/JFJochFrameDeserializer.h +++ b/frame_serialize/JFJochFrameDeserializer.h @@ -47,6 +47,7 @@ private: bool ProcessEndMessageElement(CborValue &value); public: void Process(const std::vector& buffer); + void Process(const uint8_t *msg, size_t msg_size); [[nodiscard]] Type GetType() const; [[nodiscard]] EndMessage GetEndMessage() const; [[nodiscard]] StartMessage GetStartMessage() const; diff --git a/frame_serialize/JFJochFrameSerializer.cpp b/frame_serialize/JFJochFrameSerializer.cpp index 62e935ce..5dbd51ac 100644 --- a/frame_serialize/JFJochFrameSerializer.cpp +++ b/frame_serialize/JFJochFrameSerializer.cpp @@ -52,9 +52,9 @@ void CBOR_ENC_COMPRESSED(CborEncoder &encoder, const void *image, size_t image_size, CompressionAlgorithm algorithm, size_t elem_size) { - if (algorithm == CompressionAlgorithm::NO_COMPRESSION) { + if (algorithm == CompressionAlgorithm::NO_COMPRESSION) cborErr(cbor_encode_byte_string(&encoder, (uint8_t *) image, image_size)); - } else { + else { cbor_encode_tag(&encoder, TagDECTRISCompression); CborEncoder arrayEncoder; @@ -288,16 +288,19 @@ inline void CBOR_ENC_USER_DATA(CborEncoder &encoder, const StartMessage& message } JFJochFrameSerializer::JFJochFrameSerializer(size_t in_max_buffer_size) : -max_buffer_size(in_max_buffer_size) { - buffer.reserve(in_max_buffer_size); +max_buffer_size(in_max_buffer_size), curr_size(0) { + buffer.resize(max_buffer_size); } -const std::vector &JFJochFrameSerializer::GetBuffer() const { - return buffer; +const uint8_t *JFJochFrameSerializer::GetBuffer() const { + return buffer.data(); +} + +size_t JFJochFrameSerializer::GetBufferSize() const { + return curr_size; } void JFJochFrameSerializer::SerializeSequenceStart(const StartMessage& message) { - buffer.resize(max_buffer_size); CborEncoder encoder, mapEncoder; cbor_encoder_init(&encoder, buffer.data(), buffer.size(), 0); @@ -342,13 +345,10 @@ void JFJochFrameSerializer::SerializeSequenceStart(const StartMessage& message) CBOR_ENC_CHANNELS(mapEncoder, "channels", message.channels); cborErr(cbor_encoder_close_container(&encoder, &mapEncoder)); - - buffer.resize(cbor_encoder_get_buffer_size(&encoder, buffer.data())); + curr_size = cbor_encoder_get_buffer_size(&encoder, buffer.data()); } void JFJochFrameSerializer::SerializeSequenceEnd(const EndMessage& message) { - buffer.resize(max_buffer_size); - CborEncoder encoder, mapEncoder; cbor_encoder_init(&encoder, buffer.data(), buffer.size(), 0); cborErr(cbor_encode_tag(&encoder,CborSignatureTag )); @@ -365,12 +365,10 @@ void JFJochFrameSerializer::SerializeSequenceEnd(const EndMessage& message) { cborErr(cbor_encoder_close_container(&encoder, &mapEncoder)); - buffer.resize(cbor_encoder_get_buffer_size(&encoder, buffer.data())); + curr_size = cbor_encoder_get_buffer_size(&encoder, buffer.data()); } void JFJochFrameSerializer::SerializeImage(const DataMessage& message) { - buffer.resize(max_buffer_size); - CborEncoder encoder, mapEncoder, userDataMapEncoder; cbor_encoder_init(&encoder, buffer.data(), buffer.size(), 0); @@ -397,5 +395,5 @@ void JFJochFrameSerializer::SerializeImage(const DataMessage& message) { cborErr(cbor_encoder_close_container(&encoder, &mapEncoder)); - buffer.resize(cbor_encoder_get_buffer_size(&encoder, buffer.data())); + curr_size = cbor_encoder_get_buffer_size(&encoder, buffer.data()); } diff --git a/frame_serialize/JFJochFrameSerializer.h b/frame_serialize/JFJochFrameSerializer.h index ecc28706..251f407b 100644 --- a/frame_serialize/JFJochFrameSerializer.h +++ b/frame_serialize/JFJochFrameSerializer.h @@ -15,9 +15,11 @@ class JFJochFrameSerializer { std::vector buffer; size_t max_buffer_size; + size_t curr_size; public: explicit JFJochFrameSerializer(size_t buffer_size); - [[nodiscard]] const std::vector &GetBuffer() const; + [[nodiscard]] const uint8_t *GetBuffer() const; + [[nodiscard]] size_t GetBufferSize() const; void SerializeSequenceStart(const StartMessage& message); void SerializeSequenceEnd(const EndMessage& message); void SerializeImage(const DataMessage& message); diff --git a/image_analysis/fast-feedback-indexer b/image_analysis/fast-feedback-indexer index 111b4975..6eccc70b 160000 --- a/image_analysis/fast-feedback-indexer +++ b/image_analysis/fast-feedback-indexer @@ -1 +1 @@ -Subproject commit 111b4975cadc013ad98fb982dfc3d252c9721cb5 +Subproject commit 6eccc70b524ba7a0207aa2c93b85769c1c9f5a62 diff --git a/receiver/JFJochReceiver.cpp b/receiver/JFJochReceiver.cpp index 88c4d658..7bb5591c 100644 --- a/receiver/JFJochReceiver.cpp +++ b/receiver/JFJochReceiver.cpp @@ -413,7 +413,7 @@ int64_t JFJochReceiver::FrameTransformationThread() { if (push_images_to_writer) { PrepareCBORImage(message, experiment, writer_buffer.data(), image_size); serializer.SerializeImage(message); - image_pusher.SendData(serializer.GetBuffer(), image_number); + image_pusher.SendImage(serializer.GetBuffer(), serializer.GetBufferSize(), image_number); } UpdateMaxImage(image_number); diff --git a/tests/CBORTest.cpp b/tests/CBORTest.cpp index 59dfa809..b22bb1dc 100644 --- a/tests/CBORTest.cpp +++ b/tests/CBORTest.cpp @@ -63,10 +63,8 @@ TEST_CASE("CBORSerialize_Start", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message)); - auto image = serializer.GetBuffer(); - JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(image)); + REQUIRE_NOTHROW(deserializer.Process(serializer.GetBuffer(), serializer.GetBufferSize())); REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::START); StartMessage output_message; @@ -137,11 +135,9 @@ TEST_CASE("CBORSerialize_End", "[CBOR]") { }; REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message)); - - auto image = serializer.GetBuffer(); - + JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(image)); + REQUIRE_NOTHROW(deserializer.Process(serializer.GetBuffer(), serializer.GetBufferSize())); REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::END); EndMessage output_message{}; @@ -188,10 +184,9 @@ TEST_CASE("CBORSerialize_Image", "[CBOR]") { }; REQUIRE_NOTHROW(serializer.SerializeImage(message)); - auto serialized = serializer.GetBuffer(); JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(serialized)); + REQUIRE_NOTHROW(deserializer.Process(serializer.GetBuffer(), serializer.GetBufferSize())); REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); @@ -241,13 +236,11 @@ TEST_CASE("CBORSerialize_Image_2", "[CBOR]") { }; REQUIRE_NOTHROW(serializer.SerializeImage(message)); - auto serialized = serializer.GetBuffer(); JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(serialized)); + REQUIRE_NOTHROW(deserializer.Process(serializer.GetBuffer(), serializer.GetBufferSize())); REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); - auto image_array = deserializer.GetDataMessage(); REQUIRE(image_array.image.algorithm == CompressionAlgorithm::NO_COMPRESSION); REQUIRE(image_array.image.xpixel == 1024); @@ -288,10 +281,9 @@ TEST_CASE("CBORSerialize_Image_Compressed", "[CBOR]") { }; REQUIRE_NOTHROW(serializer.SerializeImage(message)); - auto serialized = serializer.GetBuffer(); JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(serialized)); + REQUIRE_NOTHROW(deserializer.Process(serializer.GetBuffer(), serializer.GetBufferSize())); REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); @@ -331,10 +323,8 @@ TEST_CASE("CBORSerialize_Image_Rad_Int_Profile", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeImage(message)); - auto serialized = serializer.GetBuffer(); - JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(serialized)); + REQUIRE_NOTHROW(deserializer.Process(serializer.GetBuffer(), serializer.GetBufferSize())); REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); @@ -374,10 +364,8 @@ TEST_CASE("CBORSerialize_Image_Spots", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeImage(message)); - auto serialized = serializer.GetBuffer(); - JFJochFrameDeserializer deserializer; - REQUIRE_NOTHROW(deserializer.Process(serialized)); + REQUIRE_NOTHROW(deserializer.Process(serializer.GetBuffer(), serializer.GetBufferSize())); REQUIRE(deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE); auto image_array = deserializer.GetDataMessage(); @@ -452,10 +440,9 @@ TEST_CASE("CBORSerialize_Start_stream2", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeSequenceStart(message)); - auto image = serializer.GetBuffer(); stream2_msg *msg; - auto ret = stream2_parse_msg(image.data(), image.size(), &msg); + auto ret = stream2_parse_msg(serializer.GetBuffer(), serializer.GetBufferSize(), &msg); REQUIRE(ret == STREAM2_OK); CHECK(msg->type == STREAM2_MSG_START); auto msg2 = (stream2_start_msg *) msg; @@ -499,10 +486,9 @@ TEST_CASE("CBORSerialize_End_stream2", "[CBOR]") { REQUIRE_NOTHROW(serializer.SerializeSequenceEnd(message)); - auto image = serializer.GetBuffer(); stream2_msg *msg; - auto ret = stream2_parse_msg(image.data(), image.size(), &msg); + auto ret = stream2_parse_msg(serializer.GetBuffer(), serializer.GetBufferSize(), &msg); REQUIRE(ret == STREAM2_OK); CHECK(msg->type == STREAM2_MSG_END); auto msg2 = (stream2_end_msg *) msg; @@ -546,11 +532,10 @@ TEST_CASE("CBORSerialize_Image_compressed_stream2", "[CBOR]") { }; REQUIRE_NOTHROW(serializer.SerializeImage(message)); - auto cbor_image = serializer.GetBuffer(); stream2_msg *msg; - auto ret = stream2_parse_msg(cbor_image.data(), cbor_image.size(), &msg); + auto ret = stream2_parse_msg(serializer.GetBuffer(), serializer.GetBufferSize(), &msg); REQUIRE(ret == STREAM2_OK); CHECK(msg->type == STREAM2_MSG_IMAGE); auto msg2 = (stream2_image_msg *) msg; @@ -601,7 +586,7 @@ TEST_CASE("CBORSerialize_Image_uncompressed_stream2", "[CBOR]") { stream2_msg *msg; - auto ret = stream2_parse_msg(cbor_image.data(), cbor_image.size(), &msg); + auto ret = stream2_parse_msg(serializer.GetBuffer(), serializer.GetBufferSize(), &msg); REQUIRE(ret == STREAM2_OK); CHECK(msg->type == STREAM2_MSG_IMAGE); auto msg2 = (stream2_image_msg *) msg; diff --git a/tests/ZMQImagePusherTest.cpp b/tests/ZMQImagePusherTest.cpp index 2dda7ecf..68d080d8 100644 --- a/tests/ZMQImagePusherTest.cpp +++ b/tests/ZMQImagePusherTest.cpp @@ -97,7 +97,7 @@ TEST_CASE("ZMQImageCommTest_1Writer","[ZeroMQ]") { data_message.number = i; PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)); serializer.SerializeImage(data_message); - pusher.SendData(serializer.GetBuffer(), i); + pusher.SendImage(serializer.GetBuffer(), serializer.GetBufferSize(), i); } pusher.EndDataCollection(end_message); @@ -172,7 +172,7 @@ TEST_CASE("ZMQImageCommTest_2Writers","[ZeroMQ]") { data_message.number = i; PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)); serializer.SerializeImage(data_message); - pusher.SendData(serializer.GetBuffer(), i); + pusher.SendImage(serializer.GetBuffer(), serializer.GetBufferSize(), i); } pusher.EndDataCollection(end_message); @@ -260,7 +260,7 @@ TEST_CASE("ZMQImageCommTest_4Writers","[ZeroMQ]") { data_message.number = i; PrepareCBORImage(data_message, x, image1.data() + i * x.GetPixelsNum(), x.GetPixelsNum() * sizeof(uint16_t)); serializer.SerializeImage(data_message); - pusher.SendData(serializer.GetBuffer(), i); + pusher.SendImage(serializer.GetBuffer(), serializer.GetBufferSize(), i); } pusher.EndDataCollection(end_message); diff --git a/writer/jfjoch_writer_test.cpp b/writer/jfjoch_writer_test.cpp index ea55712a..90d8c7c5 100644 --- a/writer/jfjoch_writer_test.cpp +++ b/writer/jfjoch_writer_test.cpp @@ -102,7 +102,7 @@ int main(int argc, char **argv) { data_message.number = i; PrepareCBORImage(data_message, x, output[i % nimages_in_file].data(), output_size[i % nimages_in_file]); serializer.SerializeImage(data_message); - pusher.SendData(serializer.GetBuffer(), i); + pusher.SendImage(serializer.GetBuffer(), serializer.GetBufferSize(), i); } EndMessage end_message{};