receive multimodule (#65)

* add config files for multimodule receiving

* read subfiles with unordered and missing frames

* save work debugging

* Revert "save work debugging"

This reverts commit e791992a05efd754f93a80c980d17397eb4b6045.

* Revert "read subfiles with unordered and missing frames"

This reverts commit 1177fd129d3690db92e9597ccda62598e5a44d41.

* throw when two frames have different frame numbers

* write single part RawFile (working beta)

* correct total number of frames in master file

* add new mythen file with syncd frames

* save work

* save work for receive multimodule
multimodule config results in too much packet loss. needs more debugging.

* setup Task Distributiosn/ parallelization programming model

* read frames with same frame number

* clang-tidy fixes, formatting, add tests

* added second receiver

* Synchronize between zmq streams and merge frames

* improve readability in loop

* fix failing tests

* add simple test for merge frames

---------

Co-authored-by: Bechir <bechir.brahem420@gmail.com>
Co-authored-by: Erik Frojdh <erik.frojdh@gmail.com>
This commit is contained in:
Bechir Braham 2024-05-07 11:06:37 +02:00 committed by GitHub
parent 70acfbf4ac
commit 9637d0602f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 775 additions and 128 deletions

View File

@ -45,13 +45,14 @@ struct sls_detector_header {
std::array<uint8_t, 64> packetMask;
};
struct xy {
size_t row;
size_t col;
bool operator==(const xy &other) const { return row == other.row && col == other.col; }
bool operator!=(const xy &other) const { return !(*this == other); }
template <typename T> struct t_xy {
T row;
T col;
bool operator==(const t_xy &other) const { return row == other.row && col == other.col; }
bool operator!=(const t_xy &other) const { return !(*this == other); }
std::string to_string() const { return "{ x: " + std::to_string(row) + " y: " + std::to_string(col) + " }"; }
};
typedef t_xy<uint32_t> xy;
using dynamic_shape = std::vector<ssize_t>;

View File

@ -0,0 +1,8 @@
# receive data from two zmqstreams
```bash
killall jungfrauDetectorServer_virtual
jungfrauDetectorServer_virtual
jungfrauDetectorServer_virtual -p 1956
slsMultiReceiver 1980 2 0
sls_detector_put config etc/multimodule_virtual_jf.config
```

View File

@ -1,14 +1,22 @@
hostname localhost
rx_hostname localhost
hostname 127.0.0.1:1952+127.0.0.1:1956+
rx_hostname 127.0.0.1:1980+127.0.0.1:1981+
udp_dstip auto
0:udp_dstport 50001
0:udp_dstport2 50002
1:udp_dstport 50003
1:udp_dstport2 50004
udp_dstip 127.0.0.1
udp_dstip2 127.0.0.1
powerchip 1
frames 1
frames 10
exptime 5us
period 1ms
period 100ms
rx_discardpolicy discardpartial
rx_zmqip 127.0.0.1
rx_zmqport 5555
rx_zmqstream 1

View File

@ -3,9 +3,9 @@ rx_hostname localhost
udp_dstip auto
powerchip 1
frames 1
frames 10
exptime 5us
period 1ms
period 100ms
rx_zmqip 127.0.0.1
rx_zmqport 5555

View File

@ -1,11 +1,11 @@
set(EXAMPLE_LIST "json_example;logger_example;numpy_read_example;multiport_example;raw_example;zmq_restream_example")
set(EXAMPLE_LIST "${EXAMPLE_LIST};mythen_example;numpy_write_example;zmq_receiver_example;zmq_sender_example;")
set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example")
set(EXAMPLE_LIST "${EXAMPLE_LIST};cluster_example;zmq_multi_receiver;zmq_task_ventilator;zmq_worker;zmq_sink")
foreach(example ${EXAMPLE_LIST})
add_executable(${example} ${example}.cpp)
target_include_directories(${example} PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags)
target_link_libraries(${example} PUBLIC aare PRIVATE aare_compiler_flags libzmq)
endforeach()

View File

@ -0,0 +1,29 @@
#include "aare/examples/defs.hpp"
#include "aare/network_io/ZmqMultiReceiver.hpp"
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/defs.hpp"
#include <boost/program_options.hpp>
#include <cassert>
#include <fmt/core.h>
#include <string>
using namespace aare;
namespace po = boost::program_options;
using namespace std;
int main() {
logger::set_verbosity(logger::DEBUG);
std::string const endpoint1 = "tcp://127.0.0.1:" + std::to_string(5555);
std::string const endpoint2 = "tcp://127.0.0.1:" + std::to_string(5556);
ZmqMultiReceiver socket({endpoint1, endpoint2}, {1, 2});
socket.connect();
while (true) {
auto frames = socket.receive_n();
logger::info("Received", frames.size(), "frames");
}
return 0;
}

View File

@ -36,7 +36,7 @@ int main(int argc, char **argv) {
auto port = vm["port"].as<uint16_t>();
std::string const endpoint = "udp://127.0.0.1:" + std::to_string(port);
std::string const endpoint = "tcp://127.0.0.1:" + std::to_string(port);
aare::ZmqSocketReceiver socket(endpoint);
socket.connect();
while (true) {

View File

@ -44,7 +44,7 @@ int main(int argc, char **argv) {
return 1;
}
if (vm.count("port") != 1) {
aare::logger::error("file is required");
aare::logger::error("port is required");
cout << desc << "\n";
return 1;
}
@ -77,9 +77,9 @@ int main(int argc, char **argv) {
ZmqHeader header;
header.frameNumber = frameidx;
header.data = true;
header.npixelsx = frame.rows();
header.npixelsy = frame.cols();
header.dynamicRange = frame.bitdepth();
header.shape.row = frame.rows();
header.shape.col = frame.cols();
header.bitmode = frame.bitdepth();
header.size = frame.size();
sender.send({header, frame});

View File

@ -23,10 +23,9 @@ int main() {
}
}
aare::ZmqHeader header;
header.npixelsx = 1024;
header.npixelsy = 1024;
header.shape = {1024, 1024};
header.size = sizeof(uint32_t) * 1024 * 1024;
header.dynamicRange = 32;
header.bitmode = 32;
std::vector<ZmqFrame> zmq_frames;
// send two exact frames

30
examples/zmq_sink.cpp Normal file
View File

@ -0,0 +1,30 @@
#include "aare/network_io/ZmqSink.hpp"
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/ZmqVentilator.hpp"
#include "aare/network_io/ZmqWorker.hpp"
#include "zmq.h"
#include <boost/program_options.hpp>
#include <cassert>
#include <fmt/core.h>
#include <string>
using namespace aare;
namespace po = boost::program_options;
using namespace std;
int main() {
logger::set_verbosity(logger::DEBUG);
// 1. bind sink to endpoint
ZmqSink sink("tcp://*:4322");
int i = 0;
while (true) {
// 2. receive Task from ventilator
Task *task = sink.pull();
logger::info("Received", i++, "tasks");
Task::destroy(task);
}
// read the command line arguments
}

View File

@ -0,0 +1,72 @@
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/ZmqVentilator.hpp"
#include "zmq.h"
#include <boost/program_options.hpp>
#include <cassert>
#include <fmt/core.h>
#include <string>
using namespace aare;
namespace po = boost::program_options;
using namespace std;
string setup(int argc, char **argv) {
logger::set_verbosity(logger::DEBUG);
po::options_description desc("options");
desc.add_options()("help", "produce help message")("port,p", po::value<uint16_t>()->default_value(5555),
"port number");
po::positional_options_description pd;
pd.add("port", 1);
po::variables_map vm;
try {
auto parsed = po::command_line_parser(argc, argv).options(desc).positional(pd).run();
po::store(parsed, vm);
po::notify(vm);
} catch (const boost::program_options::error &e) {
cout << e.what() << "\n";
cout << desc << "\n";
exit(1);
}
if (vm.count("help")) {
cout << desc << "\n";
exit(1);
}
auto port = vm["port"].as<uint16_t>();
return "tcp://127.0.0.1:" + to_string(port);
}
int process(const std::string &endpoint) {
// 0. connect to slsReceiver
ZmqSocketReceiver receiver(endpoint, ZMQ_SUB);
receiver.connect();
// 1. create ventilator
ZmqVentilator ventilator("tcp://*:4321");
while (true) {
// 2. receive frame from slsReceiver
ZmqFrame zframe = receiver.receive_zmqframe();
if (zframe.header.data == 0)
continue;
logger::info("Received frame, frame_number=", zframe.header.frameNumber);
logger::info(zframe.header.to_string());
// 3. create task
Task *task = Task::init(zframe.frame.data(), zframe.frame.size());
task->opcode = (size_t)Task::Operation::PEDESTAL;
task->id = zframe.header.frameNumber;
// 4. push task to ventilator
ventilator.push(task);
Task::destroy(task);
}
}
int main(int argc, char **argv) {
// read the command line arguments
string endpoint = setup(argc, argv);
int ret = process(endpoint);
return ret;
}

35
examples/zmq_worker.cpp Normal file
View File

@ -0,0 +1,35 @@
#include "aare/network_io/ZmqSink.hpp"
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/ZmqVentilator.hpp"
#include "aare/network_io/ZmqWorker.hpp"
#include "zmq.h"
#include <boost/program_options.hpp>
#include <cassert>
#include <fmt/core.h>
#include <string>
using namespace aare;
namespace po = boost::program_options;
using namespace std;
int main() {
logger::set_verbosity(logger::DEBUG);
// 1. connect to ventilator and sink
ZmqWorker worker("tcp://127.0.0.1:4321", "tcp://127.0.0.1:4322");
while (true) {
// 2. receive Task from ventilator
Task *ventilator_task = worker.pull();
logger::info("Received Task, id=", ventilator_task->id, " data_size=", ventilator_task->data_size);
Task *sink_task = Task::init(nullptr, 0);
sink_task->id = ventilator_task->id;
sink_task->opcode = (size_t)Task::Operation::COUNT;
worker.push(sink_task);
Task::destroy(sink_task);
Task::destroy(ventilator_task);
}
// read the command line arguments
}

View File

@ -77,7 +77,7 @@ void RawFile::write_master_file() {
aare::write_str(ss, "Geometry", geometry.to_string());
ss += "\n\t";
uint64_t img_size = (m_cols * m_rows) / (geometry.col * geometry.row);
uint64_t img_size = (m_cols * m_rows) / (static_cast<size_t>(geometry.col * geometry.row));
img_size *= m_bitdepth;
aare::write_digit(ss, "Image Size in bytes", img_size);
ss += "\n\t";
@ -85,7 +85,8 @@ void RawFile::write_master_file() {
ss += "\n\t";
aare::write_digit(ss, "Dynamic Range", m_bitdepth);
ss += "\n\t";
const aare::xy pixels = {m_rows / geometry.row, m_cols / geometry.col};
const aare::xy pixels = {static_cast<uint32_t>(m_rows / geometry.row),
static_cast<uint32_t>(m_cols / geometry.col)};
aare::write_str(ss, "Pixels", pixels.to_string());
ss += "\n\t";
aare::write_digit(ss, "Number of rows", m_rows);
@ -266,10 +267,8 @@ void RawFile::parse_raw_metadata() {
max_frames_per_file = std::stoi(value);
} else if (key == "Geometry") {
pos = value.find(',');
const size_t x = static_cast<size_t>(std::stoi(value.substr(1, pos)));
const size_t y = static_cast<size_t>(std::stoi(value.substr(pos + 1)));
geometry = {x, y};
geometry = {static_cast<uint32_t>(std::stoi(value.substr(1, pos))),
static_cast<uint32_t>(std::stoi(value.substr(pos + 1)))};
}
}
}

View File

@ -1,5 +1,8 @@
#include "aare/network_io/ZmqHeader.hpp"
#include "aare/network_io/ZmqSink.hpp"
#include "aare/network_io/ZmqSocket.hpp"
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/ZmqSocketSender.hpp"
#include "aare/network_io/ZmqVentilator.hpp"
#include "aare/network_io/ZmqWorker.hpp"
#include "aare/network_io/defs.hpp"

View File

@ -17,6 +17,10 @@ add_library(network_io STATIC
src/ZmqSocketSender.cpp
src/ZmqSocket.cpp
src/ZmqHeader.cpp
src/ZmqMultiReceiver.cpp
src/ZmqVentilator.cpp
src/ZmqWorker.cpp
src/ZmqSink.cpp
)

View File

@ -8,29 +8,28 @@
#include <map>
#include <string>
namespace simdjson {
/**
* @brief cast a simdjson::ondemand::value to a std::array<int,4>
* useful for writing rx_roi from json header
*/
template <> simdjson_inline simdjson::simdjson_result<std::array<int, 4>> simdjson::ondemand::value::get() noexcept {
ondemand::array array;
auto error = get_array().get(array);
if (error) {
return error;
}
std::array<int, 4> arr{};
int i = 0;
for (auto v : array) {
int64_t val = 0;
error = v.get_int64().get(val);
// template <typename T, int N> std::array
if (error) {
return error;
}
arr[i++] = static_cast<int>(val);
}
return arr;
}
// template <int N> simdjson_inline simdjson::simdjson_result<std::array<int, N>> simdjson::ondemand::value::get()
// noexcept {
// ondemand::array array;
// auto error = get_array().get(array);
// if (error) {
// return error;
// }
// std::array<int, N> arr{};
// int i = 0;
// for (auto v : array) {
// int64_t val = 0;
// error = v.get_int64().get(val);
// if (error) {
// return error;
// }
// arr[i++] = static_cast<int>(val);
// }
// return arr;
// }
/**
* @brief cast a simdjson::ondemand::value to a uint32_t
@ -81,22 +80,17 @@ simdjson::ondemand::value::get() noexcept {
} // namespace simdjson
namespace aare {
/** zmq header structure (from slsDetectorPackage)*/
struct ZmqHeader {
/** true if incoming data, false if end of acquisition */
bool data{true};
uint32_t jsonversion{0};
uint32_t dynamicRange{0};
uint32_t bitmode{0};
uint64_t fileIndex{0};
/** number of detectors/port in x axis */
uint32_t ndetx{0};
/** number of detectors/port in y axis */
uint32_t ndety{0};
/** number of pixels/channels in x axis for this zmq socket */
uint32_t npixelsx{0};
/** number of pixels/channels in y axis for this zmq socket */
uint32_t npixelsy{0};
/** number of detectors/port*/
t_xy<uint32_t> detshape{0, 0};
/** number of pixels/channels for this zmq socket */
t_xy<uint32_t> shape{0, 0};
/** number of bytes for an image in this socket */
uint32_t size{0};
/** frame number from detector */
@ -138,5 +132,25 @@ struct ZmqHeader {
// compare operator
bool operator==(const ZmqHeader &other) const;
};
/**
* @brief cast a simdjson::ondemand::value to a std::array<int,4>
* useful for writing rx_roi from json header
*/
template <typename T, int N, typename SIMDJSON_VALUE> std::array<T, N> simd_convert_array(SIMDJSON_VALUE field) {
simdjson::ondemand::array simd_array;
auto err = field.value().get_array().get(simd_array);
if (err)
throw std::runtime_error("error converting simdjson::ondemand::value to simdjson::ondemend::array");
std::array<T, N> arr{};
int i = 0;
for (auto v : simd_array) {
int64_t tmp;
err = v.get(tmp);
if (err)
throw std::runtime_error("error converting simdjson::ondemand::value");
arr[i++] = tmp;
}
return arr;
}
} // namespace aare

View File

@ -0,0 +1,27 @@
#pragma once
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/defs.hpp"
#include <unordered_map>
#include <string>
#include <vector>
using zmq_pollitem_t = struct zmq_pollitem_t;
namespace aare {
class ZmqMultiReceiver {
public:
explicit ZmqMultiReceiver(const std::vector<std::string> &endpoints, const xy &geometry = {1, 1});
int connect();
ZmqFrame receive_zmqframe();
std::vector<ZmqFrame> receive_n();
~ZmqMultiReceiver();
private:
ZmqFrame receive_zmqframe_(std::unordered_map<uint64_t, std::vector<ZmqFrame>> &frames_map);
xy m_geometry;
std::vector<std::string> m_endpoints;
std::vector<ZmqSocketReceiver *> m_receivers;
zmq_pollitem_t *items{};
};
} // namespace aare

View File

@ -0,0 +1,17 @@
#pragma once
#include "aare/network_io/ZmqSocketReceiver.hpp"
namespace aare {
class ZmqSink {
public:
explicit ZmqSink(const std::string &sink_endpoint);
Task *pull();
~ZmqSink();
private:
ZmqSocketReceiver *m_receiver;
};
} // namespace aare

View File

@ -25,10 +25,12 @@ class ZmqSocket {
void set_zmq_hwm(int hwm);
void set_timeout_ms(int n);
void set_potential_frame_size(size_t size);
void *get_socket();
protected:
void *m_context{nullptr};
void *m_socket{nullptr};
int m_socket_type{};
std::string m_endpoint;
int m_zmq_hwm{1000};
int m_timeout_ms{1000};

View File

@ -19,14 +19,14 @@ namespace aare {
*/
class ZmqSocketReceiver : public ZmqSocket {
public:
explicit ZmqSocketReceiver(const std::string &endpoint);
explicit ZmqSocketReceiver(const std::string &endpoint, int socket_type = 2 /* ZMQ_SUB */);
void connect();
void bind();
std::vector<ZmqFrame> receive_n();
private:
int receive_data(std::byte *data, size_t size);
ZmqFrame receive_zmqframe();
ZmqHeader receive_header();
int receive_data(std::byte *data, size_t size);
};
} // namespace aare

View File

@ -12,9 +12,11 @@ namespace aare {
*/
class ZmqSocketSender : public ZmqSocket {
public:
explicit ZmqSocketSender(const std::string &endpoint);
explicit ZmqSocketSender(const std::string &endpoint, int socket_type = 1 /* ZMQ_PUB */);
void connect();
void bind();
size_t send(const ZmqHeader &header, const std::byte *data, size_t size);
size_t send(const void *data, size_t size);
size_t send(const ZmqHeader &header, const void *data, size_t size);
size_t send(const ZmqFrame &zmq_frame);
size_t send(const std::vector<ZmqFrame> &zmq_frames);
};

View File

@ -0,0 +1,20 @@
#pragma once
#include "aare/core/Frame.hpp"
#include "aare/network_io/ZmqHeader.hpp"
#include "aare/network_io/ZmqSocket.hpp"
#include "aare/network_io/ZmqSocketSender.hpp"
#include "aare/network_io/defs.hpp"
namespace aare {
class ZmqVentilator {
public:
explicit ZmqVentilator(const std::string &endpoint);
size_t push(const Task *task);
~ZmqVentilator();
private:
ZmqSocketSender *m_sender;
};
} // namespace aare

View File

@ -0,0 +1,18 @@
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/ZmqSocketSender.hpp"
namespace aare {
class ZmqWorker {
public:
explicit ZmqWorker(const std::string &ventilator_endpoint, const std::string &sink_endpoint = "");
Task *pull();
size_t push(const Task *task);
~ZmqWorker();
private:
ZmqSocketReceiver *m_receiver;
ZmqSocketSender *m_sender;
};
} // namespace aare

View File

@ -14,8 +14,52 @@ namespace aare {
struct ZmqFrame {
ZmqHeader header;
Frame frame;
std::string to_string() const {
return "ZmqFrame{header: " + header.to_string() + ", frame:\nrows: " + std::to_string(frame.rows()) +
", cols: " + std::to_string(frame.cols()) + ", bitdepth: " + std::to_string(frame.bitdepth()) + "\n}";
}
size_t size() const { return frame.size() + header.size; }
};
struct Task {
size_t id{};
int opcode{}; // operation to perform on the data (what type should this be? char*? enum?)
size_t data_size{};
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
std::byte payload[];
#pragma GCC diagnostic pop
static const size_t MAX_DATA_SIZE = 1024 * 1024; // 1MB
size_t size() const { return sizeof(Task) + data_size; }
static Task *init(std::byte *data, size_t data_size) {
Task *task = (Task *)new std::byte[sizeof(Task) + data_size];
task->data_size = data_size;
if (data_size > 0)
memcpy(task->payload, data, data_size);
return task;
}
static int destroy(Task *task) {
delete[] task;
return 0;
}
Task() = delete;
Task(Task &) = delete;
Task(Task &&) = default;
// common operations to perform
// users can still send custom operations
enum class Operation {
PEDESTAL,
PEDESTAL_AND_SAVE,
PEDESTAL_AND_CLUSTER,
PEDESTAL_AND_CLUSTER_AND_SAVE,
COUNT,
};
} __attribute__((packed));
namespace network_io {
/**
* @brief NetworkError exception class

View File

@ -12,12 +12,10 @@ std::string ZmqHeader::to_string() const {
s += "{";
write_digit(s, "data", data ? 1 : 0);
write_digit(s, "jsonversion", jsonversion);
write_digit(s, "dynamicRange", dynamicRange);
write_digit(s, "bitmode", bitmode);
write_digit(s, "fileIndex", fileIndex);
write_digit(s, "ndetx", ndetx);
write_digit(s, "ndety", ndety);
write_digit(s, "npixelsx", npixelsx);
write_digit(s, "npixelsy", npixelsy);
write_array<uint32_t, 2>(s, "detshape", std::array<uint32_t, 2>{detshape.row, detshape.col});
write_array<uint32_t, 2>(s, "shape", std::array<uint32_t, 2>{shape.row, shape.col});
write_digit(s, "size", size);
write_digit(s, "acqIndex", acqIndex);
write_digit(s, "frameIndex", frameIndex);
@ -40,7 +38,7 @@ std::string ZmqHeader::to_string() const {
write_digit(s, "quad", quad);
write_digit(s, "completeImage", completeImage ? 1 : 0);
write_map(s, "addJsonHeader", addJsonHeader);
write_array(s, "rx_roi", rx_roi);
write_array<int, 4>(s, "rx_roi", rx_roi);
// remove last comma
s.pop_back();
s.pop_back();
@ -63,18 +61,18 @@ void ZmqHeader::from_string(std::string &s) { // NOLINT
data = static_cast<uint64_t>(field.value()) != 0;
} else if (key == "jsonversion") {
jsonversion = static_cast<uint32_t>(field.value());
} else if (key == "dynamicRange") {
dynamicRange = static_cast<uint32_t>(field.value());
} else if (key == "bitmode") {
bitmode = static_cast<uint32_t>(field.value());
} else if (key == "fileIndex") {
fileIndex = static_cast<uint64_t>(field.value());
} else if (key == "ndetx") {
ndetx = static_cast<uint32_t>(field.value());
} else if (key == "ndety") {
ndety = static_cast<uint32_t>(field.value());
} else if (key == "npixelsx") {
npixelsx = static_cast<uint32_t>(field.value());
} else if (key == "npixelsy") {
npixelsy = static_cast<uint32_t>(field.value());
} else if (key == "detshape") {
std::array<uint32_t, 2> arr = simd_convert_array<uint32_t, 2>(field);
detshape.row = arr[0];
detshape.col = arr[1];
} else if (key == "shape") {
std::array<uint32_t, 2> arr = simd_convert_array<uint32_t, 2>(field);
shape.row = arr[0];
shape.col = arr[1];
} else if (key == "size") {
size = static_cast<uint32_t>(field.value());
} else if (key == "acqIndex") {
@ -121,21 +119,20 @@ void ZmqHeader::from_string(std::string &s) { // NOLINT
} else if (key == "addJsonHeader") {
addJsonHeader = static_cast<std::map<std::string, std::string>>(field.value());
} else if (key == "rx_roi") {
rx_roi = static_cast<std::array<int, 4>>(field.value());
rx_roi = simd_convert_array<int, 4>(field);
}
}
}
bool ZmqHeader::operator==(const ZmqHeader &other) const {
return data == other.data && jsonversion == other.jsonversion && dynamicRange == other.dynamicRange &&
fileIndex == other.fileIndex && ndetx == other.ndetx && ndety == other.ndety && npixelsx == other.npixelsx &&
npixelsy == other.npixelsy && size == other.size && acqIndex == other.acqIndex &&
frameIndex == other.frameIndex && progress == other.progress && fname == other.fname &&
frameNumber == other.frameNumber && expLength == other.expLength && packetNumber == other.packetNumber &&
detSpec1 == other.detSpec1 && timestamp == other.timestamp && modId == other.modId && row == other.row &&
column == other.column && detSpec2 == other.detSpec2 && detSpec3 == other.detSpec3 &&
detSpec4 == other.detSpec4 && detType == other.detType && version == other.version &&
flipRows == other.flipRows && quad == other.quad && completeImage == other.completeImage &&
addJsonHeader == other.addJsonHeader && rx_roi == other.rx_roi;
return data == other.data && jsonversion == other.jsonversion && bitmode == other.bitmode &&
fileIndex == other.fileIndex && detshape == other.detshape && shape == other.shape && size == other.size &&
acqIndex == other.acqIndex && frameIndex == other.frameIndex && progress == other.progress &&
fname == other.fname && frameNumber == other.frameNumber && expLength == other.expLength &&
packetNumber == other.packetNumber && detSpec1 == other.detSpec1 && timestamp == other.timestamp &&
modId == other.modId && row == other.row && column == other.column && detSpec2 == other.detSpec2 &&
detSpec3 == other.detSpec3 && detSpec4 == other.detSpec4 && detType == other.detType &&
version == other.version && flipRows == other.flipRows && quad == other.quad &&
completeImage == other.completeImage && addJsonHeader == other.addJsonHeader && rx_roi == other.rx_roi;
}
} // namespace aare

View File

@ -0,0 +1,101 @@
#include "aare/network_io/ZmqMultiReceiver.hpp"
#include "aare/utils/merge_frames.hpp"
#include <unordered_map>
#include <zmq.h>
namespace aare {
ZmqMultiReceiver::ZmqMultiReceiver(const std::vector<std::string> &endpoints, const xy &geometry)
: m_geometry(geometry), m_endpoints(endpoints) {
assert(m_geometry.row * m_geometry.col == static_cast<uint32_t>(m_endpoints.size()));
for (const auto &endpoint : m_endpoints) {
m_receivers.push_back(new ZmqSocketReceiver(endpoint));
}
}
int ZmqMultiReceiver::connect() {
for (auto *receiver : m_receivers) {
receiver->connect();
}
items = new zmq_pollitem_t[m_receivers.size()];
for (size_t i = 0; i < m_receivers.size(); i++) {
items[i] = {m_receivers[i]->get_socket(), 0, ZMQ_POLLIN, 0};
}
return 0;
}
ZmqFrame ZmqMultiReceiver::receive_zmqframe() {
std::unordered_map<uint64_t, std::vector<ZmqFrame>> frames_map;
return receive_zmqframe_(frames_map);
}
std::vector<ZmqFrame> ZmqMultiReceiver::receive_n() {
std::vector<ZmqFrame> frames;
std::unordered_map<uint64_t, std::vector<ZmqFrame>> frames_map;
while (true) {
// receive header and frame
ZmqFrame const zmq_frame = receive_zmqframe_(frames_map);
if (!zmq_frame.header.data) {
break;
}
frames.push_back(zmq_frame);
frames_map.erase(zmq_frame.header.frameNumber);
}
return frames;
}
ZmqFrame ZmqMultiReceiver::receive_zmqframe_(std::unordered_map<uint64_t, std::vector<ZmqFrame>> &frames_map) {
// iterator to store the frame to return
std::unordered_map<uint64_t, std::vector<ZmqFrame>>::iterator ret_frames;
bool exit_loop = false;
while (true) {
zmq_poll(items, static_cast<int>(m_receivers.size()), -1);
aare::logger::debug("Received frame");
for (size_t i = 0; i < m_receivers.size() && !exit_loop; i++) {
if (items[i].revents & ZMQ_POLLIN) {
auto new_frame = m_receivers[i]->receive_zmqframe();
if (frames_map.find(new_frame.header.frameNumber) == frames_map.end()) {
frames_map[new_frame.header.frameNumber] = {};
}
ret_frames = frames_map.find(new_frame.header.frameNumber);
ret_frames->second.push_back(new_frame);
exit_loop = ret_frames->second.size() == m_receivers.size();
}
}
if (exit_loop) {
break;
}
}
std::vector<ZmqFrame> &frames = ret_frames->second;
if (!frames[0].header.data) {
return ZmqFrame{frames[0].header, Frame(0, 0, 0)};
}
// check that all frames have the same shape
auto shape = frames[0].header.shape;
auto bitdepth = frames[0].header.bitmode;
auto part_size = shape.row * shape.col * (bitdepth / 8);
for (auto &frame : frames) {
assert(shape == frame.header.shape);
assert(bitdepth == frame.header.bitmode);
// TODO: find solution for numinterfaces=2
assert(m_geometry == frame.header.detshape);
assert(part_size == frame.header.size);
}
// merge frames
// prepare the input for merge_frames
std::vector<std::byte *> part_buffers;
part_buffers.reserve(frames.size());
for (auto &zmq_frame : frames) {
part_buffers.push_back(zmq_frame.frame.data());
}
Frame const f(shape.row, shape.col, bitdepth);
merge_frames(part_buffers, part_size, f.data(), m_geometry, shape.row, shape.col, bitdepth);
ZmqFrame zmq_frame = {std::move(frames[0].header), f};
return zmq_frame;
}
ZmqMultiReceiver::~ZmqMultiReceiver() {
delete[] items;
for (auto *receiver : m_receivers) {
delete receiver;
}
}
} // namespace aare

View File

@ -0,0 +1,19 @@
#include "aare/network_io/ZmqSink.hpp"
#include "zmq.h"
namespace aare {
ZmqSink::ZmqSink(const std::string &sink_endpoint) : m_receiver(new ZmqSocketReceiver(sink_endpoint, ZMQ_PULL)) {
m_receiver->bind();
};
Task *ZmqSink::pull() {
Task *task = reinterpret_cast<Task *>(new std::byte[Task::MAX_DATA_SIZE + sizeof(Task)]);
m_receiver->receive_data(reinterpret_cast<std::byte *>(task), Task::MAX_DATA_SIZE);
return task;
};
ZmqSink::~ZmqSink() { delete m_receiver; };
} // namespace aare

View File

@ -25,6 +25,12 @@ ZmqSocket::~ZmqSocket() {
delete[] m_header_buffer;
}
/**
* @brief get the socket
* @return void*
*/
void *ZmqSocket::get_socket() { return m_socket; }
void ZmqSocket::set_zmq_hwm(int hwm) { m_zmq_hwm = hwm; }
void ZmqSocket::set_timeout_ms(int n) { m_timeout_ms = n; }

View File

@ -9,8 +9,9 @@ namespace aare {
/**
* @brief Construct a new ZmqSocketReceiver object
*/
ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) {
m_endpoint = endpoint;
ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint, int socket_type) {
m_endpoint = (endpoint);
m_socket_type = (socket_type);
memset(m_header_buffer, 0, m_max_header_size);
}
@ -20,22 +21,33 @@ ZmqSocketReceiver::ZmqSocketReceiver(const std::string &endpoint) {
*/
void ZmqSocketReceiver::connect() {
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_SUB);
m_socket = zmq_socket(m_context, m_socket_type);
fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm);
int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); // should be set before connect
if (rc)
throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVHWM: {}", zmq_strerror(errno)));
size_t bufsize = m_potential_frame_size * m_zmq_hwm;
int bufsize = static_cast<int>(m_potential_frame_size) * m_zmq_hwm;
fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (static_cast<size_t>(1024) * 1024));
rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize));
if (rc)
if (rc) {
perror("zmq_setsockopt");
throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVBUF: {}", zmq_strerror(errno)));
}
zmq_connect(m_socket, m_endpoint.c_str());
zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
}
void ZmqSocketReceiver::bind() {
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, m_socket_type);
size_t const rc = zmq_bind(m_socket, m_endpoint.c_str());
if (rc != 0) {
std::string const error = zmq_strerror(zmq_errno());
throw network_io::NetworkError("zmq_bind failed: " + error);
}
}
/**
* @brief receive a ZmqHeader
* @return ZmqHeader
@ -43,15 +55,13 @@ void ZmqSocketReceiver::connect() {
ZmqHeader ZmqSocketReceiver::receive_header() {
// receive string ZmqHeader
aare::logger::debug("Receiving header");
int const header_bytes_received = zmq_recv(m_socket, m_header_buffer, m_max_header_size, 0);
aare::logger::debug("Bytes: ", header_bytes_received);
aare::logger::debug("Header: ", m_header_buffer);
m_header_buffer[header_bytes_received] = '\0'; // make sure we zero terminate
if (header_bytes_received < 0) {
throw network_io::NetworkError(LOCATION + "Error receiving header");
}
aare::logger::debug("Bytes: ", header_bytes_received, ", Header: ", m_header_buffer);
// parse header
ZmqHeader header;
@ -72,9 +82,12 @@ ZmqHeader ZmqSocketReceiver::receive_header() {
*/
int ZmqSocketReceiver::receive_data(std::byte *data, size_t size) {
int const data_bytes_received = zmq_recv(m_socket, data, size, 0);
if (data_bytes_received == -1)
throw network_io::NetworkError("Got half of a multipart msg!!!");
aare::logger::debug("Bytes: ", data_bytes_received);
if (data_bytes_received == -1) {
logger::error(zmq_strerror(zmq_errno()));
// TODO: refactor this error message
throw network_io::NetworkError(LOCATION + "Error receiving data");
}
// aare::logger::debug("Bytes: ", data_bytes_received);
return data_bytes_received;
}
@ -93,7 +106,13 @@ ZmqFrame ZmqSocketReceiver::receive_zmqframe() {
}
// receive frame data
Frame frame(header.npixelsx, header.npixelsy, header.dynamicRange);
if (header.shape == t_xy<uint32_t>{0, 0} || header.bitmode == 0) {
logger::warn("Invalid header");
}
if (header.bitmode == 0) {
header.bitmode = 16;
}
Frame frame(header.shape.row, header.shape.col, header.bitmode);
int bytes_received = receive_data(frame.data(), frame.size());
if (bytes_received == -1) {
throw network_io::NetworkError(LOCATION + "Error receiving frame");

View File

@ -8,14 +8,17 @@ namespace aare {
* Constructor
* @param endpoint ZMQ endpoint
*/
ZmqSocketSender::ZmqSocketSender(const std::string &endpoint) { m_endpoint = endpoint; }
ZmqSocketSender::ZmqSocketSender(const std::string &endpoint, int socket_type) {
m_socket_type = socket_type;
m_endpoint = endpoint;
}
/**
* bind to the given port
*/
void ZmqSocketSender::bind() {
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, ZMQ_PUB);
m_socket = zmq_socket(m_context, m_socket_type);
size_t const rc = zmq_bind(m_socket, m_endpoint.c_str());
if (rc != 0) {
std::string const error = zmq_strerror(zmq_errno());
@ -23,6 +26,31 @@ void ZmqSocketSender::bind() {
}
}
void ZmqSocketSender::connect() {
m_context = zmq_ctx_new();
m_socket = zmq_socket(m_context, m_socket_type);
fmt::print("Setting ZMQ_RCVHWM to {}\n", m_zmq_hwm);
int rc = zmq_setsockopt(m_socket, ZMQ_RCVHWM, &m_zmq_hwm, sizeof(m_zmq_hwm)); // should be set before connect
if (rc)
throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVHWM: {}", zmq_strerror(errno)));
int bufsize = static_cast<int>(m_potential_frame_size) * m_zmq_hwm;
fmt::print("Setting ZMQ_RCVBUF to: {} MB\n", bufsize / (static_cast<size_t>(1024) * 1024));
rc = zmq_setsockopt(m_socket, ZMQ_RCVBUF, &bufsize, sizeof(bufsize));
if (rc) {
perror("zmq_setsockopt");
throw network_io::NetworkError(fmt::format("Could not set ZMQ_RCVBUF: {}", zmq_strerror(errno)));
}
zmq_connect(m_socket, m_endpoint.c_str());
zmq_setsockopt(m_socket, ZMQ_SUBSCRIBE, "", 0);
}
size_t ZmqSocketSender::send(const void *data, size_t size) {
size_t const rc2 = zmq_send(m_socket, data, size, 0);
assert(rc2 == size);
return rc2;
}
/**
* send a header and data
* @param header
@ -30,7 +58,7 @@ void ZmqSocketSender::bind() {
* @param size size of data
* @return number of bytes sent
*/
size_t ZmqSocketSender::send(const ZmqHeader &header, const std::byte *data, size_t size) {
size_t ZmqSocketSender::send(const ZmqHeader &header, const void *data, size_t size) {
size_t rc = 0;
// if (serialize_header) {
// rc = zmq_send(m_socket, &header, sizeof(ZmqHeader), ZMQ_SNDMORE);

View File

@ -0,0 +1,21 @@
#include "aare/network_io/ZmqVentilator.hpp"
#include <cassert>
#include <zmq.h>
namespace aare {
ZmqVentilator::ZmqVentilator(const std::string &endpoint) : m_sender(new ZmqSocketSender(endpoint, ZMQ_PUSH)) {
m_sender->bind();
}
size_t ZmqVentilator::push(const Task *task) {
if (task->data_size > Task::MAX_DATA_SIZE) {
throw network_io::NetworkError("Data size exceeds maximum allowed size");
}
logger::debug("Pushing workers");
return m_sender->send(task, task->size());
}
ZmqVentilator::~ZmqVentilator() { delete m_sender; }
} // namespace aare

View File

@ -0,0 +1,39 @@
#include "aare/network_io/ZmqWorker.hpp"
#include "aare/network_io/ZmqSocketReceiver.hpp"
#include "aare/network_io/ZmqSocketSender.hpp"
#include "aare/network_io/defs.hpp"
#include "zmq.h"
namespace aare {
ZmqWorker::ZmqWorker(const std::string &ventilator_endpoint, const std::string &sink_endpoint)
: m_receiver(new ZmqSocketReceiver(ventilator_endpoint, ZMQ_PULL)), m_sender(nullptr) {
m_receiver->connect();
if (not sink_endpoint.empty()) {
m_sender = new ZmqSocketSender(sink_endpoint, ZMQ_PUSH);
m_sender->connect();
}
}
Task *ZmqWorker::pull() {
Task *task = reinterpret_cast<Task *>(new std::byte[Task::MAX_DATA_SIZE + sizeof(Task)]);
m_receiver->receive_data(reinterpret_cast<std::byte *>(task), Task::MAX_DATA_SIZE);
logger::debug("Received task", task->id, task->data_size);
return task;
}
size_t ZmqWorker::push(const Task *task) {
if (m_sender == nullptr) {
throw network_io::NetworkError("Worker not connected to sink: did you provide a sink endpoint?");
}
if (task->data_size > Task::MAX_DATA_SIZE) {
throw network_io::NetworkError("Data size exceeds maximum allowed size");
}
return m_sender->send(task, task->size());
}
ZmqWorker::~ZmqWorker() {
delete m_receiver;
delete m_sender;
}
} // namespace aare

View File

@ -5,14 +5,12 @@
using namespace aare;
TEST_CASE("Test ZmqHeader") {
ZmqHeader header;
header.npixelsx = 10;
header.npixelsy = 15;
header.shape = {10, 15};
header.data = 1;
header.jsonversion = 2;
header.dynamicRange = 32;
header.jsonversion = 5;
header.bitmode = 32;
header.fileIndex = 4;
header.ndetx = 5;
header.ndety = 6;
header.detshape = {5, 6};
header.size = 4800;
header.acqIndex = 8;
header.frameIndex = 9;
@ -39,13 +37,11 @@ TEST_CASE("Test ZmqHeader") {
std::string json_header = "{"
"\"data\": 1, "
"\"jsonversion\": 2, "
"\"dynamicRange\": 32, "
"\"jsonversion\": 5, "
"\"bitmode\": 32, "
"\"fileIndex\": 4, "
"\"ndetx\": 5, "
"\"ndety\": 6, "
"\"npixelsx\": 10, "
"\"npixelsy\": 15, "
"\"detshape\": [5, 6], "
"\"shape\": [10, 15], "
"\"size\": 4800, "
"\"acqIndex\": 8, "
"\"frameIndex\": 9, "

View File

@ -1,5 +1,14 @@
add_library(utils STATIC src/logger.cpp)
target_include_directories(utils PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_link_libraries(utils PUBLIC core)
if(AARE_TESTS)
set(TestSources
${CMAKE_CURRENT_SOURCE_DIR}/test/merge_frames.test.cpp
)
target_sources(tests PRIVATE ${TestSources} )
target_link_libraries(tests PRIVATE utils)
endif()

View File

@ -50,17 +50,18 @@ inline void write_map(std::string &s, const std::string &key, const std::map<std
}
s += "}, ";
}
inline void write_array(std::string &s, const std::string &key, const std::array<int, 4> &value) {
template <typename T, int N> void write_array(std::string &s, const std::string &key, const std::array<T, N> &value) {
s += "\"";
s += key;
s += "\": [";
s += std::to_string(value[0]);
s += ", ";
s += std::to_string(value[1]);
s += ", ";
s += std::to_string(value[2]);
s += ", ";
s += std::to_string(value[3]);
for (size_t i = 0; i < N - 1; i++) {
s += std::to_string(value[i]);
s += ", ";
}
s += std::to_string(value[N - 1]);
s += "], ";
}

View File

@ -0,0 +1,41 @@
#pragma once
#include "aare/core/Frame.hpp"
#include "aare/core/defs.hpp"
#include <cassert>
#include <cstring>
#include <vector>
namespace aare {
void merge_frames(std::vector<std::byte *> &part_buffers, size_t part_size, std::byte *merged_frame, const xy &geometry,
size_t rows = 0, size_t cols = 0, size_t bitdepth = 0) {
assert(part_buffers.size() == geometry.row * geometry.col);
if (geometry.col == 1) {
// get the part from each subfile and copy it to the frame
size_t part_idx = 0;
for (auto part_buffer : part_buffers) {
memcpy(merged_frame + part_idx * part_size, part_buffer, part_size);
part_idx++;
}
} else {
std::cout << "cols: " << cols << " rows: " << rows << " bitdepth: " << bitdepth << std::endl;
assert(cols != 0 && rows != 0 && bitdepth != 0);
size_t part_rows = rows / geometry.row;
size_t part_cols = cols / geometry.col;
// create a buffer that will hold a the frame part
size_t part_idx = 0;
for (auto part_buffer : part_buffers) {
for (size_t cur_row = 0; cur_row < (part_rows); cur_row++) {
auto irow = cur_row + (part_idx / geometry.col) * part_rows;
auto icol = (part_idx % geometry.col) * part_cols;
auto dest = (irow * cols + icol);
dest = dest * bitdepth / 8;
memcpy(merged_frame + dest, part_buffer + cur_row * part_cols * bitdepth / 8, part_cols * bitdepth / 8);
}
part_idx++;
}
}
}
} // namespace aare

View File

@ -0,0 +1,38 @@
#include "aare/utils/merge_frames.hpp"
#include <catch2/catch_test_macros.hpp>
using namespace aare;
// void merge_frames(std::vector<std::byte *> &part_buffers, size_t part_size, std::byte *merged_frame, const xy
// &geometry,
// size_t cols = 0, size_t rows = 0, size_t bitdepth = 0) {
TEST_CASE("merge frames {2,1}") {
xy geo = {2, 1};
std::vector<uint32_t> p1 = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
std::vector<uint32_t> p2 = {10, 11, 12, 13, 14, 15, 16, 17, 18, 19};
size_t part_size = p1.size() * sizeof(uint32_t);
Frame f(10, 2, 32);
std::vector<std::byte *> part_buffers = {reinterpret_cast<std::byte *>(p1.data()),
reinterpret_cast<std::byte *>(p2.data())};
merge_frames(part_buffers, part_size, f.data(), geo);
auto v = f.view<uint32_t>();
for (ssize_t i = 0; i < v.size(); i++) {
REQUIRE(v[i] == i);
}
}
TEST_CASE("merge frames {1,2}") {
xy geo = {1, 2};
std::vector<uint32_t> p1 = {0, 1, 2, 3, 4, 10, 11, 12, 13, 14};
std::vector<uint32_t> p2 = {5, 6, 7, 8, 9, 15, 16, 17, 18, 19};
size_t part_size = p1.size() * sizeof(uint32_t);
Frame f(2, 10, 32);
std::vector<std::byte *> part_buffers = {reinterpret_cast<std::byte *>(p1.data()),
reinterpret_cast<std::byte *>(p2.data())};
merge_frames(part_buffers, part_size, f.data(), geo, 2, 10, 32);
auto v = f.view<uint32_t>();
for (ssize_t i = 0; i < v.size(); i++) {
REQUIRE(v[i] == i);
}
}