channel get/put/putget/array local storage instead of references

This commit is contained in:
Matej Sekoranja
2015-02-12 13:48:29 +01:00
parent 4e474a75fd
commit a21908de89
5 changed files with 151 additions and 173 deletions
+49
View File
@@ -87,6 +87,55 @@ void SerializationHelper::serializeFull(ByteBuffer* buffer, SerializableControl*
}
}
void SerializationHelper::partialCopy(PVStructure::shared_pointer const & from,
PVStructure::shared_pointer const & to,
BitSet::shared_pointer const & maskBitSet,
bool inverse) {
size_t numberFields = from->getNumberFields();
size_t offset = from->getFieldOffset();
int32 next = inverse ?
maskBitSet->nextClearBit(static_cast<uint32>(offset)) :
maskBitSet->nextSetBit(static_cast<uint32>(offset));
// no more changes or no changes in this structure
if(next<0||next>=static_cast<int32>(offset+numberFields)) return;
// entire structure
if(static_cast<int32>(offset)==next) {
getConvert()->copy(from, to);
return;
}
PVFieldPtrArray const & fromPVFields = from->getPVFields();
PVFieldPtrArray const & toPVFields = to->getPVFields();
size_t fieldsSize = fromPVFields.size();
for(size_t i = 0; i<fieldsSize; i++) {
PVFieldPtr pvField = fromPVFields[i];
offset = pvField->getFieldOffset();
int32 inumberFields = static_cast<int32>(pvField->getNumberFields());
next = inverse ?
maskBitSet->nextClearBit(static_cast<uint32>(offset)) :
maskBitSet->nextSetBit(static_cast<uint32>(offset));
// no more changes
if(next<0) return;
// no change in this pvField
if(next>=static_cast<int32>(offset+inumberFields)) continue;
// serialize field or fields
if(inumberFields==1) {
getConvert()->copy(pvField, toPVFields[i]);
} else {
PVStructure::shared_pointer fromPVStructure = std::tr1::static_pointer_cast<PVStructure>(pvField);
PVStructure::shared_pointer toPVStructure = std::tr1::static_pointer_cast<PVStructure>(toPVFields[i]);
partialCopy(fromPVStructure, toPVStructure, maskBitSet);
}
}
}
}}
+8
View File
@@ -98,6 +98,14 @@ namespace epics {
* @param buffer data buffer.
*/
static void serializeFull(epics::pvData::ByteBuffer* buffer, epics::pvData::SerializableControl* control, epics::pvData::PVField::shared_pointer const & pvField);
// TODO move somewhere else, to pvData?
static void partialCopy(
epics::pvData::PVStructure::shared_pointer const & from,
epics::pvData::PVStructure::shared_pointer const & to,
epics::pvData::BitSet::shared_pointer const & maskBitSet,
bool inverse = false);
};
}
+14 -86
View File
@@ -786,14 +786,9 @@ namespace epics {
PVStructure::shared_pointer m_pvRequest;
// get structure container
PVStructure::shared_pointer m_structure;
BitSet::shared_pointer m_bitSet;
// put reference store
PVStructure::shared_pointer m_pvPutStructure;
BitSet::shared_pointer m_pvPutBitSet;
Mutex m_structureMutex;
ChannelPutImpl(ChannelImpl::shared_pointer const & channel, ChannelPutRequester::shared_pointer const & channelPutRequester, PVStructure::shared_pointer const & pvRequest) :
@@ -869,12 +864,8 @@ namespace epics {
{
// no need to lock here, since it is already locked via TransportSender IF
//Lock lock(m_structureMutex);
m_pvPutBitSet->serialize(buffer, control);
m_pvPutStructure->serialize(buffer, control, m_pvPutBitSet.get());
// release references
m_pvPutBitSet.reset();
m_pvPutStructure.reset();
m_bitSet->serialize(buffer, control);
m_structure->serialize(buffer, control, m_bitSet.get());
}
}
@@ -992,8 +983,8 @@ namespace epics {
try {
lock();
m_pvPutStructure = pvPutStructure;
m_pvPutBitSet = pvPutBitSet;
*m_bitSet = *pvPutBitSet;
SerializationHelper::partialCopy(pvPutStructure, m_structure, m_bitSet);
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
@@ -1059,10 +1050,6 @@ namespace epics {
PVStructure::shared_pointer m_getData;
BitSet::shared_pointer m_getDataBitSet;
// putGet reference store
PVStructure::shared_pointer m_putPutData;
BitSet::shared_pointer m_putPutDataBitSet;
Mutex m_structureMutex;
ChannelPutGetImpl(ChannelImpl::shared_pointer const & channel, ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, PVStructure::shared_pointer const & pvRequest) :
@@ -1139,12 +1126,8 @@ namespace epics {
{
// no need to lock here, since it is already locked via TransportSender IF
//Lock lock(m_structureMutex);
m_putPutDataBitSet->serialize(buffer, control);
m_putPutData->serialize(buffer, control, m_putPutDataBitSet.get());
// release references
m_putPutDataBitSet.reset();
m_putPutData.reset();
m_putDataBitSet->serialize(buffer, control);
m_putData->serialize(buffer, control, m_putDataBitSet.get());
}
}
@@ -1266,8 +1249,8 @@ namespace epics {
try {
lock();
m_putPutData = pvPutStructure;
m_putPutDataBitSet = bitSet;
*m_putDataBitSet = *bitSet;
SerializationHelper::partialCopy(pvPutStructure, m_putData, m_putDataBitSet);
unlock();
m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this());
} catch (std::runtime_error &rte) {
@@ -1582,12 +1565,9 @@ namespace epics {
PVStructure::shared_pointer m_pvRequest;
// data container (for get)
// data container
PVArray::shared_pointer m_arrayData;
// reference store (for put
PVArray::shared_pointer m_putData;
size_t m_offset;
size_t m_count;
size_t m_stride;
@@ -1687,9 +1667,7 @@ namespace epics {
SerializeHelper::writeSize(m_offset, buffer, control);
SerializeHelper::writeSize(m_stride, buffer, control);
// TODO what about count sanity check?
m_putData->serialize(buffer, control, 0, m_count ? m_count : m_putData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array
// release reference
m_putData.reset();
m_arrayData->serialize(buffer, control, 0, m_count ? m_count : m_arrayData->getLength()); // put from 0 offset (see API doc), m_count == 0 means entire array
}
}
@@ -1821,7 +1799,7 @@ namespace epics {
try {
{
Lock lock(m_structureMutex);
m_putData = putArray;
convert->copy(putArray, m_arrayData); // TODO avoid isComptabile checks
m_offset = offset;
m_count = count;
m_stride = stride;
@@ -2159,56 +2137,6 @@ namespace epics {
}
}
void partialCopy(PVStructure::shared_pointer const & from,
PVStructure::shared_pointer const & to,
BitSet::shared_pointer const & maskBitSet,
bool inverse = false) {
size_t numberFields = from->getNumberFields();
size_t offset = from->getFieldOffset();
int32 next = inverse ?
maskBitSet->nextClearBit(static_cast<uint32>(offset)) :
maskBitSet->nextSetBit(static_cast<uint32>(offset));
// no more changes or no changes in this structure
if(next<0||next>=static_cast<int32>(offset+numberFields)) return;
// entire structure
if(static_cast<int32>(offset)==next) {
getConvert()->copy(from, to);
return;
}
PVFieldPtrArray const & fromPVFields = from->getPVFields();
PVFieldPtrArray const & toPVFields = to->getPVFields();
size_t fieldsSize = fromPVFields.size();
for(size_t i = 0; i<fieldsSize; i++) {
PVFieldPtr pvField = fromPVFields[i];
offset = pvField->getFieldOffset();
int32 inumberFields = static_cast<int32>(pvField->getNumberFields());
next = inverse ?
maskBitSet->nextClearBit(static_cast<uint32>(offset)) :
maskBitSet->nextSetBit(static_cast<uint32>(offset));
// no more changes
if(next<0) return;
// no change in this pvField
if(next>=static_cast<int32>(offset+inumberFields)) continue;
// serialize field or fields
if(inumberFields==1) {
getConvert()->copy(pvField, toPVFields[i]);
} else {
PVStructure::shared_pointer fromPVStructure = std::tr1::static_pointer_cast<PVStructure>(pvField);
PVStructure::shared_pointer toPVStructure = std::tr1::static_pointer_cast<PVStructure>(toPVFields[i]);
partialCopy(fromPVStructure, toPVStructure, maskBitSet);
}
}
}
/*
virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) {
@@ -2225,7 +2153,7 @@ namespace epics {
{
// take new, put current in use
PVStructurePtr pvStructure = m_monitorElement->pvStructurePtr;
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
convert->copy(pvStructure, newElement->pvStructurePtr);
BitSetUtil::compress(m_monitorElement->changedBitSet, pvStructure);
BitSetUtil::compress(m_monitorElement->overrunBitSet, pvStructure);
@@ -2297,7 +2225,7 @@ namespace epics {
m_overrunInProgress = false;
}
getConvert()->copy(pvStructure, newElement->pvStructurePtr);
convert->copy(pvStructure, newElement->pvStructurePtr);
m_monitorQueue.setUsed(m_monitorElement);
@@ -2363,7 +2291,7 @@ namespace epics {
// deserialize changedBitSet and data, and overrun bit set
changedBitSet->deserialize(payloadBuffer, transport.get());
if (m_up2datePVStructure && m_up2datePVStructure.get() != pvStructure.get())
partialCopy(m_up2datePVStructure, pvStructure, changedBitSet, true);
SerializationHelper::partialCopy(m_up2datePVStructure, pvStructure, changedBitSet, true);
pvStructure->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
overrunBitSet->deserialize(payloadBuffer, transport.get());
+74 -56
View File
@@ -1088,8 +1088,14 @@ void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, Chan
Lock guard(_mutex);
_status = status;
_channelGet = channelGet;
_structure = structure;
}
if (_status.isSuccess())
{
_pvStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(structure, _pvStructure));
_bitSet = createBitSetFor(_pvStructure, _bitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1107,10 +1113,14 @@ void ServerChannelGetRequesterImpl::getDone(const Status& status, ChannelGet::sh
{
Lock guard(_mutex);
_status = status;
_pvStructure = pvStructure;
_bitSet = bitSet;
if (_status.isSuccess())
{
*_bitSet = *bitSet;
SerializationHelper::partialCopy(pvStructure, _pvStructure, _bitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
@@ -1188,7 +1198,7 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
if (request & QOS_INIT)
{
Lock guard(_mutex);
control->cachedSerialize(_structure, buffer);
control->cachedSerialize(_pvStructure->getStructure(), buffer);
}
else
{
@@ -1198,9 +1208,6 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
_bitSet->serialize(buffer, control);
_pvStructure->serialize(buffer, control, _bitSet.get());
_pvStructure.reset();
_bitSet.reset();
}
MB_POINT(channelGet, 7, "server channelGet->serialize response (end)");
}
@@ -1356,14 +1363,12 @@ void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, Chan
Lock guard(_mutex);
_status = status;
_channelPut = channelPut;
_structure = structure;
}
if (status.isSuccess())
{
_putPVStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(_structure, _putPVStructure));
_putBitSet = createBitSetFor(_putPVStructure, _putBitSet);
}
if (_status.isSuccess())
{
_pvStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(structure, _pvStructure));
_bitSet = createBitSetFor(_pvStructure, _bitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1390,9 +1395,12 @@ void ServerChannelPutRequesterImpl::getDone(const Status& status, ChannelPut::sh
{
Lock guard(_mutex);
_status = status;
_pvStructure = pvStructure;
_bitSet = bitSet;
}
if (_status.isSuccess())
{
*_bitSet = *bitSet;
SerializationHelper::partialCopy(pvStructure, _pvStructure, _bitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
@@ -1441,13 +1449,13 @@ ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut()
BitSet::shared_pointer ServerChannelPutRequesterImpl::getPutBitSet()
{
//Lock guard(_mutex);
return _putBitSet;
return _bitSet;
}
PVStructure::shared_pointer ServerChannelPutRequesterImpl::getPutPVStructure()
{
//Lock guard(_mutex);
return _putPVStructure;
return _pvStructure;
}
void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
@@ -1476,7 +1484,7 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
if ((QOS_INIT & request) != 0)
{
Lock guard(_mutex);
control->cachedSerialize(_structure, buffer);
control->cachedSerialize(_pvStructure->getStructure(), buffer);
}
else if ((QOS_GET & request) != 0)
{
@@ -1649,15 +1657,15 @@ void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status
Lock guard(_mutex);
_status = status;
_channelPutGet = channelPutGet;
_putStructure = putStructure;
_getStructure = getStructure;
}
if (status.isSuccess())
{
_pvPutGetStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(_putStructure, _pvPutGetStructure));
_pvPutGetBitSet = createBitSetFor(_pvPutGetStructure, _pvPutGetBitSet);
}
if (_status.isSuccess())
{
_pvPutStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(putStructure, _pvPutStructure));
_pvPutBitSet = createBitSetFor(_pvPutStructure, _pvPutBitSet);
_pvGetStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(getStructure, _pvGetStructure));
_pvGetBitSet = createBitSetFor(_pvGetStructure, _pvGetBitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1675,8 +1683,11 @@ void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status, ChannelP
{
Lock guard(_mutex);
_status = status;
_pvGetStructure = pvStructure;
_pvGetBitSet = bitSet;
if (_status.isSuccess())
{
*_pvGetBitSet = *bitSet;
SerializationHelper::partialCopy(pvStructure, _pvGetStructure, _pvGetBitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1688,9 +1699,12 @@ void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status, ChannelP
{
Lock guard(_mutex);
_status = status;
_pvPutStructure = pvStructure;
_pvPutBitSet = bitSet;
}
if (_status.isSuccess())
{
*_pvPutBitSet = *bitSet;
SerializationHelper::partialCopy(pvStructure, _pvPutStructure, _pvPutBitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
@@ -1701,9 +1715,12 @@ void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status, ChannelP
{
Lock guard(_mutex);
_status = status;
_pvGetStructure = pvStructure;
_pvGetBitSet = bitSet;
}
if (_status.isSuccess())
{
*_pvGetBitSet = *bitSet;
SerializationHelper::partialCopy(pvStructure, _pvGetStructure, _pvGetBitSet);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
@@ -1752,13 +1769,13 @@ ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet
PVStructure::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetPVStructure()
{
//Lock guard(_mutex);
return _pvPutGetStructure;
return _pvPutStructure;
}
BitSet::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetBitSet()
{
//Lock guard(_mutex);
return _pvPutGetBitSet;
return _pvPutBitSet;
}
void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
@@ -1787,8 +1804,8 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon
if ((QOS_INIT & request) != 0)
{
Lock guard(_mutex);
control->cachedSerialize(_putStructure, buffer);
control->cachedSerialize(_getStructure, buffer);
control->cachedSerialize(_pvPutStructure->getStructure(), buffer);
control->cachedSerialize(_pvGetStructure->getStructure(), buffer);
}
else if ((QOS_GET & request) != 0)
{
@@ -2257,20 +2274,18 @@ void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status,
Lock guard(_mutex);
_status = Status(Status::STATUSTYPE_ERROR, "fixed sized array returned as a ChannelArray array instance");
_channelArray.reset();
_array.reset();
_pvArray.reset();
}
else
{
Lock guard(_mutex);
_status = status;
_channelArray = channelArray;
_array = array;
}
if (status.isSuccess())
{
_pvPutArray = std::tr1::static_pointer_cast<PVArray>(reuseOrCreatePVField(_array, _pvPutArray));
}
if (_status.isSuccess())
{
_pvArray = std::tr1::static_pointer_cast<PVArray>(reuseOrCreatePVField(array, _pvArray));
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -2287,7 +2302,11 @@ void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status, Channel
{
Lock guard(_mutex);
_status = status;
_pvArray = pvArray;
if (_status.isSuccess())
{
// TODO cache convert
getConvert()->copy(pvArray, _pvArray);
}
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -2369,7 +2388,7 @@ ChannelArray::shared_pointer ServerChannelArrayRequesterImpl::getChannelArray()
PVArray::shared_pointer ServerChannelArrayRequesterImpl::getPVArray()
{
//Lock guard(_mutex);
return _pvPutArray;
return _pvArray;
}
void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
@@ -2400,7 +2419,6 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont
//Lock guard(_mutex);
ScopedLock lock(channelArray);
_pvArray->serialize(buffer, control, 0, _pvArray->getLength());
_pvArray.reset();
}
else if ((QOS_PROCESS & request) != 0)
{
@@ -2410,7 +2428,7 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont
else if ((QOS_INIT & request) != 0)
{
Lock guard(_mutex);
control->cachedSerialize(_array, buffer);
control->cachedSerialize(_pvArray->getArray(), buffer);
}
}
+6 -31
View File
@@ -361,8 +361,7 @@ namespace pvAccess {
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
private:
ChannelGet::shared_pointer _channelGet;
epics::pvData::Structure::const_shared_pointer _structure;
epics::pvData::PVStructure::shared_pointer _pvStructure;
epics::pvData::PVStructure::shared_pointer _pvStructure;
epics::pvData::BitSet::shared_pointer _bitSet;
epics::pvData::Status _status;
};
@@ -418,17 +417,8 @@ namespace pvAccess {
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
private:
ChannelPut::shared_pointer _channelPut;
epics::pvData::Structure::const_shared_pointer _structure;
// reference store (for get)
epics::pvData::BitSet::shared_pointer _bitSet;
epics::pvData::PVStructure::shared_pointer _pvStructure;
// data store (for put)
epics::pvData::BitSet::shared_pointer _putBitSet;
epics::pvData::PVStructure::shared_pointer _putPVStructure;
epics::pvData::Status _status;
};
@@ -493,20 +483,11 @@ namespace pvAccess {
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
private:
ChannelPutGet::shared_pointer _channelPutGet;
epics::pvData::Structure::const_shared_pointer _putStructure;
epics::pvData::Structure::const_shared_pointer _getStructure;
// reference store
epics::pvData::PVStructure::shared_pointer _pvPutStructure;
epics::pvData::BitSet::shared_pointer _pvPutBitSet;
epics::pvData::PVStructure::shared_pointer _pvGetStructure;
epics::pvData::BitSet::shared_pointer _pvGetBitSet;
// data container (for put-get)
epics::pvData::PVStructure::shared_pointer _pvPutGetStructure;
epics::pvData::BitSet::shared_pointer _pvPutGetBitSet;
epics::pvData::Status _status;
epics::pvData::PVStructure::shared_pointer _pvPutStructure;
epics::pvData::BitSet::shared_pointer _pvPutBitSet;
epics::pvData::PVStructure::shared_pointer _pvGetStructure;
epics::pvData::BitSet::shared_pointer _pvGetBitSet;
epics::pvData::Status _status;
};
@@ -619,14 +600,8 @@ namespace pvAccess {
private:
ChannelArray::shared_pointer _channelArray;
epics::pvData::Array::const_shared_pointer _array;
// reference store
epics::pvData::PVArray::shared_pointer _pvArray;
// data container
epics::pvData::PVArray::shared_pointer _pvPutArray;
std::size_t _length;
epics::pvData::Status _status;
};