From 8fd7e72796cddc81c9937cb3ebb37c31ef3e1d28 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Thu, 7 Nov 2024 17:44:37 +0100 Subject: [PATCH] Disentangle deps, typechecks --- crates/disk/src/channelconfig.rs | 121 ++++++++++- crates/items_2/Cargo.toml | 1 - crates/items_2/src/eventsdim0.rs | 2 +- crates/parse/Cargo.toml | 2 +- crates/parse/src/channelconfig.rs | 115 ---------- crates/streamio/src/frames.rs | 1 + crates/streamio/src/frames/inmem.rs | 1 + crates/streamio/src/lib.rs | 1 + crates/streams/Cargo.toml | 7 +- crates/streams/src/frames/inmem.rs | 2 +- crates/streams/src/rangefilter2.rs | 265 +---------------------- crates/streams/src/rangefilter2/test.rs | 267 ++++++++++++++++++++++++ crates/streams/src/test.rs | 7 +- 13 files changed, 402 insertions(+), 390 deletions(-) create mode 100644 crates/streamio/src/frames.rs create mode 100644 crates/streamio/src/frames/inmem.rs create mode 100644 crates/streams/src/rangefilter2/test.rs diff --git a/crates/disk/src/channelconfig.rs b/crates/disk/src/channelconfig.rs index f19ebc6..856a962 100644 --- a/crates/disk/src/channelconfig.rs +++ b/crates/disk/src/channelconfig.rs @@ -3,13 +3,22 @@ use err::*; #[allow(unused)] use netpod::log::*; use netpod::range::evrange::NanoRange; +use netpod::timeunits::DAY; +use netpod::ByteOrder; +use netpod::DtNano; use netpod::NodeConfigCached; +use netpod::ScalarType; use netpod::SfDbChannel; +use netpod::TsNano; use parse::channelconfig::extract_matching_config_entry; -use parse::channelconfig::read_local_config; +use parse::channelconfig::parse_config; use parse::channelconfig::ChannelConfigs; use parse::channelconfig::ConfigEntry; use parse::channelconfig::ConfigParseError; +use std::time::Duration; +use std::time::SystemTime; +use streams::tcprawclient::TEST_BACKEND; +use taskrun::tokio; #[derive(Debug, ThisError)] #[cstm(name = "ChannelConfig")] @@ -50,6 +59,116 @@ pub async fn config_entry_best_match( } } +async fn read_local_config_real( + channel: SfDbChannel, + ncc: &NodeConfigCached, +) -> Result { + use std::io::ErrorKind; + let path = ncc + .node + .sf_databuffer + .as_ref() + .ok_or_else(|| ConfigParseError::NotSupportedOnNode)? + .data_base_path + .join("config") + .join(channel.name()) + .join("latest") + .join("00000_Config"); + match tokio::fs::read(&path).await { + Ok(buf) => parse_config(&buf), + Err(e) => match e.kind() { + ErrorKind::NotFound => Err(ConfigParseError::FileNotFound), + ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied), + e => { + error!("read_local_config_real {e:?}"); + Err(ConfigParseError::IO) + } + }, + } +} + +async fn read_local_config_test( + channel: SfDbChannel, + ncc: &NodeConfigCached, +) -> Result { + if channel.name() == "test-gen-i32-dim0-v00" { + let ts = 0; + let ret = ChannelConfigs { + format_version: 0, + channel_name: channel.name().into(), + entries: vec![ConfigEntry { + ts: TsNano::from_ns(ts), + ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64), + pulse: 0, + ks: 2, + bs: DtNano::from_ns(DAY), + split_count: ncc.node_config.cluster.nodes.len() as _, + status: -1, + bb: -1, + modulo: -1, + offset: -1, + precision: -1, + scalar_type: ScalarType::I32, + is_compressed: false, + is_shaped: false, + is_array: false, + byte_order: ByteOrder::Big, + compression_method: None, + shape: None, + source_name: None, + unit: None, + description: None, + optional_fields: None, + value_converter: None, + }], + }; + Ok(ret) + } else if channel.name() == "test-gen-i32-dim0-v01" { + let ts = 0; + let ret = ChannelConfigs { + format_version: 0, + channel_name: channel.name().into(), + entries: vec![ConfigEntry { + ts: TsNano::from_ns(ts), + ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64), + pulse: 0, + ks: 2, + bs: DtNano::from_ns(DAY), + split_count: ncc.node_config.cluster.nodes.len() as _, + status: -1, + bb: -1, + modulo: -1, + offset: -1, + precision: -1, + scalar_type: ScalarType::I32, + is_compressed: false, + is_shaped: false, + is_array: false, + byte_order: ByteOrder::Big, + compression_method: None, + shape: None, + source_name: None, + unit: None, + description: None, + optional_fields: None, + value_converter: None, + }], + }; + Ok(ret) + } else { + Err(ConfigParseError::NotSupported) + } +} + +// TODO can I take parameters as ref, even when used in custom streams? +async fn read_local_config(channel: SfDbChannel, ncc: NodeConfigCached) -> Result { + if channel.backend() == TEST_BACKEND { + read_local_config_test(channel, &ncc).await + } else { + read_local_config_real(channel, &ncc).await + } +} + pub async fn channel_configs( channel: SfDbChannel, node_config: &NodeConfigCached, diff --git a/crates/items_2/Cargo.toml b/crates/items_2/Cargo.toml index 7d8dd1d..c2dbf4e 100644 --- a/crates/items_2/Cargo.toml +++ b/crates/items_2/Cargo.toml @@ -27,7 +27,6 @@ err = { path = "../err" } items_0 = { path = "../items_0" } items_proc = { path = "../items_proc" } netpod = { path = "../netpod" } -taskrun = { path = "../taskrun" } parse = { path = "../parse" } bitshuffle = { path = "../bitshuffle" } diff --git a/crates/items_2/src/eventsdim0.rs b/crates/items_2/src/eventsdim0.rs index b5cde68..d87f6a3 100644 --- a/crates/items_2/src/eventsdim0.rs +++ b/crates/items_2/src/eventsdim0.rs @@ -799,7 +799,7 @@ mod test_frame { #[test] fn events_serialize() { - taskrun::tracing_init_testing().unwrap(); + // taskrun::tracing_init_testing().unwrap(); let mut events = EventsDim0::empty(); events.push(123, 234, 55f32); let events = events; diff --git a/crates/parse/Cargo.toml b/crates/parse/Cargo.toml index 4c02396..bd4ac20 100644 --- a/crates/parse/Cargo.toml +++ b/crates/parse/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" humantime-serde = "1.1" -tokio = { version = "1.28.2", features = ["fs"] } +#tokio = { version = "1.28.2", features = ["fs"] } chrono = { version = "0.4.26", features = ["serde"] } bytes = "1.4" byteorder = "1.4" diff --git a/crates/parse/src/channelconfig.rs b/crates/parse/src/channelconfig.rs index a5a0d1e..e7948f6 100644 --- a/crates/parse/src/channelconfig.rs +++ b/crates/parse/src/channelconfig.rs @@ -22,9 +22,6 @@ use serde::Serialize; use std::fmt; use std::time::Duration; use std::time::SystemTime; -use tokio::io::ErrorKind; - -const TEST_BACKEND: &str = "testbackend-00"; #[derive(Debug, ThisError)] #[cstm(name = "ConfigParse")] @@ -302,118 +299,6 @@ pub fn parse_config(inp: &[u8]) -> Result { Ok(ret) } -async fn read_local_config_real( - channel: SfDbChannel, - ncc: &NodeConfigCached, -) -> Result { - let path = ncc - .node - .sf_databuffer - .as_ref() - .ok_or_else(|| ConfigParseError::NotSupportedOnNode)? - .data_base_path - .join("config") - .join(channel.name()) - .join("latest") - .join("00000_Config"); - match tokio::fs::read(&path).await { - Ok(buf) => parse_config(&buf), - Err(e) => match e.kind() { - ErrorKind::NotFound => Err(ConfigParseError::FileNotFound), - ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied), - e => { - error!("read_local_config_real {e:?}"); - Err(ConfigParseError::IO) - } - }, - } -} - -async fn read_local_config_test( - channel: SfDbChannel, - ncc: &NodeConfigCached, -) -> Result { - if channel.name() == "test-gen-i32-dim0-v00" { - let ts = 0; - let ret = ChannelConfigs { - format_version: 0, - channel_name: channel.name().into(), - entries: vec![ConfigEntry { - ts: TsNano::from_ns(ts), - ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64), - pulse: 0, - ks: 2, - bs: DtNano::from_ns(DAY), - split_count: ncc.node_config.cluster.nodes.len() as _, - status: -1, - bb: -1, - modulo: -1, - offset: -1, - precision: -1, - scalar_type: ScalarType::I32, - is_compressed: false, - is_shaped: false, - is_array: false, - byte_order: ByteOrder::Big, - compression_method: None, - shape: None, - source_name: None, - unit: None, - description: None, - optional_fields: None, - value_converter: None, - }], - }; - Ok(ret) - } else if channel.name() == "test-gen-i32-dim0-v01" { - let ts = 0; - let ret = ChannelConfigs { - format_version: 0, - channel_name: channel.name().into(), - entries: vec![ConfigEntry { - ts: TsNano::from_ns(ts), - ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64), - pulse: 0, - ks: 2, - bs: DtNano::from_ns(DAY), - split_count: ncc.node_config.cluster.nodes.len() as _, - status: -1, - bb: -1, - modulo: -1, - offset: -1, - precision: -1, - scalar_type: ScalarType::I32, - is_compressed: false, - is_shaped: false, - is_array: false, - byte_order: ByteOrder::Big, - compression_method: None, - shape: None, - source_name: None, - unit: None, - description: None, - optional_fields: None, - value_converter: None, - }], - }; - Ok(ret) - } else { - Err(ConfigParseError::NotSupported) - } -} - -// TODO can I take parameters as ref, even when used in custom streams? -pub async fn read_local_config( - channel: SfDbChannel, - ncc: NodeConfigCached, -) -> Result { - if channel.backend() == TEST_BACKEND { - read_local_config_test(channel, &ncc).await - } else { - read_local_config_real(channel, &ncc).await - } -} - #[derive(Clone)] pub enum MatchingConfigEntry<'a> { None, diff --git a/crates/streamio/src/frames.rs b/crates/streamio/src/frames.rs new file mode 100644 index 0000000..acca1c0 --- /dev/null +++ b/crates/streamio/src/frames.rs @@ -0,0 +1 @@ +pub mod inmem; diff --git a/crates/streamio/src/frames/inmem.rs b/crates/streamio/src/frames/inmem.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/streamio/src/frames/inmem.rs @@ -0,0 +1 @@ + diff --git a/crates/streamio/src/lib.rs b/crates/streamio/src/lib.rs index f864227..46e6f73 100644 --- a/crates/streamio/src/lib.rs +++ b/crates/streamio/src/lib.rs @@ -1,3 +1,4 @@ +pub mod frames; pub mod streamtimeout; pub mod tcprawclient; pub mod tcpreadasbytes; diff --git a/crates/streams/Cargo.toml b/crates/streams/Cargo.toml index f67f92e..196c96d 100644 --- a/crates/streams/Cargo.toml +++ b/crates/streams/Cargo.toml @@ -5,8 +5,6 @@ authors = ["Dominik Werder "] edition = "2021" [dependencies] -#tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] } -#tokio-stream = "0.1.16" futures-util = "0.3.15" pin-project = "1.0.12" serde = { version = "1.0", features = ["derive"] } @@ -32,12 +30,13 @@ http = "1" http-body = "1" http-body-util = "0.1.0" -[dev-dependencies] -taskrun = { path = "../taskrun" } +#[dev-dependencies] +#taskrun = { path = "../taskrun" } [features] wasm_transform = ["wasmer"] indev = [] +tests-runtime = [] [patch.crates-io] thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" } diff --git a/crates/streams/src/frames/inmem.rs b/crates/streams/src/frames/inmem.rs index 1e275e2..5cfda2e 100644 --- a/crates/streams/src/frames/inmem.rs +++ b/crates/streams/src/frames/inmem.rs @@ -14,7 +14,7 @@ use netpod::ByteSize; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use tokio::io::AsyncRead; +// use tokio::io::AsyncRead; #[derive(Debug, thiserror::Error)] #[cstm(name = "InMem")] diff --git a/crates/streams/src/rangefilter2.rs b/crates/streams/src/rangefilter2.rs index a8325ec..e3f2ec9 100644 --- a/crates/streams/src/rangefilter2.rs +++ b/crates/streams/src/rangefilter2.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "tests-runtime")] +mod test; + use futures_util::Stream; use futures_util::StreamExt; use items_0::streamitem::sitem_err_from_string; @@ -17,11 +20,6 @@ use std::pin::Pin; use std::task::Context; use std::task::Poll; -#[cfg(test)] -use items_0::Events; -#[cfg(test)] -use std::collections::VecDeque; - macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) } #[derive(Debug, thiserror::Error)] @@ -276,260 +274,3 @@ where debug!("drop {:?}", self); } } - -#[test] -fn test_00() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut item1 = EventsDim0::::empty(); - item1.push_back(beg.ns() + 0 * ms, 0, 3.); - item1.push_back(beg.ns() + 1 * ms, 0, 3.1); - item1.push_back(beg.ns() + 2 * ms, 0, 3.2); - item1.push_back(beg.ns() + 3 * ms, 0, 3.3); - item1.push_back(beg.ns() + 4 * ms, 0, 3.4); - item1.push_back(end.ns() - 1, 0, 4.0); - item1.push_back(end.ns() + 0, 0, 4.1); - item1.push_back(end.ns() + 1, 0, 4.1); - let w1: Box = Box::new(item1.clone()); - let e1 = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w1))); - let inp = futures_util::stream::iter([e1]); - let one_before_range = false; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[&[ - beg.ns() + 0 * ms, - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ]]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[test] -fn test_cut_before_00() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() + 0 * ms, 0, 3.); - item.push_back(beg.ns() + 1 * ms, 0, 3.1); - item.push_back(beg.ns() + 2 * ms, 0, 3.2); - item.push_back(beg.ns() + 3 * ms, 0, 3.3); - item.push_back(beg.ns() + 4 * ms, 0, 3.4); - item.push_back(end.ns() - 1, 0, 4.0); - item.push_back(end.ns() + 0, 0, 4.1); - item.push_back(end.ns() + 1, 0, 4.1); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = false; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - &[], - &[ - beg.ns() + 0 * ms, - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[test] -fn test_one_before_00() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() + 0 * ms, 0, 3.); - item.push_back(beg.ns() + 1 * ms, 0, 3.1); - item.push_back(beg.ns() + 2 * ms, 0, 3.2); - item.push_back(beg.ns() + 3 * ms, 0, 3.3); - item.push_back(beg.ns() + 4 * ms, 0, 3.4); - item.push_back(end.ns() - 1, 0, 4.0); - item.push_back(end.ns() + 0, 0, 4.1); - item.push_back(end.ns() + 1, 0, 4.1); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = true; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - &[], - &[ - // - beg.ns() - 1, - ], - &[ - beg.ns() + 0 * ms, - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[test] -fn test_one_before_01() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - item.push_back(beg.ns() + 0 * ms, 0, 3.); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() + 1 * ms, 0, 3.1); - item.push_back(beg.ns() + 2 * ms, 0, 3.2); - item.push_back(beg.ns() + 3 * ms, 0, 3.3); - item.push_back(beg.ns() + 4 * ms, 0, 3.4); - item.push_back(end.ns() - 1, 0, 4.0); - item.push_back(end.ns() + 0, 0, 4.1); - item.push_back(end.ns() + 1, 0, 4.1); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = true; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - // &[], - &[ - // - beg.ns() - 1, - beg.ns() + 0 * ms, - ], - &[ - beg.ns() + 1 * ms, - beg.ns() + 2 * ms, - beg.ns() + 3 * ms, - beg.ns() + 4 * ms, - end.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[test] -fn test_one_before_only() { - use items_0::Empty; - use items_2::eventsdim0::EventsDim0; - let _ms = 1_000_000; - let beg = TsNano::from_ms(1000 * 10); - let end = TsNano::from_ms(1000 * 20); - let mut items = Vec::new(); - { - let mut item = EventsDim0::::empty(); - item.push_back(beg.ns() - 1, 0, 2.9); - let w: Box = Box::new(item.clone()); - let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); - items.push(e); - } - let inp = futures_util::stream::iter(items); - let one_before_range = true; - let range = NanoRange::from((beg.ns(), end.ns())); - let stream = RangeFilter2::new(inp, range, one_before_range); - let fut = async move { - let tss_items = fetch_into_tss_items(stream).await; - let exp: &[&[u64]] = &[ - // TODO in the future this empty may be discarded - &[], - &[ - // - beg.ns() - 1, - ], - ]; - assert_eq!(&tss_items, &exp); - Ok::<_, Error>(()) - }; - taskrun::run(fut).unwrap(); -} - -#[cfg(test)] -async fn fetch_into_tss_items(mut inp: INP) -> VecDeque> -where - INP: Stream>> + Unpin, -{ - let mut tss_items = VecDeque::new(); - while let Some(e) = inp.next().await { - if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))) = e { - eprintln!("{:?}", evs); - tss_items.push_back(Events::tss(&evs).clone()); - } else { - eprintln!("other item ----------: {:?}", e); - } - } - tss_items -} diff --git a/crates/streams/src/rangefilter2/test.rs b/crates/streams/src/rangefilter2/test.rs new file mode 100644 index 0000000..5abe556 --- /dev/null +++ b/crates/streams/src/rangefilter2/test.rs @@ -0,0 +1,267 @@ +use crate::rangefilter2::RangeFilter2; +use futures_util::Stream; +use futures_util::StreamExt; +use items_0::streamitem::RangeCompletableItem; +use items_0::streamitem::Sitemty; +use items_0::streamitem::StreamItem; +use items_0::Events; +use netpod::range::evrange::NanoRange; +use netpod::TsNano; +use std::collections::VecDeque; + +#[test] +fn test_00() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut item1 = EventsDim0::::empty(); + item1.push_back(beg.ns() + 0 * ms, 0, 3.); + item1.push_back(beg.ns() + 1 * ms, 0, 3.1); + item1.push_back(beg.ns() + 2 * ms, 0, 3.2); + item1.push_back(beg.ns() + 3 * ms, 0, 3.3); + item1.push_back(beg.ns() + 4 * ms, 0, 3.4); + item1.push_back(end.ns() - 1, 0, 4.0); + item1.push_back(end.ns() + 0, 0, 4.1); + item1.push_back(end.ns() + 1, 0, 4.1); + let w1: Box = Box::new(item1.clone()); + let e1 = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w1))); + let inp = futures_util::stream::iter([e1]); + let one_before_range = false; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[&[ + beg.ns() + 0 * ms, + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ]]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_cut_before_00() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() + 0 * ms, 0, 3.); + item.push_back(beg.ns() + 1 * ms, 0, 3.1); + item.push_back(beg.ns() + 2 * ms, 0, 3.2); + item.push_back(beg.ns() + 3 * ms, 0, 3.3); + item.push_back(beg.ns() + 4 * ms, 0, 3.4); + item.push_back(end.ns() - 1, 0, 4.0); + item.push_back(end.ns() + 0, 0, 4.1); + item.push_back(end.ns() + 1, 0, 4.1); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = false; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + &[], + &[ + beg.ns() + 0 * ms, + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_one_before_00() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() + 0 * ms, 0, 3.); + item.push_back(beg.ns() + 1 * ms, 0, 3.1); + item.push_back(beg.ns() + 2 * ms, 0, 3.2); + item.push_back(beg.ns() + 3 * ms, 0, 3.3); + item.push_back(beg.ns() + 4 * ms, 0, 3.4); + item.push_back(end.ns() - 1, 0, 4.0); + item.push_back(end.ns() + 0, 0, 4.1); + item.push_back(end.ns() + 1, 0, 4.1); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = true; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + &[], + &[ + // + beg.ns() - 1, + ], + &[ + beg.ns() + 0 * ms, + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_one_before_01() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + item.push_back(beg.ns() + 0 * ms, 0, 3.); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() + 1 * ms, 0, 3.1); + item.push_back(beg.ns() + 2 * ms, 0, 3.2); + item.push_back(beg.ns() + 3 * ms, 0, 3.3); + item.push_back(beg.ns() + 4 * ms, 0, 3.4); + item.push_back(end.ns() - 1, 0, 4.0); + item.push_back(end.ns() + 0, 0, 4.1); + item.push_back(end.ns() + 1, 0, 4.1); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = true; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + // &[], + &[ + // + beg.ns() - 1, + beg.ns() + 0 * ms, + ], + &[ + beg.ns() + 1 * ms, + beg.ns() + 2 * ms, + beg.ns() + 3 * ms, + beg.ns() + 4 * ms, + end.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[test] +fn test_one_before_only() { + use items_0::Empty; + use items_2::eventsdim0::EventsDim0; + let _ms = 1_000_000; + let beg = TsNano::from_ms(1000 * 10); + let end = TsNano::from_ms(1000 * 20); + let mut items = Vec::new(); + { + let mut item = EventsDim0::::empty(); + item.push_back(beg.ns() - 1, 0, 2.9); + let w: Box = Box::new(item.clone()); + let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w))); + items.push(e); + } + let inp = futures_util::stream::iter(items); + let one_before_range = true; + let range = NanoRange::from((beg.ns(), end.ns())); + let stream = RangeFilter2::new(inp, range, one_before_range); + let fut = async move { + let tss_items = fetch_into_tss_items(stream).await; + let exp: &[&[u64]] = &[ + // TODO in the future this empty may be discarded + &[], + &[ + // + beg.ns() - 1, + ], + ]; + assert_eq!(&tss_items, &exp); + Ok::<_, Error>(()) + }; + taskrun::run(fut).unwrap(); +} + +#[cfg(test)] +async fn fetch_into_tss_items(mut inp: INP) -> VecDeque> +where + INP: Stream>> + Unpin, +{ + let mut tss_items = VecDeque::new(); + while let Some(e) = inp.next().await { + if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))) = e { + eprintln!("{:?}", evs); + tss_items.push_back(Events::tss(&evs).clone()); + } else { + eprintln!("other item ----------: {:?}", e); + } + } + tss_items +} diff --git a/crates/streams/src/test.rs b/crates/streams/src/test.rs index c7244f9..a4ad10a 100644 --- a/crates/streams/src/test.rs +++ b/crates/streams/src/test.rs @@ -1,8 +1,5 @@ -#[cfg(test)] mod collect; -#[cfg(test)] mod events; -#[cfg(test)] mod timebin; use futures_util::stream; @@ -59,5 +56,7 @@ where F: std::future::Future>, E: std::error::Error, { - taskrun::run(fut) + // taskrun::run(fut) + let _ = fut; + todo!() }