diff --git a/sf-buffer/src/sf_buffer.cpp b/sf-buffer/src/sf_buffer.cpp index 2a9f443..4bd6f0a 100644 --- a/sf-buffer/src/sf_buffer.cpp +++ b/sf-buffer/src/sf_buffer.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include "zmq.h" #include "buffer_config.hpp" #include "jungfrau.hpp" @@ -50,9 +51,9 @@ int main (int argc, char *argv[]) { if (zmq_bind(socket, ipc_address.c_str()) != 0) throw runtime_error(strerror (errno)); - RingBuffer ring_buffer(BUFFER_RB_SIZE); + FastQueue queue(MODULE_N_BYTES, BUFFER_INTERNAL_QUEUE_SIZE); - UdpRecvModule udp_module(ring_buffer); + UdpRecvModule udp_module(queue); udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME); uint64_t stats_counter(0); @@ -70,61 +71,56 @@ int main (int argc, char *argv[]) { writer.add_scalar_metadata("received_packets"); while (true) { - auto data = ring_buffer.read(); + auto slot_id = queue.read(); - if (data.first == nullptr) { - this_thread::sleep_for(chrono::milliseconds(10)); + if (slot_id == -1){ + this_thread::sleep_for(chrono::milliseconds(BUFFER_QUEUE_RETRY_MS)); continue; } - auto pulse_id = data.first->pulse_id; + ModuleFrame* metadata = queue.get_metadata_buffer(slot_id); + char* data = queue.get_data_buffer(slot_id); + + auto pulse_id = metadata->pulse_id; writer.set_pulse_id(pulse_id); - writer.write_data(data.second); + writer.write_data(data); // TODO: Combine all this into 1 struct. writer.write_scalar_metadata( - "pulse_id", &(data.first->pulse_id)); + "pulse_id", &(metadata->pulse_id)); writer.write_scalar_metadata( "frame_id", - &(data.first->frame_index)); + &(metadata->frame_index)); writer.write_scalar_metadata( "daq_rec", - &(data.first->daq_rec)); + &(metadata->daq_rec)); writer.write_scalar_metadata( "received_packets", - &(data.first->n_recv_packets)); - - ModuleFrame metadata = { - metadata.pulse_id, - metadata.frame_index, - metadata.daq_rec, - metadata.n_received_packets, - (uint16_t) source_id - }; + &(metadata->n_received_packets)); zmq_send(socket, - &metadata, + metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); zmq_send(socket, - data.second, + data, MODULE_N_BYTES, 0); - ring_buffer.release(data.first->buffer_slot_index); + queue.release(); // TODO: Make real statistics, please. stats_counter++; - if (data.first->n_recv_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { + if (metadata->n_received_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { n_missed_packets += - JUNGFRAU_N_PACKETS_PER_FRAME - data.first->n_recv_packets; + JUNGFRAU_N_PACKETS_PER_FRAME - metadata->n_received_packets; } if (last_pulse_id>0) {