239a441ee6
Build Packages / Unit tests (push) Successful in 1h20m34s
Build Packages / build:rpm (rocky8) (push) Successful in 13m32s
Build Packages / Generate python client (push) Successful in 24s
Build Packages / build:rpm (ubuntu2404_nocuda) (push) Successful in 13m6s
Build Packages / build:rpm (rocky9_sls9) (push) Successful in 11m32s
Build Packages / XDS test (durin plugin) (push) Successful in 10m49s
Build Packages / build:rpm (ubuntu2404) (push) Successful in 14m8s
Build Packages / DIALS test (push) Successful in 14m57s
Build Packages / Build documentation (push) Successful in 47s
Build Packages / build:rpm (rocky8_nocuda) (push) Successful in 13m30s
Build Packages / build:rpm (rocky9_nocuda) (push) Successful in 14m23s
Build Packages / build:rpm (rocky8_sls9) (push) Successful in 14m40s
Build Packages / Create release (push) Has been skipped
Build Packages / build:rpm (ubuntu2204_nocuda) (push) Successful in 13m14s
Build Packages / build:rpm (ubuntu2204) (push) Successful in 11m55s
Build Packages / build:rpm (rocky9) (push) Successful in 14m23s
Build Packages / XDS test (JFJoch plugin) (push) Successful in 9m48s
Build Packages / XDS test (neggia plugin) (push) Successful in 7m10s
This is an UNSTABLE release. The release has significant modifications and bug fixes, if things go wrong, it is better to revert to 1.0.0-rc.132. * jfjoch_broker: For DECTRIS detectors, ZeroMQ link is persistent, to save time for establishing new connection * jfjoch_broker: Minor bug fixes for rare conditions Reviewed-on: #50
139 lines
4.8 KiB
C++
139 lines
4.8 KiB
C++
// SPDX-FileCopyrightText: 2024 Filip Leonarski, Paul Scherrer Institute <filip.leonarski@psi.ch>
|
|
// SPDX-License-Identifier: GPL-3.0-only
|
|
|
|
#include "ZMQImagePuller.h"
|
|
#include "../frame_serialize/CBORStream2Serializer.h"
|
|
|
|
ZMQImagePuller::ZMQImagePuller(const std::string &in_addr,
|
|
const std::string &repub_address,
|
|
const std::optional<int32_t> &rcv_watermark,
|
|
const std::optional<int32_t> &repub_watermark) :
|
|
socket (ZMQSocketType::Pull), addr(in_addr) {
|
|
auto start_time = std::chrono::steady_clock::now();
|
|
puller_thread = std::thread(&ZMQImagePuller::PullerThread, this);
|
|
cbor_thread = std::thread(&ZMQImagePuller::CBORThread, this);
|
|
|
|
socket.ReceiveWaterMark(rcv_watermark.value_or(default_receive_watermark));
|
|
socket.ReceiveTimeout(ReceiveTimeout);
|
|
socket.Connect(addr);
|
|
|
|
if (!repub_address.empty()) {
|
|
repub_socket = std::make_unique<ZMQSocket>(ZMQSocketType::Push);
|
|
repub_socket->SendWaterMark(repub_watermark.value_or(default_repub_watermark));
|
|
repub_socket->SendTimeout(RepubTimeout);
|
|
repub_socket->Bind(repub_address);
|
|
repub_thread = std::thread(&ZMQImagePuller::RepubThread, this);
|
|
}
|
|
auto end_time = std::chrono::steady_clock::now();
|
|
auto duration = std::chrono::duration<float>(end_time - start_time);
|
|
logger.Info("ZMQImagePuller connected to {} in {:.3f} s", addr, duration.count());
|
|
}
|
|
|
|
ZMQImagePuller::~ZMQImagePuller() {
|
|
ZMQImagePuller::Disconnect();
|
|
}
|
|
|
|
void ZMQImagePuller::Disconnect() {
|
|
disconnect = 1;
|
|
|
|
if (puller_thread.joinable())
|
|
puller_thread.join();
|
|
if (cbor_thread.joinable())
|
|
cbor_thread.join();
|
|
if (repub_thread.joinable())
|
|
repub_thread.join();
|
|
|
|
if (!addr.empty())
|
|
socket.Disconnect(addr);
|
|
|
|
addr = "";
|
|
}
|
|
|
|
void ZMQImagePuller::PullerThread() {
|
|
while (true) {
|
|
ImagePullerOutput ret;
|
|
ret.zmq_msg = std::make_shared<ZMQMessage>();
|
|
bool received = false;
|
|
while (!received) {
|
|
if (disconnect) {
|
|
cbor_fifo.PutBlocking(ImagePullerOutput{});
|
|
return;
|
|
}
|
|
try {
|
|
received = socket.Receive(*ret.zmq_msg, false);
|
|
if (!received)
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
|
} catch (const JFJochException &e) {
|
|
logger.ErrorException(e);
|
|
}
|
|
}
|
|
cbor_fifo.PutBlocking(ret);
|
|
}
|
|
}
|
|
|
|
void ZMQImagePuller::CBORThread() {
|
|
auto ret = cbor_fifo.GetBlocking();
|
|
|
|
while (ret.zmq_msg) {
|
|
try {
|
|
ret.cbor = CBORStream2Deserialize(ret.zmq_msg->data(), ret.zmq_msg->size());
|
|
// Even if we suspend consuming of the messages by the receiver,
|
|
// it is still reasonable to have them republished
|
|
// so republish functionality is not affected by Suspend()
|
|
if (!suspend)
|
|
outside_fifo.PutBlocking(ret);
|
|
if (repub_socket) {
|
|
if ((ret.cbor->msg_type == CBORImageType::START)
|
|
|| (ret.cbor->msg_type == CBORImageType::END))
|
|
repub_fifo.PutBlocking(ret);
|
|
else
|
|
repub_fifo.Put(ret);
|
|
}
|
|
} catch (const JFJochException &e) {
|
|
logger.ErrorException(e);
|
|
}
|
|
ret = cbor_fifo.GetBlocking();
|
|
}
|
|
if (repub_socket)
|
|
repub_fifo.PutBlocking(ret);
|
|
outside_fifo.PutBlocking(ret);
|
|
}
|
|
|
|
void ZMQImagePuller::RepubThread() {
|
|
auto ret = repub_fifo.GetBlocking();
|
|
bool repub_active = false;
|
|
|
|
while (ret.zmq_msg) {
|
|
try {
|
|
if (ret.cbor->msg_type == CBORImageType::START) {
|
|
// Start message needs to be cleaned when running republish
|
|
StartMessage msg = ret.cbor->start_message.value();
|
|
msg.writer_notification_zmq_addr = "";
|
|
std::vector<uint8_t> serialization_buffer(256*1024*1024);
|
|
CBORStream2Serializer serializer(serialization_buffer.data(), serialization_buffer.size());
|
|
serializer.SerializeSequenceStart(msg);
|
|
repub_active = repub_socket->Send(serialization_buffer.data(), serializer.GetBufferSize(), true);
|
|
if (repub_active)
|
|
logger.Info("Republish active");
|
|
} else {
|
|
if (repub_active)
|
|
repub_socket->Send(ret.zmq_msg->data(), ret.zmq_msg->size(), true);
|
|
}
|
|
} catch (const JFJochException &e) {
|
|
logger.ErrorException(e);
|
|
}
|
|
ret = repub_fifo.GetBlocking();
|
|
}
|
|
if (repub_active)
|
|
logger.Info("Republish finished");
|
|
}
|
|
|
|
void ZMQImagePuller::Suspend() {
|
|
suspend = true;
|
|
}
|
|
|
|
void ZMQImagePuller::ResumeAndClear() {
|
|
outside_fifo.Clear();
|
|
outside_fifo.ClearMaxUtilization();
|
|
suspend = false;
|
|
} |