mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-03 23:34:14 +02:00
First stab of sf_buffer
This commit is contained in:
+53
-68
@@ -3,6 +3,9 @@
|
||||
#include <RingBuffer.hpp>
|
||||
#include <sys/socket.h>
|
||||
#include <netinet/in.h>
|
||||
#include <UdpReceiver.hpp>
|
||||
#include <UdpRecvModule.hpp>
|
||||
#include <H5Writer.hpp>
|
||||
|
||||
#include "config.hpp"
|
||||
#include "jungfrau.hpp"
|
||||
@@ -27,88 +30,70 @@ int main (int argc, char *argv[]) {
|
||||
int udp_port = atoi(argv[2]);
|
||||
string root_folder = string(argv[3]);
|
||||
|
||||
auto socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
if (socket_fd < 0) {
|
||||
throw runtime_error("Cannot open socket.");
|
||||
}
|
||||
|
||||
sockaddr_in server_address = {0};
|
||||
server_address.sin_family = AF_INET;
|
||||
server_address.sin_addr.s_addr = INADDR_ANY;
|
||||
server_address.sin_port = htons(udp_port);
|
||||
|
||||
if(bind(socket_fd,
|
||||
reinterpret_cast<const sockaddr *>(&server_address),
|
||||
sizeof(server_address)) < 0) {
|
||||
throw runtime_error("Cannot bind socket.");
|
||||
}
|
||||
|
||||
struct timeval udp_socket_timeout;
|
||||
udp_socket_timeout.tv_sec = 0;
|
||||
udp_socket_timeout.tv_usec = 100;
|
||||
|
||||
setsockopt(
|
||||
socket_fd,
|
||||
SOL_SOCKET,
|
||||
SO_RCVTIMEO,
|
||||
(const char*)&udp_socket_timeout,
|
||||
sizeof(struct timeval));
|
||||
|
||||
RingBuffer<UdpFrameMetadata> ring_buffer(config::ring_buffer_n_slots);
|
||||
ring_buffer.initialize(JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
|
||||
jungfrau_packet packet;
|
||||
UdpRecvModule udp_module(ring_buffer);
|
||||
udp_module.start_recv(udp_port, JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
|
||||
shared_ptr<UdpFrameMetadata> metadata = nullptr;
|
||||
char* frame_buffer = nullptr;
|
||||
H5Writer writer("output.h5");
|
||||
writer.create_file();
|
||||
|
||||
while (true) {
|
||||
auto data_len = recv(socket_fd, &packet, JUNGFRAU_BYTES_PER_PACKET, 0);
|
||||
auto data = ring_buffer.read();
|
||||
|
||||
if (data_len < 0) {
|
||||
if (data.first == nullptr) {
|
||||
this_thread::sleep_for(chrono::milliseconds(10));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (data_len != JUNGFRAU_BYTES_PER_PACKET) {
|
||||
cout << "Invalid packet length " << data_len << endl;
|
||||
continue;
|
||||
}
|
||||
auto pulse_id = data.first->pulse_id;
|
||||
// TODO: Make this modulo of pulse_id.
|
||||
auto file_frame_index = 0;
|
||||
|
||||
auto* current_metadata = metadata.get();
|
||||
writer.write_data(
|
||||
"pulse_id",
|
||||
file_frame_index,
|
||||
(char*)(&data.first->pulse_id),
|
||||
{1},
|
||||
8,
|
||||
"uint64",
|
||||
"little");
|
||||
|
||||
if (packet.framenum != current_metadata->frame_index) {
|
||||
if (frame_buffer != nullptr) {
|
||||
ring_buffer.commit(metadata);
|
||||
}
|
||||
writer.write_data(
|
||||
"frame_id",
|
||||
file_frame_index,
|
||||
(char*)(&data.first->frame_index),
|
||||
{1},
|
||||
8,
|
||||
"uint64",
|
||||
"little");
|
||||
|
||||
metadata = make_shared<UdpFrameMetadata>();
|
||||
current_metadata = metadata.get();
|
||||
writer.write_data(
|
||||
"daq_rec",
|
||||
file_frame_index,
|
||||
(char*)(&data.first->daq_rec),
|
||||
{1},
|
||||
8,
|
||||
"uint32",
|
||||
"little");
|
||||
|
||||
current_metadata->frame_index = packet.framenum;
|
||||
current_metadata->pulse_id = packet.bunchid;
|
||||
current_metadata->frame_bytes_size = JUNGFRAU_DATA_BYTES_PER_FRAME;
|
||||
current_metadata->recv_packets_1 = 0;
|
||||
current_metadata->recv_packets_2 = 0;
|
||||
writer.write_data(
|
||||
"recv_packets_1",
|
||||
file_frame_index,
|
||||
(char*)(&data.first->recv_packets_1),
|
||||
{1},
|
||||
8,
|
||||
"uint64",
|
||||
"little");
|
||||
|
||||
frame_buffer = ring_buffer.reserve(metadata);
|
||||
memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
}
|
||||
|
||||
size_t frame_buffer_offset =
|
||||
JUNGFRAU_DATA_BYTES_PER_PACKET * packet.packetnum;
|
||||
|
||||
memcpy(
|
||||
(void *)frame_buffer[frame_buffer_offset],
|
||||
packet.data,
|
||||
JUNGFRAU_DATA_BYTES_PER_PACKET);
|
||||
|
||||
if (packet.packetnum < 64) {
|
||||
current_metadata->recv_packets_1 ^=
|
||||
(uint64_t)1 << packet.packetnum;
|
||||
} else {
|
||||
current_metadata->recv_packets_2 ^=
|
||||
(uint64_t)1 << (packet.packetnum - 64);
|
||||
}
|
||||
writer.write_data(
|
||||
"recv_packets_2",
|
||||
file_frame_index,
|
||||
(char*)(&data.first->recv_packets_2),
|
||||
{1},
|
||||
8,
|
||||
"uint64",
|
||||
"little");
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user