From 019e768e846c74304adbaee332aa35a92f8769c3 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 5 Oct 2011 10:30:28 +0200 Subject: [PATCH] sync of ChannelRequest-s data --- .../remoteClient/clientContextImpl.cpp | 124 ++++++++++++------ pvAccessApp/server/responseHandlers.cpp | 88 ++++++++----- 2 files changed, 143 insertions(+), 69 deletions(-) diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 28b64b8..d022bb2 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -548,9 +548,12 @@ namespace epics { } // create data and its bitSet - m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); - m_bitSet.reset(new BitSet(m_data->getNumberFields())); - + { + Lock lock(m_dataMutex); + m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); + m_bitSet.reset(new BitSet(m_data->getNumberFields())); + } + // notify ChannelGet::shared_pointer thisChannelGet = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, thisChannelGet, m_data, m_bitSet)); @@ -565,9 +568,12 @@ namespace epics { } // deserialize bitSet and data - m_bitSet->deserialize(payloadBuffer, transport.get()); - m_data->deserialize(payloadBuffer, transport.get(), m_bitSet.get()); - + { + Lock lock(m_dataMutex); + m_bitSet->deserialize(payloadBuffer, transport.get()); + m_data->deserialize(payloadBuffer, transport.get(), m_bitSet.get()); + } + EXCEPTION_GUARD(m_channelGetRequester->getDone(status)); return true; } @@ -715,8 +721,12 @@ namespace epics { { // put // serialize only what has been changed - m_bitSet->serialize(buffer, control); - m_data->serialize(buffer, control, m_bitSet.get()); + { + // no need to lock here, since it is already locked via TransportSender IF + //Lock lock(m_dataMutex); + m_bitSet->serialize(buffer, control); + m_data->serialize(buffer, control, m_bitSet.get()); + } } stopRequest(); @@ -738,9 +748,12 @@ namespace epics { } // create data and its bitSet - m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); - m_bitSet.reset(new BitSet(m_data->getNumberFields())); - + { + Lock lock(m_dataMutex); + m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); + m_bitSet.reset(new BitSet(m_data->getNumberFields())); + } + // notify ChannelPut::shared_pointer thisChannelPut = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, thisChannelPut, m_data, m_bitSet)); @@ -756,8 +769,11 @@ namespace epics { return true; } - m_data->deserialize(payloadBuffer, transport.get()); - + { + Lock lock(m_dataMutex); + m_data->deserialize(payloadBuffer, transport.get()); + } + EXCEPTION_GUARD(m_channelPutRequester->getDone(status)); return true; } @@ -936,7 +952,11 @@ namespace epics { } else { - m_putData->serialize(buffer, control); + { + // no need to lock here, since it is already locked via TransportSender IF + //Lock lock(m_dataMutex); + m_putData->serialize(buffer, control); + } } stopRequest(); @@ -958,9 +978,13 @@ namespace epics { } IntrospectionRegistry* registry = transport->getIntrospectionRegistry(); - m_putData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); - m_getData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); - + + { + Lock lock(m_dataMutex); + m_putData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); + m_getData.reset(registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); + } + // notify ChannelPutGet::shared_pointer thisChannelPutGet = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, thisChannelPutGet, m_putData, m_getData)); @@ -977,9 +1001,12 @@ namespace epics { return true; } - // deserialize get data - m_getData->deserialize(payloadBuffer, transport.get()); - + { + Lock lock(m_dataMutex); + // deserialize get data + m_getData->deserialize(payloadBuffer, transport.get()); + } + EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status)); return true; } @@ -991,9 +1018,12 @@ namespace epics { return true; } - // deserialize put data - m_putData->deserialize(payloadBuffer, transport.get()); - + { + Lock lock(m_dataMutex); + // deserialize put data + m_putData->deserialize(payloadBuffer, transport.get()); + } + EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status)); return true; } @@ -1005,9 +1035,12 @@ namespace epics { return true; } - // deserialize data - m_getData->deserialize(payloadBuffer, transport.get()); - + { + Lock lock(m_dataMutex); + // deserialize data + m_getData->deserialize(payloadBuffer, transport.get()); + } + EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status)); return true; } @@ -1209,8 +1242,12 @@ namespace epics { } else { - m_bitSet->serialize(buffer, control); - m_data->serialize(buffer, control, m_bitSet.get()); + { + // no need to lock here, since it is already locked via TransportSender IF + //Lock lock(m_dataMutex); + m_bitSet->serialize(buffer, control); + m_data->serialize(buffer, control, m_bitSet.get()); + } } stopRequest(); @@ -1233,9 +1270,12 @@ namespace epics { } // create data and its bitSet - m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); - m_bitSet.reset(new BitSet(m_data->getNumberFields())); - + { + Lock lock(m_dataMutex); + m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); + m_bitSet.reset(new BitSet(m_data->getNumberFields())); + } + // notify ChannelRPC::shared_pointer thisChannelRPC = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC, m_data, m_bitSet)); @@ -1412,8 +1452,12 @@ namespace epics { // put else { - SerializeHelper::writeSize(m_offset, buffer, control); - m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?! + { + // no need to lock here, since it is already locked via TransportSender IF + //Lock lock(m_dataMutex); + SerializeHelper::writeSize(m_offset, buffer, control); + m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?! + } } stopRequest(); @@ -1437,8 +1481,11 @@ namespace epics { // create data and its bitSet FieldConstPtr field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport.get()); - m_data.reset(dynamic_cast(getPVDataCreate()->createPVField(0, field))); - + { + Lock lock(m_dataMutex); + m_data.reset(dynamic_cast(getPVDataCreate()->createPVField(0, field))); + } + // notify ChannelArray::shared_pointer thisChannelArray = dynamic_pointer_cast(shared_from_this()); EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, thisChannelArray, m_data)); @@ -1454,8 +1501,11 @@ namespace epics { return true; } - m_data->deserialize(payloadBuffer, transport.get()); - + { + Lock lock(m_dataMutex); + m_data->deserialize(payloadBuffer, transport.get()); + } + EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(status)); return true; } diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 66d35ba..89da2b4 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -689,6 +689,9 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro } else { + // we locked _mutex above, so _channelGet is valid + ScopedLock lock(_channelGet); + _bitSet->serialize(buffer, control); _pvStructure->serialize(buffer, control, _bitSet.get()); } @@ -762,10 +765,14 @@ void ServerPutHandler::handleResponse(osiSockAddr* responseFrom, else { // deserialize bitSet and do a put - BitSet::shared_pointer putBitSet = request->getBitSet(); - putBitSet->deserialize(payloadBuffer, transport.get()); - request->getPVStructure()->deserialize(payloadBuffer, transport.get(), putBitSet.get()); - request->getChannelPut()->put(lastRequest); + ChannelPut::shared_pointer channelPut = request->getChannelPut(); + { + ScopedLock lock(channelPut); // TODO not needed if put is processed by the same thread + BitSet::shared_pointer putBitSet = request->getBitSet(); + putBitSet->deserialize(payloadBuffer, transport.get()); + request->getPVStructure()->deserialize(payloadBuffer, transport.get(), putBitSet.get()); + } + channelPut->put(lastRequest); } } } @@ -897,7 +904,7 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro } else if ((QOS_GET & request) != 0) { - Lock guard(_mutex); + ScopedLock lock(_channelPut); // _channelPut is valid because we required _mutex above _pvStructure->serialize(buffer, control); } } @@ -973,8 +980,12 @@ void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom, else { // deserialize bitSet and do a put - request->getPVPutStructure()->deserialize(payloadBuffer, transport.get()); - request->getChannelPutGet()->putGet(lastRequest); + ChannelPutGet::shared_pointer channelPutGet = request->getChannelPutGet(); + { + ScopedLock lock(channelPutGet); // TODO not necessary if read is done in putGet + request->getPVPutStructure()->deserialize(payloadBuffer, transport.get()); + } + channelPutGet->putGet(lastRequest); } } } @@ -1117,12 +1128,14 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon } else if ((QOS_GET_PUT & request) != 0) { - Lock guard(_mutex); + ScopedLock lock(_channelPutGet); // valid due to _mutex lock above + //Lock guard(_mutex); _pvPutStructure->serialize(buffer, control); } else { - Lock guard(_mutex); + ScopedLock lock(_channelPutGet); // valid due to _mutex lock above + //Lock guard(_mutex); _pvGetStructure->serialize(buffer, control); } } @@ -1320,6 +1333,7 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* if (_status.isSuccess()) { + // valid due to _mutex lock above introspectionRegistry->serialize(_structure, buffer, control); } stopRequest(); @@ -1419,10 +1433,15 @@ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom, else { // deserialize data to put - const int32 offset = SerializeHelper::readSize(payloadBuffer, transport.get()); - PVArray::shared_pointer array = request->getPVArray(); - array->deserialize(payloadBuffer, transport.get()); - request->getChannelArray()->putArray(lastRequest, offset, array->getLength()); + int32 offset; + ChannelArray::shared_pointer channelArray = request->getChannelArray(); + PVArray::shared_pointer array = request->getPVArray(); + { + ScopedLock lock(channelArray); // TODO not needed if read by the same thread + offset = SerializeHelper::readSize(payloadBuffer, transport.get()); + array->deserialize(payloadBuffer, transport.get()); + } + channelArray->putArray(lastRequest, offset, array->getLength()); } } } @@ -1553,7 +1572,8 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont { if ((QOS_GET & request) != 0) { - Lock guard(_mutex); + //Lock guard(_mutex); + ScopedLock lock(_channelArray); // valid due to _mutex lock above _pvArray->serialize(buffer, control, 0, _pvArray->getLength()); } else if ((QOS_INIT & request) != 0) @@ -1899,10 +1919,14 @@ void ServerRPCHandler::handleResponse(osiSockAddr* responseFrom, } // deserialize put data - BitSet::shared_pointer changedBitSet = request->getAgrumentsBitSet(); - changedBitSet->deserialize(payloadBuffer, transport.get()); - request->getPvArguments()->deserialize(payloadBuffer, transport.get(), changedBitSet.get()); - request->getChannelRPC()->request(lastRequest); + ChannelRPC::shared_pointer channelRPC = request->getChannelRPC(); + { + ScopedLock lock(channelRPC); // TODO not really needed if channelRPC->request() is reads from the same thread + BitSet::shared_pointer changedBitSet = request->getAgrumentsBitSet(); + changedBitSet->deserialize(payloadBuffer, transport.get()); + request->getPvArguments()->deserialize(payloadBuffer, transport.get(), changedBitSet.get()); + } + channelRPC->request(lastRequest); } } @@ -2013,24 +2037,24 @@ void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendContro buffer->putInt(_ioid); buffer->putByte((int8)request); IntrospectionRegistry* introspectionRegistry = _transport->getIntrospectionRegistry(); + { - Lock guard(_mutex); + Lock guard(_mutex); introspectionRegistry->serializeStatus(buffer, control, _status); - } - if (_status.isSuccess()) - { - if ((QOS_INIT & request) != 0) - { - Lock guard(_mutex); - introspectionRegistry->serialize(_pvArguments != NULL ? _pvArguments->getField() : FieldConstPtr(), buffer, control); - } - else - { - introspectionRegistry->serializeStructure(buffer, control, _pvResponse.get()); - } + if (_status.isSuccess()) + { + if ((QOS_INIT & request) != 0) + { + introspectionRegistry->serialize(_pvArguments != NULL ? _pvArguments->getField() : FieldConstPtr(), buffer, control); + } + else + { + introspectionRegistry->serializeStructure(buffer, control, _pvResponse.get()); + } + } } - + stopRequest(); // lastRequest