BaseResponseRequests

This commit is contained in:
Matej Sekoranja
2011-01-16 22:40:30 +01:00
parent 676cb0e3f6
commit 9d2692026a
7 changed files with 308 additions and 112 deletions

View File

@@ -28,6 +28,195 @@
using namespace epics::pvData;
using namespace epics::pvAccess;
static Status* g_statusOK = getStatusCreate()->getStatusOK();
static StatusCreate* statusCreate = getStatusCreate();
static Status* okStatus = g_statusOK;
static Status* destroyedStatus = statusCreate->createStatus(STATUSTYPE_ERROR, "request destroyed");
static Status* channelNotConnected = statusCreate->createStatus(STATUSTYPE_ERROR, "channel not connected");
static Status* otherRequestPendingStatus = statusCreate->createStatus(STATUSTYPE_ERROR, "other request pending");
static PVDataCreate* pvDataCreate = getPVDataCreate();
/**
* Base channel request.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: BaseRequestImpl.java,v 1.1 2010/05/03 14:45:40 mrkraimer Exp $
*/
class BaseRequestImpl :
public DataResponse,
public SubscriptionRequest,
public TransportSender {
protected:
ChannelImpl* m_channel;
ClientContextImpl* m_context;
pvAccessID m_ioid;
Requester* m_requester;
// TODO sync
volatile bool m_destroyed;
volatile bool m_remotelyDestroyed;
/* negative... */
static const int NULL_REQUEST = -1;
static const int PURE_DESTROY_REQUEST = -2;
int32 m_pendingRequest;
Mutex m_mutex;
public:
BaseRequestImpl(ChannelImpl* channel, Requester* requester) :
m_channel(channel), m_context(channel->getContext()),
m_requester(requester), m_destroyed(false), m_remotelyDestroyed(false),
m_pendingRequest(NULL_REQUEST)
{
// register response request
m_ioid = m_context->registerResponseRequest(this);
channel->registerResponseRequest(this);
}
bool startRequest(int32 qos) {
Lock guard(&m_mutex);
// we allow pure destroy...
if (m_pendingRequest != NULL_REQUEST && qos != PURE_DESTROY_REQUEST)
return false;
m_pendingRequest = qos;
return true;
}
void stopRequest() {
Lock guard(&m_mutex);
m_pendingRequest = NULL_REQUEST;
}
int32 getPendingRequest() {
Lock guard(&m_mutex);
return m_pendingRequest;
}
Requester* getRequester() {
return m_requester;
}
pvAccessID getIOID() {
return m_ioid;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status);
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status);
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status);
virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) {
// TODO?
// try
// {
transport->ensureData(1);
int8 qos = payloadBuffer->getByte();
Status* status = statusCreate->deserializeStatus(payloadBuffer, transport);
if (qos & QOS_INIT)
{
initResponse(transport, version, payloadBuffer, qos, status);
}
else if (qos & QOS_DESTROY)
{
m_remotelyDestroyed = true;
if (!destroyResponse(transport, version, payloadBuffer, qos, status))
cancel();
}
else
{
normalResponse(transport, version, payloadBuffer, qos, status);
}
// TODO
if (status != okStatus)
delete status;
}
virtual void cancel() {
destroy();
}
virtual void destroy() {
{
Lock guard(&m_mutex);
if (m_destroyed)
return;
m_destroyed = true;
}
// unregister response request
m_context->unregisterResponseRequest(this);
m_channel->unregisterResponseRequest(this);
// destroy remote instance
if (!m_remotelyDestroyed)
{
startRequest(PURE_DESTROY_REQUEST);
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
}
}
virtual void timeout() {
cancel();
// TODO notify?
}
void reportStatus(Status* status) {
// destroy, since channel (parent) was destroyed
if (status == ChannelImpl::channelDestroyed)
destroy();
else if (status == ChannelImpl::channelDisconnected)
stopRequest();
// TODO notify?
}
virtual void updateSubscription() {
// default is noop
}
virtual void lock() {
// noop
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int8 qos = getPendingRequest();
if (qos == -1)
return;
else if (qos == PURE_DESTROY_REQUEST)
{
control->startMessage((int8)15, 8);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
}
stopRequest();
}
virtual void unlock() {
// noop
}
};
PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelProcess);
@@ -77,7 +266,7 @@ class ChannelImplProcess : public ChannelProcess
m_valueField = static_cast<PVScalar*>(field);
// TODO pvRequest
m_channelProcessRequester->channelProcessConnect(getStatusCreate()->getStatusOK(), this);
m_channelProcessRequester->channelProcessConnect(g_statusOK, this);
}
virtual void process(bool lastRequest)
@@ -153,7 +342,7 @@ class ChannelImplProcess : public ChannelProcess
break;
}
m_channelProcessRequester->processDone(getStatusCreate()->getStatusOK());
m_channelProcessRequester->processDone(g_statusOK);
if (lastRequest)
destroy();
@@ -171,41 +360,39 @@ class ChannelImplProcess : public ChannelProcess
PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelGet);
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGet);
class ChannelImplGet : public ChannelGet
{
private:
ChannelImpl* m_channel;
ChannelGetRequester* m_channelGetRequester;
PVStructure* m_pvStructure;
PVStructure* m_pvRequest;
PVStructure* m_data;
BitSet* m_bitSet;
volatile bool m_first;
private:
~ChannelImplGet()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelGet);
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGet);
}
public:
ChannelImplGet(ChannelGetRequester* channelGetRequester, PVStructure *pvStructure, PVStructure *pvRequest) :
m_channelGetRequester(channelGetRequester), m_pvStructure(pvStructure),
m_bitSet(new BitSet(pvStructure->getNumberFields())), m_first(true)
ChannelImplGet(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) :
m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest),
m_data(0), m_bitSet(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelGet);
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet);
// TODO pvRequest
m_channelGetRequester->channelGetConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet);
m_channelGetRequester->channelGetConnect(g_statusOK, this, m_data, m_bitSet);
}
virtual void get(bool lastRequest)
{
m_channelGetRequester->getDone(getStatusCreate()->getStatusOK());
if (m_first)
{
m_first = false;
m_bitSet->set(0); // TODO
}
m_channelGetRequester->getDone(g_statusOK);
if (lastRequest)
destroy();
@@ -213,7 +400,7 @@ class ChannelImplGet : public ChannelGet
virtual void destroy()
{
delete m_bitSet;
// delete m_bitSet;
delete this;
}
@@ -250,19 +437,19 @@ class ChannelImplPut : public ChannelPut
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelPut);
// TODO pvRequest
m_channelPutRequester->channelPutConnect(getStatusCreate()->getStatusOK(), this, m_pvStructure, m_bitSet);
m_channelPutRequester->channelPutConnect(g_statusOK, this, m_pvStructure, m_bitSet);
}
virtual void put(bool lastRequest)
{
m_channelPutRequester->putDone(getStatusCreate()->getStatusOK());
m_channelPutRequester->putDone(g_statusOK);
if (lastRequest)
destroy();
}
virtual void get()
{
m_channelPutRequester->getDone(getStatusCreate()->getStatusOK());
m_channelPutRequester->getDone(g_statusOK);
}
virtual void destroy()
@@ -312,7 +499,7 @@ class MockMonitor : public Monitor, public MonitorElement
m_changedBitSet->set(0);
// TODO pvRequest
m_monitorRequester->monitorConnect(getStatusCreate()->getStatusOK(), this, const_cast<Structure*>(m_pvStructure->getStructure()));
m_monitorRequester->monitorConnect(g_statusOK, this, const_cast<Structure*>(m_pvStructure->getStructure()));
}
virtual Status* start()
@@ -707,6 +894,10 @@ typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
channel->connectionCompleted(sid);
}
// TODO not nice
if (status != g_statusOK)
delete status;
}
};
@@ -901,6 +1092,11 @@ class TestChannelImpl : public ChannelImpl {
*/
IOIDResponseRequestMap m_responseRequests;
/**
* Mutex for response requests.
*/
Mutex m_responseRequestsMutex;
/**
* Allow reconnection flag.
*/
@@ -926,7 +1122,6 @@ class TestChannelImpl : public ChannelImpl {
*/
pvAccessID m_serverChannelID;
/**
* Context sync. mutex.
*/
@@ -1054,6 +1249,10 @@ class TestChannelImpl : public ChannelImpl {
return m_channelID;
}
virtual ClientContextImpl* getContext() {
return m_context;
}
virtual pvAccessID getSearchInstanceID() {
return m_channelID;
}
@@ -1062,6 +1261,23 @@ class TestChannelImpl : public ChannelImpl {
return m_name;
}
virtual pvAccessID getServerChannelID() {
Lock guard(&m_channelMutex);
return m_serverChannelID;
}
virtual void registerResponseRequest(ResponseRequest* responseRequest)
{
Lock guard(&m_responseRequestsMutex);
m_responseRequests[responseRequest->getIOID()] = responseRequest;
}
virtual void unregisterResponseRequest(ResponseRequest* responseRequest)
{
Lock guard(&m_responseRequestsMutex);
m_responseRequests.erase(responseRequest->getIOID());
}
void connect() {
Lock guard(&m_channelMutex);
// if not destroyed...
@@ -1325,6 +1541,17 @@ class TestChannelImpl : public ChannelImpl {
initiateSearch();
}
virtual Transport* checkAndGetTransport()
{
Lock guard(&m_channelMutex);
// TODO C-fy
if (m_connectionState == DESTROYED)
throw std::runtime_error("Channel destroyed.");
else if (m_connectionState != CONNECTED)
throw std::runtime_error("Channel not connected.");
return m_transport; // TODO transport can be 0 !!!!!!!!!!
}
virtual void transportResponsive(Transport* transport) {
Lock guard(&m_channelMutex);
if (m_connectionState == DISCONNECTED)
@@ -1441,7 +1668,7 @@ class TestChannelImpl : public ChannelImpl {
virtual void getField(GetFieldRequester *requester,epics::pvData::String subField)
{
requester->getDone(getStatusCreate()->getStatusOK(),m_pvStructure->getSubField(subField)->getField());
requester->getDone(g_statusOK,m_pvStructure->getSubField(subField)->getField());
}
virtual ChannelProcess* createChannelProcess(
@@ -1455,7 +1682,7 @@ class TestChannelImpl : public ChannelImpl {
ChannelGetRequester *channelGetRequester,
epics::pvData::PVStructure *pvRequest)
{
return new ChannelImplGet(channelGetRequester, m_pvStructure, pvRequest);
return new ChannelImplGet(this, channelGetRequester, pvRequest);
}
virtual ChannelPut* createChannelPut(
@@ -1601,7 +1828,7 @@ class TestChannelImpl : public ChannelImpl {
// TODO support addressList
Channel* channel = m_context->createChannelInternal(channelName, channelRequester, priority, 0);
if (channel)
channelRequester->channelCreated(getStatusCreate()->getStatusOK(), channel);
channelRequester->channelCreated(g_statusOK, channel);
return channel;
// NOTE it's up to internal code to respond w/ error to requester and return 0 in case of errors