diff --git a/crates/disk/src/cache.rs b/crates/disk/src/cache.rs
deleted file mode 100644
index 1461c42..0000000
--- a/crates/disk/src/cache.rs
+++ /dev/null
@@ -1,243 +0,0 @@
-use chrono::Utc;
-use err::Error;
-use netpod::log::*;
-use netpod::AggKind;
-use netpod::Cluster;
-use netpod::NodeConfigCached;
-use netpod::PreBinnedPatchCoordEnum;
-use netpod::SfDbChannel;
-use serde::Deserialize;
-use serde::Serialize;
-use std::collections::VecDeque;
-use std::io;
-use std::path::PathBuf;
-use std::time::Duration;
-use std::time::Instant;
-use taskrun::tokio;
-use tiny_keccak::Hasher;
-
-// For file-based caching, this determined the node where the cache file is located.
-// No longer needed for scylla-based caching.
-pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoordEnum, channel: &SfDbChannel, cluster: &Cluster) -> u32 {
-    let mut hash = tiny_keccak::Sha3::v256();
-    hash.update(channel.backend().as_bytes());
-    hash.update(channel.name().as_bytes());
-    /*hash.update(&patch_coord.patch_beg().to_le_bytes());
-    hash.update(&patch_coord.patch_end().to_le_bytes());
-    hash.update(&patch_coord.bin_t_len().to_le_bytes());
-    hash.update(&patch_coord.patch_t_len().to_le_bytes());*/
-    let mut out = [0; 32];
-    hash.finalize(&mut out);
-    let a = [out[0], out[1], out[2], out[3]];
-    let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32;
-    ix
-}
-
-#[derive(Clone, Debug, Serialize, Deserialize)]
-pub struct CacheFileDesc {
-    // What identifies a cached file?
-    channel: SfDbChannel,
-    patch: PreBinnedPatchCoordEnum,
-    agg_kind: AggKind,
-}
-
-impl CacheFileDesc {
-    pub fn new(channel: SfDbChannel, patch: PreBinnedPatchCoordEnum, agg_kind: AggKind) -> Self {
-        Self {
-            channel,
-            patch,
-            agg_kind,
-        }
-    }
-
-    pub fn hash(&self) -> String {
-        let mut h = tiny_keccak::Sha3::v256();
-        h.update(b"V000");
-        h.update(self.channel.backend().as_bytes());
-        h.update(self.channel.name().as_bytes());
-        h.update(format!("{}", self.agg_kind).as_bytes());
-        //h.update(&self.patch.spec().bin_t_len().to_le_bytes());
-        //h.update(&self.patch.spec().patch_t_len().to_le_bytes());
-        //h.update(&self.patch.ix().to_le_bytes());
-        let mut buf = [0; 32];
-        h.finalize(&mut buf);
-        hex::encode(&buf)
-    }
-
-    pub fn hash_channel(&self) -> String {
-        let mut h = tiny_keccak::Sha3::v256();
-        h.update(b"V000");
-        h.update(self.channel.backend().as_bytes());
-        h.update(self.channel.name().as_bytes());
-        let mut buf = [0; 32];
-        h.finalize(&mut buf);
-        hex::encode(&buf)
-    }
-
-    pub fn path(&self, node_config: &NodeConfigCached) -> PathBuf {
-        let hash = self.hash();
-        let hc = self.hash_channel();
-        PathBuf::from(format!("{}", err::todoval::()))
-            .join("cache")
-            .join(&hc[0..3])
-            .join(&hc[3..6])
-            .join(self.channel.name())
-            .join(format!("{}", self.agg_kind))
-        /*.join(format!(
-            "{:010}-{:010}",
-            self.patch.spec().bin_t_len() / SEC,
-            self.patch.spec().patch_t_len() / SEC
-        ))
-        .join(format!("{}-{:012}", &hash[0..6], self.patch.ix()))*/
-    }
-}
-
-pub struct WrittenPbCache {
-    pub bytes: u64,
-    pub duration: Duration,
-}
-
-// TODO only used for old archiver
-pub async fn write_pb_cache_min_max_avg_scalar<T>(
-    values: T,
-    patch: PreBinnedPatchCoordEnum,
-    agg_kind: AggKind,
-    channel: SfDbChannel,
-    node_config: NodeConfigCached,
-) -> Result<WrittenPbCache, Error>
-where
-    T: Serialize,
-{
-    let cfd = CacheFileDesc {
-        channel: channel.clone(),
-        patch: patch.clone(),
-        agg_kind: agg_kind.clone(),
-    };
-    let path = cfd.path(&node_config);
-    let enc = serde_cbor::to_vec(&values)?;
-    let ts1 = Instant::now();
-    tokio::fs::create_dir_all(path.parent().unwrap()).await.map_err(|e| {
-        error!("can not create cache directory {:?}", path.parent());
-        e
-    })?;
-    let now = Utc::now();
-    let mut h = crc32fast::Hasher::new();
-    h.update(&now.timestamp_nanos().to_le_bytes());
-    let r = h.finalize();
-    let tmp_path =
-        path.parent()
-            .unwrap()
-            .join(format!("{}.tmp.{:08x}", path.file_name().unwrap().to_str().unwrap(), r));
-    let res = tokio::task::spawn_blocking({
-        let tmp_path = tmp_path.clone();
-        move || {
-            use fs2::FileExt;
-            use io::Write;
-            info!("try to write tmp at {:?}", tmp_path);
-            let mut f = std::fs::OpenOptions::new()
-                .create_new(true)
-                .write(true)
-                .open(&tmp_path)?;
-            if false {
-                f.lock_exclusive()?;
-            }
-            f.write_all(&enc)?;
-            if false {
-                f.unlock()?;
-            }
-            f.flush()?;
-            Ok::<_, Error>(enc.len())
-        }
-    })
-    .await
-    .map_err(Error::from_string)??;
-    tokio::fs::rename(&tmp_path, &path).await?;
-    let ts2 = Instant::now();
-    let ret = WrittenPbCache {
-        bytes: res as u64,
-        duration: ts2.duration_since(ts1),
-    };
-    Ok(ret)
-}
-
-#[derive(Serialize)]
-pub struct ClearCacheAllResult {
-    pub log: Vec<String>,
-}
-
-pub async fn clear_cache_all(node_config: &NodeConfigCached, dry: bool) -> Result<ClearCacheAllResult, Error> {
-    let mut log = vec![];
-    log.push(format!("begin at {:?}", chrono::Utc::now()));
-    if dry {
-        log.push(format!("dry run"));
-    }
-    let mut dirs = VecDeque::new();
-    let mut stack = VecDeque::new();
-    // stack.push_front(node_config.node.cache_base_path.join("cache"));
-    stack.push_front(PathBuf::from("UNDEFINED/NOTHING/REMOVE/ME").join("cache"));
-    loop {
-        match stack.pop_front() {
-            Some(path) => {
-                info!("clear_cache_all try read dir {:?}", path);
-                let mut rd = tokio::fs::read_dir(path).await?;
-                while let Some(entry) = rd.next_entry().await? {
-                    let path = entry.path();
-                    match path.to_str() {
-                        Some(_pathstr) => {
-                            let meta = path.symlink_metadata()?;
-                            //log.push(format!("len {:7} pathstr {}", meta.len(), pathstr,));
-                            let filename_str = path.file_name().unwrap().to_str().unwrap();
-                            if filename_str.ends_with("..") || filename_str.ends_with(".") {
-                                log.push(format!("ERROR encountered . or .."));
-                            } else {
-                                if meta.is_dir() {
-                                    stack.push_front(path.clone());
-                                    dirs.push_front((meta.len(), path));
-                                } else if meta.is_file() {
-                                    log.push(format!("remove file len {:7} {}", meta.len(), path.to_string_lossy()));
-                                    if !dry {
-                                        match tokio::fs::remove_file(&path).await {
-                                            Ok(_) => {}
-                                            Err(e) => {
-                                                log.push(format!(
-                                                    "can not remove file {} {:?}",
-                                                    path.to_string_lossy(),
-                                                    e
-                                                ));
-                                            }
-                                        }
-                                    }
-                                } else {
-                                    log.push(format!("not file, note dir"));
-                                }
-                            }
-                        }
-                        None => {
-                            log.push(format!("Invalid utf-8 path encountered"));
-                        }
-                    }
-                }
-            }
-            None => break,
-        }
-    }
-    log.push(format!(
-        "start to remove {} dirs at {:?}",
-        dirs.len(),
-        chrono::Utc::now()
-    ));
-    for (len, path) in dirs {
-        log.push(format!("remove dir len {} {}", len, path.to_string_lossy()));
-        if !dry {
-            match tokio::fs::remove_dir(&path).await {
-                Ok(_) => {}
-                Err(e) => {
-                    log.push(format!("can not remove dir {} {:?}", path.to_string_lossy(), e));
-                }
-            }
-        }
-    }
-    log.push(format!("done at {:?}", chrono::Utc::now()));
-    let ret = ClearCacheAllResult { log };
-    Ok(ret)
-}
diff --git a/crates/disk/src/disk.rs b/crates/disk/src/disk.rs
index 4874bda..a872f6b 100644
--- a/crates/disk/src/disk.rs
+++ b/crates/disk/src/disk.rs
@@ -1,7 +1,6 @@
 #[cfg(test)]
 pub mod aggtest;
 pub mod binnedstream;
-pub mod cache;
 pub mod channelconfig;
 pub mod dataopen;
 pub mod decode;
diff --git a/crates/disk/src/merge/mergedblobsfromremotes.rs b/crates/disk/src/merge/mergedblobsfromremotes.rs
index b42577a..16fecc6 100644
--- a/crates/disk/src/merge/mergedblobsfromremotes.rs
+++ b/crates/disk/src/merge/mergedblobsfromremotes.rs
@@ -2,6 +2,7 @@ use futures_util::pin_mut;
 use futures_util::Stream;
 use futures_util::StreamExt;
 use futures_util::TryFutureExt;
+use httpclient::postimpl::HttpSimplePostImpl1;
 use items_0::streamitem::sitem_err2_from_string;
 use items_0::streamitem::Sitemty;
 use items_2::eventfull::EventFull;
@@ -32,7 +33,8 @@ impl MergedBlobsFromRemotes {
         debug!("MergedBlobsFromRemotes::new subq {:?}", subq);
         let mut tcp_establish_futs = Vec::new();
         for node in &cluster.nodes {
-            let post = todo!();
+            let post = HttpSimplePostImpl1::new();
+            let post = Box::new(post);
             let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone(), post, ctx.clone());
             let f = f.map_err(sitem_err2_from_string);
             let f: T002 = Box::pin(f);
diff --git a/crates/httpclient/Cargo.toml b/crates/httpclient/Cargo.toml
index f1b8029..5a8b70e 100644
--- a/crates/httpclient/Cargo.toml
+++ b/crates/httpclient/Cargo.toml
@@ -22,6 +22,7 @@ async-channel = "1.9.0"
 err = { path = "../err" }
 netpod = { path = "../netpod" }
 parse = { path = "../parse" }
+streams = { path = "../streams" }
 thiserror = "0.0.1"
 
 [patch.crates-io]
diff --git a/crates/httpclient/src/httpclient.rs b/crates/httpclient/src/httpclient.rs
index 20dacfb..4549ccd 100644
--- a/crates/httpclient/src/httpclient.rs
+++ b/crates/httpclient/src/httpclient.rs
@@ -344,7 +344,12 @@ pub async fn http_post(url: Url, accept: &str, body: String, ctx: &ReqCtx) -> Re
     Ok(buf)
 }
 
-pub async fn connect_client(uri: &http::Uri) -> Result, Error> {
+pub async fn connect_client<B>(uri: &http::Uri) -> Result<SendRequest<B>, Error>
+where
+    B: Body + Send + 'static,
+    <B as Body>::Data: Send,
+    <B as Body>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
+{
     let scheme = uri.scheme_str().unwrap_or("http");
     let host = uri.host().ok_or_else(|| Error::NoHostInUrl)?;
     let port = uri.port_u16().unwrap_or_else(|| {
diff --git a/crates/httpclient/src/lib.rs b/crates/httpclient/src/lib.rs
index 50aaf80..2fe63a2 100644
--- a/crates/httpclient/src/lib.rs
+++ b/crates/httpclient/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod httpclient;
+pub mod postimpl;
 
 pub use crate::httpclient::*;
 pub use http;
diff --git a/crates/httpclient/src/postimpl.rs b/crates/httpclient/src/postimpl.rs
new file mode 100644
index 0000000..e796b16
--- /dev/null
+++ b/crates/httpclient/src/postimpl.rs
@@ -0,0 +1,76 @@
+use crate::httpclient;
+use bytes::Bytes;
+use http::Request;
+use http::Response;
+use http::StatusCode;
+use http_body::Body;
+use http_body_util::combinators::UnsyncBoxBody;
+use http_body_util::BodyExt;
+use http_body_util::Full;
+use std::fmt;
+use std::future::Future;
+use std::pin::Pin;
+
+#[derive(Debug)]
+pub enum BoxBodyError {}
+
+impl fmt::Display for BoxBodyError {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        write!(fmt, "BoxBodyError {{ .. }}")
+    }
+}
+
+impl std::error::Error for BoxBodyError {}
+
+pub type ResponseTy = Response<UnsyncBoxBody<Bytes, streams::tcprawclient::ErrorBody>>;
+
+fn make_error_response<E>(e: E) -> ResponseTy
+where
+    E: std::error::Error,
+{
+    let body = Bytes::from(e.to_string().as_bytes().to_vec());
+    let body = Full::new(body);
+    let body = body.map_err(|_| streams::tcprawclient::ErrorBody::Msg(String::new()));
+    let body = UnsyncBoxBody::new(body);
+    Response::builder()
+        .status(StatusCode::INTERNAL_SERVER_ERROR)
+        .body(body)
+        .unwrap()
+}
+
+pub struct HttpSimplePostImpl1 {}
+
+impl HttpSimplePostImpl1 {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+fn is_body<B>(_: &B)
+where
+    B: Body,
+{
+}
+
+impl streams::tcprawclient::HttpSimplePost for HttpSimplePostImpl1 {
+    fn http_simple_post(&self, req: Request<Full<Bytes>>) -> Pin<Box<dyn Future<Output = ResponseTy> + Send>> {
+        let fut = async move {
+            let mut client = match httpclient::connect_client(req.uri()).await {
+                Ok(x) => x,
+                Err(e) => return make_error_response(e),
+            };
+            let res = match client.send_request(req).await {
+                Ok(x) => x,
+                Err(e) => return make_error_response(e),
+            };
+            let (head, body) = res.into_parts();
+            use http_body_util::BodyExt;
+            let stream = body
+                .map_err(|e| streams::tcprawclient::ErrorBody::Msg(e.to_string()))
+                .boxed_unsync();
+            is_body(&stream);
+            Response::from_parts(head, stream)
+        };
+        Box::pin(fut)
+    }
+}
diff --git a/crates/httpret/src/httpret.rs b/crates/httpret/src/httpret.rs
index 34da205..fc0c53e 100644
--- a/crates/httpret/src/httpret.rs
+++ b/crates/httpret/src/httpret.rs
@@ -411,12 +411,6 @@ async fn http_service_inner(
         } else {
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
         }
-    } else if path == "/api/4/clear_cache" {
-        if req.method() == Method::GET {
-            Ok(clear_cache_all(req, ctx, &node_config).await?)
-        } else {
-            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
-        }
     } else if let Some(h) = api4::maintenance::UpdateDbWithChannelNamesHandler::handler(&req) {
         Ok(h.handle(req, ctx, &node_config).await?)
     } else if let Some(h) = api4::maintenance::UpdateDbWithAllChannelConfigsHandler::handler(&req) {
@@ -525,23 +519,6 @@ async fn random_channel(
     Ok(ret)
 }
 
-async fn clear_cache_all(
-    req: Requ,
-    _ctx: &ReqCtx,
-    node_config: &NodeConfigCached,
-) -> Result {
-    let (head, _body) = req.into_parts();
-    let dry = match head.uri.query() {
-        Some(q) => q.contains("dry"),
-        None => false,
-    };
-    let res = disk::cache::clear_cache_all(node_config, dry).await?;
-    let ret = response(StatusCode::OK)
-        .header(http::header::CONTENT_TYPE, APP_JSON)
-        .body(body_string(serde_json::to_string(&res)?))?;
-    Ok(ret)
-}
-
 pub async fn test_log() {
     error!("------");
     warn!("------");
diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml
index cdfc8d3..90fd47c 100644
--- a/crates/streams/Cargo.toml
+++ b/crates/streams/Cargo.toml
@@ -28,7 +28,6 @@ query = { path = "../query" }
 items_0 = { path = "../items_0" }
 items_2 = { path = "../items_2" }
 parse = { path = "../parse" }
-#httpclient = { path = "../httpclient" }
 http = "1"
 http-body = "1"
 http-body-util = "0.1.0"
diff --git a/crates/streams/src/tcprawclient.rs b/crates/streams/src/tcprawclient.rs
index c568d2f..466cb1e 100644
--- a/crates/streams/src/tcprawclient.rs
+++ b/crates/streams/src/tcprawclient.rs
@@ -14,6 +14,7 @@ use futures_util::Stream;
 use futures_util::StreamExt;
 use futures_util::TryStreamExt;
 use http::Uri;
+use http_body_util::BodyExt;
 use items_0::framable::FrameTypeInnerStatic;
 use items_0::streamitem::sitem_data;
 use items_0::streamitem::sitem_err2_from_string;
@@ -122,15 +123,17 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp(
 }
 
 #[derive(Debug, thiserror::Error)]
-pub enum ErrorBody {}
+pub enum ErrorBody {
+    #[error("{0}")]
+    Msg(String),
+}
 
 pub trait HttpSimplePost: Send {
     fn http_simple_post(
         &self,
-        // req: http::Request>>,
-        req: http::Request>,
-    ) -> http::Response<
-        http_body_util::StreamBody, ErrorBody>> + Send>>>,
+        req: http::Request<http_body_util::Full<Bytes>>,
+    ) -> Pin<
+        Box<dyn Future<Output = http::Response<http_body_util::combinators::UnsyncBoxBody<Bytes, ErrorBody>>> + Send>,
     >;
 }
 
@@ -167,18 +170,15 @@
     let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
     debug!("open_event_data_streams_http post {url}");
     let uri: Uri = url.as_str().parse().unwrap();
-    let body = http_body_util::BodyDataStream::new(buf);
+    let body = http_body_util::Full::new(buf);
     let req = Request::builder()
         .method(Method::POST)
         .uri(&uri)
         .header(header::HOST, uri.host().unwrap())
         .header(header::ACCEPT, APP_OCTET)
         .header(ctx.header_name(), ctx.header_value())
-        // .body(body_bytes(buf))?;
         .body(body)?;
-    let res = post.http_simple_post(req);
-    // let mut client = httpclient::connect_client(req.uri()).await?;
-    // let res = client.send_request(req).await?;
+    let res = post.http_simple_post(req).await;
     if res.status() != StatusCode::OK {
         let (head, body) = res.into_parts();
         error!("server error {:?}", head);
@@ -187,22 +187,13 @@
         return Err(Error::ServerError(head, s.to_string()));
     }
     let (_head, body) = res.into_parts();
-    // while let Some(x) = body.next().await {
-    //     let fr = x?;
-    // }
     let inp = body;
+    let inp = inp.into_data_stream();
     let inp = inp.map(|x| match x {
-        Ok(x) => match x.into_data() {
-            Ok(x) => Ok(x),
-            Err(e) => {
-                debug!("see non-data frame {e:?}");
-                Ok(Bytes::new())
-            }
-        },
+        Ok(x) => Ok(x),
         Err(e) => Err(sitem_err2_from_string(e)),
     });
     let inp = Box::pin(inp) as BoxedBytesStream;
-    // let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream;
     let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
     let frames = frames.map_err(sitem_err2_from_string);
     let frames = Box::pin(frames);