From d46f024e6bcc5becc9e7614fe2f8b0740236dbc3 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Mon, 7 Nov 2011 13:29:30 +0100 Subject: [PATCH] symmetric RPC --- pvAccessApp/client/pvAccess.h | 7 ++-- .../remoteClient/clientContextImpl.cpp | 35 +++++++------------ pvAccessApp/server/responseHandlers.cpp | 30 ++++------------ pvAccessApp/server/responseHandlers.h | 13 ++----- testApp/remote/testRemoteClientImpl.cpp | 16 ++------- testApp/remote/testServer.cpp | 8 ++--- 6 files changed, 30 insertions(+), 79 deletions(-) diff --git a/pvAccessApp/client/pvAccess.h b/pvAccessApp/client/pvAccess.h index d1fc0b9..b45f4d3 100644 --- a/pvAccessApp/client/pvAccess.h +++ b/pvAccessApp/client/pvAccess.h @@ -427,9 +427,10 @@ namespace pvAccess { /** * Issue an RPC request to the channel. * This fails if the request can not be satisfied. + * @param pvArgument The argument structure for an RPC request. * @param lastRequest Is this the last request? */ - virtual void request(bool lastRequest) = 0; + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) = 0; }; @@ -446,11 +447,9 @@ namespace pvAccess { * The client and server have both completed the createChannelGet request. * @param status Completion status. * @param channelRPC The channelRPC interface or null if the request failed. - * @param pvArgument The argument structure for an RPC request. * @param bitSet The bitSet for argument changes. */ - virtual void channelRPCConnect(const epics::pvData::Status& status,ChannelRPC::shared_pointer const & channelRPC, - epics::pvData::PVStructure::shared_pointer const & pvArgument,epics::pvData::BitSet::shared_pointer const & bitSet) = 0; + virtual void channelRPCConnect(const epics::pvData::Status& status,ChannelRPC::shared_pointer const & channelRPC) = 0; /** * The request is done. This is always called with no locks held. diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 1164273..ae63204 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -1171,7 +1171,6 @@ namespace epics { PVStructure::shared_pointer m_pvRequest; PVStructure::shared_pointer m_data; - BitSet::shared_pointer m_bitSet; Mutex m_dataMutex; @@ -1189,9 +1188,7 @@ namespace epics { if (m_pvRequest.get() == 0) { ChannelRPC::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - PVStructure::shared_pointer nullPVStructure; - BitSet::shared_pointer nullBitSet; - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(pvRequestNull, thisPointer, nullPVStructure, nullBitSet)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(pvRequestNull, thisPointer)); return; } @@ -1201,9 +1198,7 @@ namespace epics { resubscribeSubscription(transport); } catch (std::runtime_error &rte) { ChannelRPC::shared_pointer thisPointer = dynamic_pointer_cast(shared_from_this()); - PVStructure::shared_pointer nullPVStructure; - BitSet::shared_pointer nullBitSet; - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelNotConnected, thisPointer, nullPVStructure, nullBitSet)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelNotConnected, thisPointer)); } } @@ -1246,8 +1241,9 @@ namespace epics { { // no need to lock here, since it is already locked via TransportSender IF //Lock lock(m_dataMutex); - m_bitSet->serialize(buffer, control); - m_data->serialize(buffer, control, m_bitSet.get()); + m_channel->getTransport()->getIntrospectionRegistry()->serializeStructure(buffer, control, m_data.get()); + // release arguments structure + m_data.reset(); } } @@ -1264,22 +1260,13 @@ namespace epics { if (!status.isSuccess()) { ChannelRPC::shared_pointer nullChannelRPC; - PVStructure::shared_pointer nullPVStructure; - BitSet::shared_pointer nullBitSet; - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, nullChannelRPC, nullPVStructure, nullBitSet)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, nullChannelRPC)); return true; } - // create data and its bitSet - { - Lock lock(m_dataMutex); - m_data.reset(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); - m_bitSet.reset(new BitSet(m_data->getNumberFields())); - } - // notify ChannelRPC::shared_pointer thisChannelRPC = dynamic_pointer_cast(shared_from_this()); - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC, m_data, m_bitSet)); + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, thisChannelRPC)); return true; } @@ -1292,12 +1279,12 @@ namespace epics { } - PVStructure::shared_pointer response(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport.get())); + PVStructure::shared_pointer response(transport->getIntrospectionRegistry()->deserializeStructure(payloadBuffer, transport.get())); EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, response)); return true; } - virtual void request(bool lastRequest) { + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) { { Lock guard(m_mutex); @@ -1320,6 +1307,10 @@ namespace epics { } try { + m_dataMutex.lock(); + m_data = pvArgument; + m_dataMutex.unlock(); + TransportSender::shared_pointer thisSender = shared_from_this(); m_channel->checkAndGetTransport()->enqueueSendRequest(thisSender); } catch (std::runtime_error &rte) { diff --git a/pvAccessApp/server/responseHandlers.cpp b/pvAccessApp/server/responseHandlers.cpp index d85c1ee..d9aaaa0 100644 --- a/pvAccessApp/server/responseHandlers.cpp +++ b/pvAccessApp/server/responseHandlers.cpp @@ -1922,13 +1922,9 @@ void ServerRPCHandler::handleResponse(osiSockAddr* responseFrom, // deserialize put data ChannelRPC::shared_pointer channelRPC = request->getChannelRPC(); - { - ScopedLock lock(channelRPC); // TODO not really needed if channelRPC->request() is reads from the same thread - BitSet::shared_pointer changedBitSet = request->getArgumentsBitSet(); - changedBitSet->deserialize(payloadBuffer, transport.get()); - request->getPvArguments()->deserialize(payloadBuffer, transport.get(), changedBitSet.get()); - } - channelRPC->request(lastRequest); + // pvArgument + PVStructure::shared_pointer pvArgument(transport->getIntrospectionRegistry()->deserializeStructure(payloadBuffer, transport.get())); + channelRPC->request(pvArgument, lastRequest); } } @@ -1936,7 +1932,7 @@ ServerChannelRPCRequesterImpl::ServerChannelRPCRequesterImpl( ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport): BaseChannelRequester(context, channel, ioid, transport), - _channelRPC(), _pvArguments(), _pvResponse(), _argumentsBitSet() + _channelRPC(), _pvResponse() { } @@ -1959,12 +1955,10 @@ void ServerChannelRPCRequesterImpl::activate(PVStructure::shared_pointer const & INIT_EXCEPTION_GUARD(CMD_RPC, _channelRPC = _channel->getChannel()->createChannelRPC(thisPointer, pvRequest)); } -void ServerChannelRPCRequesterImpl::channelRPCConnect(const Status& status, ChannelRPC::shared_pointer const & channelRPC, PVStructure::shared_pointer const & arguments, BitSet::shared_pointer const & bitSet) +void ServerChannelRPCRequesterImpl::channelRPCConnect(const Status& status, ChannelRPC::shared_pointer const & channelRPC) { { Lock guard(_mutex); - _pvArguments = arguments; - _argumentsBitSet = bitSet; _status = status; _channelRPC = channelRPC; } @@ -2019,18 +2013,6 @@ ChannelRPC::shared_pointer ServerChannelRPCRequesterImpl::getChannelRPC() return _channelRPC; } -PVStructure::shared_pointer ServerChannelRPCRequesterImpl::getPvArguments() -{ - //Lock guard(_mutex); - return _pvArguments; -} - -BitSet::shared_pointer ServerChannelRPCRequesterImpl::getArgumentsBitSet() -{ - //Lock guard(_mutex); - return _argumentsBitSet; -} - void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control) { const int32 request = getPendingRequest(); @@ -2048,7 +2030,7 @@ void ServerChannelRPCRequesterImpl::send(ByteBuffer* buffer, TransportSendContro { if ((QOS_INIT & request) != 0) { - introspectionRegistry->serialize(_pvArguments != NULL ? _pvArguments->getField() : FieldConstPtr(), buffer, control); + // noop } else { diff --git a/pvAccessApp/server/responseHandlers.h b/pvAccessApp/server/responseHandlers.h index 283e676..74921ba 100644 --- a/pvAccessApp/server/responseHandlers.h +++ b/pvAccessApp/server/responseHandlers.h @@ -728,7 +728,7 @@ namespace pvAccess { ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest); - void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC, epics::pvData::PVStructure::shared_pointer const & arguments, epics::pvData::BitSet::shared_pointer const & bitSet); + void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC); void requestDone(const epics::pvData::Status& status, epics::pvData::PVStructure::shared_pointer const & pvResponse); void lock(); void unlock(); @@ -737,20 +737,11 @@ namespace pvAccess { * @return the channelRPC */ ChannelRPC::shared_pointer getChannelRPC(); - /** - * @return the pvArguments - */ - epics::pvData::PVStructure::shared_pointer getPvArguments(); - /** - * @return the agrumentsepics::pvData::BitSet - */ - epics::pvData::BitSet::shared_pointer getArgumentsBitSet(); + void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control); private: ChannelRPC::shared_pointer _channelRPC; - epics::pvData::PVStructure::shared_pointer _pvArguments; epics::pvData::PVStructure::shared_pointer _pvResponse; - epics::pvData::BitSet::shared_pointer _argumentsBitSet; epics::pvData::Status _status; }; } diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 2ba3cd1..3ae3f57 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -267,8 +267,6 @@ class ChannelPutGetRequesterImpl : public ChannelPutGetRequester class ChannelRPCRequesterImpl : public ChannelRPCRequester { //ChannelRPC::shared_pointer m_channelRPC; - epics::pvData::PVStructure::shared_pointer m_pvStructure; - epics::pvData::BitSet::shared_pointer m_bitSet; virtual String getRequesterName() { @@ -280,20 +278,11 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; } - virtual void channelRPCConnect(const epics::pvData::Status& status,ChannelRPC::shared_pointer const & channelRPC, - epics::pvData::PVStructure::shared_pointer const & pvStructure,epics::pvData::BitSet::shared_pointer const & bitSet) + virtual void channelRPCConnect(const epics::pvData::Status& status,ChannelRPC::shared_pointer const & channelRPC) { std::cout << "channelRPCConnect(" << status.toString() << ")" << std::endl; - if (status.isSuccess()) - { - String st; - pvStructure->toString(&st); - std::cout << st << std::endl; - } //m_channelRPC = channelRPC; - m_pvStructure = pvStructure; - m_bitSet = bitSet; } virtual void requestDone(const epics::pvData::Status& status,epics::pvData::PVStructure::shared_pointer const & pvResponse) @@ -529,7 +518,8 @@ int main(int argc,char *argv[]) PVStructure::shared_pointer pvRequest = getCreateRequest()->createRequest("record[]field(arguments)"); ChannelRPC::shared_pointer channelRPC = channel->createChannelRPC(channelRPCRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); - channelRPC->request(false); + // for test simply use pvRequest as arguments + channelRPC->request(pvRequest, false); epicsThreadSleep ( 1.0 ); channelRPC->destroy(); } diff --git a/testApp/remote/testServer.cpp b/testApp/remote/testServer.cpp index 5e6f874..5a22143 100644 --- a/testApp/remote/testServer.cpp +++ b/testApp/remote/testServer.cpp @@ -371,12 +371,10 @@ class MockChannelRPC : public ChannelRPC private: ChannelRPCRequester::shared_pointer m_channelRPCRequester; PVStructure::shared_pointer m_pvStructure; - BitSet::shared_pointer m_bitSet; protected: MockChannelRPC(ChannelRPCRequester::shared_pointer const & channelRPCRequester, PVStructure::shared_pointer const & pvStructure, PVStructure::shared_pointer const & pvRequest) : - m_channelRPCRequester(channelRPCRequester), m_pvStructure(pvStructure), - m_bitSet(new BitSet(pvStructure->getNumberFields())) + m_channelRPCRequester(channelRPCRequester), m_pvStructure(pvStructure) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelRPC); } @@ -386,7 +384,7 @@ class MockChannelRPC : public ChannelRPC { ChannelRPC::shared_pointer thisPtr(new MockChannelRPC(channelRPCRequester, pvStructure, pvRequest)); // TODO pvRequest - channelRPCRequester->channelRPCConnect(Status::OK, thisPtr, pvStructure, static_cast(thisPtr.get())->m_bitSet); + channelRPCRequester->channelRPCConnect(Status::OK, thisPtr); return thisPtr; } @@ -395,7 +393,7 @@ class MockChannelRPC : public ChannelRPC PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelRPC); } - virtual void request(bool lastRequest) + virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest) { m_channelRPCRequester->requestDone(Status::OK, m_pvStructure); if (lastRequest)