Start add support for http post ingest
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
[build]
|
||||
rustflags = [
|
||||
#"-C", "target-cpu=native",
|
||||
"-C", "target-cpu=skylake",
|
||||
#"-C", "target-cpu=skylake",
|
||||
#"-C", "force-frame-pointers=yes",
|
||||
#"-C", "force-unwind-tables=yes",
|
||||
#"-C", "relocation-model=static",
|
||||
|
||||
@@ -4,7 +4,7 @@ resolver = "2"
|
||||
|
||||
[profile.release]
|
||||
opt-level = 2
|
||||
debug = 1
|
||||
debug = 0
|
||||
overflow-checks = false
|
||||
debug-assertions = false
|
||||
lto = "thin"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "daqingest"
|
||||
version = "0.2.0-aa.2"
|
||||
version = "0.2.0-aa.3"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
@@ -29,5 +29,5 @@ dbpg = { path = "../dbpg" }
|
||||
series = { path = "../series" }
|
||||
netfetch = { path = "../netfetch" }
|
||||
serieswriter = { path = "../serieswriter" }
|
||||
ingest-bsread = { path = "../ingest-bsread" }
|
||||
#ingest-bsread = { path = "../ingest-bsread" }
|
||||
ingest-linux = { path = "../ingest-linux" }
|
||||
|
||||
9
mrucache/Cargo.toml
Normal file
9
mrucache/Cargo.toml
Normal file
@@ -0,0 +1,9 @@
|
||||
[package]
|
||||
name = "mrucache"
|
||||
version = "0.0.1"
|
||||
edition = "2021"
|
||||
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
||||
|
||||
[dependencies]
|
||||
hashbrown = "0.14"
|
||||
log = { path = "../log" }
|
||||
1
mrucache/src/lib.rs
Normal file
1
mrucache/src/lib.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod mucache;
|
||||
79
mrucache/src/mucache.rs
Normal file
79
mrucache/src/mucache.rs
Normal file
@@ -0,0 +1,79 @@
|
||||
use hashbrown::HashMap;
|
||||
use log::*;
|
||||
use std::hash::Hash;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::RwLock;
|
||||
use std::time::Instant;
|
||||
|
||||
fn tsref() -> Instant {
|
||||
static C: OnceLock<Instant> = OnceLock::new();
|
||||
let c = C.get_or_init(Instant::now);
|
||||
c.clone()
|
||||
}
|
||||
|
||||
fn dts_now() -> u64 {
|
||||
let tsref = tsref();
|
||||
let ts = Instant::now();
|
||||
let dt = ts.saturating_duration_since(tsref);
|
||||
let ts = 1000 * dt.as_secs() + dt.subsec_millis() as u64;
|
||||
ts
|
||||
}
|
||||
|
||||
pub struct MuCache<K, V> {
|
||||
cap: usize,
|
||||
map: RwLock<HashMap<K, (AtomicU64, V)>>,
|
||||
}
|
||||
|
||||
impl<K: Clone + Eq + Hash, V: Clone> MuCache<K, V> {
|
||||
pub fn new(cap: usize) -> Self {
|
||||
Self {
|
||||
cap,
|
||||
map: RwLock::new(HashMap::with_capacity(cap)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&self, k: K, v: V) -> Result<(), ()> {
|
||||
let ts = AtomicU64::new(dts_now());
|
||||
let mut map = self.map.write().unwrap();
|
||||
let nmax = self.cap * 5 / 4;
|
||||
if map.len() >= nmax {
|
||||
Self::remove_unused(&mut map, self.cap);
|
||||
}
|
||||
if map.len() >= nmax {
|
||||
warn!("no space in MuCache");
|
||||
Err(())
|
||||
} else {
|
||||
map.insert(k, (ts, v));
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, k: &K) -> Option<V> {
|
||||
let map = self.map.read().unwrap();
|
||||
match map.get(k) {
|
||||
Some((lu, v)) => {
|
||||
lu.store(dts_now(), Ordering::Release);
|
||||
Some(v.clone())
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_unused(map: &mut HashMap<K, (AtomicU64, V)>, cap: usize) {
|
||||
let map1 = core::mem::replace(map, HashMap::new());
|
||||
let mut items: Vec<_> = map1
|
||||
.into_iter()
|
||||
.map(|x| (x.1 .0.load(Ordering::Acquire), x.1 .1, x.0))
|
||||
.collect();
|
||||
items.sort_unstable_by_key(|x| x.0);
|
||||
let ts_cut = items[items.len() - cap].0;
|
||||
let map2 = items
|
||||
.into_iter()
|
||||
.filter(|x| x.0 > ts_cut)
|
||||
.map(|x| (x.2, (AtomicU64::new(x.0), x.1)))
|
||||
.collect();
|
||||
*map = map2;
|
||||
}
|
||||
}
|
||||
@@ -48,5 +48,6 @@ items_0 = { path = "../../daqbuffer/crates/items_0" }
|
||||
items_2 = { path = "../../daqbuffer/crates/items_2" }
|
||||
streams = { path = "../../daqbuffer/crates/streams" }
|
||||
taskrun = { path = "../../daqbuffer/crates/taskrun" }
|
||||
bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" }
|
||||
#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" }
|
||||
mrucache = { path = "../mrucache" }
|
||||
batchtools = { path = "../batchtools" }
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod postingest;
|
||||
pub mod status;
|
||||
|
||||
use crate::ca::conn::ChannelStateInfo;
|
||||
|
||||
97
netfetch/src/metrics/postingest.rs
Normal file
97
netfetch/src/metrics/postingest.rs
Normal file
@@ -0,0 +1,97 @@
|
||||
use async_channel::Receiver;
|
||||
use async_channel::Sender;
|
||||
use dbpg::seriesbychannel::ChannelInfoQuery;
|
||||
use err::thiserror;
|
||||
use err::ThisError;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use scywr::iteminsertqueue::DataValue;
|
||||
use scywr::iteminsertqueue::QueryItem;
|
||||
use scywr::iteminsertqueue::ScalarValue;
|
||||
use serieswriter::writer::SeriesWriter;
|
||||
use std::collections::VecDeque;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use std::time::SystemTime;
|
||||
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error {
|
||||
Msg,
|
||||
SeriesWriter(#[from] serieswriter::writer::Error),
|
||||
SendError,
|
||||
}
|
||||
|
||||
impl From<async_channel::SendError<QueryItem>> for Error {
|
||||
fn from(value: async_channel::SendError<QueryItem>) -> Self {
|
||||
Error::SendError
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EventValueItem {
|
||||
ts: TsNano,
|
||||
channel: String,
|
||||
val: DataValue,
|
||||
}
|
||||
|
||||
async fn process_api_query_items(
|
||||
backend: String,
|
||||
item_rx: Receiver<EventValueItem>,
|
||||
info_worker_tx: Sender<ChannelInfoQuery>,
|
||||
iiq_tx: Sender<QueryItem>,
|
||||
) -> Result<(), Error> {
|
||||
let mut item_qu = VecDeque::new();
|
||||
let mut sw_tick_last = Instant::now();
|
||||
|
||||
#[allow(irrefutable_let_patterns)]
|
||||
while let item = taskrun::tokio::time::timeout(Duration::from_millis(500), item_rx.recv()).await {
|
||||
let tsnow = Instant::now();
|
||||
if tsnow.saturating_duration_since(sw_tick_last) >= Duration::from_millis(5000) {
|
||||
sw_tick_last = tsnow;
|
||||
tick_writers(&mut item_qu)?;
|
||||
}
|
||||
let item = match item {
|
||||
Ok(Ok(item)) => item,
|
||||
Ok(Err(_)) => break,
|
||||
Err(_) => {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let scalar_type = item.val.scalar_type();
|
||||
let shape = item.val.shape();
|
||||
|
||||
// TODO cache the SeriesWriter.
|
||||
// Evict only from cache if older than some threshold.
|
||||
// If full, then reject the insert.
|
||||
let stnow = SystemTime::now();
|
||||
let mut sw = SeriesWriter::establish(
|
||||
info_worker_tx.clone(),
|
||||
backend.clone(),
|
||||
item.channel,
|
||||
scalar_type,
|
||||
shape,
|
||||
stnow,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let sw = &mut sw;
|
||||
sw.write(item.ts, item.ts, item.val, &mut item_qu)?;
|
||||
|
||||
for e in item_qu.drain(..).into_iter() {
|
||||
iiq_tx.send(e).await?;
|
||||
}
|
||||
}
|
||||
// let scalar_type = ScalarType::F32;
|
||||
// let shape = Shape::Scalar;
|
||||
|
||||
// TODO SeriesWriter need to get ticked.
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn tick_writers(iiq: &mut VecDeque<QueryItem>) -> Result<(), Error> {
|
||||
let sw: &mut SeriesWriter = err::todoval();
|
||||
sw.tick(iiq)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -19,4 +19,4 @@ series = { path = "../series" }
|
||||
err = { path = "../../daqbuffer/crates/err" }
|
||||
netpod = { path = "../../daqbuffer/crates/netpod" }
|
||||
taskrun = { path = "../../daqbuffer/crates/taskrun" }
|
||||
bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" }
|
||||
#bitshuffle = { path = "../../daqbuffer/crates/bitshuffle" }
|
||||
|
||||
@@ -193,6 +193,29 @@ impl DataValue {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn scalar_type(&self) -> ScalarType {
|
||||
match self {
|
||||
DataValue::Scalar(x) => match x {
|
||||
ScalarValue::I8(_) => ScalarType::I8,
|
||||
ScalarValue::I16(_) => ScalarType::I16,
|
||||
ScalarValue::I32(_) => ScalarType::I32,
|
||||
ScalarValue::F32(_) => ScalarType::F32,
|
||||
ScalarValue::F64(_) => ScalarType::F64,
|
||||
ScalarValue::Enum(_) => ScalarType::U16,
|
||||
ScalarValue::String(_) => ScalarType::STRING,
|
||||
ScalarValue::Bool(_) => ScalarType::BOOL,
|
||||
},
|
||||
DataValue::Array(x) => match x {
|
||||
ArrayValue::I8(_) => ScalarType::I8,
|
||||
ArrayValue::I16(_) => ScalarType::I16,
|
||||
ArrayValue::I32(_) => ScalarType::I32,
|
||||
ArrayValue::F32(_) => ScalarType::F32,
|
||||
ArrayValue::F64(_) => ScalarType::F64,
|
||||
ArrayValue::Bool(_) => ScalarType::BOOL,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn shape(&self) -> Shape {
|
||||
match self {
|
||||
DataValue::Scalar(_) => Shape::Scalar,
|
||||
|
||||
Reference in New Issue
Block a user