WIP checks

Dominik Werder
2023-06-17 23:07:59 +02:00
parent 3cd74601a4
commit 44dd43240b
24 changed files with 492 additions and 368 deletions

View File

@@ -11,6 +11,7 @@ use netpod::ByteSize;
use netpod::DiskIoTune;
use netpod::Node;
use netpod::ScalarType;
use netpod::SfChFetchInfo;
use netpod::SfDatabuffer;
use netpod::SfDbChannel;
use netpod::Shape;
@@ -61,6 +62,15 @@ async fn agg_x_dim_0_inner() {
tb_file_count: 1,
buffer_size: 1024 * 4,
};
let fetch_info = SfChFetchInfo::new(
"sf-databuffer",
"S10BC01-DBAM070:EOM1_T1",
2,
TsNano(DAY),
ByteOrder::Big,
ScalarType::F64,
Shape::Scalar,
);
let _bin_count = 20;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0;
let ts2 = ts1 + HOUR * 24;
@@ -71,7 +81,7 @@ async fn agg_x_dim_0_inner() {
disk_io_tune.read_buffer_len = query.buffer_size as usize;
let fut1 = EventChunkerMultifile::new(
range.clone(),
query.channel_config.clone(),
fetch_info,
node.clone(),
0,
disk_io_tune,
@@ -114,6 +124,15 @@ async fn agg_x_dim_1_inner() {
tb_file_count: 1,
buffer_size: 17,
};
let fetch_info = SfChFetchInfo::new(
"ks",
"wave1",
2,
TsNano(DAY),
ByteOrder::Big,
ScalarType::F64,
Shape::Scalar,
);
let _bin_count = 10;
let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0;
let ts2 = ts1 + HOUR * 24;
@@ -124,7 +143,7 @@ async fn agg_x_dim_1_inner() {
disk_io_tune.read_buffer_len = query.buffer_size as usize;
let fut1 = super::eventblobs::EventChunkerMultifile::new(
range.clone(),
query.channel_config.clone(),
fetch_info,
node.clone(),
0,
disk_io_tune,
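
For orientation on the refactor: the tests above construct the new type with SfChFetchInfo::new(backend, channel name, keyspace, time-bin size, byte order, scalar type, shape), and later hunks read those fields back through accessors (ks(), name(), bs(), shape(), scalar_type()). A minimal compiling sketch of that surface, inferred from the call sites in this commit rather than from the actual netpod definition — the reference-returning accessors in particular are an assumption, consistent with the derefs introduced further down:

use netpod::ByteOrder;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;

// Sketch only: field set and accessor signatures inferred from this
// commit's call sites; the real definition lives in netpod.
#[derive(Clone, Debug)]
pub struct SfChFetchInfo {
    backend: String,
    name: String,
    ks: u8,
    bs: TsNano,
    byte_order: ByteOrder,
    scalar_type: ScalarType,
    shape: Shape,
}

impl SfChFetchInfo {
    pub fn new(
        backend: &str,
        name: &str,
        ks: u8,
        bs: TsNano,
        byte_order: ByteOrder,
        scalar_type: ScalarType,
        shape: Shape,
    ) -> Self {
        Self {
            backend: backend.into(),
            name: name.into(),
            ks,
            bs,
            byte_order,
            scalar_type,
            shape,
        }
    }

    // Accessors as used by the open_files and paths helpers below.
    pub fn ks(&self) -> u8 { self.ks }
    pub fn name(&self) -> &str { &self.name }
    pub fn bs(&self) -> &TsNano { &self.bs }
    pub fn shape(&self) -> &Shape { &self.shape }
    pub fn scalar_type(&self) -> &ScalarType { &self.scalar_type }
}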

View File

@@ -7,6 +7,7 @@ use futures_util::StreamExt;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::Node;
use netpod::SfChFetchInfo;
use netpod::TsNano;
use std::fmt;
use std::path::PathBuf;
@@ -209,18 +210,18 @@ impl fmt::Debug for OpenedFile {
pub fn open_files(
range: &NanoRange,
channel_config: &SfDbChConf,
fetch_info: &SfChFetchInfo,
node: Node,
) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
let range = range.clone();
let channel_config = channel_config.clone();
let fetch_info = fetch_info.clone();
tokio::spawn(async move {
match open_files_inner(&chtx, &range, &channel_config, node).await {
match open_files_inner(&chtx, &range, &fetch_info, node).await {
Ok(_) => {}
Err(e) => {
let e = e.add_public_msg(format!(
"Can not open file for channel: {channel_config:?} range: {range:?}"
"Can not open file for channel: {fetch_info:?} range: {range:?}"
));
match chtx.send(Err(e.into())).await {
Ok(_) => {}
@@ -238,24 +239,24 @@ pub fn open_files(
async fn open_files_inner(
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &SfDbChConf,
fetch_info: &SfChFetchInfo,
node: Node,
) -> Result<(), Error> {
let channel_config = channel_config.clone();
let timebins = get_timebins(&channel_config, node.clone()).await?;
let fetch_info = fetch_info.clone();
let timebins = get_timebins(&fetch_info, node.clone()).await?;
if timebins.len() == 0 {
return Ok(());
}
for &tb in &timebins {
let ts_bin = TsNano(tb * channel_config.time_bin_size.0);
let ts_bin = TsNano(tb * fetch_info.bs().ns());
if ts_bin.ns() >= range.end {
continue;
}
if ts_bin.ns() + channel_config.time_bin_size.ns() <= range.beg {
if ts_bin.ns() + fetch_info.bs().ns() <= range.beg {
continue;
}
let mut a = Vec::new();
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? {
let w = position_file(&path, range, false, false).await?;
if w.found {
a.push(w.file);
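
The two continue guards in the loop above form a half-open overlap test: bin number tb covers [tb*bs, (tb+1)*bs) and is kept exactly when that interval intersects [range.beg, range.end). The same predicate as a standalone sketch (hypothetical helper, all arguments in nanoseconds):

/// True iff time bin `tb` of size `bs_ns` overlaps the half-open
/// query range [beg_ns, end_ns). Mirrors the two `continue` guards
/// in open_files_inner.
fn timebin_overlaps(tb: u64, bs_ns: u64, beg_ns: u64, end_ns: u64) -> bool {
    let bin_beg = tb * bs_ns;
    let bin_end = bin_beg + bs_ns;
    bin_beg < end_ns && bin_end > beg_ns
}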
@@ -278,14 +279,14 @@ Expanded to one event before and after the requested range, if such events exist.
*/
pub fn open_expanded_files(
range: &NanoRange,
channel_config: &SfDbChConf,
fetch_info: &SfChFetchInfo,
node: Node,
) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
let range = range.clone();
let channel_config = channel_config.clone();
let fetch_info = fetch_info.clone();
tokio::spawn(async move {
match open_expanded_files_inner(&chtx, &range, &channel_config, node).await {
match open_expanded_files_inner(&chtx, &range, &fetch_info, node).await {
Ok(_) => {}
Err(e) => match chtx.send(Err(e.into())).await {
Ok(_) => {}
@@ -299,9 +300,9 @@ pub fn open_expanded_files(
chrx
}
async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result<Vec<u64>, Error> {
async fn get_timebins(fetch_info: &SfChFetchInfo, node: Node) -> Result<Vec<u64>, Error> {
let mut timebins = Vec::new();
let p0 = paths::channel_timebins_dir_path(&channel_config, &node)?;
let p0 = paths::channel_timebins_dir_path(&fetch_info, &node)?;
match tokio::fs::read_dir(&p0).await {
Ok(rd) => {
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
@@ -323,10 +324,7 @@ async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result<Vec<u64
Ok(timebins)
}
Err(e) => {
debug!(
"get_timebins no timebins for {:?} {:?} p0 {:?}",
channel_config, e, p0
);
debug!("get_timebins no timebins for {:?} {:?} p0 {:?}", fetch_info, e, p0);
Ok(Vec::new())
}
}
@@ -335,17 +333,17 @@ async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result<Vec<u64
async fn open_expanded_files_inner(
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &SfDbChConf,
fetch_info: &SfChFetchInfo,
node: Node,
) -> Result<(), Error> {
let channel_config = channel_config.clone();
let timebins = get_timebins(&channel_config, node.clone()).await?;
let fetch_info = fetch_info.clone();
let timebins = get_timebins(&fetch_info, node.clone()).await?;
if timebins.len() == 0 {
return Ok(());
}
let mut p1 = None;
for (i1, tb) in timebins.iter().enumerate().rev() {
let ts_bin = TsNano(tb * channel_config.time_bin_size.ns());
let ts_bin = TsNano(tb * fetch_info.bs().ns());
if ts_bin.ns() <= range.beg {
p1 = Some(i1);
break;
@@ -354,15 +352,15 @@ async fn open_expanded_files_inner(
let mut p1 = if let Some(i1) = p1 { i1 } else { 0 };
if p1 >= timebins.len() {
return Err(Error::with_msg(format!(
"logic error p1 {} range {:?} channel_config {:?}",
p1, range, channel_config
"logic error p1 {} range {:?} fetch_info {:?}",
p1, range, fetch_info
)));
}
let mut found_pre = false;
loop {
let tb = timebins[p1];
let mut a = Vec::new();
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? {
let w = position_file(&path, range, true, false).await?;
if w.found {
debug!("----- open_expanded_files_inner w.found for {:?}", path);
@@ -390,7 +388,7 @@ async fn open_expanded_files_inner(
while p1 < timebins.len() {
let tb = timebins[p1];
let mut a = Vec::new();
for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? {
for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? {
let w = position_file(&path, range, false, true).await?;
if w.found {
a.push(w.file);
@@ -404,7 +402,7 @@ async fn open_expanded_files_inner(
// TODO emit stats for this or log somewhere?
debug!("Could not find an event before the requested range, falling back to the standard file list.");
// Try to locate files according to non-expand-algorithm.
open_files_inner(chtx, range, &channel_config, node).await?;
open_files_inner(chtx, range, &fetch_info, node).await?;
}
Ok(())
}
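
The reverse scan above (the loop that assigns p1) picks the last bin that starts at or before range.beg, so the opened set can contain one event before the requested range. Condensed into a standalone sketch (hypothetical helper, same selection logic):

/// Index of the first time bin to visit for an expanded read: the last
/// bin whose start lies at or before `beg_ns`, else the first bin.
/// Condenses the reverse scan in open_expanded_files_inner.
fn first_expanded_bin(timebins: &[u64], bs_ns: u64, beg_ns: u64) -> usize {
    timebins
        .iter()
        .rposition(|&tb| tb * bs_ns <= beg_ns)
        .unwrap_or(0)
}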
@@ -823,20 +821,21 @@ mod test {
};
let chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be");
// TODO read config from disk? Or expose the config from the data generator?
let channel_config = SfDbChConf {
channel: chn,
keyspace: 2,
time_bin_size: TsNano(DAY),
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::Big,
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let fetch_info = todo!();
// let fetch_info = SfChFetchInfo {
// channel: chn,
// keyspace: 2,
// time_bin_size: TsNano(DAY),
// scalar_type: netpod::ScalarType::I32,
// byte_order: netpod::ByteOrder::Big,
// shape: netpod::Shape::Scalar,
// array: false,
// compression: false,
// };
let cluster = netpod::test_cluster();
let task = async move {
let mut paths = Vec::new();
let mut files = open_expanded_files(&range, &channel_config, cluster.nodes[0].clone());
let mut files = open_expanded_files(&range, &fetch_info, cluster.nodes[0].clone());
while let Some(file) = files.next().await {
match file {
Ok(k) => {

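Given the SfChFetchInfo::new call sites in the aggregation tests of the first diff, the todo!() placeholder could plausibly be filled as below. This is an assumption, not part of the commit: the argument order is copied from those call sites, the values from the commented-out literal, and the old array/compression flags have no slot in the constructor as used above, so they are presumably derived elsewhere.

// Hypothetical fill for the todo!() above; argument order assumed to
// match the SfChFetchInfo::new calls in the aggregation tests.
let fetch_info = SfChFetchInfo::new(
    BACKEND,
    "scalar-i32-be",
    2,
    TsNano(DAY),
    netpod::ByteOrder::Big,
    netpod::ScalarType::I32,
    netpod::Shape::Scalar,
);
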
View File

@@ -3,7 +3,6 @@ use crate::dataopen::open_files;
use crate::dataopen::OpenedFileSet;
use crate::eventchunker::EventChunker;
use crate::eventchunker::EventChunkerConf;
use crate::SfDbChConf;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -19,6 +18,7 @@ use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::DiskIoTune;
use netpod::Node;
use netpod::SfChFetchInfo;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
@@ -30,7 +30,7 @@ pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
impl<T> InputTraits for T where T: Stream<Item = Sitemty<EventFull>> {}
pub struct EventChunkerMultifile {
channel_config: SfDbChConf,
fetch_info: SfChFetchInfo,
file_chan: async_channel::Receiver<Result<OpenedFileSet, Error>>,
evs: Option<Pin<Box<dyn InputTraits + Send>>>,
disk_io_tune: DiskIoTune,
@@ -58,7 +58,7 @@ impl EventChunkerMultifile {
pub fn new(
range: NanoRange,
channel_config: SfDbChConf,
fetch_info: SfChFetchInfo,
node: Node,
node_ix: usize,
disk_io_tune: DiskIoTune,
@@ -69,16 +69,16 @@ impl EventChunkerMultifile {
) -> Self {
info!("EventChunkerMultifile expand {expand} do_decompress {do_decompress}");
let file_chan = if expand {
open_expanded_files(&range, &channel_config, node)
open_expanded_files(&range, &fetch_info, node)
} else {
open_files(&range, &channel_config, node)
open_files(&range, &fetch_info, node)
};
Self {
file_chan,
evs: None,
disk_io_tune,
event_chunker_conf,
channel_config,
fetch_info,
range,
files_count: 0,
node_ix,
@@ -196,7 +196,7 @@ impl Stream for EventChunkerMultifile {
));
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.fetch_info.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
path.clone(),
@@ -231,7 +231,7 @@ impl Stream for EventChunkerMultifile {
);
let chunker = EventChunker::from_event_boundary(
inp,
self.channel_config.clone(),
self.fetch_info.clone(),
self.range.clone(),
self.event_chunker_conf.clone(),
of.path.clone(),

View File

@@ -1,4 +1,3 @@
use crate::SfDbChConf;
use bitshuffle::bitshuffle_decompress;
use bytes::Buf;
use bytes::BytesMut;
@@ -18,6 +17,7 @@ use netpod::timeunits::SEC;
use netpod::ByteSize;
use netpod::EventDataReadStats;
use netpod::ScalarType;
use netpod::SfChFetchInfo;
use netpod::Shape;
use parse::channelconfig::CompressionMethod;
use std::path::PathBuf;
@@ -33,7 +33,7 @@ pub struct EventChunker {
inp: NeedMinBuffer,
state: DataFileState,
need_min: u32,
channel_config: SfDbChConf,
fetch_info: SfChFetchInfo,
errored: bool,
completed: bool,
range: NanoRange,
@@ -94,7 +94,7 @@ impl EventChunker {
// TODO `expand` flag usage
pub fn from_start(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: SfDbChConf,
fetch_info: SfChFetchInfo,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
@@ -108,7 +108,7 @@ impl EventChunker {
inp,
state: DataFileState::FileHeader,
need_min: 6,
channel_config,
fetch_info,
errored: false,
completed: false,
range,
@@ -135,7 +135,7 @@ impl EventChunker {
// TODO `expand` flag usage
pub fn from_event_boundary(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: SfDbChConf,
fetch_info: SfChFetchInfo,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
@@ -146,7 +146,7 @@ impl EventChunker {
"EventChunker::{} do_decompress {}",
"from_event_boundary", do_decompress
);
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress);
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand, do_decompress);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
@@ -223,7 +223,7 @@ impl EventChunker {
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.fetch_info.shape(),
self.dbg_path
);
warn!("{}", msg);
@@ -239,7 +239,7 @@ impl EventChunker {
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.fetch_info.shape(),
self.dbg_path
);
warn!("{}", msg);
@@ -269,7 +269,7 @@ impl EventChunker {
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.channel_config.shape,
self.fetch_info.shape(),
self.dbg_path
);
warn!("{}", msg);
@@ -300,9 +300,9 @@ impl EventChunker {
let is_array = type_flags & ARRAY != 0;
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
if let Shape::Wave(_) = self.channel_config.shape {
if let Shape::Wave(_) = self.fetch_info.shape() {
if !is_array {
Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?;
Err(Error::with_msg(format!("dim1 but not array {:?}", self.fetch_info)))?;
}
}
let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
@@ -342,7 +342,7 @@ impl EventChunker {
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
//debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
match self.channel_config.shape {
match self.fetch_info.shape() {
Shape::Scalar => {
assert!(value_bytes < 1024 * 1);
}
@@ -357,19 +357,19 @@ impl EventChunker {
let type_size = scalar_type.bytes() as u32;
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
let config_matches = match self.channel_config.shape {
let config_matches = match self.fetch_info.shape() {
Shape::Scalar => {
if is_array {
if false {
error!(
"channel config mismatch {:?} {:?} {:?} {:?}",
self.channel_config, is_array, ele_count, self.dbg_path,
self.fetch_info, is_array, ele_count, self.dbg_path,
);
}
if false {
return Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but we find event is_array",
self.channel_config,
self.fetch_info,
)));
}
false
@@ -378,17 +378,17 @@ impl EventChunker {
}
}
Shape::Wave(dim1count) => {
if dim1count != ele_count as u32 {
if *dim1count != ele_count as u32 {
if false {
error!(
"channel config mismatch {:?} {:?} {:?} {:?}",
self.channel_config, is_array, ele_count, self.dbg_path,
self.fetch_info, is_array, ele_count, self.dbg_path,
);
}
if false {
return Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config, ele_count,
self.fetch_info, ele_count,
)));
}
false
@@ -397,18 +397,18 @@ impl EventChunker {
}
}
Shape::Image(n1, n2) => {
let nt = n1 as usize * n2 as usize;
let nt = (*n1 as usize) * (*n2 as usize);
if nt != ele_count as usize {
if false {
error!(
"channel config mismatch {:?} {:?} {:?} {:?}",
self.channel_config, is_array, ele_count, self.dbg_path,
self.fetch_info, is_array, ele_count, self.dbg_path,
);
}
if false {
return Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config, ele_count,
self.fetch_info, ele_count,
)));
}
false
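
The new explicit derefs in the Wave and Image arms (*dim1count, *n1, *n2) are consistent with shape() returning a reference: matching on a &Shape makes the pattern bindings references as well. In miniature (assuming shape() hands back &Shape):

use netpod::Shape;

// Assumes a &Shape is matched, as the deref changes above suggest:
// bindings like `dim1count`, `n1`, `n2` are then &u32 and need derefs.
fn element_count(shape: &Shape) -> u64 {
    match shape {
        Shape::Scalar => 1,
        Shape::Wave(dim1count) => *dim1count as u64,
        Shape::Image(n1, n2) => (*n1 as u64) * (*n2 as u64),
    }
}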
@@ -552,7 +552,7 @@ impl Stream for EventChunker {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
match self.channel_config.shape {
match self.fetch_info.shape() {
Shape::Scalar => {
if self.need_min > 1024 * 8 {
let msg =

View File

@@ -6,6 +6,7 @@ use items_0::streamitem::Sitemty;
use items_2::eventfull::EventFull;
use items_2::merger::Merger;
use netpod::log::*;
use netpod::ChannelTypeConfigGen;
use netpod::Cluster;
use netpod::PerfOpts;
use query::api4::events::PlainEventsQuery;
@@ -27,11 +28,12 @@ pub struct MergedBlobsFromRemotes {
}
impl MergedBlobsFromRemotes {
pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self {
pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, ch_conf: ChannelTypeConfigGen, cluster: Cluster) -> Self {
debug!("MergedBlobsFromRemotes evq {:?}", evq);
let mut tcp_establish_futs = Vec::new();
for node in &cluster.nodes {
let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone());
let f =
x_processed_event_blobs_stream_from_node(evq.clone(), ch_conf.clone(), perf_opts.clone(), node.clone());
let f: T002<EventFull> = Box::pin(f);
tcp_establish_futs.push(f);
}
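
Callers must now thread the resolved channel type configuration through explicitly, so every per-node stream is created from the same channel info. A minimal usage sketch of the changed signature (evq, perf_opts, ch_conf, and cluster assumed already in scope):

// Sketch: ch_conf (a ChannelTypeConfigGen) is now passed down to each
// per-node event-blob stream rather than resolved on the remote side.
let merged = MergedBlobsFromRemotes::new(evq, perf_opts, ch_conf, cluster);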

View File

@@ -3,6 +3,7 @@ use err::Error;
use futures_util::StreamExt;
use netpod::timeunits::MS;
use netpod::Node;
use netpod::SfChFetchInfo;
use netpod::TsNano;
use std::path::PathBuf;
@@ -30,13 +31,17 @@ Return potential datafile paths for the given timebin.
It says "potential datafile paths" because we don't open the file here yet and of course,
files may vanish until then. Also, the timebin may actually not exist.
*/
pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Node) -> Result<Vec<PathBuf>, Error> {
pub async fn datapaths_for_timebin(
timebin: u64,
fetch_info: &SfChFetchInfo,
node: &Node,
) -> Result<Vec<PathBuf>, Error> {
let sfc = node.sf_databuffer.as_ref().unwrap();
let timebin_path = sfc
.data_base_path
.join(format!("{}_{}", sfc.ksprefix, config.keyspace))
.join(format!("{}_{}", sfc.ksprefix, fetch_info.ks()))
.join("byTime")
.join(config.channel.name())
.join(fetch_info.name())
.join(format!("{:019}", timebin));
let rd = tokio::fs::read_dir(timebin_path).await?;
let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd);
@@ -69,43 +74,43 @@ pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Nod
for split in splits {
let path = sfc
.data_base_path
.join(format!("{}_{}", sfc.ksprefix, config.keyspace))
.join(format!("{}_{}", sfc.ksprefix, fetch_info.ks()))
.join("byTime")
.join(config.channel.name())
.join(fetch_info.name())
.join(format!("{:019}", timebin))
.join(format!("{:010}", split))
.join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS));
.join(format!("{:019}_00000_Data", fetch_info.bs().ns() / MS));
ret.push(path);
}
Ok(ret)
}
pub fn channel_timebins_dir_path(channel_config: &SfDbChConf, node: &Node) -> Result<PathBuf, Error> {
pub fn channel_timebins_dir_path(fetch_info: &SfChFetchInfo, node: &Node) -> Result<PathBuf, Error> {
let sfc = node.sf_databuffer.as_ref().unwrap();
let ret = sfc
.data_base_path
.join(format!("{}_{}", sfc.ksprefix, channel_config.keyspace))
.join(format!("{}_{}", sfc.ksprefix, fetch_info.ks()))
.join("byTime")
.join(channel_config.channel.name());
.join(fetch_info.name());
Ok(ret)
}
pub fn data_dir_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result<PathBuf, Error> {
let ret = channel_timebins_dir_path(channel_config, node)?
.join(format!("{:019}", ts.ns() / channel_config.time_bin_size.ns()))
pub fn data_dir_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Node) -> Result<PathBuf, Error> {
let ret = channel_timebins_dir_path(fetch_info, node)?
.join(format!("{:019}", ts.ns() / fetch_info.bs().ns()))
.join(format!("{:010}", split));
Ok(ret)
}
pub fn data_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result<PathBuf, Error> {
let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns() / MS, 0);
let ret = data_dir_path(ts, channel_config, split, node)?.join(fname);
pub fn data_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Node) -> Result<PathBuf, Error> {
let fname = format!("{:019}_{:05}_Data", fetch_info.bs().ns() / MS, 0);
let ret = data_dir_path(ts, fetch_info, split, node)?.join(fname);
Ok(ret)
}
pub fn index_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result<PathBuf, Error> {
let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns() / MS, 0);
let ret = data_dir_path(ts, channel_config, split, node)?.join(fname);
pub fn index_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Node) -> Result<PathBuf, Error> {
let fname = format!("{:019}_{:05}_Data_Index", fetch_info.bs().ns() / MS, 0);
let ret = data_dir_path(ts, fetch_info, split, node)?.join(fname);
Ok(ret)
}
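
Put together, these helpers assemble paths of the form data_base_path/{ksprefix}_{ks}/byTime/{channel}/{timebin:019}/{split:010}/{bs_ms:019}_00000_Data. With hypothetical values (ksprefix "daq", keyspace 2, channel "S10BC01-DBAM070:EOM1_T1", timebin 18000, split 0, one-day bins = 86400000 ms), a data file path would be:

<data_base_path>/daq_2/byTime/S10BC01-DBAM070:EOM1_T1/0000000000000018000/0000000000/0000000000086400000_00000_Data

with the corresponding index file carrying the _Data_Index suffix instead of _Data.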

View File

@@ -1,9 +1,7 @@
use crate::channelconfig::config_entry_best_match;
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use crate::raw::generated::EventBlobsGeneratorI32Test00;
use crate::raw::generated::EventBlobsGeneratorI32Test01;
use crate::SfDbChConf;
use err::Error;
use futures_util::stream;
use futures_util::Stream;
@@ -17,23 +15,21 @@ use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::AggKind;
use netpod::ByteSize;
use netpod::ChConf;
use netpod::DiskIoTune;
use netpod::NodeConfigCached;
use netpod::SfDbChannel;
use parse::channelconfig::ConfigEntry;
use netpod::SfChFetchInfo;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
const TEST_BACKEND: &str = "testbackend-00";
fn make_num_pipeline_stream_evs(
chconf: ChConf,
fetch_info: SfChFetchInfo,
agg_kind: AggKind,
event_blobs: EventChunkerMultifile,
) -> Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> {
let scalar_type = chconf.scalar_type.clone();
let shape = chconf.shape.clone();
let scalar_type = fetch_info.scalar_type().clone();
let shape = fetch_info.shape().clone();
let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) {
Ok(k) => k,
Err(e) => {
@@ -58,30 +54,11 @@ fn make_num_pipeline_stream_evs(
pub async fn make_event_pipe(
evq: &PlainEventsQuery,
chconf: ChConf,
fetch_info: SfChFetchInfo,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
// sf-databuffer type backends identify channels by their (backend, name) only.
let channel = evq.channel().clone();
let range = evq.range().clone();
let x = crate::channelconfig::channel_config_best_match(evq.range().try_into()?, channel, ncc).await;
let channel_config = match x {
Ok(Some(x)) => x,
Ok(None) => {
error!("make_event_pipe can not find config");
return Err(Error::with_msg_no_trace("make_event_pipe can not find config"));
}
Err(e) => {
error!("make_event_pipe can not find config");
if e.msg().contains("ErrorKind::NotFound") {
warn!("{e}");
let s = futures_util::stream::empty();
return Ok(Box::pin(s));
} else {
return Err(e);
}
}
};
info!(
"make_event_pipe need_expand {need_expand} {evq:?}",
need_expand = evq.one_before_range()
@@ -96,7 +73,7 @@ pub async fn make_event_pipe(
};
let event_blobs = EventChunkerMultifile::new(
(&range).try_into()?,
channel_config.clone(),
fetch_info.clone(),
ncc.node.clone(),
ncc.ix,
DiskIoTune::default(),
@@ -106,38 +83,25 @@ pub async fn make_event_pipe(
out_max_len,
);
error!("TODO replace AggKind in the called code");
let pipe = make_num_pipeline_stream_evs(chconf, AggKind::TimeWeightedScalar, event_blobs);
let pipe = make_num_pipeline_stream_evs(fetch_info, AggKind::TimeWeightedScalar, event_blobs);
Ok(pipe)
}
pub fn make_local_event_blobs_stream(
range: NanoRange,
channel: SfDbChannel,
entry: &ConfigEntry,
fetch_info: &SfChFetchInfo,
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
disk_io_tune: DiskIoTune,
node_config: &NodeConfigCached,
) -> Result<EventChunkerMultifile, Error> {
info!("make_local_event_blobs_stream do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}");
info!(
"make_local_event_blobs_stream {fetch_info:?} do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}"
);
if do_decompress {
warn!("Possible issue: decompress central storage event blob stream");
}
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = SfDbChConf {
channel,
keyspace: entry.ks as u8,
time_bin_size: entry.bs.clone(),
shape,
scalar_type: entry.scalar_type.clone(),
byte_order: entry.byte_order.clone(),
array: entry.is_array,
compression: entry.is_compressed,
};
// TODO should not need this for correctness.
// Should limit based on return size and latency.
let out_max_len = if node_config.node_config.cluster.is_central_storage {
@@ -147,7 +111,7 @@ pub fn make_local_event_blobs_stream(
};
let event_blobs = EventChunkerMultifile::new(
range,
channel_config.clone(),
fetch_info.clone(),
node_config.node.clone(),
node_config.ix,
disk_io_tune,
@@ -161,8 +125,7 @@ pub fn make_local_event_blobs_stream(
pub fn make_remote_event_blobs_stream(
range: NanoRange,
channel: SfDbChannel,
entry: &ConfigEntry,
fetch_info: &SfChFetchInfo,
expand: bool,
do_decompress: bool,
event_chunker_conf: EventChunkerConf,
@@ -170,20 +133,6 @@ pub fn make_remote_event_blobs_stream(
node_config: &NodeConfigCached,
) -> Result<impl Stream<Item = Sitemty<EventFull>>, Error> {
debug!("make_remote_event_blobs_stream");
let shape = match entry.to_shape() {
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = SfDbChConf {
channel,
keyspace: entry.ks as u8,
time_bin_size: entry.bs.clone(),
shape: shape,
scalar_type: entry.scalar_type.clone(),
byte_order: entry.byte_order.clone(),
array: entry.is_array,
compression: entry.is_compressed,
};
// TODO should not need this for correctness.
// Should limit based on return size and latency.
let out_max_len = if node_config.node_config.cluster.is_central_storage {
@@ -193,7 +142,7 @@ pub fn make_remote_event_blobs_stream(
};
let event_blobs = EventChunkerMultifile::new(
range,
channel_config.clone(),
fetch_info.clone(),
node_config.node.clone(),
node_config.ix,
disk_io_tune,
@@ -207,6 +156,7 @@ pub fn make_remote_event_blobs_stream(
pub async fn make_event_blobs_pipe_real(
evq: &PlainEventsQuery,
fetch_info: &SfChFetchInfo,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
if false {
@@ -217,34 +167,13 @@ pub async fn make_event_blobs_pipe_real(
}
let expand = evq.one_before_range();
let range = evq.range();
let entry = match config_entry_best_match(&evq.range().try_into()?, evq.channel().clone(), node_config).await {
Ok(Some(x)) => x,
Ok(None) => {
let e = Error::with_msg_no_trace("no config entry found");
error!("{e}");
return Err(e);
}
Err(e) => {
if e.to_public_error().msg().contains("no config entry found") {
let item = items_0::streamitem::LogItem {
node_ix: node_config.ix as _,
level: Level::WARN,
msg: format!("{} {}", node_config.node.host, e),
};
return Ok(Box::pin(stream::iter([Ok(StreamItem::Log(item))])));
} else {
return Err(e);
}
}
};
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
// TODO should depend on host config
let do_local = node_config.node_config.cluster.is_central_storage;
let pipe = if do_local {
let event_blobs = make_local_event_blobs_stream(
range.try_into()?,
evq.channel().clone(),
&entry,
fetch_info,
expand,
false,
event_chunker_conf,
@@ -255,8 +184,7 @@ pub async fn make_event_blobs_pipe_real(
} else {
let event_blobs = make_remote_event_blobs_stream(
range.try_into()?,
evq.channel().clone(),
&entry,
fetch_info,
expand,
true,
event_chunker_conf,
@@ -320,12 +248,13 @@ pub async fn make_event_blobs_pipe_test(
pub async fn make_event_blobs_pipe(
evq: &PlainEventsQuery,
fetch_info: &SfChFetchInfo,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
debug!("make_event_blobs_pipe {evq:?}");
if evq.channel().backend() == TEST_BACKEND {
make_event_blobs_pipe_test(evq, node_config).await
} else {
make_event_blobs_pipe_real(evq, node_config).await
make_event_blobs_pipe_real(evq, fetch_info, node_config).await
}
}
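
Finally, a consumer-side sketch: after this commit, callers of make_event_blobs_pipe resolve the SfChFetchInfo up front (the config lookup removed from make_event_blobs_pipe_real above) and then drain the returned stream. Hypothetical usage, assuming evq, fetch_info, and node_config are in scope, the enclosing async fn returns Result<_, Error>, and futures_util::StreamExt is imported:

// Sketch: resolve the fetch info first, then consume the blob stream.
let mut blobs = make_event_blobs_pipe(&evq, &fetch_info, &node_config).await?;
while let Some(item) = blobs.next().await {
    match item {
        // Each Ok item is a stream item: log, stats, or event-full data.
        Ok(_k) => {}
        Err(e) => return Err(e),
    }
}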