From 91f3ec790c1beed59ccb196d503842651804f588 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Wed, 14 Apr 2021 17:49:44 +0200 Subject: [PATCH] Better diagnostics and more basic flow structure --- .gitignore | 1 + disk/src/agg.rs | 15 +++-- disk/src/cache.rs | 119 ++++++++++++++++++++++++--------- disk/src/gen.rs | 9 +-- disk/src/lib.rs | 2 +- httpret/src/lib.rs | 97 ++++++++++++++++++++++----- netpod/src/lib.rs | 10 +-- retrieval/src/bin/retrieval.rs | 3 +- retrieval/src/lib.rs | 7 +- taskrun/Cargo.toml | 1 + taskrun/src/lib.rs | 10 +++ 11 files changed, 203 insertions(+), 71 deletions(-) diff --git a/.gitignore b/.gitignore index 84eac3f..9245b17 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ /target /.idea /tmpdata +/docs diff --git a/disk/src/agg.rs b/disk/src/agg.rs index c049323..acc0928 100644 --- a/disk/src/agg.rs +++ b/disk/src/agg.rs @@ -708,12 +708,13 @@ where I: AggregatableTdim + Unpin, T: Stream> + Unpin, I:: } } -pub fn make_test_node(ix: u8) -> Node { +pub fn make_test_node(id: u32) -> Node { Node { + id, host: "localhost".into(), - port: 8800 + ix as u16, - data_base_path: format!("../tmpdata/node{:02}", ix).into(), - split: ix, + port: 8800 + id as u16, + data_base_path: format!("../tmpdata/node{:02}", id).into(), + split: id, ksprefix: "ks".into(), } } @@ -731,9 +732,9 @@ async fn agg_x_dim_0_inner() { channel_config: ChannelConfig { channel: Channel { backend: "sf-databuffer".into(), - keyspace: 2, name: "S10BC01-DBAM070:EOM1_T1".into(), }, + keyspace: 2, time_bin_size: DAY, shape: Shape::Scalar, scalar_type: ScalarType::F64, @@ -786,9 +787,9 @@ async fn agg_x_dim_1_inner() { channel_config: ChannelConfig { channel: Channel { backend: "ks".into(), - keyspace: 3, name: "wave1".into(), }, + keyspace: 3, time_bin_size: DAY, shape: Shape::Wave(1024), scalar_type: ScalarType::F64, @@ -835,9 +836,9 @@ async fn merge_0_inner() { channel_config: ChannelConfig { channel: Channel { backend: "ks".into(), - keyspace: 3, name: "wave1".into(), }, + keyspace: 3, time_bin_size: DAY, shape: Shape::Wave(17), scalar_type: ScalarType::F64, diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 9741449..fcb8752 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -38,7 +38,6 @@ impl Query { agg_kind: AggKind::DimXBins1, channel: Channel { backend: params.get("channel_backend").unwrap().into(), - keyspace: params.get("channel_keyspace").unwrap().parse().unwrap(), name: params.get("channel_name").unwrap().into(), }, }; @@ -59,7 +58,7 @@ pub fn binned_bytes_for_http(node_config: Arc, query: &Query) -> Res Some(spec) => { info!("GOT PreBinnedPatchGridSpec: {:?}", spec); warn!("Pass here to BinnedStream what kind of Agg, range, ..."); - let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, node_config.cluster.clone()); + let s1 = BinnedStream::new(PreBinnedPatchIterator::from_range(spec), query.channel.clone(), agg_kind, node_config.clone()); // Iterate over the patches. // Request the patch from each node. // Merge. @@ -130,7 +129,6 @@ impl PreBinnedQuery { agg_kind: AggKind::DimXBins1, channel: Channel { backend: params.get("channel_backend").unwrap().into(), - keyspace: params.get("channel_keyspace").unwrap().parse().unwrap(), name: params.get("channel_name").unwrap().into(), }, }; @@ -153,12 +151,15 @@ pub fn pre_binned_bytes_for_http(node_config: Arc, query: &PreBinned pub struct PreBinnedValueByteStream { + inp: PreBinnedValueStream, } impl PreBinnedValueByteStream { pub fn new(patch: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { + warn!("PreBinnedValueByteStream"); Self { + inp: PreBinnedValueStream::new(patch, channel, agg_kind, node_config), } } @@ -167,9 +168,19 @@ impl PreBinnedValueByteStream { impl Stream for PreBinnedValueByteStream { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { error!("PreBinnedValueByteStream poll_next"); - todo!() + use Poll::*; + match self.inp.poll_next_unpin(cx) { + Ready(Some(Ok(k))) => { + error!("TODO convert item to Bytes"); + let buf = Bytes::new(); + Ready(Some(Ok(buf))) + } + Ready(Some(Err(e))) => Ready(Some(Err(e))), + Ready(None) => Ready(None), + Pending => Pending, + } } } @@ -179,23 +190,66 @@ impl Stream for PreBinnedValueByteStream { pub struct PreBinnedValueStream { + patch_coord: PreBinnedPatchCoord, + channel: Channel, + agg_kind: AggKind, + node_config: Arc, +} + +impl PreBinnedValueStream { + + pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { + let node_ix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); + assert!(node_ix == node_config.node.id); + Self { + patch_coord, + channel, + agg_kind, + node_config, + } + } + +} + +impl Stream for PreBinnedValueStream { + // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + // TODO provide the data from a local cached file, the on-the-fly binning of higher-res + // pre-binned patches which we can get again via http, or if there is no higher resolution + // then from raw events, or a combination of all those especially if there is not yet + // a pre-binned patch, and we have to stitch from higher-res-pre-bin plus extend with + // on-the-fly binning of fresh data. + error!("TODO provide the pre-binned data"); + Ready(None) + } + +} + + + + +pub struct PreBinnedValueFetchedStream { uri: http::Uri, patch_coord: PreBinnedPatchCoord, resfut: Option, res: Option>, } -impl PreBinnedValueStream { +impl PreBinnedValueFetchedStream { - pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, cluster: Arc) -> Self { - let nodeix = node_ix_for_patch(&patch_coord, &channel, &cluster); - let node = &cluster.nodes[nodeix]; + pub fn new(patch_coord: PreBinnedPatchCoord, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { + let nodeix = node_ix_for_patch(&patch_coord, &channel, &node_config.cluster); + let node = &node_config.cluster.nodes[nodeix as usize]; let uri: hyper::Uri = format!( - "http://{}:{}/api/1/prebinned?beg={}&end={}&channel={}&agg_kind={:?}", + "http://{}:{}/api/1/prebinned?beg={}&end={}&channel_backend={}&channel_name={}&agg_kind={:?}", node.host, node.port, patch_coord.range.beg, patch_coord.range.end, + channel.backend, channel.name, agg_kind, ).parse().unwrap(); @@ -209,23 +263,7 @@ impl PreBinnedValueStream { } - -pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> usize { - let mut hash = tiny_keccak::Sha3::v256(); - hash.update(channel.backend.as_bytes()); - hash.update(channel.name.as_bytes()); - hash.update(&patch_coord.range.beg.to_le_bytes()); - hash.update(&patch_coord.range.end.to_le_bytes()); - let mut out = [0; 32]; - hash.finalize(&mut out); - let mut a = [out[0], out[1], out[2], out[3]]; - let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32; - info!("node_ix_for_patch {}", ix); - ix as usize -} - - -impl Stream for PreBinnedValueStream { +impl Stream for PreBinnedValueFetchedStream { // TODO need this generic for scalar and array (when wave is not binned down to a single scalar point) type Item = Result; @@ -259,7 +297,10 @@ impl Stream for PreBinnedValueStream { self.res = Some(res); continue 'outer; } - Err(e) => Ready(Some(Err(e.into()))), + Err(e) => { + error!("PreBinnedValueStream error in stream {:?}", e); + Ready(Some(Err(e.into()))) + } } } Pending => { @@ -286,17 +327,19 @@ impl Stream for PreBinnedValueStream { } + pub struct BinnedStream { inp: Pin> + Send>>, } impl BinnedStream { - pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, cluster: Arc) -> Self { + pub fn new(patch_it: PreBinnedPatchIterator, channel: Channel, agg_kind: AggKind, node_config: Arc) -> Self { + warn!("BinnedStream will open a PreBinnedValueStream"); let mut patch_it = patch_it; let inp = futures_util::stream::iter(patch_it) .map(move |coord| { - PreBinnedValueStream::new(coord, channel.clone(), agg_kind.clone(), cluster.clone()) + PreBinnedValueFetchedStream::new(coord, channel.clone(), agg_kind.clone(), node_config.clone()) }) .flatten() .map(|k| { @@ -340,3 +383,19 @@ impl From for Bytes { } } + + + +pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 { + let mut hash = tiny_keccak::Sha3::v256(); + hash.update(channel.backend.as_bytes()); + hash.update(channel.name.as_bytes()); + hash.update(&patch_coord.range.beg.to_le_bytes()); + hash.update(&patch_coord.range.end.to_le_bytes()); + let mut out = [0; 32]; + hash.finalize(&mut out); + let mut a = [out[0], out[1], out[2], out[3]]; + let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32; + info!("node_ix_for_patch {}", ix); + ix +} diff --git a/disk/src/gen.rs b/disk/src/gen.rs index bd14a55..cddec8f 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -36,9 +36,9 @@ pub async fn gen_test_data() -> Result<(), Error> { config: ChannelConfig { channel: Channel { backend: "test".into(), - keyspace: 3, name: "wave1".into(), }, + keyspace: 3, time_bin_size: DAY, scalar_type: ScalarType::F64, shape: Shape::Wave(17), @@ -51,9 +51,10 @@ pub async fn gen_test_data() -> Result<(), Error> { } for i1 in 0..13 { let node = Node { + id: i1, host: "localhost".into(), - port: 7780 + i1, - split: i1 as u8, + port: 7780 + i1 as u16, + split: i1, data_base_path: data_base_path.join(format!("node{:02}", i1)), ksprefix: ksprefix.clone(), }; @@ -88,7 +89,7 @@ async fn gen_channel(chn: &ChannelGenProps, node: &Node, ensemble: &Ensemble) -> .join(&chn.config.channel.name); tokio::fs::create_dir_all(&config_path).await?; let channel_path = node.data_base_path - .join(format!("{}_{}", node.ksprefix, chn.config.channel.keyspace)) + .join(format!("{}_{}", node.ksprefix, chn.config.keyspace)) .join("byTime") .join(&chn.config.channel.name); tokio::fs::create_dir_all(&channel_path).await?; diff --git a/disk/src/lib.rs b/disk/src/lib.rs index 1f551aa..4f0db5d 100644 --- a/disk/src/lib.rs +++ b/disk/src/lib.rs @@ -891,7 +891,7 @@ pub fn raw_concat_channel_read_stream_timebin(query: &netpod::AggQuerySingleChan fn datapath(timebin: u64, config: &netpod::ChannelConfig, node: &Node) -> PathBuf { //let pre = "/data/sf-databuffer/daq_swissfel"; node.data_base_path - .join(format!("{}_{}", node.ksprefix, config.channel.keyspace)) + .join(format!("{}_{}", node.ksprefix, config.keyspace)) .join("byTime") .join(config.channel.name.clone()) .join(format!("{:019}", timebin)) diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index f2de13b..55e7303 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -1,24 +1,27 @@ #[allow(unused_imports)] use tracing::{error, warn, info, debug, trace}; use err::Error; -use std::net::SocketAddr; +use std::{task, future, pin, net, panic, sync}; +use net::SocketAddr; use http::{Method, StatusCode, HeaderMap}; -use hyper::{Body, Request, Response}; -use hyper::server::Server; +use hyper::{Body, Request, Response, server::Server}; use hyper::service::{make_service_fn, service_fn}; -use std::task::{Context, Poll}; -use std::pin::Pin; -use futures_util::FutureExt; +use task::{Context, Poll}; +use future::Future; +use pin::Pin; +use futures_core::Stream; +use futures_util::{FutureExt, StreamExt}; use netpod::{Node, Cluster, AggKind, NodeConfig}; -use std::sync::Arc; +use sync::Arc; use disk::cache::PreBinnedQuery; -use std::future::Future; -use std::panic::UnwindSafe; +use panic::{UnwindSafe, AssertUnwindSafe}; +use bytes::Bytes; pub async fn host(node_config: Arc) -> Result<(), Error> { let addr = SocketAddr::from(([0, 0, 0, 0], node_config.node.port)); let make_service = make_service_fn({ - move |_conn| { + move |conn| { + info!("new conn {:?}", conn); let node_config = node_config.clone(); async move { Ok::<_, Error>(service_fn({ @@ -48,18 +51,27 @@ struct Cont { f: Pin>, } -impl Future for Cont where F: Future { +impl Future for Cont where F: Future> { type Output = ::Output; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - /*let h = std::panic::catch_unwind(|| { + let h = std::panic::catch_unwind(AssertUnwindSafe(|| { self.f.poll_unpin(cx) - }); + })); match h { Ok(k) => k, - Err(e) => todo!(), - }*/ - self.f.poll_unpin(cx) + Err(e) => { + error!("Cont catch_unwind {:?}", e); + match e.downcast_ref::() { + Some(e) => { + error!("Cont catch_unwind is Error: {:?}", e); + } + None => { + } + } + Poll::Ready(Err(Error::from(format!("{:?}", e)))) + } + } } } @@ -162,7 +174,55 @@ impl hyper::body::HttpBody for BodyStreamWrap { } +struct BodyStream { + inp: S, +} + +impl BodyStream where S: Stream> + Unpin + Send + 'static, I: Into + Sized + 'static { + + pub fn new(inp: S) -> Self { + Self { + inp, + } + } + + pub fn wrapped(inp: S) -> Body { + Body::wrap_stream(Self::new(inp)) + } + +} + +impl Stream for BodyStream where S: Stream> + Unpin, I: Into + Sized { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let t = std::panic::catch_unwind(AssertUnwindSafe(|| { + self.inp.poll_next_unpin(cx) + })); + let r = match t { + Ok(k) => k, + Err(e) => { + error!("panic {:?}", e); + panic!() + } + }; + match r { + Ready(Some(Ok(k))) => Ready(Some(Ok(k))), + Ready(Some(Err(e))) => { + error!("body stream error: {:?}", e); + Ready(Some(Err(e.into()))) + } + Ready(None) => Ready(None), + Pending => Pending, + } + } + +} + + async fn binned(req: Request, node_config: Arc) -> Result, Error> { + info!("-------------------------------------------------------- BINNED"); let (head, body) = req.into_parts(); //let params = netpod::query_params(head.uri.query()); @@ -176,7 +236,7 @@ async fn binned(req: Request, node_config: Arc) -> Result { response(StatusCode::OK) - .body(Body::wrap_stream(s))? + .body(BodyStream::wrapped(s))? } Err(e) => { error!("{:?}", e); @@ -188,12 +248,13 @@ async fn binned(req: Request, node_config: Arc) -> Result, node_config: Arc) -> Result, Error> { + info!("-------------------------------------------------------- PRE-BINNED"); let (head, body) = req.into_parts(); let q = PreBinnedQuery::from_request(&head)?; let ret = match disk::cache::pre_binned_bytes_for_http(node_config, &q) { Ok(s) => { response(StatusCode::OK) - .body(Body::wrap_stream(s))? + .body(BodyStream::wrapped(s))? } Err(e) => { error!("{:?}", e); diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 34b612c..8602914 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -93,9 +93,10 @@ impl ScalarType { #[derive(Debug)] pub struct Node { + pub id: u32, pub host: String, pub port: u16, - pub split: u8, + pub split: u32, pub data_base_path: PathBuf, pub ksprefix: String, } @@ -122,7 +123,6 @@ pub struct NodeConfig { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Channel { - pub keyspace: u8, pub backend: String, pub name: String, } @@ -166,14 +166,10 @@ impl NanoRange { } -#[test] -fn serde_channel() { - let _ex = "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}"; -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ChannelConfig { pub channel: Channel, + pub keyspace: u8, pub time_bin_size: u64, pub scalar_type: ScalarType, pub shape: Shape, diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs index aca1083..a3de740 100644 --- a/retrieval/src/bin/retrieval.rs +++ b/retrieval/src/bin/retrieval.rs @@ -34,6 +34,7 @@ fn simple_fetch() { taskrun::run(async { let t1 = chrono::Utc::now(); let node = Node { + id: 0, host: "localhost".into(), port: 8360, data_base_path: todo!(), @@ -45,9 +46,9 @@ fn simple_fetch() { channel_config: ChannelConfig { channel: Channel { backend: "sf-databuffer".into(), - keyspace: 3, name: "S10BC01-DBAM070:BAM_CH1_NORM".into(), }, + keyspace: 3, time_bin_size: DAY, scalar_type: ScalarType::F64, shape: Shape::Wave(todo!()), diff --git a/retrieval/src/lib.rs b/retrieval/src/lib.rs index ccc90e6..90e21d3 100644 --- a/retrieval/src/lib.rs +++ b/retrieval/src/lib.rs @@ -54,11 +54,12 @@ async fn get_cached_0_inner() -> Result<(), Error> { fn test_cluster() -> Cluster { - let nodes = (0..1).into_iter().map(|k| { + let nodes = (0..1).into_iter().map(|id| { let node = Node { + id, host: "localhost".into(), - port: 8360 + k, - data_base_path: format!("../tmpdata/node{:02}", k).into(), + port: 8360 + id as u16, + data_base_path: format!("../tmpdata/node{:02}", id).into(), ksprefix: "ks".into(), split: 0, }; diff --git a/taskrun/Cargo.toml b/taskrun/Cargo.toml index f5e8a87..79b2b02 100644 --- a/taskrun/Cargo.toml +++ b/taskrun/Cargo.toml @@ -8,4 +8,5 @@ edition = "2018" tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } tracing = "0.1.25" tracing-subscriber = "0.2.17" +backtrace = "0.3.56" err = { path = "../err" } diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 1eeb67d..bc8e889 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -1,4 +1,7 @@ +#[allow(unused_imports)] +use tracing::{error, warn, info, debug, trace}; use err::Error; +use std::panic; pub fn run>>(f: F) -> Result { tracing_init(); @@ -6,6 +9,13 @@ pub fn run>>(f: F) -> Result