ioc: avoid enumerating roles for each PUT
Just once on first PUT to each Channel
This commit is contained in:
+51
-23
@@ -34,17 +34,32 @@ namespace ioc {
|
||||
|
||||
DEFINE_LOGGER(_logname, "pvxs.ioc.group.source");
|
||||
|
||||
/**
|
||||
* group security cache - for storing group security credentials and clients.
|
||||
* per-channel, shared by all PUTs on that channel
|
||||
*/
|
||||
class GroupSecurityCache : public SecurityControlObject {
|
||||
public:
|
||||
// references entry in IOCGroupConfig::groupMap
|
||||
Group& group;
|
||||
std::vector<SecurityClient> securityClients;
|
||||
INST_COUNTER(GroupSecurityCache);
|
||||
|
||||
explicit GroupSecurityCache(Group& group) :group(group) {}
|
||||
};
|
||||
|
||||
DEFINE_INST_COUNTER(GroupSourceSubscriptionCtx);
|
||||
DEFINE_INST_COUNTER(GroupSecurityCache);
|
||||
|
||||
static
|
||||
void onOp(Group& group,
|
||||
void onOp(const std::shared_ptr<GroupSecurityCache>& securityCache,
|
||||
std::unique_ptr<server::ConnectOp>&& channelConnectOperation);
|
||||
static
|
||||
void onGet(Group& group, const std::unique_ptr<server::ExecOp>& getOperation);
|
||||
|
||||
static
|
||||
void onPutGroup(Group& group, std::unique_ptr<server::ExecOp>& putOperation, const Value& value,
|
||||
void onPutGroup(Group& group, bool atomic, TriState forceProcessing,
|
||||
std::unique_ptr<server::ExecOp>& putOperation, const Value& value,
|
||||
const GroupSecurityCache& groupSecurityCache);
|
||||
|
||||
static
|
||||
@@ -98,8 +113,11 @@ void GroupSource::onCreate(std::unique_ptr<server::ChannelControl>&& channelCont
|
||||
auto it(config.groupMap.find(sourceName));
|
||||
if(it != config.groupMap.end()) {
|
||||
auto& group(it->second);
|
||||
channelControl->onOp([&](std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
|
||||
onOp(group, std::move(channelConnectOperation));
|
||||
|
||||
auto securityCache = std::make_shared<GroupSecurityCache>(group);
|
||||
|
||||
channelControl->onOp([securityCache](std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
|
||||
onOp(securityCache, std::move(channelConnectOperation));
|
||||
});
|
||||
|
||||
channelControl
|
||||
@@ -162,8 +180,12 @@ void onDisableSubscription(const std::shared_ptr<GroupSourceSubscriptionCtx>& gr
|
||||
* @param channelConnectOperation the channel connect operation object
|
||||
*/
|
||||
static
|
||||
void onOp(Group& group,
|
||||
std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
|
||||
void onOp(const std::shared_ptr<GroupSecurityCache>& securityCache,
|
||||
std::unique_ptr<server::ConnectOp>&& channelConnectOperation)
|
||||
{
|
||||
// ok to allow reference to be captured. Lifetime attached to global group config
|
||||
auto& group = securityCache->group;
|
||||
|
||||
// First stage for handling any request is to announce the channel type with a `connect()` call
|
||||
// @note The type signalled here must match the eventual type returned by a pvxs get
|
||||
channelConnectOperation->connect(group.valueTemplate);
|
||||
@@ -173,16 +195,23 @@ void onOp(Group& group,
|
||||
onGet(group, getOperation);
|
||||
});
|
||||
|
||||
// Make a security cache for this client's connection to this group
|
||||
// Each time the same client calls put we will reuse the cached security client
|
||||
// The security cache will be deleted when the client disconnects from this group pv
|
||||
auto securityCache = std::make_shared<GroupSecurityCache>();
|
||||
// skip PUT specific allocations unless needed
|
||||
if(channelConnectOperation->op()!=server::OpBase::Put)
|
||||
return;
|
||||
|
||||
auto& pvRequest = channelConnectOperation->pvRequest();
|
||||
bool atomic = group.atomicPutGet;
|
||||
pvRequest["record._options.atomic"].as(atomic);
|
||||
|
||||
TriState forceProcessing{Unset};
|
||||
IOCSource::setForceProcessingFlag(channelConnectOperation.get(), pvRequest, forceProcessing);
|
||||
|
||||
// register handler for pvxs group put
|
||||
channelConnectOperation
|
||||
->onPut([&group, securityCache](std::unique_ptr<server::ExecOp>&& putOperation, Value&& value) {
|
||||
->onPut([&group, securityCache, atomic, forceProcessing]
|
||||
(std::unique_ptr<server::ExecOp>&& putOperation, Value&& value) {
|
||||
if (!securityCache->done) {
|
||||
// First time we call put we need to initialise the security cache
|
||||
// First PUT on this Channel
|
||||
securityCache->securityClients.resize(group.fields.size());
|
||||
securityCache->credentials.reset(new Credentials(*putOperation->credentials()));
|
||||
auto fieldIndex = 0u;
|
||||
@@ -193,12 +222,11 @@ void onOp(Group& group,
|
||||
}
|
||||
fieldIndex++;
|
||||
}
|
||||
auto& pvRequest = putOperation->pvRequest();
|
||||
IOCSource::setForceProcessingFlag(putOperation.get(), pvRequest, securityCache);
|
||||
securityCache->done = true;
|
||||
}
|
||||
// for each PUT
|
||||
|
||||
onPutGroup(group, putOperation, value, *securityCache);
|
||||
onPutGroup(group, atomic, forceProcessing, putOperation, value, *securityCache);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -515,6 +543,7 @@ void onGet(Group& group, const std::unique_ptr<server::ExecOp>& getOperation) {
|
||||
static
|
||||
bool putGroupField(const Value& value,
|
||||
const Field& field,
|
||||
TriState forceProcessing,
|
||||
const SecurityClient& securityClient,
|
||||
const GroupSecurityCache& groupSecurityCache,
|
||||
server::RemoteLogger& notify) {
|
||||
@@ -535,7 +564,7 @@ bool putGroupField(const Value& value,
|
||||
}
|
||||
if (changing || field.info.type==MappingInfo::Proc) {
|
||||
// Do processing if required
|
||||
IOCSource::doPostProcessing(field.value, groupSecurityCache.forceProcessing);
|
||||
IOCSource::doPostProcessing(field.value, forceProcessing);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
@@ -550,14 +579,13 @@ bool putGroupField(const Value& value,
|
||||
* @param groupSecurityCache the object that caches the security context of client connections
|
||||
*/
|
||||
static
|
||||
void onPutGroup(Group& group, std::unique_ptr<server::ExecOp>& putOperation, const Value& value,
|
||||
const GroupSecurityCache& groupSecurityCache) {
|
||||
void onPutGroup(Group& group, bool atomic, TriState forceProcessing,
|
||||
std::unique_ptr<server::ExecOp>& putOperation, const Value& value,
|
||||
const GroupSecurityCache& groupSecurityCache)
|
||||
{
|
||||
try {
|
||||
CurrentOp op(putOperation.get());
|
||||
|
||||
bool atomic = group.atomicPutGet;
|
||||
putOperation->pvRequest()["record._options.atomic"].as(atomic);
|
||||
|
||||
log_debug_printf(_logname, "%s %s\n", __func__, group.name.c_str());
|
||||
|
||||
std::vector<SecurityLogger> securityLoggers(group.fields.size());
|
||||
@@ -591,7 +619,7 @@ void onPutGroup(Group& group, std::unique_ptr<server::ExecOp>& putOperation, con
|
||||
// Loop through all fields
|
||||
for (auto& field: group.fields) {
|
||||
// Put the field
|
||||
didSomething |= putGroupField(value, field,
|
||||
didSomething |= putGroupField(value, field, forceProcessing,
|
||||
groupSecurityCache.securityClients[fieldIndex],
|
||||
groupSecurityCache,
|
||||
*putOperation);
|
||||
@@ -612,7 +640,7 @@ void onPutGroup(Group& group, std::unique_ptr<server::ExecOp>& putOperation, con
|
||||
// Lock this field
|
||||
DBLocker F(pDbChannel->addr.precord);
|
||||
// Put the field
|
||||
didSomething |= putGroupField(value, field,
|
||||
didSomething |= putGroupField(value, field, forceProcessing,
|
||||
groupSecurityCache.securityClients[fieldIndex],
|
||||
groupSecurityCache,
|
||||
*putOperation);
|
||||
|
||||
Reference in New Issue
Block a user