putGet impl

This commit is contained in:
Matej Sekoranja
2011-01-25 23:43:07 +01:00
parent 228ed56858
commit c1c391854c

View File

@@ -400,7 +400,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
// data available
if (qos & QOS_GET)
normalResponse(transport, version, payloadBuffer, qos, status);
return normalResponse(transport, version, payloadBuffer, qos, status);
return true;
}
@@ -660,6 +660,220 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
PVDATA_REFCOUNT_MONITOR_DEFINE(channelPutGet);
class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
{
private:
ChannelPutGetRequester* m_channelPutGetRequester;
PVStructure* m_pvRequest;
PVStructure* m_putData;
PVStructure* m_getData;
private:
~ChannelPutGetImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelPutGet);
}
public:
ChannelPutGetImpl(ChannelImpl* channel, ChannelPutGetRequester* channelPutGetRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, channelPutGetRequester),
m_channelPutGetRequester(channelPutGetRequester), m_pvRequest(pvRequest),
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest))),
m_putData(0), m_getData(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPutGet);
// subscribe
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, null, null, null);
// } catch (CAException caex) {
// TODO m_channelPutGetRequester->channelPutGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// }
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int32 pendingRequest = getPendingRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
return;
}
control->startMessage((int8)12, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
if ((pendingRequest & QOS_INIT) == 0)
buffer->putByte((int8)pendingRequest);
if (pendingRequest & QOS_INIT)
{
buffer->putByte((int8)QOS_INIT);
// pvRequest
m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest);
}
else if (pendingRequest & (QOS_GET | QOS_GET_PUT)) {
// noop
}
else
{
m_putData->serialize(buffer, control);
}
stopRequest();
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
// data available
// TODO we need a flag here...
return normalResponse(transport, version, payloadBuffer, qos, status);
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelPutGetRequester->channelPutGetConnect(status, this, 0, 0);
return true;
}
IntrospectionRegistry* registry = transport->getIntrospectionRegistry();
m_putData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport);
m_getData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport);
// notify
m_channelPutGetRequester->channelPutGetConnect(okStatus, this, m_putData, m_getData);
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (qos & QOS_GET)
{
if (!status->isSuccess())
{
m_channelPutGetRequester->getGetDone(status);
return true;
}
// deserialize get data
m_getData->deserialize(payloadBuffer, transport);
m_channelPutGetRequester->getGetDone(status);
return true;
}
else if (qos & QOS_GET_PUT)
{
if (!status->isSuccess())
{
m_channelPutGetRequester->getPutDone(status);
return true;
}
// deserialize put data
m_putData->deserialize(payloadBuffer, transport);
m_channelPutGetRequester->getPutDone(status);
return true;
}
else
{
if (!status->isSuccess())
{
m_channelPutGetRequester->putGetDone(status);
return true;
}
// deserialize data
m_getData->deserialize(payloadBuffer, transport);
m_channelPutGetRequester->putGetDone(status);
return true;
}
}
virtual void putGet(bool lastRequest) {
if (m_destroyed) {
m_channelPutGetRequester->putGetDone(destroyedStatus);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_channelPutGetRequester->putGetDone(otherRequestPendingStatus);
return;
}
// try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
// } catch (IllegalStateException ise) {
// m_channelPutGetRequester->putGetDone(channelNotConnected);
// }
}
virtual void getGet() {
if (m_destroyed) {
m_channelPutGetRequester->getGetDone(destroyedStatus);
return;
}
if (!startRequest(QOS_GET)) {
m_channelPutGetRequester->getGetDone(otherRequestPendingStatus);
return;
}
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelPutGetRequester->getGetDone(channelNotConnected);
//}
}
virtual void getPut() {
if (m_destroyed) {
m_channelPutGetRequester->getPutDone(destroyedStatus);
return;
}
if (!startRequest(QOS_GET_PUT)) {
m_channelPutGetRequester->getPutDone(otherRequestPendingStatus);
return;
}
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelPutGetRequester->getPutDone(channelNotConnected);
//}
}
virtual void resubscribeSubscription(Transport* transport) {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_putData) delete m_putData;
if (m_getData) delete m_getData;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField);
@@ -882,8 +1096,7 @@ public MonitorElement
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
// data available
// TODO if (qos & QOS_GET)
normalResponse(transport, version, payloadBuffer, qos, status);
return true;
return normalResponse(transport, version, payloadBuffer, qos, status);
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
@@ -2215,8 +2428,7 @@ class TestChannelImpl : public ChannelImpl {
ChannelPutGetRequester *channelPutGetRequester,
epics::pvData::PVStructure *pvRequest)
{
// TODO return new ChannelPutGetImpl(this, channelPutGetRequester, pvRequest);
return 0;
return new ChannelPutGetImpl(this, channelPutGetRequester, pvRequest);
}
virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester,
@@ -3180,6 +3392,83 @@ class ChannelPutRequesterImpl : public ChannelPutRequester
};
class ChannelPutGetRequesterImpl : public ChannelPutGetRequester
{
ChannelPutGet *m_channelPutGet;
epics::pvData::PVStructure *m_putData;
epics::pvData::PVStructure *m_getData;
virtual String getRequesterName()
{
return "ChannelGetPutRequesterImpl";
};
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void channelPutGetConnect(epics::pvData::Status *status,ChannelPutGet *channelPutGet,
epics::pvData::PVStructure *putData,epics::pvData::PVStructure *getData)
{
std::cout << "channelGetPutConnect(" << status->toString() << ")" << std::endl;
// TODO sync
m_putData = putData;
m_getData = getData;
if (m_putData)
{
String str;
m_putData->toString(&str);
std::cout << str;
std::cout << std::endl;
}
if (m_getData)
{
String str;
m_getData->toString(&str);
std::cout << str;
std::cout << std::endl;
}
}
virtual void getGetDone(epics::pvData::Status *status)
{
std::cout << "getGetDone(" << status->toString() << ")" << std::endl;
if (m_getData)
{
String str;
m_getData->toString(&str);
std::cout << str;
std::cout << std::endl;
}
}
virtual void getPutDone(epics::pvData::Status *status)
{
std::cout << "getPutDone(" << status->toString() << ")" << std::endl;
if (m_putData)
{
String str;
m_putData->toString(&str);
std::cout << str;
std::cout << std::endl;
}
}
virtual void putGetDone(epics::pvData::Status *status)
{
std::cout << "putGetDone(" << status->toString() << ")" << std::endl;
if (m_putData)
{
String str;
m_putData->toString(&str);
std::cout << str;
std::cout << std::endl;
}
}
};
class MonitorRequesterImpl : public MonitorRequester
{
@@ -3291,7 +3580,7 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
ChannelGetRequesterImpl channelGetRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelGetRequesterImpl);
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl);
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest);
epicsThreadSleep ( 3.0 );
channelGet->get(false);
@@ -3301,7 +3590,7 @@ int main(int argc,char *argv[])
ChannelPutRequesterImpl channelPutRequesterImpl;
pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelPutRequesterImpl);
pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelPutRequesterImpl);
ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
channelPut->get();
@@ -3310,10 +3599,22 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
channelPut->destroy();
*/
ChannelPutGetRequesterImpl channelPutGetRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("putField(value,timeStamp)getField(timeStamp)",&channelPutGetRequesterImpl);
ChannelPutGet* channelPutGet = channel->createChannelPutGet(&channelPutGetRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
channelPutGet->getGet();
epicsThreadSleep ( 1.0 );
channelPutGet->getPut();
epicsThreadSleep ( 1.0 );
channelPutGet->putGet(false);
epicsThreadSleep ( 1.0 );
channelPutGet->destroy();
/*
MonitorRequesterImpl monitorRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&monitorRequesterImpl);
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl);
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest);
epicsThreadSleep( 1.0 );
@@ -3331,7 +3632,7 @@ int main(int argc,char *argv[])
monitor->destroy();
*/
epicsThreadSleep ( 3.0 );
printf("Destroying channel... \n");
channel->destroy();