From c0bdc854ff4dced92753179e29095b88e0ec6e9e Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Mon, 20 Mar 2023 07:13:59 +0100 Subject: [PATCH] Factor out query type --- daqbufp2/Cargo.toml | 1 + daqbufp2/src/client.rs | 2 +- daqbufp2/src/test/api4/binnedjson.rs | 2 +- daqbufp2/src/test/api4/eventsjson.rs | 2 +- daqbufp2/src/test/binnedbinary.rs | 2 +- daqbufp2/src/test/events.rs | 2 +- daqbufp2/src/test/timeweightedjson.rs | 2 +- disk/Cargo.toml | 1 + disk/src/decode.rs | 6 +- disk/src/merge/mergedblobsfromremotes.rs | 4 +- disk/src/raw/conn.rs | 17 +- httpret/Cargo.toml | 1 + httpret/src/api1.rs | 2 +- httpret/src/api4/binned.rs | 15 +- httpret/src/api4/events.rs | 2 +- httpret/src/channelconfig.rs | 4 +- httpret/src/proxy.rs | 4 +- items_2/src/binnedcollected.rs | 1 - netpod/src/api4/events.rs | 1 - netpod/src/netpod.rs | 16 +- netpod/src/query.rs | 421 ----------------------- nodenet/Cargo.toml | 1 + nodenet/src/conn.rs | 15 +- nodenet/src/conn/test.rs | 2 +- query/Cargo.toml | 16 + {netpod => query}/src/api4.rs | 1 + query/src/api4/binned.rs | 226 ++++++++++++ query/src/api4/events.rs | 235 +++++++++++++ query/src/lib.rs | 2 + {netpod => query}/src/transform.rs | 56 ++- scyllaconn/Cargo.toml | 1 + scyllaconn/src/bincache.rs | 15 +- streams/Cargo.toml | 1 + streams/src/eventchunker.rs | 6 +- streams/src/plaineventsjson.rs | 3 +- streams/src/tcprawclient.rs | 2 +- streams/src/timebinnedjson.rs | 5 +- 37 files changed, 595 insertions(+), 500 deletions(-) delete mode 100644 netpod/src/api4/events.rs create mode 100644 query/Cargo.toml rename {netpod => query}/src/api4.rs (50%) create mode 100644 query/src/api4/binned.rs create mode 100644 query/src/api4/events.rs create mode 100644 query/src/lib.rs rename {netpod => query}/src/transform.rs (75%) diff --git a/daqbufp2/Cargo.toml b/daqbufp2/Cargo.toml index c9824b9..79119d5 100644 --- a/daqbufp2/Cargo.toml +++ b/daqbufp2/Cargo.toml @@ -25,6 +25,7 @@ lazy_static = "1.4.0" err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } +query = { path = "../query" } httpret = { path = "../httpret" } httpclient = { path = "../httpclient" } disk = { path = "../disk" } diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 3b8e5d5..a99f100 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -9,7 +9,6 @@ use httpclient::HttpBodyAsAsyncRead; use hyper::Body; use items_0::streamitem::StreamItem; use netpod::log::*; -use netpod::query::BinnedQuery; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; @@ -18,6 +17,7 @@ use netpod::Channel; use netpod::HostPort; use netpod::PerfOpts; use netpod::APP_OCTET; +use query::api4::binned::BinnedQuery; use streams::frames::inmem::InMemoryFrameAsyncReadStream; use url::Url; diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index 56a053e..07b8796 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -6,13 +6,13 @@ use err::Error; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::query::BinnedQuery; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; use netpod::HostPort; use netpod::APP_JSON; +use query::api4::binned::BinnedQuery; use serde_json::Value as JsonValue; use url::Url; diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index 3df3a3b..26e21f3 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ 
b/daqbufp2/src/test/api4/eventsjson.rs @@ -6,13 +6,13 @@ use err::Error; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::query::PlainEventsQuery; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; use netpod::HostPort; use netpod::APP_JSON; +use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use url::Url; diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 46d2a00..27f6d52 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -11,7 +11,6 @@ use hyper::Body; use items_0::streamitem::StreamItem; use items_0::subfr::SubFrId; use netpod::log::*; -use netpod::query::BinnedQuery; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; @@ -20,6 +19,7 @@ use netpod::Cluster; use netpod::HostPort; use netpod::PerfOpts; use netpod::APP_OCTET; +use query::api4::binned::BinnedQuery; use serde::de::DeserializeOwned; use std::fmt; use std::future::ready; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index d2b0ba7..9e07fb9 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -11,7 +11,6 @@ use httpclient::HttpBodyAsAsyncRead; use hyper::Body; use items_0::streamitem::StreamItem; use netpod::log::*; -use netpod::query::PlainEventsQuery; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; @@ -20,6 +19,7 @@ use netpod::HostPort; use netpod::PerfOpts; use netpod::APP_JSON; use netpod::APP_OCTET; +use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use std::fmt::Debug; use std::future::ready; diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 6ff25fa..1cd20ce 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -6,13 +6,13 @@ use err::Error; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::query::BinnedQuery; use netpod::query::CacheUsage; use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; use netpod::APP_JSON; +use query::api4::binned::BinnedQuery; use std::time::Duration; use url::Url; diff --git a/disk/Cargo.toml b/disk/Cargo.toml index 059996b..9e3fe4b 100644 --- a/disk/Cargo.toml +++ b/disk/Cargo.toml @@ -36,6 +36,7 @@ tiny-keccak = { version = "2.0", features = ["sha3"] } err = { path = "../err" } taskrun = { path = "../taskrun" } netpod = { path = "../netpod" } +query = { path = "../query" } bitshuffle = { path = "../bitshuffle" } dbconn = { path = "../dbconn" } parse = { path = "../parse" } diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 389d6fa..d540530 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -310,9 +310,8 @@ impl EventsDynStream { let st = &scalar_type; let sh = &shape; let ag = &agg_kind; - // TODO do we need/want the empty item from here? error!("TODO feed through transform?"); - err::todo(); + // TODO do we need/want the empty item from here? let events_out = items_2::empty::empty_events_dyn_ev(st, sh)?; let scalar_conv = make_scalar_conv(st, sh, ag)?; let emit_threshold = match &shape { @@ -337,9 +336,8 @@ impl EventsDynStream { fn replace_events_out(&mut self) -> Result, Error> { let st = &self.scalar_type; let sh = &self.shape; - // TODO do we need/want the empty item from here? 
error!("TODO feed through transform?"); - err::todo(); + // TODO do we need/want the empty item from here? let empty = items_2::empty::empty_events_dyn_ev(st, sh)?; let evs = mem::replace(&mut self.events_out, empty); Ok(evs) diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index c4f6f71..0b352dd 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -6,9 +6,9 @@ use items_0::streamitem::Sitemty; use items_2::eventfull::EventFull; use items_2::merger::Merger; use netpod::log::*; -use netpod::query::PlainEventsQuery; use netpod::Cluster; use netpod::PerfOpts; +use query::api4::events::PlainEventsQuery; use std::future::Future; use std::pin::Pin; use std::task::Context; @@ -99,7 +99,7 @@ impl Stream for MergedBlobsFromRemotes { if c1 == self.tcp_establish_futs.len() { let inps = self.nodein.iter_mut().map(|k| k.take().unwrap()).collect(); // TODO set out_max_len dynamically - let s1 = Merger::new(inps, 128); + let s1 = Merger::new(inps, 1); self.merged = Some(Box::pin(s1)); } continue 'outer; diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 6fff6d9..3c0e073 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -9,7 +9,6 @@ use items_0::streamitem::StreamItem; use items_2::channelevents::ChannelEvents; use items_2::eventfull::EventFull; use netpod::log::*; -use netpod::query::PlainEventsQuery; use netpod::range::evrange::NanoRange; use netpod::AggKind; use netpod::ByteSize; @@ -22,6 +21,7 @@ use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; use parse::channelconfig::ConfigEntry; use parse::channelconfig::MatchingConfigEntry; +use query::api4::events::PlainEventsQuery; use std::pin::Pin; use streams::eventchunker::EventChunkerConf; @@ -109,7 +109,7 @@ pub async fn make_event_pipe( let out_max_len = if node_config.node_config.cluster.is_central_storage { 1 } else { - 128 + 1 }; let event_blobs = EventChunkerMultifile::new( (&range).try_into()?, @@ -184,7 +184,7 @@ pub fn make_local_event_blobs_stream( let out_max_len = if node_config.node_config.cluster.is_central_storage { 1 } else { - 128 + 1 }; let event_blobs = EventChunkerMultifile::new( range, @@ -230,7 +230,7 @@ pub fn make_remote_event_blobs_stream( let out_max_len = if node_config.node_config.cluster.is_central_storage { 1 } else { - 128 + 1 }; let event_blobs = EventChunkerMultifile::new( range, @@ -263,13 +263,14 @@ pub async fn make_event_blobs_pipe( let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); type ItemType = Sitemty; // TODO should depend on host config - let pipe = if true { - let event_blobs = make_remote_event_blobs_stream( + let do_local = node_config.node_config.cluster.is_central_storage; + let pipe = if do_local { + let event_blobs = make_local_event_blobs_stream( range.try_into()?, evq.channel().clone(), &entry, expand, - true, + false, event_chunker_conf, DiskIoTune::default(), node_config, @@ -281,7 +282,7 @@ pub async fn make_event_blobs_pipe( pipe*/ Box::pin(event_blobs) as _ } else { - let event_blobs = make_local_event_blobs_stream( + let event_blobs = make_remote_event_blobs_stream( range.try_into()?, evq.channel().clone(), &entry, diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 49f36c8..208543c 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -26,6 +26,7 @@ md-5 = "0.10" regex = "1.7" err = { path = "../err" } netpod = { path = "../netpod" } +query = { path = "../query" } dbconn = { path = "../dbconn" 
} disk = { path = "../disk" } items = { path = "../items" } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 8546e6f..3429a74 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -26,7 +26,6 @@ use items_2::eventfull::EventFull; use itertools::Itertools; use netpod::log::*; use netpod::query::api1::Api1Query; -use netpod::query::PlainEventsQuery; use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::ByteSize; @@ -47,6 +46,7 @@ use parse::channelconfig::read_local_config; use parse::channelconfig::Config; use parse::channelconfig::ConfigEntry; use parse::channelconfig::MatchingConfigEntry; +use query::api4::events::PlainEventsQuery; use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsonValue; diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 1816e4f..5a948e9 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -1,16 +1,21 @@ -use crate::bodystream::{response, ToPublicResponse}; +use crate::bodystream::response; +use crate::bodystream::ToPublicResponse; use crate::channelconfig::chconf_from_binned; use crate::err::Error; use crate::response_err; -use http::{Method, StatusCode}; -use http::{Request, Response}; +use http::Method; +use http::Request; +use http::Response; +use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::query::BinnedQuery; use netpod::timeunits::SEC; use netpod::FromUrl; use netpod::NodeConfigCached; -use netpod::{ACCEPT_ALL, APP_JSON, APP_OCTET}; +use netpod::ACCEPT_ALL; +use netpod::APP_JSON; +use netpod::APP_OCTET; +use query::api4::binned::BinnedQuery; use tracing::Instrument; use url::Url; diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs index 75b798b..d6ae523 100644 --- a/httpret/src/api4/events.rs +++ b/httpret/src/api4/events.rs @@ -12,12 +12,12 @@ use http::Response; use http::StatusCode; use hyper::Body; use netpod::log::*; -use netpod::query::PlainEventsQuery; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ACCEPT_ALL; use netpod::APP_JSON; use netpod::APP_OCTET; +use query::api4::events::PlainEventsQuery; use url::Url; pub struct EventsHandler {} diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index 91863a6..9f62c09 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -11,8 +11,6 @@ use hyper::Body; use netpod::get_url_query_pairs; use netpod::log::*; use netpod::query::prebinned::PreBinnedQuery; -use netpod::query::BinnedQuery; -use netpod::query::PlainEventsQuery; use netpod::timeunits::*; use netpod::ChConf; use netpod::Channel; @@ -24,6 +22,8 @@ use netpod::ScalarType; use netpod::Shape; use netpod::ACCEPT_ALL; use netpod::APP_JSON; +use query::api4::binned::BinnedQuery; +use query::api4::events::PlainEventsQuery; use scylla::batch::Consistency; use scylla::frame::response::cql_to_rust::FromRowError as ScyFromRowError; use scylla::transport::errors::NewSessionError as ScyNewSessionError; diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs index 395ebae..1549690 100644 --- a/httpret/src/proxy.rs +++ b/httpret/src/proxy.rs @@ -27,9 +27,7 @@ use hyper::Response; use hyper::Server; use itertools::Itertools; use netpod::log::*; -use netpod::query::BinnedQuery; use netpod::query::ChannelStateEventsQuery; -use netpod::query::PlainEventsQuery; use netpod::AppendToUrl; use netpod::ChannelConfigQuery; use netpod::ChannelSearchQuery; @@ -41,6 +39,8 @@ use netpod::HasTimeout; use netpod::ProxyConfig; use netpod::ACCEPT_ALL; use netpod::APP_JSON; 
+use query::api4::binned::BinnedQuery; +use query::api4::events::PlainEventsQuery; use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsonValue; diff --git a/items_2/src/binnedcollected.rs b/items_2/src/binnedcollected.rs index 1bc0652..41bf1fe 100644 --- a/items_2/src/binnedcollected.rs +++ b/items_2/src/binnedcollected.rs @@ -15,7 +15,6 @@ use items_0::TimeBinnable; use items_0::TimeBinner; use items_0::Transformer; use netpod::log::*; -use netpod::transform::Transform; use netpod::BinnedRange; use netpod::BinnedRangeEnum; use netpod::ScalarType; diff --git a/netpod/src/api4/events.rs b/netpod/src/api4/events.rs deleted file mode 100644 index 8b13789..0000000 --- a/netpod/src/api4/events.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index afbfcc9..73128bd 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1,10 +1,8 @@ -pub mod api4; pub mod histo; pub mod query; pub mod range; pub mod status; pub mod streamext; -pub mod transform; use crate::log::*; use bytes::Bytes; @@ -43,6 +41,13 @@ pub const CONNECTION_STATUS_DIV: u64 = timeunits::DAY; pub const TS_MSP_GRID_UNIT: u64 = timeunits::SEC * 10; pub const TS_MSP_GRID_SPACING: u64 = 6 * 2; +pub fn is_false(x: T) -> bool +where + T: std::borrow::Borrow, +{ + *x.borrow() == false +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AggQuerySingleChannel { pub channel_config: ChannelConfig, @@ -560,13 +565,6 @@ pub struct NodeStatusSub { pub status: Result, } -fn is_false(x: T) -> bool -where - T: std::borrow::Borrow, -{ - *x.borrow() == false -} - #[derive(Debug, Serialize, Deserialize)] pub struct NodeStatus { pub name: String, diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 7940b06..1bbd77b 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -5,7 +5,6 @@ pub mod prebinned; use crate::get_url_query_pairs; use crate::is_false; use crate::log::*; -use crate::transform::Transform; use crate::AggKind; use crate::AppendToUrl; use crate::ByteSize; @@ -200,426 +199,6 @@ impl From for PulseRange { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct PlainEventsQuery { - channel: Channel, - range: SeriesRange, - #[serde(default, skip_serializing_if = "is_false", rename = "oneBeforeRange")] - one_before_range: bool, - #[serde( - default = "Transform::default_events", - skip_serializing_if = "Transform::is_default_events" - )] - transform: Transform, - #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] - timeout: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - events_max: Option, - #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] - event_delay: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - stream_batch_len: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - buf_len_disk_io: Option, - #[serde(default, skip_serializing_if = "is_false")] - do_test_main_error: bool, - #[serde(default, skip_serializing_if = "is_false")] - do_test_stream_error: bool, -} - -impl PlainEventsQuery { - pub fn new(channel: Channel, range: R) -> Self - where - R: Into, - { - Self { - channel, - range: range.into(), - one_before_range: false, - transform: Transform::default_events(), - timeout: Some(Duration::from_millis(4000)), - events_max: Some(10000), - event_delay: None, - stream_batch_len: None, - buf_len_disk_io: None, - do_test_main_error: false, - do_test_stream_error: false, - } - } - - pub fn channel(&self) -> 
&Channel { - &self.channel - } - - pub fn range(&self) -> &SeriesRange { - &self.range - } - - pub fn one_before_range(&self) -> bool { - self.one_before_range - } - - pub fn transform(&self) -> &Transform { - &self.transform - } - - pub fn buf_len_disk_io(&self) -> usize { - self.buf_len_disk_io.unwrap_or(1024 * 8) - } - - pub fn timeout(&self) -> Duration { - self.timeout.unwrap_or(Duration::from_millis(10000)) - } - - pub fn events_max(&self) -> u64 { - self.events_max.unwrap_or(1024 * 512) - } - - pub fn event_delay(&self) -> &Option { - &self.event_delay - } - - pub fn do_test_main_error(&self) -> bool { - self.do_test_main_error - } - - pub fn do_test_stream_error(&self) -> bool { - self.do_test_stream_error - } - - pub fn set_series_id(&mut self, series: u64) { - self.channel.series = Some(series); - } - - pub fn set_do_test_main_error(&mut self, k: bool) { - self.do_test_main_error = k; - } - - pub fn set_do_test_stream_error(&mut self, k: bool) { - self.do_test_stream_error = k; - } - - pub fn for_event_blobs(self) -> Self { - error!("set transform to event blobs"); - err::todo(); - self - } - - pub fn for_time_weighted_scalar(self) -> Self { - error!("set transform to event blobs"); - err::todo(); - self - } -} - -impl HasBackend for PlainEventsQuery { - fn backend(&self) -> &str { - &self.channel.backend - } -} - -impl HasTimeout for PlainEventsQuery { - fn timeout(&self) -> Duration { - self.timeout() - } -} - -impl FromUrl for PlainEventsQuery { - fn from_url(url: &Url) -> Result { - let pairs = get_url_query_pairs(url); - Self::from_pairs(&pairs) - } - - fn from_pairs(pairs: &BTreeMap) -> Result { - let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { - SeriesRange::TimeRange(x.into()) - } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { - SeriesRange::PulseRange(x.into()) - } else { - return Err(Error::with_msg_no_trace("no series range in url")); - }; - let ret = Self { - channel: Channel::from_pairs(pairs)?, - range, - one_before_range: pairs.get("oneBeforeRange").map_or("false", |x| x.as_ref()) == "true", - transform: Transform::from_pairs(pairs)?, - timeout: pairs - .get("timeout") - .map(|x| x.parse::().map(Duration::from_millis).ok()) - .unwrap_or(None), - events_max: pairs - .get("eventsMax") - .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - event_delay: pairs.get("eventDelay").map_or(Ok(None), |k| { - k.parse::().map(|x| Duration::from_millis(x)).map(|k| Some(k)) - })?, - stream_batch_len: pairs - .get("streamBatchLen") - .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - buf_len_disk_io: pairs - .get("bufLenDiskIo") - .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - do_test_main_error: pairs - .get("doTestMainError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError: {}", e)))?, - do_test_stream_error: pairs - .get("doTestStreamError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_public_msg(format!("can not parse doTestStreamError: {}", e)))?, - }; - Ok(ret) - } -} - -impl AppendToUrl for PlainEventsQuery { - fn append_to_url(&self, url: &mut Url) { - match &self.range { - SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url), - SeriesRange::PulseRange(_) => todo!(), - } - self.channel.append_to_url(url); - { - let mut g = url.query_pairs_mut(); - if self.one_before_range() { - g.append_pair("oneBeforeRange", "true"); - } - } - self.transform.append_to_url(url); - let mut g = url.query_pairs_mut(); - if let Some(x) = &self.timeout { - 
g.append_pair("timeout", &format!("{}", x.as_millis())); - } - if let Some(x) = self.events_max.as_ref() { - g.append_pair("eventsMax", &format!("{}", x)); - } - if let Some(x) = self.event_delay.as_ref() { - g.append_pair("eventDelay", &format!("{:.0}", x.as_secs_f64() * 1e3)); - } - if let Some(x) = self.stream_batch_len.as_ref() { - g.append_pair("streamBatchLen", &format!("{}", x)); - } - if let Some(x) = self.buf_len_disk_io.as_ref() { - g.append_pair("bufLenDiskIo", &format!("{}", x)); - } - if self.do_test_main_error { - g.append_pair("doTestMainError", "true"); - } - if self.do_test_stream_error { - g.append_pair("doTestStreamError", "true"); - } - } -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct BinnedQuery { - channel: Channel, - range: SeriesRange, - bin_count: u32, - #[serde( - default = "Transform::default_time_binned", - skip_serializing_if = "Transform::is_default_time_binned" - )] - transform: Transform, - #[serde(default, skip_serializing_if = "Option::is_none")] - cache_usage: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - bins_max: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - timeout: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - buf_len_disk_io: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - disk_stats_every: Option, -} - -impl BinnedQuery { - pub fn new(channel: Channel, range: SeriesRange, bin_count: u32) -> Self { - Self { - channel, - range, - bin_count, - transform: Transform::default_time_binned(), - cache_usage: None, - bins_max: None, - buf_len_disk_io: None, - disk_stats_every: None, - timeout: None, - } - } - - pub fn range(&self) -> &SeriesRange { - &self.range - } - - pub fn channel(&self) -> &Channel { - &self.channel - } - - pub fn bin_count(&self) -> u32 { - self.bin_count - } - - pub fn transform(&self) -> &Transform { - &self.transform - } - - pub fn cache_usage(&self) -> CacheUsage { - self.cache_usage.as_ref().map_or(CacheUsage::Use, |x| x.clone()) - } - - pub fn disk_stats_every(&self) -> ByteSize { - match &self.disk_stats_every { - Some(x) => x.clone(), - None => ByteSize(1024 * 1024 * 4), - } - } - - pub fn buf_len_disk_io(&self) -> usize { - match self.buf_len_disk_io { - Some(x) => x, - None => 1024 * 16, - } - } - - pub fn timeout(&self) -> Option { - self.timeout.clone() - } - - pub fn timeout_value(&self) -> Duration { - match &self.timeout { - Some(x) => x.clone(), - None => Duration::from_millis(10000), - } - } - - pub fn bins_max(&self) -> u32 { - self.bins_max.unwrap_or(1024) - } - - pub fn set_series_id(&mut self, series: u64) { - self.channel.series = Some(series); - } - - pub fn channel_mut(&mut self) -> &mut Channel { - &mut self.channel - } - - pub fn set_cache_usage(&mut self, k: CacheUsage) { - self.cache_usage = Some(k); - } - - pub fn set_disk_stats_every(&mut self, k: ByteSize) { - self.disk_stats_every = Some(k); - } - - pub fn set_timeout(&mut self, k: Duration) { - self.timeout = Some(k); - } - - pub fn set_buf_len_disk_io(&mut self, k: usize) { - self.buf_len_disk_io = Some(k); - } - - pub fn for_time_weighted_scalar(self) -> Self { - err::todo(); - self - } -} - -impl HasBackend for BinnedQuery { - fn backend(&self) -> &str { - &self.channel.backend - } -} - -impl HasTimeout for BinnedQuery { - fn timeout(&self) -> Duration { - self.timeout_value() - } -} - -impl FromUrl for BinnedQuery { - fn from_url(url: &Url) -> Result { - let pairs = get_url_query_pairs(url); - Self::from_pairs(&pairs) - } - - fn 
from_pairs(pairs: &BTreeMap) -> Result { - let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { - SeriesRange::TimeRange(x.into()) - } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { - SeriesRange::PulseRange(x.into()) - } else { - return Err(Error::with_msg_no_trace("no series range in url")); - }; - let ret = Self { - channel: Channel::from_pairs(&pairs)?, - range, - bin_count: pairs - .get("binCount") - .ok_or(Error::with_msg("missing binCount"))? - .parse() - .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?, - transform: Transform::from_pairs(pairs)?, - cache_usage: CacheUsage::from_pairs(&pairs)?, - buf_len_disk_io: pairs - .get("bufLenDiskIo") - .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - disk_stats_every: pairs - .get("diskStatsEveryKb") - .map(|k| k.parse().ok()) - .unwrap_or(None) - .map(ByteSize::kb), - /*report_error: pairs - .get("reportError") - .map_or("false", |k| k) - .parse() - .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,*/ - timeout: pairs - .get("timeout") - .map(|x| x.parse::().map(Duration::from_millis).ok()) - .unwrap_or(None), - bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, - }; - debug!("BinnedQuery::from_url {:?}", ret); - Ok(ret) - } -} - -impl AppendToUrl for BinnedQuery { - fn append_to_url(&self, url: &mut Url) { - match &self.range { - SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url), - SeriesRange::PulseRange(k) => PulseRangeQuery::from(k).append_to_url(url), - } - self.channel.append_to_url(url); - { - let mut g = url.query_pairs_mut(); - g.append_pair("binCount", &format!("{}", self.bin_count)); - } - self.transform.append_to_url(url); - let mut g = url.query_pairs_mut(); - if let Some(x) = &self.cache_usage { - g.append_pair("cacheUsage", &x.query_param_value()); - } - if let Some(x) = &self.timeout { - g.append_pair("timeout", &format!("{}", x.as_millis())); - } - if let Some(x) = self.bins_max { - g.append_pair("binsMax", &format!("{}", x)); - } - if let Some(x) = self.buf_len_disk_io { - g.append_pair("bufLenDiskIo", &format!("{}", x)); - } - if let Some(x) = &self.disk_stats_every { - g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024)); - } - } -} - pub fn binning_scheme_append_to_url(agg_kind: &AggKind, url: &mut Url) { let mut g = url.query_pairs_mut(); match agg_kind { diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index e5274e1..574ece4 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -26,6 +26,7 @@ scylla = "0.7" tokio-postgres = "0.7.7" err = { path = "../err" } netpod = { path = "../netpod" } +query = { path = "../query" } disk = { path = "../disk" } #parse = { path = "../parse" } items = { path = "../items" } diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index d9e2748..4bec02f 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -15,10 +15,10 @@ use items_2::frame::make_error_frame; use items_2::frame::make_term_frame; use netpod::histo::HistoLog2; use netpod::log::*; -use netpod::query::PlainEventsQuery; use netpod::AggKind; use netpod::NodeConfigCached; use netpod::PerfOpts; +use query::api4::events::PlainEventsQuery; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; @@ -277,8 +277,8 @@ async fn events_conn_handler_inner_try( return Err((e, netout).into()); } - let mut stream: Pin> + Send>> = if false { - if true { + let mut stream: Pin> + Send>> = if evq.is_event_blobs() { + if false { 
error!("TODO support event blob transform"); let e = Error::with_msg(format!("TODO support event blob transform")); return Err((e, netout).into()); @@ -327,15 +327,17 @@ async fn events_conn_handler_inner_try( } }; + let mut buf_len_cnt = 0; + let mut buf_len_sum = 0; let mut buf_len_histo = HistoLog2::new(5); while let Some(item) = stream.next().await { let item = item.make_frame(); match item { Ok(buf) => { - if buf.len() > 1024 * 64 { + buf_len_cnt += 1; + buf_len_sum += buf.len(); + if buf.len() > 1024 * 128 { warn!("emit buf len {}", buf.len()); - } else { - info!("emit buf len {}", buf.len()); } buf_len_histo.ingest(buf.len() as u32); match netout.write_all(&buf).await { @@ -349,6 +351,7 @@ async fn events_conn_handler_inner_try( } } } + info!("buf_len_cnt {} buf_len_avg {}", buf_len_cnt, buf_len_sum / buf_len_cnt); let buf = match make_term_frame() { Ok(k) => k, Err(e) => return Err((e, netout))?, diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 6fa919d..b34a292 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -11,7 +11,6 @@ use items_2::channelevents::ChannelEvents; use items_2::framable::EventQueryJsonStringFrame; use items_2::frame::decode_frame; use items_2::frame::make_frame; -use netpod::query::PlainEventsQuery; use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::AggKind; @@ -23,6 +22,7 @@ use netpod::Node; use netpod::NodeConfig; use netpod::NodeConfigCached; use netpod::SfDatabuffer; +use query::api4::events::PlainEventsQuery; use std::time::Duration; use streams::frames::inmem::InMemoryFrameAsyncReadStream; use tokio::io::AsyncWriteExt; diff --git a/query/Cargo.toml b/query/Cargo.toml new file mode 100644 index 0000000..f617061 --- /dev/null +++ b/query/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "query" +version = "0.0.1" +authors = ["Dominik Werder "] +edition = "2021" + +[dependencies] +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tracing = "0.1" +chrono = { version = "0.4.19", features = ["serde"] } +url = "2.2" +humantime-serde = "1.1.1" +err = { path = "../err" } +netpod = { path = "../netpod" } +items_0 = { path = "../items_0" } diff --git a/netpod/src/api4.rs b/query/src/api4.rs similarity index 50% rename from netpod/src/api4.rs rename to query/src/api4.rs index a9970c2..b11d861 100644 --- a/netpod/src/api4.rs +++ b/query/src/api4.rs @@ -1 +1,2 @@ +pub mod binned; pub mod events; diff --git a/query/src/api4/binned.rs b/query/src/api4/binned.rs new file mode 100644 index 0000000..68c81bd --- /dev/null +++ b/query/src/api4/binned.rs @@ -0,0 +1,226 @@ +use crate::transform::TransformQuery; +use err::Error; +use netpod::get_url_query_pairs; +use netpod::log::*; +use netpod::query::CacheUsage; +use netpod::query::PulseRangeQuery; +use netpod::query::TimeRangeQuery; +use netpod::range::evrange::SeriesRange; +use netpod::AppendToUrl; +use netpod::ByteSize; +use netpod::Channel; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::HasTimeout; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; +use std::time::Duration; +use url::Url; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BinnedQuery { + channel: Channel, + range: SeriesRange, + bin_count: u32, + #[serde( + default = "TransformQuery::default_time_binned", + skip_serializing_if = "TransformQuery::is_default_time_binned" + )] + transform: TransformQuery, + #[serde(default, skip_serializing_if = "Option::is_none")] + cache_usage: Option, + #[serde(default, 
skip_serializing_if = "Option::is_none")] + bins_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + timeout: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + buf_len_disk_io: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + disk_stats_every: Option, +} + +impl BinnedQuery { + pub fn new(channel: Channel, range: SeriesRange, bin_count: u32) -> Self { + Self { + channel, + range, + bin_count, + transform: TransformQuery::default_time_binned(), + cache_usage: None, + bins_max: None, + buf_len_disk_io: None, + disk_stats_every: None, + timeout: None, + } + } + + pub fn range(&self) -> &SeriesRange { + &self.range + } + + pub fn channel(&self) -> &Channel { + &self.channel + } + + pub fn bin_count(&self) -> u32 { + self.bin_count + } + + pub fn transform(&self) -> &TransformQuery { + &self.transform + } + + pub fn cache_usage(&self) -> CacheUsage { + self.cache_usage.as_ref().map_or(CacheUsage::Use, |x| x.clone()) + } + + pub fn disk_stats_every(&self) -> ByteSize { + match &self.disk_stats_every { + Some(x) => x.clone(), + None => ByteSize(1024 * 1024 * 4), + } + } + + pub fn buf_len_disk_io(&self) -> usize { + match self.buf_len_disk_io { + Some(x) => x, + None => 1024 * 16, + } + } + + pub fn timeout(&self) -> Option { + self.timeout.clone() + } + + pub fn timeout_value(&self) -> Duration { + match &self.timeout { + Some(x) => x.clone(), + None => Duration::from_millis(10000), + } + } + + pub fn bins_max(&self) -> u32 { + self.bins_max.unwrap_or(1024) + } + + pub fn set_series_id(&mut self, series: u64) { + self.channel.series = Some(series); + } + + pub fn channel_mut(&mut self) -> &mut Channel { + &mut self.channel + } + + pub fn set_cache_usage(&mut self, k: CacheUsage) { + self.cache_usage = Some(k); + } + + pub fn set_disk_stats_every(&mut self, k: ByteSize) { + self.disk_stats_every = Some(k); + } + + pub fn set_timeout(&mut self, k: Duration) { + self.timeout = Some(k); + } + + pub fn set_buf_len_disk_io(&mut self, k: usize) { + self.buf_len_disk_io = Some(k); + } + + pub fn for_time_weighted_scalar(self) -> Self { + err::todo(); + self + } +} + +impl HasBackend for BinnedQuery { + fn backend(&self) -> &str { + &self.channel.backend + } +} + +impl HasTimeout for BinnedQuery { + fn timeout(&self) -> Duration { + self.timeout_value() + } +} + +impl FromUrl for BinnedQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { + SeriesRange::TimeRange(x.into()) + } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { + SeriesRange::PulseRange(x.into()) + } else { + return Err(Error::with_msg_no_trace("no series range in url")); + }; + let ret = Self { + channel: Channel::from_pairs(&pairs)?, + range, + bin_count: pairs + .get("binCount") + .ok_or(Error::with_msg("missing binCount"))? 
+ .parse() + .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?, + transform: TransformQuery::from_pairs(pairs)?, + cache_usage: CacheUsage::from_pairs(&pairs)?, + buf_len_disk_io: pairs + .get("bufLenDiskIo") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + disk_stats_every: pairs + .get("diskStatsEveryKb") + .map(|k| k.parse().ok()) + .unwrap_or(None) + .map(ByteSize::kb), + /*report_error: pairs + .get("reportError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse reportError {:?}", e)))?,*/ + timeout: pairs + .get("timeout") + .map(|x| x.parse::().map(Duration::from_millis).ok()) + .unwrap_or(None), + bins_max: pairs.get("binsMax").map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + }; + debug!("BinnedQuery::from_url {:?}", ret); + Ok(ret) + } +} + +impl AppendToUrl for BinnedQuery { + fn append_to_url(&self, url: &mut Url) { + match &self.range { + SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url), + SeriesRange::PulseRange(k) => PulseRangeQuery::from(k).append_to_url(url), + } + self.channel.append_to_url(url); + { + let mut g = url.query_pairs_mut(); + g.append_pair("binCount", &format!("{}", self.bin_count)); + } + self.transform.append_to_url(url); + let mut g = url.query_pairs_mut(); + if let Some(x) = &self.cache_usage { + g.append_pair("cacheUsage", &x.query_param_value()); + } + if let Some(x) = &self.timeout { + g.append_pair("timeout", &format!("{}", x.as_millis())); + } + if let Some(x) = self.bins_max { + g.append_pair("binsMax", &format!("{}", x)); + } + if let Some(x) = self.buf_len_disk_io { + g.append_pair("bufLenDiskIo", &format!("{}", x)); + } + if let Some(x) = &self.disk_stats_every { + g.append_pair("diskStatsEveryKb", &format!("{}", x.bytes() / 1024)); + } + } +} diff --git a/query/src/api4/events.rs b/query/src/api4/events.rs new file mode 100644 index 0000000..dbe5300 --- /dev/null +++ b/query/src/api4/events.rs @@ -0,0 +1,235 @@ +use crate::transform::TransformQuery; +use err::Error; +use netpod::get_url_query_pairs; +use netpod::is_false; +use netpod::log::*; +use netpod::query::CacheUsage; +use netpod::query::PulseRangeQuery; +use netpod::query::TimeRangeQuery; +use netpod::range::evrange::SeriesRange; +use netpod::AppendToUrl; +use netpod::ByteSize; +use netpod::Channel; +use netpod::FromUrl; +use netpod::HasBackend; +use netpod::HasTimeout; +use serde::Deserialize; +use serde::Serialize; +use std::collections::BTreeMap; +use std::time::Duration; +use url::Url; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PlainEventsQuery { + channel: Channel, + range: SeriesRange, + #[serde(default, skip_serializing_if = "is_false", rename = "oneBeforeRange")] + one_before_range: bool, + #[serde( + default = "TransformQuery::default_events", + skip_serializing_if = "TransformQuery::is_default_events" + )] + transform: TransformQuery, + #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] + timeout: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + events_max: Option, + #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] + event_delay: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + stream_batch_len: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + buf_len_disk_io: Option, + #[serde(default, skip_serializing_if = "is_false")] + do_test_main_error: bool, + #[serde(default, skip_serializing_if = "is_false")] + do_test_stream_error: 
bool, +} + +impl PlainEventsQuery { + pub fn new(channel: Channel, range: R) -> Self + where + R: Into, + { + Self { + channel, + range: range.into(), + one_before_range: false, + transform: TransformQuery::default_events(), + timeout: Some(Duration::from_millis(4000)), + events_max: Some(10000), + event_delay: None, + stream_batch_len: None, + buf_len_disk_io: None, + do_test_main_error: false, + do_test_stream_error: false, + } + } + + pub fn channel(&self) -> &Channel { + &self.channel + } + + pub fn range(&self) -> &SeriesRange { + &self.range + } + + pub fn one_before_range(&self) -> bool { + self.one_before_range + } + + pub fn transform(&self) -> &TransformQuery { + &self.transform + } + + pub fn buf_len_disk_io(&self) -> usize { + self.buf_len_disk_io.unwrap_or(1024 * 8) + } + + pub fn timeout(&self) -> Duration { + self.timeout.unwrap_or(Duration::from_millis(10000)) + } + + pub fn events_max(&self) -> u64 { + self.events_max.unwrap_or(1024 * 512) + } + + pub fn event_delay(&self) -> &Option { + &self.event_delay + } + + pub fn do_test_main_error(&self) -> bool { + self.do_test_main_error + } + + pub fn do_test_stream_error(&self) -> bool { + self.do_test_stream_error + } + + pub fn set_series_id(&mut self, series: u64) { + self.channel.series = Some(series); + } + + pub fn set_do_test_main_error(&mut self, k: bool) { + self.do_test_main_error = k; + } + + pub fn set_do_test_stream_error(&mut self, k: bool) { + self.do_test_stream_error = k; + } + + pub fn for_event_blobs(mut self) -> Self { + self.transform = TransformQuery::for_event_blobs(); + self + } + + pub fn for_time_weighted_scalar(mut self) -> Self { + self.transform = TransformQuery::for_time_weighted_scalar(); + self + } + + pub fn is_event_blobs(&self) -> bool { + self.transform.is_event_blobs() + } +} + +impl HasBackend for PlainEventsQuery { + fn backend(&self) -> &str { + &self.channel.backend + } +} + +impl HasTimeout for PlainEventsQuery { + fn timeout(&self) -> Duration { + self.timeout() + } +} + +impl FromUrl for PlainEventsQuery { + fn from_url(url: &Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &BTreeMap) -> Result { + let range = if let Ok(x) = TimeRangeQuery::from_pairs(pairs) { + SeriesRange::TimeRange(x.into()) + } else if let Ok(x) = PulseRangeQuery::from_pairs(pairs) { + SeriesRange::PulseRange(x.into()) + } else { + return Err(Error::with_msg_no_trace("no series range in url")); + }; + let ret = Self { + channel: Channel::from_pairs(pairs)?, + range, + one_before_range: pairs.get("oneBeforeRange").map_or("false", |x| x.as_ref()) == "true", + transform: TransformQuery::from_pairs(pairs)?, + timeout: pairs + .get("timeout") + .map(|x| x.parse::().map(Duration::from_millis).ok()) + .unwrap_or(None), + events_max: pairs + .get("eventsMax") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + event_delay: pairs.get("eventDelay").map_or(Ok(None), |k| { + k.parse::().map(|x| Duration::from_millis(x)).map(|k| Some(k)) + })?, + stream_batch_len: pairs + .get("streamBatchLen") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + buf_len_disk_io: pairs + .get("bufLenDiskIo") + .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?, + do_test_main_error: pairs + .get("doTestMainError") + .map_or("false", |k| k) + .parse() + .map_err(|e| Error::with_public_msg(format!("can not parse doTestMainError: {}", e)))?, + do_test_stream_error: pairs + .get("doTestStreamError") + .map_or("false", |k| k) + .parse() + .map_err(|e| 
Error::with_public_msg(format!("can not parse doTestStreamError: {}", e)))?, + }; + Ok(ret) + } +} + +impl AppendToUrl for PlainEventsQuery { + fn append_to_url(&self, url: &mut Url) { + match &self.range { + SeriesRange::TimeRange(k) => TimeRangeQuery::from(k).append_to_url(url), + SeriesRange::PulseRange(_) => todo!(), + } + self.channel.append_to_url(url); + { + let mut g = url.query_pairs_mut(); + if self.one_before_range() { + g.append_pair("oneBeforeRange", "true"); + } + } + self.transform.append_to_url(url); + let mut g = url.query_pairs_mut(); + if let Some(x) = &self.timeout { + g.append_pair("timeout", &format!("{}", x.as_millis())); + } + if let Some(x) = self.events_max.as_ref() { + g.append_pair("eventsMax", &format!("{}", x)); + } + if let Some(x) = self.event_delay.as_ref() { + g.append_pair("eventDelay", &format!("{:.0}", x.as_secs_f64() * 1e3)); + } + if let Some(x) = self.stream_batch_len.as_ref() { + g.append_pair("streamBatchLen", &format!("{}", x)); + } + if let Some(x) = self.buf_len_disk_io.as_ref() { + g.append_pair("bufLenDiskIo", &format!("{}", x)); + } + if self.do_test_main_error { + g.append_pair("doTestMainError", "true"); + } + if self.do_test_stream_error { + g.append_pair("doTestStreamError", "true"); + } + } +} diff --git a/query/src/lib.rs b/query/src/lib.rs new file mode 100644 index 0000000..598bce0 --- /dev/null +++ b/query/src/lib.rs @@ -0,0 +1,2 @@ +pub mod api4; +pub mod transform; diff --git a/netpod/src/transform.rs b/query/src/transform.rs similarity index 75% rename from netpod/src/transform.rs rename to query/src/transform.rs index 6786354..0323ceb 100644 --- a/netpod/src/transform.rs +++ b/query/src/transform.rs @@ -1,8 +1,8 @@ -use crate::get_url_query_pairs; -use crate::log::*; -use crate::AppendToUrl; -use crate::FromUrl; use err::Error; +use netpod::get_url_query_pairs; +use netpod::log::*; +use netpod::AppendToUrl; +use netpod::FromUrl; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -14,6 +14,7 @@ pub enum EventTransform { EventBlobsUncompressed, ValueFull, ArrayPick(usize), + // TODO should rename to scalar? dim0 will only stay a scalar. 
MinMaxAvgDev, PulseIdDiff, } @@ -26,12 +27,12 @@ pub enum TimeBinningTransform { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct Transform { +pub struct TransformQuery { event: EventTransform, time_binning: TimeBinningTransform, } -impl Transform { +impl TransformQuery { fn url_prefix() -> &'static str { "transform" } @@ -57,9 +58,34 @@ impl Transform { pub fn is_default_time_binned(&self) -> bool { self == &Self::default_time_binned() } + + pub fn for_event_blobs() -> Self { + Self { + event: EventTransform::EventBlobsVerbatim, + time_binning: TimeBinningTransform::None, + } + } + + pub fn for_time_weighted_scalar() -> Self { + Self { + event: EventTransform::MinMaxAvgDev, + time_binning: TimeBinningTransform::TimeWeighted, + } + } + + pub fn is_event_blobs(&self) -> bool { + match &self.event { + EventTransform::EventBlobsVerbatim => true, + EventTransform::EventBlobsUncompressed => { + error!("TODO decide on uncompressed event blobs"); + panic!() + } + _ => false, + } + } } -impl FromUrl for Transform { +impl FromUrl for TransformQuery { fn from_url(url: &Url) -> Result { let pairs = get_url_query_pairs(url); Self::from_pairs(&pairs) @@ -70,34 +96,34 @@ impl FromUrl for Transform { let key = "binningScheme"; if let Some(s) = pairs.get(key) { let ret = if s == "eventBlobs" { - Transform { + TransformQuery { event: EventTransform::EventBlobsVerbatim, time_binning: TimeBinningTransform::None, } } else if s == "fullValue" { - Transform { + TransformQuery { event: EventTransform::ValueFull, time_binning: TimeBinningTransform::None, } } else if s == "timeWeightedScalar" { - Transform { + TransformQuery { event: EventTransform::MinMaxAvgDev, time_binning: TimeBinningTransform::TimeWeighted, } } else if s == "unweightedScalar" { - Transform { + TransformQuery { event: EventTransform::EventBlobsVerbatim, time_binning: TimeBinningTransform::None, } } else if s == "binnedX" { let _u: usize = pairs.get("binnedXcount").map_or("1", |k| k).parse()?; warn!("TODO binnedXcount"); - Transform { + TransformQuery { event: EventTransform::MinMaxAvgDev, time_binning: TimeBinningTransform::None, } } else if s == "pulseIdDiff" { - Transform { + TransformQuery { event: EventTransform::PulseIdDiff, time_binning: TimeBinningTransform::None, } @@ -114,7 +140,7 @@ impl FromUrl for Transform { Err(_) => None, }) .unwrap_or(None); - let ret = Transform { + let ret = TransformQuery { event: EventTransform::EventBlobsVerbatim, time_binning: TimeBinningTransform::None, }; @@ -123,7 +149,7 @@ impl FromUrl for Transform { } } -impl AppendToUrl for Transform { +impl AppendToUrl for TransformQuery { fn append_to_url(&self, url: &mut Url) { warn!("TODO AppendToUrl for Transform"); let upre = Self::url_prefix(); diff --git a/scyllaconn/Cargo.toml b/scyllaconn/Cargo.toml index 00c8b2e..cbea102 100644 --- a/scyllaconn/Cargo.toml +++ b/scyllaconn/Cargo.toml @@ -25,5 +25,6 @@ scylla = "0.7" tokio-postgres = { version = "0.7.7", features = ["with-chrono-0_4", "with-serde_json-1"] } err = { path = "../err" } netpod = { path = "../netpod" } +query = { path = "../query" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } diff --git a/scyllaconn/src/bincache.rs b/scyllaconn/src/bincache.rs index b95e6b8..3d46eab 100644 --- a/scyllaconn/src/bincache.rs +++ b/scyllaconn/src/bincache.rs @@ -9,9 +9,7 @@ use items_2::binsdim0::BinsDim0; use items_2::channelevents::ChannelEvents; use netpod::log::*; use netpod::query::CacheUsage; -use netpod::query::PlainEventsQuery; use netpod::timeunits::*; -use 
netpod::transform::Transform; use netpod::AggKind; use netpod::ChannelTyped; use netpod::Dim0Kind; @@ -21,6 +19,7 @@ use netpod::PreBinnedPatchRange; use netpod::PreBinnedPatchRangeEnum; use netpod::ScalarType; use netpod::Shape; +use query::transform::TransformQuery; use scylla::Session as ScySession; use std::collections::VecDeque; use std::pin::Pin; @@ -216,7 +215,7 @@ pub async fn fetch_uncached_data( chn: ChannelTyped, coord: PreBinnedPatchCoordEnum, one_before_range: bool, - transform: Transform, + transform: TransformQuery, cache_usage: CacheUsage, scy: Arc, ) -> Result, bool)>, Error> { @@ -280,7 +279,7 @@ pub fn fetch_uncached_data_box( chn: &ChannelTyped, coord: &PreBinnedPatchCoordEnum, one_before_range: bool, - transform: Transform, + transform: TransformQuery, cache_usage: CacheUsage, scy: Arc, ) -> Pin, bool)>, Error>> + Send>> { @@ -301,7 +300,7 @@ pub async fn fetch_uncached_higher_res_prebinned( coord: PreBinnedPatchCoordEnum, range: PreBinnedPatchRangeEnum, one_before_range: bool, - transform: Transform, + transform: TransformQuery, cache_usage: CacheUsage, scy: Arc, ) -> Result<(Box, bool), Error> { @@ -381,7 +380,7 @@ pub async fn fetch_uncached_binned_events( chn: &ChannelTyped, coord: PreBinnedPatchCoordEnum, one_before_range: bool, - transform: Transform, + transform: TransformQuery, scy: Arc, ) -> Result<(Box, bool), Error> { /*let edges = coord.edges(); @@ -483,7 +482,7 @@ pub async fn pre_binned_value_stream_with_scy( chn: &ChannelTyped, coord: &PreBinnedPatchCoordEnum, one_before_range: bool, - transform: Transform, + transform: TransformQuery, cache_usage: CacheUsage, scy: Arc, ) -> Result<(Box, bool), Error> { @@ -507,7 +506,7 @@ pub async fn pre_binned_value_stream( chn: &ChannelTyped, coord: &PreBinnedPatchCoordEnum, one_before_range: bool, - transform: Transform, + transform: TransformQuery, agg_kind: AggKind, cache_usage: CacheUsage, scy: Arc, diff --git a/streams/Cargo.toml b/streams/Cargo.toml index 13fd87f..f00bedd 100644 --- a/streams/Cargo.toml +++ b/streams/Cargo.toml @@ -20,6 +20,7 @@ chrono = { version = "0.4.19", features = ["serde"] } wasmer = { version = "3.1.1", default-features = false, features = ["sys", "cranelift"] } err = { path = "../err" } netpod = { path = "../netpod" } +query = { path = "../query" } items = { path = "../items" } items_0 = { path = "../items_0" } items_2 = { path = "../items_2" } diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index 61f1b58..5c7e0ed 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -96,7 +96,7 @@ impl EventChunker { expand: bool, do_decompress: bool, ) -> Self { - trace!("EventChunker::from_start"); + info!("EventChunker::{} do_decompress {}", "from_start", do_decompress); let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); Self { @@ -136,6 +136,10 @@ impl EventChunker { expand: bool, do_decompress: bool, ) -> Self { + info!( + "EventChunker::{} do_decompress {}", + "from_event_boundary", do_decompress + ); let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); ret.state = DataFileState::Event; ret.need_min = 4; diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index d09670a..f26e30d 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -6,10 +6,9 @@ use futures_util::StreamExt; use items_0::streamitem::sitem_data; use items_2::channelevents::ChannelEvents; use netpod::log::*; -use netpod::query::PlainEventsQuery; -use netpod::AggKind; use 
netpod::ChConf; use netpod::Cluster; +use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use std::time::Duration; use std::time::Instant; diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index f2b8c2b..10a67bf 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -20,10 +20,10 @@ use items_2::framable::EventQueryJsonStringFrame; use items_2::frame::make_frame; use items_2::frame::make_term_frame; use netpod::log::*; -use netpod::query::PlainEventsQuery; use netpod::Cluster; use netpod::Node; use netpod::PerfOpts; +use query::api4::events::PlainEventsQuery; use serde::de::DeserializeOwned; use serde::Serialize; use std::fmt; diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 9204cc8..a2303dd 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -8,13 +8,12 @@ use items_0::streamitem::sitem_data; use items_0::streamitem::Sitemty; use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; -#[allow(unused)] use netpod::log::*; -use netpod::query::BinnedQuery; -use netpod::query::PlainEventsQuery; use netpod::BinnedRangeEnum; use netpod::ChConf; use netpod::Cluster; +use query::api4::binned::BinnedQuery; +use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; use std::time::Instant;
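
Downstream usage sketch (not part of this patch): the refactor moves `BinnedQuery` and `PlainEventsQuery` out of `netpod::query` into the new `query` crate (`query::api4::binned` / `query::api4::events`) and renames `netpod::transform::Transform` to `query::transform::TransformQuery`, which now travels with the query itself. The snippet below shows how a caller might build an event-blob request URL against the new paths. It is an assumption-laden sketch, not code from the patch: the `/api/4/events` path is a placeholder, and it assumes `NanoRange` converts into `SeriesRange` (as the test imports in this patch suggest).

```rust
use netpod::range::evrange::NanoRange;
use netpod::AppendToUrl;
use netpod::Channel;
use query::api4::events::PlainEventsQuery;
use url::Url;

// Hypothetical helper: builds an events URL for a channel/range pair that the
// caller already has. Only the query-construction part reflects the new API
// introduced by this patch (PlainEventsQuery::new, for_event_blobs,
// is_event_blobs, AppendToUrl).
fn build_events_url(channel: Channel, range: NanoRange) -> Url {
    // Event-blob retrieval is expressed via the TransformQuery carried by the
    // query type, replacing the earlier err::todo() stubs in netpod.
    let evq = PlainEventsQuery::new(channel, range).for_event_blobs();
    debug_assert!(evq.is_event_blobs());
    // Placeholder endpoint; the real host/port comes from the node config.
    let mut url = Url::parse("http://localhost:8080/api/4/events").expect("static URL is valid");
    evq.append_to_url(&mut url);
    url
}
```

Crates that previously imported `netpod::query::{BinnedQuery, PlainEventsQuery}` only need to switch their `use` paths and add `query = { path = "../query" }` to `Cargo.toml`, as the per-crate hunks above do.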