diff --git a/crates/httpret/src/api4/binned_v2.rs b/crates/httpret/src/api4/binned_v2.rs index dbd592d..7ab67bb 100644 --- a/crates/httpret/src/api4/binned_v2.rs +++ b/crates/httpret/src/api4/binned_v2.rs @@ -27,6 +27,7 @@ use netpod::log; use netpod::req_uri_to_url; use netpod::timeunits::SEC; use netpod::ttl::RetentionTime; +use netpod::BinnedRange; use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; @@ -72,6 +73,9 @@ autoerr::create_error_v1!( BinnedStream(err::Error), TimebinnedJson(#[from] streams::timebinnedjson::Error), ReadAllCoarse(#[from] scyllaconn::binwriteindex::read_all_coarse::Error), + Binned2FromBinned(#[from] scyllaconn::binned2::frombinned::Error), + BinnedQuery(#[from] query::api4::binned::Error), + BadRange, }, ); @@ -237,24 +241,53 @@ fn make_read_provider( (events_read_provider, cache_read_provider) } +fn to_debug(x: T) -> String { + format!("{:?}", x) +} + async fn binned_json_framed( res2: HandleRes2<'_>, ctx: &ReqCtx, _ncc: &NodeConfigCached, ) -> Result { + use futures_util::Stream; let series = SeriesId::new(res2.ch_conf.series().unwrap()); let range = res2.query.range().to_time().unwrap(); let scyqueue = res2.scyqueue.as_ref().unwrap(); - let res = scyllaconn::binwriteindex::read_all_coarse::read_all_coarse(series, range, scyqueue).await?; - let mut strings = Vec::new(); - for e in res { - strings.push(format!("{:?}", e)); - } + let stream = if res2.url.as_str().contains("testpart=read_all_coarse") { + let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone()); + let stream = stream.map_ok(to_debug).map_err(Error::from); + let msg = format!("{}", res2.url.as_str()); + let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); + Box::pin(stream) as Pin + Send>> + } else if res2.url.as_str().contains("testpart=frombinned") { + let binrange = res2 + .query + .covering_range()? + .binned_range_time() + .ok_or_else(|| Error::BadRange)?; + let stream = scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue); + let stream = stream.map_err(Error::from); + let msg = format!("{}", res2.url.as_str()); + let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); + Box::pin(stream) as Pin + Send>> + } else { + let msg = format!("UNKNOWN {}", res2.url.as_str()); + let stream = futures_util::stream::iter([Ok(msg)]); + Box::pin(stream) + }; + let stream = streams::lenframe::bytes_chunks_to_len_framed_str(stream); + let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan); let ret = response(StatusCode::OK) - .header(CONTENT_TYPE, APP_JSON) + .header(CONTENT_TYPE, APP_JSON_FRAMED) .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) - .body(ToJsonBody::from(&strings).into_body())?; + .body(body_stream(stream))?; Ok(ret) + // let ret = response(StatusCode::OK) + // .header(CONTENT_TYPE, APP_JSON) + // .header(HEADER_NAME_REQUEST_ID, ctx.reqid()) + // .body(ToJsonBody::from(&strings).into_body())?; + // Ok(ret) } struct HandleRes2<'a> { diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 23f0918..8f8ba27 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -9,6 +9,8 @@ futures-util = "0.3.31" pin-project = "1" async-channel = "2.3.1" scylla = "1.1" +serde = { version = "1", features = ["derive"] } +serde_json = "1" time = { version = "0.3.41", features = ["parsing", "formatting", "macros"] } autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } diff --git a/crates/scyllaconn/src/binned2.rs b/crates/scyllaconn/src/binned2.rs index 56fe580..e8da6a9 100644 --- a/crates/scyllaconn/src/binned2.rs +++ b/crates/scyllaconn/src/binned2.rs @@ -1,4 +1,6 @@ pub mod binnedrtbinlen; pub mod binnedrtmsplsps; +pub mod frombinned; +pub mod frombinnedandevents; pub mod intraday; pub mod msplspiter; diff --git a/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs b/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs index 4eaffbc..1ef0212 100644 --- a/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs +++ b/crates/scyllaconn/src/binned2/binnedrtmsplsps.rs @@ -74,7 +74,7 @@ impl BinnedRtMspLsps { let msp = self.msp.to_u64(); let offs = self.lsps.0.to_u32()..self.lsps.1.to_u32(); // SAFETY we only use scyqueue while we self are alive. - let scyqueue = unsafe { &mut *(&mut self.scyqueue as *mut ScyllaQueue) }; + let scyqueue = unsafe { &*(&self.scyqueue as *const ScyllaQueue) }; let fut = scyqueue.read_prebinned_f32(rt, series, binlen, msp, offs); let fut = Box::pin(fut); Some(fut) diff --git a/crates/scyllaconn/src/binned2/frombinned.rs b/crates/scyllaconn/src/binned2/frombinned.rs new file mode 100644 index 0000000..ff4aef0 --- /dev/null +++ b/crates/scyllaconn/src/binned2/frombinned.rs @@ -0,0 +1,117 @@ +use crate::binwriteindex::read_all_coarse::ReadAllCoarse; +use crate::worker::ScyllaQueue; +use daqbuf_series::SeriesId; +use daqbuf_series::msp::LspU32; +use daqbuf_series::msp::MspU32; +use futures_util::Stream; +use futures_util::StreamExt; +use netpod::BinnedRange; +use netpod::DtMs; +use netpod::TsNano; +use netpod::range::evrange::NanoRange; +use netpod::ttl::RetentionTime; +use serde::Serialize; +use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +autoerr::create_error_v1!( + name(Error, "Binned2FromBinned"), + enum variants { + A, + }, +); + +type IndexRow = (RetentionTime, MspU32, LspU32, DtMs); + +enum StateA { + ReadAllCoarse(ReadAllCoarse, VecDeque), + Done, +} + +pub struct FromBinned { + // range: NanoRange, + binrange: BinnedRange, + state_a: StateA, + outbuf: VecDeque, +} + +fn def() -> T { + Default::default() +} + +impl FromBinned { + pub fn new(series: SeriesId, binrange: BinnedRange, scyqueue: &ScyllaQueue) -> Self { + let state_a = StateA::ReadAllCoarse( + ReadAllCoarse::new(series, binrange.to_nano_range(), scyqueue.clone()), + def(), + ); + Self { + binrange, + state_a, + outbuf: def(), + } + } + + fn push_string(&mut self, x: T) { + self.outbuf.push_back(x.to_string()); + } + + fn push_json(&mut self, x: T) { + let js = serde_json::to_string(&x).unwrap(); + self.outbuf.push_back(js); + } + + fn handle_coarse_index(&mut self, rows: VecDeque) { + self.push_string(format!("handle_coarse_index")); + for e in rows { + self.push_string(format!("{:?}", e)); + } + } +} + +impl Stream for FromBinned { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break if let Some(x) = self.outbuf.pop_front() { + Ready(Some(Ok(x))) + } else { + use StateA::*; + let self2 = self.as_mut().get_mut(); + match &mut self2.state_a { + ReadAllCoarse(stb, rows) => match stb.poll_next_unpin(cx) { + Ready(Some(Ok(x))) => { + match x.into_data() { + Ok(x) => { + rows.push_back(x); + } + Err(x) => { + self2.push_string(format!("{:?}", x)); + } + } + continue; + } + Ready(Some(Err(e))) => { + self2.push_string(e); + self2.state_a = StateA::Done; + continue; + } + Ready(None) => { + let a = std::mem::replace(rows, def()); + self2.handle_coarse_index(a); + self2.state_a = StateA::Done; + self2.push_json(&"done with reading coarse"); + continue; + } + Pending => Pending, + }, + Done => Ready(None), + } + }; + } + } +} diff --git a/crates/scyllaconn/src/binned2/frombinnedandevents.rs b/crates/scyllaconn/src/binned2/frombinnedandevents.rs new file mode 100644 index 0000000..f23f318 --- /dev/null +++ b/crates/scyllaconn/src/binned2/frombinnedandevents.rs @@ -0,0 +1 @@ +pub struct FromBinnedAndEvents; diff --git a/crates/scyllaconn/src/binwriteindex.rs b/crates/scyllaconn/src/binwriteindex.rs index 4595442..0015844 100644 --- a/crates/scyllaconn/src/binwriteindex.rs +++ b/crates/scyllaconn/src/binwriteindex.rs @@ -2,16 +2,20 @@ pub mod bwxcmb; pub mod read_all_coarse; use crate::worker::ScyllaQueue; +use daqbuf_series::SeriesId; use daqbuf_series::msp::MspU32; use daqbuf_series::msp::PrebinnedPartitioning; -use daqbuf_series::SeriesId; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; +use items_0::streamitem::LogItem; +use items_0::streamitem::Sitemty3; +use items_0::streamitem::StreamItem; +use items_0::streamitem::sitem3_data; +use netpod::DtMs; use netpod::log; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; -use netpod::DtMs; use std::collections::VecDeque; use std::fmt; use std::pin::Pin; @@ -62,6 +66,7 @@ pub struct BinWriteIndexRtStream { msp_end: u32, lsp_end: u32, fut1: Option, + logbuf: VecDeque, } impl BinWriteIndexRtStream { @@ -95,6 +100,7 @@ impl BinWriteIndexRtStream { msp_end, lsp_end, fut1: None, + logbuf: Default::default(), } } @@ -115,12 +121,17 @@ impl BinWriteIndexRtStream { } fn make_next_query_fut(mut self: Pin<&mut Self>, _cx: &mut Context) -> Option { + let msg = format!( + "make_next_query_fut msp {} msp_end {} lsp_min {} lsp_end {}", + self.msp, self.msp_end, self.lsp_min, self.lsp_end + ); + self.logbuf.push_back(LogItem::info(msg)); if self.msp <= self.msp_end { let msp = self.msp; - let lsp_min = self.lsp_min; self.msp += 1; + let lsp_min = self.lsp_min; self.lsp_min = 0; - let lsp_max = if self.msp == self.msp_end { + let lsp_max = if self.msp > self.msp_end { self.lsp_end } else { self.pbp.patch_len() @@ -144,12 +155,14 @@ impl BinWriteIndexRtStream { } impl Stream for BinWriteIndexRtStream { - type Item = Result; + type Item = Sitemty3; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { - break if let Some(fut) = self.fut1.as_mut() { + break if let Some(x) = self.logbuf.pop_front() { + Ready(Some(Ok(StreamItem::Log(x)))) + } else if let Some(fut) = self.fut1.as_mut() { match fut.0.poll_unpin(cx) { Ready(Ok(x)) => { self.fut1 = None; @@ -157,7 +170,7 @@ impl Stream for BinWriteIndexRtStream { msp: MspU32(x.0), entries: x.3, }; - Ready(Some(Ok(item))) + Ready(Some(sitem3_data(item))) } Ready(Err(e)) => { self.fut1 = None; diff --git a/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs b/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs index 5bf2a17..b23f060 100644 --- a/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs +++ b/crates/scyllaconn/src/binwriteindex/read_all_coarse.rs @@ -1,14 +1,22 @@ use super::BinWriteIndexRtStream; use crate::worker::ScyllaQueue; +use daqbuf_series::SeriesId; +use daqbuf_series::msp::LspU32; use daqbuf_series::msp::MspU32; use daqbuf_series::msp::PrebinnedPartitioning; -use daqbuf_series::SeriesId; +use futures_util::FutureExt; +use futures_util::Stream; use futures_util::TryStreamExt; +use items_0::streamitem::Sitemty3; +use items_0::streamitem::sitem3_data; +use netpod::DtMs; use netpod::log; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; -use netpod::DtMs; use std::collections::VecDeque; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; macro_rules! info { ($($arg:expr),*) => ( if true { log::info!($($arg),*); } ); } macro_rules! debug { ($($arg:expr),*) => ( if true { log::debug!($($arg),*); } ); } @@ -21,11 +29,11 @@ autoerr::create_error_v1!( }, ); -pub async fn read_all_coarse( +async fn read_all_coarse( series: SeriesId, range: NanoRange, scyqueue: &ScyllaQueue, -) -> Result, Error> { +) -> Result, Error> { let rts = { use RetentionTime::*; [Long, Medium, Short] @@ -35,20 +43,82 @@ pub async fn read_all_coarse( let pbp = PrebinnedPartitioning::Day1; let mut stream = BinWriteIndexRtStream::new(rt.clone(), series, pbp, range.clone(), scyqueue.clone()); while let Some(x) = stream.try_next().await? { - for e in x.entries { - let binlen = DtMs::from_ms_u64(e.binlen as u64); - let item = (rt.clone(), x.msp.clone(), e.lsp, binlen); - ret.push_back(item); + match x.into_data() { + Ok(x) => { + for e in x.entries { + let binlen = DtMs::from_ms_u64(e.binlen as u64); + let item = (rt.clone(), x.msp.clone(), LspU32(e.lsp), binlen); + ret.push_back(item); + } + } + Err(x) => { + // TODO check for other item types. + // match directly instead of into-helper. + } } } } Ok(ret) } -pub fn select_potential_binlen(options: VecDeque<(RetentionTime, MspU32, u32, DtMs)>) -> Result<(), Error> { +pub fn select_potential_binlen(options: VecDeque<(RetentionTime, MspU32, LspU32, DtMs)>) -> Result<(), Error> { // Check first if there are common binlen over all the range. // If not, filter out the options which could build content from finer resolution. // Then heuristically select the best match. // PrebinnedPartitioning::Day1.msp_lsp(val) todo!() } + +pub struct ReadAllCoarse { + #[allow(unused)] + scyqueue: Box, + fut: Option, Error>> + Send>>>, + results: VecDeque<(RetentionTime, MspU32, LspU32, DtMs)>, +} + +impl ReadAllCoarse { + pub fn new(series: SeriesId, range: NanoRange, scyqueue: ScyllaQueue) -> Self { + let scyqueue = Box::new(scyqueue); + let fut = { + let scyqueue = unsafe { &*(scyqueue.as_ref() as *const ScyllaQueue) }; + read_all_coarse(series, range, scyqueue) + }; + Self { + scyqueue, + fut: Some(Box::pin(fut)), + results: VecDeque::new(), + } + } +} + +impl Stream for ReadAllCoarse { + type Item = Sitemty3<(RetentionTime, MspU32, LspU32, DtMs), Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + loop { + break match self.fut.as_mut() { + Some(fut) => match fut.poll_unpin(cx) { + Ready(x) => { + self.fut = None; + match x { + Ok(x) => { + self.results.extend(x); + continue; + } + Err(e) => Ready(Some(Err(e))), + } + } + Pending => Pending, + }, + None => { + if let Some(item) = self.results.pop_front() { + Ready(Some(sitem3_data(item))) + } else { + Ready(None) + } + } + }; + } + } +}