WIP binned v2
This commit is contained in:
@@ -329,7 +329,7 @@ async fn binned_cbor_framed(
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
struct HandleRes2<'a> {
|
||||
pub struct HandleRes2<'a> {
|
||||
logspan: Span,
|
||||
query: BinnedQuery,
|
||||
ch_conf: ChannelTypeConfigGen,
|
||||
|
||||
@@ -45,11 +45,14 @@ use series::msp::PrebinnedPartitioning;
|
||||
use series::SeriesId;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use streams::eventsplainreader::DummyCacheReadProvider;
|
||||
use streams::eventsplainreader::SfDatabufferEventReadProvider;
|
||||
use streams::streamtimeout::StreamTimeout2;
|
||||
use streams::streamtimeout::TimeoutableStream;
|
||||
use streams::timebin::cached::reader::EventsReadProvider;
|
||||
use streams::timebin::CacheReadProvider;
|
||||
use streams::timebinnedjson::timeoutable_collectable_stream_to_json_bytes;
|
||||
use tracing::Instrument;
|
||||
use tracing::Span;
|
||||
use url::Url;
|
||||
@@ -251,30 +254,67 @@ async fn binned_json_framed(
|
||||
_ncc: &NodeConfigCached,
|
||||
) -> Result<StreamResponse, Error> {
|
||||
use futures_util::Stream;
|
||||
info!("binned_json_framed V2 prebinned");
|
||||
let series = SeriesId::new(res2.ch_conf.series().unwrap());
|
||||
let range = res2.query.range().to_time().unwrap();
|
||||
let scyqueue = res2.scyqueue.as_ref().unwrap();
|
||||
let stream = if res2.url.as_str().contains("testpart=read_all_coarse") {
|
||||
let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone());
|
||||
let stream = stream.map_ok(to_debug).map_err(Error::from);
|
||||
let msg = format!("{}", res2.url.as_str());
|
||||
let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
|
||||
Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
|
||||
// let stream = scyllaconn::binwriteindex::read_all_coarse::ReadAllCoarse::new(series, range, scyqueue.clone());
|
||||
// let stream = stream.map_ok(to_debug).map_err(Error::from);
|
||||
// let msg = format!("{}", res2.url.as_str());
|
||||
// let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
|
||||
// Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
|
||||
todo!()
|
||||
} else if res2.url.as_str().contains("testpart=frombinned") {
|
||||
let binrange = res2
|
||||
.query
|
||||
.covering_range()?
|
||||
.binned_range_time()
|
||||
.ok_or_else(|| Error::BadRange)?;
|
||||
let stream = scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue);
|
||||
let stream =
|
||||
scyllaconn::binned2::frombinned::FromBinned::new(series, binrange, scyqueue, res2.cache_read_provider);
|
||||
let stream = stream.map_err(Error::from);
|
||||
let msg = format!("{}", res2.url.as_str());
|
||||
let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
|
||||
Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
|
||||
// let msg = format!("{}", res2.url.as_str());
|
||||
// let stream = futures_util::stream::iter([Ok(msg)]).chain(stream);
|
||||
let stream = stream.map(|x| {
|
||||
//
|
||||
x
|
||||
});
|
||||
let stream = stream.map(|item| {
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use items_0::timebin::BinsBoxed;
|
||||
match item {
|
||||
Ok(StreamItem::DataItem(mut x)) => {
|
||||
x.fix_numerics();
|
||||
let ret = x.boxed_into_collectable_box();
|
||||
Ok(StreamItem::DataItem(RangeCompletableItem::Data(ret)))
|
||||
}
|
||||
Ok(StreamItem::Log(x)) => Ok(StreamItem::Log(x)),
|
||||
Ok(StreamItem::Stats(x)) => Ok(StreamItem::Stats(x)),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
});
|
||||
let stream = stream.map_err(|e| daqbuf_err::Error::from_string(e));
|
||||
let timeout_content_base = res2
|
||||
.query
|
||||
.timeout_content()
|
||||
.unwrap_or(Duration::from_millis(2000))
|
||||
.min(Duration::from_millis(8000))
|
||||
.max(Duration::from_millis(334));
|
||||
let timeout_content_2 = timeout_content_base * 2 / 3;
|
||||
let stream = stream.map(|x| Some(x)).chain(futures_util::stream::iter([None]));
|
||||
let stream = TimeoutableStream::new(timeout_content_base, res2.timeout_provider, stream);
|
||||
let stream = Box::pin(stream);
|
||||
let stream = timeoutable_collectable_stream_to_json_bytes(stream, timeout_content_2);
|
||||
// let stream = stream.map(|x| Ok(format!("dummy82749827348932")));
|
||||
// Box::pin(stream) as Pin<Box<dyn Stream<Item = _> + Send>>
|
||||
stream
|
||||
} else {
|
||||
let msg = format!("UNKNOWN {}", res2.url.as_str());
|
||||
let stream = futures_util::stream::iter([Ok(msg)]);
|
||||
Box::pin(stream)
|
||||
// let msg = format!("UNKNOWN {}", res2.url.as_str());
|
||||
// let stream = futures_util::stream::iter([Ok(msg)]);
|
||||
// Box::pin(stream)
|
||||
todo!()
|
||||
};
|
||||
let stream = streams::lenframe::bytes_chunks_to_len_framed_str(stream);
|
||||
let stream = streams::instrument::InstrumentStream::new(stream, res2.logspan);
|
||||
|
||||
@@ -219,6 +219,8 @@ async fn proxy_http_service_inner(
|
||||
Ok(proxy_backend_query::<MapPulseQuery>(req, ctx, proxy_config).await?)
|
||||
} else if path == "/api/4/binned" {
|
||||
Ok(proxy_backend_query::<BinnedQuery>(req, ctx, proxy_config).await?)
|
||||
} else if path == "/api/4/private/binnedv2" {
|
||||
Ok(proxy_backend_query::<BinnedQuery>(req, ctx, proxy_config).await?)
|
||||
} else if path == "/api/4/channel/config" {
|
||||
Ok(proxy_backend_query::<ChannelConfigQuery>(req, ctx, proxy_config).await?)
|
||||
} else if path.starts_with("/api/4/test/http/204") {
|
||||
|
||||
@@ -12,6 +12,7 @@ scylla = "1.1"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
time = { version = "0.3.41", features = ["parsing", "formatting", "macros"] }
|
||||
hashbrown = "0.15.3"
|
||||
autoerr = "0.0.3"
|
||||
daqbuf-err = { path = "../../../daqbuf-err" }
|
||||
netpod = { path = "../../../daqbuf-netpod", package = "daqbuf-netpod" }
|
||||
|
||||
@@ -3,4 +3,5 @@ pub mod binnedrtmsplsps;
|
||||
pub mod frombinned;
|
||||
pub mod frombinnedandevents;
|
||||
pub mod intraday;
|
||||
pub mod mspiter;
|
||||
pub mod msplspiter;
|
||||
|
||||
@@ -1,20 +1,33 @@
|
||||
use super::mspiter::MspChunker;
|
||||
use super::msplspiter::MspLspIter;
|
||||
use crate::binwriteindex::read_all_coarse::ReadAllCoarse;
|
||||
use crate::worker::ScyllaQueue;
|
||||
use daqbuf_series::SeriesId;
|
||||
use daqbuf_series::msp::LspU32;
|
||||
use daqbuf_series::msp::MspU32;
|
||||
use daqbuf_series::msp::PrebinnedPartitioning;
|
||||
use futures_util::FutureExt;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use hashbrown::HashMap;
|
||||
use items_0::streamitem::Sitemty3;
|
||||
use items_0::streamitem::sitem3_data;
|
||||
use items_0::streamitem::sitem3_log_info;
|
||||
use items_0::timebin::BinningggContainerBinsDyn;
|
||||
use items_0::timebin::BinsBoxed;
|
||||
use netpod::BinnedRange;
|
||||
use netpod::DtMs;
|
||||
use netpod::TsMs;
|
||||
use netpod::TsNano;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::ttl::RetentionTime;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use streams::timebin::CacheReadProvider;
|
||||
|
||||
autoerr::create_error_v1!(
|
||||
name(Error, "Binned2FromBinned"),
|
||||
@@ -25,16 +38,32 @@ autoerr::create_error_v1!(
|
||||
|
||||
type IndexRow = (RetentionTime, MspU32, LspU32, DtMs);
|
||||
|
||||
#[derive(Debug)]
|
||||
struct FetchJob {
|
||||
min: (MspU32, LspU32),
|
||||
max: (MspU32, LspU32),
|
||||
dbg_ts_1: (TsMs, TsMs),
|
||||
dbg_ts_2: (TsMs, TsMs),
|
||||
rt: RetentionTime,
|
||||
pbp: PrebinnedPartitioning,
|
||||
}
|
||||
|
||||
enum StateA {
|
||||
ReadAllCoarse(ReadAllCoarse, VecDeque<IndexRow>),
|
||||
ExecuteJobs,
|
||||
Done,
|
||||
}
|
||||
|
||||
pub struct FromBinned {
|
||||
// range: NanoRange,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
series: SeriesId,
|
||||
binrange: BinnedRange<TsNano>,
|
||||
state_a: StateA,
|
||||
outbuf: VecDeque<String>,
|
||||
index_map: HashMap<(MspU32, LspU32), VecDeque<(DtMs, RetentionTime)>>,
|
||||
jobs: VecDeque<FetchJob>,
|
||||
job_fut: Option<Pin<Box<dyn Future<Output = (Vec<String>, Option<Box<dyn BinningggContainerBinsDyn>>)> + Send>>>,
|
||||
logoutbuf: VecDeque<String>,
|
||||
binoutbuf: VecDeque<BinsBoxed>,
|
||||
}
|
||||
|
||||
fn def<T: Default>() -> T {
|
||||
@@ -42,43 +71,211 @@ fn def<T: Default>() -> T {
|
||||
}
|
||||
|
||||
impl FromBinned {
|
||||
pub fn new(series: SeriesId, binrange: BinnedRange<TsNano>, scyqueue: &ScyllaQueue) -> Self {
|
||||
pub fn new(
|
||||
series: SeriesId,
|
||||
binrange: BinnedRange<TsNano>,
|
||||
scyqueue: &ScyllaQueue,
|
||||
cache_read_provider: Arc<dyn CacheReadProvider>,
|
||||
) -> Self {
|
||||
let state_a = StateA::ReadAllCoarse(
|
||||
ReadAllCoarse::new(series, binrange.to_nano_range(), scyqueue.clone()),
|
||||
def(),
|
||||
);
|
||||
Self {
|
||||
cache_read_provider,
|
||||
series,
|
||||
binrange,
|
||||
state_a,
|
||||
outbuf: def(),
|
||||
index_map: def(),
|
||||
jobs: def(),
|
||||
job_fut: None,
|
||||
logoutbuf: def(),
|
||||
binoutbuf: def(),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_string<T: ToString>(&mut self, x: T) {
|
||||
self.outbuf.push_back(x.to_string());
|
||||
self.logoutbuf.push_back(x.to_string());
|
||||
}
|
||||
|
||||
fn push_json<T: Serialize>(&mut self, x: T) {
|
||||
let js = serde_json::to_string(&x).unwrap();
|
||||
self.outbuf.push_back(js);
|
||||
self.logoutbuf.push_back(js);
|
||||
}
|
||||
|
||||
fn handle_coarse_index(&mut self, rows: VecDeque<IndexRow>) {
|
||||
self.push_string(format!("handle_coarse_index"));
|
||||
for e in rows {
|
||||
self.push_string(format!("{:?}", e));
|
||||
let k = (e.1, e.2);
|
||||
let h = &mut self.index_map;
|
||||
if let Some(v) = h.get_mut(&k) {
|
||||
v.push_back((e.3, e.0));
|
||||
} else {
|
||||
let mut v = VecDeque::new();
|
||||
v.push_back((e.3, e.0));
|
||||
h.insert(k, v);
|
||||
}
|
||||
}
|
||||
self.index_map.iter_mut().for_each(|(_, v)| {
|
||||
v.make_contiguous().sort();
|
||||
});
|
||||
}
|
||||
|
||||
fn build_day1_jobs(&mut self) {
|
||||
let mut jobs = VecDeque::new();
|
||||
let pbp1 = PrebinnedPartitioning::Day1;
|
||||
self.push_string(format!("binrange {:?}", self.binrange));
|
||||
self.push_string(format!("to_nano_range {:?}", self.binrange.to_nano_range()));
|
||||
let it = MspLspIter::new_covering(self.binrange.to_nano_range(), pbp1.clone());
|
||||
for day in it {
|
||||
{
|
||||
let nday = pbp1.patch_len() as u64 * day.0.to_u64() + day.1.to_u32() as u64;
|
||||
let ts = 60 * 60 * 24 * nday;
|
||||
let ts = time::UtcDateTime::from_unix_timestamp(ts as _);
|
||||
self.push_string(format!("build_day1_jobs {:?} ts {:?}", day, ts));
|
||||
}
|
||||
let k = (day.0, day.1);
|
||||
if let Some(ixs) = self.index_map.get(&k) {
|
||||
let mut log = Vec::new();
|
||||
log.push(format!("have index {:?}", ixs));
|
||||
let mut found = None;
|
||||
for ix in ixs.iter().rev() {
|
||||
if ix.0.ms() <= self.binrange.bin_len_dt_ms().ms() {
|
||||
if let Some(found) = found.as_ref() {
|
||||
log.push(format!("already found a better solution found {:?} e {:?}", found, ix));
|
||||
} else {
|
||||
match PrebinnedPartitioning::from_binlen(ix.0) {
|
||||
Ok(pbp2) => {
|
||||
log.push(format!("FOUND {:?}", ix));
|
||||
found = Some((ix.1.clone(), pbp2));
|
||||
}
|
||||
Err(_) => {
|
||||
log.push(format!("binlen not a pbp {:?}", ix));
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.push(format!("too coarse {:?}", ix));
|
||||
}
|
||||
}
|
||||
for x in log {
|
||||
self.push_string(x);
|
||||
}
|
||||
if let Some(ix) = found {
|
||||
// determine already here the msp/lsp range that this job must query
|
||||
// for the also given pbp.
|
||||
let pbp2 = ix.1.clone();
|
||||
let rbeg = self.binrange.nano_beg().to_ts_ms();
|
||||
let rend = self.binrange.nano_end().to_ts_ms();
|
||||
let gbeg = pbp1.msp_lsp_to_ts(day.0, day.1);
|
||||
let gend = gbeg.add_dt_ms(pbp1.bin_len());
|
||||
let beg = if rbeg > gbeg { rbeg } else { gbeg };
|
||||
let end = if rend < gend { rend } else { gend };
|
||||
{
|
||||
let ts_beg = time::UtcDateTime::from_unix_timestamp(beg.sec() as _);
|
||||
let ts_end = time::UtcDateTime::from_unix_timestamp(end.sec() as _);
|
||||
self.push_string(format!("beg {:?} end {:?}", ts_beg, ts_end));
|
||||
}
|
||||
let it3 = {
|
||||
let range = NanoRange::from_ms_u64(beg.ms(), end.ms());
|
||||
MspLspIter::new_covering(range, pbp2.clone())
|
||||
};
|
||||
let job = FetchJob {
|
||||
min: it3.mins(),
|
||||
max: it3.maxs(),
|
||||
dbg_ts_1: (beg, end),
|
||||
// TODO remove
|
||||
dbg_ts_2: (beg, end),
|
||||
rt: ix.0,
|
||||
// TODO which pbp is expected here?
|
||||
pbp: pbp2,
|
||||
};
|
||||
jobs.push_back(job);
|
||||
}
|
||||
} else {
|
||||
self.push_string(format!("no index entry"));
|
||||
}
|
||||
}
|
||||
self.jobs = jobs;
|
||||
}
|
||||
|
||||
fn make_next_job(
|
||||
&mut self,
|
||||
) -> Option<Pin<Box<dyn Future<Output = (Vec<String>, Option<Box<dyn BinningggContainerBinsDyn>>)> + Send>>> {
|
||||
// iterate over the jobs and subjobs.
|
||||
// each cache read provider task reads from a specific msp.
|
||||
// therefore, I have to break at the msp boundaries.
|
||||
// for that, use a msp iterator.
|
||||
if let Some(job) = self.jobs.pop_front() {
|
||||
let series = self.series;
|
||||
let pbp = job.pbp.clone();
|
||||
let binlen = pbp.bin_len();
|
||||
// let tsbeg: TsMs = job.2;
|
||||
// let tsend: TsMs = job.3;
|
||||
// let range = NanoRange::from_ms_u64(tsbeg.ms(), tsend.ms());
|
||||
|
||||
self.push_string(format!("make job {:?}", job));
|
||||
|
||||
let chunk_it = MspChunker::from_min_max(pbp, job.min, job.max);
|
||||
self.push_string(format!("chunk_it {:?}", chunk_it));
|
||||
let mut futs = VecDeque::new();
|
||||
for chunk in chunk_it {
|
||||
self.push_string(format!("chunk {:?}", chunk));
|
||||
let offs = chunk.lsp1.to_u32()..chunk.lsp2.to_u32();
|
||||
let fut = self
|
||||
.cache_read_provider
|
||||
.read(series.id(), binlen, chunk.msp.to_u64(), offs);
|
||||
futs.push_back(fut);
|
||||
}
|
||||
|
||||
// let msp = job.min.0.0 as u64;
|
||||
// TODO add helper, make safer
|
||||
// let offs = (job.min.1.0 as u32)..(job.max.1.0 as u32);
|
||||
|
||||
let fut = async move {
|
||||
let mut log = Vec::new();
|
||||
let mut bins2: Option<Box<dyn BinningggContainerBinsDyn>> = None;
|
||||
for fut in futs {
|
||||
match fut.await {
|
||||
Ok(x) => match x {
|
||||
Some(mut bins) => match bins2.as_mut() {
|
||||
Some(bins2) => {
|
||||
log.push(format!("drain into len {}", bins.len()));
|
||||
bins.drain_into(bins2.as_mut(), 0..bins.len());
|
||||
}
|
||||
None => {
|
||||
bins2 = Some(bins);
|
||||
}
|
||||
},
|
||||
None => {
|
||||
log.push(format!("binned read job returns None"));
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
log.push(format!("error in binned read job {}", e));
|
||||
}
|
||||
}
|
||||
}
|
||||
(log, bins2)
|
||||
};
|
||||
Some(Box::pin(fut))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for FromBinned {
|
||||
type Item = Result<String, Error>;
|
||||
type Item = Sitemty3<BinsBoxed, Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
use Poll::*;
|
||||
loop {
|
||||
break if let Some(x) = self.outbuf.pop_front() {
|
||||
Ready(Some(Ok(x)))
|
||||
break if let Some(x) = self.logoutbuf.pop_front() {
|
||||
Ready(Some(sitem3_log_info(x)))
|
||||
} else if let Some(x) = self.binoutbuf.pop_front() {
|
||||
Ready(Some(sitem3_data(x)))
|
||||
} else {
|
||||
use StateA::*;
|
||||
let self2 = self.as_mut().get_mut();
|
||||
@@ -102,13 +299,45 @@ impl Stream for FromBinned {
|
||||
}
|
||||
Ready(None) => {
|
||||
let a = std::mem::replace(rows, def());
|
||||
self2.handle_coarse_index(a);
|
||||
self2.state_a = StateA::Done;
|
||||
self2.push_json(&"done with reading coarse");
|
||||
self2.push_string("done with reading coarse");
|
||||
self2.handle_coarse_index(a);
|
||||
self2.build_day1_jobs();
|
||||
self2.state_a = StateA::ExecuteJobs;
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
},
|
||||
ExecuteJobs => {
|
||||
if let Some(fut) = self2.job_fut.as_mut() {
|
||||
match fut.poll_unpin(cx) {
|
||||
Ready((log, bins)) => {
|
||||
self2.job_fut = None;
|
||||
self.push_string(format!("ExecuteJobs sees Ready"));
|
||||
for x in log {
|
||||
self.push_string(x);
|
||||
}
|
||||
match bins {
|
||||
Some(bins) => {
|
||||
self.binoutbuf.push_back(bins);
|
||||
}
|
||||
None => {
|
||||
// TODO report
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Pending => Pending,
|
||||
}
|
||||
} else if let Some(fut) = self2.make_next_job() {
|
||||
// self2.state_a = StateA::ExecuteJobs;
|
||||
self2.job_fut = Some(fut);
|
||||
continue;
|
||||
} else {
|
||||
self2.state_a = StateA::Done;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Done => Ready(None),
|
||||
}
|
||||
};
|
||||
|
||||
83
crates/scyllaconn/src/binned2/mspiter.rs
Normal file
83
crates/scyllaconn/src/binned2/mspiter.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use daqbuf_series::msp::LspU32;
|
||||
use daqbuf_series::msp::MspU32;
|
||||
use daqbuf_series::msp::PrebinnedPartitioning;
|
||||
use netpod::DtMs;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
|
||||
// lsp2 is meant exclusive.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MspChunkerItem {
|
||||
pub msp: MspU32,
|
||||
pub lsp1: LspU32,
|
||||
pub lsp2: LspU32,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MspChunker {
|
||||
pbp: PrebinnedPartitioning,
|
||||
#[allow(unused)]
|
||||
min: (MspU32, LspU32),
|
||||
max: (MspU32, LspU32),
|
||||
cur: (MspU32, LspU32),
|
||||
}
|
||||
|
||||
impl MspChunker {
|
||||
pub fn new_covering(range: NanoRange, pbp: PrebinnedPartitioning) -> Self {
|
||||
let mins = pbp.msp_lsp(range.beg_ts().to_ts_ms());
|
||||
let mins = (MspU32(mins.0), LspU32(mins.1));
|
||||
let end1 = range.end_ts().to_ts_ms();
|
||||
let g = DtMs::from_ms_u64(pbp.bin_len().ms() - 1);
|
||||
let end2 = end1.add_dt_ms(g);
|
||||
let maxs = pbp.msp_lsp(end2);
|
||||
let maxs = (MspU32(maxs.0), LspU32(maxs.1));
|
||||
Self {
|
||||
pbp,
|
||||
min: mins,
|
||||
max: maxs,
|
||||
cur: mins,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_min_max(pbp: PrebinnedPartitioning, min: (MspU32, LspU32), max: (MspU32, LspU32)) -> Self {
|
||||
let curs = min;
|
||||
Self {
|
||||
pbp,
|
||||
min,
|
||||
max,
|
||||
cur: curs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for MspChunker {
|
||||
type Item = MspChunkerItem;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.cur.0 >= self.max.0 && self.cur.1 >= self.max.1 {
|
||||
None
|
||||
} else {
|
||||
if self.cur.0 < self.max.0 {
|
||||
let item = MspChunkerItem {
|
||||
msp: self.cur.0,
|
||||
lsp1: self.cur.1,
|
||||
lsp2: LspU32(self.pbp.patch_len()),
|
||||
};
|
||||
self.cur.0.0 += 1;
|
||||
self.cur.1 = LspU32(0);
|
||||
Some(item)
|
||||
} else {
|
||||
if self.cur.1 < self.max.1 {
|
||||
let item = MspChunkerItem {
|
||||
msp: self.cur.0,
|
||||
lsp1: self.cur.1,
|
||||
lsp2: self.max.1,
|
||||
};
|
||||
self.cur.1 = self.max.1;
|
||||
Some(item)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
use daqbuf_series::msp::LspU32;
|
||||
use daqbuf_series::msp::MspU32;
|
||||
use daqbuf_series::msp::PrebinnedPartitioning;
|
||||
use netpod::TsMs;
|
||||
use netpod::DtMs;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -12,15 +12,37 @@ pub struct MspLspItem {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MspLspIter {
|
||||
#[allow(unused)]
|
||||
range: NanoRange,
|
||||
pbp: PrebinnedPartitioning,
|
||||
ts: TsMs,
|
||||
#[allow(unused)]
|
||||
mins: (u32, u32),
|
||||
maxs: (u32, u32),
|
||||
curs: (u32, u32),
|
||||
}
|
||||
|
||||
impl MspLspIter {
|
||||
pub fn new(range: NanoRange, pbp: PrebinnedPartitioning) -> Self {
|
||||
let ts = range.beg_ts().to_ts_ms();
|
||||
Self { range, pbp, ts }
|
||||
pub fn new_covering(range: NanoRange, pbp: PrebinnedPartitioning) -> Self {
|
||||
let mins = pbp.msp_lsp(range.beg_ts().to_ts_ms());
|
||||
let end1 = range.end_ts().to_ts_ms();
|
||||
let g = DtMs::from_ms_u64(pbp.bin_len().ms() - 1);
|
||||
let end2 = end1.add_dt_ms(g);
|
||||
let maxs = pbp.msp_lsp(end2);
|
||||
Self {
|
||||
range,
|
||||
pbp,
|
||||
mins,
|
||||
maxs,
|
||||
curs: mins,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mins(&self) -> (MspU32, LspU32) {
|
||||
(MspU32(self.mins.0), LspU32(self.mins.1))
|
||||
}
|
||||
|
||||
pub fn maxs(&self) -> (MspU32, LspU32) {
|
||||
(MspU32(self.maxs.0), LspU32(self.maxs.1))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,13 +50,16 @@ impl Iterator for MspLspIter {
|
||||
type Item = (MspU32, LspU32);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if self.ts >= self.range.end_ts().to_ts_ms() {
|
||||
if self.curs.0 >= self.maxs.0 && self.curs.1 >= self.maxs.1 {
|
||||
None
|
||||
} else {
|
||||
let x = self.pbp.msp_lsp(self.ts);
|
||||
let msp = MspU32(x.0);
|
||||
let lsp = LspU32(x.1);
|
||||
self.ts = self.ts.add_dt_ms(self.pbp.bin_len());
|
||||
let msp = MspU32(self.curs.0);
|
||||
let lsp = LspU32(self.curs.1);
|
||||
self.curs.1 += 1;
|
||||
if self.curs.1 >= self.pbp.patch_len() {
|
||||
self.curs.1 = 0;
|
||||
self.curs.0 += 1;
|
||||
}
|
||||
Some((msp, lsp))
|
||||
}
|
||||
}
|
||||
@@ -45,7 +70,7 @@ fn test_iter_00() {
|
||||
let range = NanoRange::from_strings("2024-06-07T09:17:31Z", "2024-06-07T09:17:31Z").unwrap();
|
||||
let pbp = PrebinnedPartitioning::Sec1;
|
||||
assert_eq!(pbp.patch_len(), 1200);
|
||||
let it = MspLspIter::new(range, pbp);
|
||||
let it = MspLspIter::new_covering(range, pbp);
|
||||
let a: Vec<_> = it.collect();
|
||||
assert_eq!(a.len(), 0);
|
||||
}
|
||||
@@ -54,12 +79,15 @@ fn test_iter_00() {
|
||||
fn test_iter_01() {
|
||||
let range = NanoRange::from_strings("2024-06-07T09:17:31Z", "2024-06-07T09:17:32Z").unwrap();
|
||||
let pbp = PrebinnedPartitioning::Sec1;
|
||||
assert_eq!(pbp.patch_len(), 1200);
|
||||
let it = MspLspIter::new(range, pbp.clone());
|
||||
let it = MspLspIter::new_covering(range, pbp.clone());
|
||||
let a: Vec<_> = it.collect();
|
||||
assert_eq!(a.len(), 1);
|
||||
assert_eq!(a[0].0.0, 1431459);
|
||||
assert_eq!(a[0].1.0, 1051);
|
||||
let e = a.first().unwrap();
|
||||
assert_eq!(e.0.0, 1431459);
|
||||
assert_eq!(e.1.0, 1051);
|
||||
// let e = a.last().unwrap();
|
||||
// assert_eq!(e.0.0, 1431459);
|
||||
// assert_eq!(e.1.0, 1051);
|
||||
let ts_sec = pbp.patch_len() as u64 * a[0].0.0 as u64 + a[0].1.0 as u64;
|
||||
// time::UtcDateTime::new(time::Date::with, time)
|
||||
let ts1 = time::UtcDateTime::from_unix_timestamp(ts_sec as i64).unwrap();
|
||||
@@ -72,7 +100,7 @@ fn test_iter_01() {
|
||||
fn test_iter_02() {
|
||||
let range = NanoRange::from_strings("2024-06-07T09:17:31Z", "2024-06-07T09:22:00Z").unwrap();
|
||||
let pbp = PrebinnedPartitioning::Sec1;
|
||||
let it = MspLspIter::new(range, pbp.clone());
|
||||
let it = MspLspIter::new_covering(range, pbp.clone());
|
||||
let a: Vec<_> = it.collect();
|
||||
assert_eq!(a.len(), 240 + 29);
|
||||
let e = &a[a.len() - 1];
|
||||
@@ -85,7 +113,7 @@ fn test_iter_03() {
|
||||
// Check clamped covering in correct direction
|
||||
let range = NanoRange::from_strings("2024-06-07T09:17:31.1Z", "2024-06-07T09:21:59.8Z").unwrap();
|
||||
let pbp = PrebinnedPartitioning::Sec1;
|
||||
let it = MspLspIter::new(range, pbp.clone());
|
||||
let it = MspLspIter::new_covering(range, pbp.clone());
|
||||
let a: Vec<_> = it.collect();
|
||||
assert_eq!(a.len(), 240 + 29);
|
||||
let e = &a[0];
|
||||
@@ -95,3 +123,30 @@ fn test_iter_03() {
|
||||
assert_eq!(e.0.0, 1431460);
|
||||
assert_eq!(e.1.0, 119);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_04() {
|
||||
// Check clamped covering in correct direction
|
||||
let range = NanoRange::from_strings("2024-06-07T09:17:31.9Z", "2024-06-07T09:21:59.1Z").unwrap();
|
||||
let pbp = PrebinnedPartitioning::Sec1;
|
||||
let it = MspLspIter::new_covering(range, pbp.clone());
|
||||
let a: Vec<_> = it.collect();
|
||||
assert_eq!(a.len(), 240 + 29);
|
||||
let e = &a[0];
|
||||
assert_eq!(e.0.0, 1431459);
|
||||
assert_eq!(e.1.0, 1051);
|
||||
let e = &a[a.len() - 1];
|
||||
assert_eq!(e.0.0, 1431460);
|
||||
assert_eq!(e.1.0, 119);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_iter_05() {
|
||||
let range = NanoRange::from_strings("2025-05-05T12:00:00Z", "2025-05-08T00:00:00Z").unwrap();
|
||||
let pbp = PrebinnedPartitioning::Day1;
|
||||
let it = MspLspIter::new_covering(range, pbp.clone());
|
||||
let a: Vec<_> = it.collect();
|
||||
assert_eq!(a.len(), 3);
|
||||
let e = &a[0];
|
||||
let e = &a[a.len() - 1];
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user