From 750eb7d8c69d307d44f12bed47443cf5dd52eb98 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 11 Jul 2024 15:56:43 +0200 Subject: [PATCH] Refactor writer chain --- daqingest/src/daemon/inserthook.rs | 41 +++++------------------ daqingest/src/tools.rs | 28 ++-------------- netfetch/src/ca/conn.rs | 13 +++++++- scywr/src/iteminsertqueue.rs | 3 -- scywr/src/schema.rs | 48 +++++++++++++++++++++++---- serieswriter/src/establish_worker.rs | 1 - serieswriter/src/rtwriter.rs | 49 +++++++++++++++++++++------- serieswriter/src/writer.rs | 34 +++---------------- 8 files changed, 107 insertions(+), 110 deletions(-) diff --git a/daqingest/src/daemon/inserthook.rs b/daqingest/src/daemon/inserthook.rs index 51bd10f..297c44a 100644 --- a/daqingest/src/daemon/inserthook.rs +++ b/daqingest/src/daemon/inserthook.rs @@ -2,8 +2,6 @@ use crate::daemon::PRINT_ACTIVE_INTERVAL; use async_channel::Receiver; use async_channel::Sender; use log::*; -use netpod::ScalarType; -use netpod::Shape; use scywr::iteminsertqueue::QueryItem; use std::collections::BTreeMap; use std::time::Instant; @@ -26,22 +24,17 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Send //trace!("insert queue item {item:?}"); match &item { QueryItem::Insert(item) => { - let shape_kind = match &item.shape { - Shape::Scalar => 0 as u32, - Shape::Wave(_) => 1, - Shape::Image(_, _) => 2, - }; - if let ScalarType::STRING = item.scalar_type {} + // TODO match on the QueryItem itself + let shape_kind = 0 as u8; histo .entry(item.series.clone()) - .and_modify(|(c, msp, lsp, pulse, _shape_kind)| { + .and_modify(|(c, msp, lsp, _shape_kind)| { *c += 1; *msp = item.ts_msp; *lsp = item.ts_lsp; - *pulse = item.pulse; // TODO should check that shape_kind stays the same. }) - .or_insert((0 as usize, item.ts_msp, item.ts_lsp, item.pulse, shape_kind)); + .or_insert((1 as u64, item.ts_msp, item.ts_lsp, shape_kind)); } _ => {} } @@ -57,32 +50,16 @@ pub async fn active_channel_insert_hook_worker(rx: Receiver, tx: Send printed_last = tsnow; let mut all: Vec<_> = histo .iter() - .map(|(k, (c, msp, lsp, pulse, shape_kind))| { - (usize::MAX - *c, k.clone(), *msp, *lsp, *pulse, *shape_kind) - }) + .map(|(k, (c, msp, lsp, shape_kind))| (u64::MAX - *c, k.clone(), *msp, *lsp, *shape_kind)) .collect(); all.sort_unstable(); info!("Active scalar"); - for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 0).take(6) { - info!( - "{:10} {:20} {:14} {:20} {:?}", - usize::MAX - c, - msp.to_u64(), - lsp.ns(), - pulse, - sid - ); + for (c, sid, msp, lsp, _shape_kind) in all.iter().filter(|x| x.4 == 0).take(6) { + info!("{:10} {:20} {:14} {:?}", u64::MAX - c, msp.to_u64(), lsp.ns(), sid); } info!("Active wave"); - for (c, sid, msp, lsp, pulse, _shape_kind) in all.iter().filter(|x| x.5 == 1).take(6) { - info!( - "{:10} {:20} {:14} {:20} {:?}", - usize::MAX - c, - msp.to_u64(), - lsp.ns(), - pulse, - sid - ); + for (c, sid, msp, lsp, _shape_kind) in all.iter().filter(|x| x.4 == 1).take(6) { + info!("{:10} {:20} {:14} {:?}", u64::MAX - c, msp.to_u64(), lsp.ns(), sid); } histo.clear(); } diff --git a/daqingest/src/tools.rs b/daqingest/src/tools.rs index 879f435..fa82351 100644 --- a/daqingest/src/tools.rs +++ b/daqingest/src/tools.rs @@ -346,36 +346,12 @@ pub async fn find_older_msp( fn table_name_from_type(scalar_type: &ScalarType, shape: &Shape) -> &'static str { match shape { Shape::Scalar => match scalar_type { - ScalarType::U8 => todo!(), - ScalarType::U16 => todo!(), - ScalarType::U32 => todo!(), - ScalarType::U64 => todo!(), - ScalarType::I8 => todo!(), - ScalarType::I16 => todo!(), - ScalarType::I32 => todo!(), - ScalarType::I64 => todo!(), ScalarType::F32 => "events_scalar_f32", - ScalarType::F64 => todo!(), - ScalarType::BOOL => todo!(), - ScalarType::STRING => todo!(), - ScalarType::Enum => todo!(), - ScalarType::ChannelStatus => todo!(), + _ => todo!(), }, Shape::Wave(_) => match scalar_type { - ScalarType::U8 => todo!(), - ScalarType::U16 => todo!(), - ScalarType::U32 => todo!(), - ScalarType::U64 => todo!(), - ScalarType::I8 => todo!(), - ScalarType::I16 => todo!(), - ScalarType::I32 => todo!(), - ScalarType::I64 => todo!(), ScalarType::F32 => "events_array_f32", - ScalarType::F64 => todo!(), - ScalarType::BOOL => todo!(), - ScalarType::STRING => todo!(), - ScalarType::Enum => todo!(), - ScalarType::ChannelStatus => todo!(), + _ => todo!(), }, Shape::Image(_, _) => todo!(), } diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index d7d2937..bf2dd9c 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -1200,7 +1200,6 @@ impl CaConn { let created_state = WritableState { tsbeg: self.poll_tsnow, channel: std::mem::replace(&mut st2.channel, CreatedState::dummy()), - // channel: st2.channel.clone(), writer, binwriter, reading: ReadingState::Polling(PollingState { @@ -1868,6 +1867,18 @@ impl CaConn { stnow: SystemTime, stats: &CaConnStats, ) -> Result<(), Error> { + { + use proto::CaMetaValue::*; + match &value.meta { + CaMetaTime(meta) => { + if meta.status != 0 { + let sid = writer.sid(); + debug!("{:?} status {:3} severity {:3}", sid, meta.status, meta.severity); + } + } + _ => {} + } + } trace_event_incoming!("event_add_ingest payload_len {} value {:?}", payload_len, value); crst.ts_alive_last = tsnow; crst.ts_activity_last = tsnow; diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index 0f6318b..f6691eb 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -552,9 +552,6 @@ pub struct InsertItem { pub ts_msp: TsMs, pub ts_lsp: DtNano, pub msp_bump: bool, - pub pulse: u64, - pub scalar_type: ScalarType, - pub shape: Shape, pub val: DataValue, pub ts_net: TsMs, pub ts_alt_1: TsNano, diff --git a/scywr/src/schema.rs b/scywr/src/schema.rs index 4b5e099..5190f95 100644 --- a/scywr/src/schema.rs +++ b/scywr/src/schema.rs @@ -269,13 +269,6 @@ impl GenTwcsTab { } if let Some(row) = rows.get(0) { let mut set_opts = Vec::new(); - info!( - "{:20} vs {:20} {:20} {:20}", - row.0, - self.default_time_to_live.as_secs(), - self.keyspace, - self.name, - ); if row.0 != self.default_time_to_live.as_secs() { if false { set_opts.push(format!( @@ -284,6 +277,13 @@ impl GenTwcsTab { )); } else { info!("mismatch default_time_to_live"); + info!( + "{:20} vs {:20} {:20} {:20}", + row.0, + self.default_time_to_live.as_secs(), + self.keyspace, + self.name, + ); } } if row.1 != self.gc_grace.as_secs() { @@ -464,6 +464,40 @@ async fn check_event_tables(keyspace: &str, rett: RetentionTime, scy: &ScySessio ); tab.setup(scy).await?; } + { + let tab = GenTwcsTab::new( + keyspace, + rett.table_prefix(), + format!("events_scalar_status"), + &[ + ("series", "bigint"), + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("value", "smallint"), + ], + ["series", "ts_msp"], + ["ts_lsp"], + rett.ttl_events_d1(), + ); + tab.setup(scy).await?; + } + { + let tab = GenTwcsTab::new( + keyspace, + rett.table_prefix(), + format!("events_scalar_severity"), + &[ + ("series", "bigint"), + ("ts_msp", "bigint"), + ("ts_lsp", "bigint"), + ("value", "smallint"), + ], + ["series", "ts_msp"], + ["ts_lsp"], + rett.ttl_events_d1(), + ); + tab.setup(scy).await?; + } Ok(()) } diff --git a/serieswriter/src/establish_worker.rs b/serieswriter/src/establish_worker.rs index 6e6f0da..b497769 100644 --- a/serieswriter/src/establish_worker.rs +++ b/serieswriter/src/establish_worker.rs @@ -80,7 +80,6 @@ impl EstablishWriterWorker { async move { let res = RtWriter::new( wtx.clone(), - item.cssid, item.backend, item.channel, item.scalar_type, diff --git a/serieswriter/src/rtwriter.rs b/serieswriter/src/rtwriter.rs index eebd198..8a5d18f 100644 --- a/serieswriter/src/rtwriter.rs +++ b/serieswriter/src/rtwriter.rs @@ -12,7 +12,6 @@ use netpod::TsNano; use scywr::insertqueues::InsertDeques; use scywr::iteminsertqueue::DataValue; use scywr::iteminsertqueue::QueryItem; -use series::ChannelStatusSeriesId; use series::SeriesId; use std::collections::VecDeque; use std::time::Duration; @@ -55,7 +54,6 @@ pub struct RtWriter { impl RtWriter { pub async fn new( channel_info_tx: Sender, - cssid: ChannelStatusSeriesId, backend: String, channel: String, scalar_type: ScalarType, @@ -82,18 +80,15 @@ impl RtWriter { res.series.to_series() }; let state_st = { - let writer = - SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?; + let writer = SeriesWriter::establish_with_sid(sid, stnow)?; State { writer, last_ins: None } }; let state_mt = { - let writer = - SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?; + let writer = SeriesWriter::establish_with_sid(sid, stnow)?; State { writer, last_ins: None } }; let state_lt = { - let writer = - SeriesWriter::establish_with_cssid_sid(cssid, sid, scalar_type.clone(), shape.clone(), stnow).await?; + let writer = SeriesWriter::establish_with_sid(sid, stnow)?; State { writer, last_ins: None } }; let ret = Self { @@ -108,6 +103,37 @@ impl RtWriter { Ok(ret) } + pub fn new_with_series_id( + series: SeriesId, + scalar_type: ScalarType, + shape: Shape, + min_quiets: MinQuiets, + stnow: SystemTime, + ) -> Result { + let state_st = { + let writer = SeriesWriter::establish_with_sid(series, stnow)?; + State { writer, last_ins: None } + }; + let state_mt = { + let writer = SeriesWriter::establish_with_sid(series, stnow)?; + State { writer, last_ins: None } + }; + let state_lt = { + let writer = SeriesWriter::establish_with_sid(series, stnow)?; + State { writer, last_ins: None } + }; + let ret = Self { + sid: series, + scalar_type, + shape, + state_st, + state_mt, + state_lt, + min_quiets, + }; + Ok(ret) + } + pub fn sid(&self) -> SeriesId { self.sid.clone() } @@ -120,6 +146,10 @@ impl RtWriter { self.shape.clone() } + pub fn min_quiets(&self) -> MinQuiets { + self.min_quiets.clone() + } + pub fn write( &mut self, ts_ioc: TsNano, @@ -128,9 +158,6 @@ impl RtWriter { iqdqs: &mut InsertDeques, ) -> Result<((bool, bool, bool),), Error> { let sid = self.sid; - if sid.id() == 6050300124140774549 { - info!("write {:?}", val); - } let (did_write_st,) = Self::write_inner( "ST", self.min_quiets.st, diff --git a/serieswriter/src/writer.rs b/serieswriter/src/writer.rs index 85dbb93..0ccde1a 100644 --- a/serieswriter/src/writer.rs +++ b/serieswriter/src/writer.rs @@ -45,10 +45,7 @@ impl From for Error { #[derive(Debug)] pub struct SeriesWriter { - cssid: ChannelStatusSeriesId, sid: SeriesId, - scalar_type: ScalarType, - shape: Shape, ts_msp_last: Option, inserted_in_current_msp: u32, bytes_in_current_msp: u32, @@ -78,13 +75,12 @@ impl SeriesWriter { }; worker_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; - let cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); - Self::establish_with_cssid(worker_tx, cssid, backend, channel, scalar_type, shape, stnow).await + let _cssid = ChannelStatusSeriesId::new(res.series.to_series().id()); + Self::establish_with(worker_tx, backend, channel, scalar_type, shape, stnow).await } - pub async fn establish_with_cssid( + pub async fn establish_with( channel_info_tx: Sender, - cssid: ChannelStatusSeriesId, backend: String, channel: String, scalar_type: ScalarType, @@ -103,21 +99,12 @@ impl SeriesWriter { channel_info_tx.send(item).await?; let res = rx.recv().await?.map_err(|_| Error::SeriesLookupError)?; let sid = res.series.to_series(); - Self::establish_with_cssid_sid(cssid, sid, scalar_type, shape, stnow).await + Self::establish_with_sid(sid, stnow) } - pub async fn establish_with_cssid_sid( - cssid: ChannelStatusSeriesId, - sid: SeriesId, - scalar_type: ScalarType, - shape: Shape, - stnow: SystemTime, - ) -> Result { + pub fn establish_with_sid(sid: SeriesId, stnow: SystemTime) -> Result { let res = Self { - cssid, sid, - scalar_type, - shape, ts_msp_last: None, inserted_in_current_msp: 0, bytes_in_current_msp: 0, @@ -132,14 +119,6 @@ impl SeriesWriter { self.sid.clone() } - pub fn scalar_type(&self) -> &ScalarType { - &self.scalar_type - } - - pub fn shape(&self) -> &Shape { - &self.shape - } - pub fn write( &mut self, ts_ioc: TsNano, @@ -192,9 +171,6 @@ impl SeriesWriter { ts_net: ts_local.to_ts_ms(), ts_alt_1: ts_ioc, msp_bump: ts_msp_changed, - pulse: 0, - scalar_type: self.scalar_type.clone(), - shape: self.shape.clone(), val, }; // TODO decide on the path in the new deques struct