From 9b787e226499220d5f1c7a8da6347468c8352b4f Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 15 Apr 2020 15:31:31 +0200 Subject: [PATCH] First stab of sf_buffer --- sf-buffer/sf_buffer.cpp | 121 ++++++++++++++++++---------------------- 1 file changed, 53 insertions(+), 68 deletions(-) diff --git a/sf-buffer/sf_buffer.cpp b/sf-buffer/sf_buffer.cpp index 2f43df1..8cbf20b 100644 --- a/sf-buffer/sf_buffer.cpp +++ b/sf-buffer/sf_buffer.cpp @@ -3,6 +3,9 @@ #include #include #include +#include +#include +#include #include "config.hpp" #include "jungfrau.hpp" @@ -27,88 +30,70 @@ int main (int argc, char *argv[]) { int udp_port = atoi(argv[2]); string root_folder = string(argv[3]); - auto socket_fd = socket(AF_INET, SOCK_DGRAM, 0); - if (socket_fd < 0) { - throw runtime_error("Cannot open socket."); - } - - sockaddr_in server_address = {0}; - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(udp_port); - - if(bind(socket_fd, - reinterpret_cast(&server_address), - sizeof(server_address)) < 0) { - throw runtime_error("Cannot bind socket."); - } - - struct timeval udp_socket_timeout; - udp_socket_timeout.tv_sec = 0; - udp_socket_timeout.tv_usec = 100; - - setsockopt( - socket_fd, - SOL_SOCKET, - SO_RCVTIMEO, - (const char*)&udp_socket_timeout, - sizeof(struct timeval)); - RingBuffer ring_buffer(config::ring_buffer_n_slots); - ring_buffer.initialize(JUNGFRAU_DATA_BYTES_PER_FRAME); - jungfrau_packet packet; + UdpRecvModule udp_module(ring_buffer); + udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME); - shared_ptr metadata = nullptr; - char* frame_buffer = nullptr; + H5Writer writer("output.h5"); + writer.create_file(); while (true) { - auto data_len = recv(socket_fd, &packet, JUNGFRAU_BYTES_PER_PACKET, 0); + auto data = ring_buffer.read(); - if (data_len < 0) { + if (data.first == nullptr) { + this_thread::sleep_for(chrono::milliseconds(10)); continue; } - if (data_len != JUNGFRAU_BYTES_PER_PACKET) { - cout << "Invalid packet length " << data_len << endl; - continue; - } + auto pulse_id = data.first->pulse_id; + // TODO: Make this modulo of pulse_id. + auto file_frame_index = 0; - auto* current_metadata = metadata.get(); + writer.write_data( + "pulse_id", + file_frame_index, + (char*)(&data.first->pulse_id), + {1}, + 8, + "uint64", + "little"); - if (packet.framenum != current_metadata->frame_index) { - if (frame_buffer != nullptr) { - ring_buffer.commit(metadata); - } + writer.write_data( + "frame_id", + file_frame_index, + (char*)(&data.first->frame_index), + {1}, + 8, + "uint64", + "little"); - metadata = make_shared(); - current_metadata = metadata.get(); + writer.write_data( + "daq_rec", + file_frame_index, + (char*)(&data.first->daq_rec), + {1}, + 8, + "uint32", + "little"); - current_metadata->frame_index = packet.framenum; - current_metadata->pulse_id = packet.bunchid; - current_metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME; - current_metadata->recv_packets_1 = 0; - current_metadata->recv_packets_2 = 0; + writer.write_data( + "recv_packets_1", + file_frame_index, + (char*)(&data.first->recv_packets_1), + {1}, + 8, + "uint64", + "little"); - frame_buffer = ring_buffer.reserve(metadata); - memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); - } - - size_t frame_buffer_offset = - JUNGFRAU_DATA_BYTES_PER_PACKET * packet.packetnum; - - memcpy( - (void *)frame_buffer[frame_buffer_offset], - packet.data, - JUNGFRAU_DATA_BYTES_PER_PACKET); - - if (packet.packetnum < 64) { - current_metadata->recv_packets_1 ^= - (uint64_t)1 << packet.packetnum; - } else { - current_metadata->recv_packets_2 ^= - (uint64_t)1 << (packet.packetnum - 64); - } + writer.write_data( + "recv_packets_2", + file_frame_index, + (char*)(&data.first->recv_packets_2), + {1}, + 8, + "uint64", + "little"); } }