Do not use files where no matching events have been found
@@ -78,6 +78,7 @@ async fn open_files_inner(
     debug!("opening path {:?}", &path);
     let mut file = OpenOptions::new().read(true).open(&path).await?;
     debug!("opened file {:?} {:?}", &path, &file);
+    let mut use_file = false;
     {
         let index_path = paths::index_path(ts_bin, &channel_config, &node)?;
         match OpenOptions::new().read(true).open(&index_path).await {
@@ -112,23 +113,32 @@ async fn open_files_inner(
                 Some(o) => {
                     debug!("FOUND ts IN INDEX: {:?}", o);
                     file.seek(SeekFrom::Start(o.1)).await?;
+                    use_file = true;
                 }
                 None => {
                     debug!("NOT FOUND IN INDEX");
                     file.seek(SeekFrom::End(0)).await?;
+                    use_file = false;
                 }
             }
         }
         Err(e) => match e.kind() {
             ErrorKind::NotFound => {
-                file = super::index::position_file(file, range.beg).await?;
+                let res = super::index::position_static_len_datafile(file, range.beg).await?;
+                file = res.0;
+                if res.1 {
+                    use_file = true;
+                } else {
+                    use_file = false;
+                }
             }
             _ => Err(e)?,
         },
     }
     }
-    let ret = OpenedFile { file, path };
-    chtx.send(Ok(ret)).await?;
+    if use_file {
+        let ret = OpenedFile { file, path };
+        chtx.send(Ok(ret)).await?;
+    }
 }
 // TODO keep track of number of running
 debug!("open_files_inner done");

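The hunk above opens a file, tries to position it at the start of the requested range, and only hands it to the channel when matching events were actually found. A minimal self-contained sketch of that gating, with simplified synchronous stand-ins for OpenedFile and position_static_len_datafile (the positioning result is stubbed, and Cargo.toml is just a placeholder path):

    use std::fs::File;
    use std::io;
    use std::path::PathBuf;

    struct OpenedFile {
        file: File,
        path: PathBuf,
    }

    // Stand-in for position_static_len_datafile: returns the (possibly seeked)
    // file plus a flag saying whether any event at or after `beg` exists.
    fn position(file: File, _beg: u64) -> io::Result<(File, bool)> {
        Ok((file, false)) // stubbed: the real code bisects fixed-length events
    }

    fn open_gated(path: PathBuf, beg: u64, out: &mut Vec<OpenedFile>) -> io::Result<()> {
        let file = File::open(&path)?;
        let (file, use_file) = position(file, beg)?;
        if use_file {
            // The consumer only ever sees files that contain matching events.
            out.push(OpenedFile { file, path });
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        let mut out = Vec::new();
        open_gated(PathBuf::from("Cargo.toml"), 0, &mut out)?;
        // With the stub above nothing matches, so nothing is forwarded.
        assert!(out.is_empty());
        Ok(())
    }
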
@@ -6,6 +6,8 @@ use futures_core::Stream;
 use futures_util::StreamExt;
 use netpod::{ChannelConfig, NanoRange, Node};
 use std::pin::Pin;
+use std::sync::atomic::AtomicU64;
+use std::sync::Arc;
 use std::task::{Context, Poll};

 pub struct EventBlobsComplete {
@@ -17,6 +19,7 @@ pub struct EventBlobsComplete {
     range: NanoRange,
     errored: bool,
     completed: bool,
+    max_ts: Arc<AtomicU64>,
 }

 impl EventBlobsComplete {
@@ -36,6 +39,7 @@ impl EventBlobsComplete {
             range,
             errored: false,
             completed: false,
+            max_ts: Arc::new(AtomicU64::new(0)),
         }
     }
 }
@@ -72,6 +76,7 @@ impl Stream for EventBlobsComplete {
                     self.range.clone(),
                     self.event_chunker_conf.clone(),
                     path,
+                    self.max_ts.clone(),
                 );
                 self.evs = Some(chunker);
                 continue 'outer;

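EventBlobsComplete owns the Arc<AtomicU64> and hands a clone to every EventChunker it creates, so the max-timestamp watermark survives across file boundaries. A small sketch of that sharing, with plain loops standing in for the stream machinery:

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    fn main() {
        // One watermark for the whole channel, like EventBlobsComplete::max_ts.
        let max_ts = Arc::new(AtomicU64::new(0));
        for file_events in [[10u64, 20], [30, 40]] {
            // Each per-file "chunker" gets a clone of the same counter.
            let watermark = max_ts.clone();
            for ts in file_events {
                let prev = watermark.load(Ordering::SeqCst);
                assert!(ts >= prev, "unordered event across file boundary");
                watermark.store(ts, Ordering::SeqCst);
            }
        }
    }
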
@@ -9,6 +9,8 @@ use netpod::timeunits::SEC;
 use netpod::{ByteSize, ChannelConfig, EventDataReadStats, NanoRange, ScalarType, Shape};
 use std::path::PathBuf;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
 use std::task::{Context, Poll};

 pub struct EventChunker {
@@ -26,6 +28,7 @@ pub struct EventChunker {
     final_stats_sent: bool,
     parsed_bytes: u64,
     path: PathBuf,
+    max_ts: Arc<AtomicU64>,
 }

 enum DataFileState {
@@ -56,6 +59,7 @@ impl EventChunker {
         range: NanoRange,
         stats_conf: EventChunkerConf,
         path: PathBuf,
+        max_ts: Arc<AtomicU64>,
     ) -> Self {
         let mut inp = NeedMinBuffer::new(inp);
         inp.set_need_min(6);
@@ -74,6 +78,7 @@ impl EventChunker {
             final_stats_sent: false,
             parsed_bytes: 0,
             path,
+            max_ts,
         }
     }

@@ -83,8 +88,9 @@ impl EventChunker {
         range: NanoRange,
         stats_conf: EventChunkerConf,
         path: PathBuf,
+        max_ts: Arc<AtomicU64>,
     ) -> Self {
-        let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path);
+        let mut ret = Self::from_start(inp, channel_config, range, stats_conf, path, max_ts);
         ret.state = DataFileState::Event;
         ret.need_min = 4;
         ret.inp.set_need_min(4);
@@ -151,6 +157,19 @@ impl EventChunker {
         let _ttl = sl.read_i64::<BE>().unwrap();
         let ts = sl.read_i64::<BE>().unwrap() as u64;
         let pulse = sl.read_i64::<BE>().unwrap() as u64;
+        let max_ts = self.max_ts.load(Ordering::SeqCst);
+        if ts < max_ts {
+            Err(Error::with_msg(format!(
+                "unordered event ts: {}.{} max_ts {}.{} config {:?} path {:?}",
+                ts / SEC,
+                ts % SEC,
+                max_ts / SEC,
+                max_ts % SEC,
+                self.channel_config.shape,
+                self.path
+            )))?;
+        }
+        self.max_ts.store(ts, Ordering::SeqCst);
         if ts >= self.range.end {
             self.seen_beyond_range = true;
             self.data_emit_complete = true;

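The error message renders the nanosecond timestamps as seconds plus a nanosecond remainder via SEC from netpod::timeunits. A sketch of that split, assuming SEC is the number of nanoseconds per second (1_000_000_000):

    const SEC: u64 = 1_000_000_000; // assumed value of netpod::timeunits::SEC

    fn fmt_ts(ts: u64) -> String {
        // Zero-padding the remainder is a readability tweak; the diff above
        // prints the raw remainder with plain {}.
        format!("{}.{:09}", ts / SEC, ts % SEC)
    }

    fn main() {
        assert_eq!(fmt_ts(1_500_000_000), "1.500000000");
    }
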
@@ -140,19 +140,11 @@ pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Er
     Ok(ev)
 }

-pub async fn position_file(mut file: File, beg: u64) -> Result<File, Error> {
-    // Read first chunk which should include channel name packet, and a first event.
-    // It can be that file is empty.
-    // It can be that there is a channel header but zero events.
+pub async fn position_static_len_datafile(mut file: File, beg: u64) -> Result<(File, bool), Error> {
     let flen = file.seek(SeekFrom::End(0)).await?;
     file.seek(SeekFrom::Start(0)).await?;
     let mut buf = vec![0; 1024];
-    let n1 = read(&mut buf, &mut file).await?;
-    if n1 < buf.len() {
-        // file has less content than our buffer
-    } else {
-        //
-    }
+    let _n1 = read(&mut buf, &mut file).await?;
     let hres = parse_channel_header(&buf)?;
     let headoff = 2 + hres.0 as u64;
     let ev = parse_event(&buf[headoff as usize..])?;
@@ -160,22 +152,36 @@ pub async fn position_file(mut file: File, beg: u64) -> Result<File, Error> {
     let mut j = headoff;
     let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff;
     let x = ev.1.ns;
-    let y = read_event_at(k, &mut file).await?.1.ns;
+    let t = read_event_at(k, &mut file).await?;
+    if t.0 != evlen as u32 {
+        Err(Error::with_msg(format!(
+            "inconsistent event lengths: {} vs {}",
+            t.0, evlen
+        )))?;
+    }
+    let y = t.1.ns;
     if x >= beg {
         file.seek(SeekFrom::Start(j)).await?;
-        return Ok(file);
+        return Ok((file, true));
     }
     if y < beg {
         file.seek(SeekFrom::Start(j)).await?;
-        return Ok(file);
+        return Ok((file, false));
     }
     loop {
         if k - j < 2 * evlen {
             file.seek(SeekFrom::Start(k)).await?;
-            return Ok(file);
+            return Ok((file, true));
         }
         let m = j + (k - j) / 2 / evlen * evlen;
-        let x = read_event_at(m, &mut file).await?.1.ns;
+        let t = read_event_at(m, &mut file).await?;
+        if t.0 != evlen as u32 {
+            Err(Error::with_msg(format!(
+                "inconsistent event lengths: {} vs {}",
+                t.0, evlen
+            )))?;
+        }
+        let x = t.1.ns;
         if x < beg {
             j = m;
         } else {

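position_static_len_datafile bisects over fixed-length, timestamp-ordered events: j stays strictly below the target, k stays at or above it, and the bool in the return value tells the caller whether the file holds any event at or after beg at all. The same loop on an in-memory slice of timestamps, assuming the invariants the file format guarantees (sorted timestamps, constant record length):

    // Returns (index of first event with ts >= beg, found-anything flag).
    fn position_static_len(events: &[u64], beg: u64) -> (usize, bool) {
        if events.is_empty() {
            return (0, false);
        }
        if events[0] >= beg {
            return (0, true); // every event qualifies
        }
        if events[events.len() - 1] < beg {
            return (0, false); // nothing in range: caller must not use the file
        }
        let (mut j, mut k) = (0, events.len() - 1);
        while k - j >= 2 {
            let m = j + (k - j) / 2;
            if events[m] < beg {
                j = m;
            } else {
                k = m;
            }
        }
        (k, true)
    }

    fn main() {
        let evs = [10, 20, 30, 40, 50];
        assert_eq!(position_static_len(&evs, 35), (3, true));
        assert_eq!(position_static_len(&evs, 60), (0, false));
    }
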
@@ -348,7 +348,8 @@ pub fn parsed1(
         Ok(file) => {
             let inp = Box::pin(file_content_stream(file.file, query.buffer_size as usize));
             let range = err::todoval();
-            let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path);
+            let max_ts = err::todoval();
+            let mut chunker = eventchunker::EventChunker::from_event_boundary(inp, err::todoval(), range, stats_conf.clone(), file.path, max_ts);
             while let Some(evres) = chunker.next().await {
                 use eventchunker::EventChunkerItem;
                 match evres {

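The err::todoval() placeholders here remain to be filled in; from_event_boundary now also needs the shared watermark. A call with a real value would look roughly like this (a sketch only, with channel_config and the other bindings as in the surrounding function):

    // let max_ts = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
    // let mut chunker = eventchunker::EventChunker::from_event_boundary(
    //     inp, channel_config, range, stats_conf.clone(), file.path, max_ts,
    // );
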
httpret/src/gather.rs (new file, 101 lines)
@@ -0,0 +1,101 @@
use crate::response;
use err::Error;
use http::{Method, StatusCode};
use hyper::{Body, Client, Request, Response};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;

#[derive(Clone, Serialize, Deserialize)]
struct GatherFrom {
    hosts: Vec<GatherHost>,
}

#[derive(Clone, Serialize, Deserialize)]
struct GatherHost {
    host: String,
    port: u16,
    inst: String,
}

async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
    let (pre, mut body) = res.into_parts();
    if pre.status != StatusCode::OK {
        use hyper::body::HttpBody;
        if let Some(c) = body.data().await {
            let c: bytes::Bytes = c?;
            let s1 = String::from_utf8(c.to_vec())?;
            Ok(JsonValue::String(format!(
                "status {} body {}",
                pre.status.as_str(),
                s1
            )))
        } else {
            Ok(JsonValue::String(format!("status {}", pre.status.as_str())))
        }
    } else {
        let body: hyper::Body = body;
        let body_all = hyper::body::to_bytes(body).await?;
        let val = match serde_json::from_slice(&body_all) {
            Ok(k) => k,
            Err(_e) => JsonValue::String(String::from_utf8(body_all.to_vec())?),
        };
        Ok::<_, Error>(val)
    }
}
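process_answer degrades gracefully: a non-OK status or a non-JSON body still becomes a JSON string, so one bad host cannot make the aggregated response unparseable. The fallback in isolation:

    use serde_json::Value as JsonValue;

    // Try JSON first; otherwise surface the body verbatim as a JSON string.
    fn answer_to_json(body: &[u8]) -> JsonValue {
        match serde_json::from_slice(body) {
            Ok(v) => v,
            Err(_) => JsonValue::String(String::from_utf8_lossy(body).into_owned()),
        }
    }

    fn main() {
        assert_eq!(answer_to_json(b"{\"ok\":true}")["ok"], true);
        assert_eq!(answer_to_json(b"not json"), JsonValue::String("not json".into()));
    }
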

pub async fn gather_json(req: Request<Body>, pathpre: &str) -> Result<Response<Body>, Error> {
    let (part_head, part_body) = req.into_parts();
    let bodyslice = hyper::body::to_bytes(part_body).await?;
    let gather_from: GatherFrom = serde_json::from_slice(&bodyslice)?;
    let mut spawned = vec![];
    let uri = part_head.uri;
    let path_post = &uri.path()[pathpre.len()..];
    for gh in gather_from.hosts {
        let uri = format!("http://{}:{}/{}", gh.host, gh.port, path_post);
        let req = Request::builder().method(Method::GET).uri(uri);
        let req = if gh.inst.len() > 0 {
            req.header("retrieval_instance", &gh.inst)
        } else {
            req
        };
        let req = req.header(http::header::ACCEPT, "application/json");
        let req = req.body(Body::empty());
        use futures_util::select;
        use futures_util::FutureExt;
        use std::time::Duration;
        use tokio::time::sleep;
        let task = tokio::spawn(async move {
            select! {
                _ = sleep(Duration::from_millis(1500)).fuse() => {
                    Err(Error::with_msg("timeout"))
                }
                res = Client::new().request(req?).fuse() => Ok(process_answer(res?).await?)
            }
        });
        spawned.push((gh.clone(), task));
    }
    #[derive(Serialize)]
    struct Hres {
        gh: GatherHost,
        res: JsonValue,
    }
    #[derive(Serialize)]
    struct Jres {
        hosts: Vec<Hres>,
    }
    let mut a = vec![];
    for tr in spawned {
        let res = match tr.1.await {
            Ok(k) => match k {
                Ok(k) => k,
                Err(e) => JsonValue::String(format!("ERROR({:?})", e)),
            },
            Err(e) => JsonValue::String(format!("ERROR({:?})", e)),
        };
        a.push(Hres { gh: tr.0, res });
    }
    let res = response(StatusCode::OK)
        .header(http::header::CONTENT_TYPE, "application/json")
        .body(serde_json::to_string(&Jres { hosts: a })?.into())?;
    Ok(res)
}
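gather_json expects a POST body matching the private GatherFrom/GatherHost structs and fans the sub-request out to every listed host with a 1500 ms per-host timeout. A sketch of building such a body (the structs are mirrored here because the originals are private; host and port are placeholders):

    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    struct GatherFrom {
        hosts: Vec<GatherHost>,
    }

    #[derive(Serialize, Deserialize)]
    struct GatherHost {
        host: String,
        port: u16,
        inst: String,
    }

    fn main() {
        let body = GatherFrom {
            hosts: vec![GatherHost {
                host: "data-node-01".into(), // placeholder host
                port: 8080,                  // placeholder port
                inst: String::new(),         // empty: no retrieval_instance header
            }],
        };
        println!("{}", serde_json::to_string_pretty(&body).unwrap());
    }
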
@@ -10,6 +10,7 @@ use http::{HeaderMap, Method, StatusCode};
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{server::Server, Body, Request, Response};
 use net::SocketAddr;
 use netpod::log::*;
 use netpod::{ByteSize, Node, NodeConfigCached};
 use panic::{AssertUnwindSafe, UnwindSafe};
 use pin::Pin;
@@ -17,8 +18,8 @@ use serde::{Deserialize, Serialize};
 use std::{future, net, panic, pin, task};
 use task::{Context, Poll};
 use tracing::field::Empty;
 #[allow(unused_imports)]
 use tracing::{debug, error, info, span, trace, warn, Level};

+pub mod gather;

 pub async fn host(node_config: NodeConfigCached) -> Result<(), Error> {
     let node_config = node_config.clone();
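The diff adds pub mod gather but does not show the dispatch change, so presumably the HTTP handler routes some path prefix to gather_json. A hypothetical routing arm (the prefix is invented for illustration):

    // if req.uri().path().starts_with("/api/1/gather/") {
    //     return gather::gather_json(req, "/api/1/gather/").await;
    // }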