diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 37f3857..3accb34 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -13,9 +13,7 @@ jobs: pwd mkdir build cd build - cmake .. - cmake --build . --target=check-format - + find \( -name "*.cpp" -o -name "*.hpp" \) -not -path "./build/*" | xargs -I {} -n 1 -P 10 bash -c "clang-format -i -style=\"file:.clang-format\" {}" diff --git a/CMakeLists.txt b/CMakeLists.txt index 14ed946..7442cbe 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,6 +27,8 @@ option(AARE_FETCH_PYBIND11 "Use FetchContent to download pybind11" ON) option(AARE_FETCH_CATCH "Use FetchContent to download catch2" ON) option(AARE_FETCH_JSON "Use FetchContent to download nlohmann::json" ON) option(AARE_FETCH_ZMQ "Use FetchContent to download libzmq" ON) +option(AARE_FETCH_BOOST "Use FetchContent to download boost" ON) + #Convenience option to use system libraries option(AARE_SYSTEM_LIBRARIES "Use system libraries" OFF) @@ -37,6 +39,8 @@ if(AARE_SYSTEM_LIBRARIES) set(AARE_FETCH_CATCH OFF CACHE BOOL "Disabled FetchContent for catch2" FORCE) set(AARE_FETCH_JSON OFF CACHE BOOL "Disabled FetchContent for nlohmann::json" FORCE) set(AARE_FETCH_ZMQ OFF CACHE BOOL "Disabled FetchContent for libzmq" FORCE) + set(AARE_FETCH_BOOST OFF CACHE BOOL "Disabled FetchContent for boost" FORCE) + endif() @@ -72,9 +76,27 @@ if (AARE_FETCH_FMT) ) FetchContent_MakeAvailable(fmt) else() - find_package(fmt 6 REQUIRED) + find_package(fmt 6 REQUIRED) endif() + +if (AARE_FETCH_BOOST) + set(BOOST_INCLUDE_LIBRARIES program_options) + set(BOOST_ENABLE_CMAKE ON) + FetchContent_Declare( + Boost + GIT_REPOSITORY https://github.com/boostorg/boost.git + GIT_TAG boost-1.80.0 + ) + FetchContent_MakeAvailable(Boost) + set(Boost_LIBRARIES Boost::program_options) +else() + + find_package(Boost 1.80 REQUIRED COMPONENTS program_options) +endif() + + + add_library(aare_compiler_flags INTERFACE) target_compile_features(aare_compiler_flags INTERFACE cxx_std_17) @@ -88,7 +110,7 @@ else() INTERFACE -Og -ggdb3 - -D_GLIBCXX_DEBUG + # -D_GLIBCXX_DEBUG # causes errors with boost -D_GLIBCXX_DEBUG_PEDANTIC ) @@ -153,7 +175,7 @@ add_subdirectory(network_io) #Overall target to link to when using the library add_library(aare INTERFACE) -target_link_libraries(aare INTERFACE core file_io utils network_io) +target_link_libraries(aare INTERFACE core file_io utils network_io ${Boost_LIBRARIES}) target_include_directories(aare INTERFACE $ $ diff --git a/aare-environment.yml b/aare-environment.yml index b8112e2..71a0df3 100644 --- a/aare-environment.yml +++ b/aare-environment.yml @@ -8,3 +8,4 @@ dependencies: - nlohmann_json # should be removed - catch2 - zeromq + - boost-cpp diff --git a/core/include/aare/CircularFifo.hpp b/core/include/aare/core/CircularFifo.hpp similarity index 98% rename from core/include/aare/CircularFifo.hpp rename to core/include/aare/core/CircularFifo.hpp index 0684737..9d04883 100644 --- a/core/include/aare/CircularFifo.hpp +++ b/core/include/aare/core/CircularFifo.hpp @@ -6,7 +6,7 @@ #include #include -#include "aare/ProducerConsumerQueue.hpp" +#include "aare/core/ProducerConsumerQueue.hpp" namespace aare { diff --git a/core/include/aare/DType.hpp b/core/include/aare/core/DType.hpp similarity index 87% rename from core/include/aare/DType.hpp rename to core/include/aare/core/DType.hpp index 9a696fb..212a7f0 100644 --- a/core/include/aare/DType.hpp +++ b/core/include/aare/core/DType.hpp @@ -6,6 +6,9 @@ namespace aare { +/** + * @brief enum class to define the endianess of the system + */ enum class endian { #ifdef _WIN32 little = 0, @@ -18,6 +21,10 @@ enum class endian { #endif }; +/** + * @brief class to define the data type of the pixels + * @note only native endianess is supported + */ class DType { // TODO! support for non native endianess? static_assert(sizeof(long) == sizeof(int64_t), "long should be 64bits"); diff --git a/core/include/aare/Frame.hpp b/core/include/aare/core/Frame.hpp similarity index 85% rename from core/include/aare/Frame.hpp rename to core/include/aare/core/Frame.hpp index e785fc2..9ef404e 100644 --- a/core/include/aare/Frame.hpp +++ b/core/include/aare/core/Frame.hpp @@ -1,20 +1,19 @@ #pragma once -#include "aare/NDArray.hpp" -#include "aare/defs.hpp" +#include "aare/core/NDArray.hpp" +#include "aare/core/defs.hpp" #include #include #include #include #include +namespace aare { + /** * @brief Frame class to represent a single frame of data * model class * should be able to work with streams coming from files or network */ - -namespace aare { - class Frame { ssize_t m_rows; ssize_t m_cols; @@ -58,6 +57,14 @@ class Frame { other.m_data = nullptr; other.m_rows = other.m_cols = other.m_bitdepth = 0; } + // copy constructor + Frame(const Frame &other) { + m_rows = other.rows(); + m_cols = other.cols(); + m_bitdepth = other.bitdepth(); + m_data = new std::byte[m_rows * m_cols * m_bitdepth / 8]; + std::memcpy(m_data, other.m_data, m_rows * m_cols * m_bitdepth / 8); + } template NDView view() { std::vector shape = {m_rows, m_cols}; diff --git a/core/include/aare/NDArray.hpp b/core/include/aare/core/NDArray.hpp similarity index 99% rename from core/include/aare/NDArray.hpp rename to core/include/aare/core/NDArray.hpp index b394763..14f0649 100644 --- a/core/include/aare/NDArray.hpp +++ b/core/include/aare/core/NDArray.hpp @@ -7,7 +7,7 @@ memory. TODO! Add expression templates for operators */ -#include "aare/NDView.hpp" +#include "aare/core/NDView.hpp" #include #include diff --git a/core/include/aare/NDView.hpp b/core/include/aare/core/NDView.hpp similarity index 100% rename from core/include/aare/NDView.hpp rename to core/include/aare/core/NDView.hpp diff --git a/core/include/aare/ProducerConsumerQueue.hpp b/core/include/aare/core/ProducerConsumerQueue.hpp similarity index 100% rename from core/include/aare/ProducerConsumerQueue.hpp rename to core/include/aare/core/ProducerConsumerQueue.hpp diff --git a/core/include/aare/VariableSizeClusterFinder.hpp b/core/include/aare/core/VariableSizeClusterFinder.hpp similarity index 99% rename from core/include/aare/VariableSizeClusterFinder.hpp rename to core/include/aare/core/VariableSizeClusterFinder.hpp index 01b7c0c..2cedb0d 100644 --- a/core/include/aare/VariableSizeClusterFinder.hpp +++ b/core/include/aare/core/VariableSizeClusterFinder.hpp @@ -5,7 +5,7 @@ #include #include -#include "aare/NDArray.hpp" +#include "aare/core/NDArray.hpp" const int MAX_CLUSTER_SIZE = 200; namespace aare { diff --git a/core/include/aare/defs.hpp b/core/include/aare/core/defs.hpp similarity index 90% rename from core/include/aare/defs.hpp rename to core/include/aare/core/defs.hpp index 8ce8037..c6806fd 100644 --- a/core/include/aare/defs.hpp +++ b/core/include/aare/core/defs.hpp @@ -1,16 +1,19 @@ #pragma once #include -#include #include + +#include #include #include -#include #include #include namespace aare { +/** + * @brief header contained in parts of frames + */ typedef struct { uint64_t frameNumber; uint32_t expLength; @@ -26,14 +29,13 @@ typedef struct { uint8_t detType; uint8_t version; uint8_t packetMask[64]; -} __attribute__((packed)) sls_detector_header; +} sls_detector_header; struct xy { int row; int col; }; -// using image_shape = std::array; using dynamic_shape = std::vector; enum class DetectorType { Jungfrau, Eiger, Mythen3, Moench, ChipTestBoard }; diff --git a/core/src/DType.cpp b/core/src/DType.cpp index 1a06085..303fda5 100644 --- a/core/src/DType.cpp +++ b/core/src/DType.cpp @@ -1,9 +1,16 @@ -#include "aare/DType.hpp" +#include "aare/core/DType.hpp" #include namespace aare { +/** + * @brief Construct a DType object from a type_info object + * @param t type_info object + * @throw runtime_error if the type is not supported + * @note supported types are: int8_t, uint8_t, int16_t, uint16_t, int32_t, uint32_t, int64_t, uint64_t, float, double + * @note the type_info object is obtained using typeid (e.g. typeid(int)) + */ DType::DType(const std::type_info &t) { if (t == typeid(int8_t)) m_type = TypeIndex::INT8; @@ -31,6 +38,10 @@ DType::DType(const std::type_info &t) { throw std::runtime_error("Could not construct data type. Type not supported."); } +/** + * @brief Get the bitdepth of the data type + * @return bitdepth + */ uint8_t DType::bitdepth() const { switch (m_type) { case TypeIndex::INT8: @@ -56,8 +67,20 @@ uint8_t DType::bitdepth() const { } } +/** + * @brief Construct a DType object from a TypeIndex + * @param ti TypeIndex + * + */ DType::DType(DType::TypeIndex ti) : m_type(ti) {} +/** + * @brief Construct a DType object from a string + * @param sv string_view + * @throw runtime_error if the type is not supported + * @note example strings: " #include namespace aare { +/** + * @brief Construct a new Frame + * @param bytes pointer to the data to be copied into the frame + * @param rows number of rows + * @param cols number of columns + * @param bitdepth bitdepth of the pixels + */ Frame::Frame(std::byte *bytes, ssize_t rows, ssize_t cols, ssize_t bitdepth) : m_rows(rows), m_cols(cols), m_bitdepth(bitdepth) { m_data = new std::byte[rows * cols * bitdepth / 8]; std::memcpy(m_data, bytes, rows * cols * bitdepth / 8); } +/** + * @brief Construct a new Frame + * @param rows number of rows + * @param cols number of columns + * @param bitdepth bitdepth of the pixels + * @note the data is initialized to zero + */ Frame::Frame(ssize_t rows, ssize_t cols, ssize_t bitdepth) : m_rows(rows), m_cols(cols), m_bitdepth(bitdepth) { m_data = new std::byte[rows * cols * bitdepth / 8]; std::memset(m_data, 0, rows * cols * bitdepth / 8); } +/** + * @brief Get the pointer to the pixel at the given row and column + * @param row row index + * @param col column index + * @return pointer to the pixel + * @note the user should cast the pointer to the appropriate type + */ std::byte *Frame::get(int row, int col) { if (row < 0 || row >= m_rows || col < 0 || col >= m_cols) { std::cerr << "Invalid row or column index" << std::endl; diff --git a/core/src/defs.cpp b/core/src/defs.cpp index 4d6e6f8..3ce6c07 100644 --- a/core/src/defs.cpp +++ b/core/src/defs.cpp @@ -1,7 +1,12 @@ -#include "aare/defs.hpp" +#include "aare/core/defs.hpp" namespace aare { +/** + * @brief Convert a DetectorType to a string + * @param type DetectorType + * @return string representation of the DetectorType + */ template <> std::string toString(DetectorType type) { switch (type) { case DetectorType::Jungfrau: @@ -19,6 +24,12 @@ template <> std::string toString(DetectorType type) { } } +/** + * @brief Convert a string to a DetectorType + * @param name string representation of the DetectorType + * @return DetectorType + * @throw runtime_error if the string does not match any DetectorType + */ template <> DetectorType StringTo(std::string name) { if (name == "Jungfrau") return DetectorType::Jungfrau; @@ -31,19 +42,23 @@ template <> DetectorType StringTo(std::string name) { else if (name == "ChipTestBoard") return DetectorType::ChipTestBoard; else { - auto msg = fmt::format("Could not decode dector from: \"{}\"", name); - throw std::runtime_error(msg); + throw std::runtime_error("Could not decode dector from: \"" + name + "\""); } } +/** + * @brief Convert a string to a TimingMode + * @param mode string representation of the TimingMode + * @return TimingMode + * @throw runtime_error if the string does not match any TimingMode + */ template <> TimingMode StringTo(std::string mode) { if (mode == "auto") return TimingMode::Auto; else if (mode == "trigger") return TimingMode::Trigger; else { - auto msg = fmt::format("Could not decode timing mode from: \"{}\"", mode); - throw std::runtime_error(msg); + throw std::runtime_error("Could not decode timing mode from: \"" + mode + "\""); } } diff --git a/core/test/CircularFifo.test.cpp b/core/test/CircularFifo.test.cpp index f17abba..cc18c38 100644 --- a/core/test/CircularFifo.test.cpp +++ b/core/test/CircularFifo.test.cpp @@ -1,6 +1,6 @@ #include -#include "aare/CircularFifo.hpp" +#include "aare/core/CircularFifo.hpp" using aare::CircularFifo; diff --git a/core/test/DType.test.cpp b/core/test/DType.test.cpp index 1490344..98ec528 100644 --- a/core/test/DType.test.cpp +++ b/core/test/DType.test.cpp @@ -1,6 +1,6 @@ -#include "aare/DType.hpp" +#include "aare/core/DType.hpp" #include using aare::DType; diff --git a/core/test/Frame.test.cpp b/core/test/Frame.test.cpp index 31c0c9c..fdff8e9 100644 --- a/core/test/Frame.test.cpp +++ b/core/test/Frame.test.cpp @@ -1,4 +1,4 @@ -#include "aare/Frame.hpp" +#include "aare/core/Frame.hpp" #include using aare::Frame; diff --git a/core/test/NDArray.test.cpp b/core/test/NDArray.test.cpp index 136a5b6..456cf08 100644 --- a/core/test/NDArray.test.cpp +++ b/core/test/NDArray.test.cpp @@ -1,4 +1,4 @@ -#include "aare/NDArray.hpp" +#include "aare/core/NDArray.hpp" #include #include diff --git a/core/test/NDView.test.cpp b/core/test/NDView.test.cpp index 35d20f3..fd6ddc5 100644 --- a/core/test/NDView.test.cpp +++ b/core/test/NDView.test.cpp @@ -1,4 +1,4 @@ -#include "aare/NDView.hpp" +#include "aare/core/NDView.hpp" #include #include diff --git a/core/test/ProducerConsumerQueue.test.cpp b/core/test/ProducerConsumerQueue.test.cpp index baa960a..b1ba414 100644 --- a/core/test/ProducerConsumerQueue.test.cpp +++ b/core/test/ProducerConsumerQueue.test.cpp @@ -1,4 +1,4 @@ -#include "aare/ProducerConsumerQueue.hpp" +#include "aare/core/ProducerConsumerQueue.hpp" #include // using arve::SimpleQueue; diff --git a/core/test/defs.test.cpp b/core/test/defs.test.cpp index a3163a4..25fdc2f 100644 --- a/core/test/defs.test.cpp +++ b/core/test/defs.test.cpp @@ -1,4 +1,4 @@ -#include "aare/defs.hpp" +#include "aare/core/defs.hpp" #include #include TEST_CASE("Enum to string conversion") { diff --git a/core/test/wrappers.test.cpp b/core/test/wrappers.test.cpp index 2718795..64d4338 100644 --- a/core/test/wrappers.test.cpp +++ b/core/test/wrappers.test.cpp @@ -1,5 +1,5 @@ -#include -#include +#include +#include #include #include diff --git a/etc/multimodule_virtual_jf.config b/etc/multimodule_virtual_jf.config new file mode 100644 index 0000000..b377fd7 --- /dev/null +++ b/etc/multimodule_virtual_jf.config @@ -0,0 +1,14 @@ +hostname localhost +rx_hostname localhost + +udp_dstip auto +powerchip 1 +frames 1 +exptime 5us +period 1ms + + + +rx_zmqip 127.0.0.1 +rx_zmqport 5555 +rx_zmqstream 1 diff --git a/etc/virtual_jf.config b/etc/virtual_jf.config index 96ac0db..23725c7 100644 --- a/etc/virtual_jf.config +++ b/etc/virtual_jf.config @@ -5,7 +5,7 @@ udp_dstip auto powerchip 1 frames 1 exptime 5us -period 100ms +period 1ms rx_zmqip 127.0.0.1 rx_zmqport 5555 diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index dfe6150..12a7569 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,14 +1,13 @@ -set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;") -set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_receiver_example;zmq_sender_example;") +set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;zmq_restream_example") +set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_receiver_example;zmq_sender_example;") foreach(example ${EXAMPLE_LIST}) add_executable(${example} ${example}.cpp) target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags) - - endforeach() +message(STATUS "Boost_LIBRARIES: ${Boost_LIBRARIES}") diff --git a/examples/json_example.cpp b/examples/json_example.cpp index 1f9ed2a..30f95cf 100644 --- a/examples/json_example.cpp +++ b/examples/json_example.cpp @@ -1,5 +1,5 @@ // Your First C++ Program -#include "aare/File.hpp" +#include "aare/file_io/File.hpp" #include "aare/utils/logger.hpp" #include diff --git a/examples/multiport_example.cpp b/examples/multiport_example.cpp index b250899..a697e1e 100644 --- a/examples/multiport_example.cpp +++ b/examples/multiport_example.cpp @@ -1,5 +1,5 @@ // Your First C++ Program -#include "aare/File.hpp" +#include "aare/file_io/File.hpp" #include "aare/utils/logger.hpp" #include diff --git a/examples/mythen_example.cpp b/examples/mythen_example.cpp index c7960bc..eee01f5 100644 --- a/examples/mythen_example.cpp +++ b/examples/mythen_example.cpp @@ -1,5 +1,5 @@ // Your First C++ Program -#include "aare/File.hpp" +#include "aare/file_io/File.hpp" #include "aare/utils/logger.hpp" #include diff --git a/examples/numpy_read_example.cpp b/examples/numpy_read_example.cpp index f9fc4bb..6ca3721 100644 --- a/examples/numpy_read_example.cpp +++ b/examples/numpy_read_example.cpp @@ -1,5 +1,5 @@ // Your First C++ Program -#include "aare/File.hpp" +#include "aare/file_io/File.hpp" #include #define AARE_ROOT_DIR_VAR "PROJECT_ROOT_DIR" diff --git a/examples/numpy_write_example.cpp b/examples/numpy_write_example.cpp index c999267..c9029ff 100644 --- a/examples/numpy_write_example.cpp +++ b/examples/numpy_write_example.cpp @@ -1,6 +1,6 @@ // Your First C++ Program -#include "aare/File.hpp" -#include "aare/Frame.hpp" +#include "aare/core/Frame.hpp" +#include "aare/file_io/File.hpp" #include #define AARE_ROOT_DIR_VAR "PROJECT_ROOT_DIR" diff --git a/examples/raw_example.cpp b/examples/raw_example.cpp index c32840d..785f33c 100644 --- a/examples/raw_example.cpp +++ b/examples/raw_example.cpp @@ -1,5 +1,5 @@ // Your First C++ Program -#include "aare/File.hpp" +#include "aare/file_io/File.hpp" #include "aare/utils/logger.hpp" #include diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index 888d640..6189ef2 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -1,27 +1,60 @@ -#include "aare/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/defs.hpp" + +#include #include #include #include +using namespace aare; +namespace po = boost::program_options; +using namespace std; -int main() { - std::string endpoint = "tcp://localhost:5555"; +int main(int argc, char **argv) { + aare::logger::set_verbosity(aare::logger::DEBUG); + + po::options_description desc("options"); + desc.add_options()("help", "produce help message")("port,p", po::value()->default_value(5555), + "port number"); + po::positional_options_description pd; + pd.add("port", 1); + po::variables_map vm; + try { + auto parsed = po::command_line_parser(argc, argv).options(desc).positional(pd).run(); + po::store(parsed, vm); + po::notify(vm); + + } catch (const boost::program_options::error &e) { + cout << e.what() << "\n"; + cout << desc << "\n"; + return 1; + } + if (vm.count("help")) { + cout << desc << "\n"; + return 1; + } + + auto port = vm["port"].as(); + + std::string endpoint = "udp://127.0.0.1:" + std::to_string(port); aare::ZmqSocketReceiver socket(endpoint); socket.connect(); - char *data = new char[1024 * 1024 * 10]; - aare::ZmqHeader header; - while (true) { - int rc = socket.receive(header, reinterpret_cast(data)); - aare::logger::info("Received bytes", rc, "Received header: ", header.to_string()); - auto *data_int = reinterpret_cast(data); - for (uint32_t i = 0; i < header.npixelsx; i++) { - for (uint32_t j = 0; j < header.npixelsy; j++) { - // verify that the sent data is correct - assert(data_int[i * header.npixelsy + j] == i + j); - } - } - aare::logger::info("Frame verified"); + std::vector v = socket.receive_n(); + aare::logger::info("Received ", v.size(), " frames"); + aare::logger::info("acquisition:", v[0].header.acqIndex); + aare::logger::info("Header size:", v[0].header.to_string().size()); + aare::logger::info("Frame size:", v[0].frame.size()); + aare::logger::info("Header:", v[0].header.to_string()); + + // for (ZmqFrame zmq_frame : v) { + // auto &[header, frame] = zmq_frame; + // for (int i = 0; i < 1024; i++) { + // for (int j = 0; j < 1024; j++) { + // assert(*(uint32_t *)frame.get(i, j) == (uint32_t)i + j); + // } + // } + // aare::logger::info("Frame verified"); + // } } - delete[] data; return 0; } \ No newline at end of file diff --git a/examples/zmq_restream_example.cpp b/examples/zmq_restream_example.cpp new file mode 100644 index 0000000..1d3f496 --- /dev/null +++ b/examples/zmq_restream_example.cpp @@ -0,0 +1,87 @@ +#include +#include + +#include "aare/file_io/File.hpp" +#include "aare/network_io/ZmqSocketSender.hpp" + +#include + +using namespace aare; +using namespace std; +namespace po = boost::program_options; + +int main(int argc, char **argv) { + aare::logger::set_verbosity(aare::logger::DEBUG); + + po::options_description desc("Allowed options"); + desc.add_options()("help", "produce help message")("file,f", po::value(), "input file")( + "port,p", po::value(), "port number")("fps", po::value()->default_value(1), + "frames per second (default 1)")("loop,l", + "loop over the file"); + po::positional_options_description pd; + pd.add("file", -1); + + po::variables_map vm; + try { + auto parsed = po::command_line_parser(argc, argv).options(desc).positional(pd).run(); + po::store(parsed, vm); + po::notify(vm); + + } catch (const boost::program_options::error &e) { + cout << e.what() << "\n"; + cout << desc << "\n"; + return 1; + } + + if (vm.count("help")) { + cout << desc << "\n"; + return 1; + } + if (vm.count("file") != 1) { + aare::logger::error("file is required"); + cout << desc << "\n"; + return 1; + } + if (vm.count("port") != 1) { + aare::logger::error("file is required"); + cout << desc << "\n"; + return 1; + } + + std::string path = vm["file"].as(); + uint16_t port = vm["port"].as(); + bool loop = vm.count("loop") == 1 ? true : false; + uint16_t fps = vm["fps"].as(); + + aare::logger::debug("ARGS: file:", path, "port:", port, "fps:", fps, "loop:", loop); + auto d = round(std::chrono::duration{1. / fps}); + aare::logger::debug("sleeping for", d.count(), "ms"); + + if (!std::filesystem::exists(path)) { + aare::logger::error("file does not exist"); + return 1; + } + + std::filesystem::path tmp(path); + + File file(tmp, "r"); + string endpoint = "tcp://*:" + std::to_string(port); + ZmqSocketSender sender(endpoint); + sender.bind(); + std::this_thread::sleep_for(d); // slow joiner problem should fix this + + for (size_t frameidx = 0; frameidx < file.total_frames(); frameidx++) { + + Frame frame = file.read(); + ZmqHeader header; + header.frameNumber = frameidx; + header.data = true; + header.npixelsx = frame.rows(); + header.npixelsy = frame.cols(); + header.dynamicRange = frame.bitdepth(); + header.size = frame.size(); + + sender.send({header, frame}); + std::this_thread::sleep_for(d); + } +} \ No newline at end of file diff --git a/examples/zmq_sender_example.cpp b/examples/zmq_sender_example.cpp index 5739d91..b0083c6 100644 --- a/examples/zmq_sender_example.cpp +++ b/examples/zmq_sender_example.cpp @@ -1,13 +1,17 @@ -#include "aare/Frame.hpp" -#include "aare/ZmqSocketSender.hpp" +#include "aare/core/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSocketSender.hpp" +#include "aare/network_io/defs.hpp" #include "aare/utils/logger.hpp" +#include // std::time #include #include #include // sleep using namespace aare; int main() { + std::srand(std::time(nullptr)); std::string endpoint = "tcp://*:5555"; aare::ZmqSocketSender socket(endpoint); socket.bind(); @@ -20,16 +24,27 @@ int main() { aare::ZmqHeader header; header.npixelsx = 1024; header.npixelsy = 1024; - header.imageSize = sizeof(uint32_t) * 1024 * 1024; + header.size = sizeof(uint32_t) * 1024 * 1024; header.dynamicRange = 32; - int i = 0; - while (true) { - aare::logger::info("Sending frame:", i++); - aare::logger::info("Header size:", sizeof(header.to_string())); - aare::logger::info("Frame size:", frame.size(), "\n"); + std::vector zmq_frames; + // send two exact frames - int rc = socket.send(header, frame.data(), frame.size()); + int acqid = 0; + while (true) { + zmq_frames.clear(); + header.acqIndex = acqid++; + size_t n_frames = std::rand() % 10 + 1; + + aare::logger::info("acquisition:", header.acqIndex); + aare::logger::info("Header size:", header.to_string().size()); + aare::logger::info("Frame size:", frame.size()); + aare::logger::info("Number of frames:", n_frames); + + for (size_t i = 0; i < n_frames; i++) { + zmq_frames.push_back({header, frame}); + } + size_t rc = socket.send(zmq_frames); aare::logger::info("Sent bytes", rc); sleep(1); } diff --git a/file_io/include/aare/File.hpp b/file_io/include/aare/file_io/File.hpp similarity index 95% rename from file_io/include/aare/File.hpp rename to file_io/include/aare/file_io/File.hpp index 7399543..09a2dd2 100644 --- a/file_io/include/aare/File.hpp +++ b/file_io/include/aare/file_io/File.hpp @@ -1,5 +1,5 @@ #pragma once -#include "aare/FileInterface.hpp" +#include "aare/file_io/FileInterface.hpp" namespace aare { class File { diff --git a/file_io/include/aare/FileFactory.hpp b/file_io/include/aare/file_io/FileFactory.hpp similarity index 94% rename from file_io/include/aare/FileFactory.hpp rename to file_io/include/aare/file_io/FileFactory.hpp index 4572e9c..a23b74e 100644 --- a/file_io/include/aare/FileFactory.hpp +++ b/file_io/include/aare/file_io/FileFactory.hpp @@ -1,6 +1,6 @@ #pragma once -#include "aare/DType.hpp" -#include "aare/FileInterface.hpp" +#include "aare/core/DType.hpp" +#include "aare/file_io/FileInterface.hpp" #include "aare/utils/logger.hpp" #include diff --git a/file_io/include/aare/FileInterface.hpp b/file_io/include/aare/file_io/FileInterface.hpp similarity index 96% rename from file_io/include/aare/FileInterface.hpp rename to file_io/include/aare/file_io/FileInterface.hpp index f403f62..67075e5 100644 --- a/file_io/include/aare/FileInterface.hpp +++ b/file_io/include/aare/file_io/FileInterface.hpp @@ -1,7 +1,7 @@ #pragma once -#include "aare/DType.hpp" -#include "aare/Frame.hpp" -#include "aare/defs.hpp" +#include "aare/core/DType.hpp" +#include "aare/core/Frame.hpp" +#include "aare/core/defs.hpp" #include "aare/utils/logger.hpp" #include #include diff --git a/file_io/include/aare/NumpyFile.hpp b/file_io/include/aare/file_io/NumpyFile.hpp similarity index 93% rename from file_io/include/aare/NumpyFile.hpp rename to file_io/include/aare/file_io/NumpyFile.hpp index 4a8276a..8068745 100644 --- a/file_io/include/aare/NumpyFile.hpp +++ b/file_io/include/aare/file_io/NumpyFile.hpp @@ -1,8 +1,8 @@ #pragma once -#include "aare/DType.hpp" -#include "aare/FileInterface.hpp" -#include "aare/NumpyHelpers.hpp" -#include "aare/defs.hpp" +#include "aare/core/DType.hpp" +#include "aare/core/defs.hpp" +#include "aare/file_io/FileInterface.hpp" +#include "aare/file_io/NumpyHelpers.hpp" #include #include #include diff --git a/file_io/include/aare/NumpyFileFactory.hpp b/file_io/include/aare/file_io/NumpyFileFactory.hpp similarity index 81% rename from file_io/include/aare/NumpyFileFactory.hpp rename to file_io/include/aare/file_io/NumpyFileFactory.hpp index 3e27ed0..f725c91 100644 --- a/file_io/include/aare/NumpyFileFactory.hpp +++ b/file_io/include/aare/file_io/NumpyFileFactory.hpp @@ -1,7 +1,7 @@ #pragma once -#include "aare/FileFactory.hpp" -#include "aare/NumpyFile.hpp" -#include "aare/defs.hpp" +#include "aare/core/defs.hpp" +#include "aare/file_io/FileFactory.hpp" +#include "aare/file_io/NumpyFile.hpp" #include namespace aare { diff --git a/file_io/include/aare/NumpyHelpers.hpp b/file_io/include/aare/file_io/NumpyHelpers.hpp similarity index 95% rename from file_io/include/aare/NumpyHelpers.hpp rename to file_io/include/aare/file_io/NumpyHelpers.hpp index b45fbbb..f00ba27 100644 --- a/file_io/include/aare/NumpyHelpers.hpp +++ b/file_io/include/aare/file_io/NumpyHelpers.hpp @@ -11,8 +11,8 @@ #include #include -#include "aare/DType.hpp" -#include "aare/defs.hpp" +#include "aare/core/DType.hpp" +#include "aare/core/defs.hpp" namespace aare { diff --git a/file_io/include/aare/RawFile.hpp b/file_io/include/aare/file_io/RawFile.hpp similarity index 94% rename from file_io/include/aare/RawFile.hpp rename to file_io/include/aare/file_io/RawFile.hpp index f5bd4de..5e4b544 100644 --- a/file_io/include/aare/RawFile.hpp +++ b/file_io/include/aare/file_io/RawFile.hpp @@ -1,8 +1,8 @@ #pragma once -#include "aare/FileInterface.hpp" -#include "aare/Frame.hpp" -#include "aare/SubFile.hpp" -#include "aare/defs.hpp" +#include "aare/core/Frame.hpp" +#include "aare/core/defs.hpp" +#include "aare/file_io/FileInterface.hpp" +#include "aare/file_io/SubFile.hpp" namespace aare { diff --git a/file_io/include/aare/RawFileFactory.hpp b/file_io/include/aare/file_io/RawFileFactory.hpp similarity index 89% rename from file_io/include/aare/RawFileFactory.hpp rename to file_io/include/aare/file_io/RawFileFactory.hpp index 901b5ff..569ad06 100644 --- a/file_io/include/aare/RawFileFactory.hpp +++ b/file_io/include/aare/file_io/RawFileFactory.hpp @@ -1,6 +1,6 @@ #pragma once -#include "aare/FileFactory.hpp" -#include "aare/RawFile.hpp" +#include "aare/file_io/FileFactory.hpp" +#include "aare/file_io/RawFile.hpp" namespace aare { diff --git a/file_io/include/aare/SubFile.hpp b/file_io/include/aare/file_io/SubFile.hpp similarity index 98% rename from file_io/include/aare/SubFile.hpp rename to file_io/include/aare/file_io/SubFile.hpp index 4a8212a..eea343d 100644 --- a/file_io/include/aare/SubFile.hpp +++ b/file_io/include/aare/file_io/SubFile.hpp @@ -1,5 +1,5 @@ #pragma once -#include "aare/defs.hpp" +#include "aare/core/defs.hpp" #include #include #include diff --git a/file_io/include/aare/helpers.hpp b/file_io/include/aare/file_io/helpers.hpp similarity index 75% rename from file_io/include/aare/helpers.hpp rename to file_io/include/aare/file_io/helpers.hpp index f8d52fb..ac0d060 100644 --- a/file_io/include/aare/helpers.hpp +++ b/file_io/include/aare/file_io/helpers.hpp @@ -1,6 +1,6 @@ #pragma once -#include "aare/FileInterface.hpp" +#include "aare/file_io/FileInterface.hpp" #include #include diff --git a/file_io/src/File.cpp b/file_io/src/File.cpp index 2c397d5..462d2e4 100644 --- a/file_io/src/File.cpp +++ b/file_io/src/File.cpp @@ -1,5 +1,5 @@ -#include "aare/File.hpp" -#include "aare/FileFactory.hpp" +#include "aare/file_io/File.hpp" +#include "aare/file_io/FileFactory.hpp" #include "aare/utils/logger.hpp" namespace aare { diff --git a/file_io/src/FileFactory.cpp b/file_io/src/FileFactory.cpp index 4d1b047..bbe8ed3 100644 --- a/file_io/src/FileFactory.cpp +++ b/file_io/src/FileFactory.cpp @@ -1,7 +1,7 @@ -#include "aare/FileFactory.hpp" -#include "aare/FileInterface.hpp" -#include "aare/NumpyFileFactory.hpp" -#include "aare/RawFileFactory.hpp" +#include "aare/file_io/FileFactory.hpp" +#include "aare/file_io/FileInterface.hpp" +#include "aare/file_io/NumpyFileFactory.hpp" +#include "aare/file_io/RawFileFactory.hpp" #include "aare/utils/logger.hpp" #include diff --git a/file_io/src/NumpyFile.cpp b/file_io/src/NumpyFile.cpp index 50e925d..d18b5ac 100644 --- a/file_io/src/NumpyFile.cpp +++ b/file_io/src/NumpyFile.cpp @@ -1,5 +1,5 @@ -#include "aare/NumpyFile.hpp" +#include "aare/file_io/NumpyFile.hpp" namespace aare { diff --git a/file_io/src/NumpyFileFactory.cpp b/file_io/src/NumpyFileFactory.cpp index 4c73be7..d29c753 100644 --- a/file_io/src/NumpyFileFactory.cpp +++ b/file_io/src/NumpyFileFactory.cpp @@ -1,5 +1,5 @@ -#include "aare/NumpyFileFactory.hpp" -#include "aare/NumpyHelpers.hpp" +#include "aare/file_io/NumpyFileFactory.hpp" +#include "aare/file_io/NumpyHelpers.hpp" namespace aare { diff --git a/file_io/src/NumpyHelpers.cpp b/file_io/src/NumpyHelpers.cpp index 619255d..f65100e 100644 --- a/file_io/src/NumpyHelpers.cpp +++ b/file_io/src/NumpyHelpers.cpp @@ -22,7 +22,8 @@ SOFTWARE. */ -#include "aare/NumpyHelpers.hpp" +#include "aare/file_io/NumpyHelpers.hpp" +#include namespace aare { diff --git a/file_io/src/RawFile.cpp b/file_io/src/RawFile.cpp index ee1d889..7094614 100644 --- a/file_io/src/RawFile.cpp +++ b/file_io/src/RawFile.cpp @@ -1,4 +1,4 @@ -#include "aare/RawFile.hpp" +#include "aare/file_io/RawFile.hpp" #include "aare/utils/logger.hpp" namespace aare { diff --git a/file_io/src/RawFileFactory.cpp b/file_io/src/RawFileFactory.cpp index c11fa20..80d6ffb 100644 --- a/file_io/src/RawFileFactory.cpp +++ b/file_io/src/RawFileFactory.cpp @@ -1,8 +1,8 @@ -#include "aare/RawFileFactory.hpp" -#include "aare/RawFile.hpp" -#include "aare/SubFile.hpp" -#include "aare/defs.hpp" -#include "aare/helpers.hpp" +#include "aare/file_io/RawFileFactory.hpp" +#include "aare/core/defs.hpp" +#include "aare/file_io/RawFile.hpp" +#include "aare/file_io/SubFile.hpp" +#include "aare/file_io/helpers.hpp" #include "aare/utils/logger.hpp" #include diff --git a/file_io/src/SubFile.cpp b/file_io/src/SubFile.cpp index f25371f..22823cf 100644 --- a/file_io/src/SubFile.cpp +++ b/file_io/src/SubFile.cpp @@ -1,5 +1,7 @@ -#include "aare/SubFile.hpp" +#include "aare/file_io/SubFile.hpp" #include "aare/utils/logger.hpp" +#include // memcpy +#include #include // #include @@ -79,7 +81,7 @@ template size_t SubFile::read_impl_flip(std::byte *buffer) { auto src = &tmp[0]; for (int i = 0; i != this->m_rows; ++i) { - memcpy(dst, src, row_size); + std::memcpy(dst, src, row_size); dst -= row_size; src += row_size; } diff --git a/file_io/src/helpers.cpp b/file_io/src/helpers.cpp index 52181d1..0819eb9 100644 --- a/file_io/src/helpers.cpp +++ b/file_io/src/helpers.cpp @@ -1,4 +1,4 @@ -#include "aare/helpers.hpp" +#include "aare/file_io/helpers.hpp" namespace aare { diff --git a/file_io/test/NumpyFile.test.cpp b/file_io/test/NumpyFile.test.cpp index 4468b7d..62ec3b8 100644 --- a/file_io/test/NumpyFile.test.cpp +++ b/file_io/test/NumpyFile.test.cpp @@ -1,5 +1,5 @@ -#include "aare/NumpyFile.hpp" -#include "aare/NDArray.hpp" +#include "aare/file_io/NumpyFile.hpp" +#include "aare/core/NDArray.hpp" #include #include "test_config.hpp" diff --git a/file_io/test/NumpyHelpers.test.cpp b/file_io/test/NumpyHelpers.test.cpp index 2bfde86..dd44332 100644 --- a/file_io/test/NumpyHelpers.test.cpp +++ b/file_io/test/NumpyHelpers.test.cpp @@ -1,4 +1,4 @@ -#include "aare/NumpyHelpers.hpp" //Is this really a public header? +#include "aare/file_io/NumpyHelpers.hpp" //Is this really a public header? #include using namespace aare::NumpyHelpers; diff --git a/file_io/test/RawFile.test.cpp b/file_io/test/RawFile.test.cpp index a2d6e70..34f2f03 100644 --- a/file_io/test/RawFile.test.cpp +++ b/file_io/test/RawFile.test.cpp @@ -1,4 +1,4 @@ -#include "aare/File.hpp" +#include "aare/file_io/File.hpp" #include "aare/utils/logger.hpp" #include #include diff --git a/network_io/include/aare/ZmqSocketReceiver.hpp b/network_io/include/aare/ZmqSocketReceiver.hpp deleted file mode 100644 index 894c750..0000000 --- a/network_io/include/aare/ZmqSocketReceiver.hpp +++ /dev/null @@ -1,26 +0,0 @@ -#pragma once - -#include "ZmqHeader.hpp" -#include "ZmqSocket.hpp" - -#include -#include -#include -#include - -// Socket to receive data from a ZMQ publisher -// needs to be in sync with the main library (or maybe better use the versioning in the header) - -// forward declare zmq_msg_t to avoid including zmq.h in the header -class zmq_msg_t; - -namespace aare { - -class ZmqSocketReceiver : public ZmqSocket { - public: - ZmqSocketReceiver(const std::string &endpoint); - void connect(); - size_t receive(ZmqHeader &header, std::byte *data, bool serialized_header = false); -}; - -} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqSocketSender.hpp b/network_io/include/aare/ZmqSocketSender.hpp deleted file mode 100644 index ce2b91f..0000000 --- a/network_io/include/aare/ZmqSocketSender.hpp +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once -#include "ZmqHeader.hpp" -#include "ZmqSocket.hpp" - -namespace aare { -class ZmqSocketSender : public ZmqSocket { - public: - ZmqSocketSender(const std::string &endpoint); - void bind(); - size_t send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header = false); -}; -} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqHeader.hpp b/network_io/include/aare/network_io/ZmqHeader.hpp similarity index 98% rename from network_io/include/aare/ZmqHeader.hpp rename to network_io/include/aare/network_io/ZmqHeader.hpp index 6096282..4c4b6bf 100644 --- a/network_io/include/aare/ZmqHeader.hpp +++ b/network_io/include/aare/network_io/ZmqHeader.hpp @@ -1,4 +1,7 @@ +#pragma once +#include "aare/core/Frame.hpp" #include "aare/utils/logger.hpp" + #include "simdjson.h" #include #include @@ -95,7 +98,7 @@ struct ZmqHeader { /** number of pixels/channels in y axis for this zmq socket */ uint32_t npixelsy{0}; /** number of bytes for an image in this socket */ - uint32_t imageSize{0}; + uint32_t size{0}; /** frame number from detector */ uint64_t acqIndex{0}; /** frame index (starting at 0 for each acquisition) */ @@ -135,4 +138,5 @@ struct ZmqHeader { // compare operator bool operator==(const ZmqHeader &other) const; }; + } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/ZmqSocket.hpp b/network_io/include/aare/network_io/ZmqSocket.hpp similarity index 89% rename from network_io/include/aare/ZmqSocket.hpp rename to network_io/include/aare/network_io/ZmqSocket.hpp index 474e054..96ea923 100644 --- a/network_io/include/aare/ZmqSocket.hpp +++ b/network_io/include/aare/network_io/ZmqSocket.hpp @@ -1,7 +1,5 @@ #pragma once -#include -#include #include // Socket to receive data from a ZMQ publisher @@ -12,7 +10,22 @@ class zmq_msg_t; namespace aare { +/** + * @brief parent class for ZmqSocketReceiver and ZmqSocketSender + * contains common functions and variables + */ class ZmqSocket { + public: + ZmqSocket() = default; + ~ZmqSocket(); + ZmqSocket(const ZmqSocket &) = delete; + ZmqSocket operator=(const ZmqSocket &) = delete; + ZmqSocket(ZmqSocket &&) = delete; + void disconnect(); + void set_zmq_hwm(int hwm); + void set_timeout_ms(int n); + void set_potential_frame_size(size_t size); + protected: void *m_context{nullptr}; void *m_socket{nullptr}; @@ -22,19 +35,6 @@ class ZmqSocket { size_t m_potential_frame_size{1024 * 1024}; constexpr static size_t m_max_header_size = 1024; char *m_header_buffer = new char[m_max_header_size]; - - public: - ZmqSocket() = default; - ~ZmqSocket(); - - ZmqSocket(const ZmqSocket &) = delete; - ZmqSocket operator=(const ZmqSocket &) = delete; - ZmqSocket(ZmqSocket &&) = delete; - - void disconnect(); - void set_zmq_hwm(int hwm); - void set_timeout_ms(int n); - void set_potential_frame_size(size_t size); }; } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/ZmqSocketReceiver.hpp b/network_io/include/aare/network_io/ZmqSocketReceiver.hpp new file mode 100644 index 0000000..7d207a5 --- /dev/null +++ b/network_io/include/aare/network_io/ZmqSocketReceiver.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include "aare/core/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSocket.hpp" +#include "aare/network_io/defs.hpp" + +#include +#include + +// forward declare zmq_msg_t to avoid including zmq.h in the header +class zmq_msg_t; + +namespace aare { + +/** + * @brief Socket to receive data from a ZMQ publisher + * @note needs to be in sync with the main library (or maybe better use the versioning in the header) + */ +class ZmqSocketReceiver : public ZmqSocket { + public: + ZmqSocketReceiver(const std::string &endpoint); + void connect(); + std::vector receive_n(); + + private: + int receive_data(std::byte *data, size_t size); + ZmqFrame receive_zmqframe(); + ZmqHeader receive_header(); +}; + +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/ZmqSocketSender.hpp b/network_io/include/aare/network_io/ZmqSocketSender.hpp new file mode 100644 index 0000000..2ef379f --- /dev/null +++ b/network_io/include/aare/network_io/ZmqSocketSender.hpp @@ -0,0 +1,21 @@ +#pragma once +#include "aare/core/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSocket.hpp" +#include "aare/network_io/defs.hpp" + +namespace aare { + +/** + * @brief Socket to send data to a ZMQ subscriber + * @note needs to be in sync with the main library (or maybe better use the versioning in the header) + */ +class ZmqSocketSender : public ZmqSocket { + public: + ZmqSocketSender(const std::string &endpoint); + void bind(); + size_t send(const ZmqHeader &header, const std::byte *data, size_t size); + size_t send(const ZmqFrame &zmq_frame); + size_t send(const std::vector &zmq_frames); +}; +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/defs.hpp b/network_io/include/aare/network_io/defs.hpp new file mode 100644 index 0000000..2752750 --- /dev/null +++ b/network_io/include/aare/network_io/defs.hpp @@ -0,0 +1,34 @@ +#pragma once +#include "aare/core/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" + +#include +#include + +namespace aare { +/** + * @brief ZmqFrame structure + * wrapper class to contain a ZmqHeader and a Frame + */ +struct ZmqFrame { + ZmqHeader header; + Frame frame; +}; + +namespace network_io { +/** + * @brief NetworkError exception class + */ +class NetworkError : public std::runtime_error { + private: + const char *m_msg; + + public: + NetworkError(const char *msg) : std::runtime_error(msg), m_msg(msg) {} + NetworkError(const std::string msg) : std::runtime_error(msg) { m_msg = strdup(msg.c_str()); } + virtual const char *what() const noexcept override { return m_msg; } +}; + +} // namespace network_io + +} // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqHeader.cpp b/network_io/src/ZmqHeader.cpp index 3eff459..76bf34f 100644 --- a/network_io/src/ZmqHeader.cpp +++ b/network_io/src/ZmqHeader.cpp @@ -1,5 +1,5 @@ -#include "aare/ZmqHeader.hpp" +#include "aare/network_io/ZmqHeader.hpp" #include "simdjson.h" @@ -77,7 +77,7 @@ std::string ZmqHeader::to_string() const { write_digit(s, "ndety", ndety); write_digit(s, "npixelsx", npixelsx); write_digit(s, "npixelsy", npixelsy); - write_digit(s, "imageSize", imageSize); + write_digit(s, "size", size); write_digit(s, "acqIndex", acqIndex); write_digit(s, "frameIndex", frameIndex); write_digit(s, "progress", progress); @@ -117,7 +117,6 @@ void ZmqHeader::from_string(std::string &s) { for (auto field : object) { std::string_view key = field.unescaped_key(); - if (key == "data") { data = uint64_t(field.value()) ? true : false; } else if (key == "jsonversion") { @@ -134,8 +133,8 @@ void ZmqHeader::from_string(std::string &s) { npixelsx = uint32_t(field.value()); } else if (key == "npixelsy") { npixelsy = uint32_t(field.value()); - } else if (key == "imageSize") { - imageSize = uint32_t(field.value()); + } else if (key == "size") { + size = uint32_t(field.value()); } else if (key == "acqIndex") { acqIndex = uint64_t(field.value()); } else if (key == "frameIndex") { @@ -187,7 +186,7 @@ void ZmqHeader::from_string(std::string &s) { bool ZmqHeader::operator==(const ZmqHeader &other) const { return data == other.data && jsonversion == other.jsonversion && dynamicRange == other.dynamicRange && fileIndex == other.fileIndex && ndetx == other.ndetx && ndety == other.ndety && npixelsx == other.npixelsx && - npixelsy == other.npixelsy && imageSize == other.imageSize && acqIndex == other.acqIndex && + npixelsy == other.npixelsy && size == other.size && acqIndex == other.acqIndex && frameIndex == other.frameIndex && progress == other.progress && fname == other.fname && frameNumber == other.frameNumber && expLength == other.expLength && packetNumber == other.packetNumber && detSpec1 == other.detSpec1 && timestamp == other.timestamp && modId == other.modId && row == other.row && diff --git a/network_io/src/ZmqSocket.cpp b/network_io/src/ZmqSocket.cpp index 47ddf79..e3d6b3f 100644 --- a/network_io/src/ZmqSocket.cpp +++ b/network_io/src/ZmqSocket.cpp @@ -1,9 +1,13 @@ -#include "aare/ZmqSocket.hpp" -#include +#include "aare/network_io/ZmqSocket.hpp" #include namespace aare { +/** + * @brief closes the socket and destroys the context + * @return void + * @note this function is called by the destructor + */ void ZmqSocket::disconnect() { zmq_close(m_socket); zmq_ctx_destroy(m_context); @@ -11,6 +15,10 @@ void ZmqSocket::disconnect() { m_context = nullptr; } +/** + * @brief destructor + * @note called from child classes (ZmqSocketReceiver and ZmqSocketSender) + */ ZmqSocket::~ZmqSocket() { if (m_socket) disconnect(); diff --git a/network_io/src/ZmqSocketReceiver.cpp b/network_io/src/ZmqSocketReceiver.cpp index 2d395ea..60d191b 100644 --- a/network_io/src/ZmqSocketReceiver.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -1,4 +1,4 @@ -#include "aare/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" #include "aare/utils/logger.hpp" #include @@ -6,69 +6,120 @@ namespace aare { +/** + * @brief Construct a new ZmqSocketReceiver object + */ ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) { m_endpoint = endpoint; memset(m_header_buffer, 0, m_max_header_size); } +/** + * @brief Connect to the given endpoint + * subscribe to a Zmq published + */ void ZmqSocketReceiver::connect() { m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_SUB); fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm); int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); // should be set before connect if (rc) - throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno))); + throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno))); int bufsize = m_potential_frame_size * m_zmq_hwm; fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (1024 * 1024)); rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize)); if (rc) - throw std::runtime_error(fmt::format("Could not set ZMQ_RCVBUF: {}", strerror(errno))); + throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVBUF: {}", strerror(errno))); zmq_connect(m_socket, m_endpoint.c_str()); zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); } -size_t ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) { - - size_t data_bytes_received{}; - - if (serialized_header) - throw std::runtime_error("Not implemented"); +/** + * @brief receive a ZmqHeader + * @return ZmqHeader + */ +ZmqHeader ZmqSocketReceiver::receive_header() { + // receive string ZmqHeader + aare::logger::debug("Receiving header"); size_t header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); + aare::logger::debug("Bytes: ", header_bytes_received); - // receive header m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate if (header_bytes_received < 0) { - fmt::print("Error receiving header: {}\n", strerror(errno)); - return -1; + throw network_io::NetworkError(LOCATION + "Error receiving header"); } aare::logger::debug("Bytes: ", header_bytes_received, ", Header: ", m_header_buffer); // parse header + ZmqHeader header; try { std::string header_str(m_header_buffer); header.from_string(header_str); } catch (const simdjson::simdjson_error &e) { - aare::logger::error(LOCATION + "Error parsing header: ", e.what()); - return -1; + throw network_io::NetworkError(LOCATION + "Error parsing header: " + e.what()); + } + return header; +} + +/** + * @brief receive data following a ZmqHeader + * @param data pointer to data + * @param size size of data + * @return ZmqHeader + */ +int ZmqSocketReceiver::receive_data(std::byte *data, size_t size) { + int data_bytes_received = zmq_recv(m_socket, data, size, 0); + if (data_bytes_received == -1) + network_io::NetworkError("Got half of a multipart msg!!!"); + aare::logger::debug("Bytes: ", data_bytes_received); + + return data_bytes_received; +} + +/** + * @brief receive a ZmqFrame (header and data) + * @return ZmqFrame + */ +ZmqFrame ZmqSocketReceiver::receive_zmqframe() { + // receive header from zmq and parse it + ZmqHeader header = receive_header(); + + if (!header.data) { + // no data following header + return {header, Frame(0, 0, 0)}; } - // do we have a multipart message (data following header)? - int more; - size_t more_size = sizeof(more); - zmq_getsockopt(m_socket, ZMQ_RCVMORE, &more, &more_size); - if (!more) { - return 0; // no data following header - } else { - - data_bytes_received = zmq_recv(m_socket, data, header.imageSize, 0); // TODO! configurable size!!!! - if (data_bytes_received == -1) - throw std::runtime_error("Got half of a multipart msg!!!"); - aare::logger::debug("Bytes: ", data_bytes_received); + // receive frame data + Frame frame(header.npixelsx, header.npixelsy, header.dynamicRange); + int bytes_received = receive_data(frame.data(), frame.size()); + if (bytes_received == -1) { + throw network_io::NetworkError(LOCATION + "Error receiving frame"); } - return data_bytes_received + header_bytes_received; + if ((uint32_t)bytes_received != header.size) { + throw network_io::NetworkError( + fmt::format("{} Expected {} bytes but received {}", LOCATION, header.size, bytes_received)); + } + return {header, std::move(frame)}; +} + +/** + * @brief receive multiple ZmqFrames (header and data) + * @return std::vector + */ +std::vector ZmqSocketReceiver::receive_n() { + std::vector frames; + while (true) { + // receive header and frame + ZmqFrame zmq_frame = receive_zmqframe(); + if (!zmq_frame.header.data) { + break; + } + frames.push_back(zmq_frame); + } + return frames; } } // namespace aare diff --git a/network_io/src/ZmqSocketSender.cpp b/network_io/src/ZmqSocketSender.cpp index ba296c6..3600234 100644 --- a/network_io/src/ZmqSocketSender.cpp +++ b/network_io/src/ZmqSocketSender.cpp @@ -1,27 +1,46 @@ -#include "aare/ZmqSocketSender.hpp" - +#include "aare/network_io/ZmqSocketSender.hpp" #include #include namespace aare { + +/** + * Constructor + * @param endpoint ZMQ endpoint + */ ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = endpoint; } +/** + * bind to the given port + */ void ZmqSocketSender::bind() { m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_PUB); size_t rc = zmq_bind(m_socket, m_endpoint.c_str()); - assert(rc == 0); + if (rc != 0) { + std::string error = zmq_strerror(zmq_errno()); + throw network_io::NetworkError("zmq_bind failed: " + error); + } } -size_t ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header) { +/** + * send a header and data + * @param header + * @param data pointer to data + * @param size size of data + * @return number of bytes sent + */ +size_t ZmqSocketSender::send(const ZmqHeader &header, const std::byte *data, size_t size) { size_t rc; - if (serialize_header) { - rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE); - assert(rc == sizeof(ZmqHeader)); - } else { - std::string header_str = header.to_string(); - rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE); - assert(rc == header_str.size()); + // if (serialize_header) { + // rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE); + // assert(rc == sizeof(ZmqHeader)); + std::string header_str = header.to_string(); + aare::logger::debug("Header :", header_str); + rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE); + assert(rc == header_str.size()); + if (data == nullptr) { + return rc; } size_t rc2 = zmq_send(m_socket, data, size, 0); @@ -29,4 +48,42 @@ size_t ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t si return rc + rc2; } +/** + * Send a frame with a header + * @param ZmqFrame that contains a header and a frame + * @return number of bytes sent + */ +size_t ZmqSocketSender::send(const ZmqFrame &zmq_frame) { + const Frame &frame = zmq_frame.frame; + // send frame + size_t rc = send(zmq_frame.header, frame.data(), frame.size()); + // send end of message header + ZmqHeader end_header = zmq_frame.header; + end_header.data = false; + size_t rc2 = send(end_header, nullptr, 0); + + return rc + rc2; +} + +/** + * Send a vector of headers and frames + * @param zmq_frames vector of ZmqFrame + * @return number of bytes sent + */ +size_t ZmqSocketSender::send(const std::vector &zmq_frames) { + size_t rc = 0; + for (size_t i = 0; i < zmq_frames.size(); i++) { + const ZmqHeader &header = zmq_frames[i].header; + const Frame &frame = zmq_frames[i].frame; + // send header and frame + if (i < zmq_frames.size() - 1) { + // send header and frame + rc += send(header, frame.data(), frame.size()); + } else { + // send header, frame and end of message header + rc += send({header, frame}); + } + } + return rc; +} } // namespace aare \ No newline at end of file diff --git a/network_io/test/ZmqHeader.test.cpp b/network_io/test/ZmqHeader.test.cpp index 787dfb6..de5b8b9 100644 --- a/network_io/test/ZmqHeader.test.cpp +++ b/network_io/test/ZmqHeader.test.cpp @@ -1,4 +1,4 @@ -#include "aare/ZmqHeader.hpp" +#include "aare/network_io/ZmqHeader.hpp" #include "aare/utils/logger.hpp" #include @@ -13,7 +13,7 @@ TEST_CASE("Test ZmqHeader") { header.fileIndex = 4; header.ndetx = 5; header.ndety = 6; - header.imageSize = 4800; + header.size = 4800; header.acqIndex = 8; header.frameIndex = 9; header.progress = 0.1; @@ -46,7 +46,7 @@ TEST_CASE("Test ZmqHeader") { "\"ndety\": 6, " "\"npixelsx\": 10, " "\"npixelsy\": 15, " - "\"imageSize\": 4800, " + "\"size\": 4800, " "\"acqIndex\": 8, " "\"frameIndex\": 9, " "\"progress\": 0.100000, " diff --git a/python/src/bindings.cpp b/python/src/bindings.cpp index b73d4b2..2a0f50a 100644 --- a/python/src/bindings.cpp +++ b/python/src/bindings.cpp @@ -5,8 +5,8 @@ #include #include "aare/FileHandler.hpp" -#include "aare/Frame.hpp" -#include "aare/defs.hpp" +#include "aare/core/Frame.hpp" +#include "aare/core/defs.hpp" namespace py = pybind11; diff --git a/utils/include/aare/utils/logger.hpp b/utils/include/aare/utils/logger.hpp index 87928bf..3091971 100644 --- a/utils/include/aare/utils/logger.hpp +++ b/utils/include/aare/utils/logger.hpp @@ -5,10 +5,19 @@ #include #include +/** + * @brief LOCATION macro to get the current location in the code + */ #define LOCATION std::string(__FILE__) + std::string(":") + std::to_string(__LINE__) + ":" + std::string(__func__) + ":" -// operator overload to print vectors -// typename T must be printable (i.e. have the << operator) +/** + * @brief operator overload for std::vector + * @tparam T type of the vector. T should have operator<< defined + * @param out output stream + * @param v vector to print + * @return std::ostream& output stream + * @note this is used to print vectors in the logger (or anywhere else) + */ template std::ostream &operator<<(std::ostream &out, const std::vector &v) { out << "["; size_t last = v.size() - 1; @@ -21,7 +30,15 @@ template std::ostream &operator<<(std::ostream &out, const std::vec return out; } -// operator overload for std::array +/** + * @brief operator overload for std::array + * @tparam T type of the array. T should have operator<< defined + * @tparam N size of the array + * @param out output stream + * @param v array to print + * @return std::ostream& output stream + * + */ template std::ostream &operator<<(std::ostream &out, const std::array &v) { out << "["; size_t last = N - 1; @@ -33,7 +50,16 @@ template std::ostream &operator<<(std::ostream &out, cons out << "]"; return out; } -// operator overlaod for std::map + +/** + * @brief operator overload for std::map + * @tparam K type of the key in the map. K should have operator<< defined + * @tparam V type of the value in the map. V should have operator<< defined + * @param out output stream + * @param v map to print + * @return std::ostream& output stream + * + */ template std::ostream &operator<<(std::ostream &out, const std::map &v) { out << "{"; size_t i = 0; @@ -48,6 +74,9 @@ template std::ostream &operator<<(std::ostream &out, co namespace aare { namespace logger { +/** + * @brief enum to define the logging level + */ enum LOGGING_LEVEL { DEBUG = 0, INFO = 1, @@ -56,36 +85,64 @@ enum LOGGING_LEVEL { }; +/** + * @brief Logger class to log messages + * @note can be used to log to file or to a std::streambuf (like std::cout) + * @note by default logs to std::cout and std::cerr with INFO verbosity + */ class Logger { - std::streambuf *standard_buf = std::cout.rdbuf(); - std::streambuf *error_buf = std::cerr.rdbuf(); - std::ostream *standard_output; - std::ostream *error_output; - LOGGING_LEVEL VERBOSITY_LEVEL = LOGGING_LEVEL::INFO; - - std::ofstream out_file; - public: + /** + * @brief get the instance of the logger + */ + Logger() { + standard_output = new std::ostream(standard_buf); + error_output = new std::ostream(error_buf); + } + + /** + * @brief set the output file for the logger by filename + * @param filename name of the file to log to + * @return void + */ void set_output_file(std::string filename) { if (out_file.is_open()) out_file.close(); out_file.open(filename); set_streams(out_file.rdbuf()); } + + /** + * @brief set the output streams for the logger + * @param out output stream for standard output + * @param err output stream for error output + * @return void + */ void set_streams(std::streambuf *out, std::streambuf *err) { delete standard_output; delete error_output; standard_output = new std::ostream(out); error_output = new std::ostream(err); } + /** + * @brief set the output streams for the logger + * @param out output stream for both standard and error output + * @return void + */ void set_streams(std::streambuf *out) { set_streams(out, out); } - void set_verbosity(LOGGING_LEVEL level) { VERBOSITY_LEVEL = level; } - Logger() { - standard_output = new std::ostream(standard_buf); - error_output = new std::ostream(error_buf); - } + /** + * @brief set the verbosity level of the logger + * @param level verbosity level + */ + void set_verbosity(LOGGING_LEVEL level) { VERBOSITY_LEVEL = level; } + + /** + * @brief destructor for the logger + * @note closes the file if it is open + * @note flushes the output streams + */ ~Logger() { if (out_file.is_open()) out_file.close(); @@ -95,16 +152,51 @@ class Logger { delete standard_output; delete error_output; } + + /** + * @brief log a message + * @tparam level logging level + * @tparam Strings variadic template for inferring the types of the arguments (not necessarily strings but can be + * any printable type) + * @param s arguments to log + * @return void + */ template void log(const Strings... s) { if (level >= VERBOSITY_LEVEL) log_(s...); } + /** + * @brief log a message with DEBUG level + */ template void debug(const Strings... s) { log("[DEBUG]", s...); } + /** + * @brief log a message with INFO level + */ template void info(const Strings... s) { log("[INFO]", s...); } + /** + * @brief log a message with WARNING level + */ template void warn(const Strings... s) { log("[WARN]", s...); } + /** + * @brief log a message with ERROR level + */ template void error(const Strings... s) { log("[ERROR]", s...); } private: + std::streambuf *standard_buf = std::cout.rdbuf(); + std::streambuf *error_buf = std::cerr.rdbuf(); + std::ostream *standard_output; + std::ostream *error_output; + LOGGING_LEVEL VERBOSITY_LEVEL = LOGGING_LEVEL::INFO; + + std::ofstream out_file; + /** + * @brief log_ function private function to log messages + * @tparam level logging level + * @note this is the final function in the recursive template function log_ + * @note this function is called when there are no more arguments to log + * @note adds a newline at the end of the log message + */ template void log_() { if (level == LOGGING_LEVEL::ERROR) { *error_output << std::endl; @@ -112,6 +204,16 @@ class Logger { *standard_output << std::endl; } } + + /** + * @brief log_ recursive function private function to log messages + * @tparam level logging level + * @tparam First type of the first argument + * @tparam Strings variadic template for inferring the types of the arguments + * @param arg first argument to log + * @param s rest of the arguments to log + * @note called at first from the public log function + */ template void log_(First arg, const Strings... s) { if (level == LOGGING_LEVEL::ERROR) { *error_output << (arg) << ' '; @@ -125,16 +227,44 @@ class Logger { }; namespace internal { - +/** + * @brief global instance of the logger + */ extern aare::logger::Logger logger_instance; } // namespace internal +/** + * functions below are the public interface to the logger. + * These functions call the corresponding functions in the logger_instance + * @note this is done to avoid having to pass the logger_instance around and allow users to assign their own + * logger_instance + */ + +/** + * @brief log a message with the given level + * @tparam level logging level + * @tparam Strings variadic template for inferring the types of the arguments + * @param s arguments to log + * @return void + */ template void log(const Strings... s) { internal::logger_instance.log(s...); } +/** + * @brief log a message with DEBUG level + */ template void debug(const Strings... s) { internal::logger_instance.debug(s...); } +/** + * @brief log a message with INFO level + */ template void info(const Strings... s) { internal::logger_instance.info(s...); } +/** + * @brief log a message with WARNING level + */ template void warn(const Strings... s) { internal::logger_instance.warn(s...); } +/** + * @brief log a message with ERROR level + */ template void error(const Strings... s) { internal::logger_instance.error(s...); } extern void set_streams(std::streambuf *out, std::streambuf *err);