#include cdevReactor cdevSessionManager::Reactor; IntHash cdevSessionManager::localIdx; // ***************************************************************************** // * ClientSession::ClientSession : // * This method serves only to initialize the internals of the class. // ***************************************************************************** ClientSession::ClientSession ( int SocketID, int ClientID, int LocalID ) : socketID(SocketID), clientID(ClientID), localID(LocalID) { } // ***************************************************************************** // * ClientSession::~ClientSession : // * This method deletes all unprocessed binary packets that are stored // * within the queue. // ***************************************************************************** ClientSession::~ClientSession ( void ) { } // ***************************************************************************** // * SocketSession::SocketSession : // * This is the constructor for the SocketSession object, it serves only // * to initialize internal variables. // ***************************************************************************** SocketSession::SocketSession( int SocketID ) : FifoQueue(), socketID(SocketID) { } // ***************************************************************************** // * SocketSession::~SocketSession : // * This method deletes all unprocessed binary packets that are stored // * within the queue. // ***************************************************************************** SocketSession::~SocketSession ( void ) { char * binary; size_t binaryLen; while(dequeue(&binary, &binaryLen)==0) delete binary; } // ***************************************************************************** // * cdevSessionManager::~cdevSessionManager : // * This method deletes all entries from the client and socket queues // * and then deletes all queue objects... // ***************************************************************************** cdevSessionManager::~cdevSessionManager ( void ) { int socketID; int clientID; IntHashIterator clientIter(&clients); IntHashIterator socketIter(&sockets); ClientSession * clientPtr; SocketSession * socketPtr; cdevPacketBinary * packet; while((packet = (cdevPacketBinary *)inbound.dequeue())!=NULL) { delete packet; } clientIter.first(); while((clientPtr=(ClientSession *)clientIter.data())!=NULL) { clientID = clientPtr->getClientID(); clientIter++; removeClient(clientID, 0); } socketIter.first(); while((socketPtr=(SocketSession *)socketIter.data())!=NULL) { socketID = socketIter.key(); socketIter++; removeSocket(socketID); } } // ***************************************************************************** // * cdevSessionManager::newClientSession : // * This method allows the caller to create a new ClientSession object. The // * ClientSession object functions primarily as a pointer to the queue that // * holds packets destined for a specific client, however, the developer // * can create a subclass of the ClientSession that may be used to associate // * additional, client specific information to the structure. // ***************************************************************************** ClientSession * cdevSessionManager::newClientSession ( int SocketID, int ClientID, int LocalID ) { return new ClientSession (SocketID, ClientID, LocalID); } // ***************************************************************************** // * cdevSessionManager::newSocketSession : // * This method allows the caller to create a new SocketSession object. The // * SocketSession object functions primarily as a pointer to the queue that // * holds packets destined for a specific socket, however, the developer // * can create a subclass of the SocketSession that may be used to associate // * additional, socket specific information to the structure. // ***************************************************************************** SocketSession * cdevSessionManager::newSocketSession ( int SocketID ) { return new SocketSession (SocketID); } // ***************************************************************************** // * cdevSessionManager::deleteSocketSession : // * This method is called to delete a SocketSession object. // ***************************************************************************** void cdevSessionManager::deleteSocketSession ( SocketSession *socket ) { if(socket) { sockets.remove(socket->getSocketID()); delete socket; } } // ***************************************************************************** // * cdevSessionManager::findLocalClient : // * This method allows the caller to locate a ClientSession using the local // * client identifier that is assigned by the cdevSessionManager class. // ***************************************************************************** ClientSession * cdevSessionManager::findLocalClient( short localID ) { return (ClientSession *)localIdx.find(localID); } // ***************************************************************************** // * cdevSessionManager::findClient : // * This method allows the caller to locate a ClientSession using its // * clientID. // ***************************************************************************** ClientSession * cdevSessionManager::findClient( int clientID ) { return (ClientSession *)clients.find(clientID); } // ***************************************************************************** // * cdevSessionManager::findSocket : // * This method allows the caller to locate a SocketSession using its // * socketID. // ***************************************************************************** SocketSession * cdevSessionManager::findSocket( int socketID ) { return (SocketSession *)sockets.find(socketID); } // ***************************************************************************** // * cdevSessionManager::addClient : // * This method allows the caller to add a new clientID and construct a // * ClientSession object for it. // * // * The socketID must have already been registered using the addSocket // * method. If the clientID already exists or if an error occurs NULL will // * be returned. // ***************************************************************************** ClientSession * cdevSessionManager::addClient( int socketID, int clientID ) { ClientSession * session = NULL; if(findSocket(socketID)!=NULL && findClient(clientID)==NULL) { short localID = getNextLocalID(); if(localID>0 && (session = newClientSession(socketID, clientID, localID))!=NULL) { clients.insert (clientID, session); localIdx.insert((int)localID, session); } } return session; } // ***************************************************************************** // * cdevSessionManager::addSocket : // * This method allows the caller to add a new socketID and construct a // * SocketSession object to service it. // * // * This function will fail if the socketID has already been registered... // * On failure this method will return NULL. // ***************************************************************************** SocketSession * cdevSessionManager::addSocket ( int socketID ) { SocketSession * session = NULL; if(findSocket(socketID)==NULL && (session = newSocketSession(socketID))!=NULL) { sockets.insert(socketID, session); } return session; } // ***************************************************************************** // * cdevSessionManager::removeClient : // * This method will remove the specified clientID from the clients list // * and will delete the associated ClientSession object. // ***************************************************************************** void cdevSessionManager::removeClient ( int clientID, int unregisterFlag) { ClientSession * session; if((session = (ClientSession *)clients.find(clientID))!=NULL) { // ***************************************************** // * Submit an unregister command to notify the server // ***************************************************** if(unregisterFlag) unregisterClient(session->getLocalID()); localIdx.remove((int)session->getLocalID()); clients.remove (clientID); delete session; } } // ***************************************************************************** // * cdevSessionManager::removeSocket : // * This method will remove the specified socketID from the sockets list // * and will delete the associated SocketSession object. It will then // * ascend the clients list and will remove all ClientSessions that are // * associated with the socketID. // ***************************************************************************** void cdevSessionManager::removeSocket ( int socketID ) { cdevEventHandler * handler = NULL; SocketSession * socket; ClientSession * client; int clientID; if(Reactor.getHandler(socketID, handler)==0 && handler!=NULL) { delete handler; } if((socket = (SocketSession *)sockets.find(socketID))!=NULL) { IntHashIterator clientIter(&clients); clientIter.first(); while((client=(ClientSession *)clientIter.data())!=NULL) { if(client->getSocketID()==socketID) { clientID = client->getClientID(); clientIter++; removeClient(clientID); } else clientIter++; } deleteSocketSession(socket); } } // ***************************************************************************** // * cdevSessionManager::getNextLocalID : // * This method allows the caller to retrieve a unique localID to be // * assigned to a client. The nextLocalID value is automatically // * incremented. // ***************************************************************************** short cdevSessionManager::getNextLocalID ( void ) { static short nextLocalID = 0; short startingPoint = nextLocalID; ClientSession *session = NULL; if(nextLocalID>=32767) nextLocalID = 1; else nextLocalID++; startingPoint = nextLocalID; do { session=(ClientSession *)localIdx.find((int)nextLocalID); if(session!=NULL) { if(nextLocalID>=32767) nextLocalID = 1; else nextLocalID++; } } while(session!=NULL && nextLocalID!=startingPoint); return session==NULL?nextLocalID:-1; } // ***************************************************************************** // * cdevSessionManager::enqueue : // * This method is used to enqueue a binary packet into the inbound // * fifoQueue. This method is called by the client handler objects. // * // * The binary data item becomes the property of the queue and should not // * be accessed later by the caller. // ***************************************************************************** int cdevSessionManager::enqueue( int socketID, char * binary, unsigned binaryLen ) { int result = -1; SocketSession * socket = NULL; ClientSession * client = NULL; cdevPacketBinary * packet = new cdevPacketBinary; // ********************************************************************* // * Make sure its a valid packet. // ********************************************************************* if(packet->streamIn(binary, binaryLen)==0) { // ************************************************************* // * Add the socketID if it does not already exist. // ************************************************************* if((socket = findSocket(socketID))==NULL) { socket = addSocket(socketID); } // ************************************************************* // * Combine the (short)clientID and the (short)socketID to // * create a unique identifier for this client. // * // * < HIGH WORD > < LOW WORD > // * SSSSSSSS CCCCCCCCC // ************************************************************* short packetClientID; int clientID; packet->getClientID(packetClientID); clientID = ((socketID<<16)|packetClientID); // ************************************************************* // * Add a clientID if it does not already exist. // ************************************************************* if((client = findClient(clientID))==NULL && (client = addClient(socketID, clientID))!=NULL) { registerClient ( client->getLocalID() ); } // ************************************************************* // * This would only fail if the clientID had already been used // * by another socket. // ************************************************************* if(client!=NULL && client->getSocketID()==socketID) { result = 0; // ***************************************************** // * At this point everything necessary is known to // * submit the packet for processing... // ***************************************************** // ***************************************************** // * Set the clientID to the localClientID to be used // * by the server side of the connection. // ***************************************************** packet->setClientID(client->getLocalID()); // ***************************************************** // * Enqueue the packet and set the pointer to NULL to // * prevent its later deletion. // ***************************************************** inbound.enqueue((void *)packet); packet = NULL; } } if(packet!=NULL) delete packet; // ********************************************************************* // * If a packet was successfully added to the inbound queue and the // * queue is not empty , then add an event to the FDTrigger to cause // * the handle_input mechanism to be called. // ********************************************************************* if(result==0 && !inbound.empty()) trigger.insertEvent(); // ********************************************************************* // * Due to the new design of the SocketUtil class, the binary should // * never be deleted. // ********************************************************************* return result; } // ***************************************************************************** // * cdevSessionManager::enqueue : // * This method is used to enqueue a cdevPacket packet into an outbound // * queue. The method will first identify the target client and place the // * packet into its queue... then it will identify the socket and place the // * packet into its queue. // * // * This method will return -1 if either the socket or the client is // * undefined. // * // * The packet remains the property of the caller who must delete it. // ***************************************************************************** int cdevSessionManager::enqueue ( cdevPacket * input ) { cdevPacketBinary * packet = encodePacket(input); ClientSession * client = NULL; SocketSession * socket = NULL; int result = -1; // ********************************************************************* // * Note that this condition makes sure that a queue exists for both // * the client ID and the socketID. If both of these conditions are // * met, then it checks to ensure that the socket has not been overrun // * with data (500 or more packets). // ********************************************************************* if(packet!=NULL && input->getClientID()>0 && (client = findLocalClient(input->getClientID()))!=NULL && (socket = findSocket(client->getSocketID()))!=NULL) { cdevEventHandler * handler = NULL; Reactor.getHandler(client->getSocketID(), handler); // ************************************************************* // * Attempt to flush the handler if more than 500 packets // * have already been inserted. // ************************************************************* if(handler && socket->getCount()>=500) { outputError(CDEV_SEVERITY_WARN, "CDEV Server", "Forcing flush of socket %i to prevent overflow...", client->getSocketID()); // ***************************************************** // * Call handleOutput to cause the handler to attempt // * to write its contents. // ***************************************************** if(handler->handleOutput()<0) { Reactor.removeHandler(handler); handler = NULL; } } if(handler && socket->getCount()<500) { char * binary = NULL; size_t binaryLen = 0; // ***************************************************** // * Create a binary stream from the cdevPacketBinary // * object and then use the detachData method to // * prevent the buffer from being deleted when the // * cdevPacketBinary is destroyed. // ***************************************************** packet->streamOut(&binary, &binaryLen); packet->detachData(); // ***************************************************** // * Populate the cdevServerBinary object with the data // * that was extracted from the cdevPacketBinary object // ***************************************************** socket->enqueue(binary, binaryLen); // ***************************************************** // * Set the event mask for the outbound socket to read/ // * write to force it to attempt to write to the socket // * until data is transmitted. // ***************************************************** if(handler) handler->setMask(READ_MASK|WRITE_MASK); result = 0; } else if(handler) { outputError ( CDEV_SEVERITY_ERROR, "CDEV Server", "Dropping packet to socket %i : queue is full...", client->getSocketID()); } } // ********************************************************************* // * Delete the cdevPacketBinary if it was generated. // ********************************************************************* if(packet) delete packet; return result; } // ***************************************************************************** // * cdevSessionManager::dequeue : // * This method is used to by the server engine to extract a message from // * the inbound queue. Once the packet has been enqueue it will be sent to // * the decodePacket method which will perform any preprocessing that the // * developer may deem necessary before returning the packet to the caller. // * // * The cdevPacket object becomes the property of the caller and must // * be deleted when it is no longer needed. // ***************************************************************************** int cdevSessionManager::dequeue ( cdevPacket * &packet ) { cdevPacketBinary * binary = NULL; packet = NULL; // ********************************************************************* // * Conitnue this loop unitl a valid packet has been extracted, or // * until there are no more binary packets left in the queue. // ********************************************************************* do { // ************************************************************* // * Attempt to dequeue the cdevPacketBinary from the queue... // * If it is not NULL, then begin processing. // ************************************************************* if((binary = (cdevPacketBinary *)inbound.dequeue())!=NULL) { // ***************************************************** // * Call the decodePacket mechanism... This will use // * the factory mechanisms of the cdevPacket class to // * import the data and then will complete any required // * preprocessing... // ***************************************************** packet = decodePacket(binary); // ***************************************************** // * Delete the binary as it is no longer needed. // ***************************************************** delete binary; } } while(binary!=NULL && packet==NULL); // ********************************************************************* // * Return 0 if a cdevPacket was successfully dequeued, otherwise, // * return -1. // ********************************************************************* return packet?0:-1; } // ***************************************************************************** // * cdevServer::decodePacket : // * This method is used to perform preprocessing on a newly dequeued binary // * packet before it is provided to the caller. This method allows the // * developer to perform special preparations on the packet before providing // * it to the caller. // ***************************************************************************** cdevPacket * cdevSessionManager::decodePacket (cdevPacketBinary * input) { return input?cdevPacket::import(*input):(cdevPacket *)NULL; } // ***************************************************************************** // * cdevSessionManager::encodePacket : // * This method is used to perform postprocessing on a packet that has been // * enqueued to be sent to a client. This method allows the developer to // * perform special preparations on the packet before providing it to the // * client. // ***************************************************************************** cdevPacketBinary * cdevSessionManager::encodePacket ( cdevPacket * input ) { cdevPacketBinary *result; if(input) { char * binary; size_t binaryLen; input->streamOut(&binary, &binaryLen); if(binary && binaryLen) { result = new cdevPacketBinary; result->attachData(binary, binaryLen); } } return result; } // ***************************************************************************** // * cdevSessionManager::getHandle : // * This method is used to obtain a copy of the file handle that is used // * to poll for events. In this case it will be the read file descriptor // * of the FD_Trigger object. // ***************************************************************************** int cdevSessionManager::getHandle ( void ) const { return trigger.readfd(); } // ***************************************************************************** // * cdevSessionManager::handleInput : // * This method is called whenever there is a read event pending on the // * FD_Trigger object. // ***************************************************************************** int cdevSessionManager::handleInput ( void ) { processMessages(); if(inbound.empty()) trigger.purge(); return 0; } // **************************************************************************** // cdevSessionManager::handleClose : // Shuts down the timer - this will result in the destruction of the // * cdevSessionManager object. // **************************************************************************** int cdevSessionManager::handleClose(void) { return -1; } // **************************************************************************** // * cdevSessionManager::handleTimeout : // * Called when the timer expires... This method serves only to call the // * processMessages function. // **************************************************************************** int cdevSessionManager::handleTimeout(void) { processMessages(); if(inbound.empty()) trigger.purge(); return 0; } // *************************************************************************** // * cdevServer::processMessages : // * This is a simple stand-in function that retrieves a message from the // * queue and then immediately returns it to the outbound queue. This // * function should be overloaded by the developers mechanism for // * processing messages. // *************************************************************************** void cdevSessionManager::processMessages ( void ) { cdevPacket * packet; while(dequeue(packet)==0) { enqueue(packet); delete packet; } }