diff --git a/pvAccessApp/remote/blockingClientTCPTransport.cpp b/pvAccessApp/remote/blockingClientTCPTransport.cpp index 5f4a30f..1f9fe55 100644 --- a/pvAccessApp/remote/blockingClientTCPTransport.cpp +++ b/pvAccessApp/remote/blockingClientTCPTransport.cpp @@ -37,7 +37,7 @@ namespace epics { new IntrospectionRegistry(false)), _connectionTimeout(beaconInterval *1000), _unresponsiveTransport(false), _timerNode( new TimerNode(this)), _verifyOrEcho(true) { - _autoDelete = false; +// _autoDelete = false; // initialize owners list, send queue acquire(client); @@ -93,7 +93,7 @@ namespace epics { if(_closed) return false; char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Acquiring transport to %s.", ipAddrStr); Lock lock2(&_ownersMutex); @@ -121,7 +121,7 @@ namespace epics { int refs = _owners.size(); if(refs>0) { char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, "Transport to %s still has %d client(s) active and closing...", @@ -139,7 +139,7 @@ namespace epics { if(_closed) return; char ipAddrStr[48]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf(errlogInfo, "Releasing transport to %s.", ipAddrStr); diff --git a/pvAccessApp/remote/blockingServerTCPTransport.cpp b/pvAccessApp/remote/blockingServerTCPTransport.cpp index 86ab989..1deadb4 100644 --- a/pvAccessApp/remote/blockingServerTCPTransport.cpp +++ b/pvAccessApp/remote/blockingServerTCPTransport.cpp @@ -48,7 +48,7 @@ namespace epics { if(_channels.size()==0) return; char ipAddrStr[64]; - ipAddrToDottedIP(&_socketAddress->ia, ipAddrStr, sizeof(ipAddrStr)); + ipAddrToDottedIP(&_socketAddress.ia, ipAddrStr, sizeof(ipAddrStr)); errlogSevPrintf( errlogInfo, diff --git a/pvAccessApp/remote/blockingTCP.h b/pvAccessApp/remote/blockingTCP.h index 19fc260..6061b17 100644 --- a/pvAccessApp/remote/blockingTCP.h +++ b/pvAccessApp/remote/blockingTCP.h @@ -22,6 +22,7 @@ #include #include #include +#include /* EPICSv3 */ #include @@ -86,7 +87,7 @@ namespace epics { } virtual const osiSockAddr* getRemoteAddress() const { - return _socketAddress; + return &_socketAddress; } virtual int16 getPriority() const { @@ -108,12 +109,12 @@ namespace epics { virtual int getSocketReceiveBufferSize() const; virtual bool isVerified() const { - Lock lock(_verifiedMutex); + Lock lock(const_cast(&_verifiedMutex)); return _verified; } virtual void verified() { - Lock lock(_verifiedMutex); + Lock lock(&_verifiedMutex); _verified = true; } @@ -148,7 +149,7 @@ namespace epics { _flushStrategy = flushStrategy; } - void requestFlush(); + //void requestFlush(); /** * Close and free connection resources. @@ -165,80 +166,7 @@ namespace epics { void enqueueMonitorSendRequest(TransportSender* sender); protected: - /** - * Connection status - */ - bool volatile _closed; - - /** - * Corresponding channel. - */ - SOCKET _channel; - - /** - * Cached socket address. - */ - osiSockAddr* _socketAddress; - - /** - * Send buffer. - */ - epics::pvData::ByteBuffer* _sendBuffer; - - /** - * Remote side transport revision (minor). - */ - int8 _remoteTransportRevision; - - /** - * Remote side transport receive buffer size. - */ - int _remoteTransportReceiveBufferSize; - - /** - * Remote side transport socket receive buffer size. - */ - int _remoteTransportSocketReceiveBufferSize; - - /** - * Priority. - * NOTE: Priority cannot just be changed, since it is registered - * in transport registry with given priority. - */ - int16 _priority; - // TODO to be implemeneted - - /** - * CAS response handler. - */ - ResponseHandler* _responseHandler; - - /** - * Read sync. object monitor. - */ - //Object _readMonitor = new Object(); - - /** - * Total bytes received. - */ - int64 volatile _totalBytesReceived; - - /** - * Total bytes sent. - */ - int64 volatile _totalBytesSent; - - /** - * Marker to send. - */ - volatile int _markerToSend; - - volatile bool _verified; - - volatile int64 _remoteBufferFreeSpace; - - volatile bool _autoDelete; - + virtual void processReadCached(bool nestedCall, ReceiveStage inStage, int requiredBytes, bool addToBuffer); @@ -259,7 +187,8 @@ namespace epics { virtual ~BlockingTCPTransport(); - private: + + /** * Default marker period. */ @@ -267,7 +196,32 @@ namespace epics { static const int MAX_ENSURE_DATA_BUFFER_SIZE = 1024; - static const double delay = 0.01; + static const double _delay = 0.01; + + /****** finally initialized at construction time and after start (called by the same thread) ********/ + + /** + * Corresponding channel. + */ + SOCKET _channel; + + /** + * Cached socket address. + */ + osiSockAddr _socketAddress; + + /** + * Priority. + * NOTE: Priority cannot just be changed, since it is registered + * in transport registry with given priority. + */ + int16 _priority; + // TODO to be implemeneted + + /** + * CAS response handler. + */ + ResponseHandler* _responseHandler; /** * Send buffer size. @@ -284,6 +238,58 @@ namespace epics { */ int64 _markerPeriodBytes; + + SendQueueFlushStrategy _flushStrategy; + + + epicsThreadId _rcvThreadId; + + epicsThreadId _sendThreadId; + + MonitorSender* _monitorSender; + + Context* _context; + + bool _autoDelete; + + + + /**** after verification ****/ + + /** + * Remote side transport revision (minor). + */ + int8 _remoteTransportRevision; + + /** + * Remote side transport receive buffer size. + */ + int _remoteTransportReceiveBufferSize; + + /** + * Remote side transport socket receive buffer size. + */ + int _remoteTransportSocketReceiveBufferSize; + + + + /*** send thread only - no need to sync ***/ + // NOTE: now all send-related external calls are TransportSender IF + // and its reference is only valid when called from send thread + + // initialized at construction time + GrowingCircularBuffer* _sendQueue; + epics::pvData::Mutex _sendQueueMutex; + + // initialized at construction time + GrowingCircularBuffer* _monitorSendQueue; + epics::pvData::Mutex _monitorMutex; + + /** + * Send buffer. + */ + epics::pvData::ByteBuffer* _sendBuffer; + /** * Next planned marker position. */ @@ -299,20 +305,28 @@ namespace epics { */ int _lastMessageStartPosition; + int8 _lastSegmentedMessageType; + int8 _lastSegmentedMessageCommand; + + bool _flushRequested; + + int _sendBufferSentPosition; + + + + + + + + + + /*** receive thread only - no need to sync ***/ + + // initialized at construction time epics::pvData::ByteBuffer* _socketBuffer; int _startPosition; - epics::pvData::Mutex* _mutex; - epics::pvData::Mutex* _sendQueueMutex; - epics::pvData::Mutex* _verifiedMutex; - epics::pvData::Mutex* _monitorMutex; - - ReceiveStage _stage; - - int8 _lastSegmentedMessageType; - int8 _lastSegmentedMessageCommand; - int _storedPayloadSize; int _storedPosition; int _storedLimit; @@ -322,26 +336,68 @@ namespace epics { int8 _command; int _payloadSize; - volatile bool _flushRequested; + ReceiveStage _stage; - int _sendBufferSentPosition; + /** + * Total bytes received. + */ + int64 _totalBytesReceived; - SendQueueFlushStrategy _flushStrategy; - GrowingCircularBuffer* _sendQueue; - epicsThreadId _rcvThreadId; - epicsThreadId _sendThreadId; - GrowingCircularBuffer* _monitorSendQueue; - MonitorSender* _monitorSender; + /*** send/receive thread shared ***/ - Context* _context; + /** + * Connection status + * NOTE: synced by _mutex + */ + bool volatile _closed; - volatile bool _sendThreadRunning; + // NOTE: synced by _mutex + bool _sendThreadExited; + epics::pvData::Mutex _mutex; + + + bool _verified; + epics::pvData::Mutex _verifiedMutex; + + + + + Event _sendQueueEvent; + + + + + + + /** + * Marker to send. + * NOTE: synced by _flowControlMutex + */ + int _markerToSend; + + /** + * Total bytes sent. + * NOTE: synced by _flowControlMutex + */ + int64 _totalBytesSent; + + /** + * Calculated remote free buffer size. + * NOTE: synced by _flowControlMutex + */ + int64 _remoteBufferFreeSpace; + + epics::pvData::Mutex _flowControlMutex; + + + private: + /** * Internal method that clears and releases buffer. * sendLock and sendBufferLock must be hold while calling this method. diff --git a/pvAccessApp/remote/blockingTCPTransport.cpp b/pvAccessApp/remote/blockingTCPTransport.cpp index 930edf5..2a88afb 100644 --- a/pvAccessApp/remote/blockingTCPTransport.cpp +++ b/pvAccessApp/remote/blockingTCPTransport.cpp @@ -65,7 +65,7 @@ namespace epics { BlockingTCPTransport::BlockingTCPTransport(Context* context, SOCKET channel, ResponseHandler* responseHandler, int receiveBufferSize, int16 priority) : - _closed(false), _channel(channel), _socketAddress(new osiSockAddr), + _closed(false), _channel(channel), _remoteTransportRevision(0), _remoteTransportReceiveBufferSize(MAX_TCP_RECV), _remoteTransportSocketReceiveBufferSize(MAX_TCP_RECV), @@ -75,9 +75,7 @@ namespace epics { LONG_LONG_MAX), _autoDelete(true), _markerPeriodBytes(MARKER_PERIOD), _nextMarkerPosition( _markerPeriodBytes), _sendPending(false), - _lastMessageStartPosition(0), _mutex(new Mutex()), - _sendQueueMutex(new Mutex()), _verifiedMutex(new Mutex()), - _monitorMutex(new Mutex()), _stage(READ_FROM_SOCKET), + _lastMessageStartPosition(0), _stage(READ_FROM_SOCKET), _lastSegmentedMessageType(0), _lastSegmentedMessageCommand( 0), _storedPayloadSize(0), _storedPosition(0), _storedLimit(0), _magicAndVersion(0), _packetType(0), @@ -87,9 +85,9 @@ namespace epics { new GrowingCircularBuffer (100)), _rcvThreadId(NULL), _sendThreadId(NULL), _monitorSendQueue( new GrowingCircularBuffer (100)), - _monitorSender(new MonitorSender(_monitorMutex, + _monitorSender(new MonitorSender(&_monitorMutex, _monitorSendQueue)), _context(context), - _sendThreadRunning(false) { + _sendThreadExited(false) { _socketBuffer = new ByteBuffer(max(MAX_TCP_RECV +MAX_ENSURE_DATA_BUFFER_SIZE, receiveBufferSize), EPICS_ENDIAN_BIG); @@ -114,7 +112,7 @@ namespace epics { } socklen_t saSize = sizeof(sockaddr); - retval = getpeername(_channel, &(_socketAddress->sa), &saSize); + retval = getpeername(_channel, &(_socketAddress.sa), &saSize); if(retval<0) { errlogSevPrintf(errlogMajor, "Error fetching socket remote address: %s", strerror( @@ -130,26 +128,20 @@ namespace epics { BlockingTCPTransport::~BlockingTCPTransport() { close(true); - // TODO remove - epicsThreadSleep(3.0); - delete _socketAddress; delete _sendQueue; delete _socketBuffer; delete _sendBuffer; - delete _mutex; - delete _sendQueueMutex; - delete _verifiedMutex; - delete _monitorMutex; - delete _responseHandler; } void BlockingTCPTransport::start() { - _sendThreadRunning = true; + + // TODO consuder epics::pvData::Thread + String threadName = "TCP-receive "+inetAddressToString( - * _socketAddress); + _socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); @@ -159,7 +151,7 @@ namespace epics { epicsThreadStackMedium), BlockingTCPTransport::rcvThreadRunner, this); - threadName = "TCP-send "+inetAddressToString(*_socketAddress); + threadName = "TCP-send "+inetAddressToString(_socketAddress); errlogSevPrintf(errlogInfo, "Starting thread: %s", threadName.c_str()); @@ -178,9 +170,9 @@ namespace epics { _nextMarkerPosition -= _sendBuffer->getPosition() -CA_MESSAGE_HEADER_SIZE; - _sendQueueMutex->lock(); + _sendQueueMutex.lock(); _flushRequested = false; - _sendQueueMutex->unlock(); + _sendQueueMutex.unlock(); _sendBuffer->clear(); @@ -194,12 +186,13 @@ namespace epics { } void BlockingTCPTransport::close(bool force) { - Lock lock(_mutex); + Lock lock(&_mutex); // already closed check if(_closed) return; _closed = true; + printf("closing.\n"); // remove from registry _context->getTransportRegistry()->remove(this); @@ -207,12 +200,8 @@ namespace epics { // clean resources internalClose(force); - // threads cannot "wait" Epics, no need to notify - // TODO check alternatives to "wait" // notify send queue - //synchronized (sendQueue) { - // sendQueue.notifyAll(); - //} + _sendQueueEvent.signal(); } void BlockingTCPTransport::internalClose(bool force) { @@ -239,21 +228,22 @@ namespace epics { return sockBufSize; } + // TODO reimplement using Event bool BlockingTCPTransport::waitUntilVerified(double timeout) { double internalTimeout = timeout; bool internalVerified = false; - _verifiedMutex->lock(); + _verifiedMutex.lock(); internalVerified = _verified; - _verifiedMutex->unlock(); + _verifiedMutex.unlock(); while(!internalVerified&&internalTimeout>0) { epicsThreadSleep(min(0.1, internalTimeout)); internalTimeout -= 0.1; - _verifiedMutex->lock(); + _verifiedMutex.lock(); internalVerified = _verified; - _verifiedMutex->unlock(); + _verifiedMutex.unlock(); } return internalVerified; } @@ -510,7 +500,7 @@ namespace epics { errlogSevPrintf( errlogMinor, "Invalid header received from client %s, disconnecting...", - inetAddressToString(*_socketAddress).c_str()); + inetAddressToString(_socketAddress).c_str()); close(true); return; } @@ -531,11 +521,14 @@ namespace epics { } else if(type==1) { if(_command==0) { - if(_markerToSend==0) _markerToSend - = _payloadSize; // TODO send back response + _flowControlMutex.lock(); + if(_markerToSend==0) + _markerToSend = _payloadSize; // TODO send back response + _flowControlMutex.unlock(); } else //if (command == 1) { + _flowControlMutex.lock(); int difference = (int)_totalBytesSent -_payloadSize+CA_MESSAGE_HEADER_SIZE; // overrun check @@ -545,6 +538,7 @@ namespace epics { +_remoteTransportSocketReceiveBufferSize -difference; // TODO if this is calculated wrong, this can be critical !!! + _flowControlMutex.unlock(); } // no payload @@ -556,7 +550,7 @@ namespace epics { errlogMajor, "Unknown packet type %d, received from client %s, disconnecting...", type, - inetAddressToString(*_socketAddress).c_str()); + inetAddressToString(_socketAddress).c_str()); close(true); return; } @@ -580,7 +574,7 @@ namespace epics { +_storedPayloadSize, _storedLimit)); try { // handle response - _responseHandler->handleResponse(_socketAddress, + _responseHandler->handleResponse(&_socketAddress, this, version, _command, _payloadSize, _socketBuffer); } catch(...) { @@ -628,8 +622,10 @@ namespace epics { _sendBufferSentPosition = 0; // if not set skip marker otherwise set it + _flowControlMutex.lock(); int markerValue = _markerToSend; _markerToSend = 0; + _flowControlMutex.unlock(); if(markerValue==0) _sendBufferSentPosition = CA_MESSAGE_HEADER_SIZE; else @@ -694,7 +690,7 @@ namespace epics { //errlogSevPrintf(errlogInfo, // "Sending %d of total %d bytes in the packet to %s.", // bytesToSend, limit, - // inetAddressToString(*_socketAddress).c_str()); + // inetAddressToString(_socketAddress).c_str()); while(buffer->getRemaining()>0) { ssize_t bytesSent = ::send(_channel, @@ -720,13 +716,15 @@ namespace epics { //errlogSevPrintf(errlogInfo, // "Send buffer full for %s, waiting...", - // inetAddressToString(*_socketAddress)); + // inetAddressToString(_socketAddress)); return false; } buffer->setPosition(buffer->getPosition()+bytesSent); + _flowControlMutex.lock(); _totalBytesSent += bytesSent; + _flowControlMutex.unlock(); // readjust limit if(bytesToSend==maxBytesToSend) { @@ -752,18 +750,9 @@ namespace epics { TransportSender* BlockingTCPTransport::extractFromSendQueue() { TransportSender* retval; - _sendQueueMutex->lock(); - try { - if(_sendQueue->size()>0) - retval = _sendQueue->extract(); - else - retval = NULL; - } catch(...) { - // not expecting the exception here, but just to be safe - retval = NULL; - } - - _sendQueueMutex->unlock(); + _sendQueueMutex.lock(); + retval = _sendQueue->extract(); + _sendQueueMutex.unlock(); return retval; } @@ -772,23 +761,35 @@ namespace epics { while(!_closed) { TransportSender* sender; +// TODO race! sender = extractFromSendQueue(); + printf("extraced %d\n", sender); // wait for new message - while(sender==NULL&&!_flushRequested&&!_closed) { + while(sender==NULL&&!_flushRequested/*&&!_closed*/) { + + + bool c; + _mutex.lock(); + c = _closed; + printf("closed %d\n", c); + _mutex.unlock(); + if (c) + break; + if(_flushStrategy==DELAYED) { - if(delay>0) epicsThreadSleep(delay); + if(_delay>0) epicsThreadSleep(_delay); if(_sendQueue->size()==0) { // if (hasMonitors || sendBuffer.position() > CAConstants.CA_MESSAGE_HEADER_SIZE) - if(_sendBuffer->getPosition() - >CA_MESSAGE_HEADER_SIZE) + if(_sendBuffer->getPosition()>CA_MESSAGE_HEADER_SIZE) _flushRequested = true; else - epicsThreadSleep(0); + _sendQueueEvent.wait(); } } else - epicsThreadSleep(0); + _sendQueueEvent.wait(); sender = extractFromSendQueue(); + printf("extraced2 %d\n", sender); } // always do flush from this thread @@ -827,13 +828,6 @@ namespace epics { } // while(!_closed) } - void BlockingTCPTransport::requestFlush() { - // needless lock, manipulating a single byte - //Lock lock(_sendQueueMutex); - if(_flushRequested) return; - _flushRequested = true; - } - void BlockingTCPTransport::freeSendBuffers() { // TODO ? } @@ -842,7 +836,7 @@ namespace epics { freeSendBuffers(); errlogSevPrintf(errlogInfo, "Connection to %s closed.", - inetAddressToString(*_socketAddress).c_str()); + inetAddressToString(_socketAddress).c_str()); if(_channel!=INVALID_SOCKET) { epicsSocketDestroy(_channel); @@ -853,53 +847,68 @@ namespace epics { void BlockingTCPTransport::rcvThreadRunner(void* param) { BlockingTCPTransport* obj = (BlockingTCPTransport*)param; +try{ obj->processReadCached(false, NONE, CA_MESSAGE_HEADER_SIZE, false); - +} catch (...) { +printf("rcvThreadRunnner exception\n"); +} + printf("rcvThreadRunner done, autodelete %d-\n", obj->_autoDelete); if(obj->_autoDelete) { - while(obj->_sendThreadRunning) + while(true) + { + printf("waiting send thread to exit.\n"); + bool exited; + obj->_mutex.lock(); + exited = obj->_sendThreadExited; + obj->_mutex.unlock(); + if (exited) + break; epicsThreadSleep(0.1); - + } + printf("deleting.\n"); delete obj; } } void BlockingTCPTransport::sendThreadRunner(void* param) { BlockingTCPTransport* obj = (BlockingTCPTransport*)param; - +try { obj->processSendQueue(); +} catch (...) { +printf("sendThreadRunnner exception\n"); +} obj->freeConnectionResorces(); + printf("exited.\n"); - obj->_sendThreadRunning = false; + // TODO possible crash on unlock + obj->_mutex.lock(); + obj->_sendThreadExited = true; + obj->_mutex.unlock(); } void BlockingTCPTransport::enqueueSendRequest(TransportSender* sender) { + Lock lock(&_sendQueueMutex); if(_closed) return; - Lock lock(_sendQueueMutex); _sendQueue->insert(sender); + _sendQueueEvent.signal(); } - void BlockingTCPTransport::enqueueMonitorSendRequest( - TransportSender* sender) { + void BlockingTCPTransport::enqueueMonitorSendRequest(TransportSender* sender) { + Lock lock(&_monitorMutex); if(_closed) return; - Lock lock(_monitorMutex); _monitorSendQueue->insert(sender); if(_monitorSendQueue->size()==1) enqueueSendRequest(_monitorSender); } - void MonitorSender::send(ByteBuffer* buffer, - TransportSendControl* control) { + void MonitorSender::send(ByteBuffer* buffer, TransportSendControl* control) { control->startMessage(19, 0); while(true) { TransportSender* sender; _monitorMutex->lock(); if(_monitorSendQueue->size()>0) - try { - sender = _monitorSendQueue->extract(); - } catch(...) { - sender = NULL; - } + sender = _monitorSendQueue->extract(); else sender = NULL; _monitorMutex->unlock(); diff --git a/testApp/remote/testRemoteClientImpl.cpp b/testApp/remote/testRemoteClientImpl.cpp index b26afac..4e1a6fb 100644 --- a/testApp/remote/testRemoteClientImpl.cpp +++ b/testApp/remote/testRemoteClientImpl.cpp @@ -165,8 +165,8 @@ public: // destroy remote instance if (!m_remotelyDestroyed) { - startRequest(PURE_DESTROY_REQUEST); - m_channel->checkAndGetTransport()->enqueueSendRequest(this); +// TODO !!! startRequest(PURE_DESTROY_REQUEST); +/// TODO !!! causes crash m_channel->checkAndGetTransport()->enqueueSendRequest(this); } } @@ -320,6 +320,7 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess virtual void destroy() { + BaseRequestImpl::destroy(); delete this; } @@ -459,6 +460,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet virtual void destroy() { + BaseRequestImpl::destroy(); // TODO sync if (m_data) delete m_data; if (m_bitSet) delete m_bitSet; @@ -636,6 +638,7 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut virtual void destroy() { + BaseRequestImpl::destroy(); // TODO sync if (m_data) delete m_data; if (m_bitSet) delete m_bitSet; @@ -993,10 +996,14 @@ typedef std::map IOIDResponseRequestMap; AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); transport->ensureData(4); - DataResponse* nrr = dynamic_cast(_context->getResponseRequest(payloadBuffer->getInt())); - if (nrr) - nrr->response(transport, version, payloadBuffer); - } + ResponseRequest* rr = _context->getResponseRequest(payloadBuffer->getInt()); + if (rr) + { + DataResponse* nrr = dynamic_cast(rr); + if (nrr) + nrr->response(transport, version, payloadBuffer); + } + } }; @@ -1794,7 +1801,8 @@ class TestChannelImpl : public ChannelImpl { { if (remoteDestroy) { m_issueCreateMessage = false; - m_transport->enqueueSendRequest(this); + // TODO !!! this causes problems.. since qnqueueSendRequest is added and this instance deleted + //m_transport->enqueueSendRequest(this); } ReferenceCountingTransport* rct = dynamic_cast(m_transport); @@ -2505,6 +2513,7 @@ TODO { Lock guard(&m_ioidMapMutex); IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(ioid); + printf("getResponseRequest %d = %d\n", ioid, (it == m_pendingResponseRequests.end() ? 0 : it->second)); return (it == m_pendingResponseRequests.end() ? 0 : it->second); } @@ -2517,6 +2526,7 @@ TODO { Lock guard(&m_ioidMapMutex); pvAccessID ioid = generateIOID(); + printf("registerResponseRequest %d = %d\n", ioid, request); m_pendingResponseRequests[ioid] = request; return ioid; } @@ -2530,10 +2540,12 @@ TODO { Lock guard(&m_ioidMapMutex); IOIDResponseRequestMap::iterator it = m_pendingResponseRequests.find(request->getIOID()); + printf("unregisterResponseRequest %d = %d\n", request->getIOID(), request); if (it == m_pendingResponseRequests.end()) return 0; ResponseRequest* retVal = it->second; + printf("unregisterResponseRequest %d = %d==%d\n", request->getIOID(), request, retVal); m_pendingResponseRequests.erase(it); return retVal; } @@ -2923,10 +2935,13 @@ class ChannelGetRequesterImpl : public ChannelGetRequester virtual void getDone(epics::pvData::Status *status) { std::cout << "getDone(" << status->toString() << ")" << std::endl; - String str; - m_pvStructure->toString(&str); - std::cout << str; - std::cout << std::endl; + if (m_pvStructure) + { + String str; + m_pvStructure->toString(&str); + std::cout << str; + std::cout << std::endl; + } } }; @@ -2960,19 +2975,25 @@ class ChannelPutRequesterImpl : public ChannelPutRequester virtual void getDone(epics::pvData::Status *status) { std::cout << "getDone(" << status->toString() << ")" << std::endl; - String str; - m_pvStructure->toString(&str); - std::cout << str; - std::cout << std::endl; + if (m_pvStructure) + { + String str; + m_pvStructure->toString(&str); + std::cout << str; + std::cout << std::endl; + } } virtual void putDone(epics::pvData::Status *status) { std::cout << "putDone(" << status->toString() << ")" << std::endl; - String str; - m_pvStructure->toString(&str); - std::cout << str; - std::cout << std::endl; + if (m_pvStructure) + { + String str; + m_pvStructure->toString(&str); + std::cout << str; + std::cout << std::endl; + } } }; @@ -3074,7 +3095,7 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channel->printInfo(); -/* + GetFieldRequesterImpl getFieldRequesterImpl; channel->getField(&getFieldRequesterImpl, ""); epicsThreadSleep ( 1.0 ); @@ -3086,16 +3107,17 @@ int main(int argc,char *argv[]) epicsThreadSleep ( 1.0 ); channelProcess->destroy(); epicsThreadSleep ( 1.0 ); -*/ + ChannelGetRequesterImpl channelGetRequesterImpl; - PVStructure* pvRequest = getCreateRequest()->createRequest("field(value)",&channelGetRequesterImpl); + PVStructure* pvRequest = getCreateRequest()->createRequest("field(value, timeStamp)",&channelGetRequesterImpl); ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest); epicsThreadSleep ( 3.0 ); channelGet->get(false); epicsThreadSleep ( 3.0 ); channelGet->destroy(); epicsThreadSleep ( 1.0 ); -/* + + ChannelPutRequesterImpl channelPutRequesterImpl; ChannelPut* channelPut = channel->createChannelPut(&channelPutRequesterImpl, pvRequest); epicsThreadSleep ( 1.0 ); @@ -3128,9 +3150,15 @@ int main(int argc,char *argv[]) delete pvRequest; epicsThreadSleep ( 3.0 ); + printf("Destroying channel... \n"); channel->destroy(); + printf("done.\n"); + epicsThreadSleep ( 3.0 ); + + printf("Destroying context... \n"); context->destroy(); + printf("done.\n"); std::cout << "-----------------------------------------------------------------------" << std::endl; getShowConstructDestruct()->constuctDestructTotals(stdout);