From 31389fa7e312062b49b4a4812796d22ef8e4210e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 23 May 2025 16:21:12 +0200 Subject: [PATCH] WIP binned v2 --- crates/httpret/src/api4/binned.rs | 2 +- crates/httpret/src/api4/binned_v2.rs | 64 ++++- crates/httpret/src/proxy.rs | 2 + crates/scyllaconn/Cargo.toml | 1 + crates/scyllaconn/src/binned2.rs | 1 + crates/scyllaconn/src/binned2/frombinned.rs | 251 +++++++++++++++++++- crates/scyllaconn/src/binned2/mspiter.rs | 83 +++++++ crates/scyllaconn/src/binned2/msplspiter.rs | 89 +++++-- 8 files changed, 452 insertions(+), 41 deletions(-) create mode 100644 crates/scyllaconn/src/binned2/mspiter.rs diff --git a/crates/httpret/src/api4/binned.rs b/crates/httpret/src/api4/binned.rs index b0fa73a..6b008ca 100644 --- a/crates/httpret/src/api4/binned.rs +++ b/crates/httpret/src/api4/binned.rs @@ -329,7 +329,7 @@ async fn binned_cbor_framed( Ok(ret) } -struct HandleRes2<'a> { +pub struct HandleRes2<'a> { logspan: Span, query: BinnedQuery, ch_conf: ChannelTypeConfigGen, diff --git a/crates/httpret/src/api4/binned_v2.rs b/crates/httpret/src/api4/binned_v2.rs index 7ab67bb..701beb1 100644 --- a/crates/httpret/src/api4/binned_v2.rs +++ b/crates/httpret/src/api4/binned_v2.rs @@ -45,11 +45,14 @@ use series::msp::PrebinnedPartitioning; use series::SeriesId; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use streams::eventsplainreader::DummyCacheReadProvider; use streams::eventsplainreader::SfDatabufferEventReadProvider; use streams::streamtimeout::StreamTimeout2; +use streams::streamtimeout::TimeoutableStream; use streams::timebin::cached::reader::EventsReadProvider; use streams::timebin::CacheReadProvider; +use streams::timebinnedjson::timeoutable_collectable_stream_to_json_bytes; use tracing::Instrument; use tracing::Span; use url::Url; @@ -251,30 +254,67 @@ async fn binned_json_framed( _ncc: &NodeConfigCached, ) -> Result { use futures_util::Stream; + info!("binned_json_framed V2 prebinned"); let series = SeriesId::new(res2.ch_conf.series().unwrap()); let range = res2.query.range().to_time().unwrap(); let scyqueue = res2.scyqueue.as_ref().unwrap(); let stream = if res2.url.as_str().contains("testpart=read_all_coarse") { - let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone()); - let stream = stream.map_ok(to_debug).map_err(Error::from); - let msg = format!("{}", res2.url.as_str()); - let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); - Box::pin(stream) as Pin + Send>> + // let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone()); + // let stream = stream.map_ok(to_debug).map_err(Error::from); + // let msg = format!("{}", res2.url.as_str()); + // let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); + // Box::pin(stream) as Pin + Send>> + todo!() } else if res2.url.as_str().contains("testpart=frombinned") { let binrange = res2 .query .covering_range()? .binned_range_time() .ok_or_else(|| Error::BadRange)?; - let stream = scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue); + let stream = + scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue, res2.cache_read_provider); let stream = stream.map_err(Error::from); - let msg = format!("{}", res2.url.as_str()); - let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); - Box::pin(stream) as Pin + Send>> + // let msg = format!("{}", res2.url.as_str()); + // let stream = futures_util::stream::iter([Ok(msg)]).chain(stream); + let stream = stream.map(|x| { + // + x + }); + let stream = stream.map(|item| { + use items_0::streamitem::RangeCompletableItem; + use items_0::streamitem::StreamItem; + use items_0::timebin::BinsBoxed; + match item { + Ok(StreamItem::DataItem(mut x)) => { + x.fix_numerics(); + let ret = x.boxed_into_collectable_box(); + Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret))) + } + Ok(StreamItem::Log(x)) => Ok(StreamItem::Log(x)), + Ok(StreamItem::Stats(x)) => Ok(StreamItem::Stats(x)), + Err(e) => Err(e), + } + }); + let stream = stream.map_err(|e| daqbuf_err::Error::from_string(e)); + let timeout_content_base = res2 + .query + .timeout_content() + .unwrap_or(Duration::from_millis(2000)) + .min(Duration::from_millis(8000)) + .max(Duration::from_millis(334)); + let timeout_content_2 = timeout_content_base * 2 / 3; + let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None])); + let stream = TimeoutableStream::new(timeout_content_base, res2.timeout_provider, stream); + let stream = Box::pin(stream); + let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2); + // let stream = stream.map(|x| Ok(format!("dummy82749827348932"))); + // Box::pin(stream) as Pin + Send>> + stream } else { - let msg = format!("UNKNOWN {}", res2.url.as_str()); - let stream = futures_util::stream::iter([Ok(msg)]); - Box::pin(stream) + // let msg = format!("UNKNOWN {}", res2.url.as_str()); + // let stream = futures_util::stream::iter([Ok(msg)]); + // Box::pin(stream) + todo!() }; let stream = streams::lenframe::bytes_chunks_to_len_framed_str(stream); let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan); diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs index 5aa358f..bd801eb 100644 --- a/crates/httpret/src/proxy.rs +++ b/crates/httpret/src/proxy.rs @@ -219,6 +219,8 @@ async fn proxy_http_service_inner( Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/binned" { Ok(proxy_backend_query::(req, ctx, proxy_config).await?) + } else if path == "/api/4/private/binnedv2" { + Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path == "/api/4/channel/config" { Ok(proxy_backend_query::(req, ctx, proxy_config).await?) } else if path.starts_with("/api/4/test/http/204") { diff --git a/crates/scyllaconn/Cargo.toml b/crates/scyllaconn/Cargo.toml index 8f8ba27..0b89e33 100644 --- a/crates/scyllaconn/Cargo.toml +++ b/crates/scyllaconn/Cargo.toml @@ -12,6 +12,7 @@ scylla = "1.1" serde = { version = "1", features = ["derive"] } serde_json = "1" time = { version = "0.3.41", features = ["parsing", "formatting", "macros"] } +hashbrown = "0.15.3" autoerr = "0.0.3" daqbuf-err = { path = "../../../daqbuf-err" } netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" } diff --git a/crates/scyllaconn/src/binned2.rs b/crates/scyllaconn/src/binned2.rs index e8da6a9..2ac380a 100644 --- a/crates/scyllaconn/src/binned2.rs +++ b/crates/scyllaconn/src/binned2.rs @@ -3,4 +3,5 @@ pub mod binnedrtmsplsps; pub mod frombinned; pub mod frombinnedandevents; pub mod intraday; +pub mod mspiter; pub mod msplspiter; diff --git a/crates/scyllaconn/src/binned2/frombinned.rs b/crates/scyllaconn/src/binned2/frombinned.rs index ff4aef0..338f30e 100644 --- a/crates/scyllaconn/src/binned2/frombinned.rs +++ b/crates/scyllaconn/src/binned2/frombinned.rs @@ -1,20 +1,33 @@ +use super::mspiter::MspChunker; +use super::msplspiter::MspLspIter; use crate::binwriteindex::read_all_coarse::ReadAllCoarse; use crate::worker::ScyllaQueue; use daqbuf_series::SeriesId; use daqbuf_series::msp::LspU32; use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use futures_util::FutureExt; use futures_util::Stream; use futures_util::StreamExt; +use hashbrown::HashMap; +use items_0::streamitem::Sitemty3; +use items_0::streamitem::sitem3_data; +use items_0::streamitem::sitem3_log_info; +use items_0::timebin::BinningggContainerBinsDyn; +use items_0::timebin::BinsBoxed; use netpod::BinnedRange; use netpod::DtMs; +use netpod::TsMs; use netpod::TsNano; use netpod::range::evrange::NanoRange; use netpod::ttl::RetentionTime; use serde::Serialize; use std::collections::VecDeque; use std::pin::Pin; +use std::sync::Arc; use std::task::Context; use std::task::Poll; +use streams::timebin::CacheReadProvider; autoerr::create_error_v1!( name(Error, "Binned2FromBinned"), @@ -25,16 +38,32 @@ autoerr::create_error_v1!( type IndexRow = (RetentionTime, MspU32, LspU32, DtMs); +#[derive(Debug)] +struct FetchJob { + min: (MspU32, LspU32), + max: (MspU32, LspU32), + dbg_ts_1: (TsMs, TsMs), + dbg_ts_2: (TsMs, TsMs), + rt: RetentionTime, + pbp: PrebinnedPartitioning, +} + enum StateA { ReadAllCoarse(ReadAllCoarse, VecDeque), + ExecuteJobs, Done, } pub struct FromBinned { - // range: NanoRange, + cache_read_provider: Arc, + series: SeriesId, binrange: BinnedRange, state_a: StateA, - outbuf: VecDeque, + index_map: HashMap<(MspU32, LspU32), VecDeque<(DtMs, RetentionTime)>>, + jobs: VecDeque, + job_fut: Option, Option>)> + Send>>>, + logoutbuf: VecDeque, + binoutbuf: VecDeque, } fn def() -> T { @@ -42,43 +71,211 @@ fn def() -> T { } impl FromBinned { - pub fn new(series: SeriesId, binrange: BinnedRange, scyqueue: &ScyllaQueue) -> Self { + pub fn new( + series: SeriesId, + binrange: BinnedRange, + scyqueue: &ScyllaQueue, + cache_read_provider: Arc, + ) -> Self { let state_a = StateA::ReadAllCoarse( ReadAllCoarse::new(series, binrange.to_nano_range(), scyqueue.clone()), def(), ); Self { + cache_read_provider, + series, binrange, state_a, - outbuf: def(), + index_map: def(), + jobs: def(), + job_fut: None, + logoutbuf: def(), + binoutbuf: def(), } } fn push_string(&mut self, x: T) { - self.outbuf.push_back(x.to_string()); + self.logoutbuf.push_back(x.to_string()); } fn push_json(&mut self, x: T) { let js = serde_json::to_string(&x).unwrap(); - self.outbuf.push_back(js); + self.logoutbuf.push_back(js); } fn handle_coarse_index(&mut self, rows: VecDeque) { self.push_string(format!("handle_coarse_index")); for e in rows { self.push_string(format!("{:?}", e)); + let k = (e.1, e.2); + let h = &mut self.index_map; + if let Some(v) = h.get_mut(&k) { + v.push_back((e.3, e.0)); + } else { + let mut v = VecDeque::new(); + v.push_back((e.3, e.0)); + h.insert(k, v); + } + } + self.index_map.iter_mut().for_each(|(_, v)| { + v.make_contiguous().sort(); + }); + } + + fn build_day1_jobs(&mut self) { + let mut jobs = VecDeque::new(); + let pbp1 = PrebinnedPartitioning::Day1; + self.push_string(format!("binrange {:?}", self.binrange)); + self.push_string(format!("to_nano_range {:?}", self.binrange.to_nano_range())); + let it = MspLspIter::new_covering(self.binrange.to_nano_range(), pbp1.clone()); + for day in it { + { + let nday = pbp1.patch_len() as u64 * day.0.to_u64() + day.1.to_u32() as u64; + let ts = 60 * 60 * 24 * nday; + let ts = time::UtcDateTime::from_unix_timestamp(ts as _); + self.push_string(format!("build_day1_jobs {:?} ts {:?}", day, ts)); + } + let k = (day.0, day.1); + if let Some(ixs) = self.index_map.get(&k) { + let mut log = Vec::new(); + log.push(format!("have index {:?}", ixs)); + let mut found = None; + for ix in ixs.iter().rev() { + if ix.0.ms() <= self.binrange.bin_len_dt_ms().ms() { + if let Some(found) = found.as_ref() { + log.push(format!("already found a better solution found {:?} e {:?}", found, ix)); + } else { + match PrebinnedPartitioning::from_binlen(ix.0) { + Ok(pbp2) => { + log.push(format!("FOUND {:?}", ix)); + found = Some((ix.1.clone(), pbp2)); + } + Err(_) => { + log.push(format!("binlen not a pbp {:?}", ix)); + } + } + } + } else { + log.push(format!("too coarse {:?}", ix)); + } + } + for x in log { + self.push_string(x); + } + if let Some(ix) = found { + // determine already here the msp/lsp range that this job must query + // for the also given pbp. + let pbp2 = ix.1.clone(); + let rbeg = self.binrange.nano_beg().to_ts_ms(); + let rend = self.binrange.nano_end().to_ts_ms(); + let gbeg = pbp1.msp_lsp_to_ts(day.0, day.1); + let gend = gbeg.add_dt_ms(pbp1.bin_len()); + let beg = if rbeg > gbeg { rbeg } else { gbeg }; + let end = if rend < gend { rend } else { gend }; + { + let ts_beg = time::UtcDateTime::from_unix_timestamp(beg.sec() as _); + let ts_end = time::UtcDateTime::from_unix_timestamp(end.sec() as _); + self.push_string(format!("beg {:?} end {:?}", ts_beg, ts_end)); + } + let it3 = { + let range = NanoRange::from_ms_u64(beg.ms(), end.ms()); + MspLspIter::new_covering(range, pbp2.clone()) + }; + let job = FetchJob { + min: it3.mins(), + max: it3.maxs(), + dbg_ts_1: (beg, end), + // TODO remove + dbg_ts_2: (beg, end), + rt: ix.0, + // TODO which pbp is expected here? + pbp: pbp2, + }; + jobs.push_back(job); + } + } else { + self.push_string(format!("no index entry")); + } + } + self.jobs = jobs; + } + + fn make_next_job( + &mut self, + ) -> Option, Option>)> + Send>>> { + // iterate over the jobs and subjobs. + // each cache read provider task reads from a specific msp. + // therefore, I have to break at the msp boundaries. + // for that, use a msp iterator. + if let Some(job) = self.jobs.pop_front() { + let series = self.series; + let pbp = job.pbp.clone(); + let binlen = pbp.bin_len(); + // let tsbeg: TsMs = job.2; + // let tsend: TsMs = job.3; + // let range = NanoRange::from_ms_u64(tsbeg.ms(), tsend.ms()); + + self.push_string(format!("make job {:?}", job)); + + let chunk_it = MspChunker::from_min_max(pbp, job.min, job.max); + self.push_string(format!("chunk_it {:?}", chunk_it)); + let mut futs = VecDeque::new(); + for chunk in chunk_it { + self.push_string(format!("chunk {:?}", chunk)); + let offs = chunk.lsp1.to_u32()..chunk.lsp2.to_u32(); + let fut = self + .cache_read_provider + .read(series.id(), binlen, chunk.msp.to_u64(), offs); + futs.push_back(fut); + } + + // let msp = job.min.0.0 as u64; + // TODO add helper, make safer + // let offs = (job.min.1.0 as u32)..(job.max.1.0 as u32); + + let fut = async move { + let mut log = Vec::new(); + let mut bins2: Option> = None; + for fut in futs { + match fut.await { + Ok(x) => match x { + Some(mut bins) => match bins2.as_mut() { + Some(bins2) => { + log.push(format!("drain into len {}", bins.len())); + bins.drain_into(bins2.as_mut(), 0..bins.len()); + } + None => { + bins2 = Some(bins); + } + }, + None => { + log.push(format!("binned read job returns None")); + } + }, + Err(e) => { + log.push(format!("error in binned read job {}", e)); + } + } + } + (log, bins2) + }; + Some(Box::pin(fut)) + } else { + None } } } impl Stream for FromBinned { - type Item = Result; + type Item = Sitemty3; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; loop { - break if let Some(x) = self.outbuf.pop_front() { - Ready(Some(Ok(x))) + break if let Some(x) = self.logoutbuf.pop_front() { + Ready(Some(sitem3_log_info(x))) + } else if let Some(x) = self.binoutbuf.pop_front() { + Ready(Some(sitem3_data(x))) } else { use StateA::*; let self2 = self.as_mut().get_mut(); @@ -102,13 +299,45 @@ impl Stream for FromBinned { } Ready(None) => { let a = std::mem::replace(rows, def()); - self2.handle_coarse_index(a); self2.state_a = StateA::Done; - self2.push_json(&"done with reading coarse"); + self2.push_string("done with reading coarse"); + self2.handle_coarse_index(a); + self2.build_day1_jobs(); + self2.state_a = StateA::ExecuteJobs; continue; } Pending => Pending, }, + ExecuteJobs => { + if let Some(fut) = self2.job_fut.as_mut() { + match fut.poll_unpin(cx) { + Ready((log, bins)) => { + self2.job_fut = None; + self.push_string(format!("ExecuteJobs sees Ready")); + for x in log { + self.push_string(x); + } + match bins { + Some(bins) => { + self.binoutbuf.push_back(bins); + } + None => { + // TODO report + } + } + continue; + } + Pending => Pending, + } + } else if let Some(fut) = self2.make_next_job() { + // self2.state_a = StateA::ExecuteJobs; + self2.job_fut = Some(fut); + continue; + } else { + self2.state_a = StateA::Done; + continue; + } + } Done => Ready(None), } }; diff --git a/crates/scyllaconn/src/binned2/mspiter.rs b/crates/scyllaconn/src/binned2/mspiter.rs new file mode 100644 index 0000000..50e1d3c --- /dev/null +++ b/crates/scyllaconn/src/binned2/mspiter.rs @@ -0,0 +1,83 @@ +use daqbuf_series::msp::LspU32; +use daqbuf_series::msp::MspU32; +use daqbuf_series::msp::PrebinnedPartitioning; +use netpod::DtMs; +use netpod::range::evrange::NanoRange; + +// lsp2 is meant exclusive. +#[derive(Debug, Clone)] +pub struct MspChunkerItem { + pub msp: MspU32, + pub lsp1: LspU32, + pub lsp2: LspU32, +} + +#[derive(Debug)] +pub struct MspChunker { + pbp: PrebinnedPartitioning, + #[allow(unused)] + min: (MspU32, LspU32), + max: (MspU32, LspU32), + cur: (MspU32, LspU32), +} + +impl MspChunker { + pub fn new_covering(range: NanoRange, pbp: PrebinnedPartitioning) -> Self { + let mins = pbp.msp_lsp(range.beg_ts().to_ts_ms()); + let mins = (MspU32(mins.0), LspU32(mins.1)); + let end1 = range.end_ts().to_ts_ms(); + let g = DtMs::from_ms_u64(pbp.bin_len().ms() - 1); + let end2 = end1.add_dt_ms(g); + let maxs = pbp.msp_lsp(end2); + let maxs = (MspU32(maxs.0), LspU32(maxs.1)); + Self { + pbp, + min: mins, + max: maxs, + cur: mins, + } + } + + pub fn from_min_max(pbp: PrebinnedPartitioning, min: (MspU32, LspU32), max: (MspU32, LspU32)) -> Self { + let curs = min; + Self { + pbp, + min, + max, + cur: curs, + } + } +} + +impl Iterator for MspChunker { + type Item = MspChunkerItem; + + fn next(&mut self) -> Option { + if self.cur.0 >= self.max.0 && self.cur.1 >= self.max.1 { + None + } else { + if self.cur.0 < self.max.0 { + let item = MspChunkerItem { + msp: self.cur.0, + lsp1: self.cur.1, + lsp2: LspU32(self.pbp.patch_len()), + }; + self.cur.0.0 += 1; + self.cur.1 = LspU32(0); + Some(item) + } else { + if self.cur.1 < self.max.1 { + let item = MspChunkerItem { + msp: self.cur.0, + lsp1: self.cur.1, + lsp2: self.max.1, + }; + self.cur.1 = self.max.1; + Some(item) + } else { + None + } + } + } + } +} diff --git a/crates/scyllaconn/src/binned2/msplspiter.rs b/crates/scyllaconn/src/binned2/msplspiter.rs index f09d2e4..345f996 100644 --- a/crates/scyllaconn/src/binned2/msplspiter.rs +++ b/crates/scyllaconn/src/binned2/msplspiter.rs @@ -1,7 +1,7 @@ use daqbuf_series::msp::LspU32; use daqbuf_series::msp::MspU32; use daqbuf_series::msp::PrebinnedPartitioning; -use netpod::TsMs; +use netpod::DtMs; use netpod::range::evrange::NanoRange; #[derive(Debug, Clone)] @@ -12,15 +12,37 @@ pub struct MspLspItem { #[derive(Debug)] pub struct MspLspIter { + #[allow(unused)] range: NanoRange, pbp: PrebinnedPartitioning, - ts: TsMs, + #[allow(unused)] + mins: (u32, u32), + maxs: (u32, u32), + curs: (u32, u32), } impl MspLspIter { - pub fn new(range: NanoRange, pbp: PrebinnedPartitioning) -> Self { - let ts = range.beg_ts().to_ts_ms(); - Self { range, pbp, ts } + pub fn new_covering(range: NanoRange, pbp: PrebinnedPartitioning) -> Self { + let mins = pbp.msp_lsp(range.beg_ts().to_ts_ms()); + let end1 = range.end_ts().to_ts_ms(); + let g = DtMs::from_ms_u64(pbp.bin_len().ms() - 1); + let end2 = end1.add_dt_ms(g); + let maxs = pbp.msp_lsp(end2); + Self { + range, + pbp, + mins, + maxs, + curs: mins, + } + } + + pub fn mins(&self) -> (MspU32, LspU32) { + (MspU32(self.mins.0), LspU32(self.mins.1)) + } + + pub fn maxs(&self) -> (MspU32, LspU32) { + (MspU32(self.maxs.0), LspU32(self.maxs.1)) } } @@ -28,13 +50,16 @@ impl Iterator for MspLspIter { type Item = (MspU32, LspU32); fn next(&mut self) -> Option { - if self.ts >= self.range.end_ts().to_ts_ms() { + if self.curs.0 >= self.maxs.0 && self.curs.1 >= self.maxs.1 { None } else { - let x = self.pbp.msp_lsp(self.ts); - let msp = MspU32(x.0); - let lsp = LspU32(x.1); - self.ts = self.ts.add_dt_ms(self.pbp.bin_len()); + let msp = MspU32(self.curs.0); + let lsp = LspU32(self.curs.1); + self.curs.1 += 1; + if self.curs.1 >= self.pbp.patch_len() { + self.curs.1 = 0; + self.curs.0 += 1; + } Some((msp, lsp)) } } @@ -45,7 +70,7 @@ fn test_iter_00() { let range = NanoRange::from_strings("2024-06-07T09:17:31Z", "2024-06-07T09:17:31Z").unwrap(); let pbp = PrebinnedPartitioning::Sec1; assert_eq!(pbp.patch_len(), 1200); - let it = MspLspIter::new(range, pbp); + let it = MspLspIter::new_covering(range, pbp); let a: Vec<_> = it.collect(); assert_eq!(a.len(), 0); } @@ -54,12 +79,15 @@ fn test_iter_00() { fn test_iter_01() { let range = NanoRange::from_strings("2024-06-07T09:17:31Z", "2024-06-07T09:17:32Z").unwrap(); let pbp = PrebinnedPartitioning::Sec1; - assert_eq!(pbp.patch_len(), 1200); - let it = MspLspIter::new(range, pbp.clone()); + let it = MspLspIter::new_covering(range, pbp.clone()); let a: Vec<_> = it.collect(); assert_eq!(a.len(), 1); - assert_eq!(a[0].0.0, 1431459); - assert_eq!(a[0].1.0, 1051); + let e = a.first().unwrap(); + assert_eq!(e.0.0, 1431459); + assert_eq!(e.1.0, 1051); + // let e = a.last().unwrap(); + // assert_eq!(e.0.0, 1431459); + // assert_eq!(e.1.0, 1051); let ts_sec = pbp.patch_len() as u64 * a[0].0.0 as u64 + a[0].1.0 as u64; // time::UtcDateTime::new(time::Date::with, time) let ts1 = time::UtcDateTime::from_unix_timestamp(ts_sec as i64).unwrap(); @@ -72,7 +100,7 @@ fn test_iter_01() { fn test_iter_02() { let range = NanoRange::from_strings("2024-06-07T09:17:31Z", "2024-06-07T09:22:00Z").unwrap(); let pbp = PrebinnedPartitioning::Sec1; - let it = MspLspIter::new(range, pbp.clone()); + let it = MspLspIter::new_covering(range, pbp.clone()); let a: Vec<_> = it.collect(); assert_eq!(a.len(), 240 + 29); let e = &a[a.len() - 1]; @@ -85,7 +113,7 @@ fn test_iter_03() { // Check clamped covering in correct direction let range = NanoRange::from_strings("2024-06-07T09:17:31.1Z", "2024-06-07T09:21:59.8Z").unwrap(); let pbp = PrebinnedPartitioning::Sec1; - let it = MspLspIter::new(range, pbp.clone()); + let it = MspLspIter::new_covering(range, pbp.clone()); let a: Vec<_> = it.collect(); assert_eq!(a.len(), 240 + 29); let e = &a[0]; @@ -95,3 +123,30 @@ fn test_iter_03() { assert_eq!(e.0.0, 1431460); assert_eq!(e.1.0, 119); } + +#[test] +fn test_iter_04() { + // Check clamped covering in correct direction + let range = NanoRange::from_strings("2024-06-07T09:17:31.9Z", "2024-06-07T09:21:59.1Z").unwrap(); + let pbp = PrebinnedPartitioning::Sec1; + let it = MspLspIter::new_covering(range, pbp.clone()); + let a: Vec<_> = it.collect(); + assert_eq!(a.len(), 240 + 29); + let e = &a[0]; + assert_eq!(e.0.0, 1431459); + assert_eq!(e.1.0, 1051); + let e = &a[a.len() - 1]; + assert_eq!(e.0.0, 1431460); + assert_eq!(e.1.0, 119); +} + +#[test] +fn test_iter_05() { + let range = NanoRange::from_strings("2025-05-05T12:00:00Z", "2025-05-08T00:00:00Z").unwrap(); + let pbp = PrebinnedPartitioning::Day1; + let it = MspLspIter::new_covering(range, pbp.clone()); + let a: Vec<_> = it.collect(); + assert_eq!(a.len(), 3); + let e = &a[0]; + let e = &a[a.len() - 1]; +}