/* * Copyright - See the COPYRIGHT that is included with this distribution. * pvxs is distributed subject to a Software License Agreement found * in file LICENSE that is included with this distribution. * * Author George S. McIntyre , 2023 * */ #include #include #include #include #include #include #include "credentials.h" #include "dberrormessage.h" #include "dblocker.h" #include "dbmanylocker.h" #include "fieldsubscriptionctx.h" #include "groupsource.h" #include "groupsrcsubscriptionctx.h" #include "iocshcommand.h" #include "iocsource.h" #include "securitylogger.h" #include "securityclient.h" #include "localfieldlog.h" namespace pvxs { namespace ioc { DEFINE_LOGGER(_logname, "pvxs.ioc.group.source"); DEFINE_INST_COUNTER(GroupSourceSubscriptionCtx); DEFINE_INST_COUNTER(GroupSecurityCache); /** * Constructor for GroupSource registrar. */ GroupSource::GroupSource() :eventContext(db_init_events()) // Initialise event context ,config(IOCGroupConfig::instance()) { // Get GroupPv configuration and register each pv name in the server auto names(std::make_shared>()); // Lock map and get names { epicsGuard G(config.groupMapMutex); // For each defined group, add group name to the list of all records for (auto& groupMapEntry: config.groupMap) { auto& groupName = groupMapEntry.first; names->insert(groupName); } } allRecords.names = names; // Start event pump if (!eventContext) { throw std::runtime_error("Group Source: Event Context failed to initialise: db_init_events()"); } if (db_start_events(eventContext.get(), "qsrvGroup", nullptr, nullptr, epicsThreadPriorityCAServerLow - 1)) { throw std::runtime_error("Could not start event thread: db_start_events()"); } } /** * Handle the create source operation. This is called once when the source is created. * We will register all of the database records that have been loaded until this time as pv names in this * source. * * @param channelControl channel control object provided by the pvxs framework */ void GroupSource::onCreate(std::unique_ptr&& channelControl) { auto& sourceName = channelControl->name(); log_debug_printf(_logname, "Accepting channel for '%s'\n", sourceName.c_str()); // Create callbacks for handling requests and group subscriptions auto it(config.groupMap.find(sourceName)); if(it != config.groupMap.end()) { auto& group(it->second); channelControl->onOp([&](std::unique_ptr&& channelConnectOperation) { onOp(group, std::move(channelConnectOperation)); }); channelControl ->onSubscribe([this, &group](std::unique_ptr&& subscriptionOperation) { // The group subscription must be kept alive // We accomplish this further on during the binding of the onStart() auto subscriptionContext(std::make_shared(group)); onSubscribe(subscriptionContext, std::move(subscriptionOperation)); }); } } /** * Respond to search requests. For each matching pv, claim that pv * * @param searchOperation the search operation */ void GroupSource::onSearch(Search& searchOperation) { for (auto& pv: searchOperation) { if (allRecords.names->count(pv.name()) == 1) { pv.claim(); log_debug_printf(_logname, "Claiming '%s'\n", pv.name()); } } } /** * Respond to the show request by displaying a list of all the PVs hosted in this ioc * * @param outputStream the stream to show the list on */ void GroupSource::show(std::ostream& outputStream) { outputStream << "IOC"; for (auto& name: *GroupSource::allRecords.names) { outputStream << "\n" << indent{} << name; } } /** * Called when a client pauses / stops a subscription it has been subscribed to. * This function loops over all fields event subscriptions the group subscription context and disables each of them. * * @param groupSubscriptionCtx the group subscription context */ void GroupSource::onDisableSubscription(const std::shared_ptr& groupSubscriptionCtx) { for (auto& fieldSubscriptionCtx: groupSubscriptionCtx->fieldSubscriptionContexts) { fieldSubscriptionCtx.pValueEventSubscription.disable(); fieldSubscriptionCtx.pPropertiesEventSubscription.disable(); } groupSubscriptionCtx->eventsEnabled = false; } /** * Handler for the onOp event raised by pvxs Sources when they are started, in order to define the get and put handlers * on a per source basis. * This is called after the event has been intercepted and we add the group to the call. * * @param group the group to which the get/put operation pertains * @param channelConnectOperation the channel connect operation object */ void GroupSource::onOp(Group& group, std::unique_ptr&& channelConnectOperation) { // First stage for handling any request is to announce the channel type with a `connect()` call // @note The type signalled here must match the eventual type returned by a pvxs get channelConnectOperation->connect(group.valueTemplate); // register handler for pvxs group get channelConnectOperation->onGet([&group](std::unique_ptr&& getOperation) { get(group, getOperation); }); // Make a security cache for this client's connection to this group // Each time the same client calls put we will re-use the cached security client // The security cache will be deleted when the client disconnects from this group pv auto securityCache = std::make_shared(); // register handler for pvxs group put channelConnectOperation ->onPut([&group, securityCache](std::unique_ptr&& putOperation, Value&& value) { if (!securityCache->done) { // First time we call put we need to initialise the security cache securityCache->securityClients.resize(group.fields.size()); securityCache->credentials.reset(new Credentials(*putOperation->credentials())); auto fieldIndex = 0u; for (auto& field: group.fields) { if (field.value) { securityCache->securityClients[fieldIndex] .update(field.value, *securityCache->credentials); } fieldIndex++; } auto& pvRequest = putOperation->pvRequest(); IOCSource::setForceProcessingFlag(pvRequest, securityCache); securityCache->done = true; } putGroup(group, putOperation, value, *securityCache); }); } /** * Called by the framework when the monitoring client issues a start or stop subscription. We * intercept the framework's call prior to entering here, and add the group subscription context * containing a list of field contexts and their event subscriptions to manage. * * @param groupSubscriptionCtx the group subscription context * @param isStarting true if the client issued a start subscription request, false otherwise */ void GroupSource::onStart(const std::shared_ptr& groupSubscriptionCtx, bool isStarting) { if (isStarting) { onStartSubscription(groupSubscriptionCtx); } else { onDisableSubscription(groupSubscriptionCtx); } } static void subscriptionPost(GroupSourceSubscriptionCtx *pGroupCtx) { // Make sure that the initial subscription update has occurred on all channels before replying // As we make two initial updates when opening a new subscription, for each field, // we need all updates for all fields to have completed before continuing if (!pGroupCtx->eventsPrimed) { for (auto& fieldCtx: pGroupCtx->fieldSubscriptionContexts) { if (!fieldCtx.hadValueEvent || !fieldCtx.hadPropertyEvent) { return; } } pGroupCtx->eventsPrimed = true; } auto& currentValue = pGroupCtx->currentValue; // If events have been primed then return the value to the subscriber, // and unmark all accumulated changes pGroupCtx->subscriptionControl->post(currentValue.clone()); currentValue.unmark(); } /** * Called when a client starts a subscription it has subscribed to. For each field in the subscription, * enable events and post a single event to both the values and properties event channels to kick things off. * * @param groupSubscriptionCtx the group subscription context containing the field event subscriptions to start */ void GroupSource::onStartSubscription(const std::shared_ptr& groupSubscriptionCtx) { groupSubscriptionCtx->eventsEnabled = true; for (auto& fieldSubscriptionCtx: groupSubscriptionCtx->fieldSubscriptionContexts) { fieldSubscriptionCtx.pValueEventSubscription.enable(); fieldSubscriptionCtx.pPropertiesEventSubscription.enable(); } // maybe post initial here in pathological case with no +channel. (eg. all const) subscriptionPost(groupSubscriptionCtx.get()); } /** * This callback handles notifying of updates to subscribed-to pv values. * * @param userArg the user argument passed to the callback function from the framework: a FieldSubscriptionCtx * @param pDbFieldLog the database field log containing the changes being notified */ static void subscriptionValueCallback(void* userArg, dbChannel* pChannel, int, struct db_field_log* pDbFieldLog) { try { auto fieldSubscriptionCtx = (FieldSubscriptionCtx*)userArg; fieldSubscriptionCtx->hadValueEvent = true; // Find the group subscription context from the field subscription context auto& pGroupCtx = fieldSubscriptionCtx->pGroupCtx; // Also find the field auto& field = *fieldSubscriptionCtx->field; auto currentValue = pGroupCtx->currentValue; // lock all records to be triggered DBManyLocker G(field.lock); for (auto& pTriggeredField: field.triggers) { auto leafNode = pTriggeredField->findIn(currentValue); dbChannel *channelToUse = pTriggeredField->value; bool isSelfTrig = channelToUse==pChannel; auto change = UpdateType::type(UpdateType::Value | UpdateType::Alarm); #if EPICS_VERSION_INT >= VERSION_INT(7, 0, 6, 0) if(isSelfTrig && pDbFieldLog) { // when available, use DBE mask from db_field_log change = UpdateType::type(pDbFieldLog->mask & UpdateType::Everything); } #endif LocalFieldLog localFieldLog(channelToUse, isSelfTrig ? pDbFieldLog : nullptr); IOCSource::get(leafNode, pTriggeredField->info, pTriggeredField->anyType, change, channelToUse, localFieldLog.pFieldLog); } subscriptionPost(pGroupCtx); } catch(std::exception& e) { log_exc_printf(_logname, "Unhandled exception in %s\n", __func__); } } static void subscriptionPropertiesCallback(void* userArg, dbChannel* pChannel, int, struct db_field_log* pDbFieldLog) { try { auto subscriptionContext = (FieldSubscriptionCtx*)userArg; subscriptionContext->hadPropertyEvent = true; auto& field(*subscriptionContext->field); auto fieldValue(field.findIn(subscriptionContext->pGroupCtx->currentValue)); /* For a property update, we (may) only post changes to the field mapping * in question. But never the triggered fields. */ DBLocker L(dbChannelRecord(pChannel)); LocalFieldLog localFieldLog(pChannel, pDbFieldLog); IOCSource::get(fieldValue, field.info, field.anyType, UpdateType::Property, pChannel, pDbFieldLog); subscriptionPost(subscriptionContext->pGroupCtx); } catch(std::exception& e) { log_exc_printf(_logname, "Unhandled exception in %s\n", __func__); } } /** * Called by the framework when a client subscribes to a channel. We intercept the call before this function is called * to add a new group subscription context containing a reference to the group. * This function must initialise all of the field's subscription contexts. * * @param groupSubscriptionCtx a new group subscription context * @param subscriptionOperation the group subscription operation */ void GroupSource::onSubscribe(const std::shared_ptr& groupSubscriptionCtx, std::unique_ptr&& subscriptionOperation) const { // inform peer of data type and acquire control of the subscription queue groupSubscriptionCtx->subscriptionControl = subscriptionOperation ->connect(groupSubscriptionCtx->group.valueTemplate); server::MonitorStat stats; groupSubscriptionCtx->subscriptionControl->stats(stats); // include actual negotiated queue size with initial update groupSubscriptionCtx->currentValue["record._options.queueSize"] = stats.limitQueue; groupSubscriptionCtx->currentValue["record._options.atomic"] = true; // Initialise the field subscription contexts. One for each group field. // This is stored in the group context groupSubscriptionCtx->fieldSubscriptionContexts.reserve(groupSubscriptionCtx->group.fields.size()); for (auto& field: groupSubscriptionCtx->group.fields) { groupSubscriptionCtx->fieldSubscriptionContexts.emplace_back(field, groupSubscriptionCtx.get()); auto& fieldSubscriptionContext = groupSubscriptionCtx->fieldSubscriptionContexts.back(); if(field.info.type == MappingInfo::Const) { auto fld(field.findIn(groupSubscriptionCtx->currentValue)); // only populate const values once fld.assign(field.info.cval); continue; // nothing to subscribe } else if(!field.value) { continue; // no associated dbChannel } auto leafNode = field.findIn(groupSubscriptionCtx->currentValue); IOCSource::initialize(leafNode, field.info, field.value); // Two subscription are made for each group channel for pvxs // one for value|alarm changes if (field.info.type == MappingInfo::Meta) { fieldSubscriptionContext .subscribeField(eventContext.get(), subscriptionValueCallback, DBE_ALARM); } else { fieldSubscriptionContext .subscribeField(eventContext.get(), subscriptionValueCallback, DBE_VALUE | DBE_ALARM | DBE_ARCHIVE); } // one for property changes if (field.info.type == MappingInfo::Meta || field.info.type == MappingInfo::Scalar) { // only scalar and meta mappings include property metadata (display, control, ...) fieldSubscriptionContext .subscribeField(eventContext.get(), subscriptionPropertiesCallback, DBE_PROPERTY, false); } else { fieldSubscriptionContext.hadPropertyEvent = true; } } // If all goes well, set up handlers for start and stop monitoring events // The group subscription context is being kept alive because it is being bound into some internal storage by onStart groupSubscriptionCtx->subscriptionControl->onStart([groupSubscriptionCtx](bool isStarting) { onStart(groupSubscriptionCtx, isStarting); }); } static bool getGroupField(const Field& field, Value valueTarget, const std::string& groupName, const std::unique_ptr& getOperation) { try { IOCSource::initialize(valueTarget, field.info, field.value); LocalFieldLog localFieldLog(field.value); IOCSource::get(valueTarget, field.info, field.anyType, UpdateType::Everything, field.value, localFieldLog.pFieldLog); } catch (std::exception& e) { std::stringstream errorString; errorString << "Error retrieving value for pvName: " << groupName << (field.name.empty() ? "/" : ".") << field.fullName << " : " << e.what(); getOperation->error(errorString.str()); return false; } return true; } /** * Handle the get operation * * @param group the group to get * @param getOperation the current executing operation */ void GroupSource::get(Group& group, const std::unique_ptr& getOperation) { bool atomic = group.atomicPutGet; getOperation->pvRequest()["record._options.atomic"].as(atomic); // Make an empty value to return auto returnValue(group.valueTemplate.cloneEmpty()); returnValue["record._options.atomic"] = atomic; // If the group is configured for an atomic get operation, // then we need to get all the fields at once, so we lock them all together // and do the operation in one go if (atomic) { // Lock all the fields DBManyLocker G(group.value.lock); // Loop through all fields for (auto& field: group.fields) { if(field.info.type == MappingInfo::Proc || field.info.type==MappingInfo::Structure) continue; // find the leaf node in which to set the value auto leafNode = field.findIn(returnValue); if (!getGroupField(field, leafNode, group.name, getOperation)) { return; } } // Unlock the all group fields when the locker goes out of scope } else { // Otherwise, this is a non-atomic operation, and we need to `put` each field individually, // locking each of them independently of each other. // Loop through all fields for (auto& field: group.fields) { dbChannel* pDbChannel = field.value; // find the leaf node in which to set the value auto leafNode = field.findIn(returnValue); if (pDbChannel && leafNode) { // Lock this field DBLocker F(pDbChannel->addr.precord); if (!getGroupField(field, leafNode, group.name, getOperation)) { return; } } } } // Send reply getOperation->reply(returnValue); } /** * Handler invoked when a peer sends data on a PUT * * @param group the group to which the data is posted * @param putOperation the put operation object to use to interact with the client * @param value the value being posted * @param groupSecurityCache the object that caches the security context of client connections */ void GroupSource::putGroup(Group& group, std::unique_ptr& putOperation, const Value& value, const GroupSecurityCache& groupSecurityCache) { try { CurrentOp op(putOperation.get()); bool atomic = group.atomicPutGet; putOperation->pvRequest()["record._options.atomic"].as(atomic); std::vector securityLoggers(group.fields.size()); // Prepare group put operation auto fieldIndex = 0; for (auto& field: group.fields) { if (dbChannel* pDbChannel = field.value) { IOCSource::doPreProcessing(pDbChannel, securityLoggers[fieldIndex], *groupSecurityCache.credentials, groupSecurityCache.securityClients[fieldIndex]); if (dbChannelFinalFieldType(pDbChannel) >= DBF_INLINK && dbChannelFinalFieldType(pDbChannel) <= DBF_FWDLINK) { throw std::runtime_error("Links not supported for put"); } } fieldIndex++; } // Reset index for subsequent loops fieldIndex = 0; // If the group is configured for an atomic put operation, // then we need to put all the fields at once, so we lock them all together // and do the operation in one go if (atomic) { // Lock all the fields DBManyLocker G(group.value.lock); // Loop through all fields for (auto& field: group.fields) { // Put the field putGroupField(value, field, groupSecurityCache.securityClients[fieldIndex]); // Do processing if required IOCSource::doPostProcessing(field.value, groupSecurityCache.forceProcessing); fieldIndex++; } // Unlock the all group fields when the locker goes out of scope } else { // Otherwise, this is a non-atomic operation, and we need to `put` each field individually, // locking each of them independently of each other. // Loop through all fields for (auto& field: group.fields) { dbChannel* pDbChannel = field.value; if(!pDbChannel) continue; // Lock this field DBLocker F(pDbChannel->addr.precord); // Put the field putGroupField(value, field, groupSecurityCache.securityClients[fieldIndex]); // Do processing if required IOCSource::doPostProcessing(field.value, groupSecurityCache.forceProcessing); // Unlock this field when locker goes out of scope fieldIndex++; } } } catch (std::exception& e) { // Unlock all locked fields when lockers go out of scope // Post error message to put operation object putOperation->error(e.what()); return; } // If all went ok then let the client know putOperation->reply(); } /** * Called by putGroup() to perform the actual put of the given value into the group field specified. * The value will be the whole value template that the group represents but only the fields passed in by * the client will be set. So we simply check to see whether the parts of value that are referenced by the * provided field parameter are included in the given value, and if so, we pull them out and do a low level * database put. * * @param value the sparsely populated value to put into the group's field * @param field the group field to check against * @param securityClient the security client to use to authorise the operation */ void GroupSource::putGroupField(const Value& value, const Field& field, const SecurityClient& securityClient) { // find the leaf node that the field refers to in the given value auto leafNode = field.findIn(value); // If the field references a valid part of the given value then we can send it to the database if (leafNode && leafNode.isMarked()) { IOCSource::doFieldPreProcessing(securityClient); // pre-process field IOCSource::put(field.value, leafNode, field.info); } } } // ioc } // pvxs