From fd3f22fccbb08e833919df481177896bdb552397 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 20 Jan 2023 16:14:52 +0100 Subject: [PATCH] Increase index frequency --- disk/src/disk.rs | 12 ++++++++---- httpret/src/pulsemap.rs | 18 +++++++++++++----- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/disk/src/disk.rs b/disk/src/disk.rs index a0370b1..8952b63 100644 --- a/disk/src/disk.rs +++ b/disk/src/disk.rs @@ -234,9 +234,13 @@ impl Stream for FileContentStream { } } -fn start_read5(file: File, tx: async_channel::Sender>) -> Result<(), Error> { +fn start_read5( + file: File, + tx: async_channel::Sender>, + disk_io_tune: DiskIoTune, +) -> Result<(), Error> { let fut = async move { - info!("start_read5 BEGIN"); + info!("start_read5 BEGIN {disk_io_tune:?}"); let mut file = file; loop { let mut buf = BytesMut::new(); @@ -267,9 +271,9 @@ pub struct FileContentStream5 { } impl FileContentStream5 { - pub fn new(file: File, _disk_io_tune: DiskIoTune) -> Result { + pub fn new(file: File, disk_io_tune: DiskIoTune) -> Result { let (tx, rx) = async_channel::bounded(32); - start_read5(file, tx)?; + start_read5(file, tx, disk_io_tune)?; let ret = Self { rx }; Ok(ret) } diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 12b54eb..7ea1d60 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -85,7 +85,7 @@ fn timer_channel_names() -> Vec { all } -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] enum MapfilePath { Scalar(PathBuf), Index(PathBuf, PathBuf), @@ -381,9 +381,17 @@ impl IndexFullHttpFunction { ) -> Result { let mut msg = format!("Index channel {}", channel_name); let files = datafiles_for_channel(channel_name.clone(), node_config).await?; + let mut files = files; + files.sort(); + let files = files; msg = format!("{}\n{:?}", msg, files); let mut latest_pair = (0, 0); - for mp in files { + let n1 = files.len().min(3); + let m1 = files.len() - n1; + for ch in &files[m1..] { + info!(" index over {:?}", ch); + } + for mp in files[m1..].into_iter() { match mp { MapfilePath::Scalar(path) => { let splitted: Vec<_> = path.to_str().unwrap().split("/").collect(); @@ -513,7 +521,7 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) info!("update_task break A"); break; } - tokio::time::sleep(Duration::from_millis(40000 + (0x3fff & commonio::tokio_rand().await?))).await; + tokio::time::sleep(Duration::from_millis(10000 + (0x3fff & commonio::tokio_rand().await?))).await; if do_abort.load(Ordering::SeqCst) != 0 { info!("update_task break B"); break; @@ -523,11 +531,11 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) Ok(_) => {} Err(e) => { error!("issue during last update task: {:?}", e); - tokio::time::sleep(Duration::from_millis(50000)).await; + tokio::time::sleep(Duration::from_millis(20000)).await; } } let ts2 = Instant::now(); - let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3; + let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3; info!("Done update task {:.0} ms", dt); } Ok(())