mirror of
https://github.com/paulscherrerinstitute/sf_daq_buffer.git
synced 2026-04-29 06:02:24 +02:00
Remove sf_replay as its not needed anymore
This commit is contained in:
@@ -30,6 +30,5 @@ add_subdirectory(
|
||||
|
||||
add_subdirectory("core-buffer")
|
||||
add_subdirectory("sf-buffer")
|
||||
add_subdirectory("sf-replay")
|
||||
add_subdirectory("sf-stream")
|
||||
add_subdirectory("sf-writer")
|
||||
@@ -1,21 +0,0 @@
|
||||
file(GLOB SOURCES
|
||||
src/*.cpp)
|
||||
|
||||
add_library(sf-replay-lib STATIC ${SOURCES})
|
||||
target_include_directories(sf-replay-lib PUBLIC include/)
|
||||
target_link_libraries(sf-replay-lib
|
||||
external
|
||||
core-buffer-lib)
|
||||
|
||||
add_executable(sf-replay src/main.cpp)
|
||||
set_target_properties(sf-replay PROPERTIES OUTPUT_NAME sf_replay)
|
||||
target_link_libraries(sf-replay
|
||||
core-buffer-lib
|
||||
sf-replay-lib
|
||||
zmq
|
||||
hdf5
|
||||
hdf5_cpp
|
||||
pthread)
|
||||
|
||||
enable_testing()
|
||||
add_subdirectory(test/)
|
||||
@@ -1,23 +0,0 @@
|
||||
#ifndef SF_DAQ_BUFFER_BLOCKZMQSENDER_HPP
|
||||
#define SF_DAQ_BUFFER_BLOCKZMQSENDER_HPP
|
||||
|
||||
#include <string>
|
||||
#include <jungfrau.hpp>
|
||||
#include <formats.hpp>
|
||||
|
||||
class BlockZmqSender {
|
||||
|
||||
void* ctx_;
|
||||
void* socket_;
|
||||
|
||||
public:
|
||||
BlockZmqSender(const std::string& ipc_id, const int source_id);
|
||||
virtual ~BlockZmqSender();
|
||||
|
||||
void close();
|
||||
|
||||
void send(const BufferBinaryBlock* block_data);
|
||||
};
|
||||
|
||||
|
||||
#endif //SF_DAQ_BUFFER_BLOCKZMQSENDER_HPP
|
||||
@@ -1,47 +0,0 @@
|
||||
#include "BlockZmqSender.hpp"
|
||||
|
||||
#include <sstream>
|
||||
#include <zmq.h>
|
||||
|
||||
#include "buffer_config.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace core_buffer;
|
||||
|
||||
|
||||
BlockZmqSender::BlockZmqSender(const string& ipc_id, const int source_id)
|
||||
{
|
||||
auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-";
|
||||
stringstream ipc_stream;
|
||||
ipc_stream << ipc_base << source_id;
|
||||
const auto ipc_address = ipc_stream.str();
|
||||
|
||||
ctx_ = zmq_ctx_new();
|
||||
socket_ = zmq_socket(ctx_, ZMQ_PUSH);
|
||||
|
||||
const int sndhwm = REPLAY_SNDHWM;
|
||||
if (zmq_setsockopt(socket_, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0)
|
||||
throw runtime_error(zmq_strerror (errno));
|
||||
|
||||
const int linger_ms = -1;
|
||||
if (zmq_setsockopt(socket_, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0)
|
||||
throw runtime_error(zmq_strerror (errno));
|
||||
|
||||
if (zmq_bind(socket_, ipc_address.c_str()) != 0)
|
||||
throw runtime_error(zmq_strerror (errno));
|
||||
}
|
||||
|
||||
BlockZmqSender::~BlockZmqSender()
|
||||
{
|
||||
close();
|
||||
}
|
||||
|
||||
void BlockZmqSender::close() {
|
||||
zmq_close(socket_);
|
||||
zmq_ctx_destroy(ctx_);
|
||||
}
|
||||
|
||||
void BlockZmqSender::send(const BufferBinaryBlock* block_data)
|
||||
{
|
||||
zmq_send(socket_, block_data, sizeof(BufferBinaryBlock), ZMQ_SNDMORE);
|
||||
}
|
||||
@@ -1,137 +0,0 @@
|
||||
#include <iostream>
|
||||
#include <thread>
|
||||
|
||||
#include "FastQueue.hpp"
|
||||
#include "buffer_config.hpp"
|
||||
#include "BufferBinaryReader.hpp"
|
||||
#include "BlockZmqSender.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace core_buffer;
|
||||
using namespace chrono;
|
||||
|
||||
void sf_replay (
|
||||
const string device,
|
||||
const string channel_name,
|
||||
FastQueue<BufferBinaryBlock>& queue,
|
||||
const uint64_t start_pulse_id,
|
||||
const uint64_t stop_pulse_id
|
||||
)
|
||||
{
|
||||
BufferBinaryReader block_reader(device, channel_name);
|
||||
|
||||
uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE;
|
||||
uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE;
|
||||
|
||||
// "<= stop_block" because we include the stop_block in the transfer.
|
||||
for (uint64_t curr_block=start_block;
|
||||
curr_block <= stop_block;
|
||||
curr_block++) {
|
||||
|
||||
int slot_id;
|
||||
while((slot_id = queue.reserve()) == -1) {
|
||||
this_thread::sleep_for(chrono::milliseconds(
|
||||
RB_READ_RETRY_INTERVAL_MS));
|
||||
}
|
||||
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
auto block_buffer = queue.get_metadata_buffer(slot_id);
|
||||
|
||||
block_reader.get_block(curr_block, block_buffer);
|
||||
|
||||
auto end_time = steady_clock::now();
|
||||
uint64_t read_us_duration = duration_cast<microseconds>(
|
||||
end_time-start_time).count();
|
||||
|
||||
queue.commit();
|
||||
|
||||
// TODO: Proper statistics
|
||||
cout << "sf_replay:avg_read_us ";
|
||||
cout << read_us_duration / BUFFER_BLOCK_SIZE << endl;
|
||||
}
|
||||
}
|
||||
|
||||
int main (int argc, char *argv[]) {
|
||||
|
||||
if (argc != 7) {
|
||||
cout << endl;
|
||||
cout << "Usage: sf_replay [ipc_id] [device]";
|
||||
cout << " [channel_name] [source_id] [start_pulse_id] [stop_pulse_id]";
|
||||
cout << endl;
|
||||
cout << "\tdevice: Name of detector." << endl;
|
||||
cout << "\tchannel_name: M00-M31 for JF16M." << endl;
|
||||
cout << "\tsource_id: Module index" << endl;
|
||||
cout << "\tstart_pulse_id: Start pulse_id of retrieval." << endl;
|
||||
cout << "\tstop_pulse_id: Stop pulse_id of retrieval." << endl;
|
||||
cout << endl;
|
||||
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
const string ipc_id = string(argv[1]);
|
||||
const string device = string(argv[2]);
|
||||
const string channel_name = string(argv[3]);
|
||||
const auto source_id = atoi(argv[4]);
|
||||
const auto start_pulse_id = (uint64_t) atoll(argv[5]);
|
||||
const auto stop_pulse_id = (uint64_t) atoll(argv[6]);
|
||||
|
||||
// 0 bytes for data since everything is in the header.
|
||||
FastQueue<BufferBinaryBlock> queue(0, REPLAY_FASTQUEUE_N_SLOTS);
|
||||
|
||||
thread file_read_thread(sf_replay,
|
||||
device, channel_name, ref(queue),
|
||||
start_pulse_id, stop_pulse_id);
|
||||
|
||||
uint64_t send_us = 0;
|
||||
uint64_t max_send_us = 0;
|
||||
uint64_t n_stats = 0;
|
||||
|
||||
BlockZmqSender sender(ipc_id, source_id);
|
||||
|
||||
uint64_t start_block = start_pulse_id / BUFFER_BLOCK_SIZE;
|
||||
uint64_t stop_block = stop_pulse_id / BUFFER_BLOCK_SIZE;
|
||||
|
||||
// "<= stop_block" because we include the stop_block in the transfer.
|
||||
for (uint64_t curr_block=start_block;
|
||||
curr_block <= stop_block;
|
||||
curr_block++) {
|
||||
|
||||
int slot_id;
|
||||
while((slot_id = queue.read()) == -1) {
|
||||
this_thread::sleep_for(chrono::milliseconds(
|
||||
RB_READ_RETRY_INTERVAL_MS));
|
||||
}
|
||||
|
||||
auto block_buffer = queue.get_metadata_buffer(slot_id);
|
||||
|
||||
auto start_time = steady_clock::now();
|
||||
|
||||
sender.send(block_buffer);
|
||||
|
||||
auto end_time = steady_clock::now();
|
||||
uint64_t send_us_duration =
|
||||
duration_cast<microseconds>(end_time-start_time).count();
|
||||
|
||||
queue.release();
|
||||
|
||||
// TODO: Proper statistics
|
||||
n_stats++;
|
||||
|
||||
send_us += send_us_duration;
|
||||
max_send_us = max(max_send_us, send_us_duration);
|
||||
|
||||
if (n_stats == STATS_MODULO) {
|
||||
cout << "sf_replay:avg_send_us " << send_us / STATS_MODULO;
|
||||
cout << " sf_replay:max_send_us " << max_send_us;
|
||||
cout << endl;
|
||||
|
||||
n_stats = 0;
|
||||
send_us = 0;
|
||||
max_send_us = 0;
|
||||
}
|
||||
}
|
||||
|
||||
file_read_thread.join();
|
||||
return 0;
|
||||
}
|
||||
@@ -1,10 +0,0 @@
|
||||
add_executable(sf-replay-tests main.cpp)
|
||||
|
||||
target_link_libraries(sf-replay-tests
|
||||
core-buffer-lib
|
||||
sf-buffer-lib
|
||||
sf-replay-lib
|
||||
hdf5
|
||||
hdf5_cpp
|
||||
gtest
|
||||
)
|
||||
@@ -1,9 +0,0 @@
|
||||
#include "gtest/gtest.h"
|
||||
#include "test_ReplayH5Reader.cpp"
|
||||
|
||||
using namespace std;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
::testing::InitGoogleTest(&argc, argv);
|
||||
return RUN_ALL_TESTS();
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
|
||||
#include "ReplayH5Reader.hpp"
|
||||
#include "BufferH5Writer.hpp"
|
||||
|
||||
using namespace std;
|
||||
using namespace core_buffer;
|
||||
|
||||
TEST(ReplayH5Reader, basic_interaction)
|
||||
{
|
||||
auto root_folder = ".";
|
||||
auto device_name = "fast_device";
|
||||
size_t pulse_id = 65;
|
||||
uint16_t source_id = 124;
|
||||
|
||||
// This 2 must be compatible by design.
|
||||
BufferH5Writer writer(root_folder, device_name);
|
||||
ReplayH5Reader reader(root_folder, device_name);
|
||||
|
||||
ModuleFrame w_metadata;
|
||||
ModuleFrame* r_metadata;
|
||||
auto w_frame_buffer = make_unique<uint16_t[]>(MODULE_N_PIXELS);
|
||||
char* r_frame_buffer;
|
||||
|
||||
// Setup test values.
|
||||
w_metadata.pulse_id = pulse_id;
|
||||
w_metadata.frame_index = 2;
|
||||
w_metadata.daq_rec = 3;
|
||||
w_metadata.n_received_packets = 128;
|
||||
w_metadata.module_id = source_id;
|
||||
|
||||
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
|
||||
w_frame_buffer[i] = i % 100;
|
||||
}
|
||||
|
||||
// Write to file.
|
||||
writer.set_pulse_id(pulse_id);
|
||||
writer.write(&w_metadata, (char*)&(w_frame_buffer[0]));
|
||||
writer.close_file();
|
||||
|
||||
reader.get_buffer(pulse_id, r_metadata, r_frame_buffer);
|
||||
|
||||
ASSERT_EQ(r_metadata->pulse_id, pulse_id);
|
||||
ASSERT_EQ(r_metadata->module_id, source_id);
|
||||
ASSERT_EQ(r_metadata->frame_index, 2);
|
||||
ASSERT_EQ(r_metadata->daq_rec, 3);
|
||||
ASSERT_EQ(r_metadata->n_received_packets, 128);
|
||||
|
||||
// Data as well.
|
||||
auto offset = MODULE_N_PIXELS * (pulse_id-1);
|
||||
for (size_t i=0; i<MODULE_N_PIXELS; i++) {
|
||||
w_frame_buffer[i] = r_frame_buffer[offset + i];
|
||||
}
|
||||
|
||||
for (uint64_t i_pulse=0; i_pulse<100; i_pulse++) {
|
||||
|
||||
// Verify that all but the saved pulse_id are zero.
|
||||
if (i_pulse == pulse_id) {
|
||||
continue;
|
||||
}
|
||||
|
||||
reader.get_buffer(i_pulse, r_metadata, r_frame_buffer);
|
||||
|
||||
ASSERT_EQ(r_metadata->pulse_id, 0);
|
||||
ASSERT_EQ(r_metadata->frame_index, 0);
|
||||
ASSERT_EQ(r_metadata->daq_rec, 0);
|
||||
ASSERT_EQ(r_metadata->n_received_packets, 0);
|
||||
ASSERT_EQ(r_metadata->module_id, 0);
|
||||
}
|
||||
|
||||
reader.close_file();
|
||||
}
|
||||
Reference in New Issue
Block a user