Improve bin writer

This commit is contained in:
Dominik Werder
2025-02-03 16:33:02 +01:00
parent cd8d1e135c
commit cca3d77af9
10 changed files with 222 additions and 206 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
[package]
name = "daqingest"
version = "0.2.5"
version = "0.2.6-aa.0"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"
+11 -5
View File
@@ -1412,6 +1412,7 @@ impl CaConn {
writer.series(),
st2.channel.scalar_type.clone(),
st2.channel.shape.clone(),
conf.conf.name().into(),
)?;
self.stats.get_series_id_ok.inc();
{
@@ -1773,13 +1774,13 @@ impl CaConn {
// return Err(Error::with_msg_no_trace());
return Ok(());
};
if dbg_chn {
info!("handle_event_add_res {:?} {:?}", cid, ev);
if false && dbg_chn {
trace!("handle_event_add_res {:?} {:?}", cid, ev);
}
match ch_s {
ChannelState::Writable(st) => {
if dbg_chn {
info!("handle_event_add_res Writable {:?} {:?}", cid, ev);
if false && dbg_chn {
trace!("handle_event_add_res Writable {:?} {:?}", cid, ev);
}
// debug!(
// "CaConn sees data_count {} payload_len {}",
@@ -3061,7 +3062,12 @@ impl CaConn {
}
if self.tick_last_writer + Duration::from_millis(2000) <= tsnow {
self.tick_last_writer = tsnow;
self.tick_writers()?;
match self.tick_writers() {
Ok(()) => {}
Err(e) => {
error!("error in writers: {e}");
}
}
}
match &self.state {
CaConnState::Unconnected(_) => {}
+1 -1
View File
@@ -362,7 +362,7 @@ impl IocAddrQuery {
}
/// Bump a retry/backoff counter by one, saturating at 20.
///
/// The cap keeps derived backoff delays bounded no matter how many
/// consecutive failures occur.
fn bump_backoff(x: &mut u32) {
    // Single saturating increment; the earlier duplicate statement
    // (leftover `min(10)` line) bumped the counter twice per call.
    *x = (1 + *x).min(20);
}
struct SeriesLookupSender {
+11 -1
View File
@@ -112,7 +112,17 @@ pub struct MaybeWrongAddressState {
impl MaybeWrongAddressState {
pub fn new(since: SystemTime, backoff_cnt: u32) -> Self {
let f = 2. + 60. * (backoff_cnt as f32 / 5.).tanh();
// Lookup table below was generated with this Python one-liner:
// print(", ".join(["{:.5}".format(tanh(i/10)) for i in range(24)]))
const TANH: [f32; 24] = [
0.0, 0.099668, 0.19738, 0.29131, 0.37995, 0.46212, 0.53705, 0.60437, 0.66404, 0.7163, 0.76159, 0.8005,
0.83365, 0.86172, 0.88535, 0.90515, 0.92167, 0.93541, 0.94681, 0.95624, 0.96403, 0.97045, 0.97574, 0.9801,
];
const Y1: f32 = 30.;
const Y20: f32 = 300.;
const B: f32 = (Y20 - Y1) / (TANH[20] - TANH[1]);
const A: f32 = Y1 - B * TANH[1];
let backoff_cnt = backoff_cnt.max(1).min(20);
let f = A + B * TANH[backoff_cnt as usize];
let dtms = 1e3 * f;
Self {
since,
+6 -5
View File
@@ -13,11 +13,12 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::time::SystemTime;
#[derive(Debug, ThisError)]
#[cstm(name = "StatusError")]
pub enum Error {
Internal,
}
autoerr::create_error_v1!(
name(Error, "StatusError"),
enum variants {
Internal,
},
);
#[derive(Debug, Serialize)]
pub struct ChannelStates {
+1
View File
@@ -10,6 +10,7 @@ async-channel = "2.1.1"
futures-util = "0.3.30"
smallvec = "1.13.2"
autoerr = "0.0.3"
itertools = "0.14"
log = { path = "../log" }
netpod = { path = "../../daqbuf-netpod", package = "daqbuf-netpod" }
items_0 = { path = "../../daqbuf-items-0", package = "daqbuf-items-0" }
+190 -29
View File
@@ -1,22 +1,33 @@
use crate::binwritergrid::BinWriterGrid;
use crate::log::*;
use crate::rtwriter::MinQuiets;
use items_0::timebin::BinnedBinsTimeweightTrait;
use items_0::timebin::BinnedEventsTimeweightTrait;
use items_0::timebin::BinsBoxed;
use items_2::binning::container_bins::ContainerBins;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::timeweight::timeweight_bins::BinnedBinsTimeweight;
use items_2::binning::timeweight::timeweight_bins_lazy::BinnedBinsTimeweightLazy;
use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
use items_2::binning::timeweight::timeweight_events_dyn::BinnedEventsTimeweightLazy;
use netpod::ttl::RetentionTime;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use series::ChannelStatusSeriesId;
use series::SeriesId;
use std::mem;
use std::time::Duration;
macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! debug_bin2 { ($t:expr, $($arg:tt)*) => ( if $t { debug!($($arg)*); } ) }
autoerr::create_error_v1!(
name(Error, "SerieswriterBinwriter"),
enum variants {
@@ -24,18 +35,46 @@ autoerr::create_error_v1!(
SeriesWriter(#[from] crate::writer::Error),
Binning(#[from] items_2::binning::timeweight::timeweight_events::Error),
UnsupportedBinGrid(DtMs),
BinWriterGrid(#[from] crate::binwritergrid::Error),
BinBinning(#[from] items_0::timebin::BinningggError),
UnexpectedContainerType,
PartitionMsp(#[from] series::msp::Error),
},
);
/// Snap a retention quiet-duration onto one of the supported bin grid
/// lengths: 1 s, 10 s, 1 min, 10 min, 1 h, or 24 h.
///
/// Sub-second precision of `dur` is discarded before classification
/// (only whole seconds are considered, via `Duration::as_secs`).
fn bin_len_clamp(dur: Duration) -> DtMs {
    const SEC: u64 = 1000;
    const MIN: u64 = 60 * SEC;
    const HOUR: u64 = 60 * MIN;
    // Whole milliseconds with the sub-second remainder dropped.
    let ms = SEC * dur.as_secs();
    // NOTE(review): assumes DtMs comparisons order by millisecond value,
    // so comparing raw ms here is equivalent — confirm against DtMs impl.
    let chosen = if ms < 2 * SEC {
        SEC
    } else if ms <= 20 * SEC {
        10 * SEC
    } else if ms <= 2 * MIN {
        MIN
    } else if ms <= 20 * MIN {
        10 * MIN
    } else if ms <= 2 * HOUR {
        HOUR
    } else {
        24 * HOUR
    };
    DtMs::from_ms_u64(chosen)
}
fn get_div(bin_len: DtMs) -> Result<DtMs, Error> {
let pbp = series::msp::PrebinnedPartitioning::try_from(bin_len)?;
let ret = pbp.msp_div();
Ok(ret)
}
/// Per-series time-binning writer: buffers incoming events and cascades
/// them through a first events->bins binner, then through a chain of
/// coarser bins->bins binners, one per retention tier.
#[derive(Debug)]
pub struct BinWriter {
    // Channel name; used for per-channel debug gating and log messages.
    chname: String,
    // Channel status series id; presumably kept for future use — it is
    // only touched via `let _ = ret.cssid` in the constructor.
    cssid: ChannelStatusSeriesId,
    // Series id stamped onto every emitted time-bin row.
    sid: SeriesId,
    // Scalar type of the channel, as provided at construction.
    scalar_type: ScalarType,
    // Shape of the channel, as provided at construction.
    shape: Shape,
    // Events buffered between ticks; drained and binned in `tick`.
    evbuf: ContainerEvents<f32>,
    // NOTE(review): per-grid writers; looks superseded by the binner
    // chain below — confirm whether this field is still populated.
    writers: Vec<BinWriterGrid>,
    // First stage: events -> bins on the finest grid, with its tier.
    binner_1st: Option<(RetentionTime, BinnedEventsTimeweight<f32>)>,
    // Later stages: bins -> coarser bins, each fed from the previous
    // stage's output, with their retention tiers.
    binner_others: Vec<(RetentionTime, BinnedBinsTimeweight<f32, f32>)>,
    // Per-channel trace/debug flag (from `series::dbg::dbg_chn`).
    trd: bool,
}
impl BinWriter {
@@ -46,35 +85,59 @@ impl BinWriter {
sid: SeriesId,
scalar_type: ScalarType,
shape: Shape,
chname: String,
) -> Result<Self, Error> {
let mut writers = Vec::new();
for (rt, dur) in [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long]
.into_iter()
.zip([min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()].into_iter())
{
if dur > Duration::ZERO && dur < Duration::from_millis(1000 * 60 * 60 * 24) {
let bin_len = if dur < Duration::from_millis(1000 * 2) {
DtMs::from_ms_u64(1000 * 1)
} else if dur < Duration::from_millis(1000 * 20) {
DtMs::from_ms_u64(1000 * 10)
} else if dur < Duration::from_millis(1000 * 60 * 2) {
DtMs::from_ms_u64(1000 * 60 * 1)
} else if dur < Duration::from_millis(1000 * 60 * 20) {
DtMs::from_ms_u64(1000 * 60 * 10)
let trd = series::dbg::dbg_chn(&chname);
if trd {
debug_bin2!(trd, "enabled debug for {}", chname);
}
const DUR_ZERO: DtMs = DtMs::from_ms_u64(0);
const DUR_MAX: DtMs = DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 40);
let rts = [RetentionTime::Short, RetentionTime::Medium, RetentionTime::Long];
let quiets = [min_quiets.st.clone(), min_quiets.mt.clone(), min_quiets.lt.clone()];
let mut binner_1st = None;
let mut binner_others = Vec::new();
let mut combs: Vec<_> = rts.into_iter().zip(quiets.into_iter().map(bin_len_clamp)).collect();
if let Some(last) = combs.last_mut() {
if last.1 >= DtMs::from_ms_u64(1000 * 60 * 60 * 24) {
last.0 = RetentionTime::Long;
} else {
combs.push((RetentionTime::Long, DtMs::from_ms_u64(1000 * 60 * 60 * 24)));
}
}
// check
for e in combs.iter() {
if get_div(e.1).is_err() {
info!("unsupported bin length {:?} {:?} {:?}", e.0, e.1, chname);
combs.clear();
break;
}
}
let combs = combs;
debug_bin2!(trd, "{:?} binning combs {:?}", chname, combs);
for (rt, bin_len) in combs {
if bin_len > DUR_ZERO && bin_len <= DUR_MAX {
if binner_1st.is_none() {
let range = BinnedRange::from_beg_to_inf(beg, bin_len);
let binner = BinnedEventsTimeweight::new(range);
binner_1st = Some((rt, binner));
} else {
DtMs::from_ms_u64(1000 * 60 * 60 * 1)
};
let writer = BinWriterGrid::new(beg, rt, bin_len, cssid, sid, scalar_type.clone(), shape.clone())?;
writers.push(writer);
let range = BinnedRange::from_beg_to_inf(beg, bin_len);
let binner = BinnedBinsTimeweight::new(range);
binner_others.push((rt, binner));
}
}
}
let ret = Self {
chname,
cssid,
sid,
scalar_type,
shape,
evbuf: ContainerEvents::new(),
writers,
binner_1st,
binner_others,
trd,
};
let _ = ret.cssid;
Ok(ret)
@@ -102,16 +165,114 @@ impl BinWriter {
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
if self.evbuf.len() != 0 {
trace_tick!("tick evbuf len {}", self.evbuf.len());
let buf = mem::replace(&mut self.evbuf, ContainerEvents::new());
// TODO bin the more fine grid from the coarse grid, do not clone events
for writer in self.writers.iter_mut() {
writer.ingest(&buf, iqdqs)?;
let buf = &self.evbuf;
if true {
if let Some(binner) = self.binner_1st.as_mut() {
let rt = binner.0.clone();
// TODO avoid boxing
binner.1.ingest(&Box::new(buf))?;
let bins = binner.1.output();
if bins.len() > 0 {
trace_bin2!(self.trd, "binner_1st out len {}", bins.len());
Self::handle_output_ready(self.trd, self.sid, rt, &bins, iqdqs)?;
//
// TODO write these bins to scylla
//
// TODO avoid boxing
let mut bins2: BinsBoxed = Box::new(bins);
for i in 0..self.binner_others.len() {
let (rt, binner) = &mut self.binner_others[i];
binner.ingest(&bins2)?;
let bb: Option<BinsBoxed> = binner.output()?;
match bb {
Some(bb) => {
if bb.len() > 0 {
trace_bin2!(self.trd, "binner_others {} out len {}", i, bb.len());
if let Some(bb2) = bb.as_any_ref().downcast_ref::<ContainerBins<f32, f32>>() {
Self::handle_output_ready(self.trd, self.sid, rt.clone(), &bb2, iqdqs)?;
} else {
return Err(Error::UnexpectedContainerType);
}
//
// TODO write these bins to scylla
//
bins2 = bb;
} else {
break;
}
}
None => {
break;
}
}
}
} else {
// nothing to do
}
} else {
// nothing to do
}
}
self.evbuf.clear();
} else {
trace_tick_verbose!("tick NOTHING TO INGEST");
}
for writer in self.writers.iter_mut() {
writer.tick(iqdqs)?;
Ok(())
}
/// Flush a batch of finished time bins into the per-retention insert
/// queues as `TimeBinSimpleF32V02` items.
///
/// Bins that are not yet final, or that carry zero events, are logged
/// and skipped. Panics if the partition divisor is not a whole multiple
/// of the bin length (a broken invariant of the supported grids).
fn handle_output_ready(
    trd: bool,
    series: SeriesId,
    rt: RetentionTime,
    bins: &ContainerBins<f32, f32>,
    iqdqs: &mut InsertDeques,
) -> Result<(), Error> {
    let selfname = "handle_output_ready";
    trace_tick!("{selfname} bins ready len {}", bins.len());
    for e in bins.iter_debug() {
        trace_tick_verbose!("{e:?}");
    }
    let bins_len = bins.len();
    // Per bin: edges (ts1, ts2), count, min/max/avg, last value, final flag.
    for (ts1, ts2, cnt, min, max, avg, lst, fnl) in bins.zip_iter_2() {
        if !fnl {
            info!("non final bin");
        } else if cnt == 0 {
            info!("zero count bin");
        } else {
            let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
            let div = get_div(bin_len)?;
            // Invariant: divisor must be a whole multiple of bin_len,
            // otherwise the (msp, off) addressing below is ambiguous.
            if div.ns() % bin_len.ns() != 0 {
                panic!("divisor not a multiple {:?} {:?}", bin_len, div);
            }
            // Partition number and bin offset inside that partition.
            let msp = ts1.ms() / div.ms();
            let off = (ts1.ms() - div.ms() * msp) / bin_len.ms();
            let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
                series,
                binlen: bin_len.ms() as i32,
                msp: msp as i64,
                off: off as i32,
                cnt: cnt as i64,
                min,
                max,
                avg,
                // Deviation is not computed by this binner.
                dev: f32::NAN,
                lst,
            });
            // Only log hour-or-coarser bins to keep debug output bounded.
            if bin_len >= DtMs::from_ms_u64(1000 * 60 * 60) {
                debug_bin2!(trd, "handle_output_ready emit {:?} len {} {:?}", rt, bins_len, item);
            }
            match rt {
                RetentionTime::Short => {
                    iqdqs.st_rf3_qu.push_back(item);
                }
                RetentionTime::Medium => {
                    iqdqs.mt_rf3_qu.push_back(item);
                }
                RetentionTime::Long => {
                    iqdqs.lt_rf3_qu.push_back(item);
                }
            }
        }
    }
    Ok(())
}
-156
View File
@@ -1,156 +0,0 @@
use items_2::binning::container_bins::ContainerBins;
use items_2::binning::container_events::ContainerEvents;
use items_2::binning::timeweight::timeweight_events::BinnedEventsTimeweight;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ttl::RetentionTime;
use netpod::BinnedRange;
use netpod::DtMs;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use scywr::insertqueues::InsertDeques;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::TimeBinSimpleF32V02;
use series::ChannelStatusSeriesId;
use series::SeriesId;
macro_rules! trace_ingest { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
macro_rules! trace_tick_verbose { ($($arg:tt)*) => ( if false { trace!($($arg)*); } ) }
autoerr::create_error_v1!(
name(Error, "SerieswriterBinwriterGrid"),
enum variants {
SeriesLookupError,
SeriesWriter(#[from] crate::writer::Error),
Binning(#[from] items_2::binning::timeweight::timeweight_events::Error),
UnsupportedBinGrid(DtMs),
},
);
/// One fixed-grid binner for a single series and retention tier:
/// ingests events, time-weight-bins them onto a fixed grid, and emits
/// finished bins into that tier's insert queue.
#[derive(Debug)]
pub struct BinWriterGrid {
    // Retention tier whose queue receives this grid's bins.
    rt: RetentionTime,
    // Channel status series id; presumably kept for future use — only
    // touched via `let _ = &ret.cssid` in the constructor.
    cssid: ChannelStatusSeriesId,
    // Series id stamped onto every emitted bin row.
    sid: SeriesId,
    // Scalar type of the channel, as provided at construction.
    scalar_type: ScalarType,
    // Shape of the channel, as provided at construction.
    shape: Shape,
    // Time-weighted events->bins binner over an effectively open range.
    binner: BinnedEventsTimeweight<f32>,
}
impl BinWriterGrid {
/// Build a grid binner starting at `beg` with bin length `bin_len`.
///
/// The grid end is placed near `u64::MAX` minus a 40-day margin and
/// rounded down to a whole multiple of `bin_len`, which makes the
/// binning range effectively open-ended.
pub fn new(
    beg: TsNano,
    rt: RetentionTime,
    bin_len: DtMs,
    cssid: ChannelStatusSeriesId,
    sid: SeriesId,
    scalar_type: ScalarType,
    shape: Shape,
) -> Result<Self, Error> {
    // 40 days in nanoseconds; keeps `end` clear of u64::MAX so later
    // range arithmetic cannot overflow.
    let margin = 1000 * 1000 * 1000 * 60 * 60 * 24 * 40;
    // Round the end down to a whole multiple of the bin length.
    let end = (u64::MAX - margin) / bin_len.ns() * bin_len.ns();
    let range = BinnedRange::from_nano_range(NanoRange::from_ns_u64(beg.ns(), end), bin_len);
    // Empty (cnt == 0) bins carry no information here, so skip them.
    let binner = BinnedEventsTimeweight::new(range).disable_cnt_zero();
    let ret = Self {
        rt,
        cssid,
        sid,
        scalar_type,
        shape,
        binner,
    };
    // Presumably silences unused-field lints — TODO confirm.
    let _ = &ret.rt;
    let _ = &ret.cssid;
    Ok(ret)
}
pub fn sid(&self) -> SeriesId {
self.sid.clone()
}
pub fn scalar_type(&self) -> ScalarType {
self.scalar_type.clone()
}
pub fn shape(&self) -> Shape {
self.shape.clone()
}
/// Feed a batch of events into the binner.
///
/// The insert queues are not touched during ingest; finished bins are
/// only flushed out later from `tick`.
pub fn ingest(&mut self, evs: &ContainerEvents<f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
    let _ = iqdqs;
    trace_ingest!("{:?} {:?}", self, evs);
    Ok(self.binner.ingest(evs)?)
}
/// Convert finished bins into `TimeBinSimpleF32V02` insert items and
/// push them onto this grid's retention-tier queue.
///
/// Non-final and zero-count bins are logged and skipped. Returns
/// `Error::UnsupportedBinGrid` for bin lengths outside the fixed set;
/// panics if a divisor is not a whole multiple of its bin length.
fn handle_output_ready(&mut self, out: ContainerBins<f32, f32>, iqdqs: &mut InsertDeques) -> Result<(), Error> {
    let selfname = "handle_output_ready";
    trace_tick!("{selfname} bins ready len {}", out.len());
    for e in out.iter_debug() {
        trace_tick_verbose!("{e:?}");
    }
    // Per bin: edges (ts1, ts2), event count, min/max/avg, last value,
    // and the "final" flag marking a closed bin.
    for (((((((&ts1, &ts2), &cnt), min), max), avg), lst), &fnl) in out.zip_iter() {
        if fnl == false {
            info!("non final bin");
        } else if cnt == 0 {
            info!("zero count bin");
        } else {
            let bin_len = DtMs::from_ms_u64(ts2.delta(ts1).ms_u64());
            // Hard-coded partition divisor per supported bin grid; each
            // divisor is a whole multiple of its bin length.
            let div = if bin_len == DtMs::from_ms_u64(1000 * 1) {
                DtMs::from_ms_u64(1000 * 60 * 10)
            } else if bin_len == DtMs::from_ms_u64(1000 * 10) {
                DtMs::from_ms_u64(1000 * 60 * 60 * 2)
            } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 1) {
                DtMs::from_ms_u64(1000 * 60 * 60 * 8)
            } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 10) {
                DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 4)
            } else if bin_len == DtMs::from_ms_u64(1000 * 60 * 60 * 1) {
                DtMs::from_ms_u64(1000 * 60 * 60 * 24 * 28)
            } else {
                return Err(Error::UnsupportedBinGrid(bin_len));
            };
            // Broken invariant: offsets are ambiguous otherwise.
            if div.ns() % bin_len.ns() != 0 {
                panic!("divisor not a multiple {:?} {:?}", bin_len, div);
            }
            // Partition number and bin offset inside that partition.
            let msp = ts1.ms() / div.ms();
            let off = (ts1.ms() - div.ms() * msp) / bin_len.ms();
            let item = QueryItem::TimeBinSimpleF32V02(TimeBinSimpleF32V02 {
                series: self.sid.clone(),
                binlen: bin_len.ms() as i32,
                msp: msp as i64,
                off: off as i32,
                cnt: cnt as i64,
                min,
                max,
                avg,
                // Deviation is not computed by this binner.
                dev: f32::NAN,
                lst,
            });
            match &self.rt {
                RetentionTime::Short => {
                    iqdqs.st_rf3_qu.push_back(item);
                }
                RetentionTime::Medium => {
                    iqdqs.mt_rf3_qu.push_back(item);
                }
                RetentionTime::Long => {
                    iqdqs.lt_rf3_qu.push_back(item);
                }
            }
        }
    }
    Ok(())
}
/// Drain finished bins from the binner and enqueue them for insert.
pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> {
    let out = self.binner.output();
    if out.len() == 0 {
        trace_tick_verbose!("tick NO BINS YET");
    } else {
        self.handle_output_ready(out, iqdqs)?;
    }
    Ok(())
}
}
-1
View File
@@ -1,5 +1,4 @@
pub mod binwriter;
pub mod binwritergrid;
pub mod changewriter;
pub mod fixgridwriter;
pub mod msptool;
+1 -7
View File
@@ -11,13 +11,7 @@ use std::marker::PhantomData;
use std::time::Duration;
use std::time::Instant;
macro_rules! trace_rt_decision {
($det:expr, $($arg:tt)*) => {
if $det {
trace!($($arg)*);
}
};
}
macro_rules! trace_rt_decision { ($det:expr, $($arg:tt)*) => { if $det { trace!($($arg)*); } }; }
autoerr::create_error_v1!(
name(Error, "RateLimitWriter"),