diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index beca3fc..400b3df 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -5,6 +5,7 @@ use log::*; use netfetch::conf::CaIngestOpts; use netfetch::conf::parse_config; use netpod::Database; +use netpod::ttl::RetentionTime; use scywr::config::ScyllaIngestConfig; use taskrun::TracingMode; @@ -160,25 +161,33 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { } async fn scylla_schema_check(opts: CaIngestOpts, do_change: bool) -> Result<(), Error> { - todo!("scylla_schema_check config"); - let scy_confs = opts.scylla_insert_set_conf(0); + let mut confs = Vec::new(); + confs.push(opts.scylla_insert_set_conf_main()); + if let Some(x) = opts.scylla_insert_set_conf_2nd() { + confs.push(x); + } let opstr = if do_change { "change" } else { "check" }; info!("start scylla schema {}", opstr); - info!("{:?}", opts.scylla_config_st()); - info!("{:?}", opts.scylla_config_mt()); - info!("{:?}", opts.scylla_config_lt()); - info!("{:?}", opts.scylla_config_st_rf1()); - scywr::schema::migrate_scylla_data_schema_all_rt( - [ - &opts.scylla_config_st(), - &opts.scylla_config_mt(), - &opts.scylla_config_lt(), - &opts.scylla_config_st_rf1(), - ], - do_change, - ) - .await - .map_err(Error::from_string)?; + for conf in confs { + info!("{:?}", conf.st_rf3()); + info!("{:?}", conf.mt_rf3()); + info!("{:?}", conf.lt_rf3()); + info!("{:?}", conf.st_rf1()); + // TODO attach RetentionTime to the config struct + let rts = [ + RetentionTime::Short, + RetentionTime::Medium, + RetentionTime::Long, + RetentionTime::Short, + ]; + scywr::schema::migrate_scylla_data_schema_all_rt( + rts, + [&conf.st_rf3(), &conf.mt_rf3(), &conf.lt_rf3(), &conf.st_rf1()], + do_change, + ) + .await + .map_err(Error::from_string)?; + } info!("stop scylla schema {}", opstr); Ok(()) } diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index eb5411d..ff362aa 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -259,8 +259,8 @@ impl Daemon { .map_err(Error::from_string)?; insert_workers_jhs.extend(jh); } else { - let scyset1 = ingest_opts.scylla_insert_set_conf(0).unwrap(); - if let Some(scyset2) = ingest_opts.scylla_insert_set_conf(1) { + let scyset1 = ingest_opts.scylla_insert_set_conf_main(); + if let Some(scyset2) = ingest_opts.scylla_insert_set_conf_2nd() { let (iqrx1, iqrx2) = iqrx.clone_2(); let jhs = spawn_scylla_insert_workers( &scyset1, @@ -911,7 +911,14 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> warn!("scylla_disable config flag enabled"); } else { info!("start scylla schema check"); + let rts = [ + RetentionTime::Short, + RetentionTime::Medium, + RetentionTime::Long, + RetentionTime::Short, + ]; scywr::schema::migrate_scylla_data_schema_all_rt( + rts, [ &opts.scylla_config_st(), &opts.scylla_config_mt(), diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 5545752..807faf0 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -82,7 +82,7 @@ use tokio::net::TcpStream; const CONNECTING_TIMEOUT: Duration = Duration::from_millis(1000 * 6); const CHANNEL_STATUS_EMIT_IVL: Duration = Duration::from_millis(1000 * 8); -const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 180); +const IOC_PING_IVL: Duration = Duration::from_millis(1000 * 60 * 10); const MONITOR_POLL_TIMEOUT: Duration = Duration::from_millis(1000 * 6); const TIMEOUT_CHANNEL_CLOSING: Duration = Duration::from_millis(1000 * 8); const TIMEOUT_PONG_WAIT: Duration = Duration::from_millis(1000 * 10); @@ -90,7 +90,7 @@ const READ_CHANNEL_VALUE_STATUS_EMIT_QUIET_MIN: Duration = Duration::from_millis const SILENCE_READ_NEXT_IVL: Duration = Duration::from_millis(1000 * 200); const POLL_READ_TIMEOUT: Duration = Duration::from_millis(1000 * 10); const DO_RATE_CHECK: bool = false; -const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 60); +const CHANNEL_STATUS_PONG_QUIET: Duration = Duration::from_millis(1000 * 60 * 5); const METRICS_EMIT_IVL: Duration = Duration::from_millis(1000 * 1); macro_rules! trace3 { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); } @@ -779,6 +779,7 @@ impl WriterStatus { &mut self, item: ChannelStatusItem, deque: &mut VecDeque, + mett: &mut stats::mett::CaConnMetrics, ) -> Result<(), Error> { let tsev = TsNano::from_system_time(SystemTime::now()); let (ts, val) = item.to_ts_val(); @@ -789,6 +790,7 @@ impl WriterStatus { tsev, deque, )?; + mett.emit_channel_status_item().inc(); Ok(()) } } @@ -1064,7 +1066,7 @@ impl<'a> EventAddIngestRefobj<'a> { status: ChannelStatus::MonitoringSilenceReadUnchanged, }; if wrst - .emit_channel_status_item(item, CaConn::channel_status_qu(self.iqdqs)) + .emit_channel_status_item(item, CaConn::channel_status_qu(self.iqdqs), mett) .is_err() { mett.logic_error().inc(); @@ -1527,10 +1529,6 @@ impl CaConn { &mut iqdqs.lt_rf3_qu } - fn channel_status_pong_qu(iqdqs: &mut InsertDeques) -> &mut VecDeque { - &mut iqdqs.st_rf3_qu - } - pub fn conn_command_tx(&self) -> Sender { self.conn_command_tx.as_ref().get_ref().clone() } @@ -1835,8 +1833,11 @@ impl CaConn { cssid: st2.channel.cssid.clone(), status: ChannelStatus::Opened, }; - conf.wrst - .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; + conf.wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, + )?; } if let Some((ivl,)) = conf_poll_conf { let ivl = Duration::from_millis(ivl); @@ -1972,7 +1973,7 @@ impl CaConn { status: ChannelStatus::Closed(ChannelStatusClosedReason::ChannelRemove), }; let qu = Self::channel_status_qu(&mut self.iqdqs); - if conf.wrst.emit_channel_status_item(item, qu).is_err() { + if conf.wrst.emit_channel_status_item(item, qu, &mut self.mett).is_err() { self.mett.logic_error().inc(); } // TODO shutdown the internal writer structures. @@ -2089,7 +2090,11 @@ impl CaConn { cssid: conf.state.cssid(), status: ChannelStatus::Closed(channel_reason.clone()), }; - if conf.wrst.emit_channel_status_item(item, status_qu).is_err() { + if conf + .wrst + .emit_channel_status_item(item, status_qu, &mut self.mett) + .is_err() + { self.mett.logic_error().inc(); } } @@ -2177,6 +2182,7 @@ impl CaConn { tsnow: Instant, tscaproto: Instant, ) -> Result<(), Error> { + self.mett.fn_handle_event_add_res().inc(); let subid = Subid(ev.subid); // TODO handle subid-not-found which can also be peer error: let cid = if let Some(x) = self.cid_by_subid.get(&subid) { @@ -2400,6 +2406,7 @@ impl CaConn { tsnow: Instant, tscaproto: Instant, ) -> Result<(), Error> { + self.mett.fn_handle_read_notify_res().inc(); // trace!("handle_read_notify_res {ev:?}"); // TODO can not rely on the SID in the response. let sid_ev = Sid(ev.sid); @@ -2494,6 +2501,7 @@ impl CaConn { ch_wrst.emit_channel_status_item( item, Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, )?; } // NOTE we do not update the last value in this ev handler. @@ -2516,6 +2524,7 @@ impl CaConn { ch_wrst.emit_channel_status_item( item, Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, )?; } } @@ -2535,6 +2544,7 @@ impl CaConn { ch_wrst.emit_channel_status_item( item, Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, )?; } } @@ -2754,8 +2764,11 @@ impl CaConn { cssid: st2.channel.cssid.clone(), status: ChannelStatus::MonitoringSilenceReadStart, }; - conf.wrst - .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; + conf.wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, + )?; } } } @@ -2777,8 +2790,11 @@ impl CaConn { cssid: st2.channel.cssid.clone(), status: ChannelStatus::MonitoringSilenceReadTimeout, }; - conf.wrst - .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; + conf.wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, + )?; } if false { // Here we try to close the channel at hand. @@ -2855,8 +2871,11 @@ impl CaConn { cssid: st2.channel.cssid.clone(), status: ChannelStatus::PollingReadTimeout, }; - conf.wrst - .emit_channel_status_item(item, Self::channel_status_qu(&mut self.iqdqs))?; + conf.wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, + )?; } } } @@ -2894,6 +2913,25 @@ impl CaConn { value: CaConnEventValue::EchoTimeout, }; self.ca_conn_event_out_queue.push_back(item); + for (_cid, ch_conf) in &mut self.channels { + match &mut ch_conf.state { + ChannelState::Writable(st2) => { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st2.channel.cssid.clone(), + status: ChannelStatus::PongTimeout, + }; + ch_conf.wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, + )?; + } + _ => { + // TODO count metrics + } + } + } self.trigger_shutdown(ShutdownReason::IocTimeout); } } else { @@ -2904,6 +2942,25 @@ impl CaConn { self.ioc_ping_last = Some(self.tmp_ts_poll); let msg = CaMsg::from_ty_ts(CaMsgTy::Echo, tsnow); proto.push_out(msg); + for (_cid, ch_conf) in &mut self.channels { + match &mut ch_conf.state { + ChannelState::Writable(st2) => { + let item = ChannelStatusItem { + ts: self.tmp_ts_poll, + cssid: st2.channel.cssid.clone(), + status: ChannelStatus::Ping, + }; + ch_conf.wrst.emit_channel_status_item( + item, + Self::channel_status_qu(&mut self.iqdqs), + &mut self.mett, + )?; + } + _ => { + // TODO count metrics + } + } + } } else { self.mett.ping_no_proto().inc(); info!("can not ping {} no proto", self.remote_addr_dbg); @@ -3619,9 +3676,9 @@ impl CaConn { self.ts_channel_status_pong_last = self.poll_tsnow; Self::channel_status_qu(&mut self.iqdqs) } else { - Self::channel_status_pong_qu(&mut self.iqdqs) + Self::channel_status_qu(&mut self.iqdqs) }; - if ch.wrst.emit_channel_status_item(item, qu).is_err() { + if ch.wrst.emit_channel_status_item(item, qu, &mut self.mett).is_err() { self.mett.logic_error().inc(); } } diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs index 2dcd285..848ad95 100644 --- a/netfetch/src/conf.rs +++ b/netfetch/src/conf.rs @@ -32,6 +32,7 @@ pub struct CaIngestOpts { scylla_mt: ScyllaRtConf, scylla_lt: ScyllaRtConf, scylla_st_rf1: ScyllaRtConf, + scylla_2nd: Option, array_truncate: Option, insert_worker_count: Option, insert_worker_concurrency: Option, @@ -70,13 +71,23 @@ impl CaIngestOpts { &self.postgresql } - pub fn scylla_insert_set_conf(&self, n: usize) -> Option { - if n == 0 { + pub fn scylla_insert_set_conf_main(&self) -> ScyllaInsertsetConf { + let ret = ScyllaInsertsetConf { + st_rf1: self.scylla_config_st_rf1(), + st_rf3: self.scylla_config_st(), + mt_rf3: self.scylla_config_mt(), + lt_rf3: self.scylla_config_lt(), + }; + ret + } + + pub fn scylla_insert_set_conf_2nd(&self) -> Option { + if let Some(cc) = self.scylla_2nd.as_ref() { let ret = ScyllaInsertsetConf { - st_rf1: self.scylla_config_st_rf1(), - st_rf3: self.scylla_config_st(), - mt_rf3: self.scylla_config_mt(), - lt_rf3: self.scylla_config_lt(), + st_rf1: cc.scylla_config_st_rf1(), + st_rf3: cc.scylla_config_st(), + mt_rf3: cc.scylla_config_mt(), + lt_rf3: cc.scylla_config_lt(), }; Some(ret) } else { @@ -193,6 +204,52 @@ impl CaIngestOpts { } } +#[derive(Clone, Debug, Deserialize)] +pub struct ScyllaConfigBlockV2 { + scylla: ScyllaConfigBlockV2MainHosts, + scylla_st: ScyllaRtConf, + scylla_mt: ScyllaRtConf, + scylla_lt: ScyllaRtConf, + scylla_st_rf1: ScyllaRtConf, +} + +impl ScyllaConfigBlockV2 { + pub fn scylla_config_st(&self) -> ScyllaIngestConfig { + let c = &self.scylla_st; + let hosts = self.fill_hosts_if_empty(c); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) + } + + pub fn scylla_config_mt(&self) -> ScyllaIngestConfig { + let c = &self.scylla_mt; + let hosts = self.fill_hosts_if_empty(c); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) + } + + pub fn scylla_config_lt(&self) -> ScyllaIngestConfig { + let c = &self.scylla_lt; + let hosts = self.fill_hosts_if_empty(c); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) + } + + pub fn scylla_config_st_rf1(&self) -> ScyllaIngestConfig { + let c = &self.scylla_st_rf1; + let hosts = self.fill_hosts_if_empty(c); + ScyllaIngestConfig::new(hosts, c.keyspace.clone()) + } + + fn fill_hosts_if_empty(&self, c: &ScyllaRtConf) -> Vec { + c.hosts + .as_ref() + .map_or_else(|| self.scylla.hosts.clone(), |x| x.clone()) + } +} + +#[derive(Clone, Debug, Deserialize)] +struct ScyllaConfigBlockV2MainHosts { + hosts: Vec, +} + #[test] fn parse_config_minimal() { let conf = r###" @@ -342,6 +399,11 @@ scylla_2nd: assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string())); assert_eq!(conf.scylla_config_st().hosts().get(1), Some(&"node2:19042".to_string())); assert_eq!(conf.scylla_config_lt().hosts().get(1), Some(&"node4:19042".to_string())); + assert_eq!(conf.scylla_2nd.is_some(), true); + { + let scy = conf.scylla_2nd.as_ref().unwrap(); + assert_eq!(scy.scylla.hosts.len(), 1); + } assert_eq!(conf.timeout, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45))); } diff --git a/scywr/src/config.rs b/scywr/src/config.rs index 53a32e5..7fd471f 100644 --- a/scywr/src/config.rs +++ b/scywr/src/config.rs @@ -1,3 +1,4 @@ +use netpod::ttl::RetentionTime; use serde::Deserialize; #[derive(Debug, Clone, Deserialize)] @@ -26,4 +27,8 @@ impl ScyllaIngestConfig { pub fn hosts(&self) -> &Vec { &self.hosts } + + pub fn short_name(&self, rt: RetentionTime) -> String { + format!("Scyconf {{ {:?}, {:?}, {:?} }}", self.hosts.get(0), self.keyspace, rt) + } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 078ae9d..d0960cc 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -31,19 +31,19 @@ use std::time::Instant; use taskrun::tokio; use tokio::task::JoinHandle; -macro_rules! error { ($($arg:expr),*) => ( if true { log::error!($($arg),*); } ); } +macro_rules! error { ($($arg:tt)*) => ( if true { log::error!($($arg)*); } ); } -macro_rules! warn { ($($arg:expr),*) => ( if true { log::warn!($($arg),*); } ); } +macro_rules! warn { ($($arg:tt)*) => ( if true { log::warn!($($arg)*); } ); } -macro_rules! trace2 { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } +macro_rules! trace2 { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); } -macro_rules! trace_transform { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } +macro_rules! trace_transform { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); } -macro_rules! trace_inspect { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } +macro_rules! trace_inspect { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); } -macro_rules! trace_item_execute { ($($arg:expr),*) => ( if false { log::trace!($($arg),*); } ); } +macro_rules! trace_item_execute { ($($arg:tt)*) => ( if false { log::trace!($($arg)*); } ); } -macro_rules! debug_setup { ($($arg:expr),*) => ( if false { log::debug!($($arg),*); } ); } +macro_rules! debug_setup { ($($arg:tt)*) => ( if false { log::debug!($($arg)*); } ); } autoerr::create_error_v1!( name(Error, "ScyllaInsertWorker"), @@ -169,8 +169,9 @@ struct FutTrackDt { ts1: Instant, ts2: Instant, ts_net: Instant, - poll1: bool, + npoll: u16, fut: F, + jobkind: FutJobKind, } impl FutTrackDt { @@ -180,8 +181,9 @@ impl FutTrackDt { ts1: tsnow, ts2: tsnow, ts_net: job.ts_net, - poll1: false, + npoll: 0, fut: job.fut, + jobkind: job.jobkind, } } } @@ -190,16 +192,16 @@ impl Future for FutTrackDt where F: Future + Unpin, { - type Output = (Instant, Instant, Instant, F::Output); + type Output = (Instant, Instant, Instant, F::Output, FutJobKind); fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; - if self.poll1 == false { - self.poll1 = true; + if self.npoll == 0 { self.ts2 = Instant::now(); } + self.npoll = self.npoll.saturating_add(1); match self.as_mut().fut.poll_unpin(cx) { - Ready(x) => Ready((self.ts_net, self.ts1, self.ts2, x)), + Ready(x) => Ready((self.ts_net, self.ts1, self.ts2, x, self.jobkind.clone())), Pending => Pending, } } @@ -236,9 +238,29 @@ async fn worker_streamed( .buffer_unordered(concurrency); let mut stream = Box::pin(stream); debug_setup!("waiting for item"); - while let Some((ts_net, ts1, ts2, item)) = stream.next().await { + while let Some((ts_net, ts1, ts2, item, jobkind)) = stream.next().await { trace_item_execute!("see item"); let tsnow = Instant::now(); + match jobkind { + FutJobKind::SeriesData => { + mett.jobtrans().SeriesData().inc(); + } + FutJobKind::SeriesMsp => { + mett.jobtrans().SeriesMsp().inc(); + } + FutJobKind::TimeBinSimpleF32V02 => { + mett.jobtrans().TimeBinSimpleF32V02().inc(); + } + FutJobKind::BinWriteIndexV04 => { + mett.jobtrans().BinWriteIndexV04().inc(); + } + FutJobKind::Accounting => { + mett.jobtrans().Accounting().inc(); + } + FutJobKind::AccountingRecv => { + mett.jobtrans().AccountingRecv().inc(); + } + } match item { Ok(_) => { mett.job_ok().inc(); @@ -282,9 +304,20 @@ async fn worker_streamed( Ok(()) } +#[derive(Debug, Clone)] +enum FutJobKind { + SeriesData, + SeriesMsp, + TimeBinSimpleF32V02, + BinWriteIndexV04, + Accounting, + AccountingRecv, +} + struct FutJob { fut: InsertFut, ts_net: Instant, + jobkind: FutJobKind, } fn transform_to_db_futures( @@ -296,11 +329,7 @@ where S: Stream>, { trace_transform!("transform_to_db_futures begin"); - // TODO possible without box? - // let item_inp = Box::pin(item_inp); item_inp.map(move |batch| { - // TODO - // stats.item_recv.inc(); trace_transform!("transform_to_db_futures have batch len {}", batch.len()); let tsnow = Instant::now(); let mut res = Vec::with_capacity(32); @@ -400,6 +429,7 @@ fn prepare_msp_insert_futs(item: MspItem, data_store: &Arc) -> SmallV let fut = FutJob { fut, ts_net: item.ts_net(), + jobkind: FutJobKind::SeriesMsp, }; let futs = smallvec![fut]; futs @@ -412,6 +442,7 @@ fn prepare_query_insert_futs(item: InsertItem, data_store: &Arc) -> S let fut = FutJob { fut, ts_net: item_ts_net, + jobkind: FutJobKind::SeriesData, }; let futs = smallvec![fut]; futs @@ -439,7 +470,11 @@ fn prepare_timebin_v02_insert_futs( data_store.qu_insert_binned_scalar_f32_v02.clone(), params, ); - let fut = FutJob { fut, ts_net: tsnow }; + let fut = FutJob { + fut, + ts_net: tsnow, + jobkind: FutJobKind::TimeBinSimpleF32V02, + }; let futs = smallvec![fut]; // TODO match on the query result: @@ -467,7 +502,11 @@ fn prepare_bin_write_index_v04_insert_futs( data_store.qu_insert_bin_write_index_v04.clone(), params, ); - let fut = FutJob { fut, ts_net: tsnow }; + let fut = FutJob { + fut, + ts_net: tsnow, + jobkind: FutJobKind::BinWriteIndexV04, + }; let futs = smallvec![fut]; // TODO match on the query result: @@ -497,7 +536,11 @@ fn prepare_accounting_insert_futs( item.bytes, ); let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_00.clone(), params); - let fut = FutJob { fut, ts_net: tsnow }; + let fut = FutJob { + fut, + ts_net: tsnow, + jobkind: FutJobKind::Accounting, + }; let futs = smallvec![fut]; futs } @@ -515,7 +558,11 @@ fn prepare_accounting_recv_insert_futs( item.bytes, ); let fut = InsertFut::new(data_store.scy.clone(), data_store.qu_account_recv_00.clone(), params); - let fut = FutJob { fut, ts_net: tsnow }; + let fut = FutJob { + fut, + ts_net: tsnow, + jobkind: FutJobKind::AccountingRecv, + }; let futs = smallvec![fut]; futs } diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index a5a831d..e806a9d 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -37,12 +37,20 @@ impl From for Error { } struct Changeset { + name: String, todo: Vec, } impl Changeset { fn new() -> Self { - Self { todo: Vec::new() } + Self { + name: String::new(), + todo: Vec::new(), + } + } + + fn set_name(&mut self, x: String) { + self.name = x; } fn add_todo(&mut self, cql: String) { @@ -55,7 +63,7 @@ impl Changeset { fn log_statements(&self) { for q in &self.todo { - info!("would execute:\n{}\n", q); + info!("would execute:\n{}\n{}\n", self.name, q); } } } @@ -717,16 +725,11 @@ async fn migrate_scylla_data_schema( } pub async fn migrate_scylla_data_schema_all_rt( + rts: [RetentionTime; 4], scyconfs: [&ScyllaIngestConfig; 4], do_change: bool, ) -> Result<(), Error> { let mut chsa = [Changeset::new(), Changeset::new(), Changeset::new(), Changeset::new()]; - let rts = [ - RetentionTime::Short, - RetentionTime::Medium, - RetentionTime::Long, - RetentionTime::Short, - ]; let rfs = [3, 3, 3, 1]; for (((rt, scyconf), chs), rf) in rts .clone() @@ -735,12 +738,13 @@ pub async fn migrate_scylla_data_schema_all_rt( .zip(chsa.iter_mut()) .zip(rfs.iter().map(|&x| x)) { + chs.set_name(scyconf.short_name(rt.clone())); migrate_scylla_data_schema(scyconf, rt, rf, chs).await?; } let todo = chsa.iter().any(|x| x.has_to_do()); if do_change { if todo { - for ((_rt, scyconf), chs) in rts.into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) { + for ((_rt, scyconf), chs) in rts.clone().into_iter().zip(scyconfs.iter()).zip(chsa.iter_mut()) { if chs.has_to_do() { let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; @@ -755,7 +759,7 @@ pub async fn migrate_scylla_data_schema_all_rt( } } } - let fut = migrate_scylla_data_schema_all_rt(scyconfs, false); + let fut = migrate_scylla_data_schema_all_rt(rts, scyconfs, false); Box::pin(fut).await?; Ok(()) } else { diff --git a/stats/mettdecl.rs b/stats/mettdecl.rs index 7bc3d18..44818b6 100644 --- a/stats/mettdecl.rs +++ b/stats/mettdecl.rs @@ -1,3 +1,15 @@ +mod Metrics { + type StructName = ScyllaJobTransform; + enum counters { + SeriesData, + SeriesMsp, + TimeBinSimpleF32V02, + BinWriteIndexV04, + Accounting, + AccountingRecv, + } +} + mod Metrics { type StructName = ScyllaInsertWorker; enum counters { @@ -10,6 +22,10 @@ mod Metrics { job_dt2, job_dt_net, } + mod Compose { + type Input = ScyllaJobTransform; + type Name = jobtrans; + } } mod Metrics { @@ -41,6 +57,8 @@ mod Metrics { pong_timeout, caget_timeout, caget_issued, + fn_handle_event_add_res, + fn_handle_read_notify_res, unknown_ioid, monitor_stale_read_begin, monitor_stale_read_timeout, @@ -81,6 +99,7 @@ mod Metrics { writer_ignore_monitor_not_min_quiet, writer_ignore_poll_not_min_quiet, writer_ignore_rate_cap, + emit_channel_status_item, } enum values { channel_all_count, diff --git a/stats/src/stats.rs b/stats/src/stats.rs index b968ae1..b29f0e5 100644 --- a/stats/src/stats.rs +++ b/stats/src/stats.rs @@ -1,5 +1,6 @@ pub mod mett; +pub use mettrics; pub use rand_xoshiro; use std::time::Duration;