mirror of
https://github.com/slsdetectorgroup/aare.git
synced 2025-06-08 13:40:39 +02:00
Merge pull request #54 from slsdetectorgroup/feature/zmq_sender_example
zmq sender example
This commit is contained in:
commit
cd46f59b99
2
.github/workflows/common-workflow.yml
vendored
2
.github/workflows/common-workflow.yml
vendored
@ -45,7 +45,7 @@ jobs:
|
|||||||
pwd
|
pwd
|
||||||
export PROJECT_ROOT_DIR="."
|
export PROJECT_ROOT_DIR="."
|
||||||
ls build/examples/*_example
|
ls build/examples/*_example
|
||||||
find build/examples -name "*_example" -not -name "zmq_example" | xargs -I {} -n 1 -t bash -c {}
|
find build/examples -name "*_example" -not -name "zmq_*" | xargs -I {} -n 1 -t bash -c {}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -148,11 +148,12 @@ endif()
|
|||||||
add_subdirectory(core)
|
add_subdirectory(core)
|
||||||
add_subdirectory(file_io)
|
add_subdirectory(file_io)
|
||||||
add_subdirectory(utils)
|
add_subdirectory(utils)
|
||||||
|
add_subdirectory(network_io)
|
||||||
|
|
||||||
|
|
||||||
#Overall target to link to when using the library
|
#Overall target to link to when using the library
|
||||||
add_library(aare INTERFACE)
|
add_library(aare INTERFACE)
|
||||||
target_link_libraries(aare INTERFACE core file_io utils)
|
target_link_libraries(aare INTERFACE core file_io utils network_io)
|
||||||
target_include_directories(aare INTERFACE
|
target_include_directories(aare INTERFACE
|
||||||
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
|
||||||
$<INSTALL_INTERFACE:include>
|
$<INSTALL_INTERFACE:include>
|
||||||
|
@ -5,6 +5,6 @@ channels:
|
|||||||
dependencies:
|
dependencies:
|
||||||
- fmt
|
- fmt
|
||||||
- pybind11
|
- pybind11
|
||||||
- nlohmann_json
|
- nlohmann_json # should be removed
|
||||||
- catch2
|
- catch2
|
||||||
- zeromq
|
- zeromq
|
||||||
|
@ -4,7 +4,6 @@ set(SourceFiles
|
|||||||
${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/src/defs.cpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/src/DType.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/src/DType.cpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/src/Frame.cpp
|
${CMAKE_CURRENT_SOURCE_DIR}/src/Frame.cpp
|
||||||
${CMAKE_CURRENT_SOURCE_DIR}/src/ZmqSocket.cpp
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -12,7 +11,7 @@ add_library(core STATIC ${SourceFiles})
|
|||||||
target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
|
target_include_directories(core PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
|
||||||
|
|
||||||
|
|
||||||
target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils libzmq)
|
target_link_libraries(core PUBLIC fmt::fmt PRIVATE aare_compiler_flags utils )
|
||||||
|
|
||||||
if (AARE_PYTHON_BINDINGS)
|
if (AARE_PYTHON_BINDINGS)
|
||||||
set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON)
|
set_property(TARGET core PROPERTY POSITION_INDEPENDENT_CODE ON)
|
||||||
|
@ -1,93 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <array>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <map>
|
|
||||||
#include <string>
|
|
||||||
|
|
||||||
// Socket to receive data from a ZMQ publisher
|
|
||||||
// needs to be in sync with the main library (or maybe better use the versioning in the header)
|
|
||||||
|
|
||||||
// forward declare zmq_msg_t to avoid including zmq.h in the header
|
|
||||||
class zmq_msg_t;
|
|
||||||
|
|
||||||
namespace aare {
|
|
||||||
|
|
||||||
/** zmq header structure (from slsDetectorPackage)*/
|
|
||||||
struct zmqHeader {
|
|
||||||
/** true if incoming data, false if end of acquisition */
|
|
||||||
bool data{true};
|
|
||||||
uint32_t jsonversion{0};
|
|
||||||
uint32_t dynamicRange{0};
|
|
||||||
uint64_t fileIndex{0};
|
|
||||||
/** number of detectors/port in x axis */
|
|
||||||
uint32_t ndetx{0};
|
|
||||||
/** number of detectors/port in y axis */
|
|
||||||
uint32_t ndety{0};
|
|
||||||
/** number of pixels/channels in x axis for this zmq socket */
|
|
||||||
uint32_t npixelsx{0};
|
|
||||||
/** number of pixels/channels in y axis for this zmq socket */
|
|
||||||
uint32_t npixelsy{0};
|
|
||||||
/** number of bytes for an image in this socket */
|
|
||||||
uint32_t imageSize{0};
|
|
||||||
/** frame number from detector */
|
|
||||||
uint64_t acqIndex{0};
|
|
||||||
/** frame index (starting at 0 for each acquisition) */
|
|
||||||
uint64_t frameIndex{0};
|
|
||||||
/** progress in percentage */
|
|
||||||
double progress{0};
|
|
||||||
/** file name prefix */
|
|
||||||
std::string fname;
|
|
||||||
/** header from detector */
|
|
||||||
uint64_t frameNumber{0};
|
|
||||||
uint32_t expLength{0};
|
|
||||||
uint32_t packetNumber{0};
|
|
||||||
uint64_t detSpec1{0};
|
|
||||||
uint64_t timestamp{0};
|
|
||||||
uint16_t modId{0};
|
|
||||||
uint16_t row{0};
|
|
||||||
uint16_t column{0};
|
|
||||||
uint16_t detSpec2{0};
|
|
||||||
uint32_t detSpec3{0};
|
|
||||||
uint16_t detSpec4{0};
|
|
||||||
uint8_t detType{0};
|
|
||||||
uint8_t version{0};
|
|
||||||
/** if rows of image should be flipped */
|
|
||||||
int flipRows{0};
|
|
||||||
/** quad type (eiger hardware specific) */
|
|
||||||
uint32_t quad{0};
|
|
||||||
/** true if complete image, else missing packets */
|
|
||||||
bool completeImage{false};
|
|
||||||
/** additional json header */
|
|
||||||
std::map<std::string, std::string> addJsonHeader;
|
|
||||||
/** (xmin, xmax, ymin, ymax) roi only in files written */
|
|
||||||
std::array<int, 4> rx_roi{};
|
|
||||||
};
|
|
||||||
|
|
||||||
class ZmqSocket {
|
|
||||||
void *m_context{nullptr};
|
|
||||||
void *m_socket{nullptr};
|
|
||||||
std::string m_endpoint;
|
|
||||||
int m_zmq_hwm{1000};
|
|
||||||
int m_timeout_ms{1000};
|
|
||||||
constexpr static size_t m_max_header_size = 1024;
|
|
||||||
char *m_header_buffer = new char[m_max_header_size];
|
|
||||||
|
|
||||||
bool decode_header(zmqHeader &h);
|
|
||||||
|
|
||||||
public:
|
|
||||||
ZmqSocket(const std::string &endpoint);
|
|
||||||
~ZmqSocket();
|
|
||||||
ZmqSocket(const ZmqSocket &) = delete;
|
|
||||||
ZmqSocket operator=(const ZmqSocket &) = delete;
|
|
||||||
ZmqSocket(ZmqSocket &&) = delete;
|
|
||||||
|
|
||||||
void connect();
|
|
||||||
void disconnect();
|
|
||||||
void set_zmq_hwm(int hwm);
|
|
||||||
void set_timeout_ms(int n);
|
|
||||||
|
|
||||||
int receive(zmqHeader &header, std::byte *data);
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace aare
|
|
@ -1,6 +1,6 @@
|
|||||||
|
|
||||||
set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;zmq_example;")
|
set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;")
|
||||||
set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example")
|
set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_receiver_example;zmq_sender_example;")
|
||||||
foreach(example ${EXAMPLE_LIST})
|
foreach(example ${EXAMPLE_LIST})
|
||||||
add_executable(${example} ${example}.cpp)
|
add_executable(${example} ${example}.cpp)
|
||||||
target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags)
|
target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags)
|
||||||
|
@ -1,16 +0,0 @@
|
|||||||
#include "aare/ZmqSocket.hpp"
|
|
||||||
#include <fmt/core.h>
|
|
||||||
#include <string>
|
|
||||||
|
|
||||||
int main() {
|
|
||||||
std::string endpoint = "tcp://localhost:5555";
|
|
||||||
aare::ZmqSocket socket(endpoint);
|
|
||||||
socket.connect();
|
|
||||||
char *data = new char[1024 * 1024 * 10];
|
|
||||||
aare::zmqHeader header;
|
|
||||||
while (true) {
|
|
||||||
int rc = socket.receive(header, reinterpret_cast<std::byte *>(data));
|
|
||||||
}
|
|
||||||
delete[] data;
|
|
||||||
return 0;
|
|
||||||
}
|
|
27
examples/zmq_receiver_example.cpp
Normal file
27
examples/zmq_receiver_example.cpp
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
#include "aare/ZmqSocketReceiver.hpp"
|
||||||
|
#include <cassert>
|
||||||
|
#include <fmt/core.h>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
std::string endpoint = "tcp://localhost:5555";
|
||||||
|
aare::ZmqSocketReceiver socket(endpoint);
|
||||||
|
socket.connect();
|
||||||
|
char *data = new char[1024 * 1024 * 10];
|
||||||
|
aare::ZmqHeader header;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
int rc = socket.receive(header, reinterpret_cast<std::byte *>(data));
|
||||||
|
aare::logger::info("Received bytes", rc, "Received header: ", header.to_string());
|
||||||
|
auto *data_int = reinterpret_cast<uint32_t *>(data);
|
||||||
|
for (uint32_t i = 0; i < header.npixelsx; i++) {
|
||||||
|
for (uint32_t j = 0; j < header.npixelsy; j++) {
|
||||||
|
// verify that the sent data is correct
|
||||||
|
assert(data_int[i * header.npixelsy + j] == i + j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
aare::logger::info("Frame verified");
|
||||||
|
}
|
||||||
|
delete[] data;
|
||||||
|
return 0;
|
||||||
|
}
|
37
examples/zmq_sender_example.cpp
Normal file
37
examples/zmq_sender_example.cpp
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
#include "aare/Frame.hpp"
|
||||||
|
#include "aare/ZmqSocketSender.hpp"
|
||||||
|
#include "aare/utils/logger.hpp"
|
||||||
|
|
||||||
|
#include <fmt/core.h>
|
||||||
|
#include <string>
|
||||||
|
#include <unistd.h> // sleep
|
||||||
|
using namespace aare;
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
std::string endpoint = "tcp://*:5555";
|
||||||
|
aare::ZmqSocketSender socket(endpoint);
|
||||||
|
socket.bind();
|
||||||
|
Frame frame(1024, 1024, sizeof(uint32_t) * 8);
|
||||||
|
for (int i = 0; i < 1024; i++) {
|
||||||
|
for (int j = 0; j < 1024; j++) {
|
||||||
|
frame.set(i, j, i + j);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
aare::ZmqHeader header;
|
||||||
|
header.npixelsx = 1024;
|
||||||
|
header.npixelsy = 1024;
|
||||||
|
header.imageSize = sizeof(uint32_t) * 1024 * 1024;
|
||||||
|
header.dynamicRange = 32;
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
while (true) {
|
||||||
|
aare::logger::info("Sending frame:", i++);
|
||||||
|
aare::logger::info("Header size:", sizeof(header.to_string()));
|
||||||
|
aare::logger::info("Frame size:", frame.size(), "\n");
|
||||||
|
|
||||||
|
int rc = socket.send(header, frame.data(), frame.size());
|
||||||
|
aare::logger::info("Sent bytes", rc);
|
||||||
|
sleep(1);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
@ -12,7 +12,10 @@ class RawFile : public FileInterface {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
std::filesystem::path m_fname; // TO be made private!
|
std::filesystem::path m_fname; // TO be made private!
|
||||||
void write(Frame &frame) override{};
|
|
||||||
|
// pragma to ignore warnings
|
||||||
|
void write(Frame &frame) override { throw std::runtime_error("Not implemented"); };
|
||||||
|
|
||||||
Frame read() override { return get_frame(this->current_frame++); };
|
Frame read() override { return get_frame(this->current_frame++); };
|
||||||
std::vector<Frame> read(size_t n_frames) override;
|
std::vector<Frame> read(size_t n_frames) override;
|
||||||
void read_into(std::byte *image_buf) override { return get_frame_into(this->current_frame++, image_buf); };
|
void read_into(std::byte *image_buf) override { return get_frame_into(this->current_frame++, image_buf); };
|
||||||
|
@ -89,7 +89,7 @@ template <typename DataType> size_t SubFile::read_impl_flip(std::byte *buffer) {
|
|||||||
|
|
||||||
size_t SubFile::frame_number(int frame_index) {
|
size_t SubFile::frame_number(int frame_index) {
|
||||||
sls_detector_header h{};
|
sls_detector_header h{};
|
||||||
FILE *fp = fopen(this->m_fname.c_str(), "r");
|
fp = fopen(this->m_fname.c_str(), "r");
|
||||||
if (!fp)
|
if (!fp)
|
||||||
throw std::runtime_error(fmt::format("Could not open: {} for reading", m_fname.c_str()));
|
throw std::runtime_error(fmt::format("Could not open: {} for reading", m_fname.c_str()));
|
||||||
fseek(fp, (sizeof(sls_detector_header) + bytes_per_part()) * frame_index, SEEK_SET);
|
fseek(fp, (sizeof(sls_detector_header) + bytes_per_part()) * frame_index, SEEK_SET);
|
||||||
|
@ -19,7 +19,7 @@ TEST_CASE("Read a 1D numpy file with int32 data type") {
|
|||||||
|
|
||||||
// use the load function to read the full file into a NDArray
|
// use the load function to read the full file into a NDArray
|
||||||
auto data = f.load<int32_t, 1>();
|
auto data = f.load<int32_t, 1>();
|
||||||
for (size_t i = 0; i < 10; i++) {
|
for (int32_t i = 0; i < 10; i++) {
|
||||||
REQUIRE(data(i) == i);
|
REQUIRE(data(i) == i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
38
network_io/CMakeLists.txt
Normal file
38
network_io/CMakeLists.txt
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
|
||||||
|
FetchContent_Declare(
|
||||||
|
simdjson
|
||||||
|
GIT_REPOSITORY https://github.com/simdjson/simdjson.git
|
||||||
|
GIT_TAG tags/v3.8.0
|
||||||
|
GIT_SHALLOW TRUE
|
||||||
|
)
|
||||||
|
|
||||||
|
FetchContent_MakeAvailable(simdjson)
|
||||||
|
# hide simdjson warnings by making the includes system includes
|
||||||
|
get_target_property(_inc simdjson INTERFACE_INCLUDE_DIRECTORIES)
|
||||||
|
target_include_directories(simdjson SYSTEM INTERFACE ${_inc})
|
||||||
|
|
||||||
|
|
||||||
|
add_library(network_io STATIC
|
||||||
|
src/ZmqSocketReceiver.cpp
|
||||||
|
src/ZmqSocketSender.cpp
|
||||||
|
src/ZmqSocket.cpp
|
||||||
|
src/ZmqHeader.cpp
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
target_include_directories(network_io PUBLIC include)
|
||||||
|
target_link_libraries(network_io PRIVATE libzmq fmt::fmt core utils aare_compiler_flags )
|
||||||
|
target_link_libraries(network_io PUBLIC simdjson)
|
||||||
|
|
||||||
|
if(AARE_PYTHON_BINDINGS)
|
||||||
|
set_property(TARGET file_io PROPERTY POSITION_INDEPENDENT_CODE ON)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
if(AARE_TESTS)
|
||||||
|
set(TestSources
|
||||||
|
${CMAKE_CURRENT_SOURCE_DIR}/test/ZmqHeader.test.cpp
|
||||||
|
)
|
||||||
|
target_sources(tests PRIVATE ${TestSources} )
|
||||||
|
target_link_libraries(tests PRIVATE network_io core utils)
|
||||||
|
endif()
|
138
network_io/include/aare/ZmqHeader.hpp
Normal file
138
network_io/include/aare/ZmqHeader.hpp
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
#include "aare/utils/logger.hpp"
|
||||||
|
#include "simdjson.h"
|
||||||
|
#include <array>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
namespace simdjson {
|
||||||
|
/**
|
||||||
|
* @brief cast a simdjson::ondemand::value to a std::array<int,4>
|
||||||
|
* useful for writing rx_roi from json header
|
||||||
|
*/
|
||||||
|
template <> simdjson_inline simdjson::simdjson_result<std::array<int, 4>> simdjson::ondemand::value::get() noexcept {
|
||||||
|
ondemand::array array;
|
||||||
|
auto error = get_array().get(array);
|
||||||
|
if (error) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
std::array<int, 4> arr;
|
||||||
|
int i = 0;
|
||||||
|
for (auto v : array) {
|
||||||
|
int64_t val;
|
||||||
|
error = v.get_int64().get(val);
|
||||||
|
|
||||||
|
if (error) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
arr[i++] = val;
|
||||||
|
}
|
||||||
|
return arr;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief cast a simdjson::ondemand::value to a uint32_t
|
||||||
|
* adds a check for 32bit overflow
|
||||||
|
*/
|
||||||
|
template <> simdjson_inline simdjson::simdjson_result<uint32_t> simdjson::ondemand::value::get() noexcept {
|
||||||
|
size_t val;
|
||||||
|
auto error = get_uint64().get(val);
|
||||||
|
if (error) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
if (val > std::numeric_limits<uint32_t>::max()) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return static_cast<uint32_t>(val);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief cast a simdjson::ondemand::value to a std::map<std::string, std::string>
|
||||||
|
*/
|
||||||
|
template <>
|
||||||
|
simdjson_inline simdjson::simdjson_result<std::map<std::string, std::string>>
|
||||||
|
simdjson::ondemand::value::get() noexcept {
|
||||||
|
std::map<std::string, std::string> map;
|
||||||
|
ondemand::object obj;
|
||||||
|
auto error = get_object().get(obj);
|
||||||
|
if (error) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
for (auto field : obj) {
|
||||||
|
simdjson::ondemand::raw_json_string tmp;
|
||||||
|
error = field.key().get(tmp);
|
||||||
|
if (error) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
error = field.value().get(tmp);
|
||||||
|
if (error) {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
std::string_view key_view = field.unescaped_key();
|
||||||
|
std::string key_str(key_view.data(), key_view.size());
|
||||||
|
std::string_view value_view = field.value().get_string();
|
||||||
|
map[key_str] = {value_view.data(), value_view.size()};
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace simdjson
|
||||||
|
|
||||||
|
namespace aare {
|
||||||
|
|
||||||
|
/** zmq header structure (from slsDetectorPackage)*/
|
||||||
|
struct ZmqHeader {
|
||||||
|
/** true if incoming data, false if end of acquisition */
|
||||||
|
bool data{true};
|
||||||
|
uint32_t jsonversion{0};
|
||||||
|
uint32_t dynamicRange{0};
|
||||||
|
uint64_t fileIndex{0};
|
||||||
|
/** number of detectors/port in x axis */
|
||||||
|
uint32_t ndetx{0};
|
||||||
|
/** number of detectors/port in y axis */
|
||||||
|
uint32_t ndety{0};
|
||||||
|
/** number of pixels/channels in x axis for this zmq socket */
|
||||||
|
uint32_t npixelsx{0};
|
||||||
|
/** number of pixels/channels in y axis for this zmq socket */
|
||||||
|
uint32_t npixelsy{0};
|
||||||
|
/** number of bytes for an image in this socket */
|
||||||
|
uint32_t imageSize{0};
|
||||||
|
/** frame number from detector */
|
||||||
|
uint64_t acqIndex{0};
|
||||||
|
/** frame index (starting at 0 for each acquisition) */
|
||||||
|
uint64_t frameIndex{0};
|
||||||
|
/** progress in percentage */
|
||||||
|
double progress{0};
|
||||||
|
/** file name prefix */
|
||||||
|
std::string fname;
|
||||||
|
/** header from detector */
|
||||||
|
uint64_t frameNumber{0};
|
||||||
|
uint32_t expLength{0};
|
||||||
|
uint32_t packetNumber{0};
|
||||||
|
uint64_t detSpec1{0};
|
||||||
|
uint64_t timestamp{0};
|
||||||
|
uint16_t modId{0};
|
||||||
|
uint16_t row{0};
|
||||||
|
uint16_t column{0};
|
||||||
|
uint16_t detSpec2{0};
|
||||||
|
uint32_t detSpec3{0};
|
||||||
|
uint16_t detSpec4{0};
|
||||||
|
uint8_t detType{0};
|
||||||
|
uint8_t version{0};
|
||||||
|
/** if rows of image should be flipped */
|
||||||
|
int flipRows{0};
|
||||||
|
/** quad type (eiger hardware specific) */
|
||||||
|
uint32_t quad{0};
|
||||||
|
/** true if complete image, else missing packets */
|
||||||
|
bool completeImage{false};
|
||||||
|
/** additional json header */
|
||||||
|
std::map<std::string, std::string> addJsonHeader;
|
||||||
|
/** (xmin, xmax, ymin, ymax) roi only in files written */
|
||||||
|
std::array<int, 4> rx_roi{};
|
||||||
|
|
||||||
|
/** serialize struct to json string */
|
||||||
|
std::string to_string() const;
|
||||||
|
void from_string(std::string &s);
|
||||||
|
// compare operator
|
||||||
|
bool operator==(const ZmqHeader &other) const;
|
||||||
|
};
|
||||||
|
} // namespace aare
|
40
network_io/include/aare/ZmqSocket.hpp
Normal file
40
network_io/include/aare/ZmqSocket.hpp
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <array>
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
// Socket to receive data from a ZMQ publisher
|
||||||
|
// needs to be in sync with the main library (or maybe better use the versioning in the header)
|
||||||
|
|
||||||
|
// forward declare zmq_msg_t to avoid including zmq.h in the header
|
||||||
|
class zmq_msg_t;
|
||||||
|
|
||||||
|
namespace aare {
|
||||||
|
|
||||||
|
class ZmqSocket {
|
||||||
|
protected:
|
||||||
|
void *m_context{nullptr};
|
||||||
|
void *m_socket{nullptr};
|
||||||
|
std::string m_endpoint;
|
||||||
|
int m_zmq_hwm{1000};
|
||||||
|
int m_timeout_ms{1000};
|
||||||
|
size_t m_potential_frame_size{1024 * 1024};
|
||||||
|
constexpr static size_t m_max_header_size = 1024;
|
||||||
|
char *m_header_buffer = new char[m_max_header_size];
|
||||||
|
|
||||||
|
public:
|
||||||
|
ZmqSocket() = default;
|
||||||
|
~ZmqSocket();
|
||||||
|
|
||||||
|
ZmqSocket(const ZmqSocket &) = delete;
|
||||||
|
ZmqSocket operator=(const ZmqSocket &) = delete;
|
||||||
|
ZmqSocket(ZmqSocket &&) = delete;
|
||||||
|
|
||||||
|
void disconnect();
|
||||||
|
void set_zmq_hwm(int hwm);
|
||||||
|
void set_timeout_ms(int n);
|
||||||
|
void set_potential_frame_size(size_t size);
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace aare
|
26
network_io/include/aare/ZmqSocketReceiver.hpp
Normal file
26
network_io/include/aare/ZmqSocketReceiver.hpp
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "ZmqHeader.hpp"
|
||||||
|
#include "ZmqSocket.hpp"
|
||||||
|
|
||||||
|
#include <array>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
// Socket to receive data from a ZMQ publisher
|
||||||
|
// needs to be in sync with the main library (or maybe better use the versioning in the header)
|
||||||
|
|
||||||
|
// forward declare zmq_msg_t to avoid including zmq.h in the header
|
||||||
|
class zmq_msg_t;
|
||||||
|
|
||||||
|
namespace aare {
|
||||||
|
|
||||||
|
class ZmqSocketReceiver : public ZmqSocket {
|
||||||
|
public:
|
||||||
|
ZmqSocketReceiver(const std::string &endpoint);
|
||||||
|
void connect();
|
||||||
|
size_t receive(ZmqHeader &header, std::byte *data, bool serialized_header = false);
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace aare
|
12
network_io/include/aare/ZmqSocketSender.hpp
Normal file
12
network_io/include/aare/ZmqSocketSender.hpp
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
#pragma once
|
||||||
|
#include "ZmqHeader.hpp"
|
||||||
|
#include "ZmqSocket.hpp"
|
||||||
|
|
||||||
|
namespace aare {
|
||||||
|
class ZmqSocketSender : public ZmqSocket {
|
||||||
|
public:
|
||||||
|
ZmqSocketSender(const std::string &endpoint);
|
||||||
|
void bind();
|
||||||
|
size_t send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header = false);
|
||||||
|
};
|
||||||
|
} // namespace aare
|
200
network_io/src/ZmqHeader.cpp
Normal file
200
network_io/src/ZmqHeader.cpp
Normal file
@ -0,0 +1,200 @@
|
|||||||
|
|
||||||
|
#include "aare/ZmqHeader.hpp"
|
||||||
|
|
||||||
|
#include "simdjson.h"
|
||||||
|
|
||||||
|
using namespace simdjson;
|
||||||
|
|
||||||
|
// helper functions to write json
|
||||||
|
// append to string for better performance (not tested)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief write a digit to a string
|
||||||
|
* takes key and value and outputs->"key": value,
|
||||||
|
* @tparam T type of value (int, uint32_t, ...)
|
||||||
|
* @param s string to append to
|
||||||
|
* @param key key to write
|
||||||
|
* @param value value to write
|
||||||
|
* @return void
|
||||||
|
* @note
|
||||||
|
* - can't use concepts here because we are using c++17
|
||||||
|
*/
|
||||||
|
template <typename T> void write_digit(std::string &s, const std::string &key, const T &value) {
|
||||||
|
s += "\"";
|
||||||
|
s += key;
|
||||||
|
s += "\": ";
|
||||||
|
s += std::to_string(value);
|
||||||
|
s += ", ";
|
||||||
|
}
|
||||||
|
void write_str(std::string &s, const std::string &key, const std::string &value) {
|
||||||
|
s += "\"";
|
||||||
|
s += key;
|
||||||
|
s += "\": \"";
|
||||||
|
s += value;
|
||||||
|
s += "\", ";
|
||||||
|
}
|
||||||
|
void write_map(std::string &s, const std::string &key, const std::map<std::string, std::string> &value) {
|
||||||
|
s += "\"";
|
||||||
|
s += key;
|
||||||
|
s += "\": {";
|
||||||
|
for (auto &kv : value) {
|
||||||
|
write_str(s, kv.first, kv.second);
|
||||||
|
}
|
||||||
|
// remove last comma or trailing spaces
|
||||||
|
for (int i = s.size() - 1; i >= 0; i--) {
|
||||||
|
if (s[i] == ',' or s[i] == ' ') {
|
||||||
|
s.pop_back();
|
||||||
|
} else
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
s += "}, ";
|
||||||
|
}
|
||||||
|
void write_array(std::string &s, const std::string &key, const std::array<int, 4> &value) {
|
||||||
|
s += "\"";
|
||||||
|
s += key;
|
||||||
|
s += "\": [";
|
||||||
|
s += std::to_string(value[0]);
|
||||||
|
s += ", ";
|
||||||
|
s += std::to_string(value[1]);
|
||||||
|
s += ", ";
|
||||||
|
s += std::to_string(value[2]);
|
||||||
|
s += ", ";
|
||||||
|
s += std::to_string(value[3]);
|
||||||
|
s += "], ";
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace aare {
|
||||||
|
|
||||||
|
std::string ZmqHeader::to_string() const {
|
||||||
|
std::string s = "";
|
||||||
|
s.reserve(1024);
|
||||||
|
s += "{";
|
||||||
|
write_digit(s, "data", data ? 1 : 0);
|
||||||
|
write_digit(s, "jsonversion", jsonversion);
|
||||||
|
write_digit(s, "dynamicRange", dynamicRange);
|
||||||
|
write_digit(s, "fileIndex", fileIndex);
|
||||||
|
write_digit(s, "ndetx", ndetx);
|
||||||
|
write_digit(s, "ndety", ndety);
|
||||||
|
write_digit(s, "npixelsx", npixelsx);
|
||||||
|
write_digit(s, "npixelsy", npixelsy);
|
||||||
|
write_digit(s, "imageSize", imageSize);
|
||||||
|
write_digit(s, "acqIndex", acqIndex);
|
||||||
|
write_digit(s, "frameIndex", frameIndex);
|
||||||
|
write_digit(s, "progress", progress);
|
||||||
|
write_str(s, "fname", fname);
|
||||||
|
write_digit(s, "frameNumber", frameNumber);
|
||||||
|
write_digit(s, "expLength", expLength);
|
||||||
|
write_digit(s, "packetNumber", packetNumber);
|
||||||
|
write_digit(s, "detSpec1", detSpec1);
|
||||||
|
write_digit(s, "timestamp", timestamp);
|
||||||
|
write_digit(s, "modId", modId);
|
||||||
|
write_digit(s, "row", row);
|
||||||
|
write_digit(s, "column", column);
|
||||||
|
write_digit(s, "detSpec2", detSpec2);
|
||||||
|
write_digit(s, "detSpec3", detSpec3);
|
||||||
|
write_digit(s, "detSpec4", detSpec4);
|
||||||
|
write_digit(s, "detType", detType);
|
||||||
|
write_digit(s, "version", version);
|
||||||
|
write_digit(s, "flipRows", flipRows);
|
||||||
|
write_digit(s, "quad", quad);
|
||||||
|
write_digit(s, "completeImage", completeImage ? 1 : 0);
|
||||||
|
write_map(s, "addJsonHeader", addJsonHeader);
|
||||||
|
write_array(s, "rx_roi", rx_roi);
|
||||||
|
// remove last comma
|
||||||
|
s.pop_back();
|
||||||
|
s.pop_back();
|
||||||
|
|
||||||
|
s += "}";
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ZmqHeader::from_string(std::string &s) {
|
||||||
|
|
||||||
|
simdjson::padded_string ps(s.c_str(), s.size());
|
||||||
|
ondemand::parser parser;
|
||||||
|
ondemand::document doc = parser.iterate(ps);
|
||||||
|
ondemand::object object = doc.get_object();
|
||||||
|
|
||||||
|
for (auto field : object) {
|
||||||
|
std::string_view key = field.unescaped_key();
|
||||||
|
|
||||||
|
if (key == "data") {
|
||||||
|
data = uint64_t(field.value()) ? true : false;
|
||||||
|
} else if (key == "jsonversion") {
|
||||||
|
jsonversion = uint32_t(field.value());
|
||||||
|
} else if (key == "dynamicRange") {
|
||||||
|
dynamicRange = uint32_t(field.value());
|
||||||
|
} else if (key == "fileIndex") {
|
||||||
|
fileIndex = uint64_t(field.value());
|
||||||
|
} else if (key == "ndetx") {
|
||||||
|
ndetx = uint32_t(field.value());
|
||||||
|
} else if (key == "ndety") {
|
||||||
|
ndety = uint32_t(field.value());
|
||||||
|
} else if (key == "npixelsx") {
|
||||||
|
npixelsx = uint32_t(field.value());
|
||||||
|
} else if (key == "npixelsy") {
|
||||||
|
npixelsy = uint32_t(field.value());
|
||||||
|
} else if (key == "imageSize") {
|
||||||
|
imageSize = uint32_t(field.value());
|
||||||
|
} else if (key == "acqIndex") {
|
||||||
|
acqIndex = uint64_t(field.value());
|
||||||
|
} else if (key == "frameIndex") {
|
||||||
|
frameIndex = uint64_t(field.value());
|
||||||
|
} else if (key == "progress") {
|
||||||
|
progress = field.value().get_double();
|
||||||
|
} else if (key == "fname") {
|
||||||
|
std::string_view tmp = field.value().get_string();
|
||||||
|
fname = {tmp.begin(), tmp.end()};
|
||||||
|
} else if (key == "frameNumber") {
|
||||||
|
frameNumber = uint64_t(field.value());
|
||||||
|
} else if (key == "expLength") {
|
||||||
|
expLength = uint32_t(field.value());
|
||||||
|
} else if (key == "packetNumber") {
|
||||||
|
packetNumber = uint32_t(field.value());
|
||||||
|
} else if (key == "detSpec1") {
|
||||||
|
detSpec1 = uint64_t(field.value());
|
||||||
|
} else if (key == "timestamp") {
|
||||||
|
timestamp = uint64_t(field.value());
|
||||||
|
} else if (key == "modId") {
|
||||||
|
modId = uint32_t(field.value());
|
||||||
|
} else if (key == "row") {
|
||||||
|
row = uint32_t(field.value());
|
||||||
|
} else if (key == "column") {
|
||||||
|
column = uint32_t(field.value());
|
||||||
|
} else if (key == "detSpec2") {
|
||||||
|
detSpec2 = uint32_t(field.value());
|
||||||
|
} else if (key == "detSpec3") {
|
||||||
|
detSpec3 = uint32_t(field.value());
|
||||||
|
} else if (key == "detSpec4") {
|
||||||
|
detSpec4 = uint32_t(field.value());
|
||||||
|
} else if (key == "detType") {
|
||||||
|
detType = uint32_t(field.value());
|
||||||
|
} else if (key == "version") {
|
||||||
|
version = uint32_t(field.value());
|
||||||
|
} else if (key == "flipRows") {
|
||||||
|
flipRows = uint32_t(field.value());
|
||||||
|
} else if (key == "quad") {
|
||||||
|
quad = uint32_t(field.value());
|
||||||
|
} else if (key == "completeImage") {
|
||||||
|
completeImage = uint64_t(field.value()) ? true : false;
|
||||||
|
} else if (key == "addJsonHeader") {
|
||||||
|
addJsonHeader = std::map<std::string, std::string>(field.value());
|
||||||
|
} else if (key == "rx_roi") {
|
||||||
|
rx_roi = std::array<int, 4>(field.value());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bool ZmqHeader::operator==(const ZmqHeader &other) const {
|
||||||
|
return data == other.data && jsonversion == other.jsonversion && dynamicRange == other.dynamicRange &&
|
||||||
|
fileIndex == other.fileIndex && ndetx == other.ndetx && ndety == other.ndety && npixelsx == other.npixelsx &&
|
||||||
|
npixelsy == other.npixelsy && imageSize == other.imageSize && acqIndex == other.acqIndex &&
|
||||||
|
frameIndex == other.frameIndex && progress == other.progress && fname == other.fname &&
|
||||||
|
frameNumber == other.frameNumber && expLength == other.expLength && packetNumber == other.packetNumber &&
|
||||||
|
detSpec1 == other.detSpec1 && timestamp == other.timestamp && modId == other.modId && row == other.row &&
|
||||||
|
column == other.column && detSpec2 == other.detSpec2 && detSpec3 == other.detSpec3 &&
|
||||||
|
detSpec4 == other.detSpec4 && detType == other.detType && version == other.version &&
|
||||||
|
flipRows == other.flipRows && quad == other.quad && completeImage == other.completeImage &&
|
||||||
|
addJsonHeader == other.addJsonHeader && rx_roi == other.rx_roi;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace aare
|
26
network_io/src/ZmqSocket.cpp
Normal file
26
network_io/src/ZmqSocket.cpp
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
#include "aare/ZmqSocket.hpp"
|
||||||
|
#include <fmt/core.h>
|
||||||
|
#include <zmq.h>
|
||||||
|
|
||||||
|
namespace aare {
|
||||||
|
|
||||||
|
void ZmqSocket::disconnect() {
|
||||||
|
zmq_close(m_socket);
|
||||||
|
zmq_ctx_destroy(m_context);
|
||||||
|
m_socket = nullptr;
|
||||||
|
m_context = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
ZmqSocket::~ZmqSocket() {
|
||||||
|
if (m_socket)
|
||||||
|
disconnect();
|
||||||
|
delete[] m_header_buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; }
|
||||||
|
|
||||||
|
void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; }
|
||||||
|
|
||||||
|
void ZmqSocket::set_potential_frame_size(size_t size) { m_potential_frame_size = size; }
|
||||||
|
|
||||||
|
} // namespace aare
|
@ -1,14 +1,17 @@
|
|||||||
#include "aare/ZmqSocket.hpp"
|
#include "aare/ZmqSocketReceiver.hpp"
|
||||||
|
#include "aare/utils/logger.hpp"
|
||||||
|
|
||||||
#include <fmt/core.h>
|
#include <fmt/core.h>
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
namespace aare {
|
namespace aare {
|
||||||
|
|
||||||
ZmqSocket::ZmqSocket(const std::string &endpoint) : m_endpoint(endpoint) {
|
ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) {
|
||||||
|
m_endpoint = endpoint;
|
||||||
memset(m_header_buffer, 0, m_max_header_size);
|
memset(m_header_buffer, 0, m_max_header_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ZmqSocket::connect() {
|
void ZmqSocketReceiver::connect() {
|
||||||
m_context = zmq_ctx_new();
|
m_context = zmq_ctx_new();
|
||||||
m_socket = zmq_socket(m_context, ZMQ_SUB);
|
m_socket = zmq_socket(m_context, ZMQ_SUB);
|
||||||
fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm);
|
fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm);
|
||||||
@ -16,7 +19,7 @@ void ZmqSocket::connect() {
|
|||||||
if (rc)
|
if (rc)
|
||||||
throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno)));
|
throw std::runtime_error(fmt::format("Could not set ZMQ_RCVHWM: {}", strerror(errno)));
|
||||||
|
|
||||||
int bufsize = 1024 * 1024 * m_zmq_hwm;
|
int bufsize = m_potential_frame_size * m_zmq_hwm;
|
||||||
fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (1024 * 1024));
|
fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (1024 * 1024));
|
||||||
rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize));
|
rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize));
|
||||||
if (rc)
|
if (rc)
|
||||||
@ -26,37 +29,29 @@ void ZmqSocket::connect() {
|
|||||||
zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
|
zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ZmqSocket::disconnect() {
|
size_t ZmqSocketReceiver::receive(ZmqHeader &header, std::byte *data, bool serialized_header) {
|
||||||
zmq_close(m_socket);
|
|
||||||
zmq_ctx_destroy(m_context);
|
|
||||||
m_socket = nullptr;
|
|
||||||
m_context = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
ZmqSocket::~ZmqSocket() {
|
size_t data_bytes_received{};
|
||||||
if (m_socket)
|
|
||||||
disconnect();
|
|
||||||
delete[] m_header_buffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; }
|
if (serialized_header)
|
||||||
|
throw std::runtime_error("Not implemented");
|
||||||
|
|
||||||
void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; }
|
size_t header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0);
|
||||||
|
|
||||||
int ZmqSocket::receive(zmqHeader &header, std::byte *data) {
|
|
||||||
|
|
||||||
// receive header
|
// receive header
|
||||||
int header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0);
|
|
||||||
m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate
|
m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate
|
||||||
if (header_bytes_received < 0) {
|
if (header_bytes_received < 0) {
|
||||||
fmt::print("Error receiving header: {}\n", strerror(errno));
|
fmt::print("Error receiving header: {}\n", strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
fmt::print("Bytes: {}, Header: {}\n", header_bytes_received, m_header_buffer);
|
aare::logger::debug("Bytes: ", header_bytes_received, ", Header: ", m_header_buffer);
|
||||||
|
|
||||||
// decode header
|
// parse header
|
||||||
if (!decode_header(header)) {
|
try {
|
||||||
fmt::print("Error decoding header\n");
|
std::string header_str(m_header_buffer);
|
||||||
|
header.from_string(header_str);
|
||||||
|
} catch (const simdjson::simdjson_error &e) {
|
||||||
|
aare::logger::error(LOCATION + "Error parsing header: ", e.what());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,16 +62,13 @@ int ZmqSocket::receive(zmqHeader &header, std::byte *data) {
|
|||||||
if (!more) {
|
if (!more) {
|
||||||
return 0; // no data following header
|
return 0; // no data following header
|
||||||
} else {
|
} else {
|
||||||
int data_bytes_received = zmq_recv(m_socket, data, 1024 * 1024 * 2, 0); // TODO! configurable size!!!!
|
|
||||||
|
data_bytes_received = zmq_recv(m_socket, data, header.imageSize, 0); // TODO! configurable size!!!!
|
||||||
if (data_bytes_received == -1)
|
if (data_bytes_received == -1)
|
||||||
throw std::runtime_error("Got half of a multipart msg!!!");
|
throw std::runtime_error("Got half of a multipart msg!!!");
|
||||||
|
aare::logger::debug("Bytes: ", data_bytes_received);
|
||||||
}
|
}
|
||||||
return 1;
|
return data_bytes_received + header_bytes_received;
|
||||||
}
|
|
||||||
|
|
||||||
bool ZmqSocket::decode_header(zmqHeader &h) {
|
|
||||||
// TODO: implement
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace aare
|
} // namespace aare
|
32
network_io/src/ZmqSocketSender.cpp
Normal file
32
network_io/src/ZmqSocketSender.cpp
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
#include "aare/ZmqSocketSender.hpp"
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <zmq.h>
|
||||||
|
|
||||||
|
namespace aare {
|
||||||
|
ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = endpoint; }
|
||||||
|
|
||||||
|
void ZmqSocketSender::bind() {
|
||||||
|
m_context = zmq_ctx_new();
|
||||||
|
m_socket = zmq_socket(m_context, ZMQ_PUB);
|
||||||
|
size_t rc = zmq_bind(m_socket, m_endpoint.c_str());
|
||||||
|
assert(rc == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t ZmqSocketSender::send(ZmqHeader &header, const std::byte *data, size_t size, bool serialize_header) {
|
||||||
|
size_t rc;
|
||||||
|
if (serialize_header) {
|
||||||
|
rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE);
|
||||||
|
assert(rc == sizeof(ZmqHeader));
|
||||||
|
} else {
|
||||||
|
std::string header_str = header.to_string();
|
||||||
|
rc = zmq_send(m_socket, header_str.c_str(), header_str.size(), ZMQ_SNDMORE);
|
||||||
|
assert(rc == header_str.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t rc2 = zmq_send(m_socket, data, size, 0);
|
||||||
|
assert(rc2 == size);
|
||||||
|
return rc + rc2;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace aare
|
80
network_io/test/ZmqHeader.test.cpp
Normal file
80
network_io/test/ZmqHeader.test.cpp
Normal file
@ -0,0 +1,80 @@
|
|||||||
|
#include "aare/ZmqHeader.hpp"
|
||||||
|
#include "aare/utils/logger.hpp"
|
||||||
|
#include <catch2/catch_test_macros.hpp>
|
||||||
|
|
||||||
|
using namespace aare;
|
||||||
|
TEST_CASE("Test ZmqHeader") {
|
||||||
|
ZmqHeader header;
|
||||||
|
header.npixelsx = 10;
|
||||||
|
header.npixelsy = 15;
|
||||||
|
header.data = 1;
|
||||||
|
header.jsonversion = 2;
|
||||||
|
header.dynamicRange = 32;
|
||||||
|
header.fileIndex = 4;
|
||||||
|
header.ndetx = 5;
|
||||||
|
header.ndety = 6;
|
||||||
|
header.imageSize = 4800;
|
||||||
|
header.acqIndex = 8;
|
||||||
|
header.frameIndex = 9;
|
||||||
|
header.progress = 0.1;
|
||||||
|
header.fname = "test";
|
||||||
|
header.frameNumber = 11;
|
||||||
|
header.expLength = 12;
|
||||||
|
header.packetNumber = 13;
|
||||||
|
header.detSpec1 = 14;
|
||||||
|
header.timestamp = 15;
|
||||||
|
header.modId = 16;
|
||||||
|
header.row = 17;
|
||||||
|
header.column = 18;
|
||||||
|
header.detSpec2 = 19;
|
||||||
|
header.detSpec3 = 20;
|
||||||
|
header.detSpec4 = 21;
|
||||||
|
header.detType = 22;
|
||||||
|
header.version = 23;
|
||||||
|
header.flipRows = 24;
|
||||||
|
header.quad = 25;
|
||||||
|
header.completeImage = 1;
|
||||||
|
header.addJsonHeader = {{"key1", "value1"}, {"key2", "value2"}};
|
||||||
|
header.rx_roi = {27, 28, 29, 30};
|
||||||
|
|
||||||
|
std::string json_header = "{"
|
||||||
|
"\"data\": 1, "
|
||||||
|
"\"jsonversion\": 2, "
|
||||||
|
"\"dynamicRange\": 32, "
|
||||||
|
"\"fileIndex\": 4, "
|
||||||
|
"\"ndetx\": 5, "
|
||||||
|
"\"ndety\": 6, "
|
||||||
|
"\"npixelsx\": 10, "
|
||||||
|
"\"npixelsy\": 15, "
|
||||||
|
"\"imageSize\": 4800, "
|
||||||
|
"\"acqIndex\": 8, "
|
||||||
|
"\"frameIndex\": 9, "
|
||||||
|
"\"progress\": 0.100000, "
|
||||||
|
"\"fname\": \"test\", "
|
||||||
|
"\"frameNumber\": 11, "
|
||||||
|
"\"expLength\": 12, "
|
||||||
|
"\"packetNumber\": 13, "
|
||||||
|
"\"detSpec1\": 14, "
|
||||||
|
"\"timestamp\": 15, "
|
||||||
|
"\"modId\": 16, "
|
||||||
|
"\"row\": 17, "
|
||||||
|
"\"column\": 18, "
|
||||||
|
"\"detSpec2\": 19, "
|
||||||
|
"\"detSpec3\": 20, "
|
||||||
|
"\"detSpec4\": 21, "
|
||||||
|
"\"detType\": 22, "
|
||||||
|
"\"version\": 23, "
|
||||||
|
"\"flipRows\": 24, "
|
||||||
|
"\"quad\": 25, "
|
||||||
|
"\"completeImage\": 1, "
|
||||||
|
"\"addJsonHeader\": {\"key1\": \"value1\", \"key2\": \"value2\"}, "
|
||||||
|
"\"rx_roi\": [27, 28, 29, 30]"
|
||||||
|
"}";
|
||||||
|
|
||||||
|
SECTION("Test converting ZmqHeader to json string") { REQUIRE(header.to_string() == json_header); }
|
||||||
|
SECTION("Test converting json string to ZmqHeader") {
|
||||||
|
ZmqHeader header2;
|
||||||
|
header2.from_string(json_header);
|
||||||
|
REQUIRE(header2 == header);
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,7 @@
|
|||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
#define LOCATION std::string(__FILE__) + std::string(":") + std::to_string(__LINE__) + ":" + std::string(__func__) + ":"
|
#define LOCATION std::string(__FILE__) + std::string(":") + std::to_string(__LINE__) + ":" + std::string(__func__) + ":"
|
||||||
@ -20,6 +21,30 @@ template <typename T> std::ostream &operator<<(std::ostream &out, const std::vec
|
|||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// operator overload for std::array
|
||||||
|
template <typename T, size_t N> std::ostream &operator<<(std::ostream &out, const std::array<T, N> &v) {
|
||||||
|
out << "[";
|
||||||
|
size_t last = N - 1;
|
||||||
|
for (size_t i = 0; i < N; ++i) {
|
||||||
|
out << v[i];
|
||||||
|
if (i != last)
|
||||||
|
out << ", ";
|
||||||
|
}
|
||||||
|
out << "]";
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
// operator overlaod for std::map
|
||||||
|
template <typename K, typename V> std::ostream &operator<<(std::ostream &out, const std::map<K, V> &v) {
|
||||||
|
out << "{";
|
||||||
|
size_t i = 0;
|
||||||
|
for (auto &kv : v) {
|
||||||
|
out << kv.first << ": " << kv.second << ((++i != v.size()) ? ", " : "");
|
||||||
|
}
|
||||||
|
|
||||||
|
out << "}";
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
namespace aare {
|
namespace aare {
|
||||||
|
|
||||||
namespace logger {
|
namespace logger {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user