diff --git a/pvAccessApp/remoteClient/clientContextImpl.cpp b/pvAccessApp/remoteClient/clientContextImpl.cpp index 88198dd..8145e42 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.cpp +++ b/pvAccessApp/remoteClient/clientContextImpl.cpp @@ -7,7 +7,7 @@ #include #include #include -#include ƒ +#include #include #include @@ -89,9 +89,9 @@ namespace epics { m_requester(requester), m_destroyed(false), m_remotelyDestroyed(false), m_pendingRequest(NULL_REQUEST) { - // register response request - m_ioid = m_context->registerResponseRequest(this); - channel->registerResponseRequest(this); + // register response request + m_ioid = m_context->registerResponseRequest(this); + channel->registerResponseRequest(this); } bool startRequest(int32 qos) { @@ -116,11 +116,11 @@ namespace epics { } Requester* getRequester() { - return m_requester; + return m_requester; } pvAccessID getIOID() { - return m_ioid; + return m_ioid; } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) = 0; @@ -129,8 +129,8 @@ namespace epics { virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { // TODO? - // try - // { + // try + // { transport->ensureData(1); int8 qos = payloadBuffer->getByte(); Status* status = statusCreate->deserializeStatus(payloadBuffer, transport); @@ -156,68 +156,68 @@ namespace epics { } virtual void cancel() { - destroy(); + destroy(); } virtual void destroy() { - - { + + { Lock guard(&m_mutex); if (m_destroyed) return; m_destroyed = true; - } + } - // unregister response request - m_context->unregisterResponseRequest(this); - m_channel->unregisterResponseRequest(this); + // unregister response request + m_context->unregisterResponseRequest(this); + m_channel->unregisterResponseRequest(this); + + // destroy remote instance + if (!m_remotelyDestroyed) + { + // TODO !!! startRequest(PURE_DESTROY_REQUEST); + /// TODO !!! causes crash m_channel->checkAndGetTransport()->enqueueSendRequest(this); + } - // destroy remote instance - if (!m_remotelyDestroyed) - { - // TODO !!! startRequest(PURE_DESTROY_REQUEST); - /// TODO !!! causes crash m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } - } virtual void timeout() { - cancel(); - // TODO notify? + cancel(); + // TODO notify? } void reportStatus(Status* status) { - // destroy, since channel (parent) was destroyed - if (status == ChannelImpl::channelDestroyed) + // destroy, since channel (parent) was destroyed + if (status == ChannelImpl::channelDestroyed) destroy(); - else if (status == ChannelImpl::channelDisconnected) + else if (status == ChannelImpl::channelDisconnected) stopRequest(); - // TODO notify? + // TODO notify? } virtual void updateSubscription() { - // default is noop + // default is noop } virtual void lock() { - // noop + // noop } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int8 qos = getPendingRequest(); - if (qos == -1) + int8 qos = getPendingRequest(); + if (qos == -1) return; - else if (qos == PURE_DESTROY_REQUEST) - { + else if (qos == PURE_DESTROY_REQUEST) + { control->startMessage((int8)15, 8); buffer->putInt(m_channel->getServerChannelID()); buffer->putInt(m_ioid); - } - stopRequest(); + } + stopRequest(); } virtual void unlock() { - // noop + // noop } }; @@ -255,76 +255,76 @@ namespace epics { // TODO check for 0s!!!! - // TODO best-effort support + // TODO best-effort support - // subscribe - try { + // subscribe + try { resubscribeSubscription(channel->checkAndGetTransport()); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_callback->channelProcessConnect(channelNotConnected, 0)); - } + } } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int32 pendingRequest = getPendingRequest(); - if (pendingRequest < 0) - { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { BaseRequestImpl::send(buffer, control); return; - } - - control->startMessage((int8)16, 9); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); + } - if (pendingRequest & QOS_INIT) - { + control->startMessage((int8)16, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { // pvRequest m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); - } - - stopRequest(); + } + + stopRequest(); } virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - EXCEPTION_GUARD(m_callback->processDone(status)); - return true; + EXCEPTION_GUARD(m_callback->processDone(status)); + return true; } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - EXCEPTION_GUARD(m_callback->channelProcessConnect(status, this)); - return true; + EXCEPTION_GUARD(m_callback->channelProcessConnect(status, this)); + return true; } virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - EXCEPTION_GUARD(m_callback->processDone(status)); - return true; + EXCEPTION_GUARD(m_callback->processDone(status)); + return true; } virtual void process(bool lastRequest) { // TODO sync - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_callback->processDone(destroyedStatus)); return; - } - - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { EXCEPTION_GUARD(m_callback->processDone(otherRequestPendingStatus)); return; - } - - try { + } + + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_callback->processDone(channelNotConnected)); - } + } } virtual void resubscribeSubscription(Transport* transport) { - startRequest(QOS_INIT); - transport->enqueueSendRequest(this); + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); } virtual void destroy() @@ -349,7 +349,7 @@ namespace epics { private: ChannelGetRequester* m_channelGetRequester; - PVStructure* m_pvRequest; + PVStructure* m_pvRequest; PVStructure* m_data; BitSet* m_bitSet; @@ -369,100 +369,100 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGet); - // TODO immediate get, i.e. get data with init message - // TODO one-time get, i.e. immediate get + lastRequest + // TODO immediate get, i.e. get data with init message + // TODO one-time get, i.e. immediate get + lastRequest - // subscribe - try { + // subscribe + try { resubscribeSubscription(m_channel->checkAndGetTransport()); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelNotConnected, 0, 0, 0)); - } + } } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int32 pendingRequest = getPendingRequest(); - if (pendingRequest < 0) - { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { BaseRequestImpl::send(buffer, control); return; - } - - control->startMessage((int8)10, 9); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); - - if (pendingRequest & QOS_INIT) - { + } + + control->startMessage((int8)10, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { // pvRequest m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); - } - - stopRequest(); + } + + stopRequest(); } virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - // data available - if (qos & QOS_GET) + // data available + if (qos & QOS_GET) return normalResponse(transport, version, payloadBuffer, qos, status); - return true; + return true; } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, this, 0, 0)); return true; - } + } - // create data and its bitSet - m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); - m_bitSet = new BitSet(m_data->getNumberFields()); + // create data and its bitSet + m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_bitSet = new BitSet(m_data->getNumberFields()); - // notify - EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(okStatus, this, m_data, m_bitSet)); - return true; + // notify + EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(okStatus, this, m_data, m_bitSet)); + return true; } virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_channelGetRequester->getDone(status)); return true; - } - - // deserialize bitSet and data - m_bitSet->deserialize(payloadBuffer, transport); - m_data->deserialize(payloadBuffer, transport, m_bitSet); - - EXCEPTION_GUARD(m_channelGetRequester->getDone(okStatus)); - return true; + } + + // deserialize bitSet and data + m_bitSet->deserialize(payloadBuffer, transport); + m_data->deserialize(payloadBuffer, transport, m_bitSet); + + EXCEPTION_GUARD(m_channelGetRequester->getDone(okStatus)); + return true; } virtual void get(bool lastRequest) { // TODO sync? - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelGetRequester->getDone(destroyedStatus)); return; - } + } - if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) { + if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) { EXCEPTION_GUARD(m_channelGetRequester->getDone(otherRequestPendingStatus)); return; - } - - try { + } + + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { - EXCEPTION_GUARD(m_channelGetRequester->getDone(channelNotConnected)); - } + } catch (std::runtime_error &rte) { + EXCEPTION_GUARD(m_channelGetRequester->getDone(channelNotConnected)); + } } virtual void resubscribeSubscription(Transport* transport) { - startRequest(QOS_INIT); - transport->enqueueSendRequest(this); + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); } @@ -495,7 +495,7 @@ namespace epics { private: ChannelPutRequester* m_channelPutRequester; - PVStructure* m_pvRequest; + PVStructure* m_pvRequest; PVStructure* m_data; BitSet* m_bitSet; @@ -515,70 +515,70 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPut); - // TODO low-overhead put - // TODO best-effort put + // TODO low-overhead put + // TODO best-effort put - // subscribe - try { + // subscribe + try { resubscribeSubscription(m_channel->checkAndGetTransport()); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelNotConnected, 0, 0, 0)); - } + } } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int32 pendingRequest = getPendingRequest(); - if (pendingRequest < 0) - { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { BaseRequestImpl::send(buffer, control); return; - } - - control->startMessage((int8)11, 9); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); - - if (pendingRequest & QOS_INIT) - { + } + + control->startMessage((int8)11, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { // pvRequest m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); - } - else if (!(pendingRequest & QOS_GET)) - { + } + else if (!(pendingRequest & QOS_GET)) + { // put // serialize only what has been changed m_bitSet->serialize(buffer, control); m_data->serialize(buffer, control, m_bitSet); - } - - stopRequest(); + } + + stopRequest(); } virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - EXCEPTION_GUARD(m_channelPutRequester->putDone(status)); - return true; + EXCEPTION_GUARD(m_channelPutRequester->putDone(status)); + return true; } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, this, 0, 0)); return true; - } + } - // create data and its bitSet - m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); - m_bitSet = new BitSet(m_data->getNumberFields()); + // create data and its bitSet + m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_bitSet = new BitSet(m_data->getNumberFields()); - // notify - EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(okStatus, this, m_data, m_bitSet)); - return true; + // notify + EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(okStatus, this, m_data, m_bitSet)); + return true; } virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (qos & QOS_GET) - { + if (qos & QOS_GET) + { if (!status->isSuccess()) { EXCEPTION_GUARD(m_channelPutRequester->getDone(status)); @@ -589,58 +589,58 @@ namespace epics { EXCEPTION_GUARD(m_channelPutRequester->getDone(status)); return true; - } - else - { - EXCEPTION_GUARD(m_channelPutRequester->putDone(okStatus)); + } + else + { + EXCEPTION_GUARD(m_channelPutRequester->putDone(okStatus)); return true; - } + } } virtual void get() { // TODO sync? - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelPutRequester->getDone(destroyedStatus)); return; - } + } - if (!startRequest(QOS_GET)) { + if (!startRequest(QOS_GET)) { EXCEPTION_GUARD(m_channelPutRequester->getDone(otherRequestPendingStatus)); return; - } - + } - try { + + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected)); - } + } } virtual void put(bool lastRequest) { // TODO sync? - if (m_destroyed) { + if (m_destroyed) { m_channelPutRequester->putDone(destroyedStatus); return; - } + } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { m_channelPutRequester->putDone(otherRequestPendingStatus); return; - } - - try { + } + + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected)); - } + } } virtual void resubscribeSubscription(Transport* transport) { - startRequest(QOS_INIT); - transport->enqueueSendRequest(this); + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); } @@ -670,7 +670,7 @@ namespace epics { private: ChannelPutGetRequester* m_channelPutGetRequester; - PVStructure* m_pvRequest; + PVStructure* m_pvRequest; PVStructure* m_putData; PVStructure* m_getData; @@ -690,73 +690,73 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPutGet); - // subscribe - try { + // subscribe + try { resubscribeSubscription(m_channel->checkAndGetTransport()); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, 0, 0, 0)); - } + } } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int32 pendingRequest = getPendingRequest(); - if (pendingRequest < 0) - { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { BaseRequestImpl::send(buffer, control); return; - } - - control->startMessage((int8)12, 9); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - if ((pendingRequest & QOS_INIT) == 0) + } + + control->startMessage((int8)12, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + if ((pendingRequest & QOS_INIT) == 0) buffer->putByte((int8)pendingRequest); - - if (pendingRequest & QOS_INIT) - { + + if (pendingRequest & QOS_INIT) + { buffer->putByte((int8)QOS_INIT); // pvRequest m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); - } - else if (pendingRequest & (QOS_GET | QOS_GET_PUT)) { + } + else if (pendingRequest & (QOS_GET | QOS_GET_PUT)) { // noop - } - else - { + } + else + { m_putData->serialize(buffer, control); - } - - stopRequest(); + } + + stopRequest(); } virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - // data available - // TODO we need a flag here... - return normalResponse(transport, version, payloadBuffer, qos, status); + // data available + // TODO we need a flag here... + return normalResponse(transport, version, payloadBuffer, qos, status); } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, this, 0, 0)); return true; - } + } - IntrospectionRegistry* registry = transport->getIntrospectionRegistry(); - m_putData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); - m_getData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + IntrospectionRegistry* registry = transport->getIntrospectionRegistry(); + m_putData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_getData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); - // notify - EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(okStatus, this, m_putData, m_getData)); - return true; + // notify + EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(okStatus, this, m_putData, m_getData)); + return true; } virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (qos & QOS_GET) - { + if (qos & QOS_GET) + { if (!status->isSuccess()) { EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status)); @@ -768,9 +768,9 @@ namespace epics { EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status)); return true; - } - else if (qos & QOS_GET_PUT) - { + } + else if (qos & QOS_GET_PUT) + { if (!status->isSuccess()) { EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status)); @@ -782,9 +782,9 @@ namespace epics { EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status)); return true; - } - else - { + } + else + { if (!status->isSuccess()) { EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status)); @@ -796,67 +796,67 @@ namespace epics { EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status)); return true; - } + } } virtual void putGet(bool lastRequest) { - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(destroyedStatus)); return; - } - - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + } + + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(otherRequestPendingStatus)); return; - } - - try { + } + + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected)); - } + } } virtual void getGet() { - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(destroyedStatus)); return; - } + } - if (!startRequest(QOS_GET)) { + if (!startRequest(QOS_GET)) { EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(otherRequestPendingStatus)); return; - } + } - try { + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected)); - } + } } virtual void getPut() { - if (m_destroyed) { + if (m_destroyed) { m_channelPutGetRequester->getPutDone(destroyedStatus); return; - } + } - if (!startRequest(QOS_GET_PUT)) { + if (!startRequest(QOS_GET_PUT)) { m_channelPutGetRequester->getPutDone(otherRequestPendingStatus); return; - } + } - try { + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected)); - } + } } virtual void resubscribeSubscription(Transport* transport) { - startRequest(QOS_INIT); - transport->enqueueSendRequest(this); + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); } @@ -888,7 +888,7 @@ namespace epics { private: ChannelRPCRequester* m_channelRPCRequester; - PVStructure* m_pvRequest; + PVStructure* m_pvRequest; PVStructure* m_data; BitSet* m_bitSet; @@ -908,103 +908,103 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelRPC); - // subscribe - try { + // subscribe + try { resubscribeSubscription(m_channel->checkAndGetTransport()); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelNotConnected, 0, 0, 0)); - } + } } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int32 pendingRequest = getPendingRequest(); - if (pendingRequest < 0) - { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { BaseRequestImpl::send(buffer, control); return; - } - - control->startMessage((int8)20, 9); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - if ((m_pendingRequest & QOS_INIT) == 0) + } + + control->startMessage((int8)20, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + if ((m_pendingRequest & QOS_INIT) == 0) buffer->putByte((int8)m_pendingRequest); - - if (pendingRequest & QOS_INIT) - { + + if (pendingRequest & QOS_INIT) + { buffer->putByte((int8)QOS_INIT); // pvRequest m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); - } - else - { + } + else + { m_bitSet->serialize(buffer, control); m_data->serialize(buffer, control, m_bitSet); - } - - stopRequest(); + } + + stopRequest(); } virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - // data available - // TODO we need a flag here... - return normalResponse(transport, version, payloadBuffer, qos, status); + // data available + // TODO we need a flag here... + return normalResponse(transport, version, payloadBuffer, qos, status); } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, this, 0, 0)); return true; - } + } - // create data and its bitSet - m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); - m_bitSet = new BitSet(m_data->getNumberFields()); + // create data and its bitSet + m_data = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + m_bitSet = new BitSet(m_data->getNumberFields()); - // notify - EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(okStatus, this, m_data, m_bitSet)); - return true; + // notify + EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(okStatus, this, m_data, m_bitSet)); + return true; } virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, 0)); return true; - } - - - PVStructure* response = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); - EXCEPTION_GUARD(m_channelRPCRequester->requestDone(okStatus, response)); - delete response; - return true; + } + + + PVStructure* response = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport); + EXCEPTION_GUARD(m_channelRPCRequester->requestDone(okStatus, response)); + delete response; + return true; } virtual void request(bool lastRequest) { // TODO sync? - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelRPCRequester->requestDone(destroyedStatus, 0)); return; - } + } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { EXCEPTION_GUARD(m_channelRPCRequester->requestDone(otherRequestPendingStatus, 0)); return; - } - - try { + } + + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, 0)); - } + } } virtual void resubscribeSubscription(Transport* transport) { - startRequest(QOS_INIT); - transport->enqueueSendRequest(this); + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); } @@ -1035,7 +1035,7 @@ namespace epics { private: ChannelArrayRequester* m_channelArrayRequester; - PVStructure* m_pvRequest; + PVStructure* m_pvRequest; PVArray* m_data; @@ -1060,78 +1060,78 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelArray); - // subscribe - try { + // subscribe + try { resubscribeSubscription(m_channel->checkAndGetTransport()); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelNotConnected, 0, 0)); - } + } } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int32 pendingRequest = getPendingRequest(); - if (pendingRequest < 0) - { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { BaseRequestImpl::send(buffer, control); return; - } - - control->startMessage((int8)14, 9); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); - - if (pendingRequest & QOS_INIT) - { + } + + control->startMessage((int8)14, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { // pvRequest m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); - } - else if (pendingRequest & QOS_GET) - { - SerializeHelper::writeSize(m_offset, buffer, control); - SerializeHelper::writeSize(m_count, buffer, control); - } - else if (pendingRequest & QOS_GET_PUT) // i.e. setLength - { - SerializeHelper::writeSize(m_length, buffer, control); - SerializeHelper::writeSize(m_capacity, buffer, control); - } - // put - else - { - SerializeHelper::writeSize(m_offset, buffer, control); - m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?! - } - - stopRequest(); + } + else if (pendingRequest & QOS_GET) + { + SerializeHelper::writeSize(m_offset, buffer, control); + SerializeHelper::writeSize(m_count, buffer, control); + } + else if (pendingRequest & QOS_GET_PUT) // i.e. setLength + { + SerializeHelper::writeSize(m_length, buffer, control); + SerializeHelper::writeSize(m_capacity, buffer, control); + } + // put + else + { + SerializeHelper::writeSize(m_offset, buffer, control); + m_data->serialize(buffer, control, 0, m_count); // put from 0 offset; TODO count out-of-bounds check?! + } + + stopRequest(); } virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - // data available (get with destroy) - if (qos & QOS_GET) + // data available (get with destroy) + if (qos & QOS_GET) return normalResponse(transport, version, payloadBuffer, qos, status); - return true; + return true; } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, this, 0)); return true; - } + } - // create data and its bitSet - FieldConstPtr field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport); - m_data = dynamic_cast(getPVDataCreate()->createPVField(0, field)); + // create data and its bitSet + FieldConstPtr field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport); + m_data = dynamic_cast(getPVDataCreate()->createPVField(0, field)); - // notify - EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(okStatus, this, m_data)); - return true; + // notify + EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(okStatus, this, m_data)); + return true; } virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (qos & QOS_GET) - { + if (qos & QOS_GET) + { if (!status->isSuccess()) { m_channelArrayRequester->getArrayDone(status); @@ -1142,90 +1142,90 @@ namespace epics { EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(okStatus)); return true; - } - else if (qos & QOS_GET_PUT) - { + } + else if (qos & QOS_GET_PUT) + { EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(status)); return true; - } - else - { + } + else + { EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(status)); return true; - } + } } virtual void getArray(bool lastRequest, int offset, int count) { // TODO sync? - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(destroyedStatus)); return; - } + } - if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_GET)) { + if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_GET)) { EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(otherRequestPendingStatus)); return; - } - - try { - m_offset = offset; - m_count = count; + } + + try { + m_offset = offset; + m_count = count; m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected)); - } + } } virtual void putArray(bool lastRequest, int offset, int count) { // TODO sync? - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(destroyedStatus)); return; - } + } - if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { + if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) { EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(otherRequestPendingStatus)); return; - } - - try { - m_offset = offset; - m_count = count; + } + + try { + m_offset = offset; + m_count = count; m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected)); - } + } } virtual void setLength(bool lastRequest, int length, int capacity) { // TODO sync? - if (m_destroyed) { + if (m_destroyed) { EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(destroyedStatus)); return; - } + } - if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { + if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) { EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(otherRequestPendingStatus)); return; - } - - try { - m_length = length; - m_capacity = capacity; + } + + try { + m_length = length; + m_capacity = capacity; m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected)); - } + } } virtual void resubscribeSubscription(Transport* transport) { - startRequest(QOS_INIT); - transport->enqueueSendRequest(this); + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); } @@ -1255,12 +1255,12 @@ namespace epics { { private: ChannelImpl* m_channel; - ClientContextImpl* m_context; - pvAccessID m_ioid; + ClientContextImpl* m_context; + pvAccessID m_ioid; GetFieldRequester* m_callback; - String m_subField; - Mutex m_mutex; - bool m_destroyed; + String m_subField; + Mutex m_mutex; + bool m_destroyed; private: ~ChannelGetFieldRequestImpl() @@ -1269,7 +1269,7 @@ namespace epics { } - public: + public: ChannelGetFieldRequestImpl(ChannelImpl* channel, GetFieldRequester* callback, String subField) : m_channel(channel), m_context(channel->getContext()), m_callback(callback), m_subField(subField), @@ -1277,78 +1277,78 @@ namespace epics { { PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelGetField); - // register response request - m_ioid = m_context->registerResponseRequest(this); - channel->registerResponseRequest(this); + // register response request + m_ioid = m_context->registerResponseRequest(this); + channel->registerResponseRequest(this); - // enqueue send request - try { + // enqueue send request + try { m_channel->checkAndGetTransport()->enqueueSendRequest(this); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(callback->getDone(channelNotConnected, 0)); - } + } } Requester* getRequester() { - return m_callback; + return m_callback; } pvAccessID getIOID() { - return m_ioid; + return m_ioid; } virtual void lock() { - // noop + // noop } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - control->startMessage((int8)17, 8); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - SerializeHelper::serializeString(m_subField, buffer, control); + control->startMessage((int8)17, 8); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + SerializeHelper::serializeString(m_subField, buffer, control); } virtual void cancel() { - destroy(); - // TODO notify? + destroy(); + // TODO notify? } virtual void timeout() { - cancel(); + cancel(); } void reportStatus(Status* status) { - // destroy, since channel (parent) was destroyed - if (status == ChannelImpl::channelDestroyed) + // destroy, since channel (parent) was destroyed + if (status == ChannelImpl::channelDestroyed) destroy(); - // TODO notify? + // TODO notify? } virtual void unlock() { - // noop + // noop } virtual void destroy() { - { + { Lock guard(&m_mutex); if (m_destroyed) return; m_destroyed = true; - } + } - // unregister response request - m_context->unregisterResponseRequest(this); - m_channel->unregisterResponseRequest(this); - - delete this; + // unregister response request + m_context->unregisterResponseRequest(this); + m_channel->unregisterResponseRequest(this); + + delete this; } virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { // TODO? - // try - // { + // try + // { Status* status = statusCreate->deserializeStatus(payloadBuffer, transport); if (status->isSuccess()) { @@ -1366,11 +1366,11 @@ namespace epics { if (status != okStatus) delete status; // } // TODO guard callback - // finally - // { + // finally + // { // always cancel request - // cancel(); - // } + // cancel(); + // } cancel(); @@ -1402,7 +1402,7 @@ namespace epics { Structure* m_structure; bool m_started; - PVStructure* m_pvRequest; + PVStructure* m_pvRequest; // TODO temp PVStructure* m_pvStructure; @@ -1431,51 +1431,51 @@ namespace epics { // TODO quques - // subscribe - try { + // subscribe + try { resubscribeSubscription(m_channel->checkAndGetTransport()); - } catch (std::runtime_error &rte) { + } catch (std::runtime_error &rte) { EXCEPTION_GUARD(m_monitorRequester->monitorConnect(channelNotConnected, 0, 0)); - } + } } virtual void send(ByteBuffer* buffer, TransportSendControl* control) { - int32 pendingRequest = getPendingRequest(); - if (pendingRequest < 0) - { + int32 pendingRequest = getPendingRequest(); + if (pendingRequest < 0) + { BaseRequestImpl::send(buffer, control); return; - } - - control->startMessage((int8)13, 9); - buffer->putInt(m_channel->getServerChannelID()); - buffer->putInt(m_ioid); - buffer->putByte((int8)m_pendingRequest); - - if (pendingRequest & QOS_INIT) - { + } + + control->startMessage((int8)13, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)m_pendingRequest); + + if (pendingRequest & QOS_INIT) + { // pvRequest m_channel->getTransport()->getIntrospectionRegistry()->serializePVRequest(buffer, control, m_pvRequest); - } - - stopRequest(); + } + + stopRequest(); } virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - // data available - // TODO if (qos & QOS_GET) - return normalResponse(transport, version, payloadBuffer, qos, status); + // data available + // TODO if (qos & QOS_GET) + return normalResponse(transport, version, payloadBuffer, qos, status); } virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (!status->isSuccess()) - { + if (!status->isSuccess()) + { EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, this, 0)); return true; - } + } - // create data and its bitSet - m_structure = const_cast(dynamic_cast(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport))); + // create data and its bitSet + m_structure = const_cast(dynamic_cast(transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport))); //monitorStrategy->init(structure); @@ -1485,33 +1485,35 @@ namespace epics { m_overrunBitSet = new BitSet(m_pvStructure->getNumberFields()); - // notify - EXCEPTION_GUARD(m_monitorRequester->monitorConnect(okStatus, this, m_structure)); - return true; + // notify + EXCEPTION_GUARD(m_monitorRequester->monitorConnect(okStatus, this, m_structure)); + + if (m_started) + delete start(); + + return true; } virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) { - if (qos & QOS_GET) - { + if (qos & QOS_GET) + { // TODO not supported by IF yet... - } - else - { + } + else + { // TODO m_changedBitSet->deserialize(payloadBuffer, transport); m_pvStructure->deserialize(payloadBuffer, transport, m_changedBitSet); m_overrunBitSet->deserialize(payloadBuffer, transport); EXCEPTION_GUARD(m_monitorRequester->monitorEvent(this)); - } - return true; + } + return true; } virtual void resubscribeSubscription(Transport* transport) { - startRequest(QOS_INIT); - transport->enqueueSendRequest(this); - if (m_started) - start(); + startRequest(QOS_INIT); + transport->enqueueSendRequest(this); } @@ -1538,8 +1540,8 @@ namespace epics { // override, since we optimize status virtual void response(Transport* transport, int8 version, ByteBuffer* payloadBuffer) { // TODO? - // try - // { + // try + // { transport->ensureData(1); int8 qos = payloadBuffer->getByte(); if (qos & QOS_INIT) @@ -1573,12 +1575,12 @@ namespace epics { Lock guard(&m_lock); // TODO sync - if (m_destroyed) + if (m_destroyed) return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");; - - // TODO monitorStrategy.start(); - - //try { + + // TODO monitorStrategy.start(); + + //try { // start == process + get if (!startRequest(QOS_PROCESS | QOS_GET)) { @@ -1588,9 +1590,9 @@ namespace epics { m_started = true; // client needs to delete status, so passing shared OK instance is not right thing to do return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started."); - //} catch (std::runtime_error &rte) { - // return channelNotConnected; // TODO clone - //} + //} catch (std::runtime_error &rte) { + // return channelNotConnected; // TODO clone + //} @@ -1601,12 +1603,12 @@ namespace epics { Lock guard(&m_lock); // TODO sync - if (m_destroyed) + if (m_destroyed) return getStatusCreate()->createStatus(STATUSTYPE_ERROR, "Monitor destroyed.");; - - //monitorStrategy.stop(); - - //try { + + //monitorStrategy.stop(); + + //try { // stop == process + no get if (!startRequest(QOS_PROCESS)) { @@ -1616,9 +1618,9 @@ namespace epics { m_started = false; // client needs to delete status, so passing shared OK instance is not right thing to do return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor stopped."); - //} catch (std::runtime_error &rte) { - // return channelNotConnected; // TODO clone - //} + //} catch (std::runtime_error &rte) { + // return channelNotConnected; // TODO clone + //} } @@ -1747,15 +1749,15 @@ namespace epics { Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); - - transport->ensureData(4); - ResponseRequest* rr = _context->getResponseRequest(payloadBuffer->getInt()); - if (rr) - { + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + transport->ensureData(4); + ResponseRequest* rr = _context->getResponseRequest(payloadBuffer->getInt()); + if (rr) + { DataResponse* nrr = dynamic_cast(rr); if (nrr) - nrr->response(transport, version, payloadBuffer); + nrr->response(transport, version, payloadBuffer); } } }; @@ -1775,48 +1777,48 @@ namespace epics { Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); - transport->ensureData(5); - int32 searchSequenceId = payloadBuffer->getInt(); - bool found = payloadBuffer->getByte() != 0; - if (!found) + transport->ensureData(5); + int32 searchSequenceId = payloadBuffer->getInt(); + bool found = payloadBuffer->getByte() != 0; + if (!found) return; - transport->ensureData((128+2*16)/8); + transport->ensureData((128+2*16)/8); osiSockAddr serverAddress; serverAddress.ia.sin_family = AF_INET; - // 128-bit IPv6 address - /* - int8* byteAddress = new int8[16]; - for (int i = 0; i < 16; i++) - byteAddress[i] = payloadBuffer->getByte(); }; - */ - + // 128-bit IPv6 address + /* + int8* byteAddress = new int8[16]; + for (int i = 0; i < 16; i++) + byteAddress[i] = payloadBuffer->getByte(); }; + */ + // IPv4 compatible IPv6 address expected // first 80-bit are 0 if (payloadBuffer->getLong() != 0) return; if (payloadBuffer->getShort() != 0) return; if (payloadBuffer->getShort() != (int16)0xFFFF) return; - // accept given address if explicitly specified by sender + // accept given address if explicitly specified by sender serverAddress.ia.sin_addr.s_addr = htonl(payloadBuffer->getInt()); if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY) serverAddress.ia.sin_addr = responseFrom->ia.sin_addr; serverAddress.ia.sin_port = htons(payloadBuffer->getShort()); - // reads CIDs - ChannelSearchManager* csm = _context->getChannelSearchManager(); - int16 count = payloadBuffer->getShort(); - for (int i = 0; i < count; i++) - { + // reads CIDs + ChannelSearchManager* csm = _context->getChannelSearchManager(); + int16 count = payloadBuffer->getShort(); + for (int i = 0; i < count; i++) + { transport->ensureData(4); pvAccessID cid = payloadBuffer->getInt(); csm->searchResponse(cid, searchSequenceId, version & 0x0F, &serverAddress); - } + } } @@ -1841,7 +1843,7 @@ namespace epics { TimeStamp timestamp; timestamp.getCurrent(); - AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); transport->ensureData((2*sizeof(int16)+2*sizeof(int32)+128)/sizeof(int8)); @@ -1851,20 +1853,20 @@ namespace epics { osiSockAddr serverAddress; serverAddress.ia.sin_family = AF_INET; - // 128-bit IPv6 address - /* - int8* byteAddress = new int8[16]; - for (int i = 0; i < 16; i++) - byteAddress[i] = payloadBuffer->getByte(); }; - */ - + // 128-bit IPv6 address + /* + int8* byteAddress = new int8[16]; + for (int i = 0; i < 16; i++) + byteAddress[i] = payloadBuffer->getByte(); }; + */ + // IPv4 compatible IPv6 address expected // first 80-bit are 0 if (payloadBuffer->getLong() != 0) return; if (payloadBuffer->getShort() != 0) return; if (payloadBuffer->getShort() != (int16)0xFFFF) return; - // accept given address if explicitly specified by sender + // accept given address if explicitly specified by sender serverAddress.ia.sin_addr.s_addr = htonl(payloadBuffer->getInt()); if (serverAddress.ia.sin_addr.s_addr == INADDR_ANY) serverAddress.ia.sin_addr = responseFrom->ia.sin_addr; @@ -1905,17 +1907,17 @@ namespace epics { Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); transport->ensureData(8); - transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt()); - transport->setRemoteTransportSocketReceiveBufferSize(payloadBuffer->getInt()); + transport->setRemoteTransportReceiveBufferSize(payloadBuffer->getInt()); + transport->setRemoteTransportSocketReceiveBufferSize(payloadBuffer->getInt()); - transport->setRemoteMinorRevision(version); - TransportSender* sender = dynamic_cast(transport); - if (sender) + transport->setRemoteMinorRevision(version); + TransportSender* sender = dynamic_cast(transport); + if (sender) transport->enqueueSendRequest(sender); - transport->verified(); + transport->verified(); } }; @@ -1934,18 +1936,18 @@ namespace epics { Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); - transport->ensureData(5); + transport->ensureData(5); - DataResponse* nrr = dynamic_cast(_context->getResponseRequest(payloadBuffer->getInt())); - Requester* requester; - if (nrr && (requester = nrr->getRequester())) - { + DataResponse* nrr = dynamic_cast(_context->getResponseRequest(payloadBuffer->getInt())); + Requester* requester; + if (nrr && (requester = nrr->getRequester())) + { MessageType type = (MessageType)payloadBuffer->getByte(); String message = SerializeHelper::deserializeString(payloadBuffer, transport); requester->message(message, type); // TODO do we need to guard from exceptions - } + } } }; @@ -1964,17 +1966,17 @@ namespace epics { Transport* transport, int8 version, int8 command, int payloadSize, epics::pvData::ByteBuffer* payloadBuffer) { - AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); - transport->ensureData(8); - pvAccessID cid = payloadBuffer->getInt(); - pvAccessID sid = payloadBuffer->getInt(); - // TODO... do not destroy OK - Status* status = transport->getIntrospectionRegistry()->deserializeStatus(payloadBuffer, transport); - - ChannelImpl* channel = static_cast(_context->getChannel(cid)); - if (channel) - { + transport->ensureData(8); + pvAccessID cid = payloadBuffer->getInt(); + pvAccessID sid = payloadBuffer->getInt(); + // TODO... do not destroy OK + Status* status = transport->getIntrospectionRegistry()->deserializeStatus(payloadBuffer, transport); + + ChannelImpl* channel = static_cast(_context->getChannel(cid)); + if (channel) + { // failed check if (!status->isSuccess()) { channel->createChannelFailed(); @@ -1984,10 +1986,10 @@ namespace epics { //int16 acl = payloadBuffer->getShort(); channel->connectionCompleted(sid); - } - - // TODO not nice - if (status != g_statusOK) + } + + // TODO not nice + if (status != g_statusOK) delete status; } @@ -2004,16 +2006,16 @@ namespace epics { private: /** - * Table of response handlers for each command ID. - */ + * Table of response handlers for each command ID. + */ ResponseHandler** m_handlerTable; /* - * Context instance is part of the response handler now - */ + * Context instance is part of the response handler now + */ //ClientContextImpl* m_context; - public: + public: virtual ~ClientResponseHandler() { delete m_handlerTable[ 0]; @@ -2032,51 +2034,51 @@ namespace epics { } /** - * @param context - */ + * @param context + */ ClientResponseHandler(ClientContextImpl* context) { - ResponseHandler* badResponse = new BadResponse(context); - ResponseHandler* dataResponse = new DataResponseHandler(context); + ResponseHandler* badResponse = new BadResponse(context); + ResponseHandler* dataResponse = new DataResponseHandler(context); // TODO free!!! #define HANDLER_COUNT 28 - m_handlerTable = new ResponseHandler*[HANDLER_COUNT]; - m_handlerTable[ 0] = new BeaconResponseHandler(context), /* 0 */ - m_handlerTable[ 1] = new ClientConnectionValidationHandler(context), /* 1 */ - m_handlerTable[ 2] = new NoopResponse(context, "Echo"), /* 2 */ - m_handlerTable[ 3] = new NoopResponse(context, "Search"), /* 3 */ - m_handlerTable[ 4] = new SearchResponseHandler(context), /* 4 */ - m_handlerTable[ 5] = new NoopResponse(context, "Introspection search"), /* 5 */ - m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */ - m_handlerTable[ 7] = new CreateChannelHandler(context), /* 7 */ - m_handlerTable[ 8] = new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this... - m_handlerTable[ 9] = badResponse; /* 9 */ - m_handlerTable[10] = dataResponse; /* 10 - get response */ - m_handlerTable[11] = dataResponse; /* 11 - put response */ - m_handlerTable[12] = dataResponse; /* 12 - put-get response */ - m_handlerTable[13] = dataResponse; /* 13 - monitor response */ - m_handlerTable[14] = dataResponse; /* 14 - array response */ - m_handlerTable[15] = badResponse; /* 15 - cancel request */ - m_handlerTable[16] = dataResponse; /* 16 - process response */ - m_handlerTable[17] = dataResponse; /* 17 - get field response */ - m_handlerTable[18] = new MessageHandler(context), /* 18 - message to Requester */ - m_handlerTable[19] = badResponse; // TODO new MultipleDataResponseHandler(context), /* 19 - grouped monitors */ - m_handlerTable[20] = dataResponse; /* 20 - RPC response */ - m_handlerTable[21] = badResponse; /* 21 */ - m_handlerTable[22] = badResponse; /* 22 */ - m_handlerTable[23] = badResponse; /* 23 */ - m_handlerTable[24] = badResponse; /* 24 */ - m_handlerTable[25] = badResponse; /* 25 */ - m_handlerTable[26] = badResponse; /* 26 */ - m_handlerTable[27] = badResponse; /* 27 */ + m_handlerTable = new ResponseHandler*[HANDLER_COUNT]; + m_handlerTable[ 0] = new BeaconResponseHandler(context), /* 0 */ + m_handlerTable[ 1] = new ClientConnectionValidationHandler(context), /* 1 */ + m_handlerTable[ 2] = new NoopResponse(context, "Echo"), /* 2 */ + m_handlerTable[ 3] = new NoopResponse(context, "Search"), /* 3 */ + m_handlerTable[ 4] = new SearchResponseHandler(context), /* 4 */ + m_handlerTable[ 5] = new NoopResponse(context, "Introspection search"), /* 5 */ + m_handlerTable[ 6] = dataResponse; /* 6 - introspection search */ + m_handlerTable[ 7] = new CreateChannelHandler(context), /* 7 */ + m_handlerTable[ 8] = new NoopResponse(context, "Destroy channel"), /* 8 */ // TODO it might be useful to implement this... + m_handlerTable[ 9] = badResponse; /* 9 */ + m_handlerTable[10] = dataResponse; /* 10 - get response */ + m_handlerTable[11] = dataResponse; /* 11 - put response */ + m_handlerTable[12] = dataResponse; /* 12 - put-get response */ + m_handlerTable[13] = dataResponse; /* 13 - monitor response */ + m_handlerTable[14] = dataResponse; /* 14 - array response */ + m_handlerTable[15] = badResponse; /* 15 - cancel request */ + m_handlerTable[16] = dataResponse; /* 16 - process response */ + m_handlerTable[17] = dataResponse; /* 17 - get field response */ + m_handlerTable[18] = new MessageHandler(context), /* 18 - message to Requester */ + m_handlerTable[19] = badResponse; // TODO new MultipleDataResponseHandler(context), /* 19 - grouped monitors */ + m_handlerTable[20] = dataResponse; /* 20 - RPC response */ + m_handlerTable[21] = badResponse; /* 21 */ + m_handlerTable[22] = badResponse; /* 22 */ + m_handlerTable[23] = badResponse; /* 23 */ + m_handlerTable[24] = badResponse; /* 24 */ + m_handlerTable[25] = badResponse; /* 25 */ + m_handlerTable[26] = badResponse; /* 26 */ + m_handlerTable[27] = badResponse; /* 27 */ } virtual void handleResponse(osiSockAddr* responseFrom, Transport* transport, int8 version, int8 command, int payloadSize, ByteBuffer* payloadBuffer) { - if (command < 0 || command >= HANDLER_COUNT) - { + if (command < 0 || command >= HANDLER_COUNT) + { // TODO context.getLogger().fine("Invalid (or unsupported) command: " + command + "."); std::cout << "Invalid (or unsupported) command: " << command << "." << std::endl; // TODO remove debug output @@ -2084,9 +2086,9 @@ namespace epics { sprintf(buf, "Invalid CA header %d its payload buffer", command); hexDump(buf, (const int8*)(payloadBuffer->getArray()), payloadBuffer->getPosition(), payloadSize); return; - } - // delegate - m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + } + // delegate + m_handlerTable[command]->handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); } }; @@ -2102,18 +2104,18 @@ namespace epics { */ enum ContextState { /** - * State value of non-initialized context. - */ + * State value of non-initialized context. + */ CONTEXT_NOT_INITIALIZED, /** - * State value of initialized context. - */ + * State value of initialized context. + */ CONTEXT_INITIALIZED, /** - * State value of destroyed context. - */ + * State value of destroyed context. + */ CONTEXT_DESTROYED }; @@ -2134,43 +2136,43 @@ namespace epics { private: /** - * Context. - */ + * Context. + */ ClientContextImpl* m_context; /** - * Client channel ID. - */ + * Client channel ID. + */ pvAccessID m_channelID; /** - * Channel name. - */ + * Channel name. + */ String m_name; /** - * Channel requester. - */ + * Channel requester. + */ ChannelRequester* m_requester; /** - * Process priority. - */ + * Process priority. + */ short m_priority; /** - * List of fixed addresses, if name resolution will be used. - */ + * List of fixed addresses, if name resolution will be used. + */ InetAddrVector* m_addresses; /** - * Connection status. - */ + * Connection status. + */ ConnectionState m_connectionState; /** - * List of all channel's pending requests (keys are subscription IDs). - */ + * List of all channel's pending requests (keys are subscription IDs). + */ IOIDResponseRequestMap m_responseRequests; /** @@ -2181,14 +2183,14 @@ namespace epics { bool m_needSubscriptionUpdate; /** - * Allow reconnection flag. - */ + * Allow reconnection flag. + */ bool m_allowCreation; /** - * Reference counting. - * NOTE: synced on m_channelMutex. - */ + * Reference counting. + * NOTE: synced on m_channelMutex. + */ int m_references; /* ****************** */ @@ -2196,13 +2198,13 @@ namespace epics { /* ****************** */ /** - * Server transport. - */ + * Server transport. + */ Transport* m_transport; /** - * Server channel ID. - */ + * Server channel ID. + */ pvAccessID m_serverChannelID; /** @@ -2224,19 +2226,19 @@ namespace epics { public: /** - * Constructor. - * @param context - * @param name - * @param listener - * @throws CAException - */ + * Constructor. + * @param context + * @param name + * @param listener + * @throws CAException + */ InternalChannelImpl( ClientContextImpl* context, pvAccessID channelID, String name, - ChannelRequester* requester, - short priority, - InetAddrVector* addresses) : + ChannelRequester* requester, + short priority, + InetAddrVector* addresses) : m_context(context), m_channelID(channelID), m_name(name), @@ -2288,7 +2290,7 @@ namespace epics { Lock guard(&m_channelMutex); if (m_connectionState != CONNECTED) { static String emptyString; - return emptyString; + return emptyString; } else { @@ -2323,9 +2325,9 @@ namespace epics { } /** - * Get client channel ID. - * @return client channel ID. - */ + * Get client channel ID. + * @return client channel ID. + */ pvAccessID getChannelID() { return m_channelID; } @@ -2363,47 +2365,47 @@ namespace epics { Lock guard(&m_channelMutex); // if not destroyed... if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel destroyed."); + throw std::runtime_error("Channel destroyed."); else if (m_connectionState != CONNECTED) - initiateSearch(); + initiateSearch(); } void disconnect() { Lock guard(&m_channelMutex); // if not destroyed... if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel destroyed."); + throw std::runtime_error("Channel destroyed."); else if (m_connectionState == CONNECTED) - disconnect(false, true); + disconnect(false, true); } /** - * Create a channel, i.e. submit create channel request to the server. - * This method is called after search is complete. - * @param transport - */ + * Create a channel, i.e. submit create channel request to the server. + * This method is called after search is complete. + * @param transport + */ void createChannel(Transport* transport) { Lock guard(&m_channelMutex); // do not allow duplicate creation to the same transport if (!m_allowCreation) - return; + return; m_allowCreation = false; // check existing transport if (m_transport && m_transport != transport) { - disconnectPendingIO(false); + disconnectPendingIO(false); - ReferenceCountingTransport* rct = dynamic_cast(m_transport); - if (rct) rct->release(this); + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); } else if (m_transport == transport) { - // request to sent create request to same transport, ignore - // this happens when server is slower (processing search requests) than client generating it - return; + // request to sent create request to same transport, ignore + // this happens when server is slower (processing search requests) than client generating it + return; } m_transport = transport; @@ -2419,8 +2421,8 @@ namespace epics { } /** - * Create channel failed. - */ + * Create channel failed. + */ virtual void createChannelFailed() { Lock guard(&m_channelMutex); @@ -2431,10 +2433,10 @@ namespace epics { } /** - * Called when channel created succeeded on the server. - * sid might not be valid, this depends on protocol revision. - * @param sid - */ + * Called when channel created succeeded on the server. + * sid might not be valid, this depends on protocol revision. + * @param sid + */ virtual void connectionCompleted(pvAccessID sid/*, rights*/) { Lock guard(&m_channelMutex); @@ -2442,20 +2444,20 @@ namespace epics { bool allOK = false; try { - // do this silently - if (m_connectionState == DESTROYED) + // do this silently + if (m_connectionState == DESTROYED) return; - // store data - m_serverChannelID = sid; - //setAccessRights(rights); + // store data + m_serverChannelID = sid; + //setAccessRights(rights); - // user might create monitors in listeners, so this has to be done before this can happen - // however, it would not be nice if events would come before connection event is fired - // but this cannot happen since transport (TCP) is serving in this thread - resubscribeSubscriptions(); - setConnectionState(CONNECTED); - allOK = true; + // user might create monitors in listeners, so this has to be done before this can happen + // however, it would not be nice if events would come before connection event is fired + // but this cannot happen since transport (TCP) is serving in this thread + resubscribeSubscriptions(); + setConnectionState(CONNECTED); + allOK = true; } catch (...) { // noop @@ -2464,18 +2466,18 @@ namespace epics { if (!allOK) { - // end connection request - cancel(); + // end connection request + cancel(); } } /** - * @param force force destruction regardless of reference count - */ + * @param force force destruction regardless of reference count + */ void destroy(bool force) { Lock guard(&m_channelMutex); if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel already destroyed."); + throw std::runtime_error("Channel already destroyed."); // do destruction via context m_context->destroyChannel(this, force); @@ -2483,29 +2485,29 @@ namespace epics { } /** - * Increment reference. - */ + * Increment reference. + */ void acquire() { Lock guard(&m_channelMutex); m_references++; } /** - * Actual destroy method, to be called CAJContext. - * @param force force destruction regardless of reference count - * @throws CAException - * @throws std::runtime_error - * @throws IOException - */ + * Actual destroy method, to be called CAJContext. + * @param force force destruction regardless of reference count + * @throws CAException + * @throws std::runtime_error + * @throws IOException + */ void destroyChannel(bool force) { Lock guard(&m_channelMutex); if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel already destroyed."); + throw std::runtime_error("Channel already destroyed."); m_references--; if (m_references > 0 && !force) - return; + return; // stop searching... m_context->getChannelSearchManager()->unregisterChannel(this); @@ -2515,14 +2517,14 @@ namespace epics { if (m_connectionState == CONNECTED) { - disconnect(false, true); + disconnect(false, true); } else if (m_transport) { - // unresponsive state, do not forget to release transport - ReferenceCountingTransport* rct = dynamic_cast(m_transport); - if (rct) rct->release(this); - m_transport = 0; + // unresponsive state, do not forget to release transport + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); + m_transport = 0; } setConnectionState(DESTROYED); @@ -2532,20 +2534,20 @@ namespace epics { } /** - * Disconnected notification. - * @param initiateSearch flag to indicate if searching (connect) procedure should be initiated - * @param remoteDestroy issue channel destroy request. - */ + * Disconnected notification. + * @param initiateSearch flag to indicate if searching (connect) procedure should be initiated + * @param remoteDestroy issue channel destroy request. + */ void disconnect(bool initiateSearch, bool remoteDestroy) { Lock guard(&m_channelMutex); if (m_connectionState != CONNECTED && !m_transport) - return; + return; if (!initiateSearch) { - // stop searching... - m_context->getChannelSearchManager()->unregisterChannel(this); - cancel(); + // stop searching... + m_context->getChannelSearchManager()->unregisterChannel(this); + cancel(); } setConnectionState(DISCONNECTED); @@ -2554,25 +2556,25 @@ namespace epics { // release transport if (m_transport) { - if (remoteDestroy) { + if (remoteDestroy) { m_issueCreateMessage = false; // TODO !!! this causes problems.. since qnqueueSendRequest is added and this instance deleted //m_transport->enqueueSendRequest(this); - } + } - ReferenceCountingTransport* rct = dynamic_cast(m_transport); - if (rct) rct->release(this); - m_transport = 0; + ReferenceCountingTransport* rct = dynamic_cast(m_transport); + if (rct) rct->release(this); + m_transport = 0; } if (initiateSearch) - this->initiateSearch(); + this->initiateSearch(); } /** - * Initiate search (connect) procedure. - */ + * Initiate search (connect) procedure. + */ void initiateSearch() { Lock guard(&m_channelMutex); @@ -2580,14 +2582,14 @@ namespace epics { m_allowCreation = true; if (!m_addresses) - m_context->getChannelSearchManager()->registerChannel(this); + m_context->getChannelSearchManager()->registerChannel(this); /* TODO - else - // TODO not only first - // TODO minor version - // TODO what to do if there is no channel, do not search in a loop!!! do this in other thread...! - searchResponse(CAConstants.CA_MINOR_PROTOCOL_REVISION, addresses[0]); - */ + else + // TODO not only first + // TODO minor version + // TODO what to do if there is no channel, do not search in a loop!!! do this in other thread...! + searchResponse(CAConstants.CA_MINOR_PROTOCOL_REVISION, addresses[0]); + */ } virtual void searchResponse(int8 minorRevision, osiSockAddr* serverAddress) { @@ -2595,20 +2597,20 @@ namespace epics { Transport* transport = m_transport; if (transport) { - // multiple defined PV or reconnect request (same server address) - if (sockAddrAreIdentical(transport->getRemoteAddress(), serverAddress)) - { + // multiple defined PV or reconnect request (same server address) + if (sockAddrAreIdentical(transport->getRemoteAddress(), serverAddress)) + { EXCEPTION_GUARD(m_requester->message("More than one channel with name '" + m_name + "' detected, additional response from: " + inetAddressToString(*serverAddress), warningMessage)); return; - } + } } transport = m_context->getTransport(this, serverAddress, minorRevision, m_priority); if (!transport) { - createChannelFailed(); - return; + createChannelFailed(); + return; } // create channel @@ -2628,10 +2630,10 @@ namespace epics { Lock guard(&m_channelMutex); // TODO C-fy if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel destroyed."); + throw std::runtime_error("Channel destroyed."); else if (m_connectionState != CONNECTED) - throw std::runtime_error("Channel not connected."); - return m_transport; // TODO transport can be 0 !!!!!!!!!! + throw std::runtime_error("Channel not connected."); + return m_transport; // TODO transport can be 0 !!!!!!!!!! } virtual Transport* getTransport() @@ -2644,10 +2646,10 @@ namespace epics { Lock guard(&m_channelMutex); if (m_connectionState == DISCONNECTED) { - updateSubscriptions(); + updateSubscriptions(); - // reconnect using existing IDs, data - connectionCompleted(m_serverChannelID/*, accessRights*/); + // reconnect using existing IDs, data + connectionCompleted(m_serverChannelID/*, accessRights*/); } } @@ -2655,31 +2657,31 @@ namespace epics { Lock guard(&m_channelMutex); if (m_connectionState == CONNECTED) { - // NOTE: 2 types of disconnected state - distinguish them - setConnectionState(DISCONNECTED); + // NOTE: 2 types of disconnected state - distinguish them + setConnectionState(DISCONNECTED); - // ... CA notifies also w/ no access rights callback, although access right are not changed + // ... CA notifies also w/ no access rights callback, although access right are not changed } } /** - * Set connection state and if changed, notifies listeners. - * @param newState state to set. - */ + * Set connection state and if changed, notifies listeners. + * @param newState state to set. + */ void setConnectionState(ConnectionState connectionState) { Lock guard(&m_channelMutex); if (m_connectionState != connectionState) { - m_connectionState = connectionState; + m_connectionState = connectionState; - //bool connectionStatusToReport = (connectionState == CONNECTED); - //if (connectionStatusToReport != lastReportedConnectionState) - { + //bool connectionStatusToReport = (connectionState == CONNECTED); + //if (connectionStatusToReport != lastReportedConnectionState) + { //lastReportedConnectionState = connectionStatusToReport; // TODO via dispatcher ?!!! EXCEPTION_GUARD(m_requester->channelStateChange(this, connectionState)); - } + } } } @@ -2695,30 +2697,30 @@ namespace epics { if (issueCreateMessage) { - control->startMessage((int8)7, 2+4); + control->startMessage((int8)7, 2+4); - // count - buffer->putShort((int16)1); - // array of CIDs and names - buffer->putInt(m_channelID); - SerializeHelper::serializeString(m_name, buffer, control); - // send immediately - // TODO - control->flush(true); + // count + buffer->putShort((int16)1); + // array of CIDs and names + buffer->putInt(m_channelID); + SerializeHelper::serializeString(m_name, buffer, control); + // send immediately + // TODO + control->flush(true); } else { - control->startMessage((int8)8, 4+4); - // SID + control->startMessage((int8)8, 4+4); + // SID m_channelMutex.lock(); pvAccessID sid = m_serverChannelID; m_channelMutex.unlock(); - buffer->putInt(sid); - // CID - buffer->putInt(m_channelID); - // send immediately - // TODO - control->flush(true); + buffer->putInt(sid); + // CID + buffer->putInt(m_channelID); + // send immediately + // TODO + control->flush(true); } } @@ -2728,9 +2730,9 @@ namespace epics { /** - * Disconnects (destroys) all channels pending IO. - * @param destroy true if channel is being destroyed. - */ + * Disconnects (destroys) all channels pending IO. + * @param destroy true if channel is being destroyed. + */ void disconnectPendingIO(bool destroy) { // TODO destroy????!! @@ -2749,8 +2751,8 @@ namespace epics { } /** - * Resubscribe subscriptions. - */ + * Resubscribe subscriptions. + */ // TODO to be called from non-transport thread !!!!!! void resubscribeSubscriptions() { @@ -2769,17 +2771,17 @@ namespace epics { } /** - * Update subscriptions. - */ + * Update subscriptions. + */ // TODO to be called from non-transport thread !!!!!! void updateSubscriptions() { Lock guard(&m_responseRequestsMutex); if (m_needSubscriptionUpdate) - m_needSubscriptionUpdate = false; + m_needSubscriptionUpdate = false; else - return; // noop + return; // noop for (IOIDResponseRequestMap::iterator iter = m_responseRequests.begin(); iter != m_responseRequests.end(); @@ -2862,7 +2864,7 @@ namespace epics { if (m_connectionState == CONNECTED) { out->append("\nADDRESS : "); out->append(getRemoteAddress()); - //out->append("\nRIGHTS : "); out->append(getAccessRights()); + //out->append("\nRIGHTS : "); out->append(getAccessRights()); } out->append("\n"); } @@ -2926,7 +2928,7 @@ namespace epics { m_context->checkChannelName(channelName); if (!channelFindRequester) - throw std::runtime_error("0 requester"); + throw std::runtime_error("0 requester"); std::auto_ptr errorStatus(getStatusCreate()->createStatus(STATUSTYPE_ERROR, "not implemented", 0)); channelFindRequester->channelFindResult(errorStatus.get(), 0, false); @@ -2979,11 +2981,11 @@ namespace epics { virtual Configuration* getConfiguration() { /* TODO - final ConfigurationProvider configurationProvider = ConfigurationFactory.getProvider(); - Configuration config = configurationProvider.getConfiguration("pvAccess-client"); - if (config == 0) - config = configurationProvider.getConfiguration("system"); - return config; + final ConfigurationProvider configurationProvider = ConfigurationFactory.getProvider(); + Configuration config = configurationProvider.getConfiguration("pvAccess-client"); + if (config == 0) + config = configurationProvider.getConfiguration("system"); + return config; */ return m_configuration; } @@ -3018,14 +3020,14 @@ TODO virtual void initialize() { Lock lock(&m_contextMutex); - if (m_contextState == CONTEXT_DESTROYED) + if (m_contextState == CONTEXT_DESTROYED) throw std::runtime_error("Context destroyed."); - else if (m_contextState == CONTEXT_INITIALIZED) + else if (m_contextState == CONTEXT_INITIALIZED) throw std::runtime_error("Context already initialized."); - internalInitialize(); + internalInitialize(); - m_contextState = CONTEXT_INITIALIZED; + m_contextState = CONTEXT_INITIALIZED; } virtual void printInfo() { @@ -3039,17 +3041,17 @@ TODO std::ostringstream ostr; static String emptyString; - out->append( "CLASS : ::epics::pvAccess::ClientContextImpl"); - out->append("\nVERSION : "); out->append(m_version->getVersionString()); - out->append("\nADDR_LIST : "); ostr << m_addressList; out->append(ostr.str()); ostr.str(emptyString); - out->append("\nAUTO_ADDR_LIST : "); out->append(m_autoAddressList ? "true" : "false"); - out->append("\nCONNECTION_TIMEOUT : "); ostr << m_connectionTimeout; out->append(ostr.str()); ostr.str(emptyString); - out->append("\nBEACON_PERIOD : "); ostr << m_beaconPeriod; out->append(ostr.str()); ostr.str(emptyString); - out->append("\nBROADCAST_PORT : "); ostr << m_broadcastPort; out->append(ostr.str()); ostr.str(emptyString); - out->append("\nRCV_BUFFER_SIZE : "); ostr << m_receiveBufferSize; out->append(ostr.str()); ostr.str(emptyString); - out->append("\nSTATE : "); - switch (m_contextState) - { + out->append( "CLASS : ::epics::pvAccess::ClientContextImpl"); + out->append("\nVERSION : "); out->append(m_version->getVersionString()); + out->append("\nADDR_LIST : "); ostr << m_addressList; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nAUTO_ADDR_LIST : "); out->append(m_autoAddressList ? "true" : "false"); + out->append("\nCONNECTION_TIMEOUT : "); ostr << m_connectionTimeout; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nBEACON_PERIOD : "); ostr << m_beaconPeriod; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nBROADCAST_PORT : "); ostr << m_broadcastPort; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nRCV_BUFFER_SIZE : "); ostr << m_receiveBufferSize; out->append(ostr.str()); ostr.str(emptyString); + out->append("\nSTATE : "); + switch (m_contextState) + { case CONTEXT_NOT_INITIALIZED: out->append("CONTEXT_NOT_INITIALIZED"); break; @@ -3061,24 +3063,24 @@ TODO break; default: out->append("UNKNOWN"); - } - out->append("\n"); + } + out->append("\n"); } virtual void destroy() { m_contextMutex.lock(); - if (m_contextState == CONTEXT_DESTROYED) - { - m_contextMutex.unlock(); + if (m_contextState == CONTEXT_DESTROYED) + { + m_contextMutex.unlock(); throw std::runtime_error("Context already destroyed."); - } + } - // go into destroyed state ASAP - m_contextState = CONTEXT_DESTROYED; + // go into destroyed state ASAP + m_contextState = CONTEXT_DESTROYED; - internalDestroy(); + internalDestroy(); } virtual void dispose() @@ -3090,43 +3092,43 @@ TODO ~InternalClientContextImpl() {}; void loadConfiguration() { - m_addressList = m_configuration->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList); - m_autoAddressList = m_configuration->getPropertyAsBoolean("EPICS4_CA_AUTO_ADDR_LIST", m_autoAddressList); - m_connectionTimeout = m_configuration->getPropertyAsFloat("EPICS4_CA_CONN_TMO", m_connectionTimeout); - m_beaconPeriod = m_configuration->getPropertyAsFloat("EPICS4_CA_BEACON_PERIOD", m_beaconPeriod); - m_broadcastPort = m_configuration->getPropertyAsInteger("EPICS4_CA_BROADCAST_PORT", m_broadcastPort); - m_receiveBufferSize = m_configuration->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", m_receiveBufferSize); + m_addressList = m_configuration->getPropertyAsString("EPICS4_CA_ADDR_LIST", m_addressList); + m_autoAddressList = m_configuration->getPropertyAsBoolean("EPICS4_CA_AUTO_ADDR_LIST", m_autoAddressList); + m_connectionTimeout = m_configuration->getPropertyAsFloat("EPICS4_CA_CONN_TMO", m_connectionTimeout); + m_beaconPeriod = m_configuration->getPropertyAsFloat("EPICS4_CA_BEACON_PERIOD", m_beaconPeriod); + m_broadcastPort = m_configuration->getPropertyAsInteger("EPICS4_CA_BROADCAST_PORT", m_broadcastPort); + m_receiveBufferSize = m_configuration->getPropertyAsInteger("EPICS4_CA_MAX_ARRAY_BYTES", m_receiveBufferSize); } void internalInitialize() { - m_timer = new Timer("pvAccess-client timer", lowPriority); - m_connector = new BlockingTCPConnector(this, m_receiveBufferSize, m_beaconPeriod); - m_transportRegistry = new TransportRegistry(); - m_namedLocker = new NamedLockPattern(); + m_timer = new Timer("pvAccess-client timer", lowPriority); + m_connector = new BlockingTCPConnector(this, m_receiveBufferSize, m_beaconPeriod); + m_transportRegistry = new TransportRegistry(); + m_namedLocker = new NamedLockPattern(); - // setup UDP transport - initializeUDPTransport(); - // TODO what if initialization failed!!! + // setup UDP transport + initializeUDPTransport(); + // TODO what if initialization failed!!! - // setup search manager - m_channelSearchManager = new ChannelSearchManager(this); + // setup search manager + m_channelSearchManager = new ChannelSearchManager(this); } /** - * Initialized UDP transport (broadcast socket and repeater connection). - */ + * Initialized UDP transport (broadcast socket and repeater connection). + */ bool initializeUDPTransport() { - // quary broadcast addresses of all IFs + // quary broadcast addresses of all IFs SOCKET socket = epicsSocketCreate(AF_INET, SOCK_DGRAM, 0); if (socket == INVALID_SOCKET) return false; auto_ptr broadcastAddresses(getBroadcastAddresses(socket, m_broadcastPort)); epicsSocketDestroy (socket); - // set broadcast address list - if (!m_addressList.empty()) - { + // set broadcast address list + if (!m_addressList.empty()) + { // if auto is true, add it to specified list InetAddrVector* appendList = 0; if (m_autoAddressList) @@ -3137,76 +3139,76 @@ TODO // delete old list and take ownership of a new one broadcastAddresses = list; } - } + } - // where to bind (listen) address + // where to bind (listen) address osiSockAddr listenLocalAddress; listenLocalAddress.ia.sin_family = AF_INET; listenLocalAddress.ia.sin_port = htons(m_broadcastPort); listenLocalAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - auto_ptr broadcastConnector(new BlockingUDPConnector(true, true)); - m_broadcastTransport = (BlockingUDPTransport*)broadcastConnector->connect( + auto_ptr broadcastConnector(new BlockingUDPConnector(true, true)); + m_broadcastTransport = (BlockingUDPTransport*)broadcastConnector->connect( 0, new ClientResponseHandler(this), listenLocalAddress, CA_MINOR_PROTOCOL_REVISION, CA_DEFAULT_PRIORITY); - if (!m_broadcastTransport) + if (!m_broadcastTransport) return false; - m_broadcastTransport->setBroadcastAddresses(broadcastAddresses.get()); + m_broadcastTransport->setBroadcastAddresses(broadcastAddresses.get()); - // undefined address + // undefined address osiSockAddr undefinedAddress; undefinedAddress.ia.sin_family = AF_INET; undefinedAddress.ia.sin_port = htons(0); undefinedAddress.ia.sin_addr.s_addr = htonl(INADDR_ANY); - auto_ptr searchConnector(new BlockingUDPConnector(false, true)); - m_searchTransport = (BlockingUDPTransport*)searchConnector->connect( + auto_ptr searchConnector(new BlockingUDPConnector(false, true)); + m_searchTransport = (BlockingUDPTransport*)searchConnector->connect( 0, new ClientResponseHandler(this), undefinedAddress, CA_MINOR_PROTOCOL_REVISION, CA_DEFAULT_PRIORITY); - if (!m_searchTransport) + if (!m_searchTransport) return false; - m_searchTransport->setBroadcastAddresses(broadcastAddresses.get()); - + m_searchTransport->setBroadcastAddresses(broadcastAddresses.get()); + // become active - m_broadcastTransport->start(); - m_searchTransport->start(); + m_broadcastTransport->start(); + m_searchTransport->start(); return true; } void internalDestroy() { - // stop searching - if (m_channelSearchManager) + // stop searching + if (m_channelSearchManager) delete m_channelSearchManager; //->destroy(); - // stop timer - if (m_timer) + // stop timer + if (m_timer) delete m_timer; - // - // cleanup - // + // + // cleanup + // - // this will also close all CA transports - destroyAllChannels(); + // this will also close all CA transports + destroyAllChannels(); - // TODO destroy !!! - if (m_broadcastTransport) + // TODO destroy !!! + if (m_broadcastTransport) delete m_broadcastTransport; //->destroy(true); - if (m_searchTransport) + if (m_searchTransport) delete m_searchTransport; //->destroy(true); - if (m_namedLocker) delete m_namedLocker; - if (m_transportRegistry) delete m_transportRegistry; - if (m_connector) delete m_connector; - if (m_configuration) delete m_configuration; + if (m_namedLocker) delete m_namedLocker; + if (m_transportRegistry) delete m_transportRegistry; + if (m_connector) delete m_connector; + if (m_configuration) delete m_configuration; m_provider->destroy(); delete m_version; - m_contextMutex.unlock(); + m_contextMutex.unlock(); delete this; } @@ -3215,31 +3217,31 @@ TODO } /** - * Check channel name. - */ + * Check channel name. + */ void checkChannelName(String& name) { - if (name.empty()) + if (name.empty()) throw std::runtime_error("0 or empty channel name"); - else if (name.length() > UNREASONABLE_CHANNEL_NAME_LENGTH) + else if (name.length() > UNREASONABLE_CHANNEL_NAME_LENGTH) throw std::runtime_error("name too long"); } /** - * Check context state and tries to establish necessary state. - */ + * Check context state and tries to establish necessary state. + */ void checkState() { Lock lock(&m_contextMutex); // TODO check double-lock?!!! - if (m_contextState == CONTEXT_DESTROYED) + if (m_contextState == CONTEXT_DESTROYED) throw std::runtime_error("Context destroyed."); - else if (m_contextState == CONTEXT_NOT_INITIALIZED) + else if (m_contextState == CONTEXT_NOT_INITIALIZED) initialize(); } /** - * Register channel. - * @param channel - */ + * Register channel. + * @param channel + */ void registerChannel(ChannelImpl* channel) { Lock guard(&m_cidMapMutex); @@ -3247,9 +3249,9 @@ TODO } /** - * Unregister channel. - * @param channel - */ + * Unregister channel. + * @param channel + */ void unregisterChannel(ChannelImpl* channel) { Lock guard(&m_cidMapMutex); @@ -3257,10 +3259,10 @@ TODO } /** - * Searches for a channel with given channel ID. - * @param channelID CID. - * @return channel with given CID, 0 if non-existent. - */ + * Searches for a channel with given channel ID. + * @param channelID CID. + * @return channel with given CID, 0 if non-existent. + */ ChannelImpl* getChannel(pvAccessID channelID) { Lock guard(&m_cidMapMutex); @@ -3269,9 +3271,9 @@ TODO } /** - * Generate Client channel ID (CID). - * @return Client channel ID (CID). - */ + * Generate Client channel ID (CID). + * @return Client channel ID (CID). + */ pvAccessID generateCID() { Lock guard(&m_cidMapMutex); @@ -3284,8 +3286,8 @@ TODO } /** - * Free generated channel ID (CID). - */ + * Free generated channel ID (CID). + */ void freeCID(int cid) { Lock guard(&m_cidMapMutex); @@ -3294,10 +3296,10 @@ TODO /** - * Searches for a response request with given channel IOID. - * @param ioid I/O ID. - * @return request response with given I/O ID. - */ + * Searches for a response request with given channel IOID. + * @param ioid I/O ID. + * @return request response with given I/O ID. + */ ResponseRequest* getResponseRequest(pvAccessID ioid) { Lock guard(&m_ioidMapMutex); @@ -3306,10 +3308,10 @@ TODO } /** - * Register response request. - * @param request request to register. - * @return request ID (IOID). - */ + * Register response request. + * @param request request to register. + * @return request ID (IOID). + */ pvAccessID registerResponseRequest(ResponseRequest* request) { Lock guard(&m_ioidMapMutex); @@ -3319,10 +3321,10 @@ TODO } /** - * Unregister response request. - * @param request - * @return removed object, can be 0 - */ + * Unregister response request. + * @param request + * @return removed object, can be 0 + */ ResponseRequest* unregisterResponseRequest(ResponseRequest* request) { Lock guard(&m_ioidMapMutex); @@ -3336,9 +3338,9 @@ TODO } /** - * Generate IOID. - * @return IOID. - */ + * Generate IOID. + * @return IOID. + */ pvAccessID generateIOID() { Lock guard(&m_ioidMapMutex); @@ -3352,19 +3354,19 @@ TODO } /** - * Called each time beacon anomaly is detected. - */ + * Called each time beacon anomaly is detected. + */ void beaconAnomalyNotify() { - if (m_channelSearchManager) + if (m_channelSearchManager) m_channelSearchManager->beaconAnomalyNotify(); } /** - * Get (and if necessary create) beacon handler. - * @param responseFrom remote source address of received beacon. - * @return beacon handler for particular server. - */ + * Get (and if necessary create) beacon handler. + * @param responseFrom remote source address of received beacon. + * @return beacon handler for particular server. + */ BeaconHandler* getBeaconHandler(osiSockAddr* responseFrom) { // TODO delete handlers @@ -3382,45 +3384,45 @@ TODO } /** - * Get, or create if necessary, transport of given server address. - * @param serverAddress required transport address - * @param priority process priority. - * @return transport for given address - */ + * Get, or create if necessary, transport of given server address. + * @param serverAddress required transport address + * @param priority process priority. + * @return transport for given address + */ Transport* getTransport(TransportClient* client, osiSockAddr* serverAddress, int16 minorRevision, int16 priority) { - try - { + try + { return m_connector->connect(client, new ClientResponseHandler(this), *serverAddress, minorRevision, priority); - } - catch (...) - { + } + catch (...) + { // TODO log //printf("failed to get transport\n"); return 0; - } + } } /** - * Internal create channel. - */ + * Internal create channel. + */ // TODO no minor version with the addresses // TODO what if there is an channel with the same name, but on different host! ChannelImpl* createChannelInternal(String name, ChannelRequester* requester, short priority, InetAddrVector* addresses) { // TODO addresses - checkState(); - checkChannelName(name); + checkState(); + checkChannelName(name); - if (requester == 0) + if (requester == 0) throw std::runtime_error("0 requester"); - if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX) + if (priority < ChannelProvider::PRIORITY_MIN || priority > ChannelProvider::PRIORITY_MAX) throw std::range_error("priority out of bounds"); - bool lockAcquired = true; // TODO namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); - if (lockAcquired) - { + bool lockAcquired = true; // TODO namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); + if (lockAcquired) + { try { pvAccessID cid = generateCID(); @@ -3431,27 +3433,27 @@ TODO return 0; } // TODO namedLocker.releaseSynchronizationObject(name); - } - else - { + } + else + { // TODO is this OK? throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); - } + } } /** - * Destroy channel. - * @param channel - * @param force - * @throws CAException - * @throws std::runtime_error - */ + * Destroy channel. + * @param channel + * @param force + * @throws CAException + * @throws std::runtime_error + */ void destroyChannel(ChannelImpl* channel, bool force) { - String name = channel->getChannelName(); - bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); - if (lockAcquired) - { + String name = channel->getChannelName(); + bool lockAcquired = true; //namedLocker->acquireSynchronizationObject(name, LOCK_TIMEOUT); + if (lockAcquired) + { try { channel->destroyChannel(force); @@ -3459,96 +3461,96 @@ TODO catch(...) { // TODO } - // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); - } - else - { + // TODO namedLocker->releaseSynchronizationObject(channel.getChannelName()); + } + else + { // TODO is this OK? throw std::runtime_error("Failed to obtain synchronization lock for '" + name + "', possible deadlock."); - } + } } /** - * Get channel search manager. - * @return channel search manager. - */ + * Get channel search manager. + * @return channel search manager. + */ ChannelSearchManager* getChannelSearchManager() { - return m_channelSearchManager; + return m_channelSearchManager; } /** - * A space-separated list of broadcast address for process variable name resolution. - * Each address must be of the form: ip.number:port or host.name:port - */ + * A space-separated list of broadcast address for process variable name resolution. + * Each address must be of the form: ip.number:port or host.name:port + */ String m_addressList; /** - * Define whether or not the network interfaces should be discovered at runtime. - */ + * Define whether or not the network interfaces should be discovered at runtime. + */ bool m_autoAddressList; /** - * If the context doesn't see a beacon from a server that it is connected to for - * connectionTimeout seconds then a state-of-health message is sent to the server over TCP/IP. - * If this state-of-health message isn't promptly replied to then the context will assume that - * the server is no longer present on the network and disconnect. - */ + * If the context doesn't see a beacon from a server that it is connected to for + * connectionTimeout seconds then a state-of-health message is sent to the server over TCP/IP. + * If this state-of-health message isn't promptly replied to then the context will assume that + * the server is no longer present on the network and disconnect. + */ float m_connectionTimeout; /** - * Period in second between two beacon signals. - */ + * Period in second between two beacon signals. + */ float m_beaconPeriod; /** - * Broadcast (beacon, search) port number to listen to. - */ + * Broadcast (beacon, search) port number to listen to. + */ int m_broadcastPort; /** - * Receive buffer size (max size of payload). - */ + * Receive buffer size (max size of payload). + */ int m_receiveBufferSize; /** - * Timer. - */ + * Timer. + */ Timer* m_timer; /** - * Broadcast transport needed to listen for broadcasts. - */ + * Broadcast transport needed to listen for broadcasts. + */ BlockingUDPTransport* m_broadcastTransport; /** - * UDP transport needed for channel searches. - */ + * UDP transport needed for channel searches. + */ BlockingUDPTransport* m_searchTransport; /** - * CA connector (creates CA virtual circuit). - */ + * CA connector (creates CA virtual circuit). + */ BlockingTCPConnector* m_connector; /** - * CA transport (virtual circuit) registry. - * This registry contains all active transports - connections to CA servers. - */ + * CA transport (virtual circuit) registry. + * This registry contains all active transports - connections to CA servers. + */ TransportRegistry* m_transportRegistry; /** - * Context instance. - */ + * Context instance. + */ NamedLockPattern* m_namedLocker; /** - * Context instance. - */ - static const int LOCK_TIMEOUT = 20 * 1000; // 20s + * Context instance. + */ + static const int LOCK_TIMEOUT = 20 * 1000; // 20s /** - * Map of channels (keys are CIDs). - */ + * Map of channels (keys are CIDs). + */ // TODO consider std::unordered_map typedef std::map CIDChannelMap; CIDChannelMap m_channelsByCID; @@ -3559,13 +3561,13 @@ TODO Mutex m_cidMapMutex; /** - * Last CID cache. - */ + * Last CID cache. + */ pvAccessID m_lastCID; /** - * Map of pending response requests (keys are IOID). - */ + * Map of pending response requests (keys are IOID). + */ // TODO consider std::unordered_map typedef std::map IOIDResponseRequestMap; IOIDResponseRequestMap m_pendingResponseRequests; @@ -3576,19 +3578,19 @@ TODO Mutex m_ioidMapMutex; /** - * Last IOID cache. - */ + * Last IOID cache. + */ pvAccessID m_lastIOID; /** - * Channel search manager. - * Manages UDP search requests. - */ + * Channel search manager. + * Manages UDP search requests. + */ ChannelSearchManager* m_channelSearchManager; /** - * Beacon handler map. - */ + * Beacon handler map. + */ // TODO consider std::unordered_map typedef std::map AddressBeaconHandlerMap; AddressBeaconHandlerMap m_beaconHandlers; @@ -3599,13 +3601,13 @@ TODO Mutex m_beaconMapMutex; /** - * Version. - */ + * Version. + */ Version* m_version; /** - * Provider implementation. - */ + * Provider implementation. + */ ChannelProviderImpl* m_provider; /** diff --git a/pvAccessApp/remoteClient/clientContextImpl.h b/pvAccessApp/remoteClient/clientContextImpl.h index 207aaa2..07004f2 100644 --- a/pvAccessApp/remoteClient/clientContextImpl.h +++ b/pvAccessApp/remoteClient/clientContextImpl.h @@ -19,7 +19,7 @@ namespace epics { class ClientContextImpl; class ChannelImpl : - public Channel , + public Channel, public TransportClient, public TransportSender, public BaseSearchInstance