// Copyright (2019-2022) Paul Scherrer Institute // SPDX-License-Identifier: GPL-3.0-or-later #include #include "ZMQImagePuller.h" ZMQImagePuller::ZMQImagePuller(ZMQContext &context) : socket (context, ZMQSocketType::Pull) { socket.ReceiveWaterMark(ReceiverWaterMark); socket.ReceiveTimeout(ReceiveTimeout); zmq_recv_buffer.reserve(2*1024*1024); // Reasonable size 2 MiB } void ZMQImagePuller::Connect(const std::string &in_address) { Disconnect(); abort = 0; addr = in_address; socket.Connect(in_address); } void ZMQImagePuller::Disconnect() { if (!addr.empty()) socket.Disconnect(addr); addr = ""; } void ZMQImagePuller::Abort() { abort = 1; } void ZMQImagePuller::WaitForImage() { int64_t msg_size = -1; while ((msg_size < 0) && (!abort)) msg_size = socket.Receive(zmq_recv_buffer, true, true); if (!abort) { deserializer.Process(zmq_recv_buffer); if (deserializer.GetType() == JFJochFrameDeserializer::Type::START) { start_time = std::chrono::system_clock::now(); start_message = std::make_unique(deserializer.GetStartMessage()); end_message.reset(); } else if (deserializer.GetType() == JFJochFrameDeserializer::Type::END) { end_message = std::make_unique(deserializer.GetEndMessage()); end_time = std::chrono::system_clock::now(); } else if (deserializer.GetType() == JFJochFrameDeserializer::Type::IMAGE) { processed_images++; deserialized_image_message = std::make_unique(deserializer.GetDataMessage()); processed_size += deserialized_image_message->image.size; } } } const DataMessage &ZMQImagePuller::GetDataMessage() const { if (deserialized_image_message) return *deserialized_image_message; else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image message not received so far"); } JFJochFrameDeserializer::Type ZMQImagePuller::GetFrameType() const { if (abort) return JFJochFrameDeserializer::Type::END; else return deserializer.GetType(); } ZMQImagePullerStatistics ZMQImagePuller::GetStatistics() { float perf_MBs = 0.0f, perf_Hz = 0.0f; if (processed_images > 0) { auto time_us = std::chrono::duration_cast(end_time - start_time); // MByte/s ==> Byte/us perf_MBs = static_cast(processed_size) / static_cast(time_us.count()); perf_Hz = static_cast(processed_images) * 1e6f / static_cast(time_us.count()); } return { .processed_images = processed_images, .performance_MBs = perf_MBs, .performance_Hz = perf_Hz }; } StartMessage ZMQImagePuller::GetStartMessage() const { if (start_message) return *start_message; else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Start message not received so far"); } EndMessage ZMQImagePuller::GetEndMessage() const { if (end_message) return *end_message; else throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Start message not received so far"); }