From 6d210ed70294878e85ad2d5b6c5b289c844d2f1e Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Wed, 20 May 2020 12:22:20 +0200 Subject: [PATCH] Finished refactoring sf-buffer --- sf-buffer/CMakeLists.txt | 20 ++ sf-buffer/include/BufferBinaryFormat.hpp | 23 ++ sf-buffer/include/BufferBinaryWriter.hpp | 32 +++ sf-buffer/include/BufferUdpReceiver.hpp | 36 ++++ sf-buffer/include/UdpReceiver.hpp | 22 ++ sf-buffer/include/WriterUtils.hpp | 11 + sf-buffer/src/BufferBinaryWriter.cpp | 124 +++++++++++ sf-buffer/src/BufferH5Writer.cpp | 126 +++++++++++ sf-buffer/src/BufferUdpReceiver.cpp | 137 ++++++++++++ sf-buffer/src/UdpReceiver.cpp | 89 ++++++++ sf-buffer/src/WriterUtils.cpp | 49 +++++ sf-buffer/src/main.cpp | 150 +++++++++++++ sf-buffer/test/CMakeLists.txt | 12 ++ sf-buffer/test/main.cpp | 12 ++ sf-buffer/test/test_BufferBinaryWriter.cpp | 86 ++++++++ sf-buffer/test/test_BufferH5Writer.cpp | 130 +++++++++++ sf-buffer/test/test_BufferUdpReceiver.cpp | 239 +++++++++++++++++++++ sf-buffer/test/test_UdpReceiver.cpp | 170 +++++++++++++++ 18 files changed, 1468 insertions(+) create mode 100644 sf-buffer/CMakeLists.txt create mode 100644 sf-buffer/include/BufferBinaryFormat.hpp create mode 100644 sf-buffer/include/BufferBinaryWriter.hpp create mode 100644 sf-buffer/include/BufferUdpReceiver.hpp create mode 100644 sf-buffer/include/UdpReceiver.hpp create mode 100644 sf-buffer/include/WriterUtils.hpp create mode 100644 sf-buffer/src/BufferBinaryWriter.cpp create mode 100644 sf-buffer/src/BufferH5Writer.cpp create mode 100644 sf-buffer/src/BufferUdpReceiver.cpp create mode 100644 sf-buffer/src/UdpReceiver.cpp create mode 100644 sf-buffer/src/WriterUtils.cpp create mode 100644 sf-buffer/src/main.cpp create mode 100644 sf-buffer/test/CMakeLists.txt create mode 100644 sf-buffer/test/main.cpp create mode 100644 sf-buffer/test/test_BufferBinaryWriter.cpp create mode 100644 sf-buffer/test/test_BufferH5Writer.cpp create mode 100644 sf-buffer/test/test_BufferUdpReceiver.cpp create mode 100644 sf-buffer/test/test_UdpReceiver.cpp diff --git a/sf-buffer/CMakeLists.txt b/sf-buffer/CMakeLists.txt new file mode 100644 index 0000000..c4c74ed --- /dev/null +++ b/sf-buffer/CMakeLists.txt @@ -0,0 +1,20 @@ +file(GLOB SOURCES + src/*.cpp) + +add_library(sf-buffer-lib STATIC ${SOURCES}) +target_include_directories(sf-buffer-lib PUBLIC include/) +target_link_libraries(sf-buffer-lib + external + core-buffer-lib) + +add_executable(sf-buffer src/main.cpp) +set_target_properties(sf-buffer PROPERTIES OUTPUT_NAME sf_buffer) +target_link_libraries(sf-buffer + core-buffer-lib + sf-buffer-lib + zmq + hdf5 + hdf5_cpp) + +enable_testing() +add_subdirectory(test/) \ No newline at end of file diff --git a/sf-buffer/include/BufferBinaryFormat.hpp b/sf-buffer/include/BufferBinaryFormat.hpp new file mode 100644 index 0000000..d9f72cd --- /dev/null +++ b/sf-buffer/include/BufferBinaryFormat.hpp @@ -0,0 +1,23 @@ +#ifndef JFFILEFORMAT_HPP +#define JFFILEFORMAT_HPP + +#include "jungfrau.hpp" + +const char JF_FORMAT_START_BYTE = 0xBE; + +#pragma pack(push) +#pragma pack(1) +struct BufferBinaryFormat { + + BufferBinaryFormat() : FORMAT_MARKER(JF_FORMAT_START_BYTE) {}; + + const char FORMAT_MARKER; + uint64_t pulse_id; + uint64_t frame_id; + uint32_t daq_rec; + uint16_t n_recv_packets; + char data[JUNGFRAU_DATA_BYTES_PER_FRAME]; +}; +#pragma pack(pop) + +#endif // JFFILEFORMAT_HPP \ No newline at end of file diff --git a/sf-buffer/include/BufferBinaryWriter.hpp b/sf-buffer/include/BufferBinaryWriter.hpp new file mode 100644 index 0000000..ee90afb --- /dev/null +++ b/sf-buffer/include/BufferBinaryWriter.hpp @@ -0,0 +1,32 @@ +#ifndef BINARYWRITER_HPP +#define BINARYWRITER_HPP + +#include +#include "BufferBinaryFormat.hpp" + +class BufferBinaryWriter { + + const std::string device_name_; + const std::string root_folder_; + std::string latest_filename_; + + std::string current_output_filename_; + int output_file_fd_; + + void open_file(const std::string& filename); + void close_current_file(); + + +public: + BufferBinaryWriter( + const std::string& device_name, + const std::string& root_folder); + + virtual ~BufferBinaryWriter(); + + void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer); + +}; + + +#endif //BINARYWRITER_HPP diff --git a/sf-buffer/include/BufferUdpReceiver.hpp b/sf-buffer/include/BufferUdpReceiver.hpp new file mode 100644 index 0000000..2d97a47 --- /dev/null +++ b/sf-buffer/include/BufferUdpReceiver.hpp @@ -0,0 +1,36 @@ +#ifndef SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP +#define SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP + +#include +#include "UdpReceiver.hpp" +#include "jungfrau.hpp" +#include "buffer_config.hpp" + +class BufferUdpReceiver { + const int source_id_; + + UdpReceiver udp_receiver_; + + jungfrau_packet packet_buffer_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + iovec recv_buff_ptr_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + mmsghdr msgs_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + sockaddr_in sock_from_[core_buffer::BUFFER_UDP_N_RECV_MSG]; + + bool packet_buffer_loaded_ = false; + int packet_buffer_n_packets_ = 0; + int packet_buffer_offset_ = 0; + + inline void init_frame(ModuleFrame& frame_metadata, const int i_packet); + inline void copy_packet_to_buffers( + ModuleFrame& metadata, char* frame_buffer, const int i_packet); + inline uint64_t process_packets( + const int n_packets, ModuleFrame& metadata, char* frame_buffer); + +public: + BufferUdpReceiver(const uint16_t port, const int source_id); + virtual ~BufferUdpReceiver(); + uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer); +}; + + +#endif //SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP diff --git a/sf-buffer/include/UdpReceiver.hpp b/sf-buffer/include/UdpReceiver.hpp new file mode 100644 index 0000000..0bea8ef --- /dev/null +++ b/sf-buffer/include/UdpReceiver.hpp @@ -0,0 +1,22 @@ +#ifndef UDPRECEIVER_H +#define UDPRECEIVER_H + +#include + +class UdpReceiver { + + int socket_fd_; + +public: + UdpReceiver(); + virtual ~UdpReceiver(); + + bool receive(void* buffer, const size_t buffer_n_bytes); + int receive_many(mmsghdr* msgs, const size_t n_msgs); + + void bind(const uint16_t port); + void disconnect(); +}; + + +#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H diff --git a/sf-buffer/include/WriterUtils.hpp b/sf-buffer/include/WriterUtils.hpp new file mode 100644 index 0000000..266518f --- /dev/null +++ b/sf-buffer/include/WriterUtils.hpp @@ -0,0 +1,11 @@ +#ifndef WRITERUTILS_H +#define WRITERUTILS_H + +#include + +namespace WriterUtils { + void set_process_effective_id(int user_id); + void create_destination_folder(const std::string& output_file); +} + +#endif // WRITERUTILS_H diff --git a/sf-buffer/src/BufferBinaryWriter.cpp b/sf-buffer/src/BufferBinaryWriter.cpp new file mode 100644 index 0000000..750ef76 --- /dev/null +++ b/sf-buffer/src/BufferBinaryWriter.cpp @@ -0,0 +1,124 @@ +#include "BufferBinaryWriter.hpp" +#include +#include +#include "date.h" +#include +#include +#include +#include +#include +#include + +using namespace std; + +BufferBinaryWriter::BufferBinaryWriter( + const string& device_name, + const string& root_folder) : + device_name_(device_name), + root_folder_(root_folder), + latest_filename_(root_folder + "/" + device_name + "/LATEST"), + current_output_filename_(""), + output_file_fd_(-1) +{ +} + +BufferBinaryWriter::~BufferBinaryWriter() +{ + close_current_file(); +} + +void BufferBinaryWriter::write(uint64_t pulse_id, const BufferBinaryFormat* buffer) +{ + auto current_frame_file = + BufferUtils::get_filename(root_folder_, device_name_, pulse_id); + + if (current_frame_file != current_output_filename_) { + open_file(current_frame_file); + } + + size_t n_bytes_offset = + BufferUtils::get_file_frame_index(pulse_id) * sizeof(BufferBinaryFormat); + + auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET); + if (lseek_result < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BinaryWriter::write]"; + err_msg << " Error while lseek on file "; + err_msg << current_output_filename_; + err_msg << " for n_bytes_offset "; + err_msg << n_bytes_offset << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + auto n_bytes = ::write(output_file_fd_, buffer, sizeof(BufferBinaryFormat)); + if (n_bytes < sizeof(BufferBinaryFormat)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BinaryWriter::write]"; + err_msg << " Error while writing to file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } +} + +void BufferBinaryWriter::open_file(const std::string& filename) +{ + close_current_file(); + + WriterUtils::create_destination_folder(filename); + + output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT, + S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH); + if (output_file_fd_ < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BinaryWriter::open_file]"; + err_msg << " Cannot create file "; + err_msg << filename << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + current_output_filename_ = filename; +} + +void BufferBinaryWriter::close_current_file() +{ + if (output_file_fd_ != -1) { + if (close(output_file_fd_) < 0) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[BinaryWriter::close_current_file]"; + err_msg << " Error while closing file "; + err_msg << current_output_filename_ << ": "; + err_msg << strerror(errno) << endl; + + throw runtime_error(err_msg.str()); + } + + output_file_fd_ = -1; + + BufferUtils::update_latest_file( + latest_filename_, current_output_filename_); + } + + current_output_filename_ = ""; +} \ No newline at end of file diff --git a/sf-buffer/src/BufferH5Writer.cpp b/sf-buffer/src/BufferH5Writer.cpp new file mode 100644 index 0000000..ed7ce39 --- /dev/null +++ b/sf-buffer/src/BufferH5Writer.cpp @@ -0,0 +1,126 @@ +#include +#include "BufferH5Writer.hpp" +#include +#include +#include + +extern "C" +{ + #include "H5DOpublic.h" +} + +using namespace std; +using namespace core_buffer; + +BufferH5Writer::BufferH5Writer( + const string& root_folder, + const string& device_name) : + root_folder_(root_folder), + device_name_(device_name), + LATEST_filename_(root_folder + "/" + device_name + "/LATEST"), + CURRENT_filename_(root_folder + "/" + device_name + "/CURRENT"), + output_filename_(""), + current_pulse_id_(0), + current_file_index_(0) +{ +} + +void BufferH5Writer::create_file(const string& filename) +{ + + h5_file_ = H5::H5File(filename, H5F_ACC_TRUNC); + + output_filename_ = filename; + + H5::DataSpace data_dspace(3, data_disk_dims, data_disk_dims); + H5::DSetCreatPropList data_dset_prop; + hsize_t data_dset_chunking[3] = {1, MODULE_Y_SIZE, MODULE_X_SIZE}; + data_dset_prop.setChunk(3, data_dset_chunking); + + current_image_dataset_ = h5_file_.createDataSet( + BUFFER_H5_FRAME_DATASET, + H5::PredType::NATIVE_UINT16, + data_dspace, + data_dset_prop); + + H5::DataSpace meta_dspace(2, meta_disk_dims, meta_disk_dims); + H5::DSetCreatPropList meta_dset_prop; + hsize_t meta_dset_chunking[2] = {1, ModuleFrame_N_FIELDS}; + meta_dset_prop.setChunk(2, meta_dset_chunking); + + current_metadata_dataset_ = h5_file_.createDataSet( + BUFFER_H5_METADATA_DATASET, + H5::PredType::NATIVE_UINT64, + meta_dspace, + meta_dset_prop); +} + +BufferH5Writer::~BufferH5Writer() +{ + close_file(); +} + +void BufferH5Writer::close_file() { + current_image_dataset_.close(); + current_metadata_dataset_.close(); + + h5_file_.close(); + output_filename_ = ""; + + current_pulse_id_ = 0; + current_file_index_ = 0; +} + +void BufferH5Writer::set_pulse_id(const uint64_t pulse_id) +{ + current_pulse_id_ = pulse_id; + current_file_index_ = BufferUtils::get_file_frame_index(pulse_id); + + auto new_output_filename = BufferUtils::get_filename( + root_folder_, device_name_, pulse_id); + + if (new_output_filename != output_filename_){ + + if (h5_file_.getId() != -1) { + auto latest_filename = output_filename_; + close_file(); + BufferUtils::update_latest_file(LATEST_filename_, latest_filename); + } + + WriterUtils::create_destination_folder(new_output_filename); + create_file(new_output_filename); + + BufferUtils::update_latest_file(CURRENT_filename_, output_filename_); + } +} + +void BufferH5Writer::write(const ModuleFrame* metadata, const char* data) +{ + hsize_t meta_buff_dims[1] = {ModuleFrame_N_FIELDS}; + H5::DataSpace meta_buffer_space (1, meta_buff_dims); + + H5::DataSpace meta_disk_space(2, meta_disk_dims); + hsize_t meta_count[] = {1, ModuleFrame_N_FIELDS}; + hsize_t meta_start[] = {current_file_index_, 0}; + meta_disk_space.selectHyperslab(H5S_SELECT_SET, meta_count, meta_start); + + current_metadata_dataset_.write( + (char*) metadata, + H5::PredType::NATIVE_UINT64, + meta_buffer_space, + meta_disk_space); + + hsize_t data_buff_dims[2] = {MODULE_Y_SIZE, MODULE_X_SIZE}; + H5::DataSpace data_buffer_space (2, data_buff_dims); + + H5::DataSpace data_disk_space(3, data_disk_dims); + hsize_t data_count[] = {1, MODULE_Y_SIZE, MODULE_X_SIZE}; + hsize_t data_start[] = {current_file_index_, 0, 0}; + data_disk_space.selectHyperslab(H5S_SELECT_SET, data_count, data_start); + + current_image_dataset_.write( + data, + H5::PredType::NATIVE_UINT16, + data_buffer_space, + data_disk_space); +} diff --git a/sf-buffer/src/BufferUdpReceiver.cpp b/sf-buffer/src/BufferUdpReceiver.cpp new file mode 100644 index 0000000..68b51dc --- /dev/null +++ b/sf-buffer/src/BufferUdpReceiver.cpp @@ -0,0 +1,137 @@ +#include +#include +#include "BufferUdpReceiver.hpp" + +using namespace std; +using namespace core_buffer; + +BufferUdpReceiver::BufferUdpReceiver( + const uint16_t port, + const int source_id) : + source_id_(source_id) +{ + udp_receiver_.bind(port); + + for (int i = 0; i < BUFFER_UDP_N_RECV_MSG; i++) { + recv_buff_ptr_[i].iov_base = (void*) &(packet_buffer_[i]); + recv_buff_ptr_[i].iov_len = sizeof(jungfrau_packet); + + msgs_[i].msg_hdr.msg_iov = &recv_buff_ptr_[i]; + msgs_[i].msg_hdr.msg_iovlen = 1; + msgs_[i].msg_hdr.msg_name = &sock_from_[i]; + msgs_[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } +} + +BufferUdpReceiver::~BufferUdpReceiver() { + udp_receiver_.disconnect(); +} + +inline void BufferUdpReceiver::init_frame( + ModuleFrame& frame_metadata, const int i_packet) +{ + frame_metadata.pulse_id = packet_buffer_[i_packet].bunchid; + frame_metadata.frame_index = packet_buffer_[i_packet].framenum; + frame_metadata.daq_rec = (uint64_t) packet_buffer_[i_packet].debug; + frame_metadata.module_id = (int64_t) source_id_; +} + +inline void BufferUdpReceiver::copy_packet_to_buffers( + ModuleFrame& metadata, char* frame_buffer, const int i_packet) +{ + size_t frame_buffer_offset = + JUNGFRAU_DATA_BYTES_PER_PACKET * packet_buffer_[i_packet].packetnum; + memcpy( + (void*) (frame_buffer + frame_buffer_offset), + packet_buffer_[i_packet].data, + JUNGFRAU_DATA_BYTES_PER_PACKET); + + metadata.n_received_packets++; +} + +inline uint64_t BufferUdpReceiver::process_packets( + const int start_offset, + ModuleFrame& metadata, + char* frame_buffer) +{ + for ( + int i_packet=start_offset; + i_packet < packet_buffer_n_packets_; + i_packet++) { + + // First packet for this frame. + if (metadata.pulse_id == 0) { + init_frame(metadata, i_packet); + + // Happens if the last packet from the previous frame gets lost. + } else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) { + packet_buffer_loaded_ = true; + // Continue on this packet. + packet_buffer_offset_ = i_packet; + + return metadata.pulse_id; + } + + copy_packet_to_buffers(metadata, frame_buffer, i_packet); + + // Last frame packet received. Frame finished. + if (packet_buffer_[i_packet].packetnum == + JUNGFRAU_N_PACKETS_PER_FRAME-1) + { + // Buffer is loaded only if this is not the last message. + if (i_packet+1 != packet_buffer_n_packets_) { + packet_buffer_loaded_ = true; + // Continue on next packet. + packet_buffer_offset_ = i_packet + 1; + + // If i_packet is the last packet the buffer is empty. + } else { + packet_buffer_loaded_ = false; + packet_buffer_offset_ = 0; + } + + return metadata.pulse_id; + } + } + // We emptied the buffer. + packet_buffer_loaded_ = false; + packet_buffer_offset_ = 0; + + return 0; +} + +uint64_t BufferUdpReceiver::get_frame_from_udp( + ModuleFrame& metadata, char* frame_buffer) +{ + // Reset the metadata and frame buffer for the next frame. + metadata.pulse_id = 0; + metadata.n_received_packets = 0; + memset(frame_buffer, 0, JUNGFRAU_DATA_BYTES_PER_FRAME); + + // Happens when last packet from previous frame was missed. + if (packet_buffer_loaded_) { + + auto pulse_id = process_packets( + packet_buffer_offset_, metadata, frame_buffer); + + if (pulse_id != 0) { + return pulse_id; + } + } + + while (true) { + + packet_buffer_n_packets_ = udp_receiver_.receive_many( + msgs_, BUFFER_UDP_N_RECV_MSG); + + if (packet_buffer_n_packets_ == 0) { + continue; + } + + auto pulse_id = process_packets(0, metadata, frame_buffer); + + if (pulse_id != 0) { + return pulse_id; + } + } +} \ No newline at end of file diff --git a/sf-buffer/src/UdpReceiver.cpp b/sf-buffer/src/UdpReceiver.cpp new file mode 100644 index 0000000..1832674 --- /dev/null +++ b/sf-buffer/src/UdpReceiver.cpp @@ -0,0 +1,89 @@ +#include +#include +#include "UdpReceiver.hpp" +#include "jungfrau.hpp" +#include +#include +#include "buffer_config.hpp" + +using namespace std; +using namespace core_buffer; + +UdpReceiver::UdpReceiver() : + socket_fd_(-1) +{ +} + +UdpReceiver::~UdpReceiver() +{ + disconnect(); +} + +void UdpReceiver::bind(const uint16_t port) +{ + if (socket_fd_ > -1) { + throw runtime_error("Socket already bound."); + } + + socket_fd_ = socket(AF_INET, SOCK_DGRAM, 0); + if (socket_fd_ < 0) { + throw runtime_error("Cannot open socket."); + } + + sockaddr_in server_address = {0}; + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(port); + + timeval udp_socket_timeout; + udp_socket_timeout.tv_sec = 0; + udp_socket_timeout.tv_usec = BUFFER_UDP_US_TIMEOUT; + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVTIMEO, + &udp_socket_timeout, sizeof(timeval)) == -1) { + throw runtime_error( + "Cannot set SO_RCVTIMEO. " + string(strerror(errno))); + } + + if (setsockopt(socket_fd_, SOL_SOCKET, SO_RCVBUF, + &BUFFER_UDP_RCVBUF_BYTES, sizeof(int)) == -1) { + throw runtime_error( + "Cannot set SO_RCVBUF. " + string(strerror(errno))); + }; + //TODO: try to set SO_RCVLOWAT + + auto bind_result = ::bind( + socket_fd_, + reinterpret_cast(&server_address), + sizeof(server_address)); + + if (bind_result < 0) { + throw runtime_error("Cannot bind socket."); + } +} + +int UdpReceiver::receive_many(mmsghdr* msgs, const size_t n_msgs) +{ + return recvmmsg(socket_fd_, msgs, n_msgs, 0, 0); +} + +bool UdpReceiver::receive(void* buffer, const size_t buffer_n_bytes) +{ + auto data_len = recv(socket_fd_, buffer, buffer_n_bytes, 0); + + if (data_len < 0) { + return false; + } + + if (data_len != buffer_n_bytes) { + return false; + } + + return true; +} + +void UdpReceiver::disconnect() +{ + close(socket_fd_); + socket_fd_ = -1; +} diff --git a/sf-buffer/src/WriterUtils.cpp b/sf-buffer/src/WriterUtils.cpp new file mode 100644 index 0000000..7653bee --- /dev/null +++ b/sf-buffer/src/WriterUtils.cpp @@ -0,0 +1,49 @@ +#include +#include + +#include "WriterUtils.hpp" +#include "date.h" + +using namespace std; + +void WriterUtils::set_process_effective_id(int user_id) +{ + + // TODO: use setfsuid and setfsgid + + if (setegid(user_id)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[WriterUtils::set_process_effective_id]"; + err_msg << " Cannot set group_id to " << user_id << endl; + + throw runtime_error(err_msg.str()); + } + + if (seteuid(user_id)) { + stringstream err_msg; + + using namespace date; + using namespace chrono; + err_msg << "[" << system_clock::now() << "]"; + err_msg << "[WriterUtils::set_process_effective_id]"; + err_msg << " Cannot set user_id to " << user_id << endl; + + throw runtime_error(err_msg.str()); + } +} + +void WriterUtils::create_destination_folder(const string& output_file) +{ + auto file_separator_index = output_file.rfind('/'); + + if (file_separator_index != string::npos) { + string output_folder(output_file.substr(0, file_separator_index)); + + string create_folder_command("mkdir -p " + output_folder); + system(create_folder_command.c_str()); + } +} diff --git a/sf-buffer/src/main.cpp b/sf-buffer/src/main.cpp new file mode 100644 index 0000000..248e32a --- /dev/null +++ b/sf-buffer/src/main.cpp @@ -0,0 +1,150 @@ +#include +#include +#include +#include "zmq.h" +#include "buffer_config.hpp" +#include "jungfrau.hpp" +#include "BufferUdpReceiver.hpp" +#include +#include + +#include +#include +#include + +using namespace std; +using namespace core_buffer; + +int main (int argc, char *argv[]) { + if (argc != 5) { + cout << endl; + cout << "Usage: sf_buffer [device_name] [udp_port] [root_folder]"; + cout << "[source_id]"; + cout << endl; + cout << "\tdevice_name: Name to write to disk."; + cout << "\tudp_port: UDP port to connect to." << endl; + cout << "\troot_folder: FS root folder." << endl; + cout << "\tsource_id: ID of the source for live stream." << endl; + cout << endl; + + exit(-1); + } + + string device_name = string(argv[1]); + int udp_port = atoi(argv[2]); + string root_folder = string(argv[3]); + int source_id = atoi(argv[4]); + + stringstream ipc_stream; + ipc_stream << BUFFER_LIVE_IPC_URL << source_id; + const auto ipc_address = ipc_stream.str(); + + auto ctx = zmq_ctx_new(); + auto socket = zmq_socket(ctx, ZMQ_PUB); + + const int sndhwm = BUFFER_ZMQ_SNDHWM; + if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) + throw runtime_error(zmq_strerror(errno)); + + const int linger_ms = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) + throw runtime_error(zmq_strerror(errno)); + + if (zmq_bind(socket, ipc_address.c_str()) != 0) + throw runtime_error(zmq_strerror(errno)); + + uint64_t stats_counter(0); + uint64_t n_missed_packets = 0; + uint64_t n_missed_frames = 0; + uint64_t n_corrupted_frames = 0; + uint64_t last_pulse_id = 0; + + BufferH5Writer writer(root_folder, device_name); + BufferUdpReceiver receiver(udp_port, source_id); + + pid_t tid; + tid = syscall(SYS_gettid); + int ret = setpriority(PRIO_PROCESS, tid, 0); + if (ret == -1) throw runtime_error("cannot set nice"); + + ModuleFrame metadata; + auto frame_buffer = new char[MODULE_N_BYTES * JUNGFRAU_N_MODULES]; + + size_t write_total_us = 0; + size_t write_max_us = 0; + size_t send_total_us = 0; + size_t send_max_us = 0; + + while (true) { + + auto pulse_id = receiver.get_frame_from_udp(metadata, frame_buffer); + + auto start_time = chrono::steady_clock::now(); + + writer.set_pulse_id(pulse_id); + writer.write(&metadata, frame_buffer); + + auto write_end_time = chrono::steady_clock::now(); + auto write_us_duration = chrono::duration_cast( + write_end_time-start_time).count(); + + start_time = chrono::steady_clock::now(); + + zmq_send(socket, &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE); + zmq_send(socket, frame_buffer, MODULE_N_BYTES, 0); + + auto send_end_time = chrono::steady_clock::now(); + auto send_us_duration = chrono::duration_cast( + send_end_time-start_time).count(); + + // TODO: Make real statistics, please. + stats_counter++; + write_total_us += write_us_duration; + send_total_us += send_us_duration; + + if (write_us_duration > write_max_us) { + write_max_us = write_us_duration; + } + + if (send_us_duration > send_max_us) { + send_max_us = send_us_duration; + } + + if (metadata.n_received_packets < JUNGFRAU_N_PACKETS_PER_FRAME) { + n_missed_packets += + JUNGFRAU_N_PACKETS_PER_FRAME - metadata.n_received_packets; + n_corrupted_frames++; + } + + if (last_pulse_id>0) { + n_missed_frames += (pulse_id - last_pulse_id) - 1; + } + last_pulse_id = pulse_id; + + if (stats_counter == STATS_MODULO) { + cout << "sf_buffer:device_name " << device_name; + cout << " sf_buffer:pulse_id " << pulse_id; + cout << " sf_buffer:n_missed_frames " << n_missed_frames; + cout << " sf_buffer:n_missed_packets " << n_missed_packets; + cout << " sf_buffer:n_corrupted_frames " << n_corrupted_frames; + + cout << " sf_buffer:write_total_us " << write_total_us/STATS_MODULO; + cout << " sf_buffer:write_max_us " << write_max_us; + cout << " sf_buffer:send_total_us " << send_total_us/STATS_MODULO; + cout << " sf_buffer:send_max_us " << send_max_us; + cout << endl; + + stats_counter = 0; + n_missed_packets = 0; + n_corrupted_frames = 0; + n_missed_frames = 0; + + write_total_us = 0; + write_max_us = 0; + send_total_us = 0; + send_max_us = 0; + } + } + + delete[] frame_buffer; +} diff --git a/sf-buffer/test/CMakeLists.txt b/sf-buffer/test/CMakeLists.txt new file mode 100644 index 0000000..4d6d7e5 --- /dev/null +++ b/sf-buffer/test/CMakeLists.txt @@ -0,0 +1,12 @@ +add_executable(sf-buffer-tests main.cpp) + +target_link_libraries(sf-buffer-tests + core-buffer-lib + sf-buffer-lib + external + hdf5 + hdf5_hl + hdf5_cpp + zmq + gtest) + diff --git a/sf-buffer/test/main.cpp b/sf-buffer/test/main.cpp new file mode 100644 index 0000000..052527e --- /dev/null +++ b/sf-buffer/test/main.cpp @@ -0,0 +1,12 @@ +#include "gtest/gtest.h" +#include "test_UdpReceiver.cpp" +#include "test_BufferBinaryWriter.cpp" +#include "test_BufferH5Writer.cpp" +#include "test_BufferUdpReceiver.cpp" + +using namespace std; + +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/sf-buffer/test/test_BufferBinaryWriter.cpp b/sf-buffer/test/test_BufferBinaryWriter.cpp new file mode 100644 index 0000000..0d86265 --- /dev/null +++ b/sf-buffer/test/test_BufferBinaryWriter.cpp @@ -0,0 +1,86 @@ +#include "BufferBinaryWriter.hpp" +#include "BufferUtils.hpp" +#include +#include "gtest/gtest.h" + +TEST(BinaryWriter, basic_interaction) +{ + auto root_folder = "."; + auto device_name = "test_device"; + uint64_t pulse_id = 5; + + BufferBinaryWriter writer(device_name, root_folder); + + BufferBinaryFormat frame_data; + frame_data.pulse_id = 1; + frame_data.frame_id = 2; + frame_data.daq_rec = 3; + frame_data.n_recv_packets = 4; + + ASSERT_EQ(frame_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); + + writer.write(5, &frame_data); + + auto output_filename = + BufferUtils::get_filename(root_folder, device_name, pulse_id); + + auto read_fd = open(output_filename.c_str(), O_RDONLY); + ASSERT_NE(read_fd, -1); + + auto file_frame_index = BufferUtils::get_file_frame_index(pulse_id); + + BufferBinaryFormat read_data; + + ::lseek(read_fd, file_frame_index * sizeof(BufferBinaryFormat), SEEK_SET); + ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); + + ASSERT_EQ(frame_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); + ASSERT_EQ(frame_data.FORMAT_MARKER, read_data.FORMAT_MARKER); + ASSERT_EQ(frame_data.pulse_id, read_data.pulse_id); + ASSERT_EQ(frame_data.frame_id, read_data.frame_id); + ASSERT_EQ(frame_data.daq_rec, read_data.daq_rec); + ASSERT_EQ(frame_data.n_recv_packets, read_data.n_recv_packets); +} + +TEST(BinaryWriter, test_format_marker) +{ + auto root_folder = "."; + auto device_name = "test_device"; + uint64_t pulse_id = 5; + + BufferBinaryWriter writer(device_name, root_folder); + + BufferBinaryFormat frame_data; + frame_data.pulse_id = 1; + frame_data.frame_id = 2; + frame_data.daq_rec = 3; + frame_data.n_recv_packets = 4; + + writer.write(5, &frame_data); + + auto output_filename = + BufferUtils::get_filename(root_folder, device_name, pulse_id); + + auto read_fd = open(output_filename.c_str(), O_RDONLY); + ASSERT_NE(read_fd, -1); + + auto file_frame_index = BufferUtils::get_file_frame_index(pulse_id); + + BufferBinaryFormat read_data; + + // One frame before should be empty. + ::lseek(read_fd, (file_frame_index-1) * sizeof(BufferBinaryFormat), SEEK_SET); + ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); + ASSERT_NE(read_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); + + // One frame after should be empty as well. + ::lseek(read_fd, (file_frame_index+1) * sizeof(BufferBinaryFormat), SEEK_SET); + ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); + ASSERT_NE(read_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); + + // But this frame should be here. + ::lseek(read_fd, (file_frame_index) * sizeof(BufferBinaryFormat), SEEK_SET); + ::read(read_fd, &read_data, sizeof(BufferBinaryFormat)); + ASSERT_EQ(read_data.FORMAT_MARKER, JF_FORMAT_START_BYTE); + +} \ No newline at end of file diff --git a/sf-buffer/test/test_BufferH5Writer.cpp b/sf-buffer/test/test_BufferH5Writer.cpp new file mode 100644 index 0000000..816b41c --- /dev/null +++ b/sf-buffer/test/test_BufferH5Writer.cpp @@ -0,0 +1,130 @@ +#include "BufferH5Writer.hpp" +#include "gtest/gtest.h" + +using namespace core_buffer; + +TEST(BufferH5Writer, basic_interaction) +{ + auto root_folder = "."; + auto device_name = "fast_device"; + size_t pulse_id = 1; + + auto buffer = make_unique(JUNGFRAU_DATA_BYTES_PER_FRAME); + + ModuleFrame metadata; + metadata.pulse_id = 1; + metadata.frame_index = 2; + metadata.daq_rec = 3; + metadata.n_received_packets = 128; + + BufferH5Writer writer(root_folder, device_name); + writer.set_pulse_id(pulse_id); + writer.write(&metadata, buffer.get()); + writer.close_file(); + + auto filename = BufferUtils::get_filename( + root_folder, device_name, pulse_id); + + auto file_frame_index = BufferUtils::get_file_frame_index(pulse_id); + + H5::H5File input_file(filename, H5F_ACC_RDONLY); + + auto image_dataset = input_file.openDataSet("image"); + size_t image_buffer_n_bytes = JUNGFRAU_DATA_BYTES_PER_FRAME * FILE_MOD; + auto image_buffer = make_unique(image_buffer_n_bytes); + image_dataset.read(image_buffer.get(), H5::PredType::NATIVE_UINT16); + + auto metadata_dataset = input_file.openDataSet("metadata"); + auto metadata_buffer = make_unique(FILE_MOD); + metadata_dataset.read(metadata_buffer.get(), H5::PredType::NATIVE_UINT64); + + EXPECT_EQ(metadata_buffer[file_frame_index].pulse_id, 1); + EXPECT_EQ(metadata_buffer[file_frame_index].frame_index, 2); + EXPECT_EQ(metadata_buffer[file_frame_index].daq_rec, 3); + EXPECT_EQ(metadata_buffer[file_frame_index].n_received_packets, 128); +} +// +//TEST(BufferH5Writer, SWMR) +//{ +// auto root_folder = "."; +// auto device_name = "fast_device"; +// size_t pulse_id = 0; +// +// auto i_write_buffer = make_unique(JUNGFRAU_DATA_BYTES_PER_FRAME); +// size_t image_buffer_n_bytes = JUNGFRAU_DATA_BYTES_PER_FRAME * FILE_MOD; +// auto i_read_buffer = make_unique(image_buffer_n_bytes); +// +// ModuleFrame m_write_buffer = {1, 2, 3, 4, 5}; +// auto m_read_buffer = make_unique(FILE_MOD); +// +// for (size_t i=0; i +#include +#include "gtest/gtest.h" +#include "BufferUdpReceiver.hpp" +#include "mock/udp.hpp" + +#include +#include +#include + +using namespace std; + +TEST(BufferUdpReceiver, simple_recv) +{ + auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 5; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + BufferUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_received_packets, n_packets); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_middle_packet) +{ + auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + BufferUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_received_packets, n_packets-1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_first_packet) +{ + auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + BufferUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + for (int i_frame=0; i_frame < n_frames; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_received_packets, n_packets-1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} + +TEST(BufferUdpReceiver, missing_last_packet) +{ + auto n_packets = JUNGFRAU_N_PACKETS_PER_FRAME; + int source_id = 1234; + int n_frames = 3; + + uint16_t udp_port = MOCK_UDP_PORT; + auto server_address = get_server_address(udp_port); + auto send_socket_fd = socket(AF_INET, SOCK_DGRAM, 0); + ASSERT_TRUE(send_socket_fd >= 0); + + BufferUdpReceiver udp_receiver(udp_port, source_id); + + auto handle = async(launch::async, [&](){ + for (int i_frame=0; i_frame < n_frames; i_frame++){ + for (size_t i_packet=0; i_packet(JUNGFRAU_DATA_BYTES_PER_FRAME); + + // n_frames -1 because the last frame is not complete. + for (int i_frame=0; i_frame < n_frames - 1; i_frame++) { + auto pulse_id = udp_receiver.get_frame_from_udp( + metadata, frame_buffer.get()); + + ASSERT_EQ(i_frame + 1, pulse_id); + ASSERT_EQ(metadata.frame_index, i_frame + 1000); + ASSERT_EQ(metadata.daq_rec, i_frame + 10000); + // -1 because we skipped a packet. + ASSERT_EQ(metadata.n_received_packets, n_packets-1); + ASSERT_EQ(metadata.module_id, source_id); + } + + ::close(send_socket_fd); +} \ No newline at end of file diff --git a/sf-buffer/test/test_UdpReceiver.cpp b/sf-buffer/test/test_UdpReceiver.cpp new file mode 100644 index 0000000..fd3a527 --- /dev/null +++ b/sf-buffer/test/test_UdpReceiver.cpp @@ -0,0 +1,170 @@ +#include +#include +#include "gtest/gtest.h" +#include "mock/udp.hpp" +#include "UdpReceiver.hpp" + +#include +#include + +using namespace std; + +TEST(UdpReceiver, simple_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + UdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + send_udp_buffer.packetnum = 91; + send_udp_buffer.framenum = 92; + send_udp_buffer.bunchid = 93; + send_udp_buffer.debug = 94; + + auto server_address = get_server_address(udp_port); + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(100)); + + jungfrau_packet recv_udp_buffer; + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + EXPECT_EQ(send_udp_buffer.packetnum, recv_udp_buffer.packetnum); + EXPECT_EQ(send_udp_buffer.framenum, recv_udp_buffer.framenum); + EXPECT_EQ(send_udp_buffer.bunchid, recv_udp_buffer.bunchid); + EXPECT_EQ(send_udp_buffer.debug, recv_udp_buffer.debug); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(UdpReceiver, false_recv) +{ + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + UdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + jungfrau_packet recv_udp_buffer; + + auto server_address = get_server_address(udp_port); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_FALSE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET)); + + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET-1, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + ASSERT_TRUE(udp_receiver.receive( + &recv_udp_buffer, JUNGFRAU_BYTES_PER_PACKET-1)); + + udp_receiver.disconnect(); + ::close(send_socket_fd); +} + +TEST(UdpReceiver, receive_many) +{ + auto n_msg_buffer = JUNGFRAU_N_PACKETS_PER_FRAME; + jungfrau_packet recv_buffer[n_msg_buffer]; + iovec recv_buff_ptr[n_msg_buffer]; + struct mmsghdr msgs[n_msg_buffer]; + struct sockaddr_in sockFrom[n_msg_buffer]; + + for (int i = 0; i < n_msg_buffer; i++) { + recv_buff_ptr[i].iov_base = (void*) &(recv_buffer[i]); + recv_buff_ptr[i].iov_len = sizeof(jungfrau_packet); + + msgs[i].msg_hdr.msg_iov = &recv_buff_ptr[i]; + msgs[i].msg_hdr.msg_iovlen = 1; + msgs[i].msg_hdr.msg_name = &sockFrom[i]; + msgs[i].msg_hdr.msg_namelen = sizeof(sockaddr_in); + } + + uint16_t udp_port = MOCK_UDP_PORT; + + auto send_socket_fd = socket(AF_INET,SOCK_DGRAM,0); + ASSERT_TRUE(send_socket_fd >= 0); + + UdpReceiver udp_receiver; + udp_receiver.bind(udp_port); + + jungfrau_packet send_udp_buffer; + + auto server_address = get_server_address(udp_port); + + send_udp_buffer.bunchid = 0; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + send_udp_buffer.bunchid = 1; + ::sendto( + send_socket_fd, + &send_udp_buffer, + JUNGFRAU_BYTES_PER_PACKET, + 0, + (sockaddr*) &server_address, + sizeof(server_address)); + + this_thread::sleep_for(chrono::milliseconds(10)); + + auto n_msgs = udp_receiver.receive_many(msgs, JUNGFRAU_N_PACKETS_PER_FRAME); + ASSERT_EQ(n_msgs, 2); + + for (size_t i=0;i