diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index 7bae80c..d881c80 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -5,15 +5,18 @@ authors = ["Dominik Werder "]
 edition = "2018"
 
 [dependencies]
-tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
-tracing = "0.1.25"
+http = "0.2"
+serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+chrono = { version = "0.4.19", features = ["serde"] }
+tokio = { version = "1.4.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
 async-channel = "1.6"
 bytes = "1.0.1"
 byteorder = "1.4.3"
 futures-core = "0.3.14"
 futures-util = "0.3.14"
 async-stream = "0.3.0"
+tracing = "0.1.25"
 hex = "0.4.3"
 err = { path = "../err" }
 taskrun = { path = "../taskrun" }
diff --git a/disk/src/agg.rs b/disk/src/agg.rs
index 27e06bd..20c608d 100644
--- a/disk/src/agg.rs
+++ b/disk/src/agg.rs
@@ -8,6 +8,7 @@ use futures_core::Stream;
 use futures_util::{pin_mut, StreamExt, future::ready};
 use netpod::{Channel, ChannelConfig, ScalarType, Shape, Node, timeunits::*};
 use crate::merge::MergeDim1F32Stream;
+use netpod::BinSpecDimT;
 
 pub trait AggregatorTdim {
     type InputValue;
@@ -658,7 +659,7 @@ where I: AggregatableTdim + Unpin, T: Stream<Item = Result<I, Error>> + Unpin, I::
         if self.aggtor.is_none() {
             let range = self.spec.get_range(self.curbin);
             //info!("range: {} {}", range.ts1, range.ts2);
-            self.aggtor = Some(k.aggregator_new(range.ts1, range.ts2));
+            self.aggtor = Some(k.aggregator_new(range.beg, range.end));
         }
         let ag = self.aggtor.as_mut().unwrap();
         if ag.ends_before(&k) {
@@ -706,69 +707,6 @@ where I: AggregatableTdim + Unpin, T: Stream<Item = Result<I, Error>> + Unpin, I::
     }
 }
 
-
-pub struct BinSpecDimT {
-    count: u64,
-    ts1: u64,
-    ts2: u64,
-    bs: u64,
-}
-
-impl BinSpecDimT {
-
-    pub fn over_range(count: u64, ts1: u64, ts2: u64) -> Self {
-        assert!(count >= 1);
-        assert!(count <= 2000);
-        assert!(ts2 > ts1);
-        let dt = ts2 - ts1;
-        assert!(dt <= DAY * 14);
-        let bs = dt / count;
-        let thresholds = [
-            2, 10, 100,
-            1000, 10_000, 100_000,
-            MU, MU * 10, MU * 100,
-            MS, MS * 10, MS * 100,
-            SEC, SEC * 5, SEC * 10, SEC * 20,
-            MIN, MIN * 5, MIN * 10, MIN * 20,
-            HOUR, HOUR * 2, HOUR * 4, HOUR * 12,
-            DAY, DAY * 2, DAY * 4, DAY * 8, DAY * 16,
-            WEEK, WEEK * 2, WEEK * 10, WEEK * 60,
-        ];
-        let mut i1 = 0;
-        let bs = loop {
-            if i1 >= thresholds.len() { break *thresholds.last().unwrap(); }
-            let t = thresholds[i1];
-            if bs < t { break t; }
-            i1 += 1;
-        };
-        //info!("INPUT TS {} {}", ts1, ts2);
-        //info!("chosen binsize: {} {}", i1, bs);
-        let ts1 = ts1 / bs * bs;
-        let ts2 = (ts2 + bs - 1) / bs * bs;
-        //info!("ADJUSTED TS {} {}", ts1, ts2);
-        BinSpecDimT {
-            count,
-            ts1,
-            ts2,
-            bs,
-        }
-    }
-
-    pub fn get_range(&self, ix: u32) -> TimeRange {
-        TimeRange {
-            ts1: self.ts1 + ix as u64 * self.bs,
-            ts2: self.ts1 + (ix as u64 + 1) * self.bs,
-        }
-    }
-
-}
-
-pub struct TimeRange {
-    ts1: u64,
-    ts2: u64,
-}
-
-
 pub fn make_test_node(ix: u8) -> Node {
     Node {
         host: "localhost".into(),
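The hunks above move `BinSpecDimT` into netpod (see the netpod/src/lib.rs diff below) and rename the bin edges from `ts1`/`ts2` to the `NanoRange` fields `beg`/`end` at the `aggregator_new` call site. A worked example of the bin-size selection, as a sketch assuming the `timeunits` constants and the relocated `BinSpecDimT` exactly as added below:

```rust
use netpod::{timeunits::*, BinSpecDimT};

fn main() {
    // Ask for 7 bins over roughly 10 s of data.
    let spec = BinSpecDimT::over_range(7, SEC * 2 + 300, SEC * 12);
    // dt / count is ~1.43 s, which rounds up to the SEC * 5 threshold.
    assert_eq!(spec.bs, SEC * 5);
    // Both edges are widened to whole-bin boundaries.
    assert_eq!((spec.ts1, spec.ts2), (0, SEC * 15));
    // get_range(1) yields the second bin as a NanoRange.
    let r = spec.get_range(1);
    assert_eq!((r.beg, r.end), (SEC * 5, SEC * 10));
}
```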
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
new file mode 100644
index 0000000..9f59250
--- /dev/null
+++ b/disk/src/cache.rs
@@ -0,0 +1,64 @@
+#[allow(unused_imports)]
+use tracing::{error, warn, info, debug, trace};
+use err::Error;
+use netpod::{Node, Cluster, AggKind, NanoRange, ToNanos};
+use futures_core::Stream;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use bytes::Bytes;
+use chrono::{DateTime, Utc};
+
+
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+pub struct Query {
+    range: NanoRange,
+    agg_kind: AggKind,
+}
+
+impl Query {
+
+    pub fn from_request(req: &http::request::Parts) -> Result<Self, Error> {
+        let params = netpod::query_params(req.uri.query());
+        let beg_date = params.get("beg_date").ok_or(Error::with_msg("missing beg_date"))?;
+        let end_date = params.get("end_date").ok_or(Error::with_msg("missing end_date"))?;
+        let ret = Query {
+            range: NanoRange {
+                beg: beg_date.parse::<DateTime<Utc>>()?.to_nanos(),
+                end: end_date.parse::<DateTime<Utc>>()?.to_nanos(),
+            },
+            agg_kind: AggKind::DimXBins1,
+        };
+        info!("Query::from_request {:?}", ret);
+        Ok(ret)
+    }
+
+}
+
+
+pub struct BinParams {
+    pub node: Node,
+    pub cluster: Cluster,
+}
+
+pub fn binned_bytes_for_http(params: BinParams) -> Result<BinnedBytesForHttpStream, Error> {
+
+    // TODO
+    // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches.
+
+    let ret = BinnedBytesForHttpStream {};
+    Ok(ret)
+}
+
+pub struct BinnedBytesForHttpStream {
+}
+
+impl Stream for BinnedBytesForHttpStream {
+    type Item = Result<Bytes, Error>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        // TODO
+        use Poll::*;
+        Ready(None)
+    }
+
+}
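`binned_bytes_for_http` is scaffolding for now: the stream it returns ends immediately, and the TODO is to walk the pre-binned patches and emit their bytes. For orientation, a sketch (not part of the diff) of the smallest stream with the same item type, showing the shape `poll_next` takes once there is data to yield; `Ready(None)` is what terminates the HTTP body:

```rust
use bytes::Bytes;
use err::Error;
use futures_core::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};

// A one-shot stream: yields its stored chunk once, then ends.
pub struct OneChunk(Option<Bytes>);

impl Stream for OneChunk {
    type Item = Result<Bytes, Error>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
        // take() leaves None behind, so the next poll reports end-of-stream.
        Poll::Ready(self.0.take().map(Ok))
    }
}
```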
diff --git a/disk/src/lib.rs b/disk/src/lib.rs
index c3888fa..eba7c8a 100644
--- a/disk/src/lib.rs
+++ b/disk/src/lib.rs
@@ -1,7 +1,3 @@
-pub mod agg;
-pub mod gen;
-pub mod merge;
-
 #[allow(unused_imports)]
 use tracing::{error, warn, info, debug, trace};
 use err::Error;
@@ -18,6 +14,11 @@ use std::path::PathBuf;
 use bitshuffle::bitshuffle_decompress;
 use netpod::{ScalarType, Shape, Node, ChannelConfig};
 
+pub mod agg;
+pub mod gen;
+pub mod merge;
+pub mod cache;
+
 pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: &Node) -> Result {
     let path = datapath(query.timebin as u64, &query.channel_config, node);
diff --git a/err/Cargo.toml b/err/Cargo.toml
index 80c9c6e..7a3dd2a 100644
--- a/err/Cargo.toml
+++ b/err/Cargo.toml
@@ -9,3 +9,4 @@ hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "t
 http = "0.2"
 serde_json = "1.0"
 async-channel = "1.6"
+chrono = { version = "0.4.19", features = ["serde"] }
diff --git a/err/src/lib.rs b/err/src/lib.rs
index d4056f9..9f41bdc 100644
--- a/err/src/lib.rs
+++ b/err/src/lib.rs
@@ -67,3 +67,11 @@ impl From for Error {
         }
     }
 }
+
+impl From<chrono::format::ParseError> for Error {
+    fn from(k: chrono::format::ParseError) -> Self {
+        Self {
+            msg: k.to_string(),
+        }
+    }
+}
diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml
index 0dfc504..c35284b 100644
--- a/httpret/Cargo.toml
+++ b/httpret/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2018"
 [dependencies]
 hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp", "stream"] }
 http = "0.2"
+url = "2.2"
 bytes = "1.0.1"
 futures-core = "0.3.12"
 tracing = "0.1.25"
diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs
index 7f8246d..a00117b 100644
--- a/httpret/src/lib.rs
+++ b/httpret/src/lib.rs
@@ -8,22 +8,34 @@ use hyper::server::Server;
 use hyper::service::{make_service_fn, service_fn};
 use std::task::{Context, Poll};
 use std::pin::Pin;
-//use std::pin::Pin;
-//use std::future::Future;
-//use serde_derive::{Serialize, Deserialize};
-//use serde_json::{Value as SerdeValue, Value as JsonValue};
+use netpod::{Node, Cluster, AggKind};
+use disk::cache::BinParams;
 
-pub async fn host(port: u16) -> Result<(), Error> {
-    let addr = SocketAddr::from(([0, 0, 0, 0], port));
-    let make_service = make_service_fn(|_conn| async {
-        Ok::<_, Error>(service_fn(data_api_proxy))
+pub async fn host(node: Node, cluster: Cluster) -> Result<(), Error> {
+    let addr = SocketAddr::from(([0, 0, 0, 0], node.port));
+    let make_service = make_service_fn({
+        move |_conn| {
+            let node = node.clone();
+            let cluster = cluster.clone();
+            async move {
+                Ok::<_, Error>(service_fn({
+                    move |req| {
+                        let hc = HostConf {
+                            node: node.clone(),
+                            cluster: cluster.clone(),
+                        };
+                        data_api_proxy(req, hc)
+                    }
+                }))
+            }
+        }
     });
     Server::bind(&addr).serve(make_service).await?;
     Ok(())
 }
 
-async fn data_api_proxy(req: Request<Body>) -> Result<Response<Body>, Error> {
-    match data_api_proxy_try(req).await {
+async fn data_api_proxy(req: Request<Body>, hconf: HostConf) -> Result<Response<Body>, Error> {
+    match data_api_proxy_try(req, hconf).await {
         Ok(k) => { Ok(k) }
         Err(e) => {
             error!("{:?}", e);
@@ -32,7 +44,7 @@
     }
 }
 
-async fn data_api_proxy_try(req: Request<Body>) -> Result<Response<Body>, Error> {
+async fn data_api_proxy_try(req: Request<Body>, hconf: HostConf) -> Result<Response<Body>, Error> {
     let uri = req.uri().clone();
     let path = uri.path();
     if path == "/api/1/parsed_raw" {
@@ -43,6 +55,14 @@
             Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
         }
     }
+    else if path == "/api/1/binned" {
+        if req.method() == Method::GET {
+            Ok(binned(req, hconf).await?)
+        }
+        else {
+            Ok(response(StatusCode::METHOD_NOT_ALLOWED).body(Body::empty())?)
+        }
+    }
     else {
         Ok(response(StatusCode::NOT_FOUND).body(Body::empty())?)
     }
@@ -58,6 +78,7 @@ fn response<T>(status: T) -> http::response::Builder
         .header("access-control-allow-headers", "*")
 }
 
+
 async fn parsed_raw(req: Request<Body>) -> Result<Response<Body>, Error> {
     let node = todo!("get node from config");
     use netpod::AggQuerySingleChannel;
@@ -108,3 +129,40 @@ impl hyper::body::HttpBody for BodyStreamWrap {
     }
 }
+
+
+async fn binned(req: Request<Body>, hconf: HostConf) -> Result<Response<Body>, Error> {
+    let (head, _body) = req.into_parts();
+    let _params = netpod::query_params(head.uri.query());
+
+    // TODO
+    // Channel, time range, bin size.
+    // Try to locate that file in cache, otherwise create it on the fly:
+    // Look up and parse channel config.
+    // Extract the relevant channel config entry.
+
+    disk::cache::Query::from_request(&head)?;
+    let params = BinParams {
+        node: hconf.node.clone(),
+        cluster: hconf.cluster.clone(),
+    };
+    let ret = match disk::cache::binned_bytes_for_http(params) {
+        Ok(s) => {
+            response(StatusCode::OK)
+                .body(Body::wrap_stream(s))?
+        }
+        Err(e) => {
+            error!("{:?}", e);
+            response(StatusCode::INTERNAL_SERVER_ERROR).body(Body::empty())?
+        }
+    };
+    Ok(ret)
+}
+
+
+
+#[derive(Clone)]
+pub struct HostConf {
+    node: Node,
+    cluster: Cluster,
+}
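In the rewritten `host`, `node` and `cluster` are cloned at two layers because hyper 0.14 builds one service per connection and then invokes the handler per request, and both closures must own `'static` data. The same pattern stripped to its essentials, as a sketch with a plain `String` standing in for `HostConf`:

```rust
use std::convert::Infallible;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};

async fn serve(state: String) -> Result<(), hyper::Error> {
    let make_service = make_service_fn(move |_conn| {
        // Runs once per accepted connection.
        let state = state.clone();
        async move {
            Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
                // Runs once per request on that connection.
                let state = state.clone();
                async move { Ok::<_, Infallible>(Response::new(Body::from(state))) }
            }))
        }
    });
    Server::bind(&([0, 0, 0, 0], 8360).into()).serve(make_service).await
}
```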
diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml
index cc4b130..3418394 100644
--- a/netpod/Cargo.toml
+++ b/netpod/Cargo.toml
@@ -6,8 +6,8 @@ edition = "2018"
 
 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
-#serde_derive = "1.0"
 async-channel = "1.6"
 bytes = "1.0.1"
+chrono = { version = "0.4.19", features = ["serde"] }
 futures-core = "0.3.12"
 err = { path = "../err" }
diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs
index ecdd28d..7902646 100644
--- a/netpod/src/lib.rs
+++ b/netpod/src/lib.rs
@@ -1,7 +1,7 @@
 use serde::{Serialize, Deserialize};
 use err::Error;
 use std::path::PathBuf;
-
+use chrono::{DateTime, Utc, TimeZone};
 
 
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -103,6 +103,13 @@ impl Node {
     }
 }
 
+
+#[derive(Clone)]
+pub struct Cluster {
+    pub nodes: Vec<Node>,
+}
+
+
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct Channel {
     pub keyspace: u8,
@@ -116,6 +123,31 @@
     }
 }
 
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum TimeRange {
+    Time {
+        beg: DateTime<Utc>,
+        end: DateTime<Utc>,
+    },
+    Pulse {
+        beg: u64,
+        end: u64,
+    },
+    Nano {
+        beg: u64,
+        end: u64,
+    },
+}
+
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct NanoRange {
+    pub beg: u64,
+    pub end: u64,
+}
+
+
 #[test]
 fn serde_channel() {
     let _ex = "{\"name\":\"thechannel\",\"backend\":\"thebackend\"}";
@@ -146,3 +178,121 @@
     pub const DAY: u64 = HOUR * 24;
     pub const WEEK: u64 = DAY * 7;
 }
+
+
+
+pub struct BinSpecDimT {
+    pub count: u64,
+    pub ts1: u64,
+    pub ts2: u64,
+    pub bs: u64,
+}
+
+impl BinSpecDimT {
+
+    pub fn over_range(count: u64, ts1: u64, ts2: u64) -> Self {
+        use timeunits::*;
+        assert!(count >= 1);
+        assert!(count <= 2000);
+        assert!(ts2 > ts1);
+        let dt = ts2 - ts1;
+        assert!(dt <= DAY * 14);
+        let bs = dt / count;
+        let thresholds = [
+            2, 10, 100,
+            1000, 10_000, 100_000,
+            MU, MU * 10, MU * 100,
+            MS, MS * 10, MS * 100,
+            SEC, SEC * 5, SEC * 10, SEC * 20,
+            MIN, MIN * 5, MIN * 10, MIN * 20,
+            HOUR, HOUR * 2, HOUR * 4, HOUR * 12,
+            DAY, DAY * 2, DAY * 4, DAY * 8, DAY * 16,
+            WEEK, WEEK * 2, WEEK * 10, WEEK * 60,
+        ];
+        let mut i1 = 0;
+        let bs = loop {
+            if i1 >= thresholds.len() { break *thresholds.last().unwrap(); }
+            let t = thresholds[i1];
+            if bs < t { break t; }
+            i1 += 1;
+        };
+        //info!("INPUT TS {} {}", ts1, ts2);
+        //info!("chosen binsize: {} {}", i1, bs);
+        let ts1 = ts1 / bs * bs;
+        let ts2 = (ts2 + bs - 1) / bs * bs;
+        //info!("ADJUSTED TS {} {}", ts1, ts2);
+        BinSpecDimT {
+            count,
+            ts1,
+            ts2,
+            bs,
+        }
+    }
+
+    pub fn get_range(&self, ix: u32) -> NanoRange {
+        NanoRange {
+            beg: self.ts1 + ix as u64 * self.bs,
+            end: self.ts1 + (ix as u64 + 1) * self.bs,
+        }
+    }
+
+}
+
+
+pub struct PreBinnedPatchIterator {
+    agg_kind: AggKind,
+}
+
+impl PreBinnedPatchIterator {
+
+    pub fn iter_blocks_for_request(range: NanoRange) -> Self {
+        todo!()
+    }
+
+}
+
+impl Iterator for PreBinnedPatchIterator {
+    type Item = ();
+
+    fn next(&mut self) -> Option<Self::Item> {
+        todo!()
+    }
+
+}
+
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum AggKind {
+    DimXBins1,
+}
+
+
+pub fn query_params(q: Option<&str>) -> std::collections::BTreeMap<String, String> {
+    let mut map = std::collections::BTreeMap::new();
+    match q {
+        Some(k) => {
+            for par in k.split("&") {
+                let mut u = par.split("=");
+                if let Some(t1) = u.next() {
+                    if let Some(t2) = u.next() {
+                        map.insert(t1.into(), t2.into());
+                    }
+                }
+            }
+        }
+        None => {
+        }
+    }
+    map
+}
+
+
+pub trait ToNanos {
+    fn to_nanos(&self) -> u64;
+}
+
+impl<Tz: TimeZone> ToNanos for DateTime<Tz> {
+    fn to_nanos(&self) -> u64 {
+        self.timestamp() as u64 * timeunits::SEC + self.timestamp_subsec_nanos() as u64
+    }
+}
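Taken together, `query_params` and `ToNanos` are the pieces `disk::cache::Query::from_request` builds on. A small end-to-end check, assuming the netpod API exactly as added in this hunk:

```rust
// Sketch: query string -> DateTime -> NanoRange, mirroring Query::from_request.
use chrono::{DateTime, Utc};
use netpod::{query_params, NanoRange, ToNanos};

fn main() {
    let q = Some("beg_date=1970-01-01T00:00:01Z&end_date=1970-01-01T00:00:04Z");
    let params = query_params(q);
    let range = NanoRange {
        beg: params["beg_date"].parse::<DateTime<Utc>>().unwrap().to_nanos(),
        end: params["end_date"].parse::<DateTime<Utc>>().unwrap().to_nanos(),
    };
    assert_eq!(range.beg, 1_000_000_000);
    assert_eq!(range.end, 4_000_000_000);
}
```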
diff --git a/retrieval/src/bin/retrieval.rs b/retrieval/src/bin/retrieval.rs
index 072feba..3017102 100644
--- a/retrieval/src/bin/retrieval.rs
+++ b/retrieval/src/bin/retrieval.rs
@@ -1,7 +1,7 @@
 #[allow(unused_imports)]
 use tracing::{error, warn, info, debug, trace};
 use err::Error;
-use netpod::{ChannelConfig, Channel, timeunits::*, ScalarType, Shape, Node};
+use netpod::{ChannelConfig, Channel, timeunits::*, ScalarType, Shape, Node, Cluster};
 
 pub fn main() {
     match taskrun::run(go()) {
@@ -34,7 +34,7 @@ fn simple_fetch() {
     let t1 = chrono::Utc::now();
     let node = Node {
         host: "localhost".into(),
-        port: 8888,
+        port: 8360,
         data_base_path: todo!(),
         ksprefix: "daq_swissfel".into(),
         split: 0,
@@ -56,12 +56,15 @@
         tb_file_count: 1,
         buffer_size: 1024 * 8,
     };
+    let cluster = Cluster {
+        nodes: vec![node],
+    };
     let query_string = serde_json::to_string(&query).unwrap();
-    let _host = tokio::spawn(httpret::host(8360));
+    let _host = tokio::spawn(httpret::host(cluster.nodes[0].clone(), cluster.clone()));
     let req = hyper::Request::builder()
         .method(http::Method::POST)
         .uri("http://localhost:8360/api/1/parsed_raw")
-        .body(query_string.into()).unwrap();
+        .body(query_string.into())?;
     let client = hyper::Client::new();
     let res = client.request(req).await?;
     info!("client response {:?}", res);
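The retrieval harness now derives the port from the node and passes the whole cluster to `httpret::host`. The `get_cached_0` test in retrieval/src/lib.rs below drains the response body chunk by chunk via `HttpBody::data`; the same loop as a small generic helper (a sketch against hyper 0.14, `drain_len` is not part of the diff):

```rust
use bytes::Buf;
use hyper::body::HttpBody;

// Pull chunks until the body is exhausted; return the total byte count.
async fn drain_len<B>(mut body: B) -> Result<u64, B::Error>
where
    B: HttpBody + Unpin,
{
    let mut ntot = 0u64;
    while let Some(chunk) = body.data().await {
        ntot += chunk?.remaining() as u64;
    }
    Ok(ntot)
}
```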
len {}", k.len()); + ntot += k.len() as u64; + } + Some(Err(e)) => { + error!("{:?}", e); + } + None => { + info!("response stream exhausted"); + break; + } + } + } + let t2 = chrono::Utc::now(); + let ms = t2.signed_duration_since(t1).num_milliseconds() as u64; + let throughput = ntot / 1024 * 1000 / ms; + info!("get_cached_0 DONE total download {} MB throughput {:5} kB/s", ntot / 1024 / 1024, throughput); + //Err::<(), _>(format!("test error").into()) + Ok(()) +} + + +fn test_cluster() -> Cluster { + let nodes = (0..1).into_iter().map(|k| { + Node { + host: "localhost".into(), + port: 8360 + k, + data_base_path: format!("../tmpdata/node{:02}", k).into(), + ksprefix: "ks".into(), + split: 0, + } + }) + .collect(); + Cluster { + nodes: nodes, + } +} + +fn spawn_test_hosts(cluster: &Cluster) -> Vec>> { + let mut ret = vec![]; + for node in &cluster.nodes { + let h = tokio::spawn(httpret::host(node.clone(), cluster.clone())); + ret.push(h); + } + ret +}