From 9ac197e7556bf2bedfcdf22900d4a0c648457acb Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Tue, 16 Jul 2024 15:31:47 +0200 Subject: [PATCH] Refactor series writer --- batchtools/src/jobchn.rs | 9 + batchtools/src/lib.rs | 1 + daqingest/src/daemon.rs | 55 ++--- netfetch/src/ca/beacons.rs | 14 +- netfetch/src/ca/conn.rs | 312 ++++++++++++++++++--------- netfetch/src/ca/conn/enumfetch.rs | 37 ++-- netfetch/src/ca/connset.rs | 5 - netfetch/src/ca/proto.rs | 51 ++++- netfetch/src/metrics.rs | 1 - netfetch/src/metrics/ingest.rs | 55 ++++- netfetch/src/metrics/postingest.rs | 109 ---------- scywr/src/insertqueues.rs | 14 ++ scywr/src/insertworker.rs | 17 +- scywr/src/iteminsertqueue.rs | 30 +-- serieswriter/src/changewriter.rs | 0 serieswriter/src/establish_worker.rs | 193 ----------------- serieswriter/src/lib.rs | 3 +- serieswriter/src/ratelimitwriter.rs | 117 ++++++++++ serieswriter/src/rtwriter.rs | 195 ++++------------- serieswriter/src/writer.rs | 91 +++----- 20 files changed, 592 insertions(+), 717 deletions(-) create mode 100644 batchtools/src/jobchn.rs delete mode 100644 netfetch/src/metrics/postingest.rs create mode 100644 serieswriter/src/changewriter.rs delete mode 100644 serieswriter/src/establish_worker.rs create mode 100644 serieswriter/src/ratelimitwriter.rs diff --git a/batchtools/src/jobchn.rs b/batchtools/src/jobchn.rs new file mode 100644 index 0000000..e4fa603 --- /dev/null +++ b/batchtools/src/jobchn.rs @@ -0,0 +1,9 @@ +use std::marker::PhantomData; + +pub struct JobChnWorker { + _t1: PhantomData, +} + +impl JobChnWorker {} + +//pub fn submit_and_await(job) diff --git a/batchtools/src/lib.rs b/batchtools/src/lib.rs index 6a20484..40af1b4 100644 --- a/batchtools/src/lib.rs +++ b/batchtools/src/lib.rs @@ -1,3 +1,4 @@ pub mod batcher; #[cfg(test)] pub mod channeltest; +pub mod jobchn; diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 2b823d4..29392d0 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -25,6 +25,8 @@ use scywr::insertqueues::InsertQueuesTx; use scywr::insertworker::InsertWorkerOpts; use scywr::iteminsertqueue as scywriiq; use scywriiq::QueryItem; +use stats::rand_xoshiro::rand_core::RngCore; +use stats::rand_xoshiro::Xoshiro128PlusPlus; use stats::DaemonStats; use stats::InsertWorkerStats; use stats::SeriesByChannelStats; @@ -103,13 +105,6 @@ impl Daemon { let insert_queue_counter = Arc::new(AtomicUsize::new(0)); - let wrest_stats = Arc::new(SeriesWriterEstablishStats::new()); - let (writer_establis_tx,) = serieswriter::establish_worker::start_writer_establish_worker( - channel_info_query_tx.clone(), - wrest_stats.clone(), - ) - .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; - let local_epics_hostname = ingest_linux::net::local_hostname(); #[cfg(DISABLED)] @@ -169,7 +164,6 @@ impl Daemon { iqtx, channel_info_query_tx.clone(), ingest_opts.clone(), - writer_establis_tx, ); // TODO remove @@ -642,15 +636,6 @@ impl Daemon { } pub async fn daemon(mut self) -> Result<(), Error> { - let worker_jh = { - let backend = String::new(); - let (_item_tx, item_rx) = async_channel::bounded(256); - let info_worker_tx = self.channel_info_query_tx.clone(); - use netfetch::metrics::postingest::process_api_query_items; - let iqtx = self.iqtx.clone().unwrap(); - let worker_fut = process_api_query_items(backend, item_rx, info_worker_tx, iqtx); - taskrun::spawn(worker_fut) - }; self.spawn_metrics().await?; Self::spawn_ticker(self.tx.clone(), self.stats.clone()); loop { @@ -677,22 +662,6 @@ impl Daemon { jh.await??; } debug!("joined 
metrics handler"); - debug!("wait for postingest task"); - match worker_jh.await? { - Ok(_) => {} - Err(e) => match e { - netfetch::metrics::postingest::Error::Msg => { - error!("{e}"); - } - netfetch::metrics::postingest::Error::SeriesWriter(_) => { - error!("{e}"); - } - netfetch::metrics::postingest::Error::SendError => { - error!("join postingest in better way"); - } - }, - } - debug!("joined postingest task"); debug!("wait for insert workers"); while let Some(jh) = self.insert_workers_jh.pop() { match jh.await.map_err(Error::from_string) { @@ -794,7 +763,19 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> debug!("will configure {} channels", channels_config.len()); let mut thr_msg = ThrottleTrace::new(Duration::from_millis(1000)); let mut i = 0; - for ch_cfg in channels_config.channels() { + let nmax = usize::MAX; + let nn = channels_config.channels().len(); + let mut ixs: Vec = (0..nn).into_iter().collect(); + if false { + let mut rng = stats::xoshiro_from_time(); + for _ in 0..2 * ixs.len() { + let i = rng.next_u32() as usize % nn; + let j = rng.next_u32() as usize % nn; + ixs.swap(i, j); + } + } + for ix in ixs.into_iter().take(nmax) { + let ch_cfg = &channels_config.channels()[ix]; match daemon_tx .send(DaemonEvent::ChannelAdd(ch_cfg.clone(), async_channel::bounded(1).0)) .await @@ -808,7 +789,11 @@ pub async fn run(opts: CaIngestOpts, channels_config: Option) -> thr_msg.trigger("daemon sent ChannelAdd", &[&i as &_]); i += 1; } - debug!("{} configured channels applied", channels_config.len()); + debug!( + "{} of {} configured channels applied", + i, + channels_config.channels().len() + ); } daemon_jh.await.map_err(|e| Error::with_msg_no_trace(e.to_string()))??; info!("Joined daemon"); diff --git a/netfetch/src/ca/beacons.rs b/netfetch/src/ca/beacons.rs index 0e5774f..7f5aed6 100644 --- a/netfetch/src/ca/beacons.rs +++ b/netfetch/src/ca/beacons.rs @@ -32,12 +32,12 @@ pub async fn listen_beacons( let channel = "epics-ca-beacons".to_string(); let scalar_type = ScalarType::U64; let shape = Shape::Scalar; - let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?; + // let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type, shape, stnow).await?; + // let mut deque = VecDeque::new(); let sock = UdpSocket::bind("0.0.0.0:5065").await?; sock.set_broadcast(true).unwrap(); let mut buf = Vec::new(); buf.resize(1024 * 4, 0); - let mut deque = VecDeque::new(); loop { let bb = &mut buf; let (n, remote) = taskrun::tokio::select! 
{ @@ -66,13 +66,13 @@ pub async fn listen_beacons( let ts_local = ts; let blob = addr_u32 as i64; let val = DataValue::Scalar(ScalarValue::I64(blob)); - writer.write(ts, ts_local, val, &mut deque)?; + // writer.write(ts, ts_local, val, &mut deque)?; } } - if deque.len() != 0 { - // TODO deliver to insert queue - deque.clear(); - } + // if deque.len() != 0 { + // TODO deliver to insert queue + // deque.clear(); + // } } Ok(()) } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index bf2dd9c..eadc2b2 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1,6 +1,7 @@ mod enumfetch; use super::proto; +use super::proto::CaDataValue; use super::proto::CaEventValue; use super::proto::ReadNotify; use crate::ca::proto::ChannelClose; @@ -12,9 +13,11 @@ use async_channel::Receiver; use async_channel::Sender; use core::fmt; use dbpg::seriesbychannel::ChannelInfoQuery; +use dbpg::seriesbychannel::ChannelInfoResult; use enumfetch::ConnFuture; use err::thiserror; use err::ThisError; +use futures_util::pin_mut; use futures_util::Future; use futures_util::FutureExt; use futures_util::Stream; @@ -24,6 +27,7 @@ use log::*; use netpod::timeunits::*; use netpod::ttl::RetentionTime; use netpod::ScalarType; +use netpod::SeriesKind; use netpod::Shape; use netpod::TsMs; use netpod::TsNano; @@ -53,9 +57,8 @@ use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; use serieswriter::binwriter::BinWriter; -use serieswriter::establish_worker::EstablishWorkerJob; -use serieswriter::establish_worker::JobId; use serieswriter::rtwriter::RtWriter; +use serieswriter::writer::EmittableType; use stats::rand_xoshiro::rand_core::RngCore; use stats::rand_xoshiro::rand_core::SeedableRng; use stats::rand_xoshiro::Xoshiro128PlusPlus; @@ -142,6 +145,8 @@ fn dbg_chn_cid(cid: Cid, conn: &CaConn) -> bool { } } +type CaRtWriter = RtWriter; + #[derive(Debug, ThisError)] #[cstm(name = "NetfetchConn")] pub enum Error { @@ -171,6 +176,7 @@ pub enum Error { FutLogic, MissingTimestamp, EnumFetch(#[from] enumfetch::Error), + SeriesLookup(#[from] dbpg::seriesbychannel::Error), } impl err::ToErr for Error { @@ -374,7 +380,7 @@ enum PollTickState { struct WritableState { tsbeg: Instant, channel: CreatedState, - writer: RtWriter, + writer: CaRtWriter, binwriter: BinWriter, reading: ReadingState, } @@ -600,7 +606,7 @@ impl ChannelState { _ => None, }; let series = match self { - ChannelState::Writable(s) => Some(s.writer.sid()), + ChannelState::Writable(s) => Some(s.writer.series()), _ => None, }; let interest_score = 1. 
/ item_recv_ivl_ema.unwrap_or(1e10).max(1e-6).min(1e10); @@ -845,15 +851,22 @@ impl CaConnEvent { } pub fn desc_short(&self) -> CaConnEventDescShort { - CaConnEventDescShort {} + CaConnEventDescShort { inner: self } } } -pub struct CaConnEventDescShort {} +pub struct CaConnEventDescShort<'a> { + inner: &'a CaConnEvent, +} -impl fmt::Display for CaConnEventDescShort { +impl<'a> fmt::Display for CaConnEventDescShort<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "CaConnEventDescShort {{ TODO-impl }}") + write!( + fmt, + "CaConnEventDescShort {{ ts: {:?}, value: {} }}", + self.inner.ts, + self.inner.value.desc_short() + ) } } @@ -867,6 +880,19 @@ pub enum CaConnEventValue { EndOfStream(EndOfStreamReason), } +impl CaConnEventValue { + pub fn desc_short(&self) -> &'static str { + match self { + CaConnEventValue::None => "None", + CaConnEventValue::EchoTimeout => "EchoTimeout", + CaConnEventValue::ConnCommandResult(_) => "ConnCommandResult", + CaConnEventValue::ChannelStatus(_) => "ChannelStatus", + CaConnEventValue::ChannelCreateFail(_) => "ChannelCreateFail", + CaConnEventValue::EndOfStream(_) => "EndOfStream", + } + } +} + #[derive(Debug)] pub enum EndOfStreamReason { UnspecifiedReason, @@ -934,10 +960,12 @@ pub struct CaConn { ca_proto_stats: Arc, weird_count: usize, rng: Xoshiro128PlusPlus, - writer_establish_qu: VecDeque, - writer_establish_tx: Pin>>, - writer_tx: Sender<(JobId, Result)>, - writer_rx: Pin)>>>, + channel_info_query_qu: VecDeque, + channel_info_query_tx: Pin>>, + channel_info_query_res_rxs: VecDeque<( + Pin>>>, + Cid, + )>, tmp_ts_poll: SystemTime, poll_tsnow: Instant, ioid: u32, @@ -961,11 +989,9 @@ impl CaConn { channel_info_query_tx: Sender, stats: Arc, ca_proto_stats: Arc, - writer_establish_tx: Sender, ) -> Self { let _ = channel_info_query_tx; let tsnow = Instant::now(); - let (writer_tx, writer_rx) = async_channel::bounded(32); let (cq_tx, cq_rx) = async_channel::bounded(32); let mut rng = stats::xoshiro_from_time(); Self { @@ -1001,10 +1027,9 @@ impl CaConn { ca_proto_stats, weird_count: 0, rng, - writer_establish_qu: VecDeque::new(), - writer_establish_tx: Box::pin(SenderPolling::new(writer_establish_tx)), - writer_tx, - writer_rx: Box::pin(writer_rx), + channel_info_query_qu: VecDeque::new(), + channel_info_query_tx: Box::pin(SenderPolling::new(channel_info_query_tx)), + channel_info_query_res_rxs: VecDeque::new(), tmp_ts_poll: SystemTime::now(), poll_tsnow: tsnow, ioid: 100, @@ -1142,30 +1167,65 @@ impl CaConn { fn handle_writer_establish_result(&mut self, cx: &mut Context) -> Result>, Error> { use Poll::*; + let mut have_progress = false; + let mut have_pending = false; + let stnow = self.tmp_ts_poll; if self.is_shutdown() { Ok(Ready(None)) } else { - let rx = self.writer_rx.as_mut(); - match rx.poll_next(cx) { - Ready(Some(res)) => { - trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg); - let jobid = res.0; - // by convention: - let cid = Cid(jobid.0 as _); - let wr = res.1?; - self.handle_writer_establish_inner(cid, wr)?; - Ok(Ready(Some(()))) + let n = self.channel_info_query_res_rxs.len().min(16); + let mut i = 0; + while let Some(x) = self.channel_info_query_res_rxs.pop_front() { + let mut rx = x.0; + let cid = x.1; + match rx.poll_next_unpin(cx) { + Ready(Some(res)) => { + trace!("handle_writer_establish_result recv {}", self.remote_addr_dbg); + let chinfo = res?; + if let Some(ch) = self.channels.get(&cid) { + if let ChannelState::MakingSeriesWriter(st) = &ch.state { + let scalar_type = 
st.channel.scalar_type.clone(); + let shape = st.channel.shape.clone(); + let writer = RtWriter::new( + chinfo.series.to_series(), + scalar_type, + shape, + ch.conf.min_quiets(), + stnow, + )?; + self.handle_writer_establish_inner(cid, writer)?; + have_progress = true; + } else { + return Err(Error::Error); + } + } else { + return Err(Error::Error); + } + } + Ready(None) => { + error!("channel lookup queue closed"); + } + Pending => { + self.channel_info_query_res_rxs.push_back((rx, cid)); + have_pending = true; + } } - Ready(None) => { - error!("writer_establish queue closed"); - Ok(Ready(None)) + i += 1; + if i >= n { + break; } - Pending => Ok(Pending), + } + if have_progress { + Ok(Ready(Some(()))) + } else if have_pending { + Ok(Pending) + } else { + Ok(Ready(None)) } } } - fn handle_writer_establish_inner(&mut self, cid: Cid, writer: RtWriter) -> Result<(), Error> { + fn handle_writer_establish_inner(&mut self, cid: Cid, writer: CaRtWriter) -> Result<(), Error> { trace!("handle_writer_establish_inner {cid:?}"); let dbg_chn_cid = dbg_chn_cid(cid, self); if dbg_chn_cid { @@ -1183,7 +1243,7 @@ impl CaConn { beg, RetentionTime::Short, st2.channel.cssid, - writer.sid(), + writer.series(), st2.channel.scalar_type.clone(), st2.channel.shape.clone(), )?; @@ -1812,55 +1872,11 @@ impl CaConn { Ok(()) } - fn convert_event_data(crst: &mut CreatedState, data: super::proto::CaDataValue) -> Result { - use super::proto::CaDataValue; - use scywr::iteminsertqueue::DataValue; - let ret = match data { - CaDataValue::Scalar(val) => DataValue::Scalar({ - use super::proto::CaDataScalarValue; - use scywr::iteminsertqueue::ScalarValue; - match val { - CaDataScalarValue::I8(x) => ScalarValue::I8(x), - CaDataScalarValue::I16(x) => ScalarValue::I16(x), - CaDataScalarValue::I32(x) => ScalarValue::I32(x), - CaDataScalarValue::F32(x) => ScalarValue::F32(x), - CaDataScalarValue::F64(x) => ScalarValue::F64(x), - CaDataScalarValue::Enum(x) => ScalarValue::Enum(x, { - let conv = crst.enum_str_table.as_ref().map_or_else( - || String::from("missingstrings"), - |map| { - map.get(x as usize) - .map_or_else(|| String::from("undefined"), String::from) - }, - ); - info!("convert_event_data {} {:?}", crst.name(), conv); - conv - }), - CaDataScalarValue::String(x) => ScalarValue::String(x), - CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), - } - }), - CaDataValue::Array(val) => DataValue::Array({ - use super::proto::CaDataArrayValue; - use scywr::iteminsertqueue::ArrayValue; - match val { - CaDataArrayValue::I8(x) => ArrayValue::I8(x), - CaDataArrayValue::I16(x) => ArrayValue::I16(x), - CaDataArrayValue::I32(x) => ArrayValue::I32(x), - CaDataArrayValue::F32(x) => ArrayValue::F32(x), - CaDataArrayValue::F64(x) => ArrayValue::F64(x), - CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), - } - }), - }; - Ok(ret) - } - fn event_add_ingest( payload_len: u32, value: CaEventValue, crst: &mut CreatedState, - writer: &mut RtWriter, + writer: &mut CaRtWriter, binwriter: &mut BinWriter, iqdqs: &mut InsertDeques, tsnow: Instant, @@ -1872,7 +1888,7 @@ impl CaConn { match &value.meta { CaMetaTime(meta) => { if meta.status != 0 { - let sid = writer.sid(); + let sid = writer.series(); debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity); } } @@ -1901,10 +1917,9 @@ impl CaConn { crst.insert_item_ivl_ema.tick(tsnow); let ts_ioc = TsNano::from_ns(ts); let ts_local = TsNano::from_ns(ts_local); - let val = Self::convert_event_data(crst, value.data)?; // binwriter.ingest(ts_ioc, ts_local, &val, iqdqs)?; { - let ((dwst, dwmt, 
dwlt),) = writer.write(ts_ioc, ts_local, val, iqdqs)?; + let ((dwst, dwmt, dwlt),) = writer.write(CaWriterValue::new(value, crst), tsnow, iqdqs)?; if dwst { crst.dw_st_last = stnow; crst.acc_st.push_written(payload_len); @@ -2477,6 +2492,7 @@ impl CaConn { } match &scalar_type { ScalarType::Enum => { + // TODO channel created, now fetch enum variants, later make writer let min_quiets = conf.conf.min_quiets(); let fut = enumfetch::EnumFetch::new(created_state, self, min_quiets); // TODO should always check if the slot is free. @@ -2485,22 +2501,26 @@ impl CaConn { self.handler_by_ioid.insert(ioid, Some(x)); } _ => { + let backend = self.backend.clone(); + let channel_name = created_state.name().into(); *chst = ChannelState::MakingSeriesWriter(MakingSeriesWriterState { tsbeg: tsnow, channel: created_state, }); - let job = EstablishWorkerJob::new( - JobId(cid.0 as _), - self.backend.clone(), - conf.conf.name().into(), - cssid, - scalar_type, - shape, - conf.conf.min_quiets(), - self.writer_tx.clone(), - self.tmp_ts_poll, - ); - self.writer_establish_qu.push_back(job); + // TODO create a channel for the answer. + // Keep only a certain max number of channels in-flight because have to poll on them. + // TODO register the channel for the answer. + let (tx, rx) = async_channel::bounded(8); + let item = ChannelInfoQuery { + backend, + channel: channel_name, + kind: SeriesKind::ChannelData, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + tx: Box::pin(tx), + }; + self.channel_info_query_qu.push_back(item); + self.channel_info_query_res_rxs.push_back((Box::pin(rx), cid)); } } Ok(()) @@ -2714,6 +2734,7 @@ impl CaConn { CaConnState::Shutdown(..) => {} CaConnState::EndOfStream => {} } + self.iqdqs.housekeeping(); Ok(()) } @@ -2760,7 +2781,7 @@ impl CaConn { ]) { if acc.beg != msp { if acc.usage().count() != 0 { - let series = st1.writer.sid(); + let series = st1.writer.series(); let item = Accounting { part: (series.id() & 0xff) as i32, ts: acc.beg, @@ -2778,7 +2799,7 @@ impl CaConn { let acc = &mut ch.acc_recv; if acc.beg != msp { if acc.usage().count() != 0 { - let series = st1.writer.sid(); + let series = st1.writer.series(); let item = AccountingRecv { part: (series.id() & 0xff) as i32, ts: acc.beg, @@ -3071,12 +3092,12 @@ impl Stream for CaConn { if !self.is_shutdown() { flush_queue!( self, - writer_establish_qu, - writer_establish_tx, + channel_info_query_qu, + channel_info_query_tx, send_individual, 32, (&mut have_progress, &mut have_pending), - "wrest", + "chinf", cx, |_| {} ); @@ -3215,3 +3236,96 @@ impl Stream for CaConn { ret } } + +#[derive(Debug, Clone)] +struct CaWriterValue(CaEventValue, Option); + +impl CaWriterValue { + fn new(val: CaEventValue, crst: &CreatedState) -> Self { + let valstr = match &val.data { + CaDataValue::Scalar(val) => { + use super::proto::CaDataScalarValue; + match val { + CaDataScalarValue::Enum(x) => { + let x = *x; + let table = crst.enum_str_table.as_ref(); + let conv = table.map_or_else( + || String::from("missingstrings"), + |map| { + map.get(x as usize) + .map_or_else(|| String::from("undefined"), String::from) + }, + ); + trace!("CaWriterValue convert enum {} {:?}", crst.name(), conv); + Some(conv) + } + _ => None, + } + } + _ => None, + }; + Self(val, valstr) + } +} + +impl EmittableType for CaWriterValue { + fn ts(&self) -> TsNano { + TsNano::from_ns(self.0.ts().unwrap_or(0)) + } + + fn has_change(&self, k: &Self) -> bool { + if self.0.data != k.0.data { + true + } else { + false + } + } + + fn byte_size(&self) -> u32 { + 
self.0.data.byte_size() + } + + fn into_data_value(mut self) -> DataValue { + // TODO need to pass a ref to channel state to convert enum strings. + // Or do that already when we construct this? + // Also, in general, need to produce a SmallVec of values to emit: value, status, severity, etc.. + // let val = Self::convert_event_data(crst, value.data)?; + use super::proto::CaDataValue; + use scywr::iteminsertqueue::DataValue; + let ret = match self.0.data { + CaDataValue::Scalar(val) => DataValue::Scalar({ + use super::proto::CaDataScalarValue; + use scywr::iteminsertqueue::ScalarValue; + match val { + CaDataScalarValue::I8(x) => ScalarValue::I8(x), + CaDataScalarValue::I16(x) => ScalarValue::I16(x), + CaDataScalarValue::I32(x) => ScalarValue::I32(x), + CaDataScalarValue::F32(x) => ScalarValue::F32(x), + CaDataScalarValue::F64(x) => ScalarValue::F64(x), + CaDataScalarValue::Enum(x) => ScalarValue::Enum( + x, + self.1.take().unwrap_or_else(|| { + warn!("NoEnumStr"); + String::from("NoEnumStr") + }), + ), + CaDataScalarValue::String(x) => ScalarValue::String(x), + CaDataScalarValue::Bool(x) => ScalarValue::Bool(x), + } + }), + CaDataValue::Array(val) => DataValue::Array({ + use super::proto::CaDataArrayValue; + use scywr::iteminsertqueue::ArrayValue; + match val { + CaDataArrayValue::I8(x) => ArrayValue::I8(x), + CaDataArrayValue::I16(x) => ArrayValue::I16(x), + CaDataArrayValue::I32(x) => ArrayValue::I32(x), + CaDataArrayValue::F32(x) => ArrayValue::F32(x), + CaDataArrayValue::F64(x) => ArrayValue::F64(x), + CaDataArrayValue::Bool(x) => ArrayValue::Bool(x), + } + }), + }; + ret + } +} diff --git a/netfetch/src/ca/conn/enumfetch.rs b/netfetch/src/ca/conn/enumfetch.rs index 5e09de1..46c6f77 100644 --- a/netfetch/src/ca/conn/enumfetch.rs +++ b/netfetch/src/ca/conn/enumfetch.rs @@ -3,10 +3,10 @@ use super::CreatedState; use super::Ioid; use crate::ca::proto::CaMsg; use crate::ca::proto::ReadNotify; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use log::*; -use serieswriter::establish_worker::EstablishWorkerJob; use std::pin::Pin; use std::time::Instant; @@ -67,29 +67,36 @@ impl ConnFuture for EnumFetch { super::proto::CaMetaValue::CaMetaVariants(meta) => { crst.enum_str_table = Some(meta.variants); } - _ => {} + _ => { + warn!("unexpected message"); + } }, - _ => {} + _ => { + warn!("unexpected message"); + } }; + // TODO create a channel for the answer. + // TODO register the channel for the answer. + let cid = crst.cid.clone(); + let (tx, rx) = async_channel::bounded(8); + let item = ChannelInfoQuery { + backend: conn.backend.clone(), + channel: crst.name().into(), + kind: netpod::SeriesKind::ChannelData, + scalar_type: crst.scalar_type.clone(), + shape: crst.shape.clone(), + tx: Box::pin(tx), + }; + conn.channel_info_query_qu.push_back(item); + conn.channel_info_query_res_rxs.push_back((Box::pin(rx), cid)); + // This handler must not exist if the channel gets removed. 
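+        // The receiver pushed above is polled later by
+        // CaConn::handle_writer_establish_result, which drains at most 16 of
+        // the pending lookups per poll and builds the channel's RtWriter from
+        // the looked-up series id.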
let conf = conn.channels.get_mut(&crst.cid).ok_or(Error::MissingState)?; conf.state = super::ChannelState::MakingSeriesWriter(super::MakingSeriesWriterState { tsbeg: tsnow, channel: crst.clone(), }); - let job = EstablishWorkerJob::new( - serieswriter::establish_worker::JobId(crst.cid.0 as _), - conn.backend.clone(), - crst.name().into(), - crst.cssid.clone(), - crst.scalar_type.clone(), - crst.shape.clone(), - self.min_quiets.clone(), - conn.writer_tx.clone(), - conn.tmp_ts_poll, - ); - conn.writer_establish_qu.push_back(job); conn.handler_by_ioid.remove(&self.ioid); Ok(()) diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index cebda2c..04e2b8d 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -63,7 +63,6 @@ use std::pin::Pin; use netpod::OnDrop; use scywr::insertqueues::InsertQueuesTx; -use serieswriter::establish_worker::EstablishWorkerJob; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -387,7 +386,6 @@ pub struct CaConnSet { ca_proto_stats: Arc, rogue_channel_count: u64, connect_fail_count: usize, - establish_worker_tx: async_channel::Sender, cssid_latency_max: Duration, } @@ -402,7 +400,6 @@ impl CaConnSet { iqtx: InsertQueuesTx, channel_info_query_tx: Sender, ingest_opts: CaIngestOpts, - establish_worker_tx: async_channel::Sender, ) -> CaConnSetCtrl { let (ca_conn_res_tx, ca_conn_res_rx) = async_channel::bounded(200); let (connset_inp_tx, connset_inp_rx) = async_channel::bounded(200); @@ -459,7 +456,6 @@ impl CaConnSet { ca_proto_stats: ca_proto_stats.clone(), rogue_channel_count: 0, connect_fail_count: 0, - establish_worker_tx, cssid_latency_max: Duration::from_millis(2000), }; // TODO await on jh @@ -1054,7 +1050,6 @@ impl CaConnSet { .ok_or_else(|| Error::with_msg_no_trace("no more channel_info_query_tx available"))?, self.ca_conn_stats.clone(), self.ca_proto_stats.clone(), - self.establish_worker_tx.clone(), ); let conn_tx = conn.conn_command_tx(); let conn_stats = conn.stats(); diff --git a/netfetch/src/ca/proto.rs b/netfetch/src/ca/proto.rs index 437f90c..1949826 100644 --- a/netfetch/src/ca/proto.rs +++ b/netfetch/src/ca/proto.rs @@ -247,7 +247,7 @@ impl CaDbrType { } } -#[derive(Clone, Debug)] +#[derive(Debug, Clone, PartialEq)] pub enum CaDataScalarValue { I8(i8), I16(i16), @@ -260,7 +260,22 @@ pub enum CaDataScalarValue { Bool(bool), } -#[derive(Clone, Debug)] +impl CaDataScalarValue { + fn byte_size(&self) -> u32 { + match self { + CaDataScalarValue::I8(_) => 1, + CaDataScalarValue::I16(_) => 2, + CaDataScalarValue::I32(_) => 4, + CaDataScalarValue::F32(_) => 4, + CaDataScalarValue::F64(_) => 8, + CaDataScalarValue::Enum(_) => 2, + CaDataScalarValue::String(v) => v.len() as u32, + CaDataScalarValue::Bool(_) => 1, + } + } +} + +#[derive(Debug, Clone, PartialEq)] pub enum CaDataArrayValue { I8(Vec), I16(Vec), @@ -271,13 +286,35 @@ pub enum CaDataArrayValue { Bool(Vec), } -#[derive(Clone, Debug)] +impl CaDataArrayValue { + fn byte_size(&self) -> u32 { + match self { + CaDataArrayValue::I8(x) => 1 * x.len() as u32, + CaDataArrayValue::I16(x) => 2 * x.len() as u32, + CaDataArrayValue::I32(x) => 4 * x.len() as u32, + CaDataArrayValue::F32(x) => 4 * x.len() as u32, + CaDataArrayValue::F64(x) => 8 * x.len() as u32, + CaDataArrayValue::Bool(x) => 1 * x.len() as u32, + } + } +} + +#[derive(Debug, Clone, PartialEq)] pub enum CaDataValue { Scalar(CaDataScalarValue), Array(CaDataArrayValue), } -#[derive(Clone, Debug)] +impl CaDataValue { + pub fn byte_size(&self) -> u32 { + match self { + CaDataValue::Scalar(x) => 
x.byte_size(), + CaDataValue::Array(x) => x.byte_size(), + } + } +} + +#[derive(Debug, Clone, PartialEq)] pub struct CaEventValue { pub data: CaDataValue, pub meta: CaMetaValue, @@ -296,13 +333,13 @@ impl CaEventValue { } } -#[derive(Clone, Debug)] +#[derive(Debug, Clone, PartialEq)] pub enum CaMetaValue { CaMetaTime(CaMetaTime), CaMetaVariants(CaMetaVariants), } -#[derive(Clone, Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct CaMetaTime { pub status: u16, pub severity: u16, @@ -310,7 +347,7 @@ pub struct CaMetaTime { pub ca_nanos: u32, } -#[derive(Clone, Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct CaMetaVariants { pub status: u16, pub severity: u16, diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 71e3b87..66fb42e 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,7 +1,6 @@ #![allow(unused)] pub mod delete; pub mod ingest; -pub mod postingest; pub mod status; use crate::ca::conn::ChannelStateInfo; diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index b81e335..a04700e 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -5,6 +5,7 @@ use axum::http::HeaderMap; use axum::Json; use bytes::Bytes; use core::fmt; +use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; use futures_util::StreamExt; @@ -16,6 +17,7 @@ use items_2::eventsdim1::EventsDim1NoPulse; use netpod::log::*; use netpod::ttl::RetentionTime; use netpod::ScalarType; +use netpod::SeriesKind; use netpod::Shape; use netpod::TsNano; use netpod::APP_CBOR_FRAMED; @@ -25,12 +27,14 @@ use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; use scywr::iteminsertqueue::ScalarValue; use serde::Deserialize; +use serieswriter::writer::EmittableType; use serieswriter::writer::SeriesWriter; use std::collections::HashMap; use std::collections::VecDeque; use std::io::Cursor; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use std::time::SystemTime; use streams::framed_bytes::FramedBytesStream; use taskrun::tokio::time::timeout; @@ -63,6 +67,29 @@ macro_rules! 
trace_queues { }; } +type ValueSeriesWriter = SeriesWriter; + +#[derive(Debug, Clone)] +struct WritableType(DataValue); + +impl EmittableType for WritableType { + fn ts(&self) -> TsNano { + todo!() + } + + fn has_change(&self, k: &Self) -> bool { + todo!() + } + + fn byte_size(&self) -> u32 { + todo!() + } + + fn into_data_value(self) -> DataValue { + todo!() + } +} + #[derive(Debug, ThisError)] #[cstm(name = "MetricsIngest")] pub enum Error { @@ -131,8 +158,18 @@ async fn post_v01_try( shape, rt ); - let mut writer = - SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?; + let (tx, rx) = async_channel::bounded(8); + let qu = ChannelInfoQuery { + backend, + channel, + kind: SeriesKind::ChannelData, + scalar_type: scalar_type.clone(), + shape: shape.clone(), + tx: Box::pin(tx), + }; + rres.worker_tx.send(qu).await.unwrap(); + let chinfo = rx.recv().await.unwrap().unwrap(); + let mut writer = SeriesWriter::new(chinfo.series.to_series())?; debug_setup!("series writer established"); let mut iqdqs = InsertDeques::new(); let mut iqtx = rres.iqtx.clone(); @@ -262,7 +299,7 @@ async fn post_v01_try( fn evpush_dim0( frame: &Bytes, deque: &mut VecDeque, - writer: &mut SeriesWriter, + writer: &mut ValueSeriesWriter, f1: F1, ) -> Result<(), Error> where @@ -276,11 +313,12 @@ where .map_err(|_| Error::Decode)?; let evs: EventsDim0 = evs.into(); trace_input!("see events {:?}", evs); + let tsnow = Instant::now(); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; + writer.write(WritableType(val), tsnow, deque)?; } Ok(()) } @@ -288,7 +326,7 @@ where fn evpush_dim1( frame: &Bytes, deque: &mut VecDeque, - writer: &mut SeriesWriter, + writer: &mut ValueSeriesWriter, f1: F1, ) -> Result<(), Error> where @@ -302,21 +340,22 @@ where .map_err(|_| Error::Decode)?; let evs: EventsDim1 = evs.into(); trace_input!("see events {:?}", evs); + let tsnow = Instant::now(); for (i, (&ts, val)) in evs.tss.iter().zip(evs.values.iter()).enumerate() { let val = val.clone(); trace_input!("ev {:6} {:20} {:20?}", i, ts, val); let val = f1(val); - writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts), val, deque)?; + writer.write(WritableType(val), tsnow, deque)?; } Ok(()) } -fn tick_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { +fn tick_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { writer.tick(deques.deque(rt))?; Ok(()) } -fn finish_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { +fn finish_writers(writer: &mut ValueSeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { writer.tick(deques.deque(rt))?; Ok(()) } diff --git a/netfetch/src/metrics/postingest.rs b/netfetch/src/metrics/postingest.rs deleted file mode 100644 index 4cb6d59..0000000 --- a/netfetch/src/metrics/postingest.rs +++ /dev/null @@ -1,109 +0,0 @@ -use async_channel::Receiver; -use async_channel::Sender; -use dbpg::seriesbychannel::ChannelInfoQuery; -use err::thiserror; -use err::ThisError; -use mrucache::mucache::MuCache; -use netpod::ScalarType; -use netpod::Shape; -use netpod::TsNano; -use scywr::insertqueues::InsertDeques; -use scywr::insertqueues::InsertQueuesTx; -use scywr::iteminsertqueue::DataValue; -use 
scywr::iteminsertqueue::QueryItem; -use scywr::iteminsertqueue::ScalarValue; -use serieswriter::writer::SeriesWriter; -use std::collections::VecDeque; -use std::time::Duration; -use std::time::Instant; -use std::time::SystemTime; - -#[derive(Debug, ThisError)] -#[cstm(name = "HttpPostingest")] -pub enum Error { - Msg, - SeriesWriter(#[from] serieswriter::writer::Error), - SendError, -} - -impl From>> for Error { - fn from(value: async_channel::SendError>) -> Self { - Error::SendError - } -} - -#[derive(Debug)] -pub struct EventValueItem { - ts: TsNano, - channel: String, - val: DataValue, -} - -struct SeriesWriterIngredients { - writer: SeriesWriter, -} - -pub async fn process_api_query_items( - backend: String, - item_rx: Receiver, - info_worker_tx: Sender, - mut iqtx: InsertQueuesTx, -) -> Result<(), Error> { - // TODO so far arbitrary upper limit on the number of ad-hoc channels: - let mut mucache: MuCache = MuCache::new(2000); - let mut iqdqs = InsertDeques::new(); - let mut sw_tick_last = Instant::now(); - - #[allow(irrefutable_let_patterns)] - while let item = taskrun::tokio::time::timeout(Duration::from_millis(500), item_rx.recv()).await { - let deque = &mut iqdqs.st_rf3_rx; - let tsnow = Instant::now(); - if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) { - sw_tick_last = tsnow; - tick_writers(mucache.all_ref_mut(), deque)?; - } - let item = match item { - Ok(Ok(item)) => item, - Ok(Err(_)) => break, - Err(_) => { - continue; - } - }; - let scalar_type = item.val.scalar_type(); - let shape = item.val.shape(); - - // TODO cache the SeriesWriter. - // Evict only from cache if older than some threshold. - // If full, then reject the insert. - let stnow = SystemTime::now(); - let mut sw = SeriesWriter::establish( - info_worker_tx.clone(), - backend.clone(), - item.channel, - scalar_type, - shape, - stnow, - ) - .await?; - sw.write(item.ts, item.ts, item.val, deque)?; - iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; - } - let deque = &mut iqdqs.st_rf3_rx; - finish_writers(mucache.all_ref_mut(), deque)?; - iqtx.send_all(&mut iqdqs).await.map_err(|_| Error::SendError)?; - Ok(()) -} - -fn tick_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { - for sw in sws { - sw.tick(deque)?; - } - Ok(()) -} - -fn finish_writers(sws: Vec<&mut SeriesWriter>, deque: &mut VecDeque) -> Result<(), Error> { - for sw in sws { - sw.tick(deque)?; - } - Ok(()) -} diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index a5f20cd..1c42448 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -169,6 +169,20 @@ impl InsertDeques { RetentionTime::Long => &mut self.lt_rf3_rx, } } + + pub fn housekeeping(&mut self) { + let qus = [ + &mut self.st_rf1_rx, + &mut self.st_rf3_rx, + &mut self.mt_rf3_rx, + &mut self.lt_rf3_rx, + ]; + for qu in qus { + if qu.len() * 2 < qu.capacity() { + qu.truncate(qu.capacity() * 3 / 4); + } + } + } } pub struct InsertDequesSummary<'a> { diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index a6ba258..a53dd11 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -18,6 +18,7 @@ use futures_util::StreamExt; use log::*; use netpod::ttl::RetentionTime; use netpod::TsMs; +use netpod::TsNano; use smallvec::smallvec; use smallvec::SmallVec; use stats::InsertWorkerStats; @@ -25,6 +26,7 @@ use std::collections::VecDeque; use std::sync::atomic; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use std::time::SystemTime; use 
taskrun::tokio; use tokio::task::JoinHandle; @@ -269,7 +271,7 @@ where item_inp.map(move |batch| { stats.item_recv.inc(); trace!("transform_to_db_futures have batch len {}", batch.len()); - let tsnow = TsMs::from_system_time(SystemTime::now()); + let tsnow = Instant::now(); let mut res = Vec::with_capacity(32); for item in batch { let futs = match item { @@ -333,12 +335,13 @@ fn prepare_query_insert_futs( item: InsertItem, data_store: &Arc, stats: &Arc, - tsnow: TsMs, + tsnow: Instant, ) -> SmallVec<[InsertFut; 4]> { stats.inserts_value().inc(); let item_ts_net = item.ts_net; - let dt = tsnow.to_u64().saturating_sub(item_ts_net.to_u64()) as u32; - stats.item_lat_net_worker().ingest(dt); + let dt = tsnow.saturating_duration_since(item_ts_net); + let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); + stats.item_lat_net_worker().ingest(dt_ms); let msp_bump = item.msp_bump; let series = item.series.clone(); let ts_msp = item.ts_msp; @@ -366,7 +369,7 @@ fn prepare_timebin_insert_futs( item: TimeBinSimpleF32, data_store: &Arc, stats: &Arc, - tsnow: TsMs, + tsnow: Instant, ) -> SmallVec<[InsertFut; 4]> { trace!("have time bin patch to insert: {item:?}"); let params = ( @@ -408,7 +411,7 @@ fn prepare_accounting_insert_futs( item: Accounting, data_store: &Arc, stats: &Arc, - tsnow: TsMs, + tsnow: Instant, ) -> SmallVec<[InsertFut; 4]> { let params = ( item.part, @@ -432,7 +435,7 @@ fn prepare_accounting_recv_insert_futs( item: AccountingRecv, data_store: &Arc, stats: &Arc, - tsnow: TsMs, + tsnow: Instant, ) -> SmallVec<[InsertFut; 4]> { let params = ( item.part, diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index f6691eb..888242e 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -33,6 +33,7 @@ use std::ptr::NonNull; use std::sync::Arc; use std::task::Context; use std::task::Poll; +use std::time::Instant; use std::time::SystemTime; #[derive(Debug, ThisError)] @@ -553,7 +554,7 @@ pub struct InsertItem { pub ts_lsp: DtNano, pub msp_bump: bool, pub val: DataValue, - pub ts_net: TsMs, + pub ts_net: Instant, pub ts_alt_1: TsNano, } @@ -563,7 +564,7 @@ impl InsertItem { "{} {} {} {}", self.series.id(), self.ts_msp.ms(), - self.ts_lsp.ms(), + self.ts_lsp.ms_u64(), self.val.string_short() ) } @@ -614,7 +615,7 @@ struct InsParCom { series: SeriesId, ts_msp: TsMs, ts_lsp: DtNano, - ts_net: TsMs, + ts_net: Instant, do_insert: bool, stats: Arc, } @@ -671,16 +672,16 @@ impl InsertFut { qu: Arc, params: V, // timestamp when we first encountered the data to-be inserted, for metrics - tsnet: TsMs, + tsnet: Instant, stats: Arc, ) -> Self { let scy_ref = unsafe { NonNull::from(scy.as_ref()).as_ref() }; let qu_ref = unsafe { NonNull::from(qu.as_ref()).as_ref() }; let fut = scy_ref.execute_paged(qu_ref, params, None); let fut = fut.map(move |x| { - let tsnow = TsMs::from_system_time(SystemTime::now()); - let dt = tsnow.to_u64().saturating_sub(tsnet.to_u64()) as u32; - stats.item_lat_net_store().ingest(dt); + let dt = tsnet.elapsed(); + let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); + stats.item_lat_net_store().ingest(dt_ms); x }); let fut = taskrun::tokio::task::unconstrained(fut); @@ -711,7 +712,7 @@ pub fn insert_msp_fut( series: SeriesId, ts_msp: TsMs, // for stats, the timestamp when we received that data - tsnet: TsMs, + tsnet: Instant, scy: Arc, qu: Arc, stats: Arc, @@ -801,10 +802,9 @@ pub fn insert_connection_status_fut( data_store: &DataStore, stats: Arc, ) -> InsertFut { - let ts = TsMs::from_system_time(item.ts); - let (msp, lsp) 
= ts.to_grid_02(CONNECTION_STATUS_DIV); + let tsnow = TsNano::from_system_time(item.ts); + let (msp, lsp) = tsnow.to_ts_ms().to_grid_02(CONNECTION_STATUS_DIV); // TODO is that the good tsnet to use? - let tsnet = ts; let kind = item.status.to_kind(); let addr = format!("{}", item.addr); let params = (msp.to_i64(), lsp.to_i64(), kind as i32, addr); @@ -812,7 +812,7 @@ pub fn insert_connection_status_fut( data_store.scy.clone(), data_store.qu_insert_connection_status.clone(), params, - tsnet, + Instant::now(), stats, ) } @@ -832,9 +832,9 @@ pub fn insert_channel_status_fut( data_store: &DataStore, stats: Arc, ) -> SmallVec<[InsertFut; 4]> { - let ts = TsMs::from_system_time(item.ts); - let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); - let tsnet = ts; + let tsnow = TsNano::from_system_time(item.ts); + let (msp, lsp) = tsnow.to_ts_ms().to_grid_02(CONNECTION_STATUS_DIV); + let tsnet = Instant::now(); let kind = item.status.to_kind(); let cssid = item.cssid.id(); let params = (cssid as i64, msp.to_i64(), lsp.to_i64(), kind as i32); diff --git a/serieswriter/src/changewriter.rs b/serieswriter/src/changewriter.rs new file mode 100644 index 0000000..e69de29 diff --git a/serieswriter/src/establish_worker.rs b/serieswriter/src/establish_worker.rs deleted file mode 100644 index b497769..0000000 --- a/serieswriter/src/establish_worker.rs +++ /dev/null @@ -1,193 +0,0 @@ -use crate::rtwriter::MinQuiets; -use crate::rtwriter::RtWriter; -use crate::writer::SeriesWriter; -use async_channel::Receiver; -use async_channel::Sender; -use dbpg::seriesbychannel::ChannelInfoQuery; -use err::thiserror; -use err::ThisError; -use futures_util::future; -use futures_util::StreamExt; -use netpod::log::*; -use netpod::timeunits::HOUR; -use netpod::timeunits::SEC; -use netpod::ScalarType; -use netpod::Shape; -use netpod::TsNano; -use scywr::insertqueues::InsertDeques; -use scywr::iteminsertqueue::DataValue; -use series::ChannelStatusSeriesId; -use series::SeriesId; -use stats::SeriesWriterEstablishStats; -use std::sync::atomic; -use std::sync::atomic::AtomicU64; -use std::sync::Arc; -use std::time::Duration; -use std::time::SystemTime; - -#[derive(Debug, ThisError)] -#[cstm(name = "SerieswriterEstablishWorker")] -pub enum Error { - Postgres(#[from] dbpg::err::Error), - PostgresSchema(#[from] dbpg::schema::Error), - ScyllaSession(#[from] scywr::session::Error), - ScyllaSchema(#[from] scywr::schema::Error), - SeriesWriter(#[from] crate::writer::Error), - SeriesByChannel(#[from] dbpg::seriesbychannel::Error), -} - -pub struct JobId(pub u64); - -pub struct EstablishWriterWorker { - worker_tx: Sender, - jobrx: Receiver, - stats: Arc, -} - -impl EstablishWriterWorker { - fn new( - worker_tx: Sender, - jobrx: Receiver, - stats: Arc, - ) -> Self { - Self { - worker_tx, - jobrx, - stats, - } - } - - async fn work(self) { - let cnt = Arc::new(AtomicU64::new(0)); - taskrun::spawn({ - let cnt = cnt.clone(); - async move { - if true { - return Ok::<_, Error>(()); - } - loop { - taskrun::tokio::time::sleep(Duration::from_millis(10000)).await; - debug!("EstablishWriterWorker cnt {}", cnt.load(atomic::Ordering::SeqCst)); - } - Ok::<_, Error>(()) - } - }); - self.jobrx - .map(move |item| { - let wtx = self.worker_tx.clone(); - let cnt = cnt.clone(); - let stats = self.stats.clone(); - async move { - let res = RtWriter::new( - wtx.clone(), - item.backend, - item.channel, - item.scalar_type, - item.shape, - item.min_quiets, - item.tsnow, - ) - .await; - cnt.fetch_add(1, atomic::Ordering::SeqCst); - if item.restx.send((item.job_id, 
res)).await.is_err() { - stats.result_send_fail().inc(); - trace!("can not send writer establish result"); - } - } - }) - .buffer_unordered(512) - .for_each(|_| future::ready(())) - .await; - } -} - -pub struct EstablishWorkerJob { - job_id: JobId, - backend: String, - channel: String, - cssid: ChannelStatusSeriesId, - scalar_type: ScalarType, - shape: Shape, - min_quiets: MinQuiets, - restx: Sender<(JobId, Result)>, - tsnow: SystemTime, -} - -impl EstablishWorkerJob { - pub fn new( - job_id: JobId, - backend: String, - channel: String, - cssid: ChannelStatusSeriesId, - scalar_type: ScalarType, - shape: Shape, - min_quiets: MinQuiets, - restx: Sender<(JobId, Result)>, - tsnow: SystemTime, - ) -> Self { - Self { - job_id, - backend, - channel, - cssid, - scalar_type, - shape, - min_quiets, - restx, - tsnow, - } - } -} - -pub fn start_writer_establish_worker( - worker_tx: Sender, - stats: Arc, -) -> Result<(Sender,), Error> { - let (tx, rx) = async_channel::bounded(256); - let worker = EstablishWriterWorker::new(worker_tx, rx, stats); - taskrun::spawn(worker.work()); - Ok((tx,)) -} - -#[test] -fn write_00() { - use netpod::Database; - use scywr::config::ScyllaIngestConfig; - use stats::SeriesByChannelStats; - use std::sync::Arc; - let fut = async { - let dbconf = &Database { - name: "daqbuffer".into(), - host: "localhost".into(), - port: 5432, - user: "daqbuffer".into(), - pass: "daqbuffer".into(), - }; - let scyconf = &ScyllaIngestConfig::new(["127.0.0.1:19042"], "daqingest_test_00_rf3", "daqingest_test_00_rf1"); - let (pgc, pg_jh) = dbpg::conn::make_pg_client(dbconf).await?; - dbpg::schema::schema_check(&pgc).await?; - scywr::schema::migrate_scylla_data_schema(scyconf, netpod::ttl::RetentionTime::Short).await?; - let scy = scywr::session::create_session(scyconf).await?; - let stats = SeriesByChannelStats::new(); - let stats = Arc::new(stats); - let (tx, jhs, jh) = - dbpg::seriesbychannel::start_lookup_workers::(1, dbconf, stats) - .await?; - let backend = "bck-test-00"; - let channel = "chn-test-00"; - let scalar_type = ScalarType::I16; - let shape = Shape::Scalar; - let tsnow = SystemTime::now(); - let mut writer = SeriesWriter::establish(tx, backend.into(), channel.into(), scalar_type, shape, tsnow).await?; - eprintln!("{writer:?}"); - let mut iqdqs = InsertDeques::new(); - for i in 0..10 { - let ts = TsNano::from_ns(HOUR * 24 + SEC * i); - let ts_local = ts.clone(); - let val = DataValue::Scalar(scywr::iteminsertqueue::ScalarValue::I16(i as _)); - writer.write(ts, ts_local, val, &mut iqdqs.st_rf3_rx)?; - } - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index 6cf4740..e4d5855 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -1,6 +1,7 @@ pub mod binwriter; -pub mod establish_worker; +pub mod changewriter; pub mod patchcollect; +pub mod ratelimitwriter; pub mod rtwriter; pub mod timebin; pub mod writer; diff --git a/serieswriter/src/ratelimitwriter.rs b/serieswriter/src/ratelimitwriter.rs new file mode 100644 index 0000000..082da6b --- /dev/null +++ b/serieswriter/src/ratelimitwriter.rs @@ -0,0 +1,117 @@ +use crate::writer::EmittableType; +use crate::writer::SeriesWriter; +use core::fmt; +use err::thiserror; +use err::ThisError; +use netpod::log::*; +use netpod::DtNano; +use netpod::TsNano; +use scywr::iteminsertqueue::QueryItem; +use series::SeriesId; +use std::collections::VecDeque; +use std::marker::PhantomData; +use std::time::Duration; +use std::time::Instant; + +#[allow(unused)] 
+macro_rules! trace_rt_decision {
+    ($($arg:tt)*) => {
+        if true {
+            trace!($($arg)*);
+        }
+    };
+}
+
+#[derive(Debug, ThisError)]
+#[cstm(name = "RateLimitWriter")]
+pub enum Error {
+    SeriesWriter(#[from] crate::writer::Error),
+}
+
+pub struct RateLimitWriter<ET> {
+    series: SeriesId,
+    min_quiet: Duration,
+    last_insert_ts: TsNano,
+    last_insert_val: Option<ET>,
+    dbgname: String,
+    writer: SeriesWriter<ET>,
+    _t1: PhantomData<ET>,
+}
+
+impl<ET> RateLimitWriter<ET>
+where
+    ET: EmittableType,
+{
+    pub fn new(series: SeriesId, min_quiet: Duration, dbgname: String) -> Result<Self, Error> {
+        let writer = SeriesWriter::new(series)?;
+        let ret = Self {
+            series,
+            min_quiet,
+            last_insert_ts: TsNano::from_ns(0),
+            last_insert_val: None,
+            dbgname,
+            writer,
+            _t1: PhantomData,
+        };
+        Ok(ret)
+    }
+
+    pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque<QueryItem>) -> Result<(bool,), Error> {
+        // Decide whether we want to write.
+        // TODO catch already in CaConn the cases when the IOC-timestamp did not change.
+        let tsl = self.last_insert_ts.clone();
+        let dbgname = &self.dbgname;
+        let sid = &self.series;
+        let do_write = {
+            let ts = item.ts();
+            if ts == tsl {
+                trace_rt_decision!("{dbgname} {sid} ignore, because same time {ts:?} {tsl:?}");
+                false
+            } else if ts < tsl {
+                trace_rt_decision!("{dbgname} {sid} ignore, because ts_local rewind {ts:?} {tsl:?}");
+                false
+            } else if ts.ms() < tsl.ms() + 1000 * self.min_quiet.as_secs() {
+                trace_rt_decision!("{dbgname} {sid} ignore, because not min quiet {ts:?} {tsl:?}");
+                false
+            } else if ts < tsl.add_dt_nano(DtNano::from_ms(5)) {
+                trace_rt_decision!("{dbgname} {sid} ignore, because store rate cap");
+                false
+            } else if self
+                .last_insert_val
+                .as_ref()
+                .map(|k| !item.has_change(k))
+                .unwrap_or(false)
+            {
+                trace_rt_decision!("{dbgname} {sid} ignore, because value did not change");
+                false
+            } else {
+                trace_rt_decision!("{dbgname} {sid} accept");
+                true
+            }
+        };
+        if do_write {
+            self.last_insert_ts = item.ts();
+            self.last_insert_val = Some(item.clone());
+            self.writer.write(item, ts_net, deque)?;
+        }
+        Ok((do_write,))
+    }
+
+    pub fn tick(&mut self, iqdqs: &mut VecDeque<QueryItem>) -> Result<(), Error> {
+        let ret = self.writer.tick(iqdqs)?;
+        Ok(ret)
+    }
+}
+
+impl<ET> fmt::Debug for RateLimitWriter<ET>
+where
+    ET: fmt::Debug,
+{
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("RateLimitWriter")
+            .field("min_quiet", &self.min_quiet)
+            .field("last_insert_ts", &self.last_insert_ts)
+            .field("last_insert_val", &self.last_insert_val)
+            .finish()
+    }
+}
diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs
index 8a5d18f..8d6d520 100644
--- a/serieswriter/src/rtwriter.rs
+++ b/serieswriter/src/rtwriter.rs
@@ -1,10 +1,10 @@
-use crate::writer::SeriesWriter;
+use crate::ratelimitwriter::RateLimitWriter;
+use crate::writer::EmittableType;
 use async_channel::Sender;
 use dbpg::seriesbychannel::ChannelInfoQuery;
 use err::thiserror;
 use err::ThisError;
 use netpod::log::*;
-use netpod::DtNano;
 use netpod::ScalarType;
 use netpod::SeriesKind;
 use netpod::Shape;
@@ -15,10 +15,11 @@ use scywr::iteminsertqueue::QueryItem;
 use series::SeriesId;
 use std::collections::VecDeque;
 use std::time::Duration;
+use std::time::Instant;
 use std::time::SystemTime;
 
 #[allow(unused)]
-macro_rules! trace_rt_decision {
+macro_rules!
trace_rt_decision { pub enum Error { SeriesLookupError, SeriesWriter(#[from] crate::writer::Error), + RateLimitWriter(#[from] crate::ratelimitwriter::Error), } #[derive(Debug, Clone)] @@ -41,69 +43,26 @@ pub struct MinQuiets { } #[derive(Debug)] -pub struct RtWriter { - sid: SeriesId, +struct State { + writer: RateLimitWriter, +} + +#[derive(Debug)] +pub struct RtWriter { + series: SeriesId, scalar_type: ScalarType, shape: Shape, - state_st: State, - state_mt: State, - state_lt: State, + state_st: State, + state_mt: State, + state_lt: State, min_quiets: MinQuiets, } -impl RtWriter { - pub async fn new( - channel_info_tx: Sender, - backend: String, - channel: String, - scalar_type: ScalarType, - shape: Shape, - min_quiets: MinQuiets, - stnow: SystemTime, - ) -> Result { - let sid = { - let (tx, rx) = async_channel::bounded(1); - let item = ChannelInfoQuery { - backend, - channel, - kind: SeriesKind::ChannelData, - scalar_type: scalar_type.clone(), - shape: shape.clone(), - tx: Box::pin(tx), - }; - channel_info_tx.send(item).await.map_err(|_| Error::SeriesLookupError)?; - let res = rx - .recv() - .await - .map_err(|_| Error::SeriesLookupError)? - .map_err(|_| Error::SeriesLookupError)?; - res.series.to_series() - }; - let state_st = { - let writer = SeriesWriter::establish_with_sid(sid, stnow)?; - State { writer, last_ins: None } - }; - let state_mt = { - let writer = SeriesWriter::establish_with_sid(sid, stnow)?; - State { writer, last_ins: None } - }; - let state_lt = { - let writer = SeriesWriter::establish_with_sid(sid, stnow)?; - State { writer, last_ins: None } - }; - let ret = Self { - sid, - scalar_type, - shape, - state_st, - state_mt, - state_lt, - min_quiets, - }; - Ok(ret) - } - - pub fn new_with_series_id( +impl RtWriter +where + ET: EmittableType, +{ + pub fn new( series: SeriesId, scalar_type: ScalarType, shape: Shape, @@ -111,19 +70,20 @@ impl RtWriter { stnow: SystemTime, ) -> Result { let state_st = { - let writer = SeriesWriter::establish_with_sid(series, stnow)?; - State { writer, last_ins: None } + // let writer = SeriesWriter::establish_with_sid(sid, stnow)?; + let writer = RateLimitWriter::new(series, min_quiets.st, "st".into())?; + State { writer } }; let state_mt = { - let writer = SeriesWriter::establish_with_sid(series, stnow)?; - State { writer, last_ins: None } + let writer = RateLimitWriter::new(series, min_quiets.mt, "mt".into())?; + State { writer } }; let state_lt = { - let writer = SeriesWriter::establish_with_sid(series, stnow)?; - State { writer, last_ins: None } + let writer = RateLimitWriter::new(series, min_quiets.lt, "lt".into())?; + State { writer } }; let ret = Self { - sid: series, + series, scalar_type, shape, state_st, @@ -134,8 +94,8 @@ impl RtWriter { Ok(ret) } - pub fn sid(&self) -> SeriesId { - self.sid.clone() + pub fn series(&self) -> SeriesId { + self.series.clone() } pub fn scalar_type(&self) -> ScalarType { @@ -152,90 +112,27 @@ impl RtWriter { pub fn write( &mut self, - ts_ioc: TsNano, - ts_local: TsNano, - val: DataValue, + item: ET, + ts_net: Instant, iqdqs: &mut InsertDeques, ) -> Result<((bool, bool, bool),), Error> { - let sid = self.sid; - let (did_write_st,) = Self::write_inner( - "ST", - self.min_quiets.st, - &mut self.state_st, - &mut iqdqs.st_rf3_rx, - ts_ioc, - ts_local, - val.clone(), - sid, - )?; - let (did_write_mt,) = Self::write_inner( - "MT", - self.min_quiets.mt, - &mut self.state_mt, - &mut iqdqs.mt_rf3_rx, - ts_ioc, - ts_local, - val.clone(), - sid, - )?; - let (did_write_lt,) = Self::write_inner( - "LT", - 
self.min_quiets.lt,
-            &mut self.state_lt,
-            &mut iqdqs.lt_rf3_rx,
-            ts_ioc,
-            ts_local,
-            val.clone(),
-            sid,
-        )?;
+        trace!("write {:?}", item.ts());
+        // TODO
+        // Optimize for the common case that we only write into one of the stores.
+        // Make the decision first, based on ref, then clone only as required.
+        let (did_write_st,) = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_rx)?;
+        let (did_write_mt,) = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_rx)?;
+        let (did_write_lt,) = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_rx)?;
         Ok(((did_write_st, did_write_mt, did_write_lt),))
     }
 
     fn write_inner(
-        rt: &str,
-        min_quiet: Duration,
-        state: &mut State,
+        state: &mut State<ET>,
+        item: ET,
+        ts_net: Instant,
         deque: &mut VecDeque<QueryItem>,
-        ts_ioc: TsNano,
-        ts_local: TsNano,
-        val: DataValue,
-        sid: SeriesId,
     ) -> Result<(bool,), Error> {
-        // Decide whether we want to write.
-        // Use the IOC time for the decision whether to write.
-        // But use the ingest local time as the primary index.
-        let do_write = if let Some(last) = &state.last_ins {
-            if ts_ioc == last.ts_ioc {
-                trace_rt_decision!("{rt} {sid} ignore, because same IOC time {ts_ioc:?} {ts_local:?}");
-                false
-            } else if ts_local < last.ts_local {
-                trace_rt_decision!("{rt} {sid} ignore, because ts_local rewind {ts_ioc:?} {ts_local:?}");
-                false
-            } else if ts_local.ms() - last.ts_local.ms() < 1000 * min_quiet.as_secs() {
-                trace_rt_decision!("{rt} {sid} ignore, because not min quiet");
-                false
-            } else if ts_local.delta(last.ts_local) < DtNano::from_ms(5) {
-                trace_rt_decision!("{rt} {sid} ignore, because store rate cap");
-                false
-            } else if val == last.val {
-                trace_rt_decision!("{rt} {sid} ignore, because value did not change");
-                false
-            } else {
-                trace_rt_decision!("{rt} {sid} accept");
-                true
-            }
-        } else {
-            true
-        };
-        if do_write {
-            state.last_ins = Some(LastIns {
-                ts_local,
-                ts_ioc,
-                val: val.clone(),
-            });
-            state.writer.write(ts_ioc, ts_local, val.clone(), deque)?;
-        }
-        Ok((do_write,))
+        Ok(state.writer.write(item, ts_net, deque)?)
} pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { @@ -252,9 +149,3 @@ struct LastIns { ts_ioc: TsNano, val: DataValue, } - -#[derive(Debug)] -struct State { - writer: SeriesWriter, - last_ins: Option, -} diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 0ccde1a..46a6228 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -2,11 +2,13 @@ use async_channel::Sender; use dbpg::seriesbychannel::ChannelInfoQuery; use err::thiserror; use err::ThisError; +use log::*; use netpod::timeunits::HOUR; use netpod::timeunits::SEC; use netpod::ScalarType; use netpod::SeriesKind; use netpod::Shape; +use netpod::TsMs; use netpod::TsNano; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::DataValue; @@ -15,8 +17,17 @@ use scywr::iteminsertqueue::QueryItem; use series::ChannelStatusSeriesId; use series::SeriesId; use std::collections::VecDeque; +use std::marker::PhantomData; +use std::time::Instant; use std::time::SystemTime; +pub trait EmittableType: Clone { + fn ts(&self) -> TsNano; + fn has_change(&self, k: &Self) -> bool; + fn byte_size(&self) -> u32; + fn into_data_value(self) -> DataValue; +} + #[derive(Debug, ThisError)] #[cstm(name = "SerieswriterWriter")] pub enum Error { @@ -44,7 +55,7 @@ impl From for Error { } #[derive(Debug)] -pub struct SeriesWriter { +pub struct SeriesWriter { sid: SeriesId, ts_msp_last: Option, inserted_in_current_msp: u32, @@ -53,56 +64,14 @@ pub struct SeriesWriter { msp_max_bytes: u32, // TODO this should be in an Option: ts_msp_grid_last: u32, + _t1: PhantomData, } -impl SeriesWriter { - pub async fn establish( - worker_tx: Sender, - backend: String, - channel: String, - scalar_type: ScalarType, - shape: Shape, - stnow: SystemTime, - ) -> Result { - let (tx, rx) = async_channel::bounded(1); - let item = ChannelInfoQuery { - backend: backend.clone(), - channel: channel.clone(), - kind: SeriesKind::ChannelStatus, - scalar_type: ScalarType::ChannelStatus, - shape: Shape::Scalar, - tx: Box::pin(tx), - }; - worker_tx.send(item).await?; - let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; - let _cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); - Self::establish_with(worker_tx, backend, channel, scalar_type, shape, stnow).await - } - - pub async fn establish_with( - channel_info_tx: Sender, - backend: String, - channel: String, - scalar_type: ScalarType, - shape: Shape, - stnow: SystemTime, - ) -> Result { - let (tx, rx) = async_channel::bounded(1); - let item = ChannelInfoQuery { - backend, - channel, - kind: SeriesKind::ChannelData, - scalar_type: scalar_type.clone(), - shape: shape.clone(), - tx: Box::pin(tx), - }; - channel_info_tx.send(item).await?; - let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; - let sid = res.series.to_series(); - Self::establish_with_sid(sid, stnow) - } - - pub fn establish_with_sid(sid: SeriesId, stnow: SystemTime) -> Result { +impl SeriesWriter +where + ET: EmittableType, +{ + pub fn new(sid: SeriesId) -> Result { let res = Self { sid, ts_msp_last: None, @@ -111,6 +80,7 @@ impl SeriesWriter { msp_max_entries: 64000, msp_max_bytes: 1024 * 1024 * 20, ts_msp_grid_last: 0, + _t1: PhantomData, }; Ok(res) } @@ -119,14 +89,8 @@ impl SeriesWriter { self.sid.clone() } - pub fn write( - &mut self, - ts_ioc: TsNano, - ts_local: TsNano, - val: DataValue, - deque: &mut VecDeque, - ) -> Result<(), Error> { - let ts_main = ts_local; + pub fn write(&mut self, item: ET, ts_net: Instant, deque: &mut VecDeque) -> Result<(), Error> { 
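+        // The event timestamp is split into a coarse bucket (ts_msp) and an
+        // offset within it (ts_lsp). A new bucket starts when the current one
+        // has accumulated enough entries or bytes (msp_max_entries,
+        // msp_max_bytes) or when ts_main moves past the msp_res_max grid;
+        // the msp_bump flag on the emitted InsertItem signals the rollover.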
+ let ts_main = item.ts(); // TODO decide on better msp/lsp: random offset! // As long as one writer is active, the msp is arbitrary. @@ -146,12 +110,12 @@ impl SeriesWriter { } else { self.ts_msp_last = Some(ts_msp); self.inserted_in_current_msp = 1; - self.bytes_in_current_msp = val.byte_size(); + self.bytes_in_current_msp = item.byte_size(); (ts_msp, true) } } else { self.inserted_in_current_msp += 1; - self.bytes_in_current_msp += val.byte_size(); + self.bytes_in_current_msp += item.byte_size(); (ts_msp_last, false) } } @@ -159,7 +123,7 @@ impl SeriesWriter { let ts_msp = ts_main.div(msp_res_max).mul(msp_res_max); self.ts_msp_last = Some(ts_msp); self.inserted_in_current_msp = 1; - self.bytes_in_current_msp = val.byte_size(); + self.bytes_in_current_msp = item.byte_size(); (ts_msp, true) } }; @@ -168,12 +132,13 @@ impl SeriesWriter { series: self.sid.clone(), ts_msp: ts_msp.to_ts_ms(), ts_lsp, - ts_net: ts_local.to_ts_ms(), - ts_alt_1: ts_ioc, + ts_net, + ts_alt_1: ts_main, msp_bump: ts_msp_changed, - val, + val: item.into_data_value(), }; // TODO decide on the path in the new deques struct + trace!("emit value for ts {:?}", ts_main); deque.push_back(QueryItem::Insert(item)); Ok(()) }
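
With this refactor, a new ingest path needs two pieces to get rate-limited series writes: an implementation of serieswriter::writer::EmittableType for its value type, and one RateLimitWriter per retention tier (RtWriter above bundles the st/mt/lt triple; CaWriterValue in conn.rs is the in-tree implementation). The following is a minimal sketch and not part of the patch: ScalarF64 and the two helper functions are hypothetical, only the trait and writer signatures are taken from the code above.

use netpod::TsNano;
use scywr::iteminsertqueue::DataValue;
use scywr::iteminsertqueue::QueryItem;
use scywr::iteminsertqueue::ScalarValue;
use series::SeriesId;
use serieswriter::ratelimitwriter::RateLimitWriter;
use serieswriter::writer::EmittableType;
use std::collections::VecDeque;
use std::time::Duration;
use std::time::Instant;

// Hypothetical value type, for illustration only.
#[derive(Debug, Clone)]
struct ScalarF64 {
    ts: TsNano,
    val: f64,
}

impl EmittableType for ScalarF64 {
    fn ts(&self) -> TsNano {
        self.ts.clone()
    }

    fn has_change(&self, k: &Self) -> bool {
        // Drives the "value did not change" rate-limit decision.
        self.val != k.val
    }

    fn byte_size(&self) -> u32 {
        // Feeds the per-msp byte accounting in SeriesWriter.
        8
    }

    fn into_data_value(self) -> DataValue {
        DataValue::Scalar(ScalarValue::F64(self.val))
    }
}

// Hypothetical helper: one RateLimitWriter covers a single retention tier.
fn make_writer(series: SeriesId) -> Result<RateLimitWriter<ScalarF64>, serieswriter::ratelimitwriter::Error> {
    RateLimitWriter::new(series, Duration::from_secs(1), "st".into())
}

// Hypothetical driver: rate-limit one event into an insert deque.
fn ingest_one(
    writer: &mut RateLimitWriter<ScalarF64>,
    item: ScalarF64,
    deque: &mut VecDeque<QueryItem>,
) -> Result<bool, serieswriter::ratelimitwriter::Error> {
    // ts_net is the local receive time, used downstream for latency stats.
    let (did_write,) = writer.write(item, Instant::now(), deque)?;
    Ok(did_write)
}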