removing sem_wait in acquire (#182)

This commit is contained in:
Dhanya Thattil
2020-09-17 13:55:20 +02:00
committed by GitHub
parent bf69951456
commit b0dd82c667
2 changed files with 16 additions and 68 deletions

View File

@ -435,7 +435,6 @@ void DetectorImpl::readFrameFromReceiver() {
bool gapPixels = multi_shm()->gapPixels; bool gapPixels = multi_shm()->gapPixels;
LOG(logDEBUG) << "Gap pixels: " << gapPixels; LOG(logDEBUG) << "Gap pixels: " << gapPixels;
int nX = 0; int nX = 0;
int nY = 0; int nY = 0;
int nDetPixelsX = 0; int nDetPixelsX = 0;
@ -464,7 +463,6 @@ void DetectorImpl::readFrameFromReceiver() {
runningList[i] = false; runningList[i] = false;
} }
} }
int numConnected = numRunning;
bool data = false; bool data = false;
bool completeImage = false; bool completeImage = false;
char *image = nullptr; char *image = nullptr;
@ -482,14 +480,7 @@ void DetectorImpl::readFrameFromReceiver() {
uint32_t currentSubFrameIndex = -1, coordX = -1, coordY = -1, uint32_t currentSubFrameIndex = -1, coordX = -1, coordY = -1,
flippedDataX = -1; flippedDataX = -1;
// wait for real time acquisition to start while (numRunning != 0) {
bool running = true;
sem_wait(&sem_newRTAcquisition);
if (getJoinThreadFlag()) {
running = false;
}
while (running) {
// reset data // reset data
data = false; data = false;
if (multiframe != nullptr) { if (multiframe != nullptr) {
@ -658,26 +649,6 @@ void DetectorImpl::readFrameFromReceiver() {
pCallbackArg); pCallbackArg);
delete thisData; delete thisData;
} }
// all done
if (numRunning == 0) {
// let main thread know that all dummy packets have been received
//(also from external process),
// main thread can now proceed to measurement finished call back
sem_post(&sem_endRTAcquisition);
// wait for next scan/measurement, else join thread
sem_wait(&sem_newRTAcquisition);
// done with complete acquisition
if (getJoinThreadFlag()) {
running = false;
} else {
// starting a new scan/measurement (got dummy data)
for (size_t i = 0; i < zmqSocket.size(); ++i) {
runningList[i] = connectList[i];
}
numRunning = numConnected;
}
}
} }
// Disconnect resources // Disconnect resources
@ -1030,17 +1001,11 @@ int DetectorImpl::acquire() {
struct timespec begin, end; struct timespec begin, end;
clock_gettime(CLOCK_REALTIME, &begin); clock_gettime(CLOCK_REALTIME, &begin);
// in the real time acquisition loop, processing thread will wait for a
// post each time
sem_init(&sem_newRTAcquisition, 1, 0);
// in the real time acquistion loop, main thread will wait for
// processing thread to be done each time (which in turn waits for
// receiver/ext process)
sem_init(&sem_endRTAcquisition, 1, 0);
bool receiver = Parallel(&Module::getUseReceiverFlag, {}).squash(false); bool receiver = Parallel(&Module::getUseReceiverFlag, {}).squash(false);
setJoinThreadFlag(false); if (dataReady == nullptr) {
setJoinThreadFlag(false);
}
// verify receiver is idle // verify receiver is idle
if (receiver) { if (receiver) {
@ -1050,13 +1015,11 @@ int DetectorImpl::acquire() {
} }
} }
startProcessingThread(); startProcessingThread(receiver);
// start receiver // start receiver
if (receiver) { if (receiver) {
Parallel(&Module::startReceiver, {}); Parallel(&Module::startReceiver, {});
// let processing thread listen to these packets
sem_post(&sem_newRTAcquisition);
} }
// start and read all // start and read all
@ -1071,18 +1034,13 @@ int DetectorImpl::acquire() {
// stop receiver // stop receiver
if (receiver) { if (receiver) {
Parallel(&Module::stopReceiver, {}); Parallel(&Module::stopReceiver, {});
if (dataReady != nullptr) {
sem_wait(&sem_endRTAcquisition); // waits for receiver's
}
// external process to be
// done sending data to gui
Parallel(&Module::incrementFileIndex, {}); Parallel(&Module::incrementFileIndex, {});
} }
// waiting for the data processing thread to finish! // let the progress thread (no callback) know acquisition is done
setJoinThreadFlag(true); if (dataReady == nullptr) {
sem_post(&sem_newRTAcquisition); setJoinThreadFlag(true);
}
dataProcessingThread.join(); dataProcessingThread.join();
if (acquisition_finished != nullptr) { if (acquisition_finished != nullptr) {
@ -1092,9 +1050,6 @@ int DetectorImpl::acquire() {
acquisition_finished(progress, status, acqFinished_p); acquisition_finished(progress, status, acqFinished_p);
} }
sem_destroy(&sem_newRTAcquisition);
sem_destroy(&sem_endRTAcquisition);
clock_gettime(CLOCK_REALTIME, &end); clock_gettime(CLOCK_REALTIME, &end);
LOG(logDEBUG1) << "Elapsed time for acquisition:" LOG(logDEBUG1) << "Elapsed time for acquisition:"
<< ((end.tv_sec - begin.tv_sec) + << ((end.tv_sec - begin.tv_sec) +
@ -1115,12 +1070,13 @@ void DetectorImpl::printProgress(double progress) {
std::cout << '\r' << std::flush; std::cout << '\r' << std::flush;
} }
void DetectorImpl::startProcessingThread() { void DetectorImpl::startProcessingThread(bool receiver) {
dataProcessingThread = std::thread(&DetectorImpl::processData, this); dataProcessingThread =
std::thread(&DetectorImpl::processData, this, receiver);
} }
void DetectorImpl::processData() { void DetectorImpl::processData(bool receiver) {
if (Parallel(&Module::getUseReceiverFlag, {}).squash(false)) { if (receiver) {
if (dataReady != nullptr) { if (dataReady != nullptr) {
readFrameFromReceiver(); readFrameFromReceiver();
} }

View File

@ -276,7 +276,7 @@ class DetectorImpl : public virtual slsDetectorDefs {
* Combines data from all readouts and gives it to the gui * Combines data from all readouts and gives it to the gui
* or just gives progress of acquisition by polling receivers * or just gives progress of acquisition by polling receivers
*/ */
void processData(); void processData(bool receiver);
/** /**
* Convert raw file * Convert raw file
@ -352,7 +352,7 @@ class DetectorImpl : public virtual slsDetectorDefs {
void printProgress(double progress); void printProgress(double progress);
void startProcessingThread(); void startProcessingThread(bool receiver);
/** /**
* Check if processing thread is ready to join main thread * Check if processing thread is ready to join main thread
@ -387,14 +387,6 @@ class DetectorImpl : public virtual slsDetectorDefs {
/** ZMQ Socket - Receiver to Client */ /** ZMQ Socket - Receiver to Client */
std::vector<std::unique_ptr<ZmqSocket>> zmqSocket; std::vector<std::unique_ptr<ZmqSocket>> zmqSocket;
/** semaphore to let postprocessing thread continue for next
* scan/measurement */
sem_t sem_newRTAcquisition;
/** semaphore to let main thread know it got all the dummy packets (also
* from ext. process) */
sem_t sem_endRTAcquisition;
/** mutex to synchronize main and data processing threads */ /** mutex to synchronize main and data processing threads */
mutable std::mutex mp; mutable std::mutex mp;