diff --git a/eiger/sf-daq-4/control/start_daq.sh b/eiger/sf-daq-4/control/start_daq.sh index 9a3d3eb..61d663a 100755 --- a/eiger/sf-daq-4/control/start_daq.sh +++ b/eiger/sf-daq-4/control/start_daq.sh @@ -12,6 +12,7 @@ BUILD_PATH='/home/hax_l/sf_daq_buffer/build/' UDP_RECV='std_udp_recv' UDP_SYNC='std_udp_sync' EIGER_ASSEMBLER='eiger_assembler' +STD_STREAM_SEND='std_stream_send' STD_DET_WRITER='std_det_writer' # default config file @@ -22,6 +23,7 @@ HELP_FLAG=0 # CONFIGURATION BIT_DEPTH=16 N_MPI_EXEC=3 +STREAM_NAME='streamvis' while getopts h:c:b:m: flag do case "${flag}" in @@ -29,15 +31,17 @@ do c ) CONFIG_FILE=${OPTARG};; b ) BIT_DEPTH=${OPTARG};; m ) N_MPIT_EXEC=${OPTARG};; + s ) STREAMVIS=${OPTARG};; esac done # prints help and exits if (( ${HELP_FLAG} == 1 )); then - echo "Usage : $0 -h -c -b " + echo "Usage : $0 -h -c -b -s " echo " help_flag : show this help and exits." echo " config_file : detector configuration file." echo " bit_depth : detector bit depth." + echo " stream name : live stream name." exit fi @@ -127,6 +131,25 @@ else exit fi +# Start the stream +echo "Starting the ${STD_STREAM_SEND}..." +if [ -f "${BUILD_PATH}${STD_STREAM_SEND}" ]; then + if [ -f "${CONFIG_FILE}" ]; then + if [ ${BIT_DEPTH} -ne 0 ]; then + ${BUILD_PATH}${STD_STREAM_SEND} ${CONFIG_FILE} ${BIT_DEPTH} ${STREAM_NAME} & + else + echo "Error: ${BIT_DEPTH} can't be zero..." + exit + fi + else + echo "Something went wrong while starting the ${STD_STREAM_SEND}..." + exit + fi +else + echo "Error: ${STD_STREAM_SEND} wasn't found..." + exit +fi + # Start the eiger writer echo "Starting the ${STD_DET_WRITER}..." export PATH="/usr/lib64/mpich-3.2/bin:${PATH}"; diff --git a/eiger/test/consume_live.py b/eiger/test/consume_live.py new file mode 100644 index 0000000..bd3e38b --- /dev/null +++ b/eiger/test/consume_live.py @@ -0,0 +1,49 @@ +from _ctypes import Structure +from ctypes import c_uint64, c_uint16 +import zmq + + +image_metadata_dtype_mapping = { + 1: 'uint8', + 2: 'uint16', + 4: 'uint32', + 8: 'uint64', + 12: 'float16', + 14: 'float32', + 18: 'float64' +} + +class ImageMetadata(Structure): + _pack_ = 1 + _fields_ = [ + ("version", c_uint64), + ("id", c_uint64), + ("height", c_uint64), + ("width", c_uint64), + + ("dtype", c_uint16), + ("encoding", c_uint16), + ("source_id", c_uint16), + ("status", c_uint16), + + ("user_1", c_uint64), + ("user_2", c_uint64),] + + def as_dict(self): + return dict((f, getattr(self, f)) for f, _ in self._fields_) + +flags=0 +zmq_context = zmq.Context(io_threads=4) +poller = zmq.Poller() +backend_socket = zmq_context.socket(zmq.SUB) +backend_socket.setsockopt_string(zmq.SUBSCRIBE, "") +backend_socket.connect("ipc:///tmp/std-daq-cSAXS.EG01V01-image_stream") +poller.register(backend_socket, zmq.POLLIN) +while True: + events = dict(poller.poll(2000)) + if backend_socket in events: + metadata = ImageMetadata.from_buffer_copy(backend_socket.recv(flags)) + image = backend_socket.recv(flags, copy=False, track=False) + print(f"Recv img id: {metadata.id}") + else: + print("nothing") \ No newline at end of file diff --git a/std-stream-send/CMakeLists.txt b/std-stream-send/CMakeLists.txt index 3d8b419..83a6b7d 100644 --- a/std-stream-send/CMakeLists.txt +++ b/std-stream-send/CMakeLists.txt @@ -4,15 +4,18 @@ file(GLOB SOURCES add_library(std-stream-send-lib STATIC ${SOURCES}) target_include_directories(std-stream-send-lib PUBLIC include/) target_link_libraries(std-stream-send-lib + external core-buffer-lib) add_executable(std-stream-send src/main.cpp) set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send) target_link_libraries(std-stream-send + external + core-buffer-lib std-stream-send-lib zmq pthread rt) - + enable_testing() add_subdirectory(test/) diff --git a/std-stream-send/include/StreamSendConfig.hpp b/std-stream-send/include/StreamSendConfig.hpp deleted file mode 100644 index 28bdf6e..0000000 --- a/std-stream-send/include/StreamSendConfig.hpp +++ /dev/null @@ -1,33 +0,0 @@ -#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP -#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP - - -#include -#include -#include -#include -#include - -struct StreamSendConfig { - static StreamSendConfig from_json_file(const std::string& filename) { - std::ifstream ifs(filename); - rapidjson::IStreamWrapper isw(ifs); - rapidjson::Document config_parameters; - config_parameters.ParseStream(isw); - - return { - config_parameters["detector_name"].GetString(), - config_parameters["n_modules"].GetInt(), - config_parameters["image_height"].GetInt(), - config_parameters["image_width"].GetInt(), - }; - } - - const std::string detector_name; - const int n_modules; - const int image_height; - const int image_width; -}; - - -#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP diff --git a/std-stream-send/include/ZmqLiveSender.hpp b/std-stream-send/include/ZmqLiveSender.hpp new file mode 100644 index 0000000..167af65 --- /dev/null +++ b/std-stream-send/include/ZmqLiveSender.hpp @@ -0,0 +1,31 @@ +#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP +#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP + +#include +#include "formats.hpp" +#include "BufferUtils.hpp" + + +class ZmqLiveSender { + void* ctx_; + const std::string& det_name_; + const std::string& stream_address_; + + void* socket_streamvis_; + +private: + std::string _get_data_type_mapping(int dtype) const; + +public: + ZmqLiveSender(void* ctx, + const std::string& det_name, + const std::string& stream_address); + ~ZmqLiveSender(); + + void send(const ImageMetadata& meta, const char* data, + const size_t image_n_bytes); + +}; + + +#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP diff --git a/std-stream-send/include/stream_config.hpp b/std-stream-send/include/stream_config.hpp index a0f1d72..2452fd9 100644 --- a/std-stream-send/include/stream_config.hpp +++ b/std-stream-send/include/stream_config.hpp @@ -11,4 +11,7 @@ namespace stream_config // Number of pulses between each statistics print out. const size_t STREAM_STATS_MODULO = 1000; + + // reduction factor + const int REDUCTION_FACTOR = 5; } diff --git a/std-stream-send/src/ZmqLiveSender.cpp b/std-stream-send/src/ZmqLiveSender.cpp new file mode 100644 index 0000000..f8b5039 --- /dev/null +++ b/std-stream-send/src/ZmqLiveSender.cpp @@ -0,0 +1,108 @@ +#include "ZmqLiveSender.hpp" +#include "stream_config.hpp" + +#include "zmq.h" +#include +#include +#include +#include +#include +// +using namespace std; +using namespace stream_config; + +ZmqLiveSender::ZmqLiveSender( + void* ctx, + const std::string& det_name, + const std::string& stream_address): + ctx_(ctx), + det_name_(det_name), + stream_address_(stream_address) +{ + socket_streamvis_ = BufferUtils::bind_socket( + ctx_, det_name_, stream_address_); +} + +ZmqLiveSender::~ZmqLiveSender() +{ + zmq_close(socket_streamvis_); +} + +std::string ZmqLiveSender::_get_data_type_mapping(const int dtype) const{ + if (dtype == 1) + return "uint8"; + else if (dtype == 2) + return "uint16"; + else if (dtype == 4) + return "uint32"; + else if (dtype == 8) + return "uint64"; +} + +void ZmqLiveSender::send(const ImageMetadata& meta, const char *data, + const size_t image_n_bytes) +{ + uint16_t data_empty [] = { 0, 0, 0, 0}; + + rapidjson::Document header(rapidjson::kObjectType); + auto& header_alloc = header.GetAllocator(); + string text_header; + + // TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame) + header.AddMember("frame", meta.id, header_alloc); + header.AddMember("is_good_frame", meta.status, header_alloc); + + rapidjson::Value detector_name; + detector_name.SetString(det_name_.c_str(), header_alloc); + header.AddMember("detector_name", detector_name, header_alloc); + + header.AddMember("htype", "array-1.0", header_alloc); + + rapidjson::Value dtype; + dtype.SetString(_get_data_type_mapping(meta.dtype).c_str(), header_alloc); + header.AddMember("type", dtype, header_alloc); + + // To be retrieved and filled with correct values down. + auto shape_value = rapidjson::Value(rapidjson::kArrayType); + shape_value.PushBack((uint64_t)0, header_alloc); + shape_value.PushBack((uint64_t)0, header_alloc); + header.AddMember("shape", shape_value, header_alloc); + + int send_streamvis = 0; + send_streamvis = rand() % REDUCTION_FACTOR; + if ( send_streamvis == 0 ) { + auto& shape = header["shape"]; + shape[0] = meta.width; + shape[1] = meta.height; + } else{ + auto& shape = header["shape"]; + shape[0] = 2; + shape[1] = 2; + } + + { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + header.Accept(writer); + + text_header = buffer.GetString(); + } + + zmq_send(socket_streamvis_, + text_header.c_str(), + text_header.size(), + ZMQ_SNDMORE); + + if ( send_streamvis == 0 ) { + zmq_send(socket_streamvis_, + (char*)data, + image_n_bytes, + 0); + } else { + zmq_send(socket_streamvis_, + (char*)data_empty, + 8, + 0); + } + +} \ No newline at end of file diff --git a/std-stream-send/src/main.cpp b/std-stream-send/src/main.cpp index f1ee721..4e4f651 100644 --- a/std-stream-send/src/main.cpp +++ b/std-stream-send/src/main.cpp @@ -1,11 +1,12 @@ #include #include -#include "stream_config.hpp" #include #include -#include -#include "RamBuffer.hpp" +#include +#include +#include "stream_config.hpp" +#include "ZmqLiveSender.hpp" using namespace std; using namespace stream_config; @@ -25,48 +26,33 @@ int main (int argc, char *argv[]) exit(-1); } - const auto config = StreamSendConfig::from_json_file(string(argv[1])); + auto config = BufferUtils::read_json_config(string(argv[1])); const int bit_depth = atoi(argv[2]); - const string stream_address = string(argv[3]); + const auto stream_address = string(argv[3]); auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS); + ZmqLiveSender sender(ctx, config.detector_name, stream_address); - auto sender = zmq_socket(ctx, ZMQ_PUSH); - const int sndhwm = PROCESSING_ZMQ_SNDHWM; - if (zmq_setsockopt( - sender, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } + auto receiver_assembler = BufferUtils::connect_socket( + ctx, config.detector_name, "assembler"); - const int linger = 0; - if (zmq_setsockopt( - sender, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } + const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8; - if (zmq_bind(sender, stream_address.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - const size_t IMAGE_N_BYTES = config.width * config.height * bit_depth / 8; RamBuffer image_buffer(config.detector_name + "_assembler", sizeof(ImageMetadata), IMAGE_N_BYTES, - config.n_modules, RAM_BUFFER_N_SLOTS); + 1, RAM_BUFFER_N_SLOTS); + + ImageMetadata meta; while (true) { - image_id = 123; + // receives the assembled image id from the assembler + zmq_recv(receiver_assembler, &meta, sizeof(meta), 0); + // gets the image data + char* dst_data = image_buffer.get_slot_data(meta.id); + // sends the json metadata with the data + sender.send(meta, dst_data, IMAGE_N_BYTES); - zmq_send(sender, - ram_buffer.get_slot_meta(image_id), - sizeof(ImageMetadata), ZMQ_SNDMORE); - - zmq_send(sender, - ram_buffer.get_slot_data(image_id), - buffer_config::MODULE_N_BYTES * 4, 0); - - pulse_id++; - std::this_thread::sleep_for(std::chrono::milliseconds(10)); } }