diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs
index 87c40b2..cf6d8a8 100644
--- a/daqbuffer/src/bin/daqbuffer.rs
+++ b/daqbuffer/src/bin/daqbuffer.rs
@@ -10,10 +10,10 @@
 use disk::SfDbChConf;
 use err::Error;
 use netpod::log::*;
 use netpod::query::CacheUsage;
+use netpod::DtNano;
 use netpod::NodeConfig;
 use netpod::NodeConfigCached;
 use netpod::ProxyConfig;
-use netpod::TsNano;
 use tokio::fs::File;
 use tokio::io::AsyncReadExt;
@@ -138,7 +138,7 @@ fn simple_fetch()
         channel_config: SfDbChConf {
             channel: SfDbChannel::from_name("sf-databuffer", "S10BC01-DBAM070:BAM_CH1_NORM"),
             keyspace: 3,
-            time_bin_size: TsNano(DAY),
+            time_bin_size: DtNano::from_ns(DAY),
             array: true,
             scalar_type: ScalarType::F64,
             shape: Shape::Wave(42),
diff --git a/daqbufp2/src/test/api1/data_api_python.rs b/daqbufp2/src/test/api1/data_api_python.rs
index a90b090..da85c4f 100644
--- a/daqbufp2/src/test/api1/data_api_python.rs
+++ b/daqbufp2/src/test/api1/data_api_python.rs
@@ -40,6 +40,7 @@ async fn fetch_data_api_python_blob(
             "endDate": end_date,
         },
         "channels": channels.iter().map(|x| x.name()).collect::<Vec<_>>(),
+        "create_errors": "nodenet_parse_query",
    });
    let query_str = serde_json::to_string_pretty(&query)?;
    let hp = HostPort::from_node(node0);
diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs
index e52b37e..6415a5c 100644
--- a/dbconn/src/dbconn.rs
+++ b/dbconn/src/dbconn.rs
@@ -71,10 +71,10 @@ pub async fn create_connection(db_config: &Database) -> Result<Client, Error>
     Ok(cl)
 }
 
-pub async fn channel_exists(channel: &SfDbChannel, node_config: &NodeConfigCached) -> Result<bool, Error> {
+pub async fn channel_exists(channel_name: &str, node_config: &NodeConfigCached) -> Result<bool, Error> {
     let cl = create_connection(&node_config.node_config.cluster.database).await?;
     let rows = cl
-        .query("select rowid from channels where name = $1::text", &[&channel.name()])
+        .query("select rowid from channels where name = $1::text", &[&channel_name])
         .await
         .err_conv()?;
     debug!("channel_exists {} rows", rows.len());
diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs
index dd359ce..96ac227 100644
--- a/disk/src/aggtest.rs
+++ b/disk/src/aggtest.rs
@@ -9,13 +9,13 @@ use netpod::timeunits::*;
 use netpod::ByteOrder;
 use netpod::ByteSize;
 use netpod::DiskIoTune;
+use netpod::DtNano;
 use netpod::Node;
 use netpod::ScalarType;
 use netpod::SfChFetchInfo;
 use netpod::SfDatabuffer;
 use netpod::SfDbChannel;
 use netpod::Shape;
-use netpod::TsNano;
 
 pub fn make_test_node(id: u32) -> Node {
     Node {
@@ -51,7 +51,7 @@ async fn agg_x_dim_0_inner()
     channel_config: SfDbChConf {
         channel: SfDbChannel::from_name("sf-databuffer", "S10BC01-DBAM070:EOM1_T1"),
         keyspace: 2,
-        time_bin_size: TsNano(DAY),
+        time_bin_size: DtNano::from_ns(DAY),
         array: false,
         shape: Shape::Scalar,
         scalar_type: ScalarType::F64,
@@ -66,13 +66,13 @@ async fn agg_x_dim_0_inner()
         "sf-databuffer",
         "S10BC01-DBAM070:EOM1_T1",
         2,
-        TsNano(DAY),
+        DtNano::from_ns(DAY),
         ByteOrder::Big,
         ScalarType::F64,
         Shape::Scalar,
     );
     let _bin_count = 20;
-    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0;
+    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns();
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
@@ -113,7 +113,7 @@ async fn agg_x_dim_1_inner()
     channel_config: SfDbChConf {
         channel: SfDbChannel::from_name("ks", "wave1"),
         keyspace: 3,
-        time_bin_size: TsNano(DAY),
+        time_bin_size: DtNano::from_ns(DAY),
         array: true,
         shape: Shape::Wave(1024),
         scalar_type: ScalarType::F64,
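Note on the `time_bin_size.0` to `time_bin_size.ns()` changes above: the tests derive the query window from a time-bin index. A minimal self-contained sketch of that arithmetic (constants written out here; the real code takes them from `netpod::timeunits` and the accessor from `DtNano`):

```rust
// Map a daily time-bin index to its nanosecond range, mirroring
// `let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns()`.
const SEC: u64 = 1_000_000_000;
const HOUR: u64 = 60 * 60 * SEC;
const DAY: u64 = 24 * HOUR;

fn main() {
    let time_bin_size_ns = DAY; // DtNano::from_ns(DAY).ns() in the patch
    let timebin: u64 = 19_000; // hypothetical bin index
    let ts1 = timebin * time_bin_size_ns;
    let ts2 = ts1 + HOUR * 24;
    println!("bin {timebin} covers [{ts1}, {ts2}) ns since epoch");
}
```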
@@ -128,13 +128,13 @@ async fn agg_x_dim_1_inner()
         "ks",
         "wave1",
         2,
-        TsNano(DAY),
+        DtNano::from_ns(DAY),
         ByteOrder::Big,
         ScalarType::F64,
         Shape::Scalar,
     );
     let _bin_count = 10;
-    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0;
+    let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns();
     let ts2 = ts1 + HOUR * 24;
     let range = NanoRange { beg: ts1, end: ts2 };
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
diff --git a/disk/src/disk.rs b/disk/src/disk.rs
index 26e9f25..c897836 100644
--- a/disk/src/disk.rs
+++ b/disk/src/disk.rs
@@ -28,12 +28,12 @@
 use futures_util::TryFutureExt;
 use netpod::log::*;
 use netpod::ByteOrder;
 use netpod::DiskIoTune;
+use netpod::DtNano;
 use netpod::Node;
 use netpod::ReadSys;
 use netpod::ScalarType;
 use netpod::SfDbChannel;
 use netpod::Shape;
-use netpod::TsNano;
 use serde::Deserialize;
 use serde::Serialize;
 use std::collections::VecDeque;
@@ -65,7 +65,7 @@ use tokio::sync::mpsc;
 pub struct SfDbChConf {
     pub channel: SfDbChannel,
     pub keyspace: u8,
-    pub time_bin_size: TsNano,
+    pub time_bin_size: DtNano,
     pub scalar_type: ScalarType,
     pub compression: bool,
     pub shape: Shape,
diff --git a/disk/src/gen.rs b/disk/src/gen.rs
index 98d2222..df0fcbe 100644
--- a/disk/src/gen.rs
+++ b/disk/src/gen.rs
@@ -7,6 +7,7 @@ use err::Error;
 use netpod::log::*;
 use netpod::timeunits::*;
 use netpod::ByteOrder;
+use netpod::DtNano;
 use netpod::GenVar;
 use netpod::Node;
 use netpod::ScalarType;
@@ -36,7 +37,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
         config: SfDbChConf {
             channel: SfDbChannel::from_name(&backend, "scalar-i32-be"),
             keyspace: 2,
-            time_bin_size: TsNano(DAY),
+            time_bin_size: DtNano::from_ns(DAY),
             scalar_type: ScalarType::I32,
             byte_order: ByteOrder::Big,
             shape: Shape::Scalar,
@@ -51,7 +52,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
         config: SfDbChConf {
             channel: SfDbChannel::from_name(&backend, "wave-f64-be-n21"),
             keyspace: 3,
-            time_bin_size: TsNano(DAY),
+            time_bin_size: DtNano::from_ns(DAY),
             array: true,
             scalar_type: ScalarType::F64,
             shape: Shape::Wave(21),
@@ -66,7 +67,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
         config: SfDbChConf {
             channel: SfDbChannel::from_name(&backend, "wave-u16-le-n77"),
             keyspace: 3,
-            time_bin_size: TsNano(DAY),
+            time_bin_size: DtNano::from_ns(DAY),
             scalar_type: ScalarType::U16,
             byte_order: ByteOrder::Little,
             shape: Shape::Wave(77),
@@ -81,7 +82,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
         config: SfDbChConf {
             channel: SfDbChannel::from_name(&backend, "tw-scalar-i32-be"),
             keyspace: 2,
-            time_bin_size: TsNano(DAY),
+            time_bin_size: DtNano::from_ns(DAY),
             scalar_type: ScalarType::I32,
             byte_order: ByteOrder::Little,
             shape: Shape::Scalar,
@@ -96,7 +97,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
         config: SfDbChConf {
             channel: SfDbChannel::from_name(&backend, "const-regular-scalar-i32-be"),
             keyspace: 2,
-            time_bin_size: TsNano(DAY),
+            time_bin_size: DtNano::from_ns(DAY),
             scalar_type: ScalarType::I32,
             byte_order: ByteOrder::Little,
             shape: Shape::Scalar,
diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs
index 5355b2f..d15fd6d 100644
--- a/disk/src/merge/mergedblobsfromremotes.rs
+++ b/disk/src/merge/mergedblobsfromremotes.rs
@@ -9,6 +9,9 @@
 use netpod::log::*;
 use netpod::ChannelTypeConfigGen;
 use netpod::Cluster;
 use netpod::PerfOpts;
+use query::api4::events::EventsSubQuery;
+use query::api4::events::EventsSubQuerySelect;
+use query::api4::events::EventsSubQuerySettings;
 use query::api4::events::PlainEventsQuery;
 use std::future::Future;
 use std::pin::Pin;
@@ -30,10 +33,17 @@ pub struct MergedBlobsFromRemotes {
 impl MergedBlobsFromRemotes {
     pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, ch_conf: ChannelTypeConfigGen, cluster: Cluster) -> Self {
         debug!("MergedBlobsFromRemotes evq {:?}", evq);
+        let select = EventsSubQuerySelect::new(ch_conf.clone(), evq.range().clone(), evq.transform().clone());
+        let settings = EventsSubQuerySettings::from(&evq);
+        let subq = EventsSubQuery::from_parts(select, settings);
         let mut tcp_establish_futs = Vec::new();
         for node in &cluster.nodes {
-            let f =
-                x_processed_event_blobs_stream_from_node(evq.clone(), ch_conf.clone(), perf_opts.clone(), node.clone());
+            let f = x_processed_event_blobs_stream_from_node(
+                subq.clone(),
+                ch_conf.clone(),
+                perf_opts.clone(),
+                node.clone(),
+            );
             let f: T002 = Box::pin(f);
             tcp_establish_futs.push(f);
         }
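The select/settings split shown above is the pattern every caller now follows. A sketch of the full hop from outer query to wire frame, using the types this patch introduces (the helper name `to_frame_json` is illustrative, not part of the patch):

```rust
use netpod::ChannelTypeConfigGen;
use query::api4::events::{
    EventsSubQuery, EventsSubQuerySelect, EventsSubQuerySettings, Frame1Parts, PlainEventsQuery,
};

// Split an outer query into what is selected (channel config, range,
// transform) and how to run it (timeouts, batch sizes), then wrap it in
// the Frame1Parts envelope that crosses the node-to-node TCP connection.
fn to_frame_json(evq: &PlainEventsQuery, ch_conf: ChannelTypeConfigGen) -> Result<String, serde_json::Error> {
    let select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone());
    let settings = EventsSubQuerySettings::from(evq);
    let subq = EventsSubQuery::from_parts(select, settings);
    serde_json::to_string(&Frame1Parts::new(subq))
}
```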
diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs
index 3c672c2..f7c1a92 100644
--- a/disk/src/raw/conn.rs
+++ b/disk/src/raw/conn.rs
@@ -18,7 +18,7 @@
 use netpod::ByteSize;
 use netpod::DiskIoTune;
 use netpod::NodeConfigCached;
 use netpod::SfChFetchInfo;
-use query::api4::events::PlainEventsQuery;
+use query::api4::events::EventsSubQuery;
 use std::pin::Pin;
 
 const TEST_BACKEND: &str = "testbackend-00";
@@ -53,15 +53,16 @@ fn make_num_pipeline_stream_evs(
 }
 
 pub async fn make_event_pipe(
-    evq: &PlainEventsQuery,
+    evq: EventsSubQuery,
     fetch_info: SfChFetchInfo,
     ncc: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
     // sf-databuffer type backends identify channels by their (backend, name) only.
     let range = evq.range().clone();
+    let one_before = evq.transform().need_one_before_range();
     info!(
         "make_event_pipe need_expand {need_expand} {evq:?}",
-        need_expand = evq.one_before_range()
+        need_expand = one_before
     );
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
     // TODO should not need this for correctness.
@@ -71,6 +72,7 @@ pub async fn make_event_pipe(
     } else {
         128
     };
+    let do_decompress = true;
     let event_blobs = EventChunkerMultifile::new(
         (&range).try_into()?,
         fetch_info.clone(),
@@ -78,8 +80,8 @@ pub async fn make_event_pipe(
         ncc.ix,
         DiskIoTune::default(),
         event_chunker_conf,
-        evq.one_before_range(),
-        true,
+        one_before,
+        do_decompress,
         out_max_len,
     );
     error!("TODO replace AggKind in the called code");
@@ -155,18 +157,18 @@ pub fn make_remote_event_blobs_stream(
 }
 
 pub async fn make_event_blobs_pipe_real(
-    evq: &PlainEventsQuery,
+    subq: &EventsSubQuery,
     fetch_info: &SfChFetchInfo,
     node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
     if false {
-        match dbconn::channel_exists(evq.channel(), &node_config).await {
+        match dbconn::channel_exists(subq.name(), &node_config).await {
             Ok(_) => (),
             Err(e) => return Err(e)?,
         }
     }
-    let expand = evq.one_before_range();
-    let range = evq.range();
+    let expand = subq.transform().need_one_before_range();
+    let range = subq.range();
     let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
     // TODO should depend on host config
     let do_local = node_config.node_config.cluster.is_central_storage;
@@ -204,14 +206,14 @@ pub async fn make_event_blobs_pipe_real(
 }
 
 pub async fn make_event_blobs_pipe_test(
-    evq: &PlainEventsQuery,
+    subq: &EventsSubQuery,
     node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
     warn!("GENERATE INMEM TEST DATA");
     let node_count = node_config.node_config.cluster.nodes.len() as u64;
     let node_ix = node_config.ix as u64;
-    let chn = evq.channel().name();
-    let range = evq.range().clone();
+    let chn = subq.name();
+    let range = subq.range().clone();
     if chn == "test-gen-i32-dim0-v00" {
         Ok(Box::pin(EventBlobsGeneratorI32Test00::new(node_ix, node_count, range)))
     } else if chn == "test-gen-i32-dim0-v01" {
@@ -247,14 +249,14 @@ pub async fn make_event_blobs_pipe_test(
 }
 
 pub async fn make_event_blobs_pipe(
-    evq: &PlainEventsQuery,
+    subq: &EventsSubQuery,
     fetch_info: &SfChFetchInfo,
     node_config: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
-    debug!("make_event_blobs_pipe {evq:?}");
-    if evq.channel().backend() == TEST_BACKEND {
-        make_event_blobs_pipe_test(evq, node_config).await
+    debug!("make_event_blobs_pipe {subq:?}");
+    if subq.backend() == TEST_BACKEND {
+        make_event_blobs_pipe_test(subq, node_config).await
     } else {
-        make_event_blobs_pipe_real(evq, fetch_info, node_config).await
+        make_event_blobs_pipe_real(subq, fetch_info, node_config).await
     }
 }
diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs
index a05d521..738e660 100644
--- a/httpret/src/api4/binned.rs
+++ b/httpret/src/api4/binned.rs
@@ -3,7 +3,6 @@
 use crate::bodystream::ToPublicResponse;
 use crate::channelconfig::ch_conf_from_binned;
 use crate::err::Error;
 use crate::response_err;
-use err::anyhow::Context;
 use http::Method;
 use http::Request;
 use http::Response;
@@ -39,7 +38,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
     span1.in_scope(|| {
         debug!("begin");
     });
-    let item = streams::timebinnedjson::timebinned_json(query, &ch_conf, node_config.node_config.cluster.clone())
+    let item = streams::timebinnedjson::timebinned_json(query, ch_conf, node_config.node_config.cluster.clone())
         .instrument(span1)
         .await?;
     let buf = serde_json::to_vec(&item)?;
diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs
index 8c013f2..092ff79 100644
--- a/httpret/src/api4/events.rs
+++ b/httpret/src/api4/events.rs
@@ -95,7 +95,7 @@ async fn plain_events_json(
     info!("plain_events_json query {query:?}");
{query:?}"); let ch_conf = chconf_from_events_v1(&query, node_config).await.map_err(Error::from)?; info!("plain_events_json chconf_from_events_v1: {ch_conf:?}"); - let item = streams::plaineventsjson::plain_events_json(&query, &ch_conf, &node_config.node_config.cluster).await; + let item = streams::plaineventsjson::plain_events_json(&query, ch_conf, &node_config.node_config.cluster).await; let item = match item { Ok(item) => item, Err(e) => { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 588c05a..7809b30 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -13,9 +13,7 @@ use bytes::Bytes; use chrono::DateTime; use chrono::TimeZone; use chrono::Utc; -use err::anyhow; use err::Error; -use err::Res2; use futures_util::Stream; use futures_util::StreamExt; use range::evrange::NanoRange; @@ -1174,6 +1172,19 @@ impl DtNano { pub fn ns(&self) -> u64 { self.0 } + + pub fn ms(&self) -> u64 { + self.0 / MS + } +} + +impl fmt::Debug for DtNano { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let sec = self.0 / SEC; + let ms = (self.0 - SEC * sec) / MS; + let ns = self.0 - SEC * sec - MS * ms; + f.debug_tuple("DtNano").field(&sec).field(&ms).field(&ns).finish() + } } mod dt_nano_serde { @@ -1300,7 +1311,10 @@ impl fmt::Debug for TsNano { f.debug_struct("TsNano") .field( "ts", - &ts.earliest().unwrap_or(Default::default()).format(DATETIME_FMT_3MS), + &ts.earliest() + .unwrap_or(Default::default()) + .format(DATETIME_FMT_3MS) + .to_string(), ) .finish() } @@ -2649,7 +2663,7 @@ pub struct SfChFetchInfo { backend: String, name: String, ks: u8, - bs: TsNano, + bs: DtNano, scalar_type: ScalarType, shape: Shape, compression: bool, @@ -2662,7 +2676,7 @@ impl SfChFetchInfo { backend: S1, name: S2, ks: u8, - bs: TsNano, + bs: DtNano, byte_order: ByteOrder, scalar_type: ScalarType, shape: Shape, @@ -2706,7 +2720,7 @@ impl SfChFetchInfo { self.ks } - pub fn bs(&self) -> TsNano { + pub fn bs(&self) -> DtNano { self.bs.clone() } @@ -2746,6 +2760,20 @@ impl ChannelTypeConfigGen { } } + pub fn backend(&self) -> &str { + match self { + ChannelTypeConfigGen::Scylla(x) => x.backend(), + ChannelTypeConfigGen::SfDatabuffer(x) => x.backend(), + } + } + + pub fn name(&self) -> &str { + match self { + ChannelTypeConfigGen::Scylla(x) => x.name(), + ChannelTypeConfigGen::SfDatabuffer(x) => x.name(), + } + } + pub fn scalar_type(&self) -> &ScalarType { match self { ChannelTypeConfigGen::Scylla(x) => &x.scalar_type, diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index e8822af..dba4d89 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -4,12 +4,12 @@ use netpod::range::evrange::NanoRange; use netpod::timeunits::DAY; use netpod::ByteOrder; use netpod::ChannelTypeConfigGen; +use netpod::DtNano; use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::SfChFetchInfo; use netpod::SfDbChannel; use netpod::Shape; -use netpod::TsNano; const TEST_BACKEND: &str = "testbackend-00"; @@ -20,7 +20,7 @@ fn channel_config_test_backend(channel: SfDbChannel) -> Result Result Result Result Result Result> From<(E, OwnedWriteHalf)> for ConnErr { } async fn make_channel_events_stream_data( - evq: PlainEventsQuery, - ch_conf: ChannelTypeConfigGen, - node_config: &NodeConfigCached, + subq: EventsSubQuery, + ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { - if evq.channel().backend() == TEST_BACKEND { + if subq.backend() == TEST_BACKEND { debug!("use test backend data {}", TEST_BACKEND); - let node_count = 
diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs
index e8822af..dba4d89 100644
--- a/nodenet/src/channelconfig.rs
+++ b/nodenet/src/channelconfig.rs
@@ -4,12 +4,12 @@
 use netpod::range::evrange::NanoRange;
 use netpod::timeunits::DAY;
 use netpod::ByteOrder;
 use netpod::ChannelTypeConfigGen;
+use netpod::DtNano;
 use netpod::NodeConfigCached;
 use netpod::ScalarType;
 use netpod::SfChFetchInfo;
 use netpod::SfDbChannel;
 use netpod::Shape;
-use netpod::TsNano;
 
 const TEST_BACKEND: &str = "testbackend-00";
 
@@ -20,7 +20,7 @@ fn channel_config_test_backend(channel: SfDbChannel) -> Result<ChannelTypeConfigGen, Error>
[the bodies of this and the following hunks of nodenet/src/channelconfig.rs, and the file header of nodenet/src/conn.rs, were lost in extraction]
diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs
@@ ... @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
 }
 
 async fn make_channel_events_stream_data(
-    evq: PlainEventsQuery,
-    ch_conf: ChannelTypeConfigGen,
-    node_config: &NodeConfigCached,
+    subq: EventsSubQuery,
+    ncc: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
-    if evq.channel().backend() == TEST_BACKEND {
+    if subq.backend() == TEST_BACKEND {
         debug!("use test backend data {}", TEST_BACKEND);
-        let node_count = node_config.node_config.cluster.nodes.len() as u64;
-        let node_ix = node_config.ix as u64;
-        let chn = evq.channel().name();
-        let range = evq.range().clone();
+        let node_count = ncc.node_config.cluster.nodes.len() as u64;
+        let node_ix = ncc.ix as u64;
+        let chn = subq.name();
+        let range = subq.range().clone();
+        let one_before = subq.transform().need_one_before_range();
         if chn == "test-gen-i32-dim0-v00" {
-            Ok(Box::pin(GenerateI32V00::new(
-                node_ix,
-                node_count,
-                range,
-                evq.one_before_range(),
-            )))
+            Ok(Box::pin(GenerateI32V00::new(node_ix, node_count, range, one_before)))
         } else if chn == "test-gen-i32-dim0-v01" {
-            Ok(Box::pin(GenerateI32V01::new(
-                node_ix,
-                node_count,
-                range,
-                evq.one_before_range(),
-            )))
+            Ok(Box::pin(GenerateI32V01::new(node_ix, node_count, range, one_before)))
         } else if chn == "test-gen-f64-dim1-v00" {
-            Ok(Box::pin(GenerateF64V00::new(
-                node_ix,
-                node_count,
-                range,
-                evq.one_before_range(),
-            )))
+            Ok(Box::pin(GenerateF64V00::new(node_ix, node_count, range, one_before)))
         } else {
             let na: Vec<_> = chn.split("-").collect();
             if na.len() != 3 {
@@ -115,7 +102,7 @@ async fn make_channel_events_stream_data(
                 "make_channel_events_stream_data can not understand test channel name: {chn:?}"
             )))
         } else {
-            let range = evq.range().clone();
+            let range = subq.range().clone();
             if na[1] == "d0" {
                 if na[2] == "i32" {
                     //generator::generate_i32(node_ix, node_count, range)
@@ -136,44 +123,33 @@ async fn make_channel_events_stream_data(
                 }
             }
         }
-    } else if let Some(scyconf) = &node_config.node_config.cluster.scylla {
-        scylla_channel_event_stream(evq, ch_conf.to_scylla()?, scyconf, node_config).await
-    } else if let Some(_) = &node_config.node.channel_archiver {
+    } else if let Some(scyconf) = &ncc.node_config.cluster.scylla {
+        let cfg = subq.ch_conf().to_scylla()?;
+        scylla_channel_event_stream(subq, cfg, scyconf, ncc).await
+    } else if let Some(_) = &ncc.node.channel_archiver {
         let e = Error::with_msg_no_trace("archapp not built");
         Err(e)
-    } else if let Some(_) = &node_config.node.archiver_appliance {
+    } else if let Some(_) = &ncc.node.archiver_appliance {
         let e = Error::with_msg_no_trace("archapp not built");
         Err(e)
     } else {
-        Ok(disk::raw::conn::make_event_pipe(&evq, ch_conf.to_sf_databuffer()?, node_config).await?)
+        let cfg = subq.ch_conf().to_sf_databuffer()?;
+        Ok(disk::raw::conn::make_event_pipe(subq, cfg, ncc).await?)
     }
 }
 
 async fn make_channel_events_stream(
-    evq: PlainEventsQuery,
-    ch_conf: ChannelTypeConfigGen,
-    node_config: &NodeConfigCached,
+    subq: EventsSubQuery,
+    ncc: &NodeConfigCached,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
-    let empty = empty_events_dyn_ev(ch_conf.scalar_type(), ch_conf.shape())?;
+    let empty = empty_events_dyn_ev(subq.ch_conf().scalar_type(), subq.ch_conf().shape())?;
     let empty = sitem_data(ChannelEvents::Events(empty));
-    let stream = make_channel_events_stream_data(evq, ch_conf, node_config).await?;
+    let stream = make_channel_events_stream_data(subq, ncc).await?;
     let ret = futures_util::stream::iter([empty]).chain(stream);
     let ret = Box::pin(ret);
     Ok(ret)
 }
 
-#[derive(Debug, Serialize, Deserialize)]
-pub struct Frame1Parts {
-    query: PlainEventsQuery,
-    ch_conf: ChannelTypeConfigGen,
-}
-
-impl Frame1Parts {
-    pub fn new(query: PlainEventsQuery, ch_conf: ChannelTypeConfigGen) -> Self {
-        Self { query, ch_conf }
-    }
-}
-
 async fn events_get_input_frames(netin: OwnedReadHalf) -> Result<Vec<InMemoryFrame>, Error> {
     let perf_opts = PerfOpts::default();
     let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
@@ -198,9 +174,7 @@ async fn events_get_input_frames(netin: OwnedReadHalf) -> Result<Vec<InMemoryFrame>, Error>
 
-async fn events_parse_input_query(
-    frames: Vec<InMemoryFrame>,
-) -> Result<(PlainEventsQuery, ChannelTypeConfigGen), Error> {
+async fn events_parse_input_query(frames: Vec<InMemoryFrame>) -> Result<(EventsSubQuery,), Error> {
     if frames.len() != 1 {
         error!("{:?}", frames);
         error!("missing command frame len {}", frames.len());
@@ -225,16 +199,13 @@
         },
         Err(e) => return Err(e),
     };
-    let cmd: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| {
+    let frame1: Frame1Parts = serde_json::from_str(&qitem.str()).map_err(|e| {
         let e = Error::with_msg_no_trace(format!("json parse error: {} inp {:?}", e, qitem.str()));
         error!("{e}");
         e
     })?;
-    debug!("events_parse_input_query {:?}", cmd);
-    if query_frame.tyid() != EVENT_QUERY_JSON_STRING_FRAME {
-        return Err(Error::with_msg("query frame wrong type"));
-    }
-    Ok((cmd.query, cmd.ch_conf))
+    info!("events_parse_input_query {:?}", frame1);
+    Ok(frame1.parts())
 }
 
 async fn events_conn_handler_inner_try(
@@ -248,13 +219,17 @@ async fn events_conn_handler_inner_try(
         Ok(x) => x,
         Err(e) => return Err((e, netout).into()),
     };
-    let (evq, ch_conf) = match events_parse_input_query(frames).await {
+    let (evq,) = match events_parse_input_query(frames).await {
         Ok(x) => x,
         Err(e) => return Err((e, netout).into()),
    };
+    if evq.create_errors_contains("nodenet_parse_query") {
+        let e = Error::with_msg_no_trace("produced error on request nodenet_parse_query");
+        return Err((e, netout).into());
+    }
     let mut stream: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>> = if evq.is_event_blobs() {
         // TODO support event blobs as transform
-        let fetch_info = match ch_conf.to_sf_databuffer() {
+        let fetch_info = match evq.ch_conf().to_sf_databuffer() {
             Ok(x) => x,
             Err(e) => return Err((e, netout).into()),
         };
@@ -266,7 +241,7 @@
             Err(e) => return Err((e, netout).into()),
         }
     } else {
-        match make_channel_events_stream(evq.clone(), ch_conf, ncc).await {
+        match make_channel_events_stream(evq.clone(), ncc).await {
             Ok(stream) => {
                 if false {
                     // TODO wasm example
@@ -309,7 +284,11 @@
             Ok(buf) => {
                 buf_len_histo.ingest(buf.len() as u32);
                 match netout.write_all(&buf).await {
-                    Ok(_) => {}
+                    Ok(()) => {
+                        // TODO collect timing information and send as summary in a stats item.
+                        // TODO especially collect a distribution over the buf lengths that were send.
+                        // TODO we want to see a reasonable batch size.
+                    }
                     Err(e) => return Err((e, netout))?,
                 }
             }
@@ -331,7 +310,7 @@
         Err(e) => return Err((e, netout))?,
     };
     match netout.write_all(&buf).await {
-        Ok(_) => (),
+        Ok(()) => (),
         Err(e) => return Err((e, netout))?,
     }
 }
@@ -340,11 +319,11 @@
         Err(e) => return Err((e, netout))?,
     };
     match netout.write_all(&buf).await {
-        Ok(_) => (),
+        Ok(()) => (),
         Err(e) => return Err((e, netout))?,
     }
     match netout.flush().await {
-        Ok(_) => (),
+        Ok(()) => (),
         Err(e) => return Err((e, netout))?,
     }
     Ok(())
 }
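The `create_errors_contains("nodenet_parse_query")` check above is a fault-injection hook: the marker travels with the query, and the stage that recognizes its own marker fails on purpose so tests can exercise error paths end to end. Reduced to its core (the names `Stage` and `fail_if_requested` are illustrative, not from the patch):

```rust
// Minimal stand-in for the create_errors mechanism.
struct Stage {
    create_errors: Vec<String>,
}

impl Stage {
    fn fail_if_requested(&self, marker: &str) -> Result<(), String> {
        if self.create_errors.iter().any(|x| x == marker) {
            Err(format!("produced error on request {marker}"))
        } else {
            Ok(())
        }
    }
}

fn main() {
    let stage = Stage {
        create_errors: "nodenet_parse_query".split(',').map(String::from).collect(),
    };
    assert!(stage.fail_if_requested("nodenet_parse_query").is_err());
    assert!(stage.fail_if_requested("some_other_stage").is_ok());
}
```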
diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs
index e7cfb9b..84e98a1 100644
--- a/nodenet/src/conn/test.rs
+++ b/nodenet/src/conn/test.rs
@@ -14,17 +14,25 @@ use items_2::framable::EventQueryJsonStringFrame;
 use items_2::framable::Framable;
 use items_2::frame::decode_frame;
 use netpod::range::evrange::NanoRange;
+use netpod::timeunits::DAY;
 use netpod::timeunits::SEC;
+use netpod::ByteOrder;
 use netpod::Cluster;
 use netpod::Database;
+use netpod::DtNano;
 use netpod::FileIoBufferSize;
 use netpod::Node;
 use netpod::NodeConfig;
 use netpod::NodeConfigCached;
 use netpod::PerfOpts;
+use netpod::ScalarType;
+use netpod::SfChFetchInfo;
 use netpod::SfDatabuffer;
-use netpod::SfDbChannel;
-use query::api4::events::PlainEventsQuery;
+use netpod::Shape;
+use query::api4::events::EventsSubQuery;
+use query::api4::events::EventsSubQuerySelect;
+use query::api4::events::EventsSubQuerySettings;
+use query::transform::TransformQuery;
 use streams::frames::inmem::InMemoryFrameAsyncReadStream;
 use tokio::io::AsyncWriteExt;
 use tokio::net::TcpListener;
@@ -75,13 +83,23 @@ fn raw_data_00()
         },
         ix: 0,
     };
-    let channel = SfDbChannel::from_name(TEST_BACKEND, "scalar-i32");
     let range = NanoRange {
         beg: SEC,
         end: SEC * 10,
     };
-    let qu = PlainEventsQuery::new(channel, range);
-    let frame1 = Frame1Parts::new(qu, err::todoval());
+    let fetch_info = SfChFetchInfo::new(
+        TEST_BACKEND,
+        "scalar-i32",
+        2,
+        DtNano::from_ns(DAY),
+        ByteOrder::Big,
+        ScalarType::I32,
+        Shape::Scalar,
+    );
+    let select = EventsSubQuerySelect::new(fetch_info.into(), range.into(), TransformQuery::default_events());
+    let settings = EventsSubQuerySettings::default();
+    let qu = EventsSubQuery::from_parts(select, settings);
+    let frame1 = Frame1Parts::new(qu);
     let query = EventQueryJsonStringFrame(serde_json::to_string(&frame1).unwrap());
     let frame = sitem_data(query).make_frame()?;
     let jh = taskrun::spawn(events_conn_handler(client, addr, cfg));
diff --git a/nodenet/src/scylla.rs b/nodenet/src/scylla.rs
index 0da5b9e..47de606 100644
--- a/nodenet/src/scylla.rs
+++ b/nodenet/src/scylla.rs
@@ -9,11 +9,11 @@
 use netpod::log::*;
 use netpod::ChConf;
 use netpod::NodeConfigCached;
 use netpod::ScyllaConfig;
-use query::api4::events::PlainEventsQuery;
+use query::api4::events::EventsSubQuery;
 use std::pin::Pin;
 
 pub async fn scylla_channel_event_stream(
-    evq: PlainEventsQuery,
+    evq: EventsSubQuery,
     chconf: ChConf,
     scyco: &ScyllaConfig,
     _ncc: &NodeConfigCached,
diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs
index bc0ffd2..952ae24 100644
--- a/parse/src/channelconfig.rs
+++ b/parse/src/channelconfig.rs
@@ -4,8 +4,7 @@ use netpod::range::evrange::NanoRange;
 use netpod::timeunits::DAY;
 use netpod::timeunits::MS;
 use netpod::ByteOrder;
-use netpod::ChannelConfigQuery;
-use netpod::ChannelConfigResponse;
+use netpod::DtNano;
 use netpod::NodeConfigCached;
 use netpod::ScalarType;
 use netpod::SfDbChannel;
@@ -23,7 +22,6 @@ use num_derive::ToPrimitive;
 use num_traits::ToPrimitive;
 use serde::Deserialize;
 use serde::Serialize;
-use std::cmp;
 use std::fmt;
 use tokio::io::ErrorKind;
@@ -88,7 +86,7 @@ pub struct ConfigEntry {
     pub ts: TsNano,
     pub pulse: i64,
     pub ks: i32,
-    pub bs: TsNano,
+    pub bs: DtNano,
     pub split_count: i32,
     pub status: i32,
     pub bb: i8,
@@ -176,7 +174,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes<Option<ConfigEntry>>
     let (inp, pulse) = be_i64(inp)?;
     let (inp, ks) = be_i32(inp)?;
     let (inp, bs) = be_i64(inp)?;
-    let bs = TsNano(bs as u64 * MS);
+    let bs = DtNano::from_ns(bs as u64 * MS);
     let (inp, split_count) = be_i32(inp)?;
     let (inp, status) = be_i32(inp)?;
     let (inp, bb) = be_i8(inp)?;
@@ -348,10 +346,10 @@ async fn read_local_config_test(channel: SfDbChannel, ncc: &NodeConfigCached) ->
         format_version: 0,
         channel_name: channel.name().into(),
         entries: vec![ConfigEntry {
-            ts: TsNano(0),
+            ts: TsNano::from_ns(0),
             pulse: 0,
             ks: 2,
-            bs: TsNano(DAY),
+            bs: DtNano::from_ns(DAY),
             split_count: ncc.node_config.cluster.nodes.len() as _,
             status: -1,
             bb: -1,
@@ -378,10 +376,10 @@ async fn read_local_config_test(channel: SfDbChannel, ncc: &NodeConfigCached) ->
         format_version: 0,
         channel_name: channel.name().into(),
         entries: vec![ConfigEntry {
-            ts: TsNano(0),
+            ts: TsNano::from_ns(0),
             pulse: 0,
             ks: 2,
-            bs: TsNano(DAY),
+            bs: DtNano::from_ns(DAY),
             split_count: ncc.node_config.cluster.nodes.len() as _,
             status: -1,
             bb: -1,
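A worked example for the `parse_entry` change above: the on-disk config stores the bin size in milliseconds (read via `be_i64`), while `DtNano` counts nanoseconds, hence the `* MS`:

```rust
const MS: u64 = 1_000_000; // nanoseconds per millisecond, as in netpod::timeunits

fn main() {
    let bs_on_disk: i64 = 86_400_000; // one day, stored as ms in the config entry
    let bs_ns = bs_on_disk as u64 * MS; // what DtNano::from_ns receives
    assert_eq!(bs_ns, 86_400_000_000_000);
}
```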
diff --git a/query/src/api4/events.rs b/query/src/api4/events.rs
index a3e81b0..35bdb8e 100644
--- a/query/src/api4/events.rs
+++ b/query/src/api4/events.rs
@@ -1,3 +1,4 @@
+use super::binned::BinnedQuery;
 use crate::transform::TransformQuery;
 use err::Error;
 use netpod::get_url_query_pairs;
@@ -9,6 +10,7 @@ use netpod::query::TimeRangeQuery;
 use netpod::range::evrange::SeriesRange;
 use netpod::AppendToUrl;
 use netpod::ByteSize;
+use netpod::ChannelTypeConfigGen;
 use netpod::FromUrl;
 use netpod::HasBackend;
 use netpod::HasTimeout;
@@ -48,6 +50,8 @@ pub struct PlainEventsQuery {
     test_do_wasm: bool,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     merger_out_len_max: Option<usize>,
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    create_errors: Vec<String>,
 }
 
 impl PlainEventsQuery {
@@ -69,6 +73,7 @@ impl PlainEventsQuery {
             do_test_stream_error: false,
             test_do_wasm: false,
             merger_out_len_max: None,
+            create_errors: Vec::new(),
         }
     }
 
@@ -160,6 +165,10 @@ impl PlainEventsQuery {
     pub fn need_value_data(&self) -> bool {
         self.transform.need_value_data()
     }
+
+    pub fn create_errors_contains(&self, x: &str) -> bool {
+        self.create_errors.contains(&String::from(x))
+    }
 }
 
 impl HasBackend for PlainEventsQuery {
@@ -227,6 +236,10 @@ impl FromUrl for PlainEventsQuery {
             merger_out_len_max: pairs
                 .get("mergerOutLenMax")
                 .map_or(Ok(None), |k| k.parse().map(|k| Some(k)))?,
+            create_errors: pairs
+                .get("create_errors")
+                .map(|x| x.split(",").map(|x| x.to_string()).collect())
+                .unwrap_or(Vec::new()),
         };
         Ok(ret)
     }
@@ -274,5 +287,169 @@ impl AppendToUrl for PlainEventsQuery {
         if let Some(x) = self.merger_out_len_max.as_ref() {
             g.append_pair("mergerOutLenMax", &format!("{}", x));
         }
+        if self.create_errors.len() != 0 {
+            g.append_pair("create_errors", &self.create_errors.join(","));
+        }
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct EventsSubQuerySelect {
+    ch_conf: ChannelTypeConfigGen,
+    range: SeriesRange,
+    transform: TransformQuery,
+}
+
+impl EventsSubQuerySelect {
+    pub fn new(ch_info: ChannelTypeConfigGen, range: SeriesRange, transform: TransformQuery) -> Self {
+        Self {
+            ch_conf: ch_info,
+            range,
+            transform,
+        }
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct EventsSubQuerySettings {
+    timeout: Option<Duration>,
+    events_max: Option<u64>,
+    event_delay: Option<Duration>,
+    stream_batch_len: Option<usize>,
+    buf_len_disk_io: Option<usize>,
+    test_do_wasm: bool,
+    create_errors: Vec<String>,
+}
+
+impl Default for EventsSubQuerySettings {
+    fn default() -> Self {
+        Self {
+            timeout: Default::default(),
+            events_max: Default::default(),
+            event_delay: Default::default(),
+            stream_batch_len: Default::default(),
+            buf_len_disk_io: Default::default(),
+            test_do_wasm: Default::default(),
+            create_errors: Default::default(),
+        }
+    }
+}
+
+impl From<&PlainEventsQuery> for EventsSubQuerySettings {
+    fn from(value: &PlainEventsQuery) -> Self {
+        Self {
+            timeout: value.timeout,
+            events_max: value.events_max,
+            event_delay: value.event_delay,
+            stream_batch_len: value.stream_batch_len,
+            buf_len_disk_io: value.buf_len_disk_io,
+            test_do_wasm: value.test_do_wasm,
+            create_errors: value.create_errors.clone(),
+        }
+    }
+}
+
+impl From<&BinnedQuery> for EventsSubQuerySettings {
+    fn from(value: &BinnedQuery) -> Self {
+        Self {
+            timeout: value.timeout(),
+            // TODO ?
+            events_max: None,
+            event_delay: None,
+            stream_batch_len: None,
+            buf_len_disk_io: None,
+            test_do_wasm: false,
+            create_errors: Vec::new(),
+        }
+    }
+}
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct EventsSubQuery {
+    select: EventsSubQuerySelect,
+    settings: EventsSubQuerySettings,
+    ty: String,
+}
+
+impl EventsSubQuery {
+    pub fn from_parts(select: EventsSubQuerySelect, settings: EventsSubQuerySettings) -> Self {
+        Self {
+            select,
+            settings,
+            ty: "EventsSubQuery".into(),
+        }
+    }
+
+    pub fn backend(&self) -> &str {
+        &self.select.ch_conf.backend()
+    }
+
+    pub fn name(&self) -> &str {
+        &self.select.ch_conf.name()
+    }
+
+    pub fn range(&self) -> &SeriesRange {
+        &self.select.range
+    }
+
+    pub fn transform(&self) -> &TransformQuery {
+        &self.select.transform
+    }
+
+    pub fn ch_conf(&self) -> &ChannelTypeConfigGen {
+        &self.select.ch_conf
+    }
+
+    pub fn timeout(&self) -> Duration {
+        self.settings.timeout.unwrap_or(Duration::from_millis(10000))
+    }
+
+    pub fn events_max(&self) -> u64 {
+        self.settings.events_max.unwrap_or(1024 * 128)
+    }
+
+    pub fn event_delay(&self) -> &Option<Duration> {
+        &self.settings.event_delay
+    }
+
+    pub fn buf_len_disk_io(&self) -> usize {
+        self.settings.buf_len_disk_io.unwrap_or(1024 * 8)
+    }
+
+    // A rough indication on how many bytes this request is allowed to return. Otherwise, the result should
+    // be a partial result.
+    pub fn bytes_max(&self) -> u64 {
+        self.settings.events_max.unwrap_or(1024 * 512)
+    }
+
+    pub fn test_do_wasm(&self) -> bool {
+        self.settings.test_do_wasm
+    }
+
+    pub fn is_event_blobs(&self) -> bool {
+        self.select.transform.is_event_blobs()
+    }
+
+    pub fn need_value_data(&self) -> bool {
+        self.select.transform.need_value_data()
+    }
+
+    pub fn create_errors_contains(&self, x: &str) -> bool {
+        self.settings.create_errors.contains(&String::from(x))
+    }
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Frame1Parts {
+    query: EventsSubQuery,
+}
+
+impl Frame1Parts {
+    pub fn new(query: EventsSubQuery) -> Self {
+        Self { query }
+    }
+
+    pub fn parts(self) -> (EventsSubQuery,) {
+        (self.query,)
+    }
+}
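Usage note for the new query parameter: `AppendToUrl` writes `create_errors` as one comma-joined pair and `FromUrl` splits it back into the `Vec<String>`. A round-trip sketch with the url crate, which this codebase already uses for its URL handling (the host, port, and path are made up for the example, and "disk_read" is a hypothetical second marker):

```rust
fn main() {
    let mut url = url::Url::parse("http://localhost:8371/api/4/events").unwrap();
    // AppendToUrl side: one pair, comma-joined.
    url.query_pairs_mut()
        .append_pair("create_errors", "nodenet_parse_query,disk_read");
    // FromUrl side: split the pair back into the Vec<String> field.
    let mut create_errors: Vec<String> = Vec::new();
    for (k, v) in url.query_pairs() {
        if k == "create_errors" {
            create_errors = v.split(',').map(String::from).collect();
        }
    }
    assert_eq!(create_errors, ["nodenet_parse_query", "disk_read"]);
}
```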
diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs
index 84d821a..5acd7ac 100644
--- a/streams/src/plaineventsjson.rs
+++ b/streams/src/plaineventsjson.rs
@@ -14,21 +14,27 @@
 use items_2::streams::PlainEventStream;
 use netpod::log::*;
 use netpod::ChannelTypeConfigGen;
 use netpod::Cluster;
+use query::api4::events::EventsSubQuery;
+use query::api4::events::EventsSubQuerySelect;
+use query::api4::events::EventsSubQuerySettings;
 use query::api4::events::PlainEventsQuery;
 use serde_json::Value as JsonValue;
 use std::time::Instant;
 
 pub async fn plain_events_json(
     evq: &PlainEventsQuery,
-    ch_conf: &ChannelTypeConfigGen,
+    ch_conf: ChannelTypeConfigGen,
     cluster: &Cluster,
 ) -> Result<JsonValue, Error> {
     info!("plain_events_json evquery {:?}", evq);
+    let select = EventsSubQuerySelect::new(ch_conf, evq.range().clone(), evq.transform().clone());
+    let settings = EventsSubQuerySettings::from(evq);
+    let subq = EventsSubQuery::from_parts(select, settings);
     // TODO remove magic constant
     let deadline = Instant::now() + evq.timeout();
     let mut tr = build_merged_event_transform(evq.transform())?;
     // TODO make sure the empty container arrives over the network.
-    let inps = open_tcp_streams::<_, ChannelEvents>(&evq, ch_conf, cluster).await?;
+    let inps = open_tcp_streams::<ChannelEvents>(subq, cluster).await?;
     // TODO propagate also the max-buf-len for the first stage event reader.
     // TODO use a mixture of count and byte-size as threshold.
     let stream = Merger::new(inps, evq.merger_out_len_max());
diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs
index 8899793..44aeb90 100644
--- a/streams/src/tcprawclient.rs
+++ b/streams/src/tcprawclient.rs
@@ -21,36 +21,29 @@
 use netpod::ChannelTypeConfigGen;
 use netpod::Cluster;
 use netpod::Node;
 use netpod::PerfOpts;
-use query::api4::events::PlainEventsQuery;
+use query::api4::events::EventsSubQuery;
+use query::api4::events::Frame1Parts;
 use serde::de::DeserializeOwned;
-use serde::Serialize;
-use serde_json::json;
 use std::fmt;
 use std::pin::Pin;
 use tokio::io::AsyncWriteExt;
 use tokio::net::TcpStream;
 
-pub fn make_node_command_frame<Q>(query: Q, ch_conf: &ChannelTypeConfigGen) -> Result<EventQueryJsonStringFrame, Error>
-where
-    Q: Serialize,
-{
-    let obj = json!({
-        "query": query,
-        "ch_conf":ch_conf,
-    });
+pub fn make_node_command_frame(query: EventsSubQuery) -> Result<EventQueryJsonStringFrame, Error> {
+    let obj = Frame1Parts::new(query);
     let ret = serde_json::to_string(&obj)?;
     Ok(EventQueryJsonStringFrame(ret))
 }
 
 pub async fn x_processed_event_blobs_stream_from_node(
-    query: PlainEventsQuery,
+    query: EventsSubQuery,
     ch_conf: ChannelTypeConfigGen,
     perf_opts: PerfOpts,
     node: Node,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
     let addr = format!("{}:{}", node.host, node.port_raw);
     debug!("x_processed_event_blobs_stream_from_node to: {addr}",);
-    let frame1 = make_node_command_frame(&query, &ch_conf)?;
+    let frame1 = make_node_command_frame(query)?;
     let net = TcpStream::connect(addr.clone()).await?;
     let (netin, mut netout) = net.into_split();
     let item = sitem_data(frame1);
@@ -68,18 +61,13 @@
 
 pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = Sitemty<T>> + Send>>;
 
-pub async fn open_tcp_streams<Q, T>(
-    query: Q,
-    ch_conf: &ChannelTypeConfigGen,
-    cluster: &Cluster,
-) -> Result<Vec<BoxedStream<T>>, Error>
+pub async fn open_tcp_streams<T>(query: EventsSubQuery, cluster: &Cluster) -> Result<Vec<BoxedStream<T>>, Error>
 where
-    Q: Serialize,
     // Group bounds in new trait
     T: FrameTypeInnerStatic + DeserializeOwned + Send + Unpin + fmt::Debug + 'static,
 {
     // TODO when unit tests established, change to async connect:
-    let frame1 = make_node_command_frame(&query, &ch_conf)?;
+    let frame1 = make_node_command_frame(query)?;
     let mut streams = Vec::new();
     for node in &cluster.nodes {
         let addr = format!("{}:{}", node.host, node.port_raw);
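For context on `open_tcp_streams`: the command frame is serialized once and sent to every node in the cluster, and each connection becomes one input of the downstream `Merger`. The shape of that fan-out, reduced to plain tokio (an illustrative stand-in, not the real framing protocol):

```rust
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

// One serialized command frame, written to every node address; each
// socket then yields one per-node stream that is k-way merged afterwards.
async fn fan_out(addrs: &[String], frame: &[u8]) -> std::io::Result<Vec<TcpStream>> {
    let mut conns = Vec::new();
    for addr in addrs {
        let mut s = TcpStream::connect(addr.as_str()).await?;
        s.write_all(frame).await?;
        conns.push(s);
    }
    Ok(conns)
}
```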
diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs
index d4be048..90e9d84 100644
--- a/streams/src/timebinnedjson.rs
+++ b/streams/src/timebinnedjson.rs
@@ -24,7 +24,9 @@
 use netpod::BinnedRangeEnum;
 use netpod::ChannelTypeConfigGen;
 use netpod::Cluster;
 use query::api4::binned::BinnedQuery;
-use query::api4::events::PlainEventsQuery;
+use query::api4::events::EventsSubQuery;
+use query::api4::events::EventsSubQuerySelect;
+use query::api4::events::EventsSubQuerySettings;
 use serde_json::Value as JsonValue;
 use std::pin::Pin;
 use std::time::Instant;
@@ -38,13 +40,14 @@ async fn timebinnable_stream(
     query: BinnedQuery,
     range: NanoRange,
     one_before_range: bool,
-    ch_conf: &ChannelTypeConfigGen,
+    ch_conf: ChannelTypeConfigGen,
     cluster: Cluster,
 ) -> Result {
-    let evq = PlainEventsQuery::new(query.channel().clone(), range.clone()).for_time_weighted_scalar();
-    let mut tr = build_merged_event_transform(evq.transform())?;
-
-    let inps = open_tcp_streams::<_, ChannelEvents>(&evq, ch_conf, &cluster).await?;
+    let select = EventsSubQuerySelect::new(ch_conf, range.clone().into(), query.transform().clone());
+    let settings = EventsSubQuerySettings::from(&query);
+    let subq = EventsSubQuery::from_parts(select, settings);
+    let mut tr = build_merged_event_transform(subq.transform())?;
+    let inps = open_tcp_streams::<ChannelEvents>(subq, &cluster).await?;
     // TODO propagate also the max-buf-len for the first stage event reader.
     // TODO use a mixture of count and byte-size as threshold.
     let stream = Merger::new(inps, query.merger_out_len_max());
@@ -68,7 +71,7 @@ async fn timebinned_stream(
     query: BinnedQuery,
     binned_range: BinnedRangeEnum,
-    ch_conf: &ChannelTypeConfigGen,
+    ch_conf: ChannelTypeConfigGen,
     cluster: Cluster,
 ) -> Result<Pin<Box<dyn Stream<Item = Sitemty<Box<dyn TimeBinned>>> + Send>>, Error> {
     let range = binned_range.binned_range_time().to_nano_range();
@@ -101,7 +104,7 @@ fn timebinned_to_collectable(
 
 pub async fn timebinned_json(
     query: BinnedQuery,
-    ch_conf: &ChannelTypeConfigGen,
+    ch_conf: ChannelTypeConfigGen,
     cluster: Cluster,
 ) -> Result<JsonValue, Error> {
     let deadline = Instant::now().checked_add(query.timeout_value()).unwrap();