From b3500464277cc02c202a798ac2ff166cfc7d63a4 Mon Sep 17 00:00:00 2001 From: Michael Davidsaver Date: Mon, 14 Dec 2015 14:14:45 -0500 Subject: [PATCH] todo: udprepeat --- src/utils/udprepeat.cpp | 172 ++++++++++++++++++++++++++++++++++++++++ src/utils/udprepeat.h | 54 +++++++++++++ 2 files changed, 226 insertions(+) create mode 100644 src/utils/udprepeat.cpp create mode 100644 src/utils/udprepeat.h diff --git a/src/utils/udprepeat.cpp b/src/utils/udprepeat.cpp new file mode 100644 index 0000000..5c4f3d8 --- /dev/null +++ b/src/utils/udprepeat.cpp @@ -0,0 +1,172 @@ + +#include +#include + +#include +#include + +#include + +#include "udprepeat.h" + +namespace pvd = epics::pvData; +namespace pva = epics::pvAccess; + +typedef epicsGuard Guard; +typedef epicsGuardRelease UnGuard; + +namespace { +static epicsMutex repeatlistlock; +typedef std::map repeaters_t; +static repeaters_t repeaters; + +struct Receiver : public pvd::Runnable +{ + const SOCKET sock; + pva::UDPFanout::Pvt * const pvt; + std::vector buf; + std::vector > recvs; + bool run; + pvd::Thread runner; + epicsEvent evt; + + Receiver(SOCKET sock, const std::string& name, pva::UDPFanout::Pvt *pvt) + :sock(sock), pvt(pvt), buf(8196), run(true) + ,runner(pvd::Thread::Config(this) + .name(name) + .prio(epicsThreadPriorityCAServerLow-2)) + { + evt.wait(); + } + ~Receiver() + { + epicsSocketDestroy(sock); + } + virtual void run() + { + evt.signal(); + Guard G(pvt->lock); + while(run) { + ssize_t ret; + { + UnGuard U(G); + ret = ::recv(sock, &buf[0], buf.size(), 0); + } + if(ret<0) { + int err = errno; + + } + } + } + void shutdown(); +}; + +} + +namespace epics {namespace pvAccess { + +struct UDPFanout::Pvt +{ + const unsigned port; + epicsMutex lock; + + UDPFanout::addr_list iface, bcase; + UDPFanout::name_list names; + + Pvt(unsigned port) + :port(port) + { + + } +}; + +}} // namespace + +namespace { + +void Receiver::shutdown() +{ + { + Guard G(pvt->lock); + run = false; + } + switch(epicsSocketSystemCallInterruptMechanismQuery()) + { + case esscimqi_socketBothShutdownRequired: + ::shutdown(sock, SHUT_RDWR); + break; + default: + break; + } + runner.exitWait(); +} + +} + +namespace epics {namespace pvAccess { + + +UDPFanout::~UDPFanout() +{ + std::auto_ptr pvt(this->pvt); + + epicsGuard G(repeatlistlock); + + repeaters_t::iterator it = repeaters.find(pvt->port); + if(it!=repeaters.end()) { + UDPFanout::shared_pointer self(it->second.lock()); + if(self && self.get()==this) + repeaters.erase(it); + } +} + +void +UDPFanout::bind(const UDPReceiver::shared_pointer, const std::string& iname) +{ + +} + +void +UDPFanout::bind(const UDPReceiver::shared_pointer, const osiSockAddr& iface) +{ + +} + +void +UDPFanout::unbind(const UDPReceiver::shared_pointer) +{ + +} + +const UDPFanout::name_list& +UDPFanout::getNames() +{ + +} + +const UDPFanout::addr_list& +UDPFanout::getAddresses() +{ + +} + +UDPFanout::shared_pointer +UDPFanout::getFanoutPort(unsigned port) +{ + epicsGuard G(repeatlistlock); + + repeaters_t::const_iterator it = repeaters.find(port); + if(it!=repeaters.end()) { + UDPFanout::shared_pointer R(it->second.lock()); + if(R) + return R; + } + + std::auto_ptr pvt(new UDPFanout::Pvt(port)); + UDPFanout::shared_pointer ret(new UDPFanout(pvt.get())); + repeaters[port] = ret; + pvt.release(); + return ret; +} + +}} // namespace diff --git a/src/utils/udprepeat.h b/src/utils/udprepeat.h new file mode 100644 index 0000000..eb9aabe --- /dev/null +++ b/src/utils/udprepeat.h @@ -0,0 +1,54 @@ +#ifndef UDPREPEAT_H +#define UDPREPEAT_H + +#include +#include + +#include + +#include "byteBuffer.h" +#include "sharedPtr.h" + +#define epicsExportSharedSymbols + +namespace epics {namespace pvAccess { + +class UDPReceiver +{ + POINTER_DEFINITIONS(UDPReceiver); + + virtual ~UDPReceiver() {} + + virtual void recv(const osiSockAddr& src, + const char *buf, size_t buflen) =0; +}; + +class UDPFanout +{ + struct Pvt; + friend struct Pvt; + Pvt *pvt; + UDPFanout(Pvt *pvt) :pvt(pvt) {} + UDPFanout(const UDPFanout&); + UDPFanout& operator=(const UDPFanout&); +public: + POINTER_DEFINITIONS(UDPFanout); + + ~UDPFanout(); + + void bind(const UDPReceiver::shared_pointer, const std::string& iname); + void bind(const UDPReceiver::shared_pointer, const osiSockAddr& iface); + void unbind(const UDPReceiver::shared_pointer); + + typedef std::vector name_list; + typedef std::vector addr_list; + + const name_list& getNames(); + const addr_list& getAddresses(); + + static UDPFanout::shared_pointer getFanoutPort(unsigned port); +}; + +}} + +#endif // UDPREPEAT_H