diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index 02644f4..33b4d56 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -78,6 +78,7 @@ async fn open_files_inner( debug!("opening path {:?}", &path); let mut file = OpenOptions::new().read(true).open(&path).await?; debug!("opened file {:?} {:?}", &path, &file); + let mut use_file = false; { let index_path = paths::index_path(ts_bin, &channel_config, &node)?; match OpenOptions::new().read(true).open(&index_path).await { @@ -112,23 +113,32 @@ async fn open_files_inner( Some(o) => { debug!("FOUND ts IN INDEX: {:?}", o); file.seek(SeekFrom::Start(o.1)).await?; + use_file = true; } None => { debug!("NOT FOUND IN INDEX"); - file.seek(SeekFrom::End(0)).await?; + use_file = false; } } } Err(e) => match e.kind() { ErrorKind::NotFound => { - file = super::index::position_file(file, range.beg).await?; + let res = super::index::position_static_len_datafile(file, range.beg).await?; + file = res.0; + if res.1 { + use_file = true; + } else { + use_file = false; + } } _ => Err(e)?, }, } } - let ret = OpenedFile { file, path }; - chtx.send(Ok(ret)).await?; + if use_file { + let ret = OpenedFile { file, path }; + chtx.send(Ok(ret)).await?; + } } // TODO keep track of number of running debug!("open_files_inner done"); diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index 82b5799..aab7874 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -6,6 +6,8 @@ use futures_core::Stream; use futures_util::StreamExt; use netpod::{ChannelConfig, NanoRange, Node}; use std::pin::Pin; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; use std::task::{Context, Poll}; pub struct EventBlobsComplete { @@ -17,6 +19,7 @@ pub struct EventBlobsComplete { range: NanoRange, errored: bool, completed: bool, + max_ts: Arc, } impl EventBlobsComplete { @@ -36,6 +39,7 @@ impl EventBlobsComplete { range, errored: false, completed: false, + max_ts: Arc::new(AtomicU64::new(0)), } } } @@ -72,6 +76,7 @@ impl Stream for EventBlobsComplete { self.range.clone(), self.event_chunker_conf.clone(), path, + self.max_ts.clone(), ); self.evs = Some(chunker); continue 'outer; diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index e73533f..3e34ed7 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -9,6 +9,8 @@ use netpod::timeunits::SEC; use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape}; use std::path::PathBuf; use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; use std::task::{Context, Poll}; pub struct EventChunker { @@ -26,6 +28,7 @@ pub struct EventChunker { final_stats_sent: bool, parsed_bytes: u64, path: PathBuf, + max_ts: Arc, } enum DataFileState { @@ -56,6 +59,7 @@ impl EventChunker { range: NanoRange, stats_conf: EventChunkerConf, path: PathBuf, + max_ts: Arc, ) -> Self { let mut inp = NeedMinBuffer::new(inp); inp.set_need_min(6); @@ -74,6 +78,7 @@ impl EventChunker { final_stats_sent: false, parsed_bytes: 0, path, + max_ts, } } @@ -83,8 +88,9 @@ impl EventChunker { range: NanoRange, stats_conf: EventChunkerConf, path: PathBuf, + max_ts: Arc, ) -> Self { - let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path); + let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path, max_ts); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -151,6 +157,19 @@ impl EventChunker { let _ttl = sl.read_i64::().unwrap(); let ts = sl.read_i64::().unwrap() as u64; let pulse = sl.read_i64::().unwrap() as u64; + let max_ts = self.max_ts.load(Ordering::SeqCst); + if ts < max_ts { + Err(Error::with_msg(format!( + "unordered event ts: {}.{} max_ts {}.{} config {:?} path {:?}", + ts / SEC, + ts % SEC, + max_ts / SEC, + max_ts % SEC, + self.channel_config.shape, + self.path + )))?; + } + self.max_ts.store(ts, Ordering::SeqCst); if ts >= self.range.end { self.seen_beyond_range = true; self.data_emit_complete = true; diff --git a/disk/src/index.rs b/disk/src/index.rs index db37d05..65e9ca4 100644 --- a/disk/src/index.rs +++ b/disk/src/index.rs @@ -140,19 +140,11 @@ pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Er Ok(ev) } -pub async fn position_file(mut file: File, beg: u64) -> Result { - // Read first chunk which should include channel name packet, and a first event. - // It can be that file is empty. - // It can be that there is a a channel header but zero events. +pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(File, bool), Error> { let flen = file.seek(SeekFrom::End(0)).await?; file.seek(SeekFrom::Start(0)).await?; let mut buf = vec![0; 1024]; - let n1 = read(&mut buf, &mut file).await?; - if n1 < buf.len() { - // file has less content than our buffer - } else { - // - } + let _n1 = read(&mut buf, &mut file).await?; let hres = parse_channel_header(&buf)?; let headoff = 2 + hres.0 as u64; let ev = parse_event(&buf[headoff as usize..])?; @@ -160,22 +152,36 @@ pub async fn position_file(mut file: File, beg: u64) -> Result { let mut j = headoff; let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff; let x = ev.1.ns; - let y = read_event_at(k, &mut file).await?.1.ns; + let t = read_event_at(k, &mut file).await?; + if t.0 != evlen as u32 { + Err(Error::with_msg(format!( + "inconsistent event lengths: {} vs {}", + t.0, evlen + )))?; + } + let y = t.1.ns; if x >= beg { file.seek(SeekFrom::Start(j)).await?; - return Ok(file); + return Ok((file, true)); } if y < beg { file.seek(SeekFrom::Start(j)).await?; - return Ok(file); + return Ok((file, false)); } loop { if k - j < 2 * evlen { file.seek(SeekFrom::Start(k)).await?; - return Ok(file); + return Ok((file, true)); } let m = j + (k - j) / 2 / evlen * evlen; - let x = read_event_at(m, &mut file).await?.1.ns; + let t = read_event_at(m, &mut file).await?; + if t.0 != evlen as u32 { + Err(Error::with_msg(format!( + "inconsistent event lengths: {} vs {}", + t.0, evlen + )))?; + } + let x = t.1.ns; if x < beg { j = m; } else { diff --git a/disk/src/lib.rs b/disk/src/lib.rs index d465e20..44631ef 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -348,7 +348,8 @@ pub fn parsed1( Ok(file) => { let inp = Box::pin(file_content_stream(file.file, query.buffer_size as usize)); let range = err::todoval(); - let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path); + let max_ts = err::todoval(); + let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path, max_ts); while let Some(evres) = chunker.next().await { use eventchunker::EventChunkerItem; match evres { diff --git a/httpret/src/gather.rs b/httpret/src/gather.rs new file mode 100644 index 0000000..c55e9b7 --- /dev/null +++ b/httpret/src/gather.rs @@ -0,0 +1,101 @@ +use crate::response; +use err::Error; +use http::{Method, StatusCode}; +use hyper::{Body, Client, Request, Response}; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; + +#[derive(Clone, Serialize, Deserialize)] +struct GatherFrom { + hosts: Vec, +} + +#[derive(Clone, Serialize, Deserialize)] +struct GatherHost { + host: String, + port: u16, + inst: String, +} + +async fn process_answer(res: Response) -> Result { + let (pre, mut body) = res.into_parts(); + if pre.status != StatusCode::OK { + use hyper::body::HttpBody; + if let Some(c) = body.data().await { + let c: bytes::Bytes = c?; + let s1 = String::from_utf8(c.to_vec())?; + Ok(JsonValue::String(format!( + "status {} body {}", + pre.status.as_str(), + s1 + ))) + } else { + Ok(JsonValue::String(format!("status {}", pre.status.as_str()))) + } + } else { + let body: hyper::Body = body; + let body_all = hyper::body::to_bytes(body).await?; + let val = match serde_json::from_slice(&body_all) { + Ok(k) => k, + Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?), + }; + Ok::<_, Error>(val) + } +} + +pub async fn gather_json(req: Request, pathpre: &str) -> Result, Error> { + let (part_head, part_body) = req.into_parts(); + let bodyslice = hyper::body::to_bytes(part_body).await?; + let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?; + let mut spawned = vec![]; + let uri = part_head.uri; + let path_post = &uri.path()[pathpre.len()..]; + for gh in gather_from.hosts { + let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post); + let req = Request::builder().method(Method::GET).uri(uri); + let req = if gh.inst.len() > 0 { + req.header("retrieval_instance", &gh.inst) + } else { + req + }; + let req = req.header(http::header::ACCEPT, "application/json"); + let req = req.body(Body::empty()); + use futures_util::select; + use futures_util::FutureExt; + use std::time::Duration; + use tokio::time::sleep; + let task = tokio::spawn(async move { + select! { + _ = sleep(Duration::from_millis(1500)).fuse() => { + Err(Error::with_msg("timeout")) + } + res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?) + } + }); + spawned.push((gh.clone(), task)); + } + #[derive(Serialize)] + struct Hres { + gh: GatherHost, + res: JsonValue, + } + #[derive(Serialize)] + struct Jres { + hosts: Vec, + } + let mut a = vec![]; + for tr in spawned { + let res = match tr.1.await { + Ok(k) => match k { + Ok(k) => k, + Err(e) => JsonValue::String(format!("ERROR({:?})", e)), + }, + Err(e) => JsonValue::String(format!("ERROR({:?})", e)), + }; + a.push(Hres { gh: tr.0, res }); + } + let res = response(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/json") + .body(serde_json::to_string(&Jres { hosts: a })?.into())?; + Ok(res) +} diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index c2ea70a..4ac3eb9 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -10,6 +10,7 @@ use http::{HeaderMap, Method, StatusCode}; use hyper::service::{make_service_fn, service_fn}; use hyper::{server::Server, Body, Request, Response}; use net::SocketAddr; +use netpod::log::*; use netpod::{ByteSize, Node, NodeConfigCached}; use panic::{AssertUnwindSafe, UnwindSafe}; use pin::Pin; @@ -17,8 +18,8 @@ use serde::{Deserialize, Serialize}; use std::{future, net, panic, pin, task}; use task::{Context, Poll}; use tracing::field::Empty; -#[allow(unused_imports)] -use tracing::{debug, error, info, span, trace, warn, Level}; + +pub mod gather; pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> { let node_config = node_config.clone();