fdManager changed to use poll()

The implementation using select() limits file desciptors to FD_SETSIZE,
typically 1024 on Linux. This number is too low for some applications,
for example for the CA gateway.
Therefore, Linux builds use poll() instead.
This commit is contained in:
2025-01-20 09:30:27 +01:00
parent 8f77e941c7
commit 57c0295024
2 changed files with 128 additions and 20 deletions

View File

@ -31,27 +31,41 @@ using std :: max;
fdManager fileDescriptorManager;
const unsigned mSecPerSec = 1000u;
const unsigned uSecPerSec = 1000u * mSecPerSec;
static const unsigned mSecPerSec = 1000u;
static const unsigned uSecPerSec = 1000u * mSecPerSec;
#ifdef FDMGR_USE_POLL
static const int PollEvents[] = { // must match fdRegType
POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR,
POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR,
POLLPRI};
#endif
//
// fdManager::fdManager()
//
// hopefully its a reasonable guess that select() and epicsThreadSleep()
// hopefully its a reasonable guess that poll()/select() and epicsThreadSleep()
// will have the same sleep quantum
//
LIBCOM_API fdManager::fdManager () :
LIBCOM_API fdManager::fdManager () :
sleepQuantum ( epicsThreadSleepQuantum () ),
fdSetsPtr ( new fd_set [fdrNEnums] ),
pTimerQueue ( 0 ), maxFD ( 0 ), processInProg ( false ),
pCBReg ( 0 )
pTimerQueue ( 0 ), processInProg ( false ),
#ifdef FDMGR_USE_POLL
nfds ( 0 ), pollfdsCap ( 0 ), pollfds ( 0 ),
#endif
#ifdef FDMGR_USE_SELECT
fdSetsPtr ( new fd_set [fdrNEnums] ), maxFD ( 0 ),
#endif
pCBReg ( 0 )
{
int status = osiSockAttach ();
assert (status);
#ifdef FDMGR_USE_SELECT
for ( size_t i = 0u; i < fdrNEnums; i++ ) {
FD_ZERO ( &fdSetsPtr[i] );
}
#endif
}
//
@ -70,7 +84,12 @@ LIBCOM_API fdManager::~fdManager()
pReg->destroy();
}
delete this->pTimerQueue;
#ifdef FDMGR_USE_POLL
delete [] this->pollfds;
#endif
#ifdef FDMGR_USE_SELECT
delete [] this->fdSetsPtr;
#endif
osiSockRelease();
}
@ -91,7 +110,7 @@ LIBCOM_API void fdManager::process (double delay)
//
// One shot at expired timers prior to going into
// select. This allows zero delay timers to arm
// poll/select. This allows zero delay timers to arm
// fd writes. We will never process the timer queue
// more than once here so that fd activity get serviced
// in a reasonable length of time.
@ -102,15 +121,30 @@ LIBCOM_API void fdManager::process (double delay)
minDelay = delay;
}
bool ioPending = false;
int ioPending = 0;
tsDLIter < fdReg > iter = this->regList.firstIter ();
while ( iter.valid () ) {
#ifdef FDMGR_USE_POLL
pollfds[ioPending].fd = iter->getFD();
pollfds[ioPending].events = PollEvents[iter->getType()];
#endif
#ifdef FDMGR_USE_SELECT
FD_SET(iter->getFD(), &this->fdSetsPtr[iter->getType()]);
ioPending = true;
#endif
++ioPending;
++iter;
}
if ( ioPending ) {
#ifdef FDMGR_USE_POLL
if (minDelay * mSecPerSec > INT_MAX)
minDelay = INT_MAX / mSecPerSec;
int status = poll(pollfds, ioPending, static_cast<int>(minDelay * mSecPerSec));
int i = 0;
#endif
#ifdef FDMGR_USE_SELECT
struct timeval tv;
tv.tv_sec = static_cast<time_t> ( minDelay );
tv.tv_usec = static_cast<long> ( (minDelay-tv.tv_sec) * uSecPerSec );
@ -119,6 +153,7 @@ LIBCOM_API void fdManager::process (double delay)
fd_set * pWriteSet = & this->fdSetsPtr[fdrWrite];
fd_set * pExceptSet = & this->fdSetsPtr[fdrException];
int status = select (this->maxFD, pReadSet, pWriteSet, pExceptSet, &tv);
#endif
this->pTimerQueue->process(epicsTime::getCurrent());
@ -131,8 +166,31 @@ LIBCOM_API void fdManager::process (double delay)
while ( iter.valid () && status > 0 ) {
tsDLIter < fdReg > tmp = iter;
tmp++;
#ifdef FDMGR_USE_POLL
// In a single threaded application, nothing should have
// changed the order of regList and pollfds by now.
// But just in case...
int isave = i;
while (pollfds[i].fd != iter->getFD() || pollfds[i].events != PollEvents[iter->getType()])
{
i++; // skip pollfd of removed items
if (i >= ioPending) { // skip unknown (inserted?) items
iter = tmp;
tmp++;
if (!iter.valid()) break;
i = isave;
}
}
if (i >= ioPending) break; // any unhandled item stays in regList for next time
if (pollfds[i++].revents & PollEvents[iter->getType()]) {
#endif
#ifdef FDMGR_USE_SELECT
if (FD_ISSET(iter->getFD(), &this->fdSetsPtr[iter->getType()])) {
FD_CLR(iter->getFD(), &this->fdSetsPtr[iter->getType()]);
#endif
this->regList.remove(*iter);
this->activeList.add(*iter);
iter->state = fdReg::active;
@ -177,22 +235,30 @@ LIBCOM_API void fdManager::process (double delay)
else if ( status < 0 ) {
int errnoCpy = SOCKERRNO;
#ifdef FDMGR_USE_SELECT
// don't depend on flags being properly set if
// an error is returned from select
for ( size_t i = 0u; i < fdrNEnums; i++ ) {
FD_ZERO ( &fdSetsPtr[i] );
}
#endif
//
// print a message if its an unexpected error
// print a message if it's an unexpected error
//
if ( errnoCpy != SOCK_EINTR ) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString (
sockErrBuf, sizeof ( sockErrBuf ) );
fprintf ( stderr,
"fdManager: select failed because \"%s\"\n",
sockErrBuf );
errlogPrintf("fdManager: "
#ifdef FDMGR_USE_POLL
"poll()"
#endif
#ifdef FDMGR_USE_SELECT
"select()"
#endif
" failed because \"%s\"\n",
sockErrBuf);
}
}
}
@ -256,8 +322,10 @@ void fdRegId::show ( unsigned level ) const
//
void fdManager::installReg (fdReg &reg)
{
#ifdef FDMGR_USE_SELECT
this->maxFD = max ( this->maxFD, reg.getFD()+1 );
// Most applications will find that its important to push here to
#endif
// Most applications will find that it's important to push here to
// the front of the list so that transient writes get executed
// first allowing incoming read protocol to find that outgoing
// buffer space is newly available.
@ -268,6 +336,17 @@ void fdManager::installReg (fdReg &reg)
if ( status != 0 ) {
throwWithLocation ( fdInterestSubscriptionAlreadyExits () );
}
#ifdef FDMGR_USE_POLL
// keep enough event slots for all fds to become available at once
if (++nfds > pollfdsCap) {
if (pollfdsCap == 0)
pollfdsCap = 16;
else
pollfdsCap *= 2;
delete[] pollfds;
pollfds = new struct pollfd[pollfdsCap];
}
#endif
}
//
@ -279,8 +358,7 @@ void fdManager::removeReg (fdReg &regIn)
pItemFound = this->fdTbl.remove (regIn);
if (pItemFound!=&regIn) {
fprintf(stderr,
"fdManager::removeReg() bad fd registration object\n");
errlogPrintf("fdManager::removeReg() bad fd registration object\n");
return;
}
@ -309,7 +387,13 @@ void fdManager::removeReg (fdReg &regIn)
}
regIn.state = fdReg::limbo;
#ifdef FDMGR_USE_SELECT
FD_CLR(regIn.getFD(), &this->fdSetsPtr[regIn.getType()]);
#endif
#ifdef FDMGR_USE_POLL
nfds--;
#endif
}
//
@ -346,11 +430,13 @@ fdReg::fdReg (const SOCKET fdIn, const fdRegType typIn,
fdRegId (fdIn,typIn), state (limbo),
onceOnly (onceOnlyIn), manager (managerIn)
{
#ifdef FDMGR_USE_SELECT
if (!FD_IN_FDSET(fdIn)) {
fprintf (stderr, "%s: fd > FD_SETSIZE ignored\n",
errlogPrintf("%s: fd > FD_SETSIZE ignored\n",
__FILE__);
return;
}
#endif
this->manager.installReg (*this);
}

View File

@ -19,6 +19,18 @@
#ifndef fdManagerH_included
#define fdManagerH_included
#if !defined(FDMGR_USE_POLL) && !defined(FDMGR_USE_SELECT)
#if defined(__linux__)
#define FDMGR_USE_POLL
#else
#define FDMGR_USE_SELECT
#endif
#endif
#ifdef FDMGR_USE_POLL
#include <poll.h>
#endif
#include "libComAPI.h" // reset share lib defines
#include "tsDLList.h"
#include "resourceLib.h"
@ -91,10 +103,20 @@ private:
tsDLList < fdReg > activeList;
resTable < fdReg, fdRegId > fdTbl;
const double sleepQuantum;
fd_set * fdSetsPtr;
epicsTimerQueuePassive * pTimerQueue;
SOCKET maxFD;
bool processInProg;
#ifdef FDMGR_USE_POLL
int nfds;
int pollfdsCap;
struct pollfd *pollfds;
#endif
#ifdef FDMGR_USE_SELECT
fd_set * fdSetsPtr;
SOCKET maxFD;
#endif
//
// Set to fdreg when in call back
// and nill otherwise