Files
daqbuffer/crates/scyllaconn/src/bincache.rs
2024-12-09 23:06:30 +01:00

183 lines
5.9 KiB
Rust

use crate::events2::prepare::StmtsCache;
use crate::events2::prepare::StmtsEvents;
use crate::log::*;
use crate::worker::ScyllaQueue;
use futures_util::TryStreamExt;
use items_0::merge::MergeableTy;
use items_0::timebin::BinsBoxed;
use items_2::binning::container_bins::ContainerBins;
use netpod::ttl::RetentionTime;
use netpod::DtMs;
use netpod::TsNano;
use scylla::Session as ScySession;
use std::ops::Range;
use streams::timebin::cached::reader::BinsReadRes;
use streams::timebin::cached::reader::PrebinnedPartitioning;
async fn scylla_read_prebinned_f32(
series: u64,
bin_len: DtMs,
msp: u64,
offs: Range<u32>,
scyqueue: ScyllaQueue,
) -> BinsReadRes {
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
let mut res = Vec::new();
for rt in rts {
let x = scyqueue
.read_prebinned_f32(rt, series, bin_len, msp, offs.clone())
.await?;
res.push(x);
}
let mut out = ContainerBins::new();
let mut dmp = ContainerBins::new();
loop {
// TODO count for metrics when duplicates are found, or other issues.
let mins: Vec<_> = res.iter().map(|x| MergeableTy::ts_min(x)).collect();
let mut ix = None;
let mut min2 = None;
for (i, min) in mins.iter().map(|x| x.clone()).enumerate() {
if let Some(min) = min {
if let Some(min2a) = min2 {
if min < min2a {
ix = Some(i);
min2 = Some(min);
}
} else {
ix = Some(i);
min2 = Some(min);
}
}
}
if let Some(ix) = ix {
let min2 = min2.unwrap();
res[ix].drain_into(&mut out, 0..1);
let pps: Vec<_> = res.iter().map(|x| MergeableTy::find_lowest_index_gt(x, min2)).collect();
for (i, pp) in pps.into_iter().enumerate() {
if let Some(pp) = pp {
MergeableTy::drain_into(res.get_mut(i).unwrap(), &mut dmp, 0..pp);
}
}
} else {
break;
}
}
if out.len() == 0 {
Ok(None)
} else {
Ok(Some(Box::new(out)))
}
}
pub struct ScyllaPrebinnedReadProvider {
scyqueue: ScyllaQueue,
}
impl ScyllaPrebinnedReadProvider {
pub fn new(scyqueue: ScyllaQueue) -> Self {
Self { scyqueue }
}
}
impl streams::timebin::CacheReadProvider for ScyllaPrebinnedReadProvider {
fn read(
&self,
series: u64,
bin_len: DtMs,
msp: u64,
offs: Range<u32>,
) -> streams::timebin::cached::reader::CacheReading {
// let fut = async { todo!("TODO impl scylla cache read") };
let fut = scylla_read_prebinned_f32(series, bin_len, msp, offs, self.scyqueue.clone());
streams::timebin::cached::reader::CacheReading::new(Box::pin(fut))
}
fn write(&self, _series: u64, _bins: BinsBoxed) -> streams::timebin::cached::reader::CacheWriting {
// let scyqueue = self.scyqueue.clone();
// let fut = async move { scyqueue.write_cache_f32(series, bins).await };
// streams::timebin::cached::reader::CacheWriting::new(Box::pin(fut))
todo!("TODO impl scylla cache write")
}
}
pub async fn worker_write(
series: u64,
bins: ContainerBins<f32, f32>,
stmts_cache: &StmtsCache,
scy: &ScySession,
) -> Result<(), streams::timebin::cached::reader::Error> {
if true {
error!("TODO retrieval should probably not write a cache at all");
return Err(streams::timebin::cached::reader::Error::TodoImpl);
}
for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), _fnl) in bins.zip_iter() {
let bin_len = DtMs::from_ms_u64((ts2.ns() - ts1.ns()) / 1000000);
// let div = streams::timebin::cached::reader::part_len(bin_len).ns();
let div = 42424242424242;
let msp = ts1.ns() / div;
let off = (ts1.ns() - msp * div) / bin_len.ns();
let params = (
series as i64,
bin_len.ms() as i32,
msp as i64,
off as i32,
cnt as i64,
min,
max,
avg,
lst,
);
// trace!("cache write {:?}", params);
scy.execute_unpaged(stmts_cache.st_write_f32(), params)
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
}
Ok(())
}
pub async fn worker_read(
rt: RetentionTime,
series: u64,
bin_len: DtMs,
msp: u64,
offs: core::ops::Range<u32>,
stmts: &StmtsEvents,
scy: &ScySession,
) -> Result<ContainerBins<f32, f32>, streams::timebin::cached::reader::Error> {
let partt = PrebinnedPartitioning::try_from(bin_len)?;
let div = partt.msp_div();
let params = (
series as i64,
bin_len.ms() as i32,
msp as i64,
offs.start as i32,
offs.end as i32,
);
let res = scy
.execute_iter(stmts.rt(&rt).prebinned_f32().clone(), params)
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
let mut it = res
.rows_stream::<(i32, i64, f32, f32, f32, f32)>()
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?;
let mut bins = ContainerBins::new();
while let Some(row) = it
.try_next()
.await
.map_err(|e| streams::timebin::cached::reader::Error::Scylla(e.to_string()))?
{
let off = row.0 as u64;
let cnt = row.1 as u64;
let min = row.2;
let max = row.3;
let avg = row.4;
let lst = row.5;
let ts1 = TsNano::from_ns(bin_len.ns() * off + div.ns() * msp);
let ts2 = TsNano::from_ns(ts1.ns() + bin_len.ns());
// By assumption, bins which got written to storage are considered final
let fnl = true;
bins.push_back(ts1, ts2, cnt, min, max, avg, lst, fnl);
}
Ok(bins)
}