/* * blockingTCPTransport.cpp * * Created on: Dec 29, 2010 * Author: Miha Vitorovic */ #include "blockingTCP.h" #include "inetAddressUtil.h" #include "growingCircularBuffer.h" #include "caConstants.h" /* pvData */ #include #include #include #include /* EPICSv3 */ #include #include #include #include /* standard */ #include #include #include #include using namespace epics::pvData; using std::max; using std::min; using std::ostringstream; namespace epics { namespace pvAccess { class MonitorSender : public TransportSender, public NoDefaultMethods { public: MonitorSender(Mutex* monitorMutex, GrowingCircularBuffer< TransportSender*>* monitorSendQueue) : _monitorMutex(monitorMutex), _monitorSendQueue(monitorSendQueue) { } virtual ~MonitorSender() { } virtual void lock() { } virtual void unlock() { } virtual void send(ByteBuffer* buffer, TransportSendControl* control); private: Mutex* _monitorMutex; GrowingCircularBuffer* _monitorSendQueue; }; BlockingTCPTransport::BlockingTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, int16 priority) : _closed(false), _channel(channel), _remoteTransportRevision(0), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), _priority(priority), _responseHandler(responseHandler), _totalBytesReceived(0), _totalBytesSent(0), _markerToSend(0), _verified(false), _remoteBufferFreeSpace( LONG_LONG_MAX), _autoDelete(true), _markerPeriodBytes(MARKER_PERIOD), _nextMarkerPosition( _markerPeriodBytes), _sendPending(false), _lastMessageStartPosition(0), _stage(READ_FROM_SOCKET), _lastSegmentedMessageType(0), _lastSegmentedMessageCommand( 0), _storedPayloadSize(0), _storedPosition(0), _storedLimit(0), _magicAndVersion(0), _packetType(0), _command(0), _payloadSize(0), _flushRequested(false), _sendBufferSentPosition(0), _flushStrategy(DELAYED), _sendQueue( new GrowingCircularBuffer (100)), _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( new GrowingCircularBuffer (100)), _monitorSender(new MonitorSender(&_monitorMutex, _monitorSendQueue)), _context(context), _sendThreadExited(false) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize), EPICS_ENDIAN_BIG); _socketBuffer->setPosition(_socketBuffer->getLimit()); _startPosition = _socketBuffer->getPosition(); // allocate buffer _sendBuffer = new ByteBuffer(_socketBuffer->getSize(), EPICS_ENDIAN_BIG); _maxPayloadSize = _sendBuffer->getSize()-2*CA_MESSAGE_HEADER_SIZE; // one for header, one for flow control // get send buffer size socklen_t intLen = sizeof(int); int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF, &_socketSendBufferSize, &intLen); if(retval<0) { _socketSendBufferSize = MAX_TCP_RECV; errlogSevPrintf(errlogMinor, "Unable to retrieve socket send buffer size: %s", strerror(errno)); } socklen_t saSize = sizeof(sockaddr); retval = getpeername(_channel, &(_socketAddress.sa), &saSize); if(retval<0) { errlogSevPrintf(errlogMajor, "Error fetching socket remote address: %s", strerror( errno)); } // prepare buffer clearAndReleaseBuffer(); // add to registry _context->getTransportRegistry()->put(this); } BlockingTCPTransport::~BlockingTCPTransport() { close(true); delete _sendQueue; delete _socketBuffer; delete _sendBuffer; delete _responseHandler; } void BlockingTCPTransport::start() { // TODO consuder epics::pvData::Thread String threadName = "TCP-receive "+inetAddressToString( _socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); _rcvThreadId = epicsThreadCreate(threadName.c_str(), epicsThreadPriorityMedium, epicsThreadGetStackSize( epicsThreadStackMedium), BlockingTCPTransport::rcvThreadRunner, this); threadName = "TCP-send "+inetAddressToString(_socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); _sendThreadId = epicsThreadCreate(threadName.c_str(), epicsThreadPriorityMedium, epicsThreadGetStackSize( epicsThreadStackMedium), BlockingTCPTransport::sendThreadRunner, this); } void BlockingTCPTransport::clearAndReleaseBuffer() { // NOTE: take care that nextMarkerPosition is set right // fix position to be correct when buffer is cleared // do not include pre-buffered flow control message; not 100% correct, but OK _nextMarkerPosition -= _sendBuffer->getPosition() -CA_MESSAGE_HEADER_SIZE; _sendQueueMutex.lock(); _flushRequested = false; _sendQueueMutex.unlock(); _sendBuffer->clear(); _sendPending = false; // prepare ACK marker _sendBuffer->putShort(CA_MAGIC_AND_VERSION); _sendBuffer->putByte(1); // control data _sendBuffer->putByte(1); // marker ACK _sendBuffer->putInt(0); } void BlockingTCPTransport::close(bool force) { Lock lock(&_mutex); // already closed check if(_closed) return; _closed = true; printf("closing.\n"); // remove from registry _context->getTransportRegistry()->remove(this); // clean resources internalClose(force); // notify send queue _sendQueueEvent.signal(); } void BlockingTCPTransport::internalClose(bool force) { // close the socket if(_channel!=INVALID_SOCKET) { epicsSocketDestroy(_channel); _channel = INVALID_SOCKET; } } int BlockingTCPTransport::getSocketReceiveBufferSize() const { // Get value of the SO_RCVBUF option for this DatagramSocket, // that is the buffer size used by the platform for input on // this DatagramSocket. int sockBufSize; socklen_t intLen = sizeof(int); int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, &sockBufSize, &intLen); if(retval<0) errlogSevPrintf(errlogMajor, "Socket getsockopt SO_RCVBUF error: %s", strerror(errno)); return sockBufSize; } // TODO reimplement using Event bool BlockingTCPTransport::waitUntilVerified(double timeout) { double internalTimeout = timeout; bool internalVerified = false; _verifiedMutex.lock(); internalVerified = _verified; _verifiedMutex.unlock(); while(!internalVerified&&internalTimeout>0) { epicsThreadSleep(min(0.1, internalTimeout)); internalTimeout -= 0.1; _verifiedMutex.lock(); internalVerified = _verified; _verifiedMutex.unlock(); } return internalVerified; } void BlockingTCPTransport::flush(bool lastMessageCompleted) { // automatic end endMessage(!lastMessageCompleted); bool moreToSend = true; // TODO closed check !!! while(moreToSend) { moreToSend = !flush(); // all sent, exit if(!moreToSend) break; // TODO solve this sleep in a better way epicsThreadSleep(0.01); } _lastMessageStartPosition = _sendBuffer->getPosition(); // start with last header if(!lastMessageCompleted&&_lastSegmentedMessageType!=0) startMessage( _lastSegmentedMessageCommand, 0); } void BlockingTCPTransport::startMessage(int8 command, int ensureCapacity) { _lastMessageStartPosition = -1; ensureBuffer(CA_MESSAGE_HEADER_SIZE+ensureCapacity); _lastMessageStartPosition = _sendBuffer->getPosition(); _sendBuffer->putShort(CA_MAGIC_AND_VERSION); _sendBuffer->putByte(_lastSegmentedMessageType); // data _sendBuffer->putByte(command); // command _sendBuffer->putInt(0); // temporary zero payload } void BlockingTCPTransport::endMessage() { endMessage(false); } void BlockingTCPTransport::ensureBuffer(int size) { if(_sendBuffer->getRemaining()>=size) return; // too large for buffer... if(_maxPayloadSizegetRemaining()=0) { // set message size _sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2, _sendBuffer->getPosition()-_lastMessageStartPosition -CA_MESSAGE_HEADER_SIZE); int flagsPosition = _lastMessageStartPosition+sizeof(int16); // set segmented bit if(hasMoreSegments) { // first segment if(_lastSegmentedMessageType==0) { int8 type = _sendBuffer->getByte(flagsPosition); // set first segment bit _sendBuffer->putByte(flagsPosition, (int8)(type|0x10)); // first + last segment bit == in-between segment _lastSegmentedMessageType = (int8)(type|0x30); _lastSegmentedMessageCommand = _sendBuffer->getByte( flagsPosition+1); } } else { // last segment if(_lastSegmentedMessageType!=0) { // set last segment bit (by clearing first segment bit) _sendBuffer->putByte(flagsPosition, (int8)(_lastSegmentedMessageType&0xEF)); _lastSegmentedMessageType = 0; } } // manage markers int position = _sendBuffer->getPosition(); int bytesLeft = _sendBuffer->getRemaining(); if(position>=_nextMarkerPosition&&bytesLeft >=CA_MESSAGE_HEADER_SIZE) { _sendBuffer->putShort(CA_MAGIC_AND_VERSION); _sendBuffer->putByte(1); // control data _sendBuffer->putByte(0); // marker _sendBuffer->putInt((int)(_totalBytesSent+position +CA_MESSAGE_HEADER_SIZE)); _nextMarkerPosition = position+_markerPeriodBytes; } } } void BlockingTCPTransport::ensureData(int size) { // enough of data? if(_socketBuffer->getRemaining()>=size) return; // too large for buffer... if(_maxPayloadSizegetPosition()-_storedPosition; // no more data and we have some payload left => read buffer if(_storedPayloadSize>=size) { //errlogSevPrintf(errlogInfo, // "storedPayloadSize >= size, remaining: %d", // _socketBuffer->getRemaining()); // just read up remaining payload // since there is no data on the buffer, read to the beginning of it, at least size bytes processReadCached(true, PROCESS_PAYLOAD, size, false); _storedPosition = _socketBuffer->getPosition(); _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); } else { // copy remaining bytes, if any int remainingBytes = _socketBuffer->getRemaining(); for(int i = 0; iputByte(i, _socketBuffer->getByte()); // read what is left _socketBuffer->setLimit(_storedLimit); _stage = PROCESS_HEADER; processReadCached(true, NONE, size, false); // copy before position for(int i = remainingBytes-1, j = _socketBuffer->getPosition() -1; i>=0; i--, j--) _socketBuffer->putByte(j, _socketBuffer->getByte(i)); _startPosition = _socketBuffer->getPosition()-remainingBytes; _socketBuffer->setPosition(_startPosition); _storedPosition = _startPosition; //socketBuffer.position(); _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize, _storedLimit)); // add if missing... if(!_closed&&_socketBuffer->getRemaining()getPosition(); _socketBuffer->setPosition( _socketBuffer->getLimit()); _socketBuffer->setLimit(_socketBuffer->getSize()); } else { // add to bytes read _totalBytesReceived += (_socketBuffer->getPosition() -_startPosition); // copy remaining bytes, if any int remainingBytes = _socketBuffer->getRemaining(); int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE +remainingBytes; for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i putByte(i, _socketBuffer->getByte()); currentStartPosition = _startPosition = MAX_ENSURE_DATA_BUFFER_SIZE; _socketBuffer->setPosition( MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes); _socketBuffer->setLimit(_socketBuffer->getSize()); } // read at least requiredBytes bytes int requiredPosition = (currentStartPosition +requiredBytes); while(_socketBuffer->getPosition()getRemaining()); ssize_t bytesRead = recv(_channel, readBuffer, maxToRead, 0); _socketBuffer->put(readBuffer, 0, bytesRead); if(bytesRead<=0) { // error (disconnect, end-of-stream) detected close(true); if(bytesRead<0&&nestedCall) THROW_BASE_EXCEPTION( "bytesRead < 0"); return; } } _socketBuffer->setLimit(_socketBuffer->getPosition()); _socketBuffer->setPosition(currentStartPosition); // notify liveness aliveNotification(); // exit if(inStage!=NONE) return; _stage = PROCESS_HEADER; } if(_stage==PROCESS_HEADER) { // ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data if(_socketBuffer->getRemaining()getShort(); if((short)(_magicAndVersion&0xFFF0) !=CA_MAGIC_AND_MAJOR_VERSION) { // error... disconnect errlogSevPrintf( errlogMinor, "Invalid header received from client %s, disconnecting...", inetAddressToString(_socketAddress).c_str()); close(true); return; } // data vs. control packet _packetType = _socketBuffer->getByte(); // command _command = _socketBuffer->getByte(); // read payload size _payloadSize = _socketBuffer->getInt(); // data int8 type = (int8)(_packetType&0x0F); if(type==0) { _stage = PROCESS_PAYLOAD; } else if(type==1) { if(_command==0) { _flowControlMutex.lock(); if(_markerToSend==0) _markerToSend = _payloadSize; // TODO send back response _flowControlMutex.unlock(); } else //if (command == 1) { _flowControlMutex.lock(); int difference = (int)_totalBytesSent -_payloadSize+CA_MESSAGE_HEADER_SIZE; // overrun check if(difference<0) difference += INT_MAX; _remoteBufferFreeSpace = _remoteTransportReceiveBufferSize +_remoteTransportSocketReceiveBufferSize -difference; // TODO if this is calculated wrong, this can be critical !!! _flowControlMutex.unlock(); } // no payload //stage = ReceiveStage.PROCESS_HEADER; continue; } else { errlogSevPrintf( errlogMajor, "Unknown packet type %d, received from client %s, disconnecting...", type, inetAddressToString(_socketAddress).c_str()); close(true); return; } } if(_stage==PROCESS_PAYLOAD) { // read header int8 version = (int8)(_magicAndVersion&0xFF); // last segment bit set (means in-between segment or last segment) bool notFirstSegment = (_packetType&0x20)!=0; _storedPayloadSize = _payloadSize; // if segmented, exit reading code if(nestedCall&¬FirstSegment) return; // NOTE: nested data (w/ payload) messages between segmented messages are not supported _storedPosition = _socketBuffer->getPosition(); _storedLimit = _socketBuffer->getLimit(); _socketBuffer->setLimit(min(_storedPosition +_storedPayloadSize, _storedLimit)); try { // handle response _responseHandler->handleResponse(&_socketAddress, this, version, _command, _payloadSize, _socketBuffer); } catch(...) { //noop } /* * Java finally start */ _socketBuffer->setLimit(_storedLimit); int newPosition = _storedPosition+_storedPayloadSize; if(newPosition>_storedLimit) { newPosition -= _storedLimit; _socketBuffer->setPosition(_storedLimit); processReadCached(true, PROCESS_PAYLOAD, newPosition, false); newPosition += _startPosition; } _socketBuffer->setPosition(newPosition); // TODO discard all possible segments?!!! /* * Java finally end */ _stage = PROCESS_HEADER; continue; } } } catch(...) { // close connection close(true); if(nestedCall) throw; } } bool BlockingTCPTransport::flush() { // request issues, has not sent anything yet (per partes) if(!_sendPending) { _sendPending = true; // start sending from the start _sendBufferSentPosition = 0; // if not set skip marker otherwise set it _flowControlMutex.lock(); int markerValue = _markerToSend; _markerToSend = 0; _flowControlMutex.unlock(); if(markerValue==0) _sendBufferSentPosition = CA_MESSAGE_HEADER_SIZE; else _sendBuffer->putInt(4, markerValue); } bool success = false; try { // remember current position int currentPos = _sendBuffer->getPosition(); // set to send position _sendBuffer->setPosition(_sendBufferSentPosition); _sendBuffer->setLimit(currentPos); success = send(_sendBuffer); // all sent? if(success) clearAndReleaseBuffer(); else { // remember position _sendBufferSentPosition = _sendBuffer->getPosition(); // .. reset to previous state _sendBuffer->setPosition(currentPos); _sendBuffer->setLimit(_sendBuffer->getSize()); } } catch(std::exception& e) { errlogSevPrintf(errlogMajor, "%s", e.what()); // error, release lock clearAndReleaseBuffer(); } catch(...) { clearAndReleaseBuffer(); } return success; } bool BlockingTCPTransport::send(ByteBuffer* buffer) { try { // TODO simply use value from marker???!!! // On Windows, limiting the buffer size is important to prevent // poor throughput performances when transferring large amount of // data. See Microsoft KB article KB823764. // We do it also for other systems just to be safe. int maxBytesToSend = min(_socketSendBufferSize, _remoteTransportSocketReceiveBufferSize)/2; int limit = buffer->getLimit(); int bytesToSend = limit-buffer->getPosition(); //errlogSevPrintf(errlogInfo,"Total bytes to send: %d", bytesToSend); // limit sending if(bytesToSend>maxBytesToSend) { bytesToSend = maxBytesToSend; buffer->setLimit(buffer->getPosition()+bytesToSend); } //errlogSevPrintf(errlogInfo, // "Sending %d of total %d bytes in the packet to %s.", // bytesToSend, limit, // inetAddressToString(_socketAddress).c_str()); while(buffer->getRemaining()>0) { ssize_t bytesSent = ::send(_channel, &buffer->getArray()[buffer->getPosition()], buffer->getRemaining(), 0); if(bytesSent<0) { // connection lost ostringstream temp; temp<<"error in sending TCP data: "<getPosition(), limit); /* buffers full, reset the limit and indicate that there * is more data to be sent */ if(bytesSent==maxBytesToSend) buffer->setLimit(limit); //errlogSevPrintf(errlogInfo, // "Send buffer full for %s, waiting...", // inetAddressToString(_socketAddress)); return false; } buffer->setPosition(buffer->getPosition()+bytesSent); _flowControlMutex.lock(); _totalBytesSent += bytesSent; _flowControlMutex.unlock(); // readjust limit if(bytesToSend==maxBytesToSend) { bytesToSend = limit-buffer->getPosition(); if(bytesToSend>maxBytesToSend) bytesToSend = maxBytesToSend; buffer->setLimit(buffer->getPosition()+bytesToSend); } //errlogSevPrintf(errlogInfo, // "Sent, position %d of total %d bytes.", // buffer->getPosition(), limit); } // while } catch(...) { close(true); throw; } // all sent return true; } TransportSender* BlockingTCPTransport::extractFromSendQueue() { TransportSender* retval; _sendQueueMutex.lock(); retval = _sendQueue->extract(); _sendQueueMutex.unlock(); return retval; } void BlockingTCPTransport::processSendQueue() { while(!_closed) { TransportSender* sender; sender = extractFromSendQueue(); // wait for new message while(sender==NULL&&!_flushRequested&&!_closed) { if(_flushStrategy==DELAYED) { if(_delay>0) epicsThreadSleep(_delay); if(_sendQueue->size()==0) { // if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE) if(_sendBuffer->getPosition()>CA_MESSAGE_HEADER_SIZE) _flushRequested = true; else _sendQueueEvent.wait(); } } else _sendQueueEvent.wait(); sender = extractFromSendQueue(); } // always do flush from this thread if(_flushRequested) { /* if (hasMonitors) { monitorSender.send(sendBuffer, this); } */ flush(); } if(sender!=NULL) { sender->lock(); try { _lastMessageStartPosition = _sendBuffer->getPosition(); sender->send(_sendBuffer, this); if(_flushStrategy==IMMEDIATE) flush(true); else endMessage(false);// automatic end (to set payload) } catch(std::exception &e) { errlogSevPrintf(errlogMajor, "%s", e.what()); _sendBuffer->setPosition(_lastMessageStartPosition); } catch(...) { _sendBuffer->setPosition(_lastMessageStartPosition); } sender->unlock(); } // if(sender!=NULL) } // while(!_closed) } void BlockingTCPTransport::freeSendBuffers() { // TODO ? } void BlockingTCPTransport::freeConnectionResorces() { freeSendBuffers(); errlogSevPrintf(errlogInfo, "Connection to %s closed.", inetAddressToString(_socketAddress).c_str()); if(_channel!=INVALID_SOCKET) { epicsSocketDestroy(_channel); _channel = INVALID_SOCKET; } } void BlockingTCPTransport::rcvThreadRunner(void* param) { BlockingTCPTransport* obj = (BlockingTCPTransport*)param; try{ obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false); } catch (...) { printf("rcvThreadRunnner exception\n"); } if(obj->_autoDelete) { while(true) { bool exited; obj->_mutex.lock(); exited = obj->_sendThreadExited; obj->_mutex.unlock(); if (exited) break; epicsThreadSleep(0.1); } delete obj; } } void BlockingTCPTransport::sendThreadRunner(void* param) { BlockingTCPTransport* obj = (BlockingTCPTransport*)param; try { obj->processSendQueue(); } catch (...) { printf("sendThreadRunnner exception\n"); } obj->freeConnectionResorces(); // TODO possible crash on unlock obj->_mutex.lock(); obj->_sendThreadExited = true; obj->_mutex.unlock(); } void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) { Lock lock(&_sendQueueMutex); if(_closed) return; _sendQueue->insert(sender); _sendQueueEvent.signal(); } void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender* sender) { Lock lock(&_monitorMutex); if(_closed) return; _monitorSendQueue->insert(sender); if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender); } void MonitorSender::send(ByteBuffer* buffer, TransportSendControl* control) { control->startMessage(19, 0); while(true) { TransportSender* sender; _monitorMutex->lock(); if(_monitorSendQueue->size()>0) sender = _monitorSendQueue->extract(); else sender = NULL; _monitorMutex->unlock(); if(sender==NULL) { control->ensureBuffer(sizeof(int32)); buffer->putInt(CAJ_INVALID_IOID); break; } sender->send(buffer, control); } } } }