diff --git a/documentation/release_notes.h b/documentation/release_notes.h index 0729638..487be13 100644 --- a/documentation/release_notes.h +++ b/documentation/release_notes.h @@ -1,8 +1,33 @@ /** @page pvarelease_notes Release Notes -Release 6.0.0 (UNRELEASED) +Release 6.1.0 (UNRELEASED) ========================== +- Deprecations + - pv/namedLockPattern.h +- Removals + - Remove deprecated methods configure(), flush(), and poll() from ChannelProvider. + - Remove RPCClient::sendRequest() + - Remove RPCService::destroy() and dispose() + - Typedefs GUID, Service +- Fixes + - pvAccessLog() add EPICS_PRINTF_STYLE() + - ioc: shutdown PVA server via epicsAtExit() + - fix 'pva' provider registration during static linking + - Various fixes related to shared_ptr loop breaking. + - Several *NULL bugs. + - PVA client context: avoid lock order violations +- Changes + - pvac::Monitor - shallow copy into Monitor::root + - pvget -m shows time and alarm if available (thanks to Andrew Starritt) +- Additions + - pvput to NTEnum via. string now supported + - pvac::* add valid() method and boolean cast shorthand. Also reset() and operator<<(ostream, ...) + - Add pvac::ClientProvider::named() + +Release 6.0.0 (Dec 2017) +======================== + - Incompatible changes - Requires pvDataCPP >=7.0.0 due to headers moved from pvDataCPP into this module: requester.h, destoryable.h, and monitor.h - Major changes to shared_ptr ownership rules for epics::pvAccess::ChannelProvider and diff --git a/pvtoolsSrc/eget.cpp b/pvtoolsSrc/eget.cpp index 7309ce2..a936c9e 100644 --- a/pvtoolsSrc/eget.cpp +++ b/pvtoolsSrc/eget.cpp @@ -36,8 +36,6 @@ using namespace epics::pvAccess; enum PrintMode { ValueOnlyMode, StructureMode, TerseMode }; PrintMode mode = ValueOnlyMode; -char fieldSeparator = ' '; - bool columnMajor = false; bool transpose = false; @@ -242,7 +240,7 @@ void formatTable(std::ostream& o, { for (size_t i = 0; i < numColumns; i++) { - if (separator == ' ') + if (fieldSeparator == ' ') { int width = std::max(labels[i].size()+padding, maxColumnLength); o << std::setw(width) << std::right; @@ -250,7 +248,7 @@ void formatTable(std::ostream& o, } else if (i > 0) { - o << separator; + o << fieldSeparator; } o << labels[i]; @@ -263,7 +261,7 @@ void formatTable(std::ostream& o, { for (size_t i = 0; i < numColumns; i++) { - if (separator == ' ' && (showHeader || numColumns > 1)) + if (fieldSeparator == ' ' && (showHeader || numColumns > 1)) { int width = std::max(labels[i].size()+padding, maxColumnLength); o << setw(width) << std::right; @@ -271,7 +269,7 @@ void formatTable(std::ostream& o, } else if (i > 0) { - o << separator; + o << fieldSeparator; } PVScalarArrayPtr array = columnData[i]; @@ -297,7 +295,7 @@ void formatTable(std::ostream& o, { if (showHeader && labels.size()) { - if (separator == ' ') + if (fieldSeparator == ' ') { o << std::setw(maxLabelColumnLength) << std::left; } @@ -306,12 +304,12 @@ void formatTable(std::ostream& o, for (size_t r = 0; r < maxValues; r++) { - if (separator == ' ' && (showHeader || numColumns > 1)) + if (fieldSeparator == ' ' && (showHeader || numColumns > 1)) { o << std::setw(maxColumnLength) << std::right; } else if (showHeader || r > 0) - o << separator; + o << fieldSeparator; PVScalarArrayPtr array = columnData[i]; if (array.get() && r < array->getLength()) @@ -433,10 +431,10 @@ void formatNTMatrix(std::ostream& o, PVStructurePtr const & pvStruct) { for (int32 c = 0; c < cols; c++) { - if (separator == ' ' && cols > 1) + if (fieldSeparator == ' ' && cols > 1) o << std::setw(maxColumnLength) << std::right; else if (c > 0) - o << separator; + o << fieldSeparator; if (columnMajor) value->dumpValue(o, r + c * rows); @@ -459,10 +457,10 @@ void formatNTMatrix(std::ostream& o, PVStructurePtr const & pvStruct) { for (int32 r = 0; r < rows; r++) { - if (separator == ' ' && rows > 1) + if (fieldSeparator == ' ' && rows > 1) o << std::setw(maxColumnLength) << std::right; else if (r > 0) - o << separator; + o << fieldSeparator; if (columnMajor) value->dumpValue(o, ix++); @@ -540,7 +538,7 @@ void formatNTNameValue(std::ostream& o, PVStructurePtr const & pvStruct) { for (size_t i = 0; i < numColumns; i++) { - if (separator == ' ') + if (fieldSeparator == ' ') { int width = std::max(nameData[i].size()+padding, maxColumnLength); o << std::setw(width) << std::right; @@ -548,7 +546,7 @@ void formatNTNameValue(std::ostream& o, PVStructurePtr const & pvStruct) } else if (i > 0) { - o << separator; + o << fieldSeparator; } o << nameData[i]; @@ -559,7 +557,7 @@ void formatNTNameValue(std::ostream& o, PVStructurePtr const & pvStruct) // then values for (size_t i = 0; i < numColumns; i++) { - if (separator == ' ' && showHeader) + if (fieldSeparator == ' ' && showHeader) { int width = std::max(nameData[i].size()+padding, maxColumnLength); o << std::setw(width) << std::right; @@ -567,7 +565,7 @@ void formatNTNameValue(std::ostream& o, PVStructurePtr const & pvStruct) } else if (i > 0) { - o << separator; + o << fieldSeparator; } array->dumpValue(o, i); } @@ -586,20 +584,20 @@ void formatNTNameValue(std::ostream& o, PVStructurePtr const & pvStruct) { if (showHeader) { - if (separator == ' ') + if (fieldSeparator == ' ') { o << std::setw(maxLabelColumnLength) << std::left; } o << nameData[i]; } - if (separator == ' ' && showHeader) + if (fieldSeparator == ' ' && showHeader) { o << std::setw(maxColumnLength) << std::right; } else if (showHeader) { - o << separator; + o << fieldSeparator; } array->dumpValue(o, i); @@ -899,7 +897,7 @@ void printValue(std::string const & channelName, PVStructure::shared_pointer con if (forceTerseWithName) { if (!channelName.empty()) - std::cout << channelName << separator; + std::cout << channelName << fieldSeparator; terseStructure(std::cout, pv) << std::endl; } else if (mode == ValueOnlyMode) @@ -1044,7 +1042,7 @@ void usage (void) " -p : Set default provider name, default is '%s'\n" " -q: Quiet mode, print only error messages\n" " -d: Enable debug output\n" - " -F : Use as an alternate output field separator\n" + " -F : Use as an alternate output field fieldSeparator\n" " -f : Use as an input that provides a list PV name(s) to be read, use '-' for stdin\n" " -c: Wait for clean shutdown and report used instance count (for expert users)\n" " enum format:\n" @@ -1416,7 +1414,7 @@ int main (int argc, char *argv[]) dumpStructure = true; break; case 'i': /* T-types format mode */ - formatTTypes(false); + formatTTypesFlag = false; break; case 't': /* Terse mode */ mode = TerseMode; @@ -1478,7 +1476,7 @@ int main (int argc, char *argv[]) break; } case 'n': - setEnumPrintMode(NumberEnum); + enumMode = NumberEnum; break; case '?': fprintf(stderr, @@ -1525,8 +1523,7 @@ int main (int argc, char *argv[]) SET_LOG_LEVEL(debug ? logLevelDebug : logLevelError); std::cout << std::boolalpha; - terseSeparator(fieldSeparator); - terseArrayCount(false); + arrayCountFlag = false; bool allOK = true; diff --git a/pvtoolsSrc/pvget.cpp b/pvtoolsSrc/pvget.cpp index 1757a8b..c4ad4b5 100644 --- a/pvtoolsSrc/pvget.cpp +++ b/pvtoolsSrc/pvget.cpp @@ -48,8 +48,6 @@ string defaultProvider("pva"); enum PrintMode { ValueOnlyMode, StructureMode, TerseMode }; PrintMode mode = ValueOnlyMode; -char fieldSeparator = ' '; - void usage (void) { fprintf (stderr, "\nUsage: pvget [options] ...\n\n" @@ -305,14 +303,7 @@ struct MonitorRequesterImpl : public MonitorRequester, public Tracker } else { - if (fieldSeparator == ' ' && value->getField()->getType() == scalar) - std::cout << std::setw(30) << std::left << m_channelName; - else - std::cout << m_channelName; - - std::cout << fieldSeparator; - - terse(std::cout, value) << '\n'; + printMonitoredValue (value, element); } } } @@ -344,6 +335,35 @@ struct MonitorRequesterImpl : public MonitorRequester, public Tracker std::cerr << "unlisten" << m_channelName << '\n'; done(); } + +private: + // For value type scalar or scalarArray when mode is ValueOnlyMode + void printMonitoredValue (PVField::shared_pointer value, MonitorElement* element) + { + PVStructure::shared_pointer timeStamp(element->pvStructurePtr->getSubField("timeStamp")); + PVStructure::shared_pointer alarm(element->pvStructurePtr->getSubField("alarm")); + + if (fieldSeparator == ' ' && value->getField()->getType() == scalar) + std::cout << std::setw(30) << std::left; + + std::cout << m_channelName << fieldSeparator; + + if (timeStamp) + terseStructure(std::cout, timeStamp) << " "; + + terse(std::cout, value) << " "; + + if (alarm) + { + PVScalar::shared_pointer pvSeverity(alarm->getSubField("severity")); + + bool inAlarm = !pvSeverity ? false : (pvSeverity->getAs()!=0); + if (inAlarm) + terseStructure(std::cout, alarm); + } + + std::cout<< '\n'; + } }; } // namespace @@ -405,7 +425,7 @@ int main (int argc, char *argv[]) mode = TerseMode; break; case 'i': /* T-types format mode */ - formatTTypes(false); + formatTTypesFlag = false; break; case 'm': /* Monitor mode */ monitor = true; @@ -446,7 +466,7 @@ int main (int argc, char *argv[]) break; } case 'n': - setEnumPrintMode(NumberEnum); + enumMode = NumberEnum; break; case '?': fprintf(stderr, @@ -509,7 +529,6 @@ int main (int argc, char *argv[]) SET_LOG_LEVEL(debugFlag ? logLevelDebug : logLevelError); std::cout << std::boolalpha; - terseSeparator(fieldSeparator); // ================ Connect channels and start operations @@ -565,8 +584,6 @@ int main (int argc, char *argv[]) std::cerr<<"Provider "<second; diff --git a/pvtoolsSrc/pvlist.cpp b/pvtoolsSrc/pvlist.cpp index e9840c6..7423e50 100644 --- a/pvtoolsSrc/pvlist.cpp +++ b/pvtoolsSrc/pvlist.cpp @@ -490,7 +490,6 @@ int main (int argc, char *argv[]) int opt; /* getopt() current option */ bool debug = false; double timeOut = DEFAULT_TIMEOUT; - // char fieldSeparator = ' '; bool printInfo = false; /* diff --git a/pvtoolsSrc/pvput.cpp b/pvtoolsSrc/pvput.cpp index db5ae9a..98efae1 100644 --- a/pvtoolsSrc/pvput.cpp +++ b/pvtoolsSrc/pvput.cpp @@ -51,8 +51,6 @@ const string noAddress; enum PrintMode { ValueOnlyMode, StructureMode, TerseMode }; PrintMode mode = ValueOnlyMode; -char fieldSeparator = ' '; - bool debug = false; void usage (bool details=false) @@ -609,8 +607,6 @@ int main (int argc, char *argv[]) SET_LOG_LEVEL(debug ? logLevelDebug : logLevelError); std::cout << std::boolalpha; - terseSeparator(fieldSeparator); - setEnumPrintMode(enumMode); epics::pvAccess::ca::CAClientFactory::start(); diff --git a/pvtoolsSrc/pvutils.cpp b/pvtoolsSrc/pvutils.cpp index b0eab26..28ba4cb 100644 --- a/pvtoolsSrc/pvutils.cpp +++ b/pvtoolsSrc/pvutils.cpp @@ -43,35 +43,14 @@ std::ostream& operator<<(std::ostream& o, const dump_stack_only_on_debug& d) return o; } -char separator = ' '; -void terseSeparator(char c) -{ - separator = c; -} +char fieldSeparator = ' '; char arrayCountFlag = true; -void terseArrayCount(bool flag) -{ - arrayCountFlag = flag; -} EnumMode enumMode = AutoEnum; -void setEnumPrintMode(EnumMode mode) -{ - enumMode = mode; -} bool formatTTypesFlag = true; -void formatTTypes(bool flag) -{ - formatTTypesFlag = flag; -} - bool printUserTagFlag = true; -void printUserTag(bool flag) -{ - printUserTagFlag = flag; -} std::ostream& terse(std::ostream& o, PVField::const_shared_pointer const & pv) { @@ -149,7 +128,7 @@ std::ostream& printTimeT(std::ostream& o, epics::pvData::PVStructure::const_shar epicsTimeToStrftime(timeText, sizeof(timeText), "%Y-%m-%dT%H:%M:%S.%03f", &epicsTS); o << timeText; if (printUserTagFlag && tagf) - o << separator << tagf->getAs(); + o << fieldSeparator << tagf->getAs(); return o; } @@ -192,14 +171,14 @@ std::ostream& printAlarmT(std::ostream& o, epics::pvData::PVStructure::const_sha o << v; else o << severityNames[v]; - o << separator; + o << fieldSeparator; v = pvStatus->get(); if (v < 0 || v > 7) o << v; else o << statusNames[v]; - o << separator; + o << fieldSeparator; if (pvMessage->get().empty()) o << ""; else @@ -266,7 +245,7 @@ std::ostream& terseStructure(std::ostream& o, PVStructure::const_shared_pointer if (first) first = false; else - o << separator; + o << fieldSeparator; terse(o, fieldsData[i]); } @@ -294,7 +273,7 @@ std::ostream& terseScalarArray(std::ostream& o, const PVScalarArray::const_share o << '0'; return o; } - o << length << separator; + o << length << fieldSeparator; } bool first = true; @@ -302,7 +281,7 @@ std::ostream& terseScalarArray(std::ostream& o, const PVScalarArray::const_share if (first) first = false; else - o << separator; + o << fieldSeparator; pvArray->dumpValue(o, i); } @@ -325,7 +304,7 @@ std::ostream& terseStructureArray(std::ostream& o, PVStructureArray::const_share o << '0'; return o; } - o << length << separator; + o << length << fieldSeparator; } PVStructureArray::const_svector data = pvArray->view(); @@ -334,7 +313,7 @@ std::ostream& terseStructureArray(std::ostream& o, PVStructureArray::const_share if (first) first = false; else - o << separator; + o << fieldSeparator; terseStructure(o, data[i]); } @@ -351,7 +330,7 @@ std::ostream& terseUnionArray(std::ostream& o, PVUnionArray::const_shared_pointe o << '0'; return o; } - o << length << separator; + o << length << fieldSeparator; } PVUnionArray::const_svector data = pvArray->view(); @@ -360,7 +339,7 @@ std::ostream& terseUnionArray(std::ostream& o, PVUnionArray::const_shared_pointe if (first) first = false; else - o << separator; + o << fieldSeparator; terseUnion(o, data[i]); } diff --git a/pvtoolsSrc/pvutils.h b/pvtoolsSrc/pvutils.h index da4139c..73439ac 100644 --- a/pvtoolsSrc/pvutils.h +++ b/pvtoolsSrc/pvutils.h @@ -7,8 +7,6 @@ void convertStructure(std::string* buffer, epics::pvData::PVStructure *data, int void convertArray(std::string*, epics::pvData::PVScalarArray * pv, int notFirst); void convertStructureArray(std::string*, epics::pvData::PVStructureArray * pvdata, int notFirst); -void terseSeparator(char c); -void terseArrayCount(bool flag); std::ostream& terse(std::ostream& o, epics::pvData::PVField::const_shared_pointer const & pv); std::ostream& terseUnion(std::ostream& o, epics::pvData::PVUnion::const_shared_pointer const & pvUnion); std::ostream& terseStructure(std::ostream& o, const epics::pvData::PVStructure::const_shared_pointer &pvStructure); @@ -17,14 +15,10 @@ std::ostream& terseStructureArray(std::ostream& o, epics::pvData::PVStructureArr std::ostream& terseUnionArray(std::ostream& o, epics::pvData::PVUnionArray::const_shared_pointer const & pvArray); enum EnumMode { AutoEnum, NumberEnum, StringEnum }; -void setEnumPrintMode(EnumMode mode); -void formatTTypes(bool flag); bool isTType(epics::pvData::PVStructure::const_shared_pointer const & pvStructure); bool formatTType(std::ostream& o, const epics::pvData::PVStructure::const_shared_pointer &pvStructure); -void printUserTag(bool flag); - std::ostream& printEnumT(std::ostream& o, epics::pvData::PVStructure const & pvEnumT); std::ostream& printEnumT(std::ostream& o, epics::pvData::PVStructure::const_shared_pointer const & pvEnumT); std::ostream& printTimeT(std::ostream& o, epics::pvData::PVStructure::const_shared_pointer const & pvTimeT); diff --git a/src/client/client.cpp b/src/client/client.cpp index e797f1d..0770f84 100644 --- a/src/client/client.cpp +++ b/src/client/client.cpp @@ -3,6 +3,8 @@ * found in the file LICENSE that is included with the distribution */ +#include + #include #include #include @@ -197,6 +199,17 @@ void ClientChannel::removeConnectListener(ConnectCallback* cb) } } + +void ClientChannel::show(std::ostream& strm) const +{ + if(impl) { + strm<channel.get()).name()<<" : "; + impl->channel->printInfo(strm); + } else { + strm<<"NULL Channel"; + } +} + static void register_reftrack() { @@ -266,6 +279,12 @@ ClientProvider::ClientProvider(const std::tr1::shared_ptrprovider->getProviderName(); +} + ClientChannel ClientProvider::connect(const std::string& name, const ClientChannel::Options& conf) @@ -305,6 +324,43 @@ void ClientProvider::disconnect() impl->channels.clear(); } +::std::ostream& operator<<(::std::ostream& strm, const Operation& op) +{ + if(op.impl) { + op.impl->show(strm); + } else { + strm << "Operation()"; + } + return strm; +} + +::std::ostream& operator<<(::std::ostream& strm, const ClientChannel& op) +{ + if(op.impl) { + strm << "ClientChannel(" + << typeid(*op.impl->channel.get()).name()<<", " + "\"" << op.impl->channel->getChannelName() <<"\", " + "\"" << op.impl->channel->getProvider()->getProviderName() <<"\", " + "connected="<<(op.impl->channel->isConnected()?"true":"false") + <<"\")"; + } else { + strm << "ClientChannel()"; + } + return strm; +} + +::std::ostream& operator<<(::std::ostream& strm, const ClientProvider& op) +{ + if(op.impl) { + strm << "ClientProvider(" + << typeid(*op.impl->provider.get()).name()<<", " + "\""<provider->getProviderName()<<"\")"; + } else { + strm << "ClientProvider()"; + } + return strm; +} + namespace detail { void registerRefTrack() diff --git a/src/client/clientGet.cpp b/src/client/clientGet.cpp index 0790a02..54051fd 100644 --- a/src/client/clientGet.cpp +++ b/src/client/clientGet.cpp @@ -177,6 +177,13 @@ struct GetPutter : public pva::ChannelPutRequester, callEvent(G, status.isSuccess()? pvac::GetEvent::Success : pvac::GetEvent::Fail); } + + virtual void show(std::ostream &strm) const + { + strm << "Operation(Get/Put" + "\"" << name() <<"\"" + ")"; + } }; size_t GetPutter::num_instances; diff --git a/src/client/clientMonitor.cpp b/src/client/clientMonitor.cpp index 8b7debd..6e19abe 100644 --- a/src/client/clientMonitor.cpp +++ b/src/client/clientMonitor.cpp @@ -248,6 +248,20 @@ ClientChannel::monitor(MonitorCallback *cb, return Monitor(ret); } +::std::ostream& operator<<(::std::ostream& strm, const Monitor& op) +{ + if(op.impl) { + strm << "Monitor(" + "\"" << op.impl->chan->getChannelName() <<"\", " + "\"" << op.impl->chan->getProvider()->getProviderName() <<"\", " + "connected="<<(op.impl->chan->isConnected()?"true":"false") + <<"\")"; + } else { + strm << "Monitor()"; + } + return strm; +} + namespace detail { void registerRefTrackMonitor() diff --git a/src/client/clientRPC.cpp b/src/client/clientRPC.cpp index 9400961..5028586 100644 --- a/src/client/clientRPC.cpp +++ b/src/client/clientRPC.cpp @@ -140,6 +140,13 @@ struct RPCer : public pva::ChannelRPCRequester, callEvent(G, status.isSuccess()? pvac::GetEvent::Success : pvac::GetEvent::Fail); } + + virtual void show(std::ostream &strm) const + { + strm << "Operation(RPC" + "\"" << name() <<"\"" + ")"; + } }; size_t RPCer::num_instances; diff --git a/src/client/pva/client.h b/src/client/pva/client.h index 21f6b3a..2da8e67 100644 --- a/src/client/pva/client.h +++ b/src/client/pva/client.h @@ -5,6 +5,7 @@ #ifndef PVATESTCLIENT_H #define PVATESTCLIENT_H +#include #include #include @@ -53,6 +54,7 @@ struct epicsShareClass Operation virtual ~Impl() {} virtual std::string name() const =0; virtual void cancel() =0; + virtual void show(std::ostream&) const =0; }; Operation() {} @@ -64,7 +66,21 @@ struct epicsShareClass Operation //! Does not wait for remote confirmation. void cancel(); + bool valid() const { return !!impl; } + +#if __cplusplus>=201103L + explicit operator bool() const { return valid(); } +#else +private: + typedef bool (Operation::*bool_type)() const; +public: + operator bool_type() const { return valid() ? &Operation::valid : 0; } +#endif + + void reset() { impl.reset(); } + protected: + friend epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const Operation& op); std::tr1::shared_ptr impl; }; @@ -133,8 +149,22 @@ struct epicsShareClass Monitor epics::pvData::BitSet changed, overrun; + bool valid() const { return !!impl; } + +#if __cplusplus>=201103L + explicit operator bool() const { return valid(); } +#else +private: + typedef bool (Monitor::*bool_type)() const; +public: + operator bool_type() const { return valid() ? &Monitor::valid : 0; } +#endif + + void reset() { impl.reset(); } + private: std::tr1::shared_ptr impl; + friend epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const Monitor& op); friend struct MonitorSync; }; @@ -220,6 +250,7 @@ private: std::tr1::shared_ptr impl; friend class ClientProvider; friend void detail::registerRefTrack(); + friend epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const ClientChannel& op); ClientChannel(const std::tr1::shared_ptr& i) :impl(i) {} public: @@ -247,6 +278,19 @@ public: //! Channel name or an empty string std::string name() const; + bool valid() const { return !!impl; } + +#if __cplusplus>=201103L + explicit operator bool() const { return valid(); } +#else +private: + typedef bool (ClientChannel::*bool_type)() const; +public: + operator bool_type() const { return valid() ? &ClientChannel::valid : 0; } +#endif + + void reset() { impl.reset(); } + //! callback for get() and rpc() struct GetCallback { virtual ~GetCallback() {} @@ -358,6 +402,7 @@ public: //! Remove from list of listeners void removeConnectListener(ConnectCallback*); + void show(std::ostream& strm) const; private: std::tr1::shared_ptr getChannel(); }; @@ -421,6 +466,7 @@ class epicsShareClass ClientProvider struct Impl; std::tr1::shared_ptr impl; friend void detail::registerRefTrack(); + friend epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const ClientProvider& op); public: /** Use named provider. @@ -435,6 +481,8 @@ public: explicit ClientProvider(const std::tr1::shared_ptr& provider); ~ClientProvider(); + std::string name() const; + /** Get a new Channel * * Does not block. @@ -450,6 +498,19 @@ public: //! Clear channel cache void disconnect(); + + bool valid() const { return !!impl; } + +#if __cplusplus>=201103L + explicit operator bool() const { return valid(); } +#else +private: + typedef bool (ClientProvider::*bool_type)() const; +public: + operator bool_type() const { return valid() ? &ClientProvider::valid : 0; } +#endif + + void reset() { impl.reset(); } }; @@ -460,6 +521,11 @@ ClientChannel::put(const epics::pvData::PVStructure::const_shared_pointer& pvReq return detail::PutBuilder(*this, pvRequest); } +epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const Operation& op); +epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const Monitor& op); +epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const ClientChannel& op); +epicsShareFunc ::std::ostream& operator<<(::std::ostream& strm, const ClientProvider& op); + //! @} }//namespace pvac diff --git a/src/factory/ChannelAccessFactory.cpp b/src/factory/ChannelAccessFactory.cpp index 90af176..9f5a705 100644 --- a/src/factory/ChannelAccessFactory.cpp +++ b/src/factory/ChannelAccessFactory.cpp @@ -24,6 +24,7 @@ #include "pv/codec.h" #include #include +#include using namespace epics::pvData; using std::string; @@ -173,6 +174,7 @@ void providerRegInit(void*) registerRefCounter("ServerChannel", &ServerChannel::num_instances); registerRefCounter("Transport (ABC)", &Transport::num_instances); registerRefCounter("BlockingTCPTransportCodec", &detail::BlockingTCPTransportCodec::num_instances); + registerRefCounter("BlockingUDPTransport", &BlockingUDPTransport::num_instances); registerRefCounter("ChannelProvider (ABC)", &ChannelProvider::num_instances); registerRefCounter("Channel (ABC)", &Channel::num_instances); registerRefCounter("ChannelRequester (ABC)", &ChannelRequester::num_instances); diff --git a/src/remote/Makefile b/src/remote/Makefile index 1e180cf..89f4361 100644 --- a/src/remote/Makefile +++ b/src/remote/Makefile @@ -9,7 +9,7 @@ pvAccess_SRCS += blockingUDPTransport.cpp pvAccess_SRCS += blockingUDPConnector.cpp pvAccess_SRCS += beaconHandler.cpp pvAccess_SRCS += blockingTCPConnector.cpp -pvAccess_SRCS += simpleChannelSearchManagerImpl.cpp +pvAccess_SRCS += channelSearchManager.cpp pvAccess_SRCS += abstractResponseHandler.cpp pvAccess_SRCS += blockingTCPAcceptor.cpp pvAccess_SRCS += transportRegistry.cpp diff --git a/src/remote/blockingUDPTransport.cpp b/src/remote/blockingUDPTransport.cpp index 844ed9e..67ffe8d 100644 --- a/src/remote/blockingUDPTransport.cpp +++ b/src/remote/blockingUDPTransport.cpp @@ -17,6 +17,7 @@ #include #include +#include #define epicsExportSharedSymbols #include @@ -42,6 +43,8 @@ inline int sendto(int s, const char *buf, size_t len, int flags, const struct so // reserve some space for CMD_ORIGIN_TAG message #define RECEIVE_BUFFER_PRE_RESERVE (PVA_MESSAGE_HEADER_SIZE + 16) +size_t BlockingUDPTransport::num_instances; + BlockingUDPTransport::BlockingUDPTransport(bool serverFlag, ResponseHandler::shared_pointer const & responseHandler, SOCKET channel, osiSockAddr& bindAddress, @@ -79,9 +82,12 @@ BlockingUDPTransport::BlockingUDPTransport(bool serverFlag, sockAddrToDottedIP(&_remoteAddress.sa, strBuffer, sizeof(strBuffer)); _remoteName = strBuffer; } + + REFTRACE_INCREMENT(num_instances); } BlockingUDPTransport::~BlockingUDPTransport() { + REFTRACE_DECREMENT(num_instances); close(true); // close the socket and stop the thread. } diff --git a/src/remote/simpleChannelSearchManagerImpl.cpp b/src/remote/channelSearchManager.cpp similarity index 71% rename from src/remote/simpleChannelSearchManagerImpl.cpp rename to src/remote/channelSearchManager.cpp index 638f3b4..0fe9da1 100644 --- a/src/remote/simpleChannelSearchManagerImpl.cpp +++ b/src/remote/channelSearchManager.cpp @@ -11,7 +11,7 @@ #include #define epicsExportSharedSymbols -#include +#include #include #include #include @@ -20,28 +20,55 @@ using namespace std; using namespace epics::pvData; +namespace { +namespace pva = epics::pvAccess; + +class MockTransportSendControl: public pva::TransportSendControl +{ +public: + void endMessage() {} + void flush(bool /*lastMessageCompleted*/) {} + void setRecipient(const osiSockAddr& /*sendTo*/) {} + void startMessage(epics::pvData::int8 /*command*/, std::size_t /*ensureCapacity*/, epics::pvData::int32 /*payloadSize*/) {} + void ensureBuffer(std::size_t /*size*/) {} + void alignBuffer(std::size_t /*alignment*/) {} + void flushSerializeBuffer() {} + void cachedSerialize(const std::tr1::shared_ptr& field, epics::pvData::ByteBuffer* buffer) + { + // no cache + field->serialize(buffer, this); + } + virtual bool directSerialize(epics::pvData::ByteBuffer* /*existingBuffer*/, const char* /*toSerialize*/, + std::size_t /*elementCount*/, std::size_t /*elementSize*/) + { + return false; + } +}; + +}// namespace + namespace epics { namespace pvAccess { -const int SimpleChannelSearchManagerImpl::DATA_COUNT_POSITION = PVA_MESSAGE_HEADER_SIZE + 4+1+3+16+2+1+4; -const int SimpleChannelSearchManagerImpl::CAST_POSITION = PVA_MESSAGE_HEADER_SIZE + 4; -const int SimpleChannelSearchManagerImpl::PAYLOAD_POSITION = 4; +const int ChannelSearchManager::DATA_COUNT_POSITION = PVA_MESSAGE_HEADER_SIZE + 4+1+3+16+2+1+4; +const int ChannelSearchManager::CAST_POSITION = PVA_MESSAGE_HEADER_SIZE + 4; +const int ChannelSearchManager::PAYLOAD_POSITION = 4; // 225ms +/- 25ms random -const double SimpleChannelSearchManagerImpl::ATOMIC_PERIOD = 0.225; -const int SimpleChannelSearchManagerImpl::PERIOD_JITTER_MS = 25; +const double ChannelSearchManager::ATOMIC_PERIOD = 0.225; +const int ChannelSearchManager::PERIOD_JITTER_MS = 25; -const int SimpleChannelSearchManagerImpl::DEFAULT_USER_VALUE = 1; -const int SimpleChannelSearchManagerImpl::BOOST_VALUE = 1; +const int ChannelSearchManager::DEFAULT_USER_VALUE = 1; +const int ChannelSearchManager::BOOST_VALUE = 1; // must be power of two (so that search is done) -const int SimpleChannelSearchManagerImpl::MAX_COUNT_VALUE = 1 << 8; -const int SimpleChannelSearchManagerImpl::MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1; +const int ChannelSearchManager::MAX_COUNT_VALUE = 1 << 8; +const int ChannelSearchManager::MAX_FALLBACK_COUNT_VALUE = (1 << 7) + 1; -const int SimpleChannelSearchManagerImpl::MAX_FRAMES_AT_ONCE = 10; -const int SimpleChannelSearchManagerImpl::DELAY_BETWEEN_FRAMES_MS = 50; +const int ChannelSearchManager::MAX_FRAMES_AT_ONCE = 10; +const int ChannelSearchManager::DELAY_BETWEEN_FRAMES_MS = 50; -SimpleChannelSearchManagerImpl::SimpleChannelSearchManagerImpl(Context::shared_pointer const & context) : +ChannelSearchManager::ChannelSearchManager(Context::shared_pointer const & context) : m_context(context), m_responseAddress(), // initialized in activate() m_canceled(), @@ -49,7 +76,6 @@ SimpleChannelSearchManagerImpl::SimpleChannelSearchManagerImpl(Context::shared_p m_sendBuffer(MAX_UDP_UNFRAGMENTED_SEND), m_channels(), m_lastTimeSent(), - m_mockTransportSendControl(), m_channelMutex(), m_userValueMutex(), m_mutex() @@ -58,7 +84,7 @@ SimpleChannelSearchManagerImpl::SimpleChannelSearchManagerImpl(Context::shared_p srand ( time(NULL) ); } -void SimpleChannelSearchManagerImpl::activate() +void ChannelSearchManager::activate() { m_responseAddress = Context::shared_pointer(m_context)->getSearchTransport()->getRemoteAddress(); @@ -73,15 +99,15 @@ void SimpleChannelSearchManagerImpl::activate() context->getTimer()->schedulePeriodic(shared_from_this(), period, period); } -SimpleChannelSearchManagerImpl::~SimpleChannelSearchManagerImpl() +ChannelSearchManager::~ChannelSearchManager() { Lock guard(m_mutex); if (!m_canceled.get()) { - LOG(logLevelWarn, "Logic error: SimpleChannelSearchManagerImpl destroyed w/o cancel()"); + LOG(logLevelWarn, "Logic error: ChannelSearchManager destroyed w/o cancel()"); } } -void SimpleChannelSearchManagerImpl::cancel() +void ChannelSearchManager::cancel() { Lock guard(m_mutex); @@ -94,13 +120,13 @@ void SimpleChannelSearchManagerImpl::cancel() context->getTimer()->cancel(shared_from_this()); } -int32_t SimpleChannelSearchManagerImpl::registeredCount() +int32_t ChannelSearchManager::registeredCount() { Lock guard(m_channelMutex); return static_cast(m_channels.size()); } -void SimpleChannelSearchManagerImpl::registerSearchInstance(SearchInstance::shared_pointer const & channel, bool penalize) +void ChannelSearchManager::registerSearchInstance(SearchInstance::shared_pointer const & channel, bool penalize) { if (m_canceled.get()) return; @@ -122,16 +148,14 @@ void SimpleChannelSearchManagerImpl::registerSearchInstance(SearchInstance::shar callback(); } -void SimpleChannelSearchManagerImpl::unregisterSearchInstance(SearchInstance::shared_pointer const & channel) +void ChannelSearchManager::unregisterSearchInstance(SearchInstance::shared_pointer const & channel) { Lock guard(m_channelMutex); pvAccessID id = channel->getSearchInstanceID(); - m_channels_t::iterator channelsIter = m_channels.find(id); - if(channelsIter != m_channels.end()) - m_channels.erase(id); + m_channels.erase(id); } -void SimpleChannelSearchManagerImpl::searchResponse(const ServerGUID & guid, pvAccessID cid, int32_t /*seqNo*/, int8_t minorRevision, osiSockAddr* serverAddress) +void ChannelSearchManager::searchResponse(const ServerGUID & guid, pvAccessID cid, int32_t /*seqNo*/, int8_t minorRevision, osiSockAddr* serverAddress) { Lock guard(m_channelMutex); m_channels_t::iterator channelsIter = m_channels.find(cid); @@ -162,13 +186,13 @@ void SimpleChannelSearchManagerImpl::searchResponse(const ServerGUID & guid, pvA } } -void SimpleChannelSearchManagerImpl::newServerDetected() +void ChannelSearchManager::newServerDetected() { boost(); callback(); } -void SimpleChannelSearchManagerImpl::initializeSendBuffer() +void ChannelSearchManager::initializeSendBuffer() { // for now OK, since it is only set here m_sequenceNumber++; @@ -197,12 +221,13 @@ void SimpleChannelSearchManagerImpl::initializeSendBuffer() // TODO now only TCP is supported // note: this affects DATA_COUNT_POSITION m_sendBuffer.putByte((int8_t)1); - // TODO "tcp" constant - SerializeHelper::serializeString("tcp", &m_sendBuffer, &m_mockTransportSendControl); + + MockTransportSendControl control; + SerializeHelper::serializeString("tcp", &m_sendBuffer, &control); m_sendBuffer.putShort((int16_t)0); // count } -void SimpleChannelSearchManagerImpl::flushSendBuffer() +void ChannelSearchManager::flushSendBuffer() { Lock guard(m_mutex); @@ -219,7 +244,7 @@ void SimpleChannelSearchManagerImpl::flushSendBuffer() } -bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, +bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, ByteBuffer* requestMessage, TransportSendControl* control) { epics::pvData::int16 dataCount = requestMessage->getShort(DATA_COUNT_POSITION); @@ -231,7 +256,7 @@ bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance return false; */ - const std::string name = channel->getSearchInstanceName(); + const std::string& name(channel->getSearchInstanceName()); // not nice... const int addedPayloadSize = sizeof(int32)/sizeof(int8) + (1 + sizeof(int32)/sizeof(int8) + name.length()); if(((int)requestMessage->getRemaining()) < addedPayloadSize) @@ -245,17 +270,19 @@ bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance return true; } -bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, +bool ChannelSearchManager::generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush) { + MockTransportSendControl control; + Lock guard(m_mutex); - bool success = generateSearchRequestMessage(channel, &m_sendBuffer, &m_mockTransportSendControl); + bool success = generateSearchRequestMessage(channel, &m_sendBuffer, &control); // buffer full, flush if(!success) { flushSendBuffer(); if(allowNewFrame) - generateSearchRequestMessage(channel, &m_sendBuffer, &m_mockTransportSendControl); + generateSearchRequestMessage(channel, &m_sendBuffer, &control); if (flush) flushSendBuffer(); return true; @@ -267,7 +294,7 @@ bool SimpleChannelSearchManagerImpl::generateSearchRequestMessage(SearchInstance return flush; } -void SimpleChannelSearchManagerImpl::boost() +void ChannelSearchManager::boost() { Lock guard(m_channelMutex); Lock guard2(m_userValueMutex); @@ -281,7 +308,7 @@ void SimpleChannelSearchManagerImpl::boost() } } -void SimpleChannelSearchManagerImpl::callback() +void ChannelSearchManager::callback() { // high-frequency beacon anomaly trigger guard { @@ -346,12 +373,12 @@ void SimpleChannelSearchManagerImpl::callback() flushSendBuffer(); } -bool SimpleChannelSearchManagerImpl::isPowerOfTwo(int32_t x) +bool ChannelSearchManager::isPowerOfTwo(int32_t x) { return ((x > 0) && (x & (x - 1)) == 0); } -void SimpleChannelSearchManagerImpl::timerStopped() +void ChannelSearchManager::timerStopped() { } diff --git a/src/remote/codec.cpp b/src/remote/codec.cpp index cd45e83..8efa03c 100644 --- a/src/remote/codec.cpp +++ b/src/remote/codec.cpp @@ -1040,14 +1040,11 @@ void BlockingTCPTransportCodec::close() { // wakeup processSendQueue // clean resources (close socket) - internalClose(true); + internalClose(); // Break sender from queue wait BreakTransport::shared_pointer B(new BreakTransport); enqueueSendRequest(B); - - // post close - internalPostClose(true); } } @@ -1058,9 +1055,42 @@ void BlockingTCPTransportCodec::waitJoin() _readThread.exitWait(); } -void BlockingTCPTransportCodec::internalClose(bool /*force*/) +void BlockingTCPTransportCodec::internalClose() { - this->internalDestroy(); + { + + epicsSocketSystemCallInterruptMechanismQueryInfo info = + epicsSocketSystemCallInterruptMechanismQuery (); + switch ( info ) + { + case esscimqi_socketCloseRequired: + epicsSocketDestroy ( _channel ); + break; + case esscimqi_socketBothShutdownRequired: + { + /*int status =*/ ::shutdown ( _channel, SHUT_RDWR ); + /* + if ( status ) { + char sockErrBuf[64]; + epicsSocketConvertErrnoToString ( + sockErrBuf, sizeof ( sockErrBuf ) ); + LOG(logLevelDebug, + "TCP socket to %s failed to shutdown: %s.", + inetAddressToString(_socketAddress).c_str(), sockErrBuf); + } + */ + epicsSocketDestroy ( _channel ); + } + break; + case esscimqi_socketSigAlarmRequired: + // not supported anymore anyway + default: + epicsSocketDestroy(_channel); + } + } + + Transport::shared_pointer thisSharedPtr = this->shared_from_this(); + _context->getTransportRegistry()->remove(thisSharedPtr); // TODO sync if (_securitySession) @@ -1096,13 +1126,23 @@ void BlockingTCPTransportCodec::start() { void BlockingTCPTransportCodec::receiveThread() { - Transport::shared_pointer ptr = this->shared_from_this(); + /* This innocuous ref. is an important hack. + * The code behind Transport::close() will cause + * channels and operations to drop references + * to this transport. This ref. keeps it from + * being destroyed way down the call stack, from + * which it is apparently not possible to return + * safely. Rather than try to untangle this + * knot, just keep this ref... + */ + Transport::shared_pointer ptr(this->shared_from_this()); while (this->isOpen()) { try { this->processRead(); } catch (std::exception &e) { + PRINT_EXCEPTION(e); LOG(logLevelError, "an exception caught while in receiveThread at %s:%d: %s", __FILE__, __LINE__, e.what()); @@ -1112,14 +1152,13 @@ void BlockingTCPTransportCodec::receiveThread() __FILE__, __LINE__); } } - - this->_shutdownEvent.signal(); } void BlockingTCPTransportCodec::sendThread() { - Transport::shared_pointer ptr = this->shared_from_this(); + // cf. the comment in receiveThread() + Transport::shared_pointer ptr(this->shared_from_this()); this->setSenderThread(); @@ -1130,6 +1169,7 @@ void BlockingTCPTransportCodec::sendThread() } catch (connection_closed_exception &cce) { // noop } catch (std::exception &e) { + PRINT_EXCEPTION(e); LOG(logLevelWarn, "an exception caught while in sendThread at %s:%d: %s", __FILE__, __LINE__, e.what()); @@ -1204,47 +1244,6 @@ BlockingTCPTransportCodec::BlockingTCPTransportCodec(bool serverFlag, const Cont } -// must be called only once, when there will be no operation on socket (e.g. just before tx/rx thread exists) -void BlockingTCPTransportCodec::internalDestroy() { - - if(_channel != INVALID_SOCKET) { - - epicsSocketSystemCallInterruptMechanismQueryInfo info = - epicsSocketSystemCallInterruptMechanismQuery (); - switch ( info ) - { - case esscimqi_socketCloseRequired: - epicsSocketDestroy ( _channel ); - break; - case esscimqi_socketBothShutdownRequired: - { - /*int status =*/ ::shutdown ( _channel, SHUT_RDWR ); - /* - if ( status ) { - char sockErrBuf[64]; - epicsSocketConvertErrnoToString ( - sockErrBuf, sizeof ( sockErrBuf ) ); - LOG(logLevelDebug, - "TCP socket to %s failed to shutdown: %s.", - inetAddressToString(_socketAddress).c_str(), sockErrBuf); - } - */ - epicsSocketDestroy ( _channel ); - } - break; - case esscimqi_socketSigAlarmRequired: - // not supported anymore anyway - default: - epicsSocketDestroy(_channel); - } - - _channel = INVALID_SOCKET; //TODO: mutex to guard _channel - } - - Transport::shared_pointer thisSharedPtr = this->shared_from_this(); - _context->getTransportRegistry()->remove(thisSharedPtr); -} - void BlockingTCPTransportCodec::invalidDataStreamHandler() { close(); @@ -1529,12 +1528,11 @@ void BlockingServerTCPTransportCodec::send(ByteBuffer* buffer, buffer->putShort(0x7FFF); // list of authNZ plugin names - map& securityPlugins = _context->getSecurityPlugins(); + const Context::securityPlugins_t& securityPlugins = _context->getSecurityPlugins(); vector validSPNames; validSPNames.reserve(securityPlugins.size()); - for (map::const_iterator iter = - securityPlugins.begin(); + for (Context::securityPlugins_t::const_iterator iter(securityPlugins.begin()); iter != securityPlugins.end(); iter++) { SecurityPlugin::shared_pointer securityPlugin = iter->second; @@ -1593,9 +1591,9 @@ void BlockingServerTCPTransportCodec::destroyAllChannels() { it->second->destroy(); } -void BlockingServerTCPTransportCodec::internalClose(bool force) { +void BlockingServerTCPTransportCodec::internalClose() { Transport::shared_pointer thisSharedPtr = shared_from_this(); - BlockingTCPTransportCodec::internalClose(force); + BlockingTCPTransportCodec::internalClose(); destroyAllChannels(); } @@ -1627,8 +1625,7 @@ void BlockingServerTCPTransportCodec::authNZInitialize(const std::string& securi // check if plug-in name is valid SecurityPlugin::shared_pointer securityPlugin; - map::iterator spIter = - _context->getSecurityPlugins().find(securityPluginName); + Context::securityPlugins_t::const_iterator spIter(_context->getSecurityPlugins().find(securityPluginName)); if (spIter != _context->getSecurityPlugins().end()) securityPlugin = spIter->second; if (!securityPlugin) @@ -1779,24 +1776,15 @@ bool BlockingClientTCPTransportCodec::acquire(ClientChannelImpl::shared_pointer } // _mutex is held when this method is called -void BlockingClientTCPTransportCodec::internalClose(bool forced) { - BlockingTCPTransportCodec::internalClose(forced); +void BlockingClientTCPTransportCodec::internalClose() { + BlockingTCPTransportCodec::internalClose(); TimerCallbackPtr tcb = std::tr1::dynamic_pointer_cast(shared_from_this()); _context->getTimer()->cancel(tcb); -} - -void BlockingClientTCPTransportCodec::internalPostClose(bool forced) { - BlockingTCPTransportCodec::internalPostClose(forced); // _owners cannot change when transport is closed - closedNotifyClients(); -} -/** - * Notifies clients about disconnect. - */ -void BlockingClientTCPTransportCodec::closedNotifyClients() { + // Notifies clients about disconnect. // check if still acquired size_t refs = _owners.size(); @@ -1937,13 +1925,12 @@ void BlockingClientTCPTransportCodec::authNZInitialize(const std::vector& availableSecurityPlugins = - _context->getSecurityPlugins(); + const Context::securityPlugins_t& availableSecurityPlugins(_context->getSecurityPlugins()); for (vector::const_iterator offeredSP = offeredSecurityPlugins.begin(); offeredSP != offeredSecurityPlugins.end(); offeredSP++) { - map::iterator spi = availableSecurityPlugins.find(*offeredSP); + Context::securityPlugins_t::const_iterator spi(availableSecurityPlugins.find(*offeredSP)); if (spi != availableSecurityPlugins.end()) { SecurityPlugin::shared_pointer securityPlugin = spi->second; diff --git a/src/remote/pv/blockingUDP.h b/src/remote/pv/blockingUDP.h index 143e1c7..6cec9ac 100644 --- a/src/remote/pv/blockingUDP.h +++ b/src/remote/pv/blockingUDP.h @@ -49,6 +49,8 @@ class BlockingUDPTransport : public epics::pvData::NoDefaultMethods, public: POINTER_DEFINITIONS(BlockingUDPTransport); + static size_t num_instances; + private: std::tr1::weak_ptr internal_this; friend class BlockingUDPConnector; diff --git a/src/remote/pv/channelSearchManager.h b/src/remote/pv/channelSearchManager.h index 65c51e2..43b53f9 100644 --- a/src/remote/pv/channelSearchManager.h +++ b/src/remote/pv/channelSearchManager.h @@ -20,6 +20,7 @@ #endif #include +#include namespace epics { namespace pvAccess { @@ -31,11 +32,11 @@ public: /** * Destructor */ - virtual ~SearchInstance() {}; + virtual ~SearchInstance() {} virtual pvAccessID getSearchInstanceID() = 0; - virtual std::string getSearchInstanceName() = 0; + virtual const std::string& getSearchInstanceName() = 0; virtual int32_t& getUserValue() = 0; @@ -49,34 +50,34 @@ public: virtual void searchResponse(const ServerGUID & guid, int8_t minorRevision, osiSockAddr* serverAddress) = 0; }; -class ChannelSearchManager { + +class ChannelSearchManager : + public epics::pvData::TimerCallback, + public std::tr1::enable_shared_from_this +{ public: POINTER_DEFINITIONS(ChannelSearchManager); + virtual ~ChannelSearchManager(); /** - * Destructor + * Cancel. */ - virtual ~ChannelSearchManager() {}; - + void cancel(); /** * Get number of registered channels. * @return number of registered channels. */ - virtual int32_t registeredCount() = 0; - + int32_t registeredCount(); /** * Register channel. - * @param channel + * @param channel to register. */ - virtual void registerSearchInstance(SearchInstance::shared_pointer const & channel, bool penalize = false) = 0; - - + void registerSearchInstance(SearchInstance::shared_pointer const & channel, bool penalize = false); /** * Unregister channel. - * @param channel + * @param channel to unregister. */ - virtual void unregisterSearchInstance(SearchInstance::shared_pointer const & channel) = 0; - + void unregisterSearchInstance(SearchInstance::shared_pointer const & channel); /** * Search response from server (channel found). * @param guid server GUID. @@ -85,19 +86,105 @@ public: * @param minorRevision server minor PVA revision. * @param serverAddress server address. */ - virtual void searchResponse(const ServerGUID & guid, pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress) = 0; - + void searchResponse(const ServerGUID & guid, pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress); /** * New server detected. * Boost searching of all channels. */ - virtual void newServerDetected() = 0; + void newServerDetected(); + + /// Timer callback. + virtual void callback() OVERRIDE FINAL; + + /// Timer stooped callback. + virtual void timerStopped() OVERRIDE FINAL; /** - * Cancel. + * Private constructor. + * @param context */ - virtual void cancel() = 0; + ChannelSearchManager(Context::shared_pointer const & context); + void activate(); +private: + + bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush); + + static bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, + epics::pvData::ByteBuffer* byteBuffer, TransportSendControl* control); + + void boost(); + + void initializeSendBuffer(); + void flushSendBuffer(); + + static bool isPowerOfTwo(int32_t x); + + /** + * Context. + */ + Context::weak_pointer m_context; + + /** + * Response address. + */ + osiSockAddr m_responseAddress; + + /** + * Canceled flag. + */ + AtomicBoolean m_canceled; + + /** + * Search (datagram) sequence number. + */ + int32_t m_sequenceNumber; + + /** + * Send byte buffer (frame) + */ + epics::pvData::ByteBuffer m_sendBuffer; + + /** + * Set of registered channels. + */ + typedef std::map m_channels_t; + m_channels_t m_channels; + + /** + * Time of last frame send. + */ + int64_t m_lastTimeSent; + + /** + * This instance mutex. + */ + epics::pvData::Mutex m_channelMutex; + + /** + * User value lock. + */ + epics::pvData::Mutex m_userValueMutex; + + /** + * m_channels mutex. + */ + epics::pvData::Mutex m_mutex; + + static const int DATA_COUNT_POSITION; + static const int CAST_POSITION; + static const int PAYLOAD_POSITION; + + static const double ATOMIC_PERIOD; + static const int PERIOD_JITTER_MS; + + static const int DEFAULT_USER_VALUE; + static const int BOOST_VALUE; + static const int MAX_COUNT_VALUE; + static const int MAX_FALLBACK_COUNT_VALUE; + + static const int MAX_FRAMES_AT_ONCE; + static const int DELAY_BETWEEN_FRAMES_MS; }; } diff --git a/src/remote/pv/codec.h b/src/remote/pv/codec.h index 631f421..c1563af 100644 --- a/src/remote/pv/codec.h +++ b/src/remote/pv/codec.h @@ -338,9 +338,6 @@ public: return std::string("tcp"); } - - void internalDestroy(); - virtual void processControlMessage() OVERRIDE FINAL { if (_command == 2) { @@ -452,25 +449,16 @@ protected: virtual void sendBufferFull(int tries) OVERRIDE FINAL; /** - * Called to any resources just before closing transport - * @param[in] force flag indicating if forced (e.g. forced - * disconnect) is required + * Called from close(). after start of shutdown (isOpen()==false) + * but before worker thread shutdown. */ - virtual void internalClose(bool force); - - /** - * Called to any resources just after closing transport and without any locks held on transport - * @param[in] force flag indicating if forced (e.g. forced - * disconnect) is required - */ - virtual void internalPostClose(bool force) {} + virtual void internalClose(); private: AtomicValue _isOpen; epics::pvData::Thread _readThread, _sendThread; - epics::pvData::Event _shutdownEvent; + const SOCKET _channel; protected: - SOCKET _channel; osiSockAddr _socketAddress; std::string _socketName; protected: @@ -589,7 +577,7 @@ public: protected: void destroyAllChannels(); - virtual void internalClose(bool force) OVERRIDE FINAL; + virtual void internalClose() OVERRIDE FINAL; private: @@ -689,8 +677,7 @@ public: protected: - virtual void internalClose(bool force) OVERRIDE FINAL; - virtual void internalPostClose(bool force) OVERRIDE FINAL; + virtual void internalClose() OVERRIDE FINAL; private: diff --git a/src/remote/pv/remote.h b/src/remote/pv/remote.h index 2640067..dcfc378 100644 --- a/src/remote/pv/remote.h +++ b/src/remote/pv/remote.h @@ -317,11 +317,12 @@ public: virtual Configuration::const_shared_pointer getConfiguration() = 0; + typedef std::map > securityPlugins_t; /** * Get map of available security plug-ins. * @return the map of available security plug-ins */ - virtual std::map >& getSecurityPlugins() = 0; + virtual const securityPlugins_t& getSecurityPlugins() = 0; /// diff --git a/src/remote/pv/security.h b/src/remote/pv/security.h index 072c955..9e95e39 100644 --- a/src/remote/pv/security.h +++ b/src/remote/pv/security.h @@ -188,6 +188,8 @@ public: * @param remoteAddress * @return a new session. * @throws SecurityException + * + * @warning a Ref. loop is created if the SecuritySession stores a pointer to 'control' */ // authentication must be done immediately when connection is established (timeout 3seconds), // later on authentication process can be repeated @@ -423,12 +425,14 @@ public: return thisInstance; } - std::map >& getClientSecurityPlugins() + typedef std::map > securityPlugins_t; + + securityPlugins_t& getClientSecurityPlugins() { return m_clientSecurityPlugins; } - std::map >& getServerSecurityPlugins() + securityPlugins_t& getServerSecurityPlugins() { return m_serverSecurityPlugins; } @@ -448,8 +452,8 @@ public: private: SecurityPluginRegistry(); - std::map > m_clientSecurityPlugins; - std::map > m_serverSecurityPlugins; + securityPlugins_t m_clientSecurityPlugins; + securityPlugins_t m_serverSecurityPlugins; }; } diff --git a/src/remote/pv/simpleChannelSearchManagerImpl.h b/src/remote/pv/simpleChannelSearchManagerImpl.h deleted file mode 100644 index 6a54818..0000000 --- a/src/remote/pv/simpleChannelSearchManagerImpl.h +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Copyright - See the COPYRIGHT that is included with this distribution. - * pvAccessCPP is distributed subject to a Software License Agreement found - * in file LICENSE that is included with this distribution. - */ - -#ifndef SIMPLECHANNELSEARCHMANAGERIMPL_H -#define SIMPLECHANNELSEARCHMANAGERIMPL_H - -#ifdef epicsExportSharedSymbols -# define simpleChannelSearchManagerEpicsExportSharedSymbols -# undef epicsExportSharedSymbols -#endif - -#include -#include -#include - -#ifdef simpleChannelSearchManagerEpicsExportSharedSymbols -# define epicsExportSharedSymbols -# undef simpleChannelSearchManagerEpicsExportSharedSymbols -#endif - -#include -#include - -namespace epics { -namespace pvAccess { - - -class MockTransportSendControl: public TransportSendControl -{ -public: - void endMessage() {} - void flush(bool /*lastMessageCompleted*/) {} - void setRecipient(const osiSockAddr& /*sendTo*/) {} - void startMessage(epics::pvData::int8 /*command*/, std::size_t /*ensureCapacity*/, epics::pvData::int32 /*payloadSize*/) {} - void ensureBuffer(std::size_t /*size*/) {} - void alignBuffer(std::size_t /*alignment*/) {} - void flushSerializeBuffer() {} - void cachedSerialize(const std::tr1::shared_ptr& field, epics::pvData::ByteBuffer* buffer) - { - // no cache - field->serialize(buffer, this); - } - virtual bool directSerialize(epics::pvData::ByteBuffer* /*existingBuffer*/, const char* /*toSerialize*/, - std::size_t /*elementCount*/, std::size_t /*elementSize*/) - { - return false; - } -}; - - -class SimpleChannelSearchManagerImpl : - public ChannelSearchManager, - public epics::pvData::TimerCallback, - public std::tr1::enable_shared_from_this -{ -public: - POINTER_DEFINITIONS(SimpleChannelSearchManagerImpl); - - /** - * Constructor. - * @param context - */ - virtual ~SimpleChannelSearchManagerImpl(); - /** - * Cancel. - */ - void cancel(); - /** - * Get number of registered channels. - * @return number of registered channels. - */ - int32_t registeredCount(); - /** - * Register channel. - * @param channel to register. - */ - void registerSearchInstance(SearchInstance::shared_pointer const & channel, bool penalize = false); - /** - * Unregister channel. - * @param channel to unregister. - */ - void unregisterSearchInstance(SearchInstance::shared_pointer const & channel); - /** - * Search response from server (channel found). - * @param guid server GUID. - * @param cid client channel ID. - * @param seqNo search sequence number. - * @param minorRevision server minor PVA revision. - * @param serverAddress server address. - */ - void searchResponse(const ServerGUID & guid, pvAccessID cid, int32_t seqNo, int8_t minorRevision, osiSockAddr* serverAddress); - /** - * New server detected. - * Boost searching of all channels. - */ - void newServerDetected(); - - /// Timer callback. - void callback(); - - /// Timer stooped callback. - void timerStopped(); - - /** - * Private constructor. - * @param context - */ - SimpleChannelSearchManagerImpl(Context::shared_pointer const & context); - void activate(); - -private: - - bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, bool allowNewFrame, bool flush); - - static bool generateSearchRequestMessage(SearchInstance::shared_pointer const & channel, - epics::pvData::ByteBuffer* byteBuffer, TransportSendControl* control); - - void boost(); - - void initializeSendBuffer(); - void flushSendBuffer(); - - static bool isPowerOfTwo(int32_t x); - - /** - * Context. - */ - Context::weak_pointer m_context; - - /** - * Response address. - */ - osiSockAddr m_responseAddress; - - /** - * Canceled flag. - */ - AtomicBoolean m_canceled; - - /** - * Search (datagram) sequence number. - */ - int32_t m_sequenceNumber; - - /** - * Send byte buffer (frame) - */ - epics::pvData::ByteBuffer m_sendBuffer; - - /** - * Set of registered channels. - */ - typedef std::map m_channels_t; - m_channels_t m_channels; - - /** - * Time of last frame send. - */ - int64_t m_lastTimeSent; - - /** - * Mock transport send control - */ - MockTransportSendControl m_mockTransportSendControl; - - /** - * This instance mutex. - */ - epics::pvData::Mutex m_channelMutex; - - /** - * User value lock. - */ - epics::pvData::Mutex m_userValueMutex; - - /** - * m_channels mutex. - */ - epics::pvData::Mutex m_mutex; - - static const int DATA_COUNT_POSITION; - static const int CAST_POSITION; - static const int PAYLOAD_POSITION; - - static const double ATOMIC_PERIOD; - static const int PERIOD_JITTER_MS; - - static const int DEFAULT_USER_VALUE; - static const int BOOST_VALUE; - static const int MAX_COUNT_VALUE; - static const int MAX_FALLBACK_COUNT_VALUE; - - static const int MAX_FRAMES_AT_ONCE; - static const int DELAY_BETWEEN_FRAMES_MS; - -}; - -} -} - -#endif /* SIMPLECHANNELSEARCHMANAGERIMPL_H */ diff --git a/src/remoteClient/clientContextImpl.cpp b/src/remoteClient/clientContextImpl.cpp index 473b12e..8c4ea3b 100644 --- a/src/remoteClient/clientContextImpl.cpp +++ b/src/remoteClient/clientContextImpl.cpp @@ -11,6 +11,8 @@ #include #include +#include + #include #include #include @@ -28,7 +30,7 @@ #include #include #include -#include +#include #include #include #include @@ -915,6 +917,7 @@ public: } } + // TODO: m_structure and m_bitSet guarded by m_structureMutex? (as below) if (!(*m_structure->getStructure() == *pvPutStructure->getStructure())) { EXCEPTION_GUARD3(m_callback, cb, cb->putDone(invalidPutStructureStatus, thisPtr)); @@ -933,10 +936,11 @@ public: } try { - lock(); - *m_bitSet = *pvPutBitSet; - m_structure->copyUnchecked(*pvPutStructure, *m_bitSet); - unlock(); + { + epicsGuard G(*this); + *m_bitSet = *pvPutBitSet; + m_structure->copyUnchecked(*pvPutStructure, *m_bitSet); + } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { abortRequest(); @@ -1182,10 +1186,11 @@ public: } try { - lock(); - *m_putDataBitSet = *bitSet; - m_putData->copyUnchecked(*pvPutStructure, *m_putDataBitSet); - unlock(); + { + epicsGuard G(*this); + *m_putDataBitSet = *bitSet; + m_putData->copyUnchecked(*pvPutStructure, *m_putDataBitSet); + } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { abortRequest(); @@ -1420,9 +1425,10 @@ public: } try { - m_structureMutex.lock(); - m_structure = pvArgument; - m_structureMutex.unlock(); + { + epicsGuard G(m_structureMutex); + m_structure = pvArgument; + } m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); } catch (std::runtime_error &rte) { @@ -1844,9 +1850,6 @@ private: MonitorElement::shared_pointer m_overrunElement; bool m_overrunInProgress; - - MonitorElement::shared_pointer m_nullMonitorElement; - PVStructure::shared_pointer m_up2datePVStructure; int32 m_releasedCount; @@ -1872,7 +1875,6 @@ public: m_monitorQueue(), m_callback(callback), m_mutex(), m_bitSet1(), m_bitSet2(), m_overrunInProgress(false), - m_nullMonitorElement(), m_releasedCount(0), m_reportQueueStateInProgress(false), m_channel(channel), m_ioid(ioid), @@ -2007,10 +2009,10 @@ public: guard.unlock(); EXCEPTION_GUARD3(m_callback, cb, cb->unlisten(shared_from_this())); } - return m_nullMonitorElement; + return MonitorElement::shared_pointer(); } - MonitorElement::shared_pointer retVal = m_monitorQueue.front(); + MonitorElement::shared_pointer retVal(m_monitorQueue.front()); m_monitorQueue.pop(); return retVal; } @@ -2055,14 +2057,18 @@ public: if (sendAck) { + guard.unlock(); + try { m_channel->checkAndGetTransport()->enqueueSendRequest(shared_from_this()); } catch (std::runtime_error&) { // assume wrong connection state from checkAndGetTransport() + guard.lock(); m_reportQueueStateInProgress = false; } catch (std::exception& e) { LOG(logLevelWarn, "Ignore exception during MonitorStrategyQueue::release: %s", e.what()); + guard.lock(); m_reportQueueStateInProgress = false; } } @@ -2387,12 +2393,19 @@ public: if (!startRequest(QOS_PROCESS | QOS_GET)) return BaseRequestImpl::otherRequestPendingStatus; + bool restore = m_started; + m_started = true; + + guard.unlock(); + try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); - m_started = true; return Status::Ok; } catch (std::runtime_error &rte) { + guard.lock(); + + m_started = restore; abortRequest(); return BaseRequestImpl::channelNotConnected; } @@ -2413,12 +2426,19 @@ public: if (!startRequest(QOS_PROCESS)) return BaseRequestImpl::otherRequestPendingStatus; + bool restore = m_started; + m_started = false; + + guard.unlock(); + try { m_channel->checkAndGetTransport()->enqueueSendRequest(internal_from_this()); - m_started = false; return Status::Ok; } catch (std::runtime_error &rte) { + guard.lock(); + + m_started = restore; abortRequest(); return BaseRequestImpl::channelNotConnected; } @@ -2446,7 +2466,7 @@ public: class AbstractClientResponseHandler : public ResponseHandler { protected: - ClientContextImpl::weak_pointer _context; + const ClientContextImpl::weak_pointer _context; public: AbstractClientResponseHandler(ClientContextImpl::shared_pointer const & context, string const & description) : ResponseHandler(context.get(), description), _context(ClientContextImpl::weak_pointer(context)) { @@ -3285,7 +3305,44 @@ private: virtual void destroy() OVERRIDE FINAL { - destroy(false); + // Hack. Prevent Transport from being dtor'd while m_channelMutex is held + Transport::shared_pointer old_transport; + { + Lock guard(m_channelMutex); + if (m_connectionState == DESTROYED) + return; + REFTRACE_DECREMENT(num_active); + + old_transport = m_transport; + + m_getfield.reset(); + + // stop searching... + shared_pointer thisChannelPointer = internal_from_this(); + m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); + + disconnectPendingIO(true); + + if (m_connectionState == CONNECTED) + { + disconnect(false, true); + } + else if (m_transport) + { + // unresponsive state, do not forget to release transport + m_transport->release(getID()); + m_transport.reset(); + } + + + setConnectionState(DESTROYED); + + // unregister + m_context->unregisterChannel(thisChannelPointer); + } + + // should be called without any lock hold + reportChannelStateChange(); } virtual string getRequesterName() OVERRIDE FINAL @@ -3355,7 +3412,7 @@ public: return m_channelID; } - virtual string getSearchInstanceName() OVERRIDE FINAL { + virtual const string& getSearchInstanceName() OVERRIDE FINAL { return m_name; } @@ -3388,7 +3445,11 @@ public: void disconnect() { { + // Hack. Prevent Transport from being dtor'd while m_channelMutex is held + Transport::shared_pointer old_transport; Lock guard(m_channelMutex); + old_transport = m_transport; + // if not destroyed... if (m_connectionState == DESTROYED) throw std::runtime_error("Channel destroyed."); @@ -3400,42 +3461,6 @@ public: reportChannelStateChange(); } - /** - * Create a channel, i.e. submit create channel request to the server. - * This method is called after search is complete. - * @param transport - */ - void createChannel(Transport::shared_pointer const & transport) - { - Lock guard(m_channelMutex); - - // do not allow duplicate creation to the same transport - if (!m_allowCreation) - return; - m_allowCreation = false; - - // check existing transport - if (m_transport.get() && m_transport.get() != transport.get()) - { - disconnectPendingIO(false); - - m_transport->release(getID()); - } - else if (m_transport.get() == transport.get()) - { - // request to sent create request to same transport, ignore - // this happens when server is slower (processing search requests) than client generating it - return; - } - - m_transport = transport; - m_transport->enqueueSendRequest(internal_from_this()); - } - - virtual void cancel() { - // noop - } - virtual void timeout() { createChannelFailed(); } @@ -3445,15 +3470,14 @@ public: */ virtual void createChannelFailed() OVERRIDE FINAL { + // Hack. Prevent Transport from being dtor'd while m_channelMutex is held + Transport::shared_pointer old_transport; Lock guard(m_channelMutex); - - cancel(); - // release transport if active if (m_transport) { m_transport->release(getID()); - m_transport.reset(); + old_transport.swap(m_transport); } // ... and search again, with penalty @@ -3476,7 +3500,6 @@ public: if (m_connectionState == DESTROYED) { // end connection request - cancel(); return; } @@ -3496,71 +3519,6 @@ public: LOG(logLevelError, "connectionCompleted() %d '%s' unhandled exception: %s\n", sid, m_name.c_str(), e.what()); // noop } - - // NOTE: always call cancel - // end connection request - cancel(); - } - - // should be called without any lock hold - reportChannelStateChange(); - } - - /** - * @param force force destruction regardless of reference count (not used now) - */ - void destroy(bool force) { - { - Lock guard(m_channelMutex); - if (m_connectionState == DESTROYED) - return; - //throw std::runtime_error("Channel already destroyed."); - } - REFTRACE_DECREMENT(num_active); - - destroyChannel(force); - } - - - /** - * Actual destroy method, to be called CAJContext. - * @param force force destruction regardless of reference count - * @throws PVAException - * @throws std::runtime_error - * @throws IOException - */ - void destroyChannel(bool /*force*/) OVERRIDE FINAL { - { - Lock guard(m_channelMutex); - - if (m_connectionState == DESTROYED) - throw std::runtime_error("Channel already destroyed."); - - m_getfield.reset(); - - // stop searching... - shared_pointer thisChannelPointer = internal_from_this(); - m_context->getChannelSearchManager()->unregisterSearchInstance(thisChannelPointer); - cancel(); - - disconnectPendingIO(true); - - if (m_connectionState == CONNECTED) - { - disconnect(false, true); - } - else if (m_transport) - { - // unresponsive state, do not forget to release transport - m_transport->release(getID()); - m_transport.reset(); - } - - - setConnectionState(DESTROYED); - - // unregister - m_context->unregisterChannel(thisChannelPointer); } // should be called without any lock hold @@ -3584,7 +3542,6 @@ public: if (!initiateSearch) { // stop searching... m_context->getChannelSearchManager()->unregisterSearchInstance(internal_from_this()); - cancel(); } setConnectionState(DISCONNECTED); @@ -3660,9 +3617,12 @@ public: } virtual void searchResponse(const ServerGUID & guid, int8 minorRevision, osiSockAddr* serverAddress) OVERRIDE FINAL { + // Hack. Prevent Transport from being dtor'd while m_channelMutex is held + Transport::shared_pointer old_transport; + Lock guard(m_channelMutex); - Transport::shared_pointer transport = m_transport; - if (transport.get()) + Transport::shared_pointer transport(m_transport); + if (transport) { // GUID check case: same server listening on different NIF @@ -3679,7 +3639,7 @@ public: // NOTE: this creates a new or acquires an existing transport (implies increases usage count) transport = m_context->getTransport(internal_from_this(), serverAddress, minorRevision, m_priority); - if (!transport.get()) + if (!transport) { createChannelFailed(); return; @@ -3690,7 +3650,34 @@ public: std::copy(guid.value, guid.value + 12, m_guid.value); // create channel - createChannel(transport); + { + Lock guard(m_channelMutex); + + // do not allow duplicate creation to the same transport + if (!m_allowCreation) + return; + m_allowCreation = false; + + // check existing transport + if (m_transport && m_transport.get() != transport.get()) + { + disconnectPendingIO(false); + + m_transport->release(getID()); + } + else if (m_transport.get() == transport.get()) + { + // request to sent create request to same transport, ignore + // this happens when server is slower (processing search requests) than client generating it + return; + } + + // rotate: transport -> m_transport -> old_transport -> + old_transport.swap(m_transport); + m_transport.swap(transport); + + m_transport->enqueueSendRequest(internal_from_this()); + } } virtual void transportClosed() OVERRIDE FINAL { @@ -4109,7 +4096,40 @@ public: m_contextState = CONTEXT_DESTROYED; } - internalDestroy(); + // + // cleanup + // + + m_timer->close(); + + m_channelSearchManager->cancel(); + + // this will also close all PVA transports + destroyAllChannels(); + + // stop UDPs + for (BlockingUDPTransportVector::const_iterator iter = m_udpTransports.begin(); + iter != m_udpTransports.end(); iter++) + (*iter)->close(); + m_udpTransports.clear(); + + // stop UDPs + if (m_searchTransport) + m_searchTransport->close(); + + // wait for all transports to cleanly exit + int tries = 40; + epics::pvData::int32 transportCount; + while ((transportCount = m_transportRegistry.size()) && tries--) + epicsThreadSleep(0.025); + + { + Lock guard(m_beaconMapMutex); + m_beaconHandlers.clear(); + } + + if (transportCount) + LOG(logLevelDebug, "PVA client context destroyed with %u transport(s) active.", (unsigned)transportCount); } virtual ~InternalClientContextImpl() @@ -4152,7 +4172,7 @@ private: // stores many weak_ptr m_responseHandler.reset(new ClientResponseHandler(thisPointer)); - m_channelSearchManager.reset(new SimpleChannelSearchManagerImpl(thisPointer)); + m_channelSearchManager.reset(new ChannelSearchManager(thisPointer)); // preinitialize security plugins SecurityPluginRegistry::instance(); @@ -4188,40 +4208,6 @@ private: // TODO what if initialization failed!!! } - void internalDestroy() { - - // - // cleanup - // - - // this will also close all PVA transports - destroyAllChannels(); - - // stop UDPs - for (BlockingUDPTransportVector::const_iterator iter = m_udpTransports.begin(); - iter != m_udpTransports.end(); iter++) - (*iter)->close(); - m_udpTransports.clear(); - - // stop UDPs - if (m_searchTransport) - m_searchTransport->close(); - - // wait for all transports to cleanly exit - int tries = 40; - epics::pvData::int32 transportCount; - while ((transportCount = m_transportRegistry.size()) && tries--) - epicsThreadSleep(0.025); - - { - Lock guard(m_beaconMapMutex); - m_beaconHandlers.clear(); - } - - if (transportCount) - LOG(logLevelDebug, "PVA client context destroyed with %u transport(s) active.", (unsigned)transportCount); - } - void destroyAllChannels() { Lock guard(m_cidMapMutex); @@ -4472,7 +4458,7 @@ private: } } - std::map >& getSecurityPlugins() OVERRIDE FINAL + const securityPlugins_t& getSecurityPlugins() OVERRIDE FINAL { return SecurityPluginRegistry::instance().getClientSecurityPlugins(); } @@ -4586,7 +4572,7 @@ private: * Channel search manager. * Manages UDP search requests. */ - SimpleChannelSearchManagerImpl::shared_pointer m_channelSearchManager; + ChannelSearchManager::shared_pointer m_channelSearchManager; /** * Beacon handler map. diff --git a/src/remoteClient/pv/clientContextImpl.h b/src/remoteClient/pv/clientContextImpl.h index 0ee0884..a003d65 100644 --- a/src/remoteClient/pv/clientContextImpl.h +++ b/src/remoteClient/pv/clientContextImpl.h @@ -41,7 +41,6 @@ public: POINTER_DEFINITIONS(ClientChannelImpl); virtual pvAccessID getChannelID() = 0; - virtual void destroyChannel(bool force) = 0; virtual void connectionCompleted(pvAccessID sid/*, rights*/) = 0; virtual void createChannelFailed() = 0; virtual ClientContextImpl* getContext() = 0; diff --git a/src/server/beaconEmitter.cpp b/src/server/beaconEmitter.cpp index b1b0ea4..9001085 100644 --- a/src/server/beaconEmitter.cpp +++ b/src/server/beaconEmitter.cpp @@ -109,12 +109,16 @@ void BeaconEmitter::timerStopped() void BeaconEmitter::destroy() { - _timer->cancel(shared_from_this()); + Timer::shared_pointer timer(_timer.lock()); + if(timer) + timer->cancel(shared_from_this()); } void BeaconEmitter::start() { - _timer->scheduleAfterDelay(shared_from_this(), 0.0); + Timer::shared_pointer timer(_timer.lock()); + if(timer) + timer->scheduleAfterDelay(shared_from_this(), 0.0); } void BeaconEmitter::reschedule() @@ -122,7 +126,9 @@ void BeaconEmitter::reschedule() const double period = (_beaconSequenceID >= _beaconCountLimit) ? _slowBeaconPeriod : _fastBeaconPeriod; if (period > 0) { - _timer->scheduleAfterDelay(shared_from_this(), period); + Timer::shared_pointer timer(_timer.lock()); + if(timer) + timer->scheduleAfterDelay(shared_from_this(), period); } } diff --git a/src/server/beaconServerStatusProvider.cpp b/src/server/beaconServerStatusProvider.cpp index ccebc05..c533c0b 100644 --- a/src/server/beaconServerStatusProvider.cpp +++ b/src/server/beaconServerStatusProvider.cpp @@ -13,40 +13,19 @@ using namespace epics::pvData; namespace epics { namespace pvAccess { -DefaultBeaconServerStatusProvider::DefaultBeaconServerStatusProvider(ServerContext::shared_pointer const & context): _context(context) -{ - initialize(); -} +DefaultBeaconServerStatusProvider::DefaultBeaconServerStatusProvider(ServerContext::shared_pointer const & context) + :_status(getPVDataCreate()->createPVStructure(getFieldCreate()->createFieldBuilder() + ->add("connections", pvInt) + ->add("connections", pvInt) + ->add("allocatedMemory", pvLong) + ->add("freeMemory", pvLong) + ->add("threads", pvInt) + ->add("deadlocks", pvInt) + ->add("averageSystemLoad", pvDouble) + ->createStructure())) +{} -DefaultBeaconServerStatusProvider::~DefaultBeaconServerStatusProvider() -{ -} - -void DefaultBeaconServerStatusProvider::initialize() -{ - FieldCreatePtr fieldCreate = getFieldCreate(); - - StringArray fieldNames; - fieldNames.resize(6); - fieldNames[0] = "connections"; - fieldNames[1] = "allocatedMemory"; - fieldNames[2] = "freeMemory"; - fieldNames[3] = "threads"; - fieldNames[4] = "deadlocks"; - fieldNames[5] = "averageSystemLoad"; - - FieldConstPtrArray fields; - fields.resize(6); - // TODO hierarchy can be used... - fields[0] = fieldCreate->createScalar(pvInt); - fields[1] = fieldCreate->createScalar(pvLong); - fields[2] = fieldCreate->createScalar(pvLong); - fields[3] = fieldCreate->createScalar(pvInt); - fields[4] = fieldCreate->createScalar(pvInt); - fields[5] = fieldCreate->createScalar(pvDouble); - - _status = getPVDataCreate()->createPVStructure(fieldCreate->createStructure(fieldNames, fields)); -} +DefaultBeaconServerStatusProvider::~DefaultBeaconServerStatusProvider() {} PVField::shared_pointer DefaultBeaconServerStatusProvider::getServerStatusData() { diff --git a/src/server/pv/beaconEmitter.h b/src/server/pv/beaconEmitter.h index 56ead21..cdc68d9 100644 --- a/src/server/pv/beaconEmitter.h +++ b/src/server/pv/beaconEmitter.h @@ -95,7 +95,7 @@ private: /** * Protocol. */ - std::string _protocol; + const std::string _protocol; /** * Transport. @@ -110,22 +110,22 @@ private: /** * Server GUID. */ - ServerGUID _guid; + const ServerGUID _guid; /** * Fast (at startup) beacon period (in sec). */ - double _fastBeaconPeriod; + const double _fastBeaconPeriod; /** * Slow (after beaconCountLimit is reached) beacon period (in sec). */ - double _slowBeaconPeriod; + const double _slowBeaconPeriod; /** * Limit on number of beacons issued. */ - epics::pvData::int16 _beaconCountLimit; + const epics::pvData::int16 _beaconCountLimit; /** * Server address. @@ -135,17 +135,18 @@ private: /** * Server port. */ - epics::pvData::int32 _serverPort; + const epics::pvData::int32 _serverPort; /** * Server status provider implementation (optional). */ BeaconServerStatusProvider::shared_pointer _serverStatusProvider; - /** - * Timer. + /** Timer is referenced by server context, which also references us. + * We will also be queuing ourselves, and be referenced by Timer. + * So keep only a weak ref to Timer to avoid possible ref. loop. */ - epics::pvData::Timer::shared_pointer _timer; + epics::pvData::Timer::weak_pointer _timer; }; } diff --git a/src/server/pv/beaconServerStatusProvider.h b/src/server/pv/beaconServerStatusProvider.h index 0d681f8..e79925a 100644 --- a/src/server/pv/beaconServerStatusProvider.h +++ b/src/server/pv/beaconServerStatusProvider.h @@ -63,17 +63,8 @@ public: virtual epics::pvData::PVField::shared_pointer getServerStatusData(); -private: - /** - * Initialize - */ - void initialize(); - - private: epics::pvData::PVStructure::shared_pointer _status; - std::tr1::shared_ptr _context; - //ServerContext::shared_pointer _context; }; } diff --git a/src/server/pv/serverContextImpl.h b/src/server/pv/serverContextImpl.h index 168032f..b01349e 100644 --- a/src/server/pv/serverContextImpl.h +++ b/src/server/pv/serverContextImpl.h @@ -51,7 +51,7 @@ public: Transport::shared_pointer getSearchTransport() OVERRIDE FINAL; Configuration::const_shared_pointer getConfiguration() OVERRIDE FINAL; TransportRegistry* getTransportRegistry() OVERRIDE FINAL; - std::map >& getSecurityPlugins() OVERRIDE FINAL; + const securityPlugins_t& getSecurityPlugins() OVERRIDE FINAL; virtual void newServerDetected() OVERRIDE FINAL; @@ -259,11 +259,6 @@ private: */ void loadConfiguration(); - /** - * Destroy all transports. - */ - void destroyAllTransports(); - Configuration::const_shared_pointer configuration; epicsTimeStamp _startTime; diff --git a/src/server/serverContext.cpp b/src/server/serverContext.cpp index 3ba9364..aa72bbe 100644 --- a/src/server/serverContext.cpp +++ b/src/server/serverContext.cpp @@ -39,7 +39,7 @@ ServerContextImpl::ServerContextImpl(): _broadcastPort(PVA_BROADCAST_PORT), _serverPort(PVA_SERVER_PORT), _receiveBufferSize(MAX_TCP_RECV), - _timer(new Timer("pvAccess-server timer", lowerPriority)), + _timer(new Timer("PVAS timers", lowerPriority)), _beaconEmitter(), _acceptor(), _transportRegistry(), @@ -307,6 +307,12 @@ void ServerContextImpl::run(uint32 seconds) void ServerContextImpl::shutdown() { + if(!_timer) + return; // already shutdown + + // abort pending timers and prevent new timers from starting + _timer->close(); + // stop responding to search requests for (BlockingUDPTransportVector::const_iterator iter = _udpTransports.begin(); iter != _udpTransports.end(); iter++) @@ -346,7 +352,7 @@ void ServerContextImpl::shutdown() } // this will also destroy all channels - destroyAllTransports(); + _transportRegistry.clear(); // drop timer queue LEAK_CHECK(_timer, "_timer") @@ -360,12 +366,6 @@ void ServerContextImpl::shutdown() _runEvent.signal(); } -void ServerContextImpl::destroyAllTransports() -{ - // now clear all (release) - _transportRegistry.clear(); -} - void ServerContext::printInfo(int lvl) { printInfo(cout, lvl); @@ -540,7 +540,7 @@ epicsTimeStamp& ServerContextImpl::getStartTime() } -std::map >& ServerContextImpl::getSecurityPlugins() +const Context::securityPlugins_t& ServerContextImpl::getSecurityPlugins() { return SecurityPluginRegistry::instance().getServerSecurityPlugins(); } diff --git a/testApp/remote/channelAccessIFTest.cpp b/testApp/remote/channelAccessIFTest.cpp index c95c2a2..3f09a30 100644 --- a/testApp/remote/channelAccessIFTest.cpp +++ b/testApp/remote/channelAccessIFTest.cpp @@ -628,9 +628,9 @@ void ChannelAccessIFTest::test_channelGetIntProcessInternal(Channel::shared_poin pvTimeStamp.get(timeStamp); double deltaT = TimeStamp::diff(timeStamp, previousTimestamp); - testOk((previousValue +1)/*%11*/ == value->get(), "%s: testing the counter value change", - testMethodName.c_str()); - testOk(deltaT > 0.9 && deltaT < 2.0, + testOk((previousValue +1) == value->get(), "%s: testing the counter value change %d == %d", + testMethodName.c_str(), previousValue +1, (int)value->get()); + testOk(deltaT > 0.1 && deltaT < 20.0, "%s: timestamp change was %g", testMethodName.c_str(), deltaT); } @@ -1608,9 +1608,9 @@ void ChannelAccessIFTest::test_channelPutGetIntProcess() { //cout << "Testing1:" << testValue << " == " << getValuePtr->get() << endl; //cout << "Testing2:" << timeStamp.getSecondsPastEpoch() << ">" << previousTimestampSec << endl; - testOk( testValue == getValuePtr->get(), "%s: testing the counter value change", - CURRENT_FUNCTION); - testOk(deltaT > 0.9 && deltaT < 2.0, + testOk( testValue == getValuePtr->get(), "%s: testing the counter value change %d == %d", + CURRENT_FUNCTION, testValue, (int)getValuePtr->get()); + testOk(deltaT > 0.1 && deltaT < 20.0, "%s: timestamp change is %g", CURRENT_FUNCTION, deltaT); }