From 4d63e65b9d48a3cb000264544cc97d7211510fb2 Mon Sep 17 00:00:00 2001 From: Dirk Zimoch Date: Mon, 20 Jan 2025 09:30:27 +0100 Subject: [PATCH] fdManager changed to use poll() The implementation using select() limits file desciptors to FD_SETSIZE, typically 1024 on Linux. This number is too low for some applications, for example for the CA gateway. Therefore, Linux builds use poll() instead. --- modules/libcom/src/fdmgr/fdManager.cpp | 122 +++++++++++++++++++++---- modules/libcom/src/fdmgr/fdManager.h | 26 +++++- 2 files changed, 128 insertions(+), 20 deletions(-) diff --git a/modules/libcom/src/fdmgr/fdManager.cpp b/modules/libcom/src/fdmgr/fdManager.cpp index 8a9ed788f..94c5f7b14 100644 --- a/modules/libcom/src/fdmgr/fdManager.cpp +++ b/modules/libcom/src/fdmgr/fdManager.cpp @@ -31,27 +31,41 @@ using std :: max; fdManager fileDescriptorManager; -const unsigned mSecPerSec = 1000u; -const unsigned uSecPerSec = 1000u * mSecPerSec; +static const unsigned mSecPerSec = 1000u; +static const unsigned uSecPerSec = 1000u * mSecPerSec; + +#ifdef FDMGR_USE_POLL +static const int PollEvents[] = { // must match fdRegType + POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR, + POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR, + POLLPRI}; +#endif // // fdManager::fdManager() // -// hopefully its a reasonable guess that select() and epicsThreadSleep() +// hopefully its a reasonable guess that poll()/select() and epicsThreadSleep() // will have the same sleep quantum // -LIBCOM_API fdManager::fdManager () : +LIBCOM_API fdManager::fdManager () : sleepQuantum ( epicsThreadSleepQuantum () ), - fdSetsPtr ( new fd_set [fdrNEnums] ), - pTimerQueue ( 0 ), maxFD ( 0 ), processInProg ( false ), - pCBReg ( 0 ) + pTimerQueue ( 0 ), processInProg ( false ), +#ifdef FDMGR_USE_POLL + nfds ( 0 ), pollfdsCap ( 0 ), pollfds ( 0 ), +#endif +#ifdef FDMGR_USE_SELECT + fdSetsPtr ( new fd_set [fdrNEnums] ), maxFD ( 0 ), +#endif + pCBReg ( 0 ) { int status = osiSockAttach (); assert (status); +#ifdef FDMGR_USE_SELECT for ( size_t i = 0u; i < fdrNEnums; i++ ) { FD_ZERO ( &fdSetsPtr[i] ); } +#endif } // @@ -70,7 +84,12 @@ LIBCOM_API fdManager::~fdManager() pReg->destroy(); } delete this->pTimerQueue; +#ifdef FDMGR_USE_POLL + delete [] this->pollfds; +#endif +#ifdef FDMGR_USE_SELECT delete [] this->fdSetsPtr; +#endif osiSockRelease(); } @@ -91,7 +110,7 @@ LIBCOM_API void fdManager::process (double delay) // // One shot at expired timers prior to going into - // select. This allows zero delay timers to arm + // poll/select. This allows zero delay timers to arm // fd writes. We will never process the timer queue // more than once here so that fd activity get serviced // in a reasonable length of time. @@ -102,15 +121,30 @@ LIBCOM_API void fdManager::process (double delay) minDelay = delay; } - bool ioPending = false; + int ioPending = 0; tsDLIter < fdReg > iter = this->regList.firstIter (); while ( iter.valid () ) { +#ifdef FDMGR_USE_POLL + pollfds[ioPending].fd = iter->getFD(); + pollfds[ioPending].events = PollEvents[iter->getType()]; +#endif +#ifdef FDMGR_USE_SELECT FD_SET(iter->getFD(), &this->fdSetsPtr[iter->getType()]); - ioPending = true; +#endif + ++ioPending; ++iter; } if ( ioPending ) { +#ifdef FDMGR_USE_POLL + if (minDelay * mSecPerSec > INT_MAX) + minDelay = INT_MAX / mSecPerSec; + + int status = poll(pollfds, ioPending, static_cast(minDelay * mSecPerSec)); + int i = 0; +#endif + +#ifdef FDMGR_USE_SELECT struct timeval tv; tv.tv_sec = static_cast ( minDelay ); tv.tv_usec = static_cast ( (minDelay-tv.tv_sec) * uSecPerSec ); @@ -119,6 +153,7 @@ LIBCOM_API void fdManager::process (double delay) fd_set * pWriteSet = & this->fdSetsPtr[fdrWrite]; fd_set * pExceptSet = & this->fdSetsPtr[fdrException]; int status = select (this->maxFD, pReadSet, pWriteSet, pExceptSet, &tv); +#endif this->pTimerQueue->process(epicsTime::getCurrent()); @@ -131,8 +166,31 @@ LIBCOM_API void fdManager::process (double delay) while ( iter.valid () && status > 0 ) { tsDLIter < fdReg > tmp = iter; tmp++; + +#ifdef FDMGR_USE_POLL + // In a single threaded application, nothing should have + // changed the order of regList and pollfds by now. + // But just in case... + int isave = i; + while (pollfds[i].fd != iter->getFD() || pollfds[i].events != PollEvents[iter->getType()]) + { + i++; // skip pollfd of removed items + if (i >= ioPending) { // skip unknown (inserted?) items + iter = tmp; + tmp++; + if (!iter.valid()) break; + i = isave; + } + } + if (i >= ioPending) break; // any unhandled item stays in regList for next time + + if (pollfds[i++].revents & PollEvents[iter->getType()]) { +#endif + +#ifdef FDMGR_USE_SELECT if (FD_ISSET(iter->getFD(), &this->fdSetsPtr[iter->getType()])) { FD_CLR(iter->getFD(), &this->fdSetsPtr[iter->getType()]); +#endif this->regList.remove(*iter); this->activeList.add(*iter); iter->state = fdReg::active; @@ -177,22 +235,30 @@ LIBCOM_API void fdManager::process (double delay) else if ( status < 0 ) { int errnoCpy = SOCKERRNO; +#ifdef FDMGR_USE_SELECT // don't depend on flags being properly set if // an error is returned from select for ( size_t i = 0u; i < fdrNEnums; i++ ) { FD_ZERO ( &fdSetsPtr[i] ); } +#endif // - // print a message if its an unexpected error + // print a message if it's an unexpected error // if ( errnoCpy != SOCK_EINTR ) { char sockErrBuf[64]; epicsSocketConvertErrnoToString ( sockErrBuf, sizeof ( sockErrBuf ) ); - fprintf ( stderr, - "fdManager: select failed because \"%s\"\n", - sockErrBuf ); + errlogPrintf("fdManager: " +#ifdef FDMGR_USE_POLL + "poll()" +#endif +#ifdef FDMGR_USE_SELECT + "select()" +#endif + " failed because \"%s\"\n", + sockErrBuf); } } } @@ -256,8 +322,10 @@ void fdRegId::show ( unsigned level ) const // void fdManager::installReg (fdReg ®) { +#ifdef FDMGR_USE_SELECT this->maxFD = max ( this->maxFD, reg.getFD()+1 ); - // Most applications will find that its important to push here to +#endif + // Most applications will find that it's important to push here to // the front of the list so that transient writes get executed // first allowing incoming read protocol to find that outgoing // buffer space is newly available. @@ -268,6 +336,17 @@ void fdManager::installReg (fdReg ®) if ( status != 0 ) { throwWithLocation ( fdInterestSubscriptionAlreadyExits () ); } +#ifdef FDMGR_USE_POLL + // keep enough event slots for all fds to become available at once + if (++nfds > pollfdsCap) { + if (pollfdsCap == 0) + pollfdsCap = 16; + else + pollfdsCap *= 2; + delete[] pollfds; + pollfds = new struct pollfd[pollfdsCap]; + } +#endif } // @@ -279,8 +358,7 @@ void fdManager::removeReg (fdReg ®In) pItemFound = this->fdTbl.remove (regIn); if (pItemFound!=®In) { - fprintf(stderr, - "fdManager::removeReg() bad fd registration object\n"); + errlogPrintf("fdManager::removeReg() bad fd registration object\n"); return; } @@ -309,7 +387,13 @@ void fdManager::removeReg (fdReg ®In) } regIn.state = fdReg::limbo; +#ifdef FDMGR_USE_SELECT FD_CLR(regIn.getFD(), &this->fdSetsPtr[regIn.getType()]); +#endif + +#ifdef FDMGR_USE_POLL + nfds--; +#endif } // @@ -346,11 +430,13 @@ fdReg::fdReg (const SOCKET fdIn, const fdRegType typIn, fdRegId (fdIn,typIn), state (limbo), onceOnly (onceOnlyIn), manager (managerIn) { +#ifdef FDMGR_USE_SELECT if (!FD_IN_FDSET(fdIn)) { - fprintf (stderr, "%s: fd > FD_SETSIZE ignored\n", + errlogPrintf("%s: fd > FD_SETSIZE ignored\n", __FILE__); return; } +#endif this->manager.installReg (*this); } diff --git a/modules/libcom/src/fdmgr/fdManager.h b/modules/libcom/src/fdmgr/fdManager.h index 3112b26f4..22fce9762 100644 --- a/modules/libcom/src/fdmgr/fdManager.h +++ b/modules/libcom/src/fdmgr/fdManager.h @@ -19,6 +19,18 @@ #ifndef fdManagerH_included #define fdManagerH_included +#if !defined(FDMGR_USE_POLL) && !defined(FDMGR_USE_SELECT) +#if defined(__linux__) +#define FDMGR_USE_POLL +#else +#define FDMGR_USE_SELECT +#endif +#endif + +#ifdef FDMGR_USE_POLL +#include +#endif + #include "libComAPI.h" // reset share lib defines #include "tsDLList.h" #include "resourceLib.h" @@ -91,10 +103,20 @@ private: tsDLList < fdReg > activeList; resTable < fdReg, fdRegId > fdTbl; const double sleepQuantum; - fd_set * fdSetsPtr; epicsTimerQueuePassive * pTimerQueue; - SOCKET maxFD; bool processInProg; + +#ifdef FDMGR_USE_POLL + int nfds; + int pollfdsCap; + struct pollfd *pollfds; +#endif + +#ifdef FDMGR_USE_SELECT + fd_set * fdSetsPtr; + SOCKET maxFD; +#endif + // // Set to fdreg when in call back // and nill otherwise