Add socket to listen for current pulse_id

The live writer will listen to the stream of pulse_ids coming
from the sf-stream in order to determine the writing delay
it needs to apply. (we will wait 1 second after the data was
written to try to read it back)
This commit is contained in:
2020-07-21 13:22:34 +02:00
parent 9d9549b868
commit 3055c5bc60
+51 -3
View File
@@ -22,11 +22,44 @@ void read_buffer(
const string module_name,
const int i_module,
const vector<uint64_t>& pulse_ids_to_write,
LiveImageAssembler& image_assembler)
LiveImageAssembler& image_assembler,
void* ctx)
{
BinaryReader reader(detector_folder, module_name);
auto frame_buffer = new BufferBinaryFormat();
void* socket = zmq_socket(ctx, ZMQ_SUB);
if (socket == nullptr) {
throw runtime_error(zmq_strerror(errno));
}
int rcvhwm = 100;
if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
int linger = 0;
if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
throw runtime_error(zmq_strerror(errno));
}
// In milliseconds.
int rcvto = 2000;
if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){
throw runtime_error(zmq_strerror(errno));
}
if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) {
throw runtime_error(zmq_strerror(errno));
}
if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) {
throw runtime_error(zmq_strerror(errno));
}
const uint64_t PULSE_ID_DELAY = 100;
uint64_t live_pulse_id = pulse_ids_to_write.front();
for (uint64_t pulse_id:pulse_ids_to_write) {
while(!image_assembler.is_slot_free(pulse_id)) {
@@ -35,6 +68,18 @@ void read_buffer(
auto start_time = steady_clock::now();
// Enforce a delay of 1 second for writing.
while (live_pulse_id - pulse_id < PULSE_ID_DELAY) {
if (zmq_recv(socket, &live_pulse_id,
sizeof(live_pulse_id), 0) == -1) {
if (errno == EAGAIN) {
throw runtime_error("Did not receive pulse_id in time.");
} else {
throw runtime_error(zmq_strerror(errno));
}
}
}
reader.get_frame(pulse_id, frame_buffer);
auto end_time = steady_clock::now();
@@ -84,7 +129,6 @@ int main (int argc, char *argv[])
int pulse_id_step = atoi(argv[6]);
std::vector<uint64_t> pulse_ids_to_write;
uint64_t i_pulse_id = start_pulse_id;
for (size_t i=0; i<n_pulses; i++) {
pulse_ids_to_write.push_back(i_pulse_id);
@@ -93,6 +137,9 @@ int main (int argc, char *argv[])
LiveImageAssembler image_assembler(n_modules);
auto ctx = zmq_ctx_new();
zmq_ctx_set (ctx, ZMQ_IO_THREADS, 1);
std::vector<std::thread> reading_threads(n_modules);
for (size_t i_module=0; i_module<n_modules; i_module++) {
@@ -109,7 +156,8 @@ int main (int argc, char *argv[])
module_name,
i_module,
ref(pulse_ids_to_write),
ref(image_assembler));
ref(image_assembler),
ctx);
}
JFH5LiveWriter writer(output_file, detector_folder, n_modules, n_pulses);