diff --git a/core-buffer/include/RamBuffer.hpp b/core-buffer/include/RamBuffer.hpp index c4bb28f..ead2221 100644 --- a/core-buffer/include/RamBuffer.hpp +++ b/core-buffer/include/RamBuffer.hpp @@ -26,11 +26,9 @@ public: ~RamBuffer(); void write_frame(const ModuleFrame &src_meta, const char *src_data) const; - void read_frame(const uint64_t pulse_id, - const uint64_t module_id, - ModuleFrame &meta, - char *data) const; + void read_frame(const uint64_t pulse_id, const uint64_t module_id, ModuleFrame &meta, char *data) const; char* read_image(const uint64_t pulse_id) const; + char* write_image(const ImageMetadata &src_meta, const char *src_data); void assemble_image(const uint64_t pulse_id, ImageMetadata &image_meta) const; }; diff --git a/core-buffer/include/TypeMap.hpp b/core-buffer/include/TypeMap.hpp new file mode 100644 index 0000000..5835e31 --- /dev/null +++ b/core-buffer/include/TypeMap.hpp @@ -0,0 +1,61 @@ +#ifndef SF_DAQ_BUFFER_TYPEMAP_HPP +#define SF_DAQ_BUFFER_TYPEMAP_HPP + +enum class TypeMap { + VOID, + CHAR, + INT8, + UINT8, + INT16, + UINT16, + INT32, + UINT32, + INT64, + UINT64, + FLOAT, + DOUBLE, + COMPLEX_FLOAT, + COMPLEX_DOUBLE +}; + +struct Type{ + const size_t size; + const int value; +}; + + +const std::unordered_map TypeTable = { + { typeid(void), {sizeof(void), TypeMap::VOID} }, + { typeid(char), {sizeof(char), TypeMap::CHAR} }, + { typeid(int8_t), {sizeof(int8_t), TypeMap::INT8} }, + { typeid(uint8_t), {sizeof(uint8_t), TypeMap::UINT8} }, + { typeid(int16_t), {sizeof(int16_t), TypeMap::INT16} }, + { typeid(uint16_t), {sizeof(uint16_t), TypeMap::UINT16} }, + { typeid(int32_t), {sizeof(int32_t), TypeMap::INT32} }, + { typeid(uint32_t), {sizeof(uint32_t), TypeMap::UINT32} }, + { typeid(int64_t), {sizeof(int64_t), TypeMap::INT64} }, + { typeid(uint64_t), {sizeof(uint64_t), TypeMap::UINT64} }, + { typeid(float), {sizeof(float), TypeMap::float} }, + { typeid(double), {sizeof(double), TypeMap::DOUBLE} }, + { typeid(std::complex), {sizeof(std::complex), TypeMap::COMPLEX_FLOAT} }, + { typeid(std::complex), {sizeof(std::complex), TypeMap::COMPLEX_DOUBLE} } +}; + +const std::unordered_map TypeTable = { + { TypeMap::VOID, {sizeof(void), TypeMap::VOID} }, + { TypeMap::CHAR, {sizeof(char), TypeMap::CHAR} }, + { TypeMap::INT8, {sizeof(int8_t), TypeMap::INT8} }, + { TypeMap::UINT8, {sizeof(uint8_t), TypeMap::UINT8} }, + { TypeMap::INT16, {sizeof(int16_t), TypeMap::INT16} }, + { TypeMap::UINT16, {sizeof(uint16_t), TypeMap::UINT16} }, + { TypeMap::INT32, {sizeof(int32_t), TypeMap::INT32} }, + { TypeMap::UINT32, {sizeof(uint32_t), TypeMap::UINT32} }, + { TypeMap::INT64, {sizeof(int64_t), TypeMap::INT64} }, + { TypeMap::UINT64, {sizeof(uint64_t), TypeMap::UINT64} }, + { TypeMap::FLOAT, {sizeof(float), TypeMap::float} }, + { TypeMap::DOUBLE, {sizeof(double), TypeMap::DOUBLE} }, + { TypeMap::COMPLEX_FLOAT, {sizeof(std::complex), TypeMap::COMPLEX_FLOAT} }, + { TypeMap::COMPLEX_DOUBLE, {sizeof(std::complex), TypeMap::COMPLEX_DOUBLE} } +}; + +#endif // SF_DAQ_BUFFER_TYPEMAP_HPP diff --git a/core-buffer/include/formats.hpp b/core-buffer/include/formats.hpp index 4ad623c..462deb5 100644 --- a/core-buffer/include/formats.hpp +++ b/core-buffer/include/formats.hpp @@ -1,9 +1,10 @@ #ifndef SF_DAQ_BUFFER_FORMATS_HPP #define SF_DAQ_BUFFER_FORMATS_HPP -#include +#include #include +#include "TypeMap.hpp" #include "buffer_config.hpp" #include "jungfrau.hpp" #include "jungfraujoch.hpp" @@ -24,11 +25,20 @@ struct ModuleFrame { #pragma pack(push) #pragma pack(1) struct ImageMetadata { - uint64_t pulse_id; - uint64_t frame_index; - uint32_t daq_rec; - uint32_t is_good_image; -}; + uint64_t version; // protocol version + + uint64_t id; // pulse_id for SF, frame_index for SLS + uint64_t height; // in pixels + uint64_t width; // in pixels + + uint16_t dtype; // enum of data types (uint8, 16, 32, float etc.) + uint16_t encoding; // enum of encodings (raw, bshuf_lz4...) + uint16_t array_id; // if you want to interleave 2 buffers in the same data stream + uint16_t status; // Denotate some status of the images - corrupt for example. + + uint64_t user_1; // extra field for custom needs + uint64_t user_2; // extra field for custom needs +} #pragma pack(pop) struct ModuleFrameBuffer { diff --git a/core-buffer/src/RamBuffer.cpp b/core-buffer/src/RamBuffer.cpp index ff14011..6829112 100644 --- a/core-buffer/src/RamBuffer.cpp +++ b/core-buffer/src/RamBuffer.cpp @@ -164,3 +164,18 @@ char* RamBuffer::read_image(const uint64_t pulse_id) const return src_data; } + + +void RamBuffer::write_image(const ImageMetadata& src_meta, const char *src_data) +{ + const int slot_n = src_meta.id % n_slots_; + const int image_n_bytes = src_meta.height * src_meta.width * TypeMap[src_meta.dtype].size; + + char *dst_data = image_buffer_ + (image_n_bytes * slot_n); + + // Hope this won't segfault + memcpy(dst_data, src_data, image_n_bytes); +} + + + diff --git a/jf-assembler/src/main.cpp b/jf-assembler/src/main.cpp index e1b76ab..9fc5eb3 100644 --- a/jf-assembler/src/main.cpp +++ b/jf-assembler/src/main.cpp @@ -28,8 +28,7 @@ int main (int argc, char *argv[]) auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS); - auto sender = BufferUtils::bind_socket( - ctx, config.detector_name, stream_name); + auto sender = BufferUtils::bind_socket(ctx, config.detector_name, stream_name); ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules); RamBuffer ram_buffer(config.detector_name, config.n_modules); diff --git a/zmq-receiver/src/SimulatedZmqReceiver.cpp b/zmq-receiver/src/SimulatedZmqReceiver.cpp new file mode 100644 index 0000000..499785d --- /dev/null +++ b/zmq-receiver/src/SimulatedZmqReceiver.cpp @@ -0,0 +1,51 @@ +// +// Weather update client in C++ +// Connects SUB socket to tcp://localhost:5556 +// Collects weather updates and finds avg temp in zipcode +// + +#include +#include +#include + +int main (int argc, char *argv[]){ + if (argc != 2) { + std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_port]\n"; + exit(-1); + } + + int zmq_port = atoi(argv[1]); + std::string addr("tcp://localhost:" + std::to_string(zmq_port)); + + zmq::context_t context (1); + + // Socket to talk to server + std::cout << "Subscribing to server...\n" << std::endl; + zmq::socket_t subscriber (context, ZMQ_SUB); + subscriber.connect(addr.c_str()); + + // Subscribe to IMAGEDATA + const char *filter = "IMAGEDATA"; + subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen (filter)); + + // Process 100 updates + int num_img = 0; + long total_temp = 0; + zmq::message_t msg_topic; + zmq::message_t msg_meta; + zmq::message_t msg_image; + std::cout << "I'm listening...\n" << std::endl; + for (int idx = 0; idx < 100000; idx++) { + subscriber.recv(&msg_topic); + if(msg_topic.size()==strlen(filter)){ + subscriber.recv(&msg_meta); + subscriber.recv(&msg_image); + num_img++; + } + + if(idx%500==0){ + std::cout << "Received " << idx << " (at size " << msg_image.size() << " )\t Received: " << num_img << std::endl; + } + } + return 0; +} diff --git a/zmq-receiver/src/main.cpp b/zmq-receiver/src/main.cpp new file mode 100644 index 0000000..83c5dc9 --- /dev/null +++ b/zmq-receiver/src/main.cpp @@ -0,0 +1,95 @@ +#include +#include +#include + +#include "../../core-buffer/include/buffer_config.hpp" + + +int main (int argc, char *argv[]){ + if (argc != 4) { + std::cout << "\nERROR\nUsage: jf_buffer_writer [zmq_topic] [zmq_sub_addr] [detector_name]\n"; + std::cout << "Topic corresponds to a supported format: IMAGEDATA\n"; + exit(-1); + } + + + std::string topic(argv[1]); + std::string sub_raw(argv[2]); + std::string detector_name(argv[3]); + std::string sub_addr("tcp://" + sub_raw); + + std::cout << "Starting ZMQ receiver at:\nPORT:\t" << sub_addr << "\nTOPIC:\t" << topic << std::endl; + + // Allocate the RamBuffer + //RamBuffer buffer(config.detector_name, config.n_modules); + + // ZMQ communication setup + std::cout << "Subscribing to server...\n" << std::endl; + zmq::context_t context (1); + // Subscribe to TOPIC (expected schema) + zmq::socket_t subscriber (context, ZMQ_SUB); + subscriber.connect(sub_addr.c_str()); + subscriber.setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.size()); + + // Publisher to ipc + std::cout << "Crating publisher...\n" << std::endl; + + zmq::socket_t republisher (context, ZMQ_PUB); +// const int sndhwm = buffer_config::BUFFER_ZMQ_SNDHWM; +// republisher.setsockopt(ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)); +// const int linger = 0; +// republisher.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + +// std::string ipc_addr = buffer_config::BUFFER_LIVE_IPC_URL + detector_name + "-assembler"; + std::string ipc_addr = "tcp://*:5333"; + try{ + republisher.bind(ipc_addr.c_str()); + } catch (std::exception& ex){ + std::string msg = "Failed to bing publisher to address '" + ipc_addr + "': " + ex.what(); + throw std::runtime_error(msg); + + } + + + // Process 100 updates + int num_img = 0; + long total_temp = 0; + zmq::message_t msg_topic; + zmq::message_t msg_meta; + zmq::message_t msg_data; + + std::cout << "I'm listening...\n" << std::endl; + for (int idx = 0; idx < 100000; idx++) { + // ZMQ guarantees full delivery of multipart massages! + // Packets are sent as three part messages: topic + meta + data + // Blocks until recv succesfull! + subscriber.recv(&msg_topic, 0); + subscriber.recv(&msg_meta, 0); + subscriber.recv(&msg_data, 0); + + // Schema (topic) specific saving) + if(topic=="IMAGEDATA"){ + //buffer.write_image((ImageMetadata*)msg_meta.data(), (char*)msg_data.data); + if(idx%100==0){ + std::cout << "Received " << idx << " (at size " << msg_data.size() << " )" << std::endl; + } + } +} + return 0; +} + + + + + + + + + + + + + + + +