Run local test

Dominik Werder
2024-04-23 15:08:40 +02:00
parent 534df84d7d
commit c98c638381
10 changed files with 306 additions and 195 deletions

View File

@@ -42,6 +42,7 @@ const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000);
const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500);
const RUN_WITHOUT_SCYLLA: bool = true;
pub struct DaemonOpts {
pgconf: Database,
@@ -637,16 +638,18 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option<ChannelsConfig>) ->
drop(pg);
jh.await?.map_err(Error::from_string)?;
scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short)
.await
.map_err(Error::from_string)?;
if let Some(scyconf) = opts.scylla_config_lt() {
scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long)
if RUN_WITHOUT_SCYLLA {
} else {
scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short)
.await
.map_err(Error::from_string)?;
}
if let Some(scyconf) = opts.scylla_config_lt() {
scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long)
.await
.map_err(Error::from_string)?;
}
}
info!("database check done");
// TODO use a new stats type:

View File

@@ -173,6 +173,7 @@ pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> {
Ok(())
}
#[allow(unused)]
fn ignore_does_not_exist<T>(x: Result<T, tokio_postgres::Error>) -> Result<(), tokio_postgres::Error> {
match x {
Ok(_) => Ok(()),

View File

@@ -1,3 +1,4 @@
pub mod beacons;
pub mod conn;
pub mod connset;
pub mod connset_input_merge;

View File

@@ -0,0 +1,45 @@
use bytes::Buf;
use err::thiserror;
use err::ThisError;
use log::*;
use std::io::Cursor;
use std::net::Ipv4Addr;
use taskrun::tokio::net::UdpSocket;
#[derive(Debug, ThisError)]
pub enum Error {
Io(#[from] std::io::Error),
}
pub async fn listen_beacons(mut cancel: taskrun::tokio::sync::mpsc::Receiver<u32>) -> Result<(), Error> {
let sock = UdpSocket::bind("0.0.0.0:5065").await?;
sock.set_broadcast(true).unwrap();
let mut buf = Vec::new();
buf.resize(1024 * 4, 0);
loop {
let bb = &mut buf;
let (n, remote) = taskrun::tokio::select! {
x = sock.recv_from(bb) => x,
_ = cancel.recv() => {
break;
}
}?;
if n != 16 {
debug!("len recv {n}");
}
if n >= 16 {
let mut cur = Cursor::new(bb);
let cmd = cur.get_u16();
let _ = cur.get_u16();
let ver = cur.get_u16();
let port = cur.get_u16();
let _seqid = cur.get_u32();
let addr = cur.get_u32();
let addr = Ipv4Addr::from(addr);
if cmd == 0x0d {
debug!("beacon {remote} {ver} {addr} {port}")
}
}
}
Ok(())
}
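For reference, a minimal, hypothetical unit test for the 16-byte beacon header layout that listen_beacons reads above (command, unused field, version, port, sequence id, address). The field values are made up for illustration; only the ordering and widths are taken from the reads in the code.

use bytes::{Buf, BufMut};
use std::io::Cursor;
use std::net::Ipv4Addr;

#[test]
fn decode_beacon_header() {
    // Build a 16-byte datagram in the same field order that listen_beacons reads.
    let mut buf = Vec::with_capacity(16);
    buf.put_u16(0x0d); // command treated as a beacon above
    buf.put_u16(0); // field the reader skips
    buf.put_u16(13); // protocol version
    buf.put_u16(5064); // server port
    buf.put_u32(42); // sequence id
    buf.put_u32(u32::from(Ipv4Addr::new(10, 0, 0, 7))); // server address
    assert_eq!(buf.len(), 16);
    let mut cur = Cursor::new(&buf[..]);
    assert_eq!(cur.get_u16(), 0x0d);
    let _ = cur.get_u16();
    assert_eq!(cur.get_u16(), 13);
    assert_eq!(cur.get_u16(), 5064);
    assert_eq!(cur.get_u32(), 42);
    assert_eq!(Ipv4Addr::from(cur.get_u32()), Ipv4Addr::new(10, 0, 0, 7));
}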

View File

@@ -17,7 +17,6 @@ use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::StreamExt;
use hashbrown::HashMap;
use hashbrown::HashSet;
use log::*;
use netpod::timeunits::*;
use netpod::ScalarType;
@@ -74,6 +73,7 @@ const IOC_PING_IVL: Duration = Duration::from_millis(80000);
const DO_RATE_CHECK: bool = false;
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(3000);
const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(3000);
const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(3000);
#[allow(unused)]
macro_rules! trace2 {
@@ -109,7 +109,10 @@ pub enum Error {
IocIssue,
Protocol(#[from] crate::ca::proto::Error),
Writer(#[from] serieswriter::writer::Error),
// TODO remove false positive from ThisError derive
#[allow(private_interfaces)]
UnknownCid(Cid),
#[allow(private_interfaces)]
NoNameForCid(Cid),
CreateChannelBadState,
CommonError(#[from] err::Error),
@@ -121,6 +124,7 @@ pub enum Error {
ShutdownWithQueuesNoProgressNoPending,
Error,
DurationOutOfBounds,
NoFreeCid,
}
impl err::ToErr for Error {
@@ -261,9 +265,16 @@ struct ReadPendingState {
tsbeg: Instant,
}
#[derive(Debug, Clone)]
struct Monitoring2PassiveState {
// Holds the instant when we entered this state. Receiving an event is considered re-entering
// the state, so the instant gets updated. Used for the timeout check.
tsbeg: Instant,
}
#[derive(Debug, Clone)]
enum Monitoring2State {
Passive,
Passive(Monitoring2PassiveState),
ReadPending(Ioid, Instant),
}
@@ -288,7 +299,12 @@ struct PollingState {
#[derive(Debug, Clone)]
enum PollTickState {
// TODO use inner struct to give this Instant a name.
// When monitoring, update this ts on received events.
// It should hold the Instant when we entered this state, but receiving an event
// is considered re-entering this state.
Idle(Instant),
// TODO use inner struct to give this Instant a name
Wait(Instant, Ioid),
}
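Both state types above now carry an Instant that is re-armed whenever an event arrives, so a quiet period can be detected later. A minimal sketch of that re-arm-and-check pattern; the type and method names here are illustrative only, not from the codebase.

use std::time::{Duration, Instant};

// Illustrative only: an Instant that is reset on every event and checked against a timeout.
struct QuietWatch {
    tsbeg: Instant,
}

impl QuietWatch {
    fn on_event(&mut self, now: Instant) {
        // Receiving an event counts as re-entering the state.
        self.tsbeg = now;
    }
    fn timed_out(&self, now: Instant, timeout: Duration) -> bool {
        self.tsbeg + timeout < now
    }
}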
@@ -500,15 +516,9 @@ impl fmt::Debug for CaConnState {
}
}
fn wait_fut(dt: u64) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let fut = tokio::time::sleep(Duration::from_millis(dt));
Box::pin(fut)
}
struct CidStore {
cnt: u32,
rng: Xoshiro128PlusPlus,
reg: HashSet<u32>,
}
impl CidStore {
@@ -516,7 +526,6 @@ impl CidStore {
Self {
cnt: 0,
rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
reg: HashSet::new(),
}
}
@@ -898,66 +907,6 @@ impl CaConn {
self.proto = None;
}
fn cmd_check_health(&mut self) {
// TODO
// no longer in use.
// CaConn emits health updates by itself.
// Make sure that we also do the checks here at regular intervals.
trace!("cmd_check_health");
// TODO
// what actions are taken here?
// what status is modified here?
match self.check_channels_alive() {
Ok(_) => {}
Err(e) => {
error!("{e}");
self.trigger_shutdown(ShutdownReason::InternalError);
}
}
// TODO
// Time this, is it fast enough?
// let mut kit = self.cid_by_name.values();
// if let Some(mut kk) = kit.next().map(Clone::clone) {
// let mut start = Some(kk.clone());
// if let Some(last) = self.channel_status_last_done.take() {
// while kk <= last {
// kk = if let Some(x) = kit.next().map(Clone::clone) {
// start = Some(x.clone());
// x
// } else {
// start = None;
// break;
// };
// }
// }
// if let Some(mut kk) = start {
// loop {
// kk = if let Some(x) = kit.next().map(Clone::clone) {
// x
// } else {
// break;
// };
// }
// } else {
// // Nothing to do, will continue on next call from front.
// }
// }
// while let Some(kk) = kit.next() {}
// let mut channel_statuses = BTreeMap::new();
// for (k, v) in self.channels.iter() {
// let info = v.to_info(v.cssid(), self.remote_addr_dbg);
// channel_statuses.insert(v.cssid(), info);
// }
}
fn cmd_channel_add(&mut self, name: ChannelConfig, cssid: ChannelStatusSeriesId) {
self.channel_add(name, cssid);
}
fn cmd_channel_close(&mut self, name: String) {
self.channel_close(name);
// TODO return the result
@@ -981,7 +930,7 @@ impl CaConn {
trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
match a.kind {
ConnCommandKind::ChannelAdd(conf, cssid) => {
self.cmd_channel_add(conf, cssid);
self.channel_add(conf, cssid);
Ok(Ready(Some(())))
}
ConnCommandKind::ChannelClose(name) => {
@@ -1113,22 +1062,27 @@ impl CaConn {
self.stats.clone()
}
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) {
pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> {
if self.cid_by_name(conf.name()).is_some() {
// TODO count for metrics
return;
}
let cid = self.cid_by_name_or_insert(conf.name());
if self.channels.contains_key(&cid) {
self.stats.channel_add_exists.inc();
error!("logic error channel already exists {conf:?}");
Ok(())
} else {
let conf = ChannelConf {
conf,
state: ChannelState::Init(cssid),
};
self.channels.insert(cid, conf);
// TODO do not count, use separate queue for those channels.
self.init_state_count += 1;
let cid = self.cid_by_name_or_insert(conf.name())?;
if self.channels.contains_key(&cid) {
self.stats.channel_add_exists.inc();
error!("logic error channel already exists {conf:?}");
Ok(())
} else {
let conf = ChannelConf {
conf,
state: ChannelState::Init(cssid),
};
self.channels.insert(cid, conf);
// TODO do not count, use separate queue for those channels.
self.init_state_count += 1;
Ok(())
}
}
}
@@ -1153,13 +1107,20 @@ impl CaConn {
self.cid_by_name.get(name).map(Clone::clone)
}
fn cid_by_name_or_insert(&mut self, name: &str) -> Cid {
fn cid_by_name_or_insert(&mut self, name: &str) -> Result<Cid, Error> {
if let Some(cid) = self.cid_by_name.get(name) {
*cid
Ok(*cid)
} else {
let cid = self.cid_store.next();
self.cid_by_name.insert(name.into(), cid);
cid
let mut found = None;
for _ in 0..1000 {
let cid = self.cid_store.next();
if !self.channels.contains_key(&cid) {
self.cid_by_name.insert(name.into(), cid);
found = Some(cid);
break;
}
}
found.ok_or(Error::NoFreeCid)
}
}
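The reworked cid_by_name_or_insert above bounds the search for a free cid and returns Error::NoFreeCid instead of looping forever. A standalone, hypothetical sketch of the same bounded-retry allocation pattern, independent of the CaConn types:

use std::collections::HashMap;

#[derive(Debug)]
enum AllocError {
    NoFreeId,
}

// Draw candidate ids from `next`, skip ones already in use, give up after a fixed budget.
fn alloc_id<V>(in_use: &HashMap<u32, V>, mut next: impl FnMut() -> u32) -> Result<u32, AllocError> {
    for _ in 0..1000 {
        let cand = next();
        if !in_use.contains_key(&cand) {
            return Ok(cand);
        }
    }
    Err(AllocError::NoFreeId)
}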
@@ -1214,58 +1175,6 @@ impl CaConn {
}
}
fn check_channels_alive(&mut self) -> Result<(), Error> {
let tsnow = Instant::now();
trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
if let Some(started) = self.ioc_ping_start {
if started + Duration::from_millis(4000) < tsnow {
self.stats.pong_timeout().inc();
warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
self.ioc_ping_start = None;
let item = CaConnEvent {
ts: tsnow,
value: CaConnEventValue::EchoTimeout,
};
self.ca_conn_event_out_queue.push_back(item);
self.trigger_shutdown(ShutdownReason::IocTimeout);
}
} else {
if self.ioc_ping_next < tsnow {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
info!("start ping");
self.ioc_ping_start = Some(Instant::now());
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
} else {
self.stats.ping_no_proto().inc();
warn!("can not ping {} no proto", self.remote_addr_dbg);
self.trigger_shutdown(ShutdownReason::Protocol);
}
}
}
let mut alive_count = 0;
let mut not_alive_count = 0;
for (_, conf) in &self.channels {
let st = &conf.state;
match st {
ChannelState::Writable(st2) => {
if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
warn!("TODO assume channel not alive because nothing received, but should do CAGET");
not_alive_count += 1;
} else {
alive_count += 1;
}
}
_ => {}
}
}
self.stats.channel_all_count.__set(self.channels.len() as _);
self.stats.channel_alive_count.__set(alive_count as _);
self.stats.channel_not_alive_count.__set(not_alive_count as _);
Ok(())
}
fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> {
let timenow = self.tmp_ts_poll;
for (_, conf) in &mut self.channels {
@@ -1385,7 +1294,7 @@ impl CaConn {
ReadingState::Monitoring(x) => {
match x.mon2state {
// actually, no differing behavior needed so far.
Monitoring2State::Passive => (),
Monitoring2State::Passive(_) => (),
Monitoring2State::ReadPending(ioid, since) => (),
}
Some(x.subid.clone())
@@ -1415,7 +1324,7 @@ impl CaConn {
st.reading = ReadingState::Monitoring(MonitoringState {
tsbeg: tsnow,
subid: st2.subid,
mon2state: Monitoring2State::Passive,
mon2state: Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }),
});
let crst = &mut st.channel;
let writer = &mut st.writer;
@@ -1424,10 +1333,13 @@ impl CaConn {
Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
}
ReadingState::Monitoring(st2) => {
match st2.mon2state {
Monitoring2State::Passive => {}
match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(ioid, since) => {
error!("TODO actually, EventAddRes can anyway not be a response to a ReadNotify");
warn!("TODO we are waiting for a explicit caget, but received a monitor event");
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
}
}
let crst = &mut st.channel;
@@ -1499,9 +1411,11 @@ impl CaConn {
warn!("received event-cancel but channel {name:?} in wrong state");
}
ReadingState::Monitoring(st2) => {
match st2.mon2state {
match &mut st2.mon2state {
// no special discrimination needed
Monitoring2State::Passive => {}
Monitoring2State::Passive(st3) => {
st3.tsbeg = tsnow;
}
Monitoring2State::ReadPending(ioid, since) => {}
}
let name = self.name_by_cid(cid);
@@ -1568,12 +1482,21 @@ impl CaConn {
ReadingState::EnableMonitoring(..) => {
error!("TODO handle_read_notify_res handle EnableMonitoring");
}
ReadingState::Monitoring(st2) => match st2.mon2state {
Monitoring2State::Passive => {
ReadingState::Monitoring(st2) => match &mut st2.mon2state {
Monitoring2State::Passive(st3) => {
self.read_ioids.remove(&ioid);
st3.tsbeg = tsnow;
error!("ReadNotifyRes even though we do not expect one");
}
Monitoring2State::ReadPending(ioid, since) => {
Monitoring2State::ReadPending(ioid2, _since) => {
trace!("\nhandle_read_notify_res received ReadNotify in Monitoring2State::ReadPending\n\n");
// We don't check again for `since` here. That's done in timeout checking.
// So we could be here a little beyond timeout but we don't care about that.
if ioid != *ioid2 {
warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
}
self.read_ioids.remove(&ioid);
st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
let iiq = &mut self.insert_item_queue;
let stats = self.stats.as_ref();
Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?;
@@ -1804,39 +1727,59 @@ impl CaConn {
ChannelState::MakingSeriesWriter(_) => {}
ChannelState::Writable(st2) => match &mut st2.reading {
ReadingState::EnableMonitoring(_) => {}
ReadingState::Monitoring(st3) => match st3.mon2state {
Monitoring2State::Passive => {
// nothing to do
ReadingState::Monitoring(st3) => match &st3.mon2state {
Monitoring2State::Passive(st4) => {
if st4.tsbeg + TIMEOUT_MONITOR_PASSIVE < tsnow {
trace2!("check_channels_state_poll Monitoring2State::Passive timeout");
// TODO encapsulate and unify with Polling handler
let ioid = Ioid(self.ioid);
self.ioid = self.ioid.wrapping_add(1);
self.read_ioids.insert(ioid, st2.channel.cid.clone());
let msg = CaMsg::from_ty_ts(
CaMsgTy::ReadNotify(ReadNotify {
data_type: st2.channel.ca_dbr_type,
data_count: st2.channel.ca_dbr_count,
sid: st2.channel.sid.to_u32(),
ioid: ioid.0,
}),
tsnow,
);
do_wake_again = true;
self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg);
st3.mon2state = Monitoring2State::ReadPending(ioid, tsnow);
self.stats.caget_issued().inc();
}
}
Monitoring2State::ReadPending(ioid, since) => {
error!("TODO check for timeout");
if since + MONITOR_POLL_TIMEOUT < tsnow {
let name = conf.conf.name();
warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid);
if *since + MONITOR_POLL_TIMEOUT < tsnow {
// Something is wrong with this channel.
// Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only
// this or a subset of the subscribed channels no longer give updates.
// Here we try to close the channel at hand.
// If the close-state does not
let name = conf.conf.name();
warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid);
if false {
// Here we try to close the channel at hand.
// TODO need to define the transition from operating channel to inoperable channel in
// a better and reusable way:
// Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply.
// TODO need to define the transition from operating channel to inoperable channel in
// a better and reusable way:
// Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply.
let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?;
let item = CaMsg {
ty: CaMsgTy::ChannelClose(ChannelClose {
sid: st2.channel.sid.0,
cid: st2.channel.cid.0,
}),
ts: tsnow,
};
proto.push_out(item);
*chst = ChannelState::Closing(ClosingState {
tsbeg: tsnow,
cssid: st2.channel.cssid,
});
let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?;
let item = CaMsg {
ty: CaMsgTy::ChannelClose(ChannelClose {
sid: st2.channel.sid.0,
cid: st2.channel.cid.0,
}),
ts: tsnow,
};
proto.push_out(item);
*chst = ChannelState::Closing(ClosingState {
tsbeg: tsnow,
cssid: st2.channel.cssid,
});
} else {
do_shutdown = Some(ShutdownReason::IocTimeout);
}
}
}
},
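The timeout handling above amounts to a two-stage escalation: a quiet passive monitor triggers an explicit read, and a timed-out explicit read is treated as an unresponsive IOC (the per-channel close path is currently disabled behind if false). A condensed, hypothetical sketch of that policy, reusing the constant values from above; the enum and function names are illustrative only.

use std::time::{Duration, Instant};

const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(3000);
const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(3000);

enum MonState {
    Passive { tsbeg: Instant },
    ReadPending { tsbeg: Instant },
}

enum Action {
    Nothing,
    IssueExplicitRead,
    ShutdownConnection,
}

fn escalate(st: &MonState, now: Instant) -> Action {
    match st {
        // No event for a while: issue an explicit read (caget) to probe the channel.
        MonState::Passive { tsbeg } if *tsbeg + TIMEOUT_MONITOR_PASSIVE < now => Action::IssueExplicitRead,
        // The explicit read itself timed out: treat the IOC as unresponsive.
        MonState::ReadPending { tsbeg } if *tsbeg + MONITOR_POLL_TIMEOUT < now => Action::ShutdownConnection,
        _ => Action::Nothing,
    }
}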
@@ -1893,6 +1836,76 @@ impl CaConn {
Ok(())
}
fn check_channels_alive(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> {
trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
if let Some(started) = self.ioc_ping_start {
if started + Duration::from_millis(4000) < tsnow {
self.stats.pong_timeout().inc();
warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
self.ioc_ping_start = None;
let item = CaConnEvent {
ts: tsnow,
value: CaConnEventValue::EchoTimeout,
};
self.ca_conn_event_out_queue.push_back(item);
self.trigger_shutdown(ShutdownReason::IocTimeout);
}
} else {
if self.ioc_ping_next < tsnow {
if let Some(proto) = &mut self.proto {
self.stats.ping_start().inc();
info!("start ping");
self.ioc_ping_start = Some(tsnow);
let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
proto.push_out(msg);
} else {
self.stats.ping_no_proto().inc();
warn!("can not ping {} no proto", self.remote_addr_dbg);
self.trigger_shutdown(ShutdownReason::ProtocolMissing);
}
}
}
let mut alive_count = 0;
let mut not_alive_count = 0;
for (_, conf) in &mut self.channels {
let st = &mut conf.state;
match st {
ChannelState::Writable(st2) => {
match &mut st2.reading {
ReadingState::EnableMonitoring(_) => {
// TODO handle timeout check
}
ReadingState::Monitoring(st3) => match &st3.mon2state {
Monitoring2State::Passive(st4) => {}
Monitoring2State::ReadPending(_, tsbeg) => {
// This is handled in check_channels_state_poll
// TODO should unify.
}
},
ReadingState::StopMonitoringForPolling(_) => {
// TODO handle timeout check
}
ReadingState::Polling(st3) => {
// This is handled in check_channels_state_poll
// TODO should unify.
}
}
if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
warn!("TODO assume channel not alive because nothing received, but should do CAGET");
not_alive_count += 1;
} else {
alive_count += 1;
}
}
_ => {}
}
}
self.stats.channel_all_count.__set(self.channels.len() as _);
self.stats.channel_alive_count.__set(alive_count as _);
self.stats.channel_not_alive_count.__set(not_alive_count as _);
Ok(())
}
// Can return:
// Pending, error, work-done (pending state unknown), no-more-work-ever-again.
fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll<Option<Result<(), Error>>> {
@@ -1993,6 +2006,9 @@ impl CaConn {
std::process::exit(13);
}
}
CaMsgTy::ChannelCloseRes(x) => {
self.handle_channel_close_res(x, tsnow)?;
}
_ => {
warn!("Received unexpected protocol message {:?}", camsg);
}
@@ -2084,6 +2100,11 @@ impl CaConn {
Ok(())
}
fn handle_channel_close_res(&mut self, k: proto::ChannelCloseRes, tsnow: Instant) -> Result<(), Error> {
info!("{:?}", k);
Ok(())
}
// `?` does not work in here.
fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow<Poll<Result<(), Error>>> {
use ControlFlow::*;
@@ -2186,7 +2207,6 @@ impl CaConn {
}
}
CaConnState::PeerReady => {
trace4!("PeerReady");
let res = self.handle_peer_ready(cx);
match res {
Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
@@ -2259,6 +2279,7 @@ impl CaConn {
}
self.check_channels_state_init(tsnow, cx)?;
self.check_channels_state_poll(tsnow, cx)?;
self.check_channels_alive(tsnow, cx)?;
// TODO add some random variation
if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow {
self.channel_status_emit_last = tsnow;
@@ -2292,7 +2313,8 @@ impl CaConn {
let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone());
channel_statuses.insert(chst.cssid(), chinfo);
}
trace!("emit_channel_status {}", channel_statuses.len());
// trace2!("{:?}", channel_statuses);
// trace!("emit_channel_status {}", channel_statuses.len());
let val = ChannelStatusPartial { channel_statuses };
let item = CaConnEvent {
ts: Instant::now(),
@@ -2385,7 +2407,9 @@ impl CaConn {
FS: Fn(&Q),
{
use Poll::*;
trace3!("attempt_flush_queue id {} len {}", id, qu.len());
if qu.len() != 0 {
trace3!("attempt_flush_queue id {:7} len {}", id, qu.len());
}
let mut have_progress = false;
let mut i = 0;
loop {
@@ -2407,7 +2431,7 @@ impl CaConn {
if sp.is_sending() {
match sp.poll_unpin(cx) {
Ready(Ok(())) => {
trace3!("attempt_flush_queue id {} send done", id);
trace3!("attempt_flush_queue id {:7} send done", id);
have_progress = true;
}
Ready(Err(e)) => {

View File

@@ -62,6 +62,7 @@ use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin;
use netpod::OnDrop;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
@@ -478,6 +479,15 @@ impl CaConnSet {
async fn run(mut this: CaConnSet) -> Result<(), Error> {
trace!("CaConnSet run begin");
let (beacons_cancel_guard_tx, rx) = taskrun::tokio::sync::mpsc::channel(12);
let beacons_jh = tokio::spawn(async move {
if false {
crate::ca::beacons::listen_beacons(rx).await
} else {
Ok(())
}
});
let _g_beacon = OnDrop::new(move || {});
loop {
let x = this.next().await;
match x {
@@ -486,6 +496,10 @@ impl CaConnSet {
}
}
trace!("CaConnSet EndOfStream");
beacons_cancel_guard_tx.send(1).await.ok();
trace!("CaConnSet beacon cancelled");
beacons_jh.await?.map_err(|e| Error::from_string(e))?;
trace!("CaConnSet beacon joined");
trace!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len());
this.find_ioc_query_sender.as_mut().drop();
trace!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len());
@@ -496,6 +510,7 @@ impl CaConnSet {
this.connset_out_tx.close();
this.connset_inp_rx.close();
this.shutdown_done = true;
trace!("CaConnSet run done");
Ok(())
}
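The run loop above spawns the beacon listener with a cancel channel and joins it during shutdown. A self-contained sketch of that spawn/cancel/join pattern, using tokio directly for brevity; the worker and its error type are placeholders, not the crate's types.

use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

async fn worker(mut cancel: mpsc::Receiver<u32>) -> Result<(), String> {
    loop {
        tokio::select! {
            _ = sleep(Duration::from_millis(100)) => {
                // placeholder for socket recv / real work
            }
            _ = cancel.recv() => break,
        }
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let (cancel_tx, cancel_rx) = mpsc::channel(12);
    let jh = tokio::spawn(worker(cancel_rx));
    // ... run the main loop ...
    cancel_tx.send(1).await.ok();
    jh.await.map_err(|e| e.to_string())??;
    Ok(())
}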
@@ -1061,7 +1076,7 @@ impl CaConnSet {
trace2!("ca_conn_consumer ended {}", addr);
match ret {
Ok(x) => {
trace!("Sending CaConnEventValue::EndOfStream");
trace!("sending CaConnEventValue::EndOfStream");
tx1.send((addr, CaConnEvent::new_now(CaConnEventValue::EndOfStream(x))))
.await?;
}
@@ -1081,6 +1096,7 @@ impl CaConnSet {
) -> Result<EndOfStreamReason, Error> {
let mut eos_reason = None;
while let Some(item) = conn.next().await {
trace!("ca_conn_item_merge_inner item {item:?}");
if let Some(x) = eos_reason {
let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
error!("{e}");

View File

@@ -46,6 +46,7 @@ pub enum Error {
NeitherPendingNorProgress,
OutputBufferTooSmall,
LogicError,
BadPayload,
}
const CA_PROTO_VERSION: u32 = 13;
@@ -53,10 +54,10 @@ const EPICS_EPOCH_OFFSET: u64 = 631152000;
const PAYLOAD_LEN_MAX: u32 = 1024 * 1024 * 32;
const PROTO_INPUT_BUF_CAP: u32 = 1024 * 1024 * 40;
const TESTING_UNRESPONSIVE_TODO_REMOVE: bool = true;
const TESTING_UNRESPONSIVE_TODO_REMOVE: bool = false;
const TESTING_EVENT_ADD_RES_MAX: u32 = 3;
const TESTING_PROTOCOL_ERROR_TODO_REMOVE: bool = true;
const TESTING_PROTOCOL_ERROR_TODO_REMOVE: bool = false;
const TESTING_PROTOCOL_ERROR_AFTER_BYTES: u32 = 400;
#[derive(Debug)]
@@ -867,6 +868,16 @@ impl CaMsg {
let ty = CaMsgTy::EventAddRes(d);
CaMsg::from_ty_ts(ty, tsnow)
}
0x0c => {
if payload.len() != 0 {
return Err(Error::BadPayload);
}
let ty = CaMsgTy::ChannelCloseRes(ChannelCloseRes {
sid: hi.param1,
cid: hi.param2,
});
CaMsg::from_ty_ts(ty, tsnow)
}
0x0f => {
if payload.len() == 8 {
let v = u64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?);
@@ -1197,6 +1208,8 @@ impl CaProto {
self.buf.put_u8(0x55)?;
}
}
} else {
self.buf.wadv(nf)?;
}
have_progress = true;
self.stats.tcp_recv_bytes().add(nf as _);
@@ -1296,11 +1309,16 @@ impl CaProto {
let ret = match &msg.ty {
CaMsgTy::EventAddRes(..) => {
self.stats.data_count().ingest(hi.data_count() as u32);
if TESTING_UNRESPONSIVE_TODO_REMOVE && self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX {
if TESTING_UNRESPONSIVE_TODO_REMOVE {
if self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX {
self.event_add_res_cnt += 1;
Ok(Some(CaItem::Msg(msg)))
} else {
Ok(None)
}
} else {
self.event_add_res_cnt += 1;
Ok(Some(CaItem::Msg(msg)))
} else {
Ok(None)
}
}
_ => Ok(Some(CaItem::Msg(msg))),

View File

@@ -1,3 +1,4 @@
#![allow(unused)]
pub mod postingest;
pub mod status;

View File

@@ -366,10 +366,11 @@ fn inspect_items(item_inp: Receiver<VecDeque<QueryItem>>) -> impl Stream<Item =
}
QueryItem::Insert(item) => {
trace3!(
"execute Insert {:?} {:?} {:?}",
"execute Insert {:?} {:?} {:?} {:?}",
item.series,
item.ts_msp,
item.val.shape()
item.val.shape(),
item
);
}
QueryItem::TimeBinSimpleF32(_) => {

View File

@@ -346,6 +346,7 @@ stats_proc::stats_struct!((
transition_to_polling,
transition_to_polling_already_in,
transition_to_polling_bad_state,
channel_add_exists,
),
values(inter_ivl_ema, read_ioids_len, proto_out_len,),
histolog2s(