diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index 1f9fe55..42adb22 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -56,7 +56,6 @@ namespace epics { } BlockingClientTCPTransport::~BlockingClientTCPTransport() { - printf("========== ~BlockingClientTCPTransport\n"); delete _introspectionRegistry; delete _timerNode; } diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 2a88afb..bd190b0 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -761,21 +761,9 @@ namespace epics { while(!_closed) { TransportSender* sender; -// TODO race! sender = extractFromSendQueue(); - printf("extraced %d\n", sender); // wait for new message - while(sender==NULL&&!_flushRequested/*&&!_closed*/) { - - - bool c; - _mutex.lock(); - c = _closed; - printf("closed %d\n", c); - _mutex.unlock(); - if (c) - break; - + while(sender==NULL&&!_flushRequested&&!_closed) { if(_flushStrategy==DELAYED) { if(_delay>0) epicsThreadSleep(_delay); if(_sendQueue->size()==0) { @@ -789,7 +777,6 @@ namespace epics { else _sendQueueEvent.wait(); sender = extractFromSendQueue(); - printf("extraced2 %d\n", sender); } // always do flush from this thread @@ -852,11 +839,10 @@ try{ } catch (...) { printf("rcvThreadRunnner exception\n"); } - printf("rcvThreadRunner done, autodelete %d-\n", obj->_autoDelete); + if(obj->_autoDelete) { while(true) { - printf("waiting send thread to exit.\n"); bool exited; obj->_mutex.lock(); exited = obj->_sendThreadExited; @@ -865,7 +851,6 @@ printf("rcvThreadRunnner exception\n"); break; epicsThreadSleep(0.1); } - printf("deleting.\n"); delete obj; } } @@ -879,7 +864,6 @@ printf("sendThreadRunnner exception\n"); } obj->freeConnectionResorces(); - printf("exited.\n"); // TODO possible crash on unlock obj->_mutex.lock(); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index 4e1a6fb..a0297f2 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -239,6 +239,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess ChannelProcessRequestImpl(ChannelImpl* channel, ChannelProcessRequester* callback, PVStructure *pvRequest) : BaseRequestImpl(channel, callback), m_callback(callback), m_pvRequest(pvRequest) + //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelProcess); @@ -321,6 +322,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess virtual void destroy() { BaseRequestImpl::destroy(); + if (m_pvRequest) delete m_pvRequest; delete this; } @@ -353,7 +355,8 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet public: ChannelGetImpl(ChannelImpl* channel, ChannelGetRequester* channelGetRequester, PVStructure *pvRequest) : BaseRequestImpl(channel, channelGetRequester), - m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), // TODO pvRequest + m_channelGetRequester(channelGetRequester), m_pvRequest(pvRequest), + //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))), m_data(0), m_bitSet(0) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet); @@ -464,6 +467,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet // TODO sync if (m_data) delete m_data; if (m_bitSet) delete m_bitSet; + if (m_pvRequest) delete m_pvRequest; delete this; } @@ -500,7 +504,8 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut public: ChannelPutImpl(ChannelImpl* channel, ChannelPutRequester* channelPutRequester, PVStructure *pvRequest) : BaseRequestImpl(channel, channelPutRequester), - m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest), // TODO pvRequest + m_channelPutRequester(channelPutRequester), m_pvRequest(pvRequest), + //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))), m_data(0), m_bitSet(0) { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPut); @@ -642,6 +647,7 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut // TODO sync if (m_data) delete m_data; if (m_bitSet) delete m_bitSet; + if (m_pvRequest) delete m_pvRequest; delete this; } @@ -800,60 +806,246 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender -PVDATA_REFCOUNT_MONITOR_DEFINE(mockMonitor); -class MockMonitor : public Monitor, public MonitorElement +PVDATA_REFCOUNT_MONITOR_DEFINE(channelMonitor); + +class ChannelMonitorImpl : public BaseRequestImpl, public Monitor, +public MonitorElement { private: MonitorRequester* m_monitorRequester; + Structure* m_structure; + bool m_started; + + PVStructure* m_pvRequest; + + // TODO temp PVStructure* m_pvStructure; BitSet* m_changedBitSet; BitSet* m_overrunBitSet; - volatile bool m_first; - Mutex* m_lock; - volatile int m_count; - + + + int m_count; + Mutex m_lock; + private: - ~MockMonitor() + ~ChannelMonitorImpl() { - PVDATA_REFCOUNT_MONITOR_DESTRUCT(mockMonitor); + PVDATA_REFCOUNT_MONITOR_DESTRUCT(channelMonitor); } public: - MockMonitor(MonitorRequester* monitorRequester, PVStructure *pvStructure, PVStructure *pvRequest) : - m_monitorRequester(monitorRequester), m_pvStructure(pvStructure), - m_changedBitSet(new BitSet(pvStructure->getNumberFields())), - m_overrunBitSet(new BitSet(pvStructure->getNumberFields())), - m_first(true), - m_lock(new Mutex()), - m_count(0) + ChannelMonitorImpl(ChannelImpl* channel, MonitorRequester* monitorRequester, PVStructure *pvRequest) : + BaseRequestImpl(channel, monitorRequester), + m_monitorRequester(monitorRequester), m_structure(0), + m_started(false), m_pvRequest(pvRequest), + //(dynamic_cast(getPVDataCreate()->createPVField(0, "", pvRequest))), + m_pvStructure(0), m_count(0) { - PVDATA_REFCOUNT_MONITOR_CONSTRUCT(mockMonitor); + PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor); - m_changedBitSet->set(0); + // TODO quques + + // subscribe +// try { + resubscribeSubscription(m_channel->checkAndGetTransport()); +// } catch (IllegalStateException ise) { +// TODO m_monitorRequester->monitorConnect(channelNotConnected, null, null); +// } catch (CAException caex) { +// TODO m_monitorRequester->monitorConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), null, null); +// } - // TODO pvRequest - m_monitorRequester->monitorConnect(g_statusOK, this, const_cast(m_pvStructure->getStructure())); } + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { + BaseRequestImpl::send(buffer, control); + return; + } + + control->startMessage((int8)13, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { + // pvRequest + m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); + } + + stopRequest(); + } + + virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + // data available + // TODO if (qos & QOS_GET) + normalResponse(transport, version, payloadBuffer, qos, status); + return true; + } + + virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (!status->isSuccess()) + { + m_monitorRequester->monitorConnect(status, this, 0); + return true; + } + + // create data and its bitSet + m_structure = const_cast(dynamic_cast(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport))); + //monitorStrategy->init(structure); + + + // TODO temp + m_pvStructure = dynamic_cast(getPVDataCreate()->createPVField(0, m_structure)); + m_changedBitSet = new BitSet(m_pvStructure->getNumberFields()); + m_overrunBitSet = new BitSet(m_pvStructure->getNumberFields()); + + + // notify + m_monitorRequester->monitorConnect(okStatus, this, m_structure); + return true; + } + + virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { + if (qos & QOS_GET) + { + // TODO not supported by IF yet... + } + else + { + // TODO + m_changedBitSet->deserialize(payloadBuffer, transport); + m_pvStructure->deserialize(payloadBuffer, transport, m_changedBitSet); + m_overrunBitSet->deserialize(payloadBuffer, transport); + m_monitorRequester->monitorEvent(this); + } + return true; + } + + virtual void resubscribeSubscription(Transport* transport) { + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); + if (m_started) + start(); + } + + + virtual void destroy() + { + BaseRequestImpl::destroy(); + // TODO sync + if (m_pvRequest) delete m_pvRequest; + // uncomment when m_pvStructure not destroyed if (m_structure) m_structure->decReferenceCount(); + + // TODO temp + if (m_pvStructure) + { + delete m_pvStructure; + delete m_overrunBitSet; + delete m_changedBitSet; + } + + delete this; + } + + + + // override, since we optimize status + virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { +// TODO? +// try +// { + transport->ensureData(1); + int8 qos = payloadBuffer->getByte(); + if (qos & QOS_INIT) + { + Status* status = statusCreate->deserializeStatus(payloadBuffer, transport); + initResponse(transport, version, payloadBuffer, qos, status); + // TODO + if (status != okStatus) + delete status; + } + else if (qos & QOS_DESTROY) + { + Status* status = statusCreate->deserializeStatus(payloadBuffer, transport); + m_remotelyDestroyed = true; + + if (!destroyResponse(transport, version, payloadBuffer, qos, status)) + cancel(); + // TODO + if (status != okStatus) + delete status; + } + else + { + normalResponse(transport, version, payloadBuffer, qos, okStatus); + } + + } + virtual Status* start() { - // fist monitor - m_monitorRequester->monitorEvent(this); + Lock guard(&m_lock); + + // TODO sync + if (m_destroyed) + return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");; + + // TODO monitorStrategy.start(); + + //try { + // start == process + get + if (!startRequest(QOS_PROCESS | QOS_GET)) + { + return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Other request pending."); + } + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + m_started = true; + // client needs to delete status, so passing shared OK instance is not right thing to do + return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started."); + //} catch (IllegalStateException ise) { + // return channelNotConnected; + //} + + - // client needs to delete status, so passing shared OK instance is not right thing to do - return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started."); } virtual Status* stop() { - // client needs to delete status, so passing shared OK instance is not right thing to do - return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor stopped."); + Lock guard(&m_lock); + + // TODO sync + if (m_destroyed) + return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");; + + //monitorStrategy.stop(); + + //try { + // stop == process + no get + if (!startRequest(QOS_PROCESS)) + { + return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Other request pending."); + } + m_channel->checkAndGetTransport()->enqueueSendRequest(this); + m_started = false; + // client needs to delete status, so passing shared OK instance is not right thing to do + return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor stopped."); + //} catch (IllegalStateException ise) { + // return channelNotConnected; + //} + } + + // ============ temp ============ + virtual MonitorElement* poll() { - Lock xx(m_lock); + Lock xx(&m_lock); if (m_count) { return 0; @@ -867,21 +1059,11 @@ class MockMonitor : public Monitor, public MonitorElement virtual void release(MonitorElement* monitorElement) { - Lock xx(m_lock); + Lock xx(&m_lock); if (m_count) m_count--; } - virtual void destroy() - { - delete stop(); - - delete m_lock; - delete m_overrunBitSet; - delete m_changedBitSet; - delete this; - } - // ============ MonitorElement ============ virtual PVStructure* getPVStructure() @@ -898,12 +1080,16 @@ class MockMonitor : public Monitor, public MonitorElement { return m_overrunBitSet; } - + }; + + + + // TODO consider std::unordered_map typedef std::map IOIDResponseRequestMap; @@ -2029,8 +2215,8 @@ class TestChannelImpl : public ChannelImpl { ChannelPutGetRequester *channelPutGetRequester, epics::pvData::PVStructure *pvRequest) { - // TODO - return 0; + // TODO return new ChannelPutGetImpl(this, channelPutGetRequester, pvRequest); + return 0; } virtual ChannelRPC* createChannelRPC(ChannelRPCRequester *channelRPCRequester, @@ -2044,7 +2230,7 @@ class TestChannelImpl : public ChannelImpl { epics::pvData::MonitorRequester *monitorRequester, epics::pvData::PVStructure *pvRequest) { - return new MockMonitor(monitorRequester, 0, pvRequest); + return new ChannelMonitorImpl(this, monitorRequester, pvRequest); } virtual ChannelArray* createChannelArray( @@ -2513,7 +2699,6 @@ TODO { Lock guard(&m_ioidMapMutex); IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid); - printf("getResponseRequest %d = %d\n", ioid, (it == m_pendingResponseRequests.end() ? 0 : it->second)); return (it == m_pendingResponseRequests.end() ? 0 : it->second); } @@ -2526,7 +2711,6 @@ TODO { Lock guard(&m_ioidMapMutex); pvAccessID ioid = generateIOID(); - printf("registerResponseRequest %d = %d\n", ioid, request); m_pendingResponseRequests[ioid] = request; return ioid; } @@ -2540,12 +2724,10 @@ TODO { Lock guard(&m_ioidMapMutex); IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(request->getIOID()); - printf("unregisterResponseRequest %d = %d\n", request->getIOID(), request); if (it == m_pendingResponseRequests.end()) return 0; ResponseRequest* retVal = it->second; - printf("unregisterResponseRequest %d = %d==%d\n", request->getIOID(), request, retVal); m_pendingResponseRequests.erase(it); return retVal; } @@ -3095,7 +3277,7 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channel->printInfo(); - +/* GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); @@ -3119,6 +3301,7 @@ int main(int argc,char *argv[]) ChannelPutRequesterImpl channelPutRequesterImpl; + pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelPutRequesterImpl); ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); channelPut->get(); @@ -3126,17 +3309,20 @@ int main(int argc,char *argv[]) channelPut->put(false); epicsThreadSleep ( 1.0 ); channelPut->destroy(); +*/ -/* MonitorRequesterImpl monitorRequesterImpl; - Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, 0); + PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&monitorRequesterImpl); + Monitor* monitor = channel->createMonitor(&monitorRequesterImpl, pvRequest); + + epicsThreadSleep( 1.0 ); Status* status = monitor->start(); std::cout << "monitor->start() = " << status->toString() << std::endl; delete status; - + epicsThreadSleep( 3.0 ); status = monitor->stop(); std::cout << "monitor->stop() = " << status->toString() << std::endl; @@ -3144,10 +3330,7 @@ int main(int argc,char *argv[]) monitor->destroy(); - */ - // TODO share it? - delete pvRequest; epicsThreadSleep ( 3.0 ); printf("Destroying channel... \n");