diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index cbf0d7a..0629dcb 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1408,6 +1408,7 @@ impl CaConn { let binwriter = BinWriter::new( beg, conf.conf.min_quiets(), + conf.conf.is_polled(), st2.channel.cssid, writer.series(), st2.channel.scalar_type.clone(), diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs index 8827e3b..b398415 100644 --- a/serieswriter/src/binwriter.rs +++ b/serieswriter/src/binwriter.rs @@ -18,6 +18,7 @@ use netpod::TsNano; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::TimeBinSimpleF32V02; +use series::msp::PrebinnedPartitioning; use series::ChannelStatusSeriesId; use series::SeriesId; use std::time::Duration; @@ -40,28 +41,28 @@ autoerr::create_error_v1!( UnexpectedContainerType, PartitionMsp(#[from] series::msp::Error), UnsupportedGridDiv(DtMs, DtMs), + BinnerNoProgress, + IngestLoopLimit, }, ); -fn bin_len_clamp(dur: Duration) -> DtMs { - let dur = DtMs::from_ms_u64(1000 * dur.as_secs()); +fn bin_len_clamp(dur: DtMs) -> PrebinnedPartitioning { if dur < DtMs::from_ms_u64(1000 * 2) { - DtMs::from_ms_u64(1000 * 1) + PrebinnedPartitioning::Sec1 } else if dur <= DtMs::from_ms_u64(1000 * 20) { - DtMs::from_ms_u64(1000 * 10) + PrebinnedPartitioning::Sec10 } else if dur <= DtMs::from_ms_u64(1000 * 60 * 2) { - DtMs::from_ms_u64(1000 * 60 * 1) + PrebinnedPartitioning::Min1 } else if dur <= DtMs::from_ms_u64(1000 * 60 * 20) { - DtMs::from_ms_u64(1000 * 60 * 10) + PrebinnedPartitioning::Min10 } else if dur <= DtMs::from_ms_u64(1000 * 60 * 60 * 2) { - DtMs::from_ms_u64(1000 * 60 * 60 * 1) + PrebinnedPartitioning::Hour1 } else { - DtMs::from_ms_u64(1000 * 60 * 60 * 24) + PrebinnedPartitioning::Day1 } } -fn get_div(bin_len: DtMs) -> Result { - let pbp = series::msp::PrebinnedPartitioning::try_from(bin_len)?; +fn get_div(pbp: PrebinnedPartitioning) -> Result { let ret = pbp.msp_div(); Ok(ret) } @@ -98,6 +99,7 @@ impl BinWriter { pub fn new( beg: TsNano, min_quiets: MinQuiets, + is_polled: bool, cssid: ChannelStatusSeriesId, sid: SeriesId, scalar_type: ScalarType, @@ -109,45 +111,40 @@ impl BinWriter { debug_bin2!(trd, "enabled debug for {}", chname); } const DUR_ZERO: DtMs = DtMs::from_ms_u64(0); - const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 40); + const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 123); let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]; let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()]; let mut binner_1st = None; let mut binner_others = Vec::new(); let mut combs: Vec<_> = rts .into_iter() - .zip(quiets.into_iter().map(bin_len_clamp)) + .zip(quiets.into_iter().map(|x| DtMs::from_ms_u64(x.as_millis() as u64))) + .filter(|x| x.1 > DUR_ZERO && x.1 < DUR_MAX) + .map(|x| (x.0, bin_len_clamp(x.1))) .map(|x| (x.0, x.1, WriteCntZero::Disable)) .collect(); if let Some(last) = combs.last_mut() { - if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) { - last.0 = RetentionTime::Long; - last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 24); - last.2 = WriteCntZero::Enable; - } else if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 1) { - last.0 = RetentionTime::Long; - last.1 = DtMs::from_ms_u64(1000 * 60 * 60 * 1); - combs.push(( - RetentionTime::Long, - DtMs::from_ms_u64(1000 * 60 * 60 * 24), - WriteCntZero::Enable, - )); - } else { - combs.push(( - RetentionTime::Long, - DtMs::from_ms_u64(1000 * 60 * 60 * 1), - WriteCntZero::Disable, - )); - combs.push(( - RetentionTime::Long, - DtMs::from_ms_u64(1000 * 60 * 60 * 24), - WriteCntZero::Enable, - )); + match &last.1 { + PrebinnedPartitioning::Day1 => { + last.0 = RetentionTime::Long; + last.2 = WriteCntZero::Enable; + } + PrebinnedPartitioning::Hour1 => { + last.0 = RetentionTime::Long; + combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable)); + } + _ => { + combs.push((RetentionTime::Long, PrebinnedPartitioning::Hour1, WriteCntZero::Disable)); + combs.push((RetentionTime::Long, PrebinnedPartitioning::Day1, WriteCntZero::Enable)); + } } } + if !is_polled && combs.len() > 1 { + combs.remove(0); + } // check for e in combs.iter() { - if get_div(e.1).is_err() { + if get_div(e.1.clone()).is_err() { info!("unsupported bin length {:?} {:?} {:?}", e.0, e.1, chname); combs.clear(); break; @@ -155,24 +152,22 @@ impl BinWriter { } let combs = combs; debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs); - for (rt, bin_len, write_zero) in combs { - if bin_len > DUR_ZERO && bin_len <= DUR_MAX { - if binner_1st.is_none() { - let range = BinnedRange::from_beg_to_inf(beg, bin_len); - let mut binner = BinnedEventsTimeweight::new(range); - if let WriteCntZero::Enable = write_zero { - binner.cnt_zero_enable(); - } - binner_1st = Some((rt, binner, write_zero)); - } else { - let range = BinnedRange::from_beg_to_inf(beg, bin_len); - let binner = BinnedBinsTimeweight::new(range); - if let WriteCntZero::Enable = write_zero { - // TODO - // binner.cnt_zero_enable(); - } - binner_others.push((rt, binner, write_zero)); + for (rt, pbp, write_zero) in combs { + if binner_1st.is_none() { + let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len()); + let mut binner = BinnedEventsTimeweight::new(range); + if let WriteCntZero::Enable = write_zero { + binner.cnt_zero_enable(); } + binner_1st = Some((rt, binner, write_zero)); + } else { + let range = BinnedRange::from_beg_to_inf(beg, pbp.bin_len()); + let binner = BinnedBinsTimeweight::new(range); + if let WriteCntZero::Enable = write_zero { + // TODO + // binner.cnt_zero_enable(); + } + binner_others.push((rt, binner, write_zero)); } } let ret = Self { @@ -212,65 +207,94 @@ impl BinWriter { pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { if self.evbuf.len() != 0 { trace_tick!("tick evbuf len {}", self.evbuf.len()); - let buf = &self.evbuf; if true { - if let Some(ee) = self.binner_1st.as_mut() { - let rt = ee.0.clone(); - let write_zero = ee.2.clone(); - let binner = &mut ee.1; - // TODO avoid boxing - binner.ingest(&Box::new(buf))?; - let bins = binner.output(); - if bins.len() > 0 { - trace_bin2!(self.trd, "binner_1st out len {}", bins.len()); - Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?; - // TODO avoid boxing - let mut bins2: BinsBoxed = Box::new(bins); - for i in 0..self.binner_others.len() { - let (rt, binner, write_zero) = &mut self.binner_others[i]; - let write_zero = write_zero.clone(); - binner.ingest(&bins2)?; - let bb: Option = binner.output()?; - match bb { - Some(bb) => { - if bb.len() > 0 { - trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len()); - if let Some(bb2) = bb.as_any_ref().downcast_ref::>() { - Self::handle_output_ready( - self.trd, - self.sid, - rt.clone(), - &bb2, - write_zero, - iqdqs, - )?; - } else { - return Err(Error::UnexpectedContainerType); - } - bins2 = bb; - } else { - break; - } - } - None => { - break; - } - } - } - } else { - // nothing to do - } - } else { - // nothing to do - } + self.tick_ingest_loop(iqdqs)?; + } else { + self.evbuf.clear(); } - self.evbuf.clear(); } else { trace_tick_verbose!("tick nothing to ingest"); } Ok(()) } + fn tick_ingest_loop(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + // loop until all events are ingested. + // remove zero bins for small bin lengths. + let mut i = 0; + while self.evbuf.len() != 0 { + self.tick_ingest_and_handle(iqdqs)?; + i += 1; + if i > 20000 { + let e = Error::IngestLoopLimit; + return Err(e); + } + } + Ok(()) + } + + fn tick_ingest_and_handle(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + let buf = &self.evbuf; + if let Some(ee) = self.binner_1st.as_mut() { + let rt = ee.0.clone(); + let write_zero = ee.2.clone(); + let binner = &mut ee.1; + // TODO avoid boxing + let bufbox = Box::new(buf); + use items_0::timebin::IngestReport; + let consumed_evs = match binner.ingest(&bufbox)? { + IngestReport::ConsumedAll => { + let n = bufbox.len(); + self.evbuf.clear(); + n + } + items_0::timebin::IngestReport::ConsumedPart(n) => { + self.evbuf.truncate_front(self.evbuf.len() - n); + n + } + }; + let bins = binner.output(); + if bins.len() > 0 { + trace_bin2!(self.trd, "binner_1st out len {}", bins.len()); + Self::handle_output_ready(self.trd, self.sid, rt, &bins, write_zero, iqdqs)?; + // TODO avoid boxing + let mut bins2: BinsBoxed = Box::new(bins); + for i in 0..self.binner_others.len() { + let (rt, binner, write_zero) = &mut self.binner_others[i]; + let write_zero = write_zero.clone(); + binner.ingest(&bins2)?; + let bb: Option = binner.output()?; + match bb { + Some(bb) => { + if bb.len() > 0 { + trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len()); + if let Some(bb2) = bb.as_any_ref().downcast_ref::>() { + Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, write_zero, iqdqs)?; + } else { + return Err(Error::UnexpectedContainerType); + } + bins2 = bb; + } else { + break; + } + } + None => { + break; + } + } + } + Ok(()) + } else if consumed_evs == 0 { + let e = Error::BinnerNoProgress; + return Err(e); + } else { + Ok(()) + } + } else { + Ok(()) + } + } + fn handle_output_ready( trd: bool, series: SeriesId, @@ -292,7 +316,8 @@ impl BinWriter { } else if cnt == 0 && !write_zero.enabled() { info!("zero count bin {:?}", series); } else { - let div = get_div(bin_len)?; + let pbp = PrebinnedPartitioning::try_from(bin_len)?; + let div = get_div(pbp)?; if div.ns() % bin_len.ns() != 0 { let e = Error::UnsupportedGridDiv(bin_len, div); return Err(e);