rustc panics
This commit is contained in:
@@ -86,10 +86,10 @@ async fn agg_x_dim_0_inner() {
|
||||
0,
|
||||
disk_io_tune,
|
||||
event_chunker_conf,
|
||||
false,
|
||||
true,
|
||||
// TODO
|
||||
32,
|
||||
netpod::ReqCtx::new("req-000"),
|
||||
);
|
||||
let _ = fut1;
|
||||
// TODO add the binning and expectation and await the result.
|
||||
@@ -148,10 +148,10 @@ async fn agg_x_dim_1_inner() {
|
||||
0,
|
||||
disk_io_tune,
|
||||
event_chunker_conf,
|
||||
false,
|
||||
true,
|
||||
// TODO
|
||||
32,
|
||||
netpod::ReqCtx::new("req-000"),
|
||||
);
|
||||
let _ = fut1;
|
||||
// TODO add the binning and expectation and await the result.
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::SfDbChConf;
|
||||
use err::thiserror;
|
||||
use err::*;
|
||||
#[allow(unused)]
|
||||
use netpod::log::*;
|
||||
use netpod::range::evrange::NanoRange;
|
||||
@@ -10,21 +10,14 @@ use parse::channelconfig::read_local_config;
|
||||
use parse::channelconfig::ChannelConfigs;
|
||||
use parse::channelconfig::ConfigEntry;
|
||||
use parse::channelconfig::ConfigParseError;
|
||||
use std::fmt;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum ConfigError {
|
||||
ParseError(ConfigParseError),
|
||||
NotFound,
|
||||
Error,
|
||||
}
|
||||
|
||||
// impl fmt::Display for ConfigError {
|
||||
// fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
||||
// write!(fmt, "ConfigError::{self:?}")
|
||||
// }
|
||||
// }
|
||||
|
||||
impl From<ConfigParseError> for ConfigError {
|
||||
fn from(value: ConfigParseError) -> Self {
|
||||
match value {
|
||||
|
||||
@@ -19,6 +19,7 @@ use tokio::io::AsyncSeekExt;
|
||||
use tokio::io::ErrorKind;
|
||||
use tokio::io::SeekFrom;
|
||||
|
||||
#[cfg(test)]
|
||||
const BACKEND: &str = "testbackend-00";
|
||||
|
||||
pub struct Positioned {
|
||||
|
||||
@@ -248,7 +248,10 @@ fn make_scalar_conv(
|
||||
agg_kind: &AggKind,
|
||||
) -> Result<Box<dyn ValueFromBytes>, Error> {
|
||||
let ret = match agg_kind {
|
||||
AggKind::EventBlobs => todo!("make_scalar_conv EventBlobs"),
|
||||
AggKind::EventBlobs => {
|
||||
error!("make_scalar_conv EventBlobs");
|
||||
return Err(Error::with_msg_no_trace("make_scalar_conv EventBlobs"));
|
||||
}
|
||||
AggKind::Plain
|
||||
| AggKind::DimXBinsN(_)
|
||||
| AggKind::DimXBins1
|
||||
@@ -285,7 +288,10 @@ fn make_scalar_conv(
|
||||
ScalarType::STRING => ValueDim1FromBytesImpl::<String>::boxed(shape),
|
||||
}
|
||||
}
|
||||
Shape::Image(_, _) => todo!("make_scalar_conv Image"),
|
||||
Shape::Image(_, _) => {
|
||||
error!("make_scalar_conv Image");
|
||||
return Err(Error::with_msg_no_trace("make_scalar_conv Image"));
|
||||
}
|
||||
},
|
||||
};
|
||||
Ok(ret)
|
||||
@@ -343,7 +349,7 @@ impl EventsDynStream {
|
||||
fn replace_events_out(&mut self) -> Result<Box<dyn Events>, Error> {
|
||||
let st = &self.scalar_type;
|
||||
let sh = &self.shape;
|
||||
error!("TODO replace_events_out feed through transform");
|
||||
// error!("TODO replace_events_out feed through transform");
|
||||
// TODO do we need/want the empty item from here?
|
||||
let empty = items_2::empty::empty_events_dyn_ev(st, sh)?;
|
||||
let evs = mem::replace(&mut self.events_out, empty);
|
||||
@@ -362,11 +368,6 @@ impl EventsDynStream {
|
||||
.zip(item.pulses.iter())
|
||||
{
|
||||
let endian = if be { Endian::Big } else { Endian::Little };
|
||||
let buf = if let Some(x) = buf {
|
||||
x
|
||||
} else {
|
||||
return Err(Error::with_msg_no_trace("no buf in event"));
|
||||
};
|
||||
self.scalar_conv
|
||||
.convert(ts, pulse, buf, endian, self.events_out.as_mut())?;
|
||||
}
|
||||
|
||||
@@ -283,6 +283,7 @@ fn start_read5(
|
||||
file: File,
|
||||
tx: async_channel::Sender<Result<FileChunkRead, Error>>,
|
||||
disk_io_tune: DiskIoTune,
|
||||
reqid: String,
|
||||
) -> Result<(), Error> {
|
||||
let fut = async move {
|
||||
let mut file = file;
|
||||
@@ -300,7 +301,7 @@ fn start_read5(
|
||||
}
|
||||
};
|
||||
let mut pos = pos_beg;
|
||||
info!("read5 begin {disk_io_tune:?}");
|
||||
debug!("read5 begin {disk_io_tune:?}");
|
||||
loop {
|
||||
let mut buf = BytesMut::new();
|
||||
buf.resize(disk_io_tune.read_buffer_len, 0);
|
||||
@@ -357,9 +358,9 @@ pub struct FileContentStream5 {
|
||||
}
|
||||
|
||||
impl FileContentStream5 {
|
||||
pub fn new(path: PathBuf, file: File, disk_io_tune: DiskIoTune) -> Result<Self, Error> {
|
||||
pub fn new(path: PathBuf, file: File, disk_io_tune: DiskIoTune, reqid: String) -> Result<Self, Error> {
|
||||
let (tx, rx) = async_channel::bounded(32);
|
||||
start_read5(path, file, tx, disk_io_tune)?;
|
||||
start_read5(path, file, tx, disk_io_tune, reqid)?;
|
||||
let ret = Self { rx };
|
||||
Ok(ret)
|
||||
}
|
||||
@@ -747,11 +748,16 @@ impl Stream for FileContentStream4 {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn file_content_stream(
|
||||
pub fn file_content_stream<S>(
|
||||
path: PathBuf,
|
||||
file: File,
|
||||
disk_io_tune: DiskIoTune,
|
||||
) -> Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>> {
|
||||
reqid: S,
|
||||
) -> Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>
|
||||
where
|
||||
S: Into<String>,
|
||||
{
|
||||
let reqid = reqid.into();
|
||||
debug!("file_content_stream disk_io_tune {disk_io_tune:?}");
|
||||
match &disk_io_tune.read_sys {
|
||||
ReadSys::TokioAsyncRead => {
|
||||
@@ -771,7 +777,7 @@ pub fn file_content_stream(
|
||||
Box::pin(s) as _
|
||||
}
|
||||
ReadSys::Read5 => {
|
||||
let s = FileContentStream5::new(path, file, disk_io_tune).unwrap();
|
||||
let s = FileContentStream5::new(path, file, disk_io_tune, reqid).unwrap();
|
||||
Box::pin(s) as _
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use netpod::range::evrange::NanoRange;
|
||||
use netpod::timeunits::SEC;
|
||||
use netpod::DiskIoTune;
|
||||
use netpod::Node;
|
||||
use netpod::ReqCtxArc;
|
||||
use netpod::SfChFetchInfo;
|
||||
use std::collections::VecDeque;
|
||||
use std::pin::Pin;
|
||||
@@ -39,7 +40,6 @@ pub struct EventChunkerMultifile {
|
||||
files_count: u32,
|
||||
node_ix: usize,
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
max_ts: u64,
|
||||
out_max_len: usize,
|
||||
emit_count: usize,
|
||||
@@ -49,6 +49,7 @@ pub struct EventChunkerMultifile {
|
||||
done: bool,
|
||||
done_emit_range_final: bool,
|
||||
complete: bool,
|
||||
reqctx: ReqCtxArc,
|
||||
}
|
||||
|
||||
impl EventChunkerMultifile {
|
||||
@@ -64,10 +65,10 @@ impl EventChunkerMultifile {
|
||||
disk_io_tune: DiskIoTune,
|
||||
event_chunker_conf: EventChunkerConf,
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
out_max_len: usize,
|
||||
reqctx: ReqCtxArc,
|
||||
) -> Self {
|
||||
info!("EventChunkerMultifile expand {expand} do_decompress {do_decompress}");
|
||||
debug!("EventChunkerMultifile expand {expand}");
|
||||
let file_chan = if expand {
|
||||
open_expanded_files(&range, &fetch_info, node)
|
||||
} else {
|
||||
@@ -83,7 +84,6 @@ impl EventChunkerMultifile {
|
||||
files_count: 0,
|
||||
node_ix,
|
||||
expand,
|
||||
do_decompress,
|
||||
max_ts: 0,
|
||||
out_max_len,
|
||||
emit_count: 0,
|
||||
@@ -93,6 +93,7 @@ impl EventChunkerMultifile {
|
||||
done: false,
|
||||
done_emit_range_final: false,
|
||||
complete: false,
|
||||
reqctx,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -103,7 +104,6 @@ impl Stream for EventChunkerMultifile {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
let span1 = span!(Level::INFO, "EvChMul", node_ix = self.node_ix);
|
||||
let _spg = span1.enter();
|
||||
info!("EventChunkerMultifile poll_next");
|
||||
use Poll::*;
|
||||
'outer: loop {
|
||||
break if let Some(item) = self.log_queue.pop_front() {
|
||||
@@ -194,6 +194,7 @@ impl Stream for EventChunkerMultifile {
|
||||
path.clone(),
|
||||
file,
|
||||
self.disk_io_tune.clone(),
|
||||
self.reqctx.reqid(),
|
||||
));
|
||||
let chunker = EventChunker::from_event_boundary(
|
||||
inp,
|
||||
@@ -202,7 +203,6 @@ impl Stream for EventChunkerMultifile {
|
||||
self.event_chunker_conf.clone(),
|
||||
path.clone(),
|
||||
self.expand,
|
||||
self.do_decompress,
|
||||
);
|
||||
let filtered = RangeFilter2::new(chunker, self.range.clone(), self.expand);
|
||||
self.evs = Some(Box::pin(filtered));
|
||||
@@ -229,6 +229,7 @@ impl Stream for EventChunkerMultifile {
|
||||
of.path.clone(),
|
||||
file,
|
||||
self.disk_io_tune.clone(),
|
||||
self.reqctx.reqid(),
|
||||
);
|
||||
let chunker = EventChunker::from_event_boundary(
|
||||
inp,
|
||||
@@ -237,7 +238,6 @@ impl Stream for EventChunkerMultifile {
|
||||
self.event_chunker_conf.clone(),
|
||||
of.path.clone(),
|
||||
self.expand,
|
||||
self.do_decompress,
|
||||
);
|
||||
chunkers.push(Box::pin(chunker) as _);
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use bitshuffle::bitshuffle_decompress;
|
||||
use bytes::Buf;
|
||||
use bytes::BytesMut;
|
||||
use err::thiserror;
|
||||
use err::Error;
|
||||
use err::ThisError;
|
||||
use futures_util::Stream;
|
||||
use futures_util::StreamExt;
|
||||
use items_0::streamitem::RangeCompletableItem;
|
||||
@@ -21,23 +21,21 @@ use netpod::ScalarType;
|
||||
use netpod::SfChFetchInfo;
|
||||
use netpod::Shape;
|
||||
use parse::channelconfig::CompressionMethod;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::io::Cursor;
|
||||
use std::path::PathBuf;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use std::time::Instant;
|
||||
use streams::dtflags::*;
|
||||
use streams::filechunkread::FileChunkRead;
|
||||
use streams::needminbuffer::NeedMinBuffer;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(Debug, ThisError, Serialize, Deserialize)]
|
||||
pub enum DataParseError {
|
||||
#[error("DataFrameLengthMismatch")]
|
||||
DataFrameLengthMismatch,
|
||||
#[error("FileHeaderTooShort")]
|
||||
FileHeaderTooShort,
|
||||
#[error("BadVersionTag")]
|
||||
BadVersionTag,
|
||||
#[error("HeaderTooLarge")]
|
||||
HeaderTooLarge,
|
||||
@@ -59,9 +57,7 @@ pub enum DataParseError {
|
||||
ShapedWithoutDims,
|
||||
#[error("TooManyDims")]
|
||||
TooManyDims,
|
||||
#[error("UnknownCompression")]
|
||||
UnknownCompression,
|
||||
#[error("BadCompresionBlockSize")]
|
||||
BadCompresionBlockSize,
|
||||
}
|
||||
|
||||
@@ -83,7 +79,6 @@ pub struct EventChunker {
|
||||
dbg_path: PathBuf,
|
||||
last_ts: u64,
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
decomp_dt_histo: HistoLog2,
|
||||
item_len_emit_histo: HistoLog2,
|
||||
seen_before_range_count: usize,
|
||||
@@ -155,36 +150,6 @@ fn is_config_match(is_array: &bool, ele_count: &u64, fetch_info: &SfChFetchInfo)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DecompError {
|
||||
#[error("Error")]
|
||||
Error,
|
||||
}
|
||||
|
||||
fn decompress(databuf: &[u8], type_size: u32, ele_count: u64) -> Result<Vec<u8>, DecompError> {
|
||||
if databuf.len() < 13 {
|
||||
return Err(DecompError::Error);
|
||||
}
|
||||
let ts1 = Instant::now();
|
||||
let decomp_bytes = type_size as u64 * ele_count;
|
||||
let mut decomp = vec![0; decomp_bytes as usize];
|
||||
let ele_size = type_size;
|
||||
// TODO limit the buf slice range
|
||||
match bitshuffle_decompress(&databuf[12..], &mut decomp, ele_count as usize, ele_size as usize, 0) {
|
||||
Ok(c1) => {
|
||||
if 12 + c1 != databuf.len() {}
|
||||
let ts2 = Instant::now();
|
||||
let dt = ts2.duration_since(ts1);
|
||||
// TODO analyze the histo
|
||||
//self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
|
||||
Ok(decomp)
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(DecompError::Error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl EventChunker {
|
||||
pub fn self_name() -> &'static str {
|
||||
std::any::type_name::<Self>()
|
||||
@@ -198,14 +163,8 @@ impl EventChunker {
|
||||
stats_conf: EventChunkerConf,
|
||||
dbg_path: PathBuf,
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
) -> Self {
|
||||
info!(
|
||||
"{}::{} do_decompress {}",
|
||||
Self::self_name(),
|
||||
"from_start",
|
||||
do_decompress
|
||||
);
|
||||
info!("{}::{}", Self::self_name(), "from_start");
|
||||
let need_min_max = match fetch_info.shape() {
|
||||
Shape::Scalar => 1024 * 8,
|
||||
Shape::Wave(_) => 1024 * 32,
|
||||
@@ -231,7 +190,6 @@ impl EventChunker {
|
||||
dbg_path,
|
||||
last_ts: 0,
|
||||
expand,
|
||||
do_decompress,
|
||||
decomp_dt_histo: HistoLog2::new(8),
|
||||
item_len_emit_histo: HistoLog2::new(0),
|
||||
seen_before_range_count: 0,
|
||||
@@ -251,15 +209,9 @@ impl EventChunker {
|
||||
stats_conf: EventChunkerConf,
|
||||
dbg_path: PathBuf,
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
) -> Self {
|
||||
info!(
|
||||
"{}::{} do_decompress {}",
|
||||
Self::self_name(),
|
||||
"from_event_boundary",
|
||||
do_decompress
|
||||
);
|
||||
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand, do_decompress);
|
||||
info!("{}::{}", Self::self_name(), "from_event_boundary");
|
||||
let mut ret = Self::from_start(inp, fetch_info, range, stats_conf, dbg_path, expand);
|
||||
ret.state = DataFileState::Event;
|
||||
ret.need_min = 4;
|
||||
ret.inp.set_need_min(4);
|
||||
@@ -275,7 +227,7 @@ impl EventChunker {
|
||||
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, DataParseError> {
|
||||
use byteorder::ReadBytesExt;
|
||||
use byteorder::BE;
|
||||
info!("parse_buf_inner buf len {}", buf.len());
|
||||
trace!("parse_buf_inner buf len {}", buf.len());
|
||||
let mut ret = EventFull::empty();
|
||||
let mut parsed_bytes = 0;
|
||||
loop {
|
||||
@@ -485,37 +437,13 @@ impl EventChunker {
|
||||
let n1 = p1 - p0;
|
||||
let n2 = len as u64 - n1 - 4;
|
||||
let databuf = buf[p1 as usize..(p1 as usize + n2 as usize)].as_ref();
|
||||
if false && is_compressed {
|
||||
//debug!("event ts {} is_compressed {}", ts, is_compressed);
|
||||
let value_bytes = sl.read_u64::<BE>().unwrap();
|
||||
let block_size = sl.read_u32::<BE>().unwrap();
|
||||
//debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
|
||||
match self.fetch_info.shape() {
|
||||
Shape::Scalar => {
|
||||
assert!(value_bytes < 1024 * 1);
|
||||
}
|
||||
Shape::Wave(_) => {
|
||||
assert!(value_bytes < 1024 * 64);
|
||||
}
|
||||
Shape::Image(_, _) => {
|
||||
assert!(value_bytes < 1024 * 1024 * 20);
|
||||
}
|
||||
}
|
||||
if block_size > 1024 * 32 {
|
||||
return Err(DataParseError::BadCompresionBlockSize);
|
||||
}
|
||||
let type_size = scalar_type.bytes() as u32;
|
||||
let _ele_count = value_bytes / type_size as u64;
|
||||
let _ele_size = type_size;
|
||||
}
|
||||
if discard {
|
||||
self.discard_count += 1;
|
||||
} else {
|
||||
ret.add_event(
|
||||
ts,
|
||||
pulse,
|
||||
Some(databuf.to_vec()),
|
||||
None,
|
||||
databuf.to_vec(),
|
||||
scalar_type,
|
||||
is_big_endian,
|
||||
shape_this,
|
||||
@@ -635,39 +563,3 @@ impl Stream for EventChunker {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
//use err::Error;
|
||||
//use netpod::timeunits::*;
|
||||
//use netpod::{ByteSize, Nanos};
|
||||
|
||||
//const TEST_BACKEND: &str = "testbackend-00";
|
||||
|
||||
/*
|
||||
#[test]
|
||||
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
|
||||
let chn = netpod::Channel {
|
||||
backend: TEST_BACKEND.into(),
|
||||
name: "scalar-i32-be".into(),
|
||||
};
|
||||
// TODO read config from disk.
|
||||
let channel_config = ChannelConfig {
|
||||
channel: chn,
|
||||
keyspace: 2,
|
||||
time_bin_size: Nanos { ns: DAY },
|
||||
scalar_type: netpod::ScalarType::I32,
|
||||
byte_order: netpod::ByteOrder::big_endian(),
|
||||
shape: netpod::Shape::Scalar,
|
||||
array: false,
|
||||
compression: false,
|
||||
};
|
||||
let cluster = taskrun::test_cluster();
|
||||
let node = cluster.nodes[nodeix].clone();
|
||||
let buffer_size = 512;
|
||||
let event_chunker_conf = EventChunkerConf {
|
||||
disk_stats_every: ByteSize::kb(1024),
|
||||
};
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
@@ -7,18 +7,18 @@ use netpod::SfChFetchInfo;
|
||||
use netpod::TsNano;
|
||||
use std::path::PathBuf;
|
||||
|
||||
// TODO remove/replace this
|
||||
pub fn datapath(timebin: u64, config: &SfDbChConf, split: u32, node: &Node) -> PathBuf {
|
||||
pub fn datapath_for_keyspace(ks: u32, node: &Node) -> PathBuf {
|
||||
node.sf_databuffer
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.data_base_path
|
||||
.join(format!(
|
||||
"{}_{}",
|
||||
node.sf_databuffer.as_ref().unwrap().ksprefix,
|
||||
config.keyspace
|
||||
))
|
||||
.join(format!("{}_{}", node.sf_databuffer.as_ref().unwrap().ksprefix, ks))
|
||||
.join("byTime")
|
||||
}
|
||||
|
||||
// TODO remove/replace this
|
||||
pub fn datapath(timebin: u64, config: &SfDbChConf, split: u32, node: &Node) -> PathBuf {
|
||||
datapath_for_keyspace(config.keyspace as u32, node)
|
||||
.join(config.channel.name())
|
||||
.join(format!("{:019}", timebin))
|
||||
.join(format!("{:010}", split))
|
||||
@@ -37,10 +37,7 @@ pub async fn datapaths_for_timebin(
|
||||
node: &Node,
|
||||
) -> Result<Vec<PathBuf>, Error> {
|
||||
let sfc = node.sf_databuffer.as_ref().unwrap();
|
||||
let timebin_path = sfc
|
||||
.data_base_path
|
||||
.join(format!("{}_{}", sfc.ksprefix, fetch_info.ks()))
|
||||
.join("byTime")
|
||||
let timebin_path = datapath_for_keyspace(fetch_info.ks() as u32, node)
|
||||
.join(fetch_info.name())
|
||||
.join(format!("{:019}", timebin));
|
||||
let rd = tokio::fs::read_dir(timebin_path).await?;
|
||||
@@ -70,12 +67,9 @@ pub async fn datapaths_for_timebin(
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut ret = vec![];
|
||||
let mut ret = Vec::new();
|
||||
for split in splits {
|
||||
let path = sfc
|
||||
.data_base_path
|
||||
.join(format!("{}_{}", sfc.ksprefix, fetch_info.ks()))
|
||||
.join("byTime")
|
||||
let path = datapath_for_keyspace(fetch_info.ks() as u32, node)
|
||||
.join(fetch_info.name())
|
||||
.join(format!("{:019}", timebin))
|
||||
.join(format!("{:010}", split))
|
||||
@@ -86,12 +80,7 @@ pub async fn datapaths_for_timebin(
|
||||
}
|
||||
|
||||
pub fn channel_timebins_dir_path(fetch_info: &SfChFetchInfo, node: &Node) -> Result<PathBuf, Error> {
|
||||
let sfc = node.sf_databuffer.as_ref().unwrap();
|
||||
let ret = sfc
|
||||
.data_base_path
|
||||
.join(format!("{}_{}", sfc.ksprefix, fetch_info.ks()))
|
||||
.join("byTime")
|
||||
.join(fetch_info.name());
|
||||
let ret = datapath_for_keyspace(fetch_info.ks() as u32, node).join(fetch_info.name());
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
@@ -115,11 +104,7 @@ pub fn index_path(ts: TsNano, fetch_info: &SfChFetchInfo, split: u32, node: &Nod
|
||||
}
|
||||
|
||||
pub fn data_dir_path_tb(ks: u32, channel_name: &str, tb: u32, split: u32, node: &Node) -> Result<PathBuf, Error> {
|
||||
let sfc = node.sf_databuffer.as_ref().unwrap();
|
||||
let ret = sfc
|
||||
.data_base_path
|
||||
.join(format!("{}_{}", sfc.ksprefix, ks))
|
||||
.join("byTime")
|
||||
let ret = datapath_for_keyspace(ks, node)
|
||||
.join(channel_name)
|
||||
.join(format!("{:019}", tb))
|
||||
.join(format!("{:010}", split));
|
||||
|
||||
@@ -17,6 +17,7 @@ use netpod::AggKind;
|
||||
use netpod::ByteSize;
|
||||
use netpod::DiskIoTune;
|
||||
use netpod::NodeConfigCached;
|
||||
use netpod::ReqCtxArc;
|
||||
use netpod::SfChFetchInfo;
|
||||
use query::api4::events::EventsSubQuery;
|
||||
use std::pin::Pin;
|
||||
@@ -55,6 +56,7 @@ fn make_num_pipeline_stream_evs(
|
||||
pub async fn make_event_pipe(
|
||||
evq: EventsSubQuery,
|
||||
fetch_info: SfChFetchInfo,
|
||||
reqctx: ReqCtxArc,
|
||||
ncc: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
|
||||
// sf-databuffer type backends identify channels by their (backend, name) only.
|
||||
@@ -72,7 +74,6 @@ pub async fn make_event_pipe(
|
||||
} else {
|
||||
128
|
||||
};
|
||||
let do_decompress = true;
|
||||
let event_blobs = EventChunkerMultifile::new(
|
||||
(&range).try_into()?,
|
||||
fetch_info.clone(),
|
||||
@@ -81,8 +82,8 @@ pub async fn make_event_pipe(
|
||||
DiskIoTune::default(),
|
||||
event_chunker_conf,
|
||||
one_before,
|
||||
do_decompress,
|
||||
out_max_len,
|
||||
reqctx,
|
||||
);
|
||||
error!("TODO replace AggKind in the called code");
|
||||
let pipe = make_num_pipeline_stream_evs(fetch_info, AggKind::TimeWeightedScalar, event_blobs);
|
||||
@@ -93,17 +94,12 @@ pub fn make_local_event_blobs_stream(
|
||||
range: NanoRange,
|
||||
fetch_info: SfChFetchInfo,
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
event_chunker_conf: EventChunkerConf,
|
||||
disk_io_tune: DiskIoTune,
|
||||
reqctx: ReqCtxArc,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<EventChunkerMultifile, Error> {
|
||||
info!(
|
||||
"make_local_event_blobs_stream {fetch_info:?} do_decompress {do_decompress} disk_io_tune {disk_io_tune:?}"
|
||||
);
|
||||
if do_decompress {
|
||||
warn!("Possible issue: decompress central storage event blob stream");
|
||||
}
|
||||
info!("make_local_event_blobs_stream {fetch_info:?} disk_io_tune {disk_io_tune:?}");
|
||||
// TODO should not need this for correctness.
|
||||
// Should limit based on return size and latency.
|
||||
let out_max_len = if node_config.node_config.cluster.is_central_storage {
|
||||
@@ -119,8 +115,8 @@ pub fn make_local_event_blobs_stream(
|
||||
disk_io_tune,
|
||||
event_chunker_conf,
|
||||
expand,
|
||||
do_decompress,
|
||||
out_max_len,
|
||||
reqctx,
|
||||
);
|
||||
Ok(event_blobs)
|
||||
}
|
||||
@@ -129,9 +125,9 @@ pub fn make_remote_event_blobs_stream(
|
||||
range: NanoRange,
|
||||
fetch_info: SfChFetchInfo,
|
||||
expand: bool,
|
||||
do_decompress: bool,
|
||||
event_chunker_conf: EventChunkerConf,
|
||||
disk_io_tune: DiskIoTune,
|
||||
reqctx: ReqCtxArc,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<impl Stream<Item = Sitemty<EventFull>>, Error> {
|
||||
debug!("make_remote_event_blobs_stream");
|
||||
@@ -150,8 +146,8 @@ pub fn make_remote_event_blobs_stream(
|
||||
disk_io_tune,
|
||||
event_chunker_conf,
|
||||
expand,
|
||||
do_decompress,
|
||||
out_max_len,
|
||||
reqctx,
|
||||
);
|
||||
Ok(event_blobs)
|
||||
}
|
||||
@@ -159,6 +155,7 @@ pub fn make_remote_event_blobs_stream(
|
||||
pub async fn make_event_blobs_pipe_real(
|
||||
subq: &EventsSubQuery,
|
||||
fetch_info: &SfChFetchInfo,
|
||||
reqctx: ReqCtxArc,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
||||
if false {
|
||||
@@ -177,9 +174,9 @@ pub async fn make_event_blobs_pipe_real(
|
||||
range.try_into()?,
|
||||
fetch_info.clone(),
|
||||
expand,
|
||||
false,
|
||||
event_chunker_conf,
|
||||
DiskIoTune::default(),
|
||||
reqctx,
|
||||
node_config,
|
||||
)?;
|
||||
Box::pin(event_blobs) as _
|
||||
@@ -188,9 +185,9 @@ pub async fn make_event_blobs_pipe_real(
|
||||
range.try_into()?,
|
||||
fetch_info.clone(),
|
||||
expand,
|
||||
true,
|
||||
event_chunker_conf,
|
||||
DiskIoTune::default(),
|
||||
reqctx,
|
||||
node_config,
|
||||
)?;
|
||||
/*
|
||||
@@ -251,12 +248,13 @@ pub async fn make_event_blobs_pipe_test(
|
||||
pub async fn make_event_blobs_pipe(
|
||||
subq: &EventsSubQuery,
|
||||
fetch_info: &SfChFetchInfo,
|
||||
reqctx: ReqCtxArc,
|
||||
node_config: &NodeConfigCached,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<EventFull>> + Send>>, Error> {
|
||||
debug!("make_event_blobs_pipe {subq:?}");
|
||||
if subq.backend() == TEST_BACKEND {
|
||||
make_event_blobs_pipe_test(subq, node_config).await
|
||||
} else {
|
||||
make_event_blobs_pipe_real(subq, fetch_info, node_config).await
|
||||
make_event_blobs_pipe_real(subq, fetch_info, reqctx, node_config).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -77,8 +77,7 @@ impl EventBlobsGeneratorI32Test00 {
|
||||
item.add_event(
|
||||
ts,
|
||||
pulse,
|
||||
Some(value.to_be_bytes().to_vec()),
|
||||
None,
|
||||
value.to_be_bytes().to_vec(),
|
||||
self.scalar_type.clone(),
|
||||
self.be,
|
||||
self.shape.clone(),
|
||||
@@ -178,8 +177,7 @@ impl EventBlobsGeneratorI32Test01 {
|
||||
item.add_event(
|
||||
ts,
|
||||
pulse,
|
||||
Some(value.to_be_bytes().to_vec()),
|
||||
None,
|
||||
value.to_be_bytes().to_vec(),
|
||||
self.scalar_type.clone(),
|
||||
self.be,
|
||||
self.shape.clone(),
|
||||
|
||||
Reference in New Issue
Block a user