diff --git a/sf-writer/include/WriterZmqReceiver.hpp b/sf-writer/include/WriterZmqReceiver.hpp deleted file mode 100644 index 96bc16c..0000000 --- a/sf-writer/include/WriterZmqReceiver.hpp +++ /dev/null @@ -1,34 +0,0 @@ -#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP -#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP - -#include -#include "JFH5Writer.hpp" -#include -#include - - -class WriterZmqReceiver { - - const size_t n_modules_; - std::vector sockets_; - const uint64_t stop_pulse_id_; - - ModuleFrame f_meta_; - -public: - WriterZmqReceiver( - void *ctx, - const std::string& ipc_prefix, - const size_t n_modules, - const uint64_t stop_pulse_id); - - virtual ~WriterZmqReceiver(); - - void get_next_buffer( - const uint64_t start_pulse_id, - ImageMetadataBlock* i_meta, - char* image_buffer); -}; - - -#endif //SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP diff --git a/sf-writer/src/JFH5Writer.cpp b/sf-writer/src/JFH5Writer.cpp index 1fdb94a..33bfe9f 100644 --- a/sf-writer/src/JFH5Writer.cpp +++ b/sf-writer/src/JFH5Writer.cpp @@ -138,14 +138,37 @@ void JFH5Writer::write( throw runtime_error("Received unexpected block for stop_pulse_id."); } - hsize_t offset[] = {current_write_index_, 0, 0}; - H5DOwrite_chunk( - image_dataset_.getId(), - H5P_DEFAULT, - 0, - offset, - n_images_to_copy * MODULE_N_BYTES * n_modules_, - data); + hsize_t b_i_dims[3] = {BUFFER_BLOCK_SIZE, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + H5::DataSpace b_i_space(3, b_i_dims); + hsize_t b_i_count[] = {n_images_to_copy, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + hsize_t b_i_start[] = {current_write_index_, 0, 0}; + b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); + + hsize_t f_i_dims[3] = {n_images_, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + H5::DataSpace f_i_space(3, f_i_dims); + hsize_t f_i_count[] = {n_images_to_copy, + MODULE_Y_SIZE * n_modules_, + MODULE_X_SIZE}; + hsize_t f_i_start[] = {current_write_index_, 0, 0}; + f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); + + image_dataset_.write( + data, H5::PredType::NATIVE_UINT16, b_i_space, f_i_space); + +// hsize_t offset[] = {current_write_index_, 0, 0}; +// H5DOwrite_chunk( +// image_dataset_.getId(), +// H5P_DEFAULT, +// 0, +// offset, +// n_images_to_copy * MODULE_N_BYTES * n_modules_, +// data); // pulse_id { diff --git a/sf-writer/src/WriterZmqReceiver.cpp b/sf-writer/src/WriterZmqReceiver.cpp deleted file mode 100644 index 0d3d8f8..0000000 --- a/sf-writer/src/WriterZmqReceiver.cpp +++ /dev/null @@ -1,138 +0,0 @@ -#include "WriterZmqReceiver.hpp" -#include "zmq.h" -#include "date.h" -#include -#include - -using namespace std; -using namespace core_buffer; - -WriterZmqReceiver::WriterZmqReceiver( - void *ctx, - const string &ipc_prefix, - const size_t n_modules, - const uint64_t stop_pulse_id_) : - n_modules_(n_modules), - sockets_(n_modules), - stop_pulse_id_(stop_pulse_id_) -{ - - for (size_t i = 0; i < n_modules; i++) { - sockets_[i] = zmq_socket(ctx, ZMQ_PULL); - - int rcvhwm = WRITER_RCVHWM; - if (zmq_setsockopt(sockets_[i], ZMQ_RCVHWM, &rcvhwm, - sizeof(rcvhwm)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - int linger = 0; - if (zmq_setsockopt(sockets_[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << ipc_prefix << i; - const auto ipc = ipc_addr.str(); - - if (zmq_connect(sockets_[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } -} - -WriterZmqReceiver::~WriterZmqReceiver() -{ - for (size_t i = 0; i < n_modules_; i++) { - zmq_close(sockets_[i]); - } -} - -void WriterZmqReceiver::get_next_buffer( - const uint64_t start_pulse_id, - ImageMetadataBlock* i_meta, - char* image_buffer) -{ - auto n_images_in_buffer = WRITER_DATA_CACHE_N_IMAGES; - auto images_left = stop_pulse_id_ - start_pulse_id + 1; - if (images_left < n_images_in_buffer) { - n_images_in_buffer = images_left; - } - -// i_meta->n_images = (uint16_t)n_images_in_buffer; - - for (uint64_t i_pulse=0; i_pulsepulse_id[i_pulse] = pulse_id; - i_meta->is_good_image[i_pulse] = 1; - i_meta->frame_index[i_pulse] = 0; - i_meta->daq_rec[i_pulse] = 0; - - for (size_t i_module = 0; i_module < n_modules_; i_module++) { - - auto n_bytes_metadata = zmq_recv( - sockets_[i_module], &f_meta_, sizeof(f_meta_), 0); - - if (n_bytes_metadata != sizeof(f_meta_)) { - throw runtime_error("Wrong number of metadata bytes."); - } - - if (f_meta_.pulse_id == 0) { - i_meta->is_good_image[i_pulse] = 0; - - } else { - if (!pulse_id_initialized) { - // Init the image metadata with the first valid frame. - pulse_id_initialized = true; - - i_meta->frame_index[i_pulse] = f_meta_.frame_index; - i_meta->daq_rec[i_pulse] = f_meta_.daq_rec; - } - - if (f_meta_.pulse_id != i_meta->pulse_id[i_pulse]) { - stringstream err_msg; - - err_msg << "[WriterZmqReceiver::get_next_buffer]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << pulse_id; - err_msg << " received "; - err_msg << f_meta_.pulse_id; - err_msg << " from i_module " << i_module << endl; - - throw runtime_error(err_msg.str()); - } - } - - // Once the image is not good, we don't care to re-flag it. - if (i_meta->is_good_image[i_pulse] == 1) { - - if (f_meta_.frame_index != i_meta->frame_index[i_pulse]) { - i_meta->is_good_image[i_pulse] = 0; - } - - if (f_meta_.daq_rec != i_meta->daq_rec[i_pulse]) { - i_meta->is_good_image[i_pulse] = 0; - } - - if (f_meta_.n_received_packets != JF_N_PACKETS_PER_FRAME) { - i_meta->is_good_image[i_pulse] = 0; - } - } - - auto pulse_offset = i_pulse * n_modules_ * MODULE_N_BYTES ; - auto module_offset = i_module * MODULE_N_BYTES; - - auto n_bytes_image = zmq_recv( - sockets_[i_module], - (image_buffer + pulse_offset + module_offset), - MODULE_N_BYTES, 0); - - if (n_bytes_image != MODULE_N_BYTES) { - throw runtime_error("Wrong number of data bytes."); - } - } - } -} diff --git a/sf-writer/test/test_WriterZmqReceiver.cpp b/sf-writer/test/test_WriterZmqReceiver.cpp deleted file mode 100644 index 402e409..0000000 --- a/sf-writer/test/test_WriterZmqReceiver.cpp +++ /dev/null @@ -1,80 +0,0 @@ -#include -#include "WriterZmqReceiver.hpp" -#include "bitshuffle/bitshuffle.h" -#include -#include -#include "buffer_config.hpp" -#include "zmq.h" - -using namespace std; -using namespace core_buffer; - -TEST(WriterZmqReceiver, basic_test) -{ - size_t n_modules = 4; - uint64_t pulse_id = 12345; - - auto ctx = zmq_ctx_new(); - zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1); - - void* sockets[n_modules]; - for (size_t i = 0; i < n_modules; i++) { - sockets[i] = zmq_socket(ctx, ZMQ_PUSH); - - int linger = 0; - if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger, - sizeof(linger)) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - - stringstream ipc_addr; - ipc_addr << REPLAY_STREAM_IPC_URL << i; - const auto ipc = ipc_addr.str(); - - if (zmq_bind(sockets[i], ipc.c_str()) != 0) { - throw runtime_error(zmq_strerror(errno)); - } - } - this_thread::sleep_for(chrono::milliseconds(100)); - - WriterZmqReceiver receiver(ctx, REPLAY_STREAM_IPC_URL, n_modules); - this_thread::sleep_for(chrono::milliseconds(100)); - - size_t compressed_frame_size = 5000; - auto frame_buffer = make_unique(compressed_frame_size); - - ImageMetadata image_metadata; - auto compress_size = bshuf_compress_lz4_bound( - MODULE_N_PIXELS, PIXEL_N_BYTES, MODULE_N_PIXELS); - auto image_buffer = make_unique(compress_size * n_modules); - - for (size_t i = 0; i < n_modules; i++) { - - ReplayModuleFrameBuffer frame_metadata; - frame_metadata.metadata.pulse_id = pulse_id; - frame_metadata.metadata.frame_index = pulse_id + 100; - frame_metadata.metadata.n_received_packets = 128; - frame_metadata.metadata.daq_rec = 4; - - frame_metadata.is_frame_present = 1; - frame_metadata.data_n_bytes = compressed_frame_size; - - zmq_send(sockets[i], - &frame_metadata, - sizeof(ReplayModuleFrameBuffer), - ZMQ_SNDMORE); - - zmq_send(sockets[i], - (char*)(frame_buffer.get()), - compressed_frame_size, - 0); - } - - receiver.get_next_buffer(pulse_id, &image_metadata, image_buffer.get()); - EXPECT_EQ(pulse_id, image_metadata.pulse_id); - EXPECT_EQ(image_metadata.is_good_frame, 1); - EXPECT_EQ(image_metadata.daq_rec, 4); - EXPECT_EQ(image_metadata.data_n_bytes, - 5000*n_modules); -// 5000*n_modules+BSHUF_LZ4_HEADER_BYTES); -} \ No newline at end of file