Deliver from prebinned

This commit is contained in:
Dominik Werder
2024-12-09 23:06:30 +01:00
parent 477ceb9402
commit c52de68418
4 changed files with 141 additions and 50 deletions

View File

@@ -32,7 +32,6 @@ use netpod::HEADER_NAME_REQUEST_ID;
use nodenet::client::OpenBoxedBytesViaHttp;
use nodenet::scylla::ScyllaEventReadProvider;
use query::api4::binned::BinnedQuery;
use scyllaconn::bincache::ScyllaCacheReadProvider;
use scyllaconn::worker::ScyllaQueue;
use std::pin::Pin;
use std::sync::Arc;
@@ -167,7 +166,7 @@ fn make_read_provider(
let cache_read_provider = if ncc.node_config.cluster.scylla_lt().is_some() {
scyqueue
.clone()
.map(|qu| ScyllaCacheReadProvider::new(qu))
.map(|qu| scyllaconn::bincache::ScyllaPrebinnedReadProvider::new(qu))
.map(|x| Arc::new(x) as Arc<dyn CacheReadProvider>)
.expect("scylla queue")
} else if ncc.node.sf_databuffer.is_some() {

View File

@@ -1,25 +1,85 @@
use crate::events2::prepare::StmtsCache;
use crate::events2::prepare::StmtsEvents;
use crate::log::*;
use crate::worker::ScyllaQueue;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use items_0::merge::MergeableTy;
use items_0::timebin::BinsBoxed;
use items_2::binning::container_bins::ContainerBins;
use netpod::ttl::RetentionTime;
use netpod::DtMs;
use netpod::TsNano;
use scylla::Session as ScySession;
use std::ops::Range;
use streams::timebin::cached::reader::BinsReadRes;
use streams::timebin::cached::reader::PrebinnedPartitioning;
pub struct ScyllaCacheReadProvider {
/// Read prebinned f32 bins for one (series, bin_len, msp, offs) slice from
/// every retention tier and merge them into a single time-ordered container.
///
/// The same bin may exist in more than one retention tier; the merge always
/// emits the bin with the lowest remaining start timestamp and drains
/// overlapping bins from all tiers into a discard container.
///
/// Returns `Ok(None)` when no tier had any bins for the requested slice.
async fn scylla_read_prebinned_f32(
    series: u64,
    bin_len: DtMs,
    msp: u64,
    offs: Range<u32>,
    scyqueue: ScyllaQueue,
) -> BinsReadRes {
    // Query each retention tier through the shared worker queue; any tier
    // error aborts the whole read via `?`.
    let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
    let mut res = Vec::new();
    for rt in rts {
        let x = scyqueue
            .read_prebinned_f32(rt, series, bin_len, msp, offs.clone())
            .await?;
        res.push(x);
    }
    // `out` accumulates the merged result; `dmp` receives drained duplicates
    // (kept only so the drain has a sink — its contents are discarded).
    let mut out = ContainerBins::new();
    let mut dmp = ContainerBins::new();
    loop {
        // TODO count for metrics when duplicates are found, or other issues.
        // Smallest remaining start timestamp per tier (None when exhausted).
        let mins: Vec<_> = res.iter().map(|x| MergeableTy::ts_min(x)).collect();
        // `ix`/`min2`: tier index and timestamp of the overall minimum;
        // ties resolve to the earlier (shorter-retention) tier.
        let mut ix = None;
        let mut min2 = None;
        for (i, min) in mins.iter().map(|x| x.clone()).enumerate() {
            if let Some(min) = min {
                if let Some(min2a) = min2 {
                    if min < min2a {
                        ix = Some(i);
                        min2 = Some(min);
                    }
                } else {
                    ix = Some(i);
                    min2 = Some(min);
                }
            }
        }
        if let Some(ix) = ix {
            let min2 = min2.unwrap();
            // Emit the winning tier's first bin.
            res[ix].drain_into(&mut out, 0..1);
            // Discard from every tier all leading bins whose timestamp is not
            // greater than the emitted one — presumably duplicates of the bin
            // just emitted (depends on `find_lowest_index_gt` semantics;
            // TODO confirm against the MergeableTy contract).
            let pps: Vec<_> = res.iter().map(|x| MergeableTy::find_lowest_index_gt(x, min2)).collect();
            for (i, pp) in pps.into_iter().enumerate() {
                if let Some(pp) = pp {
                    MergeableTy::drain_into(res.get_mut(i).unwrap(), &mut dmp, 0..pp);
                }
            }
        } else {
            // All tiers exhausted.
            break;
        }
    }
    if out.len() == 0 {
        Ok(None)
    } else {
        Ok(Some(Box::new(out)))
    }
}
/// Cache-read provider that serves prebinned f32 bins through the shared
/// scylla worker queue.
pub struct ScyllaPrebinnedReadProvider {
    // Handle to the bounded job channel of the scylla worker.
    scyqueue: ScyllaQueue,
}
impl ScyllaCacheReadProvider {
impl ScyllaPrebinnedReadProvider {
    /// Build a provider on top of an already-running scylla worker queue.
    pub fn new(scyqueue: ScyllaQueue) -> Self {
        Self { scyqueue }
    }
}
impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider {
impl streams::timebin::CacheReadProvider for ScyllaPrebinnedReadProvider {
fn read(
&self,
series: u64,
@@ -27,17 +87,16 @@ impl streams::timebin::CacheReadProvider for ScyllaCacheReadProvider {
msp: u64,
offs: Range<u32>,
) -> streams::timebin::cached::reader::CacheReading {
let scyqueue = self.scyqueue.clone();
// let fut = async move { scyqueue.read_cache_f32(series, bin_len, msp, offs).await };
let fut = async { todo!("TODO impl scylla cache read") };
// let fut = async { todo!("TODO impl scylla cache read") };
let fut = scylla_read_prebinned_f32(series, bin_len, msp, offs, self.scyqueue.clone());
streams::timebin::cached::reader::CacheReading::new(Box::pin(fut))
}
fn write(&self, series: u64, bins: BinsBoxed) -> streams::timebin::cached::reader::CacheWriting {
let scyqueue = self.scyqueue.clone();
let bins = todo!("TODO impl scylla cache write");
let fut = async move { scyqueue.write_cache_f32(series, bins).await };
streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
    /// Cache write entry point — currently unimplemented and panics via
    /// `todo!`. The previous implementation is kept commented for reference;
    /// NOTE(review): an open TODO elsewhere questions whether retrieval
    /// should write the cache at all — confirm before implementing.
    fn write(&self, _series: u64, _bins: BinsBoxed) -> streams::timebin::cached::reader::CacheWriting {
        // let scyqueue = self.scyqueue.clone();
        // let fut = async move { scyqueue.write_cache_f32(series, bins).await };
        // streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
        todo!("TODO impl scylla cache write")
    }
}
@@ -47,9 +106,14 @@ pub async fn worker_write(
stmts_cache: &StmtsCache,
scy: &ScySession,
) -> Result<(), streams::timebin::cached::reader::Error> {
for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), &fnl) in bins.zip_iter() {
if true {
error!("TODO retrieval should probably not write a cache at all");
return Err(streams::timebin::cached::reader::Error::TodoImpl);
}
for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), _fnl) in bins.zip_iter() {
let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000);
let div = streams::timebin::cached::reader::part_len(bin_len).ns();
// let div = streams::timebin::cached::reader::part_len(bin_len).ns();
let div = 42424242424242;
let msp = ts1.ns() / div;
let off = (ts1.ns() - msp * div) / bin_len.ns();
let params = (
@@ -72,14 +136,16 @@ pub async fn worker_write(
}
pub async fn worker_read(
rt: RetentionTime,
series: u64,
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
stmts_cache: &StmtsCache,
stmts: &StmtsEvents,
scy: &ScySession,
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
let div = streams::timebin::cached::reader::part_len(bin_len).ns();
let partt = PrebinnedPartitioning::try_from(bin_len)?;
let div = partt.msp_div();
let params = (
series as i64,
bin_len.ms() as i32,
@@ -88,7 +154,7 @@ pub async fn worker_read(
offs.end as i32,
);
let res = scy
.execute_iter(stmts_cache.st_read_f32().clone(), params)
.execute_iter(stmts.rt(&rt).prebinned_f32().clone(), params)
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
let mut it = res
@@ -106,7 +172,7 @@ pub async fn worker_read(
let max = row.3;
let avg = row.4;
let lst = row.5;
let ts1 = TsNano::from_ns(bin_len.ns() * off + div * msp);
let ts1 = TsNano::from_ns(bin_len.ns() * off + div.ns() * msp);
let ts2 = TsNano::from_ns(ts1.ns() + bin_len.ns());
// By assumption, bins which got written to storage are considered final
let fnl = true;

View File

@@ -81,6 +81,7 @@ pub struct StmtsEventsRt {
lsp_bck_val: StmtsLspDir,
lsp_fwd_ts: StmtsLspDir,
lsp_bck_ts: StmtsLspDir,
prebinned_f32: PreparedStatement,
}
impl StmtsEventsRt {
@@ -107,6 +108,10 @@ impl StmtsEventsRt {
}
}
}
    /// Prepared SELECT for prebinned scalar f32 bins of this retention time.
    pub fn prebinned_f32(&self) -> &PreparedStatement {
        &self.prebinned_f32
    }
}
async fn make_msp_dir(ks: &str, rt: &RetentionTime, bck: bool, scy: &Session) -> Result<PreparedStatement, Error> {
@@ -212,6 +217,20 @@ async fn make_lsp_dir(
Ok(ret)
}
/// Prepare the SELECT used to read prebinned scalar f32 bins for one
/// retention time. Rows `(off, cnt, min, max, avg, lst)` are keyed by
/// `(series, binlen, msp)` with a half-open `off` range.
async fn make_prebinned_f32(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<PreparedStatement, Error> {
    // Table name carries the keyspace and the retention-time prefix.
    let table = format!("{}.{}binned_scalar_f32_v02", ks, rt.table_prefix());
    let cql = format!(
        "select off, cnt, min, max, avg, lst from {} where series = ? and binlen = ? and msp = ? and off >= ? and off < ?",
        table
    );
    Ok(scy.prepare(cql).await?)
}
async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEventsRt, Error> {
let ret = StmtsEventsRt {
ts_msp_fwd: make_msp_dir(ks, rt, false, scy).await?,
@@ -220,6 +239,7 @@ async fn make_rt(ks: &str, rt: &RetentionTime, scy: &Session) -> Result<StmtsEve
lsp_bck_val: make_lsp_dir(ks, rt, "ts_lsp, value", true, scy).await?,
lsp_fwd_ts: make_lsp_dir(ks, rt, "ts_lsp", false, scy).await?,
lsp_bck_ts: make_lsp_dir(ks, rt, "ts_lsp", true, scy).await?,
prebinned_f32: make_prebinned_f32(ks, rt, scy).await?,
};
Ok(ret)
}

View File

@@ -1,13 +1,10 @@
use crate::conn::create_scy_session_no_ks;
use crate::events::ReadJobTrace;
use crate::events2::prepare::StmtsCache;
use crate::events2::prepare::StmtsEvents;
use crate::range::ScyllaSeriesRange;
use async_channel::Receiver;
use async_channel::Sender;
use daqbuf_err as err;
use err::thiserror;
use err::ThisError;
use futures_util::Future;
use futures_util::StreamExt;
use items_0::timebin::BinningggContainerEventsDyn;
@@ -23,24 +20,29 @@ use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
#[derive(Debug, ThisError)]
#[cstm(name = "ScyllaWorker")]
pub enum Error {
ScyllaConnection(err::Error),
Prepare(#[from] crate::events2::prepare::Error),
EventsQuery(#[from] crate::events::Error),
Msp(#[from] crate::events2::msp::Error),
ChannelSend,
ChannelRecv,
Join,
Toplist(#[from] crate::accounting::toplist::Error),
MissingKeyspaceConfig,
CacheWriteF32(#[from] streams::timebin::cached::reader::Error),
Schema(#[from] crate::schema::Error),
}
/// How many scylla jobs one worker drives concurrently (buffer_unordered width).
const CONCURRENT_QUERIES_PER_WORKER: usize = 80;
/// Capacity of the bounded job channel between `ScyllaQueue` and the worker.
const SCYLLA_WORKER_QUEUE_LEN: usize = 200;

// Worker error type, generated by the project-local `autoerr` macro.
// Variants with `#[from]` convert automatically via `?`.
autoerr::create_error_v1!(
    name(Error, "ScyllaWorker"),
    enum variants {
        ScyllaConnection(err::Error),
        Prepare(#[from] crate::events2::prepare::Error),
        EventsQuery(#[from] crate::events::Error),
        Msp(#[from] crate::events2::msp::Error),
        ChannelSend,
        ChannelRecv,
        Join,
        Toplist(#[from] crate::accounting::toplist::Error),
        MissingKeyspaceConfig,
        CacheWriteF32(#[from] streams::timebin::cached::reader::Error),
        Schema(#[from] crate::schema::Error),
    },
);
#[derive(Debug)]
struct ReadCacheF32 {
struct ReadPrebinnedF32 {
rt: RetentionTime,
series: u64,
bin_len: DtMs,
msp: u64,
@@ -69,7 +71,7 @@ enum Job {
ContainerBins<f32, f32>,
Sender<Result<(), streams::timebin::cached::reader::Error>>,
),
ReadCacheF32(ReadCacheF32),
ReadPrebinnedF32(ReadPrebinnedF32),
}
struct ReadNextValues {
@@ -168,15 +170,17 @@ impl ScyllaQueue {
Ok(res)
}
pub async fn read_cache_f32(
pub async fn read_prebinned_f32(
&self,
rt: RetentionTime,
series: u64,
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
let (tx, rx) = async_channel::bounded(1);
let job = Job::ReadCacheF32(ReadCacheF32 {
let job = Job::ReadPrebinnedF32(ReadPrebinnedF32 {
rt,
series,
bin_len,
msp,
@@ -209,7 +213,7 @@ impl ScyllaWorker {
scyconf_mt: ScyllaConfig,
scyconf_lt: ScyllaConfig,
) -> Result<(ScyllaQueue, Self), Error> {
let (tx, rx) = async_channel::bounded(200);
let (tx, rx) = async_channel::bounded(SCYLLA_WORKER_QUEUE_LEN);
let queue = ScyllaQueue { tx };
let worker = Self {
rx,
@@ -236,8 +240,8 @@ impl ScyllaWorker {
debug!("scylla worker prepare start");
let stmts = StmtsEvents::new(kss.try_into().map_err(|_| Error::MissingKeyspaceConfig)?, &scy).await?;
let stmts = Arc::new(stmts);
let stmts_cache = StmtsCache::new(kss[0], &scy).await?;
let stmts_cache = Arc::new(stmts_cache);
// let stmts_cache = StmtsCache::new(kss[0], &scy).await?;
// let stmts_cache = Arc::new(stmts_cache);
debug!("scylla worker prepare done");
self.rx
.map(|job| async {
@@ -266,19 +270,21 @@ impl ScyllaWorker {
// TODO count for stats
}
}
Job::WriteCacheF32(series, bins, tx) => {
let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await;
Job::WriteCacheF32(_, _, tx) => {
// let res = super::bincache::worker_write(series, bins, &stmts_cache, &scy).await;
let res = Err(streams::timebin::cached::reader::Error::TodoImpl);
if tx.send(res).await.is_err() {
// TODO count for stats
}
}
Job::ReadCacheF32(job) => {
Job::ReadPrebinnedF32(job) => {
let res = super::bincache::worker_read(
job.rt,
job.series,
job.bin_len,
job.msp,
job.offs,
&stmts_cache,
&stmts,
&scy,
)
.await;
@@ -288,7 +294,7 @@ impl ScyllaWorker {
}
}
})
.buffer_unordered(80)
.buffer_unordered(CONCURRENT_QUERIES_PER_WORKER)
.for_each(|_| futures_util::future::ready(()))
.await;
info!("scylla worker finished");