diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index d098405..6398b86 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -108,9 +108,10 @@ namespace epics { virtual std::size_t getSocketReceiveBufferSize() const; virtual bool verify(epics::pvData::int32 timeoutMs) { - epics::pvData::Lock lock(_verifiedMutex); - return _verified; - // TODO !!! + return _verifiedEvent.wait(timeoutMs/1000.0); + + //epics::pvData::Lock lock(_verifiedMutex); + //return _verified; } virtual void verified() { @@ -123,11 +124,6 @@ namespace epics { // noop } - /** - * @param[in] timeout Timeout in seconds - */ - bool waitUntilVerified(double timeout); - virtual void flush(bool lastMessageCompleted); virtual void startMessage(epics::pvData::int8 command, std::size_t ensureCapacity); virtual void endMessage(); @@ -148,7 +144,7 @@ namespace epics { virtual void setByteOrder(int byteOrder) { - // TODO !!! + // not used this this implementation } SendQueueFlushStrategy getSendQueueFlushStrategy() { @@ -741,10 +737,11 @@ namespace epics { /** * Verify transport. Server side is self-verified. */ - void verify() { - TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast(shared_from_this()); - enqueueSendRequest(transportSender); - verified(); + virtual bool verify(epics::pvData::int32 timeoutMs) { + TransportSender::shared_pointer transportSender = std::tr1::dynamic_pointer_cast(shared_from_this()); + enqueueSendRequest(transportSender); + verified(); + return true; } /** @@ -887,7 +884,7 @@ namespace epics { * Validate connection by sending a validation message request. * @return true on success. */ - bool validateConnection(BlockingServerTCPTransport::shared_pointer const & transport, const char* address); + bool validateConnection(Transport::shared_pointer const & transport, const char* address); static void handleEventsRunner(void* param); }; diff --git a/pvAccessApp/remote/blockingTCPAcceptor.cpp b/pvAccessApp/remote/blockingTCPAcceptor.cpp index 120b43b..365fe8f 100644 --- a/pvAccessApp/remote/blockingTCPAcceptor.cpp +++ b/pvAccessApp/remote/blockingTCPAcceptor.cpp @@ -211,9 +211,9 @@ namespace pvAccess { } // while } - bool BlockingTCPAcceptor::validateConnection(BlockingServerTCPTransport::shared_pointer const & transport, const char* address) { + bool BlockingTCPAcceptor::validateConnection(Transport::shared_pointer const & transport, const char* address) { try { - transport->verify(); + transport->verify(0); return true; } catch(...) { LOG(logLevelDebug, "Validation of %s failed.", address); diff --git a/pvAccessApp/remote/blockingTCPConnector.cpp b/pvAccessApp/remote/blockingTCPConnector.cpp index b43e413..6e71945 100644 --- a/pvAccessApp/remote/blockingTCPConnector.cpp +++ b/pvAccessApp/remote/blockingTCPConnector.cpp @@ -140,16 +140,18 @@ namespace epics { // TODO tune buffer sizes?! Win32 defaults are 8k, which is OK // create transport + // TODO introduce factory transport = BlockingClientTCPTransport::create( context, socket, responseHandler, _receiveBufferSize, client, transportRevision, _beaconInterval, priority); // verify - if(!transport->waitUntilVerified(3.0)) { + if(!transport->verify(3000)) { LOG( logLevelDebug, "Connection to CA server %s failed to be validated, closing it.", ipAddrStr); + std::ostringstream temp; temp<<"Failed to verify TCP connection to '"<setEndianess(byteOrder); + + // sync?! + _sendBuffer->setEndianess(byteOrder); } virtual void enqueueSendRequest(TransportSender::shared_pointer const & sender); @@ -280,12 +285,12 @@ namespace epics { /** * Receive buffer. */ - epics::pvData::ByteBuffer* _receiveBuffer; + std::auto_ptr _receiveBuffer; /** * Send buffer. */ - epics::pvData::ByteBuffer* _sendBuffer; + std::auto_ptr _sendBuffer; /** * Last message start position. diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 5d99808..fac9ad3 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -73,9 +73,6 @@ namespace epics { if (_sendAddresses) delete _sendAddresses; if (_ignoredAddresses) delete _ignoredAddresses; - - delete _receiveBuffer; - delete _sendBuffer; } void BlockingUDPTransport::start() { @@ -128,13 +125,13 @@ namespace epics { _sendBuffer->clear(); sender->lock(); try { - sender->send(_sendBuffer, this); + sender->send(_sendBuffer.get(), this); sender->unlock(); endMessage(); if(!_sendToEnabled) - send(_sendBuffer); + send(_sendBuffer.get()); else - send(_sendBuffer, _sendTo); + send(_sendBuffer.get(), _sendTo); } catch(...) { sender->unlock(); } @@ -162,6 +159,7 @@ namespace epics { // object's own thread. osiSockAddr fromAddress; + osiSocklen_t addrStructSize = sizeof(sockaddr); Transport::shared_pointer thisTransport = shared_from_this(); try { @@ -173,8 +171,6 @@ namespace epics { // data ready to be read _receiveBuffer->clear(); - osiSocklen_t addrStructSize = sizeof(sockaddr); - int bytesRead = recvfrom(_channel, (char*)_receiveBuffer->getArray(), _receiveBuffer->getRemaining(), 0, (sockaddr*)&fromAddress, &addrStructSize); @@ -182,7 +178,7 @@ namespace epics { if(likely(bytesRead>0)) { // successfully got datagram bool ignore = false; - if(unlikely(_ignoredAddresses!=0)) + if(likely(_ignoredAddresses!=0)) { for(size_t i = 0; i <_ignoredAddresses->size(); i++) { @@ -199,7 +195,7 @@ namespace epics { _receiveBuffer->flip(); - processBuffer(thisTransport, fromAddress, _receiveBuffer); + processBuffer(thisTransport, fromAddress, _receiveBuffer.get()); } } else if (unlikely(bytesRead == -1)) { @@ -255,12 +251,13 @@ namespace epics { // // first byte is CA_MAGIC - // second byte version - major/minor nibble int8 magic = receiveBuffer->getByte(); - int8 version = receiveBuffer->getByte(); if(unlikely(magic != CA_MAGIC)) return false; + // second byte version + int8 version = receiveBuffer->getByte(); + // only data for UDP int8 flags = receiveBuffer->getByte(); if (flags < 0) @@ -285,7 +282,7 @@ namespace epics { // handle _responseHandler->handleResponse(&fromAddress, thisTransport, version, command, payloadSize, - _receiveBuffer); + _receiveBuffer.get()); // set position (e.g. in case handler did not read all) receiveBuffer->setPosition(nextRequestPosition); diff --git a/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp b/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp index 2af1550..11c5d23 100644 --- a/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp +++ b/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp @@ -78,7 +78,9 @@ void SimpleChannelSearchManagerImpl::activate() SimpleChannelSearchManagerImpl::~SimpleChannelSearchManagerImpl() { - cancel(); + // shared_from_this() is not allowed from destructor + // be sure to call cancel() first + // cancel(); } void SimpleChannelSearchManagerImpl::cancel()