diff --git a/pvAccessApp/remote/blockingUDPTransport.cpp b/pvAccessApp/remote/blockingUDPTransport.cpp index 84aa0fd..1c46b8c 100644 --- a/pvAccessApp/remote/blockingUDPTransport.cpp +++ b/pvAccessApp/remote/blockingUDPTransport.cpp @@ -124,6 +124,7 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so "UDP socket %s failed to shutdown: %s.", inetAddressToString(_bindAddress).c_str(), sockErrBuf); } + epicsSocketDestroy ( _channel ); } break; case esscimqi_socketSigAlarmRequired: diff --git a/pvAccessApp/remote/codec.cpp b/pvAccessApp/remote/codec.cpp index 081e42a..9698e8c 100644 --- a/pvAccessApp/remote/codec.cpp +++ b/pvAccessApp/remote/codec.cpp @@ -54,7 +54,7 @@ namespace epics { //PRIVATE _storedPayloadSize(0), _storedPosition(0), _startPosition(0), _maxSendPayloadSize(0), - _lastMessageStartPosition(0),_lastSegmentedMessageType(0), + _lastMessageStartPosition(std::numeric_limits::max()),_lastSegmentedMessageType(0), _lastSegmentedMessageCommand(0), _nextMessagePayloadOffset(0), _byteOrderFlag(EPICS_BYTE_ORDER == EPICS_ENDIAN_BIG ? 0x80 : 0x00), _socketSendBufferSize(0) @@ -345,6 +345,10 @@ namespace epics { // do we already have requiredBytes available? std::size_t remainingBytes = _socketBuffer->getRemaining(); if (remainingBytes >= requiredBytes) { + LOG(logLevelTrace, + "AbstractCodec::readToBuffer requiredBytes: %u" + " <= remainingBytes: %d (threadId: %u)", + requiredBytes, remainingBytes); return true; } @@ -384,7 +388,7 @@ namespace epics { { LOG(logLevelTrace, - "AbstractCodec::before close (threadId: %u)", + "AbstractCodec::before close on bytesRead < 0 condition (threadId: %u)", epicsThreadGetIdSelf()); close(); @@ -689,6 +693,7 @@ namespace epics { if (_lastSegmentedMessageType == 0) { std::size_t flagsPosition = _lastMessageStartPosition + 2; + std::cout << "peek at " << flagsPosition << " " << _lastMessageStartPosition << std::endl; epics::pvData::int8 type = _sendBuffer->getByte(flagsPosition); // set first segment bit _sendBuffer->putByte(flagsPosition, (type | 0x10)); @@ -759,7 +764,7 @@ namespace epics { flush(false); } - + // assumes startMessage was called (or header is in place), because endMessage(true) is later called that peeks and sets _lastSegmentedMessageType void AbstractCodec::flushSerializeBuffer() { LOG(logLevelTrace, @@ -1120,7 +1125,7 @@ namespace epics { bool BlockingAbstractCodec::isOpen() { - LOG(logLevelTrace, "BlockingAbstractCodec::isOpen enter: (threadId: %u)", + LOG(logLevelTrace, "BlockingAbstractCodec::isOpen %d (threadId: %u)", _isOpen.get(), epicsThreadGetIdSelf()); return _isOpen.get(); @@ -1182,6 +1187,8 @@ namespace epics { " EXIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIT: (threadId: %u)", epicsThreadGetIdSelf()); + bac->_shutdownEvent.signal(); + } @@ -1215,9 +1222,7 @@ namespace epics { // wait read thread to die - //TODO epics join thread - //readThread.join(); // TODO timeout - //bac->_shutdownEvent.signal(); + bac->_shutdownEvent.wait(); // call internal destroy LOG(logLevelTrace, "XXXXXXXXXXXXXXXXXXXXXXXXXXXX" diff --git a/pvAccessApp/remote/codec.h b/pvAccessApp/remote/codec.h index 21146f9..bcc034c 100644 --- a/pvAccessApp/remote/codec.h +++ b/pvAccessApp/remote/codec.h @@ -813,7 +813,7 @@ namespace epics { TransportClient::shared_pointer const & client, epics::pvData::int8 remoteTransportRevision, float beaconInterval, - int16_t priority ); + int16_t priority); public: static shared_pointer create( diff --git a/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp b/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp index 773a2bc..f747bf5 100644 --- a/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp +++ b/pvAccessApp/remote/simpleChannelSearchManagerImpl.cpp @@ -107,13 +107,20 @@ void SimpleChannelSearchManagerImpl::registerSearchInstance(SearchInstance::shar if (m_canceled.get()) return; - Lock guard(m_channelMutex); - //overrides if already registered - m_channels[channel->getSearchInstanceID()] = channel; - - Lock guard2(m_userValueMutex); - int32_t& userValue = channel->getUserValue(); - userValue = 1; + bool immediateTrigger; + { + Lock guard(m_channelMutex); + //overrides if already registered + m_channels[channel->getSearchInstanceID()] = channel; + immediateTrigger = m_channels.size() == 1; + + Lock guard2(m_userValueMutex); + int32_t& userValue = channel->getUserValue(); + userValue = 1; + } + + if (immediateTrigger) + callback(); } void SimpleChannelSearchManagerImpl::unregisterSearchInstance(SearchInstance::shared_pointer const & channel) diff --git a/testApp/remote/testCodec.cpp b/testApp/remote/testCodec.cpp index 495735c..0897354 100644 --- a/testApp/remote/testCodec.cpp +++ b/testApp/remote/testCodec.cpp @@ -3004,6 +3004,11 @@ namespace epics { void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control) { + // after connection_closed_exception is thrown codec is no longer valid, + // however we want to do some tests and in order for memory checkers not to complain + // the following step is needed + memset((void*)buffer->getBuffer(), 0, buffer->getSize()); + throw connection_closed_exception( "expected test exception"); } diff --git a/testApp/remote/testServer.cpp b/testApp/remote/testServer.cpp index 90eba81..6f1dd11 100644 --- a/testApp/remote/testServer.cpp +++ b/testApp/remote/testServer.cpp @@ -1499,21 +1499,45 @@ public: } else if (m_channelName == "testSum") { - int a = pvArgument->getIntField("a")->get(); - int b = pvArgument->getIntField("b")->get(); + PVStructure::shared_pointer args( + (pvArgument->getStructure()->getID() == "uri:ev4:nt/2012/pwd:NTURI") ? + pvArgument->getStructureField("query") : + pvArgument + ); - FieldCreatePtr fieldCreate = getFieldCreate(); + const String helpText = + "Calculates a sum of two integer values.\n" + "Arguments:\n" + "\tint a\tfirst integer number\n" + "\tint b\tsecond integer number\n"; + if (handleHelp(args, m_channelRPCRequester, helpText)) + return; - StringArray fieldNames; - fieldNames.push_back("c"); - FieldConstPtrArray fields; - fields.push_back(fieldCreate->createScalar(pvInt)); - StructureConstPtr resultStructure = fieldCreate->createStructure(fieldNames, fields); + PVInt::shared_pointer pa = args->getSubField("a"); + PVInt::shared_pointer pb = args->getSubField("b"); + if (!pa || !pb) + { + PVStructure::shared_pointer nullPtr; + Status errorStatus(Status::STATUSTYPE_ERROR, "int a and int b arguments are required"); + m_channelRPCRequester->requestDone(errorStatus, nullPtr); + return; + } - PVStructure::shared_pointer result = getPVDataCreate()->createPVStructure(resultStructure); - result->getIntField("c")->put(a+b); + int a = pa->get(); + int b = pb->get(); - m_channelRPCRequester->requestDone(Status::Ok, result); + FieldCreatePtr fieldCreate = getFieldCreate(); + + StringArray fieldNames; + fieldNames.push_back("c"); + FieldConstPtrArray fields; + fields.push_back(fieldCreate->createScalar(pvInt)); + StructureConstPtr resultStructure = fieldCreate->createStructure(fieldNames, fields); + + PVStructure::shared_pointer result = getPVDataCreate()->createPVStructure(resultStructure); + result->getIntField("c")->put(a+b); + + m_channelRPCRequester->requestDone(Status::Ok, result); } else if (m_channelName.find("testServerShutdown") == 0)