server GET apply pvRequest to mask
This commit is contained in:
+3
-2
@@ -432,16 +432,17 @@ void to_wire_full(Buffer& buf, const Value& val)
|
||||
to_wire_field(buf, Value::Helper::desc(val), Value::Helper::store(val));
|
||||
}
|
||||
|
||||
void to_wire_valid(Buffer& buf, const Value& val)
|
||||
void to_wire_valid(Buffer& buf, const Value& val, const BitMask* mask)
|
||||
{
|
||||
auto desc = Value::Helper::desc(val);
|
||||
auto store = Value::Helper::store(val);
|
||||
assert(desc && desc->code==TypeCode::Struct);
|
||||
assert(!mask || mask->size()==desc->size());
|
||||
|
||||
BitMask valid(desc->size());
|
||||
|
||||
for(auto bit : range(desc->size())) {
|
||||
if((store.get()+bit)->valid)
|
||||
if((store.get()+bit)->valid && (!mask || (*mask)[bit]))
|
||||
valid[bit] = true;
|
||||
}
|
||||
|
||||
|
||||
+6
-1
@@ -147,18 +147,23 @@ struct StructTop {
|
||||
using Type = std::shared_ptr<const FieldDesc>;
|
||||
|
||||
|
||||
//! serialize all Value fields
|
||||
PVXS_API
|
||||
void to_wire_full(Buffer& buf, const Value& val);
|
||||
|
||||
//! serialize BitMask and marked valid Value fields
|
||||
PVXS_API
|
||||
void to_wire_valid(Buffer& buf, const Value& val);
|
||||
void to_wire_valid(Buffer& buf, const Value& val, const BitMask* mask=nullptr);
|
||||
|
||||
//! deserialize type description
|
||||
PVXS_API
|
||||
void from_wire_full(Buffer& buf, TypeStore& ctxt, Value& val);
|
||||
|
||||
//! deserialize BitMask and partial Value
|
||||
PVXS_API
|
||||
void from_wire_valid(Buffer& buf, TypeStore& ctxt, Value& val);
|
||||
|
||||
//! deserialize type description and full value (a la. pvRequest)
|
||||
PVXS_API
|
||||
void from_wire_type_value(Buffer& buf, TypeStore& ctxt, Value& val);
|
||||
|
||||
|
||||
+21
-16
@@ -9,6 +9,7 @@
|
||||
#include <pvxs/log.h>
|
||||
#include "dataimpl.h"
|
||||
#include "serverconn.h"
|
||||
#include "pvrequest.h"
|
||||
|
||||
namespace pvxs { namespace impl {
|
||||
DEFINE_LOGGER(connsetup, "pvxs.tcp.setup");
|
||||
@@ -24,8 +25,7 @@ struct ServerGPR : public ServerOp
|
||||
{}
|
||||
virtual ~ServerGPR() {}
|
||||
|
||||
void doReply(const std::shared_ptr<const FieldDesc>& type,
|
||||
const Value& value,
|
||||
void doReply(const Value& value,
|
||||
const std::string& msg)
|
||||
{
|
||||
auto ch = chan.lock();
|
||||
@@ -38,12 +38,13 @@ struct ServerGPR : public ServerOp
|
||||
if(state==Dead || state==Idle) {
|
||||
// no warn if Idle as this may result from a remote Cancel
|
||||
return;
|
||||
}
|
||||
|
||||
if(type && this->type)
|
||||
throw std::logic_error("Operation already connected (has a type)");
|
||||
if(cmd!=CMD_PUT && this->type && (!value || Value::Helper::desc(value)!=this->type.get()))
|
||||
throw std::logic_error("PUT Must reply with exact type previously passed to connect()");
|
||||
} else if(state==Executing) {
|
||||
if(cmd==CMD_PUT && !value)
|
||||
throw std::logic_error("PUT reply can't include Value");
|
||||
else if(cmd==CMD_GET && this->type && (!value || Value::Helper::desc(value)!=this->type.get()))
|
||||
throw std::logic_error("GET must reply with exact type previously passed to connect()");
|
||||
}
|
||||
|
||||
Status sts{};
|
||||
if(!msg.empty())
|
||||
@@ -68,14 +69,13 @@ struct ServerGPR : public ServerOp
|
||||
} else if(state==Creating) {
|
||||
// connect()
|
||||
if(cmd!=CMD_RPC) {
|
||||
this->type = type;
|
||||
to_wire(R, type.get());
|
||||
}
|
||||
state = Idle;
|
||||
|
||||
} else if(state==Executing) {
|
||||
if(cmd==CMD_GET || (cmd==CMD_PUT && (subcmd&0x40))) {
|
||||
to_wire_valid(R, value); // GET and PUT/Get reply with bitmask and partial value
|
||||
to_wire_valid(R, value, &pvMask); // GET and PUT/Get reply with bitmask and partial value
|
||||
|
||||
} else if(cmd==CMD_RPC) {
|
||||
auto type = Value::Helper::desc(value);
|
||||
@@ -117,6 +117,7 @@ struct ServerGPR : public ServerOp
|
||||
bool lastRequest=false;
|
||||
|
||||
std::shared_ptr<const FieldDesc> type;
|
||||
BitMask pvMask; // mask computed from pvRequest .fields
|
||||
|
||||
std::function<void(const std::string&)> onClose;
|
||||
std::function<void(std::unique_ptr<server::ExecOp>&&, Value&&)> onPut;
|
||||
@@ -158,11 +159,15 @@ struct ServerGPRConnect : public server::ConnectOp
|
||||
if(!prototype && oper->cmd!=CMD_RPC)
|
||||
throw std::invalid_argument("Must provide prototype");
|
||||
|
||||
std::shared_ptr<const FieldDesc> type;
|
||||
if(prototype)
|
||||
type = Value::Helper::type(prototype);
|
||||
if(oper->type)
|
||||
throw std::logic_error("Operation already connected (has a type)");
|
||||
|
||||
oper->doReply(type, Value(), std::string());
|
||||
if(prototype) {
|
||||
oper->type = Value::Helper::type(prototype);
|
||||
oper->pvMask = request2mask(oper->type.get(), _pvRequest);
|
||||
}
|
||||
|
||||
oper->doReply(Value(), std::string());
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -176,7 +181,7 @@ struct ServerGPRConnect : public server::ConnectOp
|
||||
serv->acceptor_loop.call([this, &msg](){
|
||||
if(auto oper = op.lock()) {
|
||||
if(oper->state==ServerOp::Creating)
|
||||
oper->doReply(nullptr, Value(), msg);
|
||||
oper->doReply(Value(), msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -245,7 +250,7 @@ struct ServerGPRExec : public server::ExecOp
|
||||
return;
|
||||
serv->acceptor_loop.call([this, &val](){
|
||||
if(auto oper = op.lock()) {
|
||||
oper->doReply(nullptr, val, std::string());
|
||||
oper->doReply(val, std::string());
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -259,7 +264,7 @@ struct ServerGPRExec : public server::ExecOp
|
||||
return;
|
||||
serv->acceptor_loop.call([this, &msg](){
|
||||
if(auto oper = op.lock()) {
|
||||
oper->doReply(nullptr, Value(), msg);
|
||||
oper->doReply(Value(), msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user