diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index a5a7b1e..05bdcf9 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -1,7 +1,6 @@
 use crate::eventblobs::EventChunkerMultifile;
 use crate::eventchunker::EventChunkerConf;
-use netpod::{test_data_base_path_databuffer, FileIoBufferSize};
-use netpod::{timeunits::*, SfDatabuffer};
+use netpod::{test_data_base_path_databuffer, timeunits::*, SfDatabuffer};
 use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};
@@ -58,13 +57,15 @@ async fn agg_x_dim_0_inner() {
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
-    let file_io_buffer_size = FileIoBufferSize::new(query.buffer_size as usize);
+    // TODO let upstream already provide DiskIoTune:
+    let mut disk_io_tune = netpod::DiskIoTune::default_for_testing();
+    disk_io_tune.read_buffer_len = query.buffer_size as usize;
     let fut1 = EventChunkerMultifile::new(
         range.clone(),
         query.channel_config.clone(),
         node.clone(),
         0,
-        file_io_buffer_size,
+        disk_io_tune,
         event_chunker_conf,
         false,
         true,
@@ -110,13 +111,15 @@ async fn agg_x_dim_1_inner() {
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
-    let file_io_buffer_size = FileIoBufferSize::new(query.buffer_size as usize);
+    // TODO let upstream already provide DiskIoTune:
+    let mut disk_io_tune = netpod::DiskIoTune::default_for_testing();
+    disk_io_tune.read_buffer_len = query.buffer_size as usize;
     let fut1 = super::eventblobs::EventChunkerMultifile::new(
         range.clone(),
         query.channel_config.clone(),
         node.clone(),
         0,
-        file_io_buffer_size,
+        disk_io_tune,
         event_chunker_conf,
         false,
         true,
diff --git a/disk/src/binned.rs b/disk/src/binned.rs
index e4e8c47..4de12dc 100644
--- a/disk/src/binned.rs
+++ b/disk/src/binned.rs
@@ -111,11 +111,14 @@ impl ChannelExecFunction for BinnedBinaryChannelExec {
             debug!(
                 "BinnedBinaryChannelExec no covering range for prebinned, merge from remotes instead {range:?}"
             );
+            // TODO let BinnedQuery provide the DiskIoTune.
+            let mut disk_io_tune = netpod::DiskIoTune::default();
+            disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size() as usize;
             let evq = RawEventsQuery {
                 channel: self.query.channel().clone(),
                 range: self.query.range().clone(),
                 agg_kind: self.query.agg_kind().clone(),
-                disk_io_buffer_size: self.query.disk_io_buffer_size(),
+                disk_io_tune,
                 do_decompress: true,
             };
             let x_bin_count = x_bin_count(&shape, self.query.agg_kind());
@@ -360,11 +363,14 @@ impl ChannelExecFunction for BinnedJsonChannelExec {
             }
             Ok(None) => {
                 debug!("BinnedJsonChannelExec no covering range for prebinned, merge from remotes instead {range:?}");
+                // TODO let BinnedQuery provide the DiskIoTune.
+ let mut disk_io_tune = netpod::DiskIoTune::default(); + disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size() as usize; let evq = RawEventsQuery { channel: self.query.channel().clone(), range: self.query.range().clone(), agg_kind: self.query.agg_kind().clone(), - disk_io_buffer_size: self.query.disk_io_buffer_size(), + disk_io_tune, do_decompress: true, }; let x_bin_count = x_bin_count(&shape, self.query.agg_kind()); diff --git a/disk/src/binned/pbv.rs b/disk/src/binned/pbv.rs index 5e8cf62..bb41b7f 100644 --- a/disk/src/binned/pbv.rs +++ b/disk/src/binned/pbv.rs @@ -105,11 +105,14 @@ where Pin::Output as TimeBinnableType>::Output>> + Send>>, Error, > { + // TODO let PreBinnedQuery provide the tune: + let mut disk_io_tune = netpod::DiskIoTune::default(); + disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size(); let evq = RawEventsQuery { channel: self.query.channel().clone(), range: self.query.patch().patch_range(), agg_kind: self.query.agg_kind().clone(), - disk_io_buffer_size: self.query.disk_io_buffer_size(), + disk_io_tune, do_decompress: true, }; if self.query.patch().patch_t_len() % self.query.patch().bin_t_len() != 0 { diff --git a/disk/src/channelexec.rs b/disk/src/channelexec.rs index 2646085..fd481ba 100644 --- a/disk/src/channelexec.rs +++ b/disk/src/channelexec.rs @@ -274,11 +274,14 @@ impl ChannelExecFunction for PlainEvents { let _ = byte_order; let _ = event_value_shape; let perf_opts = PerfOpts { inmem_bufcap: 4096 }; + // TODO let upstream provide DiskIoTune + let mut disk_io_tune = netpod::DiskIoTune::default(); + disk_io_tune.read_buffer_len = self.disk_io_buffer_size; let evq = RawEventsQuery { channel: self.channel, range: self.range, agg_kind: self.agg_kind, - disk_io_buffer_size: self.disk_io_buffer_size, + disk_io_tune, do_decompress: true, }; let s = MergedFromRemotes::>::new(evq, perf_opts, self.node_config.node_config.cluster); @@ -452,11 +455,14 @@ impl ChannelExecFunction for PlainEventsJson { let _ = byte_order; let _ = event_value_shape; let perf_opts = PerfOpts { inmem_bufcap: 4096 }; + // TODO let upstream provide DiskIoTune + let mut disk_io_tune = netpod::DiskIoTune::default(); + disk_io_tune.read_buffer_len = self.disk_io_buffer_size; let evq = RawEventsQuery { channel: self.channel, range: self.range, agg_kind: self.agg_kind, - disk_io_buffer_size: self.disk_io_buffer_size, + disk_io_tune, do_decompress: true, }; let s = MergedFromRemotes::::new(evq, perf_opts, self.node_config.node_config.cluster); diff --git a/disk/src/disk.rs b/disk/src/disk.rs index 2510ef0..0f43c55 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -1,25 +1,3 @@ -use crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; -use bytes::{Bytes, BytesMut}; -use err::Error; -use futures_core::Stream; -use futures_util::future::FusedFuture; -use futures_util::{FutureExt, StreamExt, TryFutureExt}; -use netpod::histo::HistoLog2; -use netpod::{log::*, FileIoBufferSize}; -use netpod::{ChannelConfig, Node, Shape}; -use readat::ReadResult; -use std::collections::VecDeque; -use std::future::Future; -use std::io::SeekFrom; -use std::os::unix::prelude::AsRawFd; -use std::path::PathBuf; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use std::{fmt, mem}; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf}; - pub mod agg; #[cfg(test)] pub mod aggtest; @@ -38,9 +16,31 @@ pub mod index; pub mod merge; pub mod paths; pub mod raw; -pub mod readat; +pub mod read3; pub mod streamlog; +use 
crate::dtflags::{ARRAY, BIG_ENDIAN, COMPRESSION, SHAPE}; +use bytes::{Bytes, BytesMut}; +use err::Error; +use futures_core::Stream; +use futures_util::future::FusedFuture; +use futures_util::{FutureExt, StreamExt, TryFutureExt}; +use netpod::histo::HistoLog2; +use netpod::log::*; +use netpod::{ChannelConfig, DiskIoTune, Node, Shape}; +use read3::ReadResult; +use std::collections::VecDeque; +use std::future::Future; +use std::io::SeekFrom; +use std::os::unix::prelude::AsRawFd; +use std::path::PathBuf; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use std::{fmt, mem}; +use tokio::fs::{File, OpenOptions}; +use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf}; + // TODO transform this into a self-test or remove. pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result { let path = paths::datapath(query.timebin as u64, &query.channel_config, 0, &node); @@ -152,20 +152,20 @@ unsafe impl Send for Fopen1 {} pub struct FileChunkRead { buf: BytesMut, - cap0: usize, - rem0: usize, - remmut0: usize, duration: Duration, } +impl FileChunkRead { + pub fn into_buf(self) -> BytesMut { + self.buf + } +} + impl fmt::Debug for FileChunkRead { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("FileChunkRead") .field("buf.len", &self.buf.len()) .field("buf.cap", &self.buf.capacity()) - .field("cap0", &self.cap0) - .field("rem0", &self.rem0) - .field("remmut0", &self.remmut0) .field("duration", &self.duration) .finish() } @@ -173,7 +173,7 @@ impl fmt::Debug for FileChunkRead { pub struct FileContentStream { file: File, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, read_going: bool, buf: BytesMut, ts1: Instant, @@ -183,10 +183,10 @@ pub struct FileContentStream { } impl FileContentStream { - pub fn new(file: File, file_io_buffer_size: FileIoBufferSize) -> Self { + pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self { Self { file, - file_io_buffer_size, + disk_io_tune, read_going: false, buf: BytesMut::new(), ts1: Instant::now(), @@ -212,7 +212,7 @@ impl Stream for FileContentStream { let mut buf = if !self.read_going { self.ts1 = Instant::now(); let mut buf = BytesMut::new(); - buf.resize(self.file_io_buffer_size.0, 0); + buf.resize(self.disk_io_tune.read_buffer_len, 0); buf } else { mem::replace(&mut self.buf, BytesMut::new()) @@ -225,18 +225,12 @@ impl Stream for FileContentStream { match pollres { Ready(Ok(_)) => { let nread = rb.filled().len(); - let cap0 = rb.capacity(); - let rem0 = rb.remaining(); - let remmut0 = nread; buf.truncate(nread); self.read_going = false; let ts2 = Instant::now(); if nread == 0 { let ret = FileChunkRead { buf, - cap0, - rem0, - remmut0, duration: ts2.duration_since(self.ts1), }; self.done = true; @@ -244,14 +238,11 @@ impl Stream for FileContentStream { } else { let ret = FileChunkRead { buf, - cap0, - rem0, - remmut0, duration: ts2.duration_since(self.ts1), }; if false && self.nlog < 6 { self.nlog += 1; - info!("{:?} ret {:?}", self.file_io_buffer_size, ret); + info!("{:?} ret {:?}", self.disk_io_tune, ret); } Ready(Some(Ok(ret))) } @@ -269,13 +260,15 @@ impl Stream for FileContentStream { pub fn file_content_stream( file: File, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, ) -> impl Stream> + Send { - FileContentStream::new(file, file_io_buffer_size) + warn!("file_content_stream disk_io_tune {disk_io_tune:?}"); + FileContentStream::new(file, disk_io_tune) } enum FCS2 { GetPosition, + ReadingSimple, Reading, } @@ -288,16 +281,17 @@ pub 
struct FileContentStream2 { fcs2: FCS2, file: Pin>, file_pos: u64, - file_io_buffer_size: FileIoBufferSize, + eof: bool, + disk_io_tune: DiskIoTune, get_position_fut: Pin> + Send>>, + read_fut: Pin> + Send>>, reads: VecDeque, - nlog: usize, done: bool, complete: bool, } impl FileContentStream2 { - pub fn new(file: File, file_io_buffer_size: FileIoBufferSize) -> Self { + pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self { let mut file = Box::pin(file); let ffr = unsafe { let ffr = Pin::get_unchecked_mut(file.as_mut()); @@ -305,15 +299,18 @@ impl FileContentStream2 { }; let ff = ffr .seek(SeekFrom::Current(0)) - .map_err(|e| Error::with_msg_no_trace(format!("Seek error"))); + .map_err(|_| Error::with_msg_no_trace(format!("Seek error"))); Self { fcs2: FCS2::GetPosition, file, file_pos: 0, - file_io_buffer_size, + eof: false, + disk_io_tune, get_position_fut: Box::pin(ff), + read_fut: Box::pin(futures_util::future::ready(Err(Error::with_msg_no_trace(format!( + "dummy" + ))))), reads: VecDeque::new(), - nlog: 0, done: false, complete: false, } @@ -335,7 +332,17 @@ impl Stream for FileContentStream2 { match self.fcs2 { FCS2::GetPosition => match self.get_position_fut.poll_unpin(cx) { Ready(Ok(k)) => { + info!("current file pos: {k}"); self.file_pos = k; + if false { + let fd = self.file.as_raw_fd(); + let count = self.disk_io_tune.read_buffer_len as u64; + self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count)); + self.file_pos += count; + self.fcs2 = FCS2::ReadingSimple; + } else { + self.fcs2 = FCS2::Reading; + } continue; } Ready(Err(e)) => { @@ -344,43 +351,96 @@ impl Stream for FileContentStream2 { } Pending => Pending, }, + FCS2::ReadingSimple => match self.read_fut.poll_unpin(cx) { + Ready(Ok(res)) => { + if res.eof { + let item = FileChunkRead { + buf: res.buf, + duration: Duration::from_millis(0), + }; + self.done = true; + Ready(Some(Ok(item))) + } else { + let item = FileChunkRead { + buf: res.buf, + duration: Duration::from_millis(0), + }; + let fd = self.file.as_raw_fd(); + let count = self.disk_io_tune.read_buffer_len as u64; + self.read_fut = Box::pin(read3::Read3::get().read(fd, self.file_pos, count)); + self.file_pos += count; + Ready(Some(Ok(item))) + } + } + Ready(Err(e)) => { + self.done = true; + Ready(Some(Err(e))) + } + Pending => Pending, + }, FCS2::Reading => { - // TODO Keep the read queue full. - // TODO Do not add more reads when EOF is encountered. - while self.reads.len() < 4 { - let count = self.file_io_buffer_size.bytes() as u64; - let r3 = readat::Read3::get(); - let x = r3.read(self.file.as_raw_fd(), self.file_pos, count); - self.reads.push_back(ReadStep::Fut(Box::pin(x))); + while !self.eof && self.reads.len() < self.disk_io_tune.read_queue_len { + let fd = self.file.as_raw_fd(); + let pos = self.file_pos; + let count = self.disk_io_tune.read_buffer_len as u64; + trace!("create ReadTask fd {fd} pos {pos} count {count}"); + let r3 = read3::Read3::get(); + let fut = r3.read(fd, pos, count); + self.reads.push_back(ReadStep::Fut(Box::pin(fut))); self.file_pos += count; } - // TODO must poll all futures to make progress... but if they resolve, must poll no more! - // therefore, need some enum type for the pending futures list to also store the resolved ones. for e in &mut self.reads { match e { ReadStep::Fut(k) => match k.poll_unpin(cx) { Ready(k) => { + trace!("received a result"); *e = ReadStep::Res(k); } Pending => {} }, - ReadStep::Res(_k) => {} + ReadStep::Res(_) => {} } } - // TODO Check the front if something is ready. 
if let Some(ReadStep::Res(_)) = self.reads.front() { if let Some(ReadStep::Res(res)) = self.reads.pop_front() { - // TODO check for error or return the read data. - // TODO if read data contains EOF flag, raise EOF flag also in self, - // and abort. - // TODO make sure that everything runs stable even if this Stream is simply dropped - // or read results are not waited for and channels or oneshots get dropped. + trace!("pop front result"); + match res { + Ok(rr) => { + if rr.eof { + if self.eof { + trace!("see EOF in ReadResult AGAIN"); + } else { + debug!("see EOF in ReadResult SET OUR FLAG"); + self.eof = true; + } + } + let res = FileChunkRead { + buf: rr.buf, + duration: Duration::from_millis(0), + }; + Ready(Some(Ok(res))) + } + Err(e) => { + error!("received ReadResult error: {e}"); + self.done = true; + let e = Error::with_msg(format!("I/O error: {e}")); + Ready(Some(Err(e))) + } + } } else { - // TODO return error, this should never happen because we check before. + self.done = true; + let e = Error::with_msg(format!("logic error")); + error!("{e}"); + Ready(Some(Err(e))) } + } else if let None = self.reads.front() { + debug!("empty read fut queue, end"); + self.done = true; + continue; + } else { + trace!("read fut queue Pending"); + Pending } - // TODO handle case that self.reads is empty. - todo!() } } }; @@ -390,9 +450,10 @@ impl Stream for FileContentStream2 { pub fn file_content_stream_2( file: File, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, ) -> impl Stream> + Send { - FileContentStream2::new(file, file_io_buffer_size) + warn!("file_content_stream_2 disk_io_tune {disk_io_tune:?}"); + FileContentStream2::new(file, disk_io_tune) } pub struct NeedMinBuffer { diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 8523650..bd53926 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -1,14 +1,13 @@ use crate::dataopen::{open_expanded_files, open_files, OpenedFileSet}; use crate::eventchunker::{EventChunker, EventChunkerConf, EventFull}; -use crate::file_content_stream; use crate::merge::MergedStream; use err::Error; use futures_core::Stream; use futures_util::StreamExt; use items::{LogItem, RangeCompletableItem, Sitemty, StreamItem}; +use netpod::log::*; use netpod::timeunits::SEC; -use netpod::{log::*, FileIoBufferSize}; -use netpod::{ChannelConfig, NanoRange, Node}; +use netpod::{ChannelConfig, DiskIoTune, NanoRange, Node}; use std::pin::Pin; use std::task::{Context, Poll}; use streams::rangefilter::RangeFilter; @@ -21,7 +20,7 @@ pub struct EventChunkerMultifile { channel_config: ChannelConfig, file_chan: async_channel::Receiver>, evs: Option>>, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, event_chunker_conf: EventChunkerConf, range: NanoRange, data_completed: bool, @@ -41,7 +40,7 @@ impl EventChunkerMultifile { channel_config: ChannelConfig, node: Node, node_ix: usize, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, event_chunker_conf: EventChunkerConf, expand: bool, do_decompress: bool, @@ -54,7 +53,7 @@ impl EventChunkerMultifile { Self { file_chan, evs: None, - file_io_buffer_size, + disk_io_tune, event_chunker_conf, channel_config, range, @@ -157,9 +156,9 @@ impl Stream for EventChunkerMultifile { let item = LogItem::quick(Level::INFO, msg); match file.file { Some(file) => { - let inp = Box::pin(file_content_stream( + let inp = Box::pin(crate::file_content_stream( file, - self.file_io_buffer_size.clone(), + self.disk_io_tune.clone(), )); let chunker = 
EventChunker::from_event_boundary( inp, @@ -184,14 +183,14 @@ impl Stream for EventChunkerMultifile { Ready(Some(Ok(StreamItem::Log(item)))) } else { let msg = format!("handle OFS MERGED {:?}", ofs); - debug!("{}", msg); + info!("{}", msg); let item = LogItem::quick(Level::INFO, msg); let mut chunkers = vec![]; for of in ofs.files { if let Some(file) = of.file { - let inp = Box::pin(file_content_stream( + let inp = Box::pin(crate::file_content_stream_2( file, - self.file_io_buffer_size.clone(), + self.disk_io_tune.clone(), )); let chunker = EventChunker::from_event_boundary( inp, @@ -245,9 +244,9 @@ mod test { use err::Error; use futures_util::StreamExt; use items::{RangeCompletableItem, StreamItem}; - use netpod::log::*; use netpod::timeunits::{DAY, MS}; - use netpod::{ByteSize, ChannelConfig, FileIoBufferSize, Nanos}; + use netpod::{log::*, DiskIoTune}; + use netpod::{ByteSize, ChannelConfig, Nanos}; use streams::rangefilter::RangeFilter; fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { @@ -268,10 +267,10 @@ mod test { }; let cluster = netpod::test_cluster(); let node = cluster.nodes[nodeix].clone(); - let buffer_size = 512; let event_chunker_conf = EventChunkerConf { disk_stats_every: ByteSize::kb(1024), }; + let disk_io_tune = DiskIoTune::default_for_testing(); let task = async move { let mut event_count = 0; let events = EventChunkerMultifile::new( @@ -279,7 +278,7 @@ mod test { channel_config, node, nodeix, - FileIoBufferSize::new(buffer_size), + disk_io_tune, event_chunker_conf, true, true, diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 1f189f8..56abddb 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -299,7 +299,7 @@ mod test { use netpod::log::*; use netpod::test_data_base_path_databuffer; use netpod::timeunits::{DAY, MS}; - use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, FileIoBufferSize, NanoRange, Nanos, ScalarType, Shape}; + use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, ScalarType, Shape}; use std::path::PathBuf; fn scalar_file_path() -> PathBuf { @@ -336,8 +336,8 @@ mod test { let inps = files .into_iter() .map(|file| { - let file_io_buffer_size = FileIoBufferSize(1024 * 4); - let inp = file_content_stream(file, file_io_buffer_size); + let disk_io_tune = netpod::DiskIoTune::default(); + let inp = file_content_stream(file, disk_io_tune); inp }) .map(|inp| { diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index fb39e19..4db30a2 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -11,7 +11,7 @@ use items::numops::{BoolNum, NumOps, StringNum}; use items::{EventsNodeProcessor, Framable, RangeCompletableItem, Sitemty, StreamItem}; use netpod::log::*; use netpod::query::RawEventsQuery; -use netpod::{AggKind, ByteOrder, ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, ScalarType, Shape}; +use netpod::{AggKind, ByteOrder, ByteSize, Channel, DiskIoTune, NanoRange, NodeConfigCached, ScalarType, Shape}; use parse::channelconfig::{extract_matching_config_entry, read_local_config, ConfigEntry, MatchingConfigEntry}; use std::pin::Pin; @@ -194,7 +194,7 @@ pub async fn make_event_pipe( channel_config.clone(), node_config.node.clone(), node_config.ix, - FileIoBufferSize::new(evq.disk_io_buffer_size), + evq.disk_io_tune.clone(), event_chunker_conf, evq.agg_kind.need_expand(), true, @@ -235,10 +235,10 @@ pub fn make_local_event_blobs_stream( expand: bool, do_decompress: bool, event_chunker_conf: EventChunkerConf, - file_io_buffer_size: 
FileIoBufferSize, + disk_io_tune: DiskIoTune, node_config: &NodeConfigCached, ) -> Result { - info!("make_local_event_blobs_stream do_decompress {do_decompress} file_io_buffer_size {file_io_buffer_size:?}"); + info!("make_local_event_blobs_stream do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}"); if do_decompress { warn!("Possible issue: decompress central storage event blob stream"); } @@ -261,7 +261,7 @@ pub fn make_local_event_blobs_stream( channel_config.clone(), node_config.node.clone(), node_config.ix, - file_io_buffer_size, + disk_io_tune, event_chunker_conf, expand, do_decompress, @@ -276,7 +276,7 @@ pub fn make_remote_event_blobs_stream( expand: bool, do_decompress: bool, event_chunker_conf: EventChunkerConf, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, node_config: &NodeConfigCached, ) -> Result>, Error> { let shape = match entry.to_shape() { @@ -298,7 +298,7 @@ pub fn make_remote_event_blobs_stream( channel_config.clone(), node_config.node.clone(), node_config.ix, - file_io_buffer_size, + disk_io_tune, event_chunker_conf, expand, do_decompress, @@ -316,7 +316,6 @@ pub async fn make_event_blobs_pipe( Err(e) => return Err(e)?, } } - let file_io_buffer_size = FileIoBufferSize::new(evq.disk_io_buffer_size); let expand = evq.agg_kind.need_expand(); let range = &evq.range; let entry = get_applicable_entry(&evq.range, evq.channel.clone(), node_config).await?; @@ -329,7 +328,7 @@ pub async fn make_event_blobs_pipe( expand, evq.do_decompress, event_chunker_conf, - file_io_buffer_size, + evq.disk_io_tune.clone(), node_config, )?; let s = event_blobs.map(|item| Box::new(item) as Box); @@ -345,7 +344,7 @@ pub async fn make_event_blobs_pipe( expand, evq.do_decompress, event_chunker_conf, - file_io_buffer_size, + evq.disk_io_tune.clone(), node_config, )?; let s = event_blobs.map(|item| Box::new(item) as Box); diff --git a/disk/src/readat.rs b/disk/src/read3.rs similarity index 52% rename from disk/src/readat.rs rename to disk/src/read3.rs index 6570cc3..1daa19b 100644 --- a/disk/src/readat.rs +++ b/disk/src/read3.rs @@ -2,10 +2,13 @@ use bytes::BytesMut; use err::Error; use netpod::log::*; use std::os::unix::prelude::RawFd; -use std::sync::atomic::{AtomicPtr, Ordering}; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use std::sync::Once; +use std::time::{Duration, Instant}; use tokio::sync::{mpsc, oneshot}; +const DO_TRACE: bool = false; + pub struct ReadTask { fd: RawFd, pos: u64, @@ -18,40 +21,50 @@ pub struct ReadResult { pub eof: bool, } -/* -Async code must be able to interact with the Read3 system via async methods. -The async code must be able to enqueue a read in non-blocking fashion. -Since the queue of pending read requests must be bounded, this must be able to async-block. 
-*/ pub struct Read3 { jobs_tx: mpsc::Sender, rtx: crossbeam::channel::Sender>, + threads_max: AtomicUsize, + can_not_publish: AtomicUsize, } impl Read3 { pub fn get() -> &'static Self { static INIT: Once = Once::new(); INIT.call_once(|| { - let (jtx, jrx) = mpsc::channel(32); - let (rtx, rrx) = crossbeam::channel::bounded(16); - let read3 = Read3 { jobs_tx: jtx, rtx }; + let (jtx, jrx) = mpsc::channel(512); + let (rtx, rrx) = crossbeam::channel::bounded(32); + let read3 = Read3 { + jobs_tx: jtx, + rtx, + threads_max: AtomicUsize::new(32), + can_not_publish: AtomicUsize::new(0), + }; let b = Box::new(read3); let ptr = Box::into_raw(b); - READ3.store(ptr, Ordering::SeqCst); - let ptr = READ3.load(Ordering::SeqCst); + READ3.store(ptr, Ordering::Release); + let ptr = READ3.load(Ordering::Acquire); let h = unsafe { &*ptr }; if let Err(_) = h.rtx.send(jrx) { error!("Read3 INIT: can not enqueue main job reader"); } - for _ in 0..2 { + for wid in 0..128 { let rrx = rrx.clone(); - tokio::task::spawn_blocking(move || h.read_worker(rrx)); + tokio::task::spawn_blocking(move || h.read_worker(wid, rrx)); } }); - let ptr = READ3.load(Ordering::SeqCst); + let ptr = READ3.load(Ordering::Acquire); unsafe { &*ptr } } + pub fn threads_max(&self) -> usize { + self.threads_max.load(Ordering::Acquire) + } + + pub fn set_threads_max(&self, max: usize) { + self.threads_max.store(max, Ordering::Release); + } + pub async fn read(&self, fd: RawFd, pos: u64, count: u64) -> Result { let (tx, rx) = oneshot::channel(); let rt = ReadTask { @@ -69,58 +82,73 @@ impl Read3 { } } - fn read_worker(&self, rrx: crossbeam::channel::Receiver>) { + fn read_worker(&self, wid: u32, rrx: crossbeam::channel::Receiver>) { 'outer: loop { + while wid as usize >= self.threads_max.load(Ordering::Acquire) { + std::thread::sleep(Duration::from_millis(4000)); + } match rrx.recv() { Ok(mut jrx) => match jrx.blocking_recv() { Some(rt) => match self.rtx.send(jrx) { Ok(_) => { + let ts1 = Instant::now(); + let mut prc = 0; + let fd = rt.fd; + let mut rpos = rt.pos; let mut buf = BytesMut::with_capacity(rt.count as usize); let mut writable = rt.count as usize; let rr = unsafe { loop { - info!("do pread fd {} count {} offset {}", rt.fd, writable, rt.pos); - let ec = libc::pread(rt.fd, buf.as_mut_ptr() as _, writable, rt.pos as i64); + if DO_TRACE { + trace!("do pread fd {fd} count {writable} offset {rpos} wid {wid}"); + } + let ec = libc::pread(fd, buf.as_mut_ptr() as _, writable, rpos as i64); + prc += 1; if ec == -1 { let errno = *libc::__errno_location(); if errno == libc::EINVAL { - info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos); + debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid}"); let rr = ReadResult { buf, eof: true }; break Ok(rr); } else { - warn!( - "pread ERROR errno {} fd {} count {} offset {}", - errno, rt.fd, writable, rt.pos - ); + warn!("pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}"); // TODO use a more structured error let e = Error::with_msg_no_trace(format!( - "pread ERROR errno {} fd {} count {} offset {}", - errno, rt.fd, writable, rt.pos + "pread ERROR errno {errno} fd {fd} count {writable} offset {rpos} wid {wid}" )); break Err(e); } } else if ec == 0 { - info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos); + debug!("pread EOF fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}"); let rr = ReadResult { buf, eof: true }; break Ok(rr); } else if ec > 0 { - buf.set_len(ec as usize); if ec as usize > writable { error!( - "pread TOOLARGE 
ec {} fd {} count {} offset {}", - ec, rt.fd, writable, rt.pos + "pread TOOLARGE ec {ec} fd {fd} count {writable} offset {rpos} wid {wid} prc {prc}" ); break 'outer; - } - writable -= ec as usize; - if writable == 0 { - let rr = ReadResult { buf, eof: false }; - break Ok(rr); + } else { + rpos += ec as u64; + writable -= ec as usize; + buf.set_len(buf.len() + (ec as usize)); + if writable == 0 { + let ts2 = Instant::now(); + let dur = ts2.duration_since(ts1); + let dms = 1e3 * dur.as_secs_f32(); + if DO_TRACE { + trace!( + "pread DONE ec {ec} fd {fd} wid {wid} prc {prc} dms {dms:.2}" + ); + } + let rr = ReadResult { buf, eof: false }; + break Ok(rr); + } } } else { error!( - "pread UNEXPECTED ec {} fd {} count {} offset {}", - ec, rt.fd, writable, rt.pos + "pread UNEXPECTED ec {} fd {} count {} offset {rpos} wid {wid}", + ec, rt.fd, writable ); break 'outer; } @@ -129,20 +157,23 @@ impl Read3 { match rt.rescell.send(rr) { Ok(_) => {} Err(_) => { - error!("can not publish the read result"); - break 'outer; + self.can_not_publish.fetch_add(1, Ordering::AcqRel); + warn!("can not publish the read result wid {wid}"); } } } Err(e) => { - error!("can not return the job receiver: {e}"); + error!("can not return the job receiver: wid {wid} {e}"); break 'outer; } }, - None => break 'outer, + None => { + let _ = self.rtx.send(jrx); + break 'outer; + } }, Err(e) => { - error!("read_worker sees: {e}"); + error!("read_worker sees: wid {wid} {e}"); break 'outer; } } diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 8867129..66aaf5c 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -72,10 +72,8 @@ pub fn main() -> Result<(), Error> { }; eprintln!("Read config: {:?}", config); let file = File::open(&sub.datafile).await?; - let inp = Box::pin(disk::file_content_stream( - file, - netpod::FileIoBufferSize::new(1024 * 16), - )); + let disk_io_tune = netpod::DiskIoTune::default(); + let inp = Box::pin(disk::file_content_stream(file, disk_io_tune)); let ce = &config.entries[0]; let channel_config = ChannelConfig { channel: Channel { diff --git a/dq/src/dq.rs b/dq/src/dq.rs index 16e9281..ab2a0fd 100644 --- a/dq/src/dq.rs +++ b/dq/src/dq.rs @@ -381,7 +381,7 @@ pub async fn convert(convert_params: ConvertParams) -> Result<(), Error> { end: u64::MAX, }, agg_kind: AggKind::Plain, - disk_io_buffer_size: 1024 * 4, + disk_io_tune: netpod::DiskIoTune::default(), do_decompress: true, }; let f1 = pbr.into_file(); diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 9675d19..025acae 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -11,7 +11,7 @@ use items::{RangeCompletableItem, Sitemty, StreamItem}; use itertools::Itertools; use netpod::query::RawEventsQuery; use netpod::timeunits::SEC; -use netpod::{log::*, ACCEPT_ALL}; +use netpod::{log::*, DiskIoTune, ACCEPT_ALL}; use netpod::{ByteSize, Channel, FileIoBufferSize, NanoRange, NodeConfigCached, PerfOpts, Shape, APP_OCTET}; use netpod::{ChannelSearchQuery, ChannelSearchResult, ProxyConfig, APP_JSON}; use parse::channelconfig::{ @@ -477,12 +477,27 @@ pub struct Api1Query { channels: Vec, range: Api1Range, // All following parameters are private and not to be used - #[serde(rename = "fileIoBufferSize", default)] + #[serde(default)] file_io_buffer_size: Option, #[serde(default)] decompress: bool, - #[serde(rename = "eventsMax", default = "u64_max", skip_serializing_if = "is_u64_max")] + #[serde(default = "u64_max", skip_serializing_if = "is_u64_max")] events_max: u64, + #[serde(default)] + io_queue_len: u64, +} + +impl Api1Query { + pub fn 
disk_io_tune(&self) -> DiskIoTune { + let mut k = DiskIoTune::default(); + if let Some(x) = &self.file_io_buffer_size { + k.read_buffer_len = x.0; + } + if self.io_queue_len != 0 { + k.read_queue_len = self.io_queue_len as usize; + } + k + } } fn u64_max() -> u64 { @@ -511,7 +526,7 @@ pub struct DataApiPython3DataStream { chan_ix: usize, chan_stream: Option> + Send>>>, config_fut: Option> + Send>>>, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, do_decompress: bool, #[allow(unused)] event_count: u64, @@ -526,7 +541,7 @@ impl DataApiPython3DataStream { pub fn new( range: NanoRange, channels: Vec, - file_io_buffer_size: FileIoBufferSize, + disk_io_tune: DiskIoTune, do_decompress: bool, events_max: u64, status_id: String, @@ -539,7 +554,7 @@ impl DataApiPython3DataStream { chan_ix: 0, chan_stream: None, config_fut: None, - file_io_buffer_size, + disk_io_tune, do_decompress, event_count: 0, events_max, @@ -712,7 +727,7 @@ impl Stream for DataApiPython3DataStream { channel, range: self.range.clone(), agg_kind: netpod::AggKind::EventBlobs, - disk_io_buffer_size: self.file_io_buffer_size.0, + disk_io_tune: self.disk_io_tune.clone(), do_decompress: self.do_decompress, }; let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 }; @@ -728,7 +743,7 @@ impl Stream for DataApiPython3DataStream { evq.agg_kind.need_expand(), evq.do_decompress, event_chunker_conf, - self.file_io_buffer_size.clone(), + self.disk_io_tune.clone(), &self.node_config, )?; Box::pin(s) as Pin> + Send>> @@ -792,6 +807,13 @@ impl Stream for DataApiPython3DataStream { } else { if self.chan_ix >= self.channels.len() { self.data_done = true; + { + let n = Instant::now(); + let mut sb = crate::status_board().unwrap(); + sb.mark_alive(&self.status_id); + self.ping_last = n; + sb.mark_ok(&self.status_id); + } continue; } else { let channel = self.channels[self.chan_ix].clone(); @@ -850,6 +872,10 @@ impl Api1EventsBinaryHandler { .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))? .to_owned(); let body_data = hyper::body::to_bytes(body).await?; + info!( + "Api1EventsBinaryHandler query json: {}", + String::from_utf8_lossy(&body_data) + ); let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) { qu } else { @@ -888,17 +914,12 @@ impl Api1EventsBinaryHandler { name: x.clone(), }) .collect(); - let file_io_buffer_size = if let Some(k) = qu.file_io_buffer_size { - k - } else { - node_config.node_config.cluster.file_io_buffer_size.clone() - }; // TODO use a better stream protocol with built-in error delivery. let status_id = super::status_board()?.new_status_id(); let s = DataApiPython3DataStream::new( range.clone(), chans, - file_io_buffer_size, + qu.disk_io_tune().clone(), qu.decompress, qu.events_max, status_id.clone(), @@ -939,7 +960,7 @@ impl RequestStatusHandler { if accept != APP_JSON && accept != ACCEPT_ALL { // TODO set the public error code and message and return Err(e). 
            let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
-            error!("{e:?}");
+            error!("{e}");
             return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
         }
         let _body_data = hyper::body::to_bytes(body).await?;
diff --git a/httpret/src/download.rs b/httpret/src/download.rs
new file mode 100644
index 0000000..462f8fa
--- /dev/null
+++ b/httpret/src/download.rs
@@ -0,0 +1,91 @@
+use std::pin::Pin;
+
+use crate::err::Error;
+use crate::response;
+use futures_util::{Stream, TryStreamExt};
+use http::{Method, StatusCode};
+use hyper::{Body, Request, Response};
+use netpod::{get_url_query_pairs, DiskIoTune, FromUrl, NodeConfigCached, ReadSys};
+use url::Url;
+
+#[derive(Clone, Debug)]
+pub struct DownloadQuery {
+    disk_io_tune: DiskIoTune,
+}
+
+impl FromUrl for DownloadQuery {
+    fn from_url(url: &Url) -> Result {
+        let pairs = get_url_query_pairs(url);
+        let read_sys = pairs
+            .get("ReadSys")
+            .map(|x| x as &str)
+            .unwrap_or("TokioAsyncRead")
+            .into();
+        let read_buffer_len = pairs
+            .get("ReadBufferLen")
+            .map(|x| x as &str)
+            .unwrap_or("xx")
+            .parse()
+            .unwrap_or(1024 * 4);
+        let read_queue_len = pairs
+            .get("ReadQueueLen")
+            .map(|x| x as &str)
+            .unwrap_or("xx")
+            .parse()
+            .unwrap_or(8);
+        let disk_io_tune = DiskIoTune {
+            read_sys,
+            read_buffer_len,
+            read_queue_len,
+        };
+        let ret = Self { disk_io_tune };
+        Ok(ret)
+    }
+}
+
+pub struct DownloadHandler {}
+
+impl DownloadHandler {
+    pub fn path_prefix() -> &'static str {
+        "/api/4/test/download/"
+    }
+
+    pub fn handler(req: &Request) -> Option {
+        if req.uri().path().starts_with(Self::path_prefix()) {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn get(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+        let (head, _body) = req.into_parts();
+        let p2 = &head.uri.path()[Self::path_prefix().len()..];
+        let base = match &node_config.node.sf_databuffer {
+            Some(k) => k.data_base_path.clone(),
+            None => "/UNDEFINED".into(),
+        };
+        let url = url::Url::parse(&format!("http://dummy{}", head.uri))?;
+        let query = DownloadQuery::from_url(&url)?;
+        let file = tokio::fs::OpenOptions::new().read(true).open(base.join(p2)).await?;
+        let s = match query.disk_io_tune.read_sys {
+            ReadSys::TokioAsyncRead => {
+                let s = disk::file_content_stream(file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf());
+                Box::pin(s) as Pin + Send>>
+            }
+            ReadSys::Read3 => {
+                let s = disk::file_content_stream_2(file, query.disk_io_tune.clone()).map_ok(|x| x.into_buf());
+                Box::pin(s) as _
+            }
+        };
+        Ok(response(StatusCode::OK).body(Body::wrap_stream(s))?)
+    }
+
+    pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+        if req.method() == Method::GET {
+            self.get(req, node_config).await
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    }
+}
diff --git a/httpret/src/evinfo.rs b/httpret/src/evinfo.rs
index f3485c6..cd60a75 100644
--- a/httpret/src/evinfo.rs
+++ b/httpret/src/evinfo.rs
@@ -145,11 +145,14 @@ impl ChannelExecFunction for EvInfoFunc {
         let _ = byte_order;
         let _ = event_value_shape;
         let perf_opts = PerfOpts { inmem_bufcap: 4096 };
+        // TODO let PlainEventsJsonQuery provide the tune
+        let mut disk_io_tune = netpod::DiskIoTune::default();
+        disk_io_tune.read_buffer_len = self.query.disk_io_buffer_size();
         let evq = RawEventsQuery {
             channel: self.query.channel().clone(),
             range: self.query.range().clone(),
             agg_kind: AggKind::Plain,
-            disk_io_buffer_size: self.query.disk_io_buffer_size(),
+            disk_io_tune,
             do_decompress: true,
         };
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 2e3dd57..764dd42 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -1,6 +1,7 @@
 pub mod api1;
 pub mod channelarchiver;
 pub mod channelconfig;
+pub mod download;
 pub mod err;
 pub mod events;
 pub mod evinfo;
@@ -8,6 +9,7 @@ pub mod gather;
 pub mod proxy;
 pub mod pulsemap;
 pub mod search;
+pub mod settings;

 use crate::err::Error;
 use crate::gather::gather_get_json;
@@ -301,6 +303,10 @@ async fn http_service_try(req: Request, node_config: &NodeConfigCached) ->
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
+    } else if let Some(h) = download::DownloadHandler::handler(&req) {
+        h.handle(req, &node_config).await
+    } else if let Some(h) = settings::SettingsThreadsMaxHandler::handler(&req) {
+        h.handle(req, &node_config).await
     } else if let Some(h) = api1::Api1EventsBinaryHandler::handler(&req) {
         h.handle(req, &node_config).await
     } else if let Some(h) = evinfo::EventInfoScan::handler(&req) {
diff --git a/httpret/src/settings.rs b/httpret/src/settings.rs
new file mode 100644
index 0000000..04ebf33
--- /dev/null
+++ b/httpret/src/settings.rs
@@ -0,0 +1,62 @@
+use crate::err::Error;
+use crate::response;
+use http::{Method, StatusCode};
+use hyper::{Body, Request, Response};
+use netpod::log::*;
+use netpod::NodeConfigCached;
+use netpod::{ACCEPT_ALL, APP_JSON};
+
+pub struct SettingsThreadsMaxHandler {}
+
+impl SettingsThreadsMaxHandler {
+    pub fn path_prefix() -> &'static str {
+        "/api/4/settings/read3/threads_max"
+    }
+
+    pub fn handler(req: &Request) -> Option {
+        if req.uri().path().starts_with(Self::path_prefix()) {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn put(&self, req: Request, _node_config: &NodeConfigCached) -> Result, Error> {
+        let (head, body) = req.into_parts();
+        let accept = head
+            .headers
+            .get(http::header::ACCEPT)
+            .map_or(Ok(ACCEPT_ALL), |k| k.to_str())
+            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
+            .to_owned();
+        if accept != APP_JSON && accept != ACCEPT_ALL {
+            // TODO set the public error code and message and return Err(e).
+            let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
+            error!("{e}");
+            return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
+        }
+        let body = hyper::body::to_bytes(body).await?;
+        //let threads_max: usize = head.uri.path()[Self::path_prefix().len()..].parse()?;
+        let threads_max: usize = String::from_utf8_lossy(&body).parse()?;
+        info!("threads_max {threads_max}");
+        disk::read3::Read3::get().set_threads_max(threads_max);
+        let ret = response(StatusCode::NO_CONTENT).body(Body::empty())?;
+        Ok(ret)
+    }
+
+    pub async fn get(&self, _req: Request, _node_config: &NodeConfigCached) -> Result, Error> {
+        let threads_max = disk::read3::Read3::get().threads_max();
+        let ret = response(StatusCode::OK).body(Body::from(format!("{threads_max}")))?;
+        Ok(ret)
+    }
+
+    pub async fn handle(&self, req: Request, node_config: &NodeConfigCached) -> Result, Error> {
+        if req.method() == Method::GET {
+            self.get(req, node_config).await
+        } else if req.method() == Method::PUT {
+            self.put(req, node_config).await
+        } else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    }
+}
diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs
index e16a362..8bfcc09 100644
--- a/netpod/src/netpod.rs
+++ b/netpod/src/netpod.rs
@@ -1332,6 +1332,54 @@ impl Default for FileIoBufferSize {
     }
 }

+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum ReadSys {
+    TokioAsyncRead,
+    Read3,
+}
+
+impl ReadSys {
+    pub fn default() -> Self {
+        Self::TokioAsyncRead
+    }
+}
+
+impl From<&str> for ReadSys {
+    fn from(k: &str) -> Self {
+        if k == "TokioAsyncRead" {
+            Self::TokioAsyncRead
+        } else if k == "Read3" {
+            Self::Read3
+        } else {
+            Self::default()
+        }
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct DiskIoTune {
+    pub read_sys: ReadSys,
+    pub read_buffer_len: usize,
+    pub read_queue_len: usize,
+}
+
+impl DiskIoTune {
+    pub fn default_for_testing() -> Self {
+        Self {
+            read_sys: ReadSys::default(),
+            read_buffer_len: 1024 * 4,
+            read_queue_len: 4,
+        }
+    }
+    pub fn default() -> Self {
+        Self {
+            read_sys: ReadSys::default(),
+            read_buffer_len: 1024 * 4,
+            read_queue_len: 4,
+        }
+    }
+}
+
 pub fn channel_from_pairs(pairs: &BTreeMap) -> Result {
     let ret = Channel {
         backend: pairs
diff --git a/netpod/src/query.rs b/netpod/src/query.rs
index e050e93..1ec0eb4 100644
--- a/netpod/src/query.rs
+++ b/netpod/src/query.rs
@@ -1,8 +1,8 @@
-use crate::log::*;
 use crate::{
     channel_from_pairs, get_url_query_pairs, AggKind, AppendToUrl, ByteSize, Channel, FromUrl, HasBackend,
     HasTimeout, NanoRange, ToNanos,
 };
+use crate::{log::*, DiskIoTune};
 use chrono::{DateTime, TimeZone, Utc};
 use err::Error;
 use serde::{Deserialize, Serialize};
@@ -71,7 +71,7 @@ pub struct RawEventsQuery {
     pub channel: Channel,
     pub range: NanoRange,
     pub agg_kind: AggKind,
-    pub disk_io_buffer_size: usize,
+    pub disk_io_tune: DiskIoTune,
     pub do_decompress: bool,
 }

diff --git a/taskrun/src/append.rs b/taskrun/src/append.rs
index 433036d..5578186 100644
--- a/taskrun/src/append.rs
+++ b/taskrun/src/append.rs
@@ -25,37 +25,68 @@ impl Buffer {
         self.wp - self.rp
     }

+    pub fn check_invariant(&self) {
+        if self.wp > self.buf.len() {
+            eprintln!("ERROR wp {} rp {}", self.wp, self.rp);
+        }
+        if self.rp > self.wp {
+            eprintln!("ERROR wp {} rp {}", self.wp, self.rp);
+        }
+        assert!(self.wp <= self.buf.len());
+        assert!(self.rp <= self.wp);
+    }
+
     pub fn writable(&mut self) -> &mut [u8] {
+        self.check_invariant();
         self.wrap_if_needed();
         &mut self.buf[self.wp..]
} pub fn readable(&self) -> &[u8] { + self.check_invariant(); &self.buf[self.rp..self.wp] } pub fn advance(&mut self, c: usize) { + self.check_invariant(); + if c > self.len() { + eprintln!("ERROR advance wp {} rp {} c {}", self.wp, self.rp, c); + } assert!(c <= self.len()); self.rp += c; } pub fn inc_wp(&mut self, c: usize) { + self.check_invariant(); + if c > self.buf.len() - self.wp { + eprintln!("ERROR inc_wp wp {} rp {} c {}", self.wp, self.rp, c); + } assert!(c <= self.buf.len() - self.wp); self.wp += c; } fn wrap_if_needed(&mut self) { - if self.rp == self.wp && self.rp != 0 { + self.check_invariant(); + //eprintln!("wrap_if_needed wp {} rp {}", self.wp, self.rp); + if self.wp == 0 { + } else if self.rp == self.wp { self.rp = 0; self.wp = 0; - } else if self.rp > BUFFER_CAP / 4 * 3 { - assert!(self.wp < BUFFER_CAP); - assert!(self.rp <= self.wp); + } else if self.rp > self.buf.len() / 4 * 3 { + if self.rp >= self.wp { + eprintln!("ERROR wrap_if_needed wp {} rp {}", self.wp, self.rp); + } + assert!(self.rp < self.wp); + let ll = self.len(); unsafe { let src = &self.buf[self.rp..][0] as *const u8; let dst = &mut self.buf[..][0] as *mut u8; - std::ptr::copy(src, dst, self.len()); + std::ptr::copy(src, dst, ll); } + self.rp = 0; + self.wp = ll; + } else if self.wp == self.buf.len() { + eprintln!("ERROR no more space in buffer"); } } } @@ -166,8 +197,18 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { if false { write!(&mut fout, "[APPEND-WRITABLE] {} writable bytes\n", b.len())?; } + if b.len() < 1 { + eprintln!("ERROR attempt to read with zero length buffer"); + } let n1 = stdin.read(b)?; buf.inc_wp(n1); + if false { + eprintln!( + "{} bytes read from stdin, total readable {} bytes", + n1, + buf.readable().len() + ); + } if false { write!( &mut fout, @@ -178,6 +219,9 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { } match parse_lines(buf.readable()) { Ok((lines, n2)) => { + if false { + eprintln!("parse_lines Ok n2 {n2} lines len {}", lines.len()); + } if false { write!(&mut fout, "[APPEND-PARSED-LINES]: {}\n", lines.len())?; } @@ -190,7 +234,8 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { buf.advance(n2); } Err(e) => { - write!(&mut fout, "[APPEND-PARSE-ERROR]: {:?}\n", e)?; + eprintln!("ERROR parse fail: {e}"); + write!(&mut fout, "[APPEND-PARSE-ERROR]: {e}\n")?; return Ok(()); } } @@ -232,6 +277,7 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { }; } if n1 == 0 { + eprintln!("break because n1 == 0"); break Ok(()); } } @@ -239,12 +285,25 @@ pub fn append_inner(dirname: &str, mut stdin: Stdin) -> Result<(), Error> { pub fn append(dirname: &str, stdin: Stdin) -> Result<(), Error> { match append_inner(dirname, stdin) { - Ok(k) => Ok(k), + Ok(k) => { + eprintln!("append_inner has returned"); + Ok(k) + } Err(e) => { + eprintln!("ERROR append {e:?}"); let dir = PathBuf::from(dirname); let mut fout = open_latest_or_new(&dir)?; - let _ = write!(fout, "ERROR in append_inner: {:?}", e); + let _ = write!(fout, "ERROR in append_inner: {e:?}"); Err(e) } } } + +#[test] +fn test_vec_index() { + let mut buf = vec![0u8; BUFFER_CAP]; + let a = &mut buf[BUFFER_CAP - 1..BUFFER_CAP]; + a[0] = 123; + let a = &mut buf[BUFFER_CAP..]; + assert_eq!(a.len(), 0); +} diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index 6efc742..03b7354 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -104,7 +104,7 @@ pub fn tracing_init() { "streams::rangefilter=info", 
"items::eventvalues=info", "items::xbinnedscalarevents=info", - "disk::binned=info", + "disk=debug", "nodenet::conn=info", "daqbuffer::test=info", "dq=info",