Cleanup and load test

This commit is contained in:
Dominik Werder
2023-07-21 23:06:15 +02:00
parent 9314c58a9b
commit 7c26b72537
16 changed files with 171 additions and 118 deletions

View File

@@ -19,7 +19,6 @@ pub mod streamlog;
pub use parse;
use bytes::Bytes;
use bytes::BytesMut;
use err::Error;
use futures_util::future::FusedFuture;
@@ -31,7 +30,6 @@ use netpod::log::*;
use netpod::ByteOrder;
use netpod::DiskIoTune;
use netpod::DtNano;
use netpod::Node;
use netpod::ReadSys;
use netpod::ScalarType;
use netpod::SfDbChannel;
@@ -85,64 +83,6 @@ pub struct AggQuerySingleChannel {
pub buffer_size: u32,
}
/// Ad-hoc read check: resolves the data file for the query's timebin on the
/// given node, opens it, and wraps it in a `netpod::BodyStream` backed by
/// `FileReader` so the caller can stream the raw file bytes.
// TODO transform this into a self-test or remove.
pub async fn read_test_1(query: &AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
    let datafile_path = paths::datapath(query.timebin as u64, &query.channel_config, 0, &node);
    debug!("try path: {:?}", datafile_path);
    let file = OpenOptions::new().read(true).open(datafile_path).await?;
    let metadata = file.metadata().await;
    debug!("file meta {:?}", metadata);
    let reader = FileReader {
        file,
        nreads: 0,
        buffer_size: query.buffer_size,
    };
    Ok(netpod::BodyStream {
        inner: Box::new(reader),
    })
}
// Minimal file-backed byte stream; used only by `read_test_1` above.
struct FileReader {
    // Async file handle the chunks are read from.
    file: tokio::fs::File,
    // Number of successful reads performed so far.
    nreads: u32,
    // Size in bytes of the buffer allocated for each read.
    buffer_size: u32,
}
impl Stream for FileReader {
    type Item = Result<Bytes, Error>;

    /// Reads up to `buffer_size` bytes per poll and yields them as one
    /// `Bytes` chunk; a zero-length read ends the stream (EOF).
    ///
    /// NOTE(review): the body is currently stubbed out via `err::todo()`
    /// (it aborts before any I/O); the code below is kept as reference.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        err::todo();
        // TODO remove if no longer used?
        let blen = self.buffer_size as usize;
        // Fresh zero-initialized buffer for this read attempt.
        let mut buf2 = BytesMut::with_capacity(blen);
        buf2.resize(buf2.capacity(), 0);
        if buf2.as_mut().len() != blen {
            panic!("logic");
        }
        let mut buf = tokio::io::ReadBuf::new(buf2.as_mut());
        if buf.filled().len() != 0 {
            panic!("logic");
        }
        match Pin::new(&mut self.file).poll_read(cx, &mut buf) {
            Poll::Ready(Ok(_)) => {
                let rlen = buf.filled().len();
                if rlen == 0 {
                    // EOF: end of stream.
                    Poll::Ready(None)
                } else {
                    if rlen != blen {
                        info!("short read {} of {}", buf.filled().len(), blen);
                    }
                    // BUGFIX: only hand out the bytes that were actually read.
                    // Previously the full `blen`-byte buffer was frozen and
                    // returned even on a short read, so the consumer received
                    // trailing zero padding as if it were file data.
                    buf2.truncate(rlen);
                    self.nreads += 1;
                    Poll::Ready(Some(Ok(buf2.freeze())))
                }
            }
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(Error::from(e)))),
            Poll::Pending => Poll::Pending,
        }
    }
}
pub struct Fopen1 {
#[allow(dead_code)]
opts: OpenOptions,
@@ -196,6 +136,7 @@ unsafe impl Send for Fopen1 {}
pub struct FileContentStream {
file: File,
cap: usize,
disk_io_tune: DiskIoTune,
read_going: bool,
buf: BytesMut,
@@ -203,25 +144,49 @@ pub struct FileContentStream {
nlog: usize,
done: bool,
complete: bool,
total_read: usize,
}
impl Drop for FileContentStream {
    // Log how many bytes this stream delivered and its buffer capacity when it
    // is torn down — handy for spotting truncated or never-polled streams.
    fn drop(&mut self) {
        debug!("FileContentStream total_read {} cap {}", self.total_read, self.cap);
    }
}
impl FileContentStream {
pub fn type_name() -> &'static str {
pub fn self_name() -> &'static str {
std::any::type_name::<Self>()
}
pub fn new(file: File, disk_io_tune: DiskIoTune) -> Self {
let cap = disk_io_tune.read_buffer_len;
let buf = Self::prepare_buf(cap);
Self {
file,
cap,
disk_io_tune,
read_going: false,
buf: BytesMut::new(),
buf,
ts1: Instant::now(),
nlog: 0,
done: false,
complete: false,
total_read: 0,
}
}
fn prepare_buf(cap: usize) -> BytesMut {
let mut buf = BytesMut::with_capacity(cap);
unsafe {
// SAFETY if we got here, then we have the required capacity
buf.set_len(buf.capacity());
}
buf
}
fn mut_file_and_buf(&mut self) -> (&mut File, &mut BytesMut) {
(&mut self.file, &mut self.buf)
}
}
impl Stream for FileContentStream {
@@ -231,29 +196,31 @@ impl Stream for FileContentStream {
use Poll::*;
loop {
break if self.complete {
panic!("{} poll_next on complete", Self::type_name())
panic!("{} poll_next on complete", Self::self_name())
} else if self.done {
self.complete = true;
Ready(None)
} else {
let mut buf = if !self.read_going {
if !self.read_going {
self.read_going = true;
self.ts1 = Instant::now();
let mut buf = BytesMut::new();
buf.resize(self.disk_io_tune.read_buffer_len, 0);
buf
} else {
mem::replace(&mut self.buf, BytesMut::new())
};
let mutsl = buf.as_mut();
let mut rb = ReadBuf::new(mutsl);
let f1 = &mut self.file;
let f2 = Pin::new(f1);
let pollres = AsyncRead::poll_read(f2, cx, &mut rb);
match pollres {
Ready(Ok(_)) => {
// TODO remove
// std::thread::sleep(Duration::from_millis(10));
}
let (file, buf) = self.mut_file_and_buf();
let mut rb = ReadBuf::new(buf.as_mut());
futures_util::pin_mut!(file);
match AsyncRead::poll_read(file, cx, &mut rb) {
Ready(Ok(())) => {
let nread = rb.filled().len();
if nread < rb.capacity() {
debug!("read less than capacity {} vs {}", nread, rb.capacity());
}
let cap = self.cap;
let mut buf = mem::replace(&mut self.buf, Self::prepare_buf(cap));
buf.truncate(nread);
self.read_going = false;
self.total_read += nread;
let ts2 = Instant::now();
if nread == 0 {
let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1));
@@ -263,7 +230,7 @@ impl Stream for FileContentStream {
let ret = FileChunkRead::with_buf_dur(buf, ts2.duration_since(self.ts1));
if false && self.nlog < 6 {
self.nlog += 1;
info!("{:?} ret {:?}", self.disk_io_tune, ret);
debug!("{:?} ret {:?}", self.disk_io_tune, ret);
}
Ready(Some(Ok(ret)))
}
@@ -286,6 +253,7 @@ fn start_read5(
disk_io_tune: DiskIoTune,
reqid: String,
) -> Result<(), Error> {
warn!("start_read5");
let fut = async move {
let mut file = file;
let pos_beg = match file.stream_position().await {
@@ -760,9 +728,13 @@ pub fn file_content_stream<S>(
where
S: Into<String>,
{
if let ReadSys::TokioAsyncRead = disk_io_tune.read_sys {
} else {
warn!("reading via {:?}", disk_io_tune.read_sys);
}
let reqid = reqid.into();
debug!("file_content_stream disk_io_tune {disk_io_tune:?}");
match &disk_io_tune.read_sys {
match disk_io_tune.read_sys {
ReadSys::TokioAsyncRead => {
let s = FileContentStream::new(file, disk_io_tune);
Box::pin(s) as Pin<Box<dyn Stream<Item = _> + Send>>

View File

@@ -28,7 +28,7 @@ pub struct MergedBlobsFromRemotes {
impl MergedBlobsFromRemotes {
pub fn new(subq: EventsSubQuery, cluster: Cluster) -> Self {
debug!("MergedBlobsFromRemotes subq {:?}", subq);
debug!("MergedBlobsFromRemotes::new subq {:?}", subq);
let mut tcp_establish_futs = Vec::new();
for node in &cluster.nodes {
let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone());

View File

@@ -175,7 +175,7 @@ pub async fn make_event_blobs_pipe_real(
fetch_info.clone(),
expand,
event_chunker_conf,
DiskIoTune::default(),
DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
reqctx,
node_config,
)?;
@@ -186,7 +186,7 @@ pub async fn make_event_blobs_pipe_real(
fetch_info.clone(),
expand,
event_chunker_conf,
DiskIoTune::default(),
DiskIoTune::default().with_read_buffer_len(subq.buf_len_disk_io()),
reqctx,
node_config,
)?;