Major project refactoring WIP

This commit is contained in:
2020-05-20 11:45:34 +02:00
parent 4e7eefcdc6
commit b45b7d17fa
45 changed files with 48 additions and 3868 deletions
@@ -1,23 +0,0 @@
#ifndef JFFILEFORMAT_HPP
#define JFFILEFORMAT_HPP
#include "jungfrau.hpp"
const char JF_FORMAT_START_BYTE = 0xBE;
#pragma pack(push)
#pragma pack(1)
struct BufferBinaryFormat {
BufferBinaryFormat() : FORMAT_MARKER(JF_FORMAT_START_BYTE) {};
const char FORMAT_MARKER;
uint64_t pulse_id;
uint64_t frame_id;
uint32_t daq_rec;
uint16_t n_recv_packets;
char data[JUNGFRAU_DATA_BYTES_PER_FRAME];
};
#pragma pack(pop)
#endif // JFFILEFORMAT_HPP
@@ -1,32 +0,0 @@
#ifndef BINARYWRITER_HPP
#define BINARYWRITER_HPP
#include <string>
#include "BufferBinaryFormat.hpp"
class BufferBinaryWriter {
const std::string device_name_;
const std::string root_folder_;
std::string latest_filename_;
std::string current_output_filename_;
int output_file_fd_;
void open_file(const std::string& filename);
void close_current_file();
public:
BufferBinaryWriter(
const std::string& device_name,
const std::string& root_folder);
virtual ~BufferBinaryWriter();
void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer);
};
#endif //BINARYWRITER_HPP
-58
View File
@@ -1,58 +0,0 @@
#ifndef FASTH5WRITER_HPP
#define FASTH5WRITER_HPP
#include <cstdint>
#include <vector>
#include <string>
#include <H5Cpp.h>
#include <memory>
#include "jungfrau.hpp"
#include <unordered_map>
#include <buffer_config.hpp>
class BufferH5Writer {
const hsize_t meta_disk_dims[2] = {
core_buffer::FILE_MOD,
ModuleFrame_N_FIELDS
};
const hsize_t data_disk_dims[3] = {
core_buffer::FILE_MOD,
core_buffer::MODULE_Y_SIZE,
core_buffer::MODULE_X_SIZE
};
const std::string root_folder_;
const std::string device_name_;
const std::string LATEST_filename_;
const std::string CURRENT_filename_;
std::string output_filename_;
H5::H5File h5_file_;
H5::DataSet current_image_dataset_;
H5::DataSet current_metadata_dataset_;
uint64_t current_pulse_id_;
size_t current_file_index_;
void create_file(const std::string& filename);
public:
BufferH5Writer(
const std::string& root_folder,
const std::string& device_name);
virtual ~BufferH5Writer();
void set_pulse_id(const uint64_t pulse_id);
void write(const ModuleFrame* metadata, const char* data);
void close_file();
};
#endif //FASTH5WRITER_HPP
-36
View File
@@ -1,36 +0,0 @@
#ifndef SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP
#define SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP
#include <netinet/in.h>
#include "UdpReceiver.hpp"
#include "jungfrau.hpp"
#include "buffer_config.hpp"
class BufferUdpReceiver {
const int source_id_;
UdpReceiver udp_receiver_;
jungfrau_packet packet_buffer_[core_buffer::BUFFER_UDP_N_RECV_MSG];
iovec recv_buff_ptr_[core_buffer::BUFFER_UDP_N_RECV_MSG];
mmsghdr msgs_[core_buffer::BUFFER_UDP_N_RECV_MSG];
sockaddr_in sock_from_[core_buffer::BUFFER_UDP_N_RECV_MSG];
bool packet_buffer_loaded_ = false;
int packet_buffer_n_packets_ = 0;
int packet_buffer_offset_ = 0;
inline void init_frame(ModuleFrame& frame_metadata, const int i_packet);
inline void copy_packet_to_buffers(
ModuleFrame& metadata, char* frame_buffer, const int i_packet);
inline uint64_t process_packets(
const int n_packets, ModuleFrame& metadata, char* frame_buffer);
public:
BufferUdpReceiver(const uint16_t port, const int source_id);
virtual ~BufferUdpReceiver();
uint64_t get_frame_from_udp(ModuleFrame& metadata, char* frame_buffer);
};
#endif //SF_DAQ_BUFFER_BUFFERUDPRECEIVER_HPP
-32
View File
@@ -1,32 +0,0 @@
#ifndef SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP
#define SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP
#include "FastQueue.hpp"
#include "WriterH5Writer.hpp"
class BufferedFastQueue {
FastQueue<ImageMetadataBuffer>& queue_;
const size_t buffer_n_pulses_;
const size_t n_modules_;
ImageMetadataBuffer* queue_meta_buffer_ = nullptr;
char* queue_data_buffer_ = nullptr;
int current_slot_id_ = -1;
ImageMetadata image_metadata_;
public:
BufferedFastQueue(FastQueue<ImageMetadataBuffer>& queue,
const size_t buffer_n_pulses,
const size_t n_modules);
ImageMetadata* get_metadata_buffer();
char* get_data_buffer();
void commit();
void finalize();
};
#endif //SF_DAQ_BUFFER_BUFFEREDFASTQUEUE_HPP
-40
View File
@@ -1,40 +0,0 @@
#ifndef FASTQUEUE_HPP
#define FASTQUEUE_HPP
#include <cstddef>
#include <cstdint>
#include <atomic>
template <class T>
class FastQueue {
const size_t slot_n_bytes_;
const size_t n_slots_;
char* buffer_;
std::atomic_int* buffer_status_;
uint16_t write_slot_id_;
uint16_t read_slot_id_;
public:
enum SLOT_STATUS {
EMPTY=0,
RESERVED=1,
READY=2
};
FastQueue(const size_t slot_data_n_bytes, const uint16_t n_slots);
virtual ~FastQueue();
T* get_metadata_buffer(const int slot_id);
char* get_data_buffer(const int slot_id);
int reserve();
void commit();
int read();
void release();
};
#endif //FASTQUEUE_HPP
-47
View File
@@ -1,47 +0,0 @@
#ifndef SF_DAQ_BUFFER_LIVEH5READER_HPP
#define SF_DAQ_BUFFER_LIVEH5READER_HPP
#include <string>
#include <memory>
#include "jungfrau.hpp"
#include "buffer_config.hpp"
#include <H5Cpp.h>
class LiveH5Reader {
const std::string current_filename_;
const uint16_t source_id_;
std::unique_ptr<uint64_t[]> pulse_id_buffer_;
std::unique_ptr<uint16_t[]> data_buffer_;
uint64_t current_file_max_pulse_id_;
H5::H5File file_;
H5::DataSet image_dataset_;
H5::DataSet pulse_id_dataset_;
H5::DataSet frame_index_dataset_;
H5::DataSet daq_rec_dataset_;
H5::DataSet n_received_packets_dataset_;
void open_file();
public:
LiveH5Reader(
const std::string& device,
const std::string& channel_name,
const uint16_t source_id);
~LiveH5Reader();
uint64_t get_latest_pulse_id();
void load_pulse_id(uint64_t pulse_id);
ModuleFrame get_metadata();
char* get_data();
void close_file();
};
#endif //SF_DAQ_BUFFER_LIVEH5READER_HPP
-39
View File
@@ -1,39 +0,0 @@
#ifndef SF_DAQ_BUFFER_LIVERECVMODULE_HPP
#define SF_DAQ_BUFFER_LIVERECVMODULE_HPP
#include "FastQueue.hpp"
#include <thread>
#include "jungfrau.hpp"
#include <vector>
class LiveRecvModule {
FastQueue<ModuleFrameBuffer>& queue_;
const size_t n_modules_;
void* ctx_;
const std::string ipc_prefix_;
std::atomic_bool is_receiving_;
std::thread receiving_thread_;
public:
LiveRecvModule(
FastQueue<ModuleFrameBuffer>& queue,
const size_t n_modules,
void* ctx,
const std::string& ipc_prefix);
virtual ~LiveRecvModule();
void* connect_socket(size_t module_id);
void recv_single_module(void* socket, ModuleFrame* metadata, char* data);
void receive_thread();
uint64_t align_modules(
const std::vector<void*>& sockets,
ModuleFrameBuffer *metadata,
char *data);
void stop();
};
#endif //SF_DAQ_BUFFER_LIVERECVMODULE_HPP
-39
View File
@@ -1,39 +0,0 @@
#ifndef SF_DAQ_BUFFER_REPLAYH5READER_HPP
#define SF_DAQ_BUFFER_REPLAYH5READER_HPP
#include <string>
#include "jungfrau.hpp"
#include <H5Cpp.h>
#include <memory>
#include "buffer_config.hpp"
class ReplayH5Reader {
const std::string device_;
const std::string channel_name_;
H5::H5File current_file_;
std::string current_filename_;
H5::DataSet dset_metadata_;
H5::DataSet dset_frame_;
std::unique_ptr<char[]> frame_buffer_ = std::make_unique<char[]>(
core_buffer::MODULE_N_BYTES * core_buffer::REPLAY_READ_BUFFER_SIZE);
std::unique_ptr<ModuleFrame[]> metadata_buffer_ =
std::make_unique<ModuleFrame[]>(core_buffer::FILE_MOD);
uint64_t buffer_start_pulse_id_ = 0;
uint64_t buffer_end_pulse_id_ = 0;
void prepare_buffer_for_pulse(const uint64_t pulse_id);
public:
ReplayH5Reader(const std::string device, const std::string channel_name);
virtual ~ReplayH5Reader();
void close_file();
bool get_frame(
const uint64_t pulse_id, ModuleFrame* metadata, char* frame_buffer);
};
#endif //SF_DAQ_BUFFER_REPLAYH5READER_HPP
-81
View File
@@ -1,81 +0,0 @@
#ifndef RINGBUFFER_H
#define RINGBUFFER_H
#include <list>
#include <vector>
#include <map>
#include <mutex>
#include <atomic>
#include <memory>
#include <string>
#include <boost/any.hpp>
#include <chrono>
#include "date.h"
struct FrameMetadata
{
// Ring buffer needed data.
size_t buffer_slot_index;
size_t frame_bytes_size;
// Image header data.
uint64_t frame_index;
std::string endianness;
std::string type;
std::vector<size_t> frame_shape;
// Pass additional header values.
std::map<std::string, std::shared_ptr<char>> header_values;
};
struct UdpFrameMetadata
{
// Ring buffer needed data.
size_t buffer_slot_index;
size_t frame_bytes_size;
uint64_t pulse_id;
uint64_t frame_index;
uint32_t daq_rec;
uint16_t n_recv_packets;
};
template <class T>
class RingBuffer
{
// Initialized in constructor.
size_t n_slots_ = 0;
std::vector<bool> ringbuffer_slots_;
// Set in initialize().
size_t slot_size_ = 0;
size_t buffer_size_ = 0;
char* frame_data_buffer_ = NULL;
size_t write_index_ = 0;
size_t buffer_used_slots_ = 0;
std::atomic_bool initialized_ = false;
std::list< std::shared_ptr<T> > frame_metadata_queue_;
std::mutex frame_metadata_queue_mutex_;
std::mutex ringbuffer_slots_mutex_;
char* get_buffer_slot_address(size_t buffer_slot_index);
public:
RingBuffer(size_t n_slots);
virtual ~RingBuffer();
void initialize(size_t slot_size);
char* reserve(std::shared_ptr<T> metadata);
void commit(std::shared_ptr<T> metadata);
std::pair<std::shared_ptr<T>, char*> read();
void release(size_t buffer_slot_index);
bool is_empty();
bool is_initialized();
void clear();
size_t get_slot_size();
};
#endif
-22
View File
@@ -1,22 +0,0 @@
#ifndef UDPRECEIVER_H
#define UDPRECEIVER_H
#include <sys/socket.h>
class UdpReceiver {
int socket_fd_;
public:
UdpReceiver();
virtual ~UdpReceiver();
bool receive(void* buffer, const size_t buffer_n_bytes);
int receive_many(mmsghdr* msgs, const size_t n_msgs);
void bind(const uint16_t port);
void disconnect();
};
#endif //LIB_CPP_H5_WRITER_UDPRECEIVER_H
-33
View File
@@ -1,33 +0,0 @@
#ifndef UDPRECVMODULE_HPP
#define UDPRECVMODULE_HPP
#include "RingBuffer.hpp"
#include "FastQueue.hpp"
#include "jungfrau.hpp"
#include <thread>
class UdpRecvModule {
FastQueue<ModuleFrame>& queue_;
std::thread receiving_thread_;
std::atomic_bool is_receiving_;
inline void init_frame(
ModuleFrame* frame_metadata,
jungfrau_packet& packet_buffer);
inline void reserve_next_frame_buffers(
ModuleFrame*& frame_metadata,
char*& frame_buffer);
protected:
void receive_thread(const uint16_t udp_port);
public:
UdpRecvModule(FastQueue<ModuleFrame>& queue, const uint16_t udp_port);
virtual ~UdpRecvModule();
};
#endif // UDPRECVMODULE_HPP
-53
View File
@@ -1,53 +0,0 @@
#ifndef SFWRITER_HPP
#define SFWRITER_HPP
#include <memory>
#include <string>
#include <H5Cpp.h>
#include "buffer_config.hpp"
struct ImageMetadataBuffer
{
uint64_t pulse_id[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint64_t frame_index[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint32_t daq_rec[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint8_t is_good_frame[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint64_t data_n_bytes[core_buffer::WRITER_DATA_CACHE_N_IMAGES];
uint16_t n_pulses_in_buffer;
};
struct ImageMetadata
{
uint64_t pulse_id;
uint64_t frame_index;
uint32_t daq_rec;
uint8_t is_good_frame;
uint64_t data_n_bytes;
};
class WriterH5Writer {
const size_t n_frames_;
const size_t n_modules_;
size_t current_write_index_;
H5::H5File file_;
H5::DataSet image_dataset_;
H5::DataSet pulse_id_dataset_;
H5::DataSet frame_index_dataset_;
H5::DataSet daq_rec_dataset_;
H5::DataSet is_good_frame_dataset_;
public:
WriterH5Writer(const std::string& output_file,
const size_t n_frames,
const size_t n_modules);
~WriterH5Writer();
void write(const ImageMetadataBuffer* metadata, const char* data);
void close_file();
};
#endif //SFWRITER_HPP
-11
View File
@@ -1,11 +0,0 @@
#ifndef WRITERUTILS_H
#define WRITERUTILS_H
#include <string>
namespace WriterUtils {
void set_process_effective_id(int user_id);
void create_destination_folder(const std::string& output_file);
}
#endif // WRITERUTILS_H
-31
View File
@@ -1,31 +0,0 @@
#ifndef SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
#define SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP
#include <string>
#include "WriterH5Writer.hpp"
#include <vector>
class WriterZmqReceiver {
const size_t n_modules_;
std::vector<void*> sockets_;
StreamModuleFrame frame_metadata;
public:
WriterZmqReceiver(
void *ctx,
const std::string& ipc_prefix,
const size_t n_modules);
virtual ~WriterZmqReceiver();
void get_next_image(
const uint64_t pulse_id,
ImageMetadata* image_metadata,
char* image_buffer);
};
#endif //SF_DAQ_BUFFER_WRITERZMQRECEIVER_HPP