diff --git a/core/include/aare/core/defs.hpp b/core/include/aare/core/defs.hpp index 2249075..ea8b641 100644 --- a/core/include/aare/core/defs.hpp +++ b/core/include/aare/core/defs.hpp @@ -45,13 +45,14 @@ struct sls_detector_header { std::array packetMask; }; -struct xy { - size_t row; - size_t col; - bool operator==(const xy &other) const { return row == other.row && col == other.col; } - bool operator!=(const xy &other) const { return !(*this == other); } +template struct t_xy { + T row; + T col; + bool operator==(const t_xy &other) const { return row == other.row && col == other.col; } + bool operator!=(const t_xy &other) const { return !(*this == other); } std::string to_string() const { return "{ x: " + std::to_string(row) + " y: " + std::to_string(col) + " }"; } }; +typedef t_xy xy; using dynamic_shape = std::vector; diff --git a/docs/commands_multimodule.md b/docs/commands_multimodule.md new file mode 100644 index 0000000..a3b23c8 --- /dev/null +++ b/docs/commands_multimodule.md @@ -0,0 +1,8 @@ +# receive data from two zmqstreams +```bash +killall jungfrauDetectorServer_virtual +jungfrauDetectorServer_virtual +jungfrauDetectorServer_virtual -p 1956 +slsMultiReceiver 1980 2 0 +sls_detector_put config etc/multimodule_virtual_jf.config +``` diff --git a/etc/multimodule_virtual_jf.config b/etc/multimodule_virtual_jf.config index b377fd7..b3d113c 100644 --- a/etc/multimodule_virtual_jf.config +++ b/etc/multimodule_virtual_jf.config @@ -1,14 +1,22 @@ -hostname localhost -rx_hostname localhost +hostname 127.0.0.1:1952+127.0.0.1:1956+ +rx_hostname 127.0.0.1:1980+127.0.0.1:1981+ -udp_dstip auto +0:udp_dstport 50001 +0:udp_dstport2 50002 +1:udp_dstport 50003 +1:udp_dstport2 50004 + +udp_dstip 127.0.0.1 +udp_dstip2 127.0.0.1 powerchip 1 -frames 1 +frames 10 exptime 5us -period 1ms +period 100ms +rx_discardpolicy discardpartial rx_zmqip 127.0.0.1 rx_zmqport 5555 rx_zmqstream 1 + diff --git a/etc/virtual_jf.config b/etc/virtual_jf.config index 23725c7..a8eba3e 100644 --- a/etc/virtual_jf.config +++ b/etc/virtual_jf.config @@ -3,9 +3,9 @@ rx_hostname localhost udp_dstip auto powerchip 1 -frames 1 +frames 10 exptime 5us -period 1ms +period 100ms rx_zmqip 127.0.0.1 rx_zmqport 5555 diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 66e403d..2180098 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,11 +1,11 @@ set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;zmq_restream_example") set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_receiver_example;zmq_sender_example;") -set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example") +set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example;zmq_multi_receiver;zmq_task_ventilator;zmq_worker;zmq_sink") foreach(example ${EXAMPLE_LIST}) add_executable(${example} ${example}.cpp) target_include_directories(${example} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) - target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags) + target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags libzmq) endforeach() diff --git a/examples/zmq_multi_receiver.cpp b/examples/zmq_multi_receiver.cpp new file mode 100644 index 0000000..cf1dd94 --- /dev/null +++ b/examples/zmq_multi_receiver.cpp @@ -0,0 +1,29 @@ +#include "aare/examples/defs.hpp" +#include "aare/network_io/ZmqMultiReceiver.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/defs.hpp" + +#include +#include +#include +#include +using namespace aare; +namespace po = boost::program_options; +using namespace std; + +int main() { + logger::set_verbosity(logger::DEBUG); + + std::string const endpoint1 = "tcp://127.0.0.1:" + std::to_string(5555); + std::string const endpoint2 = "tcp://127.0.0.1:" + std::to_string(5556); + + ZmqMultiReceiver socket({endpoint1, endpoint2}, {1, 2}); + + socket.connect(); + while (true) { + auto frames = socket.receive_n(); + logger::info("Received", frames.size(), "frames"); + } + + return 0; +} \ No newline at end of file diff --git a/examples/zmq_receiver_example.cpp b/examples/zmq_receiver_example.cpp index 4f50b01..3ef23d2 100644 --- a/examples/zmq_receiver_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -36,7 +36,7 @@ int main(int argc, char **argv) { auto port = vm["port"].as(); - std::string const endpoint = "udp://127.0.0.1:" + std::to_string(port); + std::string const endpoint = "tcp://127.0.0.1:" + std::to_string(port); aare::ZmqSocketReceiver socket(endpoint); socket.connect(); while (true) { diff --git a/examples/zmq_restream_example.cpp b/examples/zmq_restream_example.cpp index 28da551..4154047 100644 --- a/examples/zmq_restream_example.cpp +++ b/examples/zmq_restream_example.cpp @@ -44,7 +44,7 @@ int main(int argc, char **argv) { return 1; } if (vm.count("port") != 1) { - aare::logger::error("file is required"); + aare::logger::error("port is required"); cout << desc << "\n"; return 1; } @@ -77,9 +77,9 @@ int main(int argc, char **argv) { ZmqHeader header; header.frameNumber = frameidx; header.data = true; - header.npixelsx = frame.rows(); - header.npixelsy = frame.cols(); - header.dynamicRange = frame.bitdepth(); + header.shape.row = frame.rows(); + header.shape.col = frame.cols(); + header.bitmode = frame.bitdepth(); header.size = frame.size(); sender.send({header, frame}); diff --git a/examples/zmq_sender_example.cpp b/examples/zmq_sender_example.cpp index 1e22503..d20eca7 100644 --- a/examples/zmq_sender_example.cpp +++ b/examples/zmq_sender_example.cpp @@ -23,10 +23,9 @@ int main() { } } aare::ZmqHeader header; - header.npixelsx = 1024; - header.npixelsy = 1024; + header.shape = {1024, 1024}; header.size = sizeof(uint32_t) * 1024 * 1024; - header.dynamicRange = 32; + header.bitmode = 32; std::vector zmq_frames; // send two exact frames diff --git a/examples/zmq_sink.cpp b/examples/zmq_sink.cpp new file mode 100644 index 0000000..add3b36 --- /dev/null +++ b/examples/zmq_sink.cpp @@ -0,0 +1,30 @@ + +#include "aare/network_io/ZmqSink.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqVentilator.hpp" +#include "aare/network_io/ZmqWorker.hpp" + +#include "zmq.h" +#include +#include +#include +#include +using namespace aare; +namespace po = boost::program_options; +using namespace std; + +int main() { + logger::set_verbosity(logger::DEBUG); + // 1. bind sink to endpoint + ZmqSink sink("tcp://*:4322"); + + int i = 0; + while (true) { + // 2. receive Task from ventilator + Task *task = sink.pull(); + logger::info("Received", i++, "tasks"); + + Task::destroy(task); + } + // read the command line arguments +} \ No newline at end of file diff --git a/examples/zmq_task_ventilator.cpp b/examples/zmq_task_ventilator.cpp new file mode 100644 index 0000000..169e610 --- /dev/null +++ b/examples/zmq_task_ventilator.cpp @@ -0,0 +1,72 @@ +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqVentilator.hpp" + +#include "zmq.h" +#include +#include +#include +#include +using namespace aare; +namespace po = boost::program_options; +using namespace std; + +string setup(int argc, char **argv) { + logger::set_verbosity(logger::DEBUG); + po::options_description desc("options"); + desc.add_options()("help", "produce help message")("port,p", po::value()->default_value(5555), + "port number"); + po::positional_options_description pd; + pd.add("port", 1); + po::variables_map vm; + try { + auto parsed = po::command_line_parser(argc, argv).options(desc).positional(pd).run(); + po::store(parsed, vm); + po::notify(vm); + + } catch (const boost::program_options::error &e) { + cout << e.what() << "\n"; + cout << desc << "\n"; + exit(1); + } + if (vm.count("help")) { + cout << desc << "\n"; + exit(1); + } + + auto port = vm["port"].as(); + + return "tcp://127.0.0.1:" + to_string(port); +} + +int process(const std::string &endpoint) { + // 0. connect to slsReceiver + ZmqSocketReceiver receiver(endpoint, ZMQ_SUB); + receiver.connect(); + + // 1. create ventilator + ZmqVentilator ventilator("tcp://*:4321"); + + while (true) { + // 2. receive frame from slsReceiver + ZmqFrame zframe = receiver.receive_zmqframe(); + if (zframe.header.data == 0) + continue; + logger::info("Received frame, frame_number=", zframe.header.frameNumber); + logger::info(zframe.header.to_string()); + + // 3. create task + Task *task = Task::init(zframe.frame.data(), zframe.frame.size()); + task->opcode = (size_t)Task::Operation::PEDESTAL; + task->id = zframe.header.frameNumber; + + // 4. push task to ventilator + ventilator.push(task); + Task::destroy(task); + } +} +int main(int argc, char **argv) { + // read the command line arguments + string endpoint = setup(argc, argv); + int ret = process(endpoint); + return ret; +} \ No newline at end of file diff --git a/examples/zmq_worker.cpp b/examples/zmq_worker.cpp new file mode 100644 index 0000000..685c8e6 --- /dev/null +++ b/examples/zmq_worker.cpp @@ -0,0 +1,35 @@ + +#include "aare/network_io/ZmqSink.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqVentilator.hpp" +#include "aare/network_io/ZmqWorker.hpp" + +#include "zmq.h" +#include +#include +#include +#include +using namespace aare; +namespace po = boost::program_options; +using namespace std; + +int main() { + logger::set_verbosity(logger::DEBUG); + // 1. connect to ventilator and sink + ZmqWorker worker("tcp://127.0.0.1:4321", "tcp://127.0.0.1:4322"); + + while (true) { + // 2. receive Task from ventilator + Task *ventilator_task = worker.pull(); + logger::info("Received Task, id=", ventilator_task->id, " data_size=", ventilator_task->data_size); + + Task *sink_task = Task::init(nullptr, 0); + sink_task->id = ventilator_task->id; + sink_task->opcode = (size_t)Task::Operation::COUNT; + worker.push(sink_task); + + Task::destroy(sink_task); + Task::destroy(ventilator_task); + } + // read the command line arguments +} \ No newline at end of file diff --git a/file_io/src/RawFile.cpp b/file_io/src/RawFile.cpp index 8c09da8..a0876e7 100644 --- a/file_io/src/RawFile.cpp +++ b/file_io/src/RawFile.cpp @@ -77,7 +77,7 @@ void RawFile::write_master_file() { aare::write_str(ss, "Geometry", geometry.to_string()); ss += "\n\t"; - uint64_t img_size = (m_cols * m_rows) / (geometry.col * geometry.row); + uint64_t img_size = (m_cols * m_rows) / (static_cast(geometry.col * geometry.row)); img_size *= m_bitdepth; aare::write_digit(ss, "Image Size in bytes", img_size); ss += "\n\t"; @@ -85,7 +85,8 @@ void RawFile::write_master_file() { ss += "\n\t"; aare::write_digit(ss, "Dynamic Range", m_bitdepth); ss += "\n\t"; - const aare::xy pixels = {m_rows / geometry.row, m_cols / geometry.col}; + const aare::xy pixels = {static_cast(m_rows / geometry.row), + static_cast(m_cols / geometry.col)}; aare::write_str(ss, "Pixels", pixels.to_string()); ss += "\n\t"; aare::write_digit(ss, "Number of rows", m_rows); @@ -266,10 +267,8 @@ void RawFile::parse_raw_metadata() { max_frames_per_file = std::stoi(value); } else if (key == "Geometry") { pos = value.find(','); - const size_t x = static_cast(std::stoi(value.substr(1, pos))); - const size_t y = static_cast(std::stoi(value.substr(pos + 1))); - - geometry = {x, y}; + geometry = {static_cast(std::stoi(value.substr(1, pos))), + static_cast(std::stoi(value.substr(pos + 1)))}; } } } diff --git a/include/aare/network_io.hpp b/include/aare/network_io.hpp index fd559f0..be1166a 100644 --- a/include/aare/network_io.hpp +++ b/include/aare/network_io.hpp @@ -1,5 +1,8 @@ #include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSink.hpp" #include "aare/network_io/ZmqSocket.hpp" #include "aare/network_io/ZmqSocketReceiver.hpp" #include "aare/network_io/ZmqSocketSender.hpp" +#include "aare/network_io/ZmqVentilator.hpp" +#include "aare/network_io/ZmqWorker.hpp" #include "aare/network_io/defs.hpp" \ No newline at end of file diff --git a/network_io/CMakeLists.txt b/network_io/CMakeLists.txt index 7d544dd..12551a1 100644 --- a/network_io/CMakeLists.txt +++ b/network_io/CMakeLists.txt @@ -17,6 +17,10 @@ add_library(network_io STATIC src/ZmqSocketSender.cpp src/ZmqSocket.cpp src/ZmqHeader.cpp + src/ZmqMultiReceiver.cpp + src/ZmqVentilator.cpp + src/ZmqWorker.cpp + src/ZmqSink.cpp ) diff --git a/network_io/include/aare/network_io/ZmqHeader.hpp b/network_io/include/aare/network_io/ZmqHeader.hpp index c0e86ec..fb787fd 100644 --- a/network_io/include/aare/network_io/ZmqHeader.hpp +++ b/network_io/include/aare/network_io/ZmqHeader.hpp @@ -8,29 +8,28 @@ #include #include namespace simdjson { -/** - * @brief cast a simdjson::ondemand::value to a std::array - * useful for writing rx_roi from json header - */ -template <> simdjson_inline simdjson::simdjson_result> simdjson::ondemand::value::get() noexcept { - ondemand::array array; - auto error = get_array().get(array); - if (error) { - return error; - } - std::array arr{}; - int i = 0; - for (auto v : array) { - int64_t val = 0; - error = v.get_int64().get(val); +// template std::array - if (error) { - return error; - } - arr[i++] = static_cast(val); - } - return arr; -} +// template simdjson_inline simdjson::simdjson_result> simdjson::ondemand::value::get() +// noexcept { +// ondemand::array array; +// auto error = get_array().get(array); +// if (error) { +// return error; +// } +// std::array arr{}; +// int i = 0; +// for (auto v : array) { +// int64_t val = 0; +// error = v.get_int64().get(val); + +// if (error) { +// return error; +// } +// arr[i++] = static_cast(val); +// } +// return arr; +// } /** * @brief cast a simdjson::ondemand::value to a uint32_t @@ -81,22 +80,17 @@ simdjson::ondemand::value::get() noexcept { } // namespace simdjson namespace aare { - /** zmq header structure (from slsDetectorPackage)*/ struct ZmqHeader { /** true if incoming data, false if end of acquisition */ bool data{true}; uint32_t jsonversion{0}; - uint32_t dynamicRange{0}; + uint32_t bitmode{0}; uint64_t fileIndex{0}; - /** number of detectors/port in x axis */ - uint32_t ndetx{0}; - /** number of detectors/port in y axis */ - uint32_t ndety{0}; - /** number of pixels/channels in x axis for this zmq socket */ - uint32_t npixelsx{0}; - /** number of pixels/channels in y axis for this zmq socket */ - uint32_t npixelsy{0}; + /** number of detectors/port*/ + t_xy detshape{0, 0}; + /** number of pixels/channels for this zmq socket */ + t_xy shape{0, 0}; /** number of bytes for an image in this socket */ uint32_t size{0}; /** frame number from detector */ @@ -138,5 +132,25 @@ struct ZmqHeader { // compare operator bool operator==(const ZmqHeader &other) const; }; +/** + * @brief cast a simdjson::ondemand::value to a std::array + * useful for writing rx_roi from json header + */ +template std::array simd_convert_array(SIMDJSON_VALUE field) { + simdjson::ondemand::array simd_array; + auto err = field.value().get_array().get(simd_array); + if (err) + throw std::runtime_error("error converting simdjson::ondemand::value to simdjson::ondemend::array"); + std::array arr{}; + int i = 0; + for (auto v : simd_array) { + int64_t tmp; + err = v.get(tmp); + if (err) + throw std::runtime_error("error converting simdjson::ondemand::value"); + arr[i++] = tmp; + } + return arr; +} } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/ZmqMultiReceiver.hpp b/network_io/include/aare/network_io/ZmqMultiReceiver.hpp new file mode 100644 index 0000000..cd90bac --- /dev/null +++ b/network_io/include/aare/network_io/ZmqMultiReceiver.hpp @@ -0,0 +1,27 @@ +#pragma once +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/defs.hpp" +#include + +#include +#include +using zmq_pollitem_t = struct zmq_pollitem_t; +namespace aare { + +class ZmqMultiReceiver { + public: + explicit ZmqMultiReceiver(const std::vector &endpoints, const xy &geometry = {1, 1}); + int connect(); + ZmqFrame receive_zmqframe(); + std::vector receive_n(); + ~ZmqMultiReceiver(); + + private: + ZmqFrame receive_zmqframe_(std::unordered_map> &frames_map); + xy m_geometry; + std::vector m_endpoints; + std::vector m_receivers; + zmq_pollitem_t *items{}; +}; + +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/ZmqSink.hpp b/network_io/include/aare/network_io/ZmqSink.hpp new file mode 100644 index 0000000..f762e7e --- /dev/null +++ b/network_io/include/aare/network_io/ZmqSink.hpp @@ -0,0 +1,17 @@ +#pragma once + +#include "aare/network_io/ZmqSocketReceiver.hpp" + +namespace aare { + +class ZmqSink { + public: + explicit ZmqSink(const std::string &sink_endpoint); + Task *pull(); + ~ZmqSink(); + + private: + ZmqSocketReceiver *m_receiver; +}; + +} // namespace aare diff --git a/network_io/include/aare/network_io/ZmqSocket.hpp b/network_io/include/aare/network_io/ZmqSocket.hpp index af52ffb..f10bfaf 100644 --- a/network_io/include/aare/network_io/ZmqSocket.hpp +++ b/network_io/include/aare/network_io/ZmqSocket.hpp @@ -25,10 +25,12 @@ class ZmqSocket { void set_zmq_hwm(int hwm); void set_timeout_ms(int n); void set_potential_frame_size(size_t size); + void *get_socket(); protected: void *m_context{nullptr}; void *m_socket{nullptr}; + int m_socket_type{}; std::string m_endpoint; int m_zmq_hwm{1000}; int m_timeout_ms{1000}; diff --git a/network_io/include/aare/network_io/ZmqSocketReceiver.hpp b/network_io/include/aare/network_io/ZmqSocketReceiver.hpp index 3166f23..e708acb 100644 --- a/network_io/include/aare/network_io/ZmqSocketReceiver.hpp +++ b/network_io/include/aare/network_io/ZmqSocketReceiver.hpp @@ -19,14 +19,14 @@ namespace aare { */ class ZmqSocketReceiver : public ZmqSocket { public: - explicit ZmqSocketReceiver(const std::string &endpoint); + explicit ZmqSocketReceiver(const std::string &endpoint, int socket_type = 2 /* ZMQ_SUB */); void connect(); + void bind(); std::vector receive_n(); - private: - int receive_data(std::byte *data, size_t size); ZmqFrame receive_zmqframe(); ZmqHeader receive_header(); + int receive_data(std::byte *data, size_t size); }; } // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/ZmqSocketSender.hpp b/network_io/include/aare/network_io/ZmqSocketSender.hpp index ab37536..ad1dd28 100644 --- a/network_io/include/aare/network_io/ZmqSocketSender.hpp +++ b/network_io/include/aare/network_io/ZmqSocketSender.hpp @@ -12,9 +12,11 @@ namespace aare { */ class ZmqSocketSender : public ZmqSocket { public: - explicit ZmqSocketSender(const std::string &endpoint); + explicit ZmqSocketSender(const std::string &endpoint, int socket_type = 1 /* ZMQ_PUB */); + void connect(); void bind(); - size_t send(const ZmqHeader &header, const std::byte *data, size_t size); + size_t send(const void *data, size_t size); + size_t send(const ZmqHeader &header, const void *data, size_t size); size_t send(const ZmqFrame &zmq_frame); size_t send(const std::vector &zmq_frames); }; diff --git a/network_io/include/aare/network_io/ZmqVentilator.hpp b/network_io/include/aare/network_io/ZmqVentilator.hpp new file mode 100644 index 0000000..f82bfe2 --- /dev/null +++ b/network_io/include/aare/network_io/ZmqVentilator.hpp @@ -0,0 +1,20 @@ + +#pragma once +#include "aare/core/Frame.hpp" +#include "aare/network_io/ZmqHeader.hpp" +#include "aare/network_io/ZmqSocket.hpp" +#include "aare/network_io/ZmqSocketSender.hpp" +#include "aare/network_io/defs.hpp" + +namespace aare { + +class ZmqVentilator { + public: + explicit ZmqVentilator(const std::string &endpoint); + size_t push(const Task *task); + ~ZmqVentilator(); + + private: + ZmqSocketSender *m_sender; +}; +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/ZmqWorker.hpp b/network_io/include/aare/network_io/ZmqWorker.hpp new file mode 100644 index 0000000..5717c12 --- /dev/null +++ b/network_io/include/aare/network_io/ZmqWorker.hpp @@ -0,0 +1,18 @@ +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqSocketSender.hpp" + +namespace aare { + +class ZmqWorker { + public: + explicit ZmqWorker(const std::string &ventilator_endpoint, const std::string &sink_endpoint = ""); + Task *pull(); + size_t push(const Task *task); + ~ZmqWorker(); + + private: + ZmqSocketReceiver *m_receiver; + ZmqSocketSender *m_sender; +}; + +} // namespace aare \ No newline at end of file diff --git a/network_io/include/aare/network_io/defs.hpp b/network_io/include/aare/network_io/defs.hpp index c1e4d9d..c7efa30 100644 --- a/network_io/include/aare/network_io/defs.hpp +++ b/network_io/include/aare/network_io/defs.hpp @@ -14,8 +14,52 @@ namespace aare { struct ZmqFrame { ZmqHeader header; Frame frame; + std::string to_string() const { + return "ZmqFrame{header: " + header.to_string() + ", frame:\nrows: " + std::to_string(frame.rows()) + + ", cols: " + std::to_string(frame.cols()) + ", bitdepth: " + std::to_string(frame.bitdepth()) + "\n}"; + } + size_t size() const { return frame.size() + header.size; } }; +struct Task { + + size_t id{}; + int opcode{}; // operation to perform on the data (what type should this be? char*? enum?) + size_t data_size{}; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" + std::byte payload[]; +#pragma GCC diagnostic pop + + static const size_t MAX_DATA_SIZE = 1024 * 1024; // 1MB + size_t size() const { return sizeof(Task) + data_size; } + static Task *init(std::byte *data, size_t data_size) { + Task *task = (Task *)new std::byte[sizeof(Task) + data_size]; + task->data_size = data_size; + if (data_size > 0) + memcpy(task->payload, data, data_size); + return task; + } + static int destroy(Task *task) { + delete[] task; + return 0; + } + + Task() = delete; + Task(Task &) = delete; + Task(Task &&) = default; + + // common operations to perform + // users can still send custom operations + enum class Operation { + PEDESTAL, + PEDESTAL_AND_SAVE, + PEDESTAL_AND_CLUSTER, + PEDESTAL_AND_CLUSTER_AND_SAVE, + COUNT, + }; +} __attribute__((packed)); + namespace network_io { /** * @brief NetworkError exception class diff --git a/network_io/src/ZmqHeader.cpp b/network_io/src/ZmqHeader.cpp index 8d8c889..015bb8c 100644 --- a/network_io/src/ZmqHeader.cpp +++ b/network_io/src/ZmqHeader.cpp @@ -12,12 +12,10 @@ std::string ZmqHeader::to_string() const { s += "{"; write_digit(s, "data", data ? 1 : 0); write_digit(s, "jsonversion", jsonversion); - write_digit(s, "dynamicRange", dynamicRange); + write_digit(s, "bitmode", bitmode); write_digit(s, "fileIndex", fileIndex); - write_digit(s, "ndetx", ndetx); - write_digit(s, "ndety", ndety); - write_digit(s, "npixelsx", npixelsx); - write_digit(s, "npixelsy", npixelsy); + write_array(s, "detshape", std::array{detshape.row, detshape.col}); + write_array(s, "shape", std::array{shape.row, shape.col}); write_digit(s, "size", size); write_digit(s, "acqIndex", acqIndex); write_digit(s, "frameIndex", frameIndex); @@ -40,7 +38,7 @@ std::string ZmqHeader::to_string() const { write_digit(s, "quad", quad); write_digit(s, "completeImage", completeImage ? 1 : 0); write_map(s, "addJsonHeader", addJsonHeader); - write_array(s, "rx_roi", rx_roi); + write_array(s, "rx_roi", rx_roi); // remove last comma s.pop_back(); s.pop_back(); @@ -63,18 +61,18 @@ void ZmqHeader::from_string(std::string &s) { // NOLINT data = static_cast(field.value()) != 0; } else if (key == "jsonversion") { jsonversion = static_cast(field.value()); - } else if (key == "dynamicRange") { - dynamicRange = static_cast(field.value()); + } else if (key == "bitmode") { + bitmode = static_cast(field.value()); } else if (key == "fileIndex") { fileIndex = static_cast(field.value()); - } else if (key == "ndetx") { - ndetx = static_cast(field.value()); - } else if (key == "ndety") { - ndety = static_cast(field.value()); - } else if (key == "npixelsx") { - npixelsx = static_cast(field.value()); - } else if (key == "npixelsy") { - npixelsy = static_cast(field.value()); + } else if (key == "detshape") { + std::array arr = simd_convert_array(field); + detshape.row = arr[0]; + detshape.col = arr[1]; + } else if (key == "shape") { + std::array arr = simd_convert_array(field); + shape.row = arr[0]; + shape.col = arr[1]; } else if (key == "size") { size = static_cast(field.value()); } else if (key == "acqIndex") { @@ -121,21 +119,20 @@ void ZmqHeader::from_string(std::string &s) { // NOLINT } else if (key == "addJsonHeader") { addJsonHeader = static_cast>(field.value()); } else if (key == "rx_roi") { - rx_roi = static_cast>(field.value()); + rx_roi = simd_convert_array(field); } } } bool ZmqHeader::operator==(const ZmqHeader &other) const { - return data == other.data && jsonversion == other.jsonversion && dynamicRange == other.dynamicRange && - fileIndex == other.fileIndex && ndetx == other.ndetx && ndety == other.ndety && npixelsx == other.npixelsx && - npixelsy == other.npixelsy && size == other.size && acqIndex == other.acqIndex && - frameIndex == other.frameIndex && progress == other.progress && fname == other.fname && - frameNumber == other.frameNumber && expLength == other.expLength && packetNumber == other.packetNumber && - detSpec1 == other.detSpec1 && timestamp == other.timestamp && modId == other.modId && row == other.row && - column == other.column && detSpec2 == other.detSpec2 && detSpec3 == other.detSpec3 && - detSpec4 == other.detSpec4 && detType == other.detType && version == other.version && - flipRows == other.flipRows && quad == other.quad && completeImage == other.completeImage && - addJsonHeader == other.addJsonHeader && rx_roi == other.rx_roi; + return data == other.data && jsonversion == other.jsonversion && bitmode == other.bitmode && + fileIndex == other.fileIndex && detshape == other.detshape && shape == other.shape && size == other.size && + acqIndex == other.acqIndex && frameIndex == other.frameIndex && progress == other.progress && + fname == other.fname && frameNumber == other.frameNumber && expLength == other.expLength && + packetNumber == other.packetNumber && detSpec1 == other.detSpec1 && timestamp == other.timestamp && + modId == other.modId && row == other.row && column == other.column && detSpec2 == other.detSpec2 && + detSpec3 == other.detSpec3 && detSpec4 == other.detSpec4 && detType == other.detType && + version == other.version && flipRows == other.flipRows && quad == other.quad && + completeImage == other.completeImage && addJsonHeader == other.addJsonHeader && rx_roi == other.rx_roi; } } // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqMultiReceiver.cpp b/network_io/src/ZmqMultiReceiver.cpp new file mode 100644 index 0000000..5910d76 --- /dev/null +++ b/network_io/src/ZmqMultiReceiver.cpp @@ -0,0 +1,101 @@ +#include "aare/network_io/ZmqMultiReceiver.hpp" +#include "aare/utils/merge_frames.hpp" +#include +#include +namespace aare { +ZmqMultiReceiver::ZmqMultiReceiver(const std::vector &endpoints, const xy &geometry) + : m_geometry(geometry), m_endpoints(endpoints) { + assert(m_geometry.row * m_geometry.col == static_cast(m_endpoints.size())); + for (const auto &endpoint : m_endpoints) { + m_receivers.push_back(new ZmqSocketReceiver(endpoint)); + } +} + +int ZmqMultiReceiver::connect() { + for (auto *receiver : m_receivers) { + receiver->connect(); + } + items = new zmq_pollitem_t[m_receivers.size()]; + for (size_t i = 0; i < m_receivers.size(); i++) { + items[i] = {m_receivers[i]->get_socket(), 0, ZMQ_POLLIN, 0}; + } + return 0; +} +ZmqFrame ZmqMultiReceiver::receive_zmqframe() { + std::unordered_map> frames_map; + return receive_zmqframe_(frames_map); +} +std::vector ZmqMultiReceiver::receive_n() { + std::vector frames; + std::unordered_map> frames_map; + while (true) { + // receive header and frame + ZmqFrame const zmq_frame = receive_zmqframe_(frames_map); + if (!zmq_frame.header.data) { + break; + } + frames.push_back(zmq_frame); + frames_map.erase(zmq_frame.header.frameNumber); + } + return frames; +} +ZmqFrame ZmqMultiReceiver::receive_zmqframe_(std::unordered_map> &frames_map) { + // iterator to store the frame to return + std::unordered_map>::iterator ret_frames; + bool exit_loop = false; + + while (true) { + zmq_poll(items, static_cast(m_receivers.size()), -1); + aare::logger::debug("Received frame"); + for (size_t i = 0; i < m_receivers.size() && !exit_loop; i++) { + if (items[i].revents & ZMQ_POLLIN) { + auto new_frame = m_receivers[i]->receive_zmqframe(); + if (frames_map.find(new_frame.header.frameNumber) == frames_map.end()) { + frames_map[new_frame.header.frameNumber] = {}; + } + + ret_frames = frames_map.find(new_frame.header.frameNumber); + ret_frames->second.push_back(new_frame); + + exit_loop = ret_frames->second.size() == m_receivers.size(); + } + } + if (exit_loop) { + break; + } + } + std::vector &frames = ret_frames->second; + if (!frames[0].header.data) { + return ZmqFrame{frames[0].header, Frame(0, 0, 0)}; + } + // check that all frames have the same shape + auto shape = frames[0].header.shape; + auto bitdepth = frames[0].header.bitmode; + auto part_size = shape.row * shape.col * (bitdepth / 8); + for (auto &frame : frames) { + assert(shape == frame.header.shape); + assert(bitdepth == frame.header.bitmode); + // TODO: find solution for numinterfaces=2 + assert(m_geometry == frame.header.detshape); + assert(part_size == frame.header.size); + } + // merge frames + // prepare the input for merge_frames + std::vector part_buffers; + part_buffers.reserve(frames.size()); + for (auto &zmq_frame : frames) { + part_buffers.push_back(zmq_frame.frame.data()); + } + Frame const f(shape.row, shape.col, bitdepth); + merge_frames(part_buffers, part_size, f.data(), m_geometry, shape.row, shape.col, bitdepth); + ZmqFrame zmq_frame = {std::move(frames[0].header), f}; + return zmq_frame; +} + +ZmqMultiReceiver::~ZmqMultiReceiver() { + delete[] items; + for (auto *receiver : m_receivers) { + delete receiver; + } +} +} // namespace aare diff --git a/network_io/src/ZmqSink.cpp b/network_io/src/ZmqSink.cpp new file mode 100644 index 0000000..fc8fd26 --- /dev/null +++ b/network_io/src/ZmqSink.cpp @@ -0,0 +1,19 @@ + +#include "aare/network_io/ZmqSink.hpp" +#include "zmq.h" + +namespace aare { + +ZmqSink::ZmqSink(const std::string &sink_endpoint) : m_receiver(new ZmqSocketReceiver(sink_endpoint, ZMQ_PULL)) { + m_receiver->bind(); +}; + +Task *ZmqSink::pull() { + Task *task = reinterpret_cast(new std::byte[Task::MAX_DATA_SIZE + sizeof(Task)]); + m_receiver->receive_data(reinterpret_cast(task), Task::MAX_DATA_SIZE); + return task; +}; + +ZmqSink::~ZmqSink() { delete m_receiver; }; + +} // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqSocket.cpp b/network_io/src/ZmqSocket.cpp index e3d6b3f..c78f8ec 100644 --- a/network_io/src/ZmqSocket.cpp +++ b/network_io/src/ZmqSocket.cpp @@ -25,6 +25,12 @@ ZmqSocket::~ZmqSocket() { delete[] m_header_buffer; } +/** + * @brief get the socket + * @return void* + */ +void *ZmqSocket::get_socket() { return m_socket; } + void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; } void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; } diff --git a/network_io/src/ZmqSocketReceiver.cpp b/network_io/src/ZmqSocketReceiver.cpp index c824016..b9ed4fb 100644 --- a/network_io/src/ZmqSocketReceiver.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -9,8 +9,9 @@ namespace aare { /** * @brief Construct a new ZmqSocketReceiver object */ -ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) { - m_endpoint = endpoint; +ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint, int socket_type) { + m_endpoint = (endpoint); + m_socket_type = (socket_type); memset(m_header_buffer, 0, m_max_header_size); } @@ -20,22 +21,33 @@ ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) { */ void ZmqSocketReceiver::connect() { m_context = zmq_ctx_new(); - m_socket = zmq_socket(m_context, ZMQ_SUB); + m_socket = zmq_socket(m_context, m_socket_type); fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm); int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); // should be set before connect if (rc) throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVHWM: {}", zmq_strerror(errno))); - size_t bufsize = m_potential_frame_size * m_zmq_hwm; + int bufsize = static_cast(m_potential_frame_size) * m_zmq_hwm; fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (static_cast(1024) * 1024)); rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize)); - if (rc) + if (rc) { + perror("zmq_setsockopt"); throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVBUF: {}", zmq_strerror(errno))); - + } zmq_connect(m_socket, m_endpoint.c_str()); zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); } +void ZmqSocketReceiver::bind() { + m_context = zmq_ctx_new(); + m_socket = zmq_socket(m_context, m_socket_type); + size_t const rc = zmq_bind(m_socket, m_endpoint.c_str()); + if (rc != 0) { + std::string const error = zmq_strerror(zmq_errno()); + throw network_io::NetworkError("zmq_bind failed: " + error); + } +} + /** * @brief receive a ZmqHeader * @return ZmqHeader @@ -43,15 +55,13 @@ void ZmqSocketReceiver::connect() { ZmqHeader ZmqSocketReceiver::receive_header() { // receive string ZmqHeader - aare::logger::debug("Receiving header"); int const header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); - aare::logger::debug("Bytes: ", header_bytes_received); + aare::logger::debug("Header: ", m_header_buffer); m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate if (header_bytes_received < 0) { throw network_io::NetworkError(LOCATION + "Error receiving header"); } - aare::logger::debug("Bytes: ", header_bytes_received, ", Header: ", m_header_buffer); // parse header ZmqHeader header; @@ -72,9 +82,12 @@ ZmqHeader ZmqSocketReceiver::receive_header() { */ int ZmqSocketReceiver::receive_data(std::byte *data, size_t size) { int const data_bytes_received = zmq_recv(m_socket, data, size, 0); - if (data_bytes_received == -1) - throw network_io::NetworkError("Got half of a multipart msg!!!"); - aare::logger::debug("Bytes: ", data_bytes_received); + if (data_bytes_received == -1) { + logger::error(zmq_strerror(zmq_errno())); + // TODO: refactor this error message + throw network_io::NetworkError(LOCATION + "Error receiving data"); + } + // aare::logger::debug("Bytes: ", data_bytes_received); return data_bytes_received; } @@ -93,7 +106,13 @@ ZmqFrame ZmqSocketReceiver::receive_zmqframe() { } // receive frame data - Frame frame(header.npixelsx, header.npixelsy, header.dynamicRange); + if (header.shape == t_xy{0, 0} || header.bitmode == 0) { + logger::warn("Invalid header"); + } + if (header.bitmode == 0) { + header.bitmode = 16; + } + Frame frame(header.shape.row, header.shape.col, header.bitmode); int bytes_received = receive_data(frame.data(), frame.size()); if (bytes_received == -1) { throw network_io::NetworkError(LOCATION + "Error receiving frame"); diff --git a/network_io/src/ZmqSocketSender.cpp b/network_io/src/ZmqSocketSender.cpp index dbe47e5..7087b35 100644 --- a/network_io/src/ZmqSocketSender.cpp +++ b/network_io/src/ZmqSocketSender.cpp @@ -8,14 +8,17 @@ namespace aare { * Constructor * @param endpoint ZMQ endpoint */ -ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = endpoint; } +ZmqSocketSender::ZmqSocketSender(const std::string &endpoint, int socket_type) { + m_socket_type = socket_type; + m_endpoint = endpoint; +} /** * bind to the given port */ void ZmqSocketSender::bind() { m_context = zmq_ctx_new(); - m_socket = zmq_socket(m_context, ZMQ_PUB); + m_socket = zmq_socket(m_context, m_socket_type); size_t const rc = zmq_bind(m_socket, m_endpoint.c_str()); if (rc != 0) { std::string const error = zmq_strerror(zmq_errno()); @@ -23,6 +26,31 @@ void ZmqSocketSender::bind() { } } +void ZmqSocketSender::connect() { + m_context = zmq_ctx_new(); + m_socket = zmq_socket(m_context, m_socket_type); + fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm); + int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); // should be set before connect + if (rc) + throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVHWM: {}", zmq_strerror(errno))); + + int bufsize = static_cast(m_potential_frame_size) * m_zmq_hwm; + fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (static_cast(1024) * 1024)); + rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize)); + if (rc) { + perror("zmq_setsockopt"); + throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVBUF: {}", zmq_strerror(errno))); + } + zmq_connect(m_socket, m_endpoint.c_str()); + zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); +} + +size_t ZmqSocketSender::send(const void *data, size_t size) { + size_t const rc2 = zmq_send(m_socket, data, size, 0); + assert(rc2 == size); + return rc2; +} + /** * send a header and data * @param header @@ -30,7 +58,7 @@ void ZmqSocketSender::bind() { * @param size size of data * @return number of bytes sent */ -size_t ZmqSocketSender::send(const ZmqHeader &header, const std::byte *data, size_t size) { +size_t ZmqSocketSender::send(const ZmqHeader &header, const void *data, size_t size) { size_t rc = 0; // if (serialize_header) { // rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE); diff --git a/network_io/src/ZmqVentilator.cpp b/network_io/src/ZmqVentilator.cpp new file mode 100644 index 0000000..5aac1d1 --- /dev/null +++ b/network_io/src/ZmqVentilator.cpp @@ -0,0 +1,21 @@ +#include "aare/network_io/ZmqVentilator.hpp" +#include +#include + +namespace aare { + +ZmqVentilator::ZmqVentilator(const std::string &endpoint) : m_sender(new ZmqSocketSender(endpoint, ZMQ_PUSH)) { + m_sender->bind(); +} + +size_t ZmqVentilator::push(const Task *task) { + if (task->data_size > Task::MAX_DATA_SIZE) { + throw network_io::NetworkError("Data size exceeds maximum allowed size"); + } + logger::debug("Pushing workers"); + return m_sender->send(task, task->size()); +} + +ZmqVentilator::~ZmqVentilator() { delete m_sender; } + +} // namespace aare \ No newline at end of file diff --git a/network_io/src/ZmqWorker.cpp b/network_io/src/ZmqWorker.cpp new file mode 100644 index 0000000..a5b9a1d --- /dev/null +++ b/network_io/src/ZmqWorker.cpp @@ -0,0 +1,39 @@ +#include "aare/network_io/ZmqWorker.hpp" +#include "aare/network_io/ZmqSocketReceiver.hpp" +#include "aare/network_io/ZmqSocketSender.hpp" +#include "aare/network_io/defs.hpp" +#include "zmq.h" +namespace aare { + +ZmqWorker::ZmqWorker(const std::string &ventilator_endpoint, const std::string &sink_endpoint) + : m_receiver(new ZmqSocketReceiver(ventilator_endpoint, ZMQ_PULL)), m_sender(nullptr) { + m_receiver->connect(); + if (not sink_endpoint.empty()) { + m_sender = new ZmqSocketSender(sink_endpoint, ZMQ_PUSH); + m_sender->connect(); + } +} + +Task *ZmqWorker::pull() { + Task *task = reinterpret_cast(new std::byte[Task::MAX_DATA_SIZE + sizeof(Task)]); + m_receiver->receive_data(reinterpret_cast(task), Task::MAX_DATA_SIZE); + logger::debug("Received task", task->id, task->data_size); + return task; +} + +size_t ZmqWorker::push(const Task *task) { + if (m_sender == nullptr) { + throw network_io::NetworkError("Worker not connected to sink: did you provide a sink endpoint?"); + } + if (task->data_size > Task::MAX_DATA_SIZE) { + throw network_io::NetworkError("Data size exceeds maximum allowed size"); + } + return m_sender->send(task, task->size()); +} + +ZmqWorker::~ZmqWorker() { + delete m_receiver; + delete m_sender; +} + +} // namespace aare \ No newline at end of file diff --git a/network_io/test/ZmqHeader.test.cpp b/network_io/test/ZmqHeader.test.cpp index de5b8b9..a2a5deb 100644 --- a/network_io/test/ZmqHeader.test.cpp +++ b/network_io/test/ZmqHeader.test.cpp @@ -5,14 +5,12 @@ using namespace aare; TEST_CASE("Test ZmqHeader") { ZmqHeader header; - header.npixelsx = 10; - header.npixelsy = 15; + header.shape = {10, 15}; header.data = 1; - header.jsonversion = 2; - header.dynamicRange = 32; + header.jsonversion = 5; + header.bitmode = 32; header.fileIndex = 4; - header.ndetx = 5; - header.ndety = 6; + header.detshape = {5, 6}; header.size = 4800; header.acqIndex = 8; header.frameIndex = 9; @@ -39,13 +37,11 @@ TEST_CASE("Test ZmqHeader") { std::string json_header = "{" "\"data\": 1, " - "\"jsonversion\": 2, " - "\"dynamicRange\": 32, " + "\"jsonversion\": 5, " + "\"bitmode\": 32, " "\"fileIndex\": 4, " - "\"ndetx\": 5, " - "\"ndety\": 6, " - "\"npixelsx\": 10, " - "\"npixelsy\": 15, " + "\"detshape\": [5, 6], " + "\"shape\": [10, 15], " "\"size\": 4800, " "\"acqIndex\": 8, " "\"frameIndex\": 9, " diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt index 401f0a8..83d84d9 100644 --- a/utils/CMakeLists.txt +++ b/utils/CMakeLists.txt @@ -1,5 +1,14 @@ add_library(utils STATIC src/logger.cpp) target_include_directories(utils PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) +target_link_libraries(utils PUBLIC core) + +if(AARE_TESTS) + set(TestSources + ${CMAKE_CURRENT_SOURCE_DIR}/test/merge_frames.test.cpp + ) + target_sources(tests PRIVATE ${TestSources} ) + target_link_libraries(tests PRIVATE utils) +endif() \ No newline at end of file diff --git a/utils/include/aare/utils/json.hpp b/utils/include/aare/utils/json.hpp index ab679e5..9d2943b 100644 --- a/utils/include/aare/utils/json.hpp +++ b/utils/include/aare/utils/json.hpp @@ -50,17 +50,18 @@ inline void write_map(std::string &s, const std::string &key, const std::map &value) { + +template void write_array(std::string &s, const std::string &key, const std::array &value) { s += "\""; s += key; s += "\": ["; - s += std::to_string(value[0]); - s += ", "; - s += std::to_string(value[1]); - s += ", "; - s += std::to_string(value[2]); - s += ", "; - s += std::to_string(value[3]); + + for (size_t i = 0; i < N - 1; i++) { + s += std::to_string(value[i]); + s += ", "; + } + s += std::to_string(value[N - 1]); + s += "], "; } diff --git a/utils/include/aare/utils/merge_frames.hpp b/utils/include/aare/utils/merge_frames.hpp new file mode 100644 index 0000000..643adb8 --- /dev/null +++ b/utils/include/aare/utils/merge_frames.hpp @@ -0,0 +1,41 @@ +#pragma once +#include "aare/core/Frame.hpp" +#include "aare/core/defs.hpp" +#include +#include +#include + +namespace aare { +void merge_frames(std::vector &part_buffers, size_t part_size, std::byte *merged_frame, const xy &geometry, + size_t rows = 0, size_t cols = 0, size_t bitdepth = 0) { + + assert(part_buffers.size() == geometry.row * geometry.col); + + if (geometry.col == 1) { + // get the part from each subfile and copy it to the frame + size_t part_idx = 0; + for (auto part_buffer : part_buffers) { + memcpy(merged_frame + part_idx * part_size, part_buffer, part_size); + part_idx++; + } + + } else { + std::cout << "cols: " << cols << " rows: " << rows << " bitdepth: " << bitdepth << std::endl; + assert(cols != 0 && rows != 0 && bitdepth != 0); + size_t part_rows = rows / geometry.row; + size_t part_cols = cols / geometry.col; + // create a buffer that will hold a the frame part + size_t part_idx = 0; + for (auto part_buffer : part_buffers) { + for (size_t cur_row = 0; cur_row < (part_rows); cur_row++) { + auto irow = cur_row + (part_idx / geometry.col) * part_rows; + auto icol = (part_idx % geometry.col) * part_cols; + auto dest = (irow * cols + icol); + dest = dest * bitdepth / 8; + memcpy(merged_frame + dest, part_buffer + cur_row * part_cols * bitdepth / 8, part_cols * bitdepth / 8); + } + part_idx++; + } + } +} +} // namespace aare \ No newline at end of file diff --git a/utils/test/merge_frames.test.cpp b/utils/test/merge_frames.test.cpp new file mode 100644 index 0000000..c692b40 --- /dev/null +++ b/utils/test/merge_frames.test.cpp @@ -0,0 +1,38 @@ +#include "aare/utils/merge_frames.hpp" +#include + +using namespace aare; +// void merge_frames(std::vector &part_buffers, size_t part_size, std::byte *merged_frame, const xy +// &geometry, +// size_t cols = 0, size_t rows = 0, size_t bitdepth = 0) { +TEST_CASE("merge frames {2,1}") { + xy geo = {2, 1}; + std::vector p1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + std::vector p2 = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19}; + size_t part_size = p1.size() * sizeof(uint32_t); + Frame f(10, 2, 32); + std::vector part_buffers = {reinterpret_cast(p1.data()), + reinterpret_cast(p2.data())}; + merge_frames(part_buffers, part_size, f.data(), geo); + + auto v = f.view(); + for (ssize_t i = 0; i < v.size(); i++) { + REQUIRE(v[i] == i); + } +} +TEST_CASE("merge frames {1,2}") { + xy geo = {1, 2}; + std::vector p1 = {0, 1, 2, 3, 4, 10, 11, 12, 13, 14}; + std::vector p2 = {5, 6, 7, 8, 9, 15, 16, 17, 18, 19}; + size_t part_size = p1.size() * sizeof(uint32_t); + Frame f(2, 10, 32); + std::vector part_buffers = {reinterpret_cast(p1.data()), + reinterpret_cast(p2.data())}; + merge_frames(part_buffers, part_size, f.data(), geo, 2, 10, 32); + + auto v = f.view(); + + for (ssize_t i = 0; i < v.size(); i++) { + REQUIRE(v[i] == i); + } +}