From a5caec0591aa357f841b39c1a7b7901e39bcf890 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 26 Jul 2024 11:54:42 +0200 Subject: [PATCH] WIP refactor channel status write --- .cargo/cargo-lock | 48 ++++----- daqingest/src/bin/daqingest.rs | 2 +- netfetch/src/ca/conn.rs | 147 ++++++++++++++++++---------- netfetch/src/ca/connset.rs | 54 ++++++---- netfetch/src/ca/statemap.rs | 31 +++++- netfetch/src/metrics/ingest.rs | 2 - scywr/src/insertqueues.rs | 63 ++++++------ scywr/src/insertworker.rs | 24 +---- scywr/src/iteminsertqueue.rs | 133 ++----------------------- scywr/src/store.rs | 34 ------- serieswriter/src/fixgridwriter.rs | 97 ++++++++++++++++++ serieswriter/src/lib.rs | 1 + serieswriter/src/msptool.rs | 2 + serieswriter/src/msptool/fixgrid.rs | 29 ++++++ serieswriter/src/rtwriter.rs | 12 +-- serieswriter/src/timebin.rs | 2 +- 16 files changed, 360 insertions(+), 321 deletions(-) create mode 100644 serieswriter/src/fixgridwriter.rs create mode 100644 serieswriter/src/msptool/fixgrid.rs diff --git a/.cargo/cargo-lock b/.cargo/cargo-lock index a648199..e108632 100644 --- a/.cargo/cargo-lock +++ b/.cargo/cargo-lock @@ -61,9 +61,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" dependencies = [ "anstyle", "anstyle-parse", @@ -76,33 +76,33 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anstyle-parse" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" dependencies = [ "windows-sys", ] [[package]] name = "anstyle-wincon" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" dependencies = [ "anstyle", "windows-sys", @@ -379,9 +379,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.10" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f6b81fb3c84f5563d509c59b5a48d935f689e993afa90fe39047f05adef9142" +checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" dependencies = [ "clap_builder", "clap_derive", @@ -389,9 +389,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.10" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ca6706fd5224857d9ac5eb9355f6683563cc0541c7cd9d014043b57cbec78ac" +checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" dependencies = [ "anstream", "anstyle", @@ -401,9 +401,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.8" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" +checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" dependencies = [ "heck", "proc-macro2", @@ -413,9 +413,9 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "cobs" @@ -425,9 +425,9 @@ checksum = "67ba02a97a2bd10f4b59b25c7973101c79642302776489e030cd13cdab09ed15" [[package]] name = "colorchoice" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" [[package]] name = "concurrent-queue" @@ -1124,9 +1124,9 @@ checksum = "f958d3d68f4167080a18141e10381e7634563984a537f2a49a30fd8e53ac5767" [[package]] name = "is_terminal_polyfill" -version = "1.70.0" +version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "items_0" @@ -2580,9 +2580,9 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "want" diff --git a/daqingest/src/bin/daqingest.rs b/daqingest/src/bin/daqingest.rs index a22f5e3..441fb60 100644 --- a/daqingest/src/bin/daqingest.rs +++ b/daqingest/src/bin/daqingest.rs @@ -30,7 +30,7 @@ async fn main_run(opts: DaqIngestOpts) -> Result<(), Error> { } async fn main_run_inner(opts: DaqIngestOpts) -> Result<(), Error> { - let buildmark = "+0006"; + let buildmark = "+0007"; use daqingest::opts::ChannelAccess; use daqingest::opts::SubCmd; match opts.subcmd { diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 6c1ceaf..33cbdc9 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -56,6 +56,9 @@ use serde::Serialize; use series::ChannelStatusSeriesId; use series::SeriesId; use serieswriter::binwriter::BinWriter; +use serieswriter::fixgridwriter::ChannelStatusSeriesWriter; +use serieswriter::fixgridwriter::ChannelStatusWriteState; +use serieswriter::fixgridwriter::CHANNEL_STATUS_GRID; use serieswriter::msptool::MspSplit; use serieswriter::rtwriter::RtWriter; use serieswriter::writer::EmittableType; @@ -156,6 +159,7 @@ pub enum Error { Protocol(#[from] crate::ca::proto::Error), RtWriter(#[from] serieswriter::rtwriter::Error), BinWriter(#[from] serieswriter::binwriter::Error), + SeriesWriter(#[from] serieswriter::writer::Error), // TODO remove false positive from ThisError derive #[allow(private_interfaces)] UnknownCid(Cid), @@ -538,9 +542,24 @@ struct ClosingState { struct ChannelConf { conf: ChannelConfig, state: ChannelState, + wrst: WriterStatus, } impl ChannelConf { + fn new(conf: ChannelConfig, cssid: ChannelStatusSeriesId) -> Self { + Self { + conf, + state: ChannelState::Init(cssid), + wrst: WriterStatus { + writer_status: serieswriter::writer::SeriesWriter::new(SeriesId::new(cssid.id())).unwrap(), + writer_status_state: serieswriter::fixgridwriter::ChannelStatusWriteState::new( + SeriesId::new(cssid.id()), + serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, + ), + }, + } + } + pub fn poll_conf(&self) -> Option<(u64,)> { self.conf.poll_conf() } @@ -665,6 +684,29 @@ impl ChannelState { } } +#[derive(Debug)] +struct WriterStatus { + writer_status: ChannelStatusSeriesWriter, + writer_status_state: ChannelStatusWriteState, +} + +impl WriterStatus { + fn emit_channel_status_item( + &mut self, + item: ChannelStatusItem, + deque: &mut VecDeque, + ) -> Result<(), Error> { + let (ts, val) = item.to_ts_val(); + self.writer_status.write( + serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val), + &mut self.writer_status_state, + Instant::now(), + deque, + )?; + Ok(()) + } +} + enum CaConnState { Unconnected(Instant), Connecting( @@ -1128,17 +1170,24 @@ impl CaConn { self.channel_state_on_shutdown(channel_reason); let addr = self.remote_addr_dbg.clone(); // TODO handle Err: - let _ = self - .iqdqs - .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: self.tmp_ts_poll, - addr, - // TODO map to appropriate status - status: ConnectionStatus::Closing, - })); + let item = ConnectionStatusItem { + ts: self.tmp_ts_poll, + addr, + // TODO map to appropriate status + status: ConnectionStatus::Closing, + }; + if self.emit_connection_status_item(item).is_err() { + self.stats.logic_error().inc(); + } self.proto = None; } + fn emit_connection_status_item(&mut self, _item: ConnectionStatusItem) -> Result<(), Error> { + // todo!() + // TODO emit + Ok(()) + } + fn cmd_channel_close(&mut self, name: String) { self.channel_close(name); // TODO return the result @@ -1279,8 +1328,7 @@ impl CaConn { if let Some(conf) = self.channels.get_mut(&cid) { // TODO refactor, should only execute this when required: let conf_poll_conf = conf.poll_conf(); - let chst = &mut conf.state; - if let ChannelState::MakingSeriesWriter(st2) = chst { + if let ChannelState::MakingSeriesWriter(st2) = &mut conf.state { let dt = stnow.duration_since(SystemTime::UNIX_EPOCH).unwrap(); let beg = TsNano::from_ns(SEC * dt.as_secs() + dt.subsec_nanos() as u64); let binwriter = BinWriter::new( @@ -1293,12 +1341,13 @@ impl CaConn { )?; self.stats.get_series_id_ok.inc(); { - let item = QueryItem::ChannelStatus(ChannelStatusItem { + info!("queued Opened {:?}", st2.channel.cssid); + let item = ChannelStatusItem { ts: self.tmp_ts_poll, cssid: st2.channel.cssid.clone(), status: ChannelStatus::Opened, - }); - self.iqdqs.emit_status_item(item)?; + }; + conf.wrst.emit_channel_status_item(item, &mut self.iqdqs.st_rf3_qu)?; } if let Some((ivl,)) = conf_poll_conf { let created_state = WritableState { @@ -1380,10 +1429,7 @@ impl CaConn { error!("logic error channel already exists {conf:?}"); Ok(()) } else { - let conf = ChannelConf { - conf, - state: ChannelState::Init(cssid), - }; + let conf = ChannelConf::new(conf, cssid); self.channels.insert(cid, conf); // TODO do not count, use separate queue for those channels. self.init_state_count += 1; @@ -1472,12 +1518,15 @@ impl CaConn { let cssid = st2.channel.cssid.clone(); // TODO should call the proper channel-close handler which in turn emits the status item. // Make sure I record the reason for the "Close": user command, IOC error, etc.. - let item = QueryItem::ChannelStatus(ChannelStatusItem { + let item = ChannelStatusItem { ts: self.tmp_ts_poll, cssid: cssid.clone(), status: ChannelStatus::Closed(channel_reason.clone()), - }); - self.iqdqs.emit_status_item(item); + }; + let deque = &mut self.iqdqs.st_rf3_qu; + if conf.wrst.emit_channel_status_item(item, deque).is_err() { + self.stats.logic_error().inc(); + } *chst = ChannelState::Ended(cssid); } ChannelState::Error(..) => { @@ -2617,12 +2666,11 @@ impl CaConn { Ok(Ok(tcp)) => { self.stats.tcp_connected.inc(); let addr = addr.clone(); - self.iqdqs - .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: self.tmp_ts_poll, - addr, - status: ConnectionStatus::Established, - }))?; + self.emit_connection_status_item(ConnectionStatusItem { + ts: self.tmp_ts_poll, + addr, + status: ConnectionStatus::Established, + })?; self.backoff_reset(); let proto = CaProto::new( tcp, @@ -2638,12 +2686,11 @@ impl CaConn { use std::io::ErrorKind; debug!("error connect to {addr} {e}"); let addr = addr.clone(); - self.iqdqs - .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: self.tmp_ts_poll, - addr, - status: ConnectionStatus::ConnectError, - }))?; + self.emit_connection_status_item(ConnectionStatusItem { + ts: self.tmp_ts_poll, + addr, + status: ConnectionStatus::ConnectError, + })?; let reason = match e.kind() { ErrorKind::ConnectionRefused => ShutdownReason::ConnectRefused, _ => ShutdownReason::IoError, @@ -2655,12 +2702,11 @@ impl CaConn { // TODO log with exponential backoff debug!("timeout connect to {addr} {e}"); let addr = addr.clone(); - self.iqdqs - .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { - ts: self.tmp_ts_poll, - addr, - status: ConnectionStatus::ConnectTimeout, - }))?; + self.emit_connection_status_item(ConnectionStatusItem { + ts: self.tmp_ts_poll, + addr, + status: ConnectionStatus::ConnectTimeout, + })?; self.trigger_shutdown(ShutdownReason::ConnectTimeout); Ok(Ready(Some(()))) } @@ -2876,8 +2922,8 @@ impl CaConn { } fn emit_channel_event_pong(&mut self) { - for (cid, ch) in self.channels.iter() { - match &ch.state { + for (_, ch) in self.channels.iter_mut() { + match &mut ch.state { ChannelState::Init(_) => {} ChannelState::Creating(_) => {} ChannelState::FetchEnumDetails(_) => {} @@ -2889,8 +2935,10 @@ impl CaConn { cssid: st1.channel.cssid, status: ChannelStatus::Pong, }; - let item = QueryItem::ChannelStatus(item); - self.iqdqs.st_rf3_rx.push_back(item); + let deque = &mut self.iqdqs.st_rf3_qu; + if ch.wrst.emit_channel_status_item(item, deque).is_err() { + self.stats.logic_error().inc(); + } } ChannelState::Closing(_) => {} ChannelState::Error(_) => {} @@ -2909,10 +2957,6 @@ impl CaConn { Ok(()) } - fn check_ticker_connecting_timeout(&mut self, since: Instant) -> Result<(), Error> { - Ok(()) - } - fn queues_out_flushed(&self) -> bool { debug!( "async out flushed iiq {} {} caout {}", @@ -3024,7 +3068,6 @@ macro_rules! flush_queue_dqs { // let sp = std::pin::pin!(obj.iqsp.$sp); // let sp = &mut obj.iqsp.$sp; // let sp = std::pin::pin!(sp); - // let sp = todo!(); let sp = obj.iqsp.as_mut().$sp(); match Self::attempt_flush_queue(qu, sp, $batcher, $loop_max, $cx, $id, $stats) { Ok(Ready(Some(()))) => { @@ -3108,7 +3151,7 @@ impl Stream for CaConn { }; flush_queue_dqs!( self, - st_rf1_rx, + st_rf1_qu, st_rf1_sp_pin, send_batched::<256, _>, 32, @@ -3124,7 +3167,7 @@ impl Stream for CaConn { }; flush_queue_dqs!( self, - st_rf3_rx, + st_rf3_qu, st_rf3_sp_pin, send_batched::<256, _>, 32, @@ -3140,7 +3183,7 @@ impl Stream for CaConn { }; flush_queue_dqs!( self, - mt_rf3_rx, + mt_rf3_qu, mt_rf3_sp_pin, send_batched::<256, _>, 32, @@ -3156,7 +3199,7 @@ impl Stream for CaConn { }; flush_queue_dqs!( self, - lt_rf3_rx, + lt_rf3_qu, lt_rf3_sp_pin, send_batched::<256, _>, 32, @@ -3489,7 +3532,7 @@ impl EmittableType for CaWriterValue { ts_net, ))); } - let data_value = DataValue::Scalar(ScalarValue::CaStatus(meta.status as i16)); + let data_value = DataValue::Scalar(ScalarValue::I16(meta.status as i16)); let item = scywriiq::InsertItem { series: state.series_status.clone(), ts_msp: ts_msp.to_ts_ms(), diff --git a/netfetch/src/ca/connset.rs b/netfetch/src/ca/connset.rs index 04e2b8d..5ebae65 100644 --- a/netfetch/src/ca/connset.rs +++ b/netfetch/src/ca/connset.rs @@ -63,6 +63,7 @@ use std::pin::Pin; use netpod::OnDrop; use scywr::insertqueues::InsertQueuesTx; +use series::SeriesId; use std::sync::Arc; use std::task::Context; use std::task::Poll; @@ -173,7 +174,7 @@ pub struct ChannelStatusRequest { pub tx: Sender, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Serialize)] pub struct ChannelStatusResponse { pub channels_ca_conn: BTreeMap, pub channels_ca_conn_set: BTreeMap, @@ -191,7 +192,7 @@ pub struct ChannelStatusesRequest { pub tx: Sender, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Serialize)] pub struct ChannelStatusesResponse { pub channels_ca_conn_set: BTreeMap, } @@ -238,10 +239,6 @@ pub struct CaConnSetCtrl { } impl CaConnSetCtrl { - pub fn new() -> Self { - todo!() - } - pub fn sender(&self) -> Sender { self.tx.clone() } @@ -561,7 +558,7 @@ impl CaConnSet { backend: cmd.backend, channel: channel_name, kind: SeriesKind::ChannelStatus, - scalar_type: ScalarType::ChannelStatus, + scalar_type: ScalarType::U64, shape: Shape::Scalar, tx: Box::pin(SeriesLookupSender { tx }), }; @@ -636,12 +633,20 @@ impl CaConnSet { self.cssid_latency_max = dt + Duration::from_millis(2000); debug!("slow cssid fetch dt {:.0} ms {:?}", 1e3 * dt.as_secs_f32(), cmd); } + let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( + SeriesId::new(cmd.cssid.id()), + serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, + ); *chst2 = ActiveChannelState::WithStatusSeriesId(WithStatusSeriesIdState { cssid: cmd.cssid, addr_find_backoff: 0, inner: WithStatusSeriesIdStateInner::AddrSearchPending { since: SystemTime::now(), }, + writer_status: Some(writer_status), + writer_status_state: Some(writer_status_state), }); let qu = IocAddrQuery::cached(name.into()); self.find_ioc_query_queue.push_back(qu); @@ -682,6 +687,12 @@ impl CaConnSet { trace!("handle_add_channel_with_addr INNER {cmd:?}"); self.stats.handle_add_channel_with_addr().inc(); let tsnow = SystemTime::now(); + let writer_status = serieswriter::writer::SeriesWriter::new(SeriesId::new(cmd.cssid.id())) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; + let writer_status_state = serieswriter::fixgridwriter::ChannelStatusWriteState::new( + SeriesId::new(cmd.cssid.id()), + serieswriter::fixgridwriter::CHANNEL_STATUS_GRID, + ); *st3 = WithStatusSeriesIdState { cssid: cmd.cssid.clone(), addr_find_backoff: 0, @@ -693,6 +704,8 @@ impl CaConnSet { value: ConnectionStateValue::Unknown, }), }, + writer_status: Some(writer_status), + writer_status_state: Some(writer_status_state), }; let addr = cmd.addr; if self.ca_conn_ress.contains_key(&addr) { @@ -1131,14 +1144,6 @@ impl CaConnSet { } } - fn push_channel_status(&mut self, item: ChannelStatusItem) -> Result<(), Error> { - let item = QueryItem::ChannelStatus(item); - let mut v = VecDeque::new(); - v.push_back(item); - self.storage_insert_queue.push_back(v); - Ok(()) - } - #[allow(unused)] async fn __enqueue_command_to_all(&self, cmdgen: F) -> Result, Error> where @@ -1272,7 +1277,6 @@ impl CaConnSet { let (mut search_pending_count, mut assigned_without_health_update) = self.update_channel_state_counts(); let mut cmd_remove_channel = Vec::new(); let mut cmd_add_channel = Vec::new(); - let mut channel_status_items = Vec::new(); let k = self.chan_check_next.take(); let it = if let Some(last) = k { trace!("check_chans start at {:?}", last); @@ -1280,6 +1284,7 @@ impl CaConnSet { } else { self.channel_states.range_mut(..) }; + let mut item_deque = VecDeque::new(); for (i, (ch, st)) in it.enumerate() { match &mut st.value { ChannelStateValue::Active(st2) => match st2 { @@ -1368,7 +1373,18 @@ impl CaConnSet { MaybeWrongAddressState::new(stnow, st3.addr_find_backoff), ); let item = ChannelStatusItem::new_closed_conn_timeout(stnow, st3.cssid.clone()); - channel_status_items.push(item); + let (ts, val) = item.to_ts_val(); + let deque = &mut item_deque; + st3.writer_status + .as_mut() + .unwrap() + .write( + serieswriter::fixgridwriter::ChannelStatusWriteValue::new(ts, val), + st3.writer_status_state.as_mut().unwrap(), + tsnow, + deque, + ) + .map_err(|e| Error::with_msg_no_trace(e.to_string()))?; } } } @@ -1404,9 +1420,7 @@ impl CaConnSet { break; } } - for item in channel_status_items { - self.push_channel_status(item)?; - } + self.storage_insert_queue.push_back(item_deque); for (addr, ch) in cmd_remove_channel { if let Some(g) = self.ca_conn_ress.get_mut(&addr) { let cmd = ConnCommand::channel_close(ch.name().into()); diff --git a/netfetch/src/ca/statemap.rs b/netfetch/src/ca/statemap.rs index b60f880..877086d 100644 --- a/netfetch/src/ca/statemap.rs +++ b/netfetch/src/ca/statemap.rs @@ -4,6 +4,8 @@ use crate::daemon_common::Channel; use dashmap::DashMap; use serde::Serialize; use series::ChannelStatusSeriesId; +use serieswriter::fixgridwriter::ChannelStatusSeriesWriter; +use serieswriter::fixgridwriter::ChannelStatusWriteState; use std::collections::btree_map::RangeMut; use std::collections::BTreeMap; use std::collections::HashMap; @@ -105,11 +107,36 @@ impl MaybeWrongAddressState { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Serialize)] pub struct WithStatusSeriesIdState { pub cssid: ChannelStatusSeriesId, pub addr_find_backoff: u32, pub inner: WithStatusSeriesIdStateInner, + #[serde(serialize_with = "serde_ser_channel_status_writer")] + pub writer_status: Option, + #[serde(skip)] + pub writer_status_state: Option, +} + +// Need Clone because we use the state tree for metrics output +// TODO use a new info struct +impl Clone for WithStatusSeriesIdState { + fn clone(&self) -> Self { + Self { + cssid: self.cssid.clone(), + addr_find_backoff: self.addr_find_backoff.clone(), + inner: self.inner.clone(), + writer_status: None, + writer_status_state: None, + } + } +} + +fn serde_ser_channel_status_writer(_: &Option, ser: S) -> Result +where + S: serde::Serializer, +{ + ser.serialize_none() } #[derive(Debug, Clone, Serialize)] @@ -137,7 +164,7 @@ pub struct ChannelState { pub config: ChannelConfig, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Serialize)] pub struct ChannelStateMap { map: BTreeMap, #[serde(skip)] diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index e644310..4c4c988 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -245,7 +245,6 @@ async fn post_v01_try( })?; } ScalarType::Enum => return Err(Error::NotSupported), - ScalarType::ChannelStatus => return Err(Error::NotSupported), }, Shape::Wave(_) => match &scalar_type { ScalarType::U8 => { @@ -281,7 +280,6 @@ async fn post_v01_try( ScalarType::BOOL => return Err(Error::NotSupported), ScalarType::STRING => return Err(Error::NotSupported), ScalarType::Enum => return Err(Error::NotSupported), - ScalarType::ChannelStatus => return Err(Error::NotSupported), }, Shape::Image(_, _) => return Err(Error::NotSupported), } diff --git a/scywr/src/insertqueues.rs b/scywr/src/insertqueues.rs index 1c42448..b21265c 100644 --- a/scywr/src/insertqueues.rs +++ b/scywr/src/insertqueues.rs @@ -34,28 +34,28 @@ impl InsertQueuesTx { pub async fn send_all(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { // Send each buffer down the corresponding channel if false { - let item = core::mem::replace(&mut iqdqs.st_rf1_rx, VecDeque::new()); + let item = core::mem::replace(&mut iqdqs.st_rf1_qu, VecDeque::new()); self.st_rf1_tx .send(item) .await .map_err(|_| Error::ChannelSend(RetentionTime::Short, 1))?; } { - let item = core::mem::replace(&mut iqdqs.st_rf3_rx, VecDeque::new()); + let item = core::mem::replace(&mut iqdqs.st_rf3_qu, VecDeque::new()); self.st_rf3_tx .send(item) .await .map_err(|_| Error::ChannelSend(RetentionTime::Short, 3))?; } { - let item = core::mem::replace(&mut iqdqs.mt_rf3_rx, VecDeque::new()); + let item = core::mem::replace(&mut iqdqs.mt_rf3_qu, VecDeque::new()); self.mt_rf3_tx .send(item) .await .map_err(|_| Error::ChannelSend(RetentionTime::Medium, 3))?; } { - let item = core::mem::replace(&mut iqdqs.lt_rf3_rx, VecDeque::new()); + let item = core::mem::replace(&mut iqdqs.lt_rf3_qu, VecDeque::new()); self.lt_rf3_tx .send(item) .await @@ -108,45 +108,38 @@ pub struct InsertQueuesRx { } pub struct InsertDeques { - pub st_rf1_rx: VecDeque, - pub st_rf3_rx: VecDeque, - pub mt_rf3_rx: VecDeque, - pub lt_rf3_rx: VecDeque, + pub st_rf1_qu: VecDeque, + pub st_rf3_qu: VecDeque, + pub mt_rf3_qu: VecDeque, + pub lt_rf3_qu: VecDeque, } impl InsertDeques { pub fn new() -> Self { Self { - st_rf1_rx: VecDeque::new(), - st_rf3_rx: VecDeque::new(), - mt_rf3_rx: VecDeque::new(), - lt_rf3_rx: VecDeque::new(), + st_rf1_qu: VecDeque::new(), + st_rf3_qu: VecDeque::new(), + mt_rf3_qu: VecDeque::new(), + lt_rf3_qu: VecDeque::new(), } } /// Total number of items cumulated over all queues. pub fn len(&self) -> usize { - self.st_rf1_rx.len() + self.st_rf3_rx.len() + self.mt_rf3_rx.len() + self.lt_rf3_rx.len() + self.st_rf1_qu.len() + self.st_rf3_qu.len() + self.mt_rf3_qu.len() + self.lt_rf3_qu.len() } pub fn clear(&mut self) { - self.st_rf1_rx.clear(); - self.st_rf3_rx.clear(); - self.mt_rf3_rx.clear(); - self.lt_rf3_rx.clear(); + self.st_rf1_qu.clear(); + self.st_rf3_qu.clear(); + self.mt_rf3_qu.clear(); + self.lt_rf3_qu.clear(); } pub fn summary(&self) -> InsertDequesSummary { InsertDequesSummary { obj: self } } - // Should be used only for connection and channel status items. - // It encapsulates the decision to which queue(s) we want to send these kind of items. - pub fn emit_status_item(&mut self, item: QueryItem) -> Result<(), Error> { - self.deque(RetentionTime::Long).push_back(item); - Ok(()) - } - // Should be used only for connection and channel status items. // It encapsulates the decision to which queue(s) we want to send these kind of items. pub fn emit_accounting_item(&mut self, rt: RetentionTime, item: Accounting) -> Result<(), Error> { @@ -164,18 +157,18 @@ impl InsertDeques { pub fn deque(&mut self, rt: RetentionTime) -> &mut VecDeque { match rt { - RetentionTime::Short => &mut self.st_rf3_rx, - RetentionTime::Medium => &mut self.mt_rf3_rx, - RetentionTime::Long => &mut self.lt_rf3_rx, + RetentionTime::Short => &mut self.st_rf3_qu, + RetentionTime::Medium => &mut self.mt_rf3_qu, + RetentionTime::Long => &mut self.lt_rf3_qu, } } pub fn housekeeping(&mut self) { let qus = [ - &mut self.st_rf1_rx, - &mut self.st_rf3_rx, - &mut self.mt_rf3_rx, - &mut self.lt_rf3_rx, + &mut self.st_rf1_qu, + &mut self.st_rf3_qu, + &mut self.mt_rf3_qu, + &mut self.lt_rf3_qu, ]; for qu in qus { if qu.len() * 2 < qu.capacity() { @@ -195,10 +188,10 @@ impl<'a> fmt::Display for InsertDequesSummary<'a> { write!( fmt, "InsertDeques {{ st_rf1_len: {}, st_rf3_len: {}, mt_rf3_len: {}, lt_rf3_len: {} }}", - obj.st_rf1_rx.len(), - obj.st_rf3_rx.len(), - obj.mt_rf3_rx.len(), - obj.lt_rf3_rx.len() + obj.st_rf1_qu.len(), + obj.st_rf3_qu.len(), + obj.mt_rf3_qu.len(), + obj.lt_rf3_qu.len() ) } } diff --git a/scywr/src/insertworker.rs b/scywr/src/insertworker.rs index 97ee78a..cd7f7da 100644 --- a/scywr/src/insertworker.rs +++ b/scywr/src/insertworker.rs @@ -1,6 +1,4 @@ use crate::config::ScyllaIngestConfig; -use crate::iteminsertqueue::insert_channel_status_fut; -use crate::iteminsertqueue::insert_connection_status_fut; use crate::iteminsertqueue::insert_item_fut; use crate::iteminsertqueue::insert_msp_fut; use crate::iteminsertqueue::Accounting; @@ -275,15 +273,6 @@ where let futs = match item { QueryItem::Insert(item) => prepare_query_insert_futs(item, &data_store, &stats, tsnow), QueryItem::Msp(item) => prepare_msp_insert_futs(item, &data_store, &stats, tsnow), - QueryItem::ConnectionStatus(item) => { - stats.inserted_connection_status().inc(); - let fut = insert_connection_status_fut(item, &data_store, stats.clone()); - smallvec![fut] - } - QueryItem::ChannelStatus(item) => { - stats.inserted_channel_status().inc(); - insert_channel_status_fut(item, &data_store, stats.clone()) - } QueryItem::TimeBinSimpleF32(item) => prepare_timebin_insert_futs(item, &data_store, &stats, tsnow), QueryItem::Accounting(item) => prepare_accounting_insert_futs(item, &data_store, &stats, tsnow), QueryItem::AccountingRecv(item) => { @@ -307,18 +296,12 @@ fn inspect_items( item_inp.inspect(move |batch| { for item in batch { match &item { - QueryItem::ConnectionStatus(_) => { - trace_item_execute!("execute {worker_name} ConnectionStatus {item:?}"); - } - QueryItem::ChannelStatus(_) => { - trace_item_execute!("execute {worker_name} ChannelStatus {item:?}"); + QueryItem::Insert(item) => { + trace_item_execute!("execute {worker_name} Insert {}", item.string_short()); } QueryItem::Msp(item) => { trace_item_execute!("execute {worker_name} Msp {}", item.string_short()); } - QueryItem::Insert(item) => { - trace_item_execute!("execute {worker_name} Insert {}", item.string_short()); - } QueryItem::TimeBinSimpleF32(_) => { trace_item_execute!("execute {worker_name} TimeBinSimpleF32"); } @@ -370,9 +353,8 @@ fn prepare_query_insert_futs( let dt_ms = 1000 * dt.as_secs() as u32 + dt.subsec_millis(); stats.item_lat_net_worker().ingest(dt_ms); let do_insert = true; - let mut futs = smallvec![]; let fut = insert_item_fut(item, &data_store, do_insert, stats); - futs.push(fut); + let futs = smallvec![fut]; futs } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index f37e361..99e3be4 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -7,8 +7,6 @@ use err::thiserror; use err::ThisError; use futures_util::Future; use futures_util::FutureExt; -#[allow(unused)] -use netpod::log::*; use netpod::DtNano; use netpod::Shape; use netpod::TsMs; @@ -23,8 +21,6 @@ use scylla::transport::errors::QueryError; use scylla::QueryResult; use series::ChannelStatusSeriesId; use series::SeriesId; -use smallvec::smallvec; -use smallvec::SmallVec; use stats::InsertWorkerStats; use std::net::SocketAddrV4; use std::pin::Pin; @@ -62,7 +58,6 @@ pub enum ScalarValue { Enum(i16, String), String(String), Bool(bool), - CaStatus(i16), } impl ScalarValue { @@ -81,7 +76,6 @@ impl ScalarValue { ScalarValue::Enum(_, y) => 2 + y.len() as u32, ScalarValue::String(x) => x.len() as u32, ScalarValue::Bool(_) => 1, - ScalarValue::CaStatus(_) => 2, } } @@ -100,7 +94,6 @@ impl ScalarValue { ScalarValue::Enum(x, y) => format!("({}, {})", x, y), ScalarValue::String(x) => x.to_string(), ScalarValue::Bool(x) => x.to_string(), - ScalarValue::CaStatus(x) => x.to_string(), } } } @@ -488,6 +481,10 @@ impl ChannelStatus { }; Ok(ret) } + + pub fn to_u64(&self) -> u64 { + self.to_kind() as u64 + } } #[derive(Debug, Clone)] @@ -517,6 +514,11 @@ impl ChannelStatusItem { status: ChannelStatus::Closed(ChannelStatusClosedReason::IocTimeout), } } + + pub fn to_ts_val(&self) -> (netpod::TsNano, u64) { + let dt = TsNano::from_system_time(self.ts); + (dt, self.status.to_u64()) + } } #[derive(Debug, Clone)] @@ -584,10 +586,8 @@ pub struct TimeBinSimpleF32 { // Needs to be Clone to send it to multiple retention times if required. #[derive(Debug, Clone)] pub enum QueryItem { - ConnectionStatus(ConnectionStatusItem), - ChannelStatus(ChannelStatusItem), - Msp(MspItem), Insert(InsertItem), + Msp(MspItem), TimeBinSimpleF32(TimeBinSimpleF32), Accounting(Accounting), AccountingRecv(AccountingRecv), @@ -757,7 +757,6 @@ pub fn insert_item_fut( } String(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_string.clone(), scy), Bool(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_bool.clone(), scy), - CaStatus(val) => insert_scalar_gen_fut(par, val, data_store.qu_insert_scalar_castatus.clone(), scy), } } Array(val) => { @@ -789,118 +788,6 @@ pub fn insert_item_fut( } } -#[cfg(DISABLED)] -pub fn insert_connection_status_fut( - item: ConnectionStatusItem, - data_store: &DataStore, - stats: Arc, -) -> InsertFut { - warn!("separate connection status table no longer used"); - InsertFut::dummy(data_store.scy.clone(), data_store.qu_dummy.clone()) -} - -pub fn insert_connection_status_fut( - item: ConnectionStatusItem, - data_store: &DataStore, - stats: Arc, -) -> InsertFut { - let tsnow = TsNano::from_system_time(item.ts); - let (msp, lsp) = tsnow.to_ts_ms().to_grid_02(CONNECTION_STATUS_DIV); - // TODO is that the good tsnet to use? - let kind = item.status.to_kind(); - let addr = format!("{}", item.addr); - let params = (msp.to_i64(), lsp.to_i64(), kind as i32, addr); - InsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_connection_status.clone(), - params, - Instant::now(), - stats, - ) -} - -#[cfg(DISABLED)] -pub fn insert_channel_status_fut( - item: ChannelStatusItem, - data_store: &DataStore, - stats: Arc, -) -> SmallVec<[InsertFut; 4]> { - warn!("separate channel status table no longer used"); - SmallVec::new() -} - -pub fn insert_channel_status_fut( - item: ChannelStatusItem, - data_store: &DataStore, - stats: Arc, -) -> SmallVec<[InsertFut; 4]> { - let tsnow = TsNano::from_system_time(item.ts); - let (msp, lsp) = tsnow.to_ts_ms().to_grid_02(CONNECTION_STATUS_DIV); - let tsnet = Instant::now(); - let kind = item.status.to_kind(); - let cssid = item.cssid.id(); - let params = (cssid as i64, msp.to_i64(), lsp.to_i64(), kind as i32); - let fut1 = InsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_channel_status.clone(), - params, - tsnet, - stats.clone(), - ); - let params = (msp.to_i64(), lsp.to_i64(), cssid as i64, kind as i32); - let fut2 = InsertFut::new( - data_store.scy.clone(), - data_store.qu_insert_channel_status_by_ts_msp.clone(), - params, - tsnet, - stats, - ); - smallvec![fut1, fut2] -} - -#[cfg(DISABLED)] -pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &DataStore) -> Result<(), Error> { - warn!("separate connection status table no longer used"); - Ok(()) -} - -pub async fn insert_connection_status(item: ConnectionStatusItem, data_store: &DataStore) -> Result<(), Error> { - let ts = TsMs::from_system_time(item.ts); - let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); - let kind = item.status.to_kind(); - let addr = format!("{}", item.addr); - let params = (msp.to_i64(), lsp.to_i64(), kind as i32, addr); - data_store - .scy - .execute(&data_store.qu_insert_connection_status, params) - .await?; - Ok(()) -} - -#[cfg(DISABLED)] -pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> { - warn!("separate channel status table no longer used"); - Ok(()) -} - -pub async fn insert_channel_status(item: ChannelStatusItem, data_store: &DataStore) -> Result<(), Error> { - let ts = TsMs::from_system_time(item.ts); - let (msp, lsp) = ts.to_grid_02(CONNECTION_STATUS_DIV); - let kind = item.status.to_kind(); - let cssid = item.cssid.id(); - let params = (cssid as i64, msp.to_i64(), lsp.to_i64(), kind as i32); - data_store - .scy - .execute(&data_store.qu_insert_channel_status, params) - .await?; - let params = (msp.to_i64(), lsp.to_i64(), cssid as i64, kind as i32); - data_store - .scy - .execute(&data_store.qu_insert_channel_status_by_ts_msp, params) - .await?; - Ok(()) -} - pub enum InsertFutKind { Value, } diff --git a/scywr/src/store.rs b/scywr/src/store.rs index d70a40f..c4f761f 100644 --- a/scywr/src/store.rs +++ b/scywr/src/store.rs @@ -34,7 +34,6 @@ pub struct DataStore { pub qu_insert_scalar_bool: Arc, pub qu_insert_scalar_string: Arc, pub qu_insert_scalar_enum: Arc, - pub qu_insert_scalar_castatus: Arc, pub qu_insert_array_u8: Arc, pub qu_insert_array_u16: Arc, pub qu_insert_array_u32: Arc, @@ -46,9 +45,6 @@ pub struct DataStore { pub qu_insert_array_f32: Arc, pub qu_insert_array_f64: Arc, pub qu_insert_array_bool: Arc, - pub qu_insert_connection_status: Arc, - pub qu_insert_channel_status: Arc, - pub qu_insert_channel_status_by_ts_msp: Arc, pub qu_insert_binned_scalar_f32_v02: Arc, pub qu_account_00: Arc, pub qu_account_recv_00: Arc, @@ -140,7 +136,6 @@ impl DataStore { let qu_insert_scalar_bool = prep_qu_ins_a!("events_scalar_bool", rett, scy); let qu_insert_scalar_string = prep_qu_ins_a!("events_scalar_string", rett, scy); let qu_insert_scalar_enum = prep_qu_ins_enum!("events_scalar_enum", rett, scy); - let qu_insert_scalar_castatus = prep_qu_ins_a!("events_scalar_castatus", rett, scy); let qu_insert_array_u8 = prep_qu_ins_b!("events_array_u8", rett, scy); let qu_insert_array_u16 = prep_qu_ins_b!("events_array_u16", rett, scy); @@ -154,31 +149,6 @@ impl DataStore { let qu_insert_array_f64 = prep_qu_ins_b!("events_array_f64", rett, scy); let qu_insert_array_bool = prep_qu_ins_b!("events_array_bool", rett, scy); - // Connection status: - let qu_insert_connection_status = prep_qu_ins_c!( - "connection_status", - "ts_msp, ts_lsp, kind, addr", - "?, ?, ?, ?", - rett, - scy - ); - - let qu_insert_channel_status = prep_qu_ins_c!( - "channel_status", - "series, ts_msp, ts_lsp, kind", - "?, ?, ?, ?", - rett, - scy - ); - - let qu_insert_channel_status_by_ts_msp = prep_qu_ins_c!( - "channel_status_by_ts_msp", - "ts_msp, ts_lsp, series, kind", - "?, ?, ?, ?", - rett, - scy - ); - let qu_insert_binned_scalar_f32_v02 = prep_qu_ins_c!( "binned_scalar_f32", "series, bin_len_ms, ts_msp, off, count, min, max, avg", @@ -229,7 +199,6 @@ impl DataStore { qu_insert_scalar_bool, qu_insert_scalar_string, qu_insert_scalar_enum, - qu_insert_scalar_castatus, qu_insert_array_u8, qu_insert_array_u16, qu_insert_array_u32, @@ -241,9 +210,6 @@ impl DataStore { qu_insert_array_f32, qu_insert_array_f64, qu_insert_array_bool, - qu_insert_connection_status, - qu_insert_channel_status, - qu_insert_channel_status_by_ts_msp, qu_insert_binned_scalar_f32_v02, qu_account_00, qu_account_recv_00, diff --git a/serieswriter/src/fixgridwriter.rs b/serieswriter/src/fixgridwriter.rs new file mode 100644 index 0000000..b5e39b9 --- /dev/null +++ b/serieswriter/src/fixgridwriter.rs @@ -0,0 +1,97 @@ +use crate as serieswriter; +use crate::msptool::fixgrid::MspSplitFixGrid; +use crate::writer::EmittableType; +use crate::writer::SeriesWriter; +use netpod::DtMs; +use netpod::TsNano; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::MspItem; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::ScalarValue; +use series::SeriesId; +use std::time::Instant; + +pub const CHANNEL_STATUS_GRID: DtMs = DtMs::from_ms_u64(1000 * 60 * 60); + +pub type ChannelStatusSeriesWriter = SeriesWriter; + +#[derive(Debug, Clone)] +pub struct ChannelStatusWriteValue(TsNano, u64); + +impl ChannelStatusWriteValue { + pub fn new(ts: TsNano, val: u64) -> Self { + Self(ts, val) + } +} + +impl EmittableType for ChannelStatusWriteValue { + type State = ChannelStatusWriteState; + + fn ts(&self) -> TsNano { + self.0 + } + + fn has_change(&self, _k: &Self) -> bool { + // for channel status the actual event is information, e.g. periodic ca echo. + true + } + + fn byte_size(&self) -> u32 { + 8 + } + + fn into_query_item( + self, + ts_net: Instant, + state: &mut ::State, + ) -> serieswriter::writer::EmitRes { + let mut items = serieswriter::writer::SmallVec::new(); + let ts = self.ts(); + state.last_accepted_ts = ts; + state.last_accepted_val = Some(self.1); + let byte_size = self.byte_size(); + { + let (ts_msp, ts_lsp, ts_msp_chg) = state.msp_split.split(ts, self.byte_size()); + if ts_msp_chg { + items.push(QueryItem::Msp(MspItem::new( + state.series.clone(), + ts_msp.to_ts_ms(), + ts_net, + ))); + } + let item = scywr::iteminsertqueue::InsertItem { + series: state.series.clone(), + ts_msp: ts_msp.to_ts_ms(), + ts_lsp, + ts_net, + val: DataValue::Scalar(ScalarValue::U64(self.1)), + }; + items.push(QueryItem::Insert(item)); + } + let ret = serieswriter::writer::EmitRes { + items, + bytes: byte_size, + status: 0, + }; + ret + } +} + +#[derive(Debug)] +pub struct ChannelStatusWriteState { + series: SeriesId, + msp_split: MspSplitFixGrid, + last_accepted_ts: TsNano, + last_accepted_val: Option, +} + +impl ChannelStatusWriteState { + pub fn new(series: SeriesId, grid_dt: DtMs) -> Self { + Self { + series, + msp_split: MspSplitFixGrid::new(grid_dt), + last_accepted_ts: TsNano::from_ns(0), + last_accepted_val: None, + } + } +} diff --git a/serieswriter/src/lib.rs b/serieswriter/src/lib.rs index f696bfa..9c48b41 100644 --- a/serieswriter/src/lib.rs +++ b/serieswriter/src/lib.rs @@ -1,5 +1,6 @@ pub mod binwriter; pub mod changewriter; +pub mod fixgridwriter; pub mod msptool; pub mod patchcollect; pub mod ratelimitwriter; diff --git a/serieswriter/src/msptool.rs b/serieswriter/src/msptool.rs index e839694..159068b 100644 --- a/serieswriter/src/msptool.rs +++ b/serieswriter/src/msptool.rs @@ -1,3 +1,5 @@ +pub mod fixgrid; + use netpod::DtNano; use netpod::TsNano; diff --git a/serieswriter/src/msptool/fixgrid.rs b/serieswriter/src/msptool/fixgrid.rs new file mode 100644 index 0000000..cc0d6b4 --- /dev/null +++ b/serieswriter/src/msptool/fixgrid.rs @@ -0,0 +1,29 @@ +use netpod::DtMs; +use netpod::DtNano; +use netpod::TsMs; +use netpod::TsNano; + +#[derive(Debug)] +pub struct MspSplitFixGrid { + grid_dt: DtMs, + last: Option, +} + +impl MspSplitFixGrid { + pub fn new(grid_dt: DtMs) -> Self { + Self { grid_dt, last: None } + } + + pub fn split(&mut self, ts: TsNano, _item_bytes: u32) -> (TsNano, DtNano, bool) { + let (msp, _) = ts.to_ts_ms().to_grid_02(self.grid_dt); + let changed = if self.last != Some(msp) { + self.last = Some(msp); + true + } else { + false + }; + let msp = msp.ns(); + let lsp = ts.delta(msp); + (msp, lsp, changed) + } +} diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index 35488c7..7a713d8 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -135,9 +135,9 @@ where // TODO // Optimize for the common case that we only write into one of the stores. // Make the decision first, based on ref, then clone only as required. - let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_rx)?; - let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_rx)?; - let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_rx)?; + let res_st = Self::write_inner(&mut self.state_st, item.clone(), ts_net, &mut iqdqs.st_rf3_qu)?; + let res_mt = Self::write_inner(&mut self.state_mt, item.clone(), ts_net, &mut iqdqs.mt_rf3_qu)?; + let res_lt = Self::write_inner(&mut self.state_lt, item, ts_net, &mut iqdqs.lt_rf3_qu)?; let ret = WriteRes { st: WriteRtRes { accept: res_st.accept, @@ -168,9 +168,9 @@ where } pub fn tick(&mut self, iqdqs: &mut InsertDeques) -> Result<(), Error> { - self.state_st.writer.tick(&mut iqdqs.st_rf3_rx)?; - self.state_mt.writer.tick(&mut iqdqs.mt_rf3_rx)?; - self.state_lt.writer.tick(&mut iqdqs.lt_rf3_rx)?; + self.state_st.writer.tick(&mut iqdqs.st_rf3_qu)?; + self.state_mt.writer.tick(&mut iqdqs.mt_rf3_qu)?; + self.state_lt.writer.tick(&mut iqdqs.lt_rf3_qu)?; Ok(()) } } diff --git a/serieswriter/src/timebin.rs b/serieswriter/src/timebin.rs index 87388b7..8397cba 100644 --- a/serieswriter/src/timebin.rs +++ b/serieswriter/src/timebin.rs @@ -509,7 +509,7 @@ fn test_02() { ctb.tick(&mut iqdqs).unwrap(); ctb.finish(&mut iqdqs).unwrap(); assert_eq!(iqdqs.len(), 1); - for e in iqdqs.st_rf3_rx { + for e in iqdqs.st_rf3_qu { eprintln!("{e:?}"); if let QueryItem::TimeBinSimpleF32(x) = e { assert!(f32_close(x.avg, 10.2));