This commit is contained in:
2021-09-30 16:45:20 +02:00
8 changed files with 238 additions and 68 deletions
+24 -1
View File
@@ -12,6 +12,7 @@ BUILD_PATH='/home/hax_l/sf_daq_buffer/build/'
UDP_RECV='std_udp_recv'
UDP_SYNC='std_udp_sync'
EIGER_ASSEMBLER='eiger_assembler'
STD_STREAM_SEND='std_stream_send'
STD_DET_WRITER='std_det_writer'
# default config file
@@ -22,6 +23,7 @@ HELP_FLAG=0
# CONFIGURATION
BIT_DEPTH=16
N_MPI_EXEC=3
STREAM_NAME='streamvis'
while getopts h:c:b:m: flag
do
case "${flag}" in
@@ -29,15 +31,17 @@ do
c ) CONFIG_FILE=${OPTARG};;
b ) BIT_DEPTH=${OPTARG};;
m ) N_MPIT_EXEC=${OPTARG};;
s ) STREAMVIS=${OPTARG};;
esac
done
# prints help and exits
if (( ${HELP_FLAG} == 1 )); then
echo "Usage : $0 -h <help_flag> -c <config_file> -b <bit_depth>"
echo "Usage : $0 -h <help_flag> -c <config_file> -b <bit_depth> -s <stream_name>"
echo " help_flag : show this help and exits."
echo " config_file : detector configuration file."
echo " bit_depth : detector bit depth."
echo " stream name : live stream name."
exit
fi
@@ -127,6 +131,25 @@ else
exit
fi
# Start the stream
echo "Starting the ${STD_STREAM_SEND}..."
if [ -f "${BUILD_PATH}${STD_STREAM_SEND}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
if [ ${BIT_DEPTH} -ne 0 ]; then
${BUILD_PATH}${STD_STREAM_SEND} ${CONFIG_FILE} ${BIT_DEPTH} ${STREAM_NAME} &
else
echo "Error: ${BIT_DEPTH} can't be zero..."
exit
fi
else
echo "Something went wrong while starting the ${STD_STREAM_SEND}..."
exit
fi
else
echo "Error: ${STD_STREAM_SEND} wasn't found..."
exit
fi
# Start the eiger writer
echo "Starting the ${STD_DET_WRITER}..."
export PATH="/usr/lib64/mpich-3.2/bin:${PATH}";
+49
View File
@@ -0,0 +1,49 @@
from _ctypes import Structure
from ctypes import c_uint64, c_uint16
import zmq
image_metadata_dtype_mapping = {
1: 'uint8',
2: 'uint16',
4: 'uint32',
8: 'uint64',
12: 'float16',
14: 'float32',
18: 'float64'
}
class ImageMetadata(Structure):
_pack_ = 1
_fields_ = [
("version", c_uint64),
("id", c_uint64),
("height", c_uint64),
("width", c_uint64),
("dtype", c_uint16),
("encoding", c_uint16),
("source_id", c_uint16),
("status", c_uint16),
("user_1", c_uint64),
("user_2", c_uint64),]
def as_dict(self):
return dict((f, getattr(self, f)) for f, _ in self._fields_)
flags=0
zmq_context = zmq.Context(io_threads=4)
poller = zmq.Poller()
backend_socket = zmq_context.socket(zmq.SUB)
backend_socket.setsockopt_string(zmq.SUBSCRIBE, "")
backend_socket.connect("ipc:///tmp/std-daq-cSAXS.EG01V01-image_stream")
poller.register(backend_socket, zmq.POLLIN)
while True:
events = dict(poller.poll(2000))
if backend_socket in events:
metadata = ImageMetadata.from_buffer_copy(backend_socket.recv(flags))
image = backend_socket.recv(flags, copy=False, track=False)
print(f"Recv img id: {metadata.id}")
else:
print("nothing")
+4 -1
View File
@@ -4,15 +4,18 @@ file(GLOB SOURCES
add_library(std-stream-send-lib STATIC ${SOURCES})
target_include_directories(std-stream-send-lib PUBLIC include/)
target_link_libraries(std-stream-send-lib
external
core-buffer-lib)
add_executable(std-stream-send src/main.cpp)
set_target_properties(std-stream-send PROPERTIES OUTPUT_NAME std_stream_send)
target_link_libraries(std-stream-send
external
core-buffer-lib
std-stream-send-lib
zmq
pthread
rt)
enable_testing()
add_subdirectory(test/)
@@ -1,33 +0,0 @@
#ifndef SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
#define SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
#include <rapidjson/istreamwrapper.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <string>
#include <fstream>
struct StreamSendConfig {
static StreamSendConfig from_json_file(const std::string& filename) {
std::ifstream ifs(filename);
rapidjson::IStreamWrapper isw(ifs);
rapidjson::Document config_parameters;
config_parameters.ParseStream(isw);
return {
config_parameters["detector_name"].GetString(),
config_parameters["n_modules"].GetInt(),
config_parameters["image_height"].GetInt(),
config_parameters["image_width"].GetInt(),
};
}
const std::string detector_name;
const int n_modules;
const int image_height;
const int image_width;
};
#endif //SF_DAQ_BUFFER_UDPRECVCONFIG_HPP
+31
View File
@@ -0,0 +1,31 @@
#ifndef SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
#define SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
#include <string>
#include "formats.hpp"
#include "BufferUtils.hpp"
class ZmqLiveSender {
void* ctx_;
const std::string& det_name_;
const std::string& stream_address_;
void* socket_streamvis_;
private:
std::string _get_data_type_mapping(int dtype) const;
public:
ZmqLiveSender(void* ctx,
const std::string& det_name,
const std::string& stream_address);
~ZmqLiveSender();
void send(const ImageMetadata& meta, const char* data,
const size_t image_n_bytes);
};
#endif //SF_DAQ_BUFFER_ZMQLIVESENDER_HPP
@@ -11,4 +11,7 @@ namespace stream_config
// Number of pulses between each statistics print out.
const size_t STREAM_STATS_MODULO = 1000;
// reduction factor
const int REDUCTION_FACTOR = 5;
}
+108
View File
@@ -0,0 +1,108 @@
#include "ZmqLiveSender.hpp"
#include "stream_config.hpp"
#include "zmq.h"
#include <stdexcept>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <iostream>
//
using namespace std;
using namespace stream_config;
ZmqLiveSender::ZmqLiveSender(
void* ctx,
const std::string& det_name,
const std::string& stream_address):
ctx_(ctx),
det_name_(det_name),
stream_address_(stream_address)
{
socket_streamvis_ = BufferUtils::bind_socket(
ctx_, det_name_, stream_address_);
}
ZmqLiveSender::~ZmqLiveSender()
{
zmq_close(socket_streamvis_);
}
std::string ZmqLiveSender::_get_data_type_mapping(const int dtype) const{
if (dtype == 1)
return "uint8";
else if (dtype == 2)
return "uint16";
else if (dtype == 4)
return "uint32";
else if (dtype == 8)
return "uint64";
}
void ZmqLiveSender::send(const ImageMetadata& meta, const char *data,
const size_t image_n_bytes)
{
uint16_t data_empty [] = { 0, 0, 0, 0};
rapidjson::Document header(rapidjson::kObjectType);
auto& header_alloc = header.GetAllocator();
string text_header;
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
header.AddMember("frame", meta.id, header_alloc);
header.AddMember("is_good_frame", meta.status, header_alloc);
rapidjson::Value detector_name;
detector_name.SetString(det_name_.c_str(), header_alloc);
header.AddMember("detector_name", detector_name, header_alloc);
header.AddMember("htype", "array-1.0", header_alloc);
rapidjson::Value dtype;
dtype.SetString(_get_data_type_mapping(meta.dtype).c_str(), header_alloc);
header.AddMember("type", dtype, header_alloc);
// To be retrieved and filled with correct values down.
auto shape_value = rapidjson::Value(rapidjson::kArrayType);
shape_value.PushBack((uint64_t)0, header_alloc);
shape_value.PushBack((uint64_t)0, header_alloc);
header.AddMember("shape", shape_value, header_alloc);
int send_streamvis = 0;
send_streamvis = rand() % REDUCTION_FACTOR;
if ( send_streamvis == 0 ) {
auto& shape = header["shape"];
shape[0] = meta.width;
shape[1] = meta.height;
} else{
auto& shape = header["shape"];
shape[0] = 2;
shape[1] = 2;
}
{
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
header.Accept(writer);
text_header = buffer.GetString();
}
zmq_send(socket_streamvis_,
text_header.c_str(),
text_header.size(),
ZMQ_SNDMORE);
if ( send_streamvis == 0 ) {
zmq_send(socket_streamvis_,
(char*)data,
image_n_bytes,
0);
} else {
zmq_send(socket_streamvis_,
(char*)data_empty,
8,
0);
}
}
+19 -33
View File
@@ -1,11 +1,12 @@
#include <iostream>
#include <zmq.h>
#include "stream_config.hpp"
#include <chrono>
#include <thread>
#include <StreamSendConfig.hpp>
#include "RamBuffer.hpp"
#include <BufferUtils.hpp>
#include <RamBuffer.hpp>
#include "stream_config.hpp"
#include "ZmqLiveSender.hpp"
using namespace std;
using namespace stream_config;
@@ -25,48 +26,33 @@ int main (int argc, char *argv[])
exit(-1);
}
const auto config = StreamSendConfig::from_json_file(string(argv[1]));
auto config = BufferUtils::read_json_config(string(argv[1]));
const int bit_depth = atoi(argv[2]);
const string stream_address = string(argv[3]);
const auto stream_address = string(argv[3]);
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
ZmqLiveSender sender(ctx, config.detector_name, stream_address);
auto sender = zmq_socket(ctx, ZMQ_PUSH);
const int sndhwm = PROCESSING_ZMQ_SNDHWM;
if (zmq_setsockopt(
sender, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
auto receiver_assembler = BufferUtils::connect_socket(
ctx, config.detector_name, "assembler");
const int linger = 0;
if (zmq_setsockopt(
sender, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8;
if (zmq_bind(sender, stream_address.c_str()) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const size_t IMAGE_N_BYTES = config.width * config.height * bit_depth / 8;
RamBuffer image_buffer(config.detector_name + "_assembler",
sizeof(ImageMetadata), IMAGE_N_BYTES,
config.n_modules, RAM_BUFFER_N_SLOTS);
1, RAM_BUFFER_N_SLOTS);
ImageMetadata meta;
while (true) {
image_id = 123;
// receives the assembled image id from the assembler
zmq_recv(receiver_assembler, &meta, sizeof(meta), 0);
// gets the image data
char* dst_data = image_buffer.get_slot_data(meta.id);
// sends the json metadata with the data
sender.send(meta, dst_data, IMAGE_N_BYTES);
zmq_send(sender,
ram_buffer.get_slot_meta(image_id),
sizeof(ImageMetadata), ZMQ_SNDMORE);
zmq_send(sender,
ram_buffer.get_slot_data(image_id),
buffer_config::MODULE_N_BYTES * 4, 0);
pulse_id++;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}