5353b02a33
There is no point in waiting for processing when none will occur.
However, do wait as requested when Unset or True.
Somewhere between 409b432dd96b65e8c69d35fe8810081b8ff87a0a
and 93e4d3eef3
the sense of this test was lost.
490 lines
19 KiB
C++
490 lines
19 KiB
C++
/*
|
|
* Copyright - See the COPYRIGHT that is included with this distribution.
|
|
* pvxs is distributed subject to a Software License Agreement found
|
|
* in file LICENSE that is included with this distribution.
|
|
*
|
|
* Author George S. McIntyre <george@level-n.com>, 2023
|
|
*
|
|
*/
|
|
|
|
#include <algorithm>
|
|
#include <cmath>
|
|
#include <string>
|
|
|
|
#include <dbAccess.h>
|
|
#include <dbChannel.h>
|
|
#include <dbEvent.h>
|
|
#include <dbStaticLib.h>
|
|
#include <special.h>
|
|
|
|
#include <pvxs/log.h>
|
|
#include <pvxs/nt.h>
|
|
#include <pvxs/source.h>
|
|
#include <dbNotify.h>
|
|
|
|
#include "dbentry.h"
|
|
#include "dberrormessage.h"
|
|
#include "dblocker.h"
|
|
#include "iocsource.h"
|
|
#include "singlesource.h"
|
|
#include "singlesrcsubscriptionctx.h"
|
|
#include "credentials.h"
|
|
#include "securitylogger.h"
|
|
#include "securityclient.h"
|
|
#include "typeutils.h"
|
|
#include "localfieldlog.h"
|
|
|
|
namespace pvxs {
|
|
namespace ioc {
|
|
|
|
// Logger handle passed to the log_*_printf() calls in this file
DEFINE_LOGGER(_logname, "pvxs.ioc.single.source");
|
|
|
|
// Instance counters for the helper types used by this source
// NOTE(review): presumably reported by the pvxs instance-count diagnostics - confirm
DEFINE_INST_COUNTER(PutOperationCache);
|
|
DEFINE_INST_COUNTER(SingleInfo);
|
|
|
|
namespace {
|
|
|
|
/**
 * Common handler behind the value and property event callbacks.
 *
 * Refreshes the subscription's current value with the fields selected by `change` and, once both the
 * initial value event and the initial property event have been seen, posts a clone of it to the subscriber.
 * Exceptions derived from std::exception are logged (including their message) and not propagated.
 *
 * @param subscriptionContext the context of the subscription that received the event
 * @param change the set of field groups to refresh
 * @param pChannel the channel on which the event was delivered
 * @param pDbFieldLog the field log delivered with the event
 */
void subscriptionCallback(SingleSourceSubscriptionCtx* subscriptionContext,
                          UpdateType::type change,
                          dbChannel* pChannel,
                          struct db_field_log* pDbFieldLog) noexcept {
    try {
        // Get the current value of this subscription
        // We simply merge new field changes onto this value as events occur
        auto currentValue = subscriptionContext->currentValue;

        {
            // Hold the record lock only while reading from the database
            DBLocker F(dbChannelRecord(subscriptionContext->info->chan));
            // TODO MappingInfo::nsecMask
            IOCSource::get(currentValue, MappingInfo(), Value(), change, pChannel, pDbFieldLog);
        }

        // Make sure that the initial subscription update has occurred on both channels before continuing
        // As we make two initial updates when opening a new subscription, we need both to have completed before continuing
        if (subscriptionContext->hadValueEvent && subscriptionContext->hadPropertyEvent) {
            // Return value
            subscriptionContext->subscriptionControl->post(currentValue.clone());
            // Clear the change marks so that the next update only carries what changed since this post
            currentValue.unmark();
        }
    } catch (const std::exception& e) {
        // Include the exception text, otherwise the cause of the failure is lost
        log_exc_printf(_logname, "Unhandled exception in %s : %s\n", __func__, e.what());
    }
}
|
|
|
|
void subscriptionValueCallback(void* userArg, struct dbChannel* pChannel,
|
|
int, struct db_field_log* pDbFieldLog) noexcept {
|
|
auto subscriptionContext = (SingleSourceSubscriptionCtx*)userArg;
|
|
subscriptionContext->hadValueEvent = true;
|
|
auto change = subscriptionContext->pValueEventSubscription.mask;
|
|
#if EPICS_VERSION_INT >= VERSION_INT(7, 0, 6, 0)
|
|
if(pDbFieldLog) {
|
|
// when available, use DBE mask from db_field_log
|
|
change = pDbFieldLog->mask;
|
|
}
|
|
#endif
|
|
// ARCHIVE events will get the same data fields as VALUE
|
|
if(change & DBE_ARCHIVE)
|
|
change = (change&~DBE_ARCHIVE)|DBE_VALUE;
|
|
|
|
// promote DBE_ALARM only to also fetch value
|
|
if((change & (DBE_VALUE|DBE_ARCHIVE|DBE_ALARM)) == DBE_ALARM)
|
|
change |= DBE_VALUE;
|
|
|
|
change &= UpdateType::Everything; // does not include DBE_ARCHIVE
|
|
subscriptionCallback(subscriptionContext, UpdateType::type(change), pChannel, pDbFieldLog);
|
|
}
|
|
|
|
void subscriptionPropertiesCallback(void* userArg, struct dbChannel* pChannel, int,
|
|
struct db_field_log* pDbFieldLog) noexcept {
|
|
auto subscriptionContext = (SingleSourceSubscriptionCtx*)userArg;
|
|
subscriptionContext->hadPropertyEvent = true;
|
|
subscriptionCallback(subscriptionContext, UpdateType::Property, pChannel, pDbFieldLog);
|
|
}
|
|
|
|
/**
|
|
* Called by the framework when a client subscribes to a channel. We intercept the call before this function is called
|
|
* to add a new subscription context with a value prototype matching the channel definition.
|
|
*
|
|
* @param subscriptionContext a new subscription context with a value prototype matching the channel
|
|
* @param subscriptionOperation the channel subscription operation
|
|
*/
|
|
void onSubscribe(const std::shared_ptr<SingleSourceSubscriptionCtx>& subscriptionContext,
|
|
const DBEventContext& eventContext,
|
|
std::unique_ptr<server::MonitorSetupOp>&& subscriptionOperation)
|
|
{
|
|
auto pvReq(subscriptionOperation->pvRequest());
|
|
unsigned dbe = 0;
|
|
if(auto fld = pvReq["record._options.DBE"].ifMarked()) {
|
|
switch(fld.type().kind()) {
|
|
case Kind::String: {
|
|
auto mask(fld.as<std::string>());
|
|
// name and sloppy parsing a la. caProvider...
|
|
#define CASE(EVENT) if(mask.find(#EVENT)!=mask.npos) dbe |= DBE_ ## EVENT
|
|
CASE(VALUE);
|
|
CASE(ARCHIVE);
|
|
CASE(ALARM);
|
|
// CASE(PROPERTY); // handled as special case
|
|
#undef CASE
|
|
if(!dbe && !mask.empty()) {
|
|
subscriptionOperation->logRemote(Level::Warn,
|
|
SB()<<pvReq.nameOf(fld)<<"=\""<<mask<<"\" selects empty mask");
|
|
}
|
|
break;
|
|
}
|
|
case Kind::Integer:
|
|
case Kind::Real:
|
|
dbe = fld.as<uint8_t>();
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
dbe &= (DBE_VALUE | DBE_ARCHIVE | DBE_ALARM);
|
|
if(!dbe)
|
|
dbe = DBE_VALUE | DBE_ALARM;
|
|
|
|
// inform peer of data type and acquire control of the subscription queue
|
|
subscriptionContext->subscriptionControl = subscriptionOperation->connect(subscriptionContext->currentValue);
|
|
|
|
IOCSource::initialize(subscriptionContext->currentValue,
|
|
*subscriptionContext->info,
|
|
subscriptionContext->info->chan);
|
|
|
|
// Two subscription are made for pvxs
|
|
// first subscription is for Value changes
|
|
subscriptionContext->pValueEventSubscription.subscribe(eventContext.get(),
|
|
subscriptionContext->info->chan,
|
|
subscriptionValueCallback,
|
|
subscriptionContext.get(),
|
|
dbe
|
|
);
|
|
// second subscription is for Property changes
|
|
subscriptionContext->pPropertiesEventSubscription.subscribe(eventContext.get(),
|
|
subscriptionContext->pPropertiesChannel,
|
|
subscriptionPropertiesCallback,
|
|
subscriptionContext.get(),
|
|
DBE_PROPERTY
|
|
);
|
|
|
|
// If all goes well, Set up handlers for start and stop monitoring events
|
|
// The subscription context is being kept alive because it is being bound into some internal storage by onStart
|
|
subscriptionContext->subscriptionControl->onStart([subscriptionContext](bool isStarting) {
|
|
if (isStarting) {
|
|
subscriptionContext->eventsEnabled = true;
|
|
subscriptionContext->pValueEventSubscription.enable();
|
|
subscriptionContext->pPropertiesEventSubscription.enable();
|
|
} else {
|
|
subscriptionContext->pValueEventSubscription.disable();
|
|
subscriptionContext->pPropertiesEventSubscription.disable();
|
|
subscriptionContext->eventsEnabled = false;
|
|
}
|
|
});
|
|
}
|
|
/**
|
|
* Create a Value Prototype for storing values returned by the given channel.
|
|
*
|
|
* @param dbChannelSharedPtr pointer to the channel
|
|
* @return a value prototype for the given channel
|
|
*/
|
|
Value getValuePrototype(const std::shared_ptr<SingleInfo>& sinfo) {
|
|
auto& chan(sinfo->chan);
|
|
short dbrType(dbChannelFinalFieldType(chan));
|
|
auto valueType(IOCSource::getChannelValueType(chan));
|
|
|
|
Value valuePrototype;
|
|
// To control optional metadata set to true to include in the output
|
|
bool display = true;
|
|
bool control = true;
|
|
bool valueAlarm = true;
|
|
|
|
if (dbrType == DBR_ENUM) {
|
|
valuePrototype = nt::NTEnum{}.create();
|
|
} else {
|
|
valuePrototype = nt::NTScalar{ valueType, display, control, valueAlarm, true }.create();
|
|
}
|
|
return valuePrototype;
|
|
}
|
|
|
|
/**
|
|
* Callback for asynchronous put operations to handle the actual put value operation
|
|
*
|
|
* @param notify the process notify object to use
|
|
* @param type the put notification type
|
|
* @return 1 for success and 0 for errors
|
|
*/
|
|
int putCallback(struct processNotify* notify, notifyPutType type) {
|
|
if (notify->status != notifyOK) {
|
|
return 0;
|
|
}
|
|
|
|
auto pPutOperationCache = (PutOperationCache*)notify->usrPvt;
|
|
auto valueToSet = std::move(pPutOperationCache->valueToSet);
|
|
|
|
switch (type) {
|
|
case putDisabledType:
|
|
// Request has been made but the record has been disabled, so noop and only call done callback
|
|
return 0;
|
|
case putFieldType:
|
|
// As this type will be only called for Links the IOCSource::put() will handle the locking as a special case
|
|
case putType:
|
|
// For this type, the caller has already locked the record, so we'll not lock either
|
|
IOCSource::put(pPutOperationCache->notify.chan, valueToSet, MappingInfo()); // put
|
|
break;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
/**
|
|
* Callback when asynchronous put's are complete
|
|
*
|
|
* @param notify the process notify object to use
|
|
*/
|
|
void doneCallback(struct processNotify* notify) {
|
|
// Get our put operation cache object from the user pointer
|
|
auto pPutOperationCache = (PutOperationCache*)notify->usrPvt;
|
|
|
|
// Get the cached putOperation controller
|
|
auto putOperation = std::move(pPutOperationCache->putOperation);
|
|
|
|
// TODO handle cancelled requests
|
|
// int expected = 1;
|
|
// if (std::atomic_compare_exchange_weak(&pPutOperationCache->notifyBusy, &expected, 0) == 0) {
|
|
// std::cerr << "SinglePut dbNotify state error?\n";
|
|
// }
|
|
|
|
switch (notify->status) {
|
|
case notifyOK:
|
|
// If everything is ok then notify the caller
|
|
putOperation->reply();
|
|
break;
|
|
case notifyCanceled:
|
|
return; // skip notification
|
|
case notifyError:
|
|
putOperation->error("Error in dbNotify");
|
|
break;
|
|
case notifyPutDisabled:
|
|
putOperation->error("Put disabled");
|
|
break;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle the get operation
|
|
*
|
|
* @param pDbChannel the channel that the request comes in on
|
|
* @param getOperation the current executing operation
|
|
* @param valuePrototype a value prototype that is made based on the expected type to be returned
|
|
*/
|
|
void singleGet(const SingleInfo& info,
|
|
std::unique_ptr<server::ExecOp>& getOperation,
|
|
const Value& valuePrototype) {
|
|
auto& pDbChannel(info.chan);
|
|
try {
|
|
auto returnValue = valuePrototype.cloneEmpty();
|
|
// TODO: MappingInfo::nsecMask
|
|
IOCSource::initialize(returnValue, info, pDbChannel);
|
|
{
|
|
DBLocker F(pDbChannel->addr.precord); // lock
|
|
LocalFieldLog localFieldLog(pDbChannel);
|
|
IOCSource::get(returnValue, info,
|
|
Value(), UpdateType::Everything,
|
|
pDbChannel, localFieldLog.pFieldLog);
|
|
}
|
|
getOperation->reply(returnValue);
|
|
} catch (const std::exception& getException) {
|
|
getOperation->error(getException.what());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handler for the onOp event raised by pvxs Sources when they are started, in order to define the get and put handlers
|
|
* on a per source basis.
|
|
* This is called after the event has been intercepted and we add the channel and value prototype to the call.
|
|
*
|
|
* @param dbChannelSharedPtr the channel to which the get/put operation pertains
|
|
* @param valuePrototype the value prototype that is appropriate for the given channel
|
|
* @param channelConnectOperation the channel connect operation object
|
|
*/
|
|
void onOp(const std::shared_ptr<SingleInfo>& sInfo, const Value& valuePrototype,
|
|
std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
|
|
// Announce the channel type with a `connect()` call. This happens only once
|
|
channelConnectOperation->connect(valuePrototype);
|
|
|
|
// Set up handler for get requests
|
|
channelConnectOperation
|
|
->onGet([sInfo, valuePrototype](std::unique_ptr<server::ExecOp>&& getOperation) {
|
|
singleGet(*sInfo, getOperation, valuePrototype);
|
|
});
|
|
|
|
// skip PUT specific allocations unless needed
|
|
if(channelConnectOperation->op()!=server::OpBase::Put)
|
|
return;
|
|
|
|
// Make a security cache for this client's connection to this pv
|
|
// Each time the same client calls put we will reuse the cached security client
|
|
// The security cache will be deleted when the client disconnects from this pv
|
|
auto putOperationCache = std::make_shared<PutOperationCache>();
|
|
putOperationCache->notify.usrPvt = putOperationCache.get();
|
|
putOperationCache->notify.chan = sInfo->chan;
|
|
putOperationCache->notify.putCallback = putCallback;
|
|
putOperationCache->notify.doneCallback = doneCallback;
|
|
|
|
// Set up handler for put requests
|
|
channelConnectOperation
|
|
->onPut([sInfo, putOperationCache](
|
|
std::unique_ptr<server::ExecOp>&& putOperation,
|
|
Value&& value) {
|
|
try {
|
|
dbChannel* pDbChannel = sInfo->chan;
|
|
if (!sInfo->done) {
|
|
// initialize credentials on first PUT to this Channel
|
|
sInfo->credentials.reset(new Credentials(*putOperation->credentials()));
|
|
sInfo->securityClient.update(pDbChannel, *sInfo->credentials);
|
|
|
|
sInfo->done = true;
|
|
}
|
|
// for each PUT (may have different pvRequest)
|
|
auto& pvRequest = putOperation->pvRequest();
|
|
pvRequest["record._options.block"].as<bool>(putOperationCache->doWait);
|
|
IOCSource::setForceProcessingFlag(putOperation.get(), pvRequest, putOperationCache->forceProcessing);
|
|
if (putOperationCache->forceProcessing == TriState::False) {
|
|
putOperationCache->doWait = false; // no point in waiting
|
|
}
|
|
|
|
SecurityLogger securityLogger;
|
|
|
|
IOCSource::doPreProcessing(pDbChannel,
|
|
securityLogger,
|
|
*sInfo->credentials,
|
|
sInfo->securityClient); // pre-process
|
|
IOCSource::doFieldPreProcessing(sInfo->securityClient); // pre-process field
|
|
if (putOperationCache->doWait) {
|
|
putOperationCache->valueToSet = value;
|
|
// TODO prevent concurrent put with callbacks (notifyBusy)
|
|
|
|
putOperationCache->notify.requestType = value["value"].isMarked(true, true)
|
|
? putProcessRequest : processRequest;
|
|
putOperationCache->putOperation = std::move(putOperation);
|
|
dbProcessNotify(&putOperationCache->notify);
|
|
return;
|
|
}
|
|
|
|
CurrentOp op(putOperation.get());
|
|
|
|
if (dbChannelFieldType(pDbChannel) >= DBF_INLINK
|
|
&& dbChannelFieldType(pDbChannel) <= DBF_FWDLINK) {
|
|
// Locking is handled by dbPutField() called as a special case in IOCSource::put() for links
|
|
IOCSource::put(pDbChannel, value, MappingInfo()); // put
|
|
} else {
|
|
// All other field types call dbChannelPut() directly, so we have to perform locking here
|
|
DBLocker F(pDbChannel->addr.precord); // lock
|
|
IOCSource::put(pDbChannel, value, MappingInfo()); // put
|
|
IOCSource::doPostProcessing(pDbChannel, putOperationCache->forceProcessing); // post-process
|
|
}
|
|
putOperation->reply();
|
|
} catch (std::exception& e) {
|
|
putOperation->error(e.what());
|
|
}
|
|
});
|
|
}
|
|
|
|
} // namespace
|
|
|
|
/**
|
|
* Constructor for SingleSource registrar.
|
|
*/
|
|
SingleSource::SingleSource()
|
|
:eventContext(db_init_events()) // Initialise event context
|
|
{
|
|
auto names(std::make_shared<std::set<std::string >>());
|
|
|
|
// For each record type and for each record in that type, add record name to the list of all records
|
|
DBEntry dbEntry;
|
|
for (long status = dbFirstRecordType(dbEntry); !status; status = dbNextRecordType(dbEntry)) {
|
|
for (status = dbFirstRecord(dbEntry); !status; status = dbNextRecord(dbEntry)) {
|
|
names->insert(dbEntry->precnode->recordname);
|
|
}
|
|
}
|
|
|
|
allRecords.names = names;
|
|
|
|
// Start event pump
|
|
if (!eventContext) {
|
|
throw std::runtime_error("Single Source: Event Context failed to initialise: db_init_events()");
|
|
}
|
|
|
|
if (db_start_events(eventContext.get(), "qsrvSingle", nullptr, nullptr, epicsThreadPriorityCAServerLow - 1)) {
|
|
throw std::runtime_error("Could not start event thread: db_start_events()");
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle the create source operation. This is called once when the source is created.
|
|
* We will register all of the database records that have been loaded until this time as pv names in this
|
|
* source.
|
|
* @param channelControl
|
|
*/
|
|
void SingleSource::onCreate(std::unique_ptr<server::ChannelControl>&& channelControl) {
|
|
auto sourceName(channelControl->name().c_str());
|
|
Channel pDbChannel;
|
|
try {
|
|
pDbChannel = Channel(sourceName);
|
|
} catch (std::exception& e) {
|
|
log_debug_printf(_logname, "Ignore requested channel '%s' : %s\n", sourceName, e.what());
|
|
return;
|
|
}
|
|
|
|
log_debug_printf(_logname, "Accepting channel for '%s'\n", sourceName);
|
|
|
|
auto sInfo(std::make_shared<SingleInfo>(std::move(pDbChannel)));
|
|
|
|
// Create callbacks for handling requests and channel subscriptions
|
|
Value valuePrototype = getValuePrototype(sInfo);
|
|
|
|
// Get and Put requests
|
|
channelControl
|
|
->onOp([sInfo, valuePrototype](std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
|
|
onOp(sInfo, valuePrototype, std::move(channelConnectOperation));
|
|
});
|
|
|
|
// binding 'this' safe as Server shutdown will close connections before dropping Source
|
|
channelControl
|
|
->onSubscribe([this, valuePrototype, sInfo](
|
|
std::unique_ptr<server::MonitorSetupOp>&& subscriptionOperation) {
|
|
// The subscription must be kept alive
|
|
// We accomplish this further on during the binding of the onStart()
|
|
auto subscriptionContext(std::make_shared<SingleSourceSubscriptionCtx>(sInfo));
|
|
subscriptionContext->currentValue = valuePrototype.cloneEmpty();
|
|
onSubscribe(subscriptionContext, eventContext, std::move(subscriptionOperation));
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Respond to search requests. For each matching pv, claim that pv
|
|
*
|
|
* @param searchOperation the search operation
|
|
*/
|
|
void SingleSource::onSearch(Search& searchOperation) {
|
|
for (auto& pv: searchOperation) {
|
|
if (!dbChannelTest(pv.name())) {
|
|
pv.claim();
|
|
log_debug_printf(_logname, "Claiming '%s'\n", pv.name());
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Respond to the show request by displaying a list of all the PVs hosted in this ioc
|
|
*
|
|
* @param outputStream the stream to show the list on
|
|
*/
|
|
void SingleSource::show(std::ostream& outputStream) {
|
|
outputStream << "IOC";
|
|
for (auto& name: *SingleSource::allRecords.names) {
|
|
outputStream << "\n" << indent{} << name;
|
|
}
|
|
}
|
|
|
|
} // ioc
|
|
} // pvxs
|