Pulse map also for non-scalar channels

Dominik Werder
2023-01-11 13:30:47 +01:00
parent 4a75793281
commit 9c68476626
2 changed files with 207 additions and 34 deletions
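
The main change below adds search_index_pulse so that pulse-to-timestamp mapping also works for keyspace 3 (the non-scalar case named in the commit title), which the existing search_pulse, a bisection that steps through the data file in fixed-size chunks, cannot handle. The new path instead bisects over the channel's "_Index" companion file and resolves each probed entry to a pulse id by reading the chunk header at the entry's data offset. A minimal sketch of that bisection, with illustrative names; the entry layout (2-byte version header, then 16-byte big-endian (timestamp, data offset) entries) is inferred from the hunks below:

// Sketch only, not code from this commit.
const ENTRY_LEN: u64 = 16;

/// Bisect over index entries in the byte range [p1, p2]. `pulse_at` resolves
/// the entry at a given index-file offset to (pulse, ts); in the commit this
/// means following the entry's data offset and reading the chunk header there.
fn bisect_index(
    mut p1: u64,
    mut p2: u64,
    pulse: u64,
    mut pulse_at: impl FnMut(u64) -> std::io::Result<(u64, u64)>,
) -> std::io::Result<Option<u64>> {
    loop {
        let d = p2 - p1;
        if d % ENTRY_LEN != 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "index range not aligned to entry length",
            ));
        }
        if d <= ENTRY_LEN {
            // The pulse falls between two adjacent entries without matching either.
            return Ok(None);
        }
        // Entry-aligned midpoint, as in the loop added below.
        let m = p1 + d / ENTRY_LEN / 2 * ENTRY_LEN;
        let (pm, ts) = pulse_at(m)?;
        if pm == pulse {
            return Ok(Some(ts));
        } else if pm > pulse {
            p2 = m;
        } else {
            p1 = m;
        }
    }
}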

@@ -90,3 +90,4 @@ impl Convable for http::uri::InvalidUri {}
impl Convable for http::Error {}
impl Convable for http::header::ToStrError {}
impl Convable for hyper::Error {}
impl Convable for std::array::TryFromSliceError {}


@@ -2,7 +2,7 @@ use crate::err::Error;
use crate::response;
use bytes::BufMut;
use bytes::{Buf, BytesMut};
use futures_util::stream::FuturesOrdered;
use futures_util::stream::{FuturesOrdered, FuturesUnordered};
use futures_util::FutureExt;
use http::{Method, StatusCode, Uri};
use hyper::{Body, Request, Response};
@@ -158,6 +158,17 @@ struct ChunkInfo {
pulse: u64,
}
#[derive(Debug)]
struct IndexChunkInfo {
pos_index: u64,
#[allow(unused)]
pos_data: u64,
#[allow(unused)]
len: u64,
ts: u64,
pulse: u64,
}
async fn read_buf_or_eof(file: &mut File, buf: &mut BytesMut) -> Result<usize, Error> {
let mut m = 0;
loop {
@@ -178,7 +189,7 @@ async fn read_buf_or_eof(file: &mut File, buf: &mut BytesMut) -> Result<usize, E
async fn read_first_index_chunk(
mut file_index: File,
file_data: File,
) -> Result<(Option<ChunkInfo>, File, File), Error> {
) -> Result<(Option<IndexChunkInfo>, File, File), Error> {
file_index.seek(SeekFrom::Start(0)).await?;
let mut buf = BytesMut::with_capacity(1024);
let n1 = read_buf_or_eof(&mut file_index, &mut buf).await?;
@@ -191,18 +202,26 @@ async fn read_first_index_chunk(
if ver != 0 {
return Err(Error::with_msg_no_trace(format!("unknown file version ver {}", ver)));
}
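// Index entries begin right after the 2-byte version header, so the first entry is at offset 2.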
let p2 = 2;
let ts = buf.get_u64();
let pos = buf.get_u64();
trace!("read_first_index_chunk ts {ts} pos {pos}");
let (chunk, file_data) = read_chunk_at(file_data, pos, None).await?;
let pos_data = buf.get_u64();
trace!("read_first_index_chunk ts {ts} pos_data {pos_data}");
let (chunk, file_data) = read_chunk_at(file_data, pos_data, None).await?;
trace!("read_first_index_chunk successful: {chunk:?}");
Ok((Some(chunk), file_index, file_data))
let ret = IndexChunkInfo {
pos_index: p2,
pos_data: chunk.pos,
len: chunk.len,
ts: chunk.ts,
pulse: chunk.pulse,
};
Ok((Some(ret), file_index, file_data))
}
async fn read_last_index_chunk(
mut file_index: File,
file_data: File,
) -> Result<(Option<ChunkInfo>, File, File), Error> {
) -> Result<(Option<IndexChunkInfo>, File, File), Error> {
let flen = file_index.seek(SeekFrom::End(0)).await?;
let entry_len = 16;
let c1 = (flen - 2) / entry_len;
@@ -219,11 +238,18 @@ async fn read_last_index_chunk(
return Ok((None, file_index, file_data));
}
let ts = buf.get_u64();
let pos = buf.get_u64();
trace!("read_last_index_chunk ts {ts} pos {pos}");
let (chunk, file_data) = read_chunk_at(file_data, pos, None).await?;
let pos_data = buf.get_u64();
trace!("read_last_index_chunk p2 {p2} ts {ts} pos_data {pos_data}");
let (chunk, file_data) = read_chunk_at(file_data, pos_data, None).await?;
trace!("read_last_index_chunk successful: {chunk:?}");
Ok((Some(chunk), file_index, file_data))
let ret = IndexChunkInfo {
pos_index: p2,
pos_data: chunk.pos,
len: chunk.len,
ts: chunk.ts,
pulse: chunk.pulse,
};
Ok((Some(ret), file_index, file_data))
}
async fn read_first_chunk(mut file: File) -> Result<(Option<ChunkInfo>, File), Error> {
@@ -316,7 +342,7 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: Option<u64>) -> Resu
let _ttl = buf.get_u64();
let ts = buf.get_u64();
let pulse = buf.get_u64();
info!("data chunk len {} ts {} pulse {}", clen, ts, pulse);
trace!("data chunk len {} ts {} pulse {}", clen, ts, pulse);
let ret = ChunkInfo {
pos,
len: clen,
@@ -363,12 +389,6 @@ impl IndexFullHttpFunction {
let splitted: Vec<_> = path.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
let split: u64 = splitted[splitted.len() - 2].parse()?;
if false {
info!(
"hostname {} timebin {} split {}",
node_config.node.host, timebin, split
);
}
let file = tokio::fs::OpenOptions::new().read(true).open(&path).await?;
let (r2, file) = read_first_chunk(file).await?;
msg = format!("{}\n{:?}", msg, r2);
@@ -403,12 +423,6 @@ impl IndexFullHttpFunction {
let splitted: Vec<_> = path_index.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
let split: u64 = splitted[splitted.len() - 2].parse()?;
if false {
info!(
"hostname {} timebin {} split {}",
node_config.node.host, timebin, split
);
}
let file_index = tokio::fs::OpenOptions::new().read(true).open(&path_index).await?;
let file_data = tokio::fs::OpenOptions::new().read(true).open(&path_data).await?;
let (r2, file_index, file_data) = read_first_index_chunk(file_index, file_data).await?;
@@ -509,7 +523,7 @@ async fn update_task(do_abort: Arc<AtomicUsize>, node_config: NodeConfigCached)
Ok(_) => {}
Err(e) => {
error!("issue during last update task: {:?}", e);
tokio::time::sleep(Duration::from_millis(5000)).await;
tokio::time::sleep(Duration::from_millis(50000)).await;
}
}
let ts2 = Instant::now();
@@ -571,6 +585,7 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
return Ok(Some(ck1.ts));
}
if ck1.pulse > pulse {
trace!("search_pulse {} lower than first {:?}", pulse, path);
return Ok(None);
}
let (ck2, mut f1) = read_last_chunk(f1, ck1.pos, ck1.len).await?;
@@ -579,11 +594,10 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
return Ok(Some(ck2.ts));
}
if ck2.pulse < pulse {
trace!("search_pulse {} higher than last {:?}", pulse, path);
return Ok(None);
}
let chunk_len = ck1.len;
//let flen = f1.seek(SeekFrom::End(0)).await?;
//let chunk_count = (flen - ck1.pos) / ck1.len;
let mut p1 = ck1.pos;
let mut p2 = ck2.pos;
loop {
@@ -592,6 +606,7 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
return Err(Error::with_msg_no_trace(format!("search_pulse: position range d {d} not aligned to chunk_len {chunk_len}")));
}
if d <= chunk_len {
trace!("search_pulse {} not in {:?}", pulse, path);
return Ok(None);
}
let m = p1 + d / chunk_len / 2 * chunk_len;
@@ -613,6 +628,77 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
}
}
async fn search_index_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
let fn_index = format!("{}_Index", path.file_name().unwrap().to_str().unwrap());
let path_index = path.parent().unwrap().join(fn_index);
let f1 = tokio::fs::OpenOptions::new().read(true).open(path_index).await?;
let f2 = tokio::fs::OpenOptions::new().read(true).open(path).await?;
let (ck1, f1, f2) = read_first_index_chunk(f1, f2).await?;
if let Some(ck1) = ck1 {
if ck1.pulse == pulse {
return Ok(Some(ck1.ts));
}
if ck1.pulse > pulse {
return Ok(None);
}
let (ck2, mut f1, mut f2) = read_last_index_chunk(f1, f2).await?;
if let Some(ck2) = ck2 {
if ck2.pulse == pulse {
return Ok(Some(ck2.ts));
}
if ck2.pulse < pulse {
return Ok(None);
}
let index_entry_len = 16;
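// Each 16-byte index entry holds a big-endian u64 timestamp followed by a big-endian u64 offset into the data file.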
let chunk_len = index_entry_len;
let mut p1 = ck1.pos_index;
let mut p2 = ck2.pos_index;
loop {
let d = p2 - p1;
if 0 != d % index_entry_len {
return Err(Error::with_msg_no_trace(format!("search_index_pulse: position range d {d} not aligned to index_entry_len {index_entry_len}")));
}
if d <= chunk_len {
return Ok(None);
}
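// Probe the entry-aligned midpoint between p1 and p2.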
let m = p1 + d / chunk_len / 2 * chunk_len;
let (z, f1b, f2b) = async {
f1.seek(SeekFrom::Start(m)).await?;
let mut buf3 = [0; 16];
f1.read_exact(&mut buf3).await?;
let ts = u64::from_be_bytes(buf3[0..8].try_into()?);
let pos_data = u64::from_be_bytes(buf3[8..16].try_into()?);
trace!("search loop in index m {m} ts {ts} pos_data {pos_data}");
let (chunk, f2) = read_chunk_at(f2, pos_data, None).await?;
trace!("search loop read_chunk_at successful: {chunk:?}");
let ret = IndexChunkInfo {
pos_index: m,
pos_data: chunk.pos,
len: chunk.len,
ts: chunk.ts,
pulse: chunk.pulse,
};
Ok::<_, Error>((ret, f1, f2))
}
.await?;
f1 = f1b;
f2 = f2b;
if z.pulse == pulse {
return Ok(Some(z.ts));
} else if z.pulse > pulse {
p2 = m;
} else {
p1 = m;
}
}
} else {
Ok(None)
}
} else {
Ok(None)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MapPulseQuery {
pub backend: String,
@@ -762,10 +848,11 @@ impl MapPulseLocalHttpFunction {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
let urls = format!("{}", req.uri());
let urls = req.uri().to_string();
let pulse: u64 = urls[MAP_PULSE_LOCAL_URL_PREFIX.len()..]
.parse()
.map_err(|_| Error::with_public_msg_no_trace(format!("can not understand pulse map url: {}", req.uri())))?;
let ts1 = Instant::now();
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)";
let rows = conn.query(sql, &[&node_config.node.host, &(pulse as i64)]).await?;
@@ -780,12 +867,56 @@ impl MapPulseLocalHttpFunction {
(channel, hostname, timebin as u32, split as u32, ks as u32)
})
.collect();
trace!(
"database query took {}s",
Instant::now().duration_since(ts1).as_secs_f32()
);
//let mut msg = String::new();
//use std::fmt::Write;
//write!(&mut msg, "cands: {:?}\n", cands)?;
let mut futs = FuturesUnordered::new();
for (ch, hostname, tb, sp, ks) in cands {
futs.push(Self::search(pulse, ch, hostname, tb, sp, ks, node_config));
}
let mut tss = Vec::new();
let mut channels = Vec::new();
for (ch, _, tb, sp, ks) in cands {
use futures_util::StreamExt;
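// Drain the unordered per-candidate searches as they complete; each hit contributes one (ts, channel) pair.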
while let Some(k) = futs.next().await {
match k {
Ok(item) => match item {
Some((ts, ch)) => {
tss.push(ts);
channels.push(ch);
}
None => {}
},
Err(e) => {
error!("{e}");
}
}
}
let ret = LocalMap { pulse, tss, channels };
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
}
async fn search(
pulse: u64,
ch: String,
hostname: String,
tb: u32,
sp: u32,
ks: u32,
node_config: &NodeConfigCached,
) -> Result<Option<(u64, String)>, Error> {
trace!(
"search in ks {} sp {} tb {} host {} ch {}",
ks,
sp,
tb,
hostname,
ch
);
if ks == 2 {
match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) {
Ok(path) => {
//write!(&mut msg, "data_path_tb: {:?}\n", path)?;
@@ -793,22 +924,51 @@ impl MapPulseLocalHttpFunction {
Ok(ts) => {
//write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?;
if let Some(ts) = ts {
tss.push(ts);
channels.push(ch);
info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts);
Ok(Some((ts, ch)))
} else {
Ok(None)
}
}
Err(e) => {
warn!("can not map pulse with {ch} {sp} {tb} {e}");
return Err(e);
}
}
}
Err(e) => {
warn!("can not get path to files {ch} {e}");
return Err(e)?;
}
}
} else if ks == 3 {
match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) {
Ok(path) => {
//write!(&mut msg, "data_path_tb: {:?}\n", path)?;
match search_index_pulse(pulse, &path).await {
Ok(ts) => {
//write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?;
if let Some(ts) = ts {
info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts);
Ok(Some((ts, ch)))
} else {
Ok(None)
}
}
Err(e) => {
warn!("can not map pulse with {ch} {sp} {tb} {e}");
return Err(e);
}
}
}
Err(e) => {
warn!("can not get path to files {ch} {e}");
return Err(e)?;
}
}
} else {
return Err(Error::with_msg_no_trace(format!("bad keyspace {ks}")));
}
let ret = LocalMap { pulse, tss, channels };
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?)
}
}
@@ -843,7 +1003,10 @@ impl MapPulseHistoHttpFunction {
pub async fn histo(pulse: u64, node_config: &NodeConfigCached) -> Result<TsHisto, Error> {
let mut futs = FuturesOrdered::new();
for node in &node_config.node_config.cluster.nodes {
let s = format!("http://{}:{}/api/1/map/pulse/local/{}", node.host, node.port, pulse);
let s = format!(
"http://{}:{}{}{}",
node.host, node.port, MAP_PULSE_LOCAL_URL_PREFIX, pulse
);
let uri: Uri = s.parse()?;
let fut = hyper::Client::new().get(uri);
let fut = tokio::time::timeout(Duration::from_millis(1000), fut);
@@ -928,6 +1091,7 @@ impl Api4MapPulseHttpFunction {
if req.method() != Method::GET {
return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
}
let ts1 = Instant::now();
info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri());
let url = Url::parse(&format!("dummy:{}", req.uri()))?;
let q = MapPulseQuery::from_url(&url)?;
@@ -940,6 +1104,14 @@ impl Api4MapPulseHttpFunction {
i1 = i2;
}
}
let ts2 = Instant::now();
info!(
"Api4MapPulseHttpFunction took {:.2}s",
ts2.duration_since(ts1).as_secs_f32()
);
if histo.tss.len() > 1 {
warn!("Ambiguous pulse map pulse {} histo {:?}", q.pulse, histo);
}
if max > 0 {
Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?)
} else {