diff --git a/src/dataencode.cpp b/src/dataencode.cpp index 365149b..34212cd 100644 --- a/src/dataencode.cpp +++ b/src/dataencode.cpp @@ -432,16 +432,17 @@ void to_wire_full(Buffer& buf, const Value& val) to_wire_field(buf, Value::Helper::desc(val), Value::Helper::store(val)); } -void to_wire_valid(Buffer& buf, const Value& val) +void to_wire_valid(Buffer& buf, const Value& val, const BitMask* mask) { auto desc = Value::Helper::desc(val); auto store = Value::Helper::store(val); assert(desc && desc->code==TypeCode::Struct); + assert(!mask || mask->size()==desc->size()); BitMask valid(desc->size()); for(auto bit : range(desc->size())) { - if((store.get()+bit)->valid) + if((store.get()+bit)->valid && (!mask || (*mask)[bit])) valid[bit] = true; } diff --git a/src/dataimpl.h b/src/dataimpl.h index df7ce37..59d1878 100644 --- a/src/dataimpl.h +++ b/src/dataimpl.h @@ -147,18 +147,23 @@ struct StructTop { using Type = std::shared_ptr; +//! serialize all Value fields PVXS_API void to_wire_full(Buffer& buf, const Value& val); +//! serialize BitMask and marked valid Value fields PVXS_API -void to_wire_valid(Buffer& buf, const Value& val); +void to_wire_valid(Buffer& buf, const Value& val, const BitMask* mask=nullptr); +//! deserialize type description PVXS_API void from_wire_full(Buffer& buf, TypeStore& ctxt, Value& val); +//! deserialize BitMask and partial Value PVXS_API void from_wire_valid(Buffer& buf, TypeStore& ctxt, Value& val); +//! deserialize type description and full value (a la. pvRequest) PVXS_API void from_wire_type_value(Buffer& buf, TypeStore& ctxt, Value& val); diff --git a/src/serverget.cpp b/src/serverget.cpp index 5bc90de..67e06f8 100644 --- a/src/serverget.cpp +++ b/src/serverget.cpp @@ -9,6 +9,7 @@ #include #include "dataimpl.h" #include "serverconn.h" +#include "pvrequest.h" namespace pvxs { namespace impl { DEFINE_LOGGER(connsetup, "pvxs.tcp.setup"); @@ -24,8 +25,7 @@ struct ServerGPR : public ServerOp {} virtual ~ServerGPR() {} - void doReply(const std::shared_ptr& type, - const Value& value, + void doReply(const Value& value, const std::string& msg) { auto ch = chan.lock(); @@ -38,12 +38,13 @@ struct ServerGPR : public ServerOp if(state==Dead || state==Idle) { // no warn if Idle as this may result from a remote Cancel return; - } - if(type && this->type) - throw std::logic_error("Operation already connected (has a type)"); - if(cmd!=CMD_PUT && this->type && (!value || Value::Helper::desc(value)!=this->type.get())) - throw std::logic_error("PUT Must reply with exact type previously passed to connect()"); + } else if(state==Executing) { + if(cmd==CMD_PUT && !value) + throw std::logic_error("PUT reply can't include Value"); + else if(cmd==CMD_GET && this->type && (!value || Value::Helper::desc(value)!=this->type.get())) + throw std::logic_error("GET must reply with exact type previously passed to connect()"); + } Status sts{}; if(!msg.empty()) @@ -68,14 +69,13 @@ struct ServerGPR : public ServerOp } else if(state==Creating) { // connect() if(cmd!=CMD_RPC) { - this->type = type; to_wire(R, type.get()); } state = Idle; } else if(state==Executing) { if(cmd==CMD_GET || (cmd==CMD_PUT && (subcmd&0x40))) { - to_wire_valid(R, value); // GET and PUT/Get reply with bitmask and partial value + to_wire_valid(R, value, &pvMask); // GET and PUT/Get reply with bitmask and partial value } else if(cmd==CMD_RPC) { auto type = Value::Helper::desc(value); @@ -117,6 +117,7 @@ struct ServerGPR : public ServerOp bool lastRequest=false; std::shared_ptr type; + BitMask pvMask; // mask computed from pvRequest .fields std::function onClose; std::function&&, Value&&)> onPut; @@ -158,11 +159,15 @@ struct ServerGPRConnect : public server::ConnectOp if(!prototype && oper->cmd!=CMD_RPC) throw std::invalid_argument("Must provide prototype"); - std::shared_ptr type; - if(prototype) - type = Value::Helper::type(prototype); + if(oper->type) + throw std::logic_error("Operation already connected (has a type)"); - oper->doReply(type, Value(), std::string()); + if(prototype) { + oper->type = Value::Helper::type(prototype); + oper->pvMask = request2mask(oper->type.get(), _pvRequest); + } + + oper->doReply(Value(), std::string()); } }); } @@ -176,7 +181,7 @@ struct ServerGPRConnect : public server::ConnectOp serv->acceptor_loop.call([this, &msg](){ if(auto oper = op.lock()) { if(oper->state==ServerOp::Creating) - oper->doReply(nullptr, Value(), msg); + oper->doReply(Value(), msg); } }); } @@ -245,7 +250,7 @@ struct ServerGPRExec : public server::ExecOp return; serv->acceptor_loop.call([this, &val](){ if(auto oper = op.lock()) { - oper->doReply(nullptr, val, std::string()); + oper->doReply(val, std::string()); } }); } @@ -259,7 +264,7 @@ struct ServerGPRExec : public server::ExecOp return; serv->acceptor_loop.call([this, &msg](){ if(auto oper = op.lock()) { - oper->doReply(nullptr, Value(), msg); + oper->doReply(Value(), msg); } }); }