diff --git a/pvAccessApp/rpcClient/rpcClient.cpp b/pvAccessApp/rpcClient/rpcClient.cpp index 32f6bf3..1c8b1b3 100644 --- a/pvAccessApp/rpcClient/rpcClient.cpp +++ b/pvAccessApp/rpcClient/rpcClient.cpp @@ -73,7 +73,7 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester std::cerr << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl; } - virtual void channelRPCConnect(const epics::pvData::Status& status,ChannelRPC::shared_pointer const & channelRPC) + virtual void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC) { if (status.isSuccess()) { @@ -97,7 +97,8 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester } } - virtual void requestDone (const epics::pvData::Status &status, epics::pvData::PVStructure::shared_pointer const &pvResponse) + virtual void requestDone(const epics::pvData::Status &status, ChannelRPC::shared_pointer const & /*channelRPC*/, + epics::pvData::PVStructure::shared_pointer const &pvResponse) { if (status.isSuccess()) { @@ -214,7 +215,7 @@ private: void init() { using namespace std::tr1; - m_provider = getChannelAccess()->getProvider("pva"); + m_provider = getChannelProviderRegistry()->getProvider("pva"); shared_ptr channelRequesterImpl(new ChannelRequesterImpl()); m_channelRequesterImpl = channelRequesterImpl; @@ -253,7 +254,8 @@ PVStructure::shared_pointer RPCClientImpl::request(PVStructure::shared_pointer p if (rpcRequesterImpl->waitUntilConnected(timeOut)) { - channelRPC->request(pvRequest, true); + channelRPC->lastRequest(); + channelRPC->request(pvRequest); allOK &= rpcRequesterImpl->waitUntilRPC(timeOut); response = rpcRequesterImpl->response; } diff --git a/pvAccessApp/rpcService/rpcServer.cpp b/pvAccessApp/rpcService/rpcServer.cpp index 022b961..2a38379 100644 --- a/pvAccessApp/rpcService/rpcServer.cpp +++ b/pvAccessApp/rpcService/rpcServer.cpp @@ -14,18 +14,22 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { -class ChannelRPCServiceImpl : public ChannelRPC +class ChannelRPCServiceImpl : + public ChannelRPC, + public std::tr1::enable_shared_from_this { private: ChannelRPCRequester::shared_pointer m_channelRPCRequester; RPCService::shared_pointer m_rpcService; + AtomicBoolean m_lastRequest; public: ChannelRPCServiceImpl( ChannelRPCRequester::shared_pointer const & channelRPCRequester, RPCService::shared_pointer const & rpcService) : m_channelRPCRequester(channelRPCRequester), - m_rpcService(rpcService) + m_rpcService(rpcService), + m_lastRequest() { } @@ -34,7 +38,7 @@ class ChannelRPCServiceImpl : public ChannelRPC destroy(); } - void processRequest(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) + void processRequest(epics::pvData::PVStructure::shared_pointer const & pvArgument) { epics::pvData::PVStructure::shared_pointer result; Status status = Status::Ok; @@ -66,18 +70,23 @@ class ChannelRPCServiceImpl : public ChannelRPC status = Status(Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null."); } - m_channelRPCRequester->requestDone(status, result); + m_channelRPCRequester->requestDone(status, shared_from_this(), result); - if (lastRequest) + if (m_lastRequest.get()) destroy(); } - virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument) { - processRequest(pvArgument, lastRequest); + processRequest(pvArgument); } + void lastRequest() + { + m_lastRequest.set(); + } + virtual void cancel() { // noop @@ -197,7 +206,7 @@ public: { ChannelGet::shared_pointer nullPtr; channelGetRequester->channelGetConnect(notSupportedStatus, nullPtr, - epics::pvData::PVStructure::shared_pointer(), epics::pvData::BitSet::shared_pointer()); + epics::pvData::Structure::const_shared_pointer()); return nullPtr; } @@ -207,7 +216,7 @@ public: { ChannelPut::shared_pointer nullPtr; channelPutRequester->channelPutConnect(notSupportedStatus, nullPtr, - epics::pvData::PVStructure::shared_pointer(), epics::pvData::BitSet::shared_pointer()); + epics::pvData::Structure::const_shared_pointer()); return nullPtr; } @@ -217,7 +226,7 @@ public: epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) { ChannelPutGet::shared_pointer nullPtr; - epics::pvData::PVStructure::shared_pointer nullStructure; + epics::pvData::Structure::const_shared_pointer nullStructure; channelPutGetRequester->channelPutGetConnect(notSupportedStatus, nullPtr, nullStructure, nullStructure); return nullPtr; } @@ -257,7 +266,7 @@ public: epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) { ChannelArray::shared_pointer nullPtr; - channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::PVArray::shared_pointer()); + channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer()); return nullPtr; } @@ -443,7 +452,7 @@ RPCServer::RPCServer() m_serverContext = ServerContextImpl::create(); m_serverContext->setChannelProviderName(m_channelProviderImpl->getProviderName()); - m_serverContext->initialize(getChannelAccess()); + m_serverContext->initialize(getChannelProviderRegistry()); } RPCServer::~RPCServer() @@ -482,7 +491,7 @@ static void threadRunner(void* usr) void RPCServer::runInNewThread(int seconds) { std::auto_ptr param(new ThreadRunnerParam()); - param->server = rpcServer; + param->server = shared_from_this(); param->timeToRun = seconds; epicsThreadCreate("RPCServer thread", diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index 4d8fb56..3dbab5e 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -29,6 +30,38 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { +// TODO this is a copy from clientContextImpl.cpp + static PVDataCreatePtr pvDataCreate = getPVDataCreate(); + + + static BitSet::shared_pointer createBitSetFor( + PVStructure::shared_pointer const & pvStructure, + BitSet::shared_pointer const & existingBitSet) + { + int pvStructureSize = pvStructure->getNumberFields(); + if (existingBitSet.get() && static_cast(existingBitSet->size()) >= pvStructureSize) + { + // clear existing BitSet + // also necessary if larger BitSet is reused + existingBitSet->clear(); + return existingBitSet; + } + else + return BitSet::shared_pointer(new BitSet(pvStructureSize)); + } + + static PVField::shared_pointer reuseOrCreatePVField( + Field::const_shared_pointer const & field, + PVField::shared_pointer const & existingPVField) + { + if (existingPVField.get() && *field == *existingPVField->getField()) + return existingPVField; + else + return pvDataCreate->createPVField(field); + } + + + void ServerBadResponse::handleResponse(osiSockAddr* responseFrom, Transport::shared_pointer const & transport, int8 version, int8 command, size_t payloadSize, ByteBuffer* payloadBuffer) @@ -574,7 +607,9 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom, MB_POINT(channelGet, 4, "server channelGet->deserialize request (end)"); - request->getChannelGet()->get(lastRequest); + if (lastRequest) + request->getChannelGet()->lastRequest(); + request->getChannelGet()->get(); } } @@ -609,7 +644,7 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom, } ServerChannelGetRequesterImpl::ServerChannelGetRequesterImpl(ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport) : - BaseChannelRequester(context, channel, ioid, transport), _channelGet(), _bitSet(), _pvStructure() + BaseChannelRequester(context, channel, ioid, transport) { } @@ -631,14 +666,13 @@ void ServerChannelGetRequesterImpl::activate(PVStructure::shared_pointer const & INIT_EXCEPTION_GUARD(CMD_GET, _channelGet = _channel->getChannel()->createChannelGet(thisPointer, pvRequest)); } -void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, ChannelGet::shared_pointer const & channelGet, PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet) +void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, ChannelGet::shared_pointer const & channelGet, Structure::const_shared_pointer const & structure) { { Lock guard(_mutex); - _bitSet = bitSet; - _pvStructure = pvStructure; _status = status; _channelGet = channelGet; + _structure = structure; } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -650,12 +684,15 @@ void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, Chan } } -void ServerChannelGetRequesterImpl::getDone(const Status& status) +void ServerChannelGetRequesterImpl::getDone(const Status& status, ChannelGet::shared_pointer const & /*channelGet*/, + PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet) { MB_POINT(channelGet, 5, "server channelGet->getDone()"); { Lock guard(_mutex); _status = status; + _pvStructure = pvStructure; + _bitSet = bitSet; } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -687,12 +724,12 @@ ChannelGet::shared_pointer ServerChannelGetRequesterImpl::getChannelGet() void ServerChannelGetRequesterImpl::lock() { - //noop + // noop } void ServerChannelGetRequesterImpl::unlock() { - //noop + // noop } void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) @@ -719,18 +756,20 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro if (request & QOS_INIT) { Lock guard(_mutex); - control->cachedSerialize(_pvStructure != NULL ? _pvStructure->getField() : FieldConstPtr(), buffer); - + control->cachedSerialize(_structure, buffer); } else { MB_POINT(channelGet, 6, "server channelGet->serialize response (start)"); { - // we locked _mutex above, so _channelGet is valid - ScopedLock lock(_channelGet); - - _bitSet->serialize(buffer, control); - _pvStructure->serialize(buffer, control, _bitSet.get()); + // we locked _mutex above, so _channelGet is valid + ScopedLock lock(_channelGet); + + _bitSet->serialize(buffer, control); + _pvStructure->serialize(buffer, control, _bitSet.get()); + + _pvStructure.reset(); + _bitSet.reset(); } MB_POINT(channelGet, 7, "server channelGet->serialize response (end)"); } @@ -796,33 +835,40 @@ void ServerPutHandler::handleResponse(osiSockAddr* responseFrom, return; } + ChannelPut::shared_pointer channelPut = request->getChannelPut(); + + if (lastRequest) + channelPut->lastRequest(); + if (get) { // no destroy w/ get - request->getChannelPut()->get(); + channelPut->get(); } else { // deserialize bitSet and do a put - ChannelPut::shared_pointer channelPut = request->getChannelPut(); + { ScopedLock lock(channelPut); // TODO not needed if put is processed by the same thread - BitSet::shared_pointer putBitSet = request->getBitSet(); + BitSet::shared_pointer putBitSet = request->getPutBitSet(); + PVStructure::shared_pointer putPVStructure = request->getPutPVStructure(); DESERIALIZE_EXCEPTION_GUARD( putBitSet->deserialize(payloadBuffer, transport.get()); - request->getPVStructure()->deserialize(payloadBuffer, transport.get(), putBitSet.get()); + putPVStructure->deserialize(payloadBuffer, transport.get(), putBitSet.get()); ); - + + lock.unlock(); + channelPut->put(putPVStructure, putBitSet); } - channelPut->put(lastRequest); } } } ServerChannelPutRequesterImpl::ServerChannelPutRequesterImpl(ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport): - BaseChannelRequester(context, channel, ioid, transport), _channelPut(), _bitSet(), _pvStructure() + BaseChannelRequester(context, channel, ioid, transport) { } @@ -843,15 +889,17 @@ void ServerChannelPutRequesterImpl::activate(PVStructure::shared_pointer const & INIT_EXCEPTION_GUARD(CMD_PUT, _channelPut = _channel->getChannel()->createChannelPut(thisPointer, pvRequest)); } -void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, ChannelPut::shared_pointer const & channelPut, PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet) +void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, ChannelPut::shared_pointer const & channelPut, Structure::const_shared_pointer const & structure) { { Lock guard(_mutex); - _bitSet = bitSet; - _pvStructure = pvStructure; _status = status; _channelPut = channelPut; + _structure = structure; } + + _putPVStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(_structure, _putPVStructure)); + _putBitSet = createBitSetFor(_putPVStructure, _putBitSet); TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -863,7 +911,7 @@ void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, Chan } } -void ServerChannelPutRequesterImpl::putDone(const Status& status) +void ServerChannelPutRequesterImpl::putDone(const Status& status, ChannelPut::shared_pointer const & /*channelPut*/) { { Lock guard(_mutex); @@ -873,11 +921,13 @@ void ServerChannelPutRequesterImpl::putDone(const Status& status) _transport->enqueueSendRequest(thisSender); } -void ServerChannelPutRequesterImpl::getDone(const Status& status) +void ServerChannelPutRequesterImpl::getDone(const Status& status, ChannelPut::shared_pointer const & /*channelPut*/, PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet) { { Lock guard(_mutex); _status = status; + _pvStructure = pvStructure; + _bitSet = bitSet; } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -918,16 +968,16 @@ ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut() return _channelPut; } -BitSet::shared_pointer ServerChannelPutRequesterImpl::getBitSet() +BitSet::shared_pointer ServerChannelPutRequesterImpl::getPutBitSet() { //Lock guard(_mutex); - return _bitSet; + return _putBitSet; } -PVStructure::shared_pointer ServerChannelPutRequesterImpl::getPVStructure() +PVStructure::shared_pointer ServerChannelPutRequesterImpl::getPutPVStructure() { //Lock guard(_mutex); - return _pvStructure; + return _putPVStructure; } void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) @@ -947,12 +997,13 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro if ((QOS_INIT & request) != 0) { Lock guard(_mutex); - control->cachedSerialize(_pvStructure != NULL ? _pvStructure->getField() : FieldConstPtr(), buffer); + control->cachedSerialize(_structure, buffer); } else if ((QOS_GET & request) != 0) { ScopedLock lock(_channelPut); // _channelPut is valid because we required _mutex above - _pvStructure->serialize(buffer, control); + _bitSet->serialize(buffer, control); + _pvStructure->serialize(buffer, control, _bitSet.get()); } } @@ -1016,26 +1067,34 @@ void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom, return; } + ChannelPutGet::shared_pointer channelPutGet = request->getChannelPutGet(); + if (lastRequest) + channelPutGet->lastRequest(); + if (getGet) { - request->getChannelPutGet()->getGet(); + channelPutGet->getGet(); } else if(getPut) { - request->getChannelPutGet()->getPut(); + channelPutGet->getPut(); } else { // deserialize bitSet and do a put - ChannelPutGet::shared_pointer channelPutGet = request->getChannelPutGet(); { ScopedLock lock(channelPutGet); // TODO not necessary if read is done in putGet + BitSet::shared_pointer putBitSet = request->getPutGetBitSet(); + PVStructure::shared_pointer putPVStructure = request->getPutGetPVStructure(); DESERIALIZE_EXCEPTION_GUARD( - request->getPVPutStructure()->deserialize(payloadBuffer, transport.get()); - ); + putBitSet->deserialize(payloadBuffer, transport.get()); + putPVStructure->deserialize(payloadBuffer, transport.get(), putBitSet.get()); + ); + + lock.unlock(); + channelPutGet->putGet(putPVStructure, putBitSet); } - channelPutGet->putGet(lastRequest); } } } @@ -1064,15 +1123,18 @@ void ServerChannelPutGetRequesterImpl::activate(PVStructure::shared_pointer cons } void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status, ChannelPutGet::shared_pointer const & channelPutGet, - PVStructure::shared_pointer const & pvPutStructure, PVStructure::shared_pointer const & pvGetStructure) + Structure::const_shared_pointer const & putStructure, Structure::const_shared_pointer const & getStructure) { { Lock guard(_mutex); - _pvPutStructure = pvPutStructure; - _pvGetStructure = pvGetStructure; _status = status; _channelPutGet = channelPutGet; + _putStructure = putStructure; + _getStructure = getStructure; } + + _pvPutGetStructure = std::tr1::static_pointer_cast(reuseOrCreatePVField(_putStructure, _pvPutGetStructure)); + _pvPutGetBitSet = createBitSetFor(_pvPutGetStructure, _pvPutGetBitSet); TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1084,31 +1146,40 @@ void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status } } -void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status) +void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/, + PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet) { { Lock guard(_mutex); _status = status; + _pvGetStructure = pvStructure; + _pvGetBitSet = bitSet; } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } -void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status) +void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/, + PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet) { { Lock guard(_mutex); _status = status; + _pvPutStructure = pvStructure; + _pvPutBitSet = bitSet; } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } -void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status) +void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/, + PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet) { { Lock guard(_mutex); _status = status; + _pvGetStructure = pvStructure; + _pvGetBitSet = bitSet; } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1116,12 +1187,12 @@ void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status) void ServerChannelPutGetRequesterImpl::lock() { - //noop + // noop } void ServerChannelPutGetRequesterImpl::unlock() { - //noop + // noop } void ServerChannelPutGetRequesterImpl::destroy() @@ -1149,10 +1220,16 @@ ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet return _channelPutGet; } -PVStructure::shared_pointer ServerChannelPutGetRequesterImpl::getPVPutStructure() +PVStructure::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetPVStructure() { //Lock guard(_mutex); - return _pvPutStructure; + return _pvPutGetStructure; +} + +BitSet::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetBitSet() +{ + //Lock guard(_mutex); + return _pvPutGetBitSet; } void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) @@ -1172,25 +1249,28 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon if ((QOS_INIT & request) != 0) { Lock guard(_mutex); - control->cachedSerialize(_pvPutStructure != NULL ? _pvPutStructure->getField() : FieldConstPtr(), buffer); - control->cachedSerialize(_pvGetStructure != NULL ? _pvGetStructure->getField() : FieldConstPtr(), buffer); + control->cachedSerialize(_putStructure, buffer); + control->cachedSerialize(_getStructure, buffer); } else if ((QOS_GET & request) != 0) { Lock guard(_mutex); - _pvGetStructure->serialize(buffer, control); + _pvGetBitSet->serialize(buffer, control); + _pvGetStructure->serialize(buffer, control, _pvGetBitSet.get()); } else if ((QOS_GET_PUT & request) != 0) { ScopedLock lock(_channelPutGet); // valid due to _mutex lock above //Lock guard(_mutex); - _pvPutStructure->serialize(buffer, control); + _pvPutBitSet->serialize(buffer, control); + _pvPutStructure->serialize(buffer, control, _pvPutBitSet.get()); } else { ScopedLock lock(_channelPutGet); // valid due to _mutex lock above //Lock guard(_mutex); - _pvGetStructure->serialize(buffer, control); + _pvGetBitSet->serialize(buffer, control); + _pvGetStructure->serialize(buffer, control, _pvGetBitSet.get()); } } @@ -1465,6 +1545,7 @@ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom, const bool lastRequest = (QOS_DESTROY & qosCode) != 0; const bool get = (QOS_GET & qosCode) != 0; const bool setLength = (QOS_GET_PUT & qosCode) != 0; + const bool getLength = (QOS_PROCESS & qosCode) != 0; ServerChannelArrayRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); if (request == NULL) @@ -1479,34 +1560,43 @@ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom, return; } - + ChannelArray::shared_pointer channelArray = request->getChannelArray(); + if (lastRequest) + channelArray->lastRequest(); + if (get) { size_t offset = SerializeHelper::readSize(payloadBuffer, transport.get()); size_t count = SerializeHelper::readSize(payloadBuffer, transport.get()); - request->getChannelArray()->getArray(lastRequest, offset, count); + size_t stride = SerializeHelper::readSize(payloadBuffer, transport.get()); + request->getChannelArray()->getArray(offset, count, stride); } else if (setLength) { size_t length = SerializeHelper::readSize(payloadBuffer, transport.get()); size_t capacity = SerializeHelper::readSize(payloadBuffer, transport.get()); - request->getChannelArray()->setLength(lastRequest, length, capacity); + request->getChannelArray()->setLength(length, capacity); + } + else if (getLength) + { + request->getChannelArray()->getLength(); } else { // deserialize data to put size_t offset; - ChannelArray::shared_pointer channelArray = request->getChannelArray(); + size_t stride; PVArray::shared_pointer array = request->getPVArray(); { ScopedLock lock(channelArray); // TODO not needed if read by the same thread DESERIALIZE_EXCEPTION_GUARD( offset = SerializeHelper::readSize(payloadBuffer, transport.get()); + stride = SerializeHelper::readSize(payloadBuffer, transport.get()); array->deserialize(payloadBuffer, transport.get()); ); } - channelArray->putArray(lastRequest, offset, array->getLength()); + channelArray->putArray(array, offset, array->getLength(), stride); } } } @@ -1536,14 +1626,17 @@ void ServerChannelArrayRequesterImpl::activate(PVStructure::shared_pointer const INIT_EXCEPTION_GUARD(CMD_ARRAY, _channelArray = _channel->getChannel()->createChannelArray(thisPointer, pvRequest)); } -void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, ChannelArray::shared_pointer const & channelArray, PVArray::shared_pointer const & pvArray) +void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, ChannelArray::shared_pointer const & channelArray, Array::const_shared_pointer const & array) { { Lock guard(_mutex); _status = status; - _pvArray = pvArray; _channelArray = channelArray; + _array = array; } + + _pvArray = std::tr1::static_pointer_cast(reuseOrCreatePVField(_array, _pvArray)); + TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1554,7 +1647,18 @@ void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, } } -void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status) +void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/, PVArray::shared_pointer const & pvArray) +{ + { + Lock guard(_mutex); + _status = status; + _pvArray = pvArray; + } + TransportSender::shared_pointer thisSender = shared_from_this(); + _transport->enqueueSendRequest(thisSender); +} + +void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/) { { Lock guard(_mutex); @@ -1564,7 +1668,7 @@ void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status) _transport->enqueueSendRequest(thisSender); } -void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status) +void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/) { { Lock guard(_mutex); @@ -1574,11 +1678,14 @@ void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status) _transport->enqueueSendRequest(thisSender); } -void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status) +void ServerChannelArrayRequesterImpl::getLengthDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/, + size_t length, size_t capacity) { { Lock guard(_mutex); _status = status; + _length = length; + _capacity = capacity; } TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); @@ -1586,12 +1693,12 @@ void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status) void ServerChannelArrayRequesterImpl::lock() { - //noop + // noop } void ServerChannelArrayRequesterImpl::unlock() { - //noop + // noop } void ServerChannelArrayRequesterImpl::destroy() @@ -1644,11 +1751,18 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont //Lock guard(_mutex); ScopedLock lock(_channelArray); // valid due to _mutex lock above _pvArray->serialize(buffer, control, 0, _pvArray->getLength()); + _pvArray.reset(); + } + else if ((QOS_PROCESS & request) != 0) + { + //Lock guard(_mutex); + SerializeHelper::writeSize(_length, buffer, control); + SerializeHelper::writeSize(_capacity, buffer, control); } else if ((QOS_INIT & request) != 0) { Lock guard(_mutex); - control->cachedSerialize(_pvArray != NULL ? _pvArray->getField() : FieldConstPtr(), buffer); + control->cachedSerialize(_array, buffer); } } @@ -1794,7 +1908,9 @@ void ServerProcessHandler::handleResponse(osiSockAddr* responseFrom, return; } - request->getChannelProcess()->process(lastRequest); + if (lastRequest) + request->getChannelProcess()->lastRequest(); + request->getChannelProcess()->process(); } } @@ -1840,7 +1956,7 @@ void ServerChannelProcessRequesterImpl::channelProcessConnect(const Status& stat } } -void ServerChannelProcessRequesterImpl::processDone(const Status& status) +void ServerChannelProcessRequesterImpl::processDone(const Status& status, ChannelProcess::shared_pointer const & /*channelProcess*/) { { Lock guard(_mutex); @@ -1852,12 +1968,12 @@ void ServerChannelProcessRequesterImpl::processDone(const Status& status) void ServerChannelProcessRequesterImpl::lock() { - //noop + // noop } void ServerChannelProcessRequesterImpl::unlock() { - //noop + // noop } void ServerChannelProcessRequesterImpl::destroy() @@ -2045,7 +2161,9 @@ void ServerRPCHandler::handleResponse(osiSockAddr* responseFrom, pvArgument = SerializationHelper::deserializeStructureFull(payloadBuffer, transport.get()); ); - channelRPC->request(pvArgument, lastRequest); + if (lastRequest) + channelRPC->lastRequest(); + channelRPC->request(pvArgument); } } @@ -2093,7 +2211,7 @@ void ServerChannelRPCRequesterImpl::channelRPCConnect(const Status& status, Chan } } -void ServerChannelRPCRequesterImpl::requestDone(const Status& status, PVStructure::shared_pointer const & pvResponse) +void ServerChannelRPCRequesterImpl::requestDone(const Status& status, ChannelRPC::shared_pointer const & /*channelRPC*/, PVStructure::shared_pointer const & pvResponse) { { Lock guard(_mutex); @@ -2106,12 +2224,12 @@ void ServerChannelRPCRequesterImpl::requestDone(const Status& status, PVStructur void ServerChannelRPCRequesterImpl::lock() { - //noop + // noop } void ServerChannelRPCRequesterImpl::unlock() { - //noop + // noop } void ServerChannelRPCRequesterImpl::destroy() diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 49a2912..66d7e3c 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -333,8 +333,10 @@ namespace pvAccess { epics::pvData::PVStructure::shared_pointer const & pvRequest); virtual ~ServerChannelGetRequesterImpl() {} void channelGetConnect(const epics::pvData::Status& status, ChannelGet::shared_pointer const & channelGet, - epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet); - void getDone(const epics::pvData::Status& status); + epics::pvData::Structure::const_shared_pointer const & structure); + void getDone(const epics::pvData::Status& status, ChannelGet::shared_pointer const & channelGet, + epics::pvData::PVStructure::shared_pointer const & pvStructure, + epics::pvData::BitSet::shared_pointer const & bitSet); void destroy(); ChannelGet::shared_pointer getChannelGet(); @@ -344,8 +346,9 @@ namespace pvAccess { void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelGet::shared_pointer _channelGet; - epics::pvData::BitSet::shared_pointer _bitSet; + epics::pvData::Structure::const_shared_pointer _structure; epics::pvData::PVStructure::shared_pointer _pvStructure; + epics::pvData::BitSet::shared_pointer _bitSet; epics::pvData::Status _status; }; @@ -387,21 +390,30 @@ namespace pvAccess { Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest); virtual ~ServerChannelPutRequesterImpl() {} - void channelPutConnect(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut, epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet); - void putDone(const epics::pvData::Status& status); - void getDone(const epics::pvData::Status& status); + void channelPutConnect(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut, epics::pvData::Structure::const_shared_pointer const & structure); + void putDone(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut); + void getDone(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut, epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet); void lock(); void unlock(); void destroy(); ChannelPut::shared_pointer getChannelPut(); - epics::pvData::BitSet::shared_pointer getBitSet(); - epics::pvData::PVStructure::shared_pointer getPVStructure(); + epics::pvData::BitSet::shared_pointer getPutBitSet(); + epics::pvData::PVStructure::shared_pointer getPutPVStructure(); void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelPut::shared_pointer _channelPut; + + epics::pvData::Structure::const_shared_pointer _structure; + + // reference store (for get) epics::pvData::BitSet::shared_pointer _bitSet; epics::pvData::PVStructure::shared_pointer _pvStructure; + + // data store (for put) + epics::pvData::BitSet::shared_pointer _putBitSet; + epics::pvData::PVStructure::shared_pointer _putPVStructure; + epics::pvData::Status _status; }; @@ -442,21 +454,43 @@ namespace pvAccess { Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest); virtual ~ServerChannelPutGetRequesterImpl() {} - void channelPutGetConnect(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet, epics::pvData::PVStructure::shared_pointer const & pvPutStructure, epics::pvData::PVStructure::shared_pointer const & pvGetStructure); - void getGetDone(const epics::pvData::Status& status); - void getPutDone(const epics::pvData::Status& status); - void putGetDone(const epics::pvData::Status& status); + void channelPutGetConnect(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet, + epics::pvData::Structure::const_shared_pointer const & putStructure, + epics::pvData::Structure::const_shared_pointer const & getStructure); + void getGetDone(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet, + epics::pvData::PVStructure::shared_pointer const & pvStructure, + epics::pvData::BitSet::shared_pointer const & bitSet); + void getPutDone(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet, + epics::pvData::PVStructure::shared_pointer const & pvStructure, + epics::pvData::BitSet::shared_pointer const & bitSet); + void putGetDone(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet, + epics::pvData::PVStructure::shared_pointer const & pvStructure, + epics::pvData::BitSet::shared_pointer const & bitSet); void lock(); void unlock(); void destroy(); ChannelPutGet::shared_pointer getChannelPutGet(); - epics::pvData::PVStructure::shared_pointer getPVPutStructure(); + + epics::pvData::PVStructure::shared_pointer getPutGetPVStructure(); + epics::pvData::BitSet::shared_pointer getPutGetBitSet(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelPutGet::shared_pointer _channelPutGet; + epics::pvData::Structure::const_shared_pointer _putStructure; + epics::pvData::Structure::const_shared_pointer _getStructure; + + // reference store epics::pvData::PVStructure::shared_pointer _pvPutStructure; + epics::pvData::BitSet::shared_pointer _pvPutBitSet; epics::pvData::PVStructure::shared_pointer _pvGetStructure; + epics::pvData::BitSet::shared_pointer _pvGetBitSet; + + // data container (for put-get) + epics::pvData::PVStructure::shared_pointer _pvPutGetStructure; + epics::pvData::BitSet::shared_pointer _pvPutGetBitSet; + epics::pvData::Status _status; }; @@ -552,10 +586,13 @@ namespace pvAccess { Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest); virtual ~ServerChannelArrayRequesterImpl() {} - void channelArrayConnect(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray, epics::pvData::PVArray::shared_pointer const & pvArray); - void getArrayDone(const epics::pvData::Status& status); - void putArrayDone(const epics::pvData::Status& status); - void setLengthDone(const epics::pvData::Status& status); + void channelArrayConnect(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray, epics::pvData::Array::const_shared_pointer const & array); + void getArrayDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray, + epics::pvData::PVArray::shared_pointer const & pvArray); + void putArrayDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray); + void setLengthDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray); + void getLengthDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray, + std::size_t length, std::size_t capacity); void lock(); void unlock(); void destroy(); @@ -567,7 +604,10 @@ namespace pvAccess { private: ChannelArray::shared_pointer _channelArray; + epics::pvData::Array::const_shared_pointer _array; epics::pvData::PVArray::shared_pointer _pvArray; + std::size_t _length; + std::size_t _capacity; epics::pvData::Status _status; }; @@ -651,7 +691,7 @@ namespace pvAccess { virtual ~ServerChannelProcessRequesterImpl() {} void channelProcessConnect(const epics::pvData::Status& status, ChannelProcess::shared_pointer const & channelProcess); - void processDone(const epics::pvData::Status& status); + void processDone(const epics::pvData::Status& status, ChannelProcess::shared_pointer const & channelProcess); void lock(); void unlock(); void destroy(); @@ -777,7 +817,7 @@ namespace pvAccess { virtual ~ServerChannelRPCRequesterImpl() {} void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC); - void requestDone(const epics::pvData::Status& status, epics::pvData::PVStructure::shared_pointer const & pvResponse); + void requestDone(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC, epics::pvData::PVStructure::shared_pointer const & pvResponse); void lock(); void unlock(); void destroy(); diff --git a/pvAccessApp/server/serverContext.cpp b/pvAccessApp/server/serverContext.cpp index d16f9bb..c357d23 100644 --- a/pvAccessApp/server/serverContext.cpp +++ b/pvAccessApp/server/serverContext.cpp @@ -36,7 +36,7 @@ ServerContextImpl::ServerContextImpl(): _beaconEmitter(), _acceptor(), _transportRegistry(), - _channelAccess(), + _channelProviderRegistry(), _channelProviderNames(PVACCESS_DEFAULT_PROVIDER), _channelProviders(), _beaconServerStatusProvider() @@ -126,12 +126,12 @@ bool ServerContextImpl::isChannelProviderNamePreconfigured() return config->hasProperty("EPICS_PVA_PROVIDER_NAMES") || config->hasProperty("EPICS_PVAS_PROVIDER_NAMES"); } -void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channelAccess) +void ServerContextImpl::initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry) { Lock guard(_mutex); - if (channelAccess == NULL) + if (channelProviderRegistry == NULL) { - THROW_BASE_EXCEPTION("non null channelAccess expected"); + THROW_BASE_EXCEPTION("non null channelProviderRegistry expected"); } if (_state == DESTROYED) @@ -143,7 +143,7 @@ void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channel THROW_BASE_EXCEPTION("Context already initialized."); } - _channelAccess = channelAccess; + _channelProviderRegistry = channelProviderRegistry; // user all providers @@ -151,10 +151,10 @@ void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channel { _channelProviderNames.resize(0); // VxWorks 5.5 omits clear() - std::auto_ptr names = _channelAccess->getProviderNames(); - for (ChannelAccess::stringVector_t::iterator iter = names->begin(); iter != names->end(); iter++) + std::auto_ptr names = _channelProviderRegistry->getProviderNames(); + for (ChannelProviderRegistry::stringVector_t::iterator iter = names->begin(); iter != names->end(); iter++) { - ChannelProvider::shared_pointer channelProvider = _channelAccess->getProvider(*iter); + ChannelProvider::shared_pointer channelProvider = _channelProviderRegistry->getProvider(*iter); if (channelProvider) { _channelProviders.push_back(channelProvider); @@ -173,13 +173,13 @@ void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channel std::string providerName; while (std::getline(ss, providerName, ' ')) { - ChannelProvider::shared_pointer channelProvider = _channelAccess->getProvider(providerName); + ChannelProvider::shared_pointer channelProvider = _channelProviderRegistry->getProvider(providerName); if (channelProvider) _channelProviders.push_back(channelProvider); } } - //_channelProvider = _channelAccess->getProvider(_channelProviderNames); + //_channelProvider = _channelProviderRegistry->getProvider(_channelProviderNames); if (_channelProviders.size() == 0) { std::string msg = "None of the specified channel providers are available: " + _channelProviderNames + "."; @@ -545,9 +545,9 @@ BlockingUDPTransport::shared_pointer ServerContextImpl::getBroadcastTransport() return _broadcastTransport; } -ChannelAccess::shared_pointer ServerContextImpl::getChannelAccess() +ChannelProviderRegistry::shared_pointer ServerContextImpl::getChannelProviderRegistry() { - return _channelAccess; + return _channelProviderRegistry; } std::string ServerContextImpl::getChannelProviderName() @@ -621,8 +621,8 @@ ServerContext::shared_pointer startPVAServer(String const & providerNames, int t if (!ctx->isChannelProviderNamePreconfigured()) ctx->setChannelProviderName(providerNames); - ChannelAccess::shared_pointer channelAccess = getChannelAccess(); - ctx->initialize(channelAccess); + ChannelProviderRegistry::shared_pointer channelProviderRegistry = getChannelProviderRegistry(); + ctx->initialize(channelProviderRegistry); if (printInfo) ctx->printInfo(); diff --git a/pvAccessApp/server/serverContext.h b/pvAccessApp/server/serverContext.h index 79dd3d4..bb9afd0 100644 --- a/pvAccessApp/server/serverContext.h +++ b/pvAccessApp/server/serverContext.h @@ -42,10 +42,10 @@ public: virtual const Version& getVersion() = 0; /** - * Set ChannelAccess implementation and initialize server. - * @param channelAccess implementation of channel access to be served. + * Set ChannelProviderRegistry implementation and initialize server. + * @param channelProviderRegistry channel providers registry to be used. */ - virtual void initialize(ChannelAccess::shared_pointer const & channelAccess) = 0; + virtual void initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry) = 0; /** * Run server (process events). @@ -116,7 +116,7 @@ public: //**************** derived from ServerContext ****************// const Version& getVersion(); - void initialize(ChannelAccess::shared_pointer const & channelAccess); + void initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry); void run(epics::pvData::int32 seconds); void shutdown(); void destroy(); @@ -253,10 +253,10 @@ public: BlockingUDPTransport::shared_pointer getBroadcastTransport(); /** - * Get channel access implementation. - * @return channel access implementation. + * Get channel provider registry implementation used by this instance. + * @return channel provider registry used by this instance. */ - ChannelAccess::shared_pointer getChannelAccess(); + ChannelProviderRegistry::shared_pointer getChannelProviderRegistry(); /** * Get channel provider name. @@ -354,7 +354,7 @@ private: /** * Channel access. */ - ChannelAccess::shared_pointer _channelAccess; + ChannelProviderRegistry::shared_pointer _channelProviderRegistry; /** * Channel provider name.