From 259504aa65fde8dd287b44edd6d0eebdbb0685f0 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 21 Mar 2024 16:10:50 +0100 Subject: [PATCH] Refactor time type --- daqingest/src/bin/daqingest.rs | 6 +- daqingest/src/daemon.rs | 2 +- daqingest/src/daemon/inserthook.rs | 18 +- daqingest/src/tools.rs | 19 +- ingest-bsread/src/bsreadclient.rs | 17 +- netfetch/src/ca/conn.rs | 78 ++----- netfetch/src/ca/connset.rs | 10 - netfetch/src/ca/proto.rs | 1 - scywr/Cargo.toml | 2 + scywr/src/insertworker.rs | 164 +++++---------- scywr/src/iteminsertqueue.rs | 315 +++++++++++++++++------------ scywr/src/schema.rs | 181 ++++++++++------- scywr/src/session.rs | 10 +- scywr/src/store.rs | 28 +-- series/src/series.rs | 12 +- serieswriter/src/timebin.rs | 3 +- serieswriter/src/writer.rs | 48 ++--- 17 files changed, 465 insertions(+), 449 deletions(-) diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index cefab86..768ef0f 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -58,9 +58,9 @@ async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { name: k.pg_name, }; let scyconf = ScyllaIngestConfig::new([k.scylla_host], k.scylla_keyspace); - scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short) - .await - .map_err(Error::from_string)?; + // scywr::schema::migrate_scylla_data_schema(&scyconf, netpod::ttl::RetentionTime::Short) + // .await + // .map_err(Error::from_string)?; match k.sub { DbSub::Data(u) => { use daqingest::opts::DbDataSub; diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 7a31d14..32c6102 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -572,7 +572,7 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> drop(pg); jh.await?.map_err(Error::from_string)?; - scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), RetentionTime::Short) + scywr::schema::migrate_scylla_data_schema(opts.scylla_config(), 1, true, 
RetentionTime::Short) .await .map_err(Error::from_string)?; diff --git a/daqingest/src/daemon/inserthook.rs b/daqingest/src/daemon/inserthook.rs index f7e7786..51bd10f 100644 --- a/daqingest/src/daemon/inserthook.rs +++ b/daqingest/src/daemon/inserthook.rs @@ -64,11 +64,25 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Send all.sort_unstable(); info!("Active scalar"); for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) { - info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); + info!( + "{:10} {:20} {:14} {:20} {:?}", + usize::MAX - c, + msp.to_u64(), + lsp.ns(), + pulse, + sid + ); } info!("Active wave"); for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) { - info!("{:10} {:20} {:14} {:20} {:?}", usize::MAX - c, msp, lsp, pulse, sid); + info!( + "{:10} {:20} {:14} {:20} {:?}", + usize::MAX - c, + msp.to_u64(), + lsp.ns(), + pulse, + sid + ); } histo.clear(); } diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index 9cb02be..b6b21dc 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -10,6 +10,7 @@ use log::*; use netpod::Database; use netpod::ScalarType; use netpod::Shape; +use netpod::TsMs; use scywr::config::ScyllaIngestConfig; use scywr::scylla::prepared_statement::PreparedStatement; use scywr::scylla::transport::errors::QueryError; @@ -35,8 +36,8 @@ pub async fn remove_older( scyconf: &ScyllaIngestConfig, ) -> Result<(), Error> { let date_cut = parse_date_str(&params.date)?; - let ts_cut = date_to_ts_ns(date_cut); - debug!("chosen date is {:?} {}", date_cut, ts_cut); + let ts_cut = TsMs::from_ns_u64(date_to_ts_ns(date_cut)); + debug!("chosen date is {:?} {:?}", date_cut, ts_cut); let (pg, _) = dbpg::conn::make_pg_client(pgconf).await?; let scy = scywr::session::create_session(scyconf).await?; let sql = concat!( @@ -63,7 +64,7 @@ async fn remove_older_series( series: u64, scalar_type: &ScalarType, shape: &Shape, - ts_cut: u64,
+ ts_cut: TsMs, _pg: &PgClient, scy: &ScySession, ) -> Result<(), Error> { @@ -73,16 +74,16 @@ async fn remove_older_series( let it = scy .query_iter( "select ts_msp from ts_msp where series = ? and ts_msp < ?", - (series as i64, ts_cut as i64), + (series as i64, ts_cut.to_i64()), ) .await?; type RowType = (i64,); let mut it = it.into_typed::<RowType>(); while let Some(e) = it.next().await { let row = e?; - let ts_msp = row.0 as u64; + let ts_msp = row.0; debug!("remove ts_msp {}", ts_msp); - let res = scy.execute(&qu_delete, (series as i64, ts_msp as i64)).await?; + let res = scy.execute(&qu_delete, (series as i64, ts_msp)).await?; { // informative if let Some(rows) = res.rows { @@ -105,9 +106,9 @@ pub async fn find_older_msp( scyconf: &ScyllaIngestConfig, ) -> Result<(), Error> { let date_cut = parse_date_str(&params.date)?; - let ts_cut = date_to_ts_ns(date_cut); - debug!("chosen date is {:?} {}", date_cut, ts_cut); - let (pg, _) = dbpg::conn::make_pg_client(pgconf).await?; + let ts_cut = TsMs::from_ns_u64(date_to_ts_ns(date_cut)); + debug!("chosen date is {:?} {:?}", date_cut, ts_cut); + let (_pg, _jh) = dbpg::conn::make_pg_client(pgconf).await?; let scy = scywr::session::create_session(scyconf).await?; let table_name = &params.table_name; let cql = format!( diff --git a/ingest-bsread/src/bsreadclient.rs b/ingest-bsread/src/bsreadclient.rs index 0d41058..d930d48 100644 --- a/ingest-bsread/src/bsreadclient.rs +++ b/ingest-bsread/src/bsreadclient.rs @@ -17,10 +17,10 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; +use netpod::DtNano; use netpod::ScalarType; use netpod::Shape; -use netpod::TS_MSP_GRID_SPACING; -use netpod::TS_MSP_GRID_UNIT; +use netpod::TsMs; use scywr::iteminsertqueue::ArrayValue; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::InsertItem; @@ -171,8 +171,12 @@ impl BsreadClient { self.inserted_in_ts_msp_count += 1; (self.ts_msp_last, false) }; + if true { + todo!("rework grid
handling"); + } let ts_lsp = ts - ts_msp; - let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; + let ts_msp_grid = 0; + // let ts_msp_grid = (ts / TS_MSP_GRID_UNIT / TS_MSP_GRID_SPACING * TS_MSP_GRID_SPACING) as u32; let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid { self.ts_msp_grid_last = ts_msp_grid; Some(ts_msp_grid) @@ -181,15 +185,14 @@ impl BsreadClient { }; let item = InsertItem { series: series.into(), - ts_msp, - ts_lsp, + ts_msp: TsMs::from_ns_u64(ts_msp), + ts_lsp: DtNano::from_ns(ts_lsp), msp_bump: ts_msp_changed, - ts_msp_grid, pulse, scalar_type, shape, val: DataValue::Array(ArrayValue::Bool(evtset)), - ts_local: ts, + ts_local: err::todoval(), }; let item = QueryItem::Insert(item); match self.insqtx.send(item).await { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 74083d7..49327f3 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -21,6 +21,7 @@ use log::*; use netpod::timeunits::*; use netpod::ScalarType; use netpod::Shape; +use netpod::TsMs; use netpod::TsNano; use netpod::EMIT_ACCOUNTING_SNAP; use proto::CaItem; @@ -32,15 +33,12 @@ use proto::EventAdd; use scywr::iteminsertqueue as scywriiq; use scywr::iteminsertqueue::Accounting; use scywr::iteminsertqueue::DataValue; -use scywriiq::ChannelInfoItem; +use scywr::iteminsertqueue::QueryItem; use scywriiq::ChannelStatus; use scywriiq::ChannelStatusClosedReason; use scywriiq::ChannelStatusItem; use scywriiq::ConnectionStatus; use scywriiq::ConnectionStatusItem; -use scywriiq::IvlItem; -use scywriiq::MuteItem; -use scywriiq::QueryItem; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; @@ -71,6 +69,7 @@ use tokio::net::TcpStream; const CONNECTING_TIMEOUT: Duration = Duration::from_millis(6000); const IOC_PING_IVL: Duration = Duration::from_millis(80000); +const DO_RATE_CHECK: bool = false; #[allow(unused)] macro_rules! 
trace2 { @@ -303,16 +302,14 @@ struct CreatedState { insert_item_ivl_ema: IntervalEma, item_recv_ivl_ema: IntervalEma, insert_recv_ivl_last: Instant, - insert_next_earliest: Instant, muted_before: u32, - insert_ivl_min_mus: u32, info_store_msp_last: u32, recv_count: u64, recv_bytes: u64, stwin_ts: u64, stwin_count: u32, stwin_bytes: u32, - account_emit_last: u64, + account_emit_last: TsMs, account_count: u64, account_bytes: u64, } @@ -334,16 +331,14 @@ impl CreatedState { insert_item_ivl_ema: IntervalEma::new(), item_recv_ivl_ema: IntervalEma::new(), insert_recv_ivl_last: tsnow, - insert_next_earliest: tsnow, muted_before: 0, - insert_ivl_min_mus: 0, info_store_msp_last: 0, recv_count: 0, recv_bytes: 0, stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, - account_emit_last: 0, + account_emit_last: TsMs(0), account_count: 0, account_bytes: 0, } @@ -717,7 +712,6 @@ pub struct CaConn { remote_addr_dbg: SocketAddrV4, local_epics_hostname: String, stats: Arc, - insert_ivl_min_mus: u32, conn_command_tx: Pin>>, conn_command_rx: Pin>>, conn_backoff: f32, @@ -784,7 +778,6 @@ impl CaConn { remote_addr_dbg, local_epics_hostname, stats, - insert_ivl_min_mus: 1000 * 4, conn_command_tx: Box::pin(cq_tx), conn_command_rx: Box::pin(cq_rx), conn_backoff: 0.02, @@ -1262,14 +1255,6 @@ impl CaConn { let msp = info_store_msp_from_time(timenow.clone()); if msp != crst.info_store_msp_last { crst.info_store_msp_last = msp; - let item = QueryItem::ChannelInfo(ChannelInfoItem { - ts_msp: msp, - series: st.writer.sid(), - ivl: crst.item_recv_ivl_ema.ema().ema(), - interest: 0., - evsize: 0, - }); - self.insert_item_queue.push_back(item); } } ChannelState::Error(_) => { @@ -1359,7 +1344,7 @@ impl CaConn { crst.stwin_ts = stwin_ts; crst.stwin_count = 0; } - { + if DO_RATE_CHECK { crst.stwin_count += 1; crst.stwin_bytes += ev.payload_len; if crst.stwin_count > 30000 || crst.stwin_bytes > 1024 * 1024 * 500 { @@ -1587,56 +1572,33 @@ impl CaConn { let ts = value.ts; let ts_diff = ts.abs_diff(ts_local); 
stats.ca_ts_off().ingest((ts_diff / MS) as u32); - if tsnow >= crst.insert_next_earliest { + { { crst.account_count += 1; // TODO how do we account for bytes? Here, we also add 8 bytes for the timestamp. crst.account_bytes += 8 + payload_len as u64; - } - { crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); - let em = crst.insert_item_ivl_ema.ema(); - let ema = em.ema(); - let ivl_min = (crst.insert_ivl_min_mus as f32) * 1e-6; - let dt = (ivl_min - ema).max(0.) / em.k(); - crst.insert_next_earliest = tsnow + Duration::from_micros((dt * 1e6) as u64); } Self::check_ev_value_data(&value.data, writer.scalar_type())?; { let val: DataValue = value.data.into(); writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iiq)?; } - Ok(()) - } else { + } + if false { + // TODO record stats on drop with the new filter stats.channel_fast_item_drop.inc(); - // TODO - if false { + { if tsnow.duration_since(crst.insert_recv_ivl_last) >= Duration::from_millis(10000) { crst.insert_recv_ivl_last = tsnow; let ema = crst.insert_item_ivl_ema.ema(); - let item = IvlItem { - series: series.clone(), - ts, - ema: ema.ema(), - emd: ema.emv().sqrt(), - }; - iiq.push_back(QueryItem::Ivl(item)); - } - if false && crst.muted_before == 0 { - let ema = crst.insert_item_ivl_ema.ema(); - let item = MuteItem { - series: series.clone(), - ts, - ema: ema.ema(), - emd: ema.emv().sqrt(), - }; - iiq.push_back(QueryItem::Mute(item)); } + if crst.muted_before == 0 {} crst.muted_before = 1; } - Ok(()) } + Ok(()) } fn check_ev_value_data(data: &proto::CaDataValue, scalar_type: &ScalarType) -> Result<(), Error> { @@ -1989,16 +1951,14 @@ impl CaConn { insert_item_ivl_ema: IntervalEma::new(), item_recv_ivl_ema: IntervalEma::new(), insert_recv_ivl_last: tsnow, - insert_next_earliest: tsnow, muted_before: 0, - insert_ivl_min_mus: self.insert_ivl_min_mus, info_store_msp_last: info_store_msp_from_time(self.tmp_ts_poll), recv_count: 0, recv_bytes: 0, stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, - 
account_emit_last: 0, + account_emit_last: TsMs::from_ms_u64(0), account_count: 0, account_bytes: 0, }; @@ -2244,15 +2204,15 @@ impl CaConn { fn emit_accounting(&mut self) -> Result<(), Error> { let stnow = self.tmp_ts_poll; - let ts_sec = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); - let ts_sec_snap = ts_sec / EMIT_ACCOUNTING_SNAP * EMIT_ACCOUNTING_SNAP; + let ts = TsMs::from_system_time(stnow); + let (msp, lsp) = ts.to_grid_02(EMIT_ACCOUNTING_SNAP); for (_k, chconf) in self.channels.iter_mut() { let st0 = &mut chconf.state; match st0 { ChannelState::Writable(st1) => { let ch = &mut st1.channel; - if ts_sec_snap != ch.account_emit_last { - ch.account_emit_last = ts_sec_snap; + if ch.account_emit_last != msp { + ch.account_emit_last = msp; if ch.account_count != 0 { let series_id = ch.cssid.id(); let count = ch.account_count as i64; @@ -2261,7 +2221,7 @@ impl CaConn { ch.account_bytes = 0; let item = QueryItem::Accounting(Accounting { part: (series_id & 0xff) as i32, - ts: ts_sec_snap as i64, + ts: msp, series: SeriesId::new(series_id), count, bytes, diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index ba62040..ac91559 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -36,7 +36,6 @@ use log::*; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; -use scywr::iteminsertqueue::ChannelInfoItem; use scywr::iteminsertqueue::ChannelStatusItem; use scywr::iteminsertqueue::QueryItem; use serde::Serialize; @@ -1111,15 +1110,6 @@ impl CaConnSet { } fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> { - if false { - let _ = ChannelInfoItem { - ts_msp: todo!(), - series: todo!(), - ivl: todo!(), - interest: todo!(), - evsize: todo!(), - }; - } let item = QueryItem::ChannelStatus(item); let mut v = VecDeque::new(); v.push_back(item); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 793e785..f52ee52 100644 --- a/netfetch/src/ca/proto.rs +++ 
b/netfetch/src/ca/proto.rs @@ -13,7 +13,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use std::time::Duration; use std::time::Instant; use taskrun::tokio; use tokio::io::AsyncRead; diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 0a08b56..301e13d 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -11,6 +11,7 @@ scylla = "0.11.0" smallvec = "1.11.0" pin-project = "1.1.3" stackfuture = "0.3.0" +bytes = "1.5.0" serde = { version = "1", features = ["derive"] } log = { path = "../log" } stats = { path = "../stats" } @@ -18,3 +19,4 @@ series = { path = "../series" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } +bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 44d7937..15240fc 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -13,24 +13,22 @@ use crate::iteminsertqueue::QueryItem; use crate::iteminsertqueue::TimeBinSimpleF32; use crate::store::DataStore; use async_channel::Receiver; +use atomic::AtomicU64; +use atomic::Ordering; use err::Error; use log::*; -use netpod::timeunits::MS; -use netpod::timeunits::SEC; use netpod::ttl::RetentionTime; +use netpod::TsMs; use smallvec::smallvec; use smallvec::SmallVec; use stats::InsertWorkerStats; use std::collections::VecDeque; use std::sync::atomic; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use std::time::SystemTime; use taskrun::tokio; -use taskrun::tokio::task::JoinHandle; +use tokio::task::JoinHandle; #[allow(unused)] macro_rules! 
trace2 { @@ -53,9 +51,9 @@ fn stats_inc_for_err(stats: &stats::InsertWorkerStats, err: &crate::iteminsertqu Error::DbUnavailable => { stats.db_unavailable().inc(); } - Error::DbError(e) => { - if false { - warn!("db error {e}"); + Error::DbError(_) => { + if true { + warn!("db error {err}"); } stats.db_error().inc(); } @@ -184,25 +182,17 @@ async fn worker( } }, QueryItem::Insert(item) => { - let item_ts_local = item.ts_local.clone(); - let tsnow = { - let ts = SystemTime::now(); - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 - }; - let dt = ((tsnow / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + let item_ts_local = item.ts_local; + let tsnow = TsMs::from_system_time(SystemTime::now()); + let dt = tsnow.to_u64().saturating_sub(item_ts_local.to_u64()) as u32; stats.item_lat_net_worker().ingest(dt); let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire); let do_insert = i1 % 1000 < insert_frac; match insert_item(item, &data_store, do_insert, &stats).await { Ok(_) => { stats.inserted_values().inc(); - let tsnow = { - let ts = SystemTime::now(); - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 - }; - let dt = ((tsnow / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + let tsnow = TsMs::from_system_time(SystemTime::now()); + let dt = tsnow.to_u64().saturating_sub(item_ts_local.to_u64()) as u32; stats.item_lat_net_store().ingest(dt); backoff = backoff_0; } @@ -213,70 +203,6 @@ async fn worker( } i1 += 1; } - QueryItem::Mute(item) => { - let values = ( - (item.series.id() & 0xff) as i32, - item.series.id() as i64, - item.ts as i64, - item.ema, - item.emd, - ); - let qu = err::todoval(); - let qres = data_store.scy.execute(&qu, values).await; - match qres { - Ok(_) => { - stats.inserted_mute().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, 
&crate::iteminsertqueue::Error::QueryError(e)); - back_off_sleep(&mut backoff).await; - } - } - } - QueryItem::Ivl(item) => { - let values = ( - (item.series.id() & 0xff) as i32, - item.series.id() as i64, - item.ts as i64, - item.ema, - item.emd, - ); - let qu = err::todoval(); - let qres = data_store.scy.execute(&qu, values).await; - match qres { - Ok(_) => { - stats.inserted_interval().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e)); - back_off_sleep(&mut backoff).await; - } - } - } - QueryItem::ChannelInfo(item) => { - let params = ( - (item.series.id() & 0xff) as i32, - item.ts_msp as i32, - item.series.id() as i64, - item.ivl, - item.interest, - item.evsize as i32, - ); - let qu = err::todoval(); - let qres = data_store.scy.execute(&qu, params).await; - match qres { - Ok(_) => { - stats.inserted_channel_info().inc(); - backoff = backoff_0; - } - Err(e) => { - stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e)); - back_off_sleep(&mut backoff).await; - } - } - } QueryItem::TimeBinSimpleF32(item) => { info!("have time bin patch to insert: {item:?}"); return Err(Error::with_msg_no_trace("TODO insert item old path")); @@ -310,15 +236,35 @@ async fn worker_streamed( let mut stream = item_inp .map(|batch| { stats.item_recv.inc(); - let tsnow_u64 = { - let ts = SystemTime::now(); - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 - }; + let tsnow = TsMs::from_system_time(SystemTime::now()); let mut res = Vec::with_capacity(32); for item in batch { + if false { + match &item { + QueryItem::ConnectionStatus(_) => { + debug!("execute ConnectionStatus"); + } + QueryItem::ChannelStatus(_) => { + debug!("execute ChannelStatus"); + } + QueryItem::Insert(item) => { + debug!( + "execute Insert {:?} {:?} {:?}", + item.series, + item.ts_msp, + item.val.shape() + ); + } + QueryItem::TimeBinSimpleF32(_) => { + 
debug!("execute TimeBinSimpleF32"); + } + QueryItem::Accounting(_) => { + debug!("execute Accounting"); + } + } + } let futs = match item { - QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow_u64), + QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow), QueryItem::ConnectionStatus(item) => { stats.inserted_connection_status().inc(); let fut = insert_connection_status_fut(item, &data_store, stats.clone()); @@ -328,15 +274,8 @@ async fn worker_streamed( stats.inserted_channel_status().inc(); insert_channel_status_fut(item, &data_store, stats.clone()) } - QueryItem::TimeBinSimpleF32(item) => { - prepare_timebin_insert_futs(item, &data_store, &stats, tsnow_u64) - } - QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow_u64), - _ => { - // TODO - debug!("TODO insert item {item:?}"); - SmallVec::new() - } + QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow), + QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), }; res.extend(futs.into_iter()); } @@ -382,11 +321,11 @@ fn prepare_query_insert_futs( item: InsertItem, data_store: &Arc, stats: &Arc, - tsnow_u64: u64, + tsnow: TsMs, ) -> SmallVec<[InsertFut; 4]> { stats.inserts_value().inc(); let item_ts_local = item.ts_local; - let dt = ((tsnow_u64 / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + let dt = tsnow.to_u64().saturating_sub(item_ts_local.to_u64()) as u32; stats.item_lat_net_worker().ingest(dt); let msp_bump = item.msp_bump; let series = item.series.clone(); @@ -396,6 +335,7 @@ fn prepare_query_insert_futs( let fut = insert_item_fut(item, &data_store, do_insert, stats); futs.push(fut); if msp_bump { + // debug!("execute MSP bump"); stats.inserts_msp().inc(); let fut = insert_msp_fut( series, @@ -414,13 +354,13 @@ fn prepare_timebin_insert_futs( item: TimeBinSimpleF32, data_store: &Arc, stats: 
&Arc, - tsnow_u64: u64, + tsnow: TsMs, ) -> SmallVec<[InsertFut; 4]> { // debug!("have time bin patch to insert: {item:?}"); let params = ( item.series.id() as i64, item.bin_len_ms, - item.ts_msp, + item.ts_msp.to_i64(), item.off, item.count, item.min, @@ -433,7 +373,7 @@ fn prepare_timebin_insert_futs( data_store.scy.clone(), data_store.qu_insert_binned_scalar_f32_v02.clone(), params, - tsnow_u64, + tsnow, stats.clone(), ); let futs = smallvec![fut]; @@ -456,14 +396,20 @@ fn prepare_accounting_insert_futs( item: Accounting, data_store: &Arc, stats: &Arc, - tsnow_u64: u64, + tsnow: TsMs, ) -> SmallVec<[InsertFut; 4]> { - let params = (item.part, item.ts, item.series.id() as i64, item.count, item.bytes); + let params = ( + item.part, + item.ts.to_i64(), + item.series.id() as i64, + item.count, + item.bytes, + ); let fut = InsertFut::new( data_store.scy.clone(), data_store.qu_account_00.clone(), params, - tsnow_u64, + tsnow, stats.clone(), ); let futs = smallvec![fut]; diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index a90688f..e2a270f 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -2,13 +2,15 @@ pub use netpod::CONNECTION_STATUS_DIV; use crate::session::ScySession; use crate::store::DataStore; +use bytes::BufMut; use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::FutureExt; -use netpod::timeunits::SEC; +use netpod::DtNano; use netpod::ScalarType; use netpod::Shape; +use netpod::TsMs; use scylla::frame::value::Value; use scylla::frame::value::ValueList; use scylla::prepared_statement::PreparedStatement; @@ -28,9 +30,7 @@ use std::ptr::NonNull; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use std::time::Duration; use std::time::SystemTime; -use std::time::UNIX_EPOCH; #[derive(Debug, ThisError)] pub enum Error { @@ -55,6 +55,21 @@ pub enum ScalarValue { Bool(bool), } +impl ScalarValue { + pub fn byte_size(&self) -> u32 { + match self { + ScalarValue::I8(_) 
=> 1, + ScalarValue::I16(_) => 2, + ScalarValue::I32(_) => 4, + ScalarValue::F32(_) => 4, + ScalarValue::F64(_) => 8, + ScalarValue::Enum(_) => 2, + ScalarValue::String(x) => x.len() as u32, + ScalarValue::Bool(_) => 1, + } + } +} + #[derive(Clone, Debug)] pub enum ArrayValue { I8(Vec), @@ -65,12 +80,127 @@ pub enum ArrayValue { Bool(Vec), } +impl ArrayValue { + pub fn len(&self) -> usize { + use ArrayValue::*; + match self { + I8(a) => a.len(), + I16(a) => a.len(), + I32(a) => a.len(), + F32(a) => a.len(), + F64(a) => a.len(), + Bool(a) => a.len(), + } + } + + pub fn byte_size(&self) -> u32 { + use ArrayValue::*; + match self { + I8(a) => 1 * a.len() as u32, + I16(a) => 2 * a.len() as u32, + I32(a) => 4 * a.len() as u32, + F32(a) => 4 * a.len() as u32, + F64(a) => 8 * a.len() as u32, + Bool(a) => 1 * a.len() as u32, + } + } + + pub fn to_binary_blob(&self) -> Vec { + use ArrayValue::*; + match self { + I8(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_i8(x); + } + blob + } + I16(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_i16_le(x); + } + blob + } + I32(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_i32_le(x); + } + blob + } + F32(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_f32_le(x); + } + blob + } + F64(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + blob.put_u64_le(0); + } + for &x in a { + blob.put_f64_le(x); + } + blob + } + Bool(a) => { + let n = self.byte_size(); + let mut blob = Vec::with_capacity(32 + n as usize); + for _ in 0..4 { + 
blob.put_u64_le(0); + } + for &x in a { + let x = if x { 1 } else { 0 }; + blob.put_u8(x); + } + blob + } + } + } +} + #[derive(Clone, Debug)] pub enum DataValue { Scalar(ScalarValue), Array(ArrayValue), } +impl DataValue { + pub fn byte_size(&self) -> u32 { + match self { + DataValue::Scalar(x) => x.byte_size(), + DataValue::Array(x) => x.byte_size(), + } + } + + pub fn shape(&self) -> Shape { + match self { + DataValue::Scalar(_) => Shape::Scalar, + DataValue::Array(a) => Shape::Wave(a.len() as u32), + } + } +} + pub trait GetValHelp { type ScalTy: Clone; fn get(&self) -> Result<&Self::ScalTy, Error>; @@ -294,47 +424,21 @@ impl ChannelStatusItem { #[derive(Debug)] pub struct InsertItem { pub series: SeriesId, - pub ts_msp: u64, - pub ts_lsp: u64, + pub ts_msp: TsMs, + pub ts_lsp: DtNano, pub msp_bump: bool, - pub ts_msp_grid: Option, pub pulse: u64, pub scalar_type: ScalarType, pub shape: Shape, pub val: DataValue, - pub ts_local: u64, -} - -#[derive(Debug)] -pub struct MuteItem { - pub series: SeriesId, - pub ts: u64, - pub ema: f32, - pub emd: f32, -} - -#[derive(Debug)] -pub struct IvlItem { - pub series: SeriesId, - pub ts: u64, - pub ema: f32, - pub emd: f32, -} - -#[derive(Debug)] -pub struct ChannelInfoItem { - pub ts_msp: u32, - pub series: SeriesId, - pub ivl: f32, - pub interest: f32, - pub evsize: u32, + pub ts_local: TsMs, } #[derive(Debug)] pub struct TimeBinSimpleF32 { pub series: SeriesId, pub bin_len_ms: i32, - pub ts_msp: i64, + pub ts_msp: TsMs, pub off: i32, pub count: i64, pub min: f32, @@ -347,9 +451,6 @@ pub enum QueryItem { ConnectionStatus(ConnectionStatusItem), ChannelStatus(ChannelStatusItem), Insert(InsertItem), - Mute(MuteItem), - Ivl(IvlItem), - ChannelInfo(ChannelInfoItem), TimeBinSimpleF32(TimeBinSimpleF32), Accounting(Accounting), } @@ -357,17 +458,17 @@ pub enum QueryItem { #[derive(Debug)] pub struct Accounting { pub part: i32, - pub ts: i64, + pub ts: TsMs, pub series: SeriesId, pub count: i64, pub bytes: i64, } struct InsParCom 
{ - series: u64, - ts_msp: u64, - ts_lsp: u64, - ts_local: u64, + series: SeriesId, + ts_msp: TsMs, + ts_lsp: DtNano, + ts_local: TsMs, pulse: u64, do_insert: bool, stats: Arc, @@ -378,23 +479,21 @@ where ST: Value + SerializeCql + Send + 'static, { let params = ( - par.series as i64, - par.ts_msp as i64, - par.ts_lsp as i64, + par.series.to_i64(), + par.ts_msp.to_i64(), + par.ts_lsp.to_i64(), par.pulse as i64, val, ); InsertFut::new(scy, qu, params, par.ts_local, par.stats) } -fn insert_array_gen_fut(par: InsParCom, val: ST, qu: Arc, scy: Arc) -> InsertFut -where - ST: Value + SerializeCql + Send + 'static, -{ +// val: Vec where ST: Value + SerializeCql + Send + 'static, +fn insert_array_gen_fut(par: InsParCom, val: Vec, qu: Arc, scy: Arc) -> InsertFut { let params = ( - par.series as i64, - par.ts_msp as i64, - par.ts_lsp as i64, + par.series.to_i64(), + par.ts_msp.to_i64(), + par.ts_lsp.to_i64(), par.pulse as i64, val, ); @@ -418,20 +517,15 @@ impl InsertFut { qu: Arc, params: V, // timestamp when we first encountered the data to-be inserted, for metrics - tsnet: u64, + tsnet: TsMs, stats: Arc, ) -> Self { let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() }; let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() }; let fut = scy_ref.execute_paged(qu_ref, params, None); let fut = fut.map(move |x| { - let item_ts_local = tsnet; - let tsnow_u64 = { - let ts = SystemTime::now(); - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 - }; - let dt = ((tsnow_u64 / 1000000) as u32).saturating_sub((item_ts_local / 1000000) as u32); + let tsnow = TsMs::from_system_time(SystemTime::now()); + let dt = tsnow.to_u64().saturating_sub(tsnet.to_u64()) as u32; stats.item_lat_net_store().ingest(dt); x }); @@ -461,9 +555,9 @@ where ST: Value + SerializeCql, { let params = ( - par.series as i64, - par.ts_msp as i64, - par.ts_lsp as i64, + par.series.to_i64(), + par.ts_msp.to_i64(), + par.ts_lsp.to_i64(), 
par.pulse as i64, val, ); @@ -497,9 +591,9 @@ where { if par.do_insert { let params = ( - par.series as i64, - par.ts_msp as i64, - par.ts_lsp as i64, + par.series.to_i64(), + par.ts_msp.to_i64(), + par.ts_lsp.to_i64(), par.pulse as i64, val, ); @@ -521,7 +615,7 @@ where } } -// TODO ncurrently not in use, anything to merge? +// TODO currently not in use, anything to merge? pub async fn insert_item( item: InsertItem, data_store: &DataStore, @@ -529,29 +623,15 @@ pub async fn insert_item( stats: &Arc, ) -> Result<(), Error> { if item.msp_bump { - let params = (item.series.id() as i64, item.ts_msp as i64); + let params = (item.series.id() as i64, item.ts_msp.to_i64()); data_store.scy.execute(&data_store.qu_insert_ts_msp, params).await?; stats.inserts_msp().inc(); } - if let Some(ts_msp_grid) = item.ts_msp_grid { - let params = ( - (item.series.id() as i32) & 0xff, - ts_msp_grid as i32, - if item.shape.to_scylla_vec().is_empty() { 0 } else { 1 } as i32, - item.scalar_type.to_scylla_i32(), - item.series.id() as i64, - ); - data_store - .scy - .execute(&data_store.qu_insert_series_by_ts_msp, params) - .await?; - stats.inserts_msp_grid().inc(); - } use DataValue::*; match item.val { Scalar(val) => { let par = InsParCom { - series: item.series.id(), + series: item.series, ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, ts_local: item.ts_local, @@ -573,7 +653,7 @@ pub async fn insert_item( } Array(val) => { let par = InsParCom { - series: item.series.id(), + series: item.series, ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, ts_local: item.ts_local, @@ -581,6 +661,7 @@ pub async fn insert_item( do_insert, stats: stats.clone(), }; + err::todo(); use ArrayValue::*; match val { I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?, @@ -598,14 +679,14 @@ pub async fn insert_item( pub fn insert_msp_fut( series: SeriesId, - ts_msp: u64, + ts_msp: TsMs, // for stats, the timestamp when we received that data - tsnet: u64, + tsnet: TsMs, scy: Arc, qu: Arc, 
stats: Arc, ) -> InsertFut { - let params = (series.id() as i64, ts_msp as i64); + let params = (series.to_i64(), ts_msp.to_i64()); InsertFut::new(scy, qu, params, tsnet, stats) } @@ -620,7 +701,7 @@ pub fn insert_item_fut( match item.val { Scalar(val) => { let par = InsParCom { - series: item.series.id(), + series: item.series, ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, ts_local: item.ts_local, @@ -642,7 +723,7 @@ pub fn insert_item_fut( } Array(val) => { let par = InsParCom { - series: item.series.id(), + series: item.series, ts_msp: item.ts_msp, ts_lsp: item.ts_lsp, ts_local: item.ts_local, @@ -651,13 +732,15 @@ pub fn insert_item_fut( stats: stats.clone(), }; use ArrayValue::*; + let blob = val.to_binary_blob(); + #[allow(unused)] match val { - I8(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i8.clone(), scy), - I16(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i16.clone(), scy), - I32(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_i32.clone(), scy), - F32(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_f32.clone(), scy), - F64(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_f64.clone(), scy), - Bool(val) => insert_array_gen_fut(par, val, data_store.qu_insert_array_bool.clone(), scy), + I8(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i8.clone(), scy), + I16(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i16.clone(), scy), + I32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_i32.clone(), scy), + F32(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f32.clone(), scy), + F64(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_f64.clone(), scy), + Bool(val) => insert_array_gen_fut(par, blob, data_store.qu_insert_array_bool.clone(), scy), } } } @@ -668,17 +751,13 @@ pub fn insert_connection_status_fut( data_store: &DataStore, stats: Arc, ) -> InsertFut { - let tsunix = 
item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); - let secs = tsunix.as_secs() * SEC; - let nanos = tsunix.subsec_nanos() as u64; - let ts = secs + nanos; - let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; - let ts_lsp = ts - ts_msp; + let ts = TsMs::from_system_time(item.ts); + let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); // TODO is that the good tsnet to use? let tsnet = ts; let kind = item.status.to_kind(); let addr = format!("{}", item.addr); - let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr); + let params = (msp.to_i64(), lsp.to_i64(), kind as i32, addr); InsertFut::new( data_store.scy.clone(), data_store.qu_insert_connection_status.clone(), @@ -693,16 +772,12 @@ pub fn insert_channel_status_fut( data_store: &DataStore, stats: Arc, ) -> SmallVec<[InsertFut; 4]> { - let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); - let secs = tsunix.as_secs() * SEC; - let nanos = tsunix.subsec_nanos() as u64; - let ts = secs + nanos; - let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; - let ts_lsp = ts - ts_msp; + let ts = TsMs::from_system_time(item.ts); + let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); let tsnet = ts; let kind = item.status.to_kind(); let cssid = item.cssid.id(); - let params = (cssid as i64, ts_msp as i64, ts_lsp as i64, kind as i32); + let params = (cssid as i64, msp.to_i64(), lsp.to_i64(), kind as i32); let fut1 = InsertFut::new( data_store.scy.clone(), data_store.qu_insert_channel_status.clone(), @@ -710,7 +785,7 @@ pub fn insert_channel_status_fut( tsnet, stats.clone(), ); - let params = (ts_msp as i64, ts_lsp as i64, cssid as i64, kind as i32); + let params = (msp.to_i64(), lsp.to_i64(), cssid as i64, kind as i32); let fut2 = InsertFut::new( data_store.scy.clone(), data_store.qu_insert_channel_status_by_ts_msp.clone(), @@ -722,15 +797,11 @@ pub fn insert_channel_status_fut( } pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: 
&DataStore) -> Result<(), Error> { - let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); - let secs = tsunix.as_secs() * SEC; - let nanos = tsunix.subsec_nanos() as u64; - let ts = secs + nanos; - let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; - let ts_lsp = ts - ts_msp; + let ts = TsMs::from_system_time(item.ts); + let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); let kind = item.status.to_kind(); let addr = format!("{}", item.addr); - let params = (ts_msp as i64, ts_lsp as i64, kind as i32, addr); + let params = (msp.to_i64(), lsp.to_i64(), kind as i32, addr); data_store .scy .execute(&data_store.qu_insert_connection_status, params) @@ -739,20 +810,16 @@ pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &D } pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> { - let tsunix = item.ts.duration_since(UNIX_EPOCH).unwrap_or(Duration::ZERO); - let secs = tsunix.as_secs() * SEC; - let nanos = tsunix.subsec_nanos() as u64; - let ts = secs + nanos; - let ts_msp = ts / CONNECTION_STATUS_DIV * CONNECTION_STATUS_DIV; - let ts_lsp = ts - ts_msp; + let ts = TsMs::from_system_time(item.ts); + let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); let kind = item.status.to_kind(); let cssid = item.cssid.id(); - let params = (cssid as i64, ts_msp as i64, ts_lsp as i64, kind as i32); + let params = (cssid as i64, msp.to_i64(), lsp.to_i64(), kind as i32); data_store .scy .execute(&data_store.qu_insert_channel_status, params) .await?; - let params = (ts_msp as i64, ts_lsp as i64, cssid as i64, kind as i32); + let params = (msp.to_i64(), lsp.to_i64(), cssid as i64, kind as i32); data_store .scy .execute(&data_store.qu_insert_channel_status_by_ts_msp, params) diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 7b2ed1a..0a059f1 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -21,6 +21,7 @@ pub enum Error { NewSession(String), 
ScyllaNextRow(#[from] NextRowError), MissingData, + AddColumnImpossible, } impl From for Error { @@ -110,6 +111,7 @@ fn ddays(x: u64) -> Duration { } struct GenTwcsTab { + keyspace: String, name: String, col_names: Vec, col_types: Vec, @@ -121,11 +123,8 @@ struct GenTwcsTab { } impl GenTwcsTab { - // name: "series_by_ts_msp".into(), - // cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(), - // default_time_to_live: 60 * 60 * 5, - // compaction_window_size: 24 * 4, - pub fn new<'a, PRE, N, CI, A, B, I2, I2A, I3, I3A>( + pub fn new<'a, KS, PRE, N, CI, A, B, I2, I2A, I3, I3A>( + keyspace: KS, pre: PRE, name: N, cols: CI, @@ -134,6 +133,7 @@ impl GenTwcsTab { default_time_to_live: Duration, ) -> Self where + KS: Into, PRE: AsRef, N: AsRef, CI: IntoIterator, @@ -146,6 +146,7 @@ impl GenTwcsTab { I3A: Into, { Self::new_inner( + keyspace.into(), pre.as_ref(), name.as_ref(), cols, @@ -157,6 +158,7 @@ impl GenTwcsTab { } fn new_inner<'a, CI, A, B, I2, I2A, I3, I3A>( + keyspace: String, pre: &str, name: &str, cols: CI, @@ -181,6 +183,7 @@ impl GenTwcsTab { col_types.push(b.as_ref().into()); }); Self { + keyspace, name: format!("{}{}", pre, name), col_names, col_types, @@ -192,6 +195,10 @@ impl GenTwcsTab { } } + fn keyspace(&self) -> &str { + &self.keyspace + } + fn name(&self) -> &str { &self.name } @@ -199,6 +206,7 @@ impl GenTwcsTab { async fn setup(&self, scy: &ScySession) -> Result<(), Error> { self.create_if_missing(scy).await?; self.check_table_options(scy).await?; + self.check_columns(scy).await?; Ok(()) } @@ -271,9 +279,7 @@ impl GenTwcsTab { "select default_time_to_live, gc_grace_seconds, compaction", " from system_schema.tables where keyspace_name = ? and table_name = ?" 
); - let x = scy - .query_iter(cql, (scy.get_keyspace().unwrap().as_ref(), &self.name())) - .await?; + let x = scy.query_iter(cql, (self.keyspace(), self.name())).await?; let mut it = x.into_typed::<(i32, i32, BTreeMap)>(); let mut rows = Vec::new(); while let Some(u) = it.next().await { @@ -281,23 +287,15 @@ impl GenTwcsTab { rows.push((row.0 as u64, row.1 as u64, row.2)); } if let Some(row) = rows.get(0) { + let mut set_opts = Vec::new(); if row.0 != self.default_time_to_live.as_secs() { - let cql = format!( - concat!("alter table {} with default_time_to_live = {}"), - self.name(), + set_opts.push(format!( + "default_time_to_live = {}", self.default_time_to_live.as_secs() - ); - debug!("{cql}"); - scy.query(cql, ()).await?; + )); } if row.1 != self.gc_grace.as_secs() { - let cql = format!( - concat!("alter table {} with gc_grace_seconds = {}"), - self.name(), - self.gc_grace.as_secs() - ); - debug!("{cql}"); - scy.query(cql, ()).await?; + set_opts.push(format!("gc_grace_seconds = {}", self.gc_grace.as_secs())); } if row.2 != self.compaction_options() { let params: Vec<_> = self @@ -306,11 +304,10 @@ impl GenTwcsTab { .map(|(k, v)| format!("'{k}': '{v}'")) .collect(); let params = params.join(", "); - let cql = format!( - concat!("alter table {} with compaction = {{ {} }}"), - self.name(), - params - ); + set_opts.push(format!("compaction = {{ {} }}", params)); + } + if set_opts.len() != 0 { + let cql = format!(concat!("alter table {} with {}"), self.name(), set_opts.join(" and ")); debug!("{cql}"); scy.query(cql, ()).await?; } @@ -320,27 +317,87 @@ impl GenTwcsTab { } Ok(()) } + + async fn check_columns(&self, scy: &ScySession) -> Result<(), Error> { + let cql = concat!( + "select column_name, type from system_schema.columns", + " where keyspace_name = ?", + " and table_name = ?", + ); + let mut it = scy + .query_iter(cql, (self.keyspace(), self.name())) + .await? 
+ .into_typed::<(String, String)>(); + let mut names_exist = Vec::new(); + let mut types_exist = Vec::new(); + while let Some(x) = it.next().await { + let row = x?; + names_exist.push(row.0); + types_exist.push(row.1); + } + debug!("names_exist {:?} types_exist {:?}", names_exist, types_exist); + for (cn, ct) in self.col_names.iter().zip(self.col_types.iter()) { + if names_exist.contains(cn) { + let i = names_exist.binary_search(cn).unwrap(); + let ty2 = types_exist.get(i).unwrap(); + if ct != ty2 { + error!( + "type mismatch for existing column {} {} {} {}", + self.name(), + cn, + ct, + ty2 + ); + return Err(Error::AddColumnImpossible); + } + } else { + if self.partition_keys.contains(cn) { + error!("pk {} {}", cn, ct); + return Err(Error::AddColumnImpossible); + } + if self.cluster_keys.contains(cn) { + error!("ck {} {}", cn, ct); + return Err(Error::AddColumnImpossible); + } + self.add_column(cn, ct, scy).await?; + } + } + Ok(()) + } + + async fn add_column(&self, name: &str, ty: &str, scy: &ScySession) -> Result<(), Error> { + let cql = format!(concat!("alter table {} add {} {}"), self.name(), name, ty); + debug!("NOTE add_column CQL {}", cql); + scy.query(cql, ()).await?; + Ok(()) + } } #[allow(unused)] async fn get_columns(keyspace: &str, table: &str, scy: &ScySession) -> Result, Error> { let mut ret = Vec::new(); - let cql = "select column_name, kind, type from system_schema.columns where keyspace_name = ? and table_name = ?"; - let params = (keyspace, table); - let mut res = scy.query_iter(cql, params).await?; - while let Some(row) = res.next().await { + // kind (text) can be one of: "regular", "clustering", "partition_key". + // clustering_order (text) can be one of: "NONE", "ASC", "DESC". + // type (text) examples: "bigint", "frozen>", etc. + let cql = concat!( + "select column_name, clustering_order, kind, position, type", + " from system_schema.columns where keyspace_name = ? and table_name = ?" 
+ ); + let mut it = scy + .query_iter(cql, (keyspace, table)) + .await? + .into_typed::<(String, String, String, i32, String)>(); + while let Some(x) = it.next().await { + let row = x?; // columns: - // kind (text): regular, clustering, partition_key. // column_name (text) // type (text): text, blob, int, ... - let row = row?; - let name = row.columns[0].as_ref().unwrap().as_text().unwrap(); - ret.push(name.into()); + ret.push(row.0); } Ok(ret) } -async fn check_event_tables(rett: RetentionTime, scy: &ScySession) -> Result<(), Error> { +async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySession) -> Result<(), Error> { let stys = [ "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", ]; @@ -351,6 +408,7 @@ async fn check_event_tables(rett: RetentionTime, scy: &ScySession) -> Result<(), for (sty, cqlsty) in stys.into_iter().zip(cqlstys) { { let tab = GenTwcsTab::new( + keyspace, rett.table_prefix(), format!("events_scalar_{}", sty), &[ @@ -368,6 +426,7 @@ async fn check_event_tables(rett: RetentionTime, scy: &ScySession) -> Result<(), } { let tab = GenTwcsTab::new( + keyspace, rett.table_prefix(), format!("events_array_{}", sty), &[ @@ -376,6 +435,7 @@ async fn check_event_tables(rett: RetentionTime, scy: &ScySession) -> Result<(), ("ts_lsp", "bigint"), ("pulse", "bigint"), ("value", &format!("frozen>", cqlsty)), + ("valueblob", "blob"), ], ["series", "ts_msp"], ["ts_lsp"], @@ -387,13 +447,16 @@ async fn check_event_tables(rett: RetentionTime, scy: &ScySession) -> Result<(), Ok(()) } -pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: RetentionTime) -> Result<(), Error> { +pub async fn migrate_scylla_data_schema( + scyconf: &ScyllaIngestConfig, + replication: u32, + durable: bool, + rett: RetentionTime, +) -> Result<(), Error> { let scy2 = create_session_no_ks(scyconf).await?; let scy = &scy2; if !has_keyspace(scyconf.keyspace(), scy).await? 
{ - let replication = 3; - let durable = false; let cql = format!( concat!( "create keyspace {}", @@ -409,51 +472,27 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete info!("keyspace created"); } - scy.use_keyspace(scyconf.keyspace(), true).await?; + let ks = scyconf.keyspace(); - check_event_tables(rett.clone(), scy).await?; + scy.use_keyspace(ks, true).await?; + + check_event_tables(ks, rett.clone(), scy).await?; { let tab = GenTwcsTab::new( + ks, rett.table_prefix(), "ts_msp", &[("series", "bigint"), ("ts_msp", "bigint")], ["series"], - ["ts_msp_ms"], + ["ts_msp"], rett.ttl_ts_msp(), ); tab.setup(scy).await?; } { let tab = GenTwcsTab::new( - rett.table_prefix(), - "ts_msp_ms", - &[("series", "bigint"), ("ts_msp_ms", "bigint")], - ["series"], - ["ts_msp_ms"], - rett.ttl_ts_msp(), - ); - tab.setup(scy).await?; - } - { - let tab = GenTwcsTab::new( - rett.table_prefix(), - "series_by_ts_msp", - &[ - ("part", "int"), - ("ts_msp", "int"), - ("shape_kind", "int"), - ("scalar_type", "int"), - ("series", "bigint"), - ], - ["part", "ts_msp", "shape_kind", "scalar_type"], - ["series"], - dhours(5), - ); - tab.setup(scy).await?; - } - { - let tab = GenTwcsTab::new( + ks, rett.table_prefix(), "connection_status", &[ @@ -470,6 +509,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete } { let tab = GenTwcsTab::new( + ks, rett.table_prefix(), "channel_status", &[ @@ -486,6 +526,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete } { let tab = GenTwcsTab::new( + ks, rett.table_prefix(), "channel_status_by_ts_msp", &[ @@ -502,6 +543,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete } { let tab = GenTwcsTab::new( + ks, rett.table_prefix(), "binned_scalar_f32", &[ @@ -522,6 +564,7 @@ pub async fn migrate_scylla_data_schema(scyconf: &ScyllaIngestConfig, rett: Rete } { let tab = GenTwcsTab::new( + ks, rett.table_prefix(), "account_00", &[ diff --git 
a/scywr/src/session.rs b/scywr/src/session.rs index d6d7d34..b66491d 100644 --- a/scywr/src/session.rs +++ b/scywr/src/session.rs @@ -7,7 +7,6 @@ use err::ThisError; use scylla::execution_profile::ExecutionProfileBuilder; use scylla::statement::Consistency; use scylla::transport::errors::NewSessionError; -use std::num::NonZeroUsize; use std::sync::Arc; #[derive(Debug, ThisError)] @@ -22,18 +21,19 @@ impl From for Error { } pub async fn create_session_no_ks(scyconf: &ScyllaIngestConfig) -> Result, Error> { + use scylla::transport::session::PoolSize; + use scylla::transport::session_builder::GenericSessionBuilder; let profile = ExecutionProfileBuilder::default() .consistency(Consistency::LocalOne) .build() .into_handle(); - let scy = scylla::transport::session_builder::GenericSessionBuilder::new() - .pool_size(scylla::transport::session::PoolSize::PerShard( - NonZeroUsize::new(1).unwrap(), - )) + let scy = GenericSessionBuilder::new() + .pool_size(PoolSize::default()) .known_nodes(scyconf.hosts()) .default_execution_profile_handle(profile) .write_coalescing(true) .compression(None) + // .compression(Some(scylla::frame::Compression::Snappy)) .build() .await?; let scy = Arc::new(scy); diff --git a/scywr/src/store.rs b/scywr/src/store.rs index 37fe1ee..fe16e28 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -20,7 +20,6 @@ pub struct DataStore { pub rett: RetentionTime, pub scy: Arc, pub qu_insert_ts_msp: Arc, - pub qu_insert_series_by_ts_msp: Arc, pub qu_insert_scalar_i8: Arc, pub qu_insert_scalar_i16: Arc, pub qu_insert_scalar_i32: Arc, @@ -71,18 +70,6 @@ impl DataStore { .await?; let qu_insert_ts_msp = Arc::new(q); - let cql = format!( - concat!( - "insert into {}{}", - " (part, ts_msp, shape_kind, scalar_type, series)", - " values (?, ?, ?, ?, ?)" - ), - rett.table_prefix(), - "series_by_ts_msp" - ); - let q = scy.prepare(cql).await?; - let qu_insert_series_by_ts_msp = Arc::new(q); - let qu_insert_scalar_i8 = prep_qu_ins_a!("events_scalar_i8", rett, scy); 
let qu_insert_scalar_i16 = prep_qu_ins_a!("events_scalar_i16", rett, scy); let qu_insert_scalar_i32 = prep_qu_ins_a!("events_scalar_i32", rett, scy); @@ -93,31 +80,31 @@ impl DataStore { let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); // array - let cql = "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; + let cql = "insert into events_array_i8 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i8 = Arc::new(q); - let cql = "insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; + let cql = "insert into events_array_i16 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i16 = Arc::new(q); - let cql = "insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; + let cql = "insert into events_array_i32 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i32 = Arc::new(q); - let cql = "insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; + let cql = "insert into events_array_i64 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_i64 = Arc::new(q); - let cql = "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; + let cql = "insert into events_array_f32 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_f32 = Arc::new(q); - let cql = "insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; + let cql = "insert into events_array_f64 (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_f64 = 
Arc::new(q); - let cql = "insert into events_array_bool (series, ts_msp, ts_lsp, pulse, value) values (?, ?, ?, ?, ?)"; + let cql = "insert into events_array_bool (series, ts_msp, ts_lsp, pulse, valueblob) values (?, ?, ?, ?, ?)"; let q = scy.prepare(cql).await?; let qu_insert_array_bool = Arc::new(q); @@ -154,7 +141,6 @@ impl DataStore { rett, scy, qu_insert_ts_msp, - qu_insert_series_by_ts_msp, qu_insert_scalar_i8, qu_insert_scalar_i16, qu_insert_scalar_i32, diff --git a/series/src/series.rs b/series/src/series.rs index 1bb7e6d..a6958e8 100644 --- a/series/src/series.rs +++ b/series/src/series.rs @@ -17,7 +17,7 @@ impl Existence { } } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] pub struct SeriesId(u64); impl SeriesId { @@ -28,9 +28,13 @@ impl SeriesId { pub fn id(&self) -> u64 { self.0 } + + pub fn to_i64(&self) -> i64 { + self.0 as i64 + } } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize, Deserialize)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] pub struct ChannelStatusSeriesId(u64); impl ChannelStatusSeriesId { @@ -41,4 +45,8 @@ impl ChannelStatusSeriesId { pub fn id(&self) -> u64 { self.0 } + + pub fn to_i64(&self) -> i64 { + self.0 as i64 + } } diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index 0f079f0..ae6cc16 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -19,6 +19,7 @@ use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::ScalarType; use netpod::Shape; +use netpod::TsMs; use netpod::TsNano; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::GetValHelp; @@ -374,7 +375,7 @@ fn store_bins( let item = TimeBinSimpleF32 { series: series.clone(), bin_len_ms: bin_len_ms as i32, - ts_msp: ts_msp as i64, + ts_msp: TsMs::from_ms_u64(ts_msp), off: off as i32, count: count as i64, min, diff 
--git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index a9b4cb2..e97d6e1 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -13,8 +13,6 @@ use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; -use netpod::TS_MSP_GRID_SPACING; -use netpod::TS_MSP_GRID_UNIT; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::InsertItem; use scywr::iteminsertqueue::QueryItem; @@ -61,7 +59,9 @@ pub struct SeriesWriter { shape: Shape, ts_msp_last: Option, inserted_in_current_msp: u32, + bytes_in_current_msp: u32, msp_max_entries: u32, + msp_max_bytes: u32, // TODO this should be in an Option: ts_msp_grid_last: u32, binner: ConnTimeBin, @@ -121,7 +121,9 @@ impl SeriesWriter { shape, ts_msp_last: None, inserted_in_current_msp: 0, + bytes_in_current_msp: 0, msp_max_entries: 64000, + msp_max_bytes: 1024 * 1024 * 20, ts_msp_grid_last: 0, binner, }; @@ -154,54 +156,48 @@ impl SeriesWriter { // As long as one writer is active, the msp is arbitrary. 
// Maximum resolution of the ts msp: - let msp_res_max = SEC * 10; + let msp_res_max = SEC * 2; - let (ts_msp, ts_msp_changed) = match self.ts_msp_last.clone() { + let (ts_msp, ts_msp_changed) = match self.ts_msp_last { Some(ts_msp_last) => { - if self.inserted_in_current_msp >= self.msp_max_entries || ts_msp_last.clone().add_ns(HOUR) <= ts { - let ts_msp = ts.clone().div(msp_res_max).mul(msp_res_max); + if self.inserted_in_current_msp >= self.msp_max_entries + || self.bytes_in_current_msp >= self.msp_max_bytes + || ts_msp_last.add_ns(HOUR) <= ts + { + let ts_msp = ts.div(msp_res_max).mul(msp_res_max); if ts_msp == ts_msp_last { (ts_msp, false) } else { - self.ts_msp_last = Some(ts_msp.clone()); + self.ts_msp_last = Some(ts_msp); self.inserted_in_current_msp = 1; + self.bytes_in_current_msp = val.byte_size(); (ts_msp, true) } } else { self.inserted_in_current_msp += 1; + self.bytes_in_current_msp += val.byte_size(); (ts_msp_last, false) } } None => { - let ts_msp = ts.clone().div(msp_res_max).mul(msp_res_max); - self.ts_msp_last = Some(ts_msp.clone()); + let ts_msp = ts.div(msp_res_max).mul(msp_res_max); + self.ts_msp_last = Some(ts_msp); self.inserted_in_current_msp = 1; + self.bytes_in_current_msp = val.byte_size(); (ts_msp, true) } }; - let ts_lsp = ts.clone().sub(ts_msp.clone()); - let ts_msp_grid = ts - .div(TS_MSP_GRID_UNIT) - .div(TS_MSP_GRID_SPACING) - .mul(TS_MSP_GRID_SPACING) - .ns() as u32; - let ts_msp_grid = if self.ts_msp_grid_last != ts_msp_grid { - self.ts_msp_grid_last = ts_msp_grid; - Some(ts_msp_grid) - } else { - None - }; + let ts_lsp = ts.delta(ts_msp); let item = InsertItem { series: self.sid.clone(), - ts_msp: ts_msp.ns(), - ts_lsp: ts_lsp.ns(), + ts_msp: ts_msp.to_ts_ms(), + ts_lsp: ts_lsp, msp_bump: ts_msp_changed, pulse: 0, scalar_type: self.scalar_type.clone(), shape: self.shape.clone(), val, - ts_msp_grid, - ts_local: ts_local.ns(), + ts_local: ts_local.to_ts_ms(), }; item_qu.push_back(QueryItem::Insert(item)); Ok(()) @@ -336,7 +332,7 
@@ fn write_00() { let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00"); let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?; dbpg::schema::schema_check(&pgc).await?; - scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; + scywr::schema::migrate_scylla_data_schema(scyconf, 1, true, netpod::ttl::RetentionTime::Short).await?; let scy = scywr::session::create_session(scyconf).await?; let stats = SeriesByChannelStats::new(); let stats = Arc::new(stats);