// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-only #include "ZMQMetadataSocket.h" #include "../frame_serialize/CBORStream2Serializer.h" #include "../common/ZMQWrappers.h" #include "../common/Definitions.h" #define MESSAGE_SIZE_FOR_IMAGE (20*1024) // 20 kB ZMQMetadataSocket::ZMQMetadataSocket(const std::string &addr) : metadata_message({}), socket(ZMQSocketType::Pub), counter(std::chrono::seconds(1)) { socket.SendWaterMark(DefaultSendWatermark); socket.Bind(addr); first_image = true; } void ZMQMetadataSocket::StartDataCollection(const StartMessage &message) { std::unique_lock ul(m); if (buffer.size() < MESSAGE_SIZE_FOR_START_END) buffer.resize(MESSAGE_SIZE_FOR_START_END); CBORStream2Serializer serializer(buffer.data(), buffer.size()); serializer.SerializeSequenceStart(message); socket.Send(buffer.data(), serializer.GetBufferSize(), true); first_image = true; } void ZMQMetadataSocket::SendAll() { if (buffer.size() < metadata_message.images.size() * MESSAGE_SIZE_FOR_IMAGE) buffer.resize(metadata_message.images.size() * MESSAGE_SIZE_FOR_IMAGE); // Serialize CBORStream2Serializer serializer(buffer.data(), buffer.size()); serializer.SerializeMetadata(metadata_message); // Stream out socket.Send(buffer.data(), serializer.GetBufferSize(), false); metadata_message = MetadataMessage{}; } void ZMQMetadataSocket::AddDataMessage(const DataMessage &msg) { std::unique_lock ul(m); if (first_image) { // GeneratePreview() is designed to fire on the first try. This would result however in situation where // first metadata packet always contains only 1 image, and this is not desired behavior. // Therefore, on first use there is one extra "artificial" check of generate preview, that starts the clock counter.GeneratePreview(); metadata_message.run_name = msg.run_name; metadata_message.run_number = msg.run_number; first_image = false; } metadata_message.images.emplace_back(msg); if (counter.GeneratePreview()) SendAll(); } void ZMQMetadataSocket::EndDataCollection(const EndMessage &message) { std::unique_lock ul(m); if (!metadata_message.images.empty()) SendAll(); if (buffer.size() < MESSAGE_SIZE_FOR_START_END) buffer.resize(MESSAGE_SIZE_FOR_START_END); CBORStream2Serializer serializer(buffer.data(), buffer.size()); serializer.SerializeSequenceEnd(message); socket.Send(buffer.data(), serializer.GetBufferSize(), true); } ZMQMetadataSocket &ZMQMetadataSocket::Period(const std::optional &period) { counter.Period(period); return *this; } ZMQMetadataSocket& ZMQMetadataSocket::ImportSettings(const ZMQMetadataSettings &settings) { std::unique_lock ul(m); Period(settings.period); return *this; } ZMQMetadataSettings ZMQMetadataSocket::GetSettings() { std::unique_lock ul(m); return ZMQMetadataSettings{ .period = counter.GetPeriod(), .address = socket.GetEndpointName() }; } std::string ZMQMetadataSocket::GetAddress() { return socket.GetEndpointName(); }