channelGet... not working

This commit is contained in:
Matej Sekoranja
2011-01-17 00:11:08 +01:00
parent 9d2692026a
commit b2bac9f916
3 changed files with 123 additions and 19 deletions

View File

@@ -32,7 +32,6 @@ static Status* g_statusOK = getStatusCreate()->getStatusOK();
static StatusCreate* statusCreate = getStatusCreate();
static Status* okStatus = g_statusOK;
static Status* destroyedStatus = statusCreate->createStatus(STATUSTYPE_ERROR, "request destroyed");
@@ -40,6 +39,10 @@ static Status* g_statusOK = getStatusCreate()->getStatusOK();
static Status* otherRequestPendingStatus = statusCreate->createStatus(STATUSTYPE_ERROR, "other request pending");
static PVDataCreate* pvDataCreate = getPVDataCreate();
// TODO
Status* ChannelImpl::channelDestroyed = statusCreate->createStatus(STATUSTYPE_WARNING, "channel destroyed");
Status* ChannelImpl::channelDisconnected = statusCreate->createStatus(STATUSTYPE_WARNING, "channel disconnected");;
/**
* Base channel request.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -110,9 +113,9 @@ public:
return m_ioid;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status);
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status);
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status);
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) = 0;
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) = 0;
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) = 0;
virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) {
// TODO?
@@ -362,7 +365,7 @@ class ChannelImplProcess : public ChannelProcess
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGet);
class ChannelImplGet : public ChannelGet
class ChannelImplGet : public BaseRequestImpl, public ChannelGet
{
private:
ChannelImpl* m_channel;
@@ -381,22 +384,111 @@ class ChannelImplGet : public ChannelGet
public:
ChannelImplGet(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) :
m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest),
BaseRequestImpl(channel, channelGetRequester),
m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest
m_data(0), m_bitSet(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet);
// TODO pvRequest
m_channelGetRequester->channelGetConnect(g_statusOK, this, m_data, m_bitSet);
// TODO immediate get, i.e. get data with init message
// TODO one-time get, i.e. immediate get + lastRequest
// subscribe
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelGetRequester->channelGetConnect(channelNotConnected, null, null, null);
// } catch (CAException caex) {
// TODO m_channelGetRequester->channelGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// }
}
virtual void get(bool lastRequest)
{
m_channelGetRequester->getDone(g_statusOK);
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int32 pendingRequest = getPendingRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
return;
}
control->startMessage((int8)10, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
if (pendingRequest & QOS_INIT)
{
// pvRequest
m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest);
}
stopRequest();
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
// data available
if (qos & QOS_GET)
normalResponse(transport, version, payloadBuffer, qos, status);
return true;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelGetRequester->channelGetConnect(status, this, 0, 0);
return true;
}
// create data and its bitSet
m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport);
m_bitSet = new BitSet(m_data->getNumberFields());
// notify
m_channelGetRequester->channelGetConnect(okStatus, this, m_data, m_bitSet);
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelGetRequester->getDone(status);
return true;
}
// deserialize bitSet and data
m_bitSet->deserialize(payloadBuffer, transport);
m_data->deserialize(payloadBuffer, transport, m_bitSet);
m_channelGetRequester->getDone(okStatus);
return true;
}
virtual void get(bool lastRequest) {
// TODO sync?
if (m_destroyed) {
m_channelGetRequester->getDone(destroyedStatus);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) {
m_channelGetRequester->getDone(otherRequestPendingStatus);
return;
}
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelGetRequester->getDone(channelNotConnected);
//}
}
virtual void resubscribeSubscription(Transport* transport) {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
}
if (lastRequest)
destroy();
}
virtual void destroy()
{
@@ -1552,6 +1644,12 @@ class TestChannelImpl : public ChannelImpl {
return m_transport; // TODO transport can be 0 !!!!!!!!!!
}
virtual Transport* getTransport()
{
Lock guard(&m_channelMutex);
return m_transport;
}
virtual void transportResponsive(Transport* transport) {
Lock guard(&m_channelMutex);
if (m_connectionState == DISCONNECTED)
@@ -2741,15 +2839,19 @@ int main(int argc,char *argv[])
ChannelRequesterImpl channelRequester;
Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester);
channel->printInfo();
/*
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch");
//GetFieldRequesterImpl getFieldRequesterImpl;
//channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch");
epicsThreadSleep ( 3.0 );
ChannelGetRequesterImpl channelGetRequesterImpl;
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, 0);
epicsThreadSleep ( 3.0 );
channelGet->get(false);
epicsThreadSleep ( 3.0 );
channelGet->destroy();
/*
ChannelPutRequesterImpl channelPutRequesterImpl;
ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, 0);
channelPut->get();
@@ -2778,7 +2880,7 @@ int main(int argc,char *argv[])
monitor->destroy();
*/
epicsThreadSleep ( 100.0 );
epicsThreadSleep ( 20.0 );
channel->destroy();
context->destroy();