This commit is contained in:
jrowlandls
2011-09-21 10:33:16 +01:00
9 changed files with 175 additions and 101 deletions

View File

@@ -75,6 +75,9 @@ namespace epics {
/** Invalid IOID. */
const int32 INVALID_IOID = 0;
/** All messages must be aligned to 8-bytes (64-bit). */
const int32 CA_ALIGNMENT = 8;
/** Default CA provider name. */
const String PVACCESS_DEFAULT_PROVIDER = "local";
}

View File

@@ -141,8 +141,12 @@ namespace epics {
virtual void ensureBuffer(int size);
virtual void alignBuffer(int alignment);
virtual void ensureData(int size);
virtual void alignData(int alignment);
virtual void close(bool force);
SendQueueFlushStrategy getSendQueueFlushStrategy() {
@@ -172,7 +176,7 @@ namespace epics {
protected:
virtual void processReadCached(bool nestedCall,
ReceiveStage inStage, int requiredBytes, bool addToBuffer);
ReceiveStage inStage, int requiredBytes);
/**
* Called to any resources just before closing transport
@@ -324,7 +328,7 @@ namespace epics {
int _sendBufferSentPosition;
int8 _byteOrderFlag;

View File

@@ -1,8 +1,5 @@
/*
* blockingTCPTransport.cpp
*
* Created on: Dec 29, 2010
* Author: Miha Vitorovic
*/
#define __STDC_LIMIT_MACROS 1
@@ -23,11 +20,14 @@
#include <epicsThread.h>
#include <logger.h>
#include <pv/hexDump.h>
/* standard */
#include <sys/types.h>
#include <algorithm>
#include <sstream>
#ifdef _WIN32
#include <BaseTsd.h>
typedef SSIZE_T ssize_t;
@@ -39,6 +39,15 @@ using std::max;
using std::min;
using std::ostringstream;
// TODO moved to some compiler_utils.h?
#if defined(__GNUC__)
#define likely(x) __builtin_expect (x, 1)
#define unlikely(x) __builtin_expect (x, 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif
namespace epics {
namespace pvAccess {
@@ -104,6 +113,7 @@ namespace epics {
_lastSegmentedMessageCommand(0),
_flushRequested(false),
_sendBufferSentPosition(0),
_byteOrderFlag((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00),
_storedPayloadSize(0),
_storedPosition(0),
_storedLimit(0),
@@ -136,7 +146,7 @@ namespace epics {
osiSocklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_SNDBUF, (char *)&_socketSendBufferSize, &intLen);
if(retval<0) {
if(unlikely(retval<0)) {
_socketSendBufferSize = MAX_TCP_RECV;
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -147,7 +157,7 @@ namespace epics {
osiSocklen_t saSize = sizeof(sockaddr);
retval = getpeername(_channel, &(_socketAddress.sa), &saSize);
if(retval<0) {
if(unlikely(retval<0)) {
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
LOG(logLevelError,
@@ -161,7 +171,7 @@ namespace epics {
timeout.tv_sec = 1;
timeout.tv_usec = 0;
if (::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)
if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -169,7 +179,8 @@ namespace epics {
"Failed to set SO_RCVTIMEO for TDP socket %s: %s.",
inetAddressToString(_socketAddress).c_str(), errStr);
}
// TODO this will create marker with invalid endian flag
// prepare buffer
clearAndReleaseBuffer();
}
@@ -236,7 +247,7 @@ namespace epics {
// prepare ACK marker
_sendBuffer->putByte(CA_MAGIC);
_sendBuffer->putByte(CA_VERSION);
_sendBuffer->putByte(1); // control data
_sendBuffer->putByte(0x01 | _byteOrderFlag); // control data
_sendBuffer->putByte(1); // marker ACK
_sendBuffer->putInt(0);
}
@@ -285,7 +296,7 @@ namespace epics {
osiSocklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, (char *)&sockBufSize, &intLen);
if(retval<0)
if(unlikely(retval<0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -319,9 +330,10 @@ namespace epics {
}
_lastMessageStartPosition = _sendBuffer->getPosition();
// start with last header
if(!lastMessageCompleted&&_lastSegmentedMessageType!=0) startMessage(
_lastSegmentedMessageCommand, 0);
if (unlikely(!lastMessageCompleted && _lastSegmentedMessageType!=0))
startMessage(_lastSegmentedMessageCommand, 0);
}
void BlockingTCPTransport::startMessage(int8 command, int ensureCapacity) {
@@ -330,7 +342,7 @@ namespace epics {
_lastMessageStartPosition = _sendBuffer->getPosition();
_sendBuffer->putByte(CA_MAGIC);
_sendBuffer->putByte(CA_VERSION);
_sendBuffer->putByte(_lastSegmentedMessageType | ((EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG) ? 0x80 : 0x00)); // data + endianess
_sendBuffer->putByte(_lastSegmentedMessageType | _byteOrderFlag); // data + endianess
_sendBuffer->putByte(command); // command
_sendBuffer->putInt(0); // temporary zero payload
@@ -341,10 +353,10 @@ namespace epics {
}
void BlockingTCPTransport::ensureBuffer(int size) {
if((int)(_sendBuffer->getRemaining())>=size) return;
if(likely((int)(_sendBuffer->getRemaining())>=size)) return;
// too large for buffer...
if(_maxPayloadSize<size) {
if(unlikely(_maxPayloadSize<size)) {
ostringstream temp;
temp<<"requested for buffer size "<<size<<", but only ";
temp<<_maxPayloadSize<<" available.";
@@ -353,26 +365,37 @@ namespace epics {
// TODO sync _closed
while(((int)_sendBuffer->getRemaining())<size&&!_closed)
while(((int)_sendBuffer->getRemaining())<size && !_closed)
flush(false);
if(_closed) THROW_BASE_EXCEPTION("transport closed");
if (unlikely(_closed)) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::alignBuffer(int alignment) {
// not space optimal (always requires 7-bytes), but fast
if(unlikely((int)(_sendBuffer->getRemaining())<(alignment-1)))
ensureBuffer(alignment-1);
_sendBuffer->align(alignment);
}
void BlockingTCPTransport::endMessage(bool hasMoreSegments) {
if(_lastMessageStartPosition>=0) {
// TODO align?
// set message size
if(likely(_lastMessageStartPosition>=0)) {
// align
// alignBuffer(CA_ALIGNMENT);
// set paylaod size
_sendBuffer->putInt(_lastMessageStartPosition+sizeof(int16)+2,
_sendBuffer->getPosition()-_lastMessageStartPosition
-CA_MESSAGE_HEADER_SIZE);
int flagsPosition = _lastMessageStartPosition+sizeof(int16);
// set segmented bit
if(hasMoreSegments) {
if(likely(hasMoreSegments)) {
// first segment
if(_lastSegmentedMessageType==0) {
if(unlikely(_lastSegmentedMessageType==0)) {
int8 type = _sendBuffer->getByte(flagsPosition);
// set first segment bit
@@ -386,7 +409,7 @@ namespace epics {
}
else {
// last segment
if(_lastSegmentedMessageType!=0) {
if(unlikely(_lastSegmentedMessageType!=0)) {
// set last segment bit (by clearing first segment bit)
_sendBuffer->putByte(flagsPosition,
(int8)(_lastSegmentedMessageType&0xEF));
@@ -398,14 +421,13 @@ namespace epics {
int position = _sendBuffer->getPosition();
int bytesLeft = _sendBuffer->getRemaining();
if(position>=_nextMarkerPosition &&
bytesLeft>=CA_MESSAGE_HEADER_SIZE) {
if(unlikely(position>=_nextMarkerPosition &&
bytesLeft>=CA_MESSAGE_HEADER_SIZE)) {
_sendBuffer->putByte(CA_MAGIC);
_sendBuffer->putByte(CA_VERSION);
_sendBuffer->putByte(1); // control data
_sendBuffer->putByte(0x01 | _byteOrderFlag); // control data
_sendBuffer->putByte(0); // marker
_sendBuffer->putInt((int)(_totalBytesSent+position
+CA_MESSAGE_HEADER_SIZE));
_sendBuffer->putInt((int)(_totalBytesSent+position+CA_MESSAGE_HEADER_SIZE));
_nextMarkerPosition = position+_markerPeriodBytes;
}
}
@@ -413,13 +435,13 @@ namespace epics {
void BlockingTCPTransport::ensureData(int size) {
// enough of data?
if(((int)_socketBuffer->getRemaining())>=size) return;
if(likely(((int)_socketBuffer->getRemaining())>=size)) return;
// too large for buffer...
if(_maxPayloadSize<size) {
if(unlikely(MAX_ENSURE_DATA_BUFFER_SIZE<size)) {
ostringstream temp;
temp<<"requested for buffer size "<<size<<", but only ";
temp<<_maxPayloadSize<<" available.";
temp<<MAX_ENSURE_DATA_BUFFER_SIZE<<" available.";
THROW_BASE_EXCEPTION(temp.str().c_str());
}
@@ -427,14 +449,14 @@ namespace epics {
_storedPayloadSize -= _socketBuffer->getPosition()-_storedPosition;
// no more data and we have some payload left => read buffer
if(_storedPayloadSize>=size) {
if(likely(_storedPayloadSize>=size)) {
//LOG(logLevelInfo,
// "storedPayloadSize >= size, remaining: %d",
// _socketBuffer->getRemaining());
// just read up remaining payload
// since there is no data on the buffer, read to the beginning of it, at least size bytes
processReadCached(true, PROCESS_PAYLOAD, size, false);
// just read up remaining payload, move current (<size) part of the buffer
// to the beginning of the buffer
processReadCached(true, PROCESS_PAYLOAD, size);
_storedPosition = _socketBuffer->getPosition();
_storedLimit = _socketBuffer->getLimit();
_socketBuffer->setLimit(min(_storedPosition+_storedPayloadSize,
@@ -450,7 +472,7 @@ namespace epics {
_socketBuffer->setLimit(_storedLimit);
_stage = PROCESS_HEADER;
processReadCached(true, NONE, size, false);
processReadCached(true, NONE, size-remainingBytes);
// copy before position
for(int i = remainingBytes-1, j = _socketBuffer->getPosition()
@@ -467,39 +489,46 @@ namespace epics {
// TODO sync _closed
// add if missing...
if(!_closed&&((int)_socketBuffer->getRemaining())<size)
if(unlikely(!_closed&&((int)_socketBuffer->getRemaining())<size))
ensureData(size);
}
if(_closed) THROW_BASE_EXCEPTION("transport closed");
if(unlikely(_closed)) THROW_BASE_EXCEPTION("transport closed");
}
void BlockingTCPTransport::alignData(int alignment) {
// not space optimal (always requires 7-bytes), but fast
if(unlikely((int)(_socketBuffer->getRemaining())<(alignment-1)))
ensureData(alignment-1);
_socketBuffer->align(alignment);
}
void BlockingTCPTransport::processReadCached(bool nestedCall,
ReceiveStage inStage, int requiredBytes, bool addToBuffer) {
ReceiveStage inStage, int requiredBytes) {
try {
// TODO sync _closed
while(!_closed) {
while(likely(!_closed)) {
if(_stage==READ_FROM_SOCKET||inStage!=NONE) {
int currentStartPosition;
if(addToBuffer) {
currentStartPosition = _socketBuffer->getPosition();
_socketBuffer->setPosition(_socketBuffer->getLimit());
_socketBuffer->setLimit(_socketBuffer->getSize());
}
else {
// add to bytes read
_totalBytesReceived += (_socketBuffer->getPosition() -_startPosition);
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
int endPosition = MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes;
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i<endPosition; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
// add to bytes read
int currentPosition = _socketBuffer->getPosition();
_totalBytesReceived += (currentPosition - _startPosition);
currentStartPosition = _startPosition = MAX_ENSURE_DATA_BUFFER_SIZE;
_socketBuffer->setPosition(MAX_ENSURE_DATA_BUFFER_SIZE+remainingBytes);
_socketBuffer->setLimit(_socketBuffer->getSize());
}
// preserve alignment
int currentStartPosition = _startPosition =
MAX_ENSURE_DATA_BUFFER_SIZE; // "TODO uncomment align" + (unsigned int)currentPosition % CA_ALIGNMENT;
// copy remaining bytes, if any
int remainingBytes = _socketBuffer->getRemaining();
int endPosition = currentStartPosition + remainingBytes;
// TODO memmove
for(int i = MAX_ENSURE_DATA_BUFFER_SIZE; i<endPosition; i++)
_socketBuffer->putByte(i, _socketBuffer->getByte());
_socketBuffer->setPosition(endPosition);
_socketBuffer->setLimit(_socketBuffer->getSize());
// read at least requiredBytes bytes
@@ -509,9 +538,8 @@ namespace epics {
int pos = _socketBuffer->getPosition();
ssize_t bytesRead = recv(_channel, (char*)(_socketBuffer->getArray()+pos),
_socketBuffer->getRemaining(), 0);
_socketBuffer->setPosition(pos+bytesRead);
if(bytesRead<=0) {
if(unlikely(bytesRead<=0)) {
if (bytesRead<0)
{
@@ -532,10 +560,18 @@ namespace epics {
return;
}
_socketBuffer->setPosition(pos+bytesRead);
}
_socketBuffer->setLimit(_socketBuffer->getPosition());
_socketBuffer->setPosition(currentStartPosition);
/*
hexDump("\n\n\n", "READ",
(const int8*)_socketBuffer->getArray(),
_socketBuffer->getPosition(), _socketBuffer->getRemaining());
*/
// notify liveness
aliveNotification();
@@ -545,16 +581,16 @@ namespace epics {
_stage = PROCESS_HEADER;
}
if(_stage==PROCESS_HEADER) {
if(likely(_stage==PROCESS_HEADER)) {
// ensure CAConstants.CA_MESSAGE_HEADER_SIZE bytes of data
if(((int)_socketBuffer->getRemaining())<CA_MESSAGE_HEADER_SIZE)
processReadCached(true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE, false);
if(unlikely(((int)_socketBuffer->getRemaining())<CA_MESSAGE_HEADER_SIZE))
processReadCached(true, PROCESS_HEADER, CA_MESSAGE_HEADER_SIZE);
// first byte is CA_MAGIC
// second byte version - major/minor nibble
int8 magic = _socketBuffer->getByte();
_version = _socketBuffer->getByte();
if((magic != CA_MAGIC) || (((uint8_t)_version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION)
if(unlikely((magic != CA_MAGIC) || (((uint8_t)_version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION))
{
// error... disconnect
LOG(
@@ -575,12 +611,12 @@ namespace epics {
_payloadSize = _socketBuffer->getInt();
int8 type = (int8)(_packetType&0x0F);
if(type==0)
if(likely(type==0))
{
// data
_stage = PROCESS_PAYLOAD;
}
else if(type==1)
else if(unlikely(type==1))
{
// control
@@ -612,9 +648,15 @@ namespace epics {
{
// check 7-th bit
// TODO no sync !!! on send
_socketBuffer->setEndianess(_packetType < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
_sendBuffer->setEndianess(_packetType < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
int endianess = (_packetType < 0 ? EPICS_ENDIAN_BIG : EPICS_ENDIAN_LITTLE);
_socketBuffer->setEndianess(endianess);
// TODO register as TransportSender and add to the queue
// current implementation is OK, but not nice
_sendQueueMutex.lock();
_sendBuffer->setEndianess(endianess);
_byteOrderFlag = (endianess == EPICS_ENDIAN_BIG) ? 0x80 : 0x00;
_sendQueueMutex.unlock();
}
@@ -633,7 +675,7 @@ namespace epics {
}
}
if(_stage==PROCESS_PAYLOAD) {
if(likely(_stage==PROCESS_PAYLOAD)) {
// read header
// last segment bit set (means in-between segment or last segment)
@@ -660,10 +702,10 @@ namespace epics {
_socketBuffer->setLimit(_storedLimit);
int newPosition = _storedPosition+_storedPayloadSize;
if(newPosition>_storedLimit) {
if(unlikely(newPosition>_storedLimit)) {
newPosition -= _storedLimit;
_socketBuffer->setPosition(_storedLimit);
processReadCached(true, PROCESS_PAYLOAD,newPosition, false);
processReadCached(true, PROCESS_PAYLOAD,newPosition);
newPosition += _startPosition;
}
_socketBuffer->setPosition(newPosition);
@@ -685,7 +727,7 @@ namespace epics {
bool BlockingTCPTransport::flush() {
// request issues, has not sent anything yet (per partes)
if(!_sendPending) {
if(likely(!_sendPending)) {
_sendPending = true;
// start sending from the start
@@ -714,7 +756,7 @@ namespace epics {
success = send(_sendBuffer);
// all sent?
if(success)
if(likely(success))
clearAndReleaseBuffer();
else {
// remember position
@@ -765,7 +807,7 @@ namespace epics {
&buffer->getArray()[buffer->getPosition()],
buffer->getRemaining(), 0);
if(bytesSent<0) {
if(unlikely(bytesSent<0)) {
int socketError = SOCKERRNO;
@@ -791,7 +833,7 @@ namespace epics {
//LOG(logLevelError, "%s", temp.str().c_str());
THROW_BASE_EXCEPTION(temp.str().c_str());
}
else if(bytesSent==0) {
else if(unlikely(bytesSent==0)) {
// TODO WINSOCK indicates disconnect by returning zero here !!!
@@ -839,12 +881,12 @@ namespace epics {
void BlockingTCPTransport::processSendQueue() {
// TODO sync _closed
while(!_closed) {
while(unlikely(!_closed)) {
_sendQueueMutex.lock();
// TODO optimize
TransportSender::shared_pointer sender;
if (!_sendQueue.empty())
if (likely(!_sendQueue.empty()))
{
sender = _sendQueue.front();
_sendQueue.pop_front();
@@ -852,10 +894,10 @@ namespace epics {
_sendQueueMutex.unlock();
// wait for new message
while(sender.get()==0&&!_flushRequested&&!_closed) {
while(likely(sender.get()==0&&!_flushRequested&&!_closed)) {
if(_flushStrategy==DELAYED) {
if(_delay>0) epicsThreadSleep(_delay);
if(_sendQueue.empty()) {
if(unlikely(_sendQueue.empty())) {
// if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE)
if(((int)_sendBuffer->getPosition())>CA_MESSAGE_HEADER_SIZE)
_flushRequested = true;
@@ -867,7 +909,7 @@ namespace epics {
_sendQueueEvent.wait();
_sendQueueMutex.lock();
if (!_sendQueue.empty())
if (likely(!_sendQueue.empty()))
{
sender = _sendQueue.front();
_sendQueue.pop_front();
@@ -878,7 +920,7 @@ namespace epics {
}
// always do flush from this thread
if(_flushRequested) {
if(unlikely(_flushRequested)) {
/*
if (hasMonitors)
{
@@ -889,7 +931,7 @@ namespace epics {
flush();
}
if(sender.get()) {
if(likely(sender.get() != 0)) {
sender->lock();
try {
_lastMessageStartPosition = _sendBuffer->getPosition();
@@ -932,7 +974,7 @@ namespace epics {
Transport::shared_pointer ptr = obj->shared_from_this(); // hold reference
try{
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false);
obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE);
} catch (...) {
printf("rcvThreadRunnner exception\n");
}

View File

@@ -117,6 +117,10 @@ namespace epics {
// noop
}
virtual void alignData(int alignment) {
_receiveBuffer->align(alignment);
}
virtual void startMessage(int8 command, int ensureCapacity);
virtual void endMessage();
@@ -137,6 +141,10 @@ namespace epics {
// noop
}
virtual void alignBuffer(int alignment) {
_sendBuffer->align(alignment);
}
/**
* Set ignore list.
* @param addresses list of ignored addresses.

View File

@@ -1,7 +1,5 @@
/* blockingUDPTransport.cpp
*
* Created on: Dec 20, 2010
* Author: Miha Vitorovic
/*
* blockingUDPTransport.cpp
*/
/* pvAccess */
@@ -28,6 +26,15 @@
using namespace epics::pvData;
using namespace std;
// TODO moved to some compiler_utils.h?
#if defined(__GNUC__)
#define likely(x) __builtin_expect (x, 1)
#define unlikely(x) __builtin_expect (x, 0)
#else
#define likely(x) (x)
#define unlikely(x) (x)
#endif
namespace epics {
namespace pvAccess {
@@ -57,7 +64,7 @@ namespace epics {
timeout.tv_sec = 1;
timeout.tv_usec = 0;
if (::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0)
if (unlikely(::setsockopt (_channel, SOL_SOCKET, SO_RCVTIMEO, (char*)&timeout, sizeof(timeout)) < 0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -152,6 +159,8 @@ namespace epics {
}
void BlockingUDPTransport::endMessage() {
//we always (for now) send by packet, so no need for this here...
//alignBuffer(CA_ALIGNMENT);
_sendBuffer->putInt(
_lastMessageStartPosition+(sizeof(int16)+2),
_sendBuffer->getPosition()-_lastMessageStartPosition-CA_MESSAGE_HEADER_SIZE);
@@ -167,7 +176,7 @@ namespace epics {
try {
bool closed;
while(!_closed)
while(likely(!_closed))
{
_mutex.lock();
@@ -187,7 +196,7 @@ namespace epics {
_receiveBuffer->getRemaining(), 0, (sockaddr*)&fromAddress,
&addrStructSize);
if(bytesRead>0) {
if(likely(bytesRead>0)) {
// successfully got datagram
bool ignore = false;
if(_ignoredAddresses!=0)
@@ -210,7 +219,7 @@ namespace epics {
processBuffer(thisTransport, fromAddress, _receiveBuffer);
}
}
else if (bytesRead == -1) {
else if (unlikely(bytesRead == -1)) {
int socketError = SOCKERRNO;
@@ -254,7 +263,7 @@ namespace epics {
bool BlockingUDPTransport::processBuffer(Transport::shared_pointer const & thisTransport, osiSockAddr& fromAddress, ByteBuffer* receiveBuffer) {
// handle response(s)
while((int)receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE) {
while(likely((int)receiveBuffer->getRemaining()>=CA_MESSAGE_HEADER_SIZE)) {
//
// read header
//
@@ -263,7 +272,7 @@ namespace epics {
// second byte version - major/minor nibble
int8 magic = receiveBuffer->getByte();
int8 version = receiveBuffer->getByte();
if((magic != CA_MAGIC) || (((uint8_t)version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION)
if(unlikely((magic != CA_MAGIC) || (((uint8_t)version) >> 4)!=CA_MAJOR_PROTOCOL_REVISION))
return false;
// only data for UDP
@@ -284,7 +293,7 @@ namespace epics {
int nextRequestPosition = receiveBuffer->getPosition() + payloadSize;
// payload size check
if(nextRequestPosition>(int)receiveBuffer->getLimit()) return false;
if(unlikely(nextRequestPosition>(int)receiveBuffer->getLimit())) return false;
// handle
_responseHandler->handleResponse(&fromAddress, thisTransport,
@@ -304,7 +313,7 @@ namespace epics {
buffer->flip();
int retval = sendto(_channel, buffer->getArray(),
buffer->getLimit(), 0, &(address.sa), sizeof(sockaddr));
if(retval<0)
if(unlikely(retval<0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -324,7 +333,7 @@ namespace epics {
buffer->getLimit(), 0, &(_sendAddresses->at(i).sa),
sizeof(sockaddr));
{
if(retval<0)
if(unlikely(retval<0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));
@@ -346,7 +355,7 @@ namespace epics {
osiSocklen_t intLen = sizeof(int);
int retval = getsockopt(_channel, SOL_SOCKET, SO_RCVBUF, (char *)&sockBufSize, &intLen);
if(retval<0)
if(unlikely(retval<0))
{
char errStr[64];
epicsSocketConvertErrnoToString(errStr, sizeof(errStr));

View File

@@ -268,6 +268,7 @@ public:
void setRecipient(const osiSockAddr& sendTo) {}
void startMessage(int8 command, int ensureCapacity) {}
void ensureBuffer(int size) {}
void alignBuffer(int alignment) {}
void flushSerializeBuffer() {}
};

View File

@@ -1143,7 +1143,7 @@ int main(int argc, char *argv[])
cout << "Done" << endl;
epicsThreadSleep ( 1.0 );
epicsThreadSleep ( 3.0 );
std::cout << "-----------------------------------------------------------------------" << std::endl;
epicsExitCallAtExits();
CDRMonitor::get().show(stdout, true);

View File

@@ -31,6 +31,9 @@ namespace epics {
virtual void ensureBuffer(int size) {
}
virtual void alignBuffer(int alignment) {
}
SerializableControlImpl() {
}
@@ -44,6 +47,9 @@ namespace epics {
virtual void ensureData(int size) {
}
virtual void alignData(int alignment) {
}
DeserializableControlImpl() {
}

View File

@@ -44,6 +44,7 @@ namespace epics {
virtual void verified(){};
virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender){};
virtual void ensureData(int) {};
virtual void alignData(int) {};
virtual IntrospectionRegistry* getIntrospectionRegistry() {return NULL;};
private:
string _type;