6d1216daad
add pvalink json schema avoid JSON5 in testpvalink for portability. fixup build with pvalink trap bad_weak_ptr open during dtor Not sure why this is happening, but need not be CRIT. c++11, cleanup, and notes fix pvalink test sync fix test cleanup on exit pvalink disconnected link is always INVALID pvalink logging pvalink capture Disconnect time pvalink eliminate providerName restrict local to dbChannelTest() aka. no qsrv groups pvalink onTypeChange when attaching link to existing channel pvalink eliminate unused Connecting state pvalink add InstCounter pvalink AfterPut can be const pvalink add atomic jlif flag include epicsStdio.h later avoid #define printf troubles assert cleanup state on exit pvalink add newer lset functions test link disconnect testpvalink redo testPutAsync() pvalink fill out meta-data fetch pvalink fix FLNK pvalink cache putReq pvalink test atomic monitor pvalink test enum handling pvalink handle scalar read of empty array make it well defined anyway... pvalink test array of strings handle db_add_event() failure handle record._options.DBE
470 lines
19 KiB
C++
470 lines
19 KiB
C++
/*
|
|
* Copyright - See the COPYRIGHT that is included with this distribution.
|
|
* pvxs is distributed subject to a Software License Agreement found
|
|
* in file LICENSE that is included with this distribution.
|
|
*
|
|
* Author George S. McIntyre <george@level-n.com>, 2023
|
|
*
|
|
*/
|
|
|
|
#include <algorithm>
|
|
#include <cmath>
|
|
#include <string>
|
|
|
|
#include <dbAccess.h>
|
|
#include <dbChannel.h>
|
|
#include <dbEvent.h>
|
|
#include <dbStaticLib.h>
|
|
#include <special.h>
|
|
|
|
#include <pvxs/log.h>
|
|
#include <pvxs/nt.h>
|
|
#include <pvxs/source.h>
|
|
#include <dbNotify.h>
|
|
|
|
#include "dbentry.h"
|
|
#include "dberrormessage.h"
|
|
#include "dblocker.h"
|
|
#include "iocsource.h"
|
|
#include "singlesource.h"
|
|
#include "singlesrcsubscriptionctx.h"
|
|
#include "credentials.h"
|
|
#include "securitylogger.h"
|
|
#include "securityclient.h"
|
|
#include "typeutils.h"
|
|
#include "localfieldlog.h"
|
|
|
|
namespace pvxs {
|
|
namespace ioc {
|
|
|
|
DEFINE_LOGGER(_logname, "pvxs.ioc.single.source");
|
|
|
|
DEFINE_INST_COUNTER(PutOperationCache);
|
|
DEFINE_INST_COUNTER(SingleInfo);
|
|
|
|
namespace {
|
|
|
|
void subscriptionCallback(SingleSourceSubscriptionCtx* subscriptionContext,
|
|
UpdateType::type change,
|
|
dbChannel* pChannel,
|
|
struct db_field_log* pDbFieldLog) {
|
|
try {
|
|
// Get the current value of this subscription
|
|
// We simply merge new field changes onto this value as events occur
|
|
auto currentValue = subscriptionContext->currentValue;
|
|
|
|
{
|
|
DBLocker F(dbChannelRecord(subscriptionContext->info->chan));
|
|
// TODO MappingInfo::nsecMask
|
|
IOCSource::get(currentValue, MappingInfo(), Value(), change, pChannel, pDbFieldLog);
|
|
}
|
|
|
|
// Make sure that the initial subscription update has occurred on both channels before continuing
|
|
// As we make two initial updates when opening a new subscription, we need both to have completed before continuing
|
|
if (subscriptionContext->hadValueEvent && subscriptionContext->hadPropertyEvent) {
|
|
// Return value
|
|
subscriptionContext->subscriptionControl->post(currentValue.clone());
|
|
currentValue.unmark();
|
|
}
|
|
} catch(std::exception& e) {
|
|
log_exc_printf(_logname, "Unhandled exception in %s\n", __func__);
|
|
}
|
|
}
|
|
|
|
void subscriptionValueCallback(void* userArg, struct dbChannel* pChannel, int, struct db_field_log* pDbFieldLog) {
|
|
auto subscriptionContext = (SingleSourceSubscriptionCtx*)userArg;
|
|
subscriptionContext->hadValueEvent = true;
|
|
auto change = UpdateType::type(UpdateType::Value | UpdateType::Alarm);
|
|
#if EPICS_VERSION_INT >= VERSION_INT(7, 0, 6, 0)
|
|
if(pDbFieldLog) {
|
|
// when available, use DBE mask from db_field_log
|
|
change = UpdateType::type(pDbFieldLog->mask & UpdateType::Everything);
|
|
}
|
|
#endif
|
|
subscriptionCallback(subscriptionContext, change, pChannel, pDbFieldLog);
|
|
}
|
|
|
|
void subscriptionPropertiesCallback(void* userArg, struct dbChannel* pChannel, int,
|
|
struct db_field_log* pDbFieldLog) {
|
|
auto subscriptionContext = (SingleSourceSubscriptionCtx*)userArg;
|
|
subscriptionContext->hadPropertyEvent = true;
|
|
subscriptionCallback(subscriptionContext, UpdateType::Property, pChannel, pDbFieldLog);
|
|
}
|
|
|
|
/**
|
|
* Called by the framework when a client subscribes to a channel. We intercept the call before this function is called
|
|
* to add a new subscription context with a value prototype matching the channel definition.
|
|
*
|
|
* @param subscriptionContext a new subscription context with a value prototype matching the channel
|
|
* @param subscriptionOperation the channel subscription operation
|
|
*/
|
|
void onSubscribe(const std::shared_ptr<SingleSourceSubscriptionCtx>& subscriptionContext,
|
|
const DBEventContext& eventContext,
|
|
std::unique_ptr<server::MonitorSetupOp>&& subscriptionOperation)
|
|
{
|
|
auto pvReq(subscriptionOperation->pvRequest());
|
|
unsigned dbe = 0;
|
|
if(auto fld = pvReq["record._options.DBE"].ifMarked()) {
|
|
switch(fld.type().kind()) {
|
|
case Kind::String: {
|
|
auto mask(fld.as<std::string>());
|
|
// name and sloppy parsing a la. caProvider...
|
|
#define CASE(EVENT) if(mask.find(#EVENT)!=mask.npos) dbe |= DBE_ ## EVENT
|
|
CASE(VALUE);
|
|
CASE(ARCHIVE);
|
|
CASE(ALARM);
|
|
// CASE(PROPERTY); // handled as special case
|
|
#undef CASE
|
|
break;
|
|
}
|
|
case Kind::Integer:
|
|
case Kind::Real:
|
|
dbe = fld.as<uint8_t>();
|
|
break;
|
|
default:
|
|
break;
|
|
}
|
|
}
|
|
dbe &= (DBE_VALUE | DBE_ARCHIVE | DBE_ALARM);
|
|
if(!dbe)
|
|
dbe = DBE_VALUE | DBE_ALARM;
|
|
|
|
// inform peer of data type and acquire control of the subscription queue
|
|
subscriptionContext->subscriptionControl = subscriptionOperation->connect(subscriptionContext->currentValue);
|
|
|
|
IOCSource::initialize(subscriptionContext->currentValue,
|
|
*subscriptionContext->info,
|
|
subscriptionContext->info->chan);
|
|
|
|
// Two subscription are made for pvxs
|
|
// first subscription is for Value changes
|
|
subscriptionContext->pValueEventSubscription.subscribe(eventContext.get(),
|
|
subscriptionContext->info->chan,
|
|
subscriptionValueCallback,
|
|
subscriptionContext.get(),
|
|
dbe
|
|
);
|
|
// second subscription is for Property changes
|
|
subscriptionContext->pPropertiesEventSubscription.subscribe(eventContext.get(),
|
|
subscriptionContext->pPropertiesChannel,
|
|
subscriptionPropertiesCallback,
|
|
subscriptionContext.get(),
|
|
DBE_PROPERTY
|
|
);
|
|
|
|
// If all goes well, Set up handlers for start and stop monitoring events
|
|
// The subscription context is being kept alive because it is being bound into some internal storage by onStart
|
|
subscriptionContext->subscriptionControl->onStart([subscriptionContext](bool isStarting) {
|
|
if (isStarting) {
|
|
subscriptionContext->eventsEnabled = true;
|
|
subscriptionContext->pValueEventSubscription.enable();
|
|
subscriptionContext->pPropertiesEventSubscription.enable();
|
|
} else {
|
|
subscriptionContext->pValueEventSubscription.disable();
|
|
subscriptionContext->pPropertiesEventSubscription.disable();
|
|
subscriptionContext->eventsEnabled = false;
|
|
}
|
|
});
|
|
}
|
|
/**
|
|
* Create a Value Prototype for storing values returned by the given channel.
|
|
*
|
|
* @param dbChannelSharedPtr pointer to the channel
|
|
* @return a value prototype for the given channel
|
|
*/
|
|
Value getValuePrototype(const std::shared_ptr<SingleInfo>& sinfo) {
|
|
auto& chan(sinfo->chan);
|
|
short dbrType(dbChannelFinalFieldType(chan));
|
|
auto valueType(IOCSource::getChannelValueType(chan));
|
|
|
|
Value valuePrototype;
|
|
// To control optional metadata set to true to include in the output
|
|
bool display = true;
|
|
bool control = true;
|
|
bool valueAlarm = true;
|
|
|
|
if (dbrType == DBR_ENUM) {
|
|
valuePrototype = nt::NTEnum{}.create();
|
|
} else {
|
|
valuePrototype = nt::NTScalar{ valueType, display, control, valueAlarm, true }.create();
|
|
}
|
|
return valuePrototype;
|
|
}
|
|
|
|
/**
|
|
* Callback for asynchronous put operations to handle the actual put value operation
|
|
*
|
|
* @param notify the process notify object to use
|
|
* @param type the put notification type
|
|
* @return 1 for success and 0 for errors
|
|
*/
|
|
int putCallback(struct processNotify* notify, notifyPutType type) {
|
|
if (notify->status != notifyOK) {
|
|
return 0;
|
|
}
|
|
|
|
auto pPutOperationCache = (PutOperationCache*)notify->usrPvt;
|
|
auto valueToSet = std::move(pPutOperationCache->valueToSet);
|
|
|
|
switch (type) {
|
|
case putDisabledType:
|
|
// Request has been made but the record has been disabled, so noop and only call done callback
|
|
return 0;
|
|
case putFieldType:
|
|
// As this type will be only called for Links the IOCSource::put() will handle the locking as a special case
|
|
case putType:
|
|
// For this type, the caller has already locked the record, so we'll not lock either
|
|
IOCSource::put(pPutOperationCache->notify.chan, valueToSet, MappingInfo()); // put
|
|
break;
|
|
}
|
|
return 1;
|
|
}
|
|
|
|
/**
|
|
* Callback when asynchronous put's are complete
|
|
*
|
|
* @param notify the process notify object to use
|
|
*/
|
|
void doneCallback(struct processNotify* notify) {
|
|
// Get our put operation cache object from the user pointer
|
|
auto pPutOperationCache = (PutOperationCache*)notify->usrPvt;
|
|
|
|
// Get the cached putOperation controller
|
|
auto putOperation = std::move(pPutOperationCache->putOperation);
|
|
|
|
// TODO handle cancelled requests
|
|
// int expected = 1;
|
|
// if (std::atomic_compare_exchange_weak(&pPutOperationCache->notifyBusy, &expected, 0) == 0) {
|
|
// std::cerr << "SinglePut dbNotify state error?\n";
|
|
// }
|
|
|
|
switch (notify->status) {
|
|
case notifyOK:
|
|
// If everything is ok then notify the caller
|
|
putOperation->reply();
|
|
break;
|
|
case notifyCanceled:
|
|
return; // skip notification
|
|
case notifyError:
|
|
putOperation->error("Error in dbNotify");
|
|
break;
|
|
case notifyPutDisabled:
|
|
putOperation->error("Put disabled");
|
|
break;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle the get operation
|
|
*
|
|
* @param pDbChannel the channel that the request comes in on
|
|
* @param getOperation the current executing operation
|
|
* @param valuePrototype a value prototype that is made based on the expected type to be returned
|
|
*/
|
|
void singleGet(const SingleInfo& info,
|
|
std::unique_ptr<server::ExecOp>& getOperation,
|
|
const Value& valuePrototype) {
|
|
auto& pDbChannel(info.chan);
|
|
try {
|
|
auto returnValue = valuePrototype.cloneEmpty();
|
|
// TODO: MappingInfo::nsecMask
|
|
IOCSource::initialize(returnValue, info, pDbChannel);
|
|
{
|
|
DBLocker F(pDbChannel->addr.precord); // lock
|
|
LocalFieldLog localFieldLog(pDbChannel);
|
|
IOCSource::get(returnValue, info,
|
|
Value(), UpdateType::Everything,
|
|
pDbChannel, localFieldLog.pFieldLog);
|
|
}
|
|
getOperation->reply(returnValue);
|
|
} catch (const std::exception& getException) {
|
|
getOperation->error(getException.what());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handler for the onOp event raised by pvxs Sources when they are started, in order to define the get and put handlers
|
|
* on a per source basis.
|
|
* This is called after the event has been intercepted and we add the channel and value prototype to the call.
|
|
*
|
|
* @param dbChannelSharedPtr the channel to which the get/put operation pertains
|
|
* @param valuePrototype the value prototype that is appropriate for the given channel
|
|
* @param channelConnectOperation the channel connect operation object
|
|
*/
|
|
void onOp(const std::shared_ptr<SingleInfo>& sInfo, const Value& valuePrototype,
|
|
std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
|
|
// Announce the channel type with a `connect()` call. This happens only once
|
|
channelConnectOperation->connect(valuePrototype);
|
|
|
|
// Set up handler for get requests
|
|
channelConnectOperation
|
|
->onGet([sInfo, valuePrototype](std::unique_ptr<server::ExecOp>&& getOperation) {
|
|
singleGet(*sInfo, getOperation, valuePrototype);
|
|
});
|
|
|
|
// Make a security cache for this client's connection to this pv
|
|
// Each time the same client calls put we will re-use the cached security client
|
|
// The security cache will be deleted when the client disconnects from this pv
|
|
auto putOperationCache = std::make_shared<PutOperationCache>();
|
|
|
|
// Set up handler for put requests
|
|
channelConnectOperation
|
|
->onPut([sInfo, putOperationCache](
|
|
std::unique_ptr<server::ExecOp>&& putOperation,
|
|
Value&& value) {
|
|
try {
|
|
dbChannel* pDbChannel = sInfo->chan;
|
|
if (!putOperationCache->done) {
|
|
putOperationCache->credentials.reset(new Credentials(*putOperation->credentials()));
|
|
putOperationCache->securityClient.update(pDbChannel, *putOperationCache->credentials);
|
|
putOperationCache->notify.usrPvt = putOperationCache.get();
|
|
putOperationCache->notify.chan = pDbChannel;
|
|
putOperationCache->notify.putCallback = putCallback;
|
|
putOperationCache->notify.doneCallback = doneCallback;
|
|
|
|
auto& pvRequest = putOperation->pvRequest();
|
|
pvRequest["record._options.block"].as<bool>(putOperationCache->doWait);
|
|
IOCSource::setForceProcessingFlag(pvRequest, putOperationCache);
|
|
if (putOperationCache->forceProcessing) {
|
|
putOperationCache->doWait = false; // no point in waiting
|
|
}
|
|
putOperationCache->done = true;
|
|
}
|
|
|
|
SecurityLogger securityLogger;
|
|
|
|
IOCSource::doPreProcessing(pDbChannel,
|
|
securityLogger,
|
|
*putOperationCache->credentials,
|
|
putOperationCache->securityClient); // pre-process
|
|
IOCSource::doFieldPreProcessing(putOperationCache->securityClient); // pre-process field
|
|
if (putOperationCache->doWait) {
|
|
putOperationCache->valueToSet = value;
|
|
// TODO prevent concurrent put with callbacks (notifyBusy)
|
|
|
|
putOperationCache->notify.requestType = value["value"].isMarked(true, true)
|
|
? putProcessRequest : processRequest;
|
|
putOperationCache->putOperation = std::move(putOperation);
|
|
dbProcessNotify(&putOperationCache->notify);
|
|
return;
|
|
}
|
|
|
|
CurrentOp op(putOperation.get());
|
|
|
|
if (dbChannelFieldType(pDbChannel) >= DBF_INLINK
|
|
&& dbChannelFieldType(pDbChannel) <= DBF_FWDLINK) {
|
|
// Locking is handled by dbPutField() called as a special case in IOCSource::put() for links
|
|
IOCSource::put(pDbChannel, value, MappingInfo()); // put
|
|
} else {
|
|
// All other field types call dbChannelPut() directly, so we have to perform locking here
|
|
DBLocker F(pDbChannel->addr.precord); // lock
|
|
IOCSource::put(pDbChannel, value, MappingInfo()); // put
|
|
IOCSource::doPostProcessing(pDbChannel, putOperationCache->forceProcessing); // post-process
|
|
}
|
|
putOperation->reply();
|
|
} catch (std::exception& e) {
|
|
putOperation->error(e.what());
|
|
}
|
|
});
|
|
}
|
|
|
|
} // namespace
|
|
|
|
/**
|
|
* Constructor for SingleSource registrar.
|
|
*/
|
|
SingleSource::SingleSource()
|
|
:eventContext(db_init_events()) // Initialise event context
|
|
{
|
|
auto names(std::make_shared<std::set<std::string >>());
|
|
|
|
// For each record type and for each record in that type, add record name to the list of all records
|
|
DBEntry dbEntry;
|
|
for (long status = dbFirstRecordType(dbEntry); !status; status = dbNextRecordType(dbEntry)) {
|
|
for (status = dbFirstRecord(dbEntry); !status; status = dbNextRecord(dbEntry)) {
|
|
names->insert(dbEntry->precnode->recordname);
|
|
}
|
|
}
|
|
|
|
allRecords.names = names;
|
|
|
|
// Start event pump
|
|
if (!eventContext) {
|
|
throw std::runtime_error("Single Source: Event Context failed to initialise: db_init_events()");
|
|
}
|
|
|
|
if (db_start_events(eventContext.get(), "qsrvSingle", nullptr, nullptr, epicsThreadPriorityCAServerLow - 1)) {
|
|
throw std::runtime_error("Could not start event thread: db_start_events()");
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle the create source operation. This is called once when the source is created.
|
|
* We will register all of the database records that have been loaded until this time as pv names in this
|
|
* source.
|
|
* @param channelControl
|
|
*/
|
|
void SingleSource::onCreate(std::unique_ptr<server::ChannelControl>&& channelControl) {
|
|
auto sourceName(channelControl->name().c_str());
|
|
Channel pDbChannel;
|
|
try {
|
|
pDbChannel = Channel(sourceName);
|
|
} catch (std::exception& e) {
|
|
log_debug_printf(_logname, "Ignore requested channel '%s' : %s\n", sourceName, e.what());
|
|
return;
|
|
}
|
|
|
|
log_debug_printf(_logname, "Accepting channel for '%s'\n", sourceName);
|
|
|
|
auto sInfo(std::make_shared<SingleInfo>(std::move(pDbChannel)));
|
|
|
|
// Create callbacks for handling requests and channel subscriptions
|
|
Value valuePrototype = getValuePrototype(sInfo);
|
|
|
|
// Get and Put requests
|
|
channelControl
|
|
->onOp([sInfo, valuePrototype](std::unique_ptr<server::ConnectOp>&& channelConnectOperation) {
|
|
onOp(sInfo, valuePrototype, std::move(channelConnectOperation));
|
|
});
|
|
|
|
// binding 'this' safe as Server shutdown will close connections before dropping Source
|
|
channelControl
|
|
->onSubscribe([this, valuePrototype, sInfo](
|
|
std::unique_ptr<server::MonitorSetupOp>&& subscriptionOperation) {
|
|
// The subscription must be kept alive
|
|
// We accomplish this further on during the binding of the onStart()
|
|
auto subscriptionContext(std::make_shared<SingleSourceSubscriptionCtx>(sInfo));
|
|
subscriptionContext->currentValue = valuePrototype.cloneEmpty();
|
|
onSubscribe(subscriptionContext, eventContext, std::move(subscriptionOperation));
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Respond to search requests. For each matching pv, claim that pv
|
|
*
|
|
* @param searchOperation the search operation
|
|
*/
|
|
void SingleSource::onSearch(Search& searchOperation) {
|
|
for (auto& pv: searchOperation) {
|
|
if (!dbChannelTest(pv.name())) {
|
|
pv.claim();
|
|
log_debug_printf(_logname, "Claiming '%s'\n", pv.name());
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Respond to the show request by displaying a list of all the PVs hosted in this ioc
|
|
*
|
|
* @param outputStream the stream to show the list on
|
|
*/
|
|
void SingleSource::show(std::ostream& outputStream) {
|
|
outputStream << "IOC";
|
|
for (auto& name: *SingleSource::allRecords.names) {
|
|
outputStream << "\n" << indent{} << name;
|
|
}
|
|
}
|
|
|
|
} // ioc
|
|
} // pvxs
|