diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index a0ce87e..45b4cdf 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -32,7 +32,6 @@ use netpod::HEADER_NAME_REQUEST_ID; use nodenet::client::OpenBoxedBytesViaHttp; use nodenet::scylla::ScyllaEventReadProvider; use query::api4::binned::BinnedQuery; -use scyllaconn::bincache::ScyllaCacheReadProvider; use scyllaconn::worker::ScyllaQueue; use std::pin::Pin; use std::sync::Arc; @@ -167,7 +166,7 @@ fn make_read_provider( let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() { scyqueue .clone() - .map(|qu| ScyllaCacheReadProvider::new(qu)) + .map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu)) .map(|x| Arc::new(x) as Arc) .expect("scylla queue") } else if ncc.node.sf_databuffer.is_some() { diff --git a/crates/scyllaconn/src/bincache.rs b/crates/scyllaconn/src/bincache.rs index f840ef3..a8a57ee 100644 --- a/crates/scyllaconn/src/bincache.rs +++ b/crates/scyllaconn/src/bincache.rs @@ -1,25 +1,85 @@ use crate::events2::prepare::StmtsCache; +use crate::events2::prepare::StmtsEvents; +use crate::log::*; use crate::worker::ScyllaQueue; -use futures_util::StreamExt; use futures_util::TryStreamExt; +use items_0::merge::MergeableTy; use items_0::timebin::BinsBoxed; use items_2::binning::container_bins::ContainerBins; +use netpod::ttl::RetentionTime; use netpod::DtMs; use netpod::TsNano; use scylla::Session as ScySession; use std::ops::Range; +use streams::timebin::cached::reader::BinsReadRes; +use streams::timebin::cached::reader::PrebinnedPartitioning; -pub struct ScyllaCacheReadProvider { +async fn scylla_read_prebinned_f32( + series: u64, + bin_len: DtMs, + msp: u64, + offs: Range, + scyqueue: ScyllaQueue, +) -> BinsReadRes { + let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; + let mut res = Vec::new(); + for rt in rts { + let x = scyqueue + .read_prebinned_f32(rt, series, bin_len, msp, offs.clone()) + .await?; + res.push(x); + } + let mut out = ContainerBins::new(); + let mut dmp = ContainerBins::new(); + loop { + // TODO count for metrics when duplicates are found, or other issues. + let mins: Vec<_> = res.iter().map(|x| MergeableTy::ts_min(x)).collect(); + let mut ix = None; + let mut min2 = None; + for (i, min) in mins.iter().map(|x| x.clone()).enumerate() { + if let Some(min) = min { + if let Some(min2a) = min2 { + if min < min2a { + ix = Some(i); + min2 = Some(min); + } + } else { + ix = Some(i); + min2 = Some(min); + } + } + } + if let Some(ix) = ix { + let min2 = min2.unwrap(); + res[ix].drain_into(&mut out, 0..1); + let pps: Vec<_> = res.iter().map(|x| MergeableTy::find_lowest_index_gt(x, min2)).collect(); + for (i, pp) in pps.into_iter().enumerate() { + if let Some(pp) = pp { + MergeableTy::drain_into(res.get_mut(i).unwrap(), &mut dmp, 0..pp); + } + } + } else { + break; + } + } + if out.len() == 0 { + Ok(None) + } else { + Ok(Some(Box::new(out))) + } +} + +pub struct ScyllaPrebinnedReadProvider { scyqueue: ScyllaQueue, } -impl ScyllaCacheReadProvider { +impl ScyllaPrebinnedReadProvider { pub fn new(scyqueue: ScyllaQueue) -> Self { Self { scyqueue } } } -impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { +impl streams::timebin::CacheReadProvider for ScyllaPrebinnedReadProvider { fn read( &self, series: u64, @@ -27,17 +87,16 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider { msp: u64, offs: Range, ) -> streams::timebin::cached::reader::CacheReading { - let scyqueue = self.scyqueue.clone(); - // let fut = async move { scyqueue.read_cache_f32(series, bin_len, msp, offs).await }; - let fut = async { todo!("TODO impl scylla cache read") }; + // let fut = async { todo!("TODO impl scylla cache read") }; + let fut = scylla_read_prebinned_f32(series, bin_len, msp, offs, self.scyqueue.clone()); streams::timebin::cached::reader::CacheReading::new(Box::pin(fut)) } - fn write(&self, series: u64, bins: BinsBoxed) -> streams::timebin::cached::reader::CacheWriting { - let scyqueue = self.scyqueue.clone(); - let bins = todo!("TODO impl scylla cache write"); - let fut = async move { scyqueue.write_cache_f32(series, bins).await }; - streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut)) + fn write(&self, _series: u64, _bins: BinsBoxed) -> streams::timebin::cached::reader::CacheWriting { + // let scyqueue = self.scyqueue.clone(); + // let fut = async move { scyqueue.write_cache_f32(series, bins).await }; + // streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut)) + todo!("TODO impl scylla cache write") } } @@ -47,9 +106,14 @@ pub async fn worker_write( stmts_cache: &StmtsCache, scy: &ScySession, ) -> Result<(), streams::timebin::cached::reader::Error> { - for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), &fnl) in bins.zip_iter() { + if true { + error!("TODO retrieval should probably not write a cache at all"); + return Err(streams::timebin::cached::reader::Error::TodoImpl); + } + for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), _fnl) in bins.zip_iter() { let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000); - let div = streams::timebin::cached::reader::part_len(bin_len).ns(); + // let div = streams::timebin::cached::reader::part_len(bin_len).ns(); + let div = 42424242424242; let msp = ts1.ns() / div; let off = (ts1.ns() - msp * div) / bin_len.ns(); let params = ( @@ -72,14 +136,16 @@ pub async fn worker_write( } pub async fn worker_read( + rt: RetentionTime, series: u64, bin_len: DtMs, msp: u64, offs: core::ops::Range, - stmts_cache: &StmtsCache, + stmts: &StmtsEvents, scy: &ScySession, ) -> Result, streams::timebin::cached::reader::Error> { - let div = streams::timebin::cached::reader::part_len(bin_len).ns(); + let partt = PrebinnedPartitioning::try_from(bin_len)?; + let div = partt.msp_div(); let params = ( series as i64, bin_len.ms() as i32, @@ -88,7 +154,7 @@ pub async fn worker_read( offs.end as i32, ); let res = scy - .execute_iter(stmts_cache.st_read_f32().clone(), params) + .execute_iter(stmts.rt(&rt).prebinned_f32().clone(), params) .await .map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?; let mut it = res @@ -106,7 +172,7 @@ pub async fn worker_read( let max = row.3; let avg = row.4; let lst = row.5; - let ts1 = TsNano::from_ns(bin_len.ns() * off + div * msp); + let ts1 = TsNano::from_ns(bin_len.ns() * off + div.ns() * msp); let ts2 = TsNano::from_ns(ts1.ns() + bin_len.ns()); // By assumption, bins which got written to storage are considered final let fnl = true; diff --git a/crates/scyllaconn/src/events2/prepare.rs b/crates/scyllaconn/src/events2/prepare.rs index ca439ae..e75fff7 100644 --- a/crates/scyllaconn/src/events2/prepare.rs +++ b/crates/scyllaconn/src/events2/prepare.rs @@ -81,6 +81,7 @@ pub struct StmtsEventsRt { lsp_bck_val: StmtsLspDir, lsp_fwd_ts: StmtsLspDir, lsp_bck_ts: StmtsLspDir, + prebinned_f32: PreparedStatement, } impl StmtsEventsRt { @@ -107,6 +108,10 @@ impl StmtsEventsRt { } } } + + pub fn prebinned_f32(&self) -> &PreparedStatement { + &self.prebinned_f32 + } } async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result { @@ -212,6 +217,20 @@ async fn make_lsp_dir( Ok(ret) } +async fn make_prebinned_f32(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { + let cql = format!( + concat!( + "select off, cnt, min, max, avg, lst from {}.{}binned_scalar_f32_v02", + " where series = ? and binlen = ? and msp = ?", + " and off >= ? and off < ?" + ), + ks, + rt.table_prefix() + ); + let qu = scy.prepare(cql).await?; + Ok(qu) +} + async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result { let ret = StmtsEventsRt { ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?, @@ -220,6 +239,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result, Sender>, ), - ReadCacheF32(ReadCacheF32), + ReadPrebinnedF32(ReadPrebinnedF32), } struct ReadNextValues { @@ -168,15 +170,17 @@ impl ScyllaQueue { Ok(res) } - pub async fn read_cache_f32( + pub async fn read_prebinned_f32( &self, + rt: RetentionTime, series: u64, bin_len: DtMs, msp: u64, offs: core::ops::Range, ) -> Result, streams::timebin::cached::reader::Error> { let (tx, rx) = async_channel::bounded(1); - let job = Job::ReadCacheF32(ReadCacheF32 { + let job = Job::ReadPrebinnedF32(ReadPrebinnedF32 { + rt, series, bin_len, msp, @@ -209,7 +213,7 @@ impl ScyllaWorker { scyconf_mt: ScyllaConfig, scyconf_lt: ScyllaConfig, ) -> Result<(ScyllaQueue, Self), Error> { - let (tx, rx) = async_channel::bounded(200); + let (tx, rx) = async_channel::bounded(SCYLLA_WORKER_QUEUE_LEN); let queue = ScyllaQueue { tx }; let worker = Self { rx, @@ -236,8 +240,8 @@ impl ScyllaWorker { debug!("scylla worker prepare start"); let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?; let stmts = Arc::new(stmts); - let stmts_cache = StmtsCache::new(kss[0], &scy).await?; - let stmts_cache = Arc::new(stmts_cache); + // let stmts_cache = StmtsCache::new(kss[0], &scy).await?; + // let stmts_cache = Arc::new(stmts_cache); debug!("scylla worker prepare done"); self.rx .map(|job| async { @@ -266,19 +270,21 @@ impl ScyllaWorker { // TODO count for stats } } - Job::WriteCacheF32(series, bins, tx) => { - let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await; + Job::WriteCacheF32(_, _, tx) => { + // let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await; + let res = Err(streams::timebin::cached::reader::Error::TodoImpl); if tx.send(res).await.is_err() { // TODO count for stats } } - Job::ReadCacheF32(job) => { + Job::ReadPrebinnedF32(job) => { let res = super::bincache::worker_read( + job.rt, job.series, job.bin_len, job.msp, job.offs, - &stmts_cache, + &stmts, &scy, ) .await; @@ -288,7 +294,7 @@ impl ScyllaWorker { } } }) - .buffer_unordered(80) + .buffer_unordered(CONCURRENT_QUERIES_PER_WORKER) .for_each(|_| futures_util::future::ready(())) .await; info!("scylla worker finished");