Cleanup sf_streamer

This commit is contained in:
2020-07-17 11:20:50 +02:00
parent 465369a322
commit c7185f249b
8 changed files with 1 additions and 492 deletions
-40
View File
@@ -1,40 +0,0 @@
#ifndef FASTQUEUE_HPP
#define FASTQUEUE_HPP
#include <cstddef>
#include <cstdint>
#include <atomic>
template <class T>
class FastQueue {
const size_t slot_n_bytes_;
const size_t n_slots_;
char* buffer_;
std::atomic_int* buffer_status_;
std::atomic_int write_slot_id_;
std::atomic_int read_slot_id_;
public:
enum SLOT_STATUS {
EMPTY=0,
RESERVED=1,
READY=2
};
FastQueue(const size_t slot_data_n_bytes, const uint16_t n_slots);
virtual ~FastQueue();
T* get_metadata_buffer(const int slot_id);
char* get_data_buffer(const int slot_id);
int reserve();
void commit();
int read();
void release();
};
#endif //FASTQUEUE_HPP
-34
View File
@@ -1,34 +0,0 @@
#ifndef SF_DAQ_BUFFER_LIVERECVMODULE_HPP
#define SF_DAQ_BUFFER_LIVERECVMODULE_HPP
#include <vector>
#include <thread>
#include "FastQueue.hpp"
#include "jungfrau.hpp"
#include "formats.hpp"
#include "ZmqLiveReceiver.hpp"
class LiveRecvModule {
FastQueue<ModuleFrameBuffer>& queue_;
ZmqLiveReceiver& receiver_;
std::atomic_bool is_receiving_;
std::thread receiving_thread_;
void receive_thread();
void stop();
public:
LiveRecvModule(
FastQueue<ModuleFrameBuffer>& queue,
ZmqLiveReceiver& receiver);
~LiveRecvModule();
};
#endif //SF_DAQ_BUFFER_LIVERECVMODULE_HPP
-107
View File
@@ -1,107 +0,0 @@
#include <stdexcept>
#include <formats.hpp>
#include <jungfrau.hpp>
#include "FastQueue.hpp"
using namespace std;
template <class T>
FastQueue<T>::FastQueue(
const size_t slot_data_n_bytes,
const uint16_t n_slots) :
slot_n_bytes_(slot_data_n_bytes + sizeof(T)),
n_slots_(n_slots)
{
buffer_ = new char[slot_n_bytes_ * n_slots_];
buffer_status_ = new atomic_int[n_slots];
for (size_t i=0; i < n_slots_; i++) {
buffer_status_[i] = 0;
}
write_slot_id_ = 0;
read_slot_id_ = 0;
}
template <class T>
FastQueue<T>::~FastQueue()
{
delete[] buffer_;
delete[] buffer_status_;
}
template<class T>
T* FastQueue<T>::get_metadata_buffer(const int slot_id)
{
return (T*)(buffer_ + (slot_id * slot_n_bytes_));
}
template<class T>
char* FastQueue<T>::get_data_buffer(const int slot_id)
{
return (char*)(buffer_ + (slot_id * slot_n_bytes_) + sizeof(T));
}
template<class T>
int FastQueue<T>::reserve()
{
int expected = SLOT_STATUS::EMPTY;
// If (buffer_status==SLOT_EMPTY) buffer_status=SLOT_RESERVED.
bool slot_reserved =
buffer_status_[write_slot_id_].compare_exchange_strong(
expected, SLOT_STATUS::RESERVED);
if (!slot_reserved) {
return -1;
}
return write_slot_id_;
}
template<class T>
void FastQueue<T>::commit()
{
int expected = SLOT_STATUS::RESERVED;
// If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY.
bool slot_ready =
buffer_status_[write_slot_id_].compare_exchange_strong(
expected, SLOT_STATUS::READY);
if (!slot_ready) {
throw runtime_error("Slot should be reserved first.");
}
write_slot_id_++;
write_slot_id_ = write_slot_id_ % n_slots_;
}
template<class T>
int FastQueue<T>::read()
{
if (buffer_status_[read_slot_id_] != SLOT_STATUS::READY) {
return -1;
}
return read_slot_id_;
}
template<class T>
void FastQueue<T>::release()
{
int expected = SLOT_STATUS::READY;
// If (buffer_status==SLOT_RESERVED) buffer_status=SLOT_READY.
bool slot_empty =
buffer_status_[read_slot_id_].compare_exchange_strong(
expected, SLOT_STATUS::EMPTY);
if (!slot_empty) {
throw runtime_error("Slot should be ready first.");
}
read_slot_id_++;
read_slot_id_ = read_slot_id_ % n_slots_;
}
template class FastQueue<BufferBinaryBlock>;
template class FastQueue<ModuleFrame>;
template class FastQueue<ModuleFrameBuffer>;
-71
View File
@@ -1,71 +0,0 @@
#include "LiveRecvModule.hpp"
#include <iostream>
#include <date.h>
#include "buffer_config.hpp"
#include "stream_config.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace stream_config;
LiveRecvModule::LiveRecvModule(
FastQueue<ModuleFrameBuffer>& queue_,
ZmqLiveReceiver& receiver) :
queue_(queue_),
receiver_(receiver),
is_receiving_(true)
{
receiving_thread_ = thread(&LiveRecvModule::receive_thread, this);
}
LiveRecvModule::~LiveRecvModule()
{
stop();
}
void LiveRecvModule::stop()
{
is_receiving_ = false;
receiving_thread_.join();
}
void LiveRecvModule::receive_thread()
{
try {
int slot_id;
while(is_receiving_.load(memory_order_relaxed)) {
while ((slot_id == queue_.reserve()) == -1) {
this_thread::sleep_for(milliseconds(RB_READ_RETRY_INTERVAL_MS));
}
auto meta = queue_.get_metadata_buffer(slot_id);
auto data = queue_.get_data_buffer(slot_id);
auto n_lost_pulses = receiver_.get_next_image(meta, data);
if (n_lost_pulses > 0) {
cout << "sf_stream:sync_lost_pulses " << n_lost_pulses << endl;
}
queue_.commit();
}
} catch (const std::exception& e) {
is_receiving_ = false;
using namespace date;
using namespace chrono;
cout << "[" << system_clock::now() << "]";
cout << "[LiveRecvModule::receive_thread]";
cout << " Stopped because of exception: " << endl;
cout << e.what() << endl;
throw;
}
}
+1 -1
View File
@@ -4,10 +4,10 @@
#include <cstring>
#include <zmq.h>
#include "LiveRecvModule.hpp"
#include "buffer_config.hpp"
#include "stream_config.hpp"
#include "ZmqLiveSender.hpp"
#include "ZmqLiveReceiver.hpp"
using namespace std;
using namespace chrono;
-2
View File
@@ -1,6 +1,4 @@
#include "gtest/gtest.h"
#include "test_LiveRecvModule.cpp"
#include "test_FastQueue.cpp"
using namespace std;
-150
View File
@@ -1,150 +0,0 @@
//#include "FastQueue.hpp"
//#include "formats.hpp"
//#include "gtest/gtest.h"
//
//using namespace buffer_config;
//
//TEST(FastQueue, basic_interaction)
//{
// size_t n_slots = 5;
// size_t slot_data_n_bytes = MODULE_N_BYTES * 2;
// FastQueue<ImageMetadata> queue(slot_data_n_bytes, n_slots);
// int slot_id;
//
// // The queue at the beginning should be empty.
// ASSERT_EQ(queue.read(), -1);
// // Cannot commit a slot until you reserve it.
// ASSERT_THROW(queue.commit(), runtime_error);
// // Cannot release a slot until its ready.
// ASSERT_THROW(queue.release(), runtime_error);
//
// // Reserve a slot.
// slot_id = queue.reserve();
// ASSERT_NE(slot_id, -1);
// // But you cannot reserve 2 slots at once.
// ASSERT_EQ(queue.reserve(), -1);
// // And cannot read this slot until its committed.
// ASSERT_EQ(queue.read(), -1);
//
// auto detector_frame = queue.get_metadata_buffer(slot_id);
// char* meta_ptr = (char*) detector_frame;
// char* data_ptr = (char*) queue.get_data_buffer(slot_id);
//
// queue.commit();
//
// slot_id = queue.read();
// // Once the slot is committed we should be able to read it.
// ASSERT_NE(slot_id, -1);
// // You can read the same slot multiple times.
// ASSERT_NE(queue.read(), -1);
// // The 2 buffers should match the committed slot.
// ASSERT_EQ(meta_ptr, (char*)(queue.get_metadata_buffer(slot_id)));
// ASSERT_EQ(data_ptr, (char*)(queue.get_data_buffer(slot_id)));
//
// queue.release();
//}
//
//TEST(FastQueue, queue_full)
//{
// size_t n_slots = 5;
// size_t slot_data_n_bytes = MODULE_N_BYTES * 2;
// FastQueue<ImageMetadata> queue(slot_data_n_bytes, n_slots);
//
// // There is nothing to be read in the queue.
// ASSERT_EQ(queue.read(), -1);
//
// for (size_t i=0; i<n_slots; i++) {
// // Business as usual here, we still have slots left.
// ASSERT_NE(queue.reserve(), -1);
// queue.commit();
// }
//
// // There are no more slots available.
// ASSERT_EQ(queue.reserve(), -1);
// // We now read the first slot.
// ASSERT_EQ(queue.read(), 0);
// // But until we release it we cannot re-use it.
// ASSERT_EQ(queue.reserve(), -1);
//
// queue.release();
// // After the release, the first slot is again ready for writing.
// ASSERT_EQ(queue.reserve(), 0);
//}
//
//TEST(FastQueue, data_transfer)
//{
// size_t n_slots = 5;
// size_t slot_data_n_bytes = MODULE_N_BYTES * 2;
// FastQueue<ImageMetadata> queue(slot_data_n_bytes, n_slots);
//
// int write_slot_id = queue.reserve();
//
// auto w_metadata = queue.get_metadata_buffer(write_slot_id);
// w_metadata->pulse_id = 1;
// w_metadata->frame_index = 2;
// w_metadata->daq_rec = 3;
// w_metadata->is_good_frame = 4;
//
// auto w_data = (uint16_t*)(queue.get_data_buffer(write_slot_id));
// for (size_t i=0; i<MODULE_N_PIXELS; i++) {
// w_data[i] = (uint16_t) i;
// }
//
// queue.commit();
//
// auto read_slot_id = queue.read();
//
// auto r_metadata = queue.get_metadata_buffer(read_slot_id);
// EXPECT_EQ(w_metadata->pulse_id,
// r_metadata->pulse_id);
// EXPECT_EQ(w_metadata->frame_index,
// r_metadata->frame_index);
// EXPECT_EQ(w_metadata->daq_rec,
// r_metadata->daq_rec);
// EXPECT_EQ(w_metadata->is_good_frame,
// r_metadata->is_good_frame);
//
// auto r_data = (uint16_t*)(queue.get_data_buffer(read_slot_id));
// for (size_t i=0; i<MODULE_N_PIXELS; i++) {
// ASSERT_EQ(r_data[i], (uint16_t) i);
// }
//}
//
//TEST(FaseQueue, array_parameter)
//{
// size_t n_modules = 32;
// FastQueue<ModuleFrameBuffer> queue(
// n_modules * MODULE_N_BYTES,
// WRITER_FASTQUEUE_N_SLOTS);
//
// ModuleFrame frame;
//
// auto slot_id = queue.reserve();
// auto metadata = queue.get_metadata_buffer(slot_id);
//
// for (int i_module=0; i_module<n_modules; i_module++) {
// auto& module_metadata = metadata->module[i_module];
//
// frame.pulse_id = i_module;
// frame.frame_index = i_module;
// frame.daq_rec = i_module;
// frame.n_recv_packets = i_module;
// frame.module_id = i_module;
//
// ModuleFrame* p_metadata = &module_metadata;
//
// memcpy(p_metadata, &frame, sizeof(ModuleFrame));
// }
//
// for (int i_module=0; i_module<n_modules; i_module++) {
// auto& module_metadata = metadata->module[i_module];
//
// ASSERT_EQ(module_metadata.pulse_id, i_module);
// ASSERT_EQ(module_metadata.frame_index, i_module);
// ASSERT_EQ(module_metadata.daq_rec, i_module);
// ASSERT_EQ(module_metadata.n_recv_packets, i_module);
// ASSERT_EQ(module_metadata.module_id, i_module);
// }
//}
//
//// TODO: Test with payload of zero (metadata only).
-87
View File
@@ -1,87 +0,0 @@
#include <zmq.h>
#include "LiveRecvModule.hpp"
#include "gtest/gtest.h"
#include "buffer_config.hpp"
#include <future>
using namespace std;
using namespace buffer_config;
TEST(LiveRecvModule, transfer_test) {
// TODO: Make this test work again.
// auto ctx = zmq_ctx_new();
//
// size_t n_modules = 32;
// size_t n_slots = 5;
// FastQueue<ModuleFrameBuffer> queue(MODULE_N_BYTES * n_modules, n_slots);
//
// void *sockets[n_modules];
// for (size_t i = 0; i < n_modules; i++) {
// sockets[i] = zmq_socket(ctx, ZMQ_PUB);
//
// int linger = 0;
// if (zmq_setsockopt(sockets[i], ZMQ_LINGER, &linger,
// sizeof(linger)) != 0) {
// throw runtime_error(zmq_strerror(errno));
// }
//
// stringstream ipc_addr;
// ipc_addr << BUFFER_LIVE_IPC_URL << i;
// const auto ipc = ipc_addr.str();
//
// if (zmq_bind(sockets[i], ipc.c_str()) != 0) {
// throw runtime_error(zmq_strerror(errno));
// }
// }
//
// LiveRecvModule recv_module(queue, n_modules, ctx, BUFFER_LIVE_IPC_URL);
//
// // Nothing should be committed, queue, should be empty.
// ASSERT_EQ(queue.read(), -1);
//
// ModuleFrame metadata;
// auto data = make_unique<char[]>(MODULE_N_BYTES);
//
// for (size_t i = 0; i < n_modules; i++) {
// metadata.pulse_id = 1;
// metadata.frame_index = 2;
// metadata.daq_rec = 3;
// metadata.n_received_packets = 4;
// metadata.module_id = i;
//
// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE);
// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0);
// }
//
// this_thread::sleep_for(chrono::milliseconds(100));
//
// auto slot_id = queue.read();
// // We should have the first Detector frame in the buffer.
// //ASSERT_NE(slot_id, -1);
//
// auto recv_stopped = async(launch::async, [&](){
// recv_module.stop();
// });
//
// this_thread::sleep_for(chrono::milliseconds(100));
//
// for (size_t i = 0; i < n_modules; i++) {
// metadata.pulse_id = 1;
// metadata.frame_index = 2;
// metadata.daq_rec = 3;
// metadata.n_received_packets = 4;
// metadata.module_id = i;
//
// zmq_send(sockets[i], &metadata, sizeof(ModuleFrame), ZMQ_SNDMORE);
// zmq_send(sockets[i], data.get(), MODULE_N_BYTES, 0);
// }
//
// recv_stopped.wait();
//
// for (size_t i = 0; i < n_modules; i++) {
// zmq_close(sockets[i]);
// }
//
// zmq_ctx_destroy(ctx);
// cout << "We are finished" << endl;
}