Dominik Werder
2023-03-30 17:04:11 +02:00
parent c09c5be926
commit a2e17848ba
28 changed files with 924 additions and 827 deletions

View File

@@ -5,6 +5,8 @@ use clap::Parser;
use daqbuffer::cli::ClientType;
use daqbuffer::cli::Opts;
use daqbuffer::cli::SubCmd;
use disk::AggQuerySingleChannel;
use disk::SfDbChConf;
use err::Error;
use netpod::log::*;
use netpod::query::CacheUsage;
@@ -132,14 +134,13 @@ fn simple_fetch() {
use netpod::timeunits::*;
use netpod::ByteOrder;
use netpod::Channel;
use netpod::ChannelConfig;
use netpod::ScalarType;
use netpod::Shape;
taskrun::run(async {
let _rh = daqbufp2::nodes::require_test_hosts_running()?;
let t1 = chrono::Utc::now();
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
let query = AggQuerySingleChannel {
channel_config: SfDbChConf {
channel: Channel {
backend: "sf-databuffer".into(),
name: "S10BC01-DBAM070:BAM_CH1_NORM".into(),

View File

@@ -45,7 +45,7 @@ pub async fn chconf_from_scylla_type_backend(channel: &Channel, ncc: &NodeConfig
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(2))?;
let ret = ChConf {
backend,
series,
series: Some(series),
name,
scalar_type,
shape,
@@ -78,7 +78,7 @@ pub async fn chconf_from_scylla_type_backend(channel: &Channel, ncc: &NodeConfig
let shape = Shape::from_scylla_shape_dims(&row.get::<_, Vec<i32>>(3))?;
let ret = ChConf {
backend,
series,
series: Some(series),
name,
scalar_type,
shape,

View File

@@ -1,31 +1,50 @@
use crate::create_connection;
use crate::ErrConv;
use err::Error;
use netpod::log::*;
use netpod::Channel;
use netpod::NodeConfigCached;
// For the sf-databuffer backend, given a Channel, try to complete the information if only the series id is given.
pub async fn sf_databuffer_fetch_channel_by_series(channel: Channel, ncc: &NodeConfigCached) -> Result<Channel, Error> {
info!("sf_databuffer_fetch_channel_by_series");
// TODO this should eventually not be needed.
if channel.backend().is_empty() || channel.name().is_empty() {
let series = channel
.series()
.ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64;
let pgcon = create_connection(&ncc.node_config.cluster.database).await?;
let mut rows = pgcon
.query("select name from channels where rowid = $1", &[&series])
.await
.err_conv()?;
if let Some(row) = rows.pop() {
let name: String = row.get(0);
let channel = Channel {
series: channel.series,
backend: ncc.node_config.cluster.backend.clone(),
name,
};
Ok(channel)
if let Some(series) = channel.series() {
if series < 1 {
error!("sf_databuffer_fetch_channel_by_series bad input: {channel:?}");
Err(Error::with_msg_no_trace(format!(
"sf_databuffer_fetch_channel_by_series bad input: {channel:?}"
)))
} else {
info!("sf_databuffer_fetch_channel_by_series do the lookup");
let series = channel
.series()
.ok_or_else(|| Error::with_msg_no_trace("no series id given"))? as i64;
let pgcon = create_connection(&ncc.node_config.cluster.database).await?;
let mut rows = pgcon
.query("select name from channels where rowid = $1", &[&series])
.await
.err_conv()?;
if let Some(row) = rows.pop() {
info!("sf_databuffer_fetch_channel_by_series got a row {row:?}");
let name: String = row.get(0);
let channel = Channel {
series: channel.series,
backend: ncc.node_config.cluster.backend.clone(),
name,
};
info!("sf_databuffer_fetch_channel_by_series return {channel:?}");
Ok(channel)
} else {
info!("sf_databuffer_fetch_channel_by_series nothing found");
Err(Error::with_msg_no_trace("can not find series"))
}
}
} else {
Err(Error::with_msg_no_trace("can not find series"))
Err(Error::with_msg_no_trace(format!(
"sf_databuffer_fetch_channel_by_series bad input: {channel:?}"
)))
}
} else {
Ok(channel)
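// Illustrative usage sketch (not a hunk of this commit): how the lookup above is meant
// to be called when a query carries only a series id. The series value 42 and the
// helper name `resolve_channel_by_series_example` are hypothetical.
async fn resolve_channel_by_series_example(ncc: &NodeConfigCached) -> Result<Channel, Error> {
    let partial = Channel {
        backend: String::new(),
        name: String::new(),
        series: Some(42),
    };
    // Backend and name get filled in from the cluster config and the channels table.
    sf_databuffer_fetch_channel_by_series(partial, ncc).await
}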

View File

@@ -1,17 +1,19 @@
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use crate::AggQuerySingleChannel;
use crate::SfDbChConf;
use netpod::range::evrange::NanoRange;
use netpod::test_data_base_path_databuffer;
use netpod::timeunits::*;
use netpod::ByteOrder;
use netpod::ByteSize;
use netpod::Channel;
use netpod::ChannelConfig;
use netpod::DiskIoTune;
use netpod::Node;
use netpod::ScalarType;
use netpod::SfDatabuffer;
use netpod::Shape;
use netpod::TsNano;
use streams::eventchunker::EventChunkerConf;
pub fn make_test_node(id: u32) -> Node {
Node {
@@ -43,8 +45,8 @@ fn agg_x_dim_0() {
async fn agg_x_dim_0_inner() {
let node = make_test_node(0);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
let query = AggQuerySingleChannel {
channel_config: SfDbChConf {
channel: Channel {
backend: "sf-databuffer".into(),
name: "S10BC01-DBAM070:EOM1_T1".into(),
@@ -68,7 +70,7 @@ async fn agg_x_dim_0_inner() {
let range = NanoRange { beg: ts1, end: ts2 };
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
// TODO let upstream already provide DiskIoTune:
let mut disk_io_tune = netpod::DiskIoTune::default_for_testing();
let mut disk_io_tune = DiskIoTune::default_for_testing();
disk_io_tune.read_buffer_len = query.buffer_size as usize;
let fut1 = EventChunkerMultifile::new(
range.clone(),
@@ -100,8 +102,8 @@ async fn agg_x_dim_1_inner() {
// /data/sf-databuffer/daq_swissfel/daq_swissfel_3/byTime/S10BC01-DBAM070\:BAM_CH1_NORM/*
// S10BC01-DBAM070:BAM_CH1_NORM
let node = make_test_node(0);
let query = netpod::AggQuerySingleChannel {
channel_config: ChannelConfig {
let query = AggQuerySingleChannel {
channel_config: SfDbChConf {
channel: Channel {
backend: "ks".into(),
name: "wave1".into(),
@@ -125,7 +127,7 @@ async fn agg_x_dim_1_inner() {
let range = NanoRange { beg: ts1, end: ts2 };
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
// TODO let upstream already provide DiskIoTune:
let mut disk_io_tune = netpod::DiskIoTune::default_for_testing();
let mut disk_io_tune = DiskIoTune::default_for_testing();
disk_io_tune.read_buffer_len = query.buffer_size as usize;
let fut1 = super::eventblobs::EventChunkerMultifile::new(
range.clone(),

View File

@@ -1,18 +1,15 @@
use err::Error;
use netpod::range::evrange::NanoRange;
use netpod::Channel;
use netpod::ChannelConfig;
use netpod::NodeConfigCached;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ChannelConfigs;
use parse::channelconfig::MatchingConfigEntry;
pub async fn config(
range: NanoRange,
channel: Channel,
node_config: &NodeConfigCached,
) -> Result<ChannelConfig, Error> {
use crate::SfDbChConf;
pub async fn config(range: NanoRange, channel: Channel, node_config: &NodeConfigCached) -> Result<SfDbChConf, Error> {
let channel_configs = read_local_config(channel.clone(), node_config.node.clone()).await?;
let entry_res = match extract_matching_config_entry(&range, &channel_configs) {
Ok(k) => k,
@@ -37,7 +34,7 @@ pub async fn config(
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = ChannelConfig {
let channel_config = SfDbChConf {
channel: channel.clone(),
keyspace: entry.ks as u8,
time_bin_size: entry.bs.clone(),

View File

@@ -1,3 +1,5 @@
use crate::SfDbChConf;
use super::paths;
use bytes::BytesMut;
use err::ErrStr;
@@ -5,7 +7,6 @@ use err::Error;
use futures_util::StreamExt;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::ChannelConfig;
use netpod::Node;
use netpod::TsNano;
use std::fmt;
@@ -207,7 +208,7 @@ impl fmt::Debug for OpenedFile {
pub fn open_files(
range: &NanoRange,
channel_config: &ChannelConfig,
channel_config: &SfDbChConf,
node: Node,
) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
@@ -236,7 +237,7 @@ pub fn open_files(
async fn open_files_inner(
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &ChannelConfig,
channel_config: &SfDbChConf,
node: Node,
) -> Result<(), Error> {
let channel_config = channel_config.clone();
@@ -276,7 +277,7 @@ Expanded to one event before and after the requested range, if such events exist.
*/
pub fn open_expanded_files(
range: &NanoRange,
channel_config: &ChannelConfig,
channel_config: &SfDbChConf,
node: Node,
) -> async_channel::Receiver<Result<OpenedFileSet, Error>> {
let (chtx, chrx) = async_channel::bounded(2);
@@ -297,7 +298,7 @@ pub fn open_expanded_files(
chrx
}
async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<u64>, Error> {
async fn get_timebins(channel_config: &SfDbChConf, node: Node) -> Result<Vec<u64>, Error> {
let mut timebins = Vec::new();
let p0 = paths::channel_timebins_dir_path(&channel_config, &node)?;
match tokio::fs::read_dir(&p0).await {
@@ -333,7 +334,7 @@ async fn get_timebins(channel_config: &ChannelConfig, node: Node) -> Result<Vec<
async fn open_expanded_files_inner(
chtx: &async_channel::Sender<Result<OpenedFileSet, Error>>,
range: &NanoRange,
channel_config: &ChannelConfig,
channel_config: &SfDbChConf,
node: Node,
) -> Result<(), Error> {
let channel_config = channel_config.clone();
@@ -414,7 +415,6 @@ mod test {
use netpod::range::evrange::NanoRange;
use netpod::test_data_base_path_databuffer;
use netpod::timeunits::*;
use netpod::ChannelConfig;
use std::path::PathBuf;
use tokio::fs::OpenOptions;
@@ -826,7 +826,7 @@ mod test {
series: None,
};
// TODO read config from disk? Or expose the config from data generator?
let channel_config = ChannelConfig {
let channel_config = SfDbChConf {
channel: chn,
keyspace: 2,
time_bin_size: TsNano(DAY),
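// Illustrative sketch (not a hunk of this commit): draining the receiver returned by
// open_expanded_files. `range`, `channel_config` (an SfDbChConf like the one in the test
// above) and `node` are assumed to be in scope; the helper name is hypothetical.
async fn count_opened_file_sets(
    range: &NanoRange,
    channel_config: &SfDbChConf,
    node: Node,
) -> Result<usize, Error> {
    let rx = open_expanded_files(range, channel_config, node);
    let mut n = 0;
    // recv() yields Err once the sender side is dropped and the channel is drained.
    while let Ok(item) = rx.recv().await {
        let _set: OpenedFileSet = item?;
        n += 1;
    }
    Ok(n)
}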

View File

@@ -11,7 +11,6 @@ use items_0::WithLen;
use items_2::eventfull::EventFull;
use items_2::eventsdim0::EventsDim0;
use items_2::eventsdim1::EventsDim1;
#[allow(unused)]
use netpod::log::*;
use netpod::AggKind;
use netpod::ScalarType;
@@ -25,13 +24,17 @@ use std::task::Poll;
pub trait Endianness: Send + Unpin {
fn is_big() -> bool;
}
pub struct LittleEndian {}
pub struct BigEndian {}
impl Endianness for LittleEndian {
fn is_big() -> bool {
false
}
}
impl Endianness for BigEndian {
fn is_big() -> bool {
true
@@ -369,7 +372,7 @@ impl EventsDynStream {
fn handle_stream_item(
&mut self,
item: StreamItem<RangeCompletableItem<EventFull>>,
) -> Result<Option<Sitemty<Box<dyn items_0::Events>>>, Error> {
) -> Result<Option<Sitemty<Box<dyn Events>>>, Error> {
let ret = match item {
StreamItem::DataItem(item) => match item {
RangeCompletableItem::RangeComplete => {
@@ -396,7 +399,7 @@ impl EventsDynStream {
}
impl Stream for EventsDynStream {
type Item = Sitemty<Box<dyn items_0::Events>>;
type Item = Sitemty<Box<dyn Events>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;

View File

@@ -26,11 +26,16 @@ use futures_util::Stream;
use futures_util::StreamExt;
use futures_util::TryFutureExt;
use netpod::log::*;
use netpod::ChannelConfig;
use netpod::ByteOrder;
use netpod::Channel;
use netpod::DiskIoTune;
use netpod::Node;
use netpod::ReadSys;
use netpod::ScalarType;
use netpod::Shape;
use netpod::TsNano;
use serde::Deserialize;
use serde::Serialize;
use std::collections::VecDeque;
use std::future::Future;
use std::io::SeekFrom;
@@ -55,8 +60,29 @@ use tokio::io::AsyncSeekExt;
use tokio::io::ReadBuf;
use tokio::sync::mpsc;
// TODO move to databuffer-specific crate
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SfDbChConf {
pub channel: Channel,
pub keyspace: u8,
pub time_bin_size: TsNano,
pub scalar_type: ScalarType,
pub compression: bool,
pub shape: Shape,
pub array: bool,
pub byte_order: ByteOrder,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AggQuerySingleChannel {
pub channel_config: SfDbChConf,
pub timebin: u32,
pub tb_file_count: u32,
pub buffer_size: u32,
}
// TODO transform this into a self-test or remove.
pub async fn read_test_1(query: &netpod::AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
pub async fn read_test_1(query: &AggQuerySingleChannel, node: Node) -> Result<netpod::BodyStream, Error> {
let path = paths::datapath(query.timebin as u64, &query.channel_config, 0, &node);
debug!("try path: {:?}", path);
let fin = OpenOptions::new().read(true).open(path).await?;
@@ -744,7 +770,7 @@ trait ChannelConfigExt {
fn dtflags(&self) -> u8;
}
impl ChannelConfigExt for ChannelConfig {
impl ChannelConfigExt for SfDbChConf {
fn dtflags(&self) -> u8 {
let mut ret = 0;
if self.compression {

View File

@@ -1,6 +1,9 @@
use crate::dataopen::open_expanded_files;
use crate::dataopen::open_files;
use crate::dataopen::OpenedFileSet;
use crate::eventchunker::EventChunker;
use crate::eventchunker::EventChunkerConf;
use crate::SfDbChConf;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
@@ -14,14 +17,12 @@ use items_2::merger::Merger;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::ChannelConfig;
use netpod::DiskIoTune;
use netpod::Node;
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use streams::eventchunker::EventChunker;
use streams::eventchunker::EventChunkerConf;
use streams::rangefilter2::RangeFilter2;
pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
@@ -29,7 +30,7 @@ pub trait InputTraits: Stream<Item = Sitemty<EventFull>> {}
impl<T> InputTraits for T where T: Stream<Item = Sitemty<EventFull>> {}
pub struct EventChunkerMultifile {
channel_config: ChannelConfig,
channel_config: SfDbChConf,
file_chan: async_channel::Receiver<Result<OpenedFileSet, Error>>,
evs: Option<Pin<Box<dyn InputTraits + Send>>>,
disk_io_tune: DiskIoTune,
@@ -44,6 +45,7 @@ pub struct EventChunkerMultifile {
emit_count: usize,
do_emit_err_after: Option<usize>,
range_final: bool,
log_queue: VecDeque<LogItem>,
done: bool,
done_emit_range_final: bool,
complete: bool,
@@ -52,7 +54,7 @@ pub struct EventChunkerMultifile {
impl EventChunkerMultifile {
pub fn new(
range: NanoRange,
channel_config: ChannelConfig,
channel_config: SfDbChConf,
node: Node,
node_ix: usize,
disk_io_tune: DiskIoTune,
@@ -83,6 +85,7 @@ impl EventChunkerMultifile {
emit_count: 0,
do_emit_err_after: None,
range_final: false,
log_queue: VecDeque::new(),
done: false,
done_emit_range_final: false,
complete: false,
@@ -98,7 +101,9 @@ impl Stream for EventChunkerMultifile {
let _spg = span1.enter();
use Poll::*;
'outer: loop {
break if self.complete {
break if let Some(item) = self.log_queue.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if self.complete {
panic!("EventChunkerMultifile poll_next on complete");
} else if self.done_emit_range_final {
self.complete = true;
@@ -122,6 +127,12 @@ impl Stream for EventChunkerMultifile {
if min <= self.max_ts {
let msg = format!("EventChunkerMultifile repeated or unordered ts {}", min);
error!("{}", msg);
let item = LogItem {
node_ix: self.node_ix as _,
level: Level::INFO,
msg,
};
self.log_queue.push_back(item);
}
self.max_ts = max;
if let Some(after) = self.do_emit_err_after {
@@ -262,6 +273,8 @@ impl Stream for EventChunkerMultifile {
#[cfg(test)]
mod test {
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use crate::SfDbChConf;
use err::Error;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
@@ -272,10 +285,8 @@ mod test {
use netpod::timeunits::DAY;
use netpod::timeunits::MS;
use netpod::ByteSize;
use netpod::ChannelConfig;
use netpod::DiskIoTune;
use netpod::TsNano;
use streams::eventchunker::EventChunkerConf;
use streams::rangefilter2::RangeFilter2;
fn read_expanded_for_range(range: NanoRange, nodeix: usize) -> Result<(usize, Vec<u64>), Error> {
@@ -285,7 +296,7 @@ mod test {
series: None,
};
// TODO read config from disk.
let channel_config = ChannelConfig {
let channel_config = SfDbChConf {
channel: chn,
keyspace: 2,
time_bin_size: TsNano(DAY),

View File

@@ -1 +1,604 @@
use crate::SfDbChConf;
use bitshuffle::bitshuffle_decompress;
use bytes::Buf;
use bytes::BytesMut;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use items_0::Empty;
use items_0::WithLen;
use items_2::eventfull::EventFull;
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::ByteSize;
use netpod::EventDataReadStats;
use netpod::ScalarType;
use netpod::Shape;
use parse::channelconfig::CompressionMethod;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
use streams::dtflags::*;
use streams::filechunkread::FileChunkRead;
use streams::needminbuffer::NeedMinBuffer;
pub struct EventChunker {
inp: NeedMinBuffer,
state: DataFileState,
need_min: u32,
channel_config: SfDbChConf,
errored: bool,
completed: bool,
range: NanoRange,
stats_conf: EventChunkerConf,
seen_beyond_range: bool,
sent_beyond_range: bool,
data_emit_complete: bool,
final_stats_sent: bool,
parsed_bytes: u64,
dbg_path: PathBuf,
max_ts: u64,
expand: bool,
do_decompress: bool,
decomp_dt_histo: HistoLog2,
item_len_emit_histo: HistoLog2,
seen_before_range_count: usize,
seen_after_range_count: usize,
unordered_warn_count: usize,
repeated_ts_warn_count: usize,
}
impl Drop for EventChunker {
fn drop(&mut self) {
// TODO collect somewhere
debug!(
"EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}",
self.decomp_dt_histo, self.item_len_emit_histo
);
}
}
enum DataFileState {
FileHeader,
Event,
}
struct ParseResult {
events: EventFull,
parsed_bytes: u64,
}
#[derive(Clone, Debug)]
pub struct EventChunkerConf {
pub disk_stats_every: ByteSize,
}
impl EventChunkerConf {
pub fn new(disk_stats_every: ByteSize) -> Self {
Self { disk_stats_every }
}
}
impl EventChunker {
// TODO `expand` flag usage
pub fn from_start(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: SfDbChConf,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
) -> Self {
info!("EventChunker::{} do_decompress {}", "from_start", do_decompress);
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
Self {
inp,
state: DataFileState::FileHeader,
need_min: 6,
channel_config,
errored: false,
completed: false,
range,
stats_conf,
seen_beyond_range: false,
sent_beyond_range: false,
data_emit_complete: false,
final_stats_sent: false,
parsed_bytes: 0,
dbg_path,
max_ts: 0,
expand,
do_decompress,
decomp_dt_histo: HistoLog2::new(8),
item_len_emit_histo: HistoLog2::new(0),
seen_before_range_count: 0,
seen_after_range_count: 0,
unordered_warn_count: 0,
repeated_ts_warn_count: 0,
}
}
// TODO `expand` flag usage
pub fn from_event_boundary(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: SfDbChConf,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
) -> Self {
info!(
"EventChunker::{} do_decompress {}",
"from_event_boundary", do_decompress
);
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
ret
}
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
}
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
let mut ret = EventFull::empty();
let mut parsed_bytes = 0;
use byteorder::{ReadBytesExt, BE};
loop {
if (buf.len() as u32) < self.need_min {
break;
}
match self.state {
DataFileState::FileHeader => {
if buf.len() < 6 {
Err(Error::with_msg("need min 6 for FileHeader"))?;
}
let mut sl = std::io::Cursor::new(buf.as_ref());
let fver = sl.read_i16::<BE>().unwrap();
if fver != 0 {
Err(Error::with_msg("unexpected data file version"))?;
}
let len = sl.read_i32::<BE>().unwrap();
if len <= 0 || len >= 128 {
Err(Error::with_msg("large channel header len"))?;
}
let totlen = len as usize + 2;
if buf.len() < totlen {
self.need_min = totlen as u32;
break;
} else {
sl.advance(len as usize - 8);
let len2 = sl.read_i32::<BE>().unwrap();
if len != len2 {
Err(Error::with_msg("channel header len mismatch"))?;
}
String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?;
self.state = DataFileState::Event;
self.need_min = 4;
buf.advance(totlen);
parsed_bytes += totlen as u64;
}
}
DataFileState::Event => {
let p0 = 0;
let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
if len < 20 || len > 1024 * 1024 * 20 {
Err(Error::with_msg("unexpected large event chunk"))?;
}
let len = len as u32;
if (buf.len() as u32) < len {
self.need_min = len as u32;
break;
} else {
let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b as u32);
let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64;
if ts == self.max_ts {
if self.repeated_ts_warn_count < 20 {
let msg = format!(
"EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.repeated_ts_warn_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
self.repeated_ts_warn_count += 1;
}
}
if ts < self.max_ts {
if self.unordered_warn_count < 20 {
let msg = format!(
"EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.unordered_warn_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
self.unordered_warn_count += 1;
let e = Error::with_public_msg_no_trace(msg);
return Err(e);
}
}
self.max_ts = ts;
if ts >= self.range.end {
self.seen_after_range_count += 1;
if !self.expand || self.seen_after_range_count >= 2 {
self.seen_beyond_range = true;
self.data_emit_complete = true;
break;
}
}
if ts < self.range.beg {
self.seen_before_range_count += 1;
if self.seen_before_range_count > 1 {
let msg = format!(
"seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}",
ts / SEC,
ts % SEC,
self.range.beg / SEC,
self.range.beg % SEC,
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
let e = Error::with_public_msg(msg);
Err(e)?;
}
}
let _ioc_ts = sl.read_i64::<BE>().unwrap();
let status = sl.read_i8().unwrap();
let severity = sl.read_i8().unwrap();
let optional = sl.read_i32::<BE>().unwrap();
if status != 0 {
Err(Error::with_msg(format!("status != 0: {}", status)))?;
}
if severity != 0 {
Err(Error::with_msg(format!("severity != 0: {}", severity)))?;
}
if optional != -1 {
Err(Error::with_msg(format!("optional != -1: {}", optional)))?;
}
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
if type_index > 13 {
Err(Error::with_msg(format!("type_index: {}", type_index)))?;
}
let scalar_type = ScalarType::from_dtype_index(type_index)?;
let is_compressed = type_flags & COMPRESSION != 0;
let is_array = type_flags & ARRAY != 0;
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
if let Shape::Wave(_) = self.channel_config.shape {
if !is_array {
Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?;
}
}
let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
assert!(compression_method <= 0);
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
let mut shape_lens = [0, 0, 0, 0];
for i1 in 0..shape_dim {
shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
}
let shape_this = {
if is_shaped {
if shape_dim == 1 {
Shape::Wave(shape_lens[0])
} else if shape_dim == 2 {
Shape::Image(shape_lens[0], shape_lens[1])
} else {
err::todoval()
}
} else {
Shape::Scalar
}
};
let comp_this = if is_compressed {
if compression_method == 0 {
Some(CompressionMethod::BitshuffleLZ4)
} else {
err::todoval()
}
} else {
None
};
let p1 = sl.position();
let k1 = len as u64 - (p1 - p0) - 4;
if is_compressed {
//debug!("event ts {} is_compressed {}", ts, is_compressed);
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
//debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
match self.channel_config.shape {
Shape::Scalar => {
assert!(value_bytes < 1024 * 1);
}
Shape::Wave(_) => {
assert!(value_bytes < 1024 * 64);
}
Shape::Image(_, _) => {
assert!(value_bytes < 1024 * 1024 * 20);
}
}
assert!(block_size <= 1024 * 32);
let type_size = scalar_type.bytes() as u32;
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
match self.channel_config.shape {
Shape::Scalar => {
if is_array {
Err(Error::with_msg(format!(
"ChannelConfig expects Scalar but we find event is_array"
)))?;
}
}
Shape::Wave(dim1count) => {
if dim1count != ele_count as u32 {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
Shape::Image(n1, n2) => {
let nt = n1 as usize * n2 as usize;
if nt != ele_count as usize {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
}
let data = &buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)];
let decomp = {
if self.do_decompress {
assert!(data.len() > 12);
let ts1 = Instant::now();
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = vec![0; decomp_bytes];
// TODO limit the buf slice range
match bitshuffle_decompress(
&data[12..],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
) {
Ok(c1) => {
assert!(c1 as u64 + 12 == k1);
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
// TODO analyze the histo
self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
Some(decomp)
}
Err(e) => {
return Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
}
}
} else {
None
}
};
ret.add_event(
ts,
pulse,
Some(data.to_vec()),
decomp,
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
} else {
if len < p1 as u32 + 4 {
let msg = format!("uncomp len: {} p1: {}", len, p1);
Err(Error::with_msg(msg))?;
}
let vlen = len - p1 as u32 - 4;
let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize];
ret.add_event(
ts,
pulse,
Some(data.to_vec()),
Some(data.to_vec()),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
}
buf.advance(len as usize);
parsed_bytes += len as u64;
self.need_min = 4;
}
}
}
}
Ok(ParseResult {
events: ret,
parsed_bytes,
})
}
}
impl Stream for EventChunker {
type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("EventChunker poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 {
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
Ready(Some(Ok(ret)))
} else if self.sent_beyond_range {
self.completed = true;
Ready(None)
} else if self.final_stats_sent {
self.sent_beyond_range = true;
trace!("sent_beyond_range");
if self.seen_beyond_range {
trace!("sent_beyond_range RangeComplete");
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
trace!("sent_beyond_range non-complete");
continue 'outer;
}
} else if self.data_emit_complete {
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
self.final_stats_sent = true;
Ready(Some(Ok(ret)))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut fcr))) => {
if false {
// TODO collect for stats:
info!(
"file read bytes {} ms {}",
fcr.buf().len(),
fcr.duration().as_millis()
);
}
let r = self.parse_buf(fcr.buf_mut());
match r {
Ok(res) => {
self.parsed_bytes += res.parsed_bytes;
if fcr.buf().len() > 0 {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
match self.channel_config.shape {
Shape::Scalar => {
if self.need_min > 1024 * 8 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Wave(_) => {
if self.need_min > 1024 * 32 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Image(_, _) => {
if self.need_min > 1024 * 1024 * 20 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
}
let x = self.need_min;
self.inp.set_need_min(x);
if false {
info!(
"EventChunker emits {} events tss {:?}",
res.events.len(),
res.events.tss
);
};
self.item_len_emit_histo.ingest(res.events.len() as u32);
let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
Ready(Some(Ok(ret)))
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
}
}
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.data_emit_complete = true;
continue 'outer;
}
Pending => Pending,
}
};
}
}
}
#[cfg(test)]
mod test {
//use err::Error;
//use netpod::timeunits::*;
//use netpod::{ByteSize, Nanos};
/*
#[test]
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
let chn = netpod::Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
};
// TODO read config from disk.
let channel_config = ChannelConfig {
channel: chn,
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let cluster = taskrun::test_cluster();
let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
}
*/
}

View File

@@ -1,4 +1,5 @@
use crate::ChannelConfigExt;
use crate::SfDbChConf;
use bitshuffle::bitshuffle_compress;
use bytes::BufMut;
use bytes::BytesMut;
@@ -7,7 +8,6 @@ use netpod::log::*;
use netpod::timeunits::*;
use netpod::ByteOrder;
use netpod::Channel;
use netpod::ChannelConfig;
use netpod::GenVar;
use netpod::Node;
use netpod::ScalarType;
@@ -31,7 +31,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
};
{
let chn = ChannelGenProps {
config: ChannelConfig {
config: SfDbChConf {
channel: Channel {
backend: backend.clone(),
name: "scalar-i32-be".into(),
@@ -50,7 +50,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
};
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: ChannelConfig {
config: SfDbChConf {
channel: Channel {
backend: backend.clone(),
name: "wave-f64-be-n21".into(),
@@ -69,7 +69,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
};
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: ChannelConfig {
config: SfDbChConf {
channel: Channel {
backend: backend.clone(),
name: "wave-u16-le-n77".into(),
@@ -88,7 +88,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
};
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: ChannelConfig {
config: SfDbChConf {
channel: Channel {
backend: backend.clone(),
name: "tw-scalar-i32-be".into(),
@@ -107,7 +107,7 @@ pub async fn gen_test_data() -> Result<(), Error> {
};
ensemble.channels.push(chn);
let chn = ChannelGenProps {
config: ChannelConfig {
config: SfDbChConf {
channel: Channel {
backend: backend.clone(),
name: "const-regular-scalar-i32-be".into(),
@@ -156,7 +156,7 @@ struct Ensemble {
}
pub struct ChannelGenProps {
config: ChannelConfig,
config: SfDbChConf,
time_spacing: u64,
gen_var: GenVar,
}
@@ -204,12 +204,7 @@ async fn gen_channel(chn: &ChannelGenProps, split: u32, node: &Node, ensemble: &
Ok(())
}
async fn gen_config(
config_path: &Path,
config: &ChannelConfig,
_node: &Node,
_ensemble: &Ensemble,
) -> Result<(), Error> {
async fn gen_config(config_path: &Path, config: &SfDbChConf, _node: &Node, _ensemble: &Ensemble) -> Result<(), Error> {
let path = config_path.join("latest");
tokio::fs::create_dir_all(&path).await?;
let path = path.join("00000_Config");
@@ -337,7 +332,7 @@ async fn gen_timebin(
pulse: u64,
ts_spacing: u64,
channel_path: &Path,
config: &ChannelConfig,
config: &SfDbChConf,
split: u32,
_node: &Node,
ensemble: &Ensemble,
@@ -406,7 +401,7 @@ async fn gen_timebin(
Ok(ret)
}
async fn gen_datafile_header(file: &mut CountedFile, config: &ChannelConfig) -> Result<(), Error> {
async fn gen_datafile_header(file: &mut CountedFile, config: &SfDbChConf) -> Result<(), Error> {
let mut buf = BytesMut::with_capacity(1024);
let cnenc = config.channel.name.as_bytes();
let len1 = cnenc.len() + 8;
@@ -424,7 +419,7 @@ async fn gen_event(
evix: u64,
ts: TsNano,
pulse: u64,
config: &ChannelConfig,
config: &SfDbChConf,
gen_var: &GenVar,
) -> Result<(), Error> {
let ttl = 0xcafecafe;

View File

@@ -1,13 +1,13 @@
use crate::SfDbChConf;
use err::Error;
use futures_util::StreamExt;
use netpod::timeunits::MS;
use netpod::ChannelConfig;
use netpod::Node;
use netpod::TsNano;
use std::path::PathBuf;
// TODO remove/replace this
pub fn datapath(timebin: u64, config: &netpod::ChannelConfig, split: u32, node: &Node) -> PathBuf {
pub fn datapath(timebin: u64, config: &SfDbChConf, split: u32, node: &Node) -> PathBuf {
node.sf_databuffer
.as_ref()
.unwrap()
@@ -30,11 +30,7 @@ Return potential datafile paths for the given timebin.
It says "potential datafile paths" because we don't open the file here yet and of course,
files may vanish until then. Also, the timebin may actually not exist.
*/
pub async fn datapaths_for_timebin(
timebin: u64,
config: &netpod::ChannelConfig,
node: &Node,
) -> Result<Vec<PathBuf>, Error> {
pub async fn datapaths_for_timebin(timebin: u64, config: &SfDbChConf, node: &Node) -> Result<Vec<PathBuf>, Error> {
let sfc = node.sf_databuffer.as_ref().unwrap();
let timebin_path = sfc
.data_base_path
@@ -84,7 +80,7 @@ pub async fn datapaths_for_timebin(
Ok(ret)
}
pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) -> Result<PathBuf, Error> {
pub fn channel_timebins_dir_path(channel_config: &SfDbChConf, node: &Node) -> Result<PathBuf, Error> {
let sfc = node.sf_databuffer.as_ref().unwrap();
let ret = sfc
.data_base_path
@@ -94,20 +90,20 @@ pub fn channel_timebins_dir_path(channel_config: &ChannelConfig, node: &Node) ->
Ok(ret)
}
pub fn data_dir_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
pub fn data_dir_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result<PathBuf, Error> {
let ret = channel_timebins_dir_path(channel_config, node)?
.join(format!("{:019}", ts.ns() / channel_config.time_bin_size.ns()))
.join(format!("{:010}", split));
Ok(ret)
}
pub fn data_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
pub fn data_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result<PathBuf, Error> {
let fname = format!("{:019}_{:05}_Data", channel_config.time_bin_size.ns() / MS, 0);
let ret = data_dir_path(ts, channel_config, split, node)?.join(fname);
Ok(ret)
}
pub fn index_path(ts: TsNano, channel_config: &ChannelConfig, split: u32, node: &Node) -> Result<PathBuf, Error> {
pub fn index_path(ts: TsNano, channel_config: &SfDbChConf, split: u32, node: &Node) -> Result<PathBuf, Error> {
let fname = format!("{:019}_{:05}_Data_Index", channel_config.time_bin_size.ns() / MS, 0);
let ret = data_dir_path(ts, channel_config, split, node)?.join(fname);
Ok(ret)
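// Illustrative sketch (not a hunk of this commit): listing candidate data files for one
// timebin. `config` and `node` are assumed to be in scope; timebin 18000 is an arbitrary
// example value and the helper name is hypothetical.
async fn candidate_files_example(config: &SfDbChConf, node: &Node) -> Result<Vec<PathBuf>, Error> {
    // The returned paths are only candidates; files may vanish before they are opened.
    let paths = datapaths_for_timebin(18000, config, node).await?;
    Ok(paths)
}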

View File

@@ -1,4 +1,6 @@
use crate::eventblobs::EventChunkerMultifile;
use crate::eventchunker::EventChunkerConf;
use crate::SfDbChConf;
use err::Error;
use futures_util::stream;
use futures_util::Stream;
@@ -12,25 +14,24 @@ use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::AggKind;
use netpod::ByteSize;
use netpod::ChConf;
use netpod::Channel;
use netpod::DiskIoTune;
use netpod::NodeConfigCached;
use netpod::ScalarType;
use netpod::Shape;
use parse::channelconfig::extract_matching_config_entry;
use parse::channelconfig::read_local_config;
use parse::channelconfig::ConfigEntry;
use parse::channelconfig::MatchingConfigEntry;
use query::api4::events::PlainEventsQuery;
use std::pin::Pin;
use streams::eventchunker::EventChunkerConf;
fn make_num_pipeline_stream_evs(
scalar_type: ScalarType,
shape: Shape,
chconf: ChConf,
agg_kind: AggKind,
event_blobs: EventChunkerMultifile,
) -> Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> {
let scalar_type = chconf.scalar_type.clone();
let shape = chconf.shape.clone();
let event_stream = match crate::decode::EventsDynStream::new(scalar_type, shape, agg_kind, event_blobs) {
Ok(k) => k,
Err(e) => {
@@ -55,18 +56,33 @@ fn make_num_pipeline_stream_evs(
pub async fn make_event_pipe(
evq: &PlainEventsQuery,
node_config: &NodeConfigCached,
chconf: ChConf,
ncc: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
info!("---------- disk::raw::conn::make_event_pipe");
if false {
match dbconn::channel_exists(&evq.channel(), &node_config).await {
match dbconn::channel_exists(&evq.channel(), &ncc).await {
Ok(_) => (),
Err(e) => return Err(e)?,
}
}
let chn = evq.channel().clone();
let chn = if chn.name().is_empty() {
if let Some(series) = chn.series() {
if series < 1 {
error!("BAD QUERY: {evq:?}");
return Err(Error::with_msg_no_trace(format!("BAD QUERY: {evq:?}")));
} else {
dbconn::query::sf_databuffer_fetch_channel_by_series(chn, ncc).await?
}
} else {
chn
}
} else {
chn
};
let range = evq.range().clone();
let channel_config =
crate::channelconfig::config(evq.range().try_into()?, evq.channel().clone(), node_config).await;
let channel_config = crate::channelconfig::config(evq.range().try_into()?, chn, ncc).await;
let channel_config = match channel_config {
Ok(x) => x,
Err(e) => {
@@ -80,14 +96,14 @@ pub async fn make_event_pipe(
}
}
};
trace!(
info!(
"make_event_pipe need_expand {need_expand} {evq:?}",
need_expand = evq.one_before_range()
);
let event_chunker_conf = EventChunkerConf::new(ByteSize::kb(1024));
// TODO should not need this for correctness.
// Should limit based on return size and latency.
let out_max_len = if node_config.node_config.cluster.is_central_storage {
let out_max_len = if ncc.node_config.cluster.is_central_storage {
128
} else {
128
@@ -95,18 +111,16 @@ pub async fn make_event_pipe(
let event_blobs = EventChunkerMultifile::new(
(&range).try_into()?,
channel_config.clone(),
node_config.node.clone(),
node_config.ix,
ncc.node.clone(),
ncc.ix,
DiskIoTune::default(),
event_chunker_conf,
evq.one_before_range(),
true,
out_max_len,
);
let scalar_type = channel_config.scalar_type.clone();
let shape = channel_config.shape.clone();
error!("TODO replace AggKind in the called code");
let pipe = make_num_pipeline_stream_evs(scalar_type, shape.clone(), AggKind::TimeWeightedScalar, event_blobs);
let pipe = make_num_pipeline_stream_evs(chconf, AggKind::TimeWeightedScalar, event_blobs);
Ok(pipe)
}
@@ -115,6 +129,7 @@ pub async fn get_applicable_entry(
channel: Channel,
node_config: &NodeConfigCached,
) -> Result<ConfigEntry, Error> {
info!("---------- disk::raw::conn::get_applicable_entry");
let channel_config = read_local_config(channel.clone(), node_config.node.clone()).await?;
let entry_res = match extract_matching_config_entry(range, &channel_config) {
Ok(k) => k,
@@ -156,7 +171,7 @@ pub fn make_local_event_blobs_stream(
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = netpod::ChannelConfig {
let channel_config = SfDbChConf {
channel,
keyspace: entry.ks as u8,
time_bin_size: entry.bs.clone(),
@@ -202,7 +217,7 @@ pub fn make_remote_event_blobs_stream(
Ok(k) => k,
Err(e) => return Err(e)?,
};
let channel_config = netpod::ChannelConfig {
let channel_config = SfDbChConf {
channel,
keyspace: entry.ks as u8,
time_bin_size: entry.bs.clone(),

View File

@@ -1,5 +1,8 @@
use clap::ArgAction;
use clap::Parser;
use disk::eventchunker::EventChunker;
use disk::eventchunker::EventChunkerConf;
use disk::SfDbChConf;
use err::Error;
#[allow(unused)]
use netpod::log::*;
@@ -7,11 +10,8 @@ use netpod::range::evrange::NanoRange;
use netpod::ByteOrder;
use netpod::ByteSize;
use netpod::Channel;
use netpod::ChannelConfig;
use netpod::Shape;
use std::path::PathBuf;
use streams::eventchunker::EventChunker;
use streams::eventchunker::EventChunkerConf;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
@@ -80,7 +80,7 @@ pub fn main() -> Result<(), Error> {
let disk_io_tune = netpod::DiskIoTune::default();
let inp = Box::pin(disk::file_content_stream(path.clone(), file, disk_io_tune));
let ce = &config.entries[0];
let channel_config = ChannelConfig {
let channel_config = SfDbChConf {
channel: Channel {
backend: String::new(),
name: config.channel_name.clone(),

View File

@@ -6,6 +6,7 @@ use crate::BodyStream;
use crate::ReqCtx;
use bytes::BufMut;
use bytes::BytesMut;
use disk::eventchunker::EventChunkerConf;
use disk::merge::mergedblobsfromremotes::MergedBlobsFromRemotes;
use disk::raw::conn::make_local_event_blobs_stream;
use futures_util::stream;
@@ -59,7 +60,6 @@ use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use std::time::Instant;
use streams::eventchunker::EventChunkerConf;
use tracing_futures::Instrument;
use url::Url;

View File

@@ -30,7 +30,7 @@ async fn binned_json(url: Url, req: Request<Body>, node_config: &NodeConfigCache
let chconf = chconf_from_binned(&query, node_config).await?;
// Update the series id since we don't yet require a unique identifier here.
let mut query = query;
query.set_series_id(chconf.series);
query.set_series_id(chconf.try_series()?);
let query = query;
// ---
let span1 = span!(

View File

@@ -78,7 +78,7 @@ async fn plain_events_binary(
info!("plain_events_binary chconf_from_events_v1: {chconf:?}");
// Update the series id since we don't yet require a unique identifier here.
let mut query = query;
query.set_series_id(chconf.series);
query.set_series_id(chconf.try_series()?);
let query = query;
// ---
let _ = query;
@@ -103,7 +103,7 @@ async fn plain_events_json(
info!("plain_events_json chconf_from_events_v1: {chconf:?}");
// Update the series id since we don't yet require a unique identifier here.
let mut query = query;
query.set_series_id(chconf.series);
query.set_series_id(chconf.try_series()?);
let query = query;
// ---
//let query = RawEventsQuery::new(query.channel().clone(), query.range().clone(), AggKind::Plain);

View File

@@ -74,7 +74,8 @@ impl ConnectionStatusEvents {
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let _scy = scyllaconn::create_scy_session(scyco).await?;
let chconf = nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?;
let chconf =
nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?;
let _series = chconf.series;
let _do_one_before_range = true;
let ret = Vec::new();
@@ -148,10 +149,15 @@ impl ChannelStatusEvents {
.as_ref()
.ok_or_else(|| Error::with_public_msg_no_trace(format!("no scylla configured")))?;
let scy = scyllaconn::create_scy_session(scyco).await?;
let chconf = nodenet::channelconfig::channel_config(q.range().clone(),q.channel().clone(), node_config).await?;
let chconf =
nodenet::channelconfig::channel_config(q.range().clone(), q.channel().clone(), node_config).await?;
let do_one_before_range = true;
let mut stream =
scyllaconn::status::StatusStreamScylla::new(chconf.series, q.range().clone(), do_one_before_range, scy);
let mut stream = scyllaconn::status::StatusStreamScylla::new(
chconf.try_series()?,
q.range().clone(),
do_one_before_range,
scy,
);
let mut ret = Vec::new();
while let Some(item) = stream.next().await {
let item = item?;

View File

@@ -42,10 +42,7 @@ pub async fn chconf_from_events_v1(q: &PlainEventsQuery, ncc: &NodeConfigCached)
pub async fn chconf_from_prebinned(q: &PreBinnedQuery, _ncc: &NodeConfigCached) -> Result<ChConf, Error> {
let ret = ChConf {
backend: q.channel().backend().into(),
series: q
.channel()
.series()
.expect("PreBinnedQuery is expected to contain the series id"),
series: q.channel().series().clone(),
name: q.channel().name().into(),
scalar_type: q.scalar_type().clone(),
shape: q.shape().clone(),
@@ -105,7 +102,7 @@ impl ChannelConfigHandler {
let c = nodenet::channelconfig::channel_config(q.range.clone(), q.channel.clone(), node_config).await?;
ChannelConfigResponse {
channel: Channel {
series: Some(c.series),
series: c.series.clone(),
backend: q.channel.backend().into(),
name: c.name,
},

View File

@@ -751,8 +751,7 @@ impl<NTY: ScalarOps> TimeBinnable for EventsDim0<NTY> {
impl<STY> TypeName for EventsDim0<STY> {
fn type_name(&self) -> String {
let self_name = any::type_name::<Self>();
let sty = any::type_name::<STY>();
format!("EventsDim0<{sty}> aka {self_name}<{sty}>")
format!("{self_name}")
}
}

View File

@@ -1,9 +1,9 @@
pub use crate::Error;
use crate::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::container::ByteEstimate;
use items_0::streamitem::sitem_data;
use items_0::streamitem::LogItem;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::Sitemty;
use items_0::streamitem::StreamItem;
@@ -20,6 +20,7 @@ use std::task::Context;
use std::task::Poll;
const OUT_MAX_BYTES: u64 = 1024 * 200;
const DO_DETECT_NON_MONO: bool = true;
#[allow(unused)]
macro_rules! trace2 {
@@ -60,6 +61,8 @@ pub struct Merger<T> {
out_max_len: usize,
range_complete: Vec<bool>,
out_of_band_queue: VecDeque<Sitemty<T>>,
log_queue: VecDeque<LogItem>,
dim0ix_max: u64,
done_data: bool,
done_buffered: bool,
done_range_complete: bool,
@@ -100,6 +103,8 @@ where
out_max_len,
range_complete: vec![false; n],
out_of_band_queue: VecDeque::new(),
log_queue: VecDeque::new(),
dim0ix_max: 0,
done_data: false,
done_buffered: false,
done_range_complete: false,
@@ -144,6 +149,7 @@ where
fn process(mut self: Pin<&mut Self>, _cx: &mut Context) -> Result<ControlFlow<()>, Error> {
use ControlFlow::*;
trace4!("process");
let mut log_items = Vec::new();
let mut tslows = [None, None];
for (i1, itemopt) in self.items.iter_mut().enumerate() {
if let Some(item) = itemopt {
@@ -168,12 +174,31 @@ where
}
} else {
// the item seems empty.
// TODO count for stats.
trace2!("empty item, something to do here?");
*itemopt = None;
return Ok(Continue(()));
}
}
}
if DO_DETECT_NON_MONO {
if let Some((i1, t1)) = tslows[0].as_ref() {
if *t1 <= self.dim0ix_max {
self.dim0ix_max = *t1;
let item = LogItem {
node_ix: *i1 as _,
level: Level::INFO,
msg: format!(
"dim0ix_max {} vs {} diff {}",
self.dim0ix_max,
t1,
self.dim0ix_max - t1
),
};
log_items.push(item);
}
}
}
trace4!("tslows {tslows:?}");
if let Some((il0, _tl0)) = tslows[0] {
if let Some((_il1, tl1)) = tslows[1] {
@@ -367,7 +392,9 @@ where
let _spg = span1.enter();
loop {
trace3!("poll");
break if self.poll_count == usize::MAX {
break if let Some(item) = self.log_queue.pop_front() {
Ready(Some(Ok(StreamItem::Log(item))))
} else if self.poll_count == usize::MAX {
self.done_range_complete = true;
continue;
} else if self.complete {

View File

@@ -62,14 +62,6 @@ impl CmpZero for u32 {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AggQuerySingleChannel {
pub channel_config: ChannelConfig,
pub timebin: u32,
pub tb_file_count: u32,
pub buffer_size: u32,
}
pub struct BodyStream {
//pub receiver: async_channel::Receiver<Result<Bytes, Error>>,
pub inner: Box<dyn Stream<Item = Result<Bytes, Error>> + Send + Unpin>,
@@ -766,19 +758,6 @@ pub enum GenVar {
ConstRegular,
}
// TODO move to databuffer-specific crate
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChannelConfig {
pub channel: Channel,
pub keyspace: u8,
pub time_bin_size: TsNano,
pub scalar_type: ScalarType,
pub compression: bool,
pub shape: Shape,
pub array: bool,
pub byte_order: ByteOrder,
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum ShapeOld {
Scalar,
@@ -2293,12 +2272,19 @@ pub struct ChannelInfo {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChConf {
pub backend: String,
pub series: u64,
pub series: Option<u64>,
pub name: String,
pub scalar_type: ScalarType,
pub shape: Shape,
}
impl ChConf {
pub fn try_series(&self) -> Result<u64, Error> {
self.series
.ok_or_else(|| Error::with_msg_no_trace("ChConf without SeriesId"))
}
}
pub fn f32_close(a: f32, b: f32) -> bool {
if (a - b).abs() < 1e-4 || (a / b > 0.999 && a / b < 1.001) {
true
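// Illustrative sketch (not a hunk of this commit): the series id on ChConf is now
// optional and callers use try_series() instead of a placeholder value. The field
// values below are example data.
fn try_series_example() -> Result<(), Error> {
    let chconf = ChConf {
        backend: "sf-databuffer".into(),
        series: None,
        name: "example-channel".into(),
        scalar_type: ScalarType::I32,
        shape: Shape::Scalar,
    };
    // Without a series id this surfaces as an Err instead of a bogus 0.
    assert!(chconf.try_series().is_err());
    let chconf = ChConf {
        series: Some(7),
        ..chconf
    };
    assert_eq!(chconf.try_series()?, 7);
    Ok(())
}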

View File

@@ -15,7 +15,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig
let ret = if channel.name() == "scalar-i32-be" {
let ret = ChConf {
backend,
series: 1,
series: Some(1),
name: channel.name().into(),
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
@@ -24,7 +24,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig
} else if channel.name() == "wave-f64-be-n21" {
let ret = ChConf {
backend,
series: 2,
series: Some(2),
name: channel.name().into(),
scalar_type: ScalarType::F64,
shape: Shape::Wave(21),
@@ -33,7 +33,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig
} else if channel.name() == "const-regular-scalar-i32-be" {
let ret = ChConf {
backend,
series: 3,
series: Some(3),
name: channel.name().into(),
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
@@ -50,7 +50,7 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig
let ret = if channel.name() == "inmem-d0-i32" {
let ret = ChConf {
backend,
series: 1,
series: Some(1),
name: channel.name().into(),
scalar_type: ScalarType::I32,
shape: Shape::Scalar,
@@ -69,12 +69,16 @@ pub async fn channel_config(range: NanoRange, channel: Channel, ncc: &NodeConfig
.map_err(Error::from)?;
Ok(ret)
} else if ncc.node.sf_databuffer.is_some() {
info!("channel_config BEFORE {channel:?}");
info!("try to get ChConf for sf-databuffer type backend");
// TODO in the future we should not need this:
let channel = sf_databuffer_fetch_channel_by_series(channel, ncc).await?;
let c1 = disk::channelconfig::config(range, channel, ncc).await?;
info!("channel_config AFTER {channel:?}");
let c1 = disk::channelconfig::config(range, channel.clone(), ncc).await?;
info!("channel_config THEN {c1:?}");
let ret = ChConf {
backend: c1.channel.backend,
series: 0,
series: channel.series,
name: c1.channel.name,
scalar_type: c1.scalar_type,
shape: c1.shape,

View File

@@ -14,9 +14,11 @@ use items_2::framable::EventQueryJsonStringFrame;
use items_2::framable::Framable;
use items_2::frame::decode_frame;
use items_2::frame::make_term_frame;
use items_2::inmem::InMemoryFrame;
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::AggKind;
use netpod::ChConf;
use netpod::NodeConfigCached;
use netpod::PerfOpts;
use query::api4::events::PlainEventsQuery;
@@ -24,6 +26,7 @@ use std::net::SocketAddr;
use std::pin::Pin;
use streams::frames::inmem::InMemoryFrameAsyncReadStream;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedReadHalf;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::TcpStream;
use tracing::Instrument;
@@ -63,6 +66,7 @@ impl<E: Into<Error>> From<(E, OwnedWriteHalf)> for ConnErr {
async fn make_channel_events_stream(
evq: PlainEventsQuery,
chconf: ChConf,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
info!("nodenet::conn::make_channel_events_stream");
@@ -100,7 +104,7 @@ async fn make_channel_events_stream(
Ok(Box::pin(stream))
}
} else if let Some(scyconf) = &node_config.node_config.cluster.scylla {
scylla_channel_event_stream(evq, scyconf, node_config).await
scylla_channel_event_stream(evq, chconf, scyconf, node_config).await
} else if let Some(_) = &node_config.node.channel_archiver {
let e = Error::with_msg_no_trace("archapp not built");
Err(e)
@@ -108,17 +112,11 @@ async fn make_channel_events_stream(
let e = Error::with_msg_no_trace("archapp not built");
Err(e)
} else {
Ok(disk::raw::conn::make_event_pipe(&evq, node_config).await?)
Ok(disk::raw::conn::make_event_pipe(&evq, chconf, node_config).await?)
}
}
async fn events_conn_handler_inner_try(
stream: TcpStream,
addr: SocketAddr,
node_config: &NodeConfigCached,
) -> Result<(), ConnErr> {
let _ = addr;
let (netin, mut netout) = stream.into_split();
async fn events_get_input_frames(netin: OwnedReadHalf) -> Result<Vec<InMemoryFrame>, Error> {
warn!("fix magic inmem_bufcap option");
let perf_opts = PerfOpts::default();
let mut h = InMemoryFrameAsyncReadStream::new(netin, perf_opts.inmem_bufcap);
@@ -136,19 +134,26 @@ async fn events_conn_handler_inner_try(
debug!("ignored incoming frame {:?}", item);
}
Err(e) => {
return Err((e, netout).into());
return Err(e);
}
}
}
debug!("events_conn_handler input frames received");
Ok(frames)
}
async fn events_parse_input_query(
frames: Vec<InMemoryFrame>,
ncc: &NodeConfigCached,
) -> Result<(PlainEventsQuery, ChConf), Error> {
if frames.len() != 1 {
error!("{:?}", frames);
error!("missing command frame len {}", frames.len());
return Err((Error::with_msg("missing command frame"), netout).into());
let e = Error::with_msg("missing command frame");
return Err(e);
}
let query_frame = &frames[0];
if query_frame.tyid() != EVENT_QUERY_JSON_STRING_FRAME {
return Err((Error::with_msg("query frame wrong type"), netout).into());
return Err(Error::with_msg("query frame wrong type"));
}
// TODO this does not need all variants of Sitemty.
let qitem = match decode_frame::<Sitemty<EventQueryJsonStringFrame>>(query_frame) {
@@ -156,32 +161,46 @@ async fn events_conn_handler_inner_try(
Ok(k) => match k {
StreamItem::DataItem(k) => match k {
RangeCompletableItem::Data(k) => k,
RangeCompletableItem::RangeComplete => {
return Err((Error::with_msg("bad query item"), netout).into())
}
RangeCompletableItem::RangeComplete => return Err(Error::with_msg("bad query item")),
},
_ => return Err((Error::with_msg("bad query item"), netout).into()),
_ => return Err(Error::with_msg("bad query item")),
},
Err(e) => return Err((e, netout).into()),
Err(e) => return Err(e),
},
Err(e) => return Err((e, netout).into()),
Err(e) => return Err(e),
};
let res: Result<PlainEventsQuery, _> = serde_json::from_str(&qitem.0);
let evq = match res {
Ok(k) => k,
Err(e) => {
error!("json parse error: {:?}", e);
return Err((Error::with_msg("json parse error"), netout).into());
let e = Error::with_msg_no_trace(format!("json parse error: {e}"));
error!("{e}");
return Err(e);
}
};
info!("events_conn_handler_inner_try evq {:?}", evq);
let chconf = crate::channelconfig::channel_config(evq.range().try_into()?, evq.channel().clone(), ncc)
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
Ok((evq, chconf))
}
if evq.channel().name() == "test-do-trigger-main-error" {
let e = Error::with_msg(format!("Test error private message."))
.add_public_msg(format!("Test error PUBLIC message."));
return Err((e, netout).into());
}
async fn events_conn_handler_inner_try(
stream: TcpStream,
addr: SocketAddr,
node_config: &NodeConfigCached,
) -> Result<(), ConnErr> {
let _ = addr;
let (netin, mut netout) = stream.into_split();
let frames = match events_get_input_frames(netin).await {
Ok(x) => x,
Err(e) => return Err((e, netout).into()),
};
debug!("events_conn_handler input frames received");
let (evq, chconf) = match events_parse_input_query(frames, node_config).await {
Ok(x) => x,
Err(e) => return Err((e, netout).into()),
};
let mut stream: Pin<Box<dyn Stream<Item = Box<dyn Framable + Send>> + Send>> = if evq.is_event_blobs() {
// TODO support event blobs as transform
match disk::raw::conn::make_event_blobs_pipe(&evq, node_config).await {
@@ -194,7 +213,7 @@ async fn events_conn_handler_inner_try(
}
}
} else {
match make_channel_events_stream(evq.clone(), node_config).await {
match make_channel_events_stream(evq.clone(), chconf, node_config).await {
Ok(stream) => {
let stream = stream
.map({

View File

@@ -8,6 +8,7 @@ use items_0::Appendable;
use items_0::Empty;
use items_2::channelevents::ChannelEvents;
use netpod::log::*;
use netpod::ChConf;
use netpod::NodeConfigCached;
use netpod::ScyllaConfig;
use query::api4::events::PlainEventsQuery;
@@ -15,6 +16,7 @@ use std::pin::Pin;
pub async fn scylla_channel_event_stream(
evq: PlainEventsQuery,
chconf: ChConf,
scyco: &ScyllaConfig,
node_config: &NodeConfigCached,
) -> Result<Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>>, Error> {
@@ -26,12 +28,12 @@ pub async fn scylla_channel_event_stream(
.await
.map_err(|e| Error::with_msg_no_trace(format!("{e:?}")))?;
let scy = scyllaconn::create_scy_session(scyco).await?;
let series = f.series;
let series = f.try_series()?;
let scalar_type = f.scalar_type;
let shape = f.shape;
let do_test_stream_error = false;
let with_values = evq.need_value_data();
debug!("Make EventsStreamScylla for {series} {scalar_type:?} {shape:?}");
debug!("Make EventsStreamScylla for {series:?} {scalar_type:?} {shape:?}");
let stream = scyllaconn::events::EventsStreamScylla::new(
series,
evq.range().into(),
@@ -43,38 +45,6 @@ pub async fn scylla_channel_event_stream(
do_test_stream_error,
);
let stream = stream
.map({
let is_pulse_id_diff = evq.transform().is_pulse_id_diff();
let mut pulse_last = None;
move |item| match item {
Ok(item) => {
let x = if is_pulse_id_diff {
let x = match item {
ChannelEvents::Events(item) => {
let (tss, pulses) = items_0::EventsNonObj::into_tss_pulses(item);
let mut item = items_2::eventsdim0::EventsDim0::empty();
for (ts, pulse) in tss.into_iter().zip(pulses) {
let value = if let Some(last) = pulse_last {
pulse as i64 - last as i64
} else {
0
};
item.push(ts, pulse, value);
pulse_last = Some(pulse);
}
ChannelEvents::Events(Box::new(item))
}
ChannelEvents::Status(x) => ChannelEvents::Status(x),
};
x
} else {
item
};
Ok(x)
}
Err(e) => Err(e),
}
})
.map(move |item| match &item {
Ok(k) => match k {
ChannelEvents::Events(k) => {
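The pulse-id-diff mapping removed above (the same per-event bookkeeping reappears in plain_events_json in the last hunk) replaces each pulse id with its difference to the previous one, emitting 0 for the first event. A minimal sketch of that logic on its own, with invented pulse ids:

// Sketch only: mirrors the pulse_last bookkeeping from the removed closure;
// the input pulse ids are made up for illustration.
fn pulse_id_diffs(pulses: &[u64]) -> Vec<i64> {
    let mut pulse_last = None;
    pulses
        .iter()
        .map(|&pulse| {
            let value = if let Some(last) = pulse_last {
                pulse as i64 - last as i64
            } else {
                0
            };
            pulse_last = Some(pulse);
            value
        })
        .collect()
}

fn main() {
    assert_eq!(pulse_id_diffs(&[100, 101, 103, 103]), vec![0, 1, 2, 0]);
}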

@@ -177,11 +177,35 @@ impl FromUrl for TransformQuery {
impl AppendToUrl for TransformQuery {
fn append_to_url(&self, url: &mut Url) {
warn!("TODO AppendToUrl for Transform");
let upre = Self::url_prefix();
let mut g = url.query_pairs_mut();
if let Some(x) = &Some(123) {
g.append_pair(&format!("{}ArrayPick", upre), &format!("{}", x));
if false {
let upre = Self::url_prefix();
if let Some(x) = &Some(123) {
g.append_pair(&format!("{}ArrayPick", upre), &format!("{}", x));
}
}
let key = "binningScheme";
match &self.event {
EventTransformQuery::EventBlobsVerbatim => {
g.append_pair(key, &format!("{}", "eventBlobs"));
}
EventTransformQuery::EventBlobsUncompressed => {
// TODO
g.append_pair(key, &format!("{}", "eventBlobs"));
}
EventTransformQuery::ValueFull => {
g.append_pair(key, &format!("{}", "fullValue"));
}
EventTransformQuery::ArrayPick(_) => {
// TODO
g.append_pair(key, &format!("{}", "fullValue"));
}
EventTransformQuery::MinMaxAvgDev => {
g.append_pair(key, &format!("{}", "timeWeightedScalar"));
}
EventTransformQuery::PulseIdDiff => {
g.append_pair(key, &format!("{}", "pulseIdDiff"));
}
}
}
}
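Each arm of the match above serializes one EventTransformQuery variant as a single binningScheme query pair. A minimal sketch for the PulseIdDiff case, using the same url crate calls; the host and path are invented:

// Sketch only: shows the key/value appended by the match above for
// EventTransformQuery::PulseIdDiff; the base URL is made up.
use url::Url;

fn main() {
    let mut url = Url::parse("http://example.net/api/4/events").unwrap();
    url.query_pairs_mut().append_pair("binningScheme", "pulseIdDiff");
    assert_eq!(url.query(), Some("binningScheme=pulseIdDiff"));
}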

@@ -1,604 +1 @@
use crate::filechunkread::FileChunkRead;
use crate::needminbuffer::NeedMinBuffer;
use bitshuffle::bitshuffle_decompress;
use bytes::Buf;
use bytes::BytesMut;
use err::Error;
use futures_util::Stream;
use futures_util::StreamExt;
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StatsItem;
use items_0::streamitem::StreamItem;
use items_0::Empty;
use items_0::WithLen;
use items_2::eventfull::EventFull;
use netpod::histo::HistoLog2;
use netpod::log::*;
use netpod::range::evrange::NanoRange;
use netpod::timeunits::SEC;
use netpod::ByteSize;
use netpod::ChannelConfig;
use netpod::EventDataReadStats;
use netpod::ScalarType;
use netpod::Shape;
use parse::channelconfig::CompressionMethod;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;
pub struct EventChunker {
inp: NeedMinBuffer,
state: DataFileState,
need_min: u32,
channel_config: ChannelConfig,
errored: bool,
completed: bool,
range: NanoRange,
stats_conf: EventChunkerConf,
seen_beyond_range: bool,
sent_beyond_range: bool,
data_emit_complete: bool,
final_stats_sent: bool,
parsed_bytes: u64,
dbg_path: PathBuf,
max_ts: u64,
expand: bool,
do_decompress: bool,
decomp_dt_histo: HistoLog2,
item_len_emit_histo: HistoLog2,
seen_before_range_count: usize,
seen_after_range_count: usize,
unordered_warn_count: usize,
repeated_ts_warn_count: usize,
}
impl Drop for EventChunker {
fn drop(&mut self) {
// TODO collect somewhere
debug!(
"EventChunker Drop Stats:\ndecomp_dt_histo: {:?}\nitem_len_emit_histo: {:?}",
self.decomp_dt_histo, self.item_len_emit_histo
);
}
}
enum DataFileState {
FileHeader,
Event,
}
struct ParseResult {
events: EventFull,
parsed_bytes: u64,
}
#[derive(Clone, Debug)]
pub struct EventChunkerConf {
pub disk_stats_every: ByteSize,
}
impl EventChunkerConf {
pub fn new(disk_stats_every: ByteSize) -> Self {
Self { disk_stats_every }
}
}
impl EventChunker {
// TODO `expand` flag usage
pub fn from_start(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
) -> Self {
info!("EventChunker::{} do_decompress {}", "from_start", do_decompress);
let mut inp = NeedMinBuffer::new(inp);
inp.set_need_min(6);
Self {
inp,
state: DataFileState::FileHeader,
need_min: 6,
channel_config,
errored: false,
completed: false,
range,
stats_conf,
seen_beyond_range: false,
sent_beyond_range: false,
data_emit_complete: false,
final_stats_sent: false,
parsed_bytes: 0,
dbg_path,
max_ts: 0,
expand,
do_decompress,
decomp_dt_histo: HistoLog2::new(8),
item_len_emit_histo: HistoLog2::new(0),
seen_before_range_count: 0,
seen_after_range_count: 0,
unordered_warn_count: 0,
repeated_ts_warn_count: 0,
}
}
// TODO `expand` flag usage
pub fn from_event_boundary(
inp: Pin<Box<dyn Stream<Item = Result<FileChunkRead, Error>> + Send>>,
channel_config: ChannelConfig,
range: NanoRange,
stats_conf: EventChunkerConf,
dbg_path: PathBuf,
expand: bool,
do_decompress: bool,
) -> Self {
info!(
"EventChunker::{} do_decompress {}",
"from_event_boundary", do_decompress
);
let mut ret = Self::from_start(inp, channel_config, range, stats_conf, dbg_path, expand, do_decompress);
ret.state = DataFileState::Event;
ret.need_min = 4;
ret.inp.set_need_min(4);
ret
}
fn parse_buf(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
span!(Level::INFO, "EventChunker::parse_buf").in_scope(|| self.parse_buf_inner(buf))
}
fn parse_buf_inner(&mut self, buf: &mut BytesMut) -> Result<ParseResult, Error> {
let mut ret = EventFull::empty();
let mut parsed_bytes = 0;
use byteorder::{ReadBytesExt, BE};
loop {
if (buf.len() as u32) < self.need_min {
break;
}
match self.state {
DataFileState::FileHeader => {
if buf.len() < 6 {
Err(Error::with_msg("need min 6 for FileHeader"))?;
}
let mut sl = std::io::Cursor::new(buf.as_ref());
let fver = sl.read_i16::<BE>().unwrap();
if fver != 0 {
Err(Error::with_msg("unexpected data file version"))?;
}
let len = sl.read_i32::<BE>().unwrap();
if len <= 0 || len >= 128 {
Err(Error::with_msg("large channel header len"))?;
}
let totlen = len as usize + 2;
if buf.len() < totlen {
self.need_min = totlen as u32;
break;
} else {
sl.advance(len as usize - 8);
let len2 = sl.read_i32::<BE>().unwrap();
if len != len2 {
Err(Error::with_msg("channel header len mismatch"))?;
}
String::from_utf8(buf.as_ref()[6..(len as usize + 6 - 8)].to_vec())?;
self.state = DataFileState::Event;
self.need_min = 4;
buf.advance(totlen);
parsed_bytes += totlen as u64;
}
}
DataFileState::Event => {
let p0 = 0;
let mut sl = std::io::Cursor::new(buf.as_ref());
let len = sl.read_i32::<BE>().unwrap();
if len < 20 || len > 1024 * 1024 * 20 {
Err(Error::with_msg("unexpected large event chunk"))?;
}
let len = len as u32;
if (buf.len() as u32) < len {
self.need_min = len as u32;
break;
} else {
let mut sl = std::io::Cursor::new(buf.as_ref());
let len1b = sl.read_i32::<BE>().unwrap();
assert!(len == len1b as u32);
let _ttl = sl.read_i64::<BE>().unwrap();
let ts = sl.read_i64::<BE>().unwrap() as u64;
let pulse = sl.read_i64::<BE>().unwrap() as u64;
if ts == self.max_ts {
if self.repeated_ts_warn_count < 20 {
let msg = format!(
"EventChunker repeated event ts ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.repeated_ts_warn_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
self.repeated_ts_warn_count += 1;
}
}
if ts < self.max_ts {
if self.unordered_warn_count < 20 {
let msg = format!(
"EventChunker unordered event ix {} ts {}.{} max_ts {}.{} config {:?} path {:?}",
self.unordered_warn_count,
ts / SEC,
ts % SEC,
self.max_ts / SEC,
self.max_ts % SEC,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
self.unordered_warn_count += 1;
let e = Error::with_public_msg_no_trace(msg);
return Err(e);
}
}
self.max_ts = ts;
if ts >= self.range.end {
self.seen_after_range_count += 1;
if !self.expand || self.seen_after_range_count >= 2 {
self.seen_beyond_range = true;
self.data_emit_complete = true;
break;
}
}
if ts < self.range.beg {
self.seen_before_range_count += 1;
if self.seen_before_range_count > 1 {
let msg = format!(
"seen before range: event ts {}.{} range beg {}.{} range end {}.{} pulse {} config {:?} path {:?}",
ts / SEC,
ts % SEC,
self.range.beg / SEC,
self.range.beg % SEC,
self.range.end / SEC,
self.range.end % SEC,
pulse,
self.channel_config.shape,
self.dbg_path
);
warn!("{}", msg);
let e = Error::with_public_msg(msg);
Err(e)?;
}
}
let _ioc_ts = sl.read_i64::<BE>().unwrap();
let status = sl.read_i8().unwrap();
let severity = sl.read_i8().unwrap();
let optional = sl.read_i32::<BE>().unwrap();
if status != 0 {
Err(Error::with_msg(format!("status != 0: {}", status)))?;
}
if severity != 0 {
Err(Error::with_msg(format!("severity != 0: {}", severity)))?;
}
if optional != -1 {
Err(Error::with_msg(format!("optional != -1: {}", optional)))?;
}
let type_flags = sl.read_u8().unwrap();
let type_index = sl.read_u8().unwrap();
if type_index > 13 {
Err(Error::with_msg(format!("type_index: {}", type_index)))?;
}
let scalar_type = ScalarType::from_dtype_index(type_index)?;
use super::dtflags::*;
let is_compressed = type_flags & COMPRESSION != 0;
let is_array = type_flags & ARRAY != 0;
let is_big_endian = type_flags & BIG_ENDIAN != 0;
let is_shaped = type_flags & SHAPE != 0;
if let Shape::Wave(_) = self.channel_config.shape {
if !is_array {
Err(Error::with_msg(format!("dim1 but not array {:?}", self.channel_config)))?;
}
}
let compression_method = if is_compressed { sl.read_u8().unwrap() } else { 0 };
let shape_dim = if is_shaped { sl.read_u8().unwrap() } else { 0 };
assert!(compression_method <= 0);
assert!(!is_shaped || (shape_dim >= 1 && shape_dim <= 2));
let mut shape_lens = [0, 0, 0, 0];
for i1 in 0..shape_dim {
shape_lens[i1 as usize] = sl.read_u32::<BE>().unwrap();
}
let shape_this = {
if is_shaped {
if shape_dim == 1 {
Shape::Wave(shape_lens[0])
} else if shape_dim == 2 {
Shape::Image(shape_lens[0], shape_lens[1])
} else {
err::todoval()
}
} else {
Shape::Scalar
}
};
let comp_this = if is_compressed {
if compression_method == 0 {
Some(CompressionMethod::BitshuffleLZ4)
} else {
err::todoval()
}
} else {
None
};
let p1 = sl.position();
let k1 = len as u64 - (p1 - p0) - 4;
if is_compressed {
//debug!("event ts {} is_compressed {}", ts, is_compressed);
let value_bytes = sl.read_u64::<BE>().unwrap();
let block_size = sl.read_u32::<BE>().unwrap();
//debug!("event len {} ts {} is_compressed {} shape_dim {} len-dim-0 {} value_bytes {} block_size {}", len, ts, is_compressed, shape_dim, shape_lens[0], value_bytes, block_size);
match self.channel_config.shape {
Shape::Scalar => {
assert!(value_bytes < 1024 * 1);
}
Shape::Wave(_) => {
assert!(value_bytes < 1024 * 64);
}
Shape::Image(_, _) => {
assert!(value_bytes < 1024 * 1024 * 20);
}
}
assert!(block_size <= 1024 * 32);
let type_size = scalar_type.bytes() as u32;
let ele_count = value_bytes / type_size as u64;
let ele_size = type_size;
match self.channel_config.shape {
Shape::Scalar => {
if is_array {
Err(Error::with_msg(format!(
"ChannelConfig expects Scalar but we find event is_array"
)))?;
}
}
Shape::Wave(dim1count) => {
if dim1count != ele_count as u32 {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
Shape::Image(n1, n2) => {
let nt = n1 as usize * n2 as usize;
if nt != ele_count as usize {
Err(Error::with_msg(format!(
"ChannelConfig expects {:?} but event has ele_count {}",
self.channel_config.shape, ele_count,
)))?;
}
}
}
let data = &buf.as_ref()[(p1 as usize)..(p1 as usize + k1 as usize)];
let decomp = {
if self.do_decompress {
assert!(data.len() > 12);
let ts1 = Instant::now();
let decomp_bytes = (type_size * ele_count as u32) as usize;
let mut decomp = vec![0; decomp_bytes];
// TODO limit the buf slice range
match bitshuffle_decompress(
&data[12..],
&mut decomp,
ele_count as usize,
ele_size as usize,
0,
) {
Ok(c1) => {
assert!(c1 as u64 + 12 == k1);
let ts2 = Instant::now();
let dt = ts2.duration_since(ts1);
// TODO analyze the histo
self.decomp_dt_histo.ingest(dt.as_secs() as u32 + dt.subsec_micros());
Some(decomp)
}
Err(e) => {
return Err(Error::with_msg(format!("decompression failed {:?}", e)))?;
}
}
} else {
None
}
};
ret.add_event(
ts,
pulse,
Some(data.to_vec()),
decomp,
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
} else {
if len < p1 as u32 + 4 {
let msg = format!("uncomp len: {} p1: {}", len, p1);
Err(Error::with_msg(msg))?;
}
let vlen = len - p1 as u32 - 4;
let data = &buf[p1 as usize..(p1 as u32 + vlen) as usize];
ret.add_event(
ts,
pulse,
Some(data.to_vec()),
Some(data.to_vec()),
ScalarType::from_dtype_index(type_index)?,
is_big_endian,
shape_this,
comp_this,
);
}
buf.advance(len as usize);
parsed_bytes += len as u64;
self.need_min = 4;
}
}
}
}
Ok(ParseResult {
events: ret,
parsed_bytes,
})
}
}
impl Stream for EventChunker {
type Item = Result<StreamItem<RangeCompletableItem<EventFull>>, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
use Poll::*;
'outer: loop {
break if self.completed {
panic!("EventChunker poll_next on completed");
} else if self.errored {
self.completed = true;
Ready(None)
} else if self.parsed_bytes >= self.stats_conf.disk_stats_every.bytes() as u64 {
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
Ready(Some(Ok(ret)))
} else if self.sent_beyond_range {
self.completed = true;
Ready(None)
} else if self.final_stats_sent {
self.sent_beyond_range = true;
trace!("sent_beyond_range");
if self.seen_beyond_range {
trace!("sent_beyond_range RangeComplete");
Ready(Some(Ok(StreamItem::DataItem(RangeCompletableItem::RangeComplete))))
} else {
trace!("sent_beyond_range non-complete");
continue 'outer;
}
} else if self.data_emit_complete {
let item = EventDataReadStats {
parsed_bytes: self.parsed_bytes,
};
self.parsed_bytes = 0;
let ret = StreamItem::Stats(StatsItem::EventDataReadStats(item));
self.final_stats_sent = true;
Ready(Some(Ok(ret)))
} else {
match self.inp.poll_next_unpin(cx) {
Ready(Some(Ok(mut fcr))) => {
if false {
// TODO collect for stats:
info!(
"file read bytes {} ms {}",
fcr.buf().len(),
fcr.duration().as_millis()
);
}
let r = self.parse_buf(fcr.buf_mut());
match r {
Ok(res) => {
self.parsed_bytes += res.parsed_bytes;
if fcr.buf().len() > 0 {
// TODO gather stats about this:
self.inp.put_back(fcr);
}
match self.channel_config.shape {
Shape::Scalar => {
if self.need_min > 1024 * 8 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Wave(_) => {
if self.need_min > 1024 * 32 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
Shape::Image(_, _) => {
if self.need_min > 1024 * 1024 * 20 {
let msg =
format!("spurious EventChunker asks for need_min {}", self.need_min);
self.errored = true;
return Ready(Some(Err(Error::with_msg(msg))));
}
}
}
let x = self.need_min;
self.inp.set_need_min(x);
if false {
info!(
"EventChunker emits {} events tss {:?}",
res.events.len(),
res.events.tss
);
};
self.item_len_emit_histo.ingest(res.events.len() as u32);
let ret = StreamItem::DataItem(RangeCompletableItem::Data(res.events));
Ready(Some(Ok(ret)))
}
Err(e) => {
self.errored = true;
Ready(Some(Err(e.into())))
}
}
}
Ready(Some(Err(e))) => {
self.errored = true;
Ready(Some(Err(e)))
}
Ready(None) => {
self.data_emit_complete = true;
continue 'outer;
}
Pending => Pending,
}
};
}
}
}
#[cfg(test)]
mod test {
//use err::Error;
//use netpod::timeunits::*;
//use netpod::{ByteSize, Nanos};
/*
#[test]
fn read_expanded_for_range(range: netpod::NanoRange, nodeix: usize) -> Result<(usize, usize), Error> {
let chn = netpod::Channel {
backend: "testbackend".into(),
name: "scalar-i32-be".into(),
};
// TODO read config from disk.
let channel_config = ChannelConfig {
channel: chn,
keyspace: 2,
time_bin_size: Nanos { ns: DAY },
scalar_type: netpod::ScalarType::I32,
byte_order: netpod::ByteOrder::big_endian(),
shape: netpod::Shape::Scalar,
array: false,
compression: false,
};
let cluster = taskrun::test_cluster();
let node = cluster.nodes[nodeix].clone();
let buffer_size = 512;
let event_chunker_conf = EventChunkerConf {
disk_stats_every: ByteSize::kb(1024),
};
}
*/
}
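The deleted EventChunker above parsed data files as a short channel header (big-endian i16 format version, an i32 length, the channel name, and the same length repeated) followed by length-prefixed event records. A minimal sketch of writing such a header, assuming the layout read by parse_buf_inner; the channel name is invented:

// Sketch only: builds the file header that parse_buf_inner above accepted.
// The length field counts the leading i32, the name, and the trailing i32.
use byteorder::{WriteBytesExt, BE};

fn main() {
    let name = b"example-channel";
    let len = 4 + name.len() as i32 + 4;
    let mut buf: Vec<u8> = Vec::new();
    buf.write_i16::<BE>(0).unwrap(); // file version, must be 0
    buf.write_i32::<BE>(len).unwrap();
    buf.extend_from_slice(name);
    buf.write_i32::<BE>(len).unwrap();
    assert_eq!(buf.len(), len as usize + 2); // "totlen" in the parser
}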

@@ -56,9 +56,9 @@ pub async fn plain_events_json(evq: &PlainEventsQuery, chconf: &ChConf, cluster:
use items_0::streamitem::Sitemty;
use std::pin::Pin;
let stream: Pin<Box<dyn Stream<Item = Sitemty<ChannelEvents>> + Send>> = if evq.transform().is_pulse_id_diff() {
Box::pin(stream.map(|item| {
let mut pulse_last = None;
on_sitemty_data!(item, move |item| {
let mut pulse_last = None;
Box::pin(stream.map(move |item| {
on_sitemty_data!(item, |item| {
use items_0::streamitem::RangeCompletableItem;
use items_0::streamitem::StreamItem;
use items_0::Appendable;