Increase index frequency

This commit is contained in:
Dominik Werder
2023-01-20 16:14:52 +01:00
parent 8495853f8e
commit fd3f22fccb
2 changed files with 21 additions and 9 deletions

View File

@@ -234,9 +234,13 @@ impl Stream for FileContentStream {
} }
} }
fn start_read5(file: File, tx: async_channel::Sender<Result<FileChunkRead, Error>>) -> Result<(), Error> { fn start_read5(
file: File,
tx: async_channel::Sender<Result<FileChunkRead, Error>>,
disk_io_tune: DiskIoTune,
) -> Result<(), Error> {
let fut = async move { let fut = async move {
info!("start_read5 BEGIN"); info!("start_read5 BEGIN {disk_io_tune:?}");
let mut file = file; let mut file = file;
loop { loop {
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
@@ -267,9 +271,9 @@ pub struct FileContentStream5 {
} }
impl FileContentStream5 { impl FileContentStream5 {
pub fn new(file: File, _disk_io_tune: DiskIoTune) -> Result<Self, Error> { pub fn new(file: File, disk_io_tune: DiskIoTune) -> Result<Self, Error> {
let (tx, rx) = async_channel::bounded(32); let (tx, rx) = async_channel::bounded(32);
start_read5(file, tx)?; start_read5(file, tx, disk_io_tune)?;
let ret = Self { rx }; let ret = Self { rx };
Ok(ret) Ok(ret)
} }

View File

@@ -85,7 +85,7 @@ fn timer_channel_names() -> Vec<String> {
all all
} }
#[derive(Debug)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum MapfilePath { enum MapfilePath {
Scalar(PathBuf), Scalar(PathBuf),
Index(PathBuf, PathBuf), Index(PathBuf, PathBuf),
@@ -381,9 +381,17 @@ impl IndexFullHttpFunction {
) -> Result<String, Error> { ) -> Result<String, Error> {
let mut msg = format!("Index channel {}", channel_name); let mut msg = format!("Index channel {}", channel_name);
let files = datafiles_for_channel(channel_name.clone(), node_config).await?; let files = datafiles_for_channel(channel_name.clone(), node_config).await?;
let mut files = files;
files.sort();
let files = files;
msg = format!("{}\n{:?}", msg, files); msg = format!("{}\n{:?}", msg, files);
let mut latest_pair = (0, 0); let mut latest_pair = (0, 0);
for mp in files { let n1 = files.len().min(3);
let m1 = files.len() - n1;
for ch in &files[m1..] {
info!(" index over {:?}", ch);
}
for mp in files[m1..].into_iter() {
match mp { match mp {
MapfilePath::Scalar(path) => { MapfilePath::Scalar(path) => {
let splitted: Vec<_> = path.to_str().unwrap().split("/").collect(); let splitted: Vec<_> = path.to_str().unwrap().split("/").collect();
@@ -513,7 +521,7 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
info!("update_task break A"); info!("update_task break A");
break; break;
} }
tokio::time::sleep(Duration::from_millis(40000 + (0x3fff & commonio::tokio_rand().await?))).await; tokio::time::sleep(Duration::from_millis(10000 + (0x3fff & commonio::tokio_rand().await?))).await;
if do_abort.load(Ordering::SeqCst) != 0 { if do_abort.load(Ordering::SeqCst) != 0 {
info!("update_task break B"); info!("update_task break B");
break; break;
@@ -523,11 +531,11 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
Ok(_) => {} Ok(_) => {}
Err(e) => { Err(e) => {
error!("issue during last update task: {:?}", e); error!("issue during last update task: {:?}", e);
tokio::time::sleep(Duration::from_millis(50000)).await; tokio::time::sleep(Duration::from_millis(20000)).await;
} }
} }
let ts2 = Instant::now(); let ts2 = Instant::now();
let dt = ts2.duration_since(ts1).as_secs_f64() * 1e3; let dt = ts2.duration_since(ts1).as_secs_f32() * 1e3;
info!("Done update task {:.0} ms", dt); info!("Done update task {:.0} ms", dt);
} }
Ok(()) Ok(())