sync of ChannelRequest-s data

This commit is contained in:
Matej Sekoranja
2011-10-05 10:30:28 +02:00
parent dc41d7569d
commit 019e768e84
2 changed files with 143 additions and 69 deletions
+87 -37
View File
@@ -548,9 +548,12 @@ namespace epics {
}
// create data and its bitSet
m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_bitSet.reset(new BitSet(m_data->getNumberFields()));
{
Lock lock(m_dataMutex);
m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_bitSet.reset(new BitSet(m_data->getNumberFields()));
}
// notify
ChannelGet::shared_pointer thisChannelGet = dynamic_pointer_cast<ChannelGet>(shared_from_this());
EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisChannelGet, m_data, m_bitSet));
@@ -565,9 +568,12 @@ namespace epics {
}
// deserialize bitSet and data
m_bitSet->deserialize(payloadBuffer, transport.get());
m_data->deserialize(payloadBuffer, transport.get(), m_bitSet.get());
{
Lock lock(m_dataMutex);
m_bitSet->deserialize(payloadBuffer, transport.get());
m_data->deserialize(payloadBuffer, transport.get(), m_bitSet.get());
}
EXCEPTION_GUARD(m_channelGetRequester->getDone(status));
return true;
}
@@ -715,8 +721,12 @@ namespace epics {
{
// put
// serialize only what has been changed
m_bitSet->serialize(buffer, control);
m_data->serialize(buffer, control, m_bitSet.get());
{
// no need to lock here, since it is already locked via TransportSender IF
//Lock lock(m_dataMutex);
m_bitSet->serialize(buffer, control);
m_data->serialize(buffer, control, m_bitSet.get());
}
}
stopRequest();
@@ -738,9 +748,12 @@ namespace epics {
}
// create data and its bitSet
m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_bitSet.reset(new BitSet(m_data->getNumberFields()));
{
Lock lock(m_dataMutex);
m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_bitSet.reset(new BitSet(m_data->getNumberFields()));
}
// notify
ChannelPut::shared_pointer thisChannelPut = dynamic_pointer_cast<ChannelPut>(shared_from_this());
EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, m_data, m_bitSet));
@@ -756,8 +769,11 @@ namespace epics {
return true;
}
m_data->deserialize(payloadBuffer, transport.get());
{
Lock lock(m_dataMutex);
m_data->deserialize(payloadBuffer, transport.get());
}
EXCEPTION_GUARD(m_channelPutRequester->getDone(status));
return true;
}
@@ -936,7 +952,11 @@ namespace epics {
}
else
{
m_putData->serialize(buffer, control);
{
// no need to lock here, since it is already locked via TransportSender IF
//Lock lock(m_dataMutex);
m_putData->serialize(buffer, control);
}
}
stopRequest();
@@ -958,9 +978,13 @@ namespace epics {
}
IntrospectionRegistry* registry = transport->getIntrospectionRegistry();
m_putData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_getData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
{
Lock lock(m_dataMutex);
m_putData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_getData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
}
// notify
ChannelPutGet::shared_pointer thisChannelPutGet = dynamic_pointer_cast<ChannelPutGet>(shared_from_this());
EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, m_putData, m_getData));
@@ -977,9 +1001,12 @@ namespace epics {
return true;
}
// deserialize get data
m_getData->deserialize(payloadBuffer, transport.get());
{
Lock lock(m_dataMutex);
// deserialize get data
m_getData->deserialize(payloadBuffer, transport.get());
}
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status));
return true;
}
@@ -991,9 +1018,12 @@ namespace epics {
return true;
}
// deserialize put data
m_putData->deserialize(payloadBuffer, transport.get());
{
Lock lock(m_dataMutex);
// deserialize put data
m_putData->deserialize(payloadBuffer, transport.get());
}
EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status));
return true;
}
@@ -1005,9 +1035,12 @@ namespace epics {
return true;
}
// deserialize data
m_getData->deserialize(payloadBuffer, transport.get());
{
Lock lock(m_dataMutex);
// deserialize data
m_getData->deserialize(payloadBuffer, transport.get());
}
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status));
return true;
}
@@ -1209,8 +1242,12 @@ namespace epics {
}
else
{
m_bitSet->serialize(buffer, control);
m_data->serialize(buffer, control, m_bitSet.get());
{
// no need to lock here, since it is already locked via TransportSender IF
//Lock lock(m_dataMutex);
m_bitSet->serialize(buffer, control);
m_data->serialize(buffer, control, m_bitSet.get());
}
}
stopRequest();
@@ -1233,9 +1270,12 @@ namespace epics {
}
// create data and its bitSet
m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_bitSet.reset(new BitSet(m_data->getNumberFields()));
{
Lock lock(m_dataMutex);
m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get()));
m_bitSet.reset(new BitSet(m_data->getNumberFields()));
}
// notify
ChannelRPC::shared_pointer thisChannelRPC = dynamic_pointer_cast<ChannelRPC>(shared_from_this());
EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC, m_data, m_bitSet));
@@ -1412,8 +1452,12 @@ namespace epics {
// put
else
{
SerializeHelper::writeSize(m_offset, buffer, control);
m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?!
{
// no need to lock here, since it is already locked via TransportSender IF
//Lock lock(m_dataMutex);
SerializeHelper::writeSize(m_offset, buffer, control);
m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?!
}
}
stopRequest();
@@ -1437,8 +1481,11 @@ namespace epics {
// create data and its bitSet
FieldConstPtr field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport.get());
m_data.reset(dynamic_cast<PVArray*>(getPVDataCreate()->createPVField(0, field)));
{
Lock lock(m_dataMutex);
m_data.reset(dynamic_cast<PVArray*>(getPVDataCreate()->createPVField(0, field)));
}
// notify
ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast<ChannelArray>(shared_from_this());
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, thisChannelArray, m_data));
@@ -1454,8 +1501,11 @@ namespace epics {
return true;
}
m_data->deserialize(payloadBuffer, transport.get());
{
Lock lock(m_dataMutex);
m_data->deserialize(payloadBuffer, transport.get());
}
EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(status));
return true;
}
+56 -32
View File
@@ -689,6 +689,9 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
}
else
{
// we locked _mutex above, so _channelGet is valid
ScopedLock lock(_channelGet);
_bitSet->serialize(buffer, control);
_pvStructure->serialize(buffer, control, _bitSet.get());
}
@@ -762,10 +765,14 @@ void ServerPutHandler::handleResponse(osiSockAddr* responseFrom,
else
{
// deserialize bitSet and do a put
BitSet::shared_pointer putBitSet = request->getBitSet();
putBitSet->deserialize(payloadBuffer, transport.get());
request->getPVStructure()->deserialize(payloadBuffer, transport.get(), putBitSet.get());
request->getChannelPut()->put(lastRequest);
ChannelPut::shared_pointer channelPut = request->getChannelPut();
{
ScopedLock lock(channelPut); // TODO not needed if put is processed by the same thread
BitSet::shared_pointer putBitSet = request->getBitSet();
putBitSet->deserialize(payloadBuffer, transport.get());
request->getPVStructure()->deserialize(payloadBuffer, transport.get(), putBitSet.get());
}
channelPut->put(lastRequest);
}
}
}
@@ -897,7 +904,7 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
}
else if ((QOS_GET & request) != 0)
{
Lock guard(_mutex);
ScopedLock lock(_channelPut); // _channelPut is valid because we required _mutex above
_pvStructure->serialize(buffer, control);
}
}
@@ -973,8 +980,12 @@ void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom,
else
{
// deserialize bitSet and do a put
request->getPVPutStructure()->deserialize(payloadBuffer, transport.get());
request->getChannelPutGet()->putGet(lastRequest);
ChannelPutGet::shared_pointer channelPutGet = request->getChannelPutGet();
{
ScopedLock lock(channelPutGet); // TODO not necessary if read is done in putGet
request->getPVPutStructure()->deserialize(payloadBuffer, transport.get());
}
channelPutGet->putGet(lastRequest);
}
}
}
@@ -1117,12 +1128,14 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon
}
else if ((QOS_GET_PUT & request) != 0)
{
Lock guard(_mutex);
ScopedLock lock(_channelPutGet); // valid due to _mutex lock above
//Lock guard(_mutex);
_pvPutStructure->serialize(buffer, control);
}
else
{
Lock guard(_mutex);
ScopedLock lock(_channelPutGet); // valid due to _mutex lock above
//Lock guard(_mutex);
_pvGetStructure->serialize(buffer, control);
}
}
@@ -1320,6 +1333,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl*
if (_status.isSuccess())
{
// valid due to _mutex lock above
introspectionRegistry->serialize(_structure, buffer, control);
}
stopRequest();
@@ -1419,10 +1433,15 @@ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom,
else
{
// deserialize data to put
const int32 offset = SerializeHelper::readSize(payloadBuffer, transport.get());
PVArray::shared_pointer array = request->getPVArray();
array->deserialize(payloadBuffer, transport.get());
request->getChannelArray()->putArray(lastRequest, offset, array->getLength());
int32 offset;
ChannelArray::shared_pointer channelArray = request->getChannelArray();
PVArray::shared_pointer array = request->getPVArray();
{
ScopedLock lock(channelArray); // TODO not needed if read by the same thread
offset = SerializeHelper::readSize(payloadBuffer, transport.get());
array->deserialize(payloadBuffer, transport.get());
}
channelArray->putArray(lastRequest, offset, array->getLength());
}
}
}
@@ -1553,7 +1572,8 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont
{
if ((QOS_GET & request) != 0)
{
Lock guard(_mutex);
//Lock guard(_mutex);
ScopedLock lock(_channelArray); // valid due to _mutex lock above
_pvArray->serialize(buffer, control, 0, _pvArray->getLength());
}
else if ((QOS_INIT & request) != 0)
@@ -1899,10 +1919,14 @@ void ServerRPCHandler::handleResponse(osiSockAddr* responseFrom,
}
// deserialize put data
BitSet::shared_pointer changedBitSet = request->getAgrumentsBitSet();
changedBitSet->deserialize(payloadBuffer, transport.get());
request->getPvArguments()->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
request->getChannelRPC()->request(lastRequest);
ChannelRPC::shared_pointer channelRPC = request->getChannelRPC();
{
ScopedLock lock(channelRPC); // TODO not really needed if channelRPC->request() is reads from the same thread
BitSet::shared_pointer changedBitSet = request->getAgrumentsBitSet();
changedBitSet->deserialize(payloadBuffer, transport.get());
request->getPvArguments()->deserialize(payloadBuffer, transport.get(), changedBitSet.get());
}
channelRPC->request(lastRequest);
}
}
@@ -2013,24 +2037,24 @@ void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
buffer->putInt(_ioid);
buffer->putByte((int8)request);
IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry();
{
Lock guard(_mutex);
Lock guard(_mutex);
introspectionRegistry->serializeStatus(buffer, control, _status);
}
if (_status.isSuccess())
{
if ((QOS_INIT & request) != 0)
{
Lock guard(_mutex);
introspectionRegistry->serialize(_pvArguments != NULL ? _pvArguments->getField() : FieldConstPtr(), buffer, control);
}
else
{
introspectionRegistry->serializeStructure(buffer, control, _pvResponse.get());
}
if (_status.isSuccess())
{
if ((QOS_INIT & request) != 0)
{
introspectionRegistry->serialize(_pvArguments != NULL ? _pvArguments->getField() : FieldConstPtr(), buffer, control);
}
else
{
introspectionRegistry->serializeStructure(buffer, control, _pvResponse.get());
}
}
}
stopRequest();
// lastRequest