Can get some data
This commit is contained in:
@@ -113,12 +113,20 @@ async fn position_file(
|
||||
match OpenOptions::new().read(true).open(&index_path).await {
|
||||
Ok(mut index_file) => {
|
||||
let meta = index_file.metadata().await?;
|
||||
if meta.len() > 1024 * 1024 * 20 {
|
||||
return Err(Error::with_msg(format!(
|
||||
if meta.len() > 1024 * 1024 * 80 {
|
||||
let msg = format!(
|
||||
"too large index file {} bytes for {}",
|
||||
meta.len(),
|
||||
channel_config.channel.name
|
||||
)));
|
||||
);
|
||||
return Err(Error::with_msg(msg));
|
||||
} else if meta.len() > 1024 * 1024 * 20 {
|
||||
let msg = format!(
|
||||
"large index file {} bytes for {}",
|
||||
meta.len(),
|
||||
channel_config.channel.name
|
||||
);
|
||||
warn!("{}", msg);
|
||||
}
|
||||
if meta.len() < 2 {
|
||||
return Err(Error::with_msg(format!(
|
||||
@@ -252,21 +260,35 @@ pub fn open_expanded_files(
|
||||
|
||||
async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<u64>, Error> {
|
||||
let mut timebins = vec![];
|
||||
let rd = tokio::fs::read_dir(paths::channel_timebins_dir_path(&channel_config, &node)?).await?;
|
||||
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
|
||||
while let Some(e) = rd.next().await {
|
||||
let e = e?;
|
||||
let dn = e
|
||||
.file_name()
|
||||
.into_string()
|
||||
.map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
|
||||
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
|
||||
if vv == 19 {
|
||||
timebins.push(dn.parse::<u64>()?);
|
||||
let p0 = paths::channel_timebins_dir_path(&channel_config, &node)?;
|
||||
match tokio::fs::read_dir(&p0).await {
|
||||
Ok(rd) => {
|
||||
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
|
||||
while let Some(e) = rd.next().await {
|
||||
let e = e?;
|
||||
let dn = e
|
||||
.file_name()
|
||||
.into_string()
|
||||
.map_err(|e| Error::with_msg(format!("Bad OS path {:?}", e)))?;
|
||||
if dn.len() != 19 {
|
||||
warn!("get_timebins weird directory {:?} p0 {:?}", e.path(), p0);
|
||||
}
|
||||
let vv = dn.chars().fold(0, |a, x| if x.is_digit(10) { a + 1 } else { a });
|
||||
if vv == 19 {
|
||||
timebins.push(dn.parse::<u64>()?);
|
||||
}
|
||||
}
|
||||
timebins.sort_unstable();
|
||||
Ok(timebins)
|
||||
}
|
||||
Err(e) => {
|
||||
info!(
|
||||
"get_timebins no timebins for {:?} {:?} p0 {:?}",
|
||||
channel_config, e, p0
|
||||
);
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
timebins.sort_unstable();
|
||||
Ok(timebins)
|
||||
}
|
||||
|
||||
async fn open_expanded_files_inner(
|
||||
@@ -291,6 +313,12 @@ async fn open_expanded_files_inner(
|
||||
}
|
||||
}
|
||||
let mut p1 = if let Some(i1) = p1 { i1 } else { 0 };
|
||||
if p1 >= timebins.len() {
|
||||
return Err(Error::with_msg(format!(
|
||||
"logic error p1 {} range {:?} channel_config {:?}",
|
||||
p1, range, channel_config
|
||||
)));
|
||||
}
|
||||
let mut found_first = false;
|
||||
loop {
|
||||
let tb = timebins[p1];
|
||||
@@ -320,7 +348,7 @@ async fn open_expanded_files_inner(
|
||||
}
|
||||
if found_first {
|
||||
// Append all following positioned files.
|
||||
loop {
|
||||
while p1 < timebins.len() {
|
||||
let tb = timebins[p1];
|
||||
let mut a = vec![];
|
||||
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
|
||||
@@ -332,9 +360,6 @@ async fn open_expanded_files_inner(
|
||||
let h = OpenedFileSet { timebin: tb, files: a };
|
||||
chtx.send(Ok(h)).await?;
|
||||
p1 += 1;
|
||||
if p1 >= timebins.len() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
info!("Could not find some event before the requested range, fall back to standard file list.");
|
||||
|
||||
@@ -62,7 +62,9 @@ pub async fn datapaths_for_timebin(
|
||||
.join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS));
|
||||
ret.push(path);
|
||||
}
|
||||
info!("\n\ndatapaths_for_timebin returns: {:?}\n", ret);
|
||||
if false {
|
||||
info!("datapaths_for_timebin returns: {:?}", ret)
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
|
||||
@@ -460,6 +460,7 @@ async fn plain_events(req: Request<Body>, node_config: &NodeConfigCached) -> Res
|
||||
}
|
||||
|
||||
async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("httpret plain_events_binary req: {:?}", req);
|
||||
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
|
||||
let query = PlainEventsBinaryQuery::from_url(&url)?;
|
||||
let op = disk::channelexec::PlainEvents::new(
|
||||
@@ -475,7 +476,7 @@ async fn plain_events_binary(req: Request<Body>, node_config: &NodeConfigCached)
|
||||
}
|
||||
|
||||
async fn plain_events_json(req: Request<Body>, node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
|
||||
info!("plain_events_json req: {:?}", req);
|
||||
info!("httpret plain_events_json req: {:?}", req);
|
||||
let (head, _body) = req.into_parts();
|
||||
let query = PlainEventsJsonQuery::from_request_head(&head)?;
|
||||
let op = disk::channelexec::PlainEventsJson::new(
|
||||
|
||||
Reference in New Issue
Block a user