diff --git a/core-buffer/include/LiveH5Reader.hpp b/core-buffer/include/LiveH5Reader.hpp new file mode 100644 index 0000000..ebbe69f --- /dev/null +++ b/core-buffer/include/LiveH5Reader.hpp @@ -0,0 +1,47 @@ +#ifndef SF_DAQ_BUFFER_LIVEH5READER_HPP +#define SF_DAQ_BUFFER_LIVEH5READER_HPP + +#include +#include +#include "jungfrau.hpp" +#include "buffer_config.hpp" +#include + +class LiveH5Reader { + + const std::string current_filename_; + const uint16_t source_id_; + + std::unique_ptr pulse_id_buffer_; + std::unique_ptr data_buffer_; + + uint64_t current_file_max_pulse_id_; + H5::H5File file_; + + H5::DataSet image_dataset_; + H5::DataSet pulse_id_dataset_; + H5::DataSet frame_index_dataset_; + H5::DataSet daq_rec_dataset_; + H5::DataSet n_received_packets_dataset_; + + void open_file(); + +public: + LiveH5Reader( + const std::string& device, + const std::string& channel_name, + const uint16_t source_id); + + ~LiveH5Reader(); + + uint64_t get_latest_pulse_id(); + void load_pulse_id(uint64_t pulse_id); + + ModuleFrame get_metadata(); + char* get_data(); + + void close_file(); +}; + + +#endif //SF_DAQ_BUFFER_LIVEH5READER_HPP diff --git a/core-buffer/include/UdpRecvModule.hpp b/core-buffer/include/UdpRecvModule.hpp index e884cc0..36e9afc 100644 --- a/core-buffer/include/UdpRecvModule.hpp +++ b/core-buffer/include/UdpRecvModule.hpp @@ -2,30 +2,31 @@ #define UDPRECVMODULE_HPP #include "RingBuffer.hpp" +#include "FastQueue.hpp" +#include "jungfrau.hpp" #include class UdpRecvModule { - RingBuffer& ring_buffer_; - - std::atomic_bool is_receiving_; + FastQueue& queue_; std::thread receiving_thread_; + std::atomic_bool is_receiving_; + + inline void init_frame( + ModuleFrame* frame_metadata, + jungfrau_packet& packet_buffer); + + inline void reserve_next_frame_buffers( + ModuleFrame*& frame_metadata, + char*& frame_buffer); protected: - void receive_thread( - const uint16_t udp_port, - const size_t frame_size); + void receive_thread(const uint16_t udp_port); public: - UdpRecvModule(RingBuffer& ring_buffer); - + UdpRecvModule(FastQueue& queue, const uint16_t udp_port); virtual ~UdpRecvModule(); - void start_recv( - const uint16_t udp_port, - const size_t frame_n_bytes); - void stop_recv(); - bool is_receiving(); }; diff --git a/core-buffer/include/buffer_config.hpp b/core-buffer/include/buffer_config.hpp index b9a8349..1b9c3a8 100644 --- a/core-buffer/include/buffer_config.hpp +++ b/core-buffer/include/buffer_config.hpp @@ -26,11 +26,17 @@ namespace core_buffer { const size_t REPLAY_READ_BLOCK_SIZE = 100; // Size of sf_buffer RB in elements. - const size_t BUFFER_RB_SIZE = 1000; + const size_t BUFFER_INTERNAL_QUEUE_SIZE = 1000; + + // Time to sleep before retrying to read the queue. + const size_t BUFFER_QUEUE_RETRY_MS = 10; // Microseconds timeout for UDP recv. const int BUFFER_UDP_US_TIMEOUT = 10 * 1000; + // Output queue length for buffer live stream. + const int BUFFER_LIVE_SEND_HWM = 10; + // ZMQ threads for receiving data from sf_replay. const int WRITER_ZMQ_IO_THREADS = 2; diff --git a/core-buffer/src/FastH5Writer.cpp b/core-buffer/src/FastH5Writer.cpp index 8c80af4..04405c1 100644 --- a/core-buffer/src/FastH5Writer.cpp +++ b/core-buffer/src/FastH5Writer.cpp @@ -1,10 +1,8 @@ #include #include "FastH5Writer.hpp" -#include "date.h" #include #include #include -#include extern "C" { diff --git a/core-buffer/src/FastQueue.cpp b/core-buffer/src/FastQueue.cpp index 3e9765f..e39ecc4 100644 --- a/core-buffer/src/FastQueue.cpp +++ b/core-buffer/src/FastQueue.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "FastQueue.hpp" using namespace std; @@ -102,4 +103,5 @@ void FastQueue::release() read_slot_id_ %= n_slots_; } -template class FastQueue; \ No newline at end of file +template class FastQueue; +template class FastQueue; \ No newline at end of file diff --git a/core-buffer/src/LiveH5Reader.cpp b/core-buffer/src/LiveH5Reader.cpp new file mode 100644 index 0000000..e5ef1d8 --- /dev/null +++ b/core-buffer/src/LiveH5Reader.cpp @@ -0,0 +1,98 @@ +#include "LiveH5Reader.hpp" +#include "BufferUtils.hpp" + +using namespace std; +using namespace core_buffer; + +LiveH5Reader::LiveH5Reader( + const std::string& device, + const std::string& channel_name, + const uint16_t source_id): + current_filename_(device + "/" + channel_name + "/CURRENT"), + source_id_(source_id), + pulse_id_buffer_(make_unique(FILE_MOD)), + data_buffer_(make_unique(MODULE_N_PIXELS)) +{ +// auto filename = BufferUtils::get_latest_file(current_filename_); +// file_ = H5::H5File(filename, H5F_ACC_RDONLY | H5F_ACC_SWMR_READ); +// +// uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; +// base_pulse_id *= core_buffer::FILE_MOD; +// +// current_file_max_pulse_id_ = +// +// image_dataset_ = input_file.openDataSet("image"); +// pulse_id_dataset_ = input_file.openDataSet("pulse_id"); +// frame_index_dataset_ = input_file.openDataSet("frame_id"); +// daq_rec_dataset_ = input_file.openDataSet("daq_rec"); +// n_received_packets_dataset_ = input_file.openDataSet("received_packets"); + +} + +LiveH5Reader::~LiveH5Reader() { + close_file(); +} + + + +//void load_data_from_file ( +// FileBufferMetadata* metadata_buffer, +// char* image_buffer, +// const string &filename, +// const size_t start_index) +//{ +// +// hsize_t b_image_dim[3] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; +// H5::DataSpace b_i_space (3, b_image_dim); +// hsize_t b_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; +// hsize_t b_i_start[] = {0, 0, 0}; +// b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); +// +// hsize_t f_image_dim[3] = {FILE_MOD, 512, 1024}; +// H5::DataSpace f_i_space (3, f_image_dim); +// hsize_t f_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; +// hsize_t f_i_start[] = {start_index, 0, 0}; +// f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); +// +// hsize_t b_metadata_dim[2] = {REPLAY_READ_BLOCK_SIZE, 1}; +// H5::DataSpace b_m_space (2, b_metadata_dim); +// hsize_t b_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; +// hsize_t b_m_start[] = {0, 0}; +// b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); +// +// hsize_t f_metadata_dim[2] = {FILE_MOD, 1}; +// H5::DataSpace f_m_space (2, f_metadata_dim); +// hsize_t f_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; +// hsize_t f_m_start[] = {start_index, 0}; +// f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); +// +// H5::H5File input_file(filename, H5F_ACC_RDONLY); +// +// auto image_dataset = input_file.openDataSet("image"); +// image_dataset.read( +// image_buffer, H5::PredType::NATIVE_UINT16, +// b_i_space, f_i_space); +// +// auto pulse_id_dataset = input_file.openDataSet("pulse_id"); +// pulse_id_dataset.read( +// metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64, +// b_m_space, f_m_space); +// +// auto frame_id_dataset = input_file.openDataSet("frame_id"); +// frame_id_dataset.read( +// metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64, +// b_m_space, f_m_space); +// +// auto daq_rec_dataset = input_file.openDataSet("daq_rec"); +// daq_rec_dataset.read( +// metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32, +// b_m_space, f_m_space); +// +// auto received_packets_dataset = +// input_file.openDataSet("received_packets"); +// received_packets_dataset.read( +// metadata_buffer->n_received_packets, H5::PredType::NATIVE_UINT16, +// b_m_space, f_m_space); +// +// input_file.close(); +//} \ No newline at end of file diff --git a/core-buffer/src/UdpRecvModule.cpp b/core-buffer/src/UdpRecvModule.cpp index c0514af..f633f29 100644 --- a/core-buffer/src/UdpRecvModule.cpp +++ b/core-buffer/src/UdpRecvModule.cpp @@ -5,100 +5,66 @@ using namespace std; -UdpRecvModule::UdpRecvModule(RingBuffer& ring_buffer) : - ring_buffer_(ring_buffer), - is_receiving_(false) +UdpRecvModule::UdpRecvModule( + FastQueue& queue, + const uint16_t udp_port) : + queue_(queue), + is_receiving_(true) { - -} - -UdpRecvModule::~UdpRecvModule() -{ - stop_recv(); -} - -void UdpRecvModule::start_recv( - const uint16_t udp_port, - const size_t frame_n_bytes) -{ - if (is_receiving_ == true) { - std::stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::start_recv]"; - err_msg << " Receivers already running." << endl; - - throw runtime_error(err_msg.str()); - } - #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[UdpRecvModule::start_recv]"; + cout << "[UdpRecvModule::UdpRecvModule]"; cout << " Starting with "; cout << "udp_port " << udp_port << endl; #endif - is_receiving_ = true; - - if (receiving_thread_.joinable()) { - receiving_thread_.join(); - } - receiving_thread_ = thread( &UdpRecvModule::receive_thread, this, - udp_port, - frame_n_bytes); + udp_port); } -void UdpRecvModule::stop_recv() +UdpRecvModule::~UdpRecvModule() { -#ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - cout << "[" << system_clock::now() << "]"; - cout << "UdpRecvModule::stop_recv"; - cout << " Stop receiving." << endl; -#endif - is_receiving_ = false; - - if (receiving_thread_.joinable()) { - receiving_thread_.join(); - } + receiving_thread_.join(); } -void UdpRecvModule::receive_thread( - const uint16_t udp_port, - const size_t frame_size) +inline void UdpRecvModule::init_frame ( + ModuleFrame* frame_metadata, + jungfrau_packet& packet_buffer) +{ + frame_metadata->frame_index = packet_buffer.framenum; + frame_metadata->pulse_id = packet_buffer.bunchid; + frame_metadata->daq_rec = packet_buffer.debug; +} + +inline void UdpRecvModule::reserve_next_frame_buffers( + ModuleFrame*& frame_metadata, + char*& frame_buffer) +{ + int slot_id; + if ((slot_id = queue_.reserve()) == -1) + throw runtime_error("Queue is full."); + + frame_metadata = queue_.get_metadata_buffer(slot_id); + frame_metadata->pulse_id=0; + frame_metadata->n_received_packets=0; + + frame_buffer = queue_.get_data_buffer(slot_id); + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); +} + +void UdpRecvModule::receive_thread(const uint16_t udp_port) { try { - ring_buffer_.initialize(frame_size); - UdpReceiver udp_receiver; udp_receiver.bind(udp_port); - auto metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - char* frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } + ModuleFrame* frame_metadata; + char* frame_buffer; + reserve_next_frame_buffers(frame_metadata, frame_buffer); jungfrau_packet packet_buffer; @@ -110,43 +76,16 @@ void UdpRecvModule::receive_thread( continue; } - auto* frame_metadata = metadata.get(); - - // TODO: Horrible. Breake it down into methods. - // First packet for this frame. if (frame_metadata->pulse_id == 0) { - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; - // Packet from new frame, while we lost the last packet of - // previous frame. + init_frame(frame_metadata, packet_buffer); + + // Happens if the last packet from the previous frame gets lost. } else if (frame_metadata->pulse_id != packet_buffer.bunchid) { - ring_buffer_.commit(metadata); + queue_.commit(); + reserve_next_frame_buffers(frame_metadata, frame_buffer); - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - - frame_metadata->frame_index = packet_buffer.framenum; - frame_metadata->pulse_id = packet_buffer.bunchid; - frame_metadata->daq_rec = packet_buffer.debug; + init_frame(frame_metadata, packet_buffer); } size_t frame_buffer_offset = @@ -157,32 +96,13 @@ void UdpRecvModule::receive_thread( packet_buffer.data, JUNGFRAU_DATA_BYTES_PER_PACKET); - frame_metadata->n_recv_packets++; + frame_metadata->n_received_packets++; - // Frame finished with last packet. + // Last frame packet received. Frame finished. if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1) { - ring_buffer_.commit(metadata); - - metadata = make_shared(); - metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - metadata->pulse_id = 0; - metadata->n_recv_packets = 0; - - frame_buffer = ring_buffer_.reserve(metadata); - if (frame_buffer == nullptr) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[UdpRecvModule::receive_thread]"; - err_msg << " Ring buffer is full."; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + queue_.commit(); + reserve_next_frame_buffers(frame_metadata, frame_buffer); } } diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 5515f9f..04da883 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,7 +3,8 @@ #include #include #include - +#include +#include "zmq.h" #include "buffer_config.hpp" #include "jungfrau.hpp" @@ -16,10 +17,12 @@ int main (int argc, char *argv[]) { if (argc != 4) { cout << endl; cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]"; + cout << "[source_id]"; cout << endl; cout << "\tdevice_name: Name to write to disk."; cout << "\tudp_port: UDP port to connect to." << endl; cout << "\troot_folder: FS root folder." << endl; + cout << "\tsource_id: ID of the source for live stream." << endl; cout << endl; exit(-1); @@ -28,11 +31,29 @@ int main (int argc, char *argv[]) { string device_name = string(argv[1]); int udp_port = atoi(argv[2]); string root_folder = string(argv[3]); + int source_id = atoi(argv[2]); - RingBuffer ring_buffer(BUFFER_RB_SIZE); + stringstream ipc_stream; + ipc_stream << "ipc://sf-live-" << source_id; + const auto ipc_address = ipc_stream.str(); - UdpRecvModule udp_module(ring_buffer); - udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME); + auto ctx = zmq_ctx_new(); + auto socket = zmq_socket(ctx, ZMQ_PUB); + + const int sndhwm = BUFFER_LIVE_SEND_HWM; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) + throw runtime_error(strerror (errno)); + + const int linger_ms = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) + throw runtime_error(strerror (errno)); + + if (zmq_bind(socket, ipc_address.c_str()) != 0) + throw runtime_error(strerror (errno)); + + FastQueue queue(MODULE_N_BYTES, BUFFER_INTERNAL_QUEUE_SIZE); + + UdpRecvModule udp_module(queue, udp_port); uint64_t stats_counter(0); uint64_t n_missed_packets = 0; @@ -49,43 +70,56 @@ int main (int argc, char *argv[]) { writer.add_scalar_metadata("received_packets"); while (true) { - auto data = ring_buffer.read(); + auto slot_id = queue.read(); - if (data.first == nullptr) { - this_thread::sleep_for(chrono::milliseconds(10)); + if (slot_id == -1){ + this_thread::sleep_for(chrono::milliseconds(BUFFER_QUEUE_RETRY_MS)); continue; } - auto pulse_id = data.first->pulse_id; + ModuleFrame* metadata = queue.get_metadata_buffer(slot_id); + char* data = queue.get_data_buffer(slot_id); + + auto pulse_id = metadata->pulse_id; writer.set_pulse_id(pulse_id); - writer.write_data(data.second); + writer.write_data(data); // TODO: Combine all this into 1 struct. writer.write_scalar_metadata( - "pulse_id", &(data.first->pulse_id)); + "pulse_id", &(metadata->pulse_id)); writer.write_scalar_metadata( "frame_id", - &(data.first->frame_index)); + &(metadata->frame_index)); writer.write_scalar_metadata( "daq_rec", - &(data.first->daq_rec)); + &(metadata->daq_rec)); writer.write_scalar_metadata( "received_packets", - &(data.first->n_recv_packets)); + &(metadata->n_received_packets)); - ring_buffer.release(data.first->buffer_slot_index); + zmq_send(socket, + metadata, + sizeof(ModuleFrame), + ZMQ_SNDMORE); + + zmq_send(socket, + data, + MODULE_N_BYTES, + 0); + + queue.release(); // TODO: Make real statistics, please. stats_counter++; - if (data.first->n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { + if (metadata->n_received_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { n_missed_packets += - JUNGFRAU_N_PACKETS_PER_FRAME - data.first->n_recv_packets; + JUNGFRAU_N_PACKETS_PER_FRAME - metadata->n_received_packets; } if (last_pulse_id>0) { diff --git a/sf-buffer/src/sf_live.cpp b/sf-buffer/src/sf_live.cpp index d2c4592..6d7cb4e 100644 --- a/sf-buffer/src/sf_live.cpp +++ b/sf-buffer/src/sf_live.cpp @@ -1,199 +1,62 @@ #include -#include #include "jungfrau.hpp" -#include "BufferUtils.hpp" #include "zmq.h" #include "buffer_config.hpp" -#include #include #include "date.h" +#include "LiveH5Reader.hpp" using namespace std; using namespace core_buffer; -struct FileBufferMetadata { - uint64_t pulse_id[REPLAY_READ_BLOCK_SIZE]; - uint64_t frame_index[REPLAY_READ_BLOCK_SIZE]; - uint32_t daq_rec[REPLAY_READ_BLOCK_SIZE]; - uint16_t n_received_packets[REPLAY_READ_BLOCK_SIZE]; -}; - -void load_data_from_file ( - FileBufferMetadata* metadata_buffer, - char* image_buffer, - const string &filename, - const size_t start_index) -{ - - hsize_t b_image_dim[3] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; - H5::DataSpace b_i_space (3, b_image_dim); - hsize_t b_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; - hsize_t b_i_start[] = {0, 0, 0}; - b_i_space.selectHyperslab(H5S_SELECT_SET, b_i_count, b_i_start); - - hsize_t f_image_dim[3] = {FILE_MOD, 512, 1024}; - H5::DataSpace f_i_space (3, f_image_dim); - hsize_t f_i_count[] = {REPLAY_READ_BLOCK_SIZE, 512, 1024}; - hsize_t f_i_start[] = {start_index, 0, 0}; - f_i_space.selectHyperslab(H5S_SELECT_SET, f_i_count, f_i_start); - - hsize_t b_metadata_dim[2] = {REPLAY_READ_BLOCK_SIZE, 1}; - H5::DataSpace b_m_space (2, b_metadata_dim); - hsize_t b_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; - hsize_t b_m_start[] = {0, 0}; - b_m_space.selectHyperslab(H5S_SELECT_SET, b_m_count, b_m_start); - - hsize_t f_metadata_dim[2] = {FILE_MOD, 1}; - H5::DataSpace f_m_space (2, f_metadata_dim); - hsize_t f_m_count[] = {REPLAY_READ_BLOCK_SIZE, 1}; - hsize_t f_m_start[] = {start_index, 0}; - f_m_space.selectHyperslab(H5S_SELECT_SET, f_m_count, f_m_start); - - H5::H5File input_file(filename, H5F_ACC_RDONLY); - - auto image_dataset = input_file.openDataSet("image"); - image_dataset.read( - image_buffer, H5::PredType::NATIVE_UINT16, - b_i_space, f_i_space); - - auto pulse_id_dataset = input_file.openDataSet("pulse_id"); - pulse_id_dataset.read( - metadata_buffer->pulse_id, H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - auto frame_id_dataset = input_file.openDataSet("frame_id"); - frame_id_dataset.read( - metadata_buffer->frame_index, H5::PredType::NATIVE_UINT64, - b_m_space, f_m_space); - - auto daq_rec_dataset = input_file.openDataSet("daq_rec"); - daq_rec_dataset.read( - metadata_buffer->daq_rec, H5::PredType::NATIVE_UINT32, - b_m_space, f_m_space); - - auto received_packets_dataset = - input_file.openDataSet("received_packets"); - received_packets_dataset.read( - metadata_buffer->n_received_packets, H5::PredType::NATIVE_UINT16, - b_m_space, f_m_space); - - input_file.close(); -} - void sf_live ( void* socket, const string& device, const string& channel_name, - const uint16_t source_id, - const uint64_t start_pulse_id) + const uint16_t source_id) { - auto metadata_buffer = make_unique(); - auto image_buffer = make_unique( - LIVE_READ_BLOCK_SIZE * MODULE_N_PIXELS); + LiveH5Reader reader(device, channel_name, source_id); - auto latest_filename = ""; + auto current_pulse_id = reader.get_latest_pulse_id(); + while (true) { - uint64_t base_pulse_id = start_pulse_id / core_buffer::FILE_MOD; - base_pulse_id *= core_buffer::FILE_MOD; + reader.load_pulse_id(current_pulse_id); - size_t current_pulse_id = base_pulse_id; - string filename_base = device + "/" + channel_name + "/"; + auto metadata = reader.get_metadata(); - for (const auto& filename_suffix:path_suffixes) { + zmq_send(socket, + &metadata, + sizeof(ModuleFrame), + ZMQ_SNDMORE); - string filename = filename_base + filename_suffix.path; + auto data = reader.get_data(); + + zmq_send(socket, + data, + MODULE_N_BYTES, + 0); #ifdef DEBUG_OUTPUT using namespace date; using namespace chrono; cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - - cout << " Reading from filename " << filename << endl; + cout << "[sf_live::sf_live]"; + cout << " Sent pulse_id "; + cout << current_pulse_id << endl; #endif - for (size_t file_index_offset=0; - file_index_offset < FILE_MOD; - file_index_offset += REPLAY_READ_BLOCK_SIZE) - { - auto start_time = chrono::steady_clock::now(); - - load_data_from_file( - metadata_buffer.get(), - (char*)(image_buffer.get()), - filename, - file_index_offset); - - auto end_time = chrono::steady_clock::now(); - auto ms_duration = chrono::duration_cast( - end_time-start_time).count(); - - cout << "sf_replay:batch_read_ms " << ms_duration << endl; - - for (size_t i_frame=0; i_frame < REPLAY_READ_BLOCK_SIZE; i_frame++) { - - ModuleFrame module_frame = { - metadata_buffer->pulse_id[i_frame], - metadata_buffer->frame_index[i_frame], - metadata_buffer->daq_rec[i_frame], - metadata_buffer->n_received_packets[i_frame], - source_id - }; - - if (current_pulse_id < start_pulse_id) { - current_pulse_id++; - continue; - } - - if (current_pulse_id != module_frame.pulse_id) { - stringstream err_msg; - - using namespace date; - using namespace chrono; - err_msg << "[" << system_clock::now() << "]"; - err_msg << "[sf_live::sf_live]"; - err_msg << " Read unexpected pulse_id. "; - err_msg << " Expected " << current_pulse_id; - err_msg << " received " << module_frame.pulse_id; - err_msg << endl; - - throw runtime_error(err_msg.str()); - } - - zmq_send(socket, - &module_frame, - sizeof(ModuleFrame), - ZMQ_SNDMORE); - - auto buff_offset = i_frame * MODULE_N_PIXELS; - zmq_send(socket, - (char*)(image_buffer.get() + buff_offset), - MODULE_N_BYTES, - 0); - - #ifdef DEBUG_OUTPUT - using namespace date; - using namespace chrono; - - cout << "[" << system_clock::now() << "]"; - cout << "[sf_replay::sf_replay]"; - cout << " Sent pulse_id "; - cout << current_pulse_id << endl; - #endif - - current_pulse_id++; - } - } + current_pulse_id++; } + + reader.close_file(); } int main (int argc, char *argv[]) { if (argc != 6) { cout << endl; - cout << "Usage: sf_live [device]"; - cout << " [channel_name] [source_id] [start_pulse_id]"; + cout << "Usage: sf_live [device] [channel_name] [source_id]"; cout << endl; cout << "\tdevice: Name of detector." << endl; cout << "\tchannel_name: M00-M31 for JF16M." << endl; @@ -206,7 +69,6 @@ int main (int argc, char *argv[]) { const string device = string(argv[1]); const string channel_name = string(argv[2]); const uint16_t source_id = (uint16_t) atoi(argv[3]); - const uint64_t start_pulse_id = (uint64_t) atoll(argv[4]); stringstream ipc_stream; ipc_stream << "ipc://sf-live-" << (int)source_id; @@ -241,7 +103,7 @@ int main (int argc, char *argv[]) { if (zmq_connect(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); - sf_live(socket, device, channel_name, source_id, start_pulse_id); + sf_live(socket, device, channel_name, source_id); zmq_close(socket); zmq_ctx_destroy(ctx);