diff --git a/disk/src/binned.rs b/disk/src/binned.rs index bfcebe7..7d55ff9 100644 --- a/disk/src/binned.rs +++ b/disk/src/binned.rs @@ -355,6 +355,7 @@ where T: ReadableFromFile, { buf: Vec, + all: Vec, file: Option, _marker: std::marker::PhantomData, } @@ -365,7 +366,9 @@ where { fn new(file: File) -> Self { Self { - buf: vec![], + // TODO make buffer size a parameter: + buf: vec![0; 4096], + all: vec![], file: Some(file), _marker: std::marker::PhantomData::default(), } @@ -380,20 +383,22 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { use Poll::*; - 'outer: loop { - // TODO make buffer size a parameter: - let mut buf = vec![0; 4096]; + let mut buf = std::mem::replace(&mut self.buf, Vec::new()); + let ret = 'outer: loop { let mut dst = ReadBuf::new(&mut buf); + if dst.remaining() == 0 || dst.capacity() == 0 { + break Ready(Err(Error::with_msg("bad read buffer"))); + } let fp = self.file.as_mut().unwrap(); let f = Pin::new(fp); break match File::poll_read(f, cx, &mut dst) { Ready(res) => match res { Ok(_) => { if dst.filled().len() > 0 { - self.buf.extend_from_slice(&mut buf); + self.all.extend_from_slice(dst.filled()); continue 'outer; } else { - match T::from_buf(&mut self.buf) { + match T::from_buf(&mut self.all) { Ok(item) => Ready(Ok(StreamItem::DataItem(RangeCompletableItem::Data(item)))), Err(e) => Ready(Err(e)), } @@ -403,7 +408,9 @@ where }, Pending => Pending, }; - } + }; + self.buf = buf; + ret } } diff --git a/retrieval/src/client.rs b/retrieval/src/client.rs index 695b73f..26df6a8 100644 --- a/retrieval/src/client.rs +++ b/retrieval/src/client.rs @@ -47,6 +47,10 @@ pub async fn get_binned( disk_stats_every_kb: u32, ) -> Result<(), Error> { info!("------- get_binned client"); + info!("channel {}", channel_name); + info!("beg {}", beg_date); + info!("end {}", end_date); + info!("-------"); let t1 = Utc::now(); let date_fmt = "%Y-%m-%dT%H:%M:%S.%3fZ"; let uri = format!( @@ -65,7 +69,6 @@ pub async fn get_binned( cache_usage.query_param_value(), disk_stats_every_kb, ); - info!("get_binned uri {:?}", uri); let req = hyper::Request::builder() .method(http::Method::GET) .uri(uri) @@ -78,7 +81,15 @@ pub async fn get_binned( let (head, body) = res.into_parts(); let buf = hyper::body::to_bytes(body).await?; let s = String::from_utf8_lossy(&buf); - return Err(Error::with_msg(format!("Server error {:?}\n---------------------- message from http body:\n{}\n---------------------- end of http body", head, s))); + return Err(Error::with_msg(format!( + concat!( + "Server error {:?}\n", + "---------------------- message from http body:\n", + "{}\n", + "---------------------- end of http body", + ), + head, s + ))); } let perf_opts = PerfOpts { inmem_bufcap: 512 }; let s1 = disk::cache::HttpBodyAsAsyncRead::new(res);