From 7c26b725372aa3651809113550b9eaa9c64c868d Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Fri, 21 Jul 2023 23:06:15 +0200
Subject: [PATCH] Cleanup and load test

---
 .cargo/config.toml                            |   1 +
 crates/daqbuffer/Cargo.toml                   |   4 -
 crates/daqbuffer/build.rs                     |   7 +
 crates/daqbuffer/src/bin/daqbuffer.rs         |   9 ++
 crates/disk/src/disk.rs                       | 130 +++++++-----------
 .../disk/src/merge/mergedblobsfromremotes.rs  |   2 +-
 crates/disk/src/raw/conn.rs                   |   4 +-
 crates/httpclient/src/httpclient.rs           |   1 +
 crates/httpret/src/api1.rs                    |  28 +++-
 crates/httpret/src/download.rs                |  19 ++-
 crates/httpret/src/proxy.rs                   |   1 +
 crates/netpod/src/netpod.rs                   |  37 ++++-
 crates/query/src/api4/events.rs               |   3 +-
 crates/streams/src/frames/inmem.rs            |   1 +
 crates/taskrun/Cargo.toml                     |  10 +-
 crates/taskrun/src/taskrun.rs                 |  32 +++--
 16 files changed, 171 insertions(+), 118 deletions(-)
 create mode 100644 crates/daqbuffer/build.rs

diff --git a/.cargo/config.toml b/.cargo/config.toml
index 3522d2d..962ced9 100644
--- a/.cargo/config.toml
+++ b/.cargo/config.toml
@@ -7,4 +7,5 @@ rustflags = [
     #"-C", "inline-threshold=1000",
     #"-Z", "time-passes=yes",
     #"-Z", "time-llvm-passes=yes",
+    "--cfg", "tokio_unstable",
 ]
diff --git a/crates/daqbuffer/Cargo.toml b/crates/daqbuffer/Cargo.toml
index f2df0bd..be8e4c5 100644
--- a/crates/daqbuffer/Cargo.toml
+++ b/crates/daqbuffer/Cargo.toml
@@ -5,12 +5,8 @@ authors = ["Dominik Werder "]
 edition = "2021"
 
 [dependencies]
-tokio = { version = "1.22.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
 hyper = "0.14"
 http = "0.2"
-tracing = "0.1.25"
-tracing-subscriber = "0.2.17"
-futures-core = "0.3.14"
 futures-util = "0.3.14"
 bytes = "1.0.1"
 #dashmap = "3"
diff --git a/crates/daqbuffer/build.rs b/crates/daqbuffer/build.rs
new file mode 100644
index 0000000..9c2650f
--- /dev/null
+++ b/crates/daqbuffer/build.rs
@@ -0,0 +1,7 @@
+fn main() {
+    let names = ["DEBUG", "OPT_LEVEL", "TARGET", "CARGO_ENCODED_RUSTFLAGS"];
+    for name in names {
+        let val = std::env::var(name).unwrap();
+        println!("cargo:rustc-env=DAQBUF_{}={}", name, val);
+    }
+}
diff --git a/crates/daqbuffer/src/bin/daqbuffer.rs b/crates/daqbuffer/src/bin/daqbuffer.rs
index 785ac87..373d01e 100644
--- a/crates/daqbuffer/src/bin/daqbuffer.rs
+++ b/crates/daqbuffer/src/bin/daqbuffer.rs
@@ -15,6 +15,7 @@ use netpod::NodeConfig;
 use netpod::NodeConfigCached;
 use netpod::ProxyConfig;
 use netpod::ServiceVersion;
+use taskrun::tokio;
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
 
@@ -66,6 +67,14 @@ async fn go() -> Result<(), Error> {
         SubCmd::Retrieval(subcmd) => {
             info!("daqbuffer version {} 0000", clap::crate_version!());
             info!("{:?}", service_version);
+            {
+                #[allow(non_snake_case)]
+                let TARGET = std::env!("DAQBUF_TARGET");
+                #[allow(non_snake_case)]
+                let CARGO_ENCODED_RUSTFLAGS = std::env!("DAQBUF_CARGO_ENCODED_RUSTFLAGS");
+                info!("TARGET: {:?}", TARGET);
+                info!("CARGO_ENCODED_RUSTFLAGS: {:?}", CARGO_ENCODED_RUSTFLAGS);
+            }
             let mut config_file = File::open(&subcmd.config).await?;
             let mut buf = Vec::new();
             config_file.read_to_end(&mut buf).await?;
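NOTE: The new build.rs forwards a fixed set of Cargo build-time variables to
the compiler under a DAQBUF_ prefix, which is what lets the retrieval
subcommand above embed them via std::env!. A minimal standalone sketch of the
pattern (the MYAPP_ prefix is made up for illustration):

```rust
// build.rs (sketch): re-export build-time env vars under a custom prefix.
fn main() {
    // Cargo sets these for build scripts; `cargo:rustc-env` forwards them
    // so they become compile-time env vars of the crate being built.
    for name in ["TARGET", "OPT_LEVEL"] {
        println!("cargo:rustc-env=MYAPP_{}={}", name, std::env::var(name).unwrap());
    }
}

// src/main.rs (sketch): read the forwarded value at compile time.
// fn main() {
//     println!("built for target: {}", env!("MYAPP_TARGET"));
// }
```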
diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs
index 89633dd..e773d3d 100644
--- a/crates/disk/src/disk.rs
+++ b/crates/disk/src/disk.rs
@@ -19,7 +19,6 @@ pub mod streamlog;
 
 pub use parse;
 
-use bytes::Bytes;
 use bytes::BytesMut;
 use err::Error;
 use futures_util::future::FusedFuture;
 use futures_util::Stream;
@@ -31,7 +30,6 @@ use netpod::log::*;
 use netpod::ByteOrder;
 use netpod::DiskIoTune;
 use netpod::DtNano;
-use netpod::Node;
 use netpod::ReadSys;
 use netpod::ScalarType;
 use netpod::SfDbChannel;
@@ -85,64 +83,6 @@ pub struct AggQuerySingleChannel {
     pub buffer_size: u32,
 }
 
-// TODO transform this into a self-test or remove.
-pub async fn read_test_1(query: &AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
-    let path = paths::datapath(query.timebin as u64, &query.channel_config, 0, &node);
-    debug!("try path: {:?}", path);
-    let fin = OpenOptions::new().read(true).open(path).await?;
-    let meta = fin.metadata().await;
-    debug!("file meta {:?}", meta);
-    let stream = netpod::BodyStream {
-        inner: Box::new(FileReader {
-            file: fin,
-            nreads: 0,
-            buffer_size: query.buffer_size,
-        }),
-    };
-    Ok(stream)
-}
-
-struct FileReader {
-    file: tokio::fs::File,
-    nreads: u32,
-    buffer_size: u32,
-}
-
-impl Stream for FileReader {
-    type Item = Result<Bytes, Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        err::todo();
-        // TODO remove if no longer used?
-        let blen = self.buffer_size as usize;
-        let mut buf2 = BytesMut::with_capacity(blen);
-        buf2.resize(buf2.capacity(), 0);
-        if buf2.as_mut().len() != blen {
-            panic!("logic");
-        }
-        let mut buf = tokio::io::ReadBuf::new(buf2.as_mut());
-        if buf.filled().len() != 0 {
-            panic!("logic");
-        }
-        match Pin::new(&mut self.file).poll_read(cx, &mut buf) {
-            Poll::Ready(Ok(_)) => {
-                let rlen = buf.filled().len();
-                if rlen == 0 {
-                    Poll::Ready(None)
-                } else {
-                    if rlen != blen {
-                        info!("short read {} of {}", buf.filled().len(), blen);
-                    }
-                    self.nreads += 1;
-                    Poll::Ready(Some(Ok(buf2.freeze())))
-                }
-            }
-            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::from(e)))),
-            Poll::Pending => Poll::Pending,
-        }
-    }
-}
-
 pub struct Fopen1 {
     #[allow(dead_code)]
     opts: OpenOptions,
@@ -196,6 +136,7 @@ unsafe impl Send for Fopen1 {}
 
 pub struct FileContentStream {
     file: File,
+    cap: usize,
     disk_io_tune: DiskIoTune,
     read_going: bool,
     buf: BytesMut,
@@ -203,25 +144,49 @@ pub struct FileContentStream {
     nlog: usize,
     done: bool,
     complete: bool,
+    total_read: usize,
+}
+
+impl Drop for FileContentStream {
+    fn drop(&mut self) {
+        debug!("FileContentStream total_read {} cap {}", self.total_read, self.cap);
+    }
 }
 
 impl FileContentStream {
-    pub fn type_name() -> &'static str {
+    pub fn self_name() -> &'static str {
         std::any::type_name::<Self>()
     }
 
     pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self {
+        let cap = disk_io_tune.read_buffer_len;
+        let buf = Self::prepare_buf(cap);
         Self {
             file,
+            cap,
             disk_io_tune,
             read_going: false,
-            buf: BytesMut::new(),
+            buf,
             ts1: Instant::now(),
             nlog: 0,
             done: false,
             complete: false,
+            total_read: 0,
         }
     }
+
+    fn prepare_buf(cap: usize) -> BytesMut {
+        let mut buf = BytesMut::with_capacity(cap);
+        unsafe {
+            // SAFETY if we got here, then we have the required capacity
+            buf.set_len(buf.capacity());
+        }
+        buf
+    }
+
+    fn mut_file_and_buf(&mut self) -> (&mut File, &mut BytesMut) {
+        (&mut self.file, &mut self.buf)
+    }
 }
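NOTE: prepare_buf relies on an unsafe set_len over uninitialized memory, and
ReadBuf::new expects an initialized slice, so this is only sound as long as
nothing reads past filled(). If a one-time memset per buffer is acceptable, a
zero-filling variant avoids the unsafe block entirely (a sketch, not part of
this patch):

```rust
use bytes::BytesMut;

// Safe alternative to prepare_buf: zero-fill instead of unsafe set_len, so no
// uninitialized bytes are ever handed to ReadBuf::new.
fn prepare_buf_zeroed(cap: usize) -> BytesMut {
    let mut buf = BytesMut::with_capacity(cap);
    buf.resize(cap, 0);
    buf
}
```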
@@ -231,29 +196,31 @@ impl Stream for FileContentStream {
         use Poll::*;
         loop {
             break if self.complete {
-                panic!("{} poll_next on complete", Self::type_name())
+                panic!("{} poll_next on complete", Self::self_name())
             } else if self.done {
                 self.complete = true;
                 Ready(None)
             } else {
-                let mut buf = if !self.read_going {
+                if !self.read_going {
+                    self.read_going = true;
                     self.ts1 = Instant::now();
-                    let mut buf = BytesMut::new();
-                    buf.resize(self.disk_io_tune.read_buffer_len, 0);
-                    buf
-                } else {
-                    mem::replace(&mut self.buf, BytesMut::new())
-                };
-                let mutsl = buf.as_mut();
-                let mut rb = ReadBuf::new(mutsl);
-                let f1 = &mut self.file;
-                let f2 = Pin::new(f1);
-                let pollres = AsyncRead::poll_read(f2, cx, &mut rb);
-                match pollres {
-                    Ready(Ok(_)) => {
+                    // TODO remove
+                    // std::thread::sleep(Duration::from_millis(10));
+                }
+                let (file, buf) = self.mut_file_and_buf();
+                let mut rb = ReadBuf::new(buf.as_mut());
+                futures_util::pin_mut!(file);
+                match AsyncRead::poll_read(file, cx, &mut rb) {
+                    Ready(Ok(())) => {
                         let nread = rb.filled().len();
+                        if nread < rb.capacity() {
+                            debug!("read less than capacity {} vs {}", nread, rb.capacity());
+                        }
+                        let cap = self.cap;
+                        let mut buf = mem::replace(&mut self.buf, Self::prepare_buf(cap));
                         buf.truncate(nread);
                         self.read_going = false;
+                        self.total_read += nread;
                         let ts2 = Instant::now();
                         if nread == 0 {
                             let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1));
@@ -263,7 +230,7 @@ impl Stream for FileContentStream {
                             let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1));
                             if false && self.nlog < 6 {
                                 self.nlog += 1;
-                                info!("{:?} ret {:?}", self.disk_io_tune, ret);
+                                debug!("{:?} ret {:?}", self.disk_io_tune, ret);
                             }
                             Ready(Some(Ok(ret)))
@@ -286,6 +253,7 @@ fn start_read5(
     disk_io_tune: DiskIoTune,
     reqid: String,
 ) -> Result<(), Error> {
+    warn!("start_read5");
    let fut = async move {
        let mut file = file;
        let pos_beg = match file.stream_position().await {
@@ -760,9 +728,13 @@ pub fn file_content_stream<S>(
 where
     S: Into<String>,
 {
+    if let ReadSys::TokioAsyncRead = disk_io_tune.read_sys {
+    } else {
+        warn!("reading via {:?}", disk_io_tune.read_sys);
+    }
     let reqid = reqid.into();
     debug!("file_content_stream disk_io_tune {disk_io_tune:?}");
-    match &disk_io_tune.read_sys {
+    match disk_io_tune.read_sys {
         ReadSys::TokioAsyncRead => {
             let s = FileContentStream::new(file, disk_io_tune);
             Box::pin(s) as Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>
diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs
index 5f05592..94f1339 100644
--- a/crates/disk/src/merge/mergedblobsfromremotes.rs
+++ b/crates/disk/src/merge/mergedblobsfromremotes.rs
@@ -28,7 +28,7 @@ pub struct MergedBlobsFromRemotes {
 
 impl MergedBlobsFromRemotes {
     pub fn new(subq: EventsSubQuery, cluster: Cluster) -> Self {
-        debug!("MergedBlobsFromRemotes subq {:?}", subq);
+        debug!("MergedBlobsFromRemotes::new subq {:?}", subq);
         let mut tcp_establish_futs = Vec::new();
         for node in &cluster.nodes {
             let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone());
diff --git a/crates/disk/src/raw/conn.rs b/crates/disk/src/raw/conn.rs
index 5739054..4cd4167 100644
--- a/crates/disk/src/raw/conn.rs
+++ b/crates/disk/src/raw/conn.rs
@@ -175,7 +175,7 @@ pub async fn make_event_blobs_pipe_real(
         fetch_info.clone(),
         expand,
         event_chunker_conf,
-        DiskIoTune::default(),
+        DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
         reqctx,
         node_config,
     )?;
@@ -186,7 +186,7 @@ pub async fn make_event_blobs_pipe_real(
         fetch_info.clone(),
         expand,
         event_chunker_conf,
-        DiskIoTune::default(),
+        DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
         reqctx,
         node_config,
     )?;
diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs
index 6279e80..b5483da 100644
--- a/crates/httpclient/src/httpclient.rs
+++ b/crates/httpclient/src/httpclient.rs
@@ -108,6 +108,7 @@ impl HttpBodyAsAsyncRead {
 
 impl AsyncRead for HttpBodyAsAsyncRead {
     fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf) -> Poll<std::io::Result<()>> {
+        trace!("impl AsyncRead for HttpBodyAsAsyncRead");
         use Poll::*;
         if self.left.len() != 0 {
             let n1 = buf.remaining();
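NOTE: Both call sites above now override only the buffer length of the default
tuning through the new DiskIoTune::with_read_buffer_len (added in netpod.rs
further down). It is a plain consuming builder; a self-contained sketch of the
pattern with a stand-in struct (Tune is made up for illustration):

```rust
// Consuming-builder pattern as used by DiskIoTune::with_read_buffer_len:
// take self by value, mutate one field, return self.
#[derive(Debug)]
struct Tune {
    read_buffer_len: usize,
    read_queue_len: usize,
}

impl Tune {
    fn with_read_buffer_len(mut self, x: usize) -> Self {
        self.read_buffer_len = x;
        self
    }
}

fn main() {
    let base = Tune {
        read_buffer_len: 1024 * 4,
        read_queue_len: 4,
    };
    let tuned = base.with_read_buffer_len(64 * 1024);
    println!("{:?}", tuned);
}
```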
diff --git a/crates/httpret/src/api1.rs b/crates/httpret/src/api1.rs
index fe0e799..0e86acc 100644
--- a/crates/httpret/src/api1.rs
+++ b/crates/httpret/src/api1.rs
@@ -32,6 +32,7 @@ use netpod::ChannelSearchQuery;
 use netpod::ChannelSearchResult;
 use netpod::ChannelTypeConfigGen;
 use netpod::DiskIoTune;
+use netpod::FromUrl;
 use netpod::NodeConfigCached;
 use netpod::ProxyConfig;
 use netpod::ReqCtxArc;
@@ -544,6 +545,7 @@ impl DataApiPython3DataStream {
         reqctx: ReqCtxArc,
         node_config: NodeConfigCached,
     ) -> Self {
+        debug!("DataApiPython3DataStream::new settings {settings:?} disk_io_tune {disk_io_tune:?}");
         Self {
             range,
             channels: channels.into_iter().collect(),
@@ -910,17 +912,33 @@ impl Api1EventsBinaryHandler {
         } else {
             tracing::Span::none()
         };
+        let url = {
+            let s1 = format!("dummy:{}", head.uri);
+            Url::parse(&s1)
+                .map_err(Error::from)
+                .map_err(|e| e.add_public_msg(format!("Cannot parse query URL")))?
+        };
+        let disk_tune = DiskIoTune::from_url(&url)?;
         let reqidspan = tracing::info_span!("api1query", reqid = reqctx.reqid());
-        self.handle_for_query(qu, accept, &reqctx, span.clone(), reqidspan.clone(), node_config)
-            .instrument(span)
-            .instrument(reqidspan)
-            .await
+        self.handle_for_query(
+            qu,
+            accept,
+            disk_tune,
+            &reqctx,
+            span.clone(),
+            reqidspan.clone(),
+            node_config,
+        )
+        .instrument(span)
+        .instrument(reqidspan)
+        .await
     }
 
     pub async fn handle_for_query(
         &self,
         qu: Api1Query,
         accept: String,
+        disk_io_tune: DiskIoTune,
         reqctx: &ReqCtxArc,
         span: tracing::Span,
         reqidspan: tracing::Span,
@@ -981,7 +999,7 @@ impl Api1EventsBinaryHandler {
                 chans,
                 // TODO carry those settings from the query again
                 settings,
-                DiskIoTune::default(),
+                disk_io_tune,
                 qu.decompress()
                     .unwrap_or_else(|| ncc.node_config.cluster.decompress_default()),
                 qu.events_max().unwrap_or(u64::MAX),
diff --git a/crates/httpret/src/download.rs b/crates/httpret/src/download.rs
index 5422fc9..b9406a0 100644
--- a/crates/httpret/src/download.rs
+++ b/crates/httpret/src/download.rs
@@ -27,20 +27,17 @@ impl FromUrl for DownloadQuery {
     fn from_pairs(pairs: &std::collections::BTreeMap<String, String>) -> Result<Self, Error> {
         let read_sys = pairs
             .get("ReadSys")
-            .map(|x| x as &str)
-            .unwrap_or("TokioAsyncRead")
-            .into();
+            .map(|x| x.as_str().into())
+            .unwrap_or_else(|| netpod::ReadSys::default());
         let read_buffer_len = pairs
             .get("ReadBufferLen")
-            .map(|x| x as &str)
-            .unwrap_or("xx")
-            .parse()
+            .map(|x| x.parse().map_or(None, Some))
+            .unwrap_or(None)
             .unwrap_or(1024 * 4);
         let read_queue_len = pairs
             .get("ReadQueueLen")
-            .map(|x| x as &str)
-            .unwrap_or("xx")
-            .parse()
+            .map(|x| x.parse().map_or(None, Some))
+            .unwrap_or(None)
             .unwrap_or(8);
         let disk_io_tune = DiskIoTune {
             read_sys,
@@ -67,10 +64,10 @@ impl DownloadHandler {
         }
     }
 
-    pub async fn get(&self, req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+    pub async fn get(&self, req: Request<Body>, ncc: &NodeConfigCached) -> Result<Response<Body>, Error> {
         let (head, _body) = req.into_parts();
         let p2 = &head.uri.path()[Self::path_prefix().len()..];
-        let base = match &node_config.node.sf_databuffer {
+        let base = match &ncc.node.sf_databuffer {
             Some(k) => k.data_base_path.clone(),
             None => "/UNDEFINED".into(),
         };
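NOTE: In both from_pairs implementations (here and the DiskIoTune one below),
.map(|x| x.parse().map_or(None, Some)).unwrap_or(None) is equivalent to the
shorter .and_then(|x| x.parse().ok()). A self-contained sketch of that
simplification (the helper name parse_or is made up):

```rust
use std::collections::BTreeMap;
use std::str::FromStr;

// Same behavior as the chain above: missing or unparsable values fall back to
// the given default.
fn parse_or<T: FromStr>(pairs: &BTreeMap<String, String>, key: &str, default: T) -> T {
    pairs.get(key).and_then(|x| x.parse().ok()).unwrap_or(default)
}

fn main() {
    let mut pairs = BTreeMap::new();
    pairs.insert("ReadBufferLen".to_string(), "16384".to_string());
    assert_eq!(parse_or(&pairs, "ReadBufferLen", 1024 * 4), 16384);
    // Key absent: the default applies.
    assert_eq!(parse_or(&pairs, "ReadQueueLen", 8), 8);
}
```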
diff --git a/crates/httpret/src/proxy.rs b/crates/httpret/src/proxy.rs
index e7faf8f..3d7c547 100644
--- a/crates/httpret/src/proxy.rs
+++ b/crates/httpret/src/proxy.rs
@@ -251,6 +251,7 @@ impl Stream for FileStream {
         let mut rb = ReadBuf::new(&mut buf);
         let f = &mut self.file;
         pin_mut!(f);
+        trace!("poll_read for proxy distri");
         match f.poll_read(cx, &mut rb) {
             Ready(k) => match k {
                 Ok(_) => {
diff --git a/crates/netpod/src/netpod.rs b/crates/netpod/src/netpod.rs
index f1fb0d9..0b04c29 100644
--- a/crates/netpod/src/netpod.rs
+++ b/crates/netpod/src/netpod.rs
@@ -2357,7 +2357,7 @@ pub enum ReadSys {
 
 impl ReadSys {
     pub fn default() -> Self {
-        Self::Read5
+        Self::TokioAsyncRead
     }
 }
 
@@ -2394,6 +2394,7 @@ impl DiskIoTune {
             read_queue_len: 4,
         }
     }
+
     pub fn default() -> Self {
         Self {
             read_sys: ReadSys::default(),
@@ -2401,6 +2402,11 @@ impl DiskIoTune {
             read_queue_len: 4,
         }
     }
+
+    pub fn with_read_buffer_len(mut self, x: usize) -> Self {
+        self.read_buffer_len = x;
+        self
+    }
 }
 
 impl Default for DiskIoTune {
@@ -2409,6 +2415,35 @@ impl Default for DiskIoTune {
     }
 }
 
+impl FromUrl for DiskIoTune {
+    fn from_url(url: &Url) -> Result<Self, Error> {
+        Self::from_pairs(&get_url_query_pairs(url))
+    }
+
+    fn from_pairs(pairs: &BTreeMap<String, String>) -> Result<Self, Error> {
+        let read_sys = pairs
+            .get("ReadSys")
+            .map(|x| x.as_str().into())
+            .unwrap_or_else(|| ReadSys::default());
+        let read_buffer_len = pairs
+            .get("ReadBufferLen")
+            .map(|x| x.parse().map_or(None, Some))
+            .unwrap_or(None)
+            .unwrap_or(1024 * 4);
+        let read_queue_len = pairs
+            .get("ReadQueueLen")
+            .map(|x| x.parse().map_or(None, Some))
+            .unwrap_or(None)
+            .unwrap_or(8);
+        let ret = DiskIoTune {
+            read_sys,
+            read_buffer_len,
+            read_queue_len,
+        };
+        Ok(ret)
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct ChannelSearchQuery {
     pub backend: Option<String>,
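NOTE: With this FromUrl impl (wired into the api1 handler above), disk I/O
tuning becomes a per-request setting, while the compiled-in default read path
switches from Read5 back to TokioAsyncRead. The recognized query parameters
are ReadSys, ReadBufferLen, and ReadQueueLen; a sketch of what from_url would
see for a tuned request (host, port, and path are made up):

```rust
// Requires the `url` crate; prints the pairs that DiskIoTune::from_pairs
// would receive.
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = Url::parse("http://node0.example:8371/api/1/query?ReadSys=Read5&ReadBufferLen=16384&ReadQueueLen=8")?;
    for (k, v) in url.query_pairs() {
        println!("{} = {}", k, v);
    }
    Ok(())
}
```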
diff --git a/crates/query/src/api4/events.rs b/crates/query/src/api4/events.rs
index 37b169b..3897610 100644
--- a/crates/query/src/api4/events.rs
+++ b/crates/query/src/api4/events.rs
@@ -365,13 +365,14 @@ impl From<&BinnedQuery> for EventsSubQuerySettings {
 
 impl From<&Api1Query> for EventsSubQuerySettings {
     fn from(value: &Api1Query) -> Self {
+        let disk_io_tune = value.disk_io_tune();
         Self {
             timeout: value.timeout(),
             // TODO ?
             events_max: None,
             event_delay: None,
             stream_batch_len: None,
-            buf_len_disk_io: None,
+            buf_len_disk_io: Some(disk_io_tune.read_buffer_len),
             test_do_wasm: false,
             create_errors: Vec::new(),
         }
diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs
index 7640ecc..8f3151d 100644
--- a/crates/streams/src/frames/inmem.rs
+++ b/crates/streams/src/frames/inmem.rs
@@ -69,6 +69,7 @@ where
         let mut buf = ReadBuf::new(self.buf.available_writable_area(self.need_min - self.buf.len())?);
         let inp = &mut self.inp;
         pin_mut!(inp);
+        trace!("poll_upstream");
         match AsyncRead::poll_read(inp, cx, &mut buf) {
             Ready(Ok(())) => {
                 let n = buf.filled().len();
diff --git a/crates/taskrun/Cargo.toml b/crates/taskrun/Cargo.toml
index 14bb3d5..f2b3ec4 100644
--- a/crates/taskrun/Cargo.toml
+++ b/crates/taskrun/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "taskrun"
-version = "0.0.3"
+version = "0.0.4"
 authors = ["Dominik Werder "]
 edition = "2021"
 
@@ -8,13 +8,13 @@ edition = "2021"
 path = "src/taskrun.rs"
 
 [dependencies]
-futures-util = "0.3"
-tokio = { version = "1.21.2", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs", "tracing"] }
+tokio = { version = "1.29.1", features = ["full", "tracing"] }
+futures-util = "0.3.28"
 tracing = "0.1.37"
-tracing-subscriber = { version = "0.3.16", features = ["fmt", "time"] }
+tracing-subscriber = { version = "0.3.17", features = ["fmt", "time"] }
 #tracing-loki = { version = "0.2.1", default-features = false, features = ["compat-0-2-1"] }
+console-subscriber = { version = "0.1.10" }
 time = { version = "0.3", features = ["formatting"] }
-console-subscriber = "0.1.5"
 backtrace = "0.3.56"
 lazy_static = "1.4.0"
 chrono = "0.4"
diff --git a/crates/taskrun/src/taskrun.rs b/crates/taskrun/src/taskrun.rs
index 3d00a25..508b637 100644
--- a/crates/taskrun/src/taskrun.rs
+++ b/crates/taskrun/src/taskrun.rs
@@ -1,3 +1,5 @@
+pub use tokio;
+
 use crate::log::*;
 use err::Error;
 use std::fmt;
@@ -85,7 +87,8 @@ where
             eprintln!("ERROR tracing: can not init");
         }
     }
-    let res = runtime.block_on(async { fut.await });
+    // let res = runtime.block_on(async { fut.await });
+    let res = runtime.block_on(fut);
     match res {
         Ok(k) => Ok(k),
         Err(e) => {
@@ -115,13 +118,24 @@ fn tracing_init_inner() -> Result<(), Error> {
             .with_ansi(false)
             .with_thread_names(true)
             .with_filter(filter);
-        let z = tracing_subscriber::registry().with(fmt_layer);
-        #[cfg(CONSOLE)]
-        {
-            let console_layer = console_subscriber::spawn();
-            let z = z.with(console_layer);
-        }
-        z.try_init().map_err(|e| format!("{e}"))?;
+
+        let console_layer = console_subscriber::ConsoleLayer::builder()
+            .retention(std::time::Duration::from_secs(4))
+            .server_addr(([127, 0, 0, 1], 2875))
+            .spawn();
+        // .build();
+
+        // eprintln!("spawn console server");
+        // tokio::spawn(console_server.serve());
+
+        let reg = tracing_subscriber::registry().with(console_layer);
+
+        let reg = reg.with(fmt_layer);
+
+        reg.try_init().map_err(|e| {
+            eprintln!("ERROR tracing registry init failed: {e}");
+            format!("{e}")
+        })?;
     }
     #[cfg(DISABLED_LOKI)]
     // TODO tracing_loki seems not well composable, try open telemetry instead.
@@ -176,7 +190,7 @@ pub fn tracing_init() -> Result<(), ()> {
     } else if *initg == 1 {
         Ok(())
     } else {
-        eprintln!("ERROR Unknown tracing state");
+        eprintln!("ERROR unknown tracing state");
         Err(())
     }
 }
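NOTE: The tokio-console wiring comes together across this patch: "--cfg
tokio_unstable" in .cargo/config.toml, tokio built with the "tracing" feature,
and a console-subscriber layer serving on 127.0.0.1:2875. A minimal sketch of
the same setup outside the taskrun registry (assumes the tokio "tracing"
feature and console-subscriber as dependencies, built with the cfg above):

```rust
// Install the console layer and start its gRPC server on a background thread.
fn main() {
    console_subscriber::ConsoleLayer::builder()
        .retention(std::time::Duration::from_secs(4))
        .server_addr(([127, 0, 0, 1], 2875))
        .init();

    // ... build the tokio runtime and run the application as usual ...
}
```

Attach with the console CLI, pointing it at the non-default port:
tokio-console http://127.0.0.1:2875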