From d2db741ababe26b44569d89d17f7be3f842e82fc Mon Sep 17 00:00:00 2001 From: Andrej Babic Date: Mon, 25 May 2020 16:03:04 +0200 Subject: [PATCH] Cleanup sf_replay --- sf-replay/src/main.cpp | 41 +++++++++-------------------------------- 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/sf-replay/src/main.cpp b/sf-replay/src/main.cpp index e39d149..db54320 100644 --- a/sf-replay/src/main.cpp +++ b/sf-replay/src/main.cpp @@ -1,21 +1,19 @@ -#include "ReplayH5Reader.hpp" - #include #include -#include -#include -#include #include "buffer_config.hpp" +#include "ReplayH5Reader.hpp" +#include "ReplayZmqSender.hpp" using namespace std; using namespace core_buffer; using namespace chrono; void sf_replay ( - void* socket, const string& device, const string& channel_name, + const int source_id, + const string& ipc_id, const uint64_t start_pulse_id, const uint64_t stop_pulse_id) { @@ -26,6 +24,7 @@ void sf_replay ( uint64_t n_stats = 0; ReplayH5Reader file_reader(device, channel_name); + ReplayZmqSender sender(ipc_id, source_id); // "<= stop_pulse_id" because we include the stop_pulse_id in the file. for (uint64_t curr_pulse_id=start_pulse_id; @@ -44,8 +43,7 @@ void sf_replay ( start_time = steady_clock::now(); - zmq_send(socket, m_buffer, sizeof(ModuleFrame), ZMQ_SNDMORE); - zmq_send(socket, f_buffer, MODULE_N_BYTES, 0); + sender.send(m_buffer, f_buffer); end_time = steady_clock::now(); uint64_t send_us_duration = @@ -95,31 +93,10 @@ int main (int argc, char *argv[]) { const string ipc_id = string(argv[1]); const string device = string(argv[2]); const string channel_name = string(argv[3]); - const auto source_id = (uint16_t) atoi(argv[4]); + const auto source_id = atoi(argv[4]); const auto start_pulse_id = (uint64_t) atoll(argv[5]); const auto stop_pulse_id = (uint64_t) atoll(argv[6]); - auto ipc_base = REPLAY_STREAM_IPC_URL + ipc_id + "-"; - stringstream ipc_stream; - ipc_stream << ipc_base << (int)source_id; - const auto ipc_address = ipc_stream.str(); - - auto ctx = zmq_ctx_new(); - auto socket = zmq_socket(ctx, ZMQ_PUSH); - - const int sndhwm = REPLAY_SNDHWM; - if (zmq_setsockopt(socket, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)) != 0) - throw runtime_error(strerror (errno)); - - const int linger_ms = -1; - if (zmq_setsockopt(socket, ZMQ_LINGER, &linger_ms, sizeof(linger_ms)) != 0) - throw runtime_error(strerror (errno)); - - if (zmq_bind(socket, ipc_address.c_str()) != 0) - throw runtime_error(strerror (errno)); - - sf_replay(socket, device, channel_name, start_pulse_id, stop_pulse_id); - - zmq_close(socket); - zmq_ctx_destroy(ctx); + sf_replay(device, channel_name, source_id, ipc_id, + start_pulse_id, stop_pulse_id); }