diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index c3d419b..6b395b9 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -243,7 +243,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess); - // TODO check for nulls!!!! + // TODO check for 0s!!!! // TODO best-effort support @@ -251,9 +251,9 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess // try { resubscribeSubscription(channel->checkAndGetTransport()); /* } catch (IllegalStateException ise) { - callback.channelProcessConnect(channelNotConnected, null); + callback.channelProcessConnect(channelNotConnected, 0); } catch (CAException e) { - callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), null); + callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), 0); }*/ } @@ -368,9 +368,9 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet // try { resubscribeSubscription(m_channel->checkAndGetTransport()); // } catch (IllegalStateException ise) { -// TODO m_channelGetRequester->channelGetConnect(channelNotConnected, null, null, null); +// TODO m_channelGetRequester->channelGetConnect(channelNotConnected, 0, 0, 0); // } catch (CAException caex) { -// TODO m_channelGetRequester->channelGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// TODO m_channelGetRequester->channelGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0); // } } @@ -517,9 +517,9 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut // try { resubscribeSubscription(m_channel->checkAndGetTransport()); // } catch (IllegalStateException ise) { -// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, null, null, null); +// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, 0, 0, 0); // } catch (CAException caex) { -// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0); // } } @@ -691,9 +691,9 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet // try { resubscribeSubscription(m_channel->checkAndGetTransport()); // } catch (IllegalStateException ise) { -// TODO m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, null, null, null); +// TODO m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, 0, 0, 0); // } catch (CAException caex) { -// TODO m_channelPutGetRequester->channelPutGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// TODO m_channelPutGetRequester->channelPutGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0); // } } @@ -909,9 +909,9 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC // try { resubscribeSubscription(m_channel->checkAndGetTransport()); // } catch (IllegalStateException ise) { -// TODO m_channelRPCRequester->channelRPCConnect(channelNotConnected, null, null, null); +// TODO m_channelRPCRequester->channelRPCConnect(channelNotConnected, 0, 0, 0); // } catch (CAException caex) { -// TODO m_channelRPCRequester->channelRPCConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null, null); +// TODO m_channelRPCRequester->channelRPCConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0); // } } @@ -1026,6 +1026,227 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC +PVDATA_REFCOUNT_MONITOR_DEFINE(channelArray); + +class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray +{ + private: + ChannelArrayRequester* m_channelArrayRequester; + + PVStructure* m_pvRequest; + + PVArray* m_data; + + int32 m_offset; + int32 m_count; + + int32 m_length; + int32 m_capacity; + + private: + ~ChannelArrayImpl() + { + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelArray); + } + + public: + ChannelArrayImpl(ChannelImpl* channel, ChannelArrayRequester* channelArrayRequester, PVStructure *pvRequest) : + BaseRequestImpl(channel, channelArrayRequester), + m_channelArrayRequester(channelArrayRequester), m_pvRequest(pvRequest), + //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))), + m_data(0), m_offset(0), m_count(0), m_length(-1), m_capacity(-1) + { + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelArray); + + // subscribe +// try { + resubscribeSubscription(m_channel->checkAndGetTransport()); +// } catch (IllegalStateException ise) { +// TODO m_channelArrayRequester->channelArrayConnect(channelNotConnected, 0, 0); +// } catch (CAException caex) { +// TODO m_channelArrayRequester->channelArrayConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0); +// } + + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)14, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + else if (pendingRequest & QOS_GET) + { + SerializeHelper::writeSize(m_offset, buffer, control); + SerializeHelper::writeSize(m_count, buffer, control); + } + else if (pendingRequest & QOS_GET_PUT) // i.e. setLength + { + SerializeHelper::writeSize(m_length, buffer, control); + SerializeHelper::writeSize(m_capacity, buffer, control); + } + // put + else + { + SerializeHelper::writeSize(m_offset, buffer, control); + m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?! + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + // data available (get with destroy) + if (qos & QOS_GET) + return normalResponse(transport, version, payloadBuffer, qos, status); + return true; + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_channelArrayRequester->channelArrayConnect(status, this, 0); + return true; + } + + // create data and its bitSet + FieldConstPtr field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport); + m_data = dynamic_cast(getPVDataCreate()->createPVField(0, field)); + + // notify + m_channelArrayRequester->channelArrayConnect(okStatus, this, m_data); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (qos & QOS_GET) + { + if (!status->isSuccess()) + { + m_channelArrayRequester->getArrayDone(status); + return true; + } + + m_data->deserialize(payloadBuffer, transport); + m_channelArrayRequester->getArrayDone(okStatus); + return true; + } + else if (qos & QOS_GET_PUT) + { + m_channelArrayRequester->setLengthDone(status); + return true; + } + else + { + m_channelArrayRequester->putArrayDone(status); + return true; + } + } + + + virtual void getArray(bool lastRequest, int offset, int count) { + // TODO sync? + + if (m_destroyed) { + m_channelArrayRequester->getArrayDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_GET)) { + m_channelArrayRequester->getArrayDone(otherRequestPendingStatus); + return; + } + + //try { + m_offset = offset; + m_count = count; + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelArrayRequester->getArrayDone(channelNotConnected); + //} + } + + virtual void putArray(bool lastRequest, int offset, int count) { + // TODO sync? + + if (m_destroyed) { + m_channelArrayRequester->putArrayDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + m_channelArrayRequester->putArrayDone(otherRequestPendingStatus); + return; + } + + //try { + m_offset = offset; + m_count = count; + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelArrayRequester->putArrayDone(channelNotConnected); + //} + } + + virtual void setLength(bool lastRequest, int length, int capacity) { + // TODO sync? + + if (m_destroyed) { + m_channelArrayRequester->setLengthDone(destroyedStatus); + return; + } + + if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { + m_channelArrayRequester->setLengthDone(otherRequestPendingStatus); + return; + } + + //try { + m_length = length; + m_capacity = capacity; + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + //} catch (IllegalStateException ise) { + //TODO // m_channelArrayRequester->setLengthDone(channelNotConnected); + //} + } + + + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + } + + + virtual void destroy() + { + BaseRequestImpl::destroy(); + // TODO sync + if (m_data) delete m_data; + if (m_pvRequest) delete m_pvRequest; + delete this; + } + +}; + + + + + + + + PVDATA_REFCOUNT_MONITOR_DEFINE(channelGetField); @@ -1065,7 +1286,7 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender //try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); //} catch (IllegalStateException ise) { - // callback.getDone(BaseRequestImpl.channelNotConnected, null); + // callback.getDone(BaseRequestImpl.channelNotConnected, 0); //} } @@ -1215,9 +1436,9 @@ public MonitorElement // try { resubscribeSubscription(m_channel->checkAndGetTransport()); // } catch (IllegalStateException ise) { -// TODO m_monitorRequester->monitorConnect(channelNotConnected, null, null); +// TODO m_monitorRequester->monitorConnect(channelNotConnected, 0, 0); // } catch (CAException caex) { -// TODO m_monitorRequester->monitorConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null); +// TODO m_monitorRequester->monitorConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0); // } } @@ -2077,7 +2298,7 @@ class TestChannelImpl : public ChannelImpl { return m_context->getProvider(); } - // NOTE: synchronization guarantees that transport is non-null and state == CONNECTED. + // NOTE: synchronization guarantees that transport is non-0 and state == CONNECTED. virtual epics::pvData::String getRemoteAddress() { Lock guard(&m_channelMutex); @@ -2599,8 +2820,7 @@ class TestChannelImpl : public ChannelImpl { ChannelArrayRequester *channelArrayRequester, epics::pvData::PVStructure *pvRequest) { - // TODO - return 0; + return new ChannelArrayImpl(this, channelArrayRequester, pvRequest); } @@ -2685,7 +2905,7 @@ class TestChannelImpl : public ChannelImpl { m_context->checkChannelName(channelName); if (!channelFindRequester) - throw std::runtime_error("null requester"); + throw std::runtime_error("0 requester"); std::auto_ptr errorStatus(getStatusCreate()->createStatus(STATUSTYPE_ERROR, "not implemented", 0)); channelFindRequester->channelFindResult(errorStatus.get(), 0, false); @@ -2740,7 +2960,7 @@ class TestChannelImpl : public ChannelImpl { TODO final ConfigurationProvider configurationProvider = ConfigurationFactory.getProvider(); Configuration config = configurationProvider.getConfiguration("pvAccess-client"); - if (config == null) + if (config == 0) config = configurationProvider.getConfiguration("system"); return config; */ @@ -2978,7 +3198,7 @@ TODO */ void checkChannelName(String& name) { if (name.empty()) - throw std::runtime_error("null or empty channel name"); + throw std::runtime_error("0 or empty channel name"); else if (name.length() > UNREASONABLE_CHANNEL_NAME_LENGTH) throw std::runtime_error("name too long"); } @@ -3080,7 +3300,7 @@ TODO /** * Unregister response request. * @param request - * @return removed object, can be null + * @return removed object, can be 0 */ ResponseRequest* unregisterResponseRequest(ResponseRequest* request) { @@ -3172,7 +3392,7 @@ TODO checkChannelName(name); if (requester == 0) - throw std::runtime_error("null requester"); + throw std::runtime_error("0 requester"); if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX) throw std::range_error("priority out of bounds"); @@ -3407,7 +3627,7 @@ class ChannelRequesterImpl : public ChannelRequester virtual void channelCreated(epics::pvData::Status* status, Channel *channel) { std::cout << "channelCreated(" << status->toString() << ", " - << (channel ? channel->getChannelName() : "(null)") << ")" << std::endl; + << (channel ? channel->getChannelName() : "(0)") << ")" << std::endl; } virtual void channelStateChange(Channel *c, ConnectionState connectionState) @@ -3438,7 +3658,7 @@ class GetFieldRequesterImpl : public GetFieldRequester std::cout << str; } else - std::cout << "(null)"; + std::cout << "(0)"; std::cout << ")" << std::endl; } }; @@ -3668,6 +3888,59 @@ class ChannelRPCRequesterImpl : public ChannelRPCRequester } }; +class ChannelArrayRequesterImpl : public ChannelArrayRequester +{ + ChannelArray *m_channelArray; + epics::pvData::PVArray *m_pvArray; + + virtual String getRequesterName() + { + return "ChannelArrayRequesterImpl"; + }; + + virtual void message(String message,MessageType messageType) + { + std::cout << "[" << getRequesterName() << "] message(" << message << ", " << messageTypeName[messageType] << ")" << std::endl; + } + + virtual void channelArrayConnect(epics::pvData::Status *status,ChannelArray *channelArray, + epics::pvData::PVArray *pvArray) + { + std::cout << "channelArrayConnect(" << status->toString() << ")" << std::endl; + if (pvArray) + { + String st; + pvArray->toString(&st); + std::cout << st << std::endl; + } + + // TODO sync + m_channelArray = channelArray; + m_pvArray = pvArray; + } + + virtual void getArrayDone(epics::pvData::Status *status) + { + std::cout << "getArrayDone(" << status->toString() << ")" << std::endl; + if (m_pvArray) + { + String str; + m_pvArray->toString(&str); + std::cout << str; + std::cout << std::endl; + } + } + + virtual void putArrayDone(epics::pvData::Status *status) + { + std::cout << "putArrayDone(" << status->toString() << ")" << std::endl; + } + + virtual void setLengthDone(epics::pvData::Status *status) + { + std::cout << "setLengthDone(" << status->toString() << ")" << std::endl; + } +}; class MonitorRequesterImpl : public MonitorRequester { @@ -3755,10 +4028,12 @@ int main(int argc,char *argv[]) context->printInfo(); epicsThreadSleep ( 1.0 ); - - //ChannelFindRequesterImpl findRequester; - //context->getProvider()->channelFind("something", &findRequester); - +/* + ChannelFindRequesterImpl findRequester; + ChannelFind* channelFind = context->getProvider()->channelFind("something", &findRequester); + epicsThreadSleep ( 1.0 ); + channelFind->destroy(); +*/ ChannelRequesterImpl channelRequester; Channel* channel = context->getProvider()->createChannel("structureArrayTest", &channelRequester); @@ -3767,7 +4042,7 @@ int main(int argc,char *argv[]) channel->printInfo(); PVStructure* pvRequest; -/* + GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); @@ -3811,7 +4086,7 @@ int main(int argc,char *argv[]) channelPutGet->putGet(false); epicsThreadSleep ( 1.0 ); channelPutGet->destroy(); -*/ + ChannelRPCRequesterImpl channelRPCRequesterImpl; pvRequest = getCreateRequest()->createRequest("record[]field(arguments)",&channelRPCRequesterImpl); @@ -3821,10 +4096,25 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelRPC->destroy(); -/* + ChannelArrayRequesterImpl channelArrayRequesterImpl; + //pvRequest = getCreateRequest()->createRequest("value",&channelArrayRequesterImpl); + pvRequest = getPVDataCreate()->createPVStructure(0, "", 0); + PVString* pvFieldName = (PVString*)getPVDataCreate()->createPVScalar(pvRequest, "field", pvString); + pvFieldName->put("value"); + pvRequest->appendPVField(pvFieldName); + + ChannelArray* channelArray = channel->createChannelArray(&channelArrayRequesterImpl, pvRequest); + epicsThreadSleep ( 1.0 ); + channelArray->getArray(false,0,-1); + epicsThreadSleep ( 1.0 ); + channelArray->putArray(false,0,-1); + epicsThreadSleep ( 1.0 ); + channelArray->setLength(false,3,4); + epicsThreadSleep ( 1.0 ); + channelArray->destroy(); MonitorRequesterImpl monitorRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl); + pvRequest = getCreateRequest()->createRequest("field(value,timeStamp)",&monitorRequesterImpl); Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest); epicsThreadSleep( 1.0 ); @@ -3842,7 +4132,6 @@ int main(int argc,char *argv[]) monitor->destroy(); -*/ epicsThreadSleep ( 3.0 ); printf("Destroying channel... \n"); channel->destroy();