From a9f9d1ada668ed67bd14769d7096b32381ab7001 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 7 Feb 2022 21:35:25 +0100 Subject: [PATCH] Forward non-200 status in proxy. Start with event stats reader --- Cargo.toml | 16 +- archapp/src/archeng/indexfiles.rs | 19 +- archapp/src/archeng/pipe.rs | 2 + archapp/src/events.rs | 19 +- archapp/src/test.rs | 1 + daqbuffer/Cargo.toml | 2 +- daqbuffer/src/bin/daqbuffer.rs | 2 +- dbconn/src/lib.rs | 6 +- disk/src/agg/enp.rs | 38 ++- disk/src/aggtest.rs | 1 + disk/src/binned/binnedfrompbv.rs | 2 +- disk/src/binned/prebinned.rs | 8 + disk/src/channelexec.rs | 22 +- disk/src/decode.rs | 5 +- disk/src/gen.rs | 3 +- disk/src/raw/conn.rs | 5 + dq/Cargo.toml | 2 +- err/Cargo.toml | 5 - httpret/src/api1.rs | 18 +- httpret/src/evinfo.rs | 171 +++++++++++++ httpret/src/gather.rs | 39 ++- httpret/src/httpret.rs | 15 +- httpret/src/proxy.rs | 175 +++++++++---- httpret/src/proxy/api4.rs | 9 +- httpret/src/pulsemap.rs | 5 +- items/src/binnedevents.rs | 3 + items/src/lib.rs | 2 + items/src/plainevents.rs | 1 + items/src/scalarevents.rs | 3 +- items/src/statsevents.rs | 404 ++++++++++++++++++++++++++++++ netpod/src/netpod.rs | 10 + netpod/src/query.rs | 5 + parse/Cargo.toml | 2 +- taskrun/Cargo.toml | 3 +- taskrun/src/append.rs | 12 +- 35 files changed, 913 insertions(+), 122 deletions(-) create mode 100644 httpret/src/evinfo.rs create mode 100644 items/src/statsevents.rs diff --git a/Cargo.toml b/Cargo.toml index 4379c8c..2701596 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,18 +2,18 @@ members = ["daqbuffer", "h5out", "archapp", "archapp_xc", "archapp_wrap", "items", "items_proc", "nodenet", "httpclient", "fsio", "dq"] [profile.release] -opt-level = 2 -debug = 0 +opt-level = 1 +debug = 1 overflow-checks = false debug-assertions = false -lto = "thin" -codegen-units = 4 -incremental = false +lto = "off" +codegen-units = 64 +incremental = true -[profile.rel2] +[profile.release2] inherits = "release" opt-level = 1 -debug = 0 +debug = 2 overflow-checks = false debug-assertions = false lto = "off" @@ -21,4 +21,4 @@ codegen-units = 32 incremental = true [patch.crates-io] -tokio = { git = "https://github.com/dominikwerder/tokio", rev = "f069c87f" } +tokio = { git = "https://github.com/dominikwerder/tokio", rev = "995221d8" } diff --git a/archapp/src/archeng/indexfiles.rs b/archapp/src/archeng/indexfiles.rs index 7d28c7f..ae8587b 100644 --- a/archapp/src/archeng/indexfiles.rs +++ b/archapp/src/archeng/indexfiles.rs @@ -104,8 +104,18 @@ pub async fn get_level_1(lev0: Vec) -> Result, Error> { pub async fn database_connect(db_config: &Database) -> Result { let d = db_config; - let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); - let (cl, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls).await.errstr()?; + let dbport = 5432; + let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, dbport, d.name); + let (cl, conn) = tokio_postgres::connect(&uri, tokio_postgres::NoTls) + .await + .map_err(|e| { + error!( + "Can not connect to database postgresql://{}:...@{}:{}/{}", + d.user, d.host, dbport, d.name + ); + e + }) + .errstr()?; // TODO monitor connection drop. let _cjh = tokio::spawn(async move { if let Err(e) = conn.await { @@ -542,7 +552,10 @@ fn categorize_index_files(list: &Vec) -> Result, Error> { } pub async fn index_file_path_list(channel: Channel, dbconf: Database) -> Result, Error> { - let dbc = database_connect(&dbconf).await?; + let dbc = database_connect(&dbconf).await.map_err(|e| { + error!("CAN NOT CONNECT TO DATABASE [{e:?}]"); + e + })?; let sql = "select i.path from indexfiles i, channels c, channel_index_map m where c.name = $1 and m.channel = c.rowid and i.rowid = m.index"; let rows = dbc.query(sql, &[&channel.name()]).await.errstr()?; let mut index_paths = vec![]; diff --git a/archapp/src/archeng/pipe.rs b/archapp/src/archeng/pipe.rs index 26ddf7d..8ff8341 100644 --- a/archapp/src/archeng/pipe.rs +++ b/archapp/src/archeng/pipe.rs @@ -86,6 +86,7 @@ pub async fn make_event_pipe( } AggKind::DimXBinsN(_) => err::todoval(), AggKind::EventBlobs => err::todoval(), + AggKind::Stats1 => err::todoval(), }, Shape::Wave(_n1) => match evq.agg_kind { AggKind::Plain => Box::pin(filtered) as Pin + Send>>, @@ -286,6 +287,7 @@ pub async fn make_event_pipe( Box::pin(tr) as _ } AggKind::EventBlobs => err::todoval(), + AggKind::Stats1 => err::todoval(), }, _ => { error!("TODO shape {:?}", channel_config.shape); diff --git a/archapp/src/events.rs b/archapp/src/events.rs index 9c552e8..1636dbf 100644 --- a/archapp/src/events.rs +++ b/archapp/src/events.rs @@ -182,10 +182,23 @@ macro_rules! arm2 { macro_rules! arm1 { ($item:expr, $sty1:ident, $sty2:ident, $shape:expr, $ak:expr) => {{ + if let AggKind::Stats1 = $ak { + err::todo(); + return arm2!( + $item, + ScalarEvents, + Plain, + PlainEvents, + Scalar, + ScalarPlainEvents, + $sty1, + $sty2 + ); + } match $shape { Shape::Scalar => match $ak { AggKind::EventBlobs => { - warn!("arm1 unhandled EventBlobs"); + warn!("arm1 unhandled AggKind::EventBlobs"); panic!() } AggKind::Plain => arm2!( @@ -228,6 +241,8 @@ macro_rules! arm1 { $sty1, $sty2 ), + // Handled above.. + AggKind::Stats1 => panic!(), }, Shape::Wave(_) => match $ak { AggKind::EventBlobs => { @@ -274,6 +289,8 @@ macro_rules! arm1 { $sty1, $sty2 ), + // Handled above.. + AggKind::Stats1 => panic!(), }, Shape::Image(..) => { // There should be no images on archiver. diff --git a/archapp/src/test.rs b/archapp/src/test.rs index da2a5e3..c5b3b34 100644 --- a/archapp/src/test.rs +++ b/archapp/src/test.rs @@ -15,6 +15,7 @@ fn read_pb_00() -> Result<(), Error> { let block1 = async move { let homedir = std::env::var("HOME").unwrap(); let path = PathBuf::from(homedir) + .join("daqbuffer-testdata") .join("archappdata") .join("lts") .join("ArchiverStore") diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index a5d8856..abf28fc 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -5,7 +5,7 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -tokio = { version = "=1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } hyper = "0.14" http = "0.2" tracing = "0.1.25" diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 7b68a97..3fd0be7 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -99,7 +99,7 @@ async fn go() -> Result<(), Error> { } SubCmd::Logappend(k) => { let jh = tokio::task::spawn_blocking(move || { - taskrun::append::append(&k.dir, std::io::stdin(), std::io::stderr()).unwrap(); + taskrun::append::append(&k.dir, std::io::stdin()).unwrap(); }); jh.await.map_err(Error::from_string)?; } diff --git a/dbconn/src/lib.rs b/dbconn/src/lib.rs index 5b4d7fa..639f84e 100644 --- a/dbconn/src/lib.rs +++ b/dbconn/src/lib.rs @@ -48,7 +48,11 @@ pub async fn delay_io_medium() { pub async fn create_connection(db_config: &Database) -> Result { let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); - let (cl, conn) = tokio_postgres::connect(&uri, NoTls).await.errconv()?; + let (cl, conn) = tokio_postgres::connect(&uri, NoTls) + .await + .map_err(|e| format!("Can not connect to database: {e:?}")) + //.errconv() + ?; // TODO monitor connection drop. let _cjh = tokio::spawn(async move { if let Err(e) = conn.await { diff --git a/disk/src/agg/enp.rs b/disk/src/agg/enp.rs index df63906..cfc77d6 100644 --- a/disk/src/agg/enp.rs +++ b/disk/src/agg/enp.rs @@ -1,6 +1,7 @@ -use items::numops::NumOps; use items::scalarevents::ScalarEvents; +use items::waveevents::WaveEvents; use items::EventsNodeProcessor; +use items::{numops::NumOps, statsevents::StatsEvents}; use netpod::{AggKind, Shape}; use std::marker::PhantomData; @@ -23,3 +24,38 @@ where inp } } + +pub struct Stats1Scalar {} + +impl EventsNodeProcessor for Stats1Scalar { + type Input = StatsEvents; + type Output = StatsEvents; + + fn create(_shape: Shape, _agg_kind: AggKind) -> Self { + Self {} + } + + fn process(&self, inp: Self::Input) -> Self::Output { + inp + } +} + +pub struct Stats1Wave { + _m1: PhantomData, +} + +impl EventsNodeProcessor for Stats1Wave +where + NTY: NumOps, +{ + type Input = WaveEvents; + type Output = StatsEvents; + + fn create(_shape: Shape, _agg_kind: AggKind) -> Self { + Self { _m1: PhantomData } + } + + fn process(&self, _inp: Self::Input) -> Self::Output { + err::todoval() + } +} diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 8a83ab0..30f395d 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -11,6 +11,7 @@ pub fn make_test_node(id: u32) -> Node { listen: "0.0.0.0".into(), port: 8800 + id as u16, port_raw: 8800 + id as u16 + 100, + // TODO use a common function to supply the tmp path. data_base_path: format!("../tmpdata/node{:02}", id).into(), cache_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 7f0a9d9..1db22f6 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -107,7 +107,7 @@ where continue 'outer; } else { let msg = - format!("PreBinnedValueFetchedStream got non-OK result from sub request: {res:?}"); + format!("PreBinnedValueFetchedStream non-OK result from sub request: {res:?}"); error!("{msg}"); let e = Error::with_msg_no_trace(msg); self.errored = true; diff --git a/disk/src/binned/prebinned.rs b/disk/src/binned/prebinned.rs index fbb52a5..8ae3bad 100644 --- a/disk/src/binned/prebinned.rs +++ b/disk/src/binned/prebinned.rs @@ -81,6 +81,10 @@ where AggKind::Plain => { panic!(); } + AggKind::Stats1 => { + // Currently not meant to be binned. + panic!(); + } } } Shape::Wave(n) => { @@ -112,6 +116,10 @@ where AggKind::Plain => { panic!(); } + AggKind::Stats1 => { + // Currently not meant to be binned. + panic!(); + } } } Shape::Image(..) => { diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 85dcdd9..1ee041f 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -9,8 +9,8 @@ use err::Error; use futures_core::Stream; use futures_util::future::FutureExt; use futures_util::StreamExt; -use items::scalarevents::ScalarEvents; use items::numops::{BoolNum, NumOps}; +use items::scalarevents::ScalarEvents; use items::streams::{Collectable, Collector}; use items::{ Clearable, EventsNodeProcessor, Framable, FrameType, PushableIndex, RangeCompletableItem, Sitemty, StreamItem, @@ -99,55 +99,55 @@ where { match shape { Shape::Scalar => { - // + let evs = EventValuesDim0Case::new(); match agg_kind { AggKind::EventBlobs => panic!(), AggKind::Plain => { - let evs = EventValuesDim0Case::new(); let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } AggKind::TimeWeightedScalar => { - let evs = EventValuesDim0Case::new(); let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } AggKind::DimXBins1 => { - let evs = EventValuesDim0Case::new(); let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } AggKind::DimXBinsN(_) => { - let evs = EventValuesDim0Case::new(); let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } + AggKind::Stats1 => { + let events_node_proc = < as EventValueShape>::NumXAggToStats1 as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) + } } } Shape::Wave(n) => { - // + let evs = EventValuesDim1Case::new(n); match agg_kind { AggKind::EventBlobs => panic!(), AggKind::Plain => { - let evs = EventValuesDim1Case::new(n); let events_node_proc = < as EventValueShape>::NumXAggPlain as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } AggKind::TimeWeightedScalar => { - let evs = EventValuesDim1Case::new(n); let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } AggKind::DimXBins1 => { - let evs = EventValuesDim1Case::new(n); let events_node_proc = < as EventValueShape>::NumXAggToSingleBin as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } AggKind::DimXBinsN(_) => { - let evs = EventValuesDim1Case::new(n); let events_node_proc = < as EventValueShape>::NumXAggToNBins as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) } + AggKind::Stats1 => { + let events_node_proc = < as EventValueShape>::NumXAggToStats1 as EventsNodeProcessor>::create(shape.clone(), agg_kind.clone()); + channel_exec_nty_end_evs_enp(f, byte_order, scalar_type, shape, evs, events_node_proc) + } } } Shape::Image(..) => { diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 3528e5c..a699976 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -5,9 +5,9 @@ use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::eventsitem::EventsItem; -use items::scalarevents::ScalarEvents; use items::numops::{BoolNum, NumOps}; use items::plainevents::{PlainEvents, ScalarPlainEvents}; +use items::scalarevents::ScalarEvents; use items::waveevents::{WaveEvents, WaveNBinner, WavePlainProc, WaveXBinner}; use items::{Appendable, EventAppendable, EventsNodeProcessor, RangeCompletableItem, Sitemty, StreamItem}; use netpod::{ScalarType, Shape}; @@ -140,6 +140,7 @@ where type NumXAggToSingleBin: EventsNodeProcessor>::Batch>; type NumXAggToNBins: EventsNodeProcessor>::Batch>; type NumXAggPlain: EventsNodeProcessor>::Batch>; + type NumXAggToStats1: EventsNodeProcessor>::Batch>; } pub struct EventValuesDim0Case { @@ -160,6 +161,7 @@ where // TODO is this sufficient? type NumXAggToNBins = Identity; type NumXAggPlain = Identity; + type NumXAggToStats1 = Identity; } pub struct EventValuesDim1Case { @@ -180,6 +182,7 @@ where type NumXAggToSingleBin = WaveXBinner; type NumXAggToNBins = WaveNBinner; type NumXAggPlain = WavePlainProc; + type NumXAggToStats1 = crate::agg::enp::Stats1Wave; } pub struct EventsDecodedStream diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 0b1f44c..4de8729 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -18,7 +18,8 @@ pub fn gen_test_data_test() { } pub async fn gen_test_data() -> Result<(), Error> { - let data_base_path = PathBuf::from("tmpdata"); + let homedir = std::env::var("HOME").unwrap(); + let data_base_path = PathBuf::from(homedir).join("daqbuffer-testdata").join("databuffer"); let ksprefix = String::from("ks"); let mut ensemble = Ensemble { nodes: vec![], diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index c375bc0..64b4d69 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -68,6 +68,11 @@ macro_rules! pipe4 { <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggPlain::create($shape, $agg_kind), $event_blobs, ), + AggKind::Stats1 => make_num_pipeline_stream_evs::<$nty, $end, $evs<$nty>, _>( + $evsv, + <$evs<$nty> as EventValueShape<$nty, $end>>::NumXAggToStats1::create($shape, $agg_kind), + $event_blobs, + ), } }; } diff --git a/dq/Cargo.toml b/dq/Cargo.toml index 8546793..e1a1951 100644 --- a/dq/Cargo.toml +++ b/dq/Cargo.toml @@ -10,7 +10,7 @@ path = "src/dq.rs" [dependencies] #serde = { version = "1.0", features = ["derive"] } #serde_json = "1.0" -tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } futures-util = "0.3.14" clap = { version = "3.0.6", features = ["derive", "cargo"] } chrono = "0.4.19" diff --git a/err/Cargo.toml b/err/Cargo.toml index 82e97ef..023c083 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -5,8 +5,6 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -#hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } -#http = "0.2" backtrace = "0.3.56" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -16,6 +14,3 @@ chrono = { version = "0.4.19", features = ["serde"] } url = "2.2" regex = "1.5.4" bincode = "1.3.3" -#tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } -#tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } -#nom = "6.1.2" diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 8d27df8..acc46f2 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -122,14 +122,19 @@ pub async fn channel_search_list_v1(req: Request, proxy_config: &ProxyConf a })?; let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect(); - let nt = |res| { + let nt = |tag, res| { let fut = async { let body = hyper::body::to_bytes(res).await?; let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, Err(_) => ChannelSearchResult { channels: vec![] }, }; - Ok(res) + let ret = SubRes { + tag, + status: StatusCode::OK, + val: res, + }; + Ok(ret) }; Box::pin(fut) as Pin + Send>> }; @@ -216,14 +221,19 @@ pub async fn channel_search_configs_v1( a })?; let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect(); - let nt = |res| { + let nt = |tag, res| { let fut = async { let body = hyper::body::to_bytes(res).await?; let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, Err(_) => ChannelSearchResult { channels: vec![] }, }; - Ok(res) + let ret = SubRes { + tag, + status: StatusCode::OK, + val: res, + }; + Ok(ret) }; Box::pin(fut) as Pin + Send>> }; diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs new file mode 100644 index 0000000..524c40a --- /dev/null +++ b/httpret/src/evinfo.rs @@ -0,0 +1,171 @@ +use crate::err::Error; +use crate::response; +use bytes::Bytes; +use disk::channelexec::channel_exec; +use disk::channelexec::collect_plain_events_json; +use disk::channelexec::ChannelExecFunction; +use disk::decode::Endianness; +use disk::decode::EventValueFromBytes; +use disk::decode::EventValueShape; +use disk::decode::NumFromBytes; +use disk::events::PlainEventsJsonQuery; +use disk::merge::mergedfromremotes::MergedFromRemotes; +use futures_util::FutureExt; +use futures_util::Stream; +use futures_util::TryStreamExt; +use http::{Method, StatusCode}; +use hyper::{Body, Request, Response}; +use items::numops::NumOps; +use items::streams::Collectable; +use items::Clearable; +use items::EventsNodeProcessor; +use items::Framable; +use items::FrameType; +use items::PushableIndex; +use items::Sitemty; +use items::TimeBinnableType; +use netpod::log::*; +use netpod::query::RawEventsQuery; +use netpod::AggKind; +use netpod::Channel; +use netpod::NanoRange; +use netpod::NodeConfigCached; +use netpod::PerfOpts; +use netpod::ScalarType; +use netpod::Shape; +use serde::de::DeserializeOwned; +use std::fmt::Debug; +use std::pin::Pin; +use std::time::Duration; +use url::Url; + +pub struct EventInfoScan {} + +impl EventInfoScan { + pub fn handler(req: &Request) -> Option { + if req.uri().path().starts_with("/api/4/event/info") { + Some(Self {}) + } else { + None + } + } + + pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("EventInfoScan::handle"); + if req.method() != Method::GET { + return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); + } + let (head, _body) = req.into_parts(); + let url = Url::parse(&format!("dummy:{}", head.uri))?; + let query = PlainEventsJsonQuery::from_url(&url)?; + let ret = match Self::exec(&query, node_config).await { + Ok(stream) => { + // + let stream = stream.map_ok(|_| Bytes::new()); + response(StatusCode::OK).body(Body::wrap_stream(stream))? + } + Err(e) => response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::from(format!("{:?}", e)))?, + }; + Ok(ret) + } + + pub async fn exec( + query: &PlainEventsJsonQuery, + node_config: &NodeConfigCached, + ) -> Result> + Send>>, Error> { + let ret = channel_exec( + EvInfoFunc::new(query.clone(), query.timeout(), node_config.clone()), + query.channel(), + query.range(), + AggKind::Stats1, + node_config, + ) + .await?; + Ok(Box::pin(ret.map_err(Error::from))) + } +} + +pub struct EvInfoFunc { + query: PlainEventsJsonQuery, + timeout: Duration, + node_config: NodeConfigCached, +} + +impl EvInfoFunc { + pub fn new(query: PlainEventsJsonQuery, timeout: Duration, node_config: NodeConfigCached) -> Self { + Self { + query, + timeout, + node_config, + } + } + + pub fn channel(&self) -> &Channel { + &self.query.channel() + } + + pub fn range(&self) -> &NanoRange { + &self.query.range() + } +} + +impl ChannelExecFunction for EvInfoFunc { + type Output = Pin> + Send>>; + + fn exec( + self, + byte_order: END, + _scalar_type: ScalarType, + _shape: Shape, + event_value_shape: EVS, + _events_node_proc: ENP, + ) -> Result + where + NTY: NumOps + NumFromBytes + 'static, + END: Endianness + 'static, + EVS: EventValueShape + EventValueFromBytes + 'static, + ENP: EventsNodeProcessor>::Batch> + 'static, + // TODO require these things in general? + ::Output: Debug + Collectable + PushableIndex + Clearable, + <::Output as TimeBinnableType>::Output: Debug + + TimeBinnableType::Output as TimeBinnableType>::Output> + + Collectable + + Unpin, + Sitemty<::Output>: FrameType + Framable + 'static, + Sitemty<<::Output as TimeBinnableType>::Output>: + FrameType + Framable + DeserializeOwned, + { + let _ = byte_order; + let _ = event_value_shape; + let perf_opts = PerfOpts { inmem_bufcap: 4096 }; + let evq = RawEventsQuery { + channel: self.query.channel().clone(), + range: self.query.range().clone(), + agg_kind: AggKind::Plain, + disk_io_buffer_size: self.query.disk_io_buffer_size(), + do_decompress: true, + }; + + // TODO Use a Merged-From-Multiple-Local-Splits. + // TODO Pass the read buffer size from query parameter: GPFS needs a larger buffer.. + // TODO Must issue multiple reads to GPFS, keep futures in a ordered queue. + + let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); + let f = collect_plain_events_json(s, self.timeout, 0, self.query.do_log()); + let f = FutureExt::map(f, |item| match item { + Ok(item) => { + // TODO add channel entry info here? + //let obj = item.as_object_mut().unwrap(); + //obj.insert("channelName", JsonValue::String(en)); + Ok(Bytes::from(serde_json::to_vec(&item)?)) + } + Err(e) => Err(e.into()), + }); + let s = futures_util::stream::once(f); + Ok(Box::pin(s)) + } + + fn empty() -> Self::Output { + Box::pin(futures_util::stream::empty()) + } +} diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs index 09e927f..f45a484 100644 --- a/httpret/src/gather.rs +++ b/httpret/src/gather.rs @@ -4,6 +4,7 @@ use futures_util::{select, FutureExt}; use http::{Method, StatusCode}; use hyper::{Body, Client, Request, Response}; use hyper_tls::HttpsConnector; +use netpod::log::*; use netpod::{Node, NodeConfigCached, APP_JSON}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; @@ -164,6 +165,7 @@ pub async fn gather_get_json(req: Request, node_config: &NodeConfigCached) pub struct SubRes { pub tag: String, + pub status: StatusCode, pub val: T, } @@ -178,7 +180,11 @@ pub async fn gather_get_json_generic( ) -> Result, Error> where SM: Send + 'static, - NT: Fn(Response) -> Pin> + Send>> + Send + Sync + Copy + 'static, + NT: Fn(String, Response) -> Pin, Error>> + Send>> + + Send + + Sync + + Copy + + 'static, FT: Fn(Vec>) -> Result, Error>, { assert!(urls.len() == bodies.len()); @@ -223,10 +229,7 @@ where client.request(req?).fuse() } } => { - let ret = SubRes { - tag: tag, - val:nt(res?).await?, - }; + let ret = nt(tag, res?).await?; Ok(ret) } } @@ -234,18 +237,23 @@ where (url, task) }) .collect(); - let mut a = vec![]; + let mut a: Vec> = vec![]; for (_schemehostport, jh) in spawned { - let res = match jh.await { + let res: SubRes = match jh.await { Ok(k) => match k { Ok(k) => k, - Err(e) => return Err(e), + Err(e) => { + warn!("{e:?}"); + return Err(e); + } }, - Err(e) => return Err(e.into()), + Err(e) => { + warn!("{e:?}"); + return Err(e.into()); + } }; a.push(res); } - let a = a; ft(a) } @@ -260,8 +268,15 @@ mod test { vec![], vec![], vec![], - |_res| { - let fut = async { Ok(()) }; + |tag, _res| { + let fut = async { + let ret = SubRes { + tag, + status: StatusCode::OK, + val: (), + }; + Ok(ret) + }; Box::pin(fut) }, |_all| { diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index 3ee38a0..cc866a8 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -1,6 +1,7 @@ pub mod api1; pub mod channelarchiver; pub mod err; +pub mod evinfo; pub mod gather; pub mod proxy; pub mod pulsemap; @@ -50,14 +51,20 @@ pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { let addr = SocketAddr::from_str(&format!("{}:{}", node_config.node.listen, node_config.node.port))?; let make_service = make_service_fn({ move |conn: &AddrStream| { - // TODO send to logstash debug!("new connection from {:?}", conn.remote_addr()); let node_config = node_config.clone(); let addr = conn.remote_addr(); async move { Ok::<_, Error>(service_fn({ move |req| { - info!("REQUEST {:?} {:?}", addr, req.uri()); + // TODO send to logstash + info!( + "REQUEST {:?} - {:?} - {:?} - {:?}", + addr, + req.method(), + req.uri(), + req.headers() + ); let f = http_service(req, node_config.clone()); Cont { f: Box::pin(f) } } @@ -170,8 +177,6 @@ macro_rules! static_http_api1 { } async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - // TODO send to logstash - debug!("http_service_try {:?}", req.uri()); let uri = req.uri().clone(); let path = uri.path(); if path == "/api/4/node_status" { @@ -289,6 +294,8 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) -> } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if let Some(h) = evinfo::EventInfoScan::handler(&req) { + h.handle(req, &node_config).await } else if let Some(h) = pulsemap::IndexFullHttpFunction::handler(&req) { h.handle(req, &node_config).await } else if let Some(h) = pulsemap::MarkClosedHttpFunction::handler(&req) { diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 2eb0411..776a06d 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -41,6 +41,14 @@ pub async fn proxy(proxy_config: ProxyConfig) -> Result<(), Error> { async move { Ok::<_, Error>(service_fn({ move |req| { + // TODO send to logstash + info!( + "REQUEST {:?} - {:?} - {:?} - {:?}", + addr, + req.method(), + req.uri(), + req.headers() + ); let f = proxy_http_service(req, proxy_config.clone()); Cont { f: Box::pin(f) } } @@ -119,6 +127,26 @@ async fn proxy_http_service_try(req: Request, proxy_config: &ProxyConfig) } else { Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?) } + } else if path.starts_with("/api/4/test/http/204") { + Ok(response(StatusCode::NO_CONTENT).body(Body::from("No Content"))?) + } else if path.starts_with("/api/4/test/http/400") { + Ok(response(StatusCode::BAD_REQUEST).body(Body::from("Bad Request"))?) + } else if path.starts_with("/api/4/test/http/405") { + Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::from("Method Not Allowed"))?) + } else if path.starts_with("/api/4/test/http/406") { + Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::from("Not Acceptable"))?) + } else if path.starts_with("/api/4/test/log/error") { + error!("{path}"); + Ok(response(StatusCode::OK).body(Body::empty())?) + } else if path.starts_with("/api/4/test/log/warn") { + warn!("{path}"); + Ok(response(StatusCode::OK).body(Body::empty())?) + } else if path.starts_with("/api/4/test/log/info") { + info!("{path}"); + Ok(response(StatusCode::OK).body(Body::empty())?) + } else if path.starts_with("/api/4/test/log/debug") { + debug!("{path}"); + Ok(response(StatusCode::OK).body(Body::empty())?) } else if path.starts_with(DISTRI_PRE) { proxy_distribute_v2(req).await } else { @@ -241,55 +269,74 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R }); } let tags = urls.iter().map(|k| k.to_string()).collect(); - let nt = |res| { + let nt = |tag, res| { let fut = async { let body = hyper::body::to_bytes(res).await?; //info!("got a result {:?}", body); - let res: ChannelSearchResult = match serde_json::from_slice::(&body) { - Ok(k) => k, - Err(_) => { - #[derive(Deserialize)] - struct ResItemApi0 { - name: String, - source: String, - backend: String, - #[serde(rename = "type")] - ty: String, + let res: SubRes = + match serde_json::from_slice::(&body) { + Ok(val) => { + let ret = SubRes { + tag, + status: StatusCode::OK, + val, + }; + ret } - #[derive(Deserialize)] - struct ResContApi0 { - #[allow(dead_code)] - backend: String, - channels: Vec, - } - match serde_json::from_slice::>(&body) { - Ok(k) => { - let mut a = vec![]; - if let Some(g) = k.first() { - for c in &g.channels { - let z = ChannelSearchSingleResult { - backend: c.backend.clone(), - description: String::new(), - name: c.name.clone(), - shape: vec![], - source: c.source.clone(), - ty: c.ty.clone(), - unit: String::new(), - is_api_0: Some(true), - }; - a.push(z); + Err(_) => { + #[derive(Deserialize)] + struct ResItemApi0 { + name: String, + source: String, + backend: String, + #[serde(rename = "type")] + ty: String, + } + #[derive(Deserialize)] + struct ResContApi0 { + #[allow(dead_code)] + backend: String, + channels: Vec, + } + match serde_json::from_slice::>(&body) { + Ok(k) => { + let mut a = vec![]; + if let Some(g) = k.first() { + for c in &g.channels { + let z = ChannelSearchSingleResult { + backend: c.backend.clone(), + description: String::new(), + name: c.name.clone(), + shape: vec![], + source: c.source.clone(), + ty: c.ty.clone(), + unit: String::new(), + is_api_0: Some(true), + }; + a.push(z); + } } + let ret = ChannelSearchResult { channels: a }; + let ret = SubRes { + tag, + status: StatusCode::OK, + val: ret, + }; + ret + } + Err(_) => { + error!("Channel search response parse failed"); + let ret = ChannelSearchResult { channels: vec![] }; + let ret = SubRes { + tag, + status: StatusCode::OK, + val: ret, + }; + ret } - let ret = ChannelSearchResult { channels: a }; - ret - } - Err(_) => { - error!("Channel search response parse failed"); - ChannelSearchResult { channels: vec![] } } } - } - }; + }; Ok(res) }; Box::pin(fut) as Pin + Send>> @@ -428,22 +475,46 @@ where a })?; let tags: Vec<_> = urls.iter().map(|k| k.to_string()).collect(); - let nt = |res| { + let nt = |tag: String, res: Response| { let fut = async { - let body = hyper::body::to_bytes(res).await?; - match serde_json::from_slice::(&body) { - Ok(k) => Ok(k), - Err(e) => Err(e.into()), + let (head, body) = res.into_parts(); + if head.status == StatusCode::OK { + let body = hyper::body::to_bytes(body).await?; + match serde_json::from_slice::(&body) { + Ok(val) => { + let ret = SubRes { + tag, + status: head.status, + val, + }; + Ok(ret) + } + Err(e) => { + warn!("can not parse response: {e:?}"); + Err(e.into()) + } + } + } else { + let body = hyper::body::to_bytes(body).await?; + let b = String::from_utf8_lossy(&body); + let ret = SubRes { + tag, + status: head.status, + // TODO would like to pass arbitrary type of body in these cases: + val: serde_json::Value::String(format!("{}", b)), + }; + Ok(ret) } }; - Box::pin(fut) as Pin + Send>> + Box::pin(fut) as Pin, Error>> + Send>> }; let ft = |mut all: Vec>| { if all.len() > 0 { all.truncate(1); let z = all.pop().unwrap(); let res = z.val; - let res = response(StatusCode::OK) + // TODO want to pass arbitrary body type: + let res = response(z.status) .header(http::header::CONTENT_TYPE, APP_JSON) .body(Body::from(serde_json::to_string(&res)?))?; return Ok(res); @@ -472,9 +543,11 @@ fn get_query_host_for_backend(backend: &str, proxy_config: &ProxyConfig) -> Resu return Err(Error::with_msg(format!("host not found for backend {:?}", backend))); } -fn get_query_host_for_backend_2(backend: &str, _proxy_config: &ProxyConfig) -> Result { - if backend == "sf-databuffer" { - return Ok("https://sf-data-api.psi.ch".into()); +fn get_query_host_for_backend_2(backend: &str, proxy_config: &ProxyConfig) -> Result { + for back in &proxy_config.backends2 { + if back.name == backend { + return Ok(back.url.clone()); + } } return Err(Error::with_msg(format!("host not found for backend {:?}", backend))); } diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs index 89251ba..b55a725 100644 --- a/httpret/src/proxy/api4.rs +++ b/httpret/src/proxy/api4.rs @@ -42,7 +42,7 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R a })?; let tags = urls.iter().map(|k| k.to_string()).collect(); - let nt = |res| { + let nt = |tag, res| { let fut = async { let body = hyper::body::to_bytes(res).await?; //info!("got a result {:?}", body); @@ -54,7 +54,12 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R return Err(Error::with_msg_no_trace(msg)); } }; - Ok(res) + let ret = SubRes { + tag, + status: StatusCode::OK, + val: res, + }; + Ok(ret) }; Box::pin(fut) as Pin + Send>> }; diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 34e6639..fceb473 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -326,7 +326,7 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) info!("update_task break A"); break; } - tokio::time::sleep(Duration::from_millis(10000)).await; + tokio::time::sleep(Duration::from_millis(60000)).await; if do_abort.load(Ordering::SeqCst) != 0 { info!("update_task break B"); break; @@ -464,7 +464,6 @@ impl FromUrl for MapPulseQuery { .into(); let pulse: u64 = pulsestr.parse()?; let ret = Self { backend, pulse }; - info!("GOT {:?}", ret); Ok(ret) } } @@ -647,7 +646,7 @@ impl Api4MapPulseHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); + //info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = MapPulseQuery::from_url(&url)?; let histo = MapPulseHistoHttpFunction::histo(q.pulse, node_config).await?; diff --git a/items/src/binnedevents.rs b/items/src/binnedevents.rs index b523f4f..bcf45af 100644 --- a/items/src/binnedevents.rs +++ b/items/src/binnedevents.rs @@ -25,6 +25,7 @@ impl SingleBinWaveEvents { items_proc::tycases1!(self, Self, (k), { "$id".into() }) } + // TODO possible to remove? fn x_aggregate(self, ak: &AggKind) -> EventsItem { items_proc::tycases1!(self, Self, (k), { match ak { @@ -33,6 +34,7 @@ impl SingleBinWaveEvents { AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), + AggKind::Stats1 => err::todoval(), } }) } @@ -121,6 +123,7 @@ impl MultiBinWaveEvents { AggKind::TimeWeightedScalar => err::todoval(), AggKind::DimXBins1 => err::todoval(), AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), + AggKind::Stats1 => err::todoval(), } }) } diff --git a/items/src/lib.rs b/items/src/lib.rs index 4848736..1c9d7fb 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -8,6 +8,7 @@ pub mod minmaxavgwavebins; pub mod numops; pub mod plainevents; pub mod scalarevents; +pub mod statsevents; pub mod streams; pub mod waveevents; pub mod xbinnedscalarevents; @@ -41,6 +42,7 @@ pub const MIN_MAX_AVG_WAVE_BINS: u32 = 0xa00; pub const MIN_MAX_AVG_DIM_1_BINS_FRAME_TYPE_ID: u32 = 0xb00; pub const EVENT_FULL_FRAME_TYPE_ID: u32 = 0x2200; pub const EVENTS_ITEM_FRAME_TYPE_ID: u32 = 0x2300; +pub const STATS_EVENTS_FRAME_TYPE_ID: u32 = 0x2400; pub fn bool_is_false(j: &bool) -> bool { *j == false diff --git a/items/src/plainevents.rs b/items/src/plainevents.rs index b0a5dd4..012f9bc 100644 --- a/items/src/plainevents.rs +++ b/items/src/plainevents.rs @@ -132,6 +132,7 @@ impl WavePlainEvents { EventsItem::XBinnedEvents(XBinnedEvents::SingleBinWave(SingleBinWaveEvents::$id(j))) } AggKind::DimXBinsN(_) => EventsItem::Plain(PlainEvents::Wave(err::todoval())), + AggKind::Stats1 => err::todoval(), } }) } diff --git a/items/src/scalarevents.rs b/items/src/scalarevents.rs index b547669..7fe6e1f 100644 --- a/items/src/scalarevents.rs +++ b/items/src/scalarevents.rs @@ -13,8 +13,7 @@ use serde::{Deserialize, Serialize}; use std::fmt; use tokio::fs::File; -// TODO add pulse. Is this even used?? -// TODO change name, it's not only about values, but more like batch of whole events. +// TODO add pulse. #[derive(Serialize, Deserialize)] pub struct ScalarEvents { pub tss: Vec, diff --git a/items/src/statsevents.rs b/items/src/statsevents.rs new file mode 100644 index 0000000..dfddb96 --- /dev/null +++ b/items/src/statsevents.rs @@ -0,0 +1,404 @@ +use crate::streams::{Collectable, Collector}; +use crate::{ + ts_offs_from_abs, Appendable, ByteEstimate, Clearable, EventAppendable, FilterFittingInside, Fits, FitsInside, + PushableIndex, RangeOverlapInfo, ReadPbv, ReadableFromFile, SitemtyFrameType, TimeBinnableType, + TimeBinnableTypeAggregator, WithLen, WithTimestamps, +}; +use err::Error; +use netpod::log::*; +use netpod::NanoRange; +use serde::{Deserialize, Serialize}; +use std::fmt; +use tokio::fs::File; + +#[derive(Serialize, Deserialize)] +pub struct StatsEvents { + pub tss: Vec, + pub pulses: Vec, +} + +impl SitemtyFrameType for StatsEvents { + const FRAME_TYPE_ID: u32 = crate::STATS_EVENTS_FRAME_TYPE_ID; +} + +impl StatsEvents { + pub fn empty() -> Self { + Self { + tss: vec![], + pulses: vec![], + } + } +} + +impl fmt::Debug for StatsEvents { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "count {} tss {:?} .. {:?} pulses {:?} .. {:?}", + self.tss.len(), + self.tss.first(), + self.tss.last(), + self.pulses.first(), + self.pulses.last(), + ) + } +} + +impl WithLen for StatsEvents { + fn len(&self) -> usize { + self.tss.len() + } +} + +impl WithTimestamps for StatsEvents { + fn ts(&self, ix: usize) -> u64 { + self.tss[ix] + } +} + +impl ByteEstimate for StatsEvents { + fn byte_estimate(&self) -> u64 { + if self.tss.len() == 0 { + 0 + } else { + // TODO improve via a const fn on NTY + self.tss.len() as u64 * 16 + } + } +} + +impl RangeOverlapInfo for StatsEvents { + fn ends_before(&self, range: NanoRange) -> bool { + match self.tss.last() { + Some(&ts) => ts < range.beg, + None => true, + } + } + + fn ends_after(&self, range: NanoRange) -> bool { + match self.tss.last() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } + + fn starts_after(&self, range: NanoRange) -> bool { + match self.tss.first() { + Some(&ts) => ts >= range.end, + None => panic!(), + } + } +} + +impl FitsInside for StatsEvents { + fn fits_inside(&self, range: NanoRange) -> Fits { + if self.tss.is_empty() { + Fits::Empty + } else { + let t1 = *self.tss.first().unwrap(); + let t2 = *self.tss.last().unwrap(); + if t2 < range.beg { + Fits::Lower + } else if t1 > range.end { + Fits::Greater + } else if t1 < range.beg && t2 > range.end { + Fits::PartlyLowerAndGreater + } else if t1 < range.beg { + Fits::PartlyLower + } else if t2 > range.end { + Fits::PartlyGreater + } else { + Fits::Inside + } + } + } +} + +impl FilterFittingInside for StatsEvents { + fn filter_fitting_inside(self, fit_range: NanoRange) -> Option { + match self.fits_inside(fit_range) { + Fits::Inside | Fits::PartlyGreater | Fits::PartlyLower | Fits::PartlyLowerAndGreater => Some(self), + _ => None, + } + } +} + +impl PushableIndex for StatsEvents { + fn push_index(&mut self, src: &Self, ix: usize) { + self.tss.push(src.tss[ix]); + self.pulses.push(src.pulses[ix]); + } +} + +impl Appendable for StatsEvents { + fn empty_like_self(&self) -> Self { + Self::empty() + } + + fn append(&mut self, src: &Self) { + self.tss.extend_from_slice(&src.tss); + self.pulses.extend_from_slice(&src.pulses); + } +} + +impl Clearable for StatsEvents { + fn clear(&mut self) { + self.tss.clear(); + self.pulses.clear(); + } +} + +impl ReadableFromFile for StatsEvents { + fn read_from_file(_file: File) -> Result, Error> { + // TODO refactor types such that this can be removed. + panic!() + } + + fn from_buf(_buf: &[u8]) -> Result { + panic!() + } +} + +impl TimeBinnableType for StatsEvents { + type Output = StatsEvents; + type Aggregator = StatsEventsAggregator; + + fn aggregator(range: NanoRange, _x_bin_count: usize, do_time_weight: bool) -> Self::Aggregator { + Self::Aggregator::new(range, do_time_weight) + } +} + +pub struct StatsEventsCollector { + vals: StatsEvents, + range_complete: bool, + timed_out: bool, +} + +impl StatsEventsCollector { + pub fn new() -> Self { + Self { + vals: StatsEvents::empty(), + range_complete: false, + timed_out: false, + } + } +} + +impl WithLen for StatsEventsCollector { + fn len(&self) -> usize { + self.vals.tss.len() + } +} + +#[derive(Serialize)] +pub struct StatsEventsCollectorOutput { + #[serde(rename = "tsAnchor")] + ts_anchor_sec: u64, + #[serde(rename = "tsMs")] + ts_off_ms: Vec, + #[serde(rename = "tsNs")] + ts_off_ns: Vec, + // TODO what to collect? pulse min/max + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "finalisedRange")] + range_complete: bool, + #[serde(skip_serializing_if = "crate::bool_is_false", rename = "timedOut")] + timed_out: bool, +} + +impl Collector for StatsEventsCollector { + type Input = StatsEvents; + type Output = StatsEventsCollectorOutput; + + fn ingest(&mut self, src: &Self::Input) { + self.vals.append(src); + } + + fn set_range_complete(&mut self) { + self.range_complete = true; + } + + fn set_timed_out(&mut self) { + self.timed_out = true; + } + + fn result(self) -> Result { + let tst = ts_offs_from_abs(&self.vals.tss); + let ret = Self::Output { + ts_anchor_sec: tst.0, + ts_off_ms: tst.1, + ts_off_ns: tst.2, + range_complete: self.range_complete, + timed_out: self.timed_out, + }; + Ok(ret) + } +} + +impl Collectable for StatsEvents { + type Collector = StatsEventsCollector; + + fn new_collector(_bin_count_exp: u32) -> Self::Collector { + Self::Collector::new() + } +} + +pub struct StatsEventsAggregator { + range: NanoRange, + count: u64, + sumc: u64, + sum: f32, + int_ts: u64, + last_ts: u64, + do_time_weight: bool, +} + +impl StatsEventsAggregator { + pub fn new(range: NanoRange, do_time_weight: bool) -> Self { + let int_ts = range.beg; + Self { + range, + count: 0, + sum: 0f32, + sumc: 0, + int_ts, + last_ts: 0, + do_time_weight, + } + } + + fn apply_min_max(&mut self, _val: f32) { + // TODO currently no values to min/max + } + + fn apply_event_unweight(&mut self, val: f32) { + self.apply_min_max(val); + let vf = val; + if vf.is_nan() { + } else { + self.sum += vf; + self.sumc += 1; + } + } + + fn apply_event_time_weight(&mut self, _ts: u64) { + // TODO currently no value to weight. + } + + fn ingest_unweight(&mut self, item: &::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + let val = 0.0; + if ts < self.range.beg { + } else if ts >= self.range.end { + } else { + self.apply_event_unweight(val); + self.count += 1; + } + } + } + + fn ingest_time_weight(&mut self, item: &::Input) { + for i1 in 0..item.tss.len() { + let ts = item.tss[i1]; + //let val = 0.0; + if ts < self.int_ts { + self.last_ts = ts; + //self.last_val = Some(val); + } else if ts >= self.range.end { + return; + } else { + self.apply_event_time_weight(ts); + self.count += 1; + self.last_ts = ts; + //self.last_val = Some(val); + } + } + } + + fn reset(&mut self, range: NanoRange) { + self.int_ts = range.beg; + self.range = range; + self.count = 0; + //self.min = None; + //self.max = None; + self.sum = 0f32; + self.sumc = 0; + } + + fn result_reset_unweight( + &mut self, + range: NanoRange, + _expand: bool, + ) -> ::Output { + let _avg = if self.sumc == 0 { + None + } else { + Some(self.sum / self.sumc as f32) + }; + // TODO return some meaningful value + let ret = StatsEvents::empty(); + self.reset(range); + ret + } + + fn result_reset_time_weight( + &mut self, + range: NanoRange, + expand: bool, + ) -> ::Output { + // TODO check callsite for correct expand status. + if true || expand { + debug!("result_reset_time_weight calls apply_event_time_weight"); + self.apply_event_time_weight(self.range.end); + } else { + debug!("result_reset_time_weight NO EXPAND"); + } + let _avg = { + let sc = self.range.delta() as f32 * 1e-9; + Some(self.sum / sc) + }; + // TODO return some meaningful value + let ret = StatsEvents::empty(); + self.reset(range); + ret + } +} + +impl TimeBinnableTypeAggregator for StatsEventsAggregator { + type Input = StatsEvents; + type Output = StatsEvents; + + fn range(&self) -> &NanoRange { + &self.range + } + + fn ingest(&mut self, item: &Self::Input) { + if self.do_time_weight { + self.ingest_time_weight(item) + } else { + self.ingest_unweight(item) + } + } + + fn result_reset(&mut self, range: NanoRange, expand: bool) -> Self::Output { + if self.do_time_weight { + self.result_reset_time_weight(range, expand) + } else { + self.result_reset_unweight(range, expand) + } + } +} + +impl EventAppendable for StatsEvents { + type Value = f32; + + fn append_event(ret: Option, ts: u64, _value: Self::Value) -> Self { + let mut ret = if let Some(ret) = ret { ret } else { Self::empty() }; + ret.tss.push(ts); + // TODO + error!("TODO statsevents append_event"); + err::todo(); + ret.pulses.push(42); + ret + } +} diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 5cfc0d5..9325110 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -932,6 +932,7 @@ pub enum AggKind { DimXBinsN(u32), Plain, TimeWeightedScalar, + Stats1, } impl AggKind { @@ -942,6 +943,7 @@ impl AggKind { Self::DimXBins1 => false, Self::DimXBinsN(_) => false, Self::Plain => false, + Self::Stats1 => false, } } @@ -952,6 +954,7 @@ impl AggKind { Self::DimXBins1 => false, Self::DimXBinsN(_) => false, Self::Plain => false, + Self::Stats1 => false, } } } @@ -977,6 +980,7 @@ pub fn x_bin_count(shape: &Shape, agg_kind: &AggKind) -> usize { Shape::Wave(n) => *n as usize, Shape::Image(j, k) => *j as usize * *k as usize, }, + AggKind::Stats1 => 0, } } @@ -998,6 +1002,9 @@ impl fmt::Display for AggKind { Self::TimeWeightedScalar => { write!(fmt, "TimeWeightedScalar") } + Self::Stats1 => { + write!(fmt, "Stats1") + } } } } @@ -1019,6 +1026,8 @@ impl FromStr for AggKind { Ok(AggKind::DimXBins1) } else if s == "TimeWeightedScalar" { Ok(AggKind::TimeWeightedScalar) + } else if s == "Stats1" { + Ok(AggKind::Stats1) } else if s.starts_with(nmark) { let nbins: u32 = s[nmark.len()..].parse()?; Ok(AggKind::DimXBinsN(nbins)) @@ -1316,6 +1325,7 @@ pub struct ProxyConfig { pub port: u16, pub search_hosts: Vec, pub backends: Vec, + pub backends2: Vec, pub api_0_search_hosts: Option>, pub api_0_search_backends: Option>, } diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 562c0c8..e050e93 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -284,6 +284,9 @@ pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { g.append_pair("binningScheme", "binnedX"); g.append_pair("binnedXcount", &format!("{}", n)); } + AggKind::Stats1 => { + g.append_pair("binningScheme", "stats1"); + } } } @@ -303,6 +306,8 @@ pub fn agg_kind_from_binning_scheme(pairs: &BTreeMap) -> Result< } else if s == "binnedX" { let u = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; AggKind::DimXBinsN(u) + } else if s == "stats1" { + AggKind::Stats1 } else { return Err(Error::with_msg("can not extract binningScheme")); }; diff --git a/parse/Cargo.toml b/parse/Cargo.toml index d3da9fa..9cdbe38 100644 --- a/parse/Cargo.toml +++ b/parse/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] serde = { version = "1.0", features = ["derive"] } -tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } chrono = { version = "0.4.19", features = ["serde"] } bytes = "1.0.1" byteorder = "1.4.3" diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index 2b9bd17..0bd9cc6 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -8,11 +8,10 @@ edition = "2021" path = "src/taskrun.rs" [dependencies] -tokio = { version = "1.14.77", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +tokio = { version = "=1.16.1", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tracing = "0.1.25" tracing-subscriber = { version = "0.2.17", features= ["chrono"] } backtrace = "0.3.56" lazy_static = "1.4.0" chrono = "0.4" err = { path = "../err" } -#netpod = { path = "../netpod" } diff --git a/taskrun/src/append.rs b/taskrun/src/append.rs index 5425f9d..433036d 100644 --- a/taskrun/src/append.rs +++ b/taskrun/src/append.rs @@ -1,7 +1,7 @@ use err::Error; use std::borrow::Cow; use std::fs; -use std::io::{BufWriter, Read, Seek, SeekFrom, Stderr, Stdin, Write}; +use std::io::{BufWriter, Read, Seek, SeekFrom, Stdin, Write}; use std::path::{Path, PathBuf}; pub struct Buffer { @@ -155,7 +155,7 @@ fn next_file(dir: &Path) -> Result, Error> { Ok(ret) } -pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result<(), Error> { +pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { let mut bytes_written = 0; let dir = PathBuf::from(dirname); let mut fout = open_latest_or_new(&dir)?; @@ -237,11 +237,13 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin, _stderr: Stderr) -> Result< } } -pub fn append(dirname: &str, stdin: Stdin, _stderr: Stderr) -> Result<(), Error> { - match append_inner(dirname, stdin, _stderr) { +pub fn append(dirname: &str, stdin: Stdin) -> Result<(), Error> { + match append_inner(dirname, stdin) { Ok(k) => Ok(k), Err(e) => { - eprintln!("got error {:?}", e); + let dir = PathBuf::from(dirname); + let mut fout = open_latest_or_new(&dir)?; + let _ = write!(fout, "ERROR in append_inner: {:?}", e); Err(e) } }