process, getField implementation

This commit is contained in:
Matej Sekoranja
2011-01-19 20:01:56 +01:00
parent 4c49c498c9
commit ff25642cd1
4 changed files with 320 additions and 143 deletions

View File

@@ -221,135 +221,102 @@ public:
PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelProcess);
PVDATA_REFCOUNT_MONITOR_DEFINE(channelProcess);
class ChannelImplProcess : public ChannelProcess
class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
{
private:
ChannelProcessRequester* m_channelProcessRequester;
PVStructure* m_pvStructure;
PVScalar* m_valueField;
ChannelProcessRequester* m_callback;
volatile bool m_initialized;
PVStructure* m_pvRequest;
private:
~ChannelImplProcess()
~ChannelProcessRequestImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockChannelProcess);
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelProcess);
}
public:
ChannelImplProcess(ChannelProcessRequester* channelProcessRequester, PVStructure *pvStructure, PVStructure *pvRequest) :
m_channelProcessRequester(channelProcessRequester), m_pvStructure(pvStructure)
ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) :
BaseRequestImpl(channel, callback),
m_callback(callback), m_initialized(false), m_pvRequest(pvRequest)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockChannelProcess);
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess);
PVField* field = pvStructure->getSubField(String("value"));
if (field == 0)
{
Status* noValueFieldStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "no 'value' field");
m_channelProcessRequester->channelProcessConnect(noValueFieldStatus, this);
delete noValueFieldStatus;
// TODO best-effort support
// NOTE client must destroy this instance...
// do not access any fields and return ASAP
return;
}
if (field->getField()->getType() != scalar)
{
Status* notAScalarStatus = getStatusCreate()->createStatus(STATUSTYPE_ERROR, "'value' field not scalar type");
m_channelProcessRequester->channelProcessConnect(notAScalarStatus, this);
delete notAScalarStatus;
// NOTE client must destroy this instance….
// do not access any fields and return ASAP
return;
}
m_valueField = static_cast<PVScalar*>(field);
// TODO pvRequest
m_channelProcessRequester->channelProcessConnect(g_statusOK, this);
// subscribe
// try {
resubscribeSubscription(channel->checkAndGetTransport());
/* } catch (IllegalStateException ise) {
callback.channelProcessConnect(channelNotConnected, null);
} catch (CAException e) {
callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), null);
}*/
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int32 pendingRequest = getPendingRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
return;
}
control->startMessage((int8)16, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
if (pendingRequest & QOS_INIT)
{
// pvRequest
m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest);
}
stopRequest();
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_callback->processDone(status);
return true;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_callback->channelProcessConnect(status, this);
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_callback->processDone(status);
return true;
}
virtual void process(bool lastRequest)
{
switch (m_valueField->getScalar()->getScalarType())
{
case pvBoolean:
{
// negate
PVBoolean *pvBoolean = static_cast<PVBoolean*>(m_valueField);
pvBoolean->put(!pvBoolean->get());
break;
}
case pvByte:
{
// increment by one
PVByte *pvByte = static_cast<PVByte*>(m_valueField);
pvByte->put(pvByte->get() + 1);
break;
}
case pvShort:
{
// increment by one
PVShort *pvShort = static_cast<PVShort*>(m_valueField);
pvShort->put(pvShort->get() + 1);
break;
}
case pvInt:
{
// increment by one
PVInt *pvInt = static_cast<PVInt*>(m_valueField);
pvInt->put(pvInt->get() + 1);
break;
}
case pvLong:
{
// increment by one
PVLong *pvLong = static_cast<PVLong*>(m_valueField);
pvLong->put(pvLong->get() + 1);
break;
}
case pvFloat:
{
// increment by one
PVFloat *pvFloat = static_cast<PVFloat*>(m_valueField);
pvFloat->put(pvFloat->get() + 1.0f);
break;
}
case pvDouble:
{
// increment by one
PVDouble *pvDouble = static_cast<PVDouble*>(m_valueField);
pvDouble->put(pvDouble->get() + 1.0);
break;
}
case pvString:
{
// increment by one
PVString *pvString = static_cast<PVString*>(m_valueField);
String val = pvString->get();
if (val.empty())
pvString->put("gen0");
else
{
char c = val[0];
c++;
pvString->put("gen" + c);
}
break;
}
default:
// noop
break;
}
m_channelProcessRequester->processDone(g_statusOK);
if (lastRequest)
destroy();
// TODO sync
if (m_destroyed) {
m_callback->processDone(destroyedStatus);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_callback->processDone(otherRequestPendingStatus);
return;
}
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
// m_callback->processDone(channelNotConnected);
//}
}
virtual void resubscribeSubscription(Transport* transport) {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
delete this;
@@ -362,12 +329,12 @@ class ChannelImplProcess : public ChannelProcess
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGet);
class ChannelImplGet : public BaseRequestImpl, public ChannelGet
class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
{
private:
ChannelImpl* m_channel;
ChannelGetRequester* m_channelGetRequester;
PVStructure* m_pvRequest;
@@ -376,15 +343,15 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet
BitSet* m_bitSet;
private:
~ChannelImplGet()
~ChannelGetImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGet);
}
public:
ChannelImplGet(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) :
ChannelGetImpl(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, channelGetRequester),
m_channel(channel), m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest
m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest
m_data(0), m_bitSet(0)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet);
@@ -491,7 +458,9 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet
virtual void destroy()
{
// delete m_bitSet;
// TODO sync
if (m_data) delete m_data;
if (m_bitSet) delete m_bitSet;
delete this;
}
@@ -502,6 +471,139 @@ class ChannelImplGet : public BaseRequestImpl, public ChannelGet
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField);
class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
{
private:
ChannelImpl* m_channel;
ClientContextImpl* m_context;
pvAccessID m_ioid;
GetFieldRequester* m_callback;
String m_subField;
Mutex m_mutex;
bool m_destroyed;
private:
~ChannelGetFieldRequestImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelGetField);
}
public:
ChannelGetFieldRequestImpl(ChannelImpl* channel, GetFieldRequester* callback, String subField) :
m_channel(channel), m_context(channel->getContext()),
m_callback(callback), m_subField(subField),
m_destroyed(false)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGetField);
// register response request
m_ioid = m_context->registerResponseRequest(this);
channel->registerResponseRequest(this);
// TODO
// enqueue send request
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
// callback.getDone(BaseRequestImpl.channelNotConnected, null);
//}
}
Requester* getRequester() {
return m_callback;
}
pvAccessID getIOID() {
return m_ioid;
}
virtual void lock() {
// noop
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
control->startMessage((int8)17, 8);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
SerializeHelper::serializeString(m_subField, buffer, control);
}
virtual void cancel() {
destroy();
// TODO notify?
}
virtual void timeout() {
cancel();
}
void reportStatus(Status* status) {
// destroy, since channel (parent) was destroyed
if (status == ChannelImpl::channelDestroyed)
destroy();
// TODO notify?
}
virtual void unlock() {
// noop
}
virtual void destroy()
{
{
Lock guard(&m_mutex);
if (m_destroyed)
return;
m_destroyed = true;
}
// unregister response request
m_context->unregisterResponseRequest(this);
m_channel->unregisterResponseRequest(this);
delete this;
}
virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) {
// TODO?
// try
// {
Status* status = statusCreate->deserializeStatus(payloadBuffer, transport);
if (status->isSuccess())
{
// deserialize Field...
const Field* field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport);
m_callback->getDone(status, field);
}
else
{
m_callback->getDone(status, 0);
}
// TODO
if (status != okStatus)
delete status;
// } // TODO guard callback
// finally
// {
// always cancel request
// cancel();
// }
}
};
PVDATA_REFCOUNT_MONITOR_DEFINE(mockChannelPut);
@@ -1223,9 +1325,6 @@ class TestChannelImpl : public ChannelImpl {
*/
bool m_issueCreateMessage;
// TODO mock
PVStructure* m_pvStructure;
private:
~TestChannelImpl()
{
@@ -1273,7 +1372,6 @@ class TestChannelImpl : public ChannelImpl {
virtual void destroy()
{
if (m_addresses) delete m_addresses;
delete m_pvStructure;
delete this;
};
@@ -1765,28 +1863,28 @@ class TestChannelImpl : public ChannelImpl {
virtual void getField(GetFieldRequester *requester,epics::pvData::String subField)
{
requester->getDone(g_statusOK,m_pvStructure->getSubField(subField)->getField());
new ChannelGetFieldRequestImpl(this, requester, subField);
}
virtual ChannelProcess* createChannelProcess(
ChannelProcessRequester *channelProcessRequester,
epics::pvData::PVStructure *pvRequest)
{
return new ChannelImplProcess(channelProcessRequester, m_pvStructure, pvRequest);
return new ChannelProcessRequestImpl(this, channelProcessRequester, pvRequest);
}
virtual ChannelGet* createChannelGet(
ChannelGetRequester *channelGetRequester,
epics::pvData::PVStructure *pvRequest)
{
return new ChannelImplGet(this, channelGetRequester, pvRequest);
return new ChannelGetImpl(this, channelGetRequester, pvRequest);
}
virtual ChannelPut* createChannelPut(
ChannelPutRequester *channelPutRequester,
epics::pvData::PVStructure *pvRequest)
{
return new ChannelImplPut(channelPutRequester, m_pvStructure, pvRequest);
return new ChannelImplPut(channelPutRequester, 0, pvRequest);
}
virtual ChannelPutGet* createChannelPutGet(
@@ -1808,7 +1906,7 @@ class TestChannelImpl : public ChannelImpl {
epics::pvData::MonitorRequester *monitorRequester,
epics::pvData::PVStructure *pvRequest)
{
return new MockMonitor(monitorRequester, m_pvStructure, pvRequest);
return new MockMonitor(monitorRequester, 0, pvRequest);
}
virtual ChannelArray* createChannelArray(
@@ -2114,6 +2212,7 @@ class TestChannelImpl : public ChannelImpl {
listenLocalAddress, CA_MINOR_PROTOCOL_REVISION,
CA_DEFAULT_PRIORITY);
BlockingUDPConnector* searchConnector = new BlockingUDPConnector(false, broadcastAddresses, true);
// undefined address
@@ -2845,12 +2944,21 @@ int main(int argc,char *argv[])
ChannelRequesterImpl channelRequester;
Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester);
channel->printInfo();
//GetFieldRequesterImpl getFieldRequesterImpl;
//channel->getField(&getFieldRequesterImpl, "timeStamp.secondsPastEpoch");
epicsThreadSleep ( 3.0 );
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "");
epicsThreadSleep ( 1.0 );
ChannelProcessRequesterImpl channelProcessRequester;
ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0);
channelProcess->process(false);
channelProcess->destroy();
epicsThreadSleep ( 1.0 );
ChannelGetRequesterImpl channelGetRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("field(timeStamp,value)",&channelGetRequesterImpl);
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest);
@@ -2875,11 +2983,6 @@ int main(int argc,char *argv[])
delete status;
ChannelProcessRequesterImpl channelProcessRequester;
ChannelProcess* channelProcess = channel->createChannelProcess(&channelProcessRequester, 0);
channelProcess->process(false);
channelProcess->destroy();
status = monitor->stop();
std::cout << "monitor->stop() = " << status->toString() << std::endl;