Simplify interface

This commit is contained in:
2020-04-28 17:44:55 +02:00
parent 3fc8de83bf
commit c15b74a569
+38 -24
View File
@@ -3,21 +3,28 @@
#include "buffer_config.hpp"
#include "zmq.h"
#include <string>
#include <RingBuffer.hpp>
#include <jungfrau.hpp>
#include <thread>
#include <chrono>
#include "SFWriter.hpp"
#include <config.hpp>
#include <FastQueue.hpp>
using namespace std;
using namespace core_buffer;
struct DetectorFrame
{
uint64_t pulse_id;
uint64_t frame_index;
uint32_t daq_rec;
uint16_t n_received_packets;
};
void receive_replay(
const string ipc_prefix,
const size_t n_modules,
RingBuffer<DetectorFrame>& ring_buffer,
FastQueue<DetectorFrame>& queue,
void* ctx)
{
try {
@@ -45,22 +52,24 @@ void receive_replay(
}
}
auto metadata_buffer = make_unique<ModuleFrame>();
char *image_buffer = nullptr;
auto module_meta_buffer = make_unique<ModuleFrame>();
while (true) {
auto rb_metadata = make_shared<DetectorFrame>();
image_buffer = ring_buffer.reserve(rb_metadata);
if (image_buffer == nullptr){
auto slot_id = queue.reserve();
if (slot_id == -1){
this_thread::sleep_for(chrono::milliseconds(5));
continue;
}
auto frame_meta_buffer = queue.get_metadata_buffer(slot_id);
auto frame_buffer = queue.get_data_buffer(slot_id);
for (size_t i = 0; i < n_modules; i++) {
auto n_bytes_metadata = zmq_recv(
sockets[i],
metadata_buffer.get(),
(char*) frame_meta_buffer,
sizeof(ModuleFrame),
0);
@@ -71,20 +80,24 @@ void receive_replay(
// Initialize buffers in first iteration for each pulse_id.
if (i == 0) {
rb_metadata->pulse_id = metadata_buffer->pulse_id;
rb_metadata->frame_index = metadata_buffer->frame_index;
rb_metadata->daq_rec = metadata_buffer->daq_rec;
rb_metadata->n_received_packets =
metadata_buffer->n_received_packets;
frame_meta_buffer->pulse_id =
module_meta_buffer->pulse_id;
frame_meta_buffer->frame_index =
module_meta_buffer->frame_index;
frame_meta_buffer->daq_rec =
module_meta_buffer->daq_rec;
frame_meta_buffer->n_received_packets =
module_meta_buffer->n_received_packets;
}
if (rb_metadata->pulse_id != metadata_buffer->pulse_id) {
if (frame_meta_buffer->pulse_id !=
module_meta_buffer->pulse_id) {
throw runtime_error("Unexpected pulse_id received.");
}
auto n_bytes_image = zmq_recv(
sockets[i],
(image_buffer + (MODULE_N_PIXELS * i)),
(frame_buffer + (MODULE_N_PIXELS * i)),
MODULE_N_BYTES,
0);
@@ -94,7 +107,7 @@ void receive_replay(
}
}
ring_buffer.commit(rb_metadata);
queue.commit();
}
for (size_t i = 0; i < n_modules; i++) {
@@ -136,8 +149,9 @@ int main (int argc, char *argv[])
size_t n_modules = 32;
RingBuffer<DetectorFrame> ring_buffer(WRITER_RB_BUFFER_SLOTS);
ring_buffer.initialize(MODULE_N_BYTES*n_modules);
FastQueue<DetectorFrame> queue(
n_modules * MODULE_N_BYTES,
WRITER_RB_BUFFER_SLOTS);
string ipc_prefix = "ipc://sf-replay-";
auto ctx = zmq_ctx_new();
@@ -147,7 +161,7 @@ int main (int argc, char *argv[])
receive_replay,
ipc_prefix,
n_modules,
ref(ring_buffer),
ref(queue),
ctx);
size_t n_frames = stop_pulse_id - start_pulse_id;
@@ -166,16 +180,16 @@ int main (int argc, char *argv[])
auto current_pulse_id = start_pulse_id;
while (current_pulse_id <= stop_pulse_id) {
auto received_data = ring_buffer.read();
auto slot_id = queue.read();
if(received_data.first == nullptr) {
if(slot_id == -1) {
this_thread::sleep_for(chrono::milliseconds(
config::ring_buffer_read_retry_interval));
continue;
}
auto metadata = received_data.first;
auto data = received_data.second;
auto metadata = queue.get_metadata_buffer(slot_id);
auto data = queue.get_data_buffer(slot_id);
if (metadata->pulse_id != current_pulse_id) {
stringstream err_msg;
@@ -199,7 +213,7 @@ int main (int argc, char *argv[])
start_time = chrono::steady_clock::now();
writer.write(metadata, data);
ring_buffer.release(metadata->buffer_slot_index);
queue.release();
current_pulse_id++;
// TODO: Some poor statistics.