diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index ee3d628..5514610 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -878,13 +878,13 @@ namespace pvAccess { /** * Interface for locating channel providers. */ - class epicsShareClass ChannelAccess : private epics::pvData::NoDefaultMethods { + class epicsShareClass ChannelProviderRegistry : private epics::pvData::NoDefaultMethods { public: - POINTER_DEFINITIONS(ChannelAccess); + POINTER_DEFINITIONS(ChannelProviderRegistry); typedef std::vector stringVector_t; - virtual ~ChannelAccess() {}; + virtual ~ChannelProviderRegistry() {}; /** * Get a shared instance of the provider with the specified name. @@ -907,7 +907,7 @@ namespace pvAccess { virtual std::auto_ptr getProviderNames() = 0; }; - epicsShareExtern ChannelAccess::shared_pointer getChannelAccess(); + epicsShareExtern ChannelProviderRegistry::shared_pointer getChannelProviderRegistry(); epicsShareExtern void registerChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory); epicsShareExtern void unregisterChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory); diff --git a/pvAccessApp/factory/ChannelAccessFactory.cpp b/pvAccessApp/factory/ChannelAccessFactory.cpp index 5e11b71..9b74123 100644 --- a/pvAccessApp/factory/ChannelAccessFactory.cpp +++ b/pvAccessApp/factory/ChannelAccessFactory.cpp @@ -20,7 +20,7 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { -static ChannelAccess::shared_pointer channelAccess; +static ChannelProviderRegistry::shared_pointer ChannelProviderRegistry; static Mutex channelProviderMutex; @@ -28,7 +28,7 @@ typedef std::map ChannelProvider static ChannelProviderFactoryMap channelProviders; -class ChannelAccessImpl : public ChannelAccess { +class ChannelProviderRegistryImpl : public ChannelProviderRegistry { public: ChannelProvider::shared_pointer getProvider(String const & _providerName) { @@ -68,14 +68,14 @@ class ChannelAccessImpl : public ChannelAccess { } }; -ChannelAccess::shared_pointer getChannelAccess() { +ChannelProviderRegistry::shared_pointer getChannelProviderRegistry() { static Mutex mutex; Lock guard(mutex); - if(channelAccess.get()==0){ - channelAccess.reset(new ChannelAccessImpl()); + if(ChannelProviderRegistry.get()==0){ + ChannelProviderRegistry.reset(new ChannelProviderRegistryImpl()); } - return channelAccess; + return ChannelProviderRegistry; } void registerChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory) { diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index e14ee2c..a1d7a3b 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -106,11 +106,41 @@ namespace epics { static Status channelNotConnected; static Status channelDestroyed; static Status otherRequestPendingStatus; + static Status invalidPutStructureStatus; + static Status invalidPutArrayStatus; + static Status invalidBitSetLengthStatus; static Status pvRequestNull; static PVStructure::shared_pointer nullPVStructure; + static Structure::shared_pointer nullStructure; static BitSet::shared_pointer nullBitSet; + static BitSet::shared_pointer createBitSetFor( + PVStructure::shared_pointer const & pvStructure, + BitSet::shared_pointer const & existingBitSet) + { + int pvStructureSize = pvStructure->getNumberFields(); + if (existingBitSet.get() && static_cast(existingBitSet->size()) >= pvStructureSize) + { + // clear existing BitSet + // also necessary if larger BitSet is reused + existingBitSet->clear(); + return existingBitSet; + } + else + return BitSet::shared_pointer(new BitSet(pvStructureSize)); + } + + static PVField::shared_pointer reuseOrCreatePVField( + Field::const_shared_pointer const & field, + PVField::shared_pointer const & existingPVField) + { + if (existingPVField.get() && *field == *existingPVField->getField()) + return existingPVField; + else + return pvDataCreate->createPVField(field); + } + protected: ChannelImpl::shared_pointer m_channel; @@ -134,6 +164,8 @@ namespace epics { bool m_destroyed; bool m_initialized; + AtomicBoolean m_lastRequest; + AtomicBoolean m_subscribed; virtual ~BaseRequestImpl() {}; @@ -189,9 +221,8 @@ namespace epics { return m_ioid; } - virtual bool initResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) = 0; - virtual bool destroyResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) = 0; - virtual bool normalResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) = 0; + virtual void initResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) = 0; + virtual void normalResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) = 0; virtual void response(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer) { transport->ensureData(1); @@ -218,18 +249,22 @@ namespace epics { initResponse(transport, version, payloadBuffer, qos, m_status); } - else if (qos & QOS_DESTROY) - { - m_mutex.lock(); - m_initialized = false; - m_mutex.unlock(); - - if (!destroyResponse(transport, version, payloadBuffer, qos, m_status)) - destroy(); - } else { + bool destroyReq = false; + + if (qos & QOS_DESTROY) + { + m_mutex.lock(); + m_initialized = false; + destroyReq = true; + m_mutex.unlock(); + } + normalResponse(transport, version, payloadBuffer, qos, m_status); + + if (destroyReq) + destroy(); } } catch (std::exception &e) { @@ -260,6 +295,10 @@ namespace epics { virtual void destroy() { destroy(false); } + + virtual void lastRequest() { + m_lastRequest.set(); + } virtual void destroy(bool createRequestFailed) { @@ -351,27 +390,14 @@ namespace epics { Status BaseRequestImpl::channelNotConnected = Status(Status::STATUSTYPE_ERROR, "channel not connected"); Status BaseRequestImpl::channelDestroyed = Status(Status::STATUSTYPE_ERROR, "channel destroyed"); Status BaseRequestImpl::otherRequestPendingStatus = Status(Status::STATUSTYPE_ERROR, "other request pending"); + Status BaseRequestImpl::invalidPutStructureStatus = Status(Status::STATUSTYPE_ERROR, "incompatible put structure"); + Status BaseRequestImpl::invalidPutArrayStatus = Status(Status::STATUSTYPE_ERROR, "incompatible put array"); + Status BaseRequestImpl::invalidBitSetLengthStatus = Status(Status::STATUSTYPE_ERROR, "invalid bit-set length"); Status BaseRequestImpl::pvRequestNull = Status(Status::STATUSTYPE_ERROR, "pvRequest == 0"); PVStructure::shared_pointer BaseRequestImpl::nullPVStructure; BitSet::shared_pointer BaseRequestImpl::nullBitSet; - - static BitSet::shared_pointer createBitSetFor(PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & existingBitSet) - { - int pvStructureSize = pvStructure->getNumberFields(); - if (existingBitSet.get() && static_cast(existingBitSet->size()) >= pvStructureSize) - { - // clear existing BitSet - // also necessary if larger BitSet is reused - existingBitSet->clear(); - return existingBitSet; - } - else - return BitSet::shared_pointer(new BitSet(pvStructureSize)); - } - - PVACCESS_REFCOUNT_MONITOR_DEFINE(channelProcess); class ChannelProcessRequestImpl : @@ -445,38 +471,31 @@ namespace epics { stopRequest(); } - virtual bool destroyResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { - EXCEPTION_GUARD(m_callback->processDone(status)); - return true; - } - - virtual bool initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { + virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { ChannelProcess::shared_pointer thisPtr = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_callback->channelProcessConnect(status, thisPtr)); - return true; } - virtual bool normalResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { - EXCEPTION_GUARD(m_callback->processDone(status)); - return true; + virtual void normalResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { + EXCEPTION_GUARD(m_callback->processDone(status, shared_from_this())); } - virtual void process(bool lastRequest) + virtual void process() { { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_callback->processDone(destroyedStatus)); + EXCEPTION_GUARD(m_callback->processDone(destroyedStatus, shared_from_this())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_callback->processDone(notInitializedStatus)); + EXCEPTION_GUARD(m_callback->processDone(notInitializedStatus, shared_from_this())); return; } } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_callback->processDone(otherRequestPendingStatus)); + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { + EXCEPTION_GUARD(m_callback->processDone(otherRequestPendingStatus, shared_from_this())); return; } @@ -484,7 +503,7 @@ namespace epics { m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_callback->processDone(channelNotConnected)); + EXCEPTION_GUARD(m_callback->processDone(channelNotConnected, shared_from_this())); } } @@ -498,6 +517,11 @@ namespace epics { BaseRequestImpl::destroy(); } + virtual void lastRequest() + { + BaseRequestImpl::lastRequest(); + } + virtual void lock() { // noop } @@ -542,7 +566,7 @@ namespace epics { if (m_pvRequest == 0) { ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(pvRequestNull, thisPointer, nullPVStructure, nullBitSet)); + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(pvRequestNull, thisPointer, nullStructure)); return; } @@ -555,7 +579,7 @@ namespace epics { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelDestroyed, thisPointer, nullPVStructure, nullBitSet)); + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelDestroyed, thisPointer, nullStructure)); BaseRequestImpl::destroy(true); } } @@ -601,19 +625,12 @@ namespace epics { stopRequest(); } - virtual bool destroyResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { - // data available - if (qos & QOS_GET) - return normalResponse(transport, version, payloadBuffer, qos, status); - return true; - } - - virtual bool initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { + virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { ChannelGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisPointer, nullPVStructure, nullBitSet)); - return true; + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisPointer, nullStructure)); + return; } // create data and its bitSet @@ -625,18 +642,17 @@ namespace epics { // notify ChannelGet::shared_pointer thisChannelGet = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisChannelGet, m_structure, m_bitSet)); - return true; + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisChannelGet, m_structure->getStructure())); } - virtual bool normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { + virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { MB_POINT(channelGet, 8, "client channelGet->deserialize (start)"); if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(status)); - return true; + EXCEPTION_GUARD(m_channelGetRequester->getDone(status, shared_from_this(), nullPVStructure, nullBitSet)); + return; } // deserialize bitSet and data @@ -648,11 +664,10 @@ namespace epics { MB_POINT(channelGet, 9, "client channelGet->deserialize (end), just before channelGet->getDone() is called"); - EXCEPTION_GUARD(m_channelGetRequester->getDone(status)); - return true; + EXCEPTION_GUARD(m_channelGetRequester->getDone(status, shared_from_this(), m_structure, m_bitSet)); } - virtual void get(bool lastRequest) { + virtual void get() { { MB_INC_AUTO_ID(channelGet); @@ -660,11 +675,11 @@ namespace epics { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(destroyedStatus)); + EXCEPTION_GUARD(m_channelGetRequester->getDone(destroyedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelGetRequester->getDone(notInitializedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } } @@ -681,8 +696,8 @@ namespace epics { return; } */ - if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(otherRequestPendingStatus)); + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) { + EXCEPTION_GUARD(m_channelGetRequester->getDone(otherRequestPendingStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } @@ -691,7 +706,7 @@ namespace epics { //TODO bulk hack m_channel->checkAndGetTransport()->enqueueOnlySendRequest(thisSender); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelGetRequester->getDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelGetRequester->getDone(channelNotConnected, shared_from_this(), nullPVStructure, nullBitSet)); } } @@ -705,6 +720,11 @@ namespace epics { BaseRequestImpl::destroy(); } + virtual void lastRequest() + { + BaseRequestImpl::lastRequest(); + } + virtual void lock() { m_structureMutex.lock(); @@ -737,8 +757,13 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; + // get structure container PVStructure::shared_pointer m_structure; BitSet::shared_pointer m_bitSet; + + // put reference store + PVStructure::shared_pointer m_pvPutStructure; + BitSet::shared_pointer m_pvPutBitSet; Mutex m_structureMutex; @@ -755,7 +780,7 @@ namespace epics { if (m_pvRequest == 0) { ChannelPut::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(pvRequestNull, thisPointer, nullPVStructure, nullBitSet)); + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(pvRequestNull, thisPointer, nullStructure)); return; } @@ -768,7 +793,7 @@ namespace epics { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { ChannelPut::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelDestroyed, thisPointer, nullPVStructure, nullBitSet)); + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelDestroyed, thisPointer, nullStructure)); BaseRequestImpl::destroy(true); } } @@ -811,25 +836,24 @@ namespace epics { { // no need to lock here, since it is already locked via TransportSender IF //Lock lock(m_structureMutex); - m_bitSet->serialize(buffer, control); - m_structure->serialize(buffer, control, m_bitSet.get()); + m_pvPutBitSet->serialize(buffer, control); + m_pvPutStructure->serialize(buffer, control, m_pvPutBitSet.get()); + + // release references + m_pvPutBitSet.reset(); + m_pvPutStructure.reset(); } } stopRequest(); } - virtual bool destroyResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { - EXCEPTION_GUARD(m_channelPutRequester->putDone(status)); - return true; - } - - virtual bool initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { + virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { ChannelPut::shared_pointer thisChannelPut = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, nullPVStructure, nullBitSet)); - return true; + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, nullStructure)); + return; } // create data and its bitSet @@ -841,31 +865,29 @@ namespace epics { // notify ChannelPut::shared_pointer thisChannelPut = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, m_structure, m_bitSet)); - return true; + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, m_structure->getStructure())); } - virtual bool normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { + virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { if (qos & QOS_GET) { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutRequester->getDone(status, shared_from_this(), nullPVStructure, nullBitSet)); + return; } { Lock lock(m_structureMutex); - m_structure->deserialize(payloadBuffer, transport.get()); + m_bitSet->deserialize(payloadBuffer, transport.get()); + m_structure->deserialize(payloadBuffer, transport.get(), m_bitSet.get()); } - EXCEPTION_GUARD(m_channelPutRequester->getDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutRequester->getDone(status, shared_from_this(), m_structure, m_bitSet)); } else { - EXCEPTION_GUARD(m_channelPutRequester->putDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutRequester->putDone(status, shared_from_this())); } } @@ -874,17 +896,17 @@ namespace epics { { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(destroyedStatus)); + EXCEPTION_GUARD(m_channelPutRequester->getDone(destroyedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelPutRequester->getDone(notInitializedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } } - if (!startRequest(QOS_GET)) { - EXCEPTION_GUARD(m_channelPutRequester->getDone(otherRequestPendingStatus)); + if (!startRequest(m_lastRequest.get() ? QOS_GET | QOS_DESTROY : QOS_GET)) { + EXCEPTION_GUARD(m_channelPutRequester->getDone(otherRequestPendingStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } @@ -893,34 +915,50 @@ namespace epics { m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected, shared_from_this(), nullPVStructure, nullBitSet)); } } - virtual void put(bool lastRequest) { + virtual void put(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & pvPutBitSet) { { Lock guard(m_mutex); if (m_destroyed) { - m_channelPutRequester->putDone(destroyedStatus); + m_channelPutRequester->putDone(destroyedStatus, shared_from_this()); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutRequester->putDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelPutRequester->putDone(notInitializedStatus, shared_from_this())); return; } } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { - m_channelPutRequester->putDone(otherRequestPendingStatus); + if (!(*m_structure->getStructure() == *pvPutStructure->getStructure())) + { + EXCEPTION_GUARD(m_channelPutRequester->putDone(invalidPutStructureStatus, shared_from_this())); + return; + } + + if (pvPutBitSet->size() < m_bitSet->size()) + { + EXCEPTION_GUARD(m_channelPutRequester->putDone(invalidBitSetLengthStatus, shared_from_this())); + return; + } + + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { + m_channelPutRequester->putDone(otherRequestPendingStatus, shared_from_this()); return; } try { + lock(); + m_pvPutStructure = pvPutStructure; + m_pvPutBitSet = pvPutBitSet; + unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected, shared_from_this())); } } @@ -934,6 +972,11 @@ namespace epics { BaseRequestImpl::destroy(); } + virtual void lastRequest() + { + BaseRequestImpl::lastRequest(); + } + virtual void lock() { m_structureMutex.lock(); @@ -963,9 +1006,18 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; + // put data container PVStructure::shared_pointer m_putData; + BitSet::shared_pointer m_putDataBitSet; + + // get data container PVStructure::shared_pointer m_getData; + BitSet::shared_pointer m_getDataBitSet; + // putGet reference store + PVStructure::shared_pointer m_putPutData; + BitSet::shared_pointer m_putPutDataBitSet; + Mutex m_structureMutex; ChannelPutGetImpl(ChannelImpl::shared_pointer const & channel, ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, PVStructure::shared_pointer const & pvRequest) : @@ -981,7 +1033,7 @@ namespace epics { if (m_pvRequest == 0) { ChannelPutGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(pvRequestNull, thisPointer, nullPVStructure, nullPVStructure)); + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(pvRequestNull, thisPointer, nullStructure, nullStructure)); return; } @@ -991,7 +1043,7 @@ namespace epics { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { ChannelPutGet::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelDestroyed, thisPointer, nullPVStructure, nullPVStructure)); + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelDestroyed, thisPointer, nullStructure, nullStructure)); BaseRequestImpl::destroy(true); } } @@ -1038,118 +1090,134 @@ namespace epics { { // no need to lock here, since it is already locked via TransportSender IF //Lock lock(m_structureMutex); - m_putData->serialize(buffer, control); + m_putPutDataBitSet->serialize(buffer, control); + m_putPutData->serialize(buffer, control, m_putPutDataBitSet.get()); + + // release references + m_putPutDataBitSet.reset(); + m_putPutData.reset(); } } stopRequest(); } - virtual bool destroyResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { - // data available - // TODO we need a flag here... - return normalResponse(transport, version, payloadBuffer, qos, status); - } - - virtual bool initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { + virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { ChannelPutGet::shared_pointer thisChannelPutGet = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, nullPVStructure, nullPVStructure)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, nullStructure, nullStructure)); + return; } { Lock lock(m_structureMutex); m_putData = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()); + m_putDataBitSet = createBitSetFor(m_putData, m_putDataBitSet); m_getData = SerializationHelper::deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()); + m_getDataBitSet = createBitSetFor(m_getData, m_getDataBitSet); } // notify ChannelPutGet::shared_pointer thisChannelPutGet = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, m_putData, m_getData)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, m_putData->getStructure(), m_getData->getStructure())); } - virtual bool normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { + virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { if (qos & QOS_GET) { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status, shared_from_this(), nullPVStructure, nullBitSet)); + return; } { Lock lock(m_structureMutex); // deserialize get data - m_getData->deserialize(payloadBuffer, transport.get()); + m_getDataBitSet->deserialize(payloadBuffer, transport.get()); + m_getData->deserialize(payloadBuffer, transport.get(), m_getDataBitSet.get()); } - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status, shared_from_this(), m_getData, m_getDataBitSet)); } else if (qos & QOS_GET_PUT) { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status, shared_from_this(), nullPVStructure, nullBitSet)); + return; } { Lock lock(m_structureMutex); // deserialize put data - m_putData->deserialize(payloadBuffer, transport.get()); + m_putDataBitSet->deserialize(payloadBuffer, transport.get()); + m_putData->deserialize(payloadBuffer, transport.get(), m_putDataBitSet.get()); } - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status, shared_from_this(), m_putData, m_putDataBitSet)); } else { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status, shared_from_this(), nullPVStructure, nullBitSet)); + return; } { Lock lock(m_structureMutex); // deserialize data - m_getData->deserialize(payloadBuffer, transport.get()); + m_getDataBitSet->deserialize(payloadBuffer, transport.get()); + m_getData->deserialize(payloadBuffer, transport.get(), m_getDataBitSet.get()); } - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status, shared_from_this(), m_getData, m_getDataBitSet)); } } - virtual void putGet(bool lastRequest) { + virtual void putGet(PVStructure::shared_pointer const & pvPutStructure, BitSet::shared_pointer const & bitSet) { { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(destroyedStatus)); + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(destroyedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(notInitializedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(otherRequestPendingStatus)); + if (!(*m_putData->getStructure() == *pvPutStructure->getStructure())) + { + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(invalidPutStructureStatus, shared_from_this(), nullPVStructure, nullBitSet)); + return; + } + + if (bitSet->size() < m_putDataBitSet->size()) + { + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(invalidBitSetLengthStatus, shared_from_this(), nullPVStructure, nullBitSet)); + return; + } + + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(otherRequestPendingStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } try { + lock(); + m_putPutData = pvPutStructure; + m_putPutDataBitSet = bitSet; + unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected, shared_from_this(), nullPVStructure, nullBitSet)); } } @@ -1157,17 +1225,17 @@ namespace epics { { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(destroyedStatus)); + EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(destroyedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(notInitializedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } } - if (!startRequest(QOS_GET)) { - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(otherRequestPendingStatus)); + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_GET)) { + EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(otherRequestPendingStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } @@ -1175,7 +1243,7 @@ namespace epics { m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected, shared_from_this(), nullPVStructure, nullBitSet)); } } @@ -1183,17 +1251,17 @@ namespace epics { { Lock guard(m_mutex); if (m_destroyed) { - m_channelPutGetRequester->getPutDone(destroyedStatus); + m_channelPutGetRequester->getPutDone(destroyedStatus, shared_from_this(), nullPVStructure, nullBitSet); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(notInitializedStatus, shared_from_this(), nullPVStructure, nullBitSet)); return; } } - if (!startRequest(QOS_GET_PUT)) { - m_channelPutGetRequester->getPutDone(otherRequestPendingStatus); + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { + m_channelPutGetRequester->getPutDone(otherRequestPendingStatus, shared_from_this(), nullPVStructure, nullBitSet); return; } @@ -1201,7 +1269,7 @@ namespace epics { m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected, shared_from_this(), nullPVStructure, nullBitSet)); } } @@ -1215,6 +1283,11 @@ namespace epics { BaseRequestImpl::destroy(); } + virtual void lastRequest() + { + BaseRequestImpl::lastRequest(); + } + virtual void lock() { m_structureMutex.lock(); @@ -1328,55 +1401,47 @@ namespace epics { stopRequest(); } - virtual bool destroyResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { - // data available - // TODO we need a flag here... - return normalResponse(transport, version, payloadBuffer, qos, status); - } - - virtual bool initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { + virtual void initResponse(Transport::shared_pointer const & /*transport*/, int8 /*version*/, ByteBuffer* /*payloadBuffer*/, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { ChannelRPC::shared_pointer thisChannelRPC = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC)); - return true; + return; } // notify ChannelRPC::shared_pointer thisChannelRPC = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC)); - return true; } - virtual bool normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { + virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, nullPVStructure)); - return true; + EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, shared_from_this(), nullPVStructure)); + return; } PVStructure::shared_pointer response(SerializationHelper::deserializeStructureFull(payloadBuffer, transport.get())); - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, response)); - return true; + EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, shared_from_this(), response)); } - virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) { + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(destroyedStatus, nullPVStructure)); + EXCEPTION_GUARD(m_channelRPCRequester->requestDone(destroyedStatus, shared_from_this(), nullPVStructure)); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(notInitializedStatus, nullPVStructure)); + EXCEPTION_GUARD(m_channelRPCRequester->requestDone(notInitializedStatus, shared_from_this(), nullPVStructure)); return; } } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(otherRequestPendingStatus, nullPVStructure)); + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { + EXCEPTION_GUARD(m_channelRPCRequester->requestDone(otherRequestPendingStatus, shared_from_this(), nullPVStructure)); return; } @@ -1388,7 +1453,7 @@ namespace epics { m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, nullPVStructure)); + EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, shared_from_this(), nullPVStructure)); } } @@ -1402,6 +1467,11 @@ namespace epics { BaseRequestImpl::destroy(); } + virtual void lastRequest() + { + BaseRequestImpl::lastRequest(); + } + virtual void lock() { m_structureMutex.lock(); @@ -1432,10 +1502,15 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; - PVArray::shared_pointer m_structure; + // data container (for get) + PVArray::shared_pointer m_data; + // reference store (for put + PVArray::shared_pointer m_putData; + size_t m_offset; size_t m_count; + size_t m_stride; size_t m_length; size_t m_capacity; @@ -1457,7 +1532,7 @@ namespace epics { if (m_pvRequest == 0) { ChannelArray::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(pvRequestNull, thisPointer, PVArray::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(pvRequestNull, thisPointer, Array::shared_pointer())); return; } @@ -1468,7 +1543,7 @@ namespace epics { resubscribeSubscription(m_channel->checkDestroyedAndGetTransport()); } catch (std::runtime_error &rte) { ChannelArray::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelDestroyed, thisPointer, PVArray::shared_pointer())); + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelDestroyed, thisPointer, Array::shared_pointer())); BaseRequestImpl::destroy(true); } } @@ -1509,6 +1584,7 @@ namespace epics { // lock... see comment below SerializeHelper::writeSize(m_offset, buffer, control); SerializeHelper::writeSize(m_count, buffer, control); + SerializeHelper::writeSize(m_stride, buffer, control); } else if (pendingRequest & QOS_GET_PUT) // i.e. setLength { @@ -1516,6 +1592,10 @@ namespace epics { SerializeHelper::writeSize(m_length, buffer, control); SerializeHelper::writeSize(m_capacity, buffer, control); } + else if (pendingRequest & QOS_PROCESS) // i.e. getLength + { + // noop + } // put else { @@ -1523,87 +1603,93 @@ namespace epics { // no need to lock here, since it is already locked via TransportSender IF //Lock lock(m_structureMutex); SerializeHelper::writeSize(m_offset, buffer, control); - m_structure->serialize(buffer, control, 0, m_count ? m_count : m_structure->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array + SerializeHelper::writeSize(m_stride, buffer, control); + // TODO what about count sanity check? + m_putData->serialize(buffer, control, 0, m_count ? m_count : m_putData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array + // release reference + m_putData.reset(); } } stopRequest(); } - virtual bool destroyResponse(Transport::shared_pointer const & transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { - // data available (get with destroy) - if (qos & QOS_GET) - return normalResponse(transport, version, payloadBuffer, qos, status); - return true; - } - - virtual bool initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { + virtual void initResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 /*qos*/, const Status& status) { if (!status.isSuccess()) { ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, thisChannelArray, PVArray::shared_pointer())); - return true; + return; } // create data and its bitSet FieldConstPtr field = transport->cachedDeserialize(payloadBuffer); { Lock lock(m_structureMutex); - m_structure = dynamic_pointer_cast(getPVDataCreate()->createPVField(field)); + m_data = dynamic_pointer_cast(getPVDataCreate()->createPVField(field)); } // notify ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, thisChannelArray, m_structure)); - return true; + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, thisChannelArray, m_data->getArray())); } - virtual bool normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { + virtual void normalResponse(Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, int8 qos, const Status& status) { + + ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); + if (qos & QOS_GET) { if (!status.isSuccess()) { - m_channelArrayRequester->getArrayDone(status); - return true; + m_channelArrayRequester->getArrayDone(status, thisChannelArray, PVArray::shared_pointer()); + return; } { Lock lock(m_structureMutex); - m_structure->deserialize(payloadBuffer, transport.get()); + m_data->deserialize(payloadBuffer, transport.get()); } - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(status)); - return true; + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(status, thisChannelArray, m_data)); } else if (qos & QOS_GET_PUT) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(status)); - return true; + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(status, thisChannelArray)); + } + else if (qos & QOS_PROCESS) + { + size_t length = SerializeHelper::readSize(payloadBuffer, transport.get()); + size_t capacity = SerializeHelper::readSize(payloadBuffer, transport.get()); + + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(status, thisChannelArray, length, capacity)); } else { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(status)); - return true; + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(status, thisChannelArray)); } } - virtual void getArray(bool lastRequest, size_t offset, size_t count) { + virtual void getArray(size_t offset, size_t count, size_t stride) { + // TODO stride == 0 check + ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); + { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(destroyedStatus)); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(destroyedStatus, thisChannelArray, PVArray::shared_pointer())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(notInitializedStatus, thisChannelArray, PVArray::shared_pointer())); return; } } - if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_GET)) { - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(otherRequestPendingStatus)); + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET : QOS_GET)) { + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(otherRequestPendingStatus, thisChannelArray, PVArray::shared_pointer())); return; } @@ -1612,62 +1698,73 @@ namespace epics { Lock lock(m_structureMutex); m_offset = offset; m_count = count; + m_stride = stride; } m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected, thisChannelArray, PVArray::shared_pointer())); } } - virtual void putArray(bool lastRequest, size_t offset, size_t count) { + virtual void putArray(PVArray::shared_pointer const & putArray, size_t offset, size_t count, size_t stride) { + + // TODO stride == 0 check { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(destroyedStatus)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(destroyedStatus, shared_from_this())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(notInitializedStatus, shared_from_this())); return; } } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(otherRequestPendingStatus)); + if (!(*m_data->getArray() == *putArray->getArray())) + { + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(invalidPutArrayStatus, shared_from_this())); + return; + } + + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY : QOS_DEFAULT)) { + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(otherRequestPendingStatus, shared_from_this())); return; } try { { Lock lock(m_structureMutex); + m_putData = putArray; m_offset = offset; m_count = count; + m_stride = stride; } m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected, shared_from_this())); } } - virtual void setLength(bool lastRequest, size_t length, size_t capacity) { + virtual void setLength(size_t length, size_t capacity) { { Lock guard(m_mutex); if (m_destroyed) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(destroyedStatus)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(destroyedStatus, shared_from_this())); return; } if (!m_initialized) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(notInitializedStatus)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(notInitializedStatus, shared_from_this())); return; } } - if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(otherRequestPendingStatus)); + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(otherRequestPendingStatus, shared_from_this())); return; } @@ -1680,7 +1777,35 @@ namespace epics { m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { stopRequest(); - EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected)); + EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected, shared_from_this())); + } + } + + + virtual void getLength() { + + { + Lock guard(m_mutex); + if (m_destroyed) { + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(destroyedStatus, shared_from_this(), 0, 0)); + return; + } + if (!m_initialized) { + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(notInitializedStatus, shared_from_this(), 0, 0)); + return; + } + } + + if (!startRequest(m_lastRequest.get() ? QOS_DESTROY | QOS_PROCESS : QOS_PROCESS)) { + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(otherRequestPendingStatus, shared_from_this(), 0, 0)); + return; + } + + try { + m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + } catch (std::runtime_error &rte) { + stopRequest(); + EXCEPTION_GUARD(m_channelArrayRequester->getLengthDone(channelNotConnected, shared_from_this(), 0, 0)); } } @@ -1694,6 +1819,11 @@ namespace epics { BaseRequestImpl::destroy(); } + virtual void lastRequest() + { + BaseRequestImpl::lastRequest(); + } + virtual void lock() { m_structureMutex.lock(); @@ -2107,17 +2237,7 @@ namespace epics { stopRequest(); } - virtual bool destroyResponse( - Transport::shared_pointer const & transport, - int8 version, ByteBuffer* payloadBuffer, - int8 qos, const Status& status) - { - // data available - // TODO if (qos & QOS_GET) - return normalResponse(transport, version, payloadBuffer, qos, status); - } - - virtual bool initResponse( + virtual void initResponse( Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, @@ -2128,7 +2248,7 @@ namespace epics { { Monitor::shared_pointer thisChannelMonitor = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, thisChannelMonitor, StructureConstPtr())); - return true; + return; } StructureConstPtr structure = @@ -2143,11 +2263,9 @@ namespace epics { if (m_started) start(); - - return true; } - virtual bool normalResponse( + virtual void normalResponse( Transport::shared_pointer const & transport, int8 /*version*/, ByteBuffer* payloadBuffer, @@ -2162,7 +2280,6 @@ namespace epics { { m_ElementQueue->response(transport, payloadBuffer); } - return true; } // override, since we optimize status @@ -2194,8 +2311,7 @@ namespace epics { m_initialized = false; m_mutex.unlock(); - if (!destroyResponse(transport, version, payloadBuffer, qos, status)) - destroy(); + normalResponse(transport, version, payloadBuffer, qos, status); } else { diff --git a/pvAccessApp/rpcService/rpcServer.cpp b/pvAccessApp/rpcService/rpcServer.cpp index 8b50efa..022b961 100644 --- a/pvAccessApp/rpcService/rpcServer.cpp +++ b/pvAccessApp/rpcService/rpcServer.cpp @@ -463,6 +463,37 @@ void RPCServer::run(int seconds) m_serverContext->run(seconds); } +struct ThreadRunnerParam { + RPCServer::shared_pointer server; + int timeToRun; +}; + +static void threadRunner(void* usr) +{ + ThreadRunnerParam* pusr = static_cast(usr); + ThreadRunnerParam param = *pusr; + delete pusr; + + param.server->run(param.timeToRun); +} + +/// Method requires usage of std::tr1::shared_ptr. This instance must be +/// owned by a shared_ptr instance. +void RPCServer::runInNewThread(int seconds) +{ + std::auto_ptr param(new ThreadRunnerParam()); + param->server = rpcServer; + param->timeToRun = seconds; + + epicsThreadCreate("RPCServer thread", + epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackSmall), + threadRunner, param.get()); + + // let the thread delete 'param' + param.release(); +} + void RPCServer::destroy() { m_serverContext->destroy(); diff --git a/pvAccessApp/rpcService/rpcServer.h b/pvAccessApp/rpcService/rpcServer.h index 53f97b2..fc102e2 100644 --- a/pvAccessApp/rpcService/rpcServer.h +++ b/pvAccessApp/rpcService/rpcServer.h @@ -27,7 +27,9 @@ namespace epics { namespace pvAccess { -class epicsShareClass RPCServer { +class epicsShareClass RPCServer : + public std::tr1::enable_shared_from_this +{ private: ServerContextImpl::shared_pointer m_serverContext; @@ -49,6 +51,10 @@ class epicsShareClass RPCServer { void run(int seconds = 0); + /// Method requires usage of std::tr1::shared_ptr. This instance must be + /// owned by a shared_ptr instance. + void runInNewThread(int seconds = 0); + void destroy(); /**