channelRPC done

This commit is contained in:
Matej Sekoranja
2011-01-26 10:46:51 +01:00
parent c1c391854c
commit 7d650f3d68

View File

@@ -876,6 +876,157 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
PVDATA_REFCOUNT_MONITOR_DEFINE(channelRPC);
class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
{
private:
ChannelRPCRequester* m_channelRPCRequester;
PVStructure* m_pvRequest;
PVStructure* m_data;
BitSet* m_bitSet;
private:
~ChannelRPCImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelRPC);
}
public:
ChannelRPCImpl(ChannelImpl* channel, ChannelRPCRequester* channelRPCRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, channelRPCRequester),
m_channelRPCRequester(channelRPCRequester), m_pvRequest(pvRequest),
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest))),
m_data(0), m_bitSet(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelRPC);
// subscribe
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelRPCRequester->channelRPCConnect(channelNotConnected, null, null, null);
// } catch (CAException caex) {
// TODO m_channelRPCRequester->channelRPCConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// }
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int32 pendingRequest = getPendingRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
return;
}
control->startMessage((int8)20, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
if ((m_pendingRequest & QOS_INIT) == 0)
buffer->putByte((int8)m_pendingRequest);
if (pendingRequest & QOS_INIT)
{
buffer->putByte((int8)QOS_INIT);
// pvRequest
m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest);
}
else
{
m_bitSet->serialize(buffer, control);
m_data->serialize(buffer, control, m_bitSet);
}
stopRequest();
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
// data available
// TODO we need a flag here...
return normalResponse(transport, version, payloadBuffer, qos, status);
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelRPCRequester->channelRPCConnect(status, this, 0, 0);
return true;
}
// create data and its bitSet
m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport);
m_bitSet = new BitSet(m_data->getNumberFields());
// notify
m_channelRPCRequester->channelRPCConnect(okStatus, this, m_data, m_bitSet);
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelRPCRequester->requestDone(status, 0);
return true;
}
auto_ptr<PVStructure> response(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport));
m_channelRPCRequester->requestDone(okStatus, response.get());
return true;
}
virtual void request(bool lastRequest) {
// TODO sync?
if (m_destroyed) {
m_channelRPCRequester->requestDone(destroyedStatus, 0);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_channelRPCRequester->requestDone(otherRequestPendingStatus, 0);
return;
}
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelRPCRequester->requestDone(channelNotConnected, 0);
//}
}
virtual void resubscribeSubscription(Transport* transport) {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField);
// NOTE: this instance is not returned as Request, so it must self-destruct
@@ -2434,8 +2585,7 @@ class TestChannelImpl : public ChannelImpl {
virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester,
epics::pvData::PVStructure *pvRequest)
{
// TODO
return 0;
return new ChannelRPCImpl(this, channelRPCRequester, pvRequest);
}
virtual epics::pvData::Monitor* createMonitor(
@@ -3413,6 +3563,7 @@ class ChannelPutGetRequesterImpl : public ChannelPutGetRequester
{
std::cout << "channelGetPutConnect(" << status->toString() << ")" << std::endl;
// TODO sync
m_channelPutGet = channelPutGet;
m_putData = putData;
m_getData = getData;
@@ -3470,6 +3621,54 @@ class ChannelPutGetRequesterImpl : public ChannelPutGetRequester
};
class ChannelRPCRequesterImpl : public ChannelRPCRequester
{
ChannelRPC *m_channelRPC;
epics::pvData::PVStructure *m_pvStructure;
epics::pvData::BitSet *m_bitSet;
virtual String getRequesterName()
{
return "ChannelRPCRequesterImpl";
};
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void channelRPCConnect(epics::pvData::Status *status,ChannelRPC *channelRPC,
epics::pvData::PVStructure *pvStructure,epics::pvData::BitSet *bitSet)
{
std::cout << "channelRPCConnect(" << status->toString() << ")" << std::endl;
if (pvStructure)
{
String st;
pvStructure->toString(&st);
std::cout << st << std::endl;
}
// TODO sync
m_channelRPC = channelRPC;
m_pvStructure = pvStructure;
m_bitSet = bitSet;
}
virtual void requestDone(epics::pvData::Status *status,epics::pvData::PVStructure *pvResponse)
{
std::cout << "requestDone(" << status->toString() << ")" << std::endl;
if (pvResponse)
{
String str;
pvResponse->toString(&str);
std::cout << str;
std::cout << std::endl;
}
}
};
class MonitorRequesterImpl : public MonitorRequester
{
virtual String getRequesterName()
@@ -3566,6 +3765,8 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
channel->printInfo();
PVStructure* pvRequest;
/*
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "");
@@ -3580,7 +3781,7 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
ChannelGetRequesterImpl channelGetRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl);
pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&channelGetRequesterImpl);
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest);
epicsThreadSleep ( 3.0 );
channelGet->get(false);
@@ -3598,9 +3799,9 @@ int main(int argc,char *argv[])
channelPut->put(false);
epicsThreadSleep ( 1.0 );
channelPut->destroy();
*/
ChannelPutGetRequesterImpl channelPutGetRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("putField(value,timeStamp)getField(timeStamp)",&channelPutGetRequesterImpl);
pvRequest = getCreateRequest()->createRequest("putField(value,timeStamp)getField(timeStamp)",&channelPutGetRequesterImpl);
ChannelPutGet* channelPutGet = channel->createChannelPutGet(&channelPutGetRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
channelPutGet->getGet();
@@ -3610,8 +3811,17 @@ int main(int argc,char *argv[])
channelPutGet->putGet(false);
epicsThreadSleep ( 1.0 );
channelPutGet->destroy();
/*
*/
ChannelRPCRequesterImpl channelRPCRequesterImpl;
pvRequest = getCreateRequest()->createRequest("record[]field(arguments)",&channelRPCRequesterImpl);
ChannelRPC* channelRPC = channel->createChannelRPC(&channelRPCRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
channelRPC->request(false);
epicsThreadSleep ( 1.0 );
channelRPC->destroy();
/*
MonitorRequesterImpl monitorRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl);