Make UdpReceiver quicker and more simple

This commit is contained in:
2020-05-04 12:17:54 +02:00
parent 7841a70698
commit a6d065a285
2 changed files with 45 additions and 72 deletions
+37 -72
View File
@@ -31,6 +31,30 @@ UdpRecvModule::~UdpRecvModule()
receiving_thread_.join();
}
inline void UdpRecvModule::init_frame (
ModuleFrame* frame_metadata,
jungfrau_packet& packet_buffer)
{
frame_metadata->frame_index = packet_buffer.framenum;
frame_metadata->pulse_id = packet_buffer.bunchid;
frame_metadata->daq_rec = packet_buffer.debug;
}
inline void UdpRecvModule::reserve_next_frame_buffers(
ModuleFrame*& frame_metadata,
char*& frame_buffer)
{
int slot_id;
if ((slot_id = queue_.reserve()) == -1)
throw runtime_error("Queue is full.");
frame_metadata = queue_.get_metadata_buffer(slot_id);
frame_metadata->pulse_id=0;
frame_metadata->n_received_packets=0;
frame_buffer = queue_.get_data_buffer(slot_id);
memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
}
void UdpRecvModule::receive_thread(const uint16_t udp_port)
{
@@ -38,27 +62,12 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port)
UdpReceiver udp_receiver;
udp_receiver.bind(udp_port);
ModuleFrame* module_frame;
module_frame->pulse_id = 0;
module_frame->n_received_packets = 0;
ModuleFrame* frame_metadata;
char* frame_buffer;
reserve_next_frame_buffers(frame_metadata, frame_buffer);
jungfrau_packet packet_buffer;
auto slot_id = queue_.reserve();
if (slot_id == -1) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[UdpRecvModule::receive_thread]";
err_msg << " Queue is full.";
err_msg << endl;
throw runtime_error(err_msg.str());
}
while (is_receiving_.load(memory_order_relaxed)) {
if (!udp_receiver.receive(
@@ -67,41 +76,16 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port)
continue;
}
// TODO: Horrible. Breake it down into methods.
// First packet for this frame.
if (frame_metadata->pulse_id == 0) {
frame_metadata->frame_index = packet_buffer.framenum;
frame_metadata->pulse_id = packet_buffer.bunchid;
frame_metadata->daq_rec = packet_buffer.debug;
// Packet from new frame, while we lost the last packet of
// previous frame.
init_frame(frame_metadata, packet_buffer);
// Happens if the last packet from the previous frame gets lost.
} else if (frame_metadata->pulse_id != packet_buffer.bunchid) {
ring_buffer_.commit(metadata);
queue_.commit();
reserve_next_frame_buffers(frame_metadata, frame_buffer);
metadata = make_shared<UdpFrameMetadata>();
metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME;
metadata->pulse_id = 0;
metadata->n_recv_packets = 0;
frame_buffer = ring_buffer_.reserve(metadata);
if (frame_buffer == nullptr) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[UdpRecvModule::receive_thread]";
err_msg << " Ring buffer is full.";
err_msg << endl;
throw runtime_error(err_msg.str());
}
memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
frame_metadata->frame_index = packet_buffer.framenum;
frame_metadata->pulse_id = packet_buffer.bunchid;
frame_metadata->daq_rec = packet_buffer.debug;
init_frame(frame_metadata, packet_buffer);
}
size_t frame_buffer_offset =
@@ -112,32 +96,13 @@ void UdpRecvModule::receive_thread(const uint16_t udp_port)
packet_buffer.data,
JUNGFRAU_DATA_BYTES_PER_PACKET);
frame_metadata->n_recv_packets++;
frame_metadata->n_received_packets++;
// Frame finished with last packet.
// Last frame packet received. Frame finished.
if (packet_buffer.packetnum == JUNGFRAU_N_PACKETS_PER_FRAME-1)
{
ring_buffer_.commit(metadata);
metadata = make_shared<UdpFrameMetadata>();
metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME;
metadata->pulse_id = 0;
metadata->n_recv_packets = 0;
frame_buffer = ring_buffer_.reserve(metadata);
if (frame_buffer == nullptr) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[UdpRecvModule::receive_thread]";
err_msg << " Ring buffer is full.";
err_msg << endl;
throw runtime_error(err_msg.str());
}
memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
queue_.commit();
reserve_next_frame_buffers(frame_metadata, frame_buffer);
}
}