minor cleanup

This commit is contained in:
Matej Sekoranja
2012-07-30 13:41:13 +02:00
parent 39d6a6bc19
commit e74f2cdb2a
7 changed files with 38 additions and 38 deletions

View File

@@ -108,9 +108,10 @@ namespace epics {
virtual std::size_t getSocketReceiveBufferSize() const;
virtual bool verify(epics::pvData::int32 timeoutMs) {
epics::pvData::Lock lock(_verifiedMutex);
return _verified;
// TODO !!!
return _verifiedEvent.wait(timeoutMs/1000.0);
//epics::pvData::Lock lock(_verifiedMutex);
//return _verified;
}
virtual void verified() {
@@ -123,11 +124,6 @@ namespace epics {
// noop
}
/**
* @param[in] timeout Timeout in seconds
*/
bool waitUntilVerified(double timeout);
virtual void flush(bool lastMessageCompleted);
virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity);
virtual void endMessage();
@@ -148,7 +144,7 @@ namespace epics {
virtual void setByteOrder(int byteOrder)
{
// TODO !!!
// not used this this implementation
}
SendQueueFlushStrategy getSendQueueFlushStrategy() {
@@ -741,10 +737,11 @@ namespace epics {
/**
* Verify transport. Server side is self-verified.
*/
void verify() {
TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
enqueueSendRequest(transportSender);
verified();
virtual bool verify(epics::pvData::int32 timeoutMs) {
TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast<TransportSender>(shared_from_this());
enqueueSendRequest(transportSender);
verified();
return true;
}
/**
@@ -887,7 +884,7 @@ namespace epics {
* Validate connection by sending a validation message request.
* @return <code>true</code> on success.
*/
bool validateConnection(BlockingServerTCPTransport::shared_pointer const & transport, const char* address);
bool validateConnection(Transport::shared_pointer const & transport, const char* address);
static void handleEventsRunner(void* param);
};

View File

@@ -211,9 +211,9 @@ namespace pvAccess {
} // while
}
bool BlockingTCPAcceptor::validateConnection(BlockingServerTCPTransport::shared_pointer const & transport, const char* address) {
bool BlockingTCPAcceptor::validateConnection(Transport::shared_pointer const & transport, const char* address) {
try {
transport->verify();
transport->verify(0);
return true;
} catch(...) {
LOG(logLevelDebug, "Validation of %s failed.", address);

View File

@@ -140,16 +140,18 @@ namespace epics {
// TODO tune buffer sizes?! Win32 defaults are 8k, which is OK
// create transport
// TODO introduce factory
transport = BlockingClientTCPTransport::create(
context, socket, responseHandler, _receiveBufferSize,
client, transportRevision, _beaconInterval, priority);
// verify
if(!transport->waitUntilVerified(3.0)) {
if(!transport->verify(3000)) {
LOG(
logLevelDebug,
"Connection to CA server %s failed to be validated, closing it.",
ipAddrStr);
std::ostringstream temp;
temp<<"Failed to verify TCP connection to '"<<ipAddrStr<<"'.";
THROW_BASE_EXCEPTION(temp.str().c_str());

View File

@@ -37,6 +37,7 @@ using std::max;
using std::min;
using std::ostringstream;
// TODO to be completely replaced by codec based implementation (see Java)
namespace epics {
namespace pvAccess {
@@ -305,10 +306,6 @@ namespace pvAccess {
return (size_t)sockBufSize;
}
bool BlockingTCPTransport::waitUntilVerified(double timeout) {
return _verifiedEvent.wait(timeout);
}
void BlockingTCPTransport::flush(bool lastMessageCompleted) {
// automatic end

View File

@@ -108,8 +108,13 @@ namespace epics {
// noop
}
// NOTE: this is not yet used for UDP
virtual void setByteOrder(int byteOrder) {
// TODO
// called from receive thread... or before processing
_receiveBuffer->setEndianess(byteOrder);
// sync?!
_sendBuffer->setEndianess(byteOrder);
}
virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender);
@@ -280,12 +285,12 @@ namespace epics {
/**
* Receive buffer.
*/
epics::pvData::ByteBuffer* _receiveBuffer;
std::auto_ptr<epics::pvData::ByteBuffer> _receiveBuffer;
/**
* Send buffer.
*/
epics::pvData::ByteBuffer* _sendBuffer;
std::auto_ptr<epics::pvData::ByteBuffer> _sendBuffer;
/**
* Last message start position.

View File

@@ -73,9 +73,6 @@ namespace epics {
if (_sendAddresses) delete _sendAddresses;
if (_ignoredAddresses) delete _ignoredAddresses;
delete _receiveBuffer;
delete _sendBuffer;
}
void BlockingUDPTransport::start() {
@@ -128,13 +125,13 @@ namespace epics {
_sendBuffer->clear();
sender->lock();
try {
sender->send(_sendBuffer, this);
sender->send(_sendBuffer.get(), this);
sender->unlock();
endMessage();
if(!_sendToEnabled)
send(_sendBuffer);
send(_sendBuffer.get());
else
send(_sendBuffer, _sendTo);
send(_sendBuffer.get(), _sendTo);
} catch(...) {
sender->unlock();
}
@@ -162,6 +159,7 @@ namespace epics {
// object's own thread.
osiSockAddr fromAddress;
osiSocklen_t addrStructSize = sizeof(sockaddr);
Transport::shared_pointer thisTransport = shared_from_this();
try {
@@ -173,8 +171,6 @@ namespace epics {
// data ready to be read
_receiveBuffer->clear();
osiSocklen_t addrStructSize = sizeof(sockaddr);
int bytesRead = recvfrom(_channel, (char*)_receiveBuffer->getArray(),
_receiveBuffer->getRemaining(), 0, (sockaddr*)&fromAddress,
&addrStructSize);
@@ -182,7 +178,7 @@ namespace epics {
if(likely(bytesRead>0)) {
// successfully got datagram
bool ignore = false;
if(unlikely(_ignoredAddresses!=0))
if(likely(_ignoredAddresses!=0))
{
for(size_t i = 0; i <_ignoredAddresses->size(); i++)
{
@@ -199,7 +195,7 @@ namespace epics {
_receiveBuffer->flip();
processBuffer(thisTransport, fromAddress, _receiveBuffer);
processBuffer(thisTransport, fromAddress, _receiveBuffer.get());
}
}
else if (unlikely(bytesRead == -1)) {
@@ -255,12 +251,13 @@ namespace epics {
//
// first byte is CA_MAGIC
// second byte version - major/minor nibble
int8 magic = receiveBuffer->getByte();
int8 version = receiveBuffer->getByte();
if(unlikely(magic != CA_MAGIC))
return false;
// second byte version
int8 version = receiveBuffer->getByte();
// only data for UDP
int8 flags = receiveBuffer->getByte();
if (flags < 0)
@@ -285,7 +282,7 @@ namespace epics {
// handle
_responseHandler->handleResponse(&fromAddress, thisTransport,
version, command, payloadSize,
_receiveBuffer);
_receiveBuffer.get());
// set position (e.g. in case handler did not read all)
receiveBuffer->setPosition(nextRequestPosition);

View File

@@ -78,7 +78,9 @@ void SimpleChannelSearchManagerImpl::activate()
SimpleChannelSearchManagerImpl::~SimpleChannelSearchManagerImpl()
{
cancel();
// shared_from_this() is not allowed from destructor
// be sure to call cancel() first
// cancel();
}
void SimpleChannelSearchManagerImpl::cancel()