From cf0ed57e7ec6fdd244c98e0174804e316673a384 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 27 Mar 2024 16:14:56 +0100 Subject: [PATCH] Start add support for http post ingest --- .cargo/config.toml | 2 +- Cargo.toml | 2 +- daqingest/Cargo.toml | 4 +- mrucache/Cargo.toml | 9 +++ mrucache/src/lib.rs | 1 + mrucache/src/mucache.rs | 79 ++++++++++++++++++++++++ netfetch/Cargo.toml | 3 +- netfetch/src/metrics.rs | 1 + netfetch/src/metrics/postingest.rs | 97 ++++++++++++++++++++++++++++++ scywr/Cargo.toml | 2 +- scywr/src/iteminsertqueue.rs | 23 +++++++ 11 files changed, 217 insertions(+), 6 deletions(-) create mode 100644 mrucache/Cargo.toml create mode 100644 mrucache/src/lib.rs create mode 100644 mrucache/src/mucache.rs create mode 100644 netfetch/src/metrics/postingest.rs diff --git a/.cargo/config.toml b/.cargo/config.toml index 32b7a6e..41a8d07 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,7 +1,7 @@ [build] rustflags = [ #"-C", "target-cpu=native", - "-C", "target-cpu=skylake", + #"-C", "target-cpu=skylake", #"-C", "force-frame-pointers=yes", #"-C", "force-unwind-tables=yes", #"-C", "relocation-model=static", diff --git a/Cargo.toml b/Cargo.toml index 1ab4739..16f7b42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [profile.release] opt-level = 2 -debug = 1 +debug = 0 overflow-checks = false debug-assertions = false lto = "thin" diff --git a/daqingest/Cargo.toml b/daqingest/Cargo.toml index b68e179..8fc6934 100644 --- a/daqingest/Cargo.toml +++ b/daqingest/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "daqingest" -version = "0.2.0-aa.2" +version = "0.2.0-aa.3" authors = ["Dominik Werder "] edition = "2021" @@ -29,5 +29,5 @@ dbpg = { path = "../dbpg" } series = { path = "../series" } netfetch = { path = "../netfetch" } serieswriter = { path = "../serieswriter" } -ingest-bsread = { path = "../ingest-bsread" } +#ingest-bsread = { path = "../ingest-bsread" } ingest-linux = { path = "../ingest-linux" } diff --git a/mrucache/Cargo.toml b/mrucache/Cargo.toml new file mode 100644 index 0000000..4abf57b --- /dev/null +++ b/mrucache/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "mrucache" +version = "0.0.1" +edition = "2021" +authors = ["Dominik Werder "] + +[dependencies] +hashbrown = "0.14" +log = { path = "../log" } diff --git a/mrucache/src/lib.rs b/mrucache/src/lib.rs new file mode 100644 index 0000000..8d3db2f --- /dev/null +++ b/mrucache/src/lib.rs @@ -0,0 +1 @@ +pub mod mucache; diff --git a/mrucache/src/mucache.rs b/mrucache/src/mucache.rs new file mode 100644 index 0000000..02ca28c --- /dev/null +++ b/mrucache/src/mucache.rs @@ -0,0 +1,79 @@ +use hashbrown::HashMap; +use log::*; +use std::hash::Hash; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::OnceLock; +use std::sync::RwLock; +use std::time::Instant; + +fn tsref() -> Instant { + static C: OnceLock = OnceLock::new(); + let c = C.get_or_init(Instant::now); + c.clone() +} + +fn dts_now() -> u64 { + let tsref = tsref(); + let ts = Instant::now(); + let dt = ts.saturating_duration_since(tsref); + let ts = 1000 * dt.as_secs() + dt.subsec_millis() as u64; + ts +} + +pub struct MuCache { + cap: usize, + map: RwLock>, +} + +impl MuCache { + pub fn new(cap: usize) -> Self { + Self { + cap, + map: RwLock::new(HashMap::with_capacity(cap)), + } + } + + pub fn insert(&self, k: K, v: V) -> Result<(), ()> { + let ts = AtomicU64::new(dts_now()); + let mut map = self.map.write().unwrap(); + let nmax = self.cap * 5 / 4; + if map.len() >= nmax { + Self::remove_unused(&mut map, self.cap); + } + if map.len() >= nmax { + warn!("no space in MuCache"); + Err(()) + } else { + map.insert(k, (ts, v)); + Ok(()) + } + } + + pub fn get(&self, k: &K) -> Option { + let map = self.map.read().unwrap(); + match map.get(k) { + Some((lu, v)) => { + lu.store(dts_now(), Ordering::Release); + Some(v.clone()) + } + None => None, + } + } + + fn remove_unused(map: &mut HashMap, cap: usize) { + let map1 = core::mem::replace(map, HashMap::new()); + let mut items: Vec<_> = map1 + .into_iter() + .map(|x| (x.1 .0.load(Ordering::Acquire), x.1 .1, x.0)) + .collect(); + items.sort_unstable_by_key(|x| x.0); + let ts_cut = items[items.len() - cap].0; + let map2 = items + .into_iter() + .filter(|x| x.0 > ts_cut) + .map(|x| (x.2, (AtomicU64::new(x.0), x.1))) + .collect(); + *map = map2; + } +} diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index ebcda1e..b0ecc28 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -48,5 +48,6 @@ items_0 = { path = "../../daqbuffer/crates/items_0" } items_2 = { path = "../../daqbuffer/crates/items_2" } streams = { path = "../../daqbuffer/crates/streams" } taskrun = { path = "../../daqbuffer/crates/taskrun" } -bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } +#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } +mrucache = { path = "../mrucache" } batchtools = { path = "../batchtools" } diff --git a/netfetch/src/metrics.rs b/netfetch/src/metrics.rs index f7cb124..82b4aaa 100644 --- a/netfetch/src/metrics.rs +++ b/netfetch/src/metrics.rs @@ -1,3 +1,4 @@ +pub mod postingest; pub mod status; use crate::ca::conn::ChannelStateInfo; diff --git a/netfetch/src/metrics/postingest.rs b/netfetch/src/metrics/postingest.rs new file mode 100644 index 0000000..47a2aa8 --- /dev/null +++ b/netfetch/src/metrics/postingest.rs @@ -0,0 +1,97 @@ +use async_channel::Receiver; +use async_channel::Sender; +use dbpg::seriesbychannel::ChannelInfoQuery; +use err::thiserror; +use err::ThisError; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsNano; +use scywr::iteminsertqueue::DataValue; +use scywr::iteminsertqueue::QueryItem; +use scywr::iteminsertqueue::ScalarValue; +use serieswriter::writer::SeriesWriter; +use std::collections::VecDeque; +use std::time::Duration; +use std::time::Instant; +use std::time::SystemTime; + +#[derive(Debug, ThisError)] +pub enum Error { + Msg, + SeriesWriter(#[from] serieswriter::writer::Error), + SendError, +} + +impl From> for Error { + fn from(value: async_channel::SendError) -> Self { + Error::SendError + } +} + +#[derive(Debug)] +pub struct EventValueItem { + ts: TsNano, + channel: String, + val: DataValue, +} + +async fn process_api_query_items( + backend: String, + item_rx: Receiver, + info_worker_tx: Sender, + iiq_tx: Sender, +) -> Result<(), Error> { + let mut item_qu = VecDeque::new(); + let mut sw_tick_last = Instant::now(); + + #[allow(irrefutable_let_patterns)] + while let item = taskrun::tokio::time::timeout(Duration::from_millis(500), item_rx.recv()).await { + let tsnow = Instant::now(); + if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) { + sw_tick_last = tsnow; + tick_writers(&mut item_qu)?; + } + let item = match item { + Ok(Ok(item)) => item, + Ok(Err(_)) => break, + Err(_) => { + continue; + } + }; + let scalar_type = item.val.scalar_type(); + let shape = item.val.shape(); + + // TODO cache the SeriesWriter. + // Evict only from cache if older than some threshold. + // If full, then reject the insert. + let stnow = SystemTime::now(); + let mut sw = SeriesWriter::establish( + info_worker_tx.clone(), + backend.clone(), + item.channel, + scalar_type, + shape, + stnow, + ) + .await?; + + let sw = &mut sw; + sw.write(item.ts, item.ts, item.val, &mut item_qu)?; + + for e in item_qu.drain(..).into_iter() { + iiq_tx.send(e).await?; + } + } + // let scalar_type = ScalarType::F32; + // let shape = Shape::Scalar; + + // TODO SeriesWriter need to get ticked. + + Ok(()) +} + +fn tick_writers(iiq: &mut VecDeque) -> Result<(), Error> { + let sw: &mut SeriesWriter = err::todoval(); + sw.tick(iiq)?; + Ok(()) +} diff --git a/scywr/Cargo.toml b/scywr/Cargo.toml index 301e13d..748eed1 100644 --- a/scywr/Cargo.toml +++ b/scywr/Cargo.toml @@ -19,4 +19,4 @@ series = { path = "../series" } err = { path = "../../daqbuffer/crates/err" } netpod = { path = "../../daqbuffer/crates/netpod" } taskrun = { path = "../../daqbuffer/crates/taskrun" } -bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } +#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" } diff --git a/scywr/src/iteminsertqueue.rs b/scywr/src/iteminsertqueue.rs index e2a270f..bf2d589 100644 --- a/scywr/src/iteminsertqueue.rs +++ b/scywr/src/iteminsertqueue.rs @@ -193,6 +193,29 @@ impl DataValue { } } + pub fn scalar_type(&self) -> ScalarType { + match self { + DataValue::Scalar(x) => match x { + ScalarValue::I8(_) => ScalarType::I8, + ScalarValue::I16(_) => ScalarType::I16, + ScalarValue::I32(_) => ScalarType::I32, + ScalarValue::F32(_) => ScalarType::F32, + ScalarValue::F64(_) => ScalarType::F64, + ScalarValue::Enum(_) => ScalarType::U16, + ScalarValue::String(_) => ScalarType::STRING, + ScalarValue::Bool(_) => ScalarType::BOOL, + }, + DataValue::Array(x) => match x { + ArrayValue::I8(_) => ScalarType::I8, + ArrayValue::I16(_) => ScalarType::I16, + ArrayValue::I32(_) => ScalarType::I32, + ArrayValue::F32(_) => ScalarType::F32, + ArrayValue::F64(_) => ScalarType::F64, + ArrayValue::Bool(_) => ScalarType::BOOL, + }, + } + } + pub fn shape(&self) -> Shape { match self { DataValue::Scalar(_) => Shape::Scalar,