respond to CreateChan

This commit is contained in:
Michael Davidsaver
2019-11-08 10:52:50 -08:00
parent e1b8923f33
commit 946e557960
8 changed files with 196 additions and 11 deletions
+135 -2
View File
@@ -6,8 +6,10 @@
#include <limits>
#include <system_error>
#include <utility>
#include <osiSock.h>
#include <epicsGuard.h>
#include <epicsAssert.h>
#include <pvxs/log.h>
@@ -33,6 +35,7 @@ ServerConn::ServerConn(ServIface* iface, evutil_socket_t sock, struct sockaddr *
,segCmd(0xff)
,segBuf(evbuffer_new())
,txBody(evbuffer_new())
,nextSID(0)
{
bufferevent_setcb(bev.get(), &bevReadS, &bevWriteS, &bevEventS, this);
// initially wait for at least a header
@@ -164,10 +167,140 @@ void ServerConn::handle_Search()
{}
void ServerConn::handle_CreateChan()
{}
{
const bool be = EPICS_BYTE_ORDER==EPICS_ENDIAN_BIG;
(void)evbuffer_drain(txBody.get(), evbuffer_get_length(txBody.get()));
EvInBuf M(peerBE, segBuf.get(), 16);
epicsGuard<RWLock::Reader> G(iface->server->sourcesLock.reader()); // could move this into loop, only guards server->sources
uint16_t count = 0;
from_wire(M, count);
for(auto i : range(count)) {
(void)i;
uint32_t cid = -1, sid = -1;
server::Source::Create op{peerName};
from_wire(M, cid);
from_wire(M, op.name);
if(!M.good())
break;
Status sts{Status::Ok};
bool claimed = false;
try {
if(chanByCID.size()==0xffffffff || chanBySID.size()==0xffffffff) {
sts.code = Status::Error;
sts.msg = "Too many Server channels";
sts.trace = "pvx:serv:chanidoverflow:";
}
if(sts.isSuccess() && chanByCID.find(cid)!=chanByCID.end()) {
sts.code = Status::Fatal;
sts.msg = "Client reuses existing CID";
sts.trace = "pvx:serv:dupcid:";
}
std::unique_ptr<server::Handler> handler;
if(sts.isSuccess() && !op.name.empty()) {
for(auto& pair : iface->server->sources) {
try {
handler = pair.second->onCreate(op);
if(handler)
break;
}catch(std::exception& e){
log_printf(connsetup, PLVL_ERR, "Unhandled error in onCreate %s,%d %s : %s\n",
pair.first.second.c_str(), pair.first.first,
typeid(&e).name(), e.what());
}
}
}
if(sts.isSuccess() && handler) {
do {
sid = nextSID++;
} while(chanBySID.find(sid)!=chanBySID.end());
auto pair = chanBySID.emplace(std::piecewise_construct,
std::make_tuple(sid),
std::make_tuple(this, sid, cid, op.name, std::move(handler)));
auto pair2 = chanByCID.emplace(cid, &pair.first->second);
assert(!!pair.second && !!pair2.second); // we've already checked for a duplicate
claimed = true;
}
}catch(std::exception& e){
log_printf(connsetup, PLVL_ERR, "Unhandled error in onCreate %s : %s\n",
typeid(&e).name(), e.what());
sts.code = Status::Fatal;
sts.msg = e.what();
sts.trace = "pvx:serv:internal:";
}
{
if(sts.isSuccess() && !claimed) {
sts.code = Status::Fatal;
sts.msg = "Unable to create Channel";
sts.trace = "pvx:serv:nosource:";
}
EvOutBuf R(be, txBody.get());
to_wire(R, cid);
to_wire(R, sid);
to_wire(R, sts);
// "spec" calls for uint16_t Access Rights here, but pvAccessCPP don't include this (it's useless anyway)
if(!R.good()) {
M.fault();
log_printf(connio, PLVL_ERR, "Encode error in CreateChan\n");
break;
}
}
auto tx = bufferevent_get_output(bev.get());
to_evbuf(tx, Header{pva_app_msg::CreateChan,
pva_flags::Server,
uint32_t(evbuffer_get_length(txBody.get()))},
be);
auto err = evbuffer_add_buffer(tx, txBody.get());
assert(!err);
}
if(!M.good()) {
log_printf(connio, PLVL_ERR, "Decode error in CreateChan\n");
bev.reset();
}
}
void ServerConn::handle_DestroyChan()
{}
{
EvInBuf M(peerBE, segBuf.get(), 16);
uint32_t sid=-1, cid=-1;
from_wire(M, sid);
from_wire(M, cid);
auto it = chanBySID.find(sid);
if(M.good() && it!=chanBySID.end()) {
auto& chan = it->second;
if(chan.cid!=cid) {
log_printf(connsetup, PLVL_DEBUG, "Client provides incorrect CID with DestroyChan sid=%d cid=%d!=%d '%s'\n",
unsigned(sid), unsigned(chan.cid), unsigned(cid), chan.name.c_str());
}
auto n = chanByCID.erase(chan.cid);
assert(n==1);
chanBySID.erase(it);
// ServerChannel is delete'd
} else {
log_printf(connsetup, PLVL_DEBUG, "Client DestroyChan non-existant sid=%d cid=%d\n",
unsigned(sid), unsigned(cid));
}
}
void ServerConn::handle_GetOp()
{}