WIP refactor
This commit is contained in:
@@ -1,9 +1,17 @@
|
||||
use crate::eventblobs::EventChunkerMultifile;
|
||||
use netpod::{test_data_base_path_databuffer, timeunits::*, SfDatabuffer};
|
||||
use netpod::{ByteOrder, ByteSize, Channel, ChannelConfig, NanoRange, Nanos, Node, ScalarType, Shape};
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::test_data_base_path_databuffer;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::ByteOrder;
|
||||
use netpod::ByteSize;
|
||||
use netpod::Channel;
|
||||
use netpod::ChannelConfig;
|
||||
use netpod::Node;
|
||||
use netpod::ScalarType;
|
||||
use netpod::SfDatabuffer;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use streams::eventchunker::EventChunkerConf;
|
||||
#[allow(unused_imports)]
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
pub fn make_test_node(id: u32) -> Node {
|
||||
Node {
|
||||
@@ -43,7 +51,7 @@ async fn agg_x_dim_0_inner() {
|
||||
series: None,
|
||||
},
|
||||
keyspace: 2,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
array: false,
|
||||
shape: Shape::Scalar,
|
||||
scalar_type: ScalarType::F64,
|
||||
@@ -55,7 +63,7 @@ async fn agg_x_dim_0_inner() {
|
||||
buffer_size: 1024 * 4,
|
||||
};
|
||||
let _bin_count = 20;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0;
|
||||
let ts2 = ts1 + HOUR * 24;
|
||||
let range = NanoRange { beg: ts1, end: ts2 };
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
@@ -100,7 +108,7 @@ async fn agg_x_dim_1_inner() {
|
||||
series: None,
|
||||
},
|
||||
keyspace: 3,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
array: true,
|
||||
shape: Shape::Wave(1024),
|
||||
scalar_type: ScalarType::F64,
|
||||
@@ -112,7 +120,7 @@ async fn agg_x_dim_1_inner() {
|
||||
buffer_size: 17,
|
||||
};
|
||||
let _bin_count = 10;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.ns;
|
||||
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0;
|
||||
let ts2 = ts1 + HOUR * 24;
|
||||
let range = NanoRange { beg: ts1, end: ts2 };
|
||||
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use err::Error;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::Channel;
|
||||
use netpod::ChannelConfig;
|
||||
use netpod::NanoRange;
|
||||
use netpod::NodeConfigCached;
|
||||
use parse::channelconfig::extract_matching_config_entry;
|
||||
use parse::channelconfig::read_local_config;
|
||||
@@ -29,7 +29,7 @@ pub async fn config(
|
||||
let channel_config = ChannelConfig {
|
||||
channel: channel.clone(),
|
||||
keyspace: entry.ks as u8,
|
||||
time_bin_size: entry.bs,
|
||||
time_bin_size: entry.bs.clone(),
|
||||
shape: shape,
|
||||
scalar_type: entry.scalar_type.clone(),
|
||||
byte_order: entry.byte_order.clone(),
|
||||
|
||||
@@ -1,14 +1,22 @@
|
||||
use super::paths;
|
||||
use bytes::BytesMut;
|
||||
use err::{ErrStr, Error};
|
||||
use err::ErrStr;
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::log::*;
|
||||
use netpod::{ChannelConfig, NanoRange, Nanos, Node};
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::ChannelConfig;
|
||||
use netpod::Node;
|
||||
use netpod::TsNano;
|
||||
use std::fmt;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Instant;
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, ErrorKind, SeekFrom};
|
||||
use tokio::fs::File;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio::io::ErrorKind;
|
||||
use tokio::io::SeekFrom;
|
||||
|
||||
pub struct Positioned {
|
||||
pub file: OpenedFile,
|
||||
@@ -237,13 +245,11 @@ async fn open_files_inner(
|
||||
return Ok(());
|
||||
}
|
||||
for &tb in &timebins {
|
||||
let ts_bin = Nanos {
|
||||
ns: tb * channel_config.time_bin_size.ns,
|
||||
};
|
||||
if ts_bin.ns >= range.end {
|
||||
let ts_bin = TsNano(tb * channel_config.time_bin_size.0);
|
||||
if ts_bin.ns() >= range.end {
|
||||
continue;
|
||||
}
|
||||
if ts_bin.ns + channel_config.time_bin_size.ns <= range.beg {
|
||||
if ts_bin.ns() + channel_config.time_bin_size.ns() <= range.beg {
|
||||
continue;
|
||||
}
|
||||
let mut a = Vec::new();
|
||||
@@ -337,10 +343,8 @@ async fn open_expanded_files_inner(
|
||||
}
|
||||
let mut p1 = None;
|
||||
for (i1, tb) in timebins.iter().enumerate().rev() {
|
||||
let ts_bin = Nanos {
|
||||
ns: tb * channel_config.time_bin_size.ns,
|
||||
};
|
||||
if ts_bin.ns <= range.beg {
|
||||
let ts_bin = TsNano(tb * channel_config.time_bin_size.ns());
|
||||
if ts_bin.ns() <= range.beg {
|
||||
p1 = Some(i1);
|
||||
break;
|
||||
}
|
||||
@@ -407,8 +411,10 @@ async fn open_expanded_files_inner(
|
||||
mod test {
|
||||
use super::*;
|
||||
use err::Error;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::test_data_base_path_databuffer;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{test_data_base_path_databuffer, ChannelConfig, NanoRange, Nanos};
|
||||
use netpod::ChannelConfig;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs::OpenOptions;
|
||||
|
||||
@@ -810,7 +816,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn expanded_file_list() {
|
||||
let range = netpod::NanoRange {
|
||||
let range = NanoRange {
|
||||
beg: DAY + HOUR * 5,
|
||||
end: DAY + HOUR * 8,
|
||||
};
|
||||
@@ -823,7 +829,7 @@ mod test {
|
||||
let channel_config = ChannelConfig {
|
||||
channel: chn,
|
||||
keyspace: 2,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
scalar_type: netpod::ScalarType::I32,
|
||||
byte_order: netpod::ByteOrder::Big,
|
||||
shape: netpod::Shape::Scalar,
|
||||
|
||||
@@ -11,10 +11,10 @@ use items_0::streamitem::StreamItem;
|
||||
use items_2::eventfull::EventFull;
|
||||
use items_2::merger::Merger;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::ChannelConfig;
|
||||
use netpod::DiskIoTune;
|
||||
use netpod::NanoRange;
|
||||
use netpod::Node;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
@@ -271,16 +271,17 @@ mod test {
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
use items_0::streamitem::StreamItem;
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::timeunits::DAY;
|
||||
use netpod::timeunits::MS;
|
||||
use netpod::ByteSize;
|
||||
use netpod::ChannelConfig;
|
||||
use netpod::DiskIoTune;
|
||||
use netpod::Nanos;
|
||||
use netpod::TsNano;
|
||||
use streams::eventchunker::EventChunkerConf;
|
||||
use streams::rangefilter2::RangeFilter2;
|
||||
|
||||
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
|
||||
fn read_expanded_for_range(range: NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
|
||||
let chn = netpod::Channel {
|
||||
backend: "test-disk-databuffer".into(),
|
||||
name: "scalar-i32-be".into(),
|
||||
@@ -290,7 +291,7 @@ mod test {
|
||||
let channel_config = ChannelConfig {
|
||||
channel: chn,
|
||||
keyspace: 2,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
scalar_type: netpod::ScalarType::I32,
|
||||
byte_order: netpod::ByteOrder::Big,
|
||||
shape: netpod::Shape::Scalar,
|
||||
@@ -346,7 +347,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn read_expanded_0() -> Result<(), Error> {
|
||||
let range = netpod::NanoRange {
|
||||
let range = NanoRange {
|
||||
beg: DAY + MS * 0,
|
||||
end: DAY + MS * 100,
|
||||
};
|
||||
@@ -362,7 +363,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn read_expanded_1() -> Result<(), Error> {
|
||||
let range = netpod::NanoRange {
|
||||
let range = NanoRange {
|
||||
beg: DAY + MS * 0,
|
||||
end: DAY + MS * 1501,
|
||||
};
|
||||
@@ -376,7 +377,7 @@ mod test {
|
||||
|
||||
#[test]
|
||||
fn read_expanded_2() -> Result<(), Error> {
|
||||
let range = netpod::NanoRange {
|
||||
let range = NanoRange {
|
||||
beg: DAY - MS * 100,
|
||||
end: DAY + MS * 1501,
|
||||
};
|
||||
@@ -388,7 +389,7 @@ mod test {
|
||||
#[test]
|
||||
fn read_expanded_3() -> Result<(), Error> {
|
||||
use netpod::timeunits::*;
|
||||
let range = netpod::NanoRange {
|
||||
let range = NanoRange {
|
||||
beg: DAY - MS * 1500,
|
||||
end: DAY + MS * 1501,
|
||||
};
|
||||
|
||||
@@ -1,12 +1,23 @@
|
||||
use crate::ChannelConfigExt;
|
||||
use bitshuffle::bitshuffle_compress;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use bytes::BufMut;
|
||||
use bytes::BytesMut;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::timeunits::*;
|
||||
use netpod::{ByteOrder, Channel, ChannelConfig, GenVar, Nanos, Node, ScalarType, SfDatabuffer, Shape};
|
||||
use std::path::{Path, PathBuf};
|
||||
use tokio::fs::{File, OpenOptions};
|
||||
use netpod::ByteOrder;
|
||||
use netpod::Channel;
|
||||
use netpod::ChannelConfig;
|
||||
use netpod::GenVar;
|
||||
use netpod::Node;
|
||||
use netpod::ScalarType;
|
||||
use netpod::SfDatabuffer;
|
||||
use netpod::Shape;
|
||||
use netpod::TsNano;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs::File;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
pub async fn gen_test_data() -> Result<(), Error> {
|
||||
@@ -27,7 +38,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
series: None,
|
||||
},
|
||||
keyspace: 2,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
scalar_type: ScalarType::I32,
|
||||
byte_order: ByteOrder::Big,
|
||||
shape: Shape::Scalar,
|
||||
@@ -46,7 +57,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
series: None,
|
||||
},
|
||||
keyspace: 3,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
array: true,
|
||||
scalar_type: ScalarType::F64,
|
||||
shape: Shape::Wave(21),
|
||||
@@ -65,7 +76,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
series: None,
|
||||
},
|
||||
keyspace: 3,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
scalar_type: ScalarType::U16,
|
||||
byte_order: ByteOrder::Little,
|
||||
shape: Shape::Wave(77),
|
||||
@@ -84,7 +95,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
series: None,
|
||||
},
|
||||
keyspace: 2,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
scalar_type: ScalarType::I32,
|
||||
byte_order: ByteOrder::Little,
|
||||
shape: Shape::Scalar,
|
||||
@@ -103,7 +114,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
|
||||
series: None,
|
||||
},
|
||||
keyspace: 2,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
time_bin_size: TsNano(DAY),
|
||||
scalar_type: ScalarType::I32,
|
||||
byte_order: ByteOrder::Little,
|
||||
shape: Shape::Scalar,
|
||||
@@ -170,9 +181,9 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: &
|
||||
.await
|
||||
.map_err(|k| Error::with_msg(format!("can not generate config {:?}", k)))?;
|
||||
let mut evix = 0;
|
||||
let mut ts = Nanos { ns: 0 };
|
||||
let mut ts = TsNano(0);
|
||||
let mut pulse = 0;
|
||||
while ts.ns < DAY * 3 {
|
||||
while ts.ns() < DAY * 3 {
|
||||
let res = gen_timebin(
|
||||
evix,
|
||||
ts,
|
||||
@@ -187,7 +198,7 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: &
|
||||
)
|
||||
.await?;
|
||||
evix = res.evix;
|
||||
ts.ns = res.ts.ns;
|
||||
ts = res.ts;
|
||||
pulse = res.pulse;
|
||||
}
|
||||
Ok(())
|
||||
@@ -231,7 +242,7 @@ async fn gen_config(
|
||||
buf.put_i64(ts);
|
||||
buf.put_i64(pulse);
|
||||
buf.put_u32(config.keyspace as u32);
|
||||
buf.put_u64(config.time_bin_size.ns / MS);
|
||||
buf.put_u64(config.time_bin_size.ns() / MS);
|
||||
buf.put_i32(sc);
|
||||
buf.put_i32(status);
|
||||
buf.put_i8(bb);
|
||||
@@ -316,13 +327,13 @@ impl CountedFile {
|
||||
|
||||
struct GenTimebinRes {
|
||||
evix: u64,
|
||||
ts: Nanos,
|
||||
ts: TsNano,
|
||||
pulse: u64,
|
||||
}
|
||||
|
||||
async fn gen_timebin(
|
||||
evix: u64,
|
||||
ts: Nanos,
|
||||
ts: TsNano,
|
||||
pulse: u64,
|
||||
ts_spacing: u64,
|
||||
channel_path: &Path,
|
||||
@@ -332,11 +343,11 @@ async fn gen_timebin(
|
||||
ensemble: &Ensemble,
|
||||
gen_var: &GenVar,
|
||||
) -> Result<GenTimebinRes, Error> {
|
||||
let tb = ts.ns / config.time_bin_size.ns;
|
||||
let tb = ts.ns() / config.time_bin_size.ns();
|
||||
let path = channel_path.join(format!("{:019}", tb)).join(format!("{:010}", split));
|
||||
tokio::fs::create_dir_all(&path).await?;
|
||||
let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size.ns / MS, 0));
|
||||
let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size.ns / MS, 0));
|
||||
let data_path = path.join(format!("{:019}_{:05}_Data", config.time_bin_size.ns() / MS, 0));
|
||||
let index_path = path.join(format!("{:019}_{:05}_Data_Index", config.time_bin_size.ns() / MS, 0));
|
||||
info!("open data file {:?}", data_path);
|
||||
let file = OpenOptions::new()
|
||||
.write(true)
|
||||
@@ -363,34 +374,32 @@ async fn gen_timebin(
|
||||
let mut evix = evix;
|
||||
let mut ts = ts;
|
||||
let mut pulse = pulse;
|
||||
let tsmax = Nanos {
|
||||
ns: (tb + 1) * config.time_bin_size.ns,
|
||||
};
|
||||
while ts.ns < tsmax.ns {
|
||||
let tsmax = TsNano((tb + 1) * config.time_bin_size.ns());
|
||||
while ts.ns() < tsmax.ns() {
|
||||
match gen_var {
|
||||
// TODO
|
||||
// Splits and nodes are not in 1-to-1 correspondence.
|
||||
GenVar::Default => {
|
||||
if evix % ensemble.nodes.len() as u64 == split as u64 {
|
||||
gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?;
|
||||
gen_event(&mut file, index_file.as_mut(), evix, ts.clone(), pulse, config, gen_var).await?;
|
||||
}
|
||||
}
|
||||
GenVar::ConstRegular => {
|
||||
if evix % ensemble.nodes.len() as u64 == split as u64 {
|
||||
gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?;
|
||||
gen_event(&mut file, index_file.as_mut(), evix, ts.clone(), pulse, config, gen_var).await?;
|
||||
}
|
||||
}
|
||||
GenVar::TimeWeight => {
|
||||
let m = evix % 20;
|
||||
if m == 0 || m == 1 {
|
||||
if evix % ensemble.nodes.len() as u64 == split as u64 {
|
||||
gen_event(&mut file, index_file.as_mut(), evix, ts, pulse, config, gen_var).await?;
|
||||
gen_event(&mut file, index_file.as_mut(), evix, ts.clone(), pulse, config, gen_var).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
evix += 1;
|
||||
ts.ns += ts_spacing;
|
||||
ts.0 += ts_spacing;
|
||||
pulse += 1;
|
||||
}
|
||||
let ret = GenTimebinRes { evix, ts, pulse };
|
||||
@@ -413,7 +422,7 @@ async fn gen_event(
|
||||
file: &mut CountedFile,
|
||||
index_file: Option<&mut CountedFile>,
|
||||
evix: u64,
|
||||
ts: Nanos,
|
||||
ts: TsNano,
|
||||
pulse: u64,
|
||||
config: &ChannelConfig,
|
||||
gen_var: &GenVar,
|
||||
@@ -423,7 +432,7 @@ async fn gen_event(
|
||||
let mut buf = BytesMut::with_capacity(1024 * 16);
|
||||
buf.put_i32(0xcafecafe as u32 as i32);
|
||||
buf.put_u64(ttl);
|
||||
buf.put_u64(ts.ns);
|
||||
buf.put_u64(ts.ns());
|
||||
buf.put_u64(pulse);
|
||||
buf.put_u64(ioc_ts);
|
||||
buf.put_u8(0);
|
||||
@@ -541,7 +550,7 @@ async fn gen_event(
|
||||
file.write_all(buf.as_ref()).await?;
|
||||
if let Some(f) = index_file {
|
||||
let mut buf = BytesMut::with_capacity(16);
|
||||
buf.put_u64(ts.ns);
|
||||
buf.put_u64(ts.ns());
|
||||
buf.put_u64(z);
|
||||
f.write_all(&buf).await?;
|
||||
}
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
use arrayref::array_ref;
|
||||
use err::Error;
|
||||
use netpod::log::*;
|
||||
use netpod::NanoRange;
|
||||
use netpod::Nanos;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::TsNano;
|
||||
use std::mem::size_of;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom};
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio::io::SeekFrom;
|
||||
|
||||
pub fn find_ge(range: NanoRange, expand_right: bool, buf: &[u8]) -> Result<Option<(u64, u64)>, Error> {
|
||||
type VT = u64;
|
||||
@@ -174,7 +176,7 @@ pub fn parse_channel_header(buf: &[u8]) -> Result<(u32,), Error> {
|
||||
Ok((len1 as u32,))
|
||||
}
|
||||
|
||||
pub fn parse_event(buf: &[u8]) -> Result<(u32, Nanos), Error> {
|
||||
pub fn parse_event(buf: &[u8]) -> Result<(u32, TsNano), Error> {
|
||||
if buf.len() < 4 {
|
||||
return Err(Error::with_msg(format!("parse_event buf len: {}", buf.len())));
|
||||
}
|
||||
@@ -194,10 +196,10 @@ pub fn parse_event(buf: &[u8]) -> Result<(u32, Nanos), Error> {
|
||||
return Err(Error::with_msg(format!("len mismatch len1: {} len2: {}", len1, len2)));
|
||||
}
|
||||
let ts = u64::from_be_bytes(*array_ref![buf, 12, 8]);
|
||||
Ok((len1 as u32, Nanos { ns: ts }))
|
||||
Ok((len1 as u32, TsNano(ts)))
|
||||
}
|
||||
|
||||
pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, Nanos), Error> {
|
||||
pub async fn read_event_at(pos: u64, file: &mut File) -> Result<(u32, TsNano), Error> {
|
||||
file.seek(SeekFrom::Start(pos)).await?;
|
||||
let mut buf = vec![0; 1024];
|
||||
let _n1 = read(&mut buf, file).await?;
|
||||
@@ -220,7 +222,7 @@ pub async fn position_static_len_datafile(
|
||||
let evlen = ev.0 as u64;
|
||||
let mut j = headoff;
|
||||
let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff;
|
||||
let x = ev.1.ns;
|
||||
let x = ev.1.ns();
|
||||
let t = read_event_at(k, &mut file).await?;
|
||||
if t.0 != evlen as u32 {
|
||||
Err(Error::with_msg(format!(
|
||||
@@ -228,7 +230,7 @@ pub async fn position_static_len_datafile(
|
||||
t.0, evlen
|
||||
)))?;
|
||||
}
|
||||
let y = t.1.ns;
|
||||
let y = t.1.ns();
|
||||
let mut nreads = 2;
|
||||
if x >= range.end {
|
||||
if expand_right {
|
||||
@@ -275,7 +277,7 @@ pub async fn position_static_len_datafile(
|
||||
)))?;
|
||||
}
|
||||
nreads += 1;
|
||||
let e = t.1.ns;
|
||||
let e = t.1.ns();
|
||||
if e < range.beg {
|
||||
x = e;
|
||||
j = m;
|
||||
@@ -301,7 +303,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than(
|
||||
let evlen = ev.0 as u64;
|
||||
let mut j = headoff;
|
||||
let mut k = ((flen - headoff) / evlen - 1) * evlen + headoff;
|
||||
let x = ev.1.ns;
|
||||
let x = ev.1.ns();
|
||||
let t = read_event_at(k, &mut file).await?;
|
||||
if t.0 != evlen as u32 {
|
||||
Err(Error::with_msg(format!(
|
||||
@@ -309,7 +311,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than(
|
||||
t.0, evlen
|
||||
)))?;
|
||||
}
|
||||
let y = t.1.ns;
|
||||
let y = t.1.ns();
|
||||
let mut nreads = 2;
|
||||
if x >= range.beg {
|
||||
file.seek(SeekFrom::Start(j)).await?;
|
||||
@@ -333,7 +335,7 @@ pub async fn position_static_len_datafile_at_largest_smaller_than(
|
||||
)))?;
|
||||
}
|
||||
nreads += 1;
|
||||
let x = t.1.ns;
|
||||
let x = t.1.ns();
|
||||
if x < range.beg {
|
||||
j = m;
|
||||
} else {
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use err::Error;
|
||||
use futures_util::StreamExt;
|
||||
use netpod::timeunits::MS;
|
||||
use netpod::{ChannelConfig, Nanos, Node};
|
||||
use netpod::ChannelConfig;
|
||||
use netpod::Node;
|
||||
use netpod::TsNano;
|
||||
use std::path::PathBuf;
|
||||
|
||||
// TODO remove/replace this
|
||||
@@ -19,7 +21,7 @@ pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, split: u32, node:
|
||||
.join(config.channel.name.clone())
|
||||
.join(format!("{:019}", timebin))
|
||||
.join(format!("{:010}", split))
|
||||
.join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS))
|
||||
.join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS))
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -76,7 +78,7 @@ pub async fn datapaths_for_timebin(
|
||||
.join(config.channel.name.clone())
|
||||
.join(format!("{:019}", timebin))
|
||||
.join(format!("{:010}", split))
|
||||
.join(format!("{:019}_00000_Data", config.time_bin_size.ns / MS));
|
||||
.join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS));
|
||||
ret.push(path);
|
||||
}
|
||||
Ok(ret)
|
||||
@@ -92,21 +94,21 @@ pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) ->
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn data_dir_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
|
||||
pub fn data_dir_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
|
||||
let ret = channel_timebins_dir_path(channel_config, node)?
|
||||
.join(format!("{:019}", ts.ns / channel_config.time_bin_size.ns))
|
||||
.join(format!("{:019}", ts.ns() / channel_config.time_bin_size.ns()))
|
||||
.join(format!("{:010}", split));
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn data_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
|
||||
let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns / MS, 0);
|
||||
pub fn data_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
|
||||
let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns() / MS, 0);
|
||||
let ret = data_dir_path(ts, channel_config, split, node)?.join(fname);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn index_path(ts: Nanos, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
|
||||
let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns / MS, 0);
|
||||
pub fn index_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
|
||||
let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns() / MS, 0);
|
||||
let ret = data_dir_path(ts, channel_config, split, node)?.join(fname);
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -10,11 +10,11 @@ use items_2::channelevents::ChannelEvents;
|
||||
use items_2::eventfull::EventFull;
|
||||
use netpod::log::*;
|
||||
use netpod::query::PlainEventsQuery;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
use netpod::AggKind;
|
||||
use netpod::ByteSize;
|
||||
use netpod::Channel;
|
||||
use netpod::DiskIoTune;
|
||||
use netpod::NanoRange;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::ScalarType;
|
||||
use netpod::Shape;
|
||||
@@ -92,7 +92,7 @@ pub async fn make_event_pipe(
|
||||
let channel_config = netpod::ChannelConfig {
|
||||
channel: evq.channel().clone(),
|
||||
keyspace: entry.ks as u8,
|
||||
time_bin_size: entry.bs,
|
||||
time_bin_size: entry.bs.clone(),
|
||||
shape,
|
||||
scalar_type: entry.scalar_type.clone(),
|
||||
byte_order: entry.byte_order.clone(),
|
||||
@@ -172,8 +172,8 @@ pub fn make_local_event_blobs_stream(
|
||||
let channel_config = netpod::ChannelConfig {
|
||||
channel,
|
||||
keyspace: entry.ks as u8,
|
||||
time_bin_size: entry.bs,
|
||||
shape: shape,
|
||||
time_bin_size: entry.bs.clone(),
|
||||
shape,
|
||||
scalar_type: entry.scalar_type.clone(),
|
||||
byte_order: entry.byte_order.clone(),
|
||||
array: entry.is_array,
|
||||
@@ -218,7 +218,7 @@ pub fn make_remote_event_blobs_stream(
|
||||
let channel_config = netpod::ChannelConfig {
|
||||
channel,
|
||||
keyspace: entry.ks as u8,
|
||||
time_bin_size: entry.bs,
|
||||
time_bin_size: entry.bs.clone(),
|
||||
shape: shape,
|
||||
scalar_type: entry.scalar_type.clone(),
|
||||
byte_order: entry.byte_order.clone(),
|
||||
|
||||
Reference in New Issue
Block a user