WIP first test with post impl

This commit is contained in:
Dominik Werder
2024-11-06 15:03:07 +01:00
parent db6e55bcb6
commit 1364f43f6f
10 changed files with 99 additions and 291 deletions

View File

@@ -1,243 +0,0 @@
use chrono::Utc;
use err::Error;
use netpod::log::*;
use netpod::AggKind;
use netpod::Cluster;
use netpod::NodeConfigCached;
use netpod::PreBinnedPatchCoordEnum;
use netpod::SfDbChannel;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
use std::io;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Instant;
use taskrun::tokio;
use tiny_keccak::Hasher;
// For file-based caching, this determined the node where the cache file was located.
// No longer needed for scylla-based caching.
/// Deterministically map a channel to a node index within the cluster.
///
/// Hashes backend and channel name with SHA3-256 and reduces the first four
/// bytes modulo the node count. The patch coordinates are currently NOT part
/// of the hash (commented out), so all patches of a channel map to the same
/// node.
///
/// NOTE(review): panics (division by zero) if `cluster.nodes` is empty —
/// confirm callers guarantee a non-empty cluster.
pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoordEnum, channel: &SfDbChannel, cluster: &Cluster) -> u32 {
    let mut hash = tiny_keccak::Sha3::v256();
    hash.update(channel.backend().as_bytes());
    hash.update(channel.name().as_bytes());
    /*hash.update(&patch_coord.patch_beg().to_le_bytes());
    hash.update(&patch_coord.patch_end().to_le_bytes());
    hash.update(&patch_coord.bin_t_len().to_le_bytes());
    hash.update(&patch_coord.patch_t_len().to_le_bytes());*/
    let mut out = [0; 32];
    hash.finalize(&mut out);
    // Interpret the first 4 digest bytes as a little-endian u32.
    let a = [out[0], out[1], out[2], out[3]];
    u32::from_le_bytes(a) % cluster.nodes.len() as u32
}
/// Identity of one cached file: channel, pre-binned patch coordinate and
/// aggregation kind together determine a unique cache entry.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CacheFileDesc {
    // What identifies a cached file?
    channel: SfDbChannel,
    patch: PreBinnedPatchCoordEnum,
    agg_kind: AggKind,
}
impl CacheFileDesc {
    /// Create a descriptor for the cache entry of the given channel, patch
    /// and aggregation kind.
    pub fn new(channel: SfDbChannel, patch: PreBinnedPatchCoordEnum, agg_kind: AggKind) -> Self {
        Self {
            channel,
            patch,
            agg_kind,
        }
    }

    /// Version-tagged SHA3-256 hex digest over backend, channel name and
    /// aggregation kind.
    ///
    /// NOTE(review): the patch coordinates are currently not hashed
    /// (commented out), so all patches of a channel share this hash.
    pub fn hash(&self) -> String {
        let mut h = tiny_keccak::Sha3::v256();
        h.update(b"V000");
        h.update(self.channel.backend().as_bytes());
        h.update(self.channel.name().as_bytes());
        h.update(format!("{}", self.agg_kind).as_bytes());
        //h.update(&self.patch.spec().bin_t_len().to_le_bytes());
        //h.update(&self.patch.spec().patch_t_len().to_le_bytes());
        //h.update(&self.patch.ix().to_le_bytes());
        let mut buf = [0; 32];
        h.finalize(&mut buf);
        hex::encode(&buf)
    }

    /// Version-tagged SHA3-256 hex digest over backend and channel name only;
    /// used for the sharded directory prefix in `path`.
    pub fn hash_channel(&self) -> String {
        let mut h = tiny_keccak::Sha3::v256();
        h.update(b"V000");
        h.update(self.channel.backend().as_bytes());
        h.update(self.channel.name().as_bytes());
        let mut buf = [0; 32];
        h.finalize(&mut buf);
        hex::encode(&buf)
    }

    /// Filesystem path of the cache file for this descriptor.
    ///
    /// NOTE(review): the base directory is still a placeholder
    /// (`err::todoval`), so this will fail at runtime until the cache base
    /// path is wired back in; `node_config` is currently unused for the same
    /// reason.
    pub fn path(&self, node_config: &NodeConfigCached) -> PathBuf {
        // Underscored to silence the unused-variable warning; kept because the
        // commented-out final path components below still need the full hash.
        let _hash = self.hash();
        let hc = self.hash_channel();
        PathBuf::from(format!("{}", err::todoval::<u8>()))
            .join("cache")
            .join(&hc[0..3])
            .join(&hc[3..6])
            .join(self.channel.name())
            .join(format!("{}", self.agg_kind))
        /*.join(format!(
            "{:010}-{:010}",
            self.patch.spec().bin_t_len() / SEC,
            self.patch.spec().patch_t_len() / SEC
        ))
        .join(format!("{}-{:012}", &_hash[0..6], self.patch.ix()))*/
    }
}
/// Result of writing one pre-binned cache file.
pub struct WrittenPbCache {
    /// Number of bytes written (the serialized CBOR payload).
    pub bytes: u64,
    /// Wall-clock time of the write, from before directory creation until
    /// after the final rename.
    pub duration: Duration,
}
// TODO only used for old archiver
/// Serialize `values` as CBOR and write them to this patch's cache file.
///
/// The payload is written to a uniquely named temp file in the target
/// directory (name salted with a CRC32 of the current timestamp) and then
/// renamed into place, so readers never observe a partially written file.
///
/// # Errors
/// Fails on serialization errors, on directory creation, on the temp-file
/// write, or on the final rename.
pub async fn write_pb_cache_min_max_avg_scalar<T>(
    values: T,
    patch: PreBinnedPatchCoordEnum,
    agg_kind: AggKind,
    channel: SfDbChannel,
    node_config: NodeConfigCached,
) -> Result<WrittenPbCache, Error>
where
    T: Serialize,
{
    // The parameters are owned and not used again below: move them into the
    // descriptor instead of cloning.
    let cfd = CacheFileDesc {
        channel,
        patch,
        agg_kind,
    };
    let path = cfd.path(&node_config);
    let enc = serde_cbor::to_vec(&values)?;
    let ts1 = Instant::now();
    tokio::fs::create_dir_all(path.parent().unwrap()).await.map_err(|e| {
        error!("can not create cache directory {:?}", path.parent());
        e
    })?;
    // Salt the temp file name so concurrent writers of the same patch do not
    // collide on `create_new`.
    let now = Utc::now();
    let mut h = crc32fast::Hasher::new();
    h.update(&now.timestamp_nanos().to_le_bytes());
    let r = h.finalize();
    let tmp_path =
        path.parent()
            .unwrap()
            .join(format!("{}.tmp.{:08x}", path.file_name().unwrap().to_str().unwrap(), r));
    // File I/O is blocking; run it off the async executor.
    let res = tokio::task::spawn_blocking({
        let tmp_path = tmp_path.clone();
        move || {
            use fs2::FileExt;
            use io::Write;
            info!("try to write tmp at {:?}", tmp_path);
            let mut f = std::fs::OpenOptions::new()
                .create_new(true)
                .write(true)
                .open(&tmp_path)?;
            // Advisory locking is currently disabled: the unique temp name
            // plus the rename below already serialize the final file.
            if false {
                f.lock_exclusive()?;
            }
            f.write_all(&enc)?;
            if false {
                f.unlock()?;
            }
            f.flush()?;
            Ok::<_, Error>(enc.len())
        }
    })
    .await
    .map_err(Error::from_string)??;
    tokio::fs::rename(&tmp_path, &path).await?;
    let ts2 = Instant::now();
    let ret = WrittenPbCache {
        bytes: res as u64,
        duration: ts2.duration_since(ts1),
    };
    Ok(ret)
}
/// JSON-serializable result of `clear_cache_all`: the collected log lines
/// describing what was (or would have been) removed.
#[derive(Serialize)]
pub struct ClearCacheAllResult {
    pub log: Vec<String>,
}
/// Recursively delete everything below the cache base directory.
///
/// Walks the tree iteratively: files are removed during the walk, directories
/// are collected and removed afterwards (children before parents). With `dry`
/// set, nothing is removed and only the log is produced.
///
/// # Errors
/// Fails if a directory cannot be read or an entry's metadata cannot be
/// obtained; individual remove failures are only logged.
pub async fn clear_cache_all(node_config: &NodeConfigCached, dry: bool) -> Result<ClearCacheAllResult, Error> {
    let mut log = vec![];
    log.push(format!("begin at {:?}", chrono::Utc::now()));
    if dry {
        log.push(format!("dry run"));
    }
    let mut dirs = VecDeque::new();
    let mut stack = VecDeque::new();
    // stack.push_front(node_config.node.cache_base_path.join("cache"));
    // NOTE(review): placeholder base path until the cache base path is wired
    // back in; with it, read_dir below fails and the walk errors out early.
    stack.push_front(PathBuf::from("UNDEFINED/NOTHING/REMOVE/ME").join("cache"));
    while let Some(path) = stack.pop_front() {
        info!("clear_cache_all try read dir {:?}", path);
        let mut rd = tokio::fs::read_dir(path).await?;
        while let Some(entry) = rd.next_entry().await? {
            let path = entry.path();
            match path.to_str() {
                Some(_pathstr) => {
                    let meta = path.symlink_metadata()?;
                    //log.push(format!("len {:7} pathstr {}", meta.len(), pathstr,));
                    let filename_str = path.file_name().unwrap().to_str().unwrap();
                    // Compare for equality with the special entries: the
                    // previous `ends_with` test also skipped legitimate
                    // filenames that merely end in a dot.
                    if filename_str == ".." || filename_str == "." {
                        log.push(format!("ERROR encountered . or .."));
                    } else if meta.is_dir() {
                        // Descend, and remember the dir so it can be removed
                        // after its contents.
                        stack.push_front(path.clone());
                        dirs.push_front((meta.len(), path));
                    } else if meta.is_file() {
                        log.push(format!("remove file len {:7} {}", meta.len(), path.to_string_lossy()));
                        if !dry {
                            match tokio::fs::remove_file(&path).await {
                                Ok(_) => {}
                                Err(e) => {
                                    log.push(format!(
                                        "can not remove file {} {:?}",
                                        path.to_string_lossy(),
                                        e
                                    ));
                                }
                            }
                        }
                    } else {
                        // Neither file nor dir (e.g. symlink): leave in place.
                        log.push(format!("not file, not dir"));
                    }
                }
                None => {
                    log.push(format!("Invalid utf-8 path encountered"));
                }
            }
        }
    }
    log.push(format!(
        "start to remove {} dirs at {:?}",
        dirs.len(),
        chrono::Utc::now()
    ));
    // `dirs` was filled with push_front during a depth-first walk, so children
    // precede their parents and remove_dir always sees empty directories.
    for (len, path) in dirs {
        log.push(format!("remove dir len {} {}", len, path.to_string_lossy()));
        if !dry {
            match tokio::fs::remove_dir(&path).await {
                Ok(_) => {}
                Err(e) => {
                    log.push(format!("can not remove dir {} {:?}", path.to_string_lossy(), e));
                }
            }
        }
    }
    log.push(format!("done at {:?}", chrono::Utc::now()));
    let ret = ClearCacheAllResult { log };
    Ok(ret)
}

View File

@@ -1,7 +1,6 @@
#[cfg(test)]
pub mod aggtest;
pub mod binnedstream;
pub mod cache;
pub mod channelconfig;
pub mod dataopen;
pub mod decode;

View File

@@ -2,6 +2,7 @@ use futures_util::pin_mut;
use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use httpclient::postimpl::HttpSimplePostImpl1;
use items_0::streamitem::sitem_err2_from_string;
use items_0::streamitem::Sitemty;
use items_2::eventfull::EventFull;
@@ -32,7 +33,8 @@ impl MergedBlobsFromRemotes {
debug!("MergedBlobsFromRemotes::new subq {:?}", subq);
let mut tcp_establish_futs = Vec::new();
for node in &cluster.nodes {
let post = todo!();
let post = HttpSimplePostImpl1::new();
let post = Box::new(post);
let f = x_processed_event_blobs_stream_from_node(subq.clone(), node.clone(), post, ctx.clone());
let f = f.map_err(sitem_err2_from_string);
let f: T002<EventFull> = Box::pin(f);

View File

@@ -22,6 +22,7 @@ async-channel = "1.9.0"
err = { path = "../err" }
netpod = { path = "../netpod" }
parse = { path = "../parse" }
streams = { path = "../streams" }
thiserror = "0.0.1"
[patch.crates-io]

View File

@@ -344,7 +344,12 @@ pub async fn http_post(url: Url, accept: &str, body: String, ctx: &ReqCtx) -> Re
Ok(buf)
}
pub async fn connect_client(uri: &http::Uri) -> Result<SendRequest<StreamBody>, Error> {
pub async fn connect_client<B>(uri: &http::Uri) -> Result<SendRequest<B>, Error>
where
B: Body + Send + 'static,
<B as Body>::Data: Send,
<B as Body>::Error: Into<Box<(dyn std::error::Error + Send + Sync + 'static)>>,
{
let scheme = uri.scheme_str().unwrap_or("http");
let host = uri.host().ok_or_else(|| Error::NoHostInUrl)?;
let port = uri.port_u16().unwrap_or_else(|| {

View File

@@ -1,4 +1,5 @@
pub mod httpclient;
pub mod postimpl;
pub use crate::httpclient::*;
pub use http;

View File

@@ -0,0 +1,76 @@
use crate::httpclient;
use bytes::Bytes;
use http::Request;
use http::Response;
use http::StatusCode;
use http_body::Body;
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::BodyExt;
use http_body_util::Full;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
/// Uninhabited error type (no variants): usable where an `Error` type is
/// required but a failure can never actually occur.
/// NOTE(review): not referenced elsewhere in this file — confirm it is still
/// needed.
#[derive(Debug)]
pub enum BoxBodyError {}
impl fmt::Display for BoxBodyError {
    // Unreachable in practice (the enum has no variants); provided only to
    // satisfy the `std::error::Error` supertrait bound below.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "BoxBodyError {{ .. }}")
    }
}
impl std::error::Error for BoxBodyError {}
/// Response type produced by this module: a boxed body carrying the shared
/// `streams` error type.
pub type ResponseTy = Response<UnsyncBoxBody<Bytes, streams::tcprawclient::ErrorBody>>;

/// Build a synthetic 500 response whose body is the error's display text.
fn make_error_response<E>(e: E) -> ResponseTy
where
    E: std::error::Error,
{
    // `Bytes: From<String>` — avoids the intermediate Vec copy of the
    // previous `e.to_string().as_bytes().to_vec()`.
    let body = Bytes::from(e.to_string());
    let body = Full::new(body);
    // `Full` is infallible, so this mapped error can never occur.
    let body = body.map_err(|_| streams::tcprawclient::ErrorBody::Msg(String::new()));
    let body = UnsyncBoxBody::new(body);
    Response::builder()
        .status(StatusCode::INTERNAL_SERVER_ERROR)
        .body(body)
        // A builder with only a valid status cannot fail.
        .unwrap()
}
/// Stateless HTTP POST implementation that opens a fresh client connection
/// per request.
#[derive(Debug, Default)]
pub struct HttpSimplePostImpl1 {}

impl HttpSimplePostImpl1 {
    /// Create a new instance; the type holds no state.
    pub fn new() -> Self {
        Self {}
    }
}
/// Compile-time check that a value implements `Body`; no runtime effect.
fn is_body<B: Body>(_: &B) {}
impl streams::tcprawclient::HttpSimplePost for HttpSimplePostImpl1 {
    /// POST `req` over a freshly established connection and return the
    /// response with its body boxed into the shared error type.
    ///
    /// Connect or send failures are converted into a synthetic 500 response
    /// via `make_error_response` rather than propagated.
    fn http_simple_post(&self, req: Request<Full<Bytes>>) -> Pin<Box<dyn Future<Output = ResponseTy> + Send>> {
        let fut = async move {
            let mut client = match httpclient::connect_client(req.uri()).await {
                Ok(x) => x,
                Err(e) => return make_error_response(e),
            };
            let res = match client.send_request(req).await {
                Ok(x) => x,
                Err(e) => return make_error_response(e),
            };
            let (head, body) = res.into_parts();
            // BodyExt is imported at module level; the redundant local
            // `use http_body_util::BodyExt;` was removed.
            let stream = body
                .map_err(|e| streams::tcprawclient::ErrorBody::Msg(e.to_string()))
                .boxed_unsync();
            // Compile-time assertion that the mapped stream is still a Body.
            is_body(&stream);
            Response::from_parts(head, stream)
        };
        Box::pin(fut)
    }
}

View File

@@ -411,12 +411,6 @@ async fn http_service_inner(
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if path == "/api/4/clear_cache" {
if req.method() == Method::GET {
Ok(clear_cache_all(req, ctx, &node_config).await?)
} else {
Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(body_empty())?)
}
} else if let Some(h) = api4::maintenance::UpdateDbWithChannelNamesHandler::handler(&req) {
Ok(h.handle(req, ctx, &node_config).await?)
} else if let Some(h) = api4::maintenance::UpdateDbWithAllChannelConfigsHandler::handler(&req) {
@@ -525,23 +519,6 @@ async fn random_channel(
Ok(ret)
}
async fn clear_cache_all(
req: Requ,
_ctx: &ReqCtx,
node_config: &NodeConfigCached,
) -> Result<StreamResponse, RetrievalError> {
let (head, _body) = req.into_parts();
let dry = match head.uri.query() {
Some(q) => q.contains("dry"),
None => false,
};
let res = disk::cache::clear_cache_all(node_config, dry).await?;
let ret = response(StatusCode::OK)
.header(http::header::CONTENT_TYPE, APP_JSON)
.body(body_string(serde_json::to_string(&res)?))?;
Ok(ret)
}
pub async fn test_log() {
error!("------");
warn!("------");

View File

@@ -28,7 +28,6 @@ query = { path = "../query" }
items_0 = { path = "../items_0" }
items_2 = { path = "../items_2" }
parse = { path = "../parse" }
#httpclient = { path = "../httpclient" }
http = "1"
http-body = "1"
http-body-util = "0.1.0"

View File

@@ -14,6 +14,7 @@ use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use http::Uri;
use http_body_util::BodyExt;
use items_0::framable::FrameTypeInnerStatic;
use items_0::streamitem::sitem_data;
use items_0::streamitem::sitem_err2_from_string;
@@ -122,15 +123,17 @@ pub async fn x_processed_event_blobs_stream_from_node_tcp(
}
#[derive(Debug, thiserror::Error)]
pub enum ErrorBody {}
pub enum ErrorBody {
#[error("{0}")]
Msg(String),
}
pub trait HttpSimplePost: Send {
fn http_simple_post(
&self,
// req: http::Request<http_body_util::BodyDataStream<http_body::Frame<Bytes>>>,
req: http::Request<http_body_util::BodyDataStream<Bytes>>,
) -> http::Response<
http_body_util::StreamBody<Pin<Box<dyn Stream<Item = Result<http_body::Frame<Bytes>, ErrorBody>> + Send>>>,
req: http::Request<http_body_util::Full<Bytes>>,
) -> Pin<
Box<dyn Future<Output = http::Response<http_body_util::combinators::UnsyncBoxBody<Bytes, ErrorBody>>> + Send>,
>;
}
@@ -167,18 +170,15 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
let url = node.baseurl().join("/api/4/private/eventdata/frames").unwrap();
debug!("open_event_data_streams_http post {url}");
let uri: Uri = url.as_str().parse().unwrap();
let body = http_body_util::BodyDataStream::new(buf);
let body = http_body_util::Full::new(buf);
let req = Request::builder()
.method(Method::POST)
.uri(&uri)
.header(header::HOST, uri.host().unwrap())
.header(header::ACCEPT, APP_OCTET)
.header(ctx.header_name(), ctx.header_value())
// .body(body_bytes(buf))?;
.body(body)?;
let res = post.http_simple_post(req);
// let mut client = httpclient::connect_client(req.uri()).await?;
// let res = client.send_request(req).await?;
let res = post.http_simple_post(req).await;
if res.status() != StatusCode::OK {
let (head, body) = res.into_parts();
error!("server error {:?}", head);
@@ -187,22 +187,13 @@ pub async fn x_processed_event_blobs_stream_from_node_http(
return Err(Error::ServerError(head, s.to_string()));
}
let (_head, body) = res.into_parts();
// while let Some(x) = body.next().await {
// let fr = x?;
// }
let inp = body;
let inp = inp.into_data_stream();
let inp = inp.map(|x| match x {
Ok(x) => match x.into_data() {
Ok(x) => Ok(x),
Err(e) => {
debug!("see non-data frame {e:?}");
Ok(Bytes::new())
}
},
Ok(x) => Ok(x),
Err(e) => Err(sitem_err2_from_string(e)),
});
let inp = Box::pin(inp) as BoxedBytesStream;
// let inp = Box::pin(httpclient::IncomingStream::new(body)) as BoxedBytesStream;
let frames = InMemoryFrameStream::new(inp, subq.inmem_bufcap());
let frames = frames.map_err(sitem_err2_from_string);
let frames = Box::pin(frames);