From 1baa78bd3a6a0f7d831e082f3d94cac0f51a0566 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Sat, 18 Mar 2023 09:25:04 +0100 Subject: [PATCH] WIP refactor --- daqbuffer/src/bin/daqbuffer.rs | 23 ++- daqbufp2/src/client.rs | 2 +- daqbufp2/src/test/api4/binnedjson.rs | 2 +- daqbufp2/src/test/api4/eventsjson.rs | 2 +- daqbufp2/src/test/binnedbinary.rs | 2 +- daqbufp2/src/test/events.rs | 2 +- daqbufp2/src/test/timeweightedjson.rs | 2 +- disk/src/aggtest.rs | 24 ++- disk/src/channelconfig.rs | 4 +- disk/src/dataopen.rs | 38 +++-- disk/src/eventblobs.rs | 17 ++- disk/src/gen.rs | 67 ++++---- disk/src/index.rs | 26 ++-- disk/src/paths.rs | 20 +-- disk/src/raw/conn.rs | 10 +- dq/src/bin/dq.rs | 4 +- httpret/src/api1.rs | 2 +- items/src/lib.rs | 4 +- items_0/src/collect_s.rs | 2 +- items_0/src/items_0.rs | 22 +-- items_2/src/binsdim0.rs | 2 +- items_2/src/binsxbindim0.rs | 4 +- items_2/src/channelevents.rs | 4 +- items_2/src/eventsdim0.rs | 4 +- items_2/src/eventsdim1.rs | 2 +- items_2/src/eventsxbindim0.rs | 2 +- items_2/src/items_2.rs | 3 +- items_2/src/test.rs | 3 +- netpod/src/netpod.rs | 210 +++++++------------------- netpod/src/range.rs | 1 + netpod/src/range/evrange.rs | 133 ++++++++++++++++ nodenet/src/channelconfig.rs | 2 +- nodenet/src/conn/test.rs | 2 +- parse/src/channelconfig.rs | 27 +++- scyllaconn/src/scyllaconn.rs | 3 +- scyllaconn/src/status.rs | 3 +- streams/src/collect.rs | 2 +- streams/src/eventchunker.rs | 2 +- streams/src/rangefilter2.rs | 2 +- 39 files changed, 381 insertions(+), 305 deletions(-) create mode 100644 netpod/src/range.rs create mode 100644 netpod/src/range/evrange.rs diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index 0fb1aa1..7d1fcf3 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -1,10 +1,17 @@ -use chrono::{DateTime, Duration, Utc}; +use chrono::DateTime; +use chrono::Duration; +use chrono::Utc; use clap::Parser; -use daqbuffer::cli::{ClientType, Opts, SubCmd}; +use daqbuffer::cli::ClientType; +use daqbuffer::cli::Opts; +use daqbuffer::cli::SubCmd; use err::Error; use netpod::log::*; use netpod::query::CacheUsage; -use netpod::{NodeConfig, NodeConfigCached, ProxyConfig}; +use netpod::NodeConfig; +use netpod::NodeConfigCached; +use netpod::ProxyConfig; +use netpod::TsNano; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -122,8 +129,12 @@ async fn go() -> Result<(), Error> { #[allow(unused)] fn simple_fetch() { use daqbuffer::err::ErrConv; - use netpod::Nanos; - use netpod::{timeunits::*, ByteOrder, Channel, ChannelConfig, ScalarType, Shape}; + use netpod::timeunits::*; + use netpod::ByteOrder; + use netpod::Channel; + use netpod::ChannelConfig; + use netpod::ScalarType; + use netpod::Shape; taskrun::run(async { let _rh = daqbufp2::nodes::require_test_hosts_running()?; let t1 = chrono::Utc::now(); @@ -135,7 +146,7 @@ fn simple_fetch() { series: None, }, keyspace: 3, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), array: true, scalar_type: ScalarType::F64, shape: Shape::Wave(42), diff --git a/daqbufp2/src/client.rs b/daqbufp2/src/client.rs index 6d5a1fe..3b8e5d5 100644 --- a/daqbufp2/src/client.rs +++ b/daqbufp2/src/client.rs @@ -11,11 +11,11 @@ use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::query::CacheUsage; +use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::ByteSize; use netpod::Channel; use netpod::HostPort; -use netpod::NanoRange; use netpod::PerfOpts; use netpod::APP_OCTET; use streams::frames::inmem::InMemoryFrameAsyncReadStream; diff --git a/daqbufp2/src/test/api4/binnedjson.rs b/daqbufp2/src/test/api4/binnedjson.rs index b5e4947..56a053e 100644 --- a/daqbufp2/src/test/api4/binnedjson.rs +++ b/daqbufp2/src/test/api4/binnedjson.rs @@ -7,11 +7,11 @@ use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::query::BinnedQuery; +use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; use netpod::HostPort; -use netpod::NanoRange; use netpod::APP_JSON; use serde_json::Value as JsonValue; use url::Url; diff --git a/daqbufp2/src/test/api4/eventsjson.rs b/daqbufp2/src/test/api4/eventsjson.rs index c38a068..3df3a3b 100644 --- a/daqbufp2/src/test/api4/eventsjson.rs +++ b/daqbufp2/src/test/api4/eventsjson.rs @@ -7,11 +7,11 @@ use http::StatusCode; use hyper::Body; use netpod::log::*; use netpod::query::PlainEventsQuery; +use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; use netpod::HostPort; -use netpod::NanoRange; use netpod::APP_JSON; use serde_json::Value as JsonValue; use url::Url; diff --git a/daqbufp2/src/test/binnedbinary.rs b/daqbufp2/src/test/binnedbinary.rs index 863b450..46d2a00 100644 --- a/daqbufp2/src/test/binnedbinary.rs +++ b/daqbufp2/src/test/binnedbinary.rs @@ -13,11 +13,11 @@ use items_0::subfr::SubFrId; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::query::CacheUsage; +use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; use netpod::HostPort; -use netpod::NanoRange; use netpod::PerfOpts; use netpod::APP_OCTET; use serde::de::DeserializeOwned; diff --git a/daqbufp2/src/test/events.rs b/daqbufp2/src/test/events.rs index dfd3d96..d2b0ba7 100644 --- a/daqbufp2/src/test/events.rs +++ b/daqbufp2/src/test/events.rs @@ -12,11 +12,11 @@ use hyper::Body; use items_0::streamitem::StreamItem; use netpod::log::*; use netpod::query::PlainEventsQuery; +use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; use netpod::HostPort; -use netpod::NanoRange; use netpod::PerfOpts; use netpod::APP_JSON; use netpod::APP_OCTET; diff --git a/daqbufp2/src/test/timeweightedjson.rs b/daqbufp2/src/test/timeweightedjson.rs index 47b709b..6ff25fa 100644 --- a/daqbufp2/src/test/timeweightedjson.rs +++ b/daqbufp2/src/test/timeweightedjson.rs @@ -8,10 +8,10 @@ use hyper::Body; use netpod::log::*; use netpod::query::BinnedQuery; use netpod::query::CacheUsage; +use netpod::range::evrange::NanoRange; use netpod::AppendToUrl; use netpod::Channel; use netpod::Cluster; -use netpod::NanoRange; use netpod::APP_JSON; use std::time::Duration; use url::Url; diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index ba90346..7220b6d 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -1,9 +1,17 @@ use crate::eventblobs::EventChunkerMultifile; -use netpod::{test_data_base_path_databuffer, timeunits::*, SfDatabuffer}; -use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape}; +use netpod::range::evrange::NanoRange; +use netpod::test_data_base_path_databuffer; +use netpod::timeunits::*; +use netpod::ByteOrder; +use netpod::ByteSize; +use netpod::Channel; +use netpod::ChannelConfig; +use netpod::Node; +use netpod::ScalarType; +use netpod::SfDatabuffer; +use netpod::Shape; +use netpod::TsNano; use streams::eventchunker::EventChunkerConf; -#[allow(unused_imports)] -use tracing::{debug, error, info, trace, warn}; pub fn make_test_node(id: u32) -> Node { Node { @@ -43,7 +51,7 @@ async fn agg_x_dim_0_inner() { series: None, }, keyspace: 2, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), array: false, shape: Shape::Scalar, scalar_type: ScalarType::F64, @@ -55,7 +63,7 @@ async fn agg_x_dim_0_inner() { buffer_size: 1024 * 4, }; let _bin_count = 20; - let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns; + let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0; let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); @@ -100,7 +108,7 @@ async fn agg_x_dim_1_inner() { series: None, }, keyspace: 3, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), array: true, shape: Shape::Wave(1024), scalar_type: ScalarType::F64, @@ -112,7 +120,7 @@ async fn agg_x_dim_1_inner() { buffer_size: 17, }; let _bin_count = 10; - let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns; + let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0; let ts2 = ts1 + HOUR * 24; let range = NanoRange { beg: ts1, end: ts2 }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); diff --git a/disk/src/channelconfig.rs b/disk/src/channelconfig.rs index 82acd49..cc5f612 100644 --- a/disk/src/channelconfig.rs +++ b/disk/src/channelconfig.rs @@ -1,7 +1,7 @@ use err::Error; +use netpod::range::evrange::NanoRange; use netpod::Channel; use netpod::ChannelConfig; -use netpod::NanoRange; use netpod::NodeConfigCached; use parse::channelconfig::extract_matching_config_entry; use parse::channelconfig::read_local_config; @@ -29,7 +29,7 @@ pub async fn config( let channel_config = ChannelConfig { channel: channel.clone(), keyspace: entry.ks as u8, - time_bin_size: entry.bs, + time_bin_size: entry.bs.clone(), shape: shape, scalar_type: entry.scalar_type.clone(), byte_order: entry.byte_order.clone(), diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index b6c6f1e..74d8f42 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -1,14 +1,22 @@ use super::paths; use bytes::BytesMut; -use err::{ErrStr, Error}; +use err::ErrStr; +use err::Error; use futures_util::StreamExt; use netpod::log::*; -use netpod::{ChannelConfig, NanoRange, Nanos, Node}; +use netpod::range::evrange::NanoRange; +use netpod::ChannelConfig; +use netpod::Node; +use netpod::TsNano; use std::fmt; use std::path::PathBuf; use std::time::Instant; -use tokio::fs::{File, OpenOptions}; -use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom}; +use tokio::fs::File; +use tokio::fs::OpenOptions; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; +use tokio::io::ErrorKind; +use tokio::io::SeekFrom; pub struct Positioned { pub file: OpenedFile, @@ -237,13 +245,11 @@ async fn open_files_inner( return Ok(()); } for &tb in &timebins { - let ts_bin = Nanos { - ns: tb * channel_config.time_bin_size.ns, - }; - if ts_bin.ns >= range.end { + let ts_bin = TsNano(tb * channel_config.time_bin_size.0); + if ts_bin.ns() >= range.end { continue; } - if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg { + if ts_bin.ns() + channel_config.time_bin_size.ns() <= range.beg { continue; } let mut a = Vec::new(); @@ -337,10 +343,8 @@ async fn open_expanded_files_inner( } let mut p1 = None; for (i1, tb) in timebins.iter().enumerate().rev() { - let ts_bin = Nanos { - ns: tb * channel_config.time_bin_size.ns, - }; - if ts_bin.ns <= range.beg { + let ts_bin = TsNano(tb * channel_config.time_bin_size.ns()); + if ts_bin.ns() <= range.beg { p1 = Some(i1); break; } @@ -407,8 +411,10 @@ async fn open_expanded_files_inner( mod test { use super::*; use err::Error; + use netpod::range::evrange::NanoRange; + use netpod::test_data_base_path_databuffer; use netpod::timeunits::*; - use netpod::{test_data_base_path_databuffer, ChannelConfig, NanoRange, Nanos}; + use netpod::ChannelConfig; use std::path::PathBuf; use tokio::fs::OpenOptions; @@ -810,7 +816,7 @@ mod test { #[test] fn expanded_file_list() { - let range = netpod::NanoRange { + let range = NanoRange { beg: DAY + HOUR * 5, end: DAY + HOUR * 8, }; @@ -823,7 +829,7 @@ mod test { let channel_config = ChannelConfig { channel: chn, keyspace: 2, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), scalar_type: netpod::ScalarType::I32, byte_order: netpod::ByteOrder::Big, shape: netpod::Shape::Scalar, diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index e557588..23776dd 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -11,10 +11,10 @@ use items_0::streamitem::StreamItem; use items_2::eventfull::EventFull; use items_2::merger::Merger; use netpod::log::*; +use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::ChannelConfig; use netpod::DiskIoTune; -use netpod::NanoRange; use netpod::Node; use std::pin::Pin; use std::task::Context; @@ -271,16 +271,17 @@ mod test { use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; use netpod::log::*; + use netpod::range::evrange::NanoRange; use netpod::timeunits::DAY; use netpod::timeunits::MS; use netpod::ByteSize; use netpod::ChannelConfig; use netpod::DiskIoTune; - use netpod::Nanos; + use netpod::TsNano; use streams::eventchunker::EventChunkerConf; use streams::rangefilter2::RangeFilter2; - fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { + fn read_expanded_for_range(range: NanoRange, nodeix: usize) -> Result<(usize, Vec), Error> { let chn = netpod::Channel { backend: "test-disk-databuffer".into(), name: "scalar-i32-be".into(), @@ -290,7 +291,7 @@ mod test { let channel_config = ChannelConfig { channel: chn, keyspace: 2, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), scalar_type: netpod::ScalarType::I32, byte_order: netpod::ByteOrder::Big, shape: netpod::Shape::Scalar, @@ -346,7 +347,7 @@ mod test { #[test] fn read_expanded_0() -> Result<(), Error> { - let range = netpod::NanoRange { + let range = NanoRange { beg: DAY + MS * 0, end: DAY + MS * 100, }; @@ -362,7 +363,7 @@ mod test { #[test] fn read_expanded_1() -> Result<(), Error> { - let range = netpod::NanoRange { + let range = NanoRange { beg: DAY + MS * 0, end: DAY + MS * 1501, }; @@ -376,7 +377,7 @@ mod test { #[test] fn read_expanded_2() -> Result<(), Error> { - let range = netpod::NanoRange { + let range = NanoRange { beg: DAY - MS * 100, end: DAY + MS * 1501, }; @@ -388,7 +389,7 @@ mod test { #[test] fn read_expanded_3() -> Result<(), Error> { use netpod::timeunits::*; - let range = netpod::NanoRange { + let range = NanoRange { beg: DAY - MS * 1500, end: DAY + MS * 1501, }; diff --git a/disk/src/gen.rs b/disk/src/gen.rs index 3c4e6ff..5f718d9 100644 --- a/disk/src/gen.rs +++ b/disk/src/gen.rs @@ -1,12 +1,23 @@ use crate::ChannelConfigExt; use bitshuffle::bitshuffle_compress; -use bytes::{BufMut, BytesMut}; +use bytes::BufMut; +use bytes::BytesMut; use err::Error; use netpod::log::*; use netpod::timeunits::*; -use netpod::{ByteOrder, Channel, ChannelConfig, GenVar, Nanos, Node, ScalarType, SfDatabuffer, Shape}; -use std::path::{Path, PathBuf}; -use tokio::fs::{File, OpenOptions}; +use netpod::ByteOrder; +use netpod::Channel; +use netpod::ChannelConfig; +use netpod::GenVar; +use netpod::Node; +use netpod::ScalarType; +use netpod::SfDatabuffer; +use netpod::Shape; +use netpod::TsNano; +use std::path::Path; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; pub async fn gen_test_data() -> Result<(), Error> { @@ -27,7 +38,7 @@ pub async fn gen_test_data() -> Result<(), Error> { series: None, }, keyspace: 2, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), scalar_type: ScalarType::I32, byte_order: ByteOrder::Big, shape: Shape::Scalar, @@ -46,7 +57,7 @@ pub async fn gen_test_data() -> Result<(), Error> { series: None, }, keyspace: 3, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), array: true, scalar_type: ScalarType::F64, shape: Shape::Wave(21), @@ -65,7 +76,7 @@ pub async fn gen_test_data() -> Result<(), Error> { series: None, }, keyspace: 3, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), scalar_type: ScalarType::U16, byte_order: ByteOrder::Little, shape: Shape::Wave(77), @@ -84,7 +95,7 @@ pub async fn gen_test_data() -> Result<(), Error> { series: None, }, keyspace: 2, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), scalar_type: ScalarType::I32, byte_order: ByteOrder::Little, shape: Shape::Scalar, @@ -103,7 +114,7 @@ pub async fn gen_test_data() -> Result<(), Error> { series: None, }, keyspace: 2, - time_bin_size: Nanos { ns: DAY }, + time_bin_size: TsNano(DAY), scalar_type: ScalarType::I32, byte_order: ByteOrder::Little, shape: Shape::Scalar, @@ -170,9 +181,9 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: & .await .map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?; let mut evix = 0; - let mut ts = Nanos { ns: 0 }; + let mut ts = TsNano(0); let mut pulse = 0; - while ts.ns < DAY * 3 { + while ts.ns() < DAY * 3 { let res = gen_timebin( evix, ts, @@ -187,7 +198,7 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: & ) .await?; evix = res.evix; - ts.ns = res.ts.ns; + ts = res.ts; pulse = res.pulse; } Ok(()) @@ -231,7 +242,7 @@ async fn gen_config( buf.put_i64(ts); buf.put_i64(pulse); buf.put_u32(config.keyspace as u32); - buf.put_u64(config.time_bin_size.ns / MS); + buf.put_u64(config.time_bin_size.ns() / MS); buf.put_i32(sc); buf.put_i32(status); buf.put_i8(bb); @@ -316,13 +327,13 @@ impl CountedFile { struct GenTimebinRes { evix: u64, - ts: Nanos, + ts: TsNano, pulse: u64, } async fn gen_timebin( evix: u64, - ts: Nanos, + ts: TsNano, pulse: u64, ts_spacing: u64, channel_path: &Path, @@ -332,11 +343,11 @@ async fn gen_timebin( ensemble: &Ensemble, gen_var: &GenVar, ) -> Result { - let tb = ts.ns / config.time_bin_size.ns; + let tb = ts.ns() / config.time_bin_size.ns(); let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", split)); tokio::fs::create_dir_all(&path).await?; - let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size.ns / MS, 0)); - let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size.ns / MS, 0)); + let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size.ns() / MS, 0)); + let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size.ns() / MS, 0)); info!("open data file {:?}", data_path); let file = OpenOptions::new() .write(true) @@ -363,34 +374,32 @@ async fn gen_timebin( let mut evix = evix; let mut ts = ts; let mut pulse = pulse; - let tsmax = Nanos { - ns: (tb + 1) * config.time_bin_size.ns, - }; - while ts.ns < tsmax.ns { + let tsmax = TsNano((tb + 1) * config.time_bin_size.ns()); + while ts.ns() < tsmax.ns() { match gen_var { // TODO // Splits and nodes are not in 1-to-1 correspondence. GenVar::Default => { if evix % ensemble.nodes.len() as u64 == split as u64 { - gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; + gen_event(&mut file, index_file.as_mut(), evix, ts.clone(), pulse, config, gen_var).await?; } } GenVar::ConstRegular => { if evix % ensemble.nodes.len() as u64 == split as u64 { - gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; + gen_event(&mut file, index_file.as_mut(), evix, ts.clone(), pulse, config, gen_var).await?; } } GenVar::TimeWeight => { let m = evix % 20; if m == 0 || m == 1 { if evix % ensemble.nodes.len() as u64 == split as u64 { - gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?; + gen_event(&mut file, index_file.as_mut(), evix, ts.clone(), pulse, config, gen_var).await?; } } } } evix += 1; - ts.ns += ts_spacing; + ts.0 += ts_spacing; pulse += 1; } let ret = GenTimebinRes { evix, ts, pulse }; @@ -413,7 +422,7 @@ async fn gen_event( file: &mut CountedFile, index_file: Option<&mut CountedFile>, evix: u64, - ts: Nanos, + ts: TsNano, pulse: u64, config: &ChannelConfig, gen_var: &GenVar, @@ -423,7 +432,7 @@ async fn gen_event( let mut buf = BytesMut::with_capacity(1024 * 16); buf.put_i32(0xcafecafe as u32 as i32); buf.put_u64(ttl); - buf.put_u64(ts.ns); + buf.put_u64(ts.ns()); buf.put_u64(pulse); buf.put_u64(ioc_ts); buf.put_u8(0); @@ -541,7 +550,7 @@ async fn gen_event( file.write_all(buf.as_ref()).await?; if let Some(f) = index_file { let mut buf = BytesMut::with_capacity(16); - buf.put_u64(ts.ns); + buf.put_u64(ts.ns()); buf.put_u64(z); f.write_all(&buf).await?; } diff --git a/disk/src/index.rs b/disk/src/index.rs index 04c555b..39f1f03 100644 --- a/disk/src/index.rs +++ b/disk/src/index.rs @@ -1,11 +1,13 @@ use arrayref::array_ref; use err::Error; use netpod::log::*; -use netpod::NanoRange; -use netpod::Nanos; +use netpod::range::evrange::NanoRange; +use netpod::TsNano; use std::mem::size_of; use tokio::fs::File; -use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom}; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; +use tokio::io::SeekFrom; pub fn find_ge(range: NanoRange, expand_right: bool, buf: &[u8]) -> Result, Error> { type VT = u64; @@ -174,7 +176,7 @@ pub fn parse_channel_header(buf: &[u8]) -> Result<(u32,), Error> { Ok((len1 as u32,)) } -pub fn parse_event(buf: &[u8]) -> Result<(u32, Nanos), Error> { +pub fn parse_event(buf: &[u8]) -> Result<(u32, TsNano), Error> { if buf.len() < 4 { return Err(Error::with_msg(format!("parse_event buf len: {}", buf.len()))); } @@ -194,10 +196,10 @@ pub fn parse_event(buf: &[u8]) -> Result<(u32, Nanos), Error> { return Err(Error::with_msg(format!("len mismatch len1: {} len2: {}", len1, len2))); } let ts = u64::from_be_bytes(*array_ref![buf, 12, 8]); - Ok((len1 as u32, Nanos { ns: ts })) + Ok((len1 as u32, TsNano(ts))) } -pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Error> { +pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, TsNano), Error> { file.seek(SeekFrom::Start(pos)).await?; let mut buf = vec![0; 1024]; let _n1 = read(&mut buf, file).await?; @@ -220,7 +222,7 @@ pub async fn position_static_len_datafile( let evlen = ev.0 as u64; let mut j = headoff; let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff; - let x = ev.1.ns; + let x = ev.1.ns(); let t = read_event_at(k, &mut file).await?; if t.0 != evlen as u32 { Err(Error::with_msg(format!( @@ -228,7 +230,7 @@ pub async fn position_static_len_datafile( t.0, evlen )))?; } - let y = t.1.ns; + let y = t.1.ns(); let mut nreads = 2; if x >= range.end { if expand_right { @@ -275,7 +277,7 @@ pub async fn position_static_len_datafile( )))?; } nreads += 1; - let e = t.1.ns; + let e = t.1.ns(); if e < range.beg { x = e; j = m; @@ -301,7 +303,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than( let evlen = ev.0 as u64; let mut j = headoff; let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff; - let x = ev.1.ns; + let x = ev.1.ns(); let t = read_event_at(k, &mut file).await?; if t.0 != evlen as u32 { Err(Error::with_msg(format!( @@ -309,7 +311,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than( t.0, evlen )))?; } - let y = t.1.ns; + let y = t.1.ns(); let mut nreads = 2; if x >= range.beg { file.seek(SeekFrom::Start(j)).await?; @@ -333,7 +335,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than( )))?; } nreads += 1; - let x = t.1.ns; + let x = t.1.ns(); if x < range.beg { j = m; } else { diff --git a/disk/src/paths.rs b/disk/src/paths.rs index 5be86a7..68790c0 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -1,7 +1,9 @@ use err::Error; use futures_util::StreamExt; use netpod::timeunits::MS; -use netpod::{ChannelConfig, Nanos, Node}; +use netpod::ChannelConfig; +use netpod::Node; +use netpod::TsNano; use std::path::PathBuf; // TODO remove/replace this @@ -19,7 +21,7 @@ pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, split: u32, node: .join(config.channel.name.clone()) .join(format!("{:019}", timebin)) .join(format!("{:010}", split)) - .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS)) + .join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS)) } /** @@ -76,7 +78,7 @@ pub async fn datapaths_for_timebin( .join(config.channel.name.clone()) .join(format!("{:019}", timebin)) .join(format!("{:010}", split)) - .join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS)); + .join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS)); ret.push(path); } Ok(ret) @@ -92,21 +94,21 @@ pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Ok(ret) } -pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { +pub fn data_dir_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { let ret = channel_timebins_dir_path(channel_config, node)? - .join(format!("{:019}", ts.ns / channel_config.time_bin_size.ns)) + .join(format!("{:019}", ts.ns() / channel_config.time_bin_size.ns())) .join(format!("{:010}", split)); Ok(ret) } -pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { - let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns / MS, 0); +pub fn data_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { + let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns() / MS, 0); let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); Ok(ret) } -pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { - let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns / MS, 0); +pub fn index_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result { + let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns() / MS, 0); let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); Ok(ret) } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index 0b2b8c4..6fff6d9 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -10,11 +10,11 @@ use items_2::channelevents::ChannelEvents; use items_2::eventfull::EventFull; use netpod::log::*; use netpod::query::PlainEventsQuery; +use netpod::range::evrange::NanoRange; use netpod::AggKind; use netpod::ByteSize; use netpod::Channel; use netpod::DiskIoTune; -use netpod::NanoRange; use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::Shape; @@ -92,7 +92,7 @@ pub async fn make_event_pipe( let channel_config = netpod::ChannelConfig { channel: evq.channel().clone(), keyspace: entry.ks as u8, - time_bin_size: entry.bs, + time_bin_size: entry.bs.clone(), shape, scalar_type: entry.scalar_type.clone(), byte_order: entry.byte_order.clone(), @@ -172,8 +172,8 @@ pub fn make_local_event_blobs_stream( let channel_config = netpod::ChannelConfig { channel, keyspace: entry.ks as u8, - time_bin_size: entry.bs, - shape: shape, + time_bin_size: entry.bs.clone(), + shape, scalar_type: entry.scalar_type.clone(), byte_order: entry.byte_order.clone(), array: entry.is_array, @@ -218,7 +218,7 @@ pub fn make_remote_event_blobs_stream( let channel_config = netpod::ChannelConfig { channel, keyspace: entry.ks as u8, - time_bin_size: entry.bs, + time_bin_size: entry.bs.clone(), shape: shape, scalar_type: entry.scalar_type.clone(), byte_order: entry.byte_order.clone(), diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index f2a3158..7f2f214 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -3,11 +3,11 @@ use clap::Parser; use err::Error; #[allow(unused)] use netpod::log::*; +use netpod::range::evrange::NanoRange; use netpod::ByteOrder; use netpod::ByteSize; use netpod::Channel; use netpod::ChannelConfig; -use netpod::NanoRange; use netpod::Shape; use std::path::PathBuf; use streams::eventchunker::EventChunker; @@ -87,7 +87,7 @@ pub fn main() -> Result<(), Error> { series: None, }, keyspace: ce.ks as u8, - time_bin_size: ce.bs, + time_bin_size: ce.bs.clone(), scalar_type: ce.scalar_type.clone(), compression: false, shape: Shape::Scalar, diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 0a2caed..8546e6f 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -27,13 +27,13 @@ use itertools::Itertools; use netpod::log::*; use netpod::query::api1::Api1Query; use netpod::query::PlainEventsQuery; +use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::ByteSize; use netpod::Channel; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; use netpod::DiskIoTune; -use netpod::NanoRange; use netpod::NodeConfigCached; use netpod::PerfOpts; use netpod::ProxyConfig; diff --git a/items/src/lib.rs b/items/src/lib.rs index 4d7b66b..5168a83 100644 --- a/items/src/lib.rs +++ b/items/src/lib.rs @@ -1,7 +1,7 @@ use err::Error; use items_0::streamitem::RangeCompletableItem; use items_0::streamitem::StreamItem; -use netpod::NanoRange; +use netpod::range::evrange::NanoRange; use std::future::Future; use std::marker::PhantomData; use std::pin::Pin; @@ -60,7 +60,7 @@ where Self { // TODO make buffer size a parameter: buf: vec![0; 1024 * 32], - all: vec![], + all: Vec::new(), file: Some(file), _m1: PhantomData, } diff --git a/items_0/src/collect_s.rs b/items_0/src/collect_s.rs index 43cdec4..c337857 100644 --- a/items_0/src/collect_s.rs +++ b/items_0/src/collect_s.rs @@ -4,8 +4,8 @@ use crate::Events; use crate::WithLen; use err::Error; use netpod::log::*; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; -use netpod::SeriesRange; use serde::Serialize; use std::any::Any; use std::fmt; diff --git a/items_0/src/items_0.rs b/items_0/src/items_0.rs index 7400e21..b0e50e1 100644 --- a/items_0/src/items_0.rs +++ b/items_0/src/items_0.rs @@ -12,10 +12,8 @@ pub mod bincode { use collect_s::Collectable; use collect_s::ToJsonResult; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; -use netpod::ScalarType; -use netpod::SeriesRange; -use netpod::Shape; use std::any::Any; use std::collections::VecDeque; use std::fmt; @@ -31,30 +29,12 @@ pub trait TimeBins { fn ts_min_max(&self) -> Option<(u64, u64)>; } -pub enum Fits { - Empty, - Lower, - Greater, - Inside, - PartlyLower, - PartlyGreater, - PartlyLowerAndGreater, -} - pub trait RangeOverlapInfo { fn ends_before(&self, range: &SeriesRange) -> bool; fn ends_after(&self, range: &SeriesRange) -> bool; fn starts_after(&self, range: &SeriesRange) -> bool; } -pub trait EmptyForScalarTypeShape { - fn empty(scalar_type: ScalarType, shape: Shape) -> Self; -} - -pub trait EmptyForShape { - fn empty(shape: Shape) -> Self; -} - pub trait Empty { fn empty() -> Self; } diff --git a/items_2/src/binsdim0.rs b/items_2/src/binsdim0.rs index 3d65ce0..12e3649 100644 --- a/items_2/src/binsdim0.rs +++ b/items_2/src/binsdim0.rs @@ -19,10 +19,10 @@ use items_0::TimeBinner; use items_0::TimeBins; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; use netpod::Dim0Kind; -use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; use std::any; diff --git a/items_2/src/binsxbindim0.rs b/items_2/src/binsxbindim0.rs index a36de9a..b59bb16 100644 --- a/items_2/src/binsxbindim0.rs +++ b/items_2/src/binsxbindim0.rs @@ -22,11 +22,11 @@ use items_0::TimeBinner; use items_0::TimeBins; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; use netpod::Dim0Kind; -use netpod::NanoRange; -use netpod::SeriesRange; use num_traits::Zero; use serde::Deserialize; use serde::Serialize; diff --git a/items_2/src/channelevents.rs b/items_2/src/channelevents.rs index b4575c2..555ad5d 100644 --- a/items_2/src/channelevents.rs +++ b/items_2/src/channelevents.rs @@ -12,10 +12,10 @@ use items_0::AsAnyRef; use items_0::MergeError; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRange; use netpod::BinnedRangeEnum; -use netpod::NanoRange; -use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; use std::any::Any; diff --git a/items_2/src/eventsdim0.rs b/items_2/src/eventsdim0.rs index e8f9325..fd7358c 100644 --- a/items_2/src/eventsdim0.rs +++ b/items_2/src/eventsdim0.rs @@ -22,11 +22,11 @@ use items_0::TimeBinnable; use items_0::TimeBinner; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::timeunits::MS; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; -use netpod::NanoRange; -use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; use std::any; diff --git a/items_2/src/eventsdim1.rs b/items_2/src/eventsdim1.rs index baf11c5..f58cabf 100644 --- a/items_2/src/eventsdim1.rs +++ b/items_2/src/eventsdim1.rs @@ -22,9 +22,9 @@ use items_0::TimeBinnable; use items_0::TimeBinner; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::SeriesRange; use netpod::timeunits::SEC; use netpod::BinnedRangeEnum; -use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; use std::any; diff --git a/items_2/src/eventsxbindim0.rs b/items_2/src/eventsxbindim0.rs index 8d07da5..c979a16 100644 --- a/items_2/src/eventsxbindim0.rs +++ b/items_2/src/eventsxbindim0.rs @@ -15,8 +15,8 @@ use items_0::AsAnyRef; use items_0::Empty; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; -use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; use std::any; diff --git a/items_2/src/items_2.rs b/items_2/src/items_2.rs index 883bf53..5a9d475 100644 --- a/items_2/src/items_2.rs +++ b/items_2/src/items_2.rs @@ -29,8 +29,9 @@ use items_0::Events; use items_0::MergeError; use items_0::RangeOverlapInfo; use merger::Mergeable; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::timeunits::*; -use netpod::SeriesRange; use serde::Deserialize; use serde::Serialize; use serde::Serializer; diff --git a/items_2/src/test.rs b/items_2/src/test.rs index f6039d5..89bb00f 100644 --- a/items_2/src/test.rs +++ b/items_2/src/test.rs @@ -28,11 +28,12 @@ use items_0::Appendable; use items_0::Empty; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::timeunits::*; use netpod::AggKind; use netpod::BinnedRange; use netpod::BinnedRangeEnum; -use netpod::NanoRange; use netpod::ScalarType; use netpod::Shape; use std::time::Duration; diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 3d8af4c..afbfcc9 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1,6 +1,7 @@ pub mod api4; pub mod histo; pub mod query; +pub mod range; pub mod status; pub mod streamext; pub mod transform; @@ -13,6 +14,9 @@ use chrono::Utc; use err::Error; use futures_util::Stream; use futures_util::StreamExt; +use range::evrange::NanoRange; +use range::evrange::PulseRange; +use range::evrange::SeriesRange; use serde::Deserialize; use serde::Serialize; use serde_json::Value as JsVal; @@ -700,148 +704,6 @@ impl From for u64 { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum TimeRange { - Time { beg: DateTime, end: DateTime }, - Pulse { beg: u64, end: u64 }, - Nano { beg: u64, end: u64 }, -} - -#[derive(Clone, Copy, PartialEq, PartialOrd, Serialize, Deserialize)] -pub struct Nanos { - pub ns: u64, -} - -impl Nanos { - pub fn from_ns(ns: u64) -> Self { - Self { ns } - } -} - -impl fmt::Debug for Nanos { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let ts = chrono::Utc.timestamp_opt((self.ns / SEC) as i64, (self.ns % SEC) as u32); - f.debug_struct("Nanos").field("ns", &ts).finish() - } -} - -#[derive(Clone, Serialize, Deserialize, PartialEq)] -pub struct NanoRange { - pub beg: u64, - pub end: u64, -} - -impl fmt::Debug for NanoRange { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let beg = chrono::Utc - .timestamp_opt((self.beg / SEC) as i64, (self.beg % SEC) as u32) - .earliest(); - let end = chrono::Utc - .timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32) - .earliest(); - if let (Some(a), Some(b)) = (beg, end) { - f.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish() - } else { - f.debug_struct("NanoRange") - .field("beg", &beg) - .field("end", &end) - .finish() - } - } -} - -impl NanoRange { - pub fn from_date_time(beg: DateTime, end: DateTime) -> Self { - Self { - beg: beg.timestamp_nanos() as u64, - end: end.timestamp_nanos() as u64, - } - } - - pub fn delta(&self) -> u64 { - self.end - self.beg - } -} - -impl TryFrom<&SeriesRange> for NanoRange { - type Error = Error; - - fn try_from(val: &SeriesRange) -> Result { - match val { - SeriesRange::TimeRange(x) => Ok(x.clone()), - SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace("not a Time range")), - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub struct PulseRange { - pub beg: u64, - pub end: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -pub enum SeriesRange { - TimeRange(NanoRange), - PulseRange(PulseRange), -} - -impl SeriesRange { - pub fn dim0kind(&self) -> Dim0Kind { - match self { - SeriesRange::TimeRange(_) => Dim0Kind::Time, - SeriesRange::PulseRange(_) => Dim0Kind::Pulse, - } - } - - pub fn is_time(&self) -> bool { - match self { - SeriesRange::TimeRange(_) => true, - SeriesRange::PulseRange(_) => false, - } - } - - pub fn is_pulse(&self) -> bool { - match self { - SeriesRange::TimeRange(_) => false, - SeriesRange::PulseRange(_) => true, - } - } - - pub fn beg_u64(&self) -> u64 { - match self { - SeriesRange::TimeRange(x) => x.beg, - SeriesRange::PulseRange(x) => x.beg, - } - } - - pub fn end_u64(&self) -> u64 { - match self { - SeriesRange::TimeRange(x) => x.beg, - SeriesRange::PulseRange(x) => x.beg, - } - } - - pub fn delta_u64(&self) -> u64 { - match self { - SeriesRange::TimeRange(x) => x.end - x.beg, - SeriesRange::PulseRange(x) => x.end - x.beg, - } - } -} - -impl From for SeriesRange { - fn from(k: NanoRange) -> Self { - Self::TimeRange(k) - } -} - -impl From for SeriesRange { - fn from(k: PulseRange) -> Self { - Self::PulseRange(k) - } -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ByteOrder { Little, @@ -897,7 +759,7 @@ pub enum GenVar { pub struct ChannelConfig { pub channel: Channel, pub keyspace: u8, - pub time_bin_size: Nanos, + pub time_bin_size: TsNano, pub scalar_type: ScalarType, pub compression: bool, pub shape: Shape, @@ -1159,10 +1021,6 @@ fn test_shape_serde() { assert_eq!(s, Shape::Image(12, 13)); } -pub trait HasShape { - fn shape(&self) -> Shape; -} - pub mod timeunits { pub const MU: u64 = 1000; pub const MS: u64 = MU * 1000; @@ -1199,12 +1057,35 @@ pub trait Dim0Index: Clone + fmt::Debug + PartialOrd { fn to_binned_range_enum(&self, bin_off: u64, bin_cnt: u64) -> BinnedRangeEnum; } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] -pub struct TsNano(u64); +#[derive(Clone, Serialize, Deserialize, PartialEq, PartialOrd)] +pub struct TsNano(pub u64); #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, PartialOrd)] pub struct PulseId(u64); +impl TsNano { + pub fn from_ns(ns: u64) -> Self { + Self(ns) + } + + pub fn ns(&self) -> u64 { + self.0 + } +} + +impl fmt::Debug for TsNano { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let ts = Utc.timestamp_opt((self.0 / SEC) as i64, (self.0 % SEC) as u32); + f.debug_struct("TsNano").field("ns", &ts).finish() + } +} + +impl PulseId { + pub fn from_id(id: u64) -> Self { + Self(id) + } +} + impl Dim0Index for TsNano { fn add(&self, v: &Self) -> Self { Self(self.0 + v.0) @@ -1235,7 +1116,7 @@ impl Dim0Index for TsNano { } fn series_range(a: Self, b: Self) -> SeriesRange { - todo!() + SeriesRange::TimeRange(NanoRange { beg: a.0, end: b.0 }) } fn prebin_bin_len_opts() -> Vec { @@ -1305,7 +1186,7 @@ impl Dim0Index for PulseId { } fn series_range(a: Self, b: Self) -> SeriesRange { - todo!() + SeriesRange::PulseRange(PulseRange { beg: a.0, end: b.0 }) } fn prebin_bin_len_opts() -> Vec { @@ -1723,7 +1604,32 @@ impl BinnedRangeEnum { } pub fn range_at(&self, i: usize) -> Option { - err::todoval() + match self { + BinnedRangeEnum::Time(k) => { + if (i as u64) < k.bin_cnt { + let beg = k.bin_off + k.bin_len.0 * i as u64; + let x = SeriesRange::TimeRange(NanoRange { + beg, + end: beg + k.bin_len.0, + }); + Some(x) + } else { + None + } + } + BinnedRangeEnum::Pulse(k) => { + if (i as u64) < k.bin_cnt { + let beg = k.bin_off + k.bin_len.0 * i as u64; + let x = SeriesRange::PulseRange(PulseRange { + beg, + end: beg + k.bin_len.0, + }); + Some(x) + } else { + None + } + } + } } pub fn dim0kind(&self) -> Dim0Kind { diff --git a/netpod/src/range.rs b/netpod/src/range.rs new file mode 100644 index 0000000..e701876 --- /dev/null +++ b/netpod/src/range.rs @@ -0,0 +1 @@ +pub mod evrange; diff --git a/netpod/src/range/evrange.rs b/netpod/src/range/evrange.rs new file mode 100644 index 0000000..7ea38ad --- /dev/null +++ b/netpod/src/range/evrange.rs @@ -0,0 +1,133 @@ +use crate::timeunits::SEC; +use crate::Dim0Kind; +use chrono::DateTime; +use chrono::TimeZone; +use chrono::Utc; +use err::Error; +use serde::Deserialize; +use serde::Serialize; +use std::fmt; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum TimeRange { + Time { beg: DateTime, end: DateTime }, + Pulse { beg: u64, end: u64 }, + Nano { beg: u64, end: u64 }, +} + +#[derive(Clone, Serialize, Deserialize, PartialEq)] +pub struct NanoRange { + pub beg: u64, + pub end: u64, +} + +impl fmt::Debug for NanoRange { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let beg = chrono::Utc + .timestamp_opt((self.beg / SEC) as i64, (self.beg % SEC) as u32) + .earliest(); + let end = chrono::Utc + .timestamp_opt((self.end / SEC) as i64, (self.end % SEC) as u32) + .earliest(); + if let (Some(a), Some(b)) = (beg, end) { + f.debug_struct("NanoRange").field("beg", &a).field("end", &b).finish() + } else { + f.debug_struct("NanoRange") + .field("beg", &beg) + .field("end", &end) + .finish() + } + } +} + +impl NanoRange { + pub fn from_date_time(beg: DateTime, end: DateTime) -> Self { + Self { + beg: beg.timestamp_nanos() as u64, + end: end.timestamp_nanos() as u64, + } + } + + pub fn delta(&self) -> u64 { + self.end - self.beg + } +} + +impl TryFrom<&SeriesRange> for NanoRange { + type Error = Error; + + fn try_from(val: &SeriesRange) -> Result { + match val { + SeriesRange::TimeRange(x) => Ok(x.clone()), + SeriesRange::PulseRange(_) => Err(Error::with_msg_no_trace("not a Time range")), + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PulseRange { + pub beg: u64, + pub end: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum SeriesRange { + TimeRange(NanoRange), + PulseRange(PulseRange), +} + +impl SeriesRange { + pub fn dim0kind(&self) -> Dim0Kind { + match self { + SeriesRange::TimeRange(_) => Dim0Kind::Time, + SeriesRange::PulseRange(_) => Dim0Kind::Pulse, + } + } + + pub fn is_time(&self) -> bool { + match self { + SeriesRange::TimeRange(_) => true, + SeriesRange::PulseRange(_) => false, + } + } + + pub fn is_pulse(&self) -> bool { + match self { + SeriesRange::TimeRange(_) => false, + SeriesRange::PulseRange(_) => true, + } + } + + pub fn beg_u64(&self) -> u64 { + match self { + SeriesRange::TimeRange(x) => x.beg, + SeriesRange::PulseRange(x) => x.beg, + } + } + + pub fn end_u64(&self) -> u64 { + match self { + SeriesRange::TimeRange(x) => x.beg, + SeriesRange::PulseRange(x) => x.beg, + } + } + + pub fn delta_u64(&self) -> u64 { + match self { + SeriesRange::TimeRange(x) => x.end - x.beg, + SeriesRange::PulseRange(x) => x.end - x.beg, + } + } +} + +impl From for SeriesRange { + fn from(k: NanoRange) -> Self { + Self::TimeRange(k) + } +} + +impl From for SeriesRange { + fn from(k: PulseRange) -> Self { + Self::PulseRange(k) + } +} diff --git a/nodenet/src/channelconfig.rs b/nodenet/src/channelconfig.rs index 25a3991..fae087c 100644 --- a/nodenet/src/channelconfig.rs +++ b/nodenet/src/channelconfig.rs @@ -1,8 +1,8 @@ use err::Error; use netpod::log::*; +use netpod::range::evrange::NanoRange; use netpod::ChConf; use netpod::Channel; -use netpod::NanoRange; use netpod::NodeConfigCached; use netpod::ScalarType; use netpod::Shape; diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 17e830a..6fa919d 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -12,13 +12,13 @@ use items_2::framable::EventQueryJsonStringFrame; use items_2::frame::decode_frame; use items_2::frame::make_frame; use netpod::query::PlainEventsQuery; +use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::AggKind; use netpod::Channel; use netpod::Cluster; use netpod::Database; use netpod::FileIoBufferSize; -use netpod::NanoRange; use netpod::Node; use netpod::NodeConfig; use netpod::NodeConfigCached; diff --git a/parse/src/channelconfig.rs b/parse/src/channelconfig.rs index 10a110b..d523805 100644 --- a/parse/src/channelconfig.rs +++ b/parse/src/channelconfig.rs @@ -1,13 +1,26 @@ use err::Error; +use netpod::range::evrange::NanoRange; use netpod::timeunits::MS; -use netpod::{ByteOrder, Channel, NanoRange, Nanos, Node, ScalarType, Shape}; -use netpod::{ChannelConfigQuery, ChannelConfigResponse}; +use netpod::ByteOrder; +use netpod::Channel; +use netpod::ChannelConfigQuery; +use netpod::ChannelConfigResponse; +use netpod::Node; +use netpod::ScalarType; +use netpod::Shape; +use netpod::TsNano; use nom::bytes::complete::take; -use nom::number::complete::{be_i16, be_i32, be_i64, be_i8, be_u8}; +use nom::number::complete::be_i16; +use nom::number::complete::be_i32; +use nom::number::complete::be_i64; +use nom::number::complete::be_i8; +use nom::number::complete::be_u8; use nom::Needed; -use num_derive::{FromPrimitive, ToPrimitive}; +use num_derive::FromPrimitive; +use num_derive::ToPrimitive; use num_traits::ToPrimitive; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::fmt; use tokio::io::ErrorKind; @@ -70,7 +83,7 @@ pub struct ConfigEntry { pub ts: u64, pub pulse: i64, pub ks: i32, - pub bs: Nanos, + pub bs: TsNano, pub split_count: i32, pub status: i32, pub bb: i8, @@ -158,7 +171,7 @@ pub fn parse_entry(inp: &[u8]) -> NRes> { let (inp, pulse) = be_i64(inp)?; let (inp, ks) = be_i32(inp)?; let (inp, bs) = be_i64(inp)?; - let bs = Nanos { ns: bs as u64 * MS }; + let bs = TsNano(bs as u64 * MS); let (inp, split_count) = be_i32(inp)?; let (inp, status) = be_i32(inp)?; let (inp, bb) = be_i8(inp)?; diff --git a/scyllaconn/src/scyllaconn.rs b/scyllaconn/src/scyllaconn.rs index 5802fba..f4d9d98 100644 --- a/scyllaconn/src/scyllaconn.rs +++ b/scyllaconn/src/scyllaconn.rs @@ -6,8 +6,9 @@ pub mod status; use err::Error; use errconv::ErrConv; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::ScyllaConfig; -use netpod::SeriesRange; use scylla::statement::Consistency; use scylla::Session as ScySession; use std::sync::Arc; diff --git a/scyllaconn/src/status.rs b/scyllaconn/src/status.rs index f53afac..a184a57 100644 --- a/scyllaconn/src/status.rs +++ b/scyllaconn/src/status.rs @@ -6,7 +6,8 @@ use futures_util::Stream; use items_2::channelevents::ChannelStatus; use items_2::channelevents::ChannelStatusEvent; use netpod::log::*; -use netpod::NanoRange; +use netpod::range::evrange::NanoRange; +use netpod::range::evrange::SeriesRange; use netpod::CONNECTION_STATUS_DIV; use scylla::Session as ScySession; use std::collections::VecDeque; diff --git a/streams/src/collect.rs b/streams/src/collect.rs index 21c0efa..5b1115f 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -10,9 +10,9 @@ use items_0::streamitem::StatsItem; use items_0::streamitem::StreamItem; use items_0::WithLen; use netpod::log::*; +use netpod::range::evrange::SeriesRange; use netpod::BinnedRangeEnum; use netpod::DiskStats; -use netpod::SeriesRange; use std::fmt; use std::time::Duration; use std::time::Instant; diff --git a/streams/src/eventchunker.rs b/streams/src/eventchunker.rs index f931692..61f1b58 100644 --- a/streams/src/eventchunker.rs +++ b/streams/src/eventchunker.rs @@ -14,11 +14,11 @@ use items_0::WithLen; use items_2::eventfull::EventFull; use netpod::histo::HistoLog2; use netpod::log::*; +use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::ByteSize; use netpod::ChannelConfig; use netpod::EventDataReadStats; -use netpod::NanoRange; use netpod::ScalarType; use netpod::Shape; use parse::channelconfig::CompressionMethod; diff --git a/streams/src/rangefilter2.rs b/streams/src/rangefilter2.rs index 288f74a..71662b8 100644 --- a/streams/src/rangefilter2.rs +++ b/streams/src/rangefilter2.rs @@ -8,7 +8,7 @@ use items_0::streamitem::StreamItem; use items_0::MergeError; use items_2::merger::Mergeable; use netpod::log::*; -use netpod::NanoRange; +use netpod::range::evrange::NanoRange; use netpod::RangeFilterStats; use std::fmt; use std::pin::Pin;