work on server API changes

This commit is contained in:
Matej Sekoranja
2014-05-21 12:01:02 +02:00
parent 2c4bffd8d9
commit 6ae61258ca
6 changed files with 302 additions and 133 deletions

View File

@@ -73,7 +73,7 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester
std::cerr << "[" << getRequesterName() << "] message(" << message << ", " << getMessageTypeName(messageType) << ")" << std::endl;
}
virtual void channelRPCConnect(const epics::pvData::Status& status,ChannelRPC::shared_pointer const & channelRPC)
virtual void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC)
{
if (status.isSuccess())
{
@@ -97,7 +97,8 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester
}
}
virtual void requestDone (const epics::pvData::Status &status, epics::pvData::PVStructure::shared_pointer const &pvResponse)
virtual void requestDone(const epics::pvData::Status &status, ChannelRPC::shared_pointer const & /*channelRPC*/,
epics::pvData::PVStructure::shared_pointer const &pvResponse)
{
if (status.isSuccess())
{
@@ -214,7 +215,7 @@ private:
void init()
{
using namespace std::tr1;
m_provider = getChannelAccess()->getProvider("pva");
m_provider = getChannelProviderRegistry()->getProvider("pva");
shared_ptr<ChannelRequesterImpl> channelRequesterImpl(new ChannelRequesterImpl());
m_channelRequesterImpl = channelRequesterImpl;
@@ -253,7 +254,8 @@ PVStructure::shared_pointer RPCClientImpl::request(PVStructure::shared_pointer p
if (rpcRequesterImpl->waitUntilConnected(timeOut))
{
channelRPC->request(pvRequest, true);
channelRPC->lastRequest();
channelRPC->request(pvRequest);
allOK &= rpcRequesterImpl->waitUntilRPC(timeOut);
response = rpcRequesterImpl->response;
}

View File

@@ -14,18 +14,22 @@ using namespace epics::pvData;
namespace epics { namespace pvAccess {
class ChannelRPCServiceImpl : public ChannelRPC
class ChannelRPCServiceImpl :
public ChannelRPC,
public std::tr1::enable_shared_from_this<ChannelRPC>
{
private:
ChannelRPCRequester::shared_pointer m_channelRPCRequester;
RPCService::shared_pointer m_rpcService;
AtomicBoolean m_lastRequest;
public:
ChannelRPCServiceImpl(
ChannelRPCRequester::shared_pointer const & channelRPCRequester,
RPCService::shared_pointer const & rpcService) :
m_channelRPCRequester(channelRPCRequester),
m_rpcService(rpcService)
m_rpcService(rpcService),
m_lastRequest()
{
}
@@ -34,7 +38,7 @@ class ChannelRPCServiceImpl : public ChannelRPC
destroy();
}
void processRequest(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest)
void processRequest(epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
epics::pvData::PVStructure::shared_pointer result;
Status status = Status::Ok;
@@ -66,18 +70,23 @@ class ChannelRPCServiceImpl : public ChannelRPC
status = Status(Status::STATUSTYPE_FATAL, "RPCService.request(PVStructure) returned null.");
}
m_channelRPCRequester->requestDone(status, result);
m_channelRPCRequester->requestDone(status, shared_from_this(), result);
if (lastRequest)
if (m_lastRequest.get())
destroy();
}
virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument, bool lastRequest)
virtual void request(epics::pvData::PVStructure::shared_pointer const & pvArgument)
{
processRequest(pvArgument, lastRequest);
processRequest(pvArgument);
}
void lastRequest()
{
m_lastRequest.set();
}
virtual void cancel()
{
// noop
@@ -197,7 +206,7 @@ public:
{
ChannelGet::shared_pointer nullPtr;
channelGetRequester->channelGetConnect(notSupportedStatus, nullPtr,
epics::pvData::PVStructure::shared_pointer(), epics::pvData::BitSet::shared_pointer());
epics::pvData::Structure::const_shared_pointer());
return nullPtr;
}
@@ -207,7 +216,7 @@ public:
{
ChannelPut::shared_pointer nullPtr;
channelPutRequester->channelPutConnect(notSupportedStatus, nullPtr,
epics::pvData::PVStructure::shared_pointer(), epics::pvData::BitSet::shared_pointer());
epics::pvData::Structure::const_shared_pointer());
return nullPtr;
}
@@ -217,7 +226,7 @@ public:
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelPutGet::shared_pointer nullPtr;
epics::pvData::PVStructure::shared_pointer nullStructure;
epics::pvData::Structure::const_shared_pointer nullStructure;
channelPutGetRequester->channelPutGetConnect(notSupportedStatus, nullPtr, nullStructure, nullStructure);
return nullPtr;
}
@@ -257,7 +266,7 @@ public:
epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/)
{
ChannelArray::shared_pointer nullPtr;
channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::PVArray::shared_pointer());
channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer());
return nullPtr;
}
@@ -443,7 +452,7 @@ RPCServer::RPCServer()
m_serverContext = ServerContextImpl::create();
m_serverContext->setChannelProviderName(m_channelProviderImpl->getProviderName());
m_serverContext->initialize(getChannelAccess());
m_serverContext->initialize(getChannelProviderRegistry());
}
RPCServer::~RPCServer()
@@ -482,7 +491,7 @@ static void threadRunner(void* usr)
void RPCServer::runInNewThread(int seconds)
{
std::auto_ptr<ThreadRunnerParam> param(new ThreadRunnerParam());
param->server = rpcServer;
param->server = shared_from_this();
param->timeToRun = seconds;
epicsThreadCreate("RPCServer thread",

View File

@@ -8,6 +8,7 @@
#include <pv/remote.h>
#include <pv/hexDump.h>
#include <pv/serializationHelper.h>
#include <pv/convert.h>
#include <pv/byteBuffer.h>
@@ -29,6 +30,38 @@ using namespace epics::pvData;
namespace epics {
namespace pvAccess {
// TODO this is a copy from clientContextImpl.cpp
static PVDataCreatePtr pvDataCreate = getPVDataCreate();
static BitSet::shared_pointer createBitSetFor(
PVStructure::shared_pointer const & pvStructure,
BitSet::shared_pointer const & existingBitSet)
{
int pvStructureSize = pvStructure->getNumberFields();
if (existingBitSet.get() && static_cast<int32>(existingBitSet->size()) >= pvStructureSize)
{
// clear existing BitSet
// also necessary if larger BitSet is reused
existingBitSet->clear();
return existingBitSet;
}
else
return BitSet::shared_pointer(new BitSet(pvStructureSize));
}
static PVField::shared_pointer reuseOrCreatePVField(
Field::const_shared_pointer const & field,
PVField::shared_pointer const & existingPVField)
{
if (existingPVField.get() && *field == *existingPVField->getField())
return existingPVField;
else
return pvDataCreate->createPVField(field);
}
void ServerBadResponse::handleResponse(osiSockAddr* responseFrom,
Transport::shared_pointer const & transport, int8 version, int8 command,
size_t payloadSize, ByteBuffer* payloadBuffer)
@@ -574,7 +607,9 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom,
MB_POINT(channelGet, 4, "server channelGet->deserialize request (end)");
request->getChannelGet()->get(lastRequest);
if (lastRequest)
request->getChannelGet()->lastRequest();
request->getChannelGet()->get();
}
}
@@ -609,7 +644,7 @@ void ServerGetHandler::handleResponse(osiSockAddr* responseFrom,
}
ServerChannelGetRequesterImpl::ServerChannelGetRequesterImpl(ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport) :
BaseChannelRequester(context, channel, ioid, transport), _channelGet(), _bitSet(), _pvStructure()
BaseChannelRequester(context, channel, ioid, transport)
{
}
@@ -631,14 +666,13 @@ void ServerChannelGetRequesterImpl::activate(PVStructure::shared_pointer const &
INIT_EXCEPTION_GUARD(CMD_GET, _channelGet = _channel->getChannel()->createChannelGet(thisPointer, pvRequest));
}
void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, ChannelGet::shared_pointer const & channelGet, PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, ChannelGet::shared_pointer const & channelGet, Structure::const_shared_pointer const & structure)
{
{
Lock guard(_mutex);
_bitSet = bitSet;
_pvStructure = pvStructure;
_status = status;
_channelGet = channelGet;
_structure = structure;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -650,12 +684,15 @@ void ServerChannelGetRequesterImpl::channelGetConnect(const Status& status, Chan
}
}
void ServerChannelGetRequesterImpl::getDone(const Status& status)
void ServerChannelGetRequesterImpl::getDone(const Status& status, ChannelGet::shared_pointer const & /*channelGet*/,
PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
{
MB_POINT(channelGet, 5, "server channelGet->getDone()");
{
Lock guard(_mutex);
_status = status;
_pvStructure = pvStructure;
_bitSet = bitSet;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -687,12 +724,12 @@ ChannelGet::shared_pointer ServerChannelGetRequesterImpl::getChannelGet()
void ServerChannelGetRequesterImpl::lock()
{
//noop
// noop
}
void ServerChannelGetRequesterImpl::unlock()
{
//noop
// noop
}
void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
@@ -719,18 +756,20 @@ void ServerChannelGetRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
if (request & QOS_INIT)
{
Lock guard(_mutex);
control->cachedSerialize(_pvStructure != NULL ? _pvStructure->getField() : FieldConstPtr(), buffer);
control->cachedSerialize(_structure, buffer);
}
else
{
MB_POINT(channelGet, 6, "server channelGet->serialize response (start)");
{
// we locked _mutex above, so _channelGet is valid
ScopedLock lock(_channelGet);
_bitSet->serialize(buffer, control);
_pvStructure->serialize(buffer, control, _bitSet.get());
// we locked _mutex above, so _channelGet is valid
ScopedLock lock(_channelGet);
_bitSet->serialize(buffer, control);
_pvStructure->serialize(buffer, control, _bitSet.get());
_pvStructure.reset();
_bitSet.reset();
}
MB_POINT(channelGet, 7, "server channelGet->serialize response (end)");
}
@@ -796,33 +835,40 @@ void ServerPutHandler::handleResponse(osiSockAddr* responseFrom,
return;
}
ChannelPut::shared_pointer channelPut = request->getChannelPut();
if (lastRequest)
channelPut->lastRequest();
if (get)
{
// no destroy w/ get
request->getChannelPut()->get();
channelPut->get();
}
else
{
// deserialize bitSet and do a put
ChannelPut::shared_pointer channelPut = request->getChannelPut();
{
ScopedLock lock(channelPut); // TODO not needed if put is processed by the same thread
BitSet::shared_pointer putBitSet = request->getBitSet();
BitSet::shared_pointer putBitSet = request->getPutBitSet();
PVStructure::shared_pointer putPVStructure = request->getPutPVStructure();
DESERIALIZE_EXCEPTION_GUARD(
putBitSet->deserialize(payloadBuffer, transport.get());
request->getPVStructure()->deserialize(payloadBuffer, transport.get(), putBitSet.get());
putPVStructure->deserialize(payloadBuffer, transport.get(), putBitSet.get());
);
lock.unlock();
channelPut->put(putPVStructure, putBitSet);
}
channelPut->put(lastRequest);
}
}
}
ServerChannelPutRequesterImpl::ServerChannelPutRequesterImpl(ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel,
const pvAccessID ioid, Transport::shared_pointer const & transport):
BaseChannelRequester(context, channel, ioid, transport), _channelPut(), _bitSet(), _pvStructure()
BaseChannelRequester(context, channel, ioid, transport)
{
}
@@ -843,15 +889,17 @@ void ServerChannelPutRequesterImpl::activate(PVStructure::shared_pointer const &
INIT_EXCEPTION_GUARD(CMD_PUT, _channelPut = _channel->getChannel()->createChannelPut(thisPointer, pvRequest));
}
void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, ChannelPut::shared_pointer const & channelPut, PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, ChannelPut::shared_pointer const & channelPut, Structure::const_shared_pointer const & structure)
{
{
Lock guard(_mutex);
_bitSet = bitSet;
_pvStructure = pvStructure;
_status = status;
_channelPut = channelPut;
_structure = structure;
}
_putPVStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(_structure, _putPVStructure));
_putBitSet = createBitSetFor(_putPVStructure, _putBitSet);
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -863,7 +911,7 @@ void ServerChannelPutRequesterImpl::channelPutConnect(const Status& status, Chan
}
}
void ServerChannelPutRequesterImpl::putDone(const Status& status)
void ServerChannelPutRequesterImpl::putDone(const Status& status, ChannelPut::shared_pointer const & /*channelPut*/)
{
{
Lock guard(_mutex);
@@ -873,11 +921,13 @@ void ServerChannelPutRequesterImpl::putDone(const Status& status)
_transport->enqueueSendRequest(thisSender);
}
void ServerChannelPutRequesterImpl::getDone(const Status& status)
void ServerChannelPutRequesterImpl::getDone(const Status& status, ChannelPut::shared_pointer const & /*channelPut*/, PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
{
{
Lock guard(_mutex);
_status = status;
_pvStructure = pvStructure;
_bitSet = bitSet;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -918,16 +968,16 @@ ChannelPut::shared_pointer ServerChannelPutRequesterImpl::getChannelPut()
return _channelPut;
}
BitSet::shared_pointer ServerChannelPutRequesterImpl::getBitSet()
BitSet::shared_pointer ServerChannelPutRequesterImpl::getPutBitSet()
{
//Lock guard(_mutex);
return _bitSet;
return _putBitSet;
}
PVStructure::shared_pointer ServerChannelPutRequesterImpl::getPVStructure()
PVStructure::shared_pointer ServerChannelPutRequesterImpl::getPutPVStructure()
{
//Lock guard(_mutex);
return _pvStructure;
return _putPVStructure;
}
void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
@@ -947,12 +997,13 @@ void ServerChannelPutRequesterImpl::send(ByteBuffer* buffer, TransportSendContro
if ((QOS_INIT & request) != 0)
{
Lock guard(_mutex);
control->cachedSerialize(_pvStructure != NULL ? _pvStructure->getField() : FieldConstPtr(), buffer);
control->cachedSerialize(_structure, buffer);
}
else if ((QOS_GET & request) != 0)
{
ScopedLock lock(_channelPut); // _channelPut is valid because we required _mutex above
_pvStructure->serialize(buffer, control);
_bitSet->serialize(buffer, control);
_pvStructure->serialize(buffer, control, _bitSet.get());
}
}
@@ -1016,26 +1067,34 @@ void ServerPutGetHandler::handleResponse(osiSockAddr* responseFrom,
return;
}
ChannelPutGet::shared_pointer channelPutGet = request->getChannelPutGet();
if (lastRequest)
channelPutGet->lastRequest();
if (getGet)
{
request->getChannelPutGet()->getGet();
channelPutGet->getGet();
}
else if(getPut)
{
request->getChannelPutGet()->getPut();
channelPutGet->getPut();
}
else
{
// deserialize bitSet and do a put
ChannelPutGet::shared_pointer channelPutGet = request->getChannelPutGet();
{
ScopedLock lock(channelPutGet); // TODO not necessary if read is done in putGet
BitSet::shared_pointer putBitSet = request->getPutGetBitSet();
PVStructure::shared_pointer putPVStructure = request->getPutGetPVStructure();
DESERIALIZE_EXCEPTION_GUARD(
request->getPVPutStructure()->deserialize(payloadBuffer, transport.get());
);
putBitSet->deserialize(payloadBuffer, transport.get());
putPVStructure->deserialize(payloadBuffer, transport.get(), putBitSet.get());
);
lock.unlock();
channelPutGet->putGet(putPVStructure, putBitSet);
}
channelPutGet->putGet(lastRequest);
}
}
}
@@ -1064,15 +1123,18 @@ void ServerChannelPutGetRequesterImpl::activate(PVStructure::shared_pointer cons
}
void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status, ChannelPutGet::shared_pointer const & channelPutGet,
PVStructure::shared_pointer const & pvPutStructure, PVStructure::shared_pointer const & pvGetStructure)
Structure::const_shared_pointer const & putStructure, Structure::const_shared_pointer const & getStructure)
{
{
Lock guard(_mutex);
_pvPutStructure = pvPutStructure;
_pvGetStructure = pvGetStructure;
_status = status;
_channelPutGet = channelPutGet;
_putStructure = putStructure;
_getStructure = getStructure;
}
_pvPutGetStructure = std::tr1::static_pointer_cast<PVStructure>(reuseOrCreatePVField(_putStructure, _pvPutGetStructure));
_pvPutGetBitSet = createBitSetFor(_pvPutGetStructure, _pvPutGetBitSet);
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1084,31 +1146,40 @@ void ServerChannelPutGetRequesterImpl::channelPutGetConnect(const Status& status
}
}
void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status)
void ServerChannelPutGetRequesterImpl::getGetDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/,
PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
{
{
Lock guard(_mutex);
_status = status;
_pvGetStructure = pvStructure;
_pvGetBitSet = bitSet;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status)
void ServerChannelPutGetRequesterImpl::getPutDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/,
PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
{
{
Lock guard(_mutex);
_status = status;
_pvPutStructure = pvStructure;
_pvPutBitSet = bitSet;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status)
void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status, ChannelPutGet::shared_pointer const & /*channelPutGet*/,
PVStructure::shared_pointer const & pvStructure, BitSet::shared_pointer const & bitSet)
{
{
Lock guard(_mutex);
_status = status;
_pvGetStructure = pvStructure;
_pvGetBitSet = bitSet;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1116,12 +1187,12 @@ void ServerChannelPutGetRequesterImpl::putGetDone(const Status& status)
void ServerChannelPutGetRequesterImpl::lock()
{
//noop
// noop
}
void ServerChannelPutGetRequesterImpl::unlock()
{
//noop
// noop
}
void ServerChannelPutGetRequesterImpl::destroy()
@@ -1149,10 +1220,16 @@ ChannelPutGet::shared_pointer ServerChannelPutGetRequesterImpl::getChannelPutGet
return _channelPutGet;
}
PVStructure::shared_pointer ServerChannelPutGetRequesterImpl::getPVPutStructure()
PVStructure::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetPVStructure()
{
//Lock guard(_mutex);
return _pvPutStructure;
return _pvPutGetStructure;
}
BitSet::shared_pointer ServerChannelPutGetRequesterImpl::getPutGetBitSet()
{
//Lock guard(_mutex);
return _pvPutGetBitSet;
}
void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* control)
@@ -1172,25 +1249,28 @@ void ServerChannelPutGetRequesterImpl::send(ByteBuffer* buffer, TransportSendCon
if ((QOS_INIT & request) != 0)
{
Lock guard(_mutex);
control->cachedSerialize(_pvPutStructure != NULL ? _pvPutStructure->getField() : FieldConstPtr(), buffer);
control->cachedSerialize(_pvGetStructure != NULL ? _pvGetStructure->getField() : FieldConstPtr(), buffer);
control->cachedSerialize(_putStructure, buffer);
control->cachedSerialize(_getStructure, buffer);
}
else if ((QOS_GET & request) != 0)
{
Lock guard(_mutex);
_pvGetStructure->serialize(buffer, control);
_pvGetBitSet->serialize(buffer, control);
_pvGetStructure->serialize(buffer, control, _pvGetBitSet.get());
}
else if ((QOS_GET_PUT & request) != 0)
{
ScopedLock lock(_channelPutGet); // valid due to _mutex lock above
//Lock guard(_mutex);
_pvPutStructure->serialize(buffer, control);
_pvPutBitSet->serialize(buffer, control);
_pvPutStructure->serialize(buffer, control, _pvPutBitSet.get());
}
else
{
ScopedLock lock(_channelPutGet); // valid due to _mutex lock above
//Lock guard(_mutex);
_pvGetStructure->serialize(buffer, control);
_pvGetBitSet->serialize(buffer, control);
_pvGetStructure->serialize(buffer, control, _pvGetBitSet.get());
}
}
@@ -1465,6 +1545,7 @@ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom,
const bool lastRequest = (QOS_DESTROY & qosCode) != 0;
const bool get = (QOS_GET & qosCode) != 0;
const bool setLength = (QOS_GET_PUT & qosCode) != 0;
const bool getLength = (QOS_PROCESS & qosCode) != 0;
ServerChannelArrayRequesterImpl::shared_pointer request = static_pointer_cast<ServerChannelArrayRequesterImpl>(channel->getRequest(ioid));
if (request == NULL)
@@ -1479,34 +1560,43 @@ void ServerArrayHandler::handleResponse(osiSockAddr* responseFrom,
return;
}
ChannelArray::shared_pointer channelArray = request->getChannelArray();
if (lastRequest)
channelArray->lastRequest();
if (get)
{
size_t offset = SerializeHelper::readSize(payloadBuffer, transport.get());
size_t count = SerializeHelper::readSize(payloadBuffer, transport.get());
request->getChannelArray()->getArray(lastRequest, offset, count);
size_t stride = SerializeHelper::readSize(payloadBuffer, transport.get());
request->getChannelArray()->getArray(offset, count, stride);
}
else if (setLength)
{
size_t length = SerializeHelper::readSize(payloadBuffer, transport.get());
size_t capacity = SerializeHelper::readSize(payloadBuffer, transport.get());
request->getChannelArray()->setLength(lastRequest, length, capacity);
request->getChannelArray()->setLength(length, capacity);
}
else if (getLength)
{
request->getChannelArray()->getLength();
}
else
{
// deserialize data to put
size_t offset;
ChannelArray::shared_pointer channelArray = request->getChannelArray();
size_t stride;
PVArray::shared_pointer array = request->getPVArray();
{
ScopedLock lock(channelArray); // TODO not needed if read by the same thread
DESERIALIZE_EXCEPTION_GUARD(
offset = SerializeHelper::readSize(payloadBuffer, transport.get());
stride = SerializeHelper::readSize(payloadBuffer, transport.get());
array->deserialize(payloadBuffer, transport.get());
);
}
channelArray->putArray(lastRequest, offset, array->getLength());
channelArray->putArray(array, offset, array->getLength(), stride);
}
}
}
@@ -1536,14 +1626,17 @@ void ServerChannelArrayRequesterImpl::activate(PVStructure::shared_pointer const
INIT_EXCEPTION_GUARD(CMD_ARRAY, _channelArray = _channel->getChannel()->createChannelArray(thisPointer, pvRequest));
}
void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, ChannelArray::shared_pointer const & channelArray, PVArray::shared_pointer const & pvArray)
void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status, ChannelArray::shared_pointer const & channelArray, Array::const_shared_pointer const & array)
{
{
Lock guard(_mutex);
_status = status;
_pvArray = pvArray;
_channelArray = channelArray;
_array = array;
}
_pvArray = std::tr1::static_pointer_cast<PVArray>(reuseOrCreatePVField(_array, _pvArray));
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1554,7 +1647,18 @@ void ServerChannelArrayRequesterImpl::channelArrayConnect(const Status& status,
}
}
void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status)
void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/, PVArray::shared_pointer const & pvArray)
{
{
Lock guard(_mutex);
_status = status;
_pvArray = pvArray;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
}
void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/)
{
{
Lock guard(_mutex);
@@ -1564,7 +1668,7 @@ void ServerChannelArrayRequesterImpl::getArrayDone(const Status& status)
_transport->enqueueSendRequest(thisSender);
}
void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status)
void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/)
{
{
Lock guard(_mutex);
@@ -1574,11 +1678,14 @@ void ServerChannelArrayRequesterImpl::putArrayDone(const Status& status)
_transport->enqueueSendRequest(thisSender);
}
void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status)
void ServerChannelArrayRequesterImpl::getLengthDone(const Status& status, ChannelArray::shared_pointer const & /*channelArray*/,
size_t length, size_t capacity)
{
{
Lock guard(_mutex);
_status = status;
_length = length;
_capacity = capacity;
}
TransportSender::shared_pointer thisSender = shared_from_this();
_transport->enqueueSendRequest(thisSender);
@@ -1586,12 +1693,12 @@ void ServerChannelArrayRequesterImpl::setLengthDone(const Status& status)
void ServerChannelArrayRequesterImpl::lock()
{
//noop
// noop
}
void ServerChannelArrayRequesterImpl::unlock()
{
//noop
// noop
}
void ServerChannelArrayRequesterImpl::destroy()
@@ -1644,11 +1751,18 @@ void ServerChannelArrayRequesterImpl::send(ByteBuffer* buffer, TransportSendCont
//Lock guard(_mutex);
ScopedLock lock(_channelArray); // valid due to _mutex lock above
_pvArray->serialize(buffer, control, 0, _pvArray->getLength());
_pvArray.reset();
}
else if ((QOS_PROCESS & request) != 0)
{
//Lock guard(_mutex);
SerializeHelper::writeSize(_length, buffer, control);
SerializeHelper::writeSize(_capacity, buffer, control);
}
else if ((QOS_INIT & request) != 0)
{
Lock guard(_mutex);
control->cachedSerialize(_pvArray != NULL ? _pvArray->getField() : FieldConstPtr(), buffer);
control->cachedSerialize(_array, buffer);
}
}
@@ -1794,7 +1908,9 @@ void ServerProcessHandler::handleResponse(osiSockAddr* responseFrom,
return;
}
request->getChannelProcess()->process(lastRequest);
if (lastRequest)
request->getChannelProcess()->lastRequest();
request->getChannelProcess()->process();
}
}
@@ -1840,7 +1956,7 @@ void ServerChannelProcessRequesterImpl::channelProcessConnect(const Status& stat
}
}
void ServerChannelProcessRequesterImpl::processDone(const Status& status)
void ServerChannelProcessRequesterImpl::processDone(const Status& status, ChannelProcess::shared_pointer const & /*channelProcess*/)
{
{
Lock guard(_mutex);
@@ -1852,12 +1968,12 @@ void ServerChannelProcessRequesterImpl::processDone(const Status& status)
void ServerChannelProcessRequesterImpl::lock()
{
//noop
// noop
}
void ServerChannelProcessRequesterImpl::unlock()
{
//noop
// noop
}
void ServerChannelProcessRequesterImpl::destroy()
@@ -2045,7 +2161,9 @@ void ServerRPCHandler::handleResponse(osiSockAddr* responseFrom,
pvArgument = SerializationHelper::deserializeStructureFull(payloadBuffer, transport.get());
);
channelRPC->request(pvArgument, lastRequest);
if (lastRequest)
channelRPC->lastRequest();
channelRPC->request(pvArgument);
}
}
@@ -2093,7 +2211,7 @@ void ServerChannelRPCRequesterImpl::channelRPCConnect(const Status& status, Chan
}
}
void ServerChannelRPCRequesterImpl::requestDone(const Status& status, PVStructure::shared_pointer const & pvResponse)
void ServerChannelRPCRequesterImpl::requestDone(const Status& status, ChannelRPC::shared_pointer const & /*channelRPC*/, PVStructure::shared_pointer const & pvResponse)
{
{
Lock guard(_mutex);
@@ -2106,12 +2224,12 @@ void ServerChannelRPCRequesterImpl::requestDone(const Status& status, PVStructur
void ServerChannelRPCRequesterImpl::lock()
{
//noop
// noop
}
void ServerChannelRPCRequesterImpl::unlock()
{
//noop
// noop
}
void ServerChannelRPCRequesterImpl::destroy()

View File

@@ -333,8 +333,10 @@ namespace pvAccess {
epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual ~ServerChannelGetRequesterImpl() {}
void channelGetConnect(const epics::pvData::Status& status, ChannelGet::shared_pointer const & channelGet,
epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet);
void getDone(const epics::pvData::Status& status);
epics::pvData::Structure::const_shared_pointer const & structure);
void getDone(const epics::pvData::Status& status, ChannelGet::shared_pointer const & channelGet,
epics::pvData::PVStructure::shared_pointer const & pvStructure,
epics::pvData::BitSet::shared_pointer const & bitSet);
void destroy();
ChannelGet::shared_pointer getChannelGet();
@@ -344,8 +346,9 @@ namespace pvAccess {
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
private:
ChannelGet::shared_pointer _channelGet;
epics::pvData::BitSet::shared_pointer _bitSet;
epics::pvData::Structure::const_shared_pointer _structure;
epics::pvData::PVStructure::shared_pointer _pvStructure;
epics::pvData::BitSet::shared_pointer _bitSet;
epics::pvData::Status _status;
};
@@ -387,21 +390,30 @@ namespace pvAccess {
Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual ~ServerChannelPutRequesterImpl() {}
void channelPutConnect(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut, epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet);
void putDone(const epics::pvData::Status& status);
void getDone(const epics::pvData::Status& status);
void channelPutConnect(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut, epics::pvData::Structure::const_shared_pointer const & structure);
void putDone(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut);
void getDone(const epics::pvData::Status& status, ChannelPut::shared_pointer const & channelPut, epics::pvData::PVStructure::shared_pointer const & pvStructure, epics::pvData::BitSet::shared_pointer const & bitSet);
void lock();
void unlock();
void destroy();
ChannelPut::shared_pointer getChannelPut();
epics::pvData::BitSet::shared_pointer getBitSet();
epics::pvData::PVStructure::shared_pointer getPVStructure();
epics::pvData::BitSet::shared_pointer getPutBitSet();
epics::pvData::PVStructure::shared_pointer getPutPVStructure();
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
private:
ChannelPut::shared_pointer _channelPut;
epics::pvData::Structure::const_shared_pointer _structure;
// reference store (for get)
epics::pvData::BitSet::shared_pointer _bitSet;
epics::pvData::PVStructure::shared_pointer _pvStructure;
// data store (for put)
epics::pvData::BitSet::shared_pointer _putBitSet;
epics::pvData::PVStructure::shared_pointer _putPVStructure;
epics::pvData::Status _status;
};
@@ -442,21 +454,43 @@ namespace pvAccess {
Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual ~ServerChannelPutGetRequesterImpl() {}
void channelPutGetConnect(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet, epics::pvData::PVStructure::shared_pointer const & pvPutStructure, epics::pvData::PVStructure::shared_pointer const & pvGetStructure);
void getGetDone(const epics::pvData::Status& status);
void getPutDone(const epics::pvData::Status& status);
void putGetDone(const epics::pvData::Status& status);
void channelPutGetConnect(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet,
epics::pvData::Structure::const_shared_pointer const & putStructure,
epics::pvData::Structure::const_shared_pointer const & getStructure);
void getGetDone(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet,
epics::pvData::PVStructure::shared_pointer const & pvStructure,
epics::pvData::BitSet::shared_pointer const & bitSet);
void getPutDone(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet,
epics::pvData::PVStructure::shared_pointer const & pvStructure,
epics::pvData::BitSet::shared_pointer const & bitSet);
void putGetDone(const epics::pvData::Status& status, ChannelPutGet::shared_pointer const & channelPutGet,
epics::pvData::PVStructure::shared_pointer const & pvStructure,
epics::pvData::BitSet::shared_pointer const & bitSet);
void lock();
void unlock();
void destroy();
ChannelPutGet::shared_pointer getChannelPutGet();
epics::pvData::PVStructure::shared_pointer getPVPutStructure();
epics::pvData::PVStructure::shared_pointer getPutGetPVStructure();
epics::pvData::BitSet::shared_pointer getPutGetBitSet();
void send(epics::pvData::ByteBuffer* buffer, TransportSendControl* control);
private:
ChannelPutGet::shared_pointer _channelPutGet;
epics::pvData::Structure::const_shared_pointer _putStructure;
epics::pvData::Structure::const_shared_pointer _getStructure;
// reference store
epics::pvData::PVStructure::shared_pointer _pvPutStructure;
epics::pvData::BitSet::shared_pointer _pvPutBitSet;
epics::pvData::PVStructure::shared_pointer _pvGetStructure;
epics::pvData::BitSet::shared_pointer _pvGetBitSet;
// data container (for put-get)
epics::pvData::PVStructure::shared_pointer _pvPutGetStructure;
epics::pvData::BitSet::shared_pointer _pvPutGetBitSet;
epics::pvData::Status _status;
};
@@ -552,10 +586,13 @@ namespace pvAccess {
Transport::shared_pointer const & transport,epics::pvData::PVStructure::shared_pointer const & pvRequest);
virtual ~ServerChannelArrayRequesterImpl() {}
void channelArrayConnect(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray, epics::pvData::PVArray::shared_pointer const & pvArray);
void getArrayDone(const epics::pvData::Status& status);
void putArrayDone(const epics::pvData::Status& status);
void setLengthDone(const epics::pvData::Status& status);
void channelArrayConnect(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray, epics::pvData::Array::const_shared_pointer const & array);
void getArrayDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray,
epics::pvData::PVArray::shared_pointer const & pvArray);
void putArrayDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray);
void setLengthDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray);
void getLengthDone(const epics::pvData::Status& status, ChannelArray::shared_pointer const & channelArray,
std::size_t length, std::size_t capacity);
void lock();
void unlock();
void destroy();
@@ -567,7 +604,10 @@ namespace pvAccess {
private:
ChannelArray::shared_pointer _channelArray;
epics::pvData::Array::const_shared_pointer _array;
epics::pvData::PVArray::shared_pointer _pvArray;
std::size_t _length;
std::size_t _capacity;
epics::pvData::Status _status;
};
@@ -651,7 +691,7 @@ namespace pvAccess {
virtual ~ServerChannelProcessRequesterImpl() {}
void channelProcessConnect(const epics::pvData::Status& status, ChannelProcess::shared_pointer const & channelProcess);
void processDone(const epics::pvData::Status& status);
void processDone(const epics::pvData::Status& status, ChannelProcess::shared_pointer const & channelProcess);
void lock();
void unlock();
void destroy();
@@ -777,7 +817,7 @@ namespace pvAccess {
virtual ~ServerChannelRPCRequesterImpl() {}
void channelRPCConnect(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC);
void requestDone(const epics::pvData::Status& status, epics::pvData::PVStructure::shared_pointer const & pvResponse);
void requestDone(const epics::pvData::Status& status, ChannelRPC::shared_pointer const & channelRPC, epics::pvData::PVStructure::shared_pointer const & pvResponse);
void lock();
void unlock();
void destroy();

View File

@@ -36,7 +36,7 @@ ServerContextImpl::ServerContextImpl():
_beaconEmitter(),
_acceptor(),
_transportRegistry(),
_channelAccess(),
_channelProviderRegistry(),
_channelProviderNames(PVACCESS_DEFAULT_PROVIDER),
_channelProviders(),
_beaconServerStatusProvider()
@@ -126,12 +126,12 @@ bool ServerContextImpl::isChannelProviderNamePreconfigured()
return config->hasProperty("EPICS_PVA_PROVIDER_NAMES") || config->hasProperty("EPICS_PVAS_PROVIDER_NAMES");
}
void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channelAccess)
void ServerContextImpl::initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry)
{
Lock guard(_mutex);
if (channelAccess == NULL)
if (channelProviderRegistry == NULL)
{
THROW_BASE_EXCEPTION("non null channelAccess expected");
THROW_BASE_EXCEPTION("non null channelProviderRegistry expected");
}
if (_state == DESTROYED)
@@ -143,7 +143,7 @@ void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channel
THROW_BASE_EXCEPTION("Context already initialized.");
}
_channelAccess = channelAccess;
_channelProviderRegistry = channelProviderRegistry;
// user all providers
@@ -151,10 +151,10 @@ void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channel
{
_channelProviderNames.resize(0); // VxWorks 5.5 omits clear()
std::auto_ptr<ChannelAccess::stringVector_t> names = _channelAccess->getProviderNames();
for (ChannelAccess::stringVector_t::iterator iter = names->begin(); iter != names->end(); iter++)
std::auto_ptr<ChannelProviderRegistry::stringVector_t> names = _channelProviderRegistry->getProviderNames();
for (ChannelProviderRegistry::stringVector_t::iterator iter = names->begin(); iter != names->end(); iter++)
{
ChannelProvider::shared_pointer channelProvider = _channelAccess->getProvider(*iter);
ChannelProvider::shared_pointer channelProvider = _channelProviderRegistry->getProvider(*iter);
if (channelProvider)
{
_channelProviders.push_back(channelProvider);
@@ -173,13 +173,13 @@ void ServerContextImpl::initialize(ChannelAccess::shared_pointer const & channel
std::string providerName;
while (std::getline(ss, providerName, ' '))
{
ChannelProvider::shared_pointer channelProvider = _channelAccess->getProvider(providerName);
ChannelProvider::shared_pointer channelProvider = _channelProviderRegistry->getProvider(providerName);
if (channelProvider)
_channelProviders.push_back(channelProvider);
}
}
//_channelProvider = _channelAccess->getProvider(_channelProviderNames);
//_channelProvider = _channelProviderRegistry->getProvider(_channelProviderNames);
if (_channelProviders.size() == 0)
{
std::string msg = "None of the specified channel providers are available: " + _channelProviderNames + ".";
@@ -545,9 +545,9 @@ BlockingUDPTransport::shared_pointer ServerContextImpl::getBroadcastTransport()
return _broadcastTransport;
}
ChannelAccess::shared_pointer ServerContextImpl::getChannelAccess()
ChannelProviderRegistry::shared_pointer ServerContextImpl::getChannelProviderRegistry()
{
return _channelAccess;
return _channelProviderRegistry;
}
std::string ServerContextImpl::getChannelProviderName()
@@ -621,8 +621,8 @@ ServerContext::shared_pointer startPVAServer(String const & providerNames, int t
if (!ctx->isChannelProviderNamePreconfigured())
ctx->setChannelProviderName(providerNames);
ChannelAccess::shared_pointer channelAccess = getChannelAccess();
ctx->initialize(channelAccess);
ChannelProviderRegistry::shared_pointer channelProviderRegistry = getChannelProviderRegistry();
ctx->initialize(channelProviderRegistry);
if (printInfo)
ctx->printInfo();

View File

@@ -42,10 +42,10 @@ public:
virtual const Version& getVersion() = 0;
/**
* Set <code>ChannelAccess</code> implementation and initialize server.
* @param channelAccess implementation of channel access to be served.
* Set <code>ChannelProviderRegistry</code> implementation and initialize server.
* @param channelProviderRegistry channel providers registry to be used.
*/
virtual void initialize(ChannelAccess::shared_pointer const & channelAccess) = 0;
virtual void initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry) = 0;
/**
* Run server (process events).
@@ -116,7 +116,7 @@ public:
//**************** derived from ServerContext ****************//
const Version& getVersion();
void initialize(ChannelAccess::shared_pointer const & channelAccess);
void initialize(ChannelProviderRegistry::shared_pointer const & channelProviderRegistry);
void run(epics::pvData::int32 seconds);
void shutdown();
void destroy();
@@ -253,10 +253,10 @@ public:
BlockingUDPTransport::shared_pointer getBroadcastTransport();
/**
* Get channel access implementation.
* @return channel access implementation.
* Get channel provider registry implementation used by this instance.
* @return channel provider registry used by this instance.
*/
ChannelAccess::shared_pointer getChannelAccess();
ChannelProviderRegistry::shared_pointer getChannelProviderRegistry();
/**
* Get channel provider name.
@@ -354,7 +354,7 @@ private:
/**
* Channel access.
*/
ChannelAccess::shared_pointer _channelAccess;
ChannelProviderRegistry::shared_pointer _channelProviderRegistry;
/**
* Channel provider name.