From a2499c96d643a74c09a2e7f4cf7f2f5809d9aef5 Mon Sep 17 00:00:00 2001 From: Bechir Date: Tue, 9 Apr 2024 08:32:53 +0200 Subject: [PATCH] restructure zmq socker interface zmq socket can now return vector of frames. it knows end of transmission with header.data == 0 it can also send vector of frames --- core/include/aare/Frame.hpp | 8 ++ core/include/aare/defs.hpp | 4 +- core/src/defs.cpp | 6 +- examples/zmq_receiver_example.cpp | 32 ++++--- examples/zmq_sender_example.cpp | 29 ++++-- file_io/src/NumpyHelpers.cpp | 1 + file_io/src/SubFile.cpp | 4 +- network_io/include/aare/ZmqSocketSender.hpp | 12 --- .../aare/{ => network_io}/ZmqHeader.hpp | 4 + .../aare/{ => network_io}/ZmqSocket.hpp | 2 - .../{ => network_io}/ZmqSocketReceiver.hpp | 16 +++- .../aare/network_io/ZmqSocketSender.hpp | 16 ++++ network_io/include/aare/network_io/defs.hpp | 34 +++++++ network_io/src/ZmqHeader.cpp | 2 +- network_io/src/ZmqSocket.cpp | 3 +- network_io/src/ZmqSocketReceiver.cpp | 94 +++++++++++++------ network_io/src/ZmqSocketSender.cpp | 73 ++++++++++++-- network_io/test/ZmqHeader.test.cpp | 2 +- 18 files changed, 255 insertions(+), 87 deletions(-) delete mode 100644 network_io/include/aare/ZmqSocketSender.hpp rename network_io/include/aare/{ => network_io}/ZmqHeader.hpp (98%) rename network_io/include/aare/{ => network_io}/ZmqSocket.hpp (96%) rename network_io/include/aare/{ => network_io}/ZmqSocketReceiver.hpp (55%) create mode 100644 network_io/include/aare/network_io/ZmqSocketSender.hpp create mode 100644 network_io/include/aare/network_io/defs.hpp diff --git a/core/include/aare/Frame.hpp b/core/include/aare/Frame.hpp index e785fc2..ea5ad2d 100644 --- a/core/include/aare/Frame.hpp +++ b/core/include/aare/Frame.hpp @@ -58,6 +58,14 @@ class Frame { other.m_data = nullptr; other.m_rows = other.m_cols = other.m_bitdepth = 0; } + // copy constructor + Frame(const Frame &other) { + m_rows = other.rows(); + m_cols = other.cols(); + m_bitdepth = other.bitdepth(); + m_data = new std::byte[m_rows * m_cols * m_bitdepth / 8]; + std::memcpy(m_data, other.m_data, m_rows * m_cols * m_bitdepth / 8); + } template NDView view() { std::vector shape = {m_rows, m_cols}; diff --git a/core/include/aare/defs.hpp b/core/include/aare/defs.hpp index 8ce8037..9838be7 100644 --- a/core/include/aare/defs.hpp +++ b/core/include/aare/defs.hpp @@ -1,11 +1,11 @@ #pragma once #include -#include #include + +#include #include #include -#include #include #include diff --git a/core/src/defs.cpp b/core/src/defs.cpp index 4d6e6f8..f7d06e5 100644 --- a/core/src/defs.cpp +++ b/core/src/defs.cpp @@ -31,8 +31,7 @@ template <> DetectorType StringTo(std::string name) { else if (name == "ChipTestBoard") return DetectorType::ChipTestBoard; else { - auto msg = fmt::format("Could not decode dector from: \"{}\"", name); - throw std::runtime_error(msg); + throw std::runtime_error("Could not decode dector from: \"" + name + "\""); } } @@ -42,8 +41,7 @@ template <> TimingMode StringTo(std::string mode) { else if (mode == "trigger") return TimingMode::Trigger; else { - auto msg = fmt::format("Could not decode timing mode from: \"{}\"", mode); - throw std::runtime_error(msg); + throw std::runtime_error("Could not decode timing mode from: \"" + mode + "\""); } } diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index 888d640..47f80fc 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -1,27 +1,33 @@ -#include "aare/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/defs.hpp" + #include #include #include +using namespace aare; int main() { + // aare::logger::set_verbosity(aare::logger::DEBUG); std::string endpoint = "tcp://localhost:5555"; aare::ZmqSocketReceiver socket(endpoint); socket.connect(); - char *data = new char[1024 * 1024 * 10]; - aare::ZmqHeader header; - while (true) { - int rc = socket.receive(header, reinterpret_cast(data)); - aare::logger::info("Received bytes", rc, "Received header: ", header.to_string()); - auto *data_int = reinterpret_cast(data); - for (uint32_t i = 0; i < header.npixelsx; i++) { - for (uint32_t j = 0; j < header.npixelsy; j++) { - // verify that the sent data is correct - assert(data_int[i * header.npixelsy + j] == i + j); + std::vector v = socket.receive_n(); + aare::logger::info("Received ", v.size(), " frames"); + aare::logger::info("acquisition:", v[0].header.acqIndex); + aare::logger::info("Header size:", v[0].header.to_string().size()); + aare::logger::info("Frame size:", v[0].frame.size()); + aare::logger::info("Header:", v[0].header.to_string()); + + for (ZmqFrame zmq_frame : v) { + auto &[header, frame] = zmq_frame; + for (int i = 0; i < 1024; i++) { + for (int j = 0; j < 1024; j++) { + assert(*(uint32_t *)frame.get(i, j) == (uint32_t)i + j); + } } + aare::logger::info("Frame verified"); } - aare::logger::info("Frame verified"); } - delete[] data; return 0; } \ No newline at end of file diff --git a/examples/zmq_sender_example.cpp b/examples/zmq_sender_example.cpp index 5739d91..48f5db0 100644 --- a/examples/zmq_sender_example.cpp +++ b/examples/zmq_sender_example.cpp @@ -1,13 +1,17 @@ #include "aare/Frame.hpp" -#include "aare/ZmqSocketSender.hpp" +#include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSocketSender.hpp" +#include "aare/network_io/defs.hpp" #include "aare/utils/logger.hpp" +#include // std::time #include #include #include // sleep using namespace aare; int main() { + std::srand(std::time(nullptr)); std::string endpoint = "tcp://*:5555"; aare::ZmqSocketSender socket(endpoint); socket.bind(); @@ -23,13 +27,24 @@ int main() { header.imageSize = sizeof(uint32_t) * 1024 * 1024; header.dynamicRange = 32; - int i = 0; - while (true) { - aare::logger::info("Sending frame:", i++); - aare::logger::info("Header size:", sizeof(header.to_string())); - aare::logger::info("Frame size:", frame.size(), "\n"); + std::vector zmq_frames; + // send two exact frames - int rc = socket.send(header, frame.data(), frame.size()); + int acqid = 0; + while (true) { + zmq_frames.clear(); + header.acqIndex = acqid++; + size_t n_frames = std::rand() % 10 + 1; + + aare::logger::info("acquisition:", header.acqIndex); + aare::logger::info("Header size:", header.to_string().size()); + aare::logger::info("Frame size:", frame.size()); + aare::logger::info("Number of frames:", n_frames); + + for (size_t i = 0; i < n_frames; i++) { + zmq_frames.push_back({header, frame}); + } + size_t rc = socket.send(zmq_frames); aare::logger::info("Sent bytes", rc); sleep(1); } diff --git a/file_io/src/NumpyHelpers.cpp b/file_io/src/NumpyHelpers.cpp index 619255d..dbfa9e6 100644 --- a/file_io/src/NumpyHelpers.cpp +++ b/file_io/src/NumpyHelpers.cpp @@ -23,6 +23,7 @@ */ #include "aare/NumpyHelpers.hpp" +#include namespace aare { diff --git a/file_io/src/SubFile.cpp b/file_io/src/SubFile.cpp index f25371f..f65e7d5 100644 --- a/file_io/src/SubFile.cpp +++ b/file_io/src/SubFile.cpp @@ -1,5 +1,7 @@ #include "aare/SubFile.hpp" #include "aare/utils/logger.hpp" +#include // memcpy +#include #include // #include @@ -79,7 +81,7 @@ template size_t SubFile::read_impl_flip(std::byte *buffer) { auto src = &tmp[0]; for (int i = 0; i != this->m_rows; ++i) { - memcpy(dst, src, row_size); + std::memcpy(dst, src, row_size); dst -= row_size; src += row_size; } diff --git a/network_io/include/aare/ZmqSocketSender.hpp b/network_io/include/aare/ZmqSocketSender.hpp deleted file mode 100644 index ce2b91f..0000000 --- a/network_io/include/aare/ZmqSocketSender.hpp +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once -#include "ZmqHeader.hpp" -#include "ZmqSocket.hpp" - -namespace aare { -class ZmqSocketSender : public ZmqSocket { - public: - ZmqSocketSender(const std::string &endpoint); - void bind(); - size_t send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header = false); -}; -} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqHeader.hpp b/network_io/include/aare/network_io/ZmqHeader.hpp similarity index 98% rename from network_io/include/aare/ZmqHeader.hpp rename to network_io/include/aare/network_io/ZmqHeader.hpp index 6096282..392bfc5 100644 --- a/network_io/include/aare/ZmqHeader.hpp +++ b/network_io/include/aare/network_io/ZmqHeader.hpp @@ -1,4 +1,7 @@ +#pragma once +#include "aare/Frame.hpp" #include "aare/utils/logger.hpp" + #include "simdjson.h" #include #include @@ -135,4 +138,5 @@ struct ZmqHeader { // compare operator bool operator==(const ZmqHeader &other) const; }; + } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqSocket.hpp b/network_io/include/aare/network_io/ZmqSocket.hpp similarity index 96% rename from network_io/include/aare/ZmqSocket.hpp rename to network_io/include/aare/network_io/ZmqSocket.hpp index 474e054..1338658 100644 --- a/network_io/include/aare/ZmqSocket.hpp +++ b/network_io/include/aare/network_io/ZmqSocket.hpp @@ -1,7 +1,5 @@ #pragma once -#include -#include #include // Socket to receive data from a ZMQ publisher diff --git a/network_io/include/aare/ZmqSocketReceiver.hpp b/network_io/include/aare/network_io/ZmqSocketReceiver.hpp similarity index 55% rename from network_io/include/aare/ZmqSocketReceiver.hpp rename to network_io/include/aare/network_io/ZmqSocketReceiver.hpp index 894c750..793d196 100644 --- a/network_io/include/aare/ZmqSocketReceiver.hpp +++ b/network_io/include/aare/network_io/ZmqSocketReceiver.hpp @@ -1,11 +1,11 @@ #pragma once -#include "ZmqHeader.hpp" -#include "ZmqSocket.hpp" +#include "aare/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSocket.hpp" +#include "aare/network_io/defs.hpp" -#include #include -#include #include // Socket to receive data from a ZMQ publisher @@ -20,7 +20,13 @@ class ZmqSocketReceiver : public ZmqSocket { public: ZmqSocketReceiver(const std::string &endpoint); void connect(); - size_t receive(ZmqHeader &header, std::byte *data, bool serialized_header = false); + ZmqFrame receive(ZmqFrame &zmq_frame); + std::vector receive_n(); + + private: + int receive_data(std::byte *data, size_t size); + ZmqFrame receive_zmqframe(); + ZmqHeader receive_header(); }; } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/ZmqSocketSender.hpp b/network_io/include/aare/network_io/ZmqSocketSender.hpp new file mode 100644 index 0000000..cbba321 --- /dev/null +++ b/network_io/include/aare/network_io/ZmqSocketSender.hpp @@ -0,0 +1,16 @@ +#pragma once +#include "aare/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSocket.hpp" +#include "aare/network_io/defs.hpp" + +namespace aare { +class ZmqSocketSender : public ZmqSocket { + public: + ZmqSocketSender(const std::string &endpoint); + void bind(); + size_t send(const ZmqHeader &header, const std::byte *data, size_t size); + size_t send(const ZmqFrame &zmq_frame); + size_t send(const std::vector &zmq_frames); +}; +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/defs.hpp b/network_io/include/aare/network_io/defs.hpp new file mode 100644 index 0000000..c97dcf8 --- /dev/null +++ b/network_io/include/aare/network_io/defs.hpp @@ -0,0 +1,34 @@ +#pragma once +#include "aare/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" + +#include +#include + +namespace aare { +/** + * @brief ZmqFrame structure + * wrapper class to contain a ZmqHeader and a Frame + */ +struct ZmqFrame { + ZmqHeader header; + Frame frame; +}; + +namespace network_io { +/** + * @brief NetworkError exception class + */ +class NetworkError : public std::runtime_error { + private: + const char *m_msg; + + public: + NetworkError(const char *msg) : std::runtime_error(msg), m_msg(msg) {} + NetworkError(const std::string msg) : std::runtime_error(msg) { m_msg = strdup(msg.c_str()); } + virtual const char *what() const noexcept override { return m_msg; } +}; + +} // namespace network_io + +} // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqHeader.cpp b/network_io/src/ZmqHeader.cpp index 3eff459..fdc67a2 100644 --- a/network_io/src/ZmqHeader.cpp +++ b/network_io/src/ZmqHeader.cpp @@ -1,5 +1,5 @@ -#include "aare/ZmqHeader.hpp" +#include "aare/network_io/ZmqHeader.hpp" #include "simdjson.h" diff --git a/network_io/src/ZmqSocket.cpp b/network_io/src/ZmqSocket.cpp index 47ddf79..6200aac 100644 --- a/network_io/src/ZmqSocket.cpp +++ b/network_io/src/ZmqSocket.cpp @@ -1,5 +1,4 @@ -#include "aare/ZmqSocket.hpp" -#include +#include "aare/network_io/ZmqSocket.hpp" #include namespace aare { diff --git a/network_io/src/ZmqSocketReceiver.cpp b/network_io/src/ZmqSocketReceiver.cpp index 2d395ea..36559b2 100644 --- a/network_io/src/ZmqSocketReceiver.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -1,4 +1,4 @@ -#include "aare/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" #include "aare/utils/logger.hpp" #include @@ -6,69 +6,109 @@ namespace aare { +/** + * @brief Construct a new ZmqSocketReceiver object + */ ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) { m_endpoint = endpoint; memset(m_header_buffer, 0, m_max_header_size); } +/** + * @brief Connect to the given endpoint + * subscribe to a Zmq published + */ void ZmqSocketReceiver::connect() { m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_SUB); fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm); int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); // should be set before connect if (rc) - throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno))); + throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno))); int bufsize = m_potential_frame_size * m_zmq_hwm; fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (1024 * 1024)); rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize)); if (rc) - throw std::runtime_error(fmt::format("Could not set ZMQ_RCVBUF: {}", strerror(errno))); + throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVBUF: {}", strerror(errno))); zmq_connect(m_socket, m_endpoint.c_str()); zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); } -size_t ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) { - - size_t data_bytes_received{}; - - if (serialized_header) - throw std::runtime_error("Not implemented"); +/** + * @brief receive a ZmqHeader + * @return ZmqHeader + */ +ZmqHeader ZmqSocketReceiver::receive_header() { + // receive string ZmqHeader size_t header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); - // receive header m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate if (header_bytes_received < 0) { - fmt::print("Error receiving header: {}\n", strerror(errno)); - return -1; + throw network_io::NetworkError(LOCATION + "Error receiving header"); } aare::logger::debug("Bytes: ", header_bytes_received, ", Header: ", m_header_buffer); // parse header + ZmqHeader header; try { std::string header_str(m_header_buffer); header.from_string(header_str); } catch (const simdjson::simdjson_error &e) { - aare::logger::error(LOCATION + "Error parsing header: ", e.what()); - return -1; + throw network_io::NetworkError(LOCATION + "Error parsing header: " + e.what()); + } + return header; +} + +/** + * @brief receive data following a ZmqHeader + * @param data pointer to data + * @param size size of data + * @return ZmqHeader + */ +int ZmqSocketReceiver::receive_data(std::byte *data, size_t size) { + int data_bytes_received = zmq_recv(m_socket, data, size, 0); + if (data_bytes_received == -1) + network_io::NetworkError("Got half of a multipart msg!!!"); + aare::logger::debug("Bytes: ", data_bytes_received); + + return data_bytes_received; +} + +ZmqFrame ZmqSocketReceiver::receive_zmqframe() { + // receive header from zmq and parse it + ZmqHeader header = receive_header(); + if (!header.data) { + // no data following header + return {header, Frame(0, 0, 0)}; } - // do we have a multipart message (data following header)? - int more; - size_t more_size = sizeof(more); - zmq_getsockopt(m_socket, ZMQ_RCVMORE, &more, &more_size); - if (!more) { - return 0; // no data following header - } else { - - data_bytes_received = zmq_recv(m_socket, data, header.imageSize, 0); // TODO! configurable size!!!! - if (data_bytes_received == -1) - throw std::runtime_error("Got half of a multipart msg!!!"); - aare::logger::debug("Bytes: ", data_bytes_received); + // receive frame data + Frame frame(header.npixelsx, header.npixelsy, header.dynamicRange); + int bytes_received = receive_data(frame.data(), frame.size()); + if (bytes_received == -1) { + throw network_io::NetworkError(LOCATION + "Error receiving frame"); } - return data_bytes_received + header_bytes_received; + if ((uint32_t)bytes_received != header.imageSize) { + throw network_io::NetworkError( + fmt::format("{} Expected {} bytes but received {}", LOCATION, header.imageSize, bytes_received)); + } + return {header, std::move(frame)}; +} + +std::vector ZmqSocketReceiver::receive_n() { + std::vector frames; + while (true) { + // receive header and frame + ZmqFrame zmq_frame = receive_zmqframe(); + if (!zmq_frame.header.data) { + break; + } + frames.push_back(zmq_frame); + } + return frames; } } // namespace aare diff --git a/network_io/src/ZmqSocketSender.cpp b/network_io/src/ZmqSocketSender.cpp index ba296c6..777e680 100644 --- a/network_io/src/ZmqSocketSender.cpp +++ b/network_io/src/ZmqSocketSender.cpp @@ -1,11 +1,18 @@ -#include "aare/ZmqSocketSender.hpp" - +#include "aare/network_io/ZmqSocketSender.hpp" #include #include namespace aare { + +/** + * Constructor + * @param endpoint ZMQ endpoint + */ ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = endpoint; } +/** + * bind to the given port + */ void ZmqSocketSender::bind() { m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_PUB); @@ -13,15 +20,23 @@ void ZmqSocketSender::bind() { assert(rc == 0); } -size_t ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header) { +/** + * send a header and data + * @param header + * @param data pointer to data + * @param size size of data + * @return number of bytes sent + */ +size_t ZmqSocketSender::send(const ZmqHeader &header, const std::byte *data, size_t size) { size_t rc; - if (serialize_header) { - rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE); - assert(rc == sizeof(ZmqHeader)); - } else { - std::string header_str = header.to_string(); - rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE); - assert(rc == header_str.size()); + // if (serialize_header) { + // rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE); + // assert(rc == sizeof(ZmqHeader)); + std::string header_str = header.to_string(); + rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE); + assert(rc == header_str.size()); + if (data == nullptr) { + return rc; } size_t rc2 = zmq_send(m_socket, data, size, 0); @@ -29,4 +44,42 @@ size_t ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t si return rc + rc2; } +/** + * Send a frame with a header + * @param ZmqFrame that contains a header and a frame + * @return number of bytes sent + */ +size_t ZmqSocketSender::send(const ZmqFrame &zmq_frame) { + const Frame &frame = zmq_frame.frame; + // send frame + size_t rc = send(zmq_frame.header, frame.data(), frame.size()); + // send end of message header + ZmqHeader end_header = zmq_frame.header; + end_header.data = false; + size_t rc2 = send(end_header, nullptr, 0); + + return rc + rc2; +} + +/** + * Send a vector of headers and frames + * @param zmq_frames vector of ZmqFrame + * @return number of bytes sent + */ +size_t ZmqSocketSender::send(const std::vector &zmq_frames) { + size_t rc = 0; + for (size_t i = 0; i < zmq_frames.size(); i++) { + const ZmqHeader &header = zmq_frames[i].header; + const Frame &frame = zmq_frames[i].frame; + // send header and frame + if (i < zmq_frames.size() - 1) { + // send header and frame + rc += send(header, frame.data(), frame.size()); + } else { + // send header, frame and end of message header + rc += send({header, frame}); + } + } + return rc; +} } // namespace aare \ No newline at end of file diff --git a/network_io/test/ZmqHeader.test.cpp b/network_io/test/ZmqHeader.test.cpp index 787dfb6..c7769ff 100644 --- a/network_io/test/ZmqHeader.test.cpp +++ b/network_io/test/ZmqHeader.test.cpp @@ -1,4 +1,4 @@ -#include "aare/ZmqHeader.hpp" +#include "aare/network_io/ZmqHeader.hpp" #include "aare/utils/logger.hpp" #include