TCP read fixed, some unactive alignment code added

This commit is contained in:
Matej Sekoranja
2011-09-20 17:37:28 +02:00
parent 24d8cb96a3
commit d03ac9d00f
9 changed files with 157 additions and 86 deletions

View File

@@ -23,11 +23,14 @@
#include <epicsThread.h>
#include <logger.h>
#include <pv/hexDump.h>
/* standard */
#include <sys/types.h>
#include <algorithm>
#include <sstream>
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
@@ -39,6 +42,15 @@ using std::max;
using std::min;
using std::ostringstream;
// TODO moved to some compiler_utils.h?
#if defined(__GNUC__)
#define likely(x) __builtin_expect (x, 1)
#define unlikely(x) __builtin_expect (x, 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif
namespace epics {
namespace pvAccess {
@@ -136,7 +148,7 @@ namespace epics {
osiSocklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
if(retval<0) {
if(unlikely(retval<0)) {
_socketSendBufferSize = MAX_TCP_RECV;
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -147,7 +159,7 @@ namespace epics {
osiSocklen_t saSize = sizeof(sockaddr);
retval = getpeername(_channel, &(_socketAddress.sa), &saSize);
if(retval<0) {
if(unlikely(retval<0)) {
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
LOG(logLevelError,
@@ -161,7 +173,7 @@ namespace epics {
timeout.tv_sec = 1;
timeout.tv_usec = 0;
if (::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)
if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -285,7 +297,7 @@ namespace epics {
osiSocklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, (char *)&sockBufSize, &intLen);
if(retval<0)
if(unlikely(retval<0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -319,9 +331,10 @@ namespace epics {
}
_lastMessageStartPosition = _sendBuffer->getPosition();
// start with last header
if(!lastMessageCompleted&&_lastSegmentedMessageType!=0) startMessage(
_lastSegmentedMessageCommand, 0);
if (unlikely(!lastMessageCompleted && _lastSegmentedMessageType!=0))
startMessage(_lastSegmentedMessageCommand, 0);
}
void BlockingTCPTransport::startMessage(int8 command, int ensureCapacity) {
@@ -341,10 +354,10 @@ namespace epics {
}
void BlockingTCPTransport::ensureBuffer(int size) {
if((int)(_sendBuffer->getRemaining())>=size) return;
if(likely((int)(_sendBuffer->getRemaining())>=size)) return;
// too large for buffer...
if(_maxPayloadSize<size) {
if(unlikely(_maxPayloadSize<size)) {
ostringstream temp;
temp<<"requested for buffer size "<<size<<", but only ";
temp<<_maxPayloadSize<<" available.";
@@ -353,26 +366,37 @@ namespace epics {
// TODO sync _closed
while(((int)_sendBuffer->getRemaining())<size&&!_closed)
while(((int)_sendBuffer->getRemaining())<size && !_closed)
flush(false);
if(_closed) THROW_BASE_EXCEPTION("transport closed");
if (unlikely(_closed)) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::alignBuffer(int alignment) {
// not space optimal (always requires 7-bytes), but fast
if(unlikely((int)(_sendBuffer->getRemaining())<(alignment-1)))
ensureBuffer(alignment-1);
_sendBuffer->align(alignment);
}
void BlockingTCPTransport::endMessage(bool hasMoreSegments) {
if(_lastMessageStartPosition>=0) {
// TODO align?
// set message size
if(likely(_lastMessageStartPosition>=0)) {
// align
// alignBuffer(CA_ALIGNMENT);
// set paylaod size
_sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2,
_sendBuffer->getPosition()-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
int flagsPosition = _lastMessageStartPosition+sizeof(int16);
// set segmented bit
if(hasMoreSegments) {
if(likely(hasMoreSegments)) {
// first segment
if(_lastSegmentedMessageType==0) {
if(unlikely(_lastSegmentedMessageType==0)) {
int8 type = _sendBuffer->getByte(flagsPosition);
// set first segment bit
@@ -386,7 +410,7 @@ namespace epics {
}
else {
// last segment
if(_lastSegmentedMessageType!=0) {
if(unlikely(_lastSegmentedMessageType!=0)) {
// set last segment bit (by clearing first segment bit)
_sendBuffer->putByte(flagsPosition,
(int8)(_lastSegmentedMessageType&0xEF));
@@ -398,14 +422,13 @@ namespace epics {
int position = _sendBuffer->getPosition();
int bytesLeft = _sendBuffer->getRemaining();
if(position>=_nextMarkerPosition &&
bytesLeft>=CA_MESSAGE_HEADER_SIZE) {
if(unlikely(position>=_nextMarkerPosition &&
bytesLeft>=CA_MESSAGE_HEADER_SIZE)) {
_sendBuffer->putByte(CA_MAGIC);
_sendBuffer->putByte(CA_VERSION);
_sendBuffer->putByte(1); // control data
_sendBuffer->putByte(0); // marker
_sendBuffer->putInt((int)(_totalBytesSent+position
+CA_MESSAGE_HEADER_SIZE));
_sendBuffer->putInt((int)(_totalBytesSent+position+CA_MESSAGE_HEADER_SIZE));
_nextMarkerPosition = position+_markerPeriodBytes;
}
}
@@ -413,13 +436,13 @@ namespace epics {
void BlockingTCPTransport::ensureData(int size) {
// enough of data?
if(((int)_socketBuffer->getRemaining())>=size) return;
if(likely(((int)_socketBuffer->getRemaining())>=size)) return;
// too large for buffer...
if(_maxPayloadSize<size) {
if(unlikely(MAX_ENSURE_DATA_BUFFER_SIZE<size)) {
ostringstream temp;
temp<<"requested for buffer size "<<size<<", but only ";
temp<<_maxPayloadSize<<" available.";
temp<<MAX_ENSURE_DATA_BUFFER_SIZE<<" available.";
THROW_BASE_EXCEPTION(temp.str().c_str());
}
@@ -427,14 +450,14 @@ namespace epics {
_storedPayloadSize -= _socketBuffer->getPosition()-_storedPosition;
// no more data and we have some payload left => read buffer
if(_storedPayloadSize>=size) {
if(likely(_storedPayloadSize>=size)) {
//LOG(logLevelInfo,
// "storedPayloadSize >= size, remaining: %d",
// _socketBuffer->getRemaining());
// just read up remaining payload
// since there is no data on the buffer, read to the beginning of it, at least size bytes
processReadCached(true, PROCESS_PAYLOAD, size, false);
// just read up remaining payload, move current (<size) part of the buffer
// to the beginning of the buffer
processReadCached(true, PROCESS_PAYLOAD, size);
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize,
@@ -450,7 +473,7 @@ namespace epics {
_socketBuffer->setLimit(_storedLimit);
_stage = PROCESS_HEADER;
processReadCached(true, NONE, size, false);
processReadCached(true, NONE, size-remainingBytes);
// copy before position
for(int i = remainingBytes-1, j = _socketBuffer->getPosition()
@@ -467,39 +490,46 @@ namespace epics {
// TODO sync _closed
// add if missing...
if(!_closed&&((int)_socketBuffer->getRemaining())<size)
if(unlikely(!_closed&&((int)_socketBuffer->getRemaining())<size))
ensureData(size);
}
if(_closed) THROW_BASE_EXCEPTION("transport closed");
if(unlikely(_closed)) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::alignData(int alignment) {
// not space optimal (always requires 7-bytes), but fast
if(unlikely((int)(_socketBuffer->getRemaining())<(alignment-1)))
ensureData(alignment-1);
_socketBuffer->align(alignment);
}
void BlockingTCPTransport::processReadCached(bool nestedCall,
ReceiveStage inStage, int requiredBytes, bool addToBuffer) {
ReceiveStage inStage, int requiredBytes) {
try {
// TODO sync _closed
while(!_closed) {
while(likely(!_closed)) {
if(_stage==READ_FROM_SOCKET||inStage!=NONE) {
int currentStartPosition;
if(addToBuffer) {
currentStartPosition = _socketBuffer->getPosition();
_socketBuffer->setPosition(_socketBuffer->getLimit());
_socketBuffer->setLimit(_socketBuffer->getSize());
}
else {
// add to bytes read
_totalBytesReceived += (_socketBuffer->getPosition() -_startPosition);
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes;
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i<endPosition; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
// add to bytes read
int currentPosition = _socketBuffer->getPosition();
_totalBytesReceived += (currentPosition - _startPosition);
currentStartPosition = _startPosition = MAX_ENSURE_DATA_BUFFER_SIZE;
_socketBuffer->setPosition(MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes);
_socketBuffer->setLimit(_socketBuffer->getSize());
}
// preserve alignment
int currentStartPosition = _startPosition =
MAX_ENSURE_DATA_BUFFER_SIZE; // "TODO uncomment align" + (unsigned int)currentPosition % CA_ALIGNMENT;
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
int endPosition = currentStartPosition + remainingBytes;
// TODO memmove
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i<endPosition; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
_socketBuffer->setPosition(endPosition);
_socketBuffer->setLimit(_socketBuffer->getSize());
// read at least requiredBytes bytes
@@ -509,9 +539,8 @@ namespace epics {
int pos = _socketBuffer->getPosition();
ssize_t bytesRead = recv(_channel, (char*)(_socketBuffer->getArray()+pos),
_socketBuffer->getRemaining(), 0);
_socketBuffer->setPosition(pos+bytesRead);
if(bytesRead<=0) {
if(unlikely(bytesRead<=0)) {
if (bytesRead<0)
{
@@ -532,10 +561,18 @@ namespace epics {
return;
}
_socketBuffer->setPosition(pos+bytesRead);
}
_socketBuffer->setLimit(_socketBuffer->getPosition());
_socketBuffer->setPosition(currentStartPosition);
/*
hexDump("\n\n\n", "READ",
(const int8*)_socketBuffer->getArray(),
_socketBuffer->getPosition(), _socketBuffer->getRemaining());
*/
// notify liveness
aliveNotification();
@@ -545,16 +582,16 @@ namespace epics {
_stage = PROCESS_HEADER;
}
if(_stage==PROCESS_HEADER) {
if(likely(_stage==PROCESS_HEADER)) {
// ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data
if(((int)_socketBuffer->getRemaining())<CA_MESSAGE_HEADER_SIZE)
processReadCached(true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE, false);
if(unlikely(((int)_socketBuffer->getRemaining())<CA_MESSAGE_HEADER_SIZE))
processReadCached(true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE);
// first byte is CA_MAGIC
// second byte version - major/minor nibble
int8 magic = _socketBuffer->getByte();
_version = _socketBuffer->getByte();
if((magic != CA_MAGIC) || (((unsigned int8)_version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION)
if(unlikely((magic != CA_MAGIC) || (((unsigned int8)_version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION))
{
// error... disconnect
LOG(
@@ -575,12 +612,12 @@ namespace epics {
_payloadSize = _socketBuffer->getInt();
int8 type = (int8)(_packetType&0x0F);
if(type==0)
if(likely(type==0))
{
// data
_stage = PROCESS_PAYLOAD;
}
else if(type==1)
else if(unlikely(type==1))
{
// control
@@ -633,7 +670,7 @@ namespace epics {
}
}
if(_stage==PROCESS_PAYLOAD) {
if(likely(_stage==PROCESS_PAYLOAD)) {
// read header
// last segment bit set (means in-between segment or last segment)
@@ -660,10 +697,10 @@ namespace epics {
_socketBuffer->setLimit(_storedLimit);
int newPosition = _storedPosition+_storedPayloadSize;
if(newPosition>_storedLimit) {
if(unlikely(newPosition>_storedLimit)) {
newPosition -= _storedLimit;
_socketBuffer->setPosition(_storedLimit);
processReadCached(true, PROCESS_PAYLOAD,newPosition, false);
processReadCached(true, PROCESS_PAYLOAD,newPosition);
newPosition += _startPosition;
}
_socketBuffer->setPosition(newPosition);
@@ -685,7 +722,7 @@ namespace epics {
bool BlockingTCPTransport::flush() {
// request issues, has not sent anything yet (per partes)
if(!_sendPending) {
if(likely(!_sendPending)) {
_sendPending = true;
// start sending from the start
@@ -714,7 +751,7 @@ namespace epics {
success = send(_sendBuffer);
// all sent?
if(success)
if(likely(success))
clearAndReleaseBuffer();
else {
// remember position
@@ -765,7 +802,7 @@ namespace epics {
&buffer->getArray()[buffer->getPosition()],
buffer->getRemaining(), 0);
if(bytesSent<0) {
if(unlikely(bytesSent<0)) {
int socketError = SOCKERRNO;
@@ -791,7 +828,7 @@ namespace epics {
//LOG(logLevelError, "%s", temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
else if(bytesSent==0) {
else if(unlikely(bytesSent==0)) {
// TODO WINSOCK indicates disconnect by returning zero here !!!
@@ -839,12 +876,12 @@ namespace epics {
void BlockingTCPTransport::processSendQueue() {
// TODO sync _closed
while(!_closed) {
while(unlikely(!_closed)) {
_sendQueueMutex.lock();
// TODO optimize
TransportSender::shared_pointer sender;
if (!_sendQueue.empty())
if (likely(!_sendQueue.empty()))
{
sender = _sendQueue.front();
_sendQueue.pop_front();
@@ -852,10 +889,10 @@ namespace epics {
_sendQueueMutex.unlock();
// wait for new message
while(sender.get()==0&&!_flushRequested&&!_closed) {
while(likely(sender.get()==0&&!_flushRequested&&!_closed)) {
if(_flushStrategy==DELAYED) {
if(_delay>0) epicsThreadSleep(_delay);
if(_sendQueue.empty()) {
if(unlikely(_sendQueue.empty())) {
// if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE)
if(((int)_sendBuffer->getPosition())>CA_MESSAGE_HEADER_SIZE)
_flushRequested = true;
@@ -867,7 +904,7 @@ namespace epics {
_sendQueueEvent.wait();
_sendQueueMutex.lock();
if (!_sendQueue.empty())
if (likely(!_sendQueue.empty()))
{
sender = _sendQueue.front();
_sendQueue.pop_front();
@@ -878,7 +915,7 @@ namespace epics {
}
// always do flush from this thread
if(_flushRequested) {
if(unlikely(_flushRequested)) {
/*
if (hasMonitors)
{
@@ -889,7 +926,7 @@ namespace epics {
flush();
}
if(sender.get()) {
if(likely(sender.get() != 0)) {
sender->lock();
try {
_lastMessageStartPosition = _sendBuffer->getPosition();
@@ -932,7 +969,7 @@ namespace epics {
Transport::shared_pointer ptr = obj->shared_from_this(); // hold reference
try{
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false);
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE);
} catch (...) {
printf("rcvThreadRunnner exception\n");
}