From e60f076b278dad419af4ac0b7dc7911f30cec1aa Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 9 Sep 2021 18:26:48 +0200 Subject: [PATCH] Can get some data --- disk/src/dataopen.rs | 65 ++++++++++++++++++++++++++++++-------------- disk/src/paths.rs | 4 ++- httpret/src/lib.rs | 3 +- 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 9b21296..8e812bd 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -113,12 +113,20 @@ async fn position_file( match OpenOptions::new().read(true).open(&index_path).await { Ok(mut index_file) => { let meta = index_file.metadata().await?; - if meta.len() > 1024 * 1024 * 20 { - return Err(Error::with_msg(format!( + if meta.len() > 1024 * 1024 * 80 { + let msg = format!( "too large index file {} bytes for {}", meta.len(), channel_config.channel.name - ))); + ); + return Err(Error::with_msg(msg)); + } else if meta.len() > 1024 * 1024 * 20 { + let msg = format!( + "large index file {} bytes for {}", + meta.len(), + channel_config.channel.name + ); + warn!("{}", msg); } if meta.len() < 2 { return Err(Error::with_msg(format!( @@ -252,21 +260,35 @@ pub fn open_expanded_files( async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result, Error> { let mut timebins = vec![]; - let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?; - let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); - while let Some(e) = rd.next().await { - let e = e?; - let dn = e - .file_name() - .into_string() - .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?; - let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); - if vv == 19 { - timebins.push(dn.parse::()?); + let p0 = paths::channel_timebins_dir_path(&channel_config, &node)?; + match tokio::fs::read_dir(&p0).await { + Ok(rd) => { + let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); + while let Some(e) = rd.next().await { + let e = e?; + let dn = e + .file_name() + .into_string() + .map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?; + if dn.len() != 19 { + warn!("get_timebins weird directory {:?} p0 {:?}", e.path(), p0); + } + let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a }); + if vv == 19 { + timebins.push(dn.parse::()?); + } + } + timebins.sort_unstable(); + Ok(timebins) + } + Err(e) => { + info!( + "get_timebins no timebins for {:?} {:?} p0 {:?}", + channel_config, e, p0 + ); + Ok(vec![]) } } - timebins.sort_unstable(); - Ok(timebins) } async fn open_expanded_files_inner( @@ -291,6 +313,12 @@ async fn open_expanded_files_inner( } } let mut p1 = if let Some(i1) = p1 { i1 } else { 0 }; + if p1 >= timebins.len() { + return Err(Error::with_msg(format!( + "logic error p1 {} range {:?} channel_config {:?}", + p1, range, channel_config + ))); + } let mut found_first = false; loop { let tb = timebins[p1]; @@ -320,7 +348,7 @@ async fn open_expanded_files_inner( } if found_first { // Append all following positioned files. - loop { + while p1 < timebins.len() { let tb = timebins[p1]; let mut a = vec![]; for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { @@ -332,9 +360,6 @@ async fn open_expanded_files_inner( let h = OpenedFileSet { timebin: tb, files: a }; chtx.send(Ok(h)).await?; p1 += 1; - if p1 >= timebins.len() { - break; - } } } else { info!("Could not find some event before the requested range, fall back to standard file list."); diff --git a/disk/src/paths.rs b/disk/src/paths.rs index 8f59897..eae4f5c 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -62,7 +62,9 @@ pub async fn datapaths_for_timebin( .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS)); ret.push(path); } - info!("\n\ndatapaths_for_timebin returns: {:?}\n", ret); + if false { + info!("datapaths_for_timebin returns: {:?}", ret) + } Ok(ret) } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 67cb7a9..575627d 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -460,6 +460,7 @@ async fn plain_events(req: Request, node_config: &NodeConfigCached) -> Res } async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) -> Result, Error> { + info!("httpret plain_events_binary req: {:?}", req); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let query = PlainEventsBinaryQuery::from_url(&url)?; let op = disk::channelexec::PlainEvents::new( @@ -475,7 +476,7 @@ async fn plain_events_binary(req: Request, node_config: &NodeConfigCached) } async fn plain_events_json(req: Request, node_config: &NodeConfigCached) -> Result, Error> { - info!("plain_events_json req: {:?}", req); + info!("httpret plain_events_json req: {:?}", req); let (head, _body) = req.into_parts(); let query = PlainEventsJsonQuery::from_request_head(&head)?; let op = disk::channelexec::PlainEventsJson::new(