mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-28 05:32:23 +02:00
WIP
This commit is contained in:
+42
-23
@@ -12,13 +12,25 @@
|
||||
#include "ProcessManager.hpp"
|
||||
#include "config.hpp"
|
||||
#include "BufferedWriter.hpp"
|
||||
#include "compression/compression.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
ProcessManager::ProcessManager(WriterManager& writer_manager, ZmqReceiver& receiver, RingBuffer& ring_buffer,
|
||||
const H5Format& format, uint16_t rest_port, const string& bsread_rest_address, hsize_t frames_per_file) :
|
||||
writer_manager(writer_manager), receiver(receiver), ring_buffer(ring_buffer), format(format), rest_port(rest_port),
|
||||
bsread_rest_address(bsread_rest_address), frames_per_file(frames_per_file)
|
||||
ProcessManager::ProcessManager(
|
||||
WriterManager& writer_manager,
|
||||
ZmqReceiver& receiver,
|
||||
RingBuffer& ring_buffer,
|
||||
const H5Format& format,
|
||||
uint16_t rest_port,
|
||||
const string& bsread_rest_address,
|
||||
hsize_t frames_per_file) :
|
||||
writer_manager(writer_manager),
|
||||
receiver(receiver),
|
||||
ring_buffer(ring_buffer),
|
||||
format(format),
|
||||
rest_port(rest_port),
|
||||
bsread_rest_address(bsread_rest_address),
|
||||
frames_per_file(frames_per_file)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -26,31 +38,13 @@ ProcessManager::ProcessManager(WriterManager& writer_manager, ZmqReceiver& recei
|
||||
void ProcessManager::run_receivers(uint8_t n_receiving_threads)
|
||||
{
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[ProcessManager::run_writer] Running writer";
|
||||
cout << " with n_receiving_threads " << n_receiving_threads;
|
||||
cout << endl;
|
||||
#endif
|
||||
|
||||
boost::thread_group receivers;
|
||||
for (uint8_t i=0; i<n_receiving_threads; i++) {
|
||||
receivers.add_thread(new boost::thread(&ProcessManager::receive_zmq, this));
|
||||
}
|
||||
|
||||
RestApi::start_rest_api(writer_manager, rest_port);
|
||||
|
||||
#ifdef DEBUG_OUTPU
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[ProcessManager::run_writer] Rest API stopped." << endl;
|
||||
#endif
|
||||
|
||||
// In case SIGINT stopped the rest_api.
|
||||
writer_manager.stop();
|
||||
|
||||
receivers.join_all();
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
@@ -87,7 +81,32 @@ void ProcessManager::receive_zmq()
|
||||
cout << "." << endl;
|
||||
#endif
|
||||
|
||||
ring_buffer.write(frame_metadata, frame_data);
|
||||
char* buffer = ring_buffer.reserve(frame_metadata);
|
||||
|
||||
size_t max_buffer_size = compression::get_bitshuffle_max_buffer_size(
|
||||
frame_metadata->frame_bytes_size, 1);
|
||||
|
||||
if (max_buffer_size > ring_buffer.get_slot_size()) {
|
||||
|
||||
}
|
||||
|
||||
auto compressed_size = compression::compress_bitshuffle(
|
||||
static_cast<const char*>(frame_data),
|
||||
frame_metadata->frame_bytes_size,
|
||||
1,
|
||||
buffer);
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
using namespace date;
|
||||
cout << "[" << std::chrono::system_clock::now() << "]";
|
||||
cout << "[ProcessManager::receive_zmq] Compressed image from ";
|
||||
cout << frame_metadata->frame_bytes_size << " bytes to ";
|
||||
cout << compressed_size << " bytes." << endl;
|
||||
#endif
|
||||
|
||||
frame_metadata->frame_bytes_size = compressed_size;
|
||||
|
||||
ring_buffer.commit(frame_metadata);
|
||||
}
|
||||
|
||||
#ifdef DEBUG_OUTPUT
|
||||
|
||||
Reference in New Issue
Block a user