ZmqSocket prototype

This commit is contained in:
Erik Frojdh
2024-04-03 10:50:35 +02:00
parent 6644406535
commit 70b0ae2ae8
7 changed files with 239 additions and 2 deletions

View File

@ -4,6 +4,7 @@ set(SourceFiles
${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/DType.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/Frame.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ZmqSocket.cpp
)
@ -11,7 +12,7 @@ add_library(core STATIC ${SourceFiles})
target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils)
target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils libzmq)
if (AARE_PYTHON_BINDINGS)
set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON)

View File

@ -0,0 +1,97 @@
#pragma once
#include <array>
#include <map>
#include <string>
#include <cstdint>
// Socket to receive data from a ZMQ publisher
// needs to be in sync with the main library (or maybe better use the versioning in the header)
// forward declare zmq_msg_t to avoid including zmq.h in the header
class zmq_msg_t;
namespace aare{
/** zmq header structure (from slsDetectorPackage)*/
struct zmqHeader {
/** true if incoming data, false if end of acquisition */
bool data{true};
uint32_t jsonversion{0};
uint32_t dynamicRange{0};
uint64_t fileIndex{0};
/** number of detectors/port in x axis */
uint32_t ndetx{0};
/** number of detectors/port in y axis */
uint32_t ndety{0};
/** number of pixels/channels in x axis for this zmq socket */
uint32_t npixelsx{0};
/** number of pixels/channels in y axis for this zmq socket */
uint32_t npixelsy{0};
/** number of bytes for an image in this socket */
uint32_t imageSize{0};
/** frame number from detector */
uint64_t acqIndex{0};
/** frame index (starting at 0 for each acquisition) */
uint64_t frameIndex{0};
/** progress in percentage */
double progress{0};
/** file name prefix */
std::string fname;
/** header from detector */
uint64_t frameNumber{0};
uint32_t expLength{0};
uint32_t packetNumber{0};
uint64_t detSpec1{0};
uint64_t timestamp{0};
uint16_t modId{0};
uint16_t row{0};
uint16_t column{0};
uint16_t detSpec2{0};
uint32_t detSpec3{0};
uint16_t detSpec4{0};
uint8_t detType{0};
uint8_t version{0};
/** if rows of image should be flipped */
int flipRows{0};
/** quad type (eiger hardware specific) */
uint32_t quad{0};
/** true if complete image, else missing packets */
bool completeImage{false};
/** additional json header */
std::map<std::string, std::string> addJsonHeader;
/** (xmin, xmax, ymin, ymax) roi only in files written */
std::array<int, 4> rx_roi{};
};
class ZmqSocket{
void *m_context{nullptr};
void *m_socket{nullptr};
std::string m_endpoint;
int m_zmq_hwm{1000};
int m_timeout_ms{1000};
constexpr static size_t m_max_header_size = 1024;
char* m_header_buffer = new char[m_max_header_size];
bool decode_header(zmqHeader &h);
public:
ZmqSocket(const std::string& endpoint);
~ZmqSocket();
ZmqSocket(const ZmqSocket&) = delete;
ZmqSocket operator=(const ZmqSocket&) = delete;
ZmqSocket(ZmqSocket&&) = delete;
void connect();
void disconnect();
void set_zmq_hwm(int hwm);
void set_timeout_ms(int n);
int receive(zmqHeader &header, std::byte *data);
};
} // namespace aare

87
core/src/ZmqSocket.cpp Normal file
View File

@ -0,0 +1,87 @@
#include "aare/ZmqSocket.hpp"
#include <zmq.h>
#include <fmt/core.h>
namespace aare{
ZmqSocket::ZmqSocket(const std::string& endpoint):m_endpoint(endpoint){
memset(m_header_buffer, 0, m_max_header_size);
}
void ZmqSocket::connect(){
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_SUB);
fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm);
int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); //should be set before connect
if (rc)
throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno)));
int bufsize = 1024*1024*m_zmq_hwm;
fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize/(1024*1024));
rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize));
if (rc)
throw std::runtime_error(fmt::format("Could not set ZMQ_RCVBUF: {}", strerror(errno)));
zmq_connect(m_socket, m_endpoint.c_str());
zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
}
void ZmqSocket::disconnect() {
zmq_close(m_socket);
zmq_ctx_destroy(m_context);
m_socket = nullptr;
m_context = nullptr;
}
ZmqSocket::~ZmqSocket(){
if (m_socket)
disconnect();
delete[] m_header_buffer;
}
void ZmqSocket::set_zmq_hwm(int hwm){
m_zmq_hwm = hwm;
}
void ZmqSocket::set_timeout_ms(int n){
m_timeout_ms = n;
}
int ZmqSocket::receive(zmqHeader &header, std::byte *data){
//receive header
int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0);
m_header_buffer[header_bytes_received] = '\0'; //make sure we zero terminate
if (header_bytes_received < 0){
fmt::print("Error receiving header: {}\n", strerror(errno));
return -1;
}
fmt::print("Bytes: {}, Header: {}\n", header_bytes_received, m_header_buffer);
//decode header
if (!decode_header(header)){
fmt::print("Error decoding header\n");
return -1;
}
//do we have a multipart message (data following header)?
int more;
size_t more_size = sizeof(more);
zmq_getsockopt(m_socket, ZMQ_RCVMORE, &more, &more_size);
if (!more) {
return 0; //no data following header
}else{
int data_bytes_received = zmq_recv(m_socket, data, 1024*1024*2, 0); //TODO! configurable size!!!!
if (data_bytes_received == -1)
throw std::runtime_error("Got half of a multipart msg!!!");
}
return 1;
}
bool ZmqSocket::decode_header(zmqHeader &h){
//TODO: implement
return true;
}
} // namespace aare