diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 1302641..1e0cd03 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -66,7 +66,8 @@ const SEARCH_PENDING_TIMEOUT: Duration = Duration::from_millis(30000); const SEARCH_PENDING_TIMEOUT_WARN: Duration = Duration::from_millis(8000); const FINDER_TIMEOUT: Duration = Duration::from_millis(100); const CHANNEL_CHECK_INTERVAL: Duration = Duration::from_millis(5000); -const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(8000); +const PRINT_ACTIVE_INTERVAL: Duration = Duration::from_millis(60000); +const PRINT_STATUS_INTERVAL: Duration = Duration::from_millis(20000); const DO_ASSIGN_TO_CA_CONN: bool = true; @@ -1073,7 +1074,7 @@ impl Daemon { async fn check_caconn_chans(&mut self) -> Result<(), Error> { if self.caconn_last_channel_check.elapsed() > CHANNEL_CHECK_INTERVAL { - info!("Issue channel check to all CaConn"); + debug!("Issue channel check to all CaConn"); self.ingest_commons .ca_conn_set .enqueue_command_to_all(|| ConnCommand::check_health()) @@ -1133,7 +1134,7 @@ impl Daemon { if dt > Duration::from_millis(500) { info!("slow check_chans {}ms", dt.as_secs_f32() * 1e3); } - if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= Duration::from_millis(1000) { + if tsnow.duration_since(self.last_status_print).unwrap_or(Duration::ZERO) >= PRINT_STATUS_INTERVAL { self.last_status_print = tsnow; info!( "{:8} {:8} {:8} : {:8} : {:8} {:8} : {:10}", @@ -1543,6 +1544,8 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error> netfetch::dbpg::schema_check(opts.postgresql()).await?; + netfetch::scylla::migrate_keyspace(opts.scylla()).await?; + // TODO use a new stats type: //let store_stats = Arc::new(CaConnStats::new()); //let metrics_agg_fut = metrics_agg_task(ingest_commons.clone(), local_stats.clone(), store_stats.clone()); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 83cc1f6..961efc2 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -11,6 +11,7 @@ use crate::batchquery::series_by_channel::ChannelInfoQuery; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::CreateChan; use crate::ca::proto::EventAdd; +use crate::patchcollect::PatchCollect; use crate::series::ChannelStatusSeriesId; use crate::series::Existence; use crate::series::SeriesId; @@ -33,14 +34,20 @@ use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; use items_0::scalar_ops::ScalarOps; +use items_0::timebin::TimeBinner; use items_0::Appendable; use items_0::Empty; +use items_0::Events; +use items_0::Resettable; use items_2::eventsdim0::EventsDim0; use items_2::eventsdim1::EventsDim1; use log::*; use netpod::timeunits::*; +use netpod::BinnedRange; +use netpod::BinnedRangeEnum; use netpod::ScalarType; use netpod::Shape; +use netpod::TsNano; use netpod::TS_MSP_GRID_SPACING; use netpod::TS_MSP_GRID_UNIT; use serde::Serialize; @@ -444,7 +451,11 @@ pub struct CaConn { Pin), Error>> + Send>>, >, events_acc: Box, - events_acc_func: Box Result<(), Error> + Send>, + events_acc_push: Box, u64, &CaEventValue) -> Result<(), Error> + Send>, + events_acc_tick: + Box, &mut Box, &mut PatchCollect) -> Result<(), Error> + Send>, + events_binner: Option>, + patch_collect: PatchCollect, } impl CaConn { @@ -498,7 +509,10 @@ impl CaConn { series_lookup_schedule: BTreeMap::new(), series_lookup_futs: FuturesUnordered::new(), events_acc: Box::new(()), - events_acc_func: Box::new(Self::event_acc_push::), + events_acc_push: Box::new(Self::event_acc_push::), + events_acc_tick: Box::new(Self::event_acc_tick::), + events_binner: None, + patch_collect: PatchCollect::new(TsNano(SEC * 10), 3), } } @@ -823,7 +837,7 @@ impl CaConn { } else { self.ioc_ping_start = Some(Instant::now()); if let Some(proto) = &mut self.proto { - info!("push echo to {}", self.remote_addr_dbg); + debug!("push echo to {}", self.remote_addr_dbg); let msg = CaMsg { ty: CaMsgTy::Echo }; proto.push_out(msg); } else { @@ -1035,49 +1049,127 @@ impl CaConn { } } - fn event_acc_push(this: &mut CaConn, ts: u64, ev: CaEventValue) -> Result<(), Error> + fn event_acc_push(acc: &mut Box, ts: u64, ev: &CaEventValue) -> Result<(), Error> where STY: ScalarOps, CaDataValue: proto::GetValHelp, { let v = proto::GetValHelp::::get(&ev.data)?; - if let Some(c) = this.events_acc.downcast_mut::>() { + if let Some(c) = acc.downcast_mut::>() { c.push(ts, 0, v.clone()); - // TODO check for a max length. - // Also check at every check-tick. + Ok(()) + } else { + // TODO report once and error out + error!("unexpected container"); + //Err(Error::with_msg_no_trace("unexpected container")) + Ok(()) + } + } + + fn event_acc_tick( + acc: &mut Box, + tb: &mut Box, + patch_collect: &mut PatchCollect, + ) -> Result<(), Error> + where + STY: ScalarOps, + { + use items_0::WithLen; + if let Some(c) = acc.downcast_mut::>() { + if c.len() >= 1 { + //info!("push events len {}", c.len()); + tb.ingest(c); + c.reset(); + if tb.bins_ready_count() >= 1 { + info!("store bins len {}", tb.bins_ready_count()); + if let Some(mut bins) = tb.bins_ready() { + //info!("store bins {bins:?}"); + let mut bins = bins.to_simple_bins_f32(); + patch_collect.ingest(bins.as_mut())?; + Ok(()) + } else { + error!("have bins but none returned"); + Err(Error::with_msg_no_trace("have bins but none returned")) + } + } else { + Ok(()) + } + } else { + Ok(()) + } + } else { + error!("unexpected container"); + //Err(Error::with_msg_no_trace("unexpected container")) + Ok(()) } - Ok(()) } fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> { use ScalarType::*; + let tsnow = SystemTime::now(); + let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + let bin_len = TsNano(SEC * 10); + let range1 = BinnedRange { + bin_off: ts0 / bin_len.ns(), + bin_cnt: u64::MAX / bin_len.ns() - 10, + bin_len, + }; + let binrange = BinnedRangeEnum::Time(range1); + info!("binrange {binrange:?}"); + let do_time_weight = true; match shape { Shape::Scalar => { type Cont = EventsDim0; match scalar_type { I8 => { - self.events_acc = Box::new(Cont::::empty()); + type ST = i8; + info!("SCALAR {}", std::any::type_name::()); + let cont = Cont::::empty(); + self.events_binner = + Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); + self.events_acc = Box::new(cont); + self.events_acc_push = Box::new(Self::event_acc_push::); + self.events_acc_tick = Box::new(Self::event_acc_tick::); } I16 => { - self.events_acc = Box::new(Cont::::empty()); + type ST = i16; + info!("SCALAR {}", std::any::type_name::()); + let cont = Cont::::empty(); + self.events_binner = + Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); + self.events_acc = Box::new(cont); + self.events_acc_push = Box::new(Self::event_acc_push::); + self.events_acc_tick = Box::new(Self::event_acc_tick::); } I32 => { type ST = i32; - self.events_acc = Box::new(Cont::::empty()); - let f: Box Result<(), Error>> = - Box::new(Self::event_acc_push::); + info!("SCALAR {}", std::any::type_name::()); + let cont = Cont::::empty(); + self.events_binner = + Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); + self.events_acc = Box::new(cont); + self.events_acc_push = Box::new(Self::event_acc_push::); + self.events_acc_tick = Box::new(Self::event_acc_tick::); } F32 => { type ST = f32; - self.events_acc = Box::new(Cont::::empty()); - let f: Box Result<(), Error>> = - Box::new(Self::event_acc_push::); + info!("SCALAR {}", std::any::type_name::()); + let cont = Cont::::empty(); + self.events_binner = + Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); + self.events_acc = Box::new(cont); + self.events_acc_push = Box::new(Self::event_acc_push::); + self.events_acc_tick = Box::new(Self::event_acc_tick::); } F64 => { type ST = f64; - self.events_acc = Box::new(Cont::::empty()); - let f: Box Result<(), Error>> = - Box::new(Self::event_acc_push::); + info!("SCALAR {}", std::any::type_name::()); + let cont = Cont::::empty(); + self.events_binner = + Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight)); + self.events_acc = Box::new(cont); + self.events_acc_push = Box::new(Self::event_acc_push::); + self.events_acc_tick = Box::new(Self::event_acc_tick::); } _ => { warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape); @@ -1088,18 +1180,28 @@ impl CaConn { type Cont = EventsDim1; match scalar_type { I8 => { + type ST = i8; + info!("DIM1 {}", std::any::type_name::()); self.events_acc = Box::new(Cont::::empty()); } I16 => { + type ST = i16; + info!("DIM1 {}", std::any::type_name::()); self.events_acc = Box::new(Cont::::empty()); } I32 => { + type ST = i32; + info!("DIM1 {}", std::any::type_name::()); self.events_acc = Box::new(Cont::::empty()); } F32 => { + type ST = f32; + info!("DIM1 {}", std::any::type_name::()); self.events_acc = Box::new(Cont::::empty()); } F64 => { + type ST = f64; + info!("DIM1 {}", std::any::type_name::()); self.events_acc = Box::new(Cont::::empty()); } _ => { @@ -1284,6 +1386,10 @@ impl CaConn { let item_queue = &mut self.insert_item_queue; let inserts_counter = &mut self.inserts_counter; let extra_inserts_conf = &self.extra_inserts_conf; + { + let (f, acc) = { (&self.events_acc_push, &mut self.events_acc) }; + f(acc, ts, &ev.value)?; + } Self::do_event_insert( st, series, @@ -1769,6 +1875,20 @@ impl CaConn { }; Self::apply_channel_ops_with_res(res) } + + fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> { + let this = self.get_mut(); + let (f, acc, tb, patch_collect) = { + ( + &this.events_acc_tick, + &mut this.events_acc, + this.events_binner.as_mut().unwrap(), + &mut this.patch_collect, + ) + }; + f(acc, tb, patch_collect)?; + Ok(()) + } } impl Stream for CaConn { @@ -1785,6 +1905,14 @@ impl Stream for CaConn { self.poll_channel_info_results(cx); match self.ticker.poll_unpin(cx) { Ready(()) => { + match self.as_mut().handle_own_ticker_tick(cx) { + Ok(_) => {} + Err(e) => { + error!("{e}"); + self.trigger_shutdown(ChannelStatusClosedReason::InternalError); + return Ready(Some(Err(e))); + } + } self.ticker = Box::pin(tokio::time::sleep(Duration::from_millis(500))); cx.waker().wake_by_ref(); } diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 0c75d13..6ae2bd6 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -171,6 +171,32 @@ pub trait GetValHelp { fn get(&self) -> Result<&Self::ScalTy, Error>; } +impl GetValHelp for CaDataValue { + type ScalTy = i8; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + CaDataValue::Scalar(v) => match v { + CaDataScalarValue::I8(v) => Ok(v), + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + }, + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + } + } +} + +impl GetValHelp for CaDataValue { + type ScalTy = i16; + fn get(&self) -> Result<&Self::ScalTy, Error> { + match self { + CaDataValue::Scalar(v) => match v { + CaDataScalarValue::I16(v) => Ok(v), + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + }, + _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")), + } + } +} + impl GetValHelp for CaDataValue { type ScalTy = i32; fn get(&self) -> Result<&Self::ScalTy, Error> { diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs index 92519b3..a0d59e2 100644 --- a/netfetch/src/insertworker.rs +++ b/netfetch/src/insertworker.rs @@ -268,6 +268,7 @@ pub async fn spawn_scylla_insert_workers( } } } + QueryItem::TimeBinPatch => {} } } ingest_commons diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs index f712f98..6d9b7cf 100644 --- a/netfetch/src/netfetch.rs +++ b/netfetch/src/netfetch.rs @@ -11,7 +11,9 @@ pub mod insertworker; pub mod linuxhelper; pub mod metrics; pub mod netbuf; +pub mod patchcollect; pub mod rt; +pub mod scylla; pub mod series; pub mod store; #[cfg(test)] diff --git a/netfetch/src/patchcollect.rs b/netfetch/src/patchcollect.rs new file mode 100644 index 0000000..40b0914 --- /dev/null +++ b/netfetch/src/patchcollect.rs @@ -0,0 +1,99 @@ +use err::Error; +use items_0::collect_s::Collectable; +use items_0::timebin::TimeBinned; +use items_2::merger::Mergeable; +use netpod::log::*; +use netpod::timeunits::SEC; +use netpod::TsNano; + +pub struct PatchCollect { + patch_len: TsNano, + bin_len: TsNano, + coll: Option>, + locked: bool, +} + +impl PatchCollect { + pub fn new(bin_len: TsNano, bin_count: u64) -> Self { + Self { + patch_len: TsNano(bin_len.0 * bin_count), + bin_len, + coll: None, + locked: false, + } + } + + pub fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> { + let coll = self.coll.get_or_insert_with(|| item.empty_like_self_box_time_binned()); + let mut n1 = 0; + loop { + n1 += 1; + if n1 > 20 { + return Err(Error::with_msg_no_trace("patchcollect too many iterations")); + } + info!("ingest loop item len {}", item.len()); + let (ts1s, ts2s) = item.edges_slice(); + let mut discard = false; + let mut emit = false; + let mut i1 = 0; + let mut i3 = item.len(); + let mut brk = false; + for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() { + info!("EDGE {}", ts1 / SEC); + if ts2 % self.patch_len.0 == 0 { + info!("FOUND PATCH END-EDGE at {}", ts1 / SEC); + if self.locked { + info!("LOCKED AND FINAL RECEIVED, EMIT"); + // do: drain into coll i1...i2 + // do: emit! + i3 = i2 + 1; + emit = true; + brk = true; + } else { + info!("NOT LOCKED YET"); + // do: drain into NIRVANA i1...i2 can reset coll. + i3 = i2 + 1; + discard = true; + brk = true; + } + } else if ts1 % self.patch_len.0 == 0 { + info!("FOUND PATCH BEG-EDGE at {}", ts1 / SEC); + if self.locked { + info!("LOCKED"); + if i2 > 0 { + error!("should never happen"); + // ??? do: drain into coll i1..i2 + } else { + // ??? nothing to do + } + } else { + // ??? do: drain into NIRVANA i1..i2 can reset coll. + } + } + if !self.locked && ts1 % self.patch_len.0 == 0 { + info!("NOT LOCKED YET, BUT NOW"); + self.locked = true; + // do: drain into NIRVANA i1..i2 can reset coll. + } + if brk { + break; + } + } + if discard { + item.drain_into_tb(coll.as_mut(), i1..i3)?; + item.reset(); + } else { + } + } + // TODO need some way to know the bin edges of the bins in the container. + // From there, derive a range of bins that I want to use. + // Need some generic way (like Collect) to take out the chosen range of bins + // into the local container which holds the current patch. + // From there, for the first iteration, need a function to convert the + // bin-container into a single standard TimeBins0 or TimeBins1 for everything. + + // PROBLEM need to handle BinsDim0 and BinsXbinDim0. + // Need to use some of those because the retrieval also needs Rust-types to work with them. + Ok(()) + } +} diff --git a/netfetch/src/scylla.rs b/netfetch/src/scylla.rs new file mode 100644 index 0000000..fd630d9 --- /dev/null +++ b/netfetch/src/scylla.rs @@ -0,0 +1,299 @@ +use err::Error; +use netpod::log::*; +use netpod::ScyllaConfig; +use scylla::execution_profile::ExecutionProfileBuilder; +use scylla::statement::Consistency; +use scylla::transport::errors::DbError; +use scylla::transport::errors::QueryError; +use scylla::Session; +use std::sync::Arc; + +pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Error> { + let scy = scylla::SessionBuilder::new() + .known_nodes(&scyconf.hosts) + .use_keyspace(&scyconf.keyspace, true) + .default_execution_profile_handle( + ExecutionProfileBuilder::default() + .consistency(Consistency::LocalOne) + .build() + .into_handle(), + ) + .build() + .await + .map_err(|e| Error::from(format!("{e}")))?; + let scy = Arc::new(scy); + Ok(scy) +} + +async fn check_table_exist(name: &str, scy: &Session) -> Result { + match scy.query(format!("select * from {} limit 1", name), ()).await { + Ok(_) => Ok(true), + Err(e) => match &e { + QueryError::DbError(e2, msg) => match e2 { + DbError::Invalid => { + if msg.contains("unconfigured table") { + Ok(false) + } else { + Err(Error::from(format!("{e}"))) + } + } + _ => Err(Error::from(format!("{e}"))), + }, + _ => Err(Error::from(format!("{e}"))), + }, + } +} + +async fn create_table_ts_msp(scy: &Session) -> Result<(), Error> { + use std::fmt::Write; + // seconds: + let default_time_to_live = 60 * 60 * 5; + // hours: + let twcs_window_index = 24 * 4; + let mut s = String::new(); + s.write_str("create table ts_msp (series bigint, ts_msp bigint, primary key (series, ts_msp))")?; + write!(s, " with default_time_to_live = {}", default_time_to_live)?; + s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy'")?; + s.write_str(", 'compaction_window_unit': 'HOURS'")?; + write!(s, ", 'compaction_window_size': {}", twcs_window_index)?; + s.write_str(" }")?; + scy.query(s, ()).await.map_err(|e| Error::from(format!("{e}")))?; + Ok(()) +} + +struct GenTwcsTab { + name: String, + cql: String, + default_time_to_live: usize, + compaction_window_size: usize, +} + +impl GenTwcsTab { + fn name(&self) -> String { + self.name.clone() + } + + fn cql(&self) -> String { + use std::fmt::Write; + let mut s = String::new(); + write!(s, "create table {}", self.name()).unwrap(); + s.write_str(&self.cql).unwrap(); + write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap(); + s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") + .unwrap(); + write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); + s.write_str(" }").unwrap(); + s + } +} + +struct EvTabDim0 { + sty: String, + cqlsty: String, + // SCYLLA_TTL_EVENTS_DIM0 + default_time_to_live: usize, + // TWCS_WINDOW_0D + compaction_window_size: usize, +} + +impl EvTabDim0 { + fn name(&self) -> String { + format!("events_scalar_{}", self.sty) + } + + fn cql(&self) -> String { + use std::fmt::Write; + let mut s = String::new(); + write!(s, "create table {}", self.name()).unwrap(); + write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap(); + write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap(); + s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") + .unwrap(); + write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); + s.write_str(" }").unwrap(); + s + } +} + +struct EvTabDim1 { + sty: String, + cqlsty: String, + // SCYLLA_TTL_EVENTS_DIM1 + default_time_to_live: usize, + // TWCS_WINDOW_1D + compaction_window_size: usize, +} + +impl EvTabDim1 { + fn name(&self) -> String { + format!("events_array_{}", self.sty) + } + + fn cql(&self) -> String { + use std::fmt::Write; + let mut s = String::new(); + write!(s, "create table {}", self.name()).unwrap(); + write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap(); + write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap(); + s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") + .unwrap(); + write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); + s.write_str(" }").unwrap(); + s + } +} + +async fn check_event_tables(scy: &Session) -> Result<(), Error> { + let stys = [ + "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", + ]; + let cqlstys = [ + "tinyint", "smallint", "int", "bigint", "tinyint", "smallint", "int", "bigint", "float", "double", "boolean", + "text", + ]; + for (sty, cqlsty) in stys.into_iter().zip(cqlstys) { + let desc = EvTabDim0 { + sty: sty.into(), + cqlsty: cqlsty.into(), + // ttl is set in actual data inserts + default_time_to_live: 60 * 60 * 1, + compaction_window_size: 48, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + let desc = EvTabDim1 { + sty: sty.into(), + cqlsty: format!("frozen>", cqlsty), + // ttl is set in actual data inserts + default_time_to_live: 60 * 60 * 1, + compaction_window_size: 12, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + Ok(()) +} + +pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> { + let scy2 = create_session(scyconf).await?; + let scy = &scy2; + if !check_table_exist("ts_msp", &scy).await? { + create_table_ts_msp(scy).await?; + } + check_event_tables(scy).await?; + { + let desc = GenTwcsTab { + name: "series_by_ts_msp".into(), + cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(), + default_time_to_live: 60 * 60 * 5, + compaction_window_size: 24 * 4, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + { + let desc = GenTwcsTab { + name: "connection_status".into(), + cql: "(ts_msp bigint, ts_lsp bigint, kind int, addr text, primary key (ts_msp, ts_lsp))".into(), + default_time_to_live: 60 * 60 * 1, + compaction_window_size: 24 * 4, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + { + let desc = GenTwcsTab { + name: "channel_status".into(), + cql: "(series bigint, ts_msp bigint, ts_lsp bigint, kind int, primary key ((series, ts_msp), ts_lsp))" + .into(), + default_time_to_live: 60 * 60 * 1, + compaction_window_size: 24 * 4, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + { + let desc = GenTwcsTab { + name: "channel_status_by_ts_msp".into(), + cql: "(ts_msp bigint, ts_lsp bigint, series bigint, kind int, primary key (ts_msp, ts_lsp))".into(), + default_time_to_live: 60 * 60 * 1, + compaction_window_size: 24 * 4, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + { + let desc = GenTwcsTab { + name: "channel_ping".into(), + cql: "(part int, ts_msp int, series bigint, ivl float, interest float, evsize int, primary key ((part, ts_msp), series))" + .into(), + default_time_to_live: 60 * 60 * 1, + compaction_window_size: 24 * 4, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + { + let desc = GenTwcsTab { + name: "binned_scalar_f32_v00".into(), + cql: "(series bigint, bin_len_sec int, patch_len_sec int, off_msp bigint, off_lsp bigint, counts frozen>, avgs frozen>, mins frozen>, maxs frozen>, primary key ((series, bin_len_sec, patch_len_sec, off_msp), off_lsp))" + .into(), + default_time_to_live: 60 * 60 * 24 * 30, + compaction_window_size: 24 * 4, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + + { + let desc = GenTwcsTab { + name: "muted".into(), + cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(), + default_time_to_live: 60 * 60 * 4, + compaction_window_size: 24 * 1, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + { + let desc = GenTwcsTab { + name: "item_recv_ivl".into(), + cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(), + default_time_to_live: 60 * 60 * 4, + compaction_window_size: 24 * 1, + }; + if !check_table_exist(&desc.name(), scy).await? { + scy.query(desc.cql(), ()) + .await + .map_err(|e| Error::from(format!("{e}")))?; + } + } + Ok(()) +} diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index a51c304..82d3048 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -301,6 +301,7 @@ pub enum QueryItem { Mute(MuteItem), Ivl(IvlItem), ChannelInfo(ChannelInfoItem), + TimeBinPatch, } pub struct CommonInsertItemQueueSender {