diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml
index 9da8de9..2641f6d 100644
--- a/daqingest/Cargo.toml
+++ b/daqingest/Cargo.toml
@@ -20,4 +20,5 @@ netpod = { path = "../../daqbuffer/crates/netpod" }
 taskrun = { path = "../../daqbuffer/crates/taskrun" }
 log = { path = "../log" }
 stats = { path = "../stats" }
+scywr = { path = "../scywr" }
 netfetch = { path = "../netfetch" }
diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs
index a308c3f..ae30235 100644
--- a/daqingest/src/daemon.rs
+++ b/daqingest/src/daemon.rs
@@ -11,27 +11,28 @@ use netfetch::ca::conn::ConnCommand;
 use netfetch::ca::connset::CaConnSet;
 use netfetch::ca::findioc::FindIocRes;
 use netfetch::ca::findioc::FindIocStream;
-use netfetch::ca::store::DataStore;
 use netfetch::ca::IngestCommons;
 use netfetch::ca::SlowWarnable;
 use netfetch::conf::CaIngestOpts;
 use netfetch::daemon_common::Channel;
 use netfetch::daemon_common::DaemonEvent;
 use netfetch::errconv::ErrConv;
-use netfetch::insertworker::Ttls;
 use netfetch::metrics::ExtraInsertsConf;
 use netfetch::metrics::StatsSet;
 use netfetch::series::ChannelStatusSeriesId;
 use netfetch::series::Existence;
 use netfetch::series::SeriesId;
-use netfetch::store::ChannelStatus;
-use netfetch::store::ChannelStatusItem;
-use netfetch::store::CommonInsertItemQueue;
-use netfetch::store::ConnectionStatus;
-use netfetch::store::ConnectionStatusItem;
-use netfetch::store::QueryItem;
 use netpod::Database;
 use netpod::ScyllaConfig;
+use scywr::insertworker::Ttls;
+use scywr::iteminsertqueue as scywriiq;
+use scywr::store::DataStore;
+use scywriiq::ChannelStatus;
+use scywriiq::ChannelStatusItem;
+use scywriiq::CommonInsertItemQueue;
+use scywriiq::ConnectionStatus;
+use scywriiq::ConnectionStatusItem;
+use scywriiq::QueryItem;
 use serde::Serialize;
 use stats::DaemonStats;
 use std::collections::BTreeMap;
@@ -293,7 +294,7 @@ pub struct Daemon {
 impl Daemon {
     pub async fn new(opts: DaemonOpts) -> Result<Self, Error> {
-        let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?);
+        // let pg_client = Arc::new(make_pg_client(&opts.pgconf).await?);
         let datastore = DataStore::new(&opts.scyconf)
             .await
             .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
@@ -333,7 +334,7 @@
                 insert_queue_counter.fetch_add(1, atomic::Ordering::AcqRel);
                 //trace!("insert queue item {item:?}");
                 match &item {
-                    netfetch::store::QueryItem::Insert(item) => {
+                    QueryItem::Insert(item) => {
                         let shape_kind = match &item.shape {
                             netpod::Shape::Scalar => 0 as u32,
                             netpod::Shape::Wave(_) => 1,
@@ -394,10 +395,10 @@
             data_store: datastore.clone(),
             insert_ivl_min: Arc::new(AtomicU64::new(0)),
             extra_inserts_conf: tokio::sync::Mutex::new(ExtraInsertsConf::new()),
-            store_workers_rate: AtomicU64::new(20000),
-            insert_frac: AtomicU64::new(1000),
+            store_workers_rate: Arc::new(AtomicU64::new(20000)),
+            insert_frac: Arc::new(AtomicU64::new(1000)),
             ca_conn_set: CaConnSet::new(channel_info_query_tx.clone()),
-            insert_workers_running: atomic::AtomicUsize::new(0),
+            insert_workers_running: Arc::new(AtomicU64::new(0)),
         };
         let ingest_commons = Arc::new(ingest_commons);
@@ -426,13 +427,14 @@
         // TODO use a new stats type:
         let store_stats = Arc::new(stats::CaConnStats::new());
         let ttls = opts.ttls.clone();
-        let jh_insert_workers = netfetch::insertworker::spawn_scylla_insert_workers(
+        let insert_worker_opts = Arc::new(ingest_commons.as_ref().into());
+        use scywr::insertworker::spawn_scylla_insert_workers;
+        let jh_insert_workers = spawn_scylla_insert_workers(
             opts.scyconf.clone(),
             insert_scylla_sessions,
             insert_worker_count,
             common_insert_item_queue_2.clone(),
-            ingest_commons.clone(),
-            pg_client.clone(),
+            insert_worker_opts,
             store_stats.clone(),
             use_rate_limit_queue,
             ttls,
@@ -977,7 +979,7 @@ impl Daemon {
         if let Some(tx) = self.ingest_commons.insert_item_queue.sender() {
             let item = QueryItem::ChannelStatus(ChannelStatusItem {
                 ts: tsnow,
-                series: SeriesId::new(status_series_id.id()),
+                series: scywr::iteminsertqueue::SeriesId::new(status_series_id.id()),
                 status: ChannelStatus::AssignedToAddress,
             });
             match tx.send(item).await {
@@ -1551,7 +1553,9 @@ pub async fn run(opts: CaIngestOpts, channels: Vec<String>) -> Result<(), Error>
     netfetch::dbpg::schema_check(opts.postgresql()).await?;
-    netfetch::scylla::migrate_keyspace(opts.scylla()).await?;
+    scywr::schema::migrate_keyspace(opts.scylla())
+        .await
+        .map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
     // TODO use a new stats type:
     //let store_stats = Arc::new(CaConnStats::new());
diff --git a/netfetch/src/bsreadclient.rs b/netfetch/src/bsreadclient.rs
index 0edd8cd..88b806c 100644
--- a/netfetch/src/bsreadclient.rs
+++ b/netfetch/src/bsreadclient.rs
@@ -7,9 +7,6 @@ use crate::ca::proto::CaDataArrayValue;
 use crate::ca::proto::CaDataValue;
 use crate::ca::IngestCommons;
 use crate::series::SeriesId;
-use crate::store::CommonInsertItemQueueSender;
-use crate::store::InsertItem;
-use crate::store::QueryItem;
 use crate::zmtp::zmtpproto;
 use crate::zmtp::zmtpproto::SocketType;
 use crate::zmtp::zmtpproto::Zmtp;
@@ -28,7 +25,12 @@ use netpod::ScalarType;
 use netpod::Shape;
 use netpod::TS_MSP_GRID_SPACING;
 use netpod::TS_MSP_GRID_UNIT;
-use scylla::Session as ScySession;
+use scywr::iteminsertqueue::ArrayValue;
+use scywr::iteminsertqueue::CommonInsertItemQueueSender;
+use scywr::iteminsertqueue::DataValue;
+use scywr::iteminsertqueue::InsertItem;
+use scywr::iteminsertqueue::QueryItem;
+use scywr::session::ScySession;
 use stats::CheckEvery;
 use std::io;
 use std::net::SocketAddr;
@@ -184,7 +186,7 @@ impl BsreadClient {
             None
         };
         let item = InsertItem {
-            series,
+            series: series.into(),
             ts_msp,
             ts_lsp,
             msp_bump: ts_msp_changed,
@@ -192,7 +194,7 @@
             pulse,
             scalar_type,
             shape,
-            val: CaDataValue::Array(CaDataArrayValue::Bool(evtset)),
+            val: DataValue::Array(ArrayValue::Bool(evtset)),
         };
         let item = QueryItem::Insert(item);
         match self.insqtx.send(item).await {
diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs
index 1efba37..16cca12 100644
--- a/netfetch/src/ca.rs
+++ b/netfetch/src/ca.rs
@@ -3,19 +3,19 @@ pub mod connset;
 pub mod findioc;
 pub mod proto;
 pub mod search;
-pub mod store;
 
-use self::store::DataStore;
 use crate::ca::connset::CaConnSet;
 use crate::errconv::ErrConv;
 use crate::metrics::ExtraInsertsConf;
 use crate::rt::TokMx;
-use crate::store::CommonInsertItemQueue;
 use err::Error;
 use futures_util::Future;
 use futures_util::FutureExt;
 use log::*;
 use netpod::Database;
+use scywr::insertworker::InsertWorkerOpts;
+use scywr::iteminsertqueue::CommonInsertItemQueue;
+use scywr::store::DataStore;
 use stats::CaConnStatsAgg;
 use std::net::SocketAddrV4;
 use std::pin::Pin;
@@ -43,10 +43,20 @@ pub struct IngestCommons {
     pub data_store: Arc<DataStore>,
     pub insert_ivl_min: Arc<AtomicU64>,
     pub extra_inserts_conf: TokMx<ExtraInsertsConf>,
-    pub insert_frac: AtomicU64,
-    pub store_workers_rate: AtomicU64,
+    pub insert_frac: Arc<AtomicU64>,
+    pub store_workers_rate: Arc<AtomicU64>,
     pub ca_conn_set: CaConnSet,
-    pub insert_workers_running: atomic::AtomicUsize,
+    pub insert_workers_running: Arc<AtomicU64>,
+}
+
+impl From<&IngestCommons> for InsertWorkerOpts {
+    fn from(val: &IngestCommons) -> Self {
+        Self {
+            store_workers_rate: val.store_workers_rate.clone(),
+            insert_workers_running: val.insert_workers_running.clone(),
+            insert_frac: val.insert_frac.clone(),
+        }
+    }
 }
 
 pub trait SlowWarnable {
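Note on the IngestCommons change above: the three counters move from plain atomics to Arc-wrapped atomics so that the new scywr crate's insert workers can hold their own handles without depending on netfetch types; the From<&IngestCommons> impl simply clones the handles into InsertWorkerOpts. A minimal standalone sketch of this sharing pattern (names taken from the diff, the test scaffold is assumed):

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    // Plain options struct of shared handles, like scywr::insertworker::InsertWorkerOpts.
    struct InsertWorkerOpts {
        store_workers_rate: Arc<AtomicU64>,
        insert_workers_running: Arc<AtomicU64>,
        insert_frac: Arc<AtomicU64>,
    }

    fn main() {
        let rate = Arc::new(AtomicU64::new(20000));
        let opts = InsertWorkerOpts {
            store_workers_rate: rate.clone(),
            insert_workers_running: Arc::new(AtomicU64::new(0)),
            insert_frac: Arc::new(AtomicU64::new(1000)),
        };
        // The daemon side can retune the rate at runtime; workers observe
        // the new value on their next load of the shared atomic.
        rate.store(50000, Ordering::Release);
        assert_eq!(opts.store_workers_rate.load(Ordering::Acquire), 50000);
    }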
diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs
index 0926528..abf9c0f 100644
--- a/netfetch/src/ca/conn.rs
+++ b/netfetch/src/ca/conn.rs
@@ -3,7 +3,6 @@ use super::proto::CaItem;
 use super::proto::CaMsg;
 use super::proto::CaMsgTy;
 use super::proto::CaProto;
-use super::store::DataStore;
 use super::ExtraInsertsConf;
 use crate::batchquery::series_by_channel::ChannelInfoQuery;
 use crate::bsread::ChannelDescDecoded;
@@ -12,17 +11,6 @@ use crate::ca::proto::EventAdd;
 use crate::series::ChannelStatusSeriesId;
 use crate::series::Existence;
 use crate::series::SeriesId;
-use crate::store::ChannelInfoItem;
-use crate::store::ChannelStatus;
-use crate::store::ChannelStatusClosedReason;
-use crate::store::ChannelStatusItem;
-use crate::store::CommonInsertItemQueueSender;
-use crate::store::ConnectionStatus;
-use crate::store::ConnectionStatusItem;
-use crate::store::InsertItem;
-use crate::store::IvlItem;
-use crate::store::MuteItem;
-use crate::store::QueryItem;
 use crate::timebin::ConnTimeBin;
 use async_channel::Sender;
 use err::Error;
@@ -37,6 +25,19 @@ use netpod::ScalarType;
 use netpod::Shape;
 use netpod::TS_MSP_GRID_SPACING;
 use netpod::TS_MSP_GRID_UNIT;
+use scywr::iteminsertqueue as scywriiq;
+use scywr::store::DataStore;
+use scywriiq::ChannelInfoItem;
+use scywriiq::ChannelStatus;
+use scywriiq::ChannelStatusClosedReason;
+use scywriiq::ChannelStatusItem;
+use scywriiq::CommonInsertItemQueueSender;
+use scywriiq::ConnectionStatus;
+use scywriiq::ConnectionStatusItem;
+use scywriiq::InsertItem;
+use scywriiq::IvlItem;
+use scywriiq::MuteItem;
+use scywriiq::QueryItem;
 use serde::Serialize;
 use stats::CaConnStats;
 use stats::IntervalEma;
@@ -763,7 +764,7 @@ impl CaConn {
             ChannelState::Created(series, ..) => {
                 let item = QueryItem::ChannelStatus(ChannelStatusItem {
                     ts: SystemTime::now(),
-                    series: series.clone(),
+                    series: series.into(),
                     status: ChannelStatus::Closed(channel_reason.clone()),
                 });
                 self.insert_item_queue.push_back(item);
@@ -896,7 +897,7 @@ impl CaConn {
                 st.info_store_msp_last = msp;
                 let item = QueryItem::ChannelInfo(ChannelInfoItem {
                     ts_msp: msp,
-                    series: series.clone(),
+                    series: series.into(),
                     ivl: st.item_recv_ivl_ema.ema().ema(),
                     interest: 0.,
                     evsize: 0,
@@ -1029,7 +1030,7 @@ impl CaConn {
         {
             let item = QueryItem::ChannelStatus(ChannelStatusItem {
                 ts: SystemTime::now(),
-                series: series.clone().into_inner(),
+                series: series.clone().into_inner().into(),
                 status: ChannelStatus::Opened,
             });
             self.insert_item_queue.push_back(item);
@@ -1080,14 +1081,14 @@ impl CaConn {
         };
         let ts_lsp = ts - ts_msp;
         let item = InsertItem {
-            series,
+            series: series.into(),
             ts_msp,
             ts_lsp,
             msp_bump: ts_msp_changed,
             pulse: 0,
             scalar_type,
             shape,
-            val: ev.value.data,
+            val: ev.value.data.into(),
             ts_msp_grid,
         };
         item_queue.push_back(QueryItem::Insert(item));
@@ -1276,7 +1277,7 @@ impl CaConn {
                 st.insert_recv_ivl_last = tsnow;
                 let ema = st.insert_item_ivl_ema.ema();
                 let item = IvlItem {
-                    series: series.clone(),
+                    series: (&series).into(),
                     ts,
                     ema: ema.ema(),
                     emd: ema.emv().sqrt(),
@@ -1286,7 +1287,7 @@
                 if false && st.muted_before == 0 {
                     let ema = st.insert_item_ivl_ema.ema();
                     let item = MuteItem {
-                        series,
+                        series: series.into(),
                         ts,
                         ema: ema.ema(),
                         emd: ema.emv().sqrt(),
diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs
index e17f9c8..a5c619c 100644
--- a/netfetch/src/ca/connset.rs
+++ b/netfetch/src/ca/connset.rs
@@ -1,7 +1,6 @@
 use super::conn::CaConnEvent;
 use super::conn::ChannelSetOps;
 use super::conn::ConnCommand;
-use super::store::DataStore;
 use super::SlowWarnable;
 use crate::batchquery::series_by_channel::ChannelInfoQuery;
 use crate::ca::conn::CaConn;
@@ -10,14 +9,15 @@ use crate::errconv::ErrConv;
 use crate::rt::JoinHandle;
 use crate::rt::TokMx;
 use crate::series::ChannelStatusSeriesId;
-use crate::store::CommonInsertItemQueue;
-use crate::store::CommonInsertItemQueueSender;
 use async_channel::Receiver;
 use async_channel::Sender;
 use err::Error;
 use futures_util::FutureExt;
 use futures_util::StreamExt;
 use netpod::log::*;
+use scywr::iteminsertqueue::CommonInsertItemQueue;
+use scywr::iteminsertqueue::CommonInsertItemQueueSender;
+use scywr::store::DataStore;
 use stats::CaConnStats;
 use std::collections::BTreeMap;
 use std::collections::VecDeque;
diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs
index efff650..802d58d 100644
--- a/netfetch/src/ca/proto.rs
+++ b/netfetch/src/ca/proto.rs
@@ -209,6 +209,22 @@ pub enum CaDataScalarValue {
     Bool(bool),
 }
 
+impl From<CaDataScalarValue> for scywr::iteminsertqueue::ScalarValue {
+    fn from(val: CaDataScalarValue) -> Self {
+        use scywr::iteminsertqueue::ScalarValue;
+        match val {
+            CaDataScalarValue::I8(x) => ScalarValue::I8(x),
+            CaDataScalarValue::I16(x) => ScalarValue::I16(x),
+            CaDataScalarValue::I32(x) => ScalarValue::I32(x),
+            CaDataScalarValue::F32(x) => ScalarValue::F32(x),
+            CaDataScalarValue::F64(x) => ScalarValue::F64(x),
+            CaDataScalarValue::Enum(x) => ScalarValue::Enum(x),
+            CaDataScalarValue::String(x) => ScalarValue::String(x),
+            CaDataScalarValue::Bool(x) => ScalarValue::Bool(x),
+        }
+    }
+}
+
 pub trait GetValHelp {
     type ScalTy: Clone;
     fn get(&self) -> Result<&Self::ScalTy, Error>;
@@ -305,12 +321,36 @@ pub enum CaDataArrayValue {
     Bool(Vec<bool>),
 }
 
+impl From<CaDataArrayValue> for scywr::iteminsertqueue::ArrayValue {
+    fn from(val: CaDataArrayValue) -> Self {
+        use scywr::iteminsertqueue::ArrayValue;
+        match val {
+            CaDataArrayValue::I8(x) => ArrayValue::I8(x),
+            CaDataArrayValue::I16(x) => ArrayValue::I16(x),
+            CaDataArrayValue::I32(x) => ArrayValue::I32(x),
+            CaDataArrayValue::F32(x) => ArrayValue::F32(x),
+            CaDataArrayValue::F64(x) => ArrayValue::F64(x),
+            CaDataArrayValue::Bool(x) => ArrayValue::Bool(x),
+        }
+    }
+}
+
 #[derive(Clone, Debug)]
 pub enum CaDataValue {
     Scalar(CaDataScalarValue),
     Array(CaDataArrayValue),
 }
 
+impl From<CaDataValue> for scywr::iteminsertqueue::DataValue {
+    fn from(value: CaDataValue) -> Self {
+        use scywr::iteminsertqueue::DataValue;
+        match value {
+            CaDataValue::Scalar(x) => DataValue::Scalar(x.into()),
+            CaDataValue::Array(x) => DataValue::Array(x.into()),
+        }
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct CaEventValue {
     pub ts: Option<u64>,
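Note on the three From impls above: they let call sites hand Channel Access protocol values to the insert queue with a plain .into(), keeping scywr free of any CA protocol types. A standalone sketch of the shape of the conversion, on single-variant stand-in enums (the real enums live in netfetch::ca::proto and scywr::iteminsertqueue):

    #[derive(Debug)]
    enum CaDataValue {
        Scalar(f64), // stand-in for the protocol-side enum
    }

    #[derive(Debug)]
    enum DataValue {
        Scalar(f64), // stand-in for the storage-side enum
    }

    impl From<CaDataValue> for DataValue {
        fn from(v: CaDataValue) -> Self {
            match v {
                CaDataValue::Scalar(x) => DataValue::Scalar(x),
            }
        }
    }

    fn main() {
        // Call sites such as conn.rs can then write `val: ev.value.data.into()`.
        let stored: DataValue = CaDataValue::Scalar(3.25).into();
        println!("{stored:?}");
    }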
diff --git a/netfetch/src/channelwriter.rs b/netfetch/src/channelwriter.rs
deleted file mode 100644
index 6f16294..0000000
--- a/netfetch/src/channelwriter.rs
+++ /dev/null
@@ -1,615 +0,0 @@
-use crate::errconv::ErrConv;
-use crate::zmtp::zmtpproto::ZmtpFrame;
-use err::Error;
-use futures_util::Future;
-use futures_util::FutureExt;
-use log::*;
-use netpod::timeunits::SEC;
-use netpod::ByteOrder;
-use netpod::ScalarType;
-use netpod::Shape;
-use std::mem;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::Context;
-use std::task::Poll;
-use std::time::Duration;
-use std::time::Instant;
-
-pub struct ChannelWriteRes {
-    pub nrows: u32,
-    pub dt: Duration,
-}
-
-pub struct ChannelWriteFut<'a> {
-    nn: usize,
-    fut1: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>>,
-    fut2: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'a>>>,
-    ts1: Option<Instant>,
-    mask: u8,
-}
-
-impl<'a> Future for ChannelWriteFut<'a> {
-    type Output = Result<ChannelWriteRes, Error>;
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
-        use Poll::*;
-        loop {
-            break if self.ts1.is_none() {
-                self.ts1 = Some(Instant::now());
-                continue;
-            } else if let Some(f) = self.fut1.as_mut() {
-                match f.poll_unpin(cx) {
-                    Ready(k) => {
-                        trace!("ChannelWriteFut fut1 Ready");
-                        self.fut1 = None;
-                        self.mask |= 1;
-                        match k {
-                            Ok(_) => continue,
-                            Err(e) => Ready(Err(e)),
-                        }
-                    }
-                    Pending => Pending,
-                }
-            } else if let Some(f) = self.fut2.as_mut() {
-                match f.poll_unpin(cx) {
-                    Ready(k) => {
-                        trace!("ChannelWriteFut fut2 Ready");
-                        self.fut2 = None;
-                        self.mask |= 2;
-                        match k {
-                            Ok(_) => continue,
-                            Err(e) => Ready(Err(e)),
-                        }
-                    }
-                    Pending => Pending,
-                }
-            } else {
-                if self.mask != 0 {
-                    let ts2 = Instant::now();
-                    let dt = ts2.duration_since(self.ts1.unwrap());
-                    if false {
-                        trace!(
-                            "ChannelWriteFut inserted nn {} dt {:6.2} ms",
-                            self.nn,
-                            dt.as_secs_f32() * 1e3
-                        );
-                    }
-                    let res = ChannelWriteRes {
-                        nrows: self.nn as u32,
-                        dt,
-                    };
-                    Ready(Ok(res))
-                } else {
-                    let res = ChannelWriteRes {
-                        nrows: 0,
-                        dt: Duration::from_millis(0),
-                    };
-                    Ready(Ok(res))
-                }
-            };
-        }
-    }
-}
-
-pub trait ChannelWriter {
-    fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error>;
-}
-
-struct MsgAcceptorOptions {
-    skip_insert: bool,
-    array_truncate: usize,
-}
-
-trait MsgAcceptor {
-    fn len(&self) -> usize;
-    fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error>;
-    fn should_flush(&self) -> bool;
-    fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error>;
-    fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen<'a>, Error>;
-}
-
-macro_rules! impl_msg_acceptor_scalar {
-    ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => {
-        struct $sname {
-            //query: PreparedStatement,
-            values: Vec<(i64, i64, i64, i64, $st)>,
-            series: i64,
-            opts: MsgAcceptorOptions,
-            batch: Batch,
-        }
-
-        impl $sname {
-            pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self {
-                Self {
-                    //query: opts.cq.$qu_id.clone(),
-                    values: Vec::new(),
-                    series,
-                    opts,
-                    batch: Batch::new((BatchType::Unlogged)),
-                }
-            }
-        }
-
-        impl MsgAcceptor for $sname {
-            fn len(&self) -> usize {
-                self.values.len()
-            }
-
-            fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
-                type ST = $st;
-                const STL: usize = std::mem::size_of::<ST>();
-                let data = fr.data();
-                if data.len() < STL {
-                    return Err(Error::with_msg_no_trace(format!(
-                        "data frame too small for type: {} vs {}",
-                        data.len(),
-                        STL
-                    )));
-                }
-                let a = data[..STL].try_into()?;
-                let value = ST::$from_bytes(a);
-                self.values.push((self.series, ts_msp, ts_lsp, pulse, value));
-                Ok(())
-            }
-
-            fn should_flush(&self) -> bool {
-                self.len() >= 140 + ((self.series as usize) & 0x1f)
-            }
-
-            fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen<'a>, Error> {
-                let vt = mem::replace(&mut self.values, Vec::new());
-                let nn = vt.len();
-                self.batch = Batch::new(BatchType::Unlogged);
-                let batch = &mut self.batch;
-                for _ in 0..nn {
-                    //batch.append_statement(self.query.clone());
-                }
-                let ret = ScyBatchFutGen::new(&scy, batch, vt);
-                Ok(ret)
-            }
-
-            fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error> {
-                let vt = mem::replace(&mut self.values, Vec::new());
-                let ret = InsertLoopFut::new(scy, None, vt, self.opts.skip_insert);
-                Ok(ret)
-            }
-        }
-    };
-}
-
-macro_rules! impl_msg_acceptor_array {
-    ($sname:ident, $st:ty, $qu_id:ident, $from_bytes:ident) => {
-        struct $sname {
-            //query: PreparedStatement,
-            values: Vec<(i64, i64, i64, i64, Vec<$st>)>,
-            series: i64,
-            array_truncate: usize,
-            truncated: usize,
-            opts: MsgAcceptorOptions,
-            batch: Batch,
-        }
-
-        impl $sname {
-            pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self {
-                Self {
-                    //query: opts.cq.$qu_id.clone(),
-                    values: Vec::new(),
-                    series,
-                    array_truncate: opts.array_truncate,
-                    truncated: 0,
-                    opts,
-                    batch: Batch::new(BatchType::Unlogged),
-                }
-            }
-        }
-
-        impl MsgAcceptor for $sname {
-            fn len(&self) -> usize {
-                self.values.len()
-            }
-
-            fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
-                type ST = $st;
-                const STL: usize = std::mem::size_of::<ST>();
-                let vc2 = (fr.data().len() / STL);
-                let vc = vc2.min(self.array_truncate);
-                if vc != vc2 {
-                    self.truncated = self.truncated.saturating_add(1);
-                }
-                let mut values = Vec::with_capacity(vc);
-                for i in 0..vc {
-                    let h = i * STL;
-                    let value = ST::$from_bytes(fr.data()[h..h + STL].try_into()?);
-                    values.push(value);
-                }
-                self.values.push((self.series, ts_msp, ts_lsp, pulse, values));
-                Ok(())
-            }
-
-            fn should_flush(&self) -> bool {
-                self.len() >= 40 + ((self.series as usize) & 0x7)
-            }
-
-            fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen<'a>, Error> {
-                let vt = mem::replace(&mut self.values, Vec::new());
-                let nn = vt.len();
-                self.batch = Batch::new(BatchType::Unlogged);
-                let batch = &mut self.batch;
-                for _ in 0..nn {
-                    //batch.append_statement(self.query.clone());
-                }
-                let ret = ScyBatchFutGen::new(&scy, batch, vt);
-                Ok(ret)
-            }
-
-            fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error> {
-                let vt = mem::replace(&mut self.values, Vec::new());
-                let ret = InsertLoopFut::new(scy, None, vt, self.opts.skip_insert);
-                Ok(ret)
-            }
-        }
-    };
-}
-
-impl_msg_acceptor_scalar!(MsgAcceptorScalarU16LE, i16, qu_insert_scalar_i16, from_le_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarU16BE, i16, qu_insert_scalar_i16, from_be_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarU32LE, i32, qu_insert_scalar_i32, from_le_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarU32BE, i32, qu_insert_scalar_i32, from_be_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarI16LE, i16, qu_insert_scalar_i16, from_le_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarI16BE, i16, qu_insert_scalar_i16, from_be_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarI32LE, i32, qu_insert_scalar_i32, from_le_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarI32BE, i32, qu_insert_scalar_i32, from_be_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarF32LE, f32, qu_insert_scalar_f32, from_le_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarF32BE, f32, qu_insert_scalar_f32, from_be_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarF64LE, f64, qu_insert_scalar_f64, from_le_bytes);
-impl_msg_acceptor_scalar!(MsgAcceptorScalarF64BE, f64, qu_insert_scalar_f64, from_be_bytes);
-
-impl_msg_acceptor_array!(MsgAcceptorArrayU16LE, i16, qu_insert_array_u16, from_le_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayU16BE, i16, qu_insert_array_u16, from_be_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayI16LE, i16, qu_insert_array_i16, from_le_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayI16BE, i16, qu_insert_array_i16, from_be_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayI32LE, i32, qu_insert_array_i32, from_le_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayI32BE, i32, qu_insert_array_i32, from_be_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayF32LE, f32, qu_insert_array_f32, from_le_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayF32BE, f32, qu_insert_array_f32, from_be_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayF64LE, f64, qu_insert_array_f64, from_le_bytes);
-impl_msg_acceptor_array!(MsgAcceptorArrayF64BE, f64, qu_insert_array_f64, from_be_bytes);
-
-struct MsgAcceptorArrayBool {
-    //query: PreparedStatement,
-    values: Vec<(i64, i64, i64, i64, Vec<bool>)>,
-    series: i64,
-    array_truncate: usize,
-    truncated: usize,
-    opts: MsgAcceptorOptions,
-    batch: Batch,
-}
-
-impl MsgAcceptorArrayBool {
-    pub fn new(series: i64, opts: MsgAcceptorOptions) -> Self {
-        Self {
-            //query: opts.cq.qu_insert_array_bool.clone(),
-            values: Vec::new(),
-            series,
-            array_truncate: opts.array_truncate,
-            truncated: 0,
-            opts,
-            batch: Batch::new(BatchType::Unlogged),
-        }
-    }
-}
-
-impl MsgAcceptor for MsgAcceptorArrayBool {
-    fn len(&self) -> usize {
-        self.values.len()
-    }
-
-    fn accept(&mut self, ts_msp: i64, ts_lsp: i64, pulse: i64, fr: &ZmtpFrame) -> Result<(), Error> {
-        type ST = bool;
-        const STL: usize = std::mem::size_of::<ST>();
-        let vc = fr.data().len() / STL;
-        let mut values = Vec::with_capacity(vc);
-        for i in 0..vc {
-            let h = i * STL;
-            let value = u8::from_le_bytes(fr.data()[h..h + STL].try_into()?);
-            values.push(value);
-        }
-        if values.len() > self.array_truncate {
-            if self.truncated < 10 {
-                warn!(
-                    "truncate {} to {} for series {}",
-                    values.len(),
-                    self.array_truncate,
-                    self.series
-                );
-            }
-            values.truncate(self.array_truncate);
-            self.truncated = self.truncated.saturating_add(1);
-        }
-        let values = values.into_iter().map(|x| x != 0).collect();
-        self.values.push((self.series, ts_msp, ts_lsp, pulse, values));
-        Ok(())
-    }
-
-    fn should_flush(&self) -> bool {
-        self.len() >= 40 + ((self.series as usize) & 0x7)
-    }
-
-    fn flush_batch<'a>(&'a mut self, scy: &'a ScySession) -> Result<ScyBatchFutGen<'a>, Error> {
-        let vt = mem::replace(&mut self.values, Vec::new());
-        let nn = vt.len();
-        self.batch = Batch::new(BatchType::Unlogged);
-        let batch = &mut self.batch;
-        for _ in 0..nn {
-            //batch.append_statement(self.query.clone());
-        }
-        let ret = ScyBatchFutGen::new(&scy, batch, vt);
-        Ok(ret)
-    }
-
-    fn flush_loop<'a>(&'a mut self, scy: &'a ScySession) -> Result<InsertLoopFut<'a>, Error> {
-        let vt = mem::replace(&mut self.values, Vec::new());
-        let ret = InsertLoopFut::new(scy, None, vt, self.opts.skip_insert);
-        Ok(ret)
-    }
-}
-
-pub struct ChannelWriterAll {
-    series: u64,
-    scy: Arc<ScySession>,
-    ts_msp_lsp: fn(u64, u64) -> (u64, u64),
-    ts_msp_last: u64,
-    acceptor: Box<dyn MsgAcceptor>,
-    #[allow(unused)]
-    scalar_type: ScalarType,
-    #[allow(unused)]
-    shape: Shape,
-    pulse_last: u64,
-    #[allow(unused)]
-    skip_insert: bool,
-}
-
-impl ChannelWriterAll {
-    pub fn new(
-        series: u64,
-        scy: Arc<ScySession>,
-        scalar_type: ScalarType,
-        shape: Shape,
-        byte_order: ByteOrder,
-        array_truncate: usize,
-        skip_insert: bool,
-    ) -> Result<Self, Error> {
-        let opts = MsgAcceptorOptions {
-            skip_insert,
-            array_truncate,
-        };
-        let (ts_msp_lsp, acc): (fn(u64, u64) -> (u64, u64), Box<dyn MsgAcceptor>) = match &shape {
-            Shape::Scalar => match &scalar_type {
-                ScalarType::U16 => match &byte_order {
-                    ByteOrder::Little => {
-                        let acc = MsgAcceptorScalarU16LE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                    ByteOrder::Big => {
-                        let acc = MsgAcceptorScalarU16BE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                },
-                ScalarType::U32 => match &byte_order {
-                    ByteOrder::Little => {
-                        let acc = MsgAcceptorScalarU32LE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                    ByteOrder::Big => {
-                        let acc = MsgAcceptorScalarU32BE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                },
-                ScalarType::I16 => match &byte_order {
-                    ByteOrder::Little => {
-                        let acc = MsgAcceptorScalarI16LE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                    ByteOrder::Big => {
-                        let acc = MsgAcceptorScalarI16BE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                },
-                ScalarType::I32 => match &byte_order {
-                    ByteOrder::Little => {
-                        let acc = MsgAcceptorScalarI32LE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                    ByteOrder::Big => {
-                        let acc = MsgAcceptorScalarI32BE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                },
-                ScalarType::F32 => match &byte_order {
-                    ByteOrder::Little => {
-                        let acc = MsgAcceptorScalarF32LE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                    ByteOrder::Big => {
-                        let acc = MsgAcceptorScalarF32BE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                },
-                ScalarType::F64 => match &byte_order {
-                    ByteOrder::Little => {
-                        let acc = MsgAcceptorScalarF64LE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                    ByteOrder::Big => {
-                        let acc = MsgAcceptorScalarF64BE::new(series as i64, opts);
-                        (ts_msp_lsp_1, Box::new(acc) as _)
-                    }
-                },
-                _ => {
-                    return Err(Error::with_msg_no_trace(format!(
-                        "TODO {:?} {:?} {:?}",
-                        scalar_type, shape, byte_order
-                    )));
-                }
-            },
-            Shape::Wave(nele) => {
-                info!("set up wave acceptor nele {nele}");
-                match &scalar_type {
-                    ScalarType::BOOL => match &byte_order {
-                        _ => {
-                            let acc = MsgAcceptorArrayBool::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                    },
-                    ScalarType::U16 => match &byte_order {
-                        ByteOrder::Little => {
-                            let acc = MsgAcceptorArrayU16LE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                        ByteOrder::Big => {
-                            let acc = MsgAcceptorArrayU16BE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                    },
-                    ScalarType::I16 => match &byte_order {
-                        ByteOrder::Little => {
-                            let acc = MsgAcceptorArrayI16LE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                        ByteOrder::Big => {
-                            let acc = MsgAcceptorArrayI16BE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                    },
-                    ScalarType::I32 => match &byte_order {
-                        ByteOrder::Little => {
-                            let acc = MsgAcceptorArrayI32LE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                        ByteOrder::Big => {
-                            let acc = MsgAcceptorArrayI32BE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                    },
-                    ScalarType::F32 => match &byte_order {
-                        ByteOrder::Little => {
-                            let acc = MsgAcceptorArrayF32LE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                        ByteOrder::Big => {
-                            let acc = MsgAcceptorArrayF32BE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                    },
-                    ScalarType::F64 => match &byte_order {
-                        ByteOrder::Little => {
-                            let acc = MsgAcceptorArrayF64LE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                        ByteOrder::Big => {
-                            let acc = MsgAcceptorArrayF64BE::new(series as i64, opts);
-                            (ts_msp_lsp_2, Box::new(acc) as _)
-                        }
-                    },
-                    _ => {
-                        return Err(Error::with_msg_no_trace(format!(
-                            "TODO {:?} {:?} {:?}",
-                            scalar_type, shape, byte_order
-                        )));
-                    }
-                }
-            }
-            _ => {
-                return Err(Error::with_msg_no_trace(format!(
-                    "TODO {:?} {:?} {:?}",
-                    scalar_type, shape, byte_order
-                )));
-            }
-        };
-        let ret = Self {
-            series,
-            scy,
-            ts_msp_lsp,
-            ts_msp_last: 0,
-            acceptor: acc,
-            scalar_type,
-            shape,
-            pulse_last: 0,
-            skip_insert,
-        };
-        Ok(ret)
-    }
-
-    pub fn write_msg_impl(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
-        // TODO limit log rate
-        // TODO for many channels, it's normal to have gaps.
-        if false && pulse != 0 && pulse != self.pulse_last + 1 {
-            let gap = pulse as i64 - self.pulse_last as i64;
-            warn!("GAP series {} pulse {} gap {}", self.series, pulse, gap);
-        }
-        self.pulse_last = pulse;
-        let (ts_msp, ts_lsp) = (self.ts_msp_lsp)(ts, self.series);
-        let fut1 = if ts_msp != self.ts_msp_last {
-            debug!("ts_msp changed ts {ts} pulse {pulse} ts_msp {ts_msp} ts_lsp {ts_lsp}");
-            self.ts_msp_last = ts_msp;
-            if !self.skip_insert {
-                let fut = ScyQueryFut::new(&self.scy, None, (self.series as i64, ts_msp as i64));
-                Some(Box::pin(fut) as _)
-            } else {
-                None
-            }
-        } else {
-            None
-        };
-        self.acceptor.accept(ts_msp as i64, ts_lsp as i64, pulse as i64, fr)?;
-        if self.acceptor.should_flush() {
-            let nn = self.acceptor.len();
-            let fut = self.acceptor.flush_loop(&self.scy)?;
-            let fut2 = Some(Box::pin(fut) as _);
-            let ret = ChannelWriteFut {
-                ts1: None,
-                mask: 0,
-                nn,
-                fut1,
-                fut2,
-            };
-            Ok(ret)
-        } else {
-            let ret = ChannelWriteFut {
-                ts1: None,
-                mask: 0,
-                nn: 0,
-                fut1,
-                fut2: None,
-            };
-            Ok(ret)
-        }
-    }
-}
-
-impl ChannelWriter for ChannelWriterAll {
-    fn write_msg(&mut self, ts: u64, pulse: u64, fr: &ZmtpFrame) -> Result<ChannelWriteFut, Error> {
-        self.write_msg_impl(ts, pulse, fr)
-    }
-}
-
-fn ts_msp_lsp_1(ts: u64, series: u64) -> (u64, u64) {
-    ts_msp_lsp_gen(ts, series, 100 * SEC)
-}
-
-fn ts_msp_lsp_2(ts: u64, series: u64) -> (u64, u64) {
-    ts_msp_lsp_gen(ts, series, 10 * SEC)
-}
-
-fn ts_msp_lsp_gen(ts: u64, series: u64, fak: u64) -> (u64, u64) {
-    if ts < u32::MAX as u64 {
-        return (0, 0);
-    }
-    let off = series & 0xffffffff;
-    let ts_a = ts - off;
-    let ts_b = ts_a / fak;
-    let ts_lsp = ts_a % fak;
-    let ts_msp = ts_b * fak + off;
-    (ts_msp, ts_lsp)
-}
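Note on the removed ts_msp_lsp_gen: it split a nanosecond timestamp into a coarse partition key (ts_msp) and a remainder (ts_lsp), shifting the partition boundary by the low 32 bits of the series id so that not every series rolls over its partition at the same instant. A standalone copy of the arithmetic with a worked check (the sample values are made up):

    const SEC: u64 = 1_000_000_000;

    // Same arithmetic as the deleted helper; `fak` is the partition width in ns.
    fn ts_msp_lsp_gen(ts: u64, series: u64, fak: u64) -> (u64, u64) {
        if ts < u32::MAX as u64 {
            return (0, 0);
        }
        let off = series & 0xffffffff; // per-series phase shift
        let ts_a = ts - off;
        let ts_msp = ts_a / fak * fak + off;
        let ts_lsp = ts_a % fak;
        (ts_msp, ts_lsp)
    }

    fn main() {
        let ts = 1_700_000_000 * SEC + 123;
        let series = 0x42;
        let (msp, lsp) = ts_msp_lsp_gen(ts, series, 100 * SEC);
        // msp + lsp always reassembles the original timestamp.
        assert_eq!(msp + lsp, ts);
        println!("msp={msp} lsp={lsp}");
    }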
diff --git a/netfetch/src/errconv.rs b/netfetch/src/errconv.rs
index 5fbfb94..075a955 100644
--- a/netfetch/src/errconv.rs
+++ b/netfetch/src/errconv.rs
@@ -1,7 +1,10 @@
-use async_channel::{RecvError, SendError};
+use async_channel::RecvError;
+use async_channel::SendError;
 use err::Error;
 use scylla::transport::errors::QueryError;
-use scylla::transport::query_result::{FirstRowError, RowsExpectedError};
+use scylla::transport::query_result::FirstRowError;
+use scylla::transport::query_result::RowsExpectedError;
+use scywr::scylla;
 
 pub trait ErrConv<T> {
     fn err_conv(self) -> Result<T, Error>;
diff --git a/netfetch/src/insertworker.rs b/netfetch/src/insertworker.rs
index 8ebf1a7..8b13789 100644
--- a/netfetch/src/insertworker.rs
+++ b/netfetch/src/insertworker.rs
@@ -1,319 +1 @@
-use crate::ca::store::DataStore;
-use crate::ca::IngestCommons;
-use crate::rt::JoinHandle;
-use crate::store::CommonInsertItemQueue;
-use crate::store::IntoSimplerError;
-use crate::store::QueryItem;
-use err::Error;
-use log::*;
-use netpod::timeunits::MS;
-use netpod::timeunits::SEC;
-use netpod::ScyllaConfig;
-use std::sync::atomic;
-use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use std::time::Duration;
-use std::time::Instant;
-use taskrun::tokio;
-use tokio_postgres::Client as PgClient;
-
-fn stats_inc_for_err(stats: &stats::CaConnStats, err: &crate::store::Error) {
-    use crate::store::Error;
-    match err {
-        Error::DbOverload => {
-            stats.store_worker_insert_overload_inc();
-        }
-        Error::DbTimeout => {
-            stats.store_worker_insert_timeout_inc();
-        }
-        Error::DbUnavailable => {
-            stats.store_worker_insert_unavailable_inc();
-        }
-        Error::DbError(e) => {
-            warn!("db error {e}");
-            stats.store_worker_insert_error_inc();
-        }
-    }
-}
-
-fn back_off_next(backoff_dt: &mut Duration) {
-    *backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2;
-    let dtmax = Duration::from_millis(4000);
-    if *backoff_dt > dtmax {
-        *backoff_dt = dtmax;
-    }
-}
-
-async fn back_off_sleep(backoff_dt: &mut Duration) {
-    back_off_next(backoff_dt);
-    tokio::time::sleep(*backoff_dt).await;
-}
-
-#[derive(Debug, Clone)]
-pub struct Ttls {
-    pub index: Duration,
-    pub d0: Duration,
-    pub d1: Duration,
-    pub binned: Duration,
-}
-
-pub async fn spawn_scylla_insert_workers(
-    scyconf: ScyllaConfig,
-    insert_scylla_sessions: usize,
-    insert_worker_count: usize,
-    insert_item_queue: Arc<CommonInsertItemQueue>,
-    ingest_commons: Arc<IngestCommons>,
-    _pg_client: Arc<PgClient>,
-    store_stats: Arc<stats::CaConnStats>,
-    use_rate_limit_queue: bool,
-    ttls: Ttls,
-) -> Result<Vec<JoinHandle<()>>, Error> {
-    let (q2_tx, q2_rx) = async_channel::bounded(
-        insert_item_queue
-            .receiver()
-            .map_or(20000, |x| x.capacity().unwrap_or(20000)),
-    );
-    {
-        let ingest_commons = ingest_commons.clone();
-        let stats = store_stats.clone();
-        let recv = insert_item_queue
-            .receiver()
-            .ok_or_else(|| Error::with_msg_no_trace("can not derive insert queue receiver"))?;
-        let store_stats = store_stats.clone();
-        let fut = async move {
-            if !use_rate_limit_queue {
-                return;
-            }
-            let mut ts_forward_last = Instant::now();
-            let mut ivl_ema = stats::Ema64::with_k(0.00001);
-            loop {
-                let item = if let Ok(x) = recv.recv().await {
-                    x
-                } else {
-                    break;
-                };
-                let ts_received = Instant::now();
-                let allowed_to_drop = match &item {
-                    QueryItem::Insert(_) => true,
-                    _ => false,
-                };
-                let dt_min = {
-                    let rate = ingest_commons.store_workers_rate.load(Ordering::Acquire);
-                    Duration::from_nanos(SEC / rate)
-                };
-                let mut ema2 = ivl_ema.clone();
-                {
-                    let dt = ts_received.duration_since(ts_forward_last);
-                    let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
-                    ema2.update(dt_ns.min(MS * 100) as f32);
-                }
-                let ivl2 = Duration::from_nanos(ema2.ema() as u64);
-                if allowed_to_drop && ivl2 < dt_min {
-                    //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
-                    stats.store_worker_ratelimit_drop_inc();
-                } else {
-                    if q2_tx.send(item).await.is_err() {
-                        break;
-                    } else {
-                        let tsnow = Instant::now();
-                        let dt = tsnow.duration_since(ts_forward_last);
-                        let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
-                        ivl_ema.update(dt_ns.min(MS * 100) as f32);
-                        ts_forward_last = tsnow;
-                        store_stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release);
-                    }
-                }
-            }
-            info!("intermediate queue done");
-        };
-        tokio::spawn(fut);
-    }
-
-    let mut jhs = Vec::new();
-    let mut data_stores = Vec::new();
-    for _ in 0..insert_scylla_sessions {
-        let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?);
-        data_stores.push(data_store);
-    }
-    for worker_ix in 0..insert_worker_count {
-        let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
-        let stats = store_stats.clone();
-        let recv = if use_rate_limit_queue {
-            q2_rx.clone()
-        } else {
-            insert_item_queue
-                .receiver()
-                .ok_or_else(|| Error::with_msg_no_trace("can not derive receiver"))?
-        };
-        let ingest_commons = ingest_commons.clone();
-        let fut = async move {
-            ingest_commons
-                .insert_workers_running
-                .fetch_add(1, atomic::Ordering::AcqRel);
-            let backoff_0 = Duration::from_millis(10);
-            let mut backoff = backoff_0.clone();
-            let mut i1 = 0;
-            loop {
-                let item = if let Ok(item) = recv.recv().await {
-                    stats.store_worker_item_recv_inc();
-                    item
-                } else {
-                    break;
-                };
-                match item {
-                    QueryItem::ConnectionStatus(item) => {
-                        match crate::store::insert_connection_status(item, ttls.index, &data_store, &stats).await {
-                            Ok(_) => {
-                                stats.connection_status_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::ChannelStatus(item) => {
-                        match crate::store::insert_channel_status(item, ttls.index, &data_store, &stats).await {
-                            Ok(_) => {
-                                stats.channel_status_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::Insert(item) => {
-                        let insert_frac = ingest_commons.insert_frac.load(Ordering::Acquire);
-                        if i1 % 1000 < insert_frac {
-                            match crate::store::insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats)
-                                .await
-                            {
-                                Ok(_) => {
-                                    stats.store_worker_insert_done_inc();
-                                    backoff = backoff_0;
-                                }
-                                Err(e) => {
-                                    stats_inc_for_err(&stats, &e);
-                                    back_off_sleep(&mut backoff).await;
-                                }
-                            }
-                        } else {
-                            stats.store_worker_fraction_drop_inc();
-                        }
-                        i1 += 1;
-                    }
-                    QueryItem::Mute(item) => {
-                        let values = (
-                            (item.series.id() & 0xff) as i32,
-                            item.series.id() as i64,
-                            item.ts as i64,
-                            item.ema,
-                            item.emd,
-                            ttls.index.as_secs() as i32,
-                        );
-                        let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
-                        match qres {
-                            Ok(_) => {
-                                stats.mute_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                let e = e.into_simpler();
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::Ivl(item) => {
-                        let values = (
-                            (item.series.id() & 0xff) as i32,
-                            item.series.id() as i64,
-                            item.ts as i64,
-                            item.ema,
-                            item.emd,
-                            ttls.index.as_secs() as i32,
-                        );
-                        let qres = data_store
-                            .scy
-                            .execute(&data_store.qu_insert_item_recv_ivl, values)
-                            .await;
-                        match qres {
-                            Ok(_) => {
-                                stats.ivl_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                let e = e.into_simpler();
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::ChannelInfo(item) => {
-                        let params = (
-                            (item.series.id() & 0xff) as i32,
-                            item.ts_msp as i32,
-                            item.series.id() as i64,
-                            item.ivl,
-                            item.interest,
-                            item.evsize as i32,
-                            ttls.index.as_secs() as i32,
-                        );
-                        let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
-                        match qres {
-                            Ok(_) => {
-                                stats.channel_info_insert_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                let e = e.into_simpler();
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                    QueryItem::TimeBinPatchSimpleF32(item) => {
-                        info!("have time bin patch to insert: {item:?}");
-                        let params = (
-                            item.series.id() as i64,
-                            item.bin_len_sec as i32,
-                            item.bin_count as i32,
-                            item.off_msp as i32,
-                            item.off_lsp as i32,
-                            item.counts,
-                            item.mins,
-                            item.maxs,
-                            item.avgs,
-                            ttls.binned.as_secs() as i32,
-                        );
-                        let qres = data_store
-                            .scy
-                            .execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
-                            .await;
-                        match qres {
-                            Ok(_) => {
-                                stats.store_worker_insert_binned_done_inc();
-                                backoff = backoff_0;
-                            }
-                            Err(e) => {
-                                let e = e.into_simpler();
-                                stats_inc_for_err(&stats, &e);
-                                back_off_sleep(&mut backoff).await;
-                            }
-                        }
-                    }
-                }
-            }
-            ingest_commons
-                .insert_workers_running
-                .fetch_sub(1, atomic::Ordering::AcqRel);
-            trace!("insert worker {worker_ix} done");
-        };
-        let jh = tokio::spawn(fut);
-        jhs.push(jh);
-    }
-    Ok(jhs)
-}
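Note on the retry policy of the worker loop (moved essentially verbatim into scywr/src/insertworker.rs below): every failed insert grows the delay by dt += dt * 3 / 2, i.e. a factor of 2.5, capped at 4 s, and any success resets it to the initial 10 ms. A standalone copy of the growth rule that prints the resulting sequence:

    use std::time::Duration;

    // Same growth rule as back_off_next: dt += dt * 3 / 2, capped at 4 s.
    fn back_off_next(dt: &mut Duration) {
        *dt = *dt + (*dt) * 3 / 2;
        let dtmax = Duration::from_millis(4000);
        if *dt > dtmax {
            *dt = dtmax;
        }
    }

    fn main() {
        let mut dt = Duration::from_millis(10);
        for _ in 0..8 {
            back_off_next(&mut dt);
            print!("{:?} ", dt);
        }
        // Prints roughly: 25ms 62.5ms 156.25ms 390.625ms 976.5625ms 2.44s 4s 4s
        println!();
    }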
default_time_to_live = {}", self.default_time_to_live).unwrap(); - s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") - .unwrap(); - write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); - s.write_str(" }").unwrap(); - s - } -} - -struct EvTabDim1 { - sty: String, - cqlsty: String, - // SCYLLA_TTL_EVENTS_DIM1 - default_time_to_live: usize, - // TWCS_WINDOW_1D - compaction_window_size: usize, -} - -impl EvTabDim1 { - fn name(&self) -> String { - format!("events_array_{}", self.sty) - } - - fn cql(&self) -> String { - use std::fmt::Write; - let mut s = String::new(); - write!(s, "create table {}", self.name()).unwrap(); - write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap(); - write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap(); - s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'") - .unwrap(); - write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap(); - s.write_str(" }").unwrap(); - s - } -} - -async fn check_event_tables(scy: &Session) -> Result<(), Error> { - let stys = [ - "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string", - ]; - let cqlstys = [ - "tinyint", "smallint", "int", "bigint", "tinyint", "smallint", "int", "bigint", "float", "double", "boolean", - "text", - ]; - for (sty, cqlsty) in stys.into_iter().zip(cqlstys) { - let desc = EvTabDim0 { - sty: sty.into(), - cqlsty: cqlsty.into(), - // ttl is set in actual data inserts - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 48, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - let desc = EvTabDim1 { - sty: sty.into(), - cqlsty: format!("frozen>", cqlsty), - // ttl is set in actual data inserts - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 12, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - Ok(()) -} - -pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> { - let scy2 = create_session(scyconf).await?; - let scy = &scy2; - if !check_table_exist("ts_msp", &scy).await? { - create_table_ts_msp(scy).await?; - } - check_event_tables(scy).await?; - { - let desc = GenTwcsTab { - name: "series_by_ts_msp".into(), - cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(), - default_time_to_live: 60 * 60 * 5, - compaction_window_size: 24 * 4, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - { - let desc = GenTwcsTab { - name: "connection_status".into(), - cql: "(ts_msp bigint, ts_lsp bigint, kind int, addr text, primary key (ts_msp, ts_lsp))".into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_exist(&desc.name(), scy).await? 
{ - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - { - let desc = GenTwcsTab { - name: "channel_status".into(), - cql: "(series bigint, ts_msp bigint, ts_lsp bigint, kind int, primary key ((series, ts_msp), ts_lsp))" - .into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - { - let desc = GenTwcsTab { - name: "channel_status_by_ts_msp".into(), - cql: "(ts_msp bigint, ts_lsp bigint, series bigint, kind int, primary key (ts_msp, ts_lsp))".into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - { - let desc = GenTwcsTab { - name: "channel_ping".into(), - cql: "(part int, ts_msp int, series bigint, ivl float, interest float, evsize int, primary key ((part, ts_msp), series))" - .into(), - default_time_to_live: 60 * 60 * 1, - compaction_window_size: 24 * 4, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - { - let desc = GenTwcsTab { - name: "muted".into(), - cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(), - default_time_to_live: 60 * 60 * 4, - compaction_window_size: 24 * 1, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - { - let desc = GenTwcsTab { - name: "item_recv_ivl".into(), - cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(), - default_time_to_live: 60 * 60 * 4, - compaction_window_size: 24 * 1, - }; - if !check_table_exist(&desc.name(), scy).await? { - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - { - let desc = GenTwcsTab { - name: "binned_scalar_f32_v01".into(), - cql: "(series bigint, bin_len_sec int, bin_count int, off_msp int, off_lsp int, counts frozen>, mins frozen>, maxs frozen>, avgs frozen>, primary key ((series, bin_len_sec, bin_count, off_msp), off_lsp))" - .into(), - default_time_to_live: 60 * 60 * 24 * 30, - compaction_window_size: 24 * 4, - }; - if !check_table_exist(&desc.name(), scy).await? 
{ - scy.query(desc.cql(), ()) - .await - .map_err(|e| Error::from(format!("{e}")))?; - } - } - Ok(()) -} diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index 163a29d..b42bdf8 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -36,6 +36,24 @@ impl SeriesId { } } +impl From<&SeriesId> for scywr::iteminsertqueue::SeriesId { + fn from(value: &SeriesId) -> Self { + Self::new(value.id()) + } +} + +impl From<&mut SeriesId> for scywr::iteminsertqueue::SeriesId { + fn from(value: &mut SeriesId) -> Self { + Self::new(value.id()) + } +} + +impl From for scywr::iteminsertqueue::SeriesId { + fn from(value: SeriesId) -> Self { + Self::new(value.id()) + } +} + #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)] pub struct ChannelStatusSeriesId(u64); diff --git a/netfetch/src/timebin.rs b/netfetch/src/timebin.rs index 1f13356..d0c9d34 100644 --- a/netfetch/src/timebin.rs +++ b/netfetch/src/timebin.rs @@ -3,8 +3,6 @@ use crate::ca::proto::CaDataValue; use crate::ca::proto::CaEventValue; use crate::patchcollect::PatchCollect; use crate::series::SeriesId; -use crate::store::QueryItem; -use crate::store::TimeBinPatchSimpleF32; use err::Error; use items_0::scalar_ops::ScalarOps; use items_0::timebin::TimeBinner; @@ -21,6 +19,8 @@ use netpod::BinnedRangeEnum; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::TimeBinPatchSimpleF32; use std::any; use std::any::Any; use std::collections::VecDeque; @@ -187,7 +187,8 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque x, Err(e) => { let msg = format!( - "GetValHelp mismatch: series {:?} STY {} data {:?}", + "GetValHelp mismatch: series {:?} STY {} data {:?} {e}", series, any::type_name::(), ev.data @@ -260,9 +261,10 @@ where store_patch(params.series.clone(), pc, iiq)?; for item in pc.take_outq() { if let Some(k) = item.as_any_ref().downcast_ref::>() { + // TODO //let off_msp = let item = TimeBinPatchSimpleF32 { - series: params.series.clone(), + series: (¶ms.series).into(), bin_len_sec: (pc.bin_len().ns() / SEC) as u32, bin_count: pc.bin_count() as u32, off_msp: 0, diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index fd18105..b6b9d32 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -6,6 +6,10 @@ edition = "2021" [dependencies] futures-util = "0.3" +async-channel = "1.9.0" scylla = "0.9.0" +log = { path = "../log" } +stats = { path = "../stats" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } +taskrun = { path = "../../daqbuffer/crates/taskrun" } diff --git a/scywr/src/err.rs b/scywr/src/err.rs new file mode 100644 index 0000000..9de1e5f --- /dev/null +++ b/scywr/src/err.rs @@ -0,0 +1,51 @@ +use scylla::transport::errors::DbError; +use scylla::transport::errors::QueryError; + +#[derive(Debug)] +pub enum Error { + DbUnavailable, + DbOverload, + DbTimeout, + DbError(String), +} + +impl From for err::Error { + fn from(e: Error) -> Self { + err::Error::with_msg_no_trace(format!("{e:?}")) + } +} + +pub trait IntoSimplerError { + fn into_simpler(self) -> Error; +} + +impl IntoSimplerError for QueryError { + fn into_simpler(self) -> Error { + let e = self; + match e { + QueryError::DbError(e, msg) => match e { + DbError::Unavailable { .. } => Error::DbUnavailable, + DbError::Overloaded => Error::DbOverload, + DbError::IsBootstrapping => Error::DbUnavailable, + DbError::ReadTimeout { .. } => Error::DbTimeout, + DbError::WriteTimeout { .. 
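Note on the removed schema code: the table builders now live in scywr::schema (not shown in this diff). For reference, GenTwcsTab::cql() above assembles statements of the following shape; the concrete values are the series_by_ts_msp ones (ttl 60 * 60 * 5 = 18000 s, window 24 * 4 = 96 h), and the statement is reflowed here for readability:

    create table series_by_ts_msp
        (part int, ts_msp int, shape_kind int, scalar_type int, series bigint,
         primary key ((part, ts_msp, shape_kind, scalar_type), series))
        with default_time_to_live = 18000
        and compaction = { 'class': 'TimeWindowCompactionStrategy',
            'compaction_window_unit': 'HOURS', 'compaction_window_size': 96 }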
diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs
index 163a29d..b42bdf8 100644
--- a/netfetch/src/series.rs
+++ b/netfetch/src/series.rs
@@ -36,6 +36,24 @@ impl SeriesId {
     }
 }
 
+impl From<&SeriesId> for scywr::iteminsertqueue::SeriesId {
+    fn from(value: &SeriesId) -> Self {
+        Self::new(value.id())
+    }
+}
+
+impl From<&mut SeriesId> for scywr::iteminsertqueue::SeriesId {
+    fn from(value: &mut SeriesId) -> Self {
+        Self::new(value.id())
+    }
+}
+
+impl From<SeriesId> for scywr::iteminsertqueue::SeriesId {
+    fn from(value: SeriesId) -> Self {
+        Self::new(value.id())
+    }
+}
+
 #[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize)]
 pub struct ChannelStatusSeriesId(u64);
diff --git a/netfetch/src/timebin.rs b/netfetch/src/timebin.rs
index 1f13356..d0c9d34 100644
--- a/netfetch/src/timebin.rs
+++ b/netfetch/src/timebin.rs
@@ -3,8 +3,6 @@ use crate::ca::proto::CaDataValue;
 use crate::ca::proto::CaEventValue;
 use crate::patchcollect::PatchCollect;
 use crate::series::SeriesId;
-use crate::store::QueryItem;
-use crate::store::TimeBinPatchSimpleF32;
 use err::Error;
 use items_0::scalar_ops::ScalarOps;
 use items_0::timebin::TimeBinner;
@@ -21,6 +19,8 @@ use netpod::BinnedRangeEnum;
 use netpod::ScalarType;
 use netpod::Shape;
 use netpod::TsNano;
+use scywr::iteminsertqueue::QueryItem;
+use scywr::iteminsertqueue::TimeBinPatchSimpleF32;
 use std::any;
 use std::any::Any;
 use std::collections::VecDeque;
@@ -187,7 +187,8 @@ fn store_patch(series: SeriesId, pc: &mut PatchCollect, iiq: &mut VecDeque<QueryItem>)
             Ok(x) => x,
             Err(e) => {
                 let msg = format!(
-                    "GetValHelp mismatch: series {:?} STY {} data {:?}",
+                    "GetValHelp mismatch: series {:?} STY {} data {:?} {e}",
                    series,
                    any::type_name::<STY>(),
                    ev.data
@@ -260,9 +261,10 @@ where
         store_patch(params.series.clone(), pc, iiq)?;
         for item in pc.take_outq() {
             if let Some(k) = item.as_any_ref().downcast_ref::<BinsDim0<f32>>() {
+                // TODO
                 //let off_msp =
                 let item = TimeBinPatchSimpleF32 {
-                    series: params.series.clone(),
+                    series: (&params.series).into(),
                     bin_len_sec: (pc.bin_len().ns() / SEC) as u32,
                     bin_count: pc.bin_count() as u32,
                     off_msp: 0,
diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml
index fd18105..b6b9d32 100644
--- a/scywr/Cargo.toml
+++ b/scywr/Cargo.toml
@@ -6,6 +6,10 @@ edition = "2021"
 
 [dependencies]
 futures-util = "0.3"
+async-channel = "1.9.0"
 scylla = "0.9.0"
+log = { path = "../log" }
+stats = { path = "../stats" }
 err = { path = "../../daqbuffer/crates/err" }
 netpod = { path = "../../daqbuffer/crates/netpod" }
+taskrun = { path = "../../daqbuffer/crates/taskrun" }
diff --git a/scywr/src/err.rs b/scywr/src/err.rs
new file mode 100644
index 0000000..9de1e5f
--- /dev/null
+++ b/scywr/src/err.rs
@@ -0,0 +1,51 @@
+use scylla::transport::errors::DbError;
+use scylla::transport::errors::QueryError;
+
+#[derive(Debug)]
+pub enum Error {
+    DbUnavailable,
+    DbOverload,
+    DbTimeout,
+    DbError(String),
+}
+
+impl From<Error> for err::Error {
+    fn from(e: Error) -> Self {
+        err::Error::with_msg_no_trace(format!("{e:?}"))
+    }
+}
+
+pub trait IntoSimplerError {
+    fn into_simpler(self) -> Error;
+}
+
+impl IntoSimplerError for QueryError {
+    fn into_simpler(self) -> Error {
+        let e = self;
+        match e {
+            QueryError::DbError(e, msg) => match e {
+                DbError::Unavailable { .. } => Error::DbUnavailable,
+                DbError::Overloaded => Error::DbOverload,
+                DbError::IsBootstrapping => Error::DbUnavailable,
+                DbError::ReadTimeout { .. } => Error::DbTimeout,
+                DbError::WriteTimeout { .. } => Error::DbTimeout,
+                _ => Error::DbError(format!("{e} {msg}")),
+            },
+            QueryError::BadQuery(e) => Error::DbError(e.to_string()),
+            QueryError::IoError(e) => Error::DbError(e.to_string()),
+            QueryError::ProtocolError(e) => Error::DbError(e.to_string()),
+            QueryError::InvalidMessage(e) => Error::DbError(e.to_string()),
+            QueryError::TimeoutError => Error::DbTimeout,
+            QueryError::TooManyOrphanedStreamIds(e) => Error::DbError(e.to_string()),
+            QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()),
+            QueryError::RequestTimeout(e) => Error::DbError(e.to_string()),
+            QueryError::TranslationError(e) => Error::DbError(e.to_string()),
+        }
+    }
+}
+
+impl<T: IntoSimplerError> From<T> for Error {
+    fn from(e: T) -> Self {
+        e.into_simpler()
+    }
+}
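Note on the blanket impl that closes err.rs above: any error type that knows how to simplify itself converts into scywr's coarse Error via `?`. A standalone copy of the pattern on stand-in types (QueryError here is a local stub, not the scylla driver type):

    #[derive(Debug)]
    struct QueryError(String); // stand-in for the driver error

    #[derive(Debug)]
    enum Error {
        DbTimeout,
        DbError(String),
    }

    trait IntoSimplerError {
        fn into_simpler(self) -> Error;
    }

    impl IntoSimplerError for QueryError {
        fn into_simpler(self) -> Error {
            Error::DbError(self.0)
        }
    }

    // Blanket conversion, exactly as in err.rs: enables `?` on driver calls.
    impl<T: IntoSimplerError> From<T> for Error {
        fn from(e: T) -> Self {
            e.into_simpler()
        }
    }

    fn query() -> Result<(), QueryError> {
        Err(QueryError("read timeout".into()))
    }

    fn insert() -> Result<(), Error> {
        query()?; // QueryError -> Error via the blanket impl
        Ok(())
    }

    fn main() {
        println!("{:?}", insert());
        let _ = Error::DbTimeout;
    }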
Error::DbOverload => { + stats.store_worker_insert_overload_inc(); + } + Error::DbTimeout => { + stats.store_worker_insert_timeout_inc(); + } + Error::DbUnavailable => { + stats.store_worker_insert_unavailable_inc(); + } + Error::DbError(e) => { + if false { + warn!("db error {e}"); + } + stats.store_worker_insert_error_inc(); + } + Error::QueryError(_) => { + stats.store_worker_insert_error_inc(); + } + } +} + +fn back_off_next(backoff_dt: &mut Duration) { + *backoff_dt = *backoff_dt + (*backoff_dt) * 3 / 2; + let dtmax = Duration::from_millis(4000); + if *backoff_dt > dtmax { + *backoff_dt = dtmax; + } +} + +async fn back_off_sleep(backoff_dt: &mut Duration) { + back_off_next(backoff_dt); + tokio::time::sleep(*backoff_dt).await; +} + +#[derive(Debug, Clone)] +pub struct Ttls { + pub index: Duration, + pub d0: Duration, + pub d1: Duration, + pub binned: Duration, +} + +pub struct InsertWorkerOpts { + pub store_workers_rate: Arc, + pub insert_workers_running: Arc, + pub insert_frac: Arc, +} + +pub async fn spawn_scylla_insert_workers( + scyconf: ScyllaConfig, + insert_scylla_sessions: usize, + insert_worker_count: usize, + insert_item_queue: Arc, + insert_worker_opts: Arc, + store_stats: Arc, + use_rate_limit_queue: bool, + ttls: Ttls, +) -> Result>, Error> { + let (q2_tx, q2_rx) = async_channel::bounded( + insert_item_queue + .receiver() + .map_or(20000, |x| x.capacity().unwrap_or(20000)), + ); + { + let insert_worker_opts = insert_worker_opts.clone(); + let stats = store_stats.clone(); + let recv = insert_item_queue + .receiver() + .ok_or_else(|| Error::with_msg_no_trace("can not derive insert queue receiver"))?; + let store_stats = store_stats.clone(); + let fut = async move { + if !use_rate_limit_queue { + return; + } + let mut ts_forward_last = Instant::now(); + let mut ivl_ema = stats::Ema64::with_k(0.00001); + loop { + let item = if let Ok(x) = recv.recv().await { + x + } else { + break; + }; + let ts_received = Instant::now(); + let allowed_to_drop = match &item { + QueryItem::Insert(_) => true, + _ => false, + }; + let dt_min = { + let rate = insert_worker_opts.store_workers_rate.load(Ordering::Acquire); + Duration::from_nanos(SEC / rate) + }; + let mut ema2 = ivl_ema.clone(); + { + let dt = ts_received.duration_since(ts_forward_last); + let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; + ema2.update(dt_ns.min(MS * 100) as f32); + } + let ivl2 = Duration::from_nanos(ema2.ema() as u64); + if allowed_to_drop && ivl2 < dt_min { + //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await; + stats.store_worker_ratelimit_drop_inc(); + } else { + if q2_tx.send(item).await.is_err() { + break; + } else { + let tsnow = Instant::now(); + let dt = tsnow.duration_since(ts_forward_last); + let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64; + ivl_ema.update(dt_ns.min(MS * 100) as f32); + ts_forward_last = tsnow; + store_stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release); + } + } + } + info!("intermediate queue done"); + }; + tokio::spawn(fut); + } + + let mut jhs = Vec::new(); + let mut data_stores = Vec::new(); + for _ in 0..insert_scylla_sessions { + let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?); + data_stores.push(data_store); + } + for worker_ix in 0..insert_worker_count { + let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone(); + let stats = store_stats.clone(); + let recv = if use_rate_limit_queue { + q2_rx.clone() + } else { + insert_item_queue 
+
+#[derive(Debug, Clone)]
+pub struct Ttls {
+    pub index: Duration,
+    pub d0: Duration,
+    pub d1: Duration,
+    pub binned: Duration,
+}
+
+pub struct InsertWorkerOpts {
+    pub store_workers_rate: Arc<AtomicU64>,
+    pub insert_workers_running: Arc<AtomicU64>,
+    pub insert_frac: Arc<AtomicU64>,
+}
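+
+// Units, as consumed by the code below: store_workers_rate is the target
+// forward rate in items per second; insert_frac is per mille, so 1000 stores
+// every insert item and e.g. 500 stores roughly every second one.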
+
+pub async fn spawn_scylla_insert_workers(
+    scyconf: ScyllaConfig,
+    insert_scylla_sessions: usize,
+    insert_worker_count: usize,
+    insert_item_queue: Arc<CommonInsertItemQueue>,
+    insert_worker_opts: Arc<InsertWorkerOpts>,
+    store_stats: Arc<CaConnStats>,
+    use_rate_limit_queue: bool,
+    ttls: Ttls,
+) -> Result<Vec<JoinHandle<()>>, Error> {
+    let (q2_tx, q2_rx) = async_channel::bounded(
+        insert_item_queue
+            .receiver()
+            .map_or(20000, |x| x.capacity().unwrap_or(20000)),
+    );
+    {
+        let insert_worker_opts = insert_worker_opts.clone();
+        let stats = store_stats.clone();
+        let recv = insert_item_queue
+            .receiver()
+            .ok_or_else(|| Error::with_msg_no_trace("can not derive insert queue receiver"))?;
+        let store_stats = store_stats.clone();
+        let fut = async move {
+            if !use_rate_limit_queue {
+                return;
+            }
+            let mut ts_forward_last = Instant::now();
+            let mut ivl_ema = stats::Ema64::with_k(0.00001);
+            loop {
+                let item = if let Ok(x) = recv.recv().await {
+                    x
+                } else {
+                    break;
+                };
+                let ts_received = Instant::now();
+                let allowed_to_drop = match &item {
+                    QueryItem::Insert(_) => true,
+                    _ => false,
+                };
+                let dt_min = {
+                    let rate = insert_worker_opts.store_workers_rate.load(Ordering::Acquire);
+                    Duration::from_nanos(SEC / rate)
+                };
+                let mut ema2 = ivl_ema.clone();
+                {
+                    let dt = ts_received.duration_since(ts_forward_last);
+                    let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
+                    ema2.update(dt_ns.min(MS * 100) as f32);
+                }
+                let ivl2 = Duration::from_nanos(ema2.ema() as u64);
+                if allowed_to_drop && ivl2 < dt_min {
+                    //tokio::time::sleep_until(ts_recv_last.checked_add(dt_min).unwrap().into()).await;
+                    stats.store_worker_ratelimit_drop_inc();
+                } else {
+                    if q2_tx.send(item).await.is_err() {
+                        break;
+                    } else {
+                        let tsnow = Instant::now();
+                        let dt = tsnow.duration_since(ts_forward_last);
+                        let dt_ns = SEC * dt.as_secs() + dt.subsec_nanos() as u64;
+                        ivl_ema.update(dt_ns.min(MS * 100) as f32);
+                        ts_forward_last = tsnow;
+                        store_stats.inter_ivl_ema.store(ivl_ema.ema() as u64, Ordering::Release);
+                    }
+                }
+            }
+            info!("intermediate queue done");
+        };
+        tokio::spawn(fut);
+    }
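+
+    // Worked example of the limiter above, assuming SEC and MS are nanoseconds
+    // per second and per millisecond: at store_workers_rate = 20000, dt_min is
+    // 1e9 / 20000 ns = 50 us, so an insert item is dropped while the smoothed
+    // inter-forward interval is still below 50 us; non-insert items are always
+    // forwarded.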
+
+    let mut jhs = Vec::new();
+    let mut data_stores = Vec::new();
+    for _ in 0..insert_scylla_sessions {
+        let data_store = Arc::new(DataStore::new(&scyconf).await.map_err(|e| Error::from(e.to_string()))?);
+        data_stores.push(data_store);
+    }
+    for worker_ix in 0..insert_worker_count {
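+        // Workers are spread over the Scylla sessions in contiguous blocks,
+        // e.g. 4 sessions and 16 workers give workers 0..=3 session 0,
+        // workers 4..=7 session 1, and so on.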
+        let data_store = data_stores[worker_ix * data_stores.len() / insert_worker_count].clone();
+        let stats = store_stats.clone();
+        let recv = if use_rate_limit_queue {
+            q2_rx.clone()
+        } else {
+            insert_item_queue
+                .receiver()
+                .ok_or_else(|| Error::with_msg_no_trace("can not derive receiver"))?
+        };
+        let insert_worker_opts = insert_worker_opts.clone();
+        let fut = async move {
+            insert_worker_opts
+                .insert_workers_running
+                .fetch_add(1, atomic::Ordering::AcqRel);
+            let backoff_0 = Duration::from_millis(10);
+            let mut backoff = backoff_0.clone();
+            let mut i1 = 0;
+            loop {
+                let item = if let Ok(item) = recv.recv().await {
+                    stats.store_worker_item_recv_inc();
+                    item
+                } else {
+                    break;
+                };
+                match item {
+                    QueryItem::ConnectionStatus(item) => {
+                        match insert_connection_status(item, ttls.index, &data_store, &stats).await {
+                            Ok(_) => {
+                                stats.connection_status_insert_done_inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                stats_inc_for_err(&stats, &e);
+                                back_off_sleep(&mut backoff).await;
+                            }
+                        }
+                    }
+                    QueryItem::ChannelStatus(item) => {
+                        match insert_channel_status(item, ttls.index, &data_store, &stats).await {
+                            Ok(_) => {
+                                stats.channel_status_insert_done_inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                stats_inc_for_err(&stats, &e);
+                                back_off_sleep(&mut backoff).await;
+                            }
+                        }
+                    }
+                    QueryItem::Insert(item) => {
+                        let insert_frac = insert_worker_opts.insert_frac.load(Ordering::Acquire);
+                        if i1 % 1000 < insert_frac {
+                            match insert_item(item, ttls.index, ttls.d0, ttls.d1, &data_store, &stats).await {
+                                Ok(_) => {
+                                    stats.store_worker_insert_done_inc();
+                                    backoff = backoff_0;
+                                }
+                                Err(e) => {
+                                    stats_inc_for_err(&stats, &e);
+                                    back_off_sleep(&mut backoff).await;
+                                }
+                            }
+                        } else {
+                            stats.store_worker_fraction_drop_inc();
+                        }
+                        i1 += 1;
+                    }
+                    QueryItem::Mute(item) => {
+                        let values = (
+                            (item.series.id() & 0xff) as i32,
+                            item.series.id() as i64,
+                            item.ts as i64,
+                            item.ema,
+                            item.emd,
+                            ttls.index.as_secs() as i32,
+                        );
+                        let qres = data_store.scy.execute(&data_store.qu_insert_muted, values).await;
+                        match qres {
+                            Ok(_) => {
+                                stats.mute_insert_done_inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                                back_off_sleep(&mut backoff).await;
+                            }
+                        }
+                    }
+                    QueryItem::Ivl(item) => {
+                        let values = (
+                            (item.series.id() & 0xff) as i32,
+                            item.series.id() as i64,
+                            item.ts as i64,
+                            item.ema,
+                            item.emd,
+                            ttls.index.as_secs() as i32,
+                        );
+                        let qres = data_store
+                            .scy
+                            .execute(&data_store.qu_insert_item_recv_ivl, values)
+                            .await;
+                        match qres {
+                            Ok(_) => {
+                                stats.ivl_insert_done_inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                                back_off_sleep(&mut backoff).await;
+                            }
+                        }
+                    }
+                    QueryItem::ChannelInfo(item) => {
+                        let params = (
+                            (item.series.id() & 0xff) as i32,
+                            item.ts_msp as i32,
+                            item.series.id() as i64,
+                            item.ivl,
+                            item.interest,
+                            item.evsize as i32,
+                            ttls.index.as_secs() as i32,
+                        );
+                        let qres = data_store.scy.execute(&data_store.qu_insert_channel_ping, params).await;
+                        match qres {
+                            Ok(_) => {
+                                stats.channel_info_insert_done_inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                                back_off_sleep(&mut backoff).await;
+                            }
+                        }
+                    }
+                    QueryItem::TimeBinPatchSimpleF32(item) => {
+                        info!("have time bin patch to insert: {item:?}");
+                        let params = (
+                            item.series.id() as i64,
+                            item.bin_len_sec as i32,
+                            item.bin_count as i32,
+                            item.off_msp as i32,
+                            item.off_lsp as i32,
+                            item.counts,
+                            item.mins,
+                            item.maxs,
+                            item.avgs,
+                            ttls.binned.as_secs() as i32,
+                        );
+                        let qres = data_store
+                            .scy
+                            .execute(&data_store.qu_insert_binned_scalar_f32_v01, params)
+                            .await;
+                        match qres {
+                            Ok(_) => {
+                                stats.store_worker_insert_binned_done_inc();
+                                backoff = backoff_0;
+                            }
+                            Err(e) => {
+                                stats_inc_for_err(&stats, &crate::iteminsertqueue::Error::QueryError(e));
+                                back_off_sleep(&mut backoff).await;
+                            }
+                        }
+                    }
+                }
+            }
+            insert_worker_opts
+                .insert_workers_running
+                .fetch_sub(1, atomic::Ordering::AcqRel);
+            trace!("insert worker {worker_ix} done");
+        };
+        let jh = tokio::spawn(fut);
+        jhs.push(jh);
+    }
+    Ok(jhs)
+}
diff --git a/netfetch/src/store.rs b/scywr/src/iteminsertqueue.rs
similarity index 74%
rename from netfetch/src/store.rs
rename to scywr/src/iteminsertqueue.rs
index 81c4953..60ee277 100644
--- a/netfetch/src/store.rs
+++ b/scywr/src/iteminsertqueue.rs
@@ -1,138 +1,67 @@
-use crate::ca::proto::CaDataArrayValue;
-use crate::ca::proto::CaDataScalarValue;
-use crate::ca::proto::CaDataValue;
-use crate::ca::store::DataStore;
-use crate::errconv::ErrConv;
-use crate::series::SeriesId;
-use futures_util::Future;
-use futures_util::FutureExt;
+pub use netpod::CONNECTION_STATUS_DIV;
+
+use crate::store::DataStore;
+use err::thiserror;
+use err::ThisError;
 use log::*;
 use netpod::ScalarType;
 use netpod::Shape;
-use scylla::frame::value::ValueList;
 use scylla::prepared_statement::PreparedStatement;
 use scylla::transport::errors::DbError;
 use scylla::transport::errors::QueryError;
-use scylla::QueryResult;
-use scylla::Session as ScySession;
 use stats::CaConnStats;
 use std::net::SocketAddrV4;
-use std::pin::Pin;
-use std::task::Context;
-use std::task::Poll;
 use std::time::Duration;
-use std::time::Instant;
 use std::time::SystemTime;
-pub use netpod::CONNECTION_STATUS_DIV;
-
-#[derive(Debug)]
+#[derive(Debug, ThisError)]
 pub enum Error {
-    DbUnavailable,
-    DbOverload,
     DbTimeout,
-    DbError(String),
+    DbOverload,
+    DbUnavailable,
+    DbError(#[from] DbError),
+    QueryError(#[from] QueryError),
 }
 
-impl From<Error> for err::Error {
-    fn from(e: Error) -> Self {
-        err::Error::with_msg_no_trace(format!("{e:?}"))
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SeriesId(u64);
+
+impl SeriesId {
+    pub fn new(id: u64) -> Self {
+        Self(id)
+    }
+
+    pub fn id(&self) -> u64 {
+        self.0
+    }
 }
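+
+// SeriesId is defined locally so that scywr does not depend on
+// netfetch::series; e.g. SeriesId::new(42).id() == 42, and callers convert
+// between the two types at the crate boundary.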
 
-pub trait IntoSimplerError {
-    fn into_simpler(self) -> Error;
+#[derive(Clone, Debug)]
+pub enum ScalarValue {
+    I8(i8),
+    I16(i16),
+    I32(i32),
+    F32(f32),
+    F64(f64),
+    Enum(i16),
+    String(String),
+    Bool(bool),
 }
 
-impl IntoSimplerError for QueryError {
-    fn into_simpler(self) -> Error {
-        let e = self;
-        match e {
-            QueryError::DbError(e, msg) => match e {
-                DbError::Unavailable { .. } => Error::DbUnavailable,
-                DbError::Overloaded => Error::DbOverload,
-                DbError::IsBootstrapping => Error::DbUnavailable,
-                DbError::ReadTimeout { .. } => Error::DbTimeout,
-                DbError::WriteTimeout { .. } => Error::DbTimeout,
-                _ => Error::DbError(format!("{e} {msg}")),
-            },
-            QueryError::BadQuery(e) => Error::DbError(e.to_string()),
-            QueryError::IoError(e) => Error::DbError(e.to_string()),
-            QueryError::ProtocolError(e) => Error::DbError(e.to_string()),
-            QueryError::InvalidMessage(e) => Error::DbError(e.to_string()),
-            QueryError::TimeoutError => Error::DbTimeout,
-            QueryError::TooManyOrphanedStreamIds(e) => Error::DbError(e.to_string()),
-            QueryError::UnableToAllocStreamId => Error::DbError(e.to_string()),
-            QueryError::RequestTimeout(e) => Error::DbError(e.to_string()),
-            QueryError::TranslationError(e) => Error::DbError(e.to_string()),
-        }
-    }
+#[derive(Clone, Debug)]
+pub enum ArrayValue {
+    I8(Vec<i8>),
+    I16(Vec<i16>),
+    I32(Vec<i32>),
+    F32(Vec<f32>),
+    F64(Vec<f64>),
+    Bool(Vec<bool>),
 }
 
-impl<T: IntoSimplerError> From<T> for Error {
-    fn from(e: T) -> Self {
-        e.into_simpler()
-    }
-}
-
-pub struct ScyInsertFut<'a> {
-    fut: Pin<Box<dyn Future<Output = Result<QueryResult, QueryError>> + Send + 'a>>,
-    polled: usize,
-    ts_create: Instant,
-    ts_poll_first: Instant,
-}
-
-impl<'a> ScyInsertFut<'a> {
-    const NAME: &'static str = "ScyInsertFut";
-
-    pub fn new<V>(scy: &'a ScySession, query: &'a PreparedStatement, values: V) -> Self
-    where
-        V: ValueList + Send + 'static,
-    {
-        let fut = scy.execute(query, values);
-        let fut = Box::pin(fut) as _;
-        let tsnow = Instant::now();
-        Self {
-            fut,
-            polled: 0,
-            ts_create: tsnow,
-            ts_poll_first: tsnow,
-        }
-    }
-}
-
-impl<'a> Future for ScyInsertFut<'a> {
-    type Output = Result<(), err::Error>;
-
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
-        use Poll::*;
-        if self.polled == 0 {
-            self.ts_poll_first = Instant::now();
-        }
-        self.polled += 1;
-        loop {
-            break match self.fut.poll_unpin(cx) {
-                Ready(k) => match k {
-                    Ok(_res) => Ready(Ok(())),
-                    Err(e) => {
-                        let tsnow = Instant::now();
-                        let dt_created = tsnow.duration_since(self.ts_create).as_secs_f32() * 1e3;
-                        let dt_poll_first = tsnow.duration_since(self.ts_poll_first).as_secs_f32() * 1e3;
-                        error!(
-                            "{} polled {} dt_created {:6.2} ms dt_poll_first {:6.2} ms",
-                            Self::NAME,
-                            self.polled,
-                            dt_created,
-                            dt_poll_first
-                        );
-                        error!("{} done Err {:?}", Self::NAME, e);
-                        Ready(Err(e).err_conv())
-                    }
-                },
-                Pending => Pending,
-            };
-        }
-    }
+#[derive(Clone, Debug)]
+pub enum DataValue {
+    Scalar(ScalarValue),
+    Array(ArrayValue),
 }
 
 #[derive(Debug)]
@@ -267,7 +196,7 @@ pub struct InsertItem {
     pub pulse: u64,
     pub scalar_type: ScalarType,
     pub shape: Shape,
-    pub val: CaDataValue,
+    pub val: DataValue,
 }
 
 #[derive(Debug)]
@@ -415,11 +344,12 @@ where
         Ok(_) => Ok(()),
         Err(e) => match e {
             QueryError::TimeoutError => Err(Error::DbTimeout),
-            QueryError::DbError(e, msg) => match e {
+            // TODO use `msg`
+            QueryError::DbError(e, _msg) => match e {
                 DbError::Overloaded => Err(Error::DbOverload),
-                _ => Err(Error::DbError(format!("{e} {msg}"))),
+                _ => Err(e.into()),
            },
-            _ => Err(Error::DbError(format!("{e}"))),
+            _ => Err(e.into()),
        },
     }
 }
@@ -473,7 +403,7 @@ pub async fn insert_item(
         .await?;
         stats.inserts_msp_grid_inc();
     }
-    use CaDataValue::*;
+    use DataValue::*;
     match item.val {
         Scalar(val) => {
             let par = InsParCom {
@@ -483,7 +413,7 @@ pub async fn insert_item(
                 pulse: item.pulse,
                 ttl: ttl_0d.as_secs() as _,
             };
-            use CaDataScalarValue::*;
+            use ScalarValue::*;
             match val {
                 I8(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i8, &data_store).await?,
                 I16(val) => insert_scalar_gen(par, val, &data_store.qu_insert_scalar_i16, &data_store).await?,
@@ -503,7 +433,7 @@ pub async fn insert_item(
                 pulse: item.pulse,
                 ttl: ttl_1d.as_secs() as _,
            };
-            use CaDataArrayValue::*;
+            use ArrayValue::*;
             match val {
                 I8(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i8, &data_store).await?,
                 I16(val) => insert_array_gen(par, val, &data_store.qu_insert_array_i16, &data_store).await?,
diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs
index e6c7ce9..70d81b3 100644
--- a/scywr/src/lib.rs
+++ b/scywr/src/lib.rs
@@ -1,8 +1,15 @@
 pub mod access;
 pub mod config;
+pub mod err;
 pub mod fut;
 pub mod futbatch;
 pub mod futbatchgen;
+pub mod futinsert;
 pub mod futinsertloop;
+pub mod insertworker;
+pub mod iteminsertqueue;
 pub mod schema;
 pub mod session;
+pub mod store;
+
+pub use scylla;
diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs
index 65c4eb8..ab327ba 100644
--- a/scywr/src/schema.rs
+++ b/scywr/src/schema.rs
@@ -1,7 +1,28 @@
-use crate::access::Error;
+use crate::session::create_session;
 use crate::session::ScySession;
+use err::thiserror;
+use err::ThisError;
 use futures_util::StreamExt;
-// use netpod::ScyllaConfig;
+use netpod::ScyllaConfig;
+use scylla::transport::errors::DbError;
+use scylla::transport::errors::QueryError;
+use std::fmt;
+
+#[derive(Debug, ThisError)]
+pub enum Error {
+    NoKeyspaceChosen,
+    Fmt(#[from] fmt::Error),
+    Query(#[from] QueryError),
+    NewSession(String),
+}
+
+impl From<crate::session::Error> for Error {
+    fn from(value: crate::session::Error) -> Self {
+        match value {
+            crate::session::Error::NewSession(x) => Self::NewSession(x),
+        }
+    }
+}
 
 pub async fn has_table(name: &str, scy: &ScySession) -> Result<bool, Error> {
     let ks = scy.get_keyspace().ok_or_else(|| Error::NoKeyspaceChosen)?;
@@ -21,3 +42,255 @@
     }
     Ok(false)
 }
+
+pub async fn check_table_readable(name: &str, scy: &ScySession) -> Result<bool, Error> {
+    match scy.query(format!("select * from {} limit 1", name), ()).await {
+        Ok(_) => Ok(true),
+        Err(e) => match &e {
+            QueryError::DbError(e2, msg) => match e2 {
+                DbError::Invalid => {
+                    if msg.contains("unconfigured table") {
+                        Ok(false)
+                    } else {
+                        Err(e.into())
+                    }
+                }
+                _ => Err(e.into()),
+            },
+            _ => Err(e.into()),
+        },
+    }
+}
+
+pub async fn create_table_ts_msp(scy: &ScySession) -> Result<(), Error> {
+    use std::fmt::Write;
+    // seconds:
+    let default_time_to_live = 60 * 60 * 5;
+    // hours:
+    let twcs_window_index = 24 * 4;
+    let mut s = String::new();
+    s.write_str("create table ts_msp (series bigint, ts_msp bigint, primary key (series, ts_msp))")?;
+    write!(s, " with default_time_to_live = {}", default_time_to_live)?;
+    s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy'")?;
+    s.write_str(", 'compaction_window_unit': 'HOURS'")?;
+    write!(s, ", 'compaction_window_size': {}", twcs_window_index)?;
+    s.write_str(" }")?;
+    scy.query(s, ()).await?;
+    Ok(())
+}
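+
+// For reference, the statement assembled above expands to:
+// create table ts_msp (series bigint, ts_msp bigint, primary key (series, ts_msp))
+//   with default_time_to_live = 18000
+//   and compaction = { 'class': 'TimeWindowCompactionStrategy',
+//                      'compaction_window_unit': 'HOURS', 'compaction_window_size': 96 }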
+
+struct GenTwcsTab {
+    name: String,
+    cql: String,
+    default_time_to_live: usize,
+    compaction_window_size: usize,
+}
+
+impl GenTwcsTab {
+    fn name(&self) -> String {
+        self.name.clone()
+    }
+
+    fn cql(&self) -> String {
+        use std::fmt::Write;
+        let mut s = String::new();
+        write!(s, "create table {}", self.name()).unwrap();
+        s.write_str(&self.cql).unwrap();
+        write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
+        s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
+            .unwrap();
+        write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
+        s.write_str(" }").unwrap();
+        s
+    }
+}
+
+struct EvTabDim0 {
+    sty: String,
+    cqlsty: String,
+    // SCYLLA_TTL_EVENTS_DIM0
+    default_time_to_live: usize,
+    // TWCS_WINDOW_0D
+    compaction_window_size: usize,
+}
+
+impl EvTabDim0 {
+    fn name(&self) -> String {
+        format!("events_scalar_{}", self.sty)
+    }
+
+    fn cql(&self) -> String {
+        use std::fmt::Write;
+        let mut s = String::new();
+        write!(s, "create table {}", self.name()).unwrap();
+        write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap();
+        write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
+        s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
+            .unwrap();
+        write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
+        s.write_str(" }").unwrap();
+        s
+    }
+}
+
+struct EvTabDim1 {
+    sty: String,
+    cqlsty: String,
+    // SCYLLA_TTL_EVENTS_DIM1
+    default_time_to_live: usize,
+    // TWCS_WINDOW_1D
+    compaction_window_size: usize,
+}
+
+impl EvTabDim1 {
+    fn name(&self) -> String {
+        format!("events_array_{}", self.sty)
+    }
+
+    fn cql(&self) -> String {
+        use std::fmt::Write;
+        let mut s = String::new();
+        write!(s, "create table {}", self.name()).unwrap();
+        write!(s, " (series bigint, ts_msp bigint, ts_lsp bigint, pulse bigint, value {}, primary key ((series, ts_msp), ts_lsp))", self.cqlsty).unwrap();
+        write!(s, " with default_time_to_live = {}", self.default_time_to_live).unwrap();
+        s.write_str(" and compaction = { 'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS'")
+            .unwrap();
+        write!(s, ", 'compaction_window_size': {}", self.compaction_window_size).unwrap();
+        s.write_str(" }").unwrap();
+        s
+    }
+}
+
+async fn check_event_tables(scy: &ScySession) -> Result<(), Error> {
+    let stys = [
+        "u8", "u16", "u32", "u64", "i8", "i16", "i32", "i64", "f32", "f64", "bool", "string",
+    ];
+    let cqlstys = [
+        "tinyint", "smallint", "int", "bigint", "tinyint", "smallint", "int", "bigint", "float", "double", "boolean",
+        "text",
+    ];
+    for (sty, cqlsty) in stys.into_iter().zip(cqlstys) {
+        let desc = EvTabDim0 {
+            sty: sty.into(),
+            cqlsty: cqlsty.into(),
+            // ttl is set in actual data inserts
+            default_time_to_live: 60 * 60 * 1,
+            compaction_window_size: 48,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+        let desc = EvTabDim1 {
+            sty: sty.into(),
+            cqlsty: format!("frozen<list<{}>>", cqlsty),
+            // ttl is set in actual data inserts
+            default_time_to_live: 60 * 60 * 1,
+            compaction_window_size: 12,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    Ok(())
+}
+
+pub async fn migrate_keyspace(scyconf: &ScyllaConfig) -> Result<(), Error> {
+    let scy2 = create_session(scyconf).await?;
+    let scy = &scy2;
+    if !check_table_readable("ts_msp", &scy).await? {
+        create_table_ts_msp(scy).await?;
+    }
+    check_event_tables(scy).await?;
+    {
+        let desc = GenTwcsTab {
+            name: "series_by_ts_msp".into(),
+            cql: "(part int, ts_msp int, shape_kind int, scalar_type int, series bigint, primary key ((part, ts_msp, shape_kind, scalar_type), series))".into(),
+            default_time_to_live: 60 * 60 * 5,
+            compaction_window_size: 24 * 4,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    {
+        let desc = GenTwcsTab {
+            name: "connection_status".into(),
+            cql: "(ts_msp bigint, ts_lsp bigint, kind int, addr text, primary key (ts_msp, ts_lsp))".into(),
+            default_time_to_live: 60 * 60 * 1,
+            compaction_window_size: 24 * 4,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    {
+        let desc = GenTwcsTab {
+            name: "channel_status".into(),
+            cql: "(series bigint, ts_msp bigint, ts_lsp bigint, kind int, primary key ((series, ts_msp), ts_lsp))"
+                .into(),
+            default_time_to_live: 60 * 60 * 1,
+            compaction_window_size: 24 * 4,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    {
+        let desc = GenTwcsTab {
+            name: "channel_status_by_ts_msp".into(),
+            cql: "(ts_msp bigint, ts_lsp bigint, series bigint, kind int, primary key (ts_msp, ts_lsp))".into(),
+            default_time_to_live: 60 * 60 * 1,
+            compaction_window_size: 24 * 4,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    {
+        let desc = GenTwcsTab {
+            name: "channel_ping".into(),
+            cql: "(part int, ts_msp int, series bigint, ivl float, interest float, evsize int, primary key ((part, ts_msp), series))"
+                .into(),
+            default_time_to_live: 60 * 60 * 1,
+            compaction_window_size: 24 * 4,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    {
+        let desc = GenTwcsTab {
+            name: "muted".into(),
+            cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(),
+            default_time_to_live: 60 * 60 * 4,
+            compaction_window_size: 24 * 1,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    {
+        let desc = GenTwcsTab {
+            name: "item_recv_ivl".into(),
+            cql: "(part int, series bigint, ts bigint, ema float, emd float, primary key (part, series, ts))".into(),
+            default_time_to_live: 60 * 60 * 4,
+            compaction_window_size: 24 * 1,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    {
+        let desc = GenTwcsTab {
+            name: "binned_scalar_f32_v01".into(),
+            cql: "(series bigint, bin_len_sec int, bin_count int, off_msp int, off_lsp int, counts frozen<list<bigint>>, mins frozen<list<float>>, maxs frozen<list<float>>, avgs frozen<list<float>>, primary key ((series, bin_len_sec, bin_count, off_msp), off_lsp))"
+                .into(),
+            default_time_to_live: 60 * 60 * 24 * 30,
+            compaction_window_size: 24 * 4,
+        };
+        if !check_table_readable(&desc.name(), scy).await? {
+            scy.query(desc.cql(), ()).await?;
+        }
+    }
+    Ok(())
+}
diff --git a/netfetch/src/ca/store.rs b/scywr/src/store.rs
similarity index 100%
rename from netfetch/src/ca/store.rs
rename to scywr/src/store.rs