Major writer re-write

This commit is contained in:
2020-05-27 12:21:20 +02:00
parent 6995e71543
commit 759803389e
+58 -55
View File
@@ -1,63 +1,58 @@
#include <iostream>
#include <stdexcept>
#include "buffer_config.hpp"
#include "zmq.h"
#include <string>
#include <jungfrau.hpp>
#include <thread>
#include <chrono>
#include "JFH5Writer.hpp"
#include <FastQueue.hpp>
#include <cstring>
#include "date.h"
#include "zmq.h"
#include "jungfrau.hpp"
#include "buffer_config.hpp"
#include "bitshuffle/bitshuffle.h"
#include "WriterZmqReceiver.hpp"
#include "JFH5Writer.hpp"
#include "BufferBinaryReader.hpp"
using namespace std;
using namespace core_buffer;
using namespace chrono;
void receive_replay(
void* ctx,
const string ipc_prefix,
const size_t n_modules,
FastQueue<ImageMetadataBlock>& queue,
const uint64_t start_pulse_id,
const uint64_t stop_pulse_id)
void read_buffer(
const string device,
const string channel_name,
const vector<uint64_t>& blocks)
{
try {
WriterZmqReceiver receiver(ctx, ipc_prefix, n_modules, stop_pulse_id);
BufferBinaryReader block_reader(device, channel_name);
uint64_t current_pulse_id=start_pulse_id;
// "<= stop_pulse_id" because we include the last pulse_id.
while(current_pulse_id<=stop_pulse_id) {
int slot_id;
while((slot_id = queue.reserve()) == -1) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
}
// "<= stop_block" because we include the stop_block in the transfer.
for (uint64_t curr_block=start_block;
curr_block <= stop_block;
curr_block++) {
auto metadata = queue.get_metadata_buffer(slot_id);
auto buffer = queue.get_data_buffer(slot_id);
receiver.get_next_buffer(
current_pulse_id, metadata, buffer);
queue.commit();
current_pulse_id += metadata->n_images;
int slot_id;
while((slot_id = queue.reserve()) == -1) {
this_thread::sleep_for(chrono::milliseconds(
RB_READ_RETRY_INTERVAL_MS));
}
} catch (const std::exception& e) {
using namespace date;
using namespace chrono;
auto start_time = steady_clock::now();
cout << "[" << system_clock::now() << "]";
cout << "[sf_writer::receive_replay]";
cout << " Stopped because of exception: " << endl;
cout << e.what() << endl;
auto block_buffer = queue.get_metadata_buffer(slot_id);
throw;
block_reader.get_block(curr_block, block_buffer);
auto end_time = steady_clock::now();
uint64_t read_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
queue.commit();
// TODO: Proper statistics
cout << "sf_replay:avg_read_us ";
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
}
}
@@ -65,11 +60,11 @@ int main (int argc, char *argv[])
{
if (argc != 5) {
cout << endl;
cout << "Usage: sf_writer ";
cout << " [ipc_id] [output_file] [start_pulse_id] [stop_pulse_id]";
cout << "Usage: sf_writer [output_file] [device]";
cout << " [start_pulse_id] [stop_pulse_id]";
cout << endl;
cout << "\tipc_id: Unique identifier for ipc." << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tdevice: Name of detector." << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
cout << endl;
@@ -77,24 +72,32 @@ int main (int argc, char *argv[])
exit(-1);
}
const string ipc_id = string(argv[1]);
string output_file = string(argv[2]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[3]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[4]);
string output_file = string(argv[1]);
const string device = string(argv[2]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);
uint64_t stop_pulse_id = (uint64_t) atoll(argv[5]);
size_t n_modules = 32;
FastQueue<ImageMetadataBlock> queue(
MODULE_N_BYTES * n_modules * WRITER_DATA_CACHE_N_IMAGES,
WRITER_FASTQUEUE_N_SLOTS);
uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE;
uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE;
auto n_blocks = stop_block - start_block + 1;
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, WRITER_ZMQ_IO_THREADS);
// Generate list of buffer blocks that need to be loaded.
std::vector<uint64_t> buffer_blocks(n_blocks);
for (uint64_t curr_block=start_block;
curr_block<=stop_block;
curr_block++) {
buffer_blocks.push_back(curr_block);
}
auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-";
thread replay_receive_thread(receive_replay,
ctx, ipc_base, n_modules,
ref(queue), start_pulse_id, stop_pulse_id);
std::vector<std::thread> reading_threads(n_modules);
for (size_t i_module=0; i_module<n_modules; i_module++) {
string channel_name = "M" + to_string(i_module);
reading_threads.emplace_back(
read_buffer, device, channel_name, ref(buffer_blocks));
}
size_t n_frames = stop_pulse_id - start_pulse_id + 1;
JFH5Writer writer(output_file, n_frames, n_modules);