From f77243407263819df6e473aed2182cdacd9f9e20 Mon Sep 17 00:00:00 2001 From: Bechir Braham Date: Wed, 3 Apr 2024 16:40:00 +0200 Subject: [PATCH 1/6] add network_io folder --- .github/workflows/common-workflow.yml | 2 +- CMakeLists.txt | 3 ++- core/CMakeLists.txt | 3 +-- examples/CMakeLists.txt | 4 ++-- ...mq_example.cpp => zmq_receiver_example.cpp} | 4 ++-- examples/zmq_sender_example.cpp | 16 ++++++++++++++++ network_io/CMakeLists.txt | 15 +++++++++++++++ .../include/aare/ZmqSocketReceiver.hpp | 12 ++++++------ .../src/ZmqSocketReceiver.cpp | 18 +++++++++--------- 9 files changed, 54 insertions(+), 23 deletions(-) rename examples/{zmq_example.cpp => zmq_receiver_example.cpp} (80%) create mode 100644 examples/zmq_sender_example.cpp create mode 100644 network_io/CMakeLists.txt rename core/include/aare/ZmqSocket.hpp => network_io/include/aare/ZmqSocketReceiver.hpp (89%) rename core/src/ZmqSocket.cpp => network_io/src/ZmqSocketReceiver.cpp (81%) diff --git a/.github/workflows/common-workflow.yml b/.github/workflows/common-workflow.yml index 701fcaa..d0b0811 100644 --- a/.github/workflows/common-workflow.yml +++ b/.github/workflows/common-workflow.yml @@ -45,7 +45,7 @@ jobs: pwd export PROJECT_ROOT_DIR="." ls build/examples/*_example - find build/examples -name "*_example" -not -name "zmq_example" | xargs -I {} -n 1 -t bash -c {} + find build/examples -name "*_example" -not -name "zmq_*" | xargs -I {} -n 1 -t bash -c {} diff --git a/CMakeLists.txt b/CMakeLists.txt index d330d16..14ed946 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -148,11 +148,12 @@ endif() add_subdirectory(core) add_subdirectory(file_io) add_subdirectory(utils) +add_subdirectory(network_io) #Overall target to link to when using the library add_library(aare INTERFACE) -target_link_libraries(aare INTERFACE core file_io utils) +target_link_libraries(aare INTERFACE core file_io utils network_io) target_include_directories(aare INTERFACE $ $ diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 0a08c84..460e0f5 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -4,7 +4,6 @@ set(SourceFiles ${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/DType.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/Frame.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/src/ZmqSocket.cpp ) @@ -12,7 +11,7 @@ add_library(core STATIC ${SourceFiles}) target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils libzmq) +target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils ) if (AARE_PYTHON_BINDINGS) set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2d8bb9c..dfe6150 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,6 +1,6 @@ -set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;zmq_example;") -set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example") +set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;") +set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_receiver_example;zmq_sender_example;") foreach(example ${EXAMPLE_LIST}) add_executable(${example} ${example}.cpp) target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags) diff --git a/examples/zmq_example.cpp b/examples/zmq_receiver_example.cpp similarity index 80% rename from examples/zmq_example.cpp rename to examples/zmq_receiver_example.cpp index 05747e6..08e3c11 100644 --- a/examples/zmq_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -1,10 +1,10 @@ -#include "aare/ZmqSocket.hpp" +#include "aare/ZmqSocketReceiver.hpp" #include #include int main() { std::string endpoint = "tcp://localhost:5555"; - aare::ZmqSocket socket(endpoint); + aare::ZmqSocketReceiver socket(endpoint); socket.connect(); char *data = new char[1024 * 1024 * 10]; aare::zmqHeader header; diff --git a/examples/zmq_sender_example.cpp b/examples/zmq_sender_example.cpp new file mode 100644 index 0000000..08e3c11 --- /dev/null +++ b/examples/zmq_sender_example.cpp @@ -0,0 +1,16 @@ +#include "aare/ZmqSocketReceiver.hpp" +#include +#include + +int main() { + std::string endpoint = "tcp://localhost:5555"; + aare::ZmqSocketReceiver socket(endpoint); + socket.connect(); + char *data = new char[1024 * 1024 * 10]; + aare::zmqHeader header; + while (true) { + int rc = socket.receive(header, reinterpret_cast(data)); + } + delete[] data; + return 0; +} \ No newline at end of file diff --git a/network_io/CMakeLists.txt b/network_io/CMakeLists.txt new file mode 100644 index 0000000..ab6bcfa --- /dev/null +++ b/network_io/CMakeLists.txt @@ -0,0 +1,15 @@ +add_library(network_io STATIC src/ZmqSocketReceiver.cpp) +target_include_directories(network_io PUBLIC include) +target_link_libraries(network_io PRIVATE libzmq fmt::fmt core utils aare_compiler_flags) + +if(AARE_PYTHON_BINDINGS) +set_property(TARGET file_io PROPERTY POSITION_INDEPENDENT_CODE ON) +endif() + +# if(AARE_TESTS) +# set(TestSources +# ${CMAKE_CURRENT_SOURCE_DIR}/test/NumpyFile.test.cpp +# ) +# target_sources(tests PRIVATE ${TestSources} ) +# target_link_libraries(tests PRIVATE core network_io) +# endif() \ No newline at end of file diff --git a/core/include/aare/ZmqSocket.hpp b/network_io/include/aare/ZmqSocketReceiver.hpp similarity index 89% rename from core/include/aare/ZmqSocket.hpp rename to network_io/include/aare/ZmqSocketReceiver.hpp index 67859df..f4eaf88 100644 --- a/core/include/aare/ZmqSocket.hpp +++ b/network_io/include/aare/ZmqSocketReceiver.hpp @@ -64,7 +64,7 @@ struct zmqHeader { std::array rx_roi{}; }; -class ZmqSocket { +class ZmqSocketReceiver { void *m_context{nullptr}; void *m_socket{nullptr}; std::string m_endpoint; @@ -76,11 +76,11 @@ class ZmqSocket { bool decode_header(zmqHeader &h); public: - ZmqSocket(const std::string &endpoint); - ~ZmqSocket(); - ZmqSocket(const ZmqSocket &) = delete; - ZmqSocket operator=(const ZmqSocket &) = delete; - ZmqSocket(ZmqSocket &&) = delete; + ZmqSocketReceiver(const std::string &endpoint); + ~ZmqSocketReceiver(); + ZmqSocketReceiver(const ZmqSocketReceiver &) = delete; + ZmqSocketReceiver operator=(const ZmqSocketReceiver &) = delete; + ZmqSocketReceiver(ZmqSocketReceiver &&) = delete; void connect(); void disconnect(); diff --git a/core/src/ZmqSocket.cpp b/network_io/src/ZmqSocketReceiver.cpp similarity index 81% rename from core/src/ZmqSocket.cpp rename to network_io/src/ZmqSocketReceiver.cpp index ab4fb2b..8b2592a 100644 --- a/core/src/ZmqSocket.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -1,14 +1,14 @@ -#include "aare/ZmqSocket.hpp" +#include "aare/ZmqSocketReceiver.hpp" #include #include namespace aare { -ZmqSocket::ZmqSocket(const std::string &endpoint) : m_endpoint(endpoint) { +ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) : m_endpoint(endpoint) { memset(m_header_buffer, 0, m_max_header_size); } -void ZmqSocket::connect() { +void ZmqSocketReceiver::connect() { m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_SUB); fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm); @@ -26,24 +26,24 @@ void ZmqSocket::connect() { zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); } -void ZmqSocket::disconnect() { +void ZmqSocketReceiver::disconnect() { zmq_close(m_socket); zmq_ctx_destroy(m_context); m_socket = nullptr; m_context = nullptr; } -ZmqSocket::~ZmqSocket() { +ZmqSocketReceiver::~ZmqSocketReceiver() { if (m_socket) disconnect(); delete[] m_header_buffer; } -void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; } +void ZmqSocketReceiver::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; } -void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; } +void ZmqSocketReceiver::set_timeout_ms(int n) { m_timeout_ms = n; } -int ZmqSocket::receive(zmqHeader &header, std::byte *data) { +int ZmqSocketReceiver::receive(zmqHeader &header, std::byte *data) { // receive header int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); @@ -74,7 +74,7 @@ int ZmqSocket::receive(zmqHeader &header, std::byte *data) { return 1; } -bool ZmqSocket::decode_header(zmqHeader &h) { +bool ZmqSocketReceiver::decode_header(zmqHeader &h) { // TODO: implement return true; } From 47cf462f3d27b8853882d06b0dcd930e11ab35e6 Mon Sep 17 00:00:00 2001 From: Bechir Braham Date: Thu, 4 Apr 2024 15:51:18 +0200 Subject: [PATCH 2/6] zmq sender and receiver examples --- aare-environment.yml | 2 +- examples/zmq_receiver_example.cpp | 12 +- examples/zmq_sender_example.cpp | 38 +++- network_io/CMakeLists.txt | 27 ++- network_io/include/aare/ZmqHeader.hpp | 106 ++++++++++ network_io/include/aare/ZmqSocket.hpp | 40 ++++ network_io/include/aare/ZmqSocketReceiver.hpp | 77 +------ network_io/include/aare/ZmqSocketSender.hpp | 12 ++ network_io/src/ZmqHeader.cpp | 188 ++++++++++++++++++ network_io/src/ZmqSocket.cpp | 26 +++ network_io/src/ZmqSocketReceiver.cpp | 48 ++--- network_io/src/ZmqSocketSender.cpp | 32 +++ utils/include/aare/utils/logger.hpp | 13 ++ 13 files changed, 506 insertions(+), 115 deletions(-) create mode 100644 network_io/include/aare/ZmqHeader.hpp create mode 100644 network_io/include/aare/ZmqSocket.hpp create mode 100644 network_io/include/aare/ZmqSocketSender.hpp create mode 100644 network_io/src/ZmqHeader.cpp create mode 100644 network_io/src/ZmqSocket.cpp create mode 100644 network_io/src/ZmqSocketSender.cpp diff --git a/aare-environment.yml b/aare-environment.yml index d2babc2..b8112e2 100644 --- a/aare-environment.yml +++ b/aare-environment.yml @@ -5,6 +5,6 @@ channels: dependencies: - fmt - pybind11 - - nlohmann_json + - nlohmann_json # should be removed - catch2 - zeromq diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index 08e3c11..ac615a4 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -7,9 +7,19 @@ int main() { aare::ZmqSocketReceiver socket(endpoint); socket.connect(); char *data = new char[1024 * 1024 * 10]; - aare::zmqHeader header; + aare::ZmqHeader header; + while (true) { int rc = socket.receive(header, reinterpret_cast(data)); + aare::logger::info("Received header: ", header.to_string()); + auto *data_int = reinterpret_cast(data); + for (int i=0;i #include +#include // sleep +using namespace aare; int main() { - std::string endpoint = "tcp://localhost:5555"; - aare::ZmqSocketReceiver socket(endpoint); - socket.connect(); - char *data = new char[1024 * 1024 * 10]; - aare::zmqHeader header; - while (true) { - int rc = socket.receive(header, reinterpret_cast(data)); + std::string endpoint = "tcp://*:5555"; + aare::ZmqSocketSender socket(endpoint); + socket.bind(); + Frame frame(1024, 1024, sizeof(uint32_t) * 8); + for (int i = 0; i < 1024; i++) { + for (int j = 0; j < 1024; j++) { + frame.set(i, j, i + j); + } + } + aare::ZmqHeader header; + header.npixelsx = 1024; + header.npixelsy = 1024; + header.imageSize = sizeof(uint32_t) * 1024 * 1024; + header.dynamicRange = 32; + + int i = 0; + while (true) { + aare::logger::info("Sending frame:", i++); + aare::logger::info("Header size:", sizeof(header.to_string())); + aare::logger::info("Frame size:", frame.size(), "\n"); + + int rc = socket.send(header, frame.data(), frame.size()); + sleep(1); } - delete[] data; return 0; } \ No newline at end of file diff --git a/network_io/CMakeLists.txt b/network_io/CMakeLists.txt index ab6bcfa..02b23cc 100644 --- a/network_io/CMakeLists.txt +++ b/network_io/CMakeLists.txt @@ -1,6 +1,27 @@ -add_library(network_io STATIC src/ZmqSocketReceiver.cpp) + +FetchContent_Declare( + simdjson + GIT_REPOSITORY https://github.com/simdjson/simdjson.git + GIT_TAG tags/v3.8.0 + GIT_SHALLOW TRUE +) + +FetchContent_MakeAvailable(simdjson) + + + +add_library(network_io STATIC + src/ZmqSocketReceiver.cpp + src/ZmqSocketSender.cpp + src/ZmqSocket.cpp + src/ZmqHeader.cpp + ) + + + target_include_directories(network_io PUBLIC include) -target_link_libraries(network_io PRIVATE libzmq fmt::fmt core utils aare_compiler_flags) +target_link_libraries(network_io PRIVATE simdjson libzmq fmt::fmt core utils aare_compiler_flags ) +# target_link_libraries(network_io LINK_PRIVATE ) if(AARE_PYTHON_BINDINGS) set_property(TARGET file_io PROPERTY POSITION_INDEPENDENT_CODE ON) @@ -12,4 +33,4 @@ endif() # ) # target_sources(tests PRIVATE ${TestSources} ) # target_link_libraries(tests PRIVATE core network_io) -# endif() \ No newline at end of file +# endif() diff --git a/network_io/include/aare/ZmqHeader.hpp b/network_io/include/aare/ZmqHeader.hpp new file mode 100644 index 0000000..64ff7ca --- /dev/null +++ b/network_io/include/aare/ZmqHeader.hpp @@ -0,0 +1,106 @@ +#include "aare/utils/logger.hpp" +#include "simdjson.h" +#include +#include +#include +#include +namespace simdjson { +/** + * @brief cast a simdjson::ondemand::value to a std::array + * useful for writing rx_roi from json header + */ +template <> simdjson_inline simdjson::simdjson_result> simdjson::ondemand::value::get() noexcept { + ondemand::array array; + auto error = get_array().get(array); + if (error) { + return error; + } + std::array arr; + int i = 0; + for (auto v : array) { + int64_t val; + error = v.get_int64().get(val); + + if (error) { + return error; + } + arr[i++] = val; + } + return arr; +} + +/** + * @brief cast a simdjson::ondemand::value to a uint32_t + * adds a check for 32bit overflow + */ +template <> simdjson_inline simdjson::simdjson_result simdjson::ondemand::value::get() noexcept { + size_t val; + auto error = get_uint64().get(val); + if (error) { + return error; + } + if (val > std::numeric_limits::max()) { + return 1; + } + return static_cast(val); +} + +} // namespace simdjson + +namespace aare { + +/** zmq header structure (from slsDetectorPackage)*/ +struct ZmqHeader { + /** true if incoming data, false if end of acquisition */ + bool data{true}; + uint32_t jsonversion{0}; + uint32_t dynamicRange{0}; + uint64_t fileIndex{0}; + /** number of detectors/port in x axis */ + uint32_t ndetx{0}; + /** number of detectors/port in y axis */ + uint32_t ndety{0}; + /** number of pixels/channels in x axis for this zmq socket */ + uint32_t npixelsx{0}; + /** number of pixels/channels in y axis for this zmq socket */ + uint32_t npixelsy{0}; + /** number of bytes for an image in this socket */ + uint32_t imageSize{0}; + /** frame number from detector */ + uint64_t acqIndex{0}; + /** frame index (starting at 0 for each acquisition) */ + uint64_t frameIndex{0}; + /** progress in percentage */ + double progress{0}; + /** file name prefix */ + std::string fname; + /** header from detector */ + uint64_t frameNumber{0}; + uint32_t expLength{0}; + uint32_t packetNumber{0}; + uint64_t detSpec1{0}; + uint64_t timestamp{0}; + uint16_t modId{0}; + uint16_t row{0}; + uint16_t column{0}; + uint16_t detSpec2{0}; + uint32_t detSpec3{0}; + uint16_t detSpec4{0}; + uint8_t detType{0}; + uint8_t version{0}; + /** if rows of image should be flipped */ + int flipRows{0}; + /** quad type (eiger hardware specific) */ + uint32_t quad{0}; + /** true if complete image, else missing packets */ + bool completeImage{false}; + /** additional json header */ + std::map addJsonHeader; + /** (xmin, xmax, ymin, ymax) roi only in files written */ + std::array rx_roi{}; + + /** serialize struct to json string */ + std::string to_string() const; + void from_string(std::string &s); +}; +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqSocket.hpp b/network_io/include/aare/ZmqSocket.hpp new file mode 100644 index 0000000..474e054 --- /dev/null +++ b/network_io/include/aare/ZmqSocket.hpp @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include + +// Socket to receive data from a ZMQ publisher +// needs to be in sync with the main library (or maybe better use the versioning in the header) + +// forward declare zmq_msg_t to avoid including zmq.h in the header +class zmq_msg_t; + +namespace aare { + +class ZmqSocket { + protected: + void *m_context{nullptr}; + void *m_socket{nullptr}; + std::string m_endpoint; + int m_zmq_hwm{1000}; + int m_timeout_ms{1000}; + size_t m_potential_frame_size{1024 * 1024}; + constexpr static size_t m_max_header_size = 1024; + char *m_header_buffer = new char[m_max_header_size]; + + public: + ZmqSocket() = default; + ~ZmqSocket(); + + ZmqSocket(const ZmqSocket &) = delete; + ZmqSocket operator=(const ZmqSocket &) = delete; + ZmqSocket(ZmqSocket &&) = delete; + + void disconnect(); + void set_zmq_hwm(int hwm); + void set_timeout_ms(int n); + void set_potential_frame_size(size_t size); +}; + +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqSocketReceiver.hpp b/network_io/include/aare/ZmqSocketReceiver.hpp index f4eaf88..8c4a22b 100644 --- a/network_io/include/aare/ZmqSocketReceiver.hpp +++ b/network_io/include/aare/ZmqSocketReceiver.hpp @@ -1,5 +1,8 @@ #pragma once +#include "ZmqHeader.hpp" +#include "ZmqSocket.hpp" + #include #include #include @@ -13,81 +16,11 @@ class zmq_msg_t; namespace aare { -/** zmq header structure (from slsDetectorPackage)*/ -struct zmqHeader { - /** true if incoming data, false if end of acquisition */ - bool data{true}; - uint32_t jsonversion{0}; - uint32_t dynamicRange{0}; - uint64_t fileIndex{0}; - /** number of detectors/port in x axis */ - uint32_t ndetx{0}; - /** number of detectors/port in y axis */ - uint32_t ndety{0}; - /** number of pixels/channels in x axis for this zmq socket */ - uint32_t npixelsx{0}; - /** number of pixels/channels in y axis for this zmq socket */ - uint32_t npixelsy{0}; - /** number of bytes for an image in this socket */ - uint32_t imageSize{0}; - /** frame number from detector */ - uint64_t acqIndex{0}; - /** frame index (starting at 0 for each acquisition) */ - uint64_t frameIndex{0}; - /** progress in percentage */ - double progress{0}; - /** file name prefix */ - std::string fname; - /** header from detector */ - uint64_t frameNumber{0}; - uint32_t expLength{0}; - uint32_t packetNumber{0}; - uint64_t detSpec1{0}; - uint64_t timestamp{0}; - uint16_t modId{0}; - uint16_t row{0}; - uint16_t column{0}; - uint16_t detSpec2{0}; - uint32_t detSpec3{0}; - uint16_t detSpec4{0}; - uint8_t detType{0}; - uint8_t version{0}; - /** if rows of image should be flipped */ - int flipRows{0}; - /** quad type (eiger hardware specific) */ - uint32_t quad{0}; - /** true if complete image, else missing packets */ - bool completeImage{false}; - /** additional json header */ - std::map addJsonHeader; - /** (xmin, xmax, ymin, ymax) roi only in files written */ - std::array rx_roi{}; -}; - -class ZmqSocketReceiver { - void *m_context{nullptr}; - void *m_socket{nullptr}; - std::string m_endpoint; - int m_zmq_hwm{1000}; - int m_timeout_ms{1000}; - constexpr static size_t m_max_header_size = 1024; - char *m_header_buffer = new char[m_max_header_size]; - - bool decode_header(zmqHeader &h); - +class ZmqSocketReceiver : public ZmqSocket { public: ZmqSocketReceiver(const std::string &endpoint); - ~ZmqSocketReceiver(); - ZmqSocketReceiver(const ZmqSocketReceiver &) = delete; - ZmqSocketReceiver operator=(const ZmqSocketReceiver &) = delete; - ZmqSocketReceiver(ZmqSocketReceiver &&) = delete; - void connect(); - void disconnect(); - void set_zmq_hwm(int hwm); - void set_timeout_ms(int n); - - int receive(zmqHeader &header, std::byte *data); + int receive(ZmqHeader &header, std::byte *data, bool serialized_header = false); }; } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqSocketSender.hpp b/network_io/include/aare/ZmqSocketSender.hpp new file mode 100644 index 0000000..d5a7767 --- /dev/null +++ b/network_io/include/aare/ZmqSocketSender.hpp @@ -0,0 +1,12 @@ +#pragma once +#include "ZmqHeader.hpp" +#include "ZmqSocket.hpp" + +namespace aare { +class ZmqSocketSender : public ZmqSocket { + public: + ZmqSocketSender(const std::string &endpoint); + void bind(); + int send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header = false); +}; +} // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqHeader.cpp b/network_io/src/ZmqHeader.cpp new file mode 100644 index 0000000..93a81fc --- /dev/null +++ b/network_io/src/ZmqHeader.cpp @@ -0,0 +1,188 @@ + +#include "aare/ZmqHeader.hpp" + +#include "simdjson.h" + +using namespace simdjson; + +// helper functions to write json +// append to string for better performance (not tested) + +/** + * @brief write a digit to a string + * takes key and value and outputs->"key": value, + * @tparam T type of value (int, uint32_t, ...) + * @param s string to append to + * @param key key to write + * @param value value to write + * @return void + * @note + * - can't use concepts here because we are using c++17 + */ +template void write_digit(std::string &s, const std::string &key, const T &value) { + s += "\""; + s += key; + s += "\": "; + s += std::to_string(value); + s += ", "; +} +void write_str(std::string &s, const std::string &key, const std::string &value) { + s += "\""; + s += key; + s += "\": \""; + s += value; + s += "\", "; +} +void write_map(std::string &s, const std::string &key, const std::map &value) { + s += "\""; + s += key; + s += "\": {"; + for (auto &kv : value) { + write_str(s, kv.first, kv.second); + } + s += "}, "; +} +void write_array(std::string &s, const std::string &key, const std::array &value) { + s += "\""; + s += key; + s += "\": ["; + s += std::to_string(value[0]); + s += ", "; + s += std::to_string(value[1]); + s += ", "; + s += std::to_string(value[2]); + s += ", "; + s += std::to_string(value[3]); + s += "], "; +} + +namespace aare { + +std::string ZmqHeader::to_string() const { + std::string s = ""; + s.reserve(1024); + s += "{"; + write_digit(s, "data", data ? 1 : 0); + write_digit(s, "jsonversion", jsonversion); + write_digit(s, "dynamicRange", dynamicRange); + write_digit(s, "fileIndex", fileIndex); + write_digit(s, "ndetx", ndetx); + write_digit(s, "ndety", ndety); + write_digit(s, "npixelsx", npixelsx); + write_digit(s, "npixelsy", npixelsy); + write_digit(s, "imageSize", imageSize); + write_digit(s, "acqIndex", acqIndex); + write_digit(s, "frameIndex", frameIndex); + write_digit(s, "progress", progress); + write_str(s, "fname", fname); + write_digit(s, "frameNumber", frameNumber); + write_digit(s, "expLength", expLength); + write_digit(s, "packetNumber", packetNumber); + write_digit(s, "detSpec1", detSpec1); + write_digit(s, "timestamp", timestamp); + write_digit(s, "modId", modId); + write_digit(s, "row", row); + write_digit(s, "column", column); + write_digit(s, "detSpec2", detSpec2); + write_digit(s, "detSpec3", detSpec3); + write_digit(s, "detSpec4", detSpec4); + write_digit(s, "detType", detType); + write_digit(s, "version", version); + write_digit(s, "flipRows", flipRows); + write_digit(s, "quad", quad); + write_digit(s, "completeImage", completeImage ? 1 : 0); + write_map(s, "addJsonHeader", addJsonHeader); + write_array(s, "rx_roi", rx_roi); + // remove last comma + s.pop_back(); + s.pop_back(); + + s += "}"; + return s; +} + +void ZmqHeader::from_string(std::string &s) { + + simdjson::padded_string ps(s.c_str(), s.size()); + ondemand::parser parser; + ondemand::document doc = parser.iterate(ps); + ondemand::object object = doc.get_object(); + + for (auto field : object) { + std::string_view key = field.unescaped_key(); + + if (key == "data") { + data = uint64_t(field.value()) ? true : false; + } else if (key == "jsonversion") { + jsonversion = uint32_t(field.value()); + } else if (key == "dynamicRange") { + dynamicRange = uint32_t(field.value()); + } else if (key == "fileIndex") { + fileIndex = uint64_t(field.value()); + } else if (key == "ndetx") { + ndetx = uint32_t(field.value()); + } else if (key == "ndety") { + ndety = uint32_t(field.value()); + } else if (key == "npixelsx") { + npixelsx = uint32_t(field.value()); + } else if (key == "npixelsy") { + npixelsy = uint32_t(field.value()); + } else if (key == "imageSize") { + imageSize = uint32_t(field.value()); + } else if (key == "acqIndex") { + acqIndex = uint64_t(field.value()); + } else if (key == "frameIndex") { + frameIndex = uint64_t(field.value()); + } else if (key == "progress") { + progress = field.value().get_double(); + } else if (key == "fname") { + std::string_view tmp = field.value().get_string(); + fname = {tmp.begin(), tmp.end()}; + } else if (key == "frameNumber") { + frameNumber = uint64_t(field.value()); + } else if (key == "expLength") { + expLength = uint32_t(field.value()); + } else if (key == "packetNumber") { + packetNumber = uint32_t(field.value()); + } else if (key == "detSpec1") { + detSpec1 = uint64_t(field.value()); + } else if (key == "timestamp") { + timestamp = uint64_t(field.value()); + } else if (key == "modId") { + modId = uint32_t(field.value()); + } else if (key == "row") { + row = uint32_t(field.value()); + } else if (key == "column") { + column = uint32_t(field.value()); + } else if (key == "detSpec2") { + detSpec2 = uint32_t(field.value()); + } else if (key == "detSpec3") { + detSpec3 = uint32_t(field.value()); + } else if (key == "detSpec4") { + detSpec4 = uint32_t(field.value()); + } else if (key == "detType") { + detType = uint32_t(field.value()); + } else if (key == "version") { + version = uint32_t(field.value()); + } else if (key == "flipRows") { + flipRows = uint32_t(field.value()); + } else if (key == "quad") { + quad = uint32_t(field.value()); + } else if (key == "completeImage") { + completeImage = uint64_t(field.value()) ? true : false; + } else if (key == "addJsonHeader") { + for (auto field2 : field.value().get_object()) { + simdjson::ondemand::raw_json_string tmp; + auto error = field2.key().get(tmp); + std::string key2(tmp.raw()); + std::string val; + error = field2.value().get_string(val); + addJsonHeader[key2] = std::string(val); + } + } else if (key == "rx_roi") { + rx_roi = std::array(field.value()); + } + } +} + +} // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqSocket.cpp b/network_io/src/ZmqSocket.cpp new file mode 100644 index 0000000..47ddf79 --- /dev/null +++ b/network_io/src/ZmqSocket.cpp @@ -0,0 +1,26 @@ +#include "aare/ZmqSocket.hpp" +#include +#include + +namespace aare { + +void ZmqSocket::disconnect() { + zmq_close(m_socket); + zmq_ctx_destroy(m_context); + m_socket = nullptr; + m_context = nullptr; +} + +ZmqSocket::~ZmqSocket() { + if (m_socket) + disconnect(); + delete[] m_header_buffer; +} + +void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; } + +void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; } + +void ZmqSocket::set_potential_frame_size(size_t size) { m_potential_frame_size = size; } + +} // namespace aare diff --git a/network_io/src/ZmqSocketReceiver.cpp b/network_io/src/ZmqSocketReceiver.cpp index 8b2592a..951de0d 100644 --- a/network_io/src/ZmqSocketReceiver.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -1,10 +1,13 @@ #include "aare/ZmqSocketReceiver.hpp" +#include "aare/utils/logger.hpp" + #include #include namespace aare { -ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) : m_endpoint(endpoint) { +ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) { + m_endpoint = endpoint; memset(m_header_buffer, 0, m_max_header_size); } @@ -16,7 +19,7 @@ void ZmqSocketReceiver::connect() { if (rc) throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno))); - int bufsize = 1024 * 1024 * m_zmq_hwm; + int bufsize = m_potential_frame_size * m_zmq_hwm; fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (1024 * 1024)); rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize)); if (rc) @@ -26,37 +29,27 @@ void ZmqSocketReceiver::connect() { zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); } -void ZmqSocketReceiver::disconnect() { - zmq_close(m_socket); - zmq_ctx_destroy(m_context); - m_socket = nullptr; - m_context = nullptr; -} +int ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) { -ZmqSocketReceiver::~ZmqSocketReceiver() { - if (m_socket) - disconnect(); - delete[] m_header_buffer; -} + if (serialized_header) + throw std::runtime_error("Not implemented"); -void ZmqSocketReceiver::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; } - -void ZmqSocketReceiver::set_timeout_ms(int n) { m_timeout_ms = n; } - -int ZmqSocketReceiver::receive(zmqHeader &header, std::byte *data) { + int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); // receive header - int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate if (header_bytes_received < 0) { fmt::print("Error receiving header: {}\n", strerror(errno)); return -1; } - fmt::print("Bytes: {}, Header: {}\n", header_bytes_received, m_header_buffer); + aare::logger::debug("Bytes: ", header_bytes_received, ", Header: ", m_header_buffer); - // decode header - if (!decode_header(header)) { - fmt::print("Error decoding header\n"); + // parse header + try { + std::string header_str(m_header_buffer); + header.from_string(header_str); + } catch (const simdjson::simdjson_error &e) { + aare::logger::error(LOCATION + "Error parsing header: ", e.what()); return -1; } @@ -67,16 +60,13 @@ int ZmqSocketReceiver::receive(zmqHeader &header, std::byte *data) { if (!more) { return 0; // no data following header } else { - int data_bytes_received = zmq_recv(m_socket, data, 1024 * 1024 * 2, 0); // TODO! configurable size!!!! + + int data_bytes_received = zmq_recv(m_socket, data, header.imageSize, 0); // TODO! configurable size!!!! if (data_bytes_received == -1) throw std::runtime_error("Got half of a multipart msg!!!"); + aare::logger::debug("Bytes: ", data_bytes_received); } return 1; } -bool ZmqSocketReceiver::decode_header(zmqHeader &h) { - // TODO: implement - return true; -} - } // namespace aare diff --git a/network_io/src/ZmqSocketSender.cpp b/network_io/src/ZmqSocketSender.cpp new file mode 100644 index 0000000..dc15dae --- /dev/null +++ b/network_io/src/ZmqSocketSender.cpp @@ -0,0 +1,32 @@ +#include "aare/ZmqSocketSender.hpp" + +#include +#include + +namespace aare { +ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = endpoint; } + +void ZmqSocketSender::bind() { + m_context = zmq_ctx_new(); + m_socket = zmq_socket(m_context, ZMQ_PUB); + int rc = zmq_bind(m_socket, m_endpoint.c_str()); + assert(rc == 0); +} + +int ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header) { + int rc; + if (serialize_header) { + rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE); + assert(rc == sizeof(ZmqHeader)); + } else { + std::string header_str = header.to_string(); + rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE); + assert(rc == header_str.size()); + } + + int rc2 = zmq_send(m_socket, data, size, 0); + assert(rc2 == size); + return rc + rc2; +} + +} // namespace aare \ No newline at end of file diff --git a/utils/include/aare/utils/logger.hpp b/utils/include/aare/utils/logger.hpp index ec7d05d..bf49489 100644 --- a/utils/include/aare/utils/logger.hpp +++ b/utils/include/aare/utils/logger.hpp @@ -20,6 +20,19 @@ template std::ostream &operator<<(std::ostream &out, const std::vec return out; } +// operator overload for std::array +template std::ostream &operator<<(std::ostream &out, const std::array &v) { + out << "["; + size_t last = N - 1; + for (size_t i = 0; i < N; ++i) { + out << v[i]; + if (i != last) + out << ", "; + } + out << "]"; + return out; +} + namespace aare { namespace logger { From 35d2b274f491981f5e5a64863402dec2ea2b7d26 Mon Sep 17 00:00:00 2001 From: Bechir Braham Date: Thu, 4 Apr 2024 16:35:46 +0200 Subject: [PATCH 3/6] add simdjson as public for network_io in ZmqHeader.hpp we are using templates that depend on simdjson. other libraries should also link simdjson --- examples/zmq_receiver_example.cpp | 6 +++--- network_io/CMakeLists.txt | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index ac615a4..0fe1220 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -13,10 +13,10 @@ int main() { int rc = socket.receive(header, reinterpret_cast(data)); aare::logger::info("Received header: ", header.to_string()); auto *data_int = reinterpret_cast(data); - for (int i=0;i Date: Fri, 5 Apr 2024 15:22:31 +0200 Subject: [PATCH 4/6] add tests for zmqheader and remove simdjson warnings --- examples/zmq_receiver_example.cpp | 1 + network_io/CMakeLists.txt | 18 +++--- network_io/include/aare/ZmqHeader.hpp | 30 +++++++++ network_io/src/ZmqHeader.cpp | 28 ++++++--- network_io/test/ZmqHeader.test.cpp | 88 +++++++++++++++++++++++++++ utils/include/aare/utils/logger.hpp | 12 ++++ 6 files changed, 161 insertions(+), 16 deletions(-) create mode 100644 network_io/test/ZmqHeader.test.cpp diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index 0fe1220..142d4b9 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -1,4 +1,5 @@ #include "aare/ZmqSocketReceiver.hpp" +#include #include #include diff --git a/network_io/CMakeLists.txt b/network_io/CMakeLists.txt index d3f27fc..7d544dd 100644 --- a/network_io/CMakeLists.txt +++ b/network_io/CMakeLists.txt @@ -7,7 +7,9 @@ FetchContent_Declare( ) FetchContent_MakeAvailable(simdjson) - +# hide simdjson warnings by making the includes system includes +get_target_property(_inc simdjson INTERFACE_INCLUDE_DIRECTORIES) +target_include_directories(simdjson SYSTEM INTERFACE ${_inc}) add_library(network_io STATIC @@ -27,10 +29,10 @@ if(AARE_PYTHON_BINDINGS) set_property(TARGET file_io PROPERTY POSITION_INDEPENDENT_CODE ON) endif() -# if(AARE_TESTS) -# set(TestSources -# ${CMAKE_CURRENT_SOURCE_DIR}/test/NumpyFile.test.cpp -# ) -# target_sources(tests PRIVATE ${TestSources} ) -# target_link_libraries(tests PRIVATE core network_io) -# endif() +if(AARE_TESTS) + set(TestSources + ${CMAKE_CURRENT_SOURCE_DIR}/test/ZmqHeader.test.cpp + ) + target_sources(tests PRIVATE ${TestSources} ) + target_link_libraries(tests PRIVATE network_io core utils) +endif() \ No newline at end of file diff --git a/network_io/include/aare/ZmqHeader.hpp b/network_io/include/aare/ZmqHeader.hpp index 64ff7ca..c721d78 100644 --- a/network_io/include/aare/ZmqHeader.hpp +++ b/network_io/include/aare/ZmqHeader.hpp @@ -45,6 +45,34 @@ template <> simdjson_inline simdjson::simdjson_result simdjson::ondema return static_cast(val); } +/** + * @brief cast a simdjson::ondemand::value to a std::map +*/ +template <> simdjson_inline simdjson::simdjson_result> simdjson::ondemand::value::get() noexcept { + std::map map; + ondemand::object obj; + auto error = get_object().get(obj); + if (error) { + return error; + } + for (auto field : obj) { + simdjson::ondemand::raw_json_string tmp; + error = field.key().get(tmp); + if (error) { + return error; + } + error = field.value().get(tmp); + if (error) { + return error; + } + std::string_view key_view = field.unescaped_key(); + std::string key_str(key_view.data(), key_view.size()); + std::string_view value_view = field.value().get_string(); + map[key_str] = {value_view.data(), value_view.size()}; + } + return map; +} + } // namespace simdjson namespace aare { @@ -102,5 +130,7 @@ struct ZmqHeader { /** serialize struct to json string */ std::string to_string() const; void from_string(std::string &s); + // compare operator + bool operator==(const ZmqHeader &other) const ; }; } // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqHeader.cpp b/network_io/src/ZmqHeader.cpp index 93a81fc..3eff459 100644 --- a/network_io/src/ZmqHeader.cpp +++ b/network_io/src/ZmqHeader.cpp @@ -40,6 +40,13 @@ void write_map(std::string &s, const std::string &key, const std::map= 0; i--) { + if (s[i] == ',' or s[i] == ' ') { + s.pop_back(); + } else + break; + } s += "}, "; } void write_array(std::string &s, const std::string &key, const std::array &value) { @@ -171,18 +178,23 @@ void ZmqHeader::from_string(std::string &s) { } else if (key == "completeImage") { completeImage = uint64_t(field.value()) ? true : false; } else if (key == "addJsonHeader") { - for (auto field2 : field.value().get_object()) { - simdjson::ondemand::raw_json_string tmp; - auto error = field2.key().get(tmp); - std::string key2(tmp.raw()); - std::string val; - error = field2.value().get_string(val); - addJsonHeader[key2] = std::string(val); - } + addJsonHeader = std::map(field.value()); } else if (key == "rx_roi") { rx_roi = std::array(field.value()); } } } +bool ZmqHeader::operator==(const ZmqHeader &other) const { + return data == other.data && jsonversion == other.jsonversion && dynamicRange == other.dynamicRange && + fileIndex == other.fileIndex && ndetx == other.ndetx && ndety == other.ndety && npixelsx == other.npixelsx && + npixelsy == other.npixelsy && imageSize == other.imageSize && acqIndex == other.acqIndex && + frameIndex == other.frameIndex && progress == other.progress && fname == other.fname && + frameNumber == other.frameNumber && expLength == other.expLength && packetNumber == other.packetNumber && + detSpec1 == other.detSpec1 && timestamp == other.timestamp && modId == other.modId && row == other.row && + column == other.column && detSpec2 == other.detSpec2 && detSpec3 == other.detSpec3 && + detSpec4 == other.detSpec4 && detType == other.detType && version == other.version && + flipRows == other.flipRows && quad == other.quad && completeImage == other.completeImage && + addJsonHeader == other.addJsonHeader && rx_roi == other.rx_roi; +} } // namespace aare \ No newline at end of file diff --git a/network_io/test/ZmqHeader.test.cpp b/network_io/test/ZmqHeader.test.cpp new file mode 100644 index 0000000..60eb11f --- /dev/null +++ b/network_io/test/ZmqHeader.test.cpp @@ -0,0 +1,88 @@ +#include +#include "aare/ZmqHeader.hpp" +#include "aare/utils/logger.hpp" + + +using namespace aare; +TEST_CASE("Test ZmqHeader") { + ZmqHeader header; + header.npixelsx = 10; + header.npixelsy = 15; + header.data= 1; + header.jsonversion= 2; + header.dynamicRange= 32; + header.fileIndex= 4; + header.ndetx= 5; + header.ndety= 6; + header.imageSize= 4800; + header.acqIndex= 8; + header.frameIndex= 9; + header.progress= 0.1; + header.fname= "test"; + header.frameNumber= 11; + header.expLength= 12; + header.packetNumber= 13; + header.detSpec1= 14; + header.timestamp= 15; + header.modId= 16; + header.row= 17; + header.column= 18; + header.detSpec2= 19; + header.detSpec3= 20; + header.detSpec4= 21; + header.detType= 22; + header.version= 23; + header.flipRows= 24; + header.quad= 25; + header.completeImage= 1; + header.addJsonHeader= {{"key1", "value1"}, {"key2", "value2"}}; + header.rx_roi= {27, 28, 29, 30}; + + std::string json_header = "{" + "\"data\": 1, " + "\"jsonversion\": 2, " + "\"dynamicRange\": 32, " + "\"fileIndex\": 4, " + "\"ndetx\": 5, " + "\"ndety\": 6, " + "\"npixelsx\": 10, " + "\"npixelsy\": 15, " + "\"imageSize\": 4800, " + "\"acqIndex\": 8, " + "\"frameIndex\": 9, " + "\"progress\": 0.100000, " + "\"fname\": \"test\", " + "\"frameNumber\": 11, " + "\"expLength\": 12, " + "\"packetNumber\": 13, " + "\"detSpec1\": 14, " + "\"timestamp\": 15, " + "\"modId\": 16, " + "\"row\": 17, " + "\"column\": 18, " + "\"detSpec2\": 19, " + "\"detSpec3\": 20, " + "\"detSpec4\": 21, " + "\"detType\": 22, " + "\"version\": 23, " + "\"flipRows\": 24, " + "\"quad\": 25, " + "\"completeImage\": 1, " + "\"addJsonHeader\": {\"key1\": \"value1\", \"key2\": \"value2\"}, " + "\"rx_roi\": [27, 28, 29, 30]" + "}"; + + SECTION("Test converting ZmqHeader to json string"){ + REQUIRE(header.to_string() == json_header); + + } + SECTION("Test converting json string to ZmqHeader"){ + ZmqHeader header2; + header2.from_string(json_header); + REQUIRE(header2== header); + } + + + + +} diff --git a/utils/include/aare/utils/logger.hpp b/utils/include/aare/utils/logger.hpp index bf49489..80a1500 100644 --- a/utils/include/aare/utils/logger.hpp +++ b/utils/include/aare/utils/logger.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #define LOCATION std::string(__FILE__) + std::string(":") + std::to_string(__LINE__) + ":" + std::string(__func__) + ":" @@ -32,6 +33,17 @@ template std::ostream &operator<<(std::ostream &out, cons out << "]"; return out; } +// operator overlaod for std::map +template std::ostream &operator<<(std::ostream &out, const std::map &v) { + out << "{"; + size_t i = 0; + for (auto &kv : v) { + out << kv.first << ": " << kv.second << ((++i!=v.size())?", ":""); + } + + out << "}"; + return out; +} namespace aare { From 2f23e4610dee36824e8542d6036962873d61fde2 Mon Sep 17 00:00:00 2001 From: Bechir Date: Fri, 5 Apr 2024 16:04:01 +0200 Subject: [PATCH 5/6] fix warnings --- examples/zmq_receiver_example.cpp | 8 ++++---- examples/zmq_sender_example.cpp | 1 + file_io/include/aare/RawFile.hpp | 7 ++++++- file_io/src/SubFile.cpp | 2 +- file_io/test/NumpyFile.test.cpp | 2 +- network_io/include/aare/ZmqSocketReceiver.hpp | 2 +- network_io/include/aare/ZmqSocketSender.hpp | 2 +- network_io/src/ZmqSocketReceiver.cpp | 10 ++++++---- network_io/src/ZmqSocketSender.cpp | 8 ++++---- 9 files changed, 25 insertions(+), 17 deletions(-) diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index 142d4b9..72fab8e 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -12,12 +12,12 @@ int main() { while (true) { int rc = socket.receive(header, reinterpret_cast(data)); - aare::logger::info("Received header: ", header.to_string()); + aare::logger::info("Received bytes",rc,"Received header: ", header.to_string()); auto *data_int = reinterpret_cast(data); - for (int i = 0; i < header.npixelsx; i++) { - for (int j = 0; j < header.npixelsy; j++) { + for (uint32_t i = 0; i < header.npixelsx; i++) { + for (uint32_t j = 0; j < header.npixelsy; j++) { // verify that the sent data is correct - assert(data_int[i * header.npixelsy + j] == (i + j)); + assert(data_int[i * header.npixelsy + j] == i + j); } } aare::logger::info("Frame verified"); diff --git a/examples/zmq_sender_example.cpp b/examples/zmq_sender_example.cpp index 8e4a6b4..5739d91 100644 --- a/examples/zmq_sender_example.cpp +++ b/examples/zmq_sender_example.cpp @@ -30,6 +30,7 @@ int main() { aare::logger::info("Frame size:", frame.size(), "\n"); int rc = socket.send(header, frame.data(), frame.size()); + aare::logger::info("Sent bytes", rc); sleep(1); } return 0; diff --git a/file_io/include/aare/RawFile.hpp b/file_io/include/aare/RawFile.hpp index 7b8e450..b8f7b7d 100644 --- a/file_io/include/aare/RawFile.hpp +++ b/file_io/include/aare/RawFile.hpp @@ -12,7 +12,12 @@ class RawFile : public FileInterface { public: std::filesystem::path m_fname; // TO be made private! - void write(Frame &frame) override{}; + + // pragma to ignore warnings + void write(Frame &frame) override{ + throw std::runtime_error("Not implemented"); + }; + Frame read() override { return get_frame(this->current_frame++); }; std::vector read(size_t n_frames) override; void read_into(std::byte *image_buf) override { return get_frame_into(this->current_frame++, image_buf); }; diff --git a/file_io/src/SubFile.cpp b/file_io/src/SubFile.cpp index eca4ea5..f25371f 100644 --- a/file_io/src/SubFile.cpp +++ b/file_io/src/SubFile.cpp @@ -89,7 +89,7 @@ template size_t SubFile::read_impl_flip(std::byte *buffer) { size_t SubFile::frame_number(int frame_index) { sls_detector_header h{}; - FILE *fp = fopen(this->m_fname.c_str(), "r"); + fp = fopen(this->m_fname.c_str(), "r"); if (!fp) throw std::runtime_error(fmt::format("Could not open: {} for reading", m_fname.c_str())); fseek(fp, (sizeof(sls_detector_header) + bytes_per_part()) * frame_index, SEEK_SET); diff --git a/file_io/test/NumpyFile.test.cpp b/file_io/test/NumpyFile.test.cpp index 60a5afd..4468b7d 100644 --- a/file_io/test/NumpyFile.test.cpp +++ b/file_io/test/NumpyFile.test.cpp @@ -19,7 +19,7 @@ TEST_CASE("Read a 1D numpy file with int32 data type") { // use the load function to read the full file into a NDArray auto data = f.load(); - for (size_t i = 0; i < 10; i++) { + for (int32_t i = 0; i < 10; i++) { REQUIRE(data(i) == i); } } diff --git a/network_io/include/aare/ZmqSocketReceiver.hpp b/network_io/include/aare/ZmqSocketReceiver.hpp index 8c4a22b..894c750 100644 --- a/network_io/include/aare/ZmqSocketReceiver.hpp +++ b/network_io/include/aare/ZmqSocketReceiver.hpp @@ -20,7 +20,7 @@ class ZmqSocketReceiver : public ZmqSocket { public: ZmqSocketReceiver(const std::string &endpoint); void connect(); - int receive(ZmqHeader &header, std::byte *data, bool serialized_header = false); + size_t receive(ZmqHeader &header, std::byte *data, bool serialized_header = false); }; } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqSocketSender.hpp b/network_io/include/aare/ZmqSocketSender.hpp index d5a7767..ce2b91f 100644 --- a/network_io/include/aare/ZmqSocketSender.hpp +++ b/network_io/include/aare/ZmqSocketSender.hpp @@ -7,6 +7,6 @@ class ZmqSocketSender : public ZmqSocket { public: ZmqSocketSender(const std::string &endpoint); void bind(); - int send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header = false); + size_t send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header = false); }; } // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqSocketReceiver.cpp b/network_io/src/ZmqSocketReceiver.cpp index 951de0d..7d9a79b 100644 --- a/network_io/src/ZmqSocketReceiver.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -29,12 +29,14 @@ void ZmqSocketReceiver::connect() { zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); } -int ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) { +size_t ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) { + size_t data_bytes_received{}; + if (serialized_header) throw std::runtime_error("Not implemented"); - int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); + size_t header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); // receive header m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate @@ -61,12 +63,12 @@ int ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serializ return 0; // no data following header } else { - int data_bytes_received = zmq_recv(m_socket, data, header.imageSize, 0); // TODO! configurable size!!!! + data_bytes_received = zmq_recv(m_socket, data, header.imageSize, 0); // TODO! configurable size!!!! if (data_bytes_received == -1) throw std::runtime_error("Got half of a multipart msg!!!"); aare::logger::debug("Bytes: ", data_bytes_received); } - return 1; + return data_bytes_received + header_bytes_received; } } // namespace aare diff --git a/network_io/src/ZmqSocketSender.cpp b/network_io/src/ZmqSocketSender.cpp index dc15dae..ba296c6 100644 --- a/network_io/src/ZmqSocketSender.cpp +++ b/network_io/src/ZmqSocketSender.cpp @@ -9,12 +9,12 @@ ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = end void ZmqSocketSender::bind() { m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_PUB); - int rc = zmq_bind(m_socket, m_endpoint.c_str()); + size_t rc = zmq_bind(m_socket, m_endpoint.c_str()); assert(rc == 0); } -int ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header) { - int rc; +size_t ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header) { + size_t rc; if (serialize_header) { rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE); assert(rc == sizeof(ZmqHeader)); @@ -24,7 +24,7 @@ int ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, assert(rc == header_str.size()); } - int rc2 = zmq_send(m_socket, data, size, 0); + size_t rc2 = zmq_send(m_socket, data, size, 0); assert(rc2 == size); return rc + rc2; } From 598e9f67080b27463041d5772fa6f6369e29642d Mon Sep 17 00:00:00 2001 From: Bechir Date: Fri, 5 Apr 2024 17:05:42 +0200 Subject: [PATCH 6/6] format files --- examples/zmq_receiver_example.cpp | 2 +- file_io/include/aare/RawFile.hpp | 4 +- network_io/include/aare/ZmqHeader.hpp | 10 +- network_io/src/ZmqSocketReceiver.cpp | 2 +- network_io/test/ZmqHeader.test.cpp | 138 ++++++++++++-------------- utils/include/aare/utils/logger.hpp | 4 +- 6 files changed, 76 insertions(+), 84 deletions(-) diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index 72fab8e..888d640 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -12,7 +12,7 @@ int main() { while (true) { int rc = socket.receive(header, reinterpret_cast(data)); - aare::logger::info("Received bytes",rc,"Received header: ", header.to_string()); + aare::logger::info("Received bytes", rc, "Received header: ", header.to_string()); auto *data_int = reinterpret_cast(data); for (uint32_t i = 0; i < header.npixelsx; i++) { for (uint32_t j = 0; j < header.npixelsy; j++) { diff --git a/file_io/include/aare/RawFile.hpp b/file_io/include/aare/RawFile.hpp index b8f7b7d..f5bd4de 100644 --- a/file_io/include/aare/RawFile.hpp +++ b/file_io/include/aare/RawFile.hpp @@ -14,9 +14,7 @@ class RawFile : public FileInterface { std::filesystem::path m_fname; // TO be made private! // pragma to ignore warnings - void write(Frame &frame) override{ - throw std::runtime_error("Not implemented"); - }; + void write(Frame &frame) override { throw std::runtime_error("Not implemented"); }; Frame read() override { return get_frame(this->current_frame++); }; std::vector read(size_t n_frames) override; diff --git a/network_io/include/aare/ZmqHeader.hpp b/network_io/include/aare/ZmqHeader.hpp index c721d78..6096282 100644 --- a/network_io/include/aare/ZmqHeader.hpp +++ b/network_io/include/aare/ZmqHeader.hpp @@ -46,9 +46,11 @@ template <> simdjson_inline simdjson::simdjson_result simdjson::ondema } /** - * @brief cast a simdjson::ondemand::value to a std::map -*/ -template <> simdjson_inline simdjson::simdjson_result> simdjson::ondemand::value::get() noexcept { + * @brief cast a simdjson::ondemand::value to a std::map + */ +template <> +simdjson_inline simdjson::simdjson_result> +simdjson::ondemand::value::get() noexcept { std::map map; ondemand::object obj; auto error = get_object().get(obj); @@ -131,6 +133,6 @@ struct ZmqHeader { std::string to_string() const; void from_string(std::string &s); // compare operator - bool operator==(const ZmqHeader &other) const ; + bool operator==(const ZmqHeader &other) const; }; } // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqSocketReceiver.cpp b/network_io/src/ZmqSocketReceiver.cpp index 7d9a79b..2d395ea 100644 --- a/network_io/src/ZmqSocketReceiver.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -32,7 +32,7 @@ void ZmqSocketReceiver::connect() { size_t ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) { size_t data_bytes_received{}; - + if (serialized_header) throw std::runtime_error("Not implemented"); diff --git a/network_io/test/ZmqHeader.test.cpp b/network_io/test/ZmqHeader.test.cpp index 60eb11f..787dfb6 100644 --- a/network_io/test/ZmqHeader.test.cpp +++ b/network_io/test/ZmqHeader.test.cpp @@ -1,88 +1,80 @@ -#include #include "aare/ZmqHeader.hpp" #include "aare/utils/logger.hpp" - +#include using namespace aare; TEST_CASE("Test ZmqHeader") { ZmqHeader header; header.npixelsx = 10; header.npixelsy = 15; - header.data= 1; - header.jsonversion= 2; - header.dynamicRange= 32; - header.fileIndex= 4; - header.ndetx= 5; - header.ndety= 6; - header.imageSize= 4800; - header.acqIndex= 8; - header.frameIndex= 9; - header.progress= 0.1; - header.fname= "test"; - header.frameNumber= 11; - header.expLength= 12; - header.packetNumber= 13; - header.detSpec1= 14; - header.timestamp= 15; - header.modId= 16; - header.row= 17; - header.column= 18; - header.detSpec2= 19; - header.detSpec3= 20; - header.detSpec4= 21; - header.detType= 22; - header.version= 23; - header.flipRows= 24; - header.quad= 25; - header.completeImage= 1; - header.addJsonHeader= {{"key1", "value1"}, {"key2", "value2"}}; - header.rx_roi= {27, 28, 29, 30}; + header.data = 1; + header.jsonversion = 2; + header.dynamicRange = 32; + header.fileIndex = 4; + header.ndetx = 5; + header.ndety = 6; + header.imageSize = 4800; + header.acqIndex = 8; + header.frameIndex = 9; + header.progress = 0.1; + header.fname = "test"; + header.frameNumber = 11; + header.expLength = 12; + header.packetNumber = 13; + header.detSpec1 = 14; + header.timestamp = 15; + header.modId = 16; + header.row = 17; + header.column = 18; + header.detSpec2 = 19; + header.detSpec3 = 20; + header.detSpec4 = 21; + header.detType = 22; + header.version = 23; + header.flipRows = 24; + header.quad = 25; + header.completeImage = 1; + header.addJsonHeader = {{"key1", "value1"}, {"key2", "value2"}}; + header.rx_roi = {27, 28, 29, 30}; std::string json_header = "{" - "\"data\": 1, " - "\"jsonversion\": 2, " - "\"dynamicRange\": 32, " - "\"fileIndex\": 4, " - "\"ndetx\": 5, " - "\"ndety\": 6, " - "\"npixelsx\": 10, " - "\"npixelsy\": 15, " - "\"imageSize\": 4800, " - "\"acqIndex\": 8, " - "\"frameIndex\": 9, " - "\"progress\": 0.100000, " - "\"fname\": \"test\", " - "\"frameNumber\": 11, " - "\"expLength\": 12, " - "\"packetNumber\": 13, " - "\"detSpec1\": 14, " - "\"timestamp\": 15, " - "\"modId\": 16, " - "\"row\": 17, " - "\"column\": 18, " - "\"detSpec2\": 19, " - "\"detSpec3\": 20, " - "\"detSpec4\": 21, " - "\"detType\": 22, " - "\"version\": 23, " - "\"flipRows\": 24, " - "\"quad\": 25, " - "\"completeImage\": 1, " - "\"addJsonHeader\": {\"key1\": \"value1\", \"key2\": \"value2\"}, " - "\"rx_roi\": [27, 28, 29, 30]" - "}"; + "\"data\": 1, " + "\"jsonversion\": 2, " + "\"dynamicRange\": 32, " + "\"fileIndex\": 4, " + "\"ndetx\": 5, " + "\"ndety\": 6, " + "\"npixelsx\": 10, " + "\"npixelsy\": 15, " + "\"imageSize\": 4800, " + "\"acqIndex\": 8, " + "\"frameIndex\": 9, " + "\"progress\": 0.100000, " + "\"fname\": \"test\", " + "\"frameNumber\": 11, " + "\"expLength\": 12, " + "\"packetNumber\": 13, " + "\"detSpec1\": 14, " + "\"timestamp\": 15, " + "\"modId\": 16, " + "\"row\": 17, " + "\"column\": 18, " + "\"detSpec2\": 19, " + "\"detSpec3\": 20, " + "\"detSpec4\": 21, " + "\"detType\": 22, " + "\"version\": 23, " + "\"flipRows\": 24, " + "\"quad\": 25, " + "\"completeImage\": 1, " + "\"addJsonHeader\": {\"key1\": \"value1\", \"key2\": \"value2\"}, " + "\"rx_roi\": [27, 28, 29, 30]" + "}"; - SECTION("Test converting ZmqHeader to json string"){ - REQUIRE(header.to_string() == json_header); - - } - SECTION("Test converting json string to ZmqHeader"){ + SECTION("Test converting ZmqHeader to json string") { REQUIRE(header.to_string() == json_header); } + SECTION("Test converting json string to ZmqHeader") { ZmqHeader header2; header2.from_string(json_header); - REQUIRE(header2== header); + REQUIRE(header2 == header); } - - - - } diff --git a/utils/include/aare/utils/logger.hpp b/utils/include/aare/utils/logger.hpp index 80a1500..87928bf 100644 --- a/utils/include/aare/utils/logger.hpp +++ b/utils/include/aare/utils/logger.hpp @@ -2,8 +2,8 @@ #include #include #include -#include #include +#include #define LOCATION std::string(__FILE__) + std::string(":") + std::to_string(__LINE__) + ":" + std::string(__func__) + ":" @@ -38,7 +38,7 @@ template std::ostream &operator<<(std::ostream &out, co out << "{"; size_t i = 0; for (auto &kv : v) { - out << kv.first << ": " << kv.second << ((++i!=v.size())?", ":""); + out << kv.first << ": " << kv.second << ((++i != v.size()) ? ", " : ""); } out << "}";