zmq sender and receiver examples

This commit is contained in:
Bechir Braham 2024-04-04 15:51:18 +02:00
parent f772434072
commit 47cf462f3d
No known key found for this signature in database
GPG Key ID: 7F511B55FD8E9671
13 changed files with 506 additions and 115 deletions

View File

@ -5,6 +5,6 @@ channels:
dependencies:
- fmt
- pybind11
- nlohmann_json
- nlohmann_json # should be removed
- catch2
- zeromq

View File

@ -7,9 +7,19 @@ int main() {
aare::ZmqSocketReceiver socket(endpoint);
socket.connect();
char *data = new char[1024 * 1024 * 10];
aare::zmqHeader header;
aare::ZmqHeader header;
while (true) {
int rc = socket.receive(header, reinterpret_cast<std::byte *>(data));
aare::logger::info("Received header: ", header.to_string());
auto *data_int = reinterpret_cast<uint32_t *>(data);
for (int i=0;i<header.npixelsx;i++){
for (int j=0;j<header.npixelsy;j++){
// verify that the sent data is correct
assert(data_int[i*header.npixelsy+j] == (i+j));
}
}
aare::logger::info("Frame verified");
}
delete[] data;
return 0;

View File

@ -1,16 +1,36 @@
#include "aare/ZmqSocketReceiver.hpp"
#include "aare/Frame.hpp"
#include "aare/ZmqSocketSender.hpp"
#include "aare/utils/logger.hpp"
#include <fmt/core.h>
#include <string>
#include <unistd.h> // sleep
using namespace aare;
int main() {
std::string endpoint = "tcp://localhost:5555";
aare::ZmqSocketReceiver socket(endpoint);
socket.connect();
char *data = new char[1024 * 1024 * 10];
aare::zmqHeader header;
while (true) {
int rc = socket.receive(header, reinterpret_cast<std::byte *>(data));
std::string endpoint = "tcp://*:5555";
aare::ZmqSocketSender socket(endpoint);
socket.bind();
Frame frame(1024, 1024, sizeof(uint32_t) * 8);
for (int i = 0; i < 1024; i++) {
for (int j = 0; j < 1024; j++) {
frame.set(i, j, i + j);
}
}
aare::ZmqHeader header;
header.npixelsx = 1024;
header.npixelsy = 1024;
header.imageSize = sizeof(uint32_t) * 1024 * 1024;
header.dynamicRange = 32;
int i = 0;
while (true) {
aare::logger::info("Sending frame:", i++);
aare::logger::info("Header size:", sizeof(header.to_string()));
aare::logger::info("Frame size:", frame.size(), "\n");
int rc = socket.send(header, frame.data(), frame.size());
sleep(1);
}
delete[] data;
return 0;
}

View File

@ -1,6 +1,27 @@
add_library(network_io STATIC src/ZmqSocketReceiver.cpp)
FetchContent_Declare(
simdjson
GIT_REPOSITORY https://github.com/simdjson/simdjson.git
GIT_TAG tags/v3.8.0
GIT_SHALLOW TRUE
)
FetchContent_MakeAvailable(simdjson)
add_library(network_io STATIC
src/ZmqSocketReceiver.cpp
src/ZmqSocketSender.cpp
src/ZmqSocket.cpp
src/ZmqHeader.cpp
)
target_include_directories(network_io PUBLIC include)
target_link_libraries(network_io PRIVATE libzmq fmt::fmt core utils aare_compiler_flags)
target_link_libraries(network_io PRIVATE simdjson libzmq fmt::fmt core utils aare_compiler_flags )
# target_link_libraries(network_io LINK_PRIVATE )
if(AARE_PYTHON_BINDINGS)
set_property(TARGET file_io PROPERTY POSITION_INDEPENDENT_CODE ON)
@ -12,4 +33,4 @@ endif()
# )
# target_sources(tests PRIVATE ${TestSources} )
# target_link_libraries(tests PRIVATE core network_io)
# endif()
# endif()

View File

@ -0,0 +1,106 @@
#include "aare/utils/logger.hpp"
#include "simdjson.h"
#include <array>
#include <cstdint>
#include <map>
#include <string>
namespace simdjson {
/**
* @brief cast a simdjson::ondemand::value to a std::array<int,4>
* useful for writing rx_roi from json header
*/
template <> simdjson_inline simdjson::simdjson_result<std::array<int, 4>> simdjson::ondemand::value::get() noexcept {
ondemand::array array;
auto error = get_array().get(array);
if (error) {
return error;
}
std::array<int, 4> arr;
int i = 0;
for (auto v : array) {
int64_t val;
error = v.get_int64().get(val);
if (error) {
return error;
}
arr[i++] = val;
}
return arr;
}
/**
* @brief cast a simdjson::ondemand::value to a uint32_t
* adds a check for 32bit overflow
*/
template <> simdjson_inline simdjson::simdjson_result<uint32_t> simdjson::ondemand::value::get() noexcept {
size_t val;
auto error = get_uint64().get(val);
if (error) {
return error;
}
if (val > std::numeric_limits<uint32_t>::max()) {
return 1;
}
return static_cast<uint32_t>(val);
}
} // namespace simdjson
namespace aare {
/** zmq header structure (from slsDetectorPackage)*/
struct ZmqHeader {
/** true if incoming data, false if end of acquisition */
bool data{true};
uint32_t jsonversion{0};
uint32_t dynamicRange{0};
uint64_t fileIndex{0};
/** number of detectors/port in x axis */
uint32_t ndetx{0};
/** number of detectors/port in y axis */
uint32_t ndety{0};
/** number of pixels/channels in x axis for this zmq socket */
uint32_t npixelsx{0};
/** number of pixels/channels in y axis for this zmq socket */
uint32_t npixelsy{0};
/** number of bytes for an image in this socket */
uint32_t imageSize{0};
/** frame number from detector */
uint64_t acqIndex{0};
/** frame index (starting at 0 for each acquisition) */
uint64_t frameIndex{0};
/** progress in percentage */
double progress{0};
/** file name prefix */
std::string fname;
/** header from detector */
uint64_t frameNumber{0};
uint32_t expLength{0};
uint32_t packetNumber{0};
uint64_t detSpec1{0};
uint64_t timestamp{0};
uint16_t modId{0};
uint16_t row{0};
uint16_t column{0};
uint16_t detSpec2{0};
uint32_t detSpec3{0};
uint16_t detSpec4{0};
uint8_t detType{0};
uint8_t version{0};
/** if rows of image should be flipped */
int flipRows{0};
/** quad type (eiger hardware specific) */
uint32_t quad{0};
/** true if complete image, else missing packets */
bool completeImage{false};
/** additional json header */
std::map<std::string, std::string> addJsonHeader;
/** (xmin, xmax, ymin, ymax) roi only in files written */
std::array<int, 4> rx_roi{};
/** serialize struct to json string */
std::string to_string() const;
void from_string(std::string &s);
};
} // namespace aare

View File

@ -0,0 +1,40 @@
#pragma once
#include <array>
#include <map>
#include <string>
// Socket to receive data from a ZMQ publisher
// needs to be in sync with the main library (or maybe better use the versioning in the header)
// forward declare zmq_msg_t to avoid including zmq.h in the header
class zmq_msg_t;
namespace aare {
class ZmqSocket {
protected:
void *m_context{nullptr};
void *m_socket{nullptr};
std::string m_endpoint;
int m_zmq_hwm{1000};
int m_timeout_ms{1000};
size_t m_potential_frame_size{1024 * 1024};
constexpr static size_t m_max_header_size = 1024;
char *m_header_buffer = new char[m_max_header_size];
public:
ZmqSocket() = default;
~ZmqSocket();
ZmqSocket(const ZmqSocket &) = delete;
ZmqSocket operator=(const ZmqSocket &) = delete;
ZmqSocket(ZmqSocket &&) = delete;
void disconnect();
void set_zmq_hwm(int hwm);
void set_timeout_ms(int n);
void set_potential_frame_size(size_t size);
};
} // namespace aare

View File

@ -1,5 +1,8 @@
#pragma once
#include "ZmqHeader.hpp"
#include "ZmqSocket.hpp"
#include <array>
#include <cstdint>
#include <map>
@ -13,81 +16,11 @@ class zmq_msg_t;
namespace aare {
/** zmq header structure (from slsDetectorPackage)*/
struct zmqHeader {
/** true if incoming data, false if end of acquisition */
bool data{true};
uint32_t jsonversion{0};
uint32_t dynamicRange{0};
uint64_t fileIndex{0};
/** number of detectors/port in x axis */
uint32_t ndetx{0};
/** number of detectors/port in y axis */
uint32_t ndety{0};
/** number of pixels/channels in x axis for this zmq socket */
uint32_t npixelsx{0};
/** number of pixels/channels in y axis for this zmq socket */
uint32_t npixelsy{0};
/** number of bytes for an image in this socket */
uint32_t imageSize{0};
/** frame number from detector */
uint64_t acqIndex{0};
/** frame index (starting at 0 for each acquisition) */
uint64_t frameIndex{0};
/** progress in percentage */
double progress{0};
/** file name prefix */
std::string fname;
/** header from detector */
uint64_t frameNumber{0};
uint32_t expLength{0};
uint32_t packetNumber{0};
uint64_t detSpec1{0};
uint64_t timestamp{0};
uint16_t modId{0};
uint16_t row{0};
uint16_t column{0};
uint16_t detSpec2{0};
uint32_t detSpec3{0};
uint16_t detSpec4{0};
uint8_t detType{0};
uint8_t version{0};
/** if rows of image should be flipped */
int flipRows{0};
/** quad type (eiger hardware specific) */
uint32_t quad{0};
/** true if complete image, else missing packets */
bool completeImage{false};
/** additional json header */
std::map<std::string, std::string> addJsonHeader;
/** (xmin, xmax, ymin, ymax) roi only in files written */
std::array<int, 4> rx_roi{};
};
class ZmqSocketReceiver {
void *m_context{nullptr};
void *m_socket{nullptr};
std::string m_endpoint;
int m_zmq_hwm{1000};
int m_timeout_ms{1000};
constexpr static size_t m_max_header_size = 1024;
char *m_header_buffer = new char[m_max_header_size];
bool decode_header(zmqHeader &h);
class ZmqSocketReceiver : public ZmqSocket {
public:
ZmqSocketReceiver(const std::string &endpoint);
~ZmqSocketReceiver();
ZmqSocketReceiver(const ZmqSocketReceiver &) = delete;
ZmqSocketReceiver operator=(const ZmqSocketReceiver &) = delete;
ZmqSocketReceiver(ZmqSocketReceiver &&) = delete;
void connect();
void disconnect();
void set_zmq_hwm(int hwm);
void set_timeout_ms(int n);
int receive(zmqHeader &header, std::byte *data);
int receive(ZmqHeader &header, std::byte *data, bool serialized_header = false);
};
} // namespace aare

View File

@ -0,0 +1,12 @@
#pragma once
#include "ZmqHeader.hpp"
#include "ZmqSocket.hpp"
namespace aare {
class ZmqSocketSender : public ZmqSocket {
public:
ZmqSocketSender(const std::string &endpoint);
void bind();
int send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header = false);
};
} // namespace aare

View File

@ -0,0 +1,188 @@
#include "aare/ZmqHeader.hpp"
#include "simdjson.h"
using namespace simdjson;
// helper functions to write json
// append to string for better performance (not tested)
/**
* @brief write a digit to a string
* takes key and value and outputs->"key": value,
* @tparam T type of value (int, uint32_t, ...)
* @param s string to append to
* @param key key to write
* @param value value to write
* @return void
* @note
* - can't use concepts here because we are using c++17
*/
template <typename T> void write_digit(std::string &s, const std::string &key, const T &value) {
s += "\"";
s += key;
s += "\": ";
s += std::to_string(value);
s += ", ";
}
void write_str(std::string &s, const std::string &key, const std::string &value) {
s += "\"";
s += key;
s += "\": \"";
s += value;
s += "\", ";
}
void write_map(std::string &s, const std::string &key, const std::map<std::string, std::string> &value) {
s += "\"";
s += key;
s += "\": {";
for (auto &kv : value) {
write_str(s, kv.first, kv.second);
}
s += "}, ";
}
void write_array(std::string &s, const std::string &key, const std::array<int, 4> &value) {
s += "\"";
s += key;
s += "\": [";
s += std::to_string(value[0]);
s += ", ";
s += std::to_string(value[1]);
s += ", ";
s += std::to_string(value[2]);
s += ", ";
s += std::to_string(value[3]);
s += "], ";
}
namespace aare {
std::string ZmqHeader::to_string() const {
std::string s = "";
s.reserve(1024);
s += "{";
write_digit(s, "data", data ? 1 : 0);
write_digit(s, "jsonversion", jsonversion);
write_digit(s, "dynamicRange", dynamicRange);
write_digit(s, "fileIndex", fileIndex);
write_digit(s, "ndetx", ndetx);
write_digit(s, "ndety", ndety);
write_digit(s, "npixelsx", npixelsx);
write_digit(s, "npixelsy", npixelsy);
write_digit(s, "imageSize", imageSize);
write_digit(s, "acqIndex", acqIndex);
write_digit(s, "frameIndex", frameIndex);
write_digit(s, "progress", progress);
write_str(s, "fname", fname);
write_digit(s, "frameNumber", frameNumber);
write_digit(s, "expLength", expLength);
write_digit(s, "packetNumber", packetNumber);
write_digit(s, "detSpec1", detSpec1);
write_digit(s, "timestamp", timestamp);
write_digit(s, "modId", modId);
write_digit(s, "row", row);
write_digit(s, "column", column);
write_digit(s, "detSpec2", detSpec2);
write_digit(s, "detSpec3", detSpec3);
write_digit(s, "detSpec4", detSpec4);
write_digit(s, "detType", detType);
write_digit(s, "version", version);
write_digit(s, "flipRows", flipRows);
write_digit(s, "quad", quad);
write_digit(s, "completeImage", completeImage ? 1 : 0);
write_map(s, "addJsonHeader", addJsonHeader);
write_array(s, "rx_roi", rx_roi);
// remove last comma
s.pop_back();
s.pop_back();
s += "}";
return s;
}
void ZmqHeader::from_string(std::string &s) {
simdjson::padded_string ps(s.c_str(), s.size());
ondemand::parser parser;
ondemand::document doc = parser.iterate(ps);
ondemand::object object = doc.get_object();
for (auto field : object) {
std::string_view key = field.unescaped_key();
if (key == "data") {
data = uint64_t(field.value()) ? true : false;
} else if (key == "jsonversion") {
jsonversion = uint32_t(field.value());
} else if (key == "dynamicRange") {
dynamicRange = uint32_t(field.value());
} else if (key == "fileIndex") {
fileIndex = uint64_t(field.value());
} else if (key == "ndetx") {
ndetx = uint32_t(field.value());
} else if (key == "ndety") {
ndety = uint32_t(field.value());
} else if (key == "npixelsx") {
npixelsx = uint32_t(field.value());
} else if (key == "npixelsy") {
npixelsy = uint32_t(field.value());
} else if (key == "imageSize") {
imageSize = uint32_t(field.value());
} else if (key == "acqIndex") {
acqIndex = uint64_t(field.value());
} else if (key == "frameIndex") {
frameIndex = uint64_t(field.value());
} else if (key == "progress") {
progress = field.value().get_double();
} else if (key == "fname") {
std::string_view tmp = field.value().get_string();
fname = {tmp.begin(), tmp.end()};
} else if (key == "frameNumber") {
frameNumber = uint64_t(field.value());
} else if (key == "expLength") {
expLength = uint32_t(field.value());
} else if (key == "packetNumber") {
packetNumber = uint32_t(field.value());
} else if (key == "detSpec1") {
detSpec1 = uint64_t(field.value());
} else if (key == "timestamp") {
timestamp = uint64_t(field.value());
} else if (key == "modId") {
modId = uint32_t(field.value());
} else if (key == "row") {
row = uint32_t(field.value());
} else if (key == "column") {
column = uint32_t(field.value());
} else if (key == "detSpec2") {
detSpec2 = uint32_t(field.value());
} else if (key == "detSpec3") {
detSpec3 = uint32_t(field.value());
} else if (key == "detSpec4") {
detSpec4 = uint32_t(field.value());
} else if (key == "detType") {
detType = uint32_t(field.value());
} else if (key == "version") {
version = uint32_t(field.value());
} else if (key == "flipRows") {
flipRows = uint32_t(field.value());
} else if (key == "quad") {
quad = uint32_t(field.value());
} else if (key == "completeImage") {
completeImage = uint64_t(field.value()) ? true : false;
} else if (key == "addJsonHeader") {
for (auto field2 : field.value().get_object()) {
simdjson::ondemand::raw_json_string tmp;
auto error = field2.key().get(tmp);
std::string key2(tmp.raw());
std::string val;
error = field2.value().get_string(val);
addJsonHeader[key2] = std::string(val);
}
} else if (key == "rx_roi") {
rx_roi = std::array<int, 4>(field.value());
}
}
}
} // namespace aare

View File

@ -0,0 +1,26 @@
#include "aare/ZmqSocket.hpp"
#include <fmt/core.h>
#include <zmq.h>
namespace aare {
void ZmqSocket::disconnect() {
zmq_close(m_socket);
zmq_ctx_destroy(m_context);
m_socket = nullptr;
m_context = nullptr;
}
ZmqSocket::~ZmqSocket() {
if (m_socket)
disconnect();
delete[] m_header_buffer;
}
void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; }
void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; }
void ZmqSocket::set_potential_frame_size(size_t size) { m_potential_frame_size = size; }
} // namespace aare

View File

@ -1,10 +1,13 @@
#include "aare/ZmqSocketReceiver.hpp"
#include "aare/utils/logger.hpp"
#include <fmt/core.h>
#include <zmq.h>
namespace aare {
ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) : m_endpoint(endpoint) {
ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) {
m_endpoint = endpoint;
memset(m_header_buffer, 0, m_max_header_size);
}
@ -16,7 +19,7 @@ void ZmqSocketReceiver::connect() {
if (rc)
throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno)));
int bufsize = 1024 * 1024 * m_zmq_hwm;
int bufsize = m_potential_frame_size * m_zmq_hwm;
fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (1024 * 1024));
rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize));
if (rc)
@ -26,37 +29,27 @@ void ZmqSocketReceiver::connect() {
zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
}
void ZmqSocketReceiver::disconnect() {
zmq_close(m_socket);
zmq_ctx_destroy(m_context);
m_socket = nullptr;
m_context = nullptr;
}
int ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) {
ZmqSocketReceiver::~ZmqSocketReceiver() {
if (m_socket)
disconnect();
delete[] m_header_buffer;
}
if (serialized_header)
throw std::runtime_error("Not implemented");
void ZmqSocketReceiver::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; }
void ZmqSocketReceiver::set_timeout_ms(int n) { m_timeout_ms = n; }
int ZmqSocketReceiver::receive(zmqHeader &header, std::byte *data) {
int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0);
// receive header
int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0);
m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate
if (header_bytes_received < 0) {
fmt::print("Error receiving header: {}\n", strerror(errno));
return -1;
}
fmt::print("Bytes: {}, Header: {}\n", header_bytes_received, m_header_buffer);
aare::logger::debug("Bytes: ", header_bytes_received, ", Header: ", m_header_buffer);
// decode header
if (!decode_header(header)) {
fmt::print("Error decoding header\n");
// parse header
try {
std::string header_str(m_header_buffer);
header.from_string(header_str);
} catch (const simdjson::simdjson_error &e) {
aare::logger::error(LOCATION + "Error parsing header: ", e.what());
return -1;
}
@ -67,16 +60,13 @@ int ZmqSocketReceiver::receive(zmqHeader &header, std::byte *data) {
if (!more) {
return 0; // no data following header
} else {
int data_bytes_received = zmq_recv(m_socket, data, 1024 * 1024 * 2, 0); // TODO! configurable size!!!!
int data_bytes_received = zmq_recv(m_socket, data, header.imageSize, 0); // TODO! configurable size!!!!
if (data_bytes_received == -1)
throw std::runtime_error("Got half of a multipart msg!!!");
aare::logger::debug("Bytes: ", data_bytes_received);
}
return 1;
}
bool ZmqSocketReceiver::decode_header(zmqHeader &h) {
// TODO: implement
return true;
}
} // namespace aare

View File

@ -0,0 +1,32 @@
#include "aare/ZmqSocketSender.hpp"
#include <cassert>
#include <zmq.h>
namespace aare {
ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = endpoint; }
void ZmqSocketSender::bind() {
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PUB);
int rc = zmq_bind(m_socket, m_endpoint.c_str());
assert(rc == 0);
}
int ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header) {
int rc;
if (serialize_header) {
rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE);
assert(rc == sizeof(ZmqHeader));
} else {
std::string header_str = header.to_string();
rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE);
assert(rc == header_str.size());
}
int rc2 = zmq_send(m_socket, data, size, 0);
assert(rc2 == size);
return rc + rc2;
}
} // namespace aare

View File

@ -20,6 +20,19 @@ template <typename T> std::ostream &operator<<(std::ostream &out, const std::vec
return out;
}
// operator overload for std::array
template <typename T, size_t N> std::ostream &operator<<(std::ostream &out, const std::array<T, N> &v) {
out << "[";
size_t last = N - 1;
for (size_t i = 0; i < N; ++i) {
out << v[i];
if (i != last)
out << ", ";
}
out << "]";
return out;
}
namespace aare {
namespace logger {