diff --git a/lib/src/ProcessManager.cpp b/lib/src/ProcessManager.cpp index 0e30e33..c7f3d7a 100644 --- a/lib/src/ProcessManager.cpp +++ b/lib/src/ProcessManager.cpp @@ -14,23 +14,32 @@ using namespace std; -void ProcessManager::notify_first_pulse_id(const string& bsread_rest_address, uint64_t pulse_id) +ProcessManager::ProcessManager(WriterManager& writer_manager, ZmqReceiver& receiver, RingBuffer& ring_buffer, + const H5Format& format, uint16_t rest_port, const string& bsread_rest_address) : + writer_manager(writer_manager), receiver(receiver), ring_buffer(ring_buffer), format(format), rest_port(rest_port), + bsread_rest_address(bsread_rest_address) { +} + +void ProcessManager::notify_first_pulse_id(uint64_t pulse_id) +{ + string request_address(bsread_rest_address); + // First pulse_id should be an async operation - we do not want to make the writer wait. - async(launch::async, [pulse_id, &bsread_rest_address]{ + async(launch::async, [pulse_id, &request_address]{ try { - cout << "Sending first received pulse_id " << pulse_id << " to bsread_rest_address " << bsread_rest_address << endl; + cout << "Sending first received pulse_id " << pulse_id << " to bsread_rest_address " << request_address << endl; stringstream request; - request << "curl -X PUT " << bsread_rest_address << "/start_pulse_id/" << pulse_id; + request << "curl -X PUT " << request_address << "/start_pulse_id/" << pulse_id; string request_call(request.str()); #ifdef DEBUG_OUTPUT using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[SfProcessManager::notify_first_pulse_id] Sending request (" << request_call << ")." << endl; + cout << "[ProcessManager::notify_first_pulse_id] Sending request (" << request_call << ")." << endl; #endif system(request_call.c_str()); @@ -39,7 +48,7 @@ void ProcessManager::notify_first_pulse_id(const string& bsread_rest_address, ui }); } -void ProcessManager::notify_last_pulse_id(const string& bsread_rest_address, uint64_t pulse_id) +void ProcessManager::notify_last_pulse_id(uint64_t pulse_id) { // Last pulse_id should be a sync operation - we do not want to terminate the process to quickly. cout << "Sending last received pulse_id " << pulse_id << " to bsread address " << bsread_rest_address << endl; @@ -55,15 +64,14 @@ void ProcessManager::notify_last_pulse_id(const string& bsread_rest_address, uin #ifdef DEBUG_OUTPUT using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[SfProcessManager::notify_last_pulse_id] Sending request (" << request_call << ")." << endl; + cout << "[ProcessManager::notify_last_pulse_id] Sending request (" << request_call << ")." << endl; #endif system(request_call.c_str()); } catch (...){} } -void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, - ZmqReceiver& receiver, uint16_t rest_port, const string& bsread_rest_address) +void ProcessManager::run_writer() { size_t n_slots = config::ring_buffer_n_slots; RingBuffer ring_buffer(n_slots); @@ -72,17 +80,15 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; cout << "[ProcessManager::run_writer] Running writer"; - cout << " and output_file " << manager.get_output_file(); + cout << " and output_file " << writer_manager.get_output_file(); cout << " and n_slots " << n_slots; cout << endl; #endif - boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), - boost::ref(receiver), boost::ref(format)); - boost::thread writer_thread(write_h5, boost::ref(manager), - boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type(), boost::ref(bsread_rest_address)); + boost::thread receiver_thread(&ProcessManager::receive_zmq, this); + boost::thread writer_thread(&ProcessManager::write_h5, this); - RestApi::start_rest_api(manager, rest_port); + RestApi::start_rest_api(writer_manager, rest_port); #ifdef DEBUG_OUTPUT using namespace date; @@ -91,7 +97,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, #endif // In case SIGINT stopped the rest_api. - manager.stop(); + writer_manager.stop(); receiver_thread.join(); writer_thread.join(); @@ -103,12 +109,11 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, #endif } -void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, - ZmqReceiver& receiver, const H5Format& format) +void ProcessManager::receive_zmq() { receiver.connect(); - while (manager.is_running()) { + while (writer_manager.is_running()) { auto frame = receiver.receive(); @@ -135,7 +140,7 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer // Commit the frame to the buffer. ring_buffer.write(frame_metadata, frame_data); - manager.received_frame(frame_metadata->frame_index); + writer_manager.received_frame(frame_metadata->frame_index); } #ifdef DEBUG_OUTPUT @@ -145,16 +150,15 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer #endif } -void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer, - const shared_ptr> header_values_type, const string& bsread_rest_address) +void ProcessManager::write_h5() { - auto writer = get_h5_writer(manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step); + auto writer = get_h5_writer(writer_manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step); auto raw_frames_dataset_name = config::raw_image_dataset_name; uint64_t last_pulse_id = 0; // Run until the running flag is set or the ring_buffer is empty. - while(manager.is_running() || !ring_buffer.is_empty()) { + while(writer_manager.is_running() || !ring_buffer.is_empty()) { if (ring_buffer.is_empty()) { boost::this_thread::sleep_for(boost::chrono::milliseconds(config::ring_buffer_read_retry_interval)); @@ -202,6 +206,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri #endif // Write image metadata if mapping specified. + auto header_values_type = receiver.get_header_values_type(); if (header_values_type) { for (const auto& header_type : *header_values_type) { @@ -215,7 +220,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri if (name == "pulse_id") { if (!last_pulse_id) { last_pulse_id = *(reinterpret_cast(value.get())); - notify_first_pulse_id(bsread_rest_address, last_pulse_id); + notify_first_pulse_id(last_pulse_id); } else { last_pulse_id = *(reinterpret_cast(value.get())); } @@ -246,12 +251,12 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri cout << received_data.first->frame_index << " written in " << metadata_diff_ms << " ms." << endl; #endif - manager.written_frame(received_data.first->frame_index); + writer_manager.written_frame(received_data.first->frame_index); } // Send the last_pulse_id only if it was set. if (last_pulse_id) { - notify_last_pulse_id(bsread_rest_address, last_pulse_id); + notify_last_pulse_id(last_pulse_id); } if (writer->is_file_open()) { @@ -262,13 +267,13 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri #endif // Wait until all parameters are set or writer is killed. - while (!manager.are_all_parameters_set() && !manager.is_killed()) { + while (!writer_manager.are_all_parameters_set() && !writer_manager.is_killed()) { boost::this_thread::sleep_for(boost::chrono::milliseconds(config::parameters_read_retry_interval)); } // Need to check again if we have all parameters to write down the format. - if (manager.are_all_parameters_set()) { - const auto parameters = manager.get_parameters(); + if (writer_manager.are_all_parameters_set()) { + const auto parameters = writer_manager.get_parameters(); // Even if we can't write the format, lets try to preserve the data. try { @@ -284,7 +289,7 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri #ifdef DEBUG_OUTPUT using namespace date; cout << "[" << std::chrono::system_clock::now() << "]"; - cout << "[ProcessManager::write] Closing file " << manager.get_output_file() << endl; + cout << "[ProcessManager::write] Closing file " << writer_manager.get_output_file() << endl; #endif writer->close_file(); diff --git a/lib/src/ProcessManager.hpp b/lib/src/ProcessManager.hpp index 8e381cf..7e4fc43 100644 --- a/lib/src/ProcessManager.hpp +++ b/lib/src/ProcessManager.hpp @@ -8,19 +8,29 @@ #include #include "date.h" -namespace ProcessManager +class ProcessManager { - void run_writer(WriterManager& manager, const H5Format& format, ZmqReceiver& receiver, uint16_t rest_port, - const std::string& bsread_rest_address); + WriterManager& writer_manager; + ZmqReceiver& receiver; + RingBuffer& ring_buffer; + const H5Format& format; - void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, ZmqReceiver& receiver, const H5Format& format); + uint16_t rest_port; + const std::string& bsread_rest_address; - void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer, - const std::shared_ptr> header_values_type, - const std::string& bsread_rest_address); + void notify_first_pulse_id(uint64_t pulse_id); + void notify_last_pulse_id(uint64_t pulse_id); + + public: + ProcessManager(WriterManager& writer_manager, ZmqReceiver& receiver, + RingBuffer& ring_buffer, const H5Format& format, uint16_t rest_port, const std::string& bsread_rest_address); + + void run_writer(); + + void receive_zmq(); + + void write_h5(); - void notify_first_pulse_id(const std::string& bsread_rest_address, uint64_t pulse_id); - void notify_last_pulse_id(const std::string& bsread_rest_address, uint64_t pulse_id); }; #endif \ No newline at end of file