From d67608fabc14c43feccec451605d9ced2ff5b697 Mon Sep 17 00:00:00 2001
From: Dominik Werder
Date: Wed, 2 Mar 2022 21:40:03 +0100
Subject: [PATCH] WIP Read3

---
 disk/Cargo.toml        |   1 +
 disk/src/disk.rs       | 130 +++++++++++++++++++++++++++++++++-
 disk/src/eventblobs.rs |  23 ++++++-
 disk/src/raw/conn.rs   |   4 ++
 disk/src/readat.rs     | 153 +++++++++++++++++++++++++++++++++++++++++
 err/src/lib.rs         |  19 +++
 httpret/src/api1.rs    |  94 +++++++++++++++----------
 httpret/src/httpret.rs |  42 ++++++++++-
 httpret/src/proxy.rs   |   8 +--
 9 files changed, 426 insertions(+), 48 deletions(-)
 create mode 100644 disk/src/readat.rs

diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 712a3d5..34493bb 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -17,6 +17,7 @@ tokio = { version = "1.11.0", features = ["rt-multi-thread", "io-util", "net", "
 tokio-stream = {version = "0.1.5", features = ["fs"]}
 hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
 async-channel = "1.6"
+crossbeam = "0.8"
 bytes = "1.0.1"
 crc32fast = "1.2.1"
 arrayref = "0.3.6"
diff --git a/disk/src/disk.rs b/disk/src/disk.rs
index 9569711..2510ef0 100644
--- a/disk/src/disk.rs
+++ b/disk/src/disk.rs
@@ -3,18 +3,22 @@ use bytes::{Bytes, BytesMut};
 use err::Error;
 use futures_core::Stream;
 use futures_util::future::FusedFuture;
-use futures_util::StreamExt;
+use futures_util::{FutureExt, StreamExt, TryFutureExt};
 use netpod::histo::HistoLog2;
 use netpod::{log::*, FileIoBufferSize};
 use netpod::{ChannelConfig, Node, Shape};
+use readat::ReadResult;
+use std::collections::VecDeque;
 use std::future::Future;
+use std::io::SeekFrom;
+use std::os::unix::prelude::AsRawFd;
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use std::time::{Duration, Instant};
 use std::{fmt, mem};
 use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncRead, ReadBuf};
+use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};

 pub mod agg;
 #[cfg(test)]
@@ -34,6 +38,7 @@ pub mod index;
 pub mod merge;
 pub mod paths;
 pub mod raw;
+pub mod readat;
 pub mod streamlog;

 // TODO transform this into a self-test or remove.
@@ -269,6 +274,127 @@ pub fn file_content_stream(
     FileContentStream::new(file, file_io_buffer_size)
 }

+enum FCS2 {
+    GetPosition,
+    Reading,
+}
+
+enum ReadStep {
+    Fut(Pin<Box<dyn Future<Output = Result<ReadResult, Error>> + Send>>),
+    Res(Result<ReadResult, Error>),
+}
+
+pub struct FileContentStream2 {
+    fcs2: FCS2,
+    file: Pin<Box<File>>,
+    file_pos: u64,
+    file_io_buffer_size: FileIoBufferSize,
+    get_position_fut: Pin<Box<dyn Future<Output = Result<u64, Error>> + Send>>,
+    reads: VecDeque<ReadStep>,
+    nlog: usize,
+    done: bool,
+    complete: bool,
+}
+
+impl FileContentStream2 {
+    pub fn new(file: File, file_io_buffer_size: FileIoBufferSize) -> Self {
+        let mut file = Box::pin(file);
+        let ffr = unsafe {
+            let ffr = Pin::get_unchecked_mut(file.as_mut());
+            std::mem::transmute::<&mut File, &'static mut File>(ffr)
+        };
+        let ff = ffr
+            .seek(SeekFrom::Current(0))
+            .map_err(|e| Error::with_msg_no_trace(format!("Seek error")));
+        Self {
+            fcs2: FCS2::GetPosition,
+            file,
+            file_pos: 0,
+            file_io_buffer_size,
+            get_position_fut: Box::pin(ff),
+            reads: VecDeque::new(),
+            nlog: 0,
+            done: false,
+            complete: false,
+        }
+    }
+}
+
+impl Stream for FileContentStream2 {
+    type Item = Result<FileChunkRead, Error>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        use Poll::*;
+        loop {
+            break if self.complete {
+                panic!("poll_next on complete")
+            } else if self.done {
+                self.complete = true;
+                Ready(None)
+            } else {
+                match self.fcs2 {
+                    FCS2::GetPosition => match self.get_position_fut.poll_unpin(cx) {
+                        Ready(Ok(k)) => {
+                            self.file_pos = k;
+                            continue;
+                        }
+                        Ready(Err(e)) => {
+                            self.done = true;
+                            Ready(Some(Err(e)))
+                        }
+                        Pending => Pending,
+                    },
+                    FCS2::Reading => {
+                        // TODO Keep the read queue full.
+                        // TODO Do not add more reads when EOF is encountered.
+                        while self.reads.len() < 4 {
+                            let count = self.file_io_buffer_size.bytes() as u64;
+                            let r3 = readat::Read3::get();
+                            let x = r3.read(self.file.as_raw_fd(), self.file_pos, count);
+                            self.reads.push_back(ReadStep::Fut(Box::pin(x)));
+                            self.file_pos += count;
+                        }
+                        // TODO must poll all futures to make progress... but if they resolve, must poll no more!
+                        // therefore, need some enum type for the pending futures list to also store the resolved ones.
+                        for e in &mut self.reads {
+                            match e {
+                                ReadStep::Fut(k) => match k.poll_unpin(cx) {
+                                    Ready(k) => {
+                                        *e = ReadStep::Res(k);
+                                    }
+                                    Pending => {}
+                                },
+                                ReadStep::Res(_k) => {}
+                            }
+                        }
+                        // TODO Check the front if something is ready.
+                        if let Some(ReadStep::Res(_)) = self.reads.front() {
+                            if let Some(ReadStep::Res(res)) = self.reads.pop_front() {
+                                // TODO check for error or return the read data.
+                                // TODO if read data contains EOF flag, raise EOF flag also in self,
+                                // and abort.
+                                // TODO make sure that everything runs stable even if this Stream is simply dropped
+                                // or read results are not waited for and channels or oneshots get dropped.
+                            } else {
+                                // TODO return error, this should never happen because we check before.
+                            }
+                        }
+                        // TODO handle case that self.reads is empty.
+                        todo!()
+                    }
+                }
+            };
+        }
+    }
+}
+
+pub fn file_content_stream_2(
+    file: File,
+    file_io_buffer_size: FileIoBufferSize,
+) -> impl Stream<Item = Result<FileChunkRead, Error>> + Send {
+    FileContentStream2::new(file, file_io_buffer_size)
+}
+
 pub struct NeedMinBuffer {
     inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
     need_min: u32,
diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs
index fd39e93..8523650 100644
--- a/disk/src/eventblobs.rs
+++ b/disk/src/eventblobs.rs
@@ -32,6 +32,7 @@ pub struct EventChunkerMultifile {
     expand: bool,
     do_decompress: bool,
     max_ts: u64,
+    emit_count: usize,
 }

 impl EventChunkerMultifile {
@@ -65,6 +66,7 @@ impl EventChunkerMultifile {
             expand,
             do_decompress,
             max_ts: 0,
+            emit_count: 0,
         }
     }
 }
@@ -106,10 +108,25 @@ impl Stream for EventChunkerMultifile {
                                 Ready(Some(Err(e)))
                             } else {
                                 self.max_ts = g;
-                                if true {
-                                    info!("EventChunkerMultifile emit {} events", h.tss.len());
+                                const EMIT_COUNT_MAX: usize = 10;
+                                if self.emit_count < EMIT_COUNT_MAX {
+                                    info!(
+                                        "EventChunkerMultifile emit {}/{} events {}",
+                                        self.emit_count,
+                                        EMIT_COUNT_MAX,
+                                        h.tss.len()
+                                    );
+                                    self.emit_count += 1;
+                                    Ready(Some(k))
+                                } else if (self.range.beg % 1000000) / 1000 == 666 {
+                                    // TODO move this test feature into some other query parameter.
+                                    warn!("GENERATE ERROR FOR TESTING PURPOSE");
+                                    let e = Error::with_msg(format!("Private-error-message"));
+                                    let e = e.add_public_msg(format!("Public-error-message"));
+                                    Ready(Some(Err(e)))
+                                } else {
+                                    Ready(Some(k))
                                 }
-                                Ready(Some(k))
                             }
                         } else {
                             Ready(Some(k))
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 98b820b..fb39e19 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -238,6 +238,10 @@ pub fn make_local_event_blobs_stream(
     file_io_buffer_size: FileIoBufferSize,
     node_config: &NodeConfigCached,
 ) -> Result<EventChunkerMultifile, Error> {
+    info!("make_local_event_blobs_stream do_decompress {do_decompress} file_io_buffer_size {file_io_buffer_size:?}");
+    if do_decompress {
+        warn!("Possible issue: decompress central storage event blob stream");
+    }
     let shape = match entry.to_shape() {
         Ok(k) => k,
         Err(e) => return Err(e)?,
diff --git a/disk/src/readat.rs b/disk/src/readat.rs
new file mode 100644
index 0000000..6570cc3
--- /dev/null
+++ b/disk/src/readat.rs
@@ -0,0 +1,153 @@
+use bytes::BytesMut;
+use err::Error;
+use netpod::log::*;
+use std::os::unix::prelude::RawFd;
+use std::sync::atomic::{AtomicPtr, Ordering};
+use std::sync::Once;
+use tokio::sync::{mpsc, oneshot};
+
+pub struct ReadTask {
+    fd: RawFd,
+    pos: u64,
+    count: u64,
+    rescell: oneshot::Sender<Result<ReadResult, Error>>,
+}
+
+pub struct ReadResult {
+    pub buf: BytesMut,
+    pub eof: bool,
+}
+
+/*
+Async code must be able to interact with the Read3 system via async methods.
+The async code must be able to enqueue a read in non-blocking fashion.
+Since the queue of pending read requests must be bounded, this must be able to async-block.
+*/
+pub struct Read3 {
+    jobs_tx: mpsc::Sender<ReadTask>,
+    rtx: crossbeam::channel::Sender<mpsc::Receiver<ReadTask>>,
+}
+
+impl Read3 {
+    pub fn get() -> &'static Self {
+        static INIT: Once = Once::new();
+        INIT.call_once(|| {
+            let (jtx, jrx) = mpsc::channel(32);
+            let (rtx, rrx) = crossbeam::channel::bounded(16);
+            let read3 = Read3 { jobs_tx: jtx, rtx };
+            let b = Box::new(read3);
+            let ptr = Box::into_raw(b);
+            READ3.store(ptr, Ordering::SeqCst);
+            let ptr = READ3.load(Ordering::SeqCst);
+            let h = unsafe { &*ptr };
+            if let Err(_) = h.rtx.send(jrx) {
+                error!("Read3 INIT: can not enqueue main job reader");
+            }
+            for _ in 0..2 {
+                let rrx = rrx.clone();
+                tokio::task::spawn_blocking(move || h.read_worker(rrx));
+            }
+        });
+        let ptr = READ3.load(Ordering::SeqCst);
+        unsafe { &*ptr }
+    }
+
+    pub async fn read(&self, fd: RawFd, pos: u64, count: u64) -> Result<ReadResult, Error> {
+        let (tx, rx) = oneshot::channel();
+        let rt = ReadTask {
+            fd,
+            pos,
+            count,
+            rescell: tx,
+        };
+        match self.jobs_tx.send(rt).await {
+            Ok(_) => match rx.await {
+                Ok(res) => res,
+                Err(e) => Err(Error::with_msg(format!("can not receive read task result: {e}"))),
+            },
+            Err(e) => Err(Error::with_msg(format!("can not send read job task: {e}"))),
+        }
+    }
+
+    fn read_worker(&self, rrx: crossbeam::channel::Receiver<mpsc::Receiver<ReadTask>>) {
+        'outer: loop {
+            match rrx.recv() {
+                Ok(mut jrx) => match jrx.blocking_recv() {
+                    Some(rt) => match self.rtx.send(jrx) {
+                        Ok(_) => {
+                            let mut buf = BytesMut::with_capacity(rt.count as usize);
+                            let mut writable = rt.count as usize;
+                            let rr = unsafe {
+                                loop {
+                                    info!("do pread fd {} count {} offset {}", rt.fd, writable, rt.pos);
+                                    let ec = libc::pread(rt.fd, buf.as_mut_ptr() as _, writable, rt.pos as i64);
+                                    if ec == -1 {
+                                        let errno = *libc::__errno_location();
+                                        if errno == libc::EINVAL {
+                                            info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos);
+                                            let rr = ReadResult { buf, eof: true };
+                                            break Ok(rr);
+                                        } else {
+                                            warn!(
+                                                "pread ERROR errno {} fd {} count {} offset {}",
+                                                errno, rt.fd, writable, rt.pos
+                                            );
+                                            // TODO use a more structured error
+                                            let e = Error::with_msg_no_trace(format!(
+                                                "pread ERROR errno {} fd {} count {} offset {}",
+                                                errno, rt.fd, writable, rt.pos
+                                            ));
+                                            break Err(e);
+                                        }
+                                    } else if ec == 0 {
+                                        info!("pread EOF fd {} count {} offset {}", rt.fd, writable, rt.pos);
+                                        let rr = ReadResult { buf, eof: true };
+                                        break Ok(rr);
+                                    } else if ec > 0 {
+                                        buf.set_len(ec as usize);
+                                        if ec as usize > writable {
+                                            error!(
+                                                "pread TOOLARGE ec {} fd {} count {} offset {}",
+                                                ec, rt.fd, writable, rt.pos
+                                            );
+                                            break 'outer;
+                                        }
+                                        writable -= ec as usize;
+                                        if writable == 0 {
+                                            let rr = ReadResult { buf, eof: false };
+                                            break Ok(rr);
+                                        }
+                                    } else {
+                                        error!(
+                                            "pread UNEXPECTED ec {} fd {} count {} offset {}",
+                                            ec, rt.fd, writable, rt.pos
+                                        );
+                                        break 'outer;
+                                    }
+                                }
+                            };
+                            match rt.rescell.send(rr) {
+                                Ok(_) => {}
+                                Err(_) => {
+                                    error!("can not publish the read result");
+                                    break 'outer;
+                                }
+                            }
+                        }
+                        Err(e) => {
+                            error!("can not return the job receiver: {e}");
+                            break 'outer;
+                        }
+                    },
+                    None => break 'outer,
+                },
+                Err(e) => {
+                    error!("read_worker sees: {e}");
+                    break 'outer;
+                }
+            }
+        }
+    }
+}
+
+static READ3: AtomicPtr<Read3> = AtomicPtr::new(std::ptr::null_mut());
diff --git a/err/src/lib.rs b/err/src/lib.rs
index 6c8f1d9..4a43ce0 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -364,6 +364,25 @@ impl PublicError {
     }
 }

+// TODO make this more useful
+impl From<Error> for PublicError {
+    fn from(k: Error) -> Self {
+        Self {
+            reason: k.reason(),
+            msg: k.msg().into(),
+        }
+    }
+}
+
+impl From<&Error> for PublicError {
+    fn from(k: &Error) -> Self {
+        Self {
+            reason: k.reason(),
+            msg: k.msg().into(),
+        }
+    }
+}
+
 pub fn todo() {
     todo!("TODO");
 }
diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs
index f40ba36..9675d19 100644
--- a/httpret/src/api1.rs
+++ b/httpret/src/api1.rs
@@ -464,31 +464,6 @@ async fn process_answer(res: Response<Body>) -> Result<JsonValue, Error> {
     }
 }

-pub async fn proxy_distribute_v1(req: Request<Body>) -> Result<Response<Body>, Error> {
-    let (mut sink, body) = Body::channel();
-    let uri = format!("http://sf-daqbuf-33:8371{}", req.uri().path());
-    let res = Response::builder().status(StatusCode::OK).body(body)?;
-    tokio::spawn(async move {
-        let req = Request::builder().method(Method::GET).uri(uri).body(Body::empty())?;
-        let res = Client::new().request(req).await?;
-        if res.status() == StatusCode::OK {
-            let (_heads, mut body) = res.into_parts();
-            loop {
-                use hyper::body::HttpBody;
-                let chunk = body.data().await;
-                if let Some(k) = chunk {
-                    let k = k?;
-                    sink.send_data(k).await?;
-                } else {
-                    break;
-                }
-            }
-        }
-        Ok::<_, Error>(())
-    });
-    Ok(res)
-}
-
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Api1Range {
     #[serde(rename = "startDate")]
@@ -696,12 +671,15 @@ impl Stream for DataApiPython3DataStream {
                             self.data_done = true;
                             let mut sb = crate::status_board().unwrap();
                             sb.add_error(&self.status_id, e);
-                            // TODO format as python data api error frame:
-                            //let mut buf = BytesMut::with_capacity(1024);
-                            //buf.put_slice("".as_bytes());
-                            //Ready(Some(Ok(buf)))
-                            self.data_done = true;
-                            Ready(None)
+                            if false {
+                                // TODO format as python data api error frame:
+                                let mut buf = BytesMut::with_capacity(1024);
+                                buf.put_slice("".as_bytes());
+                                Ready(Some(Ok(buf)))
+                            } else {
+                                self.data_done = true;
+                                Ready(None)
+                            }
                         }
                     },
                     None => {
@@ -740,7 +718,7 @@ impl Stream for DataApiPython3DataStream {
                            let perf_opts = PerfOpts { inmem_bufcap: 1024 * 4 };
                            // TODO is this a good to place decide this?
                            let s = if self.node_config.node_config.cluster.is_central_storage {
-                                debug!("Set up central storage stream");
+                                info!("Set up central storage stream");
                                // TODO pull up this config
                                let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
                                let s = disk::raw::conn::make_local_event_blobs_stream(
@@ -755,6 +733,11 @@ impl Stream for DataApiPython3DataStream {
                                )?;
                                Box::pin(s) as Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>
                            } else {
+                                if let Some(sh) = &entry.shape {
+                                    if sh.len() > 1 {
+                                        warn!("Remote stream fetch for shape {sh:?}");
+                                    }
+                                }
                                debug!("Set up merged remote stream");
                                let s = disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes::new(
                                    evq,
@@ -859,13 +842,13 @@ impl Api1EventsBinaryHandler {
         if req.method() != Method::POST {
             return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
         }
-        let accept = req
-            .headers()
+        let (head, body) = req.into_parts();
+        let accept = head
+            .headers
             .get(http::header::ACCEPT)
             .map_or(Ok(ACCEPT_ALL), |k| k.to_str())
            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
            .to_owned();
-        let (_head, body) = req.into_parts();
         let body_data = hyper::body::to_bytes(body).await?;
         let qu: Api1Query = if let Ok(qu) = serde_json::from_slice(&body_data) {
             qu
@@ -926,3 +909,44 @@ impl Api1EventsBinaryHandler {
         Ok(ret)
     }
 }
+
+pub struct RequestStatusHandler {}
+
+impl RequestStatusHandler {
+    pub fn path_prefix() -> &'static str {
+        "/api/1/requestStatus/"
+    }
+
+    pub fn handler(req: &Request<Body>) -> Option<Self> {
+        if req.uri().path().starts_with(Self::path_prefix()) {
+            Some(Self {})
+        } else {
+            None
+        }
+    }
+
+    pub async fn handle(&self, req: Request<Body>, _node_config: &NodeConfigCached) -> Result<Response<Body>, Error> {
+        let (head, body) = req.into_parts();
+        if head.method != Method::GET {
+            return Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?);
+        }
+        let accept = head
+            .headers
+            .get(http::header::ACCEPT)
+            .map_or(Ok(ACCEPT_ALL), |k| k.to_str())
+            .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?
+            .to_owned();
+        if accept != APP_JSON && accept != ACCEPT_ALL {
+            // TODO set the public error code and message and return Err(e).
+            let e = Error::with_public_msg(format!("Unsupported Accept: {:?}", accept));
+            error!("{e:?}");
+            return Ok(response(StatusCode::NOT_ACCEPTABLE).body(Body::empty())?);
+        }
+        let _body_data = hyper::body::to_bytes(body).await?;
+        let status_id = &head.uri.path()[Self::path_prefix().len()..];
+        info!("RequestStatusHandler status_id {:?}", status_id);
+        let s = crate::status_board()?.status_as_json(status_id);
+        let ret = response(StatusCode::OK).body(Body::from(s))?;
+        Ok(ret)
+    }
+}
diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs
index 88cb295..2e3dd57 100644
--- a/httpret/src/httpret.rs
+++ b/httpret/src/httpret.rs
@@ -211,6 +211,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
+    } else if let Some(h) = StatusBoardAllHandler::handler(&req) {
+        h.handle(req, &node_config).await
     } else if path == "/api/4/search/channel" {
         if req.method() == Method::GET {
             Ok(search::channel_search(req, &node_config).await?)
@@ -331,8 +333,8 @@ async fn http_service_try(req: Request<Body>, node_config: &NodeConfigCached) ->
         h.handle(req, &node_config).await
     } else if let Some(h) = channelarchiver::BlockStream::handler(path) {
         h.handle(req, &node_config).await
-    } else if path.starts_with("/api/1/requestStatus/") {
-        Ok(response(StatusCode::OK).body(Body::from("{}"))?)
+    } else if let Some(h) = api1::RequestStatusHandler::handler(&req) {
+        h.handle(req, &node_config).await
     } else if path.starts_with("/api/1/documentation/") {
         if req.method() == Method::GET {
             api_1_docs(path)
@@ -766,8 +768,11 @@ pub struct StatusBoardEntry {
     ts_created: SystemTime,
     #[serde(serialize_with = "instant_serde::ser")]
     ts_updated: SystemTime,
+    #[serde(skip_serializing_if = "items::bool_is_false")]
     is_error: bool,
+    #[serde(skip_serializing_if = "items::bool_is_false")]
     is_ok: bool,
+    #[serde(skip_serializing_if = "Vec::is_empty")]
     errors: Vec,
 }

@@ -777,7 +782,7 @@ mod instant_serde {
     pub fn ser<S: Serializer>(x: &SystemTime, ser: S) -> Result<S::Ok, S::Error> {
         let dur = x.duration_since(std::time::UNIX_EPOCH).unwrap();
         let dt = chrono::TimeZone::timestamp(&chrono::Utc, dur.as_secs() as i64, dur.subsec_nanos());
-        let s = dt.format("%Y-%m-%d %H:%M:%S").to_string();
+        let s = dt.format("%Y-%m-%dT%H:%M:%S%.3f").to_string();
         ser.serialize_str(&s)
     }
 }
@@ -815,6 +820,7 @@ impl StatusBoard {
         f.read_exact(&mut buf).unwrap();
         let n = u64::from_le_bytes(buf);
         let s = format!("{:016x}", n);
+        info!("new_status_id {s}");
         self.entries.insert(s.clone(), StatusBoardEntry::new());
         s
     }
@@ -868,6 +874,36 @@ impl StatusBoard {
             }
         }
     }
+
+    pub fn status_as_json(&self, status_id: &str) -> String {
+        #[derive(Serialize)]
+        struct StatJs {
+            #[serde(skip_serializing_if = "Vec::is_empty")]
+            errors: Vec<::err::PublicError>,
+        }
+        match self.entries.get(status_id) {
+            Some(e) => {
+                if e.is_ok {
+                    let js = StatJs { errors: vec![] };
+                    return serde_json::to_string(&js).unwrap();
+                } else if e.is_error {
+                    let errors = e.errors.iter().map(|e| (&e.0).into()).collect();
+                    let js = StatJs { errors };
+                    return serde_json::to_string(&js).unwrap();
+                } else {
+                    warn!("requestStatus for unfinished {status_id}");
+                    let js = StatJs { errors: vec![] };
+                    return serde_json::to_string(&js).unwrap();
+                }
+            }
+            None => {
+                error!("can not find status id {}", status_id);
+                let e = ::err::Error::with_public_msg_no_trace(format!("Request status ID unknown {status_id}"));
+                let js = StatJs { errors: vec![e.into()] };
+                return serde_json::to_string(&js).unwrap();
+            }
+        }
+    }
 }

 static STATUS_BOARD: AtomicPtr<RwLock<StatusBoard>> = AtomicPtr::new(std::ptr::null_mut());
diff --git a/httpret/src/proxy.rs b/httpret/src/proxy.rs
index 273e099..602906b 100644
--- a/httpret/src/proxy.rs
+++ b/httpret/src/proxy.rs
@@ -1,6 +1,6 @@
 pub mod api4;

-use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1, proxy_distribute_v1};
+use crate::api1::{channel_search_configs_v1, channel_search_list_v1, gather_json_2_v1};
 use crate::err::Error;
 use crate::gather::{gather_get_json_generic, SubRes};
 use crate::pulsemap::MapPulseQuery;
@@ -13,11 +13,11 @@ use hyper::service::{make_service_fn, service_fn};
 use hyper::{Body, Request, Response, Server};
 use hyper_tls::HttpsConnector;
 use itertools::Itertools;
+use netpod::log::*;
 use netpod::query::BinnedQuery;
-use netpod::{log::*, ACCEPT_ALL};
 use netpod::{
     AppendToUrl, ChannelConfigQuery, ChannelSearchQuery, ChannelSearchResult, ChannelSearchSingleResult, FromUrl,
-    HasBackend, HasTimeout, ProxyConfig, APP_JSON,
+    HasBackend, HasTimeout, ProxyConfig, ACCEPT_ALL, APP_JSON,
 };
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
@@ -115,8 +115,6 @@ async fn proxy_http_service_try(req: Request<Body>, proxy_config: &ProxyConfig)
         Ok(proxy_single_backend_query::(req, proxy_config).await?)
     } else if path == "/api/4/channel/config" {
         Ok(proxy_single_backend_query::<ChannelConfigQuery>(req, proxy_config).await?)
-    } else if path.starts_with("/distribute") {
-        proxy_distribute_v1(req).await
     } else if path.starts_with("/api/1/documentation/") {
         if req.method() == Method::GET {
             api_1_docs(path)
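
Usage note (not part of the diff): FileContentStream2 is still a stub in this patch (its Reading arm ends in todo!() and the GetPosition arm never switches fcs2 to Reading), so the new Read3 facility in disk/src/readat.rs is easiest to exercise directly. The sketch below is illustrative only; the function name, path argument and 8 KiB request size are made up, while Read3::get, Read3::read and ReadResult are the types added by this patch. It assumes a tokio runtime, since Read3::get spawns its blocking workers via tokio::task::spawn_blocking.

// Illustrative sketch, not part of the patch: drive Read3 from async code.
use disk::readat::Read3;
use err::Error;
use std::os::unix::prelude::AsRawFd;
use tokio::fs::File;

// Hypothetical helper: read a whole file through the Read3 pread workers.
async fn read_whole_file(path: &str) -> Result<Vec<u8>, Error> {
    // The File must stay alive while reads on its raw fd are in flight.
    let file = File::open(path)
        .await
        .map_err(|e| Error::with_msg(format!("open {path}: {e}")))?;
    let fd = file.as_raw_fd();
    let r3 = Read3::get();
    let mut out = Vec::new();
    let mut pos = 0;
    let count = 1024 * 8;
    loop {
        // Enqueue one positioned read; the returned future resolves once a
        // blocking worker has finished the pread and sent the result back
        // over the oneshot channel stored in the ReadTask.
        let res = r3.read(fd, pos, count).await?;
        pos += res.buf.len() as u64;
        out.extend_from_slice(&res.buf);
        if res.eof {
            break;
        }
    }
    Ok(out)
}

The queue-of-receivers scheme in read_worker (a worker takes the shared mpsc::Receiver over the crossbeam channel, pulls one ReadTask, hands the receiver back, then performs the blocking pread) is what lets a small pool of spawn_blocking threads serve a single bounded submission queue.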