WIP bin api

Dominik Werder
2025-05-21 15:14:57 +02:00
parent 7afafe6dff
commit 30fa13a083
8 changed files with 262 additions and 24 deletions

View File

@@ -27,6 +27,7 @@ use netpod::log;
use netpod::req_uri_to_url;
use netpod::timeunits::SEC;
use netpod::ttl::RetentionTime;
use netpod::BinnedRange;
use netpod::ChannelTypeConfigGen;
use netpod::FromUrl;
use netpod::NodeConfigCached;
@@ -72,6 +73,9 @@ autoerr::create_error_v1!(
BinnedStream(err::Error),
TimebinnedJson(#[from] streams::timebinnedjson::Error),
ReadAllCoarse(#[from] scyllaconn::binwriteindex::read_all_coarse::Error),
Binned2FromBinned(#[from] scyllaconn::binned2::frombinned::Error),
BinnedQuery(#[from] query::api4::binned::Error),
BadRange,
},
);
@@ -237,24 +241,53 @@ fn make_read_provider(
(events_read_provider, cache_read_provider)
}
fn to_debug<T: std::fmt::Debug>(x: T) -> String {
format!("{:?}", x)
}
async fn binned_json_framed(
res2: HandleRes2<'_>,
ctx: &ReqCtx,
_ncc: &NodeConfigCached,
) -> Result<StreamResponse, Error> {
use futures_util::Stream;
let series = SeriesId::new(res2.ch_conf.series().unwrap());
let range = res2.query.range().to_time().unwrap();
let scyqueue = res2.scyqueue.as_ref().unwrap();
- let res = scyllaconn::binwriteindex::read_all_coarse::read_all_coarse(series, range, scyqueue).await?;
- let mut strings = Vec::new();
- for e in res {
- strings.push(format!("{:?}", e));
- }
let stream = if res2.url.as_str().contains("testpart=read_all_coarse") {
let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone());
let stream = stream.map_ok(to_debug).map_err(Error::from);
let msg = format!("{}", res2.url.as_str());
let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
} else if res2.url.as_str().contains("testpart=frombinned") {
let binrange = res2
.query
.covering_range()?
.binned_range_time()
.ok_or_else(|| Error::BadRange)?;
let stream = scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue);
let stream = stream.map_err(Error::from);
let msg = format!("{}", res2.url.as_str());
let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
} else {
let msg = format!("UNKNOWN {}", res2.url.as_str());
let stream = futures_util::stream::iter([Ok(msg)]);
Box::pin(stream)
};
let stream = streams::lenframe::bytes_chunks_to_len_framed_str(stream);
let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan);
let ret = response(StatusCode::OK)
- .header(CONTENT_TYPE, APP_JSON)
+ .header(CONTENT_TYPE, APP_JSON_FRAMED)
.header(HEADER_NAME_REQUEST_ID, ctx.reqid())
- .body(ToJsonBody::from(&strings).into_body())?;
+ .body(body_stream(stream))?;
Ok(ret)
// let ret = response(StatusCode::OK)
// .header(CONTENT_TYPE, APP_JSON)
// .header(HEADER_NAME_REQUEST_ID, ctx.reqid())
// .body(ToJsonBody::from(&strings).into_body())?;
// Ok(ret)
}
struct HandleRes2<'a> {

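Aside on the branch arms above: each `testpart` branch builds a different concrete stream type, so every arm is boxed and pinned into one trait-object type before the shared len-framing and instrumentation wrappers are applied. A minimal self-contained sketch of that unification pattern (illustrative only; the item and error types are stand-ins, not the handler's real types):

use futures_util::stream::{self, Stream, StreamExt};
use std::pin::Pin;

// Common type the branches are erased to; the error type is a stand-in.
type BoxedStrStream = Pin<Box<dyn Stream<Item = Result<String, String>> + Send>>;

fn pick_stream(use_a: bool) -> BoxedStrStream {
    if use_a {
        // Branch A: built from an iterator, mapped to the shared item type.
        let s = stream::iter(0..3).map(|i| Ok(format!("a-{}", i)));
        Box::pin(s) as BoxedStrStream
    } else {
        // Branch B: a different concrete stream type, boxed to the same trait object.
        let s = stream::once(async { Ok("b".to_string()) });
        Box::pin(s) as BoxedStrStream
    }
}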
View File

@@ -9,6 +9,8 @@ futures-util = "0.3.31"
pin-project = "1"
async-channel = "2.3.1"
scylla = "1.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
time = { version = "0.3.41", features = ["parsing", "formatting", "macros"] }
autoerr = "0.0.3"
daqbuf-err = { path = "../../../daqbuf-err" }

View File

@@ -1,4 +1,6 @@
pub mod binnedrtbinlen;
pub mod binnedrtmsplsps;
pub mod frombinned;
pub mod frombinnedandevents;
pub mod intraday;
pub mod msplspiter;

View File

@@ -74,7 +74,7 @@ impl BinnedRtMspLsps {
let msp = self.msp.to_u64();
let offs = self.lsps.0.to_u32()..self.lsps.1.to_u32();
// SAFETY: we only use scyqueue while self is alive.
- let scyqueue = unsafe { &mut *(&mut self.scyqueue as *mut ScyllaQueue) };
+ let scyqueue = unsafe { &*(&self.scyqueue as *const ScyllaQueue) };
let fut = scyqueue.read_prebinned_f32(rt, series, binlen, msp, offs);
let fut = Box::pin(fut);
Some(fut)

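The cast above intentionally detaches the borrow of `self.scyqueue` from the borrow checker so the returned future can be stored alongside it; the SAFETY comment is the entire contract. For comparison, a small sketch of the same shape done without `unsafe`, by cloning a cheap handle into the future (this mirrors how `ReadAllCoarse::new` takes `scyqueue.clone()` elsewhere in this commit; `Queue` below is a hypothetical stand-in, not `ScyllaQueue`):

use std::future::Future;
use std::pin::Pin;

// Stand-in for a cheaply cloneable queue handle (e.g. wrapping a channel sender).
#[derive(Clone)]
struct Queue;

impl Queue {
    async fn read(&self, key: u64) -> Vec<u8> {
        let _ = key;
        Vec::new()
    }
}

struct Reader {
    queue: Queue,
    fut: Option<Pin<Box<dyn Future<Output = Vec<u8>> + Send>>>,
}

impl Reader {
    fn start(&mut self, key: u64) {
        // Clone the handle so the stored future owns everything it needs;
        // no borrow of `self` escapes into the boxed future.
        let queue = self.queue.clone();
        let fut = async move { queue.read(key).await };
        self.fut = Some(Box::pin(fut));
    }
}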
View File

@@ -0,0 +1,117 @@
use crate::binwriteindex::read_all_coarse::ReadAllCoarse;
use crate::worker::ScyllaQueue;
use daqbuf_series::SeriesId;
use daqbuf_series::msp::LspU32;
use daqbuf_series::msp::MspU32;
use futures_util::Stream;
use futures_util::StreamExt;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::TsNano;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use serde::Serialize;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
autoerr::create_error_v1!(
name(Error, "Binned2FromBinned"),
enum variants {
A,
},
);
type IndexRow = (RetentionTime, MspU32, LspU32, DtMs);
enum StateA {
ReadAllCoarse(ReadAllCoarse, VecDeque<IndexRow>),
Done,
}
pub struct FromBinned {
// range: NanoRange,
binrange: BinnedRange<TsNano>,
state_a: StateA,
outbuf: VecDeque<String>,
}
fn def<T: Default>() -> T {
Default::default()
}
impl FromBinned {
pub fn new(series: SeriesId, binrange: BinnedRange<TsNano>, scyqueue: &ScyllaQueue) -> Self {
let state_a = StateA::ReadAllCoarse(
ReadAllCoarse::new(series, binrange.to_nano_range(), scyqueue.clone()),
def(),
);
Self {
binrange,
state_a,
outbuf: def(),
}
}
fn push_string<T: ToString>(&mut self, x: T) {
self.outbuf.push_back(x.to_string());
}
fn push_json<T: Serialize>(&mut self, x: T) {
let js = serde_json::to_string(&x).unwrap();
self.outbuf.push_back(js);
}
fn handle_coarse_index(&mut self, rows: VecDeque<IndexRow>) {
self.push_string(format!("handle_coarse_index"));
for e in rows {
self.push_string(format!("{:?}", e));
}
}
}
impl Stream for FromBinned {
type Item = Result<String, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break if let Some(x) = self.outbuf.pop_front() {
Ready(Some(Ok(x)))
} else {
use StateA::*;
let self2 = self.as_mut().get_mut();
match &mut self2.state_a {
ReadAllCoarse(stb, rows) => match stb.poll_next_unpin(cx) {
Ready(Some(Ok(x))) => {
match x.into_data() {
Ok(x) => {
rows.push_back(x);
}
Err(x) => {
self2.push_string(format!("{:?}", x));
}
}
continue;
}
Ready(Some(Err(e))) => {
self2.push_string(e);
self2.state_a = StateA::Done;
continue;
}
Ready(None) => {
let a = std::mem::replace(rows, def());
self2.handle_coarse_index(a);
self2.state_a = StateA::Done;
self2.push_json(&"done with reading coarse");
continue;
}
Pending => Pending,
},
Done => Ready(None),
}
};
}
}
}

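FromBinned's poll loop follows a common hand-written stream shape: drain an output buffer first, otherwise advance the inner state and loop, and only return Pending when the inner source does. A stripped-down, self-contained sketch of that pattern (generic stand-in types, not the FromBinned state machine itself):

use futures_util::{Stream, StreamExt};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

// Emits buffered items first, then pulls more from the inner stream.
struct Buffered<S> {
    inner: Option<S>,
    outbuf: VecDeque<String>,
}

impl<S> Stream for Buffered<S>
where
    S: Stream<Item = String> + Unpin,
{
    type Item = String;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        use Poll::*;
        let this = self.get_mut();
        loop {
            // 1) Hand out anything already buffered.
            if let Some(x) = this.outbuf.pop_front() {
                return Ready(Some(x));
            }
            // 2) Otherwise advance the inner stream and loop again.
            match this.inner.as_mut() {
                Some(s) => match s.poll_next_unpin(cx) {
                    Ready(Some(x)) => this.outbuf.push_back(x),
                    Ready(None) => this.inner = None,
                    Pending => return Pending,
                },
                None => return Ready(None),
            }
        }
    }
}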
View File

@@ -0,0 +1 @@
pub struct FromBinnedAndEvents;

View File

@@ -2,16 +2,20 @@ pub mod bwxcmb;
pub mod read_all_coarse;
use crate::worker::ScyllaQueue;
+ use daqbuf_series::SeriesId;
use daqbuf_series::msp::MspU32;
use daqbuf_series::msp::PrebinnedPartitioning;
- use daqbuf_series::SeriesId;
use futures_util::Future;
use futures_util::FutureExt;
use futures_util::Stream;
use items_0::streamitem::LogItem;
use items_0::streamitem::Sitemty3;
use items_0::streamitem::StreamItem;
use items_0::streamitem::sitem3_data;
+ use netpod::DtMs;
use netpod::log;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
- use netpod::DtMs;
use std::collections::VecDeque;
use std::fmt;
use std::pin::Pin;
@@ -62,6 +66,7 @@ pub struct BinWriteIndexRtStream {
msp_end: u32,
lsp_end: u32,
fut1: Option<Fut1>,
logbuf: VecDeque<LogItem>,
}
impl BinWriteIndexRtStream {
@@ -95,6 +100,7 @@ impl BinWriteIndexRtStream {
msp_end,
lsp_end,
fut1: None,
logbuf: Default::default(),
}
}
@@ -115,12 +121,17 @@ impl BinWriteIndexRtStream {
}
fn make_next_query_fut(mut self: Pin<&mut Self>, _cx: &mut Context) -> Option<Fut1> {
let msg = format!(
"make_next_query_fut msp {} msp_end {} lsp_min {} lsp_end {}",
self.msp, self.msp_end, self.lsp_min, self.lsp_end
);
self.logbuf.push_back(LogItem::info(msg));
if self.msp <= self.msp_end {
let msp = self.msp;
- let lsp_min = self.lsp_min;
self.msp += 1;
+ let lsp_min = self.lsp_min;
self.lsp_min = 0;
- let lsp_max = if self.msp == self.msp_end {
+ let lsp_max = if self.msp > self.msp_end {
self.lsp_end
} else {
self.pbp.patch_len()
@@ -144,12 +155,14 @@ impl BinWriteIndexRtStream {
}
impl Stream for BinWriteIndexRtStream {
- type Item = Result<BinWriteIndexSet, Error>;
+ type Item = Sitemty3<BinWriteIndexSet, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
- break if let Some(fut) = self.fut1.as_mut() {
+ break if let Some(x) = self.logbuf.pop_front() {
+ Ready(Some(Ok(StreamItem::Log(x))))
+ } else if let Some(fut) = self.fut1.as_mut() {
match fut.0.poll_unpin(cx) {
Ready(Ok(x)) => {
self.fut1 = None;
@@ -157,7 +170,7 @@ impl Stream for BinWriteIndexRtStream {
msp: MspU32(x.0),
entries: x.3,
};
- Ready(Some(Ok(item)))
+ Ready(Some(sitem3_data(item)))
}
Ready(Err(e)) => {
self.fut1 = None;

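The Item change above (from a plain `Result` to `Sitemty3`) is what lets the stream interleave buffered log records with data items. A rough sketch of that item shape with hypothetical stand-ins (the real `Sitemty3`/`StreamItem`/`sitem3_data` definitions live in the `items_0` crate and are not part of this diff):

// Hypothetical stand-ins only, to illustrate the shape of a stream item
// that is either a log record or a data value, wrapped in a Result.
#[derive(Debug)]
pub enum ItemKind<T> {
    Log(String),
    Data(T),
}

pub type ItemLike<T, E> = Result<ItemKind<T>, E>;

// Convenience constructor mirroring the role of a data-wrapping helper.
pub fn item_data<T, E>(x: T) -> ItemLike<T, E> {
    Ok(ItemKind::Data(x))
}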
View File

@@ -1,14 +1,22 @@
use super::BinWriteIndexRtStream;
use crate::worker::ScyllaQueue;
+ use daqbuf_series::SeriesId;
use daqbuf_series::msp::LspU32;
use daqbuf_series::msp::MspU32;
use daqbuf_series::msp::PrebinnedPartitioning;
- use daqbuf_series::SeriesId;
use futures_util::FutureExt;
use futures_util::Stream;
use futures_util::TryStreamExt;
use items_0::streamitem::Sitemty3;
use items_0::streamitem::sitem3_data;
+ use netpod::DtMs;
use netpod::log;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
- use netpod::DtMs;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); }
macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); }
@@ -21,11 +29,11 @@ autoerr::create_error_v1!(
},
);
- pub async fn read_all_coarse(
+ async fn read_all_coarse(
series: SeriesId,
range: NanoRange,
scyqueue: &ScyllaQueue,
- ) -> Result<VecDeque<(RetentionTime, MspU32, u32, DtMs)>, Error> {
+ ) -> Result<VecDeque<(RetentionTime, MspU32, LspU32, DtMs)>, Error> {
let rts = {
use RetentionTime::*;
[Long, Medium, Short]
@@ -35,20 +43,82 @@ pub async fn read_all_coarse(
let pbp = PrebinnedPartitioning::Day1;
let mut stream = BinWriteIndexRtStream::new(rt.clone(), series, pbp, range.clone(), scyqueue.clone());
while let Some(x) = stream.try_next().await? {
- for e in x.entries {
- let binlen = DtMs::from_ms_u64(e.binlen as u64);
- let item = (rt.clone(), x.msp.clone(), e.lsp, binlen);
- ret.push_back(item);
+ match x.into_data() {
+ Ok(x) => {
+ for e in x.entries {
+ let binlen = DtMs::from_ms_u64(e.binlen as u64);
+ let item = (rt.clone(), x.msp.clone(), LspU32(e.lsp), binlen);
+ ret.push_back(item);
+ }
+ }
+ Err(x) => {
+ // TODO check for other item types.
+ // match directly instead of into-helper.
+ }
}
}
}
Ok(ret)
}
- pub fn select_potential_binlen(options: VecDeque<(RetentionTime, MspU32, u32, DtMs)>) -> Result<(), Error> {
+ pub fn select_potential_binlen(options: VecDeque<(RetentionTime, MspU32, LspU32, DtMs)>) -> Result<(), Error> {
// Check first if there are common binlen over all the range.
// If not, filter out the options which could build content from finer resolution.
// Then heuristically select the best match.
// PrebinnedPartitioning::Day1.msp_lsp(val)
todo!()
}
pub struct ReadAllCoarse {
#[allow(unused)]
scyqueue: Box<ScyllaQueue>,
fut: Option<Pin<Box<dyn Future<Output = Result<VecDeque<(RetentionTime, MspU32, LspU32, DtMs)>, Error>> + Send>>>,
results: VecDeque<(RetentionTime, MspU32, LspU32, DtMs)>,
}
impl ReadAllCoarse {
pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self {
let scyqueue = Box::new(scyqueue);
let fut = {
let scyqueue = unsafe { &*(scyqueue.as_ref() as *const ScyllaQueue) };
read_all_coarse(series, range, scyqueue)
};
Self {
scyqueue,
fut: Some(Box::pin(fut)),
results: VecDeque::new(),
}
}
}
impl Stream for ReadAllCoarse {
type Item = Sitemty3<(RetentionTime, MspU32, LspU32, DtMs), Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
loop {
break match self.fut.as_mut() {
Some(fut) => match fut.poll_unpin(cx) {
Ready(x) => {
self.fut = None;
match x {
Ok(x) => {
self.results.extend(x);
continue;
}
Err(e) => Ready(Some(Err(e))),
}
}
Pending => Pending,
},
None => {
if let Some(item) = self.results.pop_front() {
Ready(Some(sitem3_data(item)))
} else {
Ready(None)
}
}
};
}
}
}
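On `select_potential_binlen` above: the commit only records the intended heuristic as comments over a `todo!()`. Purely as an illustration of one way such a selection could look, here is a hypothetical sketch with simplified stand-in row types (plain integers instead of `RetentionTime`/`MspU32`/`LspU32`/`DtMs`); it is not the logic this commit ships:

use std::collections::{BTreeMap, BTreeSet};

// Stand-in index row: (retention tier, msp, lsp, bin length in ms).
type Row = (u8, u32, u32, u64);

// Pick a bin length that is available in every msp partition if one exists,
// preferring the coarsest such length; otherwise fall back to the finest
// (smallest) bin length seen, from which coarser bins could be rebuilt.
fn select_binlen(rows: &[Row]) -> Option<u64> {
    let msps: BTreeSet<u32> = rows.iter().map(|r| r.1).collect();
    let mut per_binlen: BTreeMap<u64, BTreeSet<u32>> = BTreeMap::new();
    for &(_rt, msp, _lsp, binlen) in rows {
        per_binlen.entry(binlen).or_default().insert(msp);
    }
    // Bin lengths that cover every msp in the queried range.
    let common = per_binlen
        .iter()
        .filter(|(_, covered)| **covered == msps)
        .map(|(binlen, _)| *binlen)
        .max();
    common.or_else(|| per_binlen.keys().next().copied())
}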