mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-01 15:12:23 +02:00
ZMQ receives acting like assembler
This commit is contained in:
@@ -26,11 +26,9 @@ public:
|
||||
~RamBuffer();
|
||||
|
||||
void write_frame(const ModuleFrame &src_meta, const char *src_data) const;
|
||||
void read_frame(const uint64_t pulse_id,
|
||||
const uint64_t module_id,
|
||||
ModuleFrame &meta,
|
||||
char *data) const;
|
||||
void read_frame(const uint64_t pulse_id, const uint64_t module_id, ModuleFrame &meta, char *data) const;
|
||||
char* read_image(const uint64_t pulse_id) const;
|
||||
char* write_image(const ImageMetadata &src_meta, const char *src_data);
|
||||
void assemble_image(const uint64_t pulse_id, ImageMetadata &image_meta) const;
|
||||
};
|
||||
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
#ifndef SF_DAQ_BUFFER_TYPEMAP_HPP
|
||||
#define SF_DAQ_BUFFER_TYPEMAP_HPP
|
||||
|
||||
enum class TypeMap {
|
||||
VOID,
|
||||
CHAR,
|
||||
INT8,
|
||||
UINT8,
|
||||
INT16,
|
||||
UINT16,
|
||||
INT32,
|
||||
UINT32,
|
||||
INT64,
|
||||
UINT64,
|
||||
FLOAT,
|
||||
DOUBLE,
|
||||
COMPLEX_FLOAT,
|
||||
COMPLEX_DOUBLE
|
||||
};
|
||||
|
||||
struct Type{
|
||||
const size_t size;
|
||||
const int value;
|
||||
};
|
||||
|
||||
|
||||
const std::unordered_map<std::type_index, Type> TypeTable = {
|
||||
{ typeid(void), {sizeof(void), TypeMap::VOID} },
|
||||
{ typeid(char), {sizeof(char), TypeMap::CHAR} },
|
||||
{ typeid(int8_t), {sizeof(int8_t), TypeMap::INT8} },
|
||||
{ typeid(uint8_t), {sizeof(uint8_t), TypeMap::UINT8} },
|
||||
{ typeid(int16_t), {sizeof(int16_t), TypeMap::INT16} },
|
||||
{ typeid(uint16_t), {sizeof(uint16_t), TypeMap::UINT16} },
|
||||
{ typeid(int32_t), {sizeof(int32_t), TypeMap::INT32} },
|
||||
{ typeid(uint32_t), {sizeof(uint32_t), TypeMap::UINT32} },
|
||||
{ typeid(int64_t), {sizeof(int64_t), TypeMap::INT64} },
|
||||
{ typeid(uint64_t), {sizeof(uint64_t), TypeMap::UINT64} },
|
||||
{ typeid(float), {sizeof(float), TypeMap::float} },
|
||||
{ typeid(double), {sizeof(double), TypeMap::DOUBLE} },
|
||||
{ typeid(std::complex<float>), {sizeof(std::complex<float>), TypeMap::COMPLEX_FLOAT} },
|
||||
{ typeid(std::complex<double>), {sizeof(std::complex<double>), TypeMap::COMPLEX_DOUBLE} }
|
||||
};
|
||||
|
||||
const std::unordered_map<int , Type> TypeTable = {
|
||||
{ TypeMap::VOID, {sizeof(void), TypeMap::VOID} },
|
||||
{ TypeMap::CHAR, {sizeof(char), TypeMap::CHAR} },
|
||||
{ TypeMap::INT8, {sizeof(int8_t), TypeMap::INT8} },
|
||||
{ TypeMap::UINT8, {sizeof(uint8_t), TypeMap::UINT8} },
|
||||
{ TypeMap::INT16, {sizeof(int16_t), TypeMap::INT16} },
|
||||
{ TypeMap::UINT16, {sizeof(uint16_t), TypeMap::UINT16} },
|
||||
{ TypeMap::INT32, {sizeof(int32_t), TypeMap::INT32} },
|
||||
{ TypeMap::UINT32, {sizeof(uint32_t), TypeMap::UINT32} },
|
||||
{ TypeMap::INT64, {sizeof(int64_t), TypeMap::INT64} },
|
||||
{ TypeMap::UINT64, {sizeof(uint64_t), TypeMap::UINT64} },
|
||||
{ TypeMap::FLOAT, {sizeof(float), TypeMap::float} },
|
||||
{ TypeMap::DOUBLE, {sizeof(double), TypeMap::DOUBLE} },
|
||||
{ TypeMap::COMPLEX_FLOAT, {sizeof(std::complex<float>), TypeMap::COMPLEX_FLOAT} },
|
||||
{ TypeMap::COMPLEX_DOUBLE, {sizeof(std::complex<double>), TypeMap::COMPLEX_DOUBLE} }
|
||||
};
|
||||
|
||||
#endif // SF_DAQ_BUFFER_TYPEMAP_HPP
|
||||
@@ -1,9 +1,10 @@
|
||||
#ifndef SF_DAQ_BUFFER_FORMATS_HPP
|
||||
#define SF_DAQ_BUFFER_FORMATS_HPP
|
||||
|
||||
#include <iostream>
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
|
||||
#include "TypeMap.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "jungfrau.hpp"
|
||||
#include "jungfraujoch.hpp"
|
||||
@@ -24,11 +25,20 @@ struct ModuleFrame {
|
||||
#pragma pack(push)
|
||||
#pragma pack(1)
|
||||
struct ImageMetadata {
|
||||
uint64_t pulse_id;
|
||||
uint64_t frame_index;
|
||||
uint32_t daq_rec;
|
||||
uint32_t is_good_image;
|
||||
};
|
||||
uint64_t version; // protocol version
|
||||
|
||||
uint64_t id; // pulse_id for SF, frame_index for SLS
|
||||
uint64_t height; // in pixels
|
||||
uint64_t width; // in pixels
|
||||
|
||||
uint16_t dtype; // enum of data types (uint8, 16, 32, float etc.)
|
||||
uint16_t encoding; // enum of encodings (raw, bshuf_lz4...)
|
||||
uint16_t array_id; // if you want to interleave 2 buffers in the same data stream
|
||||
uint16_t status; // Denotate some status of the images - corrupt for example.
|
||||
|
||||
uint64_t user_1; // extra field for custom needs
|
||||
uint64_t user_2; // extra field for custom needs
|
||||
}
|
||||
#pragma pack(pop)
|
||||
|
||||
struct ModuleFrameBuffer {
|
||||
|
||||
@@ -164,3 +164,18 @@ char* RamBuffer::read_image(const uint64_t pulse_id) const
|
||||
|
||||
return src_data;
|
||||
}
|
||||
|
||||
|
||||
void RamBuffer::write_image(const ImageMetadata& src_meta, const char *src_data)
|
||||
{
|
||||
const int slot_n = src_meta.id % n_slots_;
|
||||
const int image_n_bytes = src_meta.height * src_meta.width * TypeMap[src_meta.dtype].size;
|
||||
|
||||
char *dst_data = image_buffer_ + (image_n_bytes * slot_n);
|
||||
|
||||
// Hope this won't segfault
|
||||
memcpy(dst_data, src_data, image_n_bytes);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -28,8 +28,7 @@ int main (int argc, char *argv[])
|
||||
|
||||
auto ctx = zmq_ctx_new();
|
||||
zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS);
|
||||
auto sender = BufferUtils::bind_socket(
|
||||
ctx, config.detector_name, stream_name);
|
||||
auto sender = BufferUtils::bind_socket(ctx, config.detector_name, stream_name);
|
||||
|
||||
ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules);
|
||||
RamBuffer ram_buffer(config.detector_name, config.n_modules);
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
//
|
||||
// Weather update client in C++
|
||||
// Connects SUB socket to tcp://localhost:5556
|
||||
// Collects weather updates and finds avg temp in zipcode
|
||||
//
|
||||
|
||||
#include <zmq.hpp>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
int main (int argc, char *argv[]){
|
||||
if (argc != 2) {
|
||||
std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n";
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
int zmq_port = atoi(argv[1]);
|
||||
std::string addr("tcp://localhost:" + std::to_string(zmq_port));
|
||||
|
||||
zmq::context_t context (1);
|
||||
|
||||
// Socket to talk to server
|
||||
std::cout << "Subscribing to server...\n" << std::endl;
|
||||
zmq::socket_t subscriber (context, ZMQ_SUB);
|
||||
subscriber.connect(addr.c_str());
|
||||
|
||||
// Subscribe to IMAGEDATA
|
||||
const char *filter = "IMAGEDATA";
|
||||
subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter));
|
||||
|
||||
// Process 100 updates
|
||||
int num_img = 0;
|
||||
long total_temp = 0;
|
||||
zmq::message_t msg_topic;
|
||||
zmq::message_t msg_meta;
|
||||
zmq::message_t msg_image;
|
||||
std::cout << "I'm listening...\n" << std::endl;
|
||||
for (int idx = 0; idx < 100000; idx++) {
|
||||
subscriber.recv(&msg_topic);
|
||||
if(msg_topic.size()==strlen(filter)){
|
||||
subscriber.recv(&msg_meta);
|
||||
subscriber.recv(&msg_image);
|
||||
num_img++;
|
||||
}
|
||||
|
||||
if(idx%500==0){
|
||||
std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@@ -0,0 +1,95 @@
|
||||
#include <zmq.hpp>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
#include "../../core-buffer/include/buffer_config.hpp"
|
||||
|
||||
|
||||
int main (int argc, char *argv[]){
|
||||
if (argc != 4) {
|
||||
std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_topic] [zmq_sub_addr] [detector_name]\n";
|
||||
std::cout << "Topic corresponds to a supported format: IMAGEDATA\n";
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
|
||||
std::string topic(argv[1]);
|
||||
std::string sub_raw(argv[2]);
|
||||
std::string detector_name(argv[3]);
|
||||
std::string sub_addr("tcp://" + sub_raw);
|
||||
|
||||
std::cout << "Starting ZMQ receiver at:\nPORT:\t" << sub_addr << "\nTOPIC:\t" << topic << std::endl;
|
||||
|
||||
// Allocate the RamBuffer
|
||||
//RamBuffer buffer(config.detector_name, config.n_modules);
|
||||
|
||||
// ZMQ communication setup
|
||||
std::cout << "Subscribing to server...\n" << std::endl;
|
||||
zmq::context_t context (1);
|
||||
// Subscribe to TOPIC (expected schema)
|
||||
zmq::socket_t subscriber (context, ZMQ_SUB);
|
||||
subscriber.connect(sub_addr.c_str());
|
||||
subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size());
|
||||
|
||||
// Publisher to ipc
|
||||
std::cout << "Crating publisher...\n" << std::endl;
|
||||
|
||||
zmq::socket_t republisher (context, ZMQ_PUB);
|
||||
// const int sndhwm = buffer_config::BUFFER_ZMQ_SNDHWM;
|
||||
// republisher.setsockopt(ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm));
|
||||
// const int linger = 0;
|
||||
// republisher.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
|
||||
|
||||
// std::string ipc_addr = buffer_config::BUFFER_LIVE_IPC_URL + detector_name + "-assembler";
|
||||
std::string ipc_addr = "tcp://*:5333";
|
||||
try{
|
||||
republisher.bind(ipc_addr.c_str());
|
||||
} catch (std::exception& ex){
|
||||
std::string msg = "Failed to bing publisher to address '" + ipc_addr + "': " + ex.what();
|
||||
throw std::runtime_error(msg);
|
||||
|
||||
}
|
||||
|
||||
|
||||
// Process 100 updates
|
||||
int num_img = 0;
|
||||
long total_temp = 0;
|
||||
zmq::message_t msg_topic;
|
||||
zmq::message_t msg_meta;
|
||||
zmq::message_t msg_data;
|
||||
|
||||
std::cout << "I'm listening...\n" << std::endl;
|
||||
for (int idx = 0; idx < 100000; idx++) {
|
||||
// ZMQ guarantees full delivery of multipart massages!
|
||||
// Packets are sent as three part messages: topic + meta + data
|
||||
// Blocks until recv succesfull!
|
||||
subscriber.recv(&msg_topic, 0);
|
||||
subscriber.recv(&msg_meta, 0);
|
||||
subscriber.recv(&msg_data, 0);
|
||||
|
||||
// Schema (topic) specific saving)
|
||||
if(topic=="IMAGEDATA"){
|
||||
//buffer.write_image((ImageMetadata*)msg_meta.data(), (char*)msg_data.data);
|
||||
if(idx%100==0){
|
||||
std::cout << "Received " << idx << " (at size " << msg_data.size() << " )" << std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user