Take events into binner in iterative way

This commit is contained in:
Dominik Werder
2025-02-20 10:15:34 +01:00
parent 31538ee23a
commit dd88288ba7
2 changed files with 131 additions and 105 deletions

View File

@@ -1408,6 +1408,7 @@ impl CaConn {
let binwriter = BinWriter::new(
beg,
conf.conf.min_quiets(),
conf.conf.is_polled(),
st2.channel.cssid,
writer.series(),
st2.channel.scalar_type.clone(),

View File

@@ -18,6 +18,7 @@ use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use series::msp::PrebinnedPartitioning;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::time::Duration;
@@ -40,28 +41,28 @@ autoerr::create_error_v1!(
UnexpectedContainerType,
PartitionMsp(#[from] series::msp::Error),
UnsupportedGridDiv(DtMs, DtMs),
BinnerNoProgress,
IngestLoopLimit,
},
);
fn bin_len_clamp(dur: Duration) -> DtMs {
let dur = DtMs::from_ms_u64(1000 * dur.as_secs());
fn bin_len_clamp(dur: DtMs) -> PrebinnedPartitioning {
if dur < DtMs::from_ms_u64(1000 * 2) {
DtMs::from_ms_u64(1000 * 1)
PrebinnedPartitioning::Sec1
} else if dur <= DtMs::from_ms_u64(1000 * 20) {
DtMs::from_ms_u64(1000 * 10)
PrebinnedPartitioning::Sec10
} else if dur <= DtMs::from_ms_u64(1000 * 60 * 2) {
DtMs::from_ms_u64(1000 * 60 * 1)
PrebinnedPartitioning::Min1
} else if dur <= DtMs::from_ms_u64(1000 * 60 * 20) {
DtMs::from_ms_u64(1000 * 60 * 10)
PrebinnedPartitioning::Min10
} else if dur <= DtMs::from_ms_u64(1000 * 60 * 60 * 2) {
DtMs::from_ms_u64(1000 * 60 * 60 * 1)
PrebinnedPartitioning::Hour1
} else {
DtMs::from_ms_u64(1000 * 60 * 60 * 24)
PrebinnedPartitioning::Day1
}
}
fn get_div(bin_len: DtMs) -> Result<DtMs, Error> {
let pbp = series::msp::PrebinnedPartitioning::try_from(bin_len)?;
fn get_div(pbp: PrebinnedPartitioning) -> Result<DtMs, Error> {
let ret = pbp.msp_div();
Ok(ret)
}
@@ -98,6 +99,7 @@ impl BinWriter {
pub fn new(
beg: TsNano,
min_quiets: MinQuiets,
is_polled: bool,
cssid: ChannelStatusSeriesId,
sid: SeriesId,
scalar_type: ScalarType,
@@ -109,45 +111,40 @@ impl BinWriter {
debug_bin2!(trd, "enabled debug for {}", chname);
}
const DUR_ZERO: DtMs = DtMs::from_ms_u64(0);
const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 40);
const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 123);
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()];
let mut binner_1st = None;
let mut binner_others = Vec::new();
let mut combs: Vec<_> = rts
.into_iter()
.zip(quiets.into_iter().map(bin_len_clamp))
.zip(quiets.into_iter().map(|x| DtMs::from_ms_u64(x.as_millis() as u64)))
.filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX)
.map(|x| (x.0, bin_len_clamp(x.1)))
.map(|x| (x.0, x.1, WriteCntZero::Disable))
.collect();
if let Some(last) = combs.last_mut() {
if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) {
last.0 = RetentionTime::Long;
last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 24);
last.2 = WriteCntZero::Enable;
} else if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 1) {
last.0 = RetentionTime::Long;
last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 1);
combs.push((
RetentionTime::Long,
DtMs::from_ms_u64(1000 * 60 * 60 * 24),
WriteCntZero::Enable,
));
} else {
combs.push((
RetentionTime::Long,
DtMs::from_ms_u64(1000 * 60 * 60 * 1),
WriteCntZero::Disable,
));
combs.push((
RetentionTime::Long,
DtMs::from_ms_u64(1000 * 60 * 60 * 24),
WriteCntZero::Enable,
));
match &last.1 {
PrebinnedPartitioning::Day1 => {
last.0 = RetentionTime::Long;
last.2 = WriteCntZero::Enable;
}
PrebinnedPartitioning::Hour1 => {
last.0 = RetentionTime::Long;
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
_ => {
combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable));
combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable));
}
}
}
if !is_polled && combs.len() > 1 {
combs.remove(0);
}
// check
for e in combs.iter() {
if get_div(e.1).is_err() {
if get_div(e.1.clone()).is_err() {
info!("unsupported bin length {:?} {:?} {:?}", e.0, e.1, chname);
combs.clear();
break;
@@ -155,24 +152,22 @@ impl BinWriter {
}
let combs = combs;
debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs);
for (rt, bin_len, write_zero) in combs {
if bin_len > DUR_ZERO && bin_len <= DUR_MAX {
if binner_1st.is_none() {
let range = BinnedRange::from_beg_to_inf(beg, bin_len);
let mut binner = BinnedEventsTimeweight::new(range);
if let WriteCntZero::Enable = write_zero {
binner.cnt_zero_enable();
}
binner_1st = Some((rt, binner, write_zero));
} else {
let range = BinnedRange::from_beg_to_inf(beg, bin_len);
let binner = BinnedBinsTimeweight::new(range);
if let WriteCntZero::Enable = write_zero {
// TODO
// binner.cnt_zero_enable();
}
binner_others.push((rt, binner, write_zero));
for (rt, pbp, write_zero) in combs {
if binner_1st.is_none() {
let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len());
let mut binner = BinnedEventsTimeweight::new(range);
if let WriteCntZero::Enable = write_zero {
binner.cnt_zero_enable();
}
binner_1st = Some((rt, binner, write_zero));
} else {
let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len());
let binner = BinnedBinsTimeweight::new(range);
if let WriteCntZero::Enable = write_zero {
// TODO
// binner.cnt_zero_enable();
}
binner_others.push((rt, binner, write_zero));
}
}
let ret = Self {
@@ -212,65 +207,94 @@ impl BinWriter {
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
if self.evbuf.len() != 0 {
trace_tick!("tick evbuf len {}", self.evbuf.len());
let buf = &self.evbuf;
if true {
if let Some(ee) = self.binner_1st.as_mut() {
let rt = ee.0.clone();
let write_zero = ee.2.clone();
let binner = &mut ee.1;
// TODO avoid boxing
binner.ingest(&Box::new(buf))?;
let bins = binner.output();
if bins.len() > 0 {
trace_bin2!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?;
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
let (rt, binner, write_zero) = &mut self.binner_others[i];
let write_zero = write_zero.clone();
binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = binner.output()?;
match bb {
Some(bb) => {
if bb.len() > 0 {
trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len());
if let Some(bb2) = bb.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
Self::handle_output_ready(
self.trd,
self.sid,
rt.clone(),
&bb2,
write_zero,
iqdqs,
)?;
} else {
return Err(Error::UnexpectedContainerType);
}
bins2 = bb;
} else {
break;
}
}
None => {
break;
}
}
}
} else {
// nothing to do
}
} else {
// nothing to do
}
self.tick_ingest_loop(iqdqs)?;
} else {
self.evbuf.clear();
}
self.evbuf.clear();
} else {
trace_tick_verbose!("tick nothing to ingest");
}
Ok(())
}
fn tick_ingest_loop(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
// loop until all events are ingested.
// remove zero bins for small bin lengths.
let mut i = 0;
while self.evbuf.len() != 0 {
self.tick_ingest_and_handle(iqdqs)?;
i += 1;
if i > 20000 {
let e = Error::IngestLoopLimit;
return Err(e);
}
}
Ok(())
}
fn tick_ingest_and_handle(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
let buf = &self.evbuf;
if let Some(ee) = self.binner_1st.as_mut() {
let rt = ee.0.clone();
let write_zero = ee.2.clone();
let binner = &mut ee.1;
// TODO avoid boxing
let bufbox = Box::new(buf);
use items_0::timebin::IngestReport;
let consumed_evs = match binner.ingest(&bufbox)? {
IngestReport::ConsumedAll => {
let n = bufbox.len();
self.evbuf.clear();
n
}
items_0::timebin::IngestReport::ConsumedPart(n) => {
self.evbuf.truncate_front(self.evbuf.len() - n);
n
}
};
let bins = binner.output();
if bins.len() > 0 {
trace_bin2!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?;
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
let (rt, binner, write_zero) = &mut self.binner_others[i];
let write_zero = write_zero.clone();
binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = binner.output()?;
match bb {
Some(bb) => {
if bb.len() > 0 {
trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len());
if let Some(bb2) = bb.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, write_zero, iqdqs)?;
} else {
return Err(Error::UnexpectedContainerType);
}
bins2 = bb;
} else {
break;
}
}
None => {
break;
}
}
}
Ok(())
} else if consumed_evs == 0 {
let e = Error::BinnerNoProgress;
return Err(e);
} else {
Ok(())
}
} else {
Ok(())
}
}
fn handle_output_ready(
trd: bool,
series: SeriesId,
@@ -292,7 +316,8 @@ impl BinWriter {
} else if cnt == 0 && !write_zero.enabled() {
info!("zero count bin {:?}", series);
} else {
let div = get_div(bin_len)?;
let pbp = PrebinnedPartitioning::try_from(bin_len)?;
let div = get_div(pbp)?;
if div.ns() % bin_len.ns() != 0 {
let e = Error::UnsupportedGridDiv(bin_len, div);
return Err(e);