mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-05-07 20:52:05 +02:00
Upgrade runner
This commit is contained in:
+12
-13
@@ -1,20 +1,22 @@
|
||||
#include <iostream>
|
||||
#include <zmq.hpp>
|
||||
#include "rapidjson/document.h"
|
||||
#include <cstdlib>
|
||||
#include "RingBuffer.hpp"
|
||||
#include "H5ChunkedWriter.hpp"
|
||||
#include <thread>
|
||||
#include "rapidjson/document.h"
|
||||
|
||||
#include "WriterManager.hpp"
|
||||
#include "H5ChunkedWriter.hpp"
|
||||
#include "RingBuffer.hpp"
|
||||
|
||||
using namespace std;
|
||||
|
||||
void write(RingBuffer *ring_buffer, string output_file, WriterManager &manager)
|
||||
void write(RingBuffer *ring_buffer, string output_file, WriterManager *manager)
|
||||
{
|
||||
string dataset_name = "data";
|
||||
HDF5ChunkedWriter writer(output_file, dataset_name);
|
||||
|
||||
// Run until the running flag is set or the ring_buffer is empty.
|
||||
while(manager.is_running() || !ring_buffer->is_empty()) {
|
||||
while(manager->is_running() || !ring_buffer->is_empty()) {
|
||||
pair<FrameMetadata, char*> received_data = ring_buffer->read();
|
||||
|
||||
writer.write_data(received_data.first.frame_index,
|
||||
@@ -24,7 +26,7 @@ void write(RingBuffer *ring_buffer, string output_file, WriterManager &manager)
|
||||
|
||||
ring_buffer->release(received_data.first.buffer_slot_index);
|
||||
|
||||
manager.written_frame(received_data.first.frame_index);
|
||||
manager->written_frame(received_data.first.frame_index);
|
||||
}
|
||||
|
||||
writer.close_file();
|
||||
@@ -35,7 +37,7 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i
|
||||
WriterManager manager(n_images);
|
||||
RingBuffer ring_buffer(n_slots);
|
||||
|
||||
thread writer_thread(write, &ring_buffer, "output.h5", &running_flag);
|
||||
thread writer_thread(write, &ring_buffer, "output.h5", &manager);
|
||||
|
||||
zmq::context_t context(n_io_threads);
|
||||
zmq::socket_t receiver(context, ZMQ_PULL);
|
||||
@@ -46,7 +48,7 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i
|
||||
|
||||
rapidjson::Document header_parser;
|
||||
|
||||
for (manager.is_running()) {
|
||||
while (manager.is_running()) {
|
||||
// Get the message header.
|
||||
receiver.recv(&message_data);
|
||||
|
||||
@@ -65,11 +67,6 @@ void receive(string connect_address, uint64_t n_images, int n_slots=100, int n_i
|
||||
receiver.recv(&message_data);
|
||||
frame_metadata.frame_bytes_size = message_data.size();
|
||||
|
||||
if (!ring_buffer_initialized) {
|
||||
ring_buffer.initialize(frame_metadata.frame_bytes_size);
|
||||
ring_buffer_initialized = true;
|
||||
}
|
||||
|
||||
// Commit the frame to the buffer.
|
||||
ring_buffer.write(frame_metadata, static_cast<char*>(message_data.data()));
|
||||
|
||||
@@ -83,6 +80,8 @@ int main (int argc, char *argv[])
|
||||
{
|
||||
if (argc != 3) {
|
||||
cout << "Usage: h5_zmq_writer [connection_address] [n_images]" << endl;
|
||||
cout << "\tconnection_address: Address to connect to the stream (PULL). Example: tcp://127.0.0.1:40000" << endl;
|
||||
cout << "\tn_images: Number of images to acquire. 0 for infinity (untill STOP is called)." << endl;
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user