diff --git a/core-buffer/include/BufferUtils.hpp b/core-buffer/include/BufferUtils.hpp index 2aa7139..38a7b56 100644 --- a/core-buffer/include/BufferUtils.hpp +++ b/core-buffer/include/BufferUtils.hpp @@ -56,6 +56,11 @@ namespace BufferUtils const std::string& detector_name, const std::string& stream_name); + void* connect_socket_gf( + void* ctx, + const std::string& detector_name, + const std::string& stream_name); + DetectorConfig read_json_config(const std::string& filename); } diff --git a/core-buffer/src/BufferUtils.cpp b/core-buffer/src/BufferUtils.cpp index 314f3db..018f0e6 100644 --- a/core-buffer/src/BufferUtils.cpp +++ b/core-buffer/src/BufferUtils.cpp @@ -124,6 +124,41 @@ void* BufferUtils::connect_socket( return socket; } +void* BufferUtils::connect_socket_gf( + void* ctx, const string& detector_name, const string& stream_name) +{ + // string ipc_address = buffer_config::IPC_URL_BASE + + // detector_name + "-" + + // stream_name; + + string ipc_address = stream_name; + + void* socket = zmq_socket(ctx, ZMQ_SUB); + if (socket == nullptr) { + throw runtime_error(zmq_strerror(errno)); + } + + int rcvhwm = BUFFER_ZMQ_RCVHWM; + if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + int linger = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_connect(socket, ipc_address.c_str()) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + return socket; +} + void* BufferUtils::bind_socket( void* ctx, const string& detector_name, const string& stream_name) { diff --git a/gf/gf.json b/gf/gf.json new file mode 100644 index 0000000..64aef5d --- /dev/null +++ b/gf/gf.json @@ -0,0 +1,8 @@ +{ + "detector_name": "GF1", + "detector_type": "gf", + "n_modules": 16, + "image_height": 1200, + "image_width": 2016, + "start_udp_port": 50200 +} diff --git a/std-det-writer/include/JFH5Writer.hpp b/std-det-writer/include/JFH5Writer.hpp index 2702f4e..8e439db 100644 --- a/std-det-writer/include/JFH5Writer.hpp +++ b/std-det-writer/include/JFH5Writer.hpp @@ -52,6 +52,11 @@ public: void write_meta(const int64_t run_id, const uint32_t index, const ImageMetadata* meta); + + void write_meta_gf(const int64_t run_id, + const uint32_t index, + const uint16_t id, + const uint64_t status); }; #endif //JF_LIVE_WRITER_HPP diff --git a/std-det-writer/src/JFH5Writer.cpp b/std-det-writer/src/JFH5Writer.cpp index 98d93fb..9cbadb2 100644 --- a/std-det-writer/src/JFH5Writer.cpp +++ b/std-det-writer/src/JFH5Writer.cpp @@ -236,6 +236,47 @@ void JFH5Writer::write_data( H5Sclose(ram_ds); } +void JFH5Writer::write_meta_gf( + const int64_t run_id, const uint32_t index, const uint16_t id, const uint64_t status) +{ + if (run_id != current_run_id_) { + throw runtime_error("Invalid run_id."); + } + + const hsize_t ram_dims[3] = {1, 1, 1}; + auto ram_ds = H5Screate_simple(3, ram_dims, nullptr); + if (ram_ds < 0) { + throw runtime_error("Cannot create metadata ram dataspace."); + } + + auto file_ds = H5Dget_space(image_id_dataset_); + if (file_ds < 0) { + throw runtime_error("Cannot get metadata dataset file dataspace."); + } + + const hsize_t file_ds_start[] = {index, 0, 0}; + const hsize_t file_ds_stride[] = {1, 1, 1}; + const hsize_t file_ds_count[] = {1, 1, 1}; + const hsize_t file_ds_block[] = {1, 1, 1}; + if (H5Sselect_hyperslab(file_ds, H5S_SELECT_SET, + file_ds_start, file_ds_stride, file_ds_count, file_ds_block) < 0) { + throw runtime_error("Cannot select metadata dataset file hyperslab."); + } + + if (H5Dwrite(image_id_dataset_, H5T_NATIVE_UINT64, + ram_ds, file_ds, H5P_DEFAULT, &id) < 0) { + throw runtime_error("Cannot write data to pulse_id dataset."); + } + + if (H5Dwrite(status_dataset_, H5T_NATIVE_UINT64, + ram_ds, file_ds, H5P_DEFAULT, &status) < 0) { + throw runtime_error("Cannot write data to is_good_image dataset."); + } + + H5Sclose(file_ds); + H5Sclose(ram_ds); +} + void JFH5Writer::write_meta( const int64_t run_id, const uint32_t index, const ImageMetadata* meta) { diff --git a/std-det-writer/src/main.cpp b/std-det-writer/src/main.cpp index bbd9b4e..58a92af 100644 --- a/std-det-writer/src/main.cpp +++ b/std-det-writer/src/main.cpp @@ -11,6 +11,8 @@ #include "DetWriterConfig.hpp" #include "rapidjson/document.h" +#include "rapidjson/writer.h" + using namespace std; using namespace buffer_config; @@ -42,65 +44,96 @@ int main (int argc, char *argv[]) auto ctx = zmq_ctx_new(); zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS); - auto receiver = BufferUtils::connect_socket( - ctx, config.detector_name, "writer_agent"); + auto receiver = BufferUtils::connect_socket_gf( + ctx, config.detector_name, "tcp://localhost:9911"); const size_t IMAGE_N_BYTES = config.image_width * config.image_height * bit_depth / 8; - RamBuffer image_buffer(config.detector_name + "_assembler", - sizeof(ImageMetadata), IMAGE_N_BYTES, 1, RAM_BUFFER_N_SLOTS); + JFH5Writer writer(config.detector_name); WriterStats stats(config.detector_name, IMAGE_N_BYTES); - char recv_buffer[8192]; + char recv_buffer_meta[512]; + char recv_buffer_data[4838400]; + bool open_run = false; + bool header_in = false; + int last_run_id = -1; while (true) { - auto nbytes = zmq_recv(receiver, &recv_buffer, sizeof(recv_buffer), 0); - rapidjson::Document document; - if (document.Parse(recv_buffer, nbytes).HasParseError()) { - std::string error_str(recv_buffer, nbytes); - throw runtime_error(error_str); - } - - const string output_file = document["output_file"].GetString(); - const uint64_t image_id = document["image_id"].GetUint64(); - const int run_id = document["run_id"].GetInt(); - const int i_image = document["i_image"].GetInt(); - const int n_images = document["n_images"].GetInt(); - // i_image == n_images -> end of run. - if (i_image == n_images) { - writer.close_run(); - stats.end_run(); - continue; - } - // i_image == 0 -> we have a new run. - if (i_image == 0) { - auto image_meta = (ImageMetadata*) - image_buffer.get_slot_meta(image_id); + //meta + auto header_nbytes = zmq_recv(receiver, &recv_buffer_meta, sizeof(recv_buffer_meta), 0); + if (header_nbytes != -1 && header_in == false){ + header_in = true; + // cout << "Header nbytes " << header_nbytes << endl; + rapidjson::Document document; + if (document.Parse(recv_buffer_meta, header_nbytes).HasParseError()) { + std::string error_str(recv_buffer_meta, header_nbytes); + throw runtime_error(error_str); + } - writer.open_run(output_file, - run_id, - n_images, - image_meta->height, - image_meta->width, - image_meta->dtype); - } + // #ifdef DEBUG_OUTPUT + // // using namespace date; + // // cout << " [" << std::chrono::system_clock::now() << "]"; + // cout << " [std_det_writer::json metadata] :"; + // rapidjson::StringBuffer strbuf; + // strbuf.Clear(); + // rapidjson::Writer writer_json_dsa(strbuf); + // document.Accept(writer_json_dsa); + // cout << strbuf.GetString() << endl; + // cout << endl; + // #endif + const string output_file = document["output_file"].GetString(); + const int run_id = document["run_id"].GetInt(); + const int i_image = document["i_image"].GetInt(); + const int n_images = document["n_images"].GetInt(); + const int status = document["status"].GetInt(); + const rapidjson::Value& a = document["shape"]; + const int width = a[0].GetInt(); + const int heigth = a[1].GetInt(); + const int dtype = 2; - // Fair distribution of images among writers. - if (i_image % n_writers == i_writer) { - char* data = image_buffer.get_slot_data(image_id); + // i_image == n_images -> end of run. + if (i_image == n_images && open_run == true) { + writer.close_run(); + cout << "[ Writer" << i_writer << " AFTER CLOSE RUN.... " << endl; + stats.end_run(); + open_run = false; + continue; + } - stats.start_image_write(); - writer.write_data(run_id, i_image, data); - stats.end_image_write(); - } + // i_image == 0 -> we have a new run. + if (i_image == 0 && open_run == false) { + writer.open_run(output_file, + run_id, + n_images, + heigth, + width, + dtype); + + open_run = true; + } + // data + auto img_nbytes = zmq_recv(receiver, &recv_buffer_data, sizeof(recv_buffer_data), 0); + if (img_nbytes != -1 && header_in == true && open_run == true){ + // Fair distribution of images among writers. + if (i_image % n_writers == i_writer) { + cout << "[ Writer" << i_writer << ":: Frame" << i_image << "]"<< endl; + stats.start_image_write(); + writer.write_data(run_id, i_image, recv_buffer_data); + stats.end_image_write(); + } + header_in = false; + } - // Only the first instance writes metadata. - if (i_writer == 0) { - auto image_meta = (ImageMetadata*) - image_buffer.get_slot_meta(image_id); - writer.write_meta(run_id, i_image, image_meta); + // Only the first instance writes metadata. + if (i_writer == 0 && header_in == true && open_run == true) { + cout << "[ Writer" << i_writer << "::META from image" << i_image << "]"<< endl; + writer.write_meta_gf(run_id, + i_image, + (uint16_t)run_id, + (uint64_t)status); + } } }