diff --git a/.cargo/config.toml b/.cargo/config.toml index c633aa7..bc814c1 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,8 +1,8 @@ [build] rustflags = [ #"-C", "target-cpu=native", - "-C", "target-cpu=sandybridge", - "-C", "force-frame-pointers=yes", + "-C", "target-cpu=skylake", + #"-C", "force-frame-pointers=yes", #"-C", "force-unwind-tables=yes", #"-C", "relocation-model=static", #"-C", "embed-bitcode=no", diff --git a/Cargo.toml b/Cargo.toml index 16f7b42..1ab4739 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [profile.release] opt-level = 2 -debug = 0 +debug = 1 overflow-checks = false debug-assertions = false lto = "thin" diff --git a/batchtools/Cargo.toml b/batchtools/Cargo.toml index 2cdee2e..62ff17d 100644 --- a/batchtools/Cargo.toml +++ b/batchtools/Cargo.toml @@ -4,8 +4,11 @@ version = "0.0.1" authors = ["Dominik Werder "] edition = "2021" +[lib] +doctest = false + [dependencies] log = { path = "../log" } err = { path = "../../daqbuffer/crates/err" } taskrun = { path = "../../daqbuffer/crates/taskrun" } -async-channel = "2.0.0" +async-channel = "2.1.0" diff --git a/batchtools/src/channeltest.rs b/batchtools/src/channeltest.rs new file mode 100644 index 0000000..b13ea93 --- /dev/null +++ b/batchtools/src/channeltest.rs @@ -0,0 +1,45 @@ +use async_channel::Receiver; +use async_channel::Sender; +use std::time::Instant; + +#[test] +fn prod_cons() { + let rt = taskrun::get_runtime(); + rt.block_on(run()); +} + +async fn run() { + let n = 10_000_000; + let ts1 = Instant::now(); + let (tx, rx) = async_channel::bounded(1000); + let mut jhs = Vec::new(); + let jh = taskrun::spawn(consumer(rx.clone())); + jhs.push(jh); + let jh = taskrun::spawn(consumer(rx)); + jhs.push(jh); + let jh = taskrun::spawn(producer(tx, n)); + jhs.push(jh); + for jh in jhs { + jh.await.unwrap(); + } + let ts2 = Instant::now(); + let dt = ts2.duration_since(ts1); + eprintln!("dt {:.3} MHz", n as f32 / dt.as_secs_f32() * 1e-6); + panic!(); +} + +async fn producer(tx: Sender, n: u64) { + for i in 0..n { + let item = Item { x: i, y: i }; + tx.send(item).await.unwrap(); + } +} + +async fn consumer(rx: Receiver) { + while let Ok(_x) = rx.recv().await {} +} + +struct Item { + x: u64, + y: u64, +} diff --git a/batchtools/src/lib.rs b/batchtools/src/lib.rs index de0a2da..6a20484 100644 --- a/batchtools/src/lib.rs +++ b/batchtools/src/lib.rs @@ -1 +1,3 @@ pub mod batcher; +#[cfg(test)] +pub mod channeltest; diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index f198163..21efeb7 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -22,6 +22,7 @@ use scywriiq::QueryItem; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; +use std::collections::VecDeque; use std::sync::atomic; use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicUsize; @@ -42,29 +43,18 @@ enum CheckPeriodic { } pub struct DaemonOpts { - backend: String, - local_epics_hostname: String, - array_truncate: u64, - insert_item_queue_cap: usize, pgconf: Database, scyconf: ScyllaConfig, ttls: Ttls, #[allow(unused)] test_bsread_addr: Option, - insert_worker_count: usize, - insert_scylla_sessions: usize, insert_frac: Arc, store_workers_rate: Arc, } -impl DaemonOpts { - pub fn backend(&self) -> &str { - &self.backend - } -} - pub struct Daemon { opts: DaemonOpts, + ingest_opts: CaIngestOpts, tx: Sender, rx: Receiver, insert_queue_counter: Arc, @@ -84,7 +74,7 @@ pub struct Daemon { connset_status_last: CheckPeriodic, // TODO should be a stats object? 
insert_workers_running: AtomicU64, - query_item_tx_weak: WeakSender, + query_item_tx_weak: WeakSender>, connset_health_lat_ema: f32, } @@ -101,20 +91,20 @@ impl Daemon { .await .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let (query_item_tx, query_item_rx) = async_channel::bounded(opts.insert_item_queue_cap); + let (query_item_tx, query_item_rx) = async_channel::bounded(ingest_opts.insert_item_queue_cap()); let query_item_tx_weak = query_item_tx.downgrade(); let insert_queue_counter = Arc::new(AtomicUsize::new(0)); // Insert queue hook - let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); + // let query_item_rx = inserthook::active_channel_insert_hook(query_item_rx); let conn_set_ctrl = CaConnSet::start( - opts.backend.clone(), - opts.local_epics_hostname.clone(), + ingest_opts.backend().into(), + ingest_opts.local_epics_hostname(), query_item_tx, channel_info_query_tx, - ingest_opts, + ingest_opts.clone(), ); // TODO remove @@ -140,24 +130,44 @@ impl Daemon { } }); - let use_rate_limit_queue = true; + // #[cfg(DISABLED)] + let query_item_rx = { + // TODO only testing, remove + tokio::spawn({ + let rx = query_item_rx; + async move { + while let Ok(item) = rx.recv().await { + drop(item); + } + } + }); + let (tx, rx) = async_channel::bounded(128); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(2000)).await; + tx.len(); + } + }); + rx + }; let ttls = opts.ttls.clone(); let insert_worker_opts = InsertWorkerOpts { store_workers_rate: opts.store_workers_rate.clone(), insert_workers_running: Arc::new(AtomicU64::new(0)), insert_frac: opts.insert_frac.clone(), - array_truncate: Arc::new(AtomicU64::new(opts.array_truncate)), + array_truncate: Arc::new(AtomicU64::new(ingest_opts.array_truncate())), }; let insert_worker_opts = Arc::new(insert_worker_opts); let insert_workers_jh = scywr::insertworker::spawn_scylla_insert_workers( opts.scyconf.clone(), - opts.insert_scylla_sessions, - opts.insert_worker_count, + ingest_opts.insert_scylla_sessions(), + ingest_opts.insert_worker_count(), + ingest_opts.insert_worker_concurrency(), query_item_rx, insert_worker_opts, insert_worker_stats.clone(), - use_rate_limit_queue, + ingest_opts.use_rate_limit_queue(), ttls, ) .await?; @@ -199,6 +209,7 @@ impl Daemon { let ret = Self { opts, + ingest_opts, tx: daemon_ev_tx, rx: daemon_ev_rx, insert_queue_counter, @@ -230,7 +241,7 @@ impl Daemon { async fn check_caconn_chans(&mut self, ts1: Instant) -> Result<(), Error> { match &self.connset_status_last { CheckPeriodic::Waiting(since) => { - if *since + Duration::from_millis(500) < ts1 { + if *since + Duration::from_millis(2000) < ts1 { self.connset_ctrl.check_health().await?; self.connset_status_last = CheckPeriodic::Ongoing(ts1); } @@ -297,9 +308,9 @@ impl Daemon { // debug!("handle_channel_add {ch:?}"); self.connset_ctrl .add_channel( - self.opts.backend.clone(), + self.ingest_opts.backend().into(), ch.id().into(), - self.opts.local_epics_hostname.clone(), + self.ingest_opts.local_epics_hostname(), ) .await?; Ok(()) @@ -365,23 +376,23 @@ impl Daemon { use CaConnSetItem::*; match item { Healthy(ts1, ts2) => { - let ts3 = Instant::now(); - let dt1 = ts2.duration_since(ts1).as_secs_f32() * 1e3; - let dt2 = ts3.duration_since(ts2).as_secs_f32() * 1e3; + let tsnow = Instant::now(); + let dt1 = tsnow.duration_since(ts1).as_secs_f32() * 1e3; + let dt2 = tsnow.duration_since(ts2).as_secs_f32() * 1e3; match &self.connset_status_last { CheckPeriodic::Waiting(_since) => { error!("received CaConnSet health report 
without having asked {dt1:.0} ms {dt2:.0} ms"); } CheckPeriodic::Ongoing(since) => { // TODO insert response time as series to scylla. - let dtsince = ts3.duration_since(*since).as_secs_f32() * 1e6; + let dtsince = tsnow.duration_since(*since).as_secs_f32() * 1e3; { let v = &mut self.connset_health_lat_ema; *v += (dtsince - *v) * 0.2; self.stats.connset_health_lat_ema().set(*v as _); } - // debug!("======================================== received CaConnSet healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms"); - self.connset_status_last = CheckPeriodic::Waiting(ts3); + debug!("received CaConnSet Healthy dtsince {dtsince:.0} ms {dt1:.0} ms {dt2:.0} ms"); + self.connset_status_last = CheckPeriodic::Waiting(tsnow); self.stats.caconnset_health_response().inc(); } } @@ -583,10 +594,6 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> let store_workers_rate = Arc::new(AtomicU64::new(opts.store_workers_rate())); let opts2 = DaemonOpts { - backend: opts.backend().into(), - local_epics_hostname: opts.local_epics_hostname().into(), - array_truncate: opts.array_truncate(), - insert_item_queue_cap: opts.insert_item_queue_cap(), pgconf: opts.postgresql_config().clone(), scyconf: opts.scylla_config().clone(), ttls: Ttls { @@ -596,8 +603,6 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> binned: opts.ttl_binned(), }, test_bsread_addr: opts.test_bsread_addr.clone(), - insert_worker_count: opts.insert_worker_count(), - insert_scylla_sessions: opts.insert_scylla_sessions(), insert_frac: insert_frac.clone(), store_workers_rate, }; diff --git a/daqingest/src/opts.rs b/daqingest/src/opts.rs index 2332ad9..5497ad4 100644 --- a/daqingest/src/opts.rs +++ b/daqingest/src/opts.rs @@ -1,14 +1,12 @@ -use clap::ArgAction::Count; -use clap::Parser; #[cfg(feature = "bsread")] use ingest_bsread::zmtp::ZmtpClientOpts; use std::net::SocketAddr; -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] #[command(author, version, about)] pub struct DaqIngestOpts { - #[arg(long, action(Count))] - pub verbose: u32, + #[arg(long, action(clap::ArgAction::Count))] + pub verbose: u8, #[clap(long)] pub tag: Option, #[command(subcommand)] @@ -17,7 +15,7 @@ pub struct DaqIngestOpts { pub nworkers: Option, } -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] pub enum SubCmd { ListPkey, ListPulses, @@ -31,7 +29,7 @@ pub enum SubCmd { Version, } -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] pub struct Bsread { #[arg(long)] pub backend: String, @@ -61,7 +59,7 @@ impl From for ZmtpClientOpts { } } -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] pub struct FetchEvents { #[arg(long, num_args(1..))] pub scylla: Vec, @@ -71,24 +69,24 @@ pub struct FetchEvents { pub backend: String, } -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] pub struct BsreadDump { pub source: String, } -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] pub enum ChannelAccess { CaIngest(CaConfig), #[cfg(DISABLED)] CaSearch(CaSearch), } -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] pub struct CaSearch { pub config: String, } -#[derive(Debug, Parser)] +#[derive(Debug, clap::Parser)] pub struct CaConfig { pub config: String, } diff --git a/ingest-linux/src/net.rs b/ingest-linux/src/net.rs index a752a15..d4c698b 100644 --- a/ingest-linux/src/net.rs +++ b/ingest-linux/src/net.rs @@ -20,6 +20,7 @@ pub fn local_hostname() -> String { let hostname = CStr::from_ptr(&buf[0] as *const _ as _); hostname.to_str().unwrap() }; + log::info!("---------------------- found 
hostname {hostname:?}"); hostname.into() } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index d39a05e..0cb4032 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -41,11 +41,15 @@ use scywriiq::QueryItem; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; +use stats::rand_xoshiro::rand_core::RngCore; +use stats::rand_xoshiro::rand_core::SeedableRng; +use stats::rand_xoshiro::Xoshiro128StarStar; use stats::CaConnStats; use stats::CaProtoStats; use stats::IntervalEma; use stats::XorShift32; use std::collections::BTreeMap; +use std::collections::HashMap; use std::collections::VecDeque; use std::net::SocketAddrV4; use std::ops::ControlFlow; @@ -149,10 +153,10 @@ fn ser_instant(val: &Option, ser: S) -> Result, cid_store: CidStore, subid_store: SubidStore, - channels: BTreeMap, - cid_by_name: BTreeMap, - cid_by_subid: BTreeMap, - name_by_cid: BTreeMap, - time_binners: BTreeMap, + channels: HashMap, + cid_by_name: HashMap, + cid_by_subid: HashMap, + name_by_cid: HashMap, + time_binners: HashMap, init_state_count: u64, insert_item_queue: VecDeque, remote_addr_dbg: SocketAddrV4, @@ -541,14 +545,14 @@ pub struct CaConn { ioc_ping_last: Instant, ioc_ping_next: Instant, ioc_ping_start: Option, - storage_insert_sender: Pin>>, + storage_insert_sender: Pin>>>, ca_conn_event_out_queue: VecDeque, channel_info_query_queue: VecDeque, channel_info_query_sending: Pin>>, thr_msg_poll: ThrottleTrace, ca_proto_stats: Arc, weird_count: usize, - rng: XorShift32, + rng: Xoshiro128StarStar, } #[cfg(DISABLED)] @@ -564,13 +568,13 @@ impl CaConn { backend: String, remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, - storage_insert_tx: Sender, + storage_insert_tx: Sender>, channel_info_query_tx: Sender, stats: Arc, ca_proto_stats: Arc, ) -> Self { let (cq_tx, cq_rx) = async_channel::bounded(32); - let mut rng = XorShift32::new_from_time(); + let mut rng = stats::xoshiro_from_time(); Self { opts, backend, @@ -580,16 +584,16 @@ impl CaConn { cid_store: CidStore::new_from_time(), subid_store: SubidStore::new_from_time(), init_state_count: 0, - channels: BTreeMap::new(), - cid_by_name: BTreeMap::new(), - cid_by_subid: BTreeMap::new(), - name_by_cid: BTreeMap::new(), - time_binners: BTreeMap::new(), + channels: HashMap::new(), + cid_by_name: HashMap::new(), + cid_by_subid: HashMap::new(), + name_by_cid: HashMap::new(), + time_binners: HashMap::new(), insert_item_queue: VecDeque::new(), remote_addr_dbg, local_epics_hostname, stats, - insert_ivl_min_mus: 1000 * 6, + insert_ivl_min_mus: 1000 * 4, conn_command_tx: Box::pin(cq_tx), conn_command_rx: Box::pin(cq_rx), conn_backoff: 0.02, @@ -610,8 +614,8 @@ impl CaConn { } } - fn ioc_ping_ivl_rng(rng: &mut XorShift32) -> Duration { - IOC_PING_IVL * 100 / (70 + (rng.next() % 60)) + fn ioc_ping_ivl_rng(rng: &mut Xoshiro128StarStar) -> Duration { + IOC_PING_IVL * 100 / (70 + (rng.next_u32() % 60)) } fn new_self_ticker() -> Pin> { @@ -813,7 +817,7 @@ impl CaConn { fn handle_conn_command(&mut self, cx: &mut Context) -> Result>, Error> { // TODO if this loops for too long time, yield and make sure we get wake up again. 
use Poll::*; - self.stats.caconn_loop3_count.inc(); + self.stats.loop3_count.inc(); if self.is_shutdown() { Ok(Ready(None)) } else { @@ -896,11 +900,11 @@ impl CaConn { fn channel_remove_expl( name: String, - channels: &mut BTreeMap, - cid_by_name: &mut BTreeMap, - name_by_cid: &mut BTreeMap, + channels: &mut HashMap, + cid_by_name: &mut HashMap, + name_by_cid: &mut HashMap, cid_store: &mut CidStore, - time_binners: &mut BTreeMap, + time_binners: &mut HashMap, ) { let cid = Self::cid_by_name_expl(&name, cid_by_name, name_by_cid, cid_store); if channels.contains_key(&cid) { @@ -924,8 +928,8 @@ impl CaConn { fn cid_by_name_expl( name: &str, - cid_by_name: &mut BTreeMap, - name_by_cid: &mut BTreeMap, + cid_by_name: &mut HashMap, + name_by_cid: &mut HashMap, cid_store: &mut CidStore, ) -> Cid { if let Some(cid) = cid_by_name.get(name) { @@ -1214,9 +1218,7 @@ impl CaConn { let ema = em.ema(); let ivl_min = (insert_ivl_min_mus as f32) * 1e-6; let dt = (ivl_min - ema).max(0.) / em.k(); - st.insert_next_earliest = tsnow - .checked_add(Duration::from_micros((dt * 1e6) as u64)) - .ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?; + st.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); let ts_msp_last = st.ts_msp_last; // TODO get event timestamp from channel access field let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; @@ -1555,7 +1557,7 @@ impl CaConn { } CaMsgTy::EventAddRes(k) => { trace4!("got EventAddRes: {k:?}"); - self.stats.caconn_recv_data.inc(); + self.stats.event_add_res_recv.inc(); let res = Self::handle_event_add_res(self, k, tsnow); let ts2 = Instant::now(); self.stats @@ -1866,7 +1868,7 @@ impl CaConn { let tsnow = Instant::now(); let mut have_progress = false; for _ in 0..64 { - self.stats.caconn_loop2_count.inc(); + self.stats.loop2_count.inc(); if self.is_shutdown() { break; } else if self.insert_item_queue.len() >= self.opts.insert_queue_max { @@ -1963,21 +1965,50 @@ impl CaConn { fn attempt_flush_storage_queue(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { use Poll::*; + let (qu, sd, stats) = Self::storage_queue_vars(&mut self); + { + let n = qu.len(); + if n >= 128 { + stats.storage_queue_above_128().inc(); + } else if n >= 32 { + stats.storage_queue_above_32().inc(); + } else if n >= 8 { + stats.storage_queue_above_8().inc(); + } + } let mut have_progress = false; - for _ in 0..128 { - let sd = &mut self.storage_insert_sender; + let mut i = 0; + loop { + i += 1; + if i > 120 { + break; + } + if !sd.has_sender() { + return Err(Error::with_msg_no_trace("attempt_flush_storage_queue no more sender")); + } if sd.is_idle() { - if let Some(item) = self.insert_item_queue.pop_front() { - self.storage_insert_sender.as_mut().send_pin(item); + if qu.len() != 0 { + let item: VecDeque<_> = qu.drain(..).collect(); + stats.storage_queue_send().add(item.len() as _); + sd.as_mut().send_pin(item); + } else { + break; } } - if self.storage_insert_sender.is_sending() { - match self.storage_insert_sender.poll_unpin(cx) { + if sd.is_sending() { + match sd.poll_unpin(cx) { Ready(Ok(())) => { have_progress = true; } - Ready(Err(_)) => return Err(Error::with_msg_no_trace("can not send into channel")), - Pending => return Ok(Pending), + Ready(Err(_)) => { + return Err(Error::with_msg_no_trace( + "attempt_flush_storage_queue can not send into channel", + )); + } + Pending => { + stats.storage_queue_pending().inc(); + return Ok(Pending); + } } } } @@ -1988,12 +2019,32 @@ impl CaConn { } } + // TODO 
refactor, put together in separate type: + fn storage_queue_vars( + this: &mut CaConn, + ) -> ( + &mut VecDeque, + &mut Pin>>>, + &CaConnStats, + ) { + ( + &mut this.insert_item_queue, + &mut this.storage_insert_sender, + &this.stats, + ) + } + fn attempt_flush_channel_info_query(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { use Poll::*; if self.is_shutdown() { Ok(Ready(None)) } else { let sd = self.channel_info_query_sending.as_mut(); + if !sd.has_sender() { + return Err(Error::with_msg_no_trace( + "attempt_flush_channel_info_query no more sender", + )); + } if sd.is_idle() { if let Some(item) = self.channel_info_query_queue.pop_front() { trace3!("send series query {item:?}"); @@ -2005,7 +2056,9 @@ impl CaConn { if sd.is_sending() { match sd.poll_unpin(cx) { Ready(Ok(())) => Ok(Ready(Some(()))), - Ready(Err(_)) => Err(Error::with_msg_no_trace("can not send into channel")), + Ready(Err(_)) => Err(Error::with_msg_no_trace( + "attempt_flush_channel_info_query can not send into channel", + )), Pending => Ok(Pending), } } else { @@ -2020,11 +2073,11 @@ impl Stream for CaConn { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - self.stats.caconn_poll_count.inc(); + self.stats.poll_count().inc(); let poll_ts1 = Instant::now(); - self.stats.ca_conn_poll_fn_begin().inc(); + self.stats.poll_fn_begin().inc(); let ret = loop { - self.stats.ca_conn_poll_loop_begin().inc(); + self.stats.poll_loop_begin().inc(); let qlen = self.insert_item_queue.len(); if qlen >= self.opts.insert_queue_max * 2 / 3 { self.stats.insert_item_queue_pressure().inc(); @@ -2108,10 +2161,10 @@ impl Stream for CaConn { } else { // debug!("queues_out_flushed false"); if have_progress { - self.stats.ca_conn_poll_reloop().inc(); + self.stats.poll_reloop().inc(); continue; } else if have_pending { - self.stats.ca_conn_poll_pending().inc(); + self.stats.poll_pending().inc(); Pending } else { // TODO error @@ -2123,13 +2176,13 @@ impl Stream for CaConn { } } else { if have_progress { - self.stats.ca_conn_poll_reloop().inc(); + self.stats.poll_reloop().inc(); continue; } else if have_pending { - self.stats.ca_conn_poll_pending().inc(); + self.stats.poll_pending().inc(); Pending } else { - self.stats.ca_conn_poll_no_progress_no_pending().inc(); + self.stats.poll_no_progress_no_pending().inc(); let e = Error::with_msg_no_trace("no progress no pending"); Ready(Some(Err(e))) } diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 45bba2d..2b872af 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -261,7 +261,13 @@ impl CaConnSetCtrl { pub async fn check_health(&self) -> Result<(), Error> { let cmd = ConnSetCmd::CheckHealth(Instant::now()); + let n = self.tx.len(); + if n > 0 { + debug!("check_health self.tx.len() {:?}", n); + } + let s = format!("{:?}", cmd); self.tx.send(CaConnSetEvent::ConnSetCmd(cmd)).await?; + debug!("check_health enqueued {s}"); Ok(()) } @@ -345,9 +351,9 @@ pub struct CaConnSet { find_ioc_query_queue: VecDeque, find_ioc_query_sender: Pin>>, find_ioc_res_rx: Pin>>>, - storage_insert_tx: Pin>>, - storage_insert_queue: VecDeque, - storage_insert_sender: Pin>>, + storage_insert_tx: Pin>>>, + storage_insert_queue: VecDeque>, + storage_insert_sender: Pin>>>, ca_conn_res_tx: Pin>>, ca_conn_res_rx: Pin>>, connset_out_queue: VecDeque, @@ -361,7 +367,6 @@ pub struct CaConnSet { await_ca_conn_jhs: VecDeque<(SocketAddr, JoinHandle>)>, thr_msg_poll_1: ThrottleTrace, thr_msg_storage_len: ThrottleTrace, - did_connset_out_queue: bool, 
ca_proto_stats: Arc, rogue_channel_count: u64, connect_fail_count: usize, @@ -371,7 +376,7 @@ impl CaConnSet { pub fn start( backend: String, local_epics_hostname: String, - storage_insert_tx: Sender, + storage_insert_tx: Sender>, channel_info_query_tx: Sender, ingest_opts: CaIngestOpts, ) -> CaConnSetCtrl { @@ -422,7 +427,6 @@ impl CaConnSet { await_ca_conn_jhs: VecDeque::new(), thr_msg_poll_1: ThrottleTrace::new(Duration::from_millis(2000)), thr_msg_storage_len: ThrottleTrace::new(Duration::from_millis(1000)), - did_connset_out_queue: false, ca_proto_stats: ca_proto_stats.clone(), rogue_channel_count: 0, connect_fail_count: 0, @@ -491,7 +495,8 @@ impl CaConnSet { CaConnEventValue::EchoTimeout => Ok(()), CaConnEventValue::ConnCommandResult(x) => self.handle_conn_command_result(addr, x), CaConnEventValue::QueryItem(item) => { - self.storage_insert_queue.push_back(item); + todo!("remove this insert case"); + // self.storage_insert_queue.push_back(item); Ok(()) } CaConnEventValue::ChannelCreateFail(x) => self.handle_channel_create_fail(addr, x), @@ -743,7 +748,7 @@ impl CaConnSet { } fn handle_check_health(&mut self, ts1: Instant) -> Result<(), Error> { - debug!("handle_check_health"); + trace2!("handle_check_health"); if self.shutdown_stopping { return Ok(()); } @@ -754,15 +759,11 @@ impl CaConnSet { self.check_channel_states()?; // Trigger already the next health check, but use the current data that we have. - - // TODO try to deliver a command to CaConn - // Add some queue for commands to CaConn to the ress. - // Fail here if that queue gets too long. - // Try to push the commands periodically. + // TODO do the full check before sending the reply to daemon. for (_, res) in self.ca_conn_ress.iter_mut() { let item = ConnCommand::check_health(); res.cmd_queue.push_back(item); - debug!( + trace2!( "handle_check_health pushed check command {:?} {:?}", res.cmd_queue.len(), res.sender.len() @@ -822,7 +823,7 @@ impl CaConnSet { } fn apply_ca_conn_health_update(&mut self, addr: SocketAddr, res: CheckHealthResult) -> Result<(), Error> { - debug!("apply_ca_conn_health_update {addr}"); + trace2!("apply_ca_conn_health_update {addr}"); let tsnow = SystemTime::now(); self.rogue_channel_count = 0; for (k, v) in res.channel_statuses { @@ -993,7 +994,7 @@ impl CaConnSet { async fn ca_conn_item_merge( conn: CaConn, tx1: Sender<(SocketAddr, CaConnEvent)>, - tx2: Sender, + tx2: Sender>, addr: SocketAddr, stats: Arc, ) -> Result<(), Error> { @@ -1005,10 +1006,13 @@ impl CaConnSet { while let Some(item) = conn.next().await { match item { Ok(item) => { - connstats.conn_item_count.inc(); + connstats.item_count.inc(); match item.value { CaConnEventValue::QueryItem(x) => { - if let Err(_) = tx2.send(x).await { + warn!("ca_conn_item_merge should not go here often"); + let mut v = VecDeque::new(); + v.push_back(x); + if let Err(_) = tx2.send(v).await { break; } } @@ -1076,7 +1080,9 @@ impl CaConnSet { }; } let item = QueryItem::ChannelStatus(item); - self.storage_insert_queue.push_back(item); + let mut v = VecDeque::new(); + v.push_back(item); + self.storage_insert_queue.push_back(v); Ok(()) } @@ -1423,35 +1429,37 @@ impl CaConnSet { (search_pending, assigned_without_health_update) } - fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) { + fn try_push_ca_conn_cmds(&mut self, cx: &mut Context) -> Result<(), Error> { use Poll::*; for (_, v) in self.ca_conn_ress.iter_mut() { - 'level2: loop { - let tx = &mut v.sender; - if v.cmd_queue.len() != 0 || tx.is_sending() { - debug!("try_push_ca_conn_cmds {:?} {:?}", 
v.cmd_queue.len(), tx.len()); + let tx = &mut v.sender; + loop { + if false { + if v.cmd_queue.len() != 0 || tx.is_sending() { + debug!("try_push_ca_conn_cmds {:?} {:?}", v.cmd_queue.len(), tx.len()); + } } - loop { - break if tx.is_sending() { - match tx.poll_unpin(cx) { - Ready(Ok(())) => { - self.stats.try_push_ca_conn_cmds_sent.inc(); - continue; - } - Ready(Err(e)) => { - error!("try_push_ca_conn_cmds {e}"); - } - Pending => { - break 'level2; - } + break if tx.is_sending() { + match tx.poll_unpin(cx) { + Ready(Ok(())) => { + self.stats.try_push_ca_conn_cmds_sent.inc(); + continue; } - } else if let Some(item) = v.cmd_queue.pop_front() { - tx.as_mut().send_pin(item); - continue; - }; - } + Ready(Err(e)) => { + error!("try_push_ca_conn_cmds {e}"); + return Err(Error::with_msg_no_trace(format!("{e}"))); + } + Pending => (), + } + } else if let Some(item) = v.cmd_queue.pop_front() { + tx.as_mut().send_pin(item); + continue; + } else { + () + }; } } + Ok(()) } } @@ -1459,9 +1467,11 @@ impl Stream for CaConnSet { type Item = CaConnSetItem; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + trace4!("CaConnSet poll begin"); use Poll::*; self.stats.poll_fn_begin().inc(); - loop { + let res = loop { + trace4!("CaConnSet poll loop"); self.stats.poll_loop_begin().inc(); self.stats.storage_insert_tx_len.set(self.storage_insert_tx.len() as _); @@ -1485,15 +1495,12 @@ impl Stream for CaConnSet { let mut have_pending = false; let mut have_progress = false; - self.try_push_ca_conn_cmds(cx); + if let Err(e) = self.try_push_ca_conn_cmds(cx) { + break Ready(Some(CaConnSetItem::Error(e))); + } - if self.did_connset_out_queue { - self.did_connset_out_queue = false; - } else { - if let Some(item) = self.connset_out_queue.pop_front() { - self.did_connset_out_queue = true; - break Ready(Some(item)); - } + if let Some(item) = self.connset_out_queue.pop_front() { + break Ready(Some(item)); } if let Some((addr, jh)) = self.await_ca_conn_jhs.front_mut() { @@ -1634,7 +1641,9 @@ impl Stream for CaConnSet { } Err(e) => break Ready(Some(CaConnSetItem::Error(e))), }, - Ready(None) => {} + Ready(None) => { + warn!("connset_inp_rx broken?") + } Pending => { have_pending = true; } @@ -1663,6 +1672,8 @@ impl Stream for CaConnSet { } } }; - } + }; + trace4!("CaConnSet poll done"); + res } } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index ca7afaa..22a5940 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -1,13 +1,12 @@ use crate::netbuf; use err::thiserror; use err::ThisError; -use futures_util::pin_mut; use futures_util::Stream; use log::*; use netpod::timeunits::*; use slidebuf::SlideBuf; use stats::CaProtoStats; -use std::collections::BTreeMap; +use std::collections::HashMap; use std::collections::VecDeque; use std::io; use std::net::SocketAddrV4; @@ -46,6 +45,8 @@ pub enum Error { ParseAttemptInDoneState, UnexpectedHeader, ExtendedHeaderBadCount, + NoReadBufferSpace, + NeitherPendingNorProgress, } const CA_PROTO_VERSION: u16 = 13; @@ -1016,8 +1017,9 @@ pub struct CaProto { outbuf: SlideBuf, out: VecDeque, array_truncate: usize, - logged_proto_error_for_cid: BTreeMap, + logged_proto_error_for_cid: HashMap, stats: Arc, + resqu: VecDeque, } impl CaProto { @@ -1026,12 +1028,13 @@ impl CaProto { tcp, remote_addr_dbg, state: CaState::StdHead, - buf: SlideBuf::new(1024 * 1024 * 4), + buf: SlideBuf::new(1024 * 1024 * 8), outbuf: SlideBuf::new(1024 * 128), out: VecDeque::new(), array_truncate, - logged_proto_error_for_cid: BTreeMap::new(), + 
logged_proto_error_for_cid: HashMap::new(), stats, + resqu: VecDeque::with_capacity(256), } } @@ -1063,14 +1066,14 @@ impl CaProto { } } - fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn attempt_output(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; let (w, b) = self.outbuf_conn(); - pin_mut!(w); + let w = Pin::new(w); match w.poll_write(cx, b) { Ready(k) => match k { Ok(k) => match self.outbuf.adv(k) { - Ok(()) => Ready(Ok(())), + Ok(()) => Ready(Ok(k)), Err(e) => { error!("advance error {:?}", e); Ready(Err(e.into())) @@ -1085,13 +1088,12 @@ impl CaProto { } } - fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result>, Error> { + fn loop_body(mut self: Pin<&mut Self>, cx: &mut Context) -> Result, Error> { use Poll::*; + let mut have_pending = false; + let mut have_progress = false; let tsnow = Instant::now(); - let output_res_1: Option> = 'll1: loop { - if self.out.len() == 0 { - break None; - } + 'l1: while self.out.len() != 0 { while let Some((msg, buf)) = self.out_msg_buf() { let msglen = msg.len(); if msglen > buf.len() { @@ -1103,168 +1105,184 @@ impl CaProto { self.out.pop_front(); } } - while self.outbuf.len() > 0 { + while self.outbuf.len() != 0 { match Self::attempt_output(self.as_mut(), cx)? { - Ready(()) => {} + Ready(n) => { + if n != 0 { + have_progress = true; + } else { + // Should not occur to begin with. TODO restructure. + break 'l1; + } + } Pending => { - break 'll1 Some(Pending); + have_pending = true; + break 'l1; } } } - }; - let output_res_2: Option> = if let Some(Pending) = output_res_1 { - Some(Pending) - } else { - loop { - if self.outbuf.len() == 0 { - break None; + } + 'l1: while self.outbuf.len() != 0 { + match Self::attempt_output(self.as_mut(), cx)? { + Ready(n) => { + if n != 0 { + have_progress = true; + } else { + // Should not occur to begin with. TODO restructure. + break 'l1; + } } - match Self::attempt_output(self.as_mut(), cx)? { - Ready(()) => {} - Pending => break Some(Pending), + Pending => { + have_pending = true; + break 'l1; } } - }; + } let need_min = self.state.need_min(); - let read_res = { - if self.buf.cap() < need_min { - self.state = CaState::Done; - let e = Error::BufferTooSmallForNeedMin(self.buf.cap(), self.state.need_min()); - Err(e) - } else if self.buf.len() < need_min { - let (w, mut rbuf) = self.inpbuf_conn(need_min)?; - pin_mut!(w); - match w.poll_read(cx, &mut rbuf) { - Ready(k) => match k { - Ok(()) => { - let nf = rbuf.filled().len(); - if nf == 0 { - info!( - "EOF peer {:?} {:?} {:?}", - self.tcp.peer_addr(), - self.remote_addr_dbg, - self.state - ); - // TODO may need another state, if not yet done when input is EOF. 
- self.state = CaState::Done; - Ok(Some(Ready(CaItem::empty()))) - } else { - if false { - info!("received {} bytes", rbuf.filled().len()); - let t = rbuf.filled().len().min(32); - info!("received data {:?}", &rbuf.filled()[0..t]); + if self.buf.cap() < need_min { + self.state = CaState::Done; + let e = Error::BufferTooSmallForNeedMin(self.buf.cap(), self.state.need_min()); + return Err(e); + } + if self.buf.len() < need_min { + let (w, mut rbuf) = self.inpbuf_conn(need_min)?; + if rbuf.remaining() == 0 { + return Err(Error::NoReadBufferSpace); + } + let w = Pin::new(w); + match w.poll_read(cx, &mut rbuf) { + Ready(k) => match k { + Ok(()) => { + let nf = rbuf.filled().len(); + if nf == 0 { + info!( + "EOF peer {:?} {:?} {:?}", + self.tcp.peer_addr(), + self.remote_addr_dbg, + self.state + ); + // TODO may need another state, if not yet done when input is EOF. + self.state = CaState::Done; + } else { + if false { + info!("received {} bytes", rbuf.filled().len()); + let t = rbuf.filled().len().min(32); + info!("received data {:?}", &rbuf.filled()[0..t]); + } + match self.buf.wadv(nf) { + Ok(()) => { + have_progress = true; + self.stats.tcp_recv_bytes().add(nf as _); + self.stats.tcp_recv_count().inc(); } - match self.buf.wadv(nf) { - Ok(()) => { - self.stats.tcp_recv_bytes().add(nf as _); - self.stats.tcp_recv_count().inc(); - Ok(Some(Ready(CaItem::empty()))) - } - Err(e) => { - error!("netbuf wadv fail nf {nf}"); - Err(e.into()) - } + Err(e) => { + error!("netbuf wadv fail nf {nf}"); + return Err(e.into()); } } } - Err(e) => Err(e.into()), - }, - Pending => Ok(Some(Pending)), + } + Err(e) => { + return Err(e.into()); + } + }, + Pending => { + have_pending = true; } + } + } + while self.resqu.len() < self.resqu.capacity() { + if let Some(item) = self.parse_item(tsnow)? { + have_progress = true; + self.resqu.push_back(item); } else { - Ok(None) + break; } - }?; - let parse_res: Option = self.parse_item(tsnow)?; - match (output_res_2, read_res, parse_res) { - (_, _, Some(item)) => Ok(Some(Ready(item))), - (Some(Pending), _, _) => Ok(Some(Pending)), - (_, Some(Pending), _) => Ok(Some(Pending)), - (_, None, None) => { - // TODO constrain how often we can go to this case consecutively. 
- Ok(None) - } - (_, Some(_), None) => Ok(None), + } + if have_progress { + Ok(Ready(())) + } else if have_pending { + Ok(Pending) + } else { + Err(Error::NeitherPendingNorProgress) } } fn parse_item(&mut self, tsnow: Instant) -> Result, Error> { - loop { - if self.buf.len() < self.state.need_min() { - break Ok(None); - } - break match &self.state { - CaState::StdHead => { - let hi = HeadInfo::from_netbuf(&mut self.buf)?; - if hi.cmdid == 1 || hi.cmdid == 15 { - let sid = hi.param1; - if hi.payload_size == 0xffff { - if hi.data_count != 0 { - warn!("protocol error: {hi:?}"); - return Err(Error::ExtendedHeaderBadCount); - } + if self.buf.len() < self.state.need_min() { + return Ok(None); + } + match &self.state { + CaState::StdHead => { + let hi = HeadInfo::from_netbuf(&mut self.buf)?; + if hi.cmdid == 1 || hi.cmdid == 15 { + let sid = hi.param1; + if hi.payload_size == 0xffff { + if hi.data_count != 0 { + warn!("protocol error: {hi:?}"); + return Err(Error::ExtendedHeaderBadCount); } - if hi.payload_size == 0xffff { - } else if hi.payload_size > 16368 { - self.stats.payload_std_too_large().inc(); - } - } - if hi.cmdid > 26 { - // TODO count as logic error - self.stats.protocol_issue().inc(); } if hi.payload_size == 0xffff { - self.state = CaState::ExtHead(hi); - Ok(None) - } else { - // For extended messages, ingest on receive of extended header - self.stats.payload_size().ingest(hi.payload_len() as u32); - if hi.payload_size == 0 { - self.state = CaState::StdHead; - let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?; - Ok(Some(CaItem::Msg(msg))) - } else { - self.state = CaState::Payload(hi); - Ok(None) - } + } else if hi.payload_size > 16368 { + self.stats.payload_std_too_large().inc(); } } - CaState::ExtHead(hi) => { - let payload_size = self.buf.read_u32_be()?; - let data_count = self.buf.read_u32_be()?; + if hi.cmdid > 26 { + // TODO count as logic error + self.stats.protocol_issue().inc(); + } + if hi.payload_size == 0xffff { + self.state = CaState::ExtHead(hi); + Ok(None) + } else { + // For extended messages, ingest on receive of extended header self.stats.payload_size().ingest(hi.payload_len() as u32); - if payload_size > 1024 * 1024 * 32 { - self.stats.payload_ext_very_large().inc(); - if false { - warn!( - "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", - hi.data_type - ); - } + if hi.payload_size == 0 { + self.state = CaState::StdHead; + let msg = CaMsg::from_proto_infos(&hi, &[], tsnow, self.array_truncate)?; + Ok(Some(CaItem::Msg(msg))) + } else { + self.state = CaState::Payload(hi); + Ok(None) } - if payload_size <= 16368 { - self.stats.payload_ext_but_small().inc(); + } + } + CaState::ExtHead(hi) => { + let payload_size = self.buf.read_u32_be()?; + let data_count = self.buf.read_u32_be()?; + self.stats.payload_size().ingest(hi.payload_len() as u32); + if payload_size > 1024 * 1024 * 32 { + self.stats.payload_ext_very_large().inc(); + if false { warn!( "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", hi.data_type ); } - let hi = hi.clone().with_ext(payload_size, data_count); - self.state = CaState::Payload(hi); - Ok(None) } - CaState::Payload(hi) => { - let g = self.buf.read_bytes(hi.payload_len())?; - let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?; - // data-count is only reasonable for event messages - if let CaMsgTy::EventAddRes(e) = &msg.ty { - self.stats.data_count().ingest(hi.data_count() as u32); - } - self.state = CaState::StdHead; - Ok(Some(CaItem::Msg(msg))) + if 
payload_size <= 16368 { + self.stats.payload_ext_but_small().inc(); + warn!( + "ExtHead data_type {} payload_size {payload_size} data_count {data_count}", + hi.data_type + ); } - CaState::Done => Err(Error::ParseAttemptInDoneState), - }; + let hi = hi.clone().with_ext(payload_size, data_count); + self.state = CaState::Payload(hi); + Ok(None) + } + CaState::Payload(hi) => { + let g = self.buf.read_bytes(hi.payload_len())?; + let msg = CaMsg::from_proto_infos(hi, g, tsnow, self.array_truncate)?; + // data-count is only reasonable for event messages + if let CaMsgTy::EventAddRes(e) = &msg.ty { + self.stats.data_count().ingest(hi.data_count() as u32); + } + self.state = CaState::StdHead; + Ok(Some(CaItem::Msg(msg))) + } + CaState::Done => Err(Error::ParseAttemptInDoneState), } } } @@ -1274,16 +1292,16 @@ impl Stream for CaProto { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - loop { - break if let CaState::Done = self.state { + break if let Some(item) = self.resqu.pop_front() { + Ready(Some(Ok(item))) + } else if let CaState::Done = self.state { Ready(None) } else { let k = Self::loop_body(self.as_mut(), cx); match k { - Ok(Some(Ready(k))) => Ready(Some(Ok(k))), - Ok(Some(Pending)) => Pending, - Ok(None) => continue, + Ok(Ready(())) => continue, + Ok(Pending) => Pending, Err(e) => Ready(Some(Err(e))), } }; diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index acb932f..2fdeb4e 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -28,6 +28,7 @@ pub struct CaIngestOpts { scylla: ScyllaConfig, array_truncate: Option, insert_worker_count: Option, + insert_worker_concurrency: Option, insert_scylla_sessions: Option, insert_queue_max: Option, insert_item_queue_cap: Option, @@ -76,7 +77,11 @@ impl CaIngestOpts { } pub fn insert_worker_count(&self) -> usize { - self.insert_worker_count.unwrap_or(800) + self.insert_worker_count.unwrap_or(4) + } + + pub fn insert_worker_concurrency(&self) -> usize { + self.insert_worker_concurrency.unwrap_or(32) } pub fn insert_scylla_sessions(&self) -> usize { diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 4a53d6d..3d900f3 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -223,6 +223,15 @@ fn make_routes(dcom: Arc, connset_cmd_tx: Sender, st || async move { axum::Json(serde_json::json!({ "v1": 42_u32, + "o1": { + "v2": 56, + "o2": { + "v3": "test", + }, + }, + "o5": { + "v6": 89, + }, })) } }), diff --git a/netfetch/src/senderpolling.rs b/netfetch/src/senderpolling.rs index 0ce781e..9186cff 100644 --- a/netfetch/src/senderpolling.rs +++ b/netfetch/src/senderpolling.rs @@ -43,6 +43,10 @@ impl SenderPolling { ret } + pub fn has_sender(&self) -> bool { + self.sender.is_some() + } + pub fn is_idle(&self) -> bool { self.sender.is_some() && self.fut.is_none() } @@ -97,6 +101,18 @@ impl SenderPolling { } self.sender.as_ref().unwrap().send(item).await } + + unsafe fn reset_fut(futopt: Pin<&mut Option>>) { + let y = futopt.get_unchecked_mut(); + let z = y.as_mut().unwrap_unchecked(); + std::ptr::drop_in_place(z); + std::ptr::write(y, None); + } + + #[allow(unused)] + unsafe fn reset_fut_old(futopt: Pin<&mut Option>>) { + *futopt.get_unchecked_mut() = None; + } } impl Future for SenderPolling @@ -109,16 +125,16 @@ where use Poll::*; let mut this = self.project(); match this.fut.as_mut().as_pin_mut() { - Some(fut) => match fut.poll(cx) { + Some(mut fut) => match fut.as_mut().poll(cx) { Ready(Ok(())) => { unsafe { - *this.fut.get_unchecked_mut() = None; + Self::reset_fut(this.fut); } 
Ready(Ok(())) } Ready(Err(e)) => { unsafe { - *this.fut.get_unchecked_mut() = None; + Self::reset_fut(this.fut); } Ready(Err(Error::Closed(e.0))) } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 95f39fe..caeb72c 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -98,6 +98,7 @@ pub async fn spawn_scylla_insert_workers( scyconf: ScyllaConfig, insert_scylla_sessions: usize, insert_worker_count: usize, + insert_worker_concurrency: usize, item_inp: Receiver, insert_worker_opts: Arc, store_stats: Arc, @@ -128,7 +129,7 @@ pub async fn spawn_scylla_insert_workers( )); let jh = tokio::spawn(worker_streamed( worker_ix, - insert_worker_count * 3, + insert_worker_concurrency, item_inp.clone(), ttls.clone(), insert_worker_opts.clone(), @@ -363,6 +364,10 @@ async fn worker_streamed( }) .map(|x| futures_util::stream::iter(x)) .flatten_unordered(Some(1)) + // .map(|x| async move { + // drop(x); + // Ok(()) + // }) .buffer_unordered(concurrency); while let Some(item) = stream.next().await { match item { @@ -408,23 +413,27 @@ fn prepare_query_insert_futs( let series = item.series.clone(); let ts_msp = item.ts_msp; let do_insert = true; - let fut = insert_item_fut(item, &ttls, &data_store, do_insert, stats); - let mut futs = smallvec![fut]; - if msp_bump { - stats.inserts_msp().inc(); - let fut = insert_msp_fut( - series, - ts_msp, - item_ts_local, - ttls, - data_store.scy.clone(), - data_store.qu_insert_ts_msp.clone(), - stats.clone(), - ); - if item_ts_local % 100000 == 7461 { + let mut futs = smallvec![]; + + // TODO + if true || item_ts_local & 0x3f00000 < 0x0a00000 { + let fut = insert_item_fut(item, &ttls, &data_store, do_insert, stats); + futs.push(fut); + if msp_bump { + stats.inserts_msp().inc(); + let fut = insert_msp_fut( + series, + ts_msp, + item_ts_local, + ttls, + data_store.scy.clone(), + data_store.qu_insert_ts_msp.clone(), + stats.clone(), + ); futs.push(fut); } } + #[cfg(DISABLED)] if let Some(ts_msp_grid) = item.ts_msp_grid { let params = ( @@ -441,5 +450,6 @@ fn prepare_query_insert_futs( .await?; stats.inserts_msp_grid().inc(); } + futs } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index b4dd897..06e5bae 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -713,3 +713,37 @@ pub async fn insert_channel_status( .await?; Ok(()) } + +pub struct InsertFut2 { + data_store: Arc, + stats: Arc, + kind: InsertFutKind, +} + +impl Future for InsertFut2 { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + todo!() + } +} + +pub enum InsertFutKind { + Value, +} + +pub struct InsertItemFut { + data_store: Arc, + stats: Arc, + item: InsertItem, +} + +impl Future for InsertItemFut { + type Output = Result<(), Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + use Poll::*; + todo!() + } +} diff --git a/stats/Cargo.toml b/stats/Cargo.toml index 5fc571f..0e18976 100644 --- a/stats/Cargo.toml +++ b/stats/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" path = "src/stats.rs" [dependencies] +rand_xoshiro = "0.6.0" stats_types = { path = "../stats_types" } stats_proc = { path = "../stats_proc" } log = { path = "../log" } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index 26d4635..82bf792 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,3 +1,5 @@ +pub use rand_xoshiro; + use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::time::Duration; @@ -367,6 +369,7 @@ 
stats_proc::stats_struct!(( stats_proc::stats_struct!(( stats_struct( name(CaConnStats), + prefix(caconn), counters( insert_item_create, inserts_val, @@ -383,35 +386,34 @@ stats_proc::stats_struct!(( // TODO maybe rename: this is now only the recv of the intermediate queue: store_worker_item_recv, // TODO rename to make clear that this drop is voluntary because of user config choice: - store_worker_fraction_drop, - store_worker_ratelimit_drop, - store_worker_insert_done, - store_worker_insert_binned_done, - store_worker_insert_overload, - store_worker_insert_timeout, - store_worker_insert_unavailable, - store_worker_insert_error, + // store_worker_fraction_drop, + // store_worker_ratelimit_drop, + // store_worker_insert_done, + // store_worker_insert_binned_done, + // store_worker_insert_overload, + // store_worker_insert_timeout, + // store_worker_insert_unavailable, + // store_worker_insert_error, connection_status_insert_done, channel_status_insert_done, channel_info_insert_done, ivl_insert_done, mute_insert_done, - caconn_poll_count, - caconn_loop1_count, - caconn_loop2_count, - caconn_loop3_count, - caconn_loop4_count, - caconn_command_can_not_reply, - caconn_recv_data, + poll_count, + loop1_count, + loop2_count, + loop3_count, + loop4_count, + command_can_not_reply, time_handle_conn_listen, time_handle_peer_ready, time_check_channels_state_init, time_handle_event_add_res, tcp_connected, get_series_id_ok, - conn_item_count, - conn_stream_ready, - conn_stream_pending, + item_count, + stream_ready, + stream_pending, channel_all_count, channel_alive_count, channel_not_alive_count, @@ -419,11 +421,17 @@ stats_proc::stats_struct!(( ping_start, ping_no_proto, pong_timeout, - ca_conn_poll_fn_begin, - ca_conn_poll_loop_begin, - ca_conn_poll_reloop, - ca_conn_poll_pending, - ca_conn_poll_no_progress_no_pending, + poll_fn_begin, + poll_loop_begin, + poll_reloop, + poll_pending, + poll_no_progress_no_pending, + storage_queue_send, + storage_queue_pending, + storage_queue_above_8, + storage_queue_above_32, + storage_queue_above_128, + event_add_res_recv, ), values(inter_ivl_ema), histolog2s(pong_recv_lat, ca_ts_off,), @@ -491,3 +499,17 @@ fn test0_diff() { let diff = TestStats0Diff::diff_from(&stats_a, &stats_b); assert_eq!(diff.count0.load(), 3); } + +pub fn xoshiro_from_time() -> rand_xoshiro::Xoshiro128StarStar { + use rand_xoshiro::rand_core::SeedableRng; + use std::time::SystemTime; + let a = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .subsec_nanos() as u64; + let b = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .subsec_nanos() as u64; + rand_xoshiro::Xoshiro128StarStar::seed_from_u64(a << 32 ^ b) +}
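
The storage-insert channel in this change switches from sending one QueryItem per channel operation to sending whole batches (the Sender/WeakSender payload becomes VecDeque<QueryItem>), and attempt_flush_storage_queue now drains the local insert_item_queue into a single send. Below is a minimal sketch of that producer/consumer shape; the names are illustrative only (QueryItem here is a stand-in, not the scywriiq type), and this is not the CaConn or insert-worker code itself.

use std::collections::VecDeque;

struct QueryItem; // stand-in for scywriiq::QueryItem

// Producer side: move the whole locally queued batch with one channel send,
// instead of paying the channel overhead once per item.
async fn flush_batch(local: &mut VecDeque<QueryItem>, tx: &async_channel::Sender<VecDeque<QueryItem>>) {
    if !local.is_empty() {
        let batch: VecDeque<QueryItem> = local.drain(..).collect();
        let _ = tx.send(batch).await;
    }
}

// Consumer side: receive a batch and iterate it.
async fn drain_batches(rx: async_channel::Receiver<VecDeque<QueryItem>>) {
    while let Ok(batch) = rx.recv().await {
        for _item in batch {
            // hand each item to the insert path here
        }
    }
}

The channeltest.rs benchmark added in this diff measures the per-item throughput of an async-channel producer/consumer pair, which is the cost this batching is meant to amortize.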
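
spawn_scylla_insert_workers also gains an explicit insert_worker_concurrency parameter (default 32 in conf.rs) that replaces the previous insert_worker_count * 3 passed to worker_streamed, while insert_worker_count now defaults to 4 instead of 800. Each worker keeps up to that many insert futures in flight through buffer_unordered, so outstanding inserts are roughly bounded by worker count times per-worker concurrency. A small sketch of that shape, with an illustrative item type and function name rather than the real worker:

use futures_util::StreamExt;

async fn insert_worker(items: async_channel::Receiver<u64>, concurrency: usize) {
    let mut stream = items
        .map(|item| async move {
            // stand-in for the per-item Scylla insert future
            let _ = item;
        })
        .buffer_unordered(concurrency);
    // Up to `concurrency` insert futures are polled at once; new ones start
    // as completed ones make room.
    while let Some(()) = stream.next().await {}
}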