109 lines
3.5 KiB
C++
109 lines
3.5 KiB
C++
// Copyright (2019-2023) Paul Scherrer Institute
|
|
|
|
#include <cmath>
|
|
#include "ZMQImagePuller.h"
|
|
|
|
// Builds a puller around a socket of type ZMQSocketType::Pull created in `context`.
// The receive water mark and receive timeout are set from the values named
// ReceiverWaterMark and ReceiveTimeout (declared outside this file).
// When `repub_address` is non-empty, a second socket of type ZMQSocketType::Push
// is created and bound to that address; WaitForImage() forwards every received
// message through it.
ZMQImagePuller::ZMQImagePuller(ZMQContext &context, const std::string &repub_address)
        : socket(context, ZMQSocketType::Pull) {
    socket.ReceiveWaterMark(ReceiverWaterMark);
    socket.ReceiveTimeout(ReceiveTimeout);

    // Republishing is optional: without an address no second socket is created.
    if (repub_address.empty())
        return;

    repub_socket = std::make_unique<ZMQSocket>(context, ZMQSocketType::Push);
    repub_socket->SendWaterMark(100);
    // Sends on the republish socket time out after 100 ms.
    repub_socket->SendTimeout(std::chrono::milliseconds(100));
    repub_socket->Bind(repub_address);
}
|
|
|
|
void ZMQImagePuller::Connect(const std::string &in_address) {
|
|
Disconnect();
|
|
|
|
abort = 0;
|
|
addr = in_address;
|
|
socket.Connect(in_address);
|
|
}
|
|
|
|
void ZMQImagePuller::Disconnect() {
|
|
if (!addr.empty())
|
|
socket.Disconnect(addr);
|
|
addr = "";
|
|
}
|
|
|
|
// Raises the abort flag. WaitForImage() stops retrying once it sees the flag
// after an unsuccessful receive, and GetFrameType() reports Type::NONE while
// the flag is set. Connect() clears the flag again.
void ZMQImagePuller::Abort() {
    abort = 1;
}
|
|
|
|
bool ZMQImagePuller::WaitForImage() {
|
|
bool received;
|
|
|
|
do
|
|
received = socket.Receive(msg, true);
|
|
while (!received && !abort);
|
|
|
|
if (received) {
|
|
deserializer.Process(msg.data(), msg.size());
|
|
switch (deserializer.GetType()) {
|
|
case CBORStream2Deserializer::Type::START:
|
|
start_message = std::make_unique<StartMessage>(deserializer.GetStartMessage());
|
|
end_message.reset();
|
|
break;
|
|
case CBORStream2Deserializer::Type::END:
|
|
end_message = std::make_unique<EndMessage>(deserializer.GetEndMessage());
|
|
break;
|
|
case CBORStream2Deserializer::Type::IMAGE:
|
|
deserialized_image_message = std::make_unique<DataMessage>(deserializer.GetDataMessage());
|
|
break;
|
|
case CBORStream2Deserializer::Type::CALIBRATION:
|
|
calibration_message = std::make_unique<CompressedImage>(deserializer.GetCalibrationImage());
|
|
break;
|
|
case CBORStream2Deserializer::Type::NONE:
|
|
break;
|
|
}
|
|
|
|
if (repub_socket) {
|
|
// Republishing is non-blocking for images
|
|
// and blocking (with 100ms timeout) for START/END
|
|
repub_socket->Send(msg.data(), msg.size(), deserializer.GetType() != CBORStream2Deserializer::Type::IMAGE);
|
|
}
|
|
|
|
return true;
|
|
} else
|
|
return false; // This is all kinds of error
|
|
}
|
|
|
|
// Returns a reference to the most recently stored IMAGE message.
// Throws JFJochException (InputParameterInvalid) if WaitForImage() has not
// stored an image message yet.
const DataMessage &ZMQImagePuller::GetDataMessage() const {
    if (!deserialized_image_message) {
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Image message not received so far");
    }
    return *deserialized_image_message;
}
|
|
|
|
// Reports the type the deserializer currently holds, i.e. the type of the last
// message it processed. While the abort flag is set, Type::NONE is reported
// instead, regardless of the deserializer state.
CBORStream2Deserializer::Type ZMQImagePuller::GetFrameType() const {
    if (!abort) {
        return deserializer.GetType();
    }
    return CBORStream2Deserializer::Type::NONE;
}
|
|
|
|
// Returns a copy of the most recently stored START message.
// Throws JFJochException (InputParameterInvalid) if no START message has been
// stored yet.
StartMessage ZMQImagePuller::GetStartMessage() const {
    if (!start_message) {
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Start message not received so far");
    }
    return *start_message;
}
|
|
|
|
// Returns a copy of the most recently stored END message.
// Throws JFJochException (InputParameterInvalid) if no END message is stored,
// which is the case before the first END arrives and again after each START
// (WaitForImage() resets the stored END message when a START is received).
EndMessage ZMQImagePuller::GetEndMessage() const {
    if (!end_message) {
        // The text names the END message; it previously said "Start message",
        // a copy-paste slip from GetStartMessage().
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "End message not received so far");
    }
    return *end_message;
}
|
|
|
|
// Returns a copy of the most recently stored CALIBRATION image.
// Throws JFJochException (InputParameterInvalid) if no calibration message has
// been stored yet.
CompressedImage ZMQImagePuller::GetCalibrationMessage() const {
    if (!calibration_message) {
        throw JFJochException(JFJochExceptionCategory::InputParameterInvalid, "Calibration message not received so far");
    }
    return *calibration_message;
}
|