From d38108e5272d28d380f5c594cc01e74ac29e7393 Mon Sep 17 00:00:00 2001 From: Dhanya Thattil Date: Mon, 23 Apr 2018 17:32:40 +0200 Subject: [PATCH] esrf changes: slsReceiver: extend CircularFifo to support blocking/non-blocking transfers: * Add blocking push (for performance) and non-blocking pop (for symmetry), default to blocking operations * Fix memory fault if Fifo allocation fails * Fix fifoFree initialisation to fifoSize elements (was fifoSize - 1) --- slsReceiverSoftware/include/circularFifo.h | 78 ++++++++++++------- slsReceiverSoftware/src/Fifo.cpp | 31 +++----- .../src/UDPStandardImplementation.cpp | 2 +- 3 files changed, 60 insertions(+), 51 deletions(-) diff --git a/slsReceiverSoftware/include/circularFifo.h b/slsReceiverSoftware/include/circularFifo.h index ab00444c9..580e11654 100644 --- a/slsReceiverSoftware/include/circularFifo.h +++ b/slsReceiverSoftware/include/circularFifo.h @@ -28,36 +28,53 @@ public: CircularFifo(unsigned int Size) : tail(0), head(0){ Capacity = Size + 1; array.resize(Capacity); - sem_init(&free_mutex,0,0); + sem_init(&data_mutex,0,0); + sem_init(&free_mutex,0,Size); } virtual ~CircularFifo() { - sem_destroy(&free_mutex); + sem_destroy(&data_mutex); + sem_destroy(&free_mutex); } - bool push(Element*& item_); - bool pop(Element*& item_); + bool push(Element*& item_, bool no_block=false); + bool pop(Element*& item_, bool no_block=false); bool isEmpty() const; bool isFull() const; - int getSemValue(); + int getDataValue() const; + int getFreeValue() const; private: - volatile unsigned int tail; // input index vector array; - volatile unsigned int head; // output index + unsigned int tail; // input index + unsigned int head; // output index unsigned int Capacity; - sem_t free_mutex; +//#ifdef __cplusplus >= 201103L + mutable sem_t data_mutex; + mutable sem_t free_mutex; +//#else +// sem_t free_mutex; + +//#endif unsigned int increment(unsigned int idx_) const; }; template -int CircularFifo::getSemValue() +int CircularFifo::getDataValue() const { - int value; - sem_getvalue(&free_mutex, &value); - return value; + int value; + sem_getvalue(&data_mutex, &value); + return value; +} + +template +int CircularFifo::getFreeValue() const +{ + int value; + sem_getvalue(&free_mutex, &value); + return value; } @@ -66,21 +83,20 @@ int CircularFifo::getSemValue() * will happen, it is up to the caller to handle this case * * \param item_ copy by reference the input item +* \param no_block if true, return immediately if fifo is full * \return whether operation was successful or not */ template -bool CircularFifo::push(Element*& item_) +bool CircularFifo::push(Element*& item_, bool no_block) { - unsigned int nextTail = increment(tail); + // check for fifo full + if (no_block && isFull()) + return false; - if(nextTail != head) - { - array[tail] = item_; - tail = nextTail; - sem_post(&free_mutex); - return true; - } - // queue was full - return false; + sem_wait(&free_mutex); + array[tail] = item_; + tail = increment(tail); + sem_post(&data_mutex); + return true; } /** Consumer only: Removes and returns item from the queue @@ -88,15 +104,19 @@ bool CircularFifo::push(Element*& item_) * It is up to the caller to handle this case * * \param item_ return by reference the wanted item +* \param no_block if true, return immediately if fifo is full * \return whether operation was successful or not */ template -bool CircularFifo::pop(Element*& item_) +bool CircularFifo::pop(Element*& item_, bool no_block) { - sem_wait(&free_mutex); + // check for fifo empty + if (no_block && isEmpty()) + return false; + sem_wait(&data_mutex); item_ = array[head]; head = increment(head); - + sem_post(&free_mutex); return true; } @@ -108,7 +128,7 @@ bool CircularFifo::pop(Element*& item_) template bool CircularFifo::isEmpty() const { - return (head == tail); + return (getDataValue() == 0); } /** Useful for testing and Producer check of status @@ -119,9 +139,7 @@ bool CircularFifo::isEmpty() const template bool CircularFifo::isFull() const { - int tailCheck = increment(tail); - //int tailCheck = (tail+1) % Capacity; - return (tailCheck == head); + return (getFreeValue() == 0); } /** Increment helper function for index of the circular queue diff --git a/slsReceiverSoftware/src/Fifo.cpp b/slsReceiverSoftware/src/Fifo.cpp index 2c071356a..ef0ffe57f 100644 --- a/slsReceiverSoftware/src/Fifo.cpp +++ b/slsReceiverSoftware/src/Fifo.cpp @@ -50,28 +50,24 @@ int Fifo::CreateFifos(uint32_t fifoItemSize) { fifoFree = new CircularFifo(fifoDepth); fifoStream = new CircularFifo(fifoDepth); //allocate memory - memory = (char*) calloc (fifoItemSize * fifoDepth, sizeof(char)); - memset(memory,0,fifoItemSize * fifoDepth* sizeof(char)); + size_t mem_len = fifoItemSize * fifoDepth * sizeof(char); + memory = (char*) malloc (mem_len); if (memory == NULL){ FILE_LOG(logERROR) << "Could not allocate memory for fifos"; - memory = 0; return FAIL; } - FILE_LOG(logDEBUG) << "Memory Allocated " << index << ": " << (fifoItemSize * fifoDepth) << " bytes"; + memset(memory, 0, mem_len); + FILE_LOG(logDEBUG) << "Memory Allocated " << index << ": " << mem_len << " bytes"; { //push free addresses into fifoFree fifo char* buffer = memory; - while (buffer < (memory + fifoItemSize * (fifoDepth-1))) { + for (int i = 0; i < fifoDepth; ++i) { //sprintf(buffer,"memory"); -#ifdef FIFODEBUG - cprintf(MAGENTA,"Fifofree %d: value:%d, pop 0x%p\n", index, fifoFree->getSemValue(), (void*)(buffer)); -#endif FreeAddress(buffer); buffer += fifoItemSize; } } - - FILE_LOG(logDEBUG) << "Fifo Reconstructed Depth " << index << ": " << fifoDepth; + FILE_LOG(logINFO) << "Fifo " << index << " reconstructed Depth (rx_fifodepth): " << fifoFree->getDataValue(); return OK; } @@ -79,7 +75,6 @@ int Fifo::CreateFifos(uint32_t fifoItemSize) { void Fifo::DestroyFifos(){ FILE_LOG(logDEBUG) << __AT__ << " called"; - if(memory) { free(memory); memory = 0; @@ -100,25 +95,22 @@ void Fifo::DestroyFifos(){ void Fifo::FreeAddress(char*& address) { - while(!fifoFree->push(address)); + fifoFree->push(address); } void Fifo::GetNewAddress(char*& address) { - int temp = fifoFree->getSemValue(); + int temp = fifoFree->getDataValue(); if (temp < status_fifoFree) status_fifoFree = temp; fifoFree->pop(address); - /*temp = fifoFree->getSemValue(); - if (temp < status_fifoFree) - status_fifoFree = temp;*/ } void Fifo::PushAddress(char*& address) { - int temp = fifoBound->getSemValue(); + int temp = fifoBound->getDataValue(); if (temp > status_fifoBound) status_fifoBound = temp; while(!fifoBound->push(address)); - /*temp = fifoBound->getSemValue(); + /*temp = fifoBound->getDataValue(); if (temp > status_fifoBound) status_fifoBound = temp;*/ } @@ -128,7 +120,7 @@ void Fifo::PopAddress(char*& address) { } void Fifo::PushAddressToStream(char*& address) { - while(!fifoStream->push(address)); + fifoStream->push(address); } void Fifo::PopAddressToStream(char*& address) { @@ -141,7 +133,6 @@ int Fifo::GetMaxLevelForFifoBound() { return temp; } - int Fifo::GetMinLevelForFifoFree() { int temp = status_fifoFree; status_fifoFree = fifoDepth; diff --git a/slsReceiverSoftware/src/UDPStandardImplementation.cpp b/slsReceiverSoftware/src/UDPStandardImplementation.cpp index c6786f283..54d0ac3cd 100644 --- a/slsReceiverSoftware/src/UDPStandardImplementation.cpp +++ b/slsReceiverSoftware/src/UDPStandardImplementation.cpp @@ -686,7 +686,7 @@ int UDPStandardImplementation::SetupFifoStructure() { } FILE_LOG(logINFO) << "Memory Allocated Per Fifo: " << ( ((generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize)) * fifoDepth) << " bytes" ; - FILE_LOG(logINFO) << " Fifo structure(s) reconstructed: " << numThreads; + FILE_LOG(logINFO) << numThreads << " Fifo structure(s) reconstructed"; return OK; }