Disentangle deps, typechecks
This commit is contained in:
@@ -3,13 +3,22 @@ use err::*;
|
|||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
use netpod::log::*;
|
use netpod::log::*;
|
||||||
use netpod::range::evrange::NanoRange;
|
use netpod::range::evrange::NanoRange;
|
||||||
|
use netpod::timeunits::DAY;
|
||||||
|
use netpod::ByteOrder;
|
||||||
|
use netpod::DtNano;
|
||||||
use netpod::NodeConfigCached;
|
use netpod::NodeConfigCached;
|
||||||
|
use netpod::ScalarType;
|
||||||
use netpod::SfDbChannel;
|
use netpod::SfDbChannel;
|
||||||
|
use netpod::TsNano;
|
||||||
use parse::channelconfig::extract_matching_config_entry;
|
use parse::channelconfig::extract_matching_config_entry;
|
||||||
use parse::channelconfig::read_local_config;
|
use parse::channelconfig::parse_config;
|
||||||
use parse::channelconfig::ChannelConfigs;
|
use parse::channelconfig::ChannelConfigs;
|
||||||
use parse::channelconfig::ConfigEntry;
|
use parse::channelconfig::ConfigEntry;
|
||||||
use parse::channelconfig::ConfigParseError;
|
use parse::channelconfig::ConfigParseError;
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::time::SystemTime;
|
||||||
|
use streams::tcprawclient::TEST_BACKEND;
|
||||||
|
use taskrun::tokio;
|
||||||
|
|
||||||
#[derive(Debug, ThisError)]
|
#[derive(Debug, ThisError)]
|
||||||
#[cstm(name = "ChannelConfig")]
|
#[cstm(name = "ChannelConfig")]
|
||||||
@@ -50,6 +59,116 @@ pub async fn config_entry_best_match(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn read_local_config_real(
|
||||||
|
channel: SfDbChannel,
|
||||||
|
ncc: &NodeConfigCached,
|
||||||
|
) -> Result<ChannelConfigs, ConfigParseError> {
|
||||||
|
use std::io::ErrorKind;
|
||||||
|
let path = ncc
|
||||||
|
.node
|
||||||
|
.sf_databuffer
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| ConfigParseError::NotSupportedOnNode)?
|
||||||
|
.data_base_path
|
||||||
|
.join("config")
|
||||||
|
.join(channel.name())
|
||||||
|
.join("latest")
|
||||||
|
.join("00000_Config");
|
||||||
|
match tokio::fs::read(&path).await {
|
||||||
|
Ok(buf) => parse_config(&buf),
|
||||||
|
Err(e) => match e.kind() {
|
||||||
|
ErrorKind::NotFound => Err(ConfigParseError::FileNotFound),
|
||||||
|
ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied),
|
||||||
|
e => {
|
||||||
|
error!("read_local_config_real {e:?}");
|
||||||
|
Err(ConfigParseError::IO)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_local_config_test(
|
||||||
|
channel: SfDbChannel,
|
||||||
|
ncc: &NodeConfigCached,
|
||||||
|
) -> Result<ChannelConfigs, ConfigParseError> {
|
||||||
|
if channel.name() == "test-gen-i32-dim0-v00" {
|
||||||
|
let ts = 0;
|
||||||
|
let ret = ChannelConfigs {
|
||||||
|
format_version: 0,
|
||||||
|
channel_name: channel.name().into(),
|
||||||
|
entries: vec![ConfigEntry {
|
||||||
|
ts: TsNano::from_ns(ts),
|
||||||
|
ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64),
|
||||||
|
pulse: 0,
|
||||||
|
ks: 2,
|
||||||
|
bs: DtNano::from_ns(DAY),
|
||||||
|
split_count: ncc.node_config.cluster.nodes.len() as _,
|
||||||
|
status: -1,
|
||||||
|
bb: -1,
|
||||||
|
modulo: -1,
|
||||||
|
offset: -1,
|
||||||
|
precision: -1,
|
||||||
|
scalar_type: ScalarType::I32,
|
||||||
|
is_compressed: false,
|
||||||
|
is_shaped: false,
|
||||||
|
is_array: false,
|
||||||
|
byte_order: ByteOrder::Big,
|
||||||
|
compression_method: None,
|
||||||
|
shape: None,
|
||||||
|
source_name: None,
|
||||||
|
unit: None,
|
||||||
|
description: None,
|
||||||
|
optional_fields: None,
|
||||||
|
value_converter: None,
|
||||||
|
}],
|
||||||
|
};
|
||||||
|
Ok(ret)
|
||||||
|
} else if channel.name() == "test-gen-i32-dim0-v01" {
|
||||||
|
let ts = 0;
|
||||||
|
let ret = ChannelConfigs {
|
||||||
|
format_version: 0,
|
||||||
|
channel_name: channel.name().into(),
|
||||||
|
entries: vec![ConfigEntry {
|
||||||
|
ts: TsNano::from_ns(ts),
|
||||||
|
ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64),
|
||||||
|
pulse: 0,
|
||||||
|
ks: 2,
|
||||||
|
bs: DtNano::from_ns(DAY),
|
||||||
|
split_count: ncc.node_config.cluster.nodes.len() as _,
|
||||||
|
status: -1,
|
||||||
|
bb: -1,
|
||||||
|
modulo: -1,
|
||||||
|
offset: -1,
|
||||||
|
precision: -1,
|
||||||
|
scalar_type: ScalarType::I32,
|
||||||
|
is_compressed: false,
|
||||||
|
is_shaped: false,
|
||||||
|
is_array: false,
|
||||||
|
byte_order: ByteOrder::Big,
|
||||||
|
compression_method: None,
|
||||||
|
shape: None,
|
||||||
|
source_name: None,
|
||||||
|
unit: None,
|
||||||
|
description: None,
|
||||||
|
optional_fields: None,
|
||||||
|
value_converter: None,
|
||||||
|
}],
|
||||||
|
};
|
||||||
|
Ok(ret)
|
||||||
|
} else {
|
||||||
|
Err(ConfigParseError::NotSupported)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO can I take parameters as ref, even when used in custom streams?
|
||||||
|
async fn read_local_config(channel: SfDbChannel, ncc: NodeConfigCached) -> Result<ChannelConfigs, ConfigParseError> {
|
||||||
|
if channel.backend() == TEST_BACKEND {
|
||||||
|
read_local_config_test(channel, &ncc).await
|
||||||
|
} else {
|
||||||
|
read_local_config_real(channel, &ncc).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn channel_configs(
|
pub async fn channel_configs(
|
||||||
channel: SfDbChannel,
|
channel: SfDbChannel,
|
||||||
node_config: &NodeConfigCached,
|
node_config: &NodeConfigCached,
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ err = { path = "../err" }
|
|||||||
items_0 = { path = "../items_0" }
|
items_0 = { path = "../items_0" }
|
||||||
items_proc = { path = "../items_proc" }
|
items_proc = { path = "../items_proc" }
|
||||||
netpod = { path = "../netpod" }
|
netpod = { path = "../netpod" }
|
||||||
taskrun = { path = "../taskrun" }
|
|
||||||
parse = { path = "../parse" }
|
parse = { path = "../parse" }
|
||||||
bitshuffle = { path = "../bitshuffle" }
|
bitshuffle = { path = "../bitshuffle" }
|
||||||
|
|
||||||
|
|||||||
@@ -799,7 +799,7 @@ mod test_frame {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn events_serialize() {
|
fn events_serialize() {
|
||||||
taskrun::tracing_init_testing().unwrap();
|
// taskrun::tracing_init_testing().unwrap();
|
||||||
let mut events = EventsDim0::empty();
|
let mut events = EventsDim0::empty();
|
||||||
events.push(123, 234, 55f32);
|
events.push(123, 234, 55f32);
|
||||||
let events = events;
|
let events = events;
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ edition = "2021"
|
|||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
humantime-serde = "1.1"
|
humantime-serde = "1.1"
|
||||||
tokio = { version = "1.28.2", features = ["fs"] }
|
#tokio = { version = "1.28.2", features = ["fs"] }
|
||||||
chrono = { version = "0.4.26", features = ["serde"] }
|
chrono = { version = "0.4.26", features = ["serde"] }
|
||||||
bytes = "1.4"
|
bytes = "1.4"
|
||||||
byteorder = "1.4"
|
byteorder = "1.4"
|
||||||
|
|||||||
@@ -22,9 +22,6 @@ use serde::Serialize;
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tokio::io::ErrorKind;
|
|
||||||
|
|
||||||
const TEST_BACKEND: &str = "testbackend-00";
|
|
||||||
|
|
||||||
#[derive(Debug, ThisError)]
|
#[derive(Debug, ThisError)]
|
||||||
#[cstm(name = "ConfigParse")]
|
#[cstm(name = "ConfigParse")]
|
||||||
@@ -302,118 +299,6 @@ pub fn parse_config(inp: &[u8]) -> Result<ChannelConfigs, ConfigParseError> {
|
|||||||
Ok(ret)
|
Ok(ret)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_local_config_real(
|
|
||||||
channel: SfDbChannel,
|
|
||||||
ncc: &NodeConfigCached,
|
|
||||||
) -> Result<ChannelConfigs, ConfigParseError> {
|
|
||||||
let path = ncc
|
|
||||||
.node
|
|
||||||
.sf_databuffer
|
|
||||||
.as_ref()
|
|
||||||
.ok_or_else(|| ConfigParseError::NotSupportedOnNode)?
|
|
||||||
.data_base_path
|
|
||||||
.join("config")
|
|
||||||
.join(channel.name())
|
|
||||||
.join("latest")
|
|
||||||
.join("00000_Config");
|
|
||||||
match tokio::fs::read(&path).await {
|
|
||||||
Ok(buf) => parse_config(&buf),
|
|
||||||
Err(e) => match e.kind() {
|
|
||||||
ErrorKind::NotFound => Err(ConfigParseError::FileNotFound),
|
|
||||||
ErrorKind::PermissionDenied => Err(ConfigParseError::PermissionDenied),
|
|
||||||
e => {
|
|
||||||
error!("read_local_config_real {e:?}");
|
|
||||||
Err(ConfigParseError::IO)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_local_config_test(
|
|
||||||
channel: SfDbChannel,
|
|
||||||
ncc: &NodeConfigCached,
|
|
||||||
) -> Result<ChannelConfigs, ConfigParseError> {
|
|
||||||
if channel.name() == "test-gen-i32-dim0-v00" {
|
|
||||||
let ts = 0;
|
|
||||||
let ret = ChannelConfigs {
|
|
||||||
format_version: 0,
|
|
||||||
channel_name: channel.name().into(),
|
|
||||||
entries: vec![ConfigEntry {
|
|
||||||
ts: TsNano::from_ns(ts),
|
|
||||||
ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64),
|
|
||||||
pulse: 0,
|
|
||||||
ks: 2,
|
|
||||||
bs: DtNano::from_ns(DAY),
|
|
||||||
split_count: ncc.node_config.cluster.nodes.len() as _,
|
|
||||||
status: -1,
|
|
||||||
bb: -1,
|
|
||||||
modulo: -1,
|
|
||||||
offset: -1,
|
|
||||||
precision: -1,
|
|
||||||
scalar_type: ScalarType::I32,
|
|
||||||
is_compressed: false,
|
|
||||||
is_shaped: false,
|
|
||||||
is_array: false,
|
|
||||||
byte_order: ByteOrder::Big,
|
|
||||||
compression_method: None,
|
|
||||||
shape: None,
|
|
||||||
source_name: None,
|
|
||||||
unit: None,
|
|
||||||
description: None,
|
|
||||||
optional_fields: None,
|
|
||||||
value_converter: None,
|
|
||||||
}],
|
|
||||||
};
|
|
||||||
Ok(ret)
|
|
||||||
} else if channel.name() == "test-gen-i32-dim0-v01" {
|
|
||||||
let ts = 0;
|
|
||||||
let ret = ChannelConfigs {
|
|
||||||
format_version: 0,
|
|
||||||
channel_name: channel.name().into(),
|
|
||||||
entries: vec![ConfigEntry {
|
|
||||||
ts: TsNano::from_ns(ts),
|
|
||||||
ts_human: SystemTime::UNIX_EPOCH + Duration::from_nanos(ts as u64),
|
|
||||||
pulse: 0,
|
|
||||||
ks: 2,
|
|
||||||
bs: DtNano::from_ns(DAY),
|
|
||||||
split_count: ncc.node_config.cluster.nodes.len() as _,
|
|
||||||
status: -1,
|
|
||||||
bb: -1,
|
|
||||||
modulo: -1,
|
|
||||||
offset: -1,
|
|
||||||
precision: -1,
|
|
||||||
scalar_type: ScalarType::I32,
|
|
||||||
is_compressed: false,
|
|
||||||
is_shaped: false,
|
|
||||||
is_array: false,
|
|
||||||
byte_order: ByteOrder::Big,
|
|
||||||
compression_method: None,
|
|
||||||
shape: None,
|
|
||||||
source_name: None,
|
|
||||||
unit: None,
|
|
||||||
description: None,
|
|
||||||
optional_fields: None,
|
|
||||||
value_converter: None,
|
|
||||||
}],
|
|
||||||
};
|
|
||||||
Ok(ret)
|
|
||||||
} else {
|
|
||||||
Err(ConfigParseError::NotSupported)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO can I take parameters as ref, even when used in custom streams?
|
|
||||||
pub async fn read_local_config(
|
|
||||||
channel: SfDbChannel,
|
|
||||||
ncc: NodeConfigCached,
|
|
||||||
) -> Result<ChannelConfigs, ConfigParseError> {
|
|
||||||
if channel.backend() == TEST_BACKEND {
|
|
||||||
read_local_config_test(channel, &ncc).await
|
|
||||||
} else {
|
|
||||||
read_local_config_real(channel, &ncc).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub enum MatchingConfigEntry<'a> {
|
pub enum MatchingConfigEntry<'a> {
|
||||||
None,
|
None,
|
||||||
|
|||||||
1
crates/streamio/src/frames.rs
Normal file
1
crates/streamio/src/frames.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
pub mod inmem;
|
||||||
1
crates/streamio/src/frames/inmem.rs
Normal file
1
crates/streamio/src/frames/inmem.rs
Normal file
@@ -0,0 +1 @@
|
|||||||
|
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
pub mod frames;
|
||||||
pub mod streamtimeout;
|
pub mod streamtimeout;
|
||||||
pub mod tcprawclient;
|
pub mod tcprawclient;
|
||||||
pub mod tcpreadasbytes;
|
pub mod tcpreadasbytes;
|
||||||
|
|||||||
@@ -5,8 +5,6 @@ authors = ["Dominik Werder <dominik.werder@gmail.com>"]
|
|||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
#tokio = { version = "1.41", features = ["io-util", "net", "time", "sync", "fs"] }
|
|
||||||
#tokio-stream = "0.1.16"
|
|
||||||
futures-util = "0.3.15"
|
futures-util = "0.3.15"
|
||||||
pin-project = "1.0.12"
|
pin-project = "1.0.12"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
@@ -32,12 +30,13 @@ http = "1"
|
|||||||
http-body = "1"
|
http-body = "1"
|
||||||
http-body-util = "0.1.0"
|
http-body-util = "0.1.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
#[dev-dependencies]
|
||||||
taskrun = { path = "../taskrun" }
|
#taskrun = { path = "../taskrun" }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
wasm_transform = ["wasmer"]
|
wasm_transform = ["wasmer"]
|
||||||
indev = []
|
indev = []
|
||||||
|
tests-runtime = []
|
||||||
|
|
||||||
[patch.crates-io]
|
[patch.crates-io]
|
||||||
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }
|
thiserror = { git = "https://github.com/dominikwerder/thiserror.git", branch = "cstm" }
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ use netpod::ByteSize;
|
|||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::task::Context;
|
use std::task::Context;
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
use tokio::io::AsyncRead;
|
// use tokio::io::AsyncRead;
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
#[cstm(name = "InMem")]
|
#[cstm(name = "InMem")]
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
#[cfg(feature = "tests-runtime")]
|
||||||
|
mod test;
|
||||||
|
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use items_0::streamitem::sitem_err_from_string;
|
use items_0::streamitem::sitem_err_from_string;
|
||||||
@@ -17,11 +20,6 @@ use std::pin::Pin;
|
|||||||
use std::task::Context;
|
use std::task::Context;
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
use items_0::Events;
|
|
||||||
#[cfg(test)]
|
|
||||||
use std::collections::VecDeque;
|
|
||||||
|
|
||||||
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) }
|
macro_rules! trace_emit { ($det:expr, $($arg:tt)*) => ( if false && $det { trace!($($arg)*); } ) }
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
@@ -276,260 +274,3 @@ where
|
|||||||
debug!("drop {:?}", self);
|
debug!("drop {:?}", self);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_00() {
|
|
||||||
use items_0::Empty;
|
|
||||||
use items_2::eventsdim0::EventsDim0;
|
|
||||||
let ms = 1_000_000;
|
|
||||||
let beg = TsNano::from_ms(1000 * 10);
|
|
||||||
let end = TsNano::from_ms(1000 * 20);
|
|
||||||
let mut item1 = EventsDim0::<f32>::empty();
|
|
||||||
item1.push_back(beg.ns() + 0 * ms, 0, 3.);
|
|
||||||
item1.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
|
||||||
item1.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
|
||||||
item1.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
|
||||||
item1.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
|
||||||
item1.push_back(end.ns() - 1, 0, 4.0);
|
|
||||||
item1.push_back(end.ns() + 0, 0, 4.1);
|
|
||||||
item1.push_back(end.ns() + 1, 0, 4.1);
|
|
||||||
let w1: Box<dyn Events> = Box::new(item1.clone());
|
|
||||||
let e1 = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w1)));
|
|
||||||
let inp = futures_util::stream::iter([e1]);
|
|
||||||
let one_before_range = false;
|
|
||||||
let range = NanoRange::from((beg.ns(), end.ns()));
|
|
||||||
let stream = RangeFilter2::new(inp, range, one_before_range);
|
|
||||||
let fut = async move {
|
|
||||||
let tss_items = fetch_into_tss_items(stream).await;
|
|
||||||
let exp: &[&[u64]] = &[&[
|
|
||||||
beg.ns() + 0 * ms,
|
|
||||||
beg.ns() + 1 * ms,
|
|
||||||
beg.ns() + 2 * ms,
|
|
||||||
beg.ns() + 3 * ms,
|
|
||||||
beg.ns() + 4 * ms,
|
|
||||||
end.ns() - 1,
|
|
||||||
]];
|
|
||||||
assert_eq!(&tss_items, &exp);
|
|
||||||
Ok::<_, Error>(())
|
|
||||||
};
|
|
||||||
taskrun::run(fut).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_cut_before_00() {
|
|
||||||
use items_0::Empty;
|
|
||||||
use items_2::eventsdim0::EventsDim0;
|
|
||||||
let ms = 1_000_000;
|
|
||||||
let beg = TsNano::from_ms(1000 * 10);
|
|
||||||
let end = TsNano::from_ms(1000 * 20);
|
|
||||||
let mut items = Vec::new();
|
|
||||||
{
|
|
||||||
let mut item = EventsDim0::<f32>::empty();
|
|
||||||
item.push_back(beg.ns() - 1, 0, 2.9);
|
|
||||||
let w: Box<dyn Events> = Box::new(item.clone());
|
|
||||||
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
|
||||||
items.push(e);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let mut item = EventsDim0::<f32>::empty();
|
|
||||||
item.push_back(beg.ns() + 0 * ms, 0, 3.);
|
|
||||||
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
|
||||||
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
|
||||||
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
|
||||||
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
|
||||||
item.push_back(end.ns() - 1, 0, 4.0);
|
|
||||||
item.push_back(end.ns() + 0, 0, 4.1);
|
|
||||||
item.push_back(end.ns() + 1, 0, 4.1);
|
|
||||||
let w: Box<dyn Events> = Box::new(item.clone());
|
|
||||||
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
|
||||||
items.push(e);
|
|
||||||
}
|
|
||||||
let inp = futures_util::stream::iter(items);
|
|
||||||
let one_before_range = false;
|
|
||||||
let range = NanoRange::from((beg.ns(), end.ns()));
|
|
||||||
let stream = RangeFilter2::new(inp, range, one_before_range);
|
|
||||||
let fut = async move {
|
|
||||||
let tss_items = fetch_into_tss_items(stream).await;
|
|
||||||
let exp: &[&[u64]] = &[
|
|
||||||
// TODO in the future this empty may be discarded
|
|
||||||
&[],
|
|
||||||
&[
|
|
||||||
beg.ns() + 0 * ms,
|
|
||||||
beg.ns() + 1 * ms,
|
|
||||||
beg.ns() + 2 * ms,
|
|
||||||
beg.ns() + 3 * ms,
|
|
||||||
beg.ns() + 4 * ms,
|
|
||||||
end.ns() - 1,
|
|
||||||
],
|
|
||||||
];
|
|
||||||
assert_eq!(&tss_items, &exp);
|
|
||||||
Ok::<_, Error>(())
|
|
||||||
};
|
|
||||||
taskrun::run(fut).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_one_before_00() {
|
|
||||||
use items_0::Empty;
|
|
||||||
use items_2::eventsdim0::EventsDim0;
|
|
||||||
let ms = 1_000_000;
|
|
||||||
let beg = TsNano::from_ms(1000 * 10);
|
|
||||||
let end = TsNano::from_ms(1000 * 20);
|
|
||||||
let mut items = Vec::new();
|
|
||||||
{
|
|
||||||
let mut item = EventsDim0::<f32>::empty();
|
|
||||||
item.push_back(beg.ns() - 1, 0, 2.9);
|
|
||||||
let w: Box<dyn Events> = Box::new(item.clone());
|
|
||||||
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
|
||||||
items.push(e);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let mut item = EventsDim0::<f32>::empty();
|
|
||||||
item.push_back(beg.ns() + 0 * ms, 0, 3.);
|
|
||||||
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
|
||||||
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
|
||||||
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
|
||||||
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
|
||||||
item.push_back(end.ns() - 1, 0, 4.0);
|
|
||||||
item.push_back(end.ns() + 0, 0, 4.1);
|
|
||||||
item.push_back(end.ns() + 1, 0, 4.1);
|
|
||||||
let w: Box<dyn Events> = Box::new(item.clone());
|
|
||||||
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
|
||||||
items.push(e);
|
|
||||||
}
|
|
||||||
let inp = futures_util::stream::iter(items);
|
|
||||||
let one_before_range = true;
|
|
||||||
let range = NanoRange::from((beg.ns(), end.ns()));
|
|
||||||
let stream = RangeFilter2::new(inp, range, one_before_range);
|
|
||||||
let fut = async move {
|
|
||||||
let tss_items = fetch_into_tss_items(stream).await;
|
|
||||||
let exp: &[&[u64]] = &[
|
|
||||||
// TODO in the future this empty may be discarded
|
|
||||||
&[],
|
|
||||||
&[
|
|
||||||
//
|
|
||||||
beg.ns() - 1,
|
|
||||||
],
|
|
||||||
&[
|
|
||||||
beg.ns() + 0 * ms,
|
|
||||||
beg.ns() + 1 * ms,
|
|
||||||
beg.ns() + 2 * ms,
|
|
||||||
beg.ns() + 3 * ms,
|
|
||||||
beg.ns() + 4 * ms,
|
|
||||||
end.ns() - 1,
|
|
||||||
],
|
|
||||||
];
|
|
||||||
assert_eq!(&tss_items, &exp);
|
|
||||||
Ok::<_, Error>(())
|
|
||||||
};
|
|
||||||
taskrun::run(fut).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_one_before_01() {
|
|
||||||
use items_0::Empty;
|
|
||||||
use items_2::eventsdim0::EventsDim0;
|
|
||||||
let ms = 1_000_000;
|
|
||||||
let beg = TsNano::from_ms(1000 * 10);
|
|
||||||
let end = TsNano::from_ms(1000 * 20);
|
|
||||||
let mut items = Vec::new();
|
|
||||||
{
|
|
||||||
let mut item = EventsDim0::<f32>::empty();
|
|
||||||
item.push_back(beg.ns() - 1, 0, 2.9);
|
|
||||||
item.push_back(beg.ns() + 0 * ms, 0, 3.);
|
|
||||||
let w: Box<dyn Events> = Box::new(item.clone());
|
|
||||||
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
|
||||||
items.push(e);
|
|
||||||
}
|
|
||||||
{
|
|
||||||
let mut item = EventsDim0::<f32>::empty();
|
|
||||||
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
|
||||||
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
|
||||||
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
|
||||||
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
|
||||||
item.push_back(end.ns() - 1, 0, 4.0);
|
|
||||||
item.push_back(end.ns() + 0, 0, 4.1);
|
|
||||||
item.push_back(end.ns() + 1, 0, 4.1);
|
|
||||||
let w: Box<dyn Events> = Box::new(item.clone());
|
|
||||||
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
|
||||||
items.push(e);
|
|
||||||
}
|
|
||||||
let inp = futures_util::stream::iter(items);
|
|
||||||
let one_before_range = true;
|
|
||||||
let range = NanoRange::from((beg.ns(), end.ns()));
|
|
||||||
let stream = RangeFilter2::new(inp, range, one_before_range);
|
|
||||||
let fut = async move {
|
|
||||||
let tss_items = fetch_into_tss_items(stream).await;
|
|
||||||
let exp: &[&[u64]] = &[
|
|
||||||
// TODO in the future this empty may be discarded
|
|
||||||
// &[],
|
|
||||||
&[
|
|
||||||
//
|
|
||||||
beg.ns() - 1,
|
|
||||||
beg.ns() + 0 * ms,
|
|
||||||
],
|
|
||||||
&[
|
|
||||||
beg.ns() + 1 * ms,
|
|
||||||
beg.ns() + 2 * ms,
|
|
||||||
beg.ns() + 3 * ms,
|
|
||||||
beg.ns() + 4 * ms,
|
|
||||||
end.ns() - 1,
|
|
||||||
],
|
|
||||||
];
|
|
||||||
assert_eq!(&tss_items, &exp);
|
|
||||||
Ok::<_, Error>(())
|
|
||||||
};
|
|
||||||
taskrun::run(fut).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_one_before_only() {
|
|
||||||
use items_0::Empty;
|
|
||||||
use items_2::eventsdim0::EventsDim0;
|
|
||||||
let _ms = 1_000_000;
|
|
||||||
let beg = TsNano::from_ms(1000 * 10);
|
|
||||||
let end = TsNano::from_ms(1000 * 20);
|
|
||||||
let mut items = Vec::new();
|
|
||||||
{
|
|
||||||
let mut item = EventsDim0::<f32>::empty();
|
|
||||||
item.push_back(beg.ns() - 1, 0, 2.9);
|
|
||||||
let w: Box<dyn Events> = Box::new(item.clone());
|
|
||||||
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
|
||||||
items.push(e);
|
|
||||||
}
|
|
||||||
let inp = futures_util::stream::iter(items);
|
|
||||||
let one_before_range = true;
|
|
||||||
let range = NanoRange::from((beg.ns(), end.ns()));
|
|
||||||
let stream = RangeFilter2::new(inp, range, one_before_range);
|
|
||||||
let fut = async move {
|
|
||||||
let tss_items = fetch_into_tss_items(stream).await;
|
|
||||||
let exp: &[&[u64]] = &[
|
|
||||||
// TODO in the future this empty may be discarded
|
|
||||||
&[],
|
|
||||||
&[
|
|
||||||
//
|
|
||||||
beg.ns() - 1,
|
|
||||||
],
|
|
||||||
];
|
|
||||||
assert_eq!(&tss_items, &exp);
|
|
||||||
Ok::<_, Error>(())
|
|
||||||
};
|
|
||||||
taskrun::run(fut).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
async fn fetch_into_tss_items<INP>(mut inp: INP) -> VecDeque<VecDeque<u64>>
|
|
||||||
where
|
|
||||||
INP: Stream<Item = Sitemty<Box<dyn Events>>> + Unpin,
|
|
||||||
{
|
|
||||||
let mut tss_items = VecDeque::new();
|
|
||||||
while let Some(e) = inp.next().await {
|
|
||||||
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))) = e {
|
|
||||||
eprintln!("{:?}", evs);
|
|
||||||
tss_items.push_back(Events::tss(&evs).clone());
|
|
||||||
} else {
|
|
||||||
eprintln!("other item ----------: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tss_items
|
|
||||||
}
|
|
||||||
|
|||||||
267
crates/streams/src/rangefilter2/test.rs
Normal file
267
crates/streams/src/rangefilter2/test.rs
Normal file
@@ -0,0 +1,267 @@
|
|||||||
|
use crate::rangefilter2::RangeFilter2;
|
||||||
|
use futures_util::Stream;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use items_0::streamitem::RangeCompletableItem;
|
||||||
|
use items_0::streamitem::Sitemty;
|
||||||
|
use items_0::streamitem::StreamItem;
|
||||||
|
use items_0::Events;
|
||||||
|
use netpod::range::evrange::NanoRange;
|
||||||
|
use netpod::TsNano;
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_00() {
|
||||||
|
use items_0::Empty;
|
||||||
|
use items_2::eventsdim0::EventsDim0;
|
||||||
|
let ms = 1_000_000;
|
||||||
|
let beg = TsNano::from_ms(1000 * 10);
|
||||||
|
let end = TsNano::from_ms(1000 * 20);
|
||||||
|
let mut item1 = EventsDim0::<f32>::empty();
|
||||||
|
item1.push_back(beg.ns() + 0 * ms, 0, 3.);
|
||||||
|
item1.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
||||||
|
item1.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
||||||
|
item1.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
||||||
|
item1.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
||||||
|
item1.push_back(end.ns() - 1, 0, 4.0);
|
||||||
|
item1.push_back(end.ns() + 0, 0, 4.1);
|
||||||
|
item1.push_back(end.ns() + 1, 0, 4.1);
|
||||||
|
let w1: Box<dyn Events> = Box::new(item1.clone());
|
||||||
|
let e1 = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w1)));
|
||||||
|
let inp = futures_util::stream::iter([e1]);
|
||||||
|
let one_before_range = false;
|
||||||
|
let range = NanoRange::from((beg.ns(), end.ns()));
|
||||||
|
let stream = RangeFilter2::new(inp, range, one_before_range);
|
||||||
|
let fut = async move {
|
||||||
|
let tss_items = fetch_into_tss_items(stream).await;
|
||||||
|
let exp: &[&[u64]] = &[&[
|
||||||
|
beg.ns() + 0 * ms,
|
||||||
|
beg.ns() + 1 * ms,
|
||||||
|
beg.ns() + 2 * ms,
|
||||||
|
beg.ns() + 3 * ms,
|
||||||
|
beg.ns() + 4 * ms,
|
||||||
|
end.ns() - 1,
|
||||||
|
]];
|
||||||
|
assert_eq!(&tss_items, &exp);
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
};
|
||||||
|
taskrun::run(fut).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_cut_before_00() {
|
||||||
|
use items_0::Empty;
|
||||||
|
use items_2::eventsdim0::EventsDim0;
|
||||||
|
let ms = 1_000_000;
|
||||||
|
let beg = TsNano::from_ms(1000 * 10);
|
||||||
|
let end = TsNano::from_ms(1000 * 20);
|
||||||
|
let mut items = Vec::new();
|
||||||
|
{
|
||||||
|
let mut item = EventsDim0::<f32>::empty();
|
||||||
|
item.push_back(beg.ns() - 1, 0, 2.9);
|
||||||
|
let w: Box<dyn Events> = Box::new(item.clone());
|
||||||
|
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
||||||
|
items.push(e);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut item = EventsDim0::<f32>::empty();
|
||||||
|
item.push_back(beg.ns() + 0 * ms, 0, 3.);
|
||||||
|
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
||||||
|
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
||||||
|
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
||||||
|
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
||||||
|
item.push_back(end.ns() - 1, 0, 4.0);
|
||||||
|
item.push_back(end.ns() + 0, 0, 4.1);
|
||||||
|
item.push_back(end.ns() + 1, 0, 4.1);
|
||||||
|
let w: Box<dyn Events> = Box::new(item.clone());
|
||||||
|
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
||||||
|
items.push(e);
|
||||||
|
}
|
||||||
|
let inp = futures_util::stream::iter(items);
|
||||||
|
let one_before_range = false;
|
||||||
|
let range = NanoRange::from((beg.ns(), end.ns()));
|
||||||
|
let stream = RangeFilter2::new(inp, range, one_before_range);
|
||||||
|
let fut = async move {
|
||||||
|
let tss_items = fetch_into_tss_items(stream).await;
|
||||||
|
let exp: &[&[u64]] = &[
|
||||||
|
// TODO in the future this empty may be discarded
|
||||||
|
&[],
|
||||||
|
&[
|
||||||
|
beg.ns() + 0 * ms,
|
||||||
|
beg.ns() + 1 * ms,
|
||||||
|
beg.ns() + 2 * ms,
|
||||||
|
beg.ns() + 3 * ms,
|
||||||
|
beg.ns() + 4 * ms,
|
||||||
|
end.ns() - 1,
|
||||||
|
],
|
||||||
|
];
|
||||||
|
assert_eq!(&tss_items, &exp);
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
};
|
||||||
|
taskrun::run(fut).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_one_before_00() {
|
||||||
|
use items_0::Empty;
|
||||||
|
use items_2::eventsdim0::EventsDim0;
|
||||||
|
let ms = 1_000_000;
|
||||||
|
let beg = TsNano::from_ms(1000 * 10);
|
||||||
|
let end = TsNano::from_ms(1000 * 20);
|
||||||
|
let mut items = Vec::new();
|
||||||
|
{
|
||||||
|
let mut item = EventsDim0::<f32>::empty();
|
||||||
|
item.push_back(beg.ns() - 1, 0, 2.9);
|
||||||
|
let w: Box<dyn Events> = Box::new(item.clone());
|
||||||
|
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
||||||
|
items.push(e);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut item = EventsDim0::<f32>::empty();
|
||||||
|
item.push_back(beg.ns() + 0 * ms, 0, 3.);
|
||||||
|
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
||||||
|
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
||||||
|
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
||||||
|
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
||||||
|
item.push_back(end.ns() - 1, 0, 4.0);
|
||||||
|
item.push_back(end.ns() + 0, 0, 4.1);
|
||||||
|
item.push_back(end.ns() + 1, 0, 4.1);
|
||||||
|
let w: Box<dyn Events> = Box::new(item.clone());
|
||||||
|
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
||||||
|
items.push(e);
|
||||||
|
}
|
||||||
|
let inp = futures_util::stream::iter(items);
|
||||||
|
let one_before_range = true;
|
||||||
|
let range = NanoRange::from((beg.ns(), end.ns()));
|
||||||
|
let stream = RangeFilter2::new(inp, range, one_before_range);
|
||||||
|
let fut = async move {
|
||||||
|
let tss_items = fetch_into_tss_items(stream).await;
|
||||||
|
let exp: &[&[u64]] = &[
|
||||||
|
// TODO in the future this empty may be discarded
|
||||||
|
&[],
|
||||||
|
&[
|
||||||
|
//
|
||||||
|
beg.ns() - 1,
|
||||||
|
],
|
||||||
|
&[
|
||||||
|
beg.ns() + 0 * ms,
|
||||||
|
beg.ns() + 1 * ms,
|
||||||
|
beg.ns() + 2 * ms,
|
||||||
|
beg.ns() + 3 * ms,
|
||||||
|
beg.ns() + 4 * ms,
|
||||||
|
end.ns() - 1,
|
||||||
|
],
|
||||||
|
];
|
||||||
|
assert_eq!(&tss_items, &exp);
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
};
|
||||||
|
taskrun::run(fut).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_one_before_01() {
|
||||||
|
use items_0::Empty;
|
||||||
|
use items_2::eventsdim0::EventsDim0;
|
||||||
|
let ms = 1_000_000;
|
||||||
|
let beg = TsNano::from_ms(1000 * 10);
|
||||||
|
let end = TsNano::from_ms(1000 * 20);
|
||||||
|
let mut items = Vec::new();
|
||||||
|
{
|
||||||
|
let mut item = EventsDim0::<f32>::empty();
|
||||||
|
item.push_back(beg.ns() - 1, 0, 2.9);
|
||||||
|
item.push_back(beg.ns() + 0 * ms, 0, 3.);
|
||||||
|
let w: Box<dyn Events> = Box::new(item.clone());
|
||||||
|
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
||||||
|
items.push(e);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let mut item = EventsDim0::<f32>::empty();
|
||||||
|
item.push_back(beg.ns() + 1 * ms, 0, 3.1);
|
||||||
|
item.push_back(beg.ns() + 2 * ms, 0, 3.2);
|
||||||
|
item.push_back(beg.ns() + 3 * ms, 0, 3.3);
|
||||||
|
item.push_back(beg.ns() + 4 * ms, 0, 3.4);
|
||||||
|
item.push_back(end.ns() - 1, 0, 4.0);
|
||||||
|
item.push_back(end.ns() + 0, 0, 4.1);
|
||||||
|
item.push_back(end.ns() + 1, 0, 4.1);
|
||||||
|
let w: Box<dyn Events> = Box::new(item.clone());
|
||||||
|
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
||||||
|
items.push(e);
|
||||||
|
}
|
||||||
|
let inp = futures_util::stream::iter(items);
|
||||||
|
let one_before_range = true;
|
||||||
|
let range = NanoRange::from((beg.ns(), end.ns()));
|
||||||
|
let stream = RangeFilter2::new(inp, range, one_before_range);
|
||||||
|
let fut = async move {
|
||||||
|
let tss_items = fetch_into_tss_items(stream).await;
|
||||||
|
let exp: &[&[u64]] = &[
|
||||||
|
// TODO in the future this empty may be discarded
|
||||||
|
// &[],
|
||||||
|
&[
|
||||||
|
//
|
||||||
|
beg.ns() - 1,
|
||||||
|
beg.ns() + 0 * ms,
|
||||||
|
],
|
||||||
|
&[
|
||||||
|
beg.ns() + 1 * ms,
|
||||||
|
beg.ns() + 2 * ms,
|
||||||
|
beg.ns() + 3 * ms,
|
||||||
|
beg.ns() + 4 * ms,
|
||||||
|
end.ns() - 1,
|
||||||
|
],
|
||||||
|
];
|
||||||
|
assert_eq!(&tss_items, &exp);
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
};
|
||||||
|
taskrun::run(fut).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_one_before_only() {
|
||||||
|
use items_0::Empty;
|
||||||
|
use items_2::eventsdim0::EventsDim0;
|
||||||
|
let _ms = 1_000_000;
|
||||||
|
let beg = TsNano::from_ms(1000 * 10);
|
||||||
|
let end = TsNano::from_ms(1000 * 20);
|
||||||
|
let mut items = Vec::new();
|
||||||
|
{
|
||||||
|
let mut item = EventsDim0::<f32>::empty();
|
||||||
|
item.push_back(beg.ns() - 1, 0, 2.9);
|
||||||
|
let w: Box<dyn Events> = Box::new(item.clone());
|
||||||
|
let e: Sitemty<_> = Ok(StreamItem::DataItem(RangeCompletableItem::Data(w)));
|
||||||
|
items.push(e);
|
||||||
|
}
|
||||||
|
let inp = futures_util::stream::iter(items);
|
||||||
|
let one_before_range = true;
|
||||||
|
let range = NanoRange::from((beg.ns(), end.ns()));
|
||||||
|
let stream = RangeFilter2::new(inp, range, one_before_range);
|
||||||
|
let fut = async move {
|
||||||
|
let tss_items = fetch_into_tss_items(stream).await;
|
||||||
|
let exp: &[&[u64]] = &[
|
||||||
|
// TODO in the future this empty may be discarded
|
||||||
|
&[],
|
||||||
|
&[
|
||||||
|
//
|
||||||
|
beg.ns() - 1,
|
||||||
|
],
|
||||||
|
];
|
||||||
|
assert_eq!(&tss_items, &exp);
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
};
|
||||||
|
taskrun::run(fut).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
async fn fetch_into_tss_items<INP>(mut inp: INP) -> VecDeque<VecDeque<u64>>
|
||||||
|
where
|
||||||
|
INP: Stream<Item = Sitemty<Box<dyn Events>>> + Unpin,
|
||||||
|
{
|
||||||
|
let mut tss_items = VecDeque::new();
|
||||||
|
while let Some(e) = inp.next().await {
|
||||||
|
if let Ok(StreamItem::DataItem(RangeCompletableItem::Data(evs))) = e {
|
||||||
|
eprintln!("{:?}", evs);
|
||||||
|
tss_items.push_back(Events::tss(&evs).clone());
|
||||||
|
} else {
|
||||||
|
eprintln!("other item ----------: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tss_items
|
||||||
|
}
|
||||||
@@ -1,8 +1,5 @@
|
|||||||
#[cfg(test)]
|
|
||||||
mod collect;
|
mod collect;
|
||||||
#[cfg(test)]
|
|
||||||
mod events;
|
mod events;
|
||||||
#[cfg(test)]
|
|
||||||
mod timebin;
|
mod timebin;
|
||||||
|
|
||||||
use futures_util::stream;
|
use futures_util::stream;
|
||||||
@@ -59,5 +56,7 @@ where
|
|||||||
F: std::future::Future<Output = Result<T, E>>,
|
F: std::future::Future<Output = Result<T, E>>,
|
||||||
E: std::error::Error,
|
E: std::error::Error,
|
||||||
{
|
{
|
||||||
taskrun::run(fut)
|
// taskrun::run(fut)
|
||||||
|
let _ = fut;
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user