From e49d48917adebbc14eed50ecab0de01405a59c0a Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Thu, 12 Jul 2018 15:48:51 +0200 Subject: [PATCH] Writer broker notifications --- lib/src/ProcessManager.cpp | 71 ++++++++++++++++++++++++++++++++++++-- lib/src/ProcessManager.hpp | 9 +++-- 2 files changed, 75 insertions(+), 5 deletions(-) diff --git a/lib/src/ProcessManager.cpp b/lib/src/ProcessManager.cpp index 1dbdbc1..4ae2509 100644 --- a/lib/src/ProcessManager.cpp +++ b/lib/src/ProcessManager.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "RestApi.hpp" #include "ProcessManager.hpp" @@ -13,8 +14,55 @@ using namespace std; +void ProcessManager::notify_first_pulse_id(const string& bsread_rest_address, uint64_t pulse_id) +{ + // First pulse_id should be an async operation - we do not want to make the writer wait. + async(launch::async, [pulse_id, &bsread_rest_address]{ + try { + cout << "Sending first received pulse_id " << pulse_id << " to bsread_rest_address " << bsread_rest_address << endl; + + stringstream request; + request << "curl -X PUT " << bsread_rest_address << "start_pulse_id/" << pulse_id; + + string request_call(request.str()); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[SfProcessManager::notify_first_pulse_id] Sending request (" << request_call << ")." << endl; + #endif + + system(request_call.c_str()); + } catch (...){} + + }); +} + +void ProcessManager::notify_last_pulse_id(const string& bsread_rest_address, uint64_t pulse_id) +{ + // Last pulse_id should be a sync operation - we do not want to terminate the process to quickly. + cout << "Sending last received pulse_id " << pulse_id << " to bsread address " << bsread_rest_address << endl; + + try { + stringstream request; + request << "curl -X PUT " << bsread_rest_address << "stop_pulse_id/" << pulse_id; + + cout << "Request: " << request.str() << endl; + + string request_call(request.str()); + + #ifdef DEBUG_OUTPUT + using namespace date; + cout << "[" << std::chrono::system_clock::now() << "]"; + cout << "[SfProcessManager::notify_last_pulse_id] Sending request (" << request_call << ")." << endl; + #endif + + system(request_call.c_str()); + } catch (...){} +} + void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, - ZmqReceiver& receiver, uint16_t rest_port) + ZmqReceiver& receiver, uint16_t rest_port, const string& bsread_rest_address) { size_t n_slots = config::ring_buffer_n_slots; RingBuffer ring_buffer(n_slots); @@ -31,7 +79,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format, boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer), boost::ref(receiver), boost::ref(format)); boost::thread writer_thread(write_h5, boost::ref(manager), - boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type()); + boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type(), boost::ref(bsread_rest_address)); RestApi::start_rest_api(manager, rest_port); @@ -97,10 +145,12 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer } void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer, - const shared_ptr> header_values_type) + const shared_ptr> header_values_type, const string& bsread_rest_address) { H5Writer writer(manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step); auto raw_frames_dataset_name = config::raw_image_dataset_name; + + uint64_t last_pulse_id = 0; // Run until the running flag is set or the ring_buffer is empty. while(manager.is_running() || !ring_buffer.is_empty()) { @@ -160,6 +210,16 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri auto value = received_data.first->header_values.at(name); + // TODO: Ugly hack until we get the start sequence in the bsread stream itself. + if (name == "pulse_id") { + if (!last_pulse_id) { + last_pulse_id = *(reinterpret_cast(value.get())); + notify_first_pulse_id(bsread_rest_address, last_pulse_id); + } else { + last_pulse_id = *(reinterpret_cast(value.get())); + } + } + // Header data are fixed to scalars in little endian. vector value_shape = {header_data_type.value_shape}; @@ -188,6 +248,11 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri manager.written_frame(received_data.first->frame_index); } + // Send the last_pulse_id only if it was set. + if (last_pulse_id) { + notify_last_pulse_id(bsread_rest_address, last_pulse_id); + } + if (writer.is_file_open()) { #ifdef DEBUG_OUTPUT using namespace date; diff --git a/lib/src/ProcessManager.hpp b/lib/src/ProcessManager.hpp index 2f33831..8e381cf 100644 --- a/lib/src/ProcessManager.hpp +++ b/lib/src/ProcessManager.hpp @@ -10,12 +10,17 @@ namespace ProcessManager { - void run_writer(WriterManager& manager, const H5Format& format, ZmqReceiver& receiver, uint16_t rest_port); + void run_writer(WriterManager& manager, const H5Format& format, ZmqReceiver& receiver, uint16_t rest_port, + const std::string& bsread_rest_address); void receive_zmq(WriterManager& manager, RingBuffer& ring_buffer, ZmqReceiver& receiver, const H5Format& format); void write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer, - const std::shared_ptr> header_values_type); + const std::shared_ptr> header_values_type, + const std::string& bsread_rest_address); + + void notify_first_pulse_id(const std::string& bsread_rest_address, uint64_t pulse_id); + void notify_last_pulse_id(const std::string& bsread_rest_address, uint64_t pulse_id); }; #endif \ No newline at end of file