WIP PSIWriter

This commit is contained in:
2019-04-25 13:16:04 +02:00
parent 3a8e926e69
commit 60bfb3a664
2 changed files with 32 additions and 79 deletions
+16 -61
View File
@@ -1,70 +1,25 @@
#include "PSIWriter.hpp"
using namespace std;
void PSIWriter::notify_first_pulse_id(uint64_t pulse_id)
{
string request_address(bsread_rest_address);
async(launch::async, [pulse_id, &request_address]{
try {
cout << "Sending first received pulse_id " << pulse_id <<;
cout << " to bsread_rest_address " << request_address << endl;
stringstream request;
request << "curl -X PUT " << request_address;
request << "/start_pulse_id/" << pulse_id;
string request_call(request.str());
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << chrono::system_clock::now() << "]";
cout << "[ProcessManager::notify_first_pulse_id] Sending request";
cout << "(" << request_call << ")." << endl;
#endif
system(request_call.c_str());
} catch (...){}
});
void PSIWriter::join_writer(){
writing_thread.join();
}
void PSIWriter::notify_last_pulse_id(uint64_t pulse_id)
void PSIWriter::run_writer(WriterManager& writer_manager,
string output_file,
uint64_t n_frames)
{
try {
cout << "Sending last received pulse_id " << pulse_id;
cout << " to bsread address " << bsread_rest_address << endl;
stringstream request;
request << "curl -X PUT " << bsread_rest_address;
request << "/stop_pulse_id/" << pulse_id;
cout << "Request: " << request.str() << endl;
string request_call(request.str());
#ifdef DEBUG_OUTPUT
using namespace date;
cout << "[" << chrono::system_clock::now() << "]";
cout << "[ProcessManager::notify_last_pulse_id] Sending request";
cout << "(" << request_call << ")." << endl;
#endif
system(request_call.c_str());
} catch (...){}
writing_thread = boost::thread(&PSIWriter::write_h5,
this,
&writer_manager,
output_file,
n_frames);
}
void PSIWriter::run_writer(std::string output_file, uint64_t n_frames)
{
}
void PSIWriter::write_h5 (string output_file, uint64_t n_frames)
void PSIWriter::write_h5(WriterManager& writer_manager,
string output_file,
uint64_t n_frames)
{
try {
@@ -178,7 +133,7 @@ void PSIWriter::write_h5 (string output_file, uint64_t n_frames)
if (name == "pulse_id") {
if (!last_pulse_id) {
last_pulse_id = *(reinterpret_cast<uint64_t*>(value.get()));
notify_first_pulse_id(last_pulse_id);
//notify_first_pulse_id(last_pulse_id);
} else {
last_pulse_id = *(reinterpret_cast<uint64_t*>(value.get()));
}
@@ -203,7 +158,7 @@ void PSIWriter::write_h5 (string output_file, uint64_t n_frames)
// Send the last_pulse_id only if it was set.
if (last_pulse_id) {
notify_last_pulse_id(last_pulse_id);
//notify_last_pulse_id(last_pulse_id);
}
if (writer->is_file_open()) {
+16 -18
View File
@@ -2,42 +2,40 @@
#ifndef PSIWRITER_H
#define PSIWRITER_H
#include <chrono>
#include "date.h"
#include <boost/thread.hpp>
#include "WriterManager.hpp"
#include "H5Format.hpp"
#include "RingBuffer.hpp"
#include "ZmqReceiver.hpp"
#include <chrono>
#include "date.h"
#include "MetadataBuffer.hpp"
class PSIWriter
{
WriterManager& writer_manager;
ZmqReceiver& receiver;
RingBuffer& ring_buffer;
const H5Format& format;
uint16_t rest_port;
const std::string& bsread_rest_address;
hsize_t frames_per_file;
void notify_first_pulse_id(uint64_t pulse_id);
void notify_last_pulse_id(uint64_t pulse_id);
protected:
void write_h5(std::string output_file, uint64_t n_frames);
boost::thread writing_thread;
void write_h5(WriterManager& writer_manager,
std::string output_file,
uint64_t n_frames);
void write_h5_format(H5::H5File& file);
public:
PSIWriter(WriterManager& writer_manager,
ZmqReceiver& receiver,
RingBuffer& ring_buffer,
PSIWriter(RingBuffer& ring_buffer,
const H5Format& format,
uint16_t rest_port,
const std::string& bsread_rest_address,
hsize_t frames_per_file=0);
void run_writer(std::string output_file, uint64_t n_frames);
void run_writer(WriterManager& writer_manager,
std::string output_file,
uint64_t n_frames);
void join_writer();
};
#endif