diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index e679bfb..08123cb 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -42,6 +42,7 @@ const CHECK_HEALTH_TIMEOUT: Duration = Duration::from_millis(5000);
 const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000);
 const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000);
 const CHECK_CHANNEL_SLOW_WARN: Duration = Duration::from_millis(500);
+const RUN_WITHOUT_SCYLLA: bool = true;
 
 pub struct DaemonOpts {
     pgconf: Database,
@@ -637,16 +638,18 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) ->
     drop(pg);
     jh.await?.map_err(Error::from_string)?;
 
-    scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short)
-        .await
-        .map_err(Error::from_string)?;
-
-    if let Some(scyconf) = opts.scylla_config_lt() {
-        scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long)
+    if RUN_WITHOUT_SCYLLA {
+    } else {
+        scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short)
             .await
             .map_err(Error::from_string)?;
-    }
 
+        if let Some(scyconf) = opts.scylla_config_lt() {
+            scywr::schema::migrate_scylla_data_schema(scyconf, RetentionTime::Long)
+                .await
+                .map_err(Error::from_string)?;
+        }
+    }
     info!("database check done");
 
     // TODO use a new stats type:
diff --git a/dbpg/src/schema.rs b/dbpg/src/schema.rs
index 3b56a61..c155e09 100644
--- a/dbpg/src/schema.rs
+++ b/dbpg/src/schema.rs
@@ -173,6 +173,7 @@ pub async fn schema_check(pgc: &PgClient) -> Result<(), Error> {
     Ok(())
 }
 
+#[allow(unused)]
 fn ignore_does_not_exist(x: Result) -> Result<(), tokio_postgres::Error> {
     match x {
         Ok(_) => Ok(()),
diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs
index f096fef..93cc05f 100644
--- a/netfetch/src/ca.rs
+++ b/netfetch/src/ca.rs
@@ -1,3 +1,4 @@
+pub mod beacons;
 pub mod conn;
 pub mod connset;
 pub mod connset_input_merge;
diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs
new file mode 100644
index 0000000..d871850
--- /dev/null
+++ b/netfetch/src/ca/beacons.rs
@@ -0,0 +1,45 @@
+use bytes::Buf;
+use err::thiserror;
+use err::ThisError;
+use log::*;
+use std::io::Cursor;
+use std::net::Ipv4Addr;
+use taskrun::tokio::net::UdpSocket;
+
+#[derive(Debug, ThisError)]
+pub enum Error {
+    Io(#[from] std::io::Error),
+}
+
+pub async fn listen_beacons(mut cancel: taskrun::tokio::sync::mpsc::Receiver) -> Result<(), Error> {
+    let sock = UdpSocket::bind("0.0.0.0:5065").await?;
+    sock.set_broadcast(true).unwrap();
+    let mut buf = Vec::new();
+    buf.resize(1024 * 4, 0);
+    loop {
+        let bb = &mut buf;
+        let (n, remote) = taskrun::tokio::select! {
+            x = sock.recv_from(bb) => x,
+            _ = cancel.recv() => {
+                break;
+            }
+        }?;
+        if n != 16 {
+            debug!("len recv {n}");
+        }
+        if n >= 16 {
+            let mut cur = Cursor::new(bb);
+            let cmd = cur.get_u16();
+            let _ = cur.get_u16();
+            let ver = cur.get_u16();
+            let port = cur.get_u16();
+            let _seqid = cur.get_u32();
+            let addr = cur.get_u32();
+            let addr = Ipv4Addr::from(addr);
+            if cmd == 0x0d {
+                debug!("beacon {remote} {ver} {addr} {port}")
+            }
+        }
+    }
+    Ok(())
+}
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index f35cb86..57d6757 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -17,7 +17,6 @@ use futures_util::FutureExt;
 use futures_util::Stream;
 use futures_util::StreamExt;
 use hashbrown::HashMap;
-use hashbrown::HashSet;
 use log::*;
 use netpod::timeunits::*;
 use netpod::ScalarType;
@@ -74,6 +73,7 @@ const IOC_PING_IVL: Duration = Duration::from_millis(80000);
 const DO_RATE_CHECK: bool = false;
 const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(3000);
 const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(3000);
+const TIMEOUT_MONITOR_PASSIVE: Duration = Duration::from_millis(3000);
 
 #[allow(unused)]
 macro_rules! trace2 {
@@ -109,7 +109,10 @@ pub enum Error {
     IocIssue,
     Protocol(#[from] crate::ca::proto::Error),
     Writer(#[from] serieswriter::writer::Error),
+    // TODO remove false positive from ThisError derive
+    #[allow(private_interfaces)]
     UnknownCid(Cid),
+    #[allow(private_interfaces)]
     NoNameForCid(Cid),
     CreateChannelBadState,
     CommonError(#[from] err::Error),
@@ -121,6 +124,7 @@
     ShutdownWithQueuesNoProgressNoPending,
     Error,
     DurationOutOfBounds,
+    NoFreeCid,
 }
 
 impl err::ToErr for Error {
@@ -261,9 +265,16 @@ struct ReadPendingState {
     tsbeg: Instant,
 }
 
+#[derive(Debug, Clone)]
+struct Monitoring2PassiveState {
+    // Holds instant when we entered this state. A receive of an event is considered a re-enter of the state,
+    // so the instant gets updated. Used for timeout check.
+    tsbeg: Instant,
+}
+
 #[derive(Debug, Clone)]
 enum Monitoring2State {
-    Passive,
+    Passive(Monitoring2PassiveState),
     ReadPending(Ioid, Instant),
 }
 
@@ -288,7 +299,12 @@ struct PollingState {
 
 #[derive(Debug, Clone)]
 enum PollTickState {
+    // TODO use inner struct to give this Instant a name.
+    // When monitoring, update this ts on received events.
+    // It should hold the Instant when we entered this state, but a receive of some event
+    // is considered re-entering this state.
     Idle(Instant),
+    // TODO use inner struct to give this Instant a name
     Wait(Instant, Ioid),
 }
 
@@ -500,15 +516,9 @@ impl fmt::Debug for CaConnState {
     }
 }
 
-fn wait_fut(dt: u64) -> Pin + Send>> {
-    let fut = tokio::time::sleep(Duration::from_millis(dt));
-    Box::pin(fut)
-}
-
 struct CidStore {
     cnt: u32,
     rng: Xoshiro128PlusPlus,
-    reg: HashSet,
 }
 
 impl CidStore {
@@ -516,7 +526,6 @@
         Self {
             cnt: 0,
             rng: Xoshiro128PlusPlus::seed_from_u64(seed as _),
-            reg: HashSet::new(),
         }
     }
 
@@ -898,66 +907,6 @@ impl CaConn {
         self.proto = None;
     }
 
-    fn cmd_check_health(&mut self) {
-        // TODO
-        // no longer in use.
-        // CaConn emits health updates by iteself.
-        // Make sure that we do also the checks here on regular intervals.
-
-        trace!("cmd_check_health");
-
-        // TODO
-        // what actions are taken here?
-        // what status is modified here?
-        match self.check_channels_alive() {
-            Ok(_) => {}
-            Err(e) => {
-                error!("{e}");
-                self.trigger_shutdown(ShutdownReason::InternalError);
-            }
-        }
-
-        // TODO
-        // Time this, is it fast enough?
-
-        // let mut kit = self.cid_by_name.values();
-        // if let Some(mut kk) = kit.next().map(Clone::clone) {
-        //     let mut start = Some(kk.clone());
-        //     if let Some(last) = self.channel_status_last_done.take() {
-        //         while kk <= last {
-        //             kk = if let Some(x) = kit.next().map(Clone::clone) {
-        //                 start = Some(x.clone());
-        //                 x
-        //             } else {
-        //                 start = None;
-        //                 break;
-        //             };
-        //         }
-        //     }
-        //     if let Some(mut kk) = start {
-        //         loop {
-        //             kk = if let Some(x) = kit.next().map(Clone::clone) {
-        //                 x
-        //             } else {
-        //                 break;
-        //             };
-        //         }
-        //     } else {
-        //         // Nothing to do, will continue on next call from front.
-        //     }
-        // }
-        // while let Some(kk) = kit.next() {}
-        // let mut channel_statuses = BTreeMap::new();
-        // for (k, v) in self.channels.iter() {
-        //     let info = v.to_info(v.cssid(), self.remote_addr_dbg);
-        //     channel_statuses.insert(v.cssid(), info);
-        // }
-    }
-
-    fn cmd_channel_add(&mut self, name: ChannelConfig, cssid: ChannelStatusSeriesId) {
-        self.channel_add(name, cssid);
-    }
-
     fn cmd_channel_close(&mut self, name: String) {
         self.channel_close(name);
         // TODO return the result
@@ -981,7 +930,7 @@
         trace3!("handle_conn_command received a command {}", self.remote_addr_dbg);
         match a.kind {
             ConnCommandKind::ChannelAdd(conf, cssid) => {
-                self.cmd_channel_add(conf, cssid);
+                self.channel_add(conf, cssid);
                 Ok(Ready(Some(())))
             }
             ConnCommandKind::ChannelClose(name) => {
@@ -1113,22 +1062,27 @@
         self.stats.clone()
     }
 
-    pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) {
+    pub fn channel_add(&mut self, conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Result<(), Error> {
         if self.cid_by_name(conf.name()).is_some() {
-            // TODO count for metrics
-            return;
-        }
-        let cid = self.cid_by_name_or_insert(conf.name());
-        if self.channels.contains_key(&cid) {
+            self.stats.channel_add_exists.inc();
             error!("logic error channel already exists {conf:?}");
+            Ok(())
         } else {
-            let conf = ChannelConf {
-                conf,
-                state: ChannelState::Init(cssid),
-            };
-            self.channels.insert(cid, conf);
-            // TODO do not count, use separate queue for those channels.
-            self.init_state_count += 1;
+            let cid = self.cid_by_name_or_insert(conf.name())?;
+            if self.channels.contains_key(&cid) {
+                self.stats.channel_add_exists.inc();
+                error!("logic error channel already exists {conf:?}");
+                Ok(())
+            } else {
+                let conf = ChannelConf {
+                    conf,
+                    state: ChannelState::Init(cssid),
+                };
+                self.channels.insert(cid, conf);
+                // TODO do not count, use separate queue for those channels.
+                self.init_state_count += 1;
+                Ok(())
+            }
         }
     }
 
@@ -1153,13 +1107,20 @@
         self.cid_by_name.get(name).map(Clone::clone)
     }
 
-    fn cid_by_name_or_insert(&mut self, name: &str) -> Cid {
+    fn cid_by_name_or_insert(&mut self, name: &str) -> Result {
         if let Some(cid) = self.cid_by_name.get(name) {
-            *cid
+            Ok(*cid)
        } else {
-            let cid = self.cid_store.next();
-            self.cid_by_name.insert(name.into(), cid);
-            cid
+            let mut found = None;
+            for _ in 0..1000 {
+                let cid = self.cid_store.next();
+                if !self.channels.contains_key(&cid) {
+                    self.cid_by_name.insert(name.into(), cid);
+                    found = Some(cid);
+                    break;
+                }
+            }
+            found.ok_or(Error::NoFreeCid)
         }
     }
 
@@ -1214,58 +1175,6 @@
         }
     }
 
-    fn check_channels_alive(&mut self) -> Result<(), Error> {
-        let tsnow = Instant::now();
-        trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
-        if let Some(started) = self.ioc_ping_start {
-            if started + Duration::from_millis(4000) < tsnow {
-                self.stats.pong_timeout().inc();
-                warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
-                self.ioc_ping_start = None;
-                let item = CaConnEvent {
-                    ts: tsnow,
-                    value: CaConnEventValue::EchoTimeout,
-                };
-                self.ca_conn_event_out_queue.push_back(item);
-                self.trigger_shutdown(ShutdownReason::IocTimeout);
-            }
-        } else {
-            if self.ioc_ping_next < tsnow {
-                if let Some(proto) = &mut self.proto {
-                    self.stats.ping_start().inc();
-                    info!("start ping");
-                    self.ioc_ping_start = Some(Instant::now());
-                    let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
-                    proto.push_out(msg);
-                } else {
-                    self.stats.ping_no_proto().inc();
-                    warn!("can not ping {} no proto", self.remote_addr_dbg);
-                    self.trigger_shutdown(ShutdownReason::Protocol);
-                }
-            }
-        }
-        let mut alive_count = 0;
-        let mut not_alive_count = 0;
-        for (_, conf) in &self.channels {
-            let st = &conf.state;
-            match st {
-                ChannelState::Writable(st2) => {
-                    if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
-                        warn!("TODO assume channel not alive because nothing received, but should do CAGET");
-                        not_alive_count += 1;
-                    } else {
-                        alive_count += 1;
-                    }
-                }
-                _ => {}
-            }
-        }
-        self.stats.channel_all_count.__set(self.channels.len() as _);
-        self.stats.channel_alive_count.__set(alive_count as _);
-        self.stats.channel_not_alive_count.__set(not_alive_count as _);
-        Ok(())
-    }
-
     fn emit_channel_info_insert_items(&mut self) -> Result<(), Error> {
         let timenow = self.tmp_ts_poll;
         for (_, conf) in &mut self.channels {
@@ -1385,7 +1294,7 @@
             ReadingState::Monitoring(x) => {
                 match x.mon2state {
                     // actually, no differing behavior needed so far.
-                    Monitoring2State::Passive => (),
+                    Monitoring2State::Passive(_) => (),
                     Monitoring2State::ReadPending(ioid, since) => (),
                 }
                 Some(x.subid.clone())
@@ -1415,7 +1324,7 @@
                 st.reading = ReadingState::Monitoring(MonitoringState {
                     tsbeg: tsnow,
                     subid: st2.subid,
-                    mon2state: Monitoring2State::Passive,
+                    mon2state: Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow }),
                 });
                 let crst = &mut st.channel;
                 let writer = &mut st.writer;
@@ -1424,10 +1333,13 @@
                     Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iiq, tsnow, stnow, stats)?;
                 }
                 ReadingState::Monitoring(st2) => {
-                    match st2.mon2state {
-                        Monitoring2State::Passive => {}
+                    match &mut st2.mon2state {
+                        Monitoring2State::Passive(st3) => {
+                            st3.tsbeg = tsnow;
+                        }
                         Monitoring2State::ReadPending(ioid, since) => {
-                            error!("TODO actually, EventAddRes can anyway not be a response to a ReadNotify");
+                            warn!("TODO we are waiting for an explicit caget, but received a monitor event");
+                            st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
                         }
                     }
                     let crst = &mut st.channel;
@@ -1499,9 +1411,11 @@
                     warn!("received event-cancel but channel {name:?} in wrong state");
                 }
                 ReadingState::Monitoring(st2) => {
-                    match st2.mon2state {
+                    match &mut st2.mon2state {
                         // no special discrimination needed
-                        Monitoring2State::Passive => {}
+                        Monitoring2State::Passive(st3) => {
+                            st3.tsbeg = tsnow;
+                        }
                         Monitoring2State::ReadPending(ioid, since) => {}
                     }
                     let name = self.name_by_cid(cid);
@@ -1568,12 +1482,21 @@
                 ReadingState::EnableMonitoring(..) => {
                     error!("TODO handle_read_notify_res handle EnableMonitoring");
                 }
-                ReadingState::Monitoring(st2) => match st2.mon2state {
-                    Monitoring2State::Passive => {
+                ReadingState::Monitoring(st2) => match &mut st2.mon2state {
+                    Monitoring2State::Passive(st3) => {
+                        self.read_ioids.remove(&ioid);
+                        st3.tsbeg = tsnow;
                         error!("ReadNotifyRes even though we do not expect one");
                     }
-                    Monitoring2State::ReadPending(ioid, since) => {
+                    Monitoring2State::ReadPending(ioid2, _since) => {
+                        trace!("\nhandle_read_notify_res received ReadNotify in Monitoring2State::ReadPending\n\n");
+                        // We don't check again for `since` here. That's done in timeout checking.
+                        // So we could be here a little beyond timeout but we don't care about that.
+                        if ioid != *ioid2 {
+                            warn!("IOID mismatch ReadNotifyRes on Monitor Read Pending {ioid:?} {ioid2:?}");
+                        }
                         self.read_ioids.remove(&ioid);
+                        st2.mon2state = Monitoring2State::Passive(Monitoring2PassiveState { tsbeg: tsnow });
                         let iiq = &mut self.insert_item_queue;
                         let stats = self.stats.as_ref();
                         Self::read_notify_res_for_write(ev, st, iiq, stnow, tsnow, stats)?;
@@ -1804,39 +1727,59 @@
                 ChannelState::MakingSeriesWriter(_) => {}
                 ChannelState::Writable(st2) => match &mut st2.reading {
                     ReadingState::EnableMonitoring(_) => {}
-                    ReadingState::Monitoring(st3) => match st3.mon2state {
-                        Monitoring2State::Passive => {
-                            // nothing to do
+                    ReadingState::Monitoring(st3) => match &st3.mon2state {
+                        Monitoring2State::Passive(st4) => {
+                            if st4.tsbeg + TIMEOUT_MONITOR_PASSIVE < tsnow {
+                                trace2!("check_channels_state_poll Monitoring2State::Passive timeout");
+                                // TODO encapsulate and unify with Polling handler
+                                let ioid = Ioid(self.ioid);
+                                self.ioid = self.ioid.wrapping_add(1);
+                                self.read_ioids.insert(ioid, st2.channel.cid.clone());
+                                let msg = CaMsg::from_ty_ts(
+                                    CaMsgTy::ReadNotify(ReadNotify {
+                                        data_type: st2.channel.ca_dbr_type,
+                                        data_count: st2.channel.ca_dbr_count,
+                                        sid: st2.channel.sid.to_u32(),
+                                        ioid: ioid.0,
+                                    }),
+                                    tsnow,
+                                );
+                                do_wake_again = true;
+                                self.proto.as_mut().ok_or_else(|| Error::NoProtocol)?.push_out(msg);
+                                st3.mon2state = Monitoring2State::ReadPending(ioid, tsnow);
+                                self.stats.caget_issued().inc();
+                            }
                         }
                         Monitoring2State::ReadPending(ioid, since) => {
-                            error!("TODO check for timeout");
-                            if since + MONITOR_POLL_TIMEOUT < tsnow {
-                                let name = conf.conf.name();
-                                warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid);
-
+                            if *since + MONITOR_POLL_TIMEOUT < tsnow {
                                 // Something is wrong with this channel.
                                 // Maybe we lost connection, maybe the IOC went down, maybe there is a bug where only
                                 // this or a subset of the subscribed channels no longer give updates.
-                                // Here we try to close the channel at hand.
-                                // If the close-state does not
+                                let name = conf.conf.name();
+                                warn!("channel monitor explicit read timeout {} ioid {:?}", name, ioid);
+                                if false {
+                                    // Here we try to close the channel at hand.
 
-                                // TODO need to define the transition from operating channel to inoperable channel in
-                                // a better and reusable way:
-                                // Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply.
-                                let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?;
-                                let item = CaMsg {
-                                    ty: CaMsgTy::ChannelClose(ChannelClose {
-                                        sid: st2.channel.sid.0,
-                                        cid: st2.channel.cid.0,
-                                    }),
-                                    ts: tsnow,
-                                };
-                                proto.push_out(item);
-                                *chst = ChannelState::Closing(ClosingState {
-                                    tsbeg: tsnow,
-                                    cssid: st2.channel.cssid,
-                                });
+                                    // TODO need to define the transition from operating channel to inoperable channel in
+                                    // a better and reusable way:
+                                    // Do not go directly into error state: need to at least attempt to close the channel and wait/timeout for reply.
+                                    let proto = self.proto.as_mut().ok_or(Error::NoProtocol)?;
+                                    let item = CaMsg {
+                                        ty: CaMsgTy::ChannelClose(ChannelClose {
+                                            sid: st2.channel.sid.0,
+                                            cid: st2.channel.cid.0,
+                                        }),
+                                        ts: tsnow,
+                                    };
+                                    proto.push_out(item);
+                                    *chst = ChannelState::Closing(ClosingState {
+                                        tsbeg: tsnow,
+                                        cssid: st2.channel.cssid,
+                                    });
+                                } else {
+                                    do_shutdown = Some(ShutdownReason::IocTimeout);
+                                }
                             }
                         }
                     },
@@ -1893,6 +1836,76 @@
         Ok(())
     }
 
+    fn check_channels_alive(&mut self, tsnow: Instant, cx: &mut Context) -> Result<(), Error> {
+        trace2!("check_channels_alive {addr:?}", addr = &self.remote_addr_dbg);
+        if let Some(started) = self.ioc_ping_start {
+            if started + Duration::from_millis(4000) < tsnow {
+                self.stats.pong_timeout().inc();
+                warn!("pong timeout {addr:?}", addr = self.remote_addr_dbg);
+                self.ioc_ping_start = None;
+                let item = CaConnEvent {
+                    ts: tsnow,
+                    value: CaConnEventValue::EchoTimeout,
+                };
+                self.ca_conn_event_out_queue.push_back(item);
+                self.trigger_shutdown(ShutdownReason::IocTimeout);
+            }
+        } else {
+            if self.ioc_ping_next < tsnow {
+                if let Some(proto) = &mut self.proto {
+                    self.stats.ping_start().inc();
+                    info!("start ping");
+                    self.ioc_ping_start = Some(tsnow);
+                    let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow);
+                    proto.push_out(msg);
+                } else {
+                    self.stats.ping_no_proto().inc();
+                    warn!("can not ping {} no proto", self.remote_addr_dbg);
+                    self.trigger_shutdown(ShutdownReason::ProtocolMissing);
+                }
+            }
+        }
+        let mut alive_count = 0;
+        let mut not_alive_count = 0;
+        for (_, conf) in &mut self.channels {
+            let st = &mut conf.state;
+            match st {
+                ChannelState::Writable(st2) => {
+                    match &mut st2.reading {
+                        ReadingState::EnableMonitoring(_) => {
+                            // TODO handle timeout check
+                        }
+                        ReadingState::Monitoring(st3) => match &st3.mon2state {
+                            Monitoring2State::Passive(st4) => {}
+                            Monitoring2State::ReadPending(_, tsbeg) => {
+                                // This is handled in check_channels_state_poll
+                                // TODO should unify.
+                            }
+                        },
+                        ReadingState::StopMonitoringForPolling(_) => {
+                            // TODO handle timeout check
+                        }
+                        ReadingState::Polling(st3) => {
+                            // This is handled in check_channels_state_poll
+                            // TODO should unify.
+                        }
+                    }
+                    if tsnow.duration_since(st2.channel.ts_alive_last) >= Duration::from_millis(10000) {
+                        warn!("TODO assume channel not alive because nothing received, but should do CAGET");
+                        not_alive_count += 1;
+                    } else {
+                        alive_count += 1;
+                    }
+                }
+                _ => {}
+            }
+        }
+        self.stats.channel_all_count.__set(self.channels.len() as _);
+        self.stats.channel_alive_count.__set(alive_count as _);
+        self.stats.channel_not_alive_count.__set(not_alive_count as _);
+        Ok(())
+    }
+
     // Can return:
     // Pending, error, work-done (pending state unknown), no-more-work-ever-again.
     fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll>> {
@@ -1993,6 +2006,9 @@
                         std::process::exit(13);
                     }
                 }
+                CaMsgTy::ChannelCloseRes(x) => {
+                    self.handle_channel_close_res(x, tsnow)?;
+                }
                 _ => {
                     warn!("Received unexpected protocol message {:?}", camsg);
                 }
@@ -2084,6 +2100,11 @@
         Ok(())
     }
 
+    fn handle_channel_close_res(&mut self, k: proto::ChannelCloseRes, tsnow: Instant) -> Result<(), Error> {
+        info!("{:?}", k);
+        Ok(())
+    }
+
     // `?` works not in here.
     fn _test_control_flow(&mut self, _cx: &mut Context) -> ControlFlow>> {
         use ControlFlow::*;
@@ -2186,7 +2207,6 @@
                     }
                 }
                 CaConnState::PeerReady => {
-                    trace4!("PeerReady");
                     let res = self.handle_peer_ready(cx);
                     match res {
                         Ready(Some(Ok(()))) => Ok(Ready(Some(()))),
@@ -2259,6 +2279,7 @@
         }
         self.check_channels_state_init(tsnow, cx)?;
         self.check_channels_state_poll(tsnow, cx)?;
+        self.check_channels_alive(tsnow, cx)?;
         // TODO add some random variation
         if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow {
             self.channel_status_emit_last = tsnow;
@@ -2292,7 +2313,8 @@
             let chinfo = chst.to_info(chst.cssid(), self.remote_addr_dbg, conf.conf.clone());
             channel_statuses.insert(chst.cssid(), chinfo);
         }
-        trace!("emit_channel_status {}", channel_statuses.len());
+        // trace2!("{:?}", channel_statuses);
+        // trace!("emit_channel_status {}", channel_statuses.len());
         let val = ChannelStatusPartial { channel_statuses };
         let item = CaConnEvent {
             ts: Instant::now(),
@@ -2385,7 +2407,9 @@
         FS: Fn(&Q),
     {
         use Poll::*;
-        trace3!("attempt_flush_queue id {} len {}", id, qu.len());
+        if qu.len() != 0 {
+            trace3!("attempt_flush_queue id {:7} len {}", id, qu.len());
+        }
         let mut have_progress = false;
         let mut i = 0;
         loop {
@@ -2407,7 +2431,7 @@
             if sp.is_sending() {
                 match sp.poll_unpin(cx) {
                     Ready(Ok(())) => {
-                        trace3!("attempt_flush_queue id {} send done", id);
+                        trace3!("attempt_flush_queue id {:7} send done", id);
                         have_progress = true;
                     }
                     Ready(Err(e)) => {
diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs
index 2346dfc..adb845c 100644
--- a/netfetch/src/ca/connset.rs
+++ b/netfetch/src/ca/connset.rs
@@ -62,6 +62,7 @@
 use std::net::SocketAddr;
 use std::net::SocketAddrV4;
 use std::pin::Pin;
+use netpod::OnDrop;
 use std::sync::Arc;
 use std::task::Context;
 use std::task::Poll;
@@ -478,6 +479,15 @@
 impl CaConnSet {
     async fn run(mut this: CaConnSet) -> Result<(), Error> {
         trace!("CaConnSet run begin");
+        let (beacons_cancel_guard_tx, rx) = taskrun::tokio::sync::mpsc::channel(12);
+        let beacons_jh = tokio::spawn(async move {
+            if false {
+                crate::ca::beacons::listen_beacons(rx).await
+            } else {
+                Ok(())
+            }
+        });
+        let _g_beacon = OnDrop::new(move || {});
         loop {
             let x = this.next().await;
             match x {
@@ -486,6 +496,10 @@
             }
         }
         trace!("CaConnSet EndOfStream");
+        beacons_cancel_guard_tx.send(1).await.ok();
+        trace!("CaConnSet beacon cancelled");
+        beacons_jh.await?.map_err(|e| Error::from_string(e))?;
+        trace!("CaConnSet beacon joined");
         trace!("join ioc_finder_jh A {:?}", this.find_ioc_query_sender.len());
         this.find_ioc_query_sender.as_mut().drop();
         trace!("join ioc_finder_jh B {:?}", this.find_ioc_query_sender.len());
@@ -496,6 +510,7 @@
         this.connset_out_tx.close();
         this.connset_inp_rx.close();
         this.shutdown_done = true;
+        trace!("CaConnSet run done");
         Ok(())
     }
 
@@ -1061,7 +1076,7 @@
         trace2!("ca_conn_consumer ended {}", addr);
         match ret {
             Ok(x) => {
-                trace!("Sending CaConnEventValue::EndOfStream");
+                trace!("sending CaConnEventValue::EndOfStream");
                 tx1.send((addr, CaConnEvent::new_now(CaConnEventValue::EndOfStream(x))))
                     .await?;
             }
@@ -1081,6 +1096,7 @@
     ) -> Result {
         let mut eos_reason = None;
         while let Some(item) = conn.next().await {
+            trace!("ca_conn_item_merge_inner item {item:?}");
             if let Some(x) = eos_reason {
                 let e = Error::with_msg_no_trace(format!("CaConn delivered already eos {addr} {x:?}"));
                 error!("{e}");
diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs
index 5d8bf94..9b918f3 100644
--- a/netfetch/src/ca/proto.rs
+++ b/netfetch/src/ca/proto.rs
@@ -46,6 +46,7 @@ pub enum Error {
     NeitherPendingNorProgress,
     OutputBufferTooSmall,
     LogicError,
+    BadPayload,
 }
 
 const CA_PROTO_VERSION: u32 = 13;
@@ -53,10 +54,10 @@
 const EPICS_EPOCH_OFFSET: u64 = 631152000;
 const PAYLOAD_LEN_MAX: u32 = 1024 * 1024 * 32;
 const PROTO_INPUT_BUF_CAP: u32 = 1024 * 1024 * 40;
 
-const TESTING_UNRESPONSIVE_TODO_REMOVE: bool = true;
+const TESTING_UNRESPONSIVE_TODO_REMOVE: bool = false;
 const TESTING_EVENT_ADD_RES_MAX: u32 = 3;
 
-const TESTING_PROTOCOL_ERROR_TODO_REMOVE: bool = true;
+const TESTING_PROTOCOL_ERROR_TODO_REMOVE: bool = false;
 const TESTING_PROTOCOL_ERROR_AFTER_BYTES: u32 = 400;
 
 #[derive(Debug)]
@@ -867,6 +868,16 @@
                 let ty = CaMsgTy::EventAddRes(d);
                 CaMsg::from_ty_ts(ty, tsnow)
             }
+            0x0c => {
+                if payload.len() != 0 {
+                    return Err(Error::BadPayload);
+                }
+                let ty = CaMsgTy::ChannelCloseRes(ChannelCloseRes {
+                    sid: hi.param1,
+                    cid: hi.param2,
+                });
+                CaMsg::from_ty_ts(ty, tsnow)
+            }
             0x0f => {
                 if payload.len() == 8 {
                     let v = u64::from_be_bytes(payload.try_into().map_err(|_| Error::BadSlice)?);
@@ -1197,6 +1208,8 @@
                         self.buf.put_u8(0x55)?;
                     }
                 }
+            } else {
+                self.buf.wadv(nf)?;
             }
             have_progress = true;
             self.stats.tcp_recv_bytes().add(nf as _);
@@ -1296,11 +1309,16 @@
         let ret = match &msg.ty {
             CaMsgTy::EventAddRes(..) => {
                 self.stats.data_count().ingest(hi.data_count() as u32);
-                if TESTING_UNRESPONSIVE_TODO_REMOVE && self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX {
+                if TESTING_UNRESPONSIVE_TODO_REMOVE {
+                    if self.event_add_res_cnt < TESTING_EVENT_ADD_RES_MAX {
+                        self.event_add_res_cnt += 1;
+                        Ok(Some(CaItem::Msg(msg)))
+                    } else {
+                        Ok(None)
+                    }
+                } else {
                     self.event_add_res_cnt += 1;
                     Ok(Some(CaItem::Msg(msg)))
-                } else {
-                    Ok(None)
                 }
             }
             _ => Ok(Some(CaItem::Msg(msg))),
diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs
index d6578a4..7fd0361 100644
--- a/netfetch/src/metrics.rs
+++ b/netfetch/src/metrics.rs
@@ -1,3 +1,4 @@
+#![allow(unused)]
 pub mod postingest;
 pub mod status;
 
diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs
index 89218cf..56ac313 100644
--- a/scywr/src/insertworker.rs
+++ b/scywr/src/insertworker.rs
@@ -366,10 +366,11 @@ fn inspect_items(item_inp: Receiver>) -> impl Stream {
                 trace3!(
-                    "execute Insert {:?} {:?} {:?}",
+                    "execute Insert {:?} {:?} {:?} {:?}",
                     item.series,
                     item.ts_msp,
-                    item.val.shape()
+                    item.val.shape(),
+                    item
                 );
             }
             QueryItem::TimeBinSimpleF32(_) => {
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index 859bc1f..0b54a16 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -346,6 +346,7 @@ stats_proc::stats_struct!((
         transition_to_polling,
         transition_to_polling_already_in,
         transition_to_polling_bad_state,
+        channel_add_exists,
     ),
     values(inter_ivl_ema, read_ioids_len, proto_out_len,),
     histolog2s(