Merge remote-tracking branch 'github/fdManager_using_poll' into PSI-7.0

This commit is contained in:
2025-03-03 17:20:13 +01:00
7 changed files with 642 additions and 319 deletions

View File

@ -38,6 +38,12 @@ record type by opening these files in a text editor intead of opening a browser
and loading the HTML versions or finding and opening the files from the EPICS
Documentation site.
### fdManager file descriptor limit removed
In order to support file descriptors above 1023, fdManager now uses
poll() instead of select() on all architectures that support it
(Linux, MacOS, Windows, newer RTEMS).
### Post monitors from compress record when it's reset
Writing into a compress record's `RES` field now posts a monitor event instead

View File

@ -19,39 +19,117 @@
// 1) This library is not thread safe
//
#include <algorithm>
#define instantiateRecourceLib
#include "epicsAssert.h"
#include "epicsThread.h"
#include "fdManager.h"
#include "locationException.h"
using std :: max;
#if !defined(FDMGR_USE_POLL) && !defined(FDMGR_USE_SELECT)
#if defined(__linux__) || defined(darwin) || _WIN32_WINNT >= 0x600 || (defined(__rtems__) && !defined(RTEMS_LEGACY_STACK))
#define FDMGR_USE_POLL
#else
#define FDMGR_USE_SELECT
#endif
#endif
#ifdef FDMGR_USE_POLL
#include <vector>
#if !defined(_WIN32)
#include <poll.h>
#endif
static const short PollEvents[] = { // must match fdRegType
POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR,
POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR,
POLLPRI };
#if defined(_WIN32)
#define poll WSAPoll
// Filter out PollEvents that Windows does not accept in events (only returns in revents)
#define WIN_POLLEVENT_FILTER(ev) static_cast<short>((ev) & (POLLIN | POLLOUT))
#else
// Linux, MacOS and RTEMS don't care
#define WIN_POLLEVENT_FILTER(ev) (ev)
#endif
#endif
#ifdef FDMGR_USE_SELECT
#include <algorithm>
#endif
struct fdManagerPrivate {
tsDLList<fdReg> regList;
tsDLList<fdReg> activeList;
resTable<fdReg, fdRegId> fdTbl;
const double sleepQuantum;
epics::auto_ptr<epicsTimerQueuePassive> pTimerQueue;
bool processInProg;
#ifdef FDMGR_USE_POLL
std::vector<struct pollfd> pollfds;
#endif
#ifdef FDMGR_USE_SELECT
fd_set fdSets[fdrNEnums];
SOCKET maxFD;
#endif
//
// Set to fdreg when in call back
// and nill otherwise
//
volatile fdReg* pCBReg;
fdManager& owner;
explicit fdManagerPrivate(fdManager& owner);
void lazyInitTimerQueue();
};
fdManagerPrivate::fdManagerPrivate(fdManager& owner) :
sleepQuantum(epicsThreadSleepQuantum()),
processInProg(false),
pCBReg(NULL), owner(owner)
{}
inline void fdManagerPrivate::lazyInitTimerQueue()
{
if (!pTimerQueue.get()) {
pTimerQueue.reset(&epicsTimerQueuePassive::create(owner));
}
}
epicsTimer& fdManager::createTimer()
{
priv->lazyInitTimerQueue();
return priv->pTimerQueue->createTimer();
}
fdManager fileDescriptorManager;
const unsigned mSecPerSec = 1000u;
const unsigned uSecPerSec = 1000u * mSecPerSec;
static const unsigned mSecPerSec = 1000u;
#ifdef FDMGR_USE_SELECT
static const unsigned uSecPerSec = 1000u * mSecPerSec;
#endif
//
// fdManager::fdManager()
//
// hopefully its a reasonable guess that select() and epicsThreadSleep()
// hopefully its a reasonable guess that poll()/select() and epicsThreadSleep()
// will have the same sleep quantum
//
LIBCOM_API fdManager::fdManager() :
sleepQuantum ( epicsThreadSleepQuantum () ),
fdSetsPtr ( new fd_set [fdrNEnums] ),
pTimerQueue ( 0 ), maxFD ( 0 ), processInProg ( false ),
pCBReg ( 0 )
priv(new fdManagerPrivate(*this))
{
int status = osiSockAttach();
assert(status);
#ifdef FDMGR_USE_SELECT
priv->maxFD = 0;
for (size_t i = 0u; i < fdrNEnums; i++) {
FD_ZERO ( &fdSetsPtr[i] );
FD_ZERO(&priv->fdSets[i]);
}
#endif
}
//
@ -61,16 +139,14 @@ LIBCOM_API fdManager::~fdManager()
{
fdReg* pReg;
while ( (pReg = this->regList.get()) ) {
while ((pReg = priv->regList.get())) {
pReg->state = fdReg::limbo;
pReg->destroy();
}
while ( (pReg = this->activeList.get()) ) {
while ((pReg = priv->activeList.get())) {
pReg->state = fdReg::limbo;
pReg->destroy();
}
delete this->pTimerQueue;
delete [] this->fdSetsPtr;
osiSockRelease();
}
@ -79,62 +155,120 @@ LIBCOM_API fdManager::~fdManager()
//
LIBCOM_API void fdManager::process(double delay)
{
this->lazyInitTimerQueue ();
priv->lazyInitTimerQueue();
//
// no recursion
//
if (this->processInProg) {
if (priv->processInProg)
return;
}
this->processInProg = true;
priv->processInProg = true;
//
// One shot at expired timers prior to going into
// select. This allows zero delay timers to arm
// poll/select. This allows zero delay timers to arm
// fd writes. We will never process the timer queue
// more than once here so that fd activity get serviced
// in a reasonable length of time.
//
double minDelay = this->pTimerQueue->process(epicsTime::getCurrent());
double minDelay = priv->pTimerQueue->process(epicsTime::getCurrent());
if (minDelay >= delay) {
minDelay = delay;
}
bool ioPending = false;
tsDLIter < fdReg > iter = this->regList.firstIter ();
#ifdef FDMGR_USE_POLL
priv->pollfds.clear();
#endif
int ioPending = 0;
tsDLIter<fdReg> iter = priv->regList.firstIter();
while (iter.valid()) {
FD_SET(iter->getFD(), &this->fdSetsPtr[iter->getType()]);
ioPending = true;
++ioPending;
#ifdef FDMGR_USE_POLL
#if __cplusplus >= 201100L
priv->pollfds.emplace_back(pollfd{
.fd = iter->getFD(),
.events = WIN_POLLEVENT_FILTER(PollEvents[iter->getType()])
});
#else
struct pollfd pollfd;
pollfd.fd = iter->getFD();
pollfd.events = WIN_POLLEVENT_FILTER(PollEvents[iter->getType()]);
pollfd.revents = 0;
priv->pollfds.push_back(pollfd);
#endif
#endif
#ifdef FDMGR_USE_SELECT
FD_SET(iter->getFD(), &priv->fdSets[iter->getType()]);
#endif
++iter;
}
if (ioPending) {
#ifdef FDMGR_USE_POLL
if (minDelay * mSecPerSec > INT_MAX)
minDelay = INT_MAX / mSecPerSec;
int status = poll(&priv->pollfds[0], // ancient C++ has no vector.data()
ioPending, static_cast<int>(minDelay * mSecPerSec));
int i = 0;
#endif
#ifdef FDMGR_USE_SELECT
struct timeval tv;
tv.tv_sec = static_cast<time_t>(minDelay);
tv.tv_usec = static_cast<long>((minDelay-tv.tv_sec) * uSecPerSec);
fd_set * pReadSet = & this->fdSetsPtr[fdrRead];
fd_set * pWriteSet = & this->fdSetsPtr[fdrWrite];
fd_set * pExceptSet = & this->fdSetsPtr[fdrException];
int status = select (this->maxFD, pReadSet, pWriteSet, pExceptSet, &tv);
int status = select(priv->maxFD,
&priv->fdSets[fdrRead],
&priv->fdSets[fdrWrite],
&priv->fdSets[fdrException], &tv);
#endif
this->pTimerQueue->process(epicsTime::getCurrent());
priv->pTimerQueue->process(epicsTime::getCurrent());
if (status > 0) {
//
// Look for activity
//
iter=this->regList.firstIter ();
iter = priv->regList.firstIter();
while (iter.valid() && status > 0) {
tsDLIter<fdReg> tmp = iter;
tmp++;
if (FD_ISSET(iter->getFD(), &this->fdSetsPtr[iter->getType()])) {
FD_CLR(iter->getFD(), &this->fdSetsPtr[iter->getType()]);
this->regList.remove(*iter);
this->activeList.add(*iter);
#ifdef FDMGR_USE_POLL
// In a single threaded application, nothing should have
// changed the order of regList and pollfds by now.
// But just in case...
int isave = i;
while (priv->pollfds[i].fd != iter->getFD() ||
priv->pollfds[i].events != WIN_POLLEVENT_FILTER(PollEvents[iter->getType()]))
{
errlogPrintf("fdManager: skipping (removed?) pollfd %d (expected %d)\n", priv->pollfds[i].fd, iter->getFD());
i++; // skip pollfd of removed items
if (i >= ioPending) { // skip unknown (inserted?) items
errlogPrintf("fdManager: skipping (inserted?) item %d\n", iter->getFD());
iter = tmp;
tmp++;
if (!iter.valid()) break;
i = isave;
}
}
if (i >= ioPending) break; // any unhandled item stays in regList for next time
if (priv->pollfds[i++].revents & PollEvents[iter->getType()]) {
#endif
#ifdef FDMGR_USE_SELECT
if (FD_ISSET(iter->getFD(), &priv->fdSets[iter->getType()])) {
FD_CLR(iter->getFD(), &priv->fdSets[iter->getType()]);
#endif
priv->regList.remove(*iter);
priv->activeList.add(*iter);
iter->state = fdReg::active;
status--;
}
@ -146,7 +280,7 @@ LIBCOM_API void fdManager::process (double delay)
// above list while in a "callBack()" routine
//
fdReg* pReg;
while ( (pReg = this->activeList.get()) ) {
while ((pReg = priv->activeList.get())) {
pReg->state = fdReg::limbo;
//
@ -154,21 +288,21 @@ LIBCOM_API void fdManager::process (double delay)
// can detect if it was deleted
// during the call back
//
this->pCBReg = pReg;
priv->pCBReg = pReg;
pReg->callBack();
if (this->pCBReg != NULL) {
if (priv->pCBReg != NULL) {
//
// check only after we see that it is non-null so
// that we don't trigger bounds-checker dangling pointer
// error
//
assert (this->pCBReg==pReg);
this->pCBReg = 0;
assert(priv->pCBReg == pReg);
priv->pCBReg = NULL;
if (pReg->onceOnly) {
pReg->destroy();
}
else {
this->regList.add(*pReg);
priv->regList.add(*pReg);
pReg->state = fdReg::pending;
}
}
@ -177,21 +311,29 @@ LIBCOM_API void fdManager::process (double delay)
else if (status < 0) {
int errnoCpy = SOCKERRNO;
#ifdef FDMGR_USE_SELECT
// don't depend on flags being properly set if
// an error is returned from select
for (size_t i = 0u; i < fdrNEnums; i++) {
FD_ZERO ( &fdSetsPtr[i] );
FD_ZERO(&priv->fdSets[i]);
}
#endif
//
// print a message if its an unexpected error
// print a message if it's an unexpected error
//
if (errnoCpy != SOCK_EINTR) {
char sockErrBuf[64];
epicsSocketConvertErrnoToString(
sockErrBuf, sizeof(sockErrBuf));
fprintf ( stderr,
"fdManager: select failed because \"%s\"\n",
errlogPrintf("fdManager: "
#ifdef FDMGR_USE_POLL
"poll()"
#endif
#ifdef FDMGR_USE_SELECT
"select()"
#endif
" failed because \"%s\"\n",
sockErrBuf);
}
}
@ -203,9 +345,9 @@ LIBCOM_API void fdManager::process (double delay)
* of select()
*/
epicsThreadSleep(minDelay);
this->pTimerQueue->process(epicsTime::getCurrent());
priv->pTimerQueue->process(epicsTime::getCurrent());
}
this->processInProg = false;
priv->processInProg = false;
}
//
@ -222,7 +364,7 @@ void fdReg::destroy()
//
fdReg::~fdReg()
{
this->manager.removeReg(*this);
manager.removeReg(*this);
}
//
@ -230,12 +372,12 @@ fdReg::~fdReg()
//
void fdReg::show(unsigned level) const
{
printf ("fdReg at %p\n", (void *) this);
printf("fdReg at %p\n", this);
if (level > 1u) {
printf("\tstate = %d, onceOnly = %d\n",
this->state, this->onceOnly);
state, onceOnly);
}
this->fdRegId::show(level);
fdRegId::show(level);
}
//
@ -243,11 +385,14 @@ void fdReg::show(unsigned level) const
//
void fdRegId::show(unsigned level) const
{
printf ( "fdRegId at %p\n",
static_cast <const void *> ( this ) );
printf("fdRegId at %p\n", this);
if (level > 1u) {
printf ( "\tfd = %d, type = %d\n",
int(this->fd), this->type );
printf("\tfd = %"
#if defined(_WIN32)
"I"
#endif
"d, type = %d\n",
fd, type);
}
}
@ -256,18 +401,21 @@ void fdRegId::show ( unsigned level ) const
//
void fdManager::installReg(fdReg &reg)
{
this->maxFD = max ( this->maxFD, reg.getFD()+1 );
// Most applications will find that its important to push here to
#ifdef FDMGR_USE_SELECT
priv->maxFD = std::max(priv->maxFD, reg.getFD()+1);
#endif
// Most applications will find that it's important to push here to
// the front of the list so that transient writes get executed
// first allowing incoming read protocol to find that outgoing
// buffer space is newly available.
this->regList.push ( reg );
priv->regList.push(reg);
reg.state = fdReg::pending;
int status = this->fdTbl.add ( reg );
int status = priv->fdTbl.add(reg);
if (status != 0) {
throwWithLocation(fdInterestSubscriptionAlreadyExits());
}
// errlogPrintf("fdManager::adding fd %d\n", reg.getFD());
}
//
@ -277,10 +425,9 @@ void fdManager::removeReg (fdReg &regIn)
{
fdReg* pItemFound;
pItemFound = this->fdTbl.remove (regIn);
pItemFound = priv->fdTbl.remove(regIn);
if (pItemFound != &regIn) {
fprintf(stderr,
"fdManager::removeReg() bad fd registration object\n");
errlogPrintf("fdManager::removeReg() bad fd registration object\n");
return;
}
@ -288,16 +435,16 @@ void fdManager::removeReg (fdReg &regIn)
// signal fdManager that the fdReg was deleted
// during the call back
//
if (this->pCBReg == &regIn) {
this->pCBReg = 0;
if (priv->pCBReg == &regIn) {
priv->pCBReg = NULL;
}
switch (regIn.state) {
case fdReg::active:
this->activeList.remove (regIn);
priv->activeList.remove(regIn);
break;
case fdReg::pending:
this->regList.remove (regIn);
priv->regList.remove(regIn);
break;
case fdReg::limbo:
break;
@ -309,7 +456,11 @@ void fdManager::removeReg (fdReg &regIn)
}
regIn.state = fdReg::limbo;
FD_CLR(regIn.getFD(), &this->fdSetsPtr[regIn.getType()]);
#ifdef FDMGR_USE_SELECT
FD_CLR(regIn.getFD(), &priv->fdSets[regIn.getType()]);
#endif
// errlogPrintf("fdManager::removing fd %d\n", regIn.getFD());
}
//
@ -323,7 +474,7 @@ void fdManager::reschedule ()
double fdManager::quantum()
{
return this->sleepQuantum;
return priv->sleepQuantum;
}
//
@ -335,7 +486,7 @@ LIBCOM_API fdReg *fdManager::lookUpFD (const SOCKET fd, const fdRegType type)
return NULL;
}
fdRegId id(fd,type);
return this->fdTbl.lookup(id);
return priv->fdTbl.lookup(id);
}
//
@ -346,12 +497,14 @@ fdReg::fdReg (const SOCKET fdIn, const fdRegType typIn,
fdRegId(fdIn,typIn), state(limbo),
onceOnly(onceOnlyIn), manager(managerIn)
{
#ifdef FDMGR_USE_SELECT
if (!FD_IN_FDSET(fdIn)) {
fprintf (stderr, "%s: fd > FD_SETSIZE ignored\n",
errlogPrintf("%s: fd > FD_SETSIZE ignored\n",
__FILE__);
return;
}
this->manager.installReg (*this);
#endif
manager.installReg(*this);
}
template class resTable<fdReg, fdRegId>;

View File

@ -19,6 +19,16 @@
#ifndef fdManagerH_included
#define fdManagerH_included
#include <memory>
namespace epics {
#if __cplusplus>=201103L
template<typename T>
using auto_ptr = std::unique_ptr<T>;
#else
using std::auto_ptr;
#endif
}
#include "libComAPI.h" // reset share lib defines
#include "tsDLList.h"
#include "resourceLib.h"
@ -42,17 +52,17 @@ public:
SOCKET getFD() const
{
return this->fd;
return fd;
}
fdRegType getType() const
{
return this->type;
return type;
}
bool operator == (const fdRegId& idIn) const
{
return this->fd == idIn.fd && this->type==idIn.type;
return fd == idIn.fd && type == idIn.type;
}
resTableIndex hash() const;
@ -70,41 +80,29 @@ private:
//
// file descriptor manager
//
class fdManager : public epicsTimerQueueNotify {
class LIBCOM_API fdManager : public epicsTimerQueueNotify {
public:
//
// exceptions
//
class fdInterestSubscriptionAlreadyExits {};
LIBCOM_API fdManager ();
LIBCOM_API virtual ~fdManager ();
LIBCOM_API void process ( double delay ); // delay parameter is in seconds
fdManager();
virtual ~fdManager();
void process(double delay); // delay parameter is in seconds
// returns NULL if the fd is unknown
LIBCOM_API class fdReg *lookUpFD (const SOCKET fd, const fdRegType type);
class fdReg* lookUpFD(const SOCKET fd, const fdRegType type);
epicsTimer& createTimer();
private:
tsDLList < fdReg > regList;
tsDLList < fdReg > activeList;
resTable < fdReg, fdRegId > fdTbl;
const double sleepQuantum;
fd_set * fdSetsPtr;
epicsTimerQueuePassive * pTimerQueue;
SOCKET maxFD;
bool processInProg;
//
// Set to fdreg when in call back
// and nill otherwise
//
fdReg * pCBReg;
epics::auto_ptr <struct fdManagerPrivate> priv;
void reschedule();
double quantum();
void installReg(fdReg& reg);
void removeReg(fdReg& reg);
void lazyInitTimerQueue ();
fdManager(const fdManager&);
fdManager& operator = (const fdManager&);
friend class fdReg;
@ -173,12 +171,12 @@ inline resTableIndex fdRegId::hash () const
resTableIndex hashid;
hashid = integerHash(fdManagerHashTableMinIndexBits,
fdManagerHashTableMaxIndexBits, this->fd );
fdManagerHashTableMaxIndexBits, fd);
//
// also evenly distribute based on the type of fdRegType
//
hashid ^= this->type;
hashid ^= type;
//
// the result here is always masked to the
@ -187,18 +185,5 @@ inline resTableIndex fdRegId::hash () const
return hashid;
}
inline void fdManager::lazyInitTimerQueue ()
{
if ( ! this->pTimerQueue ) {
this->pTimerQueue = & epicsTimerQueuePassive::create ( *this );
}
}
inline epicsTimer & fdManager::createTimer ()
{
this->lazyInitTimerQueue ();
return this->pTimerQueue->createTimer ();
}
#endif // fdManagerH_included

View File

@ -209,6 +209,11 @@ aslibtest_SRCS += aslibtest.c
testHarness_SRCS += aslibtest.c
TESTS += aslibtest
TESTPROD_HOST += fdManagerTest
fdManagerTest_SRCS += fdManagerTest.cpp
testHarness_SRCS += fdManagerTest.cpp
TESTS += fdManagerTest
# Perl module tests:
TESTS += macLib
@ -315,11 +320,6 @@ TESTPROD_HOST += buckTest
buckTest_SRCS += buckTest.c
testHarness_SRCS += buckTest.c
#TESTPROD_HOST += fdmgrTest
fdmgrTest_SRCS += fdmgrTest.c
fdmgrTest_LIBS += ca
# FIXME: program never exits.
TESTPROD_HOST += epicsAtomicPerform
epicsAtomicPerform_SRCS += epicsAtomicPerform.cpp
testHarness_SRCS += epicsAtomicPerform.cpp

View File

@ -54,6 +54,7 @@ int initHookTest(void);
int ipAddrToAsciiTest(void);
int macDefExpandTest(void);
int macLibTest(void);
int fdManagerTest(void);
int osiSockTest(void);
int ringBytesTest(void);
int ringPointerTest(void);
@ -110,6 +111,7 @@ void epicsRunLibComTests(void)
runTest(ipAddrToAsciiTest);
runTest(macDefExpandTest);
runTest(macLibTest);
runTest(fdManagerTest);
runTest(osiSockTest);
runTest(ringBytesTest);
runTest(ringPointerTest);

View File

@ -0,0 +1,316 @@
/*************************************************************************\
* Copyright (c) 2025 Michael Davidsaver
* SPDX-License-Identifier: EPICS
* EPICS Base is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#include <algorithm>
#include <string.h>
#include <osiSock.h>
#include <fdManager.h>
#include <epicsTime.h>
#include <epicsAtomic.h>
#include <epicsThread.h>
#include <epicsUnitTest.h>
#include <testMain.h>
#if __cplusplus<201103L
# define final
# define override
#endif
namespace {
void set_non_blocking(SOCKET sd)
{
osiSockIoctl_t yes = true;
int status = socket_ioctl ( sd,
FIONBIO, & yes);
if(status)
testFail("set_non_blocking fails : %d", SOCKERRNO);
}
// RAII for epicsTimer
struct ScopedTimer {
epicsTimer& timer;
explicit
ScopedTimer(epicsTimer& t) :timer(t) {}
~ScopedTimer() { timer.destroy(); }
};
struct ScopedFDReg {
fdReg * const reg;
explicit
ScopedFDReg(fdReg* reg) :reg(reg) {}
~ScopedFDReg() { reg->destroy(); }
};
// RAII for socket
struct Socket {
SOCKET sd;
Socket() :sd(INVALID_SOCKET) {}
explicit
Socket(SOCKET sd) :sd(sd) {}
Socket(int af, int type)
:sd(epicsSocketCreate(af, type, 0))
{
if(sd==INVALID_SOCKET)
testAbort("failed to allocate socket %d %d", af, type);
}
private:
Socket(const Socket&);
Socket& operator=(const Socket&);
public:
~Socket() {
if(sd!=INVALID_SOCKET)
epicsSocketDestroy(sd);
}
void swap(Socket& o) {
std::swap(sd, o.sd);
}
osiSockAddr bind()
{
osiSockAddr addr = {0};
addr.ia.sin_family = AF_INET;
addr.ia.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
if(::bind(sd, &addr.sa, sizeof(addr)))
testAbort("Unable to bind lo : %d", SOCKERRNO);
osiSockAddr ret;
osiSocklen_t addrlen = sizeof(ret);
if(getsockname(sd, &ret.sa, &addrlen))
testAbort("Unable to getsockname : %d", SOCKERRNO);
(void)addrlen;
if(listen(sd, 1))
testAbort("Unable to listen : %d", SOCKERRNO);
return ret;
}
};
struct DoConnect final : public epicsThreadRunable {
const SOCKET sd;
osiSockAddr to;
DoConnect(SOCKET sd, const osiSockAddr& to)
:sd(sd)
,to(to)
{}
void run() override final {
int err = connect(sd, &to.sa, sizeof(to));
testOk(err==0, "connect() %d %d", err, SOCKERRNO);
}
};
struct DoAccept final : public epicsThreadRunable {
const SOCKET sd;
Socket peer;
osiSockAddr peer_addr;
explicit
DoAccept(SOCKET sd) :sd(sd) {}
void run() override final {
osiSocklen_t len(sizeof(peer_addr));
Socket temp(accept(sd, &peer_addr.sa, &len));
if(temp.sd==INVALID_SOCKET)
testFail("accept() -> %d", SOCKERRNO);
temp.swap(peer);
}
};
struct DoRead final : public epicsThreadRunable {
const SOCKET sd;
char* buf;
unsigned buflen;
int n;
DoRead(SOCKET sd, char* buf, unsigned buflen): sd(sd), buf(buf), buflen(buflen), n(0) {}
void run() override final {
n = recv(sd, buf, buflen, 0);
if(n<0)
testFail("read() -> %d, %d", n, SOCKERRNO);
}
};
struct DoWriteAll final : public epicsThreadRunable {
const SOCKET sd;
const char* buf;
unsigned buflen;
DoWriteAll(SOCKET sd, const char* buf, unsigned buflen): sd(sd), buf(buf), buflen(buflen) {}
void run() override final {
unsigned nsent = 0;
while(nsent<buflen) {
int n = send(sd, nsent+buf, buflen-nsent, 0);
if(n<0) {
testFail("send() -> %d, %d", n, SOCKERRNO);
break;
}
nsent += n;
}
}
};
struct Expire final : public epicsTimerNotify {
bool expired;
Expire() :expired(false) {}
virtual ~Expire() {}
virtual expireStatus expire(const epicsTime &) override final
{
if(!expired) {
expired = true;
testPass("expired");
} else {
testFail("re-expired?");
}
return noRestart;
}
};
struct Event final : public fdReg {
int* ready;
Event(fdManager& mgr, fdRegType evt, SOCKET sd, int* ready)
:fdReg(sd, evt, false, mgr)
,ready(ready)
{}
virtual ~Event() {}
virtual void callBack() override final {
epics::atomic::set(*ready, 1);
}
};
struct OneShot final : public fdReg {
int *mask;
OneShot(fdManager& mgr, fdRegType evt, SOCKET sd, int *mask)
:fdReg(sd, evt, true, mgr)
,mask(mask)
{}
virtual ~OneShot() {
epics::atomic::add(*mask, 2);
}
virtual void callBack() override final {
epics::atomic::add(*mask, 1);
}
};
void testEmpty()
{
fdManager empty;
empty.process(0.1); // ca-gateway always passes 0.01
testPass("Did nothing");
}
void testOnlyTimer()
{
fdManager mgr;
Expire trig, never;
ScopedTimer trig_timer(mgr.createTimer()),
never_timer(mgr.createTimer());
epicsTime now(epicsTime::getCurrent());
trig_timer.timer.start(trig, now+0.1);
never_timer.timer.start(never, now+9999999.0);
mgr.process(0.2);
testOk1(trig.expired);
testOk1(!never.expired);
}
void testSockIO()
{
fdManager mgr;
Socket listener(AF_INET, SOCK_STREAM);
set_non_blocking(listener.sd);
osiSockAddr servAddr(listener.bind());
Socket client(AF_INET, SOCK_STREAM);
Socket server;
// listen() / connect()
{
int readable = 0;
Event evt(mgr, fdrRead, listener.sd, &readable);
DoConnect conn(client.sd, servAddr);
epicsThread connector(conn, "connect", 0);
connector.start();
mgr.process(5.0);
testOk1(readable);
DoAccept acc(listener.sd);
acc.run();
server.swap(acc.peer);
}
set_non_blocking(server.sd);
// writeable
{
int mask = 0;
new OneShot(mgr, fdrWrite, server.sd, &mask);
mgr.process(5.0);
testOk(mask==3, "OneShot event mask %x", mask);
}
// read
{
const char msg[] = "testing";
int readable = 0;
Event evt(mgr, fdrRead, server.sd, &readable);
DoWriteAll op(client.sd, msg, sizeof(msg)-1);
epicsThread writer(op, "writer", 0);
writer.start();
mgr.process(5.0);
testOk1(readable);
char buf[sizeof(msg)] = "";
DoRead(server.sd, buf, sizeof(buf)-1).run();
buf[sizeof(buf)-1] = '\0';
testOk(strcmp(msg, buf)==0, "%s == %s", msg, buf);
}
// timer while unreadable
{
int readable = 0;
Event evt(mgr, fdrRead, server.sd, &readable);
Expire tmo;
ScopedTimer timer(mgr.createTimer());
timer.timer.start(tmo, epicsTime::getCurrent()); // immediate
mgr.process(1.0);
testOk1(!readable);
testOk1(tmo.expired);
}
// notification on close()
{
int readable = 0;
Event evt(mgr, fdrRead, server.sd, &readable);
shutdown(client.sd, SHUT_RDWR);
//Socket().swap(client);
mgr.process(1.0);
testOk1(readable);
}
}
} // namespace
MAIN(fdManagerTest)
{
testPlan(13);
osiSockAttach();
testEmpty();
testOnlyTimer();
testSockIO();
osiSockRelease();
return testDone();
}

View File

@ -1,139 +0,0 @@
/*************************************************************************\
* Copyright (c) 2006 UChicago Argonne LLC, as Operator of Argonne
* National Laboratory.
* Copyright (c) 2002 The Regents of the University of California, as
* Operator of Los Alamos National Laboratory.
* SPDX-License-Identifier: EPICS
* EPICS BASE is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
\*************************************************************************/
#include <stdio.h>
#include <math.h>
#include "fdmgr.h"
#include "epicsTime.h"
#include "epicsAssert.h"
#include "cadef.h"
#define verify(exp) ((exp) ? (void)0 : \
epicsAssert(__FILE__, __LINE__, #exp, epicsAssertAuthor))
static const unsigned uSecPerSec = 1000000;
typedef struct cbStructCreateDestroyFD {
fdctx *pfdm;
int trig;
} cbStructCreateDestroyFD;
void fdHandler (void *pArg)
{
cbStructCreateDestroyFD *pCBFD = (cbStructCreateDestroyFD *) pArg;
printf ("triggered\n");
pCBFD->trig = 1;
}
void fdCreateDestroyHandler (void *pArg, int fd, int open)
{
cbStructCreateDestroyFD *pCBFD = (cbStructCreateDestroyFD *) pArg;
int status;
if (open) {
printf ("new fd = %d\n", fd);
status = fdmgr_add_callback (pCBFD->pfdm, fd, fdi_read, fdHandler, pArg);
verify (status==0);
}
else {
printf ("terminated fd = %d\n", fd);
status = fdmgr_clear_callback (pCBFD->pfdm, fd, fdi_read);
verify (status==0);
}
}
typedef struct cbStuctTimer {
epicsTimeStamp time;
int done;
} cbStruct;
void alarmCB (void *parg)
{
cbStruct *pCBS = (cbStruct *) parg;
epicsTimeGetCurrent (&pCBS->time);
pCBS->done = 1;
}
void testTimer (fdctx *pfdm, double delay)
{
int status;
fdmgrAlarmId aid;
struct timeval tmo;
epicsTimeStamp begin;
cbStruct cbs;
double measuredDelay;
double measuredError;
epicsTimeGetCurrent (&begin);
cbs.done = 0;
tmo.tv_sec = (time_t) delay;
tmo.tv_usec = (unsigned long) ((delay - tmo.tv_sec) * uSecPerSec);
aid = fdmgr_add_timeout (pfdm, &tmo, alarmCB, &cbs);
verify (aid!=fdmgrNoAlarm);
while (!cbs.done) {
tmo.tv_sec = (time_t) delay;
tmo.tv_usec = (unsigned long) ((delay - tmo.tv_sec) * uSecPerSec);
status = fdmgr_pend_event (pfdm, &tmo);
verify (status==0);
}
measuredDelay = epicsTimeDiffInSeconds (&cbs.time, &begin);
measuredError = fabs (measuredDelay-delay);
printf ("measured delay for %lf sec was off by %lf sec (%lf %%)\n",
delay, measuredError, 100.0*measuredError/delay);
}
int main (int argc, char **argv)
{
int status;
fdctx *pfdm;
cbStructCreateDestroyFD cbsfd;
struct timeval tmo;
chid chan;
pfdm = fdmgr_init ();
verify (pfdm);
SEVCHK (ca_task_initialize(), NULL);
cbsfd.pfdm = pfdm;
SEVCHK (ca_add_fd_registration (fdCreateDestroyHandler, &cbsfd), NULL);
/*
* timer test
*/
testTimer (pfdm, 0.001);
testTimer (pfdm, 0.01);
testTimer (pfdm, 0.1);
testTimer (pfdm, 1.0);
if (argc==2) {
SEVCHK(ca_search (argv[1], &chan), NULL);
}
while (1) {
tmo.tv_sec = 0;
tmo.tv_usec = 100000;
cbsfd.trig = 0;
status = fdmgr_pend_event (pfdm, &tmo);
verify (status==0);
ca_poll ();
}
status = fdmgr_delete (pfdm);
verify (status==0);
printf ( "Test Complete\n" );
return 0;
}