Factor out common function

This commit is contained in:
Dominik Werder
2022-10-21 14:31:05 +02:00
parent 881ff1fa44
commit cd4cfe2355
3 changed files with 333 additions and 210 deletions

View File

@@ -1,5 +1,6 @@
use crate::err::Error;
use crate::response;
use bytes::BufMut;
use bytes::{Buf, BytesMut};
use futures_util::stream::FuturesOrdered;
use futures_util::FutureExt;
@@ -108,15 +109,31 @@ struct ChunkInfo {
pulse: u64,
}
async fn read_first_chunk(mut file: File) -> Result<(ChunkInfo, File), Error> {
async fn read_buf_or_eof(file: &mut File, buf: &mut BytesMut) -> Result<usize, Error> {
let mut m = 0;
loop {
if buf.has_remaining_mut() {
let n = file.read_buf(buf).await?;
if n == 0 {
break;
} else {
m += n;
}
} else {
break;
}
}
Ok(m)
}
async fn read_first_chunk(mut file: File) -> Result<(Option<ChunkInfo>, File), Error> {
file.seek(SeekFrom::Start(0)).await?;
let mut buf = BytesMut::with_capacity(1024);
let n1 = file.read_buf(&mut buf).await?;
let n1 = read_buf_or_eof(&mut file, &mut buf).await?;
if n1 < 6 {
return Err(Error::with_msg_no_trace(format!(
"can not even read 6 bytes from datafile n1 {}",
n1
)));
let msg = format!("can not even read 6 bytes from datafile n1 {}", n1);
warn!("{msg}");
return Ok((None, file));
}
let ver = buf.get_i16();
if ver != 0 {
@@ -124,10 +141,9 @@ async fn read_first_chunk(mut file: File) -> Result<(ChunkInfo, File), Error> {
}
let hlen = buf.get_u32() as u64;
if n1 < 2 + hlen as usize + 4 + 3 * 8 {
return Err(Error::with_msg_no_trace(format!(
"did not read enough for first event n1 {}",
n1
)));
let msg = format!("did not read enough for first event n1 {}", n1);
warn!("{msg}");
return Ok((None, file));
}
buf.advance(hlen as usize - 4);
let clen = buf.get_u32() as u64;
@@ -140,43 +156,42 @@ async fn read_first_chunk(mut file: File) -> Result<(ChunkInfo, File), Error> {
ts,
pulse,
};
Ok((ret, file))
Ok((Some(ret), file))
}
async fn read_last_chunk(mut file: File, pos_first: u64, chunk_len: u64) -> Result<(ChunkInfo, File), Error> {
async fn read_last_chunk(mut file: File, pos_first: u64, chunk_len: u64) -> Result<(Option<ChunkInfo>, File), Error> {
let flen = file.seek(SeekFrom::End(0)).await?;
let c1 = (flen - pos_first) / chunk_len;
if c1 == 0 {
return Err(Error::with_msg_no_trace("no chunks in this file"));
return Ok((None, file));
}
let p2 = pos_first + (c1 - 1) * chunk_len;
file.seek(SeekFrom::Start(p2)).await?;
let mut buf = BytesMut::with_capacity(1024);
let n1 = file.read_buf(&mut buf).await?;
let n1 = read_buf_or_eof(&mut file, &mut buf).await?;
if n1 < 4 + 3 * 8 {
return Err(Error::with_msg_no_trace(format!(
"can not read enough from datafile n1 {}",
n1
"can not read enough from datafile flen {} pos_first {} chunk_len {} c1 {} n1 {}",
flen, pos_first, chunk_len, c1, n1
)));
}
let clen = buf.get_u32() as u64;
if clen != chunk_len {
return Err(Error::with_msg_no_trace(format!(
"read_last_chunk mismatch: pos_first {} flen {} clen {} chunk_len {}",
pos_first, flen, clen, chunk_len
"read_last_chunk mismatch flen {} pos_first {} chunk_len {} clen {} c1 {} n1 {}",
flen, pos_first, chunk_len, clen, c1, n1
)));
}
let _ttl = buf.get_u64();
let ts = buf.get_u64();
let pulse = buf.get_u64();
//info!("data chunk len {} ts {} pulse {}", clen, ts, pulse);
let ret = ChunkInfo {
pos: p2,
len: clen,
ts,
pulse,
};
Ok((ret, file))
Ok((Some(ret), file))
}
async fn read_chunk_at(mut file: File, pos: u64, chunk_len: u64) -> Result<(ChunkInfo, File), Error> {
@@ -249,25 +264,31 @@ impl IndexFullHttpFunction {
node_config.node.host, timebin, split
);
}
let file = tokio::fs::OpenOptions::new().read(true).open(path).await?;
let file = tokio::fs::OpenOptions::new().read(true).open(&path).await?;
let (r2, file) = read_first_chunk(file).await?;
msg = format!("{}\n{:?}", msg, r2);
let (r3, _file) = read_last_chunk(file, r2.pos, r2.len).await?;
msg = format!("{}\n{:?}", msg, r3);
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
sql,
&[
&channel_name,
&(split as i32),
&(timebin as i32),
&(r2.pulse as i64),
&(r3.pulse as i64),
&node_config.node.host,
],
)
.await?;
if let Some(r2) = r2 {
let (r3, _file) = read_last_chunk(file, r2.pos, r2.len).await?;
msg = format!("{}\n{:?}", msg, r3);
if let Some(r3) = r3 {
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
sql,
&[
&channel_name,
&(split as i32),
&(timebin as i32),
&(r2.pulse as i64),
&(r3.pulse as i64),
&node_config.node.host,
],
)
.await?;
}
} else {
warn!("could not find first event chunk in {path:?}");
}
}
Ok(msg)
}
@@ -393,42 +414,50 @@ impl UpdateTask {
async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
let f1 = tokio::fs::OpenOptions::new().read(true).open(path).await?;
let (ck1, f1) = read_first_chunk(f1).await?;
if ck1.pulse == pulse {
return Ok(Some(ck1.ts));
}
if ck1.pulse > pulse {
return Ok(None);
}
let (ck2, mut f1) = read_last_chunk(f1, ck1.pos, ck1.len).await?;
if ck2.pulse == pulse {
return Ok(Some(ck2.ts));
}
if ck2.pulse < pulse {
return Ok(None);
}
let chunk_len = ck1.len;
//let flen = f1.seek(SeekFrom::End(0)).await?;
//let chunk_count = (flen - ck1.pos) / ck1.len;
let mut p1 = ck1.pos;
let mut p2 = ck2.pos;
loop {
let d = p2 - p1;
if 0 != d % chunk_len {
return Err(Error::with_msg_no_trace(format!("search_pulse ")));
if let Some(ck1) = ck1 {
if ck1.pulse == pulse {
return Ok(Some(ck1.ts));
}
if d <= chunk_len {
if ck1.pulse > pulse {
return Ok(None);
}
let m = p1 + d / chunk_len / 2 * chunk_len;
let (z, f2) = read_chunk_at(f1, m, chunk_len).await?;
f1 = f2;
if z.pulse == pulse {
return Ok(Some(z.ts));
} else if z.pulse > pulse {
p2 = m;
let (ck2, mut f1) = read_last_chunk(f1, ck1.pos, ck1.len).await?;
if let Some(ck2) = ck2 {
if ck2.pulse == pulse {
return Ok(Some(ck2.ts));
}
if ck2.pulse < pulse {
return Ok(None);
}
let chunk_len = ck1.len;
//let flen = f1.seek(SeekFrom::End(0)).await?;
//let chunk_count = (flen - ck1.pos) / ck1.len;
let mut p1 = ck1.pos;
let mut p2 = ck2.pos;
loop {
let d = p2 - p1;
if 0 != d % chunk_len {
return Err(Error::with_msg_no_trace(format!("search_pulse ")));
}
if d <= chunk_len {
return Ok(None);
}
let m = p1 + d / chunk_len / 2 * chunk_len;
let (z, f2) = read_chunk_at(f1, m, chunk_len).await?;
f1 = f2;
if z.pulse == pulse {
return Ok(Some(z.ts));
} else if z.pulse > pulse {
p2 = m;
} else {
p1 = m;
}
}
} else {
p1 = m;
Ok(None)
}
} else {
Ok(None)
}
}