Move buffer writing to sf-buffer-writer

This commit is contained in:
2020-09-21 11:05:58 +02:00
parent 8627caed11
commit 29fce99f67
4 changed files with 0 additions and 5 deletions
-36
View File
@@ -1,36 +0,0 @@
#ifndef BINARYWRITER_HPP
#define BINARYWRITER_HPP
#include <string>
#include "formats.hpp"
class BufferBinaryWriter {
const size_t MAX_FILE_BYTES =
buffer_config::FILE_MOD * sizeof(BufferBinaryFormat);
const std::string detector_folder_;
const std::string module_name_;
std::string latest_filename_;
std::string current_output_filename_;
int output_file_fd_;
void open_file(const std::string& filename);
void close_current_file();
public:
BufferBinaryWriter(
const std::string& detector_folder,
const std::string& module_name);
virtual ~BufferBinaryWriter();
void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer);
};
#endif //BINARYWRITER_HPP
-167
View File
@@ -1,167 +0,0 @@
#include "BufferBinaryWriter.hpp"
#include <unistd.h>
#include <iostream>
#include "date.h"
#include <cerrno>
#include <chrono>
#include <cstring>
#include <fcntl.h>
#include "BufferUtils.hpp"
using namespace std;
BufferBinaryWriter::BufferBinaryWriter(
const string& detector_folder,
const string& module_name):
detector_folder_(detector_folder),
module_name_(module_name),
latest_filename_(detector_folder + "/" + module_name + "/LATEST"),
current_output_filename_(""),
output_file_fd_(-1)
{
}
BufferBinaryWriter::~BufferBinaryWriter()
{
close_current_file();
}
void BufferBinaryWriter::write(
uint64_t pulse_id,
const BufferBinaryFormat* buffer)
{
auto current_frame_file =
BufferUtils::get_filename(detector_folder_, module_name_, pulse_id);
if (current_frame_file != current_output_filename_) {
open_file(current_frame_file);
}
size_t n_bytes_offset =
BufferUtils::get_file_frame_index(pulse_id) *
sizeof(BufferBinaryFormat);
auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET);
if (lseek_result < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::write]";
err_msg << " Error while lseek on file ";
err_msg << current_output_filename_;
err_msg << " for n_bytes_offset ";
err_msg << n_bytes_offset << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
auto n_bytes = ::write(output_file_fd_, buffer, sizeof(BufferBinaryFormat));
if (n_bytes < sizeof(BufferBinaryFormat)) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::write]";
err_msg << " Error while writing to file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
}
void BufferBinaryWriter::open_file(const std::string& filename)
{
close_current_file();
BufferUtils::create_destination_folder(filename);
output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT,
S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
if (output_file_fd_ < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BinaryWriter::open_file]";
err_msg << " Cannot create file ";
err_msg << filename << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
// TODO: Remove context if test successful.
/** Setting the buffer file size in advance to try to lower the number of
metadata updates on GPFS. */
{
// TODO: Try instead to use fallocate.
if (lseek(output_file_fd_, MAX_FILE_BYTES, SEEK_SET) < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::open_file]";
err_msg << " Error while lseek on end of file ";
err_msg << current_output_filename_;
err_msg << " for MAX_FILE_BYTES ";
err_msg << MAX_FILE_BYTES << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
const uint8_t mark = 255;
if(::write(output_file_fd_, &mark, sizeof(mark)) != sizeof(mark)) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::open_file]";
err_msg << " Error while writing to file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
}
current_output_filename_ = filename;
}
void BufferBinaryWriter::close_current_file()
{
if (output_file_fd_ != -1) {
if (close(output_file_fd_) < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::close_current_file]";
err_msg << " Error while closing file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
output_file_fd_ = -1;
BufferUtils::update_latest_file(
latest_filename_, current_output_filename_);
current_output_filename_ = "";
}
}
-4
View File
@@ -10,7 +10,6 @@
#include "buffer_config.hpp"
#include "jungfrau.hpp"
#include "FrameUdpReceiver.hpp"
#include "BufferBinaryWriter.hpp"
using namespace std;
using namespace chrono;
@@ -72,7 +71,6 @@ int main (int argc, char *argv[]) {
uint64_t n_missed_packets = 0;
uint64_t n_corrupted_frames = 0;
BufferBinaryWriter writer(root_folder, device_name);
FrameUdpReceiver receiver(udp_port, source_id);
RamBuffer buffer(detector_name, n_modules);
@@ -84,8 +82,6 @@ int main (int argc, char *argv[]) {
auto pulse_id = receiver.get_frame_from_udp(
binary_buffer->metadata, binary_buffer->data);
writer.write(pulse_id, binary_buffer);
buffer.write_frame(&(binary_buffer->metadata),
&(binary_buffer->data[0]));