diff --git a/httpret/src/err.rs b/httpret/src/err.rs index d02f676..6b5b16f 100644 --- a/httpret/src/err.rs +++ b/httpret/src/err.rs @@ -90,3 +90,4 @@ impl Convable for http::uri::InvalidUri {} impl Convable for http::Error {} impl Convable for http::header::ToStrError {} impl Convable for hyper::Error {} +impl Convable for std::array::TryFromSliceError {} diff --git a/httpret/src/pulsemap.rs b/httpret/src/pulsemap.rs index 2d4c14e..12b54eb 100644 --- a/httpret/src/pulsemap.rs +++ b/httpret/src/pulsemap.rs @@ -2,7 +2,7 @@ use crate::err::Error; use crate::response; use bytes::BufMut; use bytes::{Buf, BytesMut}; -use futures_util::stream::FuturesOrdered; +use futures_util::stream::{FuturesOrdered, FuturesUnordered}; use futures_util::FutureExt; use http::{Method, StatusCode, Uri}; use hyper::{Body, Request, Response}; @@ -158,6 +158,17 @@ struct ChunkInfo { pulse: u64, } +#[derive(Debug)] +struct IndexChunkInfo { + pos_index: u64, + #[allow(unused)] + pos_data: u64, + #[allow(unused)] + len: u64, + ts: u64, + pulse: u64, +} + async fn read_buf_or_eof(file: &mut File, buf: &mut BytesMut) -> Result { let mut m = 0; loop { @@ -178,7 +189,7 @@ async fn read_buf_or_eof(file: &mut File, buf: &mut BytesMut) -> Result Result<(Option, File, File), Error> { +) -> Result<(Option, File, File), Error> { file_index.seek(SeekFrom::Start(0)).await?; let mut buf = BytesMut::with_capacity(1024); let n1 = read_buf_or_eof(&mut file_index, &mut buf).await?; @@ -191,18 +202,26 @@ async fn read_first_index_chunk( if ver != 0 { return Err(Error::with_msg_no_trace(format!("unknown file version ver {}", ver))); } + let p2 = 2; let ts = buf.get_u64(); - let pos = buf.get_u64(); - trace!("read_first_index_chunk ts {ts} pos {pos}"); - let (chunk, file_data) = read_chunk_at(file_data, pos, None).await?; + let pos_data = buf.get_u64(); + trace!("read_first_index_chunk ts {ts} pos_data {pos_data}"); + let (chunk, file_data) = read_chunk_at(file_data, pos_data, None).await?; trace!("read_first_index_chunk successful: {chunk:?}"); - Ok((Some(chunk), file_index, file_data)) + let ret = IndexChunkInfo { + pos_index: p2, + pos_data: chunk.pos, + len: chunk.len, + ts: chunk.ts, + pulse: chunk.pulse, + }; + Ok((Some(ret), file_index, file_data)) } async fn read_last_index_chunk( mut file_index: File, file_data: File, -) -> Result<(Option, File, File), Error> { +) -> Result<(Option, File, File), Error> { let flen = file_index.seek(SeekFrom::End(0)).await?; let entry_len = 16; let c1 = (flen - 2) / entry_len; @@ -219,11 +238,18 @@ async fn read_last_index_chunk( return Ok((None, file_index, file_data)); } let ts = buf.get_u64(); - let pos = buf.get_u64(); - trace!("read_last_index_chunk ts {ts} pos {pos}"); - let (chunk, file_data) = read_chunk_at(file_data, pos, None).await?; + let pos_data = buf.get_u64(); + trace!("read_last_index_chunk p2 {p2} ts {ts} pos_data {pos_data}"); + let (chunk, file_data) = read_chunk_at(file_data, pos_data, None).await?; trace!("read_last_index_chunk successful: {chunk:?}"); - Ok((Some(chunk), file_index, file_data)) + let ret = IndexChunkInfo { + pos_index: p2, + pos_data: chunk.pos, + len: chunk.len, + ts: chunk.ts, + pulse: chunk.pulse, + }; + Ok((Some(ret), file_index, file_data)) } async fn read_first_chunk(mut file: File) -> Result<(Option, File), Error> { @@ -316,7 +342,7 @@ async fn read_chunk_at(mut file: File, pos: u64, chunk_len: Option) -> Resu let _ttl = buf.get_u64(); let ts = buf.get_u64(); let pulse = buf.get_u64(); - info!("data chunk len {} ts {} pulse {}", clen, ts, pulse); + trace!("data chunk len {} ts {} pulse {}", clen, ts, pulse); let ret = ChunkInfo { pos, len: clen, @@ -363,12 +389,6 @@ impl IndexFullHttpFunction { let splitted: Vec<_> = path.to_str().unwrap().split("/").collect(); let timebin: u64 = splitted[splitted.len() - 3].parse()?; let split: u64 = splitted[splitted.len() - 2].parse()?; - if false { - info!( - "hostname {} timebin {} split {}", - node_config.node.host, timebin, split - ); - } let file = tokio::fs::OpenOptions::new().read(true).open(&path).await?; let (r2, file) = read_first_chunk(file).await?; msg = format!("{}\n{:?}", msg, r2); @@ -403,12 +423,6 @@ impl IndexFullHttpFunction { let splitted: Vec<_> = path_index.to_str().unwrap().split("/").collect(); let timebin: u64 = splitted[splitted.len() - 3].parse()?; let split: u64 = splitted[splitted.len() - 2].parse()?; - if false { - info!( - "hostname {} timebin {} split {}", - node_config.node.host, timebin, split - ); - } let file_index = tokio::fs::OpenOptions::new().read(true).open(&path_index).await?; let file_data = tokio::fs::OpenOptions::new().read(true).open(&path_data).await?; let (r2, file_index, file_data) = read_first_index_chunk(file_index, file_data).await?; @@ -509,7 +523,7 @@ async fn update_task(do_abort: Arc, node_config: NodeConfigCached) Ok(_) => {} Err(e) => { error!("issue during last update task: {:?}", e); - tokio::time::sleep(Duration::from_millis(5000)).await; + tokio::time::sleep(Duration::from_millis(50000)).await; } } let ts2 = Instant::now(); @@ -571,6 +585,7 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result, Error> { return Ok(Some(ck1.ts)); } if ck1.pulse > pulse { + trace!("search_pulse {} lower than first {:?}", pulse, path); return Ok(None); } let (ck2, mut f1) = read_last_chunk(f1, ck1.pos, ck1.len).await?; @@ -579,11 +594,10 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result, Error> { return Ok(Some(ck2.ts)); } if ck2.pulse < pulse { + trace!("search_pulse {} higher than last {:?}", pulse, path); return Ok(None); } let chunk_len = ck1.len; - //let flen = f1.seek(SeekFrom::End(0)).await?; - //let chunk_count = (flen - ck1.pos) / ck1.len; let mut p1 = ck1.pos; let mut p2 = ck2.pos; loop { @@ -592,6 +606,7 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result, Error> { return Err(Error::with_msg_no_trace(format!("search_pulse "))); } if d <= chunk_len { + trace!("search_pulse {} not in {:?}", pulse, path); return Ok(None); } let m = p1 + d / chunk_len / 2 * chunk_len; @@ -613,6 +628,77 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result, Error> { } } +async fn search_index_pulse(pulse: u64, path: &Path) -> Result, Error> { + let fn_index = format!("{}_Index", path.file_name().unwrap().to_str().unwrap()); + let path_index = path.parent().unwrap().join(fn_index); + let f1 = tokio::fs::OpenOptions::new().read(true).open(path_index).await?; + let f2 = tokio::fs::OpenOptions::new().read(true).open(path).await?; + let (ck1, f1, f2) = read_first_index_chunk(f1, f2).await?; + if let Some(ck1) = ck1 { + if ck1.pulse == pulse { + return Ok(Some(ck1.ts)); + } + if ck1.pulse > pulse { + return Ok(None); + } + let (ck2, mut f1, mut f2) = read_last_index_chunk(f1, f2).await?; + if let Some(ck2) = ck2 { + if ck2.pulse == pulse { + return Ok(Some(ck2.ts)); + } + if ck2.pulse < pulse { + return Ok(None); + } + let index_entry_len = 16; + let chunk_len = index_entry_len; + let mut p1 = ck1.pos_index; + let mut p2 = ck2.pos_index; + loop { + let d = p2 - p1; + if 0 != d % index_entry_len { + return Err(Error::with_msg_no_trace(format!("search_pulse "))); + } + if d <= chunk_len { + return Ok(None); + } + let m = p1 + d / chunk_len / 2 * chunk_len; + let (z, f1b, f2b) = async { + f1.seek(SeekFrom::Start(m)).await?; + let mut buf3 = [0; 16]; + f1.read_exact(&mut buf3).await?; + let ts = u64::from_be_bytes(buf3[0..8].try_into()?); + let pos_data = u64::from_be_bytes(buf3[8..16].try_into()?); + trace!("search loop in index m {m} ts {ts} pos_data {pos_data}"); + let (chunk, f2) = read_chunk_at(f2, pos_data, None).await?; + trace!("search loop read_chunk_at successful: {chunk:?}"); + let ret = IndexChunkInfo { + pos_index: p2, + pos_data: chunk.pos, + len: chunk.len, + ts: chunk.ts, + pulse: chunk.pulse, + }; + Ok::<_, Error>((ret, f1, f2)) + } + .await?; + f1 = f1b; + f2 = f2b; + if z.pulse == pulse { + return Ok(Some(z.ts)); + } else if z.pulse > pulse { + p2 = m; + } else { + p1 = m; + } + } + } else { + Ok(None) + } + } else { + Ok(None) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MapPulseQuery { pub backend: String, @@ -762,10 +848,11 @@ impl MapPulseLocalHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } - let urls = format!("{}", req.uri()); + let urls = req.uri().to_string(); let pulse: u64 = urls[MAP_PULSE_LOCAL_URL_PREFIX.len()..] .parse() .map_err(|_| Error::with_public_msg_no_trace(format!("can not understand pulse map url: {}", req.uri())))?; + let ts1 = Instant::now(); let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?; let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)"; let rows = conn.query(sql, &[&node_config.node.host, &(pulse as i64)]).await?; @@ -780,12 +867,56 @@ impl MapPulseLocalHttpFunction { (channel, hostname, timebin as u32, split as u32, ks as u32) }) .collect(); + trace!( + "database query took {}s", + Instant::now().duration_since(ts1).as_secs_f32() + ); //let mut msg = String::new(); //use std::fmt::Write; //write!(&mut msg, "cands: {:?}\n", cands)?; + let mut futs = FuturesUnordered::new(); + for (ch, hostname, tb, sp, ks) in cands { + futs.push(Self::search(pulse, ch, hostname, tb, sp, ks, node_config)); + } let mut tss = Vec::new(); let mut channels = Vec::new(); - for (ch, _, tb, sp, ks) in cands { + use futures_util::StreamExt; + while let Some(k) = futs.next().await { + match k { + Ok(item) => match item { + Some((ts, ch)) => { + tss.push(ts); + channels.push(ch); + } + None => {} + }, + Err(e) => { + error!("{e}"); + } + } + } + let ret = LocalMap { pulse, tss, channels }; + Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) + } + + async fn search( + pulse: u64, + ch: String, + hostname: String, + tb: u32, + sp: u32, + ks: u32, + node_config: &NodeConfigCached, + ) -> Result, Error> { + trace!( + "search in ks {} sp {} tb {} host {} ch {}", + ks, + sp, + tb, + hostname, + ch + ); + if ks == 2 { match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) { Ok(path) => { //write!(&mut msg, "data_path_tb: {:?}\n", path)?; @@ -793,22 +924,51 @@ impl MapPulseLocalHttpFunction { Ok(ts) => { //write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?; if let Some(ts) = ts { - tss.push(ts); - channels.push(ch); + info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts); + Ok(Some((ts, ch))) + } else { + Ok(None) } } Err(e) => { warn!("can not map pulse with {ch} {sp} {tb} {e}"); + return Err(e); } } } Err(e) => { warn!("can not get path to files {ch} {e}"); + return Err(e)?; } } + } else if ks == 3 { + match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) { + Ok(path) => { + //write!(&mut msg, "data_path_tb: {:?}\n", path)?; + match search_index_pulse(pulse, &path).await { + Ok(ts) => { + //write!(&mut msg, "SEARCH: {:?} for {}\n", ts, pulse)?; + if let Some(ts) = ts { + info!("Found in ks {} sp {} tb {} ch {} ts {}", ks, sp, tb, ch, ts); + Ok(Some((ts, ch))) + } else { + Ok(None) + } + } + Err(e) => { + warn!("can not map pulse with {ch} {sp} {tb} {e}"); + return Err(e); + } + } + } + Err(e) => { + warn!("can not get path to files {ch} {e}"); + return Err(e)?; + } + } + } else { + return Err(Error::with_msg_no_trace(format!("bad keyspace {ks}"))); } - let ret = LocalMap { pulse, tss, channels }; - Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&ret)?))?) } } @@ -843,7 +1003,10 @@ impl MapPulseHistoHttpFunction { pub async fn histo(pulse: u64, node_config: &NodeConfigCached) -> Result { let mut futs = FuturesOrdered::new(); for node in &node_config.node_config.cluster.nodes { - let s = format!("http://{}:{}/api/1/map/pulse/local/{}", node.host, node.port, pulse); + let s = format!( + "http://{}:{}{}{}", + node.host, node.port, MAP_PULSE_LOCAL_URL_PREFIX, pulse + ); let uri: Uri = s.parse()?; let fut = hyper::Client::new().get(uri); let fut = tokio::time::timeout(Duration::from_millis(1000), fut); @@ -928,6 +1091,7 @@ impl Api4MapPulseHttpFunction { if req.method() != Method::GET { return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?); } + let ts1 = Instant::now(); info!("Api4MapPulseHttpFunction handle uri: {:?}", req.uri()); let url = Url::parse(&format!("dummy:{}", req.uri()))?; let q = MapPulseQuery::from_url(&url)?; @@ -940,6 +1104,14 @@ impl Api4MapPulseHttpFunction { i1 = i2; } } + let ts2 = Instant::now(); + info!( + "Api4MapPulseHttpFunction took {:.2}s", + ts2.duration_since(ts1).as_secs_f32() + ); + if histo.tss.len() > 1 { + warn!("Ambigious pulse map pulse {} histo {:?}", q.pulse, histo); + } if max > 0 { Ok(response(StatusCode::OK).body(Body::from(serde_json::to_vec(&histo.tss[i1])?))?) } else {