diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 425672c..deea1f2 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -14,7 +14,6 @@ use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::ByteSize; use netpod::HostPort; -use netpod::PerfOpts; use netpod::SfDbChannel; use netpod::APP_OCTET; use query::api4::binned::BinnedQuery; @@ -94,9 +93,8 @@ pub async fn get_binned( head, s ))); } - let perf_opts = PerfOpts::default(); let s1 = HttpBodyAsAsyncRead::new(res); - let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); + let s2 = InMemoryFrameAsyncReadStream::new(s1, ByteSize::from_kb(8)); use futures_util::StreamExt; use std::future::ready; let s3 = s2 diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 96ac227..7a46206 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -75,7 +75,7 @@ async fn agg_x_dim_0_inner() { let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns(); let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); // TODO let upstream already provide DiskIoTune: let mut disk_io_tune = DiskIoTune::default_for_testing(); disk_io_tune.read_buffer_len = query.buffer_size as usize; @@ -137,7 +137,7 @@ async fn agg_x_dim_1_inner() { let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns(); let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); // TODO let upstream already provide DiskIoTune: let mut disk_io_tune = DiskIoTune::default_for_testing(); disk_io_tune.read_buffer_len = query.buffer_size as usize; diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 6587b89..f5da346 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -24,7 +24,7 @@ pub async fn config_entry_best_match( } } -pub async fn configs(channel: SfDbChannel, node_config: &NodeConfigCached) -> Result { +pub async fn channel_configs(channel: SfDbChannel, node_config: &NodeConfigCached) -> Result { read_local_config(channel.clone(), node_config.clone()).await } diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index d15fd6d..5c9c780 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -6,13 +6,8 @@ use items_0::streamitem::Sitemty; use items_2::eventfull::EventFull; use items_2::merger::Merger; use netpod::log::*; -use netpod::ChannelTypeConfigGen; use netpod::Cluster; -use netpod::PerfOpts; use query::api4::events::EventsSubQuery; -use query::api4::events::EventsSubQuerySelect; -use query::api4::events::EventsSubQuerySettings; -use query::api4::events::PlainEventsQuery; use std::future::Future; use std::pin::Pin; use std::task::Context; @@ -31,19 +26,11 @@ pub struct MergedBlobsFromRemotes { } impl MergedBlobsFromRemotes { - pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, ch_conf: ChannelTypeConfigGen, cluster: Cluster) -> Self { - debug!("MergedBlobsFromRemotes evq {:?}", evq); - let select = EventsSubQuerySelect::new(ch_conf.clone(), evq.range().clone(), evq.transform().clone()); - let settings = EventsSubQuerySettings::from(&evq); - let subq = EventsSubQuery::from_parts(select, settings); + pub fn new(subq: EventsSubQuery, cluster: Cluster) -> Self { + debug!("MergedBlobsFromRemotes evq {:?}", subq); let mut tcp_establish_futs = Vec::new(); for node in &cluster.nodes { - let f = x_processed_event_blobs_stream_from_node( - subq.clone(), - ch_conf.clone(), - perf_opts.clone(), - node.clone(), - ); + let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone()); let f: T002 = Box::pin(f); tcp_establish_futs.push(f); } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index f7c1a92..8236cfd 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -64,7 +64,7 @@ pub async fn make_event_pipe( "make_event_pipe need_expand {need_expand} {evq:?}", need_expand = one_before ); - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); // TODO should not need this for correctness. // Should limit based on return size and latency. let out_max_len = if ncc.node_config.cluster.is_central_storage { @@ -169,7 +169,7 @@ pub async fn make_event_blobs_pipe_real( } let expand = subq.transform().need_one_before_range(); let range = subq.range(); - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); // TODO should depend on host config let do_local = node_config.node_config.cluster.is_central_storage; let pipe = if do_local { diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 7b873e3..1879740 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -92,7 +92,7 @@ pub fn main() -> Result<(), Error> { end: u64::MAX, }; let stats_conf = EventChunkerConf { - disk_stats_every: ByteSize::mb(2), + disk_stats_every: ByteSize::from_mb(2), }; let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, path.clone(), false, true); err::todo(); diff --git a/httpclient/Cargo.toml b/httpclient/Cargo.toml index 9f3047e..9d84bdc 100644 --- a/httpclient/Cargo.toml +++ b/httpclient/Cargo.toml @@ -4,9 +4,6 @@ version = "0.0.2" authors = ["Dominik Werder "] edition = "2021" -[lib] -path = "src/httpclient.rs" - [dependencies] futures-util = "0.3.25" serde = { version = "1.0.147", features = ["derive"] } diff --git a/httpclient/src/httpclient.rs b/httpclient/src/httpclient.rs index bb12099..7231474 100644 --- a/httpclient/src/httpclient.rs +++ b/httpclient/src/httpclient.rs @@ -1,14 +1,25 @@ use bytes::Bytes; -use err::{Error, PublicError}; +use err::Error; +use err::PublicError; use futures_util::pin_mut; -use http::{header, Request, Response, StatusCode}; +use http::header; +use http::Request; +use http::Response; +use http::StatusCode; use hyper::body::HttpBody; -use hyper::{Body, Method}; +use hyper::Body; +use hyper::Method; use netpod::log::*; -use netpod::{AppendToUrl, ChannelConfigQuery, ChannelConfigResponse, NodeConfigCached}; +use netpod::AppendToUrl; +use netpod::ChannelConfigQuery; +use netpod::ChannelConfigResponse; +use netpod::NodeConfigCached; use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::io::{self, AsyncRead, ReadBuf}; +use std::task::Context; +use std::task::Poll; +use tokio::io; +use tokio::io::AsyncRead; +use tokio::io::ReadBuf; use url::Url; pub trait ErrConv { @@ -39,7 +50,6 @@ pub async fn http_get(url: Url, accept: &str) -> Result { let client = hyper::Client::new(); let res = client.request(req).await.ec()?; if res.status() != StatusCode::OK { - error!("Server error {:?}", res); let (head, body) = res.into_parts(); let buf = hyper::body::to_bytes(body).await.ec()?; let s = String::from_utf8_lossy(&buf); diff --git a/httpclient/src/lib.rs b/httpclient/src/lib.rs new file mode 100644 index 0000000..fa9aa4b --- /dev/null +++ b/httpclient/src/lib.rs @@ -0,0 +1,4 @@ +pub mod httpclient; + +pub use crate::httpclient::*; +pub use url; diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index f412447..2141731 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -37,3 +37,4 @@ nodenet = { path = "../nodenet" } commonio = { path = "../commonio" } taskrun = { path = "../taskrun" } scyllaconn = { path = "../scyllaconn" } +httpclient = { path = "../httpclient" } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 3de3703..aed273e 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -35,7 +35,6 @@ use netpod::ChannelSearchResult; use netpod::ChannelTypeConfigGen; use netpod::DiskIoTune; use netpod::NodeConfigCached; -use netpod::PerfOpts; use netpod::ProxyConfig; use netpod::SfChFetchInfo; use netpod::SfDbChannel; @@ -45,7 +44,10 @@ use netpod::APP_JSON; use netpod::APP_OCTET; use parse::api1_parse::Api1ByteOrder; use parse::api1_parse::Api1ChannelHeader; -use query::api4::events::PlainEventsQuery; +use query::api4::events::EventsSubQuery; +use query::api4::events::EventsSubQuerySelect; +use query::api4::events::EventsSubQuerySettings; +use query::transform::TransformQuery; use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsonValue; @@ -523,12 +525,12 @@ async fn find_ch_conf( pub struct DataApiPython3DataStream { range: NanoRange, - channels: VecDeque, - current_channel: Option, + channels: VecDeque, + settings: EventsSubQuerySettings, + current_channel: Option, node_config: NodeConfigCached, chan_stream: Option> + Send>>>, config_fut: Option> + Send>>>, - ch_conf: Option, disk_io_tune: DiskIoTune, do_decompress: bool, #[allow(unused)] @@ -543,7 +545,8 @@ pub struct DataApiPython3DataStream { impl DataApiPython3DataStream { pub fn new( range: NanoRange, - channels: Vec, + channels: Vec, + settings: EventsSubQuerySettings, disk_io_tune: DiskIoTune, do_decompress: bool, events_max: u64, @@ -553,11 +556,11 @@ impl DataApiPython3DataStream { Self { range, channels: channels.into_iter().collect(), + settings, current_channel: None, node_config, chan_stream: None, config_fut: None, - ch_conf: None, disk_io_tune, do_decompress, event_count: 0, @@ -571,7 +574,7 @@ impl DataApiPython3DataStream { fn convert_item( b: EventFull, - channel: &SfDbChannel, + channel: &ChannelTypeConfigGen, fetch_info: &SfChFetchInfo, header_out: &mut bool, count_events: &mut usize, @@ -674,22 +677,26 @@ impl DataApiPython3DataStream { fn handle_config_fut_ready(&mut self, fetch_info: SfChFetchInfo) -> Result<(), Error> { self.config_fut = None; debug!("found channel_config {:?}", fetch_info); - let channel = SfDbChannel::from_name(fetch_info.backend(), fetch_info.name()); - let evq = PlainEventsQuery::new(channel.clone(), self.range.clone()).for_event_blobs(); - debug!("query for event blobs retrieval: evq {evq:?}"); + let select = EventsSubQuerySelect::new( + ChannelTypeConfigGen::SfDatabuffer(fetch_info.clone()), + self.range.clone().into(), + TransformQuery::for_event_blobs(), + ); + let subq = EventsSubQuery::from_parts(select, self.settings.clone()); + let one_before = subq.transform().need_one_before_range(); + debug!("query for event blobs retrieval subq {subq:?}"); // TODO important TODO debug!("TODO fix magic inmem_bufcap"); debug!("TODO add timeout option to data api3 download"); - let perf_opts = PerfOpts::default(); // TODO is this a good to place decide this? let s = if self.node_config.node_config.cluster.is_central_storage { info!("Set up central storage stream"); // TODO pull up this config - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let event_chunker_conf = EventChunkerConf::new(ByteSize::from_kb(1024)); let s = make_local_event_blobs_stream( - evq.range().try_into()?, + self.range.clone(), &fetch_info, - evq.one_before_range(), + one_before, self.do_decompress, event_chunker_conf, self.disk_io_tune.clone(), @@ -698,8 +705,7 @@ impl DataApiPython3DataStream { Box::pin(s) as Pin> + Send>> } else { debug!("Set up merged remote stream"); - let ch_conf: ChannelTypeConfigGen = fetch_info.clone().into(); - let s = MergedBlobsFromRemotes::new(evq, perf_opts, ch_conf, self.node_config.node_config.cluster.clone()); + let s = MergedBlobsFromRemotes::new(subq, self.node_config.node_config.cluster.clone()); Box::pin(s) as Pin> + Send>> }; let s = s.map({ @@ -782,11 +788,14 @@ impl Stream for DataApiPython3DataStream { } else { if let Some(channel) = self.channels.pop_front() { self.current_channel = Some(channel.clone()); - self.config_fut = Some(Box::pin(find_ch_conf( - self.range.clone(), - channel, - self.node_config.clone(), - ))); + if false { + self.config_fut = Some(Box::pin(find_ch_conf( + self.range.clone(), + err::todoval(), + self.node_config.clone(), + ))); + } + self.config_fut = Some(Box::pin(futures_util::future::ready(Ok(channel)))); continue; } else { self.data_done = true; @@ -805,6 +814,7 @@ impl Stream for DataApiPython3DataStream { } } +#[allow(unused)] fn shape_to_api3proto(sh: &Option>) -> Vec { match sh { None => vec![], @@ -883,7 +893,7 @@ impl Api1EventsBinaryHandler { qu: Api1Query, accept: String, span: tracing::Span, - node_config: &NodeConfigCached, + ncc: &NodeConfigCached, ) -> Result, Error> { // TODO this should go to usage statistics: info!( @@ -892,6 +902,7 @@ impl Api1EventsBinaryHandler { qu.channels().len(), qu.channels().first() ); + let settings = EventsSubQuerySettings::from(&qu); let beg_date = qu.range().beg().clone(); let end_date = qu.range().end().clone(); trace!("Api1Query beg_date {:?} end_date {:?}", beg_date, end_date); @@ -902,22 +913,27 @@ impl Api1EventsBinaryHandler { let end = end_date.timestamp() as u64 * SEC + end_date.timestamp_subsec_nanos() as u64; let range = NanoRange { beg, end }; // TODO check for valid given backend name: - let backend = &node_config.node_config.cluster.backend; - let chans = qu - .channels() - .iter() - .map(|ch| SfDbChannel::from_name(backend, ch.name())) - .collect(); + let backend = &ncc.node_config.cluster.backend; + // TODO ask for channel config quorum for all channels up front. + //httpclient::http_get(url, accept); + let mut chans = Vec::new(); + for ch in qu.channels() { + let ch = SfDbChannel::from_name(backend, ch.name()); + let ch_conf = nodenet::configquorum::find_config_basics_quorum(ch, range.clone().into(), ncc).await?; + chans.push(ch_conf); + } // TODO use a better stream protocol with built-in error delivery. let status_id = super::status_board()?.new_status_id(); let s = DataApiPython3DataStream::new( range.clone(), chans, - qu.disk_io_tune().clone(), + // TODO carry those settings from the query again + settings, + DiskIoTune::default(), qu.decompress(), qu.events_max().unwrap_or(u64::MAX), status_id.clone(), - node_config.clone(), + ncc.clone(), ); let s = s.instrument(span); let body = BodyStream::wrapped(s, format!("Api1EventsBinaryHandler")); diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index fa54b32..1cf3625 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -37,17 +37,18 @@ pub async fn chconf_from_events_v1( q: &PlainEventsQuery, ncc: &NodeConfigCached, ) -> Result { - let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; + let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; Ok(ret) } pub async fn chconf_from_prebinned(q: &PreBinnedQuery, ncc: &NodeConfigCached) -> Result { - let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; + let ret = + nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.patch().patch_range(), ncc).await?; Ok(ret) } pub async fn ch_conf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result { - let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; + let ret = nodenet::configquorum::find_config_basics_quorum(q.channel().clone(), q.range().clone(), ncc).await?; Ok(ret) } @@ -142,10 +143,10 @@ impl ChannelConfigsHandler { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = ChannelConfigQuery::from_url(&url)?; info!("channel_configs for q {q:?}"); - let ch_conf = nodenet::channelconfig::channel_config(q.range.clone(), q.channel, ncc).await?; + let ch_confs = nodenet::channelconfig::channel_configs(q.channel, ncc).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON) - .body(Body::from(serde_json::to_string(&ch_conf)?))?; + .body(Body::from(serde_json::to_string(&ch_confs)?))?; Ok(ret) } } diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 7809b30..bf4ec5a 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -960,7 +960,20 @@ mod serde_shape { } impl Shape { - pub fn from_bsread_jsval(v: &JsVal) -> Result { + pub fn from_sf_databuffer_raw(v: &Option>) -> Result { + let ret = match v { + Some(a) => match a.len() { + 0 => Shape::Scalar, + 1 => Shape::Wave(a[0]), + 2 => Shape::Image(a[0], a[1]), + _ => return Err(Error::with_msg_no_trace("can not understand sf databuffer shape spec")), + }, + None => Shape::Scalar, + }; + Ok(ret) + } + + pub fn from_bsread_jsval(v: &JsVal) -> Result { match v { JsVal::Array(v) => match v.len() { 0 => Ok(Shape::Scalar), @@ -991,7 +1004,7 @@ impl Shape { } // TODO use simply a list to represent all shapes: empty, or with 1 or 2 entries. - pub fn from_db_jsval(v: &JsVal) -> Result { + pub fn from_db_jsval(v: &JsVal) -> Result { match v { JsVal::String(s) => { if s == "Scalar" { @@ -1636,6 +1649,13 @@ impl PreBinnedPatchCoordEnum { todo!() } + pub fn patch_range(&self) -> SeriesRange { + match self { + PreBinnedPatchCoordEnum::Time(k) => k.series_range(), + PreBinnedPatchCoordEnum::Pulse(k) => k.series_range(), + } + } + pub fn span_desc(&self) -> String { match self { PreBinnedPatchCoordEnum::Time(k) => { @@ -2228,32 +2248,22 @@ impl ReadExactStats { } } -#[derive(Clone, Debug)] -pub struct PerfOpts { - pub inmem_bufcap: usize, -} - -impl PerfOpts { - pub fn default() -> Self { - Self { - inmem_bufcap: 1024 * 512, - } - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ByteSize(pub u32); impl ByteSize { - pub fn b(b: u32) -> Self { + pub fn from_bytes(b: u32) -> Self { Self(b) } - pub fn kb(kb: u32) -> Self { + + pub fn from_kb(kb: u32) -> Self { Self(1024 * kb) } - pub fn mb(mb: u32) -> Self { + + pub fn from_mb(mb: u32) -> Self { Self(1024 * 1024 * mb) } + pub fn bytes(&self) -> u32 { self.0 } @@ -2454,7 +2464,7 @@ pub fn get_url_query_pairs(url: &Url) -> BTreeMap { // At least on some backends the channel configuration may change depending on the queried range. // Therefore, the query includes the range. // The presence of a configuration in some range does not imply that there is any data available. -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChannelConfigQuery { pub channel: SfDbChannel, pub range: NanoRange, diff --git a/netpod/src/query/api1.rs b/netpod/src/query/api1.rs index 74f9c59..19e4bec 100644 --- a/netpod/src/query/api1.rs +++ b/netpod/src/query/api1.rs @@ -3,6 +3,7 @@ use crate::{DiskIoTune, FileIoBufferSize, ReadSys}; use err::Error; use serde::{Deserialize, Serialize}; use std::fmt; +use std::time::Duration; fn bool_true() -> bool { true @@ -225,6 +226,8 @@ mod serde_channel_tuple { pub struct Api1Query { range: Api1Range, channels: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + timeout: Option, // All following parameters are private and not to be used #[serde(default, skip_serializing_if = "Option::is_none")] file_io_buffer_size: Option, @@ -245,6 +248,7 @@ impl Api1Query { Self { range, channels, + timeout: None, decompress: true, events_max: None, file_io_buffer_size: None, @@ -275,6 +279,14 @@ impl Api1Query { &self.channels } + pub fn timeout(&self) -> Option { + self.timeout + } + + pub fn timeout_or_default(&self) -> Duration { + Duration::from_secs(60 * 30) + } + pub fn log_level(&self) -> &str { &self.log_level } diff --git a/netpod/src/query/prebinned.rs b/netpod/src/query/prebinned.rs index b1d6a23..1361f0e 100644 --- a/netpod/src/query/prebinned.rs +++ b/netpod/src/query/prebinned.rs @@ -76,7 +76,7 @@ impl PreBinnedQuery { .get("diskStatsEveryKb") .map(|k| k.parse().ok()) .unwrap_or(None) - .map(ByteSize::kb), + .map(ByteSize::from_kb), }; Ok(ret) } diff --git a/nodenet/Cargo.toml b/nodenet/Cargo.toml index bcc8034..f292daa 100644 --- a/nodenet/Cargo.toml +++ b/nodenet/Cargo.toml @@ -33,3 +33,4 @@ dbconn = { path = "../dbconn" } scyllaconn = { path = "../scyllaconn" } taskrun = { path = "../taskrun" } streams = { path = "../streams" } +httpclient = { path = "../httpclient" } diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index dba4d89..8f98a4d 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -1,8 +1,12 @@ use err::Error; +use httpclient::url::Url; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::timeunits::DAY; +use netpod::AppendToUrl; use netpod::ByteOrder; +use netpod::ChannelConfigQuery; +use netpod::ChannelConfigResponse; use netpod::ChannelTypeConfigGen; use netpod::DtNano; use netpod::NodeConfigCached; @@ -10,6 +14,7 @@ use netpod::ScalarType; use netpod::SfChFetchInfo; use netpod::SfDbChannel; use netpod::Shape; +use netpod::APP_JSON; const TEST_BACKEND: &str = "testbackend-00"; @@ -126,3 +131,49 @@ pub async fn channel_config( ); } } + +pub async fn channel_configs(channel: SfDbChannel, ncc: &NodeConfigCached) -> Result, Error> { + if channel.backend() == TEST_BACKEND { + let x = vec![channel_config_test_backend(channel)?]; + Ok(x) + } else if ncc.node_config.cluster.scylla.is_some() { + debug!("try to get ChConf for scylla type backend"); + let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) + .await + .map_err(Error::from)?; + Ok(vec![ChannelTypeConfigGen::Scylla(ret)]) + } else if ncc.node.sf_databuffer.is_some() { + debug!("channel_config channel {channel:?}"); + let configs = disk::channelconfig::channel_configs(channel.clone(), ncc).await?; + let a = configs; + let mut configs = Vec::new(); + for config in a.entries { + let ret = SfChFetchInfo::new( + channel.backend(), + channel.name(), + config.ks as _, + config.bs, + config.byte_order, + config.scalar_type, + Shape::from_sf_databuffer_raw(&config.shape)?, + ); + let ret = ChannelTypeConfigGen::SfDatabuffer(ret); + configs.push(ret); + } + Ok(configs) + } else { + return Err( + Error::with_msg_no_trace(format!("no channel config for backend {}", channel.backend())) + .add_public_msg(format!("no channel config for backend {}", channel.backend())), + ); + } +} + +pub async fn http_get_channel_config(qu: ChannelConfigQuery, baseurl: Url) -> Result { + let url = baseurl; + let mut url = url.join("channel/config").unwrap(); + qu.append_to_url(&mut url); + let res = httpclient::http_get(url, APP_JSON).await?; + let ret: ChannelConfigResponse = serde_json::from_slice(&res)?; + Ok(ret) +} diff --git a/nodenet/src/configquorum.rs b/nodenet/src/configquorum.rs index 72397d1..8044da7 100644 --- a/nodenet/src/configquorum.rs +++ b/nodenet/src/configquorum.rs @@ -1,24 +1,54 @@ +use crate::channelconfig::http_get_channel_config; use disk::SfDbChConf; use err::Error; +use httpclient::url::Url; +use netpod::log::*; +use netpod::range::evrange::SeriesRange; +use netpod::ChannelConfigQuery; use netpod::ChannelTypeConfigGen; use netpod::NodeConfigCached; use netpod::SfChFetchInfo; use netpod::SfDbChannel; -async fn find_sf_ch_config_quorum() -> Result { +async fn find_sf_ch_config_quorum( + channel: SfDbChannel, + range: SeriesRange, + ncc: &NodeConfigCached, +) -> Result { + let range = match range { + SeriesRange::TimeRange(x) => x, + SeriesRange::PulseRange(_) => return Err(Error::with_msg_no_trace("expect TimeRange")), + }; type _A = SfDbChannel; type _B = SfDbChConf; + let mut ress = Vec::new(); + for node in &ncc.node_config.cluster.nodes { + // TODO add a baseurl function to struct Node + let baseurl: Url = format!("http://{}:{}/api/4/", node.host, node.port).parse()?; + let qu = ChannelConfigQuery { + channel: channel.clone(), + range: range.clone(), + // TODO + expand: false, + }; + let res = http_get_channel_config(qu, baseurl.clone()).await?; + info!("GOT: {res:?}"); + ress.push(res); + } + // TODO find most likely values. + // TODO create new endpoint which only returns the most matching config entry // for some given channel and time range. todo!() } pub async fn find_config_basics_quorum( - channel: &SfDbChannel, + channel: SfDbChannel, + range: SeriesRange, ncc: &NodeConfigCached, ) -> Result { if let Some(_cfg) = &ncc.node.sf_databuffer { - let ret: SfChFetchInfo = find_sf_ch_config_quorum().await?; + let ret: SfChFetchInfo = find_sf_ch_config_quorum(channel, range, ncc).await?; Ok(ChannelTypeConfigGen::SfDatabuffer(ret)) } else if let Some(_cfg) = &ncc.node_config.cluster.scylla { let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index bd6b15a..a3e1782 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -19,14 +19,9 @@ use items_2::frame::make_term_frame; use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; use netpod::log::*; -use netpod::ChannelTypeConfigGen; use netpod::NodeConfigCached; -use netpod::PerfOpts; use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; -use query::api4::events::PlainEventsQuery; -use serde::Deserialize; -use serde::Serialize; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; @@ -102,7 +97,7 @@ async fn make_channel_events_stream_data( "make_channel_events_stream_data can not understand test channel name: {chn:?}" ))) } else { - let range = subq.range().clone(); + let _range = subq.range().clone(); if na[1] == "d0" { if na[2] == "i32" { //generator::generate_i32(node_ix, node_count, range) @@ -151,8 +146,7 @@ async fn make_channel_events_stream( } async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, Error> { - let perf_opts = PerfOpts::default(); - let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); + let mut h = InMemoryFrameAsyncReadStream::new(netin, netpod::ByteSize::from_kb(1)); let mut frames = Vec::new(); while let Some(k) = h .next() @@ -204,7 +198,6 @@ async fn events_parse_input_query(frames: Vec) -> Result<(EventsS error!("{e}"); e })?; - info!("events_parse_input_query {:?}", frame1); Ok(frame1.parts()) } @@ -223,6 +216,7 @@ async fn events_conn_handler_inner_try( Ok(x) => x, Err(e) => return Err((e, netout).into()), }; + info!("events_parse_input_query {evq:?}"); if evq.create_errors_contains("nodenet_parse_query") { let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query"); return Err((e, netout).into()); diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 84e98a1..635aa1a 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -24,7 +24,6 @@ use netpod::FileIoBufferSize; use netpod::Node; use netpod::NodeConfig; use netpod::NodeConfigCached; -use netpod::PerfOpts; use netpod::ScalarType; use netpod::SfChFetchInfo; use netpod::SfDatabuffer; @@ -99,7 +98,7 @@ fn raw_data_00() { let select = EventsSubQuerySelect::new(fetch_info.into(), range.into(), TransformQuery::default_events()); let settings = EventsSubQuerySettings::default(); let qu = EventsSubQuery::from_parts(select, settings); - let frame1 = Frame1Parts::new(qu); + let frame1 = Frame1Parts::new(qu.clone()); let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap()); let frame = sitem_data(query).make_frame()?; let jh = taskrun::spawn(events_conn_handler(client, addr, cfg)); @@ -108,8 +107,7 @@ fn raw_data_00() { con.shutdown().await.unwrap(); eprintln!("shut down"); - let perf_opts = PerfOpts::default(); - let mut frames = InMemoryFrameAsyncReadStream::new(con, perf_opts.inmem_bufcap); + let mut frames = InMemoryFrameAsyncReadStream::new(con, qu.inmem_bufcap()); while let Some(frame) = frames.next().await { match frame { Ok(frame) => match frame { diff --git a/parse/src/lib.rs b/parse/src/lib.rs index 10ea298..181d75b 100644 --- a/parse/src/lib.rs +++ b/parse/src/lib.rs @@ -1,5 +1,5 @@ pub mod api1_parse; pub mod channelconfig; -mod jsonconf; -pub mod nom2; pub use nom; + +mod jsonconf; diff --git a/parse/src/nom2.rs b/parse/src/nom2.rs deleted file mode 100644 index 3838425..0000000 --- a/parse/src/nom2.rs +++ /dev/null @@ -1,3 +0,0 @@ -// pub use nom::error::VerboseError; -// pub use nom::Err; -// pub use nom::IResult; diff --git a/query/src/api4/binned.rs b/query/src/api4/binned.rs index 9ea4e73..d404167 100644 --- a/query/src/api4/binned.rs +++ b/query/src/api4/binned.rs @@ -185,7 +185,7 @@ impl FromUrl for BinnedQuery { .get("diskStatsEveryKb") .map(|k| k.parse().ok()) .unwrap_or(None) - .map(ByteSize::kb), + .map(ByteSize::from_kb), /*report_error: pairs .get("reportError") .map_or("false", |k| k) diff --git a/query/src/api4/events.rs b/query/src/api4/events.rs index 35bdb8e..c647ff4 100644 --- a/query/src/api4/events.rs +++ b/query/src/api4/events.rs @@ -3,8 +3,7 @@ use crate::transform::TransformQuery; use err::Error; use netpod::get_url_query_pairs; use netpod::is_false; -use netpod::log::*; -use netpod::query::CacheUsage; +use netpod::query::api1::Api1Query; use netpod::query::PulseRangeQuery; use netpod::query::TimeRangeQuery; use netpod::range::evrange::SeriesRange; @@ -364,6 +363,21 @@ impl From<&BinnedQuery> for EventsSubQuerySettings { } } +impl From<&Api1Query> for EventsSubQuerySettings { + fn from(value: &Api1Query) -> Self { + Self { + timeout: value.timeout(), + // TODO ? + events_max: None, + event_delay: None, + stream_batch_len: None, + buf_len_disk_io: None, + test_do_wasm: false, + create_errors: Vec::new(), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct EventsSubQuery { select: EventsSubQuerySelect, @@ -416,6 +430,10 @@ impl EventsSubQuery { self.settings.buf_len_disk_io.unwrap_or(1024 * 8) } + pub fn inmem_bufcap(&self) -> ByteSize { + ByteSize::from_kb(4) + } + // A rough indication on how many bytes this request is allowed to return. Otherwise, the result should // be a partial result. pub fn bytes_max(&self) -> u64 { diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index a4da4e5..7640ecc 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -10,6 +10,7 @@ use items_2::framable::INMEM_FRAME_HEAD; use items_2::framable::INMEM_FRAME_MAGIC; use items_2::inmem::InMemoryFrame; use netpod::log::*; +use netpod::ByteSize; use std::pin::Pin; use std::task::Context; use std::task::Poll; @@ -51,10 +52,10 @@ where std::any::type_name::() } - pub fn new(inp: T, bufcap: usize) -> Self { + pub fn new(inp: T, bufcap: ByteSize) -> Self { Self { inp, - buf: SlideBuf::new(bufcap), + buf: SlideBuf::new(bufcap.bytes() as usize), need_min: INMEM_FRAME_HEAD, done: false, complete: false, diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 44aeb90..45b86b9 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -20,7 +20,6 @@ use netpod::log::*; use netpod::ChannelTypeConfigGen; use netpod::Cluster; use netpod::Node; -use netpod::PerfOpts; use query::api4::events::EventsSubQuery; use query::api4::events::Frame1Parts; use serde::de::DeserializeOwned; @@ -36,14 +35,12 @@ pub fn make_node_command_frame(query: EventsSubQuery) -> Result Result> + Send>>, Error> { let addr = format!("{}:{}", node.host, node.port_raw); debug!("x_processed_event_blobs_stream_from_node to: {addr}",); - let frame1 = make_node_command_frame(query)?; + let frame1 = make_node_command_frame(subq.clone())?; let net = TcpStream::connect(addr.clone()).await?; let (netin, mut netout) = net.into_split(); let item = sitem_data(frame1); @@ -53,7 +50,7 @@ pub async fn x_processed_event_blobs_stream_from_node( netout.write_all(&buf).await?; netout.flush().await?; netout.forget(); - let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); + let frames = InMemoryFrameAsyncReadStream::new(netin, subq.inmem_bufcap()); let frames = Box::pin(frames); let items = EventsFromFrames::new(frames, addr); Ok(Box::pin(items)) @@ -61,13 +58,13 @@ pub async fn x_processed_event_blobs_stream_from_node( pub type BoxedStream = Pin> + Send>>; -pub async fn open_tcp_streams(query: EventsSubQuery, cluster: &Cluster) -> Result>, Error> +pub async fn open_tcp_streams(subq: EventsSubQuery, cluster: &Cluster) -> Result>, Error> where // Group bounds in new trait T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static, { // TODO when unit tests established, change to async connect: - let frame1 = make_node_command_frame(query)?; + let frame1 = make_node_command_frame(subq.clone())?; let mut streams = Vec::new(); for node in &cluster.nodes { let addr = format!("{}:{}", node.host, node.port_raw); @@ -82,8 +79,7 @@ where netout.flush().await?; netout.forget(); // TODO for images, we need larger buffer capacity - let perf_opts = PerfOpts::default(); - let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); + let frames = InMemoryFrameAsyncReadStream::new(netin, subq.inmem_bufcap()); let frames = Box::pin(frames); let stream = EventsFromFrames::::new(frames, addr); streams.push(Box::pin(stream) as _);