/* * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. * * Author George S. McIntyre , 2023 * */ #include #include #include #include #include #include #include "credentials.h" #include "dberrormessage.h" #include "dblocker.h" #include "dbmanylocker.h" #include "fieldsubscriptionctx.h" #include "groupsource.h" #include "groupsrcsubscriptionctx.h" #include "iocshcommand.h" #include "iocsource.h" #include "securitylogger.h" #include "securityclient.h" #include "localfieldlog.h" namespace pvxs { namespace ioc { DEFINE_LOGGER(_logname, "pvxs.ioc.group.source"); /** * Constructor for GroupSource registrar. */ GroupSource::GroupSource() :eventContext(db_init_events()) // Initialise event context { // Get GroupPv configuration and register each pv name in the server runOnPvxsServer([this](IOCServer* pPvxsServer) { auto names(std::make_shared>()); // Lock map and get names { epicsGuard G(pPvxsServer->groupMapMutex); // For each defined group, add group name to the list of all records for (auto& groupMapEntry: pPvxsServer->groupMap) { auto& groupName = groupMapEntry.first; names->insert(groupName); } } allRecords.names = names; // Start event pump if (!eventContext) { throw std::runtime_error("Group Source: Event Context failed to initialise: db_init_events()"); } if (db_start_events(eventContext.get(), "qsrvGroup", nullptr, nullptr, epicsThreadPriorityCAServerLow - 1)) { throw std::runtime_error("Could not start event thread: db_start_events()"); } }); } /** * Handle the create source operation. This is called once when the source is created. * We will register all of the database records that have been loaded until this time as pv names in this * source. * * @param channelControl channel control object provided by the pvxs framework */ void GroupSource::onCreate(std::unique_ptr&& channelControl) { auto& sourceName = channelControl->name(); log_debug_printf(_logname, "Accepting channel for '%s'\n", sourceName.c_str()); runOnPvxsServer([&](IOCServer* pPvxsServer) { // Create callbacks for handling requests and group subscriptions auto& group = pPvxsServer->groupMap[sourceName]; createRequestAndSubscriptionHandlers(channelControl, group); }); } /** * Respond to search requests. For each matching pv, claim that pv * * @param searchOperation the search operation */ void GroupSource::onSearch(Search& searchOperation) { runOnPvxsServer([&](IOCServer* pPvxsServer) { for (auto& pv: searchOperation) { if (allRecords.names->count(pv.name()) == 1) { pv.claim(); log_debug_printf(_logname, "Claiming '%s'\n", pv.name()); } } }); } /** * Respond to the show request by displaying a list of all the PVs hosted in this ioc * * @param outputStream the stream to show the list on */ void GroupSource::show(std::ostream& outputStream) { outputStream << "IOC"; for (auto& name: *GroupSource::allRecords.names) { outputStream << "\n" << indent{} << name; } } /** * Create request and subscription handlers for group record sources * * @param channelControl the control channel pointer that we got from onCreate * @param group the group that we're creating the request and subscription handlers for */ void GroupSource::createRequestAndSubscriptionHandlers(std::unique_ptr& channelControl, Group& group) { // Get and Put requests channelControl->onOp([&](std::unique_ptr&& channelConnectOperation) { onOp(group, std::move(channelConnectOperation)); }); auto subscriptionContext(std::make_shared(group)); channelControl ->onSubscribe([this, subscriptionContext](std::unique_ptr&& subscriptionOperation) { onSubscribe(subscriptionContext, std::move(subscriptionOperation)); }); } /** * Called when a client pauses / stops a subscription it has been subscribed to. * This function loops over all fields event subscriptions the group subscription context and disables each of them. * * @param groupSubscriptionCtx the group subscription context */ void GroupSource::onDisableSubscription(const std::shared_ptr& groupSubscriptionCtx) { for (auto& fieldSubscriptionCtx: groupSubscriptionCtx->fieldSubscriptionContexts) { auto pValueEventSubscription = fieldSubscriptionCtx.pValueEventSubscription.get(); auto pPropertiesEventSubscription = fieldSubscriptionCtx.pPropertiesEventSubscription.get(); db_event_disable(pValueEventSubscription); db_event_disable(pPropertiesEventSubscription); } } /** * Handler for the onOp event raised by pvxs Sources when they are started, in order to define the get and put handlers * on a per source basis. * This is called after the event has been intercepted and we add the group to the call. * * @param group the group to which the get/put operation pertains * @param channelConnectOperation the channel connect operation object */ void GroupSource::onOp(Group& group, std::unique_ptr&& channelConnectOperation) { // First stage for handling any request is to announce the channel type with a `connect()` call // @note The type signalled here must match the eventual type returned by a pvxs get channelConnectOperation->connect(group.valueTemplate); // register handler for pvxs group get channelConnectOperation->onGet([&group](std::unique_ptr&& getOperation) { get(group, getOperation); }); // Make a security cache for this client's connection to this group // Each time the same client calls put we will re-use the cached security client // The security cache will be deleted when the client disconnects from this group pv auto securityCache = std::make_shared(); // register handler for pvxs group put channelConnectOperation ->onPut([&group, securityCache](std::unique_ptr&& putOperation, Value&& value) { if (!securityCache->done) { // First time we call put we need to initialise the security cache securityCache->securityClients.resize(group.fields.size()); securityCache->credentials.reset(new Credentials(*putOperation->credentials())); auto fieldIndex = 0u; for (auto& field: group.fields) { if (field.value.channel) { securityCache->securityClients[fieldIndex] .update(field.value.channel, *securityCache->credentials); } fieldIndex++; } auto& pvRequest = putOperation->pvRequest(); IOCSource::setForceProcessingFlag(pvRequest, securityCache); securityCache->done = true; } putGroup(group, putOperation, value, *securityCache); }); } /** * Called by the framework when the monitoring client issues a start or stop subscription. We * intercept the framework's call prior to entering here, and add the group subscription context * containing a list of field contexts and their event subscriptions to manage. * * @param groupSubscriptionCtx the group subscription context * @param isStarting true if the client issued a start subscription request, false otherwise */ void GroupSource::onStart(const std::shared_ptr& groupSubscriptionCtx, bool isStarting) { if (isStarting) { onStartSubscription(groupSubscriptionCtx); } else { onDisableSubscription(groupSubscriptionCtx); } } /** * Called when a client starts a subscription it has subscribed to. For each field in the subscription, * enable events and post a single event to both the values and properties event channels to kick things off. * * @param groupSubscriptionCtx the group subscription context containing the field event subscriptions to start */ void GroupSource::onStartSubscription(const std::shared_ptr& groupSubscriptionCtx) { for (auto& fieldSubscriptionCtx: groupSubscriptionCtx->fieldSubscriptionContexts) { auto pValueEventSubscription = fieldSubscriptionCtx.pValueEventSubscription.get(); auto pPropertiesEventSubscription = fieldSubscriptionCtx.pPropertiesEventSubscription.get(); db_event_enable(pValueEventSubscription); db_event_enable(pPropertiesEventSubscription); db_post_single_event(pValueEventSubscription); db_post_single_event(pPropertiesEventSubscription); } } /** * Called by the framework when a client subscribes to a channel. We intercept the call before this function is called * to add a new group subscription context containing a reference to the group. * This function must initialise all of the field's subscription contexts. * * @param groupSubscriptionCtx a new group subscription context * @param subscriptionOperation the group subscription operation */ void GroupSource::onSubscribe(const std::shared_ptr& groupSubscriptionCtx, std::unique_ptr&& subscriptionOperation) const { // inform peer of data type and acquire control of the subscription queue groupSubscriptionCtx->subscriptionControl = subscriptionOperation ->connect(groupSubscriptionCtx->group.valueTemplate); // Initialise the field subscription contexts. One for each group field. // This is stored in the group context groupSubscriptionCtx->fieldSubscriptionContexts.reserve(groupSubscriptionCtx->group.fields.size()); for (auto& field: groupSubscriptionCtx->group.fields) { groupSubscriptionCtx->fieldSubscriptionContexts.emplace_back(field, groupSubscriptionCtx.get()); auto& fieldSubscriptionContext = groupSubscriptionCtx->fieldSubscriptionContexts.back(); // Two subscription are made for each group channel for pvxs if (field.isMeta) { fieldSubscriptionContext .subscribeField(eventContext.get(), subscriptionValueCallback, DBE_ALARM); } else { fieldSubscriptionContext .subscribeField(eventContext.get(), subscriptionValueCallback, DBE_VALUE | DBE_ALARM | DBE_ARCHIVE); } fieldSubscriptionContext .subscribeField(eventContext.get(), subscriptionPropertiesCallback, DBE_PROPERTY, false); } // If all goes well, set up handlers for start and stop monitoring events groupSubscriptionCtx->subscriptionControl->onStart([groupSubscriptionCtx](bool isStarting) { onStart(groupSubscriptionCtx, isStarting); }); } /** * Handle the get operation * * @param group the group to get * @param getOperation the current executing operation */ void GroupSource::get(Group& group, std::unique_ptr& getOperation) { groupGet(group, [&getOperation](Value& value) { getOperation->reply(value); }, [&getOperation](const char* errorMessage) { getOperation->error(errorMessage); }); } /** * Get each field and make up the whole group structure * * @param group group to base result on * @param returnFn function to call with the result * @param errorFn function to call on errors */ void GroupSource::groupGet(Group& group, const std::function& returnFn, const std::function& errorFn) { // Make an empty value to return auto returnValue(group.valueTemplate.cloneEmpty()); // If the group is configured for an atomic get operation, // then we need to get all the fields at once, so we lock them all together // and do the operation in one go if (group.atomicPutGet) { // Lock all the fields DBManyLocker G(group.value.lock); // Loop through all fields for (auto& field: group.fields) { // ignore all zero length named fields that are not meta if (field.name.empty() && !field.isMeta) { continue; } // find the leaf node in which to set the value auto leafNode = field.findIn(returnValue); if (leafNode) { if (!getGroupField(field, leafNode, group.name, errorFn)) { return; } } } // Unlock the all group fields when the locker goes out of scope } else { // Otherwise, this is a non-atomic operation, and we need to `put` each field individually, // locking each of them independently of each other. // Loop through all fields for (auto& field: group.fields) { // ignore all zero length fields that are not meta if (field.name.empty() && !field.isMeta) { continue; } // find the leaf node in which to set the value auto leafNode = field.findIn(returnValue); if (leafNode) { // Lock this field dbChannel* pDbChannel = field.value.channel; DBLocker F(pDbChannel->addr.precord); if (!getGroupField(field, leafNode, group.name, errorFn)) { return; } } } } // Send reply returnFn(returnValue); } /** * Get a group field into the specified Value target object. The group name is provided in case there are errors * to better identify the location of the error when the error function is called with the error text * * @param field the field to get * @param valueTarget the place to store the value retrieved * @param groupName the name of the group that the field is a part of * @param errorFn the function to call if errors occur * @return true if retrieved successfully, false otherwise */ bool GroupSource::getGroupField(const Field& field, Value valueTarget, const std::string& groupName, const std::function& errorFn) { try { LocalFieldLog localFieldLog(field.value.channel); IOCSource::get(field.value.channel, nullptr, valueTarget, field.isMeta ? FOR_METADATA : FOR_VALUE_AND_PROPERTIES, localFieldLog.pFieldLog); } catch (std::exception& e) { std::stringstream errorString; errorString << "Error retrieving value for pvName: " << groupName << (field.name.empty() ? "/" : ".") << field.fullName << " : " << e.what(); errorFn(errorString.str().c_str()); return false; } return true; } /** * Handler invoked when a peer sends data on a PUT * * @param group the group to which the data is posted * @param putOperation the put operation object to use to interact with the client * @param value the value being posted * @param groupSecurityCache the object that caches the security context of client connections */ void GroupSource::putGroup(Group& group, std::unique_ptr& putOperation, const Value& value, const GroupSecurityCache& groupSecurityCache) { try { std::vector securityLoggers(group.fields.size()); // Prepare group put operation auto fieldIndex = 0; for (auto& field: group.fields) { dbChannel* pDbChannel = field.value.channel; if (pDbChannel) { IOCSource::doPreProcessing(pDbChannel, securityLoggers[fieldIndex], *groupSecurityCache.credentials, groupSecurityCache.securityClients[fieldIndex]); if (dbChannelFinalFieldType(pDbChannel) >= DBF_INLINK && dbChannelFinalFieldType(pDbChannel) <= DBF_FWDLINK) { throw std::runtime_error("Links not supported for put"); } } fieldIndex++; } // Reset index for subsequent loops fieldIndex = 0; // If the group is configured for an atomic put operation, // then we need to put all the fields at once, so we lock them all together // and do the operation in one go if (group.atomicPutGet) { // Lock all the fields DBManyLocker G(group.value.lock); // Loop through all fields for (auto& field: group.fields) { // Put the field putGroupField(value, field, groupSecurityCache.securityClients[fieldIndex]); // Do processing if required IOCSource::doPostProcessing(field.value.channel, groupSecurityCache.forceProcessing); fieldIndex++; } // Unlock the all group fields when the locker goes out of scope } else { // Otherwise, this is a non-atomic operation, and we need to `put` each field individually, // locking each of them independently of each other. // Loop through all fields for (auto& field: group.fields) { dbChannel* pDbChannel = field.value.channel; // Lock this field DBLocker F(pDbChannel->addr.precord); // Put the field putGroupField(value, field, groupSecurityCache.securityClients[fieldIndex]); // Do processing if required IOCSource::doPostProcessing(field.value.channel, groupSecurityCache.forceProcessing); // Unlock this field when locker goes out of scope fieldIndex++; } } } catch (std::exception& e) { // Unlock all locked fields when lockers go out of scope // Post error message to put operation object putOperation->error(e.what()); return; } // If all went ok then let the client know putOperation->reply(); } /** * Called by putGroup() to perform the actual put of the given value into the group field specified. * The value will be the whole value template that the group represents but only the fields passed in by * the client will be set. So we simply check to see whether the parts of value that are referenced by the * provided field parameter are included in the given value, and if so, we pull them out and do a low level * database put. * * @param value the sparsely populated value to put into the group's field * @param field the group field to check against * @param securityClient the security client to use to authorise the operation */ void GroupSource::putGroupField(const Value& value, const Field& field, const SecurityClient& securityClient) { // find the leaf node that the field refers to in the given value auto leafNode = field.findIn(value); // If the field references a valid part of the given value then we can send it to the database if (leafNode && leafNode.isMarked()) { SecurityLogger securityLogger; IOCSource::doFieldPreProcessing(securityClient); // pre-process field IOCSource::put(field.value.channel, leafNode); } } /** * Used by both value and property subscriptions, this function will get the database value and return it * to the monitor. It is called whenever a field subscription event is received. * * @param fieldSubscriptionCtx the field subscription context * @param getOperationType the operation this callback serves * @param pDbFieldLog the database field log */ void GroupSource::subscriptionCallback(FieldSubscriptionCtx* fieldSubscriptionCtx, const GetOperationType getOperationType, struct db_field_log* pDbFieldLog) { // Find the group subscription context from the field subscription context auto& pGroupCtx = fieldSubscriptionCtx->pGroupCtx; // Also find the field auto field = fieldSubscriptionCtx->field; // Get the current value of this group subscription // We simply merge new field changes onto this value as events occur auto currentValue = pGroupCtx->currentValue; // Lock only fields triggered by this field DBManyLocker G(getOperationType <= FOR_METADATA ? field->value.lock : field->properties.lock); // for all triggered fields get the values. Assumes that self has been added to triggered list for (auto& pTriggeredField: field->triggers) { // Find leaf node within the current value. This will be a reference into the currentValue. // So that if we assign the leafNode with the value we `get()` back, then currentValue will be updated auto leafNode = pTriggeredField->findIn(currentValue); if (leafNode) { dbChannel* channelToUse = (getOperationType == FOR_PROPERTIES) ? pTriggeredField->properties.channel : pTriggeredField->value.channel; LocalFieldLog localFieldLog(channelToUse, (pTriggeredField == field) ? pDbFieldLog : nullptr); IOCSource::get(pTriggeredField->value.channel, pTriggeredField->properties.channel, leafNode, getOperationType, localFieldLog.pFieldLog); } } // Make sure that the initial subscription update has occurred on all channels before replying // As we make two initial updates when opening a new subscription, for each field, // we need all updates for all fields to have completed before continuing if (!pGroupCtx->eventsPrimed) { for (auto& fieldCtx: pGroupCtx->fieldSubscriptionContexts) { if (!fieldCtx.hadValueEvent || !fieldCtx.hadPropertyEvent) { return; } } pGroupCtx->eventsPrimed = true; } // If events have been primed then return the value to the subscriber, // and unmark all accumulated changes pGroupCtx->subscriptionControl->post(currentValue.clone()); currentValue.unmark(); // Unlock fields in group when locker goes out of scope } /** * This callback handles notifying of updates to subscribed-to pv values. * * @param userArg the user argument passed to the callback function from the framework: a FieldSubscriptionCtx * @param pDbFieldLog the database field log containing the changes being notified */ void GroupSource::subscriptionValueCallback(void* userArg, dbChannel*, int, struct db_field_log* pDbFieldLog) { auto subscriptionContext = (FieldSubscriptionCtx*)userArg; { epicsGuard G((subscriptionContext->pGroupCtx)->eventLock); subscriptionContext->hadValueEvent = true; } subscriptionCallback(subscriptionContext, subscriptionContext->field->isMeta ? FOR_METADATA : FOR_VALUE, pDbFieldLog); } /** * This callback handles notifying of updates to subscribed-to pv properties. * * @param userArg the user argument passed to the callback function from the framework: a FieldSubscriptionCtx * @param pDbFieldLog the database field log containing the changes being notified */ void GroupSource::subscriptionPropertiesCallback(void* userArg, dbChannel*, int, struct db_field_log* pDbFieldLog) { auto subscriptionContext = (FieldSubscriptionCtx*)userArg; { epicsGuard G((subscriptionContext->pGroupCtx)->eventLock); subscriptionContext->hadPropertyEvent = true; } if (!subscriptionContext->field->isMeta) { subscriptionCallback(subscriptionContext, FOR_PROPERTIES, pDbFieldLog); } } } // ioc } // pvxs