// Copyright (2019-2024) Paul Scherrer Institute #include "ZMQStream2PusherGroup.h" #include "CBORStream2Serializer.h" ZMQStream2PusherGroup::ZMQStream2PusherGroup(ZMQContext &zmq_context, const std::vector &addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size) { if (addr.empty()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writer ZMQ address provided"); for (const auto &a : addr) pusher.emplace_back(std::make_unique (zmq_context, a, send_buffer_high_watermark, send_buffer_size)); } ZMQStream2PusherGroup::ZMQStream2PusherGroup(const std::vector &addr, int32_t send_buffer_high_watermark, int32_t send_buffer_size) { if (addr.empty()) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "No writer ZMQ address provided"); for (const auto &a : addr) pusher.emplace_back(std::make_unique (a, send_buffer_high_watermark, send_buffer_size)); } bool ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number) { if (!pusher.empty()) { auto socket_number = (image_number / images_per_file) % pusher.size(); return pusher[socket_number]->SendImage(image_data, image_size, image_number); } else return false; } void ZMQStream2PusherGroup::SendImage(const uint8_t *image_data, size_t image_size, int64_t image_number, ZeroCopyReturnValue *z) { if (!pusher.empty()) { auto socket_number = (image_number / images_per_file) % pusher.size(); pusher[socket_number]->SendImage(image_data, image_size, image_number, z); } } void ZMQStream2PusherGroup::StartDataCollection(StartMessage& message) { if (message.images_per_file < 1) throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Images per file cannot be zero or negative"); images_per_file = message.images_per_file; for (auto &p: pusher) { p->StartDataCollection(message); message.write_master_file = false; } } bool ZMQStream2PusherGroup::SendCalibration(const CompressedImage &message) { if (pusher.empty()) return false; return pusher[0]->SendCalibration(message); } bool ZMQStream2PusherGroup::EndDataCollection(const EndMessage& message) { bool ret = true; for (auto &p: pusher) { if (!p->EndDataCollection(message)) ret = false; } return ret; } std::vector ZMQStream2PusherGroup::GetAddress() { std::vector ret; for (auto &p: pusher) ret.push_back(p->GetAddress()); return ret; }