diff --git a/disk/src/aggtest.rs b/disk/src/aggtest.rs index 35bff8e..dd359ce 100644 --- a/disk/src/aggtest.rs +++ b/disk/src/aggtest.rs @@ -11,6 +11,7 @@ use netpod::ByteSize; use netpod::DiskIoTune; use netpod::Node; use netpod::ScalarType; +use netpod::SfChFetchInfo; use netpod::SfDatabuffer; use netpod::SfDbChannel; use netpod::Shape; @@ -61,6 +62,15 @@ async fn agg_x_dim_0_inner() { tb_file_count: 1, buffer_size: 1024 * 4, }; + let fetch_info = SfChFetchInfo::new( + "sf-databuffer", + "S10BC01-DBAM070:EOM1_T1", + 2, + TsNano(DAY), + ByteOrder::Big, + ScalarType::F64, + Shape::Scalar, + ); let _bin_count = 20; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0; let ts2 = ts1 + HOUR * 24; @@ -71,7 +81,7 @@ async fn agg_x_dim_0_inner() { disk_io_tune.read_buffer_len = query.buffer_size as usize; let fut1 = EventChunkerMultifile::new( range.clone(), - query.channel_config.clone(), + fetch_info, node.clone(), 0, disk_io_tune, @@ -114,6 +124,15 @@ async fn agg_x_dim_1_inner() { tb_file_count: 1, buffer_size: 17, }; + let fetch_info = SfChFetchInfo::new( + "ks", + "wave1", + 2, + TsNano(DAY), + ByteOrder::Big, + ScalarType::F64, + Shape::Scalar, + ); let _bin_count = 10; let ts1 = query.timebin as u64 * query.channel_config.time_bin_size.0; let ts2 = ts1 + HOUR * 24; @@ -124,7 +143,7 @@ async fn agg_x_dim_1_inner() { disk_io_tune.read_buffer_len = query.buffer_size as usize; let fut1 = super::eventblobs::EventChunkerMultifile::new( range.clone(), - query.channel_config.clone(), + fetch_info, node.clone(), 0, disk_io_tune, diff --git a/disk/src/dataopen.rs b/disk/src/dataopen.rs index fba34b7..778762c 100644 --- a/disk/src/dataopen.rs +++ b/disk/src/dataopen.rs @@ -7,6 +7,7 @@ use futures_util::StreamExt; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::Node; +use netpod::SfChFetchInfo; use netpod::TsNano; use std::fmt; use std::path::PathBuf; @@ -209,18 +210,18 @@ impl fmt::Debug for OpenedFile { pub fn open_files( range: &NanoRange, - channel_config: &SfDbChConf, + fetch_info: &SfChFetchInfo, node: Node, ) -> async_channel::Receiver> { let (chtx, chrx) = async_channel::bounded(2); let range = range.clone(); - let channel_config = channel_config.clone(); + let fetch_info = fetch_info.clone(); tokio::spawn(async move { - match open_files_inner(&chtx, &range, &channel_config, node).await { + match open_files_inner(&chtx, &range, &fetch_info, node).await { Ok(_) => {} Err(e) => { let e = e.add_public_msg(format!( - "Can not open file for channel: {channel_config:?} range: {range:?}" + "Can not open file for channel: {fetch_info:?} range: {range:?}" )); match chtx.send(Err(e.into())).await { Ok(_) => {} @@ -238,24 +239,24 @@ pub fn open_files( async fn open_files_inner( chtx: &async_channel::Sender>, range: &NanoRange, - channel_config: &SfDbChConf, + fetch_info: &SfChFetchInfo, node: Node, ) -> Result<(), Error> { - let channel_config = channel_config.clone(); - let timebins = get_timebins(&channel_config, node.clone()).await?; + let fetch_info = fetch_info.clone(); + let timebins = get_timebins(&fetch_info, node.clone()).await?; if timebins.len() == 0 { return Ok(()); } for &tb in &timebins { - let ts_bin = TsNano(tb * channel_config.time_bin_size.0); + let ts_bin = TsNano(tb * fetch_info.bs().ns()); if ts_bin.ns() >= range.end { continue; } - if ts_bin.ns() + channel_config.time_bin_size.ns() <= range.beg { + if ts_bin.ns() + fetch_info.bs().ns() <= range.beg { continue; } let mut a = Vec::new(); - for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { + for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? { let w = position_file(&path, range, false, false).await?; if w.found { a.push(w.file); @@ -278,14 +279,14 @@ Expanded to one event before and after the requested range, if exists. */ pub fn open_expanded_files( range: &NanoRange, - channel_config: &SfDbChConf, + fetch_info: &SfChFetchInfo, node: Node, ) -> async_channel::Receiver> { let (chtx, chrx) = async_channel::bounded(2); let range = range.clone(); - let channel_config = channel_config.clone(); + let fetch_info = fetch_info.clone(); tokio::spawn(async move { - match open_expanded_files_inner(&chtx, &range, &channel_config, node).await { + match open_expanded_files_inner(&chtx, &range, &fetch_info, node).await { Ok(_) => {} Err(e) => match chtx.send(Err(e.into())).await { Ok(_) => {} @@ -299,9 +300,9 @@ pub fn open_expanded_files( chrx } -async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result, Error> { +async fn get_timebins(fetch_info: &SfChFetchInfo, node: Node) -> Result, Error> { let mut timebins = Vec::new(); - let p0 = paths::channel_timebins_dir_path(&channel_config, &node)?; + let p0 = paths::channel_timebins_dir_path(&fetch_info, &node)?; match tokio::fs::read_dir(&p0).await { Ok(rd) => { let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); @@ -323,10 +324,7 @@ async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result { - debug!( - "get_timebins no timebins for {:?} {:?} p0 {:?}", - channel_config, e, p0 - ); + debug!("get_timebins no timebins for {:?} {:?} p0 {:?}", fetch_info, e, p0); Ok(Vec::new()) } } @@ -335,17 +333,17 @@ async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result>, range: &NanoRange, - channel_config: &SfDbChConf, + fetch_info: &SfChFetchInfo, node: Node, ) -> Result<(), Error> { - let channel_config = channel_config.clone(); - let timebins = get_timebins(&channel_config, node.clone()).await?; + let fetch_info = fetch_info.clone(); + let timebins = get_timebins(&fetch_info, node.clone()).await?; if timebins.len() == 0 { return Ok(()); } let mut p1 = None; for (i1, tb) in timebins.iter().enumerate().rev() { - let ts_bin = TsNano(tb * channel_config.time_bin_size.ns()); + let ts_bin = TsNano(tb * fetch_info.bs().ns()); if ts_bin.ns() <= range.beg { p1 = Some(i1); break; @@ -354,15 +352,15 @@ async fn open_expanded_files_inner( let mut p1 = if let Some(i1) = p1 { i1 } else { 0 }; if p1 >= timebins.len() { return Err(Error::with_msg(format!( - "logic error p1 {} range {:?} channel_config {:?}", - p1, range, channel_config + "logic error p1 {} range {:?} fetch_info {:?}", + p1, range, fetch_info ))); } let mut found_pre = false; loop { let tb = timebins[p1]; let mut a = Vec::new(); - for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { + for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? { let w = position_file(&path, range, true, false).await?; if w.found { debug!("----- open_expanded_files_inner w.found for {:?}", path); @@ -390,7 +388,7 @@ async fn open_expanded_files_inner( while p1 < timebins.len() { let tb = timebins[p1]; let mut a = Vec::new(); - for path in paths::datapaths_for_timebin(tb, &channel_config, &node).await? { + for path in paths::datapaths_for_timebin(tb, &fetch_info, &node).await? { let w = position_file(&path, range, false, true).await?; if w.found { a.push(w.file); @@ -404,7 +402,7 @@ async fn open_expanded_files_inner( // TODO emit statsfor this or log somewhere? debug!("Could not find some event before the requested range, fall back to standard file list."); // Try to locate files according to non-expand-algorithm. - open_files_inner(chtx, range, &channel_config, node).await?; + open_files_inner(chtx, range, &fetch_info, node).await?; } Ok(()) } @@ -823,20 +821,21 @@ mod test { }; let chn = netpod::SfDbChannel::from_name(BACKEND, "scalar-i32-be"); // TODO read config from disk? Or expose the config from data generator? - let channel_config = SfDbChConf { - channel: chn, - keyspace: 2, - time_bin_size: TsNano(DAY), - scalar_type: netpod::ScalarType::I32, - byte_order: netpod::ByteOrder::Big, - shape: netpod::Shape::Scalar, - array: false, - compression: false, - }; + let fetch_info = todo!(); + // let fetch_info = SfChFetchInfo { + // channel: chn, + // keyspace: 2, + // time_bin_size: TsNano(DAY), + // scalar_type: netpod::ScalarType::I32, + // byte_order: netpod::ByteOrder::Big, + // shape: netpod::Shape::Scalar, + // array: false, + // compression: false, + // }; let cluster = netpod::test_cluster(); let task = async move { let mut paths = Vec::new(); - let mut files = open_expanded_files(&range, &channel_config, cluster.nodes[0].clone()); + let mut files = open_expanded_files(&range, &fetch_info, cluster.nodes[0].clone()); while let Some(file) = files.next().await { match file { Ok(k) => { diff --git a/disk/src/eventblobs.rs b/disk/src/eventblobs.rs index dada951..4b6ed4f 100644 --- a/disk/src/eventblobs.rs +++ b/disk/src/eventblobs.rs @@ -3,7 +3,6 @@ use crate::dataopen::open_files; use crate::dataopen::OpenedFileSet; use crate::eventchunker::EventChunker; use crate::eventchunker::EventChunkerConf; -use crate::SfDbChConf; use err::Error; use futures_util::Stream; use futures_util::StreamExt; @@ -19,6 +18,7 @@ use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::DiskIoTune; use netpod::Node; +use netpod::SfChFetchInfo; use std::collections::VecDeque; use std::pin::Pin; use std::task::Context; @@ -30,7 +30,7 @@ pub trait InputTraits: Stream> {} impl InputTraits for T where T: Stream> {} pub struct EventChunkerMultifile { - channel_config: SfDbChConf, + fetch_info: SfChFetchInfo, file_chan: async_channel::Receiver>, evs: Option>>, disk_io_tune: DiskIoTune, @@ -58,7 +58,7 @@ impl EventChunkerMultifile { pub fn new( range: NanoRange, - channel_config: SfDbChConf, + fetch_info: SfChFetchInfo, node: Node, node_ix: usize, disk_io_tune: DiskIoTune, @@ -69,16 +69,16 @@ impl EventChunkerMultifile { ) -> Self { info!("EventChunkerMultifile expand {expand} do_decompress {do_decompress}"); let file_chan = if expand { - open_expanded_files(&range, &channel_config, node) + open_expanded_files(&range, &fetch_info, node) } else { - open_files(&range, &channel_config, node) + open_files(&range, &fetch_info, node) }; Self { file_chan, evs: None, disk_io_tune, event_chunker_conf, - channel_config, + fetch_info, range, files_count: 0, node_ix, @@ -196,7 +196,7 @@ impl Stream for EventChunkerMultifile { )); let chunker = EventChunker::from_event_boundary( inp, - self.channel_config.clone(), + self.fetch_info.clone(), self.range.clone(), self.event_chunker_conf.clone(), path.clone(), @@ -231,7 +231,7 @@ impl Stream for EventChunkerMultifile { ); let chunker = EventChunker::from_event_boundary( inp, - self.channel_config.clone(), + self.fetch_info.clone(), self.range.clone(), self.event_chunker_conf.clone(), of.path.clone(), diff --git a/disk/src/eventchunker.rs b/disk/src/eventchunker.rs index f8a0c6d..1c22e80 100644 --- a/disk/src/eventchunker.rs +++ b/disk/src/eventchunker.rs @@ -1,4 +1,3 @@ -use crate::SfDbChConf; use bitshuffle::bitshuffle_decompress; use bytes::Buf; use bytes::BytesMut; @@ -18,6 +17,7 @@ use netpod::timeunits::SEC; use netpod::ByteSize; use netpod::EventDataReadStats; use netpod::ScalarType; +use netpod::SfChFetchInfo; use netpod::Shape; use parse::channelconfig::CompressionMethod; use std::path::PathBuf; @@ -33,7 +33,7 @@ pub struct EventChunker { inp: NeedMinBuffer, state: DataFileState, need_min: u32, - channel_config: SfDbChConf, + fetch_info: SfChFetchInfo, errored: bool, completed: bool, range: NanoRange, @@ -94,7 +94,7 @@ impl EventChunker { // TODO `expand` flag usage pub fn from_start( inp: Pin> + Send>>, - channel_config: SfDbChConf, + fetch_info: SfChFetchInfo, range: NanoRange, stats_conf: EventChunkerConf, dbg_path: PathBuf, @@ -108,7 +108,7 @@ impl EventChunker { inp, state: DataFileState::FileHeader, need_min: 6, - channel_config, + fetch_info, errored: false, completed: false, range, @@ -135,7 +135,7 @@ impl EventChunker { // TODO `expand` flag usage pub fn from_event_boundary( inp: Pin> + Send>>, - channel_config: SfDbChConf, + fetch_info: SfChFetchInfo, range: NanoRange, stats_conf: EventChunkerConf, dbg_path: PathBuf, @@ -146,7 +146,7 @@ impl EventChunker { "EventChunker::{} do_decompress {}", "from_event_boundary", do_decompress ); - let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress); + let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand, do_decompress); ret.state = DataFileState::Event; ret.need_min = 4; ret.inp.set_need_min(4); @@ -223,7 +223,7 @@ impl EventChunker { ts % SEC, self.max_ts / SEC, self.max_ts % SEC, - self.channel_config.shape, + self.fetch_info.shape(), self.dbg_path ); warn!("{}", msg); @@ -239,7 +239,7 @@ impl EventChunker { ts % SEC, self.max_ts / SEC, self.max_ts % SEC, - self.channel_config.shape, + self.fetch_info.shape(), self.dbg_path ); warn!("{}", msg); @@ -269,7 +269,7 @@ impl EventChunker { self.range.end / SEC, self.range.end % SEC, pulse, - self.channel_config.shape, + self.fetch_info.shape(), self.dbg_path ); warn!("{}", msg); @@ -300,9 +300,9 @@ impl EventChunker { let is_array = type_flags & ARRAY != 0; let is_big_endian = type_flags & BIG_ENDIAN != 0; let is_shaped = type_flags & SHAPE != 0; - if let Shape::Wave(_) = self.channel_config.shape { + if let Shape::Wave(_) = self.fetch_info.shape() { if !is_array { - Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?; + Err(Error::with_msg(format!("dim1 but not array {:?}", self.fetch_info)))?; } } let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 }; @@ -342,7 +342,7 @@ impl EventChunker { let value_bytes = sl.read_u64::().unwrap(); let block_size = sl.read_u32::().unwrap(); //debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size); - match self.channel_config.shape { + match self.fetch_info.shape() { Shape::Scalar => { assert!(value_bytes < 1024 * 1); } @@ -357,19 +357,19 @@ impl EventChunker { let type_size = scalar_type.bytes() as u32; let ele_count = value_bytes / type_size as u64; let ele_size = type_size; - let config_matches = match self.channel_config.shape { + let config_matches = match self.fetch_info.shape() { Shape::Scalar => { if is_array { if false { error!( "channel config mismatch {:?} {:?} {:?} {:?}", - self.channel_config, is_array, ele_count, self.dbg_path, + self.fetch_info, is_array, ele_count, self.dbg_path, ); } if false { return Err(Error::with_msg(format!( "ChannelConfig expects {:?} but we find event is_array", - self.channel_config, + self.fetch_info, ))); } false @@ -378,17 +378,17 @@ impl EventChunker { } } Shape::Wave(dim1count) => { - if dim1count != ele_count as u32 { + if *dim1count != ele_count as u32 { if false { error!( "channel config mismatch {:?} {:?} {:?} {:?}", - self.channel_config, is_array, ele_count, self.dbg_path, + self.fetch_info, is_array, ele_count, self.dbg_path, ); } if false { return Err(Error::with_msg(format!( "ChannelConfig expects {:?} but event has ele_count {}", - self.channel_config, ele_count, + self.fetch_info, ele_count, ))); } false @@ -397,18 +397,18 @@ impl EventChunker { } } Shape::Image(n1, n2) => { - let nt = n1 as usize * n2 as usize; + let nt = (*n1 as usize) * (*n2 as usize); if nt != ele_count as usize { if false { error!( "channel config mismatch {:?} {:?} {:?} {:?}", - self.channel_config, is_array, ele_count, self.dbg_path, + self.fetch_info, is_array, ele_count, self.dbg_path, ); } if false { return Err(Error::with_msg(format!( "ChannelConfig expects {:?} but event has ele_count {}", - self.channel_config, ele_count, + self.fetch_info, ele_count, ))); } false @@ -552,7 +552,7 @@ impl Stream for EventChunker { // TODO gather stats about this: self.inp.put_back(fcr); } - match self.channel_config.shape { + match self.fetch_info.shape() { Shape::Scalar => { if self.need_min > 1024 * 8 { let msg = diff --git a/disk/src/merge/mergedblobsfromremotes.rs b/disk/src/merge/mergedblobsfromremotes.rs index 3d0ddd9..5355b2f 100644 --- a/disk/src/merge/mergedblobsfromremotes.rs +++ b/disk/src/merge/mergedblobsfromremotes.rs @@ -6,6 +6,7 @@ use items_0::streamitem::Sitemty; use items_2::eventfull::EventFull; use items_2::merger::Merger; use netpod::log::*; +use netpod::ChannelTypeConfigGen; use netpod::Cluster; use netpod::PerfOpts; use query::api4::events::PlainEventsQuery; @@ -27,11 +28,12 @@ pub struct MergedBlobsFromRemotes { } impl MergedBlobsFromRemotes { - pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, cluster: Cluster) -> Self { + pub fn new(evq: PlainEventsQuery, perf_opts: PerfOpts, ch_conf: ChannelTypeConfigGen, cluster: Cluster) -> Self { debug!("MergedBlobsFromRemotes evq {:?}", evq); let mut tcp_establish_futs = Vec::new(); for node in &cluster.nodes { - let f = x_processed_event_blobs_stream_from_node(evq.clone(), perf_opts.clone(), node.clone()); + let f = + x_processed_event_blobs_stream_from_node(evq.clone(), ch_conf.clone(), perf_opts.clone(), node.clone()); let f: T002 = Box::pin(f); tcp_establish_futs.push(f); } diff --git a/disk/src/paths.rs b/disk/src/paths.rs index d290dad..8c18901 100644 --- a/disk/src/paths.rs +++ b/disk/src/paths.rs @@ -3,6 +3,7 @@ use err::Error; use futures_util::StreamExt; use netpod::timeunits::MS; use netpod::Node; +use netpod::SfChFetchInfo; use netpod::TsNano; use std::path::PathBuf; @@ -30,13 +31,17 @@ Return potential datafile paths for the given timebin. It says "potential datafile paths" because we don't open the file here yet and of course, files may vanish until then. Also, the timebin may actually not exist. */ -pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Node) -> Result, Error> { +pub async fn datapaths_for_timebin( + timebin: u64, + fetch_info: &SfChFetchInfo, + node: &Node, +) -> Result, Error> { let sfc = node.sf_databuffer.as_ref().unwrap(); let timebin_path = sfc .data_base_path - .join(format!("{}_{}", sfc.ksprefix, config.keyspace)) + .join(format!("{}_{}", sfc.ksprefix, fetch_info.ks())) .join("byTime") - .join(config.channel.name()) + .join(fetch_info.name()) .join(format!("{:019}", timebin)); let rd = tokio::fs::read_dir(timebin_path).await?; let mut rd = tokio_stream::wrappers::ReadDirStream::new(rd); @@ -69,43 +74,43 @@ pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Nod for split in splits { let path = sfc .data_base_path - .join(format!("{}_{}", sfc.ksprefix, config.keyspace)) + .join(format!("{}_{}", sfc.ksprefix, fetch_info.ks())) .join("byTime") - .join(config.channel.name()) + .join(fetch_info.name()) .join(format!("{:019}", timebin)) .join(format!("{:010}", split)) - .join(format!("{:019}_00000_Data", config.time_bin_size.ns() / MS)); + .join(format!("{:019}_00000_Data", fetch_info.bs().ns() / MS)); ret.push(path); } Ok(ret) } -pub fn channel_timebins_dir_path(channel_config: &SfDbChConf, node: &Node) -> Result { +pub fn channel_timebins_dir_path(fetch_info: &SfChFetchInfo, node: &Node) -> Result { let sfc = node.sf_databuffer.as_ref().unwrap(); let ret = sfc .data_base_path - .join(format!("{}_{}", sfc.ksprefix, channel_config.keyspace)) + .join(format!("{}_{}", sfc.ksprefix, fetch_info.ks())) .join("byTime") - .join(channel_config.channel.name()); + .join(fetch_info.name()); Ok(ret) } -pub fn data_dir_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result { - let ret = channel_timebins_dir_path(channel_config, node)? - .join(format!("{:019}", ts.ns() / channel_config.time_bin_size.ns())) +pub fn data_dir_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Node) -> Result { + let ret = channel_timebins_dir_path(fetch_info, node)? + .join(format!("{:019}", ts.ns() / fetch_info.bs().ns())) .join(format!("{:010}", split)); Ok(ret) } -pub fn data_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result { - let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns() / MS, 0); - let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); +pub fn data_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Node) -> Result { + let fname = format!("{:019}_{:05}_Data", fetch_info.bs().ns() / MS, 0); + let ret = data_dir_path(ts, fetch_info, split, node)?.join(fname); Ok(ret) } -pub fn index_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result { - let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns() / MS, 0); - let ret = data_dir_path(ts, channel_config, split, node)?.join(fname); +pub fn index_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Node) -> Result { + let fname = format!("{:019}_{:05}_Data_Index", fetch_info.bs().ns() / MS, 0); + let ret = data_dir_path(ts, fetch_info, split, node)?.join(fname); Ok(ret) } diff --git a/disk/src/raw/conn.rs b/disk/src/raw/conn.rs index a71fe2a..3c672c2 100644 --- a/disk/src/raw/conn.rs +++ b/disk/src/raw/conn.rs @@ -1,9 +1,7 @@ -use crate::channelconfig::config_entry_best_match; use crate::eventblobs::EventChunkerMultifile; use crate::eventchunker::EventChunkerConf; use crate::raw::generated::EventBlobsGeneratorI32Test00; use crate::raw::generated::EventBlobsGeneratorI32Test01; -use crate::SfDbChConf; use err::Error; use futures_util::stream; use futures_util::Stream; @@ -17,23 +15,21 @@ use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::AggKind; use netpod::ByteSize; -use netpod::ChConf; use netpod::DiskIoTune; use netpod::NodeConfigCached; -use netpod::SfDbChannel; -use parse::channelconfig::ConfigEntry; +use netpod::SfChFetchInfo; use query::api4::events::PlainEventsQuery; use std::pin::Pin; const TEST_BACKEND: &str = "testbackend-00"; fn make_num_pipeline_stream_evs( - chconf: ChConf, + fetch_info: SfChFetchInfo, agg_kind: AggKind, event_blobs: EventChunkerMultifile, ) -> Pin> + Send>> { - let scalar_type = chconf.scalar_type.clone(); - let shape = chconf.shape.clone(); + let scalar_type = fetch_info.scalar_type().clone(); + let shape = fetch_info.shape().clone(); let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) { Ok(k) => k, Err(e) => { @@ -58,30 +54,11 @@ fn make_num_pipeline_stream_evs( pub async fn make_event_pipe( evq: &PlainEventsQuery, - chconf: ChConf, + fetch_info: SfChFetchInfo, ncc: &NodeConfigCached, ) -> Result> + Send>>, Error> { // sf-databuffer type backends identify channels by their (backend, name) only. - let channel = evq.channel().clone(); let range = evq.range().clone(); - let x = crate::channelconfig::channel_config_best_match(evq.range().try_into()?, channel, ncc).await; - let channel_config = match x { - Ok(Some(x)) => x, - Ok(None) => { - error!("make_event_pipe can not find config"); - return Err(Error::with_msg_no_trace("make_event_pipe can not find config")); - } - Err(e) => { - error!("make_event_pipe can not find config"); - if e.msg().contains("ErrorKind::NotFound") { - warn!("{e}"); - let s = futures_util::stream::empty(); - return Ok(Box::pin(s)); - } else { - return Err(e); - } - } - }; info!( "make_event_pipe need_expand {need_expand} {evq:?}", need_expand = evq.one_before_range() @@ -96,7 +73,7 @@ pub async fn make_event_pipe( }; let event_blobs = EventChunkerMultifile::new( (&range).try_into()?, - channel_config.clone(), + fetch_info.clone(), ncc.node.clone(), ncc.ix, DiskIoTune::default(), @@ -106,38 +83,25 @@ pub async fn make_event_pipe( out_max_len, ); error!("TODO replace AggKind in the called code"); - let pipe = make_num_pipeline_stream_evs(chconf, AggKind::TimeWeightedScalar, event_blobs); + let pipe = make_num_pipeline_stream_evs(fetch_info, AggKind::TimeWeightedScalar, event_blobs); Ok(pipe) } pub fn make_local_event_blobs_stream( range: NanoRange, - channel: SfDbChannel, - entry: &ConfigEntry, + fetch_info: &SfChFetchInfo, expand: bool, do_decompress: bool, event_chunker_conf: EventChunkerConf, disk_io_tune: DiskIoTune, node_config: &NodeConfigCached, ) -> Result { - info!("make_local_event_blobs_stream do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}"); + info!( + "make_local_event_blobs_stream {fetch_info:?} do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}" + ); if do_decompress { warn!("Possible issue: decompress central storage event blob stream"); } - let shape = match entry.to_shape() { - Ok(k) => k, - Err(e) => return Err(e)?, - }; - let channel_config = SfDbChConf { - channel, - keyspace: entry.ks as u8, - time_bin_size: entry.bs.clone(), - shape, - scalar_type: entry.scalar_type.clone(), - byte_order: entry.byte_order.clone(), - array: entry.is_array, - compression: entry.is_compressed, - }; // TODO should not need this for correctness. // Should limit based on return size and latency. let out_max_len = if node_config.node_config.cluster.is_central_storage { @@ -147,7 +111,7 @@ pub fn make_local_event_blobs_stream( }; let event_blobs = EventChunkerMultifile::new( range, - channel_config.clone(), + fetch_info.clone(), node_config.node.clone(), node_config.ix, disk_io_tune, @@ -161,8 +125,7 @@ pub fn make_local_event_blobs_stream( pub fn make_remote_event_blobs_stream( range: NanoRange, - channel: SfDbChannel, - entry: &ConfigEntry, + fetch_info: &SfChFetchInfo, expand: bool, do_decompress: bool, event_chunker_conf: EventChunkerConf, @@ -170,20 +133,6 @@ pub fn make_remote_event_blobs_stream( node_config: &NodeConfigCached, ) -> Result>, Error> { debug!("make_remote_event_blobs_stream"); - let shape = match entry.to_shape() { - Ok(k) => k, - Err(e) => return Err(e)?, - }; - let channel_config = SfDbChConf { - channel, - keyspace: entry.ks as u8, - time_bin_size: entry.bs.clone(), - shape: shape, - scalar_type: entry.scalar_type.clone(), - byte_order: entry.byte_order.clone(), - array: entry.is_array, - compression: entry.is_compressed, - }; // TODO should not need this for correctness. // Should limit based on return size and latency. let out_max_len = if node_config.node_config.cluster.is_central_storage { @@ -193,7 +142,7 @@ pub fn make_remote_event_blobs_stream( }; let event_blobs = EventChunkerMultifile::new( range, - channel_config.clone(), + fetch_info.clone(), node_config.node.clone(), node_config.ix, disk_io_tune, @@ -207,6 +156,7 @@ pub fn make_remote_event_blobs_stream( pub async fn make_event_blobs_pipe_real( evq: &PlainEventsQuery, + fetch_info: &SfChFetchInfo, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { if false { @@ -217,34 +167,13 @@ pub async fn make_event_blobs_pipe_real( } let expand = evq.one_before_range(); let range = evq.range(); - let entry = match config_entry_best_match(&evq.range().try_into()?, evq.channel().clone(), node_config).await { - Ok(Some(x)) => x, - Ok(None) => { - let e = Error::with_msg_no_trace("no config entry found"); - error!("{e}"); - return Err(e); - } - Err(e) => { - if e.to_public_error().msg().contains("no config entry found") { - let item = items_0::streamitem::LogItem { - node_ix: node_config.ix as _, - level: Level::WARN, - msg: format!("{} {}", node_config.node.host, e), - }; - return Ok(Box::pin(stream::iter([Ok(StreamItem::Log(item))]))); - } else { - return Err(e); - } - } - }; let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); // TODO should depend on host config let do_local = node_config.node_config.cluster.is_central_storage; let pipe = if do_local { let event_blobs = make_local_event_blobs_stream( range.try_into()?, - evq.channel().clone(), - &entry, + fetch_info, expand, false, event_chunker_conf, @@ -255,8 +184,7 @@ pub async fn make_event_blobs_pipe_real( } else { let event_blobs = make_remote_event_blobs_stream( range.try_into()?, - evq.channel().clone(), - &entry, + fetch_info, expand, true, event_chunker_conf, @@ -320,12 +248,13 @@ pub async fn make_event_blobs_pipe_test( pub async fn make_event_blobs_pipe( evq: &PlainEventsQuery, + fetch_info: &SfChFetchInfo, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { debug!("make_event_blobs_pipe {evq:?}"); if evq.channel().backend() == TEST_BACKEND { make_event_blobs_pipe_test(evq, node_config).await } else { - make_event_blobs_pipe_real(evq, node_config).await + make_event_blobs_pipe_real(evq, fetch_info, node_config).await } } diff --git a/dq/src/bin/dq.rs b/dq/src/bin/dq.rs index c951f66..7b873e3 100644 --- a/dq/src/bin/dq.rs +++ b/dq/src/bin/dq.rs @@ -2,15 +2,13 @@ use clap::ArgAction; use clap::Parser; use disk::eventchunker::EventChunker; use disk::eventchunker::EventChunkerConf; -use disk::SfDbChConf; use err::Error; #[allow(unused)] use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::ByteOrder; use netpod::ByteSize; -use netpod::SfDbChannel; -use netpod::Shape; +use netpod::SfChFetchInfo; use std::path::PathBuf; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -80,16 +78,15 @@ pub fn main() -> Result<(), Error> { let disk_io_tune = netpod::DiskIoTune::default(); let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune)); let ce = &config.entries[0]; - let channel_config = SfDbChConf { - channel: SfDbChannel::from_name("", &config.channel_name), - keyspace: ce.ks as u8, - time_bin_size: ce.bs.clone(), - scalar_type: ce.scalar_type.clone(), - compression: false, - shape: Shape::Scalar, - array: false, - byte_order: ByteOrder::Little, - }; + let fetch_info = SfChFetchInfo::new( + "", + config.channel_name, + ce.ks as _, + ce.bs.clone(), + ByteOrder::Little, + ce.scalar_type.clone(), + ce.to_shape()?, + ); let range = NanoRange { beg: u64::MIN, end: u64::MAX, @@ -97,15 +94,7 @@ pub fn main() -> Result<(), Error> { let stats_conf = EventChunkerConf { disk_stats_every: ByteSize::mb(2), }; - let _chunks = EventChunker::from_start( - inp, - channel_config.clone(), - range, - stats_conf, - path.clone(), - false, - true, - ); + let _chunks = EventChunker::from_start(inp, fetch_info, range, stats_conf, path.clone(), false, true); err::todo(); Ok(()) } diff --git a/httpret/src/api1.rs b/httpret/src/api1.rs index 5eeb67e..76b7e14 100644 --- a/httpret/src/api1.rs +++ b/httpret/src/api1.rs @@ -1,5 +1,3 @@ -pub mod configquorum; - use crate::err::Error; use crate::gather::gather_get_json_generic; use crate::gather::SubRes; @@ -34,13 +32,16 @@ use netpod::query::api1::Api1Query; use netpod::range::evrange::NanoRange; use netpod::timeunits::SEC; use netpod::ByteSize; +use netpod::ChConf; use netpod::ChannelSearchQuery; use netpod::ChannelSearchResult; +use netpod::ChannelTypeConfigGen; use netpod::DiskIoTune; use netpod::NodeConfigCached; use netpod::PerfOpts; use netpod::ProxyConfig; use netpod::ScalarType; +use netpod::SfChFetchInfo; use netpod::SfDbChannel; use netpod::Shape; use netpod::ACCEPT_ALL; @@ -639,13 +640,19 @@ impl Api1ChannelHeader { } } +async fn find_ch_conf(channel: SfDbChannel, ncc: NodeConfigCached) -> Result { + //find_sf_channel_config_basics_quorum() + todo!() +} + pub struct DataApiPython3DataStream { range: NanoRange, channels: VecDeque, current_channel: Option, node_config: NodeConfigCached, chan_stream: Option> + Send>>>, - config_fut: Option> + Send>>>, + config_fut: Option> + Send>>>, + ch_conf: Option, disk_io_tune: DiskIoTune, do_decompress: bool, #[allow(unused)] @@ -674,6 +681,7 @@ impl DataApiPython3DataStream { node_config, chan_stream: None, config_fut: None, + ch_conf: None, disk_io_tune, do_decompress, event_count: 0, @@ -688,10 +696,11 @@ impl DataApiPython3DataStream { fn convert_item( b: EventFull, channel: &SfDbChannel, - entry: &ConfigEntry, + fetch_info: &SfChFetchInfo, header_out: &mut bool, count_events: &mut usize, ) -> Result { + let shape = fetch_info.shape(); let mut d = BytesMut::new(); for i1 in 0..b.len() { const EVIMAX: usize = 6; @@ -727,7 +736,7 @@ impl DataApiPython3DataStream { }, // The shape is inconsistent on the events. // Seems like the config is to be trusted in this case. - shape: shape_to_api3proto(&entry.shape), + shape: shape.to_u32_vec(), compression, }; let h = serde_json::to_string(&head)?; @@ -787,86 +796,66 @@ impl DataApiPython3DataStream { } } - fn handle_config_fut_ready(&mut self, item: Result) -> Result<(), Error> { - match item { - Ok(config) => { - self.config_fut = None; - let res = extract_matching_config_entry(&self.range, &config)?; - let entry = match res.best() { - Some(k) => k, - None => { - warn!("DataApiPython3DataStream no config entry found for {:?}", config); - self.chan_stream = Some(Box::pin(stream::empty())); - // TODO remember the issue for status and metrics - return Ok(()); - } - }; - let entry = entry.clone(); - let channel = self.current_channel.as_ref().unwrap(); - debug!("found channel_config for {}: {:?}", channel.name(), entry); - let evq = PlainEventsQuery::new(channel.clone(), self.range.clone()).for_event_blobs(); - debug!("query for event blobs retrieval: evq {evq:?}"); - // TODO important TODO - debug!("TODO fix magic inmem_bufcap"); - debug!("TODO add timeout option to data api3 download"); - let perf_opts = PerfOpts::default(); - // TODO is this a good to place decide this? - let s = if self.node_config.node_config.cluster.is_central_storage { - info!("Set up central storage stream"); - // TODO pull up this config - let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); - let s = make_local_event_blobs_stream( - evq.range().try_into()?, - evq.channel().clone(), - &entry, - evq.one_before_range(), - self.do_decompress, - event_chunker_conf, - self.disk_io_tune.clone(), - &self.node_config, - )?; - Box::pin(s) as Pin> + Send>> - } else { - if let Some(sh) = &entry.shape { - if sh.len() > 1 { - warn!("Remote stream fetch for shape {sh:?}"); - } - } - debug!("Set up merged remote stream"); - let s = MergedBlobsFromRemotes::new(evq, perf_opts, self.node_config.node_config.cluster.clone()); - Box::pin(s) as Pin> + Send>> - }; - let s = s.map({ - let mut header_out = false; - let mut count_events = 0; - let channel = self.current_channel.clone().unwrap(); - move |b| { - let ret = match b { - Ok(b) => { - let f = match b { - StreamItem::DataItem(RangeCompletableItem::Data(b)) => { - Self::convert_item(b, &channel, &entry, &mut header_out, &mut count_events)? - } - _ => BytesMut::new(), - }; - Ok(f) + // TODO this stream can currently only handle sf-databuffer type backend anyway. + fn handle_config_fut_ready(&mut self, fetch_info: SfChFetchInfo) -> Result<(), Error> { + self.config_fut = None; + debug!("found channel_config {:?}", fetch_info); + let channel = SfDbChannel::from_name(fetch_info.backend(), fetch_info.name()); + let evq = PlainEventsQuery::new(channel.clone(), self.range.clone()).for_event_blobs(); + debug!("query for event blobs retrieval: evq {evq:?}"); + // TODO important TODO + debug!("TODO fix magic inmem_bufcap"); + debug!("TODO add timeout option to data api3 download"); + let perf_opts = PerfOpts::default(); + // TODO is this a good to place decide this? + let s = if self.node_config.node_config.cluster.is_central_storage { + info!("Set up central storage stream"); + // TODO pull up this config + let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024)); + let s = make_local_event_blobs_stream( + evq.range().try_into()?, + &fetch_info, + evq.one_before_range(), + self.do_decompress, + event_chunker_conf, + self.disk_io_tune.clone(), + &self.node_config, + )?; + Box::pin(s) as Pin> + Send>> + } else { + debug!("Set up merged remote stream"); + let ch_conf: ChannelTypeConfigGen = fetch_info.clone().into(); + let s = MergedBlobsFromRemotes::new(evq, perf_opts, ch_conf, self.node_config.node_config.cluster.clone()); + Box::pin(s) as Pin> + Send>> + }; + let s = s.map({ + let mut header_out = false; + let mut count_events = 0; + let channel = self.current_channel.clone().unwrap(); + move |b| { + let ret = match b { + Ok(b) => { + let f = match b { + StreamItem::DataItem(RangeCompletableItem::Data(b)) => { + Self::convert_item(b, &channel, &fetch_info, &mut header_out, &mut count_events)? } - Err(e) => Err(e), + _ => BytesMut::new(), }; - ret + Ok(f) } - }); - //let _ = Box::new(s) as Box> + Unpin>; - let evm = if self.events_max == 0 { - usize::MAX - } else { - self.events_max as usize + Err(e) => Err(e), }; - self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm))); - Ok(()) + ret } - Err(e) => Err(Error::with_msg_no_trace(format!("can not parse channel config {e}"))), - } + }); + //let _ = Box::new(s) as Box> + Unpin>; + let evm = if self.events_max == 0 { + usize::MAX + } else { + self.events_max as usize + }; + self.chan_stream = Some(Box::pin(s.map_err(Error::from).take(evm))); + Ok(()) } } @@ -893,7 +882,7 @@ impl Stream for DataApiPython3DataStream { } } else if let Some(fut) = &mut self.config_fut { match fut.poll_unpin(cx) { - Ready(k) => match self.handle_config_fut_ready(k) { + Ready(Ok(k)) => match self.handle_config_fut_ready(k) { Ok(()) => continue, Err(e) => { self.config_fut = None; @@ -902,13 +891,16 @@ impl Stream for DataApiPython3DataStream { Ready(Some(Err(e))) } }, + Ready(Err(e)) => { + self.data_done = true; + Ready(Some(Err(e))) + } Pending => Pending, } } else { if let Some(channel) = self.channels.pop_front() { self.current_channel = Some(channel.clone()); - let fut = read_local_config(channel, self.node_config.clone()).map_err(Error::from); - self.config_fut = Some(Box::pin(fut)); + self.config_fut = Some(Box::pin(find_ch_conf(channel, self.node_config.clone()))); continue; } else { self.data_done = true; @@ -987,9 +979,11 @@ impl Api1EventsBinaryHandler { } }; let span = if qu.log_level() == "trace" { - tracing::span!(tracing::Level::TRACE, "log_span_t") + debug!("enable trace for handler"); + tracing::span!(tracing::Level::TRACE, "log_span_trace") } else if qu.log_level() == "debug" { - tracing::span!(tracing::Level::DEBUG, "log_span_d") + debug!("enable debug for handler"); + tracing::span!(tracing::Level::DEBUG, "log_span_debug") } else { tracing::Span::none() }; diff --git a/httpret/src/api1/configquorum.rs b/httpret/src/api1/configquorum.rs deleted file mode 100644 index 7561f15..0000000 --- a/httpret/src/api1/configquorum.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub async fn find_config_quorum() { - // TODO create new endpoint which only returns the most matching config entry - // for some given channel and time range. -} diff --git a/httpret/src/api4/binned.rs b/httpret/src/api4/binned.rs index 5c60fe9..a05d521 100644 --- a/httpret/src/api4/binned.rs +++ b/httpret/src/api4/binned.rs @@ -1,6 +1,6 @@ use crate::bodystream::response; use crate::bodystream::ToPublicResponse; -use crate::channelconfig::chconf_from_binned; +use crate::channelconfig::ch_conf_from_binned; use crate::err::Error; use crate::response_err; use err::anyhow::Context; @@ -28,13 +28,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache let msg = format!("can not parse query: {}", e.msg()); e.add_public_msg(msg) })?; - let chconf = chconf_from_binned(&query, node_config).await?; - // Update the series id since we don't require some unique identifier yet. - let query = { - let mut query = query; - query.set_series_id(chconf.try_series().context("binned_json")?); - query - }; + let ch_conf = ch_conf_from_binned(&query, node_config).await?; let span1 = span!( Level::INFO, "httpret::binned", @@ -45,7 +39,7 @@ async fn binned_json(url: Url, req: Request, node_config: &NodeConfigCache span1.in_scope(|| { debug!("begin"); }); - let item = streams::timebinnedjson::timebinned_json(query, chconf, node_config.node_config.cluster.clone()) + let item = streams::timebinnedjson::timebinned_json(query, &ch_conf, node_config.node_config.cluster.clone()) .instrument(span1) .await?; let buf = serde_json::to_vec(&item)?; diff --git a/httpret/src/api4/events.rs b/httpret/src/api4/events.rs index 5a6d621..8c013f2 100644 --- a/httpret/src/api4/events.rs +++ b/httpret/src/api4/events.rs @@ -4,7 +4,6 @@ use crate::response; use crate::response_err; use crate::BodyStream; use crate::ToPublicResponse; -use err::anyhow::Context; use futures_util::stream; use futures_util::TryStreamExt; use http::Method; @@ -75,14 +74,8 @@ async fn plain_events_binary( ) -> Result, Error> { debug!("plain_events_binary req: {:?}", req); let query = PlainEventsQuery::from_url(&url).map_err(|e| e.add_public_msg(format!("Can not understand query")))?; - let chconf = chconf_from_events_v1(&query, node_config).await?; - info!("plain_events_binary chconf_from_events_v1: {chconf:?}"); - // Update the series id since we don't require some unique identifier yet. - let mut query = query; - query.set_series_id(chconf.try_series().context("plain_events_binary")?); - let query = query; - // --- - let _ = query; + let ch_conf = chconf_from_events_v1(&query, node_config).await?; + info!("plain_events_binary chconf_from_events_v1: {ch_conf:?}"); let s = stream::iter([Ok::<_, Error>(String::from("TODO_PREBINNED_BINARY_STREAM"))]); let ret = response(StatusCode::OK).body(BodyStream::wrapped( s.map_err(Error::from), @@ -100,21 +93,9 @@ async fn plain_events_json( let (_head, _body) = req.into_parts(); let query = PlainEventsQuery::from_url(&url)?; info!("plain_events_json query {query:?}"); - let chconf = chconf_from_events_v1(&query, node_config).await.map_err(Error::from)?; - info!("plain_events_json chconf_from_events_v1: {chconf:?}"); - // Update the series id since we don't require some unique identifier yet. - let mut query = query; - let kk = chconf.try_series(); - let kk = kk.context("plain_events_json"); - if let Err(e) = &kk { - warn!("kk ctx debug {kk:?}"); - warn!("kk e ctx display {e}"); - } - query.set_series_id(kk?); - let query = query; - // --- - //let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain); - let item = streams::plaineventsjson::plain_events_json(&query, &chconf, &node_config.node_config.cluster).await; + let ch_conf = chconf_from_events_v1(&query, node_config).await.map_err(Error::from)?; + info!("plain_events_json chconf_from_events_v1: {ch_conf:?}"); + let item = streams::plaineventsjson::plain_events_json(&query, &ch_conf, &node_config.node_config.cluster).await; let item = match item { Ok(item) => item, Err(e) => { diff --git a/httpret/src/channelconfig.rs b/httpret/src/channelconfig.rs index b283c3c..b00c655 100644 --- a/httpret/src/channelconfig.rs +++ b/httpret/src/channelconfig.rs @@ -15,6 +15,7 @@ use netpod::timeunits::*; use netpod::ChConf; use netpod::ChannelConfigQuery; use netpod::ChannelConfigResponse; +use netpod::ChannelTypeConfigGen; use netpod::FromUrl; use netpod::NodeConfigCached; use netpod::ScalarType; @@ -33,8 +34,12 @@ use serde::Serialize; use std::collections::BTreeMap; use url::Url; -pub async fn chconf_from_events_v1(q: &PlainEventsQuery, ncc: &NodeConfigCached) -> Result { - let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, q.channel().clone(), ncc).await?; +pub async fn chconf_from_events_v1( + q: &PlainEventsQuery, + ncc: &NodeConfigCached, +) -> Result { + // let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, q.channel().clone(), ncc).await?; + let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; Ok(ret) } @@ -49,8 +54,9 @@ pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) Ok(ret) } -pub async fn chconf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result { - let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, q.channel().clone(), ncc).await?; +pub async fn ch_conf_from_binned(q: &BinnedQuery, ncc: &NodeConfigCached) -> Result { + // let ret = nodenet::channelconfig::channel_config(q.range().try_into()?, q.channel().clone(), ncc).await?; + let ret = nodenet::configquorum::find_config_basics_quorum(q.channel(), ncc).await?; Ok(ret) } diff --git a/items_2/src/merger.rs b/items_2/src/merger.rs index 3c65e7f..e144d12 100644 --- a/items_2/src/merger.rs +++ b/items_2/src/merger.rs @@ -440,7 +440,6 @@ where trace3!("emit out-of-band data len {}", k.len()); sitem_data(k) }); - trace!("emit out-of-band"); Ready(Some(item)) } else { match Self::poll2(self.as_mut(), cx) { diff --git a/netpod/src/netpod.rs b/netpod/src/netpod.rs index 9cf05e8..88148a6 100644 --- a/netpod/src/netpod.rs +++ b/netpod/src/netpod.rs @@ -1066,6 +1066,15 @@ impl Shape { } } + pub fn to_u32_vec(&self) -> Vec { + use Shape::*; + match self { + Scalar => Vec::new(), + Wave(n) => vec![*n as u32], + Image(n, m) => vec![*n as u32, *m as u32], + } + } + pub fn from_url_str(s: &str) -> Result { let ret = serde_json::from_str(s)?; Ok(ret) @@ -2429,6 +2438,127 @@ impl ChConf { } } +// Includes the necessary information to know where to localize datafiles for sf-databuffer +// and what (approximate) types to expect. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SfChFetchInfo { + backend: String, + name: String, + ks: u8, + bs: TsNano, + scalar_type: ScalarType, + shape: Shape, + compression: bool, + byte_order: ByteOrder, + array: bool, +} + +impl SfChFetchInfo { + pub fn new( + backend: S1, + name: S2, + ks: u8, + bs: TsNano, + byte_order: ByteOrder, + scalar_type: ScalarType, + shape: Shape, + ) -> Self + where + S1: Into, + S2: Into, + { + Self { + backend: backend.into(), + name: name.into(), + ks, + bs, + scalar_type, + shape, + byte_order, + compression: false, + array: false, + } + } + + pub fn with_compression(mut self, x: bool) -> Self { + self.compression = x; + self + } + + pub fn with_array(mut self, x: bool) -> Self { + self.array = x; + self + } + + pub fn backend(&self) -> &str { + &self.backend + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn ks(&self) -> u8 { + self.ks + } + + pub fn bs(&self) -> TsNano { + self.bs.clone() + } + + pub fn scalar_type(&self) -> &ScalarType { + &self.scalar_type + } + + pub fn shape(&self) -> &Shape { + &self.shape + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChannelTypeConfigGen { + Scylla(ChConf), + SfDatabuffer(SfChFetchInfo), +} + +impl ChannelTypeConfigGen { + pub fn to_scylla(&self) -> Result { + if let ChannelTypeConfigGen::Scylla(k) = self { + Ok(k.clone()) + } else { + Err(Error::with_msg_no_trace("this ChannelTypeConfigGen is not for scylla")) + } + } + + pub fn to_sf_databuffer(&self) -> Result { + if let ChannelTypeConfigGen::SfDatabuffer(k) = self { + Ok(k.clone()) + } else { + Err(Error::with_msg_no_trace("this ChannelTypeConfigGen is not for scylla")) + } + } + + pub fn scalar_type(&self) -> &ScalarType { + match self { + ChannelTypeConfigGen::Scylla(x) => &x.scalar_type, + ChannelTypeConfigGen::SfDatabuffer(x) => x.scalar_type(), + } + } + + pub fn shape(&self) -> &Shape { + match self { + ChannelTypeConfigGen::Scylla(x) => &x.shape, + ChannelTypeConfigGen::SfDatabuffer(x) => x.shape(), + } + } +} + +impl From for ChannelTypeConfigGen { + fn from(value: SfChFetchInfo) -> Self { + Self::SfDatabuffer(value) + } +} + pub fn f32_close(a: f32, b: f32) -> bool { if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) { true diff --git a/nodenet/src/configquorum.rs b/nodenet/src/configquorum.rs new file mode 100644 index 0000000..1b72ac4 --- /dev/null +++ b/nodenet/src/configquorum.rs @@ -0,0 +1,33 @@ +use disk::SfDbChConf; +use err::Error; +use netpod::ChannelTypeConfigGen; +use netpod::NodeConfigCached; +use netpod::SfChFetchInfo; +use netpod::SfDbChannel; + +async fn find_sf_channel_config_basics_quorum() -> Result { + type _A = SfDbChannel; + type _B = SfDbChConf; + // TODO create new endpoint which only returns the most matching config entry + // for some given channel and time range. + todo!() +} + +pub async fn find_config_basics_quorum( + channel: &SfDbChannel, + ncc: &NodeConfigCached, +) -> Result { + if let Some(cfg) = &ncc.node.sf_databuffer { + let ret: SfChFetchInfo = err::todoval(); + Ok(ChannelTypeConfigGen::SfDatabuffer(ret)) + } else if let Some(cfg) = &ncc.node_config.cluster.scylla { + let ret = dbconn::channelconfig::chconf_from_scylla_type_backend(&channel, ncc) + .await + .map_err(Error::from)?; + Ok(ChannelTypeConfigGen::Scylla(ret)) + } else { + Err(Error::with_msg_no_trace( + "find_config_basics_quorum not supported backend", + )) + } +} diff --git a/nodenet/src/conn.rs b/nodenet/src/conn.rs index db62d62..f211ea9 100644 --- a/nodenet/src/conn.rs +++ b/nodenet/src/conn.rs @@ -20,9 +20,11 @@ use items_2::inmem::InMemoryFrame; use netpod::histo::HistoLog2; use netpod::log::*; use netpod::ChConf; +use netpod::ChannelTypeConfigGen; use netpod::NodeConfigCached; use netpod::PerfOpts; use query::api4::events::PlainEventsQuery; +use serde_json::Value as JsValue; use std::net::SocketAddr; use std::pin::Pin; use streams::frames::inmem::InMemoryFrameAsyncReadStream; @@ -71,7 +73,7 @@ impl> From<(E, OwnedWriteHalf)> for ConnErr { async fn make_channel_events_stream_data( evq: PlainEventsQuery, - chconf: ChConf, + ch_conf: ChannelTypeConfigGen, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { if evq.channel().backend() == TEST_BACKEND { @@ -135,7 +137,7 @@ async fn make_channel_events_stream_data( } } } else if let Some(scyconf) = &node_config.node_config.cluster.scylla { - scylla_channel_event_stream(evq, chconf, scyconf, node_config).await + scylla_channel_event_stream(evq, ch_conf.to_scylla()?, scyconf, node_config).await } else if let Some(_) = &node_config.node.channel_archiver { let e = Error::with_msg_no_trace("archapp not built"); Err(e) @@ -143,18 +145,18 @@ async fn make_channel_events_stream_data( let e = Error::with_msg_no_trace("archapp not built"); Err(e) } else { - Ok(disk::raw::conn::make_event_pipe(&evq, chconf, node_config).await?) + Ok(disk::raw::conn::make_event_pipe(&evq, ch_conf.to_sf_databuffer()?, node_config).await?) } } async fn make_channel_events_stream( evq: PlainEventsQuery, - chconf: ChConf, + ch_conf: ChannelTypeConfigGen, node_config: &NodeConfigCached, ) -> Result> + Send>>, Error> { - let empty = empty_events_dyn_ev(&chconf.scalar_type, &chconf.shape)?; + let empty = empty_events_dyn_ev(ch_conf.scalar_type(), ch_conf.shape())?; let empty = sitem_data(ChannelEvents::Events(empty)); - let stream = make_channel_events_stream_data(evq, chconf, node_config).await?; + let stream = make_channel_events_stream_data(evq, ch_conf, node_config).await?; let ret = futures_util::stream::iter([empty]).chain(stream); let ret = Box::pin(ret); Ok(ret) @@ -187,8 +189,8 @@ async fn events_get_input_frames(netin: OwnedReadHalf) -> Result, ncc: &NodeConfigCached, -) -> Result<(PlainEventsQuery, ChConf), Error> { - if frames.len() != 1 { +) -> Result<(PlainEventsQuery, ChannelTypeConfigGen), Error> { + if frames.len() != 2 { error!("{:?}", frames); error!("missing command frame len {}", frames.len()); let e = Error::with_msg("missing command frame"); @@ -212,20 +214,31 @@ async fn events_parse_input_query( }, Err(e) => return Err(e), }; - let res: Result = serde_json::from_str(&qitem.0); - let evq = match res { - Ok(k) => k, - Err(e) => { - let e = Error::with_msg_no_trace(format!("json parse error: {e}")); - error!("{e}"); - return Err(e); - } - }; + let evq: PlainEventsQuery = serde_json::from_str(&qitem.0).map_err(|e| { + let e = Error::with_msg_no_trace(format!("json parse error: {e}")); + error!("{e}"); + e + })?; debug!("events_parse_input_query {:?}", evq); - let chconf = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), ncc) - .await - .map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?; - Ok((evq, chconf)) + if query_frame.tyid() != EVENT_QUERY_JSON_STRING_FRAME { + return Err(Error::with_msg("query frame wrong type")); + } + let qitem = match decode_frame::>(&frames[1]) { + Ok(k) => match k { + Ok(k) => match k { + StreamItem::DataItem(k) => match k { + RangeCompletableItem::Data(k) => k, + RangeCompletableItem::RangeComplete => return Err(Error::with_msg("bad query item")), + }, + _ => return Err(Error::with_msg("bad query item")), + }, + Err(e) => return Err(e), + }, + Err(e) => return Err(e), + }; + let ch_conf: ChannelTypeConfigGen = serde_json::from_str(&qitem.0)?; + info!("\n\nparsed second frame:\n{ch_conf:?}"); + Ok((evq, ch_conf)) } async fn events_conn_handler_inner_try( @@ -239,23 +252,25 @@ async fn events_conn_handler_inner_try( Ok(x) => x, Err(e) => return Err((e, netout).into()), }; - let (evq, chconf) = match events_parse_input_query(frames, node_config).await { + let (evq, ch_conf) = match events_parse_input_query(frames, node_config).await { Ok(x) => x, Err(e) => return Err((e, netout).into()), }; let mut stream: Pin> + Send>> = if evq.is_event_blobs() { // TODO support event blobs as transform - match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await { + let fetch_info = match ch_conf.to_sf_databuffer() { + Ok(x) => x, + Err(e) => return Err((e, netout).into()), + }; + match disk::raw::conn::make_event_blobs_pipe(&evq, &fetch_info, node_config).await { Ok(stream) => { let stream = stream.map(|x| Box::new(x) as _); Box::pin(stream) } - Err(e) => { - return Err((e, netout).into()); - } + Err(e) => return Err((e, netout).into()), } } else { - match make_channel_events_stream(evq.clone(), chconf, node_config).await { + match make_channel_events_stream(evq.clone(), ch_conf, node_config).await { Ok(stream) => { if false { // TODO wasm example diff --git a/nodenet/src/conn/test.rs b/nodenet/src/conn/test.rs index 383302e..d3330f7 100644 --- a/nodenet/src/conn/test.rs +++ b/nodenet/src/conn/test.rs @@ -79,6 +79,9 @@ fn raw_data_00() { beg: SEC, end: SEC * 10, }; + if true { + todo!("must add 2nd frame with channel type info"); + } let qu = PlainEventsQuery::new(channel, range); let query = EventQueryJsonStringFrame(serde_json::to_string(&qu).unwrap()); let frame = sitem_data(query).make_frame()?; diff --git a/nodenet/src/lib.rs b/nodenet/src/lib.rs index 0200f82..a0011c1 100644 --- a/nodenet/src/lib.rs +++ b/nodenet/src/lib.rs @@ -1,3 +1,4 @@ pub mod channelconfig; +pub mod configquorum; pub mod conn; -pub mod scylla; \ No newline at end of file +pub mod scylla; diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index fe2a670..a4da4e5 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -72,7 +72,7 @@ where Ready(Ok(())) => { let n = buf.filled().len(); self.buf.wadv(n)?; - trace!("recv bytes {}", n); + trace2!("recv bytes {}", n); Ready(Ok(n)) } Ready(Err(e)) => Ready(Err(e.into())), diff --git a/streams/src/plaineventsjson.rs b/streams/src/plaineventsjson.rs index a5ea971..84d821a 100644 --- a/streams/src/plaineventsjson.rs +++ b/streams/src/plaineventsjson.rs @@ -12,7 +12,7 @@ use items_2::channelevents::ChannelEvents; use items_2::merger::Merger; use items_2::streams::PlainEventStream; use netpod::log::*; -use netpod::ChConf; +use netpod::ChannelTypeConfigGen; use netpod::Cluster; use query::api4::events::PlainEventsQuery; use serde_json::Value as JsonValue; @@ -20,7 +20,7 @@ use std::time::Instant; pub async fn plain_events_json( evq: &PlainEventsQuery, - _chconf: &ChConf, + ch_conf: &ChannelTypeConfigGen, cluster: &Cluster, ) -> Result { info!("plain_events_json evquery {:?}", evq); @@ -28,7 +28,7 @@ pub async fn plain_events_json( let deadline = Instant::now() + evq.timeout(); let mut tr = build_merged_event_transform(evq.transform())?; // TODO make sure the empty container arrives over the network. - let inps = open_tcp_streams::<_, ChannelEvents>(&evq, cluster).await?; + let inps = open_tcp_streams::<_, ChannelEvents>(&evq, ch_conf, cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, evq.merger_out_len_max()); diff --git a/streams/src/tcprawclient.rs b/streams/src/tcprawclient.rs index 2171793..44ab81c 100644 --- a/streams/src/tcprawclient.rs +++ b/streams/src/tcprawclient.rs @@ -17,6 +17,7 @@ use items_2::framable::EventQueryJsonStringFrame; use items_2::framable::Framable; use items_2::frame::make_term_frame; use netpod::log::*; +use netpod::ChannelTypeConfigGen; use netpod::Cluster; use netpod::Node; use netpod::PerfOpts; @@ -30,6 +31,7 @@ use tokio::net::TcpStream; pub async fn x_processed_event_blobs_stream_from_node( query: PlainEventsQuery, + ch_conf: ChannelTypeConfigGen, perf_opts: PerfOpts, node: Node, ) -> Result> + Send>>, Error> { @@ -38,9 +40,16 @@ pub async fn x_processed_event_blobs_stream_from_node( let net = TcpStream::connect(addr.clone()).await?; let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); + let item = sitem_data(EventQueryJsonStringFrame(qjs)); let buf = item.make_frame()?; netout.write_all(&buf).await?; + + let s = serde_json::to_string(&ch_conf)?; + let item = sitem_data(EventQueryJsonStringFrame(s)); + let buf = item.make_frame()?; + netout.write_all(&buf).await?; + let buf = make_term_frame()?; netout.write_all(&buf).await?; netout.flush().await?; @@ -53,7 +62,11 @@ pub async fn x_processed_event_blobs_stream_from_node( pub type BoxedStream = Pin> + Send>>; -pub async fn open_tcp_streams(query: Q, cluster: &Cluster) -> Result>, Error> +pub async fn open_tcp_streams( + query: Q, + ch_conf: &ChannelTypeConfigGen, + cluster: &Cluster, +) -> Result>, Error> where Q: Serialize, // Group bounds in new trait @@ -67,9 +80,16 @@ where let net = TcpStream::connect(addr.clone()).await?; let qjs = serde_json::to_string(&query)?; let (netin, mut netout) = net.into_split(); + let item = sitem_data(EventQueryJsonStringFrame(qjs)); let buf = item.make_frame()?; netout.write_all(&buf).await?; + + let s = serde_json::to_string(ch_conf)?; + let item = sitem_data(EventQueryJsonStringFrame(s)); + let buf = item.make_frame()?; + netout.write_all(&buf).await?; + let buf = make_term_frame()?; netout.write_all(&buf).await?; netout.flush().await?; diff --git a/streams/src/timebinnedjson.rs b/streams/src/timebinnedjson.rs index 949dc59..d4be048 100644 --- a/streams/src/timebinnedjson.rs +++ b/streams/src/timebinnedjson.rs @@ -21,7 +21,7 @@ use items_2::streams::PlainEventStream; use netpod::log::*; use netpod::range::evrange::NanoRange; use netpod::BinnedRangeEnum; -use netpod::ChConf; +use netpod::ChannelTypeConfigGen; use netpod::Cluster; use query::api4::binned::BinnedQuery; use query::api4::events::PlainEventsQuery; @@ -38,12 +38,13 @@ async fn timebinnable_stream( query: BinnedQuery, range: NanoRange, one_before_range: bool, + ch_conf: &ChannelTypeConfigGen, cluster: Cluster, ) -> Result { let evq = PlainEventsQuery::new(query.channel().clone(), range.clone()).for_time_weighted_scalar(); let mut tr = build_merged_event_transform(evq.transform())?; - let inps = open_tcp_streams::<_, ChannelEvents>(&evq, &cluster).await?; + let inps = open_tcp_streams::<_, ChannelEvents>(&evq, ch_conf, &cluster).await?; // TODO propagate also the max-buf-len for the first stage event reader. // TODO use a mixture of count and byte-size as threshold. let stream = Merger::new(inps, query.merger_out_len_max()); @@ -67,6 +68,7 @@ async fn timebinnable_stream( async fn timebinned_stream( query: BinnedQuery, binned_range: BinnedRangeEnum, + ch_conf: &ChannelTypeConfigGen, cluster: Cluster, ) -> Result>> + Send>>, Error> { let range = binned_range.binned_range_time().to_nano_range(); @@ -74,7 +76,7 @@ async fn timebinned_stream( let do_time_weight = true; let one_before_range = true; - let stream = timebinnable_stream(query.clone(), range, one_before_range, cluster).await?; + let stream = timebinnable_stream(query.clone(), range, one_before_range, ch_conf, cluster).await?; let stream: Pin> = stream.0; let stream = Box::pin(stream); // TODO rename TimeBinnedStream to make it more clear that it is the component which initiates the time binning. @@ -97,11 +99,15 @@ fn timebinned_to_collectable( stream } -pub async fn timebinned_json(query: BinnedQuery, _chconf: ChConf, cluster: Cluster) -> Result { +pub async fn timebinned_json( + query: BinnedQuery, + ch_conf: &ChannelTypeConfigGen, + cluster: Cluster, +) -> Result { let deadline = Instant::now().checked_add(query.timeout_value()).unwrap(); let binned_range = BinnedRangeEnum::covering_range(query.range().clone(), query.bin_count())?; let collect_max = 10000; - let stream = timebinned_stream(query.clone(), binned_range.clone(), cluster).await?; + let stream = timebinned_stream(query.clone(), binned_range.clone(), ch_conf, cluster).await?; let stream = timebinned_to_collectable(stream); let collected = Collect::new(stream, deadline, collect_max, None, Some(binned_range)); let collected: BoxFuture<_> = Box::pin(collected); diff --git a/taskrun/src/taskrun.rs b/taskrun/src/taskrun.rs index b5a96d8..3d00a25 100644 --- a/taskrun/src/taskrun.rs +++ b/taskrun/src/taskrun.rs @@ -2,6 +2,7 @@ use crate::log::*; use err::Error; use std::fmt; use std::future::Future; +use std::io; use std::panic; use std::sync::Arc; use std::sync::Mutex; @@ -108,6 +109,7 @@ fn tracing_init_inner() -> Result<(), Error> { .from_env() .map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?; let fmt_layer = tracing_subscriber::fmt::Layer::new() + .with_writer(io::stderr) .with_timer(timer) .with_target(true) .with_ansi(false) @@ -125,6 +127,7 @@ fn tracing_init_inner() -> Result<(), Error> { // TODO tracing_loki seems not well composable, try open telemetry instead. if false { /*let fmt_layer = tracing_subscriber::fmt::Layer::new() + .with_writer(io::stderr) .with_timer(timer) .with_target(true) .with_ansi(false)