From 0944624e84d0d22d398bcb2796f216b59e75c393 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 15 Jul 2022 09:19:26 +0200 Subject: [PATCH] Allow different kinds of items to be sent to database insert queue --- netfetch/src/ca.rs | 86 ++++++++++++++++++++++++++++++++--------- netfetch/src/ca/conn.rs | 58 ++++++++++++++++++++------- netfetch/src/series.rs | 30 ++++++++------ netfetch/src/store.rs | 33 +++++++++++++--- 4 files changed, 157 insertions(+), 50 deletions(-) diff --git a/netfetch/src/ca.rs b/netfetch/src/ca.rs index 075c5d0..202ff31 100644 --- a/netfetch/src/ca.rs +++ b/netfetch/src/ca.rs @@ -4,7 +4,7 @@ pub mod store; use self::conn::FindIocStream; use self::store::DataStore; -use crate::store::CommonInsertItemQueue; +use crate::store::{CommonInsertItemQueue, QueryItem}; use conn::CaConn; use err::Error; use futures_util::StreamExt; @@ -275,29 +275,77 @@ async fn spawn_scylla_insert_workers( let fut = async move { let mut i1 = 0; while let Ok(item) = recv.recv().await { - stats.store_worker_item_recv_inc(); - let insert_frac = insert_frac.load(Ordering::Acquire); - if i1 % 1000 < insert_frac { - match crate::store::insert_item(item, &data_store, &stats).await { - Ok(_) => { - stats.store_worker_item_insert_inc(); + match item { + QueryItem::Insert(item) => { + stats.store_worker_item_recv_inc(); + let insert_frac = insert_frac.load(Ordering::Acquire); + if i1 % 1000 < insert_frac { + match crate::store::insert_item(item, &data_store, &stats).await { + Ok(_) => { + stats.store_worker_item_insert_inc(); + } + Err(e) => { + stats.store_worker_item_error_inc(); + // TODO introduce more structured error variants. + if e.msg().contains("WriteTimeout") { + tokio::time::sleep(Duration::from_millis(100)).await; + } else { + // TODO back off but continue. + error!("insert worker sees error: {e:?}"); + break; + } + } + } + } else { + stats.store_worker_item_drop_inc(); } - Err(e) => { - stats.store_worker_item_error_inc(); - // TODO introduce more structured error variants. - if e.msg().contains("WriteTimeout") { - tokio::time::sleep(Duration::from_millis(100)).await; - } else { - // TODO back off but continue. - error!("insert worker sees error: {e:?}"); - break; + i1 += 1; + } + QueryItem::Mute(item) => { + let values = ( + (item.series & 0xff) as i32, + item.series as i64, + item.ts as i64, + item.ema, + item.emd, + ); + let qres = data_store + .scy + .query( + "insert into muted (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)", + values, + ) + .await; + match qres { + Ok(_) => {} + Err(_) => { + stats.store_worker_item_error_inc(); + } + } + } + QueryItem::Ivl(item) => { + let values = ( + (item.series & 0xff) as i32, + item.series as i64, + item.ts as i64, + item.ema, + item.emd, + ); + let qres = data_store + .scy + .query( + "insert into item_recv_ivl (part, series, ts, ema, emd) values (?, ?, ?, ?, ?)", + values, + ) + .await; + match qres { + Ok(_) => {} + Err(_) => { + stats.store_worker_item_error_inc(); } } } - } else { - stats.store_worker_item_drop_inc(); } - i1 += 1; } }; tokio::spawn(fut); diff --git a/netfetch/src/ca/conn.rs b/netfetch/src/ca/conn.rs index 6425051..9e5d883 100644 --- a/netfetch/src/ca/conn.rs +++ b/netfetch/src/ca/conn.rs @@ -3,7 +3,7 @@ use super::store::DataStore; use crate::bsread::ChannelDescDecoded; use crate::ca::proto::{CreateChan, EventAdd, HeadInfo}; use crate::series::{Existence, SeriesId}; -use crate::store::{CommonInsertItemQueueSender, InsertItem}; +use crate::store::{CommonInsertItemQueueSender, InsertItem, IvlItem, MuteItem, QueryItem}; use err::Error; use futures_util::stream::FuturesOrdered; use futures_util::{Future, FutureExt, Stream, StreamExt, TryFutureExt}; @@ -62,9 +62,10 @@ struct CreatedState { ts_msp_grid_last: u32, inserted_in_ts_msp: u64, insert_item_ivl_ema: IntervalEma, + item_recv_ivl_ema: IntervalEma, + insert_recv_ivl_last: Instant, insert_next_earliest: Instant, - #[allow(unused)] - fast_warn_count: u32, + muted_before: u32, } #[allow(unused)] @@ -114,9 +115,9 @@ pub struct CaConn { name_by_cid: BTreeMap, poll_count: usize, data_store: Arc, - insert_item_queue: VecDeque, + insert_item_queue: VecDeque, insert_item_sender: CommonInsertItemQueueSender, - insert_item_send_fut: Option>, + insert_item_send_fut: Option>, fut_get_series: FuturesOrdered), Error>> + Send>>>, remote_addr_dbg: SocketAddrV4, @@ -271,8 +272,10 @@ impl CaConn { ts_msp_grid_last: 0, inserted_in_ts_msp: u64::MAX, insert_item_ivl_ema: IntervalEma::new(), + item_recv_ivl_ema: IntervalEma::new(), + insert_recv_ivl_last: Instant::now(), insert_next_earliest: Instant::now(), - fast_warn_count: 0, + muted_before: 0, }); let scalar_type = ScalarType::from_ca_id(data_type)?; let shape = Shape::from_ca_count(data_count)?; @@ -351,7 +354,7 @@ impl CaConn { val: ev.value, ts_msp_grid, }; - item_queue.push_back(item); + item_queue.push_back(QueryItem::Insert(item)); self.stats.insert_item_create_inc(); Ok(()) } @@ -398,23 +401,26 @@ impl CaConn { return Err(format!("no series id on insert").into()); } }; + let ts = { + let ts = SystemTime::now(); + let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); + epoch.as_secs() * SEC + epoch.subsec_nanos() as u64 + }; let tsnow = Instant::now(); if tsnow >= st.insert_next_earliest { + st.muted_before = 0; st.insert_item_ivl_ema.tick(tsnow); let em = st.insert_item_ivl_ema.ema(); let ema = em.ema(); - let mm = self.insert_ivl_min.load(Ordering::Acquire); - let mm = (mm as f32) * 1e-6; - let dt = (mm - ema) / em.k(); + let ivl_min = self.insert_ivl_min.load(Ordering::Acquire); + let ivl_min = (ivl_min as f32) * 1e-6; + let dt = (ivl_min - ema).max(0.) / em.k(); st.insert_next_earliest = tsnow .checked_add(Duration::from_micros((dt * 1e6) as u64)) .ok_or_else(|| Error::with_msg_no_trace("time overflow in next insert"))?; let ts_msp_last = st.ts_msp_last; let inserted_in_ts_msp = st.inserted_in_ts_msp; // TODO get event timestamp from channel access field - let ts = SystemTime::now(); - let epoch = ts.duration_since(std::time::UNIX_EPOCH).unwrap(); - let ts = epoch.as_secs() * SEC + epoch.subsec_nanos() as u64; let ts_msp_grid = (ts / (SEC * 10 * 6 * 2)) as u32 * (6 * 2); let ts_msp_grid = if st.ts_msp_grid_last != ts_msp_grid { st.ts_msp_grid_last = ts_msp_grid; @@ -435,6 +441,28 @@ impl CaConn { )?; } else { self.stats.channel_fast_item_drop_inc(); + if tsnow.duration_since(st.insert_recv_ivl_last) >= Duration::from_millis(2000) { + st.insert_recv_ivl_last = tsnow; + let ema = st.insert_item_ivl_ema.ema(); + let item = IvlItem { + series: series.id(), + ts, + ema: ema.ema(), + emd: ema.emv().sqrt(), + }; + self.insert_item_queue.push_back(QueryItem::Ivl(item)); + } + if false && st.muted_before == 0 { + let ema = st.insert_item_ivl_ema.ema(); + let item = MuteItem { + series: series.id(), + ts, + ema: ema.ema(), + emd: ema.emv().sqrt(), + }; + self.insert_item_queue.push_back(QueryItem::Mute(item)); + } + st.muted_before = 1; } } _ => { @@ -586,8 +614,10 @@ impl CaConn { ts_msp_grid_last: 0, inserted_in_ts_msp: u64::MAX, insert_item_ivl_ema: IntervalEma::new(), + item_recv_ivl_ema: IntervalEma::new(), + insert_recv_ivl_last: Instant::now(), insert_next_earliest: Instant::now(), - fast_warn_count: 0, + muted_before: 0, }); // TODO handle error in different way. Should most likely not abort. let cd = ChannelDescDecoded { diff --git a/netfetch/src/series.rs b/netfetch/src/series.rs index adb466c..34bb1de 100644 --- a/netfetch/src/series.rs +++ b/netfetch/src/series.rs @@ -1,9 +1,8 @@ use crate::bsread::ChannelDescDecoded; use crate::errconv::ErrConv; use err::Error; -#[allow(unused)] use log::*; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio_postgres::Client as PgClient; #[derive(Clone, Debug)] @@ -41,21 +40,29 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res all.push(series); } let rn = all.len(); + let tsbeg = Instant::now(); if rn == 0 { use md5::Digest; let mut h = md5::Md5::new(); h.update(facility.as_bytes()); h.update(channel_name.as_bytes()); - h.update(format!("{:?} {:?}", scalar_type, shape).as_bytes()); - let f = h.finalize(); - let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); - if series > i64::MAX as u64 { - series &= 0x7fffffffffffffff; - } - for _ in 0..2000 { - if series < 1 || series > i64::MAX as u64 { + h.update(format!("{:?}", scalar_type).as_bytes()); + h.update(format!("{:?}", shape).as_bytes()); + for _ in 0..200 { + h.update(tsbeg.elapsed().subsec_nanos().to_ne_bytes()); + let f = h.clone().finalize(); + let mut series = u64::from_le_bytes(f.as_slice()[0..8].try_into().unwrap()); + if series > i64::MAX as u64 { + series &= 0x7fffffffffffffff; + } + if series == 0 { series = 1; } + if series <= 0 || series > i64::MAX as u64 { + return Err(Error::with_msg_no_trace(format!( + "attempt to insert bad series id {series}" + ))); + } let res = pg_client .execute( concat!( @@ -76,9 +83,8 @@ pub async fn get_series_id(pg_client: &PgClient, cd: &ChannelDescDecoded) -> Res ); } tokio::time::sleep(Duration::from_millis(20)).await; - series += 1; } - error!("tried to insert {series:?} for {facility} {channel_name} {scalar_type:?} {shape:?} but it failed"); + error!("tried to insert new series id for {facility} {channel_name} {scalar_type:?} {shape:?} but failed"); Err(Error::with_msg_no_trace(format!("get_series_id can not create and insert series id {facility:?} {channel_name:?} {scalar_type:?} {shape:?}"))) } else { let series = all[0] as u64; diff --git a/netfetch/src/store.rs b/netfetch/src/store.rs index ff7073a..de84795 100644 --- a/netfetch/src/store.rs +++ b/netfetch/src/store.rs @@ -96,13 +96,36 @@ pub struct InsertItem { pub val: CaDataValue, } +#[derive(Debug)] +pub struct MuteItem { + pub series: u64, + pub ts: u64, + pub ema: f32, + pub emd: f32, +} + +#[derive(Debug)] +pub struct IvlItem { + pub series: u64, + pub ts: u64, + pub ema: f32, + pub emd: f32, +} + +#[derive(Debug)] +pub enum QueryItem { + Insert(InsertItem), + Mute(MuteItem), + Ivl(IvlItem), +} + pub struct CommonInsertItemQueueSender { - sender: async_channel::Sender, + sender: async_channel::Sender, } impl CommonInsertItemQueueSender { #[inline(always)] - pub fn send(&self, k: InsertItem) -> async_channel::Send { + pub fn send(&self, k: QueryItem) -> async_channel::Send { self.sender.send(k) } @@ -113,8 +136,8 @@ impl CommonInsertItemQueueSender { } pub struct CommonInsertItemQueue { - sender: async_channel::Sender, - recv: async_channel::Receiver, + sender: async_channel::Sender, + recv: async_channel::Receiver, } impl CommonInsertItemQueue { @@ -132,7 +155,7 @@ impl CommonInsertItemQueue { } } - pub fn receiver(&self) -> async_channel::Receiver { + pub fn receiver(&self) -> async_channel::Receiver { self.recv.clone() } }