From f5f1b234660b2ab3aa24f147fffbb8a76590bca2 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 25 May 2022 10:46:09 +0200 Subject: [PATCH] Channel inserts through common scylla workers --- daqingest/src/bin/daqingest.rs | 4 +- daqingest/src/daqingest.rs | 29 +-- netfetch/src/ca.rs | 66 ++++- netfetch/src/ca/conn.rs | 442 ++++++++++----------------------- netfetch/src/ca/store.rs | 5 +- netfetch/src/stats.rs | 0 netfetch/src/store.rs | 158 +++++++++++- netfetch/src/zmtp.rs | 2 +- stats/src/stats.rs | 3 +- 9 files changed, 365 insertions(+), 344 deletions(-) delete mode 100644 netfetch/src/stats.rs diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index f1a1957..f58cdb4 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -1,6 +1,7 @@ use clap::Parser; use daqingest::{ChannelAccess, DaqIngestOpts, SubCmd}; use err::Error; +use log::*; pub fn main() -> Result<(), Error> { let opts = DaqIngestOpts::parse(); @@ -21,7 +22,6 @@ pub fn main() -> Result<(), Error> { f.run().await? } SubCmd::ChannelAccess(k) => match k { - ChannelAccess::CaChannel(_) => todo!(), ChannelAccess::CaSearch(k) => netfetch::ca::ca_search(k.into()).await?, ChannelAccess::CaConfig(k) => netfetch::ca::ca_connect(k.into()).await?, }, @@ -31,7 +31,7 @@ pub fn main() -> Result<(), Error> { match res { Ok(k) => Ok(k), Err(e) => { - log::error!("Catched: {:?}", e); + error!("Catched: {:?}", e); Err(e) } } diff --git a/daqingest/src/daqingest.rs b/daqingest/src/daqingest.rs index e1bf6c8..6cc58f4 100644 --- a/daqingest/src/daqingest.rs +++ b/daqingest/src/daqingest.rs @@ -1,7 +1,7 @@ pub mod query; use clap::Parser; -use netfetch::ca::{CaConnectOpts, ListenFromFileOpts}; +use netfetch::ca::ListenFromFileOpts; use netfetch::zmtp::ZmtpClientOpts; #[derive(Debug, Parser)] @@ -76,37 +76,10 @@ pub struct BsreadDump { #[derive(Debug, Parser)] pub enum ChannelAccess { - CaChannel(CaChannel), CaConfig(CaConfig), CaSearch(CaConfig), } -#[derive(Debug, Parser)] -pub struct CaChannel { - #[clap(long)] - pub channel: Vec, - #[clap(long)] - pub addr_bind: String, - #[clap(long)] - pub addr_conn: String, -} - -impl From for CaConnectOpts { - fn from(k: CaChannel) -> Self { - Self { - channels: k.channel, - search: vec!["255.255.255.255".into()], - addr_bind: k.addr_bind.parse().expect("can not parse address"), - addr_conn: k.addr_conn.parse().expect("can not parse address"), - max_simul: 113, - timeout: 2000, - abort_after_search: 0, - pg_pass: "".into(), - array_truncate: 512, - } - } -} - #[derive(Debug, Parser)] pub struct CaConfig { pub config: String, diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 610dbdd..3ba3b44 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -2,10 +2,9 @@ pub mod conn; pub mod proto; pub mod store; -use crate::store::CommonInsertQueue; - use self::conn::FindIocStream; use self::store::DataStore; +use crate::store::{CommonInsertItemQueue, CommonInsertQueue}; use conn::CaConn; use err::Error; use futures_util::StreamExt; @@ -50,6 +49,10 @@ struct ChannelConfig { abort_after_search: u32, pg_pass: String, array_truncate: Option, + insert_worker_count: Option, + insert_scylla_sessions: Option, + insert_queue_max: Option, + insert_item_queue_cap: Option, } pub struct ListenFromFileOpts { @@ -88,6 +91,10 @@ pub async fn parse_config(config: PathBuf) -> Result { abort_after_search: conf.abort_after_search, pg_pass: conf.pg_pass, array_truncate: conf.array_truncate.unwrap_or(512), + insert_worker_count: conf.insert_worker_count.unwrap_or(1), + insert_scylla_sessions: conf.insert_scylla_sessions.unwrap_or(1), + insert_queue_max: conf.insert_queue_max.unwrap_or(16), + insert_item_queue_cap: conf.insert_item_queue_cap.unwrap_or(256), }) } @@ -101,6 +108,10 @@ pub struct CaConnectOpts { pub abort_after_search: u32, pub pg_pass: String, pub array_truncate: usize, + pub insert_worker_count: usize, + pub insert_scylla_sessions: usize, + pub insert_queue_max: usize, + pub insert_item_queue_cap: usize, } async fn resolve_address(addr_str: &str) -> Result { @@ -254,6 +265,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { // TODO allow clean shutdown on ctrl-c and join the pg_conn in the end: tokio::spawn(pg_conn); let pg_client = Arc::new(pg_client); + let scy = scylla::SessionBuilder::new() .known_node("sf-nube-14:19042") .default_consistency(Consistency::One) @@ -262,6 +274,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { .await .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; let scy = Arc::new(scy); + info!("FIND IOCS"); let qu_find_addr = pg_client .prepare("select t2.channel, t2.addr from ioc_by_channel t1, ioc_by_channel t2 where t2.facility = t1.facility and t2.channel = t1.channel and t1.facility = $1 and t1.channel in ($2, $3, $4, $5, $6, $7, $8, $9)") @@ -318,7 +331,44 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { if opts.abort_after_search == 1 { return Ok(()); } - let data_store = Arc::new(DataStore::new(pg_client, scy.clone(), ciq.sender()).await?); + let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone(), ciq.sender()).await?); + let insert_item_queue = CommonInsertItemQueue::new(opts.insert_item_queue_cap); + + // TODO use a new stats struct + let store_stats = Arc::new(stats::CaConnStats::new()); + + let mut data_stores = vec![]; + for _ in 0..opts.insert_scylla_sessions { + let scy = scylla::SessionBuilder::new() + .known_node("sf-nube-14:19042") + .default_consistency(Consistency::One) + .use_keyspace("ks1", true) + .build() + .await + .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; + let scy = Arc::new(scy); + let data_store = Arc::new(DataStore::new(pg_client.clone(), scy.clone(), ciq.sender()).await?); + data_stores.push(data_store); + } + for i1 in 0..opts.insert_worker_count { + let data_store = data_stores[i1 * data_stores.len() / opts.insert_worker_count].clone(); + let stats = store_stats.clone(); + let recv = insert_item_queue.receiver(); + let fut = async move { + while let Ok(item) = recv.recv().await { + stats.store_worker_item_recv_inc(); + match crate::store::insert_item(item, &data_store, &stats).await { + Ok(_) => {} + Err(e) => { + // TODO back off but continue. + error!("insert worker sees error: {e:?}"); + break; + } + } + } + }; + tokio::spawn(fut); + } let mut conn_jhs = vec![]; let mut conn_stats = vec![]; for (host, channels) in channels_by_host { @@ -335,7 +385,14 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { continue; } }; - let mut conn = CaConn::new(tcp, addr, data_store.clone(), opts.array_truncate); + let mut conn = CaConn::new( + tcp, + addr, + data_store.clone(), + insert_item_queue.sender(), + opts.array_truncate, + opts.insert_queue_max, + ); conn_stats.push(conn.stats()); for c in channels { conn.channel_add(c); @@ -362,6 +419,7 @@ pub async fn ca_connect(opts: ListenFromFileOpts) -> Result<(), Error> { loop { tokio::time::sleep(Duration::from_millis(500)).await; let agg = CaConnStatsAgg::new(); + agg.push(&store_stats); for g in &conn_stats { agg.push(&g); } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 804050f..807a9ca 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,15 +1,15 @@ -use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto, EventAddRes}; +use super::proto::{self, CaItem, CaMsg, CaMsgTy, CaProto}; use super::store::DataStore; use crate::bsread::ChannelDescDecoded; -use crate::ca::proto::{CreateChan, EventAdd, HeadInfo, ReadNotify}; +use crate::ca::proto::{CreateChan, EventAdd, HeadInfo}; use crate::series::{Existence, SeriesId}; -use crate::store::ScyInsertFut; +use crate::store::{CommonInsertItemQueueSender, InsertItem}; use err::Error; -use futures_util::stream::{FuturesOrdered, FuturesUnordered}; +use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; use libc::c_int; use log::*; -use netpod::timeunits::SEC; +use netpod::timeunits::*; use netpod::{ScalarType, Shape}; use stats::{CaConnStats, IntervalEma}; use std::collections::{BTreeMap, VecDeque}; @@ -22,12 +22,9 @@ use std::time::{Duration, Instant, SystemTime}; use tokio::io::unix::AsyncFd; use tokio::net::TcpStream; -const INSERT_FUTS_MAX: usize = 2; -const INSERT_FUTS_LIM: usize = 16; -const TABLE_SERIES_MOD: u32 = 128; - #[derive(Debug)] enum ChannelError { + #[allow(unused)] NoSuccess, } @@ -42,11 +39,15 @@ enum MonitoringState { AddingEvent(SeriesId), Evented(SeriesId, EventedState), // TODO we also want to read while staying subscribed: + #[allow(unused)] Reading, + #[allow(unused)] Read, + #[allow(unused)] Muted, } +#[allow(unused)] #[derive(Debug)] struct CreatedState { cid: u32, @@ -60,6 +61,7 @@ struct CreatedState { ivl_ema: IntervalEma, } +#[allow(unused)] #[derive(Debug)] enum ChannelState { Init, @@ -91,199 +93,7 @@ impl IdStore { } } -// TODO test that errors are properly forwarded. -macro_rules! insert_scalar_impl { - ($fname:ident, $valty:ty, $qu_insert:ident) => { - fn $fname( - data_store: Arc, - // TODO maybe use a newtype? - futs_queue: &mut FuturesOrdered> + Send>>>, - series: SeriesId, - ts_msp: u64, - ts_lsp: u64, - val: $valty, - ts_msp_changed: bool, - st: ScalarType, - sh: Shape, - stats: Arc, - ) { - if futs_queue.len() >= INSERT_FUTS_LIM { - stats.inserts_discard.fetch_add(1, Ordering::AcqRel); - return; - } - let pulse = 0 as u64; - let params = ( - series.id() as i64, - ts_msp as i64, - ts_lsp as i64, - pulse as i64, - val, - ); - let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); - stats.inserts_val.fetch_add(1, Ordering::AcqRel); - let fut = if ts_msp_changed { - let fut1 = ScyInsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_series.clone(), - ( - (series.id() as u32 % TABLE_SERIES_MOD) as i32, - series.id() as i64, - ts_msp as i64, - st.to_scylla_i32(), - sh.to_scylla_vec(), - ), - ); - let fut2 = ScyInsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_ts_msp.clone(), - (series.id() as i64, ts_msp as i64), - ); - stats.inserts_msp.fetch_add(1, Ordering::AcqRel); - Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ - } else { - Box::pin(fut3) as _ - }; - futs_queue.push(fut); - stats.inserts_queue_push_inc(); - } - }; -} - -// TODO test that errors are properly forwarded. -macro_rules! insert_array_impl { - ($fname:ident, $valty:ty, $qu_insert:ident) => { - fn $fname( - data_store: Arc, - // TODO maybe use a newtype? - futs_queue: &mut FuturesOrdered> + Send>>>, - series: SeriesId, - ts_msp: u64, - ts_lsp: u64, - val: Vec<$valty>, - ts_msp_changed: bool, - st: ScalarType, - sh: Shape, - stats: Arc, - ) { - if futs_queue.len() >= INSERT_FUTS_LIM { - stats.inserts_discard.fetch_add(1, Ordering::AcqRel); - return; - } - let pulse = 0 as u64; - let params = ( - series.id() as i64, - ts_msp as i64, - ts_lsp as i64, - pulse as i64, - val, - ); - let fut3 = ScyInsertFut::new(data_store.scy.clone(), data_store.$qu_insert.clone(), params); - stats.inserts_val.fetch_add(1, Ordering::AcqRel); - let fut = if ts_msp_changed { - let fut1 = ScyInsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_series.clone(), - ( - (series.id() as u32 % TABLE_SERIES_MOD) as i32, - series.id() as i64, - ts_msp as i64, - st.to_scylla_i32(), - sh.to_scylla_vec(), - ), - ); - let fut2 = ScyInsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_ts_msp.clone(), - (series.id() as i64, ts_msp as i64), - ); - stats.inserts_msp.fetch_add(1, Ordering::AcqRel); - Box::pin(fut1.and_then(move |_| fut2).and_then(move |_| fut3)) as _ - } else { - Box::pin(fut3) as _ - }; - futs_queue.push(fut); - stats.inserts_queue_push_inc(); - } - }; -} - -insert_scalar_impl!(insert_scalar_i8, i8, qu_insert_scalar_i8); -insert_scalar_impl!(insert_scalar_i16, i16, qu_insert_scalar_i16); -insert_scalar_impl!(insert_scalar_i32, i32, qu_insert_scalar_i32); -insert_scalar_impl!(insert_scalar_f32, f32, qu_insert_scalar_f32); -insert_scalar_impl!(insert_scalar_f64, f64, qu_insert_scalar_f64); -insert_scalar_impl!(insert_scalar_string, String, qu_insert_scalar_string); - -insert_array_impl!(insert_array_i8, i8, qu_insert_array_i8); -insert_array_impl!(insert_array_i16, i16, qu_insert_array_i16); -insert_array_impl!(insert_array_i32, i32, qu_insert_array_i32); -insert_array_impl!(insert_array_f32, f32, qu_insert_array_f32); -insert_array_impl!(insert_array_f64, f64, qu_insert_array_f64); - -macro_rules! match_scalar_value_insert { - ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ - let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm; - match shape { - Shape::Scalar => match scalar_type { - ScalarType::$stv => $insf( - data_store, - futs_queue, - series, - ts_msp, - ts_lsp, - $val, - ts_msp_changed, - scalar_type, - shape, - stats2, - ), - _ => { - error!("unexpected value type insf {:?}", stringify!($insf)); - } - }, - _ => { - error!( - "unexpected value shape insf {:?} shape {:?}", - stringify!($insf), - shape - ); - } - } - }}; -} - -macro_rules! match_array_value_insert { - ($stv:ident, $insf:ident, $val:expr, $comm:expr) => {{ - let (data_store, futs_queue, series, ts_msp, ts_lsp, ts_msp_changed, scalar_type, shape, stats2) = $comm; - match shape { - Shape::Wave(_) => match scalar_type { - ScalarType::$stv => $insf( - data_store, - futs_queue, - series, - ts_msp, - ts_lsp, - $val, - ts_msp_changed, - scalar_type, - shape, - stats2, - ), - _ => { - error!("unexpected value type insf {:?}", stringify!($insf)); - } - }, - _ => { - error!( - "unexpected value shape insf {:?} shape {:?}", - stringify!($insf), - shape - ); - } - } - }}; -} - +#[allow(unused)] pub struct CaConn { state: CaConnState, proto: CaProto, @@ -298,11 +108,14 @@ pub struct CaConn { name_by_cid: BTreeMap, poll_count: usize, data_store: Arc, + insert_item_queue: VecDeque, + insert_item_sender: CommonInsertItemQueueSender, + insert_item_send_fut: Option>, fut_get_series: FuturesOrdered), Error>> + Send>>>, - value_insert_futs: FuturesOrdered> + Send>>>, remote_addr_dbg: SocketAddrV4, stats: Arc, + insert_queue_max: usize, } impl CaConn { @@ -310,7 +123,9 @@ impl CaConn { tcp: TcpStream, remote_addr_dbg: SocketAddrV4, data_store: Arc, + insert_item_sender: CommonInsertItemQueueSender, array_truncate: usize, + insert_queue_max: usize, ) -> Self { Self { state: CaConnState::Init, @@ -326,10 +141,13 @@ impl CaConn { name_by_cid: BTreeMap::new(), poll_count: 0, data_store, + insert_item_queue: VecDeque::new(), + insert_item_sender, + insert_item_send_fut: None, fut_get_series: FuturesOrdered::new(), - value_insert_futs: FuturesOrdered::new(), remote_addr_dbg, stats: Arc::new(CaConnStats::new()), + insert_queue_max, } } @@ -363,20 +181,29 @@ impl CaConn { } #[inline(never)] - fn handle_insert_futs(&mut self, cx: &mut Context) -> Result<(), Error> { + fn handle_insert_futs(&mut self, cx: &mut Context) -> Poll> { use Poll::*; - while self.value_insert_futs.len() > 0 { - match self.value_insert_futs.poll_next_unpin(cx) { - Pending => break, - _ => { - self.stats.inserts_queue_pop_inc(); - } + loop { + match self.insert_item_send_fut.as_mut() { + Some(fut) => match fut.poll_unpin(cx) { + Ready(Ok(())) => { + self.stats.inserts_queue_push_inc(); + self.insert_item_send_fut = None; + } + Ready(Err(_)) => break Ready(Err(Error::with_msg_no_trace(format!("can not send the item")))), + Pending => break Pending, + }, + None => {} + } + if let Some(item) = self.insert_item_queue.pop_front() { + let sender = unsafe { &*(&self.insert_item_sender as *const CommonInsertItemQueueSender) }; + self.insert_item_send_fut = Some(sender.send(item)); + } else { + break Ready(Ok(())); } } - Ok(()) } - #[inline(never)] fn handle_get_series_futs(&mut self, cx: &mut Context) -> Result<(), Error> { use Poll::*; while self.fut_get_series.len() > 0 { @@ -487,48 +314,19 @@ impl CaConn { self.ts_msp_last_by_series.insert(series.clone(), ts_msp); true }; - // TODO make sure that I only accept types I expect. - use crate::ca::proto::CaDataScalarValue::*; - use crate::ca::proto::CaDataValue::*; - let data_store = self.data_store.clone(); - let futs_queue = &mut self.value_insert_futs; - let comm = ( - data_store, - futs_queue, - series, + let item_queue = &mut self.insert_item_queue; + let item = InsertItem { + series: series.id(), ts_msp, ts_lsp, - ts_msp_changed, + msp_bump: ts_msp_changed, + pulse: 0, scalar_type, shape, - self.stats.clone(), - ); - match ev.value { - Scalar(v) => match v { - I8(val) => match_scalar_value_insert!(I8, insert_scalar_i8, val, comm), - I16(val) => match_scalar_value_insert!(I16, insert_scalar_i16, val, comm), - I32(val) => match_scalar_value_insert!(I32, insert_scalar_i32, val, comm), - F32(val) => match_scalar_value_insert!(F32, insert_scalar_f32, val, comm), - F64(val) => match_scalar_value_insert!(F64, insert_scalar_f64, val, comm), - String(val) => match_scalar_value_insert!(STRING, insert_scalar_string, val, comm), - _ => { - warn!("can not handle Scalar {:?}", v); - } - }, - Array(v) => { - use crate::ca::proto::CaDataArrayValue::*; - match v { - I8(val) => match_array_value_insert!(I8, insert_array_i8, val, comm), - I16(val) => match_array_value_insert!(I16, insert_array_i16, val, comm), - I32(val) => match_array_value_insert!(I32, insert_array_i32, val, comm), - F32(val) => match_array_value_insert!(F32, insert_array_f32, val, comm), - F64(val) => match_array_value_insert!(F64, insert_array_f64, val, comm), - _ => { - warn!("can not handle Array ty {} n {}", ev.data_type, ev.data_count); - } - } - } - } + val: ev.value, + }; + item_queue.push_back(item); + self.stats.insert_item_create_inc(); Ok(()) } @@ -585,46 +383,51 @@ impl CaConn { Ok(()) } + /* + Acts more like a stream? Can be: + Pending + Ready(no-more-work, something-was-done, error) + */ #[inline(never)] - fn handle_conn_listen(&mut self, cx: &mut Context) -> Option>>> { + fn handle_conn_listen(&mut self, cx: &mut Context) -> Poll>> { use Poll::*; match self.proto.poll_next_unpin(cx) { Ready(Some(k)) => match k { Ok(k) => match k { CaItem::Empty => { info!("CaItem::Empty"); - Some(Ready(Some(Ok(())))) + Ready(Some(Ok(()))) } CaItem::Msg(msg) => match msg.ty { CaMsgTy::VersionRes(n) => { if n < 12 || n > 13 { error!("See some unexpected version {n} channel search may not work."); - Some(Ready(Some(Ok(())))) + Ready(Some(Ok(()))) } else { if n != 13 { warn!("Received peer version {n}"); } self.state = CaConnState::PeerReady; - None + Ready(Some(Ok(()))) } } k => { warn!("Got some other unhandled message: {k:?}"); - Some(Ready(Some(Ok(())))) + Ready(Some(Ok(()))) } }, }, Err(e) => { error!("got error item from CaProto {e:?}"); - Some(Ready(Some(Ok(())))) + Ready(Some(Ok(()))) } }, Ready(None) => { warn!("CaProto is done {:?}", self.remote_addr_dbg); self.state = CaConnState::Done; - None + Ready(None) } - Pending => Some(Pending), + Pending => Pending, } } @@ -667,6 +470,8 @@ impl CaConn { Ok(()) } + // Can return: + // Pending, error, work-done (pending state unknown), no-more-work-ever-again. #[inline(never)] fn handle_peer_ready(&mut self, cx: &mut Context) -> Poll>> { use Poll::*; @@ -677,7 +482,7 @@ impl CaConn { let ts2 = Instant::now(); self.stats .time_check_channels_state_init - .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); + .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::Release); ts1 = ts2; let mut do_wake_again = false; if msgs_tmp.len() > 0 { @@ -745,8 +550,9 @@ impl CaConn { let ts2 = Instant::now(); self.stats .time_handle_event_add_res - .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::Release); + .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); ts1 = ts2; + let _ = ts1; res? } _ => {} @@ -763,7 +569,7 @@ impl CaConn { Ready(None) => { warn!("CaProto is done"); self.state = CaConnState::Done; - Ready(Some(Ok(()))) + Ready(None) } Pending => Pending, }; @@ -784,65 +590,89 @@ impl Stream for CaConn { let ts_outer_1 = Instant::now(); let mut ts1 = ts_outer_1; self.poll_count += 1; - let ret = loop { - self.handle_insert_futs(cx)?; + // TODO factor out the inner loop: + let ret = 'outer: loop { + let q = self.handle_insert_futs(cx); let ts2 = Instant::now(); self.stats .poll_time_handle_insert_futs - .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); ts1 = ts2; + match q { + Ready(_) => {} + Pending => break Pending, + } + self.handle_get_series_futs(cx)?; let ts2 = Instant::now(); self.stats .poll_time_get_series_futs - .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); + .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); ts1 = ts2; - if self.value_insert_futs.len() >= INSERT_FUTS_MAX { - // TODO do not do more. - // But: can I assume that in this case we triggered a Pending? + + if self.insert_item_queue.len() >= self.insert_queue_max { break Pending; } - break match &self.state { - CaConnState::Init => { - let msg = CaMsg { ty: CaMsgTy::Version }; - self.proto.push_out(msg); - let msg = CaMsg { - ty: CaMsgTy::ClientName, - }; - self.proto.push_out(msg); - let msg = CaMsg { ty: CaMsgTy::HostName }; - self.proto.push_out(msg); - self.state = CaConnState::Listen; - continue; - } - CaConnState::Listen => match { - let res = self.handle_conn_listen(cx); - let ts2 = Instant::now(); - self.stats - .time_handle_conn_listen - .fetch_add((ts2.duration_since(ts1) * 1000000).as_secs(), Ordering::AcqRel); - ts1 = ts2; - res - } { - Some(k) => k, - None => continue, - }, - CaConnState::PeerReady => { - let res = self.handle_peer_ready(cx); - let ts2 = Instant::now(); - self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1)); - ts1 = ts2; - res - } - CaConnState::Done => Ready(None), + + break loop { + break match &self.state { + CaConnState::Init => { + let msg = CaMsg { ty: CaMsgTy::Version }; + self.proto.push_out(msg); + let msg = CaMsg { + ty: CaMsgTy::ClientName, + }; + self.proto.push_out(msg); + let msg = CaMsg { ty: CaMsgTy::HostName }; + self.proto.push_out(msg); + self.state = CaConnState::Listen; + continue 'outer; + } + CaConnState::Listen => match { + let res = self.handle_conn_listen(cx); + let ts2 = Instant::now(); + self.stats + .time_handle_conn_listen + .fetch_add((ts2.duration_since(ts1) * MS as u32).as_secs(), Ordering::AcqRel); + ts1 = ts2; + res + } { + Ready(Some(Ok(()))) => Ready(Some(Ok(()))), + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => continue 'outer, + Pending => Pending, + }, + CaConnState::PeerReady => { + let res = self.handle_peer_ready(cx); + let ts2 = Instant::now(); + self.stats.time_handle_peer_ready_dur(ts2.duration_since(ts1)); + ts1 = ts2; + match res { + Ready(Some(Ok(()))) => { + if self.insert_item_queue.len() >= self.insert_queue_max { + continue 'outer; + } else { + continue; + } + } + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => { + // TODO even though protocol is done, we might still have e.g. insert items to flush! + Ready(None) + } + Pending => Pending, + } + } + CaConnState::Done => { + // TODO handle better + Pending + } + }; }; }; - let nn = self.value_insert_futs.len(); - if nn > INSERT_FUTS_LIM { - warn!("value_insert_futs len {nn}"); - } let ts_outer_2 = Instant::now(); self.stats.poll_time_all_dur(ts_outer_2.duration_since(ts_outer_1)); + // TODO currently, this will never stop by itself ret } } diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs index 41a043f..bea9a7c 100644 --- a/netfetch/src/ca/store.rs +++ b/netfetch/src/ca/store.rs @@ -1,6 +1,6 @@ use crate::bsread::ChannelDescDecoded; use crate::series::{Existence, SeriesId}; -use crate::store::{CommonInsertQueue, CommonInsertQueueSender}; +use crate::store::CommonInsertQueueSender; use async_channel::{Receiver, Sender}; use err::Error; use scylla::prepared_statement::PreparedStatement; @@ -8,6 +8,7 @@ use scylla::Session as ScySession; use std::sync::Arc; use tokio_postgres::Client as PgClient; +#[allow(unused)] pub struct RegisterJob { desc: ChannelDescDecoded, } @@ -18,11 +19,13 @@ impl RegisterJob { } } +#[allow(unused)] pub struct RegisterChannel { tx: Sender, rx: Receiver, } +#[allow(unused)] pub struct ChannelRegistry { scy: Arc, pg_client: Arc, diff --git a/netfetch/src/stats.rs b/netfetch/src/stats.rs deleted file mode 100644 index e69de29..0000000 diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index 98a2753..fb46d9a 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -1,13 +1,16 @@ +use crate::ca::proto::{CaDataArrayValue, CaDataScalarValue, CaDataValue}; +use crate::ca::store::DataStore; use crate::errconv::ErrConv; use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, Stream, StreamExt}; use log::*; +use netpod::{ScalarType, Shape}; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; use scylla::transport::errors::QueryError; use scylla::{QueryResult, Session as ScySession}; -use std::collections::VecDeque; +use stats::CaConnStats; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -15,6 +18,7 @@ use std::time::Instant; const CHANNEL_CAP: usize = 128; const POLLING_CAP: usize = 32; +const TABLE_SERIES_MOD: u32 = 128; pub struct ScyInsertFut { #[allow(unused)] @@ -84,6 +88,158 @@ impl Future for ScyInsertFut { } } +#[derive(Debug)] +pub struct InsertItem { + pub series: u64, + pub ts_msp: u64, + pub ts_lsp: u64, + pub msp_bump: bool, + pub pulse: u64, + pub scalar_type: ScalarType, + pub shape: Shape, + pub val: CaDataValue, +} + +pub struct CommonInsertItemQueueSender { + sender: async_channel::Sender, +} + +impl CommonInsertItemQueueSender { + #[inline(always)] + pub fn send(&self, k: InsertItem) -> async_channel::Send { + self.sender.send(k) + } +} + +pub struct CommonInsertItemQueue { + sender: async_channel::Sender, + recv: async_channel::Receiver, +} + +impl CommonInsertItemQueue { + pub fn new(cap: usize) -> Self { + let (tx, rx) = async_channel::bounded(cap); + Self { + sender: tx.clone(), + recv: rx, + } + } + + pub fn sender(&self) -> CommonInsertItemQueueSender { + CommonInsertItemQueueSender { + sender: self.sender.clone(), + } + } + + pub fn receiver(&self) -> async_channel::Receiver { + self.recv.clone() + } +} + +struct InsParCom { + series: u64, + ts_msp: u64, + ts_lsp: u64, + pulse: u64, +} + +async fn insert_scalar_gen( + par: InsParCom, + val: ST, + qu: &PreparedStatement, + data_store: &DataStore, +) -> Result<(), Error> +where + ST: scylla::frame::value::Value, +{ + let params = ( + par.series as i64, + par.ts_msp as i64, + par.ts_lsp as i64, + par.pulse as i64, + val, + ); + data_store.scy.execute(qu, params).await.err_conv()?; + Ok(()) +} + +async fn insert_array_gen( + par: InsParCom, + val: Vec, + qu: &PreparedStatement, + data_store: &DataStore, +) -> Result<(), Error> +where + ST: scylla::frame::value::Value, +{ + let params = ( + par.series as i64, + par.ts_msp as i64, + par.ts_lsp as i64, + par.pulse as i64, + val, + ); + data_store.scy.execute(qu, params).await.err_conv()?; + Ok(()) +} + +pub async fn insert_item(item: InsertItem, data_store: &DataStore, stats: &CaConnStats) -> Result<(), Error> { + if item.msp_bump { + let params = ( + (item.series as u32 % TABLE_SERIES_MOD) as i32, + item.series as i64, + item.ts_msp as i64, + item.scalar_type.to_scylla_i32(), + item.shape.to_scylla_vec(), + ); + data_store + .scy + .execute(&data_store.qu_insert_series, params) + .await + .err_conv()?; + let params = (item.series as i64, item.ts_msp as i64); + data_store + .scy + .execute(&data_store.qu_insert_ts_msp, params) + .await + .err_conv()?; + stats.inserts_msp_inc() + } + let par = InsParCom { + series: item.series, + ts_msp: item.ts_msp, + ts_lsp: item.ts_lsp, + pulse: item.pulse, + }; + use CaDataValue::*; + match item.val { + Scalar(val) => { + use CaDataScalarValue::*; + match val { + I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?, + I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, + Enum(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?, + I32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i32, &data_store).await?, + F32(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f32, &data_store).await?, + F64(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_f64, &data_store).await?, + String(_) => (), + } + } + Array(val) => { + use CaDataArrayValue::*; + match val { + I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?, + I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?, + I32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i32, &data_store).await?, + F32(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f32, &data_store).await?, + F64(val) => insert_array_gen(par, val, &data_store.qu_insert_array_f64, &data_store).await?, + } + } + } + stats.inserts_val_inc(); + Ok(()) +} + type FutTy = Pin> + Send>>; pub struct CommonInsertQueueSender { diff --git a/netfetch/src/zmtp.rs b/netfetch/src/zmtp.rs index b5f3b0c..e61425a 100644 --- a/netfetch/src/zmtp.rs +++ b/netfetch/src/zmtp.rs @@ -70,7 +70,7 @@ pub fn __get_series_id(chn: &ChannelDesc) -> u64 { u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()) } -pub async fn get_series_id(scy: &ScySession, chn: &ChannelDescDecoded) -> Result { +pub async fn get_series_id(_scy: &ScySession, _chn: &ChannelDescDecoded) -> Result { error!("TODO get_series_id"); err::todoval() } diff --git a/stats/src/stats.rs b/stats/src/stats.rs index e794a79..99ab8c7 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -115,11 +115,12 @@ stats_proc::stats_struct!(( stats_struct( name(CaConnStats), counters( + insert_item_create, inserts_val, inserts_msp, - inserts_discard, inserts_queue_push, inserts_queue_pop, + store_worker_item_recv, poll_time_all, poll_time_handle_insert_futs, poll_time_get_series_futs,