diff --git a/.github/workflows/common-workflow.yml b/.github/workflows/common-workflow.yml index 701fcaa..d0b0811 100644 --- a/.github/workflows/common-workflow.yml +++ b/.github/workflows/common-workflow.yml @@ -45,7 +45,7 @@ jobs: pwd export PROJECT_ROOT_DIR="." ls build/examples/*_example - find build/examples -name "*_example" -not -name "zmq_example" | xargs -I {} -n 1 -t bash -c {} + find build/examples -name "*_example" -not -name "zmq_*" | xargs -I {} -n 1 -t bash -c {} diff --git a/CMakeLists.txt b/CMakeLists.txt index d330d16..14ed946 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -148,11 +148,12 @@ endif() add_subdirectory(core) add_subdirectory(file_io) add_subdirectory(utils) +add_subdirectory(network_io) #Overall target to link to when using the library add_library(aare INTERFACE) -target_link_libraries(aare INTERFACE core file_io utils) +target_link_libraries(aare INTERFACE core file_io utils network_io) target_include_directories(aare INTERFACE $ $ diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 0a08c84..460e0f5 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -4,7 +4,6 @@ set(SourceFiles ${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/DType.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/Frame.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/src/ZmqSocket.cpp ) @@ -12,7 +11,7 @@ add_library(core STATIC ${SourceFiles}) target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils libzmq) +target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils ) if (AARE_PYTHON_BINDINGS) set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2d8bb9c..dfe6150 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,6 +1,6 @@ -set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;zmq_example;") -set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example") +set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;") +set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_receiver_example;zmq_sender_example;") foreach(example ${EXAMPLE_LIST}) add_executable(${example} ${example}.cpp) target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags) diff --git a/examples/zmq_example.cpp b/examples/zmq_receiver_example.cpp similarity index 80% rename from examples/zmq_example.cpp rename to examples/zmq_receiver_example.cpp index 05747e6..08e3c11 100644 --- a/examples/zmq_example.cpp +++ b/examples/zmq_receiver_example.cpp @@ -1,10 +1,10 @@ -#include "aare/ZmqSocket.hpp" +#include "aare/ZmqSocketReceiver.hpp" #include #include int main() { std::string endpoint = "tcp://localhost:5555"; - aare::ZmqSocket socket(endpoint); + aare::ZmqSocketReceiver socket(endpoint); socket.connect(); char *data = new char[1024 * 1024 * 10]; aare::zmqHeader header; diff --git a/examples/zmq_sender_example.cpp b/examples/zmq_sender_example.cpp new file mode 100644 index 0000000..08e3c11 --- /dev/null +++ b/examples/zmq_sender_example.cpp @@ -0,0 +1,16 @@ +#include "aare/ZmqSocketReceiver.hpp" +#include +#include + +int main() { + std::string endpoint = "tcp://localhost:5555"; + aare::ZmqSocketReceiver socket(endpoint); + socket.connect(); + char *data = new char[1024 * 1024 * 10]; + aare::zmqHeader header; + while (true) { + int rc = socket.receive(header, reinterpret_cast(data)); + } + delete[] data; + return 0; +} \ No newline at end of file diff --git a/network_io/CMakeLists.txt b/network_io/CMakeLists.txt new file mode 100644 index 0000000..ab6bcfa --- /dev/null +++ b/network_io/CMakeLists.txt @@ -0,0 +1,15 @@ +add_library(network_io STATIC src/ZmqSocketReceiver.cpp) +target_include_directories(network_io PUBLIC include) +target_link_libraries(network_io PRIVATE libzmq fmt::fmt core utils aare_compiler_flags) + +if(AARE_PYTHON_BINDINGS) +set_property(TARGET file_io PROPERTY POSITION_INDEPENDENT_CODE ON) +endif() + +# if(AARE_TESTS) +# set(TestSources +# ${CMAKE_CURRENT_SOURCE_DIR}/test/NumpyFile.test.cpp +# ) +# target_sources(tests PRIVATE ${TestSources} ) +# target_link_libraries(tests PRIVATE core network_io) +# endif() \ No newline at end of file diff --git a/core/include/aare/ZmqSocket.hpp b/network_io/include/aare/ZmqSocketReceiver.hpp similarity index 89% rename from core/include/aare/ZmqSocket.hpp rename to network_io/include/aare/ZmqSocketReceiver.hpp index 67859df..f4eaf88 100644 --- a/core/include/aare/ZmqSocket.hpp +++ b/network_io/include/aare/ZmqSocketReceiver.hpp @@ -64,7 +64,7 @@ struct zmqHeader { std::array rx_roi{}; }; -class ZmqSocket { +class ZmqSocketReceiver { void *m_context{nullptr}; void *m_socket{nullptr}; std::string m_endpoint; @@ -76,11 +76,11 @@ class ZmqSocket { bool decode_header(zmqHeader &h); public: - ZmqSocket(const std::string &endpoint); - ~ZmqSocket(); - ZmqSocket(const ZmqSocket &) = delete; - ZmqSocket operator=(const ZmqSocket &) = delete; - ZmqSocket(ZmqSocket &&) = delete; + ZmqSocketReceiver(const std::string &endpoint); + ~ZmqSocketReceiver(); + ZmqSocketReceiver(const ZmqSocketReceiver &) = delete; + ZmqSocketReceiver operator=(const ZmqSocketReceiver &) = delete; + ZmqSocketReceiver(ZmqSocketReceiver &&) = delete; void connect(); void disconnect(); diff --git a/core/src/ZmqSocket.cpp b/network_io/src/ZmqSocketReceiver.cpp similarity index 81% rename from core/src/ZmqSocket.cpp rename to network_io/src/ZmqSocketReceiver.cpp index ab4fb2b..8b2592a 100644 --- a/core/src/ZmqSocket.cpp +++ b/network_io/src/ZmqSocketReceiver.cpp @@ -1,14 +1,14 @@ -#include "aare/ZmqSocket.hpp" +#include "aare/ZmqSocketReceiver.hpp" #include #include namespace aare { -ZmqSocket::ZmqSocket(const std::string &endpoint) : m_endpoint(endpoint) { +ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) : m_endpoint(endpoint) { memset(m_header_buffer, 0, m_max_header_size); } -void ZmqSocket::connect() { +void ZmqSocketReceiver::connect() { m_context = zmq_ctx_new(); m_socket = zmq_socket(m_context, ZMQ_SUB); fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm); @@ -26,24 +26,24 @@ void ZmqSocket::connect() { zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); } -void ZmqSocket::disconnect() { +void ZmqSocketReceiver::disconnect() { zmq_close(m_socket); zmq_ctx_destroy(m_context); m_socket = nullptr; m_context = nullptr; } -ZmqSocket::~ZmqSocket() { +ZmqSocketReceiver::~ZmqSocketReceiver() { if (m_socket) disconnect(); delete[] m_header_buffer; } -void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; } +void ZmqSocketReceiver::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; } -void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; } +void ZmqSocketReceiver::set_timeout_ms(int n) { m_timeout_ms = n; } -int ZmqSocket::receive(zmqHeader &header, std::byte *data) { +int ZmqSocketReceiver::receive(zmqHeader &header, std::byte *data) { // receive header int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); @@ -74,7 +74,7 @@ int ZmqSocket::receive(zmqHeader &header, std::byte *data) { return 1; } -bool ZmqSocket::decode_header(zmqHeader &h) { +bool ZmqSocketReceiver::decode_header(zmqHeader &h) { // TODO: implement return true; }