diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 08d7add..001e89d 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -3,7 +3,7 @@ pub mod scan; pub mod search; pub mod pg { - pub use tokio_postgres::{Client, Error}; + pub use tokio_postgres::{Client, Error, NoTls}; } use err::Error; @@ -11,9 +11,9 @@ use netpod::log::*; use netpod::TableSizes; use netpod::{Channel, Database, NodeConfigCached}; use netpod::{ScalarType, Shape}; +use pg::{Client as PgClient, NoTls}; use std::sync::Arc; use std::time::Duration; -use tokio_postgres::{Client, Client as PgClient, NoTls}; trait ErrConv { fn err_conv(self) -> Result; @@ -49,7 +49,7 @@ pub async fn delay_io_medium() { delay_us(2000).await; } -pub async fn create_connection(db_config: &Database) -> Result { +pub async fn create_connection(db_config: &Database) -> Result { // TODO use a common already running worker pool for these queries: let d = db_config; let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); @@ -135,7 +135,7 @@ pub async fn random_channel(node_config: &NodeConfigCached) -> Result Result<(), Error> { +pub async fn insert_channel(name: String, facility: i64, dbc: &PgClient) -> Result<(), Error> { let rows = dbc .query( "select count(rowid) from channels where facility = $1 and name = $2", diff --git a/dbconn/src/scan.rs b/dbconn/src/scan.rs index 526501b..ad209b6 100644 --- a/dbconn/src/scan.rs +++ b/dbconn/src/scan.rs @@ -1,23 +1,37 @@ -use crate::{create_connection, delay_io_medium, delay_io_short, ErrConv}; -use async_channel::{bounded, Receiver}; -use chrono::{DateTime, Utc}; +use crate::create_connection; +use crate::delay_io_medium; +use crate::delay_io_short; +use crate::ErrConv; +use async_channel::bounded; +use async_channel::Receiver; +use chrono::DateTime; +use chrono::Utc; use err::Error; -use futures_util::{FutureExt, Stream}; +use futures_util::FutureExt; +use futures_util::Stream; use netpod::log::*; -use netpod::{Database, NodeConfigCached}; +use netpod::Database; +use netpod::NodeConfigCached; use parse::channelconfig::NErr; use pin_project::pin_project; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::future::Future; use std::io::ErrorKind; use std::os::unix::ffi::OsStringExt; -use std::path::{Path, PathBuf}; +use std::path::Path; +use std::path::PathBuf; use std::pin::Pin; -use std::sync::{Arc, RwLock}; -use std::task::{Context, Poll}; -use tokio::fs::{DirEntry, ReadDir}; +use std::sync::Arc; +use std::sync::RwLock; +use std::task::Context; +use std::task::Poll; +use tokio::fs::DirEntry; +use tokio::fs::ReadDir; use tokio_postgres::Client; +mod updatechannelnames; + #[derive(Debug, Serialize, Deserialize)] pub struct NodeDiskIdent { pub rowid: i64, @@ -89,9 +103,10 @@ pub async fn get_node_disk_ident_2( pub struct FindChannelNamesFromConfigReadDir { #[pin] read_dir_fut: Option> + Send>>>, - read_dir: Option, #[pin] - dir_entry_fut: Option>> + Send>>>, + read_dir: Option>>, + #[pin] + done: bool, } impl FindChannelNamesFromConfigReadDir { @@ -99,7 +114,7 @@ impl FindChannelNamesFromConfigReadDir { Self { read_dir_fut: Some(Box::pin(tokio::fs::read_dir(base_dir.as_ref().join("config")))), read_dir: None, - dir_entry_fut: None, + done: false, } } } @@ -109,35 +124,36 @@ impl Stream for FindChannelNamesFromConfigReadDir { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let span = span!(Level::INFO, "FindChNameCfgDir"); + let _spg = span.enter(); let mut pself = self.project(); loop { - break if let Some(fut) = pself.dir_entry_fut.as_mut().as_pin_mut() { - match fut.poll(cx) { - Ready(Ok(Some(item))) => { - let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) }; - let fut = g.next_entry(); - *pself.dir_entry_fut = Some(Box::pin(fut)); - Ready(Some(Ok(item))) - } + break if *pself.done { + Ready(None) + } else if let Some(mut fut) = pself.read_dir.as_mut().as_pin_mut() { + match fut.poll_next_entry(cx) { + Ready(Ok(Some(item))) => Ready(Some(Ok(item))), Ready(Ok(None)) => { - *pself.dir_entry_fut = None; + *pself.done = true; Ready(None) } - Ready(Err(e)) => Ready(Some(Err(e.into()))), + Ready(Err(e)) => { + *pself.done = true; + Ready(Some(Err(e.into()))) + } Pending => Pending, } } else if let Some(fut) = pself.read_dir_fut.as_mut().as_pin_mut() { match fut.poll(cx) { Ready(Ok(item)) => { *pself.read_dir_fut = None; - *pself.read_dir = Some(item); - //let fut = pself.read_dir.as_mut().unwrap().next_entry(); - let g = unsafe { &mut *(pself.read_dir.as_mut().unwrap() as *mut ReadDir) }; - let fut = g.next_entry(); - *pself.dir_entry_fut = Some(Box::pin(fut)); + *pself.read_dir = Some(Box::pin(item)); continue; } - Ready(Err(e)) => Ready(Some(Err(e.into()))), + Ready(Err(e)) => { + *pself.done = true; + Ready(Some(Err(e.into()))) + } Pending => Pending, } } else { @@ -161,166 +177,7 @@ where Ok(()) } -#[derive(Debug, Serialize, Deserialize)] -pub struct UpdatedDbWithChannelNames { - msg: String, - count: u32, -} - -#[pin_project] -pub struct UpdatedDbWithChannelNamesStream { - errored: bool, - data_complete: bool, - #[allow(dead_code)] - node_config: Pin>, - // TODO can we pass a Pin to the async fn instead of creating static ref? - node_config_ref: &'static NodeConfigCached, - #[pin] - client_fut: Option> + Send>>>, - #[pin] - client: Option, - client_ref: Option<&'static Client>, - #[pin] - ident_fut: Option> + Send>>>, - ident: Option, - #[pin] - find: Option, - #[pin] - update_batch: Option> + Send>>>, - channel_inp_done: bool, - clist: Vec, -} - -impl UpdatedDbWithChannelNamesStream { - pub fn new(node_config: NodeConfigCached) -> Result { - let node_config = Box::pin(node_config.clone()); - let node_config_ref = unsafe { &*(&node_config as &NodeConfigCached as *const _) }; - let mut ret = Self { - errored: false, - data_complete: false, - node_config, - node_config_ref, - client_fut: None, - client: None, - client_ref: None, - ident_fut: None, - ident: None, - find: None, - update_batch: None, - channel_inp_done: false, - clist: Vec::new(), - }; - ret.client_fut = Some(Box::pin(create_connection( - &ret.node_config_ref.node_config.cluster.database, - ))); - Ok(ret) - } -} - -impl Stream for UpdatedDbWithChannelNamesStream { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - use Poll::*; - let mut pself = self.project(); - loop { - break if *pself.errored { - Ready(None) - } else if *pself.data_complete { - Ready(None) - } else if let Some(fut) = pself.find.as_mut().as_pin_mut() { - match fut.poll_next(cx) { - Ready(Some(Ok(item))) => { - pself - .clist - .push(String::from_utf8(item.file_name().into_vec()).unwrap()); - continue; - } - Ready(Some(Err(e))) => { - *pself.errored = true; - Ready(Some(Err(e))) - } - Ready(None) => { - *pself.channel_inp_done = true; - // Work through the collected items - let l = std::mem::replace(pself.clist, Vec::new()); - let fut = update_db_with_channel_name_list( - l, - pself.ident.as_ref().unwrap().facility, - pself.client.as_ref().get_ref().as_ref().unwrap(), - ); - // TODO - //pself.update_batch.replace(Box::pin(fut)); - let _ = fut; - continue; - } - Pending => Pending, - } - } else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() { - match fut.poll(cx) { - Ready(Ok(item)) => { - *pself.ident_fut = None; - *pself.ident = Some(item); - let ret = UpdatedDbWithChannelNames { - msg: format!("Got ident {:?}", pself.ident), - count: 43, - }; - let base_path = &pself - .node_config - .node - .sf_databuffer - .as_ref() - .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? - .data_base_path; - let s = FindChannelNamesFromConfigReadDir::new(base_path); - *pself.find = Some(s); - Ready(Some(Ok(ret))) - } - Ready(Err(e)) => { - *pself.errored = true; - Ready(Some(Err(e))) - } - Pending => Pending, - } - } else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() { - match fut.poll(cx) { - Ready(Ok(item)) => { - *pself.client_fut = None; - //*pself.client = Some(Box::pin(item)); - //*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) }); - *pself.client = Some(item); - let c2: &Client = pself.client.as_ref().get_ref().as_ref().unwrap(); - *pself.client_ref = Some(unsafe { &*(c2 as *const _) }); - - //() == pself.node_config.as_ref(); - //() == pself.client.as_ref().as_pin_ref().unwrap(); - /* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2( - pself.node_config.as_ref(), - pself.client.as_ref().as_pin_ref().unwrap(), - )));*/ - *pself.ident_fut = Some(Box::pin(get_node_disk_ident( - pself.node_config_ref, - pself.client_ref.as_ref().unwrap(), - ))); - let ret = UpdatedDbWithChannelNames { - msg: format!("Client opened connection"), - count: 42, - }; - Ready(Some(Ok(ret))) - } - Ready(Err(e)) => { - *pself.errored = true; - Ready(Some(Err(e))) - } - Pending => Pending, - } - } else { - Ready(None) - }; - } - } -} - +#[allow(unused)] async fn update_db_with_channel_name_list(list: Vec, backend: i64, dbc: &Client) -> Result<(), Error> { delay_io_short().await; dbc.query("begin", &[]).await.err_conv()?; @@ -336,6 +193,12 @@ async fn update_db_with_channel_name_list(list: Vec, backend: i64, dbc: Ok(()) } +#[derive(Debug, Serialize, Deserialize)] +pub struct UpdatedDbWithChannelNames { + msg: String, + count: u32, +} + pub async fn update_db_with_channel_names( node_config: NodeConfigCached, db_config: &Database, @@ -413,8 +276,8 @@ pub async fn update_db_with_channel_names( Ok(rx) } -pub fn update_db_with_channel_names_3<'a>( - node_config: &'a NodeConfigCached, +pub fn update_db_with_channel_names_3( + node_config: &NodeConfigCached, ) -> impl Stream> + 'static { let base_path = &node_config .node @@ -559,7 +422,7 @@ pub enum UpdateChannelConfigResult { /** Parse the config of the given channel and update database. */ -pub async fn update_db_with_channel_config( +async fn update_db_with_channel_config( node_config: &NodeConfigCached, node_disk_ident: &NodeDiskIdent, channel_id: i64, diff --git a/dbconn/src/scan/updatechannelnames.rs b/dbconn/src/scan/updatechannelnames.rs new file mode 100644 index 0000000..a0220da --- /dev/null +++ b/dbconn/src/scan/updatechannelnames.rs @@ -0,0 +1,171 @@ +use super::get_node_disk_ident; +use super::update_db_with_channel_name_list; +use super::FindChannelNamesFromConfigReadDir; +use super::NodeDiskIdent; +use super::UpdatedDbWithChannelNames; +use crate::create_connection; +use crate::pg::Client as PgClient; +use err::Error; +use futures_util::Future; +use futures_util::Stream; +use netpod::NodeConfigCached; +use pin_project::pin_project; +use std::os::unix::prelude::OsStringExt; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +#[pin_project] +struct UpdatedDbWithChannelNamesStream { + errored: bool, + data_complete: bool, + #[allow(dead_code)] + node_config: Pin>, + // TODO can we pass a Pin to the async fn instead of creating static ref? + node_config_ref: &'static NodeConfigCached, + #[pin] + client_fut: Option> + Send>>>, + #[pin] + client: Option, + client_ref: Option<&'static PgClient>, + #[pin] + ident_fut: Option> + Send>>>, + ident: Option, + #[pin] + find: Option, + #[pin] + update_batch: Option> + Send>>>, + channel_inp_done: bool, + clist: Vec, +} + +impl UpdatedDbWithChannelNamesStream { + #[allow(unused)] + fn new(node_config: NodeConfigCached) -> Result { + let node_config = Box::pin(node_config.clone()); + let node_config_ref = unsafe { &*(&node_config as &NodeConfigCached as *const _) }; + let mut ret = Self { + errored: false, + data_complete: false, + node_config, + node_config_ref, + client_fut: None, + client: None, + client_ref: None, + ident_fut: None, + ident: None, + find: None, + update_batch: None, + channel_inp_done: false, + clist: Vec::new(), + }; + ret.client_fut = Some(Box::pin(create_connection( + &ret.node_config_ref.node_config.cluster.database, + ))); + Ok(ret) + } +} + +impl Stream for UpdatedDbWithChannelNamesStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + use Poll::*; + let mut pself = self.project(); + loop { + break if *pself.errored { + Ready(None) + } else if *pself.data_complete { + Ready(None) + } else if let Some(fut) = pself.find.as_mut().as_pin_mut() { + match fut.poll_next(cx) { + Ready(Some(Ok(item))) => { + pself + .clist + .push(String::from_utf8(item.file_name().into_vec()).unwrap()); + continue; + } + Ready(Some(Err(e))) => { + *pself.errored = true; + Ready(Some(Err(e))) + } + Ready(None) => { + *pself.channel_inp_done = true; + // Work through the collected items + let l = std::mem::replace(pself.clist, Vec::new()); + let fut = update_db_with_channel_name_list( + l, + pself.ident.as_ref().unwrap().facility, + pself.client.as_ref().get_ref().as_ref().unwrap(), + ); + // TODO + //pself.update_batch.replace(Box::pin(fut)); + let _ = fut; + continue; + } + Pending => Pending, + } + } else if let Some(fut) = pself.ident_fut.as_mut().as_pin_mut() { + match fut.poll(cx) { + Ready(Ok(item)) => { + *pself.ident_fut = None; + *pself.ident = Some(item); + let ret = UpdatedDbWithChannelNames { + msg: format!("Got ident {:?}", pself.ident), + count: 43, + }; + let base_path = &pself + .node_config + .node + .sf_databuffer + .as_ref() + .ok_or_else(|| Error::with_msg(format!("missing sf databuffer config in node")))? + .data_base_path; + let s = FindChannelNamesFromConfigReadDir::new(base_path); + *pself.find = Some(s); + Ready(Some(Ok(ret))) + } + Ready(Err(e)) => { + *pself.errored = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else if let Some(fut) = pself.client_fut.as_mut().as_pin_mut() { + match fut.poll(cx) { + Ready(Ok(item)) => { + *pself.client_fut = None; + //*pself.client = Some(Box::pin(item)); + //*pself.client_ref = Some(unsafe { &*(&pself.client.as_ref().unwrap() as &Client as *const _) }); + *pself.client = Some(item); + let c2: &PgClient = pself.client.as_ref().get_ref().as_ref().unwrap(); + *pself.client_ref = Some(unsafe { &*(c2 as *const _) }); + + //() == pself.node_config.as_ref(); + //() == pself.client.as_ref().as_pin_ref().unwrap(); + /* *pself.ident_fut = Some(Box::pin(get_node_disk_ident_2( + pself.node_config.as_ref(), + pself.client.as_ref().as_pin_ref().unwrap(), + )));*/ + *pself.ident_fut = Some(Box::pin(get_node_disk_ident( + pself.node_config_ref, + pself.client_ref.as_ref().unwrap(), + ))); + let ret = UpdatedDbWithChannelNames { + msg: format!("Client opened connection"), + count: 42, + }; + Ready(Some(Ok(ret))) + } + Ready(Err(e)) => { + *pself.errored = true; + Ready(Some(Err(e))) + } + Pending => Pending, + } + } else { + Ready(None) + }; + } + } +} diff --git a/dbconn/src/search.rs b/dbconn/src/search.rs index 3d2e029..b8c9ea2 100644 --- a/dbconn/src/search.rs +++ b/dbconn/src/search.rs @@ -100,6 +100,7 @@ pub async fn search_channel_scylla( " series, facility, channel, scalar_type, shape_dims", " from series_by_channel", " where channel ~* $1", + " and scalar_type != -2147483647", " limit 400000", )); let pgclient = crate::create_connection(pgconf).await?; diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index bb5f667..c847379 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -134,8 +134,6 @@ impl Stream for EventChunkerMultifile { } } if max >= self.range.end { - info!("REACHED RANGE END, TRUNCATE"); - info!("{:20} ... {:20}", self.range.beg, self.range.end); self.range_final = true; h.truncate_ts(self.range.end); self.evs = None; @@ -187,7 +185,6 @@ impl Stream for EventChunkerMultifile { path.clone(), self.expand, self.do_decompress, - format!("{:?}", path), ); let filtered = RangeFilter::new(chunker, self.range.clone(), self.expand); self.evs = Some(Box::pin(filtered)); @@ -223,7 +220,6 @@ impl Stream for EventChunkerMultifile { of.path.clone(), self.expand, self.do_decompress, - format!("{:?}", of.path), ); chunkers.push(chunker); } diff --git a/disk/src/merge.rs b/disk/src/merge.rs index 97885ad..cf16f63 100644 --- a/disk/src/merge.rs +++ b/disk/src/merge.rs @@ -154,6 +154,8 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let span = netpod::log::span!(Level::INFO, "disk::merge"); + let _spg = span.enter(); 'outer: loop { break if self.completed { panic!("poll_next on completed"); @@ -390,7 +392,6 @@ mod test { dbg_path.clone(), expand, do_decompress, - format!("{:?}", dbg_path), ); chunker }) diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index 6519600..f2a3158 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -109,7 +109,6 @@ pub fn main() -> Result<(), Error> { path.clone(), false, true, - format!("{:?}", path), ); err::todo(); Ok(()) diff --git a/httpret/Cargo.toml b/httpret/Cargo.toml index 7356a47..49f36c8 100644 --- a/httpret/Cargo.toml +++ b/httpret/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "httpret" -version = "0.3.6" +version = "0.4.0" authors = ["Dominik Werder "] edition = "2021" diff --git a/httpret/src/httpret.rs b/httpret/src/httpret.rs index b6e9b77..9cf9f26 100644 --- a/httpret/src/httpret.rs +++ b/httpret/src/httpret.rs @@ -477,7 +477,7 @@ async fn prebinned_inner( todo!() } -pub async fn random_channel( +async fn random_channel( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, @@ -488,7 +488,7 @@ pub async fn random_channel( Ok(ret) } -pub async fn clear_cache_all( +async fn clear_cache_all( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, @@ -505,7 +505,7 @@ pub async fn clear_cache_all( Ok(ret) } -pub async fn update_db_with_channel_names( +async fn update_db_with_channel_names( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, @@ -541,7 +541,7 @@ pub async fn update_db_with_channel_names( } } -pub async fn update_db_with_channel_names_3( +async fn update_db_with_channel_names_3( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, @@ -564,7 +564,7 @@ pub async fn update_db_with_channel_names_3( Ok(ret) } -pub async fn update_db_with_all_channel_configs( +async fn update_db_with_all_channel_configs( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, @@ -587,7 +587,7 @@ pub async fn update_db_with_all_channel_configs( Ok(ret) } -pub async fn update_search_cache( +async fn update_search_cache( req: Request, _ctx: &ReqCtx, node_config: &NodeConfigCached, diff --git a/items/src/eventfull.rs b/items/src/eventfull.rs index 19bdd37..6b62a16 100644 --- a/items/src/eventfull.rs +++ b/items/src/eventfull.rs @@ -107,10 +107,6 @@ impl EventFull { let mut nkeep = usize::MAX; for (i, &ts) in self.tss.iter().enumerate() { if ts >= end { - for (i, &ts) in self.tss.iter().enumerate() { - eprintln!("{i:5} {ts:20}"); - } - eprintln!("truncate to i {i} ts {ts}"); nkeep = i; break; } diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index acbdca5..261d67e 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -207,7 +207,6 @@ pub struct EventsDim0CollectorOutput { timed_out: bool, #[serde(rename = "continueAt", default, skip_serializing_if = "Option::is_none")] continue_at: Option, - dummy_marker: u32, } impl EventsDim0CollectorOutput { @@ -377,7 +376,6 @@ impl items_0::collect_s::CollectorType for EventsDim0Collector) -> Box; +} + +pub struct IdentityTransform {} + +impl IdentityTransform { + pub fn default() -> Self { + Self {} + } +} + +impl EventTransform for IdentityTransform { + fn transform(&mut self, src: Box) -> Box { + src + } +} diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 04784e8..c19ae2a 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -5,6 +5,7 @@ pub mod databuffereventblobs; pub mod eventsdim0; pub mod eventsdim1; pub mod eventsxbindim0; +pub mod eventtransform; pub mod merger; pub mod streams; #[cfg(test)] @@ -251,7 +252,7 @@ pub fn empty_events_dyn_ev( } }, Shape::Wave(..) => match agg_kind { - AggKind::Plain => { + AggKind::Plain | AggKind::TimeWeightedScalar => { use ScalarType::*; type K = eventsdim1::EventsDim1; match scalar_type { diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 74337d9..e372165 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -367,7 +367,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; self.poll_count += 1; - let span1 = span!(Level::INFO, "Merger", pc = self.poll_count); + let span1 = span!(Level::TRACE, "Merger", pc = self.poll_count); let _spg = span1.enter(); loop { trace3!("poll"); diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index b2fbeaa..b2bb813 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -3,6 +3,7 @@ pub mod histo; pub mod query; pub mod status; pub mod streamext; +pub mod transform; use crate::log::*; use bytes::Bytes; diff --git a/netpod/src/query.rs b/netpod/src/query.rs index 63d0191..a02a73e 100644 --- a/netpod/src/query.rs +++ b/netpod/src/query.rs @@ -5,6 +5,7 @@ pub mod prebinned; use crate::get_url_query_pairs; use crate::is_false; use crate::log::*; +use crate::transform::Transform; use crate::AggKind; use crate::AppendToUrl; use crate::ByteSize; @@ -86,6 +87,8 @@ pub struct PlainEventsQuery { range: NanoRange, #[serde(default, skip_serializing_if = "Option::is_none")] agg_kind: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + transform: Option, #[serde(default, skip_serializing_if = "Option::is_none", with = "humantime_serde")] timeout: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -114,6 +117,7 @@ impl PlainEventsQuery { channel, range, agg_kind, + transform: None, timeout, events_max, event_delay: None, @@ -156,7 +160,7 @@ impl PlainEventsQuery { } pub fn events_max(&self) -> u64 { - self.events_max.unwrap_or(1024 * 1024) + self.events_max.unwrap_or(1024 * 512) } pub fn event_delay(&self) -> &Option { @@ -206,12 +210,13 @@ impl FromUrl for PlainEventsQuery { let beg_date = pairs.get("begDate").ok_or(Error::with_public_msg("missing begDate"))?; let end_date = pairs.get("endDate").ok_or(Error::with_public_msg("missing endDate"))?; let ret = Self { - channel: Channel::from_pairs(&pairs)?, + channel: Channel::from_pairs(pairs)?, range: NanoRange { beg: beg_date.parse::>()?.to_nanos(), end: end_date.parse::>()?.to_nanos(), }, - agg_kind: agg_kind_from_binning_scheme(&pairs)?, + agg_kind: agg_kind_from_binning_scheme(pairs)?, + transform: Some(Transform::from_pairs(pairs)?), timeout: pairs .get("timeout") .map(|x| x.parse::().map(Duration::from_millis).ok()) @@ -247,6 +252,9 @@ impl AppendToUrl for PlainEventsQuery { fn append_to_url(&self, url: &mut Url) { let date_fmt = "%Y-%m-%dT%H:%M:%S.%6fZ"; self.channel.append_to_url(url); + if let Some(x) = &self.transform { + x.append_to_url(url); + } if let Some(x) = &self.agg_kind { binning_scheme_append_to_url(x, url); } diff --git a/netpod/src/transform.rs b/netpod/src/transform.rs new file mode 100644 index 0000000..a56ef19 --- /dev/null +++ b/netpod/src/transform.rs @@ -0,0 +1,47 @@ +use crate::get_url_query_pairs; +use crate::AppendToUrl; +use crate::FromUrl; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Transform { + array_pick: Option, +} + +impl Transform { + fn url_prefix() -> &'static str { + "transform" + } +} + +impl FromUrl for Transform { + fn from_url(url: &url::Url) -> Result { + let pairs = get_url_query_pairs(url); + Self::from_pairs(&pairs) + } + + fn from_pairs(pairs: &std::collections::BTreeMap) -> Result { + let upre = Self::url_prefix(); + let ret = Self { + array_pick: pairs + .get(&format!("{}ArrayPick", upre)) + .map(|x| match x.parse::() { + Ok(n) => Some(n), + Err(_) => None, + }) + .unwrap_or(None), + }; + Ok(ret) + } +} + +impl AppendToUrl for Transform { + fn append_to_url(&self, url: &mut url::Url) { + let upre = Self::url_prefix(); + let mut g = url.query_pairs_mut(); + if let Some(x) = &self.array_pick { + g.append_pair(&format!("{}ArrayPick", upre), &format!("{}", x)); + } + } +} diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index ff26c0b..19cc1b2 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -249,7 +249,30 @@ async fn events_conn_handler_inner_try( } else { match make_channel_events_stream(evq, node_config).await { Ok(stream) => { - let stream = stream.map(|x| Box::new(x) as _); + let stream = stream + .map({ + use items_2::eventtransform::EventTransform; + let mut tf = items_2::eventtransform::IdentityTransform::default(); + move |item| match item { + Ok(item2) => match item2 { + StreamItem::DataItem(item3) => match item3 { + RangeCompletableItem::Data(item4) => match item4 { + ChannelEvents::Events(item5) => { + let a = tf.transform(item5); + Ok(StreamItem::DataItem(RangeCompletableItem::Data( + ChannelEvents::Events(a), + ))) + } + x => Ok(StreamItem::DataItem(RangeCompletableItem::Data(x))), + }, + x => Ok(StreamItem::DataItem(x)), + }, + x => Ok(x), + }, + _ => item, + } + }) + .map(|x| Box::new(x) as _); Box::pin(stream) } Err(e) => { @@ -266,7 +289,7 @@ async fn events_conn_handler_inner_try( if buf.len() > 1024 * 64 { warn!("emit buf len {}", buf.len()); } else { - trace!("emit buf len {}", buf.len()); + info!("emit buf len {}", buf.len()); } buf_len_histo.ingest(buf.len() as u32); match netout.write_all(&buf).await { diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index 017d895..1d58778 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -40,7 +40,6 @@ pub struct EventChunker { seen_after_range_count: usize, unordered_warn_count: usize, repeated_ts_warn_count: usize, - dbgdesc: String, } impl Drop for EventChunker { @@ -84,7 +83,6 @@ impl EventChunker { dbg_path: PathBuf, expand: bool, do_decompress: bool, - dbgdesc: String, ) -> Self { trace!("EventChunker::from_start"); let mut inp = NeedMinBuffer::new(inp); @@ -113,7 +111,6 @@ impl EventChunker { seen_after_range_count: 0, unordered_warn_count: 0, repeated_ts_warn_count: 0, - dbgdesc, } } @@ -126,18 +123,8 @@ impl EventChunker { dbg_path: PathBuf, expand: bool, do_decompress: bool, - dbgdesc: String, ) -> Self { - let mut ret = Self::from_start( - inp, - channel_config, - range, - stats_conf, - dbg_path, - expand, - do_decompress, - dbgdesc, - ); + let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); diff --git a/streams/src/frames/eventsfromframes.rs b/streams/src/frames/eventsfromframes.rs index ff6237f..99b1168 100644 --- a/streams/src/frames/eventsfromframes.rs +++ b/streams/src/frames/eventsfromframes.rs @@ -1,7 +1,8 @@ -use super::inmem::InMemoryFrameAsyncReadStream; +use err::Error; use futures_util::Stream; use futures_util::StreamExt; use items::frame::decode_frame; +use items::inmem::InMemoryFrame; use items::FrameTypeInnerStatic; use items::Sitemty; use items::StreamItem; @@ -11,23 +12,16 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use tokio::io::AsyncRead; -pub struct EventsFromFrames -where - T: AsyncRead + Unpin, -{ - inp: InMemoryFrameAsyncReadStream, +pub struct EventsFromFrames { + inp: Pin, Error>> + Send>>, errored: bool, completed: bool, - _m1: PhantomData, + _m1: PhantomData, } -impl EventsFromFrames -where - T: AsyncRead + Unpin, -{ - pub fn new(inp: InMemoryFrameAsyncReadStream) -> Self { +impl EventsFromFrames { + pub fn new(inp: Pin, Error>> + Send>>) -> Self { Self { inp, errored: false, @@ -37,15 +31,16 @@ where } } -impl Stream for EventsFromFrames +impl Stream for EventsFromFrames where - T: AsyncRead + Unpin, - I: FrameTypeInnerStatic + DeserializeOwned + Unpin, + O: FrameTypeInnerStatic + DeserializeOwned + Unpin, { - type Item = Sitemty; + type Item = Sitemty; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; + let span = netpod::log::span!(netpod::log::Level::INFO, "EvFrFr"); + let _spg = span.enter(); loop { break if self.completed { panic!("poll_next on completed"); @@ -57,7 +52,7 @@ where Ready(Some(Ok(item))) => match item { StreamItem::Log(item) => Ready(Some(Ok(StreamItem::Log(item)))), StreamItem::Stats(item) => Ready(Some(Ok(StreamItem::Stats(item)))), - StreamItem::DataItem(frame) => match decode_frame::>(&frame) { + StreamItem::DataItem(frame) => match decode_frame::>(&frame) { Ok(item) => match item { Ok(item) => Ready(Some(Ok(item))), Err(e) => { diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index 54ef891..5c65556 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -9,6 +9,7 @@ use netpod::log::*; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, ReadBuf}; +use tracing::Instrument; #[allow(unused)] macro_rules! trace2 { @@ -152,7 +153,7 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { use Poll::*; - let span = span!(Level::TRACE, "inmem"); + let span = span!(Level::INFO, "InMemRd"); let _spanguard = span.enter(); loop { break if self.complete { diff --git a/streams/src/slidebuf.rs b/streams/src/slidebuf.rs index 7b2e14d..aa25174 100644 --- a/streams/src/slidebuf.rs +++ b/streams/src/slidebuf.rs @@ -93,6 +93,9 @@ impl SlideBuf { pub fn wadv(&mut self, x: usize) -> Result<(), Error> { check_invariants!(self); + if self.wcap() < x { + self.rewind(); + } if self.wcap() < x { return Err(Error::NotEnoughSpace); } else { @@ -253,6 +256,9 @@ impl SlideBuf { pub fn ___write_buf___(&mut self, n: usize) -> Result<&mut [u8], Error> { check_invariants!(self); self.rewind_if_needed(n); + if self.wcap() < n { + self.rewind(); + } if self.wcap() < n { Err(Error::NotEnoughSpace) } else { @@ -283,6 +289,9 @@ impl SlideBuf { pub fn available_writable_area(&mut self, need_min: usize) -> Result<&mut [u8], Error> { check_invariants!(self); self.rewind_if_needed(need_min); + if self.wcap() < need_min { + self.rewind(); + } if self.wcap() < need_min { Err(Error::NotEnoughSpace) } else { @@ -294,6 +303,9 @@ impl SlideBuf { pub fn put_slice(&mut self, buf: &[u8]) -> Result<(), Error> { check_invariants!(self); self.rewind_if_needed(buf.len()); + if self.wcap() < buf.len() { + self.rewind(); + } if self.wcap() < buf.len() { return Err(Error::NotEnoughSpace); } else { @@ -308,6 +320,9 @@ impl SlideBuf { type T = u8; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } if self.wcap() < TS { return Err(Error::NotEnoughSpace); } else { @@ -322,6 +337,9 @@ impl SlideBuf { type T = u16; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } if self.wcap() < TS { return Err(Error::NotEnoughSpace); } else { @@ -336,6 +354,9 @@ impl SlideBuf { type T = u32; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } if self.wcap() < TS { return Err(Error::NotEnoughSpace); } else { @@ -350,6 +371,9 @@ impl SlideBuf { type T = u64; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } if self.wcap() < TS { return Err(Error::NotEnoughSpace); } else { @@ -364,6 +388,9 @@ impl SlideBuf { type T = f32; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } if self.wcap() < TS { return Err(Error::NotEnoughSpace); } else { @@ -378,6 +405,9 @@ impl SlideBuf { type T = f64; const TS: usize = std::mem::size_of::(); self.rewind_if_needed(TS); + if self.wcap() < TS { + self.rewind(); + } if self.wcap() < TS { return Err(Error::NotEnoughSpace); } else { diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 5ca5497..98ee1a9 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -20,6 +20,7 @@ use netpod::{Node, PerfOpts}; use std::pin::Pin; use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; +use tracing::Instrument; pub async fn x_processed_event_blobs_stream_from_node( query: PlainEventsQuery, @@ -43,6 +44,7 @@ pub async fn x_processed_event_blobs_stream_from_node( netout.flush().await?; netout.forget(); let frames = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap); + let frames = Box::pin(frames) as _; let items = EventsFromFrames::new(frames); Ok(Box::pin(items)) } @@ -71,8 +73,11 @@ where netout.flush().await?; netout.forget(); // TODO for images, we need larger buffer capacity - let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2); - let stream = EventsFromFrames::<_, T>::new(frames); + let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2) + //.instrument(netpod::log::span!(netpod::log::Level::TRACE, "InMemRd")) + ; + let frames = Box::pin(frames) as _; + let stream = EventsFromFrames::::new(frames); streams.push(Box::pin(stream) as _); } Ok(streams)