Files
cdev-1.7.2n/extensions/cdevGenericServer/cdevServer/cdevSessionManager.cc
2022-12-13 12:44:04 +01:00

636 lines
23 KiB
C++
Executable File

#include <cdevSessionManager.h>
cdevReactor cdevSessionManager::Reactor;
IntHash cdevSessionManager::localIdx;
// *****************************************************************************
// * ClientSession::ClientSession :
// * This method serves only to initialize the internals of the class.
// *****************************************************************************
ClientSession::ClientSession ( int SocketID, int ClientID, int LocalID )
: socketID(SocketID), clientID(ClientID), localID(LocalID)
{
}
// *****************************************************************************
// * ClientSession::~ClientSession :
// * This method deletes all unprocessed binary packets that are stored
// * within the queue.
// *****************************************************************************
ClientSession::~ClientSession ( void )
{
}
// *****************************************************************************
// * SocketSession::SocketSession :
// * This is the constructor for the SocketSession object, it serves only
// * to initialize internal variables.
// *****************************************************************************
SocketSession::SocketSession( int SocketID )
: FifoQueue(), socketID(SocketID)
{
}
// *****************************************************************************
// * SocketSession::~SocketSession :
// * This method deletes all unprocessed binary packets that are stored
// * within the queue.
// *****************************************************************************
SocketSession::~SocketSession ( void )
{
char * binary;
size_t binaryLen;
while(dequeue(&binary, &binaryLen)==0) delete binary;
}
// *****************************************************************************
// * cdevSessionManager::~cdevSessionManager :
// * This method deletes all entries from the client and socket queues
// * and then deletes all queue objects...
// *****************************************************************************
cdevSessionManager::~cdevSessionManager ( void )
{
int socketID;
int clientID;
IntHashIterator clientIter(&clients);
IntHashIterator socketIter(&sockets);
ClientSession * clientPtr;
SocketSession * socketPtr;
cdevPacketBinary * packet;
while((packet = (cdevPacketBinary *)inbound.dequeue())!=NULL)
{
delete packet;
}
clientIter.first();
while((clientPtr=(ClientSession *)clientIter.data())!=NULL)
{
clientID = clientPtr->getClientID();
clientIter++;
removeClient(clientID, 0);
}
socketIter.first();
while((socketPtr=(SocketSession *)socketIter.data())!=NULL)
{
socketID = socketIter.key();
socketIter++;
removeSocket(socketID);
}
}
// *****************************************************************************
// * cdevSessionManager::newClientSession :
// * This method allows the caller to create a new ClientSession object. The
// * ClientSession object functions primarily as a pointer to the queue that
// * holds packets destined for a specific client, however, the developer
// * can create a subclass of the ClientSession that may be used to associate
// * additional, client specific information to the structure.
// *****************************************************************************
ClientSession * cdevSessionManager::newClientSession ( int SocketID, int ClientID, int LocalID )
{
return new ClientSession (SocketID, ClientID, LocalID);
}
// *****************************************************************************
// * cdevSessionManager::newSocketSession :
// * This method allows the caller to create a new SocketSession object. The
// * SocketSession object functions primarily as a pointer to the queue that
// * holds packets destined for a specific socket, however, the developer
// * can create a subclass of the SocketSession that may be used to associate
// * additional, socket specific information to the structure.
// *****************************************************************************
SocketSession * cdevSessionManager::newSocketSession ( int SocketID )
{
return new SocketSession (SocketID);
}
// *****************************************************************************
// * cdevSessionManager::deleteSocketSession :
// * This method is called to delete a SocketSession object.
// *****************************************************************************
void cdevSessionManager::deleteSocketSession ( SocketSession *socket )
{
if(socket)
{
sockets.remove(socket->getSocketID());
delete socket;
}
}
// *****************************************************************************
// * cdevSessionManager::findLocalClient :
// * This method allows the caller to locate a ClientSession using the local
// * client identifier that is assigned by the cdevSessionManager class.
// *****************************************************************************
ClientSession * cdevSessionManager::findLocalClient( short localID )
{
return (ClientSession *)localIdx.find(localID);
}
// *****************************************************************************
// * cdevSessionManager::findClient :
// * This method allows the caller to locate a ClientSession using its
// * clientID.
// *****************************************************************************
ClientSession * cdevSessionManager::findClient( int clientID )
{
return (ClientSession *)clients.find(clientID);
}
// *****************************************************************************
// * cdevSessionManager::findSocket :
// * This method allows the caller to locate a SocketSession using its
// * socketID.
// *****************************************************************************
SocketSession * cdevSessionManager::findSocket( int socketID )
{
return (SocketSession *)sockets.find(socketID);
}
// *****************************************************************************
// * cdevSessionManager::addClient :
// * This method allows the caller to add a new clientID and construct a
// * ClientSession object for it.
// *
// * The socketID must have already been registered using the addSocket
// * method. If the clientID already exists or if an error occurs NULL will
// * be returned.
// *****************************************************************************
ClientSession * cdevSessionManager::addClient( int socketID, int clientID )
{
ClientSession * session = NULL;
if(findSocket(socketID)!=NULL && findClient(clientID)==NULL)
{
short localID = getNextLocalID();
if(localID>0 &&
(session = newClientSession(socketID, clientID, localID))!=NULL)
{
clients.insert (clientID, session);
localIdx.insert((int)localID, session);
}
}
return session;
}
// *****************************************************************************
// * cdevSessionManager::addSocket :
// * This method allows the caller to add a new socketID and construct a
// * SocketSession object to service it.
// *
// * This function will fail if the socketID has already been registered...
// * On failure this method will return NULL.
// *****************************************************************************
SocketSession * cdevSessionManager::addSocket ( int socketID )
{
SocketSession * session = NULL;
if(findSocket(socketID)==NULL &&
(session = newSocketSession(socketID))!=NULL)
{
sockets.insert(socketID, session);
}
return session;
}
// *****************************************************************************
// * cdevSessionManager::removeClient :
// * This method will remove the specified clientID from the clients list
// * and will delete the associated ClientSession object.
// *****************************************************************************
void cdevSessionManager::removeClient ( int clientID, int unregisterFlag)
{
ClientSession * session;
if((session = (ClientSession *)clients.find(clientID))!=NULL)
{
// *****************************************************
// * Submit an unregister command to notify the server
// *****************************************************
if(unregisterFlag) unregisterClient(session->getLocalID());
localIdx.remove((int)session->getLocalID());
clients.remove (clientID);
delete session;
}
}
// *****************************************************************************
// * cdevSessionManager::removeSocket :
// * This method will remove the specified socketID from the sockets list
// * and will delete the associated SocketSession object. It will then
// * ascend the clients list and will remove all ClientSessions that are
// * associated with the socketID.
// *****************************************************************************
void cdevSessionManager::removeSocket ( int socketID )
{
cdevEventHandler * handler = NULL;
SocketSession * socket;
ClientSession * client;
int clientID;
if(Reactor.getHandler(socketID, handler)==0 && handler!=NULL)
{
delete handler;
}
if((socket = (SocketSession *)sockets.find(socketID))!=NULL)
{
IntHashIterator clientIter(&clients);
clientIter.first();
while((client=(ClientSession *)clientIter.data())!=NULL)
{
if(client->getSocketID()==socketID)
{
clientID = client->getClientID();
clientIter++;
removeClient(clientID);
}
else clientIter++;
}
deleteSocketSession(socket);
}
}
// *****************************************************************************
// * cdevSessionManager::getNextLocalID :
// * This method allows the caller to retrieve a unique localID to be
// * assigned to a client. The nextLocalID value is automatically
// * incremented.
// *****************************************************************************
short cdevSessionManager::getNextLocalID ( void )
{
static short nextLocalID = 0;
short startingPoint = nextLocalID;
ClientSession *session = NULL;
if(nextLocalID>=32767) nextLocalID = 1;
else nextLocalID++;
startingPoint = nextLocalID;
do {
session=(ClientSession *)localIdx.find((int)nextLocalID);
if(session!=NULL)
{
if(nextLocalID>=32767) nextLocalID = 1;
else nextLocalID++;
}
} while(session!=NULL && nextLocalID!=startingPoint);
return session==NULL?nextLocalID:-1;
}
// *****************************************************************************
// * cdevSessionManager::enqueue :
// * This method is used to enqueue a binary packet into the inbound
// * fifoQueue. This method is called by the client handler objects.
// *
// * The binary data item becomes the property of the queue and should not
// * be accessed later by the caller.
// *****************************************************************************
int cdevSessionManager::enqueue( int socketID, char * binary, unsigned binaryLen )
{
int result = -1;
SocketSession * socket = NULL;
ClientSession * client = NULL;
cdevPacketBinary * packet = new cdevPacketBinary;
// *********************************************************************
// * Make sure its a valid packet.
// *********************************************************************
if(packet->streamIn(binary, binaryLen)==0)
{
// *************************************************************
// * Add the socketID if it does not already exist.
// *************************************************************
if((socket = findSocket(socketID))==NULL)
{
socket = addSocket(socketID);
}
// *************************************************************
// * Combine the (short)clientID and the (short)socketID to
// * create a unique identifier for this client.
// *
// * < HIGH WORD > < LOW WORD >
// * SSSSSSSS CCCCCCCCC
// *************************************************************
short packetClientID;
int clientID;
packet->getClientID(packetClientID);
clientID = ((socketID<<16)|packetClientID);
// *************************************************************
// * Add a clientID if it does not already exist.
// *************************************************************
if((client = findClient(clientID))==NULL &&
(client = addClient(socketID, clientID))!=NULL)
{
registerClient ( client->getLocalID() );
}
// *************************************************************
// * This would only fail if the clientID had already been used
// * by another socket.
// *************************************************************
if(client!=NULL && client->getSocketID()==socketID)
{
result = 0;
// *****************************************************
// * At this point everything necessary is known to
// * submit the packet for processing...
// *****************************************************
// *****************************************************
// * Set the clientID to the localClientID to be used
// * by the server side of the connection.
// *****************************************************
packet->setClientID(client->getLocalID());
// *****************************************************
// * Enqueue the packet and set the pointer to NULL to
// * prevent its later deletion.
// *****************************************************
inbound.enqueue((void *)packet);
packet = NULL;
}
}
if(packet!=NULL) delete packet;
// *********************************************************************
// * If a packet was successfully added to the inbound queue and the
// * queue is not empty , then add an event to the FDTrigger to cause
// * the handle_input mechanism to be called.
// *********************************************************************
if(result==0 && !inbound.empty()) trigger.insertEvent();
// *********************************************************************
// * Due to the new design of the SocketUtil class, the binary should
// * never be deleted.
// *********************************************************************
return result;
}
// *****************************************************************************
// * cdevSessionManager::enqueue :
// * This method is used to enqueue a cdevPacket packet into an outbound
// * queue. The method will first identify the target client and place the
// * packet into its queue... then it will identify the socket and place the
// * packet into its queue.
// *
// * This method will return -1 if either the socket or the client is
// * undefined.
// *
// * The packet remains the property of the caller who must delete it.
// *****************************************************************************
int cdevSessionManager::enqueue ( cdevPacket * input )
{
cdevPacketBinary * packet = encodePacket(input);
ClientSession * client = NULL;
SocketSession * socket = NULL;
int result = -1;
// *********************************************************************
// * Note that this condition makes sure that a queue exists for both
// * the client ID and the socketID. If both of these conditions are
// * met, then it checks to ensure that the socket has not been overrun
// * with data (500 or more packets).
// *********************************************************************
if(packet!=NULL &&
input->getClientID()>0 &&
(client = findLocalClient(input->getClientID()))!=NULL &&
(socket = findSocket(client->getSocketID()))!=NULL)
{
cdevEventHandler * handler = NULL;
Reactor.getHandler(client->getSocketID(), handler);
// *************************************************************
// * Attempt to flush the handler if more than 500 packets
// * have already been inserted.
// *************************************************************
if(handler && socket->getCount()>=500)
{
outputError(CDEV_SEVERITY_WARN, "CDEV Server",
"Forcing flush of socket %i to prevent overflow...",
client->getSocketID());
// *****************************************************
// * Call handleOutput to cause the handler to attempt
// * to write its contents.
// *****************************************************
if(handler->handleOutput()<0)
{
Reactor.removeHandler(handler);
handler = NULL;
}
}
if(handler && socket->getCount()<500)
{
char * binary = NULL;
size_t binaryLen = 0;
// *****************************************************
// * Create a binary stream from the cdevPacketBinary
// * object and then use the detachData method to
// * prevent the buffer from being deleted when the
// * cdevPacketBinary is destroyed.
// *****************************************************
packet->streamOut(&binary, &binaryLen);
packet->detachData();
// *****************************************************
// * Populate the cdevServerBinary object with the data
// * that was extracted from the cdevPacketBinary object
// *****************************************************
socket->enqueue(binary, binaryLen);
// *****************************************************
// * Set the event mask for the outbound socket to read/
// * write to force it to attempt to write to the socket
// * until data is transmitted.
// *****************************************************
if(handler) handler->setMask(READ_MASK|WRITE_MASK);
result = 0;
}
else if(handler)
{
outputError ( CDEV_SEVERITY_ERROR, "CDEV Server",
"Dropping packet to socket %i : queue is full...",
client->getSocketID());
}
}
// *********************************************************************
// * Delete the cdevPacketBinary if it was generated.
// *********************************************************************
if(packet) delete packet;
return result;
}
// *****************************************************************************
// * cdevSessionManager::dequeue :
// * This method is used to by the server engine to extract a message from
// * the inbound queue. Once the packet has been enqueue it will be sent to
// * the decodePacket method which will perform any preprocessing that the
// * developer may deem necessary before returning the packet to the caller.
// *
// * The cdevPacket object becomes the property of the caller and must
// * be deleted when it is no longer needed.
// *****************************************************************************
int cdevSessionManager::dequeue ( cdevPacket * &packet )
{
cdevPacketBinary * binary = NULL;
packet = NULL;
// *********************************************************************
// * Conitnue this loop unitl a valid packet has been extracted, or
// * until there are no more binary packets left in the queue.
// *********************************************************************
do {
// *************************************************************
// * Attempt to dequeue the cdevPacketBinary from the queue...
// * If it is not NULL, then begin processing.
// *************************************************************
if((binary = (cdevPacketBinary *)inbound.dequeue())!=NULL)
{
// *****************************************************
// * Call the decodePacket mechanism... This will use
// * the factory mechanisms of the cdevPacket class to
// * import the data and then will complete any required
// * preprocessing...
// *****************************************************
packet = decodePacket(binary);
// *****************************************************
// * Delete the binary as it is no longer needed.
// *****************************************************
delete binary;
}
} while(binary!=NULL && packet==NULL);
// *********************************************************************
// * Return 0 if a cdevPacket was successfully dequeued, otherwise,
// * return -1.
// *********************************************************************
return packet?0:-1;
}
// *****************************************************************************
// * cdevServer::decodePacket :
// * This method is used to perform preprocessing on a newly dequeued binary
// * packet before it is provided to the caller. This method allows the
// * developer to perform special preparations on the packet before providing
// * it to the caller.
// *****************************************************************************
cdevPacket * cdevSessionManager::decodePacket (cdevPacketBinary * input)
{
return input?cdevPacket::import(*input):(cdevPacket *)NULL;
}
// *****************************************************************************
// * cdevSessionManager::encodePacket :
// * This method is used to perform postprocessing on a packet that has been
// * enqueued to be sent to a client. This method allows the developer to
// * perform special preparations on the packet before providing it to the
// * client.
// *****************************************************************************
cdevPacketBinary * cdevSessionManager::encodePacket ( cdevPacket * input )
{
cdevPacketBinary *result;
if(input)
{
char * binary;
size_t binaryLen;
input->streamOut(&binary, &binaryLen);
if(binary && binaryLen)
{
result = new cdevPacketBinary;
result->attachData(binary, binaryLen);
}
}
return result;
}
// *****************************************************************************
// * cdevSessionManager::getHandle :
// * This method is used to obtain a copy of the file handle that is used
// * to poll for events. In this case it will be the read file descriptor
// * of the FD_Trigger object.
// *****************************************************************************
int cdevSessionManager::getHandle ( void ) const
{
return trigger.readfd();
}
// *****************************************************************************
// * cdevSessionManager::handleInput :
// * This method is called whenever there is a read event pending on the
// * FD_Trigger object.
// *****************************************************************************
int cdevSessionManager::handleInput ( void )
{
processMessages();
if(inbound.empty()) trigger.purge();
return 0;
}
// ****************************************************************************
// cdevSessionManager::handleClose :
// Shuts down the timer - this will result in the destruction of the
// * cdevSessionManager object.
// ****************************************************************************
int cdevSessionManager::handleClose(void)
{
return -1;
}
// ****************************************************************************
// * cdevSessionManager::handleTimeout :
// * Called when the timer expires... This method serves only to call the
// * processMessages function.
// ****************************************************************************
int cdevSessionManager::handleTimeout(void)
{
processMessages();
if(inbound.empty()) trigger.purge();
return 0;
}
// ***************************************************************************
// * cdevServer::processMessages :
// * This is a simple stand-in function that retrieves a message from the
// * queue and then immediately returns it to the outbound queue. This
// * function should be overloaded by the developers mechanism for
// * processing messages.
// ***************************************************************************
void cdevSessionManager::processMessages ( void )
{
cdevPacket * packet;
while(dequeue(packet)==0)
{
enqueue(packet);
delete packet;
}
}