exception safe calls

This commit is contained in:
Matej Sekoranja
2011-01-28 23:39:10 +01:00
parent ec3a7b3ed6
commit f42834179c

View File

@@ -11,6 +11,7 @@
#include <standardPVField.h>
#include <memory>
#include <stdexcept>
#include <caConstants.h>
#include <timer.h>
#include <blockingUDP.h>
@@ -43,6 +44,15 @@ static Status* g_statusOK = getStatusCreate()->getStatusOK();
Status* ChannelImpl::channelDestroyed = statusCreate->createStatus(STATUSTYPE_WARNING, "channel destroyed");
Status* ChannelImpl::channelDisconnected = statusCreate->createStatus(STATUSTYPE_WARNING, "channel disconnected");;
// TODO consider std::unordered_map
typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
#define EXCEPTION_GUARD(code) try { code; } \
catch (std::exception &e) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d: %s", __FILE__, __LINE__, e.what()); } \
catch (...) { errlogSevPrintf(errlogMajor, "Unhandled exception caught from client code at %s:%d.", __FILE__, __LINE__); }
/**
* Base channel request.
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -248,13 +258,11 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
// TODO best-effort support
// subscribe
// try {
try {
resubscribeSubscription(channel->checkAndGetTransport());
/* } catch (IllegalStateException ise) {
callback.channelProcessConnect(channelNotConnected, 0);
} catch (CAException e) {
callback.channelProcessConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", e), 0);
}*/
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_callback->channelProcessConnect(channelNotConnected, 0));
}
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
@@ -280,17 +288,17 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_callback->processDone(status);
EXCEPTION_GUARD(m_callback->processDone(status));
return true;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_callback->channelProcessConnect(status, this);
EXCEPTION_GUARD(m_callback->channelProcessConnect(status, this));
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_callback->processDone(status);
EXCEPTION_GUARD(m_callback->processDone(status));
return true;
}
@@ -298,20 +306,20 @@ class ChannelProcessRequestImpl : public BaseRequestImpl, public ChannelProcess
{
// TODO sync
if (m_destroyed) {
m_callback->processDone(destroyedStatus);
EXCEPTION_GUARD(m_callback->processDone(destroyedStatus));
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_callback->processDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_callback->processDone(otherRequestPendingStatus));
return;
}
//try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
// m_callback->processDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_callback->processDone(channelNotConnected));
}
}
virtual void resubscribeSubscription(Transport* transport) {
@@ -365,14 +373,11 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
// TODO one-time get, i.e. immediate get + lastRequest
// subscribe
// try {
try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelGetRequester->channelGetConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelGetRequester->channelGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(channelNotConnected, 0, 0, 0));
}
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
@@ -407,7 +412,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelGetRequester->channelGetConnect(status, this, 0, 0);
EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(status, this, 0, 0));
return true;
}
@@ -416,14 +421,14 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
m_bitSet = new BitSet(m_data->getNumberFields());
// notify
m_channelGetRequester->channelGetConnect(okStatus, this, m_data, m_bitSet);
EXCEPTION_GUARD(m_channelGetRequester->channelGetConnect(okStatus, this, m_data, m_bitSet));
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelGetRequester->getDone(status);
EXCEPTION_GUARD(m_channelGetRequester->getDone(status));
return true;
}
@@ -431,7 +436,7 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
m_bitSet->deserialize(payloadBuffer, transport);
m_data->deserialize(payloadBuffer, transport, m_bitSet);
m_channelGetRequester->getDone(okStatus);
EXCEPTION_GUARD(m_channelGetRequester->getDone(okStatus));
return true;
}
@@ -439,20 +444,20 @@ class ChannelGetImpl : public BaseRequestImpl, public ChannelGet
// TODO sync?
if (m_destroyed) {
m_channelGetRequester->getDone(destroyedStatus);
EXCEPTION_GUARD(m_channelGetRequester->getDone(destroyedStatus));
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_DEFAULT)) {
m_channelGetRequester->getDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_channelGetRequester->getDone(otherRequestPendingStatus));
return;
}
//try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelGetRequester->getDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelGetRequester->getDone(channelNotConnected));
}
}
virtual void resubscribeSubscription(Transport* transport) {
@@ -514,14 +519,11 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
// TODO best-effort put
// subscribe
// try {
try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelPutRequester->channelPutConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelPutRequester->channelPutConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(channelNotConnected, 0, 0, 0));
}
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
@@ -554,14 +556,14 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
}
virtual bool destroyResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
m_channelPutRequester->putDone(status);
EXCEPTION_GUARD(m_channelPutRequester->putDone(status));
return true;
}
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelPutRequester->channelPutConnect(status, this, 0, 0);
EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(status, this, 0, 0));
return true;
}
@@ -570,7 +572,7 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
m_bitSet = new BitSet(m_data->getNumberFields());
// notify
m_channelPutRequester->channelPutConnect(okStatus, this, m_data, m_bitSet);
EXCEPTION_GUARD(m_channelPutRequester->channelPutConnect(okStatus, this, m_data, m_bitSet));
return true;
}
@@ -579,17 +581,18 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
{
if (!status->isSuccess())
{
m_channelPutRequester->getDone(status);
EXCEPTION_GUARD(m_channelPutRequester->getDone(status));
return true;
}
m_data->deserialize(payloadBuffer, transport);
m_channelPutRequester->getDone(status);
EXCEPTION_GUARD(m_channelPutRequester->getDone(status));
return true;
}
else
{
m_channelPutRequester->putDone(okStatus);
EXCEPTION_GUARD(m_channelPutRequester->putDone(okStatus));
return true;
}
}
@@ -598,21 +601,21 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
// TODO sync?
if (m_destroyed) {
m_channelPutRequester->getDone(destroyedStatus);
EXCEPTION_GUARD(m_channelPutRequester->getDone(destroyedStatus));
return;
}
if (!startRequest(QOS_GET)) {
m_channelPutRequester->getDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_channelPutRequester->getDone(otherRequestPendingStatus));
return;
}
// try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
// } catch (IllegalStateException ise) {
// m_channelPutRequester->getDone(channelNotConnected);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelPutRequester->getDone(channelNotConnected));
}
}
virtual void put(bool lastRequest) {
@@ -628,11 +631,11 @@ class ChannelPutImpl : public BaseRequestImpl, public ChannelPut
return;
}
//try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelPutRequester->putDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelPutRequester->putDone(channelNotConnected));
}
}
virtual void resubscribeSubscription(Transport* transport) {
@@ -688,14 +691,11 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelPutGet);
// subscribe
// try {
try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelPutGetRequester->channelPutGetConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(channelNotConnected, 0, 0, 0));
}
}
@@ -740,7 +740,7 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelPutGetRequester->channelPutGetConnect(status, this, 0, 0);
EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(status, this, 0, 0));
return true;
}
@@ -749,7 +749,7 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
m_getData = registry->deserializeStructureAndCreatePVStructure(payloadBuffer, transport);
// notify
m_channelPutGetRequester->channelPutGetConnect(okStatus, this, m_putData, m_getData);
EXCEPTION_GUARD(m_channelPutGetRequester->channelPutGetConnect(okStatus, this, m_putData, m_getData));
return true;
}
@@ -759,39 +759,42 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
{
if (!status->isSuccess())
{
m_channelPutGetRequester->getGetDone(status);
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status));
return true;
}
// deserialize get data
m_getData->deserialize(payloadBuffer, transport);
m_channelPutGetRequester->getGetDone(status);
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(status));
return true;
}
else if (qos & QOS_GET_PUT)
{
if (!status->isSuccess())
{
m_channelPutGetRequester->getPutDone(status);
EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status));
return true;
}
// deserialize put data
m_putData->deserialize(payloadBuffer, transport);
m_channelPutGetRequester->getPutDone(status);
EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(status));
return true;
}
else
{
if (!status->isSuccess())
{
m_channelPutGetRequester->putGetDone(status);
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status));
return true;
}
// deserialize data
m_getData->deserialize(payloadBuffer, transport);
m_channelPutGetRequester->putGetDone(status);
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(status));
return true;
}
}
@@ -799,38 +802,38 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
virtual void putGet(bool lastRequest) {
if (m_destroyed) {
m_channelPutGetRequester->putGetDone(destroyedStatus);
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(destroyedStatus));
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_channelPutGetRequester->putGetDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(otherRequestPendingStatus));
return;
}
// try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
// } catch (IllegalStateException ise) {
// m_channelPutGetRequester->putGetDone(channelNotConnected);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelPutGetRequester->putGetDone(channelNotConnected));
}
}
virtual void getGet() {
if (m_destroyed) {
m_channelPutGetRequester->getGetDone(destroyedStatus);
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(destroyedStatus));
return;
}
if (!startRequest(QOS_GET)) {
m_channelPutGetRequester->getGetDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(otherRequestPendingStatus));
return;
}
//try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelPutGetRequester->getGetDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelPutGetRequester->getGetDone(channelNotConnected));
}
}
virtual void getPut() {
@@ -844,11 +847,11 @@ class ChannelPutGetImpl : public BaseRequestImpl, public ChannelPutGet
return;
}
//try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelPutGetRequester->getPutDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelPutGetRequester->getPutDone(channelNotConnected));
}
}
virtual void resubscribeSubscription(Transport* transport) {
@@ -906,14 +909,11 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelRPC);
// subscribe
// try {
try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelRPCRequester->channelRPCConnect(channelNotConnected, 0, 0, 0);
// } catch (CAException caex) {
// TODO m_channelRPCRequester->channelRPCConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0, 0);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(channelNotConnected, 0, 0, 0));
}
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
@@ -955,7 +955,7 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelRPCRequester->channelRPCConnect(status, this, 0, 0);
EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(status, this, 0, 0));
return true;
}
@@ -964,19 +964,21 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
m_bitSet = new BitSet(m_data->getNumberFields());
// notify
m_channelRPCRequester->channelRPCConnect(okStatus, this, m_data, m_bitSet);
EXCEPTION_GUARD(m_channelRPCRequester->channelRPCConnect(okStatus, this, m_data, m_bitSet));
return true;
}
virtual bool normalResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelRPCRequester->requestDone(status, 0);
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(status, 0));
return true;
}
auto_ptr<PVStructure> response(transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport));
m_channelRPCRequester->requestDone(okStatus, response.get());
PVStructure* response = transport->getIntrospectionRegistry()->deserializeStructureAndCreatePVStructure(payloadBuffer, transport);
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(okStatus, response));
delete response;
return true;
}
@@ -984,20 +986,20 @@ class ChannelRPCImpl : public BaseRequestImpl, public ChannelRPC
// TODO sync?
if (m_destroyed) {
m_channelRPCRequester->requestDone(destroyedStatus, 0);
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(destroyedStatus, 0));
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_channelRPCRequester->requestDone(otherRequestPendingStatus, 0);
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(otherRequestPendingStatus, 0));
return;
}
//try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelRPCRequester->requestDone(channelNotConnected, 0);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelRPCRequester->requestDone(channelNotConnected, 0));
}
}
virtual void resubscribeSubscription(Transport* transport) {
@@ -1059,14 +1061,11 @@ class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray
PVDATA_REFCOUNT_MONITOR_CONSTRUCT(channelArray);
// subscribe
// try {
try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_channelArrayRequester->channelArrayConnect(channelNotConnected, 0, 0);
// } catch (CAException caex) {
// TODO m_channelArrayRequester->channelArrayConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(channelNotConnected, 0, 0));
}
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
@@ -1117,7 +1116,7 @@ class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_channelArrayRequester->channelArrayConnect(status, this, 0);
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(status, this, 0));
return true;
}
@@ -1126,7 +1125,7 @@ class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray
m_data = dynamic_cast<PVArray*>(getPVDataCreate()->createPVField(0, field));
// notify
m_channelArrayRequester->channelArrayConnect(okStatus, this, m_data);
EXCEPTION_GUARD(m_channelArrayRequester->channelArrayConnect(okStatus, this, m_data));
return true;
}
@@ -1140,17 +1139,18 @@ class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray
}
m_data->deserialize(payloadBuffer, transport);
m_channelArrayRequester->getArrayDone(okStatus);
EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(okStatus));
return true;
}
else if (qos & QOS_GET_PUT)
{
m_channelArrayRequester->setLengthDone(status);
EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(status));
return true;
}
else
{
m_channelArrayRequester->putArrayDone(status);
EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(status));
return true;
}
}
@@ -1160,66 +1160,66 @@ class ChannelArrayImpl : public BaseRequestImpl, public ChannelArray
// TODO sync?
if (m_destroyed) {
m_channelArrayRequester->getArrayDone(destroyedStatus);
EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(destroyedStatus));
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET : QOS_GET)) {
m_channelArrayRequester->getArrayDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(otherRequestPendingStatus));
return;
}
//try {
try {
m_offset = offset;
m_count = count;
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelArrayRequester->getArrayDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelArrayRequester->getArrayDone(channelNotConnected));
}
}
virtual void putArray(bool lastRequest, int offset, int count) {
// TODO sync?
if (m_destroyed) {
m_channelArrayRequester->putArrayDone(destroyedStatus);
EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(destroyedStatus));
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY : QOS_DEFAULT)) {
m_channelArrayRequester->putArrayDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(otherRequestPendingStatus));
return;
}
//try {
try {
m_offset = offset;
m_count = count;
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelArrayRequester->putArrayDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelArrayRequester->putArrayDone(channelNotConnected));
}
}
virtual void setLength(bool lastRequest, int length, int capacity) {
// TODO sync?
if (m_destroyed) {
m_channelArrayRequester->setLengthDone(destroyedStatus);
EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(destroyedStatus));
return;
}
if (!startRequest(lastRequest ? QOS_DESTROY | QOS_GET_PUT : QOS_GET_PUT)) {
m_channelArrayRequester->setLengthDone(otherRequestPendingStatus);
EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(otherRequestPendingStatus));
return;
}
//try {
try {
m_length = length;
m_capacity = capacity;
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
//TODO // m_channelArrayRequester->setLengthDone(channelNotConnected);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_channelArrayRequester->setLengthDone(channelNotConnected));
}
}
@@ -1281,13 +1281,12 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
m_ioid = m_context->registerResponseRequest(this);
channel->registerResponseRequest(this);
// TODO
// enqueue send request
//try {
try {
m_channel->checkAndGetTransport()->enqueueSendRequest(this);
//} catch (IllegalStateException ise) {
// callback.getDone(BaseRequestImpl.channelNotConnected, 0);
//}
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(callback->getDone(channelNotConnected, 0));
}
}
Requester* getRequester() {
@@ -1355,12 +1354,12 @@ class ChannelGetFieldRequestImpl : public DataResponse, public TransportSender
{
// deserialize Field...
const Field* field = transport->getIntrospectionRegistry()->deserialize(payloadBuffer, transport);
m_callback->getDone(status, field);
EXCEPTION_GUARD(m_callback->getDone(status, field));
field->decReferenceCount();
}
else
{
m_callback->getDone(status, 0);
EXCEPTION_GUARD(m_callback->getDone(status, 0));
}
// TODO
@@ -1433,14 +1432,11 @@ public MonitorElement
// TODO quques
// subscribe
// try {
try {
resubscribeSubscription(m_channel->checkAndGetTransport());
// } catch (IllegalStateException ise) {
// TODO m_monitorRequester->monitorConnect(channelNotConnected, 0, 0);
// } catch (CAException caex) {
// TODO m_monitorRequester->monitorConnect(statusCreate.createStatus(StatusType.ERROR, "failed to sent message over network", caex), 0, 0);
// }
} catch (std::runtime_error &rte) {
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(channelNotConnected, 0, 0));
}
}
virtual void send(ByteBuffer* buffer, TransportSendControl* control) {
@@ -1474,7 +1470,7 @@ public MonitorElement
virtual bool initResponse(Transport* transport, int8 version, ByteBuffer* payloadBuffer, int8 qos, Status* status) {
if (!status->isSuccess())
{
m_monitorRequester->monitorConnect(status, this, 0);
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(status, this, 0));
return true;
}
@@ -1490,7 +1486,7 @@ public MonitorElement
// notify
m_monitorRequester->monitorConnect(okStatus, this, m_structure);
EXCEPTION_GUARD(m_monitorRequester->monitorConnect(okStatus, this, m_structure));
return true;
}
@@ -1505,7 +1501,8 @@ public MonitorElement
m_changedBitSet->deserialize(payloadBuffer, transport);
m_pvStructure->deserialize(payloadBuffer, transport, m_changedBitSet);
m_overrunBitSet->deserialize(payloadBuffer, transport);
m_monitorRequester->monitorEvent(this);
EXCEPTION_GUARD(m_monitorRequester->monitorEvent(this));
}
return true;
}
@@ -1591,8 +1588,8 @@ public MonitorElement
m_started = true;
// client needs to delete status, so passing shared OK instance is not right thing to do
return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor started.");
//} catch (IllegalStateException ise) {
// return channelNotConnected;
//} catch (std::runtime_error &rte) {
// return channelNotConnected; // TODO clone
//}
@@ -1619,8 +1616,8 @@ public MonitorElement
m_started = false;
// client needs to delete status, so passing shared OK instance is not right thing to do
return getStatusCreate()->createStatus(STATUSTYPE_OK, "Monitor stopped.");
//} catch (IllegalStateException ise) {
// return channelNotConnected;
//} catch (std::runtime_error &rte) {
// return channelNotConnected; // TODO clone
//}
}
@@ -1671,20 +1668,6 @@ public MonitorElement
// TODO consider std::unordered_map
typedef std::map<pvAccessID, ResponseRequest*> IOIDResponseRequestMap;
// TODO log
#define CALLBACK_GUARD(code) try { code } catch(...) { }
/**
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
* @version $Id: AbstractServerResponseHandler.java,v 1.1 2010/05/03 14:45:39 mrkraimer Exp $
@@ -2513,7 +2496,7 @@ class TestChannelImpl : public ChannelImpl {
* Actual destroy method, to be called <code>CAJContext</code>.
* @param force force destruction regardless of reference count
* @throws CAException
* @throws IllegalStateException
* @throws std::runtime_error
* @throws IOException
*/
void destroyChannel(bool force) {
@@ -2617,8 +2600,8 @@ class TestChannelImpl : public ChannelImpl {
// multiple defined PV or reconnect request (same server address)
if (sockAddrAreIdentical(transport->getRemoteAddress(), serverAddress))
{
m_requester->message("More than one channel with name '" + m_name +
"' detected, additional response from: " + inetAddressToString(*serverAddress), warningMessage);
EXCEPTION_GUARD(m_requester->message("More than one channel with name '" + m_name +
"' detected, additional response from: " + inetAddressToString(*serverAddress), warningMessage));
return;
}
}
@@ -2697,7 +2680,7 @@ class TestChannelImpl : public ChannelImpl {
{
//lastReportedConnectionState = connectionStatusToReport;
// TODO via dispatcher ?!!!
m_requester->channelStateChange(this, connectionState);
EXCEPTION_GUARD(m_requester->channelStateChange(this, connectionState));
}
}
}
@@ -2763,8 +2746,7 @@ class TestChannelImpl : public ChannelImpl {
iter != m_responseRequests.end();
iter++)
{
// TODO GUARD
iter->second->reportStatus(status);
EXCEPTION_GUARD(iter->second->reportStatus(status));
}
}
@@ -2782,10 +2764,9 @@ class TestChannelImpl : public ChannelImpl {
iter != m_responseRequests.end();
iter++)
{
// TODO GUARD
SubscriptionRequest* rrs = dynamic_cast<SubscriptionRequest*>(iter->second);
if (rrs)
rrs->resubscribeSubscription(transport);
EXCEPTION_GUARD(rrs->resubscribeSubscription(transport));
}
}
@@ -2806,10 +2787,9 @@ class TestChannelImpl : public ChannelImpl {
iter != m_responseRequests.end();
iter++)
{
// TODO GUARD
SubscriptionRequest* rrs = dynamic_cast<SubscriptionRequest*>(iter->second);
if (rrs)
rrs->updateSubscription();
EXCEPTION_GUARD(rrs->updateSubscription());
}
}
@@ -3466,7 +3446,7 @@ TODO
* @param channel
* @param force
* @throws CAException
* @throws IllegalStateException
* @throws std::runtime_error
*/
void destroyChannel(ChannelImpl* channel, bool force) {
@@ -4103,7 +4083,7 @@ int main(int argc,char *argv[])
ChannelGet* channelGet = channel->createChannelGet(&channelGetRequesterImpl, pvRequest);
epicsThreadSleep ( 3.0 );
channelGet->get(false);
epicsThreadSleep ( 300.0 );
epicsThreadSleep ( 3.0 );
/*
channelGet->destroy();
epicsThreadSleep ( 1.0 );