From 197f763452a4b4ce7313d3d0aeb8dbaf3b774c02 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 29 Apr 2015 15:17:54 -0400 Subject: [PATCH 1/3] pipeline support --- pvAccessCPP.files | 218 ++++ pvAccessCPP.files.orig | 1180 +++++++++++++++++++++ src/Makefile | 1 + src/ca/caChannel.cpp | 4 + src/ca/caChannel.h | 1 + src/pipelineService/Makefile | 9 + src/pipelineService/pipelineServer.cpp | 752 +++++++++++++ src/pipelineService/pipelineServer.h | 74 ++ src/pipelineService/pipelineService.cpp | 8 + src/pipelineService/pipelineService.h | 101 ++ src/remoteClient/clientContextImpl.cpp | 127 ++- src/server/responseHandlers.cpp | 29 +- testApp/remote/Makefile | 4 + testApp/remote/pipelineServiceExample.cpp | 81 ++ 14 files changed, 2569 insertions(+), 20 deletions(-) create mode 100644 pvAccessCPP.files.orig create mode 100644 src/pipelineService/Makefile create mode 100644 src/pipelineService/pipelineServer.cpp create mode 100644 src/pipelineService/pipelineServer.h create mode 100644 src/pipelineService/pipelineService.cpp create mode 100644 src/pipelineService/pipelineService.h create mode 100644 testApp/remote/pipelineServiceExample.cpp diff --git a/pvAccessCPP.files b/pvAccessCPP.files index 5617612..60f0bd9 100644 --- a/pvAccessCPP.files +++ b/pvAccessCPP.files @@ -960,3 +960,221 @@ testApp/pvAccessAllTests.c testApp/rtemsConfig.c testApp/rtemsNetworking.h testApp/rtemsTestHarness.c +configure/O.darwin-x86/Makefile +configure/Makefile +include/pv/baseChannelRequester.h +include/pv/beaconEmitter.h +include/pv/beaconHandler.h +include/pv/beaconServerStatusProvider.h +include/pv/blockingTCP.h +include/pv/blockingUDP.h +include/pv/caChannel.h +include/pv/caProvider.h +include/pv/caStatus.h +include/pv/channelSearchManager.h +include/pv/clientContextImpl.h +include/pv/clientFactory.h +include/pv/codec.h +include/pv/configuration.h +include/pv/hexDump.h +include/pv/inetAddressUtil.h +include/pv/introspectionRegistry.h +include/pv/likely.h +include/pv/logger.h +include/pv/namedLockPattern.h +include/pv/pvAccess.h +include/pv/pvAccessMB.h +include/pv/pvaConstants.h +include/pv/pvaVersion.h +include/pv/referenceCountingLock.h +include/pv/remote.h +include/pv/responseHandlers.h +include/pv/rpcClient.h +include/pv/rpcServer.h +include/pv/rpcService.h +include/pv/security.h +include/pv/serializationHelper.h +include/pv/serverChannelImpl.h +include/pv/serverContext.h +include/pv/simpleChannelSearchManagerImpl.h +include/pv/syncChannelFind.h +include/pv/transportRegistry.h +include/pv/wildcard.h +pvtoolsSrc/O.darwin-x86/Makefile +pvtoolsSrc/eget.cpp +pvtoolsSrc/Makefile +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvlist.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/ca/caStatus.cpp +src/ca/caStatus.h +src/ca/Makefile +src/client/Makefile +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/factory/Makefile +src/ioc/Makefile +src/ioc/PVAClientRegister.cpp +src/ioc/PVAServerRegister.cpp +src/ioc/syncChannelFind.h +src/mb/Makefile +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/O.darwin-x86/Makefile +src/pipelineService/Makefile +src/pipelineService/pipelineServer.cpp +src/pipelineService/pipelineServer.h +src/pipelineService/pipelineService.cpp +src/pipelineService/pipelineService.h +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/Makefile +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/Makefile +src/remote/remote.h +src/remote/security.cpp +src/remote/security.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/remoteClient/Makefile +src/rpcClient/Makefile +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/Makefile +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/Makefile +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/Makefile +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/utils/wildcard.cpp +src/utils/wildcard.h +src/Makefile +testApp/client/O.darwin-x86/Makefile +testApp/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceAsyncExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +testApp/pvAccessAllTests.c +testApp/rtemsConfig.c +testApp/rtemsNetworking.h +testApp/rtemsTestHarness.c +Makefile +testApp/client/O.darwin-x86/Makefile +testApp/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/pipelineServiceExample.cpp +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceAsyncExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +testApp/pvAccessAllTests.c +testApp/rtemsConfig.c +testApp/rtemsNetworking.h +testApp/rtemsTestHarness.c diff --git a/pvAccessCPP.files.orig b/pvAccessCPP.files.orig new file mode 100644 index 0000000..60f0bd9 --- /dev/null +++ b/pvAccessCPP.files.orig @@ -0,0 +1,1180 @@ +configure/CONFIG +configure/CONFIG_SITE +configure/RELEASE +configure/RELEASE.local +configure/RULES +configure/RULES.ioc +configure/RULES_DIRS +configure/RULES_TOP +documentation/pvAccessCPP.html +documentation/README +jenkins/cloudbees_build +src/pva/pvaConstants.h +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/factory/CreateRequestFactory.cpp +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/remote.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remote/codec.cpp +src/remote/codec.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/remoteClient/clientContextImpl.h.orig +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +scripts/gcovr +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testCreateRequest.cpp +testApp/client/testMockClient.cpp +testApp/client/testStartStop.cpp +pvtoolsSrc/eget.cpp +testApp/remote/epicsv4Grayscale.h +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvput.cpp +testApp/remote/pvutils.cpp +testApp/remote/pvutils.h +testApp/remote/rpcServiceExample.cpp +testApp/remote/testADCSim.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testCodec.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/configurationTest.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/transportRegistryTest.cpp +DEMO +Doxyfile +README +runTestServer +TODO +pvtoolsSrc/pvinfo.cpp +src/ca/caProvider.h +src/ca/caProvider.cpp +src/ca/caChannel.h +src/ca/caChannel.cpp +testApp/utils/Makefile +testApp/remote/channelAccessIFTest.h +testApp/remote/channelAccessIFTest.cpp +testApp/remote/syncTestRequesters.h +pvtoolsSrc/eget.cpp +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/remote.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/v3ioc/PVAClientRegister.cpp +src/v3ioc/PVAServerRegister.cpp +src/v3ioc/syncChannelFind.h +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testMockClient.cpp +testApp/client/testStartStop.cpp +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +src/remote/security.h +src/remote/security.cpp +include/pv/baseChannelRequester.h +include/pv/beaconEmitter.h +include/pv/beaconHandler.h +include/pv/beaconServerStatusProvider.h +include/pv/blockingTCP.h +include/pv/blockingUDP.h +include/pv/caChannel.h +include/pv/caProvider.h +include/pv/channelSearchManager.h +include/pv/clientContextImpl.h +include/pv/clientFactory.h +include/pv/codec.h +include/pv/configuration.h +include/pv/hexDump.h +include/pv/inetAddressUtil.h +include/pv/introspectionRegistry.h +include/pv/likely.h +include/pv/logger.h +include/pv/namedLockPattern.h +include/pv/pvAccess.h +include/pv/pvAccessMB.h +include/pv/pvaConstants.h +include/pv/pvaVersion.h +include/pv/referenceCountingLock.h +include/pv/remote.h +include/pv/responseHandlers.h +include/pv/rpcClient.h +include/pv/rpcServer.h +include/pv/rpcService.h +include/pv/security.h +include/pv/serializationHelper.h +include/pv/serverChannelImpl.h +include/pv/serverContext.h +include/pv/simpleChannelSearchManagerImpl.h +include/pv/syncChannelFind.h +include/pv/transportRegistry.h +include/pv/wildcard.h +pvtoolsSrc/eget.cpp +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/remote.h +src/remote/security.cpp +src/remote/security.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/utils/wildcard.cpp +src/utils/wildcard.h +src/v3ioc/PVAClientRegister.cpp +src/v3ioc/PVAServerRegister.cpp +src/v3ioc/syncChannelFind.h +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testMockClient.cpp +testApp/client/testStartStop.cpp +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +include/pv/baseChannelRequester.h +include/pv/beaconEmitter.h +include/pv/beaconHandler.h +include/pv/beaconServerStatusProvider.h +include/pv/blockingTCP.h +include/pv/blockingUDP.h +include/pv/caChannel.h +include/pv/caProvider.h +include/pv/channelSearchManager.h +include/pv/clientContextImpl.h +include/pv/clientFactory.h +include/pv/codec.h +include/pv/configuration.h +include/pv/hexDump.h +include/pv/inetAddressUtil.h +include/pv/introspectionRegistry.h +include/pv/likely.h +include/pv/logger.h +include/pv/namedLockPattern.h +include/pv/pvAccess.h +include/pv/pvAccessMB.h +include/pv/pvaConstants.h +include/pv/pvaVersion.h +include/pv/referenceCountingLock.h +include/pv/remote.h +include/pv/responseHandlers.h +include/pv/rpcClient.h +include/pv/rpcServer.h +include/pv/rpcService.h +include/pv/security.h +include/pv/serializationHelper.h +include/pv/serverChannelImpl.h +include/pv/serverContext.h +include/pv/simpleChannelSearchManagerImpl.h +include/pv/syncChannelFind.h +include/pv/transportRegistry.h +include/pv/wildcard.h +pvtoolsSrc/eget.cpp +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/remote.h +src/remote/security.cpp +src/remote/security.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/utils/wildcard.cpp +src/utils/wildcard.h +src/v3ioc/PVAClientRegister.cpp +src/v3ioc/PVAServerRegister.cpp +src/v3ioc/syncChannelFind.h +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testMockClient.cpp +testApp/client/testStartStop.cpp +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +pvtoolsSrc/pvlist.cpp +configure/O.darwin-x86/Makefile +configure/Makefile +include/pv/baseChannelRequester.h +include/pv/beaconEmitter.h +include/pv/beaconHandler.h +include/pv/beaconServerStatusProvider.h +include/pv/blockingTCP.h +include/pv/blockingUDP.h +include/pv/caChannel.h +include/pv/caProvider.h +include/pv/channelSearchManager.h +include/pv/clientContextImpl.h +include/pv/clientFactory.h +include/pv/codec.h +include/pv/configuration.h +include/pv/hexDump.h +include/pv/inetAddressUtil.h +include/pv/introspectionRegistry.h +include/pv/likely.h +include/pv/logger.h +include/pv/namedLockPattern.h +include/pv/pvAccess.h +include/pv/pvAccessMB.h +include/pv/pvaConstants.h +include/pv/pvaVersion.h +include/pv/referenceCountingLock.h +include/pv/remote.h +include/pv/responseHandlers.h +include/pv/rpcClient.h +include/pv/rpcServer.h +include/pv/rpcService.h +include/pv/security.h +include/pv/serializationHelper.h +include/pv/serverChannelImpl.h +include/pv/serverContext.h +include/pv/simpleChannelSearchManagerImpl.h +include/pv/syncChannelFind.h +include/pv/transportRegistry.h +include/pv/wildcard.h +pvtoolsSrc/O.darwin-x86/Makefile +pvtoolsSrc/eget.cpp +pvtoolsSrc/Makefile +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvlist.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/ca/Makefile +src/client/Makefile +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/factory/Makefile +src/mb/Makefile +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/O.darwin-x86/Makefile +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/Makefile +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/Makefile +src/remote/remote.h +src/remote/security.cpp +src/remote/security.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/remoteClient/Makefile +src/rpcClient/Makefile +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/Makefile +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/Makefile +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/Makefile +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/utils/wildcard.cpp +src/utils/wildcard.h +src/v3ioc/Makefile +src/v3ioc/PVAClientRegister.cpp +src/v3ioc/PVAServerRegister.cpp +src/v3ioc/syncChannelFind.h +src/Makefile +testApp/client/O.darwin-x86/Makefile +testApp/client/Makefile +testApp/client/MockClientImpl.cpp +testApp/client/testChannelAccessFactory.cpp +testApp/client/testMockClient.cpp +testApp/client/testStartStop.cpp +testApp/remote/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testBeaconEmitter.cpp +testApp/remote/testBeaconHandler.cpp +testApp/remote/testBlockingTCPClnt.cpp +testApp/remote/testBlockingTCPSrv.cpp +testApp/remote/testBlockingUDPClnt.cpp +testApp/remote/testBlockingUDPSrv.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testChannelSearchManager.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +Makefile +configure/O.darwin-x86/Makefile +configure/Makefile +include/pv/baseChannelRequester.h +include/pv/beaconEmitter.h +include/pv/beaconHandler.h +include/pv/beaconServerStatusProvider.h +include/pv/blockingTCP.h +include/pv/blockingUDP.h +include/pv/caChannel.h +include/pv/caProvider.h +include/pv/channelSearchManager.h +include/pv/clientContextImpl.h +include/pv/clientFactory.h +include/pv/codec.h +include/pv/configuration.h +include/pv/hexDump.h +include/pv/inetAddressUtil.h +include/pv/introspectionRegistry.h +include/pv/likely.h +include/pv/logger.h +include/pv/namedLockPattern.h +include/pv/pvAccess.h +include/pv/pvAccessMB.h +include/pv/pvaConstants.h +include/pv/pvaVersion.h +include/pv/referenceCountingLock.h +include/pv/remote.h +include/pv/responseHandlers.h +include/pv/rpcClient.h +include/pv/rpcServer.h +include/pv/rpcService.h +include/pv/security.h +include/pv/serializationHelper.h +include/pv/serverChannelImpl.h +include/pv/serverContext.h +include/pv/simpleChannelSearchManagerImpl.h +include/pv/syncChannelFind.h +include/pv/transportRegistry.h +include/pv/wildcard.h +pvtoolsSrc/O.darwin-x86/Makefile +pvtoolsSrc/eget.cpp +pvtoolsSrc/Makefile +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvlist.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/ca/Makefile +src/client/Makefile +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/factory/Makefile +src/mb/Makefile +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/O.darwin-x86/Makefile +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/Makefile +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/Makefile +src/remote/remote.h +src/remote/security.cpp +src/remote/security.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/remoteClient/Makefile +src/rpcClient/Makefile +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/Makefile +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/Makefile +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/Makefile +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/utils/wildcard.cpp +src/utils/wildcard.h +src/v3ioc/Makefile +src/v3ioc/PVAClientRegister.cpp +src/v3ioc/PVAServerRegister.cpp +src/v3ioc/syncChannelFind.h +src/Makefile +testApp/O.darwin-x86/Makefile +testApp/remote/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +testApp/pvAccessAllTests.c +Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/pvAccessAllTests.c +testApp/rtemsConfig.c +testApp/rtemsNetworking.h +testApp/rtemsTestHarness.c +src/ca/caStatus.cpp +src/ca/caStatus.h +testApp/client/O.darwin-x86/Makefile +testApp/O.darwin-x86/Makefile +testApp/remote/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceAsyncExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +testApp/pvAccessAllTests.c +testApp/rtemsConfig.c +testApp/rtemsNetworking.h +testApp/rtemsTestHarness.c +configure/O.darwin-x86/Makefile +configure/Makefile +include/pv/baseChannelRequester.h +include/pv/beaconEmitter.h +include/pv/beaconHandler.h +include/pv/beaconServerStatusProvider.h +include/pv/blockingTCP.h +include/pv/blockingUDP.h +include/pv/caChannel.h +include/pv/caProvider.h +include/pv/caStatus.h +include/pv/channelSearchManager.h +include/pv/clientContextImpl.h +include/pv/clientFactory.h +include/pv/codec.h +include/pv/configuration.h +include/pv/hexDump.h +include/pv/inetAddressUtil.h +include/pv/introspectionRegistry.h +include/pv/likely.h +include/pv/logger.h +include/pv/namedLockPattern.h +include/pv/pvAccess.h +include/pv/pvAccessMB.h +include/pv/pvaConstants.h +include/pv/pvaVersion.h +include/pv/referenceCountingLock.h +include/pv/remote.h +include/pv/responseHandlers.h +include/pv/rpcClient.h +include/pv/rpcServer.h +include/pv/rpcService.h +include/pv/security.h +include/pv/serializationHelper.h +include/pv/serverChannelImpl.h +include/pv/serverContext.h +include/pv/simpleChannelSearchManagerImpl.h +include/pv/syncChannelFind.h +include/pv/transportRegistry.h +include/pv/wildcard.h +pvtoolsSrc/O.darwin-x86/Makefile +pvtoolsSrc/eget.cpp +pvtoolsSrc/Makefile +pvtoolsSrc/pvget.cpp +pvtoolsSrc/pvinfo.cpp +pvtoolsSrc/pvlist.cpp +pvtoolsSrc/pvput.cpp +pvtoolsSrc/pvutils.cpp +pvtoolsSrc/pvutils.h +src/ca/caChannel.cpp +src/ca/caChannel.h +src/ca/caProvider.cpp +src/ca/caProvider.h +src/ca/caStatus.cpp +src/ca/caStatus.h +src/ca/Makefile +src/client/Makefile +src/client/pvAccess.cpp +src/client/pvAccess.h +src/factory/ChannelAccessFactory.cpp +src/factory/Makefile +src/ioc/Makefile +src/ioc/PVAClientRegister.cpp +src/ioc/PVAServerRegister.cpp +src/ioc/syncChannelFind.h +src/mb/Makefile +src/mb/pvAccessMB.cpp +src/mb/pvAccessMB.h +src/O.darwin-x86/Makefile +src/pipelineService/Makefile +src/pipelineService/pipelineServer.cpp +src/pipelineService/pipelineServer.h +src/pipelineService/pipelineService.cpp +src/pipelineService/pipelineService.h +src/pva/clientFactory.cpp +src/pva/clientFactory.h +src/pva/Makefile +src/pva/pvaConstants.h +src/pva/pvaVersion.cpp +src/pva/pvaVersion.h +src/remote/abstractResponseHandler.cpp +src/remote/beaconHandler.cpp +src/remote/beaconHandler.h +src/remote/blockingTCP.h +src/remote/blockingTCPAcceptor.cpp +src/remote/blockingTCPConnector.cpp +src/remote/blockingUDP.h +src/remote/blockingUDPConnector.cpp +src/remote/blockingUDPTransport.cpp +src/remote/channelSearchManager.h +src/remote/codec.cpp +src/remote/codec.h +src/remote/Makefile +src/remote/remote.h +src/remote/security.cpp +src/remote/security.h +src/remote/serializationHelper.cpp +src/remote/serializationHelper.h +src/remote/simpleChannelSearchManagerImpl.cpp +src/remote/simpleChannelSearchManagerImpl.h +src/remote/transportRegistry.cpp +src/remote/transportRegistry.h +src/remoteClient/clientContextImpl.cpp +src/remoteClient/clientContextImpl.h +src/remoteClient/Makefile +src/rpcClient/Makefile +src/rpcClient/rpcClient.cpp +src/rpcClient/rpcClient.h +src/rpcService/Makefile +src/rpcService/rpcServer.cpp +src/rpcService/rpcServer.h +src/rpcService/rpcService.cpp +src/rpcService/rpcService.h +src/server/baseChannelRequester.cpp +src/server/baseChannelRequester.h +src/server/beaconEmitter.cpp +src/server/beaconEmitter.h +src/server/beaconServerStatusProvider.cpp +src/server/beaconServerStatusProvider.h +src/server/Makefile +src/server/responseHandlers.cpp +src/server/responseHandlers.h +src/server/serverChannelImpl.cpp +src/server/serverChannelImpl.h +src/server/serverContext.cpp +src/server/serverContext.h +src/utils/configuration.cpp +src/utils/configuration.h +src/utils/hexDump.cpp +src/utils/hexDump.h +src/utils/inetAddressUtil.cpp +src/utils/inetAddressUtil.h +src/utils/introspectionRegistry.cpp +src/utils/introspectionRegistry.h +src/utils/likely.h +src/utils/logger.cpp +src/utils/logger.h +src/utils/Makefile +src/utils/namedLockPattern.h +src/utils/referenceCountingLock.cpp +src/utils/referenceCountingLock.h +src/utils/wildcard.cpp +src/utils/wildcard.h +src/Makefile +testApp/client/O.darwin-x86/Makefile +testApp/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceAsyncExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +testApp/pvAccessAllTests.c +testApp/rtemsConfig.c +testApp/rtemsNetworking.h +testApp/rtemsTestHarness.c +Makefile +testApp/client/O.darwin-x86/Makefile +testApp/O.darwin-x86/Makefile +testApp/remote/channelAccessIFTest.cpp +testApp/remote/channelAccessIFTest.h +testApp/remote/epicsv4Grayscale.h +testApp/remote/Makefile +testApp/remote/pipelineServiceExample.cpp +testApp/remote/rpcClientExample.cpp +testApp/remote/rpcServiceAsyncExample.cpp +testApp/remote/rpcServiceExample.cpp +testApp/remote/rpcWildServiceExample.cpp +testApp/remote/syncTestRequesters.h +testApp/remote/testADCSim.cpp +testApp/remote/testChannelAccess.cpp +testApp/remote/testChannelConnect.cpp +testApp/remote/testCodec.cpp +testApp/remote/testGetPerformance.cpp +testApp/remote/testMonitorPerformance.cpp +testApp/remote/testNTImage.cpp +testApp/remote/testRemoteClientImpl.cpp +testApp/remote/testServer.cpp +testApp/remote/testServerContext.cpp +testApp/utils/O.darwin-x86/Makefile +testApp/utils/configurationTest.cpp +testApp/utils/introspectionRegistryTest.cpp +testApp/utils/loggerTest.cpp +testApp/utils/Makefile +testApp/utils/namedLockPatternTest.cpp +testApp/utils/testAtomicBoolean.cpp +testApp/utils/testHexDump.cpp +testApp/utils/testInetAddressUtils.cpp +testApp/utils/transportRegistryTest.cpp +testApp/Makefile +testApp/pvAccessAllTests.c +testApp/rtemsConfig.c +testApp/rtemsNetworking.h +testApp/rtemsTestHarness.c diff --git a/src/Makefile b/src/Makefile index a456b70..6d6b9d7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -17,6 +17,7 @@ include $(PVACCESS_SRC)/remoteClient/Makefile include $(PVACCESS_SRC)/server/Makefile include $(PVACCESS_SRC)/rpcService/Makefile include $(PVACCESS_SRC)/rpcClient/Makefile +include $(PVACCESS_SRC)/pipelineService/Makefile include $(PVACCESS_SRC)/ca/Makefile include $(PVACCESS_SRC)/mb/Makefile include $(PVACCESS_SRC)/ioc/Makefile diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index d6d34ad..f578de8 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -1444,6 +1444,10 @@ void CAChannelMonitor::release(epics::pvData::MonitorElementPtr const & /*monito // noop } +void CAChannelMonitor::reportRemoteQueueStatus(int32 /*freeElements*/) +{ + // noop +} /* --------------- epics::pvData::ChannelRequest --------------- */ diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index ebeb437..7111932 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -264,6 +264,7 @@ public: virtual epics::pvData::Status stop(); virtual epics::pvData::MonitorElementPtr poll(); virtual void release(epics::pvData::MonitorElementPtr const & monitorElement); + virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements); /* --------------- epics::pvData::ChannelRequest --------------- */ diff --git a/src/pipelineService/Makefile b/src/pipelineService/Makefile new file mode 100644 index 0000000..005179c --- /dev/null +++ b/src/pipelineService/Makefile @@ -0,0 +1,9 @@ +# This is a Makefile fragment, see ../Makefile + +SRC_DIRS += $(PVACCESS_SRC)/pipelineService + +INC += pipelineService.h +INC += pipelineServer.h + +LIBSRCS += pipelineService.cpp +LIBSRCS += pipelineServer.cpp diff --git a/src/pipelineService/pipelineServer.cpp b/src/pipelineService/pipelineServer.cpp new file mode 100644 index 0000000..94caf92 --- /dev/null +++ b/src/pipelineService/pipelineServer.cpp @@ -0,0 +1,752 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include +#include +#include + +#define epicsExportSharedSymbols +#include +#include + +using namespace epics::pvData; +using namespace std; + +namespace epics { namespace pvAccess { + +class ChannelPipelineMonitorImpl : + public Monitor, + public PipelineControl, + public std::tr1::enable_shared_from_this +{ + private: + + typedef vector FreeElementQueue; + typedef queue MonitorElementQueue; + + Channel::shared_pointer m_channel; + MonitorRequester::shared_pointer m_monitorRequester; + PipelineSession::shared_pointer m_pipelineSession; + + size_t m_queueSize; + + FreeElementQueue m_freeQueue; + MonitorElementQueue m_monitorQueue; + + Mutex m_freeQueueLock; + Mutex m_monitorQueueLock; + + AtomicBoolean m_active; + MonitorElement::shared_pointer m_nullMonitorElement; + + size_t m_requestedCount; + + public: + ChannelPipelineMonitorImpl( + Channel::shared_pointer const & channel, + MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest, + PipelineService::shared_pointer const & pipelineService) : + m_channel(channel), + m_monitorRequester(monitorRequester), + m_queueSize(2), + m_freeQueueLock(), + m_monitorQueueLock(), + m_active(), + m_requestedCount(0) + { + + m_pipelineSession = pipelineService->createPipeline(pvRequest); + + // extract queueSize parameter + PVFieldPtr pvField = pvRequest->getSubField("record._options"); + if (pvField.get()) { + PVStructurePtr pvOptions = static_pointer_cast(pvField); + pvField = pvOptions->getSubField("queueSize"); + if (pvField.get()) { + PVStringPtr pvString = pvOptions->getStringField("queueSize"); + if(pvString) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size < 2) size = 2; + m_queueSize = static_cast(size); + } + } + } + + // server queue size must be >= client queue size + size_t minQueueSize = m_pipelineSession->getMinQueueSize(); + if (m_queueSize < minQueueSize) + m_queueSize = minQueueSize; + + Structure::const_shared_pointer structure = m_pipelineSession->getStructure(); + + // create free elements + { + Lock guard(m_freeQueueLock); + m_freeQueue.reserve(m_queueSize); + for (int32 i = 0; i < m_queueSize; i++) + { + PVStructure::shared_pointer pvStructure = getPVDataCreate()->createPVStructure(structure); + MonitorElement::shared_pointer monitorElement(new MonitorElement(pvStructure)); + // we always send all + monitorElement->changedBitSet->set(0); + m_freeQueue.push_back(monitorElement); + } + } + } + + PipelineSession::shared_pointer getPipelineSession() const { + return m_pipelineSession; + } + + virtual ~ChannelPipelineMonitorImpl() + { + destroy(); + } + + virtual Status start() + { + // already started + if (m_active.get()) + return Status::Ok; + + m_active.set(); + + m_monitorQueueLock.lock(); + bool notify = (m_monitorQueue.size() != 0); + m_monitorQueueLock.unlock(); + + if (notify) + { + Monitor::shared_pointer thisPtr = shared_from_this(); + m_monitorRequester->monitorEvent(thisPtr); + } + + return Status::Ok; + } + + virtual Status stop() + { + m_active.clear(); + return Status::Ok; + } + + // get next free element + virtual MonitorElement::shared_pointer poll() + { + Lock guard(m_monitorQueueLock); + + // do not give send more elements than m_requestedCount + // even if m_monitorQueue is not empty + if (m_monitorQueue.empty() || m_requestedCount == 0) + return m_nullMonitorElement; + + MonitorElement::shared_pointer element = m_monitorQueue.front(); + m_monitorQueue.pop(); + + m_requestedCount--; + + return element; + } + + virtual void release(MonitorElement::shared_pointer const & monitorElement) + { + Lock guard(m_freeQueueLock); + m_freeQueue.push_back(monitorElement); + } + + virtual void reportRemoteQueueStatus(int32 freeElements) + { + // TODO check + size_t count = static_cast(freeElements); + + //std::cout << "reportRemoteQueueStatus(" << count << ')' << std::endl; + + bool notify = false; + { + Lock guard(m_monitorQueueLock); + m_requestedCount += count; + notify = (m_monitorQueue.size() != 0); + } + + // notify + // TODO too many notify calls? + if (notify) + { + Monitor::shared_pointer thisPtr = shared_from_this(); + m_monitorRequester->monitorEvent(thisPtr); + } + + m_pipelineSession->request(shared_from_this(), count); + } + + virtual void destroy() + { + stop(); + } + + virtual void lock() + { + // noop + } + + virtual void unlock() + { + // noop + } + + virtual size_t getFreeElementCount() { + Lock guard(m_freeQueueLock); + return m_freeQueue.size(); + } + + virtual size_t getRequestedCount() { + // TODO consider using atomic ops + Lock guard(m_monitorQueueLock); + return m_requestedCount; + } + + virtual epics::pvData::MonitorElement::shared_pointer getFreeElement() { + Lock guard(m_freeQueueLock); + if (m_freeQueue.empty()) + return m_nullMonitorElement; + + MonitorElement::shared_pointer freeElement = m_freeQueue.back(); + m_freeQueue.pop_back(); + + return freeElement; + } + + virtual void putElement(epics::pvData::MonitorElement::shared_pointer const & element) { + + bool notify = false; + { + Lock guard(m_monitorQueueLock); + m_monitorQueue.push(element); + // TODO there is way to much of notification, per each putElement + notify = (m_requestedCount != 0); + } + + // notify + if (notify) + { + Monitor::shared_pointer thisPtr = shared_from_this(); + m_monitorRequester->monitorEvent(thisPtr); + } + } + + virtual void done() { + // TODO + } + +}; + + +class PipelineChannel : + public Channel, + public std::tr1::enable_shared_from_this +{ +private: + + static Status notSupportedStatus; + static Status destroyedStatus; + + AtomicBoolean m_destroyed; + + ChannelProvider::shared_pointer m_provider; + string m_channelName; + ChannelRequester::shared_pointer m_channelRequester; + + PipelineService::shared_pointer m_pipelineService; + +public: + POINTER_DEFINITIONS(PipelineChannel); + + PipelineChannel( + ChannelProvider::shared_pointer const & provider, + string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + PipelineService::shared_pointer const & pipelineService) : + m_provider(provider), + m_channelName(channelName), + m_channelRequester(channelRequester), + m_pipelineService(pipelineService) + { + } + + virtual ~PipelineChannel() + { + destroy(); + } + + virtual std::tr1::shared_ptr getProvider() + { + return m_provider; + } + + virtual std::string getRemoteAddress() + { + // local + return getChannelName(); + } + + virtual ConnectionState getConnectionState() + { + return isConnected() ? + Channel::CONNECTED : + Channel::DESTROYED; + } + + virtual std::string getChannelName() + { + return m_channelName; + } + + virtual std::tr1::shared_ptr getChannelRequester() + { + return m_channelRequester; + } + + virtual bool isConnected() + { + return !m_destroyed.get(); + } + + + virtual AccessRights getAccessRights(epics::pvData::PVField::shared_pointer const & /*pvField*/) + { + return none; + } + + virtual void getField(GetFieldRequester::shared_pointer const & requester,std::string const & /*subField*/) + { + requester->getDone(notSupportedStatus, epics::pvData::Field::shared_pointer()); + } + + virtual ChannelProcess::shared_pointer createChannelProcess( + ChannelProcessRequester::shared_pointer const & channelProcessRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelProcess::shared_pointer nullPtr; + channelProcessRequester->channelProcessConnect(notSupportedStatus, nullPtr); + return nullPtr; + } + + virtual ChannelGet::shared_pointer createChannelGet( + ChannelGetRequester::shared_pointer const & channelGetRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelGet::shared_pointer nullPtr; + channelGetRequester->channelGetConnect(notSupportedStatus, nullPtr, + epics::pvData::Structure::const_shared_pointer()); + return nullPtr; + } + + virtual ChannelPut::shared_pointer createChannelPut( + ChannelPutRequester::shared_pointer const & channelPutRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelPut::shared_pointer nullPtr; + channelPutRequester->channelPutConnect(notSupportedStatus, nullPtr, + epics::pvData::Structure::const_shared_pointer()); + return nullPtr; + } + + + virtual ChannelPutGet::shared_pointer createChannelPutGet( + ChannelPutGetRequester::shared_pointer const & channelPutGetRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelPutGet::shared_pointer nullPtr; + epics::pvData::Structure::const_shared_pointer nullStructure; + channelPutGetRequester->channelPutGetConnect(notSupportedStatus, nullPtr, nullStructure, nullStructure); + return nullPtr; + } + + virtual ChannelRPC::shared_pointer createChannelRPC( + ChannelRPCRequester::shared_pointer const & channelRPCRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelRPC::shared_pointer nullPtr; + channelRPCRequester->channelRPCConnect(notSupportedStatus, nullPtr); + return nullPtr; + } + + virtual epics::pvData::Monitor::shared_pointer createMonitor( + epics::pvData::MonitorRequester::shared_pointer const & monitorRequester, + epics::pvData::PVStructure::shared_pointer const & pvRequest) + { + if (!pvRequest) + throw std::invalid_argument("pvRequest == null"); + + if (m_destroyed.get()) + { + Monitor::shared_pointer nullPtr; + epics::pvData::Structure::const_shared_pointer nullStructure; + monitorRequester->monitorConnect(destroyedStatus, nullPtr, nullStructure); + return nullPtr; + } + + // TODO use std::make_shared + std::tr1::shared_ptr tp( + new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService) + ); + Monitor::shared_pointer ChannelPipelineMonitorImpl = tp; + monitorRequester->monitorConnect(Status::Ok, ChannelPipelineMonitorImpl, tp->getPipelineSession()->getStructure()); + return ChannelPipelineMonitorImpl; + } + + virtual ChannelArray::shared_pointer createChannelArray( + ChannelArrayRequester::shared_pointer const & channelArrayRequester, + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/) + { + ChannelArray::shared_pointer nullPtr; + channelArrayRequester->channelArrayConnect(notSupportedStatus, nullPtr, epics::pvData::Array::const_shared_pointer()); + return nullPtr; + } + + + virtual void printInfo() + { + printInfo(std::cout); + } + + virtual void printInfo(std::ostream& out) + { + out << "PipelineChannel: "; + out << getChannelName(); + out << " ["; + out << Channel::ConnectionStateNames[getConnectionState()]; + out << "]"; + } + + virtual string getRequesterName() + { + return getChannelName(); + } + + virtual void message(std::string const & message,MessageType messageType) + { + // just delegate + m_channelRequester->message(message, messageType); + } + + virtual void destroy() + { + m_destroyed.set(); + } +}; + +Status PipelineChannel::notSupportedStatus(Status::STATUSTYPE_ERROR, "only monitor (aka pipeline) requests are supported by this channel"); +Status PipelineChannel::destroyedStatus(Status::STATUSTYPE_ERROR, "channel destroyed"); + +Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider, + std::string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + PipelineService::shared_pointer const & pipelineService) +{ + // TODO use std::make_shared + std::tr1::shared_ptr tp( + new PipelineChannel(provider, channelName, channelRequester, pipelineService) + ); + Channel::shared_pointer channel = tp; + return channel; +} + + +class PipelineChannelProvider : + public virtual ChannelProvider, + public virtual ChannelFind, + public std::tr1::enable_shared_from_this { + +public: + POINTER_DEFINITIONS(PipelineChannelProvider); + + static string PROVIDER_NAME; + + static Status noSuchChannelStatus; + + // TODO thread pool support + + PipelineChannelProvider() { + } + + virtual string getProviderName() { + return PROVIDER_NAME; + } + + virtual std::tr1::shared_ptr getChannelProvider() + { + return shared_from_this(); + } + + virtual void cancel() {} + + virtual void destroy() {} + + virtual ChannelFind::shared_pointer channelFind(std::string const & channelName, + ChannelFindRequester::shared_pointer const & channelFindRequester) + { + bool found; + { + Lock guard(m_mutex); + found = (m_services.find(channelName) != m_services.end()) || + findWildService(channelName); + } + ChannelFind::shared_pointer thisPtr(shared_from_this()); + channelFindRequester->channelFindResult(Status::Ok, thisPtr, found); + return thisPtr; + } + + + virtual ChannelFind::shared_pointer channelList( + ChannelListRequester::shared_pointer const & channelListRequester) + { + if (!channelListRequester.get()) + throw std::runtime_error("null requester"); + + PVStringArray::svector channelNames; + { + Lock guard(m_mutex); + channelNames.reserve(m_services.size()); + for (PipelineServiceMap::const_iterator iter = m_services.begin(); + iter != m_services.end(); + iter++) + channelNames.push_back(iter->first); + } + + ChannelFind::shared_pointer thisPtr(shared_from_this()); + channelListRequester->channelListResult(Status::Ok, thisPtr, freeze(channelNames), false); + return thisPtr; + } + + virtual Channel::shared_pointer createChannel( + std::string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + short /*priority*/) + { + PipelineService::shared_pointer service; + + PipelineServiceMap::const_iterator iter; + { + Lock guard(m_mutex); + iter = m_services.find(channelName); + } + if (iter != m_services.end()) + service = iter->second; + + // check for wild services + if (!service) + service = findWildService(channelName); + + if (!service) + { + Channel::shared_pointer nullChannel; + channelRequester->channelCreated(noSuchChannelStatus, nullChannel); + return nullChannel; + } + + // TODO use std::make_shared + std::tr1::shared_ptr tp( + new PipelineChannel( + shared_from_this(), + channelName, + channelRequester, + service)); + Channel::shared_pointer pipelineChannel = tp; + channelRequester->channelCreated(Status::Ok, pipelineChannel); + return pipelineChannel; + } + + virtual Channel::shared_pointer createChannel( + std::string const & /*channelName*/, + ChannelRequester::shared_pointer const & /*channelRequester*/, + short /*priority*/, + std::string const & /*address*/) + { + // this will never get called by the pvAccess server + throw std::runtime_error("not supported"); + } + + void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service) + { + Lock guard(m_mutex); + m_services[serviceName] = service; + + if (isWildcardPattern(serviceName)) + m_wildServices.push_back(std::make_pair(serviceName, service)); + } + + void unregisterService(std::string const & serviceName) + { + Lock guard(m_mutex); + m_services.erase(serviceName); + + if (isWildcardPattern(serviceName)) + { + for (PipelineWildServiceList::iterator iter = m_wildServices.begin(); + iter != m_wildServices.end(); + iter++) + if (iter->first == serviceName) + { + m_wildServices.erase(iter); + break; + } + } + } + +private: + // assumes sync on services + PipelineService::shared_pointer findWildService(string const & wildcard) + { + if (!m_wildServices.empty()) + for (PipelineWildServiceList::iterator iter = m_wildServices.begin(); + iter != m_wildServices.end(); + iter++) + if (Wildcard::wildcardfit(iter->first.c_str(), wildcard.c_str())) + return iter->second; + + return PipelineService::shared_pointer(); + } + + // (too) simple check + bool isWildcardPattern(string const & pattern) + { + return + (pattern.find('*') != string::npos || + pattern.find('?') != string::npos || + (pattern.find('[') != string::npos && pattern.find(']') != string::npos)); + } + + typedef std::map PipelineServiceMap; + PipelineServiceMap m_services; + + typedef std::vector > PipelineWildServiceList; + PipelineWildServiceList m_wildServices; + + epics::pvData::Mutex m_mutex; +}; + +string PipelineChannelProvider::PROVIDER_NAME("PipelineService"); +Status PipelineChannelProvider::noSuchChannelStatus(Status::STATUSTYPE_ERROR, "no such channel"); + + + +class PipelineChannelProviderFactory : public ChannelProviderFactory +{ +public: + POINTER_DEFINITIONS(PipelineChannelProviderFactory); + + PipelineChannelProviderFactory() : + m_channelProviderImpl(new PipelineChannelProvider()) + { + } + + virtual std::string getFactoryName() + { + return PipelineChannelProvider::PROVIDER_NAME; + } + + virtual ChannelProvider::shared_pointer sharedInstance() + { + return m_channelProviderImpl; + } + + virtual ChannelProvider::shared_pointer newInstance() + { + // TODO use std::make_shared + std::tr1::shared_ptr tp(new PipelineChannelProvider()); + ChannelProvider::shared_pointer channelProvider = tp; + return channelProvider; + } + +private: + PipelineChannelProvider::shared_pointer m_channelProviderImpl; +}; + + +PipelineServer::PipelineServer() +{ + // TODO factory is never deregistered, multiple PipelineServer instances create multiple factories, etc. + m_channelProviderFactory.reset(new PipelineChannelProviderFactory()); + registerChannelProviderFactory(m_channelProviderFactory); + + m_channelProviderImpl = m_channelProviderFactory->sharedInstance(); + + m_serverContext = ServerContextImpl::create(); + m_serverContext->setChannelProviderName(m_channelProviderImpl->getProviderName()); + + m_serverContext->initialize(getChannelProviderRegistry()); +} + +PipelineServer::~PipelineServer() +{ + // multiple destroy call is OK + destroy(); +} + +void PipelineServer::printInfo() +{ + std::cout << m_serverContext->getVersion().getVersionString() << std::endl; + m_serverContext->printInfo(); +} + +void PipelineServer::run(int seconds) +{ + m_serverContext->run(seconds); +} + +struct ThreadRunnerParam { + PipelineServer::shared_pointer server; + int timeToRun; +}; + +static void threadRunner(void* usr) +{ + ThreadRunnerParam* pusr = static_cast(usr); + ThreadRunnerParam param = *pusr; + delete pusr; + + param.server->run(param.timeToRun); +} + +/// Method requires usage of std::tr1::shared_ptr. This instance must be +/// owned by a shared_ptr instance. +void PipelineServer::runInNewThread(int seconds) +{ + std::auto_ptr param(new ThreadRunnerParam()); + param->server = shared_from_this(); + param->timeToRun = seconds; + + epicsThreadCreate("PipelineServer thread", + epicsThreadPriorityMedium, + epicsThreadGetStackSize(epicsThreadStackSmall), + threadRunner, param.get()); + + // let the thread delete 'param' + param.release(); +} + +void PipelineServer::destroy() +{ + m_serverContext->destroy(); +} + +void PipelineServer::registerService(std::string const & serviceName, PipelineService::shared_pointer const & service) +{ + std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->registerService(serviceName, service); +} + +void PipelineServer::unregisterService(std::string const & serviceName) +{ + std::tr1::dynamic_pointer_cast(m_channelProviderImpl)->unregisterService(serviceName); +} + +}} diff --git a/src/pipelineService/pipelineServer.h b/src/pipelineService/pipelineServer.h new file mode 100644 index 0000000..222afea --- /dev/null +++ b/src/pipelineService/pipelineServer.h @@ -0,0 +1,74 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef PIPELINESERVER_H +#define PIPELINESERVER_H + +#ifdef epicsExportSharedSymbols +# define pipelineServerEpicsExportSharedSymbols +# undef epicsExportSharedSymbols +#endif + +#include + +#ifdef pipelineServerEpicsExportSharedSymbols +# define epicsExportSharedSymbols +# undef pipelineServerEpicsExportSharedSymbols +#endif + +#include +#include +#include + +#include + +namespace epics { namespace pvAccess { + +class epicsShareClass PipelineServer : + public std::tr1::enable_shared_from_this +{ + private: + + ServerContextImpl::shared_pointer m_serverContext; + ChannelProviderFactory::shared_pointer m_channelProviderFactory; + ChannelProvider::shared_pointer m_channelProviderImpl; + + // TODO no thread poll implementation + + public: + POINTER_DEFINITIONS(PipelineServer); + + PipelineServer(); + + virtual ~PipelineServer(); + + void registerService(std::string const & serviceName, PipelineService::shared_pointer const & service); + + void unregisterService(std::string const & serviceName); + + void run(int seconds = 0); + + /// Method requires usage of std::tr1::shared_ptr. This instance must be + /// owned by a shared_ptr instance. + void runInNewThread(int seconds = 0); + + void destroy(); + + /** + * Display basic information about the context. + */ + void printInfo(); + +}; + +epicsShareExtern Channel::shared_pointer createPipelineChannel(ChannelProvider::shared_pointer const & provider, + std::string const & channelName, + ChannelRequester::shared_pointer const & channelRequester, + PipelineService::shared_pointer const & pipelineService); + +}} + +#endif /* PIPELINESERVER_H */ diff --git a/src/pipelineService/pipelineService.cpp b/src/pipelineService/pipelineService.cpp new file mode 100644 index 0000000..3ae6938 --- /dev/null +++ b/src/pipelineService/pipelineService.cpp @@ -0,0 +1,8 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#define epicsExportSharedSymbols +#include diff --git a/src/pipelineService/pipelineService.h b/src/pipelineService/pipelineService.h new file mode 100644 index 0000000..6e4a394 --- /dev/null +++ b/src/pipelineService/pipelineService.h @@ -0,0 +1,101 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#ifndef PIPELINESERVICE_H +#define PIPELINESERVICE_H + +#include + +#ifdef epicsExportSharedSymbols +# define pipelineServiceEpicsExportSharedSymbols +# undef epicsExportSharedSymbols +#endif + +#include +#include + +#ifdef pipelineServiceEpicsExportSharedSymbols +# define epicsExportSharedSymbols +# undef pipelineServiceEpicsExportSharedSymbols +#endif + +#include + +#include + +namespace epics { namespace pvAccess { + +class epicsShareClass PipelineControl +{ +public: + POINTER_DEFINITIONS(PipelineControl); + + virtual ~PipelineControl() {}; + + /// Number of free elements in the local queue. + /// A service can (should) full up the entire queue. + virtual size_t getFreeElementCount() = 0; + + /// Total count of requested elements. + /// This is the minimum element count that a service should provide. + virtual size_t getRequestedCount() = 0; + + /// Grab next free element. + /// A service should take this element, populate it with the data + /// and return it back by calling putElement(). + virtual epics::pvData::MonitorElement::shared_pointer getFreeElement() = 0; + + /// Put element on the local queue (an element to be sent to a client). + virtual void putElement(epics::pvData::MonitorElement::shared_pointer const & element) = 0; + + /// Call to notify that there is no more data to pipelined. + /// This call destroyes corresponding pipeline session. + virtual void done() = 0; + +}; + + +class epicsShareClass PipelineSession +{ +public: + POINTER_DEFINITIONS(PipelineSession); + + virtual ~PipelineSession() {}; + + /// Returns (minimum) local queue size. + /// Actual local queue size = max( getMinQueueSize(), client queue size ); + virtual size_t getMinQueueSize() const = 0; + + /// Description of the structure used by this session. + virtual epics::pvData::Structure::const_shared_pointer getStructure() const = 0; + + /// Request for additional (!) elementCount elements. + /// The service should eventually call PipelineControl.getFreeElement() and PipelineControl.putElement() + /// to provide [PipelineControl.getRequestedCount(), PipelineControl.getFreeElementCount()] elements. + virtual void request(PipelineControl::shared_pointer const & control, size_t elementCount) = 0; + + /// Cancel the session (called by the client). + virtual void cancel() = 0; +}; + + +class epicsShareClass PipelineService +{ +public: + POINTER_DEFINITIONS(PipelineService); + + virtual ~PipelineService() {}; + + virtual PipelineSession::shared_pointer createPipeline( + epics::pvData::PVStructure::shared_pointer const & pvRequest + ) = 0; + +}; + + +}} + +#endif /* PIPELINESERVICE_H */ diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 27e2ac6..651d514 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2061,16 +2061,17 @@ namespace epics { class MonitorStrategy : public Monitor { public: virtual ~MonitorStrategy() {}; - virtual void init(StructureConstPtr const & structure) = 0; - virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; - }; - + virtual void init(StructureConstPtr const & structure) = 0; + virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; + }; + typedef vector FreeElementQueue; typedef queue MonitorElementQueue; class MonitorStrategyQueue : public MonitorStrategy, + public TransportSender, public std::tr1::enable_shared_from_this { private: @@ -2096,16 +2097,26 @@ namespace epics { PVStructure::shared_pointer m_up2datePVStructure; - public: + int32 m_releasedCount; + bool m_reportQueueStateInProgress; - MonitorStrategyQueue(MonitorRequester::shared_pointer const & callback, int32 queueSize) : + // TODO check for cyclic-ref + ChannelImpl::shared_pointer m_channel; + pvAccessID m_ioid; + public: + + MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid, + MonitorRequester::shared_pointer const & callback, int32 queueSize) : m_queueSize(queueSize), m_lastStructure(), m_freeQueue(), m_monitorQueue(), m_callback(callback), m_mutex(), m_bitSet1(), m_bitSet2(), m_overrunInProgress(false), - m_nullMonitorElement() - { + m_nullMonitorElement(), + m_releasedCount(0), + m_reportQueueStateInProgress(false), + m_channel(channel), m_ioid(ioid) + { if (queueSize <= 1) throw std::invalid_argument("queueSize <= 1"); @@ -2121,6 +2132,9 @@ namespace epics { virtual void init(StructureConstPtr const & structure) { Lock guard(m_mutex); + m_releasedCount = 0; + m_reportQueueStateInProgress = false; + // reuse on reconnect if (m_lastStructure.get() == 0 || *(m_lastStructure.get()) == *(structure.get())) @@ -2318,6 +2332,8 @@ namespace epics { // NOTE: a client must always call poll() after release() to check the presence of any new monitor elements virtual void release(MonitorElement::shared_pointer const & monitorElement) { + bool sendAck = false; + { Lock guard(m_mutex); m_freeQueue.push_back(monitorElement); @@ -2334,6 +2350,58 @@ namespace epics { m_overrunElement.reset(); m_overrunInProgress = false; } + + m_releasedCount++; + // TODO limit reporting back? + if (!m_reportQueueStateInProgress) + { + sendAck = true; + m_reportQueueStateInProgress = true; + } + } + + if (sendAck) + { + try + { + m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + } catch (...) { + // noop (do not complain if fails) + m_reportQueueStateInProgress = false; + } + } + } + + virtual void reportRemoteQueueStatus(int32 /*freeElements*/) + { + // noop for the client + } + + virtual void send(ByteBuffer* buffer, TransportSendControl* control) { + control->startMessage((int8)CMD_MONITOR, 9); + buffer->putInt(m_channel->getServerChannelID()); + buffer->putInt(m_ioid); + buffer->putByte((int8)QOS_GET_PUT); + + { + Lock guard(m_mutex); + buffer->putInt(m_releasedCount); + m_releasedCount = 0; + m_reportQueueStateInProgress = false; + } + + // immediate send + control->flush(true); + } + + virtual void lock() + { + // noop + } + + virtual void unlock() + { + // noop } Status start() { @@ -2377,6 +2445,9 @@ namespace epics { std::tr1::shared_ptr m_monitorStrategy; + int32 m_queueSize; + bool m_pipeline; + ChannelMonitorImpl( ChannelImpl::shared_pointer const & channel, MonitorRequester::shared_pointer const & monitorRequester, @@ -2385,7 +2456,9 @@ namespace epics { BaseRequestImpl(channel, monitorRequester), m_monitorRequester(monitorRequester), m_started(false), - m_pvRequest(pvRequest) + m_pvRequest(pvRequest), + m_queueSize(0), + m_pipeline(false) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor); } @@ -2399,7 +2472,7 @@ namespace epics { return; } - int queueSize = 2; + m_queueSize = 2; PVFieldPtr pvField = m_pvRequest->getSubField("record._options"); if (pvField.get()) { PVStructurePtr pvOptions = static_pointer_cast(pvField); @@ -2411,15 +2484,18 @@ namespace epics { std::stringstream ss; ss << pvString->get(); ss >> size; - queueSize = size; + m_queueSize = size; } } + PVStringPtr pvString = pvOptions->getSubField("pipeline"); + if (pvString) + m_pipeline = (pvString->get() == "true"); } - - BaseRequestImpl::activate(); - if (queueSize<2) queueSize = 2; - std::tr1::shared_ptr tp(new MonitorStrategyQueue(m_monitorRequester, queueSize)); + BaseRequestImpl::activate(); + + if (m_queueSize < 2) m_queueSize = 2; + std::tr1::shared_ptr tp(new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize)); m_monitorStrategy = tp; // subscribe @@ -2432,6 +2508,16 @@ namespace epics { } } + // override default impl. to provide pipeline QoS flag + virtual void resubscribeSubscription(Transport::shared_pointer const & transport) { + if (transport.get() != 0 && !m_subscribed.get() && + startRequest(m_pipeline ? (QOS_INIT | QOS_GET_PUT) : QOS_INIT)) + { + m_subscribed.set(); + transport->enqueueSendRequest(shared_from_this()); + } + } + public: static Monitor::shared_pointer create( ChannelImpl::shared_pointer const & channel, @@ -2471,6 +2557,12 @@ namespace epics { { // pvRequest SerializationHelper::serializePVRequest(buffer, control, m_pvRequest); + + // if streaming + if (pendingRequest & QOS_GET_PUT) + { + buffer->putInt(m_queueSize); + } } stopRequest(); @@ -2629,6 +2721,11 @@ namespace epics { m_monitorStrategy->release(monitorElement); } + virtual void reportRemoteQueueStatus(int32 freeElements) + { + m_monitorStrategy->reportRemoteQueueStatus(freeElements); + } + virtual void lock() { // noop diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index 72df798..f5a758e 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1875,20 +1875,39 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, // create... ServerMonitorRequesterImpl::create(_context, channel, ioid, transport, pvRequest); + + // pipelining monitor (i.e. w/ flow control) + const bool ack = (QOS_GET_PUT & qosCode) != 0; + if (ack) + { + int32 nfree = payloadBuffer->getInt(); + ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); + request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + } + } else { - const bool lastRequest = (QOS_DESTROY & qosCode) != 0; - const bool get = (QOS_GET & qosCode) != 0; - const bool process = (QOS_PROCESS & qosCode) != 0; + const bool lastRequest = (QOS_DESTROY & qosCode) != 0; + const bool get = (QOS_GET & qosCode) != 0; + const bool process = (QOS_PROCESS & qosCode) != 0; + const bool ack = (QOS_GET_PUT & qosCode) != 0; - ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); + ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); if (!request.get()) { BaseChannelRequester::sendFailureMessage((int8)CMD_MONITOR, transport, ioid, qosCode, BaseChannelRequester::badIOIDStatus); return; } + if (ack) + { + int32 nfree = payloadBuffer->getInt(); + request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + return; + // note: not possible to ack and destroy + } + /* if (!request->startRequest(qosCode)) { @@ -1946,7 +1965,7 @@ MonitorRequester::shared_pointer ServerMonitorRequesterImpl::create( void ServerMonitorRequesterImpl::activate(PVStructure::shared_pointer const & pvRequest) { - startRequest(QOS_INIT); + startRequest(QOS_INIT); MonitorRequester::shared_pointer thisPointer = shared_from_this(); Destroyable::shared_pointer thisDestroyable = shared_from_this(); _channel->registerRequest(_ioid, thisDestroyable); diff --git a/testApp/remote/Makefile b/testApp/remote/Makefile index 9ef227e..a161fab 100644 --- a/testApp/remote/Makefile +++ b/testApp/remote/Makefile @@ -44,3 +44,7 @@ rpcWildServiceExample_SRCS += rpcWildServiceExample.cpp PROD_HOST += rpcClientExample rpcClientExample_SRCS += rpcClientExample.cpp + +PROD_HOST += pipelineServiceExample +pipelineServiceExample_SRCS += pipelineServiceExample.cpp + diff --git a/testApp/remote/pipelineServiceExample.cpp b/testApp/remote/pipelineServiceExample.cpp new file mode 100644 index 0000000..1fcfe21 --- /dev/null +++ b/testApp/remote/pipelineServiceExample.cpp @@ -0,0 +1,81 @@ +/** + * Copyright - See the COPYRIGHT that is included with this distribution. + * pvAccessCPP is distributed subject to a Software License Agreement found + * in file LICENSE that is included with this distribution. + */ + +#include +#include + +using namespace epics::pvData; +using namespace epics::pvAccess; + +static Structure::const_shared_pointer dataStructure = + getFieldCreate()->createFieldBuilder()-> + add("count", pvInt)-> + createStructure(); + +class PipelineSessionImpl : + public PipelineSession +{ +public: + + PipelineSessionImpl( + epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/ + ) : + m_counter(0) + { + } + + size_t getMinQueueSize() const { + return 16; //1024; + } + + Structure::const_shared_pointer getStructure() const { + return dataStructure; + } + + virtual void request(PipelineControl::shared_pointer const & control, size_t elementCount) { + // blocking in this call is not a good thing + // but generating a simple counter data is fast + // we will generate as much elements as we can + size_t count = control->getFreeElementCount(); + for (size_t i = 0; i < count; i++) { + MonitorElement::shared_pointer element = control->getFreeElement(); + element->pvStructurePtr->getSubField(1 /*"count"*/)->put(m_counter++); + control->putElement(element); + } + } + + virtual void cancel() { + // noop, no need to clean any data-source resources + } + +private: + // NOTE: all the request calls will be made from the same thread, so we do not need sync m_counter + int32 m_counter; +}; + +class PipelineServiceImpl : + public PipelineService +{ + PipelineSession::shared_pointer createPipeline( + epics::pvData::PVStructure::shared_pointer const & pvRequest + ) + { + return PipelineSession::shared_pointer(new PipelineSessionImpl(pvRequest)); + } +}; + +int main() +{ + PipelineServer server; + + server.registerService("counterPipe", PipelineService::shared_pointer(new PipelineServiceImpl())); + // you can register as many services as you want here ... + + server.printInfo(); + server.run(); + + return 0; +} From b381d84e645ee38fe52e98ff8b4dd05303284d26 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Wed, 6 May 2015 11:08:23 +0200 Subject: [PATCH 2/3] cancel(), done(), ackAny param --- .gitignore | 4 + pvtoolsSrc/eget.cpp | 6 +- src/pipelineService/pipelineServer.cpp | 126 ++++++++++++++------ src/remoteClient/clientContextImpl.cpp | 133 ++++++++++++++++------ src/server/responseHandlers.cpp | 26 ++++- src/server/responseHandlers.h | 1 + testApp/remote/pipelineServiceExample.cpp | 23 +++- 7 files changed, 245 insertions(+), 74 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..609e229 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +bin +lib +include +dbd diff --git a/pvtoolsSrc/eget.cpp b/pvtoolsSrc/eget.cpp index fc5ae1d..53a6770 100644 --- a/pvtoolsSrc/eget.cpp +++ b/pvtoolsSrc/eget.cpp @@ -1422,7 +1422,9 @@ class MonitorRequesterImpl : public MonitorRequester virtual void unlisten(Monitor::shared_pointer const & /*monitor*/) { - std::cerr << "unlisten" << std::endl; + //std::cerr << "unlisten" << std::endl; + // TODO + epicsExit(0); } }; @@ -1747,7 +1749,7 @@ int main (int argc, char *argv[]) fprintf(stderr, "failed to parse request string\n"); return 1; } - + // register "pva" and "ca" providers ClientFactory::start(); epics::pvAccess::ca::CAClientFactory::start(); diff --git a/src/pipelineService/pipelineServer.cpp b/src/pipelineService/pipelineServer.cpp index 94caf92..ca88a29 100644 --- a/src/pipelineService/pipelineServer.cpp +++ b/src/pipelineService/pipelineServer.cpp @@ -40,11 +40,17 @@ class ChannelPipelineMonitorImpl : Mutex m_freeQueueLock; Mutex m_monitorQueueLock; - AtomicBoolean m_active; + bool m_active; MonitorElement::shared_pointer m_nullMonitorElement; size_t m_requestedCount; + bool m_pipeline; + + bool m_done; + + bool m_unlistenReported; + public: ChannelPipelineMonitorImpl( Channel::shared_pointer const & channel, @@ -56,28 +62,30 @@ class ChannelPipelineMonitorImpl : m_queueSize(2), m_freeQueueLock(), m_monitorQueueLock(), - m_active(), - m_requestedCount(0) + m_active(false), + m_requestedCount(0), + m_pipeline(false), + m_done(false), + m_unlistenReported(false) { m_pipelineSession = pipelineService->createPipeline(pvRequest); - // extract queueSize parameter - PVFieldPtr pvField = pvRequest->getSubField("record._options"); - if (pvField.get()) { - PVStructurePtr pvOptions = static_pointer_cast(pvField); - pvField = pvOptions->getSubField("queueSize"); - if (pvField.get()) { - PVStringPtr pvString = pvOptions->getStringField("queueSize"); - if(pvString) { - int32 size; - std::stringstream ss; - ss << pvString->get(); - ss >> size; - if (size < 2) size = 2; + // extract queueSize and pipeline parameter + PVStructurePtr pvOptions = pvRequest->getSubField("record._options"); + if (pvOptions) { + PVStringPtr pvString = pvOptions->getSubField("queueSize"); + if (pvString) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size > 1) m_queueSize = static_cast(size); - } } + pvString = pvOptions->getSubField("pipeline"); + if (pvString) + m_pipeline = (pvString->get() == "true"); } // server queue size must be >= client queue size @@ -106,6 +114,10 @@ class ChannelPipelineMonitorImpl : return m_pipelineSession; } + bool isPipelineEnabled() const { + return m_pipeline; + } + virtual ~ChannelPipelineMonitorImpl() { destroy(); @@ -113,15 +125,17 @@ class ChannelPipelineMonitorImpl : virtual Status start() { - // already started - if (m_active.get()) - return Status::Ok; + bool notify = false; + { + Lock guard(m_monitorQueueLock); - m_active.set(); + // already started + if (m_active) + return Status::Ok; + m_active = true; - m_monitorQueueLock.lock(); - bool notify = (m_monitorQueue.size() != 0); - m_monitorQueueLock.unlock(); + notify = (m_monitorQueue.size() != 0); + } if (notify) { @@ -134,7 +148,8 @@ class ChannelPipelineMonitorImpl : virtual Status stop() { - m_active.clear(); + Lock guard(m_monitorQueueLock); + m_active = false; return Status::Ok; } @@ -145,8 +160,19 @@ class ChannelPipelineMonitorImpl : // do not give send more elements than m_requestedCount // even if m_monitorQueue is not empty - if (m_monitorQueue.empty() || m_requestedCount == 0) + bool emptyQueue = m_monitorQueue.empty(); + if (emptyQueue || m_requestedCount == 0 || !m_active) + { + // report "unlisten" event if queue empty and done, release lock first + if (!m_unlistenReported && m_done && emptyQueue) + { + m_unlistenReported = true; + guard.unlock(); + m_monitorRequester->unlisten(shared_from_this()); + } + return m_nullMonitorElement; + } MonitorElement::shared_pointer element = m_monitorQueue.front(); m_monitorQueue.pop(); @@ -173,7 +199,7 @@ class ChannelPipelineMonitorImpl : { Lock guard(m_monitorQueueLock); m_requestedCount += count; - notify = (m_monitorQueue.size() != 0); + notify = m_active && (m_monitorQueue.size() != 0); } // notify @@ -189,7 +215,17 @@ class ChannelPipelineMonitorImpl : virtual void destroy() { - stop(); + bool notifyCancel = false; + + { + Lock guard(m_monitorQueueLock); + m_active = false; + notifyCancel = !m_done; + m_done = true; + } + + if (notifyCancel) + m_pipelineSession->cancel(); } virtual void lock() @@ -229,6 +265,10 @@ class ChannelPipelineMonitorImpl : bool notify = false; { Lock guard(m_monitorQueueLock); + if (m_done) + return; + // throw std::logic_error("putElement called after done"); + m_monitorQueue.push(element); // TODO there is way to much of notification, per each putElement notify = (m_requestedCount != 0); @@ -243,7 +283,17 @@ class ChannelPipelineMonitorImpl : } virtual void done() { - // TODO + Lock guard(m_monitorQueueLock); + m_done = true; + + bool report = !m_unlistenReported && m_monitorQueue.empty(); + if (report) + m_unlistenReported = true; + + guard.unlock(); + + if (report) + m_monitorRequester->unlisten(shared_from_this()); } }; @@ -398,9 +448,21 @@ public: std::tr1::shared_ptr tp( new ChannelPipelineMonitorImpl(shared_from_this(), monitorRequester, pvRequest, m_pipelineService) ); - Monitor::shared_pointer ChannelPipelineMonitorImpl = tp; - monitorRequester->monitorConnect(Status::Ok, ChannelPipelineMonitorImpl, tp->getPipelineSession()->getStructure()); - return ChannelPipelineMonitorImpl; + Monitor::shared_pointer channelPipelineMonitorImpl = tp; + + if (tp->isPipelineEnabled()) + { + monitorRequester->monitorConnect(Status::Ok, channelPipelineMonitorImpl, tp->getPipelineSession()->getStructure()); + return channelPipelineMonitorImpl; + } + else + { + Monitor::shared_pointer nullPtr; + epics::pvData::Structure::const_shared_pointer nullStructure; + Status noPipelineEnabledStatus(Status::STATUSTYPE_ERROR, "pipeline option not enabled, use e.g. 'record[queueSize=16,pipeline=true]field(value)' pvRequest to enable pipelining"); + monitorRequester->monitorConnect(noPipelineEnabledStatus, nullPtr, nullStructure); + return nullPtr; + } } virtual ChannelArray::shared_pointer createChannelArray( diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 651d514..bc3cf5b 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2063,6 +2063,7 @@ namespace epics { virtual ~MonitorStrategy() {}; virtual void init(StructureConstPtr const & structure) = 0; virtual void response(Transport::shared_pointer const & transport, ByteBuffer* payloadBuffer) = 0; + virtual void unlisten() = 0; }; typedef vector FreeElementQueue; @@ -2103,10 +2104,18 @@ namespace epics { // TODO check for cyclic-ref ChannelImpl::shared_pointer m_channel; pvAccessID m_ioid; + + bool m_pipeline; + int32 m_ackAny; + + bool m_unlisten; + public: MonitorStrategyQueue(ChannelImpl::shared_pointer channel, pvAccessID ioid, - MonitorRequester::shared_pointer const & callback, int32 queueSize) : + MonitorRequester::shared_pointer const & callback, + int32 queueSize, + bool pipeline, int32 ackAny) : m_queueSize(queueSize), m_lastStructure(), m_freeQueue(), m_monitorQueue(), @@ -2115,7 +2124,9 @@ namespace epics { m_nullMonitorElement(), m_releasedCount(0), m_reportQueueStateInProgress(false), - m_channel(channel), m_ioid(ioid) + m_channel(channel), m_ioid(ioid), + m_pipeline(pipeline), m_ackAny(ackAny), + m_unlisten(false) { if (queueSize <= 1) throw std::invalid_argument("queueSize <= 1"); @@ -2318,12 +2329,33 @@ namespace epics { } } + virtual void unlisten() + { + bool notifyUnlisten = false; + { + Lock guard(m_mutex); + notifyUnlisten = m_monitorQueue.empty(); + m_unlisten = !notifyUnlisten; + } + + if (notifyUnlisten) + { + EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + } + } virtual MonitorElement::shared_pointer poll() { Lock guard(m_mutex); - if (m_monitorQueue.empty()) + if (m_monitorQueue.empty()) { + + if (m_unlisten) { + m_unlisten = false; + guard.unlock(); + EXCEPTION_GUARD(m_callback->unlisten(shared_from_this())); + } return m_nullMonitorElement; + } MonitorElement::shared_pointer retVal = m_monitorQueue.front(); m_monitorQueue.pop(); @@ -2351,25 +2383,27 @@ namespace epics { m_overrunInProgress = false; } - m_releasedCount++; - // TODO limit reporting back? - if (!m_reportQueueStateInProgress) + if (m_pipeline) { - sendAck = true; - m_reportQueueStateInProgress = true; - } - } - - if (sendAck) - { - try - { - m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); - } catch (...) { - // noop (do not complain if fails) - m_reportQueueStateInProgress = false; + m_releasedCount++; + if (!m_reportQueueStateInProgress && m_releasedCount >= m_ackAny) + { + sendAck = true; + m_reportQueueStateInProgress = true; + } } - } + + if (sendAck) + { + try + { + m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); + } catch (...) { + // noop (do not complain if fails) + m_reportQueueStateInProgress = false; + } + } + } } virtual void reportRemoteQueueStatus(int32 /*freeElements*/) @@ -2447,6 +2481,7 @@ namespace epics { int32 m_queueSize; bool m_pipeline; + int32 m_ackAny; ChannelMonitorImpl( ChannelImpl::shared_pointer const & channel, @@ -2457,8 +2492,9 @@ namespace epics { m_monitorRequester(monitorRequester), m_started(false), m_pvRequest(pvRequest), - m_queueSize(0), - m_pipeline(false) + m_queueSize(2), + m_pipeline(false), + m_ackAny(0) { PVACCESS_REFCOUNT_MONITOR_CONSTRUCT(channelMonitor); } @@ -2472,30 +2508,45 @@ namespace epics { return; } - m_queueSize = 2; - PVFieldPtr pvField = m_pvRequest->getSubField("record._options"); - if (pvField.get()) { - PVStructurePtr pvOptions = static_pointer_cast(pvField); - pvField = pvOptions->getSubField("queueSize"); - if (pvField.get()) { - PVStringPtr pvString = pvOptions->getStringField("queueSize"); - if(pvString) { + PVStructurePtr pvOptions = m_pvRequest->getSubField("record._options"); + if (pvOptions) { + PVStringPtr pvString = pvOptions->getSubField("queueSize"); + if (pvString) { + int32 size; + std::stringstream ss; + ss << pvString->get(); + ss >> size; + if (size > 1) + m_queueSize = size; + } + pvString = pvOptions->getSubField("pipeline"); + if (pvString) + m_pipeline = (pvString->get() == "true"); + + // pipeline options + if (m_pipeline) + { + // defaults to queueSize/2 + m_ackAny = m_queueSize/2; + + pvString = pvOptions->getSubField("ackAny"); + if (pvString) { int32 size; std::stringstream ss; ss << pvString->get(); ss >> size; - m_queueSize = size; + if (size > 0) + m_ackAny = size; } } - PVStringPtr pvString = pvOptions->getSubField("pipeline"); - if (pvString) - m_pipeline = (pvString->get() == "true"); } BaseRequestImpl::activate(); - if (m_queueSize < 2) m_queueSize = 2; - std::tr1::shared_ptr tp(new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize)); + std::tr1::shared_ptr tp( + new MonitorStrategyQueue(m_channel, m_ioid, m_monitorRequester, m_queueSize, + m_pipeline, m_ackAny) + ); m_monitorStrategy = tp; // subscribe @@ -2609,6 +2660,16 @@ namespace epics { { // TODO not supported by IF yet... } + else if (qos & QOS_DESTROY) + { + // TODO for now status is ignored + + if (payloadBuffer->getRemaining()) + m_monitorStrategy->response(transport, payloadBuffer); + + // unlisten will be called when all the elements in the queue gets processed + m_monitorStrategy->unlisten(); + } else { m_monitorStrategy->response(transport, payloadBuffer); diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index f5a758e..8773b5e 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1948,7 +1948,7 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, ServerMonitorRequesterImpl::ServerMonitorRequesterImpl( ServerContextImpl::shared_pointer const & context, ServerChannelImpl::shared_pointer const & channel, const pvAccessID ioid, Transport::shared_pointer const & transport): - BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure() + BaseChannelRequester(context, channel, ioid, transport), _channelMonitor(), _structure(), _unlisten(false) { } @@ -1992,7 +1992,12 @@ void ServerMonitorRequesterImpl::monitorConnect(const Status& status, Monitor::s void ServerMonitorRequesterImpl::unlisten(Monitor::shared_pointer const & /*monitor*/) { - //TODO + { + Lock guard(_mutex); + _unlisten = true; + } + TransportSender::shared_pointer thisSender = shared_from_this(); + _transport->enqueueSendRequest(thisSender); } void ServerMonitorRequesterImpl::monitorEvent(Monitor::shared_pointer const & /*monitor*/) @@ -2110,6 +2115,23 @@ void ServerMonitorRequesterImpl::send(ByteBuffer* buffer, TransportSendControl* TransportSender::shared_pointer thisSender = shared_from_this(); _transport->enqueueSendRequest(thisSender); } + else + { + // TODO CAS + bool unlisten; + Lock guard(_mutex); + unlisten = _unlisten; + _unlisten = false; + guard.unlock(); + + if (unlisten) + { + control->startMessage((int8)CMD_MONITOR, sizeof(int32)/sizeof(int8) + 1); + buffer->putInt(_ioid); + buffer->putByte((int8)QOS_DESTROY); + Status::Ok.serialize(buffer, control); + } + } } } diff --git a/src/server/responseHandlers.h b/src/server/responseHandlers.h index 9309670..fa8eef7 100644 --- a/src/server/responseHandlers.h +++ b/src/server/responseHandlers.h @@ -542,6 +542,7 @@ namespace pvAccess { epics::pvData::Monitor::shared_pointer _channelMonitor; epics::pvData::StructureConstPtr _structure; epics::pvData::Status _status; + bool _unlisten; }; diff --git a/testApp/remote/pipelineServiceExample.cpp b/testApp/remote/pipelineServiceExample.cpp index 1fcfe21..547b24a 100644 --- a/testApp/remote/pipelineServiceExample.cpp +++ b/testApp/remote/pipelineServiceExample.cpp @@ -21,10 +21,21 @@ class PipelineSessionImpl : public: PipelineSessionImpl( - epics::pvData::PVStructure::shared_pointer const & /*pvRequest*/ + epics::pvData::PVStructure::shared_pointer const & pvRequest ) : - m_counter(0) + m_counter(0), + m_max(0) { + PVStructure::shared_pointer pvOptions = pvRequest->getSubField("record._options"); + if (pvOptions) { + PVString::shared_pointer pvString = pvOptions->getSubField("limit"); + if (pvString) + { + // note: this throws an exception if conversion fails + m_max = pvString->getAs(); + } + } + } size_t getMinQueueSize() const { @@ -44,6 +55,13 @@ public: MonitorElement::shared_pointer element = control->getFreeElement(); element->pvStructurePtr->getSubField(1 /*"count"*/)->put(m_counter++); control->putElement(element); + + // we reached the limit, no more data + if (m_max != 0 && m_counter == m_max) + { + control->done(); + break; + } } } @@ -54,6 +72,7 @@ public: private: // NOTE: all the request calls will be made from the same thread, so we do not need sync m_counter int32 m_counter; + int32 m_max; }; class PipelineServiceImpl : From 5884d5b778e0cdd0e49a8dcc88575df85c5d78d1 Mon Sep 17 00:00:00 2001 From: Matej Sekoranja Date: Thu, 15 Oct 2015 20:59:15 +0200 Subject: [PATCH 3/3] PipelineMonitor instead of epics::pvData::Monitor --- src/ca/caChannel.cpp | 6 ----- src/ca/caChannel.h | 1 - src/client/pvAccess.h | 17 ++++++++++++++ src/mb/pvAccessMB.h | 31 ++++++++++++++++++++++++++ src/pipelineService/pipelineServer.cpp | 2 +- src/remoteClient/clientContextImpl.cpp | 10 --------- src/server/responseHandlers.cpp | 11 +++++++-- 7 files changed, 58 insertions(+), 20 deletions(-) diff --git a/src/ca/caChannel.cpp b/src/ca/caChannel.cpp index f578de8..4cfeea2 100644 --- a/src/ca/caChannel.cpp +++ b/src/ca/caChannel.cpp @@ -1444,12 +1444,6 @@ void CAChannelMonitor::release(epics::pvData::MonitorElementPtr const & /*monito // noop } -void CAChannelMonitor::reportRemoteQueueStatus(int32 /*freeElements*/) -{ - // noop -} - - /* --------------- epics::pvData::ChannelRequest --------------- */ void CAChannelMonitor::cancel() diff --git a/src/ca/caChannel.h b/src/ca/caChannel.h index 7111932..ebeb437 100644 --- a/src/ca/caChannel.h +++ b/src/ca/caChannel.h @@ -264,7 +264,6 @@ public: virtual epics::pvData::Status stop(); virtual epics::pvData::MonitorElementPtr poll(); virtual void release(epics::pvData::MonitorElementPtr const & monitorElement); - virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements); /* --------------- epics::pvData::ChannelRequest --------------- */ diff --git a/src/client/pvAccess.h b/src/client/pvAccess.h index 4347566..3b0b1bb 100644 --- a/src/client/pvAccess.h +++ b/src/client/pvAccess.h @@ -941,6 +941,23 @@ namespace pvAccess { epicsShareExtern void unregisterChannelProviderFactory(ChannelProviderFactory::shared_pointer const & channelProviderFactory); + /** + * @brief Pipeline (streaming) support API (optional). + * This is used by pvAccess to implement pipeline (streaming) monitors. + */ + class epicsShareClass PipelineMonitor : public virtual epics::pvData::Monitor { + public: + POINTER_DEFINITIONS(PipelineMonitor); + virtual ~PipelineMonitor(){} + + /** + * Report remote queue status. + * @param freeElements number of free elements. + */ + virtual void reportRemoteQueueStatus(epics::pvData::int32 freeElements) = 0; + }; + + }} #endif /* PVACCESS_H */ diff --git a/src/mb/pvAccessMB.h b/src/mb/pvAccessMB.h index 40f4463..9bc5a1a 100644 --- a/src/mb/pvAccessMB.h +++ b/src/mb/pvAccessMB.h @@ -6,8 +6,39 @@ # undef epicsExportSharedSymbols #endif +#ifdef WITH_MICROBENCH + #include +#else + +#define MB_DECLARE(NAME, SIZE) +#define MB_DECLARE_EXTERN(NAME) + +#define MB_POINT_ID(NAME, STAGE, STAGE_DESC, ID) + +#define MB_INC_AUTO_ID(NAME) +#define MB_POINT(NAME, STAGE, STAGE_DESC) + +#define MB_POINT_CONDITIONAL(NAME, STAGE, STAGE_DESC, COND) + +#define MB_NORMALIZE(NAME) + +#define MB_STATS(NAME, STREAM) +#define MB_STATS_OPT(NAME, STAGE_ONLY, SKIP_FIRST_N_SAMPLES, STREAM) + +#define MB_CSV_EXPORT(NAME, STREAM) +#define MB_CSV_EXPORT_OPT(NAME, STAGE_ONLY, SKIP_FIRST_N_SAMPLES, STREAM) +#define MB_CSV_IMPORT(NAME, STREAM) + +#define MB_PRINT(NAME, STREAM) +#define MB_PRINT_OPT(NAME, STAGE_ONLY, SKIP_FIRST_N_SAMPLES, STREAM) + +#define MB_INIT + +#endif + + #ifdef pvAccessMBEpicsExportSharedSymbols # define epicsExportSharedSymbols # undef pvAccessMBEpicsExportSharedSymbols diff --git a/src/pipelineService/pipelineServer.cpp b/src/pipelineService/pipelineServer.cpp index ca88a29..3e25649 100644 --- a/src/pipelineService/pipelineServer.cpp +++ b/src/pipelineService/pipelineServer.cpp @@ -19,7 +19,7 @@ using namespace std; namespace epics { namespace pvAccess { class ChannelPipelineMonitorImpl : - public Monitor, + public PipelineMonitor, public PipelineControl, public std::tr1::enable_shared_from_this { diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index bc3cf5b..fc6e92c 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -2406,11 +2406,6 @@ namespace epics { } } - virtual void reportRemoteQueueStatus(int32 /*freeElements*/) - { - // noop for the client - } - virtual void send(ByteBuffer* buffer, TransportSendControl* control) { control->startMessage((int8)CMD_MONITOR, 9); buffer->putInt(m_channel->getServerChannelID()); @@ -2782,11 +2777,6 @@ namespace epics { m_monitorStrategy->release(monitorElement); } - virtual void reportRemoteQueueStatus(int32 freeElements) - { - m_monitorStrategy->reportRemoteQueueStatus(freeElements); - } - virtual void lock() { // noop diff --git a/src/server/responseHandlers.cpp b/src/server/responseHandlers.cpp index 8773b5e..e706e40 100644 --- a/src/server/responseHandlers.cpp +++ b/src/server/responseHandlers.cpp @@ -1882,7 +1882,11 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, { int32 nfree = payloadBuffer->getInt(); ServerMonitorRequesterImpl::shared_pointer request = static_pointer_cast(channel->getRequest(ioid)); - request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + + Monitor::shared_pointer mp = request->getChannelMonitor(); + PipelineMonitor* pmp = dynamic_cast(mp.get()); + if (pmp) + pmp->reportRemoteQueueStatus(nfree); } } @@ -1903,7 +1907,10 @@ void ServerMonitorHandler::handleResponse(osiSockAddr* responseFrom, if (ack) { int32 nfree = payloadBuffer->getInt(); - request->getChannelMonitor()->reportRemoteQueueStatus(nfree); + Monitor::shared_pointer mp = request->getChannelMonitor(); + PipelineMonitor* pmp = dynamic_cast(mp.get()); + if (pmp) + pmp->reportRemoteQueueStatus(nfree); return; // note: not possible to ack and destroy }