From a21908de890689bc63aa1f6fd9d7471921fddea6 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Thu, 12 Feb 2015 13:48:29 +0100 Subject: [PATCH] channel get/put/putget/array local storage instead of references --- src/remote/serializationHelper.cpp | 49 ++++++++++ src/remote/serializationHelper.h | 8 ++ src/remoteClient/clientContextImpl.cpp | 100 +++---------------- src/server/responseHandlers.cpp | 130 ++++++++++++++----------- src/server/responseHandlers.h | 37 ++----- 5 files changed, 151 insertions(+), 173 deletions(-) diff --git a/src/remote/serializationHelper.cpp b/src/remote/serializationHelper.cpp index 23d3317..d9d42d3 100644 --- a/src/remote/serializationHelper.cpp +++ b/src/remote/serializationHelper.cpp @@ -87,6 +87,55 @@ void SerializationHelper::serializeFull(ByteBuffer* buffer, SerializableControl* } } +void SerializationHelper::partialCopy(PVStructure::shared_pointer const & from, + PVStructure::shared_pointer const & to, + BitSet::shared_pointer const & maskBitSet, + bool inverse) { + + size_t numberFields = from->getNumberFields(); + size_t offset = from->getFieldOffset(); + int32 next = inverse ? + maskBitSet->nextClearBit(static_cast(offset)) : + maskBitSet->nextSetBit(static_cast(offset)); + + // no more changes or no changes in this structure + if(next<0||next>=static_cast(offset+numberFields)) return; + + // entire structure + if(static_cast(offset)==next) { + getConvert()->copy(from, to); + return; + } + + PVFieldPtrArray const & fromPVFields = from->getPVFields(); + PVFieldPtrArray const & toPVFields = to->getPVFields(); + + size_t fieldsSize = fromPVFields.size(); + for(size_t i = 0; igetFieldOffset(); + int32 inumberFields = static_cast(pvField->getNumberFields()); + next = inverse ? + maskBitSet->nextClearBit(static_cast(offset)) : + maskBitSet->nextSetBit(static_cast(offset)); + + // no more changes + if(next<0) return; + // no change in this pvField + if(next>=static_cast(offset+inumberFields)) continue; + + // serialize field or fields + if(inumberFields==1) { + getConvert()->copy(pvField, toPVFields[i]); + } else { + PVStructure::shared_pointer fromPVStructure = std::tr1::static_pointer_cast(pvField); + PVStructure::shared_pointer toPVStructure = std::tr1::static_pointer_cast(toPVFields[i]); + partialCopy(fromPVStructure, toPVStructure, maskBitSet); + } + } +} + + }} diff --git a/src/remote/serializationHelper.h b/src/remote/serializationHelper.h index 355d5f1..dfba71f 100644 --- a/src/remote/serializationHelper.h +++ b/src/remote/serializationHelper.h @@ -98,6 +98,14 @@ namespace epics { * @param buffer data buffer. */ static void serializeFull(epics::pvData::ByteBuffer* buffer, epics::pvData::SerializableControl* control, epics::pvData::PVField::shared_pointer const & pvField); + + // TODO move somewhere else, to pvData? + static void partialCopy( + epics::pvData::PVStructure::shared_pointer const & from, + epics::pvData::PVStructure::shared_pointer const & to, + epics::pvData::BitSet::shared_pointer const & maskBitSet, + bool inverse = false); + }; } diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 8aaa489..be841b4 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -786,14 +786,9 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; - // get structure container PVStructure::shared_pointer m_structure; BitSet::shared_pointer m_bitSet; - // put reference store - PVStructure::shared_pointer m_pvPutStructure; - BitSet::shared_pointer m_pvPutBitSet; - Mutex m_structureMutex; ChannelPutImpl(ChannelImpl::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, PVStructure::shared_pointer const & pvRequest) : @@ -869,12 +864,8 @@ namespace epics { { // no need to lock here, since it is already locked via TransportSender IF //Lock lock(m_structureMutex); - m_pvPutBitSet->serialize(buffer, control); - m_pvPutStructure->serialize(buffer, control, m_pvPutBitSet.get()); - - // release references - m_pvPutBitSet.reset(); - m_pvPutStructure.reset(); + m_bitSet->serialize(buffer, control); + m_structure->serialize(buffer, control, m_bitSet.get()); } } @@ -992,8 +983,8 @@ namespace epics { try { lock(); - m_pvPutStructure = pvPutStructure; - m_pvPutBitSet = pvPutBitSet; + *m_bitSet = *pvPutBitSet; + SerializationHelper::partialCopy(pvPutStructure, m_structure, m_bitSet); unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { @@ -1059,10 +1050,6 @@ namespace epics { PVStructure::shared_pointer m_getData; BitSet::shared_pointer m_getDataBitSet; - // putGet reference store - PVStructure::shared_pointer m_putPutData; - BitSet::shared_pointer m_putPutDataBitSet; - Mutex m_structureMutex; ChannelPutGetImpl(ChannelImpl::shared_pointer const & channel, ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, PVStructure::shared_pointer const & pvRequest) : @@ -1139,12 +1126,8 @@ namespace epics { { // no need to lock here, since it is already locked via TransportSender IF //Lock lock(m_structureMutex); - m_putPutDataBitSet->serialize(buffer, control); - m_putPutData->serialize(buffer, control, m_putPutDataBitSet.get()); - - // release references - m_putPutDataBitSet.reset(); - m_putPutData.reset(); + m_putDataBitSet->serialize(buffer, control); + m_putData->serialize(buffer, control, m_putDataBitSet.get()); } } @@ -1266,8 +1249,8 @@ namespace epics { try { lock(); - m_putPutData = pvPutStructure; - m_putPutDataBitSet = bitSet; + *m_putDataBitSet = *bitSet; + SerializationHelper::partialCopy(pvPutStructure, m_putData, m_putDataBitSet); unlock(); m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error &rte) { @@ -1582,12 +1565,9 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; - // data container (for get) + // data container PVArray::shared_pointer m_arrayData; - // reference store (for put - PVArray::shared_pointer m_putData; - size_t m_offset; size_t m_count; size_t m_stride; @@ -1687,9 +1667,7 @@ namespace epics { SerializeHelper::writeSize(m_offset, buffer, control); SerializeHelper::writeSize(m_stride, buffer, control); // TODO what about count sanity check? - m_putData->serialize(buffer, control, 0, m_count ? m_count : m_putData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array - // release reference - m_putData.reset(); + m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array } } @@ -1821,7 +1799,7 @@ namespace epics { try { { Lock lock(m_structureMutex); - m_putData = putArray; + convert->copy(putArray, m_arrayData); // TODO avoid isComptabile checks m_offset = offset; m_count = count; m_stride = stride; @@ -2159,56 +2137,6 @@ namespace epics { } } - - - void partialCopy(PVStructure::shared_pointer const & from, - PVStructure::shared_pointer const & to, - BitSet::shared_pointer const & maskBitSet, - bool inverse = false) { - - size_t numberFields = from->getNumberFields(); - size_t offset = from->getFieldOffset(); - int32 next = inverse ? - maskBitSet->nextClearBit(static_cast(offset)) : - maskBitSet->nextSetBit(static_cast(offset)); - - // no more changes or no changes in this structure - if(next<0||next>=static_cast(offset+numberFields)) return; - - // entire structure - if(static_cast(offset)==next) { - getConvert()->copy(from, to); - return; - } - - PVFieldPtrArray const & fromPVFields = from->getPVFields(); - PVFieldPtrArray const & toPVFields = to->getPVFields(); - - size_t fieldsSize = fromPVFields.size(); - for(size_t i = 0; igetFieldOffset(); - int32 inumberFields = static_cast(pvField->getNumberFields()); - next = inverse ? - maskBitSet->nextClearBit(static_cast(offset)) : - maskBitSet->nextSetBit(static_cast(offset)); - - // no more changes - if(next<0) return; - // no change in this pvField - if(next>=static_cast(offset+inumberFields)) continue; - - // serialize field or fields - if(inumberFields==1) { - getConvert()->copy(pvField, toPVFields[i]); - } else { - PVStructure::shared_pointer fromPVStructure = std::tr1::static_pointer_cast(pvField); - PVStructure::shared_pointer toPVStructure = std::tr1::static_pointer_cast(toPVFields[i]); - partialCopy(fromPVStructure, toPVStructure, maskBitSet); - } - } - } - /* virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) { @@ -2225,7 +2153,7 @@ namespace epics { { // take new, put current in use PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr; - getConvert()->copy(pvStructure, newElement->pvStructurePtr); + convert->copy(pvStructure, newElement->pvStructurePtr); BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure); BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure); @@ -2297,7 +2225,7 @@ namespace epics { m_overrunInProgress = false; } - getConvert()->copy(pvStructure, newElement->pvStructurePtr); + convert->copy(pvStructure, newElement->pvStructurePtr); m_monitorQueue.setUsed(m_monitorElement); @@ -2363,7 +2291,7 @@ namespace epics { // deserialize changedBitSet and data, and overrun bit set changedBitSet->deserialize(payloadBuffer, transport.get()); if (m_up2datePVStructure && m_up2datePVStructure.get() != pvStructure.get()) - partialCopy(m_up2datePVStructure, pvStructure, changedBitSet, true); + SerializationHelper::partialCopy(m_up2datePVStructure, pvStructure, changedBitSet, true); pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get()); overrunBitSet->deserialize(payloadBuffer, transport.get()); diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index e0f783c..8be2ce4 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1088,8 +1088,14 @@ void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, Chan Lock guard(_mutex); _status = status; _channelGet = channelGet; - _structure = structure; - } + + if (_status.isSuccess()) + { + _pvStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(structure, _pvStructure)); + _bitSet = createBitSetFor(_pvStructure, _bitSet); + } + } + TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1107,10 +1113,14 @@ void ServerChannelGetRequesterImpl::getDone(const Status& status, ChannelGet::sh { Lock guard(_mutex); _status = status; - _pvStructure = pvStructure; - _bitSet = bitSet; + if (_status.isSuccess()) + { + *_bitSet = *bitSet; + SerializationHelper::partialCopy(pvStructure, _pvStructure, _bitSet); + } } - TransportSender::shared_pointer thisSender = shared_from_this(); + + TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } @@ -1188,7 +1198,7 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro if (request & QOS_INIT) { Lock guard(_mutex); - control->cachedSerialize(_structure, buffer); + control->cachedSerialize(_pvStructure->getStructure(), buffer); } else { @@ -1198,9 +1208,6 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro _bitSet->serialize(buffer, control); _pvStructure->serialize(buffer, control, _bitSet.get()); - - _pvStructure.reset(); - _bitSet.reset(); } MB_POINT(channelGet, 7, "server channelGet->serialize response (end)"); } @@ -1356,14 +1363,12 @@ void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, Chan Lock guard(_mutex); _status = status; _channelPut = channelPut; - _structure = structure; - } - - if (status.isSuccess()) - { - _putPVStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(_structure, _putPVStructure)); - _putBitSet = createBitSetFor(_putPVStructure, _putBitSet); - } + if (_status.isSuccess()) + { + _pvStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(structure, _pvStructure)); + _bitSet = createBitSetFor(_pvStructure, _bitSet); + } + } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1390,9 +1395,12 @@ void ServerChannelPutRequesterImpl::getDone(const Status& status, ChannelPut::sh { Lock guard(_mutex); _status = status; - _pvStructure = pvStructure; - _bitSet = bitSet; - } + if (_status.isSuccess()) + { + *_bitSet = *bitSet; + SerializationHelper::partialCopy(pvStructure, _pvStructure, _bitSet); + } + } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } @@ -1441,13 +1449,13 @@ ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut() BitSet::shared_pointer ServerChannelPutRequesterImpl::getPutBitSet() { //Lock guard(_mutex); - return _putBitSet; + return _bitSet; } PVStructure::shared_pointer ServerChannelPutRequesterImpl::getPutPVStructure() { //Lock guard(_mutex); - return _putPVStructure; + return _pvStructure; } void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) @@ -1476,7 +1484,7 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro if ((QOS_INIT & request) != 0) { Lock guard(_mutex); - control->cachedSerialize(_structure, buffer); + control->cachedSerialize(_pvStructure->getStructure(), buffer); } else if ((QOS_GET & request) != 0) { @@ -1649,15 +1657,15 @@ void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status Lock guard(_mutex); _status = status; _channelPutGet = channelPutGet; - _putStructure = putStructure; - _getStructure = getStructure; - } - - if (status.isSuccess()) - { - _pvPutGetStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(_putStructure, _pvPutGetStructure)); - _pvPutGetBitSet = createBitSetFor(_pvPutGetStructure, _pvPutGetBitSet); - } + if (_status.isSuccess()) + { + _pvPutStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(putStructure, _pvPutStructure)); + _pvPutBitSet = createBitSetFor(_pvPutStructure, _pvPutBitSet); + + _pvGetStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(getStructure, _pvGetStructure)); + _pvGetBitSet = createBitSetFor(_pvGetStructure, _pvGetBitSet); + } + } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1675,8 +1683,11 @@ void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status, ChannelP { Lock guard(_mutex); _status = status; - _pvGetStructure = pvStructure; - _pvGetBitSet = bitSet; + if (_status.isSuccess()) + { + *_pvGetBitSet = *bitSet; + SerializationHelper::partialCopy(pvStructure, _pvGetStructure, _pvGetBitSet); + } } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1688,9 +1699,12 @@ void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status, ChannelP { Lock guard(_mutex); _status = status; - _pvPutStructure = pvStructure; - _pvPutBitSet = bitSet; - } + if (_status.isSuccess()) + { + *_pvPutBitSet = *bitSet; + SerializationHelper::partialCopy(pvStructure, _pvPutStructure, _pvPutBitSet); + } + } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } @@ -1701,9 +1715,12 @@ void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status, ChannelP { Lock guard(_mutex); _status = status; - _pvGetStructure = pvStructure; - _pvGetBitSet = bitSet; - } + if (_status.isSuccess()) + { + *_pvGetBitSet = *bitSet; + SerializationHelper::partialCopy(pvStructure, _pvGetStructure, _pvGetBitSet); + } + } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } @@ -1752,13 +1769,13 @@ ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet PVStructure::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetPVStructure() { //Lock guard(_mutex); - return _pvPutGetStructure; + return _pvPutStructure; } BitSet::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetBitSet() { //Lock guard(_mutex); - return _pvPutGetBitSet; + return _pvPutBitSet; } void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) @@ -1787,8 +1804,8 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon if ((QOS_INIT & request) != 0) { Lock guard(_mutex); - control->cachedSerialize(_putStructure, buffer); - control->cachedSerialize(_getStructure, buffer); + control->cachedSerialize(_pvPutStructure->getStructure(), buffer); + control->cachedSerialize(_pvGetStructure->getStructure(), buffer); } else if ((QOS_GET & request) != 0) { @@ -2257,20 +2274,18 @@ void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, Lock guard(_mutex); _status = Status(Status::STATUSTYPE_ERROR, "fixed sized array returned as a ChannelArray array instance"); _channelArray.reset(); - _array.reset(); + _pvArray.reset(); } else { Lock guard(_mutex); _status = status; _channelArray = channelArray; - _array = array; - } - - if (status.isSuccess()) - { - _pvPutArray = std::tr1::static_pointer_cast(reuseOrCreatePVField(_array, _pvPutArray)); - } + if (_status.isSuccess()) + { + _pvArray = std::tr1::static_pointer_cast(reuseOrCreatePVField(array, _pvArray)); + } + } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -2287,7 +2302,11 @@ void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status, Channel { Lock guard(_mutex); _status = status; - _pvArray = pvArray; + if (_status.isSuccess()) + { + // TODO cache convert + getConvert()->copy(pvArray, _pvArray); + } } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -2369,7 +2388,7 @@ ChannelArray::shared_pointer ServerChannelArrayRequesterImpl::getChannelArray() PVArray::shared_pointer ServerChannelArrayRequesterImpl::getPVArray() { //Lock guard(_mutex); - return _pvPutArray; + return _pvArray; } void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) @@ -2400,7 +2419,6 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont //Lock guard(_mutex); ScopedLock lock(channelArray); _pvArray->serialize(buffer, control, 0, _pvArray->getLength()); - _pvArray.reset(); } else if ((QOS_PROCESS & request) != 0) { @@ -2410,7 +2428,7 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont else if ((QOS_INIT & request) != 0) { Lock guard(_mutex); - control->cachedSerialize(_array, buffer); + control->cachedSerialize(_pvArray->getArray(), buffer); } } diff --git a/src/server/responseHandlers.h b/src/server/responseHandlers.h index 80811a8..9309670 100644 --- a/src/server/responseHandlers.h +++ b/src/server/responseHandlers.h @@ -361,8 +361,7 @@ namespace pvAccess { void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelGet::shared_pointer _channelGet; - epics::pvData::Structure::const_shared_pointer _structure; - epics::pvData::PVStructure::shared_pointer _pvStructure; + epics::pvData::PVStructure::shared_pointer _pvStructure; epics::pvData::BitSet::shared_pointer _bitSet; epics::pvData::Status _status; }; @@ -418,17 +417,8 @@ namespace pvAccess { void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelPut::shared_pointer _channelPut; - - epics::pvData::Structure::const_shared_pointer _structure; - - // reference store (for get) epics::pvData::BitSet::shared_pointer _bitSet; epics::pvData::PVStructure::shared_pointer _pvStructure; - - // data store (for put) - epics::pvData::BitSet::shared_pointer _putBitSet; - epics::pvData::PVStructure::shared_pointer _putPVStructure; - epics::pvData::Status _status; }; @@ -493,20 +483,11 @@ namespace pvAccess { void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelPutGet::shared_pointer _channelPutGet; - epics::pvData::Structure::const_shared_pointer _putStructure; - epics::pvData::Structure::const_shared_pointer _getStructure; - - // reference store - epics::pvData::PVStructure::shared_pointer _pvPutStructure; - epics::pvData::BitSet::shared_pointer _pvPutBitSet; - epics::pvData::PVStructure::shared_pointer _pvGetStructure; - epics::pvData::BitSet::shared_pointer _pvGetBitSet; - - // data container (for put-get) - epics::pvData::PVStructure::shared_pointer _pvPutGetStructure; - epics::pvData::BitSet::shared_pointer _pvPutGetBitSet; - - epics::pvData::Status _status; + epics::pvData::PVStructure::shared_pointer _pvPutStructure; + epics::pvData::BitSet::shared_pointer _pvPutBitSet; + epics::pvData::PVStructure::shared_pointer _pvGetStructure; + epics::pvData::BitSet::shared_pointer _pvGetBitSet; + epics::pvData::Status _status; }; @@ -619,14 +600,8 @@ namespace pvAccess { private: ChannelArray::shared_pointer _channelArray; - epics::pvData::Array::const_shared_pointer _array; - - // reference store epics::pvData::PVArray::shared_pointer _pvArray; - // data container - epics::pvData::PVArray::shared_pointer _pvPutArray; - std::size_t _length; epics::pvData::Status _status; };