Converted ProcessManager into class

This commit is contained in:
2018-07-18 17:10:44 +02:00
parent 7c12c08a64
commit f3d90a2c4a
2 changed files with 55 additions and 40 deletions
+36 -31
View File
@@ -14,23 +14,32 @@
using namespace std;
void ProcessManager::notify_first_pulse_id(const string& bsread_rest_address, uint64_t pulse_id)
ProcessManager::ProcessManager(WriterManager& writer_manager, ZmqReceiver& receiver, RingBuffer& ring_buffer,
const H5Format& format, uint16_t rest_port, const string& bsread_rest_address) :
writer_manager(writer_manager), receiver(receiver), ring_buffer(ring_buffer), format(format), rest_port(rest_port),
bsread_rest_address(bsread_rest_address)
{
}
void ProcessManager::notify_first_pulse_id(uint64_t pulse_id)
{
string request_address(bsread_rest_address);
// First pulse_id should be an async operation - we do not want to make the writer wait.
async(launch::async, [pulse_id, &bsread_rest_address]{
async(launch::async, [pulse_id, &request_address]{
try {
cout << "Sending first received pulse_id " << pulse_id << " to bsread_rest_address " << bsread_rest_address << endl;
cout << "Sending first received pulse_id " << pulse_id << " to bsread_rest_address " << request_address << endl;
stringstream request;
request << "curl -X PUT " << bsread_rest_address << "/start_pulse_id/" << pulse_id;
request << "curl -X PUT " << request_address << "/start_pulse_id/" << pulse_id;
string request_call(request.str());
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[SfProcessManager::notify_first_pulse_id] Sending request (" << request_call << ")." << endl;
cout << "[ProcessManager::notify_first_pulse_id] Sending request (" << request_call << ")." << endl;
#endif
system(request_call.c_str());
@@ -39,7 +48,7 @@ void ProcessManager::notify_first_pulse_id(const string& bsread_rest_address, ui
});
}
void ProcessManager::notify_last_pulse_id(const string& bsread_rest_address, uint64_t pulse_id)
void ProcessManager::notify_last_pulse_id(uint64_t pulse_id)
{
// Last pulse_id should be a sync operation - we do not want to terminate the process to quickly.
cout << "Sending last received pulse_id " << pulse_id << " to bsread address " << bsread_rest_address << endl;
@@ -55,15 +64,14 @@ void ProcessManager::notify_last_pulse_id(const string& bsread_rest_address, uin
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[SfProcessManager::notify_last_pulse_id] Sending request (" << request_call << ")." << endl;
cout << "[ProcessManager::notify_last_pulse_id] Sending request (" << request_call << ")." << endl;
#endif
system(request_call.c_str());
} catch (...){}
}
void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
ZmqReceiver& receiver, uint16_t rest_port, const string& bsread_rest_address)
void ProcessManager::run_writer()
{
size_t n_slots = config::ring_buffer_n_slots;
RingBuffer ring_buffer(n_slots);
@@ -72,17 +80,15 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::run_writer] Running writer";
cout << " and output_file " << manager.get_output_file();
cout << " and output_file " << writer_manager.get_output_file();
cout << " and n_slots " << n_slots;
cout << endl;
#endif
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer),
boost::ref(receiver), boost::ref(format));
boost::thread writer_thread(write_h5, boost::ref(manager),
boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type(), boost::ref(bsread_rest_address));
boost::thread receiver_thread(&ProcessManager::receive_zmq, this);
boost::thread writer_thread(&ProcessManager::write_h5, this);
RestApi::start_rest_api(manager, rest_port);
RestApi::start_rest_api(writer_manager, rest_port);
#ifdef DEBUG_OUTPUT
using namespace date;
@@ -91,7 +97,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
#endif
// In case SIGINT stopped the rest_api.
manager.stop();
writer_manager.stop();
receiver_thread.join();
writer_thread.join();
@@ -103,12 +109,11 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
#endif
}
void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer,
ZmqReceiver& receiver, const H5Format& format)
void ProcessManager::receive_zmq()
{
receiver.connect();
while (manager.is_running()) {
while (writer_manager.is_running()) {
auto frame = receiver.receive();
@@ -135,7 +140,7 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
// Commit the frame to the buffer.
ring_buffer.write(frame_metadata, frame_data);
manager.received_frame(frame_metadata->frame_index);
writer_manager.received_frame(frame_metadata->frame_index);
}
#ifdef DEBUG_OUTPUT
@@ -145,16 +150,15 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
#endif
}
void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer,
const shared_ptr<unordered_map<string, HeaderDataType>> header_values_type, const string& bsread_rest_address)
void ProcessManager::write_h5()
{
auto writer = get_h5_writer(manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step);
auto writer = get_h5_writer(writer_manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step);
auto raw_frames_dataset_name = config::raw_image_dataset_name;
uint64_t last_pulse_id = 0;
// Run until the running flag is set or the ring_buffer is empty.
while(manager.is_running() || !ring_buffer.is_empty()) {
while(writer_manager.is_running() || !ring_buffer.is_empty()) {
if (ring_buffer.is_empty()) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(config::ring_buffer_read_retry_interval));
@@ -202,6 +206,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
#endif
// Write image metadata if mapping specified.
auto header_values_type = receiver.get_header_values_type();
if (header_values_type) {
for (const auto& header_type : *header_values_type) {
@@ -215,7 +220,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
if (name == "pulse_id") {
if (!last_pulse_id) {
last_pulse_id = *(reinterpret_cast<uint64_t*>(value.get()));
notify_first_pulse_id(bsread_rest_address, last_pulse_id);
notify_first_pulse_id(last_pulse_id);
} else {
last_pulse_id = *(reinterpret_cast<uint64_t*>(value.get()));
}
@@ -246,12 +251,12 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl;
#endif
manager.written_frame(received_data.first->frame_index);
writer_manager.written_frame(received_data.first->frame_index);
}
// Send the last_pulse_id only if it was set.
if (last_pulse_id) {
notify_last_pulse_id(bsread_rest_address, last_pulse_id);
notify_last_pulse_id(last_pulse_id);
}
if (writer->is_file_open()) {
@@ -262,13 +267,13 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
#endif
// Wait until all parameters are set or writer is killed.
while (!manager.are_all_parameters_set() && !manager.is_killed()) {
while (!writer_manager.are_all_parameters_set() && !writer_manager.is_killed()) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(config::parameters_read_retry_interval));
}
// Need to check again if we have all parameters to write down the format.
if (manager.are_all_parameters_set()) {
const auto parameters = manager.get_parameters();
if (writer_manager.are_all_parameters_set()) {
const auto parameters = writer_manager.get_parameters();
// Even if we can't write the format, lets try to preserve the data.
try {
@@ -284,7 +289,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[ProcessManager::write] Closing file " << manager.get_output_file() << endl;
cout << "[ProcessManager::write] Closing file " << writer_manager.get_output_file() << endl;
#endif
writer->close_file();