esrf changes: slsReceiver: extend CircularFifo to support blocking/non-blocking transfers:

* Add blocking push (for performance) and non-blocking pop (for symmetry), default to blocking operations
* Fix memory fault if Fifo allocation fails
* Fix fifoFree initialisation to fifoSize elements (was fifoSize - 1)
This commit is contained in:
maliakal_d 2018-04-23 17:32:40 +02:00
parent 8f9c657fa0
commit d38108e527
3 changed files with 60 additions and 51 deletions

View File

@ -28,36 +28,53 @@ public:
CircularFifo(unsigned int Size) : tail(0), head(0){ CircularFifo(unsigned int Size) : tail(0), head(0){
Capacity = Size + 1; Capacity = Size + 1;
array.resize(Capacity); array.resize(Capacity);
sem_init(&free_mutex,0,0); sem_init(&data_mutex,0,0);
sem_init(&free_mutex,0,Size);
} }
virtual ~CircularFifo() { virtual ~CircularFifo() {
sem_destroy(&free_mutex); sem_destroy(&data_mutex);
sem_destroy(&free_mutex);
} }
bool push(Element*& item_); bool push(Element*& item_, bool no_block=false);
bool pop(Element*& item_); bool pop(Element*& item_, bool no_block=false);
bool isEmpty() const; bool isEmpty() const;
bool isFull() const; bool isFull() const;
int getSemValue(); int getDataValue() const;
int getFreeValue() const;
private: private:
volatile unsigned int tail; // input index
vector <Element*> array; vector <Element*> array;
volatile unsigned int head; // output index unsigned int tail; // input index
unsigned int head; // output index
unsigned int Capacity; unsigned int Capacity;
sem_t free_mutex; //#ifdef __cplusplus >= 201103L
mutable sem_t data_mutex;
mutable sem_t free_mutex;
//#else
// sem_t free_mutex;
//#endif
unsigned int increment(unsigned int idx_) const; unsigned int increment(unsigned int idx_) const;
}; };
template<typename Element> template<typename Element>
int CircularFifo<Element>::getSemValue() int CircularFifo<Element>::getDataValue() const
{ {
int value; int value;
sem_getvalue(&free_mutex, &value); sem_getvalue(&data_mutex, &value);
return value; return value;
}
template<typename Element>
int CircularFifo<Element>::getFreeValue() const
{
int value;
sem_getvalue(&free_mutex, &value);
return value;
} }
@ -66,21 +83,20 @@ int CircularFifo<Element>::getSemValue()
* will happen, it is up to the caller to handle this case * will happen, it is up to the caller to handle this case
* *
* \param item_ copy by reference the input item * \param item_ copy by reference the input item
* \param no_block if true, return immediately if fifo is full
* \return whether operation was successful or not */ * \return whether operation was successful or not */
template<typename Element> template<typename Element>
bool CircularFifo<Element>::push(Element*& item_) bool CircularFifo<Element>::push(Element*& item_, bool no_block)
{ {
unsigned int nextTail = increment(tail); // check for fifo full
if (no_block && isFull())
return false;
if(nextTail != head) sem_wait(&free_mutex);
{ array[tail] = item_;
array[tail] = item_; tail = increment(tail);
tail = nextTail; sem_post(&data_mutex);
sem_post(&free_mutex); return true;
return true;
}
// queue was full
return false;
} }
/** Consumer only: Removes and returns item from the queue /** Consumer only: Removes and returns item from the queue
@ -88,15 +104,19 @@ bool CircularFifo<Element>::push(Element*& item_)
* It is up to the caller to handle this case * It is up to the caller to handle this case
* *
* \param item_ return by reference the wanted item * \param item_ return by reference the wanted item
* \param no_block if true, return immediately if fifo is full
* \return whether operation was successful or not */ * \return whether operation was successful or not */
template<typename Element> template<typename Element>
bool CircularFifo<Element>::pop(Element*& item_) bool CircularFifo<Element>::pop(Element*& item_, bool no_block)
{ {
sem_wait(&free_mutex); // check for fifo empty
if (no_block && isEmpty())
return false;
sem_wait(&data_mutex);
item_ = array[head]; item_ = array[head];
head = increment(head); head = increment(head);
sem_post(&free_mutex);
return true; return true;
} }
@ -108,7 +128,7 @@ bool CircularFifo<Element>::pop(Element*& item_)
template<typename Element> template<typename Element>
bool CircularFifo<Element>::isEmpty() const bool CircularFifo<Element>::isEmpty() const
{ {
return (head == tail); return (getDataValue() == 0);
} }
/** Useful for testing and Producer check of status /** Useful for testing and Producer check of status
@ -119,9 +139,7 @@ bool CircularFifo<Element>::isEmpty() const
template<typename Element> template<typename Element>
bool CircularFifo<Element>::isFull() const bool CircularFifo<Element>::isFull() const
{ {
int tailCheck = increment(tail); return (getFreeValue() == 0);
//int tailCheck = (tail+1) % Capacity;
return (tailCheck == head);
} }
/** Increment helper function for index of the circular queue /** Increment helper function for index of the circular queue

View File

@ -50,28 +50,24 @@ int Fifo::CreateFifos(uint32_t fifoItemSize) {
fifoFree = new CircularFifo<char>(fifoDepth); fifoFree = new CircularFifo<char>(fifoDepth);
fifoStream = new CircularFifo<char>(fifoDepth); fifoStream = new CircularFifo<char>(fifoDepth);
//allocate memory //allocate memory
memory = (char*) calloc (fifoItemSize * fifoDepth, sizeof(char)); size_t mem_len = fifoItemSize * fifoDepth * sizeof(char);
memset(memory,0,fifoItemSize * fifoDepth* sizeof(char)); memory = (char*) malloc (mem_len);
if (memory == NULL){ if (memory == NULL){
FILE_LOG(logERROR) << "Could not allocate memory for fifos"; FILE_LOG(logERROR) << "Could not allocate memory for fifos";
memory = 0;
return FAIL; return FAIL;
} }
FILE_LOG(logDEBUG) << "Memory Allocated " << index << ": " << (fifoItemSize * fifoDepth) << " bytes"; memset(memory, 0, mem_len);
FILE_LOG(logDEBUG) << "Memory Allocated " << index << ": " << mem_len << " bytes";
{ //push free addresses into fifoFree fifo { //push free addresses into fifoFree fifo
char* buffer = memory; char* buffer = memory;
while (buffer < (memory + fifoItemSize * (fifoDepth-1))) { for (int i = 0; i < fifoDepth; ++i) {
//sprintf(buffer,"memory"); //sprintf(buffer,"memory");
#ifdef FIFODEBUG
cprintf(MAGENTA,"Fifofree %d: value:%d, pop 0x%p\n", index, fifoFree->getSemValue(), (void*)(buffer));
#endif
FreeAddress(buffer); FreeAddress(buffer);
buffer += fifoItemSize; buffer += fifoItemSize;
} }
} }
FILE_LOG(logINFO) << "Fifo " << index << " reconstructed Depth (rx_fifodepth): " << fifoFree->getDataValue();
FILE_LOG(logDEBUG) << "Fifo Reconstructed Depth " << index << ": " << fifoDepth;
return OK; return OK;
} }
@ -79,7 +75,6 @@ int Fifo::CreateFifos(uint32_t fifoItemSize) {
void Fifo::DestroyFifos(){ void Fifo::DestroyFifos(){
FILE_LOG(logDEBUG) << __AT__ << " called"; FILE_LOG(logDEBUG) << __AT__ << " called";
if(memory) { if(memory) {
free(memory); free(memory);
memory = 0; memory = 0;
@ -100,25 +95,22 @@ void Fifo::DestroyFifos(){
void Fifo::FreeAddress(char*& address) { void Fifo::FreeAddress(char*& address) {
while(!fifoFree->push(address)); fifoFree->push(address);
} }
void Fifo::GetNewAddress(char*& address) { void Fifo::GetNewAddress(char*& address) {
int temp = fifoFree->getSemValue(); int temp = fifoFree->getDataValue();
if (temp < status_fifoFree) if (temp < status_fifoFree)
status_fifoFree = temp; status_fifoFree = temp;
fifoFree->pop(address); fifoFree->pop(address);
/*temp = fifoFree->getSemValue();
if (temp < status_fifoFree)
status_fifoFree = temp;*/
} }
void Fifo::PushAddress(char*& address) { void Fifo::PushAddress(char*& address) {
int temp = fifoBound->getSemValue(); int temp = fifoBound->getDataValue();
if (temp > status_fifoBound) if (temp > status_fifoBound)
status_fifoBound = temp; status_fifoBound = temp;
while(!fifoBound->push(address)); while(!fifoBound->push(address));
/*temp = fifoBound->getSemValue(); /*temp = fifoBound->getDataValue();
if (temp > status_fifoBound) if (temp > status_fifoBound)
status_fifoBound = temp;*/ status_fifoBound = temp;*/
} }
@ -128,7 +120,7 @@ void Fifo::PopAddress(char*& address) {
} }
void Fifo::PushAddressToStream(char*& address) { void Fifo::PushAddressToStream(char*& address) {
while(!fifoStream->push(address)); fifoStream->push(address);
} }
void Fifo::PopAddressToStream(char*& address) { void Fifo::PopAddressToStream(char*& address) {
@ -141,7 +133,6 @@ int Fifo::GetMaxLevelForFifoBound() {
return temp; return temp;
} }
int Fifo::GetMinLevelForFifoFree() { int Fifo::GetMinLevelForFifoFree() {
int temp = status_fifoFree; int temp = status_fifoFree;
status_fifoFree = fifoDepth; status_fifoFree = fifoDepth;

View File

@ -686,7 +686,7 @@ int UDPStandardImplementation::SetupFifoStructure() {
} }
FILE_LOG(logINFO) << "Memory Allocated Per Fifo: " << ( ((generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize)) * fifoDepth) << " bytes" ; FILE_LOG(logINFO) << "Memory Allocated Per Fifo: " << ( ((generalData->imageSize) * numberofJobs + (generalData->fifoBufferHeaderSize)) * fifoDepth) << " bytes" ;
FILE_LOG(logINFO) << " Fifo structure(s) reconstructed: " << numThreads; FILE_LOG(logINFO) << numThreads << " Fifo structure(s) reconstructed";
return OK; return OK;
} }