diff --git a/.gitignore b/.gitignore index f7a5862..1120100 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /Cargo.lock /tmpdoc /.vscode +/tmp diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index 34f19bb..b060e93 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.1-aa.1" +version = "0.2.1" authors = ["Dominik Werder "] edition = "2021" @@ -14,7 +14,7 @@ clap = { version = "4.5.1", features = ["derive", "cargo"] } tracing = "0.1" serde = { version = "1.0", features = ["derive"] } tokio-postgres = "0.7.10" -async-channel = "2.2.0" +async-channel = "2.3.1" futures-util = "0.3" chrono = "0.4" bytes = "1.6.0" diff --git a/daqingest/src/daemon.rs b/daqingest/src/daemon.rs index 871db6a..2b823d4 100644 --- a/daqingest/src/daemon.rs +++ b/daqingest/src/daemon.rs @@ -610,6 +610,9 @@ impl Daemon { self.iqtx .take() .ok_or_else(|| Error::with_msg_no_trace("no iqtx available"))?, + self.ingest_opts.scylla_config_st().clone(), + self.ingest_opts.scylla_config_mt().clone(), + self.ingest_opts.scylla_config_lt().clone(), ); let rres = Arc::new(rres); let metrics_jh = { diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index dbe0afb..ae4f339 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -15,10 +15,10 @@ serde_yaml = "0.9.16" ciborium = "0.2.2" tokio-stream = { version = "0.1", features = ["fs"] } tracing = "0.1.37" -async-channel = "2.0.0" -bytes = "1.5" +async-channel = "2.3.1" +bytes = "1.6" arrayref = "0.3" -byteorder = "1.4" +byteorder = "1.5" futures-util = "0.3" md-5 = "0.10.5" hex = "0.4.3" @@ -26,7 +26,7 @@ regex = "1.8.4" axum = "0.7.5" http-body = "1" url = "2.2" -hyper = "0.14" +#hyper = "1.3.1" chrono = "0.4" humantime = "2.1.0" humantime-serde = "1.1.1" diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 72608f2..6857c99 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -4,6 +4,7 @@ use super::proto::ReadNotify; use crate::ca::proto::ChannelClose; use crate::ca::proto::EventCancel; use crate::conf::ChannelConfig; +use crate::metrics::status::StorageUsage; use crate::throttletrace::ThrottleTrace; use async_channel::Receiver; use async_channel::Sender; @@ -18,6 +19,7 @@ use futures_util::StreamExt; use hashbrown::HashMap; use log::*; use netpod::timeunits::*; +use netpod::ttl::RetentionTime; use netpod::ScalarType; use netpod::Shape; use netpod::TsMs; @@ -193,6 +195,7 @@ pub struct ChannelStateInfo { mod ser_instant { use super::*; + use netpod::DATETIME_FMT_3MS; use serde::Deserializer; use serde::Serializer; @@ -221,9 +224,7 @@ mod ser_instant { .unwrap(); now.checked_add_signed(dur2).unwrap() }; - //info!("formatting {:?}", t1); - let s = t1.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string(); - //info!("final string {:?}", s); + let s = t1.format(DATETIME_FMT_3MS).to_string(); ser.serialize_str(&s) } None => ser.serialize_none(), @@ -351,6 +352,34 @@ enum ReadingState { Polling(PollingState), } +#[derive(Debug, Clone)] +struct AccountingInfo { + usage: StorageUsage, + beg: TsMs, +} + +impl AccountingInfo { + fn new(beg: TsMs) -> Self { + Self { + usage: StorageUsage::new(), + beg, + } + } + + fn push_written(&mut self, payload_len: u32) { + self.usage.push_written(payload_len); + } + + fn usage(&self) -> &StorageUsage { + &self.usage + } + + fn reset(&mut self, msp: TsMs) { + self.beg = msp; + self.usage.reset(); + } +} + #[derive(Debug, Clone)] struct CreatedState { cssid: ChannelStatusSeriesId, @@ -377,9 +406,9 @@ struct CreatedState { stwin_ts: u64, stwin_count: u32, stwin_bytes: u32, - account_emit_last: TsMs, - account_count: u64, - account_bytes: u64, + acc_st: AccountingInfo, + acc_mt: AccountingInfo, + acc_lt: AccountingInfo, dw_st_last: SystemTime, dw_mt_last: SystemTime, dw_lt_last: SystemTime, @@ -389,6 +418,7 @@ impl CreatedState { fn dummy() -> Self { let tsnow = Instant::now(); let stnow = SystemTime::now(); + let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP); Self { cssid: ChannelStatusSeriesId::new(0), cid: Cid(0), @@ -412,9 +442,9 @@ impl CreatedState { stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, - account_emit_last: TsMs(0), - account_count: 0, - account_bytes: 0, + acc_st: AccountingInfo::new(acc_msp), + acc_mt: AccountingInfo::new(acc_msp), + acc_lt: AccountingInfo::new(acc_msp), dw_st_last: SystemTime::UNIX_EPOCH, dw_mt_last: SystemTime::UNIX_EPOCH, dw_lt_last: SystemTime::UNIX_EPOCH, @@ -829,7 +859,7 @@ pub struct CaConn { cid_by_name: BTreeMap, cid_by_subid: HashMap, cid_by_sid: HashMap, - channel_status_emit_last: Instant, + channel_status_emit_next: Instant, tick_last_writer: Instant, init_state_count: u64, iqdqs: InsertDeques, @@ -896,7 +926,7 @@ impl CaConn { cid_by_name: BTreeMap::new(), cid_by_subid: HashMap::new(), cid_by_sid: HashMap::new(), - channel_status_emit_last: tsnow, + channel_status_emit_next: tsnow + Self::channel_status_emit_ivl(&mut rng), tick_last_writer: tsnow, iqdqs: InsertDeques::new(), remote_addr_dbg, @@ -931,6 +961,10 @@ impl CaConn { IOC_PING_IVL * 100 / (70 + (rng.next_u32() % 60)) } + fn channel_status_emit_ivl(rng: &mut Xoshiro128PlusPlus) -> Duration { + Duration::from_millis(6000 + (rng.next_u32() & 0x7ff) as u64) + } + fn new_self_ticker() -> Pin> { Box::pin(tokio::time::sleep(Duration::from_millis(500))) } @@ -1664,25 +1698,23 @@ impl CaConn { let ts_diff = ts.abs_diff(ts_local); stats.ca_ts_off().ingest((ts_diff / MS) as u32); { - { - crst.account_count += 1; - // TODO how do we account for bytes? Here, we also add 8 bytes for the timestamp. - crst.account_bytes += 8 + payload_len as u64; - crst.muted_before = 0; - crst.insert_item_ivl_ema.tick(tsnow); - } Self::check_ev_value_data(&value.data, &writer.scalar_type())?; + crst.muted_before = 0; + crst.insert_item_ivl_ema.tick(tsnow); { let val: DataValue = value.data.into(); let ((dwst, dwmt, dwlt),) = writer.write(TsNano::from_ns(ts), TsNano::from_ns(ts_local), val, iqdqs)?; if dwst { crst.dw_st_last = stnow; + crst.acc_st.push_written(payload_len); } if dwmt { crst.dw_mt_last = stnow; + crst.acc_mt.push_written(payload_len); } if dwlt { crst.dw_lt_last = stnow; + crst.acc_lt.push_written(payload_len); } } } @@ -1751,7 +1783,7 @@ impl CaConn { Ready(Some(k)) => match k { Ok(k) => match k { CaItem::Empty => { - info!("CaItem::Empty"); + debug!("CaItem::Empty"); Ready(Some(Ok(()))) } CaItem::Msg(msg) => match msg.ty { @@ -2183,6 +2215,7 @@ impl CaConn { let ca_dbr_type = k.data_type + 14; let scalar_type = ScalarType::from_ca_id(k.data_type)?; let shape = Shape::from_ca_count(k.data_count)?; + let (acc_msp, _) = TsMs::from_system_time(stnow).to_grid_02(EMIT_ACCOUNTING_SNAP); let channel = CreatedState { cssid, cid, @@ -2206,9 +2239,9 @@ impl CaConn { stwin_ts: 0, stwin_count: 0, stwin_bytes: 0, - account_emit_last: TsMs::from_ms_u64(0), - account_count: 0, - account_bytes: 0, + acc_st: AccountingInfo::new(acc_msp), + acc_mt: AccountingInfo::new(acc_msp), + acc_lt: AccountingInfo::new(acc_msp), dw_st_last: SystemTime::UNIX_EPOCH, dw_mt_last: SystemTime::UNIX_EPOCH, dw_lt_last: SystemTime::UNIX_EPOCH, @@ -2230,7 +2263,7 @@ impl CaConn { } fn handle_channel_close_res(&mut self, k: proto::ChannelCloseRes, tsnow: Instant) -> Result<(), Error> { - info!("{:?}", k); + debug!("{:?}", k); Ok(()) } @@ -2283,7 +2316,7 @@ impl CaConn { } Ok(Err(e)) => { use std::io::ErrorKind; - info!("error connect to {addr} {e}"); + debug!("error connect to {addr} {e}"); let addr = addr.clone(); self.iqdqs .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { @@ -2300,7 +2333,7 @@ impl CaConn { } Err(e) => { // TODO log with exponential backoff - info!("timeout connect to {addr} {e}"); + debug!("timeout connect to {addr} {e}"); let addr = addr.clone(); self.iqdqs .emit_status_item(QueryItem::ConnectionStatus(ConnectionStatusItem { @@ -2415,8 +2448,8 @@ impl CaConn { self.check_channels_state_poll(tsnow, cx)?; self.check_channels_alive(tsnow, cx)?; // TODO add some random variation - if self.channel_status_emit_last + Duration::from_millis(3000) <= tsnow { - self.channel_status_emit_last = tsnow; + if self.channel_status_emit_next <= tsnow { + self.channel_status_emit_next = tsnow + Self::channel_status_emit_ivl(&mut self.rng); self.emit_channel_status()?; self.emit_accounting()?; } @@ -2476,22 +2509,25 @@ impl CaConn { match st0 { ChannelState::Writable(st1) => { let ch = &mut st1.channel; - if ch.account_emit_last != msp { - if ch.account_count != 0 { - let series = st1.writer.sid(); - let count = ch.account_count as i64; - let bytes = ch.account_bytes as i64; - ch.account_count = 0; - ch.account_bytes = 0; - let item = QueryItem::Accounting(Accounting { - part: (series.id() & 0xff) as i32, - ts: msp, - series, - count, - bytes, - }); - self.iqdqs.emit_status_item(item)?; - ch.account_emit_last = msp; + for (acc, rt) in [&mut ch.acc_st, &mut ch.acc_mt, &mut ch.acc_lt].into_iter().zip([ + RetentionTime::Short, + RetentionTime::Medium, + RetentionTime::Long, + ]) { + if acc.beg != msp { + if acc.usage().count() != 0 { + let series = st1.writer.sid(); + let item = Accounting { + part: (series.id() & 0xff) as i32, + ts: acc.beg, + series, + count: acc.usage().count() as _, + bytes: acc.usage().bytes() as _, + }; + //info!("EMIT ITEM {rt:?} {item:?}"); + self.iqdqs.emit_accounting_item(rt, item)?; + acc.reset(msp); + } } } } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index 283df48..71e3b87 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,4 +1,5 @@ #![allow(unused)] +pub mod delete; pub mod ingest; pub mod postingest; pub mod status; @@ -26,6 +27,7 @@ use http::Request; use http::StatusCode; use http_body::Body; use log::*; +use scywr::config::ScyllaIngestConfig; use scywr::insertqueues::InsertQueuesTx; use scywr::iteminsertqueue::QueryItem; use serde::Deserialize; @@ -312,14 +314,27 @@ pub struct RoutesResources { backend: String, worker_tx: Sender, iqtx: InsertQueuesTx, + scyconf_st: ScyllaIngestConfig, + scyconf_mt: ScyllaIngestConfig, + scyconf_lt: ScyllaIngestConfig, } impl RoutesResources { - pub fn new(backend: String, worker_tx: Sender, iqtx: InsertQueuesTx) -> Self { + pub fn new( + backend: String, + worker_tx: Sender, + iqtx: InsertQueuesTx, + scyconf_st: ScyllaIngestConfig, + scyconf_mt: ScyllaIngestConfig, + scyconf_lt: ScyllaIngestConfig, + ) -> Self { Self { backend, worker_tx, iqtx, + scyconf_st, + scyconf_mt, + scyconf_lt, } } } @@ -381,6 +396,13 @@ fn make_routes( let dcom = dcom.clone(); |Query(params): Query>| channel_add(params, dcom) }), + ) + .route( + "/remove", + get({ + let dcom = dcom.clone(); + |Query(params): Query>| channel_remove(params, dcom) + }), ), ) .nest( @@ -396,6 +418,33 @@ fn make_routes( )| { ingest::post_v01((headers, params, body), rres) } }), ), + ) + .nest( + "/private", + Router::new() + .nest( + "/channel", + Router::new().route( + "/delete", + post({ + let rres = rres.clone(); + move |(headers, params, body): ( + HeaderMap, + Query>, + axum::body::Body, + )| { + delete::delete((headers, params, body), rres) + } + }), + ), + ) + .route( + "/channel/states", + get({ + let tx = connset_cmd_tx.clone(); + |Query(params): Query>| private_channel_states(params, tx) + }), + ), ), ) .route( @@ -416,20 +465,6 @@ fn make_routes( |Query(params): Query>| find_channel(params, dcom) }), ) - .route( - "/daqingest/private/channel/states", - get({ - let tx = connset_cmd_tx.clone(); - |Query(params): Query>| private_channel_states(params, tx) - }), - ) - .route( - "/daqingest/channel/remove", - get({ - let dcom = dcom.clone(); - |Query(params): Query>| channel_remove(params, dcom) - }), - ) .route( "/daqingest/store_workers_rate", get({ diff --git a/netfetch/src/metrics/delete.rs b/netfetch/src/metrics/delete.rs new file mode 100644 index 0000000..ff49dc2 --- /dev/null +++ b/netfetch/src/metrics/delete.rs @@ -0,0 +1,245 @@ +use super::RoutesResources; +use axum::extract::FromRequest; +use axum::extract::Query; +use axum::handler::Handler; +use axum::http::HeaderMap; +use axum::Json; +use bytes::Bytes; +use chrono::DateTime; +use chrono::Utc; +use core::fmt; +use err::thiserror; +use err::ThisError; +use netpod::log::*; +use netpod::ttl::RetentionTime; +use netpod::ScalarType; +use netpod::TsMs; +use netpod::TsNano; +use scylla::Session as ScySession; +use scywr::config::ScyllaIngestConfig; +use scywr::insertqueues::InsertDeques; +use scywr::iteminsertqueue::ArrayValue; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::ScalarValue; +use scywr::scylla; +use scywr::scylla::prepared_statement::PreparedStatement; +use serde::Deserialize; +use series::SeriesId; +use serieswriter::writer::SeriesWriter; +use std::collections::HashMap; +use std::collections::VecDeque; +use std::io::Cursor; +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; +use streams::framed_bytes::FramedBytesStream; +use taskrun::tokio::time::timeout; + +#[allow(unused)] +macro_rules! debug_cql { + ($($arg:tt)*) => { + if true { + debug!($($arg)*); + } + }; +} + +#[derive(Debug, ThisError)] +pub enum Error { + Logic, + MissingRetentionTime, + MissingSeriesId, + MissingScalarType, + MissingBegDate, + MissingEndDate, + ScyllaTransport(#[from] scylla::transport::errors::NewSessionError), + ScyllaQuery(#[from] scylla::transport::errors::QueryError), + ScyllaRowType(#[from] scylla::transport::query_result::RowsExpectedError), + ScyllaRowError(#[from] scylla::cql_to_rust::FromRowError), + InvalidTimestamp, +} + +pub async fn delete( + (headers, Query(params), body): (HeaderMap, Query>, axum::body::Body), + rres: Arc, +) -> Json { + match delete_try(headers, params, body, rres).await { + Ok(k) => k, + Err(e) => Json(serde_json::json!({ + "error": e.to_string(), + })), + } +} + +fn st_to_ns(v: DateTime) -> Result { + let sec = v.timestamp(); + if sec < 0 { + Err(Error::InvalidTimestamp) + } else if sec > 18446744073 { + Err(Error::InvalidTimestamp) + } else { + let w = 1000000000 * sec as u64 + v.timestamp_subsec_nanos() as u64; + Ok(TsNano::from_ns(w)) + } +} + +// select * from sf_lt.lt_account_00 where token(part, ts) > -100000000 and series = 8554946496751499549 allow filtering + +async fn delete_try( + headers: HeaderMap, + params: HashMap, + body: axum::body::Body, + rres: Arc, +) -> Result, Error> { + let rt: RetentionTime = params + .get("retentionTime") + .ok_or(Error::MissingRetentionTime) + .and_then(|x| x.parse().map_err(|_| Error::MissingRetentionTime))?; + let series = params + .get("series") + .ok_or(Error::MissingSeriesId) + .and_then(|x| x.parse().map_err(|_| Error::MissingSeriesId)) + .map(SeriesId::new)?; + let beg: DateTime = params + .get("begDate") + .ok_or(Error::MissingBegDate) + .and_then(|x| x.parse().map_err(|_| Error::MissingBegDate))?; + let end: DateTime = params + .get("endDate") + .ok_or(Error::MissingEndDate) + .and_then(|x| x.parse().map_err(|_| Error::MissingEndDate))?; + let scalar_type: ScalarType = params + .get("scalarType") + .ok_or(Error::MissingScalarType) + .and_then(|x| ScalarType::from_variant_str(x).map_err(|_| Error::MissingScalarType))?; + debug_cql!("delete params {rt:?} {series:?} {beg:?} {end:?}"); + let beg = st_to_ns(beg)?; + let end = st_to_ns(end)?; + let scyconf = &rres.scyconf_st; + let scy = scy_connect(scyconf).await?; + let qu = { + let cql = format!( + concat!("select ts_msp from {}.{}ts_msp where series = ?"), + scyconf.keyspace(), + rt.table_prefix(), + ); + scy.prepare(scylla::query::Query::new(cql).with_page_size(4)).await? + }; + let qu_delete_val = { + let _cql = format!( + concat!( + "select ts_lsp from {}.{}events_scalar_{}", + " where series = ? and ts_msp = ?", + " and ts_lsp >= ? and ts_lsp < ?", + ), + scyconf.keyspace(), + rt.table_prefix(), + scalar_type.to_scylla_table_name_id(), + ); + let cql = format!( + concat!( + "delete from {}.{}events_scalar_{}", + " where series = ? and ts_msp = ?", + " and ts_lsp >= ? and ts_lsp < ?", + ), + scyconf.keyspace(), + rt.table_prefix(), + scalar_type.to_scylla_table_name_id(), + ); + scy.prepare(scylla::query::Query::new(cql).with_page_size(100)).await? + }; + let mut pst = None; + let mut i = 0; + loop { + // debug_cql!("query iteration {i}"); + let z = scy.execute_paged(&qu, (series.to_i64(),), pst).await?; + pst = z.paging_state.clone(); + for x in z.rows_typed::<(i64,)>()? { + let (msp,) = x?; + let msp = TsMs::from_ms_u64(msp as _); + let msp_ns = msp.ns_u64(); + delete_val(series.clone(), msp, beg, end, &qu_delete_val, &scy).await?; + } + if pst.is_none() { + debug_cql!("last page"); + break; + } + i += 1; + if false { + if i > 20 { + debug_cql!("loop limit"); + break; + } + } + } + Ok(Json(serde_json::Value::Null)) +} + +async fn delete_val( + series: SeriesId, + msp: TsMs, + beg: TsNano, + end: TsNano, + qu_delete_val: &PreparedStatement, + scy: &ScySession, +) -> Result<(), Error> { + let msp_ns = msp.ns_u64(); + if msp_ns >= end.ns() { + // debug_cql!(" return early msp {msp} after range"); + return Ok(()); + } + let r1 = if msp_ns >= beg.ns() { 0 } else { beg.ns() - msp_ns }; + let r2 = end.ns() - msp_ns; + let o0 = DateTime::from_timestamp_millis((msp.ms() + 0 / 1000000) as i64).unwrap(); + let o1 = DateTime::from_timestamp_millis((msp.ms() + r1 / 1000000) as i64).unwrap(); + let o2 = DateTime::from_timestamp_millis((msp.ms() + r2 / 1000000) as i64).unwrap(); + debug_cql!(" sub query {o0:?} {o1:?} {o2:?}"); + let mut pst = None; + let mut i = 0; + loop { + // debug_cql!(" sub query iteration {i}"); + let params = (series.to_i64(), msp.ms() as i64, r1 as i64, r2 as i64); + let z = scy.execute_paged(&qu_delete_val, params, pst).await?; + pst = z.paging_state.clone(); + if z.rows_num().is_ok() { + for (i, x) in z.rows_typed::<(i64,)>()?.enumerate() { + let (lsp,) = x?; + if false && i < 4 { + debug_cql!(" lsp {lsp}"); + } + } + } + if pst.is_none() { + // debug_cql!(" last page"); + break; + } + i += 1; + if false { + if i > 20 { + debug_cql!(" loop limit"); + break; + } + } + } + Ok(()) +} + +async fn scy_connect(scyconf: &ScyllaIngestConfig) -> Result, Error> { + use scylla::execution_profile::ExecutionProfileBuilder; + use scylla::statement::Consistency; + use scylla::transport::session::PoolSize; + use scylla::transport::session_builder::GenericSessionBuilder; + let profile = ExecutionProfileBuilder::default() + .consistency(Consistency::Quorum) + .build() + .into_handle(); + let scy = GenericSessionBuilder::new() + .pool_size(PoolSize::default()) + .known_nodes(scyconf.hosts()) + .default_execution_profile_handle(profile) + .build() + .await?; + let scy = Arc::new(scy); + Ok(scy) +} diff --git a/netfetch/src/metrics/ingest.rs b/netfetch/src/metrics/ingest.rs index 32aa906..fcbd0fe 100644 --- a/netfetch/src/metrics/ingest.rs +++ b/netfetch/src/metrics/ingest.rs @@ -14,6 +14,7 @@ use items_2::eventsdim0::EventsDim0NoPulse; use items_2::eventsdim1::EventsDim1; use items_2::eventsdim1::EventsDim1NoPulse; use netpod::log::*; +use netpod::ttl::RetentionTime; use netpod::ScalarType; use netpod::Shape; use netpod::TsNano; @@ -118,13 +119,16 @@ async fn post_v01_try( let s = params.get("scalarType").ok_or(Error::MissingScalarType)?; let scalar_type = ScalarType::from_variant_str(&s).map_err(|e| Error::Parse(e.to_string()))?; let shape: Shape = serde_json::from_str(params.get("shape").map_or("[]", |x| x.as_str()))?; - debug_setup!("parsed scalar_type {scalar_type:?}"); - debug_setup!("parsed shape {shape:?}"); + let rt: RetentionTime = params + .get("retentionTime") + .and_then(|x| x.parse().ok()) + .unwrap_or(RetentionTime::Short); debug_setup!( - "establishing series writer for {:?} {:?} {:?}", + "establishing series writer for {:?} {:?} {:?} {:?}", channel, scalar_type, - shape + shape, + rt ); let mut writer = SeriesWriter::establish(worker_tx, backend, channel, scalar_type.clone(), shape.clone(), stnow).await?; @@ -137,7 +141,7 @@ async fn post_v01_try( let x = match x { Ok(x) => x, Err(_) => { - tick_writers(&mut writer, &mut iqdqs)?; + tick_writers(&mut writer, &mut iqdqs, rt.clone())?; continue; } }; @@ -149,7 +153,7 @@ async fn post_v01_try( } }; trace_input!("got frame len {}", frame.len()); - let deque = &mut iqdqs.st_rf3_rx; + let deque = iqdqs.deque(rt.clone()); match &shape { Shape::Scalar => match &scalar_type { ScalarType::U8 => { @@ -240,14 +244,14 @@ async fn post_v01_try( trace_queues!("frame send_all begin {} {}", iqdqs.summary(), iqtx.summary()); iqtx.send_all(&mut iqdqs).await?; trace_queues!("frame send_all done {} {}", iqdqs.summary(), iqtx.summary()); - tick_writers(&mut writer, &mut iqdqs)?; + tick_writers(&mut writer, &mut iqdqs, rt.clone())?; trace_queues!("frame tick_writers done {} {}", iqdqs.summary(), iqtx.summary()); } trace_queues!("after send_all begin {} {}", iqdqs.summary(), iqtx.summary()); iqtx.send_all(&mut iqdqs).await?; trace_queues!("after send_all done {} {}", iqdqs.summary(), iqtx.summary()); - finish_writers(&mut writer, &mut iqdqs)?; + finish_writers(&mut writer, &mut iqdqs, rt.clone())?; trace_queues!("after finish_writers done {} {}", iqdqs.summary(), iqtx.summary()); let ret = Json(serde_json::json!({})); @@ -306,12 +310,12 @@ where Ok(()) } -fn tick_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> { - writer.tick(&mut deque.st_rf3_rx)?; +fn tick_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { + writer.tick(deques.deque(rt))?; Ok(()) } -fn finish_writers(writer: &mut SeriesWriter, deque: &mut InsertDeques) -> Result<(), Error> { - writer.tick(&mut deque.st_rf3_rx)?; +fn finish_writers(writer: &mut SeriesWriter, deques: &mut InsertDeques, rt: RetentionTime) -> Result<(), Error> { + writer.tick(deques.deque(rt))?; Ok(()) } diff --git a/netfetch/src/metrics/status.rs b/netfetch/src/metrics/status.rs index 1241f86..5fecd76 100644 --- a/netfetch/src/metrics/status.rs +++ b/netfetch/src/metrics/status.rs @@ -3,6 +3,8 @@ use crate::ca::connset::ChannelStatusesRequest; use crate::ca::connset::ConnSetCmd; use crate::conf::ChannelConfig; use async_channel::Sender; +use chrono::DateTime; +use chrono::Utc; use serde::Serialize; use std::collections::BTreeMap; use std::collections::HashMap; @@ -11,9 +13,42 @@ use std::time::SystemTime; #[derive(Debug, Serialize)] pub struct ChannelStates { + running_since: DateTime, + // #[serde(with = "humantime_serde")] + // running_since_2: SystemTime, channels: BTreeMap, } +#[derive(Debug, Clone, Serialize)] +pub struct StorageUsage { + count: u64, + bytes: u64, +} + +impl StorageUsage { + pub fn new() -> Self { + Self { count: 0, bytes: 0 } + } + + pub fn reset(&mut self) { + self.count = 0; + self.bytes = 0; + } + + pub fn push_written(&mut self, payload_len: u32) { + self.count += 1; + self.bytes += 16 + payload_len as u64; + } + + pub fn count(&self) -> u64 { + self.count + } + + pub fn bytes(&self) -> u64 { + self.bytes + } +} + #[derive(Debug, Serialize)] struct ChannelState { ioc_address: Option, @@ -50,11 +85,7 @@ enum ConnectionState { // BTreeMap pub async fn channel_states(params: HashMap, tx: Sender) -> axum::Json { let name = params.get("name").map_or(String::new(), |x| x.clone()).to_string(); - let limit = params - .get("limit") - .map(|x| x.parse().ok()) - .unwrap_or(None) - .unwrap_or(40); + let limit = params.get("limit").and_then(|x| x.parse().ok()).unwrap_or(40); let (tx2, rx2) = async_channel::bounded(1); let req = ChannelStatusesRequest { name, limit, tx: tx2 }; let item = CaConnSetEvent::ConnSetCmd(ConnSetCmd::ChannelStatuses(req)); @@ -62,6 +93,7 @@ pub async fn channel_states(params: HashMap, tx: Sender Result<(), Error> { - self.lt_rf3_rx.push_back(item); + self.deque(RetentionTime::Long).push_back(item); Ok(()) } + + // Should be used only for connection and channel status items. + // It encapsulates the decision to which queue(s) we want to send these kind of items. + pub fn emit_accounting_item(&mut self, rt: RetentionTime, item: Accounting) -> Result<(), Error> { + self.deque(rt).push_back(QueryItem::Accounting(item)); + Ok(()) + } + + pub fn deque(&mut self, rt: RetentionTime) -> &mut VecDeque { + match rt { + RetentionTime::Short => &mut self.st_rf3_rx, + RetentionTime::Medium => &mut self.mt_rf3_rx, + RetentionTime::Long => &mut self.lt_rf3_rx, + } + } } pub struct InsertDequesSummary<'a> { diff --git a/scywr/src/lib.rs b/scywr/src/lib.rs index 9071e95..6190a9b 100644 --- a/scywr/src/lib.rs +++ b/scywr/src/lib.rs @@ -1,5 +1,6 @@ pub mod access; pub mod config; +pub mod delete; pub mod err; pub mod fut; pub mod futbatch;