mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-03 21:04:14 +02:00
Tests and bugfix
This commit is contained in:
@@ -1,13 +1,13 @@
|
||||
#ifndef JUNGFRAUJOCH_H
|
||||
#define JUNGFRAUJOCH_H
|
||||
#ifndef JUNGFRAUJOCH_HPP
|
||||
#define JUNGFRAUJOCH_HPP
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#define JFJOCH_N_MODULES 32
|
||||
#define JFJOCH_BYTES_PER_PACKET 8240
|
||||
#define JFJOCH_DATA_BYTES_PER_PACKET 8192
|
||||
#define JFJOCH_N_PACKETS_PER_FRAME JFJOCH_N_MODULES * 128
|
||||
#define JFJOCH_DATA_BYTES_PER_FRAME JFJOCH_N_MODULES * 1048576
|
||||
#define JFJOCH_N_PACKETS_PER_FRAME (JFJOCH_N_MODULES * 128)
|
||||
#define JFJOCH_DATA_BYTES_PER_FRAME (JFJOCH_N_MODULES * 1048576)
|
||||
|
||||
// 48 bytes + 8192 bytes = 8240 bytes
|
||||
#pragma pack(push)
|
||||
@@ -17,7 +17,7 @@ struct jfjoch_packet_t {
|
||||
uint32_t exptime;
|
||||
uint32_t packetnum;
|
||||
|
||||
uint64_t bunchid;
|
||||
int64_t bunchid;
|
||||
uint64_t timestamp;
|
||||
|
||||
uint16_t moduleID;
|
||||
|
||||
@@ -9,4 +9,4 @@ set_target_properties(jfj-udp-recv PROPERTIES OUTPUT_NAME jf_udp_recv)
|
||||
target_link_libraries(jfj-udp-recv jfj-udp-recv-lib zmq rt)
|
||||
|
||||
enable_testing()
|
||||
# add_subdirectory(test/)
|
||||
add_subdirectory(test/)
|
||||
|
||||
@@ -32,8 +32,8 @@ public:
|
||||
// ~PacketBuffer() {};
|
||||
|
||||
/**Diagnostics**/
|
||||
size_t size() const { return ( idx_write-idx_read ); }
|
||||
size_t capacity() const { return m_capacity; }
|
||||
int size() const { return ( idx_write-idx_read ); }
|
||||
int capacity() const { return m_capacity; }
|
||||
bool is_full() const { return bool(idx_write >= m_capacity); }
|
||||
bool is_empty() const { return bool(idx_write <= idx_read); }
|
||||
|
||||
@@ -52,6 +52,8 @@ public:
|
||||
void fill_from(TY& recv){
|
||||
std::lock_guard<std::mutex> g_guard(m_mutex);
|
||||
this->idx_write = recv.receive_many(m_msgs, this->capacity());
|
||||
// Returns -1 with errno=11 if no data received
|
||||
if(idx_write==-1){ idx_write = 0; }
|
||||
this->idx_read = 0;
|
||||
}
|
||||
|
||||
@@ -62,8 +64,8 @@ private:
|
||||
/**Guards**/
|
||||
std::mutex m_mutex;
|
||||
/**Read and write index**/
|
||||
size_t idx_write = 0;
|
||||
size_t idx_read = 0;
|
||||
int idx_write = 0;
|
||||
int idx_read = 0;
|
||||
|
||||
// C-structures as expected by <sockets.h>
|
||||
mmsghdr m_msgs[CAPACITY];
|
||||
|
||||
@@ -1,10 +1,17 @@
|
||||
#include <cstring>
|
||||
#include <jungfraujoch.hpp>
|
||||
#include "JfjFrameUdpReceiver.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace buffer_config;
|
||||
|
||||
std::ostream &operator<<(std::ostream &os, jfjoch_packet_t const &packet) {
|
||||
os << "Frame number: " << packet.framenum << std::endl;
|
||||
os << "Packet number: " << packet.packetnum << std::endl;
|
||||
os << "Bunch id: " << packet.bunchid << std::endl;
|
||||
os << std::endl;
|
||||
return os;
|
||||
}
|
||||
|
||||
JfjFrameUdpReceiver::JfjFrameUdpReceiver(const uint16_t port) {
|
||||
m_udp_receiver.bind(port);
|
||||
}
|
||||
@@ -13,7 +20,9 @@ JfjFrameUdpReceiver::~JfjFrameUdpReceiver() {
|
||||
m_udp_receiver.disconnect();
|
||||
}
|
||||
|
||||
inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) {
|
||||
inline void JfjFrameUdpReceiver::init_frame(ModuleFrame& metadata, const jfjoch_packet_t& c_packet) {
|
||||
// std::cout << c_packet;
|
||||
|
||||
metadata.pulse_id = c_packet.bunchid;
|
||||
metadata.frame_index = c_packet.framenum;
|
||||
metadata.daq_rec = (uint64_t) c_packet.debug;
|
||||
@@ -26,6 +35,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char
|
||||
// Happens if the last packet from the previous frame gets lost.
|
||||
if (m_frame_index != m_buffer.peek_front().framenum) {
|
||||
m_frame_index = m_buffer.peek_front().framenum;
|
||||
std::cout << "Peeked pulse: " << metadata.pulse_id << std::endl;
|
||||
return metadata.pulse_id;
|
||||
}
|
||||
|
||||
@@ -48,7 +58,7 @@ inline uint64_t JfjFrameUdpReceiver::process_packets(ModuleFrame& metadata, char
|
||||
}
|
||||
|
||||
// We emptied the buffer.
|
||||
m_buffer.reset();
|
||||
// m_buffer.reset();
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -56,8 +66,7 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr
|
||||
// Reset the metadata and frame buffer for the next frame. (really needed?)
|
||||
metadata.pulse_id = 0;
|
||||
metadata.n_recv_packets = 0;
|
||||
memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_PACKET);
|
||||
|
||||
memset(frame_buffer, 0, JFJOCH_DATA_BYTES_PER_FRAME);
|
||||
// Process leftover packages in the buffer
|
||||
if (!m_buffer.is_empty()) {
|
||||
auto pulse_id = process_packets(metadata, frame_buffer);
|
||||
@@ -66,7 +75,8 @@ uint64_t JfjFrameUdpReceiver::get_frame_from_udp(ModuleFrame& metadata, char* fr
|
||||
|
||||
while (true) {
|
||||
// Receive new packages (pass if none)...
|
||||
m_buffer.fill_from(m_udp_receiver);
|
||||
m_buffer.reset();
|
||||
m_buffer.fill_from(m_udp_receiver);
|
||||
if (m_buffer.is_empty()) { continue; }
|
||||
|
||||
// ... and process them
|
||||
|
||||
@@ -18,8 +18,7 @@ int main (int argc, char *argv[]) {
|
||||
|
||||
if (argc != 3) {
|
||||
cout << endl;
|
||||
cout << "Usage: jfj_udp_recv [detector_json_filename]";
|
||||
cout << endl;
|
||||
cout << "Usage: jfj_udp_recv [detector_json_filename]" << endl;
|
||||
cout << "\tdetector_json_filename: detector config file path." << endl;
|
||||
cout << endl;
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
add_executable(jf-udp-recv-tests main.cpp)
|
||||
add_executable(jfj-udp-recv-tests main.cpp)
|
||||
|
||||
target_link_libraries(jf-udp-recv-tests
|
||||
target_link_libraries(jfj-udp-recv-tests
|
||||
core-buffer-lib
|
||||
jf-udp-recv-lib
|
||||
jfj-udp-recv-lib
|
||||
gtest
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#include <netinet/in.h>
|
||||
#include <jungfrau.hpp>
|
||||
#include <jungfraujoch.hpp>
|
||||
#include "gtest/gtest.h"
|
||||
#include "FrameUdpReceiver.hpp"
|
||||
#include "JfjFrameUdpReceiver.hpp"
|
||||
#include "mock/udp.hpp"
|
||||
|
||||
#include <thread>
|
||||
@@ -12,7 +12,7 @@ using namespace std;
|
||||
|
||||
TEST(BufferUdpReceiver, simple_recv)
|
||||
{
|
||||
auto n_packets = JF_N_PACKETS_PER_FRAME;
|
||||
int n_packets = JFJOCH_N_PACKETS_PER_FRAME;
|
||||
int n_frames = 5;
|
||||
|
||||
uint16_t udp_port = MOCK_UDP_PORT;
|
||||
@@ -23,9 +23,9 @@ TEST(BufferUdpReceiver, simple_recv)
|
||||
JfjFrameUdpReceiver udp_receiver(udp_port);
|
||||
|
||||
auto handle = async(launch::async, [&](){
|
||||
for (int i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
|
||||
jungfrau_packet send_udp_buffer;
|
||||
jfjoch_packet_t send_udp_buffer;
|
||||
send_udp_buffer.packetnum = i_packet;
|
||||
send_udp_buffer.bunchid = i_frame + 1;
|
||||
send_udp_buffer.framenum = i_frame + 1000;
|
||||
@@ -34,7 +34,7 @@ TEST(BufferUdpReceiver, simple_recv)
|
||||
::sendto(
|
||||
send_socket_fd,
|
||||
&send_udp_buffer,
|
||||
JUNGFRAU_BYTES_PER_PACKET,
|
||||
JFJOCH_BYTES_PER_PACKET,
|
||||
0,
|
||||
(sockaddr*) &server_address,
|
||||
sizeof(server_address));
|
||||
@@ -45,7 +45,7 @@ TEST(BufferUdpReceiver, simple_recv)
|
||||
handle.wait();
|
||||
|
||||
ModuleFrame metadata;
|
||||
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
|
||||
|
||||
for (int i_frame=0; i_frame < n_frames; i_frame++) {
|
||||
auto pulse_id = udp_receiver.get_frame_from_udp(
|
||||
@@ -63,7 +63,7 @@ TEST(BufferUdpReceiver, simple_recv)
|
||||
|
||||
TEST(BufferUdpReceiver, missing_middle_packet)
|
||||
{
|
||||
auto n_packets = JF_N_PACKETS_PER_FRAME;
|
||||
int n_packets = JFJOCH_N_PACKETS_PER_FRAME;
|
||||
int n_frames = 3;
|
||||
|
||||
uint16_t udp_port = MOCK_UDP_PORT;
|
||||
@@ -71,17 +71,17 @@ TEST(BufferUdpReceiver, missing_middle_packet)
|
||||
auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0);
|
||||
ASSERT_TRUE(send_socket_fd >= 0);
|
||||
|
||||
JfjFrameUdpReceiver udp_receiver(udp_port, source_id);
|
||||
JfjFrameUdpReceiver udp_receiver(udp_port);
|
||||
|
||||
auto handle = async(launch::async, [&](){
|
||||
for (int i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
|
||||
// Skip some random middle packet.
|
||||
if (i_packet == 10) {
|
||||
continue;
|
||||
}
|
||||
|
||||
jungfrau_packet send_udp_buffer;
|
||||
jfjoch_packet_t send_udp_buffer;
|
||||
send_udp_buffer.packetnum = i_packet;
|
||||
send_udp_buffer.bunchid = i_frame + 1;
|
||||
send_udp_buffer.framenum = i_frame + 1000;
|
||||
@@ -90,7 +90,7 @@ TEST(BufferUdpReceiver, missing_middle_packet)
|
||||
::sendto(
|
||||
send_socket_fd,
|
||||
&send_udp_buffer,
|
||||
JUNGFRAU_BYTES_PER_PACKET,
|
||||
JFJOCH_BYTES_PER_PACKET,
|
||||
0,
|
||||
(sockaddr*) &server_address,
|
||||
sizeof(server_address));
|
||||
@@ -101,7 +101,7 @@ TEST(BufferUdpReceiver, missing_middle_packet)
|
||||
handle.wait();
|
||||
|
||||
ModuleFrame metadata;
|
||||
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
|
||||
|
||||
for (int i_frame=0; i_frame < n_frames; i_frame++) {
|
||||
auto pulse_id = udp_receiver.get_frame_from_udp(
|
||||
@@ -119,7 +119,7 @@ TEST(BufferUdpReceiver, missing_middle_packet)
|
||||
|
||||
TEST(BufferUdpReceiver, missing_first_packet)
|
||||
{
|
||||
auto n_packets = JF_N_PACKETS_PER_FRAME;
|
||||
auto n_packets = JFJOCH_N_PACKETS_PER_FRAME;
|
||||
int n_frames = 3;
|
||||
|
||||
uint16_t udp_port = MOCK_UDP_PORT;
|
||||
@@ -130,14 +130,14 @@ TEST(BufferUdpReceiver, missing_first_packet)
|
||||
JfjFrameUdpReceiver udp_receiver(udp_port);
|
||||
|
||||
auto handle = async(launch::async, [&](){
|
||||
for (int i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
|
||||
// Skip first packet.
|
||||
if (i_packet == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
jungfrau_packet send_udp_buffer;
|
||||
jfjoch_packet_t send_udp_buffer;
|
||||
send_udp_buffer.packetnum = i_packet;
|
||||
send_udp_buffer.bunchid = i_frame + 1;
|
||||
send_udp_buffer.framenum = i_frame + 1000;
|
||||
@@ -157,7 +157,7 @@ TEST(BufferUdpReceiver, missing_first_packet)
|
||||
handle.wait();
|
||||
|
||||
ModuleFrame metadata;
|
||||
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
|
||||
|
||||
for (int i_frame=0; i_frame < n_frames; i_frame++) {
|
||||
auto pulse_id = udp_receiver.get_frame_from_udp(
|
||||
@@ -175,7 +175,7 @@ TEST(BufferUdpReceiver, missing_first_packet)
|
||||
|
||||
TEST(BufferUdpReceiver, missing_last_packet)
|
||||
{
|
||||
auto n_packets = JF_N_PACKETS_PER_FRAME;
|
||||
int n_packets = JFJOCH_N_PACKETS_PER_FRAME;
|
||||
int n_frames = 3;
|
||||
|
||||
uint16_t udp_port = MOCK_UDP_PORT;
|
||||
@@ -186,14 +186,14 @@ TEST(BufferUdpReceiver, missing_last_packet)
|
||||
JfjFrameUdpReceiver udp_receiver(udp_port);
|
||||
|
||||
auto handle = async(launch::async, [&](){
|
||||
for (int i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (int64_t i_frame=0; i_frame < n_frames; i_frame++){
|
||||
for (size_t i_packet=0; i_packet<n_packets; i_packet++) {
|
||||
// Skip the last packet.
|
||||
if (i_packet == n_packets-1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
jungfrau_packet send_udp_buffer;
|
||||
jfjoch_packet_t send_udp_buffer;
|
||||
send_udp_buffer.packetnum = i_packet;
|
||||
send_udp_buffer.bunchid = i_frame + 1;
|
||||
send_udp_buffer.framenum = i_frame + 1000;
|
||||
@@ -213,7 +213,7 @@ TEST(BufferUdpReceiver, missing_last_packet)
|
||||
handle.wait();
|
||||
|
||||
ModuleFrame metadata;
|
||||
auto frame_buffer = make_unique<char[]>(JUNGFRAU_DATA_BYTES_PER_FRAME);
|
||||
auto frame_buffer = make_unique<char[]>(JFJOCH_DATA_BYTES_PER_FRAME);
|
||||
|
||||
// n_frames -1 because the last frame is not complete.
|
||||
for (int i_frame=0; i_frame < n_frames - 1; i_frame++) {
|
||||
|
||||
Reference in New Issue
Block a user