diff --git a/disk/Cargo.toml b/disk/Cargo.toml
index ceea577..bf97a01 100644
--- a/disk/Cargo.toml
+++ b/disk/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2018"
 [dependencies]
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
+serde_cbor = "0.11.1"
 http = "0.2"
 chrono = { version = "0.4.19", features = ["serde"] }
 tokio = { version = "1.5.0", features = ["rt-multi-thread", "io-util", "net", "time", "sync", "fs"] }
diff --git a/disk/src/cache.rs b/disk/src/cache.rs
index 1813e87..0d5883c 100644
--- a/disk/src/cache.rs
+++ b/disk/src/cache.rs
@@ -1,4 +1,5 @@
 use crate::agg::eventbatch::MinMaxAvgScalarEventBatchStreamItem;
+use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatch;
 use crate::binnedstream::BinnedStream;
 use crate::cache::pbv::PreBinnedValueByteStream;
 use crate::channelconfig::{extract_matching_config_entry, read_local_config};
@@ -15,12 +16,15 @@ use netpod::{
     AggKind, BinnedRange, Channel, Cluster, NanoRange, NodeConfigCached, PreBinnedPatchCoord,
     PreBinnedPatchIterator, PreBinnedPatchRange, ToNanos,
 };
+use serde::{Deserialize, Serialize};
 use std::collections::BTreeMap;
 use std::future::Future;
+use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 use tiny_keccak::Hasher;
-use tokio::io::{AsyncRead, ReadBuf};
+use tokio::fs::OpenOptions;
+use tokio::io::{AsyncRead, AsyncWriteExt, ReadBuf};
 #[allow(unused_imports)]
 use tracing::{debug, error, info, trace, warn};

@@ -244,8 +248,16 @@ pub fn pre_binned_bytes_for_http(
     query: &PreBinnedQuery,
 ) -> Result<PreBinnedValueByteStream, Error> {
     info!("pre_binned_bytes_for_http {:?} {:?}", query, node_config.node);
-    let ret = super::cache::pbv::pre_binned_value_byte_stream_new(query, node_config);
-    Ok(ret)
+    let patch_node_ix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
+    if node_config.ix as u32 != patch_node_ix {
+        Err(Error::with_msg(format!(
+            "pre_binned_bytes_for_http node mismatch node_config.ix {} patch_node_ix {}",
+            node_config.ix, patch_node_ix
+        )))
+    } else {
+        let ret = super::cache::pbv::pre_binned_value_byte_stream_new(query, node_config);
+        Ok(ret)
+    }
 }

 pub struct HttpBodyAsAsyncRead {
@@ -432,3 +444,80 @@ pub fn node_ix_for_patch(patch_coord: &PreBinnedPatchCoord, channel: &Channel, cluster: &Cluster) -> u32 {
     let ix = u32::from_le_bytes(a) % cluster.nodes.len() as u32;
     ix
 }
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct CacheFileDesc {
+    // What identifies a cached file?
+    channel: Channel,
+    agg_kind: AggKind,
+    patch: PreBinnedPatchCoord,
+}
+
+impl CacheFileDesc {
+    pub fn hash(&self) -> String {
+        let mut h = tiny_keccak::Sha3::v256();
+        h.update(b"V000");
+        h.update(self.channel.backend.as_bytes());
+        h.update(self.channel.name.as_bytes());
+        h.update(format!("{:?}", self.agg_kind).as_bytes());
+        h.update(&self.patch.spec().bin_t_len().to_le_bytes());
+        h.update(&self.patch.spec().patch_t_len().to_le_bytes());
+        h.update(&self.patch.ix().to_le_bytes());
+        let mut buf = [0; 32];
+        h.finalize(&mut buf);
+        hex::encode(&buf)
+    }
+
+    pub fn hash_channel(&self) -> String {
+        let mut h = tiny_keccak::Sha3::v256();
+        h.update(b"V000");
+        h.update(self.channel.backend.as_bytes());
+        h.update(self.channel.name.as_bytes());
+        let mut buf = [0; 32];
+        h.finalize(&mut buf);
+        hex::encode(&buf)
+    }
+
+    pub fn path(&self, node_config: &NodeConfigCached) -> PathBuf {
+        let hash = self.hash();
+        let hc = self.hash_channel();
+        node_config
+            .node
+            .data_base_path
+            .join("cache")
+            .join(&hc[0..3])
+            .join(&hc[3..6])
+            .join(&self.channel.name)
+            .join(format!("{:?}", self.agg_kind))
+            .join(format!("{:019}", self.patch.spec().bin_t_len()))
+            .join(&hash[0..2])
+            .join(format!("{:019}", self.patch.ix()))
+    }
+}
+
+pub async fn write_pb_cache_min_max_avg_scalar(
+    values: MinMaxAvgScalarBinBatch,
+    patch: PreBinnedPatchCoord,
+    agg_kind: AggKind,
+    channel: Channel,
+    node_config: NodeConfigCached,
+) -> Result<(), Error> {
+    let cfd = CacheFileDesc {
+        channel: channel.clone(),
+        patch: patch.clone(),
+        agg_kind: agg_kind.clone(),
+    };
+    let path = cfd.path(&node_config);
+    info!("Writing cache file\n{:?}\npath: {:?}", cfd, path);
+    let enc = serde_cbor::to_vec(&values)?;
+    info!("Encoded size: {}", enc.len());
+    tokio::fs::create_dir_all(path.parent().unwrap()).await?;
+    let mut f = OpenOptions::new()
+        .truncate(true)
+        .create(true)
+        .write(true)
+        .open(&path)
+        .await?;
+    f.write_all(&enc).await?;
+    Ok(())
+}
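Note: CacheFileDesc::path above shards the cache tree by short hex prefixes of a channel hash so that no single directory accumulates an unbounded number of entries, and the zero-padded {:019} segments keep lexicographic and numeric ordering in agreement. A standalone sketch of the same sharding idea, assuming tiny_keccak 2.x and the hex crate; the backend and channel names below are invented:

    use std::path::PathBuf;
    use tiny_keccak::{Hasher, Sha3};

    fn hash_channel(backend: &str, name: &str) -> String {
        let mut h = Sha3::v256();
        h.update(b"V000"); // version tag, so the layout can evolve later
        h.update(backend.as_bytes());
        h.update(name.as_bytes());
        let mut buf = [0; 32];
        h.finalize(&mut buf);
        hex::encode(&buf)
    }

    fn main() {
        // Invented channel identifiers, for illustration only.
        let hc = hash_channel("sf-databuffer", "S10BC01-DBAM070:EOM1_T1");
        // Two shard levels of 3 hex digits each, as in CacheFileDesc::path.
        let path = PathBuf::from("cache").join(&hc[0..3]).join(&hc[3..6]);
        println!("{}", path.display());
    }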
diff --git a/disk/src/cache/pbv.rs b/disk/src/cache/pbv.rs
index d9cab9d..875511f 100644
--- a/disk/src/cache/pbv.rs
+++ b/disk/src/cache/pbv.rs
@@ -1,13 +1,14 @@
 use crate::agg::binnedt::IntoBinnedT;
-use crate::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
+use crate::agg::scalarbinbatch::{MinMaxAvgScalarBinBatch, MinMaxAvgScalarBinBatchStreamItem};
 use crate::cache::pbvfs::{PreBinnedItem, PreBinnedValueFetchedStream};
-use crate::cache::{node_ix_for_patch, MergedFromRemotes, PreBinnedQuery};
+use crate::cache::{MergedFromRemotes, PreBinnedQuery};
 use crate::frame::makeframe::make_frame;
 use crate::raw::EventsQuery;
 use crate::streamlog::Streamlog;
 use bytes::Bytes;
 use err::Error;
 use futures_core::Stream;
+use futures_util::pin_mut;
 use futures_util::{FutureExt, StreamExt};
 use netpod::log::*;
 use netpod::streamext::SCC;
@@ -58,12 +59,12 @@ pub struct PreBinnedValueStream {
     errored: bool,
     completed: bool,
     streamlog: Streamlog,
+    values: MinMaxAvgScalarBinBatch,
+    write_fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>>,
 }

 impl PreBinnedValueStream {
     pub fn new(query: PreBinnedQuery, node_config: &NodeConfigCached) -> Self {
-        // TODO check that we are the correct node.
-        let _node_ix = node_ix_for_patch(&query.patch, &query.channel, &node_config.node_config.cluster);
         Self {
             query,
             node_config: node_config.clone(),
@@ -74,7 +75,9 @@ impl PreBinnedValueStream {
             range_complete_emitted: false,
             errored: false,
             completed: false,
-            streamlog: Streamlog::new(),
+            streamlog: Streamlog::new(node_config.ix as u32),
+            values: MinMaxAvgScalarBinBatch::empty(),
+            write_fut: None,
         }
     }

@@ -182,32 +185,49 @@ impl Stream for PreBinnedValueStream {

     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         use Poll::*;
-        if self.completed {
-            panic!("PreBinnedValueStream poll_next on completed");
-        }
-        if self.errored {
-            self.completed = true;
-            return Ready(None);
-        }
-        if let Some(item) = self.streamlog.pop() {
-            return Ready(Some(Ok(PreBinnedItem::Log(item))));
-        }
         'outer: loop {
-            break if self.data_complete {
+            break if self.completed {
+                panic!("PreBinnedValueStream poll_next on completed");
+            } else if self.errored {
+                self.completed = true;
+                Ready(None)
+            } else if let Some(item) = self.streamlog.pop() {
+                Ready(Some(Ok(PreBinnedItem::Log(item))))
+            } else if let Some(fut) = &mut self.write_fut {
+                pin_mut!(fut);
+                match fut.poll(cx) {
+                    Ready(Ok(())) => {
+                        self.write_fut = None;
+                        self.streamlog.append(Level::INFO, format!("cache file written"));
+                        continue 'outer;
+                    }
+                    Ready(Err(e)) => {
+                        self.errored = true;
+                        Ready(Some(Err(e)))
+                    }
+                    Pending => Pending,
+                }
+            } else if self.data_complete {
                 if self.range_complete_observed {
                     if self.range_complete_emitted {
                         self.completed = true;
                         Ready(None)
                     } else {
                         let msg = format!(
-                            "======== STREAMLOG ========= WRITE CACHE FILE\n{:?}\n\n\n",
-                            self.query.patch
+                            "Write cache file\n{:?}\nN: {}\n\n\n",
+                            self.query.patch,
+                            self.values.ts1s.len()
                         );
                         self.streamlog.append(Level::INFO, msg);
-                        info!(
-                            "======================== WRITE CACHE FILE\n{:?}\n\n\n",
-                            self.query.patch
+                        let values = std::mem::replace(&mut self.values, MinMaxAvgScalarBinBatch::empty());
+                        let fut = super::write_pb_cache_min_max_avg_scalar(
+                            values,
+                            self.query.patch.clone(),
+                            self.query.agg_kind.clone(),
+                            self.query.channel.clone(),
+                            self.node_config.clone(),
                         );
+                        self.write_fut = Some(Box::pin(fut));
                         self.range_complete_emitted = true;
                         Ready(Some(Ok(PreBinnedItem::RangeComplete)))
                     }
@@ -220,10 +240,17 @@
                     Ready(Some(k)) => match k {
                         Ok(PreBinnedItem::RangeComplete) => {
                             self.range_complete_observed = true;
-                            //Ready(Some(Ok(PreBinnedItem::RangeComplete)))
                             continue 'outer;
                         }
-                        Ok(PreBinnedItem::Batch(batch)) => Ready(Some(Ok(PreBinnedItem::Batch(batch)))),
+                        Ok(PreBinnedItem::Batch(batch)) => {
+                            self.values.ts1s.extend(batch.ts1s.iter());
+                            self.values.ts2s.extend(batch.ts2s.iter());
+                            self.values.counts.extend(batch.counts.iter());
+                            self.values.mins.extend(batch.mins.iter());
+                            self.values.maxs.extend(batch.maxs.iter());
+                            self.values.avgs.extend(batch.avgs.iter());
+                            Ready(Some(Ok(PreBinnedItem::Batch(batch))))
+                        }
                         Ok(PreBinnedItem::EventDataReadStats(stats)) => {
                             Ready(Some(Ok(PreBinnedItem::EventDataReadStats(stats))))
                         }
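Note: the write_fut field added above follows a common hand-written-Stream pattern: park the pending side effect as a boxed future inside the stream state and drain it at the top of poll_next before yielding anything else, so the cache write is driven by whatever task polls the stream. A minimal self-contained sketch of that pattern, assuming tokio (features "rt", "macros", "time") and the futures crates; the stream and its item type are stand-ins, not taken from this codebase:

    use futures_core::Stream;
    use futures_util::{pin_mut, StreamExt};
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct SideEffectStream {
        // Pending side effect (stand-in for the cache write), if any.
        write_fut: Option<Pin<Box<dyn Future<Output = Result<(), String>> + Send>>>,
        yielded: bool,
    }

    impl Stream for SideEffectStream {
        type Item = Result<u32, String>;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            loop {
                if let Some(fut) = &mut self.write_fut {
                    // Drive the parked future first; only continue once it is done.
                    pin_mut!(fut);
                    match fut.poll(cx) {
                        Poll::Ready(Ok(())) => {
                            self.write_fut = None;
                            continue;
                        }
                        Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                        Poll::Pending => return Poll::Pending,
                    }
                } else if !self.yielded {
                    self.yielded = true;
                    return Poll::Ready(Some(Ok(42)));
                } else {
                    return Poll::Ready(None);
                }
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let s = SideEffectStream {
            write_fut: Some(Box::pin(async {
                // Stand-in for the real file write.
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                Ok::<(), String>(())
            })),
            yielded: false,
        };
        let out: Vec<_> = s.collect().await;
        println!("{:?}", out); // prints: [Ok(42)]
    }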
diff --git a/disk/src/streamlog.rs b/disk/src/streamlog.rs
index d890474..5c856fa 100644
--- a/disk/src/streamlog.rs
+++ b/disk/src/streamlog.rs
@@ -6,6 +6,7 @@ use std::fmt::Formatter;
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct LogItem {
+    node_ix: u32,
     #[serde(with = "levelserde")]
     level: Level,
     msg: String,
 }
@@ -17,7 +18,7 @@ impl<'de> Visitor<'de> for VisitLevel {
     type Value = u32;

     fn expecting(&self, fmt: &mut Formatter) -> std::fmt::Result {
-        write!(fmt, "")
+        write!(fmt, "expect u32 Level code")
     }

     fn visit_u32<E>(self, v: u32) -> Result<Self::Value, E>
@@ -75,19 +76,47 @@ mod levelserde {

 pub struct Streamlog {
     items: VecDeque<LogItem>,
+    node_ix: u32,
 }

 impl Streamlog {
-    pub fn new() -> Self {
-        Self { items: VecDeque::new() }
+    pub fn new(node_ix: u32) -> Self {
+        Self {
+            items: VecDeque::new(),
+            node_ix,
+        }
     }

     pub fn append(&mut self, level: Level, msg: String) {
-        let item = LogItem { level, msg };
+        let item = LogItem {
+            node_ix: self.node_ix,
+            level,
+            msg,
+        };
         self.items.push_back(item);
     }

     pub fn pop(&mut self) -> Option<LogItem> {
         self.items.pop_back()
     }
+
+    pub fn emit(item: &LogItem) {
+        match item.level {
+            Level::ERROR => {
+                error!("StreamLog Node {} {}", item.node_ix, item.msg);
+            }
+            Level::WARN => {
+                warn!("StreamLog Node {} {}", item.node_ix, item.msg);
+            }
+            Level::INFO => {
+                info!("StreamLog Node {} {}", item.node_ix, item.msg);
+            }
+            Level::DEBUG => {
+                debug!("StreamLog Node {} {}", item.node_ix, item.msg);
+            }
+            Level::TRACE => {
+                trace!("StreamLog Node {} {}", item.node_ix, item.msg);
+            }
+        }
+    }
 }
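Note: LogItem's level field goes through the levelserde with-module; the patch only improves the Visitor's expecting message, and the module body itself lies outside the hunk. For orientation, a minimal sketch of what such a #[serde(with = "...")] module for tracing::Level can look like, assuming levels are mapped to u32 codes; the concrete codes and names below are illustrative, not taken from this crate:

    mod levelserde_sketch {
        use serde::de::{self, Deserializer, Visitor};
        use serde::Serializer;
        use std::fmt;
        use tracing::Level;

        pub fn serialize<S: Serializer>(level: &Level, ser: S) -> Result<S::Ok, S::Error> {
            let code = match *level {
                Level::ERROR => 1u32,
                Level::WARN => 2,
                Level::INFO => 3,
                Level::DEBUG => 4,
                Level::TRACE => 5,
            };
            ser.serialize_u32(code)
        }

        struct LevelVisitor;

        impl<'de> Visitor<'de> for LevelVisitor {
            type Value = Level;

            fn expecting(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
                write!(fmt, "a u32 Level code")
            }

            fn visit_u32<E: de::Error>(self, v: u32) -> Result<Self::Value, E> {
                match v {
                    1 => Ok(Level::ERROR),
                    2 => Ok(Level::WARN),
                    3 => Ok(Level::INFO),
                    4 => Ok(Level::DEBUG),
                    5 => Ok(Level::TRACE),
                    _ => Err(E::custom(format!("unknown level code {}", v))),
                }
            }

            // Self-describing formats may hand the value over as u64 instead.
            fn visit_u64<E: de::Error>(self, v: u64) -> Result<Self::Value, E> {
                self.visit_u32(v as u32)
            }
        }

        pub fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<Level, D::Error> {
            de.deserialize_u32(LevelVisitor)
        }
    }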
write!(fmt, "") + write!(fmt, "expect u32 Level code") } fn visit_u32(self, v: u32) -> Result @@ -75,19 +76,47 @@ mod levelserde { pub struct Streamlog { items: VecDeque, + node_ix: u32, } impl Streamlog { - pub fn new() -> Self { - Self { items: VecDeque::new() } + pub fn new(node_ix: u32) -> Self { + Self { + items: VecDeque::new(), + node_ix, + } } pub fn append(&mut self, level: Level, msg: String) { - let item = LogItem { level, msg }; + let item = LogItem { + node_ix: self.node_ix, + level, + msg, + }; self.items.push_back(item); } pub fn pop(&mut self) -> Option { self.items.pop_back() } + + pub fn emit(item: &LogItem) { + match item.level { + Level::ERROR => { + error!("StreamLog Node {} {}", item.node_ix, item.msg); + } + Level::WARN => { + warn!("StreamLog Node {} {}", item.node_ix, item.msg); + } + Level::INFO => { + info!("StreamLog Node {} {}", item.node_ix, item.msg); + } + Level::DEBUG => { + debug!("StreamLog Node {} {}", item.node_ix, item.msg); + } + Level::TRACE => { + trace!("StreamLog Node {} {}", item.node_ix, item.msg); + } + } + } } diff --git a/err/Cargo.toml b/err/Cargo.toml index 7590356..4ae8711 100644 --- a/err/Cargo.toml +++ b/err/Cargo.toml @@ -16,3 +16,4 @@ async-channel = "1.6" chrono = { version = "0.4.19", features = ["serde"] } nom = "6.1.2" tokio-postgres = { version = "0.7", features = ["runtime", "with-chrono-0_4", "with-serde_json-1"] } +serde_cbor = "0.11.1" diff --git a/err/src/lib.rs b/err/src/lib.rs index 183f9e6..663b9d1 100644 --- a/err/src/lib.rs +++ b/err/src/lib.rs @@ -210,6 +210,12 @@ impl From for Error { } } +impl From for Error { + fn from(k: serde_cbor::Error) -> Self { + Self::with_msg(k.to_string()) + } +} + pub fn todo() { todo!("TODO"); } diff --git a/netpod/Cargo.toml b/netpod/Cargo.toml index 6a9ca92..45f0cfc 100644 --- a/netpod/Cargo.toml +++ b/netpod/Cargo.toml @@ -13,4 +13,5 @@ futures-core = "0.3.12" futures-util = "0.3.14" tracing = "0.1.25" url = "2.2" +lazy_static = "1.4.0" err = { path = "../err" } diff --git a/netpod/src/lib.rs b/netpod/src/lib.rs index 2e28938..249e0a8 100644 --- a/netpod/src/lib.rs +++ b/netpod/src/lib.rs @@ -128,18 +128,22 @@ pub struct NodeConfig { } impl NodeConfig { - pub fn get_node(&self) -> Option<&Node> { + pub fn get_node(&self) -> Option<(&Node, usize)> { if self.name.contains(":") { + let mut i1 = 0; for n in &self.cluster.nodes { if self.name == format!("{}:{}", n.host, n.port) { - return Some(n); + return Some((n, i1)); } + i1 += 1; } } else { + let mut i1 = 0; for n in &self.cluster.nodes { if self.name == format!("{}", n.host) { - return Some(n); + return Some((n, i1)); } + i1 += 1; } } None @@ -156,11 +160,11 @@ pub struct NodeConfigCached { impl From for Result { fn from(k: NodeConfig) -> Self { match k.get_node() { - Some(node) => { + Some((node, ix)) => { let ret = NodeConfigCached { node: node.clone(), node_config: k, - ix: 0, + ix, }; Ok(ret) } @@ -284,7 +288,7 @@ const BIN_THRESHOLDS: [u64; 33] = [ WEEK * 60, ]; -#[derive(Clone)] +#[derive(Clone, Serialize, Deserialize)] pub struct PreBinnedPatchGridSpec { bin_t_len: u64, } @@ -374,7 +378,7 @@ impl PreBinnedPatchRange { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct PreBinnedPatchCoord { spec: PreBinnedPatchGridSpec, ix: u64, @@ -405,7 +409,7 @@ impl PreBinnedPatchCoord { } pub fn bin_count(&self) -> u64 { - self.patch_t_len() / self.spec.bin_t_len + self.spec.patch_t_len() / self.spec.bin_t_len } pub fn spec(&self) -> &PreBinnedPatchGridSpec { @@ -645,7 +649,7 @@ where pub mod log { 
diff --git a/retrieval/src/test.rs b/retrieval/src/test.rs
index 995f42f..f6b30f7 100644
--- a/retrieval/src/test.rs
+++ b/retrieval/src/test.rs
@@ -3,6 +3,7 @@ use bytes::BytesMut;
 use chrono::{DateTime, Utc};
 use disk::agg::scalarbinbatch::MinMaxAvgScalarBinBatchStreamItem;
 use disk::frame::inmem::InMemoryFrameAsyncReadStream;
+use disk::streamlog::Streamlog;
 use err::Error;
 use futures_util::StreamExt;
 use futures_util::TryStreamExt;
@@ -144,10 +145,16 @@ where
             //info!("TEST GOT FRAME len {}", frame.buf().len());
             match bincode::deserialize::<Result<MinMaxAvgScalarBinBatchStreamItem, Error>>(frame.buf()) {
                 Ok(item) => match item {
-                    Ok(item) => {
-                        info!("TEST GOT ITEM {:?}\n", item);
-                        Some(Ok(item))
-                    }
+                    Ok(item) => match item {
+                        MinMaxAvgScalarBinBatchStreamItem::Log(item) => {
+                            Streamlog::emit(&item);
+                            Some(Ok(MinMaxAvgScalarBinBatchStreamItem::Log(item)))
+                        }
+                        item => {
+                            info!("TEST GOT ITEM {:?}\n", item);
+                            Some(Ok(item))
+                        }
+                    },
                     Err(e) => {
                         error!("TEST GOT ERROR FRAME: {:?}", e);
                         Some(Err(e))
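Note: the nested match above handles two layers of Result: the outer one comes from bincode's deserialize, the inner one is the application-level item serialized into the frame, which is how a Log item can be routed to Streamlog::emit while all other variants pass through unchanged. A minimal round-trip sketch of that double-Result shape, assuming bincode 1.x and serde with derive; the Item enum and the String error are stand-ins for MinMaxAvgScalarBinBatchStreamItem and err::Error:

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize)]
    enum Item {
        Log(String),
        Batch(Vec<u64>),
    }

    fn main() {
        // Encode an application-level Result into a frame, as the sending side does.
        let frame = bincode::serialize::<Result<Item, String>>(&Ok(Item::Log("hi".into()))).unwrap();
        // Decode: outer Result = decoding outcome, inner Result = application item.
        match bincode::deserialize::<Result<Item, String>>(&frame) {
            Ok(Ok(Item::Log(msg))) => println!("log item: {}", msg),
            Ok(Ok(item)) => println!("other item: {:?}", item),
            Ok(Err(e)) => println!("error item: {}", e),
            Err(e) => println!("undecodable frame: {}", e),
        }
    }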