diff --git a/Cargo.toml b/Cargo.toml index 56e427b..1747421 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["daqbuffer"] +members = ["daqbuffer", "h5out"] [profile.release] debug = 1 diff --git a/daqbuffer/src/test.rs b/daqbuffer/src/test.rs index 2572f14..c66a05a 100644 --- a/daqbuffer/src/test.rs +++ b/daqbuffer/src/test.rs @@ -1,264 +1,9 @@ -use crate::nodes::require_test_hosts_running; use bytes::BytesMut; -use chrono::{DateTime, Utc}; -use disk::agg::streams::{StatsItem, StreamItem}; -use disk::binned::query::{BinnedQuery, CacheUsage}; -use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen}; -use disk::frame::inmem::InMemoryFrameAsyncReadStream; -use disk::frame::makeframe::{FrameType, SubFrId}; -use disk::streamlog::Streamlog; -use disk::Sitemty; -use err::Error; -use futures_util::StreamExt; -use futures_util::TryStreamExt; -use http::StatusCode; -use hyper::Body; -use netpod::log::*; -use netpod::{AggKind, AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; -use serde::de::DeserializeOwned; -use std::fmt::Debug; -use std::future::ready; -use tokio::io::AsyncRead; -use url::Url; +pub mod binnedbinary; pub mod binnedjson; pub mod events; -#[test] -fn get_binned_binary() { - taskrun::run(get_binned_binary_inner()).unwrap(); -} - -async fn get_binned_binary_inner() -> Result<(), Error> { - let rh = require_test_hosts_running()?; - let cluster = &rh.cluster; - if true { - get_binned_channel::( - "scalar-i32-be", - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:50.000Z", - 3, - cluster, - true, - 4, - ) - .await?; - } - if true { - get_binned_channel::( - "wave-f64-be-n21", - "1970-01-01T00:20:10.000Z", - "1970-01-01T00:20:30.000Z", - 2, - cluster, - true, - 2, - ) - .await?; - } - if true { - get_binned_channel::( - "wave-u16-le-n77", - "1970-01-01T01:11:00.000Z", - "1970-01-01T01:35:00.000Z", - 7, - cluster, - true, - 24, - ) - .await?; - } - if true { - get_binned_channel::( - "wave-u16-le-n77", - "1970-01-01T01:42:00.000Z", - "1970-01-01T03:55:00.000Z", - 2, - cluster, - true, - 3, - ) - .await?; - } - Ok(()) -} - -async fn get_binned_channel( - channel_name: &str, - beg_date: &str, - end_date: &str, - bin_count: u32, - cluster: &Cluster, - expect_range_complete: bool, - expect_bin_count: u64, -) -> Result -where - NTY: Debug + SubFrId + DeserializeOwned, -{ - let t1 = Utc::now(); - let agg_kind = AggKind::DimXBins1; - let node0 = &cluster.nodes[0]; - let beg_date: DateTime = beg_date.parse()?; - let end_date: DateTime = end_date.parse()?; - let channel_backend = "testbackend"; - let perf_opts = PerfOpts { inmem_bufcap: 512 }; - let channel = Channel { - backend: channel_backend.into(), - name: channel_name.into(), - }; - let range = NanoRange::from_date_time(beg_date, end_date); - let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); - query.set_cache_usage(CacheUsage::Ignore); - let hp = HostPort::from_node(node0); - let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; - query.append_to_url(&mut url); - let url = url; - info!("get_binned_channel get {}", url); - let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(url.to_string()) - .header(http::header::ACCEPT, APP_OCTET) - .body(Body::empty())?; - let client = hyper::Client::new(); - let res = client.request(req).await?; - if res.status() != StatusCode::OK { - error!("client response {:?}", res); - } - let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); - let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); - let res = consume_binned_response::(s2).await?; - let t2 = chrono::Utc::now(); - let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; - info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); - if !res.is_valid() { - Err(Error::with_msg(format!("invalid response: {:?}", res))) - } else if res.range_complete_count == 0 && expect_range_complete { - Err(Error::with_msg(format!("expect range complete: {:?}", res))) - } else if res.bin_count != expect_bin_count { - Err(Error::with_msg(format!("bin count mismatch: {:?}", res))) - } else { - Ok(res) - } -} - -#[derive(Debug)] -pub struct BinnedResponse { - bin_count: u64, - err_item_count: u64, - data_item_count: u64, - bytes_read: u64, - range_complete_count: u64, - log_item_count: u64, - stats_item_count: u64, -} - -impl BinnedResponse { - pub fn new() -> Self { - Self { - bin_count: 0, - err_item_count: 0, - data_item_count: 0, - bytes_read: 0, - range_complete_count: 0, - log_item_count: 0, - stats_item_count: 0, - } - } - - pub fn is_valid(&self) -> bool { - if self.range_complete_count > 1 { - false - } else { - true - } - } -} - -async fn consume_binned_response(inp: InMemoryFrameAsyncReadStream) -> Result -where - NTY: Debug + SubFrId + DeserializeOwned, - T: AsyncRead + Unpin, -{ - let s1 = inp - .map_err(|e| error!("TEST GOT ERROR {:?}", e)) - .filter_map(|item| { - let g = match item { - Ok(item) => match item { - StreamItem::Log(item) => { - Streamlog::emit(&item); - None - } - StreamItem::Stats(item) => { - info!("Stats: {:?}", item); - None - } - StreamItem::DataItem(frame) => { - if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { - error!("test receives unexpected tyid {:x}", frame.tyid()); - } - match bincode::deserialize::>>(frame.buf()) { - Ok(item) => match item { - Ok(item) => match item { - StreamItem::Log(item) => { - Streamlog::emit(&item); - Some(Ok(StreamItem::Log(item))) - } - item => { - info!("TEST GOT ITEM {:?}", item); - Some(Ok(item)) - } - }, - Err(e) => { - error!("TEST GOT ERROR FRAME: {:?}", e); - Some(Err(e)) - } - }, - Err(e) => { - error!("bincode error: {:?}", e); - Some(Err(e.into())) - } - } - } - }, - Err(e) => Some(Err(Error::with_msg(format!("WEIRD EMPTY ERROR {:?}", e)))), - }; - ready(g) - }) - .fold(BinnedResponse::new(), |mut a, k| { - let g = match k { - Ok(StreamItem::Log(_item)) => { - a.log_item_count += 1; - a - } - Ok(StreamItem::Stats(item)) => match item { - StatsItem::EventDataReadStats(item) => { - a.bytes_read += item.parsed_bytes; - a - } - }, - Ok(StreamItem::DataItem(item)) => match item { - RangeCompletableItem::RangeComplete => { - a.range_complete_count += 1; - a - } - RangeCompletableItem::Data(item) => { - a.data_item_count += 1; - a.bin_count += WithLen::len(&item) as u64; - a - } - }, - Err(_e) => { - a.err_item_count += 1; - a - } - }; - ready(g) - }); - let ret = s1.await; - info!("BinnedResponse: {:?}", ret); - Ok(ret) -} - #[test] fn bufs() { use bytes::{Buf, BufMut}; diff --git a/daqbuffer/src/test/binnedbinary.rs b/daqbuffer/src/test/binnedbinary.rs new file mode 100644 index 0000000..0880417 --- /dev/null +++ b/daqbuffer/src/test/binnedbinary.rs @@ -0,0 +1,256 @@ +use crate::nodes::require_test_hosts_running; +use chrono::{DateTime, Utc}; +use disk::agg::streams::{StatsItem, StreamItem}; +use disk::binned::query::{BinnedQuery, CacheUsage}; +use disk::binned::{MinMaxAvgBins, RangeCompletableItem, WithLen}; +use disk::frame::inmem::InMemoryFrameAsyncReadStream; +use disk::frame::makeframe::{FrameType, SubFrId}; +use disk::streamlog::Streamlog; +use disk::Sitemty; +use err::Error; +use futures_util::{StreamExt, TryStreamExt}; +use http::StatusCode; +use hyper::Body; +use netpod::log::*; +use netpod::{AggKind, AppendToUrl, Channel, Cluster, HostPort, NanoRange, PerfOpts, APP_OCTET}; +use serde::de::DeserializeOwned; +use std::fmt; +use std::future::ready; +use tokio::io::AsyncRead; +use url::Url; + +#[test] +fn get_binned_binary() { + taskrun::run(get_binned_binary_inner()).unwrap(); +} + +async fn get_binned_binary_inner() -> Result<(), Error> { + let rh = require_test_hosts_running()?; + let cluster = &rh.cluster; + if true { + get_binned_channel::( + "scalar-i32-be", + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:50.000Z", + 3, + cluster, + true, + 4, + ) + .await?; + } + if true { + get_binned_channel::( + "wave-f64-be-n21", + "1970-01-01T00:20:10.000Z", + "1970-01-01T00:20:30.000Z", + 2, + cluster, + true, + 2, + ) + .await?; + } + if true { + get_binned_channel::( + "wave-u16-le-n77", + "1970-01-01T01:11:00.000Z", + "1970-01-01T01:35:00.000Z", + 7, + cluster, + true, + 24, + ) + .await?; + } + if true { + get_binned_channel::( + "wave-u16-le-n77", + "1970-01-01T01:42:00.000Z", + "1970-01-01T03:55:00.000Z", + 2, + cluster, + true, + 3, + ) + .await?; + } + Ok(()) +} + +async fn get_binned_channel( + channel_name: &str, + beg_date: &str, + end_date: &str, + bin_count: u32, + cluster: &Cluster, + expect_range_complete: bool, + expect_bin_count: u64, +) -> Result +where + NTY: fmt::Debug + SubFrId + DeserializeOwned, +{ + let t1 = Utc::now(); + let agg_kind = AggKind::DimXBins1; + let node0 = &cluster.nodes[0]; + let beg_date: DateTime = beg_date.parse()?; + let end_date: DateTime = end_date.parse()?; + let channel_backend = "testbackend"; + let perf_opts = PerfOpts { inmem_bufcap: 512 }; + let channel = Channel { + backend: channel_backend.into(), + name: channel_name.into(), + }; + let range = NanoRange::from_date_time(beg_date, end_date); + let mut query = BinnedQuery::new(channel, range, bin_count, agg_kind); + query.set_cache_usage(CacheUsage::Ignore); + query.set_disk_io_buffer_size(1024 * 16); + let hp = HostPort::from_node(node0); + let mut url = Url::parse(&format!("http://{}:{}/api/4/binned", hp.host, hp.port))?; + query.append_to_url(&mut url); + let url = url; + info!("get_binned_channel get {}", url); + let req = hyper::Request::builder() + .method(http::Method::GET) + .uri(url.to_string()) + .header(http::header::ACCEPT, APP_OCTET) + .body(Body::empty())?; + let client = hyper::Client::new(); + let res = client.request(req).await?; + if res.status() != StatusCode::OK { + error!("client response {:?}", res); + } + let s1 = disk::cache::HttpBodyAsAsyncRead::new(res); + let s2 = InMemoryFrameAsyncReadStream::new(s1, perf_opts.inmem_bufcap); + let res = consume_binned_response::(s2).await?; + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + info!("get_cached_0 DONE bin_count {} time {} ms", res.bin_count, ms); + if !res.is_valid() { + Err(Error::with_msg(format!("invalid response: {:?}", res))) + } else if res.range_complete_count == 0 && expect_range_complete { + Err(Error::with_msg(format!("expect range complete: {:?}", res))) + } else if res.bin_count != expect_bin_count { + Err(Error::with_msg(format!("bin count mismatch: {:?}", res))) + } else { + Ok(res) + } +} + +#[derive(Debug)] +pub struct BinnedResponse { + bin_count: u64, + err_item_count: u64, + data_item_count: u64, + bytes_read: u64, + range_complete_count: u64, + log_item_count: u64, + stats_item_count: u64, +} + +impl BinnedResponse { + pub fn new() -> Self { + Self { + bin_count: 0, + err_item_count: 0, + data_item_count: 0, + bytes_read: 0, + range_complete_count: 0, + log_item_count: 0, + stats_item_count: 0, + } + } + + pub fn is_valid(&self) -> bool { + if self.range_complete_count > 1 { + false + } else { + true + } + } +} + +async fn consume_binned_response(inp: InMemoryFrameAsyncReadStream) -> Result +where + NTY: fmt::Debug + SubFrId + DeserializeOwned, + T: AsyncRead + Unpin, +{ + let s1 = inp + .map_err(|e| error!("TEST GOT ERROR {:?}", e)) + .filter_map(|item| { + let g = match item { + Ok(item) => match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + None + } + StreamItem::Stats(item) => { + info!("Stats: {:?}", item); + None + } + StreamItem::DataItem(frame) => { + if frame.tyid() != > as FrameType>::FRAME_TYPE_ID { + error!("test receives unexpected tyid {:x}", frame.tyid()); + } + match bincode::deserialize::>>(frame.buf()) { + Ok(item) => match item { + Ok(item) => match item { + StreamItem::Log(item) => { + Streamlog::emit(&item); + Some(Ok(StreamItem::Log(item))) + } + item => { + info!("TEST GOT ITEM {:?}", item); + Some(Ok(item)) + } + }, + Err(e) => { + error!("TEST GOT ERROR FRAME: {:?}", e); + Some(Err(e)) + } + }, + Err(e) => { + error!("bincode error: {:?}", e); + Some(Err(e.into())) + } + } + } + }, + Err(e) => Some(Err(Error::with_msg(format!("WEIRD EMPTY ERROR {:?}", e)))), + }; + ready(g) + }) + .fold(BinnedResponse::new(), |mut a, k| { + let g = match k { + Ok(StreamItem::Log(_item)) => { + a.log_item_count += 1; + a + } + Ok(StreamItem::Stats(item)) => match item { + StatsItem::EventDataReadStats(item) => { + a.bytes_read += item.parsed_bytes; + a + } + }, + Ok(StreamItem::DataItem(item)) => match item { + RangeCompletableItem::RangeComplete => { + a.range_complete_count += 1; + a + } + RangeCompletableItem::Data(item) => { + a.data_item_count += 1; + a.bin_count += WithLen::len(&item) as u64; + a + } + }, + Err(_e) => { + a.err_item_count += 1; + a + } + }; + ready(g) + }); + let ret = s1.await; + info!("BinnedResponse: {:?}", ret); + Ok(ret) +} diff --git a/daqbuffer/src/test/events.rs b/daqbuffer/src/test/events.rs index c19565f..ddb700b 100644 --- a/daqbuffer/src/test/events.rs +++ b/daqbuffer/src/test/events.rs @@ -64,7 +64,7 @@ where name: channel_name.into(), }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsBinaryQuery::new(channel, range); + let query = PlainEventsBinaryQuery::new(channel, range, 1024 * 4); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}", hp.host, hp.port))?; query.append_to_url(&mut url); @@ -271,7 +271,7 @@ async fn get_plain_events_json( name: channel_name.into(), }; let range = NanoRange::from_date_time(beg_date, end_date); - let query = PlainEventsJsonQuery::new(channel, range, false); + let query = PlainEventsJsonQuery::new(channel, range, 1024 * 4, false); let hp = HostPort::from_node(node0); let mut url = Url::parse(&format!("http://{}:{}/api/4/events", hp.host, hp.port))?; query.append_to_url(&mut url); diff --git a/disk/src/agg/binnedt.rs b/disk/src/agg/binnedt.rs index 1210239..aa94fa3 100644 --- a/disk/src/agg/binnedt.rs +++ b/disk/src/agg/binnedt.rs @@ -82,7 +82,30 @@ where Poll::Ready(None) } else { let inp_poll_span = span!(Level::TRACE, "into_t_inp_poll"); - inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)) + let t = inp_poll_span.in_scope(|| self.inp.poll_next_unpin(cx)); + if false { + // TODO collect as stats: + use Poll::*; + match &t { + Ready(item) => match item { + Some(item) => match item { + Ok(item) => match item { + StreamItem::DataItem(item) => match item { + RangeCompletableItem::Data(item) => { + info!("time binner got batch len {}", item.len()); + } + _ => {} + }, + _ => {} + }, + _ => {} + }, + _ => {} + }, + _ => {} + } + } + t } } diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 88dd360..e720281 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,4 +1,4 @@ -use crate::eventblobs::EventBlobsComplete; +use crate::eventblobs::EventChunkerMultifile; use crate::eventchunker::EventChunkerConf; use netpod::timeunits::*; use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; @@ -53,7 +53,7 @@ async fn agg_x_dim_0_inner() { let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let fut1 = EventBlobsComplete::new( + let fut1 = EventChunkerMultifile::new( range.clone(), query.channel_config.clone(), node.clone(), @@ -102,7 +102,7 @@ async fn agg_x_dim_1_inner() { let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let fut1 = super::eventblobs::EventBlobsComplete::new( + let fut1 = super::eventblobs::EventChunkerMultifile::new( range.clone(), query.channel_config.clone(), node.clone(), diff --git a/disk/src/binned.rs b/disk/src/binned.rs index bd7866c..7e98bd8 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -137,6 +137,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { shape, self.query.agg_kind().clone(), self.query.cache_usage().clone(), + self.query.disk_io_buffer_size(), &self.node_config, self.query.disk_stats_every().clone(), self.query.report_error(), @@ -156,6 +157,7 @@ impl ChannelExecFunction for BinnedBinaryChannelExec { channel: self.query.channel().clone(), range: self.query.range().clone(), agg_kind: self.query.agg_kind().clone(), + disk_io_buffer_size: self.query.disk_io_buffer_size(), }; let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); @@ -403,6 +405,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { shape, self.query.agg_kind().clone(), self.query.cache_usage().clone(), + self.query.disk_io_buffer_size(), &self.node_config, self.query.disk_stats_every().clone(), self.query.report_error(), @@ -423,6 +426,7 @@ impl ChannelExecFunction for BinnedJsonChannelExec { channel: self.query.channel().clone(), range: self.query.range().clone(), agg_kind: self.query.agg_kind().clone(), + disk_io_buffer_size: self.query.disk_io_buffer_size(), }; let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); diff --git a/disk/src/binned/binnedfrompbv.rs b/disk/src/binned/binnedfrompbv.rs index 8e1aa27..5396441 100644 --- a/disk/src/binned/binnedfrompbv.rs +++ b/disk/src/binned/binnedfrompbv.rs @@ -168,6 +168,7 @@ where shape: Shape, agg_kind: AggKind, cache_usage: CacheUsage, + disk_io_buffer_size: usize, node_config: &NodeConfigCached, disk_stats_every: ByteSize, report_error: bool, @@ -193,6 +194,7 @@ where channel.clone(), agg_kind.clone(), cache_usage.clone(), + disk_io_buffer_size, disk_stats_every.clone(), report_error, ); diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index b592cb2..e2faf8d 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -114,6 +114,7 @@ where channel: self.query.channel().clone(), range: self.query.patch().patch_range(), agg_kind: self.query.agg_kind().clone(), + disk_io_buffer_size: self.query.disk_io_buffer_size(), }; if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 { let msg = format!( @@ -172,6 +173,7 @@ where let s = futures_util::stream::iter(patch_it) .map({ let q2 = self.query.clone(); + let disk_io_buffer_size = self.query.disk_io_buffer_size(); let disk_stats_every = self.query.disk_stats_every().clone(); let report_error = self.query.report_error(); move |patch| { @@ -180,6 +182,7 @@ where q2.channel().clone(), q2.agg_kind().clone(), q2.cache_usage().clone(), + disk_io_buffer_size, disk_stats_every.clone(), report_error, ); diff --git a/disk/src/binned/query.rs b/disk/src/binned/query.rs index 860263b..d9be8ba 100644 --- a/disk/src/binned/query.rs +++ b/disk/src/binned/query.rs @@ -16,6 +16,7 @@ pub struct PreBinnedQuery { agg_kind: AggKind, channel: Channel, cache_usage: CacheUsage, + disk_io_buffer_size: usize, disk_stats_every: ByteSize, report_error: bool, } @@ -26,6 +27,7 @@ impl PreBinnedQuery { channel: Channel, agg_kind: AggKind, cache_usage: CacheUsage, + disk_io_buffer_size: usize, disk_stats_every: ByteSize, report_error: bool, ) -> Self { @@ -34,6 +36,7 @@ impl PreBinnedQuery { agg_kind, channel, cache_usage, + disk_io_buffer_size, disk_stats_every, report_error, } @@ -68,6 +71,11 @@ impl PreBinnedQuery { channel: channel_from_pairs(&pairs)?, agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), cache_usage: CacheUsage::from_pairs(&pairs)?, + disk_io_buffer_size: pairs + .get("diskIoBufferSize") + .map_or("4096", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, disk_stats_every: ByteSize::kb(disk_stats_every), report_error: pairs .get("reportError") @@ -107,6 +115,10 @@ impl PreBinnedQuery { pub fn cache_usage(&self) -> CacheUsage { self.cache_usage.clone() } + + pub fn disk_io_buffer_size(&self) -> usize { + self.disk_io_buffer_size + } } impl AppendToUrl for PreBinnedQuery { @@ -117,6 +129,7 @@ impl AppendToUrl for PreBinnedQuery { g.append_pair("channelBackend", &self.channel.backend); g.append_pair("channelName", &self.channel.name); g.append_pair("cacheUsage", &format!("{}", self.cache_usage.query_param_value())); + g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); g.append_pair("reportError", &format!("{}", self.report_error())); } @@ -181,6 +194,7 @@ pub struct BinnedQuery { bin_count: u32, agg_kind: AggKind, cache_usage: CacheUsage, + disk_io_buffer_size: usize, disk_stats_every: ByteSize, report_error: bool, timeout: Duration, @@ -196,6 +210,7 @@ impl BinnedQuery { bin_count, agg_kind, cache_usage: CacheUsage::Use, + disk_io_buffer_size: 1024 * 4, disk_stats_every: ByteSize(1024 * 1024 * 4), report_error: false, timeout: Duration::from_millis(2000), @@ -228,6 +243,10 @@ impl BinnedQuery { &self.disk_stats_every } + pub fn disk_io_buffer_size(&self) -> usize { + self.disk_io_buffer_size + } + pub fn report_error(&self) -> bool { self.report_error } @@ -255,6 +274,10 @@ impl BinnedQuery { pub fn set_timeout(&mut self, k: Duration) { self.timeout = k; } + + pub fn set_disk_io_buffer_size(&mut self, k: usize) { + self.disk_io_buffer_size = k; + } } impl HasBackend for BinnedQuery { @@ -291,6 +314,11 @@ impl FromUrl for BinnedQuery { .map_err(|e| Error::with_msg(format!("can not parse binCount {:?}", e)))?, agg_kind: agg_kind_from_binning_scheme(&pairs).unwrap_or(AggKind::DimXBins1), cache_usage: CacheUsage::from_pairs(&pairs)?, + disk_io_buffer_size: pairs + .get("diskIoBufferSize") + .map_or("4096", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, disk_stats_every: ByteSize::kb(disk_stats_every), report_error: pairs .get("reportError") @@ -342,6 +370,7 @@ impl AppendToUrl for BinnedQuery { } { let mut g = url.query_pairs_mut(); + g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); g.append_pair("diskStatsEveryKb", &format!("{}", self.disk_stats_every.bytes() / 1024)); g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); g.append_pair("abortAfterBinCount", &format!("{}", self.abort_after_bin_count)); diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index d342619..d4c99ca 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -209,15 +209,17 @@ pub struct PlainEvents { channel: Channel, range: NanoRange, agg_kind: AggKind, + disk_io_buffer_size: usize, node_config: NodeConfigCached, } impl PlainEvents { - pub fn new(channel: Channel, range: NanoRange, node_config: NodeConfigCached) -> Self { + pub fn new(channel: Channel, range: NanoRange, disk_io_buffer_size: usize, node_config: NodeConfigCached) -> Self { Self { channel, range, agg_kind: AggKind::Plain, + disk_io_buffer_size, node_config, } } @@ -254,6 +256,7 @@ impl ChannelExecFunction for PlainEvents { channel: self.channel, range: self.range, agg_kind: self.agg_kind, + disk_io_buffer_size: self.disk_io_buffer_size, }; let s = MergedFromRemotes::>::new(evq, perf_opts, self.node_config.node_config.cluster); let s = s.map(|item| Box::new(item) as Box); @@ -269,6 +272,7 @@ pub struct PlainEventsJson { channel: Channel, range: NanoRange, agg_kind: AggKind, + disk_io_buffer_size: usize, timeout: Duration, node_config: NodeConfigCached, do_log: bool, @@ -278,6 +282,7 @@ impl PlainEventsJson { pub fn new( channel: Channel, range: NanoRange, + disk_io_buffer_size: usize, timeout: Duration, node_config: NodeConfigCached, do_log: bool, @@ -286,6 +291,7 @@ impl PlainEventsJson { channel, range, agg_kind: AggKind::Plain, + disk_io_buffer_size, timeout, node_config, do_log, @@ -404,6 +410,7 @@ impl ChannelExecFunction for PlainEventsJson { channel: self.channel, range: self.range, agg_kind: self.agg_kind, + disk_io_buffer_size: self.disk_io_buffer_size, }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); let f = collect_plain_events_json(s, self.timeout, 0, self.do_log); diff --git a/disk/src/decode.rs b/disk/src/decode.rs index 86b4c34..a05d6ca 100644 --- a/disk/src/decode.rs +++ b/disk/src/decode.rs @@ -6,7 +6,7 @@ use crate::binned::{ Bool, EventValuesAggregator, EventsNodeProcessor, FilterFittingInside, MinMaxAvgBins, NumOps, PushableIndex, RangeCompletableItem, RangeOverlapInfo, ReadPbv, ReadableFromFile, WithLen, WithTimestamps, }; -use crate::eventblobs::EventBlobsComplete; +use crate::eventblobs::EventChunkerMultifile; use crate::eventchunker::EventFull; use err::Error; use futures_core::Stream; @@ -422,7 +422,7 @@ where EVS: EventValueShape, { evs: EVS, - event_blobs: EventBlobsComplete, + event_blobs: EventChunkerMultifile, completed: bool, errored: bool, _m1: PhantomData, @@ -436,7 +436,7 @@ where END: Endianness, EVS: EventValueShape + EventValueFromBytes, { - pub fn new(evs: EVS, event_blobs: EventBlobsComplete) -> Self { + pub fn new(evs: EVS, event_blobs: EventChunkerMultifile) -> Self { Self { evs, event_blobs, @@ -450,6 +450,8 @@ where fn decode(&mut self, ev: &EventFull) -> Result>::Output>, Error> { let mut ret = EventValues::empty(); + ret.tss.reserve(ev.tss.len()); + ret.values.reserve(ev.tss.len()); for i1 in 0..ev.tss.len() { // TODO check that dtype, event endianness and event shape match our static // expectation about the data in this channel. diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index d4db38b..7440218 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -15,7 +15,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::task::{Context, Poll}; -pub struct EventBlobsComplete { +pub struct EventChunkerMultifile { channel_config: ChannelConfig, file_chan: async_channel::Receiver>, evs: Option, @@ -30,7 +30,7 @@ pub struct EventBlobsComplete { node_ix: usize, } -impl EventBlobsComplete { +impl EventChunkerMultifile { pub fn new( range: NanoRange, channel_config: ChannelConfig, @@ -56,7 +56,7 @@ impl EventBlobsComplete { } } -impl Stream for EventBlobsComplete { +impl Stream for EventChunkerMultifile { type Item = Result>, Error>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { diff --git a/disk/src/events.rs b/disk/src/events.rs index 4b6ef6e..f65a075 100644 --- a/disk/src/events.rs +++ b/disk/src/events.rs @@ -11,15 +11,17 @@ use url::Url; pub struct PlainEventsBinaryQuery { channel: Channel, range: NanoRange, + disk_io_buffer_size: usize, report_error: bool, timeout: Duration, } impl PlainEventsBinaryQuery { - pub fn new(channel: Channel, range: NanoRange) -> Self { + pub fn new(channel: Channel, range: NanoRange, disk_io_buffer_size: usize) -> Self { Self { channel, range, + disk_io_buffer_size, report_error: false, timeout: Duration::from_millis(10000), } @@ -35,6 +37,11 @@ impl PlainEventsBinaryQuery { end: end_date.parse::>()?.to_nanos(), }, channel: channel_from_pairs(&pairs)?, + disk_io_buffer_size: pairs + .get("diskIoBufferSize") + .map_or("4096", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, report_error: pairs .get("reportError") .map_or("false", |k| k) @@ -58,6 +65,10 @@ impl PlainEventsBinaryQuery { &self.channel } + pub fn disk_io_buffer_size(&self) -> usize { + self.disk_io_buffer_size + } + pub fn report_error(&self) -> bool { self.report_error } @@ -85,6 +96,7 @@ impl AppendToUrl for PlainEventsBinaryQuery { "endDate", &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), ); + g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); } } @@ -94,16 +106,18 @@ impl AppendToUrl for PlainEventsBinaryQuery { pub struct PlainEventsJsonQuery { channel: Channel, range: NanoRange, + disk_io_buffer_size: usize, report_error: bool, timeout: Duration, do_log: bool, } impl PlainEventsJsonQuery { - pub fn new(channel: Channel, range: NanoRange, do_log: bool) -> Self { + pub fn new(channel: Channel, range: NanoRange, disk_io_buffer_size: usize, do_log: bool) -> Self { Self { channel, range, + disk_io_buffer_size, report_error: false, timeout: Duration::from_millis(10000), do_log, @@ -120,6 +134,11 @@ impl PlainEventsJsonQuery { end: end_date.parse::>()?.to_nanos(), }, channel: channel_from_pairs(&pairs)?, + disk_io_buffer_size: pairs + .get("diskIoBufferSize") + .map_or("4096", |k| k) + .parse() + .map_err(|e| Error::with_msg(format!("can not parse diskIoBufferSize {:?}", e)))?, report_error: pairs .get("reportError") .map_or("false", |k| k) @@ -158,6 +177,10 @@ impl PlainEventsJsonQuery { self.report_error } + pub fn disk_io_buffer_size(&self) -> usize { + self.disk_io_buffer_size + } + pub fn timeout(&self) -> Duration { self.timeout } @@ -183,6 +206,7 @@ impl PlainEventsJsonQuery { "endDate", &Utc.timestamp_nanos(self.range.end as i64).format(date_fmt).to_string(), ); + g.append_pair("diskIoBufferSize", &format!("{}", self.disk_io_buffer_size)); g.append_pair("timeout", &format!("{}", self.timeout.as_millis())); g.append_pair("doLog", &format!("{}", self.do_log)); } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 7bbd369..f34a016 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -189,8 +189,6 @@ where } if lowest_ix == usize::MAX { if self.batch.len() != 0 { - //let k = std::mem::replace(&mut self.batch, MinMaxAvgScalarEventBatch::empty()); - //let ret = MinMaxAvgScalarEventBatchStreamItem::Values(k); let emp = <::Output>::empty(); let ret = std::mem::replace(&mut self.batch, emp); self.data_emit_complete = true; diff --git a/disk/src/raw.rs b/disk/src/raw.rs index eea6fff..2e7926c 100644 --- a/disk/src/raw.rs +++ b/disk/src/raw.rs @@ -30,6 +30,7 @@ pub struct RawEventsQuery { pub channel: Channel, pub range: NanoRange, pub agg_kind: AggKind, + pub disk_io_buffer_size: usize, } #[derive(Serialize, Deserialize)] diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 34744c4..435c9a9 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -5,7 +5,7 @@ use crate::decode::{ BigEndian, Endianness, EventValueFromBytes, EventValueShape, EventValuesDim0Case, EventValuesDim1Case, EventsDecodedStream, LittleEndian, NumFromBytes, }; -use crate::eventblobs::EventBlobsComplete; +use crate::eventblobs::EventChunkerMultifile; use crate::eventchunker::EventChunkerConf; use crate::frame::inmem::InMemoryFrameAsyncReadStream; use crate::frame::makeframe::{decode_frame, make_frame, make_term_frame, Framable}; @@ -99,7 +99,7 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { fn make_num_pipeline_stream_evs( event_value_shape: EVS, events_node_proc: ENP, - event_blobs: EventBlobsComplete, + event_blobs: EventChunkerMultifile, ) -> Pin> + Send>> where NTY: NumOps + NumFromBytes + 'static, @@ -286,16 +286,13 @@ async fn events_conn_handler_inner_try( array: entry.is_array, compression: entry.is_compressed, }; - - // TODO use a requested buffer size - let buffer_size = 1024 * 4; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let event_blobs = EventBlobsComplete::new( + let event_blobs = EventChunkerMultifile::new( range.clone(), channel_config.clone(), node_config.node.clone(), node_config.ix, - buffer_size, + evq.disk_io_buffer_size, event_chunker_conf, ); let shape = entry.to_shape().unwrap(); diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index a30ac17..90d7917 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -427,7 +427,12 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let query = PlainEventsBinaryQuery::from_url(&url)?; - let op = disk::channelexec::PlainEvents::new(query.channel().clone(), query.range().clone(), node_config.clone()); + let op = disk::channelexec::PlainEvents::new( + query.channel().clone(), + query.range().clone(), + query.disk_io_buffer_size(), + node_config.clone(), + ); let s = disk::channelexec::channel_exec(op, query.channel(), query.range(), AggKind::Plain, node_config).await?; let s = s.map(|item| item.make_frame()); let ret = response(StatusCode::OK).body(BodyStream::wrapped(s, format!("plain_events")))?; @@ -440,6 +445,7 @@ async fn plain_events_json(req: Request, node_config: &NodeConfigCached) - let op = disk::channelexec::PlainEventsJson::new( query.channel().clone(), query.range().clone(), + query.disk_io_buffer_size(), query.timeout(), node_config.clone(), query.do_log(), @@ -595,8 +601,7 @@ pub async fn channel_config(req: Request, node_config: &NodeConfigCached) pub async fn ca_connect_1(req: Request, node_config: &NodeConfigCached) -> Result, Error> { let url = Url::parse(&format!("dummy:{}", req.uri()))?; let pairs = get_url_query_pairs(&url); - let addr = pairs.get("addr").unwrap().into(); - let res = netfetch::ca_connect_1(addr, node_config).await?; + let res = netfetch::ca_connect_1(pairs, node_config).await?; let ret = response(StatusCode::OK) .header(http::header::CONTENT_TYPE, APP_JSON_LINES) .body(Body::wrap_stream(res.map(|k| match serde_json::to_string(&k) { diff --git a/netfetch/Cargo.toml b/netfetch/Cargo.toml index 12728fb..d6b85de 100644 --- a/netfetch/Cargo.toml +++ b/netfetch/Cargo.toml @@ -18,3 +18,4 @@ futures-core = "0.3.14" futures-util = "0.3.14" err = { path = "../err" } netpod = { path = "../netpod" } +taskrun = { path = "../taskrun" } diff --git a/netfetch/src/lib.rs b/netfetch/src/lib.rs index 961ff47..8fcc6e1 100644 --- a/netfetch/src/lib.rs +++ b/netfetch/src/lib.rs @@ -4,6 +4,7 @@ use err::Error; use futures_util::FutureExt; use netpod::NodeConfigCached; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[derive(Debug, Serialize, Deserialize)] @@ -20,15 +21,64 @@ pub enum FetchItem { Message(Message), } +#[cfg(test)] +mod test { + use futures_util::StreamExt; + use netpod::log::*; + use netpod::{Cluster, Database, Node, NodeConfig, NodeConfigCached}; + use std::collections::BTreeMap; + use std::iter::FromIterator; + + #[test] + fn ca_connect_1() { + taskrun::run(async { + let it = vec![(String::new(), String::new())].into_iter(); + let pairs = BTreeMap::from_iter(it); + let node_config = NodeConfigCached { + node: Node { + host: "".into(), + bin_grain_kind: 0, + port: 123, + port_raw: 123, + backend: "".into(), + split: 0, + data_base_path: "".into(), + listen: "".into(), + ksprefix: "".into(), + }, + node_config: NodeConfig { + name: "".into(), + cluster: Cluster { + nodes: vec![], + database: Database { + host: "".into(), + name: "".into(), + user: "".into(), + pass: "".into(), + }, + }, + }, + ix: 0, + }; + let mut rx = super::ca_connect_1(pairs, &node_config).await?; + while let Some(item) = rx.next().await { + info!("got next: {:?}", item); + } + Ok(()) + }) + .unwrap(); + } +} + pub async fn ca_connect_1( - addr: String, + _pairs: BTreeMap, _node_config: &NodeConfigCached, ) -> Result>, Error> { let (tx, rx) = bounded(16); let tx2 = tx.clone(); tokio::task::spawn( async move { - let mut conn = tokio::net::TcpStream::connect(addr).await?; + let mut conn = tokio::net::TcpStream::connect("S30CB06-CVME-LLRF2.psi.ch:5064").await?; let (mut inp, mut out) = conn.split(); tx.send(Ok(FetchItem::Log(format!("connected")))).await?; let mut buf = [0; 64];