From 16f6919bb6ebee246ec17e1c178d810523b4a46f Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Wed, 18 Aug 2021 17:29:51 +0200 Subject: [PATCH 1/8] test script to consume eiger live stream --- eiger/test/consume_live.py | 49 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 eiger/test/consume_live.py diff --git a/eiger/test/consume_live.py b/eiger/test/consume_live.py new file mode 100644 index 0000000..bd3e38b --- /dev/null +++ b/eiger/test/consume_live.py @@ -0,0 +1,49 @@ +from _ctypes import Structure +from ctypes import c_uint64, c_uint16 +import zmq + + +image_metadata_dtype_mapping = { + 1: 'uint8', + 2: 'uint16', + 4: 'uint32', + 8: 'uint64', + 12: 'float16', + 14: 'float32', + 18: 'float64' +} + +class ImageMetadata(Structure): + _pack_ = 1 + _fields_ = [ + ("version", c_uint64), + ("id", c_uint64), + ("height", c_uint64), + ("width", c_uint64), + + ("dtype", c_uint16), + ("encoding", c_uint16), + ("source_id", c_uint16), + ("status", c_uint16), + + ("user_1", c_uint64), + ("user_2", c_uint64),] + + def as_dict(self): + return dict((f, getattr(self, f)) for f, _ in self._fields_) + +flags=0 +zmq_context = zmq.Context(io_threads=4) +poller = zmq.Poller() +backend_socket = zmq_context.socket(zmq.SUB) +backend_socket.setsockopt_string(zmq.SUBSCRIBE, "") +backend_socket.connect("ipc:///tmp/std-daq-cSAXS.EG01V01-image_stream") +poller.register(backend_socket, zmq.POLLIN) +while True: + events = dict(poller.poll(2000)) + if backend_socket in events: + metadata = ImageMetadata.from_buffer_copy(backend_socket.recv(flags)) + image = backend_socket.recv(flags, copy=False, track=False) + print(f"Recv img id: {metadata.id}") + else: + print("nothing") \ No newline at end of file From 2e0f28352bc960022ba79508b5e9f2a448c1dbb4 Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Wed, 18 Aug 2021 17:35:48 +0200 Subject: [PATCH 2/8] zmq stream to live preview --- std-stream-send/src/main.cpp | 52 +++++++++++++++++------------------- 1 file changed, 24 insertions(+), 28 deletions(-) diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index f1ee721..602ce69 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -5,6 +5,7 @@ #include #include #include "RamBuffer.hpp" +#include using namespace std; @@ -32,41 +33,36 @@ int main (int argc, char *argv[]) auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - auto sender = zmq_socket(ctx, ZMQ_PUSH); - const int sndhwm = PROCESSING_ZMQ_SNDHWM; - if (zmq_setsockopt( - sender, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } + auto sender = BufferUtils::bind_socket( + ctx, config.detector_name, stream_address.c_str()); - const int linger = 0; - if (zmq_setsockopt( - sender, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } + auto receiver_assembler = BufferUtils::connect_socket( + ctx, config.detector_name, "assembler"); - if (zmq_bind(sender, stream_address.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const size_t IMAGE_N_BYTES = config.width * config.height * bit_depth / 8; + const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8; RamBuffer image_buffer(config.detector_name + "_assembler", sizeof(ImageMetadata), IMAGE_N_BYTES, - config.n_modules, RAM_BUFFER_N_SLOTS); + 1, RAM_BUFFER_N_SLOTS); + + ImageMetadata meta; while (true) { - image_id = 123; + // receives the assembled image id from the assembler + zmq_recv(receiver_assembler, &meta, sizeof(meta), 0); + - zmq_send(sender, - ram_buffer.get_slot_meta(image_id), - sizeof(ImageMetadata), ZMQ_SNDMORE); - - zmq_send(sender, - ram_buffer.get_slot_data(image_id), - buffer_config::MODULE_N_BYTES * 4, 0); - - pulse_id++; - std::this_thread::sleep_for(std::chrono::milliseconds(10)); + if (meta.id % 10){ + auto* dst_meta = image_buffer.get_slot_meta(meta.id); + auto* dst_data = image_buffer.get_slot_data(meta.id); + zmq_send(sender, + &meta, + sizeof(ImageMetadata), + ZMQ_SNDMORE | ZMQ_NOBLOCK); + + zmq_send(sender, + dst_data, + IMAGE_N_BYTES, ZMQ_NOBLOCK); + } } } From dd30e2c86e82479c1345415e600e4072eb150b3b Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Wed, 18 Aug 2021 17:36:07 +0200 Subject: [PATCH 3/8] live preview streamer in start daq script --- eiger/sf-daq-4/control/start_daq.sh | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/eiger/sf-daq-4/control/start_daq.sh b/eiger/sf-daq-4/control/start_daq.sh index 9a3d3eb..61d663a 100755 --- a/eiger/sf-daq-4/control/start_daq.sh +++ b/eiger/sf-daq-4/control/start_daq.sh @@ -12,6 +12,7 @@ BUILD_PATH='/home/hax_l/sf_daq_buffer/build/' UDP_RECV='std_udp_recv' UDP_SYNC='std_udp_sync' EIGER_ASSEMBLER='eiger_assembler' +STD_STREAM_SEND='std_stream_send' STD_DET_WRITER='std_det_writer' # default config file @@ -22,6 +23,7 @@ HELP_FLAG=0 # CONFIGURATION BIT_DEPTH=16 N_MPI_EXEC=3 +STREAM_NAME='streamvis' while getopts h:c:b:m: flag do case "${flag}" in @@ -29,15 +31,17 @@ do c ) CONFIG_FILE=${OPTARG};; b ) BIT_DEPTH=${OPTARG};; m ) N_MPIT_EXEC=${OPTARG};; + s ) STREAMVIS=${OPTARG};; esac done # prints help and exits if (( ${HELP_FLAG} == 1 )); then - echo "Usage : $0 -h -c -b " + echo "Usage : $0 -h -c -b -s " echo " help_flag : show this help and exits." echo " config_file : detector configuration file." echo " bit_depth : detector bit depth." + echo " stream name : live stream name." exit fi @@ -127,6 +131,25 @@ else exit fi +# Start the stream +echo "Starting the ${STD_STREAM_SEND}..." +if [ -f "${BUILD_PATH}${STD_STREAM_SEND}" ]; then + if [ -f "${CONFIG_FILE}" ]; then + if [ ${BIT_DEPTH} -ne 0 ]; then + ${BUILD_PATH}${STD_STREAM_SEND} ${CONFIG_FILE} ${BIT_DEPTH} ${STREAM_NAME} & + else + echo "Error: ${BIT_DEPTH} can't be zero..." + exit + fi + else + echo "Something went wrong while starting the ${STD_STREAM_SEND}..." + exit + fi +else + echo "Error: ${STD_STREAM_SEND} wasn't found..." + exit +fi + # Start the eiger writer echo "Starting the ${STD_DET_WRITER}..." export PATH="/usr/lib64/mpich-3.2/bin:${PATH}"; From 72058acc02e83dc03842889fa23b74145900de8c Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Wed, 18 Aug 2021 17:38:31 +0200 Subject: [PATCH 4/8] fix tab formatting --- std-stream-send/src/main.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index 602ce69..b6f2ea0 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -53,16 +53,16 @@ int main (int argc, char *argv[]) if (meta.id % 10){ - auto* dst_meta = image_buffer.get_slot_meta(meta.id); - auto* dst_data = image_buffer.get_slot_data(meta.id); - zmq_send(sender, + auto* dst_meta = image_buffer.get_slot_meta(meta.id); + auto* dst_data = image_buffer.get_slot_data(meta.id); + zmq_send(sender, &meta, - sizeof(ImageMetadata), - ZMQ_SNDMORE | ZMQ_NOBLOCK); - - zmq_send(sender, - dst_data, - IMAGE_N_BYTES, ZMQ_NOBLOCK); + sizeof(ImageMetadata), + ZMQ_SNDMORE | ZMQ_NOBLOCK); + + zmq_send(sender, + dst_data, + IMAGE_N_BYTES, ZMQ_NOBLOCK); } } } From 9ac769024162b63b2285d8399c730e1edcb5b3dc Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Wed, 18 Aug 2021 17:39:15 +0200 Subject: [PATCH 5/8] fix tab formatting --- std-stream-send/src/main.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index b6f2ea0..0679200 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -60,7 +60,7 @@ int main (int argc, char *argv[]) sizeof(ImageMetadata), ZMQ_SNDMORE | ZMQ_NOBLOCK); - zmq_send(sender, + zmq_send(sender, dst_data, IMAGE_N_BYTES, ZMQ_NOBLOCK); } From 0165a9f38d6de75c44a0fe7dd7f51b89e7ff9461 Mon Sep 17 00:00:00 2001 From: Leonardo Hax Damiani Date: Wed, 18 Aug 2021 17:40:30 +0200 Subject: [PATCH 6/8] removal of unnecessary dst_meta --- std-stream-send/src/main.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index 0679200..76b089a 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -53,7 +53,6 @@ int main (int argc, char *argv[]) if (meta.id % 10){ - auto* dst_meta = image_buffer.get_slot_meta(meta.id); auto* dst_data = image_buffer.get_slot_data(meta.id); zmq_send(sender, &meta, From 9331d066aa4d2523c8bd3eb40aafdbe28706486d Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Thu, 19 Aug 2021 12:07:26 +0200 Subject: [PATCH 7/8] live stream with json+blob --- std-stream-send/CMakeLists.txt | 5 +- std-stream-send/include/ZmqLiveSender.hpp | 31 +++++++ std-stream-send/include/stream_config.hpp | 3 + std-stream-send/src/ZmqLiveSender.cpp | 108 ++++++++++++++++++++++ std-stream-send/src/main.cpp | 42 ++++----- 5 files changed, 167 insertions(+), 22 deletions(-) create mode 100644 std-stream-send/include/ZmqLiveSender.hpp create mode 100644 std-stream-send/src/ZmqLiveSender.cpp diff --git a/std-stream-send/CMakeLists.txt b/std-stream-send/CMakeLists.txt index 3d8b419..83a6b7d 100644 --- a/std-stream-send/CMakeLists.txt +++ b/std-stream-send/CMakeLists.txt @@ -4,15 +4,18 @@ file(GLOB SOURCES add_library(std-stream-send-lib STATIC ${SOURCES}) target_include_directories(std-stream-send-lib PUBLIC include/) target_link_libraries(std-stream-send-lib + external core-buffer-lib) add_executable(std-stream-send src/main.cpp) set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send) target_link_libraries(std-stream-send + external + core-buffer-lib std-stream-send-lib zmq pthread rt) - + enable_testing() add_subdirectory(test/) diff --git a/std-stream-send/include/ZmqLiveSender.hpp b/std-stream-send/include/ZmqLiveSender.hpp new file mode 100644 index 0000000..167af65 --- /dev/null +++ b/std-stream-send/include/ZmqLiveSender.hpp @@ -0,0 +1,31 @@ +#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP +#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP + +#include +#include "formats.hpp" +#include "BufferUtils.hpp" + + +class ZmqLiveSender { + void* ctx_; + const std::string& det_name_; + const std::string& stream_address_; + + void* socket_streamvis_; + +private: + std::string _get_data_type_mapping(int dtype) const; + +public: + ZmqLiveSender(void* ctx, + const std::string& det_name, + const std::string& stream_address); + ~ZmqLiveSender(); + + void send(const ImageMetadata& meta, const char* data, + const size_t image_n_bytes); + +}; + + +#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP diff --git a/std-stream-send/include/stream_config.hpp b/std-stream-send/include/stream_config.hpp index a0f1d72..2452fd9 100644 --- a/std-stream-send/include/stream_config.hpp +++ b/std-stream-send/include/stream_config.hpp @@ -11,4 +11,7 @@ namespace stream_config // Number of pulses between each statistics print out. const size_t STREAM_STATS_MODULO = 1000; + + // reduction factor + const int REDUCTION_FACTOR = 5; } diff --git a/std-stream-send/src/ZmqLiveSender.cpp b/std-stream-send/src/ZmqLiveSender.cpp new file mode 100644 index 0000000..f8b5039 --- /dev/null +++ b/std-stream-send/src/ZmqLiveSender.cpp @@ -0,0 +1,108 @@ +#include "ZmqLiveSender.hpp" +#include "stream_config.hpp" + +#include "zmq.h" +#include +#include +#include +#include +#include +// +using namespace std; +using namespace stream_config; + +ZmqLiveSender::ZmqLiveSender( + void* ctx, + const std::string& det_name, + const std::string& stream_address): + ctx_(ctx), + det_name_(det_name), + stream_address_(stream_address) +{ + socket_streamvis_ = BufferUtils::bind_socket( + ctx_, det_name_, stream_address_); +} + +ZmqLiveSender::~ZmqLiveSender() +{ + zmq_close(socket_streamvis_); +} + +std::string ZmqLiveSender::_get_data_type_mapping(const int dtype) const{ + if (dtype == 1) + return "uint8"; + else if (dtype == 2) + return "uint16"; + else if (dtype == 4) + return "uint32"; + else if (dtype == 8) + return "uint64"; +} + +void ZmqLiveSender::send(const ImageMetadata& meta, const char *data, + const size_t image_n_bytes) +{ + uint16_t data_empty [] = { 0, 0, 0, 0}; + + rapidjson::Document header(rapidjson::kObjectType); + auto& header_alloc = header.GetAllocator(); + string text_header; + + // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) + header.AddMember("frame", meta.id, header_alloc); + header.AddMember("is_good_frame", meta.status, header_alloc); + + rapidjson::Value detector_name; + detector_name.SetString(det_name_.c_str(), header_alloc); + header.AddMember("detector_name", detector_name, header_alloc); + + header.AddMember("htype", "array-1.0", header_alloc); + + rapidjson::Value dtype; + dtype.SetString(_get_data_type_mapping(meta.dtype).c_str(), header_alloc); + header.AddMember("type", dtype, header_alloc); + + // To be retrieved and filled with correct values down. + auto shape_value = rapidjson::Value(rapidjson::kArrayType); + shape_value.PushBack((uint64_t)0, header_alloc); + shape_value.PushBack((uint64_t)0, header_alloc); + header.AddMember("shape", shape_value, header_alloc); + + int send_streamvis = 0; + send_streamvis = rand() % REDUCTION_FACTOR; + if ( send_streamvis == 0 ) { + auto& shape = header["shape"]; + shape[0] = meta.width; + shape[1] = meta.height; + } else{ + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; + } + + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } + + zmq_send(socket_streamvis_, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis_, + (char*)data, + image_n_bytes, + 0); + } else { + zmq_send(socket_streamvis_, + (char*)data_empty, + 8, + 0); + } + +} \ No newline at end of file diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index 0679200..b39f2d3 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -1,12 +1,12 @@ #include #include -#include "stream_config.hpp" #include #include -#include -#include "RamBuffer.hpp" #include +#include +#include "stream_config.hpp" +#include "ZmqLiveSender.hpp" using namespace std; using namespace stream_config; @@ -26,20 +26,19 @@ int main (int argc, char *argv[]) exit(-1); } - const auto config = StreamSendConfig::from_json_file(string(argv[1])); + auto config = BufferUtils::read_json_config(string(argv[1])); const int bit_depth = atoi(argv[2]); - const string stream_address = string(argv[3]); + const auto stream_address = string(argv[3]); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); - - auto sender = BufferUtils::bind_socket( - ctx, config.detector_name, stream_address.c_str()); + ZmqLiveSender sender(ctx, config.detector_name, stream_address); auto receiver_assembler = BufferUtils::connect_socket( ctx, config.detector_name, "assembler"); const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8; + RamBuffer image_buffer(config.detector_name + "_assembler", sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS); @@ -50,19 +49,20 @@ int main (int argc, char *argv[]) // receives the assembled image id from the assembler zmq_recv(receiver_assembler, &meta, sizeof(meta), 0); + // gets the image data + char* dst_data = image_buffer.get_slot_data(meta.id); + // sends the json metadata with the data + sender.send(meta, dst_data, IMAGE_N_BYTES); + + + // zmq_send(sender, + // &meta, + // sizeof(ImageMetadata), + // ZMQ_SNDMORE | ZMQ_NOBLOCK); + + // zmq_send(sender, + // dst_data, + // IMAGE_N_BYTES, ZMQ_NOBLOCK); - - if (meta.id % 10){ - auto* dst_meta = image_buffer.get_slot_meta(meta.id); - auto* dst_data = image_buffer.get_slot_data(meta.id); - zmq_send(sender, - &meta, - sizeof(ImageMetadata), - ZMQ_SNDMORE | ZMQ_NOBLOCK); - - zmq_send(sender, - dst_data, - IMAGE_N_BYTES, ZMQ_NOBLOCK); - } } } From 8ebe6bd17b11779c262f72d5e4dccc332a41984d Mon Sep 17 00:00:00 2001 From: lhdamiani Date: Thu, 19 Aug 2021 12:08:46 +0200 Subject: [PATCH 8/8] using default json reader instead --- std-stream-send/include/StreamSendConfig.hpp | 33 -------------------- 1 file changed, 33 deletions(-) delete mode 100644 std-stream-send/include/StreamSendConfig.hpp diff --git a/std-stream-send/include/StreamSendConfig.hpp b/std-stream-send/include/StreamSendConfig.hpp deleted file mode 100644 index 28bdf6e..0000000 --- a/std-stream-send/include/StreamSendConfig.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP -#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP - - -#include -#include -#include -#include -#include - -struct StreamSendConfig { - static StreamSendConfig from_json_file(const std::string& filename) { - std::ifstream ifs(filename); - rapidjson::IStreamWrapper isw(ifs); - rapidjson::Document config_parameters; - config_parameters.ParseStream(isw); - - return { - config_parameters["detector_name"].GetString(), - config_parameters["n_modules"].GetInt(), - config_parameters["image_height"].GetInt(), - config_parameters["image_width"].GetInt(), - }; - } - - const std::string detector_name; - const int n_modules; - const int image_height; - const int image_width; -}; - - -#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP