diff --git a/jf-live-writer/src/main.cpp b/jf-live-writer/src/main.cpp index f4ffd8d..139a34f 100644 --- a/jf-live-writer/src/main.cpp +++ b/jf-live-writer/src/main.cpp @@ -22,11 +22,44 @@ void read_buffer( const string module_name, const int i_module, const vector& pulse_ids_to_write, - LiveImageAssembler& image_assembler) + LiveImageAssembler& image_assembler, + void* ctx) { BinaryReader reader(detector_folder, module_name); auto frame_buffer = new BufferBinaryFormat(); + void* socket = zmq_socket(ctx, ZMQ_SUB); + if (socket == nullptr) { + throw runtime_error(zmq_strerror(errno)); + } + + int rcvhwm = 100; + if (zmq_setsockopt(socket, ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + int linger = 0; + if (zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + // In milliseconds. + int rcvto = 2000; + if (zmq_setsockopt(socket, ZMQ_RCVTIMEO, &rcvto, sizeof(rcvto)) != 0 ){ + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_connect(socket, "tcp://127.0.0.1:51234") != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + if (zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0) != 0) { + throw runtime_error(zmq_strerror(errno)); + } + + const uint64_t PULSE_ID_DELAY = 100; + + uint64_t live_pulse_id = pulse_ids_to_write.front(); for (uint64_t pulse_id:pulse_ids_to_write) { while(!image_assembler.is_slot_free(pulse_id)) { @@ -35,6 +68,18 @@ void read_buffer( auto start_time = steady_clock::now(); + // Enforce a delay of 1 second for writing. + while (live_pulse_id - pulse_id < PULSE_ID_DELAY) { + if (zmq_recv(socket, &live_pulse_id, + sizeof(live_pulse_id), 0) == -1) { + if (errno == EAGAIN) { + throw runtime_error("Did not receive pulse_id in time."); + } else { + throw runtime_error(zmq_strerror(errno)); + } + } + } + reader.get_frame(pulse_id, frame_buffer); auto end_time = steady_clock::now(); @@ -84,7 +129,6 @@ int main (int argc, char *argv[]) int pulse_id_step = atoi(argv[6]); std::vector pulse_ids_to_write; - uint64_t i_pulse_id = start_pulse_id; for (size_t i=0; i reading_threads(n_modules); for (size_t i_module=0; i_module