From e0d24b6258d7dd45078abd456340f1e31c09172e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 27 Jun 2024 16:28:52 +0200 Subject: [PATCH] WIP refactor binwriter --- daqingest/Cargo.toml | 2 +- netfetch/src/ca/conn.rs | 66 +++++++++++++++++++++++-- scywr/src/delete.rs | 1 - scywr/src/lib.rs | 1 - serieswriter/src/binwriter.rs | 92 +++++++++++++++++++++++++++++++++++ serieswriter/src/lib.rs | 1 + serieswriter/src/writer.rs | 16 ------ 7 files changed, 155 insertions(+), 24 deletions(-) delete mode 100644 scywr/src/delete.rs create mode 100644 serieswriter/src/binwriter.rs diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 1313ae3..86c7ea7 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.2-aa.0" +version = "0.2.2-aa.1" authors = ["Dominik Werder "] edition = "2021" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index eaeeb2c..256f08e 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -49,6 +49,7 @@ use scywriiq::ConnectionStatusItem; use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; +use serieswriter::binwriter::BinWriter; use serieswriter::establish_worker::EstablishWorkerJob; use serieswriter::establish_worker::JobId; use serieswriter::rtwriter::RtWriter; @@ -133,6 +134,7 @@ pub enum Error { IocIssue, Protocol(#[from] crate::ca::proto::Error), RtWriter(#[from] serieswriter::rtwriter::Error), + BinWriter(#[from] serieswriter::binwriter::Error), // TODO remove false positive from ThisError derive #[allow(private_interfaces)] UnknownCid(Cid), @@ -342,6 +344,7 @@ struct WritableState { tsbeg: Instant, channel: CreatedState, writer: RtWriter, + binwriter: BinWriter, reading: ReadingState, } @@ -414,6 +417,8 @@ struct CreatedState { dw_st_last: SystemTime, dw_mt_last: SystemTime, dw_lt_last: SystemTime, + scalar_type: ScalarType, + shape: Shape, } impl CreatedState { @@ -451,6 +456,8 @@ impl CreatedState { dw_st_last: SystemTime::UNIX_EPOCH, dw_mt_last: SystemTime::UNIX_EPOCH, dw_lt_last: SystemTime::UNIX_EPOCH, + scalar_type: ScalarType::I8, + shape: Shape::Scalar, } } } @@ -1105,6 +1112,7 @@ impl CaConn { fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> { trace!("handle_writer_establish_inner {cid:?}"); + let stnow = self.tmp_ts_poll.clone(); // At this point we have created the channel and created a writer for that type and sid. // We do not yet monitor. // TODO main objectives now: @@ -1116,6 +1124,13 @@ impl CaConn { let conf_poll_conf = conf.poll_conf(); let chst = &mut conf.state; if let ChannelState::MakingSeriesWriter(st2) = chst { + let binwriter = BinWriter::new( + st2.channel.cssid, + writer.sid(), + st2.channel.scalar_type.clone(), + st2.channel.shape.clone(), + stnow, + )?; self.stats.get_series_id_ok.inc(); { let item = QueryItem::ChannelStatus(ChannelStatusItem { @@ -1131,6 +1146,7 @@ impl CaConn { channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), // channel: st2.channel.clone(), writer, + binwriter, reading: ReadingState::Polling(PollingState { tsbeg: self.poll_tsnow, poll_ivl: Duration::from_millis(ivl), @@ -1168,6 +1184,7 @@ impl CaConn { tsbeg: self.poll_tsnow, channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), writer, + binwriter, reading: ReadingState::EnableMonitoring(EnableMonitoringState { tsbeg: self.poll_tsnow, subid, @@ -1456,9 +1473,20 @@ impl CaConn { }); let crst = &mut st.channel; let writer = &mut st.writer; + let binwriter = &mut st.binwriter; let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?; + Self::event_add_ingest( + ev.payload_len, + ev.value, + crst, + writer, + binwriter, + iqdqs, + tsnow, + stnow, + stats, + )?; } ReadingState::Monitoring(st2) => { match &mut st2.mon2state { @@ -1473,9 +1501,20 @@ impl CaConn { } let crst = &mut st.channel; let writer = &mut st.writer; + let binwriter = &mut st.binwriter; let iqdqs = &mut self.iqdqs; let stats = self.stats.as_ref(); - Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?; + Self::event_add_ingest( + ev.payload_len, + ev.value, + crst, + writer, + binwriter, + iqdqs, + tsnow, + stnow, + stats, + )?; } ReadingState::StopMonitoringForPolling(st2) => { // TODO count for metrics @@ -1665,7 +1704,18 @@ impl CaConn { ) -> Result<(), Error> { let crst = &mut st.channel; let writer = &mut st.writer; - Self::event_add_ingest(ev.payload_len, ev.value, crst, writer, iqdqs, tsnow, stnow, stats)?; + let binwriter = &mut st.binwriter; + Self::event_add_ingest( + ev.payload_len, + ev.value, + crst, + writer, + binwriter, + iqdqs, + tsnow, + stnow, + stats, + )?; Ok(()) } @@ -1674,6 +1724,7 @@ impl CaConn { value: CaEventValue, crst: &mut CreatedState, writer: &mut RtWriter, + binwriter: &mut BinWriter, iqdqs: &mut InsertDeques, tsnow: Instant, stnow: SystemTime, @@ -1705,9 +1756,12 @@ impl CaConn { Self::check_ev_value_data(&value.data, &writer.scalar_type())?; crst.muted_before = 0; crst.insert_item_ivl_ema.tick(tsnow); + let ts_ioc = TsNano::from_ns(ts); + let ts_local = TsNano::from_ns(ts_local); + let val: DataValue = value.data.into(); + binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; { - let val: DataValue = value.data.into(); - let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?; + let ((dwst, dwmt, dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?; if dwst { crst.dw_st_last = stnow; crst.acc_st.push_written(payload_len); @@ -2250,6 +2304,8 @@ impl CaConn { dw_st_last: SystemTime::UNIX_EPOCH, dw_mt_last: SystemTime::UNIX_EPOCH, dw_lt_last: SystemTime::UNIX_EPOCH, + scalar_type: scalar_type.clone(), + shape: shape.clone(), }; *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel }); let job = EstablishWorkerJob::new( diff --git a/scywr/src/delete.rs b/scywr/src/delete.rs deleted file mode 100644 index 8b13789..0000000 --- a/scywr/src/delete.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index 6190a9b..9071e95 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -1,6 +1,5 @@ pub mod access; pub mod config; -pub mod delete; pub mod err; pub mod fut; pub mod futbatch; diff --git a/serieswriter/src/binwriter.rs b/serieswriter/src/binwriter.rs new file mode 100644 index 0000000..5133fc0 --- /dev/null +++ b/serieswriter/src/binwriter.rs @@ -0,0 +1,92 @@ +use crate::timebin::ConnTimeBin; +use crate::writer::SeriesWriter; +use async_channel::Sender; +use err::thiserror; +use err::ThisError; +use netpod::log::*; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsNano; +use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::QueryItem; +use series::ChannelStatusSeriesId; +use series::SeriesId; +use std::collections::VecDeque; +use std::time::Duration; +use std::time::SystemTime; + +#[allow(unused)] +macro_rules! trace_binning { + ($($arg:tt)*) => { + if false { + trace!($($arg)*); + } + }; +} + +#[derive(Debug, ThisError)] +pub enum Error { + SeriesLookupError, + SeriesWriter(#[from] crate::writer::Error), + Timebin(#[from] crate::timebin::Error), +} + +#[derive(Debug)] +pub struct BinWriter { + sid: SeriesId, + scalar_type: ScalarType, + shape: Shape, + binner: ConnTimeBin, +} + +impl BinWriter { + pub fn new( + // channel_info_tx: Sender, + cssid: ChannelStatusSeriesId, + sid: SeriesId, + scalar_type: ScalarType, + shape: Shape, + stnow: SystemTime, + ) -> Result { + type A = SeriesWriter; + let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ms(1000 * 2)); + binner.setup_for(&scalar_type, &shape, stnow)?; + let ret = Self { + sid, + scalar_type, + shape, + binner, + }; + Ok(ret) + } + + pub fn sid(&self) -> SeriesId { + self.sid.clone() + } + + pub fn scalar_type(&self) -> ScalarType { + self.scalar_type.clone() + } + + pub fn shape(&self) -> Shape { + self.shape.clone() + } + + pub fn ingest( + &mut self, + ts_ioc: TsNano, + ts_local: TsNano, + val: &DataValue, + iqdqs: &mut InsertDeques, + ) -> Result<(), Error> { + let ts_main = ts_local; + self.binner.push(ts_main.clone(), val)?; + Ok(()) + } + + pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { + self.binner.tick(iqdqs)?; + Ok(()) + } +} diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index eac21c1..6cf4740 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -1,3 +1,4 @@ +pub mod binwriter; pub mod establish_worker; pub mod patchcollect; pub mod rtwriter; diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 298c86e..5f67c5f 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -1,4 +1,3 @@ -use crate::timebin::ConnTimeBin; use async_channel::Sender; use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; @@ -57,7 +56,6 @@ pub struct SeriesWriter { msp_max_bytes: u32, // TODO this should be in an Option: ts_msp_grid_last: u32, - binner: Option, } impl SeriesWriter { @@ -115,10 +113,6 @@ impl SeriesWriter { shape: Shape, stnow: SystemTime, ) -> Result { - let mut binner = ConnTimeBin::empty(sid.clone(), TsNano::from_ns(SEC * 10)); - binner.setup_for(&scalar_type, &shape, stnow)?; - let _ = binner; - let binner = None; let res = Self { cssid, sid, @@ -130,7 +124,6 @@ impl SeriesWriter { msp_max_entries: 64000, msp_max_bytes: 1024 * 1024 * 20, ts_msp_grid_last: 0, - binner, }; Ok(res) } @@ -156,11 +149,6 @@ impl SeriesWriter { ) -> Result<(), Error> { let ts_main = ts_local; - // TODO compute the binned data here as well and flush completed bins if needed. - if let Some(binner) = self.binner.as_mut() { - binner.push(ts_main.clone(), &val)?; - } - // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. @@ -215,10 +203,6 @@ impl SeriesWriter { } pub fn tick(&mut self, deque: &mut VecDeque) -> Result<(), Error> { - if let Some(binner) = self.binner.as_mut() { - // TODO - //binner.tick(deque)?; - } Ok(()) } }