diff --git a/sf-buffer/src/sf_replay.cpp b/sf-buffer/src/sf_replay.cpp index e64669c..66d9d8b 100644 --- a/sf-buffer/src/sf_replay.cpp +++ b/sf-buffer/src/sf_replay.cpp @@ -47,11 +47,11 @@ int main (int argc, char *argv[]) { auto ctx = zmq_ctx_new(); auto socket = zmq_socket(ctx, ZMQ_PUSH); - auto meta_socket = zmq_socket(ctx, ZMQ_PULL); -// -// if (zmq_setsockopt(meta_socket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) { -// throw runtime_error(strerror (errno)); -// } + + + if (zmq_setsockopt(meta_socket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) { + throw runtime_error(strerror (errno)); + } int status = 0; @@ -70,16 +70,22 @@ int main (int argc, char *argv[]) { if (zmq_connect(socket, "ipc://writer") != 0) { throw runtime_error(strerror (errno)); } - //TODO: Use ipc? - if (zmq_connect(socket, "ipc://writer_meta") != 0) { + + + auto meta_socket = zmq_socket(ctx, ZMQ_SUB); + if (zmq_connect(socket, "ipc://metadata") != 0) { throw runtime_error(strerror (errno)); } - - cout << "receiving " << endl; - uint64_t response; - zmq_recv(meta_socket, &response, sizeof(response), 0); - cout << "Done!! " << response << endl; + if (zmq_setsockopt(meta_socket, ZMQ_SUBSCRIBE, "", 0) != 0) { + throw runtime_error(strerror (errno)); + } + while (true) { + cout << "receiving " << endl; + uint64_t response; + zmq_recv(meta_socket, &response, sizeof(response), 0); + cout << "Done!! " << response << endl; + } for (const auto& suffix:path_suffixes) { metadata_buffer->start_pulse_id = suffix.start_pulse_id; diff --git a/sf-writer/sf_h5_writer.cpp b/sf-writer/sf_h5_writer.cpp index da26db8..d820926 100644 --- a/sf-writer/sf_h5_writer.cpp +++ b/sf-writer/sf_h5_writer.cpp @@ -36,7 +36,7 @@ int main (int argc, char *argv[]) zmq_ctx_set (ctx, ZMQ_IO_THREADS, 16); auto socket = zmq_socket(ctx, ZMQ_PULL); - auto meta_socket = zmq_socket(ctx, ZMQ_PUSH); + int rcvhwm = 1000; if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { @@ -52,15 +52,25 @@ int main (int argc, char *argv[]) throw runtime_error(strerror (errno)); } - if (zmq_bind(meta_socket, "ipc://writer_meta") != 0) { + + + auto meta_socket = zmq_socket(ctx, ZMQ_PUB); + if (zmq_bind(meta_socket, "ipc://metadata") != 0) { throw runtime_error(strerror (errno)); } - this_thread::sleep_for(chrono::milliseconds(5000)); + while(true) { + string test = "test"; + auto c_test = test.c_str(); + zmq_send(meta_socket, c_test, strlen(c_test), 0); + cout << "sent test" << endl; + this_thread::sleep_for(chrono::milliseconds(100)); + } + + uint64_t test = 12; zmq_send(meta_socket, &test, sizeof(test),0); - cout << "sent pub" << endl; auto metadata_buffer = make_unique();