diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index 1e0cd03..6d45f63 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -1567,6 +1567,7 @@ pub async fn run(opts: CaIngestOpts, channels: Vec) -> Result<(), Error>
             index: opts.ttl_index(),
             d0: opts.ttl_d0(),
             d1: opts.ttl_d1(),
+            binned: opts.ttl_binned(),
         },
         test_bsread_addr: opts.test_bsread_addr.clone(),
     };
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 961efc2..5129c37 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -26,6 +26,8 @@ use crate::store::InsertItem;
 use crate::store::IvlItem;
 use crate::store::MuteItem;
 use crate::store::QueryItem;
+use crate::store::TimeBinPatchSimpleF32;
+use crate::timebin::ConnTimeBin;
 use async_channel::Sender;
 use err::Error;
 use futures_util::stream::FuturesUnordered;
@@ -39,6 +41,7 @@ use items_0::Appendable;
 use items_0::Empty;
 use items_0::Events;
 use items_0::Resettable;
+use items_2::binsdim0::BinsDim0;
 use items_2::eventsdim0::EventsDim0;
 use items_2::eventsdim1::EventsDim1;
 use log::*;
@@ -450,12 +453,7 @@ pub struct CaConn {
     series_lookup_futs: FuturesUnordered<
         Pin), Error>> + Send>>,
     >,
-    events_acc: Box<dyn Any + Send>,
-    events_acc_push: Box<dyn Fn(&mut Box<dyn Any + Send>, u64, &CaEventValue) -> Result<(), Error> + Send>,
-    events_acc_tick:
-        Box<dyn Fn(&mut Box<dyn Any + Send>, &mut Box<dyn TimeBinner + Send>, &mut PatchCollect) -> Result<(), Error> + Send>,
-    events_binner: Option<Box<dyn TimeBinner + Send>>,
-    patch_collect: PatchCollect,
+    conn_time_bin: ConnTimeBin,
 }
 
 impl CaConn {
@@ -472,7 +470,7 @@ impl CaConn {
         let (cq_tx, cq_rx) = async_channel::bounded(32);
         Self {
             state: CaConnState::Unconnected,
-            ticker: Box::pin(tokio::time::sleep(Duration::from_millis(500))),
+            ticker: Self::new_self_ticker(),
             proto: None,
             cid_store: CidStore::new(),
             subid_store: SubidStore::new(),
@@ -508,14 +506,14 @@ impl CaConn {
             channel_info_query_tx,
             series_lookup_schedule: BTreeMap::new(),
             series_lookup_futs: FuturesUnordered::new(),
-            events_acc: Box::new(()),
-            events_acc_push: Box::new(Self::event_acc_push::),
-            events_acc_tick: Box::new(Self::event_acc_tick::),
-            events_binner: None,
-            patch_collect: PatchCollect::new(TsNano(SEC * 10), 3),
+            conn_time_bin: ConnTimeBin::empty(),
         }
     }
 
+    fn new_self_ticker() -> Pin<Box<tokio::time::Sleep>> {
+        Box::pin(tokio::time::sleep(Duration::from_millis(1000)))
+    }
+
     pub fn get_channel_set_ops_map(&self) -> Arc {
         self.channel_set_ops.clone()
     }
 
@@ -938,7 +936,7 @@ impl CaConn {
         // TODO handle error better! Transition channel to Error state?
         let scalar_type = ScalarType::from_ca_id(data_type)?;
         let shape = Shape::from_ca_count(data_count)?;
-        self.setup_event_acc(&scalar_type, &shape)?;
+        self.conn_time_bin.setup_for(series.clone(), &scalar_type, &shape)?;
         let subid = self.subid_store.next();
         self.cid_by_subid.insert(subid, cid);
         let name = self.name_by_cid(cid).unwrap().to_string();
@@ -1049,173 +1047,6 @@ impl CaConn {
         }
     }
 
-    fn event_acc_push<STY>(acc: &mut Box<dyn Any + Send>, ts: u64, ev: &CaEventValue) -> Result<(), Error>
-    where
-        STY: ScalarOps,
-        CaDataValue: proto::GetValHelp<STY>,
-    {
-        let v = proto::GetValHelp::<STY>::get(&ev.data)?;
-        if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
-            c.push(ts, 0, v.clone());
-            Ok(())
-        } else {
-            // TODO report once and error out
-            error!("unexpected container");
-            //Err(Error::with_msg_no_trace("unexpected container"))
-            Ok(())
-        }
-    }
-
-    fn event_acc_tick<STY>(
-        acc: &mut Box<dyn Any + Send>,
-        tb: &mut Box<dyn TimeBinner + Send>,
-        patch_collect: &mut PatchCollect,
-    ) -> Result<(), Error>
-    where
-        STY: ScalarOps,
-    {
-        use items_0::WithLen;
-        if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
-            if c.len() >= 1 {
-                //info!("push events len {}", c.len());
-                tb.ingest(c);
-                c.reset();
-                if tb.bins_ready_count() >= 1 {
-                    info!("store bins len {}", tb.bins_ready_count());
-                    if let Some(mut bins) = tb.bins_ready() {
-                        //info!("store bins {bins:?}");
-                        let mut bins = bins.to_simple_bins_f32();
-                        patch_collect.ingest(bins.as_mut())?;
-                        Ok(())
-                    } else {
-                        error!("have bins but none returned");
-                        Err(Error::with_msg_no_trace("have bins but none returned"))
-                    }
-                } else {
-                    Ok(())
-                }
-            } else {
-                Ok(())
-            }
-        } else {
-            error!("unexpected container");
-            //Err(Error::with_msg_no_trace("unexpected container"))
-            Ok(())
-        }
-    }
-
-    fn setup_event_acc(&mut self, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
-        use ScalarType::*;
-        let tsnow = SystemTime::now();
-        let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
-        let bin_len = TsNano(SEC * 10);
-        let range1 = BinnedRange {
-            bin_off: ts0 / bin_len.ns(),
-            bin_cnt: u64::MAX / bin_len.ns() - 10,
-            bin_len,
-        };
-        let binrange = BinnedRangeEnum::Time(range1);
-        info!("binrange {binrange:?}");
-        let do_time_weight = true;
-        match shape {
-            Shape::Scalar => {
-                type Cont<T> = EventsDim0<T>;
-                match scalar_type {
-                    I8 => {
-                        type ST = i8;
-                        info!("SCALAR {}", std::any::type_name::<ST>());
-                        let cont = Cont::<ST>::empty();
-                        self.events_binner =
-                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
-                        self.events_acc = Box::new(cont);
-                        self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
-                        self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
-                    }
-                    I16 => {
-                        type ST = i16;
-                        info!("SCALAR {}", std::any::type_name::<ST>());
-                        let cont = Cont::<ST>::empty();
-                        self.events_binner =
-                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
-                        self.events_acc = Box::new(cont);
-                        self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
-                        self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
-                    }
-                    I32 => {
-                        type ST = i32;
-                        info!("SCALAR {}", std::any::type_name::<ST>());
-                        let cont = Cont::<ST>::empty();
-                        self.events_binner =
-                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
-                        self.events_acc = Box::new(cont);
-                        self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
-                        self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
-                    }
-                    F32 => {
-                        type ST = f32;
-                        info!("SCALAR {}", std::any::type_name::<ST>());
-                        let cont = Cont::<ST>::empty();
-                        self.events_binner =
-                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
-                        self.events_acc = Box::new(cont);
-                        self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
-                        self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
-                    }
-                    F64 => {
-                        type ST = f64;
-                        info!("SCALAR {}", std::any::type_name::<ST>());
-                        let cont = Cont::<ST>::empty();
-                        self.events_binner =
-                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
-                        self.events_acc = Box::new(cont);
-                        self.events_acc_push = Box::new(Self::event_acc_push::<ST>);
-                        self.events_acc_tick = Box::new(Self::event_acc_tick::<ST>);
-                    }
-                    _ => {
-                        warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
-                    }
-                }
-            }
-            Shape::Wave(..) => {
-                type Cont<T> = EventsDim1<T>;
-                match scalar_type {
-                    I8 => {
-                        type ST = i8;
-                        info!("DIM1 {}", std::any::type_name::<ST>());
-                        self.events_acc = Box::new(Cont::<ST>::empty());
-                    }
-                    I16 => {
-                        type ST = i16;
-                        info!("DIM1 {}", std::any::type_name::<ST>());
-                        self.events_acc = Box::new(Cont::<ST>::empty());
-                    }
-                    I32 => {
-                        type ST = i32;
-                        info!("DIM1 {}", std::any::type_name::<ST>());
-                        self.events_acc = Box::new(Cont::<ST>::empty());
-                    }
-                    F32 => {
-                        type ST = f32;
-                        info!("DIM1 {}", std::any::type_name::<ST>());
-                        self.events_acc = Box::new(Cont::<ST>::empty());
-                    }
-                    F64 => {
-                        type ST = f64;
-                        info!("DIM1 {}", std::any::type_name::<ST>());
-                        self.events_acc = Box::new(Cont::<ST>::empty());
-                    }
-                    _ => {
-                        warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
-                    }
-                }
-            }
-            _ => {
-                warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
-            }
-        }
-        Ok(())
-    }
-
     fn event_add_insert(
         st: &mut CreatedState,
         series: SeriesId,
@@ -1386,9 +1217,37 @@ impl CaConn {
         let item_queue = &mut self.insert_item_queue;
         let inserts_counter = &mut self.inserts_counter;
         let extra_inserts_conf = &self.extra_inserts_conf;
-        {
-            let (f, acc) = { (&self.events_acc_push, &mut self.events_acc) };
-            f(acc, ts, &ev.value)?;
+        self.conn_time_bin.push(ts, &ev.value)?;
+        #[cfg(DISABLED)]
+        match &ev.value.data {
+            CaDataValue::Scalar(x) => match &x {
+                proto::CaDataScalarValue::F32(..) => match &scalar_type {
+                    ScalarType::F32 => {}
+                    _ => {
+                        error!("MISMATCH got f32 exp {:?}", scalar_type);
+                    }
+                },
+                proto::CaDataScalarValue::F64(..) => match &scalar_type {
+                    ScalarType::F64 => {}
+                    _ => {
+                        error!("MISMATCH got f64 exp {:?}", scalar_type);
+                    }
+                },
+                proto::CaDataScalarValue::I16(..) => match &scalar_type {
+                    ScalarType::I16 => {}
+                    _ => {
+                        error!("MISMATCH got i16 exp {:?}", scalar_type);
+                    }
+                },
+                proto::CaDataScalarValue::I32(..) => match &scalar_type {
+                    ScalarType::I32 => {}
+                    _ => {
+                        error!("MISMATCH got i32 exp {:?}", scalar_type);
+                    }
+                },
+                _ => {}
+            },
+            _ => {}
         }
         Self::do_event_insert(
             st,
@@ -1878,15 +1737,8 @@ impl CaConn {
     fn handle_own_ticker_tick(self: Pin<&mut Self>, _cx: &mut Context) -> Result<(), Error> {
         let this = self.get_mut();
-        let (f, acc, tb, patch_collect) = {
-            (
-                &this.events_acc_tick,
-                &mut this.events_acc,
-                this.events_binner.as_mut().unwrap(),
-                &mut this.patch_collect,
-            )
-        };
-        f(acc, tb, patch_collect)?;
+        let (obj, insert_item_queue) = { (&mut this.conn_time_bin, &mut this.insert_item_queue) };
+        obj.tick(insert_item_queue)?;
         Ok(())
     }
 }
@@ -1913,7 +1765,7 @@ impl Stream for CaConn {
                         return Ready(Some(Err(e)));
                     }
                 }
-                self.ticker = Box::pin(tokio::time::sleep(Duration::from_millis(500)));
+                self.ticker = Self::new_self_ticker();
                 cx.waker().wake_by_ref();
             }
             Pending => {}
diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs
index 6ae2bd6..582430f 100644
--- a/netfetch/src/ca/proto.rs
+++ b/netfetch/src/ca/proto.rs
@@ -177,9 +177,14 @@ impl GetValHelp<i8> for CaDataValue {
         match self {
             CaDataValue::Scalar(v) => match v {
                 CaDataScalarValue::I8(v) => Ok(v),
-                _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+                _ => {
+                    let ty = std::any::type_name::<i8>();
+                    Err(Error::with_msg_no_trace(format!(
+                        "GetValHelp inner type mismatch {ty} vs {v:?}",
+                    )))
+                }
             },
-            _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+            _ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
         }
     }
 }
@@ -190,9 +195,14 @@ impl GetValHelp<i16> for CaDataValue {
         match self {
             CaDataValue::Scalar(v) => match v {
                 CaDataScalarValue::I16(v) => Ok(v),
-                _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+                _ => {
+                    let ty = std::any::type_name::<i16>();
+                    Err(Error::with_msg_no_trace(format!(
+                        "GetValHelp inner type mismatch {ty} vs {v:?}",
+                    )))
+                }
             },
-            _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+            _ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
         }
     }
 }
@@ -203,9 +213,14 @@ impl GetValHelp<i32> for CaDataValue {
         match self {
             CaDataValue::Scalar(v) => match v {
                 CaDataScalarValue::I32(v) => Ok(v),
-                _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+                _ => {
+                    let ty = std::any::type_name::<i32>();
+                    Err(Error::with_msg_no_trace(format!(
+                        "GetValHelp inner type mismatch {ty} vs {v:?}",
+                    )))
+                }
             },
-            _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+            _ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
         }
     }
 }
@@ -216,9 +231,14 @@ impl GetValHelp<f32> for CaDataValue {
         match self {
             CaDataValue::Scalar(v) => match v {
                 CaDataScalarValue::F32(v) => Ok(v),
-                _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+                _ => {
+                    let ty = std::any::type_name::<f32>();
+                    Err(Error::with_msg_no_trace(format!(
+                        "GetValHelp inner type mismatch {ty} vs {v:?}",
+                    )))
+                }
             },
-            _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+            _ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
         }
     }
 }
@@ -229,9 +249,14 @@ impl GetValHelp<f64> for CaDataValue {
         match self {
             CaDataValue::Scalar(v) => match v {
                 CaDataScalarValue::F64(v) => Ok(v),
-                _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+                _ => {
+                    let ty = std::any::type_name::<f64>();
+                    Err(Error::with_msg_no_trace(format!(
+                        "GetValHelp inner type mismatch {ty} vs {v:?}",
+                    )))
+                }
             },
-            _ => Err(Error::with_msg_no_trace("GetValHelp inner type mismatch")),
+            _ => Err(Error::with_msg_no_trace("GetValHelp waveform not supported")),
         }
     }
 }
diff --git a/netfetch/src/ca/store.rs b/netfetch/src/ca/store.rs
index da2739e..2a1699b 100644
--- a/netfetch/src/ca/store.rs
+++ b/netfetch/src/ca/store.rs
@@ -29,34 +29,10 @@ pub struct DataStore {
     pub qu_insert_channel_status: Arc<PreparedStatement>,
     pub qu_insert_channel_status_by_ts_msp: Arc<PreparedStatement>,
     pub qu_insert_channel_ping: Arc<PreparedStatement>,
+    pub qu_insert_binned_scalar_f32_v01: Arc<PreparedStatement>,
 }
 
 impl DataStore {
-    async fn has_table(name: &str, scy: &ScySession, scyconf: &ScyllaConfig) -> Result<bool, Error> {
-        let mut res = scy
-            .query_iter(
-                "select table_name from system_schema.tables where keyspace_name = ?",
-                (&scyconf.keyspace,),
-            )
-            .await
-            .map_err(|e| e.to_string())
-            .map_err(Error::from)?;
-        while let Some(k) = res.next().await {
-            let row = k.map_err(|e| e.to_string()).map_err(Error::from)?;
-            if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
-                if table_name == name {
-                    return Ok(true);
-                }
-            }
-        }
-        Ok(false)
-    }
-
-    async fn migrate_00(scy: &ScySession, scyconf: &ScyllaConfig) -> Result<(), Error> {
-        if !Self::has_table("somename", scy, scyconf).await? {}
-        Ok(())
-    }
-
     pub async fn new(scyconf: &ScyllaConfig) -> Result<Self, Error> {
         let scy = scylla::SessionBuilder::new()
             .known_nodes(&scyconf.hosts)
@@ -72,8 +48,6 @@ impl DataStore {
             .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
         let scy = Arc::new(scy);
 
-        Self::migrate_00(&scy, scyconf).await?;
-
         let q = scy
             .prepare("insert into ts_msp (series, ts_msp) values (?, ?) using ttl ?")
             .await
@@ -186,6 +160,12 @@ impl DataStore {
             .await
             .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
         let qu_insert_channel_ping = Arc::new(q);
+
+        let q = scy
+            .prepare("insert into binned_scalar_f32_v01 (series, bin_len_sec, bin_count, off_msp, off_lsp, counts, mins, maxs, avgs) values (?, ?, ?, ?, ?, ?, ?, ?, ?) using ttl ?")
+            .await
+            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
+        let qu_insert_binned_scalar_f32_v01 = Arc::new(q);
 
         let ret = Self {
             scy,
             qu_insert_ts_msp,
@@ -208,6 +188,7 @@ impl DataStore {
             qu_insert_channel_status,
             qu_insert_channel_status_by_ts_msp,
             qu_insert_channel_ping,
+            qu_insert_binned_scalar_f32_v01,
         };
         Ok(ret)
     }
diff --git a/netfetch/src/conf.rs b/netfetch/src/conf.rs
index 8d2c71d..b728a0a 100644
--- a/netfetch/src/conf.rs
+++ b/netfetch/src/conf.rs
@@ -40,6 +40,8 @@ pub struct CaIngestOpts {
     ttl_d0: Option<Duration>,
     #[serde(with = "humantime_serde")]
     ttl_d1: Option<Duration>,
+    #[serde(with = "humantime_serde")]
+    ttl_binned: Option<Duration>,
     pub test_bsread_addr: Option,
 }
 
@@ -123,6 +125,12 @@ impl CaIngestOpts {
     pub fn ttl_d1(&self) -> Duration {
         self.ttl_d1.clone().unwrap_or_else(|| Duration::from_secs(60 * 60 * 12))
     }
+
+    pub fn ttl_binned(&self) -> Duration {
+        self.ttl_binned
+            .clone()
+            .unwrap_or_else(|| Duration::from_secs(60 * 60 * 24 * 40))
+    }
 }
 
 #[test]
@@ -130,6 +138,7 @@ fn parse_config_minimal() {
     let conf = r###"
 backend: scylla
 ttl_d1: 10m 3s
+ttl_binned: 70d
 api_bind: "0.0.0.0:3011"
 channels: /some/path/file.txt
 search:
@@ -155,6 +164,7 @@ scylla:
     assert_eq!(conf.search.get(0), Some(&"172.26.0.255".to_string()));
     assert_eq!(conf.scylla.hosts.get(1), Some(&"sf-nube-12:19042".to_string()));
     assert_eq!(conf.ttl_d1, Some(Duration::from_millis(1000 * (60 * 10 + 3) + 45)));
+    assert_eq!(conf.ttl_binned, Some(Duration::from_secs(60 * 60 * 70)));
 }
 
 #[test]
diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs
index a0d59e2..b9585a3 100644
--- a/netfetch/src/insertworker.rs
+++ b/netfetch/src/insertworker.rs
@@ -48,6 +48,7 @@ pub struct Ttls {
     pub index: Duration,
     pub d0: Duration,
     pub d1: Duration,
+    pub binned: Duration,
 }
 
 pub async fn spawn_scylla_insert_workers(
@@ -268,7 +269,36 @@ pub async fn spawn_scylla_insert_workers(
                             }
                         }
                     }
-                    QueryItem::TimeBinPatch => {}
+                    QueryItem::TimeBinPatchSimpleF32(item) => {
+                        info!("have time bin patch to insert: {item:?}");
+                        let params = (
+                            item.series.id() as i64,
+                            item.bin_len_sec as i32,
+                            item.bin_count as i32,
+                            item.off_msp as i32,
+                            item.off_lsp as i32,
+                            item.counts,
+                            item.mins,
+                            item.maxs,
+                            item.avgs,
+                            ttls.binned.as_secs() as i32,
+                        );
+                        let qres = data_store
+                            .scy
+                            .execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
+                            .await;
+                        match qres {
+                            Ok(_) => {
+                                stats.store_worker_insert_binned_done_inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                let e = e.into_simpler();
+                                stats_inc_for_err(&stats, &e);
+                                back_off_sleep(&mut backoff).await;
+                            }
+                        }
+                    }
                 }
             }
             ingest_commons
diff --git a/netfetch/src/netfetch.rs b/netfetch/src/netfetch.rs
index 6d9b7cf..d55b96b 100644
--- a/netfetch/src/netfetch.rs
+++ b/netfetch/src/netfetch.rs
@@ -18,4 +18,5 @@ pub mod series;
 pub mod store;
 #[cfg(test)]
 pub mod test;
+pub mod timebin;
 pub mod zmtp;
diff --git a/netfetch/src/patchcollect.rs b/netfetch/src/patchcollect.rs
index 40b0914..c36ae0d 100644
--- a/netfetch/src/patchcollect.rs
+++ b/netfetch/src/patchcollect.rs
@@ -1,16 +1,18 @@
 use err::Error;
-use items_0::collect_s::Collectable;
 use items_0::timebin::TimeBinned;
-use items_2::merger::Mergeable;
 use netpod::log::*;
 use netpod::timeunits::SEC;
 use netpod::TsNano;
+use std::collections::VecDeque;
+use std::mem;
 
 pub struct PatchCollect {
     patch_len: TsNano,
     bin_len: TsNano,
+    bin_count: u64,
     coll: Option<Box<dyn TimeBinned>>,
     locked: bool,
+    outq: VecDeque<Box<dyn TimeBinned>>,
 }
 
 impl PatchCollect {
@@ -18,82 +20,104 @@ impl PatchCollect {
         Self {
             patch_len: TsNano(bin_len.0 * bin_count),
             bin_len,
+            bin_count,
             coll: None,
             locked: false,
+            outq: VecDeque::new(),
         }
     }
 
+    pub fn patch_len(&self) -> TsNano {
+        self.patch_len.clone()
+    }
+
+    pub fn bin_len(&self) -> TsNano {
+        self.bin_len.clone()
+    }
+
+    pub fn bin_count(&self) -> u64 {
+        self.bin_count
+    }
+
     pub fn ingest(&mut self, item: &mut dyn TimeBinned) -> Result<(), Error> {
-        let coll = self.coll.get_or_insert_with(|| item.empty_like_self_box_time_binned());
         let mut n1 = 0;
+        let mut item_len_exp = item.len();
         loop {
             n1 += 1;
             if n1 > 20 {
                 return Err(Error::with_msg_no_trace("patchcollect too many iterations"));
             }
             info!("ingest loop item len {}", item.len());
+            if item.len() != item_len_exp {
+                return Err(Error::with_msg_no_trace(format!(
+                    "patchcollect item_len_exp mismatch {} vs {}",
+                    item.len(),
+                    item_len_exp
+                )));
+            }
+            if item.len() == 0 {
+                break;
+            }
+            let coll = self.coll.get_or_insert_with(|| item.empty_like_self_box_time_binned());
             let (ts1s, ts2s) = item.edges_slice();
             let mut discard = false;
             let mut emit = false;
-            let mut i1 = 0;
+            let i1 = 0;
             let mut i3 = item.len();
-            let mut brk = false;
             for (i2, (ts1, ts2)) in ts1s.iter().zip(ts2s).enumerate() {
                 info!("EDGE {}", ts1 / SEC);
-                if ts2 % self.patch_len.0 == 0 {
-                    info!("FOUND PATCH END-EDGE at {}", ts1 / SEC);
-                    if self.locked {
-                        info!("LOCKED AND FINAL RECEIVED, EMIT");
-                        // do: drain into coll i1...i2
-                        // do: emit!
+                if self.locked {
+                    if ts2 % self.patch_len.0 == 0 {
+                        info!("FOUND PATCH EDGE-END at {}", ts2 / SEC);
                         i3 = i2 + 1;
                         emit = true;
-                        brk = true;
-                    } else {
-                        info!("NOT LOCKED YET");
-                        // do: drain into NIRVANA i1...i2 can reset coll.
-                        i3 = i2 + 1;
+                    }
+                } else {
+                    if ts1 % self.patch_len.0 == 0 {
+                        info!("FOUND PATCH EDGE-BEG at {}", ts1 / SEC);
+                        self.locked = true;
+                        i3 = i2;
                         discard = true;
-                        brk = true;
+                        break;
                     }
-                } else if ts1 % self.patch_len.0 == 0 {
-                    info!("FOUND PATCH BEG-EDGE at {}", ts1 / SEC);
-                    if self.locked {
-                        info!("LOCKED");
-                        if i2 > 0 {
-                            error!("should never happen");
-                            // ??? do: drain into coll i1..i2
-                        } else {
-                            // ??? nothing to do
-                        }
-                    } else {
-                        // ??? do: drain into NIRVANA i1..i2 can reset coll.
-                    }
-                }
-                if !self.locked && ts1 % self.patch_len.0 == 0 {
-                    info!("NOT LOCKED YET, BUT NOW");
-                    self.locked = true;
-                    // do: drain into NIRVANA i1..i2 can reset coll.
-                }
-                if brk {
-                    break;
                 }
             }
-            if discard {
-                item.drain_into_tb(coll.as_mut(), i1..i3)?;
+            if !self.locked {
+                info!("drain all");
+                item_len_exp = 0;
                 item.reset();
+            } else if discard {
+                let range = i1..i3;
+                info!("discard range-len {}", range.len());
+                item_len_exp -= range.len();
+                item.drain_into_tb(coll.as_mut(), range)?;
+                coll.reset();
+            } else if emit {
+                let range = i1..i3;
+                info!("take and emit range-len {}", range.len());
+                item_len_exp -= range.len();
+                item.drain_into_tb(coll.as_mut(), range)?;
+                if coll.len() != self.bin_count as usize {
+                    error!("PatchCollect bin count mismatch {} vs {}", coll.len(), self.bin_count);
+                }
+                //info!("Patch EMIT {coll:?}");
+                let k = self.coll.take().unwrap();
+                self.outq.push_back(k);
             } else {
+                let range = i1..i3;
+                info!("take all range-len {}", range.len());
+                item_len_exp = 0;
+                item.drain_into_tb(coll.as_mut(), range)?;
             }
         }
-        // TODO need some way to know the bin edges of the bins in the container.
-        // From there, derive a range of bins that I want to use.
-        // Need some generic way (like Collect) to take out the chosen range of bins
-        // into the local container which holds the current patch.
-        // From there, for the first iteration, need a function to convert the
-        // bin-container into a single standard TimeBins0 or TimeBins1 for everything.
-
-        // PROBLEM need to handle BinsDim0 and BinsXbinDim0.
-        // Need to use some of those because the retrieval also needs Rust-types to work with them.
         Ok(())
     }
+
+    pub fn outq_len(&self) -> usize {
+        self.outq.len()
+    }
+
+    pub fn take_outq(&mut self) -> VecDeque<Box<dyn TimeBinned>> {
+        mem::replace(&mut self.outq, VecDeque::new())
+    }
 }
diff --git a/netfetch/src/scylla.rs b/netfetch/src/scylla.rs
index fd630d9..5717990 100644
--- a/netfetch/src/scylla.rs
+++ b/netfetch/src/scylla.rs
@@ -1,4 +1,5 @@
 use err::Error;
+use futures_util::StreamExt;
 use netpod::log::*;
 use netpod::ScyllaConfig;
 use scylla::execution_profile::ExecutionProfileBuilder;
@@ -25,6 +26,29 @@ pub async fn create_session(scyconf: &ScyllaConfig) -> Result, Erro
     Ok(scy)
 }
 
+async fn has_table(name: &str, scy: &Session, scyconf: &ScyllaConfig) -> Result<bool, Error> {
+    let ks = scy
+        .get_keyspace()
+        .ok_or_else(|| Error::with_msg_no_trace("session is not using a keyspace yet"))?;
+    let mut res = scy
+        .query_iter(
+            "select table_name from system_schema.tables where keyspace_name = ?",
+            (ks.as_ref(),),
+        )
+        .await
+        .map_err(|e| e.to_string())
+        .map_err(Error::from)?;
+    while let Some(k) = res.next().await {
+        let row = k.map_err(|e| e.to_string()).map_err(Error::from)?;
+        if let Some(table_name) = row.columns[0].as_ref().unwrap().as_text() {
+            if table_name == name {
+                return Ok(true);
+            }
+        }
+    }
+    Ok(false)
+}
+
 async fn check_table_exist(name: &str, scy: &Session) -> Result<bool, Error> {
     match scy.query(format!("select * from {} limit 1", name), ()).await {
         Ok(_) => Ok(true),
@@ -254,21 +278,6 @@ pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> {
                 .map_err(|e| Error::from(format!("{e}")))?;
         }
     }
-    {
-        let desc = GenTwcsTab {
-            name: "binned_scalar_f32_v00".into(),
-            cql: "(series bigint, bin_len_sec int, patch_len_sec int, off_msp bigint, off_lsp bigint, counts frozen<list<bigint>>, avgs frozen<list<float>>, mins frozen<list<float>>, maxs frozen<list<float>>, primary key ((series, bin_len_sec, patch_len_sec, off_msp), off_lsp))"
-                .into(),
-            default_time_to_live: 60 * 60 * 24 * 30,
-            compaction_window_size: 24 * 4,
-        };
-        if !check_table_exist(&desc.name(), scy).await? {
-            scy.query(desc.cql(), ())
-                .await
-                .map_err(|e| Error::from(format!("{e}")))?;
-        }
-    }
     {
         let desc = GenTwcsTab {
             name: "muted".into(),
@@ -295,5 +304,19 @@ pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> {
                 .map_err(|e| Error::from(format!("{e}")))?;
         }
     }
+    {
+        let desc = GenTwcsTab {
+            name: "binned_scalar_f32_v01".into(),
+            cql: "(series bigint, bin_len_sec int, bin_count int, off_msp int, off_lsp int, counts frozen<list<bigint>>, mins frozen<list<float>>, maxs frozen<list<float>>, avgs frozen<list<float>>, primary key ((series, bin_len_sec, bin_count, off_msp), off_lsp))"
+                .into(),
+            default_time_to_live: 60 * 60 * 24 * 30,
+            compaction_window_size: 24 * 4,
+        };
+        if !check_table_exist(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ())
+                .await
+                .map_err(|e| Error::from(format!("{e}")))?;
+        }
+    }
     Ok(())
 }
diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs
index 82d3048..31f17ee 100644
--- a/netfetch/src/store.rs
+++ b/netfetch/src/store.rs
@@ -4,6 +4,7 @@ use crate::errconv::ErrConv;
 use crate::series::SeriesId;
 use futures_util::Future;
 use futures_util::FutureExt;
+use items_2::binsdim0::BinsDim0;
 use log::*;
 use netpod::ScalarType;
 use netpod::Shape;
@@ -23,6 +24,7 @@ use std::time::Instant;
 use std::time::SystemTime;
 
 pub use netpod::CONNECTION_STATUS_DIV;
+use std::collections::VecDeque;
 
 #[derive(Debug)]
 pub enum Error {
@@ -293,6 +295,19 @@ pub struct ChannelInfoItem {
     pub evsize: u32,
 }
 
+#[derive(Debug)]
+pub struct TimeBinPatchSimpleF32 {
+    pub series: SeriesId,
+    pub bin_len_sec: u32,
+    pub bin_count: u32,
+    pub off_msp: u32,
+    pub off_lsp: u32,
+    pub counts: Vec<i64>,
+    pub mins: Vec<f32>,
+    pub maxs: Vec<f32>,
+    pub avgs: Vec<f32>,
+}
+
 #[derive(Debug)]
 pub enum QueryItem {
     ConnectionStatus(ConnectionStatusItem),
@@ -301,7 +316,7 @@ pub enum QueryItem {
     Mute(MuteItem),
     Ivl(IvlItem),
     ChannelInfo(ChannelInfoItem),
-    TimeBinPatch,
+    TimeBinPatchSimpleF32(TimeBinPatchSimpleF32),
 }
 
 pub struct CommonInsertItemQueueSender {
diff --git a/netfetch/src/timebin.rs b/netfetch/src/timebin.rs
new file mode 100644
index 0000000..4d0f704
--- /dev/null
+++ b/netfetch/src/timebin.rs
@@ -0,0 +1,299 @@
+use crate::ca::proto;
+use crate::ca::proto::CaDataValue;
+use crate::ca::proto::CaEventValue;
+use crate::patchcollect::PatchCollect;
+use crate::series::SeriesId;
+use crate::store::QueryItem;
+use crate::store::TimeBinPatchSimpleF32;
+use err::Error;
+use items_0::scalar_ops::ScalarOps;
+use items_0::timebin::TimeBinner;
+use items_0::Appendable;
+use items_0::Empty;
+use items_0::Events;
+use items_0::Resettable;
+use items_2::binsdim0::BinsDim0;
+use items_2::eventsdim0::EventsDim0;
+use items_2::eventsdim1::EventsDim1;
+use netpod::log::*;
+use netpod::timeunits::SEC;
+use netpod::BinnedRange;
+use netpod::BinnedRangeEnum;
+use netpod::ScalarType;
+use netpod::Shape;
+use netpod::TsNano;
+use std::any;
+use std::any::Any;
+use std::collections::VecDeque;
+use std::time::SystemTime;
+
+struct TickParams<'a> {
+    series: SeriesId,
+    acc: &'a mut Box<dyn Any + Send>,
+    tb: &'a mut Box<dyn TimeBinner + Send>,
+    pc: &'a mut PatchCollect,
+    iiq: &'a mut VecDeque<QueryItem>,
+}
+
+pub struct ConnTimeBin {
+    did_setup: bool,
+    series: SeriesId,
+    acc: Box<dyn Any + Send>,
+    push_fn: Box<dyn Fn(SeriesId, &mut Box<dyn Any + Send>, u64, &CaEventValue) -> Result<(), Error> + Send>,
+    tick_fn: Box<dyn Fn(TickParams) -> Result<(), Error> + Send>,
+    events_binner: Option<Box<dyn TimeBinner + Send>>,
+    patch_collect: PatchCollect,
+}
+
+impl ConnTimeBin {
+    pub fn empty() -> Self {
+        Self {
+            did_setup: false,
+            series: SeriesId::new(0),
+            acc: Box::new(()),
+            push_fn: Box::new(push::),
+            tick_fn: Box::new(tick::),
+            events_binner: None,
+            patch_collect: PatchCollect::new(TsNano(SEC * 60), 1),
+        }
+    }
+
+    pub fn setup_for(&mut self, series: SeriesId, scalar_type: &ScalarType, shape: &Shape) -> Result<(), Error> {
+        use ScalarType::*;
+        self.series = series;
+        let tsnow = SystemTime::now();
+        let ts0 = SEC * tsnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
+        let bin_len = self.patch_collect.bin_len();
+        let range1 = BinnedRange {
+            bin_off: ts0 / bin_len.ns(),
+            bin_cnt: u64::MAX / bin_len.ns() - 10,
+            bin_len,
+        };
+        let binrange = BinnedRangeEnum::Time(range1);
+        //info!("binrange {binrange:?}");
+        let do_time_weight = true;
+        match shape {
+            Shape::Scalar => {
+                type Cont<T> = EventsDim0<T>;
+                match scalar_type {
+                    I8 => {
+                        type ST = i8;
+                        info!("SCALAR {}", any::type_name::<ST>());
+                        let cont = Cont::<ST>::empty();
+                        self.events_binner =
+                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
+                        self.acc = Box::new(cont);
+                        self.push_fn = Box::new(push::<ST>);
+                        self.tick_fn = Box::new(tick::<ST>);
+                        self.did_setup = true;
+                    }
+                    I16 => {
+                        type ST = i16;
+                        info!("SCALAR {}", std::any::type_name::<ST>());
+                        let cont = Cont::<ST>::empty();
+                        self.events_binner =
+                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
+                        self.acc = Box::new(cont);
+                        self.push_fn = Box::new(push::<ST>);
+                        self.tick_fn = Box::new(tick::<ST>);
+                        self.did_setup = true;
+                    }
+                    I32 => {
+                        type ST = i32;
+                        info!("SCALAR {}", std::any::type_name::<ST>());
+                        let cont = Cont::<ST>::empty();
+                        self.events_binner =
+                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
+                        self.acc = Box::new(cont);
+                        self.push_fn = Box::new(push::<ST>);
+                        self.tick_fn = Box::new(tick::<ST>);
+                        self.did_setup = true;
+                    }
+                    F32 => {
+                        type ST = f32;
+                        info!("SCALAR {}", std::any::type_name::<ST>());
+                        let cont = Cont::<ST>::empty();
+                        self.events_binner =
+                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
+                        self.acc = Box::new(cont);
+                        self.push_fn = Box::new(push::<ST>);
+                        self.tick_fn = Box::new(tick::<ST>);
+                        self.did_setup = true;
+                    }
+                    F64 => {
+                        type ST = f64;
+                        info!("SCALAR {}", std::any::type_name::<ST>());
+                        let cont = Cont::<ST>::empty();
+                        self.events_binner =
+                            Some(cont.as_time_binnable_ref().time_binner_new(binrange, do_time_weight));
+                        self.acc = Box::new(cont);
+                        self.push_fn = Box::new(push::<ST>);
+                        self.tick_fn = Box::new(tick::<ST>);
+                        self.did_setup = true;
+                    }
+                    _ => {
+                        warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
+                    }
+                }
+            }
+            Shape::Wave(..) => {
+                //type Cont<T> = EventsDim1<T>;
+                match scalar_type {
+                    _ => {
+                        warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
+                    }
+                }
+            }
+            _ => {
+                warn!("TODO setup_event_acc {:?} {:?}", scalar_type, shape);
+            }
+        }
+        Ok(())
+    }
+
+    pub fn push(&mut self, ts: u64, value: &CaEventValue) -> Result<(), Error> {
+        if !self.did_setup {
+            //return Err(Error::with_msg_no_trace("ConnTimeBin not yet set up"));
+            return Ok(());
+        }
+        let (f, acc) = (&self.push_fn, &mut self.acc);
+        f(self.series.clone(), acc, ts, value)
+    }
+
+    pub fn tick(&mut self, insert_item_queue: &mut VecDeque<QueryItem>) -> Result<(), Error> {
+        if !self.did_setup {
+            return Ok(());
+        }
+        let (f,) = (&self.tick_fn,);
+        let params = TickParams {
+            series: self.series.clone(),
+            acc: &mut self.acc,
+            tb: self.events_binner.as_mut().unwrap(),
+            pc: &mut self.patch_collect,
+            iiq: insert_item_queue,
+        };
+        f(params)
+    }
+}
+
+fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
+    for item in pc.take_outq() {
+        if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
+            let ts0 = if let Some(x) = k.ts1s.front() {
+                *x
+            } else {
+                return Err(Error::with_msg_no_trace("patch contains no bins"));
+            };
+            let off = ts0 / pc.patch_len().0;
+            let off_msp = off / 1000;
+            let off_lsp = off % 1000;
+            let item = TimeBinPatchSimpleF32 {
+                series: series.clone(),
+                bin_len_sec: (pc.bin_len().ns() / SEC) as u32,
+                bin_count: pc.bin_count() as u32,
+                off_msp: off_msp as u32,
+                off_lsp: off_lsp as u32,
+                counts: k.counts.iter().map(|x| *x as i64).collect(),
+                mins: k.mins.iter().map(|x| *x).collect(),
+                maxs: k.maxs.iter().map(|x| *x).collect(),
+                avgs: k.avgs.iter().map(|x| *x).collect(),
+            };
+            let item = QueryItem::TimeBinPatchSimpleF32(item);
+            iiq.push_back(item);
+        } else {
+            error!("unexpected container!");
+            return Err(Error::with_msg_no_trace("timebin store_patch unexpected container"));
+        }
+    }
+    Ok(())
+}
+
+fn push<STY>(series: SeriesId, acc: &mut Box<dyn Any + Send>, ts: u64, ev: &CaEventValue) -> Result<(), Error>
+where
+    STY: ScalarOps,
+    CaDataValue: proto::GetValHelp<STY>,
+{
+    let v = match proto::GetValHelp::<STY>::get(&ev.data) {
+        Ok(x) => x,
+        Err(e) => {
+            let msg = format!(
+                "GetValHelp mismatch: series {:?} STY {} data {:?}",
+                series,
+                any::type_name::<STY>(),
+                ev.data
+            );
+            error!("{msg}");
+            return Err(Error::with_msg_no_trace(msg));
+        }
+    };
+    if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
+        c.push(ts, 0, v.clone());
+        Ok(())
+    } else {
+        // TODO report once and error out
+        error!("unexpected container");
+        //Err(Error::with_msg_no_trace("unexpected container"))
+        Ok(())
+    }
+}
+
+fn tick<STY>(params: TickParams) -> Result<(), Error>
+where
+    STY: ScalarOps,
+{
+    use items_0::WithLen;
+    let acc = params.acc;
+    let tb = params.tb;
+    let pc = params.pc;
+    let iiq = params.iiq;
+    if let Some(c) = acc.downcast_mut::<EventsDim0<STY>>() {
+        if c.len() >= 1 {
+            //info!("push events len {}", c.len());
+            tb.ingest(c);
+            c.reset();
+            if tb.bins_ready_count() >= 1 {
+                info!("store bins len {}", tb.bins_ready_count());
+                if let Some(mut bins) = tb.bins_ready() {
+                    //info!("store bins {bins:?}");
+                    let mut bins = bins.to_simple_bins_f32();
+                    pc.ingest(bins.as_mut())?;
+                    if pc.outq_len() != 0 {
+                        store_patch(params.series.clone(), pc, iiq)?;
+                        for item in pc.take_outq() {
+                            if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
+                                //let off_msp =
+                                let item = TimeBinPatchSimpleF32 {
+                                    series: params.series.clone(),
+                                    bin_len_sec: (pc.bin_len().ns() / SEC) as u32,
+                                    bin_count: pc.bin_count() as u32,
+                                    off_msp: 0,
+                                    off_lsp: 0,
+                                    counts: k.counts.iter().map(|x| *x as i64).collect(),
+                                    mins: k.mins.iter().map(|x| *x).collect(),
+                                    maxs: k.maxs.iter().map(|x| *x).collect(),
+                                    avgs: k.avgs.iter().map(|x| *x).collect(),
+                                };
+                                let item = QueryItem::TimeBinPatchSimpleF32(item);
+                                iiq.push_back(item);
+                            } else {
+                                error!("unexpected container!");
+                            }
+                        }
+                    }
+                    Ok(())
+                } else {
+                    error!("have bins but none returned");
+                    Err(Error::with_msg_no_trace("have bins but none returned"))
+                }
+            } else {
+                Ok(())
+            }
+        } else {
+            Ok(())
+        }
+    } else {
+        error!("unexpected container");
+        //Err(Error::with_msg_no_trace("unexpected container"))
+        Ok(())
+    }
+}
diff --git a/stats/src/stats.rs b/stats/src/stats.rs
index 593105b..2ccdabb 100644
--- a/stats/src/stats.rs
+++ b/stats/src/stats.rs
@@ -226,6 +226,7 @@ stats_proc::stats_struct!((
     store_worker_fraction_drop,
     store_worker_ratelimit_drop,
     store_worker_insert_done,
+    store_worker_insert_binned_done,
     store_worker_insert_overload,
     store_worker_insert_timeout,
     store_worker_insert_unavailable,