// File: cdev-1.7.2n/extensions/cdevGenericServer/lib/ServerInterface.cc
// Listing metadata: 2022-12-13 12:44:04 +01:00, 507 lines, 18 KiB, C++,
// executable file

#include "cdevClock.h"
#include "ServerInterface.h"
// Definition of the static cdevReactor member of ServerInterface. The flush,
// pend and poll methods in this file use it to check handlers and to dispatch
// events.
cdevReactor ServerInterface::Reactor;
// *****************************************************************************
// * ServerConnectionList::ServerConnectionList :
// * Builds an empty connection list. Room for ALLOCATION_COUNT
// * ServerHandler pointers is allocated and the whole array is zero-filled,
// * so every slot starts out unused.
// *****************************************************************************
ServerConnectionList::ServerConnectionList ( void )
{
	maxItems = ALLOCATION_COUNT;

	// The same byte count is needed for the allocation and for the clear.
	const size_t bytes = maxItems*sizeof(ServerHandler *);

	items = (ServerHandler **)malloc(bytes);
	memset(items, 0, bytes);
}
// *****************************************************************************
// * ServerConnectionList::~ServerConnectionList :
// * Releases the pointer array owned by the list. The ServerHandler
// * objects that the slots refer to are not deleted here.
// *****************************************************************************
ServerConnectionList::~ServerConnectionList ( void )
{
	// The array came from malloc/realloc, so it goes back through free.
	free(items);
}
// *****************************************************************************
// * ServerConnectionList::find :
// * Searches the list for the ServerHandler whose getServer() string
// * compares equal to the specified server name.
// *
// * Returns the matching handler, or NULL if no handler matches. The
// * server argument is passed directly to strcmp.
// *****************************************************************************
ServerHandler * ServerConnectionList::find ( char * server )
{
	for(int slot=0; slot<maxItems; slot++)
	{
		ServerHandler * candidate = items[slot];

		// The first unused slot ends the search.
		if(candidate==NULL) break;

		if(strcmp(server, candidate->getServer())==0) return candidate;
	}
	return (ServerHandler *)NULL;
}
// *****************************************************************************
// * ServerConnectionList::insert :
// * Adds the ServerHandler specified by handler to the list unless the
// * same pointer, or another handler reporting the same server name, is
// * already stored there. The array is doubled in size when every slot
// * is in use.
// *
// * Returns 0 if the handler was stored. Returns -1 if handler is NULL,
// * if a matching entry already exists, or if the array could not be
// * enlarged (in which case the list is left unchanged).
// *****************************************************************************
int ServerConnectionList::insert ( ServerHandler * handler )
{
	if(handler==NULL) return -1;

	char * server = handler->getServer();
	int    i;

	// Stop at the first free slot, at the handler itself, or at an entry
	// that is registered under the same server name.
	for(i=0;
	    i<maxItems &&
	    items[i]!=NULL &&
	    items[i]!=handler &&
	    strcmp(server, items[i]->getServer());
	    i++);

	if(i>=maxItems)
	{
		// Grow through a temporary pointer: if realloc fails the
		// original array is still valid and must not be lost.
		ServerHandler ** grown =
			(ServerHandler **)realloc(items, 2*maxItems*sizeof(ServerHandler *));
		if(grown==NULL) return -1;

		// Only the newly added upper half needs to be cleared.
		memset(&grown[maxItems], 0, maxItems*sizeof(ServerHandler *));
		items     = grown;
		maxItems *= 2;
	}

	// A non-NULL slot here means that a duplicate stopped the search.
	if(items[i]!=NULL) return -1;

	items[i] = handler;
	return 0;
}
// *****************************************************************************
// * ServerConnectionList::remove :
// * Removes the ServerHandler object specified by handler if it exists in
// * the list. The entries that follow it are shifted down by one slot so
// * that the used portion of the array stays contiguous. The handler
// * itself is not deleted.
// *
// * Returns 0 if the handler was removed, or -1 if handler is NULL or is
// * not in the list.
// *****************************************************************************
int ServerConnectionList::remove ( ServerHandler * handler )
{
	if(handler==NULL) return -1;

	int i;
	for(i=0; i<maxItems && items[i]!=NULL && items[i]!=handler; i++);

	if(i>=maxItems || items[i]!=handler) return -1;

	// Close the gap by moving the tail of the array down one position.
	const int tail = maxItems-(i+1);
	if(tail>0) memmove(&items[i], &items[i+1], tail*sizeof(ServerHandler *));

	// The shift does not touch the final slot. When the array was
	// completely full that slot would still hold a copy of the last
	// handler, so it is cleared explicitly to avoid a duplicate entry.
	items[maxItems-1] = NULL;

	return 0;
}
// *****************************************************************************
// * ServerConnectionList::remove :
// * Looks up the ServerHandler registered under the specified server name
// * with find(), takes it out of the list and returns it. Returns NULL
// * if no handler matched. The handler is not deleted.
// *****************************************************************************
ServerHandler * ServerConnectionList::remove ( char * server )
{
	ServerHandler * const match = find(server);

	if(match==NULL) return match;

	remove(match);
	return match;
}
// *****************************************************************************
// * ServerInterface::ServerInterface :
// * Constructor for the ServerInterface class. It starts with no default
// * server and no default ServerHandler, and allocates the array that
// * getFd() fills with file descriptors (initially maxFd = 32 entries).
// *****************************************************************************
ServerInterface::ServerInterface ( void )
	: connections(),
	  connectionQueues(),
	  defaultServer(NULL),
	  defaultServerHandler(NULL),
	  maxFd(32)
{
	// Sized from maxFd, which the initializer list has just set.
	fdList = (int *)malloc(maxFd*sizeof(int));
}
// *****************************************************************************
// * ServerInterface::~ServerInterface :
// * Destructor for the class. It releases the default server name and
// * the file descriptor array, deletes every ServerHandler that is still
// * in the connection list, and deletes every queue (and any packets that
// * remain in it) stored in connectionQueues.
// *****************************************************************************
ServerInterface::~ServerInterface ( void )
{
	// defaultServer is created with strdup() in setDefault, so it must be
	// released with free() rather than delete. free(NULL) is a no-op.
	free(defaultServer);
	free(fdList);

	// *********************************************************************
	// * Remove each of the ServerHandlers from the list and delete them.
	// *
	// * Note (from the original author): when a ServerHandler is deleted
	// * it submits a FAILED_TO_SEND message for each packet that it could
	// * not send. Because destroying this class implies that the
	// * cdevService is being destroyed as well, the pending packets are
	// * drained and deleted first so nothing is forwarded.
	// *
	// * NOTE(review): packets are released with scalar delete, as in the
	// * original code. How they are allocated is not visible in this
	// * file - confirm that this matches the allocation.
	// *********************************************************************
	ServerHandler * handler;
	char *          packet;
	size_t          len;

	while((handler = connections[0])!=NULL)
	{
		while(handler->dequeue(&packet, &len)==0) delete packet;
		connections.remove(handler);
		delete handler;
	}

	// *********************************************************************
	// * Walk the per-server queues and discard every message that was
	// * never transmitted. Each pass removes the first entry and then
	// * restarts the iterator, because the table is modified while it is
	// * being traversed.
	// *********************************************************************
	StringHashIterator iter(&connectionQueues);
	char *             key;

	iter.first();
	while((key = iter.key())!=NULL)
	{
		FifoQueue * queue = (FifoQueue *)iter.data();
		connectionQueues.remove(key);

		// flush() treats a NULL queue as possible, so do the same here.
		if(queue!=NULL)
		{
			while(queue->dequeue(&packet, &len)==0) delete packet;
			delete queue;
		}
		iter.first();
	}
}
// *****************************************************************************
// * ServerInterface::getDefault :
// * Returns the name of the default server, or NULL if none has been set.
// * The pointer refers to the string held by this object - it is not a
// * copy.
// *****************************************************************************
char * ServerInterface::getDefault ( void )
{
	char * current = defaultServer;
	return current;
}
// *****************************************************************************
// * ServerInterface::setDefault :
// * Sets the name of the default server. Any previous default name is
// * released and the default ServerHandler is cleared. If Default is a
// * non-empty string, a private copy of it becomes the new default name
// * and connect() is called to obtain the default ServerHandler.
// *
// * Passing NULL or an empty string simply clears the current default.
// *****************************************************************************
void ServerInterface::setDefault ( char * Default )
{
	// Copy the new name BEFORE releasing the old one: the caller may pass
	// the pointer returned by getDefault(), which is the very string that
	// is about to be freed.
	char * replacement = (Default!=NULL && *Default)?strdup(Default):(char *)NULL;

	if(defaultServer!=NULL)
	{
		// The name was allocated with strdup(), so it is released
		// with free() rather than delete.
		free(defaultServer);
		defaultServer        = NULL;
		defaultServerHandler = NULL;
	}

	// replacement is also NULL if strdup() failed; in that case the
	// interface is left without a default server.
	if(replacement!=NULL)
	{
		defaultServer        = replacement;
		defaultServerHandler = connect(defaultServer);
	}
}
// *****************************************************************************
// * ServerInterface::connect :
// * Stub implementation that is meant to be overridden by the developer
// * in a child class. It ignores its arguments and always returns NULL.
// *
// * The callers in this file pass a server name as the first argument;
// * the meaning of the remaining two parameters is not visible here.
// *****************************************************************************
ServerHandler * ServerInterface::connect ( char *, char *, unsigned short )
{
	return (ServerHandler *)NULL;
}
// *****************************************************************************
// * ServerInterface::disconnect :
// * Stub implementation that is meant to be overridden by the developer
// * in a child class. It ignores its argument and always returns NULL.
// *****************************************************************************
ServerHandler * ServerInterface::disconnect ( char * )
{
	return (ServerHandler *)NULL;
}
// *****************************************************************************
// * ServerInterface::enqueue :
// * Called by a client that has already obtained the ServerHandler for
// * the server it wants to talk to. The binary buffer is handed to that
// * handler's enqueue method.
// *
// * If handler is NULL the default ServerHandler is used. If there is no
// * default handler yet but a default server name exists, connect() is
// * called to create one, and the result is remembered as the default.
// *
// * Returns CDEV_SUCCESS once the buffer has been handed to a handler,
// * or CDEV_ERROR (after reporting the problem through outputError) when
// * no handler could be determined.
// *****************************************************************************
int ServerInterface::enqueue( ServerHandler * handler, char * binary, size_t binaryLen )
{
	// Fall back to the default handler when none was supplied.
	if(handler==NULL) handler = defaultServerHandler;

	// Still nothing - try to establish the default connection now.
	if(handler==NULL && defaultServer!=NULL)
	{
		defaultServerHandler = connect(defaultServer);
		handler              = defaultServerHandler;
	}

	if(handler==NULL)
	{
		outputError(CDEV_SEVERITY_ERROR,
			    "ServerInterface::enqueue",
			    "No default server has been specified");
		return CDEV_ERROR;
	}

	handler->enqueue(binary, binaryLen);
	return CDEV_SUCCESS;
}
// *****************************************************************************
// * ServerInterface::getQueue :
// * Returns the FifoQueue that connectionQueues holds for the specified
// * server name. If no queue is registered under that name, a new
// * FifoQueue is created, stored in connectionQueues and returned.
// *****************************************************************************
FifoQueue * ServerInterface::getQueue ( char * server )
{
	FifoQueue * existing = (FifoQueue *)connectionQueues.find(server);
	if(existing!=NULL) return existing;

	// First request for this server - create and register its queue.
	FifoQueue * created = new FifoQueue;
	connectionQueues.insert(server, (void *)created);
	return created;
}
// *****************************************************************************
// * ServerInterface::isPacketValid :
// * According to the original notes, a ServerHandler calls this method
// * after it has been restarted, once for each packet that it inherited
// * in its queue, to decide whether the packet should still be submitted
// * to the server or be deleted. A derived class is expected to supply
// * the real test (e.g., whether a cdevTransObj is still associated with
// * the packet).
// *
// * This base implementation only checks that a buffer is present: it
// * returns 1 if binary is not NULL and binaryLen is greater than zero,
// * and 0 otherwise.
// *****************************************************************************
int ServerInterface::isPacketValid ( char * binary, size_t binaryLen )
{
	if(binary==NULL || binaryLen==0) return 0;
	return 1;
}
// *****************************************************************************
// * ServerInterface::getFd :
// * This mechanism is used to obtain a list of the file descriptors that
// * are used by the ServerInterface. These file descriptors may then be
// * used for polling.
// *****************************************************************************
int ServerInterface::getFd (int * &fd, int &numFd )
{
int idx = 0;
numFd = 0;
while(connections[idx]!=NULL)
{
if((numFd+1)>maxFd)
{
maxFd*=2;
fdList = (int *)realloc(fdList, maxFd*sizeof(int));
}
fdList[numFd++] = connections[idx++]->getHandle();
}
if(numFd) fd = fdList;
else fd = NULL;
return CDEV_SUCCESS;
}
// *****************************************************************************
// * ServerInterface::flush :
// * This mechanism is used to flush all outbound buffers to their respective
// * servers.
// *****************************************************************************
int ServerInterface::flush ( void )
{
// *********************************************************************
// * First - determine if there is any outbound data that needs to be
// * processed. Walk through the connectionQueue objects - which exist
// * even if the connection has been lost - and count up the number of
// * queues that are not empty.
// *********************************************************************
StringHashIterator iter(&connectionQueues);
FifoQueue * queue = NULL;
int needsFlush = 0;
// *********************************************************************
// * Invoke a brief reactor.checkHandlers in order to cause any
// * dead ServerHandlers to be removed from the Reactor. Then
// * attempt to reattach to any ServerHandlers that have
// * outbound data.
// *********************************************************************
char * server;
Reactor.checkHandlers();
for(iter.first(); (server=iter.key())!=NULL; iter++)
{
queue = (FifoQueue *)iter.data();
if(queue && !queue->empty())
{
needsFlush++;
connect(server);
}
}
// *********************************************************************
// * At this point he needsFlush variable will be non-zero if any of
// * the queues contain outbound data. The remainder of this code
// * only needs to be performed if this is the case.
// *********************************************************************
if(needsFlush)
{
// *************************************************************
// * Provide a maximum timeout value of 5 seconds. If the system
// * cannot be flushed in that amount of time - then an error or
// * a hang must exist on the other side.
// *************************************************************
cdevTimeValue t(5.0);
cdevClock timer;
timer.schedule(NULL, t);
// *************************************************************
// * While there are sockets that have outbound data AND the
// * timer has not expired - process events.
// *************************************************************
int idx = 0;
while(needsFlush && !timer.expired())
{
needsFlush = 0;
for(idx=0; connections[idx]!=NULL; idx++)
{
if(!connections[idx]->empty())
{
connections[idx]->setMask(cdevEventHandler::WRITE_MASK|cdevEventHandler::EXCEPT_MASK);
needsFlush++;
}
else connections[idx]->setMask(cdevEventHandler::READ_MASK|cdevEventHandler::EXCEPT_MASK);
}
if(needsFlush) Reactor.handleEvents(1.0, cdevReactor::UNTIL_EVENT);
}
// *********************************************************************
// * Walk through all of the ServerHandlers and restore them to
// * READ_MASK mode.
// *********************************************************************
if(needsFlush)
{
for(idx=0; connections[idx]!=NULL; idx++)
{
connections[idx]->setMask(cdevEventHandler::READ_MASK|cdevEventHandler::EXCEPT_MASK);
}
}
}
return CDEV_SUCCESS;
}
// *****************************************************************************
// * ServerInterface::flush :
// * This mechanism is used to flush all outbound buffers to their respective
// * servers.
// *****************************************************************************
int ServerInterface::flush ( int fd )
{
// *********************************************************************
// * Walk through each item and locate the handler that needs to be
// * flushed. If the connection is located, then handle events until
// * it is empty.
// *********************************************************************
int i;
for(i=0; connections[i]!=NULL && connections[i]->getHandle()!=fd; i++);
if(connections[i]!=NULL)
{
cdevTimeValue t(5.0);
cdevClock timer;
timer.schedule(NULL, t);
connections[i]->setMask(cdevEventHandler::WRITE_MASK|cdevEventHandler::EXCEPT_MASK);
while(connections[i]!=NULL && !connections[i]->empty() && !timer.expired())
{
Reactor.handleEvents(1.0, cdevReactor::UNTIL_EVENT);
}
// *************************************************************
// * If an error occurs during the write, the ServerHandler may
// * have been deleted - therefore, check to ensure that the
// * connection is valid prior to restoring its mask.
// *************************************************************
if(connections[i]!=NULL)
{
connections[i]->setMask(cdevEventHandler::READ_MASK|cdevEventHandler::EXCEPT_MASK);
}
}
return CDEV_SUCCESS;
}
// *****************************************************************************
// * ServerInterface::pend :
// * Flushes outbound requests and then lets the reactor handle events for
// * the specified number of seconds. Nothing is done when the connection
// * list is empty.
// *
// * The int parameter is not used by this implementation. Always returns
// * CDEV_SUCCESS.
// *****************************************************************************
int ServerInterface::pend ( double seconds, int )
{
	// No ServerHandlers in operation - there is nothing to wait on.
	if(connections[0]==NULL) return CDEV_SUCCESS;

	// Push out pending requests first, then service events.
	flush();
	Reactor.handleEvents(seconds);

	return CDEV_SUCCESS;
}
// *****************************************************************************
// * ServerInterface::poll :
// * Flushes outbound requests and then asks the reactor to handle events
// * with a very short wait (0.0001) in UNTIL_EVENT mode, which
// * effectively polls the sockets. Nothing is done when the connection
// * list is empty.
// *
// * Always returns CDEV_SUCCESS.
// *****************************************************************************
int ServerInterface::poll ( void )
{
	// No ServerHandlers in operation - there is nothing to poll.
	if(connections[0]==NULL) return CDEV_SUCCESS;

	// Push out pending requests first, then service whatever is ready.
	flush();
	Reactor.handleEvents(0.0001, cdevReactor::UNTIL_EVENT);

	return CDEV_SUCCESS;
}