diff --git a/.cargo/config.toml b/.cargo/config.toml
index d1d2fff..ec464e1 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -1,16 +1,17 @@
 [build]
 rustflags = [
+    #"-C", "target-cpu=native",
     "-C", "target-cpu=sandybridge",
-    "-C", "force-frame-pointers=yes",
-    "-C", "force-unwind-tables=yes",
+    #"-C", "force-frame-pointers=yes",
+    #"-C", "force-unwind-tables=yes",
     #"-C", "relocation-model=static",
     #"-C", "embed-bitcode=no",
     #"-C", "inline-threshold=1000",
     #"-Z", "time-passes=yes",
     #"-Z", "time-llvm-passes=yes",
-    "--cfg", "tokio_unstable",
+    #"--cfg", "tokio_unstable",
 ]
 rustdocflags = [
-    "--cfg", "tokio_unstable"
+    #"--cfg", "tokio_unstable"
 ]
diff --git a/Cargo.toml b/Cargo.toml
index ba16d4a..f40b37e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -2,13 +2,13 @@
 members = ["log", "netfetch", "daqingest"]
 
 [profile.release]
-opt-level = 2
-debug = 2
+opt-level = 3
+debug = 0
 overflow-checks = false
 debug-assertions = false
 lto = "thin"
-codegen-units = 32
-incremental = true
+#codegen-units = 32
+incremental = false
 
 [patch.crates-io]
 #tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" }
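A note on the build changes above: the release profile now trades debuggability for speed (`opt-level = 3`, no debug info, no incremental builds), and keeping `target-cpu=sandybridge` bakes AVX into the binary, so it dies with an illegal-instruction fault on pre-AVX hosts. A minimal, hypothetical startup guard (not part of this patch) that fails readably instead:

```rust
// Hypothetical startup guard, not part of this patch: the binary is built
// with -C target-cpu=sandybridge, which assumes AVX is available.
fn main() {
    #[cfg(target_arch = "x86_64")]
    {
        if !std::is_x86_feature_detected!("avx") {
            eprintln!("built with target-cpu=sandybridge but this CPU lacks AVX");
            std::process::exit(1);
        }
    }
    // ... continue with normal startup.
}
```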
diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs
index 3ba3b44..440b13f 100644
--- a/netfetch/src/ca.rs
+++ b/netfetch/src/ca.rs
@@ -4,7 +4,7 @@ pub mod store;
 
 use self::conn::FindIocStream;
 use self::store::DataStore;
-use crate::store::{CommonInsertItemQueue, CommonInsertQueue};
+use crate::store::CommonInsertItemQueue;
 use conn::CaConn;
 use err::Error;
 use futures_util::StreamExt;
@@ -12,15 +12,17 @@
 use log::*;
 use netpod::Database;
 use scylla::batch::Consistency;
 use serde::{Deserialize, Serialize};
-use stats::{CaConnStatsAgg, CaConnStatsAggDiff};
+use stats::{CaConnStats, CaConnStatsAgg, CaConnStatsAggDiff};
 use std::collections::BTreeMap;
 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
 use std::path::PathBuf;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Mutex, Once};
 use std::time::{Duration, Instant};
 use tokio::fs::OpenOptions;
 use tokio::io::AsyncReadExt;
 use tokio::net::TcpStream;
+use tokio_postgres::Client as PgClient;
 
 static mut METRICS: Option<Mutex<CaConnStatsAgg>> = None;
 static METRICS_ONCE: Once = Once::new();
@@ -53,6 +55,7 @@ struct ChannelConfig {
     insert_scylla_sessions: Option<usize>,
     insert_queue_max: Option<usize>,
     insert_item_queue_cap: Option<usize>,
+    api_bind: Option<String>,
 }
 
 pub struct ListenFromFileOpts {
@@ -86,15 +89,15 @@
         search: conf.search,
         addr_bind: conf.addr_bind,
         addr_conn: conf.addr_conn,
-        max_simul: conf.max_simul.unwrap_or(113),
         timeout: conf.timeout.unwrap_or(2000),
         abort_after_search: conf.abort_after_search,
         pg_pass: conf.pg_pass,
         array_truncate: conf.array_truncate.unwrap_or(512),
-        insert_worker_count: conf.insert_worker_count.unwrap_or(1),
+        insert_worker_count: conf.insert_worker_count.unwrap_or(8),
         insert_scylla_sessions: conf.insert_scylla_sessions.unwrap_or(1),
-        insert_queue_max: conf.insert_queue_max.unwrap_or(16),
-        insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(256),
+        insert_queue_max: conf.insert_queue_max.unwrap_or(32),
+        insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(380000),
+        api_bind: conf.api_bind.unwrap_or_else(|| "0.0.0.0:3011".into()),
     })
 }
 
@@ -103,7 +106,6 @@ pub struct CaConnectOpts {
     pub search: Vec<String>,
     pub addr_bind: Ipv4Addr,
     pub addr_conn: Ipv4Addr,
-    pub max_simul: usize,
     pub timeout: u64,
     pub abort_after_search: u32,
     pub pg_pass: String,
@@ -112,6 +114,7 @@
     pub insert_scylla_sessions: usize,
     pub insert_queue_max: usize,
     pub insert_item_queue_cap: usize,
+    pub api_bind: String,
 }
 
 async fn resolve_address(addr_str: &str) -> Result<SocketAddr, Error> {
@@ -242,14 +245,75 @@ pub async fn ca_search(opts: ListenFromFileOpts) -> Result<(), Error> {
     Ok(())
 }
 
+async fn spawn_scylla_insert_workers(
+    insert_scylla_sessions: usize,
+    insert_worker_count: usize,
+    insert_item_queue: &CommonInsertItemQueue,
+    insert_frac: Arc<AtomicU64>,
+    pg_client: Arc<PgClient>,
+    store_stats: Arc<CaConnStats>,
+) -> Result<(), Error> {
+    let mut data_stores = vec![];
+    for _ in 0..insert_scylla_sessions {
+        let scy = scylla::SessionBuilder::new()
+            .known_node("sf-nube-14:19042")
+            .default_consistency(Consistency::One)
+            .use_keyspace("ks1", true)
+            .build()
+            .await
+            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
+        let scy = Arc::new(scy);
+        let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?);
+        data_stores.push(data_store);
+    }
+    for i1 in 0..insert_worker_count {
+        let data_store = data_stores[i1 * data_stores.len() / insert_worker_count].clone();
+        let stats = store_stats.clone();
+        let recv = insert_item_queue.receiver();
+        let insert_frac = insert_frac.clone();
+        let fut = async move {
+            let mut i1 = 0;
+            while let Ok(item) = recv.recv().await {
+                stats.store_worker_item_recv_inc();
+                let insert_frac = insert_frac.load(Ordering::Acquire);
+                if i1 % 1000 < insert_frac {
+                    match crate::store::insert_item(item, &data_store, &stats).await {
+                        Ok(_) => {
+                            stats.store_worker_item_insert_inc();
+                        }
+                        Err(e) => {
+                            stats.store_worker_item_error_inc();
+                            // TODO introduce more structured error variants.
+                            if e.msg().contains("WriteTimeout") {
+                                tokio::time::sleep(Duration::from_millis(100)).await;
+                            } else {
+                                // TODO back off but continue.
+                                error!("insert worker sees error: {e:?}");
+                                break;
+                            }
+                        }
+                    }
+                } else {
+                    stats.store_worker_item_drop_inc();
+                }
+                i1 += 1;
+            }
+        };
+        tokio::spawn(fut);
+    }
+    Ok(())
+}
+
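Note that `insert_frac` is per-mille, not percent: with the default of 1000 every received item is inserted, and a value of 250 keeps roughly one item in four. The gate is a deterministic rotating counter, not a random sample. A standalone sketch of the same logic (names hypothetical):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

// Per-mille gate: deterministic round-robin sampling, not random.
// `insert_frac` in 0..=1000 is the value shared with the HTTP tuning endpoint.
fn keep(counter: &mut u64, insert_frac: &Arc<AtomicU64>) -> bool {
    let frac = insert_frac.load(Ordering::Acquire);
    let keep = *counter % 1000 < frac;
    *counter += 1;
    keep
}

fn main() {
    let frac = Arc::new(AtomicU64::new(250));
    let mut i = 0;
    let kept = (0..4000).filter(|_| keep(&mut i, &frac)).count();
    assert_eq!(kept, 1000); // 250 per mille of 4000 items survive the gate
}
```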
 pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> {
-    tokio::spawn(start_metrics_service());
-
-    // TODO maybe this should hold the resources needed by the futures?
-    let ciq = CommonInsertQueue::new();
-    let facility = "scylla";
+    let insert_frac = Arc::new(AtomicU64::new(1000));
+    let insert_ivl_min = Arc::new(AtomicU64::new(8800));
     let opts = parse_config(opts.config).await?;
+    tokio::spawn(start_metrics_service(
+        opts.api_bind.clone(),
+        insert_frac.clone(),
+        insert_ivl_min.clone(),
+    ));
     let d = Database {
         name: "daqbuffer".into(),
         host: "sf-nube-11".into(),
@@ -275,6 +339,10 @@
         .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
     let scy = Arc::new(scy);
 
+    // TODO use new struct:
+    let local_stats = Arc::new(CaConnStats::new());
+
+    // TODO factor the find loop into a separate Stream.
     info!("FIND IOCS");
     let qu_find_addr = pg_client
         .prepare("select t2.channel, t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)")
@@ -309,7 +377,10 @@
             // TODO the address was searched before but could not be found.
         } else {
             let addr: SocketAddrV4 = match addr.parse() {
-                Ok(k) => k,
+                Ok(k) => {
+                    local_stats.ioc_lookup_inc();
+                    k
+                }
                 Err(e) => {
                     error!("can not parse {addr:?} for channel {ch:?} {e:?}");
                     continue;
                 }
@@ -331,60 +402,45 @@
     if opts.abort_after_search == 1 {
         return Ok(());
     }
-    let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone(), ciq.sender()).await?);
+    let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone()).await?);
     let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap);
 
     // TODO use a new stats struct
-    let store_stats = Arc::new(stats::CaConnStats::new());
+    let store_stats = Arc::new(CaConnStats::new());
+
+    spawn_scylla_insert_workers(
+        opts.insert_scylla_sessions,
+        opts.insert_worker_count,
+        &insert_item_queue,
+        insert_frac.clone(),
+        pg_client.clone(),
+        store_stats.clone(),
+    )
+    .await?;
 
-    let mut data_stores = vec![];
-    for _ in 0..opts.insert_scylla_sessions {
-        let scy = scylla::SessionBuilder::new()
-            .known_node("sf-nube-14:19042")
-            .default_consistency(Consistency::One)
-            .use_keyspace("ks1", true)
-            .build()
-            .await
-            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
-        let scy = Arc::new(scy);
-        let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone(), ciq.sender()).await?);
-        data_stores.push(data_store);
-    }
-    for i1 in 0..opts.insert_worker_count {
-        let data_store = data_stores[i1 * data_stores.len() / opts.insert_worker_count].clone();
-        let stats = store_stats.clone();
-        let recv = insert_item_queue.receiver();
-        let fut = async move {
-            while let Ok(item) = recv.recv().await {
-                stats.store_worker_item_recv_inc();
-                match crate::store::insert_item(item, &data_store, &stats).await {
-                    Ok(_) => {}
-                    Err(e) => {
-                        // TODO back off but continue.
-                        error!("insert worker sees error: {e:?}");
-                        break;
-                    }
-                }
-            }
-        };
-        tokio::spawn(fut);
-    }
     let mut conn_jhs = vec![];
     let mut conn_stats = vec![];
+    info!("channels_by_host len {}", channels_by_host.len());
     for (host, channels) in channels_by_host {
         if false && host.ip() != &"172.26.24.76".parse::<Ipv4Addr>().unwrap() {
             continue;
         }
         let data_store = data_store.clone();
-        debug!("Create TCP connection to {:?}", (host.ip(), host.port()));
+        //debug!("Create TCP connection to {:?}", (host.ip(), host.port()));
         let addr = SocketAddrV4::new(host.ip().clone(), host.port());
-        let tcp = match TcpStream::connect(addr).await {
-            Ok(k) => k,
+        // TODO establish the connection in the future SM.
+        let tcp = match tokio::time::timeout(Duration::from_millis(500), TcpStream::connect(addr)).await {
+            Ok(Ok(k)) => k,
+            Ok(Err(e)) => {
+                error!("Can not connect to {addr:?} {e:?}");
+                continue;
+            }
             Err(e) => {
                 error!("Can not connect to {addr:?} {e:?}");
                 continue;
             }
         };
+        local_stats.tcp_connected_inc();
         let mut conn = CaConn::new(
             tcp,
             addr,
@@ -392,15 +448,18 @@
             insert_item_queue.sender(),
             opts.array_truncate,
             opts.insert_queue_max,
+            insert_ivl_min.clone(),
         );
         conn_stats.push(conn.stats());
         for c in channels {
             conn.channel_add(c);
         }
+        let stats2 = conn.stats();
         let conn_block = async move {
             while let Some(item) = conn.next().await {
                 match item {
                     Ok(_) => {
+                        stats2.conn_item_count_inc();
                         // TODO test if performance can be noticed:
                         //trace!("CaConn gives item: {k:?}");
                     }
@@ -417,8 +476,9 @@
     }
     let mut agg_last = CaConnStatsAgg::new();
     loop {
-        tokio::time::sleep(Duration::from_millis(500)).await;
+        tokio::time::sleep(Duration::from_millis(671)).await;
         let agg = CaConnStatsAgg::new();
+        agg.push(&local_stats);
         agg.push(&store_stats);
         for g in &conn_stats {
             agg.push(&g);
@@ -429,6 +489,7 @@
         let diff = CaConnStatsAggDiff::diff_from(&agg_last, &agg);
         info!("{}", diff.display());
     }
+    for _s1 in &conn_stats {}
     agg_last = agg;
     if false {
         break;
@@ -450,24 +511,37 @@
     Ok(())
 }
 
-async fn start_metrics_service() {
-    let app = axum::Router::new().route(
-        "/metrics",
-        axum::routing::get(|| async {
-            let stats = get_metrics();
-            match stats {
-                Some(s) => {
-                    trace!("Metrics");
-                    s.prometheus()
+async fn start_metrics_service(bind_to: String, insert_frac: Arc<AtomicU64>, insert_ivl_min: Arc<AtomicU64>) {
+    let app = axum::Router::new()
+        .route(
+            "/metrics",
+            axum::routing::get(|| async {
+                let stats = get_metrics();
+                match stats {
+                    Some(s) => {
+                        trace!("Metrics");
+                        s.prometheus()
+                    }
+                    None => {
+                        trace!("Metrics empty");
+                        String::new()
+                    }
                 }
-                None => {
-                    trace!("Metrics empty");
-                    String::new()
-                }
-            }
-        }),
-    );
-    axum::Server::bind(&"0.0.0.0:3011".parse().unwrap())
+            }),
+        )
+        .route(
+            "/insert_frac",
+            axum::routing::put(|v: axum::extract::Json<u64>| async move {
+                insert_frac.store(v.0, Ordering::Release);
+            }),
+        )
+        .route(
+            "/insert_ivl_min",
+            axum::routing::put(|v: axum::extract::Json<u64>| async move {
+                insert_ivl_min.store(v.0, Ordering::Release);
+            }),
+        );
+    axum::Server::bind(&bind_to.parse().unwrap())
         .serve(app.into_make_service())
         .await
         .unwrap()
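The new PUT routes take a bare JSON number, so both knobs can be tuned while ingest is running. A hypothetical client, assuming the default `api_bind` of `0.0.0.0:3011` and using `reqwest` and `tokio`, which are not dependencies introduced by this patch:

```rust
// Hypothetical runtime-tuning client for the endpoints added above.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // Keep 500 per mille of items, i.e. drop half before insert.
    client
        .put("http://localhost:3011/insert_frac")
        .json(&500u64)
        .send()
        .await?
        .error_for_status()?;
    // Raise the per-channel minimum insert interval to 20 ms (value in microseconds).
    client
        .put("http://localhost:3011/insert_ivl_min")
        .json(&20_000u64)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}
```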
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 807a9ca..6425051 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -15,7 +15,7 @@ use stats::{CaConnStats, IntervalEma};
 use std::collections::{BTreeMap, VecDeque};
 use std::net::{Ipv4Addr, SocketAddrV4};
 use std::pin::Pin;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::{Duration, Instant, SystemTime};
@@ -47,18 +47,24 @@ enum MonitoringState {
     Muted,
 }
 
-#[allow(unused)]
 #[derive(Debug)]
 struct CreatedState {
+    #[allow(unused)]
     cid: u32,
+    #[allow(unused)]
     sid: u32,
     scalar_type: ScalarType,
     shape: Shape,
+    #[allow(unused)]
     ts_created: Instant,
     state: MonitoringState,
     ts_msp_last: u64,
+    ts_msp_grid_last: u32,
     inserted_in_ts_msp: u64,
-    ivl_ema: IntervalEma,
+    insert_item_ivl_ema: IntervalEma,
+    insert_next_earliest: Instant,
+    #[allow(unused)]
+    fast_warn_count: u32,
 }
 
 #[allow(unused)]
@@ -116,6 +122,7 @@ pub struct CaConn {
     remote_addr_dbg: SocketAddrV4,
     stats: Arc<CaConnStats>,
     insert_queue_max: usize,
+    insert_ivl_min: Arc<AtomicU64>,
 }
 
 impl CaConn {
@@ -126,6 +133,7 @@
         insert_item_sender: CommonInsertItemQueueSender,
         array_truncate: usize,
         insert_queue_max: usize,
+        insert_ivl_min: Arc<AtomicU64>,
     ) -> Self {
         Self {
             state: CaConnState::Init,
@@ -148,6 +156,7 @@
             remote_addr_dbg,
             stats: Arc::new(CaConnStats::new()),
             insert_queue_max,
+            insert_ivl_min,
         }
     }
@@ -191,13 +200,28 @@
                     self.insert_item_send_fut = None;
                 }
                 Ready(Err(_)) => break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))),
-                Pending => break Pending,
+                Pending => {
+                    if false {
+                        // Drop the item and continue.
+                        // TODO this causes performance degradation in the channel.
+                        self.stats.inserts_queue_drop_inc();
+                        self.insert_item_send_fut = None;
+                    } else {
+                        // Wait until global queue is ready (peer will see network pressure)
+                        break Pending;
+                    }
+                }
             },
             None => {}
         }
         if let Some(item) = self.insert_item_queue.pop_front() {
+            self.stats.inserts_queue_pop_for_global_inc();
             let sender = unsafe { &*(&self.insert_item_sender as *const CommonInsertItemQueueSender) };
-            self.insert_item_send_fut = Some(sender.send(item));
+            if sender.is_full() {
+                self.stats.inserts_queue_drop_inc();
+            } else {
+                self.insert_item_send_fut = Some(sender.send(item));
+            }
         } else {
             break Ready(Ok(()));
         }
@@ -209,7 +233,7 @@
     while self.fut_get_series.len() > 0 {
         match self.fut_get_series.poll_next_unpin(cx) {
             Ready(Some(Ok(k))) => {
-                //info!("Have SeriesId {k:?}");
+                self.stats.get_series_id_ok_inc();
                 let cid = k.0;
                 let sid = k.1;
                 let data_type = k.2;
@@ -244,8 +268,11 @@
                 ts_created: Instant::now(),
                 state: MonitoringState::AddingEvent(series),
                 ts_msp_last: 0,
+                ts_msp_grid_last: 0,
                 inserted_in_ts_msp: u64::MAX,
-                ivl_ema: IntervalEma::new(),
+                insert_item_ivl_ema: IntervalEma::new(),
+                insert_next_earliest: Instant::now(),
+                fast_warn_count: 0,
             });
             let scalar_type = ScalarType::from_ca_id(data_type)?;
             let shape = Shape::from_ca_count(data_count)?;
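On the `handle_insert_futs` change above: when the global item queue reports `is_full`, the item is counted and dropped instead of blocking the protocol loop, while the `Pending` branch keeps the old behavior of letting the peer feel TCP backpressure. A minimal sketch of that drop-on-full policy over `async_channel` (helper names hypothetical):

```rust
use async_channel::{bounded, Sender, TrySendError};
use std::collections::VecDeque;

// Drain a local queue into a bounded global queue, dropping on full
// rather than blocking; mirrors the is_full() check added above.
fn forward<T>(local: &mut VecDeque<T>, tx: &Sender<T>, dropped: &mut u64) {
    while let Some(item) = local.pop_front() {
        match tx.try_send(item) {
            Ok(()) => {}
            // Queue full: count and discard. The alternative is to keep
            // the item and return Pending so the peer sees backpressure.
            Err(TrySendError::Full(_item)) => *dropped += 1,
            Err(TrySendError::Closed(_item)) => break,
        }
    }
}

fn main() {
    let (tx, _rx) = bounded::<u32>(2);
    let mut local: VecDeque<u32> = (0..5).collect();
    let mut dropped = 0;
    forward(&mut local, &tx, &mut dropped);
    assert_eq!(dropped, 3); // capacity 2, nobody draining yet
}
```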
@@ -274,17 +301,15 @@
         series: SeriesId,
         scalar_type: ScalarType,
         shape: Shape,
+        ts: u64,
         ev: proto::EventAddRes,
         cid: u32,
         ts_msp_last: u64,
         inserted_in_ts_msp: u64,
+        ts_msp_grid: Option<u32>,
     ) -> Result<(), Error> {
-        // TODO where to actually get the timestamp of the event from?
-        let ts = SystemTime::now();
-        let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
         // TODO decide on better msp/lsp: random offset!
         // As long as one writer is active, the msp is arbitrary.
-        let ts = epoch.as_secs() * SEC + epoch.subsec_nanos() as u64;
         let ts_msp = if inserted_in_ts_msp > 2000 {
             let ts_msp = ts / (60 * SEC) * (60 * SEC);
             if let ChannelState::Created(st) = self.channels.get_mut(&cid).unwrap() {
@@ -324,6 +349,7 @@
             scalar_type,
             shape,
             val: ev.value,
+            ts_msp_grid,
         };
         item_queue.push_back(item);
         self.stats.insert_item_create_inc();
@@ -372,9 +398,44 @@
                 return Err(format!("no series id on insert").into());
             }
         };
-        let ts_msp_last = st.ts_msp_last;
-        let inserted_in_ts_msp = st.inserted_in_ts_msp;
-        self.event_add_insert(series, scalar_type, shape, ev, cid, ts_msp_last, inserted_in_ts_msp)?;
+        let tsnow = Instant::now();
+        if tsnow >= st.insert_next_earliest {
+            st.insert_item_ivl_ema.tick(tsnow);
+            let em = st.insert_item_ivl_ema.ema();
+            let ema = em.ema();
+            let mm = self.insert_ivl_min.load(Ordering::Acquire);
+            let mm = (mm as f32) * 1e-6;
+            let dt = (mm - ema) / em.k();
+            st.insert_next_earliest = tsnow
+                .checked_add(Duration::from_micros((dt * 1e6) as u64))
+                .ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?;
+            let ts_msp_last = st.ts_msp_last;
+            let inserted_in_ts_msp = st.inserted_in_ts_msp;
+            // TODO get event timestamp from channel access field
+            let ts = SystemTime::now();
+            let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap();
+            let ts = epoch.as_secs() * SEC + epoch.subsec_nanos() as u64;
+            let ts_msp_grid = (ts / (SEC * 10 * 6 * 2)) as u32 * (6 * 2);
+            let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid {
+                st.ts_msp_grid_last = ts_msp_grid;
+                Some(ts_msp_grid)
+            } else {
+                None
+            };
+            self.event_add_insert(
+                series,
+                scalar_type,
+                shape,
+                ts,
+                ev,
+                cid,
+                ts_msp_last,
+                inserted_in_ts_msp,
+                ts_msp_grid,
+            )?;
+        } else {
+            self.stats.channel_fast_item_drop_inc();
+        }
     }
     _ => {
         error!("unexpected state: EventAddRes while having {ch_s:?}");
@@ -522,8 +583,11 @@
                 ts_created: Instant::now(),
                 state: MonitoringState::FetchSeriesId,
                 ts_msp_last: 0,
+                ts_msp_grid_last: 0,
                 inserted_in_ts_msp: u64::MAX,
-                ivl_ema: IntervalEma::new(),
+                insert_item_ivl_ema: IntervalEma::new(),
+                insert_next_earliest: Instant::now(),
+                fast_warn_count: 0,
             });
             // TODO handle error in different way. Should most likely not abort.
             let cd = ChannelDescDecoded {
@@ -673,6 +737,10 @@
         let ts_outer_2 = Instant::now();
         self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1));
         // TODO currently, this will never stop by itself
+        match &ret {
+            Ready(_) => self.stats.conn_stream_ready_inc(),
+            Pending => self.stats.conn_stream_pending_inc(),
+        }
         ret
     }
 }
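The per-channel throttle in the `EventAddRes` path above works on the smoothed inter-insert interval: it solves `ema + k*(v - ema) = mm` for the gap `v` that would lift the EMA up to the configured minimum interval `mm`, giving `v = ema + (mm - ema)/k`; the code uses the dominant term `(mm - ema)/k` as the enforced wait. A worked example, assuming `k = 0.2` and the default `insert_ivl_min` of 8800 us:

```rust
// Worked example of the throttle arithmetic; k and the current EMA are
// assumed values, mm is the default insert_ivl_min of 8800 us in seconds.
fn main() {
    let k = 0.2f32;
    let mm = 8800e-6f32;
    // Channel currently fires about every 1 ms, so the smoothed interval is near:
    let ema = 1000e-6f32;
    // Exact solution of ema + k*(v - ema) = mm for the gap v:
    let v_exact = ema + (mm - ema) / k;
    // Dominant term, as used for insert_next_earliest in the code:
    let dt = (mm - ema) / k;
    println!("wait exact {:.1} ms, code {:.1} ms", v_exact * 1e3, dt * 1e3);
    // ~40 ms of enforced silence; afterwards inserts trickle at >= mm.
}
```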
diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs
index bea9a7c..4ddc44c 100644
--- a/netfetch/src/ca/store.rs
+++ b/netfetch/src/ca/store.rs
@@ -1,6 +1,5 @@
 use crate::bsread::ChannelDescDecoded;
 use crate::series::{Existence, SeriesId};
-use crate::store::CommonInsertQueueSender;
 use async_channel::{Receiver, Sender};
 use err::Error;
 use scylla::prepared_statement::PreparedStatement;
@@ -25,15 +24,13 @@ pub struct RegisterChannel {
     rx: Receiver,
 }
 
-#[allow(unused)]
 pub struct ChannelRegistry {
-    scy: Arc<ScySession>,
     pg_client: Arc<PgClient>,
 }
 
 impl ChannelRegistry {
-    pub fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Self {
-        Self { pg_client, scy }
+    pub fn new(pg_client: Arc<PgClient>) -> Self {
+        Self { pg_client }
     }
 
     pub async fn get_series_id(&self, cd: ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
@@ -43,8 +40,8 @@
 
 pub struct DataStore {
     pub scy: Arc<ScySession>,
-    pub qu_insert_series: Arc<PreparedStatement>,
     pub qu_insert_ts_msp: Arc<PreparedStatement>,
+    pub qu_insert_series_by_ts_msp: Arc<PreparedStatement>,
     pub qu_insert_scalar_i8: Arc<PreparedStatement>,
     pub qu_insert_scalar_i16: Arc<PreparedStatement>,
     pub qu_insert_scalar_i32: Arc<PreparedStatement>,
@@ -57,25 +54,21 @@
     pub qu_insert_array_f32: Arc<PreparedStatement>,
     pub qu_insert_array_f64: Arc<PreparedStatement>,
     pub chan_reg: Arc<ChannelRegistry>,
-    pub ciqs: CommonInsertQueueSender,
 }
 
 impl DataStore {
-    pub async fn new(
-        pg_client: Arc<PgClient>,
-        scy: Arc<ScySession>,
-        ciqs: CommonInsertQueueSender,
-    ) -> Result<Self, Error> {
-        let q = scy
-            .prepare("insert into series (part, series, ts_msp, scalar_type, shape_dims) values (?, ?, ?, ?, ?)")
-            .await
-            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
-        let qu_insert_series = Arc::new(q);
+    pub async fn new(pg_client: Arc<PgClient>, scy: Arc<ScySession>) -> Result<Self, Error> {
         let q = scy
             .prepare("insert into ts_msp (series, ts_msp) values (?, ?)")
             .await
             .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
         let qu_insert_ts_msp = Arc::new(q);
+        let q = scy
+            .prepare("insert into series_by_ts_msp (ts_msp, shape_kind, scalar_type, series) values (?, ?, ?, ?)")
+            .await
+            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
+        let qu_insert_series_by_ts_msp = Arc::new(q);
+        // scalar:
         let q = scy
             .prepare("insert into events_scalar_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)")
             .await
@@ -133,10 +126,10 @@
             .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
         let qu_insert_array_f64 = Arc::new(q);
         let ret = Self {
-            chan_reg: Arc::new(ChannelRegistry::new(pg_client, scy.clone())),
+            chan_reg: Arc::new(ChannelRegistry::new(pg_client)),
             scy,
-            qu_insert_series,
             qu_insert_ts_msp,
+            qu_insert_series_by_ts_msp,
             qu_insert_scalar_i8,
             qu_insert_scalar_i16,
             qu_insert_scalar_i32,
@@ -148,7 +141,6 @@
             qu_insert_array_i32,
             qu_insert_array_f32,
             qu_insert_array_f64,
-            ciqs,
         };
         Ok(ret)
     }
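For context on the new `series_by_ts_msp` table: it indexes series by the coarse time grid that `CaConn` attaches to at most one item per channel per grid slot, so a reader can discover which series were active around a given time. A hypothetical reader, assuming `ts_msp` is the partition key as the insert statement above suggests:

```rust
use scylla::Session;

// Hypothetical reader for the series_by_ts_msp index added above. The
// table layout (ts_msp, shape_kind, scalar_type, series) is taken from
// the prepared insert; the partition-key assumption is unverified.
async fn series_in_slot(scy: &Session, grid: i32) -> Result<Vec<i64>, Box<dyn std::error::Error>> {
    let res = scy
        .query("select series from series_by_ts_msp where ts_msp = ?", (grid,))
        .await?;
    let mut out = Vec::new();
    for row in res.rows_typed_or_empty::<(i64,)>() {
        out.push(row?.0);
    }
    Ok(out)
}
```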
diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs
index 1daabff..73f91a4 100644
--- a/netfetch/src/series.rs
+++ b/netfetch/src/series.rs
@@ -3,7 +3,6 @@
 use crate::errconv::ErrConv;
 use err::Error;
 #[allow(unused)]
 use log::*;
-use scylla::Session as ScySession;
 use std::time::Duration;
 use tokio_postgres::Client as PgClient;
@@ -22,76 +21,6 @@ impl SeriesId {
     }
 }
 
-// TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
-pub async fn get_series_id_scylla(scy: &ScySession, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
-    err::todo();
-    // TODO do not use, LWT in Scylla is currently buggy.
-    let facility = "scylla";
-    let channel_name = &cd.name;
-    let scalar_type = cd.scalar_type.to_scylla_i32();
-    let shape = cd.shape.to_scylla_vec();
-    info!("get_series_id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}");
-    let res = scy
-        .query(
-            "select series, agg_kind from series_by_channel where facility = ? and channel_name = ? and scalar_type = ? and shape_dims = ?",
-            (facility, channel_name, &scalar_type, &shape),
-        )
-        .await
-        .err_conv()?;
-    let mut all = vec![];
-    for row in res.rows_typed_or_empty::<(i64, Option<String>)>() {
-        match row {
-            Ok(k) => {
-                if k.1.is_none() {
-                    all.push(k.0);
-                }
-            }
-            Err(e) => return Err(Error::with_msg_no_trace(format!("{e:?}"))),
-        }
-    }
-    info!("all: {all:?}");
-    let rn = all.len();
-    if rn == 0 {
-        use md5::Digest;
-        let mut h = md5::Md5::new();
-        h.update(facility.as_bytes());
-        h.update(channel_name.as_bytes());
-        h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes());
-        let f = h.finalize();
-        let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
-        // TODO technically we could/should assert that we run on 2-complement machine.
-        const SMASK: u64 = 0x7fffffffffffffff;
-        series = series & SMASK;
-        for _ in 0..2000 {
-            let res = scy
-                .query(
-                    concat!(
-                        "insert into series_by_channel",
-                        " (facility, channel_name, scalar_type, shape_dims, agg_kind, series)",
-                        " values (?, ?, ?, ?, null, ?) if not exists"
-                    ),
-                    (facility, channel_name, &scalar_type, &shape, series as i64),
-                )
-                .await
-                .err_conv()?;
-            let row = res.first_row().err_conv()?;
-            if row.columns[0].as_ref().unwrap().as_boolean().unwrap() {
-                return Ok(Existence::Created(SeriesId(series)));
-            } else {
-                error!("tried to insert but series exists...");
-            }
-            tokio::time::sleep(Duration::from_millis(20)).await;
-            series += 1;
-            series = series & SMASK;
-        }
-        Err(Error::with_msg_no_trace(format!("can not create and insert series id")))
-    } else {
-        let series = all[0] as u64;
-        info!("series: {:?}", series);
-        Ok(Existence::Existing(SeriesId(series)))
-    }
-}
-
 // TODO don't need byte_order or compression from ChannelDescDecoded for channel registration.
 pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Result<Existence<SeriesId>, Error> {
     let facility = "scylla";
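The removed Scylla/LWT path above derived the candidate series id deterministically from an md5 hash, masked to 63 bits so it survives the round-trip through signed `bigint` columns; the retained Postgres-based `get_series_id` is not shown in full here. A standalone sketch of that derivation (parameter values hypothetical):

```rust
use md5::Digest;

// Sketch of the 63-bit series-id seed used by the removed LWT path; the
// md5 crate API matches what the deleted code called.
fn series_id_seed(facility: &str, channel_name: &str, scalar_type: i32, shape: &[i32]) -> u64 {
    let mut h = md5::Md5::new();
    h.update(facility.as_bytes());
    h.update(channel_name.as_bytes());
    h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes());
    let f = h.finalize();
    let series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap());
    // Clear the sign bit so the id fits a signed i64 column.
    series & 0x7fff_ffff_ffff_ffff
}

fn main() {
    let a = series_id_seed("scylla", "SOME:CHAN", 7, &[]);
    let b = series_id_seed("scylla", "SOME:CHAN", 7, &[]);
    assert_eq!(a, b); // deterministic across processes and restarts
}
```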
diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs
index fb46d9a..dc1357e 100644
--- a/netfetch/src/store.rs
+++ b/netfetch/src/store.rs
@@ -2,8 +2,7 @@ use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue};
 use crate::ca::store::DataStore;
 use crate::errconv::ErrConv;
 use err::Error;
-use futures_util::stream::FuturesOrdered;
-use futures_util::{Future, FutureExt, Stream, StreamExt};
+use futures_util::{Future, FutureExt};
 use log::*;
 use netpod::{ScalarType, Shape};
 use scylla::frame::value::ValueList;
@@ -16,10 +15,6 @@
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Instant;
 
-const CHANNEL_CAP: usize = 128;
-const POLLING_CAP: usize = 32;
-const TABLE_SERIES_MOD: u32 = 128;
-
 pub struct ScyInsertFut {
     #[allow(unused)]
     scy: Arc<ScySession>,
@@ -94,6 +89,7 @@ pub struct InsertItem {
     pub ts_msp: u64,
     pub ts_lsp: u64,
     pub msp_bump: bool,
+    pub ts_msp_grid: Option<u32>,
     pub pulse: u64,
     pub scalar_type: ScalarType,
     pub shape: Shape,
@@ -109,6 +105,11 @@ impl CommonInsertItemQueueSender {
     pub fn send(&self, k: InsertItem) -> async_channel::Send<InsertItem> {
         self.sender.send(k)
     }
+
+    #[inline(always)]
+    pub fn is_full(&self) -> bool {
+        self.sender.is_full()
+    }
 }
 
 pub struct CommonInsertItemQueue {
@@ -185,18 +186,6 @@ pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> {
     if item.msp_bump {
-        let params = (
-            (item.series as u32 % TABLE_SERIES_MOD) as i32,
-            item.series as i64,
-            item.ts_msp as i64,
-            item.scalar_type.to_scylla_i32(),
-            item.shape.to_scylla_vec(),
-        );
-        data_store
-            .scy
-            .execute(&data_store.qu_insert_series, params)
-            .await
-            .err_conv()?;
         let params = (item.series as i64, item.ts_msp as i64);
         data_store
             .scy
             .execute(&data_store.qu_insert_ts_msp, params)
             .await
             .err_conv()?;
         stats.inserts_msp_inc()
     }
+    if let Some(ts_msp_grid) = item.ts_msp_grid {
+        let params = (
+            ts_msp_grid as i32,
+            if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32,
+            item.scalar_type.to_scylla_i32(),
+            item.series as i64,
+        );
+        data_store
+            .scy
+            .execute(&data_store.qu_insert_series_by_ts_msp, params)
+            .await
+            .err_conv()?;
+        stats.inserts_msp_grid_inc()
+    }
     let par = InsParCom {
         series: item.series,
         ts_msp: item.ts_msp,
@@ -239,82 +242,3 @@
     stats.inserts_val_inc();
     Ok(())
 }
-
-type FutTy = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
-
-pub struct CommonInsertQueueSender {
-    sender: async_channel::Sender<FutTy>,
-}
-
-impl CommonInsertQueueSender {
-    pub async fn send(&self, k: FutTy) -> Result<(), Error> {
-        self.sender
-            .send(k)
-            .await
-            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))
-    }
-}
-
-pub struct CommonInsertQueue {
-    sender: async_channel::Sender<FutTy>,
-    recv: async_channel::Receiver<FutTy>,
-    futs: FuturesOrdered<FutTy>,
-    inp_done: bool,
-}
-
-impl CommonInsertQueue {
-    pub fn new() -> Self {
-        let (tx, rx) = async_channel::bounded(CHANNEL_CAP);
-        Self {
-            sender: tx.clone(),
-            recv: rx,
-            futs: FuturesOrdered::new(),
-            inp_done: false,
-        }
-    }
-
-    pub fn sender(&self) -> CommonInsertQueueSender {
-        CommonInsertQueueSender {
-            sender: self.sender.clone(),
-        }
-    }
-}
-
-impl Stream for CommonInsertQueue {
-    type Item = Result<(), Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        use Poll::*;
-        loop {
-            let _res_inp = if self.futs.len() < POLLING_CAP && !self.inp_done {
-                match self.recv.poll_next_unpin(cx) {
-                    Ready(Some(k)) => {
-                        self.futs.push(k);
-                        continue;
-                    }
-                    Ready(None) => {
-                        self.inp_done = true;
-                        Ready(None)
-                    }
-                    Pending => Pending,
-                }
-            } else {
-                Ready(Some(()))
-            };
-            let res_qu = match self.futs.poll_next_unpin(cx) {
-                Ready(Some(Ok(_k))) => Ready(Some(Ok(()))),
-                Ready(Some(Err(e))) => Ready(Some(Err(e))),
-                Ready(None) => {
-                    if self.inp_done {
-                        Ready(None)
-                    } else {
-                        Pending
-                    }
-                }
-                Pending => Pending,
-            };
-            // TODO monitor queue length and queue pushes per poll of this.
-            break res_qu;
-        }
-    }
-}
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index 99ab8c7..817981c 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -6,7 +6,7 @@
 const US: u64 = 1000;
 const MS: u64 = US * 1000;
 const SEC: u64 = MS * 1000;
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct EMA {
     ema: f32,
     emv: f32,
@@ -56,6 +56,10 @@
     pub fn emv(&self) -> f32 {
         self.emv
     }
+
+    pub fn k(&self) -> f32 {
+        self.k
+    }
 }
 
 pub struct CheckEvery {
@@ -83,7 +87,7 @@
     }
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct IntervalEma {
     tslast: Option<Instant>,
     ema: EMA,
@@ -109,6 +113,22 @@
             }
         }
     }
+
+    pub fn ema_preview(&self, tsnow: Instant) -> Option<f32> {
+        match self.tslast {
+            Some(tslast) => {
+                let dt = tsnow.duration_since(tslast);
+                let v = dt.as_secs_f32();
+                let dv = v - self.ema.ema;
+                Some(self.ema.ema + self.ema.k * dv)
+            }
+            None => None,
+        }
+    }
+
+    pub fn ema(&self) -> &EMA {
+        &self.ema
+    }
 }
 
 stats_proc::stats_struct!((
@@ -118,9 +138,15 @@
     insert_item_create,
     inserts_val,
     inserts_msp,
+    inserts_msp_grid,
+    inserts_queue_pop_for_global,
     inserts_queue_push,
-    inserts_queue_pop,
+    inserts_queue_drop,
+    channel_fast_item_drop,
     store_worker_item_recv,
+    store_worker_item_insert,
+    store_worker_item_drop,
+    store_worker_item_error,
     poll_time_all,
     poll_time_handle_insert_futs,
     poll_time_get_series_futs,
@@ -128,6 +154,12 @@
     time_handle_peer_ready,
     time_check_channels_state_init,
     time_handle_event_add_res,
+    ioc_lookup,
+    tcp_connected,
+    get_series_id_ok,
+    conn_item_count,
+    conn_stream_ready,
+    conn_stream_pending,
     ),
 ),
 agg(name(CaConnStatsAgg), parent(CaConnStats)),
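The `ema_preview` added above computes what the EMA would become if an interval ending now were committed, without mutating state: `ema + k*(v - ema)`. A self-contained model of that arithmetic (the update rule of `stats::EMA` is assumed to have the same form):

```rust
// Standalone model of the EMA behind IntervalEma, matching the
// ema_preview arithmetic above: next = ema + k * (v - ema).
struct Ema {
    ema: f32,
    k: f32,
}

impl Ema {
    // Commit an observation v (an interval length in seconds).
    fn tick(&mut self, v: f32) {
        self.ema += self.k * (v - self.ema);
    }
    // What the EMA would become for an interval v, without committing;
    // mirrors IntervalEma::ema_preview.
    fn preview(&self, v: f32) -> f32 {
        self.ema + self.k * (v - self.ema)
    }
}

fn main() {
    let mut e = Ema { ema: 0.0, k: 0.2 };
    for _ in 0..10 {
        e.tick(1.0); // ten intervals of 1 s each
    }
    // Converges toward the true interval of 1 s: 1 - (1 - k)^10 ~ 0.893.
    assert!((e.ema - 0.893).abs() < 1e-3);
    assert_eq!(e.preview(1.0), e.ema + 0.2 * (1.0 - e.ema));
}
```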