diff --git a/CMakeLists.txt b/CMakeLists.txt index 119bcd7..947552a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,6 +26,7 @@ option(AARE_FETCH_FMT "Use FetchContent to download fmt" ON) option(AARE_FETCH_PYBIND11 "Use FetchContent to download pybind11" ON) option(AARE_FETCH_CATCH "Use FetchContent to download catch2" ON) option(AARE_FETCH_JSON "Use FetchContent to download nlohmann::json" ON) +option(AARE_FETCH_ZMQ "Use FetchContent to download libzmq" ON) #Convenience option to use system libraries option(AARE_SYSTEM_LIBRARIES "Use system libraries" OFF) @@ -35,11 +36,31 @@ if(AARE_SYSTEM_LIBRARIES) set(AARE_FETCH_PYBIND11 OFF CACHE BOOL "Disabled FetchContent for pybind11" FORCE) set(AARE_FETCH_CATCH OFF CACHE BOOL "Disabled FetchContent for catch2" FORCE) set(AARE_FETCH_JSON OFF CACHE BOOL "Disabled FetchContent for nlohmann::json" FORCE) + set(AARE_FETCH_ZMQ OFF CACHE BOOL "Disabled FetchContent for libzmq" FORCE) endif() set(CMAKE_EXPORT_COMPILE_COMMANDS ON) +if(AARE_FETCH_ZMQ) + FetchContent_Declare( + libzmq + GIT_REPOSITORY https://github.com/zeromq/libzmq.git + GIT_TAG v4.3.4 + ) + # TODO! Verify that this is what we want to do in aare + # Using GetProperties and Populate to be able to exclude zmq + # from install (not possible with FetchContent_MakeAvailable(libzmq)) + FetchContent_GetProperties(libzmq) + if(NOT libzmq_POPULATED) + FetchContent_Populate(libzmq) + add_subdirectory(${libzmq_SOURCE_DIR} ${libzmq_BINARY_DIR} EXCLUDE_FROM_ALL) + endif() +else() + find_package(ZeroMQ 4 REQUIRED) +endif() + + if (AARE_FETCH_FMT) set(FMT_TEST OFF CACHE INTERNAL "disabling fmt tests") FetchContent_Declare( diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 9db8c47..cb32367 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -4,6 +4,7 @@ set(SourceFiles ${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/DType.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/Frame.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/ZmqSocket.cpp ) @@ -11,7 +12,7 @@ add_library(core STATIC ${SourceFiles}) target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include) -target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils) +target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils libzmq) if (AARE_PYTHON_BINDINGS) set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON) diff --git a/core/include/aare/ZmqSocket.hpp b/core/include/aare/ZmqSocket.hpp new file mode 100644 index 0000000..96973c9 --- /dev/null +++ b/core/include/aare/ZmqSocket.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include +#include +#include +#include + + +// Socket to receive data from a ZMQ publisher +// needs to be in sync with the main library (or maybe better use the versioning in the header) + +// forward declare zmq_msg_t to avoid including zmq.h in the header +class zmq_msg_t; + +namespace aare{ + +/** zmq header structure (from slsDetectorPackage)*/ +struct zmqHeader { + /** true if incoming data, false if end of acquisition */ + bool data{true}; + uint32_t jsonversion{0}; + uint32_t dynamicRange{0}; + uint64_t fileIndex{0}; + /** number of detectors/port in x axis */ + uint32_t ndetx{0}; + /** number of detectors/port in y axis */ + uint32_t ndety{0}; + /** number of pixels/channels in x axis for this zmq socket */ + uint32_t npixelsx{0}; + /** number of pixels/channels in y axis for this zmq socket */ + uint32_t npixelsy{0}; + /** number of bytes for an image in this socket */ + uint32_t imageSize{0}; + /** frame number from detector */ + uint64_t acqIndex{0}; + /** frame index (starting at 0 for each acquisition) */ + uint64_t frameIndex{0}; + /** progress in percentage */ + double progress{0}; + /** file name prefix */ + std::string fname; + /** header from detector */ + uint64_t frameNumber{0}; + uint32_t expLength{0}; + uint32_t packetNumber{0}; + uint64_t detSpec1{0}; + uint64_t timestamp{0}; + uint16_t modId{0}; + uint16_t row{0}; + uint16_t column{0}; + uint16_t detSpec2{0}; + uint32_t detSpec3{0}; + uint16_t detSpec4{0}; + uint8_t detType{0}; + uint8_t version{0}; + /** if rows of image should be flipped */ + int flipRows{0}; + /** quad type (eiger hardware specific) */ + uint32_t quad{0}; + /** true if complete image, else missing packets */ + bool completeImage{false}; + /** additional json header */ + std::map addJsonHeader; + /** (xmin, xmax, ymin, ymax) roi only in files written */ + std::array rx_roi{}; +}; + + +class ZmqSocket{ + void *m_context{nullptr}; + void *m_socket{nullptr}; + std::string m_endpoint; + int m_zmq_hwm{1000}; + int m_timeout_ms{1000}; + constexpr static size_t m_max_header_size = 1024; + char* m_header_buffer = new char[m_max_header_size]; + + bool decode_header(zmqHeader &h); + +public: + ZmqSocket(const std::string& endpoint); + ~ZmqSocket(); + ZmqSocket(const ZmqSocket&) = delete; + ZmqSocket operator=(const ZmqSocket&) = delete; + ZmqSocket(ZmqSocket&&) = delete; + + void connect(); + void disconnect(); + void set_zmq_hwm(int hwm); + void set_timeout_ms(int n); + + int receive(zmqHeader &header, std::byte *data); + + +}; + +} // namespace aare \ No newline at end of file diff --git a/core/src/ZmqSocket.cpp b/core/src/ZmqSocket.cpp new file mode 100644 index 0000000..7e42f37 --- /dev/null +++ b/core/src/ZmqSocket.cpp @@ -0,0 +1,87 @@ +#include "aare/ZmqSocket.hpp" +#include +#include + +namespace aare{ + + +ZmqSocket::ZmqSocket(const std::string& endpoint):m_endpoint(endpoint){ + memset(m_header_buffer, 0, m_max_header_size); +} + +void ZmqSocket::connect(){ + m_context = zmq_ctx_new(); + m_socket = zmq_socket(m_context, ZMQ_SUB); + fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm); + int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); //should be set before connect + if (rc) + throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno))); + + int bufsize = 1024*1024*m_zmq_hwm; + fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize/(1024*1024)); + rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize)); + if (rc) + throw std::runtime_error(fmt::format("Could not set ZMQ_RCVBUF: {}", strerror(errno))); + + zmq_connect(m_socket, m_endpoint.c_str()); + zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0); +} + +void ZmqSocket::disconnect() { + zmq_close(m_socket); + zmq_ctx_destroy(m_context); + m_socket = nullptr; + m_context = nullptr; +} + +ZmqSocket::~ZmqSocket(){ + if (m_socket) + disconnect(); + delete[] m_header_buffer; +} + +void ZmqSocket::set_zmq_hwm(int hwm){ + m_zmq_hwm = hwm; +} + +void ZmqSocket::set_timeout_ms(int n){ + m_timeout_ms = n; +} + +int ZmqSocket::receive(zmqHeader &header, std::byte *data){ + + //receive header + int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0); + m_header_buffer[header_bytes_received] = '\0'; //make sure we zero terminate + if (header_bytes_received < 0){ + fmt::print("Error receiving header: {}\n", strerror(errno)); + return -1; + } + fmt::print("Bytes: {}, Header: {}\n", header_bytes_received, m_header_buffer); + + //decode header + if (!decode_header(header)){ + fmt::print("Error decoding header\n"); + return -1; + } + + //do we have a multipart message (data following header)? + int more; + size_t more_size = sizeof(more); + zmq_getsockopt(m_socket, ZMQ_RCVMORE, &more, &more_size); + if (!more) { + return 0; //no data following header + }else{ + int data_bytes_received = zmq_recv(m_socket, data, 1024*1024*2, 0); //TODO! configurable size!!!! + if (data_bytes_received == -1) + throw std::runtime_error("Got half of a multipart msg!!!"); + } + return 1; +} + +bool ZmqSocket::decode_header(zmqHeader &h){ + //TODO: implement + return true; +} + +} // namespace aare diff --git a/etc/virtual_jf.config b/etc/virtual_jf.config new file mode 100644 index 0000000..96ac0db --- /dev/null +++ b/etc/virtual_jf.config @@ -0,0 +1,12 @@ +hostname localhost +rx_hostname localhost + +udp_dstip auto +powerchip 1 +frames 1 +exptime 5us +period 100ms + +rx_zmqip 127.0.0.1 +rx_zmqport 5555 +rx_zmqstream 1 diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 7d324b5..2d8bb9c 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -1,5 +1,5 @@ -set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example") +set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;zmq_example;") set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example") foreach(example ${EXAMPLE_LIST}) add_executable(${example} ${example}.cpp) diff --git a/examples/zmq_example.cpp b/examples/zmq_example.cpp new file mode 100644 index 0000000..56a1dc1 --- /dev/null +++ b/examples/zmq_example.cpp @@ -0,0 +1,19 @@ +#include +#include +#include "aare/ZmqSocket.hpp" + + + +int main(){ + std::string endpoint = "tcp://localhost:5555"; + aare::ZmqSocket socket(endpoint); + socket.connect(); + char* data = new char[1024*1024*10]; + aare::zmqHeader header; + while(true){ + int rc = socket.receive(header, reinterpret_cast(data)); + + } + delete[] data; + return 0; +} \ No newline at end of file