diff --git a/src/pva/pvaConstants.h b/src/pva/pvaConstants.h index 2d8d97d..f5ddd10 100644 --- a/src/pva/pvaConstants.h +++ b/src/pva/pvaConstants.h @@ -40,8 +40,12 @@ namespace pvAccess { /** PVA protocol message header size. */ const epics::pvData::int16 PVA_MESSAGE_HEADER_SIZE = 8; - /** All messages must be aligned to 8-bytes (64-bit). */ - const epics::pvData::int32 PVA_ALIGNMENT = 1; //8; + /** + * All messages must be aligned to 8-bytes (64-bit). + * MUST be 1. Code does not handle well alignment in some situations (e.g. direct deserialize). + * Alignment is not worth additional code complexity. + */ + const epics::pvData::int32 PVA_ALIGNMENT = 1; /** * UDP maximum send message size. diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 867a7fb..0592d93 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2674,6 +2674,48 @@ namespace epics { }; + class MultipleDataResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { + public: + MultipleDataResponseHandler(ClientContextImpl::shared_pointer const & context) : + AbstractClientResponseHandler(context, "Multiple data response") + { + } + + virtual ~MultipleDataResponseHandler() { + } + + virtual void handleResponse(osiSockAddr* responseFrom, + Transport::shared_pointer const & transport, int8 version, int8 command, + size_t payloadSize, epics::pvData::ByteBuffer* payloadBuffer) + { + AbstractClientResponseHandler::handleResponse(responseFrom, transport, version, command, payloadSize, payloadBuffer); + + // TODO add submessage payload size, so that non-existant IOID can be skipped + // and others not lost + + ClientContextImpl::shared_pointer context = _context.lock(); + while (true) + { + transport->ensureData(4); + pvAccessID ioid = payloadBuffer->getInt(); + if (ioid == INVALID_IOID) + return; + + ResponseRequest::shared_pointer rr = context->getResponseRequest(ioid); + if (rr.get()) + { + DataResponse::shared_pointer nrr = dynamic_pointer_cast(rr); + if (nrr.get()) + nrr->response(transport, version, payloadBuffer); + else + return; // we cannot deserialize, we are lost in stream, we must stop + } + else + return; // we cannot deserialize, we are lost in stream, we must stop + } + } + }; + class SearchResponseHandler : public AbstractClientResponseHandler, private epics::pvData::NoDefaultMethods { public: SearchResponseHandler(ClientContextImpl::shared_pointer const & context) : @@ -2983,7 +3025,7 @@ namespace epics { m_handlerTable[CMD_PROCESS] = dataResponse; /* 16 - process response */ m_handlerTable[CMD_GET_FIELD] = dataResponse; /* 17 - get field response */ m_handlerTable[CMD_MESSAGE].reset(new MessageHandler(context)); /* 18 - message to Requester */ - m_handlerTable[CMD_MULTIPLE_DATA] = badResponse; // TODO new MultipleDataResponseHandler(context), /* 19 - grouped monitors */ + m_handlerTable[CMD_MULTIPLE_DATA].reset(new MultipleDataResponseHandler(context)); /* 19 - grouped monitors */ m_handlerTable[CMD_RPC] = dataResponse; /* 20 - RPC response */ m_handlerTable[CMD_CANCEL_REQUEST] = badResponse; /* 21 - cancel request */ }