channelArray done.

This commit is contained in:
Matej Sekoranja
2011-01-26 12:31:41 +01:00
parent 7d650f3d68
commit 4bb231f22c

View File

@@ -243,7 +243,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess);
// TODO check for nulls!!!!
// TODO check for 0s!!!!
// TODO best-effort support
@@ -251,9 +251,9 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
// try {
resubscribeSubscription(channel->checkAndGetTransport());
/* } catch (IllegalStateException ise) {
callback.channelProcessConnect(channelNotConnected, null);
callback.channelProcessConnect(channelNotConnected, 0);
} catch (CAException e) {
callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), null);
callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), 0);
}*/
}
@@ -368,9 +368,9 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelGetRequester->channelGetConnect(channelNotConnected, null, null, null);
// TODO m_channelGetRequester->channelGetConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelGetRequester->channelGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// TODO m_channelGetRequester->channelGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
}
@@ -517,9 +517,9 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, null, null, null);
// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
}
@@ -691,9 +691,9 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, null, null, null);
// TODO m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelPutGetRequester->channelPutGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// TODO m_channelPutGetRequester->channelPutGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
}
@@ -909,9 +909,9 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelRPCRequester->channelRPCConnect(channelNotConnected, null, null, null);
// TODO m_channelRPCRequester->channelRPCConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelRPCRequester->channelRPCConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null);
// TODO m_channelRPCRequester->channelRPCConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
}
@@ -1026,6 +1026,227 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
PVDATA_REFCOUNT_MONITOR_DEFINE(channelArray);
class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray
{
private:
ChannelArrayRequester* m_channelArrayRequester;
PVStructure* m_pvRequest;
PVArray* m_data;
int32 m_offset;
int32 m_count;
int32 m_length;
int32 m_capacity;
private:
~ChannelArrayImpl()
{
PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelArray);
}
public:
ChannelArrayImpl(ChannelImpl* channel, ChannelArrayRequester* channelArrayRequester, PVStructure *pvRequest) :
BaseRequestImpl(channel, channelArrayRequester),
m_channelArrayRequester(channelArrayRequester), m_pvRequest(pvRequest),
//(dynamic_cast<PVStructure*>(getPVDataCreate()->createPVField(0, "", pvRequest))),
m_data(0), m_offset(0), m_count(0), m_length(-1), m_capacity(-1)
{
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelArray);
// subscribe
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelArrayRequester->channelArrayConnect(channelNotConnected, 0, 0);
// } catch (CAException caex) {
// TODO m_channelArrayRequester->channelArrayConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0);
// }
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
int32 pendingRequest = getPendingRequest();
if (pendingRequest < 0)
{
BaseRequestImpl::send(buffer, control);
return;
}
control->startMessage((int8)14, 9);
buffer->putInt(m_channel->getServerChannelID());
buffer->putInt(m_ioid);
buffer->putByte((int8)m_pendingRequest);
if (pendingRequest & QOS_INIT)
{
// pvRequest
m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest);
}
else if (pendingRequest & QOS_GET)
{
SerializeHelper::writeSize(m_offset, buffer, control);
SerializeHelper::writeSize(m_count, buffer, control);
}
else if (pendingRequest & QOS_GET_PUT) // i.e. setLength
{
SerializeHelper::writeSize(m_length, buffer, control);
SerializeHelper::writeSize(m_capacity, buffer, control);
}
// put
else
{
SerializeHelper::writeSize(m_offset, buffer, control);
m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?!
}
stopRequest();
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
// data available (get with destroy)
if (qos & QOS_GET)
return normalResponse(transport, version, payloadBuffer, qos, status);
return true;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelArrayRequester->channelArrayConnect(status, this, 0);
return true;
}
// create data and its bitSet
FieldConstPtr field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport);
m_data = dynamic_cast<PVArray*>(getPVDataCreate()->createPVField(0, field));
// notify
m_channelArrayRequester->channelArrayConnect(okStatus, this, m_data);
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (qos & QOS_GET)
{
if (!status->isSuccess())
{
m_channelArrayRequester->getArrayDone(status);
return true;
}
m_data->deserialize(payloadBuffer, transport);
m_channelArrayRequester->getArrayDone(okStatus);
return true;
}
else if (qos & QOS_GET_PUT)
{
m_channelArrayRequester->setLengthDone(status);
return true;
}
else
{
m_channelArrayRequester->putArrayDone(status);
return true;
}
}
virtual void getArray(bool lastRequest, int offset, int count) {
// TODO sync?
if (m_destroyed) {
m_channelArrayRequester->getArrayDone(destroyedStatus);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_GET)) {
m_channelArrayRequester->getArrayDone(otherRequestPendingStatus);
return;
}
//try {
m_offset = offset;
m_count = count;
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelArrayRequester->getArrayDone(channelNotConnected);
//}
}
virtual void putArray(bool lastRequest, int offset, int count) {
// TODO sync?
if (m_destroyed) {
m_channelArrayRequester->putArrayDone(destroyedStatus);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_channelArrayRequester->putArrayDone(otherRequestPendingStatus);
return;
}
//try {
m_offset = offset;
m_count = count;
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelArrayRequester->putArrayDone(channelNotConnected);
//}
}
virtual void setLength(bool lastRequest, int length, int capacity) {
// TODO sync?
if (m_destroyed) {
m_channelArrayRequester->setLengthDone(destroyedStatus);
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) {
m_channelArrayRequester->setLengthDone(otherRequestPendingStatus);
return;
}
//try {
m_length = length;
m_capacity = capacity;
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelArrayRequester->setLengthDone(channelNotConnected);
//}
}
virtual void resubscribeSubscription(Transport* transport) {
startRequest(QOS_INIT);
transport->enqueueSendRequest(this);
}
virtual void destroy()
{
BaseRequestImpl::destroy();
// TODO sync
if (m_data) delete m_data;
if (m_pvRequest) delete m_pvRequest;
delete this;
}
};
PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField);
@@ -1065,7 +1286,7 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
//try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
// callback.getDone(BaseRequestImpl.channelNotConnected, null);
// callback.getDone(BaseRequestImpl.channelNotConnected, 0);
//}
}
@@ -1215,9 +1436,9 @@ public MonitorElement
// try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_monitorRequester->monitorConnect(channelNotConnected, null, null);
// TODO m_monitorRequester->monitorConnect(channelNotConnected, 0, 0);
// } catch (CAException caex) {
// TODO m_monitorRequester->monitorConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null);
// TODO m_monitorRequester->monitorConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0);
// }
}
@@ -2077,7 +2298,7 @@ class TestChannelImpl : public ChannelImpl {
return m_context->getProvider();
}
// NOTE: synchronization guarantees that <code>transport</code> is non-<code>null</code> and <code>state == CONNECTED</code>.
// NOTE: synchronization guarantees that <code>transport</code> is non-<code>0</code> and <code>state == CONNECTED</code>.
virtual epics::pvData::String getRemoteAddress()
{
Lock guard(&m_channelMutex);
@@ -2599,8 +2820,7 @@ class TestChannelImpl : public ChannelImpl {
ChannelArrayRequester *channelArrayRequester,
epics::pvData::PVStructure *pvRequest)
{
// TODO
return 0;
return new ChannelArrayImpl(this, channelArrayRequester, pvRequest);
}
@@ -2685,7 +2905,7 @@ class TestChannelImpl : public ChannelImpl {
m_context->checkChannelName(channelName);
if (!channelFindRequester)
throw std::runtime_error("null requester");
throw std::runtime_error("0 requester");
std::auto_ptr<Status> errorStatus(getStatusCreate()->createStatus(STATUSTYPE_ERROR, "not implemented", 0));
channelFindRequester->channelFindResult(errorStatus.get(), 0, false);
@@ -2740,7 +2960,7 @@ class TestChannelImpl : public ChannelImpl {
TODO
final ConfigurationProvider configurationProvider = ConfigurationFactory.getProvider();
Configuration config = configurationProvider.getConfiguration("pvAccess-client");
if (config == null)
if (config == 0)
config = configurationProvider.getConfiguration("system");
return config;
*/
@@ -2978,7 +3198,7 @@ TODO
*/
void checkChannelName(String& name) {
if (name.empty())
throw std::runtime_error("null or empty channel name");
throw std::runtime_error("0 or empty channel name");
else if (name.length() > UNREASONABLE_CHANNEL_NAME_LENGTH)
throw std::runtime_error("name too long");
}
@@ -3080,7 +3300,7 @@ TODO
/**
* Unregister response request.
* @param request
* @return removed object, can be <code>null</code>
* @return removed object, can be <code>0</code>
*/
ResponseRequest* unregisterResponseRequest(ResponseRequest* request)
{
@@ -3172,7 +3392,7 @@ TODO
checkChannelName(name);
if (requester == 0)
throw std::runtime_error("null requester");
throw std::runtime_error("0 requester");
if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX)
throw std::range_error("priority out of bounds");
@@ -3407,7 +3627,7 @@ class ChannelRequesterImpl : public ChannelRequester
virtual void channelCreated(epics::pvData::Status* status, Channel *channel)
{
std::cout << "channelCreated(" << status->toString() << ", "
<< (channel ? channel->getChannelName() : "(null)") << ")" << std::endl;
<< (channel ? channel->getChannelName() : "(0)") << ")" << std::endl;
}
virtual void channelStateChange(Channel *c, ConnectionState connectionState)
@@ -3438,7 +3658,7 @@ class GetFieldRequesterImpl : public GetFieldRequester
std::cout << str;
}
else
std::cout << "(null)";
std::cout << "(0)";
std::cout << ")" << std::endl;
}
};
@@ -3668,6 +3888,59 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester
}
};
class ChannelArrayRequesterImpl : public ChannelArrayRequester
{
ChannelArray *m_channelArray;
epics::pvData::PVArray *m_pvArray;
virtual String getRequesterName()
{
return "ChannelArrayRequesterImpl";
};
virtual void message(String message,MessageType messageType)
{
std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl;
}
virtual void channelArrayConnect(epics::pvData::Status *status,ChannelArray *channelArray,
epics::pvData::PVArray *pvArray)
{
std::cout << "channelArrayConnect(" << status->toString() << ")" << std::endl;
if (pvArray)
{
String st;
pvArray->toString(&st);
std::cout << st << std::endl;
}
// TODO sync
m_channelArray = channelArray;
m_pvArray = pvArray;
}
virtual void getArrayDone(epics::pvData::Status *status)
{
std::cout << "getArrayDone(" << status->toString() << ")" << std::endl;
if (m_pvArray)
{
String str;
m_pvArray->toString(&str);
std::cout << str;
std::cout << std::endl;
}
}
virtual void putArrayDone(epics::pvData::Status *status)
{
std::cout << "putArrayDone(" << status->toString() << ")" << std::endl;
}
virtual void setLengthDone(epics::pvData::Status *status)
{
std::cout << "setLengthDone(" << status->toString() << ")" << std::endl;
}
};
class MonitorRequesterImpl : public MonitorRequester
{
@@ -3755,10 +4028,12 @@ int main(int argc,char *argv[])
context->printInfo();
epicsThreadSleep ( 1.0 );
//ChannelFindRequesterImpl findRequester;
//context->getProvider()->channelFind("something", &findRequester);
/*
ChannelFindRequesterImpl findRequester;
ChannelFind* channelFind = context->getProvider()->channelFind("something", &findRequester);
epicsThreadSleep ( 1.0 );
channelFind->destroy();
*/
ChannelRequesterImpl channelRequester;
Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester);
@@ -3767,7 +4042,7 @@ int main(int argc,char *argv[])
channel->printInfo();
PVStructure* pvRequest;
/*
GetFieldRequesterImpl getFieldRequesterImpl;
channel->getField(&getFieldRequesterImpl, "");
epicsThreadSleep ( 1.0 );
@@ -3811,7 +4086,7 @@ int main(int argc,char *argv[])
channelPutGet->putGet(false);
epicsThreadSleep ( 1.0 );
channelPutGet->destroy();
*/
ChannelRPCRequesterImpl channelRPCRequesterImpl;
pvRequest = getCreateRequest()->createRequest("record[]field(arguments)",&channelRPCRequesterImpl);
@@ -3821,10 +4096,25 @@ int main(int argc,char *argv[])
epicsThreadSleep ( 1.0 );
channelRPC->destroy();
/*
ChannelArrayRequesterImpl channelArrayRequesterImpl;
//pvRequest = getCreateRequest()->createRequest("value",&channelArrayRequesterImpl);
pvRequest = getPVDataCreate()->createPVStructure(0, "", 0);
PVString* pvFieldName = (PVString*)getPVDataCreate()->createPVScalar(pvRequest, "field", pvString);
pvFieldName->put("value");
pvRequest->appendPVField(pvFieldName);
ChannelArray* channelArray = channel->createChannelArray(&channelArrayRequesterImpl, pvRequest);
epicsThreadSleep ( 1.0 );
channelArray->getArray(false,0,-1);
epicsThreadSleep ( 1.0 );
channelArray->putArray(false,0,-1);
epicsThreadSleep ( 1.0 );
channelArray->setLength(false,3,4);
epicsThreadSleep ( 1.0 );
channelArray->destroy();
MonitorRequesterImpl monitorRequesterImpl;
PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl);
pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl);
Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest);
epicsThreadSleep( 1.0 );
@@ -3842,7 +4132,6 @@ int main(int argc,char *argv[])
monitor->destroy();
*/
epicsThreadSleep ( 3.0 );
printf("Destroying channel... \n");
channel->destroy();