Writer broker notifications

This commit is contained in:
2018-07-12 15:48:51 +02:00
parent c46e74daa5
commit e49d48917a
2 changed files with 75 additions and 5 deletions
+68 -3
View File
@@ -5,6 +5,7 @@
#include <iostream>
#include <memory>
#include <boost/thread.hpp>
#include <future>
#include "RestApi.hpp"
#include "ProcessManager.hpp"
@@ -13,8 +14,55 @@
using namespace std;
void ProcessManager::notify_first_pulse_id(const string& bsread_rest_address, uint64_t pulse_id)
{
// First pulse_id should be an async operation - we do not want to make the writer wait.
async(launch::async, [pulse_id, &bsread_rest_address]{
try {
cout << "Sending first received pulse_id " << pulse_id << " to bsread_rest_address " << bsread_rest_address << endl;
stringstream request;
request << "curl -X PUT " << bsread_rest_address << "start_pulse_id/" << pulse_id;
string request_call(request.str());
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[SfProcessManager::notify_first_pulse_id] Sending request (" << request_call << ")." << endl;
#endif
system(request_call.c_str());
} catch (...){}
});
}
void ProcessManager::notify_last_pulse_id(const string& bsread_rest_address, uint64_t pulse_id)
{
// Last pulse_id should be a sync operation - we do not want to terminate the process to quickly.
cout << "Sending last received pulse_id " << pulse_id << " to bsread address " << bsread_rest_address << endl;
try {
stringstream request;
request << "curl -X PUT " << bsread_rest_address << "stop_pulse_id/" << pulse_id;
cout << "Request: " << request.str() << endl;
string request_call(request.str());
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[SfProcessManager::notify_last_pulse_id] Sending request (" << request_call << ")." << endl;
#endif
system(request_call.c_str());
} catch (...){}
}
void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
ZmqReceiver& receiver, uint16_t rest_port)
ZmqReceiver& receiver, uint16_t rest_port, const string& bsread_rest_address)
{
size_t n_slots = config::ring_buffer_n_slots;
RingBuffer ring_buffer(n_slots);
@@ -31,7 +79,7 @@ void ProcessManager::run_writer(WriterManager& manager, const H5Format& format,
boost::thread receiver_thread(receive_zmq, boost::ref(manager), boost::ref(ring_buffer),
boost::ref(receiver), boost::ref(format));
boost::thread writer_thread(write_h5, boost::ref(manager),
boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type());
boost::ref(format), boost::ref(ring_buffer), receiver.get_header_values_type(), boost::ref(bsread_rest_address));
RestApi::start_rest_api(manager, rest_port);
@@ -97,10 +145,12 @@ void ProcessManager::receive_zmq(WriterManager& manager, RingBuffer& ring_buffer
}
void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, RingBuffer& ring_buffer,
const shared_ptr<unordered_map<string, HeaderDataType>> header_values_type)
const shared_ptr<unordered_map<string, HeaderDataType>> header_values_type, const string& bsread_rest_address)
{
H5Writer writer(manager.get_output_file(), 0, config::initial_dataset_size, config::dataset_increase_step);
auto raw_frames_dataset_name = config::raw_image_dataset_name;
uint64_t last_pulse_id = 0;
// Run until the running flag is set or the ring_buffer is empty.
while(manager.is_running() || !ring_buffer.is_empty()) {
@@ -160,6 +210,16 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
auto value = received_data.first->header_values.at(name);
// TODO: Ugly hack until we get the start sequence in the bsread stream itself.
if (name == "pulse_id") {
if (!last_pulse_id) {
last_pulse_id = *(reinterpret_cast<uint64_t*>(value.get()));
notify_first_pulse_id(bsread_rest_address, last_pulse_id);
} else {
last_pulse_id = *(reinterpret_cast<uint64_t*>(value.get()));
}
}
// Header data are fixed to scalars in little endian.
vector<size_t> value_shape = {header_data_type.value_shape};
@@ -188,6 +248,11 @@ void ProcessManager::write_h5(WriterManager& manager, const H5Format& format, Ri
manager.written_frame(received_data.first->frame_index);
}
// Send the last_pulse_id only if it was set.
if (last_pulse_id) {
notify_last_pulse_id(bsread_rest_address, last_pulse_id);
}
if (writer.is_file_open()) {
#ifdef DEBUG_OUTPUT
using namespace date;