Revert back to multithreaded writer

This commit is contained in:
2020-04-20 20:12:53 +02:00
parent 54acb47e31
commit 8fc02e0789
+32 -104
View File
@@ -3,40 +3,14 @@
#include <RingBuffer.hpp>
#include <UdpRecvModule.hpp>
#include <FastH5Writer.hpp>
#include <UdpReceiver.hpp>
#include "config.hpp"
#include "jungfrau.hpp"
#include "BufferUtils.hpp"
using namespace std;
void write_frame(
FastH5Writer &writer,
const uint64_t pulse_id,
const UdpFrameMetadata* frame_metadata,
const char* frame_buffer)
{
writer.set_pulse_id(pulse_id);
writer.write_data(frame_buffer);
writer.write_scalar_metadata<uint64_t>(
"pulse_id", &(frame_metadata->pulse_id));
writer.write_scalar_metadata<uint64_t>(
"frame_id",
&(frame_metadata->frame_index));
writer.write_scalar_metadata<uint32_t>(
"daq_rec",
&(frame_metadata->daq_rec));
writer.write_scalar_metadata<uint16_t>(
"received_packets",
&(frame_metadata->n_recv_packets));
}
int main (int argc, char *argv[]) {
if (argc != 4) {
@@ -55,6 +29,11 @@ int main (int argc, char *argv[]) {
int udp_port = atoi(argv[2]);
string root_folder = string(argv[3]);
RingBuffer<UdpFrameMetadata> ring_buffer(config::ring_buffer_n_slots);
UdpRecvModule udp_module(ring_buffer);
udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME);
uint64_t n_stat_out(0);
uint64_t n_frames_with_missing_packets = 0;
uint64_t n_missed_frames = 0;
@@ -68,91 +47,42 @@ int main (int argc, char *argv[]) {
writer.add_scalar_metadata<uint32_t>("daq_rec");
writer.add_scalar_metadata<uint16_t>("received_packets");
jungfrau_packet packet_buffer;
UdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
char* previous_frame_buffer = new char[2*512*1024];
memset(previous_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
UdpFrameMetadata previous_metadata;
previous_metadata.pulse_id = 0;
previous_metadata.n_recv_packets = 0;
previous_metadata.daq_rec = 0;
previous_metadata.n_recv_packets = 0;
char* current_frame_buffer = new char[2*512*1024];
memset(current_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
UdpFrameMetadata current_metadata;
current_metadata.pulse_id = 0;
current_metadata.n_recv_packets = 0;
current_metadata.daq_rec = 0;
current_metadata.n_recv_packets = 0;
while (true) {
auto data = ring_buffer.read();
while (true) {
if (!udp_receiver.receive(
&packet_buffer,
JUNGFRAU_BYTES_PER_PACKET)) {
continue;
}
if (current_metadata.pulse_id != packet_buffer.bunchid) {
if (current_metadata.pulse_id != 0) {
// Commit
previous_metadata = current_metadata;
swap(previous_frame_buffer, current_frame_buffer);
}
// Init current_metadata
current_metadata.frame_index = packet_buffer.framenum;
current_metadata.pulse_id = packet_buffer.bunchid;
current_metadata.daq_rec = packet_buffer.debug;
current_metadata.n_recv_packets = 0;
memset(current_frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
}
size_t frame_buffer_offset =
JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer.packetnum;
memcpy((void*) (current_frame_buffer + frame_buffer_offset),
packet_buffer.data,
JUNGFRAU_DATA_BYTES_PER_PACKET);
current_metadata.n_recv_packets++;
// Frame finished with last packet.
if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1)
{
// Commit.
previous_metadata = current_metadata;
swap(previous_frame_buffer, current_frame_buffer);
// This will cause a reset on the next iteration.
current_metadata.pulse_id = 0;
}
if (previous_metadata.pulse_id != 0) {
break;
}
if (data.first == nullptr) {
this_thread::sleep_for(chrono::milliseconds(10));
continue;
}
uint64_t pulse_id = previous_metadata.pulse_id;
write_frame(writer, pulse_id, &previous_metadata, previous_frame_buffer);
auto pulse_id = data.first->pulse_id;
writer.set_pulse_id(pulse_id);
// Indicates that is processed.
previous_metadata.pulse_id = 0;
writer.write_data(data.second);
writer.write_scalar_metadata<uint64_t>(
"pulse_id", &(data.first->pulse_id));
writer.write_scalar_metadata<uint64_t>(
"frame_id",
&(data.first->frame_index));
writer.write_scalar_metadata<uint32_t>(
"daq_rec",
&(data.first->daq_rec));
writer.write_scalar_metadata<uint16_t>(
"received_packets",
&(data.first->n_recv_packets));
ring_buffer.release(data.first->buffer_slot_index);
// TODO: Make real statistics, please.
n_stat_out++;
if (previous_metadata.n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) {
if (data.first->n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) {
n_frames_with_missing_packets +=
JUNGFRAU_N_PACKETS_PER_FRAME -
previous_metadata.n_recv_packets;
JUNGFRAU_N_PACKETS_PER_FRAME - data.first->n_recv_packets;
}
if (last_pulse_id>0) {
@@ -172,7 +102,5 @@ int main (int argc, char *argv[]) {
n_frames_with_missing_packets = 0;
n_missed_frames = 0;
}
this_thread::yield();
}
}