merge with current ram_buffer branch

This commit is contained in:
2021-02-17 16:22:39 +01:00
43 changed files with 1052 additions and 779 deletions
+2 -1
View File
@@ -31,6 +31,7 @@ add_subdirectory(
add_subdirectory("core-buffer")
add_subdirectory("jf-udp-recv")
add_subdirectory("jf-buffer-writer")
add_subdirectory("jf-assembler")
add_subdirectory("sf-stream")
add_subdirectory("sf-writer")
#add_subdirectory("jf-live-writer")
add_subdirectory("jf-live-writer")
+7 -2
View File
@@ -51,9 +51,14 @@ namespace BufferUtils
void create_destination_folder(const std::string& output_file);
void* bind_socket(
void* ctx, const std::string& detector_name, const int source_id);
void* ctx,
const std::string& detector_name,
const std::string& stream_name);
void* connect_socket(
void* ctx, const std::string& detector_name, const int source_id);
void* ctx,
const std::string& detector_name,
const std::string& stream_name);
DetectorConfig read_json_config(const std::string& filename);
}
+3 -1
View File
@@ -30,7 +30,9 @@ public:
const uint64_t module_id,
ModuleFrame &meta,
char *data) const;
char* read_image(const uint64_t pulse_id, ImageMetadata &image_meta) const;
char* read_image(const uint64_t pulse_id) const;
void assemble_image(
const uint64_t pulse_id, ImageMetadata &image_meta) const;
};
+5 -3
View File
@@ -20,15 +20,17 @@ namespace buffer_config {
const size_t FOLDER_MOD = 100000;
// Extension of our file format.
const std::string FILE_EXTENSION = ".bin";
// Number of pulses between each statistics print out.
const size_t STATS_MODULO = 100;
// Number of pulses between each statistics print out (buffer_writer, stream2vis...)
const size_t STATS_MODULO = 1000;
// Number of seconds after which statistics is print out (udp_recv)
const size_t STATS_TIME = 10;
// If the RB is empty, how much time to wait before trying to read it again.
const size_t RB_READ_RETRY_INTERVAL_MS = 5;
// How many frames to read at once from file.
const size_t BUFFER_BLOCK_SIZE = 100;
const size_t BUFFER_UDP_N_RECV_MSG = 64;
const size_t BUFFER_UDP_N_RECV_MSG = 128;
// Size of UDP recv buffer
const int BUFFER_UDP_RCVBUF_N_SLOTS = 100;
// 8246 bytes for each UDP packet.
+4 -4
View File
@@ -73,11 +73,11 @@ void BufferUtils::create_destination_folder(const string& output_file)
}
void* BufferUtils::connect_socket(
void* ctx, const string& detector_name, const int source_id)
void* ctx, const string& detector_name, const string& stream_name)
{
string ipc_address = BUFFER_LIVE_IPC_URL +
detector_name + "-" +
to_string(source_id);
stream_name;
void* socket = zmq_socket(ctx, ZMQ_SUB);
if (socket == nullptr) {
@@ -106,11 +106,11 @@ void* BufferUtils::connect_socket(
}
void* BufferUtils::bind_socket(
void* ctx, const string& detector_name, const int source_id)
void* ctx, const string& detector_name, const string& stream_name)
{
string ipc_address = BUFFER_LIVE_IPC_URL +
detector_name + "-" +
to_string(source_id);
stream_name;
void* socket = zmq_socket(ctx, ZMQ_PUB);
+24 -7
View File
@@ -2,6 +2,7 @@
#include <fcntl.h>
#include <cstring>
#include <stdexcept>
#include <sstream>
#include <unistd.h>
#include "RamBuffer.hpp"
#include "buffer_config.hpp"
@@ -118,15 +119,12 @@ void RamBuffer::read_frame(
memcpy(dst_data, src_data, MODULE_N_BYTES);
}
char* RamBuffer::read_image(const uint64_t pulse_id,
ImageMetadata &image_meta) const
void RamBuffer::assemble_image(
const uint64_t pulse_id, ImageMetadata &image_meta) const
{
const size_t slot_n = pulse_id % n_slots_;
ModuleFrame *src_meta = meta_buffer_ + (n_modules_ * slot_n);
char *src_data = image_buffer_ + (image_bytes_ * slot_n);
auto is_pulse_init = false;
auto is_good_image = true;
@@ -153,7 +151,21 @@ char* RamBuffer::read_image(const uint64_t pulse_id,
cout << endl;
#endif
if (frame_meta->pulse_id != pulse_id) {
throw runtime_error("Wrong pulse_id in ram buffer slot.");
stringstream err_msg;
err_msg << "[RamBuffer::read_image]";
err_msg << " Unexpected pulse_id in ram buffer.";
err_msg << " expected=" << pulse_id;
err_msg << " got=" << frame_meta->pulse_id;
for (int i = 0; i < n_modules_; i++) {
ModuleFrame *meta = src_meta + i_module;
err_msg << " (module " << i << ", ";
err_msg << meta->pulse_id << "),";
}
err_msg << endl;
throw runtime_error(err_msg.str());
}
image_meta.pulse_id = frame_meta->pulse_id;
@@ -186,7 +198,12 @@ char* RamBuffer::read_image(const uint64_t pulse_id,
image_meta.frame_index = 0;
image_meta.daq_rec = 0;
}
}
// Returns a pointer to the first byte of the image slot that pulse_id maps
// to (slot index = pulse_id modulo n_slots_). Nothing is copied or checked.
char* RamBuffer::read_image(const uint64_t pulse_id) const
{
    const auto slot_index = static_cast<size_t>(pulse_id % n_slots_);
    return image_buffer_ + (image_bytes_ * slot_index);
}
-2
View File
@@ -4,7 +4,5 @@ target_link_libraries(core-buffer-tests
core-buffer-lib
external
rt
hdf5
hdf5_cpp
zmq
gtest)
+1 -1
View File
@@ -29,7 +29,7 @@ TEST(RamBuffer, simple_store)
}
ImageMetadata image_meta;
buffer.read_image(frame_meta.pulse_id, image_meta);
buffer.assemble_image(frame_meta.pulse_id, image_meta);
ASSERT_EQ(image_meta.pulse_id, frame_meta.pulse_id);
ASSERT_EQ(image_meta.daq_rec, frame_meta.daq_rec);
ASSERT_EQ(image_meta.frame_index, frame_meta.frame_index);
+21
View File
@@ -0,0 +1,21 @@
# Build script of the jf-assembler component: a static library with all
# sources, the jf_assembler executable and the unit test sub-directory.

# Every .cpp file under src/ (this glob also matches src/main.cpp).
file(GLOB SOURCES
src/*.cpp)
add_library(jf-assembler-lib STATIC ${SOURCES})
target_include_directories(jf-assembler-lib PUBLIC include/)
target_link_libraries(jf-assembler-lib
external
core-buffer-lib)
# The executable target is jf-assembler, the produced binary is jf_assembler.
add_executable(jf-assembler src/main.cpp)
set_target_properties(jf-assembler PROPERTIES OUTPUT_NAME jf_assembler)
target_link_libraries(jf-assembler
external
core-buffer-lib
jf-assembler-lib
zmq
pthread
rt)
# Unit tests are defined in test/CMakeLists.txt.
enable_testing()
add_subdirectory(test/)
+179
View File
@@ -0,0 +1,179 @@
# sf-stream
sf-stream is the component that receives a live stream of frame data from
sf-buffers over ZMQ and assembles them into images. These images are then
sent again over ZMQ to external components. There is always only 1 sf-stream
per detector.
It currently has 3 output streams:
- **Full data full meta** rate stream (send all images and meta)
- **Reduced data full meta** rate stream (send less images, but
all meta)
- **Pulse_id** stream (send only the current pulse_id)
In addition to receiving and assembling images, sf-stream also calculates
additional meta and constructs the structures needed to send data in
Array 1.0 protocol.
This component does not guarantee that the streams will always contain all
the data - it can happen that frame resynchronization is needed, and in this
case 1 or more frames could potentially be lost. This happens so rarely that in
practice it is not a problem.
## Overview
![image_stream_overview](../docs/sf_daq_buffer-overview-stream.jpg)
sf-stream is a single threaded application (without counting the ZMQ IO threads)
that is used for providing live assembled images to anyone willing to listen.
In addition, it also provides a pulse_id stream, which is the most immediate
pulse_id feedback we currently have in case we need to synchronize external
components to the current machine pulse_id.
## ZMQ receiving
Each ZMQ stream is coming from a separate sf-buffer. This means that we have as
many connections as we have modules in a detector.
Messages are multipart (2 parts) and are received in PUB/SUB mode.
There is no need for special synchronization between modules as we expect that
frames will always be in the correct order and all modules will provide the
same frame more or less at the same time. If either of these 2 conditions is not
met, the detector is not working properly and we cannot guarantee that sf-stream
will work correctly.
Nonetheless we provide the capability to synchronize the streams in the image
assembly phase - this is needed rarely, but occasionally happens. In this sort
of hiccup we usually lose only a couple of consecutive images.
### Messages format
Each message is composed by 2 parts:
- Serialization of ModuleFrame in the first part.
- Frame data in the second part.
Module frame is defined as:
```c++
#pragma pack(push)
#pragma pack(1)
struct ModuleFrame {
uint64_t pulse_id;
uint64_t frame_index;
uint64_t daq_rec;
uint64_t n_recv_packets;
uint64_t module_id;
};
#pragma pack(pop)
```
The frame data is a 1MB (1024*512 pixels * 2 bytes/pixel) blob of data in
**uint16** representing the detector image.
## Image assembly
We first synchronize the modules. We do this by reading all sockets and
deciding the largest frame pulse_id among them (max_pulse_id). We then calculate
the diff between a specific socket pulse_id and the max_pulse_id.
This difference tells us how many messages we need to discard from a specific socket.
This discarding is the source of possible missing images in the output stream.
It can happen in 3 cases:
- At least one of the detector modules did not send any packets for the specific
pulse_id.
- All the packets from a specific module for a pulse_id were lost before UDP
receiving them.
- ZMQ HWM was reached (either on the sf-buffer or sf-stream) and the message was
dropped.
All these 3 cases are highly unlikely, so synchronization is mostly needed when
first starting sf-stream. Different sockets connect to sf-buffers at different
times. Apart from the initial synchronization there should be no need to
re-synchronize modules in a healthy running environment.
If an image is missing any ZMQ messages from sf-buffers (not all modules data
arrived), the image will be dropped. We do not do partial reconstruction in
sf-stream. However, it is important to note that this does not cover the case
where frames are incomplete (missing UDP packets on sf-buffer) - we still
assemble these images as long as at least 1 packet/frame for a specific pulse_id
arrived.
## ZMQ sending
We divide the ZMQ sending into 3 types of stream:
- Data processing stream. This is basically the complete stream from
the detector with all meta and data. It can be described as full data full
meta stream. Only 1 client at the time can be connected to this stream
(PUSH/PULL for load balancing).
- Live viewing stream. This is a reduced data full meta stream. We send
meta for all frames, but data only for subset of them (10Hz, for example).
Any number of clients can connect to the 10Hz stream, because we use PUB/SUB
for this socket.
- Pulse_id stream. This is a stream that sends out only the current pulse_id.
It can be used to synchronize any external system with the current pulse_id
being recorded. Multiple clients can connect to this stream.
In the data processing and live viewing stream we use
[Array 1.0](https://github.com/paulscherrerinstitute/htypes/blob/master/array-1.0.md)
as our protocol to be compatible with currently available external components.
We use the following fields in the JSON header:
| Name | Type | Comment |
| --- | --- | --- |
| pulse_id | uint64 |bunchid from detector header|
|frame|uint64|frame_index from detector header|
|is_good_frame|bool|true if all packets for this frame are present|
|daq_rec|uint32|daqrec from detector header|
|pedestal_file|string|Path to pedestal file|
|gain_file|string|Path to gain file|
|number_frames_expected|int|Number of expected frames|
|run_name|string|Name of the run|
|detector_name|string|Name of the detector|
|htype|string|Value: "array-1.0"|
|type|string|Value: "uint16"|
|shape|Array[uint64]|Shape of the image in stream|
### Full data full meta stream
This stream runs at detector frequency and uses PUSH/PULL to distribute data
to max 1 client (this client can have many processes, but it needs to be a
single logical entity, since the images are evenly distributed to all
connected sockets).
![image_full_stream](../docs/sf_daq_buffer-FullStream.jpg)
The goal here is to provide a complete copy of the detector image stream
for purposes of online analysis. Given the large amount of data on this
stream only "pre-approved" applications that can handle the load should be
attached here.
### Reduced data full meta stream
This stream also runs at detector frequency for JSON headers (meta), but
it sends only part of the images in the stream. The rest of the images are
sent as empty buffers (the receiver needs to be aware of this behaviour, as
Array 1.0 alone does not define it).
![image_reduced_stream](../docs/sf_daq_buffer-ReducedStream.jpg)
This is the lightweight version of the image stream. Any number of clients
can connect to this stream (PUB/SUB) but no client can do load
balancing automatically (it would require PUSH/PULL).
This is a "public interface" for anyone who wants to get detector data live,
and can do with only a subset of images.
### Pulse_id stream
This stream runs at detector frequency in PUB/SUB mode. The only thing it
does is send out the pulse_id (of the just received image) in uint64_t
format.
![image_pulse_stream](../docs/sf_daq_buffer-PulseStream.jpg)
This is also a "public interface" for anyone who wants to get the current
system pulse_id.
+28
View File
@@ -0,0 +1,28 @@
#ifndef SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP
#define SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP
#include <chrono>
#include <string>
#include <formats.hpp>
// Accumulates per-image statistics of the assembler and, every stats_modulo
// recorded images, prints one line in InfluxDB line protocol to stdout.
class AssemblerStats {
// Value of the detector_name tag in the printed line.
const std::string detector_name_;
// Number of recorded images after which statistics are printed and reset.
const size_t stats_modulo_;
// Images recorded in the current statistics interval.
int image_counter_;
// Images of the current interval whose metadata had is_good_image unset.
int n_corrupted_images_;
// Sum of the n_lost_pulses values passed in during the current interval.
int n_sync_lost_images_;
// Start of the current interval; used to derive the repetition rate.
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
// Zeroes all counters and restarts the interval clock.
void reset_counters();
// Prints the statistics line of the current interval to stdout.
void print_stats();
public:
AssemblerStats(const std::string &detector_name,
const size_t stats_modulo);
// Accounts for one assembled image; prints and resets the counters once
// stats_modulo images have been recorded.
void record_stats(const ImageMetadata &meta, const uint32_t n_lost_pulses);
};
#endif //SF_DAQ_BUFFER_ASSEMBLERSTATS_HPP
@@ -0,0 +1,34 @@
#ifndef SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP
#define SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP
#include <cstddef>
#include <string>
#include <vector>
#include "formats.hpp"
// Result of one synchronized read over all module streams.
struct PulseAndSync {
    // pulse_id on which all modules agreed.
    const uint64_t pulse_id;
    // Pulses discarded while re-synchronizing the modules (0 when the
    // modules were already in sync).
    const uint32_t n_lost_pulses;
};

// Receives the pulse_id stream of every module of a detector (one ZMQ
// socket per module) and hands out only pulse_ids all modules agree on.
class ZmqPulseSyncReceiver {
    // ZMQ context the sockets are created in; not owned by this class.
    void* ctx_;
    // Number of modules, i.e. number of sockets in sockets_.
    const int n_modules_;
    // One connected socket per module; closed in the destructor.
    std::vector<void*> sockets_;

public:
    ZmqPulseSyncReceiver(
            void* ctx,
            const std::string& detector_name,
            const int n_modules);
    ~ZmqPulseSyncReceiver();

    // The destructor closes every socket, so a copy would close the same
    // sockets a second time. Copying is therefore disabled.
    ZmqPulseSyncReceiver(const ZmqPulseSyncReceiver&) = delete;
    ZmqPulseSyncReceiver& operator=(const ZmqPulseSyncReceiver&) = delete;

    // Blocks until all modules delivered the same pulse_id and returns it
    // together with the number of pulses lost while synchronizing.
    // Throws std::runtime_error when synchronization is not possible.
    PulseAndSync get_next_pulse_id() const;
};
#endif //SF_DAQ_BUFFER_ZMQPULSESYNCRECEIVER_HPP
+14
View File
@@ -0,0 +1,14 @@
// Compile-time settings of the jf-assembler.
namespace assembler_config
{
// N of IO threads to send image metadata.
const int ASSEMBLER_ZMQ_IO_THREADS = 1;
// If the module pulse_ids are more than this many pulses apart (currently
// 100), the receiver gives up and throws instead of re-synchronizing.
const uint64_t PULSE_OFFSET_LIMIT = 100;
// Number of times we try to re-sync in case of failure.
const int SYNC_RETRY_LIMIT = 3;
// Number of pulses between each statistics print out.
const size_t ASSEMBLER_STATS_MODULO = 1000;
}
+62
View File
@@ -0,0 +1,62 @@
#include "AssemblerStats.hpp"
#include <iostream>
using namespace std;
using namespace chrono;
AssemblerStats::AssemblerStats(
const std::string &detector_name,
const size_t stats_modulo) :
detector_name_(detector_name),
stats_modulo_(stats_modulo)
{
reset_counters();
}
// Starts a new statistics interval: all counters go back to zero and the
// interval start is set to the current steady-clock time.
void AssemblerStats::reset_counters()
{
    image_counter_ = n_sync_lost_images_ = n_corrupted_images_ = 0;
    stats_interval_start_ = steady_clock::now();
}
// Accounts for one assembled image.
//   meta          - metadata of the image; a cleared is_good_image flag
//                   counts the image as corrupted.
//   n_lost_pulses - pulses dropped while synchronizing for this image.
// Once stats_modulo_ images were recorded the statistics are printed and a
// new interval is started.
void AssemblerStats::record_stats(
        const ImageMetadata &meta, const uint32_t n_lost_pulses)
{
    ++image_counter_;
    n_sync_lost_images_ += n_lost_pulses;
    n_corrupted_images_ += meta.is_good_image ? 0 : 1;

    if (image_counter_ != stats_modulo_) {
        return;
    }

    print_stats();
    reset_counters();
}
// Prints the statistics of the current interval to stdout as one line in
// InfluxDB line protocol (measurement jf_assembler, tag detector_name).
void AssemblerStats::print_stats()
{
    auto interval_ms_duration = duration_cast<milliseconds>(
            steady_clock::now() - stats_interval_start_).count();

    // An interval shorter than 1 ms is truncated to 0 by duration_cast;
    // clamp it so the integer division below can never divide by zero.
    if (interval_ms_duration < 1) {
        interval_ms_duration = 1;
    }

    // * 1000 because milliseconds, + 250 because of truncation.
    int rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration;

    // Wall-clock timestamp in nanoseconds, as expected by line protocol.
    uint64_t timestamp = time_point_cast<nanoseconds>(
            system_clock::now()).time_since_epoch().count();

    // Output in InfluxDB line protocol
    cout << "jf_assembler";
    cout << ",detector_name=" << detector_name_;
    cout << " ";
    cout << "n_processed_images=" << image_counter_ << "i";
    cout << ",n_corrupted_images=" << n_corrupted_images_ << "i";
    cout << ",n_sync_lost_images=" << n_sync_lost_images_ << "i";
    cout << ",repetition_rate=" << rep_rate << "i";
    cout << " ";
    cout << timestamp;
    // endl on purpose: the line has to reach the consumer immediately.
    cout << endl;
}
+115
View File
@@ -0,0 +1,115 @@
#include "ZmqPulseSyncReceiver.hpp"
#include "BufferUtils.hpp"
#include <zmq.h>
#include <stdexcept>
#include <sstream>
#include <chrono>
#include <algorithm>
#include <iostream>
#include "assembler_config.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace assembler_config;
// Connects one socket per module. The stream name of module i is the
// decimal string of i, passed to BufferUtils::connect_socket together with
// the detector name.
ZmqPulseSyncReceiver::ZmqPulseSyncReceiver(
        void * ctx,
        const string& detector_name,
        const int n_modules)
    : ctx_(ctx)
    , n_modules_(n_modules)
{
    sockets_.reserve(n_modules_);

    int i_module = 0;
    while (i_module < n_modules_) {
        void* module_socket = BufferUtils::connect_socket(
                ctx_, detector_name, to_string(i_module));
        sockets_.push_back(module_socket);
        ++i_module;
    }
}
// Closes every module socket. The ZMQ context itself is left untouched.
ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver()
{
    for (size_t i_socket = 0; i_socket < sockets_.size(); ++i_socket) {
        zmq_close(sockets_[i_socket]);
    }
}
PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
{
uint64_t pulses[n_modules_];
bool modules_in_sync = true;
for (int i = 0; i < n_modules_; i++) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
if (pulses[0] != pulses[i]) {
modules_in_sync = false;
}
}
if (modules_in_sync) {
return {pulses[0], 0};
}
// How many pulses we lost in total to get the next pulse_id.
uint32_t n_lost_pulses = 0;
for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {
uint64_t min_pulse_id = numeric_limits<uint64_t>::max();;
uint64_t max_pulse_id = 0;
for (int i = 0; i < n_modules_; i++) {
min_pulse_id = min(min_pulse_id, pulses[i]);
max_pulse_id = max(max_pulse_id, pulses[i]);
}
auto max_diff = max_pulse_id - min_pulse_id;
if (max_diff > PULSE_OFFSET_LIMIT) {
stringstream err_msg;
err_msg << "[ZmqPulseSyncReceiver::get_next_pulse_id]";
err_msg << " PULSE_OFFSET_LIMIT exceeded.";
err_msg << " max_diff=" << max_diff << " pulses.";
for (int i = 0; i < n_modules_; i++) {
err_msg << " (module " << i << ", ";
err_msg << pulses[i] << "),";
}
err_msg << endl;
throw runtime_error(err_msg.str());
}
modules_in_sync = true;
// Max pulses we lost in this sync attempt.
uint32_t i_sync_lost_pulses = 0;
for (int i = 0; i < n_modules_; i++) {
// How many pulses we lost for this specific module.
uint32_t i_module_lost_pulses = 0;
while (pulses[i] < max_pulse_id) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
i_module_lost_pulses++;
}
i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses);
if (pulses[i] != max_pulse_id) {
modules_in_sync = false;
}
}
n_lost_pulses += i_sync_lost_pulses;
if (modules_in_sync) {
return {pulses[0], n_lost_pulses};
}
}
stringstream err_msg;
err_msg << "[ZmqLiveReceiver::get_next_pulse_id]";
err_msg << " SYNC_RETRY_LIMIT exceeded.";
err_msg << endl;
throw runtime_error(err_msg.str());
}
+47
View File
@@ -0,0 +1,47 @@
#include <iostream>
#include <string>
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include <AssemblerStats.hpp>
#include "assembler_config.hpp"
#include "ZmqPulseSyncReceiver.hpp"
using namespace std;
using namespace buffer_config;
using namespace assembler_config;
int main (int argc, char *argv[])
{
if (argc != 2) {
cout << endl;
cout << "Usage: jf_assembler [detector_json_filename]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << endl;
exit(-1);
}
auto config = BufferUtils::read_json_config(string(argv[1]));
auto const stream_name = "assembler";
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS);
auto sender = BufferUtils::bind_socket(
ctx, config.detector_name, stream_name);
ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules);
RamBuffer ram_buffer(config.detector_name, config.n_modules);
AssemblerStats stats(config.detector_name, ASSEMBLER_STATS_MODULO);
ImageMetadata meta;
while (true) {
auto pulse_and_sync = receiver.get_next_pulse_id();
ram_buffer.assemble_image(pulse_and_sync.pulse_id, meta);
zmq_send(sender, &meta, sizeof(meta), 0);
stats.record_stats(meta, pulse_and_sync.n_lost_pulses);
}
}
+7
View File
@@ -0,0 +1,7 @@
# Unit test executable of the jf-assembler; linked against the component
# library and googletest.
add_executable(jf-assembler-tests main.cpp)
target_link_libraries(jf-assembler-tests
jf-assembler-lib
gtest
)
+8
View File
@@ -0,0 +1,8 @@
#include "gtest/gtest.h"
using namespace std;
// Test runner entry point: hands the command line to googletest and returns
// the result of running all registered tests as the exit code.
int main(int argc, char **argv) {
    ::testing::InitGoogleTest(&argc, argv);
    const int exit_code = RUN_ALL_TESTS();
    return exit_code;
}
+1 -1
View File
@@ -51,7 +51,7 @@ void BufferStats::print_stats()
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jf-buffer-writer";
cout << "jf_buffer_writer";
cout << ",detector_name=" << detector_name_;
cout << ",module_name=M" << module_id_;
cout << " ";
+2 -1
View File
@@ -39,7 +39,8 @@ int main (int argc, char *argv[]) {
BufferStats stats(config.detector_name, module_id, STATS_MODULO);
auto ctx = zmq_ctx_new();
auto socket = connect_socket(ctx, config.detector_name, module_id);
auto socket = connect_socket(
ctx, config.detector_name, to_string(module_id));
auto file_buff = new BufferBinaryFormat();
uint64_t pulse_id;
-28
View File
@@ -1,28 +0,0 @@
#ifndef SF_DAQ_BUFFER_BINARYREADER_HPP
#define SF_DAQ_BUFFER_BINARYREADER_HPP
#include <formats.hpp>
// Reads single BufferBinaryFormat frames of one module from the buffer files
// on disk. The file used last stays open between calls.
class BinaryReader {
// Folder and module name from which the buffer file name is derived.
const std::string detector_folder_;
const std::string module_name_;
// Name of the currently open file ("" when no file is open).
std::string current_input_file_;
// Descriptor of the currently open file (-1 when no file is open).
int input_file_fd_;
// Closes the open file (if any) and opens filename read-only.
void open_file(const std::string& filename);
// Closes the open file, if any.
void close_current_file();
public:
BinaryReader(const std::string &detector_folder,
const std::string &module_name);
~BinaryReader();
// Reads the frame stored for pulse_id into buffer; throws
// std::runtime_error when seeking or reading fails.
void get_frame(const uint64_t pulse_id, BufferBinaryFormat *buffer);
};
#endif //SF_DAQ_BUFFER_BINARYREADER_HPP
@@ -0,0 +1,33 @@
#ifndef BINARYWRITER_HPP
#define BINARYWRITER_HPP
#include <string>
#include "formats.hpp"
// Writes BufferBinaryFormat records to binary files below a detector folder.
// The file used last stays open between calls.
// NOTE(review): ImageBinaryWriter::write() references module_name_, which is
// not declared in this class - confirm the intended member.
class ImageBinaryWriter {
// Size every newly created file is extended to before records are written.
const size_t MAX_FILE_BYTES =
buffer_config::FILE_MOD * sizeof(BufferBinaryFormat);
// Folder from which the output file names are derived.
const std::string detector_folder_;
// Path of the "LATEST" file inside the detector folder.
std::string latest_filename_;
// Name of the currently open file ("" when no file is open).
std::string current_output_filename_;
// Descriptor of the currently open file (-1 when no file is open).
int output_file_fd_;
// Closes the open file (if any), then creates/opens and pre-sizes filename.
void open_file(const std::string& filename);
// Closes the open file, if any, and updates the LATEST file.
void close_current_file();
public:
ImageBinaryWriter(const std::string& detector_folder);
virtual ~ImageBinaryWriter();
// Writes buffer at the file position derived from pulse_id; throws
// std::runtime_error when seeking or writing fails.
void write(const uint64_t pulse_id, const BufferBinaryFormat* buffer);
};
#endif //BINARYWRITER_HPP
-49
View File
@@ -1,49 +0,0 @@
#ifndef SFWRITER_HPP
#define SFWRITER_HPP
#include <memory>
#include <string>
#include <H5Cpp.h>
#include "LiveImageAssembler.hpp"
const auto& H5_UINT64 = H5::PredType::NATIVE_UINT64;
const auto& H5_UINT32 = H5::PredType::NATIVE_UINT32;
const auto& H5_UINT16 = H5::PredType::NATIVE_UINT16;
const auto& H5_UINT8 = H5::PredType::NATIVE_UINT8;
// Writes assembled detector images and their per-pulse metadata to an HDF5
// file that is created for a fixed number of pulses.
class JFH5LiveWriter {
// Last path component of the detector folder; used in the dataset paths.
const std::string detector_name_;
// Number of modules stacked along the Y axis of one image.
const size_t n_modules_;
// Number of pulses the image dataset and metadata buffers are sized for.
const size_t n_pulses_;
// Initialised to 0 by the constructor.
// NOTE(review): presumably the index of the next image to write - confirm.
size_t write_index_;
H5::H5File file_;
H5::DataSet image_dataset_;
// Per-pulse metadata buffers with n_pulses_ entries each; allocated in the
// constructor and released in the destructor.
uint64_t* b_pulse_id_;
uint64_t* b_frame_index_;
uint32_t* b_daq_rec_;
uint8_t* b_is_good_frame_ ;
// Creates the output file, its groups and the image dataset.
void init_file(const std::string &output_file);
// Creates the dataset /data/<detector_name>/<name> for the given buffer.
void write_dataset(const std::string name,
const void *buffer,
const H5::PredType &type);
void write_metadata();
// Returns the part of detector_folder after the last '/'.
std::string get_detector_name(const std::string& detector_folder);
void close_file();
public:
JFH5LiveWriter(const std::string& output_file,
const std::string& detector_folder,
const size_t n_modules,
const size_t n_pulses);
~JFH5LiveWriter();
void write(const ImageMetadata* metadata, const char* data);
};
#endif //SFWRITER_HPP
@@ -1,51 +0,0 @@
#ifndef SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
#define SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
#include <atomic>
#include "buffer_config.hpp"
#include "formats.hpp"
const uint64_t IA_EMPTY_SLOT_VALUE = 0;
// Metadata describing one assembled image.
struct ImageMetadata
{
// Pulse the image belongs to.
uint64_t pulse_id;
uint64_t frame_index;
uint32_t daq_rec;
// NOTE(review): presumably non-zero when the image is complete - confirm.
uint8_t is_good_image;
};
// Slot based buffer in which the frames of all modules of a pulse are
// collected into one image.
// NOTE(review): the implementation is not part of this view; the comments
// below are derived from the declarations only - confirm against the .cpp.
class LiveImageAssembler {
const size_t n_modules_;
const size_t image_buffer_slot_n_bytes_;
// Storage for image data, image metadata and per-module frame metadata.
char* image_buffer_;
ImageMetadata* image_meta_buffer_;
ModuleFrame* frame_meta_buffer_;
// Atomic per-slot bookkeeping.
std::atomic_int* buffer_status_;
std::atomic_uint64_t* buffer_pulse_id_;
size_t get_data_offset(const uint64_t slot_id, const int i_module);
size_t get_frame_metadata_offset(const uint64_t slot_id, const int i_module);
public:
LiveImageAssembler(const size_t n_modules);
virtual ~LiveImageAssembler();
bool is_slot_free(const uint64_t pulse_id);
bool is_slot_full(const uint64_t pulse_id);
// Hands the data of module i_module for pulse_id to the assembler.
void process(const uint64_t pulse_id,
const int i_module,
const BufferBinaryFormat* block_buffer);
void free_slot(const uint64_t pulse_id);
ImageMetadata* get_metadata_buffer(const uint64_t pulse_id);
char* get_data_buffer(const uint64_t pulse_id);
};
#endif //SF_DAQ_BUFFER_LIVEIMAGEASSEMBLER_HPP
+30
View File
@@ -0,0 +1,30 @@
#include <cstddef>
#include <formats.hpp>
#include <chrono>
#ifndef SF_DAQ_BUFFER_FRAMESTATS_HPP
#define SF_DAQ_BUFFER_FRAMESTATS_HPP
// Statistics collector of the live writer.
// NOTE(review): the implementation is not part of this view; the member
// comments are derived from the names only - confirm against the .cpp.
class WriterStats {
const std::string detector_name_;
// Presumably the number of images between two statistics print outs.
size_t stats_modulo_;
int image_counter_;
// Presumably accumulated and maximum image write time, in microseconds.
uint32_t total_buffer_write_us_;
uint32_t max_buffer_write_us_;
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
void print_stats();
public:
WriterStats(
const std::string &detector_name,
const size_t stats_modulo);
// Called around the write of one image.
void start_image_write();
void end_image_write();
};
#endif //SF_DAQ_BUFFER_FRAMESTATS_HPP
@@ -2,8 +2,6 @@
namespace live_writer_config
{
// MS to retry reading from the image assembler.
const size_t ASSEMBLER_RETRY_MS = 5;
// Number of slots in the reconstruction buffer.
const size_t WRITER_IA_N_SLOTS = 200;
// N of IO threads to receive data from modules.
const int LIVE_ZMQ_IO_THREADS = 1;
}
-103
View File
@@ -1,103 +0,0 @@
#include "BinaryReader.hpp"
#include <unistd.h>
#include <sstream>
#include <cstring>
#include <fcntl.h>
#include <stdexcept>
#include "BufferUtils.hpp"
#include "buffer_config.hpp"
using namespace std;
using namespace buffer_config;
BinaryReader::BinaryReader(
const std::string &detector_folder,
const std::string &module_name) :
detector_folder_(detector_folder),
module_name_(module_name),
current_input_file_(""),
input_file_fd_(-1)
{}
// Closes the file that is still open, if any.
// NOTE(review): close_current_file() throws when close() fails; an exception
// leaving a destructor terminates the program - confirm this is intended.
BinaryReader::~BinaryReader()
{
close_current_file();
}
// Reads the frame stored for pulse_id into buffer.
//
// The file holding the frame is derived from the detector folder, module
// name and pulse_id; it is (re)opened only when it differs from the file
// currently open. The frame is read from offset
// file_frame_index * sizeof(BufferBinaryFormat).
//
// Throws runtime_error when the file cannot be opened, when lseek or read
// fails, or when fewer than sizeof(BufferBinaryFormat) bytes are available.
void BinaryReader::get_frame(
        const uint64_t pulse_id, BufferBinaryFormat* buffer)
{
    auto current_frame_file = BufferUtils::get_filename(
            detector_folder_, module_name_, pulse_id);

    if (current_frame_file != current_input_file_) {
        open_file(current_frame_file);
    }

    size_t file_index = BufferUtils::get_file_frame_index(pulse_id);
    size_t n_bytes_offset = file_index * sizeof(BufferBinaryFormat);

    auto lseek_result = lseek(input_file_fd_, n_bytes_offset, SEEK_SET);
    if (lseek_result < 0) {
        stringstream err_msg;
        err_msg << "[BinaryReader::get_frame]";
        err_msg << " Error while lseek on file ";
        err_msg << current_input_file_ << " for n_bytes_offset ";
        err_msg << n_bytes_offset << ": " << strerror(errno) << endl;

        throw runtime_error(err_msg.str());
    }

    const auto n_bytes = ::read(
            input_file_fd_, buffer, sizeof(BufferBinaryFormat));

    // read() returns a signed value. It must be tested for < 0 before it is
    // compared with the unsigned sizeof, otherwise -1 is converted to a huge
    // unsigned number and the error goes unnoticed.
    if (n_bytes < 0) {
        stringstream err_msg;
        err_msg << "[BinaryReader::get_frame]";
        err_msg << " Error while reading from file ";
        err_msg << current_input_file_ << ": " << strerror(errno) << endl;

        throw runtime_error(err_msg.str());
    }

    if (static_cast<size_t>(n_bytes) < sizeof(BufferBinaryFormat)) {
        // Short read: errno is not meaningful here, report the byte counts.
        stringstream err_msg;
        err_msg << "[BinaryReader::get_frame]";
        err_msg << " Error while reading from file ";
        err_msg << current_input_file_ << ": got " << n_bytes;
        err_msg << " of " << sizeof(BufferBinaryFormat) << " bytes." << endl;

        throw runtime_error(err_msg.str());
    }
}
// Closes the file currently open (if any) and opens filename read-only.
// On success filename becomes the current input file; on failure a
// runtime_error carrying the errno description is thrown.
void BinaryReader::open_file(const std::string& filename)
{
    close_current_file();

    input_file_fd_ = open(filename.c_str(), O_RDONLY);
    if (input_file_fd_ >= 0) {
        current_input_file_ = filename;
        return;
    }

    stringstream err_msg;
    err_msg << "[BinaryReader::open_file]";
    err_msg << " Cannot open file " << filename << ": ";
    err_msg << strerror(errno) << endl;

    throw runtime_error(err_msg.str());
}
void BinaryReader::close_current_file()
{
if (input_file_fd_ != -1) {
if (close(input_file_fd_) < 0) {
stringstream err_msg;
err_msg << "[BinaryWriter::close_current_file]";
err_msg << " Error while closing file " << current_input_file_;
err_msg << ": " << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
input_file_fd_ = -1;
current_input_file_ = "";
}
}
+165
View File
@@ -0,0 +1,165 @@
#include "ImageBinaryWriter.hpp"
#include <unistd.h>
#include <iostream>
#include "date.h"
#include <cerrno>
#include <chrono>
#include <cstring>
#include <fcntl.h>
#include "BufferUtils.hpp"
using namespace std;
ImageBinaryWriter::ImageBinaryWriter(
const string& detector_folder):
detector_folder_(detector_folder),
latest_filename_(detector_folder + "/LATEST"),
current_output_filename_(""),
output_file_fd_(-1)
{
}
// Closes the file that is still open, if any.
// NOTE(review): close_current_file() throws when close() fails; an exception
// leaving a destructor terminates the program - confirm this is intended.
ImageBinaryWriter::~ImageBinaryWriter()
{
close_current_file();
}
// Writes one record for pulse_id.
//
// The target file is derived from pulse_id and (re)opened only when it
// differs from the file currently open. The record is written at offset
// file_frame_index * sizeof(BufferBinaryFormat).
//
// Throws runtime_error when the file cannot be opened, when lseek or write
// fails, or when fewer than sizeof(BufferBinaryFormat) bytes were written.
void ImageBinaryWriter::write(
        const uint64_t pulse_id,
        const BufferBinaryFormat* buffer)
{
    // NOTE(review): module_name_ is not a member declared in
    // ImageBinaryWriter.hpp - confirm which name is meant here.
    auto current_frame_file =
            BufferUtils::get_filename(detector_folder_, module_name_, pulse_id);

    if (current_frame_file != current_output_filename_) {
        open_file(current_frame_file);
    }

    size_t n_bytes_offset =
            BufferUtils::get_file_frame_index(pulse_id) *
            sizeof(BufferBinaryFormat);

    auto lseek_result = lseek(output_file_fd_, n_bytes_offset, SEEK_SET);
    if (lseek_result < 0) {
        stringstream err_msg;

        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "]";
        err_msg << "[ImageBinaryWriter::write]";
        err_msg << " Error while lseek on file ";
        err_msg << current_output_filename_;
        err_msg << " for n_bytes_offset ";
        err_msg << n_bytes_offset << ": ";
        err_msg << strerror(errno) << endl;

        throw runtime_error(err_msg.str());
    }

    const auto n_bytes =
            ::write(output_file_fd_, buffer, sizeof(BufferBinaryFormat));

    // write() returns a signed value. It must be tested for < 0 before it is
    // compared with the unsigned sizeof, otherwise -1 is converted to a huge
    // unsigned number and the error goes unnoticed.
    if (n_bytes < 0) {
        stringstream err_msg;

        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "]";
        err_msg << "[ImageBinaryWriter::write]";
        err_msg << " Error while writing to file ";
        err_msg << current_output_filename_ << ": ";
        err_msg << strerror(errno) << endl;

        throw runtime_error(err_msg.str());
    }

    if (static_cast<size_t>(n_bytes) < sizeof(BufferBinaryFormat)) {
        // Short write: errno is not meaningful here, report the byte counts.
        stringstream err_msg;

        using namespace date;
        using namespace chrono;
        err_msg << "[" << system_clock::now() << "]";
        err_msg << "[ImageBinaryWriter::write]";
        err_msg << " Error while writing to file ";
        err_msg << current_output_filename_ << ": wrote ";
        err_msg << n_bytes << " of " << sizeof(BufferBinaryFormat);
        err_msg << " bytes." << endl;

        throw runtime_error(err_msg.str());
    }
}
void ImageBinaryWriter::open_file(const std::string& filename)
{
close_current_file();
BufferUtils::create_destination_folder(filename);
output_file_fd_ = ::open(filename.c_str(), O_WRONLY | O_CREAT,
S_IRWXU | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
if (output_file_fd_ < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BinaryWriter::open_file]";
err_msg << " Cannot create file ";
err_msg << filename << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
// TODO: Remove context if test successful.
/** Setting the buffer file size in advance to try to lower the number of
metadata updates on GPFS. */
{
// TODO: Try instead to use fallocate.
if (lseek(output_file_fd_, MAX_FILE_BYTES, SEEK_SET) < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::open_file]";
err_msg << " Error while lseek on end of file ";
err_msg << current_output_filename_;
err_msg << " for MAX_FILE_BYTES ";
err_msg << MAX_FILE_BYTES << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
const uint8_t mark = 255;
if(::write(output_file_fd_, &mark, sizeof(mark)) != sizeof(mark)) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::open_file]";
err_msg << " Error while writing to file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
}
current_output_filename_ = filename;
}
void ImageBinaryWriter::close_current_file()
{
if (output_file_fd_ != -1) {
if (close(output_file_fd_) < 0) {
stringstream err_msg;
using namespace date;
using namespace chrono;
err_msg << "[" << system_clock::now() << "]";
err_msg << "[BufferBinaryWriter::close_current_file]";
err_msg << " Error while closing file ";
err_msg << current_output_filename_ << ": ";
err_msg << strerror(errno) << endl;
throw runtime_error(err_msg.str());
}
output_file_fd_ = -1;
BufferUtils::update_latest_file(
latest_filename_, current_output_filename_);
current_output_filename_ = "";
}
}
-133
View File
@@ -1,133 +0,0 @@
#include "JFH5LiveWriter.hpp"
#include <cstring>
#include <hdf5_hl.h>
#include "buffer_config.hpp"
using namespace std;
using namespace buffer_config;
// Derives the detector name from detector_folder, allocates one metadata
// buffer of n_pulses entries per metadata field and creates the output file.
// NOTE(review): the buffers are raw new[] allocations; if init_file() throws
// the destructor does not run and they are not released - confirm.
JFH5LiveWriter::JFH5LiveWriter(const string& output_file,
const string& detector_folder,
const size_t n_modules,
const size_t n_pulses) :
detector_name_(get_detector_name(detector_folder)),
n_modules_(n_modules),
n_pulses_(n_pulses),
write_index_(0)
{
b_pulse_id_ = new uint64_t[n_pulses_];
b_frame_index_= new uint64_t[n_pulses_];
b_daq_rec_ = new uint32_t[n_pulses_];
b_is_good_frame_ = new uint8_t[n_pulses_];
init_file(output_file);
}
// Create the output file and its static layout:
//   /general/detector_name        scalar variable-length string
//   /data/<detector_name>/data    uint16 image dataset, one chunk per image
void JFH5LiveWriter::init_file(const string& output_file)
{
// H5F_ACC_TRUNC: an existing file with the same name is overwritten.
file_ = H5::H5File(output_file, H5F_ACC_TRUNC);
file_.createGroup("/data");
file_.createGroup("/data/" + detector_name_);
H5::DataSpace att_space(H5S_SCALAR);
H5::DataType data_type = H5::StrType(0, H5T_VARIABLE);
file_.createGroup("/general");
auto detector_dataset = file_.createDataSet(
"/general/detector_name", data_type ,att_space);
detector_dataset.write(detector_name_, data_type);
// Full dataset: n_pulses_ images, modules stacked along the Y axis.
hsize_t image_dataset_dims[3] =
{n_pulses_, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DataSpace image_dataspace(3, image_dataset_dims);
// One chunk == one complete image, so write() can store each image with a
// single direct chunk write.
hsize_t image_dataset_chunking[3] =
{1, n_modules_ * MODULE_Y_SIZE, MODULE_X_SIZE};
H5::DSetCreatPropList image_dataset_properties;
image_dataset_properties.setChunk(3, image_dataset_chunking);
image_dataset_ = file_.createDataSet(
"/data/" + detector_name_ + "/data",
H5::PredType::NATIVE_UINT16,
image_dataspace,
image_dataset_properties);
}
/**
 * Derive the detector name from the detector folder path.
 *
 * @param detector_folder Path to the detector buffer folder.
 * @return Everything after the last "/" in detector_folder, or the whole
 *         string when it contains no "/".
 */
std::string JFH5LiveWriter::get_detector_name(const string& detector_folder)
{
    const size_t separator_pos = detector_folder.rfind("/");

    if (separator_pos != string::npos) {
        return detector_folder.substr(separator_pos + 1);
    }

    return detector_folder;
}
// Finalize the file (close_file() is a no-op if it is already closed) and
// release the in-memory metadata buffers allocated in the constructor.
// NOTE(review): close_file() performs HDF5 writes; if those can throw, the
// exception would escape from a destructor - confirm.
JFH5LiveWriter::~JFH5LiveWriter()
{
// Must run before the buffers are freed: close_file() writes them out.
close_file();
delete[] b_pulse_id_;
delete[] b_frame_index_;
delete[] b_daq_rec_;
delete[] b_is_good_frame_;
}
/**
 * Create the dataset /data/<detector_name>/<name> and fill it from buffer.
 *
 * @param name   Dataset name relative to the detector group.
 * @param buffer Source array holding n_pulses_ elements of the given type.
 * @param type   HDF5 element type, used both in memory and in the file.
 */
void JFH5LiveWriter::write_dataset(
        const string name, const void* buffer, const H5::PredType& type)
{
    // Memory side: flat array with one element per pulse.
    hsize_t memory_dims[] = {n_pulses_};
    H5::DataSpace memory_space(1, memory_dims);

    // File side: (n_pulses_, 1) column.
    hsize_t file_dims[] = {n_pulses_, 1};
    H5::DataSpace file_space(2, file_dims);

    const auto dataset_path = "/data/" + detector_name_ + "/" + name;

    auto dataset = file_.createDataSet(dataset_path, type, file_space);
    dataset.write(buffer, type, memory_space, file_space);
    dataset.close();
}
/**
 * Write the per-pulse metadata buffered by write() into the file, one
 * dataset per metadata field under /data/<detector_name>/.
 */
void JFH5LiveWriter::write_metadata()
{
    // The b_*_ members are already pointers to the arrays allocated in the
    // constructor, so they are passed as-is. Passing their address (as was
    // done before) made HDF5 read from the pointer variable itself instead
    // of from the buffered values.
    write_dataset("pulse_id", b_pulse_id_, H5::PredType::NATIVE_UINT64);
    write_dataset("frame_index", b_frame_index_, H5::PredType::NATIVE_UINT64);
    write_dataset("daq_rec", b_daq_rec_, H5::PredType::NATIVE_UINT32);
    write_dataset(
            "is_good_frame", b_is_good_frame_, H5::PredType::NATIVE_UINT8);
}
/**
 * Finalize and close the output file.
 *
 * Closes the image dataset, flushes the buffered metadata via
 * write_metadata() and closes the file. Does nothing when the file handle
 * id is -1.
 */
void JFH5LiveWriter::close_file()
{
    const bool has_valid_file = file_.getId() != -1;

    if (has_valid_file) {
        image_dataset_.close();
        write_metadata();
        file_.close();
    }
}
void JFH5LiveWriter::write(const ImageMetadata* metadata, const char* data)
{
hsize_t offset[] = {write_index_, 0, 0};
H5DOwrite_chunk(image_dataset_.getId(), H5P_DEFAULT, 0,
offset, MODULE_N_BYTES * n_modules_, data);
b_pulse_id_[write_index_] = metadata->pulse_id;
b_frame_index_[write_index_] = metadata->frame_index;
b_daq_rec_[write_index_] = metadata->daq_rec;
b_is_good_frame_[write_index_] = metadata->is_good_image;
write_index_++;
}
-159
View File
@@ -1,159 +0,0 @@
#include <cstring>
#include "LiveImageAssembler.hpp"
#include "buffer_config.hpp"
#include "live_writer_config.hpp"
using namespace std;
using namespace buffer_config;
using namespace live_writer_config;
// Allocate a ring of WRITER_IA_N_SLOTS image slots for n_modules modules and
// mark every slot as free.
LiveImageAssembler::LiveImageAssembler(const size_t n_modules) :
n_modules_(n_modules),
image_buffer_slot_n_bytes_(MODULE_N_BYTES * n_modules_)
{
// Image bytes: one slot holds the data of all modules back to back.
image_buffer_ = new char[WRITER_IA_N_SLOTS * image_buffer_slot_n_bytes_];
// One assembled-image metadata entry per slot.
image_meta_buffer_ = new ImageMetadata[WRITER_IA_N_SLOTS];
// One frame metadata entry per module per slot.
frame_meta_buffer_ = new ModuleFrame[WRITER_IA_N_SLOTS * n_modules];
// Per slot: modules still missing, and the pulse_id occupying the slot.
buffer_status_ = new atomic_int[WRITER_IA_N_SLOTS];
buffer_pulse_id_ = new atomic_uint64_t[WRITER_IA_N_SLOTS];
// free_slot() maps its argument with % WRITER_IA_N_SLOTS, so passing the
// slot index i resets slot i.
for (size_t i=0; i < WRITER_IA_N_SLOTS; i++) {
free_slot(i);
}
}
/**
 * Release every buffer allocated in the constructor.
 */
LiveImageAssembler::~LiveImageAssembler()
{
    delete[] image_buffer_;
    delete[] image_meta_buffer_;
    // The three arrays below are also allocated with new[] in the
    // constructor and were previously leaked.
    delete[] frame_meta_buffer_;
    delete[] buffer_status_;
    delete[] buffer_pulse_id_;
}
// Returns true when the slot for pulse_id can accept a module frame of this
// pulse_id. Despite the name this is not a pure query: when the slot is
// empty, the call claims it by storing pulse_id into it.
bool LiveImageAssembler::is_slot_free(const uint64_t pulse_id)
{
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
uint64_t slot_pulse_id = IA_EMPTY_SLOT_VALUE;
// Atomically claim the slot if it currently holds IA_EMPTY_SLOT_VALUE.
// On failure slot_pulse_id is overwritten with the pulse_id that currently
// occupies the slot.
if (buffer_pulse_id_[slot_id].compare_exchange_strong(
slot_pulse_id, pulse_id)) {
return true;
}
// Slot already taken: it is still usable only if it was claimed for the
// same pulse_id and at least one module frame is still missing (> 0).
auto is_free = buffer_status_[slot_id].load(memory_order_relaxed) > 0;
return is_free && (slot_pulse_id == pulse_id);
}
/**
 * Check whether all modules of the slot mapped to pulse_id have arrived.
 *
 * @param pulse_id Pulse whose slot (pulse_id % WRITER_IA_N_SLOTS) is checked.
 * @return true when the slot's missing-module counter is 0.
 */
bool LiveImageAssembler::is_slot_full(const uint64_t pulse_id)
{
    const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
    const auto n_missing_modules =
            buffer_status_[slot_id].load(memory_order_relaxed);

    return n_missing_modules == 0;
}
/**
 * Byte offset inside image_buffer_ of one module's data within one slot.
 *
 * @param slot_id  Index of the ring-buffer slot.
 * @param i_module Index of the module inside the image.
 * @return Offset of the slot start plus the offset of the module in the slot.
 */
size_t LiveImageAssembler::get_data_offset(
        const uint64_t slot_id, const int i_module)
{
    const auto slot_start =
            static_cast<size_t>(slot_id * image_buffer_slot_n_bytes_);
    const auto module_start =
            static_cast<size_t>(i_module * MODULE_N_BYTES);

    return slot_start + module_start;
}
/**
 * Index inside frame_meta_buffer_ of one module's frame metadata.
 *
 * @param slot_id  Index of the ring-buffer slot.
 * @param i_module Index of the module inside the image.
 * @return slot_id * n_modules_ + i_module (in elements, not bytes).
 */
size_t LiveImageAssembler::get_frame_metadata_offset(
        const uint64_t slot_id, const int i_module)
{
    const auto first_entry_of_slot = static_cast<size_t>(slot_id * n_modules_);

    return first_entry_of_slot + static_cast<size_t>(i_module);
}
void LiveImageAssembler::process(
const uint64_t pulse_id,
const int i_module,
const BufferBinaryFormat* file_buffer)
{
const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
auto frame_meta_offset = get_frame_metadata_offset(slot_id, i_module);
auto image_offset = get_data_offset(slot_id, i_module);
memcpy(
&(frame_meta_buffer_[frame_meta_offset]),
&(file_buffer->metadata),
sizeof(file_buffer->metadata));
memcpy(
image_buffer_ + image_offset,
&(file_buffer->data[0]),
MODULE_N_BYTES);
buffer_status_[slot_id].fetch_sub(1, memory_order_relaxed);
}
// Reset the slot mapped to pulse_id (pulse_id % WRITER_IA_N_SLOTS) so that it
// can be claimed again by is_slot_free().
void LiveImageAssembler::free_slot(const uint64_t pulse_id)
{
auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
// All n_modules_ module frames are missing again.
buffer_status_[slot_id].store(n_modules_, memory_order_relaxed);
// Mark the slot as unoccupied; done after the counter reset.
buffer_pulse_id_[slot_id].store(IA_EMPTY_SLOT_VALUE, memory_order_relaxed);
}
// Build the image-level metadata of the slot mapped to pulse_id from the
// frame metadata of its modules, and return a pointer to it. The returned
// pointer refers to the slot's entry in image_meta_buffer_.
//
// pulse_id, frame_index and daq_rec are taken from the first module frame
// that received all JF_N_PACKETS_PER_FRAME packets; is_good_image is cleared
// when a later complete frame disagrees with those values.
ImageMetadata* LiveImageAssembler::get_metadata_buffer(const uint64_t pulse_id)
{
const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
ImageMetadata& image_meta = image_meta_buffer_[slot_id];
auto frame_meta_offset = get_frame_metadata_offset(slot_id, 0);
auto is_pulse_init = false;
image_meta.is_good_image = 1;
image_meta.pulse_id = 0;
// NOTE(review): frame_index and daq_rec are not reset here; if no module
// frame is complete they keep whatever the slot held before.
for (size_t i_module=0; i_module < n_modules_; i_module++) {
auto& frame_meta = frame_meta_buffer_[frame_meta_offset];
frame_meta_offset += 1;
auto is_good_frame =
frame_meta.n_recv_packets == JF_N_PACKETS_PER_FRAME;
// Incomplete frame: reset the image pulse_id and skip the module.
// NOTE(review): is_good_image is not cleared on this path, so an image
// with an incomplete module is only flagged bad indirectly (when a later
// complete frame's pulse_id differs from 0) - confirm this is intended.
if (!is_good_frame) {
image_meta.pulse_id = 0;
continue;
}
// First complete frame defines the reference values of the image.
if (!is_pulse_init) {
image_meta.pulse_id = frame_meta.pulse_id;
image_meta.frame_index = frame_meta.frame_index;
image_meta.daq_rec = frame_meta.daq_rec;
is_pulse_init = true;
}
// Any mismatch against the reference values marks the image as bad.
if (image_meta.is_good_image == 1) {
if (frame_meta.pulse_id != image_meta.pulse_id) {
image_meta.is_good_image = 0;
}
if (frame_meta.frame_index != image_meta.frame_index) {
image_meta.is_good_image = 0;
}
if (frame_meta.daq_rec != image_meta.daq_rec) {
image_meta.is_good_image = 0;
}
// NOTE(review): cannot be true here - incomplete frames already hit the
// `continue` above.
if (frame_meta.n_recv_packets != JF_N_PACKETS_PER_FRAME) {
image_meta.is_good_image = 0;
}
}
}
return &image_meta;
}
/**
 * Get the start of the image data of the slot mapped to pulse_id.
 *
 * @param pulse_id Pulse whose slot (pulse_id % WRITER_IA_N_SLOTS) is wanted.
 * @return Pointer into image_buffer_ at the first byte of that slot.
 */
char* LiveImageAssembler::get_data_buffer(const uint64_t pulse_id)
{
    const auto slot_id = pulse_id % WRITER_IA_N_SLOTS;
    const auto slot_start = slot_id * image_buffer_slot_n_bytes_;

    return image_buffer_ + slot_start;
}
+61
View File
@@ -0,0 +1,61 @@
#include <iostream>
#include "WriterStats.hpp"
using namespace std;
using namespace chrono;
// detector_name: value of the detector_name tag in the printed statistics.
// stats_modulo:  number of written images after which end_image_write()
//                prints the statistics and resets the counters.
WriterStats::WriterStats(
const string& detector_name,
const size_t stats_modulo) :
detector_name_(detector_name),
stats_modulo_(stats_modulo)
{
reset_counters();
}
// Start a new statistics interval: zero the image count and the total and
// maximum write durations.
void WriterStats::reset_counters()
{
image_counter_ = 0;
total_buffer_write_us_ = 0;
max_buffer_write_us_ = 0;
}
// Record the start time of one image write. end_image_write() measures the
// write duration from this timestamp, so despite its name
// stats_interval_start_ holds the start of the current write, not of the
// whole statistics interval.
void WriterStats::start_image_write()
{
stats_interval_start_ = steady_clock::now();
}
/**
 * Account for one finished image write.
 *
 * Adds the time elapsed since start_image_write() (in microseconds) to the
 * running total, updates the maximum, and once stats_modulo_ images have
 * been counted prints the statistics and starts a new interval.
 */
void WriterStats::end_image_write()
{
    ++image_counter_;

    const auto elapsed = steady_clock::now() - stats_interval_start_;
    const uint32_t write_us_duration =
            duration_cast<microseconds>(elapsed).count();

    total_buffer_write_us_ += write_us_duration;
    if (max_buffer_write_us_ < write_us_duration) {
        max_buffer_write_us_ = write_us_duration;
    }

    // Interval not complete yet: keep accumulating.
    if (image_counter_ != stats_modulo_) {
        return;
    }

    print_stats();
    reset_counters();
}
/**
 * Print the statistics of the current interval to stdout as one line in
 * InfluxDB line protocol:
 *
 *   jf_buffer_writer,detector_name=<name> <field>=<value>,... <timestamp_ns>
 */
void WriterStats::print_stats()
{
    // Divide in floating point so the fractional part of the average is not
    // truncated, and never divide by zero images.
    float avg_buffer_write_us = 0.0f;
    if (image_counter_ > 0) {
        avg_buffer_write_us =
                static_cast<float>(total_buffer_write_us_) / image_counter_;
    }

    uint64_t timestamp = time_point_cast<nanoseconds>(
            system_clock::now()).time_since_epoch().count();

    // Output in InfluxDB line protocol
    cout << "jf_buffer_writer";
    cout << ",detector_name=" << detector_name_;
    cout << " ";
    cout << "n_written_images=" << image_counter_ << "i";
    // Fields are separated by a bare comma: a space here would terminate the
    // field set and make the rest of the line unparsable.
    cout << ",avg_buffer_write_us=" << avg_buffer_write_us;
    cout << ",max_buffer_write_us=" << max_buffer_write_us_ << "i";
    cout << " ";
    cout << timestamp;
    cout << endl;
}
+23 -172
View File
@@ -1,195 +1,46 @@
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <vector>
#include "zmq.h"
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include "live_writer_config.hpp"
#include "buffer_config.hpp"
#include "bitshuffle/bitshuffle.h"
#include "JFH5LiveWriter.hpp"
#include "LiveImageAssembler.hpp"
#include "BinaryReader.hpp"
#include "../../jf-buffer-writer/include/BufferStats.hpp"
using namespace std;
using namespace chrono;
using namespace buffer_config;
using namespace live_writer_config;
void read_buffer(
const string detector_folder,
const string module_name,
const int i_module,
const vector<uint64_t>& pulse_ids_to_write,
LiveImageAssembler& image_assembler,
void* ctx)
{
BinaryReader reader(detector_folder, module_name);
auto frame_buffer = new BufferBinaryFormat();
void* socket = zmq_socket(ctx, ZMQ_SUB);
if (socket == nullptr) {
throw runtime_error(zmq_strerror(errno));
}
int rcvhwm = 100;
if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
int linger = 0;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
// In milliseconds.
int rcvto = 2000;
if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){
throw runtime_error(zmq_strerror(errno));
}
if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) {
throw runtime_error(zmq_strerror(errno));
}
if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const uint64_t PULSE_ID_DELAY = 100;
uint64_t live_pulse_id = pulse_ids_to_write.front();
for (uint64_t pulse_id:pulse_ids_to_write) {
while(!image_assembler.is_slot_free(pulse_id)) {
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
}
auto start_time = steady_clock::now();
// Enforce a delay of 1 second for writing.
while (live_pulse_id - pulse_id < PULSE_ID_DELAY) {
if (zmq_recv(socket, &live_pulse_id,
sizeof(live_pulse_id), 0) == -1) {
if (errno == EAGAIN) {
throw runtime_error("Did not receive pulse_id in time.");
} else {
throw runtime_error(zmq_strerror(errno));
}
}
}
reader.get_frame(pulse_id, frame_buffer);
auto end_time = steady_clock::now();
uint64_t read_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
start_time = steady_clock::now();
image_assembler.process(pulse_id, i_module, frame_buffer);
end_time = steady_clock::now();
uint64_t compose_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
cout << "sf_writer:avg_read_us ";
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
cout << "sf_writer:avg_assemble_us ";
cout << compose_us_duration / BUFFER_BLOCK_SIZE << endl;
}
delete frame_buffer;
}
int main (int argc, char *argv[])
{
if (argc != 7) {
if (argc != 3) {
cout << endl;
cout << "Usage: sf_writer [output_file] [detector_folder] [n_modules]";
cout << " [start_pulse_id] [n_pulses] [pulse_id_step]";
cout << endl;
cout << "\toutput_file: Complete path to the output file." << endl;
cout << "\tdetector_folder: Absolute path to detector buffer." << endl;
cout << "\tn_modules: number of modules" << endl;
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
cout << "\tn_pulses: Number of pulses to write." << endl;
cout << "\tpulse_id_step: 1==100Hz, 2==50hz, 4==25Hz.." << endl;
cout << "Usage: jf_live_writer [detector_json_filename]"
" [stream_name]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << endl;
exit(-1);
}
string output_file = string(argv[1]);
const string detector_folder = string(argv[2]);
size_t n_modules = atoi(argv[3]);
uint64_t start_pulse_id = (uint64_t) atoll(argv[4]);
size_t n_pulses = (size_t) atoll(argv[5]);
int pulse_id_step = atoi(argv[6]);
std::vector<uint64_t> pulse_ids_to_write;
uint64_t i_pulse_id = start_pulse_id;
for (size_t i=0; i<n_pulses; i++) {
pulse_ids_to_write.push_back(i_pulse_id);
i_pulse_id += pulse_id_step;
}
LiveImageAssembler image_assembler(n_modules);
const auto stream_name = string(argv[2]);
auto config = BufferUtils::read_json_config(string(argv[1]));
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1);
zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS);
auto receiver = BufferUtils::connect_socket(
ctx, config.detector_name, "assembler");
std::vector<std::thread> reading_threads(n_modules);
for (size_t i_module=0; i_module<n_modules; i_module++) {
RamBuffer ram_buffer(config.detector_name, config.n_modules);
BufferStats stats(config.detector_name, stream_name, STATS_MODULO);
// TODO: Very ugly. Fix.
string module_name = "M";
if (i_module < 10) {
module_name += "0";
}
module_name += to_string(i_module);
ImageMetadata meta;
while (true) {
zmq_recv(receiver, &meta, sizeof(meta), 0);
char* data = ram_buffer.read_image(meta.pulse_id);
reading_threads.emplace_back(
read_buffer,
detector_folder,
module_name,
i_module,
ref(pulse_ids_to_write),
ref(image_assembler),
ctx);
sender.send(meta, data);
stats.record_stats(meta);
}
JFH5LiveWriter writer(output_file, detector_folder, n_modules, n_pulses);
for (uint64_t pulse_id:pulse_ids_to_write) {
while(!image_assembler.is_slot_full(pulse_id)) {
this_thread::sleep_for(chrono::milliseconds(ASSEMBLER_RETRY_MS));
}
auto metadata = image_assembler.get_metadata_buffer(pulse_id);
auto data = image_assembler.get_data_buffer(pulse_id);
auto start_time = steady_clock::now();
writer.write(metadata, data);
auto end_time = steady_clock::now();
auto write_us_duration = duration_cast<microseconds>(
end_time-start_time).count();
image_assembler.free_slot(pulse_id);
cout << "sf_writer:avg_write_us ";
cout << write_us_duration / BUFFER_BLOCK_SIZE << endl;
}
for (auto& reading_thread : reading_threads) {
if (reading_thread.joinable()) {
reading_thread.join();
}
}
return 0;
}
+4 -3
View File
@@ -9,11 +9,12 @@
class FrameStats {
const std::string detector_name_;
const int module_id_;
size_t stats_modulo_;
size_t stats_time_;
int frames_counter_;
int n_missed_packets_;
int n_corrupted_frames_;
int n_corrupted_pulse_id_;
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
@@ -22,8 +23,8 @@ class FrameStats {
public:
FrameStats(const std::string &detector_name,
const int module_id,
const size_t stats_modulo);
void record_stats(const ModuleFrame &meta);
const size_t stats_time);
void record_stats(const ModuleFrame &meta, const bool bad_pulse_id);
};
+15 -5
View File
@@ -7,10 +7,10 @@ using namespace chrono;
FrameStats::FrameStats(
const std::string &detector_name,
const int module_id,
const size_t stats_modulo) :
const size_t stats_time) :
detector_name_(detector_name),
module_id_(module_id),
stats_modulo_(stats_modulo)
stats_time_(stats_time)
{
reset_counters();
}
@@ -20,11 +20,17 @@ void FrameStats::reset_counters()
frames_counter_ = 0;
n_missed_packets_ = 0;
n_corrupted_frames_ = 0;
n_corrupted_pulse_id_ = 0;
stats_interval_start_ = steady_clock::now();
}
void FrameStats::record_stats(const ModuleFrame &meta)
void FrameStats::record_stats(const ModuleFrame &meta, const bool bad_pulse_id)
{
if (bad_pulse_id) {
n_corrupted_pulse_id_++;
}
if (meta.n_recv_packets < N_PACKETS_PER_FRAME) {
n_missed_packets_ += N_PACKETS_PER_FRAME - meta.n_recv_packets;
n_corrupted_frames_++;
@@ -32,7 +38,10 @@ void FrameStats::record_stats(const ModuleFrame &meta)
frames_counter_++;
if (frames_counter_ == stats_modulo_) {
auto time_passed = duration_cast<milliseconds>(
steady_clock::now()-stats_interval_start_).count();
if (time_passed >= stats_time_*1000) {
print_stats();
reset_counters();
}
@@ -48,13 +57,14 @@ void FrameStats::print_stats()
system_clock::now()).time_since_epoch().count();
// Output in InfluxDB line protocol
cout << "jf-udp-recv";
cout << "jf_udp_recv";
cout << ",detector_name=" << detector_name_;
cout << ",module_name=M" << module_id_;
cout << " ";
cout << "n_missed_packets=" << n_missed_packets_ << "i";
cout << ",n_corrupted_frames=" << n_corrupted_frames_ << "i";
cout << ",repetition_rate=" << rep_rate << "i";
cout << ",n_corrupted_pulse_ids=" << n_corrupted_pulse_id_ << "i";
cout << " ";
cout << timestamp;
cout << endl;
+3 -3
View File
@@ -92,8 +92,8 @@ inline uint64_t FrameUdpReceiver::process_packets(
init_frame(metadata, i_packet);
// Happens if the last packet from the previous frame gets lost.
// In the jungfrau_packet, pulse_id is called bunchid.
} else if (metadata.pulse_id != packet_buffer_[i_packet].bunchid) {
// In the jungfrau_packet, framenum is the trigger number (how many triggers have happened since detector power-on).
} else if (metadata.frame_index != packet_buffer_[i_packet].framenum) {
packet_buffer_loaded_ = true;
// Continue on this packet.
packet_buffer_offset_ = i_packet;
@@ -186,4 +186,4 @@ uint64_t FrameUdpReceiver::get_frame_from_udp(
return pulse_id;
}
}
}
}
+25 -5
View File
@@ -39,22 +39,42 @@ int main (int argc, char *argv[]) {
const auto udp_port = config.start_udp_port + module_id;
FrameUdpReceiver receiver(udp_port, module_id);
RamBuffer buffer(config.detector_name, config.n_modules);
FrameStats stats(config.detector_name, module_id, STATS_MODULO);
FrameStats stats(config.detector_name, module_id, STATS_TIME);
auto ctx = zmq_ctx_new();
auto socket = bind_socket(ctx, config.detector_name, module_id);
auto socket = bind_socket(ctx, config.detector_name, to_string(module_id));
ModuleFrame meta;
char* data = new char[MODULE_N_BYTES];
uint64_t pulse_id_previous = 0;
uint64_t frame_index_previous = 0;
while (true) {
auto pulse_id = receiver.get_frame_from_udp(meta, data);
buffer.write_frame(meta, data);
bool bad_pulse_id = false;
zmq_send(socket, &pulse_id, sizeof(pulse_id), 0);
if ( ( meta.frame_index != (frame_index_previous+1) ) ||
( (pulse_id-pulse_id_previous) < 0 ) ||
( (pulse_id-pulse_id_previous) > 1000 ) ) {
bad_pulse_id = true;
} else {
buffer.write_frame(meta, data);
zmq_send(socket, &pulse_id, sizeof(pulse_id), 0);
}
stats.record_stats(meta, bad_pulse_id);
pulse_id_previous = pulse_id;
frame_index_previous = meta.frame_index;
stats.record_stats(meta);
}
delete[] data;
+29
View File
@@ -0,0 +1,29 @@
#ifndef SF_DAQ_BUFFER_STREAMSTATS_HPP
#define SF_DAQ_BUFFER_STREAMSTATS_HPP
#include <chrono>
#include <string>
#include <formats.hpp>
// Collects per-image statistics of a stream and prints them to stdout, as
// one InfluxDB line protocol line, every stats_modulo recorded images.
class StreamStats {
// Values of the detector_name and stream_name tags in the printed line.
const std::string detector_name_;
const std::string stream_name_;
// Number of recorded images after which the statistics are printed.
const size_t stats_modulo_;
// Images recorded since the last reset.
int image_counter_;
// Images recorded since the last reset whose is_good_image flag was not set.
int n_corrupted_images_;
// Time of the last reset; used to compute the repetition rate.
std::chrono::time_point<std::chrono::steady_clock> stats_interval_start_;
void reset_counters();
void print_stats();
public:
StreamStats(const std::string &detector_name,
const std::string &stream_name,
const size_t stats_modulo);
// Account for one image; prints and resets once stats_modulo images have
// been recorded.
void record_stats(const ImageMetadata &meta);
};
#endif //SF_DAQ_BUFFER_STREAMSTATS_HPP
+6 -1
View File
@@ -8,6 +8,11 @@
#include "formats.hpp"
// Result of ZmqPulseSyncReceiver::get_next_pulse_id().
struct PulseAndSync {
// The next pulse_id.
const uint64_t pulse_id;
// How many pulses were lost in total to get this pulse_id; 0 when no
// re-synchronization was needed.
const uint32_t n_lost_pulses;
};
class ZmqPulseSyncReceiver {
void* ctx_;
@@ -22,7 +27,7 @@ public:
const int n_modules);
~ZmqPulseSyncReceiver();
uint64_t get_next_pulse_id() const;
PulseAndSync get_next_pulse_id() const;
};
+3
View File
@@ -14,4 +14,7 @@ namespace stream_config
const int PULSE_ZMQ_SNDHWM = 100;
// Number of times we try to re-sync in case of failure.
const int SYNC_RETRY_LIMIT = 3;
// Number of pulses between each statistics print out.
const size_t STREAM_STATS_MODULO = 1000;
}
+62
View File
@@ -0,0 +1,62 @@
#include "StreamStats.hpp"
#include <iostream>
using namespace std;
using namespace chrono;
// detector_name: value of the detector_name tag in the printed statistics.
// stream_name:   value of the stream_name tag in the printed statistics.
// stats_modulo:  number of recorded images after which record_stats()
//                prints the statistics and resets the counters.
StreamStats::StreamStats(
const std::string &detector_name,
const std::string &stream_name,
const size_t stats_modulo) :
detector_name_(detector_name),
stream_name_(stream_name),
stats_modulo_(stats_modulo)
{
reset_counters();
}
// Start a new statistics interval: zero the counters and remember the
// current time as the interval start.
void StreamStats::reset_counters()
{
image_counter_ = 0;
n_corrupted_images_ = 0;
stats_interval_start_ = steady_clock::now();
}
/**
 * Account for one processed image.
 *
 * Counts the image, counts it as corrupted when its is_good_image flag is
 * not set, and once stats_modulo_ images have been recorded prints the
 * statistics and starts a new interval.
 *
 * @param meta Metadata of the image that was just processed.
 */
void StreamStats::record_stats(const ImageMetadata &meta)
{
    ++image_counter_;

    const bool is_corrupted = !meta.is_good_image;
    if (is_corrupted) {
        ++n_corrupted_images_;
    }

    // Interval not complete yet: keep accumulating.
    if (image_counter_ != stats_modulo_) {
        return;
    }

    print_stats();
    reset_counters();
}
/**
 * Print the statistics of the current interval to stdout as one line in
 * InfluxDB line protocol:
 *
 *   sf_stream,detector_name=<d>,stream_name=<s> <field>=<value>,... <ts_ns>
 */
void StreamStats::print_stats()
{
    auto interval_ms_duration = duration_cast<milliseconds>(
            steady_clock::now()-stats_interval_start_).count();

    // An interval shorter than 1 ms truncates to 0 and would make the
    // integer division below crash; report a rate of 0 in that case.
    int rep_rate = 0;
    if (interval_ms_duration > 0) {
        // * 1000 because milliseconds, + 250 because of truncation.
        rep_rate = ((image_counter_ * 1000) + 250) / interval_ms_duration;
    }

    uint64_t timestamp = time_point_cast<nanoseconds>(
            system_clock::now()).time_since_epoch().count();

    // Output in InfluxDB line protocol
    cout << "sf_stream";
    cout << ",detector_name=" << detector_name_;
    cout << ",stream_name=" << stream_name_;
    cout << " ";
    cout << "n_processed_images=" << image_counter_ << "i";
    cout << ",n_corrupted_images=" << n_corrupted_images_ << "i";
    cout << ",repetition_rate=" << rep_rate << "i";
    cout << " ";
    cout << timestamp;
    cout << endl;
}
+14 -6
View File
@@ -27,7 +27,7 @@ ZmqPulseSyncReceiver::ZmqPulseSyncReceiver(
for (int i=0; i<n_modules_; i++) {
sockets_.push_back(
BufferUtils::connect_socket(ctx_, detector_name, i));
BufferUtils::connect_socket(ctx_, detector_name, to_string(i)));
}
}
@@ -38,7 +38,7 @@ ZmqPulseSyncReceiver::~ZmqPulseSyncReceiver()
}
}
uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const
PulseAndSync ZmqPulseSyncReceiver::get_next_pulse_id() const
{
uint64_t pulses[n_modules_];
@@ -52,12 +52,12 @@ uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const
}
if (modules_in_sync) {
return pulses[0];
return {pulses[0], 0};
}
// How many pulses we lost in total to get the next pulse_id.
uint32_t n_lost_pulses = 0;
for (int i_sync=0; i_sync < SYNC_RETRY_LIMIT; i_sync++) {
cout << "Sync attempt " << i_sync << endl;
uint64_t min_pulse_id = numeric_limits<uint64_t>::max();;
uint64_t max_pulse_id = 0;
@@ -83,18 +83,26 @@ uint64_t ZmqPulseSyncReceiver::get_next_pulse_id() const
}
modules_in_sync = true;
// Max pulses we lost in this sync attempt.
uint32_t i_sync_lost_pulses = 0;
for (int i = 0; i < n_modules_; i++) {
// How many pulses we lost for this specific module.
uint32_t i_module_lost_pulses = 0;
while (pulses[i] < max_pulse_id) {
zmq_recv(sockets_[i], &pulses[i], sizeof(uint64_t), 0);
i_module_lost_pulses++;
}
i_sync_lost_pulses = max(i_sync_lost_pulses, i_module_lost_pulses);
if (pulses[i] != max_pulse_id) {
modules_in_sync = false;
}
}
n_lost_pulses += i_sync_lost_pulses;
if (modules_in_sync) {
return pulses[0];
return {pulses[0], n_lost_pulses};
}
}
+12 -34
View File
@@ -3,11 +3,10 @@
#include <zmq.h>
#include <RamBuffer.hpp>
#include <BufferUtils.hpp>
#include <StreamStats.hpp>
#include "buffer_config.hpp"
#include "stream_config.hpp"
#include "ZmqLiveSender.hpp"
#include "ZmqPulseSyncReceiver.hpp"
using namespace std;
using namespace buffer_config;
@@ -15,57 +14,36 @@ using namespace stream_config;
int main (int argc, char *argv[])
{
if (argc != 2) {
if (argc != 3) {
cout << endl;
cout << "Usage: sf_stream [detector_json_filename]" << endl;
cout << "Usage: sf_stream [detector_json_filename]"
" [stream_name]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << endl;
exit(-1);
}
const auto stream_name = string(argv[2]);
// TODO: Add stream_name to config reading - multiple stream definitions.
auto config = BufferUtils::read_json_config(string(argv[1]));
string RECV_IPC_URL = BUFFER_LIVE_IPC_URL + config.detector_name + "-";
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
auto receiver = BufferUtils::connect_socket(
ctx, config.detector_name, "assembler");
ZmqPulseSyncReceiver receiver(ctx, config.detector_name, config.n_modules);
RamBuffer ram_buffer(config.detector_name, config.n_modules);
StreamStats stats(config.detector_name, stream_name, STREAM_STATS_MODULO);
ZmqLiveSender sender(ctx, config);
// TODO: Remove stats trash.
uint64_t last_pulse_id = 0;
uint64_t last_pulse_id_range = 0;
uint16_t n_good_images = 0;
ImageMetadata meta;
while (true) {
auto pulse_id = receiver.get_next_pulse_id();
char* data = ram_buffer.read_image(pulse_id, meta);
zmq_recv(receiver, &meta, sizeof(meta), 0);
char* data = ram_buffer.read_image(meta.pulse_id);
sender.send(meta, data);
// TODO: This logic works only at 100Hz. Fix it systematically.
uint64_t sync_lost_pulses = pulse_id - last_pulse_id;
if (last_pulse_id > 0 && sync_lost_pulses > 1) {
cout << "sf_stream:sync_lost_pulses " << sync_lost_pulses << endl;
}
last_pulse_id = pulse_id;
uint64_t curr_pulse_id_range = pulse_id / 10000;
if (last_pulse_id_range != curr_pulse_id_range) {
if (last_pulse_id_range > 0) {
cout << "sf_stream:n_good_images " << n_good_images;
cout << endl;
}
last_pulse_id_range = curr_pulse_id_range;
n_good_images = 0;
}
if (meta.is_good_image) {
n_good_images++;
}
stats.record_stats(meta);
}
}
+1
View File
@@ -11,6 +11,7 @@ add_executable(sf-writer src/main.cpp)
set_target_properties(sf-writer PROPERTIES OUTPUT_NAME sf_writer)
target_link_libraries(sf-writer
sf-writer-lib
zmq
hdf5
hdf5_hl
hdf5_cpp