diff --git a/ioc/groupsource.cpp b/ioc/groupsource.cpp index 8c68889..4527fdf 100644 --- a/ioc/groupsource.cpp +++ b/ioc/groupsource.cpp @@ -34,17 +34,32 @@ namespace ioc { DEFINE_LOGGER(_logname, "pvxs.ioc.group.source"); +/** + * group security cache - for storing group security credentials and clients. + * per-channel, shared by all PUTs on that channel + */ +class GroupSecurityCache : public SecurityControlObject { +public: + // references entry in IOCGroupConfig::groupMap + Group& group; + std::vector securityClients; + INST_COUNTER(GroupSecurityCache); + + explicit GroupSecurityCache(Group& group) :group(group) {} +}; + DEFINE_INST_COUNTER(GroupSourceSubscriptionCtx); DEFINE_INST_COUNTER(GroupSecurityCache); static -void onOp(Group& group, +void onOp(const std::shared_ptr& securityCache, std::unique_ptr&& channelConnectOperation); static void onGet(Group& group, const std::unique_ptr& getOperation); static -void onPutGroup(Group& group, std::unique_ptr& putOperation, const Value& value, +void onPutGroup(Group& group, bool atomic, TriState forceProcessing, + std::unique_ptr& putOperation, const Value& value, const GroupSecurityCache& groupSecurityCache); static @@ -98,8 +113,11 @@ void GroupSource::onCreate(std::unique_ptr&& channelCont auto it(config.groupMap.find(sourceName)); if(it != config.groupMap.end()) { auto& group(it->second); - channelControl->onOp([&](std::unique_ptr&& channelConnectOperation) { - onOp(group, std::move(channelConnectOperation)); + + auto securityCache = std::make_shared(group); + + channelControl->onOp([securityCache](std::unique_ptr&& channelConnectOperation) { + onOp(securityCache, std::move(channelConnectOperation)); }); channelControl @@ -162,8 +180,12 @@ void onDisableSubscription(const std::shared_ptr& gr * @param channelConnectOperation the channel connect operation object */ static -void onOp(Group& group, - std::unique_ptr&& channelConnectOperation) { +void onOp(const std::shared_ptr& securityCache, + std::unique_ptr&& channelConnectOperation) +{ + // ok to allow reference to be captured. Lifetime attached to global group config + auto& group = securityCache->group; + // First stage for handling any request is to announce the channel type with a `connect()` call // @note The type signalled here must match the eventual type returned by a pvxs get channelConnectOperation->connect(group.valueTemplate); @@ -173,16 +195,23 @@ void onOp(Group& group, onGet(group, getOperation); }); - // Make a security cache for this client's connection to this group - // Each time the same client calls put we will reuse the cached security client - // The security cache will be deleted when the client disconnects from this group pv - auto securityCache = std::make_shared(); + // skip PUT specific allocations unless needed + if(channelConnectOperation->op()!=server::OpBase::Put) + return; + + auto& pvRequest = channelConnectOperation->pvRequest(); + bool atomic = group.atomicPutGet; + pvRequest["record._options.atomic"].as(atomic); + + TriState forceProcessing{Unset}; + IOCSource::setForceProcessingFlag(channelConnectOperation.get(), pvRequest, forceProcessing); // register handler for pvxs group put channelConnectOperation - ->onPut([&group, securityCache](std::unique_ptr&& putOperation, Value&& value) { + ->onPut([&group, securityCache, atomic, forceProcessing] + (std::unique_ptr&& putOperation, Value&& value) { if (!securityCache->done) { - // First time we call put we need to initialise the security cache + // First PUT on this Channel securityCache->securityClients.resize(group.fields.size()); securityCache->credentials.reset(new Credentials(*putOperation->credentials())); auto fieldIndex = 0u; @@ -193,12 +222,11 @@ void onOp(Group& group, } fieldIndex++; } - auto& pvRequest = putOperation->pvRequest(); - IOCSource::setForceProcessingFlag(putOperation.get(), pvRequest, securityCache); securityCache->done = true; } + // for each PUT - onPutGroup(group, putOperation, value, *securityCache); + onPutGroup(group, atomic, forceProcessing, putOperation, value, *securityCache); }); } @@ -515,6 +543,7 @@ void onGet(Group& group, const std::unique_ptr& getOperation) { static bool putGroupField(const Value& value, const Field& field, + TriState forceProcessing, const SecurityClient& securityClient, const GroupSecurityCache& groupSecurityCache, server::RemoteLogger& notify) { @@ -535,7 +564,7 @@ bool putGroupField(const Value& value, } if (changing || field.info.type==MappingInfo::Proc) { // Do processing if required - IOCSource::doPostProcessing(field.value, groupSecurityCache.forceProcessing); + IOCSource::doPostProcessing(field.value, forceProcessing); return true; } return false; @@ -550,14 +579,13 @@ bool putGroupField(const Value& value, * @param groupSecurityCache the object that caches the security context of client connections */ static -void onPutGroup(Group& group, std::unique_ptr& putOperation, const Value& value, - const GroupSecurityCache& groupSecurityCache) { +void onPutGroup(Group& group, bool atomic, TriState forceProcessing, + std::unique_ptr& putOperation, const Value& value, + const GroupSecurityCache& groupSecurityCache) +{ try { CurrentOp op(putOperation.get()); - bool atomic = group.atomicPutGet; - putOperation->pvRequest()["record._options.atomic"].as(atomic); - log_debug_printf(_logname, "%s %s\n", __func__, group.name.c_str()); std::vector securityLoggers(group.fields.size()); @@ -591,7 +619,7 @@ void onPutGroup(Group& group, std::unique_ptr& putOperation, con // Loop through all fields for (auto& field: group.fields) { // Put the field - didSomething |= putGroupField(value, field, + didSomething |= putGroupField(value, field, forceProcessing, groupSecurityCache.securityClients[fieldIndex], groupSecurityCache, *putOperation); @@ -612,7 +640,7 @@ void onPutGroup(Group& group, std::unique_ptr& putOperation, con // Lock this field DBLocker F(pDbChannel->addr.precord); // Put the field - didSomething |= putGroupField(value, field, + didSomething |= putGroupField(value, field, forceProcessing, groupSecurityCache.securityClients[fieldIndex], groupSecurityCache, *putOperation); diff --git a/ioc/iocsource.cpp b/ioc/iocsource.cpp index 4e94a23..da2e251 100644 --- a/ioc/iocsource.cpp +++ b/ioc/iocsource.cpp @@ -422,12 +422,10 @@ void IOCSource::doPostProcessing(dbChannel* pDbChannel, TriState forceProcessing /** * Set a flag that will force processing of record in the specified security control object - * - * @param pvRequest the request - * @param securityControlObject the security control object to update */ -void IOCSource::setForceProcessingFlag(server::RemoteLogger *op, const Value& pvRequest, - const std::shared_ptr& securityControlObject) +void IOCSource::setForceProcessingFlag(server::RemoteLogger *op, + const Value& pvRequest, + TriState& forceProc) { auto proc = pvRequest["record._options.process"]; bool b; @@ -436,12 +434,12 @@ void IOCSource::setForceProcessingFlag(server::RemoteLogger *op, const Value& pv return; // not provided } else if(proc.as(b)) { // actual bool, integer, or string parsable to bool - securityControlObject->forceProcessing = b ? True : False; + forceProc = b ? True : False; return; } else if(proc.as(s)) { if(s=="passive") { - securityControlObject->forceProcessing = Unset; + forceProc = Unset; return; } } diff --git a/ioc/iocsource.h b/ioc/iocsource.h index e8adb33..105ed8d 100644 --- a/ioc/iocsource.h +++ b/ioc/iocsource.h @@ -64,7 +64,7 @@ public: static void setForceProcessingFlag(server::RemoteLogger *op, const Value& pvRequest, - const std::shared_ptr& securityControlObject); + TriState &forceProc); }; struct CurrentOp { diff --git a/ioc/securityclient.h b/ioc/securityclient.h index 7288fdf..94697a7 100644 --- a/ioc/securityclient.h +++ b/ioc/securityclient.h @@ -35,18 +35,10 @@ public: */ class SecurityControlObject { public: - bool done = false; - TriState forceProcessing{ Unset }; -}; - -/** - * group security cache - for storing group security credentials and clients - */ -class GroupSecurityCache : public SecurityControlObject { -public: - std::vector securityClients; - std::unique_ptr credentials; - INST_COUNTER(GroupSecurityCache); + // set by the first PUT to each Channel, + // when associated Credentials and SecurityClient(s) are initialized + bool done = false; + std::unique_ptr credentials; }; /** @@ -54,16 +46,16 @@ public: */ class SingleSecurityCache : public SecurityControlObject { public: - SecurityClient securityClient; - std::unique_ptr credentials; + SecurityClient securityClient; }; /** * The put operation cache for caching information about the current client put connection * Includes a single security cache as well as information pertaining to asynchronous put operations */ -struct PutOperationCache : public SingleSecurityCache { +struct PutOperationCache { bool doWait{ false }; + TriState forceProcessing{ Unset }; processNotify notify{}; Value valueToSet; std::unique_ptr putOperation; diff --git a/ioc/singlesource.cpp b/ioc/singlesource.cpp index f7e87c4..e41dd29 100644 --- a/ioc/singlesource.cpp +++ b/ioc/singlesource.cpp @@ -316,10 +316,18 @@ void onOp(const std::shared_ptr& sInfo, const Value& valuePrototype, singleGet(*sInfo, getOperation, valuePrototype); }); + // skip PUT specific allocations unless needed + if(channelConnectOperation->op()!=server::OpBase::Put) + return; + // Make a security cache for this client's connection to this pv // Each time the same client calls put we will reuse the cached security client // The security cache will be deleted when the client disconnects from this pv auto putOperationCache = std::make_shared(); + putOperationCache->notify.usrPvt = putOperationCache.get(); + putOperationCache->notify.chan = sInfo->chan; + putOperationCache->notify.putCallback = putCallback; + putOperationCache->notify.doneCallback = doneCallback; // Set up handler for put requests channelConnectOperation @@ -328,30 +336,28 @@ void onOp(const std::shared_ptr& sInfo, const Value& valuePrototype, Value&& value) { try { dbChannel* pDbChannel = sInfo->chan; - if (!putOperationCache->done) { - putOperationCache->credentials.reset(new Credentials(*putOperation->credentials())); - putOperationCache->securityClient.update(pDbChannel, *putOperationCache->credentials); - putOperationCache->notify.usrPvt = putOperationCache.get(); - putOperationCache->notify.chan = pDbChannel; - putOperationCache->notify.putCallback = putCallback; - putOperationCache->notify.doneCallback = doneCallback; + if (!sInfo->done) { + // initialize credentials on first PUT to this Channel + sInfo->credentials.reset(new Credentials(*putOperation->credentials())); + sInfo->securityClient.update(pDbChannel, *sInfo->credentials); - auto& pvRequest = putOperation->pvRequest(); - pvRequest["record._options.block"].as(putOperationCache->doWait); - IOCSource::setForceProcessingFlag(putOperation.get(), pvRequest, putOperationCache); - if (putOperationCache->forceProcessing) { - putOperationCache->doWait = false; // no point in waiting - } - putOperationCache->done = true; + sInfo->done = true; + } + // for each PUT (may have different pvRequest) + auto& pvRequest = putOperation->pvRequest(); + pvRequest["record._options.block"].as(putOperationCache->doWait); + IOCSource::setForceProcessingFlag(putOperation.get(), pvRequest, putOperationCache->forceProcessing); + if (putOperationCache->forceProcessing) { + putOperationCache->doWait = false; // no point in waiting } SecurityLogger securityLogger; IOCSource::doPreProcessing(pDbChannel, securityLogger, - *putOperationCache->credentials, - putOperationCache->securityClient); // pre-process - IOCSource::doFieldPreProcessing(putOperationCache->securityClient); // pre-process field + *sInfo->credentials, + sInfo->securityClient); // pre-process + IOCSource::doFieldPreProcessing(sInfo->securityClient); // pre-process field if (putOperationCache->doWait) { putOperationCache->valueToSet = value; // TODO prevent concurrent put with callbacks (notifyBusy) diff --git a/ioc/singlesrcsubscriptionctx.h b/ioc/singlesrcsubscriptionctx.h index 528d658..8321632 100644 --- a/ioc/singlesrcsubscriptionctx.h +++ b/ioc/singlesrcsubscriptionctx.h @@ -16,11 +16,12 @@ #include "fieldconfig.h" #include "subscriptionctx.h" #include "utilpvt.h" +#include "securityclient.h" namespace pvxs { namespace ioc { -struct SingleInfo : public MappingInfo { +struct SingleInfo : public MappingInfo, public SingleSecurityCache { Channel chan; INST_COUNTER(SingleInfo);