diff --git a/disk/src/cache.rs b/disk/src/cache.rs index 9715280..a276b26 100644 --- a/disk/src/cache.rs +++ b/disk/src/cache.rs @@ -51,11 +51,41 @@ impl Query { } } -pub fn binned_bytes_for_http(node_config: Arc, query: &Query) -> Result { +pub async fn binned_bytes_for_http( + node_config: Arc, + query: &Query, +) -> Result { let agg_kind = AggKind::DimXBins1; - // TODO - // Translate the Query TimeRange + AggKind into an iterator over the pre-binned patches. + let channel_config = super::channelconfig::read_local_config(&query.channel, node_config.clone()).await?; + let entry; + { + let mut ixs = vec![]; + for i1 in 0..channel_config.entries.len() { + let e1 = &channel_config.entries[i1]; + if i1 + 1 < channel_config.entries.len() { + let e2 = &channel_config.entries[i1 + 1]; + if e1.ts < query.range.end && e2.ts >= query.range.beg { + ixs.push(i1); + } else { + } + } else { + if e1.ts < query.range.end { + ixs.push(i1); + } else { + } + } + } + if ixs.len() == 0 { + return Err(Error::with_msg(format!("no config entries found"))); + } else if ixs.len() > 1 { + return Err(Error::with_msg(format!("too many config entries found: {}", ixs.len()))); + } + entry = &channel_config.entries[ixs[0]]; + } + + info!("found config entry {:?}", entry); + let grid = PreBinnedPatchRange::covering_range(query.range.clone(), query.count); match grid { Some(spec) => { diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index e34e8a5..bf3bfc0 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,4 +1,5 @@ use err::Error; +use netpod::{Channel, NodeConfig}; use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; use nom::Needed; #[allow(unused_imports)] @@ -10,6 +11,7 @@ use nom::{ use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; use serde::{Deserialize, Serialize}; +use std::sync::Arc; #[allow(unused_imports)] use tracing::{debug, error, info, trace, warn}; @@ -60,7 +62,7 @@ impl CompressionMethod { #[derive(Debug, Serialize, Deserialize)] pub struct ConfigEntry { - pub ts: i64, + pub ts: u64, pub pulse: i64, pub ks: i32, pub bs: i64, @@ -198,7 +200,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { Ok(( inp_e, Some(ConfigEntry { - ts, + ts: ts as u64, pulse, ks, bs, @@ -262,31 +264,49 @@ pub fn parse_config(inp: &[u8]) -> NRes { Ok((inp, ret)) } +pub async fn read_local_config(channel: &Channel, node_config: Arc) -> Result { + let path = node_config + .node + .data_base_path + .join("config") + .join(&channel.name) + .join("latest") + .join("00000_Config"); + let buf = tokio::fs::read(&path).await?; + info!("try to parse config {} bytes", buf.len()); + let config = parse_config(&buf)?; + Ok(config.1) +} + #[cfg(test)] -fn read_data() -> Vec { - use std::io::Read; - let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config"; - let mut f1 = std::fs::File::open(path).unwrap(); - let mut buf = vec![]; - f1.read_to_end(&mut buf).unwrap(); - buf -} +mod test { + use super::parse_config; -#[test] -fn parse_dummy() { - let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); - assert_eq!(0, config.1.format_version); - assert_eq!("abc", config.1.channel_name); -} + fn read_data() -> Vec { + use std::io::Read; + let path = "ks/config/S10CB01-RLOD100-PUP10:SIG-AMPLT/latest/00000_Config"; + let mut f1 = std::fs::File::open(path).unwrap(); + let mut buf = vec![]; + f1.read_to_end(&mut buf).unwrap(); + buf + } -#[test] -fn open_file() { - let config = parse_config(&read_data()).unwrap().1; - assert_eq!(0, config.format_version); - assert_eq!(9, config.entries.len()); - for e in &config.entries { - assert!(e.ts >= 631152000000000000); - assert!(e.ts <= 1591106812800073974); - assert!(e.shape.is_some()); + #[test] + fn parse_dummy() { + let config = parse_config(&[0, 0, 0, 0, 0, 11, 0x61, 0x62, 0x63, 0, 0, 0, 11, 0, 0, 0, 1]).unwrap(); + assert_eq!(0, config.1.format_version); + assert_eq!("abc", config.1.channel_name); + } + + #[test] + fn open_file() { + let config = parse_config(&read_data()).unwrap().1; + assert_eq!(0, config.format_version); + assert_eq!(9, config.entries.len()); + for e in &config.entries { + assert!(e.ts >= 631152000000000000); + assert!(e.ts <= 1591106812800073974); + assert!(e.shape.is_some()); + } } } diff --git a/disk/src/gen.rs b/disk/src/gen.rs index d0e66a6..a66c6f3 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -42,7 +42,7 @@ pub async fn gen_test_data() -> Result<(), Error> { big_endian: true, compression: true, }, - time_spacing: MS * 10, + time_spacing: MS * 2000, }; ensemble.channels.push(chn); } @@ -150,8 +150,8 @@ async fn gen_config( { // this len does not include itself and there seems to be no copy of it afterwards. - buf.put_i32(0x20202020); let p3 = buf.len(); + buf.put_i32(404040); buf.put_u8(config.dtflags()); buf.put_u8(config.scalar_type.index()); if config.compression { @@ -161,13 +161,25 @@ async fn gen_config( match config.shape { Shape::Scalar => {} Shape::Wave(k) => { + buf.put_i8(1); buf.put_i32(k as i32); } } - let len = buf.len() - p3; + let len = buf.len() - p3 - 4; buf.as_mut()[p3..].as_mut().put_i32(len as i32); } + // source name + buf.put_i32(-1); + // unit + buf.put_i32(-1); + // description + buf.put_i32(-1); + // optional fields + buf.put_i32(-1); + // value converter + buf.put_i32(-1); + let p2 = buf.len(); let len = p2 - p1 + 4; buf.put_i32(len as i32); diff --git a/err/Cargo.toml b/err/Cargo.toml index f274563..01e86de 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" hyper = { version = "0.14", features = ["http1", "http2", "client", "server", "tcp"] } http = "0.2" tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] } +backtrace = "0.3.56" serde_json = "1.0" async-channel = "1.6" chrono = { version = "0.4.19", features = ["serde"] } diff --git a/err/src/lib.rs b/err/src/lib.rs index 8c78d3e..e76b291 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -4,32 +4,41 @@ use std::num::ParseIntError; use std::string::FromUtf8Error; use tokio::task::JoinError; -#[derive(Debug)] pub struct Error { msg: String, + trace: backtrace::Backtrace, } impl Error { pub fn with_msg>(s: S) -> Self { - Self { msg: s.into() } + Self { + msg: s.into(), + trace: backtrace::Backtrace::new(), + } } } -impl From for Error { - fn from(k: String) -> Self { - Self { msg: k } - } -} - -impl From<&str> for Error { - fn from(k: &str) -> Self { - Self { msg: k.into() } +impl std::fmt::Debug for Error { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace) } } impl std::fmt::Display for Error { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(fmt, "Error") + write!(fmt, "Error {} backtrace:\n{:?}", self.msg, self.trace) + } +} + +impl From for Error { + fn from(k: String) -> Self { + Self::with_msg(k) + } +} + +impl From<&str> for Error { + fn from(k: &str) -> Self { + Self::with_msg(k) } } @@ -37,49 +46,49 @@ impl std::error::Error for Error {} impl From for Error { fn from(k: std::io::Error) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } impl From for Error { fn from(k: http::Error) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } impl From for Error { fn from(k: hyper::Error) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } impl From for Error { fn from(k: serde_json::Error) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } impl From for Error { fn from(k: async_channel::RecvError) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } impl From for Error { fn from(k: chrono::format::ParseError) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } impl From for Error { fn from(k: ParseIntError) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } impl From for Error { fn from(k: FromUtf8Error) -> Self { - Self { msg: k.to_string() } + Self::with_msg(k.to_string()) } } @@ -106,8 +115,5 @@ impl From for Error { } pub fn todoval() -> T { - if true { - todo!("TODO todoval"); - } - todo!() + todo!("TODO todoval") } diff --git a/httpret/src/lib.rs b/httpret/src/lib.rs index 5184625..1a9167f 100644 --- a/httpret/src/lib.rs +++ b/httpret/src/lib.rs @@ -216,16 +216,8 @@ where async fn binned(req: Request, node_config: Arc) -> Result, Error> { info!("-------------------------------------------------------- BINNED"); let (head, _body) = req.into_parts(); - //let params = netpod::query_params(head.uri.query()); - - // TODO - // Channel, time range, bin size. - // Try to locate that file in cache, otherwise create it on the fly: - // Look up and parse channel config. - // Extract the relevant channel config entry. - let query = disk::cache::Query::from_request(&head)?; - let ret = match disk::cache::binned_bytes_for_http(node_config, &query) { + let ret = match disk::cache::binned_bytes_for_http(node_config, &query).await { Ok(s) => response(StatusCode::OK).body(BodyStream::wrapped(s))?, Err(e) => { error!("{:?}", e); diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs index 37ed6ff..52aa172 100644 --- a/retrieval/src/test.rs +++ b/retrieval/src/test.rs @@ -1,4 +1,5 @@ use crate::spawn_test_hosts; +use chrono::Utc; use err::Error; use hyper::Body; use netpod::{Cluster, Node}; @@ -35,13 +36,24 @@ async fn get_cached_0_inner() -> Result<(), Error> { let cluster = Arc::new(test_cluster()); let node0 = &cluster.nodes[0]; let hosts = spawn_test_hosts(cluster.clone()); + let beg_date: chrono::DateTime = "1970-01-01T00:00:10.000Z".parse()?; + let end_date: chrono::DateTime = "1970-01-01T00:00:51.000Z".parse()?; + let channel = "wave1"; + let date_fmt = "%Y-%m-%dT%H:%M:%S%.3fZ"; + let uri = format!( + "http://{}:{}/api/1/binned?channel_backend=testbackend&channel_name={}&bin_count=4&beg_date={}&end_date={}", + node0.host, + node0.port, + channel, + beg_date.format(date_fmt), + end_date.format(date_fmt), + ); + info!("URI {:?}", uri); let req = hyper::Request::builder() - .method(http::Method::GET) - .uri(format!( - "http://{}:{}/api/1/binned?channel_backend=testbackend&channel_keyspace=3&channel_name=wave1&bin_count=4&beg_date=1970-01-01T00:00:10.000Z&end_date=1970-01-01T00:00:51.000Z", - node0.host, node0.port - )) - .body(Body::empty())?; + .method(http::Method::GET) + .uri(uri) + .body(Body::empty())?; + info!("Request for {:?}", req); let client = hyper::Client::new(); let res = client.request(req).await?; info!("client response {:?}", res); diff --git a/taskrun/src/lib.rs b/taskrun/src/lib.rs index 232c1ac..89e9b3f 100644 --- a/taskrun/src/lib.rs +++ b/taskrun/src/lib.rs @@ -7,7 +7,7 @@ use tracing::{debug, error, info, trace, warn}; pub fn run>>(f: F) -> Result { tracing_init(); - tokio::runtime::Builder::new_multi_thread() + let res = tokio::runtime::Builder::new_multi_thread() .worker_threads(12) .max_blocking_threads(256) .enable_all() @@ -25,7 +25,14 @@ pub fn run>>(f: F) -> Result }) .build() .unwrap() - .block_on(async { f.await }) + .block_on(async { f.await }); + match res { + Ok(k) => Ok(k), + Err(e) => { + error!("{:?}", e); + Err(e) + } + } } pub fn tracing_init() {