2001 lines
58 KiB
C++
2001 lines
58 KiB
C++
|
|
|
|
/* * $Id$
|
|
*
|
|
* L O S A L A M O S
|
|
* Los Alamos National Laboratory
|
|
* Los Alamos, New Mexico 87545
|
|
*
|
|
* Copyright, 1986, The Regents of the University of California.
|
|
*
|
|
* Author: Jeff Hill
|
|
*/
|
|
|
|
#include "iocinf.h"
|
|
|
|
#include "inetAddrID_IL.h"
|
|
#include "bhe_IL.h"
|
|
#include "tcpiiu_IL.h"
|
|
#include "cac_IL.h"
|
|
#include "comBuf_IL.h"
|
|
#include "comQueSend_IL.h"
|
|
#include "netiiu_IL.h"
|
|
#include "nciu_IL.h"
|
|
#include "baseNMIU_IL.h"
|
|
#include "netWriteNotifyIO_IL.h"
|
|
#include "netReadCopyIO_IL.h"
|
|
#include "netReadNotifyIO_IL.h"
|
|
#include "netSubscription_IL.h"
|
|
#include "ioCounter_IL.h"
|
|
|
|
// zero filled pad bytes; the name request builders in this file append
// ( postSize - size ) of them after a string so that the message body
// reaches its CA_MESSAGE_ALIGN'ed length
static const char nillBytes [8] = { 0, 0, 0, 0, 0, 0, 0, 0 };
|
|
|
|
// TCP protocol jump table
|
|
const tcpiiu::pProtoStubTCP tcpiiu::tcpJumpTableCAC [] =
|
|
{
|
|
&tcpiiu::noopAction,
|
|
&tcpiiu::eventRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::readRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::exceptionRespAction,
|
|
&tcpiiu::clearChannelRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::readNotifyRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::claimCIURespAction,
|
|
&tcpiiu::writeNotifyRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::accessRightsRespAction,
|
|
&tcpiiu::echoRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::badTCPRespAction,
|
|
&tcpiiu::verifyAndDisconnectChan,
|
|
&tcpiiu::verifyAndDisconnectChan
|
|
};
|
|
|
|
// debugPrintf ( ( "fmt", args ) ) becomes a printf call when DEBUG is
// defined and expands to nothing otherwise; the caller supplies the
// parenthesized argument list as the single macro argument
#if defined ( DEBUG )
#   define debugPrintf(argsInParen) printf argsInParen
#else
#   define debugPrintf(argsInParen)
#endif
|
|
|
|
// conversion function table, defined in another translation unit; the
// response handlers in this file index it with the message's data type
#if defined ( CONVERSION_REQUIRED )
extern CACVRTFUNC *cac_dbr_cvrt[];
#endif /* CONVERSION_REQUIRED */
|
|
|
|
// 32 zero bytes (elements without an initializer are zero initialized)
static const char nullBuff[32] = { 0 };
|
|
|
|
//
|
|
// cacSendThreadTCP ()
|
|
//
|
|
// care is taken to not hold the lock while sending a message
|
|
//
|
|
extern "C" void cacSendThreadTCP ( void *pParam )
|
|
{
|
|
tcpiiu *piiu = ( tcpiiu * ) pParam;
|
|
//claimMsgCache cache ( CA_V44 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersion ) );
|
|
bool laborNeeded;
|
|
|
|
while ( true ) {
|
|
|
|
epicsEventMustWait ( piiu->sendThreadFlushSignal );
|
|
|
|
if ( piiu->state != iiu_connected ) {
|
|
break;
|
|
}
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( piiu->mutex );
|
|
laborNeeded = piiu->busyStateDetected != piiu->flowControlActive;
|
|
}
|
|
|
|
if ( laborNeeded ) {
|
|
if ( piiu->flowControlActive ) {
|
|
int status = piiu->disableFlowControlRequest ();
|
|
if ( status == ECA_NORMAL ) {
|
|
piiu->flowControlActive = false;
|
|
}
|
|
# if defined ( DEBUG )
|
|
printf ( "fc off\n" );
|
|
# endif
|
|
}
|
|
else {
|
|
int status = piiu->enableFlowControlRequest ();
|
|
if ( status == ECA_NORMAL ) {
|
|
piiu->flowControlActive = true;
|
|
}
|
|
# if defined ( DEBUG )
|
|
printf ( "fc on\n" );
|
|
# endif
|
|
}
|
|
piiu->flushPending = true;
|
|
}
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( piiu->mutex );
|
|
laborNeeded = piiu->echoRequestPending;
|
|
piiu->echoRequestPending = false;
|
|
}
|
|
|
|
if ( laborNeeded ) {
|
|
if ( CA_V43 ( CA_PROTOCOL_VERSION, piiu->minorProtocolVersion ) ) {
|
|
piiu->echoRequest ();
|
|
}
|
|
else {
|
|
piiu->noopRequest ();
|
|
}
|
|
piiu->flushPending = true;
|
|
}
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( piiu->mutex );
|
|
laborNeeded = piiu->flushPending;
|
|
piiu->flushPending = false;
|
|
}
|
|
|
|
if ( laborNeeded ) {
|
|
if ( ! piiu->flushToWire ( false ) ) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
epicsEventSignal ( piiu->sendThreadExitSignal );
|
|
}
|
|
|
|
unsigned tcpiiu::sendBytes ( const void *pBuf,
|
|
unsigned nBytesInBuf )
|
|
{
|
|
int status;
|
|
unsigned nBytes;
|
|
|
|
if ( this->state != iiu_connected ) {
|
|
return 0u;
|
|
}
|
|
|
|
assert ( nBytesInBuf <= INT_MAX );
|
|
|
|
this->armSendWatchdog ();
|
|
|
|
while ( true ) {
|
|
status = ::send ( this->sock,
|
|
static_cast < const char * > (pBuf), (int) nBytesInBuf, 0 );
|
|
if ( status > 0 ) {
|
|
nBytes = static_cast <unsigned> ( status );
|
|
break;
|
|
}
|
|
else {
|
|
int localError = SOCKERRNO;
|
|
|
|
if ( status == 0 ) {
|
|
this->cleanShutdown ();
|
|
nBytes = 0u;
|
|
break;
|
|
}
|
|
|
|
if ( localError == SOCK_SHUTDOWN ) {
|
|
nBytes = 0u;
|
|
break;
|
|
}
|
|
|
|
if ( localError == SOCK_EINTR ) {
|
|
continue;
|
|
}
|
|
|
|
if ( localError != SOCK_EPIPE && localError != SOCK_ECONNRESET &&
|
|
localError != SOCK_ETIMEDOUT && localError != SOCK_ECONNABORTED ) {
|
|
ca_printf ("CAC: unexpected TCP send error: %s\n", SOCKERRSTR (localError) );
|
|
}
|
|
|
|
this->cleanShutdown ();
|
|
nBytes = 0u;
|
|
break;
|
|
}
|
|
}
|
|
|
|
this->cancelSendWatchdog ();
|
|
|
|
return nBytes;
|
|
}
|
|
|
|
unsigned tcpiiu::recvBytes ( void *pBuf, unsigned nBytesInBuf )
|
|
{
|
|
unsigned totalBytes;
|
|
int status;
|
|
|
|
if ( this->state != iiu_connected ) {
|
|
return 0u;
|
|
}
|
|
|
|
assert ( nBytesInBuf <= INT_MAX );
|
|
status = ::recv ( this->sock, static_cast <char *> ( pBuf ),
|
|
static_cast <int> ( nBytesInBuf ), 0);
|
|
if ( status <= 0 ) {
|
|
int localErrno = SOCKERRNO;
|
|
|
|
if ( status == 0 ) {
|
|
this->cleanShutdown ();
|
|
return 0u;
|
|
}
|
|
|
|
if ( localErrno == SOCK_SHUTDOWN ) {
|
|
return 0u;
|
|
}
|
|
|
|
if ( localErrno == SOCK_EINTR ) {
|
|
return 0u;
|
|
}
|
|
|
|
if ( localErrno == SOCK_ECONNABORTED ) {
|
|
return 0u;
|
|
}
|
|
|
|
if ( localErrno == SOCK_ECONNRESET ) {
|
|
return 0u;
|
|
}
|
|
|
|
{
|
|
char name[64];
|
|
this->hostName ( name, sizeof (name) );
|
|
ca_printf ( "Disconnecting from CA server %s because: %s\n",
|
|
name, SOCKERRSTR (localErrno) );
|
|
}
|
|
|
|
this->cleanShutdown ();
|
|
|
|
return 0u;
|
|
}
|
|
|
|
assert ( static_cast <unsigned> ( status ) <= nBytesInBuf );
|
|
totalBytes = static_cast <unsigned> ( status );
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( nBytesInBuf == totalBytes ) {
|
|
if ( this->contigRecvMsgCount >= contiguousMsgCountWhichTriggersFlowControl ) {
|
|
this->busyStateDetected = true;
|
|
}
|
|
else {
|
|
this->contigRecvMsgCount++;
|
|
}
|
|
}
|
|
else {
|
|
this->contigRecvMsgCount = 0u;
|
|
this->busyStateDetected = false;
|
|
}
|
|
}
|
|
|
|
this->messageArrivalNotify (); // reschedule connection activity watchdog
|
|
|
|
return totalBytes;
|
|
}
|
|
|
|
/*
|
|
* cacRecvThreadTCP ()
|
|
*/
|
|
extern "C" void cacRecvThreadTCP ( void *pParam )
|
|
{
|
|
tcpiiu *piiu = ( tcpiiu * ) pParam;
|
|
|
|
piiu->connect ();
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( piiu->mutex );
|
|
if ( piiu->state == iiu_connected ) {
|
|
unsigned priorityOfSend;
|
|
threadBoolStatus tbs;
|
|
threadId tid;
|
|
|
|
tbs = threadLowestPriorityLevelAbove ( piiu->pCAC ()->getInitializingThreadsPriority (), &priorityOfSend );
|
|
if ( tbs != tbsSuccess ) {
|
|
priorityOfSend = piiu->pCAC ()->getInitializingThreadsPriority ();
|
|
}
|
|
tid = threadCreate ( "CAC-TCP-send", priorityOfSend,
|
|
threadGetStackSize ( threadStackMedium ), cacSendThreadTCP, piiu );
|
|
if ( ! tid ) {
|
|
epicsEventSignal ( piiu->recvThreadExitSignal );
|
|
epicsEventSignal ( piiu->sendThreadExitSignal );
|
|
piiu->cleanShutdown ();
|
|
return;
|
|
}
|
|
}
|
|
else {
|
|
epicsEventSignal ( piiu->recvThreadExitSignal );
|
|
epicsEventSignal ( piiu->sendThreadExitSignal );
|
|
piiu->cleanShutdown ();
|
|
return;
|
|
}
|
|
}
|
|
|
|
while ( piiu->state == iiu_connected ) {
|
|
unsigned nBytes;
|
|
{
|
|
epicsAutoMutex autoMutex ( piiu->mutex );
|
|
nBytes = piiu->recvQue.occupiedBytes ();
|
|
}
|
|
if ( nBytes >= 0x4000 ) {
|
|
epicsEventMustWait ( piiu->recvThreadRingBufferSpaceAvailableSignal );
|
|
}
|
|
else {
|
|
comBuf * pComBuf = new comBuf;
|
|
if ( pComBuf ) {
|
|
unsigned nBytesIn = pComBuf->fillFromWire ( *piiu );
|
|
if ( nBytesIn ) {
|
|
{
|
|
epicsAutoMutex autoMutex ( piiu->mutex );
|
|
piiu->recvQue.pushLastComBufReceived ( *pComBuf );
|
|
}
|
|
piiu->pCAC ()->signalRecvActivity ();
|
|
}
|
|
else {
|
|
pComBuf->destroy ();
|
|
}
|
|
}
|
|
else {
|
|
// no way to be informed when memory is available
|
|
threadSleep ( 0.5 );
|
|
}
|
|
}
|
|
}
|
|
|
|
epicsEventSignal ( piiu->recvThreadExitSignal );
|
|
}
|
|
|
|
//
|
|
// tcpiiu::tcpiiu ()
|
|
//
|
|
tcpiiu::tcpiiu ( cac &cac, double connectionTimeout, osiTimerQueue &timerQueue ) :
|
|
tcpRecvWatchdog ( connectionTimeout, timerQueue ),
|
|
tcpSendWatchdog ( connectionTimeout, timerQueue ),
|
|
netiiu ( &cac ),
|
|
ioTable ( 1024 ),
|
|
sendQue ( *this ),
|
|
pHostNameCache ( 0 ),
|
|
curDataMax ( 0ul ),
|
|
pBHE ( 0 ),
|
|
pCurData ( 0 ),
|
|
minorProtocolVersion ( 0u ),
|
|
state ( iiu_connecting ),
|
|
sock ( INVALID_SOCKET ),
|
|
contigRecvMsgCount ( 0u ),
|
|
busyStateDetected ( false ),
|
|
flowControlActive ( false ),
|
|
echoRequestPending ( false ),
|
|
flushPending ( false ),
|
|
msgHeaderAvailable ( false ),
|
|
sockCloseCompleted ( false )
|
|
{
|
|
this->addr.sa.sa_family = AF_UNSPEC;
|
|
|
|
this->sendThreadExitSignal = epicsEventCreate ( epicsEventEmpty );
|
|
if ( ! this->sendThreadExitSignal ) {
|
|
ca_printf ("CA: unable to create CA client send thread exit semaphore\n");
|
|
this->fullyConstructedFlag = false;
|
|
return;
|
|
}
|
|
|
|
this->recvThreadExitSignal = epicsEventCreate ( epicsEventEmpty );
|
|
if ( ! this->recvThreadExitSignal ) {
|
|
ca_printf ("CA: unable to create CA client send thread exit semaphore\n");
|
|
epicsEventDestroy (this->sendThreadExitSignal);
|
|
this->fullyConstructedFlag = false;
|
|
return;
|
|
}
|
|
|
|
this->sendThreadFlushSignal = epicsEventCreate ( epicsEventEmpty );
|
|
if ( ! this->sendThreadFlushSignal ) {
|
|
ca_printf ("CA: unable to create sendThreadFlushSignal object\n");
|
|
epicsEventDestroy (this->sendThreadExitSignal);
|
|
this->fullyConstructedFlag = false;
|
|
return;
|
|
}
|
|
|
|
this->recvThreadRingBufferSpaceAvailableSignal = epicsEventCreate ( epicsEventEmpty );
|
|
if ( ! this->recvThreadRingBufferSpaceAvailableSignal ) {
|
|
ca_printf ("CA: unable to create recvThreadRingBufferSpaceAvailableSignal object\n");
|
|
epicsEventDestroy (this->sendThreadExitSignal);
|
|
epicsEventDestroy (this->sendThreadFlushSignal);
|
|
this->fullyConstructedFlag = false;
|
|
return;
|
|
}
|
|
|
|
this->fullyConstructedFlag = true;
|
|
}
|
|
|
|
/*
|
|
* tcpiiu::initiateConnect ()
|
|
*/
|
|
bool tcpiiu::initiateConnect ( const osiSockAddr &addrIn, unsigned minorVersion,
|
|
class bhe &bhe, ipAddrToAsciiEngine &engineIn )
|
|
{
|
|
unsigned priorityOfRecv;
|
|
threadBoolStatus tbs;
|
|
threadId tid;
|
|
int status;
|
|
int flag;
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
this->addr = addrIn;
|
|
|
|
this->pHostNameCache = new hostNameCache ( addrIn, engineIn );
|
|
if ( ! this->pHostNameCache ) {
|
|
return false;
|
|
}
|
|
|
|
this->pBHE = &bhe;
|
|
bhe.bindToIIU ( *this );
|
|
|
|
this->state = iiu_connecting;
|
|
this->minorProtocolVersion = minorVersion;
|
|
|
|
this->contigRecvMsgCount = 0u;
|
|
this->busyStateDetected = false;
|
|
this->flowControlActive = false;
|
|
this->echoRequestPending = false;
|
|
this->flushPending = false;
|
|
this->msgHeaderAvailable = false;
|
|
this->sockCloseCompleted = false;
|
|
|
|
// first message informs server of user and host name of client
|
|
this->userNameSetRequest ();
|
|
this->hostNameSetRequest ();
|
|
|
|
this->sock = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP );
|
|
if ( this->sock == INVALID_SOCKET ) {
|
|
ca_printf ( "CAC: unable to create virtual circuit because \"%s\"\n",
|
|
SOCKERRSTR ( SOCKERRNO ) );
|
|
return false;
|
|
}
|
|
|
|
flag = TRUE;
|
|
status = setsockopt ( this->sock, IPPROTO_TCP, TCP_NODELAY,
|
|
(char *) &flag, sizeof ( flag ) );
|
|
if ( status < 0 ) {
|
|
ca_printf ("CAC: problems setting socket option TCP_NODELAY = \"%s\"\n",
|
|
SOCKERRSTR (SOCKERRNO));
|
|
}
|
|
|
|
flag = TRUE;
|
|
status = setsockopt ( this->sock , SOL_SOCKET, SO_KEEPALIVE,
|
|
( char * ) &flag, sizeof ( flag ) );
|
|
if ( status < 0 ) {
|
|
ca_printf ( "CAC: problems setting socket option SO_KEEPALIVE = \"%s\"\n",
|
|
SOCKERRSTR ( SOCKERRNO ) );
|
|
}
|
|
|
|
#if 0
|
|
{
|
|
int i;
|
|
|
|
/*
|
|
* some concern that vxWorks will run out of mBuf's
|
|
* if this change is made joh 11-10-98
|
|
*/
|
|
i = MAX_MSG_SIZE;
|
|
status = setsockopt ( this->sock, SOL_SOCKET, SO_SNDBUF,
|
|
( char * ) &i, sizeof ( i ) );
|
|
if (status < 0) {
|
|
ca_printf ("CAC: problems setting socket option SO_SNDBUF = \"%s\"\n",
|
|
SOCKERRSTR ( SOCKERRNO ) );
|
|
}
|
|
i = MAX_MSG_SIZE;
|
|
status = setsockopt ( this->sock, SOL_SOCKET, SO_RCVBUF,
|
|
( char * ) &i, sizeof ( i ) );
|
|
if ( status < 0 ) {
|
|
ca_printf ("CAC: problems setting socket option SO_RCVBUF = \"%s\"\n",
|
|
SOCKERRSTR (SOCKERRNO));
|
|
}
|
|
}
|
|
#endif
|
|
|
|
memset ( (void *) &this->curMsg, '\0', sizeof ( this->curMsg ) );
|
|
|
|
tbs = threadHighestPriorityLevelBelow ( this->pCAC ()->getInitializingThreadsPriority (), &priorityOfRecv );
|
|
if ( tbs != tbsSuccess ) {
|
|
priorityOfRecv = this->pCAC ()->getInitializingThreadsPriority ();
|
|
}
|
|
|
|
tid = threadCreate ("CAC-TCP-recv", priorityOfRecv,
|
|
threadGetStackSize (threadStackMedium), cacRecvThreadTCP, this);
|
|
if ( tid == 0 ) {
|
|
ca_printf ("CA: unable to create CA client receive thread\n");
|
|
socket_close ( this->sock );
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/*
|
|
* tcpiiu::connect ()
|
|
*/
|
|
void tcpiiu::connect ()
|
|
{
|
|
/*
|
|
* attempt to connect to a CA server
|
|
*/
|
|
this->armSendWatchdog ();
|
|
while ( ! this->sockCloseCompleted ) {
|
|
|
|
int status = ::connect ( this->sock, &this->addr.sa, sizeof ( addr.sa ) );
|
|
|
|
if ( status == 0 ) {
|
|
|
|
this->cancelSendWatchdog ();
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( this->state == iiu_connecting ) {
|
|
// put the iiu into the connected state
|
|
this->state = iiu_connected;
|
|
|
|
// start connection activity watchdog
|
|
this->connectNotify ();
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
int errnoCpy = SOCKERRNO;
|
|
|
|
if ( errnoCpy == SOCK_EINTR ) {
|
|
if ( this->state != iiu_connecting ) {
|
|
this->cancelSendWatchdog ();
|
|
return;
|
|
}
|
|
else {
|
|
continue;
|
|
}
|
|
}
|
|
else if ( errnoCpy == SOCK_SHUTDOWN ) {
|
|
this->cancelSendWatchdog ();
|
|
return;
|
|
}
|
|
else {
|
|
this->cancelSendWatchdog ();
|
|
ca_printf ( "Unable to connect because %d=\"%s\"\n",
|
|
errnoCpy, SOCKERRSTR ( errnoCpy ) );
|
|
this->cleanShutdown ();
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* tcpiiu::cleanShutdown ()
|
|
*/
|
|
void tcpiiu::cleanShutdown ()
|
|
{
|
|
this->cancelSendWatchdog ();
|
|
this->cancelRecvWatchdog ();
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( this->state == iiu_connected ) {
|
|
int status = ::shutdown ( this->sock, SD_BOTH );
|
|
if ( status ) {
|
|
errlogPrintf ("CAC TCP socket shutdown error was %s\n",
|
|
SOCKERRSTR (SOCKERRNO) );
|
|
status = socket_close ( this->sock );
|
|
if ( status ) {
|
|
errlogPrintf ("CAC TCP socket close error was %s\n",
|
|
SOCKERRSTR (SOCKERRNO) );
|
|
}
|
|
else {
|
|
this->sockCloseCompleted = true;
|
|
this->state = iiu_disconnected;
|
|
}
|
|
}
|
|
else {
|
|
this->state = iiu_disconnected;
|
|
}
|
|
|
|
}
|
|
else if ( this->state == iiu_connecting ) {
|
|
int status = socket_close ( this->sock );
|
|
if ( status ) {
|
|
errlogPrintf ("CAC TCP socket close error was %s\n",
|
|
SOCKERRSTR (SOCKERRNO) );
|
|
}
|
|
else {
|
|
this->sockCloseCompleted = true;
|
|
this->state = iiu_disconnected;
|
|
}
|
|
}
|
|
epicsEventSignal ( this->sendThreadFlushSignal );
|
|
epicsEventSignal ( this->recvThreadRingBufferSpaceAvailableSignal );
|
|
this->pCAC ()->signalRecvActivity ();
|
|
}
|
|
|
|
/*
|
|
* tcpiiu::forcedShutdown ()
|
|
*/
|
|
void tcpiiu::forcedShutdown ()
|
|
{
|
|
this->cancelSendWatchdog ();
|
|
this->cancelRecvWatchdog ();
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( this->state != iiu_disconnected ) {
|
|
// force abortive shutdown sequence (discard outstanding sends
|
|
// and receives
|
|
struct linger tmpLinger;
|
|
tmpLinger.l_onoff = true;
|
|
tmpLinger.l_linger = 0u;
|
|
int status = setsockopt ( this->sock, SOL_SOCKET, SO_LINGER,
|
|
reinterpret_cast <char *> ( &tmpLinger ), sizeof (tmpLinger) );
|
|
if ( status != 0 ) {
|
|
errlogPrintf ( "CAC TCP socket linger set error was %s\n",
|
|
SOCKERRSTR (SOCKERRNO) );
|
|
}
|
|
|
|
status = socket_close ( this->sock );
|
|
if ( status ) {
|
|
errlogPrintf ("CAC TCP socket close error was %s\n",
|
|
SOCKERRSTR (SOCKERRNO) );
|
|
}
|
|
else {
|
|
this->state = iiu_disconnected;
|
|
this->sockCloseCompleted = true;
|
|
}
|
|
}
|
|
|
|
epicsEventSignal ( this->sendThreadFlushSignal );
|
|
epicsEventSignal ( this->recvThreadRingBufferSpaceAvailableSignal );
|
|
this->pCAC ()->signalRecvActivity ();
|
|
}
|
|
|
|
void tcpiiu::disconnect ()
|
|
{
|
|
assert ( this->fullyConstructedFlag );
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
this->ioTable.traverse ( &baseNMIU::destroy );
|
|
}
|
|
|
|
this->cleanShutdown ();
|
|
|
|
// wait for send thread to exit
|
|
static const double shutdownDelay = 15.0;
|
|
epicsEventWaitStatus semStat;
|
|
while ( true ) {
|
|
semStat = epicsEventWaitWithTimeout ( this->sendThreadExitSignal, shutdownDelay );
|
|
if ( semStat == epicsEventWaitOK ) {
|
|
break;
|
|
}
|
|
assert ( semStat == epicsEventWaitTimeout );
|
|
if ( ! this->sockCloseCompleted ) {
|
|
printf ( "Gave up waiting for \"shutdown()\" to force send thread to exit after %f sec\n",
|
|
shutdownDelay);
|
|
printf ( "Closing socket\n" );
|
|
int status = socket_close ( this->sock );
|
|
if ( status ) {
|
|
errlogPrintf ("CAC TCP socket close error was %s\n",
|
|
SOCKERRSTR ( SOCKERRNO ) );
|
|
}
|
|
else {
|
|
this->sockCloseCompleted = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
// wait for recv thread to exit
|
|
while ( true ) {
|
|
semStat = epicsEventWaitWithTimeout ( this->recvThreadExitSignal, shutdownDelay );
|
|
if ( semStat == epicsEventWaitOK ) {
|
|
break;
|
|
}
|
|
assert ( semStat == epicsEventWaitTimeout );
|
|
if ( ! this->sockCloseCompleted ) {
|
|
printf ( "Gave up waiting for \"shutdown()\" to force receive thread to exit after %f sec\n",
|
|
shutdownDelay);
|
|
printf ( "Closing socket\n" );
|
|
int status = socket_close ( this->sock );
|
|
if ( status ) {
|
|
errlogPrintf ("CAC TCP socket close error was %s\n",
|
|
SOCKERRSTR ( SOCKERRNO ) );
|
|
}
|
|
else {
|
|
this->sockCloseCompleted = true;
|
|
}
|
|
}
|
|
}
|
|
|
|
if ( ! this->sockCloseCompleted ) {
|
|
int status = socket_close ( this->sock );
|
|
if ( status ) {
|
|
errlogPrintf ("CAC TCP socket close error was %s\n",
|
|
SOCKERRSTR ( SOCKERRNO ) );
|
|
}
|
|
else {
|
|
this->sockCloseCompleted = true;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* free message body cache
|
|
*/
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( this->pCurData ) {
|
|
free ( this->pCurData );
|
|
this->pCurData = 0;
|
|
this->curDataMax = 0u;
|
|
}
|
|
|
|
this->addr.sa.sa_family = AF_UNSPEC;
|
|
|
|
this->minorProtocolVersion = 0u;
|
|
if ( this->pHostNameCache ) {
|
|
this->pHostNameCache->destroy ();
|
|
this->pHostNameCache = 0;
|
|
}
|
|
this->pBHE = 0;
|
|
this->sendQue.clear ();
|
|
this->recvQue.clear ();
|
|
}
|
|
}
|
|
|
|
|
|
//
|
|
// tcpiiu::~tcpiiu ()
|
|
//
|
|
tcpiiu::~tcpiiu ()
|
|
{
|
|
if ( ! this->fullyConstructedFlag ) {
|
|
return;
|
|
}
|
|
|
|
epicsEventDestroy ( this->sendThreadExitSignal );
|
|
epicsEventDestroy ( this->recvThreadExitSignal );
|
|
epicsEventDestroy ( this->sendThreadFlushSignal );
|
|
epicsEventDestroy ( this->recvThreadRingBufferSpaceAvailableSignal );
|
|
|
|
if ( this->pHostNameCache ) {
|
|
this->pHostNameCache->destroy ();
|
|
}
|
|
|
|
// this->pBHE lifetime management is handled by the class that creates this object
|
|
}
|
|
|
|
void tcpiiu::suicide ()
|
|
{
|
|
delete this;
|
|
}
|
|
|
|
bool tcpiiu::isVirtaulCircuit ( const char *pChannelName, const osiSockAddr &addrIn ) const
|
|
{
|
|
osiSockAddr addrTmp = this->addr;
|
|
|
|
if ( addrTmp.sa.sa_family == AF_UNSPEC ) {
|
|
return false;
|
|
}
|
|
|
|
bool match;
|
|
|
|
if ( addrTmp.sa.sa_family != addrIn.sa.sa_family ) {
|
|
match = false;
|
|
}
|
|
else if ( addrTmp.sa.sa_family != AF_INET ) {
|
|
match = false;
|
|
}
|
|
else if ( addrTmp.ia.sin_addr.s_addr != addrIn.ia.sin_addr.s_addr ) {
|
|
match = false;
|
|
}
|
|
else if ( addrTmp.ia.sin_port != addrIn.ia.sin_port ) {
|
|
match = false;
|
|
}
|
|
else {
|
|
match = true;
|
|
}
|
|
|
|
if ( ! match ) {
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
char acc[64];
|
|
if ( this->pHostNameCache ) {
|
|
this->pHostNameCache->hostName ( acc, sizeof ( acc ) );
|
|
assert ( this->pCAC () );
|
|
msgForMultiplyDefinedPV *pMsg = new msgForMultiplyDefinedPV (
|
|
*this->pCAC (), pChannelName, acc, addr );
|
|
if ( pMsg ) {
|
|
this->pCAC ()->ipAddrToAsciiAsynchronousRequestInstall ( *pMsg );
|
|
}
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void tcpiiu::show ( unsigned level ) const
|
|
{
|
|
epicsAutoMutex autoMuext ( this->mutex );
|
|
char buf[256];
|
|
if ( this->pHostNameCache ) {
|
|
this->pHostNameCache->hostName ( buf, sizeof ( buf ) );
|
|
}
|
|
else {
|
|
strncpy ( buf, "<disconnected>", sizeof ( buf ) );
|
|
buf [ sizeof ( buf ) - 1 ] = '\0';
|
|
}
|
|
printf ( "Virtual circuit to \"%s\" at version %u.%u state %u\n",
|
|
buf, CA_PROTOCOL_VERSION, this->minorProtocolVersion,
|
|
this->state );
|
|
if ( level > 1u ) {
|
|
this->netiiu::show ( level - 1u );
|
|
}
|
|
if ( level > 2u ) {
|
|
printf ( "\tcurrent data cache pointer = %p current data cache size = %lu\n",
|
|
this->pCurData, this->curDataMax );
|
|
printf ( "\tcontiguous receive message count=%u, busy detect bool=%u, flow control bool=%u\n",
|
|
this->contigRecvMsgCount, this->busyStateDetected, this->flowControlActive );
|
|
}
|
|
if ( level > 3u ) {
|
|
printf ( "\tvirtual circuit socket identifier %d\n", this->sock );
|
|
printf ( "\tsend thread flush signal:\n" );
|
|
epicsEventShow ( this->sendThreadFlushSignal, level-3u );
|
|
printf ( "\trecv thread buffer space available signal:\n" );
|
|
epicsEventShow ( this->recvThreadRingBufferSpaceAvailableSignal, level-3u );
|
|
printf ( "\tsend thread exit signal:\n" );
|
|
epicsEventShow ( this->sendThreadExitSignal, level-3u );
|
|
printf ( "\trecv thread exit signal:\n" );
|
|
epicsEventShow ( this->recvThreadExitSignal, level-3u );
|
|
printf ( "\tfully constructed bool %u\n", this->fullyConstructedFlag );
|
|
printf ("\techo pending bool = %u\n", this->echoRequestPending );
|
|
printf ("\tflush pending bool = %u\n", this->flushPending );
|
|
printf ("\treceive message header available bool = %u\n", this->msgHeaderAvailable );
|
|
if ( this->pBHE ) {
|
|
this->pBHE->show ( level - 3u );
|
|
}
|
|
::printf ( "IO identifier hash table:\n" );
|
|
this->ioTable.show ( level - 3u );
|
|
}
|
|
}
|
|
|
|
bool tcpiiu::setEchoRequestPending ()
|
|
{
|
|
{
|
|
epicsAutoMutex autoMuext ( this->mutex );
|
|
this->echoRequestPending = true;
|
|
}
|
|
this->flush ();
|
|
if ( CA_V43 (CA_PROTOCOL_VERSION, this->minorProtocolVersion ) ) {
|
|
// we send an echo
|
|
return true;
|
|
}
|
|
else {
|
|
// we send a NOOP
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/*
|
|
* tcpiiu::hostNameSetRequest ()
|
|
*/
|
|
int tcpiiu::hostNameSetRequest ()
|
|
{
|
|
if ( ! CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion ) ) {
|
|
return ECA_NORMAL;
|
|
}
|
|
|
|
const char *pName = localHostNameAtLoadTime.pointer ();
|
|
unsigned size = strlen ( pName ) + 1u;
|
|
unsigned postSize = CA_MESSAGE_ALIGN ( size );
|
|
assert ( postSize < 0xffff );
|
|
|
|
if ( this->sendQue.flushThreshold ( postSize + 16u ) ) {
|
|
this->flush ();
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( postSize + 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
this->sendQue.pushUInt16 ( CA_PROTO_HOST_NAME ); // cmd
|
|
this->sendQue.pushUInt16 ( postSize ); // postsize
|
|
this->sendQue.pushUInt16 ( 0u ); // dataType
|
|
this->sendQue.pushUInt16 ( 0u ); // count
|
|
this->sendQue.pushUInt32 ( static_cast <ca_uint32_t> ( 0u ) ); // cid
|
|
this->sendQue.pushUInt32 ( static_cast <ca_uint32_t> ( 0u ) ); // available
|
|
|
|
this->sendQue.pushString ( pName, size );
|
|
this->sendQue.pushString ( nillBytes, postSize - size );
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
/*
|
|
* tcpiiu::userNameSetRequest ()
|
|
*/
|
|
int tcpiiu::userNameSetRequest ()
|
|
{
|
|
if ( ! CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion ) ) {
|
|
return ECA_NORMAL;
|
|
}
|
|
|
|
const char *pName = this->pCAC ()->userNamePointer ();
|
|
unsigned size = strlen ( pName ) + 1u;
|
|
unsigned postSize = CA_MESSAGE_ALIGN ( size );
|
|
assert ( postSize < 0xffff );
|
|
|
|
if ( this->sendQue.flushThreshold ( postSize + 16u ) ) {
|
|
this->flush ();
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( postSize + 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
this->sendQue.pushUInt16 ( CA_PROTO_CLIENT_NAME ); // cmd
|
|
this->sendQue.pushUInt16 ( postSize ); // postsize
|
|
this->sendQue.pushUInt16 ( 0u ); // dataType
|
|
this->sendQue.pushUInt16 ( 0u ); // count
|
|
this->sendQue.pushUInt32 ( 0u ); // cid
|
|
this->sendQue.pushUInt32 ( 0u ); // available
|
|
|
|
this->sendQue.pushString ( pName, size );
|
|
this->sendQue.pushString ( nillBytes, postSize - size );
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
int tcpiiu::disableFlowControlRequest ()
|
|
{
|
|
if ( this->sendQue.flushThreshold ( 16u ) ) {
|
|
this->flush ();
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_ON ); // cmd
|
|
this->sendQue.pushUInt16 ( 0u ); // postsize
|
|
this->sendQue.pushUInt16 ( 0u ); // dataType
|
|
this->sendQue.pushUInt16 ( 0u ); // count
|
|
this->sendQue.pushUInt32 ( 0u ); // cid
|
|
this->sendQue.pushUInt32 ( 0u ); // available
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
int tcpiiu::enableFlowControlRequest ()
|
|
{
|
|
if ( this->sendQue.flushThreshold ( 16u ) ) {
|
|
this->flush ();
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
this->sendQue.pushUInt16 ( CA_PROTO_EVENTS_OFF ); // cmd
|
|
this->sendQue.pushUInt16 ( 0u ); // postsize
|
|
this->sendQue.pushUInt16 ( 0u ); // dataType
|
|
this->sendQue.pushUInt16 ( 0u ); // count
|
|
this->sendQue.pushUInt32 ( 0u ); // cid
|
|
this->sendQue.pushUInt32 ( 0u ); // available
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
int tcpiiu::noopRequest ()
|
|
{
|
|
if ( this->sendQue.flushThreshold ( 16u ) ) {
|
|
this->flush ();
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
this->sendQue.pushUInt16 ( CA_PROTO_NOOP ); // cmd
|
|
this->sendQue.pushUInt16 ( 0u ); // postsize
|
|
this->sendQue.pushUInt16 ( 0u ); // dataType
|
|
this->sendQue.pushUInt16 ( 0u ); // count
|
|
this->sendQue.pushUInt32 ( 0u ); // cid
|
|
this->sendQue.pushUInt32 ( 0u ); // available
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
int tcpiiu::echoRequest ()
|
|
{
|
|
if ( this->sendQue.flushThreshold ( 16u ) ) {
|
|
this->flush ();
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
this->sendQue.pushUInt16 ( CA_PROTO_ECHO ); // cmd
|
|
this->sendQue.pushUInt16 ( 0u ); // postsize
|
|
this->sendQue.pushUInt16 ( 0u ); // dataType
|
|
this->sendQue.pushUInt16 ( 0u ); // count
|
|
this->sendQue.pushUInt32 ( 0u ); // cid
|
|
this->sendQue.pushUInt32 ( 0u ); // available
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
//
// tcpiiu::noopAction ()
//
// Jump table handler that intentionally does nothing.
//
void tcpiiu::noopAction ()
{
}
|
|
|
|
//
// tcpiiu::echoRespAction ()
//
// Jump table handler for an echo response; no action is taken here.
//
void tcpiiu::echoRespAction ()
{
}
|
|
|
|
void tcpiiu::writeNotifyRespAction ()
|
|
{
|
|
int status = this->curMsg.m_cid;
|
|
if ( status == ECA_NORMAL ) {
|
|
this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available );
|
|
}
|
|
else {
|
|
this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available,
|
|
status, "write notify request rejected" );
|
|
}
|
|
}
|
|
|
|
void tcpiiu::readNotifyRespAction ()
|
|
{
|
|
int v41;
|
|
int status;
|
|
|
|
/*
|
|
* convert the data buffer from net
|
|
* format to host format
|
|
*/
|
|
# ifdef CONVERSION_REQUIRED
|
|
if ( this->curMsg.m_dataType < NELEMENTS ( cac_dbr_cvrt ) ) {
|
|
( *cac_dbr_cvrt[ this->curMsg.m_dataType ] ) (
|
|
this->pCurData, this->pCurData, FALSE, this->curMsg.m_count);
|
|
}
|
|
else {
|
|
this->curMsg.m_cid = htonl ( ECA_BADTYPE );
|
|
}
|
|
# endif
|
|
|
|
/*
|
|
* the channel id field is abused for
|
|
* read notify status starting
|
|
* with CA V4.1
|
|
*/
|
|
v41 = CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion );
|
|
if (v41) {
|
|
status = this->curMsg.m_cid;
|
|
}
|
|
else{
|
|
status = ECA_NORMAL;
|
|
}
|
|
|
|
if ( status == ECA_NORMAL ) {
|
|
this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available,
|
|
this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData );
|
|
}
|
|
else {
|
|
this->ioExceptionNotifyAndDestroy ( this->curMsg.m_available,
|
|
status, "read failed", this->curMsg.m_dataType, this->curMsg.m_count );
|
|
}
|
|
}
|
|
|
|
void tcpiiu::eventRespAction ()
|
|
{
|
|
int v41;
|
|
int status;
|
|
|
|
/*
|
|
* m_postsize = 0 used to be a confirmation, but is
|
|
* now a noop because the above hash lookup will
|
|
* not find a matching IO block
|
|
*/
|
|
if ( ! this->curMsg.m_postsize ) {
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* convert the data buffer from net
|
|
* format to host format
|
|
*/
|
|
# ifdef CONVERSION_REQUIRED
|
|
if ( this->curMsg.m_dataType < NELEMENTS ( cac_dbr_cvrt ) ) {
|
|
( *cac_dbr_cvrt [ this->curMsg.m_dataType ] )(
|
|
this->pCurData, this->pCurData, FALSE,
|
|
this->curMsg.m_count);
|
|
}
|
|
else {
|
|
this->curMsg.m_cid = htonl ( ECA_BADTYPE );
|
|
}
|
|
# endif
|
|
|
|
/*
|
|
* the channel id field is abused for
|
|
* read notify status starting
|
|
* with CA V4.1
|
|
*/
|
|
v41 = CA_V41 ( CA_PROTOCOL_VERSION, this->minorProtocolVersion );
|
|
if (v41) {
|
|
status = this->curMsg.m_cid;
|
|
}
|
|
else {
|
|
status = ECA_NORMAL;
|
|
}
|
|
if ( status == ECA_NORMAL ) {
|
|
this->ioCompletionNotify ( this->curMsg.m_available,
|
|
this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData );
|
|
}
|
|
else {
|
|
this->ioExceptionNotify ( this->curMsg.m_available,
|
|
status, "subscription update failed",
|
|
this->curMsg.m_dataType, this->curMsg.m_count );
|
|
}
|
|
}
|
|
|
|
void tcpiiu::readRespAction ()
|
|
{
|
|
this->ioCompletionNotifyAndDestroy ( this->curMsg.m_available,
|
|
this->curMsg.m_dataType, this->curMsg.m_count, this->pCurData );
|
|
}
|
|
|
|
void tcpiiu::clearChannelRespAction ()
|
|
{
|
|
this->pCAC ()->channelDestroy ( this->curMsg.m_available );
|
|
}
|
|
|
|
void tcpiiu::exceptionRespAction ()
|
|
{
|
|
char context[255];
|
|
char hostName[64];
|
|
caHdr *req = (caHdr *) this->pCurData;
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( this->pHostNameCache ) {
|
|
this->pHostNameCache->hostName ( hostName, sizeof ( hostName ) );
|
|
if ( this->curMsg.m_postsize > sizeof (caHdr) ) {
|
|
sprintf ( context, "detected by: %s for: %s",
|
|
hostName, (char *)(req+1) );
|
|
}
|
|
else{
|
|
sprintf ( context, "for: %s", (char *) ( req + 1 ) );
|
|
}
|
|
}
|
|
else {
|
|
sprintf ( context, "for: %s", (char *) ( req + 1 ) );
|
|
}
|
|
}
|
|
|
|
switch ( ntohs ( req->m_cmmd ) ) {
|
|
case CA_PROTO_READ_NOTIFY:
|
|
this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available),
|
|
ntohl (this->curMsg.m_available), context,
|
|
ntohs (req->m_dataType), ntohs (req->m_count) );
|
|
break;
|
|
case CA_PROTO_READ:
|
|
this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available),
|
|
ntohl (this->curMsg.m_available), context,
|
|
ntohs (req->m_dataType), ntohs (req->m_count) );
|
|
break;
|
|
case CA_PROTO_WRITE_NOTIFY:
|
|
this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available),
|
|
ntohl (this->curMsg.m_available), context,
|
|
ntohs (req->m_dataType), ntohs (req->m_count) );
|
|
break;
|
|
case CA_PROTO_WRITE:
|
|
this->pCAC ()->exceptionNotify ( ntohl ( this->curMsg.m_available),
|
|
context, ntohs (req->m_dataType), ntohs (req->m_count), __FILE__, __LINE__);
|
|
break;
|
|
case CA_PROTO_EVENT_ADD:
|
|
this->ioExceptionNotify ( ntohl (req->m_available),
|
|
ntohl (this->curMsg.m_available), context,
|
|
ntohs (req->m_dataType), ntohs (req->m_count) );
|
|
break;
|
|
case CA_PROTO_EVENT_CANCEL:
|
|
this->ioExceptionNotifyAndDestroy ( ntohl (req->m_available),
|
|
ntohl (this->curMsg.m_available), context );
|
|
break;
|
|
default:
|
|
this->pCAC ()->exceptionNotify (ntohl (this->curMsg.m_available),
|
|
context, __FILE__, __LINE__);
|
|
break;
|
|
}
|
|
}
|
|
|
|
void tcpiiu::accessRightsRespAction ()
|
|
{
|
|
static caar init;
|
|
caar arBitField = init; // shut up bounds checker
|
|
unsigned ar;
|
|
|
|
ar = this->curMsg.m_available;
|
|
arBitField.read_access = ( ar & CA_PROTO_ACCESS_RIGHT_READ ) ? 1 : 0;
|
|
arBitField.write_access = ( ar & CA_PROTO_ACCESS_RIGHT_WRITE ) ? 1 : 0;
|
|
|
|
this->pCAC ()->accessRightsNotify ( this->curMsg.m_cid, arBitField );
|
|
}
|
|
|
|
void tcpiiu::claimCIURespAction ()
|
|
{
|
|
this->pCAC ()->connectChannel ( this->ca_v44_ok (), this->curMsg.m_cid,
|
|
this->curMsg.m_dataType, this->curMsg.m_count, this->curMsg.m_available );
|
|
}
|
|
|
|
void tcpiiu::verifyAndDisconnectChan ()
|
|
{
|
|
this->pCAC ()->disconnectChannel ( this->curMsg.m_cid );
|
|
}
|
|
|
|
void tcpiiu::badTCPRespAction ()
|
|
{
|
|
char hostName[64];
|
|
bool hostNameInit;
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( this->pHostNameCache ) {
|
|
this->pHostNameCache->hostName ( hostName, sizeof ( hostName ) );
|
|
hostNameInit = true;
|
|
}
|
|
else {
|
|
hostNameInit = false;
|
|
}
|
|
}
|
|
|
|
if ( hostNameInit ) {
|
|
ca_printf ( "CAC: Bad response code in TCP message from %s was %u\n",
|
|
hostName, this->curMsg.m_cmmd);
|
|
}
|
|
else {
|
|
ca_printf ( "CAC: Bad response code in TCP message was %u\n",
|
|
this->curMsg.m_cmmd);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* tcpiiu::processIncoming ()
|
|
*/
|
|
void tcpiiu::processIncoming ()
|
|
{
|
|
while ( 1 ) {
|
|
|
|
//
|
|
// fetch a complete message header
|
|
//
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
if ( ! this->msgHeaderAvailable ) {
|
|
|
|
this->msgHeaderAvailable = this->recvQue.copyOutBytes (
|
|
&this->curMsg, sizeof ( this->curMsg ) );
|
|
|
|
if ( ! this->msgHeaderAvailable ) {
|
|
return;
|
|
}
|
|
|
|
//
|
|
// fix endian of bytes
|
|
//
|
|
this->curMsg.m_cmmd = ntohs ( this->curMsg.m_cmmd );
|
|
this->curMsg.m_postsize = ntohs ( this->curMsg.m_postsize );
|
|
this->curMsg.m_dataType = ntohs ( this->curMsg.m_dataType );
|
|
this->curMsg.m_count = ntohs ( this->curMsg.m_count );
|
|
this->curMsg.m_cid = ntohl ( this->curMsg.m_cid );
|
|
this->curMsg.m_available = ntohl ( this->curMsg.m_available );
|
|
|
|
debugPrintf (
|
|
( "%s Cmd=%3d Type=%3d Count=%4d Size=%4d",
|
|
this->pHostName (),
|
|
this->curMsg.m_cmmd,
|
|
this->curMsg.m_dataType,
|
|
this->curMsg.m_count,
|
|
this->curMsg.m_postsize) );
|
|
|
|
debugPrintf (
|
|
( " Avail=%8x Cid=%6d\n",
|
|
this->curMsg.m_available,
|
|
this->curMsg.m_cid) );
|
|
}
|
|
|
|
//
|
|
// dont allow huge msg body until
|
|
// the client library supports it
|
|
//
|
|
if ( this->curMsg.m_postsize > ( unsigned ) MAX_TCP ) {
|
|
this->msgHeaderAvailable = false;
|
|
ca_printf ( "CAC: message body was too large ( disconnecting )\n" );
|
|
this->cleanShutdown ();
|
|
return;
|
|
}
|
|
|
|
//
|
|
// make sure we have a large enough message body cache
|
|
//
|
|
if ( this->curMsg.m_postsize > this->curDataMax ) {
|
|
void *pData;
|
|
size_t cacheSize;
|
|
|
|
/*
|
|
* scalar DBR_STRING is sometimes clipped to the
|
|
* actual string size so make sure this cache is
|
|
* as large as one DBR_STRING so they will
|
|
* not page fault if they read MAX_STRING_SIZE
|
|
* bytes (instead of the actual string size).
|
|
*/
|
|
cacheSize = max ( this->curMsg.m_postsize * 2u, MAX_STRING_SIZE );
|
|
pData = (void *) calloc (1u, cacheSize);
|
|
if ( ! pData ) {
|
|
ca_printf ("CAC: not enough memory for message body cache (disconnecting)\n");
|
|
this->cleanShutdown ();
|
|
return;
|
|
}
|
|
if ( this->pCurData ) {
|
|
free ( this->pCurData );
|
|
}
|
|
this->pCurData = pData;
|
|
this->curDataMax = this->curMsg.m_postsize;
|
|
}
|
|
|
|
if ( this->curMsg.m_postsize > 0u ) {
|
|
bool msgBodyAvailable = this->recvQue.copyOutBytes (
|
|
this->pCurData, this->curMsg.m_postsize );
|
|
if ( ! msgBodyAvailable ) {
|
|
return;
|
|
}
|
|
}
|
|
|
|
epicsEventSignal ( this->recvThreadRingBufferSpaceAvailableSignal );
|
|
}
|
|
|
|
/*
|
|
* execute the response message
|
|
*/
|
|
pProtoStubTCP pStub;
|
|
if ( this->curMsg.m_cmmd >= NELEMENTS ( tcpJumpTableCAC ) ) {
|
|
pStub = &tcpiiu::badTCPRespAction;
|
|
}
|
|
else {
|
|
pStub = tcpJumpTableCAC [this->curMsg.m_cmmd];
|
|
}
|
|
( this->*pStub ) ();
|
|
|
|
this->msgHeaderAvailable = false;
|
|
}
|
|
}
|
|
|
|
inline int tcpiiu::requestStubStatus ()
|
|
{
|
|
if ( this->state == iiu_connected ) {
|
|
return ECA_NORMAL;
|
|
}
|
|
else {
|
|
return ECA_DISCONNCHID;
|
|
}
|
|
}
|
|
|
|
//
// tcpiiu::writeRequest ()
//
// Queues a CA_PROTO_WRITE message (16 byte header followed by the
// value padded out to the message alignment) for channel "chan".
//
// Returns ECA_BADTYPE if the send queue rejects "type", ECA_BADCOUNT
// if "nElem" or the padded payload does not fit into 16 bits,
// ECA_DISCONNCHID if the channel is not connected through this
// circuit, otherwise the status returned by reserveSpace ().
//
int tcpiiu::writeRequest ( nciu &chan, unsigned type, unsigned nElem, const void *pValue )
{
    unsigned size, postcnt;
    bool stringOptim;

    if ( ! this->sendQue.dbr_type_ok ( type ) ) {
        return ECA_BADTYPE;
    }

    if ( nElem > 0xffff) {
        return ECA_BADCOUNT;
    }

    if ( type == DBR_STRING && nElem == 1 ) {
        // a scalar string is clipped to its actual length (plus the nil)
        const char *pstr = static_cast < const char * > ( pValue );
        size = strlen ( pstr ) + 1;
        stringOptim = true;
    }
    else {
        size = dbr_size_n ( type, nElem );
        stringOptim = false;
    }

    postcnt = CA_MESSAGE_ALIGN ( size );
    if ( postcnt > 0xffff ) {
        return ECA_BADCOUNT;
    }

    // flush before taking the primary lock
    if ( this->sendQue.flushThreshold ( postcnt + 16u ) ) {
        this->flushToWire ( true );
    }

    epicsAutoMutex autoMutex ( this->mutex );

    int status = this->sendQue.reserveSpace ( postcnt + 16u );
    if ( status == ECA_NORMAL ) {
        if ( ! chan.verifyConnected ( *this ) ) {
            status = ECA_DISCONNCHID;
        }
        else {
            this->sendQue.pushUInt16 ( CA_PROTO_WRITE ); // cmd
            this->sendQue.pushUInt16 ( postcnt ); // postsize
            this->sendQue.pushUInt16 ( type ); // dataType
            this->sendQue.pushUInt16 ( nElem ); // count
            this->sendQue.pushUInt32 ( chan.getSID () ); // cid
            this->sendQue.pushUInt32 ( ~0UL ); // available
            if ( stringOptim ) {
                this->sendQue.pushString ( static_cast < const char * > ( pValue ), size );
            }
            else {
                this->sendQue.push_dbr_type ( type, pValue, nElem );
            }
            // nill pad bytes up to the message alignment
            this->sendQue.pushString ( nillBytes, postcnt - size );
        }
    }

    return status;
}
|
|
|
|
int tcpiiu::writeNotifyRequest ( nciu &chan, cacNotify ¬ify, unsigned type,
|
|
unsigned nElem, const void *pValue )
|
|
{
|
|
ca_uint32_t size, postcnt;
|
|
bool stringOptim;
|
|
|
|
if ( ! this->ca_v41_ok () ) {
|
|
return ECA_NOSUPPORT;
|
|
}
|
|
|
|
if ( ! this->sendQue.dbr_type_ok ( type ) ) {
|
|
return ECA_BADTYPE;
|
|
}
|
|
|
|
if ( nElem > 0xffff ) {
|
|
return ECA_BADCOUNT;
|
|
}
|
|
|
|
if ( type == DBR_STRING && nElem == 1 ) {
|
|
char *pstr = (char *) pValue;
|
|
size = strlen ( pstr ) +1;
|
|
stringOptim = true;
|
|
}
|
|
else {
|
|
size = dbr_size_n ( type, nElem );
|
|
stringOptim = false;
|
|
}
|
|
postcnt = CA_MESSAGE_ALIGN ( size );
|
|
if ( postcnt > 0xffff ) {
|
|
return ECA_BADCOUNT;
|
|
}
|
|
|
|
if ( this->sendQue.flushThreshold ( postcnt + 16u ) ) {
|
|
this->flushToWire ( true );
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( postcnt + 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
if ( ! chan.verifyConnected ( *this ) ) {
|
|
status = ECA_DISCONNCHID;
|
|
}
|
|
else {
|
|
netWriteNotifyIO * pIO = new netWriteNotifyIO ( chan, notify );
|
|
if ( ! pIO ) {
|
|
status = ECA_ALLOCMEM;
|
|
}
|
|
else {
|
|
this->ioTable.add ( *pIO );
|
|
chan.tcpiiuPrivateListOfIO::eventq.add ( *pIO );
|
|
this->sendQue.pushUInt16 ( CA_PROTO_WRITE_NOTIFY ); // cmd
|
|
this->sendQue.pushUInt16 ( postcnt ); // postsize
|
|
this->sendQue.pushUInt16 ( type ); // dataType
|
|
this->sendQue.pushUInt16 ( nElem ); // count
|
|
this->sendQue.pushUInt32 ( chan.getSID () ); // cid
|
|
this->sendQue.pushUInt32 ( pIO->getID () ); // available
|
|
if ( stringOptim ) {
|
|
this->sendQue.pushString ( static_cast < const char * > ( pValue ), size );
|
|
}
|
|
else {
|
|
this->sendQue.push_dbr_type ( type, pValue, nElem );
|
|
}
|
|
this->sendQue.pushString ( nillBytes, postcnt - size );
|
|
}
|
|
}
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
//
// tcpiiu::readCopyRequest ()
//
// Queues a CA_PROTO_READ message (header only) for channel "chan"
// and installs a netReadCopyIO, constructed with "pValue" and whose
// id is sent in the "available" field, in the IO table and in the
// channel's IO list.
//
// Returns ECA_BADCOUNT / ECA_BADTYPE if "nElem" / "type" does not
// fit into 16 bits, ECA_DISCONNCHID if the channel is not connected
// through this circuit, ECA_ALLOCMEM if the IO block could not be
// allocated, otherwise the status returned by reserveSpace ().
//
int tcpiiu::readCopyRequest ( nciu &chan, unsigned type, unsigned nElem, void *pValue )
{
    if ( nElem > 0xffff) {
        return ECA_BADCOUNT;
    }
    if ( type > 0xffff) {
        return ECA_BADTYPE;
    }

    // flush before taking the primary lock
    if ( this->sendQue.flushThreshold ( 16u ) ) {
        this->flushToWire ( true );
    }

    epicsAutoMutex autoMutex ( this->mutex );

    const int reserveStatus = this->sendQue.reserveSpace ( 16u );
    if ( reserveStatus != ECA_NORMAL ) {
        return reserveStatus;
    }

    if ( ! chan.verifyConnected ( *this ) ) {
        return ECA_DISCONNCHID;
    }

    netReadCopyIO *pReadIO = new netReadCopyIO ( chan, type, nElem, pValue,
        this->pCAC ()->readSequenceOfOutstandingIO () );
    if ( ! pReadIO ) {
        return ECA_ALLOCMEM;
    }

    this->ioTable.add ( *pReadIO );
    chan.tcpiiuPrivateListOfIO::eventq.add ( *pReadIO );

    this->sendQue.pushUInt16 ( CA_PROTO_READ ); // cmd
    this->sendQue.pushUInt16 ( 0u ); // postsize
    this->sendQue.pushUInt16 ( type ); // dataType
    this->sendQue.pushUInt16 ( nElem ); // count
    this->sendQue.pushUInt32 ( chan.getSID () ); // cid
    this->sendQue.pushUInt32 ( pReadIO->getID () ); // available

    return reserveStatus;
}
|
|
|
|
int tcpiiu::readNotifyRequest ( nciu &chan, cacNotify ¬ify,
|
|
unsigned type, unsigned nElem )
|
|
{
|
|
if ( nElem > 0xffff) {
|
|
return ECA_BADCOUNT;
|
|
}
|
|
if ( type > 0xffff) {
|
|
return ECA_BADTYPE;
|
|
}
|
|
|
|
if ( this->sendQue.flushThreshold ( 16u ) ) {
|
|
this->flushToWire ( true );
|
|
}
|
|
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
|
|
int status = this->sendQue.reserveSpace ( 16u );
|
|
if ( status == ECA_NORMAL ) {
|
|
if ( ! chan.verifyConnected ( *this ) ) {
|
|
status = ECA_DISCONNCHID;
|
|
}
|
|
else {
|
|
netReadNotifyIO *pIO = new netReadNotifyIO ( chan, notify );
|
|
if ( ! pIO ) {
|
|
status = ECA_ALLOCMEM;
|
|
}
|
|
else {
|
|
this->ioTable.add ( *pIO );
|
|
chan.tcpiiuPrivateListOfIO::eventq.add ( *pIO );
|
|
this->sendQue.pushUInt16 ( CA_PROTO_READ_NOTIFY ); // cmd
|
|
this->sendQue.pushUInt16 ( 0u ); // postsize
|
|
this->sendQue.pushUInt16 ( type ); // dataType
|
|
this->sendQue.pushUInt16 ( nElem ); // count
|
|
this->sendQue.pushUInt32 ( chan.getSID () ); // cid
|
|
this->sendQue.pushUInt32 ( pIO->getID () ); // available
|
|
}
|
|
}
|
|
}
|
|
|
|
return status;
|
|
}
|
|
|
|
//
// tcpiiu::createChannelRequest ()
//
// Queues a CA_PROTO_CLAIM_CIU message for channel "chan". When
// ca_v44_ok () is true the message carries the channel's CID and its
// name (padded to the message alignment), otherwise it carries the
// channel's SID and no name.
//
// Returns ECA_INTERNAL if the padded name does not fit into 16 bits,
// ECA_DISCONNCHID if chan.verifyIIU () fails for this circuit,
// otherwise the status returned by reserveSpace ().
//
int tcpiiu::createChannelRequest ( nciu &chan )
{
    const char *pChanName = 0;
    unsigned chanNameLen = 0u;
    ca_uint32_t identity;

    if ( this->ca_v44_ok () ) {
        identity = chan.getCID ();
        pChanName = chan.pName ();
        chanNameLen = chan.nameLen ();
    }
    else {
        identity = chan.getSID ();
    }

    const unsigned paddedLen = CA_MESSAGE_ALIGN ( chanNameLen );
    if ( paddedLen > 0xffff ) {
        return ECA_INTERNAL;
    }

    const unsigned msgSize = paddedLen + 16u;

    if ( this->sendQue.flushThreshold ( msgSize ) ) {
        this->flush ();
    }

    epicsAutoMutex autoMutex ( this->mutex );

    const int reserveStatus = this->sendQue.reserveSpace ( msgSize );
    if ( reserveStatus != ECA_NORMAL ) {
        return reserveStatus;
    }

    if ( ! chan.verifyIIU ( *this ) ) {
        return ECA_DISCONNCHID;
    }

    this->sendQue.pushUInt16 ( CA_PROTO_CLAIM_CIU ); // cmd
    this->sendQue.pushUInt16 ( paddedLen ); // postsize
    this->sendQue.pushUInt16 ( 0u ); // dataType
    this->sendQue.pushUInt16 ( 0u ); // count
    this->sendQue.pushUInt32 ( identity ); // cid
    //
    // The available field is used (abused)
    // here to communicate the minor version number
    // starting with CA 4.1.
    //
    this->sendQue.pushUInt32 ( CA_MINOR_VERSION ); // available

    if ( chanNameLen ) {
        this->sendQue.pushString ( pChanName, chanNameLen );
    }
    if ( paddedLen > chanNameLen ) {
        // nill pad bytes up to the message alignment
        this->sendQue.pushString ( nillBytes, paddedLen - chanNameLen );
    }

    return reserveStatus;
}
|
|
|
|
//
// tcpiiu::clearChannelRequest ()
//
// Removes every IO block still on the channel's IO list from the IO
// table, calls subscriptionCancelMsg () on it and deletes it, then
// queues a CA_PROTO_CLEAR_CHANNEL message for channel "chan".
//
// Returns ECA_DISCONNCHID if the channel is not connected through
// this circuit, otherwise the status returned by reserveSpace ().
//
int tcpiiu::clearChannelRequest ( nciu &chan )
{
    // flush before taking the primary lock
    if ( this->sendQue.flushThreshold ( 16u ) ) {
        this->flushToWire ( true );
    }

    epicsAutoMutex autoMutex ( this->mutex );

    const int reserveStatus = this->sendQue.reserveSpace ( 16u );
    if ( reserveStatus != ECA_NORMAL ) {
        return reserveStatus;
    }

    if ( ! chan.verifyConnected ( *this ) ) {
        return ECA_DISCONNCHID;
    }

    // drain the channel's list of outstanding IO
    for ( ;; ) {
        baseNMIU *pIO = chan.tcpiiuPrivateListOfIO::eventq.get ();
        if ( ! pIO ) {
            break;
        }
        baseNMIU *pFound = this->ioTable.remove ( *pIO );
        assert ( pFound == pIO );
        pIO->subscriptionCancelMsg ();
        delete pIO;
    }

    this->sendQue.pushUInt16 ( CA_PROTO_CLEAR_CHANNEL ); // cmd
    this->sendQue.pushUInt16 ( 0u ); // postsize
    this->sendQue.pushUInt16 ( 0u ); // dataType
    this->sendQue.pushUInt16 ( 0u ); // count
    this->sendQue.pushUInt32 ( chan.getSID () ); // cid
    this->sendQue.pushUInt32 ( chan.getCID () ); // available

    return reserveStatus;
}
|
|
|
|
//
// tcpiiu::subscriptionRequest ()
//
// Queues a CA_PROTO_EVENT_ADD message (16 byte header plus 16 byte
// extension) for "subscr" and installs the subscription in the IO
// table and in its channel's IO list. "userThread" selects between
// flushToWire ( true ) and flush () when the flush threshold is
// reached.
//
// Returns ECA_BADCOUNT / ECA_BADTYPE / ECA_BADMASK if the
// subscription's count / type / mask does not fit into 16 bits,
// otherwise the status returned by reserveSpace ().
//
int tcpiiu::subscriptionRequest ( netSubscription &subscr, bool userThread )
{
    if ( subscr.getCount () > 0xffff ) {
        return ECA_BADCOUNT;
    }
    if ( subscr.getType () > 0xffff ) {
        return ECA_BADTYPE;
    }
    if ( subscr.getMask () > 0xffff ) {
        return ECA_BADMASK;
    }

    // flush before taking the primary lock
    if ( this->sendQue.flushThreshold ( 32u ) ) {
        if ( ! userThread ) {
            this->flush ();
        }
        else {
            this->flushToWire ( true );
        }
    }

    epicsAutoMutex autoMutex ( this->mutex );

    const int reserveStatus = this->sendQue.reserveSpace ( 32u );
    if ( reserveStatus != ECA_NORMAL ) {
        return reserveStatus;
    }

    if ( ! subscr.channel ().verifyConnected ( *this ) ) {
        // Nothing is queued, but success (ECA_NORMAL) is reported.
        // NOTE(review): the other request stubs return ECA_DISCONNCHID
        // here - confirm that this difference is intentional.
        return reserveStatus;
    }

    this->ioTable.add ( subscr );
    subscr.channel ().tcpiiuPrivateListOfIO::eventq.add ( subscr );

    // header
    this->sendQue.pushUInt16 ( CA_PROTO_EVENT_ADD ); // cmd
    this->sendQue.pushUInt16 ( 16u ); // postsize
    this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType
    this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getCount () ) ); // count
    this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid
    this->sendQue.pushUInt32 ( subscr.getID () ); // available

    // extension
    this->sendQue.pushFloat32 ( 0.0 ); // m_lval
    this->sendQue.pushFloat32 ( 0.0 ); // m_hval
    this->sendQue.pushFloat32 ( 0.0 ); // m_toval
    this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getMask () ) ); // m_mask
    this->sendQue.pushUInt16 ( 0u ); // m_pad

    return reserveStatus;
}
|
|
|
|
//
// tcpiiu::subscriptionCancelRequest ()
//
// Queues a CA_PROTO_EVENT_CANCEL message (header only) for "subscr".
//
// Returns ECA_DISCONNCHID if the subscription's channel is not
// connected through this circuit, otherwise the status returned by
// reserveSpace ().
//
int tcpiiu::subscriptionCancelRequest ( netSubscription &subscr )
{
    // flush before taking the primary lock
    if ( this->sendQue.flushThreshold ( 16u ) ) {
        this->flushToWire ( true );
    }

    epicsAutoMutex autoMutex ( this->mutex );

    const int reserveStatus = this->sendQue.reserveSpace ( 16u );
    if ( reserveStatus != ECA_NORMAL ) {
        return reserveStatus;
    }

    if ( ! subscr.channel ().verifyConnected ( *this ) ) {
        return ECA_DISCONNCHID;
    }

    this->sendQue.pushUInt16 ( CA_PROTO_EVENT_CANCEL ); // cmd
    this->sendQue.pushUInt16 ( 0u ); // postsize
    this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getType () ) ); // dataType
    this->sendQue.pushUInt16 ( static_cast < ca_uint16_t > ( subscr.getCount () ) ); // count
    this->sendQue.pushUInt32 ( subscr.channel ().getSID () ); // cid
    this->sendQue.pushUInt32 ( subscr.getID () ); // available

    return reserveStatus;
}
|
|
|
|
void tcpiiu::lastChannelDetachNotify ()
|
|
{
|
|
this->cleanShutdown ();
|
|
}
|
|
|
|
bool tcpiiu::flushToWire ( bool userThread )
|
|
{
|
|
bool success = true;
|
|
|
|
// the recv thread is not permitted to flush as this
|
|
// can result in a push / pull deadlock on the TCP pipe,
|
|
// but in that case we still schedual the flush through
|
|
// the higher priority send thread
|
|
if ( ! pCAC ()->flushPermit () ) {
|
|
this->flush ();
|
|
return true;
|
|
}
|
|
|
|
// enable callback processing prior to taking the flush lock
|
|
if ( userThread ) {
|
|
this->pCAC ()->enableCallbackPreemption ();
|
|
}
|
|
|
|
// only one thread at a time can perform a flush. Nevertheless,
|
|
// the primary lock must not be held while sending in order
|
|
// to prevent push pull deadlocks
|
|
epicsAutoMutex autoFlushMutex ( this->flushMutex );
|
|
|
|
while ( true ) {
|
|
comBuf * pBuf;
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
pBuf = this->sendQue.popNextComBufToSend ();
|
|
}
|
|
|
|
if ( ! pBuf ) {
|
|
break;
|
|
}
|
|
|
|
success = pBuf->flushToWire ( *this );
|
|
|
|
pBuf->destroy ();
|
|
|
|
if ( ! success ) {
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
while ( ( pBuf = this->sendQue.popNextComBufToSend () ) ) {
|
|
pBuf->destroy ();
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
if ( userThread ) {
|
|
this->pCAC ()->disableCallbackPreemption ();
|
|
}
|
|
|
|
return success;
|
|
}
|
|
|
|
void tcpiiu::ioCompletionNotify ( unsigned id, unsigned type,
|
|
unsigned long count, const void *pData )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
baseNMIU * pmiu = this->ioTable.lookup ( id );
|
|
if ( pmiu ) {
|
|
pmiu->completionNotify ( type, count, pData );
|
|
}
|
|
}
|
|
|
|
void tcpiiu::ioExceptionNotify ( unsigned id, int status, const char *pContext )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
baseNMIU * pmiu = this->ioTable.lookup ( id );
|
|
if ( pmiu ) {
|
|
pmiu->exceptionNotify ( status, pContext );
|
|
}
|
|
}
|
|
|
|
void tcpiiu::ioExceptionNotify ( unsigned id, int status,
|
|
const char *pContext, unsigned type, unsigned long count )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
baseNMIU * pmiu = this->ioTable.lookup ( id );
|
|
if ( pmiu ) {
|
|
pmiu->exceptionNotify ( status, pContext, type, count );
|
|
}
|
|
}
|
|
|
|
void tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id )
|
|
{
|
|
baseNMIU * pmiu;
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
pmiu = this->ioTable.remove ( id );
|
|
if ( pmiu ) {
|
|
pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu );
|
|
}
|
|
}
|
|
|
|
if ( pmiu ) {
|
|
pmiu->completionNotify ();
|
|
delete pmiu;
|
|
}
|
|
}
|
|
|
|
void tcpiiu::ioCompletionNotifyAndDestroy ( unsigned id,
|
|
unsigned type, unsigned long count, const void *pData )
|
|
{
|
|
baseNMIU * pmiu;
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
pmiu = this->ioTable.remove ( id );
|
|
if ( pmiu ) {
|
|
pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu );
|
|
}
|
|
}
|
|
|
|
if ( pmiu ) {
|
|
pmiu->completionNotify ( type, count, pData );
|
|
delete pmiu;
|
|
}
|
|
}
|
|
|
|
void tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status, const char *pContext )
|
|
{
|
|
baseNMIU * pmiu;
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
pmiu = this->ioTable.remove ( id );
|
|
if ( pmiu ) {
|
|
pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu );
|
|
}
|
|
}
|
|
|
|
if ( pmiu ) {
|
|
pmiu->exceptionNotify ( status, pContext );
|
|
delete pmiu;
|
|
}
|
|
}
|
|
|
|
void tcpiiu::ioExceptionNotifyAndDestroy ( unsigned id, int status,
|
|
const char *pContext, unsigned type, unsigned long count )
|
|
{
|
|
baseNMIU * pmiu;
|
|
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
pmiu = this->ioTable.remove ( id );
|
|
if ( pmiu ) {
|
|
pmiu->channel ().tcpiiuPrivateListOfIO::eventq.remove ( *pmiu );
|
|
}
|
|
}
|
|
|
|
if ( pmiu ) {
|
|
pmiu->exceptionNotify ( status, pContext, type, count );
|
|
delete pmiu;
|
|
}
|
|
}
|
|
|
|
|
|
// resubscribe for monitors from this channel
|
|
void tcpiiu::subscribeAllIO ( nciu &chan )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( chan.verifyConnected ( *this ) ) {
|
|
tsDLIterBD < baseNMIU > iter =
|
|
chan.tcpiiuPrivateListOfIO::eventq.first ();
|
|
while ( iter.valid () ) {
|
|
iter->subscriptionMsg ();
|
|
iter++;
|
|
}
|
|
}
|
|
}
|
|
|
|
// cancel IO operations and monitor subscriptions
|
|
void tcpiiu::disconnectAllIO ( nciu &chan )
|
|
{
|
|
epicsAutoMutex autoMutex ( this->mutex );
|
|
if ( chan.verifyConnected ( *this ) ) {
|
|
tsDLIterBD < baseNMIU > iter =
|
|
chan.tcpiiuPrivateListOfIO::eventq.first ();
|
|
while ( iter.valid () ) {
|
|
tsDLIterBD < baseNMIU > next = iter.itemAfter ();
|
|
this->ioTable.remove ( *iter );
|
|
if ( ! iter->isSubscription () ) {
|
|
iter->exceptionNotify ( ECA_DISCONN, this->pHostName () );
|
|
chan.tcpiiuPrivateListOfIO::eventq.remove ( *iter );
|
|
iter->destroy ();
|
|
}
|
|
iter = next;
|
|
}
|
|
}
|
|
}
|
|
|
|
//
// tcpiiu::unistallSubscription ()
//
// Removes "subscr" from the IO table and, if it was found there,
// also from the IO list of "chan". Nothing is done unless the
// channel is connected through this circuit.
//
void tcpiiu::unistallSubscription ( nciu &chan, netSubscription &subscr )
{
    epicsAutoMutex autoMutex ( this->mutex );

    if ( ! chan.verifyConnected ( *this ) ) {
        return;
    }

    baseNMIU *pRemoved = this->ioTable.remove ( subscr );
    if ( ! pRemoved ) {
        return;
    }

    chan.tcpiiuPrivateListOfIO::eventq.remove ( subscr );
}
|