Files
pvxs/ioc/singlesource.cpp
T
Michael Davidsaver a02b6e9730 ioc: improve long string detection
Partially replicate the '$' handling logic of dbChannelCreate()
in the case where no '$' was provided.
2023-06-18 10:07:16 -07:00

443 lines
18 KiB
C++

/*
* Copyright - See the COPYRIGHT that is included with this distribution.
* pvxs is distributed subject to a Software License Agreement found
* in file LICENSE that is included with this distribution.
*
* Author George S. McIntyre <george@level-n.com>, 2023
*
*/
#include <algorithm>
#include <cmath>
#include <string>
#include <dbAccess.h>
#include <dbChannel.h>
#include <dbEvent.h>
#include <dbStaticLib.h>
#include <special.h>
#include <pvxs/log.h>
#include <pvxs/nt.h>
#include <pvxs/source.h>
#include <dbNotify.h>
#include "dbentry.h"
#include "dberrormessage.h"
#include "dblocker.h"
#include "iocsource.h"
#include "singlesource.h"
#include "singlesrcsubscriptionctx.h"
#include "credentials.h"
#include "securitylogger.h"
#include "securityclient.h"
#include "typeutils.h"
#include "localfieldlog.h"
namespace pvxs {
namespace ioc {
DEFINE_LOGGER(_logname, "pvxs.ioc.single.source");
DEFINE_INST_COUNTER(PutOperationCache);
DEFINE_INST_COUNTER(SingleInfo);
namespace {
void subscriptionCallback(SingleSourceSubscriptionCtx* subscriptionContext,
UpdateType::type change,
dbChannel* pChannel,
struct db_field_log* pDbFieldLog) {
try {
// Get the current value of this subscription
// We simply merge new field changes onto this value as events occur
auto currentValue = subscriptionContext->currentValue;
{
DBLocker F(dbChannelRecord(subscriptionContext->info->chan));
// TODO MappingInfo::nsecMask
IOCSource::get(currentValue, MappingInfo(), Value(), change, pChannel, pDbFieldLog);
}
// Make sure that the initial subscription update has occurred on both channels before continuing
// As we make two initial updates when opening a new subscription, we need both to have completed before continuing
if (subscriptionContext->hadValueEvent && subscriptionContext->hadPropertyEvent) {
// Return value
subscriptionContext->subscriptionControl->post(currentValue.clone());
currentValue.unmark();
}
} catch(std::exception& e) {
log_exc_printf(_logname, "Unhandled exception in %s\n", __func__);
}
}
void subscriptionValueCallback(void* userArg, struct dbChannel* pChannel, int, struct db_field_log* pDbFieldLog) {
auto subscriptionContext = (SingleSourceSubscriptionCtx*)userArg;
subscriptionContext->hadValueEvent = true;
auto change = UpdateType::type(UpdateType::Value | UpdateType::Alarm);
#if EPICS_VERSION_INT >= VERSION_INT(7, 0, 6, 0)
if(pDbFieldLog) {
// when available, use DBE mask from db_field_log
change = UpdateType::type(pDbFieldLog->mask & UpdateType::Everything);
}
#endif
subscriptionCallback(subscriptionContext, change, pChannel, pDbFieldLog);
}
void subscriptionPropertiesCallback(void* userArg, struct dbChannel* pChannel, int,
struct db_field_log* pDbFieldLog) {
auto subscriptionContext = (SingleSourceSubscriptionCtx*)userArg;
subscriptionContext->hadPropertyEvent = true;
subscriptionCallback(subscriptionContext, UpdateType::Property, pChannel, pDbFieldLog);
}
/**
* Called by the framework when a client subscribes to a channel. We intercept the call before this function is called
* to add a new subscription context with a value prototype matching the channel definition.
*
* @param subscriptionContext a new subscription context with a value prototype matching the channel
* @param subscriptionOperation the channel subscription operation
*/
void onSubscribe(const std::shared_ptr<SingleSourceSubscriptionCtx>& subscriptionContext,
const DBEventContext& eventContext,
std::unique_ptr<server::MonitorSetupOp>&& subscriptionOperation)
{
// inform peer of data type and acquire control of the subscription queue
subscriptionContext->subscriptionControl = subscriptionOperation->connect(subscriptionContext->currentValue);
IOCSource::initialize(subscriptionContext->currentValue,
*subscriptionContext->info,
subscriptionContext->info->chan);
// Two subscription are made for pvxs
// first subscription is for Value changes
subscriptionContext->pValueEventSubscription.subscribe(eventContext.get(),
subscriptionContext->info->chan,
subscriptionValueCallback,
subscriptionContext.get(),
DBE_VALUE | DBE_ALARM | DBE_ARCHIVE
);
// second subscription is for Property changes
subscriptionContext->pPropertiesEventSubscription.subscribe(eventContext.get(),
subscriptionContext->pPropertiesChannel,
subscriptionPropertiesCallback,
subscriptionContext.get(),
DBE_PROPERTY
);
// If all goes well, Set up handlers for start and stop monitoring events
// The subscription context is being kept alive because it is being bound into some internal storage by onStart
subscriptionContext->subscriptionControl->onStart([subscriptionContext](bool isStarting) {
if (isStarting) {
subscriptionContext->eventsEnabled = true;
subscriptionContext->pValueEventSubscription.enable();
subscriptionContext->pPropertiesEventSubscription.enable();
} else {
subscriptionContext->pValueEventSubscription.disable();
subscriptionContext->pPropertiesEventSubscription.disable();
subscriptionContext->eventsEnabled = false;
}
});
}
/**
* Create a Value Prototype for storing values returned by the given channel.
*
* @param dbChannelSharedPtr pointer to the channel
* @return a value prototype for the given channel
*/
Value getValuePrototype(const std::shared_ptr<SingleInfo>& sinfo) {
auto& chan(sinfo->chan);
short dbrType(dbChannelFinalFieldType(chan));
auto valueType(IOCSource::getChannelValueType(chan));
Value valuePrototype;
// To control optional metadata set to true to include in the output
bool display = true;
bool control = true;
bool valueAlarm = true;
if (dbrType == DBR_ENUM) {
valuePrototype = nt::NTEnum{}.create();
} else {
valuePrototype = nt::NTScalar{ valueType, display, control, valueAlarm, true }.create();
}
return valuePrototype;
}
/**
* Callback for asynchronous put operations to handle the actual put value operation
*
* @param notify the process notify object to use
* @param type the put notification type
* @return 1 for success and 0 for errors
*/
int putCallback(struct processNotify* notify, notifyPutType type) {
if (notify->status != notifyOK) {
return 0;
}
auto pPutOperationCache = (PutOperationCache*)notify->usrPvt;
auto valueToSet = std::move(pPutOperationCache->valueToSet);
switch (type) {
case putDisabledType:
// Request has been made but the record has been disabled, so noop and only call done callback
return 0;
case putFieldType:
// As this type will be only called for Links the IOCSource::put() will handle the locking as a special case
case putType:
// For this type, the caller has already locked the record, so we'll not lock either
IOCSource::put(pPutOperationCache->notify.chan, valueToSet, MappingInfo()); // put
break;
}
return 1;
}
/**
* Callback when asynchronous put's are complete
*
* @param notify the process notify object to use
*/
void doneCallback(struct processNotify* notify) {
// Get our put operation cache object from the user pointer
auto pPutOperationCache = (PutOperationCache*)notify->usrPvt;
// Get the cached putOperation controller
auto putOperation = std::move(pPutOperationCache->putOperation);
// TODO handle cancelled requests
// int expected = 1;
// if (std::atomic_compare_exchange_weak(&pPutOperationCache->notifyBusy, &expected, 0) == 0) {
// std::cerr << "SinglePut dbNotify state error?\n";
// }
switch (notify->status) {
case notifyOK:
// If everything is ok then notify the caller
putOperation->reply();
break;
case notifyCanceled:
return; // skip notification
case notifyError:
putOperation->error("Error in dbNotify");
break;
case notifyPutDisabled:
putOperation->error("Put disabled");
break;
}
}
/**
* Handle the get operation
*
* @param pDbChannel the channel that the request comes in on
* @param getOperation the current executing operation
* @param valuePrototype a value prototype that is made based on the expected type to be returned
*/
void singleGet(const SingleInfo& info,
std::unique_ptr<server::ExecOp>& getOperation,
const Value& valuePrototype) {
auto& pDbChannel(info.chan);
try {
auto returnValue = valuePrototype.cloneEmpty();
// TODO: MappingInfo::nsecMask
IOCSource::initialize(returnValue, info, pDbChannel);
{
DBLocker F(pDbChannel->addr.precord); // lock
LocalFieldLog localFieldLog(pDbChannel);
IOCSource::get(returnValue, info,
Value(), UpdateType::Everything,
pDbChannel, localFieldLog.pFieldLog);
}
getOperation->reply(returnValue);
} catch (const std::exception& getException) {
getOperation->error(getException.what());
}
}
/**
* Handler for the onOp event raised by pvxs Sources when they are started, in order to define the get and put handlers
* on a per source basis.
* This is called after the event has been intercepted and we add the channel and value prototype to the call.
*
* @param dbChannelSharedPtr the channel to which the get/put operation pertains
* @param valuePrototype the value prototype that is appropriate for the given channel
* @param channelConnectOperation the channel connect operation object
*/
void onOp(const std::shared_ptr<SingleInfo>& sInfo, const Value& valuePrototype,
std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
// Announce the channel type with a `connect()` call. This happens only once
channelConnectOperation->connect(valuePrototype);
// Set up handler for get requests
channelConnectOperation
->onGet([sInfo, valuePrototype](std::unique_ptr<server::ExecOp>&& getOperation) {
singleGet(*sInfo, getOperation, valuePrototype);
});
// Make a security cache for this client's connection to this pv
// Each time the same client calls put we will re-use the cached security client
// The security cache will be deleted when the client disconnects from this pv
auto putOperationCache = std::make_shared<PutOperationCache>();
// Set up handler for put requests
channelConnectOperation
->onPut([sInfo, putOperationCache](
std::unique_ptr<server::ExecOp>&& putOperation,
Value&& value) {
try {
dbChannel* pDbChannel = sInfo->chan;
if (!putOperationCache->done) {
putOperationCache->credentials.reset(new Credentials(*putOperation->credentials()));
putOperationCache->securityClient.update(pDbChannel, *putOperationCache->credentials);
putOperationCache->notify.usrPvt = putOperationCache.get();
putOperationCache->notify.chan = pDbChannel;
putOperationCache->notify.putCallback = putCallback;
putOperationCache->notify.doneCallback = doneCallback;
auto& pvRequest = putOperation->pvRequest();
pvRequest["record._options.block"].as<bool>(putOperationCache->doWait);
IOCSource::setForceProcessingFlag(pvRequest, putOperationCache);
if (putOperationCache->forceProcessing) {
putOperationCache->doWait = false; // no point in waiting
}
putOperationCache->done = true;
}
SecurityLogger securityLogger;
IOCSource::doPreProcessing(pDbChannel,
securityLogger,
*putOperationCache->credentials,
putOperationCache->securityClient); // pre-process
IOCSource::doFieldPreProcessing(putOperationCache->securityClient); // pre-process field
if (putOperationCache->doWait) {
putOperationCache->valueToSet = value;
// TODO prevent concurrent put with callbacks (notifyBusy)
putOperationCache->notify.requestType = value["value"].isMarked() ? putProcessRequest
: processRequest;
putOperationCache->putOperation = std::move(putOperation);
dbProcessNotify(&putOperationCache->notify);
return;
}
CurrentOp op(putOperation.get());
if (dbChannelFieldType(pDbChannel) >= DBF_INLINK
&& dbChannelFieldType(pDbChannel) <= DBF_FWDLINK) {
// Locking is handled by dbPutField() called as a special case in IOCSource::put() for links
IOCSource::put(pDbChannel, value, MappingInfo()); // put
} else {
// All other field types call dbChannelPut() directly, so we have to perform locking here
DBLocker F(pDbChannel->addr.precord); // lock
IOCSource::put(pDbChannel, value, MappingInfo()); // put
IOCSource::doPostProcessing(pDbChannel, putOperationCache->forceProcessing); // post-process
}
putOperation->reply();
} catch (std::exception& e) {
putOperation->error(e.what());
}
});
}
} // namespace
/**
* Constructor for SingleSource registrar.
*/
SingleSource::SingleSource()
:eventContext(db_init_events()) // Initialise event context
{
auto names(std::make_shared<std::set<std::string >>());
// For each record type and for each record in that type, add record name to the list of all records
DBEntry dbEntry;
for (long status = dbFirstRecordType(dbEntry); !status; status = dbNextRecordType(dbEntry)) {
for (status = dbFirstRecord(dbEntry); !status; status = dbNextRecord(dbEntry)) {
names->insert(dbEntry->precnode->recordname);
}
}
allRecords.names = names;
// Start event pump
if (!eventContext) {
throw std::runtime_error("Single Source: Event Context failed to initialise: db_init_events()");
}
if (db_start_events(eventContext.get(), "qsrvSingle", nullptr, nullptr, epicsThreadPriorityCAServerLow - 1)) {
throw std::runtime_error("Could not start event thread: db_start_events()");
}
}
/**
* Handle the create source operation. This is called once when the source is created.
* We will register all of the database records that have been loaded until this time as pv names in this
* source.
* @param channelControl
*/
void SingleSource::onCreate(std::unique_ptr<server::ChannelControl>&& channelControl) {
auto sourceName(channelControl->name().c_str());
Channel pDbChannel;
try {
pDbChannel = Channel(sourceName);
} catch (std::exception& e) {
log_debug_printf(_logname, "Ignore requested channel '%s' : %s\n", sourceName, e.what());
return;
}
log_debug_printf(_logname, "Accepting channel for '%s'\n", sourceName);
auto sInfo(std::make_shared<SingleInfo>(std::move(pDbChannel)));
// Create callbacks for handling requests and channel subscriptions
Value valuePrototype = getValuePrototype(sInfo);
// Get and Put requests
channelControl
->onOp([sInfo, valuePrototype](std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
onOp(sInfo, valuePrototype, std::move(channelConnectOperation));
});
// binding 'this' safe as Server shutdown will close connections before dropping Source
channelControl
->onSubscribe([this, valuePrototype, sInfo](
std::unique_ptr<server::MonitorSetupOp>&& subscriptionOperation) {
// The subscription must be kept alive
// We accomplish this further on during the binding of the onStart()
auto subscriptionContext(std::make_shared<SingleSourceSubscriptionCtx>(sInfo));
subscriptionContext->currentValue = valuePrototype.cloneEmpty();
onSubscribe(subscriptionContext, eventContext, std::move(subscriptionOperation));
});
}
/**
* Respond to search requests. For each matching pv, claim that pv
*
* @param searchOperation the search operation
*/
void SingleSource::onSearch(Search& searchOperation) {
for (auto& pv: searchOperation) {
if (!dbChannelTest(pv.name())) {
pv.claim();
log_debug_printf(_logname, "Claiming '%s'\n", pv.name());
}
}
}
/**
* Respond to the show request by displaying a list of all the PVs hosted in this ioc
*
* @param outputStream the stream to show the list on
*/
void SingleSource::show(std::ostream& outputStream) {
outputStream << "IOC";
for (auto& name: *SingleSource::allRecords.names) {
outputStream << "\n" << indent{} << name;
}
}
} // ioc
} // pvxs